aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-15 21:59:55 -0700
committerGarrett D'Amore <garrett@damore.org>2017-08-16 18:31:42 -0700
commita9633313ec8e578c805cd53b37ba3360d83157bc (patch)
tree14d32c4031ea1c8508a75469407ca77e353fa315 /src
parente7e2a6c14f0317eb77711951c6f1a650d4013dfe (diff)
downloadnng-a9633313ec8e578c805cd53b37ba3360d83157bc.tar.gz
nng-a9633313ec8e578c805cd53b37ba3360d83157bc.tar.bz2
nng-a9633313ec8e578c805cd53b37ba3360d83157bc.zip
Provide versions of mutex, condvar, and aio init that never fail.
If the underlying platform fails (FreeBSD is the only one I'm aware of that does this!), we use a global lock or condition variable instead. This means that our lock initializers never ever fail. Probably we could eliminate most of this for Linux and Darwin, since on those platforms, mutex and condvar initialization reasonably never fails. Initial benchmarks show little difference either way -- so we can revisit (optimize) later. This removes a lot of otherwise untested code in error cases and so forth, improving coverage and resilience in the face of allocation failures. Platforms other than POSIX should follow a similar pattern if they need this. (VxWorks, I'm thinking of you.) Most sane platforms won't have an issue here, since normally these initializations do not need to allocate memory. (Reportedly, even FreeBSD has plans to "fix" this in libthr2.) While here, some bugs were fixed in initialization & teardown. The fallback code is properly tested with dedicated test cases.
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c55
-rw-r--r--src/core/aio.h2
-rw-r--r--src/core/defs.h4
-rw-r--r--src/core/endpt.c19
-rw-r--r--src/core/event.c3
-rw-r--r--src/core/event.h2
-rw-r--r--src/core/idhash.c6
-rw-r--r--src/core/init.c4
-rw-r--r--src/core/msgqueue.c35
-rw-r--r--src/core/pipe.c15
-rw-r--r--src/core/platform.h21
-rw-r--r--src/core/random.c6
-rw-r--r--src/core/socket.c41
-rw-r--r--src/core/taskq.c9
-rw-r--r--src/core/thread.c20
-rw-r--r--src/core/thread.h8
-rw-r--r--src/core/timer.c9
-rw-r--r--src/core/transport.c5
-rw-r--r--src/nng.c2
-rw-r--r--src/platform/posix/posix_epdesc.c11
-rw-r--r--src/platform/posix/posix_impl.h17
-rw-r--r--src/platform/posix/posix_pipe.c1
-rw-r--r--src/platform/posix/posix_pipedesc.c5
-rw-r--r--src/platform/posix/posix_pollq_poll.c7
-rw-r--r--src/platform/posix/posix_resolv_gai.c5
-rw-r--r--src/platform/posix/posix_thread.c417
-rw-r--r--src/platform/posix/posix_udp.c5
-rw-r--r--src/platform/windows/win_iocp.c13
-rw-r--r--src/platform/windows/win_ipc.c7
-rw-r--r--src/platform/windows/win_net.c2
-rw-r--r--src/platform/windows/win_pipe.c2
-rw-r--r--src/platform/windows/win_resolv.c5
-rw-r--r--src/platform/windows/win_thread.c6
-rw-r--r--src/protocol/bus/bus.c57
-rw-r--r--src/protocol/pair/pair_v0.c27
-rw-r--r--src/protocol/pair/pair_v1.c50
-rw-r--r--src/protocol/pipeline/pull.c13
-rw-r--r--src/protocol/pipeline/push.c17
-rw-r--r--src/protocol/pubsub/pub.c36
-rw-r--r--src/protocol/pubsub/sub.c9
-rw-r--r--src/protocol/reqrep/rep.c46
-rw-r--r--src/protocol/reqrep/req.c42
-rw-r--r--src/protocol/survey/respond.c48
-rw-r--r--src/protocol/survey/survey.c43
-rw-r--r--src/transport/inproc/inproc.c12
-rw-r--r--src/transport/ipc/ipc.c22
-rw-r--r--src/transport/tcp/tcp.c22
47 files changed, 569 insertions, 644 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index a39c0118..fe9bcde8 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -56,20 +56,14 @@ static nni_list nni_aio_expire_aios;
static void nni_aio_expire_add(nni_aio *);
-int
+void
nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
{
- int rv;
-
memset(aio, 0, sizeof(*aio));
- if ((rv = nni_cv_init(&aio->a_cv, &nni_aio_lk)) != 0) {
- return (rv);
- }
+ nni_cv_init(&aio->a_cv, &nni_aio_lk);
aio->a_expire = NNI_TIME_NEVER;
aio->a_init = 1;
nni_task_init(NULL, &aio->a_task, cb, arg);
-
- return (0);
}
void
@@ -350,46 +344,43 @@ nni_aio_expire_loop(void *arg)
}
}
-int
-nni_aio_sys_init(void)
+void
+nni_aio_sys_fini(void)
{
- int rv;
nni_mtx *mtx = &nni_aio_lk;
nni_cv * cv = &nni_aio_expire_cv;
nni_thr *thr = &nni_aio_expire_thr;
- if (((rv = nni_mtx_init(mtx)) != 0) ||
- ((rv = nni_cv_init(cv, mtx)) != 0) ||
- ((rv = nni_thr_init(thr, nni_aio_expire_loop, NULL)) != 0)) {
- goto fail;
+ if (nni_aio_expire_run) {
+ nni_mtx_lock(mtx);
+ nni_aio_expire_run = 0;
+ nni_cv_wake(cv);
+ nni_mtx_unlock(mtx);
}
- NNI_LIST_INIT(&nni_aio_expire_aios, nni_aio, a_expire_node);
- nni_aio_expire_run = 1;
- nni_thr_run(thr);
- return (0);
-fail:
nni_thr_fini(thr);
nni_cv_fini(cv);
nni_mtx_fini(mtx);
- return (rv);
}
-void
-nni_aio_sys_fini(void)
+int
+nni_aio_sys_init(void)
{
+ int rv;
nni_mtx *mtx = &nni_aio_lk;
nni_cv * cv = &nni_aio_expire_cv;
nni_thr *thr = &nni_aio_expire_thr;
- if (nni_aio_expire_run) {
- nni_mtx_lock(mtx);
- nni_aio_expire_run = 0;
- nni_cv_wake(cv);
- nni_mtx_unlock(mtx);
+ NNI_LIST_INIT(&nni_aio_expire_aios, nni_aio, a_expire_node);
+ nni_mtx_init(mtx);
+ nni_cv_init(cv, mtx);
+
+ if ((rv = nni_thr_init(thr, nni_aio_expire_loop, NULL)) != 0) {
+ nni_aio_sys_fini();
+ return (rv);
}
- nni_thr_fini(thr);
- nni_cv_fini(cv);
- nni_mtx_fini(mtx);
-} \ No newline at end of file
+ nni_aio_expire_run = 1;
+ nni_thr_run(thr);
+ return (0);
+}
diff --git a/src/core/aio.h b/src/core/aio.h
index d48442eb..aabb3fa9 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -66,7 +66,7 @@ struct nni_aio {
// the supplied argument when the operation is complete. If NULL is
// supplied for the callback, then nni_aio_wake is used in its place,
// and the aio is used for the argument.
-extern int nni_aio_init(nni_aio *, nni_cb, void *);
+extern void nni_aio_init(nni_aio *, nni_cb, void *);
// nni_aio_fini finalizes the aio, releasing resources (locks)
// associated with it. The caller is responsible for ensuring that any
diff --git a/src/core/defs.h b/src/core/defs.h
index 69fe4843..4d8e6ffb 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -17,9 +17,13 @@
// superior, support for such are not universal.
#define NNI_ARG_UNUSED(x) ((void) x);
+#ifndef NDEBUG
#define NNI_ASSERT(x) \
if (!(x)) \
nni_panic("%s: %d: assert err: %s", __FILE__, __LINE__, #x)
+#else
+#define NNI_ASSERT(x)
+#endif
// These types are common but have names shared with user space.
typedef struct nng_msg nni_msg;
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 0ab35ea3..34debc0e 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -57,10 +57,10 @@ nni_ep_sys_init(void)
{
int rv;
- if (((rv = nni_mtx_init(&nni_ep_lk)) != 0) ||
- ((rv = nni_idhash_init(&nni_eps)) != 0)) {
+ if ((rv = nni_idhash_init(&nni_eps)) != 0) {
return (rv);
}
+ nni_mtx_init(&nni_ep_lk);
nni_idhash_set_limits(
nni_eps, 1, 0x7fffffff, nni_random() & 0x7fffffff);
@@ -152,13 +152,14 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode)
nni_pipe_ep_list_init(&ep->ep_pipes);
- if (((rv = nni_mtx_init(&ep->ep_mtx)) != 0) ||
- ((rv = nni_cv_init(&ep->ep_cv, &ep->ep_mtx)) != 0) ||
- ((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) ||
- ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) ||
+ nni_mtx_init(&ep->ep_mtx);
+ nni_cv_init(&ep->ep_cv, &ep->ep_mtx);
+ nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep);
+ nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep);
+ nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep);
+ nni_aio_init(&ep->ep_con_syn, NULL, NULL);
+
+ if (((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) ||
((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) ||
((rv = nni_sock_ep_add(s, ep)) != 0)) {
nni_ep_destroy(ep);
diff --git a/src/core/event.c b/src/core/event.c
index 79910d80..ab157b02 100644
--- a/src/core/event.c
+++ b/src/core/event.c
@@ -12,13 +12,12 @@
#include <stdlib.h>
#include <string.h>
-int
+void
nni_ev_init(nni_event *event, int type, nni_sock *sock)
{
memset(event, 0, sizeof(*event));
event->e_type = type;
event->e_sock = sock;
- return (0);
}
void
diff --git a/src/core/event.h b/src/core/event.h
index 6d9a9394..22af096e 100644
--- a/src/core/event.h
+++ b/src/core/event.h
@@ -28,7 +28,7 @@ struct nng_notify {
nni_aio n_aio;
};
-extern int nni_ev_init(nni_event *, int, nni_sock *);
+extern void nni_ev_init(nni_event *, int, nni_sock *);
extern void nni_ev_fini(nni_event *);
#endif // CORE_EVENT_H
diff --git a/src/core/idhash.c b/src/core/idhash.c
index ab6b67e1..03854fc8 100644
--- a/src/core/idhash.c
+++ b/src/core/idhash.c
@@ -36,15 +36,11 @@ int
nni_idhash_init(nni_idhash **hp)
{
nni_idhash *h;
- int rv;
if ((h = NNI_ALLOC_STRUCT(h)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&h->ih_mtx)) != 0) {
- NNI_FREE_STRUCT(h);
- return (rv);
- }
+ nni_mtx_init(&h->ih_mtx);
h->ih_entries = NULL;
h->ih_count = 0;
h->ih_load = 0;
diff --git a/src/core/init.c b/src/core/init.c
index 24a7b118..4025c0f4 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -1,5 +1,5 @@
//
-// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -39,8 +39,6 @@ nni_init(void)
void
nni_fini(void)
{
- // XXX: We should make sure that underlying sockets and
- // file descriptors are closed. Details TBD.
nni_tran_sys_fini();
nni_pipe_sys_fini();
nni_ep_sys_fini();
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 6530857e..ef7dd5a2 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -39,7 +39,6 @@ int
nni_msgq_init(nni_msgq **mqp, unsigned cap)
{
struct nni_msgq *mq;
- int rv;
int alloc;
// We allocate 2 extra cells in the fifo. One to accommodate a
@@ -52,21 +51,18 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
if ((mq = NNI_ALLOC_STRUCT(mq)) == NULL) {
return (NNG_ENOMEM);
}
+ if ((mq->mq_msgs = nni_alloc(sizeof(nng_msg *) * alloc)) == NULL) {
+ NNI_FREE_STRUCT(mq);
+ return (NNG_ENOMEM);
+ }
+
nni_aio_list_init(&mq->mq_aio_putq);
nni_aio_list_init(&mq->mq_aio_getq);
nni_aio_list_init(&mq->mq_aio_notify_get);
nni_aio_list_init(&mq->mq_aio_notify_put);
- if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) {
- goto fail;
- }
- if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) {
- goto fail;
- }
- if ((mq->mq_msgs = nni_alloc(sizeof(nng_msg *) * alloc)) == NULL) {
- rv = NNG_ENOMEM;
- goto fail;
- }
+ nni_mtx_init(&mq->mq_lock);
+ nni_cv_init(&mq->mq_drained, &mq->mq_lock);
mq->mq_cap = cap;
mq->mq_alloc = alloc;
@@ -80,15 +76,6 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
*mqp = mq;
return (0);
-
-fail:
- nni_cv_fini(&mq->mq_drained);
- nni_mtx_fini(&mq->mq_lock);
- if (mq->mq_msgs != NULL) {
- nni_free(mq->mq_msgs, sizeof(nng_msg *) * alloc);
- }
- NNI_FREE_STRUCT(mq);
- return (rv);
}
void
@@ -413,9 +400,7 @@ nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
nni_aio aio;
int rv;
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- return (rv);
- }
+ nni_aio_init(&aio, NULL, NULL);
aio.a_expire = expire;
nni_msgq_aio_get(mq, &aio);
nni_aio_wait(&aio);
@@ -433,9 +418,7 @@ nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
nni_aio aio;
int rv;
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- return (rv);
- }
+ nni_aio_init(&aio, NULL, NULL);
aio.a_expire = expire;
aio.a_msg = msg;
nni_msgq_aio_put(mq, &aio);
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 75c4c8d6..cdbeb6a7 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -49,9 +49,10 @@ nni_pipe_sys_init(void)
int rv;
NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node);
+ nni_mtx_init(&nni_pipe_reap_lk);
+ nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk);
+
if (((rv = nni_idhash_init(&nni_pipes)) != 0) ||
- ((rv = nni_mtx_init(&nni_pipe_reap_lk)) != 0) ||
- ((rv = nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk)) != 0) ||
((rv = nni_thr_init(&nni_pipe_reap_thr, nni_pipe_reaper, 0)) !=
0)) {
return (rv);
@@ -240,11 +241,11 @@ nni_pipe_create(nni_ep *ep, void *tdata)
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
- if (((rv = nni_mtx_init(&p->p_mtx)) != 0) ||
- ((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0) ||
- ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) !=
- 0) ||
- ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) ||
+ nni_mtx_init(&p->p_mtx);
+ nni_cv_init(&p->p_cv, &p->p_mtx);
+ nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p);
+
+ if (((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) ||
((rv = nni_ep_pipe_add(ep, p)) != 0) ||
((rv = nni_sock_pipe_add(sock, p)) != 0)) {
nni_pipe_destroy(p);
diff --git a/src/core/platform.h b/src/core/platform.h
index 7acf16ef..f6f0b974 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -85,10 +85,9 @@ typedef struct nni_plat_thr nni_plat_thr;
// Threading & Synchronization Support
//
-// nni_plat_mtx_init initializes a mutex structure. This may require dynamic
-// allocation, depending on the platform. It can return NNG_ENOMEM if that
-// fails. An initialized mutex must be distinguishable from zeroed memory.
-extern int nni_plat_mtx_init(nni_plat_mtx *);
+// nni_plat_mtx_init initializes a mutex structure. An initialized mutex must
+// be distinguishable from zeroed memory.
+extern void nni_plat_mtx_init(nni_plat_mtx *);
// nni_plat_mtx_fini destroys the mutex and releases any resources allocated
// for it's use. If the mutex is zeroed memory, this should do nothing.
@@ -99,20 +98,14 @@ extern void nni_plat_mtx_fini(nni_plat_mtx *);
extern void nni_plat_mtx_lock(nni_plat_mtx *);
// nni_plat_mtx_unlock unlocks the mutex. This can only be performed by the
-// threadthat owned the mutex.
+// thread that owned the mutex.
extern void nni_plat_mtx_unlock(nni_plat_mtx *);
-// nni_plat_mtx_tryenter tries to lock the mutex. If it can't, it may return
-// NNG_EBUSY if the mutex is already owned.
-extern int nni_plat_mtx_trylock(nni_plat_mtx *);
-
// nni_plat_cv_init initializes a condition variable. We require a mutex be
// supplied with it, and that mutex must always be held when performing any
-// operations on the condition variable (other than fini.) This may require
-// dynamic allocation, and if so this operation may fail with NNG_ENOMEM.
-// As with mutexes, an initialized mutex should be distinguishable from
-// zeroed memory.
-extern int nni_plat_cv_init(nni_plat_cv *, nni_plat_mtx *);
+// operations on the condition variable (other than fini.) As with mutexes, an
+// initialized mutex should be distinguishable from zeroed memory.
+extern void nni_plat_cv_init(nni_plat_cv *, nni_plat_mtx *);
// nni_plat_cv_fini releases all resources associated with condition variable.
// If the cv points to just zeroed memory (was never initialized), it does
diff --git a/src/core/random.c b/src/core/random.c
index eb0b4fc2..febd0848 100644
--- a/src/core/random.c
+++ b/src/core/random.c
@@ -170,12 +170,8 @@ nni_random_sys_init(void)
{
// minimally, grab the system clock
nni_isaac_ctx *ctx = &nni_random_ctx;
- int rv;
-
- if ((rv = nni_mtx_init(&ctx->mx)) != 0) {
- return (rv);
- }
+ nni_mtx_init(&ctx->mx);
nni_plat_seed_prng(ctx->randrsl, sizeof(ctx->randrsl));
nni_isaac_randinit(ctx, 1);
return (0);
diff --git a/src/core/socket.c b/src/core/socket.c
index 6a243650..9aa89a2d 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -231,7 +231,6 @@ nni_notify *
nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg)
{
nni_notify *notify;
- int rv;
if ((notify = NNI_ALLOC_STRUCT(notify)) == NULL) {
return (NULL);
@@ -244,31 +243,19 @@ nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg)
switch (type) {
case NNG_EV_CAN_RCV:
- rv = nni_aio_init(&notify->n_aio, nni_sock_canrecv_cb, notify);
- if (rv != 0) {
- goto fail;
- }
+ nni_aio_init(&notify->n_aio, nni_sock_canrecv_cb, notify);
nni_msgq_aio_notify_get(sock->s_urq, &notify->n_aio);
break;
case NNG_EV_CAN_SND:
- rv = nni_aio_init(&notify->n_aio, nni_sock_cansend_cb, notify);
- if (rv != 0) {
- goto fail;
- }
+ nni_aio_init(&notify->n_aio, nni_sock_cansend_cb, notify);
nni_msgq_aio_notify_put(sock->s_uwq, &notify->n_aio);
break;
default:
- rv = NNG_ENOTSUP;
- goto fail;
- break;
+ NNI_FREE_STRUCT(notify);
+ return (NULL);
}
return (notify);
-
-fail:
- nni_aio_fini(&notify->n_aio);
- NNI_FREE_STRUCT(notify);
- return (NULL);
}
void
@@ -343,13 +330,13 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
NNI_LIST_NODE_INIT(&s->s_node);
nni_pipe_sock_list_init(&s->s_pipes);
nni_ep_list_init(&s->s_eps);
+ nni_mtx_init(&s->s_mx);
+ nni_cv_init(&s->s_cv, &s->s_mx);
+ nni_cv_init(&s->s_close_cv, &nni_sock_lk);
+ nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s);
+ nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s);
- if (((rv = nni_mtx_init(&s->s_mx)) != 0) ||
- ((rv = nni_cv_init(&s->s_cv, &s->s_mx)) != 0) ||
- ((rv = nni_cv_init(&s->s_close_cv, &nni_sock_lk)) != 0) ||
- ((rv = nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s)) != 0) ||
- ((rv = nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s)) != 0) ||
- ((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
+ if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
((rv = nni_msgq_init(&s->s_urq, 0)) != 0) ||
((rv = s->s_sock_ops.sock_init(&s->s_data, s)) != 0)) {
nni_sock_destroy(s);
@@ -365,8 +352,9 @@ nni_sock_sys_init(void)
int rv;
NNI_LIST_INIT(&nni_sock_list, nni_sock, s_node);
- if (((rv = nni_idhash_init(&nni_sock_hash)) != 0) ||
- ((rv = nni_mtx_init(&nni_sock_lk)) != 0)) {
+ nni_mtx_init(&nni_sock_lk);
+
+ if ((rv = nni_idhash_init(&nni_sock_hash)) != 0) {
nni_sock_sys_fini();
} else {
nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1);
@@ -562,6 +550,9 @@ nni_sock_closeall(void)
{
nni_sock *s;
+ if (nni_sock_hash == NULL) {
+ return;
+ }
for (;;) {
nni_mtx_lock(&nni_sock_lk);
if ((s = nni_list_first(&nni_sock_list)) == NULL) {
diff --git a/src/core/taskq.c b/src/core/taskq.c
index e0fc456c..d0f04e8a 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -85,12 +85,9 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
tq->tq_nthreads = nthr;
NNI_LIST_INIT(&tq->tq_tasks, nni_task, task_node);
- if (((rv = nni_mtx_init(&tq->tq_mtx)) != 0) ||
- ((rv = nni_cv_init(&tq->tq_sched_cv, &tq->tq_mtx)) != 0) ||
- ((rv = nni_cv_init(&tq->tq_wait_cv, &tq->tq_mtx)) != 0)) {
- nni_taskq_fini(tq);
- return (rv);
- }
+ nni_mtx_init(&tq->tq_mtx);
+ nni_cv_init(&tq->tq_sched_cv, &tq->tq_mtx);
+ nni_cv_init(&tq->tq_wait_cv, &tq->tq_mtx);
for (i = 0; i < nthr; i++) {
tq->tq_threads[i].tqt_tq = tq;
diff --git a/src/core/thread.c b/src/core/thread.c
index 6c3bd9f3..54c9c7d2 100644
--- a/src/core/thread.c
+++ b/src/core/thread.c
@@ -9,10 +9,10 @@
#include "core/nng_impl.h"
-int
+void
nni_mtx_init(nni_mtx *mtx)
{
- return (nni_plat_mtx_init(mtx));
+ nni_plat_mtx_init(mtx);
}
void
@@ -33,10 +33,10 @@ nni_mtx_unlock(nni_mtx *mtx)
nni_plat_mtx_unlock(mtx);
}
-int
+void
nni_cv_init(nni_cv *cv, nni_mtx *mtx)
{
- return (nni_plat_cv_init(cv, mtx));
+ nni_plat_cv_init(cv, mtx);
}
void
@@ -110,15 +110,9 @@ nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg)
thr->fn = fn;
thr->arg = arg;
- if ((rv = nni_plat_mtx_init(&thr->mtx)) != 0) {
- thr->done = 1;
- return (rv);
- }
- if ((rv = nni_plat_cv_init(&thr->cv, &thr->mtx)) != 0) {
- nni_plat_mtx_fini(&thr->mtx);
- thr->done = 1;
- return (rv);
- }
+ nni_plat_mtx_init(&thr->mtx);
+ nni_plat_cv_init(&thr->cv, &thr->mtx);
+
if (fn == NULL) {
thr->init = 1;
thr->done = 1;
diff --git a/src/core/thread.h b/src/core/thread.h
index 94b2a984..ee83b196 100644
--- a/src/core/thread.h
+++ b/src/core/thread.h
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -25,9 +26,8 @@ struct nni_thr {
int init;
};
-// nni_mtx_init initializes the mutex. (Win32 programmers take note;
-// our mutexes are actually CriticalSections on Win32.)
-extern int nni_mtx_init(nni_mtx *mtx);
+// nni_mtx_init initializes the mutex.
+extern void nni_mtx_init(nni_mtx *mtx);
// nni_mtx_fini destroys the mutex and releases any resources used by it.
extern void nni_mtx_fini(nni_mtx *mtx);
@@ -43,7 +43,7 @@ extern void nni_mtx_unlock(nni_mtx *mtx);
// nni_cv_init initializes the condition variable. The mutex supplied
// must always be locked with the condition variable.
-extern int nni_cv_init(nni_cv *cv, nni_mtx *);
+extern void nni_cv_init(nni_cv *cv, nni_mtx *);
// nni_cv_fini releases resources associated with the condition variable,
// which must not be in use at the time.
diff --git a/src/core/timer.c b/src/core/timer.c
index 73bc7604..7608f7d5 100644
--- a/src/core/timer.c
+++ b/src/core/timer.c
@@ -40,10 +40,11 @@ nni_timer_sys_init(void)
memset(timer, 0, sizeof(*timer));
NNI_LIST_INIT(&timer->t_entries, nni_timer_node, t_node);
- if (((rv = nni_mtx_init(&timer->t_mx)) != 0) ||
- ((rv = nni_cv_init(&timer->t_sched_cv, &timer->t_mx)) != 0) ||
- ((rv = nni_cv_init(&timer->t_wait_cv, &timer->t_mx)) != 0) ||
- ((rv = nni_thr_init(&timer->t_thr, nni_timer_loop, timer)) != 0)) {
+ nni_mtx_init(&timer->t_mx);
+ nni_cv_init(&timer->t_sched_cv, &timer->t_mx);
+ nni_cv_init(&timer->t_wait_cv, &timer->t_mx);
+
+ if ((rv = nni_thr_init(&timer->t_thr, nni_timer_loop, timer)) != 0) {
nni_timer_sys_fini();
return (rv);
}
diff --git a/src/core/transport.c b/src/core/transport.c
index 278c7d1e..6f73a6b2 100644
--- a/src/core/transport.c
+++ b/src/core/transport.c
@@ -84,8 +84,9 @@ nni_tran_sys_init(void)
int rv;
NNI_LIST_INIT(&nni_tran_list, nni_transport, t_node);
- if (((rv = nni_mtx_init(&nni_tran_lk)) != 0) ||
- ((rv = nni_tran_register(&nni_inproc_tran)) != 0) ||
+ nni_mtx_init(&nni_tran_lk);
+
+ if (((rv = nni_tran_register(&nni_inproc_tran)) != 0) ||
((rv = nni_tran_register(&nni_ipc_tran)) != 0) ||
((rv = nni_tran_register(&nni_tcp_tran)) != 0)) {
nni_tran_sys_fini();
diff --git a/src/nng.c b/src/nng.c
index 822d2713..a833eeed 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -24,6 +24,7 @@
void
nng_fini(void)
{
+ nni_sock_closeall();
nni_fini();
}
@@ -184,7 +185,6 @@ nng_free(void *buf, size_t sz)
int
nng_sendmsg(nng_socket sid, nng_msg *msg, int flags)
{
- nni_time expire;
int rv;
nni_sock *sock;
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 46fe2bea..f511f35f 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -126,10 +126,8 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed)
static void
nni_posix_epdesc_doaccept(nni_posix_epdesc *ed)
{
- nni_aio * aio;
- int newfd;
- struct sockaddr_storage ss;
- socklen_t slen;
+ nni_aio *aio;
+ int newfd;
while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
// We could argue that knowing the remote peer address would
@@ -456,10 +454,7 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url)
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&ed->mtx)) != 0) {
- NNI_FREE_STRUCT(ed);
- return (rv);
- }
+ nni_mtx_init(&ed->mtx);
// We could randomly choose a different pollq, or for efficiencies
// sake we could take a modulo of the file desc number to choose
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 46ebbc1d..81fbd48b 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -54,9 +54,19 @@ extern int nni_plat_errno(int);
// elsewhere.
struct nni_plat_mtx {
- int init;
pthread_t owner;
pthread_mutex_t mtx;
+ int fallback;
+ int flags;
+};
+
+struct nni_plat_cv {
+ pthread_cond_t cv;
+ nni_plat_mtx * mtx;
+ int fallback;
+ int flags;
+ int gen;
+ int wake;
};
struct nni_plat_thr {
@@ -65,11 +75,6 @@ struct nni_plat_thr {
void *arg;
};
-struct nni_plat_cv {
- pthread_cond_t cv;
- nni_plat_mtx * mtx;
-};
-
#endif
extern int nni_posix_pollq_sysinit(void);
diff --git a/src/platform/posix/posix_pipe.c b/src/platform/posix/posix_pipe.c
index 78415d26..314e2f5e 100644
--- a/src/platform/posix/posix_pipe.c
+++ b/src/platform/posix/posix_pipe.c
@@ -105,7 +105,6 @@ void
nni_plat_pipe_clear(int rfd)
{
char buf[32];
- int rv;
for (;;) {
// Completely drain the pipe, but don't wait. This coalesces
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index bd74e0c0..b2c1cb1f 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -302,10 +302,6 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
// one. For now we just have a global pollq. Note that by tying
// the pd to a single pollq we may get some kind of cache warmth.
- if ((rv = nni_mtx_init(&pd->mtx)) != 0) {
- NNI_FREE_STRUCT(pd);
- return (rv);
- }
pd->closed = 0;
pd->node.fd = fd;
pd->node.cb = nni_posix_pipedesc_cb;
@@ -313,6 +309,7 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ nni_mtx_init(&pd->mtx);
nni_aio_list_init(&pd->readq);
nni_aio_list_init(&pd->writeq);
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index d3fdf394..df5a1799 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -347,9 +347,10 @@ nni_posix_pollq_init(nni_posix_pollq *pq)
pq->wakerfd = -1;
pq->close = 0;
- if (((rv = nni_mtx_init(&pq->mtx)) != 0) ||
- ((rv = nni_cv_init(&pq->cv, &pq->mtx)) != 0) ||
- ((rv = nni_posix_pollq_poll_grow(pq)) != 0) ||
+ nni_mtx_init(&pq->mtx);
+ nni_cv_init(&pq->cv, &pq->mtx);
+
+ if (((rv = nni_posix_pollq_poll_grow(pq)) != 0) ||
((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) ||
((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) {
nni_posix_pollq_fini(pq);
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index 09d40b94..dce8270a 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -276,9 +276,8 @@ nni_posix_resolv_sysinit(void)
{
int rv;
- if ((rv = nni_mtx_init(&nni_posix_resolv_mtx)) != 0) {
- return (rv);
- }
+ nni_mtx_init(&nni_posix_resolv_mtx);
+
if ((rv = nni_taskq_init(&nni_posix_resolv_tq, 4)) != 0) {
nni_mtx_fini(&nni_posix_resolv_mtx);
return (rv);
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index 0ef4754c..44bfbed2 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -24,118 +24,325 @@
#include <time.h>
#include <unistd.h>
-static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER;
-static int nni_plat_inited = 0;
-static int nni_plat_forked = 0;
+static pthread_mutex_t nni_plat_init_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t nni_plat_cond_cond = PTHREAD_COND_INITIALIZER;
+static pthread_cond_t nni_plat_lock_cond = PTHREAD_COND_INITIALIZER;
+static int nni_plat_inited = 0;
+static int nni_plat_forked = 0;
+
+pthread_condattr_t nni_cvattr;
+pthread_mutexattr_t nni_mxattr;
+
+#ifndef NDEBUG
+int nni_plat_sync_fallback = 0;
+#endif
-pthread_condattr_t nni_cvattr;
-pthread_mutexattr_t nni_mxattr;
-static pthread_attr_t nni_pthread_attr;
+enum nni_plat_sync_flags {
+ NNI_PLAT_SYNC_INIT = 0x01,
+ NNI_PLAT_SYNC_LOCKED = 0x04,
+ NNI_PLAT_SYNC_WAIT = 0x08,
+};
-// We open a /dev/null file descriptor so that we can dup2() it to
-// cause MacOS X to wakeup. This gives us a "safe" close semantic.
+void
+nni_plat_mtx_init(nni_plat_mtx *mtx)
+{
+ if (pthread_mutex_init(&mtx->mtx, &nni_mxattr) != 0) {
+ mtx->fallback = 1;
+ } else {
+ mtx->flags = NNI_PLAT_SYNC_INIT;
+ }
+#ifndef NDEBUG
+ if (nni_plat_sync_fallback || getenv("NNG_SYNC_FALLBACK")) {
+ mtx->fallback = 1;
+ }
+#endif
+}
-int nni_plat_devnull = -1;
+void
+nni_plat_mtx_fini(nni_plat_mtx *mtx)
+{
+ if (mtx->flags & NNI_PLAT_SYNC_INIT) {
+ int rv;
+ if ((rv = pthread_mutex_destroy(&mtx->mtx)) != 0) {
+ nni_panic("pthread_mutex_destroy: %s", strerror(rv));
+ }
+ }
+ mtx->flags = 0;
+}
-int
-nni_plat_mtx_init(nni_plat_mtx *mtx)
+static void
+nni_pthread_mutex_lock(pthread_mutex_t *m)
{
int rv;
- if ((rv = pthread_mutex_init(&mtx->mtx, &nni_mxattr)) != 0) {
- switch (rv) {
- case EAGAIN:
- case ENOMEM:
- return (NNG_ENOMEM);
+ if ((rv = pthread_mutex_lock(m)) != 0) {
+ nni_panic("pthread_mutex_lock: %s", strerror(rv));
+ }
+}
- default:
- nni_panic("pthread_mutex_init: %s", strerror(rv));
- }
+static void
+nni_pthread_mutex_unlock(pthread_mutex_t *m)
+{
+ int rv;
+
+ if ((rv = pthread_mutex_unlock(m)) != 0) {
+ nni_panic("pthread_mutex_unlock: %s", strerror(rv));
}
- mtx->init = 1;
- return (0);
}
-void
-nni_plat_mtx_fini(nni_plat_mtx *mtx)
+static void
+nni_pthread_cond_broadcast(pthread_cond_t *c)
{
int rv;
- if (!mtx->init) {
- return;
+ if ((rv = pthread_cond_broadcast(c)) != 0) {
+ nni_panic("pthread_cond_broadcast: %s", strerror(rv));
}
- pthread_mutex_lock(&mtx->mtx);
- pthread_mutex_unlock(&mtx->mtx);
- if ((rv = pthread_mutex_destroy(&mtx->mtx)) != 0) {
- nni_panic("pthread_mutex_fini: %s", strerror(rv));
+}
+
+static void
+nni_pthread_cond_signal(pthread_cond_t *c)
+{
+ int rv;
+ if ((rv = pthread_cond_signal(c)) != 0) {
+ nni_panic("pthread_cond_signal: %s", strerror(rv));
}
- mtx->init = 0;
}
-void
-nni_plat_mtx_lock(nni_plat_mtx *mtx)
+static void
+nni_pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m)
{
int rv;
- if ((rv = pthread_mutex_lock(&mtx->mtx)) != 0) {
- nni_panic("pthread_mutex_lock: %s", strerror(rv));
+ if ((rv = pthread_cond_wait(c, m)) != 0) {
+ nni_panic("pthread_cond_wait: %s", strerror(rv));
+ }
+}
+
+static int
+nni_pthread_cond_timedwait(
+ pthread_cond_t *c, pthread_mutex_t *m, struct timespec *ts)
+{
+ int rv;
+
+ switch ((rv = pthread_cond_timedwait(c, m, ts))) {
+ case 0:
+ return (0);
+ case ETIMEDOUT:
+ case EAGAIN:
+ return (NNG_ETIMEDOUT);
+ }
+ nni_panic("pthread_cond_timedwait: %s", strerror(rv));
+ return (NNG_EINVAL);
+}
+
+static void
+nni_plat_mtx_lock_fallback_locked(nni_plat_mtx *mtx)
+{
+ while (mtx->flags & NNI_PLAT_SYNC_LOCKED) {
+ mtx->flags |= NNI_PLAT_SYNC_WAIT;
+ nni_pthread_cond_wait(&nni_plat_lock_cond, &nni_plat_lock);
}
+ mtx->flags |= NNI_PLAT_SYNC_LOCKED;
mtx->owner = pthread_self();
}
+static void
+nni_plat_mtx_unlock_fallback_locked(nni_plat_mtx *mtx)
+{
+ NNI_ASSERT(mtx->flags & NNI_PLAT_SYNC_LOCKED);
+ mtx->flags &= ~NNI_PLAT_SYNC_LOCKED;
+ if (mtx->flags & NNI_PLAT_SYNC_WAIT) {
+ mtx->flags &= ~NNI_PLAT_SYNC_WAIT;
+ pthread_cond_broadcast(&nni_plat_lock_cond);
+ }
+}
+
+static void
+nni_plat_mtx_lock_fallback(nni_plat_mtx *mtx)
+{
+ nni_pthread_mutex_lock(&nni_plat_lock);
+ nni_plat_mtx_lock_fallback_locked(mtx);
+ nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static void
+nni_plat_mtx_unlock_fallback(nni_plat_mtx *mtx)
+{
+ nni_pthread_mutex_lock(&nni_plat_lock);
+ nni_plat_mtx_unlock_fallback_locked(mtx);
+ nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static void
+nni_plat_cv_wake_fallback(nni_cv *cv)
+{
+ nni_pthread_mutex_lock(&nni_plat_lock);
+ if (cv->flags & NNI_PLAT_SYNC_WAIT) {
+ cv->gen++;
+ cv->wake = 0;
+ nni_pthread_cond_broadcast(&nni_plat_cond_cond);
+ }
+ nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static void
+nni_plat_cv_wake1_fallback(nni_cv *cv)
+{
+ nni_pthread_mutex_lock(&nni_plat_lock);
+ if (cv->flags & NNI_PLAT_SYNC_WAIT) {
+ cv->wake++;
+ nni_pthread_cond_broadcast(&nni_plat_cond_cond);
+ }
+ nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static void
+nni_plat_cv_wait_fallback(nni_cv *cv)
+{
+ int gen;
+
+ nni_pthread_mutex_lock(&nni_plat_lock);
+ if (!cv->mtx->fallback) {
+ // transform the mutex to a fallback one. we have it held.
+ cv->mtx->fallback = 1;
+ cv->mtx->flags |= NNI_PLAT_SYNC_LOCKED;
+ nni_pthread_mutex_unlock(&cv->mtx->mtx);
+ }
+
+ NNI_ASSERT(cv->mtx->owner == pthread_self());
+ NNI_ASSERT(cv->mtx->flags & NNI_PLAT_SYNC_LOCKED);
+ gen = cv->gen;
+ while ((cv->gen == gen) && (cv->wake == 0)) {
+ nni_plat_mtx_unlock_fallback_locked(cv->mtx);
+ cv->flags |= NNI_PLAT_SYNC_WAIT;
+ nni_pthread_cond_wait(&nni_plat_cond_cond, &nni_plat_lock);
+
+ nni_plat_mtx_lock_fallback_locked(cv->mtx);
+ }
+ if (cv->wake > 0) {
+ cv->wake--;
+ }
+ nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static int
+nni_plat_cv_until_fallback(nni_cv *cv, struct timespec *ts)
+{
+ int gen;
+ int rv = 0;
+
+ if (!cv->mtx->fallback) {
+ // transform the mutex to a fallback one. we have it held.
+ cv->mtx->fallback = 1;
+ cv->mtx->flags |= NNI_PLAT_SYNC_LOCKED;
+ nni_pthread_mutex_unlock(&cv->mtx->mtx);
+ }
+
+ nni_pthread_mutex_lock(&nni_plat_lock);
+ gen = cv->gen;
+ while ((cv->gen == gen) && (cv->wake == 0)) {
+ nni_plat_mtx_unlock_fallback_locked(cv->mtx);
+ cv->flags |= NNI_PLAT_SYNC_WAIT;
+ rv = nni_pthread_cond_timedwait(
+ &nni_plat_cond_cond, &nni_plat_lock, ts);
+ nni_plat_mtx_lock_fallback_locked(cv->mtx);
+ if (rv != 0) {
+ break;
+ }
+ }
+ if ((rv == 0) && (cv->wake > 0)) {
+ cv->wake--;
+ }
+ nni_pthread_mutex_unlock(&nni_plat_lock);
+ return (rv);
+}
+
void
-nni_plat_mtx_unlock(nni_plat_mtx *mtx)
+nni_plat_mtx_lock(nni_plat_mtx *mtx)
{
int rv;
+ if (!mtx->fallback) {
+ nni_pthread_mutex_lock(&mtx->mtx);
+
+ // We might have changed to a fallback lock; make
+ // sure this did not occur. Note that transitions to
+ // fallback locks only happen when a thread accesses
+ // a condition variable already holding this lock,
+ // so this is guranteed to be safe.
+ if (!mtx->fallback) {
+ mtx->owner = pthread_self();
+ return;
+ }
+ nni_pthread_mutex_unlock(&mtx->mtx);
+ }
+
+ // Fallback mode
+ nni_plat_mtx_lock_fallback(mtx);
+}
+
+void
+nni_plat_mtx_unlock(nni_plat_mtx *mtx)
+{
NNI_ASSERT(mtx->owner == pthread_self());
mtx->owner = 0;
- if ((rv = pthread_mutex_unlock(&mtx->mtx)) != 0) {
- nni_panic("pthread_mutex_unlock: %s", strerror(rv));
+
+ if (mtx->fallback) {
+ nni_plat_mtx_unlock_fallback(mtx);
+ } else {
+ nni_pthread_mutex_unlock(&mtx->mtx);
}
}
-int
+void
nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx)
{
- int rv;
-
- if ((rv = pthread_cond_init(&cv->cv, &nni_cvattr)) != 0) {
- switch (rv) {
- case ENOMEM:
- case EAGAIN:
- return (NNG_ENOMEM);
-
- default:
- nni_panic("pthread_cond_init: %s", strerror(rv));
- }
+ if (mtx->fallback || (pthread_cond_init(&cv->cv, &nni_cvattr) != 0)) {
+ cv->fallback = 1;
+ } else {
+ cv->flags = NNI_PLAT_SYNC_INIT;
+ }
+#ifndef NDEBUG
+ if (nni_plat_sync_fallback || getenv("NNG_SYNC_FALLBACK")) {
+ cv->fallback = 1;
}
+#endif
cv->mtx = mtx;
- return (0);
}
void
nni_plat_cv_wake(nni_plat_cv *cv)
{
- (void) pthread_cond_broadcast(&cv->cv);
+ if (cv->fallback) {
+ nni_plat_cv_wake_fallback(cv);
+ } else {
+ nni_pthread_cond_broadcast(&cv->cv);
+ }
}
void
nni_plat_cv_wake1(nni_plat_cv *cv)
{
- (void) pthread_cond_signal(&cv->cv);
+ int rv;
+ if (cv->fallback) {
+ nni_plat_cv_wake1_fallback(cv);
+ } else {
+ nni_pthread_cond_signal(&cv->cv);
+ }
}
void
nni_plat_cv_wait(nni_plat_cv *cv)
{
- int rv;
-
NNI_ASSERT(cv->mtx->owner == pthread_self());
- if ((rv = pthread_cond_wait(&cv->cv, &cv->mtx->mtx)) != 0) {
- nni_panic("pthread_cond_wait: %s", strerror(rv));
+ if (cv->fallback) {
+ nni_plat_cv_wait_fallback(cv);
+ } else {
+ nni_pthread_cond_wait(&cv->cv, &cv->mtx->mtx);
+ cv->mtx->owner = pthread_self();
}
- cv->mtx->owner = pthread_self();
}
int
@@ -150,14 +357,13 @@ nni_plat_cv_until(nni_plat_cv *cv, nni_time until)
ts.tv_sec = until / 1000000;
ts.tv_nsec = (until % 1000000) * 1000;
- rv = pthread_cond_timedwait(&cv->cv, &cv->mtx->mtx, &ts);
- cv->mtx->owner = pthread_self();
- if (rv == ETIMEDOUT) {
- return (NNG_ETIMEDOUT);
- } else if (rv != 0) {
- nni_panic("pthread_cond_timedwait: %d", rv);
+ if (cv->fallback) {
+ rv = nni_plat_cv_until_fallback(cv, &ts);
+ } else {
+ rv = nni_pthread_cond_timedwait(&cv->cv, &cv->mtx->mtx, &ts);
+ cv->mtx->owner = pthread_self();
}
- return (0);
+ return (rv);
}
void
@@ -165,13 +371,12 @@ nni_plat_cv_fini(nni_plat_cv *cv)
{
int rv;
- if (cv->mtx == NULL) {
- return;
- }
- if ((rv = pthread_cond_destroy(&cv->cv)) != 0) {
+ if ((cv->flags & NNI_PLAT_SYNC_INIT) &&
+ ((rv = pthread_cond_destroy(&cv->cv)) != 0)) {
nni_panic("pthread_cond_destroy: %s", strerror(rv));
}
- cv->mtx = NULL;
+ cv->flags = 0;
+ cv->mtx = NULL;
}
static void *
@@ -198,10 +403,10 @@ nni_plat_thr_init(nni_plat_thr *thr, void (*fn)(void *), void *arg)
thr->arg = arg;
// POSIX wants functions to return a void *, but we don't care.
- rv = pthread_create(
- &thr->tid, &nni_pthread_attr, nni_plat_thr_main, thr);
+ rv = pthread_create(&thr->tid, NULL, nni_plat_thr_main, thr);
if (rv != 0) {
- // nni_printf("pthread_create: %s", strerror(rv));
+ // nni_printf("pthread_create: %s",
+ // strerror(rv));
return (NNG_ENOMEM);
}
return (0);
@@ -227,7 +432,6 @@ int
nni_plat_init(int (*helper)(void))
{
int rv;
- int devnull;
if (nni_plat_forked) {
nni_panic("nng is not fork-reentrant safe");
@@ -235,94 +439,60 @@ nni_plat_init(int (*helper)(void))
if (nni_plat_inited) {
return (0); // fast path
}
- if ((devnull = open("/dev/null", O_RDONLY)) < 0) {
- return (nni_plat_errno(errno));
- }
- pthread_mutex_lock(&nni_plat_lock);
+ pthread_mutex_lock(&nni_plat_init_lock);
if (nni_plat_inited) { // check again under the lock to be sure
- pthread_mutex_unlock(&nni_plat_lock);
- close(devnull);
+ pthread_mutex_unlock(&nni_plat_init_lock);
return (0);
}
if (pthread_condattr_init(&nni_cvattr) != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
- (void) close(devnull);
+ pthread_mutex_unlock(&nni_plat_init_lock);
return (NNG_ENOMEM);
}
#if !defined(NNG_USE_GETTIMEOFDAY) && NNG_USE_CLOCKID != CLOCK_REALTIME
if (pthread_condattr_setclock(&nni_cvattr, NNG_USE_CLOCKID) != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
- (void) close(devnull);
+ pthread_mutex_unlock(&nni_plat_init_lock);
return (NNG_ENOMEM);
}
#endif
if (pthread_mutexattr_init(&nni_mxattr) != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
- pthread_condattr_destroy(&nni_cvattr);
- (void) close(devnull);
- return (NNG_ENOMEM);
- }
-
- rv = pthread_mutexattr_settype(&nni_mxattr, PTHREAD_MUTEX_ERRORCHECK);
- if (rv != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
- (void) close(devnull);
- pthread_mutexattr_destroy(&nni_mxattr);
- pthread_condattr_destroy(&nni_cvattr);
- return (NNG_ENOMEM);
- }
-
- rv = pthread_attr_init(&nni_pthread_attr);
- if (rv != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
- (void) close(nni_plat_devnull);
- pthread_mutexattr_destroy(&nni_mxattr);
+ pthread_mutex_unlock(&nni_plat_init_lock);
pthread_condattr_destroy(&nni_cvattr);
return (NNG_ENOMEM);
}
- // We don't force this, but we want to have it small... we could
- // probably get by with even just 8k, but Linux usually wants 16k
- // as a minimum. If this fails, its not fatal, just we won't be
- // as scalable / thrifty with our use of VM.
- //(void) pthread_attr_setstacksize(&nni_pthread_attr, 16384);
+ // if this one fails we don't care.
+ (void) pthread_mutexattr_settype(
+ &nni_mxattr, PTHREAD_MUTEX_ERRORCHECK);
if ((rv = nni_posix_pollq_sysinit()) != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
- (void) close(nni_plat_devnull);
+ pthread_mutex_unlock(&nni_plat_init_lock);
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
- pthread_attr_destroy(&nni_pthread_attr);
return (rv);
}
if ((rv = nni_posix_resolv_sysinit()) != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
+ pthread_mutex_unlock(&nni_plat_init_lock);
nni_posix_pollq_sysfini();
- (void) close(nni_plat_devnull);
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
- pthread_attr_destroy(&nni_pthread_attr);
return (rv);
}
if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
+ pthread_mutex_unlock(&nni_plat_init_lock);
nni_posix_resolv_sysfini();
nni_posix_pollq_sysfini();
- (void) close(devnull);
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
- pthread_attr_destroy(&nni_pthread_attr);
return (NNG_ENOMEM);
}
if ((rv = helper()) == 0) {
nni_plat_inited = 1;
}
- nni_plat_devnull = devnull;
- pthread_mutex_unlock(&nni_plat_lock);
+ pthread_mutex_unlock(&nni_plat_init_lock);
return (rv);
}
@@ -330,18 +500,15 @@ nni_plat_init(int (*helper)(void))
void
nni_plat_fini(void)
{
- pthread_mutex_lock(&nni_plat_lock);
+ pthread_mutex_lock(&nni_plat_init_lock);
if (nni_plat_inited) {
nni_posix_resolv_sysfini();
nni_posix_pollq_sysfini();
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
- pthread_attr_destroy(&nni_pthread_attr);
- (void) close(nni_plat_devnull);
- nni_plat_devnull = -1;
- nni_plat_inited = 0;
+ nni_plat_inited = 0;
}
- pthread_mutex_unlock(&nni_plat_lock);
+ pthread_mutex_unlock(&nni_plat_init_lock);
}
#else
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index db6d7af4..2aa16b36 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -208,10 +208,7 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
if ((udp = NNI_ALLOC_STRUCT(udp)) != NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&udp->udp_mtx)) != 0) {
- NNI_FREE_STRUCT(udp);
- return (rv);
- }
+ nni_mtx_init(&udp->udp_mtx);
udp->udp_fd = socket(sa.ss_family, SOCK_DGRAM, IPPROTO_UDP);
if (udp->udp_fd < 0) {
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c
index c4cdcb8a..9c3343b7 100644
--- a/src/platform/windows/win_iocp.c
+++ b/src/platform/windows/win_iocp.c
@@ -174,17 +174,14 @@ nni_win_iocp_register(HANDLE h)
int
nni_win_event_init(nni_win_event *evt, nni_win_event_ops *ops, void *ptr)
{
- int rv;
-
ZeroMemory(&evt->olpd, sizeof(evt->olpd));
evt->olpd.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
if (evt->olpd.hEvent == NULL) {
return (nni_win_error(GetLastError()));
}
- if (((rv = nni_mtx_init(&evt->mtx)) != 0) ||
- ((rv = nni_cv_init(&evt->cv, &evt->mtx)) != 0)) {
- return (rv); // NB: This will never happen on Windows.
- }
+ nni_mtx_init(&evt->mtx);
+ nni_cv_init(&evt->cv, &evt->mtx);
+
evt->ops = *ops;
evt->aio = NULL;
evt->ptr = ptr;
@@ -240,9 +237,7 @@ nni_win_iocp_sysinit(void)
goto fail;
}
}
- if ((rv = nni_mtx_init(&nni_win_iocp_mtx)) != 0) {
- goto fail;
- }
+ nni_mtx_init(&nni_win_iocp_mtx);
for (i = 0; i < NNI_WIN_IOCP_NTHREADS; i++) {
nni_thr_run(&nni_win_iocp_thrs[i]);
}
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index c9eb20ec..a60815fa 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -566,10 +566,9 @@ nni_win_ipc_sysinit(void)
NNI_LIST_INIT(&worker->workers, nni_plat_ipc_ep, node);
NNI_LIST_INIT(&worker->waiters, nni_plat_ipc_ep, node);
- if (((rv = nni_mtx_init(&worker->mtx)) != 0) ||
- ((rv = nni_cv_init(&worker->cv, &worker->mtx)) != 0)) {
- return (rv);
- }
+ nni_mtx_init(&worker->mtx);
+ nni_cv_init(&worker->cv, &worker->mtx);
+
rv = nni_thr_init(&worker->thr, nni_win_ipc_conn_thr, worker);
if (rv != 0) {
return (rv);
diff --git a/src/platform/windows/win_net.c b/src/platform/windows/win_net.c
index 63295a71..80e3724d 100644
--- a/src/platform/windows/win_net.c
+++ b/src/platform/windows/win_net.c
@@ -680,8 +680,6 @@ int
nni_win_tcp_sysinit(void)
{
WSADATA data;
- WORD ver;
- ver = MAKEWORD(2, 2);
if (WSAStartup(MAKEWORD(2, 2), &data) != 0) {
NNI_ASSERT(LOBYTE(data.wVersion) == 2);
NNI_ASSERT(HIBYTE(data.wVersion) == 2);
diff --git a/src/platform/windows/win_pipe.c b/src/platform/windows/win_pipe.c
index 861fbc76..edc4df3f 100644
--- a/src/platform/windows/win_pipe.c
+++ b/src/platform/windows/win_pipe.c
@@ -19,9 +19,9 @@
int
nni_plat_pipe_open(int *wfdp, int *rfdp)
{
- SOCKET afd = INVALID_SOCKET;
SOCKET rfd = INVALID_SOCKET;
SOCKET wfd = INVALID_SOCKET;
+ SOCKET afd;
struct sockaddr_in addr;
socklen_t alen;
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c
index a01dc123..4ce12d84 100644
--- a/src/platform/windows/win_resolv.c
+++ b/src/platform/windows/win_resolv.c
@@ -254,9 +254,8 @@ nni_win_resolv_sysinit(void)
{
int rv;
- if ((rv = nni_mtx_init(&nni_win_resolv_mtx)) != 0) {
- return (rv);
- }
+ nni_mtx_init(&nni_win_resolv_mtx);
+
if ((rv = nni_taskq_init(&nni_win_resolv_tq, 4)) != 0) {
nni_mtx_fini(&nni_win_resolv_mtx);
return (rv);
diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c
index c01ec782..879cd772 100644
--- a/src/platform/windows/win_thread.c
+++ b/src/platform/windows/win_thread.c
@@ -30,12 +30,11 @@ nni_free(void *b, size_t z)
HeapFree(GetProcessHeap(), 0, b);
}
-int
+void
nni_plat_mtx_init(nni_plat_mtx *mtx)
{
InitializeSRWLock(&mtx->srl);
mtx->init = 1;
- return (0);
}
void
@@ -56,12 +55,11 @@ nni_plat_mtx_unlock(nni_plat_mtx *mtx)
ReleaseSRWLockExclusive(&mtx->srl);
}
-int
+void
nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx)
{
InitializeConditionVariable(&cv->cv);
cv->srl = &mtx->srl;
- return (0);
}
void
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index 79d7187e..88fd93e0 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -59,40 +59,28 @@ nni_bus_sock_fini(void *arg)
{
nni_bus_sock *psock = arg;
- if (psock != NULL) {
- nni_aio_stop(&psock->aio_getq);
- nni_aio_fini(&psock->aio_getq);
- nni_mtx_fini(&psock->mtx);
- NNI_FREE_STRUCT(psock);
- }
+ nni_aio_stop(&psock->aio_getq);
+ nni_aio_fini(&psock->aio_getq);
+ nni_mtx_fini(&psock->mtx);
+ NNI_FREE_STRUCT(psock);
}
static int
nni_bus_sock_init(void **sp, nni_sock *nsock)
{
nni_bus_sock *psock;
- int rv;
if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
return (NNG_ENOMEM);
}
NNI_LIST_INIT(&psock->pipes, nni_bus_pipe, node);
- if ((rv = nni_mtx_init(&psock->mtx)) != 0) {
- goto fail;
- }
- rv = nni_aio_init(&psock->aio_getq, nni_bus_sock_getq_cb, psock);
- if (rv != 0) {
- goto fail;
- }
+ nni_mtx_init(&psock->mtx);
+ nni_aio_init(&psock->aio_getq, nni_bus_sock_getq_cb, psock);
psock->nsock = nsock;
psock->raw = 0;
*sp = psock;
return (0);
-
-fail:
- nni_bus_sock_fini(psock);
- return (rv);
}
static void
@@ -134,36 +122,21 @@ nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock)
if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
return (NNG_ENOMEM);
}
- NNI_LIST_NODE_INIT(&ppipe->node);
- if (((rv = nni_mtx_init(&ppipe->mtx)) != 0) ||
- ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0)) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_getq, nni_bus_pipe_getq_cb, ppipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_send, nni_bus_pipe_send_cb, ppipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_recv, nni_bus_pipe_recv_cb, ppipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_putq, nni_bus_pipe_putq_cb, ppipe);
- if (rv != 0) {
- goto fail;
+ if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) {
+ NNI_FREE_STRUCT(ppipe);
+ return (rv);
}
+ NNI_LIST_NODE_INIT(&ppipe->node);
+ nni_mtx_init(&ppipe->mtx);
+ nni_aio_init(&ppipe->aio_getq, nni_bus_pipe_getq_cb, ppipe);
+ nni_aio_init(&ppipe->aio_send, nni_bus_pipe_send_cb, ppipe);
+ nni_aio_init(&ppipe->aio_recv, nni_bus_pipe_recv_cb, ppipe);
+ nni_aio_init(&ppipe->aio_putq, nni_bus_pipe_putq_cb, ppipe);
ppipe->npipe = npipe;
ppipe->psock = psock;
*pp = ppipe;
return (0);
-
-fail:
- nni_bus_pipe_fini(ppipe);
- return (rv);
}
static int
diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c
index a0e907f2..acf9ec25 100644
--- a/src/protocol/pair/pair_v0.c
+++ b/src/protocol/pair/pair_v0.c
@@ -53,15 +53,11 @@ static int
pair0_sock_init(void **sp, nni_sock *nsock)
{
pair0_sock *s;
- int rv;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&s->mtx)) != 0) {
- NNI_FREE_STRUCT(s);
- return (rv);
- }
+ nni_mtx_init(&s->mtx);
s->nsock = nsock;
s->ppipe = NULL;
s->raw = 0;
@@ -84,22 +80,19 @@ static int
pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
pair0_pipe *p;
- int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) {
- pair0_pipe_fini(p);
- } else {
- p->npipe = npipe;
- p->psock = psock;
- *pp = p;
- }
- return (rv);
+ nni_aio_init(&p->aio_send, pair0_send_cb, p);
+ nni_aio_init(&p->aio_recv, pair0_recv_cb, p);
+ nni_aio_init(&p->aio_getq, pair0_getq_cb, p);
+ nni_aio_init(&p->aio_putq, pair0_putq_cb, p);
+
+ p->npipe = npipe;
+ p->psock = psock;
+ *pp = p;
+ return (0);
}
static void
diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c
index 1a7ad9fa..2b8a9120 100644
--- a/src/protocol/pair/pair_v1.c
+++ b/src/protocol/pair/pair_v1.c
@@ -73,22 +73,24 @@ pair1_sock_init(void **sp, nni_sock *nsock)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
+ if ((rv = nni_idhash_init(&s->pipes)) != 0) {
+ NNI_FREE_STRUCT(s);
+ return (NNG_ENOMEM);
+ }
NNI_LIST_INIT(&s->plist, pair1_pipe, node);
// Raw mode uses this.
- if (((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) ||
- ((rv = nni_mtx_init(&s->mtx)) != 0) ||
- ((rv = nni_idhash_init(&s->pipes)) != 0)) {
- pair1_sock_fini(s);
- } else {
- s->nsock = nsock;
- s->raw = 0;
- s->poly = 0;
- s->uwq = nni_sock_sendq(nsock);
- s->urq = nni_sock_recvq(nsock);
- s->ttl = 8;
- *sp = s;
- }
+ nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s);
+ nni_mtx_init(&s->mtx);
+
+ s->nsock = nsock;
+ s->raw = 0;
+ s->poly = 0;
+ s->uwq = nni_sock_sendq(nsock);
+ s->urq = nni_sock_recvq(nsock);
+ s->ttl = 8;
+ *sp = s;
+
return (0);
}
@@ -101,17 +103,19 @@ pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) {
- pair1_pipe_fini(p);
- } else {
- p->npipe = npipe;
- p->psock = psock;
- *pp = p;
+ if ((rv = nni_msgq_init(&p->sendq, 2)) != 0) {
+ NNI_FREE_STRUCT(p);
+ return (NNG_ENOMEM);
}
+ nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p);
+ nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p);
+ nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p);
+ nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p);
+
+ p->npipe = npipe;
+ p->psock = psock;
+ *pp = p;
+
return (rv);
}
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index e3c73342..1ebcc4a2 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -63,20 +63,13 @@ static int
nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
{
nni_pull_pipe *pp;
- int rv;
if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_aio_init(&pp->putq_aio, nni_pull_putq_cb, pp))) != 0) {
- NNI_FREE_STRUCT(pp);
- return (rv);
- }
- if (((rv = nni_aio_init(&pp->recv_aio, nni_pull_recv_cb, pp))) != 0) {
- nni_aio_fini(&pp->putq_aio);
- NNI_FREE_STRUCT(pp);
- return (rv);
- }
+ nni_aio_init(&pp->putq_aio, nni_pull_putq_cb, pp);
+ nni_aio_init(&pp->recv_aio, nni_pull_recv_cb, pp);
+
pp->pipe = pipe;
pp->pull = psock;
*ppp = pp;
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index 14b3b191..1bc1659c 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -93,30 +93,19 @@ static int
nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
{
nni_push_pipe *pp;
- int rv;
if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_aio_init(&pp->aio_recv, nni_push_recv_cb, pp)) != 0) {
- goto fail;
- }
- if ((rv = nni_aio_init(&pp->aio_send, nni_push_send_cb, pp)) != 0) {
- goto fail;
- }
- if ((rv = nni_aio_init(&pp->aio_getq, nni_push_getq_cb, pp)) != 0) {
- goto fail;
- }
+ nni_aio_init(&pp->aio_recv, nni_push_recv_cb, pp);
+ nni_aio_init(&pp->aio_send, nni_push_send_cb, pp);
+ nni_aio_init(&pp->aio_getq, nni_push_getq_cb, pp);
NNI_LIST_NODE_INIT(&pp->node);
pp->pipe = pipe;
pp->push = psock;
*ppp = pp;
return (0);
-
-fail:
- nni_push_pipe_fini(pp);
- return (rv);
}
static int
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index 161a5d79..940f2139 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -53,20 +53,13 @@ static int
nni_pub_sock_init(void **pubp, nni_sock *sock)
{
nni_pub_sock *pub;
- int rv;
if ((pub = NNI_ALLOC_STRUCT(pub)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&pub->mtx)) != 0) {
- nni_pub_sock_fini(pub);
- return (rv);
- }
- rv = nni_aio_init(&pub->aio_getq, nni_pub_sock_getq_cb, pub);
- if (rv != 0) {
- nni_pub_sock_fini(pub);
- return (rv);
- }
+ nni_mtx_init(&pub->mtx);
+ nni_aio_init(&pub->aio_getq, nni_pub_sock_getq_cb, pub);
+
pub->sock = sock;
pub->raw = 0;
NNI_LIST_INIT(&pub->pipes, nni_pub_pipe, node);
@@ -127,31 +120,18 @@ nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
}
// XXX: consider making this depth tunable
if ((rv = nni_msgq_init(&pp->sendq, 16)) != 0) {
- goto fail;
- }
-
- rv = nni_aio_init(&pp->aio_getq, nni_pub_pipe_getq_cb, pp);
- if (rv != 0) {
- goto fail;
+ NNI_FREE_STRUCT(pp);
+ return (rv);
}
- rv = nni_aio_init(&pp->aio_send, nni_pub_pipe_send_cb, pp);
- if (rv != 0) {
- goto fail;
- }
+ nni_aio_init(&pp->aio_getq, nni_pub_pipe_getq_cb, pp);
+ nni_aio_init(&pp->aio_send, nni_pub_pipe_send_cb, pp);
+ nni_aio_init(&pp->aio_recv, nni_pub_pipe_recv_cb, pp);
- rv = nni_aio_init(&pp->aio_recv, nni_pub_pipe_recv_cb, pp);
- if (rv != 0) {
- goto fail;
- }
pp->pipe = pipe;
pp->pub = psock;
*ppp = pp;
return (0);
-
-fail:
- nni_pub_pipe_fini(pp);
- return (rv);
}
static int
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 53f01e0f..78b9d157 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -95,16 +95,13 @@ static int
nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock)
{
nni_sub_pipe *sp;
- int rv;
if ((sp = NNI_ALLOC_STRUCT(sp)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_aio_init(&sp->aio_putq, nni_sub_putq_cb, sp)) != 0) ||
- ((rv = nni_aio_init(&sp->aio_recv, nni_sub_recv_cb, sp)) != 0)) {
- nni_sub_pipe_fini(sp);
- return (rv);
- }
+ nni_aio_init(&sp->aio_putq, nni_sub_putq_cb, sp);
+ nni_aio_init(&sp->aio_recv, nni_sub_recv_cb, sp);
+
sp->pipe = pipe;
sp->sub = ssock;
*spp = sp;
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 4319cbf8..09f2b285 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -74,20 +74,18 @@ nni_rep_sock_init(void **repp, nni_sock *sock)
if ((rep = NNI_ALLOC_STRUCT(rep)) == NULL) {
return (NNG_ENOMEM);
}
+ if ((rv = nni_idhash_init(&rep->pipes)) != 0) {
+ NNI_FREE_STRUCT(rep);
+ return (rv);
+ }
+
rep->ttl = 8; // Per RFC
rep->sock = sock;
rep->raw = 0;
rep->btrace = NULL;
rep->btrace_len = 0;
- if ((rv = nni_idhash_init(&rep->pipes)) != 0) {
- goto fail;
- }
-
- rv = nni_aio_init(&rep->aio_getq, nni_rep_sock_getq_cb, rep);
- if (rv != 0) {
- goto fail;
- }
+ nni_aio_init(&rep->aio_getq, nni_rep_sock_getq_cb, rep);
rep->uwq = nni_sock_sendq(sock);
rep->urq = nni_sock_recvq(sock);
@@ -96,10 +94,6 @@ nni_rep_sock_init(void **repp, nni_sock *sock)
nni_sock_senderr(sock, NNG_ESTATE);
return (0);
-
-fail:
- nni_rep_sock_fini(rep);
- return (rv);
}
static void
@@ -128,32 +122,18 @@ nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
return (NNG_ENOMEM);
}
if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) {
- goto fail;
- }
- if ((rv = nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp)) !=
- 0) {
- goto fail;
- }
- if ((rv = nni_aio_init(&rp->aio_send, nni_rep_pipe_send_cb, rp)) !=
- 0) {
- goto fail;
- }
- if ((rv = nni_aio_init(&rp->aio_recv, nni_rep_pipe_recv_cb, rp)) !=
- 0) {
- goto fail;
- }
- if ((rv = nni_aio_init(&rp->aio_putq, nni_rep_pipe_putq_cb, rp)) !=
- 0) {
- goto fail;
+ NNI_FREE_STRUCT(rp);
+ return (rv);
}
+ nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp);
+ nni_aio_init(&rp->aio_send, nni_rep_pipe_send_cb, rp);
+ nni_aio_init(&rp->aio_recv, nni_rep_pipe_recv_cb, rp);
+ nni_aio_init(&rp->aio_putq, nni_rep_pipe_putq_cb, rp);
+
rp->pipe = pipe;
rp->rep = rsock;
*rpp = rp;
return (0);
-
-fail:
- nni_rep_pipe_fini(rp);
- return (rv);
}
static void
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index fdf29fd9..bab81331 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -75,19 +75,12 @@ static int
nni_req_sock_init(void **reqp, nni_sock *sock)
{
nni_req_sock *req;
- int rv;
if ((req = NNI_ALLOC_STRUCT(req)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&req->mtx)) != 0) {
- NNI_FREE_STRUCT(req);
- return (rv);
- }
- if ((rv = nni_cv_init(&req->cv, &req->mtx)) != 0) {
- nni_mtx_fini(&req->mtx);
- NNI_FREE_STRUCT(req);
- }
+ nni_mtx_init(&req->mtx);
+ nni_cv_init(&req->cv, &req->mtx);
NNI_LIST_INIT(&req->readypipes, nni_req_pipe, node);
NNI_LIST_INIT(&req->busypipes, nni_req_pipe, node);
@@ -152,41 +145,22 @@ static int
nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
{
nni_req_pipe *rp;
- int rv;
if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&rp->mtx)) != 0) {
- goto failed;
- }
- if ((rv = nni_aio_init(&rp->aio_getq, nni_req_getq_cb, rp)) != 0) {
- goto failed;
- }
- if ((rv = nni_aio_init(&rp->aio_putq, nni_req_putq_cb, rp)) != 0) {
- goto failed;
- }
- if ((rv = nni_aio_init(&rp->aio_recv, nni_req_recv_cb, rp)) != 0) {
- goto failed;
- }
- rv = nni_aio_init(&rp->aio_sendraw, nni_req_sendraw_cb, rp);
- if (rv != 0) {
- goto failed;
- }
- rv = nni_aio_init(&rp->aio_sendcooked, nni_req_sendcooked_cb, rp);
- if (rv != 0) {
- goto failed;
- }
+ nni_mtx_init(&rp->mtx);
+ nni_aio_init(&rp->aio_getq, nni_req_getq_cb, rp);
+ nni_aio_init(&rp->aio_putq, nni_req_putq_cb, rp);
+ nni_aio_init(&rp->aio_recv, nni_req_recv_cb, rp);
+ nni_aio_init(&rp->aio_sendraw, nni_req_sendraw_cb, rp);
+ nni_aio_init(&rp->aio_sendcooked, nni_req_sendcooked_cb, rp);
NNI_LIST_NODE_INIT(&rp->node);
rp->pipe = pipe;
rp->req = rsock;
*rpp = rp;
return (0);
-
-failed:
- nni_req_pipe_fini(rp);
- return (rv);
}
static void
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 32513134..a097f551 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -77,6 +77,11 @@ nni_resp_sock_init(void **pp, nni_sock *nsock)
if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
return (NNG_ENOMEM);
}
+ if ((rv = nni_idhash_init(&psock->pipes)) != 0) {
+ NNI_FREE_STRUCT(psock);
+ return (rv);
+ }
+
psock->ttl = 8; // Per RFC
psock->nsock = nsock;
psock->raw = 0;
@@ -85,24 +90,12 @@ nni_resp_sock_init(void **pp, nni_sock *nsock)
psock->urq = nni_sock_recvq(nsock);
psock->uwq = nni_sock_sendq(nsock);
- if ((rv = nni_mtx_init(&psock->mtx)) != 0) {
- goto fail;
- }
- if ((rv = nni_idhash_init(&psock->pipes)) != 0) {
- goto fail;
- }
- rv = nni_aio_init(&psock->aio_getq, nni_resp_sock_getq_cb, psock);
- if (rv != 0) {
- goto fail;
- }
+ nni_mtx_init(&psock->mtx);
+ nni_aio_init(&psock->aio_getq, nni_resp_sock_getq_cb, psock);
*pp = psock;
nni_sock_senderr(nsock, NNG_ESTATE);
return (0);
-
-fail:
- nni_resp_sock_fini(psock);
- return (rv);
}
static void
@@ -131,33 +124,18 @@ nni_resp_pipe_init(void **pp, nni_pipe *npipe, void *psock)
return (NNG_ENOMEM);
}
if ((rv = nni_msgq_init(&ppipe->sendq, 2)) != 0) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_putq, nni_resp_putq_cb, ppipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_recv, nni_resp_recv_cb, ppipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_getq, nni_resp_getq_cb, ppipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_send, nni_resp_send_cb, ppipe);
- if (rv != 0) {
- goto fail;
+ NNI_FREE_STRUCT(ppipe);
+ return (rv);
}
+ nni_aio_init(&ppipe->aio_putq, nni_resp_putq_cb, ppipe);
+ nni_aio_init(&ppipe->aio_recv, nni_resp_recv_cb, ppipe);
+ nni_aio_init(&ppipe->aio_getq, nni_resp_getq_cb, ppipe);
+ nni_aio_init(&ppipe->aio_send, nni_resp_send_cb, ppipe);
ppipe->npipe = npipe;
ppipe->psock = psock;
*pp = ppipe;
return (0);
-
-fail:
- nni_resp_pipe_fini(ppipe);
- return (rv);
}
static void
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index cb90c13f..2a32f289 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -70,19 +70,13 @@ static int
nni_surv_sock_init(void **sp, nni_sock *nsock)
{
nni_surv_sock *psock;
- int rv;
if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&psock->mtx)) != 0) {
- goto fail;
- }
- rv = nni_aio_init(&psock->aio_getq, nni_surv_sock_getq_cb, psock);
- if (rv != 0) {
- goto fail;
- }
NNI_LIST_INIT(&psock->pipes, nni_surv_pipe, node);
+ nni_mtx_init(&psock->mtx);
+ nni_aio_init(&psock->aio_getq, nni_surv_sock_getq_cb, psock);
nni_timer_init(&psock->timer, nni_surv_timeout, psock);
psock->nextid = nni_random();
@@ -96,10 +90,6 @@ nni_surv_sock_init(void **sp, nni_sock *nsock)
*sp = psock;
nni_sock_recverr(nsock, NNG_ESTATE);
return (0);
-
-fail:
- nni_surv_sock_fini(psock);
- return (rv);
}
static void
@@ -143,32 +133,19 @@ nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock)
}
// This depth could be tunable.
if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) {
- goto failed;
- }
- rv = nni_aio_init(&ppipe->aio_getq, nni_surv_getq_cb, ppipe);
- if (rv != 0) {
- goto failed;
- }
- rv = nni_aio_init(&ppipe->aio_putq, nni_surv_putq_cb, ppipe);
- if (rv != 0) {
- goto failed;
- }
- rv = nni_aio_init(&ppipe->aio_send, nni_surv_send_cb, ppipe);
- if (rv != 0) {
- goto failed;
- }
- rv = nni_aio_init(&ppipe->aio_recv, nni_surv_recv_cb, ppipe);
- if (rv != 0) {
- goto failed;
+ NNI_FREE_STRUCT(ppipe);
+ return (rv);
}
+
+ nni_aio_init(&ppipe->aio_getq, nni_surv_getq_cb, ppipe);
+ nni_aio_init(&ppipe->aio_putq, nni_surv_putq_cb, ppipe);
+ nni_aio_init(&ppipe->aio_send, nni_surv_send_cb, ppipe);
+ nni_aio_init(&ppipe->aio_recv, nni_surv_recv_cb, ppipe);
+
ppipe->npipe = npipe;
ppipe->psock = psock;
*pp = ppipe;
return (0);
-
-failed:
- nni_surv_pipe_fini(ppipe);
- return (rv);
}
static int
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 3bc24c41..08cf99a2 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -65,14 +65,9 @@ static nni_inproc_global nni_inproc;
static int
nni_inproc_init(void)
{
- int rv;
-
NNI_LIST_INIT(&nni_inproc.servers, nni_inproc_ep, node);
- if ((rv = nni_mtx_init(&nni_inproc.mx)) != 0) {
- return (rv);
- }
-
+ nni_mtx_init(&nni_inproc.mx);
return (0);
}
@@ -309,8 +304,7 @@ nni_inproc_accept_clients(nni_inproc_ep *server)
continue;
}
- if (((rv = nni_mtx_init(&pair->mx)) != 0) ||
- ((rv = nni_msgq_init(&pair->q[0], 4)) != 0) ||
+ if (((rv = nni_msgq_init(&pair->q[0], 4)) != 0) ||
((rv = nni_msgq_init(&pair->q[1], 4)) != 0)) {
nni_inproc_pair_destroy(pair);
nni_inproc_conn_finish(caio, rv);
@@ -318,6 +312,8 @@ nni_inproc_accept_clients(nni_inproc_ep *server)
continue;
}
+ nni_mtx_init(&pair->mx);
+
pair->pipes[0] = caio->a_pipe;
pair->pipes[1] = saio->a_pipe;
pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0];
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 7d976122..0b0e487f 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -107,18 +107,14 @@ static int
nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp)
{
nni_ipc_pipe *p;
- int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_mtx_init(&p->mtx)) != 0) ||
- ((rv = nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p)) != 0)) {
- nni_ipc_pipe_fini(p);
- return (rv);
- }
+ nni_mtx_init(&p->mtx);
+ nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p);
+ nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p);
+ nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p);
p->proto = ep->proto;
p->rcvmax = ep->rcvmax;
@@ -490,12 +486,14 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_mtx_init(&ep->mtx)) != 0) ||
- ((rv = nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep)) != 0) ||
- ((rv = nni_plat_ipc_ep_init(&ep->iep, url, mode)) != 0)) {
- nni_ipc_ep_fini(ep);
+ if ((rv = nni_plat_ipc_ep_init(&ep->iep, url, mode)) != 0) {
+ NNI_FREE_STRUCT(ep);
return (rv);
}
+
+ nni_mtx_init(&ep->mtx);
+ nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep);
+
ep->closed = 0;
ep->proto = nni_sock_proto(sock);
ep->rcvmax = nni_sock_rcvmaxsz(sock);
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 4d47733b..b3136b35 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -107,18 +107,15 @@ static int
nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp)
{
nni_tcp_pipe *p;
- int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_mtx_init(&p->mtx)) != 0) ||
- ((rv = nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p)) != 0)) {
- nni_tcp_pipe_fini(p);
- return (rv);
- }
+ nni_mtx_init(&p->mtx);
+ nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p);
+ nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p);
+ nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p);
+
p->proto = ep->proto;
p->rcvmax = ep->rcvmax;
p->tpp = tpp;
@@ -555,12 +552,13 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_mtx_init(&ep->mtx)) != 0) ||
- ((rv = nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep)) != 0) ||
- ((rv = nni_plat_tcp_ep_init(&ep->tep, url, mode)) != 0)) {
- nni_tcp_ep_fini(ep);
+ if ((rv = nni_plat_tcp_ep_init(&ep->tep, url, mode)) != 0) {
+ NNI_FREE_STRUCT(ep);
return (rv);
}
+ nni_mtx_init(&ep->mtx);
+ nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep);
+
ep->closed = 0;
ep->proto = nni_sock_proto(sock);
ep->rcvmax = nni_sock_rcvmaxsz(sock);