diff options
Diffstat (limited to 'src/protocol/pair')
| -rw-r--r-- | src/protocol/pair/pair.c | 129 |
1 files changed, 72 insertions, 57 deletions
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index ec6709ac..ff02c2d1 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -21,10 +21,11 @@ typedef struct nni_pair_sock nni_pair_sock; // An nni_pair_sock is our per-socket protocol private structure. struct nni_pair_sock { - nni_sock * sock; - nni_pair_pipe * pipe; + nni_sock * nsock; + nni_pair_pipe * ppipe; nni_msgq * uwq; nni_msgq * urq; + int raw; }; // An nni_pair_pipe is our per-pipe protocol private structure. We keep @@ -32,8 +33,8 @@ struct nni_pair_sock { // pipe. The separate data structure is more like other protocols that do // manage multiple pipes. struct nni_pair_pipe { - nni_pipe * pipe; - nni_pair_sock * pair; + nni_pipe * npipe; + nni_pair_sock * psock; int sigclose; }; @@ -41,19 +42,20 @@ static void nni_pair_receiver(void *); static void nni_pair_sender(void *); static int -nni_pair_sock_init(void **pairp, nni_sock *sock) +nni_pair_sock_init(void **sp, nni_sock *nsock) { - nni_pair_sock *pair; + nni_pair_sock *psock; int rv; - if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) { + if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) { return (NNG_ENOMEM); } - pair->sock = sock; - pair->pipe = NULL; - pair->uwq = nni_sock_sendq(sock); - pair->urq = nni_sock_recvq(sock); - *pairp = pair; + psock->nsock = nsock; + psock->ppipe = NULL; + psock->raw = 0; + psock->uwq = nni_sock_sendq(nsock); + psock->urq = nni_sock_recvq(nsock); + *sp = psock; return (0); } @@ -61,28 +63,24 @@ nni_pair_sock_init(void **pairp, nni_sock *sock) static void nni_pair_sock_fini(void *arg) { - nni_pair_sock *pair = arg; + nni_pair_sock *psock = arg; - // If we had any worker threads that we have not unregistered, - // this wold be the time to shut them all down. We don't, because - // the socket already shut us down, and we don't have any other - // threads that run. - NNI_FREE_STRUCT(pair); + NNI_FREE_STRUCT(psock); } static int -nni_pair_pipe_init(void **ppp, nni_pipe *pipe, void *psock) +nni_pair_pipe_init(void **pp, nni_pipe *npipe, void *psock) { - nni_pair_pipe *pp; + nni_pair_pipe *ppipe; - if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) { + if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) { return (NNG_ENOMEM); } - pp->pipe = pipe; - pp->sigclose = 0; - pp->pair = psock; - *ppp = pp; + ppipe->npipe = npipe; + ppipe->sigclose = 0; + ppipe->psock = psock; + *pp = ppipe; return (0); } @@ -90,22 +88,22 @@ nni_pair_pipe_init(void **ppp, nni_pipe *pipe, void *psock) static void nni_pair_pipe_fini(void *arg) { - nni_pair_pipe *pp = arg; + nni_pair_pipe *ppipe = arg; - NNI_FREE_STRUCT(pp); + NNI_FREE_STRUCT(ppipe); } static int nni_pair_pipe_add(void *arg) { - nni_pair_pipe *pp = arg; - nni_pair_sock *pair = pp->pair; + nni_pair_pipe *ppipe = arg; + nni_pair_sock *psock = ppipe->psock; - if (pair->pipe != NULL) { + if (psock->ppipe != NULL) { return (NNG_EBUSY); // Already have a peer, denied. } - pair->pipe = pp; + psock->ppipe = ppipe; return (0); } @@ -113,11 +111,11 @@ nni_pair_pipe_add(void *arg) static void nni_pair_pipe_rem(void *arg) { - nni_pair_pipe *pp = arg; - nni_pair_sock *pair = pp->pair; + nni_pair_pipe *ppipe = arg; + nni_pair_sock *psock = ppipe->psock; - if (pair->pipe == pp) { - pair->pipe = NULL; + if (psock->ppipe == ppipe) { + psock->ppipe = NULL; } } @@ -125,70 +123,87 @@ nni_pair_pipe_rem(void *arg) static void nni_pair_pipe_send(void *arg) { - nni_pair_pipe *pp = arg; - nni_pair_sock *pair = pp->pair; - nni_msgq *uwq = pair->uwq; - nni_msgq *urq = pair->urq; - nni_pipe *pipe = pp->pipe; + nni_pair_pipe *ppipe = arg; + nni_pair_sock *psock = ppipe->psock; + nni_msgq *uwq = psock->uwq; + nni_msgq *urq = psock->urq; + nni_pipe *npipe = ppipe->npipe; nni_msg *msg; int rv; for (;;) { - rv = nni_msgq_get_sig(uwq, &msg, &pp->sigclose); + rv = nni_msgq_get_sig(uwq, &msg, &ppipe->sigclose); if (rv != 0) { break; } - rv = nni_pipe_send(pipe, msg); + rv = nni_pipe_send(npipe, msg); if (rv != 0) { nni_msg_free(msg); break; } } - nni_msgq_signal(urq, &pp->sigclose); - nni_pipe_close(pipe); + nni_msgq_signal(urq, &ppipe->sigclose); + nni_pipe_close(npipe); } static void nni_pair_pipe_recv(void *arg) { - nni_pair_pipe *pp = arg; - nni_pair_sock *pair = pp->pair; - nni_msgq *urq = pair->urq; - nni_msgq *uwq = pair->uwq; - nni_pipe *pipe = pp->pipe; + nni_pair_pipe *ppipe = arg; + nni_msgq *urq = ppipe->psock->urq; + nni_msgq *uwq = ppipe->psock->uwq; + nni_pipe *npipe = ppipe->npipe; nni_msg *msg; int rv; for (;;) { - rv = nni_pipe_recv(pipe, &msg); + rv = nni_pipe_recv(npipe, &msg); if (rv != 0) { break; } - rv = nni_msgq_put_sig(urq, msg, &pp->sigclose); + rv = nni_msgq_put_sig(urq, msg, &ppipe->sigclose); if (rv != 0) { nni_msg_free(msg); break; } } - nni_msgq_signal(uwq, &pp->sigclose); - nni_pipe_close(pipe); + nni_msgq_signal(uwq, &ppipe->sigclose); + nni_pipe_close(npipe); } -// TODO: probably we could replace these with NULL, since we have no -// protocol specific options? static int nni_pair_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { - return (NNG_ENOTSUP); + nni_pair_sock *psock = arg; + int rv; + + switch (opt) { + case NNG_OPT_RAW: + rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1); + break; + default: + rv = NNG_ENOTSUP; + } + return (rv); } static int nni_pair_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { - return (NNG_ENOTSUP); + nni_pair_sock *psock = arg; + int rv; + + switch (opt) { + case NNG_OPT_RAW: + rv = nni_getopt_int(&psock->raw, buf, szp); + break; + default: + rv = NNG_ENOTSUP; + } + return (rv); } |
