diff options
Diffstat (limited to 'src/protocol/reqrep0')
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 38 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.c | 32 |
2 files changed, 47 insertions, 23 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index e18675ee..afc52629 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -170,6 +170,10 @@ rep0_ctx_send(void *arg, nni_aio *aio) msg = nni_aio_get_msg(aio); nni_msg_header_clear(msg); + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&s->lk); len = ctx->btrace_len; p_id = ctx->pipe_id; @@ -187,10 +191,6 @@ rep0_ctx_send(void *arg, nni_aio *aio) nni_pollable_clear(s->sendable); } - if (nni_aio_start(aio, rep0_ctx_cancel_send, ctx) != 0) { - nni_mtx_unlock(&s->lk); - return; - } if (ctx->closed) { nni_mtx_unlock(&s->lk); nni_aio_finish_error(aio, NNG_ECLOSED); @@ -201,12 +201,6 @@ rep0_ctx_send(void *arg, nni_aio *aio) nni_aio_finish_error(aio, NNG_ESTATE); return; } - if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) { - nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, rv); - return; - } - if ((rv = nni_idhash_find(s->pipes, p_id, (void **) &p)) != 0) { // Pipe is gone. Make this look like a good send to avoid // disrupting the state machine. We don't care if the peer @@ -217,7 +211,18 @@ rep0_ctx_send(void *arg, nni_aio *aio) nni_msg_free(msg); return; } + if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, rv); + return; + } if (p->busy) { + rv = nni_aio_schedule_verify(aio, rep0_ctx_cancel_send, ctx); + if (rv != 0) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, rv); + return; + } ctx->saio = aio; ctx->spipe = p; nni_list_append(&p->sendq, ctx); @@ -456,18 +461,23 @@ rep0_ctx_recv(void *arg, nni_aio *aio) size_t len; nni_msg * msg; - nni_mtx_lock(&s->lk); - if (nni_aio_start(aio, rep0_cancel_recv, ctx) != 0) { - nni_mtx_unlock(&s->lk); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&s->lk); if (ctx->closed) { nni_mtx_unlock(&s->lk); nni_aio_finish_error(aio, NNG_ECLOSED); return; } if ((p = nni_list_first(&s->recvpipes)) == NULL) { - nni_pollable_clear(s->recvable); + int rv; + rv = nni_aio_schedule_verify(aio, rep0_cancel_recv, ctx); + if (rv != 0) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, rv); + return; + } ctx->raio = aio; nni_list_append(&s->recvq, ctx); nni_mtx_unlock(&s->lk); diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 8149ce08..3ecc8604 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -617,11 +617,10 @@ req0_ctx_recv(void *arg, nni_aio *aio) req0_sock *s = ctx->sock; nni_msg * msg; - nni_mtx_lock(&s->mtx); - if (nni_aio_start(aio, req0_ctx_cancel_recv, ctx) != 0) { - nni_mtx_unlock(&s->mtx); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&s->mtx); if (s->closed) { nni_mtx_unlock(&s->mtx); nni_aio_finish_error(aio, NNG_ECLOSED); @@ -638,6 +637,13 @@ req0_ctx_recv(void *arg, nni_aio *aio) } if ((msg = ctx->repmsg) == NULL) { + int rv; + rv = nni_aio_schedule_verify(aio, req0_ctx_cancel_recv, ctx); + if (rv != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } ctx->raio = aio; nni_mtx_unlock(&s->mtx); return; @@ -697,14 +703,11 @@ req0_ctx_send(void *arg, nni_aio *aio) uint64_t id; int rv; - nni_mtx_lock(&s->mtx); - // Even though we always complete synchronously, this guards against - // restarting a request that was stopped. - if (nni_aio_start(aio, req0_ctx_cancel_send, ctx) != 0) { - nni_mtx_unlock(&s->mtx); + if (nni_aio_begin(aio) != 0) { return; } - // Sending a new requst cancels the old one, including any + nni_mtx_lock(&s->mtx); + // Sending a new request cancels the old one, including any // outstanding reply. if (ctx->raio != NULL) { nni_aio_finish_error(ctx->raio, NNG_ECANCELED); @@ -735,6 +738,15 @@ req0_ctx_send(void *arg, nni_aio *aio) nni_aio_finish_error(aio, rv); return; } + // If no pipes are ready, and the request was a poll (no background + // schedule), then fail it. Should be NNG_TIMEDOUT. + rv = nni_aio_schedule_verify(aio, req0_ctx_cancel_send, ctx); + if ((rv != 0) && (nni_list_empty(&s->readypipes))) { + nni_idhash_remove(s->reqids, id); + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } ctx->reqlen = nni_msg_len(msg); ctx->reqmsg = msg; ctx->saio = aio; @@ -743,6 +755,8 @@ req0_ctx_send(void *arg, nni_aio *aio) // Stick us on the sendq list. nni_list_append(&s->sendq, ctx); + // Note that this will be synchronous if the readypipes list was + // not empty. req0_run_sendq(s, NULL); nni_mtx_unlock(&s->mtx); } |
