aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep/rep.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep/rep.c')
-rw-r--r--src/protocol/reqrep/rep.c82
1 files changed, 52 insertions, 30 deletions
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 75f2af71..8d51ee0a 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -41,12 +41,10 @@ struct nni_rep_pipe {
int sigclose;
};
-static void nni_rep_receiver(void *);
-static void nni_rep_sender(void *);
static void nni_rep_topsender(void *);
static int
-nni_rep_create(void **repp, nni_sock *sock)
+nni_rep_init(void **repp, nni_sock *sock)
{
nni_rep_sock *rep;
int rv;
@@ -87,7 +85,7 @@ nni_rep_create(void **repp, nni_sock *sock)
static void
-nni_rep_destroy(void *arg)
+nni_rep_fini(void *arg)
{
nni_rep_sock *rep = arg;
@@ -102,41 +100,60 @@ nni_rep_destroy(void *arg)
static int
-nni_rep_add_pipe(void *arg, nni_pipe *pipe, void *datap)
+nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
{
- nni_rep_sock *rep = arg;
nni_rep_pipe *rp;
int rv;
+ if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) {
+ NNI_FREE_STRUCT(rp);
+ return (rv);
+ }
rp->pipe = pipe;
+ rp->rep = rsock;
rp->sigclose = 0;
+ *rpp = rp;
+ return (0);
+}
+
+
+static void
+nni_rep_pipe_fini(void *arg)
+{
+ nni_rep_pipe *rp = arg;
+
+ nni_msgq_fini(rp->sendq);
+ NNI_FREE_STRUCT(rp);
+}
- rv = nni_msgq_init(&rp->sendq, 2);
- if (rv != 0) {
- return (rv);
- }
+
+static int
+nni_rep_pipe_add(void *arg)
+{
+ nni_rep_pipe *rp = arg;
+ nni_rep_sock *rep = rp->rep;
+ int rv;
nni_mtx_lock(&rep->mx);
- if ((rv = nni_idhash_insert(rep->pipes, nni_pipe_id(pipe), rp)) != 0) {
- nni_msgq_fini(rp->sendq);
- nni_mtx_unlock(&rep->mx);
- return (rv);
- }
+ rv = nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp);
nni_mtx_unlock(&rep->mx);
- return (0);
+
+ return (rv);
}
static void
-nni_rep_rem_pipe(void *arg, void *data)
+nni_rep_pipe_rem(void *arg)
{
- nni_rep_sock *rep = arg;
- nni_rep_pipe *rp = data;
+ nni_rep_pipe *rp = arg;
+ nni_rep_sock *rep = rp->rep;
nni_mtx_lock(&rep->mx);
nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe));
nni_mtx_unlock(&rep->mx);
- nni_msgq_fini(rp->sendq);
}
@@ -197,7 +214,7 @@ nni_rep_topsender(void *arg)
static void
-nni_rep_sender(void *arg)
+nni_rep_pipe_send(void *arg)
{
nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
@@ -227,7 +244,7 @@ nni_rep_sender(void *arg)
static void
-nni_rep_receiver(void *arg)
+nni_rep_pipe_recv(void *arg)
{
nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
@@ -426,17 +443,22 @@ nni_rep_recvfilter(void *arg, nni_msg *msg)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-struct nni_protocol nni_rep_protocol = {
+static nni_proto_pipe nni_rep_proto_pipe = {
+ .pipe_init = nni_rep_pipe_init,
+ .pipe_fini = nni_rep_pipe_fini,
+ .pipe_add = nni_rep_pipe_add,
+ .pipe_rem = nni_rep_pipe_rem,
+ .pipe_send = nni_rep_pipe_send,
+ .pipe_recv = nni_rep_pipe_recv,
+};
+
+nni_proto nni_rep_protocol = {
.proto_self = NNG_PROTO_REP,
.proto_peer = NNG_PROTO_REQ,
.proto_name = "rep",
- .proto_create = nni_rep_create,
- .proto_destroy = nni_rep_destroy,
- .proto_add_pipe = nni_rep_add_pipe,
- .proto_rem_pipe = nni_rep_rem_pipe,
- .proto_pipe_size = sizeof (nni_rep_pipe),
- .proto_pipe_send = nni_rep_sender,
- .proto_pipe_recv = nni_rep_receiver,
+ .proto_pipe = &nni_rep_proto_pipe,
+ .proto_init = nni_rep_init,
+ .proto_fini = nni_rep_fini,
.proto_setopt = nni_rep_setopt,
.proto_getopt = nni_rep_getopt,
.proto_recv_filter = nni_rep_recvfilter,