aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-06-24 14:11:35 -0700
committerGarrett D'Amore <garrett@damore.org>2017-06-24 14:11:35 -0700
commit0a51aa7bfc88d55b98fdde0d497b072e6911457d (patch)
tree722ef4713bf27a9aac9dce0a1fe9fa0edfe34a2d /src/core/socket.c
parentd753c00d43e6dc642b2445e4821537a92b8b8d23 (diff)
downloadnng-0a51aa7bfc88d55b98fdde0d497b072e6911457d.tar.gz
nng-0a51aa7bfc88d55b98fdde0d497b072e6911457d.tar.bz2
nng-0a51aa7bfc88d55b98fdde0d497b072e6911457d.zip
Protocols keep their own reference counts.
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c98
1 files changed, 35 insertions, 63 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 257f7fa3..67b3f978 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -101,7 +101,7 @@ nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe)
sock->s_pipe_ops.pipe_fini(pdata);
return (NNG_ECLOSED);
}
- nni_pipe_set_proto_data(pipe, pdata);
+ nni_pipe_set_proto_data(pipe, pdata, sock->s_pipe_ops.pipe_fini);
nni_list_append(&sock->s_pipes, pipe);
nni_mtx_unlock(&sock->s_mx);
return (0);
@@ -137,58 +137,18 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
void
-nni_sock_pipe_closed(nni_sock *sock, nni_pipe *pipe)
+nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
{
- nni_ep *ep;
- void *pdata = nni_pipe_get_proto_data(pipe);
-
- nni_mtx_lock(&sock->s_mx);
-
- // NB: nni_list_remove doesn't really care *which* list the pipe
- // is on, and so if the pipe is already on the idle list these
- // two statements are effectively a no-op.
- nni_list_remove(&sock->s_pipes, pipe);
- if (nni_list_first(&sock->s_pipes) == NULL) {
- nni_cv_wake(&sock->s_cv);
- }
-
- sock->s_pipe_ops.pipe_stop(pdata);
-
- // Notify the endpoint that the pipe has closed.
- if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) {
- ep->ep_pipe = NULL;
- nni_cv_wake(&ep->ep_cv);
- }
- nni_mtx_unlock(&sock->s_mx);
-}
-
-
-void
-nni_sock_pipe_rem(nni_sock *sock, nni_pipe *pipe)
-{
- nni_ep *ep;
- void *pdata = nni_pipe_get_proto_data(pipe);
+ void *pdata;
nni_mtx_lock(&sock->s_mx);
-
if (nni_list_active(&sock->s_pipes, pipe)) {
nni_list_remove(&sock->s_pipes, pipe);
+ if (sock->s_closing) {
+ nni_cv_wake(&sock->s_cv);
+ }
}
-
- if (pdata != NULL) {
- sock->s_pipe_ops.pipe_fini(pdata);
- }
-
- // XXX: Move this to a seperate ep-specific API.
- // Notify the endpoint that the pipe has closed - if not already done.
- if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) {
- ep->ep_pipe = NULL;
- nni_cv_wake(&ep->ep_cv);
- }
- nni_cv_wake(&sock->s_cv);
nni_mtx_unlock(&sock->s_mx);
-
- // XXX release the hold on the pipe
}
@@ -556,12 +516,13 @@ nni_sock_shutdown(nni_sock *sock)
linger = nni_clock() + sock->s_linger;
}
+ // Stop the EPs. This prevents new connections from forming but
+ // but allows existing ones to drain.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
- nni_ep_close(ep);
+ nni_ep_stop(ep);
}
nni_mtx_unlock(&sock->s_mx);
-
// We drain the upper write queue. This is just like closing it,
// except that the protocol gets a chance to get the messages and
// push them down to the transport. This operation can *block*
@@ -588,23 +549,36 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // Stop all EPS.
- while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
- nni_list_remove(&sock->s_eps, ep);
- nni_mtx_unlock(&sock->s_mx);
+ // For each ep, close it; this will also tell it to force any
+ // of its pipes to close.
+ NNI_LIST_FOREACH (&sock->s_eps, ep) {
nni_ep_close(ep);
- nni_ep_rele(ep);
- nni_mtx_lock(&sock->s_mx);
}
// For each pipe, close the underlying transport. Also move it
// to the idle list so we won't keep looping.
- while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
- nni_mtx_unlock(&sock->s_mx);
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
nni_pipe_close(pipe);
+ }
+
+ // Wait for the eps to be reaped.
+ while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
+ nni_list_remove(&sock->s_eps, ep);
+
+ // This has to be done without the lock held, as the remove
+ // operation requires shutting down a thread which might be
+ // trying to acquire the socket lock.
+ nni_mtx_unlock(&sock->s_mx);
+ nni_ep_remove(ep);
nni_mtx_lock(&sock->s_mx);
}
+ // Wait for the pipes to be reaped (there should not be any because
+ // we have already reaped the EPs.)
+ while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
+ nni_cv_wait(&sock->s_cv);
+ }
+
sock->s_sock_ops.sock_close(sock->s_data);
nni_cv_wake(&sock->s_cv);
@@ -620,14 +594,14 @@ nni_sock_shutdown(nni_sock *sock)
}
-// nni_sock_add_ep adds a newly created endpoint to the socket. The
+// nni_sock_ep_add adds a newly created endpoint to the socket. The
// caller must hold references on the sock and the ep, and not be holding
// the socket lock. The ep acquires a reference against the sock,
// which will be dropped later by nni_sock_rem_ep. The endpoint must not
// already be associated with a socket. (Note, the ep holds the reference
// on the socket, not the other way around.)
int
-nni_sock_add_ep(nni_sock *sock, nni_ep *ep)
+nni_sock_ep_add(nni_sock *sock, nni_ep *ep)
{
int rv;
@@ -643,7 +617,7 @@ nni_sock_add_ep(nni_sock *sock, nni_ep *ep)
void
-nni_sock_rem_ep(nni_sock *sock, nni_ep *ep)
+nni_sock_ep_remove(nni_sock *sock, nni_ep *ep)
{
// If we're not on the list, then nothing to do. Be idempotent.
// Note that if the ep is not on a list, then we assume that we have
@@ -832,8 +806,7 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
}
if ((rv = nni_ep_dial(ep, flags)) != 0) {
- nni_ep_close(ep);
- nni_ep_rele(ep);
+ nni_ep_remove(ep);
} else if (epp != NULL) {
*epp = ep;
}
@@ -853,8 +826,7 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
}
if ((rv = nni_ep_listen(ep, flags)) != 0) {
- nni_ep_close(ep);
- nni_ep_rele(ep);
+ nni_ep_remove(ep);
} else if (epp != NULL) {
*epp = ep;
}