aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/panic.c6
-rw-r--r--src/core/panic.h46
-rw-r--r--src/core/protocol.h4
-rw-r--r--src/core/socket.c165
-rw-r--r--src/core/socket.h33
5 files changed, 140 insertions, 114 deletions
diff --git a/src/core/panic.c b/src/core/panic.c
index 65eb0747..d0d6078d 100644
--- a/src/core/panic.c
+++ b/src/core/panic.c
@@ -18,9 +18,7 @@
#include "core/nng_impl.h"
-//
// Panic handling.
-//
static void
nni_show_backtrace(void)
{
@@ -44,13 +42,11 @@ nni_show_backtrace(void)
}
-//
// nni_panic shows a panic message, a possible stack bracktrace, then aborts
// the process/program. This should only be called when a condition arises
// that should not be possible, e.g. a programming assertion failure. It should
// not be called in situations such as ENOMEM, as nni_panic is fairly rude
// to any application it may be called from within.
-//
void
nni_panic(const char *fmt, ...)
{
@@ -75,6 +71,6 @@ nni_panic(const char *fmt, ...)
void
nni_println(const char *msg)
{
- /* TODO: support redirection of this later. */
+ // TODO: support redirection of this later.
nni_plat_println(msg);
}
diff --git a/src/core/panic.h b/src/core/panic.h
index 88ddb7b0..323d64a1 100644
--- a/src/core/panic.h
+++ b/src/core/panic.h
@@ -1,33 +1,29 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
#ifndef CORE_PANIC_H
#define CORE_PANIC_H
-/*
- * nni_panic is used to terminate the process with prejudice, and
- * should only be called in the face of a critical programming error,
- * or other situation where it would be unsafe to attempt to continue.
- * As this crashes the program, it should never be used when factors outside
- * the program can cause it, such as receiving protocol errors, or running
- * out of memory. Its better in those cases to return an error to the
- * program and let the caller handle the error situation.
- */
+// nni_panic is used to terminate the process with prejudice, and
+// should only be called in the face of a critical programming error,
+// or other situation where it would be unsafe to attempt to continue.
+// As this crashes the program, it should never be used when factors outside
+// the program can cause it, such as receiving protocol errors, or running
+// out of memory. Its better in those cases to return an error to the
+// program and let the caller handle the error situation.
extern void nni_panic(const char *, ...);
-/*
- * nni_println is used to print output to a debug console. This should only
- * be used in the most dire of circumstances -- such as during an assertion
- * failure that is going to cause the program to crash. After the string is
- * emitted, a new line character is emitted, so the string should not
- * include one.
- */
+// nni_println is used to print output to a debug console. This should only
+// be used in the most dire of circumstances -- such as during an assertion
+// failure that is going to cause the program to crash. After the string is
+// emitted, a new line character is emitted, so the string should not
+// include one.
extern void nni_println(const char *);
-#endif /* CORE_PANIC_H */
+#endif // CORE_PANIC_H
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 2b4625cb..b9760725 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -49,8 +49,10 @@ struct nni_protocol {
* Shutdown the protocol instance, including giving time to
* drain any outbound frames (linger). The protocol is not
* required to honor the linger.
+ * XXX: This is probably redundant -- protocol should notice
+ * drain by getting NNG_ECLOSED on the upper write queue.
*/
- void (*proto_shutdown)(void *, uint64_t);
+ void (*proto_shutdown)(void *);
/*
* Add and remove pipes. These are called as connections are
diff --git a/src/core/socket.c b/src/core/socket.c
index c7de05fa..21b1b300 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -1,22 +1,18 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
#include "core/nng_impl.h"
-/*
- * Socket implementation.
- */
+// Socket implementation.
-/*
- * nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain
- * the upper read and write queues.
- */
+// nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain
+// the upper read and write queues.
nni_msgqueue_t
nni_socket_sendq(nni_socket_t s)
{
@@ -31,6 +27,7 @@ nni_socket_recvq(nni_socket_t s)
}
+// nn_socket_create creates the underlying socket.
int
nni_socket_create(nni_socket_t *sockp, uint16_t proto)
{
@@ -46,10 +43,22 @@ nni_socket_create(nni_socket_t *sockp, uint16_t proto)
}
sock->s_ops = *ops;
+ if ((rv = nni_mutex_create(&sock->s_mx)) != 0) {
+ nni_free(sock, sizeof (*sock));
+ return (rv);
+ }
+ if ((rv = nni_cond_create(&sock->s_cv, sock->s_mx)) != 0) {
+ nni_mutex_destroy(sock->s_mx);
+ nni_free(sock, sizeof (*sock));
+ return (rv);
+ }
+
NNI_LIST_INIT(&sock->s_pipes, struct nng_pipe, p_sock_node);
- //NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node);
+ // TODO: NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node);
if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) {
+ nni_cond_destroy(sock->s_cv);
+ nni_mutex_destroy(sock->s_mx);
nni_free(sock, sizeof (*sock));
return (rv);
}
@@ -58,51 +67,74 @@ nni_socket_create(nni_socket_t *sockp, uint16_t proto)
}
+// nni_socket_close closes the underlying socket.
int
nni_socket_close(nni_socket_t sock)
{
nni_pipe_t pipe;
nni_endpt_t ep;
- nni_msgqueue_close(sock->s_urq);
- /* XXX: drain this? */
- nni_msgqueue_close(sock->s_uwq);
nni_mutex_enter(sock->s_mx);
+ // Mark us closing, so no more EPs or changes can occur.
+ sock->s_closing = 1;
+
+ // Stop all EPS. We're going to do this first, since we know
+ // we're closing.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
#if 0
- nni_ep_stop(ep);
- // OR....
- nni_mutex_enter(ep->ep_mx);
- ep->ep_stop = 1;
- nni_cond_broadcast(ep->ep_cond);
- nni_mutex_exit(ep->ep_mx);
+ // XXX: Question: block for them now, or wait further down?
+ // Probably we can simply just watch for the first list...
+ // stopping EPs should be *dead* easy, and never block.
+ nni_ep_stop(ep); or nni_ep_shutdown(ep);
#endif
break; /* REMOVE ME */
}
nni_mutex_exit(sock->s_mx);
- /* XXX: close endpoints - no new pipes made... */
-
- /* XXX: protocol shutdown */
- /*
- * Paths to pipe close:
- *
- * - user calls nng_pipe_close()
- * - protocol calls pipe_close() after underlying close
- * - socket calls pipe close due to socket_close (here)
- */
+ // XXX: TODO. This is a place where we should drain the write side
+ // msgqueue, effectively getting a linger on the socket. The
+ // protocols will drain this queue, and should continue to run
+ // handling both transmit and receive until that's done.
+ // Note that *all* protocols need to monitor for this linger, even
+ // those that do not transmit. This way they can notice and go ahead
+ // and quickly shutdown their pipes; this keeps us from having to wait
+ // for *our* pipelist to become empty. This helps us ensure that
+ // they effectively get the chance to linger on their side, without
+ // requring us to do anything magical for them.
+
+ // nni_msgqueue_drain(sock->s_uwq, sock->s_linger);
+
+ // Now we should attempt to wait for the list of pipes to drop to
+ // zero -- indicating that the protocol has shut things down
+ // cleanly, voluntarily. (I.e. it finished its drain.)
+ nni_mutex_enter(sock->s_mx);
+ while (nni_list_first(&sock->s_pipes) != NULL) {
+ // rv = nn_cond_timedwait(sock->s_cv, sock->s_linger);
+ int rv = NNG_ETIMEDOUT;
+ if (rv == NNG_ETIMEDOUT) {
+ break;
+ }
+ }
- /* XXX: close remaining pipes */
- while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
- nni_list_remove(&sock->s_pipes, pipe);
- /* XXX: nni_pipe_destroy */
+ // Time's up! Shut it down the hard way.
+ nni_msgqueue_close(sock->s_urq);
+ nni_msgqueue_close(sock->s_uwq);
+ // This gives the protocol notice, in case it didn't get the hint.
+ // XXX: In retrospect, this entry point seems redundant.
+ sock->s_ops.proto_shutdown(sock->s_data);
+
+ // The protocol *MUST* give us back our pipes at this point, and
+ // quickly too! If this blocks for any non-trivial amount of time
+ // here, it indicates a protocol implementation bug.
+ while (nni_list_first(&sock->s_pipes) != NULL) {
+ nni_cond_wait(sock->s_cv);
}
- /* XXX: wait for workers to cease activity */
- while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
- nni_list_remove(&sock->s_eps, ep);
- /* XXX: nni_ep_destroy(ep); */
+ // Wait to make sure endpoint listeners have shutdown and exited
+ // as well. They should have done so *long* ago.
+ while (nni_list_first(&sock->s_eps) != NULL) {
+ nni_cond_wait(sock->s_cv);
}
return (0);
@@ -115,11 +147,9 @@ nni_socket_sendmsg(nni_socket_t sock, nni_msg_t msg, int tmout)
int rv;
int besteffort;
- /*
- * Senderr is typically set by protocols when the state machine
- * indicates that it is no longer valid to send a message. E.g.
- * a REP socket with no REQ pending.
- */
+ // Senderr is typically set by protocols when the state machine
+ // indicates that it is no longer valid to send a message. E.g.
+ // a REP socket with no REQ pending.
nni_mutex_enter(sock->s_mx);
if ((rv = sock->s_senderr) != 0) {
nni_mutex_exit(sock->s_mx);
@@ -136,15 +166,13 @@ nni_socket_sendmsg(nni_socket_t sock, nni_msg_t msg, int tmout)
}
if (besteffort) {
- /*
- * BestEffort mode -- if we cannot handle the message due to
- * backpressure, we just throw it away, and don't complain.
- */
+ // BestEffort mode -- if we cannot handle the message due to
+ // backpressure, we just throw it away, and don't complain.
tmout = 0;
}
rv = nni_msgqueue_put(sock->s_uwq, msg, tmout);
if (besteffort && (rv == NNG_EAGAIN)) {
- /* Pretend this worked... it didn't, but pretend. */
+ // Pretend this worked... it didn't, but pretend.
nni_msg_free(msg);
return (0);
}
@@ -152,35 +180,40 @@ nni_socket_sendmsg(nni_socket_t sock, nni_msg_t msg, int tmout)
}
+// nni_socket_protocol returns the socket's 16-bit protocol number.
uint16_t
-nni_socket_protocol(nni_socket_t sock)
+nni_socket_proto(nni_socket_t sock)
{
return (sock->s_ops.proto_self);
}
-
+// nni_socket_remove_pipe removes the pipe from the socket. This is often
+// called by the protocol when a pipe is removed due to close.
void
-nni_socket_remove_pipe(nni_socket_t sock, nni_pipe_t pipe)
+nni_socket_rem_pipe(nni_socket_t sock, nni_pipe_t pipe)
{
nni_mutex_enter(sock->s_mx);
if (pipe->p_sock != sock) {
nni_mutex_exit(sock->s_mx);
}
- /*
- * Remove the pipe from the protocol. Protocols may
- * keep lists of pipes for managing their topologies.
- */
+ // Remove the pipe from the protocol. Protocols may
+ // keep lists of pipes for managing their topologies.
sock->s_ops.proto_remove_pipe(sock->s_data, pipe);
- /* Now remove it from our own list */
+ // Now remove it from our own list.
nni_list_remove(&sock->s_pipes, pipe);
pipe->p_sock = NULL;
- // XXX: Redial
+ // XXX: TODO: Redial
// XXX: also publish event...
- //if (pipe->p_ep != NULL) {
+ // if (pipe->p_ep != NULL) {
// nn_endpt_remove_pipe(pipe->p_ep, pipe)
- //}
+ // }
+
+ // If we're closing, wake the socket if we finished draining.
+ if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) {
+ nni_cond_broadcast(sock->s_cv);
+ }
nni_mutex_exit(sock->s_mx);
}
@@ -191,6 +224,10 @@ nni_socket_add_pipe(nni_socket_t sock, nni_pipe_t pipe)
int rv;
nni_mutex_enter(sock->s_mx);
+ if (sock->s_closing) {
+ nni_mutex_exit(sock->s_mx);
+ return (NNG_ECLOSED);
+ }
if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) {
nni_mutex_exit(sock->s_mx);
return (rv);
diff --git a/src/core/socket.h b/src/core/socket.h
index da531836..366e9958 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -10,42 +10,37 @@
#ifndef CORE_SOCKET_H
#define CORE_SOCKET_H
-/*
- * NB: This structure is supplied here for use by the CORE. Use of this library
- * OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR
- * TRANSPORTS.
- */
+// NB: This structure is supplied here for use by the CORE. Use of this library
+// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR
+// TRANSPORTS.
struct nng_socket {
nni_mutex_t s_mx;
+ nni_cond_t s_cv;
- nni_msgqueue_t s_uwq; /* Upper write queue. */
- nni_msgqueue_t s_urq; /* Upper read queue. */
+ nni_msgqueue_t s_uwq; // Upper write queue
+ nni_msgqueue_t s_urq; // Upper read queue
struct nni_protocol s_ops;
- void * s_data; /* Protocol private. */
+ void * s_data; // Protocol private
- /* options */
+ /* XXX: options */
nni_list_t s_eps;
nni_list_t s_pipes;
- int s_besteffort; /* Best effort mode delivery. */
- int s_senderr; /* Protocol state machine use. */
+ int s_closing; // Socket is closing
+ int s_besteffort; // Best effort mode delivery
+ int s_senderr; // Protocol state machine use
};
-/*
- * Internally used socket API. Again, this stuff is not part of our public
- * API.
- */
-
extern int nni_socket_create(nni_socket_t *, uint16_t);
extern int nni_socket_close(nni_socket_t);
extern int nni_socket_add_pipe(nni_socket_t, nni_pipe_t);
-extern void nni_socket_remove_pipe(nni_socket_t, nni_pipe_t);
-extern uint16_t nni_socket_protocol(nni_socket_t);
+extern void nni_socket_rem_pipe(nni_socket_t, nni_pipe_t);
+extern uint16_t nni_socket_proto(nni_socket_t);
extern int nni_socket_setopt(nni_socket_t, int, const void *, size_t);
extern int nni_socket_getopt(nni_socket_t, int, void *, size_t *);
-#endif /* CORE_SOCKET_H */
+#endif // CORE_SOCKET_H