diff options
40 files changed, 1995 insertions, 1736 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 96747bc3..2db46c60 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -104,24 +104,24 @@ if (NNG_PLATFORM_POSIX) platform/posix/posix_thread.c platform/posix/posix_udp.c ) -endif() -if (NNG_HAVE_PORT_CREATE) - set (NNG_SOURCES ${NNG_SOURCES} - platform/posix/posix_pollq_port.c - ) -elseif (NNG_HAVE_KQUEUE) - set (NNG_SOURCES ${NNG_SOURCES} - platform/posix/posix_pollq_kqueue.c - ) -elseif (NNG_HAVE_EPOLL) - set (NNG_SOURCES ${NNG_SOURCES} - platform/posix/posix_pollq_epoll.c - ) -else() - set (NNG_SOURCES ${NNG_SOURCES} - platform/posix/posix_pollq_poll.c - ) + if (NNG_HAVE_PORT_CREATE) + set (NNG_SOURCES ${NNG_SOURCES} + platform/posix/posix_pollq_port.c + ) + elseif (NNG_HAVE_KQUEUE) + set (NNG_SOURCES ${NNG_SOURCES} + platform/posix/posix_pollq_kqueue.c + ) + elseif (NNG_HAVE_EPOLL) + set (NNG_SOURCES ${NNG_SOURCES} + platform/posix/posix_pollq_epoll.c + ) + else() + set (NNG_SOURCES ${NNG_SOURCES} + platform/posix/posix_pollq_poll.c + ) + endif() endif() if (NNG_PLATFORM_WINDOWS) diff --git a/src/core/aio.c b/src/core/aio.c index 3606aa14..a5b6c088 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -17,6 +17,7 @@ static nni_cv nni_aio_expire_cv; static int nni_aio_expire_run; static nni_thr nni_aio_expire_thr; static nni_list nni_aio_expire_aios; +static nni_aio *nni_aio_expire_aio; // Design notes. // @@ -62,16 +63,11 @@ struct nng_aio { nni_duration a_timeout; // Relative timeout // These fields are private to the aio framework. - nni_cv a_cv; - bool a_fini : 1; // shutting down (no new operations) - bool a_done : 1; // operation has completed - bool a_pend : 1; // completion routine pending - bool a_active : 1; // aio was started - bool a_expiring : 1; // expiration callback in progress - bool a_waiting : 1; // a thread is waiting for this to finish - bool a_synch : 1; // run completion synchronously - bool a_sleep : 1; // sleeping with no action - nni_task a_task; + bool a_stop; // shutting down (no new operations) + bool a_sleep; // sleeping with no action + int a_sleeprv; // result when sleep wakes + int a_cancelrv; // if canceled between begin and schedule + nni_task *a_task; // Read/write operations. nni_iov *a_iov; @@ -109,12 +105,16 @@ int nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg) { nni_aio *aio; + int rv; if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) { return (NNG_ENOMEM); } memset(aio, 0, sizeof(*aio)); - nni_cv_init(&aio->a_cv, &nni_aio_lk); + if ((rv = nni_task_init(&aio->a_task, NULL, cb, arg)) != 0) { + NNI_FREE_STRUCT(aio); + return (rv); + } aio->a_expire = NNI_TIME_NEVER; aio->a_timeout = NNG_DURATION_INFINITE; aio->a_iov = aio->a_iovinl; @@ -122,7 +122,6 @@ nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg) if (arg == NULL) { arg = aio; } - nni_task_init(NULL, &aio->a_task, cb, arg); *aiop = aio; return (0); } @@ -133,9 +132,19 @@ nni_aio_fini(nni_aio *aio) if (aio != NULL) { nni_aio_stop(aio); - // At this point the AIO is done. - nni_cv_fini(&aio->a_cv); + // Wait for the aio to be "done"; this ensures that we don't + // destroy an aio from a "normal" completion callback while + // the expiration thread is working. + + nni_mtx_lock(&nni_aio_lk); + while (nni_aio_expire_aio == aio) { + nni_cv_wait(&nni_aio_expire_cv); + } + nni_mtx_unlock(&nni_aio_lk); + nni_task_fini(aio->a_task); + + // At this point the AIO is done. if (aio->a_niovalloc > 0) { NNI_FREE_STRUCTS(aio->a_iovalloc, aio->a_niovalloc); } @@ -186,7 +195,7 @@ nni_aio_stop(nni_aio *aio) { if (aio != NULL) { nni_mtx_lock(&nni_aio_lk); - aio->a_fini = true; + aio->a_stop = true; nni_mtx_unlock(&nni_aio_lk); nni_aio_abort(aio, NNG_ECANCELED); @@ -279,15 +288,7 @@ nni_aio_count(nni_aio *aio) void nni_aio_wait(nni_aio *aio) { - nni_mtx_lock(&nni_aio_lk); - // Wait until we're done, and the synchronous completion flag - // is cleared (meaning any synch completion is finished). - while ((aio->a_active) && ((!aio->a_done) || (aio->a_synch))) { - aio->a_waiting = true; - nni_cv_wait(&aio->a_cv); - } - nni_mtx_unlock(&nni_aio_lk); - nni_task_wait(&aio->a_task); + nni_task_wait(aio->a_task); } int @@ -295,35 +296,34 @@ nni_aio_begin(nni_aio *aio) { nni_mtx_lock(&nni_aio_lk); // We should not reschedule anything at this point. - if (aio->a_fini) { - aio->a_active = false; + if (aio->a_stop) { + nni_task_unprep(aio->a_task); aio->a_result = NNG_ECANCELED; nni_mtx_unlock(&nni_aio_lk); return (NNG_ECANCELED); } - aio->a_done = false; - aio->a_pend = false; aio->a_result = 0; aio->a_count = 0; aio->a_prov_cancel = NULL; aio->a_prov_data = NULL; - aio->a_active = true; + aio->a_cancelrv = 0; for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) { aio->a_outputs[i] = NULL; } + nni_task_prep(aio->a_task); nni_mtx_unlock(&nni_aio_lk); return (0); } -void +int nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) { + int rv; if (!aio->a_sleep) { // Convert the relative timeout to an absolute timeout. switch (aio->a_timeout) { case NNG_DURATION_ZERO: - aio->a_expire = nni_clock(); - break; + return (NNG_ETIMEDOUT); case NNG_DURATION_INFINITE: case NNG_DURATION_DEFAULT: aio->a_expire = NNI_TIME_NEVER; @@ -335,22 +335,26 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) } nni_mtx_lock(&nni_aio_lk); + if (aio->a_stop) { + nni_mtx_unlock(&nni_aio_lk); + return (NNG_ECANCELED); + } + if ((rv = aio->a_cancelrv) != 0) { + nni_mtx_unlock(&nni_aio_lk); + return (rv); + } + + // If cancellation occurred in between "begin" and "schedule", + // then cancel it right now. aio->a_prov_cancel = cancelfn; aio->a_prov_data = data; - if (aio->a_expire != NNI_TIME_NEVER) { + if ((rv = aio->a_cancelrv) != 0) { + aio->a_expire = 0; + nni_aio_expire_add(aio); + } else if (aio->a_expire != NNI_TIME_NEVER) { nni_aio_expire_add(aio); } nni_mtx_unlock(&nni_aio_lk); -} - -int -nni_aio_schedule_verify(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) -{ - - if ((!aio->a_sleep) && (aio->a_timeout == NNG_DURATION_ZERO)) { - return (NNG_ETIMEDOUT); - } - nni_aio_schedule(aio, cancelfn, data); return (0); } @@ -379,11 +383,8 @@ nni_aio_finish_impl( { nni_mtx_lock(&nni_aio_lk); - NNI_ASSERT(!aio->a_pend); // provider only calls us *once* - nni_list_node_remove(&aio->a_expire_node); - aio->a_pend = true; aio->a_result = rv; aio->a_count = count; aio->a_prov_cancel = NULL; @@ -393,38 +394,13 @@ nni_aio_finish_impl( aio->a_expire = NNI_TIME_NEVER; aio->a_sleep = false; - - // If we are expiring, then we rely on the expiration thread to - // complete this; we must not because the expiration thread is - // still holding the reference. - - if (aio->a_expiring) { - nni_mtx_unlock(&nni_aio_lk); - return; - } - - aio->a_done = true; - aio->a_synch = synch; + nni_mtx_unlock(&nni_aio_lk); if (synch) { - if (aio->a_task.task_cb != NULL) { - nni_mtx_unlock(&nni_aio_lk); - aio->a_task.task_cb(aio->a_task.task_arg); - nni_mtx_lock(&nni_aio_lk); - } + nni_task_exec(aio->a_task); } else { - nni_task_dispatch(&aio->a_task); + nni_task_dispatch(aio->a_task); } - aio->a_synch = false; - - if (aio->a_waiting) { - aio->a_waiting = false; - nni_cv_wake(&aio->a_cv); - } - - // This has to be done with the lock still held, in order - // to prevent taskq wait from returning prematurely. - nni_mtx_unlock(&nni_aio_lk); } void @@ -510,25 +486,27 @@ nni_aio_expire_add(nni_aio *aio) static void nni_aio_expire_loop(void *arg) { - nni_list * aios = &nni_aio_expire_aios; - nni_aio * aio; - nni_time now; - nni_aio_cancelfn cancelfn; - int rv; + nni_list *aios = &nni_aio_expire_aios; NNI_ARG_UNUSED(arg); for (;;) { + nni_aio_cancelfn cancelfn; + nni_time now; + nni_aio * aio; + int rv; + now = nni_clock(); nni_mtx_lock(&nni_aio_lk); - if (nni_aio_expire_run == 0) { - nni_mtx_unlock(&nni_aio_lk); - return; - } - if ((aio = nni_list_first(aios)) == NULL) { + + if (nni_aio_expire_run == 0) { + nni_mtx_unlock(&nni_aio_lk); + return; + } + nni_cv_wait(&nni_aio_expire_cv); nni_mtx_unlock(&nni_aio_lk); continue; @@ -544,38 +522,24 @@ nni_aio_expire_loop(void *arg) // This aio's time has come. Expire it, canceling any // outstanding I/O. nni_list_remove(aios, aio); + rv = aio->a_sleep ? aio->a_sleeprv : NNG_ETIMEDOUT; + + if ((cancelfn = aio->a_prov_cancel) != NULL) { + + // Place a temporary hold on the aio. This prevents it + // from being destroyed. + nni_aio_expire_aio = aio; - // Mark it as expiring. This acts as a hold on - // the aio, similar to the consumers. The actual taskq - // dispatch on completion won't occur until this is cleared, - // and the done flag won't be set either. - aio->a_expiring = true; - cancelfn = aio->a_prov_cancel; - rv = aio->a_sleep ? 0 : NNG_ETIMEDOUT; - aio->a_sleep = false; - - // Cancel any outstanding activity. This is always non-NULL - // for a valid aio, and becomes NULL only when an AIO is - // already being canceled or finished. - if (cancelfn != NULL) { + // We let the cancel function handle the completion. + // If there is no cancellation function, then we cannot + // terminate the aio - we've tried, but it has to run + // to it's natural conclusion. nni_mtx_unlock(&nni_aio_lk); cancelfn(aio, rv); nni_mtx_lock(&nni_aio_lk); - } else { - aio->a_pend = true; - aio->a_result = rv; - } - - NNI_ASSERT(aio->a_pend); // nni_aio_finish was run - NNI_ASSERT(aio->a_prov_cancel == NULL); - aio->a_expiring = false; - aio->a_done = true; - nni_task_dispatch(&aio->a_task); - - if (aio->a_waiting) { - aio->a_waiting = false; - nni_cv_wake(&aio->a_cv); + nni_aio_expire_aio = NULL; + nni_cv_wake(&nni_aio_expire_cv); } nni_mtx_unlock(&nni_aio_lk); } @@ -656,12 +620,31 @@ nni_aio_iov_advance(nni_aio *aio, size_t n) return (resid); // we might not have used all of n for this iov } +static void +nni_sleep_cancel(nng_aio *aio, int rv) +{ + nni_mtx_lock(&nni_aio_lk); + if (!aio->a_sleep) { + nni_mtx_unlock(&nni_aio_lk); + return; + } + + aio->a_sleep = false; + nni_list_node_remove(&aio->a_expire_node); + nni_mtx_unlock(&nni_aio_lk); + + nni_aio_finish_error(aio, rv); +} + void nni_sleep_aio(nng_duration ms, nng_aio *aio) { + int rv; if (nni_aio_begin(aio) != 0) { return; } + aio->a_sleeprv = 0; + aio->a_sleep = true; switch (aio->a_timeout) { case NNG_DURATION_DEFAULT: case NNG_DURATION_INFINITE: @@ -671,16 +654,15 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) // If the timeout on the aio is shorter than our sleep time, // then let it still wake up early, but with NNG_ETIMEDOUT. if (ms > aio->a_timeout) { - aio->a_sleep = false; - (void) nni_aio_schedule(aio, NULL, NULL); - return; + aio->a_sleeprv = NNG_ETIMEDOUT; + ms = aio->a_timeout; } } - aio->a_sleep = true; aio->a_expire = nni_clock() + ms; - // There is no cancellation, apart from just unexpiring. - nni_aio_schedule(aio, NULL, NULL); + if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) { + nni_aio_finish_error(aio, rv); + } } void diff --git a/src/core/aio.h b/src/core/aio.h index 9b7ac46f..2ed0fb5b 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -146,14 +146,13 @@ extern void nni_aio_bump_count(nni_aio *, size_t); // nni_aio_schedule indicates that the AIO has begun, and is scheduled for // asychronous completion. This also starts the expiration timer. Note that -// prior to this, the aio is uncancellable. -extern void nni_aio_schedule(nni_aio *, nni_aio_cancelfn, void *); - -// nni_aio_schedule_verify is like nni_aio_schedule, except that if the -// operation has been run with a zero time (NNG_FLAG_NONBLOCK), then it -// returns NNG_ETIMEDOUT. This is done to permit bypassing scheduling -// if the operation could not be immediately completed. -extern int nni_aio_schedule_verify(nni_aio *, nni_aio_cancelfn, void *); +// prior to this, the aio is uncancellable. If the operation has a zero +// timeout (NNG_FLAG_NONBLOCK) then NNG_ETIMEDOUT is returned. If the +// operation has already been canceled, or should not be run, then an error +// is returned. (In that case the caller should probably either return an +// error to its caller, or possibly cause an asynchronous error by calling +// nni_aio_finish_error on this aio.) +extern int nni_aio_schedule(nni_aio *, nni_aio_cancelfn, void *); extern void nni_sleep_aio(nni_duration, nni_aio *); diff --git a/src/core/device.c b/src/core/device.c index e3b1d220..1f3bf233 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -187,12 +187,17 @@ void nni_device_start(nni_device_data *dd, nni_aio *user) { int i; + int rv; if (nni_aio_begin(user) != 0) { return; } nni_mtx_lock(&dd->mtx); - nni_aio_schedule(user, nni_device_cancel, dd); + if ((rv = nni_aio_schedule(user, nni_device_cancel, dd)) != 0) { + nni_mtx_unlock(&dd->mtx); + nni_aio_finish_error(user, rv); + return; + } dd->user = user; for (i = 0; i < dd->npath; i++) { nni_device_path *p = &dd->paths[i]; diff --git a/src/core/endpt.c b/src/core/endpt.c index 2741a8e6..7593fb42 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -27,7 +27,7 @@ struct nni_ep { int ep_closed; // full shutdown int ep_closing; // close pending (waiting on refcnt) int ep_refcnt; - int ep_tmo_run; + bool ep_tmo_run; nni_mtx ep_mtx; nni_cv ep_cv; nni_list ep_pipes; @@ -303,7 +303,7 @@ nni_ep_tmo_cancel(nni_aio *aio, int rv) if (ep->ep_tmo_run) { nni_aio_finish_error(aio, rv); } - ep->ep_tmo_run = 0; + ep->ep_tmo_run = false; nni_mtx_unlock(&ep->ep_mtx); } } @@ -312,6 +312,7 @@ static void nni_ep_tmo_start(nni_ep *ep) { nni_duration backoff; + int rv; if (ep->ep_closing || (nni_aio_begin(ep->ep_tmo_aio) != 0)) { return; @@ -333,8 +334,12 @@ nni_ep_tmo_start(nni_ep *ep) nni_aio_set_timeout( ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0)); - ep->ep_tmo_run = 1; - nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep); + if ((rv = nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep)) != + 0) { + nni_aio_finish_error(ep->ep_tmo_aio, rv); + } + + ep->ep_tmo_run = true; } static void diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 2367b57f..fa94e32f 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -345,7 +345,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) // If this is an instantaneous poll operation, and the queue has // no room, nobody is waiting to receive, then report NNG_ETIMEDOUT. - rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq); + rv = nni_aio_schedule(aio, nni_msgq_cancel, mq); if ((rv != 0) && (mq->mq_len >= mq->mq_cap) && (nni_list_empty(&mq->mq_aio_getq))) { nni_mtx_unlock(&mq->mq_lock); @@ -373,7 +373,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) nni_aio_finish_error(aio, mq->mq_geterr); return; } - rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq); + rv = nni_aio_schedule(aio, nni_msgq_cancel, mq); if ((rv != 0) && (mq->mq_len == 0) && (nni_list_empty(&mq->mq_aio_putq))) { nni_mtx_unlock(&mq->mq_lock); diff --git a/src/core/platform.h b/src/core/platform.h index 6e7acdbf..bdc74349 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -136,11 +136,16 @@ extern int nni_plat_cv_until(nni_plat_cv *, nni_time); // immediately. extern int nni_plat_thr_init(nni_plat_thr *, void (*)(void *), void *); -// nni_thread_reap waits for the thread to exit, and then releases any +// nni_plat_thr_fini waits for the thread to exit, and then releases any // resources associated with the thread. After this returns, it // is an error to reference the thread in any further way. extern void nni_plat_thr_fini(nni_plat_thr *); +// nni_plat_thr_is_self returns true if the caller is the thread +// identified, and false otherwise. (This allows some deadlock +// prevention in callbacks, for example.) +extern bool nni_plat_thr_is_self(nni_plat_thr *); + // // Clock Support // diff --git a/src/core/taskq.c b/src/core/taskq.c index b0fe160b..526fa0b4 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -11,11 +11,22 @@ #include "core/nng_impl.h" typedef struct nni_taskq_thr nni_taskq_thr; +struct nni_task { + nni_list_node task_node; + void * task_arg; + nni_cb task_cb; + nni_taskq * task_tq; + bool task_sched; + bool task_run; + bool task_done; + bool task_exec; + bool task_fini; + nni_mtx task_mtx; + nni_cv task_cv; +}; struct nni_taskq_thr { nni_taskq *tqt_tq; nni_thr tqt_thread; - nni_task * tqt_running; - int tqt_wait; }; struct nni_taskq { nni_list tq_tasks; @@ -24,8 +35,7 @@ struct nni_taskq { nni_cv tq_wait_cv; nni_taskq_thr *tq_threads; int tq_nthreads; - int tq_run; - int tq_waiting; + bool tq_run; }; static nni_taskq *nni_taskq_systq = NULL; @@ -40,25 +50,37 @@ nni_taskq_thread(void *self) nni_mtx_lock(&tq->tq_mtx); for (;;) { if ((task = nni_list_first(&tq->tq_tasks)) != NULL) { + nni_mtx_lock(&task->task_mtx); + task->task_run = true; + task->task_sched = false; + nni_mtx_unlock(&task->task_mtx); nni_list_remove(&tq->tq_tasks, task); - thr->tqt_running = task; nni_mtx_unlock(&tq->tq_mtx); + task->task_cb(task->task_arg); - nni_mtx_lock(&tq->tq_mtx); - thr->tqt_running = NULL; - if (thr->tqt_wait || tq->tq_waiting) { - thr->tqt_wait = 0; - tq->tq_waiting = 0; - nni_cv_wake(&tq->tq_wait_cv); - } + nni_mtx_lock(&task->task_mtx); + if (task->task_sched || task->task_exec) { + // task resubmitted itself most likely. + // We cannot touch the rest of the flags, + // since the called function has taken control. + nni_mtx_unlock(&task->task_mtx); + } else { + task->task_done = true; + nni_cv_wake(&task->task_cv); + + if (task->task_fini) { + task->task_fini = false; + nni_mtx_unlock(&task->task_mtx); + nni_task_fini(task); + } else { + nni_mtx_unlock(&task->task_mtx); + } + } + nni_mtx_lock(&tq->tq_mtx); continue; } - if (tq->tq_waiting) { - tq->tq_waiting = 0; - nni_cv_wake(&tq->tq_wait_cv); - } if (!tq->tq_run) { break; } @@ -89,8 +111,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr) for (int i = 0; i < nthr; i++) { int rv; - tq->tq_threads[i].tqt_tq = tq; - tq->tq_threads[i].tqt_running = NULL; + tq->tq_threads[i].tqt_tq = tq; rv = nni_thr_init(&tq->tq_threads[i].tqt_thread, nni_taskq_thread, &tq->tq_threads[i]); if (rv != 0) { @@ -98,7 +119,7 @@ nni_taskq_init(nni_taskq **tqp, int nthr) return (rv); } } - tq->tq_run = 1; + tq->tq_run = true; for (int i = 0; i < tq->tq_nthreads; i++) { nni_thr_run(&tq->tq_threads[i].tqt_thread); } @@ -106,53 +127,15 @@ nni_taskq_init(nni_taskq **tqp, int nthr) return (0); } -static void -nni_taskq_drain_locked(nni_taskq *tq) -{ - // We need to first let the taskq completely drain. - for (;;) { - int busy = 0; - if (!nni_list_empty(&tq->tq_tasks)) { - busy = 1; - } else { - int i; - for (i = 0; i < tq->tq_nthreads; i++) { - if (tq->tq_threads[i].tqt_running != 0) { - busy = 1; - break; - } - } - } - if (!busy) { - break; - } - tq->tq_waiting++; - nni_cv_wait(&tq->tq_wait_cv); - } -} - -void -nni_taskq_drain(nni_taskq *tq) -{ - nni_mtx_lock(&tq->tq_mtx); - nni_taskq_drain_locked(tq); - nni_mtx_unlock(&tq->tq_mtx); -} - void nni_taskq_fini(nni_taskq *tq) { - // First drain the taskq completely. This is necessary since some - // tasks that are presently running may need to schedule additional - // tasks, and we don't want those to block. if (tq == NULL) { return; } if (tq->tq_run) { nni_mtx_lock(&tq->tq_mtx); - nni_taskq_drain_locked(tq); - - tq->tq_run = 0; + tq->tq_run = false; nni_cv_wake(&tq->tq_sched_cv); nni_mtx_unlock(&tq->tq_mtx); } @@ -174,90 +157,142 @@ nni_task_dispatch(nni_task *task) // If there is no callback to perform, then do nothing! // The user will be none the wiser. if (task->task_cb == NULL) { + nni_mtx_lock(&task->task_mtx); + task->task_sched = false; + task->task_run = false; + task->task_exec = false; + task->task_done = true; + nni_cv_wake(&task->task_cv); + nni_mtx_unlock(&task->task_mtx); return; } nni_mtx_lock(&tq->tq_mtx); - // It might already be scheduled... if so don't redo it. - if (!nni_list_active(&tq->tq_tasks, task)) { - nni_list_append(&tq->tq_tasks, task); - } + nni_mtx_lock(&task->task_mtx); + task->task_sched = true; + task->task_run = false; + task->task_done = false; + nni_mtx_unlock(&task->task_mtx); + + nni_list_append(&tq->tq_tasks, task); nni_cv_wake1(&tq->tq_sched_cv); // waking just one waiter is adequate nni_mtx_unlock(&tq->tq_mtx); } void -nni_task_wait(nni_task *task) +nni_task_exec(nni_task *task) { - nni_taskq *tq = task->task_tq; - if (task->task_cb == NULL) { + nni_mtx_lock(&task->task_mtx); + task->task_sched = false; + task->task_run = false; + task->task_exec = false; + task->task_done = true; + nni_cv_wake(&task->task_cv); + nni_mtx_unlock(&task->task_mtx); return; } - nni_mtx_lock(&tq->tq_mtx); - for (;;) { - bool running = false; - if (nni_list_active(&tq->tq_tasks, task)) { - running = true; - } else { - for (int i = 0; i < tq->tq_nthreads; i++) { - if (tq->tq_threads[i].tqt_running == task) { - running = true; - break; - } - } - } - if (!running) { - break; - } + nni_mtx_lock(&task->task_mtx); + if (task->task_exec) { + // recursive taskq_exec, run it asynchronously + nni_mtx_unlock(&task->task_mtx); + nni_task_dispatch(task); + return; + } + task->task_exec = true; + task->task_sched = false; + task->task_done = false; + nni_mtx_unlock(&task->task_mtx); - tq->tq_waiting = 1; - nni_cv_wait(&tq->tq_wait_cv); + task->task_cb(task->task_arg); + + nni_mtx_lock(&task->task_mtx); + task->task_exec = false; + if (task->task_sched || task->task_run) { + // cb scheduled a task + nni_mtx_unlock(&task->task_mtx); + return; + } + task->task_done = true; + nni_cv_wake(&task->task_cv); + if (task->task_fini) { + task->task_fini = false; + nni_mtx_unlock(&task->task_mtx); + nni_task_fini(task); + } else { + nni_mtx_unlock(&task->task_mtx); } - nni_mtx_unlock(&tq->tq_mtx); } -int -nni_task_cancel(nni_task *task) +void +nni_task_prep(nni_task *task) { - nni_taskq *tq = task->task_tq; - bool running; + nni_mtx_lock(&task->task_mtx); + task->task_sched = true; + task->task_done = false; + task->task_run = false; + nni_mtx_unlock(&task->task_mtx); +} - nni_mtx_lock(&tq->tq_mtx); - running = true; - for (;;) { - running = false; - for (int i = 0; i < tq->tq_nthreads; i++) { - if (tq->tq_threads[i].tqt_running == task) { - running = true; - break; - } - } +void +nni_task_unprep(nni_task *task) +{ + nni_mtx_lock(&task->task_mtx); + task->task_sched = false; + task->task_done = false; + task->task_run = false; + nni_cv_wake(&task->task_cv); + nni_mtx_unlock(&task->task_mtx); +} - if (!running) { - break; - } - // tq->tq_threads[i].tqt_wait = 1; - tq->tq_waiting++; - nni_cv_wait(&tq->tq_wait_cv); +void +nni_task_wait(nni_task *task) +{ + nni_mtx_lock(&task->task_mtx); + while ((task->task_sched || task->task_run || task->task_exec) && + (!task->task_done)) { + nni_cv_wait(&task->task_cv); } + nni_mtx_unlock(&task->task_mtx); +} - if (nni_list_active(&tq->tq_tasks, task)) { - nni_list_remove(&tq->tq_tasks, task); +int +nni_task_init(nni_task **taskp, nni_taskq *tq, nni_cb cb, void *arg) +{ + nni_task *task; + + if ((task = NNI_ALLOC_STRUCT(task)) == NULL) { + return (NNG_ENOMEM); } - nni_mtx_unlock(&tq->tq_mtx); + NNI_LIST_NODE_INIT(&task->task_node); + nni_mtx_init(&task->task_mtx); + nni_cv_init(&task->task_cv, &task->task_mtx); + task->task_sched = false; + task->task_done = false; + task->task_run = false; + task->task_sched = false; + task->task_exec = false; + task->task_cb = cb; + task->task_arg = arg; + task->task_tq = tq != NULL ? tq : nni_taskq_systq; + *taskp = task; return (0); } void -nni_task_init(nni_taskq *tq, nni_task *task, nni_cb cb, void *arg) +nni_task_fini(nni_task *task) { - if (tq == NULL) { - tq = nni_taskq_systq; + NNI_ASSERT(!nni_list_node_active(&task->task_node)); + nni_mtx_lock(&task->task_mtx); + if (task->task_run || task->task_exec) { + // destroy later. + task->task_fini = true; + nni_mtx_unlock(&task->task_mtx); + return; } - NNI_LIST_NODE_INIT(&task->task_node); - task->task_cb = cb; - task->task_arg = arg; - task->task_tq = tq; + nni_mtx_unlock(&task->task_mtx); + nni_cv_fini(&task->task_cv); + nni_mtx_fini(&task->task_mtx); + NNI_FREE_STRUCT(task); } int diff --git a/src/core/taskq.h b/src/core/taskq.h index 40b5dc00..513b15bb 100644 --- a/src/core/taskq.h +++ b/src/core/taskq.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -17,28 +17,22 @@ typedef struct nni_taskq nni_taskq; typedef struct nni_task nni_task; -// nni_task is a structure representing a task. Its intended to inlined -// into structures so that taskq_dispatch can be a guaranteed operation. -struct nni_task { - nni_list_node task_node; - void * task_arg; - nni_cb task_cb; - nni_taskq * task_tq; -}; - extern int nni_taskq_init(nni_taskq **, int); extern void nni_taskq_fini(nni_taskq *); -extern void nni_taskq_drain(nni_taskq *); // nni_task_dispatch sends the task to the queue. It is guaranteed to // succeed. (If the queue is shutdown, then the behavior is undefined.) extern void nni_task_dispatch(nni_task *); +extern void nni_task_exec(nni_task *); +extern void nni_task_prep(nni_task *); +extern void nni_task_unprep(nni_task *); // nni_task_cancel cancels the task. It will wait for the task to complete // if it is already running. extern int nni_task_cancel(nni_task *); extern void nni_task_wait(nni_task *); -extern void nni_task_init(nni_taskq *, nni_task *, nni_cb, void *); +extern int nni_task_init(nni_task **, nni_taskq *, nni_cb, void *); +extern void nni_task_fini(nni_task *); extern int nni_taskq_sys_init(void); extern void nni_taskq_sys_fini(void); diff --git a/src/core/thread.c b/src/core/thread.c index 54c9c7d2..adc35542 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -173,3 +173,12 @@ nni_thr_fini(nni_thr *thr) nni_plat_mtx_fini(&thr->mtx); thr->init = 0; } + +bool +nni_thr_is_self(nni_thr *thr) +{ + if (!thr->init) { + return (false); + } + return (nni_plat_thr_is_self(&thr->thr)); +} diff --git a/src/core/thread.h b/src/core/thread.h index ee83b196..c3d5531e 100644 --- a/src/core/thread.h +++ b/src/core/thread.h @@ -82,4 +82,7 @@ extern void nni_thr_run(nni_thr *thr); // at all. extern void nni_thr_wait(nni_thr *thr); +// nni_thr_is_self returns true if the caller is the named thread. +extern bool nni_thr_is_self(nni_thr *thr); + #endif // CORE_THREAD_H diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h index 15d91db2..08ae99ca 100644 --- a/src/platform/posix/posix_aio.h +++ b/src/platform/posix/posix_aio.h @@ -18,11 +18,12 @@ // one of several possible different backends. #include "core/nng_impl.h" +#include "posix_pollq.h" typedef struct nni_posix_pipedesc nni_posix_pipedesc; typedef struct nni_posix_epdesc nni_posix_epdesc; -extern int nni_posix_pipedesc_init(nni_posix_pipedesc **, int); +extern int nni_posix_pipedesc_init(nni_posix_pipedesc **, nni_posix_pfd *); extern void nni_posix_pipedesc_fini(nni_posix_pipedesc *); extern void nni_posix_pipedesc_recv(nni_posix_pipedesc *, nni_aio *); extern void nni_posix_pipedesc_send(nni_posix_pipedesc *, nni_aio *); diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 7431dedf..0065806d 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -39,7 +39,7 @@ #endif struct nni_posix_epdesc { - nni_posix_pollq_node node; + nni_posix_pfd * pfd; nni_list connectq; nni_list acceptq; bool closed; @@ -53,10 +53,14 @@ struct nni_posix_epdesc { nni_mtx mtx; }; +static void nni_epdesc_connect_cb(nni_posix_pfd *, int, void *); +static void nni_epdesc_accept_cb(nni_posix_pfd *, int, void *); + static void -nni_posix_epdesc_cancel(nni_aio *aio, int rv) +nni_epdesc_cancel(nni_aio *aio, int rv) { - nni_posix_epdesc *ed = nni_aio_get_prov_data(aio); + nni_posix_epdesc *ed = nni_aio_get_prov_data(aio); + nni_posix_pfd * pfd = NULL; NNI_ASSERT(rv != 0); nni_mtx_lock(&ed->mtx); @@ -64,200 +68,136 @@ nni_posix_epdesc_cancel(nni_aio *aio, int rv) nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } + if ((ed->mode == NNI_EP_MODE_DIAL) && nni_list_empty(&ed->connectq) && + ((pfd = ed->pfd) != NULL)) { + nni_posix_pfd_close(pfd); + } nni_mtx_unlock(&ed->mtx); } static void -nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd) +nni_epdesc_finish(nni_aio *aio, int rv, nni_posix_pfd *newpfd) { nni_posix_pipedesc *pd = NULL; // acceptq or connectq. nni_aio_list_remove(aio); - if (rv == 0) { - if ((rv = nni_posix_pipedesc_init(&pd, newfd)) != 0) { - (void) close(newfd); - } - } if (rv != 0) { + NNI_ASSERT(newpfd == NULL); nni_aio_finish_error(aio, rv); - } else { - nni_aio_set_output(aio, 0, pd); - nni_aio_finish(aio, 0, 0); + return; } -} - -static void -nni_posix_epdesc_doconnect(nni_posix_epdesc *ed) -{ - nni_aio * aio; - socklen_t sz; - int rv; - - // Note that normally there will only be a single connect AIO... - // A socket that is here will have *initiated* with a connect() - // call, which returned EINPROGRESS. When the connection attempt - // is done, either way, the descriptor will be noted as writable. - // getsockopt() with SOL_SOCKET, SO_ERROR to determine the actual - // status of the connection attempt... - while ((aio = nni_list_first(&ed->connectq)) != NULL) { - rv = -1; - sz = sizeof(rv); - if (getsockopt(ed->node.fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < - 0) { - rv = errno; - } - switch (rv) { - case 0: - // Success! - nni_posix_pollq_remove(&ed->node); - nni_posix_epdesc_finish(aio, 0, ed->node.fd); - ed->node.fd = -1; - continue; - - case EINPROGRESS: - // Still in progress... keep trying - return; - default: - if (rv == ENOENT) { - rv = ECONNREFUSED; - } - nni_posix_pollq_remove(&ed->node); - nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0); - (void) close(ed->node.fd); - ed->node.fd = -1; - continue; - } + NNI_ASSERT(newpfd != NULL); + if ((rv = nni_posix_pipedesc_init(&pd, newpfd)) != 0) { + nni_posix_pfd_fini(newpfd); + nni_aio_finish_error(aio, rv); + return; } + nni_aio_set_output(aio, 0, pd); + nni_aio_finish(aio, 0, 0); } static void -nni_posix_epdesc_doaccept(nni_posix_epdesc *ed) +nni_epdesc_doaccept(nni_posix_epdesc *ed) { nni_aio *aio; while ((aio = nni_list_first(&ed->acceptq)) != NULL) { - int newfd; + int newfd; + int fd; + int rv; + nni_posix_pfd *pfd; + + fd = nni_posix_pfd_fd(ed->pfd); #ifdef NNG_USE_ACCEPT4 - newfd = accept4(ed->node.fd, NULL, NULL, SOCK_CLOEXEC); + newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC); if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) { - newfd = accept(ed->node.fd, NULL, NULL); + newfd = accept(fd, NULL, NULL); } #else - newfd = accept(ed->node.fd, NULL, NULL); + newfd = accept(fd, NULL, NULL); #endif - - if (newfd >= 0) { - // successful connection request! - nni_posix_epdesc_finish(aio, 0, newfd); - continue; - } - - if ((errno == EWOULDBLOCK) || (errno == EAGAIN)) { - // Well, let's try later. Note that EWOULDBLOCK - // is required by standards, but some platforms may - // use EAGAIN. The values may be the same, so we - // can't use switch. - return; + if (newfd < 0) { + switch (errno) { + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + rv = nni_posix_pfd_arm(ed->pfd, POLLIN); + if (rv != 0) { + nni_epdesc_finish(aio, rv, NULL); + continue; + } + // Come back later... + return; + case ECONNABORTED: + case ECONNRESET: + // Eat them, they aren't interesting. + continue; + default: + // Error this one, but keep moving to the next. + rv = nni_plat_errno(errno); + nni_epdesc_finish(aio, rv, NULL); + continue; + } } - if ((errno == ECONNABORTED) || (errno == ECONNRESET)) { - // Let's just eat this one. Perhaps it may be - // better to report it to the application, but we - // think most applications don't want to see this. - // Only someone with a packet trace is going to - // notice this. + if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { + close(newfd); + nni_epdesc_finish(aio, rv, NULL); continue; } - nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0); - } -} - -static void -nni_posix_epdesc_doerror(nni_posix_epdesc *ed) -{ - nni_aio * aio; - int rv = 1; - socklen_t sz = sizeof(rv); - - if (getsockopt(ed->node.fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { - rv = errno; - } - if (rv == 0) { - return; - } - rv = nni_plat_errno(rv); - - while ((aio = nni_list_first(&ed->acceptq)) != NULL) { - nni_posix_epdesc_finish(aio, rv, 0); - } - while ((aio = nni_list_first(&ed->connectq)) != NULL) { - nni_posix_epdesc_finish(aio, rv, 0); + nni_epdesc_finish(aio, 0, pfd); } } static void -nni_posix_epdesc_doclose(nni_posix_epdesc *ed) +nni_epdesc_doclose(nni_posix_epdesc *ed) { nni_aio *aio; - int fd; ed->closed = true; while ((aio = nni_list_first(&ed->acceptq)) != NULL) { - nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); + nni_epdesc_finish(aio, NNG_ECLOSED, 0); } while ((aio = nni_list_first(&ed->connectq)) != NULL) { - nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); + nni_epdesc_finish(aio, NNG_ECLOSED, 0); } - nni_posix_pollq_remove(&ed->node); + if (ed->pfd != NULL) { - if ((fd = ed->node.fd) != -1) { + nni_posix_pfd_close(ed->pfd); + } + + // clean up stale UNIX socket when closing the server. + if ((ed->mode == NNI_EP_MODE_LISTEN) && (ed->loclen != 0) && + (ed->locaddr.ss_family == AF_UNIX)) { struct sockaddr_un *sun = (void *) &ed->locaddr; - ed->node.fd = -1; - (void) shutdown(fd, SHUT_RDWR); - (void) close(fd); - if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) { - (void) unlink(sun->sun_path); - } + (void) unlink(sun->sun_path); } } static void -nni_posix_epdesc_cb(void *arg) +nni_epdesc_accept_cb(nni_posix_pfd *pfd, int events, void *arg) { nni_posix_epdesc *ed = arg; - int events; nni_mtx_lock(&ed->mtx); - - if (ed->node.revents & POLLIN) { - nni_posix_epdesc_doaccept(ed); - } - if (ed->node.revents & POLLOUT) { - nni_posix_epdesc_doconnect(ed); - } - if (ed->node.revents & (POLLERR | POLLHUP)) { - nni_posix_epdesc_doerror(ed); - } - if (ed->node.revents & POLLNVAL) { - nni_posix_epdesc_doclose(ed); + if (events & POLLNVAL) { + nni_epdesc_doclose(ed); + nni_mtx_unlock(&ed->mtx); + return; } + NNI_ASSERT(pfd == ed->pfd); - events = 0; - if (!nni_list_empty(&ed->connectq)) { - events |= POLLOUT; - } - if (!nni_list_empty(&ed->acceptq)) { - events |= POLLIN; - } - if ((!ed->closed) && (events != 0)) { - nni_posix_pollq_arm(&ed->node, events); - } + // Anything else will turn up in accept. + nni_epdesc_doaccept(ed); nni_mtx_unlock(&ed->mtx); } @@ -265,7 +205,7 @@ void nni_posix_epdesc_close(nni_posix_epdesc *ed) { nni_mtx_lock(&ed->mtx); - nni_posix_epdesc_doclose(ed); + nni_epdesc_doclose(ed); nni_mtx_unlock(&ed->mtx); } @@ -276,9 +216,23 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) struct sockaddr_storage *ss; int rv; int fd; + nni_posix_pfd * pfd; nni_mtx_lock(&ed->mtx); + if (ed->started) { + nni_mtx_unlock(&ed->mtx); + return (NNG_ESTATE); + } + if (ed->closed) { + nni_mtx_unlock(&ed->mtx); + return (NNG_ECLOSED); + } + if ((len = ed->loclen) == 0) { + nni_mtx_unlock(&ed->mtx); + return (NNG_EADDRINVAL); + } + ss = &ed->locaddr; len = ed->loclen; @@ -286,18 +240,17 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) nni_mtx_unlock(&ed->mtx); return (nni_plat_errno(errno)); } - (void) fcntl(fd, F_SETFD, FD_CLOEXEC); -#ifdef SO_NOSIGPIPE - // Darwin lacks MSG_NOSIGNAL, but has a socket option. - int one = 1; - (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)); -#endif + if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { + nni_mtx_unlock(&ed->mtx); + nni_posix_pfd_fini(pfd); + return (rv); + } if (bind(fd, (struct sockaddr *) ss, len) < 0) { rv = nni_plat_errno(errno); nni_mtx_unlock(&ed->mtx); - (void) close(fd); + nni_posix_pfd_fini(pfd); return (rv); } @@ -314,7 +267,7 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) if ((rv = chmod(sun->sun_path, perms)) != 0) { rv = nni_plat_errno(errno); nni_mtx_unlock(&ed->mtx); - close(fd); + nni_posix_pfd_fini(pfd); return (rv); } } @@ -324,27 +277,24 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) if (listen(fd, 128) != 0) { rv = nni_plat_errno(errno); nni_mtx_unlock(&ed->mtx); - (void) close(fd); + nni_posix_pfd_fini(pfd); return (rv); } - (void) fcntl(fd, F_SETFL, O_NONBLOCK); + nni_posix_pfd_set_cb(pfd, nni_epdesc_accept_cb, ed); - ed->node.fd = fd; - if ((rv = nni_posix_pollq_add(&ed->node)) != 0) { - (void) close(fd); - ed->node.fd = -1; - nni_mtx_unlock(&ed->mtx); - return (rv); - } + ed->pfd = pfd; ed->started = true; nni_mtx_unlock(&ed->mtx); + return (0); } void nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio) { + int rv; + // Accept is simpler than the connect case. With accept we just // need to wait for the socket to be readable to indicate an incoming // connection is ready for us. There isn't anything else for us to @@ -354,14 +304,25 @@ nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio) } nni_mtx_lock(&ed->mtx); + if (!ed->started) { + nni_mtx_unlock(&ed->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } if (ed->closed) { nni_mtx_unlock(&ed->mtx); nni_aio_finish_error(aio, NNG_ECLOSED); return; } + if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) { + nni_mtx_unlock(&ed->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_aio_list_append(&ed->acceptq, aio); - nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed); - nni_posix_pollq_arm(&ed->node, POLLIN); + if (nni_list_first(&ed->acceptq) == aio) { + nni_epdesc_doaccept(ed); + } nni_mtx_unlock(&ed->mtx); } @@ -370,79 +331,171 @@ nni_posix_epdesc_sockname(nni_posix_epdesc *ed, nni_sockaddr *sa) { struct sockaddr_storage ss; socklen_t sslen = sizeof(ss); + int fd = -1; + + nni_mtx_lock(&ed->mtx); + if (ed->pfd != NULL) { + fd = nni_posix_pfd_fd(ed->pfd); + } + nni_mtx_unlock(&ed->mtx); - if (getsockname(ed->node.fd, (void *) &ss, &sslen) != 0) { + if (getsockname(fd, (void *) &ss, &sslen) != 0) { return (nni_plat_errno(errno)); } return (nni_posix_sockaddr2nn(sa, &ss)); } -void -nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) +static void +nni_epdesc_connect_start(nni_posix_epdesc *ed) { - // NB: We assume that the FD is already set to nonblocking mode. - int rv; - int fd; + nni_posix_pfd *pfd; + int fd; + int rv; + nni_aio * aio; - if (nni_aio_begin(aio) != 0) { +loop: + if ((aio = nni_list_first(&ed->connectq)) == NULL) { return; } - nni_mtx_lock(&ed->mtx); + + NNI_ASSERT(ed->pfd == NULL); + if (ed->closed) { + nni_epdesc_finish(aio, NNG_ECLOSED, NULL); + goto loop; + } + ed->started = true; if ((fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { rv = nni_plat_errno(errno); - nni_mtx_unlock(&ed->mtx); - nni_aio_finish_error(aio, rv); - return; + nni_epdesc_finish(aio, rv, NULL); + goto loop; } + if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { + (void) close(fd); + nni_epdesc_finish(aio, rv, NULL); + goto loop; + } // Possibly bind. if ((ed->loclen != 0) && (bind(fd, (void *) &ed->locaddr, ed->loclen) != 0)) { rv = nni_plat_errno(errno); - nni_mtx_unlock(&ed->mtx); - (void) close(fd); - nni_aio_finish_error(aio, rv); - return; + nni_epdesc_finish(aio, rv, NULL); + nni_posix_pfd_fini(pfd); + goto loop; } - (void) fcntl(fd, F_SETFL, O_NONBLOCK); - if ((rv = connect(fd, (void *) &ed->remaddr, ed->remlen)) == 0) { // Immediate connect, cool! This probably only happens on // loopback, and probably not on every platform. - ed->started = true; - nni_posix_epdesc_finish(aio, 0, fd); - nni_mtx_unlock(&ed->mtx); - return; + nni_epdesc_finish(aio, 0, pfd); + goto loop; } if (errno != EINPROGRESS) { // Some immediate failure occurred. if (errno == ENOENT) { // For UNIX domain sockets - errno = ECONNREFUSED; + rv = NNG_ECONNREFUSED; + } else { + rv = nni_plat_errno(errno); } - rv = nni_plat_errno(errno); + nni_epdesc_finish(aio, rv, NULL); + nni_posix_pfd_fini(pfd); + goto loop; + } + nni_posix_pfd_set_cb(pfd, nni_epdesc_connect_cb, ed); + if ((rv = nni_posix_pfd_arm(pfd, POLLOUT)) != 0) { + nni_epdesc_finish(aio, rv, NULL); + nni_posix_pfd_fini(pfd); + goto loop; + } + ed->pfd = pfd; + // all done... wait for this to signal via callback +} + +void +nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) +{ + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&ed->mtx); + if (ed->closed) { + nni_mtx_unlock(&ed->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, nni_epdesc_cancel, ed)) != 0) { nni_mtx_unlock(&ed->mtx); - (void) close(fd); nni_aio_finish_error(aio, rv); return; } - // We have to submit to the pollq, because the connection is pending. - ed->node.fd = fd; ed->started = true; - if ((rv = nni_posix_pollq_add(&ed->node)) != 0) { - ed->node.fd = -1; + nni_list_append(&ed->connectq, aio); + if (nni_list_first(&ed->connectq) == aio) { + // If there was a stale pfd (probably from an aborted or + // canceled connect attempt), discard it so we start fresh. + if (ed->pfd != NULL) { + nni_posix_pfd_fini(ed->pfd); + ed->pfd = NULL; + } + nni_epdesc_connect_start(ed); + } + nni_mtx_unlock(&ed->mtx); +} + +static void +nni_epdesc_connect_cb(nni_posix_pfd *pfd, int events, void *arg) +{ + nni_posix_epdesc *ed = arg; + nni_aio * aio; + socklen_t sz; + int rv; + int fd; + + nni_mtx_lock(&ed->mtx); + if ((ed->closed) || ((aio = nni_list_first(&ed->connectq)) == NULL) || + (pfd != ed->pfd)) { + // Spurious completion. Ignore it, but discard the PFD. + if (ed->pfd == pfd) { + ed->pfd = NULL; + } + nni_posix_pfd_fini(pfd); + nni_mtx_unlock(&ed->mtx); + return; + } + + fd = nni_posix_pfd_fd(pfd); + sz = sizeof(rv); + + if ((events & POLLNVAL) != 0) { + rv = EBADF; + + } else if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { + rv = errno; + } + + switch (rv) { + case 0: + // Good connect! + ed->pfd = NULL; + nni_epdesc_finish(aio, 0, pfd); + break; + case EINPROGRESS: // still connecting... come back later nni_mtx_unlock(&ed->mtx); - (void) close(fd); - nni_aio_finish_error(aio, rv); return; + default: + ed->pfd = NULL; + nni_epdesc_finish(aio, nni_plat_errno(rv), NULL); + nni_posix_pfd_fini(pfd); + break; } - nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed); - nni_aio_list_append(&ed->connectq, aio); - nni_posix_pollq_arm(&ed->node, POLLOUT); + // Start another connect running, if any is waiting. + nni_epdesc_connect_start(ed); nni_mtx_unlock(&ed->mtx); } @@ -450,7 +503,6 @@ int nni_posix_epdesc_init(nni_posix_epdesc **edp, int mode) { nni_posix_epdesc *ed; - int rv; if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) { return (NNG_ENOMEM); @@ -458,28 +510,14 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, int mode) nni_mtx_init(&ed->mtx); - // We could randomly choose a different pollq, or for efficiencies - // sake we could take a modulo of the file desc number to choose - // one. For now we just have a global pollq. Note that by tying - // the ed to a single pollq we may get some kind of cache warmth. - - ed->node.index = 0; - ed->node.cb = nni_posix_epdesc_cb; - ed->node.data = ed; - ed->node.fd = -1; - ed->closed = false; - ed->started = false; - ed->perms = 0; // zero means use default (no change) - ed->mode = mode; + ed->pfd = NULL; + ed->closed = false; + ed->started = false; + ed->perms = 0; // zero means use default (no change) + ed->mode = mode; nni_aio_list_init(&ed->connectq); nni_aio_list_init(&ed->acceptq); - - if ((rv = nni_posix_pollq_init(&ed->node)) != 0) { - nni_mtx_fini(&ed->mtx); - NNI_FREE_STRUCT(ed); - return (rv); - } *edp = ed; return (0); } @@ -532,14 +570,16 @@ nni_posix_epdesc_set_permissions(nni_posix_epdesc *ed, mode_t mode) void nni_posix_epdesc_fini(nni_posix_epdesc *ed) { - int fd; + nni_posix_pfd *pfd; + nni_mtx_lock(&ed->mtx); - if ((fd = ed->node.fd) != -1) { - (void) close(ed->node.fd); - nni_posix_epdesc_doclose(ed); - } + nni_epdesc_doclose(ed); + pfd = ed->pfd; nni_mtx_unlock(&ed->mtx); - nni_posix_pollq_fini(&ed->node); + + if (pfd != NULL) { + nni_posix_pfd_fini(pfd); + } nni_mtx_fini(&ed->mtx); NNI_FREE_STRUCT(ed); } diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c index e7225395..b11036ea 100644 --- a/src/platform/posix/posix_pipedesc.c +++ b/src/platform/posix/posix_pipedesc.c @@ -40,11 +40,11 @@ // file descriptor for TCP socket, etc.) This contains the list of pending // aios for that underlying socket, as well as the socket itself. struct nni_posix_pipedesc { - nni_posix_pollq_node node; - nni_list readq; - nni_list writeq; - bool closed; - nni_mtx mtx; + nni_posix_pfd *pfd; + nni_list readq; + nni_list writeq; + bool closed; + nni_mtx mtx; }; static void @@ -66,16 +66,19 @@ nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd) while ((aio = nni_list_first(&pd->writeq)) != NULL) { nni_posix_pipedesc_finish(aio, NNG_ECLOSED); } - if (pd->node.fd != -1) { - // Let any peer know we are closing. - (void) shutdown(pd->node.fd, SHUT_RDWR); - } + nni_posix_pfd_close(pd->pfd); } static void nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd) { nni_aio *aio; + int fd; + + fd = nni_posix_pfd_fd(pd->pfd); + if ((fd < 0) || (pd->closed)) { + return; + } while ((aio = nni_list_first(&pd->writeq)) != NULL) { unsigned i; @@ -122,20 +125,28 @@ nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd) hdr.msg_iovlen = niov; hdr.msg_iov = iovec; - n = sendmsg(pd->node.fd, &hdr, MSG_NOSIGNAL); - if (n < 0) { - if ((errno == EAGAIN) || (errno == EINTR)) { - // Can't write more right now. We're done - // on this fd for now. + if ((n = sendmsg(fd, &hdr, MSG_NOSIGNAL)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + return; + default: + nni_posix_pipedesc_finish( + aio, nni_plat_errno(errno)); + nni_posix_pipedesc_doclose(pd); return; } - nni_posix_pipedesc_finish(aio, nni_plat_errno(errno)); - nni_posix_pipedesc_doclose(pd); - return; } nni_aio_bump_count(aio, n); // We completed the entire operation on this aioq. + // (Sendmsg never returns a partial result.) nni_posix_pipedesc_finish(aio, 0); // Go back to start of loop to see if there is another @@ -147,6 +158,12 @@ static void nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) { nni_aio *aio; + int fd; + + fd = nni_posix_pfd_fd(pd->pfd); + if ((fd < 0) || (pd->closed)) { + return; + } while ((aio = nni_list_first(&pd->readq)) != NULL) { unsigned i; @@ -181,16 +198,18 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) } } - n = readv(pd->node.fd, iovec, niov); - if (n < 0) { - if ((errno == EAGAIN) || (errno == EINTR)) { - // Can't write more right now. We're done - // on this fd for now. + if ((n = readv(fd, iovec, niov)) < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: + return; + default: + nni_posix_pipedesc_finish( + aio, nni_plat_errno(errno)); + nni_posix_pipedesc_doclose(pd); return; } - nni_posix_pipedesc_finish(aio, nni_plat_errno(errno)); - nni_posix_pipedesc_doclose(pd); - return; } if (n == 0) { @@ -211,21 +230,21 @@ nni_posix_pipedesc_doread(nni_posix_pipedesc *pd) } static void -nni_posix_pipedesc_cb(void *arg) +nni_posix_pipedesc_cb(nni_posix_pfd *pfd, int events, void *arg) { nni_posix_pipedesc *pd = arg; nni_mtx_lock(&pd->mtx); - if (pd->node.revents & POLLIN) { + if (events & POLLIN) { nni_posix_pipedesc_doread(pd); } - if (pd->node.revents & POLLOUT) { + if (events & POLLOUT) { nni_posix_pipedesc_dowrite(pd); } - if (pd->node.revents & (POLLHUP | POLLERR | POLLNVAL)) { + if (events & (POLLHUP | POLLERR | POLLNVAL)) { nni_posix_pipedesc_doclose(pd); } else { - int events = 0; + events = 0; if (!nni_list_empty(&pd->writeq)) { events |= POLLOUT; } @@ -233,7 +252,7 @@ nni_posix_pipedesc_cb(void *arg) events |= POLLIN; } if ((!pd->closed) && (events != 0)) { - nni_posix_pollq_arm(&pd->node, events); + nni_posix_pfd_arm(pfd, events); } } nni_mtx_unlock(&pd->mtx); @@ -242,8 +261,7 @@ nni_posix_pipedesc_cb(void *arg) void nni_posix_pipedesc_close(nni_posix_pipedesc *pd) { - nni_posix_pollq_remove(&pd->node); - + // NB: Events may still occur. nni_mtx_lock(&pd->mtx); nni_posix_pipedesc_doclose(pd); nni_mtx_unlock(&pd->mtx); @@ -265,6 +283,8 @@ nni_posix_pipedesc_cancel(nni_aio *aio, int rv) void nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio) { + int rv; + if (nni_aio_begin(aio) != 0) { return; } @@ -276,18 +296,24 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio) return; } + if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) { + nni_mtx_unlock(&pd->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_aio_list_append(&pd->readq, aio); - nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd); - // If we are only job on the list, go ahead and try to do an immediate - // transfer. This allows for faster completions in many cases. We - // also need not arm a list if it was already armed. + // If we are only job on the list, go ahead and try to do an + // immediate transfer. This allows for faster completions in + // many cases. We also need not arm a list if it was already + // armed. if (nni_list_first(&pd->readq) == aio) { nni_posix_pipedesc_doread(pd); - // If we are still the first thing on the list, that means we - // didn't finish the job, so arm the poller to complete us. + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. if (nni_list_first(&pd->readq) == aio) { - nni_posix_pollq_arm(&pd->node, POLLIN); + nni_posix_pfd_arm(pd->pfd, POLLIN); } } nni_mtx_unlock(&pd->mtx); @@ -296,6 +322,8 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio) void nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio) { + int rv; + if (nni_aio_begin(aio) != 0) { return; } @@ -307,15 +335,20 @@ nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio) return; } + if ((rv = nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd)) != 0) { + nni_mtx_unlock(&pd->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_aio_list_append(&pd->writeq, aio); - nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd); if (nni_list_first(&pd->writeq) == aio) { nni_posix_pipedesc_dowrite(pd); - // If we are still the first thing on the list, that means we - // didn't finish the job, so arm the poller to complete us. + // If we are still the first thing on the list, that + // means we didn't finish the job, so arm the poller to + // complete us. if (nni_list_first(&pd->writeq) == aio) { - nni_posix_pollq_arm(&pd->node, POLLOUT); + nni_posix_pfd_arm(pd->pfd, POLLOUT); } } nni_mtx_unlock(&pd->mtx); @@ -326,8 +359,9 @@ nni_posix_pipedesc_peername(nni_posix_pipedesc *pd, nni_sockaddr *sa) { struct sockaddr_storage ss; socklen_t sslen = sizeof(ss); + int fd = nni_posix_pfd_fd(pd->pfd); - if (getpeername(pd->node.fd, (void *) &ss, &sslen) != 0) { + if (getpeername(fd, (void *) &ss, &sslen) != 0) { return (nni_plat_errno(errno)); } return (nni_posix_sockaddr2nn(sa, &ss)); @@ -338,8 +372,9 @@ nni_posix_pipedesc_sockname(nni_posix_pipedesc *pd, nni_sockaddr *sa) { struct sockaddr_storage ss; socklen_t sslen = sizeof(ss); + int fd = nni_posix_pfd_fd(pd->pfd); - if (getsockname(pd->node.fd, (void *) &ss, &sslen) != 0) { + if (getsockname(fd, (void *) &ss, &sslen) != 0) { return (nni_plat_errno(errno)); } return (nni_posix_sockaddr2nn(sa, &ss)); @@ -349,9 +384,9 @@ int nni_posix_pipedesc_set_nodelay(nni_posix_pipedesc *pd, bool nodelay) { int val = nodelay ? 1 : 0; + int fd = nni_posix_pfd_fd(pd->pfd); - if (setsockopt(pd->node.fd, IPPROTO_TCP, TCP_NODELAY, &val, - sizeof(val)) != 0) { + if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) { return (nni_plat_errno(errno)); } return (0); @@ -361,61 +396,19 @@ int nni_posix_pipedesc_set_keepalive(nni_posix_pipedesc *pd, bool keep) { int val = keep ? 1 : 0; + int fd = nni_posix_pfd_fd(pd->pfd); - if (setsockopt(pd->node.fd, SOL_SOCKET, SO_KEEPALIVE, &val, - sizeof(val)) != 0) { + if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) { return (nni_plat_errno(errno)); } return (0); } int -nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd) -{ - nni_posix_pipedesc *pd; - int rv; - - if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) { - return (NNG_ENOMEM); - } - - // We could randomly choose a different pollq, or for efficiencies - // sake we could take a modulo of the file desc number to choose - // one. For now we just have a global pollq. Note that by tying - // the pd to a single pollq we may get some kind of cache warmth. - - pd->closed = false; - pd->node.fd = fd; - pd->node.cb = nni_posix_pipedesc_cb; - pd->node.data = pd; - - (void) fcntl(fd, F_SETFL, O_NONBLOCK); - -#ifdef SO_NOSIGPIPE - // Darwin lacks MSG_NOSIGNAL, but has a socket option. - int one = 1; - (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)); -#endif - - nni_mtx_init(&pd->mtx); - nni_aio_list_init(&pd->readq); - nni_aio_list_init(&pd->writeq); - - if (((rv = nni_posix_pollq_init(&pd->node)) != 0) || - ((rv = nni_posix_pollq_add(&pd->node)) != 0)) { - nni_mtx_fini(&pd->mtx); - NNI_FREE_STRUCT(pd); - return (rv); - } - *pdp = pd; - return (0); -} - -int nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid, uint64_t *egid, uint64_t *prid, uint64_t *znid) { - int fd = pd->node.fd; + int fd = nni_posix_pfd_fd(pd->pfd); #if defined(NNG_HAVE_GETPEEREID) uid_t uid; gid_t gid; @@ -458,7 +451,8 @@ nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid, } *euid = xu.cr_uid; *egid = xu.cr_gid; - *prid = (uint64_t) -1; // XXX: macOS has undocumented LOCAL_PEERPID... + *prid = (uint64_t) -1; // XXX: macOS has undocumented + // LOCAL_PEERPID... *znid = (uint64_t) -1; return (0); #else @@ -473,16 +467,34 @@ nni_posix_pipedesc_get_peerid(nni_posix_pipedesc *pd, uint64_t *euid, #endif } +int +nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, nni_posix_pfd *pfd) +{ + nni_posix_pipedesc *pd; + + if ((pd = NNI_ALLOC_STRUCT(pd)) == NULL) { + return (NNG_ENOMEM); + } + + pd->closed = false; + pd->pfd = pfd; + + nni_mtx_init(&pd->mtx); + nni_aio_list_init(&pd->readq); + nni_aio_list_init(&pd->writeq); + + nni_posix_pfd_set_cb(pfd, nni_posix_pipedesc_cb, pd); + + *pdp = pd; + return (0); +} + void nni_posix_pipedesc_fini(nni_posix_pipedesc *pd) { - // Make sure no other polling activity is pending. nni_posix_pipedesc_close(pd); - nni_posix_pollq_fini(&pd->node); - if (pd->node.fd >= 0) { - (void) close(pd->node.fd); - } - + nni_posix_pfd_fini(pd->pfd); + pd->pfd = NULL; nni_mtx_fini(&pd->mtx); NNI_FREE_STRUCT(pd); diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h index 2c855da1..b9786330 100644 --- a/src/platform/posix/posix_pollq.h +++ b/src/platform/posix/posix_pollq.h @@ -22,32 +22,18 @@ #include "core/nng_impl.h" #include <poll.h> -typedef struct nni_posix_pollq_node nni_posix_pollq_node; -typedef struct nni_posix_pollq nni_posix_pollq; - -struct nni_posix_pollq_node { - nni_list_node node; // linkage into the pollq list - nni_posix_pollq *pq; // associated pollq - int index; // used by the poller impl - int armed; // used by the poller impl - int fd; // file descriptor to poll - int events; // events to watch for - int revents; // events received - void * data; // user data - nni_cb cb; // user callback on event - nni_mtx mx; - nni_cv cv; -}; - -extern nni_posix_pollq *nni_posix_pollq_get(int); -extern int nni_posix_pollq_sysinit(void); -extern void nni_posix_pollq_sysfini(void); - -extern int nni_posix_pollq_init(nni_posix_pollq_node *); -extern void nni_posix_pollq_fini(nni_posix_pollq_node *); -extern int nni_posix_pollq_add(nni_posix_pollq_node *); -extern void nni_posix_pollq_remove(nni_posix_pollq_node *); -extern void nni_posix_pollq_arm(nni_posix_pollq_node *, int); +typedef struct nni_posix_pfd nni_posix_pfd; +typedef void (*nni_posix_pfd_cb)(nni_posix_pfd *, int, void *); + +extern int nni_posix_pollq_sysinit(void); +extern void nni_posix_pollq_sysfini(void); + +extern int nni_posix_pfd_init(nni_posix_pfd **, int); +extern void nni_posix_pfd_fini(nni_posix_pfd *); +extern int nni_posix_pfd_arm(nni_posix_pfd *, int); +extern int nni_posix_pfd_fd(nni_posix_pfd *); +extern void nni_posix_pfd_close(nni_posix_pfd *); +extern void nni_posix_pfd_set_cb(nni_posix_pfd *, nni_posix_pfd_cb, void *); #endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c index 0f6867da..a8d8693a 100644 --- a/src/platform/posix/posix_pollq_epoll.c +++ b/src/platform/posix/posix_pollq_epoll.c @@ -12,6 +12,7 @@ #ifdef NNG_HAVE_EPOLL #include <errno.h> +#include <fcntl.h> #include <stdbool.h> #include <stdio.h> #include <string.h> /* for strerror() */ @@ -22,179 +23,215 @@ #include "core/nng_impl.h" #include "platform/posix/posix_pollq.h" +typedef struct nni_posix_pollq nni_posix_pollq; + +#ifndef EFD_CLOEXEC +#define EFD_CLOEXEC 0 +#endif +#ifndef EFD_NONBLOCK +#define EFD_NONBLOCK 0 +#endif + #define NNI_MAX_EPOLL_EVENTS 64 // flags we always want enabled as long as at least one event is active #define NNI_EPOLL_FLAGS (EPOLLONESHOT | EPOLLERR | EPOLLHUP) +// Locking strategy: +// +// The pollq mutex protects its own reapq, close state, and the close +// state of the individual pfds. It also protects the pfd cv, which is +// only signaled when the pfd is closed. This mutex is only acquired +// when shutting down the pollq, or closing a pfd. For normal hot-path +// operations we don't need it. +// +// The pfd mutex protects the pfd's own "closing" flag (test and set), +// the callback and arg, and its event mask. This mutex is used a lot, +// but it should be uncontended excepting possibly when closing. + // nni_posix_pollq is a work structure that manages state for the epoll-based // pollq implementation struct nni_posix_pollq { - nni_mtx mtx; - nni_cv cv; - int epfd; // epoll handle - int evfd; // event fd - bool close; // request for worker to exit - bool started; - nni_idhash * nodes; - nni_thr thr; // worker thread - nni_posix_pollq_node *wait; // cancel waiting on this - nni_posix_pollq_node *active; // active node (in callback) + nni_mtx mtx; + int epfd; // epoll handle + int evfd; // event fd (to wake us for other stuff) + bool close; // request for worker to exit + nni_thr thr; // worker thread + nni_list reapq; +}; + +struct nni_posix_pfd { + nni_posix_pollq *pq; + nni_list_node node; + int fd; + nni_posix_pfd_cb cb; + void * arg; + bool closed; + bool closing; + bool reap; + int events; + nni_mtx mtx; + nni_cv cv; }; +// single global instance for now. +static nni_posix_pollq nni_posix_global_pollq; + int -nni_posix_pollq_add(nni_posix_pollq_node *node) +nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) { - int rv; + nni_posix_pfd * pfd; nni_posix_pollq * pq; struct epoll_event ev; - uint64_t id; - - pq = nni_posix_pollq_get(node->fd); - if (pq == NULL) { - return (NNG_EINVAL); - } + int rv; - // ensure node was not previously associated with a pollq - if (node->pq != NULL) { - return (NNG_ESTATE); - } + pq = &nni_posix_global_pollq; - nni_mtx_lock(&pq->mtx); - if (pq->close) { - // This shouldn't happen! - nni_mtx_unlock(&pq->mtx); - return (NNG_ECLOSED); - } + (void) fcntl(fd, F_SETFD, FD_CLOEXEC); + (void) fcntl(fd, F_SETFL, O_NONBLOCK); - if ((rv = nni_idhash_alloc(pq->nodes, &id, node)) != 0) { - nni_mtx_unlock(&pq->mtx); - return (rv); + if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) { + return (NNG_ENOMEM); } - node->index = (int) id; - node->pq = pq; - node->events = 0; + pfd->pq = pq; + pfd->fd = fd; + pfd->cb = NULL; + pfd->arg = NULL; + pfd->events = 0; + pfd->closing = false; + pfd->closed = false; + + nni_mtx_init(&pfd->mtx); + nni_cv_init(&pfd->cv, &pq->mtx); + NNI_LIST_NODE_INIT(&pfd->node); // notifications disabled to begin with ev.events = 0; - ev.data.u64 = id; + ev.data.ptr = pfd; - rv = epoll_ctl(pq->epfd, EPOLL_CTL_ADD, node->fd, &ev); - if (rv != 0) { + if ((rv = epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev)) != 0) { rv = nni_plat_errno(errno); - nni_idhash_remove(pq->nodes, id); - node->index = 0; - node->pq = NULL; + nni_cv_fini(&pfd->cv); + NNI_FREE_STRUCT(pfd); + return (rv); } - nni_mtx_unlock(&pq->mtx); - return (rv); + *pfdp = pfd; + return (0); } -// common functionality for nni_posix_pollq_remove() and nni_posix_pollq_fini() -// called while pq's lock is held -static void -nni_posix_pollq_remove_helper(nni_posix_pollq *pq, nni_posix_pollq_node *node) +int +nni_posix_pfd_arm(nni_posix_pfd *pfd, int events) { - int rv; - struct epoll_event ev; - - node->events = 0; - node->pq = NULL; - - ev.events = 0; - ev.data.u64 = (uint64_t) node->index; - - if (node->index != 0) { - // This deregisters the node. If the poller was blocked - // then this keeps it from coming back in to find us. - nni_idhash_remove(pq->nodes, (uint64_t) node->index); - } - - // NB: EPOLL_CTL_DEL actually *ignores* the event, but older Linux - // versions need it to be non-NULL. - rv = epoll_ctl(pq->epfd, EPOLL_CTL_DEL, node->fd, &ev); - if (rv != 0) { - NNI_ASSERT(errno == EBADF || errno == ENOENT); + nni_posix_pollq *pq = pfd->pq; + + // NB: We depend on epoll event flags being the same as their POLLIN + // equivalents. I.e. POLLIN == EPOLLIN, POLLOUT == EPOLLOUT, and so + // forth. This turns out to be true both for Linux and the illumos + // epoll implementation. + + nni_mtx_lock(&pfd->mtx); + if (!pfd->closing) { + struct epoll_event ev; + pfd->events |= events; + events = pfd->events; + + ev.events = events | NNI_EPOLL_FLAGS; + ev.data.ptr = pfd; + + if (epoll_ctl(pq->epfd, EPOLL_CTL_MOD, pfd->fd, &ev) != 0) { + int rv = nni_plat_errno(errno); + nni_mtx_unlock(&pfd->mtx); + return (rv); + } } + nni_mtx_unlock(&pfd->mtx); + return (0); } -// nni_posix_pollq_remove removes the node from the pollq, but -// does not ensure that the pollq node is safe to destroy. In particular, -// this function can be called from a callback (the callback may be active). -void -nni_posix_pollq_remove(nni_posix_pollq_node *node) +int +nni_posix_pfd_fd(nni_posix_pfd *pfd) { - nni_posix_pollq *pq = node->pq; - - if (pq == NULL) { - return; - } - - nni_mtx_lock(&pq->mtx); - nni_posix_pollq_remove_helper(pq, node); - - if (pq->close) { - nni_cv_wake(&pq->cv); - } - nni_mtx_unlock(&pq->mtx); + return (pfd->fd); } -// nni_posix_pollq_init merely ensures that the node is ready for use. -// It does not register the node with any pollq in particular. -int -nni_posix_pollq_init(nni_posix_pollq_node *node) +void +nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg) { - node->index = 0; - return (0); + nni_mtx_lock(&pfd->mtx); + pfd->cb = cb; + pfd->arg = arg; + nni_mtx_unlock(&pfd->mtx); } -// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does, -// but it also ensures that the callback is not active, so that the node -// may be deallocated. This function must not be called in a callback. void -nni_posix_pollq_fini(nni_posix_pollq_node *node) +nni_posix_pfd_close(nni_posix_pfd *pfd) { - nni_posix_pollq *pq = node->pq; - if (pq == NULL) { - return; - } - - nni_mtx_lock(&pq->mtx); - while (pq->active == node) { - pq->wait = node; - nni_cv_wait(&pq->cv); - } - - nni_posix_pollq_remove_helper(pq, node); - - if (pq->close) { - nni_cv_wake(&pq->cv); + nni_mtx_lock(&pfd->mtx); + if (!pfd->closing) { + nni_posix_pollq * pq = pfd->pq; + struct epoll_event ev; // Not actually used. + pfd->closing = true; + + (void) shutdown(pfd->fd, SHUT_RDWR); + (void) epoll_ctl(pq->epfd, EPOLL_CTL_DEL, pfd->fd, &ev); } - nni_mtx_unlock(&pq->mtx); + nni_mtx_unlock(&pfd->mtx); } void -nni_posix_pollq_arm(nni_posix_pollq_node *node, int events) +nni_posix_pfd_fini(nni_posix_pfd *pfd) { - int rv; - struct epoll_event ev; - nni_posix_pollq * pq = node->pq; + nni_posix_pollq *pq = pfd->pq; + + nni_posix_pfd_close(pfd); - NNI_ASSERT(pq != NULL); - if (events == 0) { - return; + // We have to synchronize with the pollq thread (unless we are + // on that thread!) + if (!nni_thr_is_self(&pq->thr)) { + + uint64_t one = 1; + + nni_mtx_lock(&pq->mtx); + nni_list_append(&pq->reapq, pfd); + + // Wake the remote side. For now we assume this always + // succeeds. The only failure modes here occur when we + // have already excessively signaled this (2^64 times + // with no read!!), or when the evfd is closed, or some + // kernel bug occurs. Those errors would manifest as + // a hang waiting for the poller to reap the pfd in fini, + // if it were possible for them to occur. (Barring other + // bugs, it isn't.) + (void) write(pq->evfd, &one, sizeof(one)); + + while (!pfd->closed) { + nni_cv_wait(&pfd->cv); + } + nni_mtx_unlock(&pq->mtx); } - nni_mtx_lock(&pq->mtx); + // We're exclusive now. - node->events |= events; - ev.events = node->events | NNI_EPOLL_FLAGS; - ev.data.u64 = (uint64_t) node->index; + (void) close(pfd->fd); + nni_cv_fini(&pfd->cv); + nni_mtx_fini(&pfd->mtx); + NNI_FREE_STRUCT(pfd); +} - rv = epoll_ctl(pq->epfd, EPOLL_CTL_MOD, node->fd, &ev); - NNI_ASSERT(rv == 0); +static void +nni_posix_pollq_reap(nni_posix_pollq *pq) +{ + nni_posix_pfd *pfd; + nni_mtx_lock(&pq->mtx); + while ((pfd = nni_list_first(&pq->reapq)) != NULL) { + nni_list_remove(&pq->reapq, pfd); + // Let fini know we're done with it, and it's safe to + // remove. + pfd->closed = true; + nni_cv_wake(&pfd->cv); + } nni_mtx_unlock(&pq->mtx); } @@ -204,101 +241,74 @@ nni_posix_poll_thr(void *arg) nni_posix_pollq * pq = arg; struct epoll_event events[NNI_MAX_EPOLL_EVENTS]; - nni_mtx_lock(&pq->mtx); - - while (!pq->close) { - int i; - int nevents; - - // block indefinitely, timers are handled separately - nni_mtx_unlock(&pq->mtx); - - nevents = - epoll_wait(pq->epfd, events, NNI_MAX_EPOLL_EVENTS, -1); + for (;;) { + int n; + bool reap = false; - nni_mtx_lock(&pq->mtx); - - if (nevents <= 0) { - continue; + n = epoll_wait(pq->epfd, events, NNI_MAX_EPOLL_EVENTS, -1); + if ((n < 0) && (errno == EBADF)) { + // Epoll fd closed, bail. + return; } // dispatch events - for (i = 0; i < nevents; ++i) { + for (int i = 0; i < n; ++i) { const struct epoll_event *ev; - nni_posix_pollq_node * node; ev = &events[i]; // If the waker pipe was signaled, read from it. - if ((ev->data.u64 == 0) && (ev->events & POLLIN)) { - int rv; + if ((ev->data.ptr == NULL) && (ev->events & POLLIN)) { uint64_t clear; - rv = read(pq->evfd, &clear, sizeof(clear)); - NNI_ASSERT(rv == sizeof(clear)); - continue; - } - - if (nni_idhash_find(pq->nodes, ev->data.u64, - (void **) &node) != 0) { - // node was removed while we were blocking - continue; + (void) read(pq->evfd, &clear, sizeof(clear)); + reap = true; + } else { + nni_posix_pfd * pfd = ev->data.ptr; + nni_posix_pfd_cb cb; + void * arg; + int events; + + events = ev->events & + (EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP); + + nni_mtx_lock(&pfd->mtx); + pfd->events &= ~events; + cb = pfd->cb; + arg = pfd->arg; + nni_mtx_unlock(&pfd->mtx); + + // Execute the callback with lock released + if (cb != NULL) { + cb(pfd, events, arg); + } } + } - node->revents = ev->events & - (EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP); - - // mark events as cleared - node->events &= ~node->revents; - - // Save the active node; we can notice this way - // when it is busy, and avoid freeing it until - // we are sure that it is not in use. - pq->active = node; - - // Execute the callback with lock released - nni_mtx_unlock(&pq->mtx); - node->cb(node->data); + if (reap) { + nni_posix_pollq_reap(pq); nni_mtx_lock(&pq->mtx); - - // We finished with this node. If something - // was blocked waiting for that, wake it up. - pq->active = NULL; - if (pq->wait == node) { - pq->wait = NULL; - nni_cv_wake(&pq->cv); + if (pq->close) { + nni_mtx_unlock(&pq->mtx); + return; } + nni_mtx_unlock(&pq->mtx); } } - - nni_mtx_unlock(&pq->mtx); } static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { - if (pq->started) { - int rv; - uint64_t wakeval = 1; + uint64_t one = 1; - nni_mtx_lock(&pq->mtx); - pq->close = true; - pq->started = false; - rv = write(pq->evfd, &wakeval, sizeof(wakeval)); - NNI_ASSERT(rv == sizeof(wakeval)); - nni_mtx_unlock(&pq->mtx); - } - nni_thr_fini(&pq->thr); + nni_mtx_lock(&pq->mtx); + pq->close = true; + (void) write(pq->evfd, &one, sizeof(one)); + nni_mtx_unlock(&pq->mtx); - if (pq->evfd >= 0) { - close(pq->evfd); - pq->evfd = -1; - } + nni_thr_fini(&pq->thr); + close(pq->evfd); close(pq->epfd); - pq->epfd = -1; - - if (pq->nodes != NULL) { - nni_idhash_fini(pq->nodes); - } nni_mtx_fini(&pq->mtx); } @@ -308,22 +318,25 @@ nni_posix_pollq_add_eventfd(nni_posix_pollq *pq) { // add event fd so we can wake ourself on exit struct epoll_event ev; - int rv; + int fd; memset(&ev, 0, sizeof(ev)); - pq->evfd = eventfd(0, EFD_NONBLOCK); - if (pq->evfd == -1) { + if ((fd = eventfd(0, EFD_NONBLOCK)) < 0) { return (nni_plat_errno(errno)); } + (void) fcntl(fd, F_SETFD, FD_CLOEXEC); + (void) fcntl(fd, F_SETFL, O_NONBLOCK); + // This is *NOT* one shot. We want to wake EVERY single time. ev.events = EPOLLIN; - ev.data.u64 = 0; + ev.data.ptr = 0; - rv = epoll_ctl(pq->epfd, EPOLL_CTL_ADD, pq->evfd, &ev); - if (rv != 0) { + if (epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev) != 0) { + (void) close(fd); return (nni_plat_errno(errno)); } + pq->evfd = fd; return (0); } @@ -332,40 +345,30 @@ nni_posix_pollq_create(nni_posix_pollq *pq) { int rv; - if ((pq->epfd = epoll_create1(0)) < 0) { + if ((pq->epfd = epoll_create1(EPOLL_CLOEXEC)) < 0) { return (nni_plat_errno(errno)); } - pq->evfd = -1; pq->close = false; + NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); nni_mtx_init(&pq->mtx); - nni_cv_init(&pq->cv, &pq->mtx); - if (((rv = nni_idhash_init(&pq->nodes)) != 0) || - ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) || - ((rv = nni_posix_pollq_add_eventfd(pq)) != 0)) { - nni_posix_pollq_destroy(pq); + if ((rv = nni_posix_pollq_add_eventfd(pq)) != 0) { + (void) close(pq->epfd); + nni_mtx_fini(&pq->mtx); + return (rv); + } + if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) { + (void) close(pq->epfd); + (void) close(pq->evfd); + nni_mtx_fini(&pq->mtx); return (rv); } - - // Positive values only for node indices. (0 is reserved for eventfd). - nni_idhash_set_limits(pq->nodes, 1, 0x7FFFFFFFu, 1); - pq->started = true; nni_thr_run(&pq->thr); return (0); } -// single global instance for now -static nni_posix_pollq nni_posix_global_pollq; - -nni_posix_pollq * -nni_posix_pollq_get(int fd) -{ - NNI_ARG_UNUSED(fd); - return (&nni_posix_global_pollq); -} - int nni_posix_pollq_sysinit(void) { diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c index 0f312170..36ced3ff 100644 --- a/src/platform/posix/posix_pollq_kqueue.c +++ b/src/platform/posix/posix_pollq_kqueue.c @@ -12,196 +12,203 @@ #ifdef NNG_HAVE_KQUEUE #include <errno.h> +#include <fcntl.h> #include <stdbool.h> #include <stdio.h> #include <string.h> /* for strerror() */ #include <sys/event.h> +#include <sys/socket.h> #include <unistd.h> #include "core/nng_impl.h" #include "platform/posix/posix_pollq.h" -// TODO: can this be feature detected in cmake, -// rather than relying on platform? -#if defined NNG_PLATFORM_NETBSD -#define kevent_udata_t intptr_t -#else -#define kevent_udata_t void * -#endif - -#define NNI_MAX_KQUEUE_EVENTS 64 - -// user event id used to shutdown the polling thread -#define NNI_KQ_EV_EXIT_ID 0xF +typedef struct nni_posix_pollq nni_posix_pollq; // nni_posix_pollq is a work structure that manages state for the kqueue-based // pollq implementation struct nni_posix_pollq { - nni_mtx mtx; - nni_cv cv; - int kq; // kqueue handle - bool close; // request for worker to exit - bool started; - nni_thr thr; // worker thread - nni_posix_pollq_node *wait; // cancel waiting on this - nni_posix_pollq_node *active; // active node (in callback) + nni_mtx mtx; + int kq; // kqueue handle + nni_thr thr; // worker thread + nni_list reapq; // items to reap }; +struct nni_posix_pfd { + nni_list_node node; // linkage into the reap list + nni_posix_pollq *pq; // associated pollq + int fd; // file descriptor to poll + void * data; // user data + nni_posix_pfd_cb cb; // user callback on event + nni_cv cv; // signaled when poller has unregistered + nni_mtx mtx; + int events; + bool closing; + bool closed; +}; + +#define NNI_MAX_KQUEUE_EVENTS 64 + +// single global instance for now +static nni_posix_pollq nni_posix_global_pollq; + int -nni_posix_pollq_add(nni_posix_pollq_node *node) +nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) { + nni_posix_pfd * pf; nni_posix_pollq *pq; - struct kevent kevents[2]; + struct kevent ev[2]; + unsigned flags = EV_ADD | EV_DISABLE; + + // Set this is as soon as possible (narrow the close-exec race as + // much as we can; better options are system calls that suppress + // this behavior from descriptor creation.) + (void) fcntl(fd, F_SETFD, FD_CLOEXEC); + (void) fcntl(fd, F_SETFL, O_NONBLOCK); +#ifdef SO_NOSIGPIPE + // Darwin lacks MSG_NOSIGNAL, but has a socket option. + int one = 1; + (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)); +#endif - pq = nni_posix_pollq_get(node->fd); - if (pq == NULL) { - return (NNG_EINVAL); - } + pq = &nni_posix_global_pollq; - // ensure node was not previously associated with a pollq - if (node->pq != NULL) { - return (NNG_ESTATE); + if ((pf = NNI_ALLOC_STRUCT(pf)) == NULL) { + return (NNG_ENOMEM); } - nni_mtx_lock(&pq->mtx); - if (pq->close) { - // This shouldn't happen! - nni_mtx_unlock(&pq->mtx); - return (NNG_ECLOSED); - } - - node->pq = pq; - node->events = 0; - - EV_SET(&kevents[0], (uintptr_t) node->fd, EVFILT_READ, - EV_ADD | EV_DISABLE, 0, 0, (kevent_udata_t) node); + // Create entries in the kevent queue, without enabling them. + EV_SET(&ev[0], (uintptr_t) fd, EVFILT_READ, flags, 0, 0, pf); + EV_SET(&ev[1], (uintptr_t) fd, EVFILT_WRITE, flags, 0, 0, pf); - EV_SET(&kevents[1], (uintptr_t) node->fd, EVFILT_WRITE, - EV_ADD | EV_DISABLE, 0, 0, (kevent_udata_t) node); - - if (kevent(pq->kq, kevents, 2, NULL, 0, NULL) != 0) { - nni_mtx_unlock(&pq->mtx); + // We update the kqueue list, without polling for events. + if (kevent(pq->kq, ev, 2, NULL, 0, NULL) != 0) { + NNI_FREE_STRUCT(pf); return (nni_plat_errno(errno)); } + pf->fd = fd; + pf->cb = NULL; + pf->pq = pq; + nni_mtx_init(&pf->mtx); + nni_cv_init(&pf->cv, &pq->mtx); + NNI_LIST_NODE_INIT(&pf->node); + *pfdp = pf; - nni_mtx_unlock(&pq->mtx); return (0); } -// common functionality for nni_posix_pollq_remove() and nni_posix_pollq_fini() -// called while pq's lock is held -static void -nni_posix_pollq_remove_helper(nni_posix_pollq *pq, nni_posix_pollq_node *node) +void +nni_posix_pfd_close(nni_posix_pfd *pf) { - struct kevent kevents[2]; - - node->events = 0; - node->pq = NULL; + nni_posix_pollq *pq = pf->pq; - EV_SET(&kevents[0], (uintptr_t) node->fd, EVFILT_READ, EV_DELETE, 0, 0, - (kevent_udata_t) node); - - EV_SET(&kevents[1], (uintptr_t) node->fd, EVFILT_WRITE, EV_DELETE, 0, - 0, (kevent_udata_t) node); - - // So it turns out that we can get EBADF, ENOENT, and apparently - // also EINPROGRESS (new on macOS Sierra). Frankly, we're deleting - // an event, and its harmless if the event removal fails (worst - // case would be a spurious wakeup), so lets ignore it. - (void) kevent(pq->kq, kevents, 2, NULL, 0, NULL); + nni_mtx_lock(&pq->mtx); + if (!pf->closing) { + struct kevent ev[2]; + pf->closing = true; + EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf); + EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf); + (void) shutdown(pf->fd, SHUT_RDWR); + // This should never fail -- no allocations, just deletion. + (void) kevent(pq->kq, ev, 2, NULL, 0, NULL); + } + nni_mtx_unlock(&pq->mtx); } -// nni_posix_pollq_remove removes the node from the pollq, but -// does not ensure that the pollq node is safe to destroy. In particular, -// this function can be called from a callback (the callback may be active). void -nni_posix_pollq_remove(nni_posix_pollq_node *node) +nni_posix_pfd_fini(nni_posix_pfd *pf) { - nni_posix_pollq *pq = node->pq; + nni_posix_pollq *pq; - if (pq == NULL) { - return; - } + pq = pf->pq; - nni_mtx_lock(&pq->mtx); - nni_posix_pollq_remove_helper(pq, node); + nni_posix_pfd_close(pf); - if (pq->close) { - nni_cv_wake(&pq->cv); + if (!nni_thr_is_self(&pq->thr)) { + struct kevent ev; + nni_mtx_lock(&pq->mtx); + nni_list_append(&pq->reapq, pf); + EV_SET(&ev, 0, EVFILT_USER, EV_ENABLE, NOTE_TRIGGER, 0, NULL); + + // If this fails, the cleanup will stall. That should + // only occur in a memory pressure situation, and it + // will self-heal when the next event comes in. + (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL); + while (!pf->closed) { + nni_cv_wait(&pf->cv); + } + nni_mtx_unlock(&pq->mtx); } - nni_mtx_unlock(&pq->mtx); + + (void) close(pf->fd); + nni_cv_fini(&pf->cv); + nni_mtx_fini(&pf->mtx); + NNI_FREE_STRUCT(pf); } -// nni_posix_pollq_init merely ensures that the node is ready for use. -// It does not register the node with any pollq in particular. int -nni_posix_pollq_init(nni_posix_pollq_node *node) +nni_posix_pfd_fd(nni_posix_pfd *pf) { - NNI_ARG_UNUSED(node); - return (0); + return (pf->fd); } -// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does, -// but it also ensures that the callback is not active, so that the node -// may be deallocated. This function must not be called in a callback. void -nni_posix_pollq_fini(nni_posix_pollq_node *node) +nni_posix_pfd_set_cb(nni_posix_pfd *pf, nni_posix_pfd_cb cb, void *arg) { - nni_posix_pollq *pq = node->pq; - if (pq == NULL) { - return; - } - - nni_mtx_lock(&pq->mtx); - while (pq->active == node) { - pq->wait = node; - nni_cv_wait(&pq->cv); - } - - nni_posix_pollq_remove_helper(pq, node); - - if (pq->close) { - nni_cv_wake(&pq->cv); - } - nni_mtx_unlock(&pq->mtx); + NNI_ASSERT(cb != NULL); // must not be null when established. + nni_mtx_lock(&pf->mtx); + pf->cb = cb; + pf->data = arg; + nni_mtx_unlock(&pf->mtx); } -void -nni_posix_pollq_arm(nni_posix_pollq_node *node, int events) +int +nni_posix_pfd_arm(nni_posix_pfd *pf, int events) { - nni_posix_pollq *pq = node->pq; - struct kevent kevents[2]; - int nevents = 0; + struct kevent ev[2]; + int nev = 0; + unsigned flags = EV_ENABLE | EV_DISPATCH; + nni_posix_pollq *pq = pf->pq; + + nni_mtx_lock(&pf->mtx); + if (pf->closing) { + events = 0; + } else { + pf->events |= events; + events = pf->events; + } + nni_mtx_unlock(&pf->mtx); - NNI_ASSERT(pq != NULL); if (events == 0) { - return; + // No events, and kqueue is oneshot, so nothing to do. + return (0); } - nni_mtx_lock(&pq->mtx); - - if (!(node->events & POLLIN) && (events & POLLIN)) { - EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_READ, - EV_ENABLE | EV_DISPATCH, 0, 0, (kevent_udata_t) node); + if (events & POLLIN) { + EV_SET(&ev[nev++], pf->fd, EVFILT_READ, flags, 0, 0, pf); } - - if (!(node->events & POLLOUT) && (events & POLLOUT)) { - EV_SET(&kevents[nevents++], (uintptr_t) node->fd, EVFILT_WRITE, - EV_ENABLE | EV_DISPATCH, 0, 0, (kevent_udata_t) node); + if (events & POLLOUT) { + EV_SET(&ev[nev++], pf->fd, EVFILT_WRITE, flags, 0, 0, pf); } - - if (nevents > 0) { - // This call should never fail, really. The only possible - // legitimate failure would be ENOMEM, but in that case - // lots of other things are going to be failing, or ENOENT - // or ESRCH, indicating we already lost interest; the - // only consequence of ignoring these errors is that a given - // descriptor might appear "stuck". This beats the alternative - // of just blithely crashing the application with an assertion. - (void) kevent(pq->kq, kevents, nevents, NULL, 0, NULL); - node->events |= events; + while (kevent(pq->kq, ev, nev, NULL, 0, NULL) != 0) { + if (errno == EINTR) { + continue; + } + return (nni_plat_errno(errno)); } + return (0); +} +static void +nni_posix_pollq_reap(nni_posix_pollq *pq) +{ + nni_posix_pfd *pf; + nni_mtx_lock(&pq->mtx); + while ((pf = nni_list_first(&pq->reapq)) != NULL) { + nni_list_remove(&pq->reapq, pf); + pf->closed = true; + nni_cv_wake(&pf->cv); + } nni_mtx_unlock(&pq->mtx); } @@ -209,117 +216,71 @@ static void nni_posix_poll_thr(void *arg) { nni_posix_pollq *pq = arg; - struct kevent kevents[NNI_MAX_KQUEUE_EVENTS]; - - nni_mtx_lock(&pq->mtx); - while (!pq->close) { - int i; - int nevents; - - // block indefinitely, timers are handled separately - nni_mtx_unlock(&pq->mtx); - nevents = kevent( - pq->kq, NULL, 0, kevents, NNI_MAX_KQUEUE_EVENTS, NULL); - nni_mtx_lock(&pq->mtx); - - if (nevents < 0) { - continue; + for (;;) { + int n; + struct kevent evs[NNI_MAX_KQUEUE_EVENTS]; + nni_posix_pfd * pf; + nni_posix_pfd_cb cb; + void * cbarg; + int revents; + bool reap = false; + + n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL); + if (n < 0) { + if (errno == EBADF) { + nni_posix_pollq_reap(pq); + return; + } + reap = true; } - // dispatch events - for (i = 0; i < nevents; ++i) { - struct kevent ev_disable; - const struct kevent * ev; - nni_posix_pollq_node *node; + for (int i = 0; i < n; i++) { + struct kevent *ev = &evs[i]; - ev = &kevents[i]; - if (ev->filter == EVFILT_USER && - ev->ident == NNI_KQ_EV_EXIT_ID) { - // we've woken up to exit the polling thread + switch (ev->filter) { + case EVFILT_READ: + revents = POLLIN; break; - } - - node = (nni_posix_pollq_node *) ev->udata; - if (node->pq == NULL) { - // node was removed while we were blocking + case EVFILT_WRITE: + revents = POLLOUT; + break; + case EVFILT_USER: + default: + reap = true; continue; } - node->revents = 0; - + pf = (void *) ev->udata; if (ev->flags & (EV_ERROR | EV_EOF)) { - node->revents |= POLLHUP; - } - if (ev->filter == EVFILT_WRITE) { - node->revents |= POLLOUT; - } else if (ev->filter == EVFILT_READ) { - node->revents |= POLLIN; - } else { - NNI_ASSERT(false); // unhandled filter - break; + revents |= POLLHUP; } - // explicitly disable this event. we'd ideally rely on - // the behavior of EV_DISPATCH to do this, - // but that only happens once we've acknowledged the - // event by reading/or writing the fd. because there - // can currently be some latency between the time we - // receive this event and the time we read/write in - // response, disable the event in the meantime to avoid - // needless wakeups. - // revisit if we're able to reduce/remove this latency. - EV_SET(&ev_disable, (uintptr_t) node->fd, ev->filter, - EV_DISABLE, 0, 0, NULL); - // this will only fail if the fd is already - // closed/invalid which we don't mind anyway, - // so ignore return value. - (void) kevent(pq->kq, &ev_disable, 1, NULL, 0, NULL); - - // mark events as cleared - node->events &= ~node->revents; - - // Save the active node; we can notice this way - // when it is busy, and avoid freeing it until - // we are sure that it is not in use. - pq->active = node; - - // Execute the callback with lock released - nni_mtx_unlock(&pq->mtx); - node->cb(node->data); - nni_mtx_lock(&pq->mtx); - - // We finished with this node. If something - // was blocked waiting for that, wake it up. - pq->active = NULL; - if (pq->wait == node) { - pq->wait = NULL; - nni_cv_wake(&pq->cv); + nni_mtx_lock(&pf->mtx); + cb = pf->cb; + cbarg = pf->data; + pf->events &= ~(revents); + nni_mtx_unlock(&pf->mtx); + + if (cb != NULL) { + cb(pf, revents, cbarg); } } + if (reap) { + nni_posix_pollq_reap(pq); + } } - - nni_mtx_unlock(&pq->mtx); } static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { - if (pq->started) { - struct kevent ev; - EV_SET(&ev, NNI_KQ_EV_EXIT_ID, EVFILT_USER, EV_ENABLE, - NOTE_TRIGGER, 0, NULL); - nni_mtx_lock(&pq->mtx); - pq->close = true; - pq->started = false; - (void) kevent(pq->kq, &ev, 1, NULL, 0, NULL); - nni_mtx_unlock(&pq->mtx); - } - nni_thr_fini(&pq->thr); - if (pq->kq >= 0) { close(pq->kq); pq->kq = -1; } + nni_thr_fini(&pq->thr); + + nni_posix_pollq_reap(pq); nni_mtx_fini(&pq->mtx); } @@ -327,10 +288,17 @@ nni_posix_pollq_destroy(nni_posix_pollq *pq) static int nni_posix_pollq_add_wake_evt(nni_posix_pollq *pq) { - // add user event so we can wake ourself on exit + int rv; struct kevent ev; - EV_SET(&ev, NNI_KQ_EV_EXIT_ID, EVFILT_USER, EV_ADD, 0, 0, NULL); - return (nni_plat_errno(kevent(pq->kq, &ev, 1, NULL, 0, NULL))); + + EV_SET(&ev, 0, EVFILT_USER, EV_ADD, 0, 0, NULL); + while ((rv = kevent(pq->kq, &ev, 1, NULL, 0, NULL)) != 0) { + if (errno == EINTR) { + continue; + } + return (nni_plat_errno(errno)); + } + return (0); } static int @@ -342,10 +310,8 @@ nni_posix_pollq_create(nni_posix_pollq *pq) return (nni_plat_errno(errno)); } - pq->close = false; - nni_mtx_init(&pq->mtx); - nni_cv_init(&pq->cv, &pq->mtx); + NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); if (((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) || (rv = nni_posix_pollq_add_wake_evt(pq)) != 0) { @@ -353,24 +319,14 @@ nni_posix_pollq_create(nni_posix_pollq *pq) return (rv); } - pq->started = true; nni_thr_run(&pq->thr); return (0); } -// single global instance for now -static nni_posix_pollq nni_posix_global_pollq; - -nni_posix_pollq * -nni_posix_pollq_get(int fd) -{ - NNI_ARG_UNUSED(fd); - return (&nni_posix_global_pollq); -} - int nni_posix_pollq_sysinit(void) { + return (nni_posix_pollq_create(&nni_posix_global_pollq)); } diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index efc9ff48..26753df6 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -11,8 +11,6 @@ #include "core/nng_impl.h" #include "platform/posix/posix_pollq.h" -#ifdef NNG_USE_POSIX_POLLQ_POLL - #include <errno.h> #include <fcntl.h> #include <poll.h> @@ -32,306 +30,285 @@ // nni_posix_pollq is a work structure used by the poller thread, that keeps // track of all the underlying pipe handles and so forth being used by poll(). + +// Locking strategy: We use the pollq lock to guard the lists on the pollq, +// the nfds (which counts the number of items in the pollq), the pollq +// shutdown flags (pq->closing and pq->closed) and the cv on each pfd. We +// use a lock on the pfd only to protect the the events field (which we treat +// as an atomic bitfield), and the cb and arg pointers. Note that the pfd +// lock is therefore a leaf lock, which is sometimes acquired while holding +// the pq lock. Every reasonable effort is made to minimize holding locks. +// (Btw, pfd->fd is not guarded, because it is set at pfd creation and +// persists until the pfd is destroyed.) + +typedef struct nni_posix_pollq nni_posix_pollq; + struct nni_posix_pollq { - nni_mtx mtx; - nni_cv cv; - struct pollfd * fds; - int nfds; - int wakewfd; // write side of waker pipe - int wakerfd; // read side of waker pipe - int close; // request for worker to exit - int started; - nni_thr thr; // worker thread - nni_list polled; // polled nodes - nni_list armed; // armed nodes - nni_list idle; // idle nodes - int nnodes; // num of nodes in nodes list - int inpoll; // poller asleep in poll - nni_posix_pollq_node *wait; // cancel waiting on this - nni_posix_pollq_node *active; // active node (in callback) + nni_mtx mtx; + int nfds; + int wakewfd; // write side of waker pipe + int wakerfd; // read side of waker pipe + bool closing; // request for worker to exit + bool closed; + nni_thr thr; // worker thread + nni_list pollq; // armed nodes + nni_list reapq; }; -static int -nni_posix_pollq_poll_grow(nni_posix_pollq *pq) -{ - int grow = pq->nnodes + 2; // one for us, one for waker - struct pollfd *newfds; +struct nni_posix_pfd { + nni_posix_pollq *pq; + int fd; + nni_list_node node; + nni_cv cv; + nni_mtx mtx; + int events; + nni_posix_pfd_cb cb; + void * arg; +}; + +static nni_posix_pollq nni_posix_global_pollq; - if (grow < pq->nfds) { - return (0); +int +nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) +{ + nni_posix_pfd * pfd; + nni_posix_pollq *pq = &nni_posix_global_pollq; + + // Set this is as soon as possible (narrow the close-exec race as + // much as we can; better options are system calls that suppress + // this behavior from descriptor creation.) + (void) fcntl(fd, F_SETFD, FD_CLOEXEC); + (void) fcntl(fd, F_SETFL, O_NONBLOCK); +#ifdef SO_NOSIGPIPE + // Darwin lacks MSG_NOSIGNAL, but has a socket option. + // If this code is getting used, you really should be using the + // kqueue poller, or you need to upgrade your older system. + int one = 1; + (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)); +#endif + + if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) { + return (NNG_ENOMEM); } + NNI_LIST_NODE_INIT(&pfd->node); + nni_mtx_init(&pfd->mtx); + nni_cv_init(&pfd->cv, &pq->mtx); + pfd->fd = fd; + pfd->events = 0; + pfd->cb = NULL; + pfd->arg = NULL; + pfd->pq = pq; + nni_mtx_lock(&pq->mtx); + if (pq->closing) { + nni_mtx_unlock(&pq->mtx); + nni_cv_fini(&pfd->cv); + nni_mtx_fini(&pfd->mtx); + NNI_FREE_STRUCT(pfd); + return (NNG_ECLOSED); + } + nni_list_append(&pq->pollq, pfd); + pq->nfds++; + nni_mtx_unlock(&pq->mtx); + *pfdp = pfd; + return (0); +} - grow = grow + 128; +void +nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg) +{ + nni_mtx_lock(&pfd->mtx); + pfd->cb = cb; + pfd->arg = arg; + nni_mtx_unlock(&pfd->mtx); +} - if ((newfds = NNI_ALLOC_STRUCTS(newfds, grow)) == NULL) { - return (NNG_ENOMEM); +int +nni_posix_pfd_fd(nni_posix_pfd *pfd) +{ + return (pfd->fd); +} + +void +nni_posix_pfd_close(nni_posix_pfd *pfd) +{ + (void) shutdown(pfd->fd, SHUT_RDWR); +} + +void +nni_posix_pfd_fini(nni_posix_pfd *pfd) +{ + nni_posix_pollq *pq = pfd->pq; + + nni_posix_pfd_close(pfd); + + nni_mtx_lock(&pq->mtx); + if (nni_list_active(&pq->pollq, pfd)) { + nni_list_remove(&pq->pollq, pfd); + pq->nfds--; } - if (pq->nfds != 0) { - NNI_FREE_STRUCTS(pq->fds, pq->nfds); + if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) { + nni_list_append(&pq->reapq, pfd); + nni_plat_pipe_raise(pq->wakewfd); + while (nni_list_active(&pq->reapq, pfd)) { + nni_cv_wait(&pfd->cv); + } } - pq->fds = newfds; - pq->nfds = grow; + nni_mtx_unlock(&pq->mtx); + + // We're exclusive now. + (void) close(pfd->fd); + nni_cv_fini(&pfd->cv); + nni_mtx_fini(&pfd->mtx); + NNI_FREE_STRUCT(pfd); +} + +int +nni_posix_pfd_arm(nni_posix_pfd *pfd, int events) +{ + nni_posix_pollq *pq = pfd->pq; + nni_mtx_lock(&pfd->mtx); + pfd->events |= events; + nni_mtx_unlock(&pfd->mtx); + + // If we're running on the callback, then don't bother to kick + // the pollq again. This is necessary because we cannot modify + // the poller while it is polling. + if (!nni_thr_is_self(&pq->thr)) { + nni_plat_pipe_raise(pq->wakewfd); + } return (0); } static void nni_posix_poll_thr(void *arg) { - nni_posix_pollq * pollq = arg; - nni_posix_pollq_node *node; + nni_posix_pollq *pq = arg; + int nalloc = 0; + struct pollfd * fds = NULL; + nni_posix_pfd ** pfds = NULL; - nni_mtx_lock(&pollq->mtx); for (;;) { - int rv; int nfds; - struct pollfd *fds; + int events; + nni_posix_pfd *pfd; - if (pollq->close) { - break; + nni_mtx_lock(&pq->mtx); + while (nalloc < (pq->nfds + 1)) { + int n = pq->nfds + 128; + + // Drop the lock while we sleep or allocate. This + // allows additional items to be added or removed (!) + // while we wait. + nni_mtx_unlock(&pq->mtx); + + // Toss the old ones first; avoids *doubling* memory + // consumption during alloc. + NNI_FREE_STRUCTS(fds, nalloc); + NNI_FREE_STRUCTS(pfds, nalloc); + nalloc = 0; + + if ((pfds = NNI_ALLOC_STRUCTS(pfds, n)) == NULL) { + nni_msleep(10); // sleep for a bit, try later + } else if ((fds = NNI_ALLOC_STRUCTS(fds, n)) == NULL) { + NNI_FREE_STRUCTS(pfds, n); + nni_msleep(10); + } else { + nalloc = n; + } + nni_mtx_lock(&pq->mtx); } - fds = pollq->fds; - nfds = 0; - // The waker pipe is set up so that we will be woken // when it is written (this allows us to be signaled). - fds[nfds].fd = pollq->wakerfd; - fds[nfds].events = POLLIN; - fds[nfds].revents = 0; - nfds++; + fds[0].fd = pq->wakerfd; + fds[0].events = POLLIN; + fds[0].revents = 0; + pfds[0] = NULL; + nfds = 1; + + // Also lets reap anything that was in the reaplist! + while ((pfd = nni_list_first(&pq->reapq)) != NULL) { + nni_list_remove(&pq->reapq, pfd); + nni_cv_wake(&pfd->cv); + } - // Set up the poll list. - while ((node = nni_list_first(&pollq->armed)) != NULL) { - nni_list_remove(&pollq->armed, node); - nni_list_append(&pollq->polled, node); - fds[nfds].fd = node->fd; - fds[nfds].events = node->events; - fds[nfds].revents = 0; - node->index = nfds; - nfds++; + // If we're closing down, bail now. This is done *after* we + // have ensured that the reapq is empty. Anything still in + // the pollq is not going to receive further callbacks. + if (pq->closing) { + pq->closed = true; + nni_mtx_unlock(&pq->mtx); + break; } - // Now poll it. We block indefinitely, since we use separate - // timeouts to wake and remove the elements from the list. - pollq->inpoll = 1; - nni_mtx_unlock(&pollq->mtx); - rv = poll(fds, nfds, -1); - nni_mtx_lock(&pollq->mtx); - pollq->inpoll = 0; - - if (rv < 0) { - // This shouldn't happen really. If it does, we - // just try again. (EINTR is probably the only - // reasonable failure here, unless internal memory - // allocations fail in the kernel, leading to EAGAIN.) - continue; + // Set up the poll list. + NNI_LIST_FOREACH (&pq->pollq, pfd) { + + nni_mtx_lock(&pfd->mtx); + events = pfd->events; + nni_mtx_unlock(&pfd->mtx); + + if (events != 0) { + fds[nfds].fd = pfd->fd; + fds[nfds].events = events; + fds[nfds].revents = 0; + pfds[nfds] = pfd; + nfds++; + } } + nni_mtx_unlock(&pq->mtx); + + // We could get the result from poll, and avoid iterating + // over the entire set of pollfds, but since on average we + // will be walking half the list, doubling the work we do + // (the condition with a potential pipeline stall) seems like + // adding complexity with no real benefit. It also makes the + // worst case even worse. + (void) poll(fds, nfds, -1); // If the waker pipe was signaled, read from it. if (fds[0].revents & POLLIN) { - NNI_ASSERT(fds[0].fd == pollq->wakerfd); - nni_plat_pipe_clear(pollq->wakerfd); + NNI_ASSERT(fds[0].fd == pq->wakerfd); + nni_plat_pipe_clear(pq->wakerfd); } - while ((node = nni_list_first(&pollq->polled)) != NULL) { - int index = node->index; - - // We remove ourselves from the polled list, and - // then put it on either the idle or armed list - // depending on whether it remains armed. - node->index = 0; - nni_list_remove(&pollq->polled, node); - NNI_ASSERT(index > 0); - if (fds[index].revents == 0) { - // If still watching for events, return it - // to the armed list. - if (node->events) { - nni_list_append(&pollq->armed, node); - } else { - nni_list_append(&pollq->idle, node); - } - continue; - } + for (int i = 1; i < nfds; i++) { + if ((events = fds[i].revents) != 0) { + nni_posix_pfd_cb cb; + void * arg; - // We are calling the callback, so disarm - // all events; the node can rearm them in its - // callback. - node->revents = fds[index].revents; - node->events &= ~node->revents; - if (node->events == 0) { - nni_list_append(&pollq->idle, node); - } else { - nni_list_append(&pollq->armed, node); - } + pfd = pfds[i]; + + nni_mtx_lock(&pfd->mtx); + cb = pfd->cb; + arg = pfd->arg; + pfd->events &= ~events; + nni_mtx_unlock(&pfd->mtx); - // Save the active node; we can notice this way - // when it is busy, and avoid freeing it until - // we are sure that it is not in use. - pollq->active = node; - - // Execute the callback -- without locks held. - nni_mtx_unlock(&pollq->mtx); - node->cb(node->data); - nni_mtx_lock(&pollq->mtx); - - // We finished with this node. If something - // was blocked waiting for that, wake it up. - pollq->active = NULL; - if (pollq->wait == node) { - pollq->wait = NULL; - nni_cv_wake(&pollq->cv); + if (cb) { + cb(pfd, events, arg); + } } } } - nni_mtx_unlock(&pollq->mtx); -} - -int -nni_posix_pollq_add(nni_posix_pollq_node *node) -{ - int rv; - nni_posix_pollq *pq; - - NNI_ASSERT(!nni_list_node_active(&node->node)); - pq = nni_posix_pollq_get(node->fd); - if (node->pq != NULL) { - return (NNG_ESTATE); - } - - nni_mtx_lock(&pq->mtx); - if (pq->close) { - // This shouldn't happen! - nni_mtx_unlock(&pq->mtx); - return (NNG_ECLOSED); - } - node->pq = pq; - if ((rv = nni_posix_pollq_poll_grow(pq)) != 0) { - nni_mtx_unlock(&pq->mtx); - return (rv); - } - pq->nnodes++; - nni_list_append(&pq->idle, node); - nni_mtx_unlock(&pq->mtx); - return (0); + NNI_FREE_STRUCTS(fds, nalloc); + NNI_FREE_STRUCTS(pfds, nalloc); } -// nni_posix_pollq_remove removes the node from the pollq, but -// does not ensure that the pollq node is safe to destroy. In particular, -// this function can be called from a callback (the callback may be active). -void -nni_posix_pollq_remove(nni_posix_pollq_node *node) +static void +nni_posix_pollq_destroy(nni_posix_pollq *pq) { - nni_posix_pollq *pq = node->pq; - - if (pq == NULL) { - return; - } - node->pq = NULL; nni_mtx_lock(&pq->mtx); - if (nni_list_node_active(&node->node)) { - nni_list_node_remove(&node->node); - pq->nnodes--; - } - if (pq->close) { - nni_cv_wake(&pq->cv); - } + pq->closing = 1; nni_mtx_unlock(&pq->mtx); -} - -// nni_posix_pollq_init merely ensures that the node is ready for use. -// It does not register the node with any pollq in particular. -int -nni_posix_pollq_init(nni_posix_pollq_node *node) -{ - NNI_LIST_NODE_INIT(&node->node); - return (0); -} - -// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does, -// but it also ensures that the callback is not active, so that the node -// may be deallocated. This function must not be called in a callback. -void -nni_posix_pollq_fini(nni_posix_pollq_node *node) -{ - nni_posix_pollq *pq = node->pq; - if (pq == NULL) { - return; - } - node->pq = NULL; - nni_mtx_lock(&pq->mtx); - while (pq->active == node) { - pq->wait = node; - nni_cv_wait(&pq->cv); - } - if (nni_list_node_active(&node->node)) { - nni_list_node_remove(&node->node); - pq->nnodes--; - } - if (pq->close) { - nni_cv_wake(&pq->cv); - } - nni_mtx_unlock(&pq->mtx); -} + nni_plat_pipe_raise(pq->wakewfd); -void -nni_posix_pollq_arm(nni_posix_pollq_node *node, int events) -{ - nni_posix_pollq *pq = node->pq; - int oevents; - - NNI_ASSERT(pq != NULL); - - nni_mtx_lock(&pq->mtx); - oevents = node->events; - node->events |= events; - - // We move this to the armed list if its not armed, or already - // on the polled list. The polled list would be the case where - // the index is set to a positive value. - if ((oevents == 0) && (events != 0) && (node->index < 1)) { - nni_list_node_remove(&node->node); - nni_list_append(&pq->armed, node); - } - if ((events != 0) && (oevents != events)) { - // Possibly wake up poller since we're looking for new events. - if (pq->inpoll) { - nni_plat_pipe_raise(pq->wakewfd); - } - } - nni_mtx_unlock(&pq->mtx); -} - -static void -nni_posix_pollq_destroy(nni_posix_pollq *pq) -{ - if (pq->started) { - nni_mtx_lock(&pq->mtx); - pq->close = 1; - pq->started = 0; - nni_plat_pipe_raise(pq->wakewfd); - nni_mtx_unlock(&pq->mtx); - } nni_thr_fini(&pq->thr); - - // All pipes should have been closed before this is called. - NNI_ASSERT(nni_list_empty(&pq->polled)); - NNI_ASSERT(nni_list_empty(&pq->armed)); - NNI_ASSERT(nni_list_empty(&pq->idle)); - NNI_ASSERT(pq->nnodes == 0); - - if (pq->wakewfd >= 0) { - nni_plat_pipe_close(pq->wakewfd, pq->wakerfd); - pq->wakewfd = pq->wakerfd = -1; - } - if (pq->nfds != 0) { - NNI_FREE_STRUCTS(pq->fds, pq->nfds); - pq->fds = NULL; - pq->nfds = 0; - } + nni_plat_pipe_close(pq->wakewfd, pq->wakerfd); nni_mtx_fini(&pq->mtx); } @@ -340,50 +317,27 @@ nni_posix_pollq_create(nni_posix_pollq *pq) { int rv; - NNI_LIST_INIT(&pq->polled, nni_posix_pollq_node, node); - NNI_LIST_INIT(&pq->armed, nni_posix_pollq_node, node); - NNI_LIST_INIT(&pq->idle, nni_posix_pollq_node, node); - pq->wakewfd = -1; - pq->wakerfd = -1; - pq->close = 0; - - nni_mtx_init(&pq->mtx); - nni_cv_init(&pq->cv, &pq->mtx); + NNI_LIST_INIT(&pq->pollq, nni_posix_pfd, node); + NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node); + pq->closing = false; + pq->closed = false; - if (((rv = nni_posix_pollq_poll_grow(pq)) != 0) || - ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) || - ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) { - nni_posix_pollq_destroy(pq); + if ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) { return (rv); } - pq->started = 1; + if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) { + nni_plat_pipe_close(pq->wakewfd, pq->wakerfd); + return (rv); + } + nni_mtx_init(&pq->mtx); nni_thr_run(&pq->thr); return (0); } -// We use a single pollq for the entire system, which means only a single -// thread is polling. This may be somewhat less than optimally efficient, -// and it may be worth investigating having multiple threads to improve -// efficiency and scalability. (This would shorten the linked lists, -// improving C10K scalability, and also allow us to engage multiple cores.) -// It's not entirely clear how many threads are "optimal". -static nni_posix_pollq nni_posix_global_pollq; - -nni_posix_pollq * -nni_posix_pollq_get(int fd) -{ - NNI_ARG_UNUSED(fd); - // This is the point where we could choose a pollq based on FD. - return (&nni_posix_global_pollq); -} - int nni_posix_pollq_sysinit(void) { - int rv; - - rv = nni_posix_pollq_create(&nni_posix_global_pollq); - return (rv); + return (nni_posix_pollq_create(&nni_posix_global_pollq)); } void @@ -391,5 +345,3 @@ nni_posix_pollq_sysfini(void) { nni_posix_pollq_destroy(&nni_posix_global_pollq); } - -#endif // NNG_USE_POSIX_POLLQ_POLL diff --git a/src/platform/posix/posix_pollq_port.c b/src/platform/posix/posix_pollq_port.c index 7231cb27..63c1d1d1 100644 --- a/src/platform/posix/posix_pollq_port.c +++ b/src/platform/posix/posix_pollq_port.c @@ -1,7 +1,6 @@ // // Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> -// Copyright 2018 Liam Staskawicz <liam@stask.net> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -12,7 +11,9 @@ #ifdef NNG_HAVE_PORT_CREATE #include <errno.h> +#include <fcntl.h> #include <port.h> +#include <sched.h> #include <stdbool.h> #include <stdio.h> #include <string.h> /* for strerror() */ @@ -21,6 +22,9 @@ #include "core/nng_impl.h" #include "platform/posix/posix_pollq.h" +#define NNI_MAX_PORTEV 64 +typedef struct nni_posix_pollq nni_posix_pollq; + // nni_posix_pollq is a work structure that manages state for the port-event // based pollq implementation. We only really need to keep track of the // single thread, and the associated port itself. @@ -29,190 +33,178 @@ struct nni_posix_pollq { nni_thr thr; // worker thread }; +struct nni_posix_pfd { + nni_posix_pollq *pq; + int fd; + nni_mtx mtx; + nni_cv cv; + int events; + bool closed; + bool closing; + nni_posix_pfd_cb cb; + void * data; +}; + +// single global instance for now +static nni_posix_pollq nni_posix_global_pollq; + int -nni_posix_pollq_add(nni_posix_pollq_node *node) +nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) { nni_posix_pollq *pq; + nni_posix_pfd * pfd; - pq = nni_posix_pollq_get(node->fd); - if (pq == NULL) { - return (NNG_EINVAL); - } + pq = &nni_posix_global_pollq; - nni_mtx_lock(&node->mx); - // ensure node was not previously associated with a pollq - if (node->pq != NULL) { - nni_mtx_unlock(&node->mx); - return (NNG_ESTATE); + if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) { + return (NNG_ENOMEM); } - - node->pq = pq; - node->events = 0; - node->armed = false; - nni_mtx_unlock(&node->mx); - + (void) fcntl(fd, F_SETFD, FD_CLOEXEC); + (void) fcntl(fd, F_SETFL, O_NONBLOCK); + + nni_mtx_init(&pfd->mtx); + nni_cv_init(&pfd->cv, &pfd->mtx); + pfd->closed = false; + pfd->closing = false; + pfd->fd = fd; + pfd->pq = pq; + pfd->cb = NULL; + pfd->data = NULL; + *pfdp = pfd; return (0); } -// nni_posix_pollq_remove removes the node from the pollq, but -// does not ensure that the pollq node is safe to destroy. In particular, -// this function can be called from a callback (the callback may be active). -void -nni_posix_pollq_remove(nni_posix_pollq_node *node) +int +nni_posix_pfd_fd(nni_posix_pfd *pfd) { - nni_posix_pollq *pq = node->pq; + return (pfd->fd); +} - if (pq == NULL) { - return; - } +void +nni_posix_pfd_close(nni_posix_pfd *pfd) +{ + nni_posix_pollq *pq = pfd->pq; - nni_mtx_lock(&node->mx); - node->events = 0; - if (node->armed) { - // Failure modes that can occur are uninteresting. - (void) port_dissociate(pq->port, PORT_SOURCE_FD, node->fd); - node->armed = false; + nni_mtx_lock(&pfd->mtx); + if (!pfd->closing) { + pfd->closing = true; + (void) shutdown(pfd->fd, SHUT_RDWR); + port_dissociate(pq->port, PORT_SOURCE_FD, pfd->fd); } - nni_mtx_unlock(&node->mx); -} + nni_mtx_unlock(&pfd->mtx); -// nni_posix_pollq_init merely ensures that the node is ready for use. -// It does not register the node with any pollq in particular. -int -nni_posix_pollq_init(nni_posix_pollq_node *node) -{ - nni_mtx_init(&node->mx); - nni_cv_init(&node->cv, &node->mx); - node->pq = NULL; - node->armed = false; - NNI_LIST_NODE_INIT(&node->node); - return (0); + // Send the wake event to the poller to synchronize with it. + // Note that port_send should only really fail if out of memory + // or we run into a resource limit. } -// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does, -// but it also ensures that the node is removed properly. void -nni_posix_pollq_fini(nni_posix_pollq_node *node) +nni_posix_pfd_fini(nni_posix_pfd *pfd) { - nni_posix_pollq *pq = node->pq; + nni_posix_pollq *pq = pfd->pq; - nni_mtx_lock(&node->mx); - if ((pq = node->pq) != NULL) { - // Dissociate the port; if it isn't already associated we - // don't care. (An extra syscall, but it should not matter.) - (void) port_dissociate(pq->port, PORT_SOURCE_FD, node->fd); - node->armed = false; + nni_posix_pfd_close(pfd); - for (;;) { - if (port_send(pq->port, 0, node) == 0) { - break; - } - switch (errno) { - case EAGAIN: - case ENOMEM: - // Resource exhaustion. - // Best bet in these cases is to sleep it off. - // This may appear like a total application - // hang, but by sleeping here maybe we give - // a chance for things to clear up. - nni_mtx_unlock(&node->mx); - nni_msleep(5000); - nni_mtx_lock(&node->mx); - continue; - case EBADFD: - case EBADF: - // Most likely these indicate that the pollq - // itself has been closed. That's ok. + if (!nni_thr_is_self(&pq->thr)) { + + while (port_send(pq->port, 1, pfd) != 0) { + if ((errno == EBADF) || (errno == EBADFD)) { + pfd->closed = true; break; } + sched_yield(); // try again later... } - // Wait for the pollq thread to tell us with certainty that - // they are done. This is needed to ensure that the pollq - // thread isn't executing (or about to execute) the callback - // before we destroy it. - while (node->pq != NULL) { - nni_cv_wait(&node->cv); + + nni_mtx_lock(&pfd->mtx); + while (!pfd->closed) { + nni_cv_wait(&pfd->cv); } + nni_mtx_unlock(&pfd->mtx); } - nni_mtx_unlock(&node->mx); - nni_cv_fini(&node->cv); - nni_mtx_fini(&node->mx); + + // We're exclusive now. + (void) close(pfd->fd); + nni_cv_fini(&pfd->cv); + nni_mtx_fini(&pfd->mtx); + NNI_FREE_STRUCT(pfd); } -void -nni_posix_pollq_arm(nni_posix_pollq_node *node, int events) +int +nni_posix_pfd_arm(nni_posix_pfd *pfd, int events) { - nni_posix_pollq *pq = node->pq; - - NNI_ASSERT(pq != NULL); - if (events == 0) { - return; + nni_posix_pollq *pq = pfd->pq; + + nni_mtx_lock(&pfd->mtx); + if (!pfd->closing) { + pfd->events |= events; + if (port_associate(pq->port, PORT_SOURCE_FD, pfd->fd, + pfd->events, pfd) != 0) { + int rv = nni_plat_errno(errno); + nni_mtx_unlock(&pfd->mtx); + return (rv); + } } - - nni_mtx_lock(&node->mx); - node->events |= events; - node->armed = true; - (void) port_associate( - pq->port, PORT_SOURCE_FD, node->fd, node->events, node); - - // Possible errors here are: - // - // EBADF -- programming error on our part - // EBADFD -- programming error on our part - // ENOMEM -- not much we can do here - // EAGAIN -- too many port events registered (65K!!) - // - // For now we ignore them all. (We need to be able to return - // errors to our caller.) Effect on the application will appear - // to be a stalled file descriptor (no notifications). - nni_mtx_unlock(&node->mx); + nni_mtx_unlock(&pfd->mtx); + return (0); } static void nni_posix_poll_thr(void *arg) { - for (;;) { - nni_posix_pollq * pq = arg; - port_event_t ev; - nni_posix_pollq_node *node; - - if (port_get(pq->port, &ev, NULL) != 0) { + nni_posix_pollq *pq = arg; + port_event_t ev[NNI_MAX_PORTEV]; + nni_posix_pfd * pfd; + int events; + nni_posix_pfd_cb cb; + void * arg; + unsigned n; + + n = 1; // wake us even on just one event + if (port_getn(pq->port, ev, NNI_MAX_PORTEV, &n, NULL) != 0) { if (errno == EINTR) { continue; } return; } - switch (ev.portev_source) { - case PORT_SOURCE_ALERT: - return; - - case PORT_SOURCE_FD: - node = ev.portev_user; + // We run through the returned ports twice. First we + // get the callbacks. Then we do the reaps. This way + // we ensure that we only reap *after* callbacks have run. + for (unsigned i = 0; i < n; i++) { + if (ev[i].portev_source != PORT_SOURCE_FD) { + continue; + } + pfd = ev[i].portev_user; + events = ev[i].portev_events; - nni_mtx_lock(&node->mx); - node->revents = ev.portev_events; - // mark events as cleared - node->events &= ~node->revents; - node->armed = false; - nni_mtx_unlock(&node->mx); + nni_mtx_lock(&pfd->mtx); + cb = pfd->cb; + arg = pfd->data; + pfd->events &= ~events; + nni_mtx_unlock(&pfd->mtx); - node->cb(node->data); - break; + if (cb != NULL) { + cb(pfd, events, arg); + } + } + for (unsigned i = 0; i < n; i++) { + if (ev[i].portev_source != PORT_SOURCE_USER) { + continue; + } - case PORT_SOURCE_USER: // User event telling us to stop doing things. - // We signal back to use this as a coordination event - // between the pollq and the thread handler. - // NOTE: It is absolutely critical that there is only - // a single thread per pollq. Otherwise we cannot - // be sure that we are blocked completely, - node = ev.portev_user; - nni_mtx_lock(&node->mx); - node->pq = NULL; - nni_cv_wake(&node->cv); - nni_mtx_unlock(&node->mx); + // We signal back to use this as a coordination + // event between the pollq and the thread + // handler. NOTE: It is absolutely critical + // that there is only a single thread per + // pollq. Otherwise we cannot be sure that we + // are blocked completely, + pfd = ev[i].portev_user; + nni_mtx_lock(&pfd->mtx); + pfd->closed = true; + nni_cv_wake(&pfd->cv); + nni_mtx_unlock(&pfd->mtx); } } } @@ -220,7 +212,6 @@ nni_posix_poll_thr(void *arg) static void nni_posix_pollq_destroy(nni_posix_pollq *pq) { - port_alert(pq->port, PORT_ALERT_SET, 1, NULL); (void) close(pq->port); nni_thr_fini(&pq->thr); } @@ -243,14 +234,15 @@ nni_posix_pollq_create(nni_posix_pollq *pq) return (0); } -// single global instance for now -static nni_posix_pollq nni_posix_global_pollq; - -nni_posix_pollq * -nni_posix_pollq_get(int fd) +void +nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg) { - NNI_ARG_UNUSED(fd); - return (&nni_posix_global_pollq); + NNI_ASSERT(cb != NULL); // must not be null when established. + + nni_mtx_lock(&pfd->mtx); + pfd->cb = cb; + pfd->data = arg; + nni_mtx_unlock(&pfd->mtx); } int diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index d86a2008..96d8debd 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -37,50 +37,51 @@ #define NNG_POSIX_RESOLV_CONCURRENCY 4 #endif -static nni_taskq *nni_posix_resolv_tq = NULL; -static nni_mtx nni_posix_resolv_mtx; - -typedef struct nni_posix_resolv_item nni_posix_resolv_item; -struct nni_posix_resolv_item { - int family; - int passive; - const char *name; - const char *serv; - int proto; - nni_aio * aio; - nni_task task; +static nni_taskq *resolv_tq = NULL; +static nni_mtx resolv_mtx; + +typedef struct resolv_item resolv_item; +struct resolv_item { + int family; + int passive; + const char * name; + const char * serv; + int proto; + nni_aio * aio; + nni_task * task; + nng_sockaddr sa; }; static void -nni_posix_resolv_finish(nni_posix_resolv_item *item, int rv) +resolv_finish(resolv_item *item, int rv) { nni_aio *aio; - if ((aio = item->aio) != NULL) { - if (nni_aio_get_prov_data(aio) == item) { - nni_aio_set_prov_data(aio, NULL); - item->aio = NULL; - nni_aio_finish(aio, rv, 0); - NNI_FREE_STRUCT(item); - } + if (((aio = item->aio) != NULL) && + (nni_aio_get_prov_data(aio) == item)) { + nng_sockaddr *sa = nni_aio_get_input(aio, 0); + nni_aio_set_prov_data(aio, NULL); + item->aio = NULL; + memcpy(sa, &item->sa, sizeof(*sa)); + nni_aio_finish(aio, rv, 0); + nni_task_fini(item->task); + NNI_FREE_STRUCT(item); } } static void -nni_posix_resolv_cancel(nni_aio *aio, int rv) +resolv_cancel(nni_aio *aio, int rv) { - nni_posix_resolv_item *item; + resolv_item *item; - nni_mtx_lock(&nni_posix_resolv_mtx); + nni_mtx_lock(&resolv_mtx); if ((item = nni_aio_get_prov_data(aio)) == NULL) { - nni_mtx_unlock(&nni_posix_resolv_mtx); + nni_mtx_unlock(&resolv_mtx); return; } nni_aio_set_prov_data(aio, NULL); item->aio = NULL; - nni_mtx_unlock(&nni_posix_resolv_mtx); - nni_task_cancel(&item->task); - NNI_FREE_STRUCT(item); + nni_mtx_unlock(&resolv_mtx); nni_aio_finish_error(aio, rv); } @@ -116,14 +117,23 @@ nni_posix_gai_errno(int rv) } static void -nni_posix_resolv_task(void *arg) +resolv_task(void *arg) { - nni_posix_resolv_item *item = arg; - nni_aio * aio = item->aio; - struct addrinfo hints; - struct addrinfo * results; - struct addrinfo * probe; - int rv; + resolv_item * item = arg; + struct addrinfo hints; + struct addrinfo *results; + struct addrinfo *probe; + int rv; + + nni_mtx_lock(&resolv_mtx); + if (item->aio == NULL) { + nni_mtx_unlock(&resolv_mtx); + // Caller canceled, and no longer cares about this. + nni_task_fini(item->task); + NNI_FREE_STRUCT(item); + return; + } + nni_mtx_unlock(&resolv_mtx); results = NULL; @@ -170,7 +180,7 @@ nni_posix_resolv_task(void *arg) if (probe != NULL) { struct sockaddr_in * sin; struct sockaddr_in6 *sin6; - nng_sockaddr * sa = nni_aio_get_input(aio, 0); + nng_sockaddr * sa = &item->sa; switch (probe->ai_addr->sa_family) { case AF_INET: @@ -196,17 +206,18 @@ done: freeaddrinfo(results); } - nni_mtx_lock(&nni_posix_resolv_mtx); - nni_posix_resolv_finish(item, rv); - nni_mtx_unlock(&nni_posix_resolv_mtx); + nni_mtx_lock(&resolv_mtx); + resolv_finish(item, rv); + nni_mtx_unlock(&resolv_mtx); } static void -nni_posix_resolv_ip(const char *host, const char *serv, int passive, - int family, int proto, nni_aio *aio) +resolv_ip(const char *host, const char *serv, int passive, int family, + int proto, nni_aio *aio) { - nni_posix_resolv_item *item; - sa_family_t fam; + resolv_item *item; + sa_family_t fam; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -231,10 +242,15 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive, return; } - nni_task_init( - nni_posix_resolv_tq, &item->task, nni_posix_resolv_task, item); + if ((rv = nni_task_init(&item->task, resolv_tq, resolv_task, item)) != + 0) { + NNI_FREE_STRUCT(item); + nni_aio_finish_error(aio, rv); + return; + }; // NB: host and serv must remain valid until this is completed. + memset(&item->sa, 0, sizeof(item->sa)); item->passive = passive; item->name = host; item->serv = serv; @@ -242,24 +258,30 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive, item->aio = aio; item->family = fam; - nni_mtx_lock(&nni_posix_resolv_mtx); - nni_aio_schedule(aio, nni_posix_resolv_cancel, item); - nni_task_dispatch(&item->task); - nni_mtx_unlock(&nni_posix_resolv_mtx); + nni_mtx_lock(&resolv_mtx); + if ((rv = nni_aio_schedule(aio, resolv_cancel, item)) != 0) { + nni_mtx_unlock(&resolv_mtx); + nni_task_fini(item->task); + NNI_FREE_STRUCT(item); + nni_aio_finish_error(aio, rv); + return; + } + nni_task_dispatch(item->task); + nni_mtx_unlock(&resolv_mtx); } void nni_plat_tcp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { - nni_posix_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); + resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); } void nni_plat_udp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { - nni_posix_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); + resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); } int @@ -267,10 +289,10 @@ nni_posix_resolv_sysinit(void) { int rv; - nni_mtx_init(&nni_posix_resolv_mtx); + nni_mtx_init(&resolv_mtx); - if ((rv = nni_taskq_init(&nni_posix_resolv_tq, 4)) != 0) { - nni_mtx_fini(&nni_posix_resolv_mtx); + if ((rv = nni_taskq_init(&resolv_tq, 4)) != 0) { + nni_mtx_fini(&resolv_mtx); return (rv); } return (0); @@ -279,11 +301,11 @@ nni_posix_resolv_sysinit(void) void nni_posix_resolv_sysfini(void) { - if (nni_posix_resolv_tq != NULL) { - nni_taskq_fini(nni_posix_resolv_tq); - nni_posix_resolv_tq = NULL; + if (resolv_tq != NULL) { + nni_taskq_fini(resolv_tq); + resolv_tq = NULL; } - nni_mtx_fini(&nni_posix_resolv_mtx); + nni_mtx_fini(&resolv_mtx); } #endif // NNG_USE_POSIX_RESOLV_GAI diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index cca8165c..0a521513 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -422,6 +422,12 @@ nni_plat_thr_fini(nni_plat_thr *thr) } } +bool +nni_plat_thr_is_self(nni_plat_thr *thr) +{ + return (pthread_self() == thr->tid); +} + void nni_atfork_child(void) { diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index 654d31e3..759bdb96 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -36,24 +36,29 @@ #endif struct nni_plat_udp { - nni_posix_pollq_node udp_pitem; - int udp_fd; - nni_list udp_recvq; - nni_list udp_sendq; - nni_mtx udp_mtx; + nni_posix_pfd *udp_pfd; + int udp_fd; + nni_list udp_recvq; + nni_list udp_sendq; + nni_mtx udp_mtx; }; static void -nni_posix_udp_doclose(nni_plat_udp *udp) +nni_posix_udp_doerror(nni_plat_udp *udp, int rv) { nni_aio *aio; while (((aio = nni_list_first(&udp->udp_recvq)) != NULL) || ((aio = nni_list_first(&udp->udp_sendq)) != NULL)) { nni_aio_list_remove(aio); - nni_aio_finish_error(aio, NNG_ECLOSED); + nni_aio_finish_error(aio, rv); } - // Underlying socket left open until close API called. +} + +static void +nni_posix_udp_doclose(nni_plat_udp *udp) +{ + nni_posix_udp_doerror(udp, NNG_ECLOSED); } static void @@ -169,23 +174,22 @@ nni_posix_udp_dosend(nni_plat_udp *udp) // This function is called by the poller on activity on the FD. static void -nni_posix_udp_cb(void *arg) +nni_posix_udp_cb(nni_posix_pfd *pfd, int events, void *arg) { nni_plat_udp *udp = arg; - int revents; + NNI_ASSERT(pfd == udp->udp_pfd); nni_mtx_lock(&udp->udp_mtx); - revents = udp->udp_pitem.revents; - if (revents & POLLIN) { + if (events & POLLIN) { nni_posix_udp_dorecv(udp); } - if (revents & POLLOUT) { + if (events & POLLOUT) { nni_posix_udp_dosend(udp); } - if (revents & (POLLHUP | POLLERR | POLLNVAL)) { + if (events & (POLLHUP | POLLERR | POLLNVAL)) { nni_posix_udp_doclose(udp); } else { - int events = 0; + events = 0; if (!nni_list_empty(&udp->udp_sendq)) { events |= POLLOUT; } @@ -193,7 +197,11 @@ nni_posix_udp_cb(void *arg) events |= POLLIN; } if (events) { - nni_posix_pollq_arm(&udp->udp_pitem, events); + int rv; + rv = nni_posix_pfd_arm(udp->udp_pfd, events); + if (rv != 0) { + nni_posix_udp_doerror(udp, rv); + } } } nni_mtx_unlock(&udp->udp_mtx); @@ -232,22 +240,16 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) NNI_FREE_STRUCT(udp); return (rv); } - udp->udp_pitem.fd = udp->udp_fd; - udp->udp_pitem.cb = nni_posix_udp_cb; - udp->udp_pitem.data = udp; - - (void) fcntl(udp->udp_fd, F_SETFL, O_NONBLOCK); - - nni_aio_list_init(&udp->udp_recvq); - nni_aio_list_init(&udp->udp_sendq); - - if (((rv = nni_posix_pollq_init(&udp->udp_pitem)) != 0) || - ((rv = nni_posix_pollq_add(&udp->udp_pitem)) != 0)) { + if ((rv = nni_posix_pfd_init(&udp->udp_pfd, udp->udp_fd)) != 0) { (void) close(udp->udp_fd); nni_mtx_fini(&udp->udp_mtx); NNI_FREE_STRUCT(udp); return (rv); } + nni_posix_pfd_set_cb(udp->udp_pfd, nni_posix_udp_cb, udp); + + nni_aio_list_init(&udp->udp_recvq); + nni_aio_list_init(&udp->udp_sendq); *upp = udp; return (0); @@ -256,8 +258,7 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) void nni_plat_udp_close(nni_plat_udp *udp) { - // We're no longer interested in events. - nni_posix_pollq_fini(&udp->udp_pitem); + nni_posix_pfd_fini(udp->udp_pfd); nni_mtx_lock(&udp->udp_mtx); nni_posix_udp_doclose(udp); @@ -284,26 +285,46 @@ nni_plat_udp_cancel(nni_aio *aio, int rv) void nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio) { + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&udp->udp_mtx); - nni_aio_schedule(aio, nni_plat_udp_cancel, udp); + if ((rv = nni_aio_schedule(aio, nni_plat_udp_cancel, udp)) != 0) { + nni_mtx_unlock(&udp->udp_mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&udp->udp_recvq, aio); - nni_posix_pollq_arm(&udp->udp_pitem, POLLIN); + if (nni_list_first(&udp->udp_recvq) == aio) { + if ((rv = nni_posix_pfd_arm(udp->udp_pfd, POLLIN)) != 0) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + } nni_mtx_unlock(&udp->udp_mtx); } void nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio) { + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&udp->udp_mtx); - nni_aio_schedule(aio, nni_plat_udp_cancel, udp); + if ((rv = nni_aio_schedule(aio, nni_plat_udp_cancel, udp)) != 0) { + nni_mtx_unlock(&udp->udp_mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&udp->udp_sendq, aio); - nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT); + if (nni_list_first(&udp->udp_sendq) == aio) { + if ((rv = nni_posix_pfd_arm(udp->udp_pfd, POLLOUT)) != 0) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + } nni_mtx_unlock(&udp->udp_mtx); } diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index b3c4738f..0bd12b24 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -34,6 +34,7 @@ struct nni_plat_thr { void (*func)(void *); void * arg; HANDLE handle; + DWORD id; }; struct nni_plat_mtx { diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index a3ae3748..5ead9cbc 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -155,11 +155,16 @@ nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio) void nni_win_event_submit(nni_win_event *evt, nni_aio *aio) { + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&evt->mtx); - nni_aio_schedule(aio, nni_win_event_cancel, evt); + if ((rv = nni_aio_schedule(aio, nni_win_event_cancel, evt)) != 0) { + nni_mtx_unlock(&evt->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_aio_list_append(&evt->aios, aio); nni_win_event_start(evt); nni_mtx_unlock(&evt->mtx); diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index c6376cc7..169a2e00 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -572,17 +572,22 @@ void nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio) { nni_win_ipc_conn_work *w = &nni_win_ipc_connecter; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&w->mtx); + if ((rv = nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep)) != 0) { + nni_mtx_unlock(&w->mtx); + nni_aio_finish_error(aio, rv); + return; + } NNI_ASSERT(!nni_list_active(&w->waiters, ep)); ep->con_aio = aio; nni_list_append(&w->waiters, ep); - nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep); nni_cv_wake(&w->cv); nni_mtx_unlock(&w->mtx); } diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c index 070bf40d..2bc68c32 100644 --- a/src/platform/windows/win_resolv.c +++ b/src/platform/windows/win_resolv.c @@ -29,49 +29,55 @@ #define NNG_WIN_RESOLV_CONCURRENCY 4 #endif -static nni_taskq *nni_win_resolv_tq = NULL; -static nni_mtx nni_win_resolv_mtx; - -typedef struct nni_win_resolv_item nni_win_resolv_item; -struct nni_win_resolv_item { - int family; - int passive; - const char *name; - const char *serv; - int proto; - nni_aio * aio; - nni_task task; +static nni_taskq *win_resolv_tq = NULL; +static nni_mtx win_resolv_mtx; + +typedef struct win_resolv_item win_resolv_item; +struct win_resolv_item { + int family; + int passive; + const char * name; + const char * serv; + int proto; + nni_aio * aio; + nni_task * task; + nng_sockaddr sa; }; static void -nni_win_resolv_finish(nni_win_resolv_item *item, int rv) +win_resolv_finish(win_resolv_item *item, int rv) { - nni_aio *aio = item->aio; - - nni_aio_set_prov_data(aio, NULL); - nni_aio_finish(aio, rv, 0); - NNI_FREE_STRUCT(item); + nni_aio *aio; + + if (((aio = item->aio) != NULL) && + (nni_aio_get_prov_data(aio) == item)) { + nni_sockaddr *sa = nni_aio_get_input(aio, 0); + nni_aio_set_prov_data(aio, NULL); + memcpy(sa, &item->sa, sizeof(*sa)); + nni_aio_finish(aio, rv, 0); + nni_task_fini(item->task); + NNI_FREE_STRUCT(item); + } } static void -nni_win_resolv_cancel(nni_aio *aio, int rv) +win_resolv_cancel(nni_aio *aio, int rv) { - nni_win_resolv_item *item; + win_resolv_item *item; - nni_mtx_lock(&nni_win_resolv_mtx); + nni_mtx_lock(&win_resolv_mtx); if ((item = nni_aio_get_prov_data(aio)) == NULL) { - nni_mtx_unlock(&nni_win_resolv_mtx); + nni_mtx_unlock(&win_resolv_mtx); return; } nni_aio_set_prov_data(aio, NULL); - nni_mtx_unlock(&nni_win_resolv_mtx); - nni_task_cancel(&item->task); - NNI_FREE_STRUCT(item); + item->aio = NULL; + nni_mtx_unlock(&win_resolv_mtx); nni_aio_finish_error(aio, rv); } static int -nni_win_gai_errno(int rv) +win_gai_errno(int rv) { switch (rv) { case 0: @@ -98,17 +104,26 @@ nni_win_gai_errno(int rv) } static void -nni_win_resolv_task(void *arg) +win_resolv_task(void *arg) { - nni_win_resolv_item *item = arg; - nni_aio * aio = item->aio; - struct addrinfo hints; - struct addrinfo * results; - struct addrinfo * probe; - int rv; + win_resolv_item *item = arg; + struct addrinfo hints; + struct addrinfo *results; + struct addrinfo *probe; + int rv; results = NULL; + nni_mtx_lock(&win_resolv_mtx); + if (item->aio == NULL) { + nni_mtx_unlock(&win_resolv_mtx); + // Caller canceled, and no longer cares about this. + nni_task_fini(item->task); + NNI_FREE_STRUCT(item); + return; + } + nni_mtx_unlock(&win_resolv_mtx); + // We treat these all as IP addresses. The service and the // host part are split. memset(&hints, 0, sizeof(hints)); @@ -124,7 +139,7 @@ nni_win_resolv_task(void *arg) rv = getaddrinfo(item->name, item->serv, &hints, &results); if (rv != 0) { - rv = nni_win_gai_errno(rv); + rv = win_gai_errno(rv); goto done; } @@ -142,7 +157,7 @@ nni_win_resolv_task(void *arg) if (probe != NULL) { struct sockaddr_in * sin; struct sockaddr_in6 *sin6; - nni_sockaddr * sa = nni_aio_get_input(aio, 0); + nni_sockaddr * sa = &item->sa; switch (probe->ai_addr->sa_family) { case AF_INET: @@ -167,17 +182,18 @@ done: if (results != NULL) { freeaddrinfo(results); } - nni_mtx_lock(&nni_win_resolv_mtx); - nni_win_resolv_finish(item, rv); - nni_mtx_unlock(&nni_win_resolv_mtx); + nni_mtx_lock(&win_resolv_mtx); + win_resolv_finish(item, rv); + nni_mtx_unlock(&win_resolv_mtx); } static void -nni_win_resolv_ip(const char *host, const char *serv, int passive, int family, +win_resolv_ip(const char *host, const char *serv, int passive, int family, int proto, nni_aio *aio) { - nni_win_resolv_item *item; - int fam; + win_resolv_item *item; + int fam; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -202,8 +218,12 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family, return; } - nni_task_init( - nni_win_resolv_tq, &item->task, nni_win_resolv_task, item); + rv = nni_task_init(&item->task, win_resolv_tq, win_resolv_task, item); + if (rv != 0) { + NNI_FREE_STRUCT(item); + nni_aio_finish_error(aio, rv); + return; + } item->passive = passive; item->name = host; @@ -212,24 +232,30 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family, item->aio = aio; item->family = fam; - nni_mtx_lock(&nni_win_resolv_mtx); - nni_aio_schedule(aio, nni_win_resolv_cancel, item); - nni_task_dispatch(&item->task); - nni_mtx_unlock(&nni_win_resolv_mtx); + nni_mtx_lock(&win_resolv_mtx); + if ((rv = nni_aio_schedule(aio, win_resolv_cancel, item)) != 0) { + nni_mtx_unlock(&win_resolv_mtx); + nni_task_fini(item->task); + NNI_FREE_STRUCT(item); + nni_aio_finish_error(aio, rv); + return; + } + nni_task_dispatch(item->task); + nni_mtx_unlock(&win_resolv_mtx); } void nni_plat_tcp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { - nni_win_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); + win_resolv_ip(host, serv, passive, family, IPPROTO_TCP, aio); } void nni_plat_udp_resolv( const char *host, const char *serv, int family, int passive, nni_aio *aio) { - nni_win_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); + win_resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio); } int @@ -237,10 +263,10 @@ nni_win_resolv_sysinit(void) { int rv; - nni_mtx_init(&nni_win_resolv_mtx); + nni_mtx_init(&win_resolv_mtx); - if ((rv = nni_taskq_init(&nni_win_resolv_tq, 4)) != 0) { - nni_mtx_fini(&nni_win_resolv_mtx); + if ((rv = nni_taskq_init(&win_resolv_tq, 4)) != 0) { + nni_mtx_fini(&win_resolv_mtx); return (rv); } return (0); @@ -249,11 +275,11 @@ nni_win_resolv_sysinit(void) void nni_win_resolv_sysfini(void) { - if (nni_win_resolv_tq != NULL) { - nni_taskq_fini(nni_win_resolv_tq); - nni_win_resolv_tq = NULL; + if (win_resolv_tq != NULL) { + nni_taskq_fini(win_resolv_tq); + win_resolv_tq = NULL; } - nni_mtx_fini(&nni_win_resolv_mtx); + nni_mtx_fini(&win_resolv_mtx); } #endif // NNG_PLATFORM_WINDOWS diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c index a2ae72fe..2e9d58d7 100644 --- a/src/platform/windows/win_thread.c +++ b/src/platform/windows/win_thread.c @@ -107,6 +107,7 @@ static unsigned int __stdcall nni_plat_thr_main(void *arg) { nni_plat_thr *thr = arg; + thr->id = GetCurrentThreadId(); thr->func(thr->arg); return (0); } @@ -138,6 +139,12 @@ nni_plat_thr_fini(nni_plat_thr *thr) } } +bool +nni_plat_thr_is_self(nni_plat_thr *thr) +{ + return (GetCurrentThreadId() == thr->id); +} + static LONG plat_inited = 0; int diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index 019727be..965cbea7 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -216,8 +216,7 @@ rep0_ctx_send(void *arg, nni_aio *aio) return; } - rv = nni_aio_schedule_verify(aio, rep0_ctx_cancel_send, ctx); - if (rv != 0) { + if ((rv = nni_aio_schedule(aio, rep0_ctx_cancel_send, ctx)) != 0) { nni_mtx_unlock(&s->lk); nni_aio_finish_error(aio, rv); return; @@ -354,6 +353,9 @@ rep0_pipe_stop(void *arg) rep0_sock *s = p->rep; rep0_ctx * ctx; + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + nni_mtx_lock(&s->lk); if (nni_list_active(&s->recvpipes, p)) { // We are no longer "receivable". @@ -379,9 +381,6 @@ rep0_pipe_stop(void *arg) } nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); nni_mtx_unlock(&s->lk); - - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); } static void @@ -459,8 +458,7 @@ rep0_ctx_recv(void *arg, nni_aio *aio) nni_mtx_lock(&s->lk); if ((p = nni_list_first(&s->recvpipes)) == NULL) { int rv; - rv = nni_aio_schedule_verify(aio, rep0_cancel_recv, ctx); - if (rv != 0) { + if ((rv = nni_aio_schedule(aio, rep0_cancel_recv, ctx)) != 0) { nni_mtx_unlock(&s->lk); nni_aio_finish_error(aio, rv); return; @@ -516,10 +514,10 @@ rep0_pipe_recv_cb(void *arg) bool end = false; if (hops > s->ttl) { - // This isn't malformed, but it has gone through - // too many hops. Do not disconnect, because we - // can legitimately receive messages with too many - // hops from devices, etc. + // This isn't malformed, but it has gone + // through too many hops. Do not disconnect, + // because we can legitimately receive messages + // with too many hops from devices, etc. goto drop; } hops++; @@ -566,9 +564,9 @@ rep0_pipe_recv_cb(void *arg) nni_msg_header_clear(msg); ctx->pipe_id = p->id; - // If we got a request on a pipe that wasn't busy, we should mark - // it sendable. (The sendable flag is not set when there is no - // request needing a reply.) + // If we got a request on a pipe that wasn't busy, we should + // mark it sendable. (The sendable flag is not set when there + // is no request needing a reply.) if ((ctx == s->ctx) && (!p->busy)) { nni_pollable_raise(s->sendable); } diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 3ecc8604..bbb0b886 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -638,7 +638,7 @@ req0_ctx_recv(void *arg, nni_aio *aio) if ((msg = ctx->repmsg) == NULL) { int rv; - rv = nni_aio_schedule_verify(aio, req0_ctx_cancel_recv, ctx); + rv = nni_aio_schedule(aio, req0_ctx_cancel_recv, ctx); if (rv != 0) { nni_mtx_unlock(&s->mtx); nni_aio_finish_error(aio, rv); @@ -740,7 +740,7 @@ req0_ctx_send(void *arg, nni_aio *aio) } // If no pipes are ready, and the request was a poll (no background // schedule), then fail it. Should be NNG_TIMEDOUT. - rv = nni_aio_schedule_verify(aio, req0_ctx_cancel_send, ctx); + rv = nni_aio_schedule(aio, req0_ctx_cancel_send, ctx); if ((rv != 0) && (nni_list_empty(&s->readypipes))) { nni_idhash_remove(s->reqids, id); nni_mtx_unlock(&s->mtx); diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index 60cf188a..e553f6ce 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -210,8 +210,7 @@ resp0_ctx_send(void *arg, nni_aio *aio) return; } - if ((rv = nni_aio_schedule_verify(aio, resp0_ctx_cancel_send, ctx)) != - 0) { + if ((rv = nni_aio_schedule(aio, resp0_ctx_cancel_send, ctx)) != 0) { nni_mtx_unlock(&s->mtx); nni_aio_finish_error(aio, rv); return; @@ -450,7 +449,7 @@ resp0_ctx_recv(void *arg, nni_aio *aio) nni_mtx_lock(&s->mtx); if ((p = nni_list_first(&s->recvpipes)) == NULL) { int rv; - rv = nni_aio_schedule_verify(aio, resp0_cancel_recv, ctx); + rv = nni_aio_schedule(aio, resp0_cancel_recv, ctx); if (rv != 0) { nni_mtx_unlock(&s->mtx); nni_aio_finish_error(aio, rv); diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c index a2427009..aba72715 100644 --- a/src/supplemental/http/http_client.c +++ b/src/supplemental/http/http_client.c @@ -241,11 +241,16 @@ http_connect_cancel(nni_aio *aio, int rv) void nni_http_client_connect(nni_http_client *c, nni_aio *aio) { + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&c->mtx); - nni_aio_schedule(aio, http_connect_cancel, aio); + if ((rv = nni_aio_schedule(aio, http_connect_cancel, aio)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&c->aios, aio); if (nni_list_first(&c->aios) == aio) { http_conn_start(c); diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c index 0de40e10..f3b16370 100644 --- a/src/supplemental/http/http_conn.c +++ b/src/supplemental/http/http_conn.c @@ -365,6 +365,8 @@ http_rd_cancel(nni_aio *aio, int rv) static void http_rd_submit(nni_http_conn *conn, nni_aio *aio) { + int rv; + if (nni_aio_begin(aio) != 0) { return; } @@ -372,7 +374,10 @@ http_rd_submit(nni_http_conn *conn, nni_aio *aio) nni_aio_finish_error(aio, NNG_ECLOSED); return; } - nni_aio_schedule(aio, http_rd_cancel, conn); + if ((rv = nni_aio_schedule(aio, http_rd_cancel, conn)) != 0) { + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&conn->rdq, aio); if (conn->rd_uaio == NULL) { http_rd_start(conn); @@ -479,6 +484,8 @@ http_wr_cancel(nni_aio *aio, int rv) static void http_wr_submit(nni_http_conn *conn, nni_aio *aio) { + int rv; + if (nni_aio_begin(aio) != 0) { return; } @@ -486,8 +493,12 @@ http_wr_submit(nni_http_conn *conn, nni_aio *aio) nni_aio_finish_error(aio, NNG_ECLOSED); return; } + if ((rv = nni_aio_schedule(aio, http_wr_cancel, conn)) != 0) { + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&conn->wrq, aio); - nni_aio_schedule(aio, http_wr_cancel, conn); + if (conn->wr_uaio == NULL) { http_wr_start(conn); } diff --git a/src/supplemental/tls/mbedtls/tls.c b/src/supplemental/tls/mbedtls/tls.c index 5246e06c..f721d4aa 100644 --- a/src/supplemental/tls/mbedtls/tls.c +++ b/src/supplemental/tls/mbedtls/tls.c @@ -566,6 +566,8 @@ nni_tls_net_recv(void *ctx, unsigned char *buf, size_t len) void nni_tls_send(nni_tls *tp, nni_aio *aio) { + int rv; + if (nni_aio_begin(aio) != 0) { return; } @@ -575,7 +577,11 @@ nni_tls_send(nni_tls *tp, nni_aio *aio) nni_aio_finish_error(aio, NNG_ECLOSED); return; } - nni_aio_schedule(aio, nni_tls_cancel, tp); + if ((rv = nni_aio_schedule(aio, nni_tls_cancel, tp)) != 0) { + nni_mtx_unlock(&tp->lk); + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&tp->sends, aio); nni_tls_do_send(tp); nni_mtx_unlock(&tp->lk); @@ -584,6 +590,8 @@ nni_tls_send(nni_tls *tp, nni_aio *aio) void nni_tls_recv(nni_tls *tp, nni_aio *aio) { + int rv; + if (nni_aio_begin(aio) != 0) { return; } @@ -593,7 +601,12 @@ nni_tls_recv(nni_tls *tp, nni_aio *aio) nni_aio_finish_error(aio, NNG_ECLOSED); return; } - nni_aio_schedule(aio, nni_tls_cancel, tp); + if ((rv = nni_aio_schedule(aio, nni_tls_cancel, tp)) != 0) { + nni_mtx_unlock(&tp->lk); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&tp->recvs, aio); nni_tls_do_recv(tp); nni_mtx_unlock(&tp->lk); diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index e562b209..9a3f5519 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -667,8 +667,12 @@ ws_send_close(nni_ws *ws, uint16_t code) return; } // Close frames get priority! + if ((rv = nni_aio_schedule(aio, ws_cancel_close, ws)) != 0) { + ws->wclose = false; + nni_aio_finish_error(aio, rv); + return; + } nni_list_prepend(&ws->txmsgs, wm); - nni_aio_schedule(aio, ws_cancel_close, ws); ws_start_write(ws); } @@ -737,7 +741,12 @@ nni_ws_send_msg(nni_ws *ws, nni_aio *aio) ws_msg_fini(wm); return; } - nni_aio_schedule(aio, ws_write_cancel, ws); + if ((rv = nni_aio_schedule(aio, ws_write_cancel, ws)) != 0) { + nni_mtx_unlock(&ws->mtx); + nni_aio_finish_error(aio, rv); + ws_msg_fini(wm); + return; + } nni_aio_set_prov_extra(aio, 0, wm); nni_list_append(&ws->sendq, aio); nni_list_append(&ws->txmsgs, wm); @@ -1060,7 +1069,12 @@ nni_ws_recv_msg(nni_ws *ws, nni_aio *aio) return; } nni_mtx_lock(&ws->mtx); - nni_aio_schedule(aio, ws_read_cancel, ws); + if ((rv = nni_aio_schedule(aio, ws_read_cancel, ws)) != 0) { + nni_mtx_unlock(&ws->mtx); + ws_msg_fini(wm); + nni_aio_finish_error(aio, rv); + return; + } nni_aio_set_prov_extra(aio, 0, wm); nni_list_append(&ws->recvq, aio); nni_list_append(&ws->rxmsgs, wm); @@ -1647,6 +1661,7 @@ void nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio) { nni_ws *ws; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -1669,7 +1684,11 @@ nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio) nni_aio_finish(aio, 0, 0); return; } - nni_aio_schedule(aio, ws_accept_cancel, l); + if ((rv = nni_aio_schedule(aio, ws_accept_cancel, l)) != 0) { + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&l->mtx); + return; + } nni_list_append(&l->aios, aio); nni_mtx_unlock(&l->mtx); } @@ -1995,11 +2014,16 @@ nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio) ws_fini(ws); return; } + if ((rv = nni_aio_schedule(aio, ws_dial_cancel, ws)) != 0) { + nni_mtx_unlock(&d->mtx); + nni_aio_finish_error(aio, rv); + ws_fini(ws); + return; + } ws->dialer = d; ws->useraio = aio; ws->mode = NNI_EP_MODE_DIAL; nni_list_append(&d->wspend, ws); - nni_aio_schedule(aio, ws_dial_cancel, ws); nni_http_client_connect(d->client, ws->connaio); nni_mtx_unlock(&d->mtx); } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 8bfb097e..0f159d3a 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -349,6 +349,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) { nni_inproc_ep *ep = arg; nni_inproc_ep *server; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -375,7 +376,11 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) // We don't have to worry about the case where a zero timeout // on connect was specified, as there is no option to specify // that in the upper API. - nni_aio_schedule(aio, nni_inproc_ep_cancel, ep); + if ((rv = nni_aio_schedule(aio, nni_inproc_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&nni_inproc.mx); + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&server->clients, ep); nni_aio_list_append(&ep->aios, aio); @@ -407,6 +412,7 @@ static void nni_inproc_ep_accept(void *arg, nni_aio *aio) { nni_inproc_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -416,7 +422,11 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) // We need not worry about the case where a non-blocking // accept was tried -- there is no API to do such a thing. - nni_aio_schedule(aio, nni_inproc_ep_cancel, ep); + if ((rv = nni_aio_schedule(aio, nni_inproc_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&nni_inproc.mx); + nni_aio_finish_error(aio, rv); + return; + } // We are already on the master list of servers, thanks to bind. // Insert us into pending server aios, and then run accept list. diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index c91606c6..2347e24c 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -418,12 +418,17 @@ static void nni_ipc_pipe_send(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&pipe->mtx); - nni_aio_schedule(aio, nni_ipc_cancel_tx, pipe); + if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_tx, pipe)) != 0) { + nni_mtx_unlock(&pipe->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&pipe->sendq, aio); if (nni_list_first(&pipe->sendq) == aio) { nni_ipc_pipe_dosend(pipe, aio); @@ -474,14 +479,18 @@ static void nni_ipc_pipe_recv(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&pipe->mtx); - // Transports never have a zero length timeout. - (void) nni_aio_schedule(aio, nni_ipc_cancel_rx, pipe); + if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_rx, pipe)) != 0) { + nni_mtx_unlock(&pipe->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&pipe->recvq, aio); if (nni_list_first(&pipe->recvq) == aio) { @@ -496,11 +505,17 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) nni_ipc_pipe *pipe = arg; nni_aio * negaio; nni_iov iov; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&pipe->mtx); + if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_start, pipe)) != 0) { + nni_mtx_unlock(&pipe->mtx); + nni_aio_finish_error(aio, rv); + return; + } pipe->txhead[0] = 0; pipe->txhead[1] = 'S'; pipe->txhead[2] = 'P'; @@ -517,7 +532,6 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) iov.iov_len = 8; iov.iov_buf = &pipe->txhead[0]; nni_aio_set_iov(negaio, 1, &iov); - nni_aio_schedule(aio, nni_ipc_cancel_start, pipe); nni_plat_ipc_pipe_send(pipe->ipp, negaio); nni_mtx_unlock(&pipe->mtx); } @@ -728,6 +742,7 @@ static void nni_ipc_ep_accept(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -735,7 +750,11 @@ nni_ipc_ep_accept(void *arg, nni_aio *aio) nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); - nni_aio_schedule(aio, nni_ipc_cancel_ep, ep); + if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_ep, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } ep->user_aio = aio; nni_plat_ipc_ep_accept(ep->iep, ep->aio); @@ -746,6 +765,7 @@ static void nni_ipc_ep_connect(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -753,7 +773,11 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio) nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); - nni_aio_schedule(aio, nni_ipc_cancel_ep, ep); + if ((rv = nni_aio_schedule(aio, nni_ipc_cancel_ep, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } ep->user_aio = aio; nni_plat_ipc_ep_connect(ep->iep, ep->aio); diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 3d738a98..f2cdf8ac 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -409,12 +409,17 @@ static void nni_tcp_pipe_send(void *arg, nni_aio *aio) { nni_tcp_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - nni_aio_schedule(aio, nni_tcp_cancel_tx, p); + if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_tx, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&p->sendq, aio); if (nni_list_first(&p->sendq) == aio) { nni_tcp_pipe_dosend(p, aio); @@ -465,12 +470,18 @@ static void nni_tcp_pipe_recv(void *arg, nni_aio *aio) { nni_tcp_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - nni_aio_schedule(aio, nni_tcp_cancel_rx, p); + if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_rx, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&p->recvq, aio); if (nni_list_first(&p->recvq) == aio) { nni_tcp_pipe_dorecv(p); @@ -535,11 +546,17 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) nni_tcp_pipe *p = arg; nni_aio * negaio; nni_iov iov; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); + if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_nego, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } p->txlen[0] = 0; p->txlen[1] = 'S'; p->txlen[2] = 'P'; @@ -556,7 +573,6 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) iov.iov_len = 8; iov.iov_buf = &p->txlen[0]; nni_aio_set_iov(negaio, 1, &iov); - nni_aio_schedule(aio, nni_tcp_cancel_nego, p); nni_plat_tcp_pipe_send(p->tpp, negaio); nni_mtx_unlock(&p->mtx); } @@ -749,6 +765,7 @@ static void nni_tcp_ep_accept(void *arg, nni_aio *aio) { nni_tcp_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -756,7 +773,11 @@ nni_tcp_ep_accept(void *arg, nni_aio *aio) nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); - nni_aio_schedule(aio, nni_tcp_cancel_ep, ep); + if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_ep, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } ep->user_aio = aio; nni_plat_tcp_ep_accept(ep->tep, ep->aio); @@ -767,6 +788,7 @@ static void nni_tcp_ep_connect(void *arg, nni_aio *aio) { nni_tcp_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -774,7 +796,11 @@ nni_tcp_ep_connect(void *arg, nni_aio *aio) nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); - nni_aio_schedule(aio, nni_tcp_cancel_ep, ep); + if ((rv = nni_aio_schedule(aio, nni_tcp_cancel_ep, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } ep->user_aio = aio; nni_plat_tcp_ep_connect(ep->tep, ep->aio); diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 78fdd622..c863a85e 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -416,12 +416,17 @@ static void nni_tls_pipe_send(void *arg, nni_aio *aio) { nni_tls_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - nni_aio_schedule(aio, nni_tls_cancel_tx, p); + if ((rv = nni_aio_schedule(aio, nni_tls_cancel_tx, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&p->sendq, aio); if (nni_list_first(&p->sendq) == aio) { nni_tls_pipe_dosend(p, aio); @@ -472,13 +477,18 @@ static void nni_tls_pipe_recv(void *arg, nni_aio *aio) { nni_tls_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); + if ((rv = nni_aio_schedule(aio, nni_tls_cancel_rx, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } - nni_aio_schedule(aio, nni_tls_cancel_rx, p); nni_aio_list_append(&p->recvq, aio); if (nni_list_first(&p->recvq) == aio) { nni_tls_pipe_dorecv(p); @@ -542,11 +552,17 @@ nni_tls_pipe_start(void *arg, nni_aio *aio) nni_tls_pipe *p = arg; nni_aio * negaio; nni_iov iov; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); + if ((rv = nni_aio_schedule(aio, nni_tls_cancel_nego, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } p->txlen[0] = 0; p->txlen[1] = 'S'; p->txlen[2] = 'P'; @@ -563,7 +579,6 @@ nni_tls_pipe_start(void *arg, nni_aio *aio) iov.iov_len = 8; iov.iov_buf = &p->txlen[0]; nni_aio_set_iov(negaio, 1, &iov); - nni_aio_schedule(aio, nni_tls_cancel_nego, p); nni_tls_send(p->tls, negaio); nni_mtx_unlock(&p->mtx); } @@ -769,13 +784,18 @@ static void nni_tls_ep_accept(void *arg, nni_aio *aio) { nni_tls_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); - nni_aio_schedule(aio, nni_tls_cancel_ep, ep); + if ((rv = nni_aio_schedule(aio, nni_tls_cancel_ep, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } ep->user_aio = aio; nni_plat_tcp_ep_accept(ep->tep, ep->aio); nni_mtx_unlock(&ep->mtx); @@ -785,13 +805,17 @@ static void nni_tls_ep_connect(void *arg, nni_aio *aio) { nni_tls_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); - nni_aio_schedule(aio, nni_tls_cancel_ep, ep); + if ((rv = nni_aio_schedule(aio, nni_tls_cancel_ep, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + } ep->user_aio = aio; nni_plat_tcp_ep_connect(ep->tep, ep->aio); nni_mtx_unlock(&ep->mtx); diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index 9d967668..12f3aeb5 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -128,12 +128,17 @@ static void ws_pipe_recv(void *arg, nni_aio *aio) { ws_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - nni_aio_schedule(aio, ws_pipe_recv_cancel, p); + if ((rv = nni_aio_schedule(aio, ws_pipe_recv_cancel, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } p->user_rxaio = aio; nni_ws_recv_msg(p->ws, p->rxaio); nni_mtx_unlock(&p->mtx); @@ -158,12 +163,17 @@ static void ws_pipe_send(void *arg, nni_aio *aio) { ws_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - nni_aio_schedule(aio, ws_pipe_send_cancel, p); + if ((rv = nni_aio_schedule(aio, ws_pipe_send_cancel, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } p->user_txaio = aio; nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio)); nni_aio_set_msg(aio, NULL); @@ -289,6 +299,7 @@ static void ws_ep_accept(void *arg, nni_aio *aio) { ws_ep *ep = arg; + int rv; // We already bound, so we just need to look for an available // pipe (created by the handler), and match it. @@ -297,7 +308,11 @@ ws_ep_accept(void *arg, nni_aio *aio) return; } nni_mtx_lock(&ep->mtx); - nni_aio_schedule(aio, ws_ep_cancel, ep); + if ((rv = nni_aio_schedule(aio, ws_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } nni_list_append(&ep->aios, aio); if (aio == nni_list_first(&ep->aios)) { nni_ws_listener_accept(ep->listener, ep->accaio); @@ -309,6 +324,7 @@ static void ws_ep_connect(void *arg, nni_aio *aio) { ws_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -326,9 +342,12 @@ ws_ep_connect(void *arg, nni_aio *aio) } nni_mtx_lock(&ep->mtx); + if ((rv = nni_aio_schedule(aio, ws_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } NNI_ASSERT(nni_list_empty(&ep->aios)); - - nni_aio_schedule(aio, ws_ep_cancel, ep); ep->started = true; nni_list_append(&ep->aios, aio); nni_ws_dialer_dial(ep->dialer, ep->connaio); diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index 54e402be..f7139b94 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -1902,6 +1902,7 @@ static void zt_pipe_recv(void *arg, nni_aio *aio) { zt_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -1912,7 +1913,11 @@ zt_pipe_recv(void *arg, nni_aio *aio) nni_aio_finish_error(aio, NNG_ECLOSED); return; } - nni_aio_schedule(aio, zt_pipe_cancel_recv, p); + if ((rv = nni_aio_schedule(aio, zt_pipe_cancel_recv, p)) != 0) { + nni_mtx_unlock(&zt_lk); + nni_aio_finish_error(aio, rv); + return; + } p->zp_user_rxaio = aio; zt_pipe_dorecv(p); nni_mtx_unlock(&zt_lk); @@ -2049,7 +2054,13 @@ zt_pipe_ping_cb(void *arg) // going to send a ping. (We don't increment the try count // unless we actually do send one though.) if (nni_aio_begin(aio) == 0) { - nni_aio_schedule(aio, zt_pipe_cancel_ping, p); + int rv; + rv = nni_aio_schedule(aio, zt_pipe_cancel_ping, p); + if (rv != 0) { + nni_mtx_unlock(&zt_lk); + nni_aio_finish_error(aio, rv); + return; + } p->zp_ping_active = 1; if (now > (p->zp_last_recv + p->zp_ping_time)) { p->zp_ping_try++; @@ -2081,10 +2092,15 @@ zt_pipe_start(void *arg, nni_aio *aio) p->zp_ping_try = 0; nni_aio_set_timeout(aio, p->zp_ping_time); if (nni_aio_begin(p->zp_ping_aio) == 0) { - nni_aio_schedule( + int rv; + rv = nni_aio_schedule( p->zp_ping_aio, zt_pipe_cancel_ping, p); - p->zp_ping_active = 1; - zt_pipe_send_ping(p); + if (rv != 0) { + nni_aio_finish_error(p->zp_ping_aio, rv); + } else { + p->zp_ping_active = 1; + zt_pipe_send_ping(p); + } } } nni_aio_finish(aio, 0, 0); @@ -2405,12 +2421,17 @@ static void zt_ep_accept(void *arg, nni_aio *aio) { zt_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&zt_lk); - nni_aio_schedule(aio, zt_ep_cancel, ep); + if ((rv = nni_aio_schedule(aio, zt_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&zt_lk); + nni_aio_finish_error(aio, rv); + return; + } nni_aio_list_append(&ep->ze_aios, aio); zt_ep_doaccept(ep); nni_mtx_unlock(&zt_lk); @@ -2486,10 +2507,14 @@ zt_ep_conn_req_cb(void *arg) if (nni_list_first(&ep->ze_aios) != NULL) { nni_aio_set_timeout(aio, ep->ze_conn_time); if (nni_aio_begin(aio) == 0) { - nni_aio_schedule(aio, zt_ep_conn_req_cancel, ep); - ep->ze_creq_active = 1; - ep->ze_creq_try++; - zt_ep_send_conn_req(ep); + rv = nni_aio_schedule(aio, zt_ep_conn_req_cancel, ep); + if (rv != 0) { + nni_aio_finish_error(aio, rv); + } else { + ep->ze_creq_active = 1; + ep->ze_creq_try++; + zt_ep_send_conn_req(ep); + } } } @@ -2523,18 +2548,27 @@ zt_ep_connect(void *arg, nni_aio *aio) if ((ep->ze_raddr >> 24) == 0) { ep->ze_raddr |= (ep->ze_ztn->zn_self << zt_port_shift); } + if ((rv = nni_aio_schedule(aio, zt_ep_cancel, ep)) != 0) { + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&zt_lk); + return; + } nni_aio_list_append(&ep->ze_aios, aio); ep->ze_running = 1; - nni_aio_schedule(aio, zt_ep_cancel, ep); nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time); if (nni_aio_begin(ep->ze_creq_aio) == 0) { - nni_aio_schedule(ep->ze_creq_aio, zt_ep_conn_req_cancel, ep); - // Send out the first connect message; if not - // yet attached to network message will be dropped. - ep->ze_creq_try = 1; - ep->ze_creq_active = 1; - zt_ep_send_conn_req(ep); + rv = nni_aio_schedule( + ep->ze_creq_aio, zt_ep_conn_req_cancel, ep); + if (rv != 0) { + nni_aio_finish_error(ep->ze_creq_aio, rv); + } else { + // Send out the first connect message; if not + // yet attached to network message will be dropped. + ep->ze_creq_try = 1; + ep->ze_creq_active = 1; + zt_ep_send_conn_req(ep); + } } nni_mtx_unlock(&zt_lk); } |
