aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c124
1 files changed, 115 insertions, 9 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index d58e64ba..40fb42bc 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -128,12 +128,113 @@ nni_sock_held_close(nni_sock *sock)
}
+void
+nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe)
+{
+ nni_mtx_lock(&sock->s_mx);
+ nni_list_append(&sock->s_pipes, pipe);
+ nni_mtx_unlock(&sock->s_mx);
+}
+
+
+int
+nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
+{
+ int rv;
+
+ nni_mtx_lock(&sock->s_mx);
+
+ if (sock->s_closing) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (NNG_ECLOSED);
+ }
+ if (nni_pipe_peer(pipe) != sock->s_peer) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (NNG_EPROTO);
+ }
+
+ if ((rv = sock->s_pipe_ops.pipe_add(pipe->p_proto_data)) != 0) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
+ }
+
+ pipe->p_active = 1;
+
+ nni_list_remove(&sock->s_idles, pipe);
+ nni_list_append(&sock->s_pipes, pipe);
+
+ nni_mtx_unlock(&sock->s_mx);
+
+ return (0);
+}
+
+
+void
+nni_sock_pipe_closed(nni_sock *sock, nni_pipe *pipe)
+{
+ nni_ep *ep;
+
+ nni_mtx_lock(&sock->s_mx);
+
+ // NB: nni_list_remove doesn't really care *which* list the pipe
+ // is on, and so if the pipe is already on the idle list these
+ // two statements are effectively a no-op.
+ nni_list_remove(&sock->s_pipes, pipe);
+ nni_list_append(&sock->s_idles, pipe);
+
+ if (pipe->p_active) {
+ pipe->p_active = 0;
+ sock->s_pipe_ops.pipe_rem(pipe->p_proto_data);
+ }
+
+ // Notify the endpoint that the pipe has closed.
+ if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) {
+ ep->ep_pipe = NULL;
+ nni_cv_wake(&ep->ep_cv);
+ }
+ nni_mtx_unlock(&sock->s_mx);
+}
+
+
+void
+nni_sock_pipe_rem(nni_sock *sock, nni_pipe *pipe)
+{
+ nni_ep *ep;
+
+ nni_mtx_lock(&sock->s_mx);
+ nni_list_remove(&sock->s_idles, pipe);
+
+ // Notify the endpoint that the pipe has closed - if not already done.
+ if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) {
+ ep->ep_pipe = NULL;
+ nni_cv_wake(&ep->ep_cv);
+ }
+ nni_cv_wake(&sock->s_cv);
+ nni_mtx_unlock(&sock->s_mx);
+}
+
+
+void
+nni_sock_lock(nni_sock *sock)
+{
+ nni_mtx_lock(&sock->s_mx);
+}
+
+
+void
+nni_sock_unlock(nni_sock *sock)
+{
+ nni_mtx_unlock(&sock->s_mx);
+}
+
+
// Because we have to call back into the socket, and possibly also the proto,
// and wait for threads to terminate, we do this in a special thread. The
// assumption is that closing is always a "fast" operation.
static void
nni_reaper(void *arg)
{
+#if 0
nni_sock *sock = arg;
for (;;) {
@@ -183,6 +284,7 @@ nni_reaper(void *arg)
nni_cv_wait(&sock->s_cv);
nni_mtx_unlock(&sock->s_mx);
}
+#endif
}
@@ -301,7 +403,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
sock->s_reapexit = 0;
sock->s_rcvmaxsz = 1024 * 1024; // 1 MB by default
NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node);
- NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node);
+ NNI_LIST_INIT(&sock->s_idles, nni_pipe, p_node);
NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node);
NNI_LIST_INIT(&sock->s_notify, nni_notify, n_node);
NNI_LIST_INIT(&sock->s_events, nni_event, e_node);
@@ -512,15 +614,14 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // For each pipe, close the underlying transport, and move it to
- // deathrow (the reaplist).
+ // For each pipe, close the underlying transport. Also move it
+ // to the idle list so we won't keep looping.
while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
- if (pipe->p_tran_data != NULL) {
- pipe->p_tran_ops.pipe_close(pipe->p_tran_data);
- }
- pipe->p_reap = 1;
- nni_list_remove(&sock->s_pipes, pipe);
- nni_list_append(&sock->s_reaps, pipe);
+ nni_pipe_incref(pipe);
+ nni_mtx_unlock(&sock->s_mx);
+ nni_pipe_close(pipe);
+ nni_pipe_decref(pipe);
+ nni_mtx_lock(&sock->s_mx);
}
sock->s_sock_ops.sock_close(sock->s_data);
@@ -528,6 +629,11 @@ nni_sock_shutdown(nni_sock *sock)
sock->s_reapexit = 1;
nni_cv_wake(&sock->s_notify_cv);
nni_cv_wake(&sock->s_cv);
+
+ while ((nni_list_first(&sock->s_idles) != NULL) ||
+ (nni_list_first(&sock->s_pipes) != NULL)) {
+ nni_cv_wait(&sock->s_cv);
+ }
nni_mtx_unlock(&sock->s_mx);
// Wait for the threads to exit.