aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/aio.c55
-rw-r--r--src/core/aio.h2
-rw-r--r--src/core/defs.h4
-rw-r--r--src/core/endpt.c19
-rw-r--r--src/core/event.c3
-rw-r--r--src/core/event.h2
-rw-r--r--src/core/idhash.c6
-rw-r--r--src/core/init.c4
-rw-r--r--src/core/msgqueue.c35
-rw-r--r--src/core/pipe.c15
-rw-r--r--src/core/platform.h21
-rw-r--r--src/core/random.c6
-rw-r--r--src/core/socket.c41
-rw-r--r--src/core/taskq.c9
-rw-r--r--src/core/thread.c20
-rw-r--r--src/core/thread.h8
-rw-r--r--src/core/timer.c9
-rw-r--r--src/core/transport.c5
-rw-r--r--src/nng.c2
-rw-r--r--src/platform/posix/posix_epdesc.c11
-rw-r--r--src/platform/posix/posix_impl.h17
-rw-r--r--src/platform/posix/posix_pipe.c1
-rw-r--r--src/platform/posix/posix_pipedesc.c5
-rw-r--r--src/platform/posix/posix_pollq_poll.c7
-rw-r--r--src/platform/posix/posix_resolv_gai.c5
-rw-r--r--src/platform/posix/posix_thread.c417
-rw-r--r--src/platform/posix/posix_udp.c5
-rw-r--r--src/platform/windows/win_iocp.c13
-rw-r--r--src/platform/windows/win_ipc.c7
-rw-r--r--src/platform/windows/win_net.c2
-rw-r--r--src/platform/windows/win_pipe.c2
-rw-r--r--src/platform/windows/win_resolv.c5
-rw-r--r--src/platform/windows/win_thread.c6
-rw-r--r--src/protocol/bus/bus.c57
-rw-r--r--src/protocol/pair/pair_v0.c27
-rw-r--r--src/protocol/pair/pair_v1.c50
-rw-r--r--src/protocol/pipeline/pull.c13
-rw-r--r--src/protocol/pipeline/push.c17
-rw-r--r--src/protocol/pubsub/pub.c36
-rw-r--r--src/protocol/pubsub/sub.c9
-rw-r--r--src/protocol/reqrep/rep.c46
-rw-r--r--src/protocol/reqrep/req.c42
-rw-r--r--src/protocol/survey/respond.c48
-rw-r--r--src/protocol/survey/survey.c43
-rw-r--r--src/transport/inproc/inproc.c12
-rw-r--r--src/transport/ipc/ipc.c22
-rw-r--r--src/transport/tcp/tcp.c22
-rw-r--r--tests/CMakeLists.txt1
-rw-r--r--tests/pair1.c3
-rw-r--r--tests/platform.c105
-rw-r--r--tests/synch.c292
51 files changed, 907 insertions, 707 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index a39c0118..fe9bcde8 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -56,20 +56,14 @@ static nni_list nni_aio_expire_aios;
static void nni_aio_expire_add(nni_aio *);
-int
+void
nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
{
- int rv;
-
memset(aio, 0, sizeof(*aio));
- if ((rv = nni_cv_init(&aio->a_cv, &nni_aio_lk)) != 0) {
- return (rv);
- }
+ nni_cv_init(&aio->a_cv, &nni_aio_lk);
aio->a_expire = NNI_TIME_NEVER;
aio->a_init = 1;
nni_task_init(NULL, &aio->a_task, cb, arg);
-
- return (0);
}
void
@@ -350,46 +344,43 @@ nni_aio_expire_loop(void *arg)
}
}
-int
-nni_aio_sys_init(void)
+void
+nni_aio_sys_fini(void)
{
- int rv;
nni_mtx *mtx = &nni_aio_lk;
nni_cv * cv = &nni_aio_expire_cv;
nni_thr *thr = &nni_aio_expire_thr;
- if (((rv = nni_mtx_init(mtx)) != 0) ||
- ((rv = nni_cv_init(cv, mtx)) != 0) ||
- ((rv = nni_thr_init(thr, nni_aio_expire_loop, NULL)) != 0)) {
- goto fail;
+ if (nni_aio_expire_run) {
+ nni_mtx_lock(mtx);
+ nni_aio_expire_run = 0;
+ nni_cv_wake(cv);
+ nni_mtx_unlock(mtx);
}
- NNI_LIST_INIT(&nni_aio_expire_aios, nni_aio, a_expire_node);
- nni_aio_expire_run = 1;
- nni_thr_run(thr);
- return (0);
-fail:
nni_thr_fini(thr);
nni_cv_fini(cv);
nni_mtx_fini(mtx);
- return (rv);
}
-void
-nni_aio_sys_fini(void)
+int
+nni_aio_sys_init(void)
{
+ int rv;
nni_mtx *mtx = &nni_aio_lk;
nni_cv * cv = &nni_aio_expire_cv;
nni_thr *thr = &nni_aio_expire_thr;
- if (nni_aio_expire_run) {
- nni_mtx_lock(mtx);
- nni_aio_expire_run = 0;
- nni_cv_wake(cv);
- nni_mtx_unlock(mtx);
+ NNI_LIST_INIT(&nni_aio_expire_aios, nni_aio, a_expire_node);
+ nni_mtx_init(mtx);
+ nni_cv_init(cv, mtx);
+
+ if ((rv = nni_thr_init(thr, nni_aio_expire_loop, NULL)) != 0) {
+ nni_aio_sys_fini();
+ return (rv);
}
- nni_thr_fini(thr);
- nni_cv_fini(cv);
- nni_mtx_fini(mtx);
-} \ No newline at end of file
+ nni_aio_expire_run = 1;
+ nni_thr_run(thr);
+ return (0);
+}
diff --git a/src/core/aio.h b/src/core/aio.h
index d48442eb..aabb3fa9 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -66,7 +66,7 @@ struct nni_aio {
// the supplied argument when the operation is complete. If NULL is
// supplied for the callback, then nni_aio_wake is used in its place,
// and the aio is used for the argument.
-extern int nni_aio_init(nni_aio *, nni_cb, void *);
+extern void nni_aio_init(nni_aio *, nni_cb, void *);
// nni_aio_fini finalizes the aio, releasing resources (locks)
// associated with it. The caller is responsible for ensuring that any
diff --git a/src/core/defs.h b/src/core/defs.h
index 69fe4843..4d8e6ffb 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -17,9 +17,13 @@
// superior, support for such are not universal.
#define NNI_ARG_UNUSED(x) ((void) x);
+#ifndef NDEBUG
#define NNI_ASSERT(x) \
if (!(x)) \
nni_panic("%s: %d: assert err: %s", __FILE__, __LINE__, #x)
+#else
+#define NNI_ASSERT(x)
+#endif
// These types are common but have names shared with user space.
typedef struct nng_msg nni_msg;
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 0ab35ea3..34debc0e 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -57,10 +57,10 @@ nni_ep_sys_init(void)
{
int rv;
- if (((rv = nni_mtx_init(&nni_ep_lk)) != 0) ||
- ((rv = nni_idhash_init(&nni_eps)) != 0)) {
+ if ((rv = nni_idhash_init(&nni_eps)) != 0) {
return (rv);
}
+ nni_mtx_init(&nni_ep_lk);
nni_idhash_set_limits(
nni_eps, 1, 0x7fffffff, nni_random() & 0x7fffffff);
@@ -152,13 +152,14 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode)
nni_pipe_ep_list_init(&ep->ep_pipes);
- if (((rv = nni_mtx_init(&ep->ep_mtx)) != 0) ||
- ((rv = nni_cv_init(&ep->ep_cv, &ep->ep_mtx)) != 0) ||
- ((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) ||
- ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) ||
+ nni_mtx_init(&ep->ep_mtx);
+ nni_cv_init(&ep->ep_cv, &ep->ep_mtx);
+ nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep);
+ nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep);
+ nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep);
+ nni_aio_init(&ep->ep_con_syn, NULL, NULL);
+
+ if (((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) ||
((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) ||
((rv = nni_sock_ep_add(s, ep)) != 0)) {
nni_ep_destroy(ep);
diff --git a/src/core/event.c b/src/core/event.c
index 79910d80..ab157b02 100644
--- a/src/core/event.c
+++ b/src/core/event.c
@@ -12,13 +12,12 @@
#include <stdlib.h>
#include <string.h>
-int
+void
nni_ev_init(nni_event *event, int type, nni_sock *sock)
{
memset(event, 0, sizeof(*event));
event->e_type = type;
event->e_sock = sock;
- return (0);
}
void
diff --git a/src/core/event.h b/src/core/event.h
index 6d9a9394..22af096e 100644
--- a/src/core/event.h
+++ b/src/core/event.h
@@ -28,7 +28,7 @@ struct nng_notify {
nni_aio n_aio;
};
-extern int nni_ev_init(nni_event *, int, nni_sock *);
+extern void nni_ev_init(nni_event *, int, nni_sock *);
extern void nni_ev_fini(nni_event *);
#endif // CORE_EVENT_H
diff --git a/src/core/idhash.c b/src/core/idhash.c
index ab6b67e1..03854fc8 100644
--- a/src/core/idhash.c
+++ b/src/core/idhash.c
@@ -36,15 +36,11 @@ int
nni_idhash_init(nni_idhash **hp)
{
nni_idhash *h;
- int rv;
if ((h = NNI_ALLOC_STRUCT(h)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&h->ih_mtx)) != 0) {
- NNI_FREE_STRUCT(h);
- return (rv);
- }
+ nni_mtx_init(&h->ih_mtx);
h->ih_entries = NULL;
h->ih_count = 0;
h->ih_load = 0;
diff --git a/src/core/init.c b/src/core/init.c
index 24a7b118..4025c0f4 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -1,5 +1,5 @@
//
-// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -39,8 +39,6 @@ nni_init(void)
void
nni_fini(void)
{
- // XXX: We should make sure that underlying sockets and
- // file descriptors are closed. Details TBD.
nni_tran_sys_fini();
nni_pipe_sys_fini();
nni_ep_sys_fini();
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 6530857e..ef7dd5a2 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -39,7 +39,6 @@ int
nni_msgq_init(nni_msgq **mqp, unsigned cap)
{
struct nni_msgq *mq;
- int rv;
int alloc;
// We allocate 2 extra cells in the fifo. One to accommodate a
@@ -52,21 +51,18 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
if ((mq = NNI_ALLOC_STRUCT(mq)) == NULL) {
return (NNG_ENOMEM);
}
+ if ((mq->mq_msgs = nni_alloc(sizeof(nng_msg *) * alloc)) == NULL) {
+ NNI_FREE_STRUCT(mq);
+ return (NNG_ENOMEM);
+ }
+
nni_aio_list_init(&mq->mq_aio_putq);
nni_aio_list_init(&mq->mq_aio_getq);
nni_aio_list_init(&mq->mq_aio_notify_get);
nni_aio_list_init(&mq->mq_aio_notify_put);
- if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) {
- goto fail;
- }
- if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) {
- goto fail;
- }
- if ((mq->mq_msgs = nni_alloc(sizeof(nng_msg *) * alloc)) == NULL) {
- rv = NNG_ENOMEM;
- goto fail;
- }
+ nni_mtx_init(&mq->mq_lock);
+ nni_cv_init(&mq->mq_drained, &mq->mq_lock);
mq->mq_cap = cap;
mq->mq_alloc = alloc;
@@ -80,15 +76,6 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
*mqp = mq;
return (0);
-
-fail:
- nni_cv_fini(&mq->mq_drained);
- nni_mtx_fini(&mq->mq_lock);
- if (mq->mq_msgs != NULL) {
- nni_free(mq->mq_msgs, sizeof(nng_msg *) * alloc);
- }
- NNI_FREE_STRUCT(mq);
- return (rv);
}
void
@@ -413,9 +400,7 @@ nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
nni_aio aio;
int rv;
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- return (rv);
- }
+ nni_aio_init(&aio, NULL, NULL);
aio.a_expire = expire;
nni_msgq_aio_get(mq, &aio);
nni_aio_wait(&aio);
@@ -433,9 +418,7 @@ nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
nni_aio aio;
int rv;
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- return (rv);
- }
+ nni_aio_init(&aio, NULL, NULL);
aio.a_expire = expire;
aio.a_msg = msg;
nni_msgq_aio_put(mq, &aio);
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 75c4c8d6..cdbeb6a7 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -49,9 +49,10 @@ nni_pipe_sys_init(void)
int rv;
NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node);
+ nni_mtx_init(&nni_pipe_reap_lk);
+ nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk);
+
if (((rv = nni_idhash_init(&nni_pipes)) != 0) ||
- ((rv = nni_mtx_init(&nni_pipe_reap_lk)) != 0) ||
- ((rv = nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk)) != 0) ||
((rv = nni_thr_init(&nni_pipe_reap_thr, nni_pipe_reaper, 0)) !=
0)) {
return (rv);
@@ -240,11 +241,11 @@ nni_pipe_create(nni_ep *ep, void *tdata)
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
- if (((rv = nni_mtx_init(&p->p_mtx)) != 0) ||
- ((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0) ||
- ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) !=
- 0) ||
- ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) ||
+ nni_mtx_init(&p->p_mtx);
+ nni_cv_init(&p->p_cv, &p->p_mtx);
+ nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p);
+
+ if (((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) ||
((rv = nni_ep_pipe_add(ep, p)) != 0) ||
((rv = nni_sock_pipe_add(sock, p)) != 0)) {
nni_pipe_destroy(p);
diff --git a/src/core/platform.h b/src/core/platform.h
index 7acf16ef..f6f0b974 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -85,10 +85,9 @@ typedef struct nni_plat_thr nni_plat_thr;
// Threading & Synchronization Support
//
-// nni_plat_mtx_init initializes a mutex structure. This may require dynamic
-// allocation, depending on the platform. It can return NNG_ENOMEM if that
-// fails. An initialized mutex must be distinguishable from zeroed memory.
-extern int nni_plat_mtx_init(nni_plat_mtx *);
+// nni_plat_mtx_init initializes a mutex structure. An initialized mutex must
+// be distinguishable from zeroed memory.
+extern void nni_plat_mtx_init(nni_plat_mtx *);
// nni_plat_mtx_fini destroys the mutex and releases any resources allocated
// for it's use. If the mutex is zeroed memory, this should do nothing.
@@ -99,20 +98,14 @@ extern void nni_plat_mtx_fini(nni_plat_mtx *);
extern void nni_plat_mtx_lock(nni_plat_mtx *);
// nni_plat_mtx_unlock unlocks the mutex. This can only be performed by the
-// threadthat owned the mutex.
+// thread that owned the mutex.
extern void nni_plat_mtx_unlock(nni_plat_mtx *);
-// nni_plat_mtx_tryenter tries to lock the mutex. If it can't, it may return
-// NNG_EBUSY if the mutex is already owned.
-extern int nni_plat_mtx_trylock(nni_plat_mtx *);
-
// nni_plat_cv_init initializes a condition variable. We require a mutex be
// supplied with it, and that mutex must always be held when performing any
-// operations on the condition variable (other than fini.) This may require
-// dynamic allocation, and if so this operation may fail with NNG_ENOMEM.
-// As with mutexes, an initialized mutex should be distinguishable from
-// zeroed memory.
-extern int nni_plat_cv_init(nni_plat_cv *, nni_plat_mtx *);
+// operations on the condition variable (other than fini.) As with mutexes, an
+// initialized mutex should be distinguishable from zeroed memory.
+extern void nni_plat_cv_init(nni_plat_cv *, nni_plat_mtx *);
// nni_plat_cv_fini releases all resources associated with condition variable.
// If the cv points to just zeroed memory (was never initialized), it does
diff --git a/src/core/random.c b/src/core/random.c
index eb0b4fc2..febd0848 100644
--- a/src/core/random.c
+++ b/src/core/random.c
@@ -170,12 +170,8 @@ nni_random_sys_init(void)
{
// minimally, grab the system clock
nni_isaac_ctx *ctx = &nni_random_ctx;
- int rv;
-
- if ((rv = nni_mtx_init(&ctx->mx)) != 0) {
- return (rv);
- }
+ nni_mtx_init(&ctx->mx);
nni_plat_seed_prng(ctx->randrsl, sizeof(ctx->randrsl));
nni_isaac_randinit(ctx, 1);
return (0);
diff --git a/src/core/socket.c b/src/core/socket.c
index 6a243650..9aa89a2d 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -231,7 +231,6 @@ nni_notify *
nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg)
{
nni_notify *notify;
- int rv;
if ((notify = NNI_ALLOC_STRUCT(notify)) == NULL) {
return (NULL);
@@ -244,31 +243,19 @@ nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg)
switch (type) {
case NNG_EV_CAN_RCV:
- rv = nni_aio_init(&notify->n_aio, nni_sock_canrecv_cb, notify);
- if (rv != 0) {
- goto fail;
- }
+ nni_aio_init(&notify->n_aio, nni_sock_canrecv_cb, notify);
nni_msgq_aio_notify_get(sock->s_urq, &notify->n_aio);
break;
case NNG_EV_CAN_SND:
- rv = nni_aio_init(&notify->n_aio, nni_sock_cansend_cb, notify);
- if (rv != 0) {
- goto fail;
- }
+ nni_aio_init(&notify->n_aio, nni_sock_cansend_cb, notify);
nni_msgq_aio_notify_put(sock->s_uwq, &notify->n_aio);
break;
default:
- rv = NNG_ENOTSUP;
- goto fail;
- break;
+ NNI_FREE_STRUCT(notify);
+ return (NULL);
}
return (notify);
-
-fail:
- nni_aio_fini(&notify->n_aio);
- NNI_FREE_STRUCT(notify);
- return (NULL);
}
void
@@ -343,13 +330,13 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
NNI_LIST_NODE_INIT(&s->s_node);
nni_pipe_sock_list_init(&s->s_pipes);
nni_ep_list_init(&s->s_eps);
+ nni_mtx_init(&s->s_mx);
+ nni_cv_init(&s->s_cv, &s->s_mx);
+ nni_cv_init(&s->s_close_cv, &nni_sock_lk);
+ nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s);
+ nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s);
- if (((rv = nni_mtx_init(&s->s_mx)) != 0) ||
- ((rv = nni_cv_init(&s->s_cv, &s->s_mx)) != 0) ||
- ((rv = nni_cv_init(&s->s_close_cv, &nni_sock_lk)) != 0) ||
- ((rv = nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s)) != 0) ||
- ((rv = nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s)) != 0) ||
- ((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
+ if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
((rv = nni_msgq_init(&s->s_urq, 0)) != 0) ||
((rv = s->s_sock_ops.sock_init(&s->s_data, s)) != 0)) {
nni_sock_destroy(s);
@@ -365,8 +352,9 @@ nni_sock_sys_init(void)
int rv;
NNI_LIST_INIT(&nni_sock_list, nni_sock, s_node);
- if (((rv = nni_idhash_init(&nni_sock_hash)) != 0) ||
- ((rv = nni_mtx_init(&nni_sock_lk)) != 0)) {
+ nni_mtx_init(&nni_sock_lk);
+
+ if ((rv = nni_idhash_init(&nni_sock_hash)) != 0) {
nni_sock_sys_fini();
} else {
nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1);
@@ -562,6 +550,9 @@ nni_sock_closeall(void)
{
nni_sock *s;
+ if (nni_sock_hash == NULL) {
+ return;
+ }
for (;;) {
nni_mtx_lock(&nni_sock_lk);
if ((s = nni_list_first(&nni_sock_list)) == NULL) {
diff --git a/src/core/taskq.c b/src/core/taskq.c
index e0fc456c..d0f04e8a 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -85,12 +85,9 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
tq->tq_nthreads = nthr;
NNI_LIST_INIT(&tq->tq_tasks, nni_task, task_node);
- if (((rv = nni_mtx_init(&tq->tq_mtx)) != 0) ||
- ((rv = nni_cv_init(&tq->tq_sched_cv, &tq->tq_mtx)) != 0) ||
- ((rv = nni_cv_init(&tq->tq_wait_cv, &tq->tq_mtx)) != 0)) {
- nni_taskq_fini(tq);
- return (rv);
- }
+ nni_mtx_init(&tq->tq_mtx);
+ nni_cv_init(&tq->tq_sched_cv, &tq->tq_mtx);
+ nni_cv_init(&tq->tq_wait_cv, &tq->tq_mtx);
for (i = 0; i < nthr; i++) {
tq->tq_threads[i].tqt_tq = tq;
diff --git a/src/core/thread.c b/src/core/thread.c
index 6c3bd9f3..54c9c7d2 100644
--- a/src/core/thread.c
+++ b/src/core/thread.c
@@ -9,10 +9,10 @@
#include "core/nng_impl.h"
-int
+void
nni_mtx_init(nni_mtx *mtx)
{
- return (nni_plat_mtx_init(mtx));
+ nni_plat_mtx_init(mtx);
}
void
@@ -33,10 +33,10 @@ nni_mtx_unlock(nni_mtx *mtx)
nni_plat_mtx_unlock(mtx);
}
-int
+void
nni_cv_init(nni_cv *cv, nni_mtx *mtx)
{
- return (nni_plat_cv_init(cv, mtx));
+ nni_plat_cv_init(cv, mtx);
}
void
@@ -110,15 +110,9 @@ nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg)
thr->fn = fn;
thr->arg = arg;
- if ((rv = nni_plat_mtx_init(&thr->mtx)) != 0) {
- thr->done = 1;
- return (rv);
- }
- if ((rv = nni_plat_cv_init(&thr->cv, &thr->mtx)) != 0) {
- nni_plat_mtx_fini(&thr->mtx);
- thr->done = 1;
- return (rv);
- }
+ nni_plat_mtx_init(&thr->mtx);
+ nni_plat_cv_init(&thr->cv, &thr->mtx);
+
if (fn == NULL) {
thr->init = 1;
thr->done = 1;
diff --git a/src/core/thread.h b/src/core/thread.h
index 94b2a984..ee83b196 100644
--- a/src/core/thread.h
+++ b/src/core/thread.h
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -25,9 +26,8 @@ struct nni_thr {
int init;
};
-// nni_mtx_init initializes the mutex. (Win32 programmers take note;
-// our mutexes are actually CriticalSections on Win32.)
-extern int nni_mtx_init(nni_mtx *mtx);
+// nni_mtx_init initializes the mutex.
+extern void nni_mtx_init(nni_mtx *mtx);
// nni_mtx_fini destroys the mutex and releases any resources used by it.
extern void nni_mtx_fini(nni_mtx *mtx);
@@ -43,7 +43,7 @@ extern void nni_mtx_unlock(nni_mtx *mtx);
// nni_cv_init initializes the condition variable. The mutex supplied
// must always be locked with the condition variable.
-extern int nni_cv_init(nni_cv *cv, nni_mtx *);
+extern void nni_cv_init(nni_cv *cv, nni_mtx *);
// nni_cv_fini releases resources associated with the condition variable,
// which must not be in use at the time.
diff --git a/src/core/timer.c b/src/core/timer.c
index 73bc7604..7608f7d5 100644
--- a/src/core/timer.c
+++ b/src/core/timer.c
@@ -40,10 +40,11 @@ nni_timer_sys_init(void)
memset(timer, 0, sizeof(*timer));
NNI_LIST_INIT(&timer->t_entries, nni_timer_node, t_node);
- if (((rv = nni_mtx_init(&timer->t_mx)) != 0) ||
- ((rv = nni_cv_init(&timer->t_sched_cv, &timer->t_mx)) != 0) ||
- ((rv = nni_cv_init(&timer->t_wait_cv, &timer->t_mx)) != 0) ||
- ((rv = nni_thr_init(&timer->t_thr, nni_timer_loop, timer)) != 0)) {
+ nni_mtx_init(&timer->t_mx);
+ nni_cv_init(&timer->t_sched_cv, &timer->t_mx);
+ nni_cv_init(&timer->t_wait_cv, &timer->t_mx);
+
+ if ((rv = nni_thr_init(&timer->t_thr, nni_timer_loop, timer)) != 0) {
nni_timer_sys_fini();
return (rv);
}
diff --git a/src/core/transport.c b/src/core/transport.c
index 278c7d1e..6f73a6b2 100644
--- a/src/core/transport.c
+++ b/src/core/transport.c
@@ -84,8 +84,9 @@ nni_tran_sys_init(void)
int rv;
NNI_LIST_INIT(&nni_tran_list, nni_transport, t_node);
- if (((rv = nni_mtx_init(&nni_tran_lk)) != 0) ||
- ((rv = nni_tran_register(&nni_inproc_tran)) != 0) ||
+ nni_mtx_init(&nni_tran_lk);
+
+ if (((rv = nni_tran_register(&nni_inproc_tran)) != 0) ||
((rv = nni_tran_register(&nni_ipc_tran)) != 0) ||
((rv = nni_tran_register(&nni_tcp_tran)) != 0)) {
nni_tran_sys_fini();
diff --git a/src/nng.c b/src/nng.c
index 822d2713..a833eeed 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -24,6 +24,7 @@
void
nng_fini(void)
{
+ nni_sock_closeall();
nni_fini();
}
@@ -184,7 +185,6 @@ nng_free(void *buf, size_t sz)
int
nng_sendmsg(nng_socket sid, nng_msg *msg, int flags)
{
- nni_time expire;
int rv;
nni_sock *sock;
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 46fe2bea..f511f35f 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -126,10 +126,8 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed)
static void
nni_posix_epdesc_doaccept(nni_posix_epdesc *ed)
{
- nni_aio * aio;
- int newfd;
- struct sockaddr_storage ss;
- socklen_t slen;
+ nni_aio *aio;
+ int newfd;
while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
// We could argue that knowing the remote peer address would
@@ -456,10 +454,7 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url)
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&ed->mtx)) != 0) {
- NNI_FREE_STRUCT(ed);
- return (rv);
- }
+ nni_mtx_init(&ed->mtx);
// We could randomly choose a different pollq, or for efficiencies
// sake we could take a modulo of the file desc number to choose
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 46ebbc1d..81fbd48b 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -54,9 +54,19 @@ extern int nni_plat_errno(int);
// elsewhere.
struct nni_plat_mtx {
- int init;
pthread_t owner;
pthread_mutex_t mtx;
+ int fallback;
+ int flags;
+};
+
+struct nni_plat_cv {
+ pthread_cond_t cv;
+ nni_plat_mtx * mtx;
+ int fallback;
+ int flags;
+ int gen;
+ int wake;
};
struct nni_plat_thr {
@@ -65,11 +75,6 @@ struct nni_plat_thr {
void *arg;
};
-struct nni_plat_cv {
- pthread_cond_t cv;
- nni_plat_mtx * mtx;
-};
-
#endif
extern int nni_posix_pollq_sysinit(void);
diff --git a/src/platform/posix/posix_pipe.c b/src/platform/posix/posix_pipe.c
index 78415d26..314e2f5e 100644
--- a/src/platform/posix/posix_pipe.c
+++ b/src/platform/posix/posix_pipe.c
@@ -105,7 +105,6 @@ void
nni_plat_pipe_clear(int rfd)
{
char buf[32];
- int rv;
for (;;) {
// Completely drain the pipe, but don't wait. This coalesces
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index bd74e0c0..b2c1cb1f 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -302,10 +302,6 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
// one. For now we just have a global pollq. Note that by tying
// the pd to a single pollq we may get some kind of cache warmth.
- if ((rv = nni_mtx_init(&pd->mtx)) != 0) {
- NNI_FREE_STRUCT(pd);
- return (rv);
- }
pd->closed = 0;
pd->node.fd = fd;
pd->node.cb = nni_posix_pipedesc_cb;
@@ -313,6 +309,7 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ nni_mtx_init(&pd->mtx);
nni_aio_list_init(&pd->readq);
nni_aio_list_init(&pd->writeq);
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index d3fdf394..df5a1799 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -347,9 +347,10 @@ nni_posix_pollq_init(nni_posix_pollq *pq)
pq->wakerfd = -1;
pq->close = 0;
- if (((rv = nni_mtx_init(&pq->mtx)) != 0) ||
- ((rv = nni_cv_init(&pq->cv, &pq->mtx)) != 0) ||
- ((rv = nni_posix_pollq_poll_grow(pq)) != 0) ||
+ nni_mtx_init(&pq->mtx);
+ nni_cv_init(&pq->cv, &pq->mtx);
+
+ if (((rv = nni_posix_pollq_poll_grow(pq)) != 0) ||
((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) ||
((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) {
nni_posix_pollq_fini(pq);
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index 09d40b94..dce8270a 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -276,9 +276,8 @@ nni_posix_resolv_sysinit(void)
{
int rv;
- if ((rv = nni_mtx_init(&nni_posix_resolv_mtx)) != 0) {
- return (rv);
- }
+ nni_mtx_init(&nni_posix_resolv_mtx);
+
if ((rv = nni_taskq_init(&nni_posix_resolv_tq, 4)) != 0) {
nni_mtx_fini(&nni_posix_resolv_mtx);
return (rv);
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index 0ef4754c..44bfbed2 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -24,118 +24,325 @@
#include <time.h>
#include <unistd.h>
-static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER;
-static int nni_plat_inited = 0;
-static int nni_plat_forked = 0;
+static pthread_mutex_t nni_plat_init_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t nni_plat_cond_cond = PTHREAD_COND_INITIALIZER;
+static pthread_cond_t nni_plat_lock_cond = PTHREAD_COND_INITIALIZER;
+static int nni_plat_inited = 0;
+static int nni_plat_forked = 0;
+
+pthread_condattr_t nni_cvattr;
+pthread_mutexattr_t nni_mxattr;
+
+#ifndef NDEBUG
+int nni_plat_sync_fallback = 0;
+#endif
-pthread_condattr_t nni_cvattr;
-pthread_mutexattr_t nni_mxattr;
-static pthread_attr_t nni_pthread_attr;
+enum nni_plat_sync_flags {
+ NNI_PLAT_SYNC_INIT = 0x01,
+ NNI_PLAT_SYNC_LOCKED = 0x04,
+ NNI_PLAT_SYNC_WAIT = 0x08,
+};
-// We open a /dev/null file descriptor so that we can dup2() it to
-// cause MacOS X to wakeup. This gives us a "safe" close semantic.
+void
+nni_plat_mtx_init(nni_plat_mtx *mtx)
+{
+ if (pthread_mutex_init(&mtx->mtx, &nni_mxattr) != 0) {
+ mtx->fallback = 1;
+ } else {
+ mtx->flags = NNI_PLAT_SYNC_INIT;
+ }
+#ifndef NDEBUG
+ if (nni_plat_sync_fallback || getenv("NNG_SYNC_FALLBACK")) {
+ mtx->fallback = 1;
+ }
+#endif
+}
-int nni_plat_devnull = -1;
+void
+nni_plat_mtx_fini(nni_plat_mtx *mtx)
+{
+ if (mtx->flags & NNI_PLAT_SYNC_INIT) {
+ int rv;
+ if ((rv = pthread_mutex_destroy(&mtx->mtx)) != 0) {
+ nni_panic("pthread_mutex_destroy: %s", strerror(rv));
+ }
+ }
+ mtx->flags = 0;
+}
-int
-nni_plat_mtx_init(nni_plat_mtx *mtx)
+static void
+nni_pthread_mutex_lock(pthread_mutex_t *m)
{
int rv;
- if ((rv = pthread_mutex_init(&mtx->mtx, &nni_mxattr)) != 0) {
- switch (rv) {
- case EAGAIN:
- case ENOMEM:
- return (NNG_ENOMEM);
+ if ((rv = pthread_mutex_lock(m)) != 0) {
+ nni_panic("pthread_mutex_lock: %s", strerror(rv));
+ }
+}
- default:
- nni_panic("pthread_mutex_init: %s", strerror(rv));
- }
+static void
+nni_pthread_mutex_unlock(pthread_mutex_t *m)
+{
+ int rv;
+
+ if ((rv = pthread_mutex_unlock(m)) != 0) {
+ nni_panic("pthread_mutex_unlock: %s", strerror(rv));
}
- mtx->init = 1;
- return (0);
}
-void
-nni_plat_mtx_fini(nni_plat_mtx *mtx)
+static void
+nni_pthread_cond_broadcast(pthread_cond_t *c)
{
int rv;
- if (!mtx->init) {
- return;
+ if ((rv = pthread_cond_broadcast(c)) != 0) {
+ nni_panic("pthread_cond_broadcast: %s", strerror(rv));
}
- pthread_mutex_lock(&mtx->mtx);
- pthread_mutex_unlock(&mtx->mtx);
- if ((rv = pthread_mutex_destroy(&mtx->mtx)) != 0) {
- nni_panic("pthread_mutex_fini: %s", strerror(rv));
+}
+
+static void
+nni_pthread_cond_signal(pthread_cond_t *c)
+{
+ int rv;
+ if ((rv = pthread_cond_signal(c)) != 0) {
+ nni_panic("pthread_cond_signal: %s", strerror(rv));
}
- mtx->init = 0;
}
-void
-nni_plat_mtx_lock(nni_plat_mtx *mtx)
+static void
+nni_pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m)
{
int rv;
- if ((rv = pthread_mutex_lock(&mtx->mtx)) != 0) {
- nni_panic("pthread_mutex_lock: %s", strerror(rv));
+ if ((rv = pthread_cond_wait(c, m)) != 0) {
+ nni_panic("pthread_cond_wait: %s", strerror(rv));
+ }
+}
+
+static int
+nni_pthread_cond_timedwait(
+ pthread_cond_t *c, pthread_mutex_t *m, struct timespec *ts)
+{
+ int rv;
+
+ switch ((rv = pthread_cond_timedwait(c, m, ts))) {
+ case 0:
+ return (0);
+ case ETIMEDOUT:
+ case EAGAIN:
+ return (NNG_ETIMEDOUT);
+ }
+ nni_panic("pthread_cond_timedwait: %s", strerror(rv));
+ return (NNG_EINVAL);
+}
+
+static void
+nni_plat_mtx_lock_fallback_locked(nni_plat_mtx *mtx)
+{
+ while (mtx->flags & NNI_PLAT_SYNC_LOCKED) {
+ mtx->flags |= NNI_PLAT_SYNC_WAIT;
+ nni_pthread_cond_wait(&nni_plat_lock_cond, &nni_plat_lock);
}
+ mtx->flags |= NNI_PLAT_SYNC_LOCKED;
mtx->owner = pthread_self();
}
+static void
+nni_plat_mtx_unlock_fallback_locked(nni_plat_mtx *mtx)
+{
+ NNI_ASSERT(mtx->flags & NNI_PLAT_SYNC_LOCKED);
+ mtx->flags &= ~NNI_PLAT_SYNC_LOCKED;
+ if (mtx->flags & NNI_PLAT_SYNC_WAIT) {
+ mtx->flags &= ~NNI_PLAT_SYNC_WAIT;
+ pthread_cond_broadcast(&nni_plat_lock_cond);
+ }
+}
+
+static void
+nni_plat_mtx_lock_fallback(nni_plat_mtx *mtx)
+{
+ nni_pthread_mutex_lock(&nni_plat_lock);
+ nni_plat_mtx_lock_fallback_locked(mtx);
+ nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static void
+nni_plat_mtx_unlock_fallback(nni_plat_mtx *mtx)
+{
+ nni_pthread_mutex_lock(&nni_plat_lock);
+ nni_plat_mtx_unlock_fallback_locked(mtx);
+ nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static void
+nni_plat_cv_wake_fallback(nni_cv *cv)
+{
+ nni_pthread_mutex_lock(&nni_plat_lock);
+ if (cv->flags & NNI_PLAT_SYNC_WAIT) {
+ cv->gen++;
+ cv->wake = 0;
+ nni_pthread_cond_broadcast(&nni_plat_cond_cond);
+ }
+ nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static void
+nni_plat_cv_wake1_fallback(nni_cv *cv)
+{
+ nni_pthread_mutex_lock(&nni_plat_lock);
+ if (cv->flags & NNI_PLAT_SYNC_WAIT) {
+ cv->wake++;
+ nni_pthread_cond_broadcast(&nni_plat_cond_cond);
+ }
+ nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static void
+nni_plat_cv_wait_fallback(nni_cv *cv)
+{
+ int gen;
+
+ nni_pthread_mutex_lock(&nni_plat_lock);
+ if (!cv->mtx->fallback) {
+ // transform the mutex to a fallback one. we have it held.
+ cv->mtx->fallback = 1;
+ cv->mtx->flags |= NNI_PLAT_SYNC_LOCKED;
+ nni_pthread_mutex_unlock(&cv->mtx->mtx);
+ }
+
+ NNI_ASSERT(cv->mtx->owner == pthread_self());
+ NNI_ASSERT(cv->mtx->flags & NNI_PLAT_SYNC_LOCKED);
+ gen = cv->gen;
+ while ((cv->gen == gen) && (cv->wake == 0)) {
+ nni_plat_mtx_unlock_fallback_locked(cv->mtx);
+ cv->flags |= NNI_PLAT_SYNC_WAIT;
+ nni_pthread_cond_wait(&nni_plat_cond_cond, &nni_plat_lock);
+
+ nni_plat_mtx_lock_fallback_locked(cv->mtx);
+ }
+ if (cv->wake > 0) {
+ cv->wake--;
+ }
+ nni_pthread_mutex_unlock(&nni_plat_lock);
+}
+
+static int
+nni_plat_cv_until_fallback(nni_cv *cv, struct timespec *ts)
+{
+ int gen;
+ int rv = 0;
+
+ if (!cv->mtx->fallback) {
+ // transform the mutex to a fallback one. we have it held.
+ cv->mtx->fallback = 1;
+ cv->mtx->flags |= NNI_PLAT_SYNC_LOCKED;
+ nni_pthread_mutex_unlock(&cv->mtx->mtx);
+ }
+
+ nni_pthread_mutex_lock(&nni_plat_lock);
+ gen = cv->gen;
+ while ((cv->gen == gen) && (cv->wake == 0)) {
+ nni_plat_mtx_unlock_fallback_locked(cv->mtx);
+ cv->flags |= NNI_PLAT_SYNC_WAIT;
+ rv = nni_pthread_cond_timedwait(
+ &nni_plat_cond_cond, &nni_plat_lock, ts);
+ nni_plat_mtx_lock_fallback_locked(cv->mtx);
+ if (rv != 0) {
+ break;
+ }
+ }
+ if ((rv == 0) && (cv->wake > 0)) {
+ cv->wake--;
+ }
+ nni_pthread_mutex_unlock(&nni_plat_lock);
+ return (rv);
+}
+
void
-nni_plat_mtx_unlock(nni_plat_mtx *mtx)
+nni_plat_mtx_lock(nni_plat_mtx *mtx)
{
int rv;
+ if (!mtx->fallback) {
+ nni_pthread_mutex_lock(&mtx->mtx);
+
+ // We might have changed to a fallback lock; make
+ // sure this did not occur. Note that transitions to
+ // fallback locks only happen when a thread accesses
+ // a condition variable already holding this lock,
+ // so this is guranteed to be safe.
+ if (!mtx->fallback) {
+ mtx->owner = pthread_self();
+ return;
+ }
+ nni_pthread_mutex_unlock(&mtx->mtx);
+ }
+
+ // Fallback mode
+ nni_plat_mtx_lock_fallback(mtx);
+}
+
+void
+nni_plat_mtx_unlock(nni_plat_mtx *mtx)
+{
NNI_ASSERT(mtx->owner == pthread_self());
mtx->owner = 0;
- if ((rv = pthread_mutex_unlock(&mtx->mtx)) != 0) {
- nni_panic("pthread_mutex_unlock: %s", strerror(rv));
+
+ if (mtx->fallback) {
+ nni_plat_mtx_unlock_fallback(mtx);
+ } else {
+ nni_pthread_mutex_unlock(&mtx->mtx);
}
}
-int
+void
nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx)
{
- int rv;
-
- if ((rv = pthread_cond_init(&cv->cv, &nni_cvattr)) != 0) {
- switch (rv) {
- case ENOMEM:
- case EAGAIN:
- return (NNG_ENOMEM);
-
- default:
- nni_panic("pthread_cond_init: %s", strerror(rv));
- }
+ if (mtx->fallback || (pthread_cond_init(&cv->cv, &nni_cvattr) != 0)) {
+ cv->fallback = 1;
+ } else {
+ cv->flags = NNI_PLAT_SYNC_INIT;
+ }
+#ifndef NDEBUG
+ if (nni_plat_sync_fallback || getenv("NNG_SYNC_FALLBACK")) {
+ cv->fallback = 1;
}
+#endif
cv->mtx = mtx;
- return (0);
}
void
nni_plat_cv_wake(nni_plat_cv *cv)
{
- (void) pthread_cond_broadcast(&cv->cv);
+ if (cv->fallback) {
+ nni_plat_cv_wake_fallback(cv);
+ } else {
+ nni_pthread_cond_broadcast(&cv->cv);
+ }
}
void
nni_plat_cv_wake1(nni_plat_cv *cv)
{
- (void) pthread_cond_signal(&cv->cv);
+ int rv;
+ if (cv->fallback) {
+ nni_plat_cv_wake1_fallback(cv);
+ } else {
+ nni_pthread_cond_signal(&cv->cv);
+ }
}
void
nni_plat_cv_wait(nni_plat_cv *cv)
{
- int rv;
-
NNI_ASSERT(cv->mtx->owner == pthread_self());
- if ((rv = pthread_cond_wait(&cv->cv, &cv->mtx->mtx)) != 0) {
- nni_panic("pthread_cond_wait: %s", strerror(rv));
+ if (cv->fallback) {
+ nni_plat_cv_wait_fallback(cv);
+ } else {
+ nni_pthread_cond_wait(&cv->cv, &cv->mtx->mtx);
+ cv->mtx->owner = pthread_self();
}
- cv->mtx->owner = pthread_self();
}
int
@@ -150,14 +357,13 @@ nni_plat_cv_until(nni_plat_cv *cv, nni_time until)
ts.tv_sec = until / 1000000;
ts.tv_nsec = (until % 1000000) * 1000;
- rv = pthread_cond_timedwait(&cv->cv, &cv->mtx->mtx, &ts);
- cv->mtx->owner = pthread_self();
- if (rv == ETIMEDOUT) {
- return (NNG_ETIMEDOUT);
- } else if (rv != 0) {
- nni_panic("pthread_cond_timedwait: %d", rv);
+ if (cv->fallback) {
+ rv = nni_plat_cv_until_fallback(cv, &ts);
+ } else {
+ rv = nni_pthread_cond_timedwait(&cv->cv, &cv->mtx->mtx, &ts);
+ cv->mtx->owner = pthread_self();
}
- return (0);
+ return (rv);
}
void
@@ -165,13 +371,12 @@ nni_plat_cv_fini(nni_plat_cv *cv)
{
int rv;
- if (cv->mtx == NULL) {
- return;
- }
- if ((rv = pthread_cond_destroy(&cv->cv)) != 0) {
+ if ((cv->flags & NNI_PLAT_SYNC_INIT) &&
+ ((rv = pthread_cond_destroy(&cv->cv)) != 0)) {
nni_panic("pthread_cond_destroy: %s", strerror(rv));
}
- cv->mtx = NULL;
+ cv->flags = 0;
+ cv->mtx = NULL;
}
static void *
@@ -198,10 +403,10 @@ nni_plat_thr_init(nni_plat_thr *thr, void (*fn)(void *), void *arg)
thr->arg = arg;
// POSIX wants functions to return a void *, but we don't care.
- rv = pthread_create(
- &thr->tid, &nni_pthread_attr, nni_plat_thr_main, thr);
+ rv = pthread_create(&thr->tid, NULL, nni_plat_thr_main, thr);
if (rv != 0) {
- // nni_printf("pthread_create: %s", strerror(rv));
+ // nni_printf("pthread_create: %s",
+ // strerror(rv));
return (NNG_ENOMEM);
}
return (0);
@@ -227,7 +432,6 @@ int
nni_plat_init(int (*helper)(void))
{
int rv;
- int devnull;
if (nni_plat_forked) {
nni_panic("nng is not fork-reentrant safe");
@@ -235,94 +439,60 @@ nni_plat_init(int (*helper)(void))
if (nni_plat_inited) {
return (0); // fast path
}
- if ((devnull = open("/dev/null", O_RDONLY)) < 0) {
- return (nni_plat_errno(errno));
- }
- pthread_mutex_lock(&nni_plat_lock);
+ pthread_mutex_lock(&nni_plat_init_lock);
if (nni_plat_inited) { // check again under the lock to be sure
- pthread_mutex_unlock(&nni_plat_lock);
- close(devnull);
+ pthread_mutex_unlock(&nni_plat_init_lock);
return (0);
}
if (pthread_condattr_init(&nni_cvattr) != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
- (void) close(devnull);
+ pthread_mutex_unlock(&nni_plat_init_lock);
return (NNG_ENOMEM);
}
#if !defined(NNG_USE_GETTIMEOFDAY) && NNG_USE_CLOCKID != CLOCK_REALTIME
if (pthread_condattr_setclock(&nni_cvattr, NNG_USE_CLOCKID) != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
- (void) close(devnull);
+ pthread_mutex_unlock(&nni_plat_init_lock);
return (NNG_ENOMEM);
}
#endif
if (pthread_mutexattr_init(&nni_mxattr) != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
- pthread_condattr_destroy(&nni_cvattr);
- (void) close(devnull);
- return (NNG_ENOMEM);
- }
-
- rv = pthread_mutexattr_settype(&nni_mxattr, PTHREAD_MUTEX_ERRORCHECK);
- if (rv != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
- (void) close(devnull);
- pthread_mutexattr_destroy(&nni_mxattr);
- pthread_condattr_destroy(&nni_cvattr);
- return (NNG_ENOMEM);
- }
-
- rv = pthread_attr_init(&nni_pthread_attr);
- if (rv != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
- (void) close(nni_plat_devnull);
- pthread_mutexattr_destroy(&nni_mxattr);
+ pthread_mutex_unlock(&nni_plat_init_lock);
pthread_condattr_destroy(&nni_cvattr);
return (NNG_ENOMEM);
}
- // We don't force this, but we want to have it small... we could
- // probably get by with even just 8k, but Linux usually wants 16k
- // as a minimum. If this fails, its not fatal, just we won't be
- // as scalable / thrifty with our use of VM.
- //(void) pthread_attr_setstacksize(&nni_pthread_attr, 16384);
+ // if this one fails we don't care.
+ (void) pthread_mutexattr_settype(
+ &nni_mxattr, PTHREAD_MUTEX_ERRORCHECK);
if ((rv = nni_posix_pollq_sysinit()) != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
- (void) close(nni_plat_devnull);
+ pthread_mutex_unlock(&nni_plat_init_lock);
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
- pthread_attr_destroy(&nni_pthread_attr);
return (rv);
}
if ((rv = nni_posix_resolv_sysinit()) != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
+ pthread_mutex_unlock(&nni_plat_init_lock);
nni_posix_pollq_sysfini();
- (void) close(nni_plat_devnull);
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
- pthread_attr_destroy(&nni_pthread_attr);
return (rv);
}
if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) {
- pthread_mutex_unlock(&nni_plat_lock);
+ pthread_mutex_unlock(&nni_plat_init_lock);
nni_posix_resolv_sysfini();
nni_posix_pollq_sysfini();
- (void) close(devnull);
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
- pthread_attr_destroy(&nni_pthread_attr);
return (NNG_ENOMEM);
}
if ((rv = helper()) == 0) {
nni_plat_inited = 1;
}
- nni_plat_devnull = devnull;
- pthread_mutex_unlock(&nni_plat_lock);
+ pthread_mutex_unlock(&nni_plat_init_lock);
return (rv);
}
@@ -330,18 +500,15 @@ nni_plat_init(int (*helper)(void))
void
nni_plat_fini(void)
{
- pthread_mutex_lock(&nni_plat_lock);
+ pthread_mutex_lock(&nni_plat_init_lock);
if (nni_plat_inited) {
nni_posix_resolv_sysfini();
nni_posix_pollq_sysfini();
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
- pthread_attr_destroy(&nni_pthread_attr);
- (void) close(nni_plat_devnull);
- nni_plat_devnull = -1;
- nni_plat_inited = 0;
+ nni_plat_inited = 0;
}
- pthread_mutex_unlock(&nni_plat_lock);
+ pthread_mutex_unlock(&nni_plat_init_lock);
}
#else
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index db6d7af4..2aa16b36 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -208,10 +208,7 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
if ((udp = NNI_ALLOC_STRUCT(udp)) != NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&udp->udp_mtx)) != 0) {
- NNI_FREE_STRUCT(udp);
- return (rv);
- }
+ nni_mtx_init(&udp->udp_mtx);
udp->udp_fd = socket(sa.ss_family, SOCK_DGRAM, IPPROTO_UDP);
if (udp->udp_fd < 0) {
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c
index c4cdcb8a..9c3343b7 100644
--- a/src/platform/windows/win_iocp.c
+++ b/src/platform/windows/win_iocp.c
@@ -174,17 +174,14 @@ nni_win_iocp_register(HANDLE h)
int
nni_win_event_init(nni_win_event *evt, nni_win_event_ops *ops, void *ptr)
{
- int rv;
-
ZeroMemory(&evt->olpd, sizeof(evt->olpd));
evt->olpd.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
if (evt->olpd.hEvent == NULL) {
return (nni_win_error(GetLastError()));
}
- if (((rv = nni_mtx_init(&evt->mtx)) != 0) ||
- ((rv = nni_cv_init(&evt->cv, &evt->mtx)) != 0)) {
- return (rv); // NB: This will never happen on Windows.
- }
+ nni_mtx_init(&evt->mtx);
+ nni_cv_init(&evt->cv, &evt->mtx);
+
evt->ops = *ops;
evt->aio = NULL;
evt->ptr = ptr;
@@ -240,9 +237,7 @@ nni_win_iocp_sysinit(void)
goto fail;
}
}
- if ((rv = nni_mtx_init(&nni_win_iocp_mtx)) != 0) {
- goto fail;
- }
+ nni_mtx_init(&nni_win_iocp_mtx);
for (i = 0; i < NNI_WIN_IOCP_NTHREADS; i++) {
nni_thr_run(&nni_win_iocp_thrs[i]);
}
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index c9eb20ec..a60815fa 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -566,10 +566,9 @@ nni_win_ipc_sysinit(void)
NNI_LIST_INIT(&worker->workers, nni_plat_ipc_ep, node);
NNI_LIST_INIT(&worker->waiters, nni_plat_ipc_ep, node);
- if (((rv = nni_mtx_init(&worker->mtx)) != 0) ||
- ((rv = nni_cv_init(&worker->cv, &worker->mtx)) != 0)) {
- return (rv);
- }
+ nni_mtx_init(&worker->mtx);
+ nni_cv_init(&worker->cv, &worker->mtx);
+
rv = nni_thr_init(&worker->thr, nni_win_ipc_conn_thr, worker);
if (rv != 0) {
return (rv);
diff --git a/src/platform/windows/win_net.c b/src/platform/windows/win_net.c
index 63295a71..80e3724d 100644
--- a/src/platform/windows/win_net.c
+++ b/src/platform/windows/win_net.c
@@ -680,8 +680,6 @@ int
nni_win_tcp_sysinit(void)
{
WSADATA data;
- WORD ver;
- ver = MAKEWORD(2, 2);
if (WSAStartup(MAKEWORD(2, 2), &data) != 0) {
NNI_ASSERT(LOBYTE(data.wVersion) == 2);
NNI_ASSERT(HIBYTE(data.wVersion) == 2);
diff --git a/src/platform/windows/win_pipe.c b/src/platform/windows/win_pipe.c
index 861fbc76..edc4df3f 100644
--- a/src/platform/windows/win_pipe.c
+++ b/src/platform/windows/win_pipe.c
@@ -19,9 +19,9 @@
int
nni_plat_pipe_open(int *wfdp, int *rfdp)
{
- SOCKET afd = INVALID_SOCKET;
SOCKET rfd = INVALID_SOCKET;
SOCKET wfd = INVALID_SOCKET;
+ SOCKET afd;
struct sockaddr_in addr;
socklen_t alen;
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c
index a01dc123..4ce12d84 100644
--- a/src/platform/windows/win_resolv.c
+++ b/src/platform/windows/win_resolv.c
@@ -254,9 +254,8 @@ nni_win_resolv_sysinit(void)
{
int rv;
- if ((rv = nni_mtx_init(&nni_win_resolv_mtx)) != 0) {
- return (rv);
- }
+ nni_mtx_init(&nni_win_resolv_mtx);
+
if ((rv = nni_taskq_init(&nni_win_resolv_tq, 4)) != 0) {
nni_mtx_fini(&nni_win_resolv_mtx);
return (rv);
diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c
index c01ec782..879cd772 100644
--- a/src/platform/windows/win_thread.c
+++ b/src/platform/windows/win_thread.c
@@ -30,12 +30,11 @@ nni_free(void *b, size_t z)
HeapFree(GetProcessHeap(), 0, b);
}
-int
+void
nni_plat_mtx_init(nni_plat_mtx *mtx)
{
InitializeSRWLock(&mtx->srl);
mtx->init = 1;
- return (0);
}
void
@@ -56,12 +55,11 @@ nni_plat_mtx_unlock(nni_plat_mtx *mtx)
ReleaseSRWLockExclusive(&mtx->srl);
}
-int
+void
nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx)
{
InitializeConditionVariable(&cv->cv);
cv->srl = &mtx->srl;
- return (0);
}
void
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index 79d7187e..88fd93e0 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -59,40 +59,28 @@ nni_bus_sock_fini(void *arg)
{
nni_bus_sock *psock = arg;
- if (psock != NULL) {
- nni_aio_stop(&psock->aio_getq);
- nni_aio_fini(&psock->aio_getq);
- nni_mtx_fini(&psock->mtx);
- NNI_FREE_STRUCT(psock);
- }
+ nni_aio_stop(&psock->aio_getq);
+ nni_aio_fini(&psock->aio_getq);
+ nni_mtx_fini(&psock->mtx);
+ NNI_FREE_STRUCT(psock);
}
static int
nni_bus_sock_init(void **sp, nni_sock *nsock)
{
nni_bus_sock *psock;
- int rv;
if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
return (NNG_ENOMEM);
}
NNI_LIST_INIT(&psock->pipes, nni_bus_pipe, node);
- if ((rv = nni_mtx_init(&psock->mtx)) != 0) {
- goto fail;
- }
- rv = nni_aio_init(&psock->aio_getq, nni_bus_sock_getq_cb, psock);
- if (rv != 0) {
- goto fail;
- }
+ nni_mtx_init(&psock->mtx);
+ nni_aio_init(&psock->aio_getq, nni_bus_sock_getq_cb, psock);
psock->nsock = nsock;
psock->raw = 0;
*sp = psock;
return (0);
-
-fail:
- nni_bus_sock_fini(psock);
- return (rv);
}
static void
@@ -134,36 +122,21 @@ nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock)
if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
return (NNG_ENOMEM);
}
- NNI_LIST_NODE_INIT(&ppipe->node);
- if (((rv = nni_mtx_init(&ppipe->mtx)) != 0) ||
- ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0)) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_getq, nni_bus_pipe_getq_cb, ppipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_send, nni_bus_pipe_send_cb, ppipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_recv, nni_bus_pipe_recv_cb, ppipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_putq, nni_bus_pipe_putq_cb, ppipe);
- if (rv != 0) {
- goto fail;
+ if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) {
+ NNI_FREE_STRUCT(ppipe);
+ return (rv);
}
+ NNI_LIST_NODE_INIT(&ppipe->node);
+ nni_mtx_init(&ppipe->mtx);
+ nni_aio_init(&ppipe->aio_getq, nni_bus_pipe_getq_cb, ppipe);
+ nni_aio_init(&ppipe->aio_send, nni_bus_pipe_send_cb, ppipe);
+ nni_aio_init(&ppipe->aio_recv, nni_bus_pipe_recv_cb, ppipe);
+ nni_aio_init(&ppipe->aio_putq, nni_bus_pipe_putq_cb, ppipe);
ppipe->npipe = npipe;
ppipe->psock = psock;
*pp = ppipe;
return (0);
-
-fail:
- nni_bus_pipe_fini(ppipe);
- return (rv);
}
static int
diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c
index a0e907f2..acf9ec25 100644
--- a/src/protocol/pair/pair_v0.c
+++ b/src/protocol/pair/pair_v0.c
@@ -53,15 +53,11 @@ static int
pair0_sock_init(void **sp, nni_sock *nsock)
{
pair0_sock *s;
- int rv;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&s->mtx)) != 0) {
- NNI_FREE_STRUCT(s);
- return (rv);
- }
+ nni_mtx_init(&s->mtx);
s->nsock = nsock;
s->ppipe = NULL;
s->raw = 0;
@@ -84,22 +80,19 @@ static int
pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
pair0_pipe *p;
- int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) {
- pair0_pipe_fini(p);
- } else {
- p->npipe = npipe;
- p->psock = psock;
- *pp = p;
- }
- return (rv);
+ nni_aio_init(&p->aio_send, pair0_send_cb, p);
+ nni_aio_init(&p->aio_recv, pair0_recv_cb, p);
+ nni_aio_init(&p->aio_getq, pair0_getq_cb, p);
+ nni_aio_init(&p->aio_putq, pair0_putq_cb, p);
+
+ p->npipe = npipe;
+ p->psock = psock;
+ *pp = p;
+ return (0);
}
static void
diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c
index 1a7ad9fa..2b8a9120 100644
--- a/src/protocol/pair/pair_v1.c
+++ b/src/protocol/pair/pair_v1.c
@@ -73,22 +73,24 @@ pair1_sock_init(void **sp, nni_sock *nsock)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
+ if ((rv = nni_idhash_init(&s->pipes)) != 0) {
+ NNI_FREE_STRUCT(s);
+ return (NNG_ENOMEM);
+ }
NNI_LIST_INIT(&s->plist, pair1_pipe, node);
// Raw mode uses this.
- if (((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) ||
- ((rv = nni_mtx_init(&s->mtx)) != 0) ||
- ((rv = nni_idhash_init(&s->pipes)) != 0)) {
- pair1_sock_fini(s);
- } else {
- s->nsock = nsock;
- s->raw = 0;
- s->poly = 0;
- s->uwq = nni_sock_sendq(nsock);
- s->urq = nni_sock_recvq(nsock);
- s->ttl = 8;
- *sp = s;
- }
+ nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s);
+ nni_mtx_init(&s->mtx);
+
+ s->nsock = nsock;
+ s->raw = 0;
+ s->poly = 0;
+ s->uwq = nni_sock_sendq(nsock);
+ s->urq = nni_sock_recvq(nsock);
+ s->ttl = 8;
+ *sp = s;
+
return (0);
}
@@ -101,17 +103,19 @@ pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) {
- pair1_pipe_fini(p);
- } else {
- p->npipe = npipe;
- p->psock = psock;
- *pp = p;
+ if ((rv = nni_msgq_init(&p->sendq, 2)) != 0) {
+ NNI_FREE_STRUCT(p);
+ return (NNG_ENOMEM);
}
+ nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p);
+ nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p);
+ nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p);
+ nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p);
+
+ p->npipe = npipe;
+ p->psock = psock;
+ *pp = p;
+
return (rv);
}
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index e3c73342..1ebcc4a2 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -63,20 +63,13 @@ static int
nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
{
nni_pull_pipe *pp;
- int rv;
if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_aio_init(&pp->putq_aio, nni_pull_putq_cb, pp))) != 0) {
- NNI_FREE_STRUCT(pp);
- return (rv);
- }
- if (((rv = nni_aio_init(&pp->recv_aio, nni_pull_recv_cb, pp))) != 0) {
- nni_aio_fini(&pp->putq_aio);
- NNI_FREE_STRUCT(pp);
- return (rv);
- }
+ nni_aio_init(&pp->putq_aio, nni_pull_putq_cb, pp);
+ nni_aio_init(&pp->recv_aio, nni_pull_recv_cb, pp);
+
pp->pipe = pipe;
pp->pull = psock;
*ppp = pp;
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index 14b3b191..1bc1659c 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -93,30 +93,19 @@ static int
nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
{
nni_push_pipe *pp;
- int rv;
if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_aio_init(&pp->aio_recv, nni_push_recv_cb, pp)) != 0) {
- goto fail;
- }
- if ((rv = nni_aio_init(&pp->aio_send, nni_push_send_cb, pp)) != 0) {
- goto fail;
- }
- if ((rv = nni_aio_init(&pp->aio_getq, nni_push_getq_cb, pp)) != 0) {
- goto fail;
- }
+ nni_aio_init(&pp->aio_recv, nni_push_recv_cb, pp);
+ nni_aio_init(&pp->aio_send, nni_push_send_cb, pp);
+ nni_aio_init(&pp->aio_getq, nni_push_getq_cb, pp);
NNI_LIST_NODE_INIT(&pp->node);
pp->pipe = pipe;
pp->push = psock;
*ppp = pp;
return (0);
-
-fail:
- nni_push_pipe_fini(pp);
- return (rv);
}
static int
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index 161a5d79..940f2139 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -53,20 +53,13 @@ static int
nni_pub_sock_init(void **pubp, nni_sock *sock)
{
nni_pub_sock *pub;
- int rv;
if ((pub = NNI_ALLOC_STRUCT(pub)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&pub->mtx)) != 0) {
- nni_pub_sock_fini(pub);
- return (rv);
- }
- rv = nni_aio_init(&pub->aio_getq, nni_pub_sock_getq_cb, pub);
- if (rv != 0) {
- nni_pub_sock_fini(pub);
- return (rv);
- }
+ nni_mtx_init(&pub->mtx);
+ nni_aio_init(&pub->aio_getq, nni_pub_sock_getq_cb, pub);
+
pub->sock = sock;
pub->raw = 0;
NNI_LIST_INIT(&pub->pipes, nni_pub_pipe, node);
@@ -127,31 +120,18 @@ nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
}
// XXX: consider making this depth tunable
if ((rv = nni_msgq_init(&pp->sendq, 16)) != 0) {
- goto fail;
- }
-
- rv = nni_aio_init(&pp->aio_getq, nni_pub_pipe_getq_cb, pp);
- if (rv != 0) {
- goto fail;
+ NNI_FREE_STRUCT(pp);
+ return (rv);
}
- rv = nni_aio_init(&pp->aio_send, nni_pub_pipe_send_cb, pp);
- if (rv != 0) {
- goto fail;
- }
+ nni_aio_init(&pp->aio_getq, nni_pub_pipe_getq_cb, pp);
+ nni_aio_init(&pp->aio_send, nni_pub_pipe_send_cb, pp);
+ nni_aio_init(&pp->aio_recv, nni_pub_pipe_recv_cb, pp);
- rv = nni_aio_init(&pp->aio_recv, nni_pub_pipe_recv_cb, pp);
- if (rv != 0) {
- goto fail;
- }
pp->pipe = pipe;
pp->pub = psock;
*ppp = pp;
return (0);
-
-fail:
- nni_pub_pipe_fini(pp);
- return (rv);
}
static int
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 53f01e0f..78b9d157 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -95,16 +95,13 @@ static int
nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock)
{
nni_sub_pipe *sp;
- int rv;
if ((sp = NNI_ALLOC_STRUCT(sp)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_aio_init(&sp->aio_putq, nni_sub_putq_cb, sp)) != 0) ||
- ((rv = nni_aio_init(&sp->aio_recv, nni_sub_recv_cb, sp)) != 0)) {
- nni_sub_pipe_fini(sp);
- return (rv);
- }
+ nni_aio_init(&sp->aio_putq, nni_sub_putq_cb, sp);
+ nni_aio_init(&sp->aio_recv, nni_sub_recv_cb, sp);
+
sp->pipe = pipe;
sp->sub = ssock;
*spp = sp;
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 4319cbf8..09f2b285 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -74,20 +74,18 @@ nni_rep_sock_init(void **repp, nni_sock *sock)
if ((rep = NNI_ALLOC_STRUCT(rep)) == NULL) {
return (NNG_ENOMEM);
}
+ if ((rv = nni_idhash_init(&rep->pipes)) != 0) {
+ NNI_FREE_STRUCT(rep);
+ return (rv);
+ }
+
rep->ttl = 8; // Per RFC
rep->sock = sock;
rep->raw = 0;
rep->btrace = NULL;
rep->btrace_len = 0;
- if ((rv = nni_idhash_init(&rep->pipes)) != 0) {
- goto fail;
- }
-
- rv = nni_aio_init(&rep->aio_getq, nni_rep_sock_getq_cb, rep);
- if (rv != 0) {
- goto fail;
- }
+ nni_aio_init(&rep->aio_getq, nni_rep_sock_getq_cb, rep);
rep->uwq = nni_sock_sendq(sock);
rep->urq = nni_sock_recvq(sock);
@@ -96,10 +94,6 @@ nni_rep_sock_init(void **repp, nni_sock *sock)
nni_sock_senderr(sock, NNG_ESTATE);
return (0);
-
-fail:
- nni_rep_sock_fini(rep);
- return (rv);
}
static void
@@ -128,32 +122,18 @@ nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
return (NNG_ENOMEM);
}
if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) {
- goto fail;
- }
- if ((rv = nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp)) !=
- 0) {
- goto fail;
- }
- if ((rv = nni_aio_init(&rp->aio_send, nni_rep_pipe_send_cb, rp)) !=
- 0) {
- goto fail;
- }
- if ((rv = nni_aio_init(&rp->aio_recv, nni_rep_pipe_recv_cb, rp)) !=
- 0) {
- goto fail;
- }
- if ((rv = nni_aio_init(&rp->aio_putq, nni_rep_pipe_putq_cb, rp)) !=
- 0) {
- goto fail;
+ NNI_FREE_STRUCT(rp);
+ return (rv);
}
+ nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp);
+ nni_aio_init(&rp->aio_send, nni_rep_pipe_send_cb, rp);
+ nni_aio_init(&rp->aio_recv, nni_rep_pipe_recv_cb, rp);
+ nni_aio_init(&rp->aio_putq, nni_rep_pipe_putq_cb, rp);
+
rp->pipe = pipe;
rp->rep = rsock;
*rpp = rp;
return (0);
-
-fail:
- nni_rep_pipe_fini(rp);
- return (rv);
}
static void
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index fdf29fd9..bab81331 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -75,19 +75,12 @@ static int
nni_req_sock_init(void **reqp, nni_sock *sock)
{
nni_req_sock *req;
- int rv;
if ((req = NNI_ALLOC_STRUCT(req)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&req->mtx)) != 0) {
- NNI_FREE_STRUCT(req);
- return (rv);
- }
- if ((rv = nni_cv_init(&req->cv, &req->mtx)) != 0) {
- nni_mtx_fini(&req->mtx);
- NNI_FREE_STRUCT(req);
- }
+ nni_mtx_init(&req->mtx);
+ nni_cv_init(&req->cv, &req->mtx);
NNI_LIST_INIT(&req->readypipes, nni_req_pipe, node);
NNI_LIST_INIT(&req->busypipes, nni_req_pipe, node);
@@ -152,41 +145,22 @@ static int
nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
{
nni_req_pipe *rp;
- int rv;
if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&rp->mtx)) != 0) {
- goto failed;
- }
- if ((rv = nni_aio_init(&rp->aio_getq, nni_req_getq_cb, rp)) != 0) {
- goto failed;
- }
- if ((rv = nni_aio_init(&rp->aio_putq, nni_req_putq_cb, rp)) != 0) {
- goto failed;
- }
- if ((rv = nni_aio_init(&rp->aio_recv, nni_req_recv_cb, rp)) != 0) {
- goto failed;
- }
- rv = nni_aio_init(&rp->aio_sendraw, nni_req_sendraw_cb, rp);
- if (rv != 0) {
- goto failed;
- }
- rv = nni_aio_init(&rp->aio_sendcooked, nni_req_sendcooked_cb, rp);
- if (rv != 0) {
- goto failed;
- }
+ nni_mtx_init(&rp->mtx);
+ nni_aio_init(&rp->aio_getq, nni_req_getq_cb, rp);
+ nni_aio_init(&rp->aio_putq, nni_req_putq_cb, rp);
+ nni_aio_init(&rp->aio_recv, nni_req_recv_cb, rp);
+ nni_aio_init(&rp->aio_sendraw, nni_req_sendraw_cb, rp);
+ nni_aio_init(&rp->aio_sendcooked, nni_req_sendcooked_cb, rp);
NNI_LIST_NODE_INIT(&rp->node);
rp->pipe = pipe;
rp->req = rsock;
*rpp = rp;
return (0);
-
-failed:
- nni_req_pipe_fini(rp);
- return (rv);
}
static void
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 32513134..a097f551 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -77,6 +77,11 @@ nni_resp_sock_init(void **pp, nni_sock *nsock)
if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
return (NNG_ENOMEM);
}
+ if ((rv = nni_idhash_init(&psock->pipes)) != 0) {
+ NNI_FREE_STRUCT(psock);
+ return (rv);
+ }
+
psock->ttl = 8; // Per RFC
psock->nsock = nsock;
psock->raw = 0;
@@ -85,24 +90,12 @@ nni_resp_sock_init(void **pp, nni_sock *nsock)
psock->urq = nni_sock_recvq(nsock);
psock->uwq = nni_sock_sendq(nsock);
- if ((rv = nni_mtx_init(&psock->mtx)) != 0) {
- goto fail;
- }
- if ((rv = nni_idhash_init(&psock->pipes)) != 0) {
- goto fail;
- }
- rv = nni_aio_init(&psock->aio_getq, nni_resp_sock_getq_cb, psock);
- if (rv != 0) {
- goto fail;
- }
+ nni_mtx_init(&psock->mtx);
+ nni_aio_init(&psock->aio_getq, nni_resp_sock_getq_cb, psock);
*pp = psock;
nni_sock_senderr(nsock, NNG_ESTATE);
return (0);
-
-fail:
- nni_resp_sock_fini(psock);
- return (rv);
}
static void
@@ -131,33 +124,18 @@ nni_resp_pipe_init(void **pp, nni_pipe *npipe, void *psock)
return (NNG_ENOMEM);
}
if ((rv = nni_msgq_init(&ppipe->sendq, 2)) != 0) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_putq, nni_resp_putq_cb, ppipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_recv, nni_resp_recv_cb, ppipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_getq, nni_resp_getq_cb, ppipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_send, nni_resp_send_cb, ppipe);
- if (rv != 0) {
- goto fail;
+ NNI_FREE_STRUCT(ppipe);
+ return (rv);
}
+ nni_aio_init(&ppipe->aio_putq, nni_resp_putq_cb, ppipe);
+ nni_aio_init(&ppipe->aio_recv, nni_resp_recv_cb, ppipe);
+ nni_aio_init(&ppipe->aio_getq, nni_resp_getq_cb, ppipe);
+ nni_aio_init(&ppipe->aio_send, nni_resp_send_cb, ppipe);
ppipe->npipe = npipe;
ppipe->psock = psock;
*pp = ppipe;
return (0);
-
-fail:
- nni_resp_pipe_fini(ppipe);
- return (rv);
}
static void
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index cb90c13f..2a32f289 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -70,19 +70,13 @@ static int
nni_surv_sock_init(void **sp, nni_sock *nsock)
{
nni_surv_sock *psock;
- int rv;
if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&psock->mtx)) != 0) {
- goto fail;
- }
- rv = nni_aio_init(&psock->aio_getq, nni_surv_sock_getq_cb, psock);
- if (rv != 0) {
- goto fail;
- }
NNI_LIST_INIT(&psock->pipes, nni_surv_pipe, node);
+ nni_mtx_init(&psock->mtx);
+ nni_aio_init(&psock->aio_getq, nni_surv_sock_getq_cb, psock);
nni_timer_init(&psock->timer, nni_surv_timeout, psock);
psock->nextid = nni_random();
@@ -96,10 +90,6 @@ nni_surv_sock_init(void **sp, nni_sock *nsock)
*sp = psock;
nni_sock_recverr(nsock, NNG_ESTATE);
return (0);
-
-fail:
- nni_surv_sock_fini(psock);
- return (rv);
}
static void
@@ -143,32 +133,19 @@ nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock)
}
// This depth could be tunable.
if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) {
- goto failed;
- }
- rv = nni_aio_init(&ppipe->aio_getq, nni_surv_getq_cb, ppipe);
- if (rv != 0) {
- goto failed;
- }
- rv = nni_aio_init(&ppipe->aio_putq, nni_surv_putq_cb, ppipe);
- if (rv != 0) {
- goto failed;
- }
- rv = nni_aio_init(&ppipe->aio_send, nni_surv_send_cb, ppipe);
- if (rv != 0) {
- goto failed;
- }
- rv = nni_aio_init(&ppipe->aio_recv, nni_surv_recv_cb, ppipe);
- if (rv != 0) {
- goto failed;
+ NNI_FREE_STRUCT(ppipe);
+ return (rv);
}
+
+ nni_aio_init(&ppipe->aio_getq, nni_surv_getq_cb, ppipe);
+ nni_aio_init(&ppipe->aio_putq, nni_surv_putq_cb, ppipe);
+ nni_aio_init(&ppipe->aio_send, nni_surv_send_cb, ppipe);
+ nni_aio_init(&ppipe->aio_recv, nni_surv_recv_cb, ppipe);
+
ppipe->npipe = npipe;
ppipe->psock = psock;
*pp = ppipe;
return (0);
-
-failed:
- nni_surv_pipe_fini(ppipe);
- return (rv);
}
static int
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 3bc24c41..08cf99a2 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -65,14 +65,9 @@ static nni_inproc_global nni_inproc;
static int
nni_inproc_init(void)
{
- int rv;
-
NNI_LIST_INIT(&nni_inproc.servers, nni_inproc_ep, node);
- if ((rv = nni_mtx_init(&nni_inproc.mx)) != 0) {
- return (rv);
- }
-
+ nni_mtx_init(&nni_inproc.mx);
return (0);
}
@@ -309,8 +304,7 @@ nni_inproc_accept_clients(nni_inproc_ep *server)
continue;
}
- if (((rv = nni_mtx_init(&pair->mx)) != 0) ||
- ((rv = nni_msgq_init(&pair->q[0], 4)) != 0) ||
+ if (((rv = nni_msgq_init(&pair->q[0], 4)) != 0) ||
((rv = nni_msgq_init(&pair->q[1], 4)) != 0)) {
nni_inproc_pair_destroy(pair);
nni_inproc_conn_finish(caio, rv);
@@ -318,6 +312,8 @@ nni_inproc_accept_clients(nni_inproc_ep *server)
continue;
}
+ nni_mtx_init(&pair->mx);
+
pair->pipes[0] = caio->a_pipe;
pair->pipes[1] = saio->a_pipe;
pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0];
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 7d976122..0b0e487f 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -107,18 +107,14 @@ static int
nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp)
{
nni_ipc_pipe *p;
- int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_mtx_init(&p->mtx)) != 0) ||
- ((rv = nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p)) != 0)) {
- nni_ipc_pipe_fini(p);
- return (rv);
- }
+ nni_mtx_init(&p->mtx);
+ nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p);
+ nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p);
+ nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p);
p->proto = ep->proto;
p->rcvmax = ep->rcvmax;
@@ -490,12 +486,14 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_mtx_init(&ep->mtx)) != 0) ||
- ((rv = nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep)) != 0) ||
- ((rv = nni_plat_ipc_ep_init(&ep->iep, url, mode)) != 0)) {
- nni_ipc_ep_fini(ep);
+ if ((rv = nni_plat_ipc_ep_init(&ep->iep, url, mode)) != 0) {
+ NNI_FREE_STRUCT(ep);
return (rv);
}
+
+ nni_mtx_init(&ep->mtx);
+ nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep);
+
ep->closed = 0;
ep->proto = nni_sock_proto(sock);
ep->rcvmax = nni_sock_rcvmaxsz(sock);
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 4d47733b..b3136b35 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -107,18 +107,15 @@ static int
nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp)
{
nni_tcp_pipe *p;
- int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_mtx_init(&p->mtx)) != 0) ||
- ((rv = nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p)) != 0)) {
- nni_tcp_pipe_fini(p);
- return (rv);
- }
+ nni_mtx_init(&p->mtx);
+ nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p);
+ nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p);
+ nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p);
+
p->proto = ep->proto;
p->rcvmax = ep->rcvmax;
p->tpp = tpp;
@@ -555,12 +552,13 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_mtx_init(&ep->mtx)) != 0) ||
- ((rv = nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep)) != 0) ||
- ((rv = nni_plat_tcp_ep_init(&ep->tep, url, mode)) != 0)) {
- nni_tcp_ep_fini(ep);
+ if ((rv = nni_plat_tcp_ep_init(&ep->tep, url, mode)) != 0) {
+ NNI_FREE_STRUCT(ep);
return (rv);
}
+ nni_mtx_init(&ep->mtx);
+ nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep);
+
ep->closed = 0;
ep->proto = nni_sock_proto(sock);
ep->rcvmax = nni_sock_rcvmaxsz(sock);
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 73484a4a..754d3ab2 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -85,6 +85,7 @@ add_nng_test(pubsub 5)
add_nng_test(resolv 10)
add_nng_test(sock 5)
add_nng_test(survey 5)
+add_nng_test(synch 5)
add_nng_test(tcp 5)
add_nng_test(scalability 20)
add_nng_test(message 5)
diff --git a/tests/pair1.c b/tests/pair1.c
index 08691fd4..d160acdd 100644
--- a/tests/pair1.c
+++ b/tests/pair1.c
@@ -94,7 +94,6 @@ TestMain("PAIRv1 protocol", {
});
Convey("Cannot set raw mode after connect", {
- int r = 1;
So(nng_listen(s1, addr, NULL, 0) == 0);
So(nng_dial(c1, addr, NULL, 0) == 0);
nng_usleep(100000);
@@ -313,7 +312,6 @@ TestMain("PAIRv1 protocol", {
int ttl;
ttl = 0;
- sz = sizeof(ttl);
So(nng_setopt_int(s1, NNG_OPT_MAXTTL, 0) ==
NNG_EINVAL);
@@ -423,7 +421,6 @@ TestMain("PAIRv1 protocol", {
uint32_t hops;
nng_pipe p1;
nng_pipe p2;
- size_t sz;
So(nng_getopt_int(s1, NNG_OPT_POLYAMOROUS, &v) == 0);
So(v == 0);
diff --git a/tests/platform.c b/tests/platform.c
index 5bfdc367..a28bc4e4 100644
--- a/tests/platform.c
+++ b/tests/platform.c
@@ -1,5 +1,6 @@
//
-// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -8,20 +9,20 @@
//
#include "convey.h"
-#include "nng.h"
#include "core/nng_impl.h"
+#include "nng.h"
-#ifndef _WIN32
+#ifndef _WIN32
#include <sys/time.h>
#endif
uint64_t
getms(void)
{
-#ifdef _WIN32
- return (GetTickCount64()) ;
+#ifdef _WIN32
+ return (GetTickCount64());
#else
- static time_t epoch;
+ static time_t epoch;
struct timeval tv;
if (epoch == 0) {
@@ -35,7 +36,7 @@ getms(void)
return (0);
}
tv.tv_sec -= epoch;
- return (((uint64_t)(tv.tv_sec ) * 1000) + (tv.tv_usec / 1000));
+ return (((uint64_t)(tv.tv_sec) * 1000) + (tv.tv_usec / 1000));
#endif
}
@@ -43,15 +44,15 @@ getms(void)
void
add(void *arg)
{
- *(int *)arg += 1;
+ *(int *) arg += 1;
}
// Notify tests for verifying condvars.
struct notifyarg {
- int did;
- int when;
+ int did;
+ int when;
nni_mtx mx;
- nni_cv cv;
+ nni_cv cv;
};
void
@@ -66,17 +67,10 @@ notifyafter(void *arg)
nni_mtx_unlock(&na->mx);
}
-int
-nop(void)
-{
- return (0);
-}
+TestMain("Platform Operations", {
-Main({
nni_init();
- Test("Platform Operations", {
-
// This is required for anything else to work
Convey("The clock works", {
uint64_t now = getms();
@@ -84,13 +78,13 @@ Main({
Convey("usleep works", {
nni_usleep(100000);
- So((getms() - now) >= 100); // cannot be *shorter*!!
- So((getms() - now) < 150); // crummy clock resolution?
- })
+ So((getms() - now) >= 100); // cannot be *shorter*!!
+ So((getms() - now) < 150); // crummy clock resolution?
+ });
Convey("times work", {
uint64_t msend;
- int usdelta;
- int msdelta;
+ int usdelta;
+ int msdelta;
nni_time usend;
nni_time usnow = nni_clock();
nni_usleep(200000);
@@ -99,19 +93,17 @@ Main({
So(usend > usnow);
So(msend > now);
- usdelta = (int)((usend - usnow) / 1000);
- msdelta = (int)((msend - now));
+ usdelta = (int) ((usend - usnow) / 1000);
+ msdelta = (int) ((msend - now));
So(usdelta >= 200);
So(usdelta < 220);
So(abs(msdelta - usdelta) < 20);
- })
- })
+ });
+ });
Convey("Mutexes work", {
static nni_mtx mx;
- int rv;
- rv = nni_mtx_init(&mx);
- So(rv == 0);
+ nni_mtx_init(&mx);
Convey("We can lock a mutex", {
nni_mtx_lock(&mx);
@@ -124,40 +116,36 @@ Main({
So(1);
nni_mtx_unlock(&mx);
So(1);
- })
- })
- })
- Convey("We can finalize it", {
- nni_mtx_fini(&mx);
- })
- })
+ });
+ });
+ });
+ Convey("We can finalize it", { nni_mtx_fini(&mx); });
+ });
Convey("Threads work", {
static nni_thr thr;
- int val = 0;
- int rv;
+ int val = 0;
+ int rv;
Convey("We can create threads", {
rv = nni_thr_init(&thr, add, &val);
So(rv == 0);
nni_thr_run(&thr);
- Reset({
- nni_thr_fini(&thr);
- })
+ Reset({ nni_thr_fini(&thr); });
Convey("It ran", {
- nni_usleep(50000); // for context switch
+ nni_usleep(50000); // for context switch
So(val == 1);
- })
- })
- })
+ });
+ });
+ });
Convey("Condition variables work", {
static struct notifyarg arg;
- static nni_thr thr;
+ static nni_thr thr;
- So(nni_mtx_init(&arg.mx) == 0);
- So(nni_cv_init(&arg.cv, &arg.mx) == 0);
+ nni_mtx_init(&arg.mx);
+ nni_cv_init(&arg.cv, &arg.mx);
So(nni_thr_init(&thr, notifyafter, &arg) == 0);
Reset({
@@ -167,7 +155,7 @@ Main({
});
Convey("Notification works", {
- arg.did = 0;
+ arg.did = 0;
arg.when = 10000;
nni_thr_run(&thr);
@@ -178,10 +166,10 @@ Main({
nni_mtx_unlock(&arg.mx);
nni_thr_wait(&thr);
So(arg.did == 1);
- })
+ });
Convey("Timeout works", {
- arg.did = 0;
+ arg.did = 0;
arg.when = 200000;
nni_thr_run(&thr);
nni_mtx_lock(&arg.mx);
@@ -191,10 +179,9 @@ Main({
So(arg.did == 0);
nni_mtx_unlock(&arg.mx);
nni_thr_wait(&thr);
- })
-
+ });
Convey("Not running works", {
- arg.did = 0;
+ arg.did = 0;
arg.when = 1;
nni_mtx_lock(&arg.mx);
if (!arg.did) {
@@ -202,8 +189,6 @@ Main({
}
So(arg.did == 0);
nni_mtx_unlock(&arg.mx);
- })
- })
- })
- nni_fini();
+ });
+ });
})
diff --git a/tests/synch.c b/tests/synch.c
new file mode 100644
index 00000000..a49e27e1
--- /dev/null
+++ b/tests/synch.c
@@ -0,0 +1,292 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "convey.h"
+#include "core/nng_impl.h"
+#include "nng.h"
+
+// Notify tests for verifying condvars.
+struct notifyarg {
+ int did;
+ int when;
+ nni_mtx mx;
+ nni_cv cv;
+};
+
+#ifdef PLATFORM_POSIX
+#ifndef NDEBUG
+#define SYNC_FALLBACK 1
+#endif
+#endif
+
+void
+notifyafter(void *arg)
+{
+ struct notifyarg *na = arg;
+
+ nni_usleep(na->when);
+ nni_mtx_lock(&na->mx);
+ na->did = 1;
+ nni_cv_wake(&na->cv);
+ nni_mtx_unlock(&na->mx);
+}
+
+struct notifyarg arg;
+nni_thr thr;
+
+static void
+test_sync(void)
+{
+ Convey("Mutexes work", {
+ nni_mtx mx;
+
+ nni_mtx_init(&mx);
+
+ Convey("We can lock a mutex", {
+ nni_mtx_lock(&mx);
+ So(1);
+ Convey("And we can unlock it", {
+ nni_mtx_unlock(&mx);
+ So(1);
+ Convey("And then lock it again", {
+ nni_mtx_lock(&mx);
+ So(1);
+ nni_mtx_unlock(&mx);
+ So(1);
+ });
+ });
+ Convey("Things block properly", {
+
+ nni_mtx_init(&arg.mx);
+ nni_cv_init(&arg.cv, &arg.mx);
+ So(nni_thr_init(&thr, notifyafter, &arg) == 0);
+ arg.did = 0;
+ arg.when = 0;
+ nni_mtx_lock(&arg.mx);
+ nni_thr_run(&thr);
+ nng_usleep(10000);
+ So(arg.did == 0);
+ nni_mtx_unlock(&arg.mx);
+ nng_usleep(10000);
+ nni_mtx_lock(&arg.mx);
+ while (!arg.did) {
+ nni_cv_wait(&arg.cv);
+ }
+ So(arg.did != 0);
+ nni_mtx_unlock(&arg.mx);
+ nni_thr_fini(&thr);
+ nni_cv_fini(&arg.cv);
+ nni_mtx_fini(&arg.mx);
+ })
+ });
+ Convey("We can finalize it", { nni_mtx_fini(&mx); });
+ });
+
+ Convey("Condition variables work", {
+
+ nni_mtx_init(&arg.mx);
+ nni_cv_init(&arg.cv, &arg.mx);
+ So(nni_thr_init(&thr, notifyafter, &arg) == 0);
+
+ Reset({
+ nni_cv_fini(&arg.cv);
+ nni_mtx_fini(&arg.mx);
+ nni_thr_fini(&thr);
+ });
+
+ Convey("Notification works", {
+ arg.did = 0;
+ arg.when = 10000;
+ nni_thr_run(&thr);
+
+ nni_mtx_lock(&arg.mx);
+ if (!arg.did) {
+ nni_cv_wait(&arg.cv);
+ }
+ nni_mtx_unlock(&arg.mx);
+ nni_thr_wait(&thr);
+ So(arg.did == 1);
+ });
+
+ Convey("Timeout works", {
+ arg.did = 0;
+ arg.when = 200000;
+ nni_thr_run(&thr);
+ nni_mtx_lock(&arg.mx);
+ if (!arg.did) {
+ nni_cv_until(&arg.cv, nni_clock() + 10000);
+ }
+ So(arg.did == 0);
+ nni_mtx_unlock(&arg.mx);
+ nni_thr_wait(&thr);
+ });
+
+ Convey("Empty timeout is EAGAIN", {
+ nni_mtx_lock(&arg.mx);
+ So(nni_cv_until(&arg.cv, 0) == NNG_EAGAIN);
+ nni_mtx_unlock(&arg.mx);
+ });
+
+ Convey("Not running works", {
+ arg.did = 0;
+ arg.when = 1;
+ nni_mtx_lock(&arg.mx);
+ if (!arg.did) {
+ nni_cv_until(&arg.cv, nni_clock() + 10000);
+ }
+ So(arg.did == 0);
+ nni_mtx_unlock(&arg.mx);
+ });
+ });
+}
+
+#if SYNC_FALLBACK
+extern int nni_plat_sync_fallback;
+
+#define ConveyFB(x, y) Convey(x, y)
+
+static void
+test_sync_fallback(void)
+{
+ nni_plat_sync_fallback = 1;
+ Convey("Mutexes work", {
+ nni_mtx mx;
+ int rv;
+
+ nni_mtx_init(&mx);
+
+ Convey("We can lock a mutex", {
+ nni_mtx_lock(&mx);
+ So(1);
+ Convey("And we can unlock it", {
+ nni_mtx_unlock(&mx);
+ So(1);
+ Convey("And then lock it again", {
+ nni_mtx_lock(&mx);
+ So(1);
+ nni_mtx_unlock(&mx);
+ So(1);
+ });
+ });
+ Convey("Things block properly", {
+
+ nni_mtx_init(&arg.mx);
+ nni_cv_init(&arg.cv, &arg.mx);
+ So(nni_thr_init(&thr, notifyafter, &arg) == 0);
+ arg.did = 0;
+ arg.when = 0;
+ nni_mtx_lock(&arg.mx);
+ nni_thr_run(&thr);
+ nng_usleep(10000);
+ So(arg.did == 0);
+ nni_mtx_unlock(&arg.mx);
+ nng_usleep(10000);
+ nni_mtx_lock(&arg.mx);
+ while (!arg.did) {
+ nni_cv_wait(&arg.cv);
+ }
+ So(arg.did != 0);
+ nni_mtx_unlock(&arg.mx);
+ nni_thr_fini(&thr);
+ nni_cv_fini(&arg.cv);
+ nni_mtx_fini(&arg.mx);
+ })
+ });
+ Convey("We can finalize it", { nni_mtx_fini(&mx); });
+ });
+
+ Convey("Condition variables work", {
+
+ nni_mtx_init(&arg.mx);
+ nni_cv_init(&arg.cv, &arg.mx);
+ So(nni_thr_init(&thr, notifyafter, &arg) == 0);
+
+ Reset({
+ nni_cv_fini(&arg.cv);
+ nni_mtx_fini(&arg.mx);
+ nni_thr_fini(&thr);
+ });
+
+ Convey("Notification works", {
+ arg.did = 0;
+ arg.when = 10000;
+ nni_thr_run(&thr);
+
+ nni_mtx_lock(&arg.mx);
+ if (!arg.did) {
+ nni_cv_wait(&arg.cv);
+ }
+ nni_mtx_unlock(&arg.mx);
+ nni_thr_wait(&thr);
+ So(arg.did == 1);
+ });
+
+ Convey("Timeout works", {
+ arg.did = 0;
+ arg.when = 200000;
+ nni_thr_run(&thr);
+ nni_mtx_lock(&arg.mx);
+ if (!arg.did) {
+ nni_cv_until(&arg.cv, nni_clock() + 10000);
+ }
+ So(arg.did == 0);
+ nni_mtx_unlock(&arg.mx);
+ nni_thr_wait(&thr);
+ });
+
+ Convey("Empty timeout is EAGAIN", {
+ nni_mtx_lock(&arg.mx);
+ So(nni_cv_until(&arg.cv, 0) == NNG_EAGAIN);
+ nni_mtx_unlock(&arg.mx);
+ });
+
+ Convey("Not running works", {
+ arg.did = 0;
+ arg.when = 1;
+ nni_mtx_lock(&arg.mx);
+ if (!arg.did) {
+ nni_cv_until(&arg.cv, nni_clock() + 10000);
+ }
+ So(arg.did == 0);
+ nni_mtx_unlock(&arg.mx);
+ });
+ });
+}
+#else
+#define ConveyFB(x, y)
+#endif
+
+TestMain("Synchronization", {
+ nni_init();
+
+ Convey("Synchronization works", { test_sync(); });
+
+ ConveyFB("Fallback synchronization works", { test_sync_fallback(); });
+
+ ConveyFB("Transform works", {
+ nni_plat_sync_fallback = 0;
+ nni_mtx_init(&arg.mx);
+ nni_plat_sync_fallback = 1;
+ nni_cv_init(&arg.cv, &arg.mx);
+ So(nni_thr_init(&thr, notifyafter, &arg) == 0);
+
+ arg.did = 0;
+ arg.when = 10000;
+ nni_thr_run(&thr);
+
+ nni_mtx_lock(&arg.mx);
+ if (!arg.did) {
+ nni_cv_wait(&arg.cv);
+ }
+ nni_mtx_unlock(&arg.mx);
+ nni_thr_wait(&thr);
+ So(arg.did == 1);
+ });
+})