diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-04-20 20:52:32 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-04-24 15:06:33 -0700 |
| commit | fdefff742662ed4eb476bf19b9dda245f86bc406 (patch) | |
| tree | a4e132716debd64e434478f8814f368db052cbc6 /src/protocol/reqrep0/xrep.c | |
| parent | e0b47b12d3d1462d07c5038e4f34f5282eeec675 (diff) | |
| download | nng-fdefff742662ed4eb476bf19b9dda245f86bc406.tar.gz nng-fdefff742662ed4eb476bf19b9dda245f86bc406.tar.bz2 nng-fdefff742662ed4eb476bf19b9dda245f86bc406.zip | |
fixes #342 Want Surveyor/Respondent context support
fixes #360 core should nng_aio_begin before nng_aio_finish_error
fixes #361 nng_send_aio should check for NULL message
fixes #362 nni_msgq does not signal pollable on certain events
This adds support for contexts for both sides of the surveyor pattern.
Prior to this commit, the raw mode was completely broken, and there
were numerous other bugs found and fixed. This integration includes
*much* deeper validation of this pattern.
Some changes to the core and other patterns have been made, where it
was obvioius that we could make such improvements. (The obviousness
stemming from the fact that RESPONDENT in particular is very closely
derived from REP.)
Diffstat (limited to 'src/protocol/reqrep0/xrep.c')
| -rw-r--r-- | src/protocol/reqrep0/xrep.c | 25 |
1 files changed, 11 insertions, 14 deletions
diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c index f7189453..4773677e 100644 --- a/src/protocol/reqrep0/xrep.c +++ b/src/protocol/reqrep0/xrep.c @@ -189,7 +189,9 @@ xrep0_pipe_stop(void *arg) nni_aio_stop(p->aio_recv); nni_aio_stop(p->aio_putq); + nni_mtx_lock(&s->lk); nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); + nni_mtx_unlock(&s->lk); } static void @@ -200,7 +202,6 @@ xrep0_sock_getq_cb(void *arg) nni_msg * msg; uint32_t id; xrep0_pipe *p; - int rv; // This watches for messages from the upper write queue, // extracts the destination pipe, and forwards it to the appropriate @@ -229,12 +230,12 @@ xrep0_sock_getq_cb(void *arg) // Look for the pipe, and attempt to put the message there // (nonblocking) if we can. If we can't for any reason, then we // free the message. - if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) { - rv = nni_msgq_tryput(p->sendq, msg); - } - if (rv != 0) { + nni_mtx_lock(&s->lk); + if (((nni_idhash_find(s->pipes, id, (void **) &p)) != 0) || + (nni_msgq_tryput(p->sendq, msg) != 0)) { nni_msg_free(msg); } + nni_mtx_unlock(&s->lk); // Now look for another message on the upper write queue. nni_msgq_aio_get(uwq, s->aio_getq); @@ -277,8 +278,6 @@ xrep0_pipe_recv_cb(void *arg) xrep0_pipe *p = arg; xrep0_sock *s = p->rep; nni_msg * msg; - int rv; - uint8_t * body; int hops; if (nni_aio_result(p->aio_recv) != 0) { @@ -292,8 +291,7 @@ xrep0_pipe_recv_cb(void *arg) nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); // Store the pipe id in the header, first thing. - rv = nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)); - if (rv != 0) { + if (nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)) != 0) { // Failure here causes us to drop the message. goto drop; } @@ -301,7 +299,8 @@ xrep0_pipe_recv_cb(void *arg) // Move backtrace from body to header hops = 1; for (;;) { - int end = 0; + bool end = 0; + uint8_t *body; if (hops > s->ttl) { // This isn't malformed, but it has gone through // too many hops. Do not disconnect, because we @@ -317,9 +316,8 @@ xrep0_pipe_recv_cb(void *arg) return; } body = nni_msg_body(msg); - end = (body[0] & 0x80) ? 1 : 0; - rv = nni_msg_header_append(msg, body, 4); - if (rv != 0) { + end = ((body[0] & 0x80) != 0); + if (nni_msg_header_append(msg, body, 4) != 0) { // Out of memory most likely, but keep going to // avoid breaking things. goto drop; @@ -413,7 +411,6 @@ static nni_proto_sock_ops xrep0_sock_ops = { .sock_open = xrep0_sock_open, .sock_close = xrep0_sock_close, .sock_options = xrep0_sock_options, - .sock_filter = NULL, // No filtering for raw mode .sock_send = xrep0_sock_send, .sock_recv = xrep0_sock_recv, }; |
