diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 124 |
1 files changed, 115 insertions, 9 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index d58e64ba..40fb42bc 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -128,12 +128,113 @@ nni_sock_held_close(nni_sock *sock) } +void +nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe) +{ + nni_mtx_lock(&sock->s_mx); + nni_list_append(&sock->s_pipes, pipe); + nni_mtx_unlock(&sock->s_mx); +} + + +int +nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe) +{ + int rv; + + nni_mtx_lock(&sock->s_mx); + + if (sock->s_closing) { + nni_mtx_unlock(&sock->s_mx); + return (NNG_ECLOSED); + } + if (nni_pipe_peer(pipe) != sock->s_peer) { + nni_mtx_unlock(&sock->s_mx); + return (NNG_EPROTO); + } + + if ((rv = sock->s_pipe_ops.pipe_add(pipe->p_proto_data)) != 0) { + nni_mtx_unlock(&sock->s_mx); + return (rv); + } + + pipe->p_active = 1; + + nni_list_remove(&sock->s_idles, pipe); + nni_list_append(&sock->s_pipes, pipe); + + nni_mtx_unlock(&sock->s_mx); + + return (0); +} + + +void +nni_sock_pipe_closed(nni_sock *sock, nni_pipe *pipe) +{ + nni_ep *ep; + + nni_mtx_lock(&sock->s_mx); + + // NB: nni_list_remove doesn't really care *which* list the pipe + // is on, and so if the pipe is already on the idle list these + // two statements are effectively a no-op. + nni_list_remove(&sock->s_pipes, pipe); + nni_list_append(&sock->s_idles, pipe); + + if (pipe->p_active) { + pipe->p_active = 0; + sock->s_pipe_ops.pipe_rem(pipe->p_proto_data); + } + + // Notify the endpoint that the pipe has closed. + if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) { + ep->ep_pipe = NULL; + nni_cv_wake(&ep->ep_cv); + } + nni_mtx_unlock(&sock->s_mx); +} + + +void +nni_sock_pipe_rem(nni_sock *sock, nni_pipe *pipe) +{ + nni_ep *ep; + + nni_mtx_lock(&sock->s_mx); + nni_list_remove(&sock->s_idles, pipe); + + // Notify the endpoint that the pipe has closed - if not already done. + if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) { + ep->ep_pipe = NULL; + nni_cv_wake(&ep->ep_cv); + } + nni_cv_wake(&sock->s_cv); + nni_mtx_unlock(&sock->s_mx); +} + + +void +nni_sock_lock(nni_sock *sock) +{ + nni_mtx_lock(&sock->s_mx); +} + + +void +nni_sock_unlock(nni_sock *sock) +{ + nni_mtx_unlock(&sock->s_mx); +} + + // Because we have to call back into the socket, and possibly also the proto, // and wait for threads to terminate, we do this in a special thread. The // assumption is that closing is always a "fast" operation. static void nni_reaper(void *arg) { +#if 0 nni_sock *sock = arg; for (;;) { @@ -183,6 +284,7 @@ nni_reaper(void *arg) nni_cv_wait(&sock->s_cv); nni_mtx_unlock(&sock->s_mx); } +#endif } @@ -301,7 +403,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) sock->s_reapexit = 0; sock->s_rcvmaxsz = 1024 * 1024; // 1 MB by default NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node); - NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node); + NNI_LIST_INIT(&sock->s_idles, nni_pipe, p_node); NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node); NNI_LIST_INIT(&sock->s_notify, nni_notify, n_node); NNI_LIST_INIT(&sock->s_events, nni_event, e_node); @@ -512,15 +614,14 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // For each pipe, close the underlying transport, and move it to - // deathrow (the reaplist). + // For each pipe, close the underlying transport. Also move it + // to the idle list so we won't keep looping. while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { - if (pipe->p_tran_data != NULL) { - pipe->p_tran_ops.pipe_close(pipe->p_tran_data); - } - pipe->p_reap = 1; - nni_list_remove(&sock->s_pipes, pipe); - nni_list_append(&sock->s_reaps, pipe); + nni_pipe_incref(pipe); + nni_mtx_unlock(&sock->s_mx); + nni_pipe_close(pipe); + nni_pipe_decref(pipe); + nni_mtx_lock(&sock->s_mx); } sock->s_sock_ops.sock_close(sock->s_data); @@ -528,6 +629,11 @@ nni_sock_shutdown(nni_sock *sock) sock->s_reapexit = 1; nni_cv_wake(&sock->s_notify_cv); nni_cv_wake(&sock->s_cv); + + while ((nni_list_first(&sock->s_idles) != NULL) || + (nni_list_first(&sock->s_pipes) != NULL)) { + nni_cv_wait(&sock->s_cv); + } nni_mtx_unlock(&sock->s_mx); // Wait for the threads to exit. |
