diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-12 12:24:54 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-08-14 13:43:02 -0700 |
| commit | 343417234aa3fd86e8ae0b56ae500a1ed3411cfc (patch) | |
| tree | 728992cfe8c2987d5939026a1f734dcc58b3df18 /src/core/socket.c | |
| parent | 4fb81f024e5f32a186cd5538574f8e5796980e36 (diff) | |
| download | nng-343417234aa3fd86e8ae0b56ae500a1ed3411cfc.tar.gz nng-343417234aa3fd86e8ae0b56ae500a1ed3411cfc.tar.bz2 nng-343417234aa3fd86e8ae0b56ae500a1ed3411cfc.zip | |
fixes #62 Endpoint close should be synchronous #62
fixes #66 Make pipe and endpoint structures private
This changes a number of things, refactoring endpoints and supporting
code to keep their internals private, and making endpoint close
synchronous. This will allow us to add a consumer facing API for
nng_ep_close(), as well as property APIs, etc.
While here a bunch of convoluted and dead code was cleaned up.
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 283 |
1 files changed, 74 insertions, 209 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 9471e56e..e01791a3 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -64,132 +64,63 @@ nni_sock_find(nni_sock **sockp, uint32_t id) void nni_sock_rele(nni_sock *s) { - nni_mtx_lock(&s->s_mx); + nni_mtx_lock(&nni_sock_lk); s->s_refcnt--; - if (s->s_closing) { - nni_cv_wake(&s->s_cv); + if (s->s_closed && (s->s_refcnt < 2)) { + nni_cv_wake(&s->s_close_cv); } - nni_mtx_unlock(&s->s_mx); + nni_mtx_unlock(&nni_sock_lk); } -static int -nni_sock_pipe_start(nni_pipe *pipe) +int +nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe) { - nni_sock *s = pipe->p_sock; - void * pdata = nni_pipe_get_proto_data(pipe); - int rv; + void *pdata = nni_pipe_get_proto_data(pipe); + int rv; NNI_ASSERT(s != NULL); + nni_mtx_lock(&s->s_mx); if (s->s_closing) { // We're closing, bail out. - return (NNG_ECLOSED); - } - if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) { + rv = NNG_ECLOSED; + } else if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) { // Peer protocol mismatch. - return (NNG_EPROTO); - } - if ((rv = s->s_pipe_ops.pipe_start(pdata)) != 0) { - // Protocol rejection for other reasons. - // E.g. pair and already have active connected partner. - return (rv); + rv = NNG_EPROTO; + } else { + // Protocol can reject for other reasons. + rv = s->s_pipe_ops.pipe_start(pdata); } + nni_mtx_unlock(&s->s_mx); return (0); } -static void -nni_sock_pipe_start_cb(void *arg) +int +nni_sock_pipe_add(nni_sock *s, nni_pipe *p) { - nni_pipe *pipe = arg; - nni_aio * aio = &pipe->p_start_aio; + int rv; + void *pdata; - if (nni_aio_result(aio) != 0) { - // Failed I/O during start, abort everything. - nni_pipe_stop(pipe); - return; - } - if (nni_sock_pipe_start(pipe) != 0) { - nni_pipe_stop(pipe); - return; + if ((rv = s->s_pipe_ops.pipe_init(&pdata, p, s->s_data)) != 0) { + return (rv); } -} - -int -nni_sock_pipe_add(nni_sock *s, nni_ep *ep, nni_pipe *pipe) -{ - int rv; + nni_pipe_set_proto_data(p, pdata); // Initialize protocol pipe data. nni_mtx_lock(&s->s_mx); - nni_mtx_lock(&ep->ep_mtx); - - if ((s->s_closing) || (ep->ep_closed)) { - nni_mtx_unlock(&ep->ep_mtx); + if (s->s_closing) { nni_mtx_unlock(&s->s_mx); return (NNG_ECLOSED); } - rv = nni_aio_init(&pipe->p_start_aio, nni_sock_pipe_start_cb, pipe); - if (rv != 0) { - nni_mtx_unlock(&ep->ep_mtx); - nni_mtx_unlock(&s->s_mx); - return (rv); - } - rv = s->s_pipe_ops.pipe_init(&pipe->p_proto_data, pipe, s->s_data); - if (rv != 0) { - nni_mtx_unlock(&ep->ep_mtx); - nni_mtx_lock(&s->s_mx); - return (rv); - } - // Save the protocol destructor. - pipe->p_proto_dtor = s->s_pipe_ops.pipe_fini; - pipe->p_sock = s; - pipe->p_ep = ep; - - nni_list_append(&s->s_pipes, pipe); - nni_list_append(&ep->ep_pipes, pipe); + nni_list_append(&s->s_pipes, p); // Start the initial negotiation I/O... - if (pipe->p_tran_ops.p_start == NULL) { - if (nni_sock_pipe_start(pipe) != 0) { - nni_pipe_stop(pipe); - } - } else { - pipe->p_tran_ops.p_start( - pipe->p_tran_data, &pipe->p_start_aio); - } + nni_pipe_start(p); - nni_mtx_unlock(&ep->ep_mtx); nni_mtx_unlock(&s->s_mx); return (0); } -int -nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe) -{ - int rv; - void *pdata = nni_pipe_get_proto_data(pipe); - - nni_mtx_lock(&sock->s_mx); - - if (sock->s_closing) { - nni_mtx_unlock(&sock->s_mx); - return (NNG_ECLOSED); - } - if (nni_pipe_peer(pipe) != sock->s_peer_id.p_id) { - nni_mtx_unlock(&sock->s_mx); - return (NNG_EPROTO); - } - - if ((rv = sock->s_pipe_ops.pipe_start(pdata)) != 0) { - nni_mtx_unlock(&sock->s_mx); - return (rv); - } - - nni_mtx_unlock(&sock->s_mx); - - return (0); -} - void nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) { @@ -197,23 +128,20 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) pdata = nni_pipe_get_proto_data(pipe); - // Stop any pending negotiation. - nni_aio_stop(&pipe->p_start_aio); - - nni_mtx_lock(&sock->s_mx); - if ((sock->s_pipe_ops.pipe_stop == NULL) || (pdata == NULL)) { - nni_mtx_unlock(&sock->s_mx); - return; - } - - sock->s_pipe_ops.pipe_stop(pdata); - if (nni_list_active(&sock->s_pipes, pipe)) { - nni_list_remove(&sock->s_pipes, pipe); - if (sock->s_closing && nni_list_empty(&sock->s_pipes)) { - nni_cv_wake(&sock->s_cv); + if (pdata != NULL) { + nni_mtx_lock(&sock->s_mx); + sock->s_pipe_ops.pipe_stop(pdata); + if (nni_list_active(&sock->s_pipes, pipe)) { + nni_list_remove(&sock->s_pipes, pipe); + if (sock->s_closing && + nni_list_empty(&sock->s_pipes)) { + nni_cv_wake(&sock->s_cv); + } } + sock->s_pipe_ops.pipe_fini(pdata); + nni_pipe_set_proto_data(pipe, NULL); + nni_mtx_unlock(&sock->s_mx); } - nni_mtx_unlock(&sock->s_mx); } void @@ -329,6 +257,7 @@ nni_sock_destroy(nni_sock *s) nni_ev_fini(&s->s_recv_ev); nni_msgq_fini(s->s_urq); nni_msgq_fini(s->s_uwq); + nni_cv_fini(&s->s_close_cv); nni_cv_fini(&s->s_cv); nni_mtx_fini(&s->s_mx); NNI_FREE_STRUCT(s); @@ -372,6 +301,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) if (((rv = nni_mtx_init(&s->s_mx)) != 0) || ((rv = nni_cv_init(&s->s_cv, &s->s_mx)) != 0) || + ((rv = nni_cv_init(&s->s_close_cv, &nni_sock_lk)) != 0) || ((rv = nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s)) != 0) || ((rv = nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s)) != 0) || ((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) || @@ -446,6 +376,7 @@ nni_sock_shutdown(nni_sock *sock) { nni_pipe *pipe; nni_ep * ep; + nni_ep * nep; nni_time linger; nni_mtx_lock(&sock->s_mx); @@ -468,7 +399,7 @@ nni_sock_shutdown(nni_sock *sock) // Close the EPs. This prevents new connections from forming but // but allows existing ones to drain. NNI_LIST_FOREACH (&sock->s_eps, ep) { - nni_ep_close(ep); + nni_ep_shutdown(ep); } nni_mtx_unlock(&sock->s_mx); @@ -498,10 +429,20 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // For each ep, arrange for it to teardown hard. - NNI_LIST_FOREACH (&sock->s_eps, ep) { - nni_ep_stop(ep); + // Go through the endpoint list, attempting to close them. + // We might already have a close in progress, in which case + // we skip past it; it will be removed from another thread. + nep = nni_list_first(&sock->s_eps); + while ((ep = nep) != NULL) { + nep = nni_list_next(&sock->s_eps, nep); + + if (nni_ep_hold(ep) == 0) { + nni_mtx_unlock(&sock->s_mx); + nni_ep_close(ep); + nni_mtx_lock(&sock->s_mx); + } } + // For each pipe, arrange for it to teardown hard. NNI_LIST_FOREACH (&sock->s_pipes, pipe) { nni_pipe_stop(pipe); @@ -527,38 +468,6 @@ nni_sock_shutdown(nni_sock *sock) return (0); } -void -nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) -{ - nni_pipe *pipe; - // If we're not on the list, then nothing to do. Be idempotent. - // Note that if the ep is not on a list, then we assume that we have - // exclusive access. Therefore the check for being active need not - // be locked. - if (!nni_list_node_active(&ep->ep_node)) { - return; - } - - // This is done under the endpoints lock, although the remove - // is done under that as well, we also make sure that we hold - // the socket lock in the remove step. - nni_mtx_lock(&ep->ep_mtx); - NNI_LIST_FOREACH (&ep->ep_pipes, pipe) { - nni_pipe_stop(pipe); - } - while (!nni_list_empty(&ep->ep_pipes)) { - nni_cv_wait(&ep->ep_cv); - } - nni_mtx_unlock(&ep->ep_mtx); - - nni_mtx_lock(&sock->s_mx); - nni_list_remove(&sock->s_eps, ep); - if ((sock->s_closing) && (nni_list_empty(&sock->s_eps))) { - nni_cv_wake(&sock->s_cv); - } - nni_mtx_unlock(&sock->s_mx); -} - // nni_sock_close shuts down the socket, then releases any resources // associated with it. It is a programmer error to reference the socket // after this function is called, as the pointer may reference invalid @@ -585,13 +494,17 @@ nni_sock_close(nni_sock *s) // nni_sock_closeall. This is idempotent. nni_list_node_remove(&s->s_node); - nni_mtx_unlock(&nni_sock_lk); - // Wait for all other references to drop. Note that we // have a reference already (from our caller). + while (s->s_refcnt > 1) { + nni_cv_wait(&s->s_close_cv); + } + nni_mtx_unlock(&nni_sock_lk); + + // Wait for pipe and eps to finish closing. nni_mtx_lock(&s->s_mx); - while ((s->s_refcnt > 1) || (!nni_list_empty(&s->s_pipes)) || - (!nni_list_empty(&s->s_eps))) { + while ( + (!nni_list_empty(&s->s_pipes)) || (!nni_list_empty(&s->s_eps))) { nni_cv_wait(&s->s_cv); } nni_mtx_unlock(&s->s_mx); @@ -742,77 +655,29 @@ nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax) } int -nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags) +nni_sock_ep_add(nni_sock *sock, nni_ep *ep) { - nni_ep *ep; - int rv; - nni_mtx_lock(&sock->s_mx); - if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_DIAL)) != 0) { + if (sock->s_closing) { nni_mtx_unlock(&sock->s_mx); - return (rv); + return (NNG_ECLOSED); } - nni_mtx_lock(&ep->ep_mtx); nni_list_append(&sock->s_eps, ep); - // Put a hold on the endpoint, for now. - ep->ep_refcnt++; - ep->ep_started = 1; - nni_mtx_unlock(&ep->ep_mtx); nni_mtx_unlock(&sock->s_mx); - - if ((rv = nni_ep_dial(ep, flags)) != 0) { - nni_ep_stop(ep); - } else if (epp != NULL) { - *epp = ep; - } - - // Drop our endpoint hold. - nni_mtx_lock(&ep->ep_mtx); - if (rv != 0) { - ep->ep_started = 0; - } - ep->ep_refcnt--; - nni_cv_wake(&ep->ep_cv); - nni_mtx_unlock(&ep->ep_mtx); - - return (rv); + return (0); } -int -nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags) +void +nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) { - nni_ep *ep; - int rv; - nni_mtx_lock(&sock->s_mx); - if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_LISTEN)) != 0) { - nni_mtx_unlock(&sock->s_mx); - return (rv); + if (nni_list_active(&sock->s_eps, ep)) { + nni_list_remove(&sock->s_eps, ep); + if ((sock->s_closed) && (nni_list_empty(&sock->s_eps))) { + nni_cv_wake(&sock->s_cv); + } } - - nni_list_append(&sock->s_eps, ep); - nni_mtx_lock(&ep->ep_mtx); - ep->ep_refcnt++; - ep->ep_started = 1; - nni_mtx_unlock(&ep->ep_mtx); nni_mtx_unlock(&sock->s_mx); - - if ((rv = nni_ep_listen(ep, flags)) != 0) { - nni_ep_stop(ep); - } else if (epp != NULL) { - *epp = ep; - } - - // Drop our endpoint hold. - nni_mtx_lock(&ep->ep_mtx); - if (rv != 0) { - ep->ep_started = 0; - } - ep->ep_refcnt--; - nni_cv_wake(&ep->ep_cv); - nni_mtx_unlock(&ep->ep_mtx); - - return (rv); } void |
