diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-07 21:49:48 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-07 21:52:30 -0800 |
| commit | bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1 (patch) | |
| tree | 55ca7c800e9dfa54bb58b3f2323b1cb5996fab09 /src/protocol | |
| parent | ffdceebc19214f384f1b1b6b358f1b2301384135 (diff) | |
| download | nng-bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1.tar.gz nng-bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1.tar.bz2 nng-bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1.zip | |
Simplify locking for protocols.
In an attempt to simplify the protocol implementation, and hopefully
track down a close related race, we've made it so that most protocols
need not worry about locks, and can access the socket lock if they do
need a lock. They also let the socket manage their workers, for the
most part. (The req protocol is special, since it needs a top level
work distributor, *and* a resender.)
Diffstat (limited to 'src/protocol')
| -rw-r--r-- | src/protocol/pair/pair.c | 40 | ||||
| -rw-r--r-- | src/protocol/pipeline/pull.c | 79 | ||||
| -rw-r--r-- | src/protocol/pipeline/push.c | 93 | ||||
| -rw-r--r-- | src/protocol/pubsub/pub.c | 70 | ||||
| -rw-r--r-- | src/protocol/pubsub/sub.c | 87 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 99 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 93 |
7 files changed, 211 insertions, 350 deletions
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index 830a02e6..51608e07 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -41,7 +41,7 @@ static void nni_pair_receiver(void *); static void nni_pair_sender(void *); static int -nni_pair_init(void **pairp, nni_sock *sock) +nni_pair_sock_init(void **pairp, nni_sock *sock) { nni_pair_sock *pair; int rv; @@ -59,7 +59,7 @@ nni_pair_init(void **pairp, nni_sock *sock) static void -nni_pair_fini(void *arg) +nni_pair_sock_fini(void *arg) { nni_pair_sock *pair = arg; @@ -102,9 +102,6 @@ nni_pair_pipe_add(void *arg) nni_pair_pipe *pp = arg; nni_pair_sock *pair = pp->pair; - if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PAIR) { - return (NNG_EPROTO); - } if (pair->pipe != NULL) { return (NNG_EBUSY); // Already have a peer, denied. } @@ -182,14 +179,14 @@ nni_pair_pipe_recv(void *arg) // TODO: probably we could replace these with NULL, since we have no // protocol specific options? static int -nni_pair_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_pair_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { return (NNG_ENOTSUP); } static int -nni_pair_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_pair_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { return (NNG_ENOTSUP); } @@ -198,7 +195,7 @@ nni_pair_getopt(void *arg, int opt, void *buf, size_t *szp) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_pair_proto_pipe = { +static nni_proto_pipe_ops nni_pair_pipe_ops = { .pipe_init = nni_pair_pipe_init, .pipe_fini = nni_pair_pipe_fini, .pipe_add = nni_pair_pipe_add, @@ -207,15 +204,22 @@ static nni_proto_pipe nni_pair_proto_pipe = { .pipe_recv = nni_pair_pipe_recv, }; +static nni_proto_sock_ops nni_pair_sock_ops = { + .sock_init = nni_pair_sock_init, + .sock_fini = nni_pair_sock_fini, + .sock_close = NULL, + .sock_setopt = nni_pair_sock_setopt, + .sock_getopt = nni_pair_sock_getopt, + .sock_rfilter = NULL, + .sock_sfilter = NULL, + .sock_send = NULL, + .sock_recv = NULL, +}; + nni_proto nni_pair_proto = { - .proto_self = NNG_PROTO_PAIR, - .proto_peer = NNG_PROTO_PAIR, - .proto_name = "pair", - .proto_pipe = &nni_pair_proto_pipe, - .proto_init = nni_pair_init, - .proto_fini = nni_pair_fini, - .proto_setopt = nni_pair_setopt, - .proto_getopt = nni_pair_getopt, - .proto_recv_filter = NULL, - .proto_send_filter = NULL, + .proto_self = NNG_PROTO_PAIR, + .proto_peer = NNG_PROTO_PAIR, + .proto_name = "pair", + .proto_sock_ops = &nni_pair_sock_ops, + .proto_pipe_ops = &nni_pair_pipe_ops, }; diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index 695b730e..9612177c 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -19,7 +19,6 @@ typedef struct nni_pull_sock nni_pull_sock; // An nni_pull_sock is our per-socket protocol private structure. struct nni_pull_sock { - nni_mtx mx; nni_msgq * urq; int raw; }; @@ -31,7 +30,7 @@ struct nni_pull_pipe { }; static int -nni_pull_init(void **pullp, nni_sock *sock) +nni_pull_sock_init(void **pullp, nni_sock *sock) { nni_pull_sock *pull; int rv; @@ -39,10 +38,6 @@ nni_pull_init(void **pullp, nni_sock *sock) if ((pull = NNI_ALLOC_STRUCT(pull)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&pull->mx)) != 0) { - NNI_FREE_STRUCT(pull); - return (rv); - } pull->raw = 0; pull->urq = nni_sock_recvq(sock); *pullp = pull; @@ -52,11 +47,10 @@ nni_pull_init(void **pullp, nni_sock *sock) static void -nni_pull_fini(void *arg) +nni_pull_sock_fini(void *arg) { nni_pull_sock *pull = arg; - nni_mtx_fini(&pull->mx); NNI_FREE_STRUCT(pull); } @@ -86,32 +80,6 @@ nni_pull_pipe_fini(void *arg) } -static int -nni_pull_pipe_add(void *arg) -{ - nni_pull_pipe *pp = arg; - - if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PUSH) { - return (NNG_EPROTO); - } - return (0); -} - - -static void -nni_pull_pipe_rem(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - - -static void -nni_pull_pipe_send(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - - static void nni_pull_pipe_recv(void *arg) { @@ -133,16 +101,14 @@ nni_pull_pipe_recv(void *arg) static int -nni_pull_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_pull_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_pull_sock *pull = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&pull->mx); rv = nni_setopt_int(&pull->raw, buf, sz, 0, 1); - nni_mtx_unlock(&pull->mx); break; default: rv = NNG_ENOTSUP; @@ -152,16 +118,14 @@ nni_pull_setopt(void *arg, int opt, const void *buf, size_t sz) static int -nni_pull_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_pull_sock *pull = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&pull->mx); rv = nni_getopt_int(&pull->raw, buf, szp); - nni_mtx_unlock(&pull->mx); break; default: rv = NNG_ENOTSUP; @@ -172,24 +136,31 @@ nni_pull_getopt(void *arg, int opt, void *buf, size_t *szp) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_pull_proto_pipe = { +static nni_proto_pipe_ops nni_pull_pipe_ops = { .pipe_init = nni_pull_pipe_init, .pipe_fini = nni_pull_pipe_fini, - .pipe_add = nni_pull_pipe_add, - .pipe_rem = nni_pull_pipe_rem, - .pipe_send = nni_pull_pipe_send, + .pipe_add = NULL, + .pipe_rem = NULL, + .pipe_send = NULL, .pipe_recv = nni_pull_pipe_recv, }; +static nni_proto_sock_ops nni_pull_sock_ops = { + .sock_init = nni_pull_sock_init, + .sock_fini = nni_pull_sock_fini, + .sock_close = NULL, + .sock_setopt = nni_pull_sock_setopt, + .sock_getopt = nni_pull_sock_getopt, + .sock_send = NULL, + .sock_recv = NULL, + .sock_rfilter = NULL, + .sock_sfilter = NULL, +}; + nni_proto nni_pull_proto = { - .proto_self = NNG_PROTO_PULL, - .proto_peer = NNG_PROTO_PUSH, - .proto_name = "pull", - .proto_pipe = &nni_pull_proto_pipe, - .proto_init = nni_pull_init, - .proto_fini = nni_pull_fini, - .proto_setopt = nni_pull_setopt, - .proto_getopt = nni_pull_getopt, - .proto_recv_filter = NULL, - .proto_send_filter = NULL, + .proto_self = NNG_PROTO_PULL, + .proto_peer = NNG_PROTO_PUSH, + .proto_name = "pull", + .proto_pipe_ops = &nni_pull_pipe_ops, + .proto_sock_ops = &nni_pull_sock_ops, }; diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index de774125..6cdc9cc5 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -21,16 +21,15 @@ typedef struct nni_push_sock nni_push_sock; // An nni_push_sock is our per-socket protocol private structure. struct nni_push_sock { - nni_mtx mx; nni_cv cv; nni_msgq * uwq; - nni_thr sender; int raw; int closing; int wantw; nni_list pipes; nni_push_pipe * nextpipe; int npipes; + nni_sock * sock; }; // An nni_push_pipe is our per-pipe protocol private structure. @@ -42,10 +41,8 @@ struct nni_push_pipe { nni_list_node node; }; -static void nni_push_rrdist(void *); - static int -nni_push_init(void **pushp, nni_sock *sock) +nni_push_sock_init(void **pushp, nni_sock *sock) { nni_push_sock *push; int rv; @@ -53,12 +50,7 @@ nni_push_init(void **pushp, nni_sock *sock) if ((push = NNI_ALLOC_STRUCT(push)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&push->mx)) != 0) { - NNI_FREE_STRUCT(push); - return (rv); - } - if ((rv = nni_cv_init(&push->cv, &push->mx)) != 0) { - nni_mtx_fini(&push->mx); + if ((rv = nni_cv_init(&push->cv, nni_sock_mtx(sock))) != 0) { NNI_FREE_STRUCT(push); return (rv); } @@ -67,36 +59,32 @@ nni_push_init(void **pushp, nni_sock *sock) push->npipes = 0; push->wantw = 0; push->nextpipe = NULL; + push->sock = sock; push->uwq = nni_sock_sendq(sock); *pushp = push; nni_sock_recverr(sock, NNG_ENOTSUP); - rv = nni_thr_init(&push->sender, nni_push_rrdist, push); - if (rv != 0) { - nni_cv_fini(&push->cv); - nni_mtx_fini(&push->mx); - NNI_FREE_STRUCT(push); - return (rv); - } - nni_thr_run(&push->sender); return (0); } static void -nni_push_fini(void *arg) +nni_push_sock_close(void *arg) { nni_push_sock *push = arg; // Shut down the resender. We request it to exit by clearing // its old value, then kick it. - nni_mtx_lock(&push->mx); push->closing = 1; nni_cv_wake(&push->cv); - nni_mtx_unlock(&push->mx); +} + + +static void +nni_push_sock_fini(void *arg) +{ + nni_push_sock *push = arg; - nni_thr_fini(&push->sender); nni_cv_fini(&push->cv); - nni_mtx_fini(&push->mx); NNI_FREE_STRUCT(push); } @@ -142,9 +130,6 @@ nni_push_pipe_add(void *arg) if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PULL) { return (NNG_EPROTO); } - // Wake the sender since we have a new pipe. - nni_mtx_lock(&push->mx); - // Turns out it should not really matter where we stick this. // The end makes our test cases easier. nni_list_append(&push->pipes, pp); @@ -152,7 +137,6 @@ nni_push_pipe_add(void *arg) // Wake the top sender, as we can accept a job. push->npipes++; nni_cv_wake(&push->cv); - nni_mtx_unlock(&push->mx); return (0); } @@ -163,13 +147,11 @@ nni_push_pipe_rem(void *arg) nni_push_pipe *pp = arg; nni_push_sock *push = pp->push; - nni_mtx_lock(&push->mx); if (pp == push->nextpipe) { push->nextpipe = nni_list_next(&push->pipes, pp); } push->npipes--; nni_list_remove(&push->pipes, pp); - nni_mtx_unlock(&push->mx); } @@ -178,17 +160,18 @@ nni_push_pipe_send(void *arg) { nni_push_pipe *pp = arg; nni_push_sock *push = pp->push; + nni_mtx *mx = nni_sock_mtx(push->sock); nni_msg *msg; for (;;) { if (nni_msgq_get_sig(pp->mq, &msg, &pp->sigclose) != 0) { break; } - nni_mtx_lock(&push->mx); + nni_mtx_lock(mx); if (push->wantw) { nni_cv_wake(&push->cv); } - nni_mtx_unlock(&push->mx); + nni_mtx_unlock(mx); if (nni_pipe_send(pp->pipe, msg) != 0) { nni_msg_free(msg); break; @@ -216,16 +199,14 @@ nni_push_pipe_recv(void *arg) static int -nni_push_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_push_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_push_sock *push = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&push->mx); rv = nni_setopt_int(&push->raw, buf, sz, 0, 1); - nni_mtx_unlock(&push->mx); break; default: rv = NNG_ENOTSUP; @@ -235,16 +216,14 @@ nni_push_setopt(void *arg, int opt, const void *buf, size_t sz) static int -nni_push_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_push_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_push_sock *push = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&push->mx); rv = nni_getopt_int(&push->raw, buf, szp); - nni_mtx_unlock(&push->mx); break; default: rv = NNG_ENOTSUP; @@ -254,12 +233,13 @@ nni_push_getopt(void *arg, int opt, void *buf, size_t *szp) static void -nni_push_rrdist(void *arg) +nni_push_sock_send(void *arg) { nni_push_sock *push = arg; nni_push_pipe *pp; nni_msgq *uwq = push->uwq; nni_msg *msg = NULL; + nni_mtx *mx = nni_sock_mtx(push->sock); int rv; int i; @@ -269,10 +249,10 @@ nni_push_rrdist(void *arg) return; } - nni_mtx_lock(&push->mx); + nni_mtx_lock(mx); if (push->closing) { if (msg != NULL) { - nni_mtx_unlock(&push->mx); + nni_mtx_unlock(mx); nni_msg_free(msg); return; } @@ -296,14 +276,14 @@ nni_push_rrdist(void *arg) } else { push->wantw = 0; } - nni_mtx_unlock(&push->mx); + nni_mtx_unlock(mx); } } // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_push_proto_pipe = { +static nni_proto_pipe_ops nni_push_pipe_ops = { .pipe_init = nni_push_pipe_init, .pipe_fini = nni_push_pipe_fini, .pipe_add = nni_push_pipe_add, @@ -312,15 +292,22 @@ static nni_proto_pipe nni_push_proto_pipe = { .pipe_recv = nni_push_pipe_recv, }; +static nni_proto_sock_ops nni_push_sock_ops = { + .sock_init = nni_push_sock_init, + .sock_fini = nni_push_sock_fini, + .sock_close = nni_push_sock_close, + .sock_setopt = nni_push_sock_setopt, + .sock_getopt = nni_push_sock_getopt, + .sock_send = nni_push_sock_send, + .sock_recv = NULL, + .sock_rfilter = NULL, + .sock_sfilter = NULL, +}; + nni_proto nni_push_proto = { - .proto_self = NNG_PROTO_PUSH, - .proto_peer = NNG_PROTO_PULL, - .proto_name = "push", - .proto_pipe = &nni_push_proto_pipe, - .proto_init = nni_push_init, - .proto_fini = nni_push_fini, - .proto_setopt = nni_push_setopt, - .proto_getopt = nni_push_getopt, - .proto_recv_filter = NULL, - .proto_send_filter = NULL, + .proto_self = NNG_PROTO_PUSH, + .proto_peer = NNG_PROTO_PULL, + .proto_name = "push", + .proto_pipe_ops = &nni_push_pipe_ops, + .proto_sock_ops = &nni_push_sock_ops, }; diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index 860c7c7d..684b916d 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -23,10 +23,8 @@ typedef struct nni_pub_sock nni_pub_sock; // An nni_pub_sock is our per-socket protocol private structure. struct nni_pub_sock { nni_sock * sock; - nni_mtx mx; nni_msgq * uwq; int raw; - nni_thr sender; nni_list pipes; }; @@ -39,10 +37,10 @@ struct nni_pub_pipe { int sigclose; }; -static void nni_pub_broadcast(void *); +static void nni_pub_sock_send(void *); static int -nni_pub_init(void **pubp, nni_sock *sock) +nni_pub_sock_init(void **pubp, nni_sock *sock) { nni_pub_sock *pub; int rv; @@ -50,36 +48,23 @@ nni_pub_init(void **pubp, nni_sock *sock) if ((pub = NNI_ALLOC_STRUCT(pub)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&pub->mx)) != 0) { - NNI_FREE_STRUCT(pub); - return (rv); - } pub->sock = sock; pub->raw = 0; NNI_LIST_INIT(&pub->pipes, nni_pub_pipe, node); pub->uwq = nni_sock_sendq(sock); - rv = nni_thr_init(&pub->sender, nni_pub_broadcast, pub); - if (rv != 0) { - nni_mtx_fini(&pub->mx); - NNI_FREE_STRUCT(pub); - return (rv); - } *pubp = pub; nni_sock_recverr(sock, NNG_ENOTSUP); - nni_thr_run(&pub->sender); return (0); } static void -nni_pub_fini(void *arg) +nni_pub_sock_fini(void *arg) { nni_pub_sock *pub = arg; - nni_thr_fini(&pub->sender); - nni_mtx_fini(&pub->mx); NNI_FREE_STRUCT(pub); } @@ -125,10 +110,7 @@ nni_pub_pipe_add(void *arg) if (nni_pipe_peer(pp->pipe) != NNG_PROTO_SUB) { return (NNG_EPROTO); } - nni_mtx_lock(&pub->mx); nni_list_append(&pub->pipes, pp); - nni_mtx_unlock(&pub->mx); - return (0); } @@ -139,18 +121,17 @@ nni_pub_pipe_rem(void *arg) nni_pub_pipe *pp = arg; nni_pub_sock *pub = pp->pub; - nni_mtx_lock(&pub->mx); nni_list_remove(&pub->pipes, pp); - nni_mtx_unlock(&pub->mx); } static void -nni_pub_broadcast(void *arg) +nni_pub_sock_send(void *arg) { nni_pub_sock *pub = arg; nni_msgq *uwq = pub->uwq; nni_msg *msg, *dup; + nni_mtx *mx = nni_sock_mtx(pub->sock); for (;;) { nni_pub_pipe *pp; @@ -161,7 +142,7 @@ nni_pub_broadcast(void *arg) break; } - nni_mtx_lock(&pub->mx); + nni_mtx_lock(mx); last = nni_list_last(&pub->pipes); NNI_LIST_FOREACH (&pub->pipes, pp) { if (pp != last) { @@ -176,7 +157,7 @@ nni_pub_broadcast(void *arg) nni_msg_free(dup); } } - nni_mtx_unlock(&pub->mx); + nni_mtx_unlock(mx); if (last == NULL) { nni_msg_free(msg); @@ -236,16 +217,14 @@ nni_pub_pipe_recv(void *arg) static int -nni_pub_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_pub_sock *pub = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&pub->mx); rv = nni_setopt_int(&pub->raw, buf, sz, 0, 1); - nni_mtx_unlock(&pub->mx); break; default: rv = NNG_ENOTSUP; @@ -255,16 +234,14 @@ nni_pub_setopt(void *arg, int opt, const void *buf, size_t sz) static int -nni_pub_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_pub_sock *pub = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&pub->mx); rv = nni_getopt_int(&pub->raw, buf, szp); - nni_mtx_unlock(&pub->mx); break; default: rv = NNG_ENOTSUP; @@ -275,7 +252,7 @@ nni_pub_getopt(void *arg, int opt, void *buf, size_t *szp) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_pub_proto_pipe = { +static nni_proto_pipe_ops nni_pub_pipe_ops = { .pipe_init = nni_pub_pipe_init, .pipe_fini = nni_pub_pipe_fini, .pipe_add = nni_pub_pipe_add, @@ -284,15 +261,22 @@ static nni_proto_pipe nni_pub_proto_pipe = { .pipe_recv = nni_pub_pipe_recv, }; +nni_proto_sock_ops nni_pub_sock_ops = { + .sock_init = nni_pub_sock_init, + .sock_fini = nni_pub_sock_fini, + .sock_close = NULL, + .sock_setopt = nni_pub_sock_setopt, + .sock_getopt = nni_pub_sock_getopt, + .sock_send = nni_pub_sock_send, + .sock_recv = NULL, + .sock_rfilter = NULL, + .sock_sfilter = NULL, +}; + nni_proto nni_pub_proto = { - .proto_self = NNG_PROTO_PUB, - .proto_peer = NNG_PROTO_SUB, - .proto_name = "pub", - .proto_pipe = &nni_pub_proto_pipe, - .proto_init = nni_pub_init, - .proto_fini = nni_pub_fini, - .proto_setopt = nni_pub_setopt, - .proto_getopt = nni_pub_getopt, - .proto_recv_filter = NULL, - .proto_send_filter = NULL, + .proto_self = NNG_PROTO_PUB, + .proto_peer = NNG_PROTO_SUB, + .proto_name = "pub", + .proto_sock_ops = &nni_pub_sock_ops, + .proto_pipe_ops = &nni_pub_pipe_ops, }; diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 19c06aa0..dd288d5e 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -29,7 +29,6 @@ struct nni_sub_topic { // An nni_rep_sock is our per-socket protocol private structure. struct nni_sub_sock { nni_sock * sock; - nni_mtx mx; nni_list topics; nni_msgq * urq; int raw; @@ -42,7 +41,7 @@ struct nni_sub_pipe { }; static int -nni_sub_init(void **subp, nni_sock *sock) +nni_sub_sock_init(void **subp, nni_sock *sock) { nni_sub_sock *sub; int rv; @@ -50,10 +49,6 @@ nni_sub_init(void **subp, nni_sock *sock) if ((sub = NNI_ALLOC_STRUCT(sub)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&sub->mx)) != 0) { - NNI_FREE_STRUCT(sub); - return (rv); - } NNI_LIST_INIT(&sub->topics, nni_sub_topic, node); sub->sock = sock; sub->raw = 0; @@ -66,7 +61,7 @@ nni_sub_init(void **subp, nni_sock *sock) static void -nni_sub_fini(void *arg) +nni_sub_sock_fini(void *arg) { nni_sub_sock *sub = arg; nni_sub_topic *topic; @@ -76,7 +71,6 @@ nni_sub_fini(void *arg) nni_free(topic->buf, topic->len); NNI_FREE_STRUCT(topic); } - nni_mtx_fini(&sub->mx); NNI_FREE_STRUCT(sub); } @@ -106,32 +100,6 @@ nni_sub_pipe_fini(void *arg) } -static int -nni_sub_pipe_add(void *arg) -{ - nni_sub_pipe *sp = arg; - - if (nni_pipe_peer(sp->pipe) != NNG_PROTO_PUB) { - return (NNG_EPROTO); - } - return (0); -} - - -static void -nni_sub_pipe_rem(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - - -static void -nni_sub_pipe_send(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - - static void nni_sub_pipe_recv(void *arg) { @@ -242,26 +210,20 @@ nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz) static int -nni_sub_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_sub_sock *sub = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&sub->mx); rv = nni_setopt_int(&sub->raw, buf, sz, 0, 1); - nni_mtx_unlock(&sub->mx); break; case NNG_OPT_SUBSCRIBE: - nni_mtx_lock(&sub->mx); rv = nni_sub_subscribe(sub, buf, sz); - nni_mtx_unlock(&sub->mx); break; case NNG_OPT_UNSUBSCRIBE: - nni_mtx_lock(&sub->mx); rv = nni_sub_unsubscribe(sub, buf, sz); - nni_mtx_unlock(&sub->mx); break; default: rv = NNG_ENOTSUP; @@ -271,16 +233,14 @@ nni_sub_setopt(void *arg, int opt, const void *buf, size_t sz) static int -nni_sub_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_sub_sock *sub = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&sub->mx); rv = nni_getopt_int(&sub->raw, buf, szp); - nni_mtx_unlock(&sub->mx); break; default: rv = NNG_ENOTSUP; @@ -290,7 +250,7 @@ nni_sub_getopt(void *arg, int opt, void *buf, size_t *szp) static nni_msg * -nni_sub_recvfilter(void *arg, nni_msg *msg) +nni_sub_sock_rfilter(void *arg, nni_msg *msg) { nni_sub_sock *sub = arg; nni_sub_topic *topic; @@ -298,9 +258,7 @@ nni_sub_recvfilter(void *arg, nni_msg *msg) size_t len; int match; - nni_mtx_lock(&sub->mx); if (sub->raw) { - nni_mtx_unlock(&sub->mx); return (msg); } @@ -326,7 +284,6 @@ nni_sub_recvfilter(void *arg, nni_msg *msg) break; } } - nni_mtx_unlock(&sub->mx); if (!match) { nni_msg_free(msg); return (NULL); @@ -337,23 +294,31 @@ nni_sub_recvfilter(void *arg, nni_msg *msg) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_sub_proto_pipe = { +static nni_proto_pipe_ops nni_sub_pipe_ops = { .pipe_init = nni_sub_pipe_init, .pipe_fini = nni_sub_pipe_fini, - .pipe_add = nni_sub_pipe_add, - .pipe_rem = nni_sub_pipe_rem, - .pipe_send = nni_sub_pipe_send, + .pipe_add = NULL, + .pipe_rem = NULL, + .pipe_send = NULL, .pipe_recv = nni_sub_pipe_recv, }; +static nni_proto_sock_ops nni_sub_sock_ops = { + .sock_init = nni_sub_sock_init, + .sock_fini = nni_sub_sock_fini, + .sock_close = NULL, + .sock_setopt = nni_sub_sock_setopt, + .sock_getopt = nni_sub_sock_getopt, + .sock_rfilter = nni_sub_sock_rfilter, + .sock_sfilter = NULL, + .sock_send = NULL, + .sock_recv = NULL, +}; + nni_proto nni_sub_proto = { - .proto_self = NNG_PROTO_SUB, - .proto_peer = NNG_PROTO_PUB, - .proto_name = "sub", - .proto_pipe = &nni_sub_proto_pipe, - .proto_init = nni_sub_init, - .proto_fini = nni_sub_fini, - .proto_setopt = nni_sub_setopt, - .proto_getopt = nni_sub_getopt, - .proto_recv_filter = nni_sub_recvfilter, + .proto_self = NNG_PROTO_SUB, + .proto_peer = NNG_PROTO_PUB, + .proto_name = "sub", + .proto_sock_ops = &nni_sub_sock_ops, + .proto_pipe_ops = &nni_sub_pipe_ops, }; diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 56ee2367..8de196c4 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -22,7 +22,6 @@ typedef struct nni_rep_sock nni_rep_sock; // An nni_rep_sock is our per-socket protocol private structure. struct nni_rep_sock { nni_sock * sock; - nni_mtx mx; nni_msgq * uwq; nni_msgq * urq; int raw; @@ -44,7 +43,7 @@ struct nni_rep_pipe { static void nni_rep_topsender(void *); static int -nni_rep_init(void **repp, nni_sock *sock) +nni_rep_sock_init(void **repp, nni_sock *sock) { nni_rep_sock *rep; int rv; @@ -52,17 +51,12 @@ nni_rep_init(void **repp, nni_sock *sock) if ((rep = NNI_ALLOC_STRUCT(rep)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&rep->mx)) != 0) { - NNI_FREE_STRUCT(rep); - return (rv); - } rep->ttl = 8; // Per RFC rep->sock = sock; rep->raw = 0; rep->btrace = NULL; rep->btrace_len = 0; if ((rv = nni_idhash_create(&rep->pipes)) != 0) { - nni_mtx_fini(&rep->mx); NNI_FREE_STRUCT(rep); return (rv); } @@ -70,28 +64,18 @@ nni_rep_init(void **repp, nni_sock *sock) rep->uwq = nni_sock_sendq(sock); rep->urq = nni_sock_recvq(sock); - rv = nni_thr_init(&rep->sender, nni_rep_topsender, rep); - if (rv != 0) { - nni_idhash_destroy(rep->pipes); - nni_mtx_fini(&rep->mx); - NNI_FREE_STRUCT(rep); - return (rv); - } *repp = rep; nni_sock_senderr(sock, NNG_ESTATE); - nni_thr_run(&rep->sender); return (0); } static void -nni_rep_fini(void *arg) +nni_rep_sock_fini(void *arg) { nni_rep_sock *rep = arg; - nni_thr_fini(&rep->sender); nni_idhash_destroy(rep->pipes); - nni_mtx_fini(&rep->mx); if (rep->btrace != NULL) { nni_free(rep->btrace, rep->btrace_len); } @@ -135,16 +119,8 @@ nni_rep_pipe_add(void *arg) { nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - int rv; - - if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REQ) { - return (NNG_EPROTO); - } - nni_mtx_lock(&rep->mx); - rv = nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp); - nni_mtx_unlock(&rep->mx); - return (rv); + return (nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp)); } @@ -154,22 +130,21 @@ nni_rep_pipe_rem(void *arg) nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - nni_mtx_lock(&rep->mx); nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe)); - nni_mtx_unlock(&rep->mx); } -// nni_rep_topsender watches for messages from the upper write queue, +// nni_rep_sock_send watches for messages from the upper write queue, // extracts the destination pipe, and forwards it to the appropriate // destination pipe via a separate queue. This prevents a single bad // or slow pipe from gumming up the works for the entire socket. static void -nni_rep_topsender(void *arg) +nni_rep_sock_send(void *arg) { nni_rep_sock *rep = arg; nni_msgq *uwq = rep->uwq; nni_msgq *urq = rep->urq; + nni_mtx *mx = nni_sock_mtx(rep->sock); nni_msg *msg; for (;;) { @@ -190,9 +165,9 @@ nni_rep_topsender(void *arg) NNI_GET32(header, id); nni_msg_trim_header(msg, 4); - nni_mtx_lock(&rep->mx); + nni_mtx_lock(mx); if (nni_idhash_find(rep->pipes, id, (void **) &rp) != 0) { - nni_mtx_unlock(&rep->mx); + nni_mtx_unlock(mx); nni_msg_free(msg); continue; } @@ -204,7 +179,7 @@ nni_rep_topsender(void *arg) // circumstances. nni_msg_free(msg); } - nni_mtx_unlock(&rep->mx); + nni_mtx_unlock(mx); } } @@ -218,8 +193,6 @@ nni_rep_pipe_send(void *arg) nni_msgq *wq = rp->sendq; nni_pipe *pipe = rp->pipe; nni_msg *msg; - uint8_t *body; - size_t size; int rv; for (;;) { @@ -311,21 +284,17 @@ again: static int -nni_rep_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_rep_sock *rep = arg; int rv; switch (opt) { case NNG_OPT_MAXTTL: - nni_mtx_lock(&rep->mx); rv = nni_setopt_int(&rep->ttl, buf, sz, 1, 255); - nni_mtx_unlock(&rep->mx); break; case NNG_OPT_RAW: - nni_mtx_lock(&rep->mx); rv = nni_setopt_int(&rep->raw, buf, sz, 0, 1); - nni_mtx_unlock(&rep->mx); break; default: rv = NNG_ENOTSUP; @@ -335,21 +304,17 @@ nni_rep_setopt(void *arg, int opt, const void *buf, size_t sz) static int -nni_rep_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_rep_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_rep_sock *rep = arg; int rv; switch (opt) { case NNG_OPT_MAXTTL: - nni_mtx_lock(&rep->mx); rv = nni_getopt_int(&rep->ttl, buf, szp); - nni_mtx_unlock(&rep->mx); break; case NNG_OPT_RAW: - nni_mtx_lock(&rep->mx); rv = nni_getopt_int(&rep->raw, buf, szp); - nni_mtx_unlock(&rep->mx); break; default: rv = NNG_ENOTSUP; @@ -359,14 +324,12 @@ nni_rep_getopt(void *arg, int opt, void *buf, size_t *szp) static nni_msg * -nni_rep_sendfilter(void *arg, nni_msg *msg) +nni_rep_sock_sfilter(void *arg, nni_msg *msg) { nni_rep_sock *rep = arg; size_t len; - nni_mtx_lock(&rep->mx); if (rep->raw) { - nni_mtx_unlock(&rep->mx); return (msg); } @@ -376,7 +339,6 @@ nni_rep_sendfilter(void *arg, nni_msg *msg) // If we have a stored backtrace, append it to the header... // if we don't have a backtrace, discard the message. if (rep->btrace == NULL) { - nni_mtx_unlock(&rep->mx); nni_msg_free(msg); return (NULL); } @@ -388,7 +350,6 @@ nni_rep_sendfilter(void *arg, nni_msg *msg) nni_free(rep->btrace, rep->btrace_len); rep->btrace = NULL; rep->btrace_len = 0; - nni_mtx_unlock(&rep->mx); nni_msg_free(msg); return (NULL); } @@ -396,21 +357,18 @@ nni_rep_sendfilter(void *arg, nni_msg *msg) nni_free(rep->btrace, rep->btrace_len); rep->btrace = NULL; rep->btrace_len = 0; - nni_mtx_unlock(&rep->mx); return (msg); } static nni_msg * -nni_rep_recvfilter(void *arg, nni_msg *msg) +nni_rep_sock_rfilter(void *arg, nni_msg *msg) { nni_rep_sock *rep = arg; char *header; size_t len; - nni_mtx_lock(&rep->mx); if (rep->raw) { - nni_mtx_unlock(&rep->mx); return (msg); } @@ -423,21 +381,19 @@ nni_rep_recvfilter(void *arg, nni_msg *msg) rep->btrace_len = 0; } if ((rep->btrace = nni_alloc(len)) == NULL) { - nni_mtx_unlock(&rep->mx); nni_msg_free(msg); return (NULL); } rep->btrace_len = len; memcpy(rep->btrace, header, len); nni_msg_trunc_header(msg, len); - nni_mtx_unlock(&rep->mx); return (msg); } // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_rep_proto_pipe = { +static nni_proto_pipe_ops nni_rep_pipe_ops = { .pipe_init = nni_rep_pipe_init, .pipe_fini = nni_rep_pipe_fini, .pipe_add = nni_rep_pipe_add, @@ -446,15 +402,22 @@ static nni_proto_pipe nni_rep_proto_pipe = { .pipe_recv = nni_rep_pipe_recv, }; +static nni_proto_sock_ops nni_rep_sock_ops = { + .sock_init = nni_rep_sock_init, + .sock_fini = nni_rep_sock_fini, + .sock_close = NULL, + .sock_setopt = nni_rep_sock_setopt, + .sock_getopt = nni_rep_sock_getopt, + .sock_rfilter = nni_rep_sock_rfilter, + .sock_sfilter = nni_rep_sock_sfilter, + .sock_send = nni_rep_sock_send, + .sock_recv = NULL, +}; + nni_proto nni_rep_proto = { - .proto_self = NNG_PROTO_REP, - .proto_peer = NNG_PROTO_REQ, - .proto_name = "rep", - .proto_pipe = &nni_rep_proto_pipe, - .proto_init = nni_rep_init, - .proto_fini = nni_rep_fini, - .proto_setopt = nni_rep_setopt, - .proto_getopt = nni_rep_getopt, - .proto_recv_filter = nni_rep_recvfilter, - .proto_send_filter = nni_rep_sendfilter, + .proto_self = NNG_PROTO_REP, + .proto_peer = NNG_PROTO_REQ, + .proto_name = "rep", + .proto_sock_ops = &nni_rep_sock_ops, + .proto_pipe_ops = &nni_rep_pipe_ops, }; diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 1c58d7a1..d8104342 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -22,7 +22,6 @@ typedef struct nni_req_sock nni_req_sock; // An nni_req_sock is our per-socket protocol private structure. struct nni_req_sock { nni_sock * sock; - nni_mtx mx; nni_cv cv; nni_msgq * uwq; nni_msgq * urq; @@ -48,7 +47,7 @@ struct nni_req_pipe { static void nni_req_resender(void *); static int -nni_req_init(void **reqp, nni_sock *sock) +nni_req_sock_init(void **reqp, nni_sock *sock) { nni_req_sock *req; int rv; @@ -56,12 +55,7 @@ nni_req_init(void **reqp, nni_sock *sock) if ((req = NNI_ALLOC_STRUCT(req)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&req->mx)) != 0) { - NNI_FREE_STRUCT(req); - return (rv); - } - if ((rv = nni_cv_init(&req->cv, &req->mx)) != 0) { - nni_mtx_fini(&req->mx); + if ((rv = nni_cv_init(&req->cv, nni_sock_mtx(sock))) != 0) { NNI_FREE_STRUCT(req); return (rv); } @@ -81,7 +75,6 @@ nni_req_init(void **reqp, nni_sock *sock) rv = nni_thr_init(&req->resender, nni_req_resender, req); if (rv != 0) { nni_cv_fini(&req->cv); - nni_mtx_fini(&req->mx); NNI_FREE_STRUCT(req); return (rv); } @@ -91,20 +84,24 @@ nni_req_init(void **reqp, nni_sock *sock) static void -nni_req_fini(void *arg) +nni_req_sock_close(void *arg) { nni_req_sock *req = arg; // Shut down the resender. We request it to exit by clearing // its old value, then kick it. - nni_mtx_lock(&req->mx); req->closing = 1; nni_cv_wake(&req->cv); - nni_mtx_unlock(&req->mx); +} + + +static void +nni_req_sock_fini(void *arg) +{ + nni_req_sock *req = arg; nni_thr_fini(&req->resender); nni_cv_fini(&req->cv); - nni_mtx_fini(&req->mx); if (req->reqmsg != NULL) { nni_msg_free(req->reqmsg); } @@ -168,15 +165,16 @@ nni_req_pipe_send(void *arg) nni_msgq *uwq = req->uwq; nni_msgq *urq = req->urq; nni_pipe *pipe = rp->pipe; + nni_mtx *mx = nni_sock_mtx(req->sock); nni_msg *msg; int rv; for (;;) { - nni_mtx_lock(&req->mx); + nni_mtx_lock(mx); if ((msg = req->retrymsg) != NULL) { req->retrymsg = NULL; } - nni_mtx_unlock(&req->mx); + nni_mtx_unlock(mx); if (msg == NULL) { rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose); if (rv != 0) { @@ -237,21 +235,17 @@ nni_req_pipe_recv(void *arg) static int -nni_req_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_req_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_req_sock *req = arg; int rv; switch (opt) { case NNG_OPT_RESENDTIME: - nni_mtx_lock(&req->mx); rv = nni_setopt_duration(&req->retry, buf, sz); - nni_mtx_unlock(&req->mx); break; case NNG_OPT_RAW: - nni_mtx_lock(&req->mx); rv = nni_setopt_int(&req->raw, buf, sz, 0, 1); - nni_mtx_unlock(&req->mx); break; default: rv = NNG_ENOTSUP; @@ -261,21 +255,17 @@ nni_req_setopt(void *arg, int opt, const void *buf, size_t sz) static int -nni_req_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_req_sock *req = arg; int rv; switch (opt) { case NNG_OPT_RESENDTIME: - nni_mtx_lock(&req->mx); rv = nni_getopt_duration(&req->retry, buf, szp); - nni_mtx_unlock(&req->mx); break; case NNG_OPT_RAW: - nni_mtx_lock(&req->mx); rv = nni_getopt_int(&req->raw, buf, szp); - nni_mtx_unlock(&req->mx); break; default: rv = NNG_ENOTSUP; @@ -288,17 +278,18 @@ static void nni_req_resender(void *arg) { nni_req_sock *req = arg; + nni_mtx *mx = nni_sock_mtx(req->sock); int rv; for (;;) { - nni_mtx_lock(&req->mx); + nni_mtx_lock(mx); if (req->closing) { - nni_mtx_unlock(&req->mx); + nni_mtx_unlock(mx); return; } if (req->reqmsg == NULL) { nni_cv_wait(&req->cv); - nni_mtx_unlock(&req->mx); + nni_mtx_unlock(mx); continue; } rv = nni_cv_until(&req->cv, req->resend); @@ -309,22 +300,20 @@ nni_req_resender(void *arg) } req->resend = nni_clock() + req->retry; } - nni_mtx_unlock(&req->mx); + nni_mtx_unlock(mx); } } static nni_msg * -nni_req_sendfilter(void *arg, nni_msg *msg) +nni_req_sock_sfilter(void *arg, nni_msg *msg) { nni_req_sock *req = arg; uint32_t id; - nni_mtx_lock(&req->mx); if (req->raw) { // No automatic retry, and the request ID must // be in the header coming down. - nni_mtx_unlock(&req->mx); return (msg); } @@ -338,7 +327,6 @@ nni_req_sendfilter(void *arg, nni_msg *msg) if (nni_msg_append_header(msg, req->reqid, 4) != 0) { // Should be ENOMEM. - nni_mtx_unlock(&req->mx); nni_msg_free(msg); return (NULL); } @@ -351,7 +339,6 @@ nni_req_sendfilter(void *arg, nni_msg *msg) // Make a duplicate message... for retries. if (nni_msg_dup(&req->reqmsg, msg) != 0) { - nni_mtx_unlock(&req->mx); nni_msg_free(msg); return (NULL); } @@ -362,39 +349,33 @@ nni_req_sendfilter(void *arg, nni_msg *msg) // Clear the error condition. nni_sock_recverr(req->sock, 0); - nni_mtx_unlock(&req->mx); return (msg); } static nni_msg * -nni_req_recvfilter(void *arg, nni_msg *msg) +nni_req_sock_rfilter(void *arg, nni_msg *msg) { nni_req_sock *req = arg; - nni_mtx_lock(&req->mx); if (req->raw) { // Pass it unmolested - nni_mtx_unlock(&req->mx); return (msg); } if (nni_msg_header_len(msg) < 4) { - nni_mtx_unlock(&req->mx); nni_msg_free(msg); return (NULL); } if (req->reqmsg == NULL) { // We had no outstanding request. - nni_mtx_unlock(&req->mx); nni_msg_free(msg); return (NULL); } if (memcmp(nni_msg_header(msg), req->reqid, 4) != 0) { // Wrong request id - nni_mtx_unlock(&req->mx); nni_msg_free(msg); return (NULL); } @@ -403,14 +384,13 @@ nni_req_recvfilter(void *arg, nni_msg *msg) nni_msg_free(req->reqmsg); req->reqmsg = NULL; nni_cv_wake(&req->cv); - nni_mtx_unlock(&req->mx); return (msg); } // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_req_proto_pipe = { +static nni_proto_pipe_ops nni_req_pipe_ops = { .pipe_init = nni_req_pipe_init, .pipe_fini = nni_req_pipe_fini, .pipe_add = nni_req_pipe_add, @@ -419,15 +399,22 @@ static nni_proto_pipe nni_req_proto_pipe = { .pipe_recv = nni_req_pipe_recv, }; +static nni_proto_sock_ops nni_req_sock_ops = { + .sock_init = nni_req_sock_init, + .sock_fini = nni_req_sock_fini, + .sock_close = nni_req_sock_close, + .sock_setopt = nni_req_sock_setopt, + .sock_getopt = nni_req_sock_getopt, + .sock_rfilter = nni_req_sock_rfilter, + .sock_sfilter = nni_req_sock_sfilter, + .sock_send = NULL, + .sock_recv = NULL, +}; + nni_proto nni_req_proto = { - .proto_self = NNG_PROTO_REQ, - .proto_peer = NNG_PROTO_REP, - .proto_name = "req", - .proto_pipe = &nni_req_proto_pipe, - .proto_init = nni_req_init, - .proto_fini = nni_req_fini, - .proto_setopt = nni_req_setopt, - .proto_getopt = nni_req_getopt, - .proto_recv_filter = nni_req_recvfilter, - .proto_send_filter = nni_req_sendfilter, + .proto_self = NNG_PROTO_REQ, + .proto_peer = NNG_PROTO_REP, + .proto_name = "req", + .proto_sock_ops = &nni_req_sock_ops, + .proto_pipe_ops = &nni_req_pipe_ops, }; |
