diff options
Diffstat (limited to 'src/protocol/pubsub')
| -rw-r--r-- | src/protocol/pubsub/pub.c | 70 | ||||
| -rw-r--r-- | src/protocol/pubsub/sub.c | 87 |
2 files changed, 53 insertions, 104 deletions
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index 860c7c7d..684b916d 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -23,10 +23,8 @@ typedef struct nni_pub_sock nni_pub_sock; // An nni_pub_sock is our per-socket protocol private structure. struct nni_pub_sock { nni_sock * sock; - nni_mtx mx; nni_msgq * uwq; int raw; - nni_thr sender; nni_list pipes; }; @@ -39,10 +37,10 @@ struct nni_pub_pipe { int sigclose; }; -static void nni_pub_broadcast(void *); +static void nni_pub_sock_send(void *); static int -nni_pub_init(void **pubp, nni_sock *sock) +nni_pub_sock_init(void **pubp, nni_sock *sock) { nni_pub_sock *pub; int rv; @@ -50,36 +48,23 @@ nni_pub_init(void **pubp, nni_sock *sock) if ((pub = NNI_ALLOC_STRUCT(pub)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&pub->mx)) != 0) { - NNI_FREE_STRUCT(pub); - return (rv); - } pub->sock = sock; pub->raw = 0; NNI_LIST_INIT(&pub->pipes, nni_pub_pipe, node); pub->uwq = nni_sock_sendq(sock); - rv = nni_thr_init(&pub->sender, nni_pub_broadcast, pub); - if (rv != 0) { - nni_mtx_fini(&pub->mx); - NNI_FREE_STRUCT(pub); - return (rv); - } *pubp = pub; nni_sock_recverr(sock, NNG_ENOTSUP); - nni_thr_run(&pub->sender); return (0); } static void -nni_pub_fini(void *arg) +nni_pub_sock_fini(void *arg) { nni_pub_sock *pub = arg; - nni_thr_fini(&pub->sender); - nni_mtx_fini(&pub->mx); NNI_FREE_STRUCT(pub); } @@ -125,10 +110,7 @@ nni_pub_pipe_add(void *arg) if (nni_pipe_peer(pp->pipe) != NNG_PROTO_SUB) { return (NNG_EPROTO); } - nni_mtx_lock(&pub->mx); nni_list_append(&pub->pipes, pp); - nni_mtx_unlock(&pub->mx); - return (0); } @@ -139,18 +121,17 @@ nni_pub_pipe_rem(void *arg) nni_pub_pipe *pp = arg; nni_pub_sock *pub = pp->pub; - nni_mtx_lock(&pub->mx); nni_list_remove(&pub->pipes, pp); - nni_mtx_unlock(&pub->mx); } static void -nni_pub_broadcast(void *arg) +nni_pub_sock_send(void *arg) { nni_pub_sock *pub = arg; nni_msgq *uwq = pub->uwq; nni_msg *msg, *dup; + nni_mtx *mx = nni_sock_mtx(pub->sock); for (;;) { nni_pub_pipe *pp; @@ -161,7 +142,7 @@ nni_pub_broadcast(void *arg) break; } - nni_mtx_lock(&pub->mx); + nni_mtx_lock(mx); last = nni_list_last(&pub->pipes); NNI_LIST_FOREACH (&pub->pipes, pp) { if (pp != last) { @@ -176,7 +157,7 @@ nni_pub_broadcast(void *arg) nni_msg_free(dup); } } - nni_mtx_unlock(&pub->mx); + nni_mtx_unlock(mx); if (last == NULL) { nni_msg_free(msg); @@ -236,16 +217,14 @@ nni_pub_pipe_recv(void *arg) static int -nni_pub_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_pub_sock *pub = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&pub->mx); rv = nni_setopt_int(&pub->raw, buf, sz, 0, 1); - nni_mtx_unlock(&pub->mx); break; default: rv = NNG_ENOTSUP; @@ -255,16 +234,14 @@ nni_pub_setopt(void *arg, int opt, const void *buf, size_t sz) static int -nni_pub_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_pub_sock *pub = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&pub->mx); rv = nni_getopt_int(&pub->raw, buf, szp); - nni_mtx_unlock(&pub->mx); break; default: rv = NNG_ENOTSUP; @@ -275,7 +252,7 @@ nni_pub_getopt(void *arg, int opt, void *buf, size_t *szp) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_pub_proto_pipe = { +static nni_proto_pipe_ops nni_pub_pipe_ops = { .pipe_init = nni_pub_pipe_init, .pipe_fini = nni_pub_pipe_fini, .pipe_add = nni_pub_pipe_add, @@ -284,15 +261,22 @@ static nni_proto_pipe nni_pub_proto_pipe = { .pipe_recv = nni_pub_pipe_recv, }; +nni_proto_sock_ops nni_pub_sock_ops = { + .sock_init = nni_pub_sock_init, + .sock_fini = nni_pub_sock_fini, + .sock_close = NULL, + .sock_setopt = nni_pub_sock_setopt, + .sock_getopt = nni_pub_sock_getopt, + .sock_send = nni_pub_sock_send, + .sock_recv = NULL, + .sock_rfilter = NULL, + .sock_sfilter = NULL, +}; + nni_proto nni_pub_proto = { - .proto_self = NNG_PROTO_PUB, - .proto_peer = NNG_PROTO_SUB, - .proto_name = "pub", - .proto_pipe = &nni_pub_proto_pipe, - .proto_init = nni_pub_init, - .proto_fini = nni_pub_fini, - .proto_setopt = nni_pub_setopt, - .proto_getopt = nni_pub_getopt, - .proto_recv_filter = NULL, - .proto_send_filter = NULL, + .proto_self = NNG_PROTO_PUB, + .proto_peer = NNG_PROTO_SUB, + .proto_name = "pub", + .proto_sock_ops = &nni_pub_sock_ops, + .proto_pipe_ops = &nni_pub_pipe_ops, }; diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 19c06aa0..dd288d5e 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -29,7 +29,6 @@ struct nni_sub_topic { // An nni_rep_sock is our per-socket protocol private structure. struct nni_sub_sock { nni_sock * sock; - nni_mtx mx; nni_list topics; nni_msgq * urq; int raw; @@ -42,7 +41,7 @@ struct nni_sub_pipe { }; static int -nni_sub_init(void **subp, nni_sock *sock) +nni_sub_sock_init(void **subp, nni_sock *sock) { nni_sub_sock *sub; int rv; @@ -50,10 +49,6 @@ nni_sub_init(void **subp, nni_sock *sock) if ((sub = NNI_ALLOC_STRUCT(sub)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&sub->mx)) != 0) { - NNI_FREE_STRUCT(sub); - return (rv); - } NNI_LIST_INIT(&sub->topics, nni_sub_topic, node); sub->sock = sock; sub->raw = 0; @@ -66,7 +61,7 @@ nni_sub_init(void **subp, nni_sock *sock) static void -nni_sub_fini(void *arg) +nni_sub_sock_fini(void *arg) { nni_sub_sock *sub = arg; nni_sub_topic *topic; @@ -76,7 +71,6 @@ nni_sub_fini(void *arg) nni_free(topic->buf, topic->len); NNI_FREE_STRUCT(topic); } - nni_mtx_fini(&sub->mx); NNI_FREE_STRUCT(sub); } @@ -106,32 +100,6 @@ nni_sub_pipe_fini(void *arg) } -static int -nni_sub_pipe_add(void *arg) -{ - nni_sub_pipe *sp = arg; - - if (nni_pipe_peer(sp->pipe) != NNG_PROTO_PUB) { - return (NNG_EPROTO); - } - return (0); -} - - -static void -nni_sub_pipe_rem(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - - -static void -nni_sub_pipe_send(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - - static void nni_sub_pipe_recv(void *arg) { @@ -242,26 +210,20 @@ nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz) static int -nni_sub_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_sub_sock *sub = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&sub->mx); rv = nni_setopt_int(&sub->raw, buf, sz, 0, 1); - nni_mtx_unlock(&sub->mx); break; case NNG_OPT_SUBSCRIBE: - nni_mtx_lock(&sub->mx); rv = nni_sub_subscribe(sub, buf, sz); - nni_mtx_unlock(&sub->mx); break; case NNG_OPT_UNSUBSCRIBE: - nni_mtx_lock(&sub->mx); rv = nni_sub_unsubscribe(sub, buf, sz); - nni_mtx_unlock(&sub->mx); break; default: rv = NNG_ENOTSUP; @@ -271,16 +233,14 @@ nni_sub_setopt(void *arg, int opt, const void *buf, size_t sz) static int -nni_sub_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_sub_sock *sub = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&sub->mx); rv = nni_getopt_int(&sub->raw, buf, szp); - nni_mtx_unlock(&sub->mx); break; default: rv = NNG_ENOTSUP; @@ -290,7 +250,7 @@ nni_sub_getopt(void *arg, int opt, void *buf, size_t *szp) static nni_msg * -nni_sub_recvfilter(void *arg, nni_msg *msg) +nni_sub_sock_rfilter(void *arg, nni_msg *msg) { nni_sub_sock *sub = arg; nni_sub_topic *topic; @@ -298,9 +258,7 @@ nni_sub_recvfilter(void *arg, nni_msg *msg) size_t len; int match; - nni_mtx_lock(&sub->mx); if (sub->raw) { - nni_mtx_unlock(&sub->mx); return (msg); } @@ -326,7 +284,6 @@ nni_sub_recvfilter(void *arg, nni_msg *msg) break; } } - nni_mtx_unlock(&sub->mx); if (!match) { nni_msg_free(msg); return (NULL); @@ -337,23 +294,31 @@ nni_sub_recvfilter(void *arg, nni_msg *msg) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_sub_proto_pipe = { +static nni_proto_pipe_ops nni_sub_pipe_ops = { .pipe_init = nni_sub_pipe_init, .pipe_fini = nni_sub_pipe_fini, - .pipe_add = nni_sub_pipe_add, - .pipe_rem = nni_sub_pipe_rem, - .pipe_send = nni_sub_pipe_send, + .pipe_add = NULL, + .pipe_rem = NULL, + .pipe_send = NULL, .pipe_recv = nni_sub_pipe_recv, }; +static nni_proto_sock_ops nni_sub_sock_ops = { + .sock_init = nni_sub_sock_init, + .sock_fini = nni_sub_sock_fini, + .sock_close = NULL, + .sock_setopt = nni_sub_sock_setopt, + .sock_getopt = nni_sub_sock_getopt, + .sock_rfilter = nni_sub_sock_rfilter, + .sock_sfilter = NULL, + .sock_send = NULL, + .sock_recv = NULL, +}; + nni_proto nni_sub_proto = { - .proto_self = NNG_PROTO_SUB, - .proto_peer = NNG_PROTO_PUB, - .proto_name = "sub", - .proto_pipe = &nni_sub_proto_pipe, - .proto_init = nni_sub_init, - .proto_fini = nni_sub_fini, - .proto_setopt = nni_sub_setopt, - .proto_getopt = nni_sub_getopt, - .proto_recv_filter = nni_sub_recvfilter, + .proto_self = NNG_PROTO_SUB, + .proto_peer = NNG_PROTO_PUB, + .proto_name = "sub", + .proto_sock_ops = &nni_sub_sock_ops, + .proto_pipe_ops = &nni_sub_pipe_ops, }; |
