diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-15 21:59:55 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-08-16 18:31:42 -0700 |
| commit | a9633313ec8e578c805cd53b37ba3360d83157bc (patch) | |
| tree | 14d32c4031ea1c8508a75469407ca77e353fa315 /src | |
| parent | e7e2a6c14f0317eb77711951c6f1a650d4013dfe (diff) | |
| download | nng-a9633313ec8e578c805cd53b37ba3360d83157bc.tar.gz nng-a9633313ec8e578c805cd53b37ba3360d83157bc.tar.bz2 nng-a9633313ec8e578c805cd53b37ba3360d83157bc.zip | |
Provide versions of mutex, condvar, and aio init that never fail.
If the underlying platform fails (FreeBSD is the only one I'm aware
of that does this!), we use a global lock or condition variable instead.
This means that our lock initializers never ever fail.
Probably we could eliminate most of this for Linux and Darwin, since
on those platforms, mutex and condvar initialization reasonably never
fails. Initial benchmarks show little difference either way -- so we
can revisit (optimize) later.
This removes a lot of otherwise untested code in error cases and so forth,
improving coverage and resilience in the face of allocation failures.
Platforms other than POSIX should follow a similar pattern if they need
this. (VxWorks, I'm thinking of you.) Most sane platforms won't have
an issue here, since normally these initializations do not need to allocate
memory. (Reportedly, even FreeBSD has plans to "fix" this in libthr2.)
While here, some bugs were fixed in initialization & teardown.
The fallback code is properly tested with dedicated test cases.
Diffstat (limited to 'src')
47 files changed, 569 insertions, 644 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index a39c0118..fe9bcde8 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -56,20 +56,14 @@ static nni_list nni_aio_expire_aios; static void nni_aio_expire_add(nni_aio *); -int +void nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) { - int rv; - memset(aio, 0, sizeof(*aio)); - if ((rv = nni_cv_init(&aio->a_cv, &nni_aio_lk)) != 0) { - return (rv); - } + nni_cv_init(&aio->a_cv, &nni_aio_lk); aio->a_expire = NNI_TIME_NEVER; aio->a_init = 1; nni_task_init(NULL, &aio->a_task, cb, arg); - - return (0); } void @@ -350,46 +344,43 @@ nni_aio_expire_loop(void *arg) } } -int -nni_aio_sys_init(void) +void +nni_aio_sys_fini(void) { - int rv; nni_mtx *mtx = &nni_aio_lk; nni_cv * cv = &nni_aio_expire_cv; nni_thr *thr = &nni_aio_expire_thr; - if (((rv = nni_mtx_init(mtx)) != 0) || - ((rv = nni_cv_init(cv, mtx)) != 0) || - ((rv = nni_thr_init(thr, nni_aio_expire_loop, NULL)) != 0)) { - goto fail; + if (nni_aio_expire_run) { + nni_mtx_lock(mtx); + nni_aio_expire_run = 0; + nni_cv_wake(cv); + nni_mtx_unlock(mtx); } - NNI_LIST_INIT(&nni_aio_expire_aios, nni_aio, a_expire_node); - nni_aio_expire_run = 1; - nni_thr_run(thr); - return (0); -fail: nni_thr_fini(thr); nni_cv_fini(cv); nni_mtx_fini(mtx); - return (rv); } -void -nni_aio_sys_fini(void) +int +nni_aio_sys_init(void) { + int rv; nni_mtx *mtx = &nni_aio_lk; nni_cv * cv = &nni_aio_expire_cv; nni_thr *thr = &nni_aio_expire_thr; - if (nni_aio_expire_run) { - nni_mtx_lock(mtx); - nni_aio_expire_run = 0; - nni_cv_wake(cv); - nni_mtx_unlock(mtx); + NNI_LIST_INIT(&nni_aio_expire_aios, nni_aio, a_expire_node); + nni_mtx_init(mtx); + nni_cv_init(cv, mtx); + + if ((rv = nni_thr_init(thr, nni_aio_expire_loop, NULL)) != 0) { + nni_aio_sys_fini(); + return (rv); } - nni_thr_fini(thr); - nni_cv_fini(cv); - nni_mtx_fini(mtx); -}
\ No newline at end of file + nni_aio_expire_run = 1; + nni_thr_run(thr); + return (0); +} diff --git a/src/core/aio.h b/src/core/aio.h index d48442eb..aabb3fa9 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -66,7 +66,7 @@ struct nni_aio { // the supplied argument when the operation is complete. If NULL is // supplied for the callback, then nni_aio_wake is used in its place, // and the aio is used for the argument. -extern int nni_aio_init(nni_aio *, nni_cb, void *); +extern void nni_aio_init(nni_aio *, nni_cb, void *); // nni_aio_fini finalizes the aio, releasing resources (locks) // associated with it. The caller is responsible for ensuring that any diff --git a/src/core/defs.h b/src/core/defs.h index 69fe4843..4d8e6ffb 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -17,9 +17,13 @@ // superior, support for such are not universal. #define NNI_ARG_UNUSED(x) ((void) x); +#ifndef NDEBUG #define NNI_ASSERT(x) \ if (!(x)) \ nni_panic("%s: %d: assert err: %s", __FILE__, __LINE__, #x) +#else +#define NNI_ASSERT(x) +#endif // These types are common but have names shared with user space. typedef struct nng_msg nni_msg; diff --git a/src/core/endpt.c b/src/core/endpt.c index 0ab35ea3..34debc0e 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -57,10 +57,10 @@ nni_ep_sys_init(void) { int rv; - if (((rv = nni_mtx_init(&nni_ep_lk)) != 0) || - ((rv = nni_idhash_init(&nni_eps)) != 0)) { + if ((rv = nni_idhash_init(&nni_eps)) != 0) { return (rv); } + nni_mtx_init(&nni_ep_lk); nni_idhash_set_limits( nni_eps, 1, 0x7fffffff, nni_random() & 0x7fffffff); @@ -152,13 +152,14 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode) nni_pipe_ep_list_init(&ep->ep_pipes); - if (((rv = nni_mtx_init(&ep->ep_mtx)) != 0) || - ((rv = nni_cv_init(&ep->ep_cv, &ep->ep_mtx)) != 0) || - ((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) || - ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) || - ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) || + nni_mtx_init(&ep->ep_mtx); + nni_cv_init(&ep->ep_cv, &ep->ep_mtx); + nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep); + nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep); + nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep); + nni_aio_init(&ep->ep_con_syn, NULL, NULL); + + if (((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) || ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) || ((rv = nni_sock_ep_add(s, ep)) != 0)) { nni_ep_destroy(ep); diff --git a/src/core/event.c b/src/core/event.c index 79910d80..ab157b02 100644 --- a/src/core/event.c +++ b/src/core/event.c @@ -12,13 +12,12 @@ #include <stdlib.h> #include <string.h> -int +void nni_ev_init(nni_event *event, int type, nni_sock *sock) { memset(event, 0, sizeof(*event)); event->e_type = type; event->e_sock = sock; - return (0); } void diff --git a/src/core/event.h b/src/core/event.h index 6d9a9394..22af096e 100644 --- a/src/core/event.h +++ b/src/core/event.h @@ -28,7 +28,7 @@ struct nng_notify { nni_aio n_aio; }; -extern int nni_ev_init(nni_event *, int, nni_sock *); +extern void nni_ev_init(nni_event *, int, nni_sock *); extern void nni_ev_fini(nni_event *); #endif // CORE_EVENT_H diff --git a/src/core/idhash.c b/src/core/idhash.c index ab6b67e1..03854fc8 100644 --- a/src/core/idhash.c +++ b/src/core/idhash.c @@ -36,15 +36,11 @@ int nni_idhash_init(nni_idhash **hp) { nni_idhash *h; - int rv; if ((h = NNI_ALLOC_STRUCT(h)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&h->ih_mtx)) != 0) { - NNI_FREE_STRUCT(h); - return (rv); - } + nni_mtx_init(&h->ih_mtx); h->ih_entries = NULL; h->ih_count = 0; h->ih_load = 0; diff --git a/src/core/init.c b/src/core/init.c index 24a7b118..4025c0f4 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -1,5 +1,5 @@ // -// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Garrett D'Amore <garrett@damore.org> // Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -39,8 +39,6 @@ nni_init(void) void nni_fini(void) { - // XXX: We should make sure that underlying sockets and - // file descriptors are closed. Details TBD. nni_tran_sys_fini(); nni_pipe_sys_fini(); nni_ep_sys_fini(); diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 6530857e..ef7dd5a2 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -39,7 +39,6 @@ int nni_msgq_init(nni_msgq **mqp, unsigned cap) { struct nni_msgq *mq; - int rv; int alloc; // We allocate 2 extra cells in the fifo. One to accommodate a @@ -52,21 +51,18 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap) if ((mq = NNI_ALLOC_STRUCT(mq)) == NULL) { return (NNG_ENOMEM); } + if ((mq->mq_msgs = nni_alloc(sizeof(nng_msg *) * alloc)) == NULL) { + NNI_FREE_STRUCT(mq); + return (NNG_ENOMEM); + } + nni_aio_list_init(&mq->mq_aio_putq); nni_aio_list_init(&mq->mq_aio_getq); nni_aio_list_init(&mq->mq_aio_notify_get); nni_aio_list_init(&mq->mq_aio_notify_put); - if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) { - goto fail; - } - if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) { - goto fail; - } - if ((mq->mq_msgs = nni_alloc(sizeof(nng_msg *) * alloc)) == NULL) { - rv = NNG_ENOMEM; - goto fail; - } + nni_mtx_init(&mq->mq_lock); + nni_cv_init(&mq->mq_drained, &mq->mq_lock); mq->mq_cap = cap; mq->mq_alloc = alloc; @@ -80,15 +76,6 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap) *mqp = mq; return (0); - -fail: - nni_cv_fini(&mq->mq_drained); - nni_mtx_fini(&mq->mq_lock); - if (mq->mq_msgs != NULL) { - nni_free(mq->mq_msgs, sizeof(nng_msg *) * alloc); - } - NNI_FREE_STRUCT(mq); - return (rv); } void @@ -413,9 +400,7 @@ nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) nni_aio aio; int rv; - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { - return (rv); - } + nni_aio_init(&aio, NULL, NULL); aio.a_expire = expire; nni_msgq_aio_get(mq, &aio); nni_aio_wait(&aio); @@ -433,9 +418,7 @@ nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) nni_aio aio; int rv; - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { - return (rv); - } + nni_aio_init(&aio, NULL, NULL); aio.a_expire = expire; aio.a_msg = msg; nni_msgq_aio_put(mq, &aio); diff --git a/src/core/pipe.c b/src/core/pipe.c index 75c4c8d6..cdbeb6a7 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -49,9 +49,10 @@ nni_pipe_sys_init(void) int rv; NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node); + nni_mtx_init(&nni_pipe_reap_lk); + nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk); + if (((rv = nni_idhash_init(&nni_pipes)) != 0) || - ((rv = nni_mtx_init(&nni_pipe_reap_lk)) != 0) || - ((rv = nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk)) != 0) || ((rv = nni_thr_init(&nni_pipe_reap_thr, nni_pipe_reaper, 0)) != 0)) { return (rv); @@ -240,11 +241,11 @@ nni_pipe_create(nni_ep *ep, void *tdata) NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); - if (((rv = nni_mtx_init(&p->p_mtx)) != 0) || - ((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0) || - ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) != - 0) || - ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) || + nni_mtx_init(&p->p_mtx); + nni_cv_init(&p->p_cv, &p->p_mtx); + nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p); + + if (((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) || ((rv = nni_ep_pipe_add(ep, p)) != 0) || ((rv = nni_sock_pipe_add(sock, p)) != 0)) { nni_pipe_destroy(p); diff --git a/src/core/platform.h b/src/core/platform.h index 7acf16ef..f6f0b974 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -85,10 +85,9 @@ typedef struct nni_plat_thr nni_plat_thr; // Threading & Synchronization Support // -// nni_plat_mtx_init initializes a mutex structure. This may require dynamic -// allocation, depending on the platform. It can return NNG_ENOMEM if that -// fails. An initialized mutex must be distinguishable from zeroed memory. -extern int nni_plat_mtx_init(nni_plat_mtx *); +// nni_plat_mtx_init initializes a mutex structure. An initialized mutex must +// be distinguishable from zeroed memory. +extern void nni_plat_mtx_init(nni_plat_mtx *); // nni_plat_mtx_fini destroys the mutex and releases any resources allocated // for it's use. If the mutex is zeroed memory, this should do nothing. @@ -99,20 +98,14 @@ extern void nni_plat_mtx_fini(nni_plat_mtx *); extern void nni_plat_mtx_lock(nni_plat_mtx *); // nni_plat_mtx_unlock unlocks the mutex. This can only be performed by the -// threadthat owned the mutex. +// thread that owned the mutex. extern void nni_plat_mtx_unlock(nni_plat_mtx *); -// nni_plat_mtx_tryenter tries to lock the mutex. If it can't, it may return -// NNG_EBUSY if the mutex is already owned. -extern int nni_plat_mtx_trylock(nni_plat_mtx *); - // nni_plat_cv_init initializes a condition variable. We require a mutex be // supplied with it, and that mutex must always be held when performing any -// operations on the condition variable (other than fini.) This may require -// dynamic allocation, and if so this operation may fail with NNG_ENOMEM. -// As with mutexes, an initialized mutex should be distinguishable from -// zeroed memory. -extern int nni_plat_cv_init(nni_plat_cv *, nni_plat_mtx *); +// operations on the condition variable (other than fini.) As with mutexes, an +// initialized mutex should be distinguishable from zeroed memory. +extern void nni_plat_cv_init(nni_plat_cv *, nni_plat_mtx *); // nni_plat_cv_fini releases all resources associated with condition variable. // If the cv points to just zeroed memory (was never initialized), it does diff --git a/src/core/random.c b/src/core/random.c index eb0b4fc2..febd0848 100644 --- a/src/core/random.c +++ b/src/core/random.c @@ -170,12 +170,8 @@ nni_random_sys_init(void) { // minimally, grab the system clock nni_isaac_ctx *ctx = &nni_random_ctx; - int rv; - - if ((rv = nni_mtx_init(&ctx->mx)) != 0) { - return (rv); - } + nni_mtx_init(&ctx->mx); nni_plat_seed_prng(ctx->randrsl, sizeof(ctx->randrsl)); nni_isaac_randinit(ctx, 1); return (0); diff --git a/src/core/socket.c b/src/core/socket.c index 6a243650..9aa89a2d 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -231,7 +231,6 @@ nni_notify * nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg) { nni_notify *notify; - int rv; if ((notify = NNI_ALLOC_STRUCT(notify)) == NULL) { return (NULL); @@ -244,31 +243,19 @@ nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg) switch (type) { case NNG_EV_CAN_RCV: - rv = nni_aio_init(¬ify->n_aio, nni_sock_canrecv_cb, notify); - if (rv != 0) { - goto fail; - } + nni_aio_init(¬ify->n_aio, nni_sock_canrecv_cb, notify); nni_msgq_aio_notify_get(sock->s_urq, ¬ify->n_aio); break; case NNG_EV_CAN_SND: - rv = nni_aio_init(¬ify->n_aio, nni_sock_cansend_cb, notify); - if (rv != 0) { - goto fail; - } + nni_aio_init(¬ify->n_aio, nni_sock_cansend_cb, notify); nni_msgq_aio_notify_put(sock->s_uwq, ¬ify->n_aio); break; default: - rv = NNG_ENOTSUP; - goto fail; - break; + NNI_FREE_STRUCT(notify); + return (NULL); } return (notify); - -fail: - nni_aio_fini(¬ify->n_aio); - NNI_FREE_STRUCT(notify); - return (NULL); } void @@ -343,13 +330,13 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) NNI_LIST_NODE_INIT(&s->s_node); nni_pipe_sock_list_init(&s->s_pipes); nni_ep_list_init(&s->s_eps); + nni_mtx_init(&s->s_mx); + nni_cv_init(&s->s_cv, &s->s_mx); + nni_cv_init(&s->s_close_cv, &nni_sock_lk); + nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s); + nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s); - if (((rv = nni_mtx_init(&s->s_mx)) != 0) || - ((rv = nni_cv_init(&s->s_cv, &s->s_mx)) != 0) || - ((rv = nni_cv_init(&s->s_close_cv, &nni_sock_lk)) != 0) || - ((rv = nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s)) != 0) || - ((rv = nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s)) != 0) || - ((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) || + if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) || ((rv = nni_msgq_init(&s->s_urq, 0)) != 0) || ((rv = s->s_sock_ops.sock_init(&s->s_data, s)) != 0)) { nni_sock_destroy(s); @@ -365,8 +352,9 @@ nni_sock_sys_init(void) int rv; NNI_LIST_INIT(&nni_sock_list, nni_sock, s_node); - if (((rv = nni_idhash_init(&nni_sock_hash)) != 0) || - ((rv = nni_mtx_init(&nni_sock_lk)) != 0)) { + nni_mtx_init(&nni_sock_lk); + + if ((rv = nni_idhash_init(&nni_sock_hash)) != 0) { nni_sock_sys_fini(); } else { nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1); @@ -562,6 +550,9 @@ nni_sock_closeall(void) { nni_sock *s; + if (nni_sock_hash == NULL) { + return; + } for (;;) { nni_mtx_lock(&nni_sock_lk); if ((s = nni_list_first(&nni_sock_list)) == NULL) { diff --git a/src/core/taskq.c b/src/core/taskq.c index e0fc456c..d0f04e8a 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -85,12 +85,9 @@ nni_taskq_init(nni_taskq **tqp, int nthr) tq->tq_nthreads = nthr; NNI_LIST_INIT(&tq->tq_tasks, nni_task, task_node); - if (((rv = nni_mtx_init(&tq->tq_mtx)) != 0) || - ((rv = nni_cv_init(&tq->tq_sched_cv, &tq->tq_mtx)) != 0) || - ((rv = nni_cv_init(&tq->tq_wait_cv, &tq->tq_mtx)) != 0)) { - nni_taskq_fini(tq); - return (rv); - } + nni_mtx_init(&tq->tq_mtx); + nni_cv_init(&tq->tq_sched_cv, &tq->tq_mtx); + nni_cv_init(&tq->tq_wait_cv, &tq->tq_mtx); for (i = 0; i < nthr; i++) { tq->tq_threads[i].tqt_tq = tq; diff --git a/src/core/thread.c b/src/core/thread.c index 6c3bd9f3..54c9c7d2 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -9,10 +9,10 @@ #include "core/nng_impl.h" -int +void nni_mtx_init(nni_mtx *mtx) { - return (nni_plat_mtx_init(mtx)); + nni_plat_mtx_init(mtx); } void @@ -33,10 +33,10 @@ nni_mtx_unlock(nni_mtx *mtx) nni_plat_mtx_unlock(mtx); } -int +void nni_cv_init(nni_cv *cv, nni_mtx *mtx) { - return (nni_plat_cv_init(cv, mtx)); + nni_plat_cv_init(cv, mtx); } void @@ -110,15 +110,9 @@ nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg) thr->fn = fn; thr->arg = arg; - if ((rv = nni_plat_mtx_init(&thr->mtx)) != 0) { - thr->done = 1; - return (rv); - } - if ((rv = nni_plat_cv_init(&thr->cv, &thr->mtx)) != 0) { - nni_plat_mtx_fini(&thr->mtx); - thr->done = 1; - return (rv); - } + nni_plat_mtx_init(&thr->mtx); + nni_plat_cv_init(&thr->cv, &thr->mtx); + if (fn == NULL) { thr->init = 1; thr->done = 1; diff --git a/src/core/thread.h b/src/core/thread.h index 94b2a984..ee83b196 100644 --- a/src/core/thread.h +++ b/src/core/thread.h @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -25,9 +26,8 @@ struct nni_thr { int init; }; -// nni_mtx_init initializes the mutex. (Win32 programmers take note; -// our mutexes are actually CriticalSections on Win32.) -extern int nni_mtx_init(nni_mtx *mtx); +// nni_mtx_init initializes the mutex. +extern void nni_mtx_init(nni_mtx *mtx); // nni_mtx_fini destroys the mutex and releases any resources used by it. extern void nni_mtx_fini(nni_mtx *mtx); @@ -43,7 +43,7 @@ extern void nni_mtx_unlock(nni_mtx *mtx); // nni_cv_init initializes the condition variable. The mutex supplied // must always be locked with the condition variable. -extern int nni_cv_init(nni_cv *cv, nni_mtx *); +extern void nni_cv_init(nni_cv *cv, nni_mtx *); // nni_cv_fini releases resources associated with the condition variable, // which must not be in use at the time. diff --git a/src/core/timer.c b/src/core/timer.c index 73bc7604..7608f7d5 100644 --- a/src/core/timer.c +++ b/src/core/timer.c @@ -40,10 +40,11 @@ nni_timer_sys_init(void) memset(timer, 0, sizeof(*timer)); NNI_LIST_INIT(&timer->t_entries, nni_timer_node, t_node); - if (((rv = nni_mtx_init(&timer->t_mx)) != 0) || - ((rv = nni_cv_init(&timer->t_sched_cv, &timer->t_mx)) != 0) || - ((rv = nni_cv_init(&timer->t_wait_cv, &timer->t_mx)) != 0) || - ((rv = nni_thr_init(&timer->t_thr, nni_timer_loop, timer)) != 0)) { + nni_mtx_init(&timer->t_mx); + nni_cv_init(&timer->t_sched_cv, &timer->t_mx); + nni_cv_init(&timer->t_wait_cv, &timer->t_mx); + + if ((rv = nni_thr_init(&timer->t_thr, nni_timer_loop, timer)) != 0) { nni_timer_sys_fini(); return (rv); } diff --git a/src/core/transport.c b/src/core/transport.c index 278c7d1e..6f73a6b2 100644 --- a/src/core/transport.c +++ b/src/core/transport.c @@ -84,8 +84,9 @@ nni_tran_sys_init(void) int rv; NNI_LIST_INIT(&nni_tran_list, nni_transport, t_node); - if (((rv = nni_mtx_init(&nni_tran_lk)) != 0) || - ((rv = nni_tran_register(&nni_inproc_tran)) != 0) || + nni_mtx_init(&nni_tran_lk); + + if (((rv = nni_tran_register(&nni_inproc_tran)) != 0) || ((rv = nni_tran_register(&nni_ipc_tran)) != 0) || ((rv = nni_tran_register(&nni_tcp_tran)) != 0)) { nni_tran_sys_fini(); @@ -24,6 +24,7 @@ void nng_fini(void) { + nni_sock_closeall(); nni_fini(); } @@ -184,7 +185,6 @@ nng_free(void *buf, size_t sz) int nng_sendmsg(nng_socket sid, nng_msg *msg, int flags) { - nni_time expire; int rv; nni_sock *sock; diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 46fe2bea..f511f35f 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -126,10 +126,8 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed) static void nni_posix_epdesc_doaccept(nni_posix_epdesc *ed) { - nni_aio * aio; - int newfd; - struct sockaddr_storage ss; - socklen_t slen; + nni_aio *aio; + int newfd; while ((aio = nni_list_first(&ed->acceptq)) != NULL) { // We could argue that knowing the remote peer address would @@ -456,10 +454,7 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url) return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&ed->mtx)) != 0) { - NNI_FREE_STRUCT(ed); - return (rv); - } + nni_mtx_init(&ed->mtx); // We could randomly choose a different pollq, or for efficiencies // sake we could take a modulo of the file desc number to choose diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 46ebbc1d..81fbd48b 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -54,9 +54,19 @@ extern int nni_plat_errno(int); // elsewhere. struct nni_plat_mtx { - int init; pthread_t owner; pthread_mutex_t mtx; + int fallback; + int flags; +}; + +struct nni_plat_cv { + pthread_cond_t cv; + nni_plat_mtx * mtx; + int fallback; + int flags; + int gen; + int wake; }; struct nni_plat_thr { @@ -65,11 +75,6 @@ struct nni_plat_thr { void *arg; }; -struct nni_plat_cv { - pthread_cond_t cv; - nni_plat_mtx * mtx; -}; - #endif extern int nni_posix_pollq_sysinit(void); diff --git a/src/platform/posix/posix_pipe.c b/src/platform/posix/posix_pipe.c index 78415d26..314e2f5e 100644 --- a/src/platform/posix/posix_pipe.c +++ b/src/platform/posix/posix_pipe.c @@ -105,7 +105,6 @@ void nni_plat_pipe_clear(int rfd) { char buf[32]; - int rv; for (;;) { // Completely drain the pipe, but don't wait. This coalesces diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c index bd74e0c0..b2c1cb1f 100644 --- a/src/platform/posix/posix_pipedesc.c +++ b/src/platform/posix/posix_pipedesc.c @@ -302,10 +302,6 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd) // one. For now we just have a global pollq. Note that by tying // the pd to a single pollq we may get some kind of cache warmth. - if ((rv = nni_mtx_init(&pd->mtx)) != 0) { - NNI_FREE_STRUCT(pd); - return (rv); - } pd->closed = 0; pd->node.fd = fd; pd->node.cb = nni_posix_pipedesc_cb; @@ -313,6 +309,7 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd) (void) fcntl(fd, F_SETFL, O_NONBLOCK); + nni_mtx_init(&pd->mtx); nni_aio_list_init(&pd->readq); nni_aio_list_init(&pd->writeq); diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index d3fdf394..df5a1799 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -347,9 +347,10 @@ nni_posix_pollq_init(nni_posix_pollq *pq) pq->wakerfd = -1; pq->close = 0; - if (((rv = nni_mtx_init(&pq->mtx)) != 0) || - ((rv = nni_cv_init(&pq->cv, &pq->mtx)) != 0) || - ((rv = nni_posix_pollq_poll_grow(pq)) != 0) || + nni_mtx_init(&pq->mtx); + nni_cv_init(&pq->cv, &pq->mtx); + + if (((rv = nni_posix_pollq_poll_grow(pq)) != 0) || ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) || ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) { nni_posix_pollq_fini(pq); diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index 09d40b94..dce8270a 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -276,9 +276,8 @@ nni_posix_resolv_sysinit(void) { int rv; - if ((rv = nni_mtx_init(&nni_posix_resolv_mtx)) != 0) { - return (rv); - } + nni_mtx_init(&nni_posix_resolv_mtx); + if ((rv = nni_taskq_init(&nni_posix_resolv_tq, 4)) != 0) { nni_mtx_fini(&nni_posix_resolv_mtx); return (rv); diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index 0ef4754c..44bfbed2 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -24,118 +24,325 @@ #include <time.h> #include <unistd.h> -static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER; -static int nni_plat_inited = 0; -static int nni_plat_forked = 0; +static pthread_mutex_t nni_plat_init_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t nni_plat_cond_cond = PTHREAD_COND_INITIALIZER; +static pthread_cond_t nni_plat_lock_cond = PTHREAD_COND_INITIALIZER; +static int nni_plat_inited = 0; +static int nni_plat_forked = 0; + +pthread_condattr_t nni_cvattr; +pthread_mutexattr_t nni_mxattr; + +#ifndef NDEBUG +int nni_plat_sync_fallback = 0; +#endif -pthread_condattr_t nni_cvattr; -pthread_mutexattr_t nni_mxattr; -static pthread_attr_t nni_pthread_attr; +enum nni_plat_sync_flags { + NNI_PLAT_SYNC_INIT = 0x01, + NNI_PLAT_SYNC_LOCKED = 0x04, + NNI_PLAT_SYNC_WAIT = 0x08, +}; -// We open a /dev/null file descriptor so that we can dup2() it to -// cause MacOS X to wakeup. This gives us a "safe" close semantic. +void +nni_plat_mtx_init(nni_plat_mtx *mtx) +{ + if (pthread_mutex_init(&mtx->mtx, &nni_mxattr) != 0) { + mtx->fallback = 1; + } else { + mtx->flags = NNI_PLAT_SYNC_INIT; + } +#ifndef NDEBUG + if (nni_plat_sync_fallback || getenv("NNG_SYNC_FALLBACK")) { + mtx->fallback = 1; + } +#endif +} -int nni_plat_devnull = -1; +void +nni_plat_mtx_fini(nni_plat_mtx *mtx) +{ + if (mtx->flags & NNI_PLAT_SYNC_INIT) { + int rv; + if ((rv = pthread_mutex_destroy(&mtx->mtx)) != 0) { + nni_panic("pthread_mutex_destroy: %s", strerror(rv)); + } + } + mtx->flags = 0; +} -int -nni_plat_mtx_init(nni_plat_mtx *mtx) +static void +nni_pthread_mutex_lock(pthread_mutex_t *m) { int rv; - if ((rv = pthread_mutex_init(&mtx->mtx, &nni_mxattr)) != 0) { - switch (rv) { - case EAGAIN: - case ENOMEM: - return (NNG_ENOMEM); + if ((rv = pthread_mutex_lock(m)) != 0) { + nni_panic("pthread_mutex_lock: %s", strerror(rv)); + } +} - default: - nni_panic("pthread_mutex_init: %s", strerror(rv)); - } +static void +nni_pthread_mutex_unlock(pthread_mutex_t *m) +{ + int rv; + + if ((rv = pthread_mutex_unlock(m)) != 0) { + nni_panic("pthread_mutex_unlock: %s", strerror(rv)); } - mtx->init = 1; - return (0); } -void -nni_plat_mtx_fini(nni_plat_mtx *mtx) +static void +nni_pthread_cond_broadcast(pthread_cond_t *c) { int rv; - if (!mtx->init) { - return; + if ((rv = pthread_cond_broadcast(c)) != 0) { + nni_panic("pthread_cond_broadcast: %s", strerror(rv)); } - pthread_mutex_lock(&mtx->mtx); - pthread_mutex_unlock(&mtx->mtx); - if ((rv = pthread_mutex_destroy(&mtx->mtx)) != 0) { - nni_panic("pthread_mutex_fini: %s", strerror(rv)); +} + +static void +nni_pthread_cond_signal(pthread_cond_t *c) +{ + int rv; + if ((rv = pthread_cond_signal(c)) != 0) { + nni_panic("pthread_cond_signal: %s", strerror(rv)); } - mtx->init = 0; } -void -nni_plat_mtx_lock(nni_plat_mtx *mtx) +static void +nni_pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m) { int rv; - if ((rv = pthread_mutex_lock(&mtx->mtx)) != 0) { - nni_panic("pthread_mutex_lock: %s", strerror(rv)); + if ((rv = pthread_cond_wait(c, m)) != 0) { + nni_panic("pthread_cond_wait: %s", strerror(rv)); + } +} + +static int +nni_pthread_cond_timedwait( + pthread_cond_t *c, pthread_mutex_t *m, struct timespec *ts) +{ + int rv; + + switch ((rv = pthread_cond_timedwait(c, m, ts))) { + case 0: + return (0); + case ETIMEDOUT: + case EAGAIN: + return (NNG_ETIMEDOUT); + } + nni_panic("pthread_cond_timedwait: %s", strerror(rv)); + return (NNG_EINVAL); +} + +static void +nni_plat_mtx_lock_fallback_locked(nni_plat_mtx *mtx) +{ + while (mtx->flags & NNI_PLAT_SYNC_LOCKED) { + mtx->flags |= NNI_PLAT_SYNC_WAIT; + nni_pthread_cond_wait(&nni_plat_lock_cond, &nni_plat_lock); } + mtx->flags |= NNI_PLAT_SYNC_LOCKED; mtx->owner = pthread_self(); } +static void +nni_plat_mtx_unlock_fallback_locked(nni_plat_mtx *mtx) +{ + NNI_ASSERT(mtx->flags & NNI_PLAT_SYNC_LOCKED); + mtx->flags &= ~NNI_PLAT_SYNC_LOCKED; + if (mtx->flags & NNI_PLAT_SYNC_WAIT) { + mtx->flags &= ~NNI_PLAT_SYNC_WAIT; + pthread_cond_broadcast(&nni_plat_lock_cond); + } +} + +static void +nni_plat_mtx_lock_fallback(nni_plat_mtx *mtx) +{ + nni_pthread_mutex_lock(&nni_plat_lock); + nni_plat_mtx_lock_fallback_locked(mtx); + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static void +nni_plat_mtx_unlock_fallback(nni_plat_mtx *mtx) +{ + nni_pthread_mutex_lock(&nni_plat_lock); + nni_plat_mtx_unlock_fallback_locked(mtx); + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static void +nni_plat_cv_wake_fallback(nni_cv *cv) +{ + nni_pthread_mutex_lock(&nni_plat_lock); + if (cv->flags & NNI_PLAT_SYNC_WAIT) { + cv->gen++; + cv->wake = 0; + nni_pthread_cond_broadcast(&nni_plat_cond_cond); + } + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static void +nni_plat_cv_wake1_fallback(nni_cv *cv) +{ + nni_pthread_mutex_lock(&nni_plat_lock); + if (cv->flags & NNI_PLAT_SYNC_WAIT) { + cv->wake++; + nni_pthread_cond_broadcast(&nni_plat_cond_cond); + } + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static void +nni_plat_cv_wait_fallback(nni_cv *cv) +{ + int gen; + + nni_pthread_mutex_lock(&nni_plat_lock); + if (!cv->mtx->fallback) { + // transform the mutex to a fallback one. we have it held. + cv->mtx->fallback = 1; + cv->mtx->flags |= NNI_PLAT_SYNC_LOCKED; + nni_pthread_mutex_unlock(&cv->mtx->mtx); + } + + NNI_ASSERT(cv->mtx->owner == pthread_self()); + NNI_ASSERT(cv->mtx->flags & NNI_PLAT_SYNC_LOCKED); + gen = cv->gen; + while ((cv->gen == gen) && (cv->wake == 0)) { + nni_plat_mtx_unlock_fallback_locked(cv->mtx); + cv->flags |= NNI_PLAT_SYNC_WAIT; + nni_pthread_cond_wait(&nni_plat_cond_cond, &nni_plat_lock); + + nni_plat_mtx_lock_fallback_locked(cv->mtx); + } + if (cv->wake > 0) { + cv->wake--; + } + nni_pthread_mutex_unlock(&nni_plat_lock); +} + +static int +nni_plat_cv_until_fallback(nni_cv *cv, struct timespec *ts) +{ + int gen; + int rv = 0; + + if (!cv->mtx->fallback) { + // transform the mutex to a fallback one. we have it held. + cv->mtx->fallback = 1; + cv->mtx->flags |= NNI_PLAT_SYNC_LOCKED; + nni_pthread_mutex_unlock(&cv->mtx->mtx); + } + + nni_pthread_mutex_lock(&nni_plat_lock); + gen = cv->gen; + while ((cv->gen == gen) && (cv->wake == 0)) { + nni_plat_mtx_unlock_fallback_locked(cv->mtx); + cv->flags |= NNI_PLAT_SYNC_WAIT; + rv = nni_pthread_cond_timedwait( + &nni_plat_cond_cond, &nni_plat_lock, ts); + nni_plat_mtx_lock_fallback_locked(cv->mtx); + if (rv != 0) { + break; + } + } + if ((rv == 0) && (cv->wake > 0)) { + cv->wake--; + } + nni_pthread_mutex_unlock(&nni_plat_lock); + return (rv); +} + void -nni_plat_mtx_unlock(nni_plat_mtx *mtx) +nni_plat_mtx_lock(nni_plat_mtx *mtx) { int rv; + if (!mtx->fallback) { + nni_pthread_mutex_lock(&mtx->mtx); + + // We might have changed to a fallback lock; make + // sure this did not occur. Note that transitions to + // fallback locks only happen when a thread accesses + // a condition variable already holding this lock, + // so this is guranteed to be safe. + if (!mtx->fallback) { + mtx->owner = pthread_self(); + return; + } + nni_pthread_mutex_unlock(&mtx->mtx); + } + + // Fallback mode + nni_plat_mtx_lock_fallback(mtx); +} + +void +nni_plat_mtx_unlock(nni_plat_mtx *mtx) +{ NNI_ASSERT(mtx->owner == pthread_self()); mtx->owner = 0; - if ((rv = pthread_mutex_unlock(&mtx->mtx)) != 0) { - nni_panic("pthread_mutex_unlock: %s", strerror(rv)); + + if (mtx->fallback) { + nni_plat_mtx_unlock_fallback(mtx); + } else { + nni_pthread_mutex_unlock(&mtx->mtx); } } -int +void nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx) { - int rv; - - if ((rv = pthread_cond_init(&cv->cv, &nni_cvattr)) != 0) { - switch (rv) { - case ENOMEM: - case EAGAIN: - return (NNG_ENOMEM); - - default: - nni_panic("pthread_cond_init: %s", strerror(rv)); - } + if (mtx->fallback || (pthread_cond_init(&cv->cv, &nni_cvattr) != 0)) { + cv->fallback = 1; + } else { + cv->flags = NNI_PLAT_SYNC_INIT; + } +#ifndef NDEBUG + if (nni_plat_sync_fallback || getenv("NNG_SYNC_FALLBACK")) { + cv->fallback = 1; } +#endif cv->mtx = mtx; - return (0); } void nni_plat_cv_wake(nni_plat_cv *cv) { - (void) pthread_cond_broadcast(&cv->cv); + if (cv->fallback) { + nni_plat_cv_wake_fallback(cv); + } else { + nni_pthread_cond_broadcast(&cv->cv); + } } void nni_plat_cv_wake1(nni_plat_cv *cv) { - (void) pthread_cond_signal(&cv->cv); + int rv; + if (cv->fallback) { + nni_plat_cv_wake1_fallback(cv); + } else { + nni_pthread_cond_signal(&cv->cv); + } } void nni_plat_cv_wait(nni_plat_cv *cv) { - int rv; - NNI_ASSERT(cv->mtx->owner == pthread_self()); - if ((rv = pthread_cond_wait(&cv->cv, &cv->mtx->mtx)) != 0) { - nni_panic("pthread_cond_wait: %s", strerror(rv)); + if (cv->fallback) { + nni_plat_cv_wait_fallback(cv); + } else { + nni_pthread_cond_wait(&cv->cv, &cv->mtx->mtx); + cv->mtx->owner = pthread_self(); } - cv->mtx->owner = pthread_self(); } int @@ -150,14 +357,13 @@ nni_plat_cv_until(nni_plat_cv *cv, nni_time until) ts.tv_sec = until / 1000000; ts.tv_nsec = (until % 1000000) * 1000; - rv = pthread_cond_timedwait(&cv->cv, &cv->mtx->mtx, &ts); - cv->mtx->owner = pthread_self(); - if (rv == ETIMEDOUT) { - return (NNG_ETIMEDOUT); - } else if (rv != 0) { - nni_panic("pthread_cond_timedwait: %d", rv); + if (cv->fallback) { + rv = nni_plat_cv_until_fallback(cv, &ts); + } else { + rv = nni_pthread_cond_timedwait(&cv->cv, &cv->mtx->mtx, &ts); + cv->mtx->owner = pthread_self(); } - return (0); + return (rv); } void @@ -165,13 +371,12 @@ nni_plat_cv_fini(nni_plat_cv *cv) { int rv; - if (cv->mtx == NULL) { - return; - } - if ((rv = pthread_cond_destroy(&cv->cv)) != 0) { + if ((cv->flags & NNI_PLAT_SYNC_INIT) && + ((rv = pthread_cond_destroy(&cv->cv)) != 0)) { nni_panic("pthread_cond_destroy: %s", strerror(rv)); } - cv->mtx = NULL; + cv->flags = 0; + cv->mtx = NULL; } static void * @@ -198,10 +403,10 @@ nni_plat_thr_init(nni_plat_thr *thr, void (*fn)(void *), void *arg) thr->arg = arg; // POSIX wants functions to return a void *, but we don't care. - rv = pthread_create( - &thr->tid, &nni_pthread_attr, nni_plat_thr_main, thr); + rv = pthread_create(&thr->tid, NULL, nni_plat_thr_main, thr); if (rv != 0) { - // nni_printf("pthread_create: %s", strerror(rv)); + // nni_printf("pthread_create: %s", + // strerror(rv)); return (NNG_ENOMEM); } return (0); @@ -227,7 +432,6 @@ int nni_plat_init(int (*helper)(void)) { int rv; - int devnull; if (nni_plat_forked) { nni_panic("nng is not fork-reentrant safe"); @@ -235,94 +439,60 @@ nni_plat_init(int (*helper)(void)) if (nni_plat_inited) { return (0); // fast path } - if ((devnull = open("/dev/null", O_RDONLY)) < 0) { - return (nni_plat_errno(errno)); - } - pthread_mutex_lock(&nni_plat_lock); + pthread_mutex_lock(&nni_plat_init_lock); if (nni_plat_inited) { // check again under the lock to be sure - pthread_mutex_unlock(&nni_plat_lock); - close(devnull); + pthread_mutex_unlock(&nni_plat_init_lock); return (0); } if (pthread_condattr_init(&nni_cvattr) != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(devnull); + pthread_mutex_unlock(&nni_plat_init_lock); return (NNG_ENOMEM); } #if !defined(NNG_USE_GETTIMEOFDAY) && NNG_USE_CLOCKID != CLOCK_REALTIME if (pthread_condattr_setclock(&nni_cvattr, NNG_USE_CLOCKID) != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(devnull); + pthread_mutex_unlock(&nni_plat_init_lock); return (NNG_ENOMEM); } #endif if (pthread_mutexattr_init(&nni_mxattr) != 0) { - pthread_mutex_unlock(&nni_plat_lock); - pthread_condattr_destroy(&nni_cvattr); - (void) close(devnull); - return (NNG_ENOMEM); - } - - rv = pthread_mutexattr_settype(&nni_mxattr, PTHREAD_MUTEX_ERRORCHECK); - if (rv != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(devnull); - pthread_mutexattr_destroy(&nni_mxattr); - pthread_condattr_destroy(&nni_cvattr); - return (NNG_ENOMEM); - } - - rv = pthread_attr_init(&nni_pthread_attr); - if (rv != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(nni_plat_devnull); - pthread_mutexattr_destroy(&nni_mxattr); + pthread_mutex_unlock(&nni_plat_init_lock); pthread_condattr_destroy(&nni_cvattr); return (NNG_ENOMEM); } - // We don't force this, but we want to have it small... we could - // probably get by with even just 8k, but Linux usually wants 16k - // as a minimum. If this fails, its not fatal, just we won't be - // as scalable / thrifty with our use of VM. - //(void) pthread_attr_setstacksize(&nni_pthread_attr, 16384); + // if this one fails we don't care. + (void) pthread_mutexattr_settype( + &nni_mxattr, PTHREAD_MUTEX_ERRORCHECK); if ((rv = nni_posix_pollq_sysinit()) != 0) { - pthread_mutex_unlock(&nni_plat_lock); - (void) close(nni_plat_devnull); + pthread_mutex_unlock(&nni_plat_init_lock); pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); - pthread_attr_destroy(&nni_pthread_attr); return (rv); } if ((rv = nni_posix_resolv_sysinit()) != 0) { - pthread_mutex_unlock(&nni_plat_lock); + pthread_mutex_unlock(&nni_plat_init_lock); nni_posix_pollq_sysfini(); - (void) close(nni_plat_devnull); pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); - pthread_attr_destroy(&nni_pthread_attr); return (rv); } if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) { - pthread_mutex_unlock(&nni_plat_lock); + pthread_mutex_unlock(&nni_plat_init_lock); nni_posix_resolv_sysfini(); nni_posix_pollq_sysfini(); - (void) close(devnull); pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); - pthread_attr_destroy(&nni_pthread_attr); return (NNG_ENOMEM); } if ((rv = helper()) == 0) { nni_plat_inited = 1; } - nni_plat_devnull = devnull; - pthread_mutex_unlock(&nni_plat_lock); + pthread_mutex_unlock(&nni_plat_init_lock); return (rv); } @@ -330,18 +500,15 @@ nni_plat_init(int (*helper)(void)) void nni_plat_fini(void) { - pthread_mutex_lock(&nni_plat_lock); + pthread_mutex_lock(&nni_plat_init_lock); if (nni_plat_inited) { nni_posix_resolv_sysfini(); nni_posix_pollq_sysfini(); pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); - pthread_attr_destroy(&nni_pthread_attr); - (void) close(nni_plat_devnull); - nni_plat_devnull = -1; - nni_plat_inited = 0; + nni_plat_inited = 0; } - pthread_mutex_unlock(&nni_plat_lock); + pthread_mutex_unlock(&nni_plat_init_lock); } #else diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index db6d7af4..2aa16b36 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -208,10 +208,7 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) if ((udp = NNI_ALLOC_STRUCT(udp)) != NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&udp->udp_mtx)) != 0) { - NNI_FREE_STRUCT(udp); - return (rv); - } + nni_mtx_init(&udp->udp_mtx); udp->udp_fd = socket(sa.ss_family, SOCK_DGRAM, IPPROTO_UDP); if (udp->udp_fd < 0) { diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index c4cdcb8a..9c3343b7 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -174,17 +174,14 @@ nni_win_iocp_register(HANDLE h) int nni_win_event_init(nni_win_event *evt, nni_win_event_ops *ops, void *ptr) { - int rv; - ZeroMemory(&evt->olpd, sizeof(evt->olpd)); evt->olpd.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL); if (evt->olpd.hEvent == NULL) { return (nni_win_error(GetLastError())); } - if (((rv = nni_mtx_init(&evt->mtx)) != 0) || - ((rv = nni_cv_init(&evt->cv, &evt->mtx)) != 0)) { - return (rv); // NB: This will never happen on Windows. - } + nni_mtx_init(&evt->mtx); + nni_cv_init(&evt->cv, &evt->mtx); + evt->ops = *ops; evt->aio = NULL; evt->ptr = ptr; @@ -240,9 +237,7 @@ nni_win_iocp_sysinit(void) goto fail; } } - if ((rv = nni_mtx_init(&nni_win_iocp_mtx)) != 0) { - goto fail; - } + nni_mtx_init(&nni_win_iocp_mtx); for (i = 0; i < NNI_WIN_IOCP_NTHREADS; i++) { nni_thr_run(&nni_win_iocp_thrs[i]); } diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index c9eb20ec..a60815fa 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -566,10 +566,9 @@ nni_win_ipc_sysinit(void) NNI_LIST_INIT(&worker->workers, nni_plat_ipc_ep, node); NNI_LIST_INIT(&worker->waiters, nni_plat_ipc_ep, node); - if (((rv = nni_mtx_init(&worker->mtx)) != 0) || - ((rv = nni_cv_init(&worker->cv, &worker->mtx)) != 0)) { - return (rv); - } + nni_mtx_init(&worker->mtx); + nni_cv_init(&worker->cv, &worker->mtx); + rv = nni_thr_init(&worker->thr, nni_win_ipc_conn_thr, worker); if (rv != 0) { return (rv); diff --git a/src/platform/windows/win_net.c b/src/platform/windows/win_net.c index 63295a71..80e3724d 100644 --- a/src/platform/windows/win_net.c +++ b/src/platform/windows/win_net.c @@ -680,8 +680,6 @@ int nni_win_tcp_sysinit(void) { WSADATA data; - WORD ver; - ver = MAKEWORD(2, 2); if (WSAStartup(MAKEWORD(2, 2), &data) != 0) { NNI_ASSERT(LOBYTE(data.wVersion) == 2); NNI_ASSERT(HIBYTE(data.wVersion) == 2); diff --git a/src/platform/windows/win_pipe.c b/src/platform/windows/win_pipe.c index 861fbc76..edc4df3f 100644 --- a/src/platform/windows/win_pipe.c +++ b/src/platform/windows/win_pipe.c @@ -19,9 +19,9 @@ int nni_plat_pipe_open(int *wfdp, int *rfdp) { - SOCKET afd = INVALID_SOCKET; SOCKET rfd = INVALID_SOCKET; SOCKET wfd = INVALID_SOCKET; + SOCKET afd; struct sockaddr_in addr; socklen_t alen; diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index a01dc123..4ce12d84 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -254,9 +254,8 @@ nni_win_resolv_sysinit(void) { int rv; - if ((rv = nni_mtx_init(&nni_win_resolv_mtx)) != 0) { - return (rv); - } + nni_mtx_init(&nni_win_resolv_mtx); + if ((rv = nni_taskq_init(&nni_win_resolv_tq, 4)) != 0) { nni_mtx_fini(&nni_win_resolv_mtx); return (rv); diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c index c01ec782..879cd772 100644 --- a/src/platform/windows/win_thread.c +++ b/src/platform/windows/win_thread.c @@ -30,12 +30,11 @@ nni_free(void *b, size_t z) HeapFree(GetProcessHeap(), 0, b); } -int +void nni_plat_mtx_init(nni_plat_mtx *mtx) { InitializeSRWLock(&mtx->srl); mtx->init = 1; - return (0); } void @@ -56,12 +55,11 @@ nni_plat_mtx_unlock(nni_plat_mtx *mtx) ReleaseSRWLockExclusive(&mtx->srl); } -int +void nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx) { InitializeConditionVariable(&cv->cv); cv->srl = &mtx->srl; - return (0); } void diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index 79d7187e..88fd93e0 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -59,40 +59,28 @@ nni_bus_sock_fini(void *arg) { nni_bus_sock *psock = arg; - if (psock != NULL) { - nni_aio_stop(&psock->aio_getq); - nni_aio_fini(&psock->aio_getq); - nni_mtx_fini(&psock->mtx); - NNI_FREE_STRUCT(psock); - } + nni_aio_stop(&psock->aio_getq); + nni_aio_fini(&psock->aio_getq); + nni_mtx_fini(&psock->mtx); + NNI_FREE_STRUCT(psock); } static int nni_bus_sock_init(void **sp, nni_sock *nsock) { nni_bus_sock *psock; - int rv; if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); } NNI_LIST_INIT(&psock->pipes, nni_bus_pipe, node); - if ((rv = nni_mtx_init(&psock->mtx)) != 0) { - goto fail; - } - rv = nni_aio_init(&psock->aio_getq, nni_bus_sock_getq_cb, psock); - if (rv != 0) { - goto fail; - } + nni_mtx_init(&psock->mtx); + nni_aio_init(&psock->aio_getq, nni_bus_sock_getq_cb, psock); psock->nsock = nsock; psock->raw = 0; *sp = psock; return (0); - -fail: - nni_bus_sock_fini(psock); - return (rv); } static void @@ -134,36 +122,21 @@ nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock) if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) { return (NNG_ENOMEM); } - NNI_LIST_NODE_INIT(&ppipe->node); - if (((rv = nni_mtx_init(&ppipe->mtx)) != 0) || - ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0)) { - goto fail; - } - rv = nni_aio_init(&ppipe->aio_getq, nni_bus_pipe_getq_cb, ppipe); - if (rv != 0) { - goto fail; - } - rv = nni_aio_init(&ppipe->aio_send, nni_bus_pipe_send_cb, ppipe); - if (rv != 0) { - goto fail; - } - rv = nni_aio_init(&ppipe->aio_recv, nni_bus_pipe_recv_cb, ppipe); - if (rv != 0) { - goto fail; - } - rv = nni_aio_init(&ppipe->aio_putq, nni_bus_pipe_putq_cb, ppipe); - if (rv != 0) { - goto fail; + if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) { + NNI_FREE_STRUCT(ppipe); + return (rv); } + NNI_LIST_NODE_INIT(&ppipe->node); + nni_mtx_init(&ppipe->mtx); + nni_aio_init(&ppipe->aio_getq, nni_bus_pipe_getq_cb, ppipe); + nni_aio_init(&ppipe->aio_send, nni_bus_pipe_send_cb, ppipe); + nni_aio_init(&ppipe->aio_recv, nni_bus_pipe_recv_cb, ppipe); + nni_aio_init(&ppipe->aio_putq, nni_bus_pipe_putq_cb, ppipe); ppipe->npipe = npipe; ppipe->psock = psock; *pp = ppipe; return (0); - -fail: - nni_bus_pipe_fini(ppipe); - return (rv); } static int diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c index a0e907f2..acf9ec25 100644 --- a/src/protocol/pair/pair_v0.c +++ b/src/protocol/pair/pair_v0.c @@ -53,15 +53,11 @@ static int pair0_sock_init(void **sp, nni_sock *nsock) { pair0_sock *s; - int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&s->mtx)) != 0) { - NNI_FREE_STRUCT(s); - return (rv); - } + nni_mtx_init(&s->mtx); s->nsock = nsock; s->ppipe = NULL; s->raw = 0; @@ -84,22 +80,19 @@ static int pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock) { pair0_pipe *p; - int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) { - pair0_pipe_fini(p); - } else { - p->npipe = npipe; - p->psock = psock; - *pp = p; - } - return (rv); + nni_aio_init(&p->aio_send, pair0_send_cb, p); + nni_aio_init(&p->aio_recv, pair0_recv_cb, p); + nni_aio_init(&p->aio_getq, pair0_getq_cb, p); + nni_aio_init(&p->aio_putq, pair0_putq_cb, p); + + p->npipe = npipe; + p->psock = psock; + *pp = p; + return (0); } static void diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c index 1a7ad9fa..2b8a9120 100644 --- a/src/protocol/pair/pair_v1.c +++ b/src/protocol/pair/pair_v1.c @@ -73,22 +73,24 @@ pair1_sock_init(void **sp, nni_sock *nsock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_idhash_init(&s->pipes)) != 0) { + NNI_FREE_STRUCT(s); + return (NNG_ENOMEM); + } NNI_LIST_INIT(&s->plist, pair1_pipe, node); // Raw mode uses this. - if (((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) || - ((rv = nni_mtx_init(&s->mtx)) != 0) || - ((rv = nni_idhash_init(&s->pipes)) != 0)) { - pair1_sock_fini(s); - } else { - s->nsock = nsock; - s->raw = 0; - s->poly = 0; - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); - s->ttl = 8; - *sp = s; - } + nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s); + nni_mtx_init(&s->mtx); + + s->nsock = nsock; + s->raw = 0; + s->poly = 0; + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); + s->ttl = 8; + *sp = s; + return (0); } @@ -101,17 +103,19 @@ pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock) if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) { - pair1_pipe_fini(p); - } else { - p->npipe = npipe; - p->psock = psock; - *pp = p; + if ((rv = nni_msgq_init(&p->sendq, 2)) != 0) { + NNI_FREE_STRUCT(p); + return (NNG_ENOMEM); } + nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p); + nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p); + nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p); + nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p); + + p->npipe = npipe; + p->psock = psock; + *pp = p; + return (rv); } diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index e3c73342..1ebcc4a2 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -63,20 +63,13 @@ static int nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock) { nni_pull_pipe *pp; - int rv; if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&pp->putq_aio, nni_pull_putq_cb, pp))) != 0) { - NNI_FREE_STRUCT(pp); - return (rv); - } - if (((rv = nni_aio_init(&pp->recv_aio, nni_pull_recv_cb, pp))) != 0) { - nni_aio_fini(&pp->putq_aio); - NNI_FREE_STRUCT(pp); - return (rv); - } + nni_aio_init(&pp->putq_aio, nni_pull_putq_cb, pp); + nni_aio_init(&pp->recv_aio, nni_pull_recv_cb, pp); + pp->pipe = pipe; pp->pull = psock; *ppp = pp; diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index 14b3b191..1bc1659c 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -93,30 +93,19 @@ static int nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock) { nni_push_pipe *pp; - int rv; if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_aio_init(&pp->aio_recv, nni_push_recv_cb, pp)) != 0) { - goto fail; - } - if ((rv = nni_aio_init(&pp->aio_send, nni_push_send_cb, pp)) != 0) { - goto fail; - } - if ((rv = nni_aio_init(&pp->aio_getq, nni_push_getq_cb, pp)) != 0) { - goto fail; - } + nni_aio_init(&pp->aio_recv, nni_push_recv_cb, pp); + nni_aio_init(&pp->aio_send, nni_push_send_cb, pp); + nni_aio_init(&pp->aio_getq, nni_push_getq_cb, pp); NNI_LIST_NODE_INIT(&pp->node); pp->pipe = pipe; pp->push = psock; *ppp = pp; return (0); - -fail: - nni_push_pipe_fini(pp); - return (rv); } static int diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index 161a5d79..940f2139 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -53,20 +53,13 @@ static int nni_pub_sock_init(void **pubp, nni_sock *sock) { nni_pub_sock *pub; - int rv; if ((pub = NNI_ALLOC_STRUCT(pub)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&pub->mtx)) != 0) { - nni_pub_sock_fini(pub); - return (rv); - } - rv = nni_aio_init(&pub->aio_getq, nni_pub_sock_getq_cb, pub); - if (rv != 0) { - nni_pub_sock_fini(pub); - return (rv); - } + nni_mtx_init(&pub->mtx); + nni_aio_init(&pub->aio_getq, nni_pub_sock_getq_cb, pub); + pub->sock = sock; pub->raw = 0; NNI_LIST_INIT(&pub->pipes, nni_pub_pipe, node); @@ -127,31 +120,18 @@ nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock) } // XXX: consider making this depth tunable if ((rv = nni_msgq_init(&pp->sendq, 16)) != 0) { - goto fail; - } - - rv = nni_aio_init(&pp->aio_getq, nni_pub_pipe_getq_cb, pp); - if (rv != 0) { - goto fail; + NNI_FREE_STRUCT(pp); + return (rv); } - rv = nni_aio_init(&pp->aio_send, nni_pub_pipe_send_cb, pp); - if (rv != 0) { - goto fail; - } + nni_aio_init(&pp->aio_getq, nni_pub_pipe_getq_cb, pp); + nni_aio_init(&pp->aio_send, nni_pub_pipe_send_cb, pp); + nni_aio_init(&pp->aio_recv, nni_pub_pipe_recv_cb, pp); - rv = nni_aio_init(&pp->aio_recv, nni_pub_pipe_recv_cb, pp); - if (rv != 0) { - goto fail; - } pp->pipe = pipe; pp->pub = psock; *ppp = pp; return (0); - -fail: - nni_pub_pipe_fini(pp); - return (rv); } static int diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 53f01e0f..78b9d157 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -95,16 +95,13 @@ static int nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock) { nni_sub_pipe *sp; - int rv; if ((sp = NNI_ALLOC_STRUCT(sp)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&sp->aio_putq, nni_sub_putq_cb, sp)) != 0) || - ((rv = nni_aio_init(&sp->aio_recv, nni_sub_recv_cb, sp)) != 0)) { - nni_sub_pipe_fini(sp); - return (rv); - } + nni_aio_init(&sp->aio_putq, nni_sub_putq_cb, sp); + nni_aio_init(&sp->aio_recv, nni_sub_recv_cb, sp); + sp->pipe = pipe; sp->sub = ssock; *spp = sp; diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 4319cbf8..09f2b285 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -74,20 +74,18 @@ nni_rep_sock_init(void **repp, nni_sock *sock) if ((rep = NNI_ALLOC_STRUCT(rep)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_idhash_init(&rep->pipes)) != 0) { + NNI_FREE_STRUCT(rep); + return (rv); + } + rep->ttl = 8; // Per RFC rep->sock = sock; rep->raw = 0; rep->btrace = NULL; rep->btrace_len = 0; - if ((rv = nni_idhash_init(&rep->pipes)) != 0) { - goto fail; - } - - rv = nni_aio_init(&rep->aio_getq, nni_rep_sock_getq_cb, rep); - if (rv != 0) { - goto fail; - } + nni_aio_init(&rep->aio_getq, nni_rep_sock_getq_cb, rep); rep->uwq = nni_sock_sendq(sock); rep->urq = nni_sock_recvq(sock); @@ -96,10 +94,6 @@ nni_rep_sock_init(void **repp, nni_sock *sock) nni_sock_senderr(sock, NNG_ESTATE); return (0); - -fail: - nni_rep_sock_fini(rep); - return (rv); } static void @@ -128,32 +122,18 @@ nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) return (NNG_ENOMEM); } if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) { - goto fail; - } - if ((rv = nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp)) != - 0) { - goto fail; - } - if ((rv = nni_aio_init(&rp->aio_send, nni_rep_pipe_send_cb, rp)) != - 0) { - goto fail; - } - if ((rv = nni_aio_init(&rp->aio_recv, nni_rep_pipe_recv_cb, rp)) != - 0) { - goto fail; - } - if ((rv = nni_aio_init(&rp->aio_putq, nni_rep_pipe_putq_cb, rp)) != - 0) { - goto fail; + NNI_FREE_STRUCT(rp); + return (rv); } + nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp); + nni_aio_init(&rp->aio_send, nni_rep_pipe_send_cb, rp); + nni_aio_init(&rp->aio_recv, nni_rep_pipe_recv_cb, rp); + nni_aio_init(&rp->aio_putq, nni_rep_pipe_putq_cb, rp); + rp->pipe = pipe; rp->rep = rsock; *rpp = rp; return (0); - -fail: - nni_rep_pipe_fini(rp); - return (rv); } static void diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index fdf29fd9..bab81331 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -75,19 +75,12 @@ static int nni_req_sock_init(void **reqp, nni_sock *sock) { nni_req_sock *req; - int rv; if ((req = NNI_ALLOC_STRUCT(req)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&req->mtx)) != 0) { - NNI_FREE_STRUCT(req); - return (rv); - } - if ((rv = nni_cv_init(&req->cv, &req->mtx)) != 0) { - nni_mtx_fini(&req->mtx); - NNI_FREE_STRUCT(req); - } + nni_mtx_init(&req->mtx); + nni_cv_init(&req->cv, &req->mtx); NNI_LIST_INIT(&req->readypipes, nni_req_pipe, node); NNI_LIST_INIT(&req->busypipes, nni_req_pipe, node); @@ -152,41 +145,22 @@ static int nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) { nni_req_pipe *rp; - int rv; if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&rp->mtx)) != 0) { - goto failed; - } - if ((rv = nni_aio_init(&rp->aio_getq, nni_req_getq_cb, rp)) != 0) { - goto failed; - } - if ((rv = nni_aio_init(&rp->aio_putq, nni_req_putq_cb, rp)) != 0) { - goto failed; - } - if ((rv = nni_aio_init(&rp->aio_recv, nni_req_recv_cb, rp)) != 0) { - goto failed; - } - rv = nni_aio_init(&rp->aio_sendraw, nni_req_sendraw_cb, rp); - if (rv != 0) { - goto failed; - } - rv = nni_aio_init(&rp->aio_sendcooked, nni_req_sendcooked_cb, rp); - if (rv != 0) { - goto failed; - } + nni_mtx_init(&rp->mtx); + nni_aio_init(&rp->aio_getq, nni_req_getq_cb, rp); + nni_aio_init(&rp->aio_putq, nni_req_putq_cb, rp); + nni_aio_init(&rp->aio_recv, nni_req_recv_cb, rp); + nni_aio_init(&rp->aio_sendraw, nni_req_sendraw_cb, rp); + nni_aio_init(&rp->aio_sendcooked, nni_req_sendcooked_cb, rp); NNI_LIST_NODE_INIT(&rp->node); rp->pipe = pipe; rp->req = rsock; *rpp = rp; return (0); - -failed: - nni_req_pipe_fini(rp); - return (rv); } static void diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 32513134..a097f551 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -77,6 +77,11 @@ nni_resp_sock_init(void **pp, nni_sock *nsock) if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_idhash_init(&psock->pipes)) != 0) { + NNI_FREE_STRUCT(psock); + return (rv); + } + psock->ttl = 8; // Per RFC psock->nsock = nsock; psock->raw = 0; @@ -85,24 +90,12 @@ nni_resp_sock_init(void **pp, nni_sock *nsock) psock->urq = nni_sock_recvq(nsock); psock->uwq = nni_sock_sendq(nsock); - if ((rv = nni_mtx_init(&psock->mtx)) != 0) { - goto fail; - } - if ((rv = nni_idhash_init(&psock->pipes)) != 0) { - goto fail; - } - rv = nni_aio_init(&psock->aio_getq, nni_resp_sock_getq_cb, psock); - if (rv != 0) { - goto fail; - } + nni_mtx_init(&psock->mtx); + nni_aio_init(&psock->aio_getq, nni_resp_sock_getq_cb, psock); *pp = psock; nni_sock_senderr(nsock, NNG_ESTATE); return (0); - -fail: - nni_resp_sock_fini(psock); - return (rv); } static void @@ -131,33 +124,18 @@ nni_resp_pipe_init(void **pp, nni_pipe *npipe, void *psock) return (NNG_ENOMEM); } if ((rv = nni_msgq_init(&ppipe->sendq, 2)) != 0) { - goto fail; - } - rv = nni_aio_init(&ppipe->aio_putq, nni_resp_putq_cb, ppipe); - if (rv != 0) { - goto fail; - } - rv = nni_aio_init(&ppipe->aio_recv, nni_resp_recv_cb, ppipe); - if (rv != 0) { - goto fail; - } - rv = nni_aio_init(&ppipe->aio_getq, nni_resp_getq_cb, ppipe); - if (rv != 0) { - goto fail; - } - rv = nni_aio_init(&ppipe->aio_send, nni_resp_send_cb, ppipe); - if (rv != 0) { - goto fail; + NNI_FREE_STRUCT(ppipe); + return (rv); } + nni_aio_init(&ppipe->aio_putq, nni_resp_putq_cb, ppipe); + nni_aio_init(&ppipe->aio_recv, nni_resp_recv_cb, ppipe); + nni_aio_init(&ppipe->aio_getq, nni_resp_getq_cb, ppipe); + nni_aio_init(&ppipe->aio_send, nni_resp_send_cb, ppipe); ppipe->npipe = npipe; ppipe->psock = psock; *pp = ppipe; return (0); - -fail: - nni_resp_pipe_fini(ppipe); - return (rv); } static void diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index cb90c13f..2a32f289 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -70,19 +70,13 @@ static int nni_surv_sock_init(void **sp, nni_sock *nsock) { nni_surv_sock *psock; - int rv; if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&psock->mtx)) != 0) { - goto fail; - } - rv = nni_aio_init(&psock->aio_getq, nni_surv_sock_getq_cb, psock); - if (rv != 0) { - goto fail; - } NNI_LIST_INIT(&psock->pipes, nni_surv_pipe, node); + nni_mtx_init(&psock->mtx); + nni_aio_init(&psock->aio_getq, nni_surv_sock_getq_cb, psock); nni_timer_init(&psock->timer, nni_surv_timeout, psock); psock->nextid = nni_random(); @@ -96,10 +90,6 @@ nni_surv_sock_init(void **sp, nni_sock *nsock) *sp = psock; nni_sock_recverr(nsock, NNG_ESTATE); return (0); - -fail: - nni_surv_sock_fini(psock); - return (rv); } static void @@ -143,32 +133,19 @@ nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock) } // This depth could be tunable. if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) { - goto failed; - } - rv = nni_aio_init(&ppipe->aio_getq, nni_surv_getq_cb, ppipe); - if (rv != 0) { - goto failed; - } - rv = nni_aio_init(&ppipe->aio_putq, nni_surv_putq_cb, ppipe); - if (rv != 0) { - goto failed; - } - rv = nni_aio_init(&ppipe->aio_send, nni_surv_send_cb, ppipe); - if (rv != 0) { - goto failed; - } - rv = nni_aio_init(&ppipe->aio_recv, nni_surv_recv_cb, ppipe); - if (rv != 0) { - goto failed; + NNI_FREE_STRUCT(ppipe); + return (rv); } + + nni_aio_init(&ppipe->aio_getq, nni_surv_getq_cb, ppipe); + nni_aio_init(&ppipe->aio_putq, nni_surv_putq_cb, ppipe); + nni_aio_init(&ppipe->aio_send, nni_surv_send_cb, ppipe); + nni_aio_init(&ppipe->aio_recv, nni_surv_recv_cb, ppipe); + ppipe->npipe = npipe; ppipe->psock = psock; *pp = ppipe; return (0); - -failed: - nni_surv_pipe_fini(ppipe); - return (rv); } static int diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 3bc24c41..08cf99a2 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -65,14 +65,9 @@ static nni_inproc_global nni_inproc; static int nni_inproc_init(void) { - int rv; - NNI_LIST_INIT(&nni_inproc.servers, nni_inproc_ep, node); - if ((rv = nni_mtx_init(&nni_inproc.mx)) != 0) { - return (rv); - } - + nni_mtx_init(&nni_inproc.mx); return (0); } @@ -309,8 +304,7 @@ nni_inproc_accept_clients(nni_inproc_ep *server) continue; } - if (((rv = nni_mtx_init(&pair->mx)) != 0) || - ((rv = nni_msgq_init(&pair->q[0], 4)) != 0) || + if (((rv = nni_msgq_init(&pair->q[0], 4)) != 0) || ((rv = nni_msgq_init(&pair->q[1], 4)) != 0)) { nni_inproc_pair_destroy(pair); nni_inproc_conn_finish(caio, rv); @@ -318,6 +312,8 @@ nni_inproc_accept_clients(nni_inproc_ep *server) continue; } + nni_mtx_init(&pair->mx); + pair->pipes[0] = caio->a_pipe; pair->pipes[1] = saio->a_pipe; pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0]; diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 7d976122..0b0e487f 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -107,18 +107,14 @@ static int nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) { nni_ipc_pipe *p; - int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_mtx_init(&p->mtx)) != 0) || - ((rv = nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p)) != 0)) { - nni_ipc_pipe_fini(p); - return (rv); - } + nni_mtx_init(&p->mtx); + nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p); + nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p); + nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p); p->proto = ep->proto; p->rcvmax = ep->rcvmax; @@ -490,12 +486,14 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock, int mode) if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_mtx_init(&ep->mtx)) != 0) || - ((rv = nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep)) != 0) || - ((rv = nni_plat_ipc_ep_init(&ep->iep, url, mode)) != 0)) { - nni_ipc_ep_fini(ep); + if ((rv = nni_plat_ipc_ep_init(&ep->iep, url, mode)) != 0) { + NNI_FREE_STRUCT(ep); return (rv); } + + nni_mtx_init(&ep->mtx); + nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep); + ep->closed = 0; ep->proto = nni_sock_proto(sock); ep->rcvmax = nni_sock_rcvmaxsz(sock); diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 4d47733b..b3136b35 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -107,18 +107,15 @@ static int nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp) { nni_tcp_pipe *p; - int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_mtx_init(&p->mtx)) != 0) || - ((rv = nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p)) != 0)) { - nni_tcp_pipe_fini(p); - return (rv); - } + nni_mtx_init(&p->mtx); + nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p); + nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p); + nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p); + p->proto = ep->proto; p->rcvmax = ep->rcvmax; p->tpp = tpp; @@ -555,12 +552,13 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode) if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_mtx_init(&ep->mtx)) != 0) || - ((rv = nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep)) != 0) || - ((rv = nni_plat_tcp_ep_init(&ep->tep, url, mode)) != 0)) { - nni_tcp_ep_fini(ep); + if ((rv = nni_plat_tcp_ep_init(&ep->tep, url, mode)) != 0) { + NNI_FREE_STRUCT(ep); return (rv); } + nni_mtx_init(&ep->mtx); + nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep); + ep->closed = 0; ep->proto = nni_sock_proto(sock); ep->rcvmax = nni_sock_rcvmaxsz(sock); |
