aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c51
1 files changed, 16 insertions, 35 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 85b97363..4535d2f4 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -83,32 +83,6 @@ nni_sock_rele(nni_sock *sock)
int
-nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe)
-{
- int rv;
- void *pdata;
-
- rv = sock->s_pipe_ops.pipe_init(&pdata, pipe, sock->s_data);
- if (rv != 0) {
- return (rv);
- }
-
- // XXX: place a hold on the socket.
-
- nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
- nni_mtx_unlock(&sock->s_mx);
- sock->s_pipe_ops.pipe_fini(pdata);
- return (NNG_ECLOSED);
- }
- nni_pipe_set_proto_data(pipe, pdata, sock->s_pipe_ops.pipe_fini);
- nni_list_append(&sock->s_pipes, pipe);
- nni_mtx_unlock(&sock->s_mx);
- return (0);
-}
-
-
-int
nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
{
int rv;
@@ -130,6 +104,9 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
return (rv);
}
+ // We have claimed ownership of the pipe, so add it to the list.
+ // Up until this point, the caller could destroy the pipe.
+ nni_list_append(&sock->s_pipes, pipe);
nni_mtx_unlock(&sock->s_mx);
return (0);
@@ -137,14 +114,19 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
void
-nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
+nni_sock_pipe_stop(nni_sock *sock, nni_pipe *pipe)
{
void *pdata;
- if (sock == NULL) {
+ pdata = nni_pipe_get_proto_data(pipe);
+
+ nni_mtx_lock(&sock->s_mx);
+
+ if ((sock->s_pipe_ops.pipe_stop == NULL) || (pdata == NULL)) {
+ nni_mtx_unlock(&sock->s_mx);
return;
}
- nni_mtx_lock(&sock->s_mx);
+ sock->s_pipe_ops.pipe_stop(pdata);
if (nni_list_active(&sock->s_pipes, pipe)) {
nni_list_remove(&sock->s_pipes, pipe);
if (sock->s_closing) {
@@ -552,18 +534,17 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
+ // For each pipe, close the underlying transport.
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+ nni_pipe_stop(pipe);
+ }
+
// For each ep, close it; this will also tell it to force any
// of its pipes to close.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
nni_ep_close(ep);
}
- // For each pipe, close the underlying transport. Also move it
- // to the idle list so we won't keep looping.
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_close(pipe);
- }
-
// Wait for the eps to be reaped.
while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
nni_list_remove(&sock->s_eps, ep);