diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-04-20 20:52:32 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-04-24 15:06:33 -0700 |
| commit | fdefff742662ed4eb476bf19b9dda245f86bc406 (patch) | |
| tree | a4e132716debd64e434478f8814f368db052cbc6 /src/protocol/reqrep0/rep.c | |
| parent | e0b47b12d3d1462d07c5038e4f34f5282eeec675 (diff) | |
| download | nng-fdefff742662ed4eb476bf19b9dda245f86bc406.tar.gz nng-fdefff742662ed4eb476bf19b9dda245f86bc406.tar.bz2 nng-fdefff742662ed4eb476bf19b9dda245f86bc406.zip | |
fixes #342 Want Surveyor/Respondent context support
fixes #360 core should nng_aio_begin before nng_aio_finish_error
fixes #361 nng_send_aio should check for NULL message
fixes #362 nni_msgq does not signal pollable on certain events
This adds support for contexts for both sides of the surveyor pattern.
Prior to this commit, the raw mode was completely broken, and there
were numerous other bugs found and fixed. This integration includes
*much* deeper validation of this pattern.
Some changes to the core and other patterns have been made, where it
was obvious that we could make such improvements. (The obviousness
stemming from the fact that RESPONDENT in particular is very closely
derived from REP.)
Diffstat (limited to 'src/protocol/reqrep0/rep.c')
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 73 |
1 file changed, 27 insertions, 46 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index 4e20466b..385860cd 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -36,11 +36,9 @@ static void rep0_pipe_fini(void *); struct rep0_ctx { rep0_sock * sock; - bool closed; char * btrace; size_t btrace_len; size_t btrace_size; - int ttl; uint32_t pipe_id; rep0_pipe * spipe; // send pipe nni_aio * saio; // send aio @@ -56,7 +54,6 @@ struct rep0_sock { nni_idhash * pipes; nni_list recvpipes; // list of pipes with data to receive nni_list recvq; - bool closed; rep0_ctx * ctx; nni_pollable *recvable; nni_pollable *sendable; @@ -82,15 +79,11 @@ rep0_ctx_close(void *arg) nni_aio * aio; nni_mtx_lock(&s->lk); - ctx->closed = true; if ((aio = ctx->saio) != NULL) { - nni_msg * msg; rep0_pipe *pipe = ctx->spipe; ctx->saio = NULL; ctx->spipe = NULL; nni_list_remove(&pipe->sendq, ctx); - msg = nni_aio_get_msg(aio); - nni_msg_free(msg); nni_aio_finish_error(aio, NNG_ECLOSED); } if ((aio = ctx->raio) != NULL) { @@ -191,53 +184,48 @@ rep0_ctx_send(void *arg, nni_aio *aio) nni_pollable_clear(s->sendable); } - if (ctx->closed) { + if (len == 0) { nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, NNG_ECLOSED); + nni_aio_finish_error(aio, NNG_ESTATE); return; } - if (len == 0) { + if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) { nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, NNG_ESTATE); + nni_aio_finish_error(aio, rv); return; } - if ((rv = nni_idhash_find(s->pipes, p_id, (void **) &p)) != 0) { + if (nni_idhash_find(s->pipes, p_id, (void **) &p) != 0) { // Pipe is gone. Make this look like a good send to avoid // disrupting the state machine. We don't care if the peer // lost interest in our reply. 
- nni_aio_set_msg(aio, NULL); nni_mtx_unlock(&s->lk); + nni_aio_set_msg(aio, NULL); nni_aio_finish(aio, 0, nni_msg_len(msg)); nni_msg_free(msg); return; } - if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) { + if (!p->busy) { + p->busy = true; + len = nni_msg_len(msg); + nni_aio_set_msg(p->aio_send, msg); + nni_pipe_send(p->pipe, p->aio_send); nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, rv); + + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, len); return; } - if (p->busy) { - rv = nni_aio_schedule_verify(aio, rep0_ctx_cancel_send, ctx); - if (rv != 0) { - nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, rv); - return; - } - ctx->saio = aio; - ctx->spipe = p; - nni_list_append(&p->sendq, ctx); + + rv = nni_aio_schedule_verify(aio, rep0_ctx_cancel_send, ctx); + if (rv != 0) { nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, rv); return; } - - p->busy = true; - len = nni_msg_len(msg); - nni_aio_set_msg(aio, NULL); - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + ctx->saio = aio; + ctx->spipe = p; + nni_list_append(&p->sendq, ctx); nni_mtx_unlock(&s->lk); - - nni_aio_finish(aio, 0, len); } static void @@ -376,6 +364,7 @@ rep0_pipe_stop(void *arg) aio = ctx->saio; ctx->saio = NULL; msg = nni_aio_get_msg(aio); + nni_aio_set_msg(aio, NULL); nni_aio_finish(aio, 0, nni_msg_len(msg)); nni_msg_free(msg); } @@ -384,12 +373,11 @@ rep0_pipe_stop(void *arg) // accept a message and discard it.) 
nni_pollable_raise(s->sendable); } + nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); nni_mtx_unlock(&s->lk); nni_aio_stop(p->aio_send); nni_aio_stop(p->aio_recv); - - nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); } static void @@ -465,11 +453,6 @@ rep0_ctx_recv(void *arg, nni_aio *aio) return; } nni_mtx_lock(&s->lk); - if (ctx->closed) { - nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, NNG_ECLOSED); - return; - } if ((p = nni_list_first(&s->recvpipes)) == NULL) { int rv; rv = nni_aio_schedule_verify(aio, rep0_cancel_recv, ctx); @@ -509,7 +492,6 @@ rep0_pipe_recv_cb(void *arg) rep0_sock *s = p->rep; rep0_ctx * ctx; nni_msg * msg; - int rv; uint8_t * body; nni_aio * aio; size_t len; @@ -527,7 +509,7 @@ rep0_pipe_recv_cb(void *arg) // Move backtrace from body to header hops = 1; for (;;) { - int end = 0; + bool end = false; if (hops > s->ttl) { // This isn't malformed, but it has gone through @@ -544,9 +526,8 @@ rep0_pipe_recv_cb(void *arg) return; } body = nni_msg_body(msg); - end = (body[0] & 0x80) ? 1 : 0; - rv = nni_msg_header_append(msg, body, 4); - if (rv != 0) { + end = ((body[0] & 0x80) != 0); + if (nni_msg_header_append(msg, body, 4) != 0) { // Out of memory, so drop it. goto drop; } @@ -571,7 +552,6 @@ rep0_pipe_recv_cb(void *arg) nni_list_remove(&s->recvq, ctx); aio = ctx->raio; ctx->raio = NULL; - nni_aio_set_msg(aio, msg); nni_aio_set_msg(p->aio_recv, NULL); // schedule another receive @@ -591,6 +571,7 @@ rep0_pipe_recv_cb(void *arg) nni_mtx_unlock(&s->lk); + nni_aio_set_msg(aio, msg); nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); return; |
