From 343417234aa3fd86e8ae0b56ae500a1ed3411cfc Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 12 Aug 2017 12:24:54 -0700 Subject: fixes #62 Endpoint close should be synchronous #62 fixes #66 Make pipe and endpoint structures private This changes a number of things, refactoring endpoints and supporting code to keep their internals private, and making endpoint close synchronous. This will allow us to add a consumer facing API for nng_ep_close(), as well as property APIs, etc. While here a bunch of convoluted and dead code was cleaned up. --- src/core/endpt.c | 389 ++++++++++++++++++++++++++++-------------------------- src/core/endpt.h | 70 +++------- src/core/pipe.c | 114 +++++++--------- src/core/pipe.h | 20 +-- src/core/socket.c | 283 +++++++++++---------------------------- src/core/socket.h | 15 +-- 6 files changed, 351 insertions(+), 540 deletions(-) (limited to 'src/core') diff --git a/src/core/endpt.c b/src/core/endpt.c index a5865acf..6e5f7e8a 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -14,58 +14,63 @@ #include #include +struct nni_ep { + nni_tran_ep ep_ops; // transport ops + nni_tran * ep_tran; // transport pointer + void * ep_data; // transport private + uint32_t ep_id; // endpoint id + nni_list_node ep_node; // per socket list + nni_sock * ep_sock; + char ep_addr[NNG_MAXADDRLEN]; + int ep_mode; + int ep_closed; // full shutdown + int ep_closing; // close pending (waiting on refcnt) + int ep_bound; // true if we bound locally + int ep_refcnt; + nni_mtx ep_mtx; + nni_cv ep_cv; + nni_list ep_pipes; + nni_aio ep_acc_aio; + nni_aio ep_con_aio; + nni_aio ep_con_syn; // used for sync connect + nni_aio ep_tmo_aio; // backoff timer + nni_duration ep_maxrtime; // maximum time for reconnect + nni_duration ep_currtime; // current time for reconnect + nni_duration ep_inirtime; // initial time for reconnect + nni_time ep_conntime; // time of last good connect +}; + // Functionality related to end points. -static void nni_ep_accept_start(nni_ep *); -static void nni_ep_accept_done(void *); -static void nni_ep_connect_start(nni_ep *); -static void nni_ep_connect_done(void *); -static void nni_ep_backoff_start(nni_ep *); -static void nni_ep_backoff_done(void *); -static void nni_ep_reaper(void *); +static void nni_ep_acc_start(nni_ep *); +static void nni_ep_acc_cb(void *); +static void nni_ep_con_start(nni_ep *); +static void nni_ep_con_cb(void *); +static void nni_ep_tmo_start(nni_ep *); +static void nni_ep_tmo_cb(void *); static nni_idhash *nni_eps; - -static nni_list nni_ep_reap_list; -static nni_mtx nni_ep_reap_lk; -static nni_cv nni_ep_reap_cv; -static nni_thr nni_ep_reap_thr; -static int nni_ep_reap_run; +static nni_mtx nni_ep_lk; int nni_ep_sys_init(void) { int rv; - NNI_LIST_INIT(&nni_ep_reap_list, nni_ep, ep_reap_node); - - if (((rv = nni_mtx_init(&nni_ep_reap_lk)) != 0) || - ((rv = nni_cv_init(&nni_ep_reap_cv, &nni_ep_reap_lk)) != 0) || - ((rv = nni_thr_init(&nni_ep_reap_thr, nni_ep_reaper, 0)) != 0) || + if (((rv = nni_mtx_init(&nni_ep_lk)) != 0) || ((rv = nni_idhash_init(&nni_eps)) != 0)) { return (rv); } nni_idhash_set_limits( nni_eps, 1, 0x7fffffff, nni_random() & 0x7fffffff); - nni_ep_reap_run = 1; - nni_thr_run(&nni_ep_reap_thr); - return (0); } void nni_ep_sys_fini(void) { - if (nni_ep_reap_run) { - nni_mtx_lock(&nni_ep_reap_lk); - nni_ep_reap_run = 0; - nni_cv_wake(&nni_ep_reap_cv); - nni_mtx_unlock(&nni_ep_reap_lk); - } - nni_thr_fini(&nni_ep_reap_thr); - nni_cv_fini(&nni_ep_reap_cv); - nni_mtx_fini(&nni_ep_reap_lk); + nni_mtx_fini(&nni_ep_lk); nni_idhash_fini(nni_eps); nni_eps = NULL; } @@ -83,19 +88,22 @@ nni_ep_destroy(nni_ep *ep) return; } - // Remove us form the table so we cannot be found. + // Remove us from the table so we cannot be found. if (ep->ep_id != 0) { nni_idhash_remove(nni_eps, ep->ep_id); } + + nni_sock_ep_remove(ep->ep_sock, ep); + nni_aio_stop(&ep->ep_acc_aio); nni_aio_stop(&ep->ep_con_aio); nni_aio_stop(&ep->ep_con_syn); - nni_aio_stop(&ep->ep_backoff); + nni_aio_stop(&ep->ep_tmo_aio); nni_aio_fini(&ep->ep_acc_aio); nni_aio_fini(&ep->ep_con_aio); nni_aio_fini(&ep->ep_con_syn); - nni_aio_fini(&ep->ep_backoff); + nni_aio_fini(&ep->ep_tmo_aio); nni_mtx_lock(&ep->ep_mtx); if (ep->ep_data != NULL) { @@ -107,8 +115,8 @@ nni_ep_destroy(nni_ep *ep) NNI_FREE_STRUCT(ep); } -int -nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) +static int +nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode) { nni_tran *tran; nni_ep * ep; @@ -127,152 +135,174 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) ep->ep_closed = 0; ep->ep_bound = 0; ep->ep_data = NULL; - ep->ep_refcnt = 0; + ep->ep_refcnt = 1; + ep->ep_sock = s; + ep->ep_tran = tran; + ep->ep_mode = mode; + + // Make a copy of the endpoint operations. This allows us to + // modify them (to override NULLs for example), and avoids an extra + // dereference on hot paths. + ep->ep_ops = *tran->tran_ep; + + // Could safely use strcpy here, but this avoids discussion. + (void) snprintf(ep->ep_addr, sizeof(ep->ep_addr), "%s", addr); NNI_LIST_NODE_INIT(&ep->ep_node); - NNI_LIST_NODE_INIT(&ep->ep_reap_node); nni_pipe_ep_list_init(&ep->ep_pipes); if (((rv = nni_mtx_init(&ep->ep_mtx)) != 0) || ((rv = nni_cv_init(&ep->ep_cv, &ep->ep_mtx)) != 0) || - ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0)) { - nni_ep_destroy(ep); - return (rv); - } - rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_accept_done, ep); - if (rv != 0) { - nni_ep_destroy(ep); - return (rv); - } - rv = nni_aio_init(&ep->ep_con_aio, nni_ep_connect_done, ep); - if (rv != 0) { - nni_ep_destroy(ep); - return (rv); - } - rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL); - if (rv != 0) { - nni_ep_destroy(ep); - return (rv); - } - rv = nni_aio_init(&ep->ep_backoff, nni_ep_backoff_done, ep); - if (rv != 0) { + ((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) || + ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) || + ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) || + ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) || + ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) || + ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) || + ((rv = nni_sock_ep_add(s, ep)) != 0)) { nni_ep_destroy(ep); return (rv); } - ep->ep_sock = sock; - ep->ep_tran = tran; - ep->ep_mode = mode; + *epp = ep; + return (0); +} - // Could safely use strcpy here, but this avoids discussion. - (void) snprintf(ep->ep_addr, sizeof(ep->ep_addr), "%s", addr); +int +nni_ep_create_dialer(nni_ep **epp, nni_sock *s, const char *addr) +{ + return (nni_ep_create(epp, s, addr, NNI_EP_MODE_DIAL)); +} - // Make a copy of the endpoint operations. This allows us to - // modify them (to override NULLs for example), and avoids an extra - // dereference on hot paths. - ep->ep_ops = *tran->tran_ep; +int +nni_ep_create_listener(nni_ep **epp, nni_sock *s, const char *addr) +{ + return (nni_ep_create(epp, s, addr, NNI_EP_MODE_LISTEN)); +} - if ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock, mode)) != 0) { - nni_ep_destroy(ep); +int +nni_ep_find(nni_ep **epp, uint32_t id) +{ + int rv; + nni_ep *ep; + + if ((rv = nni_init()) != 0) { return (rv); } - *epp = ep; - return (0); + nni_mtx_lock(&nni_ep_lk); + if ((rv = nni_idhash_find(nni_eps, id, (void **) &ep)) == 0) { + if (ep->ep_closed) { + rv = NNG_ECLOSED; + } else { + ep->ep_refcnt++; + *epp = ep; + } + } + nni_mtx_unlock(&nni_ep_lk); + return (rv); +} + +int +nni_ep_hold(nni_ep *ep) +{ + int rv; + nni_mtx_lock(&nni_ep_lk); + if (ep->ep_closed) { + rv = NNG_ECLOSED; + } else { + ep->ep_refcnt++; + rv = 0; + } + nni_mtx_unlock(&nni_ep_lk); + return (rv); } void -nni_ep_close(nni_ep *ep) +nni_ep_rele(nni_ep *ep) +{ + nni_mtx_lock(&nni_ep_lk); + ep->ep_refcnt--; + if (ep->ep_closing) { + nni_cv_wake(&ep->ep_cv); + } + nni_mtx_unlock(&nni_ep_lk); +} + +int +nni_ep_shutdown(nni_ep *ep) { + // This is done under the endpoints lock, although the remove + // is done under that as well, we also make sure that we hold + // the socket lock in the remove step. nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closed) { + if (ep->ep_closing) { nni_mtx_unlock(&ep->ep_mtx); - return; + return (NNG_ECLOSED); } - ep->ep_closed = 1; - nni_mtx_unlock(&ep->ep_mtx); + ep->ep_closing = 1; // Abort any remaining in-flight operations. nni_aio_cancel(&ep->ep_acc_aio, NNG_ECLOSED); nni_aio_cancel(&ep->ep_con_aio, NNG_ECLOSED); nni_aio_cancel(&ep->ep_con_syn, NNG_ECLOSED); - nni_aio_cancel(&ep->ep_backoff, NNG_ECLOSED); + nni_aio_cancel(&ep->ep_tmo_aio, NNG_ECLOSED); // Stop the underlying transport. ep->ep_ops.ep_close(ep->ep_data); -} - -static void -nni_ep_reap(nni_ep *ep) -{ - nni_ep_close(ep); // Extra sanity. - - nni_aio_stop(&ep->ep_acc_aio); - nni_aio_stop(&ep->ep_con_aio); - nni_aio_stop(&ep->ep_con_syn); - nni_aio_stop(&ep->ep_backoff); - // Take us off the sock list. - nni_sock_ep_remove(ep->ep_sock, ep); - - // Make sure any other unlocked users (references) are gone - // before we actually remove the memory. We should not have - // to wait long as we have closed the underlying pipe and - // done everything we can to wake any waiter (synchronous connect) - // gracefully. - nni_mtx_lock(&ep->ep_mtx); - ep->ep_closed = 1; - for (;;) { - if ((!nni_list_empty(&ep->ep_pipes)) || (ep->ep_refcnt != 0)) { - nni_cv_wait(&ep->ep_cv); - continue; - } - break; - } nni_mtx_unlock(&ep->ep_mtx); - nni_ep_destroy(ep); + return (0); } void -nni_ep_stop(nni_ep *ep) +nni_ep_close(nni_ep *ep) { - nni_pipe *pipe; + nni_pipe *p; nni_mtx_lock(&ep->ep_mtx); - - // Protection against recursion. - if (ep->ep_stop) { + if (ep->ep_closed) { nni_mtx_unlock(&ep->ep_mtx); + nni_ep_rele(ep); return; } - ep->ep_stop = 1; - NNI_LIST_FOREACH (&ep->ep_pipes, pipe) { - nni_pipe_stop(pipe); + ep->ep_closed = 1; + nni_mtx_unlock(&ep->ep_mtx); + + nni_ep_shutdown(ep); + + nni_aio_stop(&ep->ep_acc_aio); + nni_aio_stop(&ep->ep_con_aio); + nni_aio_stop(&ep->ep_con_syn); + nni_aio_stop(&ep->ep_tmo_aio); + + nni_mtx_lock(&ep->ep_mtx); + NNI_LIST_FOREACH (&ep->ep_pipes, p) { + nni_pipe_stop(p); + } + while ((!nni_list_empty(&ep->ep_pipes)) || (ep->ep_refcnt != 1)) { + nni_cv_wait(&ep->ep_cv); } nni_mtx_unlock(&ep->ep_mtx); - nni_mtx_lock(&nni_ep_reap_lk); - NNI_ASSERT(!nni_list_node_active(&ep->ep_reap_node)); - nni_list_append(&nni_ep_reap_list, ep); - nni_cv_wake(&nni_ep_reap_cv); - nni_mtx_unlock(&nni_ep_reap_lk); + nni_ep_destroy(ep); } static void -nni_ep_backoff_cancel(nni_aio *aio, int rv) +nni_ep_tmo_cancel(nni_aio *aio, int rv) { // The only way this ever gets "finished", is via cancellation. nni_aio_finish_error(aio, rv); } static void -nni_ep_backoff_start(nni_ep *ep) +nni_ep_tmo_start(nni_ep *ep) { nni_duration backoff; - if (ep->ep_closed) { + if (ep->ep_closing) { return; } backoff = ep->ep_currtime; @@ -282,36 +312,36 @@ nni_ep_backoff_start(nni_ep *ep) } // To minimize damage from storms, etc., we select a backoff - // value randomly, in the range of [0, backoff-1]; this is pretty - // similar to 802 style backoff, except that we have a nearly - // uniform time period instead of discrete slot times. This - // algorithm may lead to slight biases because we don't have - // a statistically perfect distribution with the modulo of the - // random number, but this really doesn't matter. - - ep->ep_backoff.a_expire = nni_clock() + (nni_random() % backoff); - nni_aio_start(&ep->ep_backoff, nni_ep_backoff_cancel, ep); + // value randomly, in the range of [0, backoff-1]; this is + // pretty similar to 802 style backoff, except that we have a + // nearly uniform time period instead of discrete slot times. + // This algorithm may lead to slight biases because we don't + // have a statistically perfect distribution with the modulo of + // the random number, but this really doesn't matter. + + ep->ep_tmo_aio.a_expire = nni_clock() + (nni_random() % backoff); + nni_aio_start(&ep->ep_tmo_aio, nni_ep_tmo_cancel, ep); } static void -nni_ep_backoff_done(void *arg) +nni_ep_tmo_cb(void *arg) { nni_ep * ep = arg; - nni_aio *aio = &ep->ep_backoff; + nni_aio *aio = &ep->ep_tmo_aio; nni_mtx_lock(&ep->ep_mtx); if (nni_aio_result(aio) == NNG_ETIMEDOUT) { if (ep->ep_mode == NNI_EP_MODE_DIAL) { - nni_ep_connect_start(ep); + nni_ep_con_start(ep); } else { - nni_ep_accept_start(ep); + nni_ep_acc_start(ep); } } nni_mtx_unlock(&ep->ep_mtx); } static void -nni_ep_connect_done(void *arg) +nni_ep_con_cb(void *arg) { nni_ep * ep = arg; nni_aio *aio = &ep->ep_con_aio; @@ -324,11 +354,12 @@ nni_ep_connect_done(void *arg) switch (rv) { case 0: // Good connect, so reset the backoff timer. - // Note that a host that accepts the connect, but drops us - // immediately, is going to get hit pretty hard (depending - // on the initial backoff) with no exponential backoff. - // This can happen if we wind up trying to connect to some - // port that does not speak SP for example. + // Note that a host that accepts the connect, but drops + // us immediately, is going to get hit pretty hard + // (depending on the initial backoff) with no + // exponential backoff. This can happen if we wind up + // trying to connect to some port that does not speak + // SP for example. ep->ep_currtime = ep->ep_inirtime; // No further outgoing connects -- we will restart a @@ -340,19 +371,19 @@ nni_ep_connect_done(void *arg) break; default: // Other errors involve the use of the backoff timer. - nni_ep_backoff_start(ep); + nni_ep_tmo_start(ep); break; } nni_mtx_unlock(&ep->ep_mtx); } static void -nni_ep_connect_start(nni_ep *ep) +nni_ep_con_start(nni_ep *ep) { nni_aio *aio = &ep->ep_con_aio; // Call with the Endpoint lock held. - if (ep->ep_closed) { + if (ep->ep_closing) { return; } @@ -375,13 +406,13 @@ nni_ep_dial(nni_ep *ep, int flags) nni_mtx_unlock(&ep->ep_mtx); return (NNG_ENOTSUP); } - if (ep->ep_closed) { + if (ep->ep_closing) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); } if ((flags & NNG_FLAG_SYNCH) == 0) { - nni_ep_connect_start(ep); + nni_ep_con_start(ep); nni_mtx_unlock(&ep->ep_mtx); return (0); } @@ -404,7 +435,7 @@ nni_ep_dial(nni_ep *ep, int flags) } static void -nni_ep_accept_done(void *arg) +nni_ep_acc_cb(void *arg) { nni_ep * ep = arg; nni_aio *aio = &ep->ep_acc_aio; @@ -418,7 +449,7 @@ nni_ep_accept_done(void *arg) nni_mtx_lock(&ep->ep_mtx); switch (rv) { case 0: - nni_ep_accept_start(ep); + nni_ep_acc_start(ep); break; case NNG_ECLOSED: case NNG_ECANCELED: @@ -427,26 +458,27 @@ nni_ep_accept_done(void *arg) case NNG_ECONNABORTED: case NNG_ECONNRESET: // These are remote conditions, no cool down. - nni_ep_accept_start(ep); + nni_ep_acc_start(ep); break; default: - // We don't really know why we failed, but we backoff here. - // This is because errors here are probably due to system - // failures (resource exhaustion) and we hope by not - // thrashing we give the system a chance to recover. - nni_ep_backoff_start(ep); + // We don't really know why we failed, but we backoff + // here. This is because errors here are probably due + // to system failures (resource exhaustion) and we hope + // by not thrashing we give the system a chance to + // recover. + nni_ep_tmo_start(ep); break; } nni_mtx_unlock(&ep->ep_mtx); } static void -nni_ep_accept_start(nni_ep *ep) +nni_ep_acc_start(nni_ep *ep) { nni_aio *aio = &ep->ep_acc_aio; // Call with the Endpoint lock held. - if (ep->ep_closed) { + if (ep->ep_closing) { return; } aio->a_pipe = NULL; @@ -467,7 +499,7 @@ nni_ep_listen(nni_ep *ep, int flags) nni_mtx_unlock(&ep->ep_mtx); return (NNG_ENOTSUP); } - if (ep->ep_closed) { + if (ep->ep_closing) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); } @@ -479,7 +511,7 @@ nni_ep_listen(nni_ep *ep, int flags) } ep->ep_bound = 1; - nni_ep_accept_start(ep); + nni_ep_acc_start(ep); nni_mtx_unlock(&ep->ep_mtx); return (0); @@ -489,12 +521,11 @@ int nni_ep_pipe_add(nni_ep *ep, nni_pipe *p) { nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closed) { + if (ep->ep_closing) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); } nni_list_append(&ep->ep_pipes, p); - p->p_ep = ep; nni_mtx_unlock(&ep->ep_mtx); return (0); } @@ -508,7 +539,6 @@ nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe) if (nni_list_active(&ep->ep_pipes, pipe)) { nni_list_remove(&ep->ep_pipes, pipe); } - pipe->p_ep = NULL; // Wake up the close thread if it is waiting. if (ep->ep_closed && nni_list_empty(&ep->ep_pipes)) { nni_cv_wake(&ep->ep_cv); @@ -517,9 +547,10 @@ nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe) // If this pipe closed, then lets restart the dial operation. // Since the remote side seems to have closed, lets start with // a backoff. This keeps us from pounding the crap out of the - // thing if a remote server accepts but then disconnects immediately. + // thing if a remote server accepts but then disconnects + // immediately. if ((!ep->ep_closed) && (ep->ep_mode == NNI_EP_MODE_DIAL)) { - nni_ep_backoff_start(ep); + nni_ep_tmo_start(ep); } nni_mtx_unlock(&ep->ep_mtx); } @@ -530,28 +561,14 @@ nni_ep_list_init(nni_list *list) NNI_LIST_INIT(list, nni_ep, ep_node); } -static void -nni_ep_reaper(void *notused) +nni_tran * +nni_ep_tran(nni_ep *ep) { - NNI_ARG_UNUSED(notused); - - nni_mtx_lock(&nni_ep_reap_lk); - for (;;) { - nni_ep *ep; - - if ((ep = nni_list_first(&nni_ep_reap_list)) != NULL) { - nni_list_remove(&nni_ep_reap_list, ep); - nni_mtx_unlock(&nni_ep_reap_lk); - nni_ep_reap(ep); - nni_mtx_lock(&nni_ep_reap_lk); - continue; - } - - if (!nni_ep_reap_run) { - break; - } + return (ep->ep_tran); +} - nni_cv_wait(&nni_ep_reap_cv); - } - nni_mtx_unlock(&nni_ep_reap_lk); -} \ No newline at end of file +nni_sock * +nni_ep_sock(nni_ep *ep) +{ + return (ep->ep_sock); +} diff --git a/src/core/endpt.h b/src/core/endpt.h index 5a923bd5..fbb10911 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -1,5 +1,5 @@ // -// Copyright 2016 Garrett D'Amore +// Copyright 2017 Garrett D'Amore // Copyright 2017 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -11,58 +11,30 @@ #ifndef CORE_ENDPT_H #define CORE_ENDPT_H -#include "core/defs.h" -#include "core/list.h" -#include "core/thread.h" -#include "core/transport.h" - -// NB: This structure is supplied here for use by the CORE. Use of this -// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS -// OR TRANSPORTS. -struct nni_ep { - nni_tran_ep ep_ops; // transport ops - nni_tran * ep_tran; // transport pointer - void * ep_data; // transport private - uint32_t ep_id; // endpoint id - nni_list_node ep_node; // per socket list - nni_sock * ep_sock; - char ep_addr[NNG_MAXADDRLEN]; - int ep_mode; - int ep_started; - int ep_stop; - int ep_closed; // full shutdown - int ep_bound; // true if we bound locally - int ep_refcnt; - nni_mtx ep_mtx; - nni_cv ep_cv; - nni_list ep_pipes; - nni_aio ep_acc_aio; - nni_aio ep_con_aio; - nni_aio ep_con_syn; // used for sync connect - nni_aio ep_backoff; // backoff timer - nni_duration ep_maxrtime; // maximum time for reconnect - nni_duration ep_currtime; // current time for reconnect - nni_duration ep_inirtime; // initial time for reconnect - nni_time ep_conntime; // time of last good connect - nni_list_node ep_reap_node; -}; +extern int nni_ep_sys_init(void); +extern void nni_ep_sys_fini(void); +extern nni_tran *nni_ep_tran(nni_ep *); +extern nni_sock *nni_ep_sock(nni_ep *); +extern int nni_ep_find(nni_ep **, uint32_t); +extern int nni_ep_hold(nni_ep *); +extern void nni_ep_rele(nni_ep *); +extern uint32_t nni_ep_id(nni_ep *); +extern int nni_ep_create_dialer(nni_ep **, nni_sock *, const char *); +extern int nni_ep_create_listener(nni_ep **, nni_sock *, const char *); +extern void nni_ep_stop(nni_ep *); +extern int nni_ep_shutdown(nni_ep *); +extern void nni_ep_close(nni_ep *); +extern int nni_ep_dial(nni_ep *, int); +extern int nni_ep_listen(nni_ep *, int); +extern void nni_ep_list_init(nni_list *); +extern int nni_ep_pipe_add(nni_ep *ep, nni_pipe *); +extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *); +// Endpoint modes. Currently used by transports. Remove this when we make +// transport dialers and listeners explicit. enum nni_ep_mode { NNI_EP_MODE_DIAL = 1, NNI_EP_MODE_LISTEN = 2, }; -extern int nni_ep_sys_init(void); -extern void nni_ep_sys_fini(void); -extern int nni_ep_find(nni_ep **, uint32_t); -extern uint32_t nni_ep_id(nni_ep *); -extern int nni_ep_create(nni_ep **, nni_sock *, const char *, int); -extern void nni_ep_stop(nni_ep *); -extern void nni_ep_close(nni_ep *); -extern int nni_ep_dial(nni_ep *, int); -extern int nni_ep_listen(nni_ep *, int); -extern void nni_ep_list_init(nni_list *); -extern int nni_ep_pipe_add(nni_ep *ep, nni_pipe *); -extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *); - #endif // CORE_ENDPT_H diff --git a/src/core/pipe.c b/src/core/pipe.c index 1658aabc..75c4c8d6 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -15,6 +15,24 @@ // Operations on pipes (to the transport) are generally blocking operations, // performed in the context of the protocol. +struct nni_pipe { + uint32_t p_id; + nni_tran_pipe p_tran_ops; + void * p_tran_data; + void * p_proto_data; + nni_list_node p_sock_node; + nni_list_node p_ep_node; + nni_sock * p_sock; + nni_ep * p_ep; + int p_reap; + int p_stop; + int p_refcnt; + nni_mtx p_mtx; + nni_cv p_cv; + nni_list_node p_reap_node; + nni_aio p_start_aio; +}; + static nni_idhash *nni_pipes; static nni_list nni_pipe_reap_list; @@ -78,7 +96,10 @@ nni_pipe_destroy(nni_pipe *p) if (p == NULL) { return; } - NNI_ASSERT(p->p_refcnt != 0xDEAD); + + // Stop any pending negotiation. + nni_aio_stop(&p->p_start_aio); + // Make sure any unlocked holders are done with this. // This happens during initialization for example. nni_mtx_lock(&p->p_mtx); @@ -86,14 +107,17 @@ nni_pipe_destroy(nni_pipe *p) nni_cv_wait(&p->p_cv); } nni_mtx_unlock(&p->p_mtx); - p->p_refcnt = 0xDEAD; - nni_aio_stop(&p->p_start_aio); + // We have exclusive access at this point, so we can check if + // we are still on any lists. + nni_aio_fini(&p->p_start_aio); - if (p->p_proto_data != NULL) { - p->p_proto_dtor(p->p_proto_data); + if (nni_list_node_active(&p->p_ep_node)) { + nni_ep_pipe_remove(p->p_ep, p); } + nni_sock_pipe_remove(p->p_sock, p); + if (p->p_tran_data != NULL) { p->p_tran_ops.p_fini(p->p_tran_data); } @@ -148,31 +172,6 @@ nni_pipe_close(nni_pipe *p) nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED); } -// Pipe reap is called on a taskq when the pipe should be closed. No -// locks are held. This routine must take care to synchronously ensure -// that no further references to the pipe are possible, then it may -// destroy the pipe. -static void -nni_pipe_reap(nni_pipe *p) -{ - // Transport close... - nni_pipe_close(p); - - nni_aio_stop(&p->p_start_aio); - - // Remove the pipe from the socket and the endpoint. Note - // that it is in theory possible for either of these to be null - // if the pipe is being torn down before it is fully initialized. - if (p->p_ep != NULL) { - nni_ep_pipe_remove(p->p_ep, p); - } - if (p->p_sock != NULL) { - nni_sock_pipe_remove(p->p_sock, p); - } - - nni_pipe_destroy(p); -} - void nni_pipe_stop(nni_pipe *p) { @@ -206,16 +205,12 @@ nni_pipe_start_cb(void *arg) nni_aio * aio = &p->p_start_aio; int rv; - nni_mtx_lock(&p->p_mtx); if ((rv = nni_aio_result(aio)) != 0) { - nni_mtx_unlock(&p->p_mtx); nni_pipe_stop(p); return; } - nni_mtx_unlock(&p->p_mtx); - - if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) { + if ((rv = nni_sock_pipe_start(p->p_sock, p)) != 0) { nni_pipe_stop(p); } } @@ -225,8 +220,8 @@ nni_pipe_create(nni_ep *ep, void *tdata) { nni_pipe *p; int rv; - nni_tran *tran = ep->ep_tran; - nni_sock *sock = ep->ep_sock; + nni_tran *tran = nni_ep_tran(ep); + nni_sock *sock = nni_ep_sock(ep); if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { // In this case we just toss the pipe... @@ -238,31 +233,24 @@ nni_pipe_create(nni_ep *ep, void *tdata) p->p_tran_ops = *tran->tran_pipe; p->p_tran_data = tdata; p->p_proto_data = NULL; + p->p_ep = ep; + p->p_sock = sock; NNI_LIST_NODE_INIT(&p->p_reap_node); - - if (((rv = nni_mtx_init(&p->p_mtx)) != 0) || - ((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0)) { - tran->tran_pipe->p_fini(p); - nni_mtx_fini(&p->p_mtx); - NNI_FREE_STRUCT(p); - return (rv); - } - NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); - if ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) { - nni_pipe_destroy(p); - return (rv); - } - - if ((rv = nni_sock_pipe_add(sock, ep, p)) != 0) { + if (((rv = nni_mtx_init(&p->p_mtx)) != 0) || + ((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0) || + ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) != + 0) || + ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) || + ((rv = nni_ep_pipe_add(ep, p)) != 0) || + ((rv = nni_sock_pipe_add(sock, p)) != 0)) { nni_pipe_destroy(p); - return (rv); } - return (0); + return (rv); } int @@ -291,6 +279,12 @@ nni_pipe_get_proto_data(nni_pipe *p) return (p->p_proto_data); } +void +nni_pipe_set_proto_data(nni_pipe *p, void *data) +{ + p->p_proto_data = data; +} + void nni_pipe_sock_list_init(nni_list *list) { @@ -318,18 +312,6 @@ nni_pipe_reaper(void *notused) // Transport close... nni_pipe_close(p); - nni_aio_stop(&p->p_start_aio); - - // Remove the pipe from the socket and the endpoint. - // Note that it is in theory possible for either of - // these to be null if the pipe is being torn down - // before it is fully initialized. - if (p->p_ep != NULL) { - nni_ep_pipe_remove(p->p_ep, p); - } - if (p->p_sock != NULL) { - nni_sock_pipe_remove(p->p_sock, p); - } nni_pipe_destroy(p); nni_mtx_lock(&nni_pipe_reap_lk); continue; diff --git a/src/core/pipe.h b/src/core/pipe.h index 40871062..9436d650 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -19,25 +19,6 @@ #include "core/thread.h" #include "core/transport.h" -struct nni_pipe { - uint32_t p_id; - nni_tran_pipe p_tran_ops; - void * p_tran_data; - void * p_proto_data; - nni_cb p_proto_dtor; - nni_list_node p_sock_node; - nni_list_node p_ep_node; - nni_sock * p_sock; - nni_ep * p_ep; - int p_reap; - int p_stop; - int p_refcnt; - nni_mtx p_mtx; - nni_cv p_cv; - nni_list_node p_reap_node; - nni_aio p_start_aio; -}; - extern int nni_pipe_sys_init(void); extern void nni_pipe_sys_fini(void); @@ -81,6 +62,7 @@ extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep); // nni_pipe_get_proto_data gets the protocol private data set with the // nni_pipe_set_proto_data function. No locking is performed. extern void *nni_pipe_get_proto_data(nni_pipe *); +extern void nni_pipe_set_proto_data(nni_pipe *, void *); // nni_pipe_sock_list_init initializes a list of pipes, to be used by // a per-socket list. diff --git a/src/core/socket.c b/src/core/socket.c index 9471e56e..e01791a3 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -64,132 +64,63 @@ nni_sock_find(nni_sock **sockp, uint32_t id) void nni_sock_rele(nni_sock *s) { - nni_mtx_lock(&s->s_mx); + nni_mtx_lock(&nni_sock_lk); s->s_refcnt--; - if (s->s_closing) { - nni_cv_wake(&s->s_cv); + if (s->s_closed && (s->s_refcnt < 2)) { + nni_cv_wake(&s->s_close_cv); } - nni_mtx_unlock(&s->s_mx); + nni_mtx_unlock(&nni_sock_lk); } -static int -nni_sock_pipe_start(nni_pipe *pipe) +int +nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe) { - nni_sock *s = pipe->p_sock; - void * pdata = nni_pipe_get_proto_data(pipe); - int rv; + void *pdata = nni_pipe_get_proto_data(pipe); + int rv; NNI_ASSERT(s != NULL); + nni_mtx_lock(&s->s_mx); if (s->s_closing) { // We're closing, bail out. - return (NNG_ECLOSED); - } - if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) { + rv = NNG_ECLOSED; + } else if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) { // Peer protocol mismatch. - return (NNG_EPROTO); - } - if ((rv = s->s_pipe_ops.pipe_start(pdata)) != 0) { - // Protocol rejection for other reasons. - // E.g. pair and already have active connected partner. - return (rv); + rv = NNG_EPROTO; + } else { + // Protocol can reject for other reasons. + rv = s->s_pipe_ops.pipe_start(pdata); } + nni_mtx_unlock(&s->s_mx); return (0); } -static void -nni_sock_pipe_start_cb(void *arg) +int +nni_sock_pipe_add(nni_sock *s, nni_pipe *p) { - nni_pipe *pipe = arg; - nni_aio * aio = &pipe->p_start_aio; + int rv; + void *pdata; - if (nni_aio_result(aio) != 0) { - // Failed I/O during start, abort everything. - nni_pipe_stop(pipe); - return; - } - if (nni_sock_pipe_start(pipe) != 0) { - nni_pipe_stop(pipe); - return; + if ((rv = s->s_pipe_ops.pipe_init(&pdata, p, s->s_data)) != 0) { + return (rv); } -} - -int -nni_sock_pipe_add(nni_sock *s, nni_ep *ep, nni_pipe *pipe) -{ - int rv; + nni_pipe_set_proto_data(p, pdata); // Initialize protocol pipe data. nni_mtx_lock(&s->s_mx); - nni_mtx_lock(&ep->ep_mtx); - - if ((s->s_closing) || (ep->ep_closed)) { - nni_mtx_unlock(&ep->ep_mtx); + if (s->s_closing) { nni_mtx_unlock(&s->s_mx); return (NNG_ECLOSED); } - rv = nni_aio_init(&pipe->p_start_aio, nni_sock_pipe_start_cb, pipe); - if (rv != 0) { - nni_mtx_unlock(&ep->ep_mtx); - nni_mtx_unlock(&s->s_mx); - return (rv); - } - rv = s->s_pipe_ops.pipe_init(&pipe->p_proto_data, pipe, s->s_data); - if (rv != 0) { - nni_mtx_unlock(&ep->ep_mtx); - nni_mtx_lock(&s->s_mx); - return (rv); - } - // Save the protocol destructor. - pipe->p_proto_dtor = s->s_pipe_ops.pipe_fini; - pipe->p_sock = s; - pipe->p_ep = ep; - - nni_list_append(&s->s_pipes, pipe); - nni_list_append(&ep->ep_pipes, pipe); + nni_list_append(&s->s_pipes, p); // Start the initial negotiation I/O... - if (pipe->p_tran_ops.p_start == NULL) { - if (nni_sock_pipe_start(pipe) != 0) { - nni_pipe_stop(pipe); - } - } else { - pipe->p_tran_ops.p_start( - pipe->p_tran_data, &pipe->p_start_aio); - } + nni_pipe_start(p); - nni_mtx_unlock(&ep->ep_mtx); nni_mtx_unlock(&s->s_mx); return (0); } -int -nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe) -{ - int rv; - void *pdata = nni_pipe_get_proto_data(pipe); - - nni_mtx_lock(&sock->s_mx); - - if (sock->s_closing) { - nni_mtx_unlock(&sock->s_mx); - return (NNG_ECLOSED); - } - if (nni_pipe_peer(pipe) != sock->s_peer_id.p_id) { - nni_mtx_unlock(&sock->s_mx); - return (NNG_EPROTO); - } - - if ((rv = sock->s_pipe_ops.pipe_start(pdata)) != 0) { - nni_mtx_unlock(&sock->s_mx); - return (rv); - } - - nni_mtx_unlock(&sock->s_mx); - - return (0); -} - void nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) { @@ -197,23 +128,20 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) pdata = nni_pipe_get_proto_data(pipe); - // Stop any pending negotiation. - nni_aio_stop(&pipe->p_start_aio); - - nni_mtx_lock(&sock->s_mx); - if ((sock->s_pipe_ops.pipe_stop == NULL) || (pdata == NULL)) { - nni_mtx_unlock(&sock->s_mx); - return; - } - - sock->s_pipe_ops.pipe_stop(pdata); - if (nni_list_active(&sock->s_pipes, pipe)) { - nni_list_remove(&sock->s_pipes, pipe); - if (sock->s_closing && nni_list_empty(&sock->s_pipes)) { - nni_cv_wake(&sock->s_cv); + if (pdata != NULL) { + nni_mtx_lock(&sock->s_mx); + sock->s_pipe_ops.pipe_stop(pdata); + if (nni_list_active(&sock->s_pipes, pipe)) { + nni_list_remove(&sock->s_pipes, pipe); + if (sock->s_closing && + nni_list_empty(&sock->s_pipes)) { + nni_cv_wake(&sock->s_cv); + } } + sock->s_pipe_ops.pipe_fini(pdata); + nni_pipe_set_proto_data(pipe, NULL); + nni_mtx_unlock(&sock->s_mx); } - nni_mtx_unlock(&sock->s_mx); } void @@ -329,6 +257,7 @@ nni_sock_destroy(nni_sock *s) nni_ev_fini(&s->s_recv_ev); nni_msgq_fini(s->s_urq); nni_msgq_fini(s->s_uwq); + nni_cv_fini(&s->s_close_cv); nni_cv_fini(&s->s_cv); nni_mtx_fini(&s->s_mx); NNI_FREE_STRUCT(s); @@ -372,6 +301,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) if (((rv = nni_mtx_init(&s->s_mx)) != 0) || ((rv = nni_cv_init(&s->s_cv, &s->s_mx)) != 0) || + ((rv = nni_cv_init(&s->s_close_cv, &nni_sock_lk)) != 0) || ((rv = nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s)) != 0) || ((rv = nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s)) != 0) || ((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) || @@ -446,6 +376,7 @@ nni_sock_shutdown(nni_sock *sock) { nni_pipe *pipe; nni_ep * ep; + nni_ep * nep; nni_time linger; nni_mtx_lock(&sock->s_mx); @@ -468,7 +399,7 @@ nni_sock_shutdown(nni_sock *sock) // Close the EPs. This prevents new connections from forming but // but allows existing ones to drain. NNI_LIST_FOREACH (&sock->s_eps, ep) { - nni_ep_close(ep); + nni_ep_shutdown(ep); } nni_mtx_unlock(&sock->s_mx); @@ -498,10 +429,20 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // For each ep, arrange for it to teardown hard. - NNI_LIST_FOREACH (&sock->s_eps, ep) { - nni_ep_stop(ep); + // Go through the endpoint list, attempting to close them. + // We might already have a close in progress, in which case + // we skip past it; it will be removed from another thread. + nep = nni_list_first(&sock->s_eps); + while ((ep = nep) != NULL) { + nep = nni_list_next(&sock->s_eps, nep); + + if (nni_ep_hold(ep) == 0) { + nni_mtx_unlock(&sock->s_mx); + nni_ep_close(ep); + nni_mtx_lock(&sock->s_mx); + } } + // For each pipe, arrange for it to teardown hard. NNI_LIST_FOREACH (&sock->s_pipes, pipe) { nni_pipe_stop(pipe); @@ -527,38 +468,6 @@ nni_sock_shutdown(nni_sock *sock) return (0); } -void -nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) -{ - nni_pipe *pipe; - // If we're not on the list, then nothing to do. Be idempotent. - // Note that if the ep is not on a list, then we assume that we have - // exclusive access. Therefore the check for being active need not - // be locked. - if (!nni_list_node_active(&ep->ep_node)) { - return; - } - - // This is done under the endpoints lock, although the remove - // is done under that as well, we also make sure that we hold - // the socket lock in the remove step. - nni_mtx_lock(&ep->ep_mtx); - NNI_LIST_FOREACH (&ep->ep_pipes, pipe) { - nni_pipe_stop(pipe); - } - while (!nni_list_empty(&ep->ep_pipes)) { - nni_cv_wait(&ep->ep_cv); - } - nni_mtx_unlock(&ep->ep_mtx); - - nni_mtx_lock(&sock->s_mx); - nni_list_remove(&sock->s_eps, ep); - if ((sock->s_closing) && (nni_list_empty(&sock->s_eps))) { - nni_cv_wake(&sock->s_cv); - } - nni_mtx_unlock(&sock->s_mx); -} - // nni_sock_close shuts down the socket, then releases any resources // associated with it. It is a programmer error to reference the socket // after this function is called, as the pointer may reference invalid @@ -585,13 +494,17 @@ nni_sock_close(nni_sock *s) // nni_sock_closeall. This is idempotent. nni_list_node_remove(&s->s_node); - nni_mtx_unlock(&nni_sock_lk); - // Wait for all other references to drop. Note that we // have a reference already (from our caller). + while (s->s_refcnt > 1) { + nni_cv_wait(&s->s_close_cv); + } + nni_mtx_unlock(&nni_sock_lk); + + // Wait for pipe and eps to finish closing. nni_mtx_lock(&s->s_mx); - while ((s->s_refcnt > 1) || (!nni_list_empty(&s->s_pipes)) || - (!nni_list_empty(&s->s_eps))) { + while ( + (!nni_list_empty(&s->s_pipes)) || (!nni_list_empty(&s->s_eps))) { nni_cv_wait(&s->s_cv); } nni_mtx_unlock(&s->s_mx); @@ -742,77 +655,29 @@ nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax) } int -nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags) +nni_sock_ep_add(nni_sock *sock, nni_ep *ep) { - nni_ep *ep; - int rv; - nni_mtx_lock(&sock->s_mx); - if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_DIAL)) != 0) { + if (sock->s_closing) { nni_mtx_unlock(&sock->s_mx); - return (rv); + return (NNG_ECLOSED); } - nni_mtx_lock(&ep->ep_mtx); nni_list_append(&sock->s_eps, ep); - // Put a hold on the endpoint, for now. - ep->ep_refcnt++; - ep->ep_started = 1; - nni_mtx_unlock(&ep->ep_mtx); nni_mtx_unlock(&sock->s_mx); - - if ((rv = nni_ep_dial(ep, flags)) != 0) { - nni_ep_stop(ep); - } else if (epp != NULL) { - *epp = ep; - } - - // Drop our endpoint hold. - nni_mtx_lock(&ep->ep_mtx); - if (rv != 0) { - ep->ep_started = 0; - } - ep->ep_refcnt--; - nni_cv_wake(&ep->ep_cv); - nni_mtx_unlock(&ep->ep_mtx); - - return (rv); + return (0); } -int -nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags) +void +nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) { - nni_ep *ep; - int rv; - nni_mtx_lock(&sock->s_mx); - if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_LISTEN)) != 0) { - nni_mtx_unlock(&sock->s_mx); - return (rv); + if (nni_list_active(&sock->s_eps, ep)) { + nni_list_remove(&sock->s_eps, ep); + if ((sock->s_closed) && (nni_list_empty(&sock->s_eps))) { + nni_cv_wake(&sock->s_cv); + } } - - nni_list_append(&sock->s_eps, ep); - nni_mtx_lock(&ep->ep_mtx); - ep->ep_refcnt++; - ep->ep_started = 1; - nni_mtx_unlock(&ep->ep_mtx); nni_mtx_unlock(&sock->s_mx); - - if ((rv = nni_ep_listen(ep, flags)) != 0) { - nni_ep_stop(ep); - } else if (epp != NULL) { - *epp = ep; - } - - // Drop our endpoint hold. - nni_mtx_lock(&ep->ep_mtx); - if (rv != 0) { - ep->ep_started = 0; - } - ep->ep_refcnt--; - nni_cv_wake(&ep->ep_cv); - nni_mtx_unlock(&ep->ep_mtx); - - return (rv); } void diff --git a/src/core/socket.h b/src/core/socket.h index 76ebca12..94dd816d 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -74,8 +74,6 @@ extern int nni_sock_setopt(nni_sock *, int, const void *, size_t); extern int nni_sock_getopt(nni_sock *, int, void *, size_t *); extern int nni_sock_recvmsg(nni_sock *, nni_msg **, nni_time); extern int nni_sock_sendmsg(nni_sock *, nni_msg *, nni_time); -extern int nni_sock_dial(nni_sock *, const char *, nni_ep **, int); -extern int nni_sock_listen(nni_sock *, const char *, nni_ep **, int); extern uint32_t nni_sock_id(nni_sock *); extern void nni_sock_lock(nni_sock *); @@ -84,22 +82,17 @@ extern void nni_sock_unlock(nni_sock *); extern nni_notify *nni_sock_notify(nni_sock *, int, nng_notify_func, void *); extern void nni_sock_unnotify(nni_sock *, nni_notify *); -extern void nni_sock_ep_remove(nni_sock *, nni_ep *); - // nni_sock_pipe_add adds the pipe to the socket. It is called by // the generic pipe creation code. It also adds the socket to the // ep list, and starts the pipe. It does all these to ensure that // we have complete success or failure, and there is no point where // a pipe could wind up orphaned. -extern int nni_sock_pipe_add(nni_sock *, nni_ep *, nni_pipe *); - +extern int nni_sock_pipe_add(nni_sock *, nni_pipe *); extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *); +extern int nni_sock_pipe_start(nni_sock *, nni_pipe *p); -// nni_sock_pipe_ready lets the socket know the pipe is ready for -// business. This also calls the socket/protocol specific add function, -// and it may return an error. The reference count should be dropped by -// nni_sock_pipe_closed. -extern int nni_sock_pipe_ready(nni_sock *, nni_pipe *); +extern int nni_sock_ep_add(nni_sock *, nni_ep *); +extern void nni_sock_ep_remove(nni_sock *, nni_ep *); // Set error codes for applications. These are only ever // called from the filter functions in protocols, and thus -- cgit v1.2.3-70-g09d2