diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 239 |
1 files changed, 117 insertions, 122 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 9263b142..ce5dbf3c 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -29,6 +29,62 @@ nni_socket_recvq(nni_socket *s) } +// Because we have to call back into the socket, and possibly also the proto, +// and wait for threads to terminate, we do this in a special thread. The +// assumption is that closing is always a "fast" operation. +static void +nni_reaper(void *arg) +{ + nni_socket *sock = arg; + + for (;;) { + nni_pipe *pipe; + nni_endpt *ep; + + nni_mutex_enter(&sock->s_mx); + if ((pipe = nni_list_first(&sock->s_reaps)) != NULL) { + nni_list_remove(&sock->s_reaps, pipe); + nni_mutex_exit(&sock->s_mx); + + // This should already have been done. + pipe->p_ops.p_close(pipe->p_trandata); + + // Remove the pipe from the protocol. Protocols may + // keep lists of pipes for managing their topologies. + // Note that if a protocol has rejected the pipe, it + // won't have any data. + if (pipe->p_protdata != NULL) { + sock->s_ops.proto_rem_pipe(sock->s_data, + pipe->p_protdata); + } + + // If pipe was a connected (dialer) pipe, + // then let the endpoint know so it can try to + // reestablish the connection. + if ((ep = pipe->p_ep) != NULL) { + ep->ep_pipe = NULL; + pipe->p_ep = NULL; + nni_mutex_enter(&ep->ep_mx); + nni_cond_signal(&ep->ep_cv); + nni_mutex_exit(&ep->ep_mx); + } + + // XXX: also publish event... + nni_pipe_destroy(pipe); + continue; + } + + if (sock->s_reaper == NULL) { + nni_mutex_exit(&sock->s_mx); + break; + } + + nni_cond_wait(&sock->s_cv); + nni_mutex_exit(&sock->s_mx); + } +} + + // nn_socket_create creates the underlying socket. int nni_socket_create(nni_socket **sockp, uint16_t proto) @@ -47,6 +103,7 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) sock->s_linger = 0; sock->s_sndtimeo = -1; sock->s_rcvtimeo = -1; + sock->s_closing = 0; sock->s_reconn = NNI_SECOND; sock->s_reconnmax = NNI_SECOND; @@ -60,8 +117,13 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) return (rv); } - NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_sock_node); - NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_sock_node); + NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node); + NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node); + NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_node); + + if ((rv = nni_thread_create(&sock->s_reaper, nni_reaper, sock)) != 0) { + goto fail; + } if (((rv = nni_msgqueue_create(&sock->s_uwq, 0)) != 0) || ((rv = nni_msgqueue_create(&sock->s_urq, 0)) != 0)) { @@ -81,6 +143,14 @@ fail: if (sock->s_uwq != NULL) { nni_msgqueue_destroy(sock->s_uwq); } + if (sock->s_reaper != NULL) { + nni_thread *reap = sock->s_reaper; + nni_mutex_enter(&sock->s_mx); + sock->s_reaper = NULL; + nni_cond_broadcast(&sock->s_cv); + nni_mutex_exit(&sock->s_mx); + nni_thread_reap(reap); + } nni_cond_fini(&sock->s_cv); nni_mutex_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); @@ -95,6 +165,7 @@ nni_socket_close(nni_socket *sock) nni_pipe *pipe; nni_endpt *ep; nni_time linger; + nni_thread *reaper; nni_mutex_enter(&sock->s_mx); // Mark us closing, so no more EPs or changes can occur. @@ -143,18 +214,18 @@ nni_socket_close(nni_socket *sock) // safely while we hold the lock. nni_msgqueue_close(sock->s_urq); - // Go through and close all the pipes. - NNI_LIST_FOREACH (&sock->s_pipes, pipe) { - nni_pipe_close(pipe); + // Go through and schedule close on all pipes. + while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { + nni_list_remove(&sock->s_pipes, pipe); + pipe->p_active = 0; + pipe->p_reap = 1; + nni_list_append(&sock->s_reaps, pipe); } - // At this point, the protocols should have all their operations - // failing, if they have any remaining, and they should be returning - // any pipes back to us very quickly. We'll wait for them to finish, - // as it MUST occur shortly. - while (nni_list_first(&sock->s_pipes) != NULL) { - nni_cond_wait(&sock->s_cv); - } + // Tell the reaper it's done once it finishes. Also kick it off. + reaper = sock->s_reaper; + sock->s_reaper = NULL; + nni_cond_broadcast(&sock->s_cv); // We already told the endpoints to shutdown. We just // need to reap them now. @@ -167,6 +238,9 @@ nni_socket_close(nni_socket *sock) } nni_mutex_exit(&sock->s_mx); + // Wait for the reaper to exit. + nni_thread_reap(reaper); + // At this point nothing else should be referencing us. // The protocol needs to clean up its state. sock->s_ops.proto_destroy(sock->s_data); @@ -264,49 +338,8 @@ nni_socket_proto(nni_socket *sock) } -// nni_socket_rem_pipe removes the pipe from the socket. This is often -// called by the protocol when a pipe is removed due to close. -void -nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe) -{ - nni_endpt *ep; - - nni_mutex_enter(&sock->s_mx); - if (pipe->p_sock != sock) { - nni_mutex_exit(&sock->s_mx); - } - - // Remove the pipe from the protocol. Protocols may - // keep lists of pipes for managing their topologies. - sock->s_ops.proto_rem_pipe(sock->s_data, pipe); - - // Now remove it from our own list. - nni_list_remove(&sock->s_pipes, pipe); - pipe->p_sock = NULL; - - // If we were a connected (dialer) pipe, then let the endpoint - // know so it can try to reestablish the connection. - if ((ep = pipe->p_ep) != NULL) { - ep->ep_pipe = NULL; - pipe->p_ep = NULL; - nni_mutex_enter(&ep->ep_mx); - nni_cond_signal(&ep->ep_cv); - nni_mutex_exit(&ep->ep_mx); - } - - // XXX: also publish event... - nni_pipe_destroy(pipe); - - // If we're closing, wake the socket if we finished draining. - if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) { - nni_cond_broadcast(&sock->s_cv); - } - nni_mutex_exit(&sock->s_mx); -} - - int -nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer) +nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe) { int rv; @@ -315,13 +348,17 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer) nni_mutex_exit(&sock->s_mx); return (NNG_ECLOSED); } - if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) { + rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, &pipe->p_protdata); + if (rv != 0) { + pipe->p_reap = 1; + nni_list_append(&sock->s_reaps, pipe); + nni_cond_broadcast(&sock->s_cv); nni_mutex_exit(&sock->s_mx); return (rv); } nni_list_append(&sock->s_pipes, pipe); + pipe->p_active = 1; - pipe->p_sock = sock; // XXX: Publish event nni_mutex_exit(&sock->s_mx); return (0); @@ -341,82 +378,40 @@ nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags) if (rv != 0) { nni_endpt_close(ep); nni_endpt_destroy(ep); - } else if (epp != NULL) { - *epp = ep; - } - return (rv); -} - - -static void -nni_socket_accepter(void *arg) -{ - nni_endpt *ep = arg; - nni_socket *sock = ep->ep_sock; - nni_pipe *pipe; - int rv; - - for (;;) { - nni_mutex_enter(&ep->ep_mx); - if (ep->ep_close) { - nni_mutex_exit(&ep->ep_mx); - break; - } - nni_mutex_exit(&ep->ep_mx); - - pipe = NULL; - - if (((rv = nni_endpt_accept(ep, &pipe)) != 0) || - ((rv = nni_socket_add_pipe(sock, pipe, 0)) != 0)) { - if (rv == NNG_ECLOSED) { - break; - } - if (pipe != NULL) { - nni_pipe_destroy(pipe); - } - // XXX: Publish accept error event... - - // If we can't allocate memory, don't spin, so that - // things get a chance to release memory later. - // Other errors, like ECONNRESET, should not recur. - // (If we find otherwise we can inject a short sleep - // here of about 1 ms without too much penalty.) - if (rv == NNG_ENOMEM) { - nni_usleep(100000); - } + } else { + if (epp != NULL) { + *epp = ep; } + nni_mutex_enter(&sock->s_mx); + nni_list_append(&sock->s_eps, ep); + nni_mutex_exit(&sock->s_mx); } + return (rv); } int -nni_socket_accept(nni_socket *sock, nni_endpt *ep) +nni_socket_listen(nni_socket *sock, const char *addr, nni_endpt **epp, + int flags) { - int rv = 0; + nni_endpt *ep; + int rv; - nni_mutex_enter(&sock->s_mx); - nni_mutex_enter(&ep->ep_mx); - if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { - rv = NNG_EBUSY; - goto out; - } - if (ep->ep_sock != sock) { // Should never happen - rv = NNG_EINVAL; - goto out; - } - if (sock->s_closing) { - rv = NNG_ECLOSED; - goto out; + if ((rv = nni_endpt_create(&ep, sock, addr)) != 0) { + return (rv); } - if (nni_thread_create(&ep->ep_dialer, nni_socket_accepter, ep) != 0) { - rv = NNG_ENOMEM; - goto out; + rv = nni_endpt_listen(ep, flags); + if (rv != 0) { + nni_endpt_close(ep); + nni_endpt_destroy(ep); + } else { + if (epp != NULL) { + *epp = ep; + } + nni_mutex_enter(&sock->s_mx); + nni_list_append(&sock->s_eps, ep); + nni_mutex_exit(&sock->s_mx); } - nni_list_append(&sock->s_eps, ep); -out: - nni_mutex_exit(&ep->ep_mx); - nni_mutex_exit(&sock->s_mx); - return (rv); } |
