diff options
Diffstat (limited to 'src/protocol/reqrep/rep.c')
| -rw-r--r-- | src/protocol/reqrep/rep.c | 82 |
1 files changed, 52 insertions, 30 deletions
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 75f2af71..8d51ee0a 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -41,12 +41,10 @@ struct nni_rep_pipe { int sigclose; }; -static void nni_rep_receiver(void *); -static void nni_rep_sender(void *); static void nni_rep_topsender(void *); static int -nni_rep_create(void **repp, nni_sock *sock) +nni_rep_init(void **repp, nni_sock *sock) { nni_rep_sock *rep; int rv; @@ -87,7 +85,7 @@ nni_rep_create(void **repp, nni_sock *sock) static void -nni_rep_destroy(void *arg) +nni_rep_fini(void *arg) { nni_rep_sock *rep = arg; @@ -102,41 +100,60 @@ nni_rep_destroy(void *arg) static int -nni_rep_add_pipe(void *arg, nni_pipe *pipe, void *datap) +nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) { - nni_rep_sock *rep = arg; nni_rep_pipe *rp; int rv; + if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) { + NNI_FREE_STRUCT(rp); + return (rv); + } rp->pipe = pipe; + rp->rep = rsock; rp->sigclose = 0; + *rpp = rp; + return (0); +} + + +static void +nni_rep_pipe_fini(void *arg) +{ + nni_rep_pipe *rp = arg; + + nni_msgq_fini(rp->sendq); + NNI_FREE_STRUCT(rp); +} - rv = nni_msgq_init(&rp->sendq, 2); - if (rv != 0) { - return (rv); - } + +static int +nni_rep_pipe_add(void *arg) +{ + nni_rep_pipe *rp = arg; + nni_rep_sock *rep = rp->rep; + int rv; nni_mtx_lock(&rep->mx); - if ((rv = nni_idhash_insert(rep->pipes, nni_pipe_id(pipe), rp)) != 0) { - nni_msgq_fini(rp->sendq); - nni_mtx_unlock(&rep->mx); - return (rv); - } + rv = nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp); nni_mtx_unlock(&rep->mx); - return (0); + + return (rv); } static void -nni_rep_rem_pipe(void *arg, void *data) +nni_rep_pipe_rem(void *arg) { - nni_rep_sock *rep = arg; - nni_rep_pipe *rp = data; + nni_rep_pipe *rp = arg; + nni_rep_sock *rep = rp->rep; nni_mtx_lock(&rep->mx); nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe)); nni_mtx_unlock(&rep->mx); - nni_msgq_fini(rp->sendq); } @@ -197,7 +214,7 @@ nni_rep_topsender(void *arg) static void -nni_rep_sender(void *arg) +nni_rep_pipe_send(void *arg) { nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; @@ -227,7 +244,7 @@ nni_rep_sender(void *arg) static void -nni_rep_receiver(void *arg) +nni_rep_pipe_recv(void *arg) { nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; @@ -426,17 +443,22 @@ nni_rep_recvfilter(void *arg, nni_msg *msg) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -struct nni_protocol nni_rep_protocol = { +static nni_proto_pipe nni_rep_proto_pipe = { + .pipe_init = nni_rep_pipe_init, + .pipe_fini = nni_rep_pipe_fini, + .pipe_add = nni_rep_pipe_add, + .pipe_rem = nni_rep_pipe_rem, + .pipe_send = nni_rep_pipe_send, + .pipe_recv = nni_rep_pipe_recv, +}; + +nni_proto nni_rep_protocol = { .proto_self = NNG_PROTO_REP, .proto_peer = NNG_PROTO_REQ, .proto_name = "rep", - .proto_create = nni_rep_create, - .proto_destroy = nni_rep_destroy, - .proto_add_pipe = nni_rep_add_pipe, - .proto_rem_pipe = nni_rep_rem_pipe, - .proto_pipe_size = sizeof (nni_rep_pipe), - .proto_pipe_send = nni_rep_sender, - .proto_pipe_recv = nni_rep_receiver, + .proto_pipe = &nni_rep_proto_pipe, + .proto_init = nni_rep_init, + .proto_fini = nni_rep_fini, .proto_setopt = nni_rep_setopt, .proto_getopt = nni_rep_getopt, .proto_recv_filter = nni_rep_recvfilter, |
