diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-08 11:18:16 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-08 11:18:16 -0800 |
| commit | ec2574b09a746709f15d2a3f5de135e29f4bcb52 (patch) | |
| tree | 25f970232f8093b9ce94969eeed2a5f230e94a89 /src/protocol | |
| parent | 360d19001b90d92ac2f232efb67e356979b0bc4b (diff) | |
| download | nng-ec2574b09a746709f15d2a3f5de135e29f4bcb52.tar.gz nng-ec2574b09a746709f15d2a3f5de135e29f4bcb52.tar.bz2 nng-ec2574b09a746709f15d2a3f5de135e29f4bcb52.zip | |
Move to generic socket & pipe workers, and up to 4 each.
This should eliminate all need for protocols to do their own
thread management tasks.
Diffstat (limited to 'src/protocol')
| -rw-r--r-- | src/protocol/pair/pair.c | 9 | ||||
| -rw-r--r-- | src/protocol/pipeline/pull.c | 10 | ||||
| -rw-r--r-- | src/protocol/pipeline/push.c | 9 | ||||
| -rw-r--r-- | src/protocol/pubsub/pub.c | 10 | ||||
| -rw-r--r-- | src/protocol/pubsub/sub.c | 9 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 9 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 21 | ||||
| -rw-r--r-- | src/protocol/survey/respond.c | 13 |
8 files changed, 22 insertions, 68 deletions
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index 51608e07..ec6709ac 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -200,20 +200,15 @@ static nni_proto_pipe_ops nni_pair_pipe_ops = { .pipe_fini = nni_pair_pipe_fini, .pipe_add = nni_pair_pipe_add, .pipe_rem = nni_pair_pipe_rem, - .pipe_send = nni_pair_pipe_send, - .pipe_recv = nni_pair_pipe_recv, + .pipe_worker = { nni_pair_pipe_send, + nni_pair_pipe_recv }, }; static nni_proto_sock_ops nni_pair_sock_ops = { .sock_init = nni_pair_sock_init, .sock_fini = nni_pair_sock_fini, - .sock_close = NULL, .sock_setopt = nni_pair_sock_setopt, .sock_getopt = nni_pair_sock_getopt, - .sock_rfilter = NULL, - .sock_sfilter = NULL, - .sock_send = NULL, - .sock_recv = NULL, }; nni_proto nni_pair_proto = { diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index 9612177c..dc37e72b 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -139,22 +139,14 @@ nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp) static nni_proto_pipe_ops nni_pull_pipe_ops = { .pipe_init = nni_pull_pipe_init, .pipe_fini = nni_pull_pipe_fini, - .pipe_add = NULL, - .pipe_rem = NULL, - .pipe_send = NULL, - .pipe_recv = nni_pull_pipe_recv, + .pipe_worker = { nni_pull_pipe_recv }, }; static nni_proto_sock_ops nni_pull_sock_ops = { .sock_init = nni_pull_sock_init, .sock_fini = nni_pull_sock_fini, - .sock_close = NULL, .sock_setopt = nni_pull_sock_setopt, .sock_getopt = nni_pull_sock_getopt, - .sock_send = NULL, - .sock_recv = NULL, - .sock_rfilter = NULL, - .sock_sfilter = NULL, }; nni_proto nni_pull_proto = { diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index 6cdc9cc5..e3b9ace8 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -288,8 +288,8 @@ static nni_proto_pipe_ops nni_push_pipe_ops = { .pipe_fini = nni_push_pipe_fini, .pipe_add = nni_push_pipe_add, .pipe_rem = nni_push_pipe_rem, - .pipe_send = nni_push_pipe_send, - .pipe_recv = nni_push_pipe_recv, + .pipe_worker = { nni_push_pipe_send, + nni_push_pipe_recv }, }; static nni_proto_sock_ops nni_push_sock_ops = { @@ -298,10 +298,7 @@ static nni_proto_sock_ops nni_push_sock_ops = { .sock_close = nni_push_sock_close, .sock_setopt = nni_push_sock_setopt, .sock_getopt = nni_push_sock_getopt, - .sock_send = nni_push_sock_send, - .sock_recv = NULL, - .sock_rfilter = NULL, - .sock_sfilter = NULL, + .sock_worker = { nni_push_sock_send }, }; nni_proto nni_push_proto = { diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index 684b916d..b597f896 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -257,20 +257,16 @@ static nni_proto_pipe_ops nni_pub_pipe_ops = { .pipe_fini = nni_pub_pipe_fini, .pipe_add = nni_pub_pipe_add, .pipe_rem = nni_pub_pipe_rem, - .pipe_send = nni_pub_pipe_send, - .pipe_recv = nni_pub_pipe_recv, + .pipe_worker = { nni_pub_pipe_send, + nni_pub_pipe_recv }, }; nni_proto_sock_ops nni_pub_sock_ops = { .sock_init = nni_pub_sock_init, .sock_fini = nni_pub_sock_fini, - .sock_close = NULL, .sock_setopt = nni_pub_sock_setopt, .sock_getopt = nni_pub_sock_getopt, - .sock_send = nni_pub_sock_send, - .sock_recv = NULL, - .sock_rfilter = NULL, - .sock_sfilter = NULL, + .sock_worker = { nni_pub_sock_send }, }; nni_proto nni_pub_proto = { diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index dd288d5e..0a6ce5fd 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -297,22 +297,15 @@ nni_sub_sock_rfilter(void *arg, nni_msg *msg) static nni_proto_pipe_ops nni_sub_pipe_ops = { .pipe_init = nni_sub_pipe_init, .pipe_fini = nni_sub_pipe_fini, - .pipe_add = NULL, - .pipe_rem = NULL, - .pipe_send = NULL, - .pipe_recv = nni_sub_pipe_recv, + .pipe_worker = { nni_sub_pipe_recv }, }; static nni_proto_sock_ops nni_sub_sock_ops = { .sock_init = nni_sub_sock_init, .sock_fini = nni_sub_sock_fini, - .sock_close = NULL, .sock_setopt = nni_sub_sock_setopt, .sock_getopt = nni_sub_sock_getopt, .sock_rfilter = nni_sub_sock_rfilter, - .sock_sfilter = NULL, - .sock_send = NULL, - .sock_recv = NULL, }; nni_proto nni_sub_proto = { diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 3cf215b5..485fd4bc 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -26,7 +26,6 @@ struct nni_rep_sock { nni_msgq * urq; int raw; int ttl; - nni_thr sender; nni_idhash * pipes; char * btrace; size_t btrace_len; @@ -395,20 +394,18 @@ static nni_proto_pipe_ops nni_rep_pipe_ops = { .pipe_fini = nni_rep_pipe_fini, .pipe_add = nni_rep_pipe_add, .pipe_rem = nni_rep_pipe_rem, - .pipe_send = nni_rep_pipe_send, - .pipe_recv = nni_rep_pipe_recv, + .pipe_worker = { nni_rep_pipe_send, + nni_rep_pipe_recv }, }; static nni_proto_sock_ops nni_rep_sock_ops = { .sock_init = nni_rep_sock_init, .sock_fini = nni_rep_sock_fini, - .sock_close = NULL, .sock_setopt = nni_rep_sock_setopt, .sock_getopt = nni_rep_sock_getopt, .sock_rfilter = nni_rep_sock_rfilter, .sock_sfilter = nni_rep_sock_sfilter, - .sock_send = nni_rep_sock_send, - .sock_recv = NULL, + .sock_worker = { nni_rep_sock_send }, }; nni_proto nni_rep_proto = { diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index d8104342..3c3bba9c 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -13,7 +13,7 @@ #include "core/nng_impl.h" // Request protocol. The REQ protocol is the "request" side of a -// request-reply pair. This is useful for bulding RPC clients, for +// request-reply pair. This is useful for building RPC clients, for // example. typedef struct nni_req_pipe nni_req_pipe; @@ -27,7 +27,6 @@ struct nni_req_sock { nni_msgq * urq; nni_duration retry; nni_time resend; - nni_thr resender; int raw; int closing; nni_msg * reqmsg; @@ -41,7 +40,6 @@ struct nni_req_pipe { nni_pipe * pipe; nni_req_sock * req; int sigclose; - nni_list_node node; }; static void nni_req_resender(void *); @@ -72,13 +70,6 @@ nni_req_sock_init(void **reqp, nni_sock *sock) req->urq = nni_sock_recvq(sock); *reqp = req; nni_sock_recverr(sock, NNG_ESTATE); - rv = nni_thr_init(&req->resender, nni_req_resender, req); - if (rv != 0) { - nni_cv_fini(&req->cv); - NNI_FREE_STRUCT(req); - return (rv); - } - nni_thr_run(&req->resender); return (0); } @@ -100,7 +91,6 @@ nni_req_sock_fini(void *arg) { nni_req_sock *req = arg; - nni_thr_fini(&req->resender); nni_cv_fini(&req->cv); if (req->reqmsg != NULL) { nni_msg_free(req->reqmsg); @@ -275,7 +265,7 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp) static void -nni_req_resender(void *arg) +nni_req_sock_resend(void *arg) { nni_req_sock *req = arg; nni_mtx *mx = nni_sock_mtx(req->sock); @@ -395,8 +385,8 @@ static nni_proto_pipe_ops nni_req_pipe_ops = { .pipe_fini = nni_req_pipe_fini, .pipe_add = nni_req_pipe_add, .pipe_rem = nni_req_pipe_rem, - .pipe_send = nni_req_pipe_send, - .pipe_recv = nni_req_pipe_recv, + .pipe_worker = { nni_req_pipe_send, + nni_req_pipe_recv }, }; static nni_proto_sock_ops nni_req_sock_ops = { @@ -407,8 +397,7 @@ static nni_proto_sock_ops nni_req_sock_ops = { .sock_getopt = nni_req_sock_getopt, .sock_rfilter = nni_req_sock_rfilter, .sock_sfilter = nni_req_sock_sfilter, - .sock_send = NULL, - .sock_recv = NULL, + .sock_worker = { nni_req_sock_resend }, }; nni_proto nni_req_proto = { diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 24c607b9..3b23dff8 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -19,18 +19,17 @@ typedef struct nni_resp_pipe nni_resp_pipe; typedef struct nni_resp_sock nni_resp_sock; -// An nni_rep_sock is our per-socket protocol private structure. +// An nni_resp_sock is our per-socket protocol private structure. struct nni_resp_sock { nni_sock * sock; int raw; int ttl; - nni_thr sender; nni_idhash * pipes; char * btrace; size_t btrace_len; }; -// An nni_rep_pipe is our per-pipe protocol private structure. +// An nni_resp_pipe is our per-pipe protocol private structure. struct nni_resp_pipe { nni_pipe * pipe; nni_resp_sock * resp; @@ -38,8 +37,6 @@ struct nni_resp_pipe { int sigclose; }; -static void nni_rep_topsender(void *); - static int nni_resp_sock_init(void **respp, nni_sock *sock) { @@ -388,8 +385,7 @@ static nni_proto_pipe_ops nni_resp_pipe_ops = { .pipe_fini = nni_resp_pipe_fini, .pipe_add = nni_resp_pipe_add, .pipe_rem = nni_resp_pipe_rem, - .pipe_send = nni_resp_pipe_send, - .pipe_recv = nni_resp_pipe_recv, + .pipe_worker = { nni_resp_pipe_send,nni_resp_pipe_recv }, }; static nni_proto_sock_ops nni_resp_sock_ops = { @@ -400,8 +396,7 @@ static nni_proto_sock_ops nni_resp_sock_ops = { .sock_getopt = nni_resp_sock_getopt, .sock_rfilter = nni_resp_sock_rfilter, .sock_sfilter = nni_resp_sock_sfilter, - .sock_send = nni_resp_sock_send, - .sock_recv = NULL, + .sock_worker = { nni_resp_sock_send }, }; nni_proto nni_respondent_proto = { |
