diff options
Diffstat (limited to 'src/protocol/reqrep0/rep.c')
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 38 |
1 files changed, 24 insertions, 14 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index e18675ee..afc52629 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -170,6 +170,10 @@ rep0_ctx_send(void *arg, nni_aio *aio) msg = nni_aio_get_msg(aio); nni_msg_header_clear(msg); + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&s->lk); len = ctx->btrace_len; p_id = ctx->pipe_id; @@ -187,10 +191,6 @@ rep0_ctx_send(void *arg, nni_aio *aio) nni_pollable_clear(s->sendable); } - if (nni_aio_start(aio, rep0_ctx_cancel_send, ctx) != 0) { - nni_mtx_unlock(&s->lk); - return; - } if (ctx->closed) { nni_mtx_unlock(&s->lk); nni_aio_finish_error(aio, NNG_ECLOSED); @@ -201,12 +201,6 @@ rep0_ctx_send(void *arg, nni_aio *aio) nni_aio_finish_error(aio, NNG_ESTATE); return; } - if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) { - nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, rv); - return; - } - if ((rv = nni_idhash_find(s->pipes, p_id, (void **) &p)) != 0) { // Pipe is gone. Make this look like a good send to avoid // disrupting the state machine. We don't care if the peer @@ -217,7 +211,18 @@ rep0_ctx_send(void *arg, nni_aio *aio) nni_msg_free(msg); return; } + if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, rv); + return; + } if (p->busy) { + rv = nni_aio_schedule_verify(aio, rep0_ctx_cancel_send, ctx); + if (rv != 0) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, rv); + return; + } ctx->saio = aio; ctx->spipe = p; nni_list_append(&p->sendq, ctx); @@ -456,18 +461,23 @@ rep0_ctx_recv(void *arg, nni_aio *aio) size_t len; nni_msg * msg; - nni_mtx_lock(&s->lk); - if (nni_aio_start(aio, rep0_cancel_recv, ctx) != 0) { - nni_mtx_unlock(&s->lk); + if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&s->lk); if (ctx->closed) { nni_mtx_unlock(&s->lk); nni_aio_finish_error(aio, NNG_ECLOSED); return; } if ((p = nni_list_first(&s->recvpipes)) == NULL) { - nni_pollable_clear(s->recvable); + int rv; + rv = nni_aio_schedule_verify(aio, rep0_cancel_recv, ctx); + if (rv != 0) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, rv); + return; + } ctx->raio = aio; nni_list_append(&s->recvq, ctx); nni_mtx_unlock(&s->lk); |
