From fdefff742662ed4eb476bf19b9dda245f86bc406 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Fri, 20 Apr 2018 20:52:32 -0700 Subject: fixes #342 Want Surveyor/Respondent context support fixes #360 core should nng_aio_begin before nng_aio_finish_error fixes #361 nng_send_aio should check for NULL message fixes #362 nni_msgq does not signal pollable on certain events This adds support for contexts for both sides of the surveyor pattern. Prior to this commit, the raw mode was completely broken, and there were numerous other bugs found and fixed. This integration includes *much* deeper validation of this pattern. Some changes to the core and other patterns have been made, where it was obvioius that we could make such improvements. (The obviousness stemming from the fact that RESPONDENT in particular is very closely derived from REP.) --- src/core/msgqueue.c | 25 ++ src/core/msgqueue.h | 4 + src/nng.c | 28 +- src/protocol/reqrep0/rep.c | 73 ++-- src/protocol/reqrep0/xrep.c | 25 +- src/protocol/reqrep0/xreq.c | 20 +- src/protocol/survey0/CMakeLists.txt | 12 +- src/protocol/survey0/respond.c | 644 +++++++++++++++++++++++------------- src/protocol/survey0/survey.c | 546 ++++++++++++++++++------------ src/protocol/survey0/xrespond.c | 408 +++++++++++++++++++++++ src/protocol/survey0/xsurvey.c | 380 +++++++++++++++++++++ 11 files changed, 1631 insertions(+), 534 deletions(-) create mode 100644 src/protocol/survey0/xrespond.c create mode 100644 src/protocol/survey0/xsurvey.c (limited to 'src') diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 1bb5a762..7c33b256 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -40,6 +40,8 @@ struct nni_msgq { void * mq_filter_arg; }; +static void nni_msgq_run_notify(nni_msgq *); + int nni_msgq_init(nni_msgq **mqp, unsigned cap) { @@ -128,6 +130,7 @@ nni_msgq_set_get_error(nni_msgq *mq, int error) } } mq->mq_geterr = error; + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); } @@ -149,6 +152,7 @@ nni_msgq_set_put_error(nni_msgq *mq, int error) } } mq->mq_puterr = error; + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); } @@ -172,6 +176,24 @@ nni_msgq_set_error(nni_msgq *mq, int error) } mq->mq_puterr = error; mq->mq_geterr = error; + nni_msgq_run_notify(mq); + nni_mtx_unlock(&mq->mq_lock); +} + +void +nni_msgq_flush(nni_msgq *mq) +{ + nni_mtx_lock(&mq->mq_lock); + while (mq->mq_len > 0) { + nni_msg *msg = mq->mq_msgs[mq->mq_get]; + mq->mq_get++; + if (mq->mq_get >= mq->mq_alloc) { + mq->mq_get = 0; + } + mq->mq_len--; + nni_msg_free(msg); + } + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); } @@ -331,6 +353,7 @@ nni_msgq_cancel(nni_aio *aio, int rv) nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); } @@ -413,6 +436,7 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) nni_list_remove(&mq->mq_aio_getq, raio); nni_aio_finish_msg(raio, msg); + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); return (0); } @@ -424,6 +448,7 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) mq->mq_put = 0; } mq->mq_len++; + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); return (0); } diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 2f1a46eb..65215bd0 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -33,6 +33,10 @@ extern int nni_msgq_init(nni_msgq **, unsigned); // messages that may be in the queue. extern void nni_msgq_fini(nni_msgq *); +// nni_msgq_flush discards any messages that are sitting in the queue. +// It does not wake any writers that might be waiting. +extern void nni_msgq_flush(nni_msgq *); + extern void nni_msgq_aio_put(nni_msgq *, nni_aio *); extern void nni_msgq_aio_get(nni_msgq *, nni_aio *); diff --git a/src/nng.c b/src/nng.c index 9eb33f50..fb374dde 100644 --- a/src/nng.c +++ b/src/nng.c @@ -198,7 +198,9 @@ nng_recv_aio(nng_socket sid, nng_aio *aio) int rv; if ((rv = nni_sock_find(&sock, sid)) != 0) { - nni_aio_finish_error(aio, rv); + if (nni_aio_begin(aio) == 0) { + nni_aio_finish_error(aio, rv); + } return; } nni_sock_recv(sock, aio); @@ -211,8 +213,16 @@ nng_send_aio(nng_socket sid, nng_aio *aio) nni_sock *sock; int rv; + if (nni_aio_get_msg(aio) == NULL) { + if (nni_aio_begin(aio) == 0) { + nni_aio_finish_error(aio, NNG_EINVAL); + } + return; + } if ((rv = nni_sock_find(&sock, sid)) != 0) { - nni_aio_finish_error(aio, rv); + if (nni_aio_begin(aio) == 0) { + nni_aio_finish_error(aio, rv); + } return; } nni_sock_send(sock, aio); @@ -260,7 +270,9 @@ nng_ctx_recv(nng_ctx cid, nng_aio *aio) nni_ctx *ctx; if ((rv = nni_ctx_find(&ctx, cid, false)) != 0) { - nni_aio_finish_error(aio, rv); + if (nni_aio_begin(aio) == 0) { + nni_aio_finish_error(aio, rv); + } return; } nni_ctx_recv(ctx, aio); @@ -273,8 +285,16 @@ nng_ctx_send(nng_ctx cid, nng_aio *aio) int rv; nni_ctx *ctx; + if (nni_aio_get_msg(aio) == NULL) { + if (nni_aio_begin(aio) == 0) { + nni_aio_finish_error(aio, NNG_EINVAL); + } + return; + } if ((rv = nni_ctx_find(&ctx, cid, false)) != 0) { - nni_aio_finish_error(aio, rv); + if (nni_aio_begin(aio) == 0) { + nni_aio_finish_error(aio, rv); + } return; } nni_ctx_send(ctx, aio); diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index 4e20466b..385860cd 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -36,11 +36,9 @@ static void rep0_pipe_fini(void *); struct rep0_ctx { rep0_sock * sock; - bool closed; char * btrace; size_t btrace_len; size_t btrace_size; - int ttl; uint32_t pipe_id; rep0_pipe * spipe; // send pipe nni_aio * saio; // send aio @@ -56,7 +54,6 @@ struct rep0_sock { nni_idhash * pipes; nni_list recvpipes; // list of pipes with data to receive nni_list recvq; - bool closed; rep0_ctx * ctx; nni_pollable *recvable; nni_pollable *sendable; @@ -82,15 +79,11 @@ rep0_ctx_close(void *arg) nni_aio * aio; nni_mtx_lock(&s->lk); - ctx->closed = true; if ((aio = ctx->saio) != NULL) { - nni_msg * msg; rep0_pipe *pipe = ctx->spipe; ctx->saio = NULL; ctx->spipe = NULL; nni_list_remove(&pipe->sendq, ctx); - msg = nni_aio_get_msg(aio); - nni_msg_free(msg); nni_aio_finish_error(aio, NNG_ECLOSED); } if ((aio = ctx->raio) != NULL) { @@ -191,53 +184,48 @@ rep0_ctx_send(void *arg, nni_aio *aio) nni_pollable_clear(s->sendable); } - if (ctx->closed) { + if (len == 0) { nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, NNG_ECLOSED); + nni_aio_finish_error(aio, NNG_ESTATE); return; } - if (len == 0) { + if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) { nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, NNG_ESTATE); + nni_aio_finish_error(aio, rv); return; } - if ((rv = nni_idhash_find(s->pipes, p_id, (void **) &p)) != 0) { + if (nni_idhash_find(s->pipes, p_id, (void **) &p) != 0) { // Pipe is gone. Make this look like a good send to avoid // disrupting the state machine. We don't care if the peer // lost interest in our reply. - nni_aio_set_msg(aio, NULL); nni_mtx_unlock(&s->lk); + nni_aio_set_msg(aio, NULL); nni_aio_finish(aio, 0, nni_msg_len(msg)); nni_msg_free(msg); return; } - if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) { + if (!p->busy) { + p->busy = true; + len = nni_msg_len(msg); + nni_aio_set_msg(p->aio_send, msg); + nni_pipe_send(p->pipe, p->aio_send); nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, rv); + + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, len); return; } - if (p->busy) { - rv = nni_aio_schedule_verify(aio, rep0_ctx_cancel_send, ctx); - if (rv != 0) { - nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, rv); - return; - } - ctx->saio = aio; - ctx->spipe = p; - nni_list_append(&p->sendq, ctx); + + rv = nni_aio_schedule_verify(aio, rep0_ctx_cancel_send, ctx); + if (rv != 0) { nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, rv); return; } - - p->busy = true; - len = nni_msg_len(msg); - nni_aio_set_msg(aio, NULL); - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + ctx->saio = aio; + ctx->spipe = p; + nni_list_append(&p->sendq, ctx); nni_mtx_unlock(&s->lk); - - nni_aio_finish(aio, 0, len); } static void @@ -376,6 +364,7 @@ rep0_pipe_stop(void *arg) aio = ctx->saio; ctx->saio = NULL; msg = nni_aio_get_msg(aio); + nni_aio_set_msg(aio, NULL); nni_aio_finish(aio, 0, nni_msg_len(msg)); nni_msg_free(msg); } @@ -384,12 +373,11 @@ rep0_pipe_stop(void *arg) // accept a message and discard it.) nni_pollable_raise(s->sendable); } + nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); nni_mtx_unlock(&s->lk); nni_aio_stop(p->aio_send); nni_aio_stop(p->aio_recv); - - nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); } static void @@ -465,11 +453,6 @@ rep0_ctx_recv(void *arg, nni_aio *aio) return; } nni_mtx_lock(&s->lk); - if (ctx->closed) { - nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, NNG_ECLOSED); - return; - } if ((p = nni_list_first(&s->recvpipes)) == NULL) { int rv; rv = nni_aio_schedule_verify(aio, rep0_cancel_recv, ctx); @@ -509,7 +492,6 @@ rep0_pipe_recv_cb(void *arg) rep0_sock *s = p->rep; rep0_ctx * ctx; nni_msg * msg; - int rv; uint8_t * body; nni_aio * aio; size_t len; @@ -527,7 +509,7 @@ rep0_pipe_recv_cb(void *arg) // Move backtrace from body to header hops = 1; for (;;) { - int end = 0; + bool end = false; if (hops > s->ttl) { // This isn't malformed, but it has gone through @@ -544,9 +526,8 @@ rep0_pipe_recv_cb(void *arg) return; } body = nni_msg_body(msg); - end = (body[0] & 0x80) ? 1 : 0; - rv = nni_msg_header_append(msg, body, 4); - if (rv != 0) { + end = ((body[0] & 0x80) != 0); + if (nni_msg_header_append(msg, body, 4) != 0) { // Out of memory, so drop it. goto drop; } @@ -571,7 +552,6 @@ rep0_pipe_recv_cb(void *arg) nni_list_remove(&s->recvq, ctx); aio = ctx->raio; ctx->raio = NULL; - nni_aio_set_msg(aio, msg); nni_aio_set_msg(p->aio_recv, NULL); // schedule another receive @@ -591,6 +571,7 @@ rep0_pipe_recv_cb(void *arg) nni_mtx_unlock(&s->lk); + nni_aio_set_msg(aio, msg); nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); return; diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c index f7189453..4773677e 100644 --- a/src/protocol/reqrep0/xrep.c +++ b/src/protocol/reqrep0/xrep.c @@ -189,7 +189,9 @@ xrep0_pipe_stop(void *arg) nni_aio_stop(p->aio_recv); nni_aio_stop(p->aio_putq); + nni_mtx_lock(&s->lk); nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); + nni_mtx_unlock(&s->lk); } static void @@ -200,7 +202,6 @@ xrep0_sock_getq_cb(void *arg) nni_msg * msg; uint32_t id; xrep0_pipe *p; - int rv; // This watches for messages from the upper write queue, // extracts the destination pipe, and forwards it to the appropriate @@ -229,12 +230,12 @@ xrep0_sock_getq_cb(void *arg) // Look for the pipe, and attempt to put the message there // (nonblocking) if we can. If we can't for any reason, then we // free the message. - if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) { - rv = nni_msgq_tryput(p->sendq, msg); - } - if (rv != 0) { + nni_mtx_lock(&s->lk); + if (((nni_idhash_find(s->pipes, id, (void **) &p)) != 0) || + (nni_msgq_tryput(p->sendq, msg) != 0)) { nni_msg_free(msg); } + nni_mtx_unlock(&s->lk); // Now look for another message on the upper write queue. nni_msgq_aio_get(uwq, s->aio_getq); @@ -277,8 +278,6 @@ xrep0_pipe_recv_cb(void *arg) xrep0_pipe *p = arg; xrep0_sock *s = p->rep; nni_msg * msg; - int rv; - uint8_t * body; int hops; if (nni_aio_result(p->aio_recv) != 0) { @@ -292,8 +291,7 @@ xrep0_pipe_recv_cb(void *arg) nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); // Store the pipe id in the header, first thing. - rv = nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)); - if (rv != 0) { + if (nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)) != 0) { // Failure here causes us to drop the message. goto drop; } @@ -301,7 +299,8 @@ xrep0_pipe_recv_cb(void *arg) // Move backtrace from body to header hops = 1; for (;;) { - int end = 0; + bool end = 0; + uint8_t *body; if (hops > s->ttl) { // This isn't malformed, but it has gone through // too many hops. Do not disconnect, because we @@ -317,9 +316,8 @@ xrep0_pipe_recv_cb(void *arg) return; } body = nni_msg_body(msg); - end = (body[0] & 0x80) ? 1 : 0; - rv = nni_msg_header_append(msg, body, 4); - if (rv != 0) { + end = ((body[0] & 0x80) != 0); + if (nni_msg_header_append(msg, body, 4) != 0) { // Out of memory most likely, but keep going to // avoid breaking things. goto drop; @@ -413,7 +411,6 @@ static nni_proto_sock_ops xrep0_sock_ops = { .sock_open = xrep0_sock_open, .sock_close = xrep0_sock_close, .sock_options = xrep0_sock_options, - .sock_filter = NULL, // No filtering for raw mode .sock_send = xrep0_sock_send, .sock_recv = xrep0_sock_recv, }; diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c index 5c1841b2..13ae7418 100644 --- a/src/protocol/reqrep0/xreq.c +++ b/src/protocol/reqrep0/xreq.c @@ -226,25 +226,21 @@ xreq0_recv_cb(void *arg) // We yank 4 bytes from front of body, and move them to the header. if (nni_msg_len(msg) < 4) { - // Malformed message. - goto malformed; + // Peer gave us garbage, so kick it. + nni_msg_free(msg); + nni_pipe_stop(p->pipe); + return; } id = nni_msg_trim_u32(msg); if (nni_msg_header_append_u32(msg, id) != 0) { - // Arguably we could just discard and carry on. But - // dropping the connection is probably more helpful since - // it lets the other side see that a problem occurred. - // Plus it gives us a chance to reclaim some memory. - goto malformed; + // Probably ENOMEM, discard and carry on. + nni_msg_free(msg); + nni_pipe_recv(p->pipe, p->aio_recv); + return; } nni_aio_set_msg(p->aio_putq, msg); nni_msgq_aio_put(sock->urq, p->aio_putq); - return; - -malformed: - nni_msg_free(msg); - nni_pipe_stop(p->pipe); } static void diff --git a/src/protocol/survey0/CMakeLists.txt b/src/protocol/survey0/CMakeLists.txt index 479c031c..0a82463c 100644 --- a/src/protocol/survey0/CMakeLists.txt +++ b/src/protocol/survey0/CMakeLists.txt @@ -1,6 +1,6 @@ # -# Copyright 2017 Garrett D'Amore -# Copyright 2017 Capitar IT Group BV +# Copyright 2018 Staysail Systems, Inc. +# Copyright 2018 Capitar IT Group BV # # This software is supplied under the terms of the MIT License, a # copy of which should be located in the distribution where this @@ -8,15 +8,17 @@ # found online at https://opensource.org/licenses/MIT. # -# Req/Rep protocol +# Surveyor/Respondent protocol if (NNG_PROTO_SURVEYOR0) - set(SURV0_SOURCES protocol/survey0/survey.c protocol/survey0/survey.h) + set(SURV0_SOURCES protocol/survey0/survey.c protocol/survey0/xsurvey.c + protocol/survey0/survey.h) set(SURV0_HEADERS protocol/survey0/survey.h) endif() if (NNG_PROTO_RESPONDENT0) - set(RESP0_SOURCES protocol/survey0/respond.c protocol/survey0/respond.h) + set(RESP0_SOURCES protocol/survey0/respond.c protocol/survey0/xrespond.c + protocol/survey0/respond.h) set(RESP0_HEADERS protocol/survey0/respond.h) endif() diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index 1605d9e6..60cf188a 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -28,49 +28,212 @@ typedef struct resp0_pipe resp0_pipe; typedef struct resp0_sock resp0_sock; +typedef struct resp0_ctx resp0_ctx; -static void resp0_recv_cb(void *); -static void resp0_putq_cb(void *); -static void resp0_getq_cb(void *); -static void resp0_send_cb(void *); -static void resp0_sock_getq_cb(void *); +static void resp0_pipe_send_cb(void *); +static void resp0_pipe_recv_cb(void *); static void resp0_pipe_fini(void *); +struct resp0_ctx { + resp0_sock * sock; + char * btrace; + size_t btrace_len; + size_t btrace_size; + uint32_t pipe_id; + resp0_pipe * spipe; // send pipe + nni_aio * saio; // send aio + nni_aio * raio; // recv aio + nni_list_node sqnode; + nni_list_node rqnode; +}; + // resp0_sock is our per-socket protocol private structure. struct resp0_sock { - nni_msgq * urq; - nni_msgq * uwq; - int ttl; - nni_idhash *pipes; - char * btrace; - size_t btrace_len; - nni_aio * aio_getq; - nni_mtx mtx; + nni_mtx mtx; + int ttl; + nni_idhash * pipes; + resp0_ctx * ctx; + nni_list recvpipes; + nni_list recvq; + nni_pollable *recvable; + nni_pollable *sendable; }; // resp0_pipe is our per-pipe protocol private structure. struct resp0_pipe { - nni_pipe * npipe; - resp0_sock *psock; - uint32_t id; - nni_msgq * sendq; - nni_aio * aio_getq; - nni_aio * aio_putq; - nni_aio * aio_send; - nni_aio * aio_recv; + nni_pipe * npipe; + resp0_sock * psock; + bool busy; + uint32_t id; + nni_list sendq; // contexts waiting to send + nni_aio * aio_send; + nni_aio * aio_recv; + nni_list_node rnode; // receivable linkage }; +static void +resp0_ctx_close(void *arg) +{ + resp0_ctx * ctx = arg; + resp0_sock *s = ctx->sock; + nni_aio * aio; + + // complete any outstanding operations here, cancellation, etc. + + nni_mtx_lock(&s->mtx); + if ((aio = ctx->saio) != NULL) { + resp0_pipe *p = ctx->spipe; + ctx->saio = NULL; + ctx->spipe = NULL; + nni_list_remove(&p->sendq, ctx); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + if ((aio = ctx->raio) != NULL) { + ctx->raio = NULL; + nni_list_remove(&s->recvq, ctx); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&s->mtx); +} + +static void +resp0_ctx_fini(void *arg) +{ + resp0_ctx *ctx = arg; + + resp0_ctx_close(ctx); + nni_free(ctx->btrace, ctx->btrace_size); + NNI_FREE_STRUCT(ctx); +} + +static int +resp0_ctx_init(void **ctxp, void *sarg) +{ + resp0_sock *s = sarg; + resp0_ctx * ctx; + + if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) { + return (NNG_ENOMEM); + } + + // this is 1kB, which covers the worst case. + ctx->btrace_size = 256 * sizeof(uint32_t); + if ((ctx->btrace = nni_alloc(ctx->btrace_size)) == NULL) { + NNI_FREE_STRUCT(ctx); + return (NNG_ENOMEM); + } + NNI_LIST_NODE_INIT(&ctx->sqnode); + // XXX: NNI_LIST_NODE_INIT(&ctx->rqnode); + ctx->btrace_len = 0; + ctx->sock = s; + ctx->pipe_id = 0; + *ctxp = ctx; + + return (0); +} + +static void +resp0_ctx_cancel_send(nni_aio *aio, int rv) +{ + resp0_ctx * ctx = nni_aio_get_prov_data(aio); + resp0_sock *s = ctx->sock; + + nni_mtx_lock(&s->mtx); + if (ctx->saio != aio) { + nni_mtx_unlock(&s->mtx); + return; + } + nni_list_node_remove(&ctx->sqnode); + ctx->saio = NULL; + nni_mtx_unlock(&s->mtx); + nni_msg_header_clear(nni_aio_get_msg(aio)); // reset the headers + nni_aio_finish_error(aio, rv); +} + +static void +resp0_ctx_send(void *arg, nni_aio *aio) +{ + resp0_ctx * ctx = arg; + resp0_sock *s = ctx->sock; + resp0_pipe *p; + nni_msg * msg; + size_t len; + uint32_t pid; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + msg = nni_aio_get_msg(aio); + nni_msg_header_clear(msg); + + if (ctx == s->ctx) { + // We can't send anymore, because only one send per request. + nni_pollable_clear(s->sendable); + } + + nni_mtx_lock(&s->mtx); + + if ((len = ctx->btrace_len) == 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + pid = ctx->pipe_id; + ctx->pipe_id = 0; + ctx->btrace_len = 0; + + if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + if (nni_idhash_find(s->pipes, pid, (void **) &p) != 0) { + // Surveyor has left the building. Just discard the reply. + nni_mtx_unlock(&s->mtx); + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, nni_msg_len(msg)); + nni_msg_free(msg); + return; + } + + if (!p->busy) { + p->busy = true; + len = nni_msg_len(msg); + nni_aio_set_msg(p->aio_send, msg); + nni_pipe_send(p->npipe, p->aio_send); + nni_mtx_unlock(&s->mtx); + + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, len); + return; + } + + if ((rv = nni_aio_schedule_verify(aio, resp0_ctx_cancel_send, ctx)) != + 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + ctx->saio = aio; + ctx->spipe = p; + nni_list_append(&p->sendq, ctx); + nni_mtx_unlock(&s->mtx); +} + static void resp0_sock_fini(void *arg) { resp0_sock *s = arg; - nni_aio_stop(s->aio_getq); - nni_aio_fini(s->aio_getq); nni_idhash_fini(s->pipes); - if (s->btrace != NULL) { - nni_free(s->btrace, s->btrace_len); + if (s->ctx != NULL) { + resp0_ctx_fini(s->ctx); } + nni_pollable_free(s->sendable); + nni_pollable_free(s->recvable); nni_mtx_fini(&s->mtx); NNI_FREE_STRUCT(s); } @@ -81,22 +244,34 @@ resp0_sock_init(void **sp, nni_sock *nsock) resp0_sock *s; int rv; + NNI_ARG_UNUSED(nsock); + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->mtx); - if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, resp0_sock_getq_cb, s)) != 0)) { + if ((rv = nni_idhash_init(&s->pipes)) != 0) { resp0_sock_fini(s); return (rv); } - s->ttl = 8; // Per RFC - s->btrace = NULL; - s->btrace_len = 0; - s->urq = nni_sock_recvq(nsock); - s->uwq = nni_sock_sendq(nsock); + NNI_LIST_INIT(&s->recvq, resp0_ctx, rqnode); + NNI_LIST_INIT(&s->recvpipes, resp0_pipe, rnode); + + s->ttl = 8; // Per RFC + + if ((rv = resp0_ctx_init((void **) &s->ctx, s)) != 0) { + resp0_ctx_fini(s); + return (rv); + } + // We start off without being either readable or pollable. + // Readability comes when there is something on the socket. + if (((rv = nni_pollable_alloc(&s->sendable)) != 0) || + ((rv = nni_pollable_alloc(&s->recvable)) != 0)) { + resp0_sock_fini(s); + return (rv); + } *sp = s; return (0); } @@ -104,9 +279,7 @@ resp0_sock_init(void **sp, nni_sock *nsock) static void resp0_sock_open(void *arg) { - resp0_sock *s = arg; - - nni_msgq_aio_get(s->uwq, s->aio_getq); + NNI_ARG_UNUSED(arg); } static void @@ -114,7 +287,7 @@ resp0_sock_close(void *arg) { resp0_sock *s = arg; - nni_aio_abort(s->aio_getq, NNG_ECLOSED); + resp0_ctx_close(s->ctx); } static void @@ -122,11 +295,8 @@ resp0_pipe_fini(void *arg) { resp0_pipe *p = arg; - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); nni_aio_fini(p->aio_recv); - nni_msgq_fini(p->sendq); NNI_FREE_STRUCT(p); } @@ -139,18 +309,20 @@ resp0_pipe_init(void **pp, nni_pipe *npipe, void *s) if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, resp0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, resp0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, resp0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, resp0_send_cb, p)) != 0)) { + if (((rv = nni_aio_init(&p->aio_recv, resp0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, resp0_pipe_send_cb, p)) != 0)) { resp0_pipe_fini(p); return (rv); } + NNI_LIST_INIT(&p->sendq, resp0_ctx, sqnode); + p->npipe = npipe; p->psock = s; - *pp = p; + p->busy = false; + p->id = nni_pipe_id(npipe); + + *pp = p; return (0); } @@ -161,8 +333,6 @@ resp0_pipe_start(void *arg) resp0_sock *s = p->psock; int rv; - p->id = nni_pipe_id(p->npipe); - nni_mtx_lock(&s->mtx); rv = nni_idhash_insert(s->pipes, p->id, p); nni_mtx_unlock(&s->mtx); @@ -171,8 +341,6 @@ resp0_pipe_start(void *arg) } nni_pipe_recv(p->npipe, p->aio_recv); - nni_msgq_aio_get(p->sendq, p->aio_getq); - return (rv); } @@ -181,139 +349,179 @@ resp0_pipe_stop(void *arg) { resp0_pipe *p = arg; resp0_sock *s = p->psock; + resp0_ctx * ctx; + + nni_mtx_lock(&s->mtx); + while ((ctx = nni_list_first(&p->sendq)) != NULL) { + nni_aio *aio; + nni_msg *msg; + nni_list_remove(&p->sendq, ctx); + aio = ctx->saio; + ctx->saio = NULL; + msg = nni_aio_get_msg(aio); + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, nni_msg_len(msg)); + nni_msg_free(msg); + } + if (p->id == s->ctx->pipe_id) { + // Make sure user space knows they can send a message to us, + // which we will happily discard. + nni_pollable_raise(s->sendable); + } + nni_idhash_remove(s->pipes, p->id); + nni_mtx_unlock(&s->mtx); - nni_msgq_close(p->sendq); - nni_aio_stop(p->aio_putq); - nni_aio_stop(p->aio_getq); nni_aio_stop(p->aio_send); nni_aio_stop(p->aio_recv); - - if (p->id != 0) { - nni_mtx_lock(&s->mtx); - nni_idhash_remove(s->pipes, p->id); - nni_mtx_unlock(&s->mtx); - p->id = 0; - } } -// resp0_sock_send watches for messages from the upper write queue, -// extracts the destination pipe, and forwards it to the appropriate -// destination pipe via a separate queue. This prevents a single bad -// or slow pipe from gumming up the works for the entire socket.s - -void -resp0_sock_getq_cb(void *arg) +static void +resp0_pipe_send_cb(void *arg) { - resp0_sock *s = arg; + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + resp0_ctx * ctx; + nni_aio * aio; nni_msg * msg; - uint32_t id; - resp0_pipe *p; - int rv; + size_t len; - if (nni_aio_result(s->aio_getq) != 0) { + nni_mtx_lock(&s->mtx); + p->busy = false; + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->npipe); + nni_mtx_unlock(&s->mtx); return; } - msg = nni_aio_get_msg(s->aio_getq); - nni_aio_set_msg(s->aio_getq, NULL); - - // We yank the outgoing pipe id from the header - if (nni_msg_header_len(msg) < 4) { - nni_msg_free(msg); - // We can't really close down the socket, so just keep going. - nni_msgq_aio_get(s->uwq, s->aio_getq); + if ((ctx = nni_list_first(&p->sendq)) == NULL) { + // Nothing else to send. + if (p->id == s->ctx->pipe_id) { + // Mark us ready for the other side to send! + nni_pollable_raise(s->sendable); + } + nni_mtx_unlock(&s->mtx); return; } - id = nni_msg_header_trim_u32(msg); - nni_mtx_lock(&s->mtx); - if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) != 0) { - // Destination pipe not present. - nni_msg_free(msg); - } else { - // Non-blocking put. - if (nni_msgq_tryput(p->sendq, msg) != 0) { - nni_msg_free(msg); - } - } - nni_msgq_aio_get(s->uwq, s->aio_getq); + nni_list_remove(&p->sendq, ctx); + aio = ctx->saio; + ctx->saio = NULL; + ctx->spipe = NULL; + p->busy = true; + msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg); + nni_aio_set_msg(aio, NULL); + nni_aio_set_msg(p->aio_send, msg); + nni_pipe_send(p->npipe, p->aio_send); + nni_mtx_unlock(&s->mtx); + + nni_aio_finish_synch(aio, 0, len); } -void -resp0_getq_cb(void *arg) +static void +resp0_cancel_recv(nni_aio *aio, int rv) { - resp0_pipe *p = arg; + resp0_ctx * ctx = nni_aio_get_prov_data(aio); + resp0_sock *s = ctx->sock; - if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); - return; + nni_mtx_lock(&s->mtx); + if (ctx->raio == aio) { + nni_list_remove(&s->recvq, ctx); + ctx->raio = NULL; + nni_aio_finish_error(aio, rv); } - - nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); - nni_aio_set_msg(p->aio_getq, NULL); - - nni_pipe_send(p->npipe, p->aio_send); + nni_mtx_unlock(&s->mtx); } -void -resp0_send_cb(void *arg) +static void +resp0_ctx_recv(void *arg, nni_aio *aio) { - resp0_pipe *p = arg; + resp0_ctx * ctx = arg; + resp0_sock *s = ctx->sock; + resp0_pipe *p; + size_t len; + nni_msg * msg; - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&s->mtx); + if ((p = nni_list_first(&s->recvpipes)) == NULL) { + int rv; + rv = nni_aio_schedule_verify(aio, resp0_cancel_recv, ctx); + if (rv != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + ctx->raio = aio; + nni_list_append(&s->recvq, ctx); + nni_mtx_unlock(&s->mtx); return; } + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); + nni_list_remove(&s->recvpipes, p); + if (nni_list_empty(&s->recvpipes)) { + nni_pollable_clear(s->recvable); + } + nni_pipe_recv(p->npipe, p->aio_recv); - nni_msgq_aio_get(p->sendq, p->aio_getq); + len = nni_msg_header_len(msg); + memcpy(ctx->btrace, nni_msg_header(msg), len); + ctx->btrace_len = len; + ctx->pipe_id = p->id; + if (ctx == s->ctx) { + nni_pollable_raise(s->sendable); + } + nni_mtx_unlock(&s->mtx); + + nni_msg_header_clear(msg); + nni_aio_set_msg(aio, msg); + nni_aio_finish(aio, 0, nni_msg_len(msg)); } static void -resp0_recv_cb(void *arg) +resp0_pipe_recv_cb(void *arg) { - resp0_pipe *p = arg; - resp0_sock *s = p->psock; - nni_msgq * urq = s->urq; + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + resp0_ctx * ctx; nni_msg * msg; + nni_aio * aio; int hops; - int rv; + size_t len; if (nni_aio_result(p->aio_recv) != 0) { - goto error; + nni_pipe_stop(p->npipe); + return; } msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); nni_msg_set_pipe(msg, p->id); - // Store the pipe id in the header, first thing. - if (nni_msg_header_append_u32(msg, p->id) != 0) { - nni_msg_free(msg); - goto error; - } - // Move backtrace from body to header - hops = 0; + hops = 1; for (;;) { - int end = 0; + bool end = 0; uint8_t *body; - if (hops >= s->ttl) { - nni_msg_free(msg); - goto error; + if (hops > s->ttl) { + goto drop; } + hops++; if (nni_msg_len(msg) < 4) { + // Peer is speaking garbage, kick it. nni_msg_free(msg); - goto error; + nni_pipe_stop(p->npipe); + return; } body = nni_msg_body(msg); - end = (body[0] & 0x80) ? 1 : 0; - rv = nni_msg_header_append(msg, body, 4); - if (rv != 0) { - nni_msg_free(msg); - goto error; + end = ((body[0] & 0x80) != 0); + if (nni_msg_header_append(msg, body, 4) != 0) { + goto drop; } nni_msg_trim(msg, 4); if (end) { @@ -321,26 +529,41 @@ resp0_recv_cb(void *arg) } } - // Now send it up. - nni_aio_set_msg(p->aio_putq, msg); - nni_msgq_aio_put(urq, p->aio_putq); - return; + len = nni_msg_header_len(msg); -error: - nni_pipe_stop(p->npipe); -} + nni_mtx_lock(&s->mtx); + if ((ctx = nni_list_first(&s->recvq)) == NULL) { + // No one blocked in recv, stall. + nni_list_append(&s->recvpipes, p); + nni_pollable_raise(s->recvable); + nni_mtx_unlock(&s->mtx); + return; + } -static void -resp0_putq_cb(void *arg) -{ - resp0_pipe *p = arg; + nni_list_remove(&s->recvq, ctx); + aio = ctx->raio; + ctx->raio = NULL; + nni_aio_set_msg(p->aio_recv, NULL); - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); + // Start the next receive. + nni_pipe_recv(p->npipe, p->aio_recv); + + ctx->btrace_len = len; + memcpy(ctx->btrace, nni_msg_header(msg), len); + nni_msg_header_clear(msg); + ctx->pipe_id = p->id; + + if ((ctx == s->ctx) && (!p->busy)) { + nni_pollable_raise(s->sendable); } + nni_mtx_unlock(&s->mtx); + + nni_aio_set_msg(aio, msg); + nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); + return; +drop: + nni_msg_free(msg); nni_pipe_recv(p->npipe, p->aio_recv); } @@ -358,76 +581,38 @@ resp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ) return (nni_copyout_int(s->ttl, buf, szp, typ)); } -static void -resp0_sock_send_raw(void *arg, nni_aio *aio) +static int +resp0_sock_getopt_sendfd(void *arg, void *buf, size_t *szp, int typ) { resp0_sock *s = arg; + int rv; + int fd; - nni_msgq_aio_put(s->uwq, aio); + if ((rv = nni_pollable_getfd(s->sendable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, typ)); } -static void -resp0_sock_send(void *arg, nni_aio *aio) +static int +resp0_sock_getopt_recvfd(void *arg, void *buf, size_t *szp, int typ) { resp0_sock *s = arg; - nni_msg * msg; int rv; + int fd; - nni_mtx_lock(&s->mtx); - - msg = nni_aio_get_msg(aio); - - // If we have a stored backtrace, append it to the header... - // if we don't have a backtrace, discard the message. - if (s->btrace == NULL) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, NNG_ESTATE); - return; - } - - // drop anything else in the header... - nni_msg_header_clear(msg); - - if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, rv); - return; + if ((rv = nni_pollable_getfd(s->recvable, &fd)) != 0) { + return (rv); } - - nni_free(s->btrace, s->btrace_len); - s->btrace = NULL; - s->btrace_len = 0; - - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_put(s->uwq, aio); + return (nni_copyout_int(fd, buf, szp, typ)); } -static nni_msg * -resp0_sock_filter(void *arg, nni_msg *msg) +static void +resp0_sock_send(void *arg, nni_aio *aio) { resp0_sock *s = arg; - char * header; - size_t len; - - nni_mtx_lock(&s->mtx); - len = nni_msg_header_len(msg); - header = nni_msg_header(msg); - if (s->btrace != NULL) { - nni_free(s->btrace, s->btrace_len); - s->btrace = NULL; - s->btrace_len = 0; - } - if ((s->btrace = nni_alloc(len)) == NULL) { - nni_mtx_unlock(&s->mtx); - nni_msg_free(msg); - return (NULL); - } - s->btrace_len = len; - memcpy(s->btrace, header, len); - nni_msg_header_clear(msg); - nni_mtx_unlock(&s->mtx); - return (msg); + resp0_ctx_send(s->ctx, aio); } static void @@ -435,7 +620,7 @@ resp0_sock_recv(void *arg, nni_aio *aio) { resp0_sock *s = arg; - nni_msgq_aio_get(s->urq, aio); + resp0_ctx_recv(s->ctx, aio); } static nni_proto_pipe_ops resp0_pipe_ops = { @@ -445,6 +630,13 @@ static nni_proto_pipe_ops resp0_pipe_ops = { .pipe_stop = resp0_pipe_stop, }; +static nni_proto_ctx_ops resp0_ctx_ops = { + .ctx_init = resp0_ctx_init, + .ctx_fini = resp0_ctx_fini, + .ctx_send = resp0_ctx_send, + .ctx_recv = resp0_ctx_recv, +}; + static nni_proto_sock_option resp0_sock_options[] = { { .pso_name = NNG_OPT_MAXTTL, @@ -452,6 +644,18 @@ static nni_proto_sock_option resp0_sock_options[] = { .pso_getopt = resp0_sock_getopt_maxttl, .pso_setopt = resp0_sock_setopt_maxttl, }, + { + .pso_name = NNG_OPT_RECVFD, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = resp0_sock_getopt_recvfd, + .pso_setopt = NULL, + }, + { + .pso_name = NNG_OPT_SENDFD, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = resp0_sock_getopt_sendfd, + .pso_setopt = NULL, + }, // terminate list { .pso_name = NULL, @@ -463,39 +667,19 @@ static nni_proto_sock_ops resp0_sock_ops = { .sock_fini = resp0_sock_fini, .sock_open = resp0_sock_open, .sock_close = resp0_sock_close, - .sock_filter = resp0_sock_filter, .sock_send = resp0_sock_send, .sock_recv = resp0_sock_recv, .sock_options = resp0_sock_options, }; -static nni_proto_sock_ops resp0_sock_ops_raw = { - .sock_init = resp0_sock_init, - .sock_fini = resp0_sock_fini, - .sock_open = resp0_sock_open, - .sock_close = resp0_sock_close, - .sock_filter = NULL, // no filter for raw - .sock_send = resp0_sock_send_raw, - .sock_recv = resp0_sock_recv, - .sock_options = resp0_sock_options, -}; - static nni_proto resp0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ, .proto_sock_ops = &resp0_sock_ops, .proto_pipe_ops = &resp0_pipe_ops, -}; - -static nni_proto resp0_proto_raw = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, - .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, - .proto_sock_ops = &resp0_sock_ops_raw, - .proto_pipe_ops = &resp0_pipe_ops, + .proto_ctx_ops = &resp0_ctx_ops, }; int @@ -503,9 +687,3 @@ nng_respondent0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &resp0_proto)); } - -int -nng_respondent0_open_raw(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &resp0_proto_raw)); -} diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index b7158464..e725d2b3 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -16,6 +16,9 @@ // Surveyor protocol. The SURVEYOR protocol is the "survey" side of the // survey pattern. This is useful for building service discovery, voting, etc. +// Note that this pattern is not optimized for extreme low latency, as it makes +// multiple use of queues for simplicity. Typically this is used in cases +// where a few dozen extra microseconds does not matter. #ifndef NNI_PROTO_SURVEYOR_V0 #define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) @@ -27,86 +30,249 @@ typedef struct surv0_pipe surv0_pipe; typedef struct surv0_sock surv0_sock; +typedef struct surv0_ctx surv0_ctx; -static void surv0_sock_getq_cb(void *); -static void surv0_getq_cb(void *); -static void surv0_putq_cb(void *); -static void surv0_send_cb(void *); -static void surv0_recv_cb(void *); -static void surv0_timeout(void *); +static void surv0_pipe_getq_cb(void *); +static void surv0_pipe_send_cb(void *); +static void surv0_pipe_recv_cb(void *); +static void surv0_ctx_timeout(void *); + +struct surv0_ctx { + surv0_sock * sock; + uint64_t survid; // survey id + nni_timer_node timer; + nni_time expire; + nni_duration survtime; + nni_msgq * rq; // recv message queue +}; // surv0_sock is our per-socket protocol private structure. struct surv0_sock { - nni_duration survtime; - nni_time expire; - int ttl; - uint32_t nextid; // next id - uint32_t survid; // outstanding request ID (big endian) - nni_list pipes; - nni_aio * aio_getq; - nni_timer_node timer; - nni_msgq * uwq; - nni_msgq * urq; - nni_mtx mtx; + int ttl; + nni_list pipes; + nni_mtx mtx; + surv0_ctx * ctx; + nni_idhash * surveys; + nni_pollable *sendable; }; // surv0_pipe is our per-pipe protocol private structure. struct surv0_pipe { nni_pipe * npipe; - surv0_sock * psock; + surv0_sock * sock; nni_msgq * sendq; nni_list_node node; nni_aio * aio_getq; - nni_aio * aio_putq; nni_aio * aio_send; nni_aio * aio_recv; }; +static void +surv0_ctx_fini(void *arg) +{ + surv0_ctx *ctx = arg; + + if (ctx->rq != NULL) { + nni_msgq_close(ctx->rq); + nni_msgq_fini(ctx->rq); + } + nni_timer_cancel(&ctx->timer); + NNI_FREE_STRUCT(ctx); +} + +static int +surv0_ctx_init(void **ctxp, void *sarg) +{ + surv0_ctx * ctx; + surv0_sock *sock = sarg; + int rv; + + if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_lock(&sock->mtx); + if (sock->ctx != NULL) { + ctx->survtime = sock->ctx->survtime; + } + nni_mtx_unlock(&sock->mtx); + ctx->sock = sock; + // 126 is a deep enough queue, and leaves 2 extra cells for the + // pushback bit in msgqs. This can result in up to 1kB of allocation + // for the message queue. + if ((rv = nni_msgq_init(&ctx->rq, 126)) != 0) { + surv0_ctx_fini(ctx); + return (rv); + } + + nni_timer_init(&ctx->timer, surv0_ctx_timeout, ctx); + *ctxp = ctx; + return (0); +} + +static void +surv0_ctx_recv(void *arg, nni_aio *aio) +{ + surv0_ctx * ctx = arg; + surv0_sock *sock = ctx->sock; + + nni_mtx_lock(&sock->mtx); + if (ctx->survid == 0) { + nni_mtx_unlock(&sock->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + nni_msgq_aio_get(ctx->rq, aio); + nni_mtx_unlock(&sock->mtx); +} + +void +surv0_ctx_timeout(void *arg) +{ + surv0_ctx * ctx = arg; + surv0_sock *sock = ctx->sock; + + nni_mtx_lock(&sock->mtx); + if (nni_clock() < ctx->expire) { + nni_mtx_unlock(&sock->mtx); + return; + } + // Abort any pending receives. + nni_msgq_set_get_error(ctx->rq, NNG_ETIMEDOUT); + if (ctx->survid != 0) { + nni_idhash_remove(sock->surveys, ctx->survid); + ctx->survid = 0; + } + nni_mtx_unlock(&sock->mtx); +} + +static void +surv0_ctx_send(void *arg, nni_aio *aio) +{ + surv0_ctx * ctx = arg; + surv0_sock *sock = ctx->sock; + surv0_pipe *pipe; + nni_msg * msg = nni_aio_get_msg(aio); + size_t len = nni_msg_len(msg); + nni_time now = nni_clock(); + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&sock->mtx); + + // Abort any pending receives -- this is the same as cancellation. + nni_msgq_set_get_error(ctx->rq, NNG_ECANCELED); + nni_msgq_flush(ctx->rq); + + // New survey id will be generated, so unregister the old one. + if (ctx->survid) { + nni_idhash_remove(sock->surveys, ctx->survid); + ctx->survid = 0; + } + // Allocate the new ID. + if ((rv = nni_idhash_alloc(sock->surveys, &ctx->survid, ctx)) != 0) { + nni_mtx_unlock(&sock->mtx); + nni_aio_finish_error(aio, rv); + return; + } + // Insert it into the message. We report an error if one occurs, + // although arguably at this point we could just discard silently. + if ((rv = nni_msg_header_append_u32(msg, (uint32_t) ctx->survid)) != + 0) { + nni_idhash_remove(sock->surveys, ctx->survid); + ctx->survid = 0; + nni_mtx_unlock(&sock->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + // From this point, we're committed to success. Note that we send + // regardless of whether there are any pipes or not. If no pipes, + // then it just gets discarded. + nni_aio_set_msg(aio, NULL); + NNI_LIST_FOREACH (&sock->pipes, pipe) { + nni_msg *dmsg; + + if (nni_list_next(&sock->pipes, pipe) != NULL) { + if (nni_msg_dup(&dmsg, msg) != 0) { + continue; + } + } else { + dmsg = msg; + msg = NULL; + } + if (nni_msgq_tryput(pipe->sendq, dmsg) != 0) { + nni_msg_free(dmsg); + } + } + + ctx->expire = now + ctx->survtime; + nni_timer_schedule(&ctx->timer, ctx->expire); + + // Allow recv to run. + nni_msgq_set_get_error(ctx->rq, 0); + + nni_mtx_unlock(&sock->mtx); + if (msg != NULL) { + nni_msg_free(msg); + } + + nni_aio_finish(aio, 0, len); +} + static void surv0_sock_fini(void *arg) { - surv0_sock *s = arg; + surv0_sock *sock = arg; - nni_aio_stop(s->aio_getq); - nni_aio_fini(s->aio_getq); - nni_mtx_fini(&s->mtx); - NNI_FREE_STRUCT(s); + if (sock->ctx != NULL) { + surv0_ctx_fini(sock->ctx); + } + nni_idhash_fini(sock->surveys); + nni_pollable_free(sock->sendable); + nni_mtx_fini(&sock->mtx); + NNI_FREE_STRUCT(sock); } static int surv0_sock_init(void **sp, nni_sock *nsock) { - surv0_sock *s; + surv0_sock *sock; int rv; - if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + NNI_ARG_UNUSED(nsock); + + if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_aio_init(&s->aio_getq, surv0_sock_getq_cb, s)) != 0) { - surv0_sock_fini(s); + NNI_LIST_INIT(&sock->pipes, surv0_pipe, node); + nni_mtx_init(&sock->mtx); + + if (((rv = nni_idhash_init(&sock->surveys)) != 0) || + ((rv = surv0_ctx_init((void **) &sock->ctx, sock)) != 0)) { + surv0_sock_fini(sock); return (rv); } - NNI_LIST_INIT(&s->pipes, surv0_pipe, node); - nni_mtx_init(&s->mtx); - nni_timer_init(&s->timer, surv0_timeout, s); - - s->nextid = nni_random(); - s->survtime = NNI_SECOND; - s->expire = NNI_TIME_ZERO; - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); - s->ttl = 8; - - *sp = s; + + // Survey IDs are 32 bits, with the high order bit set. + // We start at a random point, to minimize likelihood of + // accidental collision across restarts. + nni_idhash_set_limits(sock->surveys, 0x80000000u, 0xffffffffu, + nni_random() | 0x80000000u); + + sock->ctx->survtime = NNI_SECOND; + sock->ttl = 8; + + *sp = sock; return (0); } static void surv0_sock_open(void *arg) { - surv0_sock *s = arg; - - nni_msgq_aio_get(s->uwq, s->aio_getq); + NNI_ARG_UNUSED(arg); } static void @@ -114,8 +280,7 @@ surv0_sock_close(void *arg) { surv0_sock *s = arg; - nni_timer_cancel(&s->timer); - nni_aio_abort(s->aio_getq, NNG_ECLOSED); + nni_msgq_close(s->ctx->rq); } static void @@ -126,7 +291,6 @@ surv0_pipe_fini(void *arg) nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); nni_msgq_fini(p->sendq); NNI_FREE_STRUCT(p); } @@ -140,18 +304,20 @@ surv0_pipe_init(void **pp, nni_pipe *npipe, void *s) if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - // This depth could be tunable. + // This depth could be tunable. The deeper the queue, the more + // concurrent surveys that can be delivered. Having said that, this + // is best effort, and a deep queue doesn't really do much for us. + // Note that surveys can be *outstanding*, but not yet put on the wire. if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, surv0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, surv0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, surv0_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, surv0_recv_cb, p)) != 0)) { + ((rv = nni_aio_init(&p->aio_getq, surv0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, surv0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, surv0_pipe_recv_cb, p)) != 0)) { surv0_pipe_fini(p); return (rv); } p->npipe = npipe; - p->psock = s; + p->sock = s; *pp = p; return (0); } @@ -160,7 +326,7 @@ static int surv0_pipe_start(void *arg) { surv0_pipe *p = arg; - surv0_sock *s = p->psock; + surv0_sock *s = p->sock; nni_mtx_lock(&s->mtx); nni_list_append(&s->pipes, p); @@ -175,12 +341,11 @@ static void surv0_pipe_stop(void *arg) { surv0_pipe *p = arg; - surv0_sock *s = p->psock; + surv0_sock *s = p->sock; nni_aio_stop(p->aio_getq); nni_aio_stop(p->aio_send); nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_putq); nni_msgq_close(p->sendq); @@ -192,7 +357,7 @@ surv0_pipe_stop(void *arg) } static void -surv0_getq_cb(void *arg) +surv0_pipe_getq_cb(void *arg) { surv0_pipe *p = arg; @@ -208,7 +373,7 @@ surv0_getq_cb(void *arg) } static void -surv0_send_cb(void *arg) +surv0_pipe_send_cb(void *arg) { surv0_pipe *p = arg; @@ -223,28 +388,17 @@ surv0_send_cb(void *arg) } static void -surv0_putq_cb(void *arg) +surv0_pipe_recv_cb(void *arg) { - surv0_pipe *p = arg; - - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); - return; - } - - nni_pipe_recv(p->npipe, p->aio_recv); -} - -static void -surv0_recv_cb(void *arg) -{ - surv0_pipe *p = arg; + surv0_pipe *p = arg; + surv0_sock *sock = p->sock; + surv0_ctx * ctx; nni_msg * msg; + uint32_t id; if (nni_aio_result(p->aio_recv) != 0) { - goto failed; + nni_pipe_stop(p->npipe); + return; } msg = nni_aio_get_msg(p->aio_recv); @@ -253,23 +407,45 @@ surv0_recv_cb(void *arg) // We yank 4 bytes of body, and move them to the header. if (nni_msg_len(msg) < 4) { - // Not enough data, just toss it. + // Peer sent us garbage. Kick it. nni_msg_free(msg); - goto failed; + nni_pipe_stop(p->npipe); + return; + } + id = nni_msg_trim_u32(msg); + if (nni_msg_header_append_u32(msg, id) != 0) { + // Should be NNG_ENOMEM - discard and try again. + nni_msg_free(msg); + nni_pipe_recv(p->npipe, p->aio_recv); + return; } - if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) { - // Should be NNG_ENOMEM + + nni_mtx_lock(&sock->mtx); + + // Best effort at delivery. Discard if no context or context is + // unable to receive it. + if ((nni_idhash_find(sock->surveys, id, (void **) &ctx) != 0) || + (nni_msgq_tryput(ctx->rq, msg) != 0)) { nni_msg_free(msg); - goto failed; } - (void) nni_msg_trim(msg, 4); - nni_aio_set_msg(p->aio_putq, msg); - nni_msgq_aio_put(p->psock->urq, p->aio_putq); - return; + nni_mtx_unlock(&sock->mtx); -failed: - nni_pipe_stop(p->npipe); + nni_pipe_recv(p->npipe, p->aio_recv); +} + +static int +surv0_ctx_setopt_surveytime(void *arg, const void *buf, size_t sz, int typ) +{ + surv0_ctx *ctx = arg; + return (nni_copyin_ms(&ctx->survtime, buf, sz, typ)); +} + +static int +surv0_ctx_getopt_surveytime(void *arg, void *buf, size_t *szp, int typ) +{ + surv0_ctx *ctx = arg; + return (nni_copyout_ms(ctx->survtime, buf, szp, typ)); } static int @@ -290,141 +466,66 @@ static int surv0_sock_setopt_surveytime(void *arg, const void *buf, size_t sz, int typ) { surv0_sock *s = arg; - return (nni_copyin_ms(&s->survtime, buf, sz, typ)); + return (surv0_ctx_setopt_surveytime(s->ctx, buf, sz, typ)); } static int surv0_sock_getopt_surveytime(void *arg, void *buf, size_t *szp, int typ) { surv0_sock *s = arg; - return (nni_copyout_ms(s->survtime, buf, szp, typ)); + return (surv0_ctx_getopt_surveytime(s->ctx, buf, szp, typ)); } -static void -surv0_sock_getq_cb(void *arg) +static int +surv0_sock_getopt_sendfd(void *arg, void *buf, size_t *szp, int typ) { - surv0_sock *s = arg; - surv0_pipe *p; - surv0_pipe *last; - nni_msg * msg, *dup; - - if (nni_aio_result(s->aio_getq) != 0) { - // Should be NNG_ECLOSED. - return; - } - msg = nni_aio_get_msg(s->aio_getq); - nni_aio_set_msg(s->aio_getq, NULL); + surv0_sock *sock = arg; + int rv; + int fd; - nni_mtx_lock(&s->mtx); - last = nni_list_last(&s->pipes); - NNI_LIST_FOREACH (&s->pipes, p) { - if (p != last) { - if (nni_msg_dup(&dup, msg) != 0) { - continue; - } - } else { - dup = msg; - } - if (nni_msgq_tryput(p->sendq, dup) != 0) { - nni_msg_free(dup); + nni_mtx_lock(&sock->mtx); + if (sock->sendable == NULL) { + if ((rv = nni_pollable_alloc(&sock->sendable)) != 0) { + nni_mtx_unlock(&sock->mtx); + return (rv); } + // We are always sendable. + nni_pollable_raise(sock->sendable); } - - nni_msgq_aio_get(s->uwq, s->aio_getq); - nni_mtx_unlock(&s->mtx); - - if (last == NULL) { - // If there were no pipes to send on, just toss the message. - nni_msg_free(msg); + nni_mtx_unlock(&sock->mtx); + if ((rv = nni_pollable_getfd(sock->sendable, &fd)) != 0) { + return (rv); } + return (nni_copyout_int(fd, buf, szp, typ)); } -static void -surv0_timeout(void *arg) -{ - surv0_sock *s = arg; - - nni_mtx_lock(&s->mtx); - s->survid = 0; - nni_mtx_unlock(&s->mtx); - - nni_msgq_set_get_error(s->urq, NNG_ETIMEDOUT); -} - -static void -surv0_sock_recv(void *arg, nni_aio *aio) +static int +surv0_sock_getopt_recvfd(void *arg, void *buf, size_t *szp, int typ) { - surv0_sock *s = arg; + surv0_sock * sock = arg; + nni_pollable *recvable; + int rv; + int fd; - nni_mtx_lock(&s->mtx); - if (s->survid == 0) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, NNG_ESTATE); - return; + if (((rv = nni_msgq_get_recvable(sock->ctx->rq, &recvable)) != 0) || + ((rv = nni_pollable_getfd(recvable, &fd)) != 0)) { + return (rv); } - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_get(s->urq, aio); + return (nni_copyout_int(fd, buf, szp, typ)); } static void -surv0_sock_send_raw(void *arg, nni_aio *aio) +surv0_sock_recv(void *arg, nni_aio *aio) { surv0_sock *s = arg; - - nni_msgq_aio_put(s->uwq, aio); + surv0_ctx_recv(s->ctx, aio); } static void surv0_sock_send(void *arg, nni_aio *aio) { surv0_sock *s = arg; - nni_msg * msg; - int rv; - - nni_mtx_lock(&s->mtx); - - // Generate a new request ID. We always set the high - // order bit so that the peer can locate the end of the - // backtrace. (Pipe IDs have the high order bit clear.) - s->survid = (s->nextid++) | 0x80000000u; - - msg = nni_aio_get_msg(aio); - nni_msg_header_clear(msg); - if ((rv = nni_msg_header_append_u32(msg, s->survid)) != 0) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, rv); - return; - } - - // If another message is there, this cancels it. We move the - // survey expiration out. The timeout thread will wake up in - // the wake below, and reschedule itself appropriately. - nni_msgq_set_get_error(s->urq, 0); - s->expire = nni_clock() + s->survtime; - nni_timer_schedule(&s->timer, s->expire); - - nni_mtx_unlock(&s->mtx); - - nni_msgq_aio_put(s->uwq, aio); -} - -static nni_msg * -surv0_sock_filter(void *arg, nni_msg *msg) -{ - surv0_sock *s = arg; - - nni_mtx_lock(&s->mtx); - - if ((nni_msg_header_len(msg) < sizeof(uint32_t)) || - (nni_msg_header_trim_u32(msg) != s->survid)) { - // Wrong request id - nni_mtx_unlock(&s->mtx); - nni_msg_free(msg); - return (NULL); - } - nni_mtx_unlock(&s->mtx); - - return (msg); + surv0_ctx_send(s->ctx, aio); } static nni_proto_pipe_ops surv0_pipe_ops = { @@ -434,6 +535,25 @@ static nni_proto_pipe_ops surv0_pipe_ops = { .pipe_stop = surv0_pipe_stop, }; +static nni_proto_ctx_option surv0_ctx_options[] = { + { + .co_name = NNG_OPT_SURVEYOR_SURVEYTIME, + .co_type = NNI_TYPE_DURATION, + .co_getopt = surv0_ctx_getopt_surveytime, + .co_setopt = surv0_ctx_setopt_surveytime, + }, + { + .co_name = NULL, + } +}; +static nni_proto_ctx_ops surv0_ctx_ops = { + .ctx_init = surv0_ctx_init, + .ctx_fini = surv0_ctx_fini, + .ctx_send = surv0_ctx_send, + .ctx_recv = surv0_ctx_recv, + .ctx_options = surv0_ctx_options, +}; + static nni_proto_sock_option surv0_sock_options[] = { { .pso_name = NNG_OPT_SURVEYOR_SURVEYTIME, @@ -447,6 +567,18 @@ static nni_proto_sock_option surv0_sock_options[] = { .pso_getopt = surv0_sock_getopt_maxttl, .pso_setopt = surv0_sock_setopt_maxttl, }, + { + .pso_name = NNG_OPT_RECVFD, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = surv0_sock_getopt_recvfd, + .pso_setopt = NULL, + }, + { + .pso_name = NNG_OPT_SENDFD, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = surv0_sock_getopt_sendfd, + .pso_setopt = NULL, + }, // terminate list { .pso_name = NULL, @@ -460,18 +592,6 @@ static nni_proto_sock_ops surv0_sock_ops = { .sock_close = surv0_sock_close, .sock_send = surv0_sock_send, .sock_recv = surv0_sock_recv, - .sock_filter = surv0_sock_filter, - .sock_options = surv0_sock_options, -}; - -static nni_proto_sock_ops surv0_sock_ops_raw = { - .sock_init = surv0_sock_init, - .sock_fini = surv0_sock_fini, - .sock_open = surv0_sock_open, - .sock_close = surv0_sock_close, - .sock_send = surv0_sock_send_raw, - .sock_recv = surv0_sock_recv, - .sock_filter = surv0_sock_filter, .sock_options = surv0_sock_options, }; @@ -479,18 +599,10 @@ static nni_proto surv0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ, .proto_sock_ops = &surv0_sock_ops, .proto_pipe_ops = &surv0_pipe_ops, -}; - -static nni_proto surv0_proto_raw = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, - .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, - .proto_sock_ops = &surv0_sock_ops_raw, - .proto_pipe_ops = &surv0_pipe_ops, + .proto_ctx_ops = &surv0_ctx_ops, }; int @@ -498,9 +610,3 @@ nng_surveyor0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &surv0_proto)); } - -int -nng_surveyor0_open_raw(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &surv0_proto_raw)); -} diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c new file mode 100644 index 00000000..7aaed6da --- /dev/null +++ b/src/protocol/survey0/xrespond.c @@ -0,0 +1,408 @@ +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include + +#include "core/nng_impl.h" +#include "protocol/survey0/respond.h" + +// Respondent protocol. The RESPONDENT protocol is the "replier" side of +// the surveyor pattern. This is useful for building service discovery, or +// voting algorithms, for example. + +#ifndef NNI_PROTO_SURVEYOR_V0 +#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) +#endif + +#ifndef NNI_PROTO_RESPONDENT_V0 +#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) +#endif + +typedef struct xresp0_pipe xresp0_pipe; +typedef struct xresp0_sock xresp0_sock; + +static void xresp0_recv_cb(void *); +static void xresp0_putq_cb(void *); +static void xresp0_getq_cb(void *); +static void xresp0_send_cb(void *); +static void xresp0_sock_getq_cb(void *); +static void xresp0_pipe_fini(void *); + +// resp0_sock is our per-socket protocol private structure. +struct xresp0_sock { + nni_msgq * urq; + nni_msgq * uwq; + int ttl; + nni_idhash *pipes; + nni_aio * aio_getq; + nni_mtx mtx; +}; + +// resp0_pipe is our per-pipe protocol private structure. +struct xresp0_pipe { + nni_pipe * npipe; + xresp0_sock *psock; + uint32_t id; + nni_msgq * sendq; + nni_aio * aio_getq; + nni_aio * aio_putq; + nni_aio * aio_send; + nni_aio * aio_recv; +}; + +static void +xresp0_sock_fini(void *arg) +{ + xresp0_sock *s = arg; + + nni_aio_stop(s->aio_getq); + nni_aio_fini(s->aio_getq); + nni_idhash_fini(s->pipes); + nni_mtx_fini(&s->mtx); + NNI_FREE_STRUCT(s); +} + +static int +xresp0_sock_init(void **sp, nni_sock *nsock) +{ + xresp0_sock *s; + int rv; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&s->mtx); + if (((rv = nni_idhash_init(&s->pipes)) != 0) || + ((rv = nni_aio_init(&s->aio_getq, xresp0_sock_getq_cb, s)) != 0)) { + xresp0_sock_fini(s); + return (rv); + } + + s->ttl = 8; // Per RFC + s->urq = nni_sock_recvq(nsock); + s->uwq = nni_sock_sendq(nsock); + + *sp = s; + return (0); +} + +static void +xresp0_sock_open(void *arg) +{ + xresp0_sock *s = arg; + + nni_msgq_aio_get(s->uwq, s->aio_getq); +} + +static void +xresp0_sock_close(void *arg) +{ + xresp0_sock *s = arg; + + nni_aio_abort(s->aio_getq, NNG_ECLOSED); +} + +static void +xresp0_pipe_fini(void *arg) +{ + xresp0_pipe *p = arg; + + nni_aio_fini(p->aio_putq); + nni_aio_fini(p->aio_getq); + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_msgq_fini(p->sendq); + NNI_FREE_STRUCT(p); +} + +static int +xresp0_pipe_init(void **pp, nni_pipe *npipe, void *s) +{ + xresp0_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, xresp0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, xresp0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, xresp0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, xresp0_send_cb, p)) != 0)) { + xresp0_pipe_fini(p); + return (rv); + } + + p->npipe = npipe; + p->psock = s; + *pp = p; + return (0); +} + +static int +xresp0_pipe_start(void *arg) +{ + xresp0_pipe *p = arg; + xresp0_sock *s = p->psock; + int rv; + + p->id = nni_pipe_id(p->npipe); + + nni_mtx_lock(&s->mtx); + rv = nni_idhash_insert(s->pipes, p->id, p); + nni_mtx_unlock(&s->mtx); + if (rv != 0) { + return (rv); + } + + nni_pipe_recv(p->npipe, p->aio_recv); + nni_msgq_aio_get(p->sendq, p->aio_getq); + + return (rv); +} + +static void +xresp0_pipe_stop(void *arg) +{ + xresp0_pipe *p = arg; + xresp0_sock *s = p->psock; + + nni_msgq_close(p->sendq); + nni_aio_stop(p->aio_putq); + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + + nni_mtx_lock(&s->mtx); + nni_idhash_remove(s->pipes, p->id); + nni_mtx_unlock(&s->mtx); +} + +// resp0_sock_send watches for messages from the upper write queue, +// extracts the destination pipe, and forwards it to the appropriate +// destination pipe via a separate queue. This prevents a single bad +// or slow pipe from gumming up the works for the entire socket.s + +void +xresp0_sock_getq_cb(void *arg) +{ + xresp0_sock *s = arg; + nni_msg * msg; + uint32_t id; + xresp0_pipe *p; + + if (nni_aio_result(s->aio_getq) != 0) { + return; + } + msg = nni_aio_get_msg(s->aio_getq); + nni_aio_set_msg(s->aio_getq, NULL); + + // We yank the outgoing pipe id from the header + if (nni_msg_header_len(msg) < 4) { + nni_msg_free(msg); + // We can't really close down the socket, so just keep going. + nni_msgq_aio_get(s->uwq, s->aio_getq); + return; + } + id = nni_msg_header_trim_u32(msg); + + nni_mtx_lock(&s->mtx); + // Look for the pipe, and attempt to put the message there + // (nonblocking) if we can. If we can't for any reason, then we + // free the message. + if (((nni_idhash_find(s->pipes, id, (void **) &p)) != 0) || + (nni_msgq_tryput(p->sendq, msg) != 0)) { + nni_msg_free(msg); + } + nni_mtx_unlock(&s->mtx); + nni_msgq_aio_get(s->uwq, s->aio_getq); +} + +void +xresp0_getq_cb(void *arg) +{ + xresp0_pipe *p = arg; + + if (nni_aio_result(p->aio_getq) != 0) { + nni_pipe_stop(p->npipe); + return; + } + + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); + + nni_pipe_send(p->npipe, p->aio_send); +} + +void +xresp0_send_cb(void *arg) +{ + xresp0_pipe *p = arg; + + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->npipe); + return; + } + + nni_msgq_aio_get(p->sendq, p->aio_getq); +} + +static void +xresp0_recv_cb(void *arg) +{ + xresp0_pipe *p = arg; + xresp0_sock *s = p->psock; + nni_msgq * urq = s->urq; + nni_msg * msg; + int hops; + + if (nni_aio_result(p->aio_recv) != 0) { + nni_pipe_stop(p->npipe); + return; + } + + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); + nni_msg_set_pipe(msg, p->id); + + // Store the pipe id in the header, first thing. + if (nni_msg_header_append_u32(msg, p->id) != 0) { + goto drop; + } + + // Move backtrace from body to header + hops = 1; + for (;;) { + bool end = false; + uint8_t *body; + + if (hops > s->ttl) { + goto drop; + } + hops++; + if (nni_msg_len(msg) < 4) { + // Peer sent us garbage, so kick it. + nni_msg_free(msg); + nni_pipe_stop(p->npipe); + return; + } + body = nni_msg_body(msg); + end = ((body[0] & 0x80) != 0); + if (nni_msg_header_append(msg, body, 4) != 0) { + goto drop; + } + nni_msg_trim(msg, 4); + if (end) { + break; + } + } + + // Now send it up. + nni_aio_set_msg(p->aio_putq, msg); + nni_msgq_aio_put(urq, p->aio_putq); + return; + +drop: + nni_msg_free(msg); + nni_pipe_recv(p->npipe, p->aio_recv); +} + +static void +xresp0_putq_cb(void *arg) +{ + xresp0_pipe *p = arg; + + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->npipe); + return; + } + + nni_pipe_recv(p->npipe, p->aio_recv); +} + +static int +xresp0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) +{ + xresp0_sock *s = arg; + return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ)); +} + +static int +xresp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ) +{ + xresp0_sock *s = arg; + return (nni_copyout_int(s->ttl, buf, szp, typ)); +} + +static void +xresp0_sock_send(void *arg, nni_aio *aio) +{ + xresp0_sock *s = arg; + + nni_msgq_aio_put(s->uwq, aio); +} + +static void +xresp0_sock_recv(void *arg, nni_aio *aio) +{ + xresp0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + +static nni_proto_pipe_ops xresp0_pipe_ops = { + .pipe_init = xresp0_pipe_init, + .pipe_fini = xresp0_pipe_fini, + .pipe_start = xresp0_pipe_start, + .pipe_stop = xresp0_pipe_stop, +}; + +static nni_proto_sock_option xresp0_sock_options[] = { + { + .pso_name = NNG_OPT_MAXTTL, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = xresp0_sock_getopt_maxttl, + .pso_setopt = xresp0_sock_setopt_maxttl, + }, + // terminate list + { + .pso_name = NULL, + }, +}; + +static nni_proto_sock_ops xresp0_sock_ops = { + .sock_init = xresp0_sock_init, + .sock_fini = xresp0_sock_fini, + .sock_open = xresp0_sock_open, + .sock_close = xresp0_sock_close, + .sock_send = xresp0_sock_send, + .sock_recv = xresp0_sock_recv, + .sock_options = xresp0_sock_options, +}; + +static nni_proto xresp0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &xresp0_sock_ops, + .proto_pipe_ops = &xresp0_pipe_ops, +}; + +int +nng_respondent0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &xresp0_proto)); +} diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c new file mode 100644 index 00000000..cf311b15 --- /dev/null +++ b/src/protocol/survey0/xsurvey.c @@ -0,0 +1,380 @@ +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include + +#include "core/nng_impl.h" +#include "protocol/survey0/survey.h" + +// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the +// survey pattern. This is useful for building service discovery, voting, etc. + +#ifndef NNI_PROTO_SURVEYOR_V0 +#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) +#endif + +#ifndef NNI_PROTO_RESPONDENT_V0 +#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) +#endif + +typedef struct xsurv0_pipe xsurv0_pipe; +typedef struct xsurv0_sock xsurv0_sock; + +static void xsurv0_sock_getq_cb(void *); +static void xsurv0_getq_cb(void *); +static void xsurv0_putq_cb(void *); +static void xsurv0_send_cb(void *); +static void xsurv0_recv_cb(void *); + +// surv0_sock is our per-socket protocol private structure. +struct xsurv0_sock { + int ttl; + nni_list pipes; + nni_aio * aio_getq; + nni_msgq *uwq; + nni_msgq *urq; + nni_mtx mtx; +}; + +// surv0_pipe is our per-pipe protocol private structure. +struct xsurv0_pipe { + nni_pipe * npipe; + xsurv0_sock * psock; + nni_msgq * sendq; + nni_list_node node; + nni_aio * aio_getq; + nni_aio * aio_putq; + nni_aio * aio_send; + nni_aio * aio_recv; +}; + +static void +xsurv0_sock_fini(void *arg) +{ + xsurv0_sock *s = arg; + + nni_aio_stop(s->aio_getq); + nni_aio_fini(s->aio_getq); + nni_mtx_fini(&s->mtx); + NNI_FREE_STRUCT(s); +} + +static int +xsurv0_sock_init(void **sp, nni_sock *nsock) +{ + xsurv0_sock *s; + int rv; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_aio_init(&s->aio_getq, xsurv0_sock_getq_cb, s)) != 0) { + xsurv0_sock_fini(s); + return (rv); + } + NNI_LIST_INIT(&s->pipes, xsurv0_pipe, node); + nni_mtx_init(&s->mtx); + + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); + s->ttl = 8; + + *sp = s; + return (0); +} + +static void +xsurv0_sock_open(void *arg) +{ + xsurv0_sock *s = arg; + + nni_msgq_aio_get(s->uwq, s->aio_getq); +} + +static void +xsurv0_sock_close(void *arg) +{ + xsurv0_sock *s = arg; + + nni_aio_abort(s->aio_getq, NNG_ECLOSED); +} + +static void +xsurv0_pipe_fini(void *arg) +{ + xsurv0_pipe *p = arg; + + nni_aio_fini(p->aio_getq); + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_putq); + nni_msgq_fini(p->sendq); + NNI_FREE_STRUCT(p); +} + +static int +xsurv0_pipe_init(void **pp, nni_pipe *npipe, void *s) +{ + xsurv0_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + // This depth could be tunable. The queue exists so that if we + // have multiple requests coming in faster than we can deliver them, + // we try to avoid dropping them. We don't really have a solution + // for applying backpressure. It would be nice if surveys carried + // an expiration with them, so that we could discard any that are + // not delivered before their expiration date. + if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, xsurv0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, xsurv0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, xsurv0_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, xsurv0_recv_cb, p)) != 0)) { + xsurv0_pipe_fini(p); + return (rv); + } + + p->npipe = npipe; + p->psock = s; + *pp = p; + return (0); +} + +static int +xsurv0_pipe_start(void *arg) +{ + xsurv0_pipe *p = arg; + xsurv0_sock *s = p->psock; + + nni_mtx_lock(&s->mtx); + nni_list_append(&s->pipes, p); + nni_mtx_unlock(&s->mtx); + + nni_msgq_aio_get(p->sendq, p->aio_getq); + nni_pipe_recv(p->npipe, p->aio_recv); + return (0); +} + +static void +xsurv0_pipe_stop(void *arg) +{ + xsurv0_pipe *p = arg; + xsurv0_sock *s = p->psock; + + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_putq); + + nni_msgq_close(p->sendq); + + nni_mtx_lock(&s->mtx); + if (nni_list_active(&s->pipes, p)) { + nni_list_remove(&s->pipes, p); + } + nni_mtx_unlock(&s->mtx); +} + +static void +xsurv0_getq_cb(void *arg) +{ + xsurv0_pipe *p = arg; + + if (nni_aio_result(p->aio_getq) != 0) { + nni_pipe_stop(p->npipe); + return; + } + + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); + + nni_pipe_send(p->npipe, p->aio_send); +} + +static void +xsurv0_send_cb(void *arg) +{ + xsurv0_pipe *p = arg; + + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->npipe); + return; + } + + nni_msgq_aio_get(p->sendq, p->aio_getq); +} + +static void +xsurv0_putq_cb(void *arg) +{ + xsurv0_pipe *p = arg; + + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->npipe); + return; + } + + nni_pipe_recv(p->npipe, p->aio_recv); +} + +static void +xsurv0_recv_cb(void *arg) +{ + xsurv0_pipe *p = arg; + nni_msg * msg; + + if (nni_aio_result(p->aio_recv) != 0) { + nni_pipe_stop(p->npipe); + return; + } + + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); + + // We yank 4 bytes of body, and move them to the header. + if (nni_msg_len(msg) < 4) { + // Peer gave us garbage, so kick it. + nni_msg_free(msg); + nni_pipe_stop(p->npipe); + return; + } + if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) { + // Probably ENOMEM, discard and keep going. + nni_msg_free(msg); + nni_pipe_recv(p->npipe, p->aio_recv); + return; + } + (void) nni_msg_trim(msg, 4); + + nni_aio_set_msg(p->aio_putq, msg); + nni_msgq_aio_put(p->psock->urq, p->aio_putq); +} + +static int +xsurv0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) +{ + xsurv0_sock *s = arg; + return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ)); +} + +static int +xsurv0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ) +{ + xsurv0_sock *s = arg; + return (nni_copyout_int(s->ttl, buf, szp, typ)); +} + +static void +xsurv0_sock_getq_cb(void *arg) +{ + xsurv0_sock *s = arg; + xsurv0_pipe *p; + xsurv0_pipe *last; + nni_msg * msg, *dup; + + if (nni_aio_result(s->aio_getq) != 0) { + // Should be NNG_ECLOSED. + return; + } + msg = nni_aio_get_msg(s->aio_getq); + nni_aio_set_msg(s->aio_getq, NULL); + + nni_mtx_lock(&s->mtx); + last = nni_list_last(&s->pipes); + NNI_LIST_FOREACH (&s->pipes, p) { + if (p != last) { + if (nni_msg_dup(&dup, msg) != 0) { + continue; + } + } else { + dup = msg; + } + if (nni_msgq_tryput(p->sendq, dup) != 0) { + nni_msg_free(dup); + } + } + + nni_msgq_aio_get(s->uwq, s->aio_getq); + nni_mtx_unlock(&s->mtx); + + if (last == NULL) { + // If there were no pipes to send on, just toss the message. + nni_msg_free(msg); + } +} + +static void +xsurv0_sock_recv(void *arg, nni_aio *aio) +{ + xsurv0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + +static void +xsurv0_sock_send(void *arg, nni_aio *aio) +{ + xsurv0_sock *s = arg; + + nni_msgq_aio_put(s->uwq, aio); +} + +static nni_proto_pipe_ops xsurv0_pipe_ops = { + .pipe_init = xsurv0_pipe_init, + .pipe_fini = xsurv0_pipe_fini, + .pipe_start = xsurv0_pipe_start, + .pipe_stop = xsurv0_pipe_stop, +}; + +static nni_proto_sock_option xsurv0_sock_options[] = { + { + .pso_name = NNG_OPT_MAXTTL, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = xsurv0_sock_getopt_maxttl, + .pso_setopt = xsurv0_sock_setopt_maxttl, + }, + // terminate list + { + .pso_name = NULL, + }, +}; + +static nni_proto_sock_ops xsurv0_sock_ops = { + .sock_init = xsurv0_sock_init, + .sock_fini = xsurv0_sock_fini, + .sock_open = xsurv0_sock_open, + .sock_close = xsurv0_sock_close, + .sock_send = xsurv0_sock_send, + .sock_recv = xsurv0_sock_recv, + .sock_options = xsurv0_sock_options, +}; + +static nni_proto xsurv0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, + .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &xsurv0_sock_ops, + .proto_pipe_ops = &xsurv0_pipe_ops, +}; + +int +nng_surveyor0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &xsurv0_proto)); +} -- cgit v1.2.3-70-g09d2