aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c239
1 files changed, 117 insertions, 122 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 9263b142..ce5dbf3c 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -29,6 +29,62 @@ nni_socket_recvq(nni_socket *s)
}
+// Because we have to call back into the socket, and possibly also the proto,
+// and wait for threads to terminate, we do this in a special thread. The
+// assumption is that closing is always a "fast" operation.
+static void
+nni_reaper(void *arg)
+{
+ nni_socket *sock = arg;
+
+ for (;;) {
+ nni_pipe *pipe;
+ nni_endpt *ep;
+
+ nni_mutex_enter(&sock->s_mx);
+ if ((pipe = nni_list_first(&sock->s_reaps)) != NULL) {
+ nni_list_remove(&sock->s_reaps, pipe);
+ nni_mutex_exit(&sock->s_mx);
+
+ // This should already have been done.
+ pipe->p_ops.p_close(pipe->p_trandata);
+
+ // Remove the pipe from the protocol. Protocols may
+ // keep lists of pipes for managing their topologies.
+ // Note that if a protocol has rejected the pipe, it
+ // won't have any data.
+ if (pipe->p_protdata != NULL) {
+ sock->s_ops.proto_rem_pipe(sock->s_data,
+ pipe->p_protdata);
+ }
+
+ // If pipe was a connected (dialer) pipe,
+ // then let the endpoint know so it can try to
+ // reestablish the connection.
+ if ((ep = pipe->p_ep) != NULL) {
+ ep->ep_pipe = NULL;
+ pipe->p_ep = NULL;
+ nni_mutex_enter(&ep->ep_mx);
+ nni_cond_signal(&ep->ep_cv);
+ nni_mutex_exit(&ep->ep_mx);
+ }
+
+ // XXX: also publish event...
+ nni_pipe_destroy(pipe);
+ continue;
+ }
+
+ if (sock->s_reaper == NULL) {
+ nni_mutex_exit(&sock->s_mx);
+ break;
+ }
+
+ nni_cond_wait(&sock->s_cv);
+ nni_mutex_exit(&sock->s_mx);
+ }
+}
+
+
// nn_socket_create creates the underlying socket.
int
nni_socket_create(nni_socket **sockp, uint16_t proto)
@@ -47,6 +103,7 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
sock->s_linger = 0;
sock->s_sndtimeo = -1;
sock->s_rcvtimeo = -1;
+ sock->s_closing = 0;
sock->s_reconn = NNI_SECOND;
sock->s_reconnmax = NNI_SECOND;
@@ -60,8 +117,13 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
return (rv);
}
- NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_sock_node);
- NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_sock_node);
+ NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node);
+ NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node);
+ NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_node);
+
+ if ((rv = nni_thread_create(&sock->s_reaper, nni_reaper, sock)) != 0) {
+ goto fail;
+ }
if (((rv = nni_msgqueue_create(&sock->s_uwq, 0)) != 0) ||
((rv = nni_msgqueue_create(&sock->s_urq, 0)) != 0)) {
@@ -81,6 +143,14 @@ fail:
if (sock->s_uwq != NULL) {
nni_msgqueue_destroy(sock->s_uwq);
}
+ if (sock->s_reaper != NULL) {
+ nni_thread *reap = sock->s_reaper;
+ nni_mutex_enter(&sock->s_mx);
+ sock->s_reaper = NULL;
+ nni_cond_broadcast(&sock->s_cv);
+ nni_mutex_exit(&sock->s_mx);
+ nni_thread_reap(reap);
+ }
nni_cond_fini(&sock->s_cv);
nni_mutex_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
@@ -95,6 +165,7 @@ nni_socket_close(nni_socket *sock)
nni_pipe *pipe;
nni_endpt *ep;
nni_time linger;
+ nni_thread *reaper;
nni_mutex_enter(&sock->s_mx);
// Mark us closing, so no more EPs or changes can occur.
@@ -143,18 +214,18 @@ nni_socket_close(nni_socket *sock)
// safely while we hold the lock.
nni_msgqueue_close(sock->s_urq);
- // Go through and close all the pipes.
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_close(pipe);
+ // Go through and schedule close on all pipes.
+ while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
+ nni_list_remove(&sock->s_pipes, pipe);
+ pipe->p_active = 0;
+ pipe->p_reap = 1;
+ nni_list_append(&sock->s_reaps, pipe);
}
- // At this point, the protocols should have all their operations
- // failing, if they have any remaining, and they should be returning
- // any pipes back to us very quickly. We'll wait for them to finish,
- // as it MUST occur shortly.
- while (nni_list_first(&sock->s_pipes) != NULL) {
- nni_cond_wait(&sock->s_cv);
- }
+ // Tell the reaper it's done once it finishes. Also kick it off.
+ reaper = sock->s_reaper;
+ sock->s_reaper = NULL;
+ nni_cond_broadcast(&sock->s_cv);
// We already told the endpoints to shutdown. We just
// need to reap them now.
@@ -167,6 +238,9 @@ nni_socket_close(nni_socket *sock)
}
nni_mutex_exit(&sock->s_mx);
+ // Wait for the reaper to exit.
+ nni_thread_reap(reaper);
+
// At this point nothing else should be referencing us.
// The protocol needs to clean up its state.
sock->s_ops.proto_destroy(sock->s_data);
@@ -264,49 +338,8 @@ nni_socket_proto(nni_socket *sock)
}
-// nni_socket_rem_pipe removes the pipe from the socket. This is often
-// called by the protocol when a pipe is removed due to close.
-void
-nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe)
-{
- nni_endpt *ep;
-
- nni_mutex_enter(&sock->s_mx);
- if (pipe->p_sock != sock) {
- nni_mutex_exit(&sock->s_mx);
- }
-
- // Remove the pipe from the protocol. Protocols may
- // keep lists of pipes for managing their topologies.
- sock->s_ops.proto_rem_pipe(sock->s_data, pipe);
-
- // Now remove it from our own list.
- nni_list_remove(&sock->s_pipes, pipe);
- pipe->p_sock = NULL;
-
- // If we were a connected (dialer) pipe, then let the endpoint
- // know so it can try to reestablish the connection.
- if ((ep = pipe->p_ep) != NULL) {
- ep->ep_pipe = NULL;
- pipe->p_ep = NULL;
- nni_mutex_enter(&ep->ep_mx);
- nni_cond_signal(&ep->ep_cv);
- nni_mutex_exit(&ep->ep_mx);
- }
-
- // XXX: also publish event...
- nni_pipe_destroy(pipe);
-
- // If we're closing, wake the socket if we finished draining.
- if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) {
- nni_cond_broadcast(&sock->s_cv);
- }
- nni_mutex_exit(&sock->s_mx);
-}
-
-
int
-nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer)
+nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe)
{
int rv;
@@ -315,13 +348,17 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer)
nni_mutex_exit(&sock->s_mx);
return (NNG_ECLOSED);
}
- if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) {
+ rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, &pipe->p_protdata);
+ if (rv != 0) {
+ pipe->p_reap = 1;
+ nni_list_append(&sock->s_reaps, pipe);
+ nni_cond_broadcast(&sock->s_cv);
nni_mutex_exit(&sock->s_mx);
return (rv);
}
nni_list_append(&sock->s_pipes, pipe);
+ pipe->p_active = 1;
- pipe->p_sock = sock;
// XXX: Publish event
nni_mutex_exit(&sock->s_mx);
return (0);
@@ -341,82 +378,40 @@ nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags)
if (rv != 0) {
nni_endpt_close(ep);
nni_endpt_destroy(ep);
- } else if (epp != NULL) {
- *epp = ep;
- }
- return (rv);
-}
-
-
-static void
-nni_socket_accepter(void *arg)
-{
- nni_endpt *ep = arg;
- nni_socket *sock = ep->ep_sock;
- nni_pipe *pipe;
- int rv;
-
- for (;;) {
- nni_mutex_enter(&ep->ep_mx);
- if (ep->ep_close) {
- nni_mutex_exit(&ep->ep_mx);
- break;
- }
- nni_mutex_exit(&ep->ep_mx);
-
- pipe = NULL;
-
- if (((rv = nni_endpt_accept(ep, &pipe)) != 0) ||
- ((rv = nni_socket_add_pipe(sock, pipe, 0)) != 0)) {
- if (rv == NNG_ECLOSED) {
- break;
- }
- if (pipe != NULL) {
- nni_pipe_destroy(pipe);
- }
- // XXX: Publish accept error event...
-
- // If we can't allocate memory, don't spin, so that
- // things get a chance to release memory later.
- // Other errors, like ECONNRESET, should not recur.
- // (If we find otherwise we can inject a short sleep
- // here of about 1 ms without too much penalty.)
- if (rv == NNG_ENOMEM) {
- nni_usleep(100000);
- }
+ } else {
+ if (epp != NULL) {
+ *epp = ep;
}
+ nni_mutex_enter(&sock->s_mx);
+ nni_list_append(&sock->s_eps, ep);
+ nni_mutex_exit(&sock->s_mx);
}
+ return (rv);
}
int
-nni_socket_accept(nni_socket *sock, nni_endpt *ep)
+nni_socket_listen(nni_socket *sock, const char *addr, nni_endpt **epp,
+ int flags)
{
- int rv = 0;
+ nni_endpt *ep;
+ int rv;
- nni_mutex_enter(&sock->s_mx);
- nni_mutex_enter(&ep->ep_mx);
- if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
- rv = NNG_EBUSY;
- goto out;
- }
- if (ep->ep_sock != sock) { // Should never happen
- rv = NNG_EINVAL;
- goto out;
- }
- if (sock->s_closing) {
- rv = NNG_ECLOSED;
- goto out;
+ if ((rv = nni_endpt_create(&ep, sock, addr)) != 0) {
+ return (rv);
}
- if (nni_thread_create(&ep->ep_dialer, nni_socket_accepter, ep) != 0) {
- rv = NNG_ENOMEM;
- goto out;
+ rv = nni_endpt_listen(ep, flags);
+ if (rv != 0) {
+ nni_endpt_close(ep);
+ nni_endpt_destroy(ep);
+ } else {
+ if (epp != NULL) {
+ *epp = ep;
+ }
+ nni_mutex_enter(&sock->s_mx);
+ nni_list_append(&sock->s_eps, ep);
+ nni_mutex_exit(&sock->s_mx);
}
- nni_list_append(&sock->s_eps, ep);
-out:
- nni_mutex_exit(&ep->ep_mx);
- nni_mutex_exit(&sock->s_mx);
-
return (rv);
}