diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-31 17:59:01 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-09-22 11:47:07 -0700 |
| commit | d72076207a2fad96ff014a81366868fb47a0ed1b (patch) | |
| tree | 5a4f67ab607ef6690e983c2d1ab2c64062027e52 /src/protocol/reqrep/req.c | |
| parent | 366f3e5d14c5f891655ad1fa2b3cfa9a56b8830d (diff) | |
| download | nng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.gz nng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.bz2 nng-d72076207a2fad96ff014a81366868fb47a0ed1b.zip | |
Allocate AIOs dynamically.
We allocate AIO structures dynamically, so that we can use them
abstractly in more places without inlining them. This will be used
for the ZeroTier transport to allow us to create operations consisting
of just the AIO. Furthermore, we provide accessors for some of the
aio members, in the hopes that we will be able to wrap these for
"safe" version of the AIO capability to export to applications, and
to protocol and transport implementors.
While here we cleaned up the protocol details to use consistently
shorter names (no nni_ prefix for static symbols needed), and we
also fixed a bug in the surveyor code.
Diffstat (limited to 'src/protocol/reqrep/req.c')
| -rw-r--r-- | src/protocol/reqrep/req.c | 522 |
1 files changed, 262 insertions, 260 deletions
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 2579417a..5da5003f 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -18,15 +18,15 @@ // request-reply pair. This is useful for building RPC clients, for // example. -typedef struct nni_req_pipe nni_req_pipe; -typedef struct nni_req_sock nni_req_sock; +typedef struct req_pipe req_pipe; +typedef struct req_sock req_sock; -static void nni_req_resend(nni_req_sock *); -static void nni_req_timeout(void *); -static void nni_req_pipe_fini(void *); +static void req_resend(req_sock *); +static void req_timeout(void *); +static void req_pipe_fini(void *); -// An nni_req_sock is our per-socket protocol private structure. -struct nni_req_sock { +// A req_sock is our per-socket protocol private structure. +struct req_sock { nni_sock * sock; nni_msgq * uwq; nni_msgq * urq; @@ -38,7 +38,7 @@ struct nni_req_sock { int ttl; nni_msg * reqmsg; - nni_req_pipe *pendpipe; + req_pipe *pendpipe; nni_list readypipes; nni_list busypipes; @@ -51,230 +51,235 @@ struct nni_req_sock { nni_cv cv; }; -// An nni_req_pipe is our per-pipe protocol private structure. -struct nni_req_pipe { +// A req_pipe is our per-pipe protocol private structure. +struct req_pipe { nni_pipe * pipe; - nni_req_sock *req; + req_sock * req; nni_list_node node; - nni_aio aio_getq; // raw mode only - nni_aio aio_sendraw; // raw mode only - nni_aio aio_sendcooked; // cooked mode only - nni_aio aio_recv; - nni_aio aio_putq; + nni_aio * aio_getq; // raw mode only + nni_aio * aio_sendraw; // raw mode only + nni_aio * aio_sendcooked; // cooked mode only + nni_aio * aio_recv; + nni_aio * aio_putq; nni_mtx mtx; }; -static void nni_req_resender(void *); -static void nni_req_getq_cb(void *); -static void nni_req_sendraw_cb(void *); -static void nni_req_sendcooked_cb(void *); -static void nni_req_recv_cb(void *); -static void nni_req_putq_cb(void *); +static void req_resender(void *); +static void req_getq_cb(void *); +static void req_sendraw_cb(void *); +static void req_sendcooked_cb(void *); +static void req_recv_cb(void *); +static void req_putq_cb(void *); static int -nni_req_sock_init(void **reqp, nni_sock *sock) +req_sock_init(void **sp, nni_sock *sock) { - nni_req_sock *req; + req_sock *s; - if ((req = NNI_ALLOC_STRUCT(req)) == NULL) { + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - nni_mtx_init(&req->mtx); - nni_cv_init(&req->cv, &req->mtx); + nni_mtx_init(&s->mtx); + nni_cv_init(&s->cv, &s->mtx); - NNI_LIST_INIT(&req->readypipes, nni_req_pipe, node); - NNI_LIST_INIT(&req->busypipes, nni_req_pipe, node); - nni_timer_init(&req->timer, nni_req_timeout, req); + NNI_LIST_INIT(&s->readypipes, req_pipe, node); + NNI_LIST_INIT(&s->busypipes, req_pipe, node); + nni_timer_init(&s->timer, req_timeout, s); // this is "semi random" start for request IDs. - req->nextid = nni_random(); - - req->retry = NNI_SECOND * 60; - req->sock = sock; - req->reqmsg = NULL; - req->raw = 0; - req->wantw = 0; - req->resend = NNI_TIME_ZERO; - req->ttl = 8; - - req->uwq = nni_sock_sendq(sock); - req->urq = nni_sock_recvq(sock); - *reqp = req; + s->nextid = nni_random(); + s->retry = NNI_SECOND * 60; + s->sock = sock; + s->reqmsg = NULL; + s->raw = 0; + s->wantw = 0; + s->resend = NNI_TIME_ZERO; + s->ttl = 8; + s->uwq = nni_sock_sendq(sock); + s->urq = nni_sock_recvq(sock); + *sp = s; nni_sock_recverr(sock, NNG_ESTATE); return (0); } static void -nni_req_sock_open(void *arg) +req_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void -nni_req_sock_close(void *arg) +req_sock_close(void *arg) { - nni_req_sock *req = arg; + req_sock *s = arg; - nni_mtx_lock(&req->mtx); - req->closed = 1; - nni_mtx_unlock(&req->mtx); + nni_mtx_lock(&s->mtx); + s->closed = 1; + nni_mtx_unlock(&s->mtx); - nni_timer_cancel(&req->timer); + nni_timer_cancel(&s->timer); } static void -nni_req_sock_fini(void *arg) +req_sock_fini(void *arg) { - nni_req_sock *req = arg; + req_sock *s = arg; - nni_mtx_lock(&req->mtx); - while ((!nni_list_empty(&req->readypipes)) || - (!nni_list_empty(&req->busypipes))) { - nni_cv_wait(&req->cv); + nni_mtx_lock(&s->mtx); + while ((!nni_list_empty(&s->readypipes)) || + (!nni_list_empty(&s->busypipes))) { + nni_cv_wait(&s->cv); } - if (req->reqmsg != NULL) { - nni_msg_free(req->reqmsg); + if (s->reqmsg != NULL) { + nni_msg_free(s->reqmsg); } - nni_mtx_unlock(&req->mtx); - nni_cv_fini(&req->cv); - nni_mtx_fini(&req->mtx); - NNI_FREE_STRUCT(req); + nni_mtx_unlock(&s->mtx); + nni_cv_fini(&s->cv); + nni_mtx_fini(&s->mtx); + NNI_FREE_STRUCT(s); +} + +static void +req_pipe_fini(void *arg) +{ + req_pipe *p = arg; + + nni_aio_fini(p->aio_getq); + nni_aio_fini(p->aio_putq); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_sendcooked); + nni_aio_fini(p->aio_sendraw); + nni_mtx_fini(&p->mtx); + NNI_FREE_STRUCT(p); } static int -nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) +req_pipe_init(void **pp, nni_pipe *pipe, void *s) { - nni_req_pipe *rp; + req_pipe *p; + int rv; - if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - nni_mtx_init(&rp->mtx); - nni_aio_init(&rp->aio_getq, nni_req_getq_cb, rp); - nni_aio_init(&rp->aio_putq, nni_req_putq_cb, rp); - nni_aio_init(&rp->aio_recv, nni_req_recv_cb, rp); - nni_aio_init(&rp->aio_sendraw, nni_req_sendraw_cb, rp); - nni_aio_init(&rp->aio_sendcooked, nni_req_sendcooked_cb, rp); - - NNI_LIST_NODE_INIT(&rp->node); - rp->pipe = pipe; - rp->req = rsock; - *rpp = rp; - return (0); -} + nni_mtx_init(&p->mtx); + if (((rv = nni_aio_init(&p->aio_getq, req_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, req_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, req_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_sendraw, req_sendraw_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_sendcooked, req_sendcooked_cb, p)) != + 0)) { + req_pipe_fini(p); + return (rv); + } -static void -nni_req_pipe_fini(void *arg) -{ - nni_req_pipe *rp = arg; - - nni_aio_fini(&rp->aio_getq); - nni_aio_fini(&rp->aio_putq); - nni_aio_fini(&rp->aio_recv); - nni_aio_fini(&rp->aio_sendcooked); - nni_aio_fini(&rp->aio_sendraw); - nni_mtx_fini(&rp->mtx); - NNI_FREE_STRUCT(rp); + NNI_LIST_NODE_INIT(&p->node); + p->pipe = pipe; + p->req = s; + *pp = p; + return (0); } static int -nni_req_pipe_start(void *arg) +req_pipe_start(void *arg) { - nni_req_pipe *rp = arg; - nni_req_sock *req = rp->req; + req_pipe *p = arg; + req_sock *s = p->req; - if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) { + if (nni_pipe_peer(p->pipe) != NNG_PROTO_REP) { return (NNG_EPROTO); } - nni_mtx_lock(&req->mtx); - if (req->closed) { - nni_mtx_unlock(&req->mtx); + nni_mtx_lock(&s->mtx); + if (s->closed) { + nni_mtx_unlock(&s->mtx); return (NNG_ECLOSED); } - nni_list_append(&req->readypipes, rp); - if (req->wantw) { - nni_req_resend(req); + nni_list_append(&s->readypipes, p); + // If sock was waiting for somewhere to send data, go ahead and + // send it to this pipe. + if (s->wantw) { + req_resend(s); } - nni_mtx_unlock(&req->mtx); + nni_mtx_unlock(&s->mtx); - nni_msgq_aio_get(req->uwq, &rp->aio_getq); - nni_pipe_recv(rp->pipe, &rp->aio_recv); + nni_msgq_aio_get(s->uwq, p->aio_getq); + nni_pipe_recv(p->pipe, p->aio_recv); return (0); } static void -nni_req_pipe_stop(void *arg) +req_pipe_stop(void *arg) { - nni_req_pipe *rp = arg; - nni_req_sock *req = rp->req; + req_pipe *p = arg; + req_sock *s = p->req; - nni_aio_stop(&rp->aio_getq); - nni_aio_stop(&rp->aio_putq); - nni_aio_stop(&rp->aio_recv); - nni_aio_stop(&rp->aio_sendcooked); - nni_aio_stop(&rp->aio_sendraw); + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_putq); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_sendcooked); + nni_aio_stop(p->aio_sendraw); // At this point there should not be any further AIOs running. // Further, any completion tasks have completed. - nni_mtx_lock(&req->mtx); + nni_mtx_lock(&s->mtx); // This removes the node from either busypipes or readypipes. // It doesn't much matter which. - if (nni_list_node_active(&rp->node)) { - nni_list_node_remove(&rp->node); - if (req->closed) { - nni_cv_wake(&req->cv); + if (nni_list_node_active(&p->node)) { + nni_list_node_remove(&p->node); + if (s->closed) { + nni_cv_wake(&s->cv); } } - if ((rp == req->pendpipe) && (req->reqmsg != NULL)) { + if ((p == s->pendpipe) && (s->reqmsg != NULL)) { // removing the pipe we sent the last request on... // schedule immediate resend. - req->pendpipe = NULL; - req->resend = NNI_TIME_ZERO; - req->wantw = 1; - nni_req_resend(req); + s->pendpipe = NULL; + s->resend = NNI_TIME_ZERO; + s->wantw = 1; + req_resend(s); } - nni_mtx_unlock(&req->mtx); + nni_mtx_unlock(&s->mtx); } static int -nni_req_sock_setopt(void *arg, int opt, const void *buf, size_t sz) +req_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { - nni_req_sock *req = arg; - int rv = NNG_ENOTSUP; + req_sock *s = arg; + int rv = NNG_ENOTSUP; if (opt == nng_optid_req_resendtime) { - rv = nni_setopt_usec(&req->retry, buf, sz); + rv = nni_setopt_usec(&s->retry, buf, sz); } else if (opt == nng_optid_raw) { - rv = nni_setopt_int(&req->raw, buf, sz, 0, 1); + rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); if (rv == 0) { - nni_sock_recverr(req->sock, req->raw ? 0 : NNG_ESTATE); + nni_sock_recverr(s->sock, s->raw ? 0 : NNG_ESTATE); } } else if (opt == nng_optid_maxttl) { - rv = nni_setopt_int(&req->ttl, buf, sz, 1, 255); + rv = nni_setopt_int(&s->ttl, buf, sz, 1, 255); } return (rv); } static int -nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp) +req_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { - nni_req_sock *req = arg; - int rv = NNG_ENOTSUP; + req_sock *s = arg; + int rv = NNG_ENOTSUP; if (opt == nng_optid_req_resendtime) { - rv = nni_getopt_usec(&req->retry, buf, szp); + rv = nni_getopt_usec(&s->retry, buf, szp); } else if (opt == nng_optid_raw) { - rv = nni_getopt_int(&req->raw, buf, szp); + rv = nni_getopt_int(&s->raw, buf, szp); } else if (opt == nng_optid_maxttl) { - rv = nni_getopt_int(&req->ttl, buf, szp); + rv = nni_getopt_int(&s->ttl, buf, szp); } return (rv); @@ -297,10 +302,10 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp) // kind of priority.) static void -nni_req_getq_cb(void *arg) +req_getq_cb(void *arg) { - nni_req_pipe *rp = arg; - nni_req_sock *req = rp->req; + req_pipe *p = arg; + req_sock *s = p->req; // We should be in RAW mode. Cooked mode traffic bypasses // the upper write queue entirely, and should never end up here. @@ -308,47 +313,47 @@ nni_req_getq_cb(void *arg) // that's ok (there's an inherent race anyway). (One minor // exception: we wind up here in error state when the uwq is closed.) - if (nni_aio_result(&rp->aio_getq) != 0) { - nni_pipe_stop(rp->pipe); + if (nni_aio_result(p->aio_getq) != 0) { + nni_pipe_stop(p->pipe); return; } - rp->aio_sendraw.a_msg = rp->aio_getq.a_msg; - rp->aio_getq.a_msg = NULL; + nni_aio_set_msg(p->aio_sendraw, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); // Send the message, but use the raw mode aio. - nni_pipe_send(rp->pipe, &rp->aio_sendraw); + nni_pipe_send(p->pipe, p->aio_sendraw); } static void -nni_req_sendraw_cb(void *arg) +req_sendraw_cb(void *arg) { - nni_req_pipe *rp = arg; + req_pipe *p = arg; - if (nni_aio_result(&rp->aio_sendraw) != 0) { - nni_msg_free(rp->aio_sendraw.a_msg); - rp->aio_sendraw.a_msg = NULL; - nni_pipe_stop(rp->pipe); + if (nni_aio_result(p->aio_sendraw) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_sendraw)); + nni_aio_set_msg(p->aio_sendraw, NULL); + nni_pipe_stop(p->pipe); return; } // Sent a message so we just need to look for another one. - nni_msgq_aio_get(rp->req->uwq, &rp->aio_getq); + nni_msgq_aio_get(p->req->uwq, p->aio_getq); } static void -nni_req_sendcooked_cb(void *arg) +req_sendcooked_cb(void *arg) { - nni_req_pipe *rp = arg; - nni_req_sock *req = rp->req; + req_pipe *p = arg; + req_sock *s = p->req; - if (nni_aio_result(&rp->aio_sendcooked) != 0) { + if (nni_aio_result(p->aio_sendcooked) != 0) { // We failed to send... clean up and deal with it. // We leave ourselves on the busy list for now, which // means no new asynchronous traffic can occur here. - nni_msg_free(rp->aio_sendcooked.a_msg); - rp->aio_sendcooked.a_msg = NULL; - nni_pipe_stop(rp->pipe); + nni_msg_free(nni_aio_get_msg(p->aio_sendcooked)); + nni_aio_set_msg(p->aio_sendcooked, NULL); + nni_pipe_stop(p->pipe); return; } @@ -356,49 +361,50 @@ nni_req_sendcooked_cb(void *arg) // reinsert ourselves in the ready list, and possibly schedule // a resend. - nni_mtx_lock(&req->mtx); - if (nni_list_active(&req->busypipes, rp)) { - nni_list_remove(&req->busypipes, rp); - nni_list_append(&req->readypipes, rp); - nni_req_resend(req); + nni_mtx_lock(&s->mtx); + if (nni_list_active(&s->busypipes, p)) { + nni_list_remove(&s->busypipes, p); + nni_list_append(&s->readypipes, p); + req_resend(s); } else { // We wind up here if stop was called from the reader // side while we were waiting to be scheduled to run for the // writer side. In this case we can't complete the operation, // and we have to abort. - nni_pipe_stop(rp->pipe); + nni_pipe_stop(p->pipe); } - nni_mtx_unlock(&req->mtx); + nni_mtx_unlock(&s->mtx); } static void -nni_req_putq_cb(void *arg) +req_putq_cb(void *arg) { - nni_req_pipe *rp = arg; + req_pipe *p = arg; - if (nni_aio_result(&rp->aio_putq) != 0) { - nni_msg_free(rp->aio_putq.a_msg); - nni_pipe_stop(rp->pipe); + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->pipe); return; } - rp->aio_putq.a_msg = NULL; + nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_recv(rp->pipe, &rp->aio_recv); + nni_pipe_recv(p->pipe, p->aio_recv); } static void -nni_req_recv_cb(void *arg) +req_recv_cb(void *arg) { - nni_req_pipe *rp = arg; - nni_msg * msg; + req_pipe *p = arg; + nni_msg * msg; - if (nni_aio_result(&rp->aio_recv) != 0) { - nni_pipe_stop(rp->pipe); + if (nni_aio_result(p->aio_recv) != 0) { + nni_pipe_stop(p->pipe); return; } - msg = rp->aio_recv.a_msg; - rp->aio_recv.a_msg = NULL; + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); // We yank 4 bytes of body, and move them to the header. if (nni_msg_len(msg) < 4) { @@ -414,92 +420,90 @@ nni_req_recv_cb(void *arg) } (void) nni_msg_trim(msg, 4); // Cannot fail - rp->aio_putq.a_msg = msg; - nni_msgq_aio_put(rp->req->urq, &rp->aio_putq); + nni_aio_set_msg(p->aio_putq, msg); + nni_msgq_aio_put(p->req->urq, p->aio_putq); return; malformed: nni_msg_free(msg); - nni_pipe_stop(rp->pipe); + nni_pipe_stop(p->pipe); } static void -nni_req_timeout(void *arg) +req_timeout(void *arg) { - nni_req_sock *req = arg; + req_sock *s = arg; - nni_mtx_lock(&req->mtx); - if (req->reqmsg != NULL) { - req->wantw = 1; - nni_req_resend(req); + nni_mtx_lock(&s->mtx); + if (s->reqmsg != NULL) { + s->wantw = 1; + req_resend(s); } - nni_mtx_unlock(&req->mtx); + nni_mtx_unlock(&s->mtx); } static void -nni_req_resend(nni_req_sock *req) +req_resend(req_sock *s) { - nni_req_pipe *rp; - nni_msg * msg; + req_pipe *p; + nni_msg * msg; // Note: This routine should be called with the socket lock held. // Also, this should only be called while handling cooked mode // requests. - if ((msg = req->reqmsg) == NULL) { + if ((msg = s->reqmsg) == NULL) { return; } - if (req->closed) { - req->reqmsg = NULL; + if (s->closed) { + s->reqmsg = NULL; nni_msg_free(msg); } - if (req->wantw) { - req->wantw = 0; + if (s->wantw) { + s->wantw = 0; - if (nni_msg_dup(&msg, req->reqmsg) != 0) { + if (nni_msg_dup(&msg, s->reqmsg) != 0) { // Failed to alloc message, reschedule it. Also, // mark that we have a message we want to resend, // in case something comes available. - req->wantw = 1; - nni_timer_schedule( - &req->timer, nni_clock() + req->retry); + s->wantw = 1; + nni_timer_schedule(&s->timer, nni_clock() + s->retry); return; } // Now we iterate across all possible outpipes, until // one accepts it. - rp = nni_list_first(&req->readypipes); - if (rp == NULL) { + if ((p = nni_list_first(&s->readypipes)) == NULL) { // No pipes ready to process us. Note that we have // something to send, and schedule it. nni_msg_free(msg); - req->wantw = 1; + s->wantw = 1; return; } - nni_list_remove(&req->readypipes, rp); - nni_list_append(&req->busypipes, rp); + nni_list_remove(&s->readypipes, p); + nni_list_append(&s->busypipes, p); - req->pendpipe = rp; - req->resend = nni_clock() + req->retry; - rp->aio_sendcooked.a_msg = msg; + s->pendpipe = p; + s->resend = nni_clock() + s->retry; + nni_aio_set_msg(p->aio_sendcooked, msg); // Note that because we were ready rather than busy, we // should not have any I/O oustanding and hence the aio // object will be available for our use. - nni_pipe_send(rp->pipe, &rp->aio_sendcooked); - nni_timer_schedule(&req->timer, req->resend); + nni_pipe_send(p->pipe, p->aio_sendcooked); + nni_timer_schedule(&s->timer, s->resend); } } static nni_msg * -nni_req_sock_sfilter(void *arg, nni_msg *msg) +req_sock_sfilter(void *arg, nni_msg *msg) { - nni_req_sock *req = arg; - uint32_t id; + req_sock *s = arg; + uint32_t id; - if (req->raw) { + if (s->raw) { // No automatic retry, and the request ID must // be in the header coming down. return (msg); @@ -508,12 +512,12 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg) // Generate a new request ID. We always set the high // order bit so that the peer can locate the end of the // backtrace. (Pipe IDs have the high order bit clear.) - id = (req->nextid++) | 0x80000000u; + id = (s->nextid++) | 0x80000000u; // Request ID is in big endian format. - NNI_PUT32(req->reqid, id); + NNI_PUT32(s->reqid, id); - if (nni_msg_header_append(msg, req->reqid, 4) != 0) { + if (nni_msg_header_append(msg, s->reqid, 4) != 0) { // Should be ENOMEM. nni_msg_free(msg); return (NULL); @@ -521,36 +525,36 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg) // NB: The socket lock is also held, so this is always self-serialized. // But we have to serialize against other async callbacks. - nni_mtx_lock(&req->mtx); + nni_mtx_lock(&s->mtx); // If another message is there, this cancels it. - if (req->reqmsg != NULL) { - nni_msg_free(req->reqmsg); - req->reqmsg = NULL; + if (s->reqmsg != NULL) { + nni_msg_free(s->reqmsg); + s->reqmsg = NULL; } // Make a duplicate message... for retries. - req->reqmsg = msg; + s->reqmsg = msg; // Schedule for immediate send - req->resend = NNI_TIME_ZERO; - req->wantw = 1; + s->resend = NNI_TIME_ZERO; + s->wantw = 1; - nni_req_resend(req); - nni_mtx_unlock(&req->mtx); + req_resend(s); + nni_mtx_unlock(&s->mtx); // Clear the error condition. - nni_sock_recverr(req->sock, 0); + nni_sock_recverr(s->sock, 0); return (NULL); } static nni_msg * -nni_req_sock_rfilter(void *arg, nni_msg *msg) +req_sock_rfilter(void *arg, nni_msg *msg) { - nni_req_sock *req = arg; - nni_msg * rmsg; + req_sock *s = arg; + nni_msg * rmsg; - if (req->raw) { + if (s->raw) { // Pass it unmolested return (msg); } @@ -560,62 +564,60 @@ nni_req_sock_rfilter(void *arg, nni_msg *msg) return (NULL); } - nni_mtx_lock(&req->mtx); + nni_mtx_lock(&s->mtx); - if ((rmsg = req->reqmsg) == NULL) { + if ((rmsg = s->reqmsg) == NULL) { // We had no outstanding request. - nni_mtx_unlock(&req->mtx); + nni_mtx_unlock(&s->mtx); nni_msg_free(msg); return (NULL); } - if (memcmp(nni_msg_header(msg), req->reqid, 4) != 0) { + if (memcmp(nni_msg_header(msg), s->reqid, 4) != 0) { // Wrong request id - nni_mtx_unlock(&req->mtx); + nni_mtx_unlock(&s->mtx); nni_msg_free(msg); return (NULL); } - req->reqmsg = NULL; - req->pendpipe = NULL; - nni_mtx_unlock(&req->mtx); + s->reqmsg = NULL; + s->pendpipe = NULL; + nni_mtx_unlock(&s->mtx); - nni_sock_recverr(req->sock, NNG_ESTATE); + nni_sock_recverr(s->sock, NNG_ESTATE); nni_msg_free(rmsg); return (msg); } -// This is the global protocol structure -- our linkage to the core. -// This should be the only global non-static symbol in this file. -static nni_proto_pipe_ops nni_req_pipe_ops = { - .pipe_init = nni_req_pipe_init, - .pipe_fini = nni_req_pipe_fini, - .pipe_start = nni_req_pipe_start, - .pipe_stop = nni_req_pipe_stop, +static nni_proto_pipe_ops req_pipe_ops = { + .pipe_init = req_pipe_init, + .pipe_fini = req_pipe_fini, + .pipe_start = req_pipe_start, + .pipe_stop = req_pipe_stop, }; -static nni_proto_sock_ops nni_req_sock_ops = { - .sock_init = nni_req_sock_init, - .sock_fini = nni_req_sock_fini, - .sock_open = nni_req_sock_open, - .sock_close = nni_req_sock_close, - .sock_setopt = nni_req_sock_setopt, - .sock_getopt = nni_req_sock_getopt, - .sock_rfilter = nni_req_sock_rfilter, - .sock_sfilter = nni_req_sock_sfilter, +static nni_proto_sock_ops req_sock_ops = { + .sock_init = req_sock_init, + .sock_fini = req_sock_fini, + .sock_open = req_sock_open, + .sock_close = req_sock_close, + .sock_setopt = req_sock_setopt, + .sock_getopt = req_sock_getopt, + .sock_rfilter = req_sock_rfilter, + .sock_sfilter = req_sock_sfilter, }; -nni_proto nni_req_proto = { +static nni_proto req_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNG_PROTO_REQ_V0, "req" }, .proto_peer = { NNG_PROTO_REP_V0, "rep" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &nni_req_sock_ops, - .proto_pipe_ops = &nni_req_pipe_ops, + .proto_sock_ops = &req_sock_ops, + .proto_pipe_ops = &req_pipe_ops, }; int nng_req0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &nni_req_proto)); + return (nni_proto_open(sidp, &req_proto)); } |
