aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-22 01:05:43 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-22 01:05:43 -0800
commitb92672e20420683e73bfc017956ac6ef2b6b793b (patch)
treefb110918430e41a3751ea63801f8acb7c21b7db9 /src
parent0283e8bbef80d42fda1cd9b21e4d14673c3641b8 (diff)
downloadnng-b92672e20420683e73bfc017956ac6ef2b6b793b.tar.gz
nng-b92672e20420683e73bfc017956ac6ef2b6b793b.tar.bz2
nng-b92672e20420683e73bfc017956ac6ef2b6b793b.zip
Logic for socket shutdown, cleanup, and draining figured out.
There's work to do still, but I've left clear indications of the design in comments. Some ugly mysteries are now solved.
Diffstat (limited to 'src')
-rw-r--r--src/core/panic.c6
-rw-r--r--src/core/panic.h46
-rw-r--r--src/core/protocol.h4
-rw-r--r--src/core/socket.c165
-rw-r--r--src/core/socket.h33
-rw-r--r--src/nng.c2
-rw-r--r--src/protocol/pair/pair.c28
7 files changed, 158 insertions, 126 deletions
diff --git a/src/core/panic.c b/src/core/panic.c
index 65eb0747..d0d6078d 100644
--- a/src/core/panic.c
+++ b/src/core/panic.c
@@ -18,9 +18,7 @@
#include "core/nng_impl.h"
-//
// Panic handling.
-//
static void
nni_show_backtrace(void)
{
@@ -44,13 +42,11 @@ nni_show_backtrace(void)
}
-//
// nni_panic shows a panic message, a possible stack bracktrace, then aborts
// the process/program. This should only be called when a condition arises
// that should not be possible, e.g. a programming assertion failure. It should
// not be called in situations such as ENOMEM, as nni_panic is fairly rude
// to any application it may be called from within.
-//
void
nni_panic(const char *fmt, ...)
{
@@ -75,6 +71,6 @@ nni_panic(const char *fmt, ...)
void
nni_println(const char *msg)
{
- /* TODO: support redirection of this later. */
+ // TODO: support redirection of this later.
nni_plat_println(msg);
}
diff --git a/src/core/panic.h b/src/core/panic.h
index 88ddb7b0..323d64a1 100644
--- a/src/core/panic.h
+++ b/src/core/panic.h
@@ -1,33 +1,29 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
#ifndef CORE_PANIC_H
#define CORE_PANIC_H
-/*
- * nni_panic is used to terminate the process with prejudice, and
- * should only be called in the face of a critical programming error,
- * or other situation where it would be unsafe to attempt to continue.
- * As this crashes the program, it should never be used when factors outside
- * the program can cause it, such as receiving protocol errors, or running
- * out of memory. Its better in those cases to return an error to the
- * program and let the caller handle the error situation.
- */
+// nni_panic is used to terminate the process with prejudice, and
+// should only be called in the face of a critical programming error,
+// or other situation where it would be unsafe to attempt to continue.
+// As this crashes the program, it should never be used when factors outside
+// the program can cause it, such as receiving protocol errors, or running
+// out of memory. Its better in those cases to return an error to the
+// program and let the caller handle the error situation.
extern void nni_panic(const char *, ...);
-/*
- * nni_println is used to print output to a debug console. This should only
- * be used in the most dire of circumstances -- such as during an assertion
- * failure that is going to cause the program to crash. After the string is
- * emitted, a new line character is emitted, so the string should not
- * include one.
- */
+// nni_println is used to print output to a debug console. This should only
+// be used in the most dire of circumstances -- such as during an assertion
+// failure that is going to cause the program to crash. After the string is
+// emitted, a new line character is emitted, so the string should not
+// include one.
extern void nni_println(const char *);
-#endif /* CORE_PANIC_H */
+#endif // CORE_PANIC_H
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 2b4625cb..b9760725 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -49,8 +49,10 @@ struct nni_protocol {
* Shutdown the protocol instance, including giving time to
* drain any outbound frames (linger). The protocol is not
* required to honor the linger.
+ * XXX: This is probably redundant -- protocol should notice
+ * drain by getting NNG_ECLOSED on the upper write queue.
*/
- void (*proto_shutdown)(void *, uint64_t);
+ void (*proto_shutdown)(void *);
/*
* Add and remove pipes. These are called as connections are
diff --git a/src/core/socket.c b/src/core/socket.c
index c7de05fa..21b1b300 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -1,22 +1,18 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
#include "core/nng_impl.h"
-/*
- * Socket implementation.
- */
+// Socket implementation.
-/*
- * nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain
- * the upper read and write queues.
- */
+// nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain
+// the upper read and write queues.
nni_msgqueue_t
nni_socket_sendq(nni_socket_t s)
{
@@ -31,6 +27,7 @@ nni_socket_recvq(nni_socket_t s)
}
+// nn_socket_create creates the underlying socket.
int
nni_socket_create(nni_socket_t *sockp, uint16_t proto)
{
@@ -46,10 +43,22 @@ nni_socket_create(nni_socket_t *sockp, uint16_t proto)
}
sock->s_ops = *ops;
+ if ((rv = nni_mutex_create(&sock->s_mx)) != 0) {
+ nni_free(sock, sizeof (*sock));
+ return (rv);
+ }
+ if ((rv = nni_cond_create(&sock->s_cv, sock->s_mx)) != 0) {
+ nni_mutex_destroy(sock->s_mx);
+ nni_free(sock, sizeof (*sock));
+ return (rv);
+ }
+
NNI_LIST_INIT(&sock->s_pipes, struct nng_pipe, p_sock_node);
- //NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node);
+ // TODO: NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node);
if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) {
+ nni_cond_destroy(sock->s_cv);
+ nni_mutex_destroy(sock->s_mx);
nni_free(sock, sizeof (*sock));
return (rv);
}
@@ -58,51 +67,74 @@ nni_socket_create(nni_socket_t *sockp, uint16_t proto)
}
+// nni_socket_close closes the underlying socket.
int
nni_socket_close(nni_socket_t sock)
{
nni_pipe_t pipe;
nni_endpt_t ep;
- nni_msgqueue_close(sock->s_urq);
- /* XXX: drain this? */
- nni_msgqueue_close(sock->s_uwq);
nni_mutex_enter(sock->s_mx);
+ // Mark us closing, so no more EPs or changes can occur.
+ sock->s_closing = 1;
+
+ // Stop all EPS. We're going to do this first, since we know
+ // we're closing.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
#if 0
- nni_ep_stop(ep);
- // OR....
- nni_mutex_enter(ep->ep_mx);
- ep->ep_stop = 1;
- nni_cond_broadcast(ep->ep_cond);
- nni_mutex_exit(ep->ep_mx);
+ // XXX: Question: block for them now, or wait further down?
+ // Probably we can simply just watch for the first list...
+ // stopping EPs should be *dead* easy, and never block.
+ nni_ep_stop(ep); or nni_ep_shutdown(ep);
#endif
break; /* REMOVE ME */
}
nni_mutex_exit(sock->s_mx);
- /* XXX: close endpoints - no new pipes made... */
-
- /* XXX: protocol shutdown */
- /*
- * Paths to pipe close:
- *
- * - user calls nng_pipe_close()
- * - protocol calls pipe_close() after underlying close
- * - socket calls pipe close due to socket_close (here)
- */
+ // XXX: TODO. This is a place where we should drain the write side
+ // msgqueue, effectively getting a linger on the socket. The
+ // protocols will drain this queue, and should continue to run
+ // handling both transmit and receive until that's done.
+ // Note that *all* protocols need to monitor for this linger, even
+ // those that do not transmit. This way they can notice and go ahead
+ // and quickly shutdown their pipes; this keeps us from having to wait
+ // for *our* pipelist to become empty. This helps us ensure that
+ // they effectively get the chance to linger on their side, without
+ // requring us to do anything magical for them.
+
+ // nni_msgqueue_drain(sock->s_uwq, sock->s_linger);
+
+ // Now we should attempt to wait for the list of pipes to drop to
+ // zero -- indicating that the protocol has shut things down
+ // cleanly, voluntarily. (I.e. it finished its drain.)
+ nni_mutex_enter(sock->s_mx);
+ while (nni_list_first(&sock->s_pipes) != NULL) {
+ // rv = nn_cond_timedwait(sock->s_cv, sock->s_linger);
+ int rv = NNG_ETIMEDOUT;
+ if (rv == NNG_ETIMEDOUT) {
+ break;
+ }
+ }
- /* XXX: close remaining pipes */
- while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
- nni_list_remove(&sock->s_pipes, pipe);
- /* XXX: nni_pipe_destroy */
+ // Time's up! Shut it down the hard way.
+ nni_msgqueue_close(sock->s_urq);
+ nni_msgqueue_close(sock->s_uwq);
+ // This gives the protocol notice, in case it didn't get the hint.
+ // XXX: In retrospect, this entry point seems redundant.
+ sock->s_ops.proto_shutdown(sock->s_data);
+
+ // The protocol *MUST* give us back our pipes at this point, and
+ // quickly too! If this blocks for any non-trivial amount of time
+ // here, it indicates a protocol implementation bug.
+ while (nni_list_first(&sock->s_pipes) != NULL) {
+ nni_cond_wait(sock->s_cv);
}
- /* XXX: wait for workers to cease activity */
- while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
- nni_list_remove(&sock->s_eps, ep);
- /* XXX: nni_ep_destroy(ep); */
+ // Wait to make sure endpoint listeners have shutdown and exited
+ // as well. They should have done so *long* ago.
+ while (nni_list_first(&sock->s_eps) != NULL) {
+ nni_cond_wait(sock->s_cv);
}
return (0);
@@ -115,11 +147,9 @@ nni_socket_sendmsg(nni_socket_t sock, nni_msg_t msg, int tmout)
int rv;
int besteffort;
- /*
- * Senderr is typically set by protocols when the state machine
- * indicates that it is no longer valid to send a message. E.g.
- * a REP socket with no REQ pending.
- */
+ // Senderr is typically set by protocols when the state machine
+ // indicates that it is no longer valid to send a message. E.g.
+ // a REP socket with no REQ pending.
nni_mutex_enter(sock->s_mx);
if ((rv = sock->s_senderr) != 0) {
nni_mutex_exit(sock->s_mx);
@@ -136,15 +166,13 @@ nni_socket_sendmsg(nni_socket_t sock, nni_msg_t msg, int tmout)
}
if (besteffort) {
- /*
- * BestEffort mode -- if we cannot handle the message due to
- * backpressure, we just throw it away, and don't complain.
- */
+ // BestEffort mode -- if we cannot handle the message due to
+ // backpressure, we just throw it away, and don't complain.
tmout = 0;
}
rv = nni_msgqueue_put(sock->s_uwq, msg, tmout);
if (besteffort && (rv == NNG_EAGAIN)) {
- /* Pretend this worked... it didn't, but pretend. */
+ // Pretend this worked... it didn't, but pretend.
nni_msg_free(msg);
return (0);
}
@@ -152,35 +180,40 @@ nni_socket_sendmsg(nni_socket_t sock, nni_msg_t msg, int tmout)
}
+// nni_socket_protocol returns the socket's 16-bit protocol number.
uint16_t
-nni_socket_protocol(nni_socket_t sock)
+nni_socket_proto(nni_socket_t sock)
{
return (sock->s_ops.proto_self);
}
-
+// nni_socket_remove_pipe removes the pipe from the socket. This is often
+// called by the protocol when a pipe is removed due to close.
void
-nni_socket_remove_pipe(nni_socket_t sock, nni_pipe_t pipe)
+nni_socket_rem_pipe(nni_socket_t sock, nni_pipe_t pipe)
{
nni_mutex_enter(sock->s_mx);
if (pipe->p_sock != sock) {
nni_mutex_exit(sock->s_mx);
}
- /*
- * Remove the pipe from the protocol. Protocols may
- * keep lists of pipes for managing their topologies.
- */
+ // Remove the pipe from the protocol. Protocols may
+ // keep lists of pipes for managing their topologies.
sock->s_ops.proto_remove_pipe(sock->s_data, pipe);
- /* Now remove it from our own list */
+ // Now remove it from our own list.
nni_list_remove(&sock->s_pipes, pipe);
pipe->p_sock = NULL;
- // XXX: Redial
+ // XXX: TODO: Redial
// XXX: also publish event...
- //if (pipe->p_ep != NULL) {
+ // if (pipe->p_ep != NULL) {
// nn_endpt_remove_pipe(pipe->p_ep, pipe)
- //}
+ // }
+
+ // If we're closing, wake the socket if we finished draining.
+ if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) {
+ nni_cond_broadcast(sock->s_cv);
+ }
nni_mutex_exit(sock->s_mx);
}
@@ -191,6 +224,10 @@ nni_socket_add_pipe(nni_socket_t sock, nni_pipe_t pipe)
int rv;
nni_mutex_enter(sock->s_mx);
+ if (sock->s_closing) {
+ nni_mutex_exit(sock->s_mx);
+ return (NNG_ECLOSED);
+ }
if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) {
nni_mutex_exit(sock->s_mx);
return (rv);
diff --git a/src/core/socket.h b/src/core/socket.h
index da531836..366e9958 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -10,42 +10,37 @@
#ifndef CORE_SOCKET_H
#define CORE_SOCKET_H
-/*
- * NB: This structure is supplied here for use by the CORE. Use of this library
- * OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR
- * TRANSPORTS.
- */
+// NB: This structure is supplied here for use by the CORE. Use of this library
+// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR
+// TRANSPORTS.
struct nng_socket {
nni_mutex_t s_mx;
+ nni_cond_t s_cv;
- nni_msgqueue_t s_uwq; /* Upper write queue. */
- nni_msgqueue_t s_urq; /* Upper read queue. */
+ nni_msgqueue_t s_uwq; // Upper write queue
+ nni_msgqueue_t s_urq; // Upper read queue
struct nni_protocol s_ops;
- void * s_data; /* Protocol private. */
+ void * s_data; // Protocol private
- /* options */
+ /* XXX: options */
nni_list_t s_eps;
nni_list_t s_pipes;
- int s_besteffort; /* Best effort mode delivery. */
- int s_senderr; /* Protocol state machine use. */
+ int s_closing; // Socket is closing
+ int s_besteffort; // Best effort mode delivery
+ int s_senderr; // Protocol state machine use
};
-/*
- * Internally used socket API. Again, this stuff is not part of our public
- * API.
- */
-
extern int nni_socket_create(nni_socket_t *, uint16_t);
extern int nni_socket_close(nni_socket_t);
extern int nni_socket_add_pipe(nni_socket_t, nni_pipe_t);
-extern void nni_socket_remove_pipe(nni_socket_t, nni_pipe_t);
-extern uint16_t nni_socket_protocol(nni_socket_t);
+extern void nni_socket_rem_pipe(nni_socket_t, nni_pipe_t);
+extern uint16_t nni_socket_proto(nni_socket_t);
extern int nni_socket_setopt(nni_socket_t, int, const void *, size_t);
extern int nni_socket_getopt(nni_socket_t, int, void *, size_t *);
-#endif /* CORE_SOCKET_H */
+#endif // CORE_SOCKET_H
diff --git a/src/nng.c b/src/nng.c
index a2398a9d..d818424a 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -47,7 +47,7 @@ uint16_t
nng_socket_protocol(nng_socket_t s)
{
nni_init();
- return (nni_socket_protocol(s));
+ return (nni_socket_proto(s));
}
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index ea5fc637..5c1862a1 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -12,9 +12,9 @@
#include "core/nng_impl.h"
-//
// Pair protocol. The PAIR protocol is a simple 1:1 messaging pattern.
-//
+// While a peer is connected to the server, all other peer connection
+// attempts are discarded.
// An nni_pair_sock is our per-socket protocol private structure.
typedef struct nni_pair_sock {
@@ -67,23 +67,29 @@ nni_pair_destroy(void *arg)
{
nni_pair_sock *pair = arg;
+ // If we had any worker threads that we have not unregistered,
+ // this wold be the time to shut them all down. We don't, because
+ // the socket already shut us down, and we don't have any other
+ // threads that run.
nni_mutex_destroy(pair->mx);
nni_free(pair, sizeof (*pair));
}
static void
-nni_pair_shutdown(void *arg, uint64_t usec)
+nni_pair_shutdown(void *arg)
{
nni_pair_sock *pair = arg;
nni_pipe_t pipe;
- NNI_ARG_UNUSED(usec);
-
- // XXX: correct implementation here is to set a draining flag,
- // and wait a bit for the sender to finish draining (linger),
- // then reap the pipe. For now we just act a little more harshly.
-
+ // This just causes the protocol to close its various pipes.
+ // The draining logic, if any, will have been performed in the
+ // upper layer socket.
+ //
+ // Closing the pipes is intended to cause the receiver on them
+ // to notice the failure, and ultimately call back into the socket
+ // to unregister them. The socket can use this to wait for a clean
+ // shutdown of all pipe workers.
nni_mutex_enter(pair->mx);
pipe = pair->pipe;
pair->pipe = NULL;
@@ -183,7 +189,7 @@ nni_pair_sender(void *arg)
}
nni_msgqueue_signal(urq, &pp->sigclose);
nni_pipe_close(pipe);
- nni_socket_remove_pipe(pair->sock, pipe);
+ nni_socket_rem_pipe(pair->sock, pipe);
}
@@ -218,7 +224,7 @@ nni_pair_receiver(void *arg)
}
nni_msgqueue_signal(uwq, &pp->sigclose);
nni_pipe_close(pipe);
- nni_socket_remove_pipe(pair->sock, pipe);
+ nni_socket_rem_pipe(pair->sock, pipe);
}