aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/protocol/pair/pair.c129
1 files changed, 72 insertions, 57 deletions
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index ec6709ac..ff02c2d1 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -21,10 +21,11 @@ typedef struct nni_pair_sock nni_pair_sock;
// An nni_pair_sock is our per-socket protocol private structure.
struct nni_pair_sock {
- nni_sock * sock;
- nni_pair_pipe * pipe;
+ nni_sock * nsock;
+ nni_pair_pipe * ppipe;
nni_msgq * uwq;
nni_msgq * urq;
+ int raw;
};
// An nni_pair_pipe is our per-pipe protocol private structure. We keep
@@ -32,8 +33,8 @@ struct nni_pair_sock {
// pipe. The separate data structure is more like other protocols that do
// manage multiple pipes.
struct nni_pair_pipe {
- nni_pipe * pipe;
- nni_pair_sock * pair;
+ nni_pipe * npipe;
+ nni_pair_sock * psock;
int sigclose;
};
@@ -41,19 +42,20 @@ static void nni_pair_receiver(void *);
static void nni_pair_sender(void *);
static int
-nni_pair_sock_init(void **pairp, nni_sock *sock)
+nni_pair_sock_init(void **sp, nni_sock *nsock)
{
- nni_pair_sock *pair;
+ nni_pair_sock *psock;
int rv;
- if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) {
+ if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
return (NNG_ENOMEM);
}
- pair->sock = sock;
- pair->pipe = NULL;
- pair->uwq = nni_sock_sendq(sock);
- pair->urq = nni_sock_recvq(sock);
- *pairp = pair;
+ psock->nsock = nsock;
+ psock->ppipe = NULL;
+ psock->raw = 0;
+ psock->uwq = nni_sock_sendq(nsock);
+ psock->urq = nni_sock_recvq(nsock);
+ *sp = psock;
return (0);
}
@@ -61,28 +63,24 @@ nni_pair_sock_init(void **pairp, nni_sock *sock)
static void
nni_pair_sock_fini(void *arg)
{
- nni_pair_sock *pair = arg;
+ nni_pair_sock *psock = arg;
- // If we had any worker threads that we have not unregistered,
- // this wold be the time to shut them all down. We don't, because
- // the socket already shut us down, and we don't have any other
- // threads that run.
- NNI_FREE_STRUCT(pair);
+ NNI_FREE_STRUCT(psock);
}
static int
-nni_pair_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
+nni_pair_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
- nni_pair_pipe *pp;
+ nni_pair_pipe *ppipe;
- if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
+ if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
return (NNG_ENOMEM);
}
- pp->pipe = pipe;
- pp->sigclose = 0;
- pp->pair = psock;
- *ppp = pp;
+ ppipe->npipe = npipe;
+ ppipe->sigclose = 0;
+ ppipe->psock = psock;
+ *pp = ppipe;
return (0);
}
@@ -90,22 +88,22 @@ nni_pair_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
static void
nni_pair_pipe_fini(void *arg)
{
- nni_pair_pipe *pp = arg;
+ nni_pair_pipe *ppipe = arg;
- NNI_FREE_STRUCT(pp);
+ NNI_FREE_STRUCT(ppipe);
}
static int
nni_pair_pipe_add(void *arg)
{
- nni_pair_pipe *pp = arg;
- nni_pair_sock *pair = pp->pair;
+ nni_pair_pipe *ppipe = arg;
+ nni_pair_sock *psock = ppipe->psock;
- if (pair->pipe != NULL) {
+ if (psock->ppipe != NULL) {
return (NNG_EBUSY); // Already have a peer, denied.
}
- pair->pipe = pp;
+ psock->ppipe = ppipe;
return (0);
}
@@ -113,11 +111,11 @@ nni_pair_pipe_add(void *arg)
static void
nni_pair_pipe_rem(void *arg)
{
- nni_pair_pipe *pp = arg;
- nni_pair_sock *pair = pp->pair;
+ nni_pair_pipe *ppipe = arg;
+ nni_pair_sock *psock = ppipe->psock;
- if (pair->pipe == pp) {
- pair->pipe = NULL;
+ if (psock->ppipe == ppipe) {
+ psock->ppipe = NULL;
}
}
@@ -125,70 +123,87 @@ nni_pair_pipe_rem(void *arg)
static void
nni_pair_pipe_send(void *arg)
{
- nni_pair_pipe *pp = arg;
- nni_pair_sock *pair = pp->pair;
- nni_msgq *uwq = pair->uwq;
- nni_msgq *urq = pair->urq;
- nni_pipe *pipe = pp->pipe;
+ nni_pair_pipe *ppipe = arg;
+ nni_pair_sock *psock = ppipe->psock;
+ nni_msgq *uwq = psock->uwq;
+ nni_msgq *urq = psock->urq;
+ nni_pipe *npipe = ppipe->npipe;
nni_msg *msg;
int rv;
for (;;) {
- rv = nni_msgq_get_sig(uwq, &msg, &pp->sigclose);
+ rv = nni_msgq_get_sig(uwq, &msg, &ppipe->sigclose);
if (rv != 0) {
break;
}
- rv = nni_pipe_send(pipe, msg);
+ rv = nni_pipe_send(npipe, msg);
if (rv != 0) {
nni_msg_free(msg);
break;
}
}
- nni_msgq_signal(urq, &pp->sigclose);
- nni_pipe_close(pipe);
+ nni_msgq_signal(urq, &ppipe->sigclose);
+ nni_pipe_close(npipe);
}
static void
nni_pair_pipe_recv(void *arg)
{
- nni_pair_pipe *pp = arg;
- nni_pair_sock *pair = pp->pair;
- nni_msgq *urq = pair->urq;
- nni_msgq *uwq = pair->uwq;
- nni_pipe *pipe = pp->pipe;
+ nni_pair_pipe *ppipe = arg;
+ nni_msgq *urq = ppipe->psock->urq;
+ nni_msgq *uwq = ppipe->psock->uwq;
+ nni_pipe *npipe = ppipe->npipe;
nni_msg *msg;
int rv;
for (;;) {
- rv = nni_pipe_recv(pipe, &msg);
+ rv = nni_pipe_recv(npipe, &msg);
if (rv != 0) {
break;
}
- rv = nni_msgq_put_sig(urq, msg, &pp->sigclose);
+ rv = nni_msgq_put_sig(urq, msg, &ppipe->sigclose);
if (rv != 0) {
nni_msg_free(msg);
break;
}
}
- nni_msgq_signal(uwq, &pp->sigclose);
- nni_pipe_close(pipe);
+ nni_msgq_signal(uwq, &ppipe->sigclose);
+ nni_pipe_close(npipe);
}
-// TODO: probably we could replace these with NULL, since we have no
-// protocol specific options?
static int
nni_pair_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- return (NNG_ENOTSUP);
+ nni_pair_sock *psock = arg;
+ int rv;
+
+ switch (opt) {
+ case NNG_OPT_RAW:
+ rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ }
+ return (rv);
}
static int
nni_pair_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- return (NNG_ENOTSUP);
+ nni_pair_sock *psock = arg;
+ int rv;
+
+ switch (opt) {
+ case NNG_OPT_RAW:
+ rv = nni_getopt_int(&psock->raw, buf, szp);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ }
+ return (rv);
}