diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-31 17:59:01 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-09-22 11:47:07 -0700 |
| commit | d72076207a2fad96ff014a81366868fb47a0ed1b (patch) | |
| tree | 5a4f67ab607ef6690e983c2d1ab2c64062027e52 /src/protocol/survey/survey.c | |
| parent | 366f3e5d14c5f891655ad1fa2b3cfa9a56b8830d (diff) | |
| download | nng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.gz nng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.bz2 nng-d72076207a2fad96ff014a81366868fb47a0ed1b.zip | |
Allocate AIOs dynamically.
We allocate AIO structures dynamically, so that we can use them
abstractly in more places without inlining them. This will be used
for the ZeroTier transport to allow us to create operations consisting
of just the AIO. Furthermore, we provide accessors for some of the
aio members, in the hopes that we will be able to wrap these for
"safe" version of the AIO capability to export to applications, and
to protocol and transport implementors.
While here we cleaned up the protocol details to use consistently
shorter names (no nni_ prefix for static symbols needed), and we
also fixed a bug in the surveyor code.
Diffstat (limited to 'src/protocol/survey/survey.c')
| -rw-r--r-- | src/protocol/survey/survey.c | 393 |
1 files changed, 198 insertions, 195 deletions
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index d7341025..09fe0768 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -16,18 +16,18 @@ // Surveyor protocol. The SURVEYOR protocol is the "survey" side of the // survey pattern. This is useful for building service discovery, voting, etc. -typedef struct nni_surv_pipe nni_surv_pipe; -typedef struct nni_surv_sock nni_surv_sock; - -static void nni_surv_sock_getq_cb(void *); -static void nni_surv_getq_cb(void *); -static void nni_surv_putq_cb(void *); -static void nni_surv_send_cb(void *); -static void nni_surv_recv_cb(void *); -static void nni_surv_timeout(void *); - -// An nni_surv_sock is our per-socket protocol private structure. -struct nni_surv_sock { +typedef struct surv_pipe surv_pipe; +typedef struct surv_sock surv_sock; + +static void surv_sock_getq_cb(void *); +static void surv_getq_cb(void *); +static void surv_putq_cb(void *); +static void surv_send_cb(void *); +static void surv_recv_cb(void *); +static void surv_timeout(void *); + +// A surv_sock is our per-socket protocol private structure. +struct surv_sock { nni_sock * nsock; nni_duration survtime; nni_time expire; @@ -36,211 +36,214 @@ struct nni_surv_sock { uint32_t nextid; // next id uint32_t survid; // outstanding request ID (big endian) nni_list pipes; - nni_aio aio_getq; + nni_aio * aio_getq; nni_timer_node timer; nni_msgq * uwq; nni_msgq * urq; nni_mtx mtx; }; -// An nni_surv_pipe is our per-pipe protocol private structure. -struct nni_surv_pipe { - nni_pipe * npipe; - nni_surv_sock *psock; - nni_msgq * sendq; - nni_list_node node; - nni_aio aio_getq; - nni_aio aio_putq; - nni_aio aio_send; - nni_aio aio_recv; +// A surv_pipe is our per-pipe protocol private structure. +struct surv_pipe { + nni_pipe * npipe; + surv_sock * psock; + nni_msgq * sendq; + nni_list_node node; + nni_aio * aio_getq; + nni_aio * aio_putq; + nni_aio * aio_send; + nni_aio * aio_recv; }; static void -nni_surv_sock_fini(void *arg) +surv_sock_fini(void *arg) { - nni_surv_sock *psock = arg; + surv_sock *s = arg; - nni_aio_stop(&psock->aio_getq); - nni_aio_fini(&psock->aio_getq); - nni_mtx_fini(&psock->mtx); - NNI_FREE_STRUCT(psock); + nni_aio_stop(s->aio_getq); + nni_aio_fini(s->aio_getq); + nni_mtx_fini(&s->mtx); + NNI_FREE_STRUCT(s); } static int -nni_surv_sock_init(void **sp, nni_sock *nsock) +surv_sock_init(void **sp, nni_sock *nsock) { - nni_surv_sock *psock; + surv_sock *s; + int rv; - if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - NNI_LIST_INIT(&psock->pipes, nni_surv_pipe, node); - nni_mtx_init(&psock->mtx); - nni_aio_init(&psock->aio_getq, nni_surv_sock_getq_cb, psock); - nni_timer_init(&psock->timer, nni_surv_timeout, psock); - - psock->nextid = nni_random(); - psock->nsock = nsock; - psock->raw = 0; - psock->survtime = NNI_SECOND * 60; - psock->expire = NNI_TIME_ZERO; - psock->uwq = nni_sock_sendq(nsock); - psock->urq = nni_sock_recvq(nsock); - - *sp = psock; + if ((rv = nni_aio_init(&s->aio_getq, surv_sock_getq_cb, s)) != 0) { + surv_sock_fini(s); + return (rv); + } + NNI_LIST_INIT(&s->pipes, surv_pipe, node); + nni_mtx_init(&s->mtx); + nni_timer_init(&s->timer, surv_timeout, s); + + s->nextid = nni_random(); + s->nsock = nsock; + s->raw = 0; + s->survtime = NNI_SECOND * 60; + s->expire = NNI_TIME_ZERO; + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); + + *sp = s; nni_sock_recverr(nsock, NNG_ESTATE); return (0); } static void -nni_surv_sock_open(void *arg) +surv_sock_open(void *arg) { - nni_surv_sock *psock = arg; + surv_sock *s = arg; - nni_msgq_aio_get(psock->uwq, &psock->aio_getq); + nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -nni_surv_sock_close(void *arg) +surv_sock_close(void *arg) { - nni_surv_sock *psock = arg; + surv_sock *s = arg; - nni_timer_cancel(&psock->timer); - nni_aio_cancel(&psock->aio_getq, NNG_ECLOSED); + nni_timer_cancel(&s->timer); + nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -nni_surv_pipe_fini(void *arg) +surv_pipe_fini(void *arg) { - nni_surv_pipe *ppipe = arg; - - nni_aio_fini(&ppipe->aio_getq); - nni_aio_fini(&ppipe->aio_send); - nni_aio_fini(&ppipe->aio_recv); - nni_aio_fini(&ppipe->aio_putq); - nni_msgq_fini(ppipe->sendq); - NNI_FREE_STRUCT(ppipe); + surv_pipe *p = arg; + + nni_aio_fini(p->aio_getq); + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_putq); + nni_msgq_fini(p->sendq); + NNI_FREE_STRUCT(p); } static int -nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock) +surv_pipe_init(void **pp, nni_pipe *npipe, void *s) { - nni_surv_pipe *ppipe; - int rv; + surv_pipe *p; + int rv; - if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) { + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } // This depth could be tunable. - if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) { - NNI_FREE_STRUCT(ppipe); + if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, surv_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, surv_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, surv_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, surv_recv_cb, p)) != 0)) { + surv_pipe_fini(p); return (rv); } - nni_aio_init(&ppipe->aio_getq, nni_surv_getq_cb, ppipe); - nni_aio_init(&ppipe->aio_putq, nni_surv_putq_cb, ppipe); - nni_aio_init(&ppipe->aio_send, nni_surv_send_cb, ppipe); - nni_aio_init(&ppipe->aio_recv, nni_surv_recv_cb, ppipe); - - ppipe->npipe = npipe; - ppipe->psock = psock; - *pp = ppipe; + p->npipe = npipe; + p->psock = s; + *pp = p; return (0); } static int -nni_surv_pipe_start(void *arg) +surv_pipe_start(void *arg) { - nni_surv_pipe *ppipe = arg; - nni_surv_sock *psock = ppipe->psock; + surv_pipe *p = arg; + surv_sock *s = p->psock; - nni_mtx_lock(&psock->mtx); - nni_list_append(&psock->pipes, ppipe); - nni_mtx_unlock(&psock->mtx); + nni_mtx_lock(&s->mtx); + nni_list_append(&s->pipes, p); + nni_mtx_unlock(&s->mtx); - nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq); - nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv); + nni_msgq_aio_get(p->sendq, p->aio_getq); + nni_pipe_recv(p->npipe, p->aio_recv); return (0); } static void -nni_surv_pipe_stop(void *arg) +surv_pipe_stop(void *arg) { - nni_surv_pipe *ppipe = arg; - nni_surv_sock *psock = ppipe->psock; + surv_pipe *p = arg; + surv_sock *s = p->psock; - nni_aio_stop(&ppipe->aio_getq); - nni_aio_stop(&ppipe->aio_send); - nni_aio_stop(&ppipe->aio_recv); - nni_aio_stop(&ppipe->aio_putq); + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_putq); - nni_msgq_close(ppipe->sendq); + nni_msgq_close(p->sendq); - nni_mtx_lock(&psock->mtx); - if (nni_list_active(&psock->pipes, ppipe)) { - nni_list_remove(&psock->pipes, ppipe); + nni_mtx_lock(&s->mtx); + if (nni_list_active(&s->pipes, p)) { + nni_list_remove(&s->pipes, p); } - nni_mtx_unlock(&psock->mtx); + nni_mtx_unlock(&s->mtx); } static void -nni_surv_getq_cb(void *arg) +surv_getq_cb(void *arg) { - nni_surv_pipe *ppipe = arg; + surv_pipe *p = arg; - if (nni_aio_result(&ppipe->aio_getq) != 0) { - nni_pipe_stop(ppipe->npipe); + if (nni_aio_result(p->aio_getq) != 0) { + nni_pipe_stop(p->npipe); return; } - ppipe->aio_send.a_msg = ppipe->aio_getq.a_msg; - ppipe->aio_getq.a_msg = NULL; + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); - nni_pipe_send(ppipe->npipe, &ppipe->aio_send); + nni_pipe_send(p->npipe, p->aio_send); } static void -nni_surv_send_cb(void *arg) +surv_send_cb(void *arg) { - nni_surv_pipe *ppipe = arg; + surv_pipe *p = arg; - if (nni_aio_result(&ppipe->aio_send) != 0) { - nni_msg_free(ppipe->aio_send.a_msg); - ppipe->aio_send.a_msg = NULL; - nni_pipe_stop(ppipe->npipe); + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->npipe); return; } - nni_msgq_aio_get(ppipe->psock->uwq, &ppipe->aio_getq); + nni_msgq_aio_get(p->psock->uwq, p->aio_getq); } static void -nni_surv_putq_cb(void *arg) +surv_putq_cb(void *arg) { - nni_surv_pipe *ppipe = arg; + surv_pipe *p = arg; - if (nni_aio_result(&ppipe->aio_putq) != 0) { - nni_msg_free(ppipe->aio_putq.a_msg); - ppipe->aio_putq.a_msg = NULL; - nni_pipe_stop(ppipe->npipe); + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->npipe); return; } - nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv); + nni_pipe_recv(p->npipe, p->aio_recv); } static void -nni_surv_recv_cb(void *arg) +surv_recv_cb(void *arg) { - nni_surv_pipe *ppipe = arg; - nni_msg * msg; + surv_pipe *p = arg; + nni_msg * msg; - if (nni_aio_result(&ppipe->aio_recv) != 0) { + if (nni_aio_result(p->aio_recv) != 0) { goto failed; } - msg = ppipe->aio_recv.a_msg; - ppipe->aio_recv.a_msg = NULL; + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); // We yank 4 bytes of body, and move them to the header. if (nni_msg_len(msg) < 4) { @@ -255,35 +258,35 @@ nni_surv_recv_cb(void *arg) } (void) nni_msg_trim(msg, 4); - ppipe->aio_putq.a_msg = msg; - nni_msgq_aio_put(ppipe->psock->urq, &ppipe->aio_putq); + nni_aio_set_msg(p->aio_putq, msg); + nni_msgq_aio_put(p->psock->urq, p->aio_putq); return; failed: - nni_pipe_stop(ppipe->npipe); + nni_pipe_stop(p->npipe); } static int -nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz) +surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { - nni_surv_sock *psock = arg; - int rv = NNG_ENOTSUP; - int oldraw; + surv_sock *s = arg; + int rv = NNG_ENOTSUP; + int oldraw; if (opt == nng_optid_surveyor_surveytime) { - rv = nni_setopt_usec(&psock->survtime, buf, sz); + rv = nni_setopt_usec(&s->survtime, buf, sz); } else if (opt == nng_optid_raw) { - oldraw = psock->raw; - rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1); - if (oldraw != psock->raw) { - if (psock->raw) { - nni_sock_recverr(psock->nsock, 0); + oldraw = s->raw; + rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); + if (oldraw != s->raw) { + if (s->raw) { + nni_sock_recverr(s->nsock, 0); } else { - nni_sock_recverr(psock->nsock, NNG_ESTATE); + nni_sock_recverr(s->nsock, NNG_ESTATE); } - psock->survid = 0; - nni_timer_cancel(&psock->timer); + s->survid = 0; + nni_timer_cancel(&s->timer); } } @@ -291,49 +294,49 @@ nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz) } static int -nni_surv_sock_getopt(void *arg, int opt, void *buf, size_t *szp) +surv_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { - nni_surv_sock *psock = arg; - int rv = NNG_ENOTSUP; + surv_sock *s = arg; + int rv = NNG_ENOTSUP; if (opt == nng_optid_surveyor_surveytime) { - rv = nni_getopt_usec(&psock->survtime, buf, szp); + rv = nni_getopt_usec(&s->survtime, buf, szp); } else if (opt == nng_optid_raw) { - rv = nni_getopt_int(&psock->raw, buf, szp); + rv = nni_getopt_int(&s->raw, buf, szp); } return (rv); } static void -nni_surv_sock_getq_cb(void *arg) +surv_sock_getq_cb(void *arg) { - nni_surv_sock *psock = arg; - nni_surv_pipe *ppipe; - nni_surv_pipe *last; - nni_msg * msg, *dup; + surv_sock *s = arg; + surv_pipe *p; + surv_pipe *last; + nni_msg * msg, *dup; - if (nni_aio_result(&psock->aio_getq) != 0) { + if (nni_aio_result(s->aio_getq) != 0) { // Should be NNG_ECLOSED. return; } - msg = psock->aio_getq.a_msg; - psock->aio_getq.a_msg = NULL; + msg = nni_aio_get_msg(s->aio_getq); + nni_aio_set_msg(s->aio_getq, NULL); - nni_mtx_lock(&psock->mtx); - last = nni_list_last(&psock->pipes); - NNI_LIST_FOREACH (&psock->pipes, ppipe) { - if (ppipe != last) { + nni_mtx_lock(&s->mtx); + last = nni_list_last(&s->pipes); + NNI_LIST_FOREACH (&s->pipes, p) { + if (p != last) { if (nni_msg_dup(&dup, msg) != 0) { continue; } } else { dup = msg; } - if (nni_msgq_tryput(ppipe->sendq, dup) != 0) { + if (nni_msgq_tryput(p->sendq, dup) != 0) { nni_msg_free(dup); } } - nni_mtx_unlock(&psock->mtx); + nni_mtx_unlock(&s->mtx); if (last == NULL) { // If there were no pipes to send on, just toss the message. @@ -342,23 +345,23 @@ nni_surv_sock_getq_cb(void *arg) } static void -nni_surv_timeout(void *arg) +surv_timeout(void *arg) { - nni_surv_sock *psock = arg; + surv_sock *s = arg; - nni_sock_lock(psock->nsock); - psock->survid = 0; - nni_sock_recverr(psock->nsock, NNG_ESTATE); - nni_msgq_set_get_error(psock->urq, NNG_ETIMEDOUT); - nni_sock_unlock(psock->nsock); + nni_sock_lock(s->nsock); + s->survid = 0; + nni_sock_recverr(s->nsock, NNG_ESTATE); + nni_msgq_set_get_error(s->urq, NNG_ETIMEDOUT); + nni_sock_unlock(s->nsock); } static nni_msg * -nni_surv_sock_sfilter(void *arg, nni_msg *msg) +surv_sock_sfilter(void *arg, nni_msg *msg) { - nni_surv_sock *psock = arg; + surv_sock *s = arg; - if (psock->raw) { + if (s->raw) { // No automatic retry, and the request ID must // be in the header coming down. return (msg); @@ -367,9 +370,9 @@ nni_surv_sock_sfilter(void *arg, nni_msg *msg) // Generate a new request ID. We always set the high // order bit so that the peer can locate the end of the // backtrace. (Pipe IDs have the high order bit clear.) - psock->survid = (psock->nextid++) | 0x80000000u; + s->survid = (s->nextid++) | 0x80000000u; - if (nni_msg_header_append_u32(msg, psock->survid) != 0) { + if (nni_msg_header_append_u32(msg, s->survid) != 0) { // Should be ENOMEM. nni_msg_free(msg); return (NULL); @@ -378,28 +381,28 @@ nni_surv_sock_sfilter(void *arg, nni_msg *msg) // If another message is there, this cancels it. We move the // survey expiration out. The timeout thread will wake up in // the wake below, and reschedule itself appropriately. - psock->expire = nni_clock() + psock->survtime; - nni_timer_schedule(&psock->timer, psock->expire); + s->expire = nni_clock() + s->survtime; + nni_timer_schedule(&s->timer, s->expire); // Clear the error condition. - nni_sock_recverr(psock->nsock, 0); + nni_sock_recverr(s->nsock, 0); // nni_msgq_set_get_error(nni_sock_recvq(psock->nsock), 0); return (msg); } static nni_msg * -nni_surv_sock_rfilter(void *arg, nni_msg *msg) +surv_sock_rfilter(void *arg, nni_msg *msg) { - nni_surv_sock *ssock = arg; + surv_sock *s = arg; - if (ssock->raw) { + if (s->raw) { // Pass it unmolested return (msg); } if ((nni_msg_header_len(msg) < sizeof(uint32_t)) || - (nni_msg_header_trim_u32(msg) != ssock->survid)) { + (nni_msg_header_trim_u32(msg) != s->survid)) { // Wrong request id nni_msg_free(msg); return (NULL); @@ -408,35 +411,35 @@ nni_surv_sock_rfilter(void *arg, nni_msg *msg) return (msg); } -static nni_proto_pipe_ops nni_surv_pipe_ops = { - .pipe_init = nni_surv_pipe_init, - .pipe_fini = nni_surv_pipe_fini, - .pipe_start = nni_surv_pipe_start, - .pipe_stop = nni_surv_pipe_stop, +static nni_proto_pipe_ops surv_pipe_ops = { + .pipe_init = surv_pipe_init, + .pipe_fini = surv_pipe_fini, + .pipe_start = surv_pipe_start, + .pipe_stop = surv_pipe_stop, }; -static nni_proto_sock_ops nni_surv_sock_ops = { - .sock_init = nni_surv_sock_init, - .sock_fini = nni_surv_sock_fini, - .sock_open = nni_surv_sock_open, - .sock_close = nni_surv_sock_close, - .sock_setopt = nni_surv_sock_setopt, - .sock_getopt = nni_surv_sock_getopt, - .sock_rfilter = nni_surv_sock_rfilter, - .sock_sfilter = nni_surv_sock_sfilter, +static nni_proto_sock_ops surv_sock_ops = { + .sock_init = surv_sock_init, + .sock_fini = surv_sock_fini, + .sock_open = surv_sock_open, + .sock_close = surv_sock_close, + .sock_setopt = surv_sock_setopt, + .sock_getopt = surv_sock_getopt, + .sock_rfilter = surv_sock_rfilter, + .sock_sfilter = surv_sock_sfilter, }; -nni_proto nni_surveyor_proto = { +static nni_proto surv_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNG_PROTO_SURVEYOR_V0, "surveyor" }, .proto_peer = { NNG_PROTO_RESPONDENT_V0, "respondent" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &nni_surv_sock_ops, - .proto_pipe_ops = &nni_surv_pipe_ops, + .proto_sock_ops = &surv_sock_ops, + .proto_pipe_ops = &surv_pipe_ops, }; int nng_surveyor0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &nni_surveyor_proto)); + return (nni_proto_open(sidp, &surv_proto)); } |
