diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-22 01:05:43 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2016-12-22 01:05:43 -0800 |
| commit | b92672e20420683e73bfc017956ac6ef2b6b793b (patch) | |
| tree | fb110918430e41a3751ea63801f8acb7c21b7db9 /src/core/socket.c | |
| parent | 0283e8bbef80d42fda1cd9b21e4d14673c3641b8 (diff) | |
| download | nng-b92672e20420683e73bfc017956ac6ef2b6b793b.tar.gz nng-b92672e20420683e73bfc017956ac6ef2b6b793b.tar.bz2 nng-b92672e20420683e73bfc017956ac6ef2b6b793b.zip | |
Logic for socket shutdown, cleanup, and draining figured out.
There's work to do still, but I've left clear indications of the
design in comments. Some ugly mysteries are now solved.
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 165 |
1 files changed, 101 insertions, 64 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index c7de05fa..21b1b300 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -1,22 +1,18 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #include "core/nng_impl.h" -/* - * Socket implementation. - */ +// Socket implementation. -/* - * nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain - * the upper read and write queues. - */ +// nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain +// the upper read and write queues. nni_msgqueue_t nni_socket_sendq(nni_socket_t s) { @@ -31,6 +27,7 @@ nni_socket_recvq(nni_socket_t s) } +// nn_socket_create creates the underlying socket. int nni_socket_create(nni_socket_t *sockp, uint16_t proto) { @@ -46,10 +43,22 @@ nni_socket_create(nni_socket_t *sockp, uint16_t proto) } sock->s_ops = *ops; + if ((rv = nni_mutex_create(&sock->s_mx)) != 0) { + nni_free(sock, sizeof (*sock)); + return (rv); + } + if ((rv = nni_cond_create(&sock->s_cv, sock->s_mx)) != 0) { + nni_mutex_destroy(sock->s_mx); + nni_free(sock, sizeof (*sock)); + return (rv); + } + NNI_LIST_INIT(&sock->s_pipes, struct nng_pipe, p_sock_node); - //NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node); + // TODO: NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node); if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) { + nni_cond_destroy(sock->s_cv); + nni_mutex_destroy(sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } @@ -58,51 +67,74 @@ nni_socket_create(nni_socket_t *sockp, uint16_t proto) } +// nni_socket_close closes the underlying socket. int nni_socket_close(nni_socket_t sock) { nni_pipe_t pipe; nni_endpt_t ep; - nni_msgqueue_close(sock->s_urq); - /* XXX: drain this? */ - nni_msgqueue_close(sock->s_uwq); nni_mutex_enter(sock->s_mx); + // Mark us closing, so no more EPs or changes can occur. + sock->s_closing = 1; + + // Stop all EPS. We're going to do this first, since we know + // we're closing. NNI_LIST_FOREACH (&sock->s_eps, ep) { #if 0 - nni_ep_stop(ep); - // OR.... - nni_mutex_enter(ep->ep_mx); - ep->ep_stop = 1; - nni_cond_broadcast(ep->ep_cond); - nni_mutex_exit(ep->ep_mx); + // XXX: Question: block for them now, or wait further down? + // Probably we can simply just watch for the first list... + // stopping EPs should be *dead* easy, and never block. + nni_ep_stop(ep); or nni_ep_shutdown(ep); #endif break; /* REMOVE ME */ } nni_mutex_exit(sock->s_mx); - /* XXX: close endpoints - no new pipes made... */ - - /* XXX: protocol shutdown */ - /* - * Paths to pipe close: - * - * - user calls nng_pipe_close() - * - protocol calls pipe_close() after underlying close - * - socket calls pipe close due to socket_close (here) - */ + // XXX: TODO. This is a place where we should drain the write side + // msgqueue, effectively getting a linger on the socket. The + // protocols will drain this queue, and should continue to run + // handling both transmit and receive until that's done. + // Note that *all* protocols need to monitor for this linger, even + // those that do not transmit. This way they can notice and go ahead + // and quickly shutdown their pipes; this keeps us from having to wait + // for *our* pipelist to become empty. This helps us ensure that + // they effectively get the chance to linger on their side, without + // requring us to do anything magical for them. + + // nni_msgqueue_drain(sock->s_uwq, sock->s_linger); + + // Now we should attempt to wait for the list of pipes to drop to + // zero -- indicating that the protocol has shut things down + // cleanly, voluntarily. (I.e. it finished its drain.) + nni_mutex_enter(sock->s_mx); + while (nni_list_first(&sock->s_pipes) != NULL) { + // rv = nn_cond_timedwait(sock->s_cv, sock->s_linger); + int rv = NNG_ETIMEDOUT; + if (rv == NNG_ETIMEDOUT) { + break; + } + } - /* XXX: close remaining pipes */ - while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { - nni_list_remove(&sock->s_pipes, pipe); - /* XXX: nni_pipe_destroy */ + // Time's up! Shut it down the hard way. + nni_msgqueue_close(sock->s_urq); + nni_msgqueue_close(sock->s_uwq); + // This gives the protocol notice, in case it didn't get the hint. + // XXX: In retrospect, this entry point seems redundant. + sock->s_ops.proto_shutdown(sock->s_data); + + // The protocol *MUST* give us back our pipes at this point, and + // quickly too! If this blocks for any non-trivial amount of time + // here, it indicates a protocol implementation bug. + while (nni_list_first(&sock->s_pipes) != NULL) { + nni_cond_wait(sock->s_cv); } - /* XXX: wait for workers to cease activity */ - while ((ep = nni_list_first(&sock->s_eps)) != NULL) { - nni_list_remove(&sock->s_eps, ep); - /* XXX: nni_ep_destroy(ep); */ + // Wait to make sure endpoint listeners have shutdown and exited + // as well. They should have done so *long* ago. + while (nni_list_first(&sock->s_eps) != NULL) { + nni_cond_wait(sock->s_cv); } return (0); @@ -115,11 +147,9 @@ nni_socket_sendmsg(nni_socket_t sock, nni_msg_t msg, int tmout) int rv; int besteffort; - /* - * Senderr is typically set by protocols when the state machine - * indicates that it is no longer valid to send a message. E.g. - * a REP socket with no REQ pending. - */ + // Senderr is typically set by protocols when the state machine + // indicates that it is no longer valid to send a message. E.g. + // a REP socket with no REQ pending. nni_mutex_enter(sock->s_mx); if ((rv = sock->s_senderr) != 0) { nni_mutex_exit(sock->s_mx); @@ -136,15 +166,13 @@ nni_socket_sendmsg(nni_socket_t sock, nni_msg_t msg, int tmout) } if (besteffort) { - /* - * BestEffort mode -- if we cannot handle the message due to - * backpressure, we just throw it away, and don't complain. - */ + // BestEffort mode -- if we cannot handle the message due to + // backpressure, we just throw it away, and don't complain. tmout = 0; } rv = nni_msgqueue_put(sock->s_uwq, msg, tmout); if (besteffort && (rv == NNG_EAGAIN)) { - /* Pretend this worked... it didn't, but pretend. */ + // Pretend this worked... it didn't, but pretend. nni_msg_free(msg); return (0); } @@ -152,35 +180,40 @@ nni_socket_sendmsg(nni_socket_t sock, nni_msg_t msg, int tmout) } +// nni_socket_protocol returns the socket's 16-bit protocol number. uint16_t -nni_socket_protocol(nni_socket_t sock) +nni_socket_proto(nni_socket_t sock) { return (sock->s_ops.proto_self); } - +// nni_socket_remove_pipe removes the pipe from the socket. This is often +// called by the protocol when a pipe is removed due to close. void -nni_socket_remove_pipe(nni_socket_t sock, nni_pipe_t pipe) +nni_socket_rem_pipe(nni_socket_t sock, nni_pipe_t pipe) { nni_mutex_enter(sock->s_mx); if (pipe->p_sock != sock) { nni_mutex_exit(sock->s_mx); } - /* - * Remove the pipe from the protocol. Protocols may - * keep lists of pipes for managing their topologies. - */ + // Remove the pipe from the protocol. Protocols may + // keep lists of pipes for managing their topologies. sock->s_ops.proto_remove_pipe(sock->s_data, pipe); - /* Now remove it from our own list */ + // Now remove it from our own list. nni_list_remove(&sock->s_pipes, pipe); pipe->p_sock = NULL; - // XXX: Redial + // XXX: TODO: Redial // XXX: also publish event... - //if (pipe->p_ep != NULL) { + // if (pipe->p_ep != NULL) { // nn_endpt_remove_pipe(pipe->p_ep, pipe) - //} + // } + + // If we're closing, wake the socket if we finished draining. + if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) { + nni_cond_broadcast(sock->s_cv); + } nni_mutex_exit(sock->s_mx); } @@ -191,6 +224,10 @@ nni_socket_add_pipe(nni_socket_t sock, nni_pipe_t pipe) int rv; nni_mutex_enter(sock->s_mx); + if (sock->s_closing) { + nni_mutex_exit(sock->s_mx); + return (NNG_ECLOSED); + } if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) { nni_mutex_exit(sock->s_mx); return (rv); |
