diff options
Diffstat (limited to 'src')
37 files changed, 495 insertions, 278 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index a5b6c088..2f90aa34 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -63,10 +63,10 @@ struct nng_aio { nni_duration a_timeout; // Relative timeout // These fields are private to the aio framework. - bool a_stop; // shutting down (no new operations) - bool a_sleep; // sleeping with no action - int a_sleeprv; // result when sleep wakes - int a_cancelrv; // if canceled between begin and schedule + bool a_stop; // shutting down (no new operations) + bool a_closed; // close called, but not fini (yet) + bool a_sleep; // sleeping with no action + int a_sleeprv; // result when sleep wakes nni_task *a_task; // Read/write operations. @@ -205,6 +205,24 @@ nni_aio_stop(nni_aio *aio) } void +nni_aio_close(nni_aio *aio) +{ + if (aio != NULL) { + nni_aio_cancelfn cancelfn; + + nni_mtx_lock(&nni_aio_lk); + cancelfn = aio->a_prov_cancel; + aio->a_prov_cancel = NULL; + aio->a_closed = true; + nni_mtx_unlock(&nni_aio_lk); + + if (cancelfn != NULL) { + cancelfn(aio, NNG_ECLOSED); + } + } +} + +void nni_aio_set_timeout(nni_aio *aio, nni_duration when) { aio->a_timeout = when; @@ -306,11 +324,18 @@ nni_aio_begin(nni_aio *aio) aio->a_count = 0; aio->a_prov_cancel = NULL; aio->a_prov_data = NULL; - aio->a_cancelrv = 0; for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) { aio->a_outputs[i] = NULL; } nni_task_prep(aio->a_task); + if (aio->a_closed) { + aio->a_result = NNG_ECLOSED; + aio->a_expire = NNI_TIME_NEVER; + aio->a_sleep = false; + nni_mtx_unlock(&nni_aio_lk); + nni_task_dispatch(aio->a_task); + return (NNG_ECLOSED); + } nni_mtx_unlock(&nni_aio_lk); return (0); } @@ -318,7 +343,6 @@ nni_aio_begin(nni_aio *aio) int nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) { - int rv; if (!aio->a_sleep) { // Convert the relative timeout to an absolute timeout. switch (aio->a_timeout) { @@ -339,19 +363,16 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) nni_mtx_unlock(&nni_aio_lk); return (NNG_ECANCELED); } - if ((rv = aio->a_cancelrv) != 0) { + if (aio->a_closed) { nni_mtx_unlock(&nni_aio_lk); - return (rv); + return (NNG_ECLOSED); } // If cancellation occurred in between "begin" and "schedule", // then cancel it right now. aio->a_prov_cancel = cancelfn; aio->a_prov_data = data; - if ((rv = aio->a_cancelrv) != 0) { - aio->a_expire = 0; - nni_aio_expire_add(aio); - } else if (aio->a_expire != NNI_TIME_NEVER) { + if (aio->a_expire != NNI_TIME_NEVER) { nni_aio_expire_add(aio); } nni_mtx_unlock(&nni_aio_lk); diff --git a/src/core/aio.h b/src/core/aio.h index 2ed0fb5b..5462c40b 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -42,6 +42,12 @@ extern void nni_aio_fini(nni_aio *); // use nni_aio_cancel instead.) extern void nni_aio_stop(nni_aio *); +// nni_aio_close closes the aio for further activity. It aborts any in-progress +// transaction (if it can), and future calls nni_aio_begin or nni_aio_schedule +// with both result in NNG_ECLOSED. The expectation is that protocols call this +// for all their aios in a stop routine, before calling fini on any of them. +extern void nni_aio_close(nni_aio *); + // nni_aio_set_data sets user data. This should only be done by the // consumer, initiating the I/O. The intention is to be able to store // additional data for use when the operation callback is executed. diff --git a/src/core/defs.h b/src/core/defs.h index b9c9b7e5..29d1b826 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -45,9 +45,9 @@ typedef struct nni_ctx nni_ctx; typedef struct nni_ep nni_ep; typedef struct nni_pipe nni_pipe; typedef struct nni_tran nni_tran; -typedef struct nni_tran_ep nni_tran_ep; +typedef struct nni_tran_ep_ops nni_tran_ep_ops; typedef struct nni_tran_ep_option nni_tran_ep_option; -typedef struct nni_tran_pipe nni_tran_pipe; +typedef struct nni_tran_pipe_ops nni_tran_pipe_ops; typedef struct nni_tran_pipe_option nni_tran_pipe_option; typedef struct nni_proto_ctx_option nni_proto_ctx_option; diff --git a/src/core/device.c b/src/core/device.c index 1f3bf233..0fd23add 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -120,8 +120,8 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2) if ((s1 == NULL) || (s2 == NULL)) { return (NNG_EINVAL); } - if ((nni_sock_peer(s1) != nni_sock_proto(s2)) || - (nni_sock_peer(s2) != nni_sock_proto(s1))) { + if ((nni_sock_peer_id(s1) != nni_sock_proto_id(s2)) || + (nni_sock_peer_id(s2) != nni_sock_proto_id(s1))) { return (NNG_EINVAL); } diff --git a/src/core/endpt.c b/src/core/endpt.c index 7593fb42..87566d42 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -15,30 +15,30 @@ #include <string.h> struct nni_ep { - nni_tran_ep ep_ops; // transport ops - nni_tran * ep_tran; // transport pointer - void * ep_data; // transport private - uint64_t ep_id; // endpoint id - nni_list_node ep_node; // per socket list - nni_sock * ep_sock; - nni_url * ep_url; - int ep_mode; - int ep_started; - int ep_closed; // full shutdown - int ep_closing; // close pending (waiting on refcnt) - int ep_refcnt; - bool ep_tmo_run; - nni_mtx ep_mtx; - nni_cv ep_cv; - nni_list ep_pipes; - nni_aio * ep_acc_aio; - nni_aio * ep_con_aio; - nni_aio * ep_con_syn; // used for sync connect - nni_aio * ep_tmo_aio; // backoff timer - nni_duration ep_maxrtime; // maximum time for reconnect - nni_duration ep_currtime; // current time for reconnect - nni_duration ep_inirtime; // initial time for reconnect - nni_time ep_conntime; // time of last good connect + nni_tran_ep_ops ep_ops; // transport ops + nni_tran * ep_tran; // transport pointer + void * ep_data; // transport private + uint64_t ep_id; // endpoint id + nni_list_node ep_node; // per socket list + nni_sock * ep_sock; + nni_url * ep_url; + int ep_mode; + int ep_started; + int ep_closed; // full shutdown + int ep_closing; // close pending (waiting on refcnt) + int ep_refcnt; + bool ep_tmo_run; + nni_mtx ep_mtx; + nni_cv ep_cv; + nni_list ep_pipes; + nni_aio * ep_acc_aio; + nni_aio * ep_con_aio; + nni_aio * ep_con_syn; // used for sync connect + nni_aio * ep_tmo_aio; // backoff timer + nni_duration ep_maxrtime; // maximum time for reconnect + nni_duration ep_currtime; // current time for reconnect + nni_duration ep_inirtime; // initial time for reconnect + nni_time ep_conntime; // time of last good connect }; // Functionality related to end points. @@ -249,10 +249,10 @@ nni_ep_shutdown(nni_ep *ep) nni_mtx_unlock(&ep->ep_mtx); // Abort any remaining in-flight operations. - nni_aio_abort(ep->ep_acc_aio, NNG_ECLOSED); - nni_aio_abort(ep->ep_con_aio, NNG_ECLOSED); - nni_aio_abort(ep->ep_con_syn, NNG_ECLOSED); - nni_aio_abort(ep->ep_tmo_aio, NNG_ECLOSED); + nni_aio_close(ep->ep_acc_aio); + nni_aio_close(ep->ep_con_aio); + nni_aio_close(ep->ep_con_syn); + nni_aio_close(ep->ep_tmo_aio); // Stop the underlying transport. ep->ep_ops.ep_close(ep->ep_data); @@ -276,10 +276,10 @@ nni_ep_close(nni_ep *ep) nni_ep_shutdown(ep); - nni_aio_stop(ep->ep_acc_aio); - nni_aio_stop(ep->ep_con_aio); - nni_aio_stop(ep->ep_con_syn); - nni_aio_stop(ep->ep_tmo_aio); + nni_aio_close(ep->ep_acc_aio); + nni_aio_close(ep->ep_con_aio); + nni_aio_close(ep->ep_con_syn); + nni_aio_close(ep->ep_tmo_aio); nni_mtx_lock(&ep->ep_mtx); NNI_LIST_FOREACH (&ep->ep_pipes, p) { diff --git a/src/core/pipe.c b/src/core/pipe.c index 010d306d..ba3027cf 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -18,21 +18,22 @@ // performed in the context of the protocol. struct nni_pipe { - uint32_t p_id; - nni_tran_pipe p_tran_ops; - void * p_tran_data; - void * p_proto_data; - nni_list_node p_sock_node; - nni_list_node p_ep_node; - nni_sock * p_sock; - nni_ep * p_ep; - bool p_closed; - bool p_stop; - int p_refcnt; - nni_mtx p_mtx; - nni_cv p_cv; - nni_list_node p_reap_node; - nni_aio * p_start_aio; + uint32_t p_id; + nni_tran_pipe_ops p_tran_ops; + nni_proto_pipe_ops p_proto_ops; + void * p_tran_data; + void * p_proto_data; + nni_list_node p_sock_node; + nni_list_node p_ep_node; + nni_sock * p_sock; + nni_ep * p_ep; + bool p_closed; + bool p_stop; + int p_refcnt; + nni_mtx p_mtx; + nni_cv p_cv; + nni_list_node p_reap_node; + nni_aio * p_start_aio; }; static nni_idhash *nni_pipes; @@ -106,6 +107,10 @@ nni_pipe_destroy(nni_pipe *p) // Stop any pending negotiation. nni_aio_stop(p->p_start_aio); + if (p->p_proto_data != NULL) { + p->p_proto_ops.pipe_stop(p->p_proto_data); + } + // We have exclusive access at this point, so we can check if // we are still on any lists. if (nni_list_node_active(&p->p_ep_node)) { @@ -126,6 +131,9 @@ nni_pipe_destroy(nni_pipe *p) } nni_mtx_unlock(&nni_pipe_lk); + if (p->p_proto_data != NULL) { + p->p_proto_ops.pipe_fini(p->p_proto_data); + } if (p->p_tran_data != NULL) { p->p_tran_ops.p_fini(p->p_tran_data); } @@ -189,6 +197,9 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio) void nni_pipe_close(nni_pipe *p) { + // abort any pending negotiation/start process. + nni_aio_close(p->p_start_aio); + nni_mtx_lock(&p->p_mtx); if (p->p_closed) { // We already did a close. @@ -196,16 +207,16 @@ nni_pipe_close(nni_pipe *p) return; } p->p_closed = true; + nni_mtx_unlock(&p->p_mtx); + + if (p->p_proto_data != NULL) { + p->p_proto_ops.pipe_close(p->p_proto_data); + } // Close the underlying transport. if (p->p_tran_data != NULL) { p->p_tran_ops.p_close(p->p_tran_data); } - - nni_mtx_unlock(&p->p_mtx); - - // abort any pending negotiation/start process. - nni_aio_abort(p->p_start_aio, NNG_ECLOSED); } bool @@ -222,7 +233,6 @@ void nni_pipe_stop(nni_pipe *p) { // Guard against recursive calls. - nni_pipe_close(p); nni_mtx_lock(&p->p_mtx); if (p->p_stop) { nni_mtx_unlock(&p->p_mtx); @@ -231,6 +241,8 @@ nni_pipe_stop(nni_pipe *p) p->p_stop = true; nni_mtx_unlock(&p->p_mtx); + nni_pipe_close(p); + // Put it on the reaplist for async cleanup nni_mtx_lock(&nni_pipe_reap_lk); nni_list_append(&nni_pipe_reap_list, p); @@ -250,12 +262,9 @@ nni_pipe_start_cb(void *arg) nni_pipe *p = arg; nni_aio * aio = p->p_start_aio; - if (nni_aio_result(aio) != 0) { - nni_pipe_stop(p); - return; - } - - if (nni_sock_pipe_start(p->p_sock, p) != 0) { + if ((nni_aio_result(aio) != 0) || + (nni_sock_pipe_start(p->p_sock, p) != 0) || + (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) { nni_pipe_stop(p); } } @@ -263,10 +272,12 @@ nni_pipe_start_cb(void *arg) int nni_pipe_create(nni_ep *ep, void *tdata) { - nni_pipe *p; - int rv; - nni_tran *tran = nni_ep_tran(ep); - nni_sock *sock = nni_ep_sock(ep); + nni_pipe * p; + int rv; + nni_tran * tran = nni_ep_tran(ep); + nni_sock * sock = nni_ep_sock(ep); + void * sdata = nni_sock_proto_data(sock); + nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { // In this case we just toss the pipe... @@ -277,6 +288,7 @@ nni_pipe_create(nni_ep *ep, void *tdata) // Make a private copy of the transport ops. p->p_tran_ops = *tran->tran_pipe; p->p_tran_data = tdata; + p->p_proto_ops = *pops; p->p_proto_data = NULL; p->p_ep = ep; p->p_sock = sock; @@ -290,6 +302,7 @@ nni_pipe_create(nni_ep *ep, void *tdata) nni_mtx_init(&p->p_mtx); nni_cv_init(&p->p_cv, &nni_pipe_lk); + if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) { uint64_t id; nni_mtx_lock(&nni_pipe_lk); @@ -299,11 +312,19 @@ nni_pipe_create(nni_ep *ep, void *tdata) nni_mtx_unlock(&nni_pipe_lk); } - if ((rv != 0) || ((rv = nni_ep_pipe_add(ep, p)) != 0) || - ((rv = nni_sock_pipe_add(sock, p)) != 0)) { + if ((rv != 0) || + ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0) || + ((rv = nni_ep_pipe_add(ep, p)) != 0)) { nni_pipe_destroy(p); + return (rv); } + // At this point the endpoint knows about it, and the protocol + // might too, so on failure we have to tear it down fully as if done + // after a successful result. + if ((rv = nni_sock_pipe_add(sock, p)) != 0) { + nni_pipe_stop(p); + } return (rv); } @@ -339,12 +360,6 @@ nni_pipe_get_proto_data(nni_pipe *p) } void -nni_pipe_set_proto_data(nni_pipe *p, void *data) -{ - p->p_proto_data = data; -} - -void nni_pipe_sock_list_init(nni_list *list) { NNI_LIST_INIT(list, nni_pipe, p_sock_node); diff --git a/src/core/pipe.h b/src/core/pipe.h index 18a59ddb..4ccbeae2 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -65,7 +65,6 @@ extern int nni_pipe_getopt(nni_pipe *, const char *, void *, size_t *, int); // nni_pipe_get_proto_data gets the protocol private data set with the // nni_pipe_set_proto_data function. No locking is performed. extern void *nni_pipe_get_proto_data(nni_pipe *); -extern void nni_pipe_set_proto_data(nni_pipe *, void *); // nni_pipe_sock_list_init initializes a list of pipes, to be used by // a per-socket list. diff --git a/src/core/protocol.h b/src/core/protocol.h index 964aee1a..9c3b4d33 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -34,16 +34,19 @@ struct nni_proto_pipe_ops { // pipe_start is called to register a pipe with the protocol. The // protocol can reject this, for example if another pipe is already - // active on a 1:1 protocol. The protocol may not block during this, - // as the socket lock is held. + // active on a 1:1 protocol. The protocol may not block during this. int (*pipe_start)(void *); - // pipe_stop is called to unregister a pipe from the protocol. - // Threads may still acccess data structures, so the protocol - // should not free anything yet. This is called with the socket - // lock held, so the protocol may not call back into the socket, and - // must not block. This operation must be idempotent, and may - // be called even if pipe_start was not. + // pipe_close is an idempotent, non-blocking, operation, called + // when the pipe is being closed. Any operations pending on the + // pipe should be canceled with NNG_ECLOSED. (Best option is to + // use nng_aio_close() on them) + void (*pipe_close)(void *); + + // pipe_stop is called during finalization, to ensure that + // the protocol is absolutely finished with the pipe. It should + // wait if necessary to ensure that the pipe is not referenced + // anymore by the protocol. It should not destroy resources. void (*pipe_stop)(void *); }; diff --git a/src/core/socket.c b/src/core/socket.c index 66580300..f14c15fd 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -253,13 +253,13 @@ nni_sock_setopt_sockname(nni_sock *s, const void *buf, size_t sz, int typ) static int nni_sock_getopt_proto(nni_sock *s, void *buf, size_t *szp, int typ) { - return (nni_copyout_int(nni_sock_proto(s), buf, szp, typ)); + return (nni_copyout_int(nni_sock_proto_id(s), buf, szp, typ)); } static int nni_sock_getopt_peer(nni_sock *s, void *buf, size_t *szp, int typ) { - return (nni_copyout_int(nni_sock_peer(s), buf, szp, typ)); + return (nni_copyout_int(nni_sock_peer_id(s), buf, szp, typ)); } static int @@ -434,9 +434,7 @@ nni_sock_rele(nni_sock *s) int nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe) { - void * pdata = nni_pipe_get_proto_data(pipe); nng_pipe_cb cb; - int rv; NNI_ASSERT(s != NULL); nni_mtx_lock(&s->s_mx); @@ -464,26 +462,13 @@ nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe) return (NNG_ECLOSED); } - // Protocol can reject for other reasons. - // This must be the last operation, until this point - // the protocol has not actually "seen" the pipe. - rv = s->s_pipe_ops.pipe_start(pdata); - nni_mtx_unlock(&s->s_mx); - return (rv); + return (0); } int nni_sock_pipe_add(nni_sock *s, nni_pipe *p) { - int rv; - void *pdata; - - if ((rv = s->s_pipe_ops.pipe_init(&pdata, p, s->s_data)) != 0) { - return (rv); - } - nni_pipe_set_proto_data(p, pdata); - // Initialize protocol pipe data. nni_mtx_lock(&s->s_mx); if (s->s_closing) { @@ -503,7 +488,6 @@ nni_sock_pipe_add(nni_sock *s, nni_pipe *p) void nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) { - void * pdata; nng_pipe_cb cb; nni_mtx_lock(&sock->s_mx); @@ -515,14 +499,8 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) cb(p, NNG_PIPE_REM, arg); nni_mtx_lock(&sock->s_mx); } - pdata = nni_pipe_get_proto_data(pipe); - if (pdata != NULL) { - sock->s_pipe_ops.pipe_stop(pdata); - nni_pipe_set_proto_data(pipe, NULL); - if (nni_list_active(&sock->s_pipes, pipe)) { - nni_list_remove(&sock->s_pipes, pipe); - } - sock->s_pipe_ops.pipe_fini(pdata); + if (nni_list_active(&sock->s_pipes, pipe)) { + nni_list_remove(&sock->s_pipes, pipe); } if (sock->s_closing && nni_list_empty(&sock->s_pipes)) { nni_cv_wake(&sock->s_cv); @@ -589,9 +567,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) NNI_ASSERT(s->s_sock_ops.sock_open != NULL); NNI_ASSERT(s->s_sock_ops.sock_close != NULL); - NNI_ASSERT(s->s_pipe_ops.pipe_start != NULL); - NNI_ASSERT(s->s_pipe_ops.pipe_stop != NULL); - NNI_LIST_NODE_INIT(&s->s_node); NNI_LIST_INIT(&s->s_options, nni_sockopt, node); NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node); @@ -883,15 +858,16 @@ nni_sock_recv(nni_sock *sock, nni_aio *aio) sock->s_sock_ops.sock_recv(sock->s_data, aio); } -// nni_sock_protocol returns the socket's 16-bit protocol number. +// nni_sock_proto_id returns the socket's 16-bit protocol number. uint16_t -nni_sock_proto(nni_sock *sock) +nni_sock_proto_id(nni_sock *sock) { return (sock->s_self_id.p_id); } +// nni_sock_peer_id returns the socket peer's 16-bit protocol number. uint16_t -nni_sock_peer(nni_sock *sock) +nni_sock_peer_id(nni_sock *sock) { return (sock->s_peer_id.p_id); } @@ -908,6 +884,18 @@ nni_sock_peer_name(nni_sock *sock) return (sock->s_peer_id.p_name); } +struct nni_proto_pipe_ops * +nni_sock_proto_pipe_ops(nni_sock *sock) +{ + return (&sock->s_pipe_ops); +} + +void * +nni_sock_proto_data(nni_sock *sock) +{ + return (sock->s_data); +} + void nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax) { diff --git a/src/core/socket.h b/src/core/socket.h index bccda5b3..22a13ef7 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -20,10 +20,13 @@ extern int nni_sock_open(nni_sock **, const nni_proto *); extern void nni_sock_close(nni_sock *); extern void nni_sock_closeall(void); extern int nni_sock_shutdown(nni_sock *); -extern uint16_t nni_sock_proto(nni_sock *); -extern uint16_t nni_sock_peer(nni_sock *); +extern uint16_t nni_sock_proto_id(nni_sock *); +extern uint16_t nni_sock_peer_id(nni_sock *); extern const char *nni_sock_proto_name(nni_sock *); extern const char *nni_sock_peer_name(nni_sock *); +extern void * nni_sock_proto_data(nni_sock *); + +extern struct nni_proto_pipe_ops *nni_sock_proto_pipe_ops(nni_sock *); extern int nni_sock_setopt( nni_sock *, const char *, const void *, size_t, int); diff --git a/src/core/transport.c b/src/core/transport.c index 94e5caa1..0cd84461 100644 --- a/src/core/transport.c +++ b/src/core/transport.c @@ -118,7 +118,7 @@ nni_tran_chkopt(const char *name, const void *v, size_t sz, int typ) nni_mtx_lock(&nni_tran_lk); NNI_LIST_FOREACH (&nni_tran_list, t) { - const nni_tran_ep * ep; + const nni_tran_ep_ops * ep; const nni_tran_ep_option *eo; // Generally we look for endpoint options. diff --git a/src/core/transport.h b/src/core/transport.h index b1fecaa2..96976efd 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -23,10 +23,10 @@ struct nni_tran { const char *tran_scheme; // tran_ep links our endpoint-specific operations. - const nni_tran_ep *tran_ep; + const nni_tran_ep_ops *tran_ep; // tran_pipe links our pipe-specific operations. - const nni_tran_pipe *tran_pipe; + const nni_tran_pipe_ops *tran_pipe; // tran_init, if not NULL, is called once during library // initialization. @@ -77,7 +77,7 @@ struct nni_tran_ep_option { // For a given endpoint, the framework holds a lock so that each entry // point is run exclusively of the others. (Transports must still guard // against any asynchronous operations they manage themselves, though.) -struct nni_tran_ep { +struct nni_tran_ep_ops { // ep_init creates a vanilla endpoint. The value created is // used for the first argument for all other endpoint functions. int (*ep_init)(void **, nni_url *, nni_sock *, int); @@ -128,7 +128,7 @@ struct nni_tran_pipe_option { // with socket locks held, so it is forbidden for the transport to call // back into the socket at this point. (Which is one reason pointers back // to socket or even enclosing pipe state, are not provided.) -struct nni_tran_pipe { +struct nni_tran_pipe_ops { // p_fini destroys the pipe. This should clean up all local // resources, including closing files and freeing memory, used by // the pipe. After this call returns, the system will not make diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c index 2a2a1228..01f20130 100644 --- a/src/protocol/bus0/bus.c +++ b/src/protocol/bus0/bus.c @@ -67,7 +67,6 @@ bus0_sock_fini(void *arg) { bus0_sock *s = arg; - nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); nni_mtx_fini(&s->mtx); NNI_FREE_STRUCT(s); @@ -108,7 +107,18 @@ bus0_sock_close(void *arg) { bus0_sock *s = arg; - nni_aio_abort(s->aio_getq, NNG_ECLOSED); + nni_aio_close(s->aio_getq); +} + +static void +bus0_pipe_stop(void *arg) +{ + bus0_pipe *p = arg; + + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_putq); } static void @@ -168,18 +178,17 @@ bus0_pipe_start(void *arg) } static void -bus0_pipe_stop(void *arg) +bus0_pipe_close(void *arg) { bus0_pipe *p = arg; bus0_sock *s = p->psock; + nni_aio_close(p->aio_getq); + nni_aio_close(p->aio_send); + nni_aio_close(p->aio_recv); + nni_aio_close(p->aio_putq); nni_msgq_close(p->sendq); - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_putq); - nni_mtx_lock(&s->mtx); if (nni_list_active(&s->pipes, p)) { nni_list_remove(&s->pipes, p); @@ -351,6 +360,7 @@ static nni_proto_pipe_ops bus0_pipe_ops = { .pipe_init = bus0_pipe_init, .pipe_fini = bus0_pipe_fini, .pipe_start = bus0_pipe_start, + .pipe_close = bus0_pipe_close, .pipe_stop = bus0_pipe_stop, }; diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c index e275e52c..2fb42df5 100644 --- a/src/protocol/pair0/pair.c +++ b/src/protocol/pair0/pair.c @@ -78,6 +78,17 @@ pair0_sock_fini(void *arg) } static void +pair0_pipe_stop(void *arg) +{ + pair0_pipe *p = arg; + + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_putq); + nni_aio_stop(p->aio_getq); +} + +static void pair0_pipe_fini(void *arg) { pair0_pipe *p = arg; @@ -135,15 +146,15 @@ pair0_pipe_start(void *arg) } static void -pair0_pipe_stop(void *arg) +pair0_pipe_close(void *arg) { pair0_pipe *p = arg; pair0_sock *s = p->psock; - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_putq); - nni_aio_stop(p->aio_getq); + nni_aio_close(p->aio_send); + nni_aio_close(p->aio_recv); + nni_aio_close(p->aio_putq); + nni_aio_close(p->aio_getq); nni_mtx_lock(&s->mtx); if (s->ppipe == p) { @@ -249,6 +260,7 @@ static nni_proto_pipe_ops pair0_pipe_ops = { .pipe_init = pair0_pipe_init, .pipe_fini = pair0_pipe_fini, .pipe_start = pair0_pipe_start, + .pipe_close = pair0_pipe_close, .pipe_stop = pair0_pipe_stop, }; diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c index a3c01d46..ab01e451 100644 --- a/src/protocol/pair1/pair.c +++ b/src/protocol/pair1/pair.c @@ -117,9 +117,21 @@ pair1_sock_init_raw(void **sp, nni_sock *nsock) } static void +pair1_pipe_stop(void *arg) +{ + pair1_pipe *p = arg; + + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_putq); + nni_aio_stop(p->aio_getq); +} + +static void pair1_pipe_fini(void *arg) { pair1_pipe *p = arg; + nni_aio_fini(p->aio_send); nni_aio_fini(p->aio_recv); nni_aio_fini(p->aio_putq); @@ -198,21 +210,22 @@ pair1_pipe_start(void *arg) } static void -pair1_pipe_stop(void *arg) +pair1_pipe_close(void *arg) { pair1_pipe *p = arg; pair1_sock *s = p->psock; + nni_aio_close(p->aio_send); + nni_aio_close(p->aio_recv); + nni_aio_close(p->aio_putq); + nni_aio_close(p->aio_getq); + nni_mtx_lock(&s->mtx); nni_idhash_remove(s->pipes, nni_pipe_id(p->npipe)); nni_list_node_remove(&p->node); nni_mtx_unlock(&s->mtx); nni_msgq_close(p->sendq); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_putq); - nni_aio_stop(p->aio_getq); } static void @@ -405,7 +418,8 @@ pair1_sock_open(void *arg) static void pair1_sock_close(void *arg) { - NNI_ARG_UNUSED(arg); + pair1_sock *s = arg; + nni_aio_close(s->aio_getq); } static int @@ -464,6 +478,7 @@ static nni_proto_pipe_ops pair1_pipe_ops = { .pipe_init = pair1_pipe_init, .pipe_fini = pair1_pipe_fini, .pipe_start = pair1_pipe_start, + .pipe_close = pair1_pipe_close, .pipe_stop = pair1_pipe_stop, }; diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c index c5017d50..81f6c137 100644 --- a/src/protocol/pipeline0/pull.c +++ b/src/protocol/pipeline0/pull.c @@ -68,6 +68,15 @@ pull0_sock_fini(void *arg) } static void +pull0_pipe_stop(void *arg) +{ + pull0_pipe *p = arg; + + nni_aio_stop(p->putq_aio); + nni_aio_stop(p->recv_aio); +} + +static void pull0_pipe_fini(void *arg) { pull0_pipe *p = arg; @@ -110,12 +119,12 @@ pull0_pipe_start(void *arg) } static void -pull0_pipe_stop(void *arg) +pull0_pipe_close(void *arg) { pull0_pipe *p = arg; - nni_aio_stop(p->putq_aio); - nni_aio_stop(p->recv_aio); + nni_aio_close(p->putq_aio); + nni_aio_close(p->recv_aio); } static void @@ -198,6 +207,7 @@ static nni_proto_pipe_ops pull0_pipe_ops = { .pipe_init = pull0_pipe_init, .pipe_fini = pull0_pipe_fini, .pipe_start = pull0_pipe_start, + .pipe_close = pull0_pipe_close, .pipe_stop = pull0_pipe_stop, }; diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c index 2ad657b6..e413cf46 100644 --- a/src/protocol/pipeline0/push.c +++ b/src/protocol/pipeline0/push.c @@ -83,6 +83,16 @@ push0_sock_close(void *arg) } static void +push0_pipe_stop(void *arg) +{ + push0_pipe *p = arg; + + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_getq); +} + +static void push0_pipe_fini(void *arg) { push0_pipe *p = arg; @@ -136,13 +146,13 @@ push0_pipe_start(void *arg) } static void -push0_pipe_stop(void *arg) +push0_pipe_close(void *arg) { push0_pipe *p = arg; - nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_getq); + nni_aio_close(p->aio_recv); + nni_aio_close(p->aio_send); + nni_aio_close(p->aio_getq); } static void @@ -214,6 +224,7 @@ static nni_proto_pipe_ops push0_pipe_ops = { .pipe_init = push0_pipe_init, .pipe_fini = push0_pipe_fini, .pipe_start = push0_pipe_start, + .pipe_close = push0_pipe_close, .pipe_stop = push0_pipe_stop, }; diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index 45f4b7d9..4db48754 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -61,7 +61,6 @@ pub0_sock_fini(void *arg) { pub0_sock *s = arg; - nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); nni_mtx_fini(&s->mtx); NNI_FREE_STRUCT(s); @@ -103,13 +102,24 @@ pub0_sock_close(void *arg) { pub0_sock *s = arg; - nni_aio_abort(s->aio_getq, NNG_ECLOSED); + nni_aio_close(s->aio_getq); +} + +static void +pub0_pipe_stop(void *arg) +{ + pub0_pipe *p = arg; + + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); } static void pub0_pipe_fini(void *arg) { pub0_pipe *p = arg; + nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); nni_aio_fini(p->aio_recv); @@ -164,14 +174,14 @@ pub0_pipe_start(void *arg) } static void -pub0_pipe_stop(void *arg) +pub0_pipe_close(void *arg) { pub0_pipe *p = arg; pub0_sock *s = p->pub; - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); + nni_aio_close(p->aio_getq); + nni_aio_close(p->aio_send); + nni_aio_close(p->aio_recv); nni_msgq_close(p->sendq); @@ -290,6 +300,7 @@ static nni_proto_pipe_ops pub0_pipe_ops = { .pipe_init = pub0_pipe_init, .pipe_fini = pub0_pipe_fini, .pipe_start = pub0_pipe_start, + .pipe_close = pub0_pipe_close, .pipe_stop = pub0_pipe_stop, }; diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index b41b33ea..c244b0ad 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -99,6 +99,15 @@ sub0_sock_close(void *arg) } static void +sub0_pipe_stop(void *arg) +{ + sub0_pipe *p = arg; + + nni_aio_stop(p->aio_putq); + nni_aio_stop(p->aio_recv); +} + +static void sub0_pipe_fini(void *arg) { sub0_pipe *p = arg; @@ -139,12 +148,12 @@ sub0_pipe_start(void *arg) } static void -sub0_pipe_stop(void *arg) +sub0_pipe_close(void *arg) { sub0_pipe *p = arg; - nni_aio_stop(p->aio_putq); - nni_aio_stop(p->aio_recv); + nni_aio_close(p->aio_putq); + nni_aio_close(p->aio_recv); } static void @@ -338,6 +347,7 @@ static nni_proto_pipe_ops sub0_pipe_ops = { .pipe_init = sub0_pipe_init, .pipe_fini = sub0_pipe_fini, .pipe_start = sub0_pipe_start, + .pipe_close = sub0_pipe_close, .pipe_stop = sub0_pipe_stop, }; diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index 965cbea7..c483b777 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -297,6 +297,15 @@ rep0_sock_close(void *arg) } static void +rep0_pipe_stop(void *arg) +{ + rep0_pipe *p = arg; + + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); +} + +static void rep0_pipe_fini(void *arg) { rep0_pipe *p = arg; @@ -347,14 +356,14 @@ rep0_pipe_start(void *arg) } static void -rep0_pipe_stop(void *arg) +rep0_pipe_close(void *arg) { rep0_pipe *p = arg; rep0_sock *s = p->rep; rep0_ctx * ctx; - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); + nni_aio_close(p->aio_send); + nni_aio_close(p->aio_recv); nni_mtx_lock(&s->lk); if (nni_list_active(&s->recvpipes, p)) { @@ -647,6 +656,7 @@ static nni_proto_pipe_ops rep0_pipe_ops = { .pipe_init = rep0_pipe_init, .pipe_fini = rep0_pipe_fini, .pipe_start = rep0_pipe_start, + .pipe_close = rep0_pipe_close, .pipe_stop = rep0_pipe_stop, }; diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index bbb0b886..fc9d7271 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -186,6 +186,15 @@ req0_sock_fini(void *arg) } static void +req0_pipe_stop(void *arg) +{ + req0_pipe *p = arg; + + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_send); +} + +static void req0_pipe_fini(void *arg) { req0_pipe *p = arg; @@ -243,17 +252,14 @@ req0_pipe_start(void *arg) } static void -req0_pipe_stop(void *arg) +req0_pipe_close(void *arg) { req0_pipe *p = arg; req0_sock *s = p->req; req0_ctx * ctx; - nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_send); - - // At this point there should not be any further AIOs running. - // Further, any completion tasks have completed. + nni_aio_close(p->aio_recv); + nni_aio_close(p->aio_send); nni_mtx_lock(&s->mtx); // This removes the node from either busypipes or readypipes. @@ -837,6 +843,7 @@ static nni_proto_pipe_ops req0_pipe_ops = { .pipe_init = req0_pipe_init, .pipe_fini = req0_pipe_fini, .pipe_start = req0_pipe_start, + .pipe_close = req0_pipe_close, .pipe_stop = req0_pipe_stop, }; diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c index 4773677e..6dcfe6be 100644 --- a/src/protocol/reqrep0/xrep.c +++ b/src/protocol/reqrep0/xrep.c @@ -62,7 +62,6 @@ xrep0_sock_fini(void *arg) { xrep0_sock *s = arg; - nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); nni_idhash_fini(s->pipes); nni_mtx_fini(&s->lk); @@ -108,7 +107,18 @@ xrep0_sock_close(void *arg) { xrep0_sock *s = arg; - nni_aio_abort(s->aio_getq, NNG_ECLOSED); + nni_aio_close(s->aio_getq); +} + +static void +xrep0_pipe_stop(void *arg) +{ + xrep0_pipe *p = arg; + + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_putq); } static void @@ -178,16 +188,16 @@ xrep0_pipe_start(void *arg) } static void -xrep0_pipe_stop(void *arg) +xrep0_pipe_close(void *arg) { xrep0_pipe *p = arg; xrep0_sock *s = p->rep; + nni_aio_close(p->aio_getq); + nni_aio_close(p->aio_send); + nni_aio_close(p->aio_recv); + nni_aio_close(p->aio_putq); nni_msgq_close(p->sendq); - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_putq); nni_mtx_lock(&s->lk); nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); @@ -389,6 +399,7 @@ static nni_proto_pipe_ops xrep0_pipe_ops = { .pipe_init = xrep0_pipe_init, .pipe_fini = xrep0_pipe_fini, .pipe_start = xrep0_pipe_start, + .pipe_close = xrep0_pipe_close, .pipe_stop = xrep0_pipe_stop, }; diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c index 13ae7418..793411af 100644 --- a/src/protocol/reqrep0/xreq.c +++ b/src/protocol/reqrep0/xreq.c @@ -90,6 +90,17 @@ xreq0_sock_fini(void *arg) } static void +xreq0_pipe_stop(void *arg) +{ + xreq0_pipe *p = arg; + + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_putq); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_send); +} + +static void xreq0_pipe_fini(void *arg) { xreq0_pipe *p = arg; @@ -140,17 +151,14 @@ xreq0_pipe_start(void *arg) } static void -xreq0_pipe_stop(void *arg) +xreq0_pipe_close(void *arg) { xreq0_pipe *p = arg; - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_putq); - nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_send); - - // At this point there should not be any further AIOs running. - // Further, any completion tasks have completed. + nni_aio_close(p->aio_getq); + nni_aio_close(p->aio_putq); + nni_aio_close(p->aio_recv); + nni_aio_close(p->aio_send); } // For raw mode we can just let the pipes "contend" via getq to get a @@ -277,6 +285,7 @@ static nni_proto_pipe_ops xreq0_pipe_ops = { .pipe_init = xreq0_pipe_init, .pipe_fini = xreq0_pipe_fini, .pipe_start = xreq0_pipe_start, + .pipe_close = xreq0_pipe_close, .pipe_stop = xreq0_pipe_stop, }; diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index e553f6ce..7738a8b7 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -290,6 +290,15 @@ resp0_sock_close(void *arg) } static void +resp0_pipe_stop(void *arg) +{ + resp0_pipe *p = arg; + + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); +} + +static void resp0_pipe_fini(void *arg) { resp0_pipe *p = arg; @@ -344,12 +353,15 @@ resp0_pipe_start(void *arg) } static void -resp0_pipe_stop(void *arg) +resp0_pipe_close(void *arg) { resp0_pipe *p = arg; resp0_sock *s = p->psock; resp0_ctx * ctx; + nni_aio_close(p->aio_send); + nni_aio_close(p->aio_recv); + nni_mtx_lock(&s->mtx); while ((ctx = nni_list_first(&p->sendq)) != NULL) { nni_aio *aio; @@ -369,9 +381,6 @@ resp0_pipe_stop(void *arg) } nni_idhash_remove(s->pipes, p->id); nni_mtx_unlock(&s->mtx); - - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); } static void @@ -626,6 +635,7 @@ static nni_proto_pipe_ops resp0_pipe_ops = { .pipe_init = resp0_pipe_init, .pipe_fini = resp0_pipe_fini, .pipe_start = resp0_pipe_start, + .pipe_close = resp0_pipe_close, .pipe_stop = resp0_pipe_stop, }; diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index e725d2b3..51bce0c8 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -284,6 +284,16 @@ surv0_sock_close(void *arg) } static void +surv0_pipe_stop(void *arg) +{ + surv0_pipe *p = arg; + + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); +} + +static void surv0_pipe_fini(void *arg) { surv0_pipe *p = arg; @@ -338,14 +348,14 @@ surv0_pipe_start(void *arg) } static void -surv0_pipe_stop(void *arg) +surv0_pipe_close(void *arg) { surv0_pipe *p = arg; surv0_sock *s = p->sock; - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); + nni_aio_close(p->aio_getq); + nni_aio_close(p->aio_send); + nni_aio_close(p->aio_recv); nni_msgq_close(p->sendq); @@ -532,6 +542,7 @@ static nni_proto_pipe_ops surv0_pipe_ops = { .pipe_init = surv0_pipe_init, .pipe_fini = surv0_pipe_fini, .pipe_start = surv0_pipe_start, + .pipe_close = surv0_pipe_close, .pipe_stop = surv0_pipe_stop, }; diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c index 7aaed6da..bcbbcbc7 100644 --- a/src/protocol/survey0/xrespond.c +++ b/src/protocol/survey0/xrespond.c @@ -63,7 +63,6 @@ xresp0_sock_fini(void *arg) { xresp0_sock *s = arg; - nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); nni_idhash_fini(s->pipes); nni_mtx_fini(&s->mtx); @@ -107,7 +106,18 @@ xresp0_sock_close(void *arg) { xresp0_sock *s = arg; - nni_aio_abort(s->aio_getq, NNG_ECLOSED); + nni_aio_close(s->aio_getq); +} + +static void +xresp0_pipe_stop(void *arg) +{ + xresp0_pipe *p = arg; + + nni_aio_stop(p->aio_putq); + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); } static void @@ -170,16 +180,17 @@ xresp0_pipe_start(void *arg) } static void -xresp0_pipe_stop(void *arg) +xresp0_pipe_close(void *arg) { xresp0_pipe *p = arg; xresp0_sock *s = p->psock; + nni_aio_close(p->aio_putq); + nni_aio_close(p->aio_getq); + nni_aio_close(p->aio_send); + nni_aio_close(p->aio_recv); + nni_msgq_close(p->sendq); - nni_aio_stop(p->aio_putq); - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); nni_mtx_lock(&s->mtx); nni_idhash_remove(s->pipes, p->id); @@ -366,6 +377,7 @@ static nni_proto_pipe_ops xresp0_pipe_ops = { .pipe_init = xresp0_pipe_init, .pipe_fini = xresp0_pipe_fini, .pipe_start = xresp0_pipe_start, + .pipe_close = xresp0_pipe_close, .pipe_stop = xresp0_pipe_stop, }; diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c index cf311b15..47ebef3c 100644 --- a/src/protocol/survey0/xsurvey.c +++ b/src/protocol/survey0/xsurvey.c @@ -61,7 +61,6 @@ xsurv0_sock_fini(void *arg) { xsurv0_sock *s = arg; - nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); nni_mtx_fini(&s->mtx); NNI_FREE_STRUCT(s); @@ -104,7 +103,18 @@ xsurv0_sock_close(void *arg) { xsurv0_sock *s = arg; - nni_aio_abort(s->aio_getq, NNG_ECLOSED); + nni_aio_close(s->aio_getq); +} + +static void +xsurv0_pipe_stop(void *arg) +{ + xsurv0_pipe *p = arg; + + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_putq); } static void @@ -166,15 +176,15 @@ xsurv0_pipe_start(void *arg) } static void -xsurv0_pipe_stop(void *arg) +xsurv0_pipe_close(void *arg) { xsurv0_pipe *p = arg; xsurv0_sock *s = p->psock; - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_putq); + nni_aio_close(p->aio_getq); + nni_aio_close(p->aio_send); + nni_aio_close(p->aio_recv); + nni_aio_close(p->aio_putq); nni_msgq_close(p->sendq); @@ -338,6 +348,7 @@ static nni_proto_pipe_ops xsurv0_pipe_ops = { .pipe_init = xsurv0_pipe_init, .pipe_fini = xsurv0_pipe_fini, .pipe_start = xsurv0_pipe_start, + .pipe_close = xsurv0_pipe_close, .pipe_stop = xsurv0_pipe_stop, }; diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c index f3b16370..15d1f776 100644 --- a/src/supplemental/http/http_conn.c +++ b/src/supplemental/http/http_conn.c @@ -110,22 +110,26 @@ http_close(nni_http_conn *conn) } conn->closed = true; - if (nni_list_first(&conn->wrq)) { - nni_aio_abort(conn->wr_aio, NNG_ECLOSED); - // Abort all operations except the one in flight. - while ((aio = nni_list_last(&conn->wrq)) != - nni_list_first(&conn->wrq)) { - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, NNG_ECLOSED); - } + nni_aio_close(conn->wr_aio); + nni_aio_close(conn->rd_aio); + + if ((aio = conn->rd_uaio) != NULL) { + conn->rd_uaio = NULL; + nni_aio_finish_error(aio, NNG_ECLOSED); } - if (nni_list_first(&conn->rdq)) { - nni_aio_abort(conn->rd_aio, NNG_ECLOSED); - while ((aio = nni_list_last(&conn->rdq)) != - nni_list_first(&conn->rdq)) { - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, NNG_ECLOSED); - } + if ((aio = conn->wr_uaio) != NULL) { + conn->wr_uaio = NULL; + nni_aio_finish_error(aio, NNG_ECLOSED); + } + + // Abort all operations except the one in flight. + while ((aio = nni_list_first(&conn->wrq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + while ((aio = nni_list_first(&conn->rdq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); } if (conn->sock != NULL) { @@ -668,6 +672,9 @@ nni_http_tls_verified(nni_http_conn *conn) void nni_http_conn_fini(nni_http_conn *conn) { + nni_aio_stop(conn->wr_aio); + nni_aio_stop(conn->rd_aio); + nni_mtx_lock(&conn->mtx); http_close(conn); if ((conn->sock != NULL) && (conn->fini != NULL)) { @@ -675,8 +682,7 @@ nni_http_conn_fini(nni_http_conn *conn) conn->sock = NULL; } nni_mtx_unlock(&conn->mtx); - nni_aio_stop(conn->wr_aio); - nni_aio_stop(conn->rd_aio); + nni_aio_fini(conn->wr_aio); nni_aio_fini(conn->rd_aio); nni_free(conn->rd_buf, conn->rd_bufsz); diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c index c92de586..c7738aee 100644 --- a/src/supplemental/http/http_server.c +++ b/src/supplemental/http/http_server.c @@ -229,12 +229,6 @@ http_sconn_reap(void *arg) } static void -http_sconn_fini(http_sconn *sc) -{ - nni_reap(&sc->reap, http_sconn_reap, sc); -} - -static void http_sconn_close_locked(http_sconn *sc) { nni_http_conn *conn; @@ -245,15 +239,15 @@ http_sconn_close_locked(http_sconn *sc) NNI_ASSERT(!sc->finished); sc->closed = true; - nni_aio_abort(sc->rxaio, NNG_ECLOSED); - nni_aio_abort(sc->txaio, NNG_ECLOSED); - nni_aio_abort(sc->txdataio, NNG_ECLOSED); - nni_aio_abort(sc->cbaio, NNG_ECLOSED); + nni_aio_close(sc->rxaio); + nni_aio_close(sc->txaio); + nni_aio_close(sc->txdataio); + nni_aio_close(sc->cbaio); if ((conn = sc->conn) != NULL) { nni_http_conn_close(conn); } - http_sconn_fini(sc); + nni_reap(&sc->reap, http_sconn_reap, sc); } static void diff --git a/src/supplemental/tls/mbedtls/tls.c b/src/supplemental/tls/mbedtls/tls.c index f721d4aa..cdd226cd 100644 --- a/src/supplemental/tls/mbedtls/tls.c +++ b/src/supplemental/tls/mbedtls/tls.c @@ -761,6 +761,9 @@ nni_tls_close(nni_tls *tp) { nni_aio *aio; + nni_aio_close(tp->tcp_send); + nni_aio_close(tp->tcp_recv); + nni_mtx_lock(&tp->lk); tp->tls_closed = true; diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index 9a3f5519..6d4d3c13 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -407,14 +407,15 @@ ws_close_cb(void *arg) nni_ws *ws = arg; ws_msg *wm; + nni_aio_close(ws->txaio); + nni_aio_close(ws->rxaio); + nni_aio_close(ws->httpaio); + // Either we sent a close frame, or we didn't. Either way, // we are done, and its time to abort everything else. nni_mtx_lock(&ws->mtx); nni_http_conn_close(ws->http); - nni_aio_abort(ws->txaio, NNG_ECLOSED); - nni_aio_abort(ws->rxaio, NNG_ECLOSED); - nni_aio_abort(ws->httpaio, NNG_ECLOSED); // This list (receive) should be empty. while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) { @@ -464,8 +465,8 @@ ws_close(nni_ws *ws, uint16_t code) // pending connect request. if (!ws->closed) { // ABORT connection negotiation. - nni_aio_abort(ws->connaio, NNG_ECLOSED); - nni_aio_abort(ws->httpaio, NNG_ECLOSED); + nni_aio_close(ws->connaio); + nni_aio_close(ws->httpaio); ws_send_close(ws, code); } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 0f159d3a..bc51d971 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -198,7 +198,7 @@ nni_inproc_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) } ep->mode = mode; - ep->proto = nni_sock_proto(sock); + ep->proto = nni_sock_proto_id(sock); NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node); nni_aio_list_init(&ep->aios); @@ -452,7 +452,7 @@ static nni_tran_pipe_option nni_inproc_pipe_options[] = { }, }; -static nni_tran_pipe nni_inproc_pipe_ops = { +static nni_tran_pipe_ops nni_inproc_pipe_ops = { .p_fini = nni_inproc_pipe_fini, .p_send = nni_inproc_pipe_send, .p_recv = nni_inproc_pipe_recv, @@ -468,7 +468,7 @@ static nni_tran_ep_option nni_inproc_ep_options[] = { }, }; -static nni_tran_ep nni_inproc_ep_ops = { +static nni_tran_ep_ops nni_inproc_ep_ops = { .ep_init = nni_inproc_ep_init, .ep_fini = nni_inproc_ep_fini, .ep_connect = nni_inproc_ep_connect, diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 2347e24c..1740bfcb 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -81,6 +81,10 @@ nni_ipc_pipe_close(void *arg) { nni_ipc_pipe *pipe = arg; + nni_aio_close(pipe->rxaio); + nni_aio_close(pipe->txaio); + nni_aio_close(pipe->negaio); + nni_plat_ipc_pipe_close(pipe->ipp); } @@ -644,7 +648,7 @@ nni_ipc_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) nni_ipc_ep_fini(ep); return (rv); } - ep->proto = nni_sock_proto(sock); + ep->proto = nni_sock_proto_id(sock); *epp = ep; return (0); @@ -655,11 +659,11 @@ nni_ipc_ep_close(void *arg) { nni_ipc_ep *ep = arg; + nni_aio_close(ep->aio); + nni_mtx_lock(&ep->mtx); nni_plat_ipc_ep_close(ep->iep); nni_mtx_unlock(&ep->mtx); - - nni_aio_stop(ep->aio); } static int @@ -883,7 +887,7 @@ static nni_tran_pipe_option nni_ipc_pipe_options[] = { }, }; -static nni_tran_pipe nni_ipc_pipe_ops = { +static nni_tran_pipe_ops nni_ipc_pipe_ops = { .p_fini = nni_ipc_pipe_fini, .p_start = nni_ipc_pipe_start, .p_send = nni_ipc_pipe_send, @@ -924,7 +928,7 @@ static nni_tran_ep_option nni_ipc_ep_options[] = { }, }; -static nni_tran_ep nni_ipc_ep_ops = { +static nni_tran_ep_ops nni_ipc_ep_ops = { .ep_init = nni_ipc_ep_init, .ep_fini = nni_ipc_ep_fini, .ep_connect = nni_ipc_ep_connect, diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index f2cdf8ac..22217699 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -81,9 +81,13 @@ nni_tcp_tran_fini(void) static void nni_tcp_pipe_close(void *arg) { - nni_tcp_pipe *pipe = arg; + nni_tcp_pipe *p = arg; + + nni_aio_close(p->rxaio); + nni_aio_close(p->txaio); + nni_aio_close(p->negaio); - nni_plat_tcp_pipe_close(pipe->tpp); + nni_plat_tcp_pipe_close(p->tpp); } static void @@ -666,7 +670,7 @@ nni_tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) nni_tcp_ep_fini(ep); return (rv); } - ep->proto = nni_sock_proto(sock); + ep->proto = nni_sock_proto_id(sock); ep->mode = mode; ep->nodelay = true; ep->keepalive = false; @@ -680,11 +684,11 @@ nni_tcp_ep_close(void *arg) { nni_tcp_ep *ep = arg; + nni_aio_close(ep->aio); + nni_mtx_lock(&ep->mtx); nni_plat_tcp_ep_close(ep->tep); nni_mtx_unlock(&ep->mtx); - - nni_aio_stop(ep->aio); } static int @@ -910,7 +914,7 @@ static nni_tran_pipe_option nni_tcp_pipe_options[] = { }, }; -static nni_tran_pipe nni_tcp_pipe_ops = { +static nni_tran_pipe_ops nni_tcp_pipe_ops = { .p_fini = nni_tcp_pipe_fini, .p_start = nni_tcp_pipe_start, .p_send = nni_tcp_pipe_send, @@ -951,7 +955,7 @@ static nni_tran_ep_option nni_tcp_ep_options[] = { }, }; -static nni_tran_ep nni_tcp_ep_ops = { +static nni_tran_ep_ops nni_tcp_ep_ops = { .ep_init = nni_tcp_ep_init, .ep_fini = nni_tcp_ep_fini, .ep_connect = nni_tcp_ep_connect, diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index c863a85e..21557270 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -91,6 +91,10 @@ nni_tls_pipe_close(void *arg) { nni_tls_pipe *p = arg; + nni_aio_close(p->rxaio); + nni_aio_close(p->txaio); + nni_aio_close(p->negaio); + nni_tls_close(p->tls); } @@ -687,7 +691,7 @@ nni_tls_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) return (rv); } } - ep->proto = nni_sock_proto(sock); + ep->proto = nni_sock_proto_id(sock); ep->authmode = authmode; *epp = ep; @@ -699,11 +703,11 @@ nni_tls_ep_close(void *arg) { nni_tls_ep *ep = arg; + nni_aio_close(ep->aio); + nni_mtx_lock(&ep->mtx); nni_plat_tcp_ep_close(ep->tep); nni_mtx_unlock(&ep->mtx); - - nni_aio_stop(ep->aio); } static int @@ -1036,7 +1040,7 @@ static nni_tran_pipe_option nni_tls_pipe_options[] = { }, }; -static nni_tran_pipe nni_tls_pipe_ops = { +static nni_tran_pipe_ops nni_tls_pipe_ops = { .p_fini = nni_tls_pipe_fini, .p_start = nni_tls_pipe_start, .p_send = nni_tls_pipe_send, @@ -1107,7 +1111,7 @@ static nni_tran_ep_option nni_tls_ep_options[] = { }, }; -static nni_tran_ep nni_tls_ep_ops = { +static nni_tran_ep_ops nni_tls_ep_ops = { .ep_init = nni_tls_ep_init, .ep_fini = nni_tls_ep_fini, .ep_connect = nni_tls_ep_connect, diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index 12f3aeb5..a4081b25 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -205,6 +205,9 @@ ws_pipe_close(void *arg) { ws_pipe *p = arg; + nni_aio_close(p->rxaio); + nni_aio_close(p->txaio); + nni_mtx_lock(&p->mtx); nni_ws_close(p->ws); nni_mtx_unlock(&p->mtx); @@ -588,7 +591,7 @@ static nni_tran_pipe_option ws_pipe_options[] = { } }; -static nni_tran_pipe ws_pipe_ops = { +static nni_tran_pipe_ops ws_pipe_ops = { .p_fini = ws_pipe_fini, .p_send = ws_pipe_send, .p_recv = ws_pipe_recv, @@ -690,6 +693,9 @@ ws_ep_close(void *arg) { ws_ep *ep = arg; + nni_aio_close(ep->accaio); + nni_aio_close(ep->connaio); + if (ep->mode == NNI_EP_MODE_LISTEN) { nni_ws_listener_close(ep->listener); } else { @@ -750,8 +756,8 @@ ws_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) nni_aio_list_init(&ep->aios); ep->mode = mode; - ep->lproto = nni_sock_proto(sock); - ep->rproto = nni_sock_peer(sock); + ep->lproto = nni_sock_proto_id(sock); + ep->rproto = nni_sock_peer_id(sock); if (mode == NNI_EP_MODE_DIAL) { pname = nni_sock_peer_name(sock); @@ -795,7 +801,7 @@ ws_tran_fini(void) { } -static nni_tran_ep ws_ep_ops = { +static nni_tran_ep_ops ws_ep_ops = { .ep_init = ws_ep_init, .ep_fini = ws_ep_fini, .ep_connect = ws_ep_connect, @@ -1015,7 +1021,7 @@ static nni_tran_ep_option wss_ep_options[] = { }, }; -static nni_tran_ep wss_ep_ops = { +static nni_tran_ep_ops wss_ep_ops = { .ep_init = ws_ep_init, .ep_fini = ws_ep_fini, .ep_connect = ws_ep_connect, diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index f7139b94..46fe476e 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -2182,7 +2182,7 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) ep->ze_ping_time = zt_ping_time; ep->ze_conn_time = zt_conn_time; ep->ze_conn_tries = zt_conn_tries; - ep->ze_proto = nni_sock_proto(sock); + ep->ze_proto = nni_sock_proto_id(sock); nni_aio_list_init(&ep->ze_aios); @@ -2888,7 +2888,7 @@ static nni_tran_pipe_option zt_pipe_options[] = { }, }; -static nni_tran_pipe zt_pipe_ops = { +static nni_tran_pipe_ops zt_pipe_ops = { .p_fini = zt_pipe_fini, .p_start = zt_pipe_start, .p_send = zt_pipe_send, @@ -2983,7 +2983,7 @@ static nni_tran_ep_option zt_ep_options[] = { }, }; -static nni_tran_ep zt_ep_ops = { +static nni_tran_ep_ops zt_ep_ops = { .ep_init = zt_ep_init, .ep_fini = zt_ep_fini, .ep_connect = zt_ep_connect, |
