| author | Garrett D'Amore <garrett@damore.org> | 2018-04-04 13:36:54 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-04-10 15:40:00 -0700 |
| commit | 5f7289e1f8e1427c9214c8e3e96ad56b1f868d53 (patch) | |
| tree | 39debf4ecde234b2a0be19c9cb15628cc32c2edb /src/protocol/reqrep0/req.c | |
| parent | 56f1bf30e61c53646dd2f8425da7c7fa0d97b3e1 (diff) | |
| download | nng-5f7289e1f8e1427c9214c8e3e96ad56b1f868d53.tar.gz nng-5f7289e1f8e1427c9214c8e3e96ad56b1f868d53.tar.bz2 nng-5f7289e1f8e1427c9214c8e3e96ad56b1f868d53.zip | |
fixes #334 Separate context for state machines from sockets
This provides context support for REQ and REP sockets.
More discussion around this is in the issue itself.
Optionally, we would like to extend this to the surveyor pattern.
Note that we specifically do not support pollable descriptors
for non-default contexts; the results of using the polling file
descriptors (NNG_OPT_SENDFD and NNG_OPT_RECVFD) with such contexts
are undefined.
In the future, it might be nice to figure out how to factor in
optional use of a message queue for users who want more buffering,
but we think there is little need for this with cooked mode.
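
For illustration, this is roughly how a caller could drive an independent
request/reply exchange on a separate context once this lands. It is a
minimal sketch assuming the public context API that accompanies this work
(nng_ctx_open, nng_ctx_send, nng_ctx_recv); the dial URL and payload are
placeholders, not part of this change.

```c
#include <nng/nng.h>
#include <nng/protocol/reqrep0/req.h>
#include <stdio.h>

int
main(void)
{
	nng_socket sock;
	nng_ctx    ctx;
	nng_aio *  aio;
	nng_msg *  msg;

	if ((nng_req0_open(&sock) != 0) ||
	    (nng_dial(sock, "tcp://127.0.0.1:5555", NULL, 0) != 0) ||
	    (nng_ctx_open(&ctx, sock) != 0) ||
	    (nng_aio_alloc(&aio, NULL, NULL) != 0)) {
		return (1);
	}

	// Send a request on this context; the request ID and resend
	// timer are tracked per-context rather than per-socket.
	nng_msg_alloc(&msg, 0);
	nng_msg_append(msg, "ping", 5);
	nng_aio_set_msg(aio, msg);
	nng_ctx_send(ctx, aio);
	nng_aio_wait(aio);
	if (nng_aio_result(aio) != 0) {
		return (1);
	}

	// Exactly one receive is expected per send on a context.
	nng_ctx_recv(ctx, aio);
	nng_aio_wait(aio);
	if (nng_aio_result(aio) == 0) {
		msg = nng_aio_get_msg(aio);
		printf("got %u byte reply\n", (unsigned) nng_msg_len(msg));
		nng_msg_free(msg);
	}

	nng_aio_free(aio);
	nng_ctx_close(ctx);
	nng_close(sock);
	return (0);
}
```

Because each context carries its own outstanding request, several such
exchanges can be in flight on one socket without interfering with each
other.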
Diffstat (limited to 'src/protocol/reqrep0/req.c')
| -rw-r--r-- | src/protocol/reqrep0/req.c | 861 |
1 file changed, 533 insertions, 328 deletions
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 4d35ca1f..8149ce08 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -28,34 +28,53 @@ typedef struct req0_pipe req0_pipe; typedef struct req0_sock req0_sock; +typedef struct req0_ctx req0_ctx; -static void req0_resend(req0_sock *); -static void req0_timeout(void *); +static void req0_run_sendq(req0_sock *, nni_list *); +static void req0_ctx_reset(req0_ctx *); +static void req0_ctx_timeout(void *); static void req0_pipe_fini(void *); +static void req0_ctx_fini(void *); +static int req0_ctx_init(void **, void *); + +// A req0_ctx is a "context" for the request. It uses most of the +// socket, but keeps track of its own outstanding replays, the request ID, +// and so forth. +struct req0_ctx { + nni_list_node snode; + nni_list_node sqnode; // node on the sendq + nni_list_node pnode; // node on the pipe list + uint32_t reqid; + req0_sock * sock; + nni_aio * raio; // user aio waiting to receive - only one! + nni_aio * saio; + nng_msg * reqmsg; // request message + size_t reqlen; + nng_msg * repmsg; // reply message + nni_timer_node timer; + nni_duration retry; +}; // A req0_sock is our per-socket protocol private structure. struct req0_sock { - nni_msgq * uwq; - nni_msgq * urq; + nni_sock * nsock; nni_duration retry; - nni_time resend; - bool raw; - bool wantw; bool closed; int ttl; - nni_msg * reqmsg; - req0_pipe *pendpipe; + req0_ctx *ctx; // base socket ctx nni_list readypipes; nni_list busypipes; + nni_list ctxs; - nni_timer_node timer; + nni_list sendq; // contexts waiting to send. + nni_idhash * reqids; // contexts by request ID + nni_pollable *recvable; + nni_pollable *sendable; - uint32_t nextid; // next id - uint8_t reqid[4]; // outstanding request ID (big endian) - nni_mtx mtx; - nni_cv cv; + nni_mtx mtx; + nni_cv cv; }; // A req0_pipe is our per-pipe protocol private structure. @@ -63,60 +82,61 @@ struct req0_pipe { nni_pipe * pipe; req0_sock * req; nni_list_node node; - nni_aio * aio_getq; // raw mode only - nni_aio * aio_sendraw; // raw mode only - nni_aio * aio_sendcooked; // cooked mode only + nni_list ctxs; // ctxs with pending traffic + nni_aio * aio_send; nni_aio * aio_recv; - nni_aio * aio_putq; - nni_mtx mtx; }; -static void req0_getq_cb(void *); -static void req0_sendraw_cb(void *); -static void req0_sendcooked_cb(void *); +static void req0_sock_fini(void *); +static void req0_send_cb(void *); static void req0_recv_cb(void *); -static void req0_putq_cb(void *); static int -req0_sock_init_impl(void **sp, nni_sock *sock, bool raw) +req0_sock_init(void **sp, nni_sock *sock) { req0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_idhash_init(&s->reqids)) != 0) { + NNI_FREE_STRUCT(s); + return (rv); + } + + // Request IDs are 32 bits, with the high order bit set. + // We start at a random point, to minimize likelihood of + // accidental collision across restarts. + nni_idhash_set_limits( + s->reqids, 0x80000000u, 0xffffffffu, nni_random() | 0x80000000u); + nni_mtx_init(&s->mtx); nni_cv_init(&s->cv, &s->mtx); NNI_LIST_INIT(&s->readypipes, req0_pipe, node); NNI_LIST_INIT(&s->busypipes, req0_pipe, node); - nni_timer_init(&s->timer, req0_timeout, s); + NNI_LIST_INIT(&s->sendq, req0_ctx, sqnode); + NNI_LIST_INIT(&s->ctxs, req0_ctx, snode); // this is "semi random" start for request IDs. 
- s->nextid = nni_random(); - s->retry = NNI_SECOND * 60; - s->reqmsg = NULL; - s->raw = raw; - s->wantw = false; - s->resend = NNI_TIME_ZERO; - s->ttl = 8; - s->uwq = nni_sock_sendq(sock); - s->urq = nni_sock_recvq(sock); - *sp = s; + s->nsock = sock; + s->retry = NNI_SECOND * 60; - return (0); -} + if ((rv = req0_ctx_init((void **) &s->ctx, s)) != 0) { + req0_sock_fini(s); + return (rv); + } + if (((rv = nni_pollable_alloc(&s->sendable)) != 0) || + ((rv = nni_pollable_alloc(&s->recvable)) != 0)) { + req0_sock_fini(s); + return (rv); + } -static int -req0_sock_init(void **sp, nni_sock *sock) -{ - return (req0_sock_init_impl(sp, sock, false)); -} + s->ttl = 8; + *sp = s; -static int -req0_sock_init_raw(void **sp, nni_sock *sock) -{ - return (req0_sock_init_impl(sp, sock, true)); + return (0); } static void @@ -129,12 +149,18 @@ static void req0_sock_close(void *arg) { req0_sock *s = arg; + req0_ctx * ctx; nni_mtx_lock(&s->mtx); s->closed = true; + NNI_LIST_FOREACH (&s->ctxs, ctx) { + if (ctx->raio != NULL) { + nni_aio_finish_error(ctx->raio, NNG_ECLOSED); + ctx->raio = NULL; + req0_ctx_reset(ctx); + } + } nni_mtx_unlock(&s->mtx); - - nni_timer_cancel(&s->timer); } static void @@ -147,10 +173,13 @@ req0_sock_fini(void *arg) (!nni_list_empty(&s->busypipes))) { nni_cv_wait(&s->cv); } - if (s->reqmsg != NULL) { - nni_msg_free(s->reqmsg); - } nni_mtx_unlock(&s->mtx); + if (s->ctx) { + req0_ctx_fini(s->ctx); + } + nni_pollable_free(s->recvable); + nni_pollable_free(s->sendable); + nni_idhash_fini(s->reqids); nni_cv_fini(&s->cv); nni_mtx_fini(&s->mtx); NNI_FREE_STRUCT(s); @@ -161,12 +190,8 @@ req0_pipe_fini(void *arg) { req0_pipe *p = arg; - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_putq); nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_sendcooked); - nni_aio_fini(p->aio_sendraw); - nni_mtx_fini(&p->mtx); + nni_aio_fini(p->aio_send); NNI_FREE_STRUCT(p); } @@ -179,18 +204,14 @@ req0_pipe_init(void **pp, nni_pipe *pipe, void *s) if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - nni_mtx_init(&p->mtx); - if (((rv = nni_aio_init(&p->aio_getq, req0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, req0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_sendraw, req0_sendraw_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_sendcooked, req0_sendcooked_cb, p)) != - 0)) { + if (((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, req0_send_cb, p)) != 0)) { req0_pipe_fini(p); return (rv); } NNI_LIST_NODE_INIT(&p->node); + NNI_LIST_INIT(&p->ctxs, req0_ctx, pnode); p->pipe = pipe; p->req = s; *pp = p; @@ -213,14 +234,10 @@ req0_pipe_start(void *arg) return (NNG_ECLOSED); } nni_list_append(&s->readypipes, p); - // If sock was waiting for somewhere to send data, go ahead and - // send it to this pipe. - if (s->wantw) { - req0_resend(s); - } + nni_pollable_raise(s->sendable); + req0_run_sendq(s, NULL); nni_mtx_unlock(&s->mtx); - nni_msgq_aio_get(s->uwq, p->aio_getq); nni_pipe_recv(p->pipe, p->aio_recv); return (0); } @@ -230,12 +247,10 @@ req0_pipe_stop(void *arg) { req0_pipe *p = arg; req0_sock *s = p->req; + req0_ctx * ctx; - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_putq); nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_sendcooked); - nni_aio_stop(p->aio_sendraw); + nni_aio_stop(p->aio_send); // At this point there should not be any further AIOs running. // Further, any completion tasks have completed. 
@@ -249,126 +264,54 @@ req0_pipe_stop(void *arg) nni_cv_wake(&s->cv); } } - - if ((p == s->pendpipe) && (s->reqmsg != NULL)) { - // removing the pipe we sent the last request on... - // schedule immediate resend. - s->pendpipe = NULL; - s->resend = NNI_TIME_ZERO; - s->wantw = true; - req0_resend(s); + if (nni_list_empty(&s->readypipes)) { + nni_pollable_clear(s->sendable); } - nni_mtx_unlock(&s->mtx); -} -static int -req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) -{ - req0_sock *s = arg; - return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ)); -} - -static int -req0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ) -{ - req0_sock *s = arg; - return (nni_copyout_int(s->ttl, buf, szp, typ)); -} - -static int -req0_sock_setopt_resendtime(void *arg, const void *buf, size_t sz, int typ) -{ - req0_sock *s = arg; - return (nni_copyin_ms(&s->retry, buf, sz, typ)); -} - -static int -req0_sock_getopt_resendtime(void *arg, void *buf, size_t *szp, int typ) -{ - req0_sock *s = arg; - return (nni_copyout_ms(s->retry, buf, szp, typ)); -} - -// Raw and cooked mode differ in the way they send messages out. -// -// For cooked mdes, we have a getq callback on the upper write queue, which -// when it finds a message, cancels any current processing, and saves a copy -// of the message, and then tries to "resend" the message, looking for a -// suitable available outgoing pipe. If no suitable pipe is available, -// a flag is set, so that as soon as such a pipe is available we trigger -// a resend attempt. We also trigger the attempt on either timeout, or if -// the underlying pipe we chose disconnects. -// -// For raw mode we can just let the pipes "contend" via getq to get a -// message from the upper write queue. The msgqueue implementation -// actually provides ordering, so load will be spread automatically. -// (NB: We may have to revise this in the future if we want to provide some -// kind of priority.) - -static void -req0_getq_cb(void *arg) -{ - req0_pipe *p = arg; - - // We should be in RAW mode. Cooked mode traffic bypasses - // the upper write queue entirely, and should never end up here. - // If the mode changes, we may briefly deliver a message, but - // that's ok (there's an inherent race anyway). (One minor - // exception: we wind up here in error state when the uwq is closed.) - - if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->pipe); - return; + while ((ctx = nni_list_first(&p->ctxs)) != NULL) { + nni_list_remove(&p->ctxs, ctx); + // Reset the timer on this so it expires immediately. + // This is actually easier than canceling the timer and + // running the sendq separately. (In particular, it avoids + // a potential deadlock on cancelling the timer.) + nni_timer_schedule(&ctx->timer, NNI_TIME_ZERO); } - - nni_aio_set_msg(p->aio_sendraw, nni_aio_get_msg(p->aio_getq)); - nni_aio_set_msg(p->aio_getq, NULL); - - // Send the message, but use the raw mode aio. - nni_pipe_send(p->pipe, p->aio_sendraw); + nni_mtx_unlock(&s->mtx); } -static void -req0_sendraw_cb(void *arg) -{ - req0_pipe *p = arg; - - if (nni_aio_result(p->aio_sendraw) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_sendraw)); - nni_aio_set_msg(p->aio_sendraw, NULL); - nni_pipe_stop(p->pipe); - return; - } - - // Sent a message so we just need to look for another one. - nni_msgq_aio_get(p->req->uwq, p->aio_getq); -} +// For cooked mode, we use a context, and send out that way. This +// completely bypasses the upper write queue. 
Each context keeps one +// message pending; these are "scheduled" via the sendq. The sendq +// is ordered, so FIFO ordering between contexts is provided for. static void -req0_sendcooked_cb(void *arg) +req0_send_cb(void *arg) { req0_pipe *p = arg; req0_sock *s = p->req; + nni_aio * aio; + nni_list aios; - if (nni_aio_result(p->aio_sendcooked) != 0) { + nni_aio_list_init(&aios); + if (nni_aio_result(p->aio_send) != 0) { // We failed to send... clean up and deal with it. - // We leave ourselves on the busy list for now, which - // means no new asynchronous traffic can occur here. - nni_msg_free(nni_aio_get_msg(p->aio_sendcooked)); - nni_aio_set_msg(p->aio_sendcooked, NULL); + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); nni_pipe_stop(p->pipe); return; } - // Cooked mode. We completed a cooked send, so we need to - // reinsert ourselves in the ready list, and possibly schedule - // a resend. + // We completed a cooked send, so we need to reinsert ourselves + // in the ready list, and re-run the sendq. nni_mtx_lock(&s->mtx); if (nni_list_active(&s->busypipes, p)) { nni_list_remove(&s->busypipes, p); nni_list_append(&s->readypipes, p); - req0_resend(s); + if (nni_list_empty(&s->sendq)) { + nni_pollable_raise(s->sendable); + } + req0_run_sendq(s, &aios); } else { // We wind up here if stop was called from the reader // side while we were waiting to be scheduled to run for the @@ -377,29 +320,22 @@ req0_sendcooked_cb(void *arg) nni_pipe_stop(p->pipe); } nni_mtx_unlock(&s->mtx); -} -static void -req0_putq_cb(void *arg) -{ - req0_pipe *p = arg; - - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->pipe); - return; + while ((aio = nni_list_first(&aios)) != NULL) { + nni_list_remove(&aios, aio); + nni_aio_finish_synch(aio, 0, 0); } - nni_aio_set_msg(p->aio_putq, NULL); - - nni_pipe_recv(p->pipe, p->aio_recv); } static void req0_recv_cb(void *arg) { req0_pipe *p = arg; + req0_sock *s = p->req; + req0_ctx * ctx; nni_msg * msg; + nni_aio * aio; + uint32_t id; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); @@ -410,22 +346,58 @@ req0_recv_cb(void *arg) nni_aio_set_msg(p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - // We yank 4 bytes of body, and move them to the header. + // We yank 4 bytes from front of body, and move them to the header. if (nni_msg_len(msg) < 4) { // Malformed message. goto malformed; } - if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) { + id = nni_msg_trim_u32(msg); + if (nni_msg_header_append_u32(msg, id) != 0) { // Arguably we could just discard and carry on. But // dropping the connection is probably more helpful since // it lets the other side see that a problem occurred. // Plus it gives us a chance to reclaim some memory. goto malformed; } - (void) nni_msg_trim(msg, 4); // Cannot fail - nni_aio_set_msg(p->aio_putq, msg); - nni_msgq_aio_put(p->req->urq, p->aio_putq); + // Schedule another receive while we are processing this. + nni_mtx_lock(&s->mtx); + nni_pipe_recv(p->pipe, p->aio_recv); + + // Look for a context to receive it. + if ((nni_idhash_find(s->reqids, id, (void **) &ctx) != 0) || + (ctx->saio != NULL) || (ctx->repmsg != NULL)) { + nni_mtx_unlock(&s->mtx); + // No waiting context, we have not sent the request out to + // the wire yet, or context already has a reply ready. + // Discard the message. + nni_msg_free(msg); + return; + } + + // We have our match, so we can remove this. 
+ nni_list_node_remove(&ctx->sqnode); + nni_idhash_remove(s->reqids, id); + ctx->reqid = 0; + if (ctx->reqmsg != NULL) { + nni_msg_free(ctx->reqmsg); + ctx->reqmsg = NULL; + } + + // Is there an aio waiting for us? + if ((aio = ctx->raio) != NULL) { + ctx->raio = NULL; + nni_mtx_unlock(&s->mtx); + nni_aio_set_msg(aio, msg); + nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); + } else { + // No AIO, so stash msg. Receive will pick it up later. + ctx->repmsg = msg; + if (ctx == s->ctx) { + nni_pollable_raise(s->recvable); + } + nni_mtx_unlock(&s->mtx); + } return; malformed: @@ -434,191 +406,417 @@ malformed: } static void -req0_timeout(void *arg) +req0_ctx_timeout(void *arg) { - req0_sock *s = arg; + req0_ctx * ctx = arg; + req0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); - if (s->reqmsg != NULL) { - s->wantw = true; - req0_resend(s); + if ((ctx->reqmsg != NULL) && (!s->closed)) { + if (!nni_list_node_active(&ctx->sqnode)) { + nni_list_append(&s->sendq, ctx); + } + req0_run_sendq(s, NULL); } nni_mtx_unlock(&s->mtx); } -static void -req0_resend(req0_sock *s) +static int +req0_ctx_init(void **cpp, void *sarg) { - req0_pipe *p; - nni_msg * msg; + req0_sock *s = sarg; + req0_ctx * ctx; - // Note: This routine should be called with the socket lock held. - // Also, this should only be called while handling cooked mode - // requests. - if ((msg = s->reqmsg) == NULL) { - return; + if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) { + return (NNG_ENOMEM); } - if (s->closed) { - s->reqmsg = NULL; - nni_msg_free(msg); + nni_timer_init(&ctx->timer, req0_ctx_timeout, ctx); + + nni_mtx_lock(&s->mtx); + ctx->sock = s; + ctx->raio = NULL; + ctx->retry = s->retry; + nni_list_append(&s->ctxs, ctx); + nni_mtx_unlock(&s->mtx); + + *cpp = ctx; + return (0); +} + +static void +req0_ctx_fini(void *arg) +{ + req0_ctx * ctx = arg; + req0_sock *s = ctx->sock; + nni_aio * aio; + + nni_mtx_lock(&s->mtx); + if ((aio = ctx->raio) != NULL) { + ctx->raio = NULL; + nni_aio_finish_error(aio, NNG_ECLOSED); } + if ((aio = ctx->saio) != NULL) { + ctx->saio = NULL; + nni_aio_set_msg(aio, ctx->reqmsg); + ctx->reqmsg = NULL; + nni_aio_finish_error(aio, NNG_ECLOSED); + } + req0_ctx_reset(ctx); + nni_list_remove(&s->ctxs, ctx); + nni_mtx_unlock(&s->mtx); - if (s->wantw) { - s->wantw = false; + nni_timer_cancel(&ctx->timer); + nni_timer_fini(&ctx->timer); - if (nni_msg_dup(&msg, s->reqmsg) != 0) { - // Failed to alloc message, reschedule it. Also, - // mark that we have a message we want to resend, - // in case something comes available. - s->wantw = true; - nni_timer_schedule(&s->timer, nni_clock() + s->retry); - return; - } + NNI_FREE_STRUCT(ctx); +} + +static int +req0_ctx_setopt_resendtime(void *arg, const void *buf, size_t sz, int typ) +{ + req0_ctx *ctx = arg; + return (nni_copyin_ms(&ctx->retry, buf, sz, typ)); +} + +static int +req0_ctx_getopt_resendtime(void *arg, void *buf, size_t *szp, int typ) +{ + req0_ctx *ctx = arg; + return (nni_copyout_ms(ctx->retry, buf, szp, typ)); +} + +static void +req0_run_sendq(req0_sock *s, nni_list *aiolist) +{ + req0_ctx *ctx; + nni_aio * aio; + + // Note: This routine should be called with the socket lock held. + while ((ctx = nni_list_first(&s->sendq)) != NULL) { + nni_msg * msg; + req0_pipe *p; - // Now we iterate across all possible outpipes, until - // one accepts it. if ((p = nni_list_first(&s->readypipes)) == NULL) { - // No pipes ready to process us. Note that we have - // something to send, and schedule it. 
- nni_msg_free(msg); - s->wantw = true; return; } + // We have a place to send it, so do the send. + // If a sending error occurs that causes the message to + // be dropped, we rely on the resend timer to pick it up. + // We also notify the completion callback if this is the + // first send attempt. + nni_list_remove(&s->sendq, ctx); + + // Schedule a resubmit timer. We only do this if we got + // a pipe to send to. Otherwise, we should get handled + // the next time that the sendq is run. + nni_timer_schedule(&ctx->timer, nni_clock() + ctx->retry); + + if (nni_msg_dup(&msg, ctx->reqmsg) != 0) { + // Oops. Well, keep trying each context; maybe + // one of them will get lucky. + continue; + } + + // Put us on the pipe list of active contexts. + // This gives the pipe a chance to kick a resubmit + // if the pipe is removed. + nni_list_node_remove(&ctx->pnode); + nni_list_append(&p->ctxs, ctx); + nni_list_remove(&s->readypipes, p); nni_list_append(&s->busypipes, p); - s->pendpipe = p; - s->resend = nni_clock() + s->retry; - nni_aio_set_msg(p->aio_sendcooked, msg); + if ((aio = ctx->saio) != NULL) { + ctx->saio = NULL; + nni_aio_bump_count(aio, ctx->reqlen); + // If the list was passed in, we want to do a + // synchronous completion later. + if (aiolist != NULL) { + nni_list_append(aiolist, aio); + } else { + nni_aio_finish(aio, 0, 0); + } + if (ctx == s->ctx) { + if (nni_list_empty(&s->readypipes)) { + nni_pollable_clear(s->sendable); + } else { + nni_pollable_raise(s->sendable); + } + } + } + + nni_aio_set_msg(p->aio_send, msg); + nni_pipe_send(p->pipe, p->aio_send); + } +} - // Note that because we were ready rather than busy, we - // should not have any I/O oustanding and hence the aio - // object will be available for our use. - nni_pipe_send(p->pipe, p->aio_sendcooked); - nni_timer_schedule(&s->timer, s->resend); +void +req0_ctx_reset(req0_ctx *ctx) +{ + req0_sock *s = ctx->sock; + // Call with sock lock held! + + // We cannot safely "wait" using nni_timer_cancel, but this removes + // any scheduled timer activation. If the timeout is already running + // concurrently, it will still run. It should do nothing, because + // we toss the reqmsg. There is still a very narrow race if the + // timeout fires, but doesn't actually start running before we + // both finish this function, *and* manage to reschedule another + // request. The consequence of that occurring is that the request + // will be emitted on the wire twice. This is not actually tragic. + nni_timer_schedule(&ctx->timer, NNI_TIME_NEVER); + + nni_list_node_remove(&ctx->pnode); + nni_list_node_remove(&ctx->sqnode); + if (ctx->reqid != 0) { + nni_idhash_remove(s->reqids, ctx->reqid); + ctx->reqid = 0; + } + if (ctx->reqmsg != NULL) { + nni_msg_free(ctx->reqmsg); + ctx->reqmsg = NULL; + } + if (ctx->repmsg != NULL) { + nni_msg_free(ctx->repmsg); + ctx->repmsg = NULL; } } static void -req0_sock_send(void *arg, nni_aio *aio) +req0_ctx_cancel_recv(nni_aio *aio, int rv) { - req0_sock *s = arg; - uint32_t id; - size_t len; - nni_msg * msg; - int rv; + req0_ctx * ctx = nni_aio_get_prov_data(aio); + req0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); + if (ctx->raio != aio) { + // already completed, ignore this. + nni_mtx_unlock(&s->mtx); + return; + } + ctx->raio = NULL; - msg = nni_aio_get_msg(aio); - len = nni_msg_len(msg); + // Cancellation of a pending receive is treated as aborting the + // entire state machine. 
This allows us to preserve the semantic of + // exactly one receive operation per send operation, and should + // be the least surprising for users. The main consequence is that + // if a receive operation is completed (in error or otherwise), the + // user must submit a new send operation to restart the state machine. + req0_ctx_reset(ctx); - // In cooked mode, because we need to manage our own resend logic, - // we bypass the upper writeq entirely. + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&s->mtx); +} - // Generate a new request ID. We always set the high - // order bit so that the peer can locate the end of the - // backtrace. (Pipe IDs have the high order bit clear.) - id = (s->nextid++) | 0x80000000u; - // Request ID is in big endian format. - NNI_PUT32(s->reqid, id); +static void +req0_ctx_recv(void *arg, nni_aio *aio) +{ + req0_ctx * ctx = arg; + req0_sock *s = ctx->sock; + nni_msg * msg; - if ((rv = nni_msg_header_append(msg, s->reqid, 4)) != 0) { + nni_mtx_lock(&s->mtx); + if (nni_aio_start(aio, req0_ctx_cancel_recv, ctx) != 0) { nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, rv); return; } - - // If another message is there, this cancels it. - if (s->reqmsg != NULL) { - nni_msg_free(s->reqmsg); - s->reqmsg = NULL; + if (s->closed) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((ctx->raio != NULL) || + ((ctx->reqmsg == NULL) && (ctx->repmsg == NULL))) { + // We have already got a pending receive or have not + // tried to send a request yet. + // Either of these violate our basic state assumptions. + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; } - nni_aio_set_msg(aio, NULL); - - // Make a duplicate message... for retries. - s->reqmsg = msg; - // Schedule for immediate send - s->resend = NNI_TIME_ZERO; - s->wantw = true; + if ((msg = ctx->repmsg) == NULL) { + ctx->raio = aio; + nni_mtx_unlock(&s->mtx); + return; + } - req0_resend(s); + ctx->repmsg = NULL; + // We have got a message to pass up, yay! + nni_aio_set_msg(aio, msg); + if (ctx == s->ctx) { + nni_pollable_clear(s->recvable); + } nni_mtx_unlock(&s->mtx); - - nni_aio_finish(aio, 0, len); + nni_aio_finish(aio, 0, nni_msg_len(msg)); } static void -req0_sock_send_raw(void *arg, nni_aio *aio) +req0_ctx_cancel_send(nni_aio *aio, int rv) { - req0_sock *s = arg; + req0_ctx * ctx = nni_aio_get_prov_data(aio); + req0_sock *s = ctx->sock; + + nni_mtx_lock(&s->mtx); + if (ctx->saio != aio) { + // already completed, ignore this. + nni_mtx_unlock(&s->mtx); + return; + } - nni_msgq_aio_put(s->uwq, aio); + // There should not be a pending reply, because we canceled + // it while we were waiting. + NNI_ASSERT(ctx->raio == NULL); + ctx->saio = NULL; + // Restore the message back to the aio. + nni_aio_set_msg(aio, ctx->reqmsg); + nni_msg_header_clear(ctx->reqmsg); + ctx->reqmsg = NULL; + + // Cancellation of a pending receive is treated as aborting the + // entire state machine. This allows us to preserve the semantic of + // exactly one receive operation per send operation, and should + // be the least surprising for users. The main consequence is that + // if a receive operation is completed (in error or otherwise), the + // user must submit a new send operation to restart the state machine. 
+ req0_ctx_reset(ctx); + + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&s->mtx); } -static nni_msg * -req0_sock_filter(void *arg, nni_msg *msg) +static void +req0_ctx_send(void *arg, nni_aio *aio) { - req0_sock *s = arg; - nni_msg * rmsg; + req0_ctx * ctx = arg; + req0_sock *s = ctx->sock; + nng_msg * msg = nni_aio_get_msg(aio); + uint64_t id; + int rv; nni_mtx_lock(&s->mtx); - - if (nni_msg_header_len(msg) < 4) { + // Even though we always complete synchronously, this guards against + // restarting a request that was stopped. + if (nni_aio_start(aio, req0_ctx_cancel_send, ctx) != 0) { nni_mtx_unlock(&s->mtx); - nni_msg_free(msg); - return (NULL); + return; + } + // Sending a new requst cancels the old one, including any + // outstanding reply. + if (ctx->raio != NULL) { + nni_aio_finish_error(ctx->raio, NNG_ECANCELED); + ctx->raio = NULL; + } + if (ctx->saio != NULL) { + nni_aio_set_msg(ctx->saio, ctx->reqmsg); + nni_msg_header_clear(ctx->reqmsg); + ctx->reqmsg = NULL; + nni_aio_finish_error(ctx->saio, NNG_ECANCELED); + ctx->saio = NULL; + nni_list_remove(&s->sendq, ctx); } - if ((rmsg = s->reqmsg) == NULL) { - // We had no outstanding request. (Perhaps canceled, - // or duplicate response.) + // This resets the entire state machine. + req0_ctx_reset(ctx); + + // Insert us on the per ID hash list, so that receives can find us. + if ((rv = nni_idhash_alloc(s->reqids, &id, ctx)) != 0) { nni_mtx_unlock(&s->mtx); - nni_msg_free(msg); - return (NULL); + nni_aio_finish_error(aio, rv); + return; } - - if (memcmp(nni_msg_header(msg), s->reqid, 4) != 0) { - // Wrong request id. + ctx->reqid = (uint32_t) id; + if ((rv = nni_msg_header_append_u32(msg, ctx->reqid)) != 0) { + nni_idhash_remove(s->reqids, id); nni_mtx_unlock(&s->mtx); - nni_msg_free(msg); - return (NULL); + nni_aio_finish_error(aio, rv); + return; } + ctx->reqlen = nni_msg_len(msg); + ctx->reqmsg = msg; + ctx->saio = aio; + nni_aio_set_msg(aio, NULL); - s->reqmsg = NULL; - s->pendpipe = NULL; - nni_mtx_unlock(&s->mtx); + // Stick us on the sendq list. 
+ nni_list_append(&s->sendq, ctx); - nni_msg_free(rmsg); + req0_run_sendq(s, NULL); + nni_mtx_unlock(&s->mtx); +} - return (msg); +static void +req0_sock_send(void *arg, nni_aio *aio) +{ + req0_sock *s = arg; + req0_ctx_send(s->ctx, aio); } static void req0_sock_recv(void *arg, nni_aio *aio) { req0_sock *s = arg; + req0_ctx_recv(s->ctx, aio); +} - nni_mtx_lock(&s->mtx); - if (s->reqmsg == NULL) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, NNG_ESTATE); - return; +static int +req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) +{ + req0_sock *s = arg; + return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ)); +} + +static int +req0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ) +{ + req0_sock *s = arg; + return (nni_copyout_int(s->ttl, buf, szp, typ)); +} + +static int +req0_sock_setopt_resendtime(void *arg, const void *buf, size_t sz, int typ) +{ + req0_sock *s = arg; + int rv; + rv = req0_ctx_setopt_resendtime(s->ctx, buf, sz, typ); + s->retry = s->ctx->retry; + return (rv); +} + +static int +req0_sock_getopt_resendtime(void *arg, void *buf, size_t *szp, int typ) +{ + req0_sock *s = arg; + return (req0_ctx_getopt_resendtime(s->ctx, buf, szp, typ)); +} + +static int +req0_sock_getopt_sendfd(void *arg, void *buf, size_t *szp, int typ) +{ + req0_sock *s = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(s->sendable, &fd)) != 0) { + return (rv); } - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_get(s->urq, aio); + return (nni_copyout_int(fd, buf, szp, typ)); } -static void -req0_sock_recv_raw(void *arg, nni_aio *aio) +static int +req0_sock_getopt_recvfd(void *arg, void *buf, size_t *szp, int typ) { req0_sock *s = arg; + int rv; + int fd; - nni_msgq_aio_get(s->urq, aio); + if ((rv = nni_pollable_getfd(s->recvable, &fd)) != 0) { + return (rv); + } + + return (nni_copyout_int(fd, buf, szp, typ)); } static nni_proto_pipe_ops req0_pipe_ops = { @@ -628,6 +826,26 @@ static nni_proto_pipe_ops req0_pipe_ops = { .pipe_stop = req0_pipe_stop, }; +static nni_proto_ctx_option req0_ctx_options[] = { + { + .co_name = NNG_OPT_REQ_RESENDTIME, + .co_type = NNI_TYPE_DURATION, + .co_getopt = req0_ctx_getopt_resendtime, + .co_setopt = req0_ctx_setopt_resendtime, + }, + { + .co_name = NULL, + }, +}; + +static nni_proto_ctx_ops req0_ctx_ops = { + .ctx_init = req0_ctx_init, + .ctx_fini = req0_ctx_fini, + .ctx_recv = req0_ctx_recv, + .ctx_send = req0_ctx_send, + .ctx_options = req0_ctx_options, +}; + static nni_proto_sock_option req0_sock_options[] = { { .pso_name = NNG_OPT_MAXTTL, @@ -641,6 +859,18 @@ static nni_proto_sock_option req0_sock_options[] = { .pso_getopt = req0_sock_getopt_resendtime, .pso_setopt = req0_sock_setopt_resendtime, }, + { + .pso_name = NNG_OPT_RECVFD, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = req0_sock_getopt_recvfd, + .pso_setopt = NULL, + }, + { + .pso_name = NNG_OPT_SENDFD, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = req0_sock_getopt_sendfd, + .pso_setopt = NULL, + }, // terminate list { .pso_name = NULL, @@ -653,7 +883,6 @@ static nni_proto_sock_ops req0_sock_ops = { .sock_open = req0_sock_open, .sock_close = req0_sock_close, .sock_options = req0_sock_options, - .sock_filter = req0_sock_filter, .sock_send = req0_sock_send, .sock_recv = req0_sock_recv, }; @@ -662,9 +891,10 @@ static nni_proto req0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_REQ_V0, "req" }, .proto_peer = { NNI_PROTO_REP_V0, "rep" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ, 
.proto_sock_ops = &req0_sock_ops, .proto_pipe_ops = &req0_pipe_ops, + .proto_ctx_ops = &req0_ctx_ops, }; int @@ -672,28 +902,3 @@ nng_req0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &req0_proto)); } - -static nni_proto_sock_ops req0_sock_ops_raw = { - .sock_init = req0_sock_init_raw, - .sock_fini = req0_sock_fini, - .sock_open = req0_sock_open, - .sock_close = req0_sock_close, - .sock_options = req0_sock_options, - .sock_send = req0_sock_send_raw, - .sock_recv = req0_sock_recv_raw, -}; - -static nni_proto req0_proto_raw = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REQ_V0, "req" }, - .proto_peer = { NNI_PROTO_REP_V0, "rep" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, - .proto_sock_ops = &req0_sock_ops_raw, - .proto_pipe_ops = &req0_pipe_ops, -}; - -int -nng_req0_open_raw(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &req0_proto_raw)); -}
\ No newline at end of file
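
A small aside on the request-ID scheme introduced above: IDs are drawn
from the range 0x80000000-0xffffffff starting at a random point, so the
high-order bit is always set; since pipe IDs keep that bit clear, a peer
can tell where the backtrace header ends. A hypothetical, self-contained
sketch of that allocation rule (names here are illustrative, not nng
internals):

```c
#include <stdint.h>

// Illustration only: 32-bit request IDs with the high-order bit always
// set, starting from a random point and wrapping within
// [0x80000000, 0xffffffff].
static uint32_t next_reqid;

static void
reqid_seed(uint32_t random_value)
{
	next_reqid = random_value | 0x80000000u;
}

static uint32_t
reqid_next(void)
{
	uint32_t id = next_reqid++;
	if (next_reqid < 0x80000000u) { // wrapped past 0xffffffff
		next_reqid = 0x80000000u;
	}
	return (id);
}
```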
