aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-06-28 23:07:28 -0700
committerGarrett D'Amore <garrett@damore.org>2017-06-28 23:07:28 -0700
commitfe3c9705072ac8cafecdf2ea6bca4c26f9464824 (patch)
tree07aaea70cbf8bb6af369d5efede475ed03ffdd63 /src
parent10d748fa6444324878a77cc5749c93b75819ced2 (diff)
downloadnng-fe3c9705072ac8cafecdf2ea6bca4c26f9464824.tar.gz
nng-fe3c9705072ac8cafecdf2ea6bca4c26f9464824.tar.bz2
nng-fe3c9705072ac8cafecdf2ea6bca4c26f9464824.zip
Refactor stop again, closing numerous races (thanks valgrind!)
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c123
-rw-r--r--src/core/aio.h3
-rw-r--r--src/core/endpt.c17
-rw-r--r--src/core/init.c2
-rw-r--r--src/core/msgqueue.c47
-rw-r--r--src/core/pipe.c61
-rw-r--r--src/core/pipe.h9
-rw-r--r--src/core/socket.c51
-rw-r--r--src/core/socket.h11
-rw-r--r--src/core/taskq.c4
-rw-r--r--src/platform/posix/posix_aio.h1
-rw-r--r--src/platform/posix/posix_ipc.c3
-rw-r--r--src/platform/posix/posix_net.c3
-rw-r--r--src/platform/posix/posix_poll.c30
-rw-r--r--src/protocol/bus/bus.c51
-rw-r--r--src/protocol/pair/pair.c44
-rw-r--r--src/protocol/pipeline/pull.c30
-rw-r--r--src/protocol/pipeline/push.c38
-rw-r--r--src/protocol/pubsub/pub.c39
-rw-r--r--src/protocol/pubsub/sub.c13
-rw-r--r--src/protocol/reqrep/rep.c42
-rw-r--r--src/protocol/reqrep/req.c44
-rw-r--r--src/protocol/survey/respond.c49
-rw-r--r--src/protocol/survey/survey.c45
-rw-r--r--src/transport/ipc/ipc.c2
-rw-r--r--src/transport/tcp/tcp.c2
26 files changed, 360 insertions, 404 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 11aadcb7..96e7c950 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -10,7 +10,10 @@
#include <string.h>
#include "core/nng_impl.h"
-#define NNI_AIO_WAKE (1<<0)
+#define NNI_AIO_WAKE (1<<0)
+#define NNI_AIO_DONE (1<<1)
+#define NNI_AIO_FINI (1<<2)
+#define NNI_AIO_STOP (1<<3)
int
nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
@@ -32,6 +35,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
aio->a_cb = cb;
aio->a_cbarg = arg;
aio->a_expire = NNI_TIME_NEVER;
+ aio->a_flags = 0;
nni_taskq_ent_init(&aio->a_tqe, cb, arg);
return (0);
@@ -41,7 +45,24 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
void
nni_aio_fini(nni_aio *aio)
{
+ void (*cancelfn)(nni_aio *);
+
+ nni_mtx_lock(&aio->a_lk);
+ aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled
+ cancelfn = aio->a_prov_cancel;
+ nni_mtx_unlock(&aio->a_lk);
+
+ // Cancel the AIO if it was scheduled.
+ if (cancelfn != NULL) {
+ cancelfn(aio);
+ }
+
+ // if the task is already dispatched, cancel it (or wait for it to
+ // complete). No further dispatches will happen because of the
+ // above logic to set NNI_AIO_FINI.
nni_taskq_cancel(NULL, &aio->a_tqe);
+
+ // At this point the AIO is done.
nni_cv_fini(&aio->a_cv);
nni_mtx_fini(&aio->a_lk);
}
@@ -82,21 +103,105 @@ nni_aio_wait(nni_aio *aio)
}
+int
+nni_aio_start(nni_aio *aio, void (*cancel)(nni_aio *), void *data)
+{
+ NNI_ASSERT(aio->a_prov_data == NULL);
+ NNI_ASSERT(aio->a_prov_cancel == NULL);
+
+ nni_mtx_lock(&aio->a_lk);
+ aio->a_flags &= ~(NNI_AIO_DONE|NNI_AIO_WAKE);
+ if (aio->a_flags & (NNI_AIO_FINI|NNI_AIO_STOP)) {
+ // We should not reschedule anything at this point.
+ nni_mtx_unlock(&aio->a_lk);
+ return (NNG_ECANCELED);
+ }
+ aio->a_prov_cancel = cancel;
+ aio->a_prov_data = data;
+ nni_mtx_unlock(&aio->a_lk);
+ return (0);
+}
+
+
+void
+nni_aio_stop(nni_aio *aio)
+{
+ void (*cancelfn)(nni_aio *);
+
+ nni_mtx_lock(&aio->a_lk);
+ aio->a_flags |= NNI_AIO_DONE|NNI_AIO_STOP;
+ cancelfn = aio->a_prov_cancel;
+ nni_mtx_unlock(&aio->a_lk);
+
+ // This unregisters the AIO from the provider.
+ if (cancelfn != NULL) {
+ cancelfn(aio);
+ }
+
+ nni_mtx_lock(&aio->a_lk);
+ aio->a_prov_data = NULL;
+ aio->a_prov_cancel = NULL;
+ nni_mtx_unlock(&aio->a_lk);
+
+ // This either aborts the task, or waits for it to complete if already
+ // dispatched.
+ nni_taskq_cancel(NULL, &aio->a_tqe);
+}
+
+
+void
+nni_aio_cancel(nni_aio *aio)
+{
+ void (*cancelfn)(nni_aio *);
+
+ nni_mtx_lock(&aio->a_lk);
+ if (aio->a_flags & NNI_AIO_DONE) {
+ // The operation already completed - so there's nothing
+ // left for us to do.
+ nni_mtx_unlock(&aio->a_lk);
+ return;
+ }
+ aio->a_flags |= NNI_AIO_DONE;
+ aio->a_result = NNG_ECANCELED;
+ cancelfn = aio->a_prov_cancel;
+ nni_mtx_unlock(&aio->a_lk);
+
+ // This unregisters the AIO from the provider.
+ if (cancelfn != NULL) {
+ cancelfn(aio);
+ }
+
+ nni_mtx_lock(&aio->a_lk);
+ // These should have already been cleared by the cancel function.
+ aio->a_prov_data = NULL;
+ aio->a_prov_cancel = NULL;
+
+ if (!(aio->a_flags & (NNI_AIO_FINI|NNI_AIO_STOP))) {
+ nni_taskq_dispatch(NULL, &aio->a_tqe);
+ }
+ nni_mtx_unlock(&aio->a_lk);
+}
+
+
// I/O provider related functions.
void
nni_aio_finish(nni_aio *aio, int result, size_t count)
{
- nni_cb cb;
- void *arg;
-
nni_mtx_lock(&aio->a_lk);
+ if (aio->a_flags & NNI_AIO_DONE) {
+ // Operation already done (canceled or timed out?)
+ nni_mtx_unlock(&aio->a_lk);
+ return;
+ }
+ aio->a_flags |= NNI_AIO_DONE;
aio->a_result = result;
aio->a_count = count;
- cb = aio->a_cb;
- arg = aio->a_cbarg;
- nni_cv_wake(&aio->a_cv);
- nni_mtx_unlock(&aio->a_lk);
+ aio->a_prov_cancel = NULL;
+ aio->a_prov_data = NULL;
- nni_taskq_dispatch(NULL, &aio->a_tqe);
+ if (!(aio->a_flags & (NNI_AIO_FINI|NNI_AIO_STOP))) {
+ nni_taskq_dispatch(NULL, &aio->a_tqe);
+ }
+ nni_mtx_unlock(&aio->a_lk);
}
diff --git a/src/core/aio.h b/src/core/aio.h
index a5f78b3f..a290281f 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -87,4 +87,7 @@ extern void nni_aio_wait(nni_aio *);
// and the amount of data transferred (if any).
extern void nni_aio_finish(nni_aio *, int, size_t);
+extern int nni_aio_start(nni_aio *, void (*)(nni_aio *), void *);
+extern void nni_aio_stop(nni_aio *);
+
#endif // CORE_AIO_H
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 74bd0314..e3f78ecd 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -203,14 +203,7 @@ nni_ep_stop(nni_ep *ep)
void
nni_ep_close(nni_ep *ep)
{
- nni_pipe *pipe;
-
nni_ep_stop(ep);
- nni_mtx_lock(&ep->ep_mtx);
- NNI_LIST_FOREACH (&ep->ep_pipes, pipe) {
- nni_pipe_close(pipe);
- }
- nni_mtx_unlock(&ep->ep_mtx);
}
@@ -220,7 +213,15 @@ nni_ep_remove(nni_ep *ep)
nni_pipe *pipe;
nni_sock *sock = ep->ep_sock;
- nni_ep_close(ep);
+ nni_ep_stop(ep);
+
+ nni_thr_wait(&ep->ep_thr);
+
+ nni_mtx_lock(&ep->ep_mtx);
+ NNI_LIST_FOREACH (&ep->ep_pipes, pipe) {
+ nni_pipe_close(pipe);
+ }
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_lock(&ep->ep_mtx);
while (nni_list_first(&ep->ep_pipes) != NULL) {
diff --git a/src/core/init.c b/src/core/init.c
index ca7c214f..98b187d8 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -66,12 +66,12 @@ nni_fini(void)
{
// XXX: We should make sure that underlying sockets and
// file descriptors are closed. Details TBD.
+ nni_taskq_sys_fini();
nni_tran_sys_fini();
nni_pipe_sys_fini();
nni_ep_sys_fini();
nni_sock_sys_fini();
nni_random_sys_fini();
nni_timer_sys_fini();
- nni_taskq_sys_fini();
nni_plat_fini();
}
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 99b53274..3d373e2b 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -137,8 +137,6 @@ nni_msgq_finish(nni_aio *aio, int rv)
{
nni_msgq *mq = aio->a_prov_data;
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
if ((mq != NULL) && nni_list_active(&mq->mq_aio_putq, aio)) {
nni_list_remove(&mq->mq_aio_putq, aio);
}
@@ -339,8 +337,6 @@ nni_msgq_cancel(nni_aio *aio)
if (nni_list_active(&mq->mq_aio_getq, aio)) {
nni_list_remove(&mq->mq_aio_getq, aio);
}
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_mtx_unlock(&mq->mq_lock);
}
@@ -349,8 +345,10 @@ void
nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);
- aio->a_prov_data = mq;
- aio->a_prov_cancel = nni_msgq_cancel;
+ if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
if (nni_list_active(&mq->mq_aio_notify_put, aio)) {
nni_list_remove(&mq->mq_aio_notify_put, aio);
}
@@ -363,8 +361,10 @@ void
nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);
- aio->a_prov_data = mq;
- aio->a_prov_cancel = nni_msgq_cancel;
+ if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
if (nni_list_active(&mq->mq_aio_notify_get, aio)) {
nni_list_remove(&mq->mq_aio_notify_get, aio);
}
@@ -379,7 +379,10 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
nni_time expire = aio->a_expire;
nni_mtx_lock(&mq->mq_lock);
-
+ if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
if (mq->mq_closed) {
nni_aio_finish(aio, NNG_ECLOSED, 0);
nni_mtx_unlock(&mq->mq_lock);
@@ -391,9 +394,6 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
return;
}
- aio->a_prov_data = mq;
- aio->a_prov_cancel = nni_msgq_cancel;
-
nni_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
@@ -413,6 +413,10 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_time expire = aio->a_expire;
nni_mtx_lock(&mq->mq_lock);
+ if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
if (mq->mq_closed) {
nni_aio_finish(aio, NNG_ECLOSED, 0);
nni_mtx_unlock(&mq->mq_lock);
@@ -424,9 +428,6 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
return;
}
- aio->a_prov_data = mq;
- aio->a_prov_cancel = nni_msgq_cancel;
-
nni_list_append(&mq->mq_aio_getq, aio);
nni_msgq_run_getq(mq);
nni_msgq_run_notify(mq);
@@ -507,8 +508,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
nni_list_remove(&mq->mq_aio_getq, raio);
raio->a_msg = msg;
- raio->a_prov_cancel = NULL;
- raio->a_prov_data = NULL;
nni_aio_finish(raio, 0, len);
nni_mtx_unlock(&mq->mq_lock);
@@ -550,13 +549,9 @@ nni_msgq_run_timeout(void *arg)
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_getq, aio);
if (aio->a_expire == NNI_TIME_ZERO) {
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_getq, aio);
nni_aio_finish(aio, NNG_EAGAIN, 0);
} else if (now >= aio->a_expire) {
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_getq, aio);
nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
} else if (exp > aio->a_expire) {
@@ -568,13 +563,9 @@ nni_msgq_run_timeout(void *arg)
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_putq, aio);
if (aio->a_expire == NNI_TIME_ZERO) {
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_putq, aio);
nni_aio_finish(aio, NNG_EAGAIN, 0);
} else if (now >= aio->a_expire) {
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_putq, aio);
nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
} else if (exp > aio->a_expire) {
@@ -662,8 +653,6 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
nni_list_remove(&mq->mq_aio_putq, aio);
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_aio_finish(aio, NNG_ECLOSED, 0);
}
while (mq->mq_len > 0) {
@@ -701,8 +690,6 @@ nni_msgq_close(nni_msgq *mq)
naio = nni_list_first(&mq->mq_aio_getq);
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_getq, aio);
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_getq, aio);
nni_aio_finish(aio, NNG_ECLOSED, 0);
}
@@ -710,8 +697,6 @@ nni_msgq_close(nni_msgq *mq)
naio = nni_list_first(&mq->mq_aio_putq);
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_putq, aio);
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_list_remove(&mq->mq_aio_putq, aio);
nni_aio_finish(aio, NNG_ECLOSED, 0);
}
diff --git a/src/core/pipe.c b/src/core/pipe.c
index f33f21a6..3dcfe9e0 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -46,7 +46,7 @@ nni_pipe_dtor(void *ptr)
{
nni_pipe *p = ptr;
- if (p->p_proto_dtor != NULL) {
+ if (p->p_proto_data != NULL) {
p->p_proto_dtor(p->p_proto_data);
}
if (p->p_tran_data != NULL) {
@@ -145,23 +145,41 @@ nni_pipe_close(nni_pipe *p)
}
-// nni_pipe_remove is called by protocol implementations to indicate that
-// they are finished using the pipe (it should be closed already), and the
-// owning socket and endpoint should de-register it.
+// We have to stop asynchronously using a task, because otherwise we can
+// wind up having a callback from an AIO trying to cancel itself. That
+// simply will not work.
void
nni_pipe_remove(nni_pipe *p)
{
- // Make sure the pipe is closed, in case it wasn't already done.
+ // Transport close...
nni_pipe_close(p);
nni_ep_pipe_remove(p->p_ep, p);
- nni_sock_pipe_remove(p->p_sock, p);
+
+ // Tell the protocol to stop.
+ nni_sock_pipe_stop(p->p_sock, p);
// XXX: would be simpler to just do a destroy here
nni_pipe_rele(p);
}
+void
+nni_pipe_stop(nni_pipe *p)
+{
+ // Guard against recursive calls.
+ nni_mtx_lock(&p->p_mtx);
+ if (p->p_stop) {
+ nni_mtx_unlock(&p->p_mtx);
+ return;
+ }
+ p->p_stop = 1;
+ nni_mtx_unlock(&p->p_mtx);
+ nni_taskq_ent_init(&p->p_reap_tqe, (nni_cb) nni_pipe_remove, p);
+ nni_taskq_dispatch(NULL, &p->p_reap_tqe);
+}
+
+
uint16_t
nni_pipe_peer(nni_pipe *p)
{
@@ -175,6 +193,7 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran)
nni_pipe *p;
int rv;
uint32_t id;
+ void *pdata;
rv = nni_objhash_alloc(nni_pipes, &id, (void **) &p);
if (rv != 0) {
@@ -187,18 +206,24 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran)
// and we avoid an extra dereference on hot code paths.
p->p_tran_ops = *tran->tran_pipe;
+ // Save the protocol destructor.
+ p->p_proto_dtor = sock->s_pipe_ops.pipe_fini;
+
// Initialize the transport pipe data.
if ((rv = p->p_tran_ops.p_init(&p->p_tran_data)) != 0) {
nni_objhash_unref(nni_pipes, p->p_id);
return (rv);
}
- if ((rv = nni_ep_pipe_add(ep, p)) != 0) {
- nni_pipe_remove(p);
+ // Initialize protocol pipe data.
+ rv = sock->s_pipe_ops.pipe_init(&p->p_proto_data, p, sock->s_data);
+ if (rv != 0) {
+ nni_objhash_unref(nni_pipes, p->p_id);
return (rv);
}
- if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
- nni_pipe_remove(p);
+
+ if ((rv = nni_ep_pipe_add(ep, p)) != 0) {
+ nni_objhash_unref(nni_pipes, p->p_id);
return (rv);
}
@@ -222,16 +247,8 @@ int
nni_pipe_start(nni_pipe *p)
{
int rv;
- nni_pipe *scratch;
-
- rv = nni_objhash_find(nni_pipes, p->p_id, (void **) &scratch);
- if (rv != 0) {
- return (rv);
- }
- NNI_ASSERT(p == scratch);
if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) {
- nni_pipe_remove(p);
return (rv);
}
@@ -241,14 +258,6 @@ nni_pipe_start(nni_pipe *p)
}
-void
-nni_pipe_set_proto_data(nni_pipe *p, void *data, nni_cb dtor)
-{
- p->p_proto_data = data;
- p->p_proto_dtor = dtor;
-}
-
-
void *
nni_pipe_get_proto_data(nni_pipe *p)
{
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 1f911480..80560e0e 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -29,7 +29,9 @@ struct nni_pipe {
nni_sock * p_sock;
nni_ep * p_ep;
int p_reap;
+ int p_stop;
nni_mtx p_mtx;
+ nni_taskq_ent p_reap_tqe;
};
extern int nni_pipe_sys_init(void);
@@ -54,6 +56,7 @@ extern void nni_pipe_rele(nni_pipe *);
// resources released back to the system. The protocol MUST not reference
// the pipe after this.
extern void nni_pipe_remove(nni_pipe *);
+extern void nni_pipe_stop(nni_pipe *);
// Used only by the socket core - as we don't wish to expose the details
// of the pipe structure outside of pipe.c.
@@ -64,12 +67,6 @@ extern uint16_t nni_pipe_peer(nni_pipe *);
extern int nni_pipe_start(nni_pipe *);
extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep);
-// nni_pipe_set_proto_data sets the protocol private data. No locking is
-// performed, and this routine should only be called once per pipe at
-// initialization. The third argument is called to destroy the data,
-// at termination.
-extern void nni_pipe_set_proto_data(nni_pipe *, void *, nni_cb);
-
// nni_pipe_get_proto_data gets the protocol private data set with the
// nni_pipe_set_proto_data function. No locking is performed.
extern void *nni_pipe_get_proto_data(nni_pipe *);
diff --git a/src/core/socket.c b/src/core/socket.c
index 85b97363..4535d2f4 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -83,32 +83,6 @@ nni_sock_rele(nni_sock *sock)
int
-nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe)
-{
- int rv;
- void *pdata;
-
- rv = sock->s_pipe_ops.pipe_init(&pdata, pipe, sock->s_data);
- if (rv != 0) {
- return (rv);
- }
-
- // XXX: place a hold on the socket.
-
- nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
- nni_mtx_unlock(&sock->s_mx);
- sock->s_pipe_ops.pipe_fini(pdata);
- return (NNG_ECLOSED);
- }
- nni_pipe_set_proto_data(pipe, pdata, sock->s_pipe_ops.pipe_fini);
- nni_list_append(&sock->s_pipes, pipe);
- nni_mtx_unlock(&sock->s_mx);
- return (0);
-}
-
-
-int
nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
{
int rv;
@@ -130,6 +104,9 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
return (rv);
}
+ // We have claimed ownership of the pipe, so add it to the list.
+ // Up until this point, the caller could destroy the pipe.
+ nni_list_append(&sock->s_pipes, pipe);
nni_mtx_unlock(&sock->s_mx);
return (0);
@@ -137,14 +114,19 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
void
-nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
+nni_sock_pipe_stop(nni_sock *sock, nni_pipe *pipe)
{
void *pdata;
- if (sock == NULL) {
+ pdata = nni_pipe_get_proto_data(pipe);
+
+ nni_mtx_lock(&sock->s_mx);
+
+ if ((sock->s_pipe_ops.pipe_stop == NULL) || (pdata == NULL)) {
+ nni_mtx_unlock(&sock->s_mx);
return;
}
- nni_mtx_lock(&sock->s_mx);
+ sock->s_pipe_ops.pipe_stop(pdata);
if (nni_list_active(&sock->s_pipes, pipe)) {
nni_list_remove(&sock->s_pipes, pipe);
if (sock->s_closing) {
@@ -552,18 +534,17 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
+ // For each pipe, close the underlying transport.
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+ nni_pipe_stop(pipe);
+ }
+
// For each ep, close it; this will also tell it to force any
// of its pipes to close.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
nni_ep_close(ep);
}
- // For each pipe, close the underlying transport. Also move it
- // to the idle list so we won't keep looping.
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_close(pipe);
- }
-
// Wait for the eps to be reaped.
while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
nni_list_remove(&sock->s_eps, ep);
diff --git a/src/core/socket.h b/src/core/socket.h
index 7d5e0f20..928264d9 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -88,16 +88,7 @@ extern void nni_sock_unnotify(nni_sock *, nni_notify *);
extern int nni_sock_ep_add(nni_sock *, nni_ep *);
extern void nni_sock_ep_remove(nni_sock *, nni_ep *);
-// nni_sock_pipe_add is called by the pipe to register the pipe with
-// with the socket. The pipe is added to the idle list. The protocol
-// private pipe data is initialized as well.
-extern int nni_sock_pipe_add(nni_sock *, nni_pipe *);
-
-// nni_sock_pipe_remove is called by the pipe when the protocol is
-// done with it. This is the sockets indication that it should be
-// removed, and freed. The protocol MUST guarantee that the pipe is
-// no longer in use when this function is called.
-extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *);
+extern void nni_sock_pipe_stop(nni_sock *, nni_pipe *);
// nni_sock_pipe_ready lets the socket know the pipe is ready for
// business. This also calls the socket/protocol specific add function,
diff --git a/src/core/taskq.c b/src/core/taskq.c
index f33a68bf..f4a0beee 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -184,7 +184,9 @@ nni_taskq_cancel(nni_taskq *tq, nni_taskq_ent *ent)
nni_mtx_unlock(&tq->tq_mtx);
return (NNG_ENOENT);
}
- nni_list_remove(&tq->tq_ents, ent);
+ if (nni_list_active(&tq->tq_ents, ent)) {
+ nni_list_remove(&tq->tq_ents, ent);
+ }
nni_mtx_unlock(&tq->tq_mtx);
return (0);
}
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
index 9ab322a0..de662ff2 100644
--- a/src/platform/posix/posix_aio.h
+++ b/src/platform/posix/posix_aio.h
@@ -26,5 +26,6 @@ extern int nni_posix_pipedesc_init(nni_posix_pipedesc **, int);
extern void nni_posix_pipedesc_fini(nni_posix_pipedesc *);
extern int nni_posix_pipedesc_read(nni_posix_pipedesc *, nni_aio *);
extern int nni_posix_pipedesc_write(nni_posix_pipedesc *, nni_aio *);
+extern void nni_posix_pipedesc_close(nni_posix_pipedesc *);
#endif // PLATFORM_POSIX_AIO_H
diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c
index ccf19fed..ba46b41e 100644
--- a/src/platform/posix/posix_ipc.c
+++ b/src/platform/posix/posix_ipc.c
@@ -243,6 +243,9 @@ nni_plat_ipc_shutdown(nni_plat_ipcsock *isp)
// (macOS does not see the shtudown).
(void) dup2(nni_plat_devnull, isp->fd);
}
+ if (isp->pd != NULL) {
+ nni_posix_pipedesc_close(isp->pd);
+ }
}
diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c
index c8b7766e..5ae9904a 100644
--- a/src/platform/posix/posix_net.c
+++ b/src/platform/posix/posix_net.c
@@ -299,6 +299,9 @@ nni_plat_tcp_shutdown(nni_plat_tcpsock *tsp)
// (macOS does not see the shtudown).
(void) dup2(nni_plat_devnull, tsp->fd);
}
+ if (tsp->pd != NULL) {
+ nni_posix_pipedesc_close(tsp->pd);
+ }
}
diff --git a/src/platform/posix/posix_poll.c b/src/platform/posix/posix_poll.c
index a6024db0..7fb07917 100644
--- a/src/platform/posix/posix_poll.c
+++ b/src/platform/posix/posix_poll.c
@@ -224,6 +224,22 @@ nni_posix_poll_close(nni_posix_pipedesc *pd)
}
+void
+nni_posix_pipedesc_close(nni_posix_pipedesc *pd)
+{
+ nni_posix_pollq *pq;
+
+ pq = pd->pq;
+ nni_mtx_lock(&pq->mtx);
+ pd->fd = -1;
+ nni_posix_poll_close(pd);
+ if (nni_list_active(&pq->pds, pd)) {
+ nni_list_remove(&pq->pds, pd);
+ }
+ nni_mtx_unlock(&pq->mtx);
+}
+
+
static void
nni_posix_poll_thr(void *arg)
{
@@ -355,8 +371,6 @@ nni_posix_pipedesc_cancel(nni_aio *aio)
if (nni_list_active(&pd->readq, aio)) {
nni_list_remove(&pd->readq, aio);
}
- aio->a_prov_cancel = NULL;
- aio->a_prov_data = NULL;
nni_mtx_unlock(&pq->mtx);
}
@@ -403,9 +417,19 @@ nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio)
int rv;
nni_posix_pollq *pq = pd->pq;
+ // XXX: this should be done only once, after tcp negot. is done
+ // or at init if we can get tcp negot. to be async.
(void) fcntl(pd->fd, F_SETFL, O_NONBLOCK);
nni_mtx_lock(&pq->mtx);
+ if (pd->fd < 0) {
+ nni_mtx_unlock(&pq->mtx);
+ nni_aio_finish(aio, NNG_ECLOSED, aio->a_count);
+ }
+ if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
+ nni_mtx_unlock(&pq->mtx);
+ return;
+ }
if (!nni_list_active(&pq->pds, pd)) {
if ((rv = nni_posix_poll_grow(pq)) != 0) {
nni_aio_finish(aio, rv, aio->a_count);
@@ -416,8 +440,6 @@ nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio)
nni_list_append(&pq->pds, pd);
}
NNI_ASSERT(!nni_list_active(l, aio));
- aio->a_prov_data = pd;
- aio->a_prov_cancel = nni_posix_pipedesc_cancel;
// Only wake if we aren't already waiting for this type of I/O on
// this descriptor.
wake = nni_list_first(l) == NULL ? 1 : 0;
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index cb624a10..1c32fec6 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -51,7 +51,6 @@ struct nni_bus_pipe {
nni_aio aio_send;
nni_aio aio_putq;
nni_mtx mtx;
- int refcnt;
};
@@ -133,7 +132,6 @@ nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock)
return (NNG_ENOMEM);
}
NNI_LIST_NODE_INIT(&ppipe->node);
- ppipe->refcnt = 0;
if (((rv = nni_mtx_init(&ppipe->mtx)) != 0) ||
((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0)) {
goto fail;
@@ -176,12 +174,6 @@ nni_bus_pipe_start(void *arg)
nni_list_append(&psock->pipes, ppipe);
nni_mtx_unlock(&psock->mtx);
- // Mark the ppipe busy twice -- once for each of the oustanding
- // asynchronous "threads" of operation.
- nni_mtx_lock(&ppipe->mtx);
- ppipe->refcnt = 2;
- nni_mtx_unlock(&ppipe->mtx);
-
nni_bus_pipe_recv(ppipe);
nni_bus_pipe_getq(ppipe);
@@ -189,40 +181,24 @@ nni_bus_pipe_start(void *arg)
}
-// nni_bus_pipe_stop is called only internally when one of our handlers notices
-// that the transport layer has closed. This allows us to stop all further
-// actions.
static void
-nni_bus_pipe_stop(nni_bus_pipe *ppipe)
+nni_bus_pipe_stop(void *arg)
{
- int refcnt;
+ nni_bus_pipe *ppipe = arg;
nni_bus_sock *psock = ppipe->psock;
- // As we are called only on error paths, shut down the underlying
- // pipe transport. This should cause any other consumer to also get
- // a suitable error (NNG_ECLOSED), so that we can shut down completely.
- nni_pipe_close(ppipe->npipe);
+ nni_msgq_close(ppipe->sendq);
+
+ nni_aio_stop(&ppipe->aio_getq);
+ nni_aio_stop(&ppipe->aio_send);
+ nni_aio_stop(&ppipe->aio_recv);
+ nni_aio_stop(&ppipe->aio_putq);
nni_mtx_lock(&ppipe->psock->mtx);
if (nni_list_active(&psock->pipes, ppipe)) {
nni_list_remove(&psock->pipes, ppipe);
-
- nni_msgq_close(ppipe->sendq);
- nni_msgq_aio_cancel(nni_sock_recvq(psock->nsock),
- &ppipe->aio_putq);
}
nni_mtx_unlock(&ppipe->psock->mtx);
-
- nni_mtx_lock(&ppipe->mtx);
- NNI_ASSERT(ppipe->refcnt > 0);
- refcnt = --ppipe->refcnt;
- nni_mtx_unlock(&ppipe->mtx);
-
- // If we are done with the pipe, let the system know so it can
- // deregister it.
- if (refcnt == 0) {
- nni_pipe_remove(ppipe->npipe);
- }
}
@@ -233,7 +209,7 @@ nni_bus_pipe_getq_cb(void *arg)
if (nni_aio_result(&ppipe->aio_getq) != 0) {
// closed?
- nni_bus_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
ppipe->aio_send.a_msg = ppipe->aio_getq.a_msg;
@@ -252,7 +228,7 @@ nni_bus_pipe_send_cb(void *arg)
// closed?
nni_msg_free(ppipe->aio_send.a_msg);
ppipe->aio_send.a_msg = NULL;
- nni_bus_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -269,7 +245,7 @@ nni_bus_pipe_recv_cb(void *arg)
uint32_t id;
if (nni_aio_result(&ppipe->aio_recv) != 0) {
- nni_bus_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
msg = ppipe->aio_recv.a_msg;
@@ -278,7 +254,7 @@ nni_bus_pipe_recv_cb(void *arg)
if (nni_msg_prepend_header(msg, &id, 4) != 0) {
// XXX: bump a nomemory stat
nni_msg_free(msg);
- nni_bus_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -295,7 +271,7 @@ nni_bus_pipe_putq_cb(void *arg)
if (nni_aio_result(&ppipe->aio_putq) != 0) {
nni_msg_free(ppipe->aio_putq.a_msg);
ppipe->aio_putq.a_msg = NULL;
- nni_bus_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -417,6 +393,7 @@ static nni_proto_pipe_ops nni_bus_pipe_ops = {
.pipe_init = nni_bus_pipe_init,
.pipe_fini = nni_bus_pipe_fini,
.pipe_start = nni_bus_pipe_start,
+ .pipe_stop = nni_bus_pipe_stop,
};
static nni_proto_sock_ops nni_bus_sock_ops = {
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index c2d4fa0d..67119f80 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -46,10 +46,6 @@ struct nni_pair_pipe {
nni_aio aio_recv;
nni_aio aio_getq;
nni_aio aio_putq;
- int busy;
- int closed;
- nni_mtx mtx;
- int refcnt;
};
static int
@@ -97,9 +93,6 @@ nni_pair_pipe_init(void **pp, nni_pipe *npipe, void *psock)
if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&ppipe->mtx)) != 0) {
- goto fail;
- }
rv = nni_aio_init(&ppipe->aio_send, nni_pair_send_cb, ppipe);
if (rv != 0) {
goto fail;
@@ -132,12 +125,10 @@ nni_pair_pipe_fini(void *arg)
{
nni_pair_pipe *ppipe = arg;
- NNI_ASSERT(ppipe->busy >= 0);
nni_aio_fini(&ppipe->aio_send);
nni_aio_fini(&ppipe->aio_recv);
nni_aio_fini(&ppipe->aio_putq);
nni_aio_fini(&ppipe->aio_getq);
- nni_mtx_fini(&ppipe->mtx);
NNI_FREE_STRUCT(ppipe);
}
@@ -156,10 +147,6 @@ nni_pair_pipe_start(void *arg)
psock->ppipe = ppipe;
nni_mtx_unlock(&psock->mtx);
- nni_mtx_lock(&ppipe->mtx);
- ppipe->refcnt = 2;
- nni_mtx_unlock(&ppipe->mtx);
-
// Schedule a getq on the upper, and a read from the pipe.
// Each of these also sets up another hold on the pipe itself.
nni_msgq_aio_get(psock->uwq, &ppipe->aio_getq);
@@ -170,31 +157,21 @@ nni_pair_pipe_start(void *arg)
static void
-nni_pair_pipe_stop(nni_pair_pipe *ppipe)
+nni_pair_pipe_stop(void *arg)
{
+ nni_pair_pipe *ppipe = arg;
nni_pair_sock *psock = ppipe->psock;
- int refcnt;
+ nni_aio_stop(&ppipe->aio_send);
+ nni_aio_stop(&ppipe->aio_recv);
+ nni_aio_stop(&ppipe->aio_putq);
+ nni_aio_stop(&ppipe->aio_getq);
nni_mtx_lock(&psock->mtx);
if (psock->ppipe == ppipe) {
psock->ppipe = NULL;
}
nni_mtx_unlock(&psock->mtx);
-
- // These operations are idempotent.
- nni_msgq_aio_cancel(psock->uwq, &ppipe->aio_getq);
- nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq);
-
- nni_mtx_lock(&ppipe->mtx);
- NNI_ASSERT(ppipe->refcnt > 0);
- ppipe->refcnt--;
- refcnt = ppipe->refcnt;
- nni_mtx_unlock(&ppipe->mtx);
-
- if (refcnt == 0) {
- nni_pipe_remove(ppipe->npipe);
- }
}
@@ -205,7 +182,7 @@ nni_pair_recv_cb(void *arg)
nni_pair_sock *psock = ppipe->psock;
if (nni_aio_result(&ppipe->aio_recv) != 0) {
- nni_pair_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -223,7 +200,7 @@ nni_pair_putq_cb(void *arg)
if (nni_aio_result(&ppipe->aio_putq) != 0) {
nni_msg_free(ppipe->aio_putq.a_msg);
ppipe->aio_putq.a_msg = NULL;
- nni_pair_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv);
@@ -238,7 +215,7 @@ nni_pair_getq_cb(void *arg)
nni_msg *msg;
if (nni_aio_result(&ppipe->aio_getq) != 0) {
- nni_pair_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -257,7 +234,7 @@ nni_pair_send_cb(void *arg)
if (nni_aio_result(&ppipe->aio_send) != 0) {
nni_msg_free(ppipe->aio_send.a_msg);
ppipe->aio_send.a_msg = NULL;
- nni_pair_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -306,6 +283,7 @@ static nni_proto_pipe_ops nni_pair_pipe_ops = {
.pipe_init = nni_pair_pipe_init,
.pipe_fini = nni_pair_pipe_fini,
.pipe_start = nni_pair_pipe_start,
+ .pipe_stop = nni_pair_pipe_stop,
};
static nni_proto_sock_ops nni_pair_sock_ops = {
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index eb66bc21..728682dd 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -19,7 +19,6 @@ typedef struct nni_pull_sock nni_pull_sock;
static void nni_pull_putq_cb(void *);
static void nni_pull_recv_cb(void *);
-static void nni_pull_recv(nni_pull_pipe *);
static void nni_pull_putq(nni_pull_pipe *, nni_msg *);
// An nni_pull_sock is our per-socket protocol private structure.
@@ -107,18 +106,19 @@ nni_pull_pipe_start(void *arg)
nni_pull_pipe *pp = arg;
// Start the pending pull...
- nni_pull_recv(pp);
+ nni_pipe_aio_recv(pp->pipe, &pp->recv_aio);
return (0);
}
static void
-nni_pull_pipe_stop(nni_pull_pipe *pp)
+nni_pull_pipe_stop(void *arg)
{
- // Cancel any pending sendup.
- nni_msgq_aio_cancel(pp->pull->urq, &pp->putq_aio);
- nni_pipe_remove(pp->pipe);
+ nni_pull_pipe *pp = arg;
+
+ nni_aio_stop(&pp->putq_aio);
+ nni_aio_stop(&pp->recv_aio);
}
@@ -131,7 +131,7 @@ nni_pull_recv_cb(void *arg)
if (nni_aio_result(aio) != 0) {
// Failed to get a message, probably the pipe is closed.
- nni_pull_pipe_stop(pp);
+ nni_pipe_stop(pp->pipe);
return;
}
@@ -154,22 +154,11 @@ nni_pull_putq_cb(void *arg)
// we can do. Just close the pipe.
nni_msg_free(aio->a_msg);
aio->a_msg = NULL;
- nni_pull_pipe_stop(pp);
+ nni_pipe_stop(pp->pipe);
return;
}
- nni_pull_recv(pp);
-}
-
-
-// nni_pull_recv is called to schedule a pending recv on the incoming pipe.
-static void
-nni_pull_recv(nni_pull_pipe *pp)
-{
- // Schedule the aio with callback.
- if (nni_pipe_aio_recv(pp->pipe, &pp->recv_aio) != 0) {
- nni_pipe_remove(pp->pipe);
- }
+ nni_pipe_aio_recv(pp->pipe, &pp->recv_aio);
}
@@ -225,6 +214,7 @@ static nni_proto_pipe_ops nni_pull_pipe_ops = {
.pipe_init = nni_pull_pipe_init,
.pipe_fini = nni_pull_pipe_fini,
.pipe_start = nni_pull_pipe_start,
+ .pipe_stop = nni_pull_pipe_stop,
};
static nni_proto_sock_ops nni_pull_sock_ops = {
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index e69ebdbf..7d3be42e 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -39,8 +39,6 @@ struct nni_push_pipe {
nni_aio aio_recv;
nni_aio aio_send;
nni_aio aio_getq;
- int refcnt;
- nni_mtx mtx;
};
static int
@@ -80,7 +78,6 @@ nni_push_pipe_fini(void *arg)
nni_aio_fini(&pp->aio_recv);
nni_aio_fini(&pp->aio_send);
nni_aio_fini(&pp->aio_getq);
- nni_mtx_fini(&pp->mtx);
NNI_FREE_STRUCT(pp);
}
@@ -99,14 +96,10 @@ nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
}
if ((rv = nni_aio_init(&pp->aio_send, nni_push_send_cb, pp)) != 0) {
goto fail;
- return (rv);
}
if ((rv = nni_aio_init(&pp->aio_getq, nni_push_getq_cb, pp)) != 0) {
goto fail;
}
- if ((rv = nni_mtx_init(&pp->mtx)) != 0) {
- goto fail;
- }
NNI_LIST_NODE_INIT(&pp->node);
pp->pipe = pipe;
@@ -130,17 +123,11 @@ nni_push_pipe_start(void *arg)
return (NNG_EPROTO);
}
- nni_mtx_lock(&pp->mtx);
- pp->refcnt = 2;
- nni_mtx_unlock(&pp->mtx);
-
// Schedule a receiver. This is mostly so that we can detect
// a closed transport pipe.
- nni_pipe_hold(pp->pipe);
nni_pipe_aio_recv(pp->pipe, &pp->aio_recv);
// Schedule a sender.
- nni_pipe_hold(pp->pipe);
nni_msgq_aio_get(push->uwq, &pp->aio_getq);
return (0);
@@ -148,22 +135,14 @@ nni_push_pipe_start(void *arg)
static void
-nni_push_pipe_stop(nni_push_pipe *pp)
+nni_push_pipe_stop(void *arg)
{
+ nni_push_pipe *pp = arg;
nni_push_sock *push = pp->push;
- int refcnt;
- nni_msgq_aio_cancel(push->uwq, &pp->aio_getq);
-
- nni_mtx_lock(&pp->mtx);
- NNI_ASSERT(pp->refcnt > 0);
- pp->refcnt--;
- refcnt = pp->refcnt;
- nni_mtx_unlock(&pp->mtx);
-
- if (refcnt == 0) {
- nni_pipe_remove(pp->pipe);
- }
+ nni_aio_stop(&pp->aio_recv);
+ nni_aio_stop(&pp->aio_send);
+ nni_aio_stop(&pp->aio_getq);
}
@@ -175,7 +154,7 @@ nni_push_recv_cb(void *arg)
// We normally expect to receive an error. If a pipe actually
// sends us data, we just discard it.
if (nni_aio_result(&pp->aio_recv) != 0) {
- nni_push_pipe_stop(pp);
+ nni_pipe_stop(pp->pipe);
return;
}
nni_msg_free(pp->aio_recv.a_msg);
@@ -193,7 +172,7 @@ nni_push_send_cb(void *arg)
if (nni_aio_result(&pp->aio_send) != 0) {
nni_msg_free(pp->aio_send.a_msg);
pp->aio_send.a_msg = NULL;
- nni_push_pipe_stop(pp);
+ nni_pipe_stop(pp->pipe);
return;
}
@@ -209,7 +188,7 @@ nni_push_getq_cb(void *arg)
if (nni_aio_result(aio) != 0) {
// If the socket is closing, nothing else we can do.
- nni_push_pipe_stop(pp);
+ nni_pipe_stop(pp->pipe);
return;
}
@@ -260,6 +239,7 @@ static nni_proto_pipe_ops nni_push_pipe_ops = {
.pipe_init = nni_push_pipe_init,
.pipe_fini = nni_push_pipe_fini,
.pipe_start = nni_push_pipe_start,
+ .pipe_stop = nni_push_pipe_stop,
};
static nni_proto_sock_ops nni_push_sock_ops = {
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index 8ad9bb6d..2767f6b6 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -46,8 +46,6 @@ struct nni_pub_pipe {
nni_aio aio_send;
nni_aio aio_recv;
nni_list_node node;
- int refcnt;
- nni_mtx mtx;
};
static int
@@ -109,7 +107,6 @@ nni_pub_pipe_fini(void *arg)
nni_aio_fini(&pp->aio_getq);
nni_aio_fini(&pp->aio_send);
nni_aio_fini(&pp->aio_recv);
- nni_mtx_fini(&pp->mtx);
NNI_FREE_STRUCT(pp);
}
@@ -124,8 +121,7 @@ nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
return (NNG_ENOMEM);
}
// XXX: consider making this depth tunable
- if (((rv = nni_msgq_init(&pp->sendq, 16)) != 0) ||
- ((rv = nni_mtx_init(&pp->mtx)) != 0)) {
+ if ((rv = nni_msgq_init(&pp->sendq, 16)) != 0) {
goto fail;
}
@@ -163,43 +159,35 @@ nni_pub_pipe_start(void *arg)
if (nni_pipe_peer(pp->pipe) != NNG_PROTO_SUB) {
return (NNG_EPROTO);
}
+ nni_mtx_lock(&pub->mtx);
nni_list_append(&pub->pipes, pp);
-
- nni_mtx_lock(&pp->mtx);
- pp->refcnt = 2;
- nni_mtx_unlock(&pp->mtx);
+ nni_mtx_unlock(&pub->mtx);
// Start the receiver and the queue reader.
nni_pipe_aio_recv(pp->pipe, &pp->aio_recv);
nni_msgq_aio_get(pp->sendq, &pp->aio_getq);
-
return (0);
}
static void
-nni_pub_pipe_stop(nni_pub_pipe *pp)
+nni_pub_pipe_stop(void *arg)
{
+ nni_pub_pipe *pp = arg;
nni_pub_sock *pub = pp->pub;
int refcnt;
+ nni_aio_stop(&pp->aio_getq);
+ nni_aio_stop(&pp->aio_send);
+ nni_aio_stop(&pp->aio_recv);
+ nni_msgq_close(pp->sendq);
+
nni_mtx_lock(&pub->mtx);
if (nni_list_active(&pub->pipes, pp)) {
nni_list_remove(&pub->pipes, pp);
- nni_msgq_close(pp->sendq);
}
nni_mtx_unlock(&pub->mtx);
-
- nni_mtx_lock(&pp->mtx);
- NNI_ASSERT(pp->refcnt > 0);
- pp->refcnt--;
- refcnt = pp->refcnt;
- nni_mtx_unlock(&pp->mtx);
-
- if (refcnt == 0) {
- nni_pipe_remove(pp->pipe);
- }
}
@@ -252,7 +240,7 @@ nni_pub_pipe_recv_cb(void *arg)
nni_pub_pipe *pp = arg;
if (nni_aio_result(&pp->aio_recv) != 0) {
- nni_pub_pipe_stop(pp);
+ nni_pipe_stop(pp->pipe);
return;
}
@@ -268,7 +256,7 @@ nni_pub_pipe_getq_cb(void *arg)
nni_pub_pipe *pp = arg;
if (nni_aio_result(&pp->aio_getq) != 0) {
- nni_pub_pipe_stop(pp);
+ nni_pipe_stop(pp->pipe);
return;
}
@@ -287,7 +275,7 @@ nni_pub_pipe_send_cb(void *arg)
if (nni_aio_result(&pp->aio_send) != 0) {
nni_msg_free(pp->aio_send.a_msg);
pp->aio_send.a_msg = NULL;
- nni_pub_pipe_stop(pp);
+ nni_pipe_stop(pp->pipe);
return;
}
@@ -336,6 +324,7 @@ static nni_proto_pipe_ops nni_pub_pipe_ops = {
.pipe_init = nni_pub_pipe_init,
.pipe_fini = nni_pub_pipe_fini,
.pipe_start = nni_pub_pipe_start,
+ .pipe_stop = nni_pub_pipe_stop,
};
nni_proto_sock_ops nni_pub_sock_ops = {
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index cab745b5..5733b8f2 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -123,10 +123,12 @@ nni_sub_pipe_start(void *arg)
static void
-nni_sub_pipe_stop(nni_sub_pipe *sp)
+nni_sub_pipe_stop(void *arg)
{
- nni_msgq_aio_cancel(sp->sub->urq, &sp->aio_putq);
- nni_pipe_remove(sp->pipe);
+ nni_sub_pipe *sp = arg;
+
+ nni_aio_stop(&sp->aio_putq);
+ nni_aio_stop(&sp->aio_recv);
}
@@ -139,7 +141,7 @@ nni_sub_recv_cb(void *arg)
nni_msg *msg;
if (nni_aio_result(&sp->aio_recv) != 0) {
- nni_sub_pipe_stop(sp);
+ nni_pipe_stop(sp->pipe);
return;
}
@@ -157,7 +159,7 @@ nni_sub_putq_cb(void *arg)
if (nni_aio_result(&sp->aio_putq) != 0) {
nni_msg_free(sp->aio_putq.a_msg);
sp->aio_putq.a_msg = NULL;
- nni_sub_pipe_stop(sp);
+ nni_pipe_stop(sp->pipe);
return;
}
@@ -335,6 +337,7 @@ static nni_proto_pipe_ops nni_sub_pipe_ops = {
.pipe_init = nni_sub_pipe_init,
.pipe_fini = nni_sub_pipe_fini,
.pipe_start = nni_sub_pipe_start,
+ .pipe_stop = nni_sub_pipe_stop,
};
static nni_proto_sock_ops nni_sub_sock_ops = {
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 507edf66..38bf082a 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -51,8 +51,6 @@ struct nni_rep_pipe {
nni_aio aio_send;
nni_aio aio_recv;
nni_aio aio_putq;
- int running;
- int refcnt;
nni_mtx mtx;
};
@@ -123,7 +121,7 @@ nni_rep_sock_close(void *arg)
{
nni_rep_sock *rep = arg;
- nni_msgq_aio_cancel(rep->uwq, &rep->aio_getq);
+ nni_aio_stop(&rep->aio_getq);
}
@@ -194,11 +192,6 @@ nni_rep_pipe_start(void *arg)
return (rv);
}
- nni_mtx_lock(&rp->mtx);
- rp->refcnt = 2;
- rp->running = 1;
- nni_mtx_unlock(&rp->mtx);
-
nni_msgq_aio_get(rp->sendq, &rp->aio_getq);
nni_pipe_aio_recv(rp->pipe, &rp->aio_recv);
return (0);
@@ -206,23 +199,21 @@ nni_rep_pipe_start(void *arg)
static void
-nni_rep_pipe_stop(nni_rep_pipe *rp)
+nni_rep_pipe_stop(void *arg)
{
+ nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- int refcnt;
uint32_t id;
+ nni_aio_stop(&rp->aio_getq);
+ nni_aio_stop(&rp->aio_putq);
+ nni_aio_stop(&rp->aio_send);
+ nni_aio_stop(&rp->aio_recv);
+
nni_mtx_lock(&rp->mtx);
- NNI_ASSERT(rp->refcnt > 0);
- rp->refcnt--;
- refcnt = rp->refcnt;
id = rp->id;
- rp->id = 0;
- if (rp->running) {
- rp->running = 0;
- nni_msgq_close(rp->sendq);
- nni_msgq_aio_cancel(rep->urq, &rp->aio_putq);
- }
+ rp->id = 0; // makes this idempotent
+ nni_msgq_close(rp->sendq);
nni_mtx_unlock(&rp->mtx);
if (id != 0) {
@@ -230,10 +221,6 @@ nni_rep_pipe_stop(nni_rep_pipe *rp)
nni_idhash_remove(&rep->pipes, id);
nni_mtx_unlock(&rep->mtx);
}
-
- if (refcnt == 0) {
- nni_pipe_remove(rp->pipe);
- }
}
@@ -298,7 +285,7 @@ nni_rep_pipe_getq_cb(void *arg)
nni_rep_pipe *rp = arg;
if (nni_aio_result(&rp->aio_getq) != 0) {
- nni_rep_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
return;
}
@@ -317,7 +304,7 @@ nni_rep_pipe_send_cb(void *arg)
if (nni_aio_result(&rp->aio_send) != 0) {
nni_msg_free(rp->aio_send.a_msg);
rp->aio_send.a_msg = NULL;
- nni_rep_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
return;
}
@@ -337,7 +324,7 @@ nni_rep_pipe_recv_cb(void *arg)
int hops;
if (nni_aio_result(&rp->aio_recv) != 0) {
- nni_rep_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
return;
}
@@ -399,7 +386,7 @@ nni_rep_pipe_putq_cb(void *arg)
if (nni_aio_result(&rp->aio_putq) != 0) {
nni_msg_free(rp->aio_putq.a_msg);
rp->aio_putq.a_msg = NULL;
- nni_rep_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
return;
}
@@ -521,6 +508,7 @@ static nni_proto_pipe_ops nni_rep_pipe_ops = {
.pipe_init = nni_rep_pipe_init,
.pipe_fini = nni_rep_pipe_fini,
.pipe_start = nni_rep_pipe_start,
+ .pipe_stop = nni_rep_pipe_stop,
};
static nni_proto_sock_ops nni_rep_sock_ops = {
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 1ce1156c..c2542e18 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -57,7 +57,6 @@ struct nni_req_pipe {
nni_aio aio_sendcooked; // cooked mode only
nni_aio aio_recv;
nni_aio aio_putq;
- int refcnt;
nni_mtx mtx;
};
@@ -197,10 +196,6 @@ nni_req_pipe_start(void *arg)
return (NNG_EPROTO);
}
- nni_mtx_lock(&rp->mtx);
- rp->refcnt = 2;
- nni_mtx_unlock(&rp->mtx);
-
nni_mtx_lock(&req->mtx);
nni_list_append(&req->readypipes, rp);
if (req->wantw) {
@@ -216,19 +211,19 @@ nni_req_pipe_start(void *arg)
static void
-nni_req_pipe_stop(nni_req_pipe *rp)
+nni_req_pipe_stop(void *arg)
{
+ nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
- int refcnt;
- nni_mtx_lock(&rp->mtx);
- NNI_ASSERT(rp->refcnt > 0);
- rp->refcnt--;
- refcnt = rp->refcnt;
- nni_mtx_unlock(&rp->mtx);
+ nni_aio_stop(&rp->aio_getq);
+ nni_aio_stop(&rp->aio_putq);
+ nni_aio_stop(&rp->aio_recv);
+ nni_aio_stop(&rp->aio_sendcooked);
+ nni_aio_stop(&rp->aio_sendraw);
- nni_msgq_aio_cancel(req->uwq, &rp->aio_getq);
- nni_msgq_aio_cancel(req->urq, &rp->aio_putq);
+ // At this point there should not be any further AIOs running.
+ // Further, any completion tasks have completed.
nni_mtx_lock(&req->mtx);
// This removes the node from either busypipes or readypipes.
@@ -245,12 +240,7 @@ nni_req_pipe_stop(nni_req_pipe *rp)
req->wantw = 1;
nni_req_resend(req);
}
-
nni_mtx_unlock(&req->mtx);
-
- if (refcnt == 0) {
- nni_pipe_remove(rp->pipe);
- }
}
@@ -323,7 +313,7 @@ nni_req_getq_cb(void *arg)
// exception: we wind up here in error state when the uwq is closed.)
if (nni_aio_result(&rp->aio_getq) != 0) {
- nni_req_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
return;
}
@@ -344,7 +334,7 @@ nni_req_sendraw_cb(void *arg)
if (nni_aio_result(&rp->aio_sendraw) != 0) {
nni_msg_free(rp->aio_sendraw.a_msg);
rp->aio_sendraw.a_msg = NULL;
- nni_req_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
return;
}
@@ -359,14 +349,13 @@ nni_req_sendcooked_cb(void *arg)
nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
- NNI_ASSERT(rp->refcnt > 0);
if (nni_aio_result(&rp->aio_sendcooked) != 0) {
// We failed to send... clean up and deal with it.
// We leave ourselves on the busy list for now, which
// means no new asynchronous traffic can occur here.
nni_msg_free(rp->aio_sendcooked.a_msg);
rp->aio_sendcooked.a_msg = NULL;
- nni_req_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
return;
}
@@ -384,7 +373,7 @@ nni_req_sendcooked_cb(void *arg)
// side while we were waiting to be scheduled to run for the
// writer side. In this case we can't complete the operation,
// and we have to abort.
- nni_req_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
}
nni_mtx_unlock(&req->mtx);
}
@@ -397,7 +386,7 @@ nni_req_putq_cb(void *arg)
if (nni_aio_result(&rp->aio_putq) != 0) {
nni_msg_free(rp->aio_putq.a_msg);
- nni_req_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
return;
}
rp->aio_putq.a_msg = NULL;
@@ -413,7 +402,7 @@ nni_req_recv_cb(void *arg)
nni_msg *msg;
if (nni_aio_result(&rp->aio_recv) != 0) {
- nni_req_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
return;
}
@@ -443,7 +432,7 @@ nni_req_recv_cb(void *arg)
malformed:
nni_msg_free(msg);
- nni_req_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
}
@@ -615,6 +604,7 @@ static nni_proto_pipe_ops nni_req_pipe_ops = {
.pipe_init = nni_req_pipe_init,
.pipe_fini = nni_req_pipe_fini,
.pipe_start = nni_req_pipe_start,
+ .pipe_stop = nni_req_pipe_stop,
};
static nni_proto_sock_ops nni_req_sock_ops = {
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 33360ab5..68e4b00f 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -50,9 +50,6 @@ struct nni_resp_pipe {
nni_aio aio_putq;
nni_aio aio_send;
nni_aio aio_recv;
- int running;
- int refcnt;
- nni_mtx mtx;
};
@@ -135,8 +132,7 @@ nni_resp_pipe_init(void **pp, nni_pipe *npipe, void *psock)
if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_msgq_init(&ppipe->sendq, 2)) != 0) ||
- ((rv = nni_mtx_init(&ppipe->mtx)) != 0)) {
+ if ((rv = nni_msgq_init(&ppipe->sendq, 2)) != 0) {
goto fail;
}
rv = nni_aio_init(&ppipe->aio_putq, nni_resp_putq_cb, ppipe);
@@ -177,7 +173,6 @@ nni_resp_pipe_fini(void *arg)
nni_aio_fini(&ppipe->aio_getq);
nni_aio_fini(&ppipe->aio_send);
nni_aio_fini(&ppipe->aio_recv);
- nni_mtx_fini(&ppipe->mtx);
NNI_FREE_STRUCT(ppipe);
}
@@ -198,11 +193,6 @@ nni_resp_pipe_start(void *arg)
return (rv);
}
- nni_mtx_lock(&ppipe->mtx);
- ppipe->refcnt = 2;
- ppipe->running = 1;
- nni_mtx_unlock(&ppipe->mtx);
-
nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv);
nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq);
@@ -211,11 +201,16 @@ nni_resp_pipe_start(void *arg)
static void
-nni_resp_pipe_stop(nni_resp_pipe *ppipe)
+nni_resp_pipe_stop(void *arg)
{
+ nni_resp_pipe *ppipe = arg;
nni_resp_sock *psock = ppipe->psock;
- int refcnt;
- int running;
+
+ nni_msgq_close(ppipe->sendq);
+ nni_aio_stop(&ppipe->aio_putq);
+ nni_aio_stop(&ppipe->aio_getq);
+ nni_aio_stop(&ppipe->aio_send);
+ nni_aio_stop(&ppipe->aio_recv);
nni_mtx_lock(&psock->mtx);
if (ppipe->id != 0) {
@@ -223,23 +218,6 @@ nni_resp_pipe_stop(nni_resp_pipe *ppipe)
ppipe->id = 0;
}
nni_mtx_unlock(&psock->mtx);
-
- nni_mtx_lock(&ppipe->mtx);
- NNI_ASSERT(ppipe->refcnt > 0);
- ppipe->refcnt--;
- refcnt = ppipe->refcnt;
-
- running = ppipe->running;
- ppipe->running = 0;
- nni_mtx_unlock(&ppipe->mtx);
-
- if (running) {
- nni_msgq_close(ppipe->sendq);
- nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq);
- }
- if (refcnt == 0) {
- nni_pipe_remove(ppipe->npipe);
- }
}
@@ -298,7 +276,7 @@ nni_resp_getq_cb(void *arg)
nni_resp_pipe *ppipe = arg;
if (nni_aio_result(&ppipe->aio_getq) != 0) {
- nni_resp_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -317,7 +295,7 @@ nni_resp_send_cb(void *arg)
if (nni_aio_result(&ppipe->aio_send) != 0) {
nni_msg_free(ppipe->aio_send.a_msg);
ppipe->aio_send.a_msg = NULL;
- nni_resp_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -386,7 +364,7 @@ nni_resp_recv_cb(void *arg)
return;
error:
- nni_resp_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
}
@@ -398,7 +376,7 @@ nni_resp_putq_cb(void *arg)
if (nni_aio_result(&ppipe->aio_putq) != 0) {
nni_msg_free(ppipe->aio_putq.a_msg);
ppipe->aio_putq.a_msg = NULL;
- nni_resp_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
}
nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv);
@@ -525,6 +503,7 @@ static nni_proto_pipe_ops nni_resp_pipe_ops = {
.pipe_init = nni_resp_pipe_init,
.pipe_fini = nni_resp_pipe_fini,
.pipe_start = nni_resp_pipe_start,
+ .pipe_stop = nni_resp_pipe_stop,
};
static nni_proto_sock_ops nni_resp_sock_ops = {
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index 962371c1..fa89e61a 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -52,9 +52,6 @@ struct nni_surv_pipe {
nni_aio aio_putq;
nni_aio aio_send;
nni_aio aio_recv;
- int running;
- int refcnt;
- nni_mtx mtx;
};
static void
@@ -134,7 +131,6 @@ nni_surv_pipe_fini(void *arg)
nni_aio_fini(&ppipe->aio_recv);
nni_aio_fini(&ppipe->aio_putq);
nni_msgq_fini(ppipe->sendq);
- nni_mtx_fini(&ppipe->mtx);
NNI_FREE_STRUCT(ppipe);
}
@@ -149,8 +145,7 @@ nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock)
return (NNG_ENOMEM);
}
// This depth could be tunable.
- if (((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) ||
- ((rv = nni_mtx_init(&ppipe->mtx)) != 0)) {
+ if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) {
goto failed;
}
rv = nni_aio_init(&ppipe->aio_getq, nni_surv_getq_cb, ppipe);
@@ -190,11 +185,6 @@ nni_surv_pipe_start(void *arg)
nni_list_append(&psock->pipes, ppipe);
nni_mtx_unlock(&psock->mtx);
- nni_mtx_lock(&ppipe->mtx);
- ppipe->refcnt = 2;
- ppipe->running = 1;
- nni_mtx_unlock(&ppipe->mtx);
-
nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq);
nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv);
return (0);
@@ -202,30 +192,22 @@ nni_surv_pipe_start(void *arg)
static void
-nni_surv_pipe_stop(nni_surv_pipe *ppipe)
+nni_surv_pipe_stop(void *arg)
{
+ nni_surv_pipe *ppipe = arg;
nni_surv_sock *psock = ppipe->psock;
- int refcnt;
+
+ nni_aio_stop(&ppipe->aio_getq);
+ nni_aio_stop(&ppipe->aio_send);
+ nni_aio_stop(&ppipe->aio_recv);
+ nni_aio_stop(&ppipe->aio_putq);
+ nni_msgq_close(ppipe->sendq);
nni_mtx_lock(&psock->mtx);
if (nni_list_active(&psock->pipes, ppipe)) {
nni_list_remove(&psock->pipes, ppipe);
}
nni_mtx_unlock(&psock->mtx);
-
- nni_mtx_lock(&ppipe->mtx);
- NNI_ASSERT(ppipe->refcnt > 0);
- ppipe->refcnt--;
- refcnt = ppipe->refcnt;
- if (ppipe->running) {
- nni_msgq_close(ppipe->sendq);
- nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq);
- }
- nni_mtx_unlock(&ppipe->mtx);
-
- if (refcnt == 0) {
- nni_pipe_remove(ppipe->npipe);
- }
}
@@ -235,7 +217,7 @@ nni_surv_getq_cb(void *arg)
nni_surv_pipe *ppipe = arg;
if (nni_aio_result(&ppipe->aio_getq) != 0) {
- nni_surv_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -254,7 +236,7 @@ nni_surv_send_cb(void *arg)
if (nni_aio_result(&ppipe->aio_send) != 0) {
nni_msg_free(ppipe->aio_send.a_msg);
ppipe->aio_send.a_msg = NULL;
- nni_surv_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -270,7 +252,7 @@ nni_surv_putq_cb(void *arg)
if (nni_aio_result(&ppipe->aio_putq) != 0) {
nni_msg_free(ppipe->aio_putq.a_msg);
ppipe->aio_putq.a_msg = NULL;
- nni_surv_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -313,7 +295,7 @@ nni_surv_recv_cb(void *arg)
return;
failed:
- nni_surv_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
}
@@ -490,6 +472,7 @@ static nni_proto_pipe_ops nni_surv_pipe_ops = {
.pipe_init = nni_surv_pipe_init,
.pipe_fini = nni_surv_pipe_fini,
.pipe_start = nni_surv_pipe_start,
+ .pipe_stop = nni_surv_pipe_stop,
};
static nni_proto_sock_ops nni_surv_sock_ops = {
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 7ffea62d..ada7a87c 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -409,7 +409,6 @@ nni_ipc_ep_connect(void *arg, void *pipearg)
}
if ((rv = nni_ipc_negotiate(pipe)) != 0) {
- nni_plat_ipc_shutdown(pipe->isp);
return (rv);
}
return (0);
@@ -450,7 +449,6 @@ nni_ipc_ep_accept(void *arg, void *pipearg)
return (rv);
}
if ((rv = nni_ipc_negotiate(pipe)) != 0) {
- nni_plat_ipc_shutdown(pipe->isp);
return (rv);
}
return (0);
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 7e712cc8..1c1eabbf 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -494,7 +494,6 @@ nni_tcp_ep_connect(void *arg, void *pipearg)
}
if ((rv = nni_tcp_negotiate(pipe)) != 0) {
- nni_plat_tcp_shutdown(pipe->tsp);
return (rv);
}
return (0);
@@ -546,7 +545,6 @@ nni_tcp_ep_accept(void *arg, void *pipearg)
return (rv);
}
if ((rv = nni_tcp_negotiate(pipe)) != 0) {
- nni_plat_tcp_shutdown(pipe->tsp);
return (rv);
}
return (0);