aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-06-24 14:11:35 -0700
committerGarrett D'Amore <garrett@damore.org>2017-06-24 14:11:35 -0700
commit0a51aa7bfc88d55b98fdde0d497b072e6911457d (patch)
tree722ef4713bf27a9aac9dce0a1fe9fa0edfe34a2d /src/core
parentd753c00d43e6dc642b2445e4821537a92b8b8d23 (diff)
downloadnng-0a51aa7bfc88d55b98fdde0d497b072e6911457d.tar.gz
nng-0a51aa7bfc88d55b98fdde0d497b072e6911457d.tar.bz2
nng-0a51aa7bfc88d55b98fdde0d497b072e6911457d.zip
Protocols keep their own reference counts.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/endpt.c129
-rw-r--r--src/core/endpt.h6
-rw-r--r--src/core/pipe.c45
-rw-r--r--src/core/pipe.h19
-rw-r--r--src/core/socket.c98
-rw-r--r--src/core/socket.h19
6 files changed, 185 insertions, 131 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index efd3eefb..00690cd4 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -19,13 +19,54 @@ static nni_objhash *nni_eps = NULL;
static void *nni_ep_ctor(uint32_t);
static void nni_ep_dtor(void *);
+// Because we can't reap threads from themselves, we need to have a separate
+// task to reap endpoints. We use one global task to do this, and we just
+// add to the reap list as needed.
+static nni_taskq_ent nni_ep_reap_tqe;
+static nni_mtx nni_ep_reap_mx;
+static nni_list nni_ep_reap_list;
+
+static void
+nni_ep_reaper(void *arg)
+{
+ nni_ep *ep;
+
+ NNI_ARG_UNUSED(arg);
+
+ nni_mtx_lock(&nni_ep_reap_mx);
+ while ((ep = nni_list_first(&nni_ep_reap_list)) != NULL) {
+ nni_list_remove(&nni_ep_reap_list, ep);
+
+ nni_mtx_unlock(&nni_ep_reap_mx);
+ nni_thr_fini(&ep->ep_thr);
+ nni_objhash_unref(nni_eps, ep->ep_id);
+ nni_mtx_lock(&nni_ep_reap_mx);
+
+ continue;
+ }
+
+ nni_mtx_unlock(&nni_ep_reap_mx);
+}
+
+
int
nni_ep_sys_init(void)
{
int rv;
rv = nni_objhash_init(&nni_eps, nni_ep_ctor, nni_ep_dtor);
+ if (rv != 0) {
+ return (rv);
+ }
+ rv = nni_mtx_init(&nni_ep_reap_mx);
+ if (rv != 0) {
+ nni_objhash_fini(nni_eps);
+ nni_eps = NULL;
+ return (rv);
+ }
+ nni_ep_list_init(&nni_ep_reap_list);
+ nni_taskq_ent_init(&nni_ep_reap_tqe, nni_ep_reaper, NULL);
return (rv);
}
@@ -33,6 +74,8 @@ nni_ep_sys_init(void)
void
nni_ep_sys_fini(void)
{
+ nni_taskq_cancel(NULL, &nni_ep_reap_tqe);
+ nni_mtx_fini(&nni_ep_reap_mx);
nni_objhash_fini(nni_eps);
nni_eps = NULL;
}
@@ -129,13 +172,6 @@ nni_ep_dtor(void *ptr)
{
nni_ep *ep = ptr;
- // If a thread is running, make sure it is stopped.
- nni_thr_fini(&ep->ep_thr);
-
- if (ep->ep_sock != NULL) {
- // This is idempotent; harmless if not already on the list.
- nni_sock_rem_ep(ep->ep_sock, ep);
- }
if (ep->ep_data != NULL) {
ep->ep_ops.ep_fini(ep->ep_data);
}
@@ -181,7 +217,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr)
return (rv);
}
- if ((rv = nni_sock_add_ep(sock, ep)) != 0) {
+ if ((rv = nni_sock_ep_add(sock, ep)) != 0) {
nni_objhash_unref(nni_eps, id);
return (rv);
}
@@ -192,22 +228,50 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr)
void
+nni_ep_stop(nni_ep *ep)
+{
+ nni_mtx_lock(&ep->ep_mtx);
+ if (ep->ep_closed == 0) {
+ ep->ep_closed = 1;
+ ep->ep_ops.ep_close(ep->ep_data);
+ }
+ nni_cv_wake(&ep->ep_cv);
+ nni_mtx_unlock(&ep->ep_mtx);
+}
+
+
+void
nni_ep_close(nni_ep *ep)
{
nni_pipe *pipe;
+
+ nni_ep_stop(ep);
+ nni_mtx_lock(&ep->ep_mtx);
+ NNI_LIST_FOREACH (&ep->ep_pipes, pipe) {
+ nni_pipe_close(pipe);
+ }
+ nni_mtx_unlock(&ep->ep_mtx);
+}
+
+
+void
+nni_ep_remove(nni_ep *ep)
+{
+ nni_pipe *pipe;
nni_sock *sock = ep->ep_sock;
+ nni_ep_close(ep);
+
nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_closed == 0) {
- ep->ep_closed = 1;
- ep->ep_ops.ep_close(ep->ep_data);
- if ((pipe = ep->ep_pipe) != NULL) {
- pipe->p_ep = NULL;
- ep->ep_pipe = NULL;
- }
- nni_cv_wake(&ep->ep_cv);
+ while (nni_list_first(&ep->ep_pipes) != NULL) {
+ nni_cv_wait(&ep->ep_cv);
}
nni_mtx_unlock(&ep->ep_mtx);
+
+ nni_sock_ep_remove(sock, ep);
+
+ nni_thr_fini(&ep->ep_thr);
+ nni_objhash_unref(nni_eps, ep->ep_id);
}
@@ -222,29 +286,27 @@ nni_ep_connect(nni_ep *ep)
}
rv = ep->ep_ops.ep_connect(ep->ep_data, pipe->p_tran_data);
if (rv != 0) {
- nni_pipe_destroy(pipe);
+ nni_pipe_remove(pipe);
return (rv);
}
if ((rv = nni_pipe_start(pipe)) != 0) {
- nni_pipe_close(pipe);
+ nni_pipe_remove(pipe);
return (rv);
}
ep->ep_pipe = pipe;
- pipe->p_ep = ep;
return (0);
}
int
-nni_ep_add_pipe(nni_ep *ep, nni_pipe *pipe)
+nni_ep_pipe_add(nni_ep *ep, nni_pipe *pipe)
{
- nni_ep_hold(ep);
nni_mtx_lock(&ep->ep_mtx);
if (ep->ep_closed) {
nni_mtx_unlock(&ep->ep_mtx);
- nni_ep_rele(ep);
return (NNG_ECLOSED);
}
+
nni_list_append(&ep->ep_pipes, pipe);
nni_mtx_unlock(&ep->ep_mtx);
@@ -253,16 +315,19 @@ nni_ep_add_pipe(nni_ep *ep, nni_pipe *pipe)
void
-nni_ep_rem_pipe(nni_ep *ep, nni_pipe *pipe)
+nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe)
{
- nni_mtx_lock(&ep->ep_mtx);
- if (!nni_list_active(&ep->ep_pipes, pipe)) {
+ if ((ep != NULL) && (nni_list_active(&ep->ep_pipes, pipe))) {
+ nni_mtx_lock(&ep->ep_mtx);
+ nni_list_remove(&ep->ep_pipes, pipe);
+
+ if (ep->ep_pipe == pipe) {
+ ep->ep_pipe = NULL;
+ }
+ nni_cv_wake(&ep->ep_cv);
+
nni_mtx_unlock(&ep->ep_mtx);
- return;
}
- nni_list_remove(&ep->ep_pipes, pipe);
- nni_mtx_unlock(&ep->ep_mtx);
- nni_ep_rele(ep);
}
@@ -295,7 +360,7 @@ nni_dialer(void *arg)
}
if (ep->ep_closed) {
nni_mtx_unlock(&ep->ep_mtx);
- break;
+ return;
}
nni_mtx_unlock(&ep->ep_mtx);
@@ -382,11 +447,11 @@ nni_ep_accept(nni_ep *ep)
}
rv = ep->ep_ops.ep_accept(ep->ep_data, pipe->p_tran_data);
if (rv != 0) {
- nni_pipe_destroy(pipe);
+ nni_pipe_remove(pipe);
return (rv);
}
if ((rv = nni_pipe_start(pipe)) != 0) {
- nni_pipe_close(pipe);
+ nni_pipe_remove(pipe);
return (rv);
}
return (0);
diff --git a/src/core/endpt.h b/src/core/endpt.h
index cfb332bb..3d353cdf 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -47,11 +47,13 @@ extern void nni_ep_hold(nni_ep *);
extern void nni_ep_rele(nni_ep *);
extern uint32_t nni_ep_id(nni_ep *);
extern int nni_ep_create(nni_ep **, nni_sock *, const char *);
+extern void nni_ep_stop(nni_ep *);
extern void nni_ep_close(nni_ep *);
+extern void nni_ep_remove(nni_ep *);
extern int nni_ep_dial(nni_ep *, int);
extern int nni_ep_listen(nni_ep *, int);
extern void nni_ep_list_init(nni_list *);
-extern int nni_ep_add_pipe(nni_ep *, nni_pipe *);
-extern void nni_ep_rem_pipe(nni_ep *, nni_pipe *);
+extern int nni_ep_pipe_add(nni_ep *, nni_pipe *);
+extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *);
#endif // CORE_ENDPT_H
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 80b39171..5d774e6d 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -31,6 +31,7 @@ nni_pipe_ctor(uint32_t id)
p->p_tran_data = NULL;
p->p_proto_data = NULL;
+ p->p_proto_dtor = NULL;
p->p_id = id;
NNI_LIST_NODE_INIT(&p->p_sock_node);
@@ -45,8 +46,9 @@ nni_pipe_dtor(void *ptr)
{
nni_pipe *p = ptr;
- nni_sock_pipe_rem(p->p_sock, p);
-
+ if (p->p_proto_dtor != NULL) {
+ p->p_proto_dtor(p->p_proto_data);
+ }
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_fini(p->p_tran_data);
}
@@ -140,11 +142,23 @@ nni_pipe_close(nni_pipe *p)
}
nni_mtx_unlock(&p->p_mtx);
+}
- // Let the socket (and endpoint) know we have closed.
- nni_sock_pipe_closed(sock, p);
- nni_objhash_unref(nni_pipes, p->p_id);
+// nni_pipe_remove is called by protocol implementations to indicate that
+// they are finished using the pipe (it should be closed already), and the
+// owning socket and endpoint should de-register it.
+void
+nni_pipe_remove(nni_pipe *p)
+{
+ // Make sure the pipe is closed, in case it wasn't already done.
+ nni_pipe_close(p);
+
+ nni_ep_pipe_remove(p->p_ep, p);
+ nni_sock_pipe_remove(p->p_sock, p);
+
+ // XXX: would be simpler to just do a destroy here
+ nni_pipe_rele(p);
}
@@ -167,6 +181,7 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran)
return (rv);
}
p->p_sock = sock;
+ p->p_ep = ep;
// Make a copy of the transport ops. We can override entry points
// and we avoid an extra dereference on hot code paths.
@@ -178,8 +193,12 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran)
return (rv);
}
+ if ((rv = nni_ep_pipe_add(ep, p)) != 0) {
+ nni_pipe_remove(p);
+ }
if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
- nni_objhash_unref(nni_pipes, p->p_id);
+ nni_pipe_remove(p);
+ //nni_objhash_unref(nni_pipes, p->p_id);
return (rv);
}
@@ -188,15 +207,6 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran)
}
-void
-nni_pipe_destroy(nni_pipe *p)
-{
- NNI_ASSERT(p->p_refcnt == 0);
-
- nni_objhash_unref(nni_pipes, p->p_id);
-}
-
-
int
nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp)
{
@@ -221,7 +231,7 @@ nni_pipe_start(nni_pipe *p)
NNI_ASSERT(p == scratch);
if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) {
- nni_pipe_close(p);
+ nni_pipe_remove(p);
return (rv);
}
@@ -232,9 +242,10 @@ nni_pipe_start(nni_pipe *p)
void
-nni_pipe_set_proto_data(nni_pipe *p, void *data)
+nni_pipe_set_proto_data(nni_pipe *p, void *data, nni_cb dtor)
{
p->p_proto_data = data;
+ p->p_proto_dtor = dtor;
}
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 379b1d8a..1f911480 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -23,13 +23,13 @@ struct nni_pipe {
nni_tran_pipe p_tran_ops;
void * p_tran_data;
void * p_proto_data;
+ nni_cb p_proto_dtor;
nni_list_node p_sock_node;
nni_list_node p_ep_node;
nni_sock * p_sock;
nni_ep * p_ep;
int p_reap;
nni_mtx p_mtx;
- int p_refcnt;
};
extern int nni_pipe_sys_init(void);
@@ -41,16 +41,24 @@ extern int nni_pipe_aio_send(nni_pipe *, nni_aio *);
// Pipe operations that protocols use.
extern uint32_t nni_pipe_id(nni_pipe *);
+
+// nni_pipe_close closes the underlying transport for the pipe. Further
+// operations against will return NNG_ECLOSED.
extern void nni_pipe_close(nni_pipe *);
+
extern void nni_pipe_hold(nni_pipe *);
extern void nni_pipe_rele(nni_pipe *);
+// nni_pipe_remove is called by the protocol when it is done with the socket.
+// The pipe should already be closed; it will be unregistered and it's
+// resources released back to the system. The protocol MUST not reference
+// the pipe after this.
+extern void nni_pipe_remove(nni_pipe *);
+
// Used only by the socket core - as we don't wish to expose the details
// of the pipe structure outside of pipe.c.
extern int nni_pipe_create(nni_pipe **, nni_ep *, nni_sock *, nni_tran *);
-extern void nni_pipe_destroy(nni_pipe *);
-
extern uint16_t nni_pipe_proto(nni_pipe *);
extern uint16_t nni_pipe_peer(nni_pipe *);
extern int nni_pipe_start(nni_pipe *);
@@ -58,8 +66,9 @@ extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep);
// nni_pipe_set_proto_data sets the protocol private data. No locking is
// performed, and this routine should only be called once per pipe at
-// initialization.
-extern void nni_pipe_set_proto_data(nni_pipe *, void *);
+// initialization. The third argument is called to destroy the data,
+// at termination.
+extern void nni_pipe_set_proto_data(nni_pipe *, void *, nni_cb);
// nni_pipe_get_proto_data gets the protocol private data set with the
// nni_pipe_set_proto_data function. No locking is performed.
diff --git a/src/core/socket.c b/src/core/socket.c
index 257f7fa3..67b3f978 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -101,7 +101,7 @@ nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe)
sock->s_pipe_ops.pipe_fini(pdata);
return (NNG_ECLOSED);
}
- nni_pipe_set_proto_data(pipe, pdata);
+ nni_pipe_set_proto_data(pipe, pdata, sock->s_pipe_ops.pipe_fini);
nni_list_append(&sock->s_pipes, pipe);
nni_mtx_unlock(&sock->s_mx);
return (0);
@@ -137,58 +137,18 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
void
-nni_sock_pipe_closed(nni_sock *sock, nni_pipe *pipe)
+nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
{
- nni_ep *ep;
- void *pdata = nni_pipe_get_proto_data(pipe);
-
- nni_mtx_lock(&sock->s_mx);
-
- // NB: nni_list_remove doesn't really care *which* list the pipe
- // is on, and so if the pipe is already on the idle list these
- // two statements are effectively a no-op.
- nni_list_remove(&sock->s_pipes, pipe);
- if (nni_list_first(&sock->s_pipes) == NULL) {
- nni_cv_wake(&sock->s_cv);
- }
-
- sock->s_pipe_ops.pipe_stop(pdata);
-
- // Notify the endpoint that the pipe has closed.
- if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) {
- ep->ep_pipe = NULL;
- nni_cv_wake(&ep->ep_cv);
- }
- nni_mtx_unlock(&sock->s_mx);
-}
-
-
-void
-nni_sock_pipe_rem(nni_sock *sock, nni_pipe *pipe)
-{
- nni_ep *ep;
- void *pdata = nni_pipe_get_proto_data(pipe);
+ void *pdata;
nni_mtx_lock(&sock->s_mx);
-
if (nni_list_active(&sock->s_pipes, pipe)) {
nni_list_remove(&sock->s_pipes, pipe);
+ if (sock->s_closing) {
+ nni_cv_wake(&sock->s_cv);
+ }
}
-
- if (pdata != NULL) {
- sock->s_pipe_ops.pipe_fini(pdata);
- }
-
- // XXX: Move this to a seperate ep-specific API.
- // Notify the endpoint that the pipe has closed - if not already done.
- if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) {
- ep->ep_pipe = NULL;
- nni_cv_wake(&ep->ep_cv);
- }
- nni_cv_wake(&sock->s_cv);
nni_mtx_unlock(&sock->s_mx);
-
- // XXX release the hold on the pipe
}
@@ -556,12 +516,13 @@ nni_sock_shutdown(nni_sock *sock)
linger = nni_clock() + sock->s_linger;
}
+ // Stop the EPs. This prevents new connections from forming but
+ // but allows existing ones to drain.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
- nni_ep_close(ep);
+ nni_ep_stop(ep);
}
nni_mtx_unlock(&sock->s_mx);
-
// We drain the upper write queue. This is just like closing it,
// except that the protocol gets a chance to get the messages and
// push them down to the transport. This operation can *block*
@@ -588,23 +549,36 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // Stop all EPS.
- while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
- nni_list_remove(&sock->s_eps, ep);
- nni_mtx_unlock(&sock->s_mx);
+ // For each ep, close it; this will also tell it to force any
+ // of its pipes to close.
+ NNI_LIST_FOREACH (&sock->s_eps, ep) {
nni_ep_close(ep);
- nni_ep_rele(ep);
- nni_mtx_lock(&sock->s_mx);
}
// For each pipe, close the underlying transport. Also move it
// to the idle list so we won't keep looping.
- while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
- nni_mtx_unlock(&sock->s_mx);
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
nni_pipe_close(pipe);
+ }
+
+ // Wait for the eps to be reaped.
+ while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
+ nni_list_remove(&sock->s_eps, ep);
+
+ // This has to be done without the lock held, as the remove
+ // operation requires shutting down a thread which might be
+ // trying to acquire the socket lock.
+ nni_mtx_unlock(&sock->s_mx);
+ nni_ep_remove(ep);
nni_mtx_lock(&sock->s_mx);
}
+ // Wait for the pipes to be reaped (there should not be any because
+ // we have already reaped the EPs.)
+ while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
+ nni_cv_wait(&sock->s_cv);
+ }
+
sock->s_sock_ops.sock_close(sock->s_data);
nni_cv_wake(&sock->s_cv);
@@ -620,14 +594,14 @@ nni_sock_shutdown(nni_sock *sock)
}
-// nni_sock_add_ep adds a newly created endpoint to the socket. The
+// nni_sock_ep_add adds a newly created endpoint to the socket. The
// caller must hold references on the sock and the ep, and not be holding
// the socket lock. The ep acquires a reference against the sock,
// which will be dropped later by nni_sock_rem_ep. The endpoint must not
// already be associated with a socket. (Note, the ep holds the reference
// on the socket, not the other way around.)
int
-nni_sock_add_ep(nni_sock *sock, nni_ep *ep)
+nni_sock_ep_add(nni_sock *sock, nni_ep *ep)
{
int rv;
@@ -643,7 +617,7 @@ nni_sock_add_ep(nni_sock *sock, nni_ep *ep)
void
-nni_sock_rem_ep(nni_sock *sock, nni_ep *ep)
+nni_sock_ep_remove(nni_sock *sock, nni_ep *ep)
{
// If we're not on the list, then nothing to do. Be idempotent.
// Note that if the ep is not on a list, then we assume that we have
@@ -832,8 +806,7 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
}
if ((rv = nni_ep_dial(ep, flags)) != 0) {
- nni_ep_close(ep);
- nni_ep_rele(ep);
+ nni_ep_remove(ep);
} else if (epp != NULL) {
*epp = ep;
}
@@ -853,8 +826,7 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
}
if ((rv = nni_ep_listen(ep, flags)) != 0) {
- nni_ep_close(ep);
- nni_ep_rele(ep);
+ nni_ep_remove(ep);
} else if (epp != NULL) {
*epp = ep;
}
diff --git a/src/core/socket.h b/src/core/socket.h
index 68f05705..7d5e0f20 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -85,18 +85,19 @@ extern void nni_sock_unlock(nni_sock *);
extern nni_notify *nni_sock_notify(nni_sock *, int, nng_notify_func, void *);
extern void nni_sock_unnotify(nni_sock *, nni_notify *);
-extern int nni_sock_add_ep(nni_sock *, nni_ep *);
-extern void nni_sock_rem_ep(nni_sock *, nni_ep *);
+extern int nni_sock_ep_add(nni_sock *, nni_ep *);
+extern void nni_sock_ep_remove(nni_sock *, nni_ep *);
// nni_sock_pipe_add is called by the pipe to register the pipe with
// with the socket. The pipe is added to the idle list. The protocol
// private pipe data is initialized as well.
extern int nni_sock_pipe_add(nni_sock *, nni_pipe *);
-// nni_sock_pipe_rem deregisters the pipe from the socket. The socket
-// will block during close if there are registered pipes outstanding.
-// This also frees any protocol private pipe data.
-extern void nni_sock_pipe_rem(nni_sock *, nni_pipe *);
+// nni_sock_pipe_remove is called by the pipe when the protocol is
+// done with it. This is the sockets indication that it should be
+// removed, and freed. The protocol MUST guarantee that the pipe is
+// no longer in use when this function is called.
+extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *);
// nni_sock_pipe_ready lets the socket know the pipe is ready for
// business. This also calls the socket/protocol specific add function,
@@ -104,12 +105,6 @@ extern void nni_sock_pipe_rem(nni_sock *, nni_pipe *);
// on success. The reference count should be dropped by nni_sock_pipe_closed.
extern int nni_sock_pipe_ready(nni_sock *, nni_pipe *);
-// nni_sock_pipe_closed lets the socket know that the pipe is closed.
-// This keeps the socket from trying to schedule traffic to it. It
-// also lets the endpoint know about it, to possibly restart a dial
-// operation.
-extern void nni_sock_pipe_closed(nni_sock *, nni_pipe *);
-
// Set error codes for applications. These are only ever
// called from the filter functions in protocols, and thus
// already have the socket lock held.