diff options
51 files changed, 907 insertions, 707 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index a39c0118..fe9bcde8 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -56,20 +56,14 @@ static nni_list nni_aio_expire_aios; static void nni_aio_expire_add(nni_aio *); -int +void nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) { - int rv; - memset(aio, 0, sizeof(*aio)); - if ((rv = nni_cv_init(&aio->a_cv, &nni_aio_lk)) != 0) { - return (rv); - } + nni_cv_init(&aio->a_cv, &nni_aio_lk); aio->a_expire = NNI_TIME_NEVER; aio->a_init = 1; nni_task_init(NULL, &aio->a_task, cb, arg); - - return (0); } void @@ -350,46 +344,43 @@ nni_aio_expire_loop(void *arg) } } -int -nni_aio_sys_init(void) +void +nni_aio_sys_fini(void) { - int rv; nni_mtx *mtx = &nni_aio_lk; nni_cv * cv = &nni_aio_expire_cv; nni_thr *thr = &nni_aio_expire_thr; - if (((rv = nni_mtx_init(mtx)) != 0) || - ((rv = nni_cv_init(cv, mtx)) != 0) || - ((rv = nni_thr_init(thr, nni_aio_expire_loop, NULL)) != 0)) { - goto fail; + if (nni_aio_expire_run) { + nni_mtx_lock(mtx); + nni_aio_expire_run = 0; + nni_cv_wake(cv); + nni_mtx_unlock(mtx); } - NNI_LIST_INIT(&nni_aio_expire_aios, nni_aio, a_expire_node); - nni_aio_expire_run = 1; - nni_thr_run(thr); - return (0); -fail: nni_thr_fini(thr); nni_cv_fini(cv); nni_mtx_fini(mtx); - return (rv); } -void -nni_aio_sys_fini(void) +int +nni_aio_sys_init(void) { + int rv; nni_mtx *mtx = &nni_aio_lk; nni_cv * cv = &nni_aio_expire_cv; nni_thr *thr = &nni_aio_expire_thr; - if (nni_aio_expire_run) { - nni_mtx_lock(mtx); - nni_aio_expire_run = 0; - nni_cv_wake(cv); - nni_mtx_unlock(mtx); + NNI_LIST_INIT(&nni_aio_expire_aios, nni_aio, a_expire_node); + nni_mtx_init(mtx); + nni_cv_init(cv, mtx); + + if ((rv = nni_thr_init(thr, nni_aio_expire_loop, NULL)) != 0) { + nni_aio_sys_fini(); + return (rv); } - nni_thr_fini(thr); - nni_cv_fini(cv); - nni_mtx_fini(mtx); -}
\ No newline at end of file + nni_aio_expire_run = 1; + nni_thr_run(thr); + return (0); +} diff --git a/src/core/aio.h b/src/core/aio.h index d48442eb..aabb3fa9 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -66,7 +66,7 @@ struct nni_aio { // the supplied argument when the operation is complete. If NULL is // supplied for the callback, then nni_aio_wake is used in its place, // and the aio is used for the argument. -extern int nni_aio_init(nni_aio *, nni_cb, void *); +extern void nni_aio_init(nni_aio *, nni_cb, void *); // nni_aio_fini finalizes the aio, releasing resources (locks) // associated with it. The caller is responsible for ensuring that any diff --git a/src/core/defs.h b/src/core/defs.h index 69fe4843..4d8e6ffb 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -17,9 +17,13 @@ // superior, support for such are not universal. #define NNI_ARG_UNUSED(x) ((void) x); +#ifndef NDEBUG #define NNI_ASSERT(x) \ if (!(x)) \ nni_panic("%s: %d: assert err: %s", __FILE__, __LINE__, #x) +#else +#define NNI_ASSERT(x) +#endif // These types are common but have names shared with user space. typedef struct nng_msg nni_msg; diff --git a/src/core/endpt.c b/src/core/endpt.c index 0ab35ea3..34debc0e 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -57,10 +57,10 @@ nni_ep_sys_init(void) { int rv; - if (((rv = nni_mtx_init(&nni_ep_lk)) != 0) || - ((rv = nni_idhash_init(&nni_eps)) != 0)) { + if ((rv = nni_idhash_init(&nni_eps)) != 0) { return (rv); } + nni_mtx_init(&nni_ep_lk); nni_idhash_set_limits( nni_eps, 1, 0x7fffffff, nni_random() & 0x7fffffff); @@ -152,13 +152,14 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode) nni_pipe_ep_list_init(&ep->ep_pipes); - if (((rv = nni_mtx_init(&ep->ep_mtx)) != 0) || - ((rv = nni_cv_init(&ep->ep_cv, &ep->ep_mtx)) != 0) || - ((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) || - ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) || + nni_mtx_init(&ep->ep_mtx); + nni_cv_init(&ep->ep_cv, &ep->ep_mtx); + nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep); + nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep); + nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep); + nni_aio_init(&ep->ep_con_syn, NULL, NULL); + + if (((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) || ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) || ((rv = nni_sock_ep_add(s, ep)) != 0)) { nni_ep_destroy(ep); diff --git a/src/core/event.c b/src/core/event.c index 79910d80..ab157b02 100644 --- a/src/core/event.c +++ b/src/core/event.c @@ -12,13 +12,12 @@ #include <stdlib.h> #include <string.h> -int +void nni_ev_init(nni_event *event, int type, nni_sock *sock) { memset(event, 0, sizeof(*event)); event->e_type = type; event->e_sock = sock; - return (0); } void diff --git a/src/core/event.h b/src/core/event.h index 6d9a9394..22af096e 100644 --- a/src/core/event.h +++ b/src/core/event.h @@ -28,7 +28,7 @@ struct nng_notify { nni_aio n_aio; }; -extern int nni_ev_init(nni_event *, int, nni_sock *); +extern void nni_ev_init(nni_event *, int, nni_sock *); extern void nni_ev_fini(nni_event *); #endif // CORE_EVENT_H diff --git a/src/core/idhash.c b/src/core/idhash.c index ab6b67e1..03854fc8 100644 --- a/src/core/idhash.c +++ b/src/core/idhash.c @@ -36,15 +36,11 @@ int nni_idhash_init(nni_idhash **hp) { nni_idhash *h; - int rv; if ((h = NNI_ALLOC_STRUCT(h)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&h->ih_mtx)) != 0) { - NNI_FREE_STRUCT(h); - return (rv); - } + nni_mtx_init(&h->ih_mtx); h->ih_entries = NULL; h->ih_count = 0; h->ih_load = 0; diff --git a/src/core/init.c b/src/core/init.c index 24a7b118..4025c0f4 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -1,5 +1,5 @@ // -// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Garrett D'Amore <garrett@damore.org> // Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -39,8 +39,6 @@ nni_init(void) void nni_fini(void) { - // XXX: We should make sure that underlying sockets and - // file descriptors are closed. Details TBD. nni_tran_sys_fini(); nni_pipe_sys_fini(); nni_ep_sys_fini(); diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 6530857e..ef7dd5a2 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -39,7 +39,6 @@ int nni_msgq_init(nni_msgq **mqp, unsigned cap) { struct nni_msgq *mq; - int rv; int alloc; // We allocate 2 extra cells in the fifo. One to accommodate a @@ -52,21 +51,18 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap) if ((mq = NNI_ALLOC_STRUCT(mq)) == NULL) { return (NNG_ENOMEM); } + if ((mq->mq_msgs = nni_alloc(sizeof(nng_msg *) * alloc)) == NULL) { + NNI_FREE_STRUCT(mq); + return (NNG_ENOMEM); + } + nni_aio_list_init(&mq->mq_aio_putq); nni_aio_list_init(&mq->mq_aio_getq); nni_aio_list_init(&mq->mq_aio_notify_get); nni_aio_list_init(&mq->mq_aio_notify_put); - if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) { - goto fail; - } - if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) { - goto fail; - } - if ((mq->mq_msgs = nni_alloc(sizeof(nng_msg *) * alloc)) == NULL) { - rv = NNG_ENOMEM; - goto fail; - } + nni_mtx_init(&mq->mq_lock); + nni_cv_init(&mq->mq_drained, &mq->mq_lock); mq->mq_cap = cap; mq->mq_alloc = alloc; @@ -80,15 +76,6 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap) *mqp = mq; return (0); - -fail: - nni_cv_fini(&mq->mq_drained); - nni_mtx_fini(&mq->mq_lock); - if (mq->mq_msgs != NULL) { - nni_free(mq->mq_msgs, sizeof(nng_msg *) * alloc); - } - NNI_FREE_STRUCT(mq); - return (rv); } void @@ -413,9 +400,7 @@ nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) nni_aio aio; int rv; - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { - return (rv); - } + nni_aio_init(&aio, NULL, NULL); aio.a_expire = expire; nni_msgq_aio_get(mq, &aio); nni_aio_wait(&aio); @@ -433,9 +418,7 @@ nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) nni_aio aio; int rv; - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { - return (rv); - } + nni_aio_init(&aio, NULL, NULL); aio.a_expire = expire; aio.a_msg = msg; nni_msgq_aio_put(mq, &aio); diff --git a/src/core/pipe.c b/src/core/pipe.c index 75c4c8d6..cdbeb6a7 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -49,9 +49,10 @@ nni_pipe_sys_init(void) int rv; NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node); + nni_mtx_init(&nni_pipe_reap_lk); + nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk); + if (((rv = nni_idhash_init(&nni_pipes)) != 0) || - ((rv = nni_mtx_init(&nni_pipe_reap_lk)) != 0) || - ((rv = nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk)) != 0) || ((rv = nni_thr_init(&nni_pipe_reap_thr, nni_pipe_reaper, 0)) != 0)) { return (rv); @@ -240,11 +241,11 @@ nni_pipe_create(nni_ep *ep, void *tdata) NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); - if (((rv = nni_mtx_init(&p->p_mtx)) != 0) || - ((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0) || - ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) != - 0) || - ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) || + nni_mtx_init(&p->p_mtx); + nni_cv_init(&p->p_cv, &p->p_mtx); + nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p); + + if (((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) || ((rv = nni_ep_pipe_add(ep, p)) != 0) || ((rv = nni_sock_pipe_add(sock, p)) != 0)) { nni_pipe_destroy(p); diff --git a/src/core/platform.h b/src/core/platform.h index 7acf16ef..f6f0b974 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -85,10 +85,9 @@ typedef struct nni_plat_thr nni_plat_thr; // Threading & Synchronization Support // -// nni_plat_mtx_init initializes a mutex structure. This may require dynamic -// allocation, depending on the platform. It can return NNG_ENOMEM if that -// fails. An initialized mutex must be distinguishable from zeroed memory. -extern int nni_plat_mtx_init(nni_plat_mtx *); +// nni_plat_mtx_init initializes a mutex structure. An initialized mutex must +// be distinguishable from zeroed memory. +extern void nni_plat_mtx_init(nni_plat_mtx *); // nni_plat_mtx_fini destroys the mutex and releases any resources allocated // for it's use. If the mutex is zeroed memory, this should do nothing. @@ -99,20 +98,14 @@ extern void nni_plat_mtx_fini(nni_plat_mtx *); extern void nni_plat_mtx_lock(nni_plat_mtx *); // nni_plat_mtx_unlock unlocks the mutex. This can only be performed by the -// threadthat owned the mutex. +// thread that owned the mutex. extern void nni_plat_mtx_unlock(nni_plat_mtx *); -// nni_plat_mtx_tryenter tries to lock the mutex. If it can't, it may return -// NNG_EBUSY if the mutex is already owned. -extern int nni_plat_mtx_trylock(nni_plat_mtx *); - // nni_plat_cv_init initializes a condition variable. We require a mutex be // supplied with it, and that mutex must always be held when performing any -// operations on the condition variable (other than fini.) This may require -// dynamic allocation, and if so this operation may fail with NNG_ENOMEM. -// As with mutexes, an initialized mutex should be distinguishable from -// zeroed memory. -extern int nni_plat_cv_init(nni_plat_cv *, nni_plat_mtx *); +// operations on the condition variable (other than fini.) As with mutexes, an +// initialized mutex should be distinguishable from zeroed memory. +extern void nni_plat_cv_init(nni_plat_cv *, nni_plat_mtx *); // nni_plat_cv_fini releases all resources associated with condition variable. // If the cv points to just zeroed memory (was never initialized), it does diff --git a/src/core/random.c b/src/core/random.c index eb0b4fc2..febd0848 100644 --- a/src/core/random.c +++ b/src/core/random.c @@ -170,12 +170,8 @@ nni_random_sys_init(void) { // minimally, grab the system clock nni_isaac_ctx *ctx = &nni_random_ctx; - int rv; - - if ((rv = nni_mtx_init(&ctx->mx)) != 0) { - return (rv); - } + nni_mtx_init(&ctx->mx); nni_plat_seed_prng(ctx->randrsl, sizeof(ctx->randrsl)); nni_isaac_randinit(ctx, 1); return (0); diff --git a/src/core/socket.c b/src/core/socket.c index 6a243650..9aa89a2d 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -231,7 +231,6 @@ nni_notify * nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg) { nni_notify *notify; - int rv; if ((notify = NNI_ALLOC_STRUCT(notify)) == NULL) { return (NULL); @@ -244,31 +243,19 @@ nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg) switch (type) { case NNG_EV_CAN_RCV: - rv = nni_aio_init(¬ify->n_aio, nni_sock_canrecv_cb, notify); - if (rv != 0) { - goto fail; - } + nni_aio_init(¬ify->n_aio, nni_sock_canrecv_cb, notify); nni_msgq_aio_notify_get(sock->s_urq, ¬ify->n_aio); break; case NNG_EV_CAN_SND: - rv = nni_aio_init(¬ify->n_aio, nni_sock_cansend_cb, notify); - if (rv != 0) { - goto fail; - } + nni_aio_init(¬ify->n_aio, nni_sock_cansend_cb, notify); nni_msgq_aio_notify_put(sock->s_uwq, ¬ify->n_aio); break; default: - rv = NNG_ENOTSUP; - goto fail; - break; + NNI_FREE_STRUCT(notify); + return (NULL); } return (notify); - -fail: - nni_aio_fini(¬ify->n_aio); - NNI_FREE_STRUCT(notify); - return (NULL); } void @@ -343,13 +330,13 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) NNI_LIST_NODE_INIT(&s->s_node); nni_pipe_sock_list_init(&s->s_pipes); nni_ep_list_init(&s->s_eps); + nni_mtx_init(&s->s_mx); + nni_cv_init(&s->s_cv, &s->s_mx); + nni_cv_init(&s->s_close_cv, &nni_sock_lk); + nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s); + nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s); - if (((rv = nni_mtx_init(&s->s_mx)) != 0) || - ((rv = nni_cv_init(&s->s_cv, &s->s_mx)) != 0) || - ((rv = nni_cv_init(&s->s_close_cv, &nni_sock_lk)) != 0) || - ((rv = nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s)) != 0) || - ((rv = nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s)) != 0) || - ((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) || + if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) || ((rv = nni_msgq_init(&s->s_urq, 0)) != 0) || ((rv = s->s_sock_ops.sock_init(&s->s_data, s)) != 0)) { nni_sock_destroy(s); @@ -365,8 +352,9 @@ nni_sock_sys_init(void) int rv; NNI_LIST_INIT(&nni_sock_list, nni_sock, s_node); - if (((rv = nni_idhash_init(&nni_sock_hash)) != 0) || - ((rv = nni_mtx_init(&nni_sock_lk)) != 0)) { + nni_mtx_init(&nni_sock_lk); + + if ((rv = nni_idhash_init(&nni_sock_hash)) != 0) { nni_sock_sys_fini(); } else { nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1); @@ -562,6 +550,9 @@ nni_sock_closeall(void) { nni_sock *s; + if (nni_sock_hash == NULL) { + return; + } for (;;) { nni_mtx_lock(&nni_sock_lk); if ((s = nni_list_first(&nni_sock_list)) == NULL) { diff --git a/src/core/taskq.c b/src/core/taskq.c index e0fc456c..d0f04e8a 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -85,12 +85,9 @@ nni_taskq_init(nni_taskq **tqp, int nthr) tq->tq_nthreads = nthr; NNI_LIST_INIT(&tq->tq_tasks, nni_task, task_node); - if (((rv = nni_mtx_init(&tq->tq_mtx)) != 0) || - ((rv = nni_cv_init(&tq->tq_sched_cv, &tq->tq_mtx)) != 0) || - ((rv = nni_cv_init(&tq->tq_wait_cv, &tq->tq_mtx)) != 0)) { - nni_taskq_fini(tq); - return (rv); - } + nni_mtx_init(&tq->tq_mtx); + nni_cv_init(&tq->tq_sched_cv, &tq->tq_mtx); + nni_cv_init(&tq->tq_wait_cv, &tq->tq_mtx); for (i = 0; i < nthr; i++) { tq->tq_threads[i].tqt_tq = tq; diff --git a/src/core/thread.c b/src/core/thread.c index 6c3bd9f3..54c9c7d2 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -9,10 +9,10 @@ #include "core/nng_impl.h" -int +void nni_mtx_init(nni_mtx *mtx) { - return (nni_plat_mtx_init(mtx)); + nni_plat_mtx_init(mtx); } void @@ -33,10 +33,10 @@ nni_mtx_unlock(nni_mtx *mtx) nni_plat_mtx_unlock(mtx); } -int +void nni_cv_init(nni_cv *cv, nni_mtx *mtx) { - return (nni_plat_cv_init(cv, mtx)); + nni_plat_cv_init(cv, mtx); } void @@ -110,15 +110,9 @@ nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg) thr->fn = fn; thr->arg = arg; - if ((rv = nni_plat_mtx_init(&thr->mtx)) != 0) { - thr->done = 1; - return (rv); - } - if ((rv = nni_plat_cv_init(&thr->cv, &thr->mtx)) != 0) { - nni_plat_mtx_fini(&thr->mtx); - thr->done = 1; - return (rv); - } + nni_plat_mtx_init(&thr->mtx); + nni_plat_cv_init(&thr->cv, &thr->mtx); + if (fn == NULL) { thr->init = 1; thr->done = 1; diff --git a/src/core/thread.h b/src/core/thread.h index 94b2a984..ee83b196 100644 --- a/src/core/thread.h +++ b/src/core/thread.h @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -25,9 +26,8 @@ struct nni_thr { int init; }; -// nni_mtx_init initializes the mutex. (Win32 programmers take note; -// our mutexes are actually CriticalSections on Win32.) -extern int nni_mtx_init(nni_mtx *mtx); +// nni_mtx_init initializes the mutex. +extern void nni_mtx_init(nni_mtx *mtx); // nni_mtx_fini destroys the mutex and releases any resources used by it. extern void nni_mtx_fini(nni_mtx *mtx); @@ -43,7 +43,7 @@ extern void nni_mtx_unlock(nni_mtx *mtx); // nni_cv_init initializes the condition variable. The mutex supplied // must always be locked with the condition variable. -extern int nni_cv_init(nni_cv *cv, nni_mtx *); +extern void nni_cv_init(nni_cv *cv, nni_mtx *); // nni_cv_fini releases resources associated with the condition variable, // which must not be in use at the time. diff --git a/src/core/timer.c b/src/core/timer.c index 73bc7604..7608f7d5 100644 --- a/src/core/timer.c +++ b/src/core/timer.c @@ -40,10 +40,11 @@ nni_timer_sys_init(void) memset(timer, 0, sizeof(*timer)); NNI_LIST_INIT(&timer->t_entries, nni_timer_node, t_node); - if (((rv = nni_mtx_init(&timer->t_mx)) != 0) || - ((rv = nni_cv_init(&timer->t_sched_cv, &timer->t_mx)) != 0) || - ((rv = nni_cv_init(&timer->t_wait_cv, &timer->t_mx)) != 0) || - ((rv = nni_thr_init(&timer->t_thr, nni_timer_loop, timer)) != 0)) { + nni_mtx_init(&timer->t_mx); + nni_cv_init(&timer->t_sched_cv, &timer->t_mx); + nni_cv_init(&timer->t_wait_cv, &timer->t_mx); + + if ((rv = nni_thr_init(&timer->t_thr, nni_timer_loop, timer)) != 0) { nni_timer_sys_fini(); return (rv); } diff --git a/src/core/transport.c b/src/core/transport.c index 278c7d1e..6f73a6b2 100644 --- a/src/core/transport.c +++ b/src/core/transport.c @@ -84,8 +84,9 @@ nni_tran_sys_init(void) int rv; NNI_LIST_INIT(&nni_tran_list, nni_transport, t_node); - if (((rv = nni_mtx_init(&nni_tran_lk)) != 0) || - ((rv = nni_tran_register(&nni_inproc_tran)) != 0) || + nni_mtx_init(&nni_tran_lk); + + if (((rv = nni_tran_register(&nni_inproc_tran)) != 0) || ((rv = nni_tran_register(&nni_ipc_tran)) != 0) || ((rv = nni_tran_register(&nni_tcp_tran)) != 0)) { nni_tran_sys_fini(); @@ -24,6 +24,7 @@ void nng_fini(void) { + nni_sock_closeall(); nni_fini(); } @@ -184,7 +185,6 @@ nng_free(void *buf, size_t sz) int nng_sendmsg(nng_socket sid, nng_msg *msg, int flags) { - nni_time expire; int rv; nni_sock *sock; diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 46fe2bea..f511f35f 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -126,10 +126,8 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed) static void nni_posix_epdesc_doaccept(nni_posix_epdesc *ed) { - nni_aio * aio; - int newfd; - struct sockaddr_storage ss; - socklen_t slen; + nni_aio *aio; + int newfd; while ((aio = nni_list_first(&ed->acceptq)) != NULL) { // We could argue that knowing the remote peer address would @@ -456,10 +454,7 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url) return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&ed->mtx)) != 0) { - NNI_FREE_STRUCT(ed); - return (rv); - } + nni_mtx_init(&ed->mtx); // We could randomly choose a different pollq, or for efficiencies // sake we could take a modulo of the file desc number to choose diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 46ebbc1d..81fbd48b 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -54,9 +54,19 @@ extern int nni_plat_errno(int); // elsewhere. struct nni_plat_mtx { - int init; pthread_t owner; pthread_mutex_t mtx; + int fallback; + int flags; +}; + +struct nni_plat_cv { + pthread_cond_t cv; + nni_plat_mtx * mtx; + int fallback; + int flags; + int gen; + int wake; }; struct nni_plat_thr { @@ -65,11 +75,6 @@ struct nni_plat_thr { void *arg; }; -struct nni_plat_cv { - pthread_cond_t cv; - nni_plat_mtx * mtx; -}; - #endif extern int nni_posix_pollq_sysinit(void); diff --git a/src/platform/posix/posix_pipe.c b/src/platform/posix/posix_pipe.c index 78415d26..314e2f5e 100644 --- a/src/platform/posix/posix_pipe.c +++ b/src/platform/posix/posix_pipe.c @@ -105,7 +105,6 @@ void nni_plat_pipe_clear(int rfd) { char buf[32]; - int rv; for (;;) { // Completely drain the pipe, but don't wait. This coalesces diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c index bd74e0c0..b2c1cb1f 100644 --- a/src/platform/posix/posix_pipedesc.c +++ b/src/platform/posix/posix_pipedesc.c @@ -302,10 +302,6 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd) // one. For now we just have a global pollq. Note that by tying // the pd to a single pollq we may get some kind of cache warmth. - if ((rv = nni_mtx_init(&pd->mtx)) != 0) { - NNI_FREE_STRUCT(pd); - return (rv); - } pd->closed = 0; pd->node.fd = fd; pd->node.cb = nni_posix_pipedesc_cb; @@ -313,6 +309,7 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd) (void) fcntl(fd, F_SETFL, O_NONBLOCK); + nni_mtx_init(&pd->mtx); nni_aio_list_init(&pd->readq); nni_aio_list_init(&pd->writeq); diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index d3fdf394..df5a1799 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -347,9 +347,10 @@ nni_posix_pollq_init(nni_posix_pollq *pq) pq->wakerfd = -1; pq->close = 0; - if (((rv = nni_mtx_init(&pq->mtx)) != 0) || - ((rv = nni_cv_init(&pq->cv, &pq->mtx)) != 0) || - ((rv = nni_posix_pollq_poll_grow(pq)) != 0) || + nni_mtx_init(&pq->mtx); + nni_cv_init(&pq->cv, &pq->mtx); + + if (((rv = nni_posix_pollq_poll_grow(pq)) != 0) || ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) || ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) { nni_posix_pollq_fini(pq); diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index 09d40b94..dce8270a 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -276,9 +276,8 @@ nni_posix_resolv_sysinit(void) { int rv; - if ((rv = nni_mtx_init(&nni_posix_resolv_mtx)) != 0) { - return (rv); - } + nni_mtx_init(&nni_posix_resolv_mtx); + if ((rv = nni_taskq_init(&nni_posix_resolv_tq, 4)) != 0) { nni_mtx_fini(&nni_posix_resolv_mtx); return (rv); diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index 0ef4754c..44bfbed2 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -24,118 +24,325 @@ #include <time.h> #include <unistd.h> -static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER; -static int nni_plat_inited = 0; -static int nni_plat_forked = 0; +static pthread_mutex_t nni_plat_init_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t nni_plat_cond_cond = PTHREAD_COND_INITIALIZER; +static pthread_cond_t nni_plat_lock_cond = PTHREAD_COND_INITIALIZER; +static int nni_plat_inited = 0; +static int nni_plat_forked = 0; + +pthread_condattr_t nni_cvattr; +pthread_mutexattr_t nni_mxattr; + +#ifndef NDEBUG +int nni_plat_sync_fallback = 0; +#endif -pthread_condattr_t nni_cvattr; -pthread_mutexattr_t nni_mxattr; -static pthread_attr_t nni_pthread_attr; +enum nni_plat_sync_flags { + NNI_PLAT_SYNC_INIT = 0x01, + NNI_PLAT_SYNC_LOCKED = 0x04, + NNI_PLAT_SYNC_WAIT = 0x08, +}; -// We open a /dev/null file descriptor so that we can dup2() it to -// cause MacOS X to wakeup. This gives us a "safe" close semantic. +void +nni_plat_mtx_init(nni_plat_mtx *mtx) +{ + if (pthread_mutex_init(&mtx->mtx, &nni_mxattr) != 0) { + mtx->fallback = 1; + } else { + mtx->flags = NNI_PLAT_SYNC_INIT; + } +#ifndef NDEBUG + if (nni_plat_sync_fallback || getenv("NNG_SYNC_FALLBACK")) { + mtx->fallback = 1; + } +#endif +} -int nni_plat_devnull = -1; +void +nni_plat_mtx_fini(nni_plat_mtx *mtx) +{ + if (mtx->flags & NNI_PLAT_SYNC_INIT) { + int rv; + if ((rv = pthread_mutex_destroy(&mtx->mtx)) != 0) { + nni_panic("pthread_mutex_destroy: %s", strerror(rv)); + } + } + mtx->flags = 0; +} -int -nni_plat_mtx_init(nni_plat_mtx *mtx) +static void +nni_pthread_mutex_lock(pthread_mutex_t *m) { int rv; - if ((rv = pthread_mutex_init(&mtx->mtx, &nni_mxattr)) != 0) { - switch (rv) { - case EAGAIN: - case ENOMEM: - return (NNG_ENOMEM); + if ((rv = pthread_mutex_lock(m)) != 0) { + nni_panic("pthread_mutex_lock: %s", strerror(rv)); + } +} - default: - nni_panic("pthread_mutex_init: %s", strerror(rv)); - } +static void +nni_pthread_mutex_unlock(pthread_mutex_t *m) +{ + int rv; + + if ((rv = pthread_mutex_unlock(m)) != 0) { + nni_panic("pthread_mutex_unlock: %s", strerror(rv)); } - mtx->init = 1; - return (0); } -void -nni_plat_mtx_fini(nni_plat_mtx *mtx) +static void +nni_pthread_cond_broadcast(pthread_cond_t *c) { int rv; - if (!mtx->init) { - return; + if ((rv = pthread_cond_broadcast(c)) != 0) { + nni_panic("pthread_cond_broadcast: %s", strerror(rv)); } - pthread_mutex_lock(&mtx->mtx); - pthread_mutex_unlock(&mtx->mtx); - if ((rv = pthread_mutex_destroy(&mtx->mtx)) != 0) { - nni_panic("pthread_mutex_fini: %s", strerror(rv)); +} + +static void +nni_pthread_cond_signal(pthread_cond_t *c) +{ + int rv; + if ((rv = pthread_cond_signal(c)) != 0) { + nni_panic("pthread_cond_signal: %s", strerror(rv)); } - mtx->init = 0; } -void -nni_plat_mtx_lock(nni_plat_mtx *mtx) +static void +nni_pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m) { int rv; - if ((rv = pthread_mutex_lock(&mtx->mtx)) != 0) { - nni_panic("pthread_mutex_lock: %s", strerror(rv)); + if ((rv = pthread_cond_wait(c, m)) != 0) { + nni_panic("pthread_cond_wait: %s", strerror(rv)); + } +} + +static int +nni_pthread_cond_timedwait( + pthread_cond_t *c, pthread_mutex_t *m, struct timespec *ts) +{ + int rv; + + switch ((rv = pthread_cond_timedwait(c, m, ts))) { + case 0: + return (0); + case ETIMEDOUT: + case EAGAIN: + return (NNG_ETIMEDOUT); + } + nni_panic("pthread_cond_timedwait: %s", strerror(rv)); + return (NNG_EINVAL); +} + +static void +nni_plat_mtx_lock_fallback_locked(nni_plat_mtx *mtx) +{ + while (mtx->flags & NNI_PLAT_SYNC_LOCKED) { + mtx->flags |= NNI_PLAT_SYNC_WAIT; + nni_pthread_cond_wait(&nni_plat_lock_cond, &nni_plat_lock); } + mtx->flags |= NNI_PLAT_SYNC_LOCKED; mtx->owner = pthread_self(); } +static void +nni_plat_mtx_unlock_fallback_locked(nni_plat_mtx *mtx) +{ + NNI_ASSERT(mtx->flags & NNI_PLAT_SYNC_LOCKED); + mtx->flags &= ~NNI_PLAT_SYNC_LOCKED; + if (mtx->flags & NNI_PLAT_SYNC_WAIT) { + mtx->flags &= ~NNI_PLAT_SYNC_WAIT; + pthread_cond_broadcast(&nni_plat_lock_cond); + } +} + +static void +nni_plat_mtx_lock_fallback(nni_plat_mtx *mtx) +{ + nni_pthread_mutex_lock(&nni_plat_lock); + nni_plat_mtx_lock_fallback_locked(mtx); + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static void +nni_plat_mtx_unlock_fallback(nni_plat_mtx *mtx) +{ + nni_pthread_mutex_lock(&nni_plat_lock); + nni_plat_mtx_unlock_fallback_locked(mtx); + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static void +nni_plat_cv_wake_fallback(nni_cv *cv) +{ + nni_pthread_mutex_lock(&nni_plat_lock); + if (cv->flags & NNI_PLAT_SYNC_WAIT) { + cv->gen++; + cv->wake = 0; + nni_pthread_cond_broadcast(&nni_plat_cond_cond); + } + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static void +nni_plat_cv_wake1_fallback(nni_cv *cv) +{ + nni_pthread_mutex_lock(&nni_plat_lock); + if (cv->flags & NNI_PLAT_SYNC_WAIT) { + cv->wake++; + nni_pthread_cond_broadcast(&nni_plat_cond_cond); + } + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static void +nni_plat_cv_wait_fallback(nni_cv *cv) +{ + int gen; + + nni_pthread_mutex_lock(&nni_plat_lock); + if (!cv->mtx->fallback) { + // transform the mutex to a fallback one. we have it held. + cv->mtx->fallback = 1; + cv->mtx->flags |= NNI_PLAT_SYNC_LOCKED; + nni_pthread_mutex_unlock(&cv->mtx->mtx); + } + + NNI_ASSERT(cv->mtx->owner == pthread_self()); + NNI_ASSERT(cv->mtx->flags & NNI_PLAT_SYNC_LOCKED); + gen = cv->gen; + while ((cv->gen == gen) && (cv->wake == 0)) { + nni_plat_mtx_unlock_fallback_locked(cv->mtx); + cv->flags |= NNI_PLAT_SYNC_WAIT; + nni_pthread_cond_wait(&nni_plat_cond_cond, &nni_plat_lock); + + nni_plat_mtx_lock_fallback_locked(cv->mtx); + } + if (cv->wake > 0) { + cv->wake--; + } + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static int +nni_plat_cv_until_fallback(nni_cv *cv, struct timespec *ts) +{ + int gen; + int rv = 0; + + if (!cv->mtx->fallback) { + // transform the mutex to a fallback one. we have it held. + cv->mtx->fallback = 1; + cv->mtx->flags |= NNI_PLAT_SYNC_LOCKED; + nni_pthread_mutex_unlock(&cv->mtx->mtx); + } + + nni_pthread_mutex_lock(&nni_plat_lock); + gen = cv->gen; + while ((cv->gen == gen) && (cv->wake == 0)) { + nni_plat_mtx_unlock_fallback_locked(cv->mtx); + cv->flags |= NNI_PLAT_SYNC_WAIT; + rv = nni_pthread_cond_timedwait( + &nni_plat_cond_cond, &nni_plat_lock, ts); + nni_plat_mtx_lock_fallback_locked(cv->mtx); + if (rv != 0) { + break; + } + } + if ((rv == 0) && (cv->wake > 0)) { + cv->wake--; + } + nni_pthread_mutex_unlock(&nni_plat_lock); + return (rv); +} + void -nni_plat_mtx_unlock(nni_plat_mtx *mtx) +nni_plat_mtx_lock(nni_plat_mtx *mtx) { int rv; + if (!mtx->fallback) { + nni_pthread_mutex_lock(&mtx->mtx); + + // We might have changed to a fallback lock; make + // sure this did not occur. Note that transitions to + // fallback locks only happen when a thread accesses + // a condition variable already holding this lock, + // so this is guranteed to be safe. + if (!mtx->fallback) { + mtx->owner = pthread_self(); + return; + } + nni_pthread_mutex_unlock(&mtx->mtx); + } + + // Fallback mode + nni_plat_mtx_lock_fallback(mtx); +} + +void +nni_plat_mtx_unlock(nni_plat_mtx *mtx) +{ NNI_ASSERT(mtx->owner == pthread_self()); mtx->owner = 0; - if ((rv = pthread_mutex_unlock(&mtx->mtx)) != 0) { - nni_panic("pthread_mutex_unlock: %s", strerror(rv)); + + if (mtx->fallback) { + nni_plat_mtx_unlock_fallback(mtx); + } else { + nni_pthread_mutex_unlock(&mtx->mtx); } } -int +void nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx) { - int rv; - - if ((rv = pthread_cond_init(&cv->cv, &nni_cvattr)) != 0) { - switch (rv) { - case ENOMEM: - case EAGAIN: - return (NNG_ENOMEM); - - default: - nni_panic("pthread_cond_init: %s", strerror(rv)); - } + if (mtx->fallback || (pthread_cond_init(&cv->cv, &nni_cvattr) != 0)) { + cv->fallback = 1; + } else { + cv->flags = NNI_PLAT_SYNC_INIT; + } +#ifndef NDEBUG + if (nni_plat_sync_fallback || getenv("NNG_SYNC_FALLBACK")) { + cv->fallback = 1; } +#endif cv->mtx = mtx; - return (0); } void nni_plat_cv_wake(nni_plat_cv *cv) { - (void) pthread_cond_broadcast(&cv->cv); + if (cv->fallback) { + nni_plat_cv_wake_fallback(cv); + } else { + nni_pthread_cond_broadcast(&cv->cv); + } } void nni_plat_cv_wake1(nni_plat_cv *cv) { - (void) pthread_cond_signal(&cv->cv); + int rv; + if (cv->fallback) { + nni_plat_cv_wake1_fallback(cv); + } else { + nni_pthread_cond_signal(&cv->cv); + } } void nni_plat_cv_wait(nni_plat_cv *cv) { - int rv; - NNI_ASSERT(cv->mtx->owner == pthread_self()); - if ((rv = pthread_cond_wait(&cv->cv, &cv->mtx->mtx)) != 0) { - nni_panic("pthread_cond_wait: %s", strerror(rv)); + if (cv->fallback) { + nni_plat_cv_wait_fallback(cv); + } else { + nni_pthread_cond_wait(&cv->cv, &cv->mtx->mtx); + cv->mtx->owner = pthread_self(); } - cv->mtx->owner = pthread_self(); } int @@ -150,14 +357,13 @@ nni_plat_cv_until(nni_plat_cv *cv, nni_time until) ts.tv_sec = until / 1000000; ts.tv_nsec = (until % 1000000) * 1000; - rv = pthread_cond_timedwait(&cv->cv, &cv->mtx->mtx, &ts); - cv->mtx->owner = pthread_self(); - if (rv == ETIMEDOUT) { - return (NNG_ETIMEDOUT); - } else if (rv != 0) { - nni_panic("pthread_cond_timedwait: %d", rv); + if (cv->fallback) { + rv = nni_plat_cv_until_fallback(cv, &ts); + } else { + rv = nni_pthread_cond_timedwait(&cv->cv, &cv->mtx->mtx, &ts); + cv->mtx->owner = pthread_self(); } - return (0); + return (rv); } void @@ -165,13 +371,12 @@ nni_plat_cv_fini(nni_plat_cv *cv) { int rv; - if (cv->mtx == NULL) { - return; - } - if ((rv = pthread_cond_destroy(&cv->cv)) != 0) { + if ((cv->flags & NNI_PLAT_SYNC_INIT) && + ((rv = pthread_cond_destroy(&cv->cv)) != 0)) { nni_panic("pthread_cond_destroy: %s", strerror(rv)); } - cv->mtx = NULL; + cv->flags = 0; + cv->mtx = NULL; } static void * @@ -198,10 +403,10 @@ nni_plat_thr_init(nni_plat_thr *thr, void (*fn)(void *), void *arg) thr->arg = arg; // POSIX wants functions to return a void *, but we don't care. - rv = pthread_create( - &thr->tid, &nni_pthread_attr, nni_plat_thr_main, thr); + rv = pthread_create(&thr->tid, NULL, nni_plat_thr_main, thr); if (rv != 0) { - // nni_printf("pthread_create: %s", strerror(rv)); + // nni_printf("pthread_create: %s", + // strerror(rv)); return (NNG_ENOMEM); } return (0); @@ -227,7 +432,6 @@ int nni_plat_init(int (*helper)(void)) { int rv; - int devnull; if (nni_plat_forked) { nni_panic("nng is not fork-reentrant safe"); @@ -235,94 +439,60 @@ nni_plat_init(int (*helper)(void)) if (nni_plat_inited) { return (0); // fast path } - if ((devnull = open("/dev/null", O_RDONLY)) < 0) { - return (nni_plat_errno(errno)); - } - pthread_mutex_lock(&nni_plat_lock); + pthread_mutex_lock(&nni_plat_init_lock); if (nni_plat_inited) { // check again under the lock to be sure - pthread_mutex_unlock(&nni_plat_lock); - close(devnull); + pthread_mutex_unlock(&nni_plat_init_lock); return (0); } if (pthread_condattr_init(&nni_cvattr) != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(devnull); + pthread_mutex_unlock(&nni_plat_init_lock); return (NNG_ENOMEM); } #if !defined(NNG_USE_GETTIMEOFDAY) && NNG_USE_CLOCKID != CLOCK_REALTIME if (pthread_condattr_setclock(&nni_cvattr, NNG_USE_CLOCKID) != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(devnull); + pthread_mutex_unlock(&nni_plat_init_lock); return (NNG_ENOMEM); } #endif if (pthread_mutexattr_init(&nni_mxattr) != 0) { - pthread_mutex_unlock(&nni_plat_lock); - pthread_condattr_destroy(&nni_cvattr); - (void) close(devnull); - return (NNG_ENOMEM); - } - - rv = pthread_mutexattr_settype(&nni_mxattr, PTHREAD_MUTEX_ERRORCHECK); - if (rv != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(devnull); - pthread_mutexattr_destroy(&nni_mxattr); - pthread_condattr_destroy(&nni_cvattr); - return (NNG_ENOMEM); - } - - rv = pthread_attr_init(&nni_pthread_attr); - if (rv != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(nni_plat_devnull); - pthread_mutexattr_destroy(&nni_mxattr); + pthread_mutex_unlock(&nni_plat_init_lock); pthread_condattr_destroy(&nni_cvattr); return (NNG_ENOMEM); } - // We don't force this, but we want to have it small... we could - // probably get by with even just 8k, but Linux usually wants 16k - // as a minimum. If this fails, its not fatal, just we won't be - // as scalable / thrifty with our use of VM. - //(void) pthread_attr_setstacksize(&nni_pthread_attr, 16384); + // if this one fails we don't care. + (void) pthread_mutexattr_settype( + &nni_mxattr, PTHREAD_MUTEX_ERRORCHECK); if ((rv = nni_posix_pollq_sysinit()) != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(nni_plat_devnull); + pthread_mutex_unlock(&nni_plat_init_lock); pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); - pthread_attr_destroy(&nni_pthread_attr); return (rv); } if ((rv = nni_posix_resolv_sysinit()) != 0) { - pthread_mutex_unlock(&nni_plat_lock); + pthread_mutex_unlock(&nni_plat_init_lock); nni_posix_pollq_sysfini(); - (void) close(nni_plat_devnull); pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); - pthread_attr_destroy(&nni_pthread_attr); return (rv); } if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) { - pthread_mutex_unlock(&nni_plat_lock); + pthread_mutex_unlock(&nni_plat_init_lock); nni_posix_resolv_sysfini(); nni_posix_pollq_sysfini(); - (void) close(devnull); pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); - pthread_attr_destroy(&nni_pthread_attr); return (NNG_ENOMEM); } if ((rv = helper()) == 0) { nni_plat_inited = 1; } - nni_plat_devnull = devnull; - pthread_mutex_unlock(&nni_plat_lock); + pthread_mutex_unlock(&nni_plat_init_lock); return (rv); } @@ -330,18 +500,15 @@ nni_plat_init(int (*helper)(void)) void nni_plat_fini(void) { - pthread_mutex_lock(&nni_plat_lock); + pthread_mutex_lock(&nni_plat_init_lock); if (nni_plat_inited) { nni_posix_resolv_sysfini(); nni_posix_pollq_sysfini(); pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); - pthread_attr_destroy(&nni_pthread_attr); - (void) close(nni_plat_devnull); - nni_plat_devnull = -1; - nni_plat_inited = 0; + nni_plat_inited = 0; } - pthread_mutex_unlock(&nni_plat_lock); + pthread_mutex_unlock(&nni_plat_init_lock); } #else diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index db6d7af4..2aa16b36 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -208,10 +208,7 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) if ((udp = NNI_ALLOC_STRUCT(udp)) != NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&udp->udp_mtx)) != 0) { - NNI_FREE_STRUCT(udp); - return (rv); - } + nni_mtx_init(&udp->udp_mtx); udp->udp_fd = socket(sa.ss_family, SOCK_DGRAM, IPPROTO_UDP); if (udp->udp_fd < 0) { diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index c4cdcb8a..9c3343b7 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -174,17 +174,14 @@ nni_win_iocp_register(HANDLE h) int nni_win_event_init(nni_win_event *evt, nni_win_event_ops *ops, void *ptr) { - int rv; - ZeroMemory(&evt->olpd, sizeof(evt->olpd)); evt->olpd.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL); if (evt->olpd.hEvent == NULL) { return (nni_win_error(GetLastError())); } - if (((rv = nni_mtx_init(&evt->mtx)) != 0) || - ((rv = nni_cv_init(&evt->cv, &evt->mtx)) != 0)) { - return (rv); // NB: This will never happen on Windows. - } + nni_mtx_init(&evt->mtx); + nni_cv_init(&evt->cv, &evt->mtx); + evt->ops = *ops; evt->aio = NULL; evt->ptr = ptr; @@ -240,9 +237,7 @@ nni_win_iocp_sysinit(void) goto fail; } } - if ((rv = nni_mtx_init(&nni_win_iocp_mtx)) != 0) { - goto fail; - } + nni_mtx_init(&nni_win_iocp_mtx); for (i = 0; i < NNI_WIN_IOCP_NTHREADS; i++) { nni_thr_run(&nni_win_iocp_thrs[i]); } diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index c9eb20ec..a60815fa 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -566,10 +566,9 @@ nni_win_ipc_sysinit(void) NNI_LIST_INIT(&worker->workers, nni_plat_ipc_ep, node); NNI_LIST_INIT(&worker->waiters, nni_plat_ipc_ep, node); - if (((rv = nni_mtx_init(&worker->mtx)) != 0) || - ((rv = nni_cv_init(&worker->cv, &worker->mtx)) != 0)) { - return (rv); - } + nni_mtx_init(&worker->mtx); + nni_cv_init(&worker->cv, &worker->mtx); + rv = nni_thr_init(&worker->thr, nni_win_ipc_conn_thr, worker); if (rv != 0) { return (rv); diff --git a/src/platform/windows/win_net.c b/src/platform/windows/win_net.c index 63295a71..80e3724d 100644 --- a/src/platform/windows/win_net.c +++ b/src/platform/windows/win_net.c @@ -680,8 +680,6 @@ int nni_win_tcp_sysinit(void) { WSADATA data; - WORD ver; - ver = MAKEWORD(2, 2); if (WSAStartup(MAKEWORD(2, 2), &data) != 0) { NNI_ASSERT(LOBYTE(data.wVersion) == 2); NNI_ASSERT(HIBYTE(data.wVersion) == 2); diff --git a/src/platform/windows/win_pipe.c b/src/platform/windows/win_pipe.c index 861fbc76..edc4df3f 100644 --- a/src/platform/windows/win_pipe.c +++ b/src/platform/windows/win_pipe.c @@ -19,9 +19,9 @@ int nni_plat_pipe_open(int *wfdp, int *rfdp) { - SOCKET afd = INVALID_SOCKET; SOCKET rfd = INVALID_SOCKET; SOCKET wfd = INVALID_SOCKET; + SOCKET afd; struct sockaddr_in addr; socklen_t alen; diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index a01dc123..4ce12d84 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -254,9 +254,8 @@ nni_win_resolv_sysinit(void) { int rv; - if ((rv = nni_mtx_init(&nni_win_resolv_mtx)) != 0) { - return (rv); - } + nni_mtx_init(&nni_win_resolv_mtx); + if ((rv = nni_taskq_init(&nni_win_resolv_tq, 4)) != 0) { nni_mtx_fini(&nni_win_resolv_mtx); return (rv); diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c index c01ec782..879cd772 100644 --- a/src/platform/windows/win_thread.c +++ b/src/platform/windows/win_thread.c @@ -30,12 +30,11 @@ nni_free(void *b, size_t z) HeapFree(GetProcessHeap(), 0, b); } -int +void nni_plat_mtx_init(nni_plat_mtx *mtx) { InitializeSRWLock(&mtx->srl); mtx->init = 1; - return (0); } void @@ -56,12 +55,11 @@ nni_plat_mtx_unlock(nni_plat_mtx *mtx) ReleaseSRWLockExclusive(&mtx->srl); } -int +void nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx) { InitializeConditionVariable(&cv->cv); cv->srl = &mtx->srl; - return (0); } void diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index 79d7187e..88fd93e0 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -59,40 +59,28 @@ nni_bus_sock_fini(void *arg) { nni_bus_sock *psock = arg; - if (psock != NULL) { - nni_aio_stop(&psock->aio_getq); - nni_aio_fini(&psock->aio_getq); - nni_mtx_fini(&psock->mtx); - NNI_FREE_STRUCT(psock); - } + nni_aio_stop(&psock->aio_getq); + nni_aio_fini(&psock->aio_getq); + nni_mtx_fini(&psock->mtx); + NNI_FREE_STRUCT(psock); } static int nni_bus_sock_init(void **sp, nni_sock *nsock) { nni_bus_sock *psock; - int rv; if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); } NNI_LIST_INIT(&psock->pipes, nni_bus_pipe, node); - if ((rv = nni_mtx_init(&psock->mtx)) != 0) { - goto fail; - } - rv = nni_aio_init(&psock->aio_getq, nni_bus_sock_getq_cb, psock); - if (rv != 0) { - goto fail; - } + nni_mtx_init(&psock->mtx); + nni_aio_init(&psock->aio_getq, nni_bus_sock_getq_cb, psock); psock->nsock = nsock; psock->raw = 0; *sp = psock; return (0); - -fail: - nni_bus_sock_fini(psock); - return (rv); } static void @@ -134,36 +122,21 @@ nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock) if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) { return (NNG_ENOMEM); } - NNI_LIST_NODE_INIT(&ppipe->node); - if (((rv = nni_mtx_init(&ppipe->mtx)) != 0) || - ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0)) { - goto fail; - } - rv = nni_aio_init(&ppipe->aio_getq, nni_bus_pipe_getq_cb, ppipe); - if (rv != 0) { - goto fail; - } - rv = nni_aio_init(&ppipe->aio_send, nni_bus_pipe_send_cb, ppipe); - if (rv != 0) { - goto fail; - } - rv = nni_aio_init(&ppipe->aio_recv, nni_bus_pipe_recv_cb, ppipe); - if (rv != 0) { - goto fail; - } - rv = nni_aio_init(&ppipe->aio_putq, nni_bus_pipe_putq_cb, ppipe); - if (rv != 0) { - goto fail; + if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) { + NNI_FREE_STRUCT(ppipe); + return (rv); } + NNI_LIST_NODE_INIT(&ppipe->node); + nni_mtx_init(&ppipe->mtx); + nni_aio_init(&ppipe->aio_getq, nni_bus_pipe_getq_cb, ppipe); + nni_aio_init(&ppipe->aio_send, nni_bus_pipe_send_cb, ppipe); + nni_aio_init(&ppipe->aio_recv, nni_bus_pipe_recv_cb, ppipe); + nni_aio_init(&ppipe->aio_putq, nni_bus_pipe_putq_cb, ppipe); ppipe->npipe = npipe; ppipe->psock = psock; *pp = ppipe; return (0); - -fail: - nni_bus_pipe_fini(ppipe); - return (rv); } static int diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c index a0e907f2..acf9ec25 100644 --- a/src/protocol/pair/pair_v0.c +++ b/src/protocol/pair/pair_v0.c @@ -53,15 +53,11 @@ static int pair0_sock_init(void **sp, nni_sock *nsock) { pair0_sock *s; - int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&s->mtx)) != 0) { - NNI_FREE_STRUCT(s); - return (rv); - } + nni_mtx_init(&s->mtx); s->nsock = nsock; s->ppipe = NULL; s->raw = 0; @@ -84,22 +80,19 @@ static int pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock) { pair0_pipe *p; - int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) { - pair0_pipe_fini(p); - } else { - p->npipe = npipe; - p->psock = psock; - *pp = p; - } - return (rv); + nni_aio_init(&p->aio_send, pair0_send_cb, p); + nni_aio_init(&p->aio_recv, pair0_recv_cb, p); + nni_aio_init(&p->aio_getq, pair0_getq_cb, p); + nni_aio_init(&p->aio_putq, pair0_putq_cb, p); + + p->npipe = npipe; + p->psock = psock; + *pp = p; + return (0); } static void diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c index 1a7ad9fa..2b8a9120 100644 --- a/src/protocol/pair/pair_v1.c +++ b/src/protocol/pair/pair_v1.c @@ -73,22 +73,24 @@ pair1_sock_init(void **sp, nni_sock *nsock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_idhash_init(&s->pipes)) != 0) { + NNI_FREE_STRUCT(s); + return (NNG_ENOMEM); + } NNI_LIST_INIT(&s->plist, pair1_pipe, node); // Raw mode uses this. - if (((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) || - ((rv = nni_mtx_init(&s->mtx)) != 0) || - ((rv = nni_idhash_init(&s->pipes)) != 0)) { - pair1_sock_fini(s); - } else { - s->nsock = nsock; - s->raw = 0; - s->poly = 0; - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); - s->ttl = 8; - *sp = s; - } + nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s); + nni_mtx_init(&s->mtx); + + s->nsock = nsock; + s->raw = 0; + s->poly = 0; + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); + s->ttl = 8; + *sp = s; + return (0); } @@ -101,17 +103,19 @@ pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock) if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) { - pair1_pipe_fini(p); - } else { - p->npipe = npipe; - p->psock = psock; - *pp = p; + if ((rv = nni_msgq_init(&p->sendq, 2)) != 0) { + NNI_FREE_STRUCT(p); + return (NNG_ENOMEM); } + nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p); + nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p); + nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p); + nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p); + + p->npipe = npipe; + p->psock = psock; + *pp = p; + return (rv); } diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index e3c73342..1ebcc4a2 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -63,20 +63,13 @@ static int nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock) { nni_pull_pipe *pp; - int rv; if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&pp->putq_aio, nni_pull_putq_cb, pp))) != 0) { - NNI_FREE_STRUCT(pp); - return (rv); - } - if (((rv = nni_aio_init(&pp->recv_aio, nni_pull_recv_cb, pp))) != 0) { - nni_aio_fini(&pp->putq_aio); - NNI_FREE_STRUCT(pp); - return (rv); - } + nni_aio_init(&pp->putq_aio, nni_pull_putq_cb, pp); + nni_aio_init(&pp->recv_aio, nni_pull_recv_cb, pp); + pp->pipe = pipe; pp->pull = psock; *ppp = pp; diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index 14b3b191..1bc1659c 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -93,30 +93,19 @@ static int nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock) { nni_push_pipe *pp; - int rv; if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_aio_init(&pp->aio_recv, nni_push_recv_cb, pp)) != 0) { - goto fail; - } - if ((rv = nni_aio_init(&pp->aio_send, nni_push_send_cb, pp)) != 0) { - goto fail; - } - if ((rv = nni_aio_init(&pp->aio_getq, nni_push_getq_cb, pp)) != 0) { - goto fail; - } + nni_aio_init(&pp->aio_recv, nni_push_recv_cb, pp); + nni_aio_init(&pp->aio_send, nni_push_send_cb, pp); + nni_aio_init(&pp->aio_getq, nni_push_getq_cb, pp); NNI_LIST_NODE_INIT(&pp->node); pp->pipe = pipe; pp->push = psock; *ppp = pp; return (0); - -fail: - nni_push_pipe_fini(pp); - return (rv); } static int diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index 161a5d79..940f2139 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -53,20 +53,13 @@ static int nni_pub_sock_init(void **pubp, nni_sock *sock) { nni_pub_sock *pub; - int rv; if ((pub = NNI_ALLOC_STRUCT(pub)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&pub->mtx)) != 0) { - nni_pub_sock_fini(pub); - return (rv); - } - rv = nni_aio_init(&pub->aio_getq, nni_pub_sock_getq_cb, pub); - if (rv != 0) { - nni_pub_sock_fini(pub); - return (rv); - } + nni_mtx_init(&pub->mtx); + nni_aio_init(&pub->aio_getq, nni_pub_sock_getq_cb, pub); + pub->sock = sock; pub->raw = 0; NNI_LIST_INIT(&pub->pipes, nni_pub_pipe, node); @@ -127,31 +120,18 @@ nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock) } // XXX: consider making this depth tunable if ((rv = nni_msgq_init(&pp->sendq, 16)) != 0) { - goto fail; - } - - rv = nni_aio_init(&pp->aio_getq, nni_pub_pipe_getq_cb, pp); - if (rv != 0) { - goto fail; + NNI_FREE_STRUCT(pp); + return (rv); } - rv = nni_aio_init(&pp->aio_send, nni_pub_pipe_send_cb, pp); - if (rv != 0) { - goto fail; - } + nni_aio_init(&pp->aio_getq, nni_pub_pipe_getq_cb, pp); + nni_aio_init(&pp->aio_send, nni_pub_pipe_send_cb, pp); + nni_aio_init(&pp->aio_recv, nni_pub_pipe_recv_cb, pp); - rv = nni_aio_init(&pp->aio_recv, nni_pub_pipe_recv_cb, pp); - if (rv != 0) { - goto fail; - } pp->pipe = pipe; pp->pub = psock; *ppp = pp; return (0); - -fail: - nni_pub_pipe_fini(pp); - return (rv); } static int diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 53f01e0f..78b9d157 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -95,16 +95,13 @@ static int nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock) { nni_sub_pipe *sp; - int rv; if ((sp = NNI_ALLOC_STRUCT(sp)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&sp->aio_putq, nni_sub_putq_cb, sp)) != 0) || - ((rv = nni_aio_init(&sp->aio_recv, nni_sub_recv_cb, sp)) != 0)) { - nni_sub_pipe_fini(sp); - return (rv); - } + nni_aio_init(&sp->aio_putq, nni_sub_putq_cb, sp); + nni_aio_init(&sp->aio_recv, nni_sub_recv_cb, sp); + sp->pipe = pipe; sp->sub = ssock; *spp = sp; diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 4319cbf8..09f2b285 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -74,20 +74,18 @@ nni_rep_sock_init(void **repp, nni_sock *sock) if ((rep = NNI_ALLOC_STRUCT(rep)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_idhash_init(&rep->pipes)) != 0) { + NNI_FREE_STRUCT(rep); + return (rv); + } + rep->ttl = 8; // Per RFC rep->sock = sock; rep->raw = 0; rep->btrace = NULL; rep->btrace_len = 0; - if ((rv = nni_idhash_init(&rep->pipes)) != 0) { - goto fail; - } - - rv = nni_aio_init(&rep->aio_getq, nni_rep_sock_getq_cb, rep); - if (rv != 0) { - goto fail; - } + nni_aio_init(&rep->aio_getq, nni_rep_sock_getq_cb, rep); rep->uwq = nni_sock_sendq(sock); rep->urq = nni_sock_recvq(sock); @@ -96,10 +94,6 @@ nni_rep_sock_init(void **repp, nni_sock *sock) nni_sock_senderr(sock, NNG_ESTATE); return (0); - -fail: - nni_rep_sock_fini(rep); - return (rv); } static void @@ -128,32 +122,18 @@ nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) return (NNG_ENOMEM); } if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) { - goto fail; - } - if ((rv = nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp)) != - 0) { - goto fail; - } - if ((rv = nni_aio_init(&rp->aio_send, nni_rep_pipe_send_cb, rp)) != - 0) { - goto fail; - } - if ((rv = nni_aio_init(&rp->aio_recv, nni_rep_pipe_recv_cb, rp)) != - 0) { - goto fail; - } - if ((rv = nni_aio_init(&rp->aio_putq, nni_rep_pipe_putq_cb, rp)) != - 0) { - goto fail; + NNI_FREE_STRUCT(rp); + return (rv); } + nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp); + nni_aio_init(&rp->aio_send, nni_rep_pipe_send_cb, rp); + nni_aio_init(&rp->aio_recv, nni_rep_pipe_recv_cb, rp); + nni_aio_init(&rp->aio_putq, nni_rep_pipe_putq_cb, rp); + rp->pipe = pipe; rp->rep = rsock; *rpp = rp; return (0); - -fail: - nni_rep_pipe_fini(rp); - return (rv); } static void diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index fdf29fd9..bab81331 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -75,19 +75,12 @@ static int nni_req_sock_init(void **reqp, nni_sock *sock) { nni_req_sock *req; - int rv; if ((req = NNI_ALLOC_STRUCT(req)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&req->mtx)) != 0) { - NNI_FREE_STRUCT(req); - return (rv); - } - if ((rv = nni_cv_init(&req->cv, &req->mtx)) != 0) { - nni_mtx_fini(&req->mtx); - NNI_FREE_STRUCT(req); - } + nni_mtx_init(&req->mtx); + nni_cv_init(&req->cv, &req->mtx); NNI_LIST_INIT(&req->readypipes, nni_req_pipe, node); NNI_LIST_INIT(&req->busypipes, nni_req_pipe, node); @@ -152,41 +145,22 @@ static int nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) { nni_req_pipe *rp; - int rv; if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&rp->mtx)) != 0) { - goto failed; - } - if ((rv = nni_aio_init(&rp->aio_getq, nni_req_getq_cb, rp)) != 0) { - goto failed; - } - if ((rv = nni_aio_init(&rp->aio_putq, nni_req_putq_cb, rp)) != 0) { - goto failed; - } - if ((rv = nni_aio_init(&rp->aio_recv, nni_req_recv_cb, rp)) != 0) { - goto failed; - } - rv = nni_aio_init(&rp->aio_sendraw, nni_req_sendraw_cb, rp); - if (rv != 0) { - goto failed; - } - rv = nni_aio_init(&rp->aio_sendcooked, nni_req_sendcooked_cb, rp); - if (rv != 0) { - goto failed; - } + nni_mtx_init(&rp->mtx); + nni_aio_init(&rp->aio_getq, nni_req_getq_cb, rp); + nni_aio_init(&rp->aio_putq, nni_req_putq_cb, rp); + nni_aio_init(&rp->aio_recv, nni_req_recv_cb, rp); + nni_aio_init(&rp->aio_sendraw, nni_req_sendraw_cb, rp); + nni_aio_init(&rp->aio_sendcooked, nni_req_sendcooked_cb, rp); NNI_LIST_NODE_INIT(&rp->node); rp->pipe = pipe; rp->req = rsock; *rpp = rp; return (0); - -failed: - nni_req_pipe_fini(rp); - return (rv); } static void diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 32513134..a097f551 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -77,6 +77,11 @@ nni_resp_sock_init(void **pp, nni_sock *nsock) if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_idhash_init(&psock->pipes)) != 0) { + NNI_FREE_STRUCT(psock); + return (rv); + } + psock->ttl = 8; // Per RFC psock->nsock = nsock; psock->raw = 0; @@ -85,24 +90,12 @@ nni_resp_sock_init(void **pp, nni_sock *nsock) psock->urq = nni_sock_recvq(nsock); psock->uwq = nni_sock_sendq(nsock); - if ((rv = nni_mtx_init(&psock->mtx)) != 0) { - goto fail; - } - if ((rv = nni_idhash_init(&psock->pipes)) != 0) { - goto fail; - } - rv = nni_aio_init(&psock->aio_getq, nni_resp_sock_getq_cb, psock); - if (rv != 0) { - goto fail; - } + nni_mtx_init(&psock->mtx); + nni_aio_init(&psock->aio_getq, nni_resp_sock_getq_cb, psock); *pp = psock; nni_sock_senderr(nsock, NNG_ESTATE); return (0); - -fail: - nni_resp_sock_fini(psock); - return (rv); } static void @@ -131,33 +124,18 @@ nni_resp_pipe_init(void **pp, nni_pipe *npipe, void *psock) return (NNG_ENOMEM); } if ((rv = nni_msgq_init(&ppipe->sendq, 2)) != 0) { - goto fail; - } - rv = nni_aio_init(&ppipe->aio_putq, nni_resp_putq_cb, ppipe); - if (rv != 0) { - goto fail; - } - rv = nni_aio_init(&ppipe->aio_recv, nni_resp_recv_cb, ppipe); - if (rv != 0) { - goto fail; - } - rv = nni_aio_init(&ppipe->aio_getq, nni_resp_getq_cb, ppipe); - if (rv != 0) { - goto fail; - } - rv = nni_aio_init(&ppipe->aio_send, nni_resp_send_cb, ppipe); - if (rv != 0) { - goto fail; + NNI_FREE_STRUCT(ppipe); + return (rv); } + nni_aio_init(&ppipe->aio_putq, nni_resp_putq_cb, ppipe); + nni_aio_init(&ppipe->aio_recv, nni_resp_recv_cb, ppipe); + nni_aio_init(&ppipe->aio_getq, nni_resp_getq_cb, ppipe); + nni_aio_init(&ppipe->aio_send, nni_resp_send_cb, ppipe); ppipe->npipe = npipe; ppipe->psock = psock; *pp = ppipe; return (0); - -fail: - nni_resp_pipe_fini(ppipe); - return (rv); } static void diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index cb90c13f..2a32f289 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -70,19 +70,13 @@ static int nni_surv_sock_init(void **sp, nni_sock *nsock) { nni_surv_sock *psock; - int rv; if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&psock->mtx)) != 0) { - goto fail; - } - rv = nni_aio_init(&psock->aio_getq, nni_surv_sock_getq_cb, psock); - if (rv != 0) { - goto fail; - } NNI_LIST_INIT(&psock->pipes, nni_surv_pipe, node); + nni_mtx_init(&psock->mtx); + nni_aio_init(&psock->aio_getq, nni_surv_sock_getq_cb, psock); nni_timer_init(&psock->timer, nni_surv_timeout, psock); psock->nextid = nni_random(); @@ -96,10 +90,6 @@ nni_surv_sock_init(void **sp, nni_sock *nsock) *sp = psock; nni_sock_recverr(nsock, NNG_ESTATE); return (0); - -fail: - nni_surv_sock_fini(psock); - return (rv); } static void @@ -143,32 +133,19 @@ nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock) } // This depth could be tunable. if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) { - goto failed; - } - rv = nni_aio_init(&ppipe->aio_getq, nni_surv_getq_cb, ppipe); - if (rv != 0) { - goto failed; - } - rv = nni_aio_init(&ppipe->aio_putq, nni_surv_putq_cb, ppipe); - if (rv != 0) { - goto failed; - } - rv = nni_aio_init(&ppipe->aio_send, nni_surv_send_cb, ppipe); - if (rv != 0) { - goto failed; - } - rv = nni_aio_init(&ppipe->aio_recv, nni_surv_recv_cb, ppipe); - if (rv != 0) { - goto failed; + NNI_FREE_STRUCT(ppipe); + return (rv); } + + nni_aio_init(&ppipe->aio_getq, nni_surv_getq_cb, ppipe); + nni_aio_init(&ppipe->aio_putq, nni_surv_putq_cb, ppipe); + nni_aio_init(&ppipe->aio_send, nni_surv_send_cb, ppipe); + nni_aio_init(&ppipe->aio_recv, nni_surv_recv_cb, ppipe); + ppipe->npipe = npipe; ppipe->psock = psock; *pp = ppipe; return (0); - -failed: - nni_surv_pipe_fini(ppipe); - return (rv); } static int diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 3bc24c41..08cf99a2 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -65,14 +65,9 @@ static nni_inproc_global nni_inproc; static int nni_inproc_init(void) { - int rv; - NNI_LIST_INIT(&nni_inproc.servers, nni_inproc_ep, node); - if ((rv = nni_mtx_init(&nni_inproc.mx)) != 0) { - return (rv); - } - + nni_mtx_init(&nni_inproc.mx); return (0); } @@ -309,8 +304,7 @@ nni_inproc_accept_clients(nni_inproc_ep *server) continue; } - if (((rv = nni_mtx_init(&pair->mx)) != 0) || - ((rv = nni_msgq_init(&pair->q[0], 4)) != 0) || + if (((rv = nni_msgq_init(&pair->q[0], 4)) != 0) || ((rv = nni_msgq_init(&pair->q[1], 4)) != 0)) { nni_inproc_pair_destroy(pair); nni_inproc_conn_finish(caio, rv); @@ -318,6 +312,8 @@ nni_inproc_accept_clients(nni_inproc_ep *server) continue; } + nni_mtx_init(&pair->mx); + pair->pipes[0] = caio->a_pipe; pair->pipes[1] = saio->a_pipe; pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0]; diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 7d976122..0b0e487f 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -107,18 +107,14 @@ static int nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) { nni_ipc_pipe *p; - int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_mtx_init(&p->mtx)) != 0) || - ((rv = nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p)) != 0)) { - nni_ipc_pipe_fini(p); - return (rv); - } + nni_mtx_init(&p->mtx); + nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p); + nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p); + nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p); p->proto = ep->proto; p->rcvmax = ep->rcvmax; @@ -490,12 +486,14 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock, int mode) if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_mtx_init(&ep->mtx)) != 0) || - ((rv = nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep)) != 0) || - ((rv = nni_plat_ipc_ep_init(&ep->iep, url, mode)) != 0)) { - nni_ipc_ep_fini(ep); + if ((rv = nni_plat_ipc_ep_init(&ep->iep, url, mode)) != 0) { + NNI_FREE_STRUCT(ep); return (rv); } + + nni_mtx_init(&ep->mtx); + nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep); + ep->closed = 0; ep->proto = nni_sock_proto(sock); ep->rcvmax = nni_sock_rcvmaxsz(sock); diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 4d47733b..b3136b35 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -107,18 +107,15 @@ static int nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp) { nni_tcp_pipe *p; - int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_mtx_init(&p->mtx)) != 0) || - ((rv = nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p)) != 0)) { - nni_tcp_pipe_fini(p); - return (rv); - } + nni_mtx_init(&p->mtx); + nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p); + nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p); + nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p); + p->proto = ep->proto; p->rcvmax = ep->rcvmax; p->tpp = tpp; @@ -555,12 +552,13 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode) if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_mtx_init(&ep->mtx)) != 0) || - ((rv = nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep)) != 0) || - ((rv = nni_plat_tcp_ep_init(&ep->tep, url, mode)) != 0)) { - nni_tcp_ep_fini(ep); + if ((rv = nni_plat_tcp_ep_init(&ep->tep, url, mode)) != 0) { + NNI_FREE_STRUCT(ep); return (rv); } + nni_mtx_init(&ep->mtx); + nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep); + ep->closed = 0; ep->proto = nni_sock_proto(sock); ep->rcvmax = nni_sock_rcvmaxsz(sock); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 73484a4a..754d3ab2 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -85,6 +85,7 @@ add_nng_test(pubsub 5) add_nng_test(resolv 10) add_nng_test(sock 5) add_nng_test(survey 5) +add_nng_test(synch 5) add_nng_test(tcp 5) add_nng_test(scalability 20) add_nng_test(message 5) diff --git a/tests/pair1.c b/tests/pair1.c index 08691fd4..d160acdd 100644 --- a/tests/pair1.c +++ b/tests/pair1.c @@ -94,7 +94,6 @@ TestMain("PAIRv1 protocol", { }); Convey("Cannot set raw mode after connect", { - int r = 1; So(nng_listen(s1, addr, NULL, 0) == 0); So(nng_dial(c1, addr, NULL, 0) == 0); nng_usleep(100000); @@ -313,7 +312,6 @@ TestMain("PAIRv1 protocol", { int ttl; ttl = 0; - sz = sizeof(ttl); So(nng_setopt_int(s1, NNG_OPT_MAXTTL, 0) == NNG_EINVAL); @@ -423,7 +421,6 @@ TestMain("PAIRv1 protocol", { uint32_t hops; nng_pipe p1; nng_pipe p2; - size_t sz; So(nng_getopt_int(s1, NNG_OPT_POLYAMOROUS, &v) == 0); So(v == 0); diff --git a/tests/platform.c b/tests/platform.c index 5bfdc367..a28bc4e4 100644 --- a/tests/platform.c +++ b/tests/platform.c @@ -1,5 +1,6 @@ // -// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -8,20 +9,20 @@ // #include "convey.h" -#include "nng.h" #include "core/nng_impl.h" +#include "nng.h" -#ifndef _WIN32 +#ifndef _WIN32 #include <sys/time.h> #endif uint64_t getms(void) { -#ifdef _WIN32 - return (GetTickCount64()) ; +#ifdef _WIN32 + return (GetTickCount64()); #else - static time_t epoch; + static time_t epoch; struct timeval tv; if (epoch == 0) { @@ -35,7 +36,7 @@ getms(void) return (0); } tv.tv_sec -= epoch; - return (((uint64_t)(tv.tv_sec ) * 1000) + (tv.tv_usec / 1000)); + return (((uint64_t)(tv.tv_sec) * 1000) + (tv.tv_usec / 1000)); #endif } @@ -43,15 +44,15 @@ getms(void) void add(void *arg) { - *(int *)arg += 1; + *(int *) arg += 1; } // Notify tests for verifying condvars. struct notifyarg { - int did; - int when; + int did; + int when; nni_mtx mx; - nni_cv cv; + nni_cv cv; }; void @@ -66,17 +67,10 @@ notifyafter(void *arg) nni_mtx_unlock(&na->mx); } -int -nop(void) -{ - return (0); -} +TestMain("Platform Operations", { -Main({ nni_init(); - Test("Platform Operations", { - // This is required for anything else to work Convey("The clock works", { uint64_t now = getms(); @@ -84,13 +78,13 @@ Main({ Convey("usleep works", { nni_usleep(100000); - So((getms() - now) >= 100); // cannot be *shorter*!! - So((getms() - now) < 150); // crummy clock resolution? - }) + So((getms() - now) >= 100); // cannot be *shorter*!! + So((getms() - now) < 150); // crummy clock resolution? + }); Convey("times work", { uint64_t msend; - int usdelta; - int msdelta; + int usdelta; + int msdelta; nni_time usend; nni_time usnow = nni_clock(); nni_usleep(200000); @@ -99,19 +93,17 @@ Main({ So(usend > usnow); So(msend > now); - usdelta = (int)((usend - usnow) / 1000); - msdelta = (int)((msend - now)); + usdelta = (int) ((usend - usnow) / 1000); + msdelta = (int) ((msend - now)); So(usdelta >= 200); So(usdelta < 220); So(abs(msdelta - usdelta) < 20); - }) - }) + }); + }); Convey("Mutexes work", { static nni_mtx mx; - int rv; - rv = nni_mtx_init(&mx); - So(rv == 0); + nni_mtx_init(&mx); Convey("We can lock a mutex", { nni_mtx_lock(&mx); @@ -124,40 +116,36 @@ Main({ So(1); nni_mtx_unlock(&mx); So(1); - }) - }) - }) - Convey("We can finalize it", { - nni_mtx_fini(&mx); - }) - }) + }); + }); + }); + Convey("We can finalize it", { nni_mtx_fini(&mx); }); + }); Convey("Threads work", { static nni_thr thr; - int val = 0; - int rv; + int val = 0; + int rv; Convey("We can create threads", { rv = nni_thr_init(&thr, add, &val); So(rv == 0); nni_thr_run(&thr); - Reset({ - nni_thr_fini(&thr); - }) + Reset({ nni_thr_fini(&thr); }); Convey("It ran", { - nni_usleep(50000); // for context switch + nni_usleep(50000); // for context switch So(val == 1); - }) - }) - }) + }); + }); + }); Convey("Condition variables work", { static struct notifyarg arg; - static nni_thr thr; + static nni_thr thr; - So(nni_mtx_init(&arg.mx) == 0); - So(nni_cv_init(&arg.cv, &arg.mx) == 0); + nni_mtx_init(&arg.mx); + nni_cv_init(&arg.cv, &arg.mx); So(nni_thr_init(&thr, notifyafter, &arg) == 0); Reset({ @@ -167,7 +155,7 @@ Main({ }); Convey("Notification works", { - arg.did = 0; + arg.did = 0; arg.when = 10000; nni_thr_run(&thr); @@ -178,10 +166,10 @@ Main({ nni_mtx_unlock(&arg.mx); nni_thr_wait(&thr); So(arg.did == 1); - }) + }); Convey("Timeout works", { - arg.did = 0; + arg.did = 0; arg.when = 200000; nni_thr_run(&thr); nni_mtx_lock(&arg.mx); @@ -191,10 +179,9 @@ Main({ So(arg.did == 0); nni_mtx_unlock(&arg.mx); nni_thr_wait(&thr); - }) - + }); Convey("Not running works", { - arg.did = 0; + arg.did = 0; arg.when = 1; nni_mtx_lock(&arg.mx); if (!arg.did) { @@ -202,8 +189,6 @@ Main({ } So(arg.did == 0); nni_mtx_unlock(&arg.mx); - }) - }) - }) - nni_fini(); + }); + }); }) diff --git a/tests/synch.c b/tests/synch.c new file mode 100644 index 00000000..a49e27e1 --- /dev/null +++ b/tests/synch.c @@ -0,0 +1,292 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "convey.h" +#include "core/nng_impl.h" +#include "nng.h" + +// Notify tests for verifying condvars. +struct notifyarg { + int did; + int when; + nni_mtx mx; + nni_cv cv; +}; + +#ifdef PLATFORM_POSIX +#ifndef NDEBUG +#define SYNC_FALLBACK 1 +#endif +#endif + +void +notifyafter(void *arg) +{ + struct notifyarg *na = arg; + + nni_usleep(na->when); + nni_mtx_lock(&na->mx); + na->did = 1; + nni_cv_wake(&na->cv); + nni_mtx_unlock(&na->mx); +} + +struct notifyarg arg; +nni_thr thr; + +static void +test_sync(void) +{ + Convey("Mutexes work", { + nni_mtx mx; + + nni_mtx_init(&mx); + + Convey("We can lock a mutex", { + nni_mtx_lock(&mx); + So(1); + Convey("And we can unlock it", { + nni_mtx_unlock(&mx); + So(1); + Convey("And then lock it again", { + nni_mtx_lock(&mx); + So(1); + nni_mtx_unlock(&mx); + So(1); + }); + }); + Convey("Things block properly", { + + nni_mtx_init(&arg.mx); + nni_cv_init(&arg.cv, &arg.mx); + So(nni_thr_init(&thr, notifyafter, &arg) == 0); + arg.did = 0; + arg.when = 0; + nni_mtx_lock(&arg.mx); + nni_thr_run(&thr); + nng_usleep(10000); + So(arg.did == 0); + nni_mtx_unlock(&arg.mx); + nng_usleep(10000); + nni_mtx_lock(&arg.mx); + while (!arg.did) { + nni_cv_wait(&arg.cv); + } + So(arg.did != 0); + nni_mtx_unlock(&arg.mx); + nni_thr_fini(&thr); + nni_cv_fini(&arg.cv); + nni_mtx_fini(&arg.mx); + }) + }); + Convey("We can finalize it", { nni_mtx_fini(&mx); }); + }); + + Convey("Condition variables work", { + + nni_mtx_init(&arg.mx); + nni_cv_init(&arg.cv, &arg.mx); + So(nni_thr_init(&thr, notifyafter, &arg) == 0); + + Reset({ + nni_cv_fini(&arg.cv); + nni_mtx_fini(&arg.mx); + nni_thr_fini(&thr); + }); + + Convey("Notification works", { + arg.did = 0; + arg.when = 10000; + nni_thr_run(&thr); + + nni_mtx_lock(&arg.mx); + if (!arg.did) { + nni_cv_wait(&arg.cv); + } + nni_mtx_unlock(&arg.mx); + nni_thr_wait(&thr); + So(arg.did == 1); + }); + + Convey("Timeout works", { + arg.did = 0; + arg.when = 200000; + nni_thr_run(&thr); + nni_mtx_lock(&arg.mx); + if (!arg.did) { + nni_cv_until(&arg.cv, nni_clock() + 10000); + } + So(arg.did == 0); + nni_mtx_unlock(&arg.mx); + nni_thr_wait(&thr); + }); + + Convey("Empty timeout is EAGAIN", { + nni_mtx_lock(&arg.mx); + So(nni_cv_until(&arg.cv, 0) == NNG_EAGAIN); + nni_mtx_unlock(&arg.mx); + }); + + Convey("Not running works", { + arg.did = 0; + arg.when = 1; + nni_mtx_lock(&arg.mx); + if (!arg.did) { + nni_cv_until(&arg.cv, nni_clock() + 10000); + } + So(arg.did == 0); + nni_mtx_unlock(&arg.mx); + }); + }); +} + +#if SYNC_FALLBACK +extern int nni_plat_sync_fallback; + +#define ConveyFB(x, y) Convey(x, y) + +static void +test_sync_fallback(void) +{ + nni_plat_sync_fallback = 1; + Convey("Mutexes work", { + nni_mtx mx; + int rv; + + nni_mtx_init(&mx); + + Convey("We can lock a mutex", { + nni_mtx_lock(&mx); + So(1); + Convey("And we can unlock it", { + nni_mtx_unlock(&mx); + So(1); + Convey("And then lock it again", { + nni_mtx_lock(&mx); + So(1); + nni_mtx_unlock(&mx); + So(1); + }); + }); + Convey("Things block properly", { + + nni_mtx_init(&arg.mx); + nni_cv_init(&arg.cv, &arg.mx); + So(nni_thr_init(&thr, notifyafter, &arg) == 0); + arg.did = 0; + arg.when = 0; + nni_mtx_lock(&arg.mx); + nni_thr_run(&thr); + nng_usleep(10000); + So(arg.did == 0); + nni_mtx_unlock(&arg.mx); + nng_usleep(10000); + nni_mtx_lock(&arg.mx); + while (!arg.did) { + nni_cv_wait(&arg.cv); + } + So(arg.did != 0); + nni_mtx_unlock(&arg.mx); + nni_thr_fini(&thr); + nni_cv_fini(&arg.cv); + nni_mtx_fini(&arg.mx); + }) + }); + Convey("We can finalize it", { nni_mtx_fini(&mx); }); + }); + + Convey("Condition variables work", { + + nni_mtx_init(&arg.mx); + nni_cv_init(&arg.cv, &arg.mx); + So(nni_thr_init(&thr, notifyafter, &arg) == 0); + + Reset({ + nni_cv_fini(&arg.cv); + nni_mtx_fini(&arg.mx); + nni_thr_fini(&thr); + }); + + Convey("Notification works", { + arg.did = 0; + arg.when = 10000; + nni_thr_run(&thr); + + nni_mtx_lock(&arg.mx); + if (!arg.did) { + nni_cv_wait(&arg.cv); + } + nni_mtx_unlock(&arg.mx); + nni_thr_wait(&thr); + So(arg.did == 1); + }); + + Convey("Timeout works", { + arg.did = 0; + arg.when = 200000; + nni_thr_run(&thr); + nni_mtx_lock(&arg.mx); + if (!arg.did) { + nni_cv_until(&arg.cv, nni_clock() + 10000); + } + So(arg.did == 0); + nni_mtx_unlock(&arg.mx); + nni_thr_wait(&thr); + }); + + Convey("Empty timeout is EAGAIN", { + nni_mtx_lock(&arg.mx); + So(nni_cv_until(&arg.cv, 0) == NNG_EAGAIN); + nni_mtx_unlock(&arg.mx); + }); + + Convey("Not running works", { + arg.did = 0; + arg.when = 1; + nni_mtx_lock(&arg.mx); + if (!arg.did) { + nni_cv_until(&arg.cv, nni_clock() + 10000); + } + So(arg.did == 0); + nni_mtx_unlock(&arg.mx); + }); + }); +} +#else +#define ConveyFB(x, y) +#endif + +TestMain("Synchronization", { + nni_init(); + + Convey("Synchronization works", { test_sync(); }); + + ConveyFB("Fallback synchronization works", { test_sync_fallback(); }); + + ConveyFB("Transform works", { + nni_plat_sync_fallback = 0; + nni_mtx_init(&arg.mx); + nni_plat_sync_fallback = 1; + nni_cv_init(&arg.cv, &arg.mx); + So(nni_thr_init(&thr, notifyafter, &arg) == 0); + + arg.did = 0; + arg.when = 10000; + nni_thr_run(&thr); + + nni_mtx_lock(&arg.mx); + if (!arg.did) { + nni_cv_wait(&arg.cv); + } + nni_mtx_unlock(&arg.mx); + nni_thr_wait(&thr); + So(arg.did == 1); + }); +}) |
