aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep0/req.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep0/req.c')
-rw-r--r--src/protocol/reqrep0/req.c112
1 files changed, 42 insertions, 70 deletions
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 796bd71e..b5681688 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -134,17 +134,9 @@ static void
req0_sock_close(void *arg)
{
req0_sock *s = arg;
- req0_ctx * ctx;
nni_mtx_lock(&s->mtx);
s->closed = true;
- NNI_LIST_FOREACH (&s->contexts, ctx) {
- if (ctx->recv_aio != NULL) {
- nni_aio_finish_error(ctx->recv_aio, NNG_ECLOSED);
- ctx->recv_aio = NULL;
- req0_ctx_reset(ctx);
- }
- }
nni_mtx_unlock(&s->mtx);
}
@@ -213,10 +205,6 @@ req0_pipe_start(void *arg)
}
nni_mtx_lock(&s->mtx);
- if (s->closed || p->closed) {
- nni_mtx_unlock(&s->mtx);
- return (NNG_ECLOSED);
- }
nni_list_append(&s->ready_pipes, p);
nni_pollable_raise(&s->writable);
req0_run_send_queue(s, NULL);
@@ -269,9 +257,9 @@ req0_send_cb(void *arg)
req0_pipe *p = arg;
req0_sock *s = p->req;
nni_aio * aio;
- nni_list send_list;
+ nni_list sent_list;
- nni_aio_list_init(&send_list);
+ nni_aio_list_init(&sent_list);
if (nni_aio_result(&p->aio_send) != 0) {
// We failed to send... clean up and deal with it.
nni_msg_free(nni_aio_get_msg(&p->aio_send));
@@ -295,11 +283,11 @@ req0_send_cb(void *arg)
if (nni_list_empty(&s->send_queue)) {
nni_pollable_raise(&s->writable);
}
- req0_run_send_queue(s, &send_list);
+ req0_run_send_queue(s, &sent_list);
nni_mtx_unlock(&s->mtx);
- while ((aio = nni_list_first(&send_list)) != NULL) {
- nni_list_remove(&send_list, aio);
+ while ((aio = nni_list_first(&sent_list)) != NULL) {
+ nni_list_remove(&sent_list, aio);
nni_aio_finish_synch(aio, 0, 0);
}
}
@@ -451,7 +439,7 @@ req0_ctx_get_resend_time(void *arg, void *buf, size_t *szp, nni_opt_type t)
}
static void
-req0_run_send_queue(req0_sock *s, nni_list *send_list)
+req0_run_send_queue(req0_sock *s, nni_list *sent_list)
{
req0_ctx *ctx;
nni_aio * aio;
@@ -495,24 +483,20 @@ req0_run_send_queue(req0_sock *s, nni_list *send_list)
nni_list_remove(&s->ready_pipes, p);
nni_list_append(&s->busy_pipes, p);
+ if (nni_list_empty(&s->ready_pipes)) {
+ nni_pollable_clear(&s->writable);
+ }
if ((aio = ctx->send_aio) != NULL) {
ctx->send_aio = NULL;
nni_aio_bump_count(aio, ctx->req_len);
// If the list was passed in, we want to do a
// synchronous completion later.
- if (send_list != NULL) {
- nni_list_append(send_list, aio);
+ if (sent_list != NULL) {
+ nni_list_append(sent_list, aio);
} else {
nni_aio_finish(aio, 0, 0);
}
- if (ctx == &s->master) {
- if (nni_list_empty(&s->ready_pipes)) {
- nni_pollable_clear(&s->writable);
- } else {
- nni_pollable_raise(&s->writable);
- }
- }
}
nni_aio_set_msg(&p->aio_send, msg);
@@ -559,22 +543,20 @@ req0_ctx_cancel_recv(nni_aio *aio, void *arg, int rv)
req0_sock *s = ctx->sock;
nni_mtx_lock(&s->mtx);
- if (ctx->recv_aio != aio) {
- // already completed, ignore this.
- nni_mtx_unlock(&s->mtx);
- return;
- }
- ctx->recv_aio = NULL;
+ if (ctx->recv_aio == aio) {
+ ctx->recv_aio = NULL;
- // Cancellation of a pending receive is treated as aborting the
- // entire state machine. This allows us to preserve the semantic of
- // exactly one receive operation per send operation, and should
- // be the least surprising for users. The main consequence is that
- // if a receive operation is completed (in error or otherwise), the
- // user must submit a new send operation to restart the state machine.
- req0_ctx_reset(ctx);
+ // Cancellation of a pending receive is treated as aborting the
+ // entire state machine. This allows us to preserve the
+ // semantic of exactly one receive operation per send
+ // operation, and should be the least surprising for users. The
+ // main consequence is that if a receive operation is completed
+ // (in error or otherwise), the user must submit a new send
+ // operation to restart the state machine.
+ req0_ctx_reset(ctx);
- nni_aio_finish_error(aio, rv);
+ nni_aio_finish_error(aio, rv);
+ }
nni_mtx_unlock(&s->mtx);
}
@@ -589,11 +571,6 @@ req0_ctx_recv(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&s->mtx);
- if (s->closed) {
- nni_mtx_unlock(&s->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
if ((ctx->recv_aio != NULL) ||
((ctx->req_msg == NULL) && (ctx->rep_msg == NULL))) {
// We have already got a pending receive or have not
@@ -635,30 +612,27 @@ req0_ctx_cancel_send(nni_aio *aio, void *arg, int rv)
req0_sock *s = ctx->sock;
nni_mtx_lock(&s->mtx);
- if (ctx->send_aio != aio) {
- // already completed, ignore this.
- nni_mtx_unlock(&s->mtx);
- return;
- }
+ if (ctx->send_aio == aio) {
+ // There should not be a pending reply, because we canceled
+ // it while we were waiting.
+ NNI_ASSERT(ctx->recv_aio == NULL);
+ ctx->send_aio = NULL;
+ // Restore the message back to the aio.
+ nni_aio_set_msg(aio, ctx->req_msg);
+ nni_msg_header_clear(ctx->req_msg);
+ ctx->req_msg = NULL;
- // There should not be a pending reply, because we canceled
- // it while we were waiting.
- NNI_ASSERT(ctx->recv_aio == NULL);
- ctx->send_aio = NULL;
- // Restore the message back to the aio.
- nni_aio_set_msg(aio, ctx->req_msg);
- nni_msg_header_clear(ctx->req_msg);
- ctx->req_msg = NULL;
-
- // Cancellation of a pending receive is treated as aborting the
- // entire state machine. This allows us to preserve the semantic of
- // exactly one receive operation per send operation, and should
- // be the least surprising for users. The main consequence is that
- // if a receive operation is completed (in error or otherwise), the
- // user must submit a new send operation to restart the state machine.
- req0_ctx_reset(ctx);
+ // Cancellation of a pending receive is treated as aborting the
+ // entire state machine. This allows us to preserve the
+ // semantic of exactly one receive operation per send
+ // operation, and should be the least surprising for users. The
+ // main consequence is that if a receive operation is completed
+ // (in error or otherwise), the user must submit a new send
+ // operation to restart the state machine.
+ req0_ctx_reset(ctx);
- nni_aio_finish_error(aio, rv);
+ nni_aio_finish_error(aio, rv);
+ }
nni_mtx_unlock(&s->mtx);
}
@@ -728,8 +702,6 @@ req0_ctx_send(void *arg, nni_aio *aio)
// Stick us on the send_queue list.
nni_list_append(&s->send_queue, ctx);
- // Note that this will be synchronous if the ready_pipes list was
- // not empty.
req0_run_send_queue(s, NULL);
nni_mtx_unlock(&s->mtx);
}