diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-07-06 14:42:53 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-07-06 18:57:29 -0700 |
| commit | 953ca274ae57f8edd12536a3dd15d134aa6e5576 (patch) | |
| tree | 7a0e889fbae7b525befefedcb5cb8f10820e7a47 | |
| parent | 89cba92d13fbc5e059336fd054be30e50d8a2621 (diff) | |
| download | nng-953ca274ae57f8edd12536a3dd15d134aa6e5576.tar.gz nng-953ca274ae57f8edd12536a3dd15d134aa6e5576.tar.bz2 nng-953ca274ae57f8edd12536a3dd15d134aa6e5576.zip | |
fixes #568 Want a single reader/write lock on socket child objects
fixes #170 Make more use of reaper
This is a complete restructure/rethink of how child objects interact
with the socket. (This also backs out #576 as it turns out not to be
needed.) While 568 says reader/writer lock, for now we have settled
for a single writer lock. Its likely that this is sufficient.
Essentially we use the single socket lock to guard lists of the socket
children. We also use deferred deletion in the idhash to facilitate
teardown, which means endpoint closes are no longer synchronous.
We use the reaper to clean up objects when the reference count drops
to zero. We make a special exception for pipes, since they really
are not reference counted by their parents, and they are leaf objects
anyway.
We believe this addresses the main outstanding race conditions in
a much more correct and holistic way.
Note that endpoint shutdown is a little tricky, as it makes use of
atomic flags to guard against double entry, and against recursive
lock entry. This is something that would be nice to make a bit more
obvious, but what we have is safe, and the complexity is at least
confined to one place.
| -rw-r--r-- | src/core/dialer.c | 237 | ||||
| -rw-r--r-- | src/core/dialer.h | 3 | ||||
| -rw-r--r-- | src/core/init.c | 3 | ||||
| -rw-r--r-- | src/core/listener.c | 160 | ||||
| -rw-r--r-- | src/core/listener.h | 4 | ||||
| -rw-r--r-- | src/core/pipe.c | 207 | ||||
| -rw-r--r-- | src/core/pipe.h | 39 | ||||
| -rw-r--r-- | src/core/reap.c | 81 | ||||
| -rw-r--r-- | src/core/reap.h | 1 | ||||
| -rw-r--r-- | src/core/socket.c | 369 | ||||
| -rw-r--r-- | src/core/socket.h | 16 | ||||
| -rw-r--r-- | src/core/sockimpl.h | 107 | ||||
| -rw-r--r-- | src/nng.h | 2 | ||||
| -rw-r--r-- | src/protocol/bus0/bus.c | 10 | ||||
| -rw-r--r-- | src/protocol/pair0/pair.c | 8 | ||||
| -rw-r--r-- | src/protocol/pair1/pair.c | 12 | ||||
| -rw-r--r-- | src/protocol/pipeline0/pull.c | 4 | ||||
| -rw-r--r-- | src/protocol/pipeline0/push.c | 6 | ||||
| -rw-r--r-- | src/protocol/pubsub0/pub.c | 6 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 4 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 6 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.c | 6 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xrep.c | 10 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xreq.c | 10 | ||||
| -rw-r--r-- | src/protocol/survey0/respond.c | 6 | ||||
| -rw-r--r-- | src/protocol/survey0/survey.c | 8 | ||||
| -rw-r--r-- | src/protocol/survey0/xrespond.c | 10 | ||||
| -rw-r--r-- | src/protocol/survey0/xsurvey.c | 10 | ||||
| -rw-r--r-- | tests/sock.c | 2 |
29 files changed, 620 insertions, 727 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c index e93c893e..3144d673 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -9,36 +9,12 @@ // #include "core/nng_impl.h" +#include "sockimpl.h" #include <stdio.h> #include <stdlib.h> #include <string.h> -struct nni_dialer { - nni_tran_dialer_ops d_ops; // transport ops - nni_tran * d_tran; // transport pointer - void * d_data; // transport private - uint64_t d_id; // endpoint id - nni_list_node d_node; // per socket list - nni_sock * d_sock; - nni_url * d_url; - int d_refcnt; - int d_lastrv; // last result from synchronous - bool d_synch; // synchronous connect in progress? - bool d_started; - bool d_closed; // full shutdown - nni_atomic_flag d_closing; // close pending (waiting on refcnt) - nni_mtx d_mtx; - nni_cv d_cv; - nni_list d_pipes; - nni_aio * d_con_aio; - nni_aio * d_tmo_aio; // backoff timer - nni_duration d_maxrtime; // maximum time for reconnect - nni_duration d_currtime; // current time for reconnect - nni_duration d_inirtime; // initial time for reconnect - nni_time d_conntime; // time of last good connect -}; - // Functionality related to dialers. static void dialer_connect_start(nni_dialer *); static void dialer_connect_cb(void *); @@ -65,6 +41,7 @@ nni_dialer_sys_init(void) void nni_dialer_sys_fini(void) { + nni_reap_drain(); nni_mtx_fini(&dialers_lk); nni_idhash_fini(dialers); dialers = NULL; @@ -73,26 +50,15 @@ nni_dialer_sys_fini(void) uint32_t nni_dialer_id(nni_dialer *d) { - return ((uint32_t) d->d_id); + return (d->d_id); } -static void -dialer_destroy(nni_dialer *d) +void +nni_dialer_destroy(nni_dialer *d) { - if (d == NULL) { - return; - } - - // Remove us from the table so we cannot be found. - if (d->d_id != 0) { - nni_idhash_remove(dialers, d->d_id); - } - nni_aio_stop(d->d_con_aio); nni_aio_stop(d->d_tmo_aio); - nni_sock_remove_dialer(d->d_sock, d); - nni_aio_fini(d->d_con_aio); nni_aio_fini(d->d_tmo_aio); @@ -126,13 +92,13 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) nni_url_free(url); return (NNG_ENOMEM); } - d->d_url = url; - d->d_closed = false; - d->d_started = false; - d->d_data = NULL; - d->d_refcnt = 1; - d->d_sock = s; - d->d_tran = tran; + d->d_url = url; + d->d_closed = false; + d->d_data = NULL; + d->d_refcnt = 1; + d->d_sock = s; + d->d_tran = tran; + nni_atomic_flag_reset(&d->d_started); nni_atomic_flag_reset(&d->d_closing); // Make a copy of the endpoint operations. This allows us to @@ -141,8 +107,7 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) d->d_ops = *tran->tran_dialer; NNI_LIST_NODE_INIT(&d->d_node); - - nni_pipe_ep_list_init(&d->d_pipes); + NNI_LIST_INIT(&d->d_pipes, nni_pipe, p_ep_node); nni_mtx_init(&d->d_mtx); nni_cv_init(&d->d_cv, &d->d_mtx); @@ -150,9 +115,9 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) || ((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) || ((rv = d->d_ops.d_init(&d->d_data, url, s)) != 0) || - ((rv = nni_idhash_alloc(dialers, &d->d_id, d)) != 0) || + ((rv = nni_idhash_alloc32(dialers, &d->d_id, d)) != 0) || ((rv = nni_sock_add_dialer(s, d)) != 0)) { - dialer_destroy(d); + nni_dialer_destroy(d); return (rv); } @@ -203,80 +168,33 @@ nni_dialer_rele(nni_dialer *d) { nni_mtx_lock(&dialers_lk); d->d_refcnt--; - if (d->d_refcnt == 0) { - nni_cv_wake(&d->d_cv); + if ((d->d_refcnt == 0) && (d->d_closed)) { + nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d); } nni_mtx_unlock(&dialers_lk); } -int -nni_dialer_shutdown(nni_dialer *d) -{ - if (nni_atomic_flag_test_and_set(&d->d_closing)) { - return (NNG_ECLOSED); - } - - // Abort any remaining in-flight operations. - nni_aio_close(d->d_con_aio); - nni_aio_close(d->d_tmo_aio); - - // Stop the underlying transport. - d->d_ops.d_close(d->d_data); - - return (0); -} - void nni_dialer_close(nni_dialer *d) { - nni_pipe *p; - - nni_mtx_lock(&d->d_mtx); + nni_mtx_lock(&dialers_lk); if (d->d_closed) { - nni_mtx_unlock(&d->d_mtx); + nni_mtx_unlock(&dialers_lk); nni_dialer_rele(d); return; } d->d_closed = true; - nni_mtx_unlock(&d->d_mtx); - - nni_dialer_shutdown(d); - - nni_aio_stop(d->d_con_aio); - nni_aio_stop(d->d_tmo_aio); - - nni_mtx_lock(&d->d_mtx); - NNI_LIST_FOREACH (&d->d_pipes, p) { - nni_pipe_stop(p); - } - while ((!nni_list_empty(&d->d_pipes)) || (d->d_refcnt != 1)) { - nni_cv_wait(&d->d_cv); - } - nni_mtx_unlock(&d->d_mtx); - - dialer_destroy(d); -} + nni_mtx_unlock(&dialers_lk); -// This function starts an exponential backoff timer for reconnecting. -static void -dialer_timer_start(nni_dialer *d) -{ - nni_duration backoff; + // Remove us from the table so we cannot be found. + // This is done fairly early in the teardown process. + // If we're here, either the socket or the listener has been + // closed at the user request, so there would be a race anyway. + nni_idhash_remove(dialers, d->d_id); - backoff = d->d_currtime; - d->d_currtime *= 2; - if (d->d_currtime > d->d_maxrtime) { - d->d_currtime = d->d_maxrtime; - } + nni_dialer_shutdown(d); - // To minimize damage from storms, etc., we select a backoff - // value randomly, in the range of [0, backoff-1]; this is - // pretty similar to 802 style backoff, except that we have a - // nearly uniform time period instead of discrete slot times. - // This algorithm may lead to slight biases because we don't - // have a statistically perfect distribution with the modulo of - // the random number, but this really doesn't matter. - nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio); + nni_dialer_rele(d); } static void @@ -285,11 +203,9 @@ dialer_timer_cb(void *arg) nni_dialer *d = arg; nni_aio * aio = d->d_tmo_aio; - nni_mtx_lock(&d->d_mtx); if (nni_aio_result(aio) == 0) { dialer_connect_start(d); } - nni_mtx_unlock(&d->d_mtx); } static void @@ -304,52 +220,32 @@ dialer_connect_cb(void *arg) if ((rv = nni_aio_result(aio)) == 0) { void *data = nni_aio_get_output(aio, 0); NNI_ASSERT(data != NULL); - rv = nni_pipe_create2(&p, d->d_sock, d->d_tran, data); + rv = nni_pipe_create(&p, d->d_sock, d->d_tran, data); } nni_mtx_lock(&d->d_mtx); - synch = d->d_synch; - d->d_synch = false; - if (rv == 0) { - nni_pipe_set_dialer(p, d); - nni_list_append(&d->d_pipes, p); - - // Good connect, so reset the backoff timer. - // Note that a host that accepts the connect, but drops - // us immediately, is going to get hit pretty hard - // (depending on the initial backoff) with no - // exponential backoff. This can happen if we wind up - // trying to connect to some port that does not speak - // SP for example. - d->d_currtime = d->d_inirtime; - } + synch = d->d_synch; nni_mtx_unlock(&d->d_mtx); - if ((rv == 0) && ((rv = nni_sock_pipe_add(d->d_sock, p)) != 0)) { - nni_pipe_stop(p); - } - - nni_mtx_lock(&d->d_mtx); switch (rv) { case 0: - // No further outgoing connects -- we will restart a - // connection from the pipe when the pipe is removed. + nni_dialer_add_pipe(d, p); break; - case NNG_ECLOSED: - case NNG_ECANCELED: - // Canceled/closed -- stop everything. + case NNG_ECLOSED: // No further action. + case NNG_ECANCELED: // No further action. break; default: - // redial, but only if we are not synchronous if (!synch) { - dialer_timer_start(d); + nni_dialer_timer_start(d); } break; } if (synch) { + nni_mtx_lock(&d->d_mtx); + d->d_synch = false; d->d_lastrv = rv; nni_cv_wake(&d->d_cv); + nni_mtx_unlock(&d->d_mtx); } - nni_mtx_unlock(&d->d_mtx); } static void @@ -366,70 +262,37 @@ nni_dialer_start(nni_dialer *d, int flags) { int rv = 0; - // nni_sock_reconntimes(d->d_sock, &d->d_inirtime, - //&d->d_maxrtime); - d->d_currtime = d->d_inirtime; - - nni_mtx_lock(&d->d_mtx); - - if (d->d_started) { - nni_mtx_unlock(&d->d_mtx); + if (nni_atomic_flag_test_and_set(&d->d_started)) { return (NNG_ESTATE); } if ((flags & NNG_FLAG_NONBLOCK) != 0) { - d->d_started = true; - dialer_connect_start(d); + nni_mtx_lock(&d->d_mtx); + d->d_currtime = d->d_inirtime; nni_mtx_unlock(&d->d_mtx); + dialer_connect_start(d); return (0); } - d->d_synch = true; - d->d_started = true; + nni_mtx_lock(&d->d_mtx); + d->d_synch = true; + nni_mtx_unlock(&d->d_mtx); + dialer_connect_start(d); + nni_mtx_lock(&d->d_mtx); while (d->d_synch) { nni_cv_wait(&d->d_cv); } rv = d->d_lastrv; - nni_cv_wake(&d->d_cv); + nni_mtx_unlock(&d->d_mtx); if (rv != 0) { - d->d_started = false; + nni_atomic_flag_reset(&d->d_started); } - nni_mtx_unlock(&d->d_mtx); return (rv); } -void -nni_dialer_remove_pipe(nni_dialer *d, nni_pipe *p) -{ - if (d == NULL) { - return; - } - - // Break up the relationship between the dialer and the pipe. - nni_mtx_lock(&d->d_mtx); - // During early init, the pipe might not have this set. - if (nni_list_active(&d->d_pipes, p)) { - nni_list_remove(&d->d_pipes, p); - } - // Wake up the close thread if it is waiting. - if (d->d_closed) { - if (nni_list_empty(&d->d_pipes)) { - nni_cv_wake(&d->d_cv); - } - } else { - // If this pipe closed, then lets restart the dial operation. - // Since the remote side seems to have closed, lets start with - // a backoff. This keeps us from pounding the crap out of the - // thing if a remote server accepts but then disconnects - // immediately. - dialer_timer_start(d); - } - nni_mtx_unlock(&d->d_mtx); -} - int nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz, nni_opt_type t) @@ -508,9 +371,3 @@ nni_dialer_getopt( return (nni_sock_getopt(d->d_sock, name, valp, szp, t)); } - -void -nni_dialer_list_init(nni_list *list) -{ - NNI_LIST_INIT(list, nni_dialer, d_node); -} diff --git a/src/core/dialer.h b/src/core/dialer.h index 56b0fb1b..4a3e127c 100644 --- a/src/core/dialer.h +++ b/src/core/dialer.h @@ -18,11 +18,8 @@ extern int nni_dialer_hold(nni_dialer *); extern void nni_dialer_rele(nni_dialer *); extern uint32_t nni_dialer_id(nni_dialer *); extern int nni_dialer_create(nni_dialer **, nni_sock *, const char *); -extern int nni_dialer_shutdown(nni_dialer *); extern void nni_dialer_close(nni_dialer *); extern int nni_dialer_start(nni_dialer *, int); -extern void nni_dialer_list_init(nni_list *); -extern void nni_dialer_remove_pipe(nni_dialer *, nni_pipe *); extern int nni_dialer_setopt( nni_dialer *, const char *, const void *, size_t, nni_opt_type); diff --git a/src/core/init.c b/src/core/init.c index 30a7a547..c66e54d2 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -75,11 +75,12 @@ nni_fini(void) nni_dialer_sys_fini(); nni_listener_sys_fini(); nni_sock_sys_fini(); - nni_reap_sys_fini(); // must be before timer and aio (expire) + nni_reap_drain(); nni_random_sys_fini(); nni_aio_sys_fini(); nni_timer_sys_fini(); nni_taskq_sys_fini(); + nni_reap_sys_fini(); // must be before timer and aio (expire) nni_mtx_fini(&nni_init_mtx); nni_plat_fini(); diff --git a/src/core/listener.c b/src/core/listener.c index 8e06076d..1bfa657d 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -9,30 +9,12 @@ // #include "core/nng_impl.h" +#include "sockimpl.h" #include <stdio.h> #include <stdlib.h> #include <string.h> -struct nni_listener { - nni_tran_listener_ops l_ops; // transport ops - nni_tran * l_tran; // transport pointer - void * l_data; // transport private - uint64_t l_id; // endpoint id - nni_list_node l_node; // per socket list - nni_sock * l_sock; - nni_url * l_url; - int l_refcnt; - bool l_started; - bool l_closed; // full shutdown - nni_atomic_flag l_closing; // close pending - nni_mtx l_mtx; - nni_cv l_cv; - nni_list l_pipes; - nni_aio * l_acc_aio; - nni_aio * l_tmo_aio; -}; - // Functionality related to listeners. static void listener_accept_start(nni_listener *); @@ -60,6 +42,7 @@ nni_listener_sys_init(void) void nni_listener_sys_fini(void) { + nni_reap_drain(); nni_mtx_fini(&listeners_lk); nni_idhash_fini(listeners); listeners = NULL; @@ -68,32 +51,21 @@ nni_listener_sys_fini(void) uint32_t nni_listener_id(nni_listener *l) { - return ((uint32_t) l->l_id); + return (l->l_id); } -static void -listener_destroy(nni_listener *l) +void +nni_listener_destroy(nni_listener *l) { - if (l == NULL) { - return; - } - - // Remove us from the table so we cannot be found. - if (l->l_id != 0) { - nni_idhash_remove(listeners, l->l_id); - } - nni_aio_stop(l->l_acc_aio); - - nni_sock_remove_listener(l->l_sock, l); + nni_aio_stop(l->l_tmo_aio); nni_aio_fini(l->l_acc_aio); + nni_aio_fini(l->l_tmo_aio); if (l->l_data != NULL) { l->l_ops.l_fini(l->l_data); } - nni_cv_fini(&l->l_cv); - nni_mtx_fini(&l->l_mtx); nni_url_free(l->l_url); NNI_FREE_STRUCT(l); } @@ -119,14 +91,14 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr) nni_url_free(url); return (NNG_ENOMEM); } - l->l_url = url; - l->l_closed = false; - l->l_started = false; - l->l_data = NULL; - l->l_refcnt = 1; - l->l_sock = s; - l->l_tran = tran; + l->l_url = url; + l->l_closed = false; + l->l_data = NULL; + l->l_refcnt = 1; + l->l_sock = s; + l->l_tran = tran; nni_atomic_flag_reset(&l->l_closing); + nni_atomic_flag_reset(&l->l_started); // Make a copy of the endpoint operations. This allows us to // modify them (to override NULLs for example), and avoids an extra @@ -134,18 +106,14 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr) l->l_ops = *tran->tran_listener; NNI_LIST_NODE_INIT(&l->l_node); - - nni_pipe_ep_list_init(&l->l_pipes); - - nni_mtx_init(&l->l_mtx); - nni_cv_init(&l->l_cv, &l->l_mtx); + NNI_LIST_INIT(&l->l_pipes, nni_pipe, p_ep_node); if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) || ((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) || ((rv = l->l_ops.l_init(&l->l_data, url, s)) != 0) || - ((rv = nni_idhash_alloc(listeners, &l->l_id, l)) != 0) || + ((rv = nni_idhash_alloc32(listeners, &l->l_id, l)) != 0) || ((rv = nni_sock_add_listener(s, l)) != 0)) { - listener_destroy(l); + nni_listener_destroy(l); return (rv); } @@ -196,58 +164,33 @@ nni_listener_rele(nni_listener *l) { nni_mtx_lock(&listeners_lk); l->l_refcnt--; - if (l->l_refcnt == 0) { - nni_cv_wake(&l->l_cv); + if ((l->l_refcnt == 0) && (l->l_closed)) { + nni_reap(&l->l_reap, (nni_cb) nni_listener_reap, l); } nni_mtx_unlock(&listeners_lk); } -int -nni_listener_shutdown(nni_listener *l) -{ - if (nni_atomic_flag_test_and_set(&l->l_closing)) { - return (NNG_ECLOSED); - } - - // Abort any remaining in-flight accepts. - nni_aio_close(l->l_acc_aio); - nni_aio_close(l->l_tmo_aio); - - // Stop the underlying transport. - l->l_ops.l_close(l->l_data); - - return (0); -} - void nni_listener_close(nni_listener *l) { - nni_pipe *p; - - nni_mtx_lock(&l->l_mtx); + nni_mtx_lock(&listeners_lk); if (l->l_closed) { - nni_mtx_unlock(&l->l_mtx); + nni_mtx_unlock(&listeners_lk); nni_listener_rele(l); return; } l->l_closed = true; - nni_mtx_unlock(&l->l_mtx); - - nni_listener_shutdown(l); + nni_mtx_unlock(&listeners_lk); - nni_aio_stop(l->l_acc_aio); - nni_aio_stop(l->l_tmo_aio); + // Remove us from the table so we cannot be found. + // This is done fairly early in the teardown process. + // If we're here, either the socket or the listener has been + // closed at the user request, so there would be a race anyway. + nni_idhash_remove(listeners, l->l_id); - nni_mtx_lock(&l->l_mtx); - NNI_LIST_FOREACH (&l->l_pipes, p) { - nni_pipe_stop(p); - } - while ((!nni_list_empty(&l->l_pipes)) || (l->l_refcnt != 1)) { - nni_cv_wait(&l->l_cv); - } - nni_mtx_unlock(&l->l_mtx); + nni_listener_shutdown(l); - listener_destroy(l); + nni_listener_rele(l); // This will trigger a reap if id count is zero. } static void @@ -272,14 +215,11 @@ listener_accept_cb(void *arg) if ((rv = nni_aio_result(aio)) == 0) { void *data = nni_aio_get_output(aio, 0); NNI_ASSERT(data != NULL); - rv = nni_pipe_create2(&p, l->l_sock, l->l_tran, data); + rv = nni_pipe_create(&p, l->l_sock, l->l_tran, data); } switch (rv) { case 0: - nni_mtx_lock(&l->l_mtx); - nni_pipe_set_listener(p, l); - nni_list_append(&l->l_pipes, p); - nni_mtx_unlock(&l->l_mtx); + nni_listener_add_pipe(l, p); listener_accept_start(l); break; case NNG_ECONNABORTED: // remote condition, no cooldown @@ -298,10 +238,6 @@ listener_accept_cb(void *arg) nni_sleep_aio(100, l->l_tmo_aio); break; } - - if ((rv == 0) && ((rv = nni_sock_pipe_add(l->l_sock, p)) != 0)) { - nni_pipe_stop(p); - } } static void @@ -319,44 +255,20 @@ nni_listener_start(nni_listener *l, int flags) int rv = 0; NNI_ARG_UNUSED(flags); - nni_mtx_lock(&l->l_mtx); - if (l->l_started) { - nni_mtx_unlock(&l->l_mtx); + if (nni_atomic_flag_test_and_set(&l->l_started)) { return (NNG_ESTATE); } if ((rv = l->l_ops.l_bind(l->l_data)) != 0) { - nni_mtx_unlock(&l->l_mtx); + nni_atomic_flag_reset(&l->l_started); return (rv); } - l->l_started = true; - nni_mtx_unlock(&l->l_mtx); - listener_accept_start(l); return (0); } -void -nni_listener_remove_pipe(nni_listener *l, nni_pipe *p) -{ - if (l == NULL) { - return; - } - // Break up relationship between listener and pipe. - nni_mtx_lock(&l->l_mtx); - // During early init, the pipe might not have this set. - if (nni_list_active(&l->l_pipes, p)) { - nni_list_remove(&l->l_pipes, p); - } - // Wake up the closer if it is waiting. - if (l->l_closed && nni_list_empty(&l->l_pipes)) { - nni_cv_wake(&l->l_cv); - } - nni_mtx_unlock(&l->l_mtx); -} - int nni_listener_setopt(nni_listener *l, const char *name, const void *val, size_t sz, nni_opt_type t) @@ -406,9 +318,3 @@ nni_listener_getopt( return (nni_sock_getopt(l->l_sock, name, valp, szp, t)); } - -void -nni_listener_list_init(nni_list *list) -{ - NNI_LIST_INIT(list, nni_listener, l_node); -} diff --git a/src/core/listener.h b/src/core/listener.h index 41b1a678..5806fc72 100644 --- a/src/core/listener.h +++ b/src/core/listener.h @@ -18,12 +18,8 @@ extern int nni_listener_hold(nni_listener *); extern void nni_listener_rele(nni_listener *); extern uint32_t nni_listener_id(nni_listener *); extern int nni_listener_create(nni_listener **, nni_sock *, const char *); -extern int nni_listener_shutdown(nni_listener *); extern void nni_listener_close(nni_listener *); extern int nni_listener_start(nni_listener *, int); -extern void nni_listener_list_init(nni_list *); -extern int nni_listener_add_pipe(nni_listener *, nni_pipe *); -extern void nni_listener_remove_pipe(nni_listener *, nni_pipe *); extern int nni_listener_setopt( nni_listener *, const char *, const void *, size_t, nni_opt_type); diff --git a/src/core/pipe.c b/src/core/pipe.c index 6b9b082c..e95fe1d4 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -9,6 +9,7 @@ // #include "core/nng_impl.h" +#include "sockimpl.h" #include <string.h> @@ -17,54 +18,17 @@ // Operations on pipes (to the transport) are generally blocking operations, // performed in the context of the protocol. -struct nni_pipe { - uint32_t p_id; - uint32_t p_sock_id; - uint32_t p_dialer_id; - uint32_t p_listener_id; - nni_tran_pipe_ops p_tran_ops; - nni_proto_pipe_ops p_proto_ops; - void * p_tran_data; - void * p_proto_data; - nni_list_node p_sock_node; - nni_list_node p_ep_node; - nni_sock * p_sock; - nni_listener * p_listener; - nni_dialer * p_dialer; - bool p_closed; - nni_atomic_flag p_stop; - bool p_cbs; - int p_refcnt; - nni_mtx p_mtx; - nni_cv p_cv; - nni_list_node p_reap_node; - nni_aio * p_start_aio; -}; - static nni_idhash *nni_pipes; static nni_mtx nni_pipe_lk; -static nni_list nni_pipe_reap_list; -static nni_mtx nni_pipe_reap_lk; -static nni_cv nni_pipe_reap_cv; -static nni_thr nni_pipe_reap_thr; -static int nni_pipe_reap_run; - -static void nni_pipe_reaper(void *); - int nni_pipe_sys_init(void) { int rv; - NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node); nni_mtx_init(&nni_pipe_lk); - nni_mtx_init(&nni_pipe_reap_lk); - nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk); - if (((rv = nni_idhash_init(&nni_pipes)) != 0) || - ((rv = nni_thr_init(&nni_pipe_reap_thr, nni_pipe_reaper, 0)) != - 0)) { + if ((rv = nni_idhash_init(&nni_pipes)) != 0) { return (rv); } @@ -76,25 +40,13 @@ nni_pipe_sys_init(void) nni_idhash_set_limits( nni_pipes, 1, 0x7fffffff, nni_random() & 0x7fffffff); - nni_pipe_reap_run = 1; - nni_thr_run(&nni_pipe_reap_thr); - return (0); } void nni_pipe_sys_fini(void) { - if (nni_pipe_reap_run) { - nni_mtx_lock(&nni_pipe_reap_lk); - nni_pipe_reap_run = 0; - nni_cv_wake(&nni_pipe_reap_cv); - nni_mtx_unlock(&nni_pipe_reap_lk); - } - - nni_thr_fini(&nni_pipe_reap_thr); - nni_cv_fini(&nni_pipe_reap_cv); - nni_mtx_fini(&nni_pipe_reap_lk); + nni_reap_drain(); nni_mtx_fini(&nni_pipe_lk); if (nni_pipes != NULL) { nni_idhash_fini(nni_pipes); @@ -109,28 +61,7 @@ nni_pipe_destroy(nni_pipe *p) return; } - if (p->p_cbs) { - nni_sock_run_pipe_cb(p->p_sock, NNG_PIPE_EV_REM_POST, p->p_id); - } - - // Stop any pending negotiation. - nni_aio_stop(p->p_start_aio); - - if (p->p_proto_data != NULL) { - p->p_proto_ops.pipe_stop(p->p_proto_data); - } - if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { - p->p_tran_ops.p_stop(p->p_tran_data); - } - - // We have exclusive access at this point, so we can check if - // we are still on any lists. - nni_dialer_remove_pipe(p->p_dialer, p); // dialer may be NULL - nni_listener_remove_pipe(p->p_listener, p); // listener may be NULL - - if (nni_list_node_active(&p->p_sock_node)) { - nni_sock_pipe_remove(p->p_sock, p); - } + nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST); // Make sure any unlocked holders are done with this. // This happens during initialization for example. @@ -138,11 +69,24 @@ nni_pipe_destroy(nni_pipe *p) if (p->p_id != 0) { nni_idhash_remove(nni_pipes, p->p_id); } + // This wait guarantees that all callers are done with us. while (p->p_refcnt != 0) { nni_cv_wait(&p->p_cv); } nni_mtx_unlock(&nni_pipe_lk); + // Wait for neg callbacks to finish. (Already closed). + nni_aio_stop(p->p_start_aio); + + if (p->p_proto_data != NULL) { + p->p_proto_ops.pipe_stop(p->p_proto_data); + } + if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { + p->p_tran_ops.p_stop(p->p_tran_data); + } + + nni_pipe_remove(p); + if (p->p_proto_data != NULL) { p->p_proto_ops.pipe_fini(p->p_proto_data); } @@ -229,6 +173,8 @@ nni_pipe_close(nni_pipe *p) if (p->p_tran_data != NULL) { p->p_tran_ops.p_close(p->p_tran_data); } + + nni_reap(&p->p_reap, (nni_cb) nni_pipe_destroy, p); } bool @@ -241,23 +187,6 @@ nni_pipe_closed(nni_pipe *p) return (rv); } -void -nni_pipe_stop(nni_pipe *p) -{ - // Guard against recursive calls. - if (nni_atomic_flag_test_and_set(&p->p_stop)) { - return; - } - - nni_pipe_close(p); - - // Put it on the reaplist for async cleanup - nni_mtx_lock(&nni_pipe_reap_lk); - nni_list_append(&nni_pipe_reap_list, p); - nni_cv_wake(&nni_pipe_reap_cv); - nni_mtx_unlock(&nni_pipe_reap_lk); -} - uint16_t nni_pipe_peer(nni_pipe *p) { @@ -270,32 +199,29 @@ nni_pipe_start_cb(void *arg) nni_pipe *p = arg; nni_sock *s = p->p_sock; nni_aio * aio = p->p_start_aio; - uint32_t id = nni_pipe_id(p); if (nni_aio_result(aio) != 0) { - nni_pipe_stop(p); + nni_pipe_close(p); return; } - p->p_cbs = true; // We're running all cbs going forward - - nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_PRE, id); + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); if (nni_pipe_closed(p)) { - nni_pipe_stop(p); + nni_pipe_close(p); return; } if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) || nni_sock_closing(s)) { - nni_pipe_stop(p); + nni_pipe_close(p); return; } - nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_POST, id); + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST); } int -nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) +nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) { nni_pipe * p; int rv; @@ -315,13 +241,11 @@ nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) p->p_proto_ops = *pops; p->p_proto_data = NULL; p->p_sock = sock; - p->p_sock_id = nni_sock_id(sock); p->p_closed = false; p->p_cbs = false; p->p_refcnt = 0; nni_atomic_flag_reset(&p->p_stop); - NNI_LIST_NODE_INIT(&p->p_reap_node); NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); @@ -347,27 +271,11 @@ nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) return (0); } -void -nni_pipe_set_listener(nni_pipe *p, nni_listener *l) -{ - p->p_listener = l; - p->p_listener_id = nni_listener_id(l); -} - -void -nni_pipe_set_dialer(nni_pipe *p, nni_dialer *d) -{ - p->p_dialer = d; - p->p_dialer_id = nni_dialer_id(d); -} - int nni_pipe_getopt( nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t) { nni_tran_option *o; - nni_dialer * d; - nni_listener * l; for (o = p->p_tran_ops.p_options; o && o->o_name; o++) { if (strcmp(o->o_name, name) != 0) { @@ -376,24 +284,13 @@ nni_pipe_getopt( return (o->o_get(p->p_tran_data, val, szp, t)); } - // Maybe the endpoint knows? We look up by ID, instead of using - // the links directly, to avoid needing a hold on them. The pipe - // can wind up outliving the endpoint in certain circumstances. - // This means that getting these properties from the pipe may wind - // up being somewhat more expensive. - if ((p->p_dialer_id != 0) && - (nni_dialer_find(&d, p->p_dialer_id) == 0)) { - int rv; - rv = nni_dialer_getopt(d, name, val, szp, t); - nni_dialer_rele(d); - return (rv); + // Maybe the endpoint knows? The guarantees on pipes ensure that the + // pipe will not outlive its creating endpoint. + if (p->p_dialer != NULL) { + return (nni_dialer_getopt(p->p_dialer, name, val, szp, t)); } - if ((p->p_listener_id != 0) && - (nni_listener_find(&l, p->p_listener_id) == 0)) { - int rv; - rv = nni_listener_getopt(l, name, val, szp, t); - nni_listener_rele(l); - return (rv); + if (p->p_listener != NULL) { + return (nni_listener_getopt(p->p_listener, name, val, szp, t)); } return (NNG_ENOTSUP); } @@ -414,56 +311,20 @@ nni_pipe_get_proto_data(nni_pipe *p) return (p->p_proto_data); } -void -nni_pipe_sock_list_init(nni_list *list) -{ - NNI_LIST_INIT(list, nni_pipe, p_sock_node); -} - -void -nni_pipe_ep_list_init(nni_list *list) -{ - NNI_LIST_INIT(list, nni_pipe, p_ep_node); -} - uint32_t nni_pipe_sock_id(nni_pipe *p) { - return (p->p_sock_id); + return (nni_sock_id(p->p_sock)); } uint32_t nni_pipe_listener_id(nni_pipe *p) { - return (p->p_listener_id); + return (p->p_listener ? nni_listener_id(p->p_listener) : 0); } uint32_t nni_pipe_dialer_id(nni_pipe *p) { - return (p->p_dialer_id); -} - -static void -nni_pipe_reaper(void *notused) -{ - NNI_ARG_UNUSED(notused); - - nni_mtx_lock(&nni_pipe_reap_lk); - for (;;) { - nni_pipe *p; - if ((p = nni_list_first(&nni_pipe_reap_list)) != NULL) { - nni_list_remove(&nni_pipe_reap_list, p); - - nni_mtx_unlock(&nni_pipe_reap_lk); - nni_pipe_destroy(p); - nni_mtx_lock(&nni_pipe_reap_lk); - continue; - } - if (!nni_pipe_reap_run) { - break; - } - nni_cv_wait(&nni_pipe_reap_cv); - } - nni_mtx_unlock(&nni_pipe_reap_lk); + return (p->p_dialer ? nni_dialer_id(p->p_dialer) : 0); } diff --git a/src/core/pipe.h b/src/core/pipe.h index 5c505514..1d73ce51 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -29,38 +29,11 @@ extern void nni_pipe_send(nni_pipe *, nni_aio *); // Pipe operations that protocols use. extern uint32_t nni_pipe_id(nni_pipe *); -// nni_pipe_destroy destroys a pipe -- there must not be any other -// references to it; this is used only during creation failures. -extern void nni_pipe_destroy(nni_pipe *); - // nni_pipe_close closes the underlying transport for the pipe. Further -// operations against will return NNG_ECLOSED. +// operations against will return NNG_ECLOSED. This is idempotent. The +// actual pipe will be reaped asynchronously. extern void nni_pipe_close(nni_pipe *); -// nni_pipe_stop is called to begin the process of tearing down the socket. -// This function runs asynchronously, and takes care to ensure that no -// other consumers are referencing the pipe. We assume that either the -// socket (protocol code) or endpoint may have references to the pipe -// when this function is called. The pipe cleanup is asynchronous and -// make take a while depending on scheduling, etc. The pipe lock itself -// may not be held during this, but any other locks may be. -extern void nni_pipe_stop(nni_pipe *); - -// nni_pipe_create is used only by endpoints - as we don't wish to expose the -// details of the pipe structure outside of pipe.c. This function must be -// called without any locks held, as it will call back up into the socket and -// endpoint, grabbing each of those locks. The function takes ownership of -// the transport specific pipe (3rd argument), regardless of whether it -// succeeds or not. The endpoint should be held when calling this. -extern int nni_pipe_create2(nni_pipe **, nni_sock *, nni_tran *, void *); -extern void nni_pipe_set_dialer(nni_pipe *, nni_dialer *); -extern void nni_pipe_set_listener(nni_pipe *, nni_listener *); - -// nni_pipe_start is called by the socket to begin any startup activities -// on the pipe before making it ready for use by protocols. For example, -// TCP and IPC initial handshaking is performed this way. -extern void nni_pipe_start(nni_pipe *); - extern uint16_t nni_pipe_proto(nni_pipe *); extern uint16_t nni_pipe_peer(nni_pipe *); @@ -73,14 +46,6 @@ extern int nni_pipe_getopt( // nni_pipe_set_proto_data function. No locking is performed. extern void *nni_pipe_get_proto_data(nni_pipe *); -// nni_pipe_sock_list_init initializes a list of pipes, to be used by -// a per-socket list. -extern void nni_pipe_sock_list_init(nni_list *); - -// nni_pipe_ep_list_init initializes a list of pipes, to be used by -// a per-endpoint list. -extern void nni_pipe_ep_list_init(nni_list *); - // nni_pipe_find finds a pipe given its ID. It places a hold on the // pipe, which must be released by the caller when it is done. extern int nni_pipe_find(nni_pipe **, uint32_t); diff --git a/src/core/reap.c b/src/core/reap.c index 8191dba3..bfad6c32 100644 --- a/src/core/reap.c +++ b/src/core/reap.c @@ -14,47 +14,62 @@ #include <stdbool.h> -static nni_list nni_reap_list; -static nni_mtx nni_reap_mtx; -static nni_cv nni_reap_cv; -static bool nni_reap_exit = false; -static nni_thr nni_reap_thr; +static nni_list reap_list; +static nni_mtx reap_mtx; +static nni_cv reap_cv; +static nni_cv reap_empty_cv; +static bool reap_exit = false; +static bool reap_empty = false; +static nni_thr reap_thr; static void -nni_reap_stuff(void *notused) +reap_worker(void *notused) { NNI_ARG_UNUSED(notused); - nni_mtx_lock(&nni_reap_mtx); + nni_mtx_lock(&reap_mtx); for (;;) { nni_reap_item *item; - if ((item = nni_list_first(&nni_reap_list)) != NULL) { - nni_list_remove(&nni_reap_list, item); - nni_mtx_unlock(&nni_reap_mtx); + while ((item = nni_list_first(&reap_list)) != NULL) { + nni_list_remove(&reap_list, item); + nni_mtx_unlock(&reap_mtx); item->r_func(item->r_ptr); - nni_mtx_lock(&nni_reap_mtx); - continue; + nni_mtx_lock(&reap_mtx); } - if (nni_reap_exit) { + reap_empty = true; + nni_cv_wake(&reap_empty_cv); + + if (reap_exit) { break; } - nni_cv_wait(&nni_reap_cv); + nni_cv_wait(&reap_cv); } - nni_mtx_unlock(&nni_reap_mtx); + nni_mtx_unlock(&reap_mtx); } void nni_reap(nni_reap_item *item, nni_cb func, void *ptr) { - nni_mtx_lock(&nni_reap_mtx); + nni_mtx_lock(&reap_mtx); item->r_func = func; item->r_ptr = ptr; - nni_list_append(&nni_reap_list, item); - nni_cv_wake(&nni_reap_cv); - nni_mtx_unlock(&nni_reap_mtx); + nni_list_append(&reap_list, item); + reap_empty = false; + nni_cv_wake(&reap_cv); + nni_mtx_unlock(&reap_mtx); +} + +void +nni_reap_drain(void) +{ + nni_mtx_lock(&reap_mtx); + while (!reap_empty) { + nni_cv_wait(&reap_empty_cv); + } + nni_mtx_unlock(&reap_mtx); } int @@ -62,28 +77,30 @@ nni_reap_sys_init(void) { int rv; - NNI_LIST_INIT(&nni_reap_list, nni_reap_item, r_link); - nni_mtx_init(&nni_reap_mtx); - nni_cv_init(&nni_reap_cv, &nni_reap_mtx); - nni_reap_exit = false; + NNI_LIST_INIT(&reap_list, nni_reap_item, r_link); + nni_mtx_init(&reap_mtx); + nni_cv_init(&reap_cv, &reap_mtx); + nni_cv_init(&reap_empty_cv, &reap_mtx); + reap_exit = false; // If this fails, we don't fail init, instead we will try to // start up at reap time. - if ((rv = nni_thr_init(&nni_reap_thr, nni_reap_stuff, NULL)) != 0) { - nni_cv_fini(&nni_reap_cv); - nni_mtx_fini(&nni_reap_mtx); + if ((rv = nni_thr_init(&reap_thr, reap_worker, NULL)) != 0) { + nni_cv_fini(&reap_cv); + nni_cv_fini(&reap_empty_cv); + nni_mtx_fini(&reap_mtx); return (rv); } - nni_thr_run(&nni_reap_thr); + nni_thr_run(&reap_thr); return (0); } void nni_reap_sys_fini(void) { - nni_mtx_lock(&nni_reap_mtx); - nni_reap_exit = true; - nni_cv_wake(&nni_reap_cv); - nni_mtx_unlock(&nni_reap_mtx); - nni_thr_fini(&nni_reap_thr); + nni_mtx_lock(&reap_mtx); + reap_exit = true; + nni_cv_wake(&reap_cv); + nni_mtx_unlock(&reap_mtx); + nni_thr_fini(&reap_thr); } diff --git a/src/core/reap.h b/src/core/reap.h index 843c8c9c..40c27a5d 100644 --- a/src/core/reap.h +++ b/src/core/reap.h @@ -38,6 +38,7 @@ typedef struct nni_reap_item { // part of a fully reapable graph; otherwise this can lead to an infinite // loop in the reap thread. extern void nni_reap(nni_reap_item *, nni_cb, void *); +extern void nni_reap_drain(void); extern int nni_reap_sys_init(void); extern void nni_reap_sys_fini(void); diff --git a/src/core/socket.c b/src/core/socket.c index 620c5d19..640e0db6 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -9,6 +9,7 @@ // #include "core/nng_impl.h" +#include "sockimpl.h" #include <stdio.h> #include <string.h> @@ -58,7 +59,7 @@ struct nni_socket { nni_cv s_cv; nni_cv s_close_cv; - uint64_t s_id; + uint32_t s_id; uint32_t s_flags; unsigned s_refcnt; // protected by global lock void * s_data; // Protocol private @@ -97,6 +98,9 @@ struct nni_socket { static void nni_ctx_destroy(nni_ctx *); +static void dialer_shutdown_locked(nni_dialer *); +static void listener_shutdown_locked(nni_listener *); + static int sock_get_fd(nni_sock *s, int flag, int *fdp) { @@ -340,7 +344,7 @@ nni_free_opt(nni_sockopt *opt) uint32_t nni_sock_id(nni_sock *s) { - return ((uint32_t) s->s_id); + return (s->s_id); } // nni_sock_sendq and nni_sock_recvq are called by the protocol to obtain @@ -405,58 +409,6 @@ nni_sock_closing(nni_sock *s) return (rv); } -void -nni_sock_run_pipe_cb(nni_sock *s, int ev, uint32_t id) -{ - if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) { - nng_pipe_cb cb; - void * arg; - - nni_mtx_lock(&s->s_pipe_cbs_mtx); - cb = s->s_pipe_cbs[ev].cb_fn; - arg = s->s_pipe_cbs[ev].cb_arg; - nni_mtx_unlock(&s->s_pipe_cbs_mtx); - - if (cb != NULL) { - nng_pipe p; - p.id = id; - cb(p, ev, arg); - } - } -} - -int -nni_sock_pipe_add(nni_sock *s, nni_pipe *p) -{ - // Initialize protocol pipe data. - nni_mtx_lock(&s->s_mx); - if (s->s_closing) { - nni_mtx_unlock(&s->s_mx); - return (NNG_ECLOSED); - } - - nni_list_append(&s->s_pipes, p); - - // Start the initial negotiation I/O... - nni_pipe_start(p); - - nni_mtx_unlock(&s->s_mx); - return (0); -} - -void -nni_sock_pipe_remove(nni_sock *s, nni_pipe *p) -{ - nni_mtx_lock(&s->s_mx); - if (nni_list_active(&s->s_pipes, p)) { - nni_list_remove(&s->s_pipes, p); - } - if (s->s_closing && nni_list_empty(&s->s_pipes)) { - nni_cv_wake(&s->s_cv); - } - nni_mtx_unlock(&s->s_mx); -} - static void sock_destroy(nni_sock *s) { @@ -521,10 +473,9 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) NNI_LIST_NODE_INIT(&s->s_node); NNI_LIST_INIT(&s->s_options, nni_sockopt, node); NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node); - - nni_pipe_sock_list_init(&s->s_pipes); - nni_listener_list_init(&s->s_listeners); - nni_dialer_list_init(&s->s_dialers); + NNI_LIST_INIT(&s->s_pipes, nni_pipe, p_sock_node); + NNI_LIST_INIT(&s->s_listeners, nni_listener, l_node); + NNI_LIST_INIT(&s->s_dialers, nni_dialer, d_node); nni_mtx_init(&s->s_mx); nni_mtx_init(&s->s_pipe_cbs_mtx); nni_cv_init(&s->s_cv, &s->s_mx); @@ -615,7 +566,7 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) } nni_mtx_lock(&sock_lk); - if ((rv = nni_idhash_alloc(sock_hash, &s->s_id, s)) != 0) { + if ((rv = nni_idhash_alloc32(sock_hash, &s->s_id, s)) != 0) { sock_destroy(s); } else { nni_list_append(&sock_list, s); @@ -625,8 +576,7 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) nni_mtx_unlock(&sock_lk); // Set the sockname. - (void) snprintf( - s->s_name, sizeof(s->s_name), "%u", (unsigned) s->s_id); + (void) snprintf(s->s_name, sizeof(s->s_name), "%u", s->s_id); return (rv); } @@ -640,9 +590,7 @@ nni_sock_shutdown(nni_sock *sock) { nni_pipe * pipe; nni_dialer * d; - nni_dialer * nd; nni_listener *l; - nni_listener *nl; nni_ctx * ctx; nni_ctx * nctx; @@ -657,10 +605,10 @@ nni_sock_shutdown(nni_sock *sock) // Close the EPs. This prevents new connections from forming // but but allows existing ones to drain. NNI_LIST_FOREACH (&sock->s_listeners, l) { - nni_listener_shutdown(l); + listener_shutdown_locked(l); } NNI_LIST_FOREACH (&sock->s_dialers, d) { - nni_dialer_shutdown(d); + dialer_shutdown_locked(d); } nni_mtx_unlock(&sock->s_mx); @@ -706,35 +654,26 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); + // For each pipe, arrange for it to teardown hard. We would + // expect there not to be any here. + NNI_LIST_FOREACH (&sock->s_pipes, pipe) { + nni_pipe_close(pipe); + } + // Go through the dialers and listeners, attempting to close them. // We might already have a close in progress, in which case // we skip past it; it will be removed from another thread. - nl = nni_list_first(&sock->s_listeners); - while ((l = nl) != NULL) { - nl = nni_list_next(&sock->s_listeners, nl); - + NNI_LIST_FOREACH (&sock->s_listeners, l) { if (nni_listener_hold(l) == 0) { - nni_mtx_unlock(&sock->s_mx); nni_listener_close(l); - nni_mtx_lock(&sock->s_mx); } } - nd = nni_list_first(&sock->s_dialers); - while ((d = nd) != NULL) { - nd = nni_list_next(&sock->s_dialers, nd); - + NNI_LIST_FOREACH (&sock->s_dialers, d) { if (nni_dialer_hold(d) == 0) { - nni_mtx_unlock(&sock->s_mx); nni_dialer_close(d); - nni_mtx_lock(&sock->s_mx); } } - // For each pipe, arrange for it to teardown hard. - NNI_LIST_FOREACH (&sock->s_pipes, pipe) { - nni_pipe_stop(pipe); - } - // We have to wait for *both* endpoints and pipes to be // removed. while ((!nni_list_empty(&sock->s_pipes)) || @@ -1271,7 +1210,6 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock) { nni_ctx *ctx; int rv; - uint64_t id; if (sock->s_ctx_ops.ctx_init == NULL) { return (NNG_ENOTSUP); @@ -1286,12 +1224,11 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock) NNI_FREE_STRUCT(ctx); return (NNG_ECLOSED); } - if ((rv = nni_idhash_alloc(ctx_hash, &id, ctx)) != 0) { + if ((rv = nni_idhash_alloc32(ctx_hash, &ctx->c_id, ctx)) != 0) { nni_mtx_unlock(&sock_lk); NNI_FREE_STRUCT(ctx); return (rv); } - ctx->c_id = (uint32_t) id; if ((rv = sock->s_ctx_ops.ctx_init(&ctx->c_data, sock->s_data)) != 0) { nni_idhash_remove(ctx_hash, ctx->c_id); @@ -1415,3 +1352,265 @@ nni_ctx_setopt( nni_mtx_unlock(&sock->s_mx); return (rv); } + +static void +dialer_timer_start_locked(nni_dialer *d) +{ + nni_duration backoff; + + backoff = d->d_currtime; + d->d_currtime *= 2; + if (d->d_currtime > d->d_maxrtime) { + d->d_currtime = d->d_maxrtime; + } + + // To minimize damage from storms, etc., we select a backoff + // value randomly, in the range of [0, backoff-1]; this is + // pretty similar to 802 style backoff, except that we have a + // nearly uniform time period instead of discrete slot times. + // This algorithm may lead to slight biases because we don't + // have a statistically perfect distribution with the modulo of + // the random number, but this really doesn't matter. + nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio); +} + +void +nni_dialer_timer_start(nni_dialer *d) +{ + nni_sock *s = d->d_sock; + nni_mtx_lock(&s->s_mx); + dialer_timer_start_locked(d); + nni_mtx_unlock(&s->s_mx); +} + +void +nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p) +{ + nni_sock *s = d->d_sock; + + nni_mtx_lock(&s->s_mx); + + if (s->s_closed || d->d_closed) { + nni_mtx_unlock(&s->s_mx); + nni_pipe_destroy(p); + return; + } + + p->p_dialer = d; + nni_list_append(&d->d_pipes, p); + nni_list_append(&s->s_pipes, p); + d->d_pipe = p; + d->d_currtime = d->d_inirtime; + nni_mtx_unlock(&s->s_mx); + + // Start the initial negotiation I/O... + nni_pipe_start(p); +} + +static void +dialer_shutdown_impl(nni_dialer *d) +{ + nni_pipe *p; + + // Abort any remaining in-flight operations. + nni_aio_close(d->d_con_aio); + nni_aio_close(d->d_tmo_aio); + + // Stop the underlying transport. + d->d_ops.d_close(d->d_data); + + NNI_LIST_FOREACH (&d->d_pipes, p) { + nni_pipe_close(p); + } +} + +static void +dialer_shutdown_locked(nni_dialer *d) +{ + if (nni_atomic_flag_test_and_set(&d->d_closing)) { + return; + } + dialer_shutdown_impl(d); +} + +void +nni_dialer_shutdown(nni_dialer *d) +{ + nni_sock *s = d->d_sock; + if (nni_atomic_flag_test_and_set(&d->d_closing)) { + return; + } + nni_mtx_lock(&s->s_mx); + dialer_shutdown_impl(d); + nni_mtx_unlock(&s->s_mx); +} + +void +nni_dialer_reap(nni_dialer *d) +{ + nni_sock *s = d->d_sock; + + nni_aio_stop(d->d_tmo_aio); + nni_aio_stop(d->d_con_aio); + + nni_mtx_lock(&s->s_mx); + if (!nni_list_empty(&d->d_pipes)) { + nni_pipe *p; + // This should already have been done, but be certain! + NNI_LIST_FOREACH (&d->d_pipes, p) { + nni_pipe_close(p); + } + nni_mtx_unlock(&s->s_mx); + // Go back to the end of reap list. + nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d); + return; + } + + nni_list_remove(&s->s_dialers, d); + if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) { + nni_cv_wake(&s->s_cv); + } + + nni_mtx_unlock(&s->s_mx); + + nni_dialer_destroy(d); +} + +void +nni_listener_add_pipe(nni_listener *l, nni_pipe *p) +{ + nni_sock *s = l->l_sock; + + nni_mtx_lock(&s->s_mx); + if (s->s_closed || l->l_closed) { + nni_mtx_unlock(&s->s_mx); + nni_pipe_destroy(p); + return; + } + p->p_listener = l; + nni_list_append(&l->l_pipes, p); + nni_list_append(&s->s_pipes, p); + nni_mtx_unlock(&s->s_mx); + + // Start the initial negotiation I/O... + nni_pipe_start(p); +} + +static void +listener_shutdown_impl(nni_listener *l) +{ + nni_pipe *p; + + // Abort any remaining in-flight accepts. + nni_aio_close(l->l_acc_aio); + nni_aio_close(l->l_tmo_aio); + + // Stop the underlying transport. + l->l_ops.l_close(l->l_data); + + NNI_LIST_FOREACH (&l->l_pipes, p) { + nni_pipe_close(p); + } +} + +static void +listener_shutdown_locked(nni_listener *l) +{ + if (nni_atomic_flag_test_and_set(&l->l_closing)) { + return; + } + listener_shutdown_impl(l); +} + +void +nni_listener_shutdown(nni_listener *l) +{ + nni_sock *s = l->l_sock; + + if (nni_atomic_flag_test_and_set(&l->l_closing)) { + return; + } + + nni_mtx_lock(&s->s_mx); + listener_shutdown_impl(l); + nni_mtx_unlock(&s->s_mx); +} + +void +nni_listener_reap(nni_listener *l) +{ + nni_sock *s = l->l_sock; + + nni_aio_stop(l->l_tmo_aio); + nni_aio_stop(l->l_acc_aio); + + nni_mtx_lock(&s->s_mx); + if (!nni_list_empty(&l->l_pipes)) { + nni_pipe *p; + // This should already have been done, but be certain! + NNI_LIST_FOREACH (&l->l_pipes, p) { + nni_pipe_close(p); + } + nni_mtx_unlock(&s->s_mx); + // Go back to the end of reap list. + nni_reap(&l->l_reap, (nni_cb) nni_listener_reap, l); + return; + } + + nni_list_remove(&s->s_listeners, l); + if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) { + nni_cv_wake(&s->s_cv); + } + + nni_mtx_unlock(&s->s_mx); + + nni_listener_destroy(l); +} + +void +nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev) +{ + nni_sock * s = p->p_sock; + nng_pipe_cb cb; + void * arg; + + if (!p->p_cbs) { + if (ev == NNG_PIPE_EV_ADD_PRE) { + // First event, after this we want all other events. + p->p_cbs = true; + } else { + return; + } + } + nni_mtx_lock(&s->s_pipe_cbs_mtx); + cb = s->s_pipe_cbs[ev].cb_fn; + arg = s->s_pipe_cbs[ev].cb_arg; + nni_mtx_unlock(&s->s_pipe_cbs_mtx); + + if (cb != NULL) { + nng_pipe pid; + pid.id = p->p_id; + cb(pid, ev, arg); + } +} + +void +nni_pipe_remove(nni_pipe *p) +{ + nni_sock * s = p->p_sock; + nni_dialer *d = p->p_dialer; + + nni_mtx_lock(&s->s_mx); + nni_list_node_remove(&p->p_sock_node); + nni_list_node_remove(&p->p_ep_node); + p->p_listener = NULL; + p->p_dialer = NULL; + if ((d != NULL) && (d->d_pipe == p)) { + d->d_pipe = NULL; + dialer_timer_start_locked(d); // Kick the timer to redial. + } + if (s->s_closing) { + nni_cv_wake(&s->s_cv); + } + nni_mtx_unlock(&s->s_mx); +} diff --git a/src/core/socket.h b/src/core/socket.h index 37256571..4b9c4642 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -38,20 +38,6 @@ extern void nni_sock_send(nni_sock *, nni_aio *); extern void nni_sock_recv(nni_sock *, nni_aio *); extern uint32_t nni_sock_id(nni_sock *); -// nni_sock_pipe_add adds the pipe to the socket. It is called by -// the generic pipe creation code. It also adds the socket to the -// ep list, and starts the pipe. It does all these to ensure that -// we have complete success or failure, and there is no point where -// a pipe could wind up orphaned. -extern int nni_sock_pipe_add(nni_sock *, nni_pipe *); -extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *); - -extern int nni_sock_add_dialer(nni_sock *, nni_dialer *); -extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *); - -extern int nni_sock_add_listener(nni_sock *, nni_listener *); -extern void nni_sock_remove_listener(nni_sock *, nni_listener *); - // These are socket methods that protocol operations can expect to call. // Note that each of these should be called without any locks held, since // the socket can reenter the protocol. @@ -76,8 +62,6 @@ extern uint32_t nni_sock_flags(nni_sock *); // should be executed. extern void nni_sock_set_pipe_cb(nni_sock *sock, int, nng_pipe_cb, void *); -extern void nni_sock_run_pipe_cb(nni_sock *sock, int, uint32_t); - extern bool nni_sock_closing(nni_sock *sock); // nni_ctx_open is used to open/create a new context structure. diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h new file mode 100644 index 00000000..207a83b3 --- /dev/null +++ b/src/core/sockimpl.h @@ -0,0 +1,107 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_SOCKIMPL_H +#define CORE_SOCKIMPL_H + +// This file contains stuff shared within the core between sockets, endpoints, +// and pipes. This must not be exposed to other subsystems -- these internals +// are subject to change at any time. + +struct nni_dialer { + nni_tran_dialer_ops d_ops; // transport ops + nni_tran * d_tran; // transport pointer + void * d_data; // transport private + uint32_t d_id; // endpoint id + nni_list_node d_node; // per socket list + nni_sock * d_sock; + nni_url * d_url; + nni_pipe * d_pipe; // active pipe (for redialer) + int d_refcnt; + int d_lastrv; // last result from synchronous + bool d_synch; // synchronous connect in progress? + bool d_closed; // full shutdown + nni_atomic_flag d_started; + nni_atomic_flag d_closing; // close pending (waiting on refcnt) + nni_mtx d_mtx; + nni_cv d_cv; + nni_list d_pipes; + nni_aio * d_con_aio; + nni_aio * d_tmo_aio; // backoff timer + nni_duration d_maxrtime; // maximum time for reconnect + nni_duration d_currtime; // current time for reconnect + nni_duration d_inirtime; // initial time for reconnect + nni_time d_conntime; // time of last good connect + nni_reap_item d_reap; +}; + +struct nni_listener { + nni_tran_listener_ops l_ops; // transport ops + nni_tran * l_tran; // transport pointer + void * l_data; // transport private + uint32_t l_id; // endpoint id + nni_list_node l_node; // per socket list + nni_sock * l_sock; + nni_url * l_url; + int l_refcnt; + bool l_closed; // full shutdown + nni_atomic_flag l_started; + nni_atomic_flag l_closing; // close started (shutdown) + nni_list l_pipes; + nni_aio * l_acc_aio; + nni_aio * l_tmo_aio; + nni_reap_item l_reap; +}; + +struct nni_pipe { + uint32_t p_id; + nni_tran_pipe_ops p_tran_ops; + nni_proto_pipe_ops p_proto_ops; + void * p_tran_data; + void * p_proto_data; + nni_list_node p_sock_node; + nni_list_node p_ep_node; + nni_sock * p_sock; + nni_dialer * p_dialer; + nni_listener * p_listener; + bool p_closed; + nni_atomic_flag p_stop; + bool p_cbs; + int p_refcnt; + nni_mtx p_mtx; + nni_cv p_cv; + nni_reap_item p_reap; + nni_aio * p_start_aio; +}; + +extern int nni_sock_add_dialer(nni_sock *, nni_dialer *); +extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *); + +extern int nni_sock_add_listener(nni_sock *, nni_listener *); +extern void nni_sock_remove_listener(nni_sock *, nni_listener *); + +extern void nni_dialer_add_pipe(nni_dialer *, nni_pipe *); +extern void nni_dialer_shutdown(nni_dialer *); +extern void nni_dialer_reap(nni_dialer *); +extern void nni_dialer_destroy(nni_dialer *); +extern void nni_dialer_timer_start(nni_dialer *); + +extern void nni_listener_add_pipe(nni_listener *, nni_pipe *); +extern void nni_listener_shutdown(nni_listener *); +extern void nni_listener_reap(nni_listener *); +extern void nni_listener_destroy(nni_listener *); + +extern void nni_pipe_remove(nni_pipe *); +extern void nni_pipe_run_cb(nni_pipe *, nng_pipe_ev); +extern void nni_pipe_destroy(nni_pipe *); +extern int nni_pipe_create(nni_pipe **, nni_sock *, nni_tran *, void *); +extern void nni_pipe_start(nni_pipe *); + +#endif // CORE_SOCKIMPL_H @@ -229,7 +229,7 @@ NNG_DECL int nng_getopt_ptr(nng_socket, const char *, void **); typedef enum { NNG_PIPE_EV_ADD_PRE, // Called just before pipe added to socket NNG_PIPE_EV_ADD_POST, // Called just after pipe added to socket - NNG_PIPE_EV_REM_POST, // Called just after poipe removed from socket + NNG_PIPE_EV_REM_POST, // Called just after pipe removed from socket NNG_PIPE_EV_NUM, // Used internally, must be last. } nng_pipe_ev; diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c index ba719e49..2426abba 100644 --- a/src/protocol/bus0/bus.c +++ b/src/protocol/bus0/bus.c @@ -234,7 +234,7 @@ bus0_pipe_getq_cb(void *arg) if (nni_aio_result(p->aio_getq) != 0) { // closed? - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); @@ -252,7 +252,7 @@ bus0_pipe_send_cb(void *arg) // closed? nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -267,7 +267,7 @@ bus0_pipe_recv_cb(void *arg) nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } msg = nni_aio_get_msg(p->aio_recv); @@ -277,7 +277,7 @@ bus0_pipe_recv_cb(void *arg) // XXX: bump a nomemory stat nni_msg_free(msg); nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -295,7 +295,7 @@ bus0_pipe_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c index 4ab9f9e8..d663c5e2 100644 --- a/src/protocol/pair0/pair.c +++ b/src/protocol/pair0/pair.c @@ -176,7 +176,7 @@ pair0_recv_cb(void *arg) nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -196,7 +196,7 @@ pair0_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } nni_pipe_recv(p->npipe, p->aio_recv); @@ -208,7 +208,7 @@ pair0_getq_cb(void *arg) pair0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -226,7 +226,7 @@ pair0_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c index 0c6a4867..dc250943 100644 --- a/src/protocol/pair1/pair.c +++ b/src/protocol/pair1/pair.c @@ -244,7 +244,7 @@ pair1_pipe_recv_cb(void *arg) int rv; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -257,13 +257,13 @@ pair1_pipe_recv_cb(void *arg) // If the message is missing the hop count header, scrap it. if (nni_msg_len(msg) < sizeof(uint32_t)) { nni_msg_free(msg); - nni_pipe_stop(npipe); + nni_pipe_close(npipe); return; } hdr = nni_msg_trim_u32(msg); if (hdr & 0xffffff00) { nni_msg_free(msg); - nni_pipe_stop(npipe); + nni_pipe_close(npipe); return; } @@ -343,7 +343,7 @@ pair1_pipe_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } nni_pipe_recv(p->npipe, p->aio_recv); @@ -358,7 +358,7 @@ pair1_pipe_getq_cb(void *arg) uint32_t hops; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -405,7 +405,7 @@ pair1_pipe_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c index 63243cb5..a713bc80 100644 --- a/src/protocol/pipeline0/pull.c +++ b/src/protocol/pipeline0/pull.c @@ -141,7 +141,7 @@ pull0_recv_cb(void *arg) if (nni_aio_result(aio) != 0) { // Failed to get a message, probably the pipe is closed. - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -163,7 +163,7 @@ pull0_putq_cb(void *arg) // we can do. Just close the pipe. nni_msg_free(nni_aio_get_msg(aio)); nni_aio_set_msg(aio, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c index 9585b86c..00e9212c 100644 --- a/src/protocol/pipeline0/push.c +++ b/src/protocol/pipeline0/push.c @@ -163,7 +163,7 @@ push0_recv_cb(void *arg) // We normally expect to receive an error. If a pipe actually // sends us data, we just discard it. if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } nni_msg_free(nni_aio_get_msg(p->aio_recv)); @@ -180,7 +180,7 @@ push0_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -195,7 +195,7 @@ push0_getq_cb(void *arg) if (nni_aio_result(aio) != 0) { // If the socket is closing, nothing else we can do. - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index 72cd1daa..cbc8acea 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -240,7 +240,7 @@ pub0_pipe_recv_cb(void *arg) pub0_pipe *p = arg; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -255,7 +255,7 @@ pub0_pipe_getq_cb(void *arg) pub0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -273,7 +273,7 @@ pub0_pipe_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index 2e8be4be..cb6d781f 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -164,7 +164,7 @@ sub0_recv_cb(void *arg) nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -182,7 +182,7 @@ sub0_recv_cb(void *arg) // Any other error we stop the pipe for. It's probably // NNG_ECLOSED anyway. nng_msg_free(msg); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } nni_pipe_recv(p->pipe, p->aio_recv); diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index f725cadb..f9ce58fd 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -416,7 +416,7 @@ rep0_pipe_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } nni_mtx_lock(&s->lk); @@ -519,7 +519,7 @@ rep0_pipe_recv_cb(void *arg) int hops; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -544,7 +544,7 @@ rep0_pipe_recv_cb(void *arg) // Peer is speaking garbage. Kick it. nni_msg_free(msg); nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } body = nni_msg_body(msg); diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 43751d14..018dd0e8 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -305,7 +305,7 @@ req0_send_cb(void *arg) // We failed to send... clean up and deal with it. nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -345,7 +345,7 @@ req0_recv_cb(void *arg) uint32_t id; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -409,7 +409,7 @@ req0_recv_cb(void *arg) malformed: nni_msg_free(msg); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); } static void diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c index e3b9b605..1fe81ac5 100644 --- a/src/protocol/reqrep0/xrep.c +++ b/src/protocol/reqrep0/xrep.c @@ -262,7 +262,7 @@ xrep0_pipe_getq_cb(void *arg) xrep0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -280,7 +280,7 @@ xrep0_pipe_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -296,7 +296,7 @@ xrep0_pipe_recv_cb(void *arg) int hops; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -327,7 +327,7 @@ xrep0_pipe_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer is speaking garbage. Kick it. nni_msg_free(msg); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } body = nni_msg_body(msg); @@ -361,7 +361,7 @@ xrep0_pipe_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c index 2f0a3652..a98c713e 100644 --- a/src/protocol/reqrep0/xreq.c +++ b/src/protocol/reqrep0/xreq.c @@ -173,7 +173,7 @@ xreq0_getq_cb(void *arg) xreq0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -191,7 +191,7 @@ xreq0_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -207,7 +207,7 @@ xreq0_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } nni_aio_set_msg(p->aio_putq, NULL); @@ -224,7 +224,7 @@ xreq0_recv_cb(void *arg) uint32_t id; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -236,7 +236,7 @@ xreq0_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer gave us garbage, so kick it. nni_msg_free(msg); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } id = nni_msg_trim_u32(msg); diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index fbdeb65a..4e0a5263 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -405,7 +405,7 @@ resp0_pipe_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } nni_mtx_lock(&s->mtx); @@ -511,7 +511,7 @@ resp0_pipe_recv_cb(void *arg) size_t len; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -532,7 +532,7 @@ resp0_pipe_recv_cb(void *arg) // Peer is speaking garbage, kick it. nni_msg_free(msg); nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } body = nni_msg_body(msg); diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index 42d88f13..58fa4aa6 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -376,7 +376,7 @@ surv0_pipe_getq_cb(void *arg) surv0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -394,7 +394,7 @@ surv0_pipe_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -411,7 +411,7 @@ surv0_pipe_recv_cb(void *arg) uint32_t id; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -423,7 +423,7 @@ surv0_pipe_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer sent us garbage. Kick it. nni_msg_free(msg); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } id = nni_msg_trim_u32(msg); diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c index 03a47e92..334c5ca6 100644 --- a/src/protocol/survey0/xrespond.c +++ b/src/protocol/survey0/xrespond.c @@ -247,7 +247,7 @@ xresp0_getq_cb(void *arg) xresp0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -265,7 +265,7 @@ xresp0_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -282,7 +282,7 @@ xresp0_recv_cb(void *arg) int hops; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -308,7 +308,7 @@ xresp0_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer sent us garbage, so kick it. nni_msg_free(msg); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } body = nni_msg_body(msg); @@ -340,7 +340,7 @@ xresp0_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c index bad24042..fabcc766 100644 --- a/src/protocol/survey0/xsurvey.c +++ b/src/protocol/survey0/xsurvey.c @@ -205,7 +205,7 @@ xsurv0_getq_cb(void *arg) xsurv0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -223,7 +223,7 @@ xsurv0_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -238,7 +238,7 @@ xsurv0_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -252,7 +252,7 @@ xsurv0_recv_cb(void *arg) nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -264,7 +264,7 @@ xsurv0_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer gave us garbage, so kick it. nni_msg_free(msg); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) { diff --git a/tests/sock.c b/tests/sock.c index 002ae944..4aef1f38 100644 --- a/tests/sock.c +++ b/tests/sock.c @@ -315,6 +315,8 @@ TestMain("Socket Operations", { size_t sz; nng_socket s2; char * a = "inproc://asy"; + So(nng_setopt_ms(s1, NNG_OPT_RECONNMINT, 10) == 0); + So(nng_setopt_ms(s1, NNG_OPT_RECONNMAXT, 10) == 0); So(nng_dial(s1, a, NULL, NNG_FLAG_NONBLOCK) == 0); Convey("And connects late", { So(nng_pair_open(&s2) == 0); |
