summaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-02 14:29:47 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-02 14:29:47 -0800
commitb6374f9d9b07c929522066f27ed9a7a05c6bb23b (patch)
tree9195694f13261ba5cd4d8f2446743f815a06619f /src/protocol
parentf729db021a4fd7c782cc08a07185c955f3567ea2 (diff)
downloadnng-b6374f9d9b07c929522066f27ed9a7a05c6bb23b.tar.gz
nng-b6374f9d9b07c929522066f27ed9a7a05c6bb23b.tar.bz2
nng-b6374f9d9b07c929522066f27ed9a7a05c6bb23b.zip
Protocol initialization restructuring.
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/pair/pair.c75
-rw-r--r--src/protocol/reqrep/rep.c82
-rw-r--r--src/protocol/reqrep/req.c75
3 files changed, 140 insertions, 92 deletions
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index 037d5f1e..1d4e58d0 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -23,7 +23,6 @@ typedef struct nni_pair_sock nni_pair_sock;
struct nni_pair_sock {
nni_sock * sock;
nni_pair_pipe * pipe;
- nni_mtx mx;
nni_msgq * uwq;
nni_msgq * urq;
};
@@ -42,7 +41,7 @@ static void nni_pair_receiver(void *);
static void nni_pair_sender(void *);
static int
-nni_pair_create(void **pairp, nni_sock *sock)
+nni_pair_init(void **pairp, nni_sock *sock)
{
nni_pair_sock *pair;
int rv;
@@ -50,10 +49,6 @@ nni_pair_create(void **pairp, nni_sock *sock)
if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&pair->mx)) != 0) {
- NNI_FREE_STRUCT(pair);
- return (rv);
- }
pair->sock = sock;
pair->pipe = NULL;
pair->uwq = nni_sock_sendq(sock);
@@ -64,7 +59,7 @@ nni_pair_create(void **pairp, nni_sock *sock)
static void
-nni_pair_destroy(void *arg)
+nni_pair_fini(void *arg)
{
nni_pair_sock *pair = arg;
@@ -72,49 +67,61 @@ nni_pair_destroy(void *arg)
// this wold be the time to shut them all down. We don't, because
// the socket already shut us down, and we don't have any other
// threads that run.
- nni_mtx_fini(&pair->mx);
NNI_FREE_STRUCT(pair);
}
static int
-nni_pair_add_pipe(void *arg, nni_pipe *pipe, void *data)
+nni_pair_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
{
- nni_pair_sock *pair = arg;
- nni_pair_pipe *pp = data;
- int rv;
+ nni_pair_pipe *pp;
+ if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
pp->pipe = pipe;
pp->sigclose = 0;
- pp->pair = pair;
+ pp->pair = psock;
+ *ppp = pp;
+ return (0);
+}
+
+
+static void
+nni_pair_pipe_fini(void *arg)
+{
+ nni_pair_pipe *pp = arg;
+ NNI_FREE_STRUCT(pp);
+}
+
+static int
+nni_pair_pipe_add(void *arg)
+{
+ nni_pair_pipe *pp = arg;
+ nni_pair_sock *pair = pp->pair;
- nni_mtx_lock(&pair->mx);
if (pair->pipe != NULL) {
- nni_mtx_unlock(&pair->mx);
return (NNG_EBUSY); // Already have a peer, denied.
}
pair->pipe = pp;
- nni_mtx_unlock(&pair->mx);
return (0);
}
static void
-nni_pair_rem_pipe(void *arg, void *data)
+nni_pair_pipe_rem(void *arg)
{
- nni_pair_sock *pair = arg;
- nni_pair_pipe *pp = data;
+ nni_pair_pipe *pp = arg;
+ nni_pair_sock *pair = pp->pair;
- nni_mtx_lock(&pair->mx);
if (pair->pipe == pp) {
pair->pipe = NULL;
}
- nni_mtx_unlock(&pair->mx);
}
static void
-nni_pair_sender(void *arg)
+nni_pair_pipe_send(void *arg)
{
nni_pair_pipe *pp = arg;
nni_pair_sock *pair = pp->pair;
@@ -141,7 +148,7 @@ nni_pair_sender(void *arg)
static void
-nni_pair_receiver(void *arg)
+nni_pair_pipe_recv(void *arg)
{
nni_pair_pipe *pp = arg;
nni_pair_sock *pair = pp->pair;
@@ -185,17 +192,23 @@ nni_pair_getopt(void *arg, int opt, void *buf, size_t *szp)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-struct nni_protocol nni_pair_protocol = {
+
+static nni_proto_pipe nni_pair_proto_pipe = {
+ .pipe_init = nni_pair_pipe_init,
+ .pipe_fini = nni_pair_pipe_fini,
+ .pipe_add = nni_pair_pipe_add,
+ .pipe_rem = nni_pair_pipe_rem,
+ .pipe_send = nni_pair_pipe_send,
+ .pipe_recv = nni_pair_pipe_recv,
+};
+
+nni_proto nni_pair_proto = {
.proto_self = NNG_PROTO_PAIR,
.proto_peer = NNG_PROTO_PAIR,
.proto_name = "pair",
- .proto_create = nni_pair_create,
- .proto_destroy = nni_pair_destroy,
- .proto_add_pipe = nni_pair_add_pipe,
- .proto_rem_pipe = nni_pair_rem_pipe,
- .proto_pipe_size = sizeof (nni_pair_pipe),
- .proto_pipe_send = nni_pair_sender,
- .proto_pipe_recv = nni_pair_receiver,
+ .proto_pipe = &nni_pair_proto_pipe,
+ .proto_init = nni_pair_init,
+ .proto_fini = nni_pair_fini,
.proto_setopt = nni_pair_setopt,
.proto_getopt = nni_pair_getopt,
.proto_recv_filter = NULL,
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 75f2af71..8d51ee0a 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -41,12 +41,10 @@ struct nni_rep_pipe {
int sigclose;
};
-static void nni_rep_receiver(void *);
-static void nni_rep_sender(void *);
static void nni_rep_topsender(void *);
static int
-nni_rep_create(void **repp, nni_sock *sock)
+nni_rep_init(void **repp, nni_sock *sock)
{
nni_rep_sock *rep;
int rv;
@@ -87,7 +85,7 @@ nni_rep_create(void **repp, nni_sock *sock)
static void
-nni_rep_destroy(void *arg)
+nni_rep_fini(void *arg)
{
nni_rep_sock *rep = arg;
@@ -102,41 +100,60 @@ nni_rep_destroy(void *arg)
static int
-nni_rep_add_pipe(void *arg, nni_pipe *pipe, void *datap)
+nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
{
- nni_rep_sock *rep = arg;
nni_rep_pipe *rp;
int rv;
+ if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) {
+ NNI_FREE_STRUCT(rp);
+ return (rv);
+ }
rp->pipe = pipe;
+ rp->rep = rsock;
rp->sigclose = 0;
+ *rpp = rp;
+ return (0);
+}
+
+
+static void
+nni_rep_pipe_fini(void *arg)
+{
+ nni_rep_pipe *rp = arg;
+
+ nni_msgq_fini(rp->sendq);
+ NNI_FREE_STRUCT(rp);
+}
- rv = nni_msgq_init(&rp->sendq, 2);
- if (rv != 0) {
- return (rv);
- }
+
+static int
+nni_rep_pipe_add(void *arg)
+{
+ nni_rep_pipe *rp = arg;
+ nni_rep_sock *rep = rp->rep;
+ int rv;
nni_mtx_lock(&rep->mx);
- if ((rv = nni_idhash_insert(rep->pipes, nni_pipe_id(pipe), rp)) != 0) {
- nni_msgq_fini(rp->sendq);
- nni_mtx_unlock(&rep->mx);
- return (rv);
- }
+ rv = nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp);
nni_mtx_unlock(&rep->mx);
- return (0);
+
+ return (rv);
}
static void
-nni_rep_rem_pipe(void *arg, void *data)
+nni_rep_pipe_rem(void *arg)
{
- nni_rep_sock *rep = arg;
- nni_rep_pipe *rp = data;
+ nni_rep_pipe *rp = arg;
+ nni_rep_sock *rep = rp->rep;
nni_mtx_lock(&rep->mx);
nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe));
nni_mtx_unlock(&rep->mx);
- nni_msgq_fini(rp->sendq);
}
@@ -197,7 +214,7 @@ nni_rep_topsender(void *arg)
static void
-nni_rep_sender(void *arg)
+nni_rep_pipe_send(void *arg)
{
nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
@@ -227,7 +244,7 @@ nni_rep_sender(void *arg)
static void
-nni_rep_receiver(void *arg)
+nni_rep_pipe_recv(void *arg)
{
nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
@@ -426,17 +443,22 @@ nni_rep_recvfilter(void *arg, nni_msg *msg)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-struct nni_protocol nni_rep_protocol = {
+static nni_proto_pipe nni_rep_proto_pipe = {
+ .pipe_init = nni_rep_pipe_init,
+ .pipe_fini = nni_rep_pipe_fini,
+ .pipe_add = nni_rep_pipe_add,
+ .pipe_rem = nni_rep_pipe_rem,
+ .pipe_send = nni_rep_pipe_send,
+ .pipe_recv = nni_rep_pipe_recv,
+};
+
+nni_proto nni_rep_protocol = {
.proto_self = NNG_PROTO_REP,
.proto_peer = NNG_PROTO_REQ,
.proto_name = "rep",
- .proto_create = nni_rep_create,
- .proto_destroy = nni_rep_destroy,
- .proto_add_pipe = nni_rep_add_pipe,
- .proto_rem_pipe = nni_rep_rem_pipe,
- .proto_pipe_size = sizeof (nni_rep_pipe),
- .proto_pipe_send = nni_rep_sender,
- .proto_pipe_recv = nni_rep_receiver,
+ .proto_pipe = &nni_rep_proto_pipe,
+ .proto_init = nni_rep_init,
+ .proto_fini = nni_rep_fini,
.proto_setopt = nni_rep_setopt,
.proto_getopt = nni_rep_getopt,
.proto_recv_filter = nni_rep_recvfilter,
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index ba7d4f6b..c26d66e8 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -31,7 +31,6 @@ struct nni_req_sock {
nni_thr resender;
int raw;
int closing;
- nni_list pipes;
nni_msg * reqmsg;
uint32_t nextid; // next id
uint8_t reqid[4]; // outstanding request ID (big endian)
@@ -45,12 +44,10 @@ struct nni_req_pipe {
nni_list_node node;
};
-static void nni_req_receiver(void *);
-static void nni_req_sender(void *);
static void nni_req_resender(void *);
static int
-nni_req_create(void **reqp, nni_sock *sock)
+nni_req_init(void **reqp, nni_sock *sock)
{
nni_req_sock *req;
int rv;
@@ -74,7 +71,6 @@ nni_req_create(void **reqp, nni_sock *sock)
req->reqmsg = NULL;
req->raw = 0;
req->resend = NNI_TIME_ZERO;
- NNI_LIST_INIT(&req->pipes, nni_req_pipe, node);
req->uwq = nni_sock_sendq(sock);
req->urq = nni_sock_recvq(sock);
@@ -93,7 +89,7 @@ nni_req_create(void **reqp, nni_sock *sock)
static void
-nni_req_destroy(void *arg)
+nni_req_fini(void *arg)
{
nni_req_sock *req = arg;
@@ -112,37 +108,49 @@ nni_req_destroy(void *arg)
static int
-nni_req_add_pipe(void *arg, nni_pipe *pipe, void *data)
+nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
{
- nni_req_sock *req = arg;
- nni_req_pipe *rp = data;
- int rv;
+ nni_req_pipe *rp;
+ if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
rp->pipe = pipe;
rp->sigclose = 0;
- rp->req = req;
-
- nni_mtx_lock(&req->mx);
- nni_list_append(&req->pipes, rp);
- nni_mtx_unlock(&req->mx);
+ rp->req = rsock;
return (0);
}
static void
-nni_req_rem_pipe(void *arg, void *data)
+nni_req_pipe_fini(void *arg)
{
- nni_req_sock *req = arg;
- nni_req_pipe *rp = data;
+ nni_req_pipe *rp = arg;
- nni_mtx_lock(&req->mx);
- nni_list_remove(&req->pipes, rp);
- nni_mtx_unlock(&req->mx);
+ NNI_FREE_STRUCT(rp);
+}
+
+
+static int
+nni_req_pipe_add(void *arg)
+{
+ // We have nothing to do, since we don't need to maintain a global
+ // list of related pipes.
+ NNI_ARG_UNUSED(arg);
+ return (0);
+}
+
+
+static void
+nni_req_pipe_rem(void *arg)
+{
+ // As with add, nothing to do here.
+ NNI_ARG_UNUSED(arg);
}
static void
-nni_req_sender(void *arg)
+nni_req_pipe_send(void *arg)
{
nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
@@ -169,7 +177,7 @@ nni_req_sender(void *arg)
static void
-nni_req_receiver(void *arg)
+nni_req_pipe_recv(void *arg)
{
nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
@@ -397,19 +405,24 @@ nni_req_recvfilter(void *arg, nni_msg *msg)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-struct nni_protocol nni_req_protocol = {
+static nni_proto_pipe nni_req_proto_pipe = {
+ .pipe_init = nni_req_pipe_init,
+ .pipe_fini = nni_req_pipe_fini,
+ .pipe_add = nni_req_pipe_add,
+ .pipe_rem = nni_req_pipe_rem,
+ .pipe_send = nni_req_pipe_send,
+ .pipe_recv = nni_req_pipe_recv,
+};
+
+nni_proto nni_req_protocol = {
.proto_self = NNG_PROTO_REQ,
.proto_peer = NNG_PROTO_REP,
.proto_name = "req",
- .proto_create = nni_req_create,
- .proto_destroy = nni_req_destroy,
- .proto_add_pipe = nni_req_add_pipe,
- .proto_rem_pipe = nni_req_rem_pipe,
+ .proto_pipe = &nni_req_proto_pipe,
+ .proto_init = nni_req_init,
+ .proto_fini = nni_req_fini,
.proto_setopt = nni_req_setopt,
.proto_getopt = nni_req_getopt,
- .proto_pipe_size = sizeof (nni_req_pipe),
- .proto_pipe_send = nni_req_sender,
- .proto_pipe_recv = nni_req_receiver,
.proto_recv_filter = nni_req_recvfilter,
.proto_send_filter = nni_req_sendfilter,
};