diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-10-20 17:03:12 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-10-23 16:14:53 -0700 |
| commit | 3585000ca027740dbdb4599f4991cd2bf562e2f2 (patch) | |
| tree | a45b4c1bcc2d11777dde0e38d4b742d121d55e45 /src/protocol/reqrep | |
| parent | fdb73b69a887d868f8e976ef8a990a5d7f6687f9 (diff) | |
| download | nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.tar.gz nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.tar.bz2 nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.zip | |
fixes #112 Need to move some stuff from socket to message queues
Diffstat (limited to 'src/protocol/reqrep')
| -rw-r--r-- | src/protocol/reqrep/rep.c | 109 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 91 |
2 files changed, 124 insertions, 76 deletions
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index cd9411d9..510beab3 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -32,6 +32,7 @@ struct rep_sock { nni_sock * sock; nni_msgq * uwq; nni_msgq * urq; + nni_mtx lk; int raw; int ttl; nni_idhash *pipes; @@ -62,6 +63,7 @@ rep_sock_fini(void *arg) if (s->btrace != NULL) { nni_free(s->btrace, s->btrace_len); } + nni_mtx_fini(&s->lk); NNI_FREE_STRUCT(s); } @@ -74,6 +76,7 @@ rep_sock_init(void **sp, nni_sock *sock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } + nni_mtx_init(&s->lk); if (((rv = nni_idhash_init(&s->pipes)) != 0) || ((rv = nni_aio_init(&s->aio_getq, rep_sock_getq_cb, s)) != 0)) { rep_sock_fini(s); @@ -89,7 +92,6 @@ rep_sock_init(void **sp, nni_sock *sock) s->urq = nni_sock_recvq(sock); *sp = s; - nni_sock_senderr(sock, NNG_ESTATE); return (0); } @@ -215,6 +217,7 @@ rep_sock_getq_cb(void *arg) // Look for the pipe, and attempt to put the message there // (nonblocking) if we can. If we can't for any reason, then we // free the message. + // XXX: LOCKING?!?! if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) { rv = nni_msgq_tryput(p->sendq, msg); } @@ -347,10 +350,10 @@ rep_sock_setopt_raw(void *arg, const void *buf, size_t sz) { rep_sock *s = arg; int rv; + + nni_mtx_lock(&s->lk); rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); - if (rv == 0) { - nni_sock_senderr(s->sock, s->raw ? 0 : NNG_ESTATE); - } + nni_mtx_unlock(&s->lk); return (rv); } @@ -376,53 +379,18 @@ rep_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) } static nni_msg * -rep_sock_sfilter(void *arg, nni_msg *msg) -{ - rep_sock *s = arg; - - if (s->raw) { - return (msg); - } - - // Cannot send again until a receive is done... - nni_sock_senderr(s->sock, NNG_ESTATE); - - // If we have a stored backtrace, append it to the header... - // if we don't have a backtrace, discard the message. - if (s->btrace == NULL) { - nni_msg_free(msg); - return (NULL); - } - - // drop anything else in the header... - nni_msg_header_clear(msg); - - if (nni_msg_header_append(msg, s->btrace, s->btrace_len) != 0) { - nni_free(s->btrace, s->btrace_len); - s->btrace = NULL; - s->btrace_len = 0; - nni_msg_free(msg); - return (NULL); - } - - nni_free(s->btrace, s->btrace_len); - s->btrace = NULL; - s->btrace_len = 0; - return (msg); -} - -static nni_msg * -rep_sock_rfilter(void *arg, nni_msg *msg) +rep_sock_filter(void *arg, nni_msg *msg) { rep_sock *s = arg; char * header; size_t len; + nni_mtx_lock(&s->lk); if (s->raw) { + nni_mtx_unlock(&s->lk); return (msg); } - nni_sock_senderr(s->sock, 0); len = nni_msg_header_len(msg); header = nni_msg_header(msg); if (s->btrace != NULL) { @@ -437,9 +405,61 @@ rep_sock_rfilter(void *arg, nni_msg *msg) s->btrace_len = len; memcpy(s->btrace, header, len); nni_msg_header_clear(msg); + nni_mtx_unlock(&s->lk); return (msg); } +static void +rep_sock_send(void *arg, nni_aio *aio) +{ + rep_sock *s = arg; + int rv; + nni_msg * msg; + + nni_mtx_lock(&s->lk); + if (s->raw) { + // Pass thru + nni_mtx_unlock(&s->lk); + nni_sock_send_pending(s->sock); + nni_msgq_aio_put(s->uwq, aio); + return; + } + if (s->btrace == NULL) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + + msg = nni_aio_get_msg(aio); + + // drop anything else in the header... (it should already be + // empty, but there can be stale backtrace info there.) + nni_msg_header_clear(msg); + + if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, rv); + return; + } + + nni_free(s->btrace, s->btrace_len); + s->btrace = NULL; + s->btrace_len = 0; + + nni_mtx_unlock(&s->lk); + nni_sock_send_pending(s->sock); + nni_msgq_aio_put(s->uwq, aio); +} + +static void +rep_sock_recv(void *arg, nni_aio *aio) +{ + rep_sock *s = arg; + + nni_sock_recv_pending(s->sock); + nni_msgq_aio_get(s->urq, aio); +} + // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. static nni_proto_pipe_ops rep_pipe_ops = { @@ -470,8 +490,9 @@ static nni_proto_sock_ops rep_sock_ops = { .sock_open = rep_sock_open, .sock_close = rep_sock_close, .sock_options = rep_sock_options, - .sock_rfilter = rep_sock_rfilter, - .sock_sfilter = rep_sock_sfilter, + .sock_filter = rep_sock_filter, + .sock_send = rep_sock_send, + .sock_recv = rep_sock_recv, }; static nni_proto nni_rep_proto = { diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 24d01df2..81abd306 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -99,7 +99,7 @@ req_sock_init(void **sp, nni_sock *sock) s->uwq = nni_sock_sendq(sock); s->urq = nni_sock_recvq(sock); *sp = s; - nni_sock_recverr(sock, NNG_ESTATE); + return (0); } @@ -249,12 +249,7 @@ static int req_sock_setopt_raw(void *arg, const void *buf, size_t sz) { req_sock *s = arg; - int rv; - rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); - if (rv == 0) { - nni_sock_recverr(s->sock, s->raw ? 0 : NNG_ESTATE); - } - return (rv); + return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int @@ -505,35 +500,46 @@ req_resend(req_sock *s) } } -static nni_msg * -req_sock_sfilter(void *arg, nni_msg *msg) +static void +req_sock_send(void *arg, nni_aio *aio) { req_sock *s = arg; uint32_t id; + size_t len; + nni_msg * msg; + int rv; + nni_mtx_lock(&s->mtx); if (s->raw) { - // No automatic retry, and the request ID must - // be in the header coming down. - return (msg); + nni_mtx_unlock(&s->mtx); + nni_sock_send_pending(s->sock); + nni_msgq_aio_put(s->uwq, aio); + return; } + msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg); + + // In cooked mode, because we need to manage our own resend logic, + // we bypass the upper writeq entirely. + // Generate a new request ID. We always set the high // order bit so that the peer can locate the end of the // backtrace. (Pipe IDs have the high order bit clear.) id = (s->nextid++) | 0x80000000u; - // Request ID is in big endian format. NNI_PUT32(s->reqid, id); - if (nni_msg_header_append(msg, s->reqid, 4) != 0) { - // Should be ENOMEM. - nni_msg_free(msg); - return (NULL); + if ((rv = nni_msg_header_append(msg, s->reqid, 4)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; } - // NB: The socket lock is also held, so this is always self-serialized. - // But we have to serialize against other async callbacks. - nni_mtx_lock(&s->mtx); + // XXX: I think we should just not do this... and leave the + // socket "permanently writeable". This does screw up all the + // backpressure. + // nni_sock_send_pending(s->sock); // If another message is there, this cancels it. if (s->reqmsg != NULL) { @@ -541,6 +547,8 @@ req_sock_sfilter(void *arg, nni_msg *msg) s->reqmsg = NULL; } + nni_aio_set_msg(aio, NULL); + // Make a duplicate message... for retries. s->reqmsg = msg; // Schedule for immediate send @@ -548,40 +556,41 @@ req_sock_sfilter(void *arg, nni_msg *msg) s->wantw = 1; req_resend(s); - nni_mtx_unlock(&s->mtx); - // Clear the error condition. - nni_sock_recverr(s->sock, 0); + nni_mtx_unlock(&s->mtx); - return (NULL); + nni_aio_finish(aio, 0, len); } static nni_msg * -req_sock_rfilter(void *arg, nni_msg *msg) +req_sock_filter(void *arg, nni_msg *msg) { req_sock *s = arg; nni_msg * rmsg; + nni_mtx_lock(&s->mtx); if (s->raw) { // Pass it unmolested + nni_mtx_unlock(&s->mtx); return (msg); } if (nni_msg_header_len(msg) < 4) { + nni_mtx_unlock(&s->mtx); nni_msg_free(msg); return (NULL); } - nni_mtx_lock(&s->mtx); - if ((rmsg = s->reqmsg) == NULL) { - // We had no outstanding request. + // We had no outstanding request. (Perhaps canceled, + // or duplicate response.) nni_mtx_unlock(&s->mtx); nni_msg_free(msg); return (NULL); } + if (memcmp(nni_msg_header(msg), s->reqid, 4) != 0) { - // Wrong request id + // Wrong request id. nni_mtx_unlock(&s->mtx); nni_msg_free(msg); return (NULL); @@ -591,12 +600,29 @@ req_sock_rfilter(void *arg, nni_msg *msg) s->pendpipe = NULL; nni_mtx_unlock(&s->mtx); - nni_sock_recverr(s->sock, NNG_ESTATE); nni_msg_free(rmsg); return (msg); } +static void +req_sock_recv(void *arg, nni_aio *aio) +{ + req_sock *s = arg; + + nni_mtx_lock(&s->mtx); + if (!s->raw) { + if (s->reqmsg == NULL) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + } + nni_mtx_unlock(&s->mtx); + nni_sock_recv_pending(s->sock); + nni_msgq_aio_get(s->urq, aio); +} + static nni_proto_pipe_ops req_pipe_ops = { .pipe_init = req_pipe_init, .pipe_fini = req_pipe_fini, @@ -630,8 +656,9 @@ static nni_proto_sock_ops req_sock_ops = { .sock_open = req_sock_open, .sock_close = req_sock_close, .sock_options = req_sock_options, - .sock_rfilter = req_sock_rfilter, - .sock_sfilter = req_sock_sfilter, + .sock_filter = req_sock_filter, + .sock_send = req_sock_send, + .sock_recv = req_sock_recv, }; static nni_proto req_proto = { |
