diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 165 |
1 files changed, 101 insertions, 64 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index c7de05fa..21b1b300 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -1,22 +1,18 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #include "core/nng_impl.h" -/* - * Socket implementation. - */ +// Socket implementation. -/* - * nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain - * the upper read and write queues. - */ +// nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain +// the upper read and write queues. nni_msgqueue_t nni_socket_sendq(nni_socket_t s) { @@ -31,6 +27,7 @@ nni_socket_recvq(nni_socket_t s) } +// nn_socket_create creates the underlying socket. int nni_socket_create(nni_socket_t *sockp, uint16_t proto) { @@ -46,10 +43,22 @@ nni_socket_create(nni_socket_t *sockp, uint16_t proto) } sock->s_ops = *ops; + if ((rv = nni_mutex_create(&sock->s_mx)) != 0) { + nni_free(sock, sizeof (*sock)); + return (rv); + } + if ((rv = nni_cond_create(&sock->s_cv, sock->s_mx)) != 0) { + nni_mutex_destroy(sock->s_mx); + nni_free(sock, sizeof (*sock)); + return (rv); + } + NNI_LIST_INIT(&sock->s_pipes, struct nng_pipe, p_sock_node); - //NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node); + // TODO: NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node); if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) { + nni_cond_destroy(sock->s_cv); + nni_mutex_destroy(sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } @@ -58,51 +67,74 @@ nni_socket_create(nni_socket_t *sockp, uint16_t proto) } +// nni_socket_close closes the underlying socket. int nni_socket_close(nni_socket_t sock) { nni_pipe_t pipe; nni_endpt_t ep; - nni_msgqueue_close(sock->s_urq); - /* XXX: drain this? */ - nni_msgqueue_close(sock->s_uwq); nni_mutex_enter(sock->s_mx); + // Mark us closing, so no more EPs or changes can occur. + sock->s_closing = 1; + + // Stop all EPS. We're going to do this first, since we know + // we're closing. NNI_LIST_FOREACH (&sock->s_eps, ep) { #if 0 - nni_ep_stop(ep); - // OR.... - nni_mutex_enter(ep->ep_mx); - ep->ep_stop = 1; - nni_cond_broadcast(ep->ep_cond); - nni_mutex_exit(ep->ep_mx); + // XXX: Question: block for them now, or wait further down? + // Probably we can simply just watch for the first list... + // stopping EPs should be *dead* easy, and never block. + nni_ep_stop(ep); or nni_ep_shutdown(ep); #endif break; /* REMOVE ME */ } nni_mutex_exit(sock->s_mx); - /* XXX: close endpoints - no new pipes made... */ - - /* XXX: protocol shutdown */ - /* - * Paths to pipe close: - * - * - user calls nng_pipe_close() - * - protocol calls pipe_close() after underlying close - * - socket calls pipe close due to socket_close (here) - */ + // XXX: TODO. This is a place where we should drain the write side + // msgqueue, effectively getting a linger on the socket. The + // protocols will drain this queue, and should continue to run + // handling both transmit and receive until that's done. + // Note that *all* protocols need to monitor for this linger, even + // those that do not transmit. This way they can notice and go ahead + // and quickly shutdown their pipes; this keeps us from having to wait + // for *our* pipelist to become empty. This helps us ensure that + // they effectively get the chance to linger on their side, without + // requring us to do anything magical for them. + + // nni_msgqueue_drain(sock->s_uwq, sock->s_linger); + + // Now we should attempt to wait for the list of pipes to drop to + // zero -- indicating that the protocol has shut things down + // cleanly, voluntarily. (I.e. it finished its drain.) + nni_mutex_enter(sock->s_mx); + while (nni_list_first(&sock->s_pipes) != NULL) { + // rv = nn_cond_timedwait(sock->s_cv, sock->s_linger); + int rv = NNG_ETIMEDOUT; + if (rv == NNG_ETIMEDOUT) { + break; + } + } - /* XXX: close remaining pipes */ - while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { - nni_list_remove(&sock->s_pipes, pipe); - /* XXX: nni_pipe_destroy */ + // Time's up! Shut it down the hard way. + nni_msgqueue_close(sock->s_urq); + nni_msgqueue_close(sock->s_uwq); + // This gives the protocol notice, in case it didn't get the hint. + // XXX: In retrospect, this entry point seems redundant. + sock->s_ops.proto_shutdown(sock->s_data); + + // The protocol *MUST* give us back our pipes at this point, and + // quickly too! If this blocks for any non-trivial amount of time + // here, it indicates a protocol implementation bug. + while (nni_list_first(&sock->s_pipes) != NULL) { + nni_cond_wait(sock->s_cv); } - /* XXX: wait for workers to cease activity */ - while ((ep = nni_list_first(&sock->s_eps)) != NULL) { - nni_list_remove(&sock->s_eps, ep); - /* XXX: nni_ep_destroy(ep); */ + // Wait to make sure endpoint listeners have shutdown and exited + // as well. They should have done so *long* ago. + while (nni_list_first(&sock->s_eps) != NULL) { + nni_cond_wait(sock->s_cv); } return (0); @@ -115,11 +147,9 @@ nni_socket_sendmsg(nni_socket_t sock, nni_msg_t msg, int tmout) int rv; int besteffort; - /* - * Senderr is typically set by protocols when the state machine - * indicates that it is no longer valid to send a message. E.g. - * a REP socket with no REQ pending. - */ + // Senderr is typically set by protocols when the state machine + // indicates that it is no longer valid to send a message. E.g. + // a REP socket with no REQ pending. nni_mutex_enter(sock->s_mx); if ((rv = sock->s_senderr) != 0) { nni_mutex_exit(sock->s_mx); @@ -136,15 +166,13 @@ nni_socket_sendmsg(nni_socket_t sock, nni_msg_t msg, int tmout) } if (besteffort) { - /* - * BestEffort mode -- if we cannot handle the message due to - * backpressure, we just throw it away, and don't complain. - */ + // BestEffort mode -- if we cannot handle the message due to + // backpressure, we just throw it away, and don't complain. tmout = 0; } rv = nni_msgqueue_put(sock->s_uwq, msg, tmout); if (besteffort && (rv == NNG_EAGAIN)) { - /* Pretend this worked... it didn't, but pretend. */ + // Pretend this worked... it didn't, but pretend. nni_msg_free(msg); return (0); } @@ -152,35 +180,40 @@ nni_socket_sendmsg(nni_socket_t sock, nni_msg_t msg, int tmout) } +// nni_socket_protocol returns the socket's 16-bit protocol number. uint16_t -nni_socket_protocol(nni_socket_t sock) +nni_socket_proto(nni_socket_t sock) { return (sock->s_ops.proto_self); } - +// nni_socket_remove_pipe removes the pipe from the socket. This is often +// called by the protocol when a pipe is removed due to close. void -nni_socket_remove_pipe(nni_socket_t sock, nni_pipe_t pipe) +nni_socket_rem_pipe(nni_socket_t sock, nni_pipe_t pipe) { nni_mutex_enter(sock->s_mx); if (pipe->p_sock != sock) { nni_mutex_exit(sock->s_mx); } - /* - * Remove the pipe from the protocol. Protocols may - * keep lists of pipes for managing their topologies. - */ + // Remove the pipe from the protocol. Protocols may + // keep lists of pipes for managing their topologies. sock->s_ops.proto_remove_pipe(sock->s_data, pipe); - /* Now remove it from our own list */ + // Now remove it from our own list. nni_list_remove(&sock->s_pipes, pipe); pipe->p_sock = NULL; - // XXX: Redial + // XXX: TODO: Redial // XXX: also publish event... - //if (pipe->p_ep != NULL) { + // if (pipe->p_ep != NULL) { // nn_endpt_remove_pipe(pipe->p_ep, pipe) - //} + // } + + // If we're closing, wake the socket if we finished draining. + if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) { + nni_cond_broadcast(sock->s_cv); + } nni_mutex_exit(sock->s_mx); } @@ -191,6 +224,10 @@ nni_socket_add_pipe(nni_socket_t sock, nni_pipe_t pipe) int rv; nni_mutex_enter(sock->s_mx); + if (sock->s_closing) { + nni_mutex_exit(sock->s_mx); + return (NNG_ECLOSED); + } if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) { nni_mutex_exit(sock->s_mx); return (rv); |
