aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c165
1 files changed, 101 insertions, 64 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index c7de05fa..21b1b300 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -1,22 +1,18 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
#include "core/nng_impl.h"
-/*
- * Socket implementation.
- */
+// Socket implementation.
-/*
- * nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain
- * the upper read and write queues.
- */
+// nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain
+// the upper read and write queues.
nni_msgqueue_t
nni_socket_sendq(nni_socket_t s)
{
@@ -31,6 +27,7 @@ nni_socket_recvq(nni_socket_t s)
}
+// nn_socket_create creates the underlying socket.
int
nni_socket_create(nni_socket_t *sockp, uint16_t proto)
{
@@ -46,10 +43,22 @@ nni_socket_create(nni_socket_t *sockp, uint16_t proto)
}
sock->s_ops = *ops;
+ if ((rv = nni_mutex_create(&sock->s_mx)) != 0) {
+ nni_free(sock, sizeof (*sock));
+ return (rv);
+ }
+ if ((rv = nni_cond_create(&sock->s_cv, sock->s_mx)) != 0) {
+ nni_mutex_destroy(sock->s_mx);
+ nni_free(sock, sizeof (*sock));
+ return (rv);
+ }
+
NNI_LIST_INIT(&sock->s_pipes, struct nng_pipe, p_sock_node);
- //NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node);
+ // TODO: NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node);
if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) {
+ nni_cond_destroy(sock->s_cv);
+ nni_mutex_destroy(sock->s_mx);
nni_free(sock, sizeof (*sock));
return (rv);
}
@@ -58,51 +67,74 @@ nni_socket_create(nni_socket_t *sockp, uint16_t proto)
}
+// nni_socket_close closes the underlying socket.
int
nni_socket_close(nni_socket_t sock)
{
nni_pipe_t pipe;
nni_endpt_t ep;
- nni_msgqueue_close(sock->s_urq);
- /* XXX: drain this? */
- nni_msgqueue_close(sock->s_uwq);
nni_mutex_enter(sock->s_mx);
+ // Mark us closing, so no more EPs or changes can occur.
+ sock->s_closing = 1;
+
+ // Stop all EPS. We're going to do this first, since we know
+ // we're closing.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
#if 0
- nni_ep_stop(ep);
- // OR....
- nni_mutex_enter(ep->ep_mx);
- ep->ep_stop = 1;
- nni_cond_broadcast(ep->ep_cond);
- nni_mutex_exit(ep->ep_mx);
+ // XXX: Question: block for them now, or wait further down?
+ // Probably we can simply just watch for the first list...
+ // stopping EPs should be *dead* easy, and never block.
+ nni_ep_stop(ep); or nni_ep_shutdown(ep);
#endif
break; /* REMOVE ME */
}
nni_mutex_exit(sock->s_mx);
- /* XXX: close endpoints - no new pipes made... */
-
- /* XXX: protocol shutdown */
- /*
- * Paths to pipe close:
- *
- * - user calls nng_pipe_close()
- * - protocol calls pipe_close() after underlying close
- * - socket calls pipe close due to socket_close (here)
- */
+ // XXX: TODO. This is a place where we should drain the write side
+ // msgqueue, effectively getting a linger on the socket. The
+ // protocols will drain this queue, and should continue to run
+ // handling both transmit and receive until that's done.
+ // Note that *all* protocols need to monitor for this linger, even
+ // those that do not transmit. This way they can notice and go ahead
+ // and quickly shutdown their pipes; this keeps us from having to wait
+ // for *our* pipelist to become empty. This helps us ensure that
+ // they effectively get the chance to linger on their side, without
+ // requring us to do anything magical for them.
+
+ // nni_msgqueue_drain(sock->s_uwq, sock->s_linger);
+
+ // Now we should attempt to wait for the list of pipes to drop to
+ // zero -- indicating that the protocol has shut things down
+ // cleanly, voluntarily. (I.e. it finished its drain.)
+ nni_mutex_enter(sock->s_mx);
+ while (nni_list_first(&sock->s_pipes) != NULL) {
+ // rv = nn_cond_timedwait(sock->s_cv, sock->s_linger);
+ int rv = NNG_ETIMEDOUT;
+ if (rv == NNG_ETIMEDOUT) {
+ break;
+ }
+ }
- /* XXX: close remaining pipes */
- while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
- nni_list_remove(&sock->s_pipes, pipe);
- /* XXX: nni_pipe_destroy */
+ // Time's up! Shut it down the hard way.
+ nni_msgqueue_close(sock->s_urq);
+ nni_msgqueue_close(sock->s_uwq);
+ // This gives the protocol notice, in case it didn't get the hint.
+ // XXX: In retrospect, this entry point seems redundant.
+ sock->s_ops.proto_shutdown(sock->s_data);
+
+ // The protocol *MUST* give us back our pipes at this point, and
+ // quickly too! If this blocks for any non-trivial amount of time
+ // here, it indicates a protocol implementation bug.
+ while (nni_list_first(&sock->s_pipes) != NULL) {
+ nni_cond_wait(sock->s_cv);
}
- /* XXX: wait for workers to cease activity */
- while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
- nni_list_remove(&sock->s_eps, ep);
- /* XXX: nni_ep_destroy(ep); */
+ // Wait to make sure endpoint listeners have shutdown and exited
+ // as well. They should have done so *long* ago.
+ while (nni_list_first(&sock->s_eps) != NULL) {
+ nni_cond_wait(sock->s_cv);
}
return (0);
@@ -115,11 +147,9 @@ nni_socket_sendmsg(nni_socket_t sock, nni_msg_t msg, int tmout)
int rv;
int besteffort;
- /*
- * Senderr is typically set by protocols when the state machine
- * indicates that it is no longer valid to send a message. E.g.
- * a REP socket with no REQ pending.
- */
+ // Senderr is typically set by protocols when the state machine
+ // indicates that it is no longer valid to send a message. E.g.
+ // a REP socket with no REQ pending.
nni_mutex_enter(sock->s_mx);
if ((rv = sock->s_senderr) != 0) {
nni_mutex_exit(sock->s_mx);
@@ -136,15 +166,13 @@ nni_socket_sendmsg(nni_socket_t sock, nni_msg_t msg, int tmout)
}
if (besteffort) {
- /*
- * BestEffort mode -- if we cannot handle the message due to
- * backpressure, we just throw it away, and don't complain.
- */
+ // BestEffort mode -- if we cannot handle the message due to
+ // backpressure, we just throw it away, and don't complain.
tmout = 0;
}
rv = nni_msgqueue_put(sock->s_uwq, msg, tmout);
if (besteffort && (rv == NNG_EAGAIN)) {
- /* Pretend this worked... it didn't, but pretend. */
+ // Pretend this worked... it didn't, but pretend.
nni_msg_free(msg);
return (0);
}
@@ -152,35 +180,40 @@ nni_socket_sendmsg(nni_socket_t sock, nni_msg_t msg, int tmout)
}
+// nni_socket_protocol returns the socket's 16-bit protocol number.
uint16_t
-nni_socket_protocol(nni_socket_t sock)
+nni_socket_proto(nni_socket_t sock)
{
return (sock->s_ops.proto_self);
}
-
+// nni_socket_remove_pipe removes the pipe from the socket. This is often
+// called by the protocol when a pipe is removed due to close.
void
-nni_socket_remove_pipe(nni_socket_t sock, nni_pipe_t pipe)
+nni_socket_rem_pipe(nni_socket_t sock, nni_pipe_t pipe)
{
nni_mutex_enter(sock->s_mx);
if (pipe->p_sock != sock) {
nni_mutex_exit(sock->s_mx);
}
- /*
- * Remove the pipe from the protocol. Protocols may
- * keep lists of pipes for managing their topologies.
- */
+ // Remove the pipe from the protocol. Protocols may
+ // keep lists of pipes for managing their topologies.
sock->s_ops.proto_remove_pipe(sock->s_data, pipe);
- /* Now remove it from our own list */
+ // Now remove it from our own list.
nni_list_remove(&sock->s_pipes, pipe);
pipe->p_sock = NULL;
- // XXX: Redial
+ // XXX: TODO: Redial
// XXX: also publish event...
- //if (pipe->p_ep != NULL) {
+ // if (pipe->p_ep != NULL) {
// nn_endpt_remove_pipe(pipe->p_ep, pipe)
- //}
+ // }
+
+ // If we're closing, wake the socket if we finished draining.
+ if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) {
+ nni_cond_broadcast(sock->s_cv);
+ }
nni_mutex_exit(sock->s_mx);
}
@@ -191,6 +224,10 @@ nni_socket_add_pipe(nni_socket_t sock, nni_pipe_t pipe)
int rv;
nni_mutex_enter(sock->s_mx);
+ if (sock->s_closing) {
+ nni_mutex_exit(sock->s_mx);
+ return (NNG_ECLOSED);
+ }
if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) {
nni_mutex_exit(sock->s_mx);
return (rv);