aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep')
-rw-r--r--src/protocol/reqrep/rep.c82
-rw-r--r--src/protocol/reqrep/req.c75
2 files changed, 96 insertions, 61 deletions
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 75f2af71..8d51ee0a 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -41,12 +41,10 @@ struct nni_rep_pipe {
int sigclose;
};
-static void nni_rep_receiver(void *);
-static void nni_rep_sender(void *);
static void nni_rep_topsender(void *);
static int
-nni_rep_create(void **repp, nni_sock *sock)
+nni_rep_init(void **repp, nni_sock *sock)
{
nni_rep_sock *rep;
int rv;
@@ -87,7 +85,7 @@ nni_rep_create(void **repp, nni_sock *sock)
static void
-nni_rep_destroy(void *arg)
+nni_rep_fini(void *arg)
{
nni_rep_sock *rep = arg;
@@ -102,41 +100,60 @@ nni_rep_destroy(void *arg)
static int
-nni_rep_add_pipe(void *arg, nni_pipe *pipe, void *datap)
+nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
{
- nni_rep_sock *rep = arg;
nni_rep_pipe *rp;
int rv;
+ if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) {
+ NNI_FREE_STRUCT(rp);
+ return (rv);
+ }
rp->pipe = pipe;
+ rp->rep = rsock;
rp->sigclose = 0;
+ *rpp = rp;
+ return (0);
+}
+
+
+static void
+nni_rep_pipe_fini(void *arg)
+{
+ nni_rep_pipe *rp = arg;
+
+ nni_msgq_fini(rp->sendq);
+ NNI_FREE_STRUCT(rp);
+}
- rv = nni_msgq_init(&rp->sendq, 2);
- if (rv != 0) {
- return (rv);
- }
+
+static int
+nni_rep_pipe_add(void *arg)
+{
+ nni_rep_pipe *rp = arg;
+ nni_rep_sock *rep = rp->rep;
+ int rv;
nni_mtx_lock(&rep->mx);
- if ((rv = nni_idhash_insert(rep->pipes, nni_pipe_id(pipe), rp)) != 0) {
- nni_msgq_fini(rp->sendq);
- nni_mtx_unlock(&rep->mx);
- return (rv);
- }
+ rv = nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp);
nni_mtx_unlock(&rep->mx);
- return (0);
+
+ return (rv);
}
static void
-nni_rep_rem_pipe(void *arg, void *data)
+nni_rep_pipe_rem(void *arg)
{
- nni_rep_sock *rep = arg;
- nni_rep_pipe *rp = data;
+ nni_rep_pipe *rp = arg;
+ nni_rep_sock *rep = rp->rep;
nni_mtx_lock(&rep->mx);
nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe));
nni_mtx_unlock(&rep->mx);
- nni_msgq_fini(rp->sendq);
}
@@ -197,7 +214,7 @@ nni_rep_topsender(void *arg)
static void
-nni_rep_sender(void *arg)
+nni_rep_pipe_send(void *arg)
{
nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
@@ -227,7 +244,7 @@ nni_rep_sender(void *arg)
static void
-nni_rep_receiver(void *arg)
+nni_rep_pipe_recv(void *arg)
{
nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
@@ -426,17 +443,22 @@ nni_rep_recvfilter(void *arg, nni_msg *msg)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-struct nni_protocol nni_rep_protocol = {
+static nni_proto_pipe nni_rep_proto_pipe = {
+ .pipe_init = nni_rep_pipe_init,
+ .pipe_fini = nni_rep_pipe_fini,
+ .pipe_add = nni_rep_pipe_add,
+ .pipe_rem = nni_rep_pipe_rem,
+ .pipe_send = nni_rep_pipe_send,
+ .pipe_recv = nni_rep_pipe_recv,
+};
+
+nni_proto nni_rep_protocol = {
.proto_self = NNG_PROTO_REP,
.proto_peer = NNG_PROTO_REQ,
.proto_name = "rep",
- .proto_create = nni_rep_create,
- .proto_destroy = nni_rep_destroy,
- .proto_add_pipe = nni_rep_add_pipe,
- .proto_rem_pipe = nni_rep_rem_pipe,
- .proto_pipe_size = sizeof (nni_rep_pipe),
- .proto_pipe_send = nni_rep_sender,
- .proto_pipe_recv = nni_rep_receiver,
+ .proto_pipe = &nni_rep_proto_pipe,
+ .proto_init = nni_rep_init,
+ .proto_fini = nni_rep_fini,
.proto_setopt = nni_rep_setopt,
.proto_getopt = nni_rep_getopt,
.proto_recv_filter = nni_rep_recvfilter,
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index ba7d4f6b..c26d66e8 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -31,7 +31,6 @@ struct nni_req_sock {
nni_thr resender;
int raw;
int closing;
- nni_list pipes;
nni_msg * reqmsg;
uint32_t nextid; // next id
uint8_t reqid[4]; // outstanding request ID (big endian)
@@ -45,12 +44,10 @@ struct nni_req_pipe {
nni_list_node node;
};
-static void nni_req_receiver(void *);
-static void nni_req_sender(void *);
static void nni_req_resender(void *);
static int
-nni_req_create(void **reqp, nni_sock *sock)
+nni_req_init(void **reqp, nni_sock *sock)
{
nni_req_sock *req;
int rv;
@@ -74,7 +71,6 @@ nni_req_create(void **reqp, nni_sock *sock)
req->reqmsg = NULL;
req->raw = 0;
req->resend = NNI_TIME_ZERO;
- NNI_LIST_INIT(&req->pipes, nni_req_pipe, node);
req->uwq = nni_sock_sendq(sock);
req->urq = nni_sock_recvq(sock);
@@ -93,7 +89,7 @@ nni_req_create(void **reqp, nni_sock *sock)
static void
-nni_req_destroy(void *arg)
+nni_req_fini(void *arg)
{
nni_req_sock *req = arg;
@@ -112,37 +108,49 @@ nni_req_destroy(void *arg)
static int
-nni_req_add_pipe(void *arg, nni_pipe *pipe, void *data)
+nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
{
- nni_req_sock *req = arg;
- nni_req_pipe *rp = data;
- int rv;
+ nni_req_pipe *rp;
+ if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
rp->pipe = pipe;
rp->sigclose = 0;
- rp->req = req;
-
- nni_mtx_lock(&req->mx);
- nni_list_append(&req->pipes, rp);
- nni_mtx_unlock(&req->mx);
+ rp->req = rsock;
return (0);
}
static void
-nni_req_rem_pipe(void *arg, void *data)
+nni_req_pipe_fini(void *arg)
{
- nni_req_sock *req = arg;
- nni_req_pipe *rp = data;
+ nni_req_pipe *rp = arg;
- nni_mtx_lock(&req->mx);
- nni_list_remove(&req->pipes, rp);
- nni_mtx_unlock(&req->mx);
+ NNI_FREE_STRUCT(rp);
+}
+
+
+static int
+nni_req_pipe_add(void *arg)
+{
+ // We have nothing to do, since we don't need to maintain a global
+ // list of related pipes.
+ NNI_ARG_UNUSED(arg);
+ return (0);
+}
+
+
+static void
+nni_req_pipe_rem(void *arg)
+{
+ // As with add, nothing to do here.
+ NNI_ARG_UNUSED(arg);
}
static void
-nni_req_sender(void *arg)
+nni_req_pipe_send(void *arg)
{
nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
@@ -169,7 +177,7 @@ nni_req_sender(void *arg)
static void
-nni_req_receiver(void *arg)
+nni_req_pipe_recv(void *arg)
{
nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
@@ -397,19 +405,24 @@ nni_req_recvfilter(void *arg, nni_msg *msg)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-struct nni_protocol nni_req_protocol = {
+static nni_proto_pipe nni_req_proto_pipe = {
+ .pipe_init = nni_req_pipe_init,
+ .pipe_fini = nni_req_pipe_fini,
+ .pipe_add = nni_req_pipe_add,
+ .pipe_rem = nni_req_pipe_rem,
+ .pipe_send = nni_req_pipe_send,
+ .pipe_recv = nni_req_pipe_recv,
+};
+
+nni_proto nni_req_protocol = {
.proto_self = NNG_PROTO_REQ,
.proto_peer = NNG_PROTO_REP,
.proto_name = "req",
- .proto_create = nni_req_create,
- .proto_destroy = nni_req_destroy,
- .proto_add_pipe = nni_req_add_pipe,
- .proto_rem_pipe = nni_req_rem_pipe,
+ .proto_pipe = &nni_req_proto_pipe,
+ .proto_init = nni_req_init,
+ .proto_fini = nni_req_fini,
.proto_setopt = nni_req_setopt,
.proto_getopt = nni_req_getopt,
- .proto_pipe_size = sizeof (nni_req_pipe),
- .proto_pipe_send = nni_req_sender,
- .proto_pipe_recv = nni_req_receiver,
.proto_recv_filter = nni_req_recvfilter,
.proto_send_filter = nni_req_sendfilter,
};