diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/panic.c | 6 | ||||
| -rw-r--r-- | src/core/panic.h | 46 | ||||
| -rw-r--r-- | src/core/protocol.h | 4 | ||||
| -rw-r--r-- | src/core/socket.c | 165 | ||||
| -rw-r--r-- | src/core/socket.h | 33 |
5 files changed, 140 insertions, 114 deletions
diff --git a/src/core/panic.c b/src/core/panic.c index 65eb0747..d0d6078d 100644 --- a/src/core/panic.c +++ b/src/core/panic.c @@ -18,9 +18,7 @@ #include "core/nng_impl.h" -// // Panic handling. -// static void nni_show_backtrace(void) { @@ -44,13 +42,11 @@ nni_show_backtrace(void) } -// // nni_panic shows a panic message, a possible stack bracktrace, then aborts // the process/program. This should only be called when a condition arises // that should not be possible, e.g. a programming assertion failure. It should // not be called in situations such as ENOMEM, as nni_panic is fairly rude // to any application it may be called from within. -// void nni_panic(const char *fmt, ...) { @@ -75,6 +71,6 @@ nni_panic(const char *fmt, ...) void nni_println(const char *msg) { - /* TODO: support redirection of this later. */ + // TODO: support redirection of this later. nni_plat_println(msg); } diff --git a/src/core/panic.h b/src/core/panic.h index 88ddb7b0..323d64a1 100644 --- a/src/core/panic.h +++ b/src/core/panic.h @@ -1,33 +1,29 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #ifndef CORE_PANIC_H #define CORE_PANIC_H -/* - * nni_panic is used to terminate the process with prejudice, and - * should only be called in the face of a critical programming error, - * or other situation where it would be unsafe to attempt to continue. - * As this crashes the program, it should never be used when factors outside - * the program can cause it, such as receiving protocol errors, or running - * out of memory. Its better in those cases to return an error to the - * program and let the caller handle the error situation. - */ +// nni_panic is used to terminate the process with prejudice, and +// should only be called in the face of a critical programming error, +// or other situation where it would be unsafe to attempt to continue. +// As this crashes the program, it should never be used when factors outside +// the program can cause it, such as receiving protocol errors, or running +// out of memory. Its better in those cases to return an error to the +// program and let the caller handle the error situation. extern void nni_panic(const char *, ...); -/* - * nni_println is used to print output to a debug console. This should only - * be used in the most dire of circumstances -- such as during an assertion - * failure that is going to cause the program to crash. After the string is - * emitted, a new line character is emitted, so the string should not - * include one. - */ +// nni_println is used to print output to a debug console. This should only +// be used in the most dire of circumstances -- such as during an assertion +// failure that is going to cause the program to crash. After the string is +// emitted, a new line character is emitted, so the string should not +// include one. extern void nni_println(const char *); -#endif /* CORE_PANIC_H */ +#endif // CORE_PANIC_H diff --git a/src/core/protocol.h b/src/core/protocol.h index 2b4625cb..b9760725 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -49,8 +49,10 @@ struct nni_protocol { * Shutdown the protocol instance, including giving time to * drain any outbound frames (linger). The protocol is not * required to honor the linger. + * XXX: This is probably redundant -- protocol should notice + * drain by getting NNG_ECLOSED on the upper write queue. */ - void (*proto_shutdown)(void *, uint64_t); + void (*proto_shutdown)(void *); /* * Add and remove pipes. These are called as connections are diff --git a/src/core/socket.c b/src/core/socket.c index c7de05fa..21b1b300 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -1,22 +1,18 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #include "core/nng_impl.h" -/* - * Socket implementation. - */ +// Socket implementation. -/* - * nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain - * the upper read and write queues. - */ +// nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain +// the upper read and write queues. nni_msgqueue_t nni_socket_sendq(nni_socket_t s) { @@ -31,6 +27,7 @@ nni_socket_recvq(nni_socket_t s) } +// nn_socket_create creates the underlying socket. int nni_socket_create(nni_socket_t *sockp, uint16_t proto) { @@ -46,10 +43,22 @@ nni_socket_create(nni_socket_t *sockp, uint16_t proto) } sock->s_ops = *ops; + if ((rv = nni_mutex_create(&sock->s_mx)) != 0) { + nni_free(sock, sizeof (*sock)); + return (rv); + } + if ((rv = nni_cond_create(&sock->s_cv, sock->s_mx)) != 0) { + nni_mutex_destroy(sock->s_mx); + nni_free(sock, sizeof (*sock)); + return (rv); + } + NNI_LIST_INIT(&sock->s_pipes, struct nng_pipe, p_sock_node); - //NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node); + // TODO: NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node); if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) { + nni_cond_destroy(sock->s_cv); + nni_mutex_destroy(sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } @@ -58,51 +67,74 @@ nni_socket_create(nni_socket_t *sockp, uint16_t proto) } +// nni_socket_close closes the underlying socket. int nni_socket_close(nni_socket_t sock) { nni_pipe_t pipe; nni_endpt_t ep; - nni_msgqueue_close(sock->s_urq); - /* XXX: drain this? */ - nni_msgqueue_close(sock->s_uwq); nni_mutex_enter(sock->s_mx); + // Mark us closing, so no more EPs or changes can occur. + sock->s_closing = 1; + + // Stop all EPS. We're going to do this first, since we know + // we're closing. NNI_LIST_FOREACH (&sock->s_eps, ep) { #if 0 - nni_ep_stop(ep); - // OR.... - nni_mutex_enter(ep->ep_mx); - ep->ep_stop = 1; - nni_cond_broadcast(ep->ep_cond); - nni_mutex_exit(ep->ep_mx); + // XXX: Question: block for them now, or wait further down? + // Probably we can simply just watch for the first list... + // stopping EPs should be *dead* easy, and never block. + nni_ep_stop(ep); or nni_ep_shutdown(ep); #endif break; /* REMOVE ME */ } nni_mutex_exit(sock->s_mx); - /* XXX: close endpoints - no new pipes made... */ - - /* XXX: protocol shutdown */ - /* - * Paths to pipe close: - * - * - user calls nng_pipe_close() - * - protocol calls pipe_close() after underlying close - * - socket calls pipe close due to socket_close (here) - */ + // XXX: TODO. This is a place where we should drain the write side + // msgqueue, effectively getting a linger on the socket. The + // protocols will drain this queue, and should continue to run + // handling both transmit and receive until that's done. + // Note that *all* protocols need to monitor for this linger, even + // those that do not transmit. This way they can notice and go ahead + // and quickly shutdown their pipes; this keeps us from having to wait + // for *our* pipelist to become empty. This helps us ensure that + // they effectively get the chance to linger on their side, without + // requring us to do anything magical for them. + + // nni_msgqueue_drain(sock->s_uwq, sock->s_linger); + + // Now we should attempt to wait for the list of pipes to drop to + // zero -- indicating that the protocol has shut things down + // cleanly, voluntarily. (I.e. it finished its drain.) + nni_mutex_enter(sock->s_mx); + while (nni_list_first(&sock->s_pipes) != NULL) { + // rv = nn_cond_timedwait(sock->s_cv, sock->s_linger); + int rv = NNG_ETIMEDOUT; + if (rv == NNG_ETIMEDOUT) { + break; + } + } - /* XXX: close remaining pipes */ - while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { - nni_list_remove(&sock->s_pipes, pipe); - /* XXX: nni_pipe_destroy */ + // Time's up! Shut it down the hard way. + nni_msgqueue_close(sock->s_urq); + nni_msgqueue_close(sock->s_uwq); + // This gives the protocol notice, in case it didn't get the hint. + // XXX: In retrospect, this entry point seems redundant. + sock->s_ops.proto_shutdown(sock->s_data); + + // The protocol *MUST* give us back our pipes at this point, and + // quickly too! If this blocks for any non-trivial amount of time + // here, it indicates a protocol implementation bug. + while (nni_list_first(&sock->s_pipes) != NULL) { + nni_cond_wait(sock->s_cv); } - /* XXX: wait for workers to cease activity */ - while ((ep = nni_list_first(&sock->s_eps)) != NULL) { - nni_list_remove(&sock->s_eps, ep); - /* XXX: nni_ep_destroy(ep); */ + // Wait to make sure endpoint listeners have shutdown and exited + // as well. They should have done so *long* ago. + while (nni_list_first(&sock->s_eps) != NULL) { + nni_cond_wait(sock->s_cv); } return (0); @@ -115,11 +147,9 @@ nni_socket_sendmsg(nni_socket_t sock, nni_msg_t msg, int tmout) int rv; int besteffort; - /* - * Senderr is typically set by protocols when the state machine - * indicates that it is no longer valid to send a message. E.g. - * a REP socket with no REQ pending. - */ + // Senderr is typically set by protocols when the state machine + // indicates that it is no longer valid to send a message. E.g. + // a REP socket with no REQ pending. nni_mutex_enter(sock->s_mx); if ((rv = sock->s_senderr) != 0) { nni_mutex_exit(sock->s_mx); @@ -136,15 +166,13 @@ nni_socket_sendmsg(nni_socket_t sock, nni_msg_t msg, int tmout) } if (besteffort) { - /* - * BestEffort mode -- if we cannot handle the message due to - * backpressure, we just throw it away, and don't complain. - */ + // BestEffort mode -- if we cannot handle the message due to + // backpressure, we just throw it away, and don't complain. tmout = 0; } rv = nni_msgqueue_put(sock->s_uwq, msg, tmout); if (besteffort && (rv == NNG_EAGAIN)) { - /* Pretend this worked... it didn't, but pretend. */ + // Pretend this worked... it didn't, but pretend. nni_msg_free(msg); return (0); } @@ -152,35 +180,40 @@ nni_socket_sendmsg(nni_socket_t sock, nni_msg_t msg, int tmout) } +// nni_socket_protocol returns the socket's 16-bit protocol number. uint16_t -nni_socket_protocol(nni_socket_t sock) +nni_socket_proto(nni_socket_t sock) { return (sock->s_ops.proto_self); } - +// nni_socket_remove_pipe removes the pipe from the socket. This is often +// called by the protocol when a pipe is removed due to close. void -nni_socket_remove_pipe(nni_socket_t sock, nni_pipe_t pipe) +nni_socket_rem_pipe(nni_socket_t sock, nni_pipe_t pipe) { nni_mutex_enter(sock->s_mx); if (pipe->p_sock != sock) { nni_mutex_exit(sock->s_mx); } - /* - * Remove the pipe from the protocol. Protocols may - * keep lists of pipes for managing their topologies. - */ + // Remove the pipe from the protocol. Protocols may + // keep lists of pipes for managing their topologies. sock->s_ops.proto_remove_pipe(sock->s_data, pipe); - /* Now remove it from our own list */ + // Now remove it from our own list. nni_list_remove(&sock->s_pipes, pipe); pipe->p_sock = NULL; - // XXX: Redial + // XXX: TODO: Redial // XXX: also publish event... - //if (pipe->p_ep != NULL) { + // if (pipe->p_ep != NULL) { // nn_endpt_remove_pipe(pipe->p_ep, pipe) - //} + // } + + // If we're closing, wake the socket if we finished draining. + if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) { + nni_cond_broadcast(sock->s_cv); + } nni_mutex_exit(sock->s_mx); } @@ -191,6 +224,10 @@ nni_socket_add_pipe(nni_socket_t sock, nni_pipe_t pipe) int rv; nni_mutex_enter(sock->s_mx); + if (sock->s_closing) { + nni_mutex_exit(sock->s_mx); + return (NNG_ECLOSED); + } if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) { nni_mutex_exit(sock->s_mx); return (rv); diff --git a/src/core/socket.h b/src/core/socket.h index da531836..366e9958 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -10,42 +10,37 @@ #ifndef CORE_SOCKET_H #define CORE_SOCKET_H -/* - * NB: This structure is supplied here for use by the CORE. Use of this library - * OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR - * TRANSPORTS. - */ +// NB: This structure is supplied here for use by the CORE. Use of this library +// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR +// TRANSPORTS. struct nng_socket { nni_mutex_t s_mx; + nni_cond_t s_cv; - nni_msgqueue_t s_uwq; /* Upper write queue. */ - nni_msgqueue_t s_urq; /* Upper read queue. */ + nni_msgqueue_t s_uwq; // Upper write queue + nni_msgqueue_t s_urq; // Upper read queue struct nni_protocol s_ops; - void * s_data; /* Protocol private. */ + void * s_data; // Protocol private - /* options */ + /* XXX: options */ nni_list_t s_eps; nni_list_t s_pipes; - int s_besteffort; /* Best effort mode delivery. */ - int s_senderr; /* Protocol state machine use. */ + int s_closing; // Socket is closing + int s_besteffort; // Best effort mode delivery + int s_senderr; // Protocol state machine use }; -/* - * Internally used socket API. Again, this stuff is not part of our public - * API. - */ - extern int nni_socket_create(nni_socket_t *, uint16_t); extern int nni_socket_close(nni_socket_t); extern int nni_socket_add_pipe(nni_socket_t, nni_pipe_t); -extern void nni_socket_remove_pipe(nni_socket_t, nni_pipe_t); -extern uint16_t nni_socket_protocol(nni_socket_t); +extern void nni_socket_rem_pipe(nni_socket_t, nni_pipe_t); +extern uint16_t nni_socket_proto(nni_socket_t); extern int nni_socket_setopt(nni_socket_t, int, const void *, size_t); extern int nni_socket_getopt(nni_socket_t, int, void *, size_t *); -#endif /* CORE_SOCKET_H */ +#endif // CORE_SOCKET_H |
