diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/endpt.c | 198 | ||||
| -rw-r--r-- | src/core/endpt.h | 5 | ||||
| -rw-r--r-- | src/core/list.c | 17 | ||||
| -rw-r--r-- | src/core/list.h | 5 | ||||
| -rw-r--r-- | src/core/panic.c | 2 | ||||
| -rw-r--r-- | src/core/pipe.c | 42 | ||||
| -rw-r--r-- | src/core/pipe.h | 9 | ||||
| -rw-r--r-- | src/core/protocol.h | 4 | ||||
| -rw-r--r-- | src/core/socket.c | 239 | ||||
| -rw-r--r-- | src/core/socket.h | 6 | ||||
| -rw-r--r-- | src/nng.c | 8 | ||||
| -rw-r--r-- | src/platform/posix/posix_alloc.c | 2 | ||||
| -rw-r--r-- | src/platform/posix/posix_synch.c | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.c | 2 | ||||
| -rw-r--r-- | src/protocol/pair/pair.c | 52 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 14 |
16 files changed, 384 insertions, 227 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index 1c5f4db5..24d92206 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -36,7 +36,9 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) ep->ep_listener = NULL; ep->ep_close = 0; ep->ep_start = 0; + ep->ep_bound = 0; ep->ep_pipe = NULL; + NNI_LIST_NODE_INIT(&ep->ep_node); if ((rv = nni_mutex_init(&ep->ep_mx)) != 0) { nni_free(ep, sizeof (*ep)); return (NNG_ENOMEM); @@ -105,13 +107,21 @@ nni_endpt_close(nni_endpt *ep) } -int -nni_endpt_bind(nni_endpt *ep) +static int +nni_endpt_connect(nni_endpt *ep, nni_pipe **pp) { - if (ep->ep_close) { - return (NNG_ECLOSED); + nni_pipe *pipe; + int rv; + + if ((rv = nni_pipe_create(&pipe, ep)) != 0) { + return (rv); } - return (ep->ep_ops.ep_bind(ep->ep_data)); + if ((rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_trandata)) != 0) { + nni_pipe_destroy(pipe); + return (rv); + } + *pp = pipe; + return (0); } @@ -124,34 +134,12 @@ nni_dial_once(nni_endpt *ep) nni_pipe *pipe; int rv; - pipe = NULL; - - if (ep->ep_close) { - return (NNG_ECLOSED); - } - if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) { - return (rv); - } - if ((rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_data)) != 0) { - nni_pipe_destroy(pipe); - return (rv); + if (((rv = nni_endpt_connect(ep, &pipe)) == 0) && + ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) { + return (0); } - if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) { - nni_pipe_destroy(pipe); - return (rv); - } - - nni_mutex_enter(&ep->ep_mx); - if (!ep->ep_close) { - // Set up the linkage so that when the pipe closes - // we can notify the dialer to redial. - pipe->p_ep = ep; - ep->ep_pipe = pipe; - } - nni_mutex_exit(&ep->ep_mx); - - return (0); + return (rv); } @@ -160,7 +148,6 @@ static void nni_dialer(void *arg) { nni_endpt *ep = arg; - nni_socket *sock = ep->ep_sock; nni_pipe *pipe; int rv; nni_time cooldown; @@ -180,6 +167,10 @@ nni_dialer(void *arg) while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { nni_cond_wait(&ep->ep_cv); } + if (ep->ep_close) { + nni_mutex_exit(&ep->ep_mx); + break; + } nni_mutex_exit(&ep->ep_mx); rv = nni_dial_once(ep); @@ -190,9 +181,12 @@ nni_dialer(void *arg) case NNG_ENOMEM: cooldown = 1000000; break; + case NNG_ECLOSED: + break; default: // XXX: THIS NEEDS TO BE A PROPER BACKOFF. - cooldown = 100000; + printf("COOLING DOWN!!\n"); + cooldown = 1000000; break; } // we inject a delay so we don't just spin hard on @@ -200,9 +194,12 @@ nni_dialer(void *arg) // wait even longer, since the system needs time to // release resources. cooldown += nni_clock(); + nni_mutex_enter(&ep->ep_mx); while (!ep->ep_close) { + // We need a different condvar... nni_cond_waituntil(&ep->ep_cv, cooldown); } + nni_mutex_exit(&ep->ep_mx); } } @@ -212,15 +209,13 @@ nni_endpt_dial(nni_endpt *ep, int flags) { int rv = 0; nni_thread *reap = NULL; - nni_socket *sock = ep->ep_sock; - nni_mutex_enter(&sock->s_mx); nni_mutex_enter(&ep->ep_mx); if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { rv = NNG_EBUSY; goto out; } - if (sock->s_closing || ep->ep_close) { + if (ep->ep_close) { rv = NNG_ECLOSED; goto out; } @@ -233,10 +228,9 @@ nni_endpt_dial(nni_endpt *ep, int flags) } if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) { nni_mutex_exit(&ep->ep_mx); - nni_mutex_exit(&sock->s_mx); rv = nni_dial_once(ep); - nni_mutex_enter(&sock->s_mx); nni_mutex_enter(&ep->ep_mx); + if (rv == 0) { ep->ep_start = 1; } else { @@ -250,7 +244,6 @@ nni_endpt_dial(nni_endpt *ep, int flags) } out: nni_mutex_exit(&ep->ep_mx); - nni_mutex_exit(&sock->s_mx); if (reap != NULL) { nni_thread_reap(reap); @@ -269,14 +262,135 @@ nni_endpt_accept(nni_endpt *ep, nni_pipe **pp) if (ep->ep_close) { return (NNG_ECLOSED); } - if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) { + if ((rv = nni_pipe_create(&pipe, ep)) != 0) { return (rv); } - if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_data)) != 0) { + if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_trandata)) != 0) { nni_pipe_destroy(pipe); return (rv); } - pipe->p_ep = ep; *pp = pipe; return (0); } + + +static void +nni_listener(void *arg) +{ + nni_endpt *ep = arg; + nni_socket *sock = ep->ep_sock; + nni_pipe *pipe; + int rv; + + nni_mutex_enter(&ep->ep_mx); + while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { + nni_cond_wait(&ep->ep_cv); + } + if (ep->ep_stop || ep->ep_close) { + nni_mutex_exit(&ep->ep_mx); + return; + } + nni_mutex_exit(&ep->ep_mx); + for (;;) { + nni_time cooldown; + nni_mutex_enter(&ep->ep_mx); + + // If we didn't bind synchronously, do it now. + while (!ep->ep_bound && !ep->ep_close) { + int rv; + + nni_mutex_exit(&ep->ep_mx); + rv = ep->ep_ops.ep_bind(ep->ep_data); + nni_mutex_enter(&ep->ep_mx); + + if (rv == 0) { + ep->ep_bound = 1; + } else { + // Invalid address? Out of memory? Who knows. + // Try again in a bit (10ms). + // XXX: PROPER BACKOFF NEEDED + cooldown = 10000; + cooldown += nni_clock(); + while (!ep->ep_close) { + nni_cond_waituntil(&ep->ep_cv, + cooldown); + } + } + } + if (ep->ep_close) { + nni_mutex_exit(&ep->ep_mx); + break; + } + nni_mutex_exit(&ep->ep_mx); + + pipe = NULL; + + if (((rv = nni_endpt_accept(ep, &pipe)) == 0) && + ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) { + continue; + } + if (rv == NNG_ECLOSED) { + break; + } + cooldown = 1000; // 1 ms cooldown + if (rv == NNG_ENOMEM) { + // For out of memory, we need to give more + // time for the system to reclaim resources. + cooldown = 100000; // 100ms + } + nni_mutex_enter(&ep->ep_mx); + if (!ep->ep_close) { + nni_cond_waituntil(&ep->ep_cv, cooldown); + } + nni_mutex_exit(&ep->ep_mx); + } +} + + +int +nni_endpt_listen(nni_endpt *ep, int flags) +{ + int rv = 0; + nni_thread *reap = NULL; + + nni_mutex_enter(&ep->ep_mx); + if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { + rv = NNG_EBUSY; + goto out; + } + if (ep->ep_close) { + rv = NNG_ECLOSED; + goto out; + } + + ep->ep_stop = 0; + ep->ep_start = (flags & NNG_FLAG_SYNCH) ? 0 : 1; + if (nni_thread_create(&ep->ep_listener, nni_listener, ep) != 0) { + rv = NNG_ENOMEM; + goto out; + } + if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) { + nni_mutex_exit(&ep->ep_mx); + rv = ep->ep_ops.ep_bind(ep->ep_data); + nni_mutex_enter(&ep->ep_mx); + if (rv == 0) { + ep->ep_bound = 1; + ep->ep_start = 1; + } else { + // This will cause the thread to exit instead of + // starting. + ep->ep_stop = 1; + reap = ep->ep_listener; + ep->ep_listener = NULL; + } + nni_cond_signal(&ep->ep_cv); + } +out: + nni_mutex_exit(&ep->ep_mx); + + if (reap != NULL) { + nni_thread_reap(reap); + } + + return (rv); +} diff --git a/src/core/endpt.h b/src/core/endpt.h index 71d3ff59..36f55de2 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -18,7 +18,7 @@ struct nng_endpt { nni_endpt_ops ep_ops; void * ep_data; // Transport private - nni_list_node ep_sock_node; // Per socket list + nni_list_node ep_node; // Per socket list nni_socket * ep_sock; char ep_addr[NNG_MAXADDRLEN]; nni_thread * ep_dialer; @@ -26,6 +26,7 @@ struct nng_endpt { int ep_stop; // thread exits before start int ep_start; // start thread running int ep_close; // full shutdown + int ep_bound; // true if we bound locally nni_mutex ep_mx; nni_cond ep_cv; nni_pipe * ep_pipe; // Connected pipe (dialers only) @@ -33,9 +34,9 @@ struct nng_endpt { extern int nni_endpt_create(nni_endpt **, nni_socket *, const char *); extern void nni_endpt_destroy(nni_endpt *); -extern int nni_endpt_listen(nni_endpt *); extern int nni_endpt_accept(nni_endpt *, nni_pipe **); extern void nni_endpt_close(nni_endpt *); extern int nni_endpt_dial(nni_endpt *, int); +extern int nni_endpt_listen(nni_endpt *, int); #endif // CORE_ENDPT_H diff --git a/src/core/list.c b/src/core/list.c index 69a74db6..8d6c3ace 100644 --- a/src/core/list.c +++ b/src/core/list.c @@ -58,6 +58,9 @@ nni_list_append(nni_list *list, void *item) { nni_list_node *node = NODE(list, item); + if ((node->ln_next != NULL) || (node->ln_prev != NULL)) { + nni_panic("appending node already on a list or not inited"); + } node->ln_prev = list->ll_head.ln_prev; node->ln_next = &list->ll_head; node->ln_next->ln_prev = node; @@ -70,6 +73,9 @@ nni_list_prepend(nni_list *list, void *item) { nni_list_node *node = NODE(list, item); + if ((node->ln_next != NULL) || (node->ln_prev != NULL)) { + nni_panic("prepending node already on a list or not inited"); + } node->ln_next = list->ll_head.ln_next; node->ln_prev = &list->ll_head; node->ln_next->ln_prev = node; @@ -108,13 +114,6 @@ nni_list_remove(nni_list *list, void *item) node->ln_prev->ln_next = node->ln_next; node->ln_next->ln_prev = node->ln_prev; -} - - -void -nni_list_node_init(nni_list *list, void *item) -{ - nni_list_node *node = NODE(list, item); - - node->ln_prev = node->ln_next = NULL; + node->ln_next = NULL; + node->ln_prev = NULL; } diff --git a/src/core/list.h b/src/core/list.h index 822d9621..9d95a527 100644 --- a/src/core/list.h +++ b/src/core/list.h @@ -28,6 +28,10 @@ extern void nni_list_init_offset(nni_list *list, size_t offset); #define NNI_LIST_INIT(list, type, field) \ nni_list_init_offset(list, offsetof(type, field)) + +#define NNI_LIST_NODE_INIT(node) \ + { (node)->ln_prev = (node)->ln_next = 0; } + extern void *nni_list_first(nni_list *); extern void *nni_list_last(nni_list *); extern void nni_list_append(nni_list *, void *); @@ -35,7 +39,6 @@ extern void nni_list_prepend(nni_list *, void *); extern void *nni_list_next(nni_list *, void *); extern void *nni_list_prev(nni_list *, void *); extern void nni_list_remove(nni_list *, void *); -extern void nni_list_node_init(nni_list *, void *); #define NNI_LIST_FOREACH(l, it) \ for (it = nni_list_first(l); it != NULL; it = nni_list_next(l, it)) diff --git a/src/core/panic.c b/src/core/panic.c index d0d6078d..2205779f 100644 --- a/src/core/panic.c +++ b/src/core/panic.c @@ -19,7 +19,7 @@ #include "core/nng_impl.h" // Panic handling. -static void +void nni_show_backtrace(void) { #if NNG_HAVE_BACKTRACE diff --git a/src/core/pipe.c b/src/core/pipe.c index cc92e6fe..0a7bbed1 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -25,14 +25,14 @@ nni_pipe_id(nni_pipe *p) int nni_pipe_send(nni_pipe *p, nng_msg *msg) { - return (p->p_ops.p_send(p->p_data, msg)); + return (p->p_ops.p_send(p->p_trandata, msg)); } int nni_pipe_recv(nni_pipe *p, nng_msg **msgp) { - return (p->p_ops.p_recv(p->p_data, msgp)); + return (p->p_ops.p_recv(p->p_trandata, msgp)); } @@ -42,38 +42,60 @@ nni_pipe_recv(nni_pipe *p, nng_msg **msgp) void nni_pipe_close(nni_pipe *p) { - p->p_ops.p_close(p->p_data); + nni_socket *sock = p->p_sock; + + p->p_ops.p_close(p->p_trandata); + + nni_mutex_enter(&sock->s_mx); + if (!p->p_reap) { + // schedule deferred reap/close + p->p_reap = 1; + if (p->p_active) { + nni_list_remove(&sock->s_pipes, p); + p->p_active = 0; + } + nni_list_append(&sock->s_reaps, p); + nni_cond_broadcast(&sock->s_cv); + } + nni_mutex_exit(&sock->s_mx); } uint16_t nni_pipe_peer(nni_pipe *p) { - return (p->p_ops.p_peer(p->p_data)); + return (p->p_ops.p_peer(p->p_trandata)); } void nni_pipe_destroy(nni_pipe *p) { - if (p->p_data != NULL) { - p->p_ops.p_destroy(p->p_data); + if (p->p_trandata != NULL) { + p->p_ops.p_destroy(p->p_trandata); } nni_free(p, sizeof (*p)); } int -nni_pipe_create(nni_pipe **pp, const nni_pipe_ops *ops) +nni_pipe_create(nni_pipe **pp, nni_endpt *ep) { nni_pipe *p; if ((p = nni_alloc(sizeof (*p))) == NULL) { return (NNG_ENOMEM); } - p->p_data = NULL; - p->p_ops = *ops; + p->p_trandata = NULL; + p->p_protdata = NULL; + p->p_ops = *ep->ep_ops.ep_pipe_ops; p->p_id = nni_plat_nextid(); + p->p_ep = ep; + p->p_sock = ep->ep_sock; + if (ep->ep_dialer != NULL) { + ep->ep_pipe = p; + } + NNI_LIST_NODE_INIT(&p->p_node); *pp = p; return (0); } @@ -86,5 +108,5 @@ nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp) if (p->p_ops.p_getopt == NULL) { return (NNG_ENOTSUP); } - return (p->p_ops.p_getopt(p->p_data, opt, val, szp)); + return (p->p_ops.p_getopt(p->p_trandata, opt, val, szp)); } diff --git a/src/core/pipe.h b/src/core/pipe.h index 5349bbb1..4594f65b 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -19,10 +19,13 @@ struct nng_pipe { uint32_t p_id; struct nni_pipe_ops p_ops; - void * p_data; - nni_list_node p_sock_node; + void * p_trandata; + void * p_protdata; + nni_list_node p_node; nni_socket * p_sock; nni_endpt * p_ep; + int p_reap; + int p_active; }; // Pipe operations that protocols use. @@ -33,7 +36,7 @@ extern void nni_pipe_close(nni_pipe *); // Used only by the socket core - as we don't wish to expose the details // of the pipe structure outside of pipe.c. -extern int nni_pipe_create(nni_pipe **, const nni_pipe_ops *); +extern int nni_pipe_create(nni_pipe **, nni_endpt *); extern void nni_pipe_destroy(nni_pipe *); diff --git a/src/core/protocol.h b/src/core/protocol.h index f73825d7..63683a6b 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -32,8 +32,8 @@ struct nni_protocol { // Add and remove pipes. These are called as connections are // created or destroyed. - int (*proto_add_pipe)(void *, nni_pipe *); - int (*proto_rem_pipe)(void *, nni_pipe *); + int (*proto_add_pipe)(void *, nni_pipe *, void **); + void (*proto_rem_pipe)(void *, void *); // Option manipulation. These may be NULL. int (*proto_setopt)(void *, int, const void *, size_t); diff --git a/src/core/socket.c b/src/core/socket.c index 9263b142..ce5dbf3c 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -29,6 +29,62 @@ nni_socket_recvq(nni_socket *s) } +// Because we have to call back into the socket, and possibly also the proto, +// and wait for threads to terminate, we do this in a special thread. The +// assumption is that closing is always a "fast" operation. +static void +nni_reaper(void *arg) +{ + nni_socket *sock = arg; + + for (;;) { + nni_pipe *pipe; + nni_endpt *ep; + + nni_mutex_enter(&sock->s_mx); + if ((pipe = nni_list_first(&sock->s_reaps)) != NULL) { + nni_list_remove(&sock->s_reaps, pipe); + nni_mutex_exit(&sock->s_mx); + + // This should already have been done. + pipe->p_ops.p_close(pipe->p_trandata); + + // Remove the pipe from the protocol. Protocols may + // keep lists of pipes for managing their topologies. + // Note that if a protocol has rejected the pipe, it + // won't have any data. + if (pipe->p_protdata != NULL) { + sock->s_ops.proto_rem_pipe(sock->s_data, + pipe->p_protdata); + } + + // If pipe was a connected (dialer) pipe, + // then let the endpoint know so it can try to + // reestablish the connection. + if ((ep = pipe->p_ep) != NULL) { + ep->ep_pipe = NULL; + pipe->p_ep = NULL; + nni_mutex_enter(&ep->ep_mx); + nni_cond_signal(&ep->ep_cv); + nni_mutex_exit(&ep->ep_mx); + } + + // XXX: also publish event... + nni_pipe_destroy(pipe); + continue; + } + + if (sock->s_reaper == NULL) { + nni_mutex_exit(&sock->s_mx); + break; + } + + nni_cond_wait(&sock->s_cv); + nni_mutex_exit(&sock->s_mx); + } +} + + // nn_socket_create creates the underlying socket. int nni_socket_create(nni_socket **sockp, uint16_t proto) @@ -47,6 +103,7 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) sock->s_linger = 0; sock->s_sndtimeo = -1; sock->s_rcvtimeo = -1; + sock->s_closing = 0; sock->s_reconn = NNI_SECOND; sock->s_reconnmax = NNI_SECOND; @@ -60,8 +117,13 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) return (rv); } - NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_sock_node); - NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_sock_node); + NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node); + NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node); + NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_node); + + if ((rv = nni_thread_create(&sock->s_reaper, nni_reaper, sock)) != 0) { + goto fail; + } if (((rv = nni_msgqueue_create(&sock->s_uwq, 0)) != 0) || ((rv = nni_msgqueue_create(&sock->s_urq, 0)) != 0)) { @@ -81,6 +143,14 @@ fail: if (sock->s_uwq != NULL) { nni_msgqueue_destroy(sock->s_uwq); } + if (sock->s_reaper != NULL) { + nni_thread *reap = sock->s_reaper; + nni_mutex_enter(&sock->s_mx); + sock->s_reaper = NULL; + nni_cond_broadcast(&sock->s_cv); + nni_mutex_exit(&sock->s_mx); + nni_thread_reap(reap); + } nni_cond_fini(&sock->s_cv); nni_mutex_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); @@ -95,6 +165,7 @@ nni_socket_close(nni_socket *sock) nni_pipe *pipe; nni_endpt *ep; nni_time linger; + nni_thread *reaper; nni_mutex_enter(&sock->s_mx); // Mark us closing, so no more EPs or changes can occur. @@ -143,18 +214,18 @@ nni_socket_close(nni_socket *sock) // safely while we hold the lock. nni_msgqueue_close(sock->s_urq); - // Go through and close all the pipes. - NNI_LIST_FOREACH (&sock->s_pipes, pipe) { - nni_pipe_close(pipe); + // Go through and schedule close on all pipes. + while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { + nni_list_remove(&sock->s_pipes, pipe); + pipe->p_active = 0; + pipe->p_reap = 1; + nni_list_append(&sock->s_reaps, pipe); } - // At this point, the protocols should have all their operations - // failing, if they have any remaining, and they should be returning - // any pipes back to us very quickly. We'll wait for them to finish, - // as it MUST occur shortly. - while (nni_list_first(&sock->s_pipes) != NULL) { - nni_cond_wait(&sock->s_cv); - } + // Tell the reaper it's done once it finishes. Also kick it off. + reaper = sock->s_reaper; + sock->s_reaper = NULL; + nni_cond_broadcast(&sock->s_cv); // We already told the endpoints to shutdown. We just // need to reap them now. @@ -167,6 +238,9 @@ nni_socket_close(nni_socket *sock) } nni_mutex_exit(&sock->s_mx); + // Wait for the reaper to exit. + nni_thread_reap(reaper); + // At this point nothing else should be referencing us. // The protocol needs to clean up its state. sock->s_ops.proto_destroy(sock->s_data); @@ -264,49 +338,8 @@ nni_socket_proto(nni_socket *sock) } -// nni_socket_rem_pipe removes the pipe from the socket. This is often -// called by the protocol when a pipe is removed due to close. -void -nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe) -{ - nni_endpt *ep; - - nni_mutex_enter(&sock->s_mx); - if (pipe->p_sock != sock) { - nni_mutex_exit(&sock->s_mx); - } - - // Remove the pipe from the protocol. Protocols may - // keep lists of pipes for managing their topologies. - sock->s_ops.proto_rem_pipe(sock->s_data, pipe); - - // Now remove it from our own list. - nni_list_remove(&sock->s_pipes, pipe); - pipe->p_sock = NULL; - - // If we were a connected (dialer) pipe, then let the endpoint - // know so it can try to reestablish the connection. - if ((ep = pipe->p_ep) != NULL) { - ep->ep_pipe = NULL; - pipe->p_ep = NULL; - nni_mutex_enter(&ep->ep_mx); - nni_cond_signal(&ep->ep_cv); - nni_mutex_exit(&ep->ep_mx); - } - - // XXX: also publish event... - nni_pipe_destroy(pipe); - - // If we're closing, wake the socket if we finished draining. - if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) { - nni_cond_broadcast(&sock->s_cv); - } - nni_mutex_exit(&sock->s_mx); -} - - int -nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer) +nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe) { int rv; @@ -315,13 +348,17 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer) nni_mutex_exit(&sock->s_mx); return (NNG_ECLOSED); } - if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) { + rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, &pipe->p_protdata); + if (rv != 0) { + pipe->p_reap = 1; + nni_list_append(&sock->s_reaps, pipe); + nni_cond_broadcast(&sock->s_cv); nni_mutex_exit(&sock->s_mx); return (rv); } nni_list_append(&sock->s_pipes, pipe); + pipe->p_active = 1; - pipe->p_sock = sock; // XXX: Publish event nni_mutex_exit(&sock->s_mx); return (0); @@ -341,82 +378,40 @@ nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags) if (rv != 0) { nni_endpt_close(ep); nni_endpt_destroy(ep); - } else if (epp != NULL) { - *epp = ep; - } - return (rv); -} - - -static void -nni_socket_accepter(void *arg) -{ - nni_endpt *ep = arg; - nni_socket *sock = ep->ep_sock; - nni_pipe *pipe; - int rv; - - for (;;) { - nni_mutex_enter(&ep->ep_mx); - if (ep->ep_close) { - nni_mutex_exit(&ep->ep_mx); - break; - } - nni_mutex_exit(&ep->ep_mx); - - pipe = NULL; - - if (((rv = nni_endpt_accept(ep, &pipe)) != 0) || - ((rv = nni_socket_add_pipe(sock, pipe, 0)) != 0)) { - if (rv == NNG_ECLOSED) { - break; - } - if (pipe != NULL) { - nni_pipe_destroy(pipe); - } - // XXX: Publish accept error event... - - // If we can't allocate memory, don't spin, so that - // things get a chance to release memory later. - // Other errors, like ECONNRESET, should not recur. - // (If we find otherwise we can inject a short sleep - // here of about 1 ms without too much penalty.) - if (rv == NNG_ENOMEM) { - nni_usleep(100000); - } + } else { + if (epp != NULL) { + *epp = ep; } + nni_mutex_enter(&sock->s_mx); + nni_list_append(&sock->s_eps, ep); + nni_mutex_exit(&sock->s_mx); } + return (rv); } int -nni_socket_accept(nni_socket *sock, nni_endpt *ep) +nni_socket_listen(nni_socket *sock, const char *addr, nni_endpt **epp, + int flags) { - int rv = 0; + nni_endpt *ep; + int rv; - nni_mutex_enter(&sock->s_mx); - nni_mutex_enter(&ep->ep_mx); - if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { - rv = NNG_EBUSY; - goto out; - } - if (ep->ep_sock != sock) { // Should never happen - rv = NNG_EINVAL; - goto out; - } - if (sock->s_closing) { - rv = NNG_ECLOSED; - goto out; + if ((rv = nni_endpt_create(&ep, sock, addr)) != 0) { + return (rv); } - if (nni_thread_create(&ep->ep_dialer, nni_socket_accepter, ep) != 0) { - rv = NNG_ENOMEM; - goto out; + rv = nni_endpt_listen(ep, flags); + if (rv != 0) { + nni_endpt_close(ep); + nni_endpt_destroy(ep); + } else { + if (epp != NULL) { + *epp = ep; + } + nni_mutex_enter(&sock->s_mx); + nni_list_append(&sock->s_eps, ep); + nni_mutex_exit(&sock->s_mx); } - nni_list_append(&sock->s_eps, ep); -out: - nni_mutex_exit(&ep->ep_mx); - nni_mutex_exit(&sock->s_mx); - return (rv); } diff --git a/src/core/socket.h b/src/core/socket.h index 4e1ea166..37a0e5eb 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -34,6 +34,9 @@ struct nng_socket { nni_list s_eps; // active endpoints nni_list s_pipes; // pipes for this socket + nni_list s_reaps; // pipes to reap + nni_thread * s_reaper; + int s_closing; // Socket is closing int s_besteffort; // Best effort mode delivery int s_senderr; // Protocol state machine use @@ -44,8 +47,7 @@ struct nng_socket { extern int nni_socket_create(nni_socket **, uint16_t); extern int nni_socket_close(nni_socket *); -extern int nni_socket_add_pipe(nni_socket *, nni_pipe *, int); -extern void nni_socket_rem_pipe(nni_socket *, nni_pipe *); +extern int nni_socket_add_pipe(nni_socket *, nni_pipe *); extern uint16_t nni_socket_proto(nni_socket *); extern int nni_socket_setopt(nni_socket *, int, const void *, size_t); extern int nni_socket_getopt(nni_socket *, int, void *, size_t *); @@ -104,6 +104,14 @@ nng_dial(nng_socket *s, const char *addr, nng_endpt **epp, int flags) int +nng_listen(nng_socket *s, const char *addr, nng_endpt **epp, int flags) +{ + NNI_INIT_INT(); + return (nni_socket_listen(s, addr, epp, flags)); +} + + +int nng_setopt(nng_socket *s, int opt, const void *val, size_t sz) { NNI_INIT_INT(); diff --git a/src/platform/posix/posix_alloc.c b/src/platform/posix/posix_alloc.c index 83fe305d..98a76669 100644 --- a/src/platform/posix/posix_alloc.c +++ b/src/platform/posix/posix_alloc.c @@ -17,7 +17,7 @@ void * nni_alloc(size_t size) { - return (malloc(size)); + return (calloc(1, size)); } diff --git a/src/platform/posix/posix_synch.c b/src/platform/posix/posix_synch.c index 2fb92915..0526eb7c 100644 --- a/src/platform/posix/posix_synch.c +++ b/src/platform/posix/posix_synch.c @@ -45,8 +45,10 @@ nni_mutex_fini(nni_mutex *mp) void nni_mutex_enter(nni_mutex *m) { - if (pthread_mutex_lock(&m->mx) != 0) { - nni_panic("pthread_mutex_lock failed"); + int rv; + + if ((rv = pthread_mutex_lock(&m->mx)) != 0) { + nni_panic("pthread_mutex_lock failed: %s", strerror(rv)); } } diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index 3fd9bfe7..f5cad7a9 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -80,7 +80,7 @@ nni_thread_reap(nni_thread *thr) int rv; if ((rv = pthread_join(thr->tid, NULL)) != 0) { - nni_panic("pthread_thread: %s", strerror(errno)); + nni_panic("pthread_thread: %s", strerror(rv)); } nni_free(thr, sizeof (*thr)); } diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index 692f4b0e..98995186 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -16,27 +16,30 @@ // While a peer is connected to the server, all other peer connection // attempts are discarded. +typedef struct nni_pair_pipe nni_pair_pipe; +typedef struct nni_pair_sock nni_pair_sock; + // An nni_pair_sock is our per-socket protocol private structure. -typedef struct nni_pair_sock { +struct nni_pair_sock { nni_socket * sock; - nni_pipe * pipe; + nni_pair_pipe * pipe; nni_mutex mx; nni_msgqueue * uwq; nni_msgqueue * urq; -} nni_pair_sock; +}; // An nni_pair_pipe is our per-pipe protocol private structure. We keep // one of these even though in theory we'd only have a single underlying // pipe. The separate data structure is more like other protocols that do // manage multiple pipes. -typedef struct nni_pair_pipe { +struct nni_pair_pipe { nni_pipe * pipe; nni_pair_sock * pair; int good; nni_thread * sthr; nni_thread * rthr; int sigclose; -} nni_pair_pipe; +}; static void nni_pair_receiver(void *); static void nni_pair_sender(void *); @@ -55,6 +58,7 @@ nni_pair_create(void **pairp, nni_socket *sock) return (rv); } pair->sock = sock; + pair->pipe = NULL; pair->uwq = nni_socket_sendq(sock); pair->urq = nni_socket_recvq(sock); *pairp = pair; @@ -77,7 +81,7 @@ nni_pair_destroy(void *arg) static int -nni_pair_add_pipe(void *arg, nni_pipe *pipe) +nni_pair_add_pipe(void *arg, nni_pipe *pipe, void **datap) { nni_pair_sock *pair = arg; nni_pair_pipe *pp; @@ -89,6 +93,7 @@ nni_pair_add_pipe(void *arg, nni_pipe *pipe) pp->sigclose = 0; pp->sthr = NULL; pp->rthr = NULL; + pp->pair = pair; nni_mutex_enter(&pair->mx); if (pair->pipe != NULL) { @@ -106,31 +111,33 @@ nni_pair_add_pipe(void *arg, nni_pipe *pipe) return (rv); } pp->good = 1; - pair->pipe = pipe; + pair->pipe = pp; + *datap = pp; nni_mutex_exit(&pair->mx); - return (NNG_EINVAL); + return (0); } -static int -nni_pair_rem_pipe(void *arg, nni_pipe *pipe) +static void +nni_pair_rem_pipe(void *arg, void *data) { - nni_pair_pipe *pp = arg; - nni_pair_sock *pair = pp->pair; + nni_pair_sock *pair = arg; + nni_pair_pipe *pp = data; - if (pp->sthr) { - (void) nni_thread_reap(pp->sthr); - } - if (pp->rthr) { - (void) nni_thread_reap(pp->rthr); - } nni_mutex_enter(&pair->mx); - if (pair->pipe != pipe) { + if (pair->pipe != pp) { nni_mutex_exit(&pair->mx); - return (NNG_EINVAL); + return; } + pair->pipe = NULL; nni_mutex_exit(&pair->mx); - return (NNG_EINVAL); + + if (pp->sthr != NULL) { + (void) nni_thread_reap(pp->sthr); + } + if (pp->rthr != NULL) { + (void) nni_thread_reap(pp->rthr); + } } @@ -166,7 +173,6 @@ nni_pair_sender(void *arg) } nni_msgqueue_signal(urq, &pp->sigclose); nni_pipe_close(pipe); - nni_socket_rem_pipe(pair->sock, pipe); } @@ -187,7 +193,6 @@ nni_pair_receiver(void *arg) return; } nni_mutex_exit(&pair->mx); - for (;;) { rv = nni_pipe_recv(pipe, &msg); if (rv != 0) { @@ -201,7 +206,6 @@ nni_pair_receiver(void *arg) } nni_msgqueue_signal(uwq, &pp->sigclose); nni_pipe_close(pipe); - nni_socket_rem_pipe(pair->sock, pipe); } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 0fc0a72c..5a9c0472 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -199,8 +199,7 @@ nni_inproc_ep_create(void **epp, const char *url, uint16_t proto) ep->mode = NNI_INPROC_EP_IDLE; ep->closed = 0; ep->proto = proto; - nni_list_node_init(&nni_inproc.eps, ep); - nni_list_append(&nni_inproc.eps, ep); + NNI_LIST_NODE_INIT(&ep->node); (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); *epp = ep; return (0); @@ -227,7 +226,10 @@ nni_inproc_ep_close(void *arg) nni_mutex_enter(&nni_inproc.mx); if (!ep->closed) { ep->closed = 1; - nni_list_remove(&nni_inproc.eps, ep); + if ((ep->mode == NNI_INPROC_EP_LISTEN) || + (ep->mode == NNI_INPROC_EP_DIAL)) { + nni_list_remove(&nni_inproc.eps, ep); + } nni_cond_broadcast(&nni_inproc.cv); } nni_mutex_exit(&nni_inproc.mx); @@ -273,7 +275,6 @@ nni_inproc_ep_connect(void *arg, void **pipep) nni_cond_wait(&nni_inproc.cv); } // NB: The acceptor or closer removes us from the list. - ep->mode = NNI_INPROC_EP_IDLE; *pipep = ep->cpipe; nni_mutex_exit(&nni_inproc.mx); return (ep->closed ? NNG_ECLOSED : 0); @@ -333,7 +334,7 @@ nni_inproc_ep_accept(void *arg, void **pipep) return (rv); } if (((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0) || - ((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0)) { + ((rv = nni_msgqueue_create(&pair->q[1], 4)) != 0)) { nni_inproc_pair_destroy(pair); return (rv); } @@ -360,6 +361,9 @@ nni_inproc_ep_accept(void *arg, void **pipep) } nni_cond_wait(&nni_inproc.cv); } + + nni_list_remove(&nni_inproc.eps, srch); + srch->mode = NNI_INPROC_EP_IDLE; (void) snprintf(pair->addr, sizeof (pair->addr), "%s", ep->addr); pair->pipe[0].rq = pair->pipe[1].wq = pair->q[0]; pair->pipe[1].rq = pair->pipe[0].wq = pair->q[1]; |
