diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-07 21:49:48 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-07 21:52:30 -0800 |
| commit | bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1 (patch) | |
| tree | 55ca7c800e9dfa54bb58b3f2323b1cb5996fab09 /src/protocol/pipeline/push.c | |
| parent | ffdceebc19214f384f1b1b6b358f1b2301384135 (diff) | |
| download | nng-bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1.tar.gz nng-bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1.tar.bz2 nng-bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1.zip | |
Simplify locking for protocols.
In an attempt to simplify the protocol implementation, and hopefully
track down a close related race, we've made it so that most protocols
need not worry about locks, and can access the socket lock if they do
need a lock. They also let the socket manage their workers, for the
most part. (The req protocol is special, since it needs a top level
work distributor, *and* a resender.)
Diffstat (limited to 'src/protocol/pipeline/push.c')
| -rw-r--r-- | src/protocol/pipeline/push.c | 93 |
1 files changed, 40 insertions, 53 deletions
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index de774125..6cdc9cc5 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -21,16 +21,15 @@ typedef struct nni_push_sock nni_push_sock; // An nni_push_sock is our per-socket protocol private structure. struct nni_push_sock { - nni_mtx mx; nni_cv cv; nni_msgq * uwq; - nni_thr sender; int raw; int closing; int wantw; nni_list pipes; nni_push_pipe * nextpipe; int npipes; + nni_sock * sock; }; // An nni_push_pipe is our per-pipe protocol private structure. @@ -42,10 +41,8 @@ struct nni_push_pipe { nni_list_node node; }; -static void nni_push_rrdist(void *); - static int -nni_push_init(void **pushp, nni_sock *sock) +nni_push_sock_init(void **pushp, nni_sock *sock) { nni_push_sock *push; int rv; @@ -53,12 +50,7 @@ nni_push_init(void **pushp, nni_sock *sock) if ((push = NNI_ALLOC_STRUCT(push)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&push->mx)) != 0) { - NNI_FREE_STRUCT(push); - return (rv); - } - if ((rv = nni_cv_init(&push->cv, &push->mx)) != 0) { - nni_mtx_fini(&push->mx); + if ((rv = nni_cv_init(&push->cv, nni_sock_mtx(sock))) != 0) { NNI_FREE_STRUCT(push); return (rv); } @@ -67,36 +59,32 @@ nni_push_init(void **pushp, nni_sock *sock) push->npipes = 0; push->wantw = 0; push->nextpipe = NULL; + push->sock = sock; push->uwq = nni_sock_sendq(sock); *pushp = push; nni_sock_recverr(sock, NNG_ENOTSUP); - rv = nni_thr_init(&push->sender, nni_push_rrdist, push); - if (rv != 0) { - nni_cv_fini(&push->cv); - nni_mtx_fini(&push->mx); - NNI_FREE_STRUCT(push); - return (rv); - } - nni_thr_run(&push->sender); return (0); } static void -nni_push_fini(void *arg) +nni_push_sock_close(void *arg) { nni_push_sock *push = arg; // Shut down the resender. We request it to exit by clearing // its old value, then kick it. - nni_mtx_lock(&push->mx); push->closing = 1; nni_cv_wake(&push->cv); - nni_mtx_unlock(&push->mx); +} + + +static void +nni_push_sock_fini(void *arg) +{ + nni_push_sock *push = arg; - nni_thr_fini(&push->sender); nni_cv_fini(&push->cv); - nni_mtx_fini(&push->mx); NNI_FREE_STRUCT(push); } @@ -142,9 +130,6 @@ nni_push_pipe_add(void *arg) if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PULL) { return (NNG_EPROTO); } - // Wake the sender since we have a new pipe. - nni_mtx_lock(&push->mx); - // Turns out it should not really matter where we stick this. // The end makes our test cases easier. nni_list_append(&push->pipes, pp); @@ -152,7 +137,6 @@ nni_push_pipe_add(void *arg) // Wake the top sender, as we can accept a job. push->npipes++; nni_cv_wake(&push->cv); - nni_mtx_unlock(&push->mx); return (0); } @@ -163,13 +147,11 @@ nni_push_pipe_rem(void *arg) nni_push_pipe *pp = arg; nni_push_sock *push = pp->push; - nni_mtx_lock(&push->mx); if (pp == push->nextpipe) { push->nextpipe = nni_list_next(&push->pipes, pp); } push->npipes--; nni_list_remove(&push->pipes, pp); - nni_mtx_unlock(&push->mx); } @@ -178,17 +160,18 @@ nni_push_pipe_send(void *arg) { nni_push_pipe *pp = arg; nni_push_sock *push = pp->push; + nni_mtx *mx = nni_sock_mtx(push->sock); nni_msg *msg; for (;;) { if (nni_msgq_get_sig(pp->mq, &msg, &pp->sigclose) != 0) { break; } - nni_mtx_lock(&push->mx); + nni_mtx_lock(mx); if (push->wantw) { nni_cv_wake(&push->cv); } - nni_mtx_unlock(&push->mx); + nni_mtx_unlock(mx); if (nni_pipe_send(pp->pipe, msg) != 0) { nni_msg_free(msg); break; @@ -216,16 +199,14 @@ nni_push_pipe_recv(void *arg) static int -nni_push_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_push_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_push_sock *push = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&push->mx); rv = nni_setopt_int(&push->raw, buf, sz, 0, 1); - nni_mtx_unlock(&push->mx); break; default: rv = NNG_ENOTSUP; @@ -235,16 +216,14 @@ nni_push_setopt(void *arg, int opt, const void *buf, size_t sz) static int -nni_push_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_push_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_push_sock *push = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&push->mx); rv = nni_getopt_int(&push->raw, buf, szp); - nni_mtx_unlock(&push->mx); break; default: rv = NNG_ENOTSUP; @@ -254,12 +233,13 @@ nni_push_getopt(void *arg, int opt, void *buf, size_t *szp) static void -nni_push_rrdist(void *arg) +nni_push_sock_send(void *arg) { nni_push_sock *push = arg; nni_push_pipe *pp; nni_msgq *uwq = push->uwq; nni_msg *msg = NULL; + nni_mtx *mx = nni_sock_mtx(push->sock); int rv; int i; @@ -269,10 +249,10 @@ nni_push_rrdist(void *arg) return; } - nni_mtx_lock(&push->mx); + nni_mtx_lock(mx); if (push->closing) { if (msg != NULL) { - nni_mtx_unlock(&push->mx); + nni_mtx_unlock(mx); nni_msg_free(msg); return; } @@ -296,14 +276,14 @@ nni_push_rrdist(void *arg) } else { push->wantw = 0; } - nni_mtx_unlock(&push->mx); + nni_mtx_unlock(mx); } } // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_push_proto_pipe = { +static nni_proto_pipe_ops nni_push_pipe_ops = { .pipe_init = nni_push_pipe_init, .pipe_fini = nni_push_pipe_fini, .pipe_add = nni_push_pipe_add, @@ -312,15 +292,22 @@ static nni_proto_pipe nni_push_proto_pipe = { .pipe_recv = nni_push_pipe_recv, }; +static nni_proto_sock_ops nni_push_sock_ops = { + .sock_init = nni_push_sock_init, + .sock_fini = nni_push_sock_fini, + .sock_close = nni_push_sock_close, + .sock_setopt = nni_push_sock_setopt, + .sock_getopt = nni_push_sock_getopt, + .sock_send = nni_push_sock_send, + .sock_recv = NULL, + .sock_rfilter = NULL, + .sock_sfilter = NULL, +}; + nni_proto nni_push_proto = { - .proto_self = NNG_PROTO_PUSH, - .proto_peer = NNG_PROTO_PULL, - .proto_name = "push", - .proto_pipe = &nni_push_proto_pipe, - .proto_init = nni_push_init, - .proto_fini = nni_push_fini, - .proto_setopt = nni_push_setopt, - .proto_getopt = nni_push_getopt, - .proto_recv_filter = NULL, - .proto_send_filter = NULL, + .proto_self = NNG_PROTO_PUSH, + .proto_peer = NNG_PROTO_PULL, + .proto_name = "push", + .proto_pipe_ops = &nni_push_pipe_ops, + .proto_sock_ops = &nni_push_sock_ops, }; |
