aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep0
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-07-02 22:36:08 -0700
committerGarrett D'Amore <garrett@damore.org>2018-07-03 19:00:19 -0700
commitd1a9c84a6b375cb25a8b7475957130e364b41753 (patch)
tree5444721d96a84d92e3ed258b4d51f80adf6b200c /src/protocol/reqrep0
parenta772bcc6ebe198f939889abbda18eded2a326941 (diff)
downloadnng-d1a9c84a6b375cb25a8b7475957130e364b41753.tar.gz
nng-d1a9c84a6b375cb25a8b7475957130e364b41753.tar.bz2
nng-d1a9c84a6b375cb25a8b7475957130e364b41753.zip
fixes #572 Several locking errors found
fixes #573 atomic flags could help This introduces a new atomic flag, and reduces some of the global locking. The lock refactoring work is not yet complete, but this is a positive step forward, and should help with certain things. While here we also fixed a compile warning due to incorrect types.
Diffstat (limited to 'src/protocol/reqrep0')
-rw-r--r--src/protocol/reqrep0/rep.c5
-rw-r--r--src/protocol/reqrep0/req.c30
2 files changed, 18 insertions, 17 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index 1f4f0b33..f725cadb 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -413,15 +413,14 @@ rep0_pipe_send_cb(void *arg)
nni_msg * msg;
size_t len;
- nni_mtx_lock(&s->lk);
- p->busy = false;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
nni_pipe_stop(p->pipe);
- nni_mtx_unlock(&s->lk);
return;
}
+ nni_mtx_lock(&s->lk);
+ p->busy = false;
if ((ctx = nni_list_first(&p->sendq)) == NULL) {
// Nothing else to send.
if (p->id == s->ctx->pipe_id) {
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 8a0dd4d8..43751d14 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -82,7 +82,8 @@ struct req0_pipe {
nni_pipe * pipe;
req0_sock * req;
nni_list_node node;
- nni_list ctxs; // ctxs with pending traffic
+ nni_list ctxs; // ctxs with pending traffic
+ bool sending; // if busy sending
nni_aio * aio_send;
nni_aio * aio_recv;
};
@@ -264,6 +265,7 @@ req0_pipe_close(void *arg)
nni_mtx_lock(&s->mtx);
// This removes the node from either busypipes or readypipes.
// It doesn't much matter which.
+ p->sending = false;
if (nni_list_node_active(&p->node)) {
nni_list_node_remove(&p->node);
if (s->closed) {
@@ -311,20 +313,19 @@ req0_send_cb(void *arg)
// in the ready list, and re-run the sendq.
nni_mtx_lock(&s->mtx);
- if (nni_list_active(&s->busypipes, p)) {
- nni_list_remove(&s->busypipes, p);
- nni_list_append(&s->readypipes, p);
- if (nni_list_empty(&s->sendq)) {
- nni_pollable_raise(s->sendable);
- }
- req0_run_sendq(s, &aios);
- } else {
- // We wind up here if stop was called from the reader
- // side while we were waiting to be scheduled to run for the
- // writer side. In this case we can't complete the operation,
- // and we have to abort.
- nni_pipe_stop(p->pipe);
+ if (!p->sending) {
+ // This occurs if the req0_pipe_close has been called.
+ // In that case we don't want any more processing.
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+ nni_list_remove(&s->busypipes, p);
+ nni_list_append(&s->readypipes, p);
+ p->sending = false;
+ if (nni_list_empty(&s->sendq)) {
+ nni_pollable_raise(s->sendable);
}
+ req0_run_sendq(s, &aios);
nni_mtx_unlock(&s->mtx);
while ((aio = nni_list_first(&aios)) != NULL) {
@@ -533,6 +534,7 @@ req0_run_sendq(req0_sock *s, nni_list *aiolist)
nni_list_remove(&s->readypipes, p);
nni_list_append(&s->busypipes, p);
+ p->sending = true;
if ((aio = ctx->saio) != NULL) {
ctx->saio = NULL;