diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/endpt.c | 395 | ||||
| -rw-r--r-- | src/core/endpt.h | 7 | ||||
| -rw-r--r-- | src/core/pipe.c | 14 | ||||
| -rw-r--r-- | src/core/pipe.h | 2 | ||||
| -rw-r--r-- | src/core/socket.c | 87 | ||||
| -rw-r--r-- | src/core/socket.h | 2 |
6 files changed, 197 insertions, 310 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index 8eb7dd12..fef6ac83 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -16,69 +16,31 @@ // Functionality related to end points. -static nni_objhash *nni_eps = NULL; -static void * nni_ep_ctor(uint32_t); -static void nni_ep_dtor(void *); +static void nni_ep_accept_start(nni_ep *); +static void nni_ep_accept_done(void *); + +static nni_idhash *nni_eps; int nni_ep_sys_init(void) { int rv; - rv = nni_objhash_init(&nni_eps, nni_ep_ctor, nni_ep_dtor); - if (rv != 0) { + if ((rv = nni_idhash_init(&nni_eps)) != 0) { return (rv); } - return (rv); -} - -void -nni_ep_sys_fini(void) -{ - nni_objhash_fini(nni_eps); - nni_eps = NULL; -} - -int -nni_ep_find(nni_ep **epp, uint32_t id) -{ - int rv; - nni_ep *ep; - if ((rv = nni_init()) != 0) { - return (rv); - } + nni_idhash_set_limits( + nni_eps, 1, 0x7fffffff, nni_random() & 0x7fffffff); - rv = nni_objhash_find(nni_eps, id, (void **) &ep); - if (rv != 0) { - return (NNG_ECLOSED); - } - nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closed) { - nni_mtx_unlock(&ep->ep_mtx); - nni_objhash_unref(nni_eps, id); - return (NNG_ECLOSED); - } - nni_mtx_unlock(&ep->ep_mtx); - if (epp != NULL) { - *epp = ep; - } return (0); } void -nni_ep_hold(nni_ep *ep) -{ - int rv; - - rv = nni_objhash_find(nni_eps, ep->ep_id, NULL); - NNI_ASSERT(rv == 0); -} - -void -nni_ep_rele(nni_ep *ep) +nni_ep_sys_fini(void) { - nni_objhash_unref(nni_eps, ep->ep_id); + nni_idhash_fini(nni_eps); + nni_eps = NULL; } uint32_t @@ -87,47 +49,20 @@ nni_ep_id(nni_ep *ep) return (ep->ep_id); } -static void * -nni_ep_ctor(uint32_t id) -{ - nni_ep *ep; - int rv; - - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NULL); - } - ep->ep_closed = 0; - ep->ep_bound = 0; - ep->ep_pipe = NULL; - ep->ep_id = id; - ep->ep_data = NULL; - - NNI_LIST_NODE_INIT(&ep->ep_node); - - nni_pipe_ep_list_init(&ep->ep_pipes); - - if ((rv = nni_mtx_init(&ep->ep_mtx)) != 0) { - NNI_FREE_STRUCT(ep); - return (NULL); - } - - if ((rv = nni_cv_init(&ep->ep_cv, &ep->ep_mtx)) != 0) { - nni_mtx_fini(&ep->ep_mtx); - NNI_FREE_STRUCT(ep); - return (NULL); - } - - return (ep); -} - static void -nni_ep_dtor(void *ptr) +nni_ep_destroy(nni_ep *ep) { - nni_ep *ep = ptr; - + if (ep == NULL) { + return; + } + nni_aio_fini(&ep->ep_acc_aio); if (ep->ep_data != NULL) { ep->ep_ops.ep_fini(ep->ep_data); } + if (ep->ep_id != 0) { + nni_idhash_remove(nni_eps, ep->ep_id); + } + nni_thr_fini(&ep->ep_thr); nni_cv_fini(&ep->ep_cv); nni_mtx_fini(&ep->ep_mtx); NNI_FREE_STRUCT(ep); @@ -148,10 +83,31 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) return (NNG_EINVAL); } - rv = nni_objhash_alloc(nni_eps, &id, (void **) &ep); + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + ep->ep_closed = 0; + ep->ep_bound = 0; + ep->ep_pipe = NULL; + ep->ep_id = id; + ep->ep_data = NULL; + + NNI_LIST_NODE_INIT(&ep->ep_node); + + nni_pipe_ep_list_init(&ep->ep_pipes); + + if (((rv = nni_mtx_init(&ep->ep_mtx)) != 0) || + ((rv = nni_cv_init(&ep->ep_cv, &ep->ep_mtx)) != 0) || + ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0)) { + nni_ep_destroy(ep); + return (rv); + } + rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_accept_done, ep); if (rv != 0) { + nni_ep_destroy(ep); return (rv); } + ep->ep_sock = sock; ep->ep_tran = tran; ep->ep_mode = mode; @@ -165,12 +121,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) ep->ep_ops = *tran->tran_ep; if ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock, mode)) != 0) { - nni_objhash_unref(nni_eps, id); - return (rv); - } - - if ((rv = nni_sock_ep_add(sock, ep)) != 0) { - nni_objhash_unref(nni_eps, id); + nni_ep_destroy(ep); return (rv); } @@ -179,8 +130,11 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode) } void -nni_ep_stop(nni_ep *ep) +nni_ep_close(nni_ep *ep) { + // Abort any in-flight operations. + nni_aio_stop(&ep->ep_acc_aio); + nni_mtx_lock(&ep->ep_mtx); if (ep->ep_closed == 0) { ep->ep_closed = 1; @@ -190,38 +144,33 @@ nni_ep_stop(nni_ep *ep) nni_mtx_unlock(&ep->ep_mtx); } -void -nni_ep_close(nni_ep *ep) -{ - nni_ep_stop(ep); -} - -void -nni_ep_remove(nni_ep *ep) +static void +nni_ep_reap(nni_ep *ep) { nni_pipe *pipe; - nni_sock *sock = ep->ep_sock; - nni_ep_stop(ep); + nni_ep_close(ep); // Extra sanity. - nni_thr_wait(&ep->ep_thr); + // Take us off the sock list. + nni_sock_ep_remove(ep->ep_sock, ep); - nni_mtx_lock(&ep->ep_mtx); - NNI_LIST_FOREACH (&ep->ep_pipes, pipe) { - nni_pipe_close(pipe); - } - nni_mtx_unlock(&ep->ep_mtx); + nni_ep_destroy(ep); +} +void +nni_ep_stop(nni_ep *ep) +{ nni_mtx_lock(&ep->ep_mtx); - while (nni_list_first(&ep->ep_pipes) != NULL) { - nni_cv_wait(&ep->ep_cv); + + // Protection against recursion. + if (ep->ep_stop) { + nni_mtx_unlock(&ep->ep_mtx); + return; } + ep->ep_stop = 1; + nni_taskq_ent_init(&ep->ep_reap_tqe, (nni_cb) nni_ep_reap, ep); + nni_taskq_dispatch(NULL, &ep->ep_reap_tqe); nni_mtx_unlock(&ep->ep_mtx); - - nni_sock_ep_remove(sock, ep); - - nni_thr_fini(&ep->ep_thr); - nni_objhash_unref(nni_eps, ep->ep_id); } static int @@ -253,17 +202,20 @@ nni_ep_connect_sync(nni_ep *ep) nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); } - rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran); + rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); if (rv != 0) { nni_mtx_unlock(&ep->ep_mtx); return (rv); } + pipe->p_ep = ep; nni_list_append(&ep->ep_pipes, pipe); nni_mtx_unlock(&ep->ep_mtx); rv = nni_ep_connect_aio(ep, &pipe->p_tran_data); if (rv != 0) { - nni_pipe_stop(pipe); + if (rv != NNG_ECLOSED) { // HACK ALERT + nni_pipe_stop(pipe); + } return (rv); } nni_pipe_start(pipe); @@ -273,24 +225,6 @@ nni_ep_connect_sync(nni_ep *ep) return (0); } -void -nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe) -{ - if (ep == NULL) { - return; - } - nni_mtx_lock(&ep->ep_mtx); - if (nni_list_active(&ep->ep_pipes, pipe)) { - nni_list_remove(&ep->ep_pipes, pipe); - - if (ep->ep_pipe == pipe) { - ep->ep_pipe = NULL; - } - nni_cv_wake(&ep->ep_cv); - } - nni_mtx_unlock(&ep->ep_mtx); -} - // nni_dialer is the thread worker that dials in the background. static void nni_dialer(void *arg) @@ -396,131 +330,81 @@ nni_ep_dial(nni_ep *ep, int flags) return (rv); } -static int -nni_ep_accept_aio(nni_ep *ep, void **pipep) -{ - nni_aio aio; - int rv; +static void nni_ep_accept_start(nni_ep *ep); - nni_aio_init(&aio, NULL, NULL); - aio.a_endpt = ep->ep_data; - ep->ep_ops.ep_accept(ep->ep_data, &aio); - nni_aio_wait(&aio); - - if ((rv = nni_aio_result(&aio)) == 0) { - *pipep = aio.a_pipe; - } - nni_aio_fini(&aio); - return (rv); -} - -static int -nni_ep_accept_sync(nni_ep *ep) +static void +nni_ep_accept_done(void *arg) { - nni_pipe *pipe; - int rv; + nni_ep * ep = arg; + nni_aio * aio = &ep->ep_acc_aio; + void * tpipe; + nni_pipe * pipe; + int rv; + const nni_tran_pipe *ops; + + ops = ep->ep_tran->tran_pipe; nni_mtx_lock(&ep->ep_mtx); - if (ep->ep_closed) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_ECLOSED); + if ((rv = nni_aio_result(aio)) != 0) { + goto done; } - rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran); - if (rv != 0) { - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } - nni_list_append(&ep->ep_pipes, pipe); - nni_mtx_unlock(&ep->ep_mtx); + NNI_ASSERT((tpipe = aio->a_pipe) != NULL); - rv = nni_ep_accept_aio(ep, &pipe->p_tran_data); + rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); if (rv != 0) { - nni_pipe_stop(pipe); - return (rv); + ops->p_fini(tpipe); + goto done; + } + +done: + + switch (rv) { + case 0: + pipe->p_tran_ops = *ops; + pipe->p_tran_data = tpipe; + nni_pipe_start(pipe); + nni_ep_accept_start(ep); + break; + case NNG_ECLOSED: + case NNG_ECANCELED: + // Canceled or closed, no furhter action. + break; + case NNG_ECONNABORTED: + case NNG_ECONNRESET: + // These are remote conditions, no cool down. + // cooldown = 0; + nni_ep_accept_start(ep); + break; + case NNG_ENOMEM: + // We're running low on memory, so its best to wait + // a whole second to give the system a chance to + // recover memory. + // cooldown = 1000000; + nni_ep_accept_start(ep); + break; + default: + // other cases... sleep a tiny bit then try again. + // cooldown = 1000; 10msec + // Add a timeout here instead to avoid spinning. + nni_ep_accept_start(ep); + break; } - nni_pipe_start(pipe); - return (0); + nni_mtx_unlock(&ep->ep_mtx); } static void -nni_listener(void *arg) +nni_ep_accept_start(nni_ep *ep) { - nni_ep *ep = arg; - int rv; - - for (;;) { - nni_time cooldown; - nni_mtx_lock(&ep->ep_mtx); + nni_aio *aio = &ep->ep_acc_aio; - // If we didn't bind synchronously, do it now. - while (!ep->ep_bound && !ep->ep_closed) { - int rv; - - nni_mtx_unlock(&ep->ep_mtx); - rv = ep->ep_ops.ep_bind(ep->ep_data); - nni_mtx_lock(&ep->ep_mtx); - - if (rv == 0) { - ep->ep_bound = 1; - break; - } - // Invalid address? Out of memory? Who knows. - // Try again in a bit (10ms). - // XXX: PROPER BACKOFF NEEDED - cooldown = 10000; - cooldown += nni_clock(); - while (!ep->ep_closed) { - rv = nni_cv_until(&ep->ep_cv, cooldown); - if (rv == NNG_ETIMEDOUT) { - break; - } - } - } - if (ep->ep_closed) { - nni_mtx_unlock(&ep->ep_mtx); - break; - } - nni_mtx_unlock(&ep->ep_mtx); - - if ((rv = nni_ep_accept_sync(ep)) == 0) { - // Success! Loop around for the next one. - continue; - } - - switch (rv) { - case NNG_ECLOSED: - // This indicates the listening socket got closed. - // We just bail. - return; - - case NNG_ECONNABORTED: - case NNG_ECONNRESET: - // These are remote conditions, no cool down. - cooldown = 0; - break; - case NNG_ENOMEM: - // We're running low on memory, so its best to wait - // a whole second to give the system a chance to - // recover memory. - cooldown = 1000000; - break; - default: - // Other cases we sleep just a tiny bit to avoid - // burning the cpu (e.g. out of files). - cooldown = 1000; // 1 msec - break; - } - cooldown += nni_clock(); - nni_mtx_lock(&ep->ep_mtx); - while (!ep->ep_closed) { - rv = nni_cv_until(&ep->ep_cv, cooldown); - if (rv == NNG_ETIMEDOUT) { - break; - } - } - nni_mtx_unlock(&ep->ep_mtx); + // Call with the Endpoint lock held. + if (ep->ep_closed) { + return; } + + aio->a_endpt = ep->ep_data; + ep->ep_ops.ep_accept(ep->ep_data, aio); } int @@ -542,26 +426,17 @@ nni_ep_listen(nni_ep *ep, int flags) return (NNG_ECLOSED); } - if ((rv = nni_thr_init(&ep->ep_thr, nni_listener, ep)) != 0) { - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } - ep->ep_started = 1; - if (flags & NNG_FLAG_SYNCH) { + rv = ep->ep_ops.ep_bind(ep->ep_data); + if (rv != 0) { + ep->ep_started = 0; nni_mtx_unlock(&ep->ep_mtx); - rv = ep->ep_ops.ep_bind(ep->ep_data); - if (rv != 0) { - nni_thr_fini(&ep->ep_thr); - ep->ep_started = 0; - return (rv); - } - nni_mtx_lock(&ep->ep_mtx); - ep->ep_bound = 1; + return (rv); } + ep->ep_bound = 1; - nni_thr_run(&ep->ep_thr); + nni_ep_accept_start(ep); nni_mtx_unlock(&ep->ep_mtx); return (0); diff --git a/src/core/endpt.h b/src/core/endpt.h index f37ea7cc..a47586a0 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -29,12 +29,16 @@ struct nni_ep { nni_thr ep_thr; int ep_mode; int ep_started; + int ep_stop; int ep_closed; // full shutdown int ep_bound; // true if we bound locally nni_mtx ep_mtx; nni_cv ep_cv; nni_pipe * ep_pipe; // Connected pipe (dialers only) nni_list ep_pipes; + nni_aio ep_acc_aio; + nni_aio ep_con_aio; + nni_taskq_ent ep_reap_tqe; }; enum nni_ep_mode { @@ -45,13 +49,10 @@ enum nni_ep_mode { extern int nni_ep_sys_init(void); extern void nni_ep_sys_fini(void); extern int nni_ep_find(nni_ep **, uint32_t); -extern void nni_ep_hold(nni_ep *); -extern void nni_ep_rele(nni_ep *); extern uint32_t nni_ep_id(nni_ep *); extern int nni_ep_create(nni_ep **, nni_sock *, const char *, int); extern void nni_ep_stop(nni_ep *); extern void nni_ep_close(nni_ep *); -extern void nni_ep_remove(nni_ep *); extern int nni_ep_dial(nni_ep *, int); extern int nni_ep_listen(nni_ep *, int); extern void nni_ep_list_init(nni_list *); diff --git a/src/core/pipe.c b/src/core/pipe.c index 6fa9ed47..8f2099a9 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -22,8 +22,7 @@ nni_pipe_sys_init(void) { int rv; - rv = nni_idhash_init(&nni_pipes); - if (rv != 0) { + if ((rv = nni_idhash_init(&nni_pipes)) != 0) { return (rv); } @@ -102,6 +101,9 @@ nni_pipe_close(nni_pipe *p) } p->p_reap = 1; + // abort any pending negotiation/start process. + nni_aio_stop(&p->p_start_aio); + // Close the underlying transport. if (p->p_tran_data != NULL) { p->p_tran_ops.p_close(p->p_tran_data); @@ -120,9 +122,6 @@ nni_pipe_reap(nni_pipe *p) // Transport close... nni_pipe_close(p); - // Unlink the endpoint and pipe. - nni_ep_pipe_remove(p->p_ep, p); - // Tell the protocol to stop. nni_sock_pipe_stop(p->p_sock, p); @@ -140,9 +139,9 @@ nni_pipe_stop(nni_pipe *p) return; } p->p_stop = 1; - nni_mtx_unlock(&p->p_mtx); nni_taskq_ent_init(&p->p_reap_tqe, (nni_cb) nni_pipe_reap, p); nni_taskq_dispatch(NULL, &p->p_reap_tqe); + nni_mtx_unlock(&p->p_mtx); } uint16_t @@ -173,7 +172,7 @@ nni_pipe_start_cb(void *arg) } int -nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran) +nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran) { nni_pipe *p; int rv; @@ -202,7 +201,6 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran) return (rv); } p->p_sock = sock; - p->p_ep = ep; // Make a copy of the transport ops. We can override entry points // and we avoid an extra dereference on hot code paths. diff --git a/src/core/pipe.h b/src/core/pipe.h index 2f56e788..990dac9d 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -60,7 +60,7 @@ extern void nni_pipe_stop(nni_pipe *); // Used only by the socket core - as we don't wish to expose the details // of the pipe structure outside of pipe.c. -extern int nni_pipe_create(nni_pipe **, nni_ep *, nni_sock *, nni_tran *); +extern int nni_pipe_create(nni_pipe **, nni_sock *, nni_tran *); extern uint16_t nni_pipe_proto(nni_pipe *); extern uint16_t nni_pipe_peer(nni_pipe *); diff --git a/src/core/socket.c b/src/core/socket.c index 4be5a856..3a4f36e5 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -109,7 +109,8 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe) void nni_sock_pipe_stop(nni_sock *sock, nni_pipe *pipe) { - void *pdata; + void * pdata; + nni_ep *ep; pdata = nni_pipe_get_proto_data(pipe); @@ -119,10 +120,24 @@ nni_sock_pipe_stop(nni_sock *sock, nni_pipe *pipe) nni_mtx_unlock(&sock->s_mx); return; } + + // Break up the relationship between the EP and the pipe. + if ((ep = pipe->p_ep) != NULL) { + nni_mtx_lock(&ep->ep_mtx); + // During early init, the pipe might not have this set. + if (nni_list_active(&ep->ep_pipes, pipe)) { + nni_list_remove(&ep->ep_pipes, pipe); + } + pipe->p_ep = NULL; + ep->ep_pipe = NULL; // XXX: remove this soon + nni_cv_wake(&ep->ep_cv); + nni_mtx_unlock(&ep->ep_mtx); + } + sock->s_pipe_ops.pipe_stop(pdata); if (nni_list_active(&sock->s_pipes, pipe)) { nni_list_remove(&sock->s_pipes, pipe); - if (sock->s_closing) { + if (sock->s_closing && nni_list_empty(&sock->s_pipes)) { nni_cv_wake(&sock->s_cv); } } @@ -459,10 +474,10 @@ nni_sock_shutdown(nni_sock *sock) linger = nni_clock() + sock->s_linger; } - // Stop the EPs. This prevents new connections from forming but + // Close the EPs. This prevents new connections from forming but // but allows existing ones to drain. NNI_LIST_FOREACH (&sock->s_eps, ep) { - nni_ep_stop(ep); + nni_ep_close(ep); } nni_mtx_unlock(&sock->s_mx); @@ -492,15 +507,14 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // For each pipe, close the underlying transport. + // For each pipe, arrange for it to teardown hard. (Close, etc.). NNI_LIST_FOREACH (&sock->s_pipes, pipe) { nni_pipe_stop(pipe); } - // For each ep, close it; this will also tell it to force any - // of its pipes to close. + // For each ep, arrange for it to teardown hard. NNI_LIST_FOREACH (&sock->s_eps, ep) { - nni_ep_close(ep); + nni_ep_stop(ep); } // Wait for the pipes to be reaped (there should not be any because @@ -511,14 +525,7 @@ nni_sock_shutdown(nni_sock *sock) // Wait for the eps to be reaped. while ((ep = nni_list_first(&sock->s_eps)) != NULL) { - nni_list_remove(&sock->s_eps, ep); - - // This has to be done without the lock held, as the remove - // operation requires shutting down a thread which might be - // trying to acquire the socket lock. - nni_mtx_unlock(&sock->s_mx); - nni_ep_remove(ep); - nni_mtx_lock(&sock->s_mx); + nni_cv_wait(&sock->s_cv); } sock->s_sock_ops.sock_close(sock->s_data); @@ -535,28 +542,10 @@ nni_sock_shutdown(nni_sock *sock) return (0); } -// nni_sock_ep_add adds a newly created endpoint to the socket. The -// caller must hold references on the sock and the ep, and not be holding -// the socket lock. The ep acquires a reference against the sock, -// which will be dropped later by nni_sock_rem_ep. The endpoint must not -// already be associated with a socket. (Note, the ep holds the reference -// on the socket, not the other way around.) -int -nni_sock_ep_add(nni_sock *sock, nni_ep *ep) -{ - nni_mtx_lock(&sock->s_mx); - if (sock->s_closing) { - nni_mtx_unlock(&sock->s_mx); - return (NNG_ECLOSED); - } - nni_list_append(&sock->s_eps, ep); - nni_mtx_unlock(&sock->s_mx); - return (0); -} - void nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) { + nni_pipe *pipe; // If we're not on the list, then nothing to do. Be idempotent. // Note that if the ep is not on a list, then we assume that we have // exclusive access. Therefore the check for being active need not @@ -564,8 +553,24 @@ nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) if ((sock == NULL) || (!nni_list_active(&sock->s_eps, ep))) { return; } + + // This is done under the endpoints lock, although the remove + // is done under that as well, we also make sure that we hold + // the socket lock in the remove step. + nni_mtx_lock(&ep->ep_mtx); + NNI_LIST_FOREACH (&ep->ep_pipes, pipe) { + nni_pipe_stop(pipe); + } + while (!nni_list_empty(&ep->ep_pipes)) { + nni_cv_wait(&ep->ep_cv); + } + nni_mtx_unlock(&ep->ep_mtx); + nni_mtx_lock(&sock->s_mx); nni_list_remove(&sock->s_eps, ep); + if ((sock->s_closing) && (nni_list_empty(&sock->s_eps))) { + nni_cv_wake(&sock->s_cv); + } nni_mtx_unlock(&sock->s_mx); } @@ -727,12 +732,16 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags) nni_ep *ep; int rv; + nni_mtx_lock(&sock->s_mx); if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_DIAL)) != 0) { + nni_mtx_unlock(&sock->s_mx); return (rv); } + nni_list_append(&sock->s_eps, ep); + nni_mtx_unlock(&sock->s_mx); if ((rv = nni_ep_dial(ep, flags)) != 0) { - nni_ep_remove(ep); + nni_ep_stop(ep); } else if (epp != NULL) { *epp = ep; } @@ -746,12 +755,16 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags) nni_ep *ep; int rv; + nni_mtx_lock(&sock->s_mx); if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_LISTEN)) != 0) { + nni_mtx_unlock(&sock->s_mx); return (rv); } + nni_list_append(&sock->s_eps, ep); + nni_mtx_unlock(&sock->s_mx); if ((rv = nni_ep_listen(ep, flags)) != 0) { - nni_ep_remove(ep); + nni_ep_stop(ep); } else if (epp != NULL) { *epp = ep; } diff --git a/src/core/socket.h b/src/core/socket.h index 76b57f09..2dc06009 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -84,7 +85,6 @@ extern void nni_sock_unlock(nni_sock *); extern nni_notify *nni_sock_notify(nni_sock *, int, nng_notify_func, void *); extern void nni_sock_unnotify(nni_sock *, nni_notify *); -extern int nni_sock_ep_add(nni_sock *, nni_ep *); extern void nni_sock_ep_remove(nni_sock *, nni_ep *); extern void nni_sock_pipe_stop(nni_sock *, nni_pipe *); |
