 src/CMakeLists.txt                       |  34
 src/core/aio.c                           | 216
 src/core/aio.h                           |  15
 src/core/device.c                        |   7
 src/core/endpt.c                         |  13
 src/core/msgqueue.c                      |   4
 src/core/platform.h                      |   7
 src/core/taskq.c                         | 263
 src/core/taskq.h                         |  20
 src/core/thread.c                        |   9
 src/core/thread.h                        |   3
 src/platform/posix/posix_aio.h           |   3
 src/platform/posix/posix_epdesc.c        | 476
 src/platform/posix/posix_pipedesc.c      | 212
 src/platform/posix/posix_pollq.h         |  38
 src/platform/posix/posix_pollq_epoll.c   | 445
 src/platform/posix/posix_pollq_kqueue.c  | 432
 src/platform/posix/posix_pollq_poll.c    | 532
 src/platform/posix/posix_pollq_port.c    | 288
 src/platform/posix/posix_resolv_gai.c    | 136
 src/platform/posix/posix_thread.c        |   6
 src/platform/posix/posix_udp.c           |  87
 src/platform/windows/win_impl.h          |   1
 src/platform/windows/win_iocp.c          |   7
 src/platform/windows/win_ipc.c           |   7
 src/platform/windows/win_resolv.c        | 138
 src/platform/windows/win_thread.c        |   7
 src/protocol/reqrep0/rep.c               |  26
 src/protocol/reqrep0/req.c               |   4
 src/protocol/survey0/respond.c           |   5
 src/supplemental/http/http_client.c      |   7
 src/supplemental/http/http_conn.c        |  15
 src/supplemental/tls/mbedtls/tls.c       |  17
 src/supplemental/websocket/websocket.c   |  34
 src/transport/inproc/inproc.c            |  14
 src/transport/ipc/ipc.c                  |  36
 src/transport/tcp/tcp.c                  |  36
 src/transport/tls/tls.c                  |  34
 src/transport/ws/websocket.c             |  29
 src/transport/zerotier/zerotier.c        |  68
 40 files changed, 1995 insertions(+), 1736 deletions(-)
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 96747bc3..2db46c60 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -104,24 +104,24 @@ if (NNG_PLATFORM_POSIX)
platform/posix/posix_thread.c
platform/posix/posix_udp.c
)
-endif()
-if (NNG_HAVE_PORT_CREATE)
- set (NNG_SOURCES ${NNG_SOURCES}
- platform/posix/posix_pollq_port.c
- )
-elseif (NNG_HAVE_KQUEUE)
- set (NNG_SOURCES ${NNG_SOURCES}
- platform/posix/posix_pollq_kqueue.c
- )
-elseif (NNG_HAVE_EPOLL)
- set (NNG_SOURCES ${NNG_SOURCES}
- platform/posix/posix_pollq_epoll.c
- )
-else()
- set (NNG_SOURCES ${NNG_SOURCES}
- platform/posix/posix_pollq_poll.c
- )
+ if (NNG_HAVE_PORT_CREATE)
+ set (NNG_SOURCES ${NNG_SOURCES}
+ platform/posix/posix_pollq_port.c
+ )
+ elseif (NNG_HAVE_KQUEUE)
+ set (NNG_SOURCES ${NNG_SOURCES}
+ platform/posix/posix_pollq_kqueue.c
+ )
+ elseif (NNG_HAVE_EPOLL)
+ set (NNG_SOURCES ${NNG_SOURCES}
+ platform/posix/posix_pollq_epoll.c
+ )
+ else()
+ set (NNG_SOURCES ${NNG_SOURCES}
+ platform/posix/posix_pollq_poll.c
+ )
+ endif()
endif()
if (NNG_PLATFORM_WINDOWS)
diff --git a/src/core/aio.c b/src/core/aio.c
index 3606aa14..a5b6c088 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -17,6 +17,7 @@ static nni_cv nni_aio_expire_cv;
static int nni_aio_expire_run;
static nni_thr nni_aio_expire_thr;
static nni_list nni_aio_expire_aios;
+static nni_aio *nni_aio_expire_aio;
// Design notes.
//
@@ -62,16 +63,11 @@ struct nng_aio {
nni_duration a_timeout; // Relative timeout
// These fields are private to the aio framework.
- nni_cv a_cv;
- bool a_fini : 1; // shutting down (no new operations)
- bool a_done : 1; // operation has completed
- bool a_pend : 1; // completion routine pending
- bool a_active : 1; // aio was started
- bool a_expiring : 1; // expiration callback in progress
- bool a_waiting : 1; // a thread is waiting for this to finish
- bool a_synch : 1; // run completion synchronously
- bool a_sleep : 1; // sleeping with no action
- nni_task a_task;
+ bool a_stop; // shutting down (no new operations)
+ bool a_sleep; // sleeping with no action
+ int a_sleeprv; // result when sleep wakes
+ int a_cancelrv; // if canceled between begin and schedule
+ nni_task *a_task;
// Read/write operations.
nni_iov *a_iov;
@@ -109,12 +105,16 @@ int
nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg)
{
nni_aio *aio;
+ int rv;
if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) {
return (NNG_ENOMEM);
}
memset(aio, 0, sizeof(*aio));
- nni_cv_init(&aio->a_cv, &nni_aio_lk);
+ if ((rv = nni_task_init(&aio->a_task, NULL, cb, arg)) != 0) {
+ NNI_FREE_STRUCT(aio);
+ return (rv);
+ }
aio->a_expire = NNI_TIME_NEVER;
aio->a_timeout = NNG_DURATION_INFINITE;
aio->a_iov = aio->a_iovinl;
@@ -122,7 +122,6 @@ nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg)
if (arg == NULL) {
arg = aio;
}
- nni_task_init(NULL, &aio->a_task, cb, arg);
*aiop = aio;
return (0);
}
@@ -133,9 +132,19 @@ nni_aio_fini(nni_aio *aio)
if (aio != NULL) {
nni_aio_stop(aio);
- // At this point the AIO is done.
- nni_cv_fini(&aio->a_cv);
+ // Wait for the aio to be "done"; this ensures that we don't
+ // destroy an aio from a "normal" completion callback while
+ // the expiration thread is working.
+
+ nni_mtx_lock(&nni_aio_lk);
+ while (nni_aio_expire_aio == aio) {
+ nni_cv_wait(&nni_aio_expire_cv);
+ }
+ nni_mtx_unlock(&nni_aio_lk);
+ nni_task_fini(aio->a_task);
+
+ // At this point the AIO is done.
if (aio->a_niovalloc > 0) {
NNI_FREE_STRUCTS(aio->a_iovalloc, aio->a_niovalloc);
}
@@ -186,7 +195,7 @@ nni_aio_stop(nni_aio *aio)
{
if (aio != NULL) {
nni_mtx_lock(&nni_aio_lk);
- aio->a_fini = true;
+ aio->a_stop = true;
nni_mtx_unlock(&nni_aio_lk);
nni_aio_abort(aio, NNG_ECANCELED);
@@ -279,15 +288,7 @@ nni_aio_count(nni_aio *aio)
void
nni_aio_wait(nni_aio *aio)
{
- nni_mtx_lock(&nni_aio_lk);
- // Wait until we're done, and the synchronous completion flag
- // is cleared (meaning any synch completion is finished).
- while ((aio->a_active) && ((!aio->a_done) || (aio->a_synch))) {
- aio->a_waiting = true;
- nni_cv_wait(&aio->a_cv);
- }
- nni_mtx_unlock(&nni_aio_lk);
- nni_task_wait(&aio->a_task);
+ nni_task_wait(aio->a_task);
}
int
@@ -295,35 +296,34 @@ nni_aio_begin(nni_aio *aio)
{
nni_mtx_lock(&nni_aio_lk);
// We should not reschedule anything at this point.
- if (aio->a_fini) {
- aio->a_active = false;
+ if (aio->a_stop) {
+ nni_task_unprep(aio->a_task);
aio->a_result = NNG_ECANCELED;
nni_mtx_unlock(&nni_aio_lk);
return (NNG_ECANCELED);
}
- aio->a_done = false;
- aio->a_pend = false;
aio->a_result = 0;
aio->a_count = 0;
aio->a_prov_cancel = NULL;
aio->a_prov_data = NULL;
- aio->a_active = true;
+ aio->a_cancelrv = 0;
for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
+ nni_task_prep(aio->a_task);
nni_mtx_unlock(&nni_aio_lk);
return (0);
}
-void
+int
nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
{
+ int rv;
if (!aio->a_sleep) {
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
case NNG_DURATION_ZERO:
- aio->a_expire = nni_clock();
- break;
+ return (NNG_ETIMEDOUT);
case NNG_DURATION_INFINITE:
case NNG_DURATION_DEFAULT:
aio->a_expire = NNI_TIME_NEVER;
@@ -335,22 +335,26 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
}
nni_mtx_lock(&nni_aio_lk);
+ if (aio->a_stop) {
+ nni_mtx_unlock(&nni_aio_lk);
+ return (NNG_ECANCELED);
+ }
+ if ((rv = aio->a_cancelrv) != 0) {
+ nni_mtx_unlock(&nni_aio_lk);
+ return (rv);
+ }
+
+ // If cancellation occurred in between "begin" and "schedule",
+ // then cancel it right now.
aio->a_prov_cancel = cancelfn;
aio->a_prov_data = data;
- if (aio->a_expire != NNI_TIME_NEVER) {
+ if ((rv = aio->a_cancelrv) != 0) {
+ aio->a_expire = 0;
+ nni_aio_expire_add(aio);
+ } else if (aio->a_expire != NNI_TIME_NEVER) {
nni_aio_expire_add(aio);
}
nni_mtx_unlock(&nni_aio_lk);
-}
-
-int
-nni_aio_schedule_verify(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
-{
-
- if ((!aio->a_sleep) && (aio->a_timeout == NNG_DURATION_ZERO)) {
- return (NNG_ETIMEDOUT);
- }
- nni_aio_schedule(aio, cancelfn, data);
return (0);
}
@@ -379,11 +383,8 @@ nni_aio_finish_impl(
{
nni_mtx_lock(&nni_aio_lk);
- NNI_ASSERT(!aio->a_pend); // provider only calls us *once*
-
nni_list_node_remove(&aio->a_expire_node);
- aio->a_pend = true;
aio->a_result = rv;
aio->a_count = count;
aio->a_prov_cancel = NULL;
@@ -393,38 +394,13 @@ nni_aio_finish_impl(
aio->a_expire = NNI_TIME_NEVER;
aio->a_sleep = false;
-
- // If we are expiring, then we rely on the expiration thread to
- // complete this; we must not because the expiration thread is
- // still holding the reference.
-
- if (aio->a_expiring) {
- nni_mtx_unlock(&nni_aio_lk);
- return;
- }
-
- aio->a_done = true;
- aio->a_synch = synch;
+ nni_mtx_unlock(&nni_aio_lk);
if (synch) {
- if (aio->a_task.task_cb != NULL) {
- nni_mtx_unlock(&nni_aio_lk);
- aio->a_task.task_cb(aio->a_task.task_arg);
- nni_mtx_lock(&nni_aio_lk);
- }
+ nni_task_exec(aio->a_task);
} else {
- nni_task_dispatch(&aio->a_task);
+ nni_task_dispatch(aio->a_task);
}
- aio->a_synch = false;
-
- if (aio->a_waiting) {
- aio->a_waiting = false;
- nni_cv_wake(&aio->a_cv);
- }
-
- // This has to be done with the lock still held, in order
- // to prevent taskq wait from returning prematurely.
- nni_mtx_unlock(&nni_aio_lk);
}
void
@@ -510,25 +486,27 @@ nni_aio_expire_add(nni_aio *aio)
static void
nni_aio_expire_loop(void *arg)
{
- nni_list * aios = &nni_aio_expire_aios;
- nni_aio * aio;
- nni_time now;
- nni_aio_cancelfn cancelfn;
- int rv;
+ nni_list *aios = &nni_aio_expire_aios;
NNI_ARG_UNUSED(arg);
for (;;) {
+ nni_aio_cancelfn cancelfn;
+ nni_time now;
+ nni_aio * aio;
+ int rv;
+
now = nni_clock();
nni_mtx_lock(&nni_aio_lk);
- if (nni_aio_expire_run == 0) {
- nni_mtx_unlock(&nni_aio_lk);
- return;
- }
-
if ((aio = nni_list_first(aios)) == NULL) {
+
+ if (nni_aio_expire_run == 0) {
+ nni_mtx_unlock(&nni_aio_lk);
+ return;
+ }
+
nni_cv_wait(&nni_aio_expire_cv);
nni_mtx_unlock(&nni_aio_lk);
continue;
@@ -544,38 +522,24 @@ nni_aio_expire_loop(void *arg)
// This aio's time has come. Expire it, canceling any
// outstanding I/O.
nni_list_remove(aios, aio);
+ rv = aio->a_sleep ? aio->a_sleeprv : NNG_ETIMEDOUT;
+
+ if ((cancelfn = aio->a_prov_cancel) != NULL) {
+
+ // Place a temporary hold on the aio. This prevents it
+ // from being destroyed.
+ nni_aio_expire_aio = aio;
- // Mark it as expiring. This acts as a hold on
- // the aio, similar to the consumers. The actual taskq
- // dispatch on completion won't occur until this is cleared,
- // and the done flag won't be set either.
- aio->a_expiring = true;
- cancelfn = aio->a_prov_cancel;
- rv = aio->a_sleep ? 0 : NNG_ETIMEDOUT;
- aio->a_sleep = false;
-
- // Cancel any outstanding activity. This is always non-NULL
- // for a valid aio, and becomes NULL only when an AIO is
- // already being canceled or finished.
- if (cancelfn != NULL) {
+ // We let the cancel function handle the completion.
+ // If there is no cancellation function, then we cannot
+ // terminate the aio - we've tried, but it has to run
+		// to its natural conclusion.
nni_mtx_unlock(&nni_aio_lk);
cancelfn(aio, rv);
nni_mtx_lock(&nni_aio_lk);
- } else {
- aio->a_pend = true;
- aio->a_result = rv;
- }
-
- NNI_ASSERT(aio->a_pend); // nni_aio_finish was run
- NNI_ASSERT(aio->a_prov_cancel == NULL);
- aio->a_expiring = false;
- aio->a_done = true;
- nni_task_dispatch(&aio->a_task);
-
- if (aio->a_waiting) {
- aio->a_waiting = false;
- nni_cv_wake(&aio->a_cv);
+ nni_aio_expire_aio = NULL;
+ nni_cv_wake(&nni_aio_expire_cv);
}
nni_mtx_unlock(&nni_aio_lk);
}
@@ -656,12 +620,31 @@ nni_aio_iov_advance(nni_aio *aio, size_t n)
return (resid); // we might not have used all of n for this iov
}
+static void
+nni_sleep_cancel(nng_aio *aio, int rv)
+{
+ nni_mtx_lock(&nni_aio_lk);
+ if (!aio->a_sleep) {
+ nni_mtx_unlock(&nni_aio_lk);
+ return;
+ }
+
+ aio->a_sleep = false;
+ nni_list_node_remove(&aio->a_expire_node);
+ nni_mtx_unlock(&nni_aio_lk);
+
+ nni_aio_finish_error(aio, rv);
+}
+
void
nni_sleep_aio(nng_duration ms, nng_aio *aio)
{
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
+ aio->a_sleeprv = 0;
+ aio->a_sleep = true;
switch (aio->a_timeout) {
case NNG_DURATION_DEFAULT:
case NNG_DURATION_INFINITE:
@@ -671,16 +654,15 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
// If the timeout on the aio is shorter than our sleep time,
// then let it still wake up early, but with NNG_ETIMEDOUT.
if (ms > aio->a_timeout) {
- aio->a_sleep = false;
- (void) nni_aio_schedule(aio, NULL, NULL);
- return;
+ aio->a_sleeprv = NNG_ETIMEDOUT;
+ ms = aio->a_timeout;
}
}
- aio->a_sleep = true;
aio->a_expire = nni_clock() + ms;
- // There is no cancellation, apart from just unexpiring.
- nni_aio_schedule(aio, NULL, NULL);
+ if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ }
}
void
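
The clipped-sleep behavior above is visible from the aio consumer side. A
minimal sketch using only internal calls that appear in this change (the
concrete durations are illustrative):

	nni_aio *aio;

	nni_aio_init(&aio, NULL, NULL);
	nni_aio_set_timeout(aio, 100); // aio expires before the sleep ends
	nni_sleep_aio(200, aio);       // sleep is clipped to 100 ms, and
	nni_aio_wait(aio);             // a_sleeprv turns the wake into an error
	NNI_ASSERT(nni_aio_result(aio) == NNG_ETIMEDOUT);
	nni_aio_fini(aio);
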
diff --git a/src/core/aio.h b/src/core/aio.h
index 9b7ac46f..2ed0fb5b 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -146,14 +146,13 @@ extern void nni_aio_bump_count(nni_aio *, size_t);
// nni_aio_schedule indicates that the AIO has begun, and is scheduled for
// asynchronous completion. This also starts the expiration timer. Note that
-// prior to this, the aio is uncancellable.
-extern void nni_aio_schedule(nni_aio *, nni_aio_cancelfn, void *);
-
-// nni_aio_schedule_verify is like nni_aio_schedule, except that if the
-// operation has been run with a zero time (NNG_FLAG_NONBLOCK), then it
-// returns NNG_ETIMEDOUT. This is done to permit bypassing scheduling
-// if the operation could not be immediately completed.
-extern int nni_aio_schedule_verify(nni_aio *, nni_aio_cancelfn, void *);
+// prior to this, the aio is uncancellable. If the operation has a zero
+// timeout (NNG_FLAG_NONBLOCK) then NNG_ETIMEDOUT is returned. If the
+// operation has already been canceled, or should not be run, then an error
+// is returned. (In that case the caller should probably either return an
+// error to its caller, or possibly cause an asynchronous error by calling
+// nni_aio_finish_error on this aio.)
+extern int nni_aio_schedule(nni_aio *, nni_aio_cancelfn, void *);
extern void nni_sleep_aio(nni_duration, nni_aio *);
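
The header comment above implies a common provider idiom, which the device.c
and msgqueue.c hunks below follow. As a hedged sketch (my_obj, my_cancel, and
the readq field are illustrative names, not from this change):

	static void my_cancel(nni_aio *aio, int rv); // provider cancel routine

	void
	my_recv(struct my_obj *o, nni_aio *aio)
	{
		int rv;

		if (nni_aio_begin(aio) != 0) {
			return; // aio is already stopping; do nothing
		}
		nni_mtx_lock(&o->mtx);
		// nni_aio_schedule can now fail (NNG_ETIMEDOUT for a zero
		// timeout, NNG_ECANCELED if canceled between begin and here),
		// so report the failure asynchronously.
		if ((rv = nni_aio_schedule(aio, my_cancel, o)) != 0) {
			nni_mtx_unlock(&o->mtx);
			nni_aio_finish_error(aio, rv);
			return;
		}
		nni_aio_list_append(&o->readq, aio);
		nni_mtx_unlock(&o->mtx);
	}
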
diff --git a/src/core/device.c b/src/core/device.c
index e3b1d220..1f3bf233 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -187,12 +187,17 @@ void
nni_device_start(nni_device_data *dd, nni_aio *user)
{
int i;
+ int rv;
if (nni_aio_begin(user) != 0) {
return;
}
nni_mtx_lock(&dd->mtx);
- nni_aio_schedule(user, nni_device_cancel, dd);
+ if ((rv = nni_aio_schedule(user, nni_device_cancel, dd)) != 0) {
+ nni_mtx_unlock(&dd->mtx);
+ nni_aio_finish_error(user, rv);
+ return;
+ }
dd->user = user;
for (i = 0; i < dd->npath; i++) {
nni_device_path *p = &dd->paths[i];
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 2741a8e6..7593fb42 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -27,7 +27,7 @@ struct nni_ep {
int ep_closed; // full shutdown
int ep_closing; // close pending (waiting on refcnt)
int ep_refcnt;
- int ep_tmo_run;
+ bool ep_tmo_run;
nni_mtx ep_mtx;
nni_cv ep_cv;
nni_list ep_pipes;
@@ -303,7 +303,7 @@ nni_ep_tmo_cancel(nni_aio *aio, int rv)
if (ep->ep_tmo_run) {
nni_aio_finish_error(aio, rv);
}
- ep->ep_tmo_run = 0;
+ ep->ep_tmo_run = false;
nni_mtx_unlock(&ep->ep_mtx);
}
}
@@ -312,6 +312,7 @@ static void
nni_ep_tmo_start(nni_ep *ep)
{
nni_duration backoff;
+ int rv;
if (ep->ep_closing || (nni_aio_begin(ep->ep_tmo_aio) != 0)) {
return;
@@ -333,8 +334,12 @@ nni_ep_tmo_start(nni_ep *ep)
nni_aio_set_timeout(
ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0));
- ep->ep_tmo_run = 1;
- nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep);
+ if ((rv = nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep)) !=
+ 0) {
+ nni_aio_finish_error(ep->ep_tmo_aio, rv);
+ }
+
+ ep->ep_tmo_run = true;
}
static void
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 2367b57f..fa94e32f 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -345,7 +345,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
// If this is an instantaneous poll operation, and the queue has
// no room, nobody is waiting to receive, then report NNG_ETIMEDOUT.
- rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+ rv = nni_aio_schedule(aio, nni_msgq_cancel, mq);
if ((rv != 0) && (mq->mq_len >= mq->mq_cap) &&
(nni_list_empty(&mq->mq_aio_getq))) {
nni_mtx_unlock(&mq->mq_lock);
@@ -373,7 +373,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_aio_finish_error(aio, mq->mq_geterr);
return;
}
- rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+ rv = nni_aio_schedule(aio, nni_msgq_cancel, mq);
if ((rv != 0) && (mq->mq_len == 0) &&
(nni_list_empty(&mq->mq_aio_putq))) {
nni_mtx_unlock(&mq->mq_lock);
diff --git a/src/core/platform.h b/src/core/platform.h
index 6e7acdbf..bdc74349 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -136,11 +136,16 @@ extern int nni_plat_cv_until(nni_plat_cv *, nni_time);
// immediately.
extern int nni_plat_thr_init(nni_plat_thr *, void (*)(void *), void *);
-// nni_thread_reap waits for the thread to exit, and then releases any
+// nni_plat_thr_fini waits for the thread to exit, and then releases any
// resources associated with the thread. After this returns, it
// is an error to reference the thread in any further way.
extern void nni_plat_thr_fini(nni_plat_thr *);
+// nni_plat_thr_is_self returns true if the caller is the thread
+// identified, and false otherwise. (This allows some deadlock
+// prevention in callbacks, for example.)
+extern bool nni_plat_thr_is_self(nni_plat_thr *);
+
//
// Clock Support
//
diff --git a/src/core/taskq.c b/src/core/taskq.c
index b0fe160b..526fa0b4 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -11,11 +11,22 @@
#include "core/nng_impl.h"
typedef struct nni_taskq_thr nni_taskq_thr;
+struct nni_task {
+ nni_list_node task_node;
+ void * task_arg;
+ nni_cb task_cb;
+ nni_taskq * task_tq;
+ bool task_sched;
+ bool task_run;
+ bool task_done;
+ bool task_exec;
+ bool task_fini;
+ nni_mtx task_mtx;
+ nni_cv task_cv;
+};
struct nni_taskq_thr {
nni_taskq *tqt_tq;
nni_thr tqt_thread;
- nni_task * tqt_running;
- int tqt_wait;
};
struct nni_taskq {
nni_list tq_tasks;
@@ -24,8 +35,7 @@ struct nni_taskq {
nni_cv tq_wait_cv;
nni_taskq_thr *tq_threads;
int tq_nthreads;
- int tq_run;
- int tq_waiting;
+ bool tq_run;
};
static nni_taskq *nni_taskq_systq = NULL;
@@ -40,25 +50,37 @@ nni_taskq_thread(void *self)
nni_mtx_lock(&tq->tq_mtx);
for (;;) {
if ((task = nni_list_first(&tq->tq_tasks)) != NULL) {
+ nni_mtx_lock(&task->task_mtx);
+ task->task_run = true;
+ task->task_sched = false;
+ nni_mtx_unlock(&task->task_mtx);
nni_list_remove(&tq->tq_tasks, task);
- thr->tqt_running = task;
nni_mtx_unlock(&tq->tq_mtx);
+
task->task_cb(task->task_arg);
- nni_mtx_lock(&tq->tq_mtx);
- thr->tqt_running = NULL;
- if (thr->tqt_wait || tq->tq_waiting) {
- thr->tqt_wait = 0;
- tq->tq_waiting = 0;
- nni_cv_wake(&tq->tq_wait_cv);
- }
+ nni_mtx_lock(&task->task_mtx);
+ if (task->task_sched || task->task_exec) {
+				// The task most likely resubmitted itself.
+ // We cannot touch the rest of the flags,
+ // since the called function has taken control.
+ nni_mtx_unlock(&task->task_mtx);
+ } else {
+ task->task_done = true;
+ nni_cv_wake(&task->task_cv);
+
+ if (task->task_fini) {
+ task->task_fini = false;
+ nni_mtx_unlock(&task->task_mtx);
+ nni_task_fini(task);
+ } else {
+ nni_mtx_unlock(&task->task_mtx);
+ }
+ }
+ nni_mtx_lock(&tq->tq_mtx);
continue;
}
- if (tq->tq_waiting) {
- tq->tq_waiting = 0;
- nni_cv_wake(&tq->tq_wait_cv);
- }
if (!tq->tq_run) {
break;
}
@@ -89,8 +111,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
for (int i = 0; i < nthr; i++) {
int rv;
- tq->tq_threads[i].tqt_tq = tq;
- tq->tq_threads[i].tqt_running = NULL;
+ tq->tq_threads[i].tqt_tq = tq;
rv = nni_thr_init(&tq->tq_threads[i].tqt_thread,
nni_taskq_thread, &tq->tq_threads[i]);
if (rv != 0) {
@@ -98,7 +119,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
return (rv);
}
}
- tq->tq_run = 1;
+ tq->tq_run = true;
for (int i = 0; i < tq->tq_nthreads; i++) {
nni_thr_run(&tq->tq_threads[i].tqt_thread);
}
@@ -106,53 +127,15 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
return (0);
}
-static void
-nni_taskq_drain_locked(nni_taskq *tq)
-{
- // We need to first let the taskq completely drain.
- for (;;) {
- int busy = 0;
- if (!nni_list_empty(&tq->tq_tasks)) {
- busy = 1;
- } else {
- int i;
- for (i = 0; i < tq->tq_nthreads; i++) {
- if (tq->tq_threads[i].tqt_running != 0) {
- busy = 1;
- break;
- }
- }
- }
- if (!busy) {
- break;
- }
- tq->tq_waiting++;
- nni_cv_wait(&tq->tq_wait_cv);
- }
-}
-
-void
-nni_taskq_drain(nni_taskq *tq)
-{
- nni_mtx_lock(&tq->tq_mtx);
- nni_taskq_drain_locked(tq);
- nni_mtx_unlock(&tq->tq_mtx);
-}
-
void
nni_taskq_fini(nni_taskq *tq)
{
- // First drain the taskq completely. This is necessary since some
- // tasks that are presently running may need to schedule additional
- // tasks, and we don't want those to block.
if (tq == NULL) {
return;
}
if (tq->tq_run) {
nni_mtx_lock(&tq->tq_mtx);
- nni_taskq_drain_locked(tq);
-
- tq->tq_run = 0;
+ tq->tq_run = false;
nni_cv_wake(&tq->tq_sched_cv);
nni_mtx_unlock(&tq->tq_mtx);
}
@@ -174,90 +157,142 @@ nni_task_dispatch(nni_task *task)
// If there is no callback to perform, then do nothing!
// The user will be none the wiser.
if (task->task_cb == NULL) {
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = false;
+ task->task_run = false;
+ task->task_exec = false;
+ task->task_done = true;
+ nni_cv_wake(&task->task_cv);
+ nni_mtx_unlock(&task->task_mtx);
return;
}
nni_mtx_lock(&tq->tq_mtx);
- // It might already be scheduled... if so don't redo it.
- if (!nni_list_active(&tq->tq_tasks, task)) {
- nni_list_append(&tq->tq_tasks, task);
- }
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = true;
+ task->task_run = false;
+ task->task_done = false;
+ nni_mtx_unlock(&task->task_mtx);
+
+ nni_list_append(&tq->tq_tasks, task);
nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate
nni_mtx_unlock(&tq->tq_mtx);
}
void
-nni_task_wait(nni_task *task)
+nni_task_exec(nni_task *task)
{
- nni_taskq *tq = task->task_tq;
-
if (task->task_cb == NULL) {
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = false;
+ task->task_run = false;
+ task->task_exec = false;
+ task->task_done = true;
+ nni_cv_wake(&task->task_cv);
+ nni_mtx_unlock(&task->task_mtx);
return;
}
- nni_mtx_lock(&tq->tq_mtx);
- for (;;) {
- bool running = false;
- if (nni_list_active(&tq->tq_tasks, task)) {
- running = true;
- } else {
- for (int i = 0; i < tq->tq_nthreads; i++) {
- if (tq->tq_threads[i].tqt_running == task) {
- running = true;
- break;
- }
- }
- }
- if (!running) {
- break;
- }
+ nni_mtx_lock(&task->task_mtx);
+ if (task->task_exec) {
+		// Recursive nni_task_exec; run it asynchronously instead.
+ nni_mtx_unlock(&task->task_mtx);
+ nni_task_dispatch(task);
+ return;
+ }
+ task->task_exec = true;
+ task->task_sched = false;
+ task->task_done = false;
+ nni_mtx_unlock(&task->task_mtx);
- tq->tq_waiting = 1;
- nni_cv_wait(&tq->tq_wait_cv);
+ task->task_cb(task->task_arg);
+
+ nni_mtx_lock(&task->task_mtx);
+ task->task_exec = false;
+ if (task->task_sched || task->task_run) {
+ // cb scheduled a task
+ nni_mtx_unlock(&task->task_mtx);
+ return;
+ }
+ task->task_done = true;
+ nni_cv_wake(&task->task_cv);
+ if (task->task_fini) {
+ task->task_fini = false;
+ nni_mtx_unlock(&task->task_mtx);
+ nni_task_fini(task);
+ } else {
+ nni_mtx_unlock(&task->task_mtx);
}
- nni_mtx_unlock(&tq->tq_mtx);
}
-int
-nni_task_cancel(nni_task *task)
+void
+nni_task_prep(nni_task *task)
{
- nni_taskq *tq = task->task_tq;
- bool running;
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = true;
+ task->task_done = false;
+ task->task_run = false;
+ nni_mtx_unlock(&task->task_mtx);
+}
- nni_mtx_lock(&tq->tq_mtx);
- running = true;
- for (;;) {
- running = false;
- for (int i = 0; i < tq->tq_nthreads; i++) {
- if (tq->tq_threads[i].tqt_running == task) {
- running = true;
- break;
- }
- }
+void
+nni_task_unprep(nni_task *task)
+{
+ nni_mtx_lock(&task->task_mtx);
+ task->task_sched = false;
+ task->task_done = false;
+ task->task_run = false;
+ nni_cv_wake(&task->task_cv);
+ nni_mtx_unlock(&task->task_mtx);
+}
- if (!running) {
- break;
- }
- // tq->tq_threads[i].tqt_wait = 1;
- tq->tq_waiting++;
- nni_cv_wait(&tq->tq_wait_cv);
+void
+nni_task_wait(nni_task *task)
+{
+ nni_mtx_lock(&task->task_mtx);
+ while ((task->task_sched || task->task_run || task->task_exec) &&
+ (!task->task_done)) {
+ nni_cv_wait(&task->task_cv);
}
+ nni_mtx_unlock(&task->task_mtx);
+}
- if (nni_list_active(&tq->tq_tasks, task)) {
- nni_list_remove(&tq->tq_tasks, task);
+int
+nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg)
+{
+ nni_task *task;
+
+ if ((task = NNI_ALLOC_STRUCT(task)) == NULL) {
+ return (NNG_ENOMEM);
}
- nni_mtx_unlock(&tq->tq_mtx);
+ NNI_LIST_NODE_INIT(&task->task_node);
+ nni_mtx_init(&task->task_mtx);
+ nni_cv_init(&task->task_cv, &task->task_mtx);
+ task->task_sched = false;
+ task->task_done = false;
+ task->task_run = false;
+ task->task_sched = false;
+ task->task_exec = false;
+ task->task_cb = cb;
+ task->task_arg = arg;
+ task->task_tq = tq != NULL ? tq : nni_taskq_systq;
+ *taskp = task;
return (0);
}
void
-nni_task_init(nni_taskq *tq, nni_task *task, nni_cb cb, void *arg)
+nni_task_fini(nni_task *task)
{
- if (tq == NULL) {
- tq = nni_taskq_systq;
+ NNI_ASSERT(!nni_list_node_active(&task->task_node));
+ nni_mtx_lock(&task->task_mtx);
+ if (task->task_run || task->task_exec) {
+ // destroy later.
+ task->task_fini = true;
+ nni_mtx_unlock(&task->task_mtx);
+ return;
}
- NNI_LIST_NODE_INIT(&task->task_node);
- task->task_cb = cb;
- task->task_arg = arg;
- task->task_tq = tq;
+ nni_mtx_unlock(&task->task_mtx);
+ nni_cv_fini(&task->task_cv);
+ nni_mtx_fini(&task->task_mtx);
+ NNI_FREE_STRUCT(task);
}
int
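
A minimal sketch of the reworked task lifecycle, using only the API as
changed here (the callback body is illustrative):

	static void
	my_cb(void *arg)
	{
		// Runs on a taskq thread after dispatch, or inline via
		// nni_task_exec.
	}

	void
	example(void)
	{
		nni_task *task;

		if (nni_task_init(&task, NULL, my_cb, NULL) != 0) {
			return; // NNG_ENOMEM
		}
		nni_task_dispatch(task); // queue on the system taskq
		nni_task_wait(task);     // blocks until task_done is set
		nni_task_fini(task);     // deferred if a run is still pending
	}
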
diff --git a/src/core/taskq.h b/src/core/taskq.h
index 40b5dc00..513b15bb 100644
--- a/src/core/taskq.h
+++ b/src/core/taskq.h
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -17,28 +17,22 @@
typedef struct nni_taskq nni_taskq;
typedef struct nni_task nni_task;
-// nni_task is a structure representing a task. Its intended to inlined
-// into structures so that taskq_dispatch can be a guaranteed operation.
-struct nni_task {
- nni_list_node task_node;
- void * task_arg;
- nni_cb task_cb;
- nni_taskq * task_tq;
-};
-
extern int nni_taskq_init(nni_taskq **, int);
extern void nni_taskq_fini(nni_taskq *);
-extern void nni_taskq_drain(nni_taskq *);
// nni_task_dispatch sends the task to the queue. It is guaranteed to
// succeed. (If the queue is shut down, then the behavior is undefined.)
extern void nni_task_dispatch(nni_task *);
+extern void nni_task_exec(nni_task *);
+extern void nni_task_prep(nni_task *);
+extern void nni_task_unprep(nni_task *);
// nni_task_cancel cancels the task. It will wait for the task to complete
// if it is already running.
extern int nni_task_cancel(nni_task *);
extern void nni_task_wait(nni_task *);
-extern void nni_task_init(nni_taskq *, nni_task *, nni_cb, void *);
+extern int nni_task_init(nni_task **, nni_taskq *, nni_cb, void *);
+extern void nni_task_fini(nni_task *);
extern int nni_taskq_sys_init(void);
extern void nni_taskq_sys_fini(void);
diff --git a/src/core/thread.c b/src/core/thread.c
index 54c9c7d2..adc35542 100644
--- a/src/core/thread.c
+++ b/src/core/thread.c
@@ -173,3 +173,12 @@ nni_thr_fini(nni_thr *thr)
nni_plat_mtx_fini(&thr->mtx);
thr->init = 0;
}
+
+bool
+nni_thr_is_self(nni_thr *thr)
+{
+ if (!thr->init) {
+ return (false);
+ }
+ return (nni_plat_thr_is_self(&thr->thr));
+}
diff --git a/src/core/thread.h b/src/core/thread.h
index ee83b196..c3d5531e 100644
--- a/src/core/thread.h
+++ b/src/core/thread.h
@@ -82,4 +82,7 @@ extern void nni_thr_run(nni_thr *thr);
// at all.
extern void nni_thr_wait(nni_thr *thr);
+// nni_thr_is_self returns true if the caller is the named thread.
+extern bool nni_thr_is_self(nni_thr *thr);
+
#endif // CORE_THREAD_H
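
The intended use of nni_thr_is_self is the deadlock avoidance mentioned in
the platform.h comment above. A hedged sketch (the surrounding structure and
defer flag are illustrative):

	void
	my_obj_fini(struct my_obj *o)
	{
		if (nni_thr_is_self(&o->thr)) {
			// Called from o->thr's own callback: joining here
			// would deadlock, so defer the teardown instead.
			o->defer_fini = true;
			return;
		}
		nni_thr_fini(&o->thr); // waits for the thread, then reaps it
	}
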
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
index 15d91db2..08ae99ca 100644
--- a/src/platform/posix/posix_aio.h
+++ b/src/platform/posix/posix_aio.h
@@ -18,11 +18,12 @@
// one of several possible different backends.
#include "core/nng_impl.h"
+#include "posix_pollq.h"
typedef struct nni_posix_pipedesc nni_posix_pipedesc;
typedef struct nni_posix_epdesc nni_posix_epdesc;
-extern int nni_posix_pipedesc_init(nni_posix_pipedesc **, int);
+extern int nni_posix_pipedesc_init(nni_posix_pipedesc **, nni_posix_pfd *);
extern void nni_posix_pipedesc_fini(nni_posix_pipedesc *);
extern void nni_posix_pipedesc_recv(nni_posix_pipedesc *, nni_aio *);
extern void nni_posix_pipedesc_send(nni_posix_pipedesc *, nni_aio *);
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 7431dedf..0065806d 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -39,7 +39,7 @@
#endif
struct nni_posix_epdesc {
- nni_posix_pollq_node node;
+ nni_posix_pfd * pfd;
nni_list connectq;
nni_list acceptq;
bool closed;
@@ -53,10 +53,14 @@ struct nni_posix_epdesc {
nni_mtx mtx;
};
+static void nni_epdesc_connect_cb(nni_posix_pfd *, int, void *);
+static void nni_epdesc_accept_cb(nni_posix_pfd *, int, void *);
+
static void
-nni_posix_epdesc_cancel(nni_aio *aio, int rv)
+nni_epdesc_cancel(nni_aio *aio, int rv)
{
- nni_posix_epdesc *ed = nni_aio_get_prov_data(aio);
+ nni_posix_epdesc *ed = nni_aio_get_prov_data(aio);
+ nni_posix_pfd * pfd = NULL;
NNI_ASSERT(rv != 0);
nni_mtx_lock(&ed->mtx);
@@ -64,200 +68,136 @@ nni_posix_epdesc_cancel(nni_aio *aio, int rv)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
}
+ if ((ed->mode == NNI_EP_MODE_DIAL) && nni_list_empty(&ed->connectq) &&
+ ((pfd = ed->pfd) != NULL)) {
+ nni_posix_pfd_close(pfd);
+ }
nni_mtx_unlock(&ed->mtx);
}
static void
-nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd)
+nni_epdesc_finish(nni_aio *aio, int rv, nni_posix_pfd *newpfd)
{
nni_posix_pipedesc *pd = NULL;
// acceptq or connectq.
nni_aio_list_remove(aio);
- if (rv == 0) {
- if ((rv = nni_posix_pipedesc_init(&pd, newfd)) != 0) {
- (void) close(newfd);
- }
- }
if (rv != 0) {
+ NNI_ASSERT(newpfd == NULL);
nni_aio_finish_error(aio, rv);
- } else {
- nni_aio_set_output(aio, 0, pd);
- nni_aio_finish(aio, 0, 0);
+ return;
}
-}
-
-static void
-nni_posix_epdesc_doconnect(nni_posix_epdesc *ed)
-{
- nni_aio * aio;
- socklen_t sz;
- int rv;
-
- // Note that normally there will only be a single connect AIO...
- // A socket that is here will have *initiated* with a connect()
- // call, which returned EINPROGRESS. When the connection attempt
- // is done, either way, the descriptor will be noted as writable.
- // getsockopt() with SOL_SOCKET, SO_ERROR to determine the actual
- // status of the connection attempt...
- while ((aio = nni_list_first(&ed->connectq)) != NULL) {
- rv = -1;
- sz = sizeof(rv);
- if (getsockopt(ed->node.fd, SOL_SOCKET, SO_ERROR, &rv, &sz) <
- 0) {
- rv = errno;
- }
- switch (rv) {
- case 0:
- // Success!
- nni_posix_pollq_remove(&ed->node);
- nni_posix_epdesc_finish(aio, 0, ed->node.fd);
- ed->node.fd = -1;
- continue;
-
- case EINPROGRESS:
- // Still in progress... keep trying
- return;
- default:
- if (rv == ENOENT) {
- rv = ECONNREFUSED;
- }
- nni_posix_pollq_remove(&ed->node);
- nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0);
- (void) close(ed->node.fd);
- ed->node.fd = -1;
- continue;
- }
+ NNI_ASSERT(newpfd != NULL);
+ if ((rv = nni_posix_pipedesc_init(&pd, newpfd)) != 0) {
+ nni_posix_pfd_fini(newpfd);
+ nni_aio_finish_error(aio, rv);
+ return;
}
+ nni_aio_set_output(aio, 0, pd);
+ nni_aio_finish(aio, 0, 0);
}
static void
-nni_posix_epdesc_doaccept(nni_posix_epdesc *ed)
+nni_epdesc_doaccept(nni_posix_epdesc *ed)
{
nni_aio *aio;
while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
- int newfd;
+ int newfd;
+ int fd;
+ int rv;
+ nni_posix_pfd *pfd;
+
+ fd = nni_posix_pfd_fd(ed->pfd);
#ifdef NNG_USE_ACCEPT4
- newfd = accept4(ed->node.fd, NULL, NULL, SOCK_CLOEXEC);
+ newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC);
if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) {
- newfd = accept(ed->node.fd, NULL, NULL);
+ newfd = accept(fd, NULL, NULL);
}
#else
- newfd = accept(ed->node.fd, NULL, NULL);
+ newfd = accept(fd, NULL, NULL);
#endif
-
- if (newfd >= 0) {
- // successful connection request!
- nni_posix_epdesc_finish(aio, 0, newfd);
- continue;
- }
-
- if ((errno == EWOULDBLOCK) || (errno == EAGAIN)) {
- // Well, let's try later. Note that EWOULDBLOCK
- // is required by standards, but some platforms may
- // use EAGAIN. The values may be the same, so we
- // can't use switch.
- return;
+ if (newfd < 0) {
+ switch (errno) {
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ rv = nni_posix_pfd_arm(ed->pfd, POLLIN);
+ if (rv != 0) {
+ nni_epdesc_finish(aio, rv, NULL);
+ continue;
+ }
+ // Come back later...
+ return;
+ case ECONNABORTED:
+ case ECONNRESET:
+ // Eat them, they aren't interesting.
+ continue;
+ default:
+ // Error this one, but keep moving to the next.
+ rv = nni_plat_errno(errno);
+ nni_epdesc_finish(aio, rv, NULL);
+ continue;
+ }
}
- if ((errno == ECONNABORTED) || (errno == ECONNRESET)) {
- // Let's just eat this one. Perhaps it may be
- // better to report it to the application, but we
- // think most applications don't want to see this.
- // Only someone with a packet trace is going to
- // notice this.
+ if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
+ close(newfd);
+ nni_epdesc_finish(aio, rv, NULL);
continue;
}
- nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
- }
-}
-
-static void
-nni_posix_epdesc_doerror(nni_posix_epdesc *ed)
-{
- nni_aio * aio;
- int rv = 1;
- socklen_t sz = sizeof(rv);
-
- if (getsockopt(ed->node.fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
- rv = errno;
- }
- if (rv == 0) {
- return;
- }
- rv = nni_plat_errno(rv);
-
- while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
- nni_posix_epdesc_finish(aio, rv, 0);
- }
- while ((aio = nni_list_first(&ed->connectq)) != NULL) {
- nni_posix_epdesc_finish(aio, rv, 0);
+ nni_epdesc_finish(aio, 0, pfd);
}
}
static void
-nni_posix_epdesc_doclose(nni_posix_epdesc *ed)
+nni_epdesc_doclose(nni_posix_epdesc *ed)
{
nni_aio *aio;
- int fd;
ed->closed = true;
while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
- nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+ nni_epdesc_finish(aio, NNG_ECLOSED, 0);
}
while ((aio = nni_list_first(&ed->connectq)) != NULL) {
- nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+ nni_epdesc_finish(aio, NNG_ECLOSED, 0);
}
- nni_posix_pollq_remove(&ed->node);
+ if (ed->pfd != NULL) {
- if ((fd = ed->node.fd) != -1) {
+ nni_posix_pfd_close(ed->pfd);
+ }
+
+	// Clean up the stale UNIX socket when closing the server.
+ if ((ed->mode == NNI_EP_MODE_LISTEN) && (ed->loclen != 0) &&
+ (ed->locaddr.ss_family == AF_UNIX)) {
struct sockaddr_un *sun = (void *) &ed->locaddr;
- ed->node.fd = -1;
- (void) shutdown(fd, SHUT_RDWR);
- (void) close(fd);
- if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) {
- (void) unlink(sun->sun_path);
- }
+ (void) unlink(sun->sun_path);
}
}
static void
-nni_posix_epdesc_cb(void *arg)
+nni_epdesc_accept_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_posix_epdesc *ed = arg;
- int events;
nni_mtx_lock(&ed->mtx);
-
- if (ed->node.revents & POLLIN) {
- nni_posix_epdesc_doaccept(ed);
- }
- if (ed->node.revents & POLLOUT) {
- nni_posix_epdesc_doconnect(ed);
- }
- if (ed->node.revents & (POLLERR | POLLHUP)) {
- nni_posix_epdesc_doerror(ed);
- }
- if (ed->node.revents & POLLNVAL) {
- nni_posix_epdesc_doclose(ed);
+ if (events & POLLNVAL) {
+ nni_epdesc_doclose(ed);
+ nni_mtx_unlock(&ed->mtx);
+ return;
}
+ NNI_ASSERT(pfd == ed->pfd);
- events = 0;
- if (!nni_list_empty(&ed->connectq)) {
- events |= POLLOUT;
- }
- if (!nni_list_empty(&ed->acceptq)) {
- events |= POLLIN;
- }
- if ((!ed->closed) && (events != 0)) {
- nni_posix_pollq_arm(&ed->node, events);
- }
+ // Anything else will turn up in accept.
+ nni_epdesc_doaccept(ed);
nni_mtx_unlock(&ed->mtx);
}
@@ -265,7 +205,7 @@ void
nni_posix_epdesc_close(nni_posix_epdesc *ed)
{
nni_mtx_lock(&ed->mtx);
- nni_posix_epdesc_doclose(ed);
+ nni_epdesc_doclose(ed);
nni_mtx_unlock(&ed->mtx);
}
@@ -276,9 +216,23 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
struct sockaddr_storage *ss;
int rv;
int fd;
+ nni_posix_pfd * pfd;
nni_mtx_lock(&ed->mtx);
+ if (ed->started) {
+ nni_mtx_unlock(&ed->mtx);
+ return (NNG_ESTATE);
+ }
+ if (ed->closed) {
+ nni_mtx_unlock(&ed->mtx);
+ return (NNG_ECLOSED);
+ }
+ if ((len = ed->loclen) == 0) {
+ nni_mtx_unlock(&ed->mtx);
+ return (NNG_EADDRINVAL);
+ }
+
ss = &ed->locaddr;
len = ed->loclen;
@@ -286,18 +240,17 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
nni_mtx_unlock(&ed->mtx);
return (nni_plat_errno(errno));
}
- (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
-#ifdef SO_NOSIGPIPE
- // Darwin lacks MSG_NOSIGNAL, but has a socket option.
- int one = 1;
- (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
-#endif
+ if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
+ nni_mtx_unlock(&ed->mtx);
+		(void) close(fd);
+ return (rv);
+ }
if (bind(fd, (struct sockaddr *) ss, len) < 0) {
rv = nni_plat_errno(errno);
nni_mtx_unlock(&ed->mtx);
- (void) close(fd);
+ nni_posix_pfd_fini(pfd);
return (rv);
}
@@ -314,7 +267,7 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
if ((rv = chmod(sun->sun_path, perms)) != 0) {
rv = nni_plat_errno(errno);
nni_mtx_unlock(&ed->mtx);
- close(fd);
+ nni_posix_pfd_fini(pfd);
return (rv);
}
}
@@ -324,27 +277,24 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
if (listen(fd, 128) != 0) {
rv = nni_plat_errno(errno);
nni_mtx_unlock(&ed->mtx);
- (void) close(fd);
+ nni_posix_pfd_fini(pfd);
return (rv);
}
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ nni_posix_pfd_set_cb(pfd, nni_epdesc_accept_cb, ed);
- ed->node.fd = fd;
- if ((rv = nni_posix_pollq_add(&ed->node)) != 0) {
- (void) close(fd);
- ed->node.fd = -1;
- nni_mtx_unlock(&ed->mtx);
- return (rv);
- }
+ ed->pfd = pfd;
ed->started = true;
nni_mtx_unlock(&ed->mtx);
+
return (0);
}
void
nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
{
+ int rv;
+
// Accept is simpler than the connect case. With accept we just
// need to wait for the socket to be readable to indicate an incoming
// connection is ready for us. There isn't anything else for us to
@@ -354,14 +304,25 @@ nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
}
nni_mtx_lock(&ed->mtx);
+ if (!ed->started) {
+ nni_mtx_unlock(&ed->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
if (ed->closed) {
nni_mtx_unlock(&ed->mtx);
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
+ if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) {
+ nni_mtx_unlock(&ed->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&ed->acceptq, aio);
- nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed);
- nni_posix_pollq_arm(&ed->node, POLLIN);
+ if (nni_list_first(&ed->acceptq) == aio) {
+ nni_epdesc_doaccept(ed);
+ }
nni_mtx_unlock(&ed->mtx);
}
@@ -370,79 +331,171 @@ nni_posix_epdesc_sockname(nni_posix_epdesc *ed, nni_sockaddr *sa)
{
struct sockaddr_storage ss;
socklen_t sslen = sizeof(ss);
+ int fd = -1;
+
+ nni_mtx_lock(&ed->mtx);
+ if (ed->pfd != NULL) {
+ fd = nni_posix_pfd_fd(ed->pfd);
+ }
+ nni_mtx_unlock(&ed->mtx);
- if (getsockname(ed->node.fd, (void *) &ss, &sslen) != 0) {
+ if (getsockname(fd, (void *) &ss, &sslen) != 0) {
return (nni_plat_errno(errno));
}
return (nni_posix_sockaddr2nn(sa, &ss));
}
-void
-nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
+static void
+nni_epdesc_connect_start(nni_posix_epdesc *ed)
{
- // NB: We assume that the FD is already set to nonblocking mode.
- int rv;
- int fd;
+ nni_posix_pfd *pfd;
+ int fd;
+ int rv;
+ nni_aio * aio;
- if (nni_aio_begin(aio) != 0) {
+loop:
+ if ((aio = nni_list_first(&ed->connectq)) == NULL) {
return;
}
- nni_mtx_lock(&ed->mtx);
+
+ NNI_ASSERT(ed->pfd == NULL);
+ if (ed->closed) {
+ nni_epdesc_finish(aio, NNG_ECLOSED, NULL);
+ goto loop;
+ }
+ ed->started = true;
if ((fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
rv = nni_plat_errno(errno);
- nni_mtx_unlock(&ed->mtx);
- nni_aio_finish_error(aio, rv);
- return;
+ nni_epdesc_finish(aio, rv, NULL);
+ goto loop;
}
+ if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
+ (void) close(fd);
+ nni_epdesc_finish(aio, rv, NULL);
+ goto loop;
+ }
// Possibly bind.
if ((ed->loclen != 0) &&
(bind(fd, (void *) &ed->locaddr, ed->loclen) != 0)) {
rv = nni_plat_errno(errno);
- nni_mtx_unlock(&ed->mtx);
- (void) close(fd);
- nni_aio_finish_error(aio, rv);
- return;
+ nni_epdesc_finish(aio, rv, NULL);
+ nni_posix_pfd_fini(pfd);
+ goto loop;
}
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
-
if ((rv = connect(fd, (void *) &ed->remaddr, ed->remlen)) == 0) {
// Immediate connect, cool! This probably only happens on
// loopback, and probably not on every platform.
- ed->started = true;
- nni_posix_epdesc_finish(aio, 0, fd);
- nni_mtx_unlock(&ed->mtx);
- return;
+ nni_epdesc_finish(aio, 0, pfd);
+ goto loop;
}
if (errno != EINPROGRESS) {
// Some immediate failure occurred.
if (errno == ENOENT) { // For UNIX domain sockets
- errno = ECONNREFUSED;
+ rv = NNG_ECONNREFUSED;
+ } else {
+ rv = nni_plat_errno(errno);
}
- rv = nni_plat_errno(errno);
+ nni_epdesc_finish(aio, rv, NULL);
+ nni_posix_pfd_fini(pfd);
+ goto loop;
+ }
+ nni_posix_pfd_set_cb(pfd, nni_epdesc_connect_cb, ed);
+ if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) {
+ nni_epdesc_finish(aio, rv, NULL);
+ nni_posix_pfd_fini(pfd);
+ goto loop;
+ }
+ ed->pfd = pfd;
+ // all done... wait for this to signal via callback
+}
+
+void
+nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
+{
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ nni_mtx_lock(&ed->mtx);
+ if (ed->closed) {
+ nni_mtx_unlock(&ed->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) {
nni_mtx_unlock(&ed->mtx);
- (void) close(fd);
nni_aio_finish_error(aio, rv);
return;
}
- // We have to submit to the pollq, because the connection is pending.
- ed->node.fd = fd;
ed->started = true;
- if ((rv = nni_posix_pollq_add(&ed->node)) != 0) {
- ed->node.fd = -1;
+ nni_list_append(&ed->connectq, aio);
+ if (nni_list_first(&ed->connectq) == aio) {
+ // If there was a stale pfd (probably from an aborted or
+ // canceled connect attempt), discard it so we start fresh.
+ if (ed->pfd != NULL) {
+ nni_posix_pfd_fini(ed->pfd);
+ ed->pfd = NULL;
+ }
+ nni_epdesc_connect_start(ed);
+ }
+ nni_mtx_unlock(&ed->mtx);
+}
+
+static void
+nni_epdesc_connect_cb(nni_posix_pfd *pfd, int events, void *arg)
+{
+ nni_posix_epdesc *ed = arg;
+ nni_aio * aio;
+ socklen_t sz;
+ int rv;
+ int fd;
+
+ nni_mtx_lock(&ed->mtx);
+ if ((ed->closed) || ((aio = nni_list_first(&ed->connectq)) == NULL) ||
+ (pfd != ed->pfd)) {
+ // Spurious completion. Ignore it, but discard the PFD.
+ if (ed->pfd == pfd) {
+ ed->pfd = NULL;
+ }
+ nni_posix_pfd_fini(pfd);
+ nni_mtx_unlock(&ed->mtx);
+ return;
+ }
+
+ fd = nni_posix_pfd_fd(pfd);
+ sz = sizeof(rv);
+
+ if ((events & POLLNVAL) != 0) {
+ rv = EBADF;
+
+ } else if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
+ rv = errno;
+ }
+
+ switch (rv) {
+ case 0:
+ // Good connect!
+ ed->pfd = NULL;
+ nni_epdesc_finish(aio, 0, pfd);
+ break;
+ case EINPROGRESS: // still connecting... come back later
nni_mtx_unlock(&ed->mtx);
- (void) close(fd);
- nni_aio_finish_error(aio, rv);
return;
+ default:
+ ed->pfd = NULL;
+ nni_epdesc_finish(aio, nni_plat_errno(rv), NULL);
+ nni_posix_pfd_fini(pfd);
+ break;
}
- nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed);
- nni_aio_list_append(&ed->connectq, aio);
- nni_posix_pollq_arm(&ed->node, POLLOUT);
+ // Start another connect running, if any is waiting.
+ nni_epdesc_connect_start(ed);
nni_mtx_unlock(&ed->mtx);
}
@@ -450,7 +503,6 @@ int
nni_posix_epdesc_init(nni_posix_epdesc **edp, int mode)
{
nni_posix_epdesc *ed;
- int rv;
if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) {
return (NNG_ENOMEM);
@@ -458,28 +510,14 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, int mode)
nni_mtx_init(&ed->mtx);
- // We could randomly choose a different pollq, or for efficiencies
- // sake we could take a modulo of the file desc number to choose
- // one. For now we just have a global pollq. Note that by tying
- // the ed to a single pollq we may get some kind of cache warmth.
-
- ed->node.index = 0;
- ed->node.cb = nni_posix_epdesc_cb;
- ed->node.data = ed;
- ed->node.fd = -1;
- ed->closed = false;
- ed->started = false;
- ed->perms = 0; // zero means use default (no change)
- ed->mode = mode;
+ ed->pfd = NULL;
+ ed->closed = false;
+ ed->started = false;
+ ed->perms = 0; // zero means use default (no change)
+ ed->mode = mode;
nni_aio_list_init(&ed->connectq);
nni_aio_list_init(&ed->acceptq);
-
- if ((rv = nni_posix_pollq_init(&ed->node)) != 0) {
- nni_mtx_fini(&ed->mtx);
- NNI_FREE_STRUCT(ed);
- return (rv);
- }
*edp = ed;
return (0);
}
@@ -532,14 +570,16 @@ nni_posix_epdesc_set_permissions(nni_posix_epdesc *ed, mode_t mode)
void
nni_posix_epdesc_fini(nni_posix_epdesc *ed)
{
- int fd;
+ nni_posix_pfd *pfd;
+
nni_mtx_lock(&ed->mtx);
- if ((fd = ed->node.fd) != -1) {
- (void) close(ed->node.fd);
- nni_posix_epdesc_doclose(ed);
- }
+ nni_epdesc_doclose(ed);
+ pfd = ed->pfd;
nni_mtx_unlock(&ed->mtx);
- nni_posix_pollq_fini(&ed->node);
+
+ if (pfd != NULL) {
+ nni_posix_pfd_fini(pfd);
+ }
nni_mtx_fini(&ed->mtx);
NNI_FREE_STRUCT(ed);
}
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index e7225395..b11036ea 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -40,11 +40,11 @@
// file descriptor for TCP socket, etc.) This contains the list of pending
// aios for that underlying socket, as well as the socket itself.
struct nni_posix_pipedesc {
- nni_posix_pollq_node node;
- nni_list readq;
- nni_list writeq;
- bool closed;
- nni_mtx mtx;
+ nni_posix_pfd *pfd;
+ nni_list readq;
+ nni_list writeq;
+ bool closed;
+ nni_mtx mtx;
};
static void
@@ -66,16 +66,19 @@ nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
while ((aio = nni_list_first(&pd->writeq)) != NULL) {
nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
}
- if (pd->node.fd != -1) {
- // Let any peer know we are closing.
- (void) shutdown(pd->node.fd, SHUT_RDWR);
- }
+ nni_posix_pfd_close(pd->pfd);
}
static void
nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
{
nni_aio *aio;
+ int fd;
+
+ fd = nni_posix_pfd_fd(pd->pfd);
+ if ((fd < 0) || (pd->closed)) {
+ return;
+ }
while ((aio = nni_list_first(&pd->writeq)) != NULL) {
unsigned i;
@@ -122,20 +125,28 @@ nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
hdr.msg_iovlen = niov;
hdr.msg_iov = iovec;
- n = sendmsg(pd->node.fd, &hdr, MSG_NOSIGNAL);
- if (n < 0) {
- if ((errno == EAGAIN) || (errno == EINTR)) {
- // Can't write more right now. We're done
- // on this fd for now.
+ if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+#ifdef EWOULDBLOCK
+#if EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+#endif
+ return;
+ default:
+ nni_posix_pipedesc_finish(
+ aio, nni_plat_errno(errno));
+ nni_posix_pipedesc_doclose(pd);
return;
}
- nni_posix_pipedesc_finish(aio, nni_plat_errno(errno));
- nni_posix_pipedesc_doclose(pd);
- return;
}
nni_aio_bump_count(aio, n);
// We completed the entire operation on this aioq.
+ // (Sendmsg never returns a partial result.)
nni_posix_pipedesc_finish(aio, 0);
// Go back to start of loop to see if there is another
@@ -147,6 +158,12 @@ static void
nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
{
nni_aio *aio;
+ int fd;
+
+ fd = nni_posix_pfd_fd(pd->pfd);
+ if ((fd < 0) || (pd->closed)) {
+ return;
+ }
while ((aio = nni_list_first(&pd->readq)) != NULL) {
unsigned i;
@@ -181,16 +198,18 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
}
}
- n = readv(pd->node.fd, iovec, niov);
- if (n < 0) {
- if ((errno == EAGAIN) || (errno == EINTR)) {
- // Can't write more right now. We're done
- // on this fd for now.
+ if ((n = readv(fd, iovec, niov)) < 0) {
+ switch (errno) {
+ case EINTR:
+ continue;
+ case EAGAIN:
+ return;
+ default:
+ nni_posix_pipedesc_finish(
+ aio, nni_plat_errno(errno));
+ nni_posix_pipedesc_doclose(pd);
return;
}
- nni_posix_pipedesc_finish(aio, nni_plat_errno(errno));
- nni_posix_pipedesc_doclose(pd);
- return;
}
if (n == 0) {
@@ -211,21 +230,21 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd)
}
static void
-nni_posix_pipedesc_cb(void *arg)
+nni_posix_pipedesc_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_posix_pipedesc *pd = arg;
nni_mtx_lock(&pd->mtx);
- if (pd->node.revents & POLLIN) {
+ if (events & POLLIN) {
nni_posix_pipedesc_doread(pd);
}
- if (pd->node.revents & POLLOUT) {
+ if (events & POLLOUT) {
nni_posix_pipedesc_dowrite(pd);
}
- if (pd->node.revents & (POLLHUP | POLLERR | POLLNVAL)) {
+ if (events & (POLLHUP | POLLERR | POLLNVAL)) {
nni_posix_pipedesc_doclose(pd);
} else {
- int events = 0;
+ events = 0;
if (!nni_list_empty(&pd->writeq)) {
events |= POLLOUT;
}
@@ -233,7 +252,7 @@ nni_posix_pipedesc_cb(void *arg)
events |= POLLIN;
}
if ((!pd->closed) && (events != 0)) {
- nni_posix_pollq_arm(&pd->node, events);
+ nni_posix_pfd_arm(pfd, events);
}
}
nni_mtx_unlock(&pd->mtx);
@@ -242,8 +261,7 @@ nni_posix_pipedesc_cb(void *arg)
void
nni_posix_pipedesc_close(nni_posix_pipedesc *pd)
{
- nni_posix_pollq_remove(&pd->node);
-
+ // NB: Events may still occur.
nni_mtx_lock(&pd->mtx);
nni_posix_pipedesc_doclose(pd);
nni_mtx_unlock(&pd->mtx);
@@ -265,6 +283,8 @@ nni_posix_pipedesc_cancel(nni_aio *aio, int rv)
void
nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
{
+ int rv;
+
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -276,18 +296,24 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
return;
}
+ if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
+ nni_mtx_unlock(&pd->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&pd->readq, aio);
- nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd);
- // If we are only job on the list, go ahead and try to do an immediate
- // transfer. This allows for faster completions in many cases. We
- // also need not arm a list if it was already armed.
+	// If we are the only job on the list, go ahead and try to do an
+ // immediate transfer. This allows for faster completions in
+ // many cases. We also need not arm a list if it was already
+ // armed.
if (nni_list_first(&pd->readq) == aio) {
nni_posix_pipedesc_doread(pd);
- // If we are still the first thing on the list, that means we
- // didn't finish the job, so arm the poller to complete us.
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
if (nni_list_first(&pd->readq) == aio) {
- nni_posix_pollq_arm(&pd->node, POLLIN);
+ nni_posix_pfd_arm(pd->pfd, POLLIN);
}
}
nni_mtx_unlock(&pd->mtx);
@@ -296,6 +322,8 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
void
nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
{
+ int rv;
+
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -307,15 +335,20 @@ nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
return;
}
+ if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
+ nni_mtx_unlock(&pd->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&pd->writeq, aio);
- nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd);
if (nni_list_first(&pd->writeq) == aio) {
nni_posix_pipedesc_dowrite(pd);
- // If we are still the first thing on the list, that means we
- // didn't finish the job, so arm the poller to complete us.
+ // If we are still the first thing on the list, that
+ // means we didn't finish the job, so arm the poller to
+ // complete us.
if (nni_list_first(&pd->writeq) == aio) {
- nni_posix_pollq_arm(&pd->node, POLLOUT);
+ nni_posix_pfd_arm(pd->pfd, POLLOUT);
}
}
nni_mtx_unlock(&pd->mtx);
@@ -326,8 +359,9 @@ nni_posix_pipedesc_peername(nni_posix_pipedesc *pd, nni_sockaddr *sa)
{
struct sockaddr_storage ss;
socklen_t sslen = sizeof(ss);
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (getpeername(pd->node.fd, (void *) &ss, &sslen) != 0) {
+ if (getpeername(fd, (void *) &ss, &sslen) != 0) {
return (nni_plat_errno(errno));
}
return (nni_posix_sockaddr2nn(sa, &ss));
@@ -338,8 +372,9 @@ nni_posix_pipedesc_sockname(nni_posix_pipedesc *pd, nni_sockaddr *sa)
{
struct sockaddr_storage ss;
socklen_t sslen = sizeof(ss);
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (getsockname(pd->node.fd, (void *) &ss, &sslen) != 0) {
+ if (getsockname(fd, (void *) &ss, &sslen) != 0) {
return (nni_plat_errno(errno));
}
return (nni_posix_sockaddr2nn(sa, &ss));
@@ -349,9 +384,9 @@ int
nni_posix_pipedesc_set_nodelay(nni_posix_pipedesc *pd, bool nodelay)
{
int val = nodelay ? 1 : 0;
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (setsockopt(pd->node.fd, IPPROTO_TCP, TCP_NODELAY, &val,
- sizeof(val)) != 0) {
+ if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) {
return (nni_plat_errno(errno));
}
return (0);
@@ -361,61 +396,19 @@ int
nni_posix_pipedesc_set_keepalive(nni_posix_pipedesc *pd, bool keep)
{
int val = keep ? 1 : 0;
+ int fd = nni_posix_pfd_fd(pd->pfd);
- if (setsockopt(pd->node.fd, SOL_SOCKET, SO_KEEPALIVE, &val,
- sizeof(val)) != 0) {
+ if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) {
return (nni_plat_errno(errno));
}
return (0);
}
int
-nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
-{
- nni_posix_pipedesc *pd;
- int rv;
-
- if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- // We could randomly choose a different pollq, or for efficiencies
- // sake we could take a modulo of the file desc number to choose
- // one. For now we just have a global pollq. Note that by tying
- // the pd to a single pollq we may get some kind of cache warmth.
-
- pd->closed = false;
- pd->node.fd = fd;
- pd->node.cb = nni_posix_pipedesc_cb;
- pd->node.data = pd;
-
- (void) fcntl(fd, F_SETFL, O_NONBLOCK);
-
-#ifdef SO_NOSIGPIPE
- // Darwin lacks MSG_NOSIGNAL, but has a socket option.
- int one = 1;
- (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
-#endif
-
- nni_mtx_init(&pd->mtx);
- nni_aio_list_init(&pd->readq);
- nni_aio_list_init(&pd->writeq);
-
- if (((rv = nni_posix_pollq_init(&pd->node)) != 0) ||
- ((rv = nni_posix_pollq_add(&pd->node)) != 0)) {
- nni_mtx_fini(&pd->mtx);
- NNI_FREE_STRUCT(pd);
- return (rv);
- }
- *pdp = pd;
- return (0);
-}
-
-int
nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
uint64_t *egid, uint64_t *prid, uint64_t *znid)
{
- int fd = pd->node.fd;
+ int fd = nni_posix_pfd_fd(pd->pfd);
#if defined(NNG_HAVE_GETPEEREID)
uid_t uid;
gid_t gid;
@@ -458,7 +451,8 @@ nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
}
*euid = xu.cr_uid;
*egid = xu.cr_gid;
- *prid = (uint64_t) -1; // XXX: macOS has undocumented LOCAL_PEERPID...
+ *prid = (uint64_t) -1; // XXX: macOS has undocumented
+ // LOCAL_PEERPID...
*znid = (uint64_t) -1;
return (0);
#else
@@ -473,16 +467,34 @@ nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid,
#endif
}
+int
+nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, nni_posix_pfd *pfd)
+{
+ nni_posix_pipedesc *pd;
+
+ if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ pd->closed = false;
+ pd->pfd = pfd;
+
+ nni_mtx_init(&pd->mtx);
+ nni_aio_list_init(&pd->readq);
+ nni_aio_list_init(&pd->writeq);
+
+ nni_posix_pfd_set_cb(pfd, nni_posix_pipedesc_cb, pd);
+
+ *pdp = pd;
+ return (0);
+}
+
void
nni_posix_pipedesc_fini(nni_posix_pipedesc *pd)
{
- // Make sure no other polling activity is pending.
nni_posix_pipedesc_close(pd);
- nni_posix_pollq_fini(&pd->node);
- if (pd->node.fd >= 0) {
- (void) close(pd->node.fd);
- }
-
+ nni_posix_pfd_fini(pd->pfd);
+ pd->pfd = NULL;
nni_mtx_fini(&pd->mtx);
NNI_FREE_STRUCT(pd);
diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h
index 2c855da1..b9786330 100644
--- a/src/platform/posix/posix_pollq.h
+++ b/src/platform/posix/posix_pollq.h
@@ -22,32 +22,18 @@
#include "core/nng_impl.h"
#include <poll.h>
-typedef struct nni_posix_pollq_node nni_posix_pollq_node;
-typedef struct nni_posix_pollq nni_posix_pollq;
-
-struct nni_posix_pollq_node {
- nni_list_node node; // linkage into the pollq list
- nni_posix_pollq *pq; // associated pollq
- int index; // used by the poller impl
- int armed; // used by the poller impl
- int fd; // file descriptor to poll
- int events; // events to watch for
- int revents; // events received
- void * data; // user data
- nni_cb cb; // user callback on event
- nni_mtx mx;
- nni_cv cv;
-};
-
-extern nni_posix_pollq *nni_posix_pollq_get(int);
-extern int nni_posix_pollq_sysinit(void);
-extern void nni_posix_pollq_sysfini(void);
-
-extern int nni_posix_pollq_init(nni_posix_pollq_node *);
-extern void nni_posix_pollq_fini(nni_posix_pollq_node *);
-extern int nni_posix_pollq_add(nni_posix_pollq_node *);
-extern void nni_posix_pollq_remove(nni_posix_pollq_node *);
-extern void nni_posix_pollq_arm(nni_posix_pollq_node *, int);
+typedef struct nni_posix_pfd nni_posix_pfd;
+typedef void (*nni_posix_pfd_cb)(nni_posix_pfd *, int, void *);
+
+extern int nni_posix_pollq_sysinit(void);
+extern void nni_posix_pollq_sysfini(void);
+
+extern int nni_posix_pfd_init(nni_posix_pfd **, int);
+extern void nni_posix_pfd_fini(nni_posix_pfd *);
+extern int nni_posix_pfd_arm(nni_posix_pfd *, int);
+extern int nni_posix_pfd_fd(nni_posix_pfd *);
+extern void nni_posix_pfd_close(nni_posix_pfd *);
+extern void nni_posix_pfd_set_cb(nni_posix_pfd *, nni_posix_pfd_cb, void *);
#endif // NNG_PLATFORM_POSIX
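
The replacement API above shrinks the old pollq-node surface to a small pfd object. For orientation, here is a minimal sketch of how a consumer is expected to drive it; demo_cb and demo_open are illustrative names, not part of the tree:

static void
demo_cb(nni_posix_pfd *pfd, int events, void *arg)
{
	// events carries POLLIN/POLLOUT/POLLHUP style bits; since the
	// pollers behave as oneshot, re-arm after handling the I/O.
	if (events & POLLIN) {
		// ... read from nni_posix_pfd_fd(pfd) ...
	}
	(void) nni_posix_pfd_arm(pfd, POLLIN);
	NNI_ARG_UNUSED(arg);
}

static int
demo_open(nni_posix_pfd **out, int fd)
{
	nni_posix_pfd *pfd;
	int            rv;

	if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
		return (rv); // pfd init also sets CLOEXEC/NONBLOCK
	}
	nni_posix_pfd_set_cb(pfd, demo_cb, NULL); // before first arm
	if ((rv = nni_posix_pfd_arm(pfd, POLLIN)) != 0) {
		nni_posix_pfd_fini(pfd); // also closes fd
		return (rv);
	}
	*out = pfd;
	return (0);
}

Note that nni_posix_pfd_fini() closes the underlying descriptor and synchronizes with the poller, so the callback cannot still be running once it returns.
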
diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c
index 0f6867da..a8d8693a 100644
--- a/src/platform/posix/posix_pollq_epoll.c
+++ b/src/platform/posix/posix_pollq_epoll.c
@@ -12,6 +12,7 @@
#ifdef NNG_HAVE_EPOLL
#include <errno.h>
+#include <fcntl.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h> /* for strerror() */
@@ -22,179 +23,215 @@
#include "core/nng_impl.h"
#include "platform/posix/posix_pollq.h"
+typedef struct nni_posix_pollq nni_posix_pollq;
+
+#ifndef EFD_CLOEXEC
+#define EFD_CLOEXEC 0
+#endif
+#ifndef EFD_NONBLOCK
+#define EFD_NONBLOCK 0
+#endif
+
#define NNI_MAX_EPOLL_EVENTS 64
// flags we always want enabled as long as at least one event is active
#define NNI_EPOLL_FLAGS (EPOLLONESHOT | EPOLLERR | EPOLLHUP)
+// Locking strategy:
+//
+// The pollq mutex protects its own reapq, close state, and the close
+// state of the individual pfds. It also protects the pfd cv, which is
+// only signaled when the pfd is closed. This mutex is only acquired
+// when shutting down the pollq, or closing a pfd. For normal hot-path
+// operations we don't need it.
+//
+// The pfd mutex protects the pfd's own "closing" flag (test and set),
+// the callback and arg, and its event mask. This mutex is used a lot,
+// but it should be uncontended, except possibly when closing.
+
// nni_posix_pollq is a work structure that manages state for the epoll-based
// pollq implementation
struct nni_posix_pollq {
- nni_mtx mtx;
- nni_cv cv;
- int epfd; // epoll handle
- int evfd; // event fd
- bool close; // request for worker to exit
- bool started;
- nni_idhash * nodes;
- nni_thr thr; // worker thread
- nni_posix_pollq_node *wait; // cancel waiting on this
- nni_posix_pollq_node *active; // active node (in callback)
+ nni_mtx mtx;
+ int epfd; // epoll handle
+ int evfd; // event fd (to wake us for other stuff)
+ bool close; // request for worker to exit
+ nni_thr thr; // worker thread
+ nni_list reapq;
+};
+
+struct nni_posix_pfd {
+ nni_posix_pollq *pq;
+ nni_list_node node;
+ int fd;
+ nni_posix_pfd_cb cb;
+ void * arg;
+ bool closed;
+ bool closing;
+ bool reap;
+ int events;
+ nni_mtx mtx;
+ nni_cv cv;
};
+// single global instance for now.
+static nni_posix_pollq nni_posix_global_pollq;
+
int
-nni_posix_pollq_add(nni_posix_pollq_node *node)
+nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
{
- int rv;
+ nni_posix_pfd * pfd;
nni_posix_pollq * pq;
struct epoll_event ev;
- uint64_t id;
-
- pq = nni_posix_pollq_get(node->fd);
- if (pq == NULL) {
- return (NNG_EINVAL);
- }
+ int rv;
- // ensure node was not previously associated with a pollq
- if (node->pq != NULL) {
- return (NNG_ESTATE);
- }
+ pq = &nni_posix_global_pollq;
- nni_mtx_lock(&pq->mtx);
- if (pq->close) {
- // This shouldn't happen!
- nni_mtx_unlock(&pq->mtx);
- return (NNG_ECLOSED);
- }
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
- if ((rv = nni_idhash_alloc(pq->nodes, &id, node)) != 0) {
- nni_mtx_unlock(&pq->mtx);
- return (rv);
+ if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
+ return (NNG_ENOMEM);
}
- node->index = (int) id;
- node->pq = pq;
- node->events = 0;
+ pfd->pq = pq;
+ pfd->fd = fd;
+ pfd->cb = NULL;
+ pfd->arg = NULL;
+ pfd->events = 0;
+ pfd->closing = false;
+ pfd->closed = false;
+
+ nni_mtx_init(&pfd->mtx);
+ nni_cv_init(&pfd->cv, &pq->mtx);
+ NNI_LIST_NODE_INIT(&pfd->node);
// notifications disabled to begin with
ev.events = 0;
- ev.data.u64 = id;
+ ev.data.ptr = pfd;
- rv = epoll_ctl(pq->epfd, EPOLL_CTL_ADD, node->fd, &ev);
- if (rv != 0) {
+ if ((rv = epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev)) != 0) {
rv = nni_plat_errno(errno);
- nni_idhash_remove(pq->nodes, id);
- node->index = 0;
- node->pq = NULL;
+ nni_cv_fini(&pfd->cv);
+ NNI_FREE_STRUCT(pfd);
+ return (rv);
}
- nni_mtx_unlock(&pq->mtx);
- return (rv);
+ *pfdp = pfd;
+ return (0);
}
-// common functionality for nni_posix_pollq_remove() and nni_posix_pollq_fini()
-// called while pq's lock is held
-static void
-nni_posix_pollq_remove_helper(nni_posix_pollq *pq, nni_posix_pollq_node *node)
+int
+nni_posix_pfd_arm(nni_posix_pfd *pfd, int events)
{
- int rv;
- struct epoll_event ev;
-
- node->events = 0;
- node->pq = NULL;
-
- ev.events = 0;
- ev.data.u64 = (uint64_t) node->index;
-
- if (node->index != 0) {
- // This deregisters the node. If the poller was blocked
- // then this keeps it from coming back in to find us.
- nni_idhash_remove(pq->nodes, (uint64_t) node->index);
- }
-
- // NB: EPOLL_CTL_DEL actually *ignores* the event, but older Linux
- // versions need it to be non-NULL.
- rv = epoll_ctl(pq->epfd, EPOLL_CTL_DEL, node->fd, &ev);
- if (rv != 0) {
- NNI_ASSERT(errno == EBADF || errno == ENOENT);
+ nni_posix_pollq *pq = pfd->pq;
+
+ // NB: We depend on epoll event flags being the same as their POLLIN
+ // equivalents. I.e. POLLIN == EPOLLIN, POLLOUT == EPOLLOUT, and so
+ // forth. This turns out to be true both for Linux and the illumos
+ // epoll implementation.
+
+ nni_mtx_lock(&pfd->mtx);
+ if (!pfd->closing) {
+ struct epoll_event ev;
+ pfd->events |= events;
+ events = pfd->events;
+
+ ev.events = events | NNI_EPOLL_FLAGS;
+ ev.data.ptr = pfd;
+
+ if (epoll_ctl(pq->epfd, EPOLL_CTL_MOD, pfd->fd, &ev) != 0) {
+ int rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&pfd->mtx);
+ return (rv);
+ }
}
+ nni_mtx_unlock(&pfd->mtx);
+ return (0);
}
-// nni_posix_pollq_remove removes the node from the pollq, but
-// does not ensure that the pollq node is safe to destroy. In particular,
-// this function can be called from a callback (the callback may be active).
-void
-nni_posix_pollq_remove(nni_posix_pollq_node *node)
+int
+nni_posix_pfd_fd(nni_posix_pfd *pfd)
{
- nni_posix_pollq *pq = node->pq;
-
- if (pq == NULL) {
- return;
- }
-
- nni_mtx_lock(&pq->mtx);
- nni_posix_pollq_remove_helper(pq, node);
-
- if (pq->close) {
- nni_cv_wake(&pq->cv);
- }
- nni_mtx_unlock(&pq->mtx);
+ return (pfd->fd);
}
-// nni_posix_pollq_init merely ensures that the node is ready for use.
-// It does not register the node with any pollq in particular.
-int
-nni_posix_pollq_init(nni_posix_pollq_node *node)
+void
+nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg)
{
- node->index = 0;
- return (0);
+ nni_mtx_lock(&pfd->mtx);
+ pfd->cb = cb;
+ pfd->arg = arg;
+ nni_mtx_unlock(&pfd->mtx);
}
-// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does,
-// but it also ensures that the callback is not active, so that the node
-// may be deallocated. This function must not be called in a callback.
void
-nni_posix_pollq_fini(nni_posix_pollq_node *node)
+nni_posix_pfd_close(nni_posix_pfd *pfd)
{
- nni_posix_pollq *pq = node->pq;
- if (pq == NULL) {
- return;
- }
-
- nni_mtx_lock(&pq->mtx);
- while (pq->active == node) {
- pq->wait = node;
- nni_cv_wait(&pq->cv);
- }
-
- nni_posix_pollq_remove_helper(pq, node);
-
- if (pq->close) {
- nni_cv_wake(&pq->cv);
+ nni_mtx_lock(&pfd->mtx);
+ if (!pfd->closing) {
+ nni_posix_pollq * pq = pfd->pq;
+ struct epoll_event ev; // Not actually used.
+ pfd->closing = true;
+
+ (void) shutdown(pfd->fd, SHUT_RDWR);
+ (void) epoll_ctl(pq->epfd, EPOLL_CTL_DEL, pfd->fd, &ev);
}
- nni_mtx_unlock(&pq->mtx);
+ nni_mtx_unlock(&pfd->mtx);
}
void
-nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
+nni_posix_pfd_fini(nni_posix_pfd *pfd)
{
- int rv;
- struct epoll_event ev;
- nni_posix_pollq * pq = node->pq;
+ nni_posix_pollq *pq = pfd->pq;
+
+ nni_posix_pfd_close(pfd);
- NNI_ASSERT(pq != NULL);
- if (events == 0) {
- return;
+ // We have to synchronize with the pollq thread (unless we are
+ // on that thread!)
+ if (!nni_thr_is_self(&pq->thr)) {
+
+ uint64_t one = 1;
+
+ nni_mtx_lock(&pq->mtx);
+ nni_list_append(&pq->reapq, pfd);
+
+ // Wake the remote side. For now we assume this always
+ // succeeds. The only failure modes here occur when we
+ // have already excessively signaled this (2^64 times
+ // with no read!!), or when the evfd is closed, or some
+ // kernel bug occurs. Those errors would manifest as
+ // a hang waiting for the poller to reap the pfd in fini,
+ // if it were possible for them to occur. (Barring other
+ // bugs, it isn't.)
+ (void) write(pq->evfd, &one, sizeof(one));
+
+ while (!pfd->closed) {
+ nni_cv_wait(&pfd->cv);
+ }
+ nni_mtx_unlock(&pq->mtx);
}
- nni_mtx_lock(&pq->mtx);
+ // We're exclusive now.
- node->events |= events;
- ev.events = node->events | NNI_EPOLL_FLAGS;
- ev.data.u64 = (uint64_t) node->index;
+ (void) close(pfd->fd);
+ nni_cv_fini(&pfd->cv);
+ nni_mtx_fini(&pfd->mtx);
+ NNI_FREE_STRUCT(pfd);
+}
- rv = epoll_ctl(pq->epfd, EPOLL_CTL_MOD, node->fd, &ev);
- NNI_ASSERT(rv == 0);
+static void
+nni_posix_pollq_reap(nni_posix_pollq *pq)
+{
+ nni_posix_pfd *pfd;
+ nni_mtx_lock(&pq->mtx);
+ while ((pfd = nni_list_first(&pq->reapq)) != NULL) {
+ nni_list_remove(&pq->reapq, pfd);
+ // Let fini know we're done with it, and it's safe to
+ // remove.
+ pfd->closed = true;
+ nni_cv_wake(&pfd->cv);
+ }
nni_mtx_unlock(&pq->mtx);
}
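
The waker here relies on eventfd counter semantics: each write adds to a 64-bit counter, and a single read returns and clears the whole counter. That is why one read in the poll loop drains every pending wake, and why the 2^64-writes-with-no-read overflow noted above is the only theoretical way to jam it. A standalone Linux illustration:

#include <stdint.h>
#include <stdio.h>
#include <sys/eventfd.h>
#include <unistd.h>

int
main(void)
{
	uint64_t one = 1;
	uint64_t got = 0;
	int      efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);

	(void) write(efd, &one, sizeof(one));
	(void) write(efd, &one, sizeof(one)); // two wakes queued...
	(void) read(efd, &got, sizeof(got));  // ...drained by one read
	printf("coalesced %llu wakes\n", (unsigned long long) got);
	(void) close(efd);
	return (0);
}

This prints "coalesced 2 wakes": multiple signals collapse into a single readiness event, so the poller never needs to loop on the evfd.
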
@@ -204,101 +241,74 @@ nni_posix_poll_thr(void *arg)
nni_posix_pollq * pq = arg;
struct epoll_event events[NNI_MAX_EPOLL_EVENTS];
- nni_mtx_lock(&pq->mtx);
-
- while (!pq->close) {
- int i;
- int nevents;
-
- // block indefinitely, timers are handled separately
- nni_mtx_unlock(&pq->mtx);
-
- nevents =
- epoll_wait(pq->epfd, events, NNI_MAX_EPOLL_EVENTS, -1);
+ for (;;) {
+ int n;
+ bool reap = false;
- nni_mtx_lock(&pq->mtx);
-
- if (nevents <= 0) {
- continue;
+ n = epoll_wait(pq->epfd, events, NNI_MAX_EPOLL_EVENTS, -1);
+ if ((n < 0) && (errno == EBADF)) {
+ // Epoll fd closed, bail.
+ return;
}
// dispatch events
- for (i = 0; i < nevents; ++i) {
+ for (int i = 0; i < n; ++i) {
const struct epoll_event *ev;
- nni_posix_pollq_node * node;
ev = &events[i];
// If the waker pipe was signaled, read from it.
- if ((ev->data.u64 == 0) && (ev->events & POLLIN)) {
- int rv;
+ if ((ev->data.ptr == NULL) && (ev->events & POLLIN)) {
uint64_t clear;
- rv = read(pq->evfd, &clear, sizeof(clear));
- NNI_ASSERT(rv == sizeof(clear));
- continue;
- }
-
- if (nni_idhash_find(pq->nodes, ev->data.u64,
- (void **) &node) != 0) {
- // node was removed while we were blocking
- continue;
+ (void) read(pq->evfd, &clear, sizeof(clear));
+ reap = true;
+ } else {
+ nni_posix_pfd * pfd = ev->data.ptr;
+ nni_posix_pfd_cb cb;
+ void * arg;
+ int events;
+
+ events = ev->events &
+ (EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP);
+
+ nni_mtx_lock(&pfd->mtx);
+ pfd->events &= ~events;
+ cb = pfd->cb;
+ arg = pfd->arg;
+ nni_mtx_unlock(&pfd->mtx);
+
+ // Execute the callback with lock released
+ if (cb != NULL) {
+ cb(pfd, events, arg);
+ }
}
+ }
- node->revents = ev->events &
- (EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP);
-
- // mark events as cleared
- node->events &= ~node->revents;
-
- // Save the active node; we can notice this way
- // when it is busy, and avoid freeing it until
- // we are sure that it is not in use.
- pq->active = node;
-
- // Execute the callback with lock released
- nni_mtx_unlock(&pq->mtx);
- node->cb(node->data);
+ if (reap) {
+ nni_posix_pollq_reap(pq);
nni_mtx_lock(&pq->mtx);
-
- // We finished with this node. If something
- // was blocked waiting for that, wake it up.
- pq->active = NULL;
- if (pq->wait == node) {
- pq->wait = NULL;
- nni_cv_wake(&pq->cv);
+ if (pq->close) {
+ nni_mtx_unlock(&pq->mtx);
+ return;
}
+ nni_mtx_unlock(&pq->mtx);
}
}
-
- nni_mtx_unlock(&pq->mtx);
}
static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
- if (pq->started) {
- int rv;
- uint64_t wakeval = 1;
+ uint64_t one = 1;
- nni_mtx_lock(&pq->mtx);
- pq->close = true;
- pq->started = false;
- rv = write(pq->evfd, &wakeval, sizeof(wakeval));
- NNI_ASSERT(rv == sizeof(wakeval));
- nni_mtx_unlock(&pq->mtx);
- }
- nni_thr_fini(&pq->thr);
+ nni_mtx_lock(&pq->mtx);
+ pq->close = true;
+ (void) write(pq->evfd, &one, sizeof(one));
+ nni_mtx_unlock(&pq->mtx);
- if (pq->evfd >= 0) {
- close(pq->evfd);
- pq->evfd = -1;
- }
+ nni_thr_fini(&pq->thr);
+ close(pq->evfd);
close(pq->epfd);
- pq->epfd = -1;
-
- if (pq->nodes != NULL) {
- nni_idhash_fini(pq->nodes);
- }
nni_mtx_fini(&pq->mtx);
}
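
Because pfds are armed with EPOLLONESHOT (via NNI_EPOLL_FLAGS), a delivered event disarms the descriptor until it is re-armed; that is why the dispatch loop above clears the consumed bits from pfd->events, and why the arm path earlier uses EPOLL_CTL_MOD rather than ADD. The rearm step in isolation looks roughly like this sketch, where `tag` stands in for the pfd pointer:

#include <stdint.h>
#include <sys/epoll.h>

static int
rearm(int epfd, int fd, void *tag, uint32_t events)
{
	struct epoll_event ev;

	// One report, then the kernel disarms the fd again.
	ev.events   = events | EPOLLONESHOT | EPOLLERR | EPOLLHUP;
	ev.data.ptr = tag;

	// MOD, not ADD: the fd was registered once at init time.
	return (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev));
}
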
@@ -308,22 +318,25 @@ nni_posix_pollq_add_eventfd(nni_posix_pollq *pq)
{
// add event fd so we can wake ourself on exit
struct epoll_event ev;
- int rv;
+ int fd;
memset(&ev, 0, sizeof(ev));
- pq->evfd = eventfd(0, EFD_NONBLOCK);
- if (pq->evfd == -1) {
+ if ((fd = eventfd(0, EFD_NONBLOCK)) < 0) {
return (nni_plat_errno(errno));
}
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ // This is *NOT* one shot. We want to wake EVERY single time.
ev.events = EPOLLIN;
- ev.data.u64 = 0;
+ ev.data.ptr = NULL;
- rv = epoll_ctl(pq->epfd, EPOLL_CTL_ADD, pq->evfd, &ev);
- if (rv != 0) {
+ if (epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
+ (void) close(fd);
return (nni_plat_errno(errno));
}
+ pq->evfd = fd;
return (0);
}
@@ -332,40 +345,30 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
{
int rv;
- if ((pq->epfd = epoll_create1(0)) < 0) {
+ if ((pq->epfd = epoll_create1(EPOLL_CLOEXEC)) < 0) {
return (nni_plat_errno(errno));
}
- pq->evfd = -1;
pq->close = false;
+ NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
nni_mtx_init(&pq->mtx);
- nni_cv_init(&pq->cv, &pq->mtx);
- if (((rv = nni_idhash_init(&pq->nodes)) != 0) ||
- ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) ||
- ((rv = nni_posix_pollq_add_eventfd(pq)) != 0)) {
- nni_posix_pollq_destroy(pq);
+ if ((rv = nni_posix_pollq_add_eventfd(pq)) != 0) {
+ (void) close(pq->epfd);
+ nni_mtx_fini(&pq->mtx);
+ return (rv);
+ }
+ if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) {
+ (void) close(pq->epfd);
+ (void) close(pq->evfd);
+ nni_mtx_fini(&pq->mtx);
return (rv);
}
-
- // Positive values only for node indices. (0 is reserved for eventfd).
- nni_idhash_set_limits(pq->nodes, 1, 0x7FFFFFFFu, 1);
- pq->started = true;
nni_thr_run(&pq->thr);
return (0);
}
-// single global instance for now
-static nni_posix_pollq nni_posix_global_pollq;
-
-nni_posix_pollq *
-nni_posix_pollq_get(int fd)
-{
- NNI_ARG_UNUSED(fd);
- return (&nni_posix_global_pollq);
-}
-
int
nni_posix_pollq_sysinit(void)
{
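
The NULL data.ptr sentinel that lets the poll loop tell the waker apart from real pfds can be seen end to end in this minimal, self-contained Linux sketch:

#include <stdint.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

int
main(void)
{
	struct epoll_event ev;
	struct epoll_event out;
	uint64_t           one  = 1;
	int                epfd = epoll_create1(EPOLL_CLOEXEC);
	int                evfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);

	ev.events   = EPOLLIN; // level-triggered: NOT oneshot
	ev.data.ptr = NULL;    // sentinel meaning "the waker"
	(void) epoll_ctl(epfd, EPOLL_CTL_ADD, evfd, &ev);

	(void) write(evfd, &one, sizeof(one));
	if ((epoll_wait(epfd, &out, 1, -1) == 1) &&
	    (out.data.ptr == NULL)) {
		printf("woken by the eventfd\n");
	}
	(void) close(evfd);
	(void) close(epfd);
	return (0);
}
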
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c
index 0f312170..36ced3ff 100644
--- a/src/platform/posix/posix_pollq_kqueue.c
+++ b/src/platform/posix/posix_pollq_kqueue.c
@@ -12,196 +12,203 @@
#ifdef NNG_HAVE_KQUEUE
#include <errno.h>
+#include <fcntl.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h> /* for strerror() */
#include <sys/event.h>
+#include <sys/socket.h>
#include <unistd.h>
#include "core/nng_impl.h"
#include "platform/posix/posix_pollq.h"
-// TODO: can this be feature detected in cmake,
-// rather than relying on platform?
-#if defined NNG_PLATFORM_NETBSD
-#define kevent_udata_t intptr_t
-#else
-#define kevent_udata_t void *
-#endif
-
-#define NNI_MAX_KQUEUE_EVENTS 64
-
-// user event id used to shutdown the polling thread
-#define NNI_KQ_EV_EXIT_ID 0xF
+typedef struct nni_posix_pollq nni_posix_pollq;
// nni_posix_pollq is a work structure that manages state for the kqueue-based
// pollq implementation
struct nni_posix_pollq {
- nni_mtx mtx;
- nni_cv cv;
- int kq; // kqueue handle
- bool close; // request for worker to exit
- bool started;
- nni_thr thr; // worker thread
- nni_posix_pollq_node *wait; // cancel waiting on this
- nni_posix_pollq_node *active; // active node (in callback)
+ nni_mtx mtx;
+ int kq; // kqueue handle
+ nni_thr thr; // worker thread
+ nni_list reapq; // items to reap
};
+struct nni_posix_pfd {
+ nni_list_node node; // linkage into the reap list
+ nni_posix_pollq *pq; // associated pollq
+ int fd; // file descriptor to poll
+ void * data; // user data
+ nni_posix_pfd_cb cb; // user callback on event
+ nni_cv cv; // signaled when poller has unregistered
+ nni_mtx mtx;
+ int events;
+ bool closing;
+ bool closed;
+};
+
+#define NNI_MAX_KQUEUE_EVENTS 64
+
+// single global instance for now
+static nni_posix_pollq nni_posix_global_pollq;
+
int
-nni_posix_pollq_add(nni_posix_pollq_node *node)
+nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
{
+ nni_posix_pfd * pf;
nni_posix_pollq *pq;
- struct kevent kevents[2];
+ struct kevent ev[2];
+ unsigned flags = EV_ADD | EV_DISABLE;
+
+ // Set this as soon as possible (narrowing the close-exec race
+ // as much as we can; better options are system calls that
+ // suppress this behavior at descriptor creation).
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+#ifdef SO_NOSIGPIPE
+ // Darwin lacks MSG_NOSIGNAL, but has a socket option.
+ int one = 1;
+ (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
+#endif
- pq = nni_posix_pollq_get(node->fd);
- if (pq == NULL) {
- return (NNG_EINVAL);
- }
+ pq = &nni_posix_global_pollq;
- // ensure node was not previously associated with a pollq
- if (node->pq != NULL) {
- return (NNG_ESTATE);
+ if ((pf = NNI_ALLOC_STRUCT(pf)) == NULL) {
+ return (NNG_ENOMEM);
}
- nni_mtx_lock(&pq->mtx);
- if (pq->close) {
- // This shouldn't happen!
- nni_mtx_unlock(&pq->mtx);
- return (NNG_ECLOSED);
- }
-
- node->pq = pq;
- node->events = 0;
-
- EV_SET(&kevents[0], (uintptr_t) node->fd, EVFILT_READ,
- EV_ADD | EV_DISABLE, 0, 0, (kevent_udata_t) node);
+ // Create entries in the kevent queue, without enabling them.
+ EV_SET(&ev[0], (uintptr_t) fd, EVFILT_READ, flags, 0, 0, pf);
+ EV_SET(&ev[1], (uintptr_t) fd, EVFILT_WRITE, flags, 0, 0, pf);
- EV_SET(&kevents[1], (uintptr_t) node->fd, EVFILT_WRITE,
- EV_ADD | EV_DISABLE, 0, 0, (kevent_udata_t) node);
-
- if (kevent(pq->kq, kevents, 2, NULL, 0, NULL) != 0) {
- nni_mtx_unlock(&pq->mtx);
+ // We update the kqueue list, without polling for events.
+ if (kevent(pq->kq, ev, 2, NULL, 0, NULL) != 0) {
+ NNI_FREE_STRUCT(pf);
return (nni_plat_errno(errno));
}
+ pf->fd = fd;
+ pf->cb = NULL;
+ pf->pq = pq;
+ nni_mtx_init(&pf->mtx);
+ nni_cv_init(&pf->cv, &pq->mtx);
+ NNI_LIST_NODE_INIT(&pf->node);
+ *pfdp = pf;
- nni_mtx_unlock(&pq->mtx);
return (0);
}
-// common functionality for nni_posix_pollq_remove() and nni_posix_pollq_fini()
-// called while pq's lock is held
-static void
-nni_posix_pollq_remove_helper(nni_posix_pollq *pq, nni_posix_pollq_node *node)
+void
+nni_posix_pfd_close(nni_posix_pfd *pf)
{
- struct kevent kevents[2];
-
- node->events = 0;
- node->pq = NULL;
+ nni_posix_pollq *pq = pf->pq;
- EV_SET(&kevents[0], (uintptr_t) node->fd, EVFILT_READ, EV_DELETE, 0, 0,
- (kevent_udata_t) node);
-
- EV_SET(&kevents[1], (uintptr_t) node->fd, EVFILT_WRITE, EV_DELETE, 0,
- 0, (kevent_udata_t) node);
-
- // So it turns out that we can get EBADF, ENOENT, and apparently
- // also EINPROGRESS (new on macOS Sierra). Frankly, we're deleting
- // an event, and its harmless if the event removal fails (worst
- // case would be a spurious wakeup), so lets ignore it.
- (void) kevent(pq->kq, kevents, 2, NULL, 0, NULL);
+ nni_mtx_lock(&pq->mtx);
+ if (!pf->closing) {
+ struct kevent ev[2];
+ pf->closing = true;
+ EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf);
+ EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf);
+ (void) shutdown(pf->fd, SHUT_RDWR);
+ // This should never fail -- no allocations, just deletion.
+ (void) kevent(pq->kq, ev, 2, NULL, 0, NULL);
+ }
+ nni_mtx_unlock(&pq->mtx);
}
-// nni_posix_pollq_remove removes the node from the pollq, but
-// does not ensure that the pollq node is safe to destroy. In particular,
-// this function can be called from a callback (the callback may be active).
void
-nni_posix_pollq_remove(nni_posix_pollq_node *node)
+nni_posix_pfd_fini(nni_posix_pfd *pf)
{
- nni_posix_pollq *pq = node->pq;
+ nni_posix_pollq *pq;
- if (pq == NULL) {
- return;
- }
+ pq = pf->pq;
- nni_mtx_lock(&pq->mtx);
- nni_posix_pollq_remove_helper(pq, node);
+ nni_posix_pfd_close(pf);
- if (pq->close) {
- nni_cv_wake(&pq->cv);
+ if (!nni_thr_is_self(&pq->thr)) {
+ struct kevent ev;
+ nni_mtx_lock(&pq->mtx);
+ nni_list_append(&pq->reapq, pf);
+ EV_SET(&ev, 0, EVFILT_USER, EV_ENABLE, NOTE_TRIGGER, 0, NULL);
+
+ // If this fails, the cleanup will stall. That should
+ // only occur in a memory pressure situation, and it
+ // will self-heal when the next event comes in.
+ (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL);
+ while (!pf->closed) {
+ nni_cv_wait(&pf->cv);
+ }
+ nni_mtx_unlock(&pq->mtx);
}
- nni_mtx_unlock(&pq->mtx);
+
+ (void) close(pf->fd);
+ nni_cv_fini(&pf->cv);
+ nni_mtx_fini(&pf->mtx);
+ NNI_FREE_STRUCT(pf);
}
-// nni_posix_pollq_init merely ensures that the node is ready for use.
-// It does not register the node with any pollq in particular.
int
-nni_posix_pollq_init(nni_posix_pollq_node *node)
+nni_posix_pfd_fd(nni_posix_pfd *pf)
{
- NNI_ARG_UNUSED(node);
- return (0);
+ return (pf->fd);
}
-// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does,
-// but it also ensures that the callback is not active, so that the node
-// may be deallocated. This function must not be called in a callback.
void
-nni_posix_pollq_fini(nni_posix_pollq_node *node)
+nni_posix_pfd_set_cb(nni_posix_pfd *pf, nni_posix_pfd_cb cb, void *arg)
{
- nni_posix_pollq *pq = node->pq;
- if (pq == NULL) {
- return;
- }
-
- nni_mtx_lock(&pq->mtx);
- while (pq->active == node) {
- pq->wait = node;
- nni_cv_wait(&pq->cv);
- }
-
- nni_posix_pollq_remove_helper(pq, node);
-
- if (pq->close) {
- nni_cv_wake(&pq->cv);
- }
- nni_mtx_unlock(&pq->mtx);
+ NNI_ASSERT(cb != NULL); // must not be null when established.
+ nni_mtx_lock(&pf->mtx);
+ pf->cb = cb;
+ pf->data = arg;
+ nni_mtx_unlock(&pf->mtx);
}
-void
-nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
+int
+nni_posix_pfd_arm(nni_posix_pfd *pf, int events)
{
- nni_posix_pollq *pq = node->pq;
- struct kevent kevents[2];
- int nevents = 0;
+ struct kevent ev[2];
+ int nev = 0;
+ unsigned flags = EV_ENABLE | EV_DISPATCH;
+ nni_posix_pollq *pq = pf->pq;
+
+ nni_mtx_lock(&pf->mtx);
+ if (pf->closing) {
+ events = 0;
+ } else {
+ pf->events |= events;
+ events = pf->events;
+ }
+ nni_mtx_unlock(&pf->mtx);
- NNI_ASSERT(pq != NULL);
if (events == 0) {
- return;
+ // No events, and kqueue is oneshot, so nothing to do.
+ return (0);
}
- nni_mtx_lock(&pq->mtx);
-
- if (!(node->events & POLLIN) && (events & POLLIN)) {
- EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_READ,
- EV_ENABLE | EV_DISPATCH, 0, 0, (kevent_udata_t) node);
+ if (events & POLLIN) {
+ EV_SET(&ev[nev++], pf->fd, EVFILT_READ, flags, 0, 0, pf);
}
-
- if (!(node->events & POLLOUT) && (events & POLLOUT)) {
- EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_WRITE,
- EV_ENABLE | EV_DISPATCH, 0, 0, (kevent_udata_t) node);
+ if (events & POLLOUT) {
+ EV_SET(&ev[nev++], pf->fd, EVFILT_WRITE, flags, 0, 0, pf);
}
-
- if (nevents > 0) {
- // This call should never fail, really. The only possible
- // legitimate failure would be ENOMEM, but in that case
- // lots of other things are going to be failing, or ENOENT
- // or ESRCH, indicating we already lost interest; the
- // only consequence of ignoring these errors is that a given
- // descriptor might appear "stuck". This beats the alternative
- // of just blithely crashing the application with an assertion.
- (void) kevent(pq->kq, kevents, nevents, NULL, 0, NULL);
- node->events |= events;
+ while (kevent(pq->kq, ev, nev, NULL, 0, NULL) != 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ return (nni_plat_errno(errno));
}
+ return (0);
+}
+static void
+nni_posix_pollq_reap(nni_posix_pollq *pq)
+{
+ nni_posix_pfd *pf;
+ nni_mtx_lock(&pq->mtx);
+ while ((pf = nni_list_first(&pq->reapq)) != NULL) {
+ nni_list_remove(&pq->reapq, pf);
+ pf->closed = true;
+ nni_cv_wake(&pf->cv);
+ }
nni_mtx_unlock(&pq->mtx);
}
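
The kqueue flow above splits registration from arming: filters are added disabled at init time, then enabled with EV_DISPATCH so each fires once and self-disables, mirroring the epoll oneshot behavior. A condensed sketch of the two halves (note that udata is void * on most BSDs; NetBSD historically declared it intptr_t, which is what the removed kevent_udata_t shim papered over):

#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>

static int
register_fd(int kq, int fd, void *udata)
{
	struct kevent ev[2];

	// Added but disabled: present in the kqueue, silent until armed.
	EV_SET(&ev[0], fd, EVFILT_READ, EV_ADD | EV_DISABLE, 0, 0, udata);
	EV_SET(&ev[1], fd, EVFILT_WRITE, EV_ADD | EV_DISABLE, 0, 0, udata);
	return (kevent(kq, ev, 2, NULL, 0, NULL)); // update only, no wait
}

static int
arm_read(int kq, int fd, void *udata)
{
	struct kevent ev;

	// EV_DISPATCH: deliver one event, then disable until re-armed.
	EV_SET(&ev, fd, EVFILT_READ, EV_ENABLE | EV_DISPATCH, 0, 0, udata);
	return (kevent(kq, &ev, 1, NULL, 0, NULL));
}
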
@@ -209,117 +216,71 @@ static void
nni_posix_poll_thr(void *arg)
{
nni_posix_pollq *pq = arg;
- struct kevent kevents[NNI_MAX_KQUEUE_EVENTS];
-
- nni_mtx_lock(&pq->mtx);
- while (!pq->close) {
- int i;
- int nevents;
-
- // block indefinitely, timers are handled separately
- nni_mtx_unlock(&pq->mtx);
- nevents = kevent(
- pq->kq, NULL, 0, kevents, NNI_MAX_KQUEUE_EVENTS, NULL);
- nni_mtx_lock(&pq->mtx);
-
- if (nevents < 0) {
- continue;
+ for (;;) {
+ int n;
+ struct kevent evs[NNI_MAX_KQUEUE_EVENTS];
+ nni_posix_pfd * pf;
+ nni_posix_pfd_cb cb;
+ void * cbarg;
+ int revents;
+ bool reap = false;
+
+ n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL);
+ if (n < 0) {
+ if (errno == EBADF) {
+ nni_posix_pollq_reap(pq);
+ return;
+ }
+ reap = true;
}
- // dispatch events
- for (i = 0; i < nevents; ++i) {
- struct kevent ev_disable;
- const struct kevent * ev;
- nni_posix_pollq_node *node;
+ for (int i = 0; i < n; i++) {
+ struct kevent *ev = &evs[i];
- ev = &kevents[i];
- if (ev->filter == EVFILT_USER &&
- ev->ident == NNI_KQ_EV_EXIT_ID) {
- // we've woken up to exit the polling thread
+ switch (ev->filter) {
+ case EVFILT_READ:
+ revents = POLLIN;
break;
- }
-
- node = (nni_posix_pollq_node *) ev->udata;
- if (node->pq == NULL) {
- // node was removed while we were blocking
+ case EVFILT_WRITE:
+ revents = POLLOUT;
+ break;
+ case EVFILT_USER:
+ default:
+ reap = true;
continue;
}
- node->revents = 0;
-
+ pf = (void *) ev->udata;
if (ev->flags & (EV_ERROR | EV_EOF)) {
- node->revents |= POLLHUP;
- }
- if (ev->filter == EVFILT_WRITE) {
- node->revents |= POLLOUT;
- } else if (ev->filter == EVFILT_READ) {
- node->revents |= POLLIN;
- } else {
- NNI_ASSERT(false); // unhandled filter
- break;
+ revents |= POLLHUP;
}
- // explicitly disable this event. we'd ideally rely on
- // the behavior of EV_DISPATCH to do this,
- // but that only happens once we've acknowledged the
- // event by reading/or writing the fd. because there
- // can currently be some latency between the time we
- // receive this event and the time we read/write in
- // response, disable the event in the meantime to avoid
- // needless wakeups.
- // revisit if we're able to reduce/remove this latency.
- EV_SET(&ev_disable, (uintptr_t) node->fd, ev->filter,
- EV_DISABLE, 0, 0, NULL);
- // this will only fail if the fd is already
- // closed/invalid which we don't mind anyway,
- // so ignore return value.
- (void) kevent(pq->kq, &ev_disable, 1, NULL, 0, NULL);
-
- // mark events as cleared
- node->events &= ~node->revents;
-
- // Save the active node; we can notice this way
- // when it is busy, and avoid freeing it until
- // we are sure that it is not in use.
- pq->active = node;
-
- // Execute the callback with lock released
- nni_mtx_unlock(&pq->mtx);
- node->cb(node->data);
- nni_mtx_lock(&pq->mtx);
-
- // We finished with this node. If something
- // was blocked waiting for that, wake it up.
- pq->active = NULL;
- if (pq->wait == node) {
- pq->wait = NULL;
- nni_cv_wake(&pq->cv);
+ nni_mtx_lock(&pf->mtx);
+ cb = pf->cb;
+ cbarg = pf->data;
+ pf->events &= ~(revents);
+ nni_mtx_unlock(&pf->mtx);
+
+ if (cb != NULL) {
+ cb(pf, revents, cbarg);
}
}
+ if (reap) {
+ nni_posix_pollq_reap(pq);
+ }
}
-
- nni_mtx_unlock(&pq->mtx);
}
static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
- if (pq->started) {
- struct kevent ev;
- EV_SET(&ev, NNI_KQ_EV_EXIT_ID, EVFILT_USER, EV_ENABLE,
- NOTE_TRIGGER, 0, NULL);
- nni_mtx_lock(&pq->mtx);
- pq->close = true;
- pq->started = false;
- (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL);
- nni_mtx_unlock(&pq->mtx);
- }
- nni_thr_fini(&pq->thr);
-
if (pq->kq >= 0) {
close(pq->kq);
pq->kq = -1;
}
+ nni_thr_fini(&pq->thr);
+
+ nni_posix_pollq_reap(pq);
nni_mtx_fini(&pq->mtx);
}
@@ -327,10 +288,17 @@ nni_posix_pollq_destroy(nni_posix_pollq *pq)
static int
nni_posix_pollq_add_wake_evt(nni_posix_pollq *pq)
{
- // add user event so we can wake ourself on exit
+ int rv;
struct kevent ev;
- EV_SET(&ev, NNI_KQ_EV_EXIT_ID, EVFILT_USER, EV_ADD, 0, 0, NULL);
- return (nni_plat_errno(kevent(pq->kq, &ev, 1, NULL, 0, NULL)));
+
+ EV_SET(&ev, 0, EVFILT_USER, EV_ADD, 0, 0, NULL);
+ while ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ return (nni_plat_errno(errno));
+ }
+ return (0);
}
static int
@@ -342,10 +310,8 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
return (nni_plat_errno(errno));
}
- pq->close = false;
-
nni_mtx_init(&pq->mtx);
- nni_cv_init(&pq->cv, &pq->mtx);
+ NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
if (((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) ||
(rv = nni_posix_pollq_add_wake_evt(pq)) != 0) {
@@ -353,24 +319,14 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
return (rv);
}
- pq->started = true;
nni_thr_run(&pq->thr);
return (0);
}
-// single global instance for now
-static nni_posix_pollq nni_posix_global_pollq;
-
-nni_posix_pollq *
-nni_posix_pollq_get(int fd)
-{
- NNI_ARG_UNUSED(fd);
- return (&nni_posix_global_pollq);
-}
-
int
nni_posix_pollq_sysinit(void)
{
+
return (nni_posix_pollq_create(&nni_posix_global_pollq));
}
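
The user-event wakeup is likewise two-phase: EV_ADD installs the event once at pollq creation, and a later NOTE_TRIGGER pops any thread blocked in kevent(). A sketch of both halves as used above:

#include <sys/types.h>
#include <sys/event.h>

static int
add_wake_evt(int kq)
{
	struct kevent ev;

	EV_SET(&ev, 0, EVFILT_USER, EV_ADD, 0, 0, NULL);
	return (kevent(kq, &ev, 1, NULL, 0, NULL));
}

static int
wake(int kq)
{
	struct kevent ev;

	// Triggering makes the EVFILT_USER event fire for the waiter.
	EV_SET(&ev, 0, EVFILT_USER, EV_ENABLE, NOTE_TRIGGER, 0, NULL);
	return (kevent(kq, &ev, 1, NULL, 0, NULL));
}
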
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index efc9ff48..26753df6 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -11,8 +11,6 @@
#include "core/nng_impl.h"
#include "platform/posix/posix_pollq.h"
-#ifdef NNG_USE_POSIX_POLLQ_POLL
-
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
@@ -32,306 +30,285 @@
// nni_posix_pollq is a work structure used by the poller thread, that keeps
// track of all the underlying pipe handles and so forth being used by poll().
+
+// Locking strategy: We use the pollq lock to guard the lists on the pollq,
+// the nfds (which counts the number of items in the pollq), the pollq
+// shutdown flags (pq->closing and pq->closed) and the cv on each pfd. We
+// use a lock on the pfd only to protect the the events field (which we treat
+// as an atomic bitfield), and the cb and arg pointers. Note that the pfd
+// lock is therefore a leaf lock, which is sometimes acquired while holding
+// the pq lock. Every reasonable effort is made to minimize holding locks.
+// (Btw, pfd->fd is not guarded, because it is set at pfd creation and
+// persists until the pfd is destroyed.)
+
+typedef struct nni_posix_pollq nni_posix_pollq;
+
struct nni_posix_pollq {
- nni_mtx mtx;
- nni_cv cv;
- struct pollfd * fds;
- int nfds;
- int wakewfd; // write side of waker pipe
- int wakerfd; // read side of waker pipe
- int close; // request for worker to exit
- int started;
- nni_thr thr; // worker thread
- nni_list polled; // polled nodes
- nni_list armed; // armed nodes
- nni_list idle; // idle nodes
- int nnodes; // num of nodes in nodes list
- int inpoll; // poller asleep in poll
- nni_posix_pollq_node *wait; // cancel waiting on this
- nni_posix_pollq_node *active; // active node (in callback)
+ nni_mtx mtx;
+ int nfds;
+ int wakewfd; // write side of waker pipe
+ int wakerfd; // read side of waker pipe
+ bool closing; // request for worker to exit
+ bool closed;
+ nni_thr thr; // worker thread
+ nni_list pollq; // armed nodes
+ nni_list reapq;
};
-static int
-nni_posix_pollq_poll_grow(nni_posix_pollq *pq)
-{
- int grow = pq->nnodes + 2; // one for us, one for waker
- struct pollfd *newfds;
+struct nni_posix_pfd {
+ nni_posix_pollq *pq;
+ int fd;
+ nni_list_node node;
+ nni_cv cv;
+ nni_mtx mtx;
+ int events;
+ nni_posix_pfd_cb cb;
+ void * arg;
+};
+
+static nni_posix_pollq nni_posix_global_pollq;
- if (grow < pq->nfds) {
- return (0);
+int
+nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
+{
+ nni_posix_pfd * pfd;
+ nni_posix_pollq *pq = &nni_posix_global_pollq;
+
+ // Set this as soon as possible (narrowing the close-exec race
+ // as much as we can; better options are system calls that
+ // suppress this behavior at descriptor creation).
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+#ifdef SO_NOSIGPIPE
+ // Darwin lacks MSG_NOSIGNAL, but has a socket option.
+ // If this code is getting used, you really should be using the
+ // kqueue poller, or you need to upgrade your older system.
+ int one = 1;
+ (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
+#endif
+
+ if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
+ return (NNG_ENOMEM);
}
+ NNI_LIST_NODE_INIT(&pfd->node);
+ nni_mtx_init(&pfd->mtx);
+ nni_cv_init(&pfd->cv, &pq->mtx);
+ pfd->fd = fd;
+ pfd->events = 0;
+ pfd->cb = NULL;
+ pfd->arg = NULL;
+ pfd->pq = pq;
+ nni_mtx_lock(&pq->mtx);
+ if (pq->closing) {
+ nni_mtx_unlock(&pq->mtx);
+ nni_cv_fini(&pfd->cv);
+ nni_mtx_fini(&pfd->mtx);
+ NNI_FREE_STRUCT(pfd);
+ return (NNG_ECLOSED);
+ }
+ nni_list_append(&pq->pollq, pfd);
+ pq->nfds++;
+ nni_mtx_unlock(&pq->mtx);
+ *pfdp = pfd;
+ return (0);
+}
- grow = grow + 128;
+void
+nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg)
+{
+ nni_mtx_lock(&pfd->mtx);
+ pfd->cb = cb;
+ pfd->arg = arg;
+ nni_mtx_unlock(&pfd->mtx);
+}
- if ((newfds = NNI_ALLOC_STRUCTS(newfds, grow)) == NULL) {
- return (NNG_ENOMEM);
+int
+nni_posix_pfd_fd(nni_posix_pfd *pfd)
+{
+ return (pfd->fd);
+}
+
+void
+nni_posix_pfd_close(nni_posix_pfd *pfd)
+{
+ (void) shutdown(pfd->fd, SHUT_RDWR);
+}
+
+void
+nni_posix_pfd_fini(nni_posix_pfd *pfd)
+{
+ nni_posix_pollq *pq = pfd->pq;
+
+ nni_posix_pfd_close(pfd);
+
+ nni_mtx_lock(&pq->mtx);
+ if (nni_list_active(&pq->pollq, pfd)) {
+ nni_list_remove(&pq->pollq, pfd);
+ pq->nfds--;
}
- if (pq->nfds != 0) {
- NNI_FREE_STRUCTS(pq->fds, pq->nfds);
+ if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) {
+ nni_list_append(&pq->reapq, pfd);
+ nni_plat_pipe_raise(pq->wakewfd);
+ while (nni_list_active(&pq->reapq, pfd)) {
+ nni_cv_wait(&pfd->cv);
+ }
}
- pq->fds = newfds;
- pq->nfds = grow;
+ nni_mtx_unlock(&pq->mtx);
+
+ // We're exclusive now.
+ (void) close(pfd->fd);
+ nni_cv_fini(&pfd->cv);
+ nni_mtx_fini(&pfd->mtx);
+ NNI_FREE_STRUCT(pfd);
+}
+
+int
+nni_posix_pfd_arm(nni_posix_pfd *pfd, int events)
+{
+ nni_posix_pollq *pq = pfd->pq;
+ nni_mtx_lock(&pfd->mtx);
+ pfd->events |= events;
+ nni_mtx_unlock(&pfd->mtx);
+
+ // If we're running in the callback, we needn't kick the pollq:
+ // it rebuilds its list when the callback returns. Otherwise we
+ // wake the poller, as we cannot modify the set it is polling.
+ if (!nni_thr_is_self(&pq->thr)) {
+ nni_plat_pipe_raise(pq->wakewfd);
+ }
return (0);
}
static void
nni_posix_poll_thr(void *arg)
{
- nni_posix_pollq * pollq = arg;
- nni_posix_pollq_node *node;
+ nni_posix_pollq *pq = arg;
+ int nalloc = 0;
+ struct pollfd * fds = NULL;
+ nni_posix_pfd ** pfds = NULL;
- nni_mtx_lock(&pollq->mtx);
for (;;) {
- int rv;
int nfds;
- struct pollfd *fds;
+ int events;
+ nni_posix_pfd *pfd;
- if (pollq->close) {
- break;
+ nni_mtx_lock(&pq->mtx);
+ while (nalloc < (pq->nfds + 1)) {
+ int n = pq->nfds + 128;
+
+ // Drop the lock while we sleep or allocate. This
+ // allows additional items to be added or removed (!)
+ // while we wait.
+ nni_mtx_unlock(&pq->mtx);
+
+ // Toss the old ones first; avoids *doubling* memory
+ // consumption during alloc.
+ NNI_FREE_STRUCTS(fds, nalloc);
+ NNI_FREE_STRUCTS(pfds, nalloc);
+ nalloc = 0;
+
+ if ((pfds = NNI_ALLOC_STRUCTS(pfds, n)) == NULL) {
+ nni_msleep(10); // sleep for a bit, try later
+ } else if ((fds = NNI_ALLOC_STRUCTS(fds, n)) == NULL) {
+ NNI_FREE_STRUCTS(pfds, n);
+ nni_msleep(10);
+ } else {
+ nalloc = n;
+ }
+ nni_mtx_lock(&pq->mtx);
}
- fds = pollq->fds;
- nfds = 0;
-
// The waker pipe is set up so that we will be woken
// when it is written (this allows us to be signaled).
- fds[nfds].fd = pollq->wakerfd;
- fds[nfds].events = POLLIN;
- fds[nfds].revents = 0;
- nfds++;
+ fds[0].fd = pq->wakerfd;
+ fds[0].events = POLLIN;
+ fds[0].revents = 0;
+ pfds[0] = NULL;
+ nfds = 1;
+
+ // Also, let's reap anything that landed on the reapq.
+ while ((pfd = nni_list_first(&pq->reapq)) != NULL) {
+ nni_list_remove(&pq->reapq, pfd);
+ nni_cv_wake(&pfd->cv);
+ }
- // Set up the poll list.
- while ((node = nni_list_first(&pollq->armed)) != NULL) {
- nni_list_remove(&pollq->armed, node);
- nni_list_append(&pollq->polled, node);
- fds[nfds].fd = node->fd;
- fds[nfds].events = node->events;
- fds[nfds].revents = 0;
- node->index = nfds;
- nfds++;
+ // If we're closing down, bail now. This is done *after* we
+ // have ensured that the reapq is empty. Anything still in
+ // the pollq is not going to receive further callbacks.
+ if (pq->closing) {
+ pq->closed = true;
+ nni_mtx_unlock(&pq->mtx);
+ break;
}
- // Now poll it. We block indefinitely, since we use separate
- // timeouts to wake and remove the elements from the list.
- pollq->inpoll = 1;
- nni_mtx_unlock(&pollq->mtx);
- rv = poll(fds, nfds, -1);
- nni_mtx_lock(&pollq->mtx);
- pollq->inpoll = 0;
-
- if (rv < 0) {
- // This shouldn't happen really. If it does, we
- // just try again. (EINTR is probably the only
- // reasonable failure here, unless internal memory
- // allocations fail in the kernel, leading to EAGAIN.)
- continue;
+ // Set up the poll list.
+ NNI_LIST_FOREACH (&pq->pollq, pfd) {
+
+ nni_mtx_lock(&pfd->mtx);
+ events = pfd->events;
+ nni_mtx_unlock(&pfd->mtx);
+
+ if (events != 0) {
+ fds[nfds].fd = pfd->fd;
+ fds[nfds].events = events;
+ fds[nfds].revents = 0;
+ pfds[nfds] = pfd;
+ nfds++;
+ }
}
+ nni_mtx_unlock(&pq->mtx);
+
+ // We could use poll()'s return count to stop scanning once all
+ // ready descriptors were found, but on average we would still
+ // walk half the list, and the extra conditional (a potential
+ // pipeline stall) seems like added complexity with no real
+ // benefit. It also makes the worst case even worse.
+ (void) poll(fds, nfds, -1);
// If the waker pipe was signaled, read from it.
if (fds[0].revents & POLLIN) {
- NNI_ASSERT(fds[0].fd == pollq->wakerfd);
- nni_plat_pipe_clear(pollq->wakerfd);
+ NNI_ASSERT(fds[0].fd == pq->wakerfd);
+ nni_plat_pipe_clear(pq->wakerfd);
}
- while ((node = nni_list_first(&pollq->polled)) != NULL) {
- int index = node->index;
-
- // We remove ourselves from the polled list, and
- // then put it on either the idle or armed list
- // depending on whether it remains armed.
- node->index = 0;
- nni_list_remove(&pollq->polled, node);
- NNI_ASSERT(index > 0);
- if (fds[index].revents == 0) {
- // If still watching for events, return it
- // to the armed list.
- if (node->events) {
- nni_list_append(&pollq->armed, node);
- } else {
- nni_list_append(&pollq->idle, node);
- }
- continue;
- }
+ for (int i = 1; i < nfds; i++) {
+ if ((events = fds[i].revents) != 0) {
+ nni_posix_pfd_cb cb;
+ void * arg;
- // We are calling the callback, so disarm
- // all events; the node can rearm them in its
- // callback.
- node->revents = fds[index].revents;
- node->events &= ~node->revents;
- if (node->events == 0) {
- nni_list_append(&pollq->idle, node);
- } else {
- nni_list_append(&pollq->armed, node);
- }
+ pfd = pfds[i];
+
+ nni_mtx_lock(&pfd->mtx);
+ cb = pfd->cb;
+ arg = pfd->arg;
+ pfd->events &= ~events;
+ nni_mtx_unlock(&pfd->mtx);
- // Save the active node; we can notice this way
- // when it is busy, and avoid freeing it until
- // we are sure that it is not in use.
- pollq->active = node;
-
- // Execute the callback -- without locks held.
- nni_mtx_unlock(&pollq->mtx);
- node->cb(node->data);
- nni_mtx_lock(&pollq->mtx);
-
- // We finished with this node. If something
- // was blocked waiting for that, wake it up.
- pollq->active = NULL;
- if (pollq->wait == node) {
- pollq->wait = NULL;
- nni_cv_wake(&pollq->cv);
+ if (cb) {
+ cb(pfd, events, arg);
+ }
}
}
}
- nni_mtx_unlock(&pollq->mtx);
-}
-
-int
-nni_posix_pollq_add(nni_posix_pollq_node *node)
-{
- int rv;
- nni_posix_pollq *pq;
-
- NNI_ASSERT(!nni_list_node_active(&node->node));
- pq = nni_posix_pollq_get(node->fd);
- if (node->pq != NULL) {
- return (NNG_ESTATE);
- }
-
- nni_mtx_lock(&pq->mtx);
- if (pq->close) {
- // This shouldn't happen!
- nni_mtx_unlock(&pq->mtx);
- return (NNG_ECLOSED);
- }
- node->pq = pq;
- if ((rv = nni_posix_pollq_poll_grow(pq)) != 0) {
- nni_mtx_unlock(&pq->mtx);
- return (rv);
- }
- pq->nnodes++;
- nni_list_append(&pq->idle, node);
- nni_mtx_unlock(&pq->mtx);
- return (0);
+ NNI_FREE_STRUCTS(fds, nalloc);
+ NNI_FREE_STRUCTS(pfds, nalloc);
}
-// nni_posix_pollq_remove removes the node from the pollq, but
-// does not ensure that the pollq node is safe to destroy. In particular,
-// this function can be called from a callback (the callback may be active).
-void
-nni_posix_pollq_remove(nni_posix_pollq_node *node)
+static void
+nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
- nni_posix_pollq *pq = node->pq;
-
- if (pq == NULL) {
- return;
- }
- node->pq = NULL;
nni_mtx_lock(&pq->mtx);
- if (nni_list_node_active(&node->node)) {
- nni_list_node_remove(&node->node);
- pq->nnodes--;
- }
- if (pq->close) {
- nni_cv_wake(&pq->cv);
- }
+ pq->closing = true;
nni_mtx_unlock(&pq->mtx);
-}
-
-// nni_posix_pollq_init merely ensures that the node is ready for use.
-// It does not register the node with any pollq in particular.
-int
-nni_posix_pollq_init(nni_posix_pollq_node *node)
-{
- NNI_LIST_NODE_INIT(&node->node);
- return (0);
-}
-
-// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does,
-// but it also ensures that the callback is not active, so that the node
-// may be deallocated. This function must not be called in a callback.
-void
-nni_posix_pollq_fini(nni_posix_pollq_node *node)
-{
- nni_posix_pollq *pq = node->pq;
- if (pq == NULL) {
- return;
- }
- node->pq = NULL;
- nni_mtx_lock(&pq->mtx);
- while (pq->active == node) {
- pq->wait = node;
- nni_cv_wait(&pq->cv);
- }
- if (nni_list_node_active(&node->node)) {
- nni_list_node_remove(&node->node);
- pq->nnodes--;
- }
- if (pq->close) {
- nni_cv_wake(&pq->cv);
- }
- nni_mtx_unlock(&pq->mtx);
-}
+ nni_plat_pipe_raise(pq->wakewfd);
-void
-nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
-{
- nni_posix_pollq *pq = node->pq;
- int oevents;
-
- NNI_ASSERT(pq != NULL);
-
- nni_mtx_lock(&pq->mtx);
- oevents = node->events;
- node->events |= events;
-
- // We move this to the armed list if its not armed, or already
- // on the polled list. The polled list would be the case where
- // the index is set to a positive value.
- if ((oevents == 0) && (events != 0) && (node->index < 1)) {
- nni_list_node_remove(&node->node);
- nni_list_append(&pq->armed, node);
- }
- if ((events != 0) && (oevents != events)) {
- // Possibly wake up poller since we're looking for new events.
- if (pq->inpoll) {
- nni_plat_pipe_raise(pq->wakewfd);
- }
- }
- nni_mtx_unlock(&pq->mtx);
-}
-
-static void
-nni_posix_pollq_destroy(nni_posix_pollq *pq)
-{
- if (pq->started) {
- nni_mtx_lock(&pq->mtx);
- pq->close = 1;
- pq->started = 0;
- nni_plat_pipe_raise(pq->wakewfd);
- nni_mtx_unlock(&pq->mtx);
- }
nni_thr_fini(&pq->thr);
-
- // All pipes should have been closed before this is called.
- NNI_ASSERT(nni_list_empty(&pq->polled));
- NNI_ASSERT(nni_list_empty(&pq->armed));
- NNI_ASSERT(nni_list_empty(&pq->idle));
- NNI_ASSERT(pq->nnodes == 0);
-
- if (pq->wakewfd >= 0) {
- nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
- pq->wakewfd = pq->wakerfd = -1;
- }
- if (pq->nfds != 0) {
- NNI_FREE_STRUCTS(pq->fds, pq->nfds);
- pq->fds = NULL;
- pq->nfds = 0;
- }
+ nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
nni_mtx_fini(&pq->mtx);
}
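
The nni_plat_pipe_* waker this fallback poller leans on is NNG's wrapper around the classic self-pipe trick. A hypothetical standalone rendering, assuming raw pipe(2) — the waker_* helper names are invented for illustration:

#include <fcntl.h>
#include <unistd.h>

static int wakewfd; // write side: raised to wake the poller
static int wakerfd; // read side: polled with POLLIN

static int
waker_open(void)
{
	int p[2];

	if (pipe(p) != 0) {
		return (-1);
	}
	wakerfd = p[0];
	wakewfd = p[1];
	(void) fcntl(wakerfd, F_SETFL, O_NONBLOCK);
	(void) fcntl(wakewfd, F_SETFL, O_NONBLOCK);
	return (0);
}

static void
waker_raise(void)
{
	(void) write(wakewfd, "", 1); // any byte pops poll()
}

static void
waker_clear(void)
{
	char buf[64];

	while (read(wakerfd, buf, sizeof(buf)) > 0) {
		// drain fully so level-triggered poll() goes quiet
	}
}
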
@@ -340,50 +317,27 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
{
int rv;
- NNI_LIST_INIT(&pq->polled, nni_posix_pollq_node, node);
- NNI_LIST_INIT(&pq->armed, nni_posix_pollq_node, node);
- NNI_LIST_INIT(&pq->idle, nni_posix_pollq_node, node);
- pq->wakewfd = -1;
- pq->wakerfd = -1;
- pq->close = 0;
-
- nni_mtx_init(&pq->mtx);
- nni_cv_init(&pq->cv, &pq->mtx);
+ NNI_LIST_INIT(&pq->pollq, nni_posix_pfd, node);
+ NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
+ pq->closing = false;
+ pq->closed = false;
- if (((rv = nni_posix_pollq_poll_grow(pq)) != 0) ||
- ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) ||
- ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) {
- nni_posix_pollq_destroy(pq);
+ if ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) {
return (rv);
}
- pq->started = 1;
+ if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) {
+ nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
+ return (rv);
+ }
+ nni_mtx_init(&pq->mtx);
nni_thr_run(&pq->thr);
return (0);
}
-// We use a single pollq for the entire system, which means only a single
-// thread is polling. This may be somewhat less than optimally efficient,
-// and it may be worth investigating having multiple threads to improve
-// efficiency and scalability. (This would shorten the linked lists,
-// improving C10K scalability, and also allow us to engage multiple cores.)
-// It's not entirely clear how many threads are "optimal".
-static nni_posix_pollq nni_posix_global_pollq;
-
-nni_posix_pollq *
-nni_posix_pollq_get(int fd)
-{
- NNI_ARG_UNUSED(fd);
- // This is the point where we could choose a pollq based on FD.
- return (&nni_posix_global_pollq);
-}
-
int
nni_posix_pollq_sysinit(void)
{
- int rv;
-
- rv = nni_posix_pollq_create(&nni_posix_global_pollq);
- return (rv);
+ return (nni_posix_pollq_create(&nni_posix_global_pollq));
}
void
@@ -391,5 +345,3 @@ nni_posix_pollq_sysfini(void)
{
nni_posix_pollq_destroy(&nni_posix_global_pollq);
}
-
-#endif // NNG_USE_POSIX_POLLQ_POLL
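
One pattern worth calling out from the loop above: the callback and its argument are snapshotted under the leaf pfd lock, the consumed event bits are cleared, and only then is the callback invoked with no locks held — which is what lets a callback re-arm or even fini its own pfd. Condensed from the dispatch step above:

static void
dispatch(nni_posix_pfd *pfd, int revents)
{
	nni_posix_pfd_cb cb;
	void *           arg;

	nni_mtx_lock(&pfd->mtx);
	cb  = pfd->cb;
	arg = pfd->arg;
	pfd->events &= ~revents; // these bits are now consumed
	nni_mtx_unlock(&pfd->mtx);

	if (cb != NULL) {
		cb(pfd, revents, arg); // no locks held here
	}
}
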
diff --git a/src/platform/posix/posix_pollq_port.c b/src/platform/posix/posix_pollq_port.c
index 7231cb27..63c1d1d1 100644
--- a/src/platform/posix/posix_pollq_port.c
+++ b/src/platform/posix/posix_pollq_port.c
@@ -1,7 +1,6 @@
//
// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Liam Staskawicz <liam@stask.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -12,7 +11,9 @@
#ifdef NNG_HAVE_PORT_CREATE
#include <errno.h>
+#include <fcntl.h>
#include <port.h>
+#include <sched.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h> /* for strerror() */
@@ -21,6 +22,9 @@
#include "core/nng_impl.h"
#include "platform/posix/posix_pollq.h"
+#define NNI_MAX_PORTEV 64
+typedef struct nni_posix_pollq nni_posix_pollq;
+
// nni_posix_pollq is a work structure that manages state for the port-event
// based pollq implementation. We only really need to keep track of the
// single thread, and the associated port itself.
@@ -29,190 +33,178 @@ struct nni_posix_pollq {
nni_thr thr; // worker thread
};
+struct nni_posix_pfd {
+ nni_posix_pollq *pq;
+ int fd;
+ nni_mtx mtx;
+ nni_cv cv;
+ int events;
+ bool closed;
+ bool closing;
+ nni_posix_pfd_cb cb;
+ void * data;
+};
+
+// single global instance for now
+static nni_posix_pollq nni_posix_global_pollq;
+
int
-nni_posix_pollq_add(nni_posix_pollq_node *node)
+nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
{
nni_posix_pollq *pq;
+ nni_posix_pfd * pfd;
- pq = nni_posix_pollq_get(node->fd);
- if (pq == NULL) {
- return (NNG_EINVAL);
- }
+ pq = &nni_posix_global_pollq;
- nni_mtx_lock(&node->mx);
- // ensure node was not previously associated with a pollq
- if (node->pq != NULL) {
- nni_mtx_unlock(&node->mx);
- return (NNG_ESTATE);
+ if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
+ return (NNG_ENOMEM);
}
-
- node->pq = pq;
- node->events = 0;
- node->armed = false;
- nni_mtx_unlock(&node->mx);
-
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+
+ nni_mtx_init(&pfd->mtx);
+ nni_cv_init(&pfd->cv, &pfd->mtx);
+ pfd->closed = false;
+ pfd->closing = false;
+ pfd->fd = fd;
+ pfd->pq = pq;
+ pfd->cb = NULL;
+ pfd->data = NULL;
+ *pfdp = pfd;
return (0);
}
-// nni_posix_pollq_remove removes the node from the pollq, but
-// does not ensure that the pollq node is safe to destroy. In particular,
-// this function can be called from a callback (the callback may be active).
-void
-nni_posix_pollq_remove(nni_posix_pollq_node *node)
+int
+nni_posix_pfd_fd(nni_posix_pfd *pfd)
{
- nni_posix_pollq *pq = node->pq;
+ return (pfd->fd);
+}
- if (pq == NULL) {
- return;
- }
+void
+nni_posix_pfd_close(nni_posix_pfd *pfd)
+{
+ nni_posix_pollq *pq = pfd->pq;
- nni_mtx_lock(&node->mx);
- node->events = 0;
- if (node->armed) {
- // Failure modes that can occur are uninteresting.
- (void) port_dissociate(pq->port, PORT_SOURCE_FD, node->fd);
- node->armed = false;
+ nni_mtx_lock(&pfd->mtx);
+ if (!pfd->closing) {
+ pfd->closing = true;
+ (void) shutdown(pfd->fd, SHUT_RDWR);
+ port_dissociate(pq->port, PORT_SOURCE_FD, pfd->fd);
}
- nni_mtx_unlock(&node->mx);
-}
+ nni_mtx_unlock(&pfd->mtx);
-// nni_posix_pollq_init merely ensures that the node is ready for use.
-// It does not register the node with any pollq in particular.
-int
-nni_posix_pollq_init(nni_posix_pollq_node *node)
-{
- nni_mtx_init(&node->mx);
- nni_cv_init(&node->cv, &node->mx);
- node->pq = NULL;
- node->armed = false;
- NNI_LIST_NODE_INIT(&node->node);
- return (0);
+ // The wake event used to synchronize with the poller is sent
+ // from nni_posix_pfd_fini(). Note that port_send() should only
+ // really fail if out of memory or on hitting a resource limit.
}
-// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does,
-// but it also ensures that the node is removed properly.
void
-nni_posix_pollq_fini(nni_posix_pollq_node *node)
+nni_posix_pfd_fini(nni_posix_pfd *pfd)
{
- nni_posix_pollq *pq = node->pq;
+ nni_posix_pollq *pq = pfd->pq;
- nni_mtx_lock(&node->mx);
- if ((pq = node->pq) != NULL) {
- // Dissociate the port; if it isn't already associated we
- // don't care. (An extra syscall, but it should not matter.)
- (void) port_dissociate(pq->port, PORT_SOURCE_FD, node->fd);
- node->armed = false;
+ nni_posix_pfd_close(pfd);
- for (;;) {
- if (port_send(pq->port, 0, node) == 0) {
- break;
- }
- switch (errno) {
- case EAGAIN:
- case ENOMEM:
- // Resource exhaustion.
- // Best bet in these cases is to sleep it off.
- // This may appear like a total application
- // hang, but by sleeping here maybe we give
- // a chance for things to clear up.
- nni_mtx_unlock(&node->mx);
- nni_msleep(5000);
- nni_mtx_lock(&node->mx);
- continue;
- case EBADFD:
- case EBADF:
- // Most likely these indicate that the pollq
- // itself has been closed. That's ok.
+ if (!nni_thr_is_self(&pq->thr)) {
+
+ while (port_send(pq->port, 1, pfd) != 0) {
+ if ((errno == EBADF) || (errno == EBADFD)) {
+ pfd->closed = true;
break;
}
+ sched_yield(); // try again later...
}
- // Wait for the pollq thread to tell us with certainty that
- // they are done. This is needed to ensure that the pollq
- // thread isn't executing (or about to execute) the callback
- // before we destroy it.
- while (node->pq != NULL) {
- nni_cv_wait(&node->cv);
+
+ nni_mtx_lock(&pfd->mtx);
+ while (!pfd->closed) {
+ nni_cv_wait(&pfd->cv);
}
+ nni_mtx_unlock(&pfd->mtx);
}
- nni_mtx_unlock(&node->mx);
- nni_cv_fini(&node->cv);
- nni_mtx_fini(&node->mx);
+
+ // We're exclusive now.
+ (void) close(pfd->fd);
+ nni_cv_fini(&pfd->cv);
+ nni_mtx_fini(&pfd->mtx);
+ NNI_FREE_STRUCT(pfd);
}
-void
-nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
+int
+nni_posix_pfd_arm(nni_posix_pfd *pfd, int events)
{
- nni_posix_pollq *pq = node->pq;
-
- NNI_ASSERT(pq != NULL);
- if (events == 0) {
- return;
+ nni_posix_pollq *pq = pfd->pq;
+
+ nni_mtx_lock(&pfd->mtx);
+ if (!pfd->closing) {
+ pfd->events |= events;
+ if (port_associate(pq->port, PORT_SOURCE_FD, pfd->fd,
+ pfd->events, pfd) != 0) {
+ int rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&pfd->mtx);
+ return (rv);
+ }
}
-
- nni_mtx_lock(&node->mx);
- node->events |= events;
- node->armed = true;
- (void) port_associate(
- pq->port, PORT_SOURCE_FD, node->fd, node->events, node);
-
- // Possible errors here are:
- //
- // EBADF -- programming error on our part
- // EBADFD -- programming error on our part
- // ENOMEM -- not much we can do here
- // EAGAIN -- too many port events registered (65K!!)
- //
- // For now we ignore them all. (We need to be able to return
- // errors to our caller.) Effect on the application will appear
- // to be a stalled file descriptor (no notifications).
- nni_mtx_unlock(&node->mx);
+ nni_mtx_unlock(&pfd->mtx);
+ return (0);
}
static void
nni_posix_poll_thr(void *arg)
{
-
for (;;) {
- nni_posix_pollq * pq = arg;
- port_event_t ev;
- nni_posix_pollq_node *node;
-
- if (port_get(pq->port, &ev, NULL) != 0) {
+ nni_posix_pollq *pq = arg;
+ port_event_t ev[NNI_MAX_PORTEV];
+ nni_posix_pfd * pfd;
+ int events;
+ nni_posix_pfd_cb cb;
+ void * arg;
+ unsigned n;
+
+ n = 1; // wake us even on just one event
+ if (port_getn(pq->port, ev, NNI_MAX_PORTEV, &n, NULL) != 0) {
if (errno == EINTR) {
continue;
}
return;
}
- switch (ev.portev_source) {
- case PORT_SOURCE_ALERT:
- return;
-
- case PORT_SOURCE_FD:
- node = ev.portev_user;
+ // We run through the returned events twice. First we
+ // run the callbacks; then we do the reaps. This way
+ // we ensure that we only reap *after* callbacks have run.
+ for (unsigned i = 0; i < n; i++) {
+ if (ev[i].portev_source != PORT_SOURCE_FD) {
+ continue;
+ }
+ pfd = ev[i].portev_user;
+ events = ev[i].portev_events;
- nni_mtx_lock(&node->mx);
- node->revents = ev.portev_events;
- // mark events as cleared
- node->events &= ~node->revents;
- node->armed = false;
- nni_mtx_unlock(&node->mx);
+ nni_mtx_lock(&pfd->mtx);
+ cb = pfd->cb;
+ arg = pfd->data;
+ pfd->events &= ~events;
+ nni_mtx_unlock(&pfd->mtx);
- node->cb(node->data);
- break;
+ if (cb != NULL) {
+ cb(pfd, events, arg);
+ }
+ }
+ for (unsigned i = 0; i < n; i++) {
+ if (ev[i].portev_source != PORT_SOURCE_USER) {
+ continue;
+ }
- case PORT_SOURCE_USER:
// User event telling us to stop doing things.
- // We signal back to use this as a coordination event
- // between the pollq and the thread handler.
- // NOTE: It is absolutely critical that there is only
- // a single thread per pollq. Otherwise we cannot
- // be sure that we are blocked completely,
- node = ev.portev_user;
- nni_mtx_lock(&node->mx);
- node->pq = NULL;
- nni_cv_wake(&node->cv);
- nni_mtx_unlock(&node->mx);
+ // We signal back, using this as a coordination
+ // event between the pollq and the thread
+ // waiting in fini. NOTE: It is absolutely
+ // critical that there is only a single thread
+ // per pollq; otherwise we cannot be sure the
+ // poller is truly done with the pfd.
+ pfd = ev[i].portev_user;
+ nni_mtx_lock(&pfd->mtx);
+ pfd->closed = true;
+ nni_cv_wake(&pfd->cv);
+ nni_mtx_unlock(&pfd->mtx);
}
}
}
@@ -220,7 +212,6 @@ nni_posix_poll_thr(void *arg)
static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
- port_alert(pq->port, PORT_ALERT_SET, 1, NULL);
(void) close(pq->port);
nni_thr_fini(&pq->thr);
}
@@ -243,14 +234,15 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
return (0);
}
-// single global instance for now
-static nni_posix_pollq nni_posix_global_pollq;
-
-nni_posix_pollq *
-nni_posix_pollq_get(int fd)
+void
+nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg)
{
- NNI_ARG_UNUSED(fd);
- return (&nni_posix_global_pollq);
+ NNI_ASSERT(cb != NULL); // must not be null when established.
+
+ nni_mtx_lock(&pfd->mtx);
+ pfd->cb = cb;
+ pfd->data = arg;
+ nni_mtx_unlock(&pfd->mtx);
}
int
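
Event ports differ from epoll and kqueue in that an association is consumed by delivery: once port_getn() returns an fd event, the descriptor is dissociated and must be re-associated to be heard from again, which is why nni_posix_pfd_arm() calls port_associate() every time. A minimal illumos/Solaris sketch; watch and loop are illustrative names:

#include <poll.h>
#include <port.h>
#include <stdint.h>
#include <stdio.h>

static int
watch(int port, int fd, void *user)
{
	// Must be repeated after every delivery; POLLIN is illustrative.
	return (port_associate(
	    port, PORT_SOURCE_FD, (uintptr_t) fd, POLLIN, user));
}

static void
loop(int port)
{
	port_event_t ev[8];
	unsigned     n = 1; // block until at least one event arrives

	if (port_getn(port, ev, 8, &n, NULL) != 0) {
		return;
	}
	for (unsigned i = 0; i < n; i++) {
		if (ev[i].portev_source == PORT_SOURCE_FD) {
			printf("fd event 0x%x\n",
			    (unsigned) ev[i].portev_events);
			// fd is dissociated now; call watch() to re-arm.
		}
	}
}
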
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index d86a2008..96d8debd 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -37,50 +37,51 @@
#define NNG_POSIX_RESOLV_CONCURRENCY 4
#endif
-static nni_taskq *nni_posix_resolv_tq = NULL;
-static nni_mtx nni_posix_resolv_mtx;
-
-typedef struct nni_posix_resolv_item nni_posix_resolv_item;
-struct nni_posix_resolv_item {
- int family;
- int passive;
- const char *name;
- const char *serv;
- int proto;
- nni_aio * aio;
- nni_task task;
+static nni_taskq *resolv_tq = NULL;
+static nni_mtx resolv_mtx;
+
+typedef struct resolv_item resolv_item;
+struct resolv_item {
+ int family;
+ int passive;
+ const char * name;
+ const char * serv;
+ int proto;
+ nni_aio * aio;
+ nni_task * task;
+ nng_sockaddr sa;
};
static void
-nni_posix_resolv_finish(nni_posix_resolv_item *item, int rv)
+resolv_finish(resolv_item *item, int rv)
{
nni_aio *aio;
- if ((aio = item->aio) != NULL) {
- if (nni_aio_get_prov_data(aio) == item) {
- nni_aio_set_prov_data(aio, NULL);
- item->aio = NULL;
- nni_aio_finish(aio, rv, 0);
- NNI_FREE_STRUCT(item);
- }
+ if (((aio = item->aio) != NULL) &&
+ (nni_aio_get_prov_data(aio) == item)) {
+ nng_sockaddr *sa = nni_aio_get_input(aio, 0);
+ nni_aio_set_prov_data(aio, NULL);
+ item->aio = NULL;
+ memcpy(sa, &item->sa, sizeof(*sa));
+ nni_aio_finish(aio, rv, 0);
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
}
}
static void
-nni_posix_resolv_cancel(nni_aio *aio, int rv)
+resolv_cancel(nni_aio *aio, int rv)
{
- nni_posix_resolv_item *item;
+ resolv_item *item;
- nni_mtx_lock(&nni_posix_resolv_mtx);
+ nni_mtx_lock(&resolv_mtx);
if ((item = nni_aio_get_prov_data(aio)) == NULL) {
- nni_mtx_unlock(&nni_posix_resolv_mtx);
+ nni_mtx_unlock(&resolv_mtx);
return;
}
nni_aio_set_prov_data(aio, NULL);
item->aio = NULL;
- nni_mtx_unlock(&nni_posix_resolv_mtx);
- nni_task_cancel(&item->task);
- NNI_FREE_STRUCT(item);
+ nni_mtx_unlock(&resolv_mtx);
nni_aio_finish_error(aio, rv);
}
@@ -116,14 +117,23 @@ nni_posix_gai_errno(int rv)
}
static void
-nni_posix_resolv_task(void *arg)
+resolv_task(void *arg)
{
- nni_posix_resolv_item *item = arg;
- nni_aio * aio = item->aio;
- struct addrinfo hints;
- struct addrinfo * results;
- struct addrinfo * probe;
- int rv;
+ resolv_item * item = arg;
+ struct addrinfo hints;
+ struct addrinfo *results;
+ struct addrinfo *probe;
+ int rv;
+
+ nni_mtx_lock(&resolv_mtx);
+ if (item->aio == NULL) {
+ nni_mtx_unlock(&resolv_mtx);
+ // Caller canceled, and no longer cares about this.
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
+ return;
+ }
+ nni_mtx_unlock(&resolv_mtx);
results = NULL;
@@ -170,7 +180,7 @@ nni_posix_resolv_task(void *arg)
if (probe != NULL) {
struct sockaddr_in * sin;
struct sockaddr_in6 *sin6;
- nng_sockaddr * sa = nni_aio_get_input(aio, 0);
+ nng_sockaddr * sa = &item->sa;
switch (probe->ai_addr->sa_family) {
case AF_INET:
@@ -196,17 +206,18 @@ done:
freeaddrinfo(results);
}
- nni_mtx_lock(&nni_posix_resolv_mtx);
- nni_posix_resolv_finish(item, rv);
- nni_mtx_unlock(&nni_posix_resolv_mtx);
+ nni_mtx_lock(&resolv_mtx);
+ resolv_finish(item, rv);
+ nni_mtx_unlock(&resolv_mtx);
}
static void
-nni_posix_resolv_ip(const char *host, const char *serv, int passive,
- int family, int proto, nni_aio *aio)
+resolv_ip(const char *host, const char *serv, int passive, int family,
+ int proto, nni_aio *aio)
{
- nni_posix_resolv_item *item;
- sa_family_t fam;
+ resolv_item *item;
+ sa_family_t fam;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -231,10 +242,15 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive,
return;
}
- nni_task_init(
- nni_posix_resolv_tq, &item->task, nni_posix_resolv_task, item);
+ if ((rv = nni_task_init(&item->task, resolv_tq, resolv_task, item)) !=
+ 0) {
+ NNI_FREE_STRUCT(item);
+ nni_aio_finish_error(aio, rv);
+ return;
+	}
// NB: host and serv must remain valid until this is completed.
+ memset(&item->sa, 0, sizeof(item->sa));
item->passive = passive;
item->name = host;
item->serv = serv;
@@ -242,24 +258,30 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive,
item->aio = aio;
item->family = fam;
- nni_mtx_lock(&nni_posix_resolv_mtx);
- nni_aio_schedule(aio, nni_posix_resolv_cancel, item);
- nni_task_dispatch(&item->task);
- nni_mtx_unlock(&nni_posix_resolv_mtx);
+ nni_mtx_lock(&resolv_mtx);
+ if ((rv = nni_aio_schedule(aio, resolv_cancel, item)) != 0) {
+ nni_mtx_unlock(&resolv_mtx);
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_task_dispatch(item->task);
+ nni_mtx_unlock(&resolv_mtx);
}
void
nni_plat_tcp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
- nni_posix_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
+ resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
}
void
nni_plat_udp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
- nni_posix_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
+ resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
}
int
@@ -267,10 +289,10 @@ nni_posix_resolv_sysinit(void)
{
int rv;
- nni_mtx_init(&nni_posix_resolv_mtx);
+ nni_mtx_init(&resolv_mtx);
- if ((rv = nni_taskq_init(&nni_posix_resolv_tq, 4)) != 0) {
- nni_mtx_fini(&nni_posix_resolv_mtx);
+ if ((rv = nni_taskq_init(&resolv_tq, 4)) != 0) {
+ nni_mtx_fini(&resolv_mtx);
return (rv);
}
return (0);
@@ -279,11 +301,11 @@ nni_posix_resolv_sysinit(void)
void
nni_posix_resolv_sysfini(void)
{
- if (nni_posix_resolv_tq != NULL) {
- nni_taskq_fini(nni_posix_resolv_tq);
- nni_posix_resolv_tq = NULL;
+ if (resolv_tq != NULL) {
+ nni_taskq_fini(resolv_tq);
+ resolv_tq = NULL;
}
- nni_mtx_fini(&nni_posix_resolv_mtx);
+ nni_mtx_fini(&resolv_mtx);
}
#endif // NNG_USE_POSIX_RESOLV_GAI
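Both resolvers in this change share the same cancellation protocol: the cancel routine only detaches the aio from the item under the mutex, and the task, when it eventually runs, checks for that detachment and takes over the cleanup. A condensed sketch of the protocol using generic stand-ins (item_t and the raw pthread mutex are illustrative; the real code uses nni_mtx and nni_task):

#include <pthread.h>
#include <stdlib.h>

/* Illustrative stand-in for resolv_item. */
typedef struct {
	void *aio; /* non-NULL while a caller still waits */
} item_t;

static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

/* Cancel: detach the aio under the lock, nothing more.  The
 * item is left alive, because the task still references it. */
static void
cancel(item_t *item)
{
	pthread_mutex_lock(&mtx);
	item->aio = NULL;
	pthread_mutex_unlock(&mtx);
	/* complete the waiter with a cancellation error here */
}

/* Task: if the aio was detached before we ran, cleanup is ours;
 * otherwise do the blocking work and finish under the lock. */
static void
task(item_t *item)
{
	pthread_mutex_lock(&mtx);
	if (item->aio == NULL) { /* canceled; nobody is waiting */
		pthread_mutex_unlock(&mtx);
		free(item);
		return;
	}
	pthread_mutex_unlock(&mtx);
	/* ... getaddrinfo() work, then finish under the lock ... */
}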
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index cca8165c..0a521513 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -422,6 +422,12 @@ nni_plat_thr_fini(nni_plat_thr *thr)
}
}
+bool
+nni_plat_thr_is_self(nni_plat_thr *thr)
+{
+	return (pthread_equal(pthread_self(), thr->tid) != 0);
+}
+
void
nni_atfork_child(void)
{
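The new nni_plat_thr_is_self predicate (added for both POSIX and, below, Windows) presumably lets teardown paths detect that they are executing on the very thread they would otherwise wait for. A self-contained pthread sketch of that use, with a hypothetical worker type that is not the nng API:

#include <pthread.h>
#include <stdbool.h>

/* Hypothetical worker wrapper; illustration only. */
typedef struct {
	pthread_t tid;
} worker;

static bool
worker_is_self(worker *w)
{
	/* pthread_t is opaque; compare with pthread_equal(). */
	return (pthread_equal(pthread_self(), w->tid) != 0);
}

static void
worker_reap(worker *w)
{
	if (worker_is_self(w)) {
		return; /* joining ourselves would deadlock; defer */
	}
	(void) pthread_join(w->tid, NULL);
}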
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index 654d31e3..759bdb96 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -36,24 +36,29 @@
#endif
struct nni_plat_udp {
- nni_posix_pollq_node udp_pitem;
- int udp_fd;
- nni_list udp_recvq;
- nni_list udp_sendq;
- nni_mtx udp_mtx;
+ nni_posix_pfd *udp_pfd;
+ int udp_fd;
+ nni_list udp_recvq;
+ nni_list udp_sendq;
+ nni_mtx udp_mtx;
};
static void
-nni_posix_udp_doclose(nni_plat_udp *udp)
+nni_posix_udp_doerror(nni_plat_udp *udp, int rv)
{
nni_aio *aio;
while (((aio = nni_list_first(&udp->udp_recvq)) != NULL) ||
((aio = nni_list_first(&udp->udp_sendq)) != NULL)) {
nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_aio_finish_error(aio, rv);
}
- // Underlying socket left open until close API called.
+}
+
+static void
+nni_posix_udp_doclose(nni_plat_udp *udp)
+{
+ nni_posix_udp_doerror(udp, NNG_ECLOSED);
}
static void
@@ -169,23 +174,22 @@ nni_posix_udp_dosend(nni_plat_udp *udp)
// This function is called by the poller on activity on the FD.
static void
-nni_posix_udp_cb(void *arg)
+nni_posix_udp_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_plat_udp *udp = arg;
- int revents;
+ NNI_ASSERT(pfd == udp->udp_pfd);
nni_mtx_lock(&udp->udp_mtx);
- revents = udp->udp_pitem.revents;
- if (revents & POLLIN) {
+ if (events & POLLIN) {
nni_posix_udp_dorecv(udp);
}
- if (revents & POLLOUT) {
+ if (events & POLLOUT) {
nni_posix_udp_dosend(udp);
}
- if (revents & (POLLHUP | POLLERR | POLLNVAL)) {
+ if (events & (POLLHUP | POLLERR | POLLNVAL)) {
nni_posix_udp_doclose(udp);
} else {
- int events = 0;
+ events = 0;
if (!nni_list_empty(&udp->udp_sendq)) {
events |= POLLOUT;
}
@@ -193,7 +197,11 @@ nni_posix_udp_cb(void *arg)
events |= POLLIN;
}
if (events) {
- nni_posix_pollq_arm(&udp->udp_pitem, events);
+ int rv;
+ rv = nni_posix_pfd_arm(udp->udp_pfd, events);
+ if (rv != 0) {
+ nni_posix_udp_doerror(udp, rv);
+ }
}
}
nni_mtx_unlock(&udp->udp_mtx);
@@ -232,22 +240,16 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
NNI_FREE_STRUCT(udp);
return (rv);
}
- udp->udp_pitem.fd = udp->udp_fd;
- udp->udp_pitem.cb = nni_posix_udp_cb;
- udp->udp_pitem.data = udp;
-
- (void) fcntl(udp->udp_fd, F_SETFL, O_NONBLOCK);
-
- nni_aio_list_init(&udp->udp_recvq);
- nni_aio_list_init(&udp->udp_sendq);
-
- if (((rv = nni_posix_pollq_init(&udp->udp_pitem)) != 0) ||
- ((rv = nni_posix_pollq_add(&udp->udp_pitem)) != 0)) {
+ if ((rv = nni_posix_pfd_init(&udp->udp_pfd, udp->udp_fd)) != 0) {
(void) close(udp->udp_fd);
nni_mtx_fini(&udp->udp_mtx);
NNI_FREE_STRUCT(udp);
return (rv);
}
+ nni_posix_pfd_set_cb(udp->udp_pfd, nni_posix_udp_cb, udp);
+
+ nni_aio_list_init(&udp->udp_recvq);
+ nni_aio_list_init(&udp->udp_sendq);
*upp = udp;
return (0);
@@ -256,8 +258,7 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
void
nni_plat_udp_close(nni_plat_udp *udp)
{
- // We're no longer interested in events.
- nni_posix_pollq_fini(&udp->udp_pitem);
+ nni_posix_pfd_fini(udp->udp_pfd);
nni_mtx_lock(&udp->udp_mtx);
nni_posix_udp_doclose(udp);
@@ -284,26 +285,46 @@ nni_plat_udp_cancel(nni_aio *aio, int rv)
void
nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio)
{
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&udp->udp_mtx);
- nni_aio_schedule(aio, nni_plat_udp_cancel, udp);
+ if ((rv = nni_aio_schedule(aio, nni_plat_udp_cancel, udp)) != 0) {
+ nni_mtx_unlock(&udp->udp_mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&udp->udp_recvq, aio);
- nni_posix_pollq_arm(&udp->udp_pitem, POLLIN);
+ if (nni_list_first(&udp->udp_recvq) == aio) {
+ if ((rv = nni_posix_pfd_arm(udp->udp_pfd, POLLIN)) != 0) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ }
nni_mtx_unlock(&udp->udp_mtx);
}
void
nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio)
{
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&udp->udp_mtx);
- nni_aio_schedule(aio, nni_plat_udp_cancel, udp);
+ if ((rv = nni_aio_schedule(aio, nni_plat_udp_cancel, udp)) != 0) {
+ nni_mtx_unlock(&udp->udp_mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&udp->udp_sendq, aio);
- nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT);
+ if (nni_list_first(&udp->udp_sendq) == aio) {
+ if ((rv = nni_posix_pfd_arm(udp->udp_pfd, POLLOUT)) != 0) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ }
nni_mtx_unlock(&udp->udp_mtx);
}
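Note the pattern in nni_plat_udp_recv and nni_plat_udp_send above: the poller is only armed when the freshly queued aio sits at the head of the queue, i.e. when the queue was previously empty; otherwise an earlier arm, or the completion callback (which re-arms while work remains), already covers it. A standalone sketch of that submit path, with illustrative types and a stubbed-out arm call (locking omitted for brevity):

#include <stddef.h>

/* Illustrative stand-ins; not the nng types. */
typedef struct req {
	struct req *next;
} req;

typedef struct {
	req *head, *tail;
} queue;

/* Stub: pretend to arm a oneshot poller; 0 on success. */
static int
poller_arm(int events)
{
	(void) events;
	return (0);
}

/* Append, and only arm the poller when the new request is the
 * head of the queue (the queue was empty before us). */
static int
submit(queue *q, req *r, int events)
{
	r->next = NULL;
	if (q->tail != NULL) {
		q->tail->next = r;
	} else {
		q->head = r;
	}
	q->tail = r;

	if (q->head == r) {
		int rv = poller_arm(events);
		if (rv != 0) {
			q->head = q->tail = NULL; /* undo (sole element) */
			return (rv);
		}
	}
	return (0);
}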
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index b3c4738f..0bd12b24 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -34,6 +34,7 @@ struct nni_plat_thr {
void (*func)(void *);
void * arg;
HANDLE handle;
+ DWORD id;
};
struct nni_plat_mtx {
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c
index a3ae3748..5ead9cbc 100644
--- a/src/platform/windows/win_iocp.c
+++ b/src/platform/windows/win_iocp.c
@@ -155,11 +155,16 @@ nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio)
void
nni_win_event_submit(nni_win_event *evt, nni_aio *aio)
{
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&evt->mtx);
- nni_aio_schedule(aio, nni_win_event_cancel, evt);
+ if ((rv = nni_aio_schedule(aio, nni_win_event_cancel, evt)) != 0) {
+ nni_mtx_unlock(&evt->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&evt->aios, aio);
nni_win_event_start(evt);
nni_mtx_unlock(&evt->mtx);
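This hunk shows the submit-time template that the rest of this change applies at every call site: nni_aio_begin first, then nni_aio_schedule under the object's lock, and on a schedule failure drop the lock and complete the aio with the returned error. Condensed into one place (obj_t and cancel_fn are stand-ins for the specific endpoint, pipe, or event at each site):

/* Template of the submit pattern used across this change. */
static void
submit_op(obj_t *obj, nni_aio *aio)
{
	int rv;

	if (nni_aio_begin(aio) != 0) {
		return; /* aio is shutting down; do nothing */
	}
	nni_mtx_lock(&obj->mtx);
	if ((rv = nni_aio_schedule(aio, cancel_fn, obj)) != 0) {
		nni_mtx_unlock(&obj->mtx);
		nni_aio_finish_error(aio, rv);
		return;
	}
	/* ... enqueue the aio and start the operation ... */
	nni_mtx_unlock(&obj->mtx);
}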
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index c6376cc7..169a2e00 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -572,17 +572,22 @@ void
nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
{
nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&w->mtx);
+ if ((rv = nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep)) != 0) {
+ nni_mtx_unlock(&w->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
NNI_ASSERT(!nni_list_active(&w->waiters, ep));
ep->con_aio = aio;
nni_list_append(&w->waiters, ep);
- nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep);
nni_cv_wake(&w->cv);
nni_mtx_unlock(&w->mtx);
}
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c
index 070bf40d..2bc68c32 100644
--- a/src/platform/windows/win_resolv.c
+++ b/src/platform/windows/win_resolv.c
@@ -29,49 +29,55 @@
#define NNG_WIN_RESOLV_CONCURRENCY 4
#endif
-static nni_taskq *nni_win_resolv_tq = NULL;
-static nni_mtx nni_win_resolv_mtx;
-
-typedef struct nni_win_resolv_item nni_win_resolv_item;
-struct nni_win_resolv_item {
- int family;
- int passive;
- const char *name;
- const char *serv;
- int proto;
- nni_aio * aio;
- nni_task task;
+static nni_taskq *win_resolv_tq = NULL;
+static nni_mtx win_resolv_mtx;
+
+typedef struct win_resolv_item win_resolv_item;
+struct win_resolv_item {
+ int family;
+ int passive;
+ const char * name;
+ const char * serv;
+ int proto;
+ nni_aio * aio;
+ nni_task * task;
+ nng_sockaddr sa;
};
static void
-nni_win_resolv_finish(nni_win_resolv_item *item, int rv)
+win_resolv_finish(win_resolv_item *item, int rv)
{
- nni_aio *aio = item->aio;
-
- nni_aio_set_prov_data(aio, NULL);
- nni_aio_finish(aio, rv, 0);
- NNI_FREE_STRUCT(item);
+ nni_aio *aio;
+
+ if (((aio = item->aio) != NULL) &&
+ (nni_aio_get_prov_data(aio) == item)) {
+ nni_sockaddr *sa = nni_aio_get_input(aio, 0);
+ nni_aio_set_prov_data(aio, NULL);
+ memcpy(sa, &item->sa, sizeof(*sa));
+ nni_aio_finish(aio, rv, 0);
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
+ }
}
static void
-nni_win_resolv_cancel(nni_aio *aio, int rv)
+win_resolv_cancel(nni_aio *aio, int rv)
{
- nni_win_resolv_item *item;
+ win_resolv_item *item;
- nni_mtx_lock(&nni_win_resolv_mtx);
+ nni_mtx_lock(&win_resolv_mtx);
if ((item = nni_aio_get_prov_data(aio)) == NULL) {
- nni_mtx_unlock(&nni_win_resolv_mtx);
+ nni_mtx_unlock(&win_resolv_mtx);
return;
}
nni_aio_set_prov_data(aio, NULL);
- nni_mtx_unlock(&nni_win_resolv_mtx);
- nni_task_cancel(&item->task);
- NNI_FREE_STRUCT(item);
+ item->aio = NULL;
+ nni_mtx_unlock(&win_resolv_mtx);
nni_aio_finish_error(aio, rv);
}
static int
-nni_win_gai_errno(int rv)
+win_gai_errno(int rv)
{
switch (rv) {
case 0:
@@ -98,17 +104,26 @@ nni_win_gai_errno(int rv)
}
static void
-nni_win_resolv_task(void *arg)
+win_resolv_task(void *arg)
{
- nni_win_resolv_item *item = arg;
- nni_aio * aio = item->aio;
- struct addrinfo hints;
- struct addrinfo * results;
- struct addrinfo * probe;
- int rv;
+ win_resolv_item *item = arg;
+ struct addrinfo hints;
+ struct addrinfo *results;
+ struct addrinfo *probe;
+ int rv;
results = NULL;
+ nni_mtx_lock(&win_resolv_mtx);
+ if (item->aio == NULL) {
+ nni_mtx_unlock(&win_resolv_mtx);
+ // Caller canceled, and no longer cares about this.
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
+ return;
+ }
+ nni_mtx_unlock(&win_resolv_mtx);
+
// We treat these all as IP addresses. The service and the
// host part are split.
memset(&hints, 0, sizeof(hints));
@@ -124,7 +139,7 @@ nni_win_resolv_task(void *arg)
rv = getaddrinfo(item->name, item->serv, &hints, &results);
if (rv != 0) {
- rv = nni_win_gai_errno(rv);
+ rv = win_gai_errno(rv);
goto done;
}
@@ -142,7 +157,7 @@ nni_win_resolv_task(void *arg)
if (probe != NULL) {
struct sockaddr_in * sin;
struct sockaddr_in6 *sin6;
- nni_sockaddr * sa = nni_aio_get_input(aio, 0);
+ nni_sockaddr * sa = &item->sa;
switch (probe->ai_addr->sa_family) {
case AF_INET:
@@ -167,17 +182,18 @@ done:
if (results != NULL) {
freeaddrinfo(results);
}
- nni_mtx_lock(&nni_win_resolv_mtx);
- nni_win_resolv_finish(item, rv);
- nni_mtx_unlock(&nni_win_resolv_mtx);
+ nni_mtx_lock(&win_resolv_mtx);
+ win_resolv_finish(item, rv);
+ nni_mtx_unlock(&win_resolv_mtx);
}
static void
-nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
+win_resolv_ip(const char *host, const char *serv, int passive, int family,
int proto, nni_aio *aio)
{
- nni_win_resolv_item *item;
- int fam;
+ win_resolv_item *item;
+ int fam;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -202,8 +218,12 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
return;
}
- nni_task_init(
- nni_win_resolv_tq, &item->task, nni_win_resolv_task, item);
+ rv = nni_task_init(&item->task, win_resolv_tq, win_resolv_task, item);
+ if (rv != 0) {
+ NNI_FREE_STRUCT(item);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
item->passive = passive;
item->name = host;
@@ -212,24 +232,30 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
item->aio = aio;
item->family = fam;
- nni_mtx_lock(&nni_win_resolv_mtx);
- nni_aio_schedule(aio, nni_win_resolv_cancel, item);
- nni_task_dispatch(&item->task);
- nni_mtx_unlock(&nni_win_resolv_mtx);
+ nni_mtx_lock(&win_resolv_mtx);
+ if ((rv = nni_aio_schedule(aio, win_resolv_cancel, item)) != 0) {
+ nni_mtx_unlock(&win_resolv_mtx);
+ nni_task_fini(item->task);
+ NNI_FREE_STRUCT(item);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_task_dispatch(item->task);
+ nni_mtx_unlock(&win_resolv_mtx);
}
void
nni_plat_tcp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
- nni_win_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
+ win_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio);
}
void
nni_plat_udp_resolv(
const char *host, const char *serv, int family, int passive, nni_aio *aio)
{
- nni_win_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
+ win_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
}
int
@@ -237,10 +263,10 @@ nni_win_resolv_sysinit(void)
{
int rv;
- nni_mtx_init(&nni_win_resolv_mtx);
+ nni_mtx_init(&win_resolv_mtx);
- if ((rv = nni_taskq_init(&nni_win_resolv_tq, 4)) != 0) {
- nni_mtx_fini(&nni_win_resolv_mtx);
+ if ((rv = nni_taskq_init(&win_resolv_tq, 4)) != 0) {
+ nni_mtx_fini(&win_resolv_mtx);
return (rv);
}
return (0);
@@ -249,11 +275,11 @@ nni_win_resolv_sysinit(void)
void
nni_win_resolv_sysfini(void)
{
- if (nni_win_resolv_tq != NULL) {
- nni_taskq_fini(nni_win_resolv_tq);
- nni_win_resolv_tq = NULL;
+ if (win_resolv_tq != NULL) {
+ nni_taskq_fini(win_resolv_tq);
+ win_resolv_tq = NULL;
}
- nni_mtx_fini(&nni_win_resolv_mtx);
+ nni_mtx_fini(&win_resolv_mtx);
}
#endif // NNG_PLATFORM_WINDOWS
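A second thread shared by both resolver rewrites: the task now stages its result in item->sa and copies it into the caller's buffer only at finish time, under the lock, after confirming that the aio is still attached. That way a task racing with cancellation never writes through the aio's (possibly already recycled) input pointer. The finish step, as it appears in both resolvers:

/* Finish: publish the staged result only if the aio is still
 * attached to this item (i.e. it was not canceled meanwhile). */
if (((aio = item->aio) != NULL) &&
    (nni_aio_get_prov_data(aio) == item)) {
	nng_sockaddr *sa = nni_aio_get_input(aio, 0);
	nni_aio_set_prov_data(aio, NULL);
	memcpy(sa, &item->sa, sizeof(*sa));
	nni_aio_finish(aio, rv, 0);
	nni_task_fini(item->task);
	NNI_FREE_STRUCT(item);
}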
diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c
index a2ae72fe..2e9d58d7 100644
--- a/src/platform/windows/win_thread.c
+++ b/src/platform/windows/win_thread.c
@@ -107,6 +107,7 @@ static unsigned int __stdcall nni_plat_thr_main(void *arg)
{
nni_plat_thr *thr = arg;
+ thr->id = GetCurrentThreadId();
thr->func(thr->arg);
return (0);
}
@@ -138,6 +139,12 @@ nni_plat_thr_fini(nni_plat_thr *thr)
}
}
+bool
+nni_plat_thr_is_self(nni_plat_thr *thr)
+{
+ return (GetCurrentThreadId() == thr->id);
+}
+
static LONG plat_inited = 0;
int
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index 019727be..965cbea7 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -216,8 +216,7 @@ rep0_ctx_send(void *arg, nni_aio *aio)
return;
}
- rv = nni_aio_schedule_verify(aio, rep0_ctx_cancel_send, ctx);
- if (rv != 0) {
+ if ((rv = nni_aio_schedule(aio, rep0_ctx_cancel_send, ctx)) != 0) {
nni_mtx_unlock(&s->lk);
nni_aio_finish_error(aio, rv);
return;
@@ -354,6 +353,9 @@ rep0_pipe_stop(void *arg)
rep0_sock *s = p->rep;
rep0_ctx * ctx;
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+
nni_mtx_lock(&s->lk);
if (nni_list_active(&s->recvpipes, p)) {
// We are no longer "receivable".
@@ -379,9 +381,6 @@ rep0_pipe_stop(void *arg)
}
nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
nni_mtx_unlock(&s->lk);
-
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
}
static void
@@ -459,8 +458,7 @@ rep0_ctx_recv(void *arg, nni_aio *aio)
nni_mtx_lock(&s->lk);
if ((p = nni_list_first(&s->recvpipes)) == NULL) {
int rv;
- rv = nni_aio_schedule_verify(aio, rep0_cancel_recv, ctx);
- if (rv != 0) {
+ if ((rv = nni_aio_schedule(aio, rep0_cancel_recv, ctx)) != 0) {
nni_mtx_unlock(&s->lk);
nni_aio_finish_error(aio, rv);
return;
@@ -516,10 +514,10 @@ rep0_pipe_recv_cb(void *arg)
bool end = false;
if (hops > s->ttl) {
- // This isn't malformed, but it has gone through
- // too many hops. Do not disconnect, because we
- // can legitimately receive messages with too many
- // hops from devices, etc.
+ // This isn't malformed, but it has gone
+ // through too many hops. Do not disconnect,
+ // because we can legitimately receive messages
+ // with too many hops from devices, etc.
goto drop;
}
hops++;
@@ -566,9 +564,9 @@ rep0_pipe_recv_cb(void *arg)
nni_msg_header_clear(msg);
ctx->pipe_id = p->id;
- // If we got a request on a pipe that wasn't busy, we should mark
- // it sendable. (The sendable flag is not set when there is no
- // request needing a reply.)
+ // If we got a request on a pipe that wasn't busy, we should
+ // mark it sendable. (The sendable flag is not set when there
+ // is no request needing a reply.)
if ((ctx == s->ctx) && (!p->busy)) {
nni_pollable_raise(s->sendable);
}
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 3ecc8604..bbb0b886 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -638,7 +638,7 @@ req0_ctx_recv(void *arg, nni_aio *aio)
if ((msg = ctx->repmsg) == NULL) {
int rv;
- rv = nni_aio_schedule_verify(aio, req0_ctx_cancel_recv, ctx);
+ rv = nni_aio_schedule(aio, req0_ctx_cancel_recv, ctx);
if (rv != 0) {
nni_mtx_unlock(&s->mtx);
nni_aio_finish_error(aio, rv);
@@ -740,7 +740,7 @@ req0_ctx_send(void *arg, nni_aio *aio)
}
// If no pipes are ready, and the request was a poll (no background
// schedule), then fail it. Should be NNG_TIMEDOUT.
- rv = nni_aio_schedule_verify(aio, req0_ctx_cancel_send, ctx);
+ rv = nni_aio_schedule(aio, req0_ctx_cancel_send, ctx);
if ((rv != 0) && (nni_list_empty(&s->readypipes))) {
nni_idhash_remove(s->reqids, id);
nni_mtx_unlock(&s->mtx);
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c
index 60cf188a..e553f6ce 100644
--- a/src/protocol/survey0/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -210,8 +210,7 @@ resp0_ctx_send(void *arg, nni_aio *aio)
return;
}
- if ((rv = nni_aio_schedule_verify(aio, resp0_ctx_cancel_send, ctx)) !=
- 0) {
+ if ((rv = nni_aio_schedule(aio, resp0_ctx_cancel_send, ctx)) != 0) {
nni_mtx_unlock(&s->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -450,7 +449,7 @@ resp0_ctx_recv(void *arg, nni_aio *aio)
nni_mtx_lock(&s->mtx);
if ((p = nni_list_first(&s->recvpipes)) == NULL) {
int rv;
- rv = nni_aio_schedule_verify(aio, resp0_cancel_recv, ctx);
+ rv = nni_aio_schedule(aio, resp0_cancel_recv, ctx);
if (rv != 0) {
nni_mtx_unlock(&s->mtx);
nni_aio_finish_error(aio, rv);
diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c
index a2427009..aba72715 100644
--- a/src/supplemental/http/http_client.c
+++ b/src/supplemental/http/http_client.c
@@ -241,11 +241,16 @@ http_connect_cancel(nni_aio *aio, int rv)
void
nni_http_client_connect(nni_http_client *c, nni_aio *aio)
{
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&c->mtx);
- nni_aio_schedule(aio, http_connect_cancel, aio);
+ if ((rv = nni_aio_schedule(aio, http_connect_cancel, aio)) != 0) {
+ nni_mtx_unlock(&c->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&c->aios, aio);
if (nni_list_first(&c->aios) == aio) {
http_conn_start(c);
diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c
index 0de40e10..f3b16370 100644
--- a/src/supplemental/http/http_conn.c
+++ b/src/supplemental/http/http_conn.c
@@ -365,6 +365,8 @@ http_rd_cancel(nni_aio *aio, int rv)
static void
http_rd_submit(nni_http_conn *conn, nni_aio *aio)
{
+ int rv;
+
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -372,7 +374,10 @@ http_rd_submit(nni_http_conn *conn, nni_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
- nni_aio_schedule(aio, http_rd_cancel, conn);
+ if ((rv = nni_aio_schedule(aio, http_rd_cancel, conn)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&conn->rdq, aio);
if (conn->rd_uaio == NULL) {
http_rd_start(conn);
@@ -479,6 +484,8 @@ http_wr_cancel(nni_aio *aio, int rv)
static void
http_wr_submit(nni_http_conn *conn, nni_aio *aio)
{
+ int rv;
+
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -486,8 +493,12 @@ http_wr_submit(nni_http_conn *conn, nni_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
+ if ((rv = nni_aio_schedule(aio, http_wr_cancel, conn)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&conn->wrq, aio);
- nni_aio_schedule(aio, http_wr_cancel, conn);
+
if (conn->wr_uaio == NULL) {
http_wr_start(conn);
}
diff --git a/src/supplemental/tls/mbedtls/tls.c b/src/supplemental/tls/mbedtls/tls.c
index 5246e06c..f721d4aa 100644
--- a/src/supplemental/tls/mbedtls/tls.c
+++ b/src/supplemental/tls/mbedtls/tls.c
@@ -566,6 +566,8 @@ nni_tls_net_recv(void *ctx, unsigned char *buf, size_t len)
void
nni_tls_send(nni_tls *tp, nni_aio *aio)
{
+ int rv;
+
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -575,7 +577,11 @@ nni_tls_send(nni_tls *tp, nni_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
- nni_aio_schedule(aio, nni_tls_cancel, tp);
+ if ((rv = nni_aio_schedule(aio, nni_tls_cancel, tp)) != 0) {
+ nni_mtx_unlock(&tp->lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&tp->sends, aio);
nni_tls_do_send(tp);
nni_mtx_unlock(&tp->lk);
@@ -584,6 +590,8 @@ nni_tls_send(nni_tls *tp, nni_aio *aio)
void
nni_tls_recv(nni_tls *tp, nni_aio *aio)
{
+ int rv;
+
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -593,7 +601,12 @@ nni_tls_recv(nni_tls *tp, nni_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
- nni_aio_schedule(aio, nni_tls_cancel, tp);
+ if ((rv = nni_aio_schedule(aio, nni_tls_cancel, tp)) != 0) {
+ nni_mtx_unlock(&tp->lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
nni_list_append(&tp->recvs, aio);
nni_tls_do_recv(tp);
nni_mtx_unlock(&tp->lk);
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index e562b209..9a3f5519 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -667,8 +667,12 @@ ws_send_close(nni_ws *ws, uint16_t code)
return;
}
// Close frames get priority!
+ if ((rv = nni_aio_schedule(aio, ws_cancel_close, ws)) != 0) {
+ ws->wclose = false;
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_prepend(&ws->txmsgs, wm);
- nni_aio_schedule(aio, ws_cancel_close, ws);
ws_start_write(ws);
}
@@ -737,7 +741,12 @@ nni_ws_send_msg(nni_ws *ws, nni_aio *aio)
ws_msg_fini(wm);
return;
}
- nni_aio_schedule(aio, ws_write_cancel, ws);
+ if ((rv = nni_aio_schedule(aio, ws_write_cancel, ws)) != 0) {
+ nni_mtx_unlock(&ws->mtx);
+ nni_aio_finish_error(aio, rv);
+ ws_msg_fini(wm);
+ return;
+ }
nni_aio_set_prov_extra(aio, 0, wm);
nni_list_append(&ws->sendq, aio);
nni_list_append(&ws->txmsgs, wm);
@@ -1060,7 +1069,12 @@ nni_ws_recv_msg(nni_ws *ws, nni_aio *aio)
return;
}
nni_mtx_lock(&ws->mtx);
- nni_aio_schedule(aio, ws_read_cancel, ws);
+ if ((rv = nni_aio_schedule(aio, ws_read_cancel, ws)) != 0) {
+ nni_mtx_unlock(&ws->mtx);
+ ws_msg_fini(wm);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_set_prov_extra(aio, 0, wm);
nni_list_append(&ws->recvq, aio);
nni_list_append(&ws->rxmsgs, wm);
@@ -1647,6 +1661,7 @@ void
nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio)
{
nni_ws *ws;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -1669,7 +1684,11 @@ nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio)
nni_aio_finish(aio, 0, 0);
return;
}
- nni_aio_schedule(aio, ws_accept_cancel, l);
+ if ((rv = nni_aio_schedule(aio, ws_accept_cancel, l)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&l->mtx);
+ return;
+ }
nni_list_append(&l->aios, aio);
nni_mtx_unlock(&l->mtx);
}
@@ -1995,11 +2014,16 @@ nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio)
ws_fini(ws);
return;
}
+ if ((rv = nni_aio_schedule(aio, ws_dial_cancel, ws)) != 0) {
+ nni_mtx_unlock(&d->mtx);
+ nni_aio_finish_error(aio, rv);
+ ws_fini(ws);
+ return;
+ }
ws->dialer = d;
ws->useraio = aio;
ws->mode = NNI_EP_MODE_DIAL;
nni_list_append(&d->wspend, ws);
- nni_aio_schedule(aio, ws_dial_cancel, ws);
nni_http_client_connect(d->client, ws->connaio);
nni_mtx_unlock(&d->mtx);
}
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 8bfb097e..0f159d3a 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -349,6 +349,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
{
nni_inproc_ep *ep = arg;
nni_inproc_ep *server;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -375,7 +376,11 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
// We don't have to worry about the case where a zero timeout
// on connect was specified, as there is no option to specify
// that in the upper API.
- nni_aio_schedule(aio, nni_inproc_ep_cancel, ep);
+ if ((rv = nni_aio_schedule(aio, nni_inproc_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&nni_inproc.mx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&server->clients, ep);
nni_aio_list_append(&ep->aios, aio);
@@ -407,6 +412,7 @@ static void
nni_inproc_ep_accept(void *arg, nni_aio *aio)
{
nni_inproc_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -416,7 +422,11 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio)
// We need not worry about the case where a non-blocking
// accept was tried -- there is no API to do such a thing.
- nni_aio_schedule(aio, nni_inproc_ep_cancel, ep);
+ if ((rv = nni_aio_schedule(aio, nni_inproc_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&nni_inproc.mx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
// We are already on the master list of servers, thanks to bind.
// Insert us into pending server aios, and then run accept list.
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index c91606c6..2347e24c 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -418,12 +418,17 @@ static void
nni_ipc_pipe_send(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&pipe->mtx);
- nni_aio_schedule(aio, nni_ipc_cancel_tx, pipe);
+ if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_tx, pipe)) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&pipe->sendq, aio);
if (nni_list_first(&pipe->sendq) == aio) {
nni_ipc_pipe_dosend(pipe, aio);
@@ -474,14 +479,18 @@ static void
nni_ipc_pipe_recv(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&pipe->mtx);
- // Transports never have a zero length timeout.
- (void) nni_aio_schedule(aio, nni_ipc_cancel_rx, pipe);
+ if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_rx, pipe)) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&pipe->recvq, aio);
if (nni_list_first(&pipe->recvq) == aio) {
@@ -496,11 +505,17 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
nni_ipc_pipe *pipe = arg;
nni_aio * negaio;
nni_iov iov;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&pipe->mtx);
+ if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_start, pipe)) != 0) {
+ nni_mtx_unlock(&pipe->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
pipe->txhead[0] = 0;
pipe->txhead[1] = 'S';
pipe->txhead[2] = 'P';
@@ -517,7 +532,6 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &pipe->txhead[0];
nni_aio_set_iov(negaio, 1, &iov);
- nni_aio_schedule(aio, nni_ipc_cancel_start, pipe);
nni_plat_ipc_pipe_send(pipe->ipp, negaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -728,6 +742,7 @@ static void
nni_ipc_ep_accept(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -735,7 +750,11 @@ nni_ipc_ep_accept(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- nni_aio_schedule(aio, nni_ipc_cancel_ep, ep);
+ if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_ep, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ep->user_aio = aio;
nni_plat_ipc_ep_accept(ep->iep, ep->aio);
@@ -746,6 +765,7 @@ static void
nni_ipc_ep_connect(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -753,7 +773,11 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- nni_aio_schedule(aio, nni_ipc_cancel_ep, ep);
+ if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_ep, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ep->user_aio = aio;
nni_plat_ipc_ep_connect(ep->iep, ep->aio);
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 3d738a98..f2cdf8ac 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -409,12 +409,17 @@ static void
nni_tcp_pipe_send(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- nni_aio_schedule(aio, nni_tcp_cancel_tx, p);
+ if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_tx, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&p->sendq, aio);
if (nni_list_first(&p->sendq) == aio) {
nni_tcp_pipe_dosend(p, aio);
@@ -465,12 +470,18 @@ static void
nni_tcp_pipe_recv(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- nni_aio_schedule(aio, nni_tcp_cancel_rx, p);
+ if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_rx, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
nni_list_append(&p->recvq, aio);
if (nni_list_first(&p->recvq) == aio) {
nni_tcp_pipe_dorecv(p);
@@ -535,11 +546,17 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
nni_tcp_pipe *p = arg;
nni_aio * negaio;
nni_iov iov;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
+ if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_nego, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
p->txlen[0] = 0;
p->txlen[1] = 'S';
p->txlen[2] = 'P';
@@ -556,7 +573,6 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(negaio, 1, &iov);
- nni_aio_schedule(aio, nni_tcp_cancel_nego, p);
nni_plat_tcp_pipe_send(p->tpp, negaio);
nni_mtx_unlock(&p->mtx);
}
@@ -749,6 +765,7 @@ static void
nni_tcp_ep_accept(void *arg, nni_aio *aio)
{
nni_tcp_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -756,7 +773,11 @@ nni_tcp_ep_accept(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- nni_aio_schedule(aio, nni_tcp_cancel_ep, ep);
+ if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_ep, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ep->user_aio = aio;
nni_plat_tcp_ep_accept(ep->tep, ep->aio);
@@ -767,6 +788,7 @@ static void
nni_tcp_ep_connect(void *arg, nni_aio *aio)
{
nni_tcp_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -774,7 +796,11 @@ nni_tcp_ep_connect(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- nni_aio_schedule(aio, nni_tcp_cancel_ep, ep);
+ if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_ep, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ep->user_aio = aio;
nni_plat_tcp_ep_connect(ep->tep, ep->aio);
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index 78fdd622..c863a85e 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -416,12 +416,17 @@ static void
nni_tls_pipe_send(void *arg, nni_aio *aio)
{
nni_tls_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- nni_aio_schedule(aio, nni_tls_cancel_tx, p);
+ if ((rv = nni_aio_schedule(aio, nni_tls_cancel_tx, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&p->sendq, aio);
if (nni_list_first(&p->sendq) == aio) {
nni_tls_pipe_dosend(p, aio);
@@ -472,13 +477,18 @@ static void
nni_tls_pipe_recv(void *arg, nni_aio *aio)
{
nni_tls_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
+ if ((rv = nni_aio_schedule(aio, nni_tls_cancel_rx, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
- nni_aio_schedule(aio, nni_tls_cancel_rx, p);
nni_aio_list_append(&p->recvq, aio);
if (nni_list_first(&p->recvq) == aio) {
nni_tls_pipe_dorecv(p);
@@ -542,11 +552,17 @@ nni_tls_pipe_start(void *arg, nni_aio *aio)
nni_tls_pipe *p = arg;
nni_aio * negaio;
nni_iov iov;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
+ if ((rv = nni_aio_schedule(aio, nni_tls_cancel_nego, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
p->txlen[0] = 0;
p->txlen[1] = 'S';
p->txlen[2] = 'P';
@@ -563,7 +579,6 @@ nni_tls_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(negaio, 1, &iov);
- nni_aio_schedule(aio, nni_tls_cancel_nego, p);
nni_tls_send(p->tls, negaio);
nni_mtx_unlock(&p->mtx);
}
@@ -769,13 +784,18 @@ static void
nni_tls_ep_accept(void *arg, nni_aio *aio)
{
nni_tls_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- nni_aio_schedule(aio, nni_tls_cancel_ep, ep);
+ if ((rv = nni_aio_schedule(aio, nni_tls_cancel_ep, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ep->user_aio = aio;
nni_plat_tcp_ep_accept(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
@@ -785,13 +805,17 @@ static void
nni_tls_ep_connect(void *arg, nni_aio *aio)
{
nni_tls_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- nni_aio_schedule(aio, nni_tls_cancel_ep, ep);
+	if ((rv = nni_aio_schedule(aio, nni_tls_cancel_ep, ep)) != 0) {
+		nni_mtx_unlock(&ep->mtx);
+		nni_aio_finish_error(aio, rv);
+		return;
+	}
ep->user_aio = aio;
nni_plat_tcp_ep_connect(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
index 9d967668..12f3aeb5 100644
--- a/src/transport/ws/websocket.c
+++ b/src/transport/ws/websocket.c
@@ -128,12 +128,17 @@ static void
ws_pipe_recv(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- nni_aio_schedule(aio, ws_pipe_recv_cancel, p);
+ if ((rv = nni_aio_schedule(aio, ws_pipe_recv_cancel, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
p->user_rxaio = aio;
nni_ws_recv_msg(p->ws, p->rxaio);
nni_mtx_unlock(&p->mtx);
@@ -158,12 +163,17 @@ static void
ws_pipe_send(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
- nni_aio_schedule(aio, ws_pipe_send_cancel, p);
+ if ((rv = nni_aio_schedule(aio, ws_pipe_send_cancel, p)) != 0) {
+ nni_mtx_unlock(&p->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
p->user_txaio = aio;
nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio));
nni_aio_set_msg(aio, NULL);
@@ -289,6 +299,7 @@ static void
ws_ep_accept(void *arg, nni_aio *aio)
{
ws_ep *ep = arg;
+ int rv;
// We already bound, so we just need to look for an available
// pipe (created by the handler), and match it.
@@ -297,7 +308,11 @@ ws_ep_accept(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&ep->mtx);
- nni_aio_schedule(aio, ws_ep_cancel, ep);
+ if ((rv = nni_aio_schedule(aio, ws_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_list_append(&ep->aios, aio);
if (aio == nni_list_first(&ep->aios)) {
nni_ws_listener_accept(ep->listener, ep->accaio);
@@ -309,6 +324,7 @@ static void
ws_ep_connect(void *arg, nni_aio *aio)
{
ws_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -326,9 +342,12 @@ ws_ep_connect(void *arg, nni_aio *aio)
}
nni_mtx_lock(&ep->mtx);
+ if ((rv = nni_aio_schedule(aio, ws_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
NNI_ASSERT(nni_list_empty(&ep->aios));
-
- nni_aio_schedule(aio, ws_ep_cancel, ep);
ep->started = true;
nni_list_append(&ep->aios, aio);
nni_ws_dialer_dial(ep->dialer, ep->connaio);
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index 54e402be..f7139b94 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -1902,6 +1902,7 @@ static void
zt_pipe_recv(void *arg, nni_aio *aio)
{
zt_pipe *p = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -1912,7 +1913,11 @@ zt_pipe_recv(void *arg, nni_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
- nni_aio_schedule(aio, zt_pipe_cancel_recv, p);
+ if ((rv = nni_aio_schedule(aio, zt_pipe_cancel_recv, p)) != 0) {
+ nni_mtx_unlock(&zt_lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
p->zp_user_rxaio = aio;
zt_pipe_dorecv(p);
nni_mtx_unlock(&zt_lk);
@@ -2049,7 +2054,13 @@ zt_pipe_ping_cb(void *arg)
// going to send a ping. (We don't increment the try count
// unless we actually do send one though.)
if (nni_aio_begin(aio) == 0) {
- nni_aio_schedule(aio, zt_pipe_cancel_ping, p);
+ int rv;
+ rv = nni_aio_schedule(aio, zt_pipe_cancel_ping, p);
+ if (rv != 0) {
+ nni_mtx_unlock(&zt_lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
p->zp_ping_active = 1;
if (now > (p->zp_last_recv + p->zp_ping_time)) {
p->zp_ping_try++;
@@ -2081,10 +2092,15 @@ zt_pipe_start(void *arg, nni_aio *aio)
p->zp_ping_try = 0;
nni_aio_set_timeout(aio, p->zp_ping_time);
if (nni_aio_begin(p->zp_ping_aio) == 0) {
- nni_aio_schedule(
+ int rv;
+ rv = nni_aio_schedule(
p->zp_ping_aio, zt_pipe_cancel_ping, p);
- p->zp_ping_active = 1;
- zt_pipe_send_ping(p);
+ if (rv != 0) {
+ nni_aio_finish_error(p->zp_ping_aio, rv);
+ } else {
+ p->zp_ping_active = 1;
+ zt_pipe_send_ping(p);
+ }
}
}
nni_aio_finish(aio, 0, 0);
@@ -2405,12 +2421,17 @@ static void
zt_ep_accept(void *arg, nni_aio *aio)
{
zt_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&zt_lk);
- nni_aio_schedule(aio, zt_ep_cancel, ep);
+ if ((rv = nni_aio_schedule(aio, zt_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&zt_lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&ep->ze_aios, aio);
zt_ep_doaccept(ep);
nni_mtx_unlock(&zt_lk);
@@ -2486,10 +2507,14 @@ zt_ep_conn_req_cb(void *arg)
if (nni_list_first(&ep->ze_aios) != NULL) {
nni_aio_set_timeout(aio, ep->ze_conn_time);
if (nni_aio_begin(aio) == 0) {
- nni_aio_schedule(aio, zt_ep_conn_req_cancel, ep);
- ep->ze_creq_active = 1;
- ep->ze_creq_try++;
- zt_ep_send_conn_req(ep);
+ rv = nni_aio_schedule(aio, zt_ep_conn_req_cancel, ep);
+ if (rv != 0) {
+ nni_aio_finish_error(aio, rv);
+ } else {
+ ep->ze_creq_active = 1;
+ ep->ze_creq_try++;
+ zt_ep_send_conn_req(ep);
+ }
}
}
@@ -2523,18 +2548,27 @@ zt_ep_connect(void *arg, nni_aio *aio)
if ((ep->ze_raddr >> 24) == 0) {
ep->ze_raddr |= (ep->ze_ztn->zn_self << zt_port_shift);
}
+ if ((rv = nni_aio_schedule(aio, zt_ep_cancel, ep)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&zt_lk);
+ return;
+ }
nni_aio_list_append(&ep->ze_aios, aio);
ep->ze_running = 1;
- nni_aio_schedule(aio, zt_ep_cancel, ep);
nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time);
if (nni_aio_begin(ep->ze_creq_aio) == 0) {
- nni_aio_schedule(ep->ze_creq_aio, zt_ep_conn_req_cancel, ep);
- // Send out the first connect message; if not
- // yet attached to network message will be dropped.
- ep->ze_creq_try = 1;
- ep->ze_creq_active = 1;
- zt_ep_send_conn_req(ep);
+ rv = nni_aio_schedule(
+ ep->ze_creq_aio, zt_ep_conn_req_cancel, ep);
+ if (rv != 0) {
+ nni_aio_finish_error(ep->ze_creq_aio, rv);
+ } else {
+			// Send out the first connect message; if we
+			// are not yet attached to the network, the
+			// message will be dropped.
+ ep->ze_creq_try = 1;
+ ep->ze_creq_active = 1;
+ zt_ep_send_conn_req(ep);
+ }
}
nni_mtx_unlock(&zt_lk);
}
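For internally owned aios, as in the ZeroTier ping and connect-request callbacks above, the pattern differs slightly from user-submitted operations: nni_aio_begin may be refused during teardown (in which case nothing happens at all), and a schedule failure finishes the internal aio itself rather than reporting an error to any caller. In outline (internal_aio, cancel_fn, and obj are stand-ins):

/* Internal-aio variant of the schedule pattern (outline). */
if (nni_aio_begin(internal_aio) == 0) {
	int rv = nni_aio_schedule(internal_aio, cancel_fn, obj);
	if (rv != 0) {
		nni_aio_finish_error(internal_aio, rv);
	} else {
		/* mark active and send the ping / connect request */
	}
}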