diff options
Diffstat (limited to 'src/protocol/reqrep0/req.c')
| -rw-r--r-- | src/protocol/reqrep0/req.c | 112 |
1 files changed, 42 insertions, 70 deletions
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 796bd71e..b5681688 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -134,17 +134,9 @@ static void req0_sock_close(void *arg) { req0_sock *s = arg; - req0_ctx * ctx; nni_mtx_lock(&s->mtx); s->closed = true; - NNI_LIST_FOREACH (&s->contexts, ctx) { - if (ctx->recv_aio != NULL) { - nni_aio_finish_error(ctx->recv_aio, NNG_ECLOSED); - ctx->recv_aio = NULL; - req0_ctx_reset(ctx); - } - } nni_mtx_unlock(&s->mtx); } @@ -213,10 +205,6 @@ req0_pipe_start(void *arg) } nni_mtx_lock(&s->mtx); - if (s->closed || p->closed) { - nni_mtx_unlock(&s->mtx); - return (NNG_ECLOSED); - } nni_list_append(&s->ready_pipes, p); nni_pollable_raise(&s->writable); req0_run_send_queue(s, NULL); @@ -269,9 +257,9 @@ req0_send_cb(void *arg) req0_pipe *p = arg; req0_sock *s = p->req; nni_aio * aio; - nni_list send_list; + nni_list sent_list; - nni_aio_list_init(&send_list); + nni_aio_list_init(&sent_list); if (nni_aio_result(&p->aio_send) != 0) { // We failed to send... clean up and deal with it. nni_msg_free(nni_aio_get_msg(&p->aio_send)); @@ -295,11 +283,11 @@ req0_send_cb(void *arg) if (nni_list_empty(&s->send_queue)) { nni_pollable_raise(&s->writable); } - req0_run_send_queue(s, &send_list); + req0_run_send_queue(s, &sent_list); nni_mtx_unlock(&s->mtx); - while ((aio = nni_list_first(&send_list)) != NULL) { - nni_list_remove(&send_list, aio); + while ((aio = nni_list_first(&sent_list)) != NULL) { + nni_list_remove(&sent_list, aio); nni_aio_finish_synch(aio, 0, 0); } } @@ -451,7 +439,7 @@ req0_ctx_get_resend_time(void *arg, void *buf, size_t *szp, nni_opt_type t) } static void -req0_run_send_queue(req0_sock *s, nni_list *send_list) +req0_run_send_queue(req0_sock *s, nni_list *sent_list) { req0_ctx *ctx; nni_aio * aio; @@ -495,24 +483,20 @@ req0_run_send_queue(req0_sock *s, nni_list *send_list) nni_list_remove(&s->ready_pipes, p); nni_list_append(&s->busy_pipes, p); + if (nni_list_empty(&s->ready_pipes)) { + nni_pollable_clear(&s->writable); + } if ((aio = ctx->send_aio) != NULL) { ctx->send_aio = NULL; nni_aio_bump_count(aio, ctx->req_len); // If the list was passed in, we want to do a // synchronous completion later. - if (send_list != NULL) { - nni_list_append(send_list, aio); + if (sent_list != NULL) { + nni_list_append(sent_list, aio); } else { nni_aio_finish(aio, 0, 0); } - if (ctx == &s->master) { - if (nni_list_empty(&s->ready_pipes)) { - nni_pollable_clear(&s->writable); - } else { - nni_pollable_raise(&s->writable); - } - } } nni_aio_set_msg(&p->aio_send, msg); @@ -559,22 +543,20 @@ req0_ctx_cancel_recv(nni_aio *aio, void *arg, int rv) req0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); - if (ctx->recv_aio != aio) { - // already completed, ignore this. - nni_mtx_unlock(&s->mtx); - return; - } - ctx->recv_aio = NULL; + if (ctx->recv_aio == aio) { + ctx->recv_aio = NULL; - // Cancellation of a pending receive is treated as aborting the - // entire state machine. This allows us to preserve the semantic of - // exactly one receive operation per send operation, and should - // be the least surprising for users. The main consequence is that - // if a receive operation is completed (in error or otherwise), the - // user must submit a new send operation to restart the state machine. - req0_ctx_reset(ctx); + // Cancellation of a pending receive is treated as aborting the + // entire state machine. This allows us to preserve the + // semantic of exactly one receive operation per send + // operation, and should be the least surprising for users. The + // main consequence is that if a receive operation is completed + // (in error or otherwise), the user must submit a new send + // operation to restart the state machine. + req0_ctx_reset(ctx); - nni_aio_finish_error(aio, rv); + nni_aio_finish_error(aio, rv); + } nni_mtx_unlock(&s->mtx); } @@ -589,11 +571,6 @@ req0_ctx_recv(void *arg, nni_aio *aio) return; } nni_mtx_lock(&s->mtx); - if (s->closed) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, NNG_ECLOSED); - return; - } if ((ctx->recv_aio != NULL) || ((ctx->req_msg == NULL) && (ctx->rep_msg == NULL))) { // We have already got a pending receive or have not @@ -635,30 +612,27 @@ req0_ctx_cancel_send(nni_aio *aio, void *arg, int rv) req0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); - if (ctx->send_aio != aio) { - // already completed, ignore this. - nni_mtx_unlock(&s->mtx); - return; - } + if (ctx->send_aio == aio) { + // There should not be a pending reply, because we canceled + // it while we were waiting. + NNI_ASSERT(ctx->recv_aio == NULL); + ctx->send_aio = NULL; + // Restore the message back to the aio. + nni_aio_set_msg(aio, ctx->req_msg); + nni_msg_header_clear(ctx->req_msg); + ctx->req_msg = NULL; - // There should not be a pending reply, because we canceled - // it while we were waiting. - NNI_ASSERT(ctx->recv_aio == NULL); - ctx->send_aio = NULL; - // Restore the message back to the aio. - nni_aio_set_msg(aio, ctx->req_msg); - nni_msg_header_clear(ctx->req_msg); - ctx->req_msg = NULL; - - // Cancellation of a pending receive is treated as aborting the - // entire state machine. This allows us to preserve the semantic of - // exactly one receive operation per send operation, and should - // be the least surprising for users. The main consequence is that - // if a receive operation is completed (in error or otherwise), the - // user must submit a new send operation to restart the state machine. - req0_ctx_reset(ctx); + // Cancellation of a pending receive is treated as aborting the + // entire state machine. This allows us to preserve the + // semantic of exactly one receive operation per send + // operation, and should be the least surprising for users. The + // main consequence is that if a receive operation is completed + // (in error or otherwise), the user must submit a new send + // operation to restart the state machine. + req0_ctx_reset(ctx); - nni_aio_finish_error(aio, rv); + nni_aio_finish_error(aio, rv); + } nni_mtx_unlock(&s->mtx); } @@ -728,8 +702,6 @@ req0_ctx_send(void *arg, nni_aio *aio) // Stick us on the send_queue list. nni_list_append(&s->send_queue, ctx); - // Note that this will be synchronous if the ready_pipes list was - // not empty. req0_run_send_queue(s, NULL); nni_mtx_unlock(&s->mtx); } |
