aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c87
1 files changed, 50 insertions, 37 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 4be5a856..3a4f36e5 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -109,7 +109,8 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
void
nni_sock_pipe_stop(nni_sock *sock, nni_pipe *pipe)
{
- void *pdata;
+ void * pdata;
+ nni_ep *ep;
pdata = nni_pipe_get_proto_data(pipe);
@@ -119,10 +120,24 @@ nni_sock_pipe_stop(nni_sock *sock, nni_pipe *pipe)
nni_mtx_unlock(&sock->s_mx);
return;
}
+
+ // Break up the relationship between the EP and the pipe.
+ if ((ep = pipe->p_ep) != NULL) {
+ nni_mtx_lock(&ep->ep_mtx);
+ // During early init, the pipe might not have this set.
+ if (nni_list_active(&ep->ep_pipes, pipe)) {
+ nni_list_remove(&ep->ep_pipes, pipe);
+ }
+ pipe->p_ep = NULL;
+ ep->ep_pipe = NULL; // XXX: remove this soon
+ nni_cv_wake(&ep->ep_cv);
+ nni_mtx_unlock(&ep->ep_mtx);
+ }
+
sock->s_pipe_ops.pipe_stop(pdata);
if (nni_list_active(&sock->s_pipes, pipe)) {
nni_list_remove(&sock->s_pipes, pipe);
- if (sock->s_closing) {
+ if (sock->s_closing && nni_list_empty(&sock->s_pipes)) {
nni_cv_wake(&sock->s_cv);
}
}
@@ -459,10 +474,10 @@ nni_sock_shutdown(nni_sock *sock)
linger = nni_clock() + sock->s_linger;
}
- // Stop the EPs. This prevents new connections from forming but
+ // Close the EPs. This prevents new connections from forming but
// but allows existing ones to drain.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
- nni_ep_stop(ep);
+ nni_ep_close(ep);
}
nni_mtx_unlock(&sock->s_mx);
@@ -492,15 +507,14 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // For each pipe, close the underlying transport.
+ // For each pipe, arrange for it to teardown hard. (Close, etc.).
NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
nni_pipe_stop(pipe);
}
- // For each ep, close it; this will also tell it to force any
- // of its pipes to close.
+ // For each ep, arrange for it to teardown hard.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
- nni_ep_close(ep);
+ nni_ep_stop(ep);
}
// Wait for the pipes to be reaped (there should not be any because
@@ -511,14 +525,7 @@ nni_sock_shutdown(nni_sock *sock)
// Wait for the eps to be reaped.
while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
- nni_list_remove(&sock->s_eps, ep);
-
- // This has to be done without the lock held, as the remove
- // operation requires shutting down a thread which might be
- // trying to acquire the socket lock.
- nni_mtx_unlock(&sock->s_mx);
- nni_ep_remove(ep);
- nni_mtx_lock(&sock->s_mx);
+ nni_cv_wait(&sock->s_cv);
}
sock->s_sock_ops.sock_close(sock->s_data);
@@ -535,28 +542,10 @@ nni_sock_shutdown(nni_sock *sock)
return (0);
}
-// nni_sock_ep_add adds a newly created endpoint to the socket. The
-// caller must hold references on the sock and the ep, and not be holding
-// the socket lock. The ep acquires a reference against the sock,
-// which will be dropped later by nni_sock_rem_ep. The endpoint must not
-// already be associated with a socket. (Note, the ep holds the reference
-// on the socket, not the other way around.)
-int
-nni_sock_ep_add(nni_sock *sock, nni_ep *ep)
-{
- nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
- nni_mtx_unlock(&sock->s_mx);
- return (NNG_ECLOSED);
- }
- nni_list_append(&sock->s_eps, ep);
- nni_mtx_unlock(&sock->s_mx);
- return (0);
-}
-
void
nni_sock_ep_remove(nni_sock *sock, nni_ep *ep)
{
+ nni_pipe *pipe;
// If we're not on the list, then nothing to do. Be idempotent.
// Note that if the ep is not on a list, then we assume that we have
// exclusive access. Therefore the check for being active need not
@@ -564,8 +553,24 @@ nni_sock_ep_remove(nni_sock *sock, nni_ep *ep)
if ((sock == NULL) || (!nni_list_active(&sock->s_eps, ep))) {
return;
}
+
+ // This is done under the endpoints lock, although the remove
+ // is done under that as well, we also make sure that we hold
+ // the socket lock in the remove step.
+ nni_mtx_lock(&ep->ep_mtx);
+ NNI_LIST_FOREACH (&ep->ep_pipes, pipe) {
+ nni_pipe_stop(pipe);
+ }
+ while (!nni_list_empty(&ep->ep_pipes)) {
+ nni_cv_wait(&ep->ep_cv);
+ }
+ nni_mtx_unlock(&ep->ep_mtx);
+
nni_mtx_lock(&sock->s_mx);
nni_list_remove(&sock->s_eps, ep);
+ if ((sock->s_closing) && (nni_list_empty(&sock->s_eps))) {
+ nni_cv_wake(&sock->s_cv);
+ }
nni_mtx_unlock(&sock->s_mx);
}
@@ -727,12 +732,16 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
nni_ep *ep;
int rv;
+ nni_mtx_lock(&sock->s_mx);
if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_DIAL)) != 0) {
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
+ nni_list_append(&sock->s_eps, ep);
+ nni_mtx_unlock(&sock->s_mx);
if ((rv = nni_ep_dial(ep, flags)) != 0) {
- nni_ep_remove(ep);
+ nni_ep_stop(ep);
} else if (epp != NULL) {
*epp = ep;
}
@@ -746,12 +755,16 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
nni_ep *ep;
int rv;
+ nni_mtx_lock(&sock->s_mx);
if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_LISTEN)) != 0) {
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
+ nni_list_append(&sock->s_eps, ep);
+ nni_mtx_unlock(&sock->s_mx);
if ((rv = nni_ep_listen(ep, flags)) != 0) {
- nni_ep_remove(ep);
+ nni_ep_stop(ep);
} else if (epp != NULL) {
*epp = ep;
}