diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-02 14:29:47 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-02 14:29:47 -0800 |
| commit | b6374f9d9b07c929522066f27ed9a7a05c6bb23b (patch) | |
| tree | 9195694f13261ba5cd4d8f2446743f815a06619f /src/protocol | |
| parent | f729db021a4fd7c782cc08a07185c955f3567ea2 (diff) | |
| download | nng-b6374f9d9b07c929522066f27ed9a7a05c6bb23b.tar.gz nng-b6374f9d9b07c929522066f27ed9a7a05c6bb23b.tar.bz2 nng-b6374f9d9b07c929522066f27ed9a7a05c6bb23b.zip | |
Protocol initialization restructuring.
Diffstat (limited to 'src/protocol')
| -rw-r--r-- | src/protocol/pair/pair.c | 75 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 82 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 75 |
3 files changed, 140 insertions, 92 deletions
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index 037d5f1e..1d4e58d0 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -23,7 +23,6 @@ typedef struct nni_pair_sock nni_pair_sock; struct nni_pair_sock { nni_sock * sock; nni_pair_pipe * pipe; - nni_mtx mx; nni_msgq * uwq; nni_msgq * urq; }; @@ -42,7 +41,7 @@ static void nni_pair_receiver(void *); static void nni_pair_sender(void *); static int -nni_pair_create(void **pairp, nni_sock *sock) +nni_pair_init(void **pairp, nni_sock *sock) { nni_pair_sock *pair; int rv; @@ -50,10 +49,6 @@ nni_pair_create(void **pairp, nni_sock *sock) if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&pair->mx)) != 0) { - NNI_FREE_STRUCT(pair); - return (rv); - } pair->sock = sock; pair->pipe = NULL; pair->uwq = nni_sock_sendq(sock); @@ -64,7 +59,7 @@ nni_pair_create(void **pairp, nni_sock *sock) static void -nni_pair_destroy(void *arg) +nni_pair_fini(void *arg) { nni_pair_sock *pair = arg; @@ -72,49 +67,61 @@ nni_pair_destroy(void *arg) // this wold be the time to shut them all down. We don't, because // the socket already shut us down, and we don't have any other // threads that run. - nni_mtx_fini(&pair->mx); NNI_FREE_STRUCT(pair); } static int -nni_pair_add_pipe(void *arg, nni_pipe *pipe, void *data) +nni_pair_pipe_init(void **ppp, nni_pipe *pipe, void *psock) { - nni_pair_sock *pair = arg; - nni_pair_pipe *pp = data; - int rv; + nni_pair_pipe *pp; + if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) { + return (NNG_ENOMEM); + } pp->pipe = pipe; pp->sigclose = 0; - pp->pair = pair; + pp->pair = psock; + *ppp = pp; + return (0); +} + + +static void +nni_pair_pipe_fini(void *arg) +{ + nni_pair_pipe *pp = arg; + NNI_FREE_STRUCT(pp); +} + +static int +nni_pair_pipe_add(void *arg) +{ + nni_pair_pipe *pp = arg; + nni_pair_sock *pair = pp->pair; - nni_mtx_lock(&pair->mx); if (pair->pipe != NULL) { - nni_mtx_unlock(&pair->mx); return (NNG_EBUSY); // Already have a peer, denied. } pair->pipe = pp; - nni_mtx_unlock(&pair->mx); return (0); } static void -nni_pair_rem_pipe(void *arg, void *data) +nni_pair_pipe_rem(void *arg) { - nni_pair_sock *pair = arg; - nni_pair_pipe *pp = data; + nni_pair_pipe *pp = arg; + nni_pair_sock *pair = pp->pair; - nni_mtx_lock(&pair->mx); if (pair->pipe == pp) { pair->pipe = NULL; } - nni_mtx_unlock(&pair->mx); } static void -nni_pair_sender(void *arg) +nni_pair_pipe_send(void *arg) { nni_pair_pipe *pp = arg; nni_pair_sock *pair = pp->pair; @@ -141,7 +148,7 @@ nni_pair_sender(void *arg) static void -nni_pair_receiver(void *arg) +nni_pair_pipe_recv(void *arg) { nni_pair_pipe *pp = arg; nni_pair_sock *pair = pp->pair; @@ -185,17 +192,23 @@ nni_pair_getopt(void *arg, int opt, void *buf, size_t *szp) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -struct nni_protocol nni_pair_protocol = { + +static nni_proto_pipe nni_pair_proto_pipe = { + .pipe_init = nni_pair_pipe_init, + .pipe_fini = nni_pair_pipe_fini, + .pipe_add = nni_pair_pipe_add, + .pipe_rem = nni_pair_pipe_rem, + .pipe_send = nni_pair_pipe_send, + .pipe_recv = nni_pair_pipe_recv, +}; + +nni_proto nni_pair_proto = { .proto_self = NNG_PROTO_PAIR, .proto_peer = NNG_PROTO_PAIR, .proto_name = "pair", - .proto_create = nni_pair_create, - .proto_destroy = nni_pair_destroy, - .proto_add_pipe = nni_pair_add_pipe, - .proto_rem_pipe = nni_pair_rem_pipe, - .proto_pipe_size = sizeof (nni_pair_pipe), - .proto_pipe_send = nni_pair_sender, - .proto_pipe_recv = nni_pair_receiver, + .proto_pipe = &nni_pair_proto_pipe, + .proto_init = nni_pair_init, + .proto_fini = nni_pair_fini, .proto_setopt = nni_pair_setopt, .proto_getopt = nni_pair_getopt, .proto_recv_filter = NULL, diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 75f2af71..8d51ee0a 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -41,12 +41,10 @@ struct nni_rep_pipe { int sigclose; }; -static void nni_rep_receiver(void *); -static void nni_rep_sender(void *); static void nni_rep_topsender(void *); static int -nni_rep_create(void **repp, nni_sock *sock) +nni_rep_init(void **repp, nni_sock *sock) { nni_rep_sock *rep; int rv; @@ -87,7 +85,7 @@ nni_rep_create(void **repp, nni_sock *sock) static void -nni_rep_destroy(void *arg) +nni_rep_fini(void *arg) { nni_rep_sock *rep = arg; @@ -102,41 +100,60 @@ nni_rep_destroy(void *arg) static int -nni_rep_add_pipe(void *arg, nni_pipe *pipe, void *datap) +nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) { - nni_rep_sock *rep = arg; nni_rep_pipe *rp; int rv; + if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) { + NNI_FREE_STRUCT(rp); + return (rv); + } rp->pipe = pipe; + rp->rep = rsock; rp->sigclose = 0; + *rpp = rp; + return (0); +} + + +static void +nni_rep_pipe_fini(void *arg) +{ + nni_rep_pipe *rp = arg; + + nni_msgq_fini(rp->sendq); + NNI_FREE_STRUCT(rp); +} - rv = nni_msgq_init(&rp->sendq, 2); - if (rv != 0) { - return (rv); - } + +static int +nni_rep_pipe_add(void *arg) +{ + nni_rep_pipe *rp = arg; + nni_rep_sock *rep = rp->rep; + int rv; nni_mtx_lock(&rep->mx); - if ((rv = nni_idhash_insert(rep->pipes, nni_pipe_id(pipe), rp)) != 0) { - nni_msgq_fini(rp->sendq); - nni_mtx_unlock(&rep->mx); - return (rv); - } + rv = nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp); nni_mtx_unlock(&rep->mx); - return (0); + + return (rv); } static void -nni_rep_rem_pipe(void *arg, void *data) +nni_rep_pipe_rem(void *arg) { - nni_rep_sock *rep = arg; - nni_rep_pipe *rp = data; + nni_rep_pipe *rp = arg; + nni_rep_sock *rep = rp->rep; nni_mtx_lock(&rep->mx); nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe)); nni_mtx_unlock(&rep->mx); - nni_msgq_fini(rp->sendq); } @@ -197,7 +214,7 @@ nni_rep_topsender(void *arg) static void -nni_rep_sender(void *arg) +nni_rep_pipe_send(void *arg) { nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; @@ -227,7 +244,7 @@ nni_rep_sender(void *arg) static void -nni_rep_receiver(void *arg) +nni_rep_pipe_recv(void *arg) { nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; @@ -426,17 +443,22 @@ nni_rep_recvfilter(void *arg, nni_msg *msg) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -struct nni_protocol nni_rep_protocol = { +static nni_proto_pipe nni_rep_proto_pipe = { + .pipe_init = nni_rep_pipe_init, + .pipe_fini = nni_rep_pipe_fini, + .pipe_add = nni_rep_pipe_add, + .pipe_rem = nni_rep_pipe_rem, + .pipe_send = nni_rep_pipe_send, + .pipe_recv = nni_rep_pipe_recv, +}; + +nni_proto nni_rep_protocol = { .proto_self = NNG_PROTO_REP, .proto_peer = NNG_PROTO_REQ, .proto_name = "rep", - .proto_create = nni_rep_create, - .proto_destroy = nni_rep_destroy, - .proto_add_pipe = nni_rep_add_pipe, - .proto_rem_pipe = nni_rep_rem_pipe, - .proto_pipe_size = sizeof (nni_rep_pipe), - .proto_pipe_send = nni_rep_sender, - .proto_pipe_recv = nni_rep_receiver, + .proto_pipe = &nni_rep_proto_pipe, + .proto_init = nni_rep_init, + .proto_fini = nni_rep_fini, .proto_setopt = nni_rep_setopt, .proto_getopt = nni_rep_getopt, .proto_recv_filter = nni_rep_recvfilter, diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index ba7d4f6b..c26d66e8 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -31,7 +31,6 @@ struct nni_req_sock { nni_thr resender; int raw; int closing; - nni_list pipes; nni_msg * reqmsg; uint32_t nextid; // next id uint8_t reqid[4]; // outstanding request ID (big endian) @@ -45,12 +44,10 @@ struct nni_req_pipe { nni_list_node node; }; -static void nni_req_receiver(void *); -static void nni_req_sender(void *); static void nni_req_resender(void *); static int -nni_req_create(void **reqp, nni_sock *sock) +nni_req_init(void **reqp, nni_sock *sock) { nni_req_sock *req; int rv; @@ -74,7 +71,6 @@ nni_req_create(void **reqp, nni_sock *sock) req->reqmsg = NULL; req->raw = 0; req->resend = NNI_TIME_ZERO; - NNI_LIST_INIT(&req->pipes, nni_req_pipe, node); req->uwq = nni_sock_sendq(sock); req->urq = nni_sock_recvq(sock); @@ -93,7 +89,7 @@ nni_req_create(void **reqp, nni_sock *sock) static void -nni_req_destroy(void *arg) +nni_req_fini(void *arg) { nni_req_sock *req = arg; @@ -112,37 +108,49 @@ nni_req_destroy(void *arg) static int -nni_req_add_pipe(void *arg, nni_pipe *pipe, void *data) +nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) { - nni_req_sock *req = arg; - nni_req_pipe *rp = data; - int rv; + nni_req_pipe *rp; + if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { + return (NNG_ENOMEM); + } rp->pipe = pipe; rp->sigclose = 0; - rp->req = req; - - nni_mtx_lock(&req->mx); - nni_list_append(&req->pipes, rp); - nni_mtx_unlock(&req->mx); + rp->req = rsock; return (0); } static void -nni_req_rem_pipe(void *arg, void *data) +nni_req_pipe_fini(void *arg) { - nni_req_sock *req = arg; - nni_req_pipe *rp = data; + nni_req_pipe *rp = arg; - nni_mtx_lock(&req->mx); - nni_list_remove(&req->pipes, rp); - nni_mtx_unlock(&req->mx); + NNI_FREE_STRUCT(rp); +} + + +static int +nni_req_pipe_add(void *arg) +{ + // We have nothing to do, since we don't need to maintain a global + // list of related pipes. + NNI_ARG_UNUSED(arg); + return (0); +} + + +static void +nni_req_pipe_rem(void *arg) +{ + // As with add, nothing to do here. + NNI_ARG_UNUSED(arg); } static void -nni_req_sender(void *arg) +nni_req_pipe_send(void *arg) { nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; @@ -169,7 +177,7 @@ nni_req_sender(void *arg) static void -nni_req_receiver(void *arg) +nni_req_pipe_recv(void *arg) { nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; @@ -397,19 +405,24 @@ nni_req_recvfilter(void *arg, nni_msg *msg) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -struct nni_protocol nni_req_protocol = { +static nni_proto_pipe nni_req_proto_pipe = { + .pipe_init = nni_req_pipe_init, + .pipe_fini = nni_req_pipe_fini, + .pipe_add = nni_req_pipe_add, + .pipe_rem = nni_req_pipe_rem, + .pipe_send = nni_req_pipe_send, + .pipe_recv = nni_req_pipe_recv, +}; + +nni_proto nni_req_protocol = { .proto_self = NNG_PROTO_REQ, .proto_peer = NNG_PROTO_REP, .proto_name = "req", - .proto_create = nni_req_create, - .proto_destroy = nni_req_destroy, - .proto_add_pipe = nni_req_add_pipe, - .proto_rem_pipe = nni_req_rem_pipe, + .proto_pipe = &nni_req_proto_pipe, + .proto_init = nni_req_init, + .proto_fini = nni_req_fini, .proto_setopt = nni_req_setopt, .proto_getopt = nni_req_getopt, - .proto_pipe_size = sizeof (nni_req_pipe), - .proto_pipe_send = nni_req_sender, - .proto_pipe_recv = nni_req_receiver, .proto_recv_filter = nni_req_recvfilter, .proto_send_filter = nni_req_sendfilter, }; |
