diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-25 18:08:44 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2016-12-25 18:08:44 -0800 |
| commit | 0cd2fa7310f1fdf45443a8a9e3335658b1c3c64c (patch) | |
| tree | 1098c7f4976033bb311b45c378079700c9330b62 /src/core/socket.c | |
| parent | 64de60d98e8e4a896f9d13e4aa70343f329d88b4 (diff) | |
| download | nng-0cd2fa7310f1fdf45443a8a9e3335658b1c3c64c.tar.gz nng-0cd2fa7310f1fdf45443a8a9e3335658b1c3c64c.tar.bz2 nng-0cd2fa7310f1fdf45443a8a9e3335658b1c3c64c.zip | |
Substantial fixes for listen & dialers.
At this point listening and dialing operations appear to function properly.
As part of this I had to break the close logic up since otherwise we had a
loop trying to reap a thread from itself. So there is now a separate reaper
thread for pipes per-socket. I also changed lists to be a bit more rigid,
and allocations now zero memory initially. (We had bugs due to uninitialized
memory, and rather than hunt them all down, lets just init them to sane zero
values.)
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 239 |
1 files changed, 117 insertions, 122 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 9263b142..ce5dbf3c 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -29,6 +29,62 @@ nni_socket_recvq(nni_socket *s) } +// Because we have to call back into the socket, and possibly also the proto, +// and wait for threads to terminate, we do this in a special thread. The +// assumption is that closing is always a "fast" operation. +static void +nni_reaper(void *arg) +{ + nni_socket *sock = arg; + + for (;;) { + nni_pipe *pipe; + nni_endpt *ep; + + nni_mutex_enter(&sock->s_mx); + if ((pipe = nni_list_first(&sock->s_reaps)) != NULL) { + nni_list_remove(&sock->s_reaps, pipe); + nni_mutex_exit(&sock->s_mx); + + // This should already have been done. + pipe->p_ops.p_close(pipe->p_trandata); + + // Remove the pipe from the protocol. Protocols may + // keep lists of pipes for managing their topologies. + // Note that if a protocol has rejected the pipe, it + // won't have any data. + if (pipe->p_protdata != NULL) { + sock->s_ops.proto_rem_pipe(sock->s_data, + pipe->p_protdata); + } + + // If pipe was a connected (dialer) pipe, + // then let the endpoint know so it can try to + // reestablish the connection. + if ((ep = pipe->p_ep) != NULL) { + ep->ep_pipe = NULL; + pipe->p_ep = NULL; + nni_mutex_enter(&ep->ep_mx); + nni_cond_signal(&ep->ep_cv); + nni_mutex_exit(&ep->ep_mx); + } + + // XXX: also publish event... + nni_pipe_destroy(pipe); + continue; + } + + if (sock->s_reaper == NULL) { + nni_mutex_exit(&sock->s_mx); + break; + } + + nni_cond_wait(&sock->s_cv); + nni_mutex_exit(&sock->s_mx); + } +} + + // nn_socket_create creates the underlying socket. int nni_socket_create(nni_socket **sockp, uint16_t proto) @@ -47,6 +103,7 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) sock->s_linger = 0; sock->s_sndtimeo = -1; sock->s_rcvtimeo = -1; + sock->s_closing = 0; sock->s_reconn = NNI_SECOND; sock->s_reconnmax = NNI_SECOND; @@ -60,8 +117,13 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) return (rv); } - NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_sock_node); - NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_sock_node); + NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node); + NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node); + NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_node); + + if ((rv = nni_thread_create(&sock->s_reaper, nni_reaper, sock)) != 0) { + goto fail; + } if (((rv = nni_msgqueue_create(&sock->s_uwq, 0)) != 0) || ((rv = nni_msgqueue_create(&sock->s_urq, 0)) != 0)) { @@ -81,6 +143,14 @@ fail: if (sock->s_uwq != NULL) { nni_msgqueue_destroy(sock->s_uwq); } + if (sock->s_reaper != NULL) { + nni_thread *reap = sock->s_reaper; + nni_mutex_enter(&sock->s_mx); + sock->s_reaper = NULL; + nni_cond_broadcast(&sock->s_cv); + nni_mutex_exit(&sock->s_mx); + nni_thread_reap(reap); + } nni_cond_fini(&sock->s_cv); nni_mutex_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); @@ -95,6 +165,7 @@ nni_socket_close(nni_socket *sock) nni_pipe *pipe; nni_endpt *ep; nni_time linger; + nni_thread *reaper; nni_mutex_enter(&sock->s_mx); // Mark us closing, so no more EPs or changes can occur. @@ -143,18 +214,18 @@ nni_socket_close(nni_socket *sock) // safely while we hold the lock. nni_msgqueue_close(sock->s_urq); - // Go through and close all the pipes. - NNI_LIST_FOREACH (&sock->s_pipes, pipe) { - nni_pipe_close(pipe); + // Go through and schedule close on all pipes. + while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { + nni_list_remove(&sock->s_pipes, pipe); + pipe->p_active = 0; + pipe->p_reap = 1; + nni_list_append(&sock->s_reaps, pipe); } - // At this point, the protocols should have all their operations - // failing, if they have any remaining, and they should be returning - // any pipes back to us very quickly. We'll wait for them to finish, - // as it MUST occur shortly. - while (nni_list_first(&sock->s_pipes) != NULL) { - nni_cond_wait(&sock->s_cv); - } + // Tell the reaper it's done once it finishes. Also kick it off. + reaper = sock->s_reaper; + sock->s_reaper = NULL; + nni_cond_broadcast(&sock->s_cv); // We already told the endpoints to shutdown. We just // need to reap them now. @@ -167,6 +238,9 @@ nni_socket_close(nni_socket *sock) } nni_mutex_exit(&sock->s_mx); + // Wait for the reaper to exit. + nni_thread_reap(reaper); + // At this point nothing else should be referencing us. // The protocol needs to clean up its state. sock->s_ops.proto_destroy(sock->s_data); @@ -264,49 +338,8 @@ nni_socket_proto(nni_socket *sock) } -// nni_socket_rem_pipe removes the pipe from the socket. This is often -// called by the protocol when a pipe is removed due to close. -void -nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe) -{ - nni_endpt *ep; - - nni_mutex_enter(&sock->s_mx); - if (pipe->p_sock != sock) { - nni_mutex_exit(&sock->s_mx); - } - - // Remove the pipe from the protocol. Protocols may - // keep lists of pipes for managing their topologies. - sock->s_ops.proto_rem_pipe(sock->s_data, pipe); - - // Now remove it from our own list. - nni_list_remove(&sock->s_pipes, pipe); - pipe->p_sock = NULL; - - // If we were a connected (dialer) pipe, then let the endpoint - // know so it can try to reestablish the connection. - if ((ep = pipe->p_ep) != NULL) { - ep->ep_pipe = NULL; - pipe->p_ep = NULL; - nni_mutex_enter(&ep->ep_mx); - nni_cond_signal(&ep->ep_cv); - nni_mutex_exit(&ep->ep_mx); - } - - // XXX: also publish event... - nni_pipe_destroy(pipe); - - // If we're closing, wake the socket if we finished draining. - if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) { - nni_cond_broadcast(&sock->s_cv); - } - nni_mutex_exit(&sock->s_mx); -} - - int -nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer) +nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe) { int rv; @@ -315,13 +348,17 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer) nni_mutex_exit(&sock->s_mx); return (NNG_ECLOSED); } - if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) { + rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, &pipe->p_protdata); + if (rv != 0) { + pipe->p_reap = 1; + nni_list_append(&sock->s_reaps, pipe); + nni_cond_broadcast(&sock->s_cv); nni_mutex_exit(&sock->s_mx); return (rv); } nni_list_append(&sock->s_pipes, pipe); + pipe->p_active = 1; - pipe->p_sock = sock; // XXX: Publish event nni_mutex_exit(&sock->s_mx); return (0); @@ -341,82 +378,40 @@ nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags) if (rv != 0) { nni_endpt_close(ep); nni_endpt_destroy(ep); - } else if (epp != NULL) { - *epp = ep; - } - return (rv); -} - - -static void -nni_socket_accepter(void *arg) -{ - nni_endpt *ep = arg; - nni_socket *sock = ep->ep_sock; - nni_pipe *pipe; - int rv; - - for (;;) { - nni_mutex_enter(&ep->ep_mx); - if (ep->ep_close) { - nni_mutex_exit(&ep->ep_mx); - break; - } - nni_mutex_exit(&ep->ep_mx); - - pipe = NULL; - - if (((rv = nni_endpt_accept(ep, &pipe)) != 0) || - ((rv = nni_socket_add_pipe(sock, pipe, 0)) != 0)) { - if (rv == NNG_ECLOSED) { - break; - } - if (pipe != NULL) { - nni_pipe_destroy(pipe); - } - // XXX: Publish accept error event... - - // If we can't allocate memory, don't spin, so that - // things get a chance to release memory later. - // Other errors, like ECONNRESET, should not recur. - // (If we find otherwise we can inject a short sleep - // here of about 1 ms without too much penalty.) - if (rv == NNG_ENOMEM) { - nni_usleep(100000); - } + } else { + if (epp != NULL) { + *epp = ep; } + nni_mutex_enter(&sock->s_mx); + nni_list_append(&sock->s_eps, ep); + nni_mutex_exit(&sock->s_mx); } + return (rv); } int -nni_socket_accept(nni_socket *sock, nni_endpt *ep) +nni_socket_listen(nni_socket *sock, const char *addr, nni_endpt **epp, + int flags) { - int rv = 0; + nni_endpt *ep; + int rv; - nni_mutex_enter(&sock->s_mx); - nni_mutex_enter(&ep->ep_mx); - if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { - rv = NNG_EBUSY; - goto out; - } - if (ep->ep_sock != sock) { // Should never happen - rv = NNG_EINVAL; - goto out; - } - if (sock->s_closing) { - rv = NNG_ECLOSED; - goto out; + if ((rv = nni_endpt_create(&ep, sock, addr)) != 0) { + return (rv); } - if (nni_thread_create(&ep->ep_dialer, nni_socket_accepter, ep) != 0) { - rv = NNG_ENOMEM; - goto out; + rv = nni_endpt_listen(ep, flags); + if (rv != 0) { + nni_endpt_close(ep); + nni_endpt_destroy(ep); + } else { + if (epp != NULL) { + *epp = ep; + } + nni_mutex_enter(&sock->s_mx); + nni_list_append(&sock->s_eps, ep); + nni_mutex_exit(&sock->s_mx); } - nni_list_append(&sock->s_eps, ep); -out: - nni_mutex_exit(&ep->ep_mx); - nni_mutex_exit(&sock->s_mx); - return (rv); } |
