aboutsummaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/bus/bus.c51
-rw-r--r--src/protocol/pair/pair.c44
-rw-r--r--src/protocol/pipeline/pull.c30
-rw-r--r--src/protocol/pipeline/push.c38
-rw-r--r--src/protocol/pubsub/pub.c39
-rw-r--r--src/protocol/pubsub/sub.c13
-rw-r--r--src/protocol/reqrep/rep.c42
-rw-r--r--src/protocol/reqrep/req.c44
-rw-r--r--src/protocol/survey/respond.c49
-rw-r--r--src/protocol/survey/survey.c45
10 files changed, 126 insertions, 269 deletions
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index cb624a10..1c32fec6 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -51,7 +51,6 @@ struct nni_bus_pipe {
nni_aio aio_send;
nni_aio aio_putq;
nni_mtx mtx;
- int refcnt;
};
@@ -133,7 +132,6 @@ nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock)
return (NNG_ENOMEM);
}
NNI_LIST_NODE_INIT(&ppipe->node);
- ppipe->refcnt = 0;
if (((rv = nni_mtx_init(&ppipe->mtx)) != 0) ||
((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0)) {
goto fail;
@@ -176,12 +174,6 @@ nni_bus_pipe_start(void *arg)
nni_list_append(&psock->pipes, ppipe);
nni_mtx_unlock(&psock->mtx);
- // Mark the ppipe busy twice -- once for each of the oustanding
- // asynchronous "threads" of operation.
- nni_mtx_lock(&ppipe->mtx);
- ppipe->refcnt = 2;
- nni_mtx_unlock(&ppipe->mtx);
-
nni_bus_pipe_recv(ppipe);
nni_bus_pipe_getq(ppipe);
@@ -189,40 +181,24 @@ nni_bus_pipe_start(void *arg)
}
-// nni_bus_pipe_stop is called only internally when one of our handlers notices
-// that the transport layer has closed. This allows us to stop all further
-// actions.
static void
-nni_bus_pipe_stop(nni_bus_pipe *ppipe)
+nni_bus_pipe_stop(void *arg)
{
- int refcnt;
+ nni_bus_pipe *ppipe = arg;
nni_bus_sock *psock = ppipe->psock;
- // As we are called only on error paths, shut down the underlying
- // pipe transport. This should cause any other consumer to also get
- // a suitable error (NNG_ECLOSED), so that we can shut down completely.
- nni_pipe_close(ppipe->npipe);
+ nni_msgq_close(ppipe->sendq);
+
+ nni_aio_stop(&ppipe->aio_getq);
+ nni_aio_stop(&ppipe->aio_send);
+ nni_aio_stop(&ppipe->aio_recv);
+ nni_aio_stop(&ppipe->aio_putq);
nni_mtx_lock(&ppipe->psock->mtx);
if (nni_list_active(&psock->pipes, ppipe)) {
nni_list_remove(&psock->pipes, ppipe);
-
- nni_msgq_close(ppipe->sendq);
- nni_msgq_aio_cancel(nni_sock_recvq(psock->nsock),
- &ppipe->aio_putq);
}
nni_mtx_unlock(&ppipe->psock->mtx);
-
- nni_mtx_lock(&ppipe->mtx);
- NNI_ASSERT(ppipe->refcnt > 0);
- refcnt = --ppipe->refcnt;
- nni_mtx_unlock(&ppipe->mtx);
-
- // If we are done with the pipe, let the system know so it can
- // deregister it.
- if (refcnt == 0) {
- nni_pipe_remove(ppipe->npipe);
- }
}
@@ -233,7 +209,7 @@ nni_bus_pipe_getq_cb(void *arg)
if (nni_aio_result(&ppipe->aio_getq) != 0) {
// closed?
- nni_bus_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
ppipe->aio_send.a_msg = ppipe->aio_getq.a_msg;
@@ -252,7 +228,7 @@ nni_bus_pipe_send_cb(void *arg)
// closed?
nni_msg_free(ppipe->aio_send.a_msg);
ppipe->aio_send.a_msg = NULL;
- nni_bus_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -269,7 +245,7 @@ nni_bus_pipe_recv_cb(void *arg)
uint32_t id;
if (nni_aio_result(&ppipe->aio_recv) != 0) {
- nni_bus_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
msg = ppipe->aio_recv.a_msg;
@@ -278,7 +254,7 @@ nni_bus_pipe_recv_cb(void *arg)
if (nni_msg_prepend_header(msg, &id, 4) != 0) {
// XXX: bump a nomemory stat
nni_msg_free(msg);
- nni_bus_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -295,7 +271,7 @@ nni_bus_pipe_putq_cb(void *arg)
if (nni_aio_result(&ppipe->aio_putq) != 0) {
nni_msg_free(ppipe->aio_putq.a_msg);
ppipe->aio_putq.a_msg = NULL;
- nni_bus_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -417,6 +393,7 @@ static nni_proto_pipe_ops nni_bus_pipe_ops = {
.pipe_init = nni_bus_pipe_init,
.pipe_fini = nni_bus_pipe_fini,
.pipe_start = nni_bus_pipe_start,
+ .pipe_stop = nni_bus_pipe_stop,
};
static nni_proto_sock_ops nni_bus_sock_ops = {
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index c2d4fa0d..67119f80 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -46,10 +46,6 @@ struct nni_pair_pipe {
nni_aio aio_recv;
nni_aio aio_getq;
nni_aio aio_putq;
- int busy;
- int closed;
- nni_mtx mtx;
- int refcnt;
};
static int
@@ -97,9 +93,6 @@ nni_pair_pipe_init(void **pp, nni_pipe *npipe, void *psock)
if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&ppipe->mtx)) != 0) {
- goto fail;
- }
rv = nni_aio_init(&ppipe->aio_send, nni_pair_send_cb, ppipe);
if (rv != 0) {
goto fail;
@@ -132,12 +125,10 @@ nni_pair_pipe_fini(void *arg)
{
nni_pair_pipe *ppipe = arg;
- NNI_ASSERT(ppipe->busy >= 0);
nni_aio_fini(&ppipe->aio_send);
nni_aio_fini(&ppipe->aio_recv);
nni_aio_fini(&ppipe->aio_putq);
nni_aio_fini(&ppipe->aio_getq);
- nni_mtx_fini(&ppipe->mtx);
NNI_FREE_STRUCT(ppipe);
}
@@ -156,10 +147,6 @@ nni_pair_pipe_start(void *arg)
psock->ppipe = ppipe;
nni_mtx_unlock(&psock->mtx);
- nni_mtx_lock(&ppipe->mtx);
- ppipe->refcnt = 2;
- nni_mtx_unlock(&ppipe->mtx);
-
// Schedule a getq on the upper, and a read from the pipe.
// Each of these also sets up another hold on the pipe itself.
nni_msgq_aio_get(psock->uwq, &ppipe->aio_getq);
@@ -170,31 +157,21 @@ nni_pair_pipe_start(void *arg)
static void
-nni_pair_pipe_stop(nni_pair_pipe *ppipe)
+nni_pair_pipe_stop(void *arg)
{
+ nni_pair_pipe *ppipe = arg;
nni_pair_sock *psock = ppipe->psock;
- int refcnt;
+ nni_aio_stop(&ppipe->aio_send);
+ nni_aio_stop(&ppipe->aio_recv);
+ nni_aio_stop(&ppipe->aio_putq);
+ nni_aio_stop(&ppipe->aio_getq);
nni_mtx_lock(&psock->mtx);
if (psock->ppipe == ppipe) {
psock->ppipe = NULL;
}
nni_mtx_unlock(&psock->mtx);
-
- // These operations are idempotent.
- nni_msgq_aio_cancel(psock->uwq, &ppipe->aio_getq);
- nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq);
-
- nni_mtx_lock(&ppipe->mtx);
- NNI_ASSERT(ppipe->refcnt > 0);
- ppipe->refcnt--;
- refcnt = ppipe->refcnt;
- nni_mtx_unlock(&ppipe->mtx);
-
- if (refcnt == 0) {
- nni_pipe_remove(ppipe->npipe);
- }
}
@@ -205,7 +182,7 @@ nni_pair_recv_cb(void *arg)
nni_pair_sock *psock = ppipe->psock;
if (nni_aio_result(&ppipe->aio_recv) != 0) {
- nni_pair_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -223,7 +200,7 @@ nni_pair_putq_cb(void *arg)
if (nni_aio_result(&ppipe->aio_putq) != 0) {
nni_msg_free(ppipe->aio_putq.a_msg);
ppipe->aio_putq.a_msg = NULL;
- nni_pair_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv);
@@ -238,7 +215,7 @@ nni_pair_getq_cb(void *arg)
nni_msg *msg;
if (nni_aio_result(&ppipe->aio_getq) != 0) {
- nni_pair_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -257,7 +234,7 @@ nni_pair_send_cb(void *arg)
if (nni_aio_result(&ppipe->aio_send) != 0) {
nni_msg_free(ppipe->aio_send.a_msg);
ppipe->aio_send.a_msg = NULL;
- nni_pair_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -306,6 +283,7 @@ static nni_proto_pipe_ops nni_pair_pipe_ops = {
.pipe_init = nni_pair_pipe_init,
.pipe_fini = nni_pair_pipe_fini,
.pipe_start = nni_pair_pipe_start,
+ .pipe_stop = nni_pair_pipe_stop,
};
static nni_proto_sock_ops nni_pair_sock_ops = {
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index eb66bc21..728682dd 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -19,7 +19,6 @@ typedef struct nni_pull_sock nni_pull_sock;
static void nni_pull_putq_cb(void *);
static void nni_pull_recv_cb(void *);
-static void nni_pull_recv(nni_pull_pipe *);
static void nni_pull_putq(nni_pull_pipe *, nni_msg *);
// An nni_pull_sock is our per-socket protocol private structure.
@@ -107,18 +106,19 @@ nni_pull_pipe_start(void *arg)
nni_pull_pipe *pp = arg;
// Start the pending pull...
- nni_pull_recv(pp);
+ nni_pipe_aio_recv(pp->pipe, &pp->recv_aio);
return (0);
}
static void
-nni_pull_pipe_stop(nni_pull_pipe *pp)
+nni_pull_pipe_stop(void *arg)
{
- // Cancel any pending sendup.
- nni_msgq_aio_cancel(pp->pull->urq, &pp->putq_aio);
- nni_pipe_remove(pp->pipe);
+ nni_pull_pipe *pp = arg;
+
+ nni_aio_stop(&pp->putq_aio);
+ nni_aio_stop(&pp->recv_aio);
}
@@ -131,7 +131,7 @@ nni_pull_recv_cb(void *arg)
if (nni_aio_result(aio) != 0) {
// Failed to get a message, probably the pipe is closed.
- nni_pull_pipe_stop(pp);
+ nni_pipe_stop(pp->pipe);
return;
}
@@ -154,22 +154,11 @@ nni_pull_putq_cb(void *arg)
// we can do. Just close the pipe.
nni_msg_free(aio->a_msg);
aio->a_msg = NULL;
- nni_pull_pipe_stop(pp);
+ nni_pipe_stop(pp->pipe);
return;
}
- nni_pull_recv(pp);
-}
-
-
-// nni_pull_recv is called to schedule a pending recv on the incoming pipe.
-static void
-nni_pull_recv(nni_pull_pipe *pp)
-{
- // Schedule the aio with callback.
- if (nni_pipe_aio_recv(pp->pipe, &pp->recv_aio) != 0) {
- nni_pipe_remove(pp->pipe);
- }
+ nni_pipe_aio_recv(pp->pipe, &pp->recv_aio);
}
@@ -225,6 +214,7 @@ static nni_proto_pipe_ops nni_pull_pipe_ops = {
.pipe_init = nni_pull_pipe_init,
.pipe_fini = nni_pull_pipe_fini,
.pipe_start = nni_pull_pipe_start,
+ .pipe_stop = nni_pull_pipe_stop,
};
static nni_proto_sock_ops nni_pull_sock_ops = {
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index e69ebdbf..7d3be42e 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -39,8 +39,6 @@ struct nni_push_pipe {
nni_aio aio_recv;
nni_aio aio_send;
nni_aio aio_getq;
- int refcnt;
- nni_mtx mtx;
};
static int
@@ -80,7 +78,6 @@ nni_push_pipe_fini(void *arg)
nni_aio_fini(&pp->aio_recv);
nni_aio_fini(&pp->aio_send);
nni_aio_fini(&pp->aio_getq);
- nni_mtx_fini(&pp->mtx);
NNI_FREE_STRUCT(pp);
}
@@ -99,14 +96,10 @@ nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
}
if ((rv = nni_aio_init(&pp->aio_send, nni_push_send_cb, pp)) != 0) {
goto fail;
- return (rv);
}
if ((rv = nni_aio_init(&pp->aio_getq, nni_push_getq_cb, pp)) != 0) {
goto fail;
}
- if ((rv = nni_mtx_init(&pp->mtx)) != 0) {
- goto fail;
- }
NNI_LIST_NODE_INIT(&pp->node);
pp->pipe = pipe;
@@ -130,17 +123,11 @@ nni_push_pipe_start(void *arg)
return (NNG_EPROTO);
}
- nni_mtx_lock(&pp->mtx);
- pp->refcnt = 2;
- nni_mtx_unlock(&pp->mtx);
-
// Schedule a receiver. This is mostly so that we can detect
// a closed transport pipe.
- nni_pipe_hold(pp->pipe);
nni_pipe_aio_recv(pp->pipe, &pp->aio_recv);
// Schedule a sender.
- nni_pipe_hold(pp->pipe);
nni_msgq_aio_get(push->uwq, &pp->aio_getq);
return (0);
@@ -148,22 +135,14 @@ nni_push_pipe_start(void *arg)
static void
-nni_push_pipe_stop(nni_push_pipe *pp)
+nni_push_pipe_stop(void *arg)
{
+ nni_push_pipe *pp = arg;
nni_push_sock *push = pp->push;
- int refcnt;
- nni_msgq_aio_cancel(push->uwq, &pp->aio_getq);
-
- nni_mtx_lock(&pp->mtx);
- NNI_ASSERT(pp->refcnt > 0);
- pp->refcnt--;
- refcnt = pp->refcnt;
- nni_mtx_unlock(&pp->mtx);
-
- if (refcnt == 0) {
- nni_pipe_remove(pp->pipe);
- }
+ nni_aio_stop(&pp->aio_recv);
+ nni_aio_stop(&pp->aio_send);
+ nni_aio_stop(&pp->aio_getq);
}
@@ -175,7 +154,7 @@ nni_push_recv_cb(void *arg)
// We normally expect to receive an error. If a pipe actually
// sends us data, we just discard it.
if (nni_aio_result(&pp->aio_recv) != 0) {
- nni_push_pipe_stop(pp);
+ nni_pipe_stop(pp->pipe);
return;
}
nni_msg_free(pp->aio_recv.a_msg);
@@ -193,7 +172,7 @@ nni_push_send_cb(void *arg)
if (nni_aio_result(&pp->aio_send) != 0) {
nni_msg_free(pp->aio_send.a_msg);
pp->aio_send.a_msg = NULL;
- nni_push_pipe_stop(pp);
+ nni_pipe_stop(pp->pipe);
return;
}
@@ -209,7 +188,7 @@ nni_push_getq_cb(void *arg)
if (nni_aio_result(aio) != 0) {
// If the socket is closing, nothing else we can do.
- nni_push_pipe_stop(pp);
+ nni_pipe_stop(pp->pipe);
return;
}
@@ -260,6 +239,7 @@ static nni_proto_pipe_ops nni_push_pipe_ops = {
.pipe_init = nni_push_pipe_init,
.pipe_fini = nni_push_pipe_fini,
.pipe_start = nni_push_pipe_start,
+ .pipe_stop = nni_push_pipe_stop,
};
static nni_proto_sock_ops nni_push_sock_ops = {
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index 8ad9bb6d..2767f6b6 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -46,8 +46,6 @@ struct nni_pub_pipe {
nni_aio aio_send;
nni_aio aio_recv;
nni_list_node node;
- int refcnt;
- nni_mtx mtx;
};
static int
@@ -109,7 +107,6 @@ nni_pub_pipe_fini(void *arg)
nni_aio_fini(&pp->aio_getq);
nni_aio_fini(&pp->aio_send);
nni_aio_fini(&pp->aio_recv);
- nni_mtx_fini(&pp->mtx);
NNI_FREE_STRUCT(pp);
}
@@ -124,8 +121,7 @@ nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
return (NNG_ENOMEM);
}
// XXX: consider making this depth tunable
- if (((rv = nni_msgq_init(&pp->sendq, 16)) != 0) ||
- ((rv = nni_mtx_init(&pp->mtx)) != 0)) {
+ if ((rv = nni_msgq_init(&pp->sendq, 16)) != 0) {
goto fail;
}
@@ -163,43 +159,35 @@ nni_pub_pipe_start(void *arg)
if (nni_pipe_peer(pp->pipe) != NNG_PROTO_SUB) {
return (NNG_EPROTO);
}
+ nni_mtx_lock(&pub->mtx);
nni_list_append(&pub->pipes, pp);
-
- nni_mtx_lock(&pp->mtx);
- pp->refcnt = 2;
- nni_mtx_unlock(&pp->mtx);
+ nni_mtx_unlock(&pub->mtx);
// Start the receiver and the queue reader.
nni_pipe_aio_recv(pp->pipe, &pp->aio_recv);
nni_msgq_aio_get(pp->sendq, &pp->aio_getq);
-
return (0);
}
static void
-nni_pub_pipe_stop(nni_pub_pipe *pp)
+nni_pub_pipe_stop(void *arg)
{
+ nni_pub_pipe *pp = arg;
nni_pub_sock *pub = pp->pub;
int refcnt;
+ nni_aio_stop(&pp->aio_getq);
+ nni_aio_stop(&pp->aio_send);
+ nni_aio_stop(&pp->aio_recv);
+ nni_msgq_close(pp->sendq);
+
nni_mtx_lock(&pub->mtx);
if (nni_list_active(&pub->pipes, pp)) {
nni_list_remove(&pub->pipes, pp);
- nni_msgq_close(pp->sendq);
}
nni_mtx_unlock(&pub->mtx);
-
- nni_mtx_lock(&pp->mtx);
- NNI_ASSERT(pp->refcnt > 0);
- pp->refcnt--;
- refcnt = pp->refcnt;
- nni_mtx_unlock(&pp->mtx);
-
- if (refcnt == 0) {
- nni_pipe_remove(pp->pipe);
- }
}
@@ -252,7 +240,7 @@ nni_pub_pipe_recv_cb(void *arg)
nni_pub_pipe *pp = arg;
if (nni_aio_result(&pp->aio_recv) != 0) {
- nni_pub_pipe_stop(pp);
+ nni_pipe_stop(pp->pipe);
return;
}
@@ -268,7 +256,7 @@ nni_pub_pipe_getq_cb(void *arg)
nni_pub_pipe *pp = arg;
if (nni_aio_result(&pp->aio_getq) != 0) {
- nni_pub_pipe_stop(pp);
+ nni_pipe_stop(pp->pipe);
return;
}
@@ -287,7 +275,7 @@ nni_pub_pipe_send_cb(void *arg)
if (nni_aio_result(&pp->aio_send) != 0) {
nni_msg_free(pp->aio_send.a_msg);
pp->aio_send.a_msg = NULL;
- nni_pub_pipe_stop(pp);
+ nni_pipe_stop(pp->pipe);
return;
}
@@ -336,6 +324,7 @@ static nni_proto_pipe_ops nni_pub_pipe_ops = {
.pipe_init = nni_pub_pipe_init,
.pipe_fini = nni_pub_pipe_fini,
.pipe_start = nni_pub_pipe_start,
+ .pipe_stop = nni_pub_pipe_stop,
};
nni_proto_sock_ops nni_pub_sock_ops = {
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index cab745b5..5733b8f2 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -123,10 +123,12 @@ nni_sub_pipe_start(void *arg)
static void
-nni_sub_pipe_stop(nni_sub_pipe *sp)
+nni_sub_pipe_stop(void *arg)
{
- nni_msgq_aio_cancel(sp->sub->urq, &sp->aio_putq);
- nni_pipe_remove(sp->pipe);
+ nni_sub_pipe *sp = arg;
+
+ nni_aio_stop(&sp->aio_putq);
+ nni_aio_stop(&sp->aio_recv);
}
@@ -139,7 +141,7 @@ nni_sub_recv_cb(void *arg)
nni_msg *msg;
if (nni_aio_result(&sp->aio_recv) != 0) {
- nni_sub_pipe_stop(sp);
+ nni_pipe_stop(sp->pipe);
return;
}
@@ -157,7 +159,7 @@ nni_sub_putq_cb(void *arg)
if (nni_aio_result(&sp->aio_putq) != 0) {
nni_msg_free(sp->aio_putq.a_msg);
sp->aio_putq.a_msg = NULL;
- nni_sub_pipe_stop(sp);
+ nni_pipe_stop(sp->pipe);
return;
}
@@ -335,6 +337,7 @@ static nni_proto_pipe_ops nni_sub_pipe_ops = {
.pipe_init = nni_sub_pipe_init,
.pipe_fini = nni_sub_pipe_fini,
.pipe_start = nni_sub_pipe_start,
+ .pipe_stop = nni_sub_pipe_stop,
};
static nni_proto_sock_ops nni_sub_sock_ops = {
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 507edf66..38bf082a 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -51,8 +51,6 @@ struct nni_rep_pipe {
nni_aio aio_send;
nni_aio aio_recv;
nni_aio aio_putq;
- int running;
- int refcnt;
nni_mtx mtx;
};
@@ -123,7 +121,7 @@ nni_rep_sock_close(void *arg)
{
nni_rep_sock *rep = arg;
- nni_msgq_aio_cancel(rep->uwq, &rep->aio_getq);
+ nni_aio_stop(&rep->aio_getq);
}
@@ -194,11 +192,6 @@ nni_rep_pipe_start(void *arg)
return (rv);
}
- nni_mtx_lock(&rp->mtx);
- rp->refcnt = 2;
- rp->running = 1;
- nni_mtx_unlock(&rp->mtx);
-
nni_msgq_aio_get(rp->sendq, &rp->aio_getq);
nni_pipe_aio_recv(rp->pipe, &rp->aio_recv);
return (0);
@@ -206,23 +199,21 @@ nni_rep_pipe_start(void *arg)
static void
-nni_rep_pipe_stop(nni_rep_pipe *rp)
+nni_rep_pipe_stop(void *arg)
{
+ nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- int refcnt;
uint32_t id;
+ nni_aio_stop(&rp->aio_getq);
+ nni_aio_stop(&rp->aio_putq);
+ nni_aio_stop(&rp->aio_send);
+ nni_aio_stop(&rp->aio_recv);
+
nni_mtx_lock(&rp->mtx);
- NNI_ASSERT(rp->refcnt > 0);
- rp->refcnt--;
- refcnt = rp->refcnt;
id = rp->id;
- rp->id = 0;
- if (rp->running) {
- rp->running = 0;
- nni_msgq_close(rp->sendq);
- nni_msgq_aio_cancel(rep->urq, &rp->aio_putq);
- }
+ rp->id = 0; // makes this idempotent
+ nni_msgq_close(rp->sendq);
nni_mtx_unlock(&rp->mtx);
if (id != 0) {
@@ -230,10 +221,6 @@ nni_rep_pipe_stop(nni_rep_pipe *rp)
nni_idhash_remove(&rep->pipes, id);
nni_mtx_unlock(&rep->mtx);
}
-
- if (refcnt == 0) {
- nni_pipe_remove(rp->pipe);
- }
}
@@ -298,7 +285,7 @@ nni_rep_pipe_getq_cb(void *arg)
nni_rep_pipe *rp = arg;
if (nni_aio_result(&rp->aio_getq) != 0) {
- nni_rep_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
return;
}
@@ -317,7 +304,7 @@ nni_rep_pipe_send_cb(void *arg)
if (nni_aio_result(&rp->aio_send) != 0) {
nni_msg_free(rp->aio_send.a_msg);
rp->aio_send.a_msg = NULL;
- nni_rep_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
return;
}
@@ -337,7 +324,7 @@ nni_rep_pipe_recv_cb(void *arg)
int hops;
if (nni_aio_result(&rp->aio_recv) != 0) {
- nni_rep_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
return;
}
@@ -399,7 +386,7 @@ nni_rep_pipe_putq_cb(void *arg)
if (nni_aio_result(&rp->aio_putq) != 0) {
nni_msg_free(rp->aio_putq.a_msg);
rp->aio_putq.a_msg = NULL;
- nni_rep_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
return;
}
@@ -521,6 +508,7 @@ static nni_proto_pipe_ops nni_rep_pipe_ops = {
.pipe_init = nni_rep_pipe_init,
.pipe_fini = nni_rep_pipe_fini,
.pipe_start = nni_rep_pipe_start,
+ .pipe_stop = nni_rep_pipe_stop,
};
static nni_proto_sock_ops nni_rep_sock_ops = {
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 1ce1156c..c2542e18 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -57,7 +57,6 @@ struct nni_req_pipe {
nni_aio aio_sendcooked; // cooked mode only
nni_aio aio_recv;
nni_aio aio_putq;
- int refcnt;
nni_mtx mtx;
};
@@ -197,10 +196,6 @@ nni_req_pipe_start(void *arg)
return (NNG_EPROTO);
}
- nni_mtx_lock(&rp->mtx);
- rp->refcnt = 2;
- nni_mtx_unlock(&rp->mtx);
-
nni_mtx_lock(&req->mtx);
nni_list_append(&req->readypipes, rp);
if (req->wantw) {
@@ -216,19 +211,19 @@ nni_req_pipe_start(void *arg)
static void
-nni_req_pipe_stop(nni_req_pipe *rp)
+nni_req_pipe_stop(void *arg)
{
+ nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
- int refcnt;
- nni_mtx_lock(&rp->mtx);
- NNI_ASSERT(rp->refcnt > 0);
- rp->refcnt--;
- refcnt = rp->refcnt;
- nni_mtx_unlock(&rp->mtx);
+ nni_aio_stop(&rp->aio_getq);
+ nni_aio_stop(&rp->aio_putq);
+ nni_aio_stop(&rp->aio_recv);
+ nni_aio_stop(&rp->aio_sendcooked);
+ nni_aio_stop(&rp->aio_sendraw);
- nni_msgq_aio_cancel(req->uwq, &rp->aio_getq);
- nni_msgq_aio_cancel(req->urq, &rp->aio_putq);
+ // At this point there should not be any further AIOs running.
+ // Further, any completion tasks have completed.
nni_mtx_lock(&req->mtx);
// This removes the node from either busypipes or readypipes.
@@ -245,12 +240,7 @@ nni_req_pipe_stop(nni_req_pipe *rp)
req->wantw = 1;
nni_req_resend(req);
}
-
nni_mtx_unlock(&req->mtx);
-
- if (refcnt == 0) {
- nni_pipe_remove(rp->pipe);
- }
}
@@ -323,7 +313,7 @@ nni_req_getq_cb(void *arg)
// exception: we wind up here in error state when the uwq is closed.)
if (nni_aio_result(&rp->aio_getq) != 0) {
- nni_req_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
return;
}
@@ -344,7 +334,7 @@ nni_req_sendraw_cb(void *arg)
if (nni_aio_result(&rp->aio_sendraw) != 0) {
nni_msg_free(rp->aio_sendraw.a_msg);
rp->aio_sendraw.a_msg = NULL;
- nni_req_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
return;
}
@@ -359,14 +349,13 @@ nni_req_sendcooked_cb(void *arg)
nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
- NNI_ASSERT(rp->refcnt > 0);
if (nni_aio_result(&rp->aio_sendcooked) != 0) {
// We failed to send... clean up and deal with it.
// We leave ourselves on the busy list for now, which
// means no new asynchronous traffic can occur here.
nni_msg_free(rp->aio_sendcooked.a_msg);
rp->aio_sendcooked.a_msg = NULL;
- nni_req_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
return;
}
@@ -384,7 +373,7 @@ nni_req_sendcooked_cb(void *arg)
// side while we were waiting to be scheduled to run for the
// writer side. In this case we can't complete the operation,
// and we have to abort.
- nni_req_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
}
nni_mtx_unlock(&req->mtx);
}
@@ -397,7 +386,7 @@ nni_req_putq_cb(void *arg)
if (nni_aio_result(&rp->aio_putq) != 0) {
nni_msg_free(rp->aio_putq.a_msg);
- nni_req_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
return;
}
rp->aio_putq.a_msg = NULL;
@@ -413,7 +402,7 @@ nni_req_recv_cb(void *arg)
nni_msg *msg;
if (nni_aio_result(&rp->aio_recv) != 0) {
- nni_req_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
return;
}
@@ -443,7 +432,7 @@ nni_req_recv_cb(void *arg)
malformed:
nni_msg_free(msg);
- nni_req_pipe_stop(rp);
+ nni_pipe_stop(rp->pipe);
}
@@ -615,6 +604,7 @@ static nni_proto_pipe_ops nni_req_pipe_ops = {
.pipe_init = nni_req_pipe_init,
.pipe_fini = nni_req_pipe_fini,
.pipe_start = nni_req_pipe_start,
+ .pipe_stop = nni_req_pipe_stop,
};
static nni_proto_sock_ops nni_req_sock_ops = {
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 33360ab5..68e4b00f 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -50,9 +50,6 @@ struct nni_resp_pipe {
nni_aio aio_putq;
nni_aio aio_send;
nni_aio aio_recv;
- int running;
- int refcnt;
- nni_mtx mtx;
};
@@ -135,8 +132,7 @@ nni_resp_pipe_init(void **pp, nni_pipe *npipe, void *psock)
if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_msgq_init(&ppipe->sendq, 2)) != 0) ||
- ((rv = nni_mtx_init(&ppipe->mtx)) != 0)) {
+ if ((rv = nni_msgq_init(&ppipe->sendq, 2)) != 0) {
goto fail;
}
rv = nni_aio_init(&ppipe->aio_putq, nni_resp_putq_cb, ppipe);
@@ -177,7 +173,6 @@ nni_resp_pipe_fini(void *arg)
nni_aio_fini(&ppipe->aio_getq);
nni_aio_fini(&ppipe->aio_send);
nni_aio_fini(&ppipe->aio_recv);
- nni_mtx_fini(&ppipe->mtx);
NNI_FREE_STRUCT(ppipe);
}
@@ -198,11 +193,6 @@ nni_resp_pipe_start(void *arg)
return (rv);
}
- nni_mtx_lock(&ppipe->mtx);
- ppipe->refcnt = 2;
- ppipe->running = 1;
- nni_mtx_unlock(&ppipe->mtx);
-
nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv);
nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq);
@@ -211,11 +201,16 @@ nni_resp_pipe_start(void *arg)
static void
-nni_resp_pipe_stop(nni_resp_pipe *ppipe)
+nni_resp_pipe_stop(void *arg)
{
+ nni_resp_pipe *ppipe = arg;
nni_resp_sock *psock = ppipe->psock;
- int refcnt;
- int running;
+
+ nni_msgq_close(ppipe->sendq);
+ nni_aio_stop(&ppipe->aio_putq);
+ nni_aio_stop(&ppipe->aio_getq);
+ nni_aio_stop(&ppipe->aio_send);
+ nni_aio_stop(&ppipe->aio_recv);
nni_mtx_lock(&psock->mtx);
if (ppipe->id != 0) {
@@ -223,23 +218,6 @@ nni_resp_pipe_stop(nni_resp_pipe *ppipe)
ppipe->id = 0;
}
nni_mtx_unlock(&psock->mtx);
-
- nni_mtx_lock(&ppipe->mtx);
- NNI_ASSERT(ppipe->refcnt > 0);
- ppipe->refcnt--;
- refcnt = ppipe->refcnt;
-
- running = ppipe->running;
- ppipe->running = 0;
- nni_mtx_unlock(&ppipe->mtx);
-
- if (running) {
- nni_msgq_close(ppipe->sendq);
- nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq);
- }
- if (refcnt == 0) {
- nni_pipe_remove(ppipe->npipe);
- }
}
@@ -298,7 +276,7 @@ nni_resp_getq_cb(void *arg)
nni_resp_pipe *ppipe = arg;
if (nni_aio_result(&ppipe->aio_getq) != 0) {
- nni_resp_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -317,7 +295,7 @@ nni_resp_send_cb(void *arg)
if (nni_aio_result(&ppipe->aio_send) != 0) {
nni_msg_free(ppipe->aio_send.a_msg);
ppipe->aio_send.a_msg = NULL;
- nni_resp_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -386,7 +364,7 @@ nni_resp_recv_cb(void *arg)
return;
error:
- nni_resp_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
}
@@ -398,7 +376,7 @@ nni_resp_putq_cb(void *arg)
if (nni_aio_result(&ppipe->aio_putq) != 0) {
nni_msg_free(ppipe->aio_putq.a_msg);
ppipe->aio_putq.a_msg = NULL;
- nni_resp_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
}
nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv);
@@ -525,6 +503,7 @@ static nni_proto_pipe_ops nni_resp_pipe_ops = {
.pipe_init = nni_resp_pipe_init,
.pipe_fini = nni_resp_pipe_fini,
.pipe_start = nni_resp_pipe_start,
+ .pipe_stop = nni_resp_pipe_stop,
};
static nni_proto_sock_ops nni_resp_sock_ops = {
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index 962371c1..fa89e61a 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -52,9 +52,6 @@ struct nni_surv_pipe {
nni_aio aio_putq;
nni_aio aio_send;
nni_aio aio_recv;
- int running;
- int refcnt;
- nni_mtx mtx;
};
static void
@@ -134,7 +131,6 @@ nni_surv_pipe_fini(void *arg)
nni_aio_fini(&ppipe->aio_recv);
nni_aio_fini(&ppipe->aio_putq);
nni_msgq_fini(ppipe->sendq);
- nni_mtx_fini(&ppipe->mtx);
NNI_FREE_STRUCT(ppipe);
}
@@ -149,8 +145,7 @@ nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock)
return (NNG_ENOMEM);
}
// This depth could be tunable.
- if (((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) ||
- ((rv = nni_mtx_init(&ppipe->mtx)) != 0)) {
+ if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) {
goto failed;
}
rv = nni_aio_init(&ppipe->aio_getq, nni_surv_getq_cb, ppipe);
@@ -190,11 +185,6 @@ nni_surv_pipe_start(void *arg)
nni_list_append(&psock->pipes, ppipe);
nni_mtx_unlock(&psock->mtx);
- nni_mtx_lock(&ppipe->mtx);
- ppipe->refcnt = 2;
- ppipe->running = 1;
- nni_mtx_unlock(&ppipe->mtx);
-
nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq);
nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv);
return (0);
@@ -202,30 +192,22 @@ nni_surv_pipe_start(void *arg)
static void
-nni_surv_pipe_stop(nni_surv_pipe *ppipe)
+nni_surv_pipe_stop(void *arg)
{
+ nni_surv_pipe *ppipe = arg;
nni_surv_sock *psock = ppipe->psock;
- int refcnt;
+
+ nni_aio_stop(&ppipe->aio_getq);
+ nni_aio_stop(&ppipe->aio_send);
+ nni_aio_stop(&ppipe->aio_recv);
+ nni_aio_stop(&ppipe->aio_putq);
+ nni_msgq_close(ppipe->sendq);
nni_mtx_lock(&psock->mtx);
if (nni_list_active(&psock->pipes, ppipe)) {
nni_list_remove(&psock->pipes, ppipe);
}
nni_mtx_unlock(&psock->mtx);
-
- nni_mtx_lock(&ppipe->mtx);
- NNI_ASSERT(ppipe->refcnt > 0);
- ppipe->refcnt--;
- refcnt = ppipe->refcnt;
- if (ppipe->running) {
- nni_msgq_close(ppipe->sendq);
- nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq);
- }
- nni_mtx_unlock(&ppipe->mtx);
-
- if (refcnt == 0) {
- nni_pipe_remove(ppipe->npipe);
- }
}
@@ -235,7 +217,7 @@ nni_surv_getq_cb(void *arg)
nni_surv_pipe *ppipe = arg;
if (nni_aio_result(&ppipe->aio_getq) != 0) {
- nni_surv_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -254,7 +236,7 @@ nni_surv_send_cb(void *arg)
if (nni_aio_result(&ppipe->aio_send) != 0) {
nni_msg_free(ppipe->aio_send.a_msg);
ppipe->aio_send.a_msg = NULL;
- nni_surv_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -270,7 +252,7 @@ nni_surv_putq_cb(void *arg)
if (nni_aio_result(&ppipe->aio_putq) != 0) {
nni_msg_free(ppipe->aio_putq.a_msg);
ppipe->aio_putq.a_msg = NULL;
- nni_surv_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
return;
}
@@ -313,7 +295,7 @@ nni_surv_recv_cb(void *arg)
return;
failed:
- nni_surv_pipe_stop(ppipe);
+ nni_pipe_stop(ppipe->npipe);
}
@@ -490,6 +472,7 @@ static nni_proto_pipe_ops nni_surv_pipe_ops = {
.pipe_init = nni_surv_pipe_init,
.pipe_fini = nni_surv_pipe_fini,
.pipe_start = nni_surv_pipe_start,
+ .pipe_stop = nni_surv_pipe_stop,
};
static nni_proto_sock_ops nni_surv_sock_ops = {