diff options
| -rw-r--r-- | src/core/dialer.c | 237 | ||||
| -rw-r--r-- | src/core/dialer.h | 3 | ||||
| -rw-r--r-- | src/core/init.c | 3 | ||||
| -rw-r--r-- | src/core/listener.c | 160 | ||||
| -rw-r--r-- | src/core/listener.h | 4 | ||||
| -rw-r--r-- | src/core/pipe.c | 207 | ||||
| -rw-r--r-- | src/core/pipe.h | 39 | ||||
| -rw-r--r-- | src/core/reap.c | 81 | ||||
| -rw-r--r-- | src/core/reap.h | 1 | ||||
| -rw-r--r-- | src/core/socket.c | 369 | ||||
| -rw-r--r-- | src/core/socket.h | 16 | ||||
| -rw-r--r-- | src/core/sockimpl.h | 107 | ||||
| -rw-r--r-- | src/nng.h | 2 | ||||
| -rw-r--r-- | src/protocol/bus0/bus.c | 10 | ||||
| -rw-r--r-- | src/protocol/pair0/pair.c | 8 | ||||
| -rw-r--r-- | src/protocol/pair1/pair.c | 12 | ||||
| -rw-r--r-- | src/protocol/pipeline0/pull.c | 4 | ||||
| -rw-r--r-- | src/protocol/pipeline0/push.c | 6 | ||||
| -rw-r--r-- | src/protocol/pubsub0/pub.c | 6 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 4 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 6 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.c | 6 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xrep.c | 10 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xreq.c | 10 | ||||
| -rw-r--r-- | src/protocol/survey0/respond.c | 6 | ||||
| -rw-r--r-- | src/protocol/survey0/survey.c | 8 | ||||
| -rw-r--r-- | src/protocol/survey0/xrespond.c | 10 | ||||
| -rw-r--r-- | src/protocol/survey0/xsurvey.c | 10 | ||||
| -rw-r--r-- | tests/sock.c | 2 |
29 files changed, 620 insertions, 727 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c index e93c893e..3144d673 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -9,36 +9,12 @@ // #include "core/nng_impl.h" +#include "sockimpl.h" #include <stdio.h> #include <stdlib.h> #include <string.h> -struct nni_dialer { - nni_tran_dialer_ops d_ops; // transport ops - nni_tran * d_tran; // transport pointer - void * d_data; // transport private - uint64_t d_id; // endpoint id - nni_list_node d_node; // per socket list - nni_sock * d_sock; - nni_url * d_url; - int d_refcnt; - int d_lastrv; // last result from synchronous - bool d_synch; // synchronous connect in progress? - bool d_started; - bool d_closed; // full shutdown - nni_atomic_flag d_closing; // close pending (waiting on refcnt) - nni_mtx d_mtx; - nni_cv d_cv; - nni_list d_pipes; - nni_aio * d_con_aio; - nni_aio * d_tmo_aio; // backoff timer - nni_duration d_maxrtime; // maximum time for reconnect - nni_duration d_currtime; // current time for reconnect - nni_duration d_inirtime; // initial time for reconnect - nni_time d_conntime; // time of last good connect -}; - // Functionality related to dialers. static void dialer_connect_start(nni_dialer *); static void dialer_connect_cb(void *); @@ -65,6 +41,7 @@ nni_dialer_sys_init(void) void nni_dialer_sys_fini(void) { + nni_reap_drain(); nni_mtx_fini(&dialers_lk); nni_idhash_fini(dialers); dialers = NULL; @@ -73,26 +50,15 @@ nni_dialer_sys_fini(void) uint32_t nni_dialer_id(nni_dialer *d) { - return ((uint32_t) d->d_id); + return (d->d_id); } -static void -dialer_destroy(nni_dialer *d) +void +nni_dialer_destroy(nni_dialer *d) { - if (d == NULL) { - return; - } - - // Remove us from the table so we cannot be found. - if (d->d_id != 0) { - nni_idhash_remove(dialers, d->d_id); - } - nni_aio_stop(d->d_con_aio); nni_aio_stop(d->d_tmo_aio); - nni_sock_remove_dialer(d->d_sock, d); - nni_aio_fini(d->d_con_aio); nni_aio_fini(d->d_tmo_aio); @@ -126,13 +92,13 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) nni_url_free(url); return (NNG_ENOMEM); } - d->d_url = url; - d->d_closed = false; - d->d_started = false; - d->d_data = NULL; - d->d_refcnt = 1; - d->d_sock = s; - d->d_tran = tran; + d->d_url = url; + d->d_closed = false; + d->d_data = NULL; + d->d_refcnt = 1; + d->d_sock = s; + d->d_tran = tran; + nni_atomic_flag_reset(&d->d_started); nni_atomic_flag_reset(&d->d_closing); // Make a copy of the endpoint operations. This allows us to @@ -141,8 +107,7 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) d->d_ops = *tran->tran_dialer; NNI_LIST_NODE_INIT(&d->d_node); - - nni_pipe_ep_list_init(&d->d_pipes); + NNI_LIST_INIT(&d->d_pipes, nni_pipe, p_ep_node); nni_mtx_init(&d->d_mtx); nni_cv_init(&d->d_cv, &d->d_mtx); @@ -150,9 +115,9 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr) if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) || ((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) || ((rv = d->d_ops.d_init(&d->d_data, url, s)) != 0) || - ((rv = nni_idhash_alloc(dialers, &d->d_id, d)) != 0) || + ((rv = nni_idhash_alloc32(dialers, &d->d_id, d)) != 0) || ((rv = nni_sock_add_dialer(s, d)) != 0)) { - dialer_destroy(d); + nni_dialer_destroy(d); return (rv); } @@ -203,80 +168,33 @@ nni_dialer_rele(nni_dialer *d) { nni_mtx_lock(&dialers_lk); d->d_refcnt--; - if (d->d_refcnt == 0) { - nni_cv_wake(&d->d_cv); + if ((d->d_refcnt == 0) && (d->d_closed)) { + nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d); } nni_mtx_unlock(&dialers_lk); } -int -nni_dialer_shutdown(nni_dialer *d) -{ - if (nni_atomic_flag_test_and_set(&d->d_closing)) { - return (NNG_ECLOSED); - } - - // Abort any remaining in-flight operations. - nni_aio_close(d->d_con_aio); - nni_aio_close(d->d_tmo_aio); - - // Stop the underlying transport. - d->d_ops.d_close(d->d_data); - - return (0); -} - void nni_dialer_close(nni_dialer *d) { - nni_pipe *p; - - nni_mtx_lock(&d->d_mtx); + nni_mtx_lock(&dialers_lk); if (d->d_closed) { - nni_mtx_unlock(&d->d_mtx); + nni_mtx_unlock(&dialers_lk); nni_dialer_rele(d); return; } d->d_closed = true; - nni_mtx_unlock(&d->d_mtx); - - nni_dialer_shutdown(d); - - nni_aio_stop(d->d_con_aio); - nni_aio_stop(d->d_tmo_aio); - - nni_mtx_lock(&d->d_mtx); - NNI_LIST_FOREACH (&d->d_pipes, p) { - nni_pipe_stop(p); - } - while ((!nni_list_empty(&d->d_pipes)) || (d->d_refcnt != 1)) { - nni_cv_wait(&d->d_cv); - } - nni_mtx_unlock(&d->d_mtx); - - dialer_destroy(d); -} + nni_mtx_unlock(&dialers_lk); -// This function starts an exponential backoff timer for reconnecting. -static void -dialer_timer_start(nni_dialer *d) -{ - nni_duration backoff; + // Remove us from the table so we cannot be found. + // This is done fairly early in the teardown process. + // If we're here, either the socket or the listener has been + // closed at the user request, so there would be a race anyway. + nni_idhash_remove(dialers, d->d_id); - backoff = d->d_currtime; - d->d_currtime *= 2; - if (d->d_currtime > d->d_maxrtime) { - d->d_currtime = d->d_maxrtime; - } + nni_dialer_shutdown(d); - // To minimize damage from storms, etc., we select a backoff - // value randomly, in the range of [0, backoff-1]; this is - // pretty similar to 802 style backoff, except that we have a - // nearly uniform time period instead of discrete slot times. - // This algorithm may lead to slight biases because we don't - // have a statistically perfect distribution with the modulo of - // the random number, but this really doesn't matter. - nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio); + nni_dialer_rele(d); } static void @@ -285,11 +203,9 @@ dialer_timer_cb(void *arg) nni_dialer *d = arg; nni_aio * aio = d->d_tmo_aio; - nni_mtx_lock(&d->d_mtx); if (nni_aio_result(aio) == 0) { dialer_connect_start(d); } - nni_mtx_unlock(&d->d_mtx); } static void @@ -304,52 +220,32 @@ dialer_connect_cb(void *arg) if ((rv = nni_aio_result(aio)) == 0) { void *data = nni_aio_get_output(aio, 0); NNI_ASSERT(data != NULL); - rv = nni_pipe_create2(&p, d->d_sock, d->d_tran, data); + rv = nni_pipe_create(&p, d->d_sock, d->d_tran, data); } nni_mtx_lock(&d->d_mtx); - synch = d->d_synch; - d->d_synch = false; - if (rv == 0) { - nni_pipe_set_dialer(p, d); - nni_list_append(&d->d_pipes, p); - - // Good connect, so reset the backoff timer. - // Note that a host that accepts the connect, but drops - // us immediately, is going to get hit pretty hard - // (depending on the initial backoff) with no - // exponential backoff. This can happen if we wind up - // trying to connect to some port that does not speak - // SP for example. - d->d_currtime = d->d_inirtime; - } + synch = d->d_synch; nni_mtx_unlock(&d->d_mtx); - if ((rv == 0) && ((rv = nni_sock_pipe_add(d->d_sock, p)) != 0)) { - nni_pipe_stop(p); - } - - nni_mtx_lock(&d->d_mtx); switch (rv) { case 0: - // No further outgoing connects -- we will restart a - // connection from the pipe when the pipe is removed. + nni_dialer_add_pipe(d, p); break; - case NNG_ECLOSED: - case NNG_ECANCELED: - // Canceled/closed -- stop everything. + case NNG_ECLOSED: // No further action. + case NNG_ECANCELED: // No further action. break; default: - // redial, but only if we are not synchronous if (!synch) { - dialer_timer_start(d); + nni_dialer_timer_start(d); } break; } if (synch) { + nni_mtx_lock(&d->d_mtx); + d->d_synch = false; d->d_lastrv = rv; nni_cv_wake(&d->d_cv); + nni_mtx_unlock(&d->d_mtx); } - nni_mtx_unlock(&d->d_mtx); } static void @@ -366,70 +262,37 @@ nni_dialer_start(nni_dialer *d, int flags) { int rv = 0; - // nni_sock_reconntimes(d->d_sock, &d->d_inirtime, - //&d->d_maxrtime); - d->d_currtime = d->d_inirtime; - - nni_mtx_lock(&d->d_mtx); - - if (d->d_started) { - nni_mtx_unlock(&d->d_mtx); + if (nni_atomic_flag_test_and_set(&d->d_started)) { return (NNG_ESTATE); } if ((flags & NNG_FLAG_NONBLOCK) != 0) { - d->d_started = true; - dialer_connect_start(d); + nni_mtx_lock(&d->d_mtx); + d->d_currtime = d->d_inirtime; nni_mtx_unlock(&d->d_mtx); + dialer_connect_start(d); return (0); } - d->d_synch = true; - d->d_started = true; + nni_mtx_lock(&d->d_mtx); + d->d_synch = true; + nni_mtx_unlock(&d->d_mtx); + dialer_connect_start(d); + nni_mtx_lock(&d->d_mtx); while (d->d_synch) { nni_cv_wait(&d->d_cv); } rv = d->d_lastrv; - nni_cv_wake(&d->d_cv); + nni_mtx_unlock(&d->d_mtx); if (rv != 0) { - d->d_started = false; + nni_atomic_flag_reset(&d->d_started); } - nni_mtx_unlock(&d->d_mtx); return (rv); } -void -nni_dialer_remove_pipe(nni_dialer *d, nni_pipe *p) -{ - if (d == NULL) { - return; - } - - // Break up the relationship between the dialer and the pipe. - nni_mtx_lock(&d->d_mtx); - // During early init, the pipe might not have this set. - if (nni_list_active(&d->d_pipes, p)) { - nni_list_remove(&d->d_pipes, p); - } - // Wake up the close thread if it is waiting. - if (d->d_closed) { - if (nni_list_empty(&d->d_pipes)) { - nni_cv_wake(&d->d_cv); - } - } else { - // If this pipe closed, then lets restart the dial operation. - // Since the remote side seems to have closed, lets start with - // a backoff. This keeps us from pounding the crap out of the - // thing if a remote server accepts but then disconnects - // immediately. - dialer_timer_start(d); - } - nni_mtx_unlock(&d->d_mtx); -} - int nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz, nni_opt_type t) @@ -508,9 +371,3 @@ nni_dialer_getopt( return (nni_sock_getopt(d->d_sock, name, valp, szp, t)); } - -void -nni_dialer_list_init(nni_list *list) -{ - NNI_LIST_INIT(list, nni_dialer, d_node); -} diff --git a/src/core/dialer.h b/src/core/dialer.h index 56b0fb1b..4a3e127c 100644 --- a/src/core/dialer.h +++ b/src/core/dialer.h @@ -18,11 +18,8 @@ extern int nni_dialer_hold(nni_dialer *); extern void nni_dialer_rele(nni_dialer *); extern uint32_t nni_dialer_id(nni_dialer *); extern int nni_dialer_create(nni_dialer **, nni_sock *, const char *); -extern int nni_dialer_shutdown(nni_dialer *); extern void nni_dialer_close(nni_dialer *); extern int nni_dialer_start(nni_dialer *, int); -extern void nni_dialer_list_init(nni_list *); -extern void nni_dialer_remove_pipe(nni_dialer *, nni_pipe *); extern int nni_dialer_setopt( nni_dialer *, const char *, const void *, size_t, nni_opt_type); diff --git a/src/core/init.c b/src/core/init.c index 30a7a547..c66e54d2 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -75,11 +75,12 @@ nni_fini(void) nni_dialer_sys_fini(); nni_listener_sys_fini(); nni_sock_sys_fini(); - nni_reap_sys_fini(); // must be before timer and aio (expire) + nni_reap_drain(); nni_random_sys_fini(); nni_aio_sys_fini(); nni_timer_sys_fini(); nni_taskq_sys_fini(); + nni_reap_sys_fini(); // must be before timer and aio (expire) nni_mtx_fini(&nni_init_mtx); nni_plat_fini(); diff --git a/src/core/listener.c b/src/core/listener.c index 8e06076d..1bfa657d 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -9,30 +9,12 @@ // #include "core/nng_impl.h" +#include "sockimpl.h" #include <stdio.h> #include <stdlib.h> #include <string.h> -struct nni_listener { - nni_tran_listener_ops l_ops; // transport ops - nni_tran * l_tran; // transport pointer - void * l_data; // transport private - uint64_t l_id; // endpoint id - nni_list_node l_node; // per socket list - nni_sock * l_sock; - nni_url * l_url; - int l_refcnt; - bool l_started; - bool l_closed; // full shutdown - nni_atomic_flag l_closing; // close pending - nni_mtx l_mtx; - nni_cv l_cv; - nni_list l_pipes; - nni_aio * l_acc_aio; - nni_aio * l_tmo_aio; -}; - // Functionality related to listeners. static void listener_accept_start(nni_listener *); @@ -60,6 +42,7 @@ nni_listener_sys_init(void) void nni_listener_sys_fini(void) { + nni_reap_drain(); nni_mtx_fini(&listeners_lk); nni_idhash_fini(listeners); listeners = NULL; @@ -68,32 +51,21 @@ nni_listener_sys_fini(void) uint32_t nni_listener_id(nni_listener *l) { - return ((uint32_t) l->l_id); + return (l->l_id); } -static void -listener_destroy(nni_listener *l) +void +nni_listener_destroy(nni_listener *l) { - if (l == NULL) { - return; - } - - // Remove us from the table so we cannot be found. - if (l->l_id != 0) { - nni_idhash_remove(listeners, l->l_id); - } - nni_aio_stop(l->l_acc_aio); - - nni_sock_remove_listener(l->l_sock, l); + nni_aio_stop(l->l_tmo_aio); nni_aio_fini(l->l_acc_aio); + nni_aio_fini(l->l_tmo_aio); if (l->l_data != NULL) { l->l_ops.l_fini(l->l_data); } - nni_cv_fini(&l->l_cv); - nni_mtx_fini(&l->l_mtx); nni_url_free(l->l_url); NNI_FREE_STRUCT(l); } @@ -119,14 +91,14 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr) nni_url_free(url); return (NNG_ENOMEM); } - l->l_url = url; - l->l_closed = false; - l->l_started = false; - l->l_data = NULL; - l->l_refcnt = 1; - l->l_sock = s; - l->l_tran = tran; + l->l_url = url; + l->l_closed = false; + l->l_data = NULL; + l->l_refcnt = 1; + l->l_sock = s; + l->l_tran = tran; nni_atomic_flag_reset(&l->l_closing); + nni_atomic_flag_reset(&l->l_started); // Make a copy of the endpoint operations. This allows us to // modify them (to override NULLs for example), and avoids an extra @@ -134,18 +106,14 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr) l->l_ops = *tran->tran_listener; NNI_LIST_NODE_INIT(&l->l_node); - - nni_pipe_ep_list_init(&l->l_pipes); - - nni_mtx_init(&l->l_mtx); - nni_cv_init(&l->l_cv, &l->l_mtx); + NNI_LIST_INIT(&l->l_pipes, nni_pipe, p_ep_node); if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) || ((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) || ((rv = l->l_ops.l_init(&l->l_data, url, s)) != 0) || - ((rv = nni_idhash_alloc(listeners, &l->l_id, l)) != 0) || + ((rv = nni_idhash_alloc32(listeners, &l->l_id, l)) != 0) || ((rv = nni_sock_add_listener(s, l)) != 0)) { - listener_destroy(l); + nni_listener_destroy(l); return (rv); } @@ -196,58 +164,33 @@ nni_listener_rele(nni_listener *l) { nni_mtx_lock(&listeners_lk); l->l_refcnt--; - if (l->l_refcnt == 0) { - nni_cv_wake(&l->l_cv); + if ((l->l_refcnt == 0) && (l->l_closed)) { + nni_reap(&l->l_reap, (nni_cb) nni_listener_reap, l); } nni_mtx_unlock(&listeners_lk); } -int -nni_listener_shutdown(nni_listener *l) -{ - if (nni_atomic_flag_test_and_set(&l->l_closing)) { - return (NNG_ECLOSED); - } - - // Abort any remaining in-flight accepts. - nni_aio_close(l->l_acc_aio); - nni_aio_close(l->l_tmo_aio); - - // Stop the underlying transport. - l->l_ops.l_close(l->l_data); - - return (0); -} - void nni_listener_close(nni_listener *l) { - nni_pipe *p; - - nni_mtx_lock(&l->l_mtx); + nni_mtx_lock(&listeners_lk); if (l->l_closed) { - nni_mtx_unlock(&l->l_mtx); + nni_mtx_unlock(&listeners_lk); nni_listener_rele(l); return; } l->l_closed = true; - nni_mtx_unlock(&l->l_mtx); - - nni_listener_shutdown(l); + nni_mtx_unlock(&listeners_lk); - nni_aio_stop(l->l_acc_aio); - nni_aio_stop(l->l_tmo_aio); + // Remove us from the table so we cannot be found. + // This is done fairly early in the teardown process. + // If we're here, either the socket or the listener has been + // closed at the user request, so there would be a race anyway. + nni_idhash_remove(listeners, l->l_id); - nni_mtx_lock(&l->l_mtx); - NNI_LIST_FOREACH (&l->l_pipes, p) { - nni_pipe_stop(p); - } - while ((!nni_list_empty(&l->l_pipes)) || (l->l_refcnt != 1)) { - nni_cv_wait(&l->l_cv); - } - nni_mtx_unlock(&l->l_mtx); + nni_listener_shutdown(l); - listener_destroy(l); + nni_listener_rele(l); // This will trigger a reap if id count is zero. } static void @@ -272,14 +215,11 @@ listener_accept_cb(void *arg) if ((rv = nni_aio_result(aio)) == 0) { void *data = nni_aio_get_output(aio, 0); NNI_ASSERT(data != NULL); - rv = nni_pipe_create2(&p, l->l_sock, l->l_tran, data); + rv = nni_pipe_create(&p, l->l_sock, l->l_tran, data); } switch (rv) { case 0: - nni_mtx_lock(&l->l_mtx); - nni_pipe_set_listener(p, l); - nni_list_append(&l->l_pipes, p); - nni_mtx_unlock(&l->l_mtx); + nni_listener_add_pipe(l, p); listener_accept_start(l); break; case NNG_ECONNABORTED: // remote condition, no cooldown @@ -298,10 +238,6 @@ listener_accept_cb(void *arg) nni_sleep_aio(100, l->l_tmo_aio); break; } - - if ((rv == 0) && ((rv = nni_sock_pipe_add(l->l_sock, p)) != 0)) { - nni_pipe_stop(p); - } } static void @@ -319,44 +255,20 @@ nni_listener_start(nni_listener *l, int flags) int rv = 0; NNI_ARG_UNUSED(flags); - nni_mtx_lock(&l->l_mtx); - if (l->l_started) { - nni_mtx_unlock(&l->l_mtx); + if (nni_atomic_flag_test_and_set(&l->l_started)) { return (NNG_ESTATE); } if ((rv = l->l_ops.l_bind(l->l_data)) != 0) { - nni_mtx_unlock(&l->l_mtx); + nni_atomic_flag_reset(&l->l_started); return (rv); } - l->l_started = true; - nni_mtx_unlock(&l->l_mtx); - listener_accept_start(l); return (0); } -void -nni_listener_remove_pipe(nni_listener *l, nni_pipe *p) -{ - if (l == NULL) { - return; - } - // Break up relationship between listener and pipe. - nni_mtx_lock(&l->l_mtx); - // During early init, the pipe might not have this set. - if (nni_list_active(&l->l_pipes, p)) { - nni_list_remove(&l->l_pipes, p); - } - // Wake up the closer if it is waiting. - if (l->l_closed && nni_list_empty(&l->l_pipes)) { - nni_cv_wake(&l->l_cv); - } - nni_mtx_unlock(&l->l_mtx); -} - int nni_listener_setopt(nni_listener *l, const char *name, const void *val, size_t sz, nni_opt_type t) @@ -406,9 +318,3 @@ nni_listener_getopt( return (nni_sock_getopt(l->l_sock, name, valp, szp, t)); } - -void -nni_listener_list_init(nni_list *list) -{ - NNI_LIST_INIT(list, nni_listener, l_node); -} diff --git a/src/core/listener.h b/src/core/listener.h index 41b1a678..5806fc72 100644 --- a/src/core/listener.h +++ b/src/core/listener.h @@ -18,12 +18,8 @@ extern int nni_listener_hold(nni_listener *); extern void nni_listener_rele(nni_listener *); extern uint32_t nni_listener_id(nni_listener *); extern int nni_listener_create(nni_listener **, nni_sock *, const char *); -extern int nni_listener_shutdown(nni_listener *); extern void nni_listener_close(nni_listener *); extern int nni_listener_start(nni_listener *, int); -extern void nni_listener_list_init(nni_list *); -extern int nni_listener_add_pipe(nni_listener *, nni_pipe *); -extern void nni_listener_remove_pipe(nni_listener *, nni_pipe *); extern int nni_listener_setopt( nni_listener *, const char *, const void *, size_t, nni_opt_type); diff --git a/src/core/pipe.c b/src/core/pipe.c index 6b9b082c..e95fe1d4 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -9,6 +9,7 @@ // #include "core/nng_impl.h" +#include "sockimpl.h" #include <string.h> @@ -17,54 +18,17 @@ // Operations on pipes (to the transport) are generally blocking operations, // performed in the context of the protocol. -struct nni_pipe { - uint32_t p_id; - uint32_t p_sock_id; - uint32_t p_dialer_id; - uint32_t p_listener_id; - nni_tran_pipe_ops p_tran_ops; - nni_proto_pipe_ops p_proto_ops; - void * p_tran_data; - void * p_proto_data; - nni_list_node p_sock_node; - nni_list_node p_ep_node; - nni_sock * p_sock; - nni_listener * p_listener; - nni_dialer * p_dialer; - bool p_closed; - nni_atomic_flag p_stop; - bool p_cbs; - int p_refcnt; - nni_mtx p_mtx; - nni_cv p_cv; - nni_list_node p_reap_node; - nni_aio * p_start_aio; -}; - static nni_idhash *nni_pipes; static nni_mtx nni_pipe_lk; -static nni_list nni_pipe_reap_list; -static nni_mtx nni_pipe_reap_lk; -static nni_cv nni_pipe_reap_cv; -static nni_thr nni_pipe_reap_thr; -static int nni_pipe_reap_run; - -static void nni_pipe_reaper(void *); - int nni_pipe_sys_init(void) { int rv; - NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node); nni_mtx_init(&nni_pipe_lk); - nni_mtx_init(&nni_pipe_reap_lk); - nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk); - if (((rv = nni_idhash_init(&nni_pipes)) != 0) || - ((rv = nni_thr_init(&nni_pipe_reap_thr, nni_pipe_reaper, 0)) != - 0)) { + if ((rv = nni_idhash_init(&nni_pipes)) != 0) { return (rv); } @@ -76,25 +40,13 @@ nni_pipe_sys_init(void) nni_idhash_set_limits( nni_pipes, 1, 0x7fffffff, nni_random() & 0x7fffffff); - nni_pipe_reap_run = 1; - nni_thr_run(&nni_pipe_reap_thr); - return (0); } void nni_pipe_sys_fini(void) { - if (nni_pipe_reap_run) { - nni_mtx_lock(&nni_pipe_reap_lk); - nni_pipe_reap_run = 0; - nni_cv_wake(&nni_pipe_reap_cv); - nni_mtx_unlock(&nni_pipe_reap_lk); - } - - nni_thr_fini(&nni_pipe_reap_thr); - nni_cv_fini(&nni_pipe_reap_cv); - nni_mtx_fini(&nni_pipe_reap_lk); + nni_reap_drain(); nni_mtx_fini(&nni_pipe_lk); if (nni_pipes != NULL) { nni_idhash_fini(nni_pipes); @@ -109,28 +61,7 @@ nni_pipe_destroy(nni_pipe *p) return; } - if (p->p_cbs) { - nni_sock_run_pipe_cb(p->p_sock, NNG_PIPE_EV_REM_POST, p->p_id); - } - - // Stop any pending negotiation. - nni_aio_stop(p->p_start_aio); - - if (p->p_proto_data != NULL) { - p->p_proto_ops.pipe_stop(p->p_proto_data); - } - if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { - p->p_tran_ops.p_stop(p->p_tran_data); - } - - // We have exclusive access at this point, so we can check if - // we are still on any lists. - nni_dialer_remove_pipe(p->p_dialer, p); // dialer may be NULL - nni_listener_remove_pipe(p->p_listener, p); // listener may be NULL - - if (nni_list_node_active(&p->p_sock_node)) { - nni_sock_pipe_remove(p->p_sock, p); - } + nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST); // Make sure any unlocked holders are done with this. // This happens during initialization for example. @@ -138,11 +69,24 @@ nni_pipe_destroy(nni_pipe *p) if (p->p_id != 0) { nni_idhash_remove(nni_pipes, p->p_id); } + // This wait guarantees that all callers are done with us. while (p->p_refcnt != 0) { nni_cv_wait(&p->p_cv); } nni_mtx_unlock(&nni_pipe_lk); + // Wait for neg callbacks to finish. (Already closed). + nni_aio_stop(p->p_start_aio); + + if (p->p_proto_data != NULL) { + p->p_proto_ops.pipe_stop(p->p_proto_data); + } + if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { + p->p_tran_ops.p_stop(p->p_tran_data); + } + + nni_pipe_remove(p); + if (p->p_proto_data != NULL) { p->p_proto_ops.pipe_fini(p->p_proto_data); } @@ -229,6 +173,8 @@ nni_pipe_close(nni_pipe *p) if (p->p_tran_data != NULL) { p->p_tran_ops.p_close(p->p_tran_data); } + + nni_reap(&p->p_reap, (nni_cb) nni_pipe_destroy, p); } bool @@ -241,23 +187,6 @@ nni_pipe_closed(nni_pipe *p) return (rv); } -void -nni_pipe_stop(nni_pipe *p) -{ - // Guard against recursive calls. - if (nni_atomic_flag_test_and_set(&p->p_stop)) { - return; - } - - nni_pipe_close(p); - - // Put it on the reaplist for async cleanup - nni_mtx_lock(&nni_pipe_reap_lk); - nni_list_append(&nni_pipe_reap_list, p); - nni_cv_wake(&nni_pipe_reap_cv); - nni_mtx_unlock(&nni_pipe_reap_lk); -} - uint16_t nni_pipe_peer(nni_pipe *p) { @@ -270,32 +199,29 @@ nni_pipe_start_cb(void *arg) nni_pipe *p = arg; nni_sock *s = p->p_sock; nni_aio * aio = p->p_start_aio; - uint32_t id = nni_pipe_id(p); if (nni_aio_result(aio) != 0) { - nni_pipe_stop(p); + nni_pipe_close(p); return; } - p->p_cbs = true; // We're running all cbs going forward - - nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_PRE, id); + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE); if (nni_pipe_closed(p)) { - nni_pipe_stop(p); + nni_pipe_close(p); return; } if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) || nni_sock_closing(s)) { - nni_pipe_stop(p); + nni_pipe_close(p); return; } - nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_POST, id); + nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST); } int -nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) +nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) { nni_pipe * p; int rv; @@ -315,13 +241,11 @@ nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) p->p_proto_ops = *pops; p->p_proto_data = NULL; p->p_sock = sock; - p->p_sock_id = nni_sock_id(sock); p->p_closed = false; p->p_cbs = false; p->p_refcnt = 0; nni_atomic_flag_reset(&p->p_stop); - NNI_LIST_NODE_INIT(&p->p_reap_node); NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); @@ -347,27 +271,11 @@ nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata) return (0); } -void -nni_pipe_set_listener(nni_pipe *p, nni_listener *l) -{ - p->p_listener = l; - p->p_listener_id = nni_listener_id(l); -} - -void -nni_pipe_set_dialer(nni_pipe *p, nni_dialer *d) -{ - p->p_dialer = d; - p->p_dialer_id = nni_dialer_id(d); -} - int nni_pipe_getopt( nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t) { nni_tran_option *o; - nni_dialer * d; - nni_listener * l; for (o = p->p_tran_ops.p_options; o && o->o_name; o++) { if (strcmp(o->o_name, name) != 0) { @@ -376,24 +284,13 @@ nni_pipe_getopt( return (o->o_get(p->p_tran_data, val, szp, t)); } - // Maybe the endpoint knows? We look up by ID, instead of using - // the links directly, to avoid needing a hold on them. The pipe - // can wind up outliving the endpoint in certain circumstances. - // This means that getting these properties from the pipe may wind - // up being somewhat more expensive. - if ((p->p_dialer_id != 0) && - (nni_dialer_find(&d, p->p_dialer_id) == 0)) { - int rv; - rv = nni_dialer_getopt(d, name, val, szp, t); - nni_dialer_rele(d); - return (rv); + // Maybe the endpoint knows? The guarantees on pipes ensure that the + // pipe will not outlive its creating endpoint. + if (p->p_dialer != NULL) { + return (nni_dialer_getopt(p->p_dialer, name, val, szp, t)); } - if ((p->p_listener_id != 0) && - (nni_listener_find(&l, p->p_listener_id) == 0)) { - int rv; - rv = nni_listener_getopt(l, name, val, szp, t); - nni_listener_rele(l); - return (rv); + if (p->p_listener != NULL) { + return (nni_listener_getopt(p->p_listener, name, val, szp, t)); } return (NNG_ENOTSUP); } @@ -414,56 +311,20 @@ nni_pipe_get_proto_data(nni_pipe *p) return (p->p_proto_data); } -void -nni_pipe_sock_list_init(nni_list *list) -{ - NNI_LIST_INIT(list, nni_pipe, p_sock_node); -} - -void -nni_pipe_ep_list_init(nni_list *list) -{ - NNI_LIST_INIT(list, nni_pipe, p_ep_node); -} - uint32_t nni_pipe_sock_id(nni_pipe *p) { - return (p->p_sock_id); + return (nni_sock_id(p->p_sock)); } uint32_t nni_pipe_listener_id(nni_pipe *p) { - return (p->p_listener_id); + return (p->p_listener ? nni_listener_id(p->p_listener) : 0); } uint32_t nni_pipe_dialer_id(nni_pipe *p) { - return (p->p_dialer_id); -} - -static void -nni_pipe_reaper(void *notused) -{ - NNI_ARG_UNUSED(notused); - - nni_mtx_lock(&nni_pipe_reap_lk); - for (;;) { - nni_pipe *p; - if ((p = nni_list_first(&nni_pipe_reap_list)) != NULL) { - nni_list_remove(&nni_pipe_reap_list, p); - - nni_mtx_unlock(&nni_pipe_reap_lk); - nni_pipe_destroy(p); - nni_mtx_lock(&nni_pipe_reap_lk); - continue; - } - if (!nni_pipe_reap_run) { - break; - } - nni_cv_wait(&nni_pipe_reap_cv); - } - nni_mtx_unlock(&nni_pipe_reap_lk); + return (p->p_dialer ? nni_dialer_id(p->p_dialer) : 0); } diff --git a/src/core/pipe.h b/src/core/pipe.h index 5c505514..1d73ce51 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -29,38 +29,11 @@ extern void nni_pipe_send(nni_pipe *, nni_aio *); // Pipe operations that protocols use. extern uint32_t nni_pipe_id(nni_pipe *); -// nni_pipe_destroy destroys a pipe -- there must not be any other -// references to it; this is used only during creation failures. -extern void nni_pipe_destroy(nni_pipe *); - // nni_pipe_close closes the underlying transport for the pipe. Further -// operations against will return NNG_ECLOSED. +// operations against will return NNG_ECLOSED. This is idempotent. The +// actual pipe will be reaped asynchronously. extern void nni_pipe_close(nni_pipe *); -// nni_pipe_stop is called to begin the process of tearing down the socket. -// This function runs asynchronously, and takes care to ensure that no -// other consumers are referencing the pipe. We assume that either the -// socket (protocol code) or endpoint may have references to the pipe -// when this function is called. The pipe cleanup is asynchronous and -// make take a while depending on scheduling, etc. The pipe lock itself -// may not be held during this, but any other locks may be. -extern void nni_pipe_stop(nni_pipe *); - -// nni_pipe_create is used only by endpoints - as we don't wish to expose the -// details of the pipe structure outside of pipe.c. This function must be -// called without any locks held, as it will call back up into the socket and -// endpoint, grabbing each of those locks. The function takes ownership of -// the transport specific pipe (3rd argument), regardless of whether it -// succeeds or not. The endpoint should be held when calling this. -extern int nni_pipe_create2(nni_pipe **, nni_sock *, nni_tran *, void *); -extern void nni_pipe_set_dialer(nni_pipe *, nni_dialer *); -extern void nni_pipe_set_listener(nni_pipe *, nni_listener *); - -// nni_pipe_start is called by the socket to begin any startup activities -// on the pipe before making it ready for use by protocols. For example, -// TCP and IPC initial handshaking is performed this way. -extern void nni_pipe_start(nni_pipe *); - extern uint16_t nni_pipe_proto(nni_pipe *); extern uint16_t nni_pipe_peer(nni_pipe *); @@ -73,14 +46,6 @@ extern int nni_pipe_getopt( // nni_pipe_set_proto_data function. No locking is performed. extern void *nni_pipe_get_proto_data(nni_pipe *); -// nni_pipe_sock_list_init initializes a list of pipes, to be used by -// a per-socket list. -extern void nni_pipe_sock_list_init(nni_list *); - -// nni_pipe_ep_list_init initializes a list of pipes, to be used by -// a per-endpoint list. -extern void nni_pipe_ep_list_init(nni_list *); - // nni_pipe_find finds a pipe given its ID. It places a hold on the // pipe, which must be released by the caller when it is done. extern int nni_pipe_find(nni_pipe **, uint32_t); diff --git a/src/core/reap.c b/src/core/reap.c index 8191dba3..bfad6c32 100644 --- a/src/core/reap.c +++ b/src/core/reap.c @@ -14,47 +14,62 @@ #include <stdbool.h> -static nni_list nni_reap_list; -static nni_mtx nni_reap_mtx; -static nni_cv nni_reap_cv; -static bool nni_reap_exit = false; -static nni_thr nni_reap_thr; +static nni_list reap_list; +static nni_mtx reap_mtx; +static nni_cv reap_cv; +static nni_cv reap_empty_cv; +static bool reap_exit = false; +static bool reap_empty = false; +static nni_thr reap_thr; static void -nni_reap_stuff(void *notused) +reap_worker(void *notused) { NNI_ARG_UNUSED(notused); - nni_mtx_lock(&nni_reap_mtx); + nni_mtx_lock(&reap_mtx); for (;;) { nni_reap_item *item; - if ((item = nni_list_first(&nni_reap_list)) != NULL) { - nni_list_remove(&nni_reap_list, item); - nni_mtx_unlock(&nni_reap_mtx); + while ((item = nni_list_first(&reap_list)) != NULL) { + nni_list_remove(&reap_list, item); + nni_mtx_unlock(&reap_mtx); item->r_func(item->r_ptr); - nni_mtx_lock(&nni_reap_mtx); - continue; + nni_mtx_lock(&reap_mtx); } - if (nni_reap_exit) { + reap_empty = true; + nni_cv_wake(&reap_empty_cv); + + if (reap_exit) { break; } - nni_cv_wait(&nni_reap_cv); + nni_cv_wait(&reap_cv); } - nni_mtx_unlock(&nni_reap_mtx); + nni_mtx_unlock(&reap_mtx); } void nni_reap(nni_reap_item *item, nni_cb func, void *ptr) { - nni_mtx_lock(&nni_reap_mtx); + nni_mtx_lock(&reap_mtx); item->r_func = func; item->r_ptr = ptr; - nni_list_append(&nni_reap_list, item); - nni_cv_wake(&nni_reap_cv); - nni_mtx_unlock(&nni_reap_mtx); + nni_list_append(&reap_list, item); + reap_empty = false; + nni_cv_wake(&reap_cv); + nni_mtx_unlock(&reap_mtx); +} + +void +nni_reap_drain(void) +{ + nni_mtx_lock(&reap_mtx); + while (!reap_empty) { + nni_cv_wait(&reap_empty_cv); + } + nni_mtx_unlock(&reap_mtx); } int @@ -62,28 +77,30 @@ nni_reap_sys_init(void) { int rv; - NNI_LIST_INIT(&nni_reap_list, nni_reap_item, r_link); - nni_mtx_init(&nni_reap_mtx); - nni_cv_init(&nni_reap_cv, &nni_reap_mtx); - nni_reap_exit = false; + NNI_LIST_INIT(&reap_list, nni_reap_item, r_link); + nni_mtx_init(&reap_mtx); + nni_cv_init(&reap_cv, &reap_mtx); + nni_cv_init(&reap_empty_cv, &reap_mtx); + reap_exit = false; // If this fails, we don't fail init, instead we will try to // start up at reap time. - if ((rv = nni_thr_init(&nni_reap_thr, nni_reap_stuff, NULL)) != 0) { - nni_cv_fini(&nni_reap_cv); - nni_mtx_fini(&nni_reap_mtx); + if ((rv = nni_thr_init(&reap_thr, reap_worker, NULL)) != 0) { + nni_cv_fini(&reap_cv); + nni_cv_fini(&reap_empty_cv); + nni_mtx_fini(&reap_mtx); return (rv); } - nni_thr_run(&nni_reap_thr); + nni_thr_run(&reap_thr); return (0); } void nni_reap_sys_fini(void) { - nni_mtx_lock(&nni_reap_mtx); - nni_reap_exit = true; - nni_cv_wake(&nni_reap_cv); - nni_mtx_unlock(&nni_reap_mtx); - nni_thr_fini(&nni_reap_thr); + nni_mtx_lock(&reap_mtx); + reap_exit = true; + nni_cv_wake(&reap_cv); + nni_mtx_unlock(&reap_mtx); + nni_thr_fini(&reap_thr); } diff --git a/src/core/reap.h b/src/core/reap.h index 843c8c9c..40c27a5d 100644 --- a/src/core/reap.h +++ b/src/core/reap.h @@ -38,6 +38,7 @@ typedef struct nni_reap_item { // part of a fully reapable graph; otherwise this can lead to an infinite // loop in the reap thread. extern void nni_reap(nni_reap_item *, nni_cb, void *); +extern void nni_reap_drain(void); extern int nni_reap_sys_init(void); extern void nni_reap_sys_fini(void); diff --git a/src/core/socket.c b/src/core/socket.c index 620c5d19..640e0db6 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -9,6 +9,7 @@ // #include "core/nng_impl.h" +#include "sockimpl.h" #include <stdio.h> #include <string.h> @@ -58,7 +59,7 @@ struct nni_socket { nni_cv s_cv; nni_cv s_close_cv; - uint64_t s_id; + uint32_t s_id; uint32_t s_flags; unsigned s_refcnt; // protected by global lock void * s_data; // Protocol private @@ -97,6 +98,9 @@ struct nni_socket { static void nni_ctx_destroy(nni_ctx *); +static void dialer_shutdown_locked(nni_dialer *); +static void listener_shutdown_locked(nni_listener *); + static int sock_get_fd(nni_sock *s, int flag, int *fdp) { @@ -340,7 +344,7 @@ nni_free_opt(nni_sockopt *opt) uint32_t nni_sock_id(nni_sock *s) { - return ((uint32_t) s->s_id); + return (s->s_id); } // nni_sock_sendq and nni_sock_recvq are called by the protocol to obtain @@ -405,58 +409,6 @@ nni_sock_closing(nni_sock *s) return (rv); } -void -nni_sock_run_pipe_cb(nni_sock *s, int ev, uint32_t id) -{ - if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) { - nng_pipe_cb cb; - void * arg; - - nni_mtx_lock(&s->s_pipe_cbs_mtx); - cb = s->s_pipe_cbs[ev].cb_fn; - arg = s->s_pipe_cbs[ev].cb_arg; - nni_mtx_unlock(&s->s_pipe_cbs_mtx); - - if (cb != NULL) { - nng_pipe p; - p.id = id; - cb(p, ev, arg); - } - } -} - -int -nni_sock_pipe_add(nni_sock *s, nni_pipe *p) -{ - // Initialize protocol pipe data. - nni_mtx_lock(&s->s_mx); - if (s->s_closing) { - nni_mtx_unlock(&s->s_mx); - return (NNG_ECLOSED); - } - - nni_list_append(&s->s_pipes, p); - - // Start the initial negotiation I/O... - nni_pipe_start(p); - - nni_mtx_unlock(&s->s_mx); - return (0); -} - -void -nni_sock_pipe_remove(nni_sock *s, nni_pipe *p) -{ - nni_mtx_lock(&s->s_mx); - if (nni_list_active(&s->s_pipes, p)) { - nni_list_remove(&s->s_pipes, p); - } - if (s->s_closing && nni_list_empty(&s->s_pipes)) { - nni_cv_wake(&s->s_cv); - } - nni_mtx_unlock(&s->s_mx); -} - static void sock_destroy(nni_sock *s) { @@ -521,10 +473,9 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) NNI_LIST_NODE_INIT(&s->s_node); NNI_LIST_INIT(&s->s_options, nni_sockopt, node); NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node); - - nni_pipe_sock_list_init(&s->s_pipes); - nni_listener_list_init(&s->s_listeners); - nni_dialer_list_init(&s->s_dialers); + NNI_LIST_INIT(&s->s_pipes, nni_pipe, p_sock_node); + NNI_LIST_INIT(&s->s_listeners, nni_listener, l_node); + NNI_LIST_INIT(&s->s_dialers, nni_dialer, d_node); nni_mtx_init(&s->s_mx); nni_mtx_init(&s->s_pipe_cbs_mtx); nni_cv_init(&s->s_cv, &s->s_mx); @@ -615,7 +566,7 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) } nni_mtx_lock(&sock_lk); - if ((rv = nni_idhash_alloc(sock_hash, &s->s_id, s)) != 0) { + if ((rv = nni_idhash_alloc32(sock_hash, &s->s_id, s)) != 0) { sock_destroy(s); } else { nni_list_append(&sock_list, s); @@ -625,8 +576,7 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) nni_mtx_unlock(&sock_lk); // Set the sockname. - (void) snprintf( - s->s_name, sizeof(s->s_name), "%u", (unsigned) s->s_id); + (void) snprintf(s->s_name, sizeof(s->s_name), "%u", s->s_id); return (rv); } @@ -640,9 +590,7 @@ nni_sock_shutdown(nni_sock *sock) { nni_pipe * pipe; nni_dialer * d; - nni_dialer * nd; nni_listener *l; - nni_listener *nl; nni_ctx * ctx; nni_ctx * nctx; @@ -657,10 +605,10 @@ nni_sock_shutdown(nni_sock *sock) // Close the EPs. This prevents new connections from forming // but but allows existing ones to drain. NNI_LIST_FOREACH (&sock->s_listeners, l) { - nni_listener_shutdown(l); + listener_shutdown_locked(l); } NNI_LIST_FOREACH (&sock->s_dialers, d) { - nni_dialer_shutdown(d); + dialer_shutdown_locked(d); } nni_mtx_unlock(&sock->s_mx); @@ -706,35 +654,26 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); + // For each pipe, arrange for it to teardown hard. We would + // expect there not to be any here. + NNI_LIST_FOREACH (&sock->s_pipes, pipe) { + nni_pipe_close(pipe); + } + // Go through the dialers and listeners, attempting to close them. // We might already have a close in progress, in which case // we skip past it; it will be removed from another thread. - nl = nni_list_first(&sock->s_listeners); - while ((l = nl) != NULL) { - nl = nni_list_next(&sock->s_listeners, nl); - + NNI_LIST_FOREACH (&sock->s_listeners, l) { if (nni_listener_hold(l) == 0) { - nni_mtx_unlock(&sock->s_mx); nni_listener_close(l); - nni_mtx_lock(&sock->s_mx); } } - nd = nni_list_first(&sock->s_dialers); - while ((d = nd) != NULL) { - nd = nni_list_next(&sock->s_dialers, nd); - + NNI_LIST_FOREACH (&sock->s_dialers, d) { if (nni_dialer_hold(d) == 0) { - nni_mtx_unlock(&sock->s_mx); nni_dialer_close(d); - nni_mtx_lock(&sock->s_mx); } } - // For each pipe, arrange for it to teardown hard. - NNI_LIST_FOREACH (&sock->s_pipes, pipe) { - nni_pipe_stop(pipe); - } - // We have to wait for *both* endpoints and pipes to be // removed. while ((!nni_list_empty(&sock->s_pipes)) || @@ -1271,7 +1210,6 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock) { nni_ctx *ctx; int rv; - uint64_t id; if (sock->s_ctx_ops.ctx_init == NULL) { return (NNG_ENOTSUP); @@ -1286,12 +1224,11 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock) NNI_FREE_STRUCT(ctx); return (NNG_ECLOSED); } - if ((rv = nni_idhash_alloc(ctx_hash, &id, ctx)) != 0) { + if ((rv = nni_idhash_alloc32(ctx_hash, &ctx->c_id, ctx)) != 0) { nni_mtx_unlock(&sock_lk); NNI_FREE_STRUCT(ctx); return (rv); } - ctx->c_id = (uint32_t) id; if ((rv = sock->s_ctx_ops.ctx_init(&ctx->c_data, sock->s_data)) != 0) { nni_idhash_remove(ctx_hash, ctx->c_id); @@ -1415,3 +1352,265 @@ nni_ctx_setopt( nni_mtx_unlock(&sock->s_mx); return (rv); } + +static void +dialer_timer_start_locked(nni_dialer *d) +{ + nni_duration backoff; + + backoff = d->d_currtime; + d->d_currtime *= 2; + if (d->d_currtime > d->d_maxrtime) { + d->d_currtime = d->d_maxrtime; + } + + // To minimize damage from storms, etc., we select a backoff + // value randomly, in the range of [0, backoff-1]; this is + // pretty similar to 802 style backoff, except that we have a + // nearly uniform time period instead of discrete slot times. + // This algorithm may lead to slight biases because we don't + // have a statistically perfect distribution with the modulo of + // the random number, but this really doesn't matter. + nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio); +} + +void +nni_dialer_timer_start(nni_dialer *d) +{ + nni_sock *s = d->d_sock; + nni_mtx_lock(&s->s_mx); + dialer_timer_start_locked(d); + nni_mtx_unlock(&s->s_mx); +} + +void +nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p) +{ + nni_sock *s = d->d_sock; + + nni_mtx_lock(&s->s_mx); + + if (s->s_closed || d->d_closed) { + nni_mtx_unlock(&s->s_mx); + nni_pipe_destroy(p); + return; + } + + p->p_dialer = d; + nni_list_append(&d->d_pipes, p); + nni_list_append(&s->s_pipes, p); + d->d_pipe = p; + d->d_currtime = d->d_inirtime; + nni_mtx_unlock(&s->s_mx); + + // Start the initial negotiation I/O... + nni_pipe_start(p); +} + +static void +dialer_shutdown_impl(nni_dialer *d) +{ + nni_pipe *p; + + // Abort any remaining in-flight operations. + nni_aio_close(d->d_con_aio); + nni_aio_close(d->d_tmo_aio); + + // Stop the underlying transport. + d->d_ops.d_close(d->d_data); + + NNI_LIST_FOREACH (&d->d_pipes, p) { + nni_pipe_close(p); + } +} + +static void +dialer_shutdown_locked(nni_dialer *d) +{ + if (nni_atomic_flag_test_and_set(&d->d_closing)) { + return; + } + dialer_shutdown_impl(d); +} + +void +nni_dialer_shutdown(nni_dialer *d) +{ + nni_sock *s = d->d_sock; + if (nni_atomic_flag_test_and_set(&d->d_closing)) { + return; + } + nni_mtx_lock(&s->s_mx); + dialer_shutdown_impl(d); + nni_mtx_unlock(&s->s_mx); +} + +void +nni_dialer_reap(nni_dialer *d) +{ + nni_sock *s = d->d_sock; + + nni_aio_stop(d->d_tmo_aio); + nni_aio_stop(d->d_con_aio); + + nni_mtx_lock(&s->s_mx); + if (!nni_list_empty(&d->d_pipes)) { + nni_pipe *p; + // This should already have been done, but be certain! + NNI_LIST_FOREACH (&d->d_pipes, p) { + nni_pipe_close(p); + } + nni_mtx_unlock(&s->s_mx); + // Go back to the end of reap list. + nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d); + return; + } + + nni_list_remove(&s->s_dialers, d); + if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) { + nni_cv_wake(&s->s_cv); + } + + nni_mtx_unlock(&s->s_mx); + + nni_dialer_destroy(d); +} + +void +nni_listener_add_pipe(nni_listener *l, nni_pipe *p) +{ + nni_sock *s = l->l_sock; + + nni_mtx_lock(&s->s_mx); + if (s->s_closed || l->l_closed) { + nni_mtx_unlock(&s->s_mx); + nni_pipe_destroy(p); + return; + } + p->p_listener = l; + nni_list_append(&l->l_pipes, p); + nni_list_append(&s->s_pipes, p); + nni_mtx_unlock(&s->s_mx); + + // Start the initial negotiation I/O... + nni_pipe_start(p); +} + +static void +listener_shutdown_impl(nni_listener *l) +{ + nni_pipe *p; + + // Abort any remaining in-flight accepts. + nni_aio_close(l->l_acc_aio); + nni_aio_close(l->l_tmo_aio); + + // Stop the underlying transport. + l->l_ops.l_close(l->l_data); + + NNI_LIST_FOREACH (&l->l_pipes, p) { + nni_pipe_close(p); + } +} + +static void +listener_shutdown_locked(nni_listener *l) +{ + if (nni_atomic_flag_test_and_set(&l->l_closing)) { + return; + } + listener_shutdown_impl(l); +} + +void +nni_listener_shutdown(nni_listener *l) +{ + nni_sock *s = l->l_sock; + + if (nni_atomic_flag_test_and_set(&l->l_closing)) { + return; + } + + nni_mtx_lock(&s->s_mx); + listener_shutdown_impl(l); + nni_mtx_unlock(&s->s_mx); +} + +void +nni_listener_reap(nni_listener *l) +{ + nni_sock *s = l->l_sock; + + nni_aio_stop(l->l_tmo_aio); + nni_aio_stop(l->l_acc_aio); + + nni_mtx_lock(&s->s_mx); + if (!nni_list_empty(&l->l_pipes)) { + nni_pipe *p; + // This should already have been done, but be certain! + NNI_LIST_FOREACH (&l->l_pipes, p) { + nni_pipe_close(p); + } + nni_mtx_unlock(&s->s_mx); + // Go back to the end of reap list. + nni_reap(&l->l_reap, (nni_cb) nni_listener_reap, l); + return; + } + + nni_list_remove(&s->s_listeners, l); + if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) { + nni_cv_wake(&s->s_cv); + } + + nni_mtx_unlock(&s->s_mx); + + nni_listener_destroy(l); +} + +void +nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev) +{ + nni_sock * s = p->p_sock; + nng_pipe_cb cb; + void * arg; + + if (!p->p_cbs) { + if (ev == NNG_PIPE_EV_ADD_PRE) { + // First event, after this we want all other events. + p->p_cbs = true; + } else { + return; + } + } + nni_mtx_lock(&s->s_pipe_cbs_mtx); + cb = s->s_pipe_cbs[ev].cb_fn; + arg = s->s_pipe_cbs[ev].cb_arg; + nni_mtx_unlock(&s->s_pipe_cbs_mtx); + + if (cb != NULL) { + nng_pipe pid; + pid.id = p->p_id; + cb(pid, ev, arg); + } +} + +void +nni_pipe_remove(nni_pipe *p) +{ + nni_sock * s = p->p_sock; + nni_dialer *d = p->p_dialer; + + nni_mtx_lock(&s->s_mx); + nni_list_node_remove(&p->p_sock_node); + nni_list_node_remove(&p->p_ep_node); + p->p_listener = NULL; + p->p_dialer = NULL; + if ((d != NULL) && (d->d_pipe == p)) { + d->d_pipe = NULL; + dialer_timer_start_locked(d); // Kick the timer to redial. + } + if (s->s_closing) { + nni_cv_wake(&s->s_cv); + } + nni_mtx_unlock(&s->s_mx); +} diff --git a/src/core/socket.h b/src/core/socket.h index 37256571..4b9c4642 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -38,20 +38,6 @@ extern void nni_sock_send(nni_sock *, nni_aio *); extern void nni_sock_recv(nni_sock *, nni_aio *); extern uint32_t nni_sock_id(nni_sock *); -// nni_sock_pipe_add adds the pipe to the socket. It is called by -// the generic pipe creation code. It also adds the socket to the -// ep list, and starts the pipe. It does all these to ensure that -// we have complete success or failure, and there is no point where -// a pipe could wind up orphaned. -extern int nni_sock_pipe_add(nni_sock *, nni_pipe *); -extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *); - -extern int nni_sock_add_dialer(nni_sock *, nni_dialer *); -extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *); - -extern int nni_sock_add_listener(nni_sock *, nni_listener *); -extern void nni_sock_remove_listener(nni_sock *, nni_listener *); - // These are socket methods that protocol operations can expect to call. // Note that each of these should be called without any locks held, since // the socket can reenter the protocol. @@ -76,8 +62,6 @@ extern uint32_t nni_sock_flags(nni_sock *); // should be executed. extern void nni_sock_set_pipe_cb(nni_sock *sock, int, nng_pipe_cb, void *); -extern void nni_sock_run_pipe_cb(nni_sock *sock, int, uint32_t); - extern bool nni_sock_closing(nni_sock *sock); // nni_ctx_open is used to open/create a new context structure. diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h new file mode 100644 index 00000000..207a83b3 --- /dev/null +++ b/src/core/sockimpl.h @@ -0,0 +1,107 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_SOCKIMPL_H +#define CORE_SOCKIMPL_H + +// This file contains stuff shared within the core between sockets, endpoints, +// and pipes. This must not be exposed to other subsystems -- these internals +// are subject to change at any time. + +struct nni_dialer { + nni_tran_dialer_ops d_ops; // transport ops + nni_tran * d_tran; // transport pointer + void * d_data; // transport private + uint32_t d_id; // endpoint id + nni_list_node d_node; // per socket list + nni_sock * d_sock; + nni_url * d_url; + nni_pipe * d_pipe; // active pipe (for redialer) + int d_refcnt; + int d_lastrv; // last result from synchronous + bool d_synch; // synchronous connect in progress? + bool d_closed; // full shutdown + nni_atomic_flag d_started; + nni_atomic_flag d_closing; // close pending (waiting on refcnt) + nni_mtx d_mtx; + nni_cv d_cv; + nni_list d_pipes; + nni_aio * d_con_aio; + nni_aio * d_tmo_aio; // backoff timer + nni_duration d_maxrtime; // maximum time for reconnect + nni_duration d_currtime; // current time for reconnect + nni_duration d_inirtime; // initial time for reconnect + nni_time d_conntime; // time of last good connect + nni_reap_item d_reap; +}; + +struct nni_listener { + nni_tran_listener_ops l_ops; // transport ops + nni_tran * l_tran; // transport pointer + void * l_data; // transport private + uint32_t l_id; // endpoint id + nni_list_node l_node; // per socket list + nni_sock * l_sock; + nni_url * l_url; + int l_refcnt; + bool l_closed; // full shutdown + nni_atomic_flag l_started; + nni_atomic_flag l_closing; // close started (shutdown) + nni_list l_pipes; + nni_aio * l_acc_aio; + nni_aio * l_tmo_aio; + nni_reap_item l_reap; +}; + +struct nni_pipe { + uint32_t p_id; + nni_tran_pipe_ops p_tran_ops; + nni_proto_pipe_ops p_proto_ops; + void * p_tran_data; + void * p_proto_data; + nni_list_node p_sock_node; + nni_list_node p_ep_node; + nni_sock * p_sock; + nni_dialer * p_dialer; + nni_listener * p_listener; + bool p_closed; + nni_atomic_flag p_stop; + bool p_cbs; + int p_refcnt; + nni_mtx p_mtx; + nni_cv p_cv; + nni_reap_item p_reap; + nni_aio * p_start_aio; +}; + +extern int nni_sock_add_dialer(nni_sock *, nni_dialer *); +extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *); + +extern int nni_sock_add_listener(nni_sock *, nni_listener *); +extern void nni_sock_remove_listener(nni_sock *, nni_listener *); + +extern void nni_dialer_add_pipe(nni_dialer *, nni_pipe *); +extern void nni_dialer_shutdown(nni_dialer *); +extern void nni_dialer_reap(nni_dialer *); +extern void nni_dialer_destroy(nni_dialer *); +extern void nni_dialer_timer_start(nni_dialer *); + +extern void nni_listener_add_pipe(nni_listener *, nni_pipe *); +extern void nni_listener_shutdown(nni_listener *); +extern void nni_listener_reap(nni_listener *); +extern void nni_listener_destroy(nni_listener *); + +extern void nni_pipe_remove(nni_pipe *); +extern void nni_pipe_run_cb(nni_pipe *, nng_pipe_ev); +extern void nni_pipe_destroy(nni_pipe *); +extern int nni_pipe_create(nni_pipe **, nni_sock *, nni_tran *, void *); +extern void nni_pipe_start(nni_pipe *); + +#endif // CORE_SOCKIMPL_H @@ -229,7 +229,7 @@ NNG_DECL int nng_getopt_ptr(nng_socket, const char *, void **); typedef enum { NNG_PIPE_EV_ADD_PRE, // Called just before pipe added to socket NNG_PIPE_EV_ADD_POST, // Called just after pipe added to socket - NNG_PIPE_EV_REM_POST, // Called just after poipe removed from socket + NNG_PIPE_EV_REM_POST, // Called just after pipe removed from socket NNG_PIPE_EV_NUM, // Used internally, must be last. } nng_pipe_ev; diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c index ba719e49..2426abba 100644 --- a/src/protocol/bus0/bus.c +++ b/src/protocol/bus0/bus.c @@ -234,7 +234,7 @@ bus0_pipe_getq_cb(void *arg) if (nni_aio_result(p->aio_getq) != 0) { // closed? - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); @@ -252,7 +252,7 @@ bus0_pipe_send_cb(void *arg) // closed? nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -267,7 +267,7 @@ bus0_pipe_recv_cb(void *arg) nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } msg = nni_aio_get_msg(p->aio_recv); @@ -277,7 +277,7 @@ bus0_pipe_recv_cb(void *arg) // XXX: bump a nomemory stat nni_msg_free(msg); nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -295,7 +295,7 @@ bus0_pipe_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c index 4ab9f9e8..d663c5e2 100644 --- a/src/protocol/pair0/pair.c +++ b/src/protocol/pair0/pair.c @@ -176,7 +176,7 @@ pair0_recv_cb(void *arg) nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -196,7 +196,7 @@ pair0_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } nni_pipe_recv(p->npipe, p->aio_recv); @@ -208,7 +208,7 @@ pair0_getq_cb(void *arg) pair0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -226,7 +226,7 @@ pair0_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c index 0c6a4867..dc250943 100644 --- a/src/protocol/pair1/pair.c +++ b/src/protocol/pair1/pair.c @@ -244,7 +244,7 @@ pair1_pipe_recv_cb(void *arg) int rv; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -257,13 +257,13 @@ pair1_pipe_recv_cb(void *arg) // If the message is missing the hop count header, scrap it. if (nni_msg_len(msg) < sizeof(uint32_t)) { nni_msg_free(msg); - nni_pipe_stop(npipe); + nni_pipe_close(npipe); return; } hdr = nni_msg_trim_u32(msg); if (hdr & 0xffffff00) { nni_msg_free(msg); - nni_pipe_stop(npipe); + nni_pipe_close(npipe); return; } @@ -343,7 +343,7 @@ pair1_pipe_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } nni_pipe_recv(p->npipe, p->aio_recv); @@ -358,7 +358,7 @@ pair1_pipe_getq_cb(void *arg) uint32_t hops; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -405,7 +405,7 @@ pair1_pipe_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c index 63243cb5..a713bc80 100644 --- a/src/protocol/pipeline0/pull.c +++ b/src/protocol/pipeline0/pull.c @@ -141,7 +141,7 @@ pull0_recv_cb(void *arg) if (nni_aio_result(aio) != 0) { // Failed to get a message, probably the pipe is closed. - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -163,7 +163,7 @@ pull0_putq_cb(void *arg) // we can do. Just close the pipe. nni_msg_free(nni_aio_get_msg(aio)); nni_aio_set_msg(aio, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c index 9585b86c..00e9212c 100644 --- a/src/protocol/pipeline0/push.c +++ b/src/protocol/pipeline0/push.c @@ -163,7 +163,7 @@ push0_recv_cb(void *arg) // We normally expect to receive an error. If a pipe actually // sends us data, we just discard it. if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } nni_msg_free(nni_aio_get_msg(p->aio_recv)); @@ -180,7 +180,7 @@ push0_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -195,7 +195,7 @@ push0_getq_cb(void *arg) if (nni_aio_result(aio) != 0) { // If the socket is closing, nothing else we can do. - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index 72cd1daa..cbc8acea 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -240,7 +240,7 @@ pub0_pipe_recv_cb(void *arg) pub0_pipe *p = arg; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -255,7 +255,7 @@ pub0_pipe_getq_cb(void *arg) pub0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -273,7 +273,7 @@ pub0_pipe_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index 2e8be4be..cb6d781f 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -164,7 +164,7 @@ sub0_recv_cb(void *arg) nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -182,7 +182,7 @@ sub0_recv_cb(void *arg) // Any other error we stop the pipe for. It's probably // NNG_ECLOSED anyway. nng_msg_free(msg); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } nni_pipe_recv(p->pipe, p->aio_recv); diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index f725cadb..f9ce58fd 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -416,7 +416,7 @@ rep0_pipe_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } nni_mtx_lock(&s->lk); @@ -519,7 +519,7 @@ rep0_pipe_recv_cb(void *arg) int hops; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -544,7 +544,7 @@ rep0_pipe_recv_cb(void *arg) // Peer is speaking garbage. Kick it. nni_msg_free(msg); nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } body = nni_msg_body(msg); diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 43751d14..018dd0e8 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -305,7 +305,7 @@ req0_send_cb(void *arg) // We failed to send... clean up and deal with it. nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -345,7 +345,7 @@ req0_recv_cb(void *arg) uint32_t id; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -409,7 +409,7 @@ req0_recv_cb(void *arg) malformed: nni_msg_free(msg); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); } static void diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c index e3b9b605..1fe81ac5 100644 --- a/src/protocol/reqrep0/xrep.c +++ b/src/protocol/reqrep0/xrep.c @@ -262,7 +262,7 @@ xrep0_pipe_getq_cb(void *arg) xrep0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -280,7 +280,7 @@ xrep0_pipe_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -296,7 +296,7 @@ xrep0_pipe_recv_cb(void *arg) int hops; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -327,7 +327,7 @@ xrep0_pipe_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer is speaking garbage. Kick it. nni_msg_free(msg); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } body = nni_msg_body(msg); @@ -361,7 +361,7 @@ xrep0_pipe_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c index 2f0a3652..a98c713e 100644 --- a/src/protocol/reqrep0/xreq.c +++ b/src/protocol/reqrep0/xreq.c @@ -173,7 +173,7 @@ xreq0_getq_cb(void *arg) xreq0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -191,7 +191,7 @@ xreq0_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -207,7 +207,7 @@ xreq0_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } nni_aio_set_msg(p->aio_putq, NULL); @@ -224,7 +224,7 @@ xreq0_recv_cb(void *arg) uint32_t id; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } @@ -236,7 +236,7 @@ xreq0_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer gave us garbage, so kick it. nni_msg_free(msg); - nni_pipe_stop(p->pipe); + nni_pipe_close(p->pipe); return; } id = nni_msg_trim_u32(msg); diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index fbdeb65a..4e0a5263 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -405,7 +405,7 @@ resp0_pipe_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } nni_mtx_lock(&s->mtx); @@ -511,7 +511,7 @@ resp0_pipe_recv_cb(void *arg) size_t len; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -532,7 +532,7 @@ resp0_pipe_recv_cb(void *arg) // Peer is speaking garbage, kick it. nni_msg_free(msg); nni_aio_set_msg(p->aio_recv, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } body = nni_msg_body(msg); diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index 42d88f13..58fa4aa6 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -376,7 +376,7 @@ surv0_pipe_getq_cb(void *arg) surv0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -394,7 +394,7 @@ surv0_pipe_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -411,7 +411,7 @@ surv0_pipe_recv_cb(void *arg) uint32_t id; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -423,7 +423,7 @@ surv0_pipe_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer sent us garbage. Kick it. nni_msg_free(msg); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } id = nni_msg_trim_u32(msg); diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c index 03a47e92..334c5ca6 100644 --- a/src/protocol/survey0/xrespond.c +++ b/src/protocol/survey0/xrespond.c @@ -247,7 +247,7 @@ xresp0_getq_cb(void *arg) xresp0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -265,7 +265,7 @@ xresp0_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -282,7 +282,7 @@ xresp0_recv_cb(void *arg) int hops; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -308,7 +308,7 @@ xresp0_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer sent us garbage, so kick it. nni_msg_free(msg); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } body = nni_msg_body(msg); @@ -340,7 +340,7 @@ xresp0_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c index bad24042..fabcc766 100644 --- a/src/protocol/survey0/xsurvey.c +++ b/src/protocol/survey0/xsurvey.c @@ -205,7 +205,7 @@ xsurv0_getq_cb(void *arg) xsurv0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -223,7 +223,7 @@ xsurv0_send_cb(void *arg) if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -238,7 +238,7 @@ xsurv0_putq_cb(void *arg) if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -252,7 +252,7 @@ xsurv0_recv_cb(void *arg) nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } @@ -264,7 +264,7 @@ xsurv0_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer gave us garbage, so kick it. nni_msg_free(msg); - nni_pipe_stop(p->npipe); + nni_pipe_close(p->npipe); return; } if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) { diff --git a/tests/sock.c b/tests/sock.c index 002ae944..4aef1f38 100644 --- a/tests/sock.c +++ b/tests/sock.c @@ -315,6 +315,8 @@ TestMain("Socket Operations", { size_t sz; nng_socket s2; char * a = "inproc://asy"; + So(nng_setopt_ms(s1, NNG_OPT_RECONNMINT, 10) == 0); + So(nng_setopt_ms(s1, NNG_OPT_RECONNMAXT, 10) == 0); So(nng_dial(s1, a, NULL, NNG_FLAG_NONBLOCK) == 0); Convey("And connects late", { So(nng_pair_open(&s2) == 0); |
