diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-08 12:22:42 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-08 12:22:42 -0800 |
| commit | 5eb72cbdea39728a67a09fdd6f6d1084dadced67 (patch) | |
| tree | af284318658c05785d321dc726cc1bba51942665 /src/protocol/survey/respond.c | |
| parent | ec2574b09a746709f15d2a3f5de135e29f4bcb52 (diff) | |
| download | nng-5eb72cbdea39728a67a09fdd6f6d1084dadced67.tar.gz nng-5eb72cbdea39728a67a09fdd6f6d1084dadced67.tar.bz2 nng-5eb72cbdea39728a67a09fdd6f6d1084dadced67.zip | |
Add surveyor protocol (no tests yet).
This adds the surveyor protocol, and updates the respondent somewhat.
I've switched to using generic names for per-pipe and per-socket protocol
data. Hopefully this will make 'cut-n-paste' from other protocol
implementations easier.
Diffstat (limited to 'src/protocol/survey/respond.c')
| -rw-r--r-- | src/protocol/survey/respond.c | 189 |
1 files changed, 101 insertions, 88 deletions
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 3b23dff8..2891edc1 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -21,7 +21,7 @@ typedef struct nni_resp_sock nni_resp_sock; // An nni_resp_sock is our per-socket protocol private structure. struct nni_resp_sock { - nni_sock * sock; + nni_sock * nsock; int raw; int ttl; nni_idhash * pipes; @@ -31,33 +31,33 @@ struct nni_resp_sock { // An nni_resp_pipe is our per-pipe protocol private structure. struct nni_resp_pipe { - nni_pipe * pipe; - nni_resp_sock * resp; + nni_pipe * npipe; + nni_resp_sock * psock; nni_msgq * sendq; int sigclose; }; static int -nni_resp_sock_init(void **respp, nni_sock *sock) +nni_resp_sock_init(void **pp, nni_sock *nsock) { - nni_resp_sock *resp; + nni_resp_sock *psock; int rv; - if ((resp = NNI_ALLOC_STRUCT(resp)) == NULL) { + if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); } - resp->ttl = 8; // Per RFC - resp->sock = sock; - resp->raw = 0; - resp->btrace = NULL; - resp->btrace_len = 0; - if ((rv = nni_idhash_create(&resp->pipes)) != 0) { - NNI_FREE_STRUCT(resp); + psock->ttl = 8; // Per RFC + psock->nsock = nsock; + psock->raw = 0; + psock->btrace = NULL; + psock->btrace_len = 0; + if ((rv = nni_idhash_create(&psock->pipes)) != 0) { + NNI_FREE_STRUCT(psock); return (rv); } - *respp = resp; - nni_sock_senderr(sock, NNG_ESTATE); + *pp = psock; + nni_sock_senderr(nsock, NNG_ESTATE); return (0); } @@ -65,33 +65,33 @@ nni_resp_sock_init(void **respp, nni_sock *sock) static void nni_resp_sock_fini(void *arg) { - nni_resp_sock *resp = arg; + nni_resp_sock *psock = arg; - nni_idhash_destroy(resp->pipes); - if (resp->btrace != NULL) { - nni_free(resp->btrace, resp->btrace_len); + nni_idhash_destroy(psock->pipes); + if (psock->btrace != NULL) { + nni_free(psock->btrace, psock->btrace_len); } - NNI_FREE_STRUCT(resp); + NNI_FREE_STRUCT(psock); } static int -nni_resp_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) +nni_resp_pipe_init(void **pp, nni_pipe *npipe, void *psock) { - nni_resp_pipe *rp; + nni_resp_pipe *ppipe; int rv; - if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { + if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) { - NNI_FREE_STRUCT(rp); + if ((rv = nni_msgq_init(&ppipe->sendq, 2)) != 0) { + NNI_FREE_STRUCT(ppipe); return (rv); } - rp->pipe = pipe; - rp->resp = rsock; - rp->sigclose = 0; - *rpp = rp; + ppipe->npipe = npipe; + ppipe->psock = psock; + ppipe->sigclose = 0; + *pp = ppipe; return (0); } @@ -99,30 +99,32 @@ nni_resp_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) static void nni_resp_pipe_fini(void *arg) { - nni_resp_pipe *rp = arg; + nni_resp_pipe *ppipe = arg; - nni_msgq_fini(rp->sendq); - NNI_FREE_STRUCT(rp); + nni_msgq_fini(ppipe->sendq); + NNI_FREE_STRUCT(ppipe); } static int nni_resp_pipe_add(void *arg) { - nni_resp_pipe *rp = arg; - nni_resp_sock *resp = rp->resp; + nni_resp_pipe *ppipe = arg; + nni_resp_sock *psock = ppipe->psock; + int rv; - return (nni_idhash_insert(resp->pipes, nni_pipe_id(rp->pipe), rp)); + rv = nni_idhash_insert(psock->pipes, nni_pipe_id(ppipe->npipe), ppipe); + return (rv); } static void nni_resp_pipe_rem(void *arg) { - nni_resp_pipe *rp = arg; - nni_resp_sock *resp = rp->resp; + nni_resp_pipe *ppipe = arg; + nni_resp_sock *psock = ppipe->psock; - nni_idhash_remove(resp->pipes, nni_pipe_id(rp->pipe)); + nni_idhash_remove(psock->pipes, nni_pipe_id(ppipe->npipe)); } @@ -133,15 +135,15 @@ nni_resp_pipe_rem(void *arg) static void nni_resp_sock_send(void *arg) { - nni_resp_sock *resp = arg; - nni_msgq *uwq = nni_sock_sendq(resp->sock); - nni_mtx *mx = nni_sock_mtx(resp->sock); + nni_resp_sock *psock = arg; + nni_msgq *uwq = nni_sock_sendq(psock->nsock); + nni_mtx *mx = nni_sock_mtx(psock->nsock); nni_msg *msg; for (;;) { uint8_t *header; uint32_t id; - nni_resp_pipe *rp; + nni_resp_pipe *ppipe; int rv; if ((rv = nni_msgq_get(uwq, &msg)) != 0) { @@ -157,13 +159,13 @@ nni_resp_sock_send(void *arg) nni_msg_trim_header(msg, 4); nni_mtx_lock(mx); - if (nni_idhash_find(resp->pipes, id, (void **) &rp) != 0) { + if (nni_idhash_find(psock->pipes, id, (void **) &ppipe) != 0) { nni_mtx_unlock(mx); nni_msg_free(msg); continue; } // Try a non-blocking put to the lower writer. - rv = nni_msgq_put_until(rp->sendq, msg, NNI_TIME_ZERO); + rv = nni_msgq_put_until(ppipe->sendq, msg, NNI_TIME_ZERO); if (rv != 0) { // message queue is full, we have no choice but // to drop it. This should not happen under normal @@ -178,39 +180,41 @@ nni_resp_sock_send(void *arg) static void nni_resp_pipe_send(void *arg) { - nni_resp_pipe *rp = arg; - nni_resp_sock *resp = rp->resp; - nni_msgq *sendq = rp->sendq; + nni_resp_pipe *ppipe = arg; + nni_resp_sock *psock = ppipe->psock; + nni_pipe *npipe = ppipe->npipe; + nni_msgq *sendq = ppipe->sendq; nni_msg *msg; int rv; for (;;) { - rv = nni_msgq_get_sig(sendq, &msg, &rp->sigclose); + rv = nni_msgq_get_sig(sendq, &msg, &ppipe->sigclose); if (rv != 0) { break; } - rv = nni_pipe_send(rp->pipe, msg); + rv = nni_pipe_send(npipe, msg); if (rv != 0) { nni_msg_free(msg); break; } } - nni_msgq_signal(nni_sock_recvq(resp->sock), &rp->sigclose); - nni_pipe_close(rp->pipe); + nni_msgq_signal(nni_sock_recvq(psock->nsock), &ppipe->sigclose); + nni_pipe_close(npipe); } static void nni_resp_pipe_recv(void *arg) { - nni_resp_pipe *rp = arg; - nni_resp_sock *resp = rp->resp; - nni_msgq *urq = nni_sock_recvq(resp->sock); + nni_resp_pipe *ppipe = arg; + nni_resp_sock *psock = ppipe->psock; + nni_msgq *urq = nni_sock_recvq(psock->nsock); + nni_pipe *npipe = ppipe->npipe; nni_msg *msg; int rv; uint8_t idbuf[4]; - uint32_t id = nni_pipe_id(rp->pipe); + uint32_t id = nni_pipe_id(npipe); NNI_PUT32(idbuf, id); @@ -220,7 +224,7 @@ nni_resp_pipe_recv(void *arg) int hops; again: - rv = nni_pipe_recv(rp->pipe, &msg); + rv = nni_pipe_recv(npipe, &msg); if (rv != 0) { break; } @@ -236,7 +240,7 @@ again: hops = 0; for (;;) { int end = 0; - if (hops >= resp->ttl) { + if (hops >= psock->ttl) { nni_msg_free(msg); goto again; } @@ -258,30 +262,39 @@ again: } // Now send it up. - rv = nni_msgq_put_sig(urq, msg, &rp->sigclose); + rv = nni_msgq_put_sig(urq, msg, &ppipe->sigclose); if (rv != 0) { nni_msg_free(msg); break; } } - nni_msgq_signal(nni_sock_sendq(resp->sock), &rp->sigclose); - nni_msgq_signal(rp->sendq, &rp->sigclose); - nni_pipe_close(rp->pipe); + nni_msgq_signal(nni_sock_sendq(psock->nsock), &ppipe->sigclose); + nni_msgq_signal(ppipe->sendq, &ppipe->sigclose); + nni_pipe_close(npipe); } static int nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { - nni_resp_sock *resp = arg; + nni_resp_sock *psock = arg; int rv; + int oldraw; switch (opt) { case NNG_OPT_MAXTTL: - rv = nni_setopt_int(&resp->ttl, buf, sz, 1, 255); + rv = nni_setopt_int(&psock->ttl, buf, sz, 1, 255); break; case NNG_OPT_RAW: - rv = nni_setopt_int(&resp->raw, buf, sz, 0, 1); + oldraw = psock->raw; + rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1); + if (oldraw != psock->raw) { + if (!psock->raw) { + nni_sock_senderr(psock->nsock, 0); + } else { + nni_sock_senderr(psock->nsock, NNG_ESTATE); + } + } break; default: rv = NNG_ENOTSUP; @@ -293,15 +306,15 @@ nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz) static int nni_resp_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { - nni_resp_sock *resp = arg; + nni_resp_sock *psock = arg; int rv; switch (opt) { case NNG_OPT_MAXTTL: - rv = nni_getopt_int(&resp->ttl, buf, szp); + rv = nni_getopt_int(&psock->ttl, buf, szp); break; case NNG_OPT_RAW: - rv = nni_getopt_int(&resp->raw, buf, szp); + rv = nni_getopt_int(&psock->raw, buf, szp); break; default: rv = NNG_ENOTSUP; @@ -313,19 +326,19 @@ nni_resp_sock_getopt(void *arg, int opt, void *buf, size_t *szp) static nni_msg * nni_resp_sock_sfilter(void *arg, nni_msg *msg) { - nni_resp_sock *resp = arg; + nni_resp_sock *psock = arg; size_t len; - if (resp->raw) { + if (psock->raw) { return (msg); } // Cannot send again until a receive is done... - nni_sock_senderr(resp->sock, NNG_ESTATE); + nni_sock_senderr(psock->nsock, NNG_ESTATE); // If we have a stored backtrace, append it to the header... // if we don't have a backtrace, discard the message. - if (resp->btrace == NULL) { + if (psock->btrace == NULL) { nni_msg_free(msg); return (NULL); } @@ -333,17 +346,17 @@ nni_resp_sock_sfilter(void *arg, nni_msg *msg) // drop anything else in the header... nni_msg_trunc_header(msg, nni_msg_header_len(msg)); - if (nni_msg_append_header(msg, resp->btrace, resp->btrace_len) != 0) { - nni_free(resp->btrace, resp->btrace_len); - resp->btrace = NULL; - resp->btrace_len = 0; + if (nni_msg_append_header(msg, psock->btrace, psock->btrace_len) != 0) { + nni_free(psock->btrace, psock->btrace_len); + psock->btrace = NULL; + psock->btrace_len = 0; nni_msg_free(msg); return (NULL); } - nni_free(resp->btrace, resp->btrace_len); - resp->btrace = NULL; - resp->btrace_len = 0; + nni_free(psock->btrace, psock->btrace_len); + psock->btrace = NULL; + psock->btrace_len = 0; return (msg); } @@ -351,28 +364,28 @@ nni_resp_sock_sfilter(void *arg, nni_msg *msg) static nni_msg * nni_resp_sock_rfilter(void *arg, nni_msg *msg) { - nni_resp_sock *resp = arg; + nni_resp_sock *psock = arg; char *header; size_t len; - if (resp->raw) { + if (psock->raw) { return (msg); } - nni_sock_senderr(resp->sock, 0); + nni_sock_senderr(psock->nsock, 0); len = nni_msg_header_len(msg); header = nni_msg_header(msg); - if (resp->btrace != NULL) { - nni_free(resp->btrace, resp->btrace_len); - resp->btrace = NULL; - resp->btrace_len = 0; + if (psock->btrace != NULL) { + nni_free(psock->btrace, psock->btrace_len); + psock->btrace = NULL; + psock->btrace_len = 0; } - if ((resp->btrace = nni_alloc(len)) == NULL) { + if ((psock->btrace = nni_alloc(len)) == NULL) { nni_msg_free(msg); return (NULL); } - resp->btrace_len = len; - memcpy(resp->btrace, header, len); + psock->btrace_len = len; + memcpy(psock->btrace, header, len); nni_msg_trunc_header(msg, len); return (msg); } |
