diff options
Diffstat (limited to 'src/protocol/pubsub/sub.c')
| -rw-r--r-- | src/protocol/pubsub/sub.c | 236 |
1 files changed, 120 insertions, 116 deletions
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 0563b764..0dbad081 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -17,22 +17,22 @@ // it from publishers, and filters out those it is not interested in, // only passing up ones that match known subscriptions. -typedef struct nni_sub_pipe nni_sub_pipe; -typedef struct nni_sub_sock nni_sub_sock; -typedef struct nni_sub_topic nni_sub_topic; +typedef struct sub_pipe sub_pipe; +typedef struct sub_sock sub_sock; +typedef struct sub_topic sub_topic; -static void nni_sub_recv_cb(void *); -static void nni_sub_putq_cb(void *); -static void nni_sub_pipe_fini(void *); +static void sub_recv_cb(void *); +static void sub_putq_cb(void *); +static void sub_pipe_fini(void *); -struct nni_sub_topic { +struct sub_topic { nni_list_node node; size_t len; void * buf; }; // An nni_rep_sock is our per-socket protocol private structure. -struct nni_sub_sock { +struct sub_sock { nni_sock *sock; nni_list topics; nni_msgq *urq; @@ -40,132 +40,136 @@ struct nni_sub_sock { }; // An nni_rep_pipe is our per-pipe protocol private structure. -struct nni_sub_pipe { - nni_pipe * pipe; - nni_sub_sock *sub; - nni_aio aio_recv; - nni_aio aio_putq; +struct sub_pipe { + nni_pipe *pipe; + sub_sock *sub; + nni_aio * aio_recv; + nni_aio * aio_putq; }; static int -nni_sub_sock_init(void **subp, nni_sock *sock) +sub_sock_init(void **sp, nni_sock *sock) { - nni_sub_sock *sub; + sub_sock *s; - if ((sub = NNI_ALLOC_STRUCT(sub)) == NULL) { + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - NNI_LIST_INIT(&sub->topics, nni_sub_topic, node); - sub->sock = sock; - sub->raw = 0; + NNI_LIST_INIT(&s->topics, sub_topic, node); + s->sock = sock; + s->raw = 0; - sub->urq = nni_sock_recvq(sock); + s->urq = nni_sock_recvq(sock); nni_sock_senderr(sock, NNG_ENOTSUP); - *subp = sub; + *sp = s; return (0); } static void -nni_sub_sock_fini(void *arg) +sub_sock_fini(void *arg) { - nni_sub_sock * sub = arg; - nni_sub_topic *topic; + sub_sock * s = arg; + sub_topic *topic; - while ((topic = nni_list_first(&sub->topics)) != NULL) { - nni_list_remove(&sub->topics, topic); + while ((topic = nni_list_first(&s->topics)) != NULL) { + nni_list_remove(&s->topics, topic); nni_free(topic->buf, topic->len); NNI_FREE_STRUCT(topic); } - NNI_FREE_STRUCT(sub); + NNI_FREE_STRUCT(s); } static void -nni_sub_sock_open(void *arg) +sub_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void -nni_sub_sock_close(void *arg) +sub_sock_close(void *arg) { NNI_ARG_UNUSED(arg); } +static void +sub_pipe_fini(void *arg) +{ + sub_pipe *p = arg; + + nni_aio_fini(p->aio_putq); + nni_aio_fini(p->aio_recv); + NNI_FREE_STRUCT(p); +} + static int -nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock) +sub_pipe_init(void **pp, nni_pipe *pipe, void *s) { - nni_sub_pipe *sp; + sub_pipe *p; + int rv; - if ((sp = NNI_ALLOC_STRUCT(sp)) == NULL) { + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - nni_aio_init(&sp->aio_putq, nni_sub_putq_cb, sp); - nni_aio_init(&sp->aio_recv, nni_sub_recv_cb, sp); + if (((rv = nni_aio_init(&p->aio_putq, sub_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, sub_recv_cb, p)) != 0)) { + sub_pipe_fini(p); + return (rv); + } - sp->pipe = pipe; - sp->sub = ssock; - *spp = sp; + p->pipe = pipe; + p->sub = s; + *pp = p; return (0); } -static void -nni_sub_pipe_fini(void *arg) -{ - nni_sub_pipe *sp = arg; - - nni_aio_fini(&sp->aio_putq); - nni_aio_fini(&sp->aio_recv); - NNI_FREE_STRUCT(sp); -} - static int -nni_sub_pipe_start(void *arg) +sub_pipe_start(void *arg) { - nni_sub_pipe *sp = arg; + sub_pipe *p = arg; - nni_pipe_recv(sp->pipe, &sp->aio_recv); + nni_pipe_recv(p->pipe, p->aio_recv); return (0); } static void -nni_sub_pipe_stop(void *arg) +sub_pipe_stop(void *arg) { - nni_sub_pipe *sp = arg; + sub_pipe *p = arg; - nni_aio_stop(&sp->aio_putq); - nni_aio_stop(&sp->aio_recv); + nni_aio_stop(p->aio_putq); + nni_aio_stop(p->aio_recv); } static void -nni_sub_recv_cb(void *arg) +sub_recv_cb(void *arg) { - nni_sub_pipe *sp = arg; - nni_sub_sock *sub = sp->sub; - nni_msgq * urq = sub->urq; + sub_pipe *p = arg; + sub_sock *s = p->sub; + nni_msgq *urq = s->urq; - if (nni_aio_result(&sp->aio_recv) != 0) { - nni_pipe_stop(sp->pipe); + if (nni_aio_result(p->aio_recv) != 0) { + nni_pipe_stop(p->pipe); return; } - sp->aio_putq.a_msg = sp->aio_recv.a_msg; - sp->aio_recv.a_msg = NULL; - nni_msgq_aio_put(sub->urq, &sp->aio_putq); + nni_aio_set_msg(p->aio_putq, nni_aio_get_msg(p->aio_recv)); + nni_aio_set_msg(p->aio_recv, NULL); + nni_msgq_aio_put(urq, p->aio_putq); } static void -nni_sub_putq_cb(void *arg) +sub_putq_cb(void *arg) { - nni_sub_pipe *sp = arg; + sub_pipe *p = arg; - if (nni_aio_result(&sp->aio_putq) != 0) { - nni_msg_free(sp->aio_putq.a_msg); - sp->aio_putq.a_msg = NULL; - nni_pipe_stop(sp->pipe); + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->pipe); return; } - nni_pipe_recv(sp->pipe, &sp->aio_recv); + nni_pipe_recv(p->pipe, p->aio_recv); } // For now we maintain subscriptions on a sorted linked list. As we do not @@ -174,12 +178,12 @@ nni_sub_putq_cb(void *arg) // to replace this with a patricia trie, like old nanomsg had. static int -nni_sub_subscribe(nni_sub_sock *sub, const void *buf, size_t sz) +sub_subscribe(sub_sock *s, const void *buf, size_t sz) { - nni_sub_topic *topic; - nni_sub_topic *newtopic; + sub_topic *topic; + sub_topic *newtopic; - NNI_LIST_FOREACH (&sub->topics, topic) { + NNI_LIST_FOREACH (&s->topics, topic) { int rv; if (topic->len >= sz) { @@ -210,20 +214,20 @@ nni_sub_subscribe(nni_sub_sock *sub, const void *buf, size_t sz) newtopic->len = sz; memcpy(newtopic->buf, buf, sz); if (topic != NULL) { - nni_list_insert_before(&sub->topics, newtopic, topic); + nni_list_insert_before(&s->topics, newtopic, topic); } else { - nni_list_append(&sub->topics, newtopic); + nni_list_append(&s->topics, newtopic); } return (0); } static int -nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz) +sub_unsubscribe(sub_sock *s, const void *buf, size_t sz) { - nni_sub_topic *topic; - int rv; + sub_topic *topic; + int rv; - NNI_LIST_FOREACH (&sub->topics, topic) { + NNI_LIST_FOREACH (&s->topics, topic) { if (topic->len >= sz) { rv = memcmp(topic->buf, buf, sz); } else { @@ -231,7 +235,7 @@ nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz) } if (rv == 0) { if (topic->len == sz) { - nni_list_remove(&sub->topics, topic); + nni_list_remove(&s->topics, topic); nni_free(topic->buf, topic->len); NNI_FREE_STRUCT(topic); return (0); @@ -248,43 +252,43 @@ nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz) } static int -nni_sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz) +sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { - nni_sub_sock *sub = arg; - int rv = NNG_ENOTSUP; + sub_sock *s = arg; + int rv = NNG_ENOTSUP; if (opt == nng_optid_raw) { - rv = nni_setopt_int(&sub->raw, buf, sz, 0, 1); + rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); } else if (opt == nng_optid_sub_subscribe) { - rv = nni_sub_subscribe(sub, buf, sz); + rv = sub_subscribe(s, buf, sz); } else if (opt == nng_optid_sub_unsubscribe) { - rv = nni_sub_unsubscribe(sub, buf, sz); + rv = sub_unsubscribe(s, buf, sz); } return (rv); } static int -nni_sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp) +sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { - nni_sub_sock *sub = arg; - int rv = NNG_ENOTSUP; + sub_sock *s = arg; + int rv = NNG_ENOTSUP; if (opt == nng_optid_raw) { - rv = nni_getopt_int(&sub->raw, buf, szp); + rv = nni_getopt_int(&s->raw, buf, szp); } return (rv); } static nni_msg * -nni_sub_sock_rfilter(void *arg, nni_msg *msg) +sub_sock_rfilter(void *arg, nni_msg *msg) { - nni_sub_sock * sub = arg; - nni_sub_topic *topic; - char * body; - size_t len; - int match; + sub_sock * s = arg; + sub_topic *topic; + char * body; + size_t len; + int match; - if (sub->raw) { + if (s->raw) { return (msg); } @@ -293,7 +297,7 @@ nni_sub_sock_rfilter(void *arg, nni_msg *msg) match = 0; // Check to see if the message matches one of our subscriptions. - NNI_LIST_FOREACH (&sub->topics, topic) { + NNI_LIST_FOREACH (&s->topics, topic) { if (len >= topic->len) { int rv = memcmp(topic->buf, body, topic->len); if (rv == 0) { @@ -319,34 +323,34 @@ nni_sub_sock_rfilter(void *arg, nni_msg *msg) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe_ops nni_sub_pipe_ops = { - .pipe_init = nni_sub_pipe_init, - .pipe_fini = nni_sub_pipe_fini, - .pipe_start = nni_sub_pipe_start, - .pipe_stop = nni_sub_pipe_stop, +static nni_proto_pipe_ops sub_pipe_ops = { + .pipe_init = sub_pipe_init, + .pipe_fini = sub_pipe_fini, + .pipe_start = sub_pipe_start, + .pipe_stop = sub_pipe_stop, }; -static nni_proto_sock_ops nni_sub_sock_ops = { - .sock_init = nni_sub_sock_init, - .sock_fini = nni_sub_sock_fini, - .sock_open = nni_sub_sock_open, - .sock_close = nni_sub_sock_close, - .sock_setopt = nni_sub_sock_setopt, - .sock_getopt = nni_sub_sock_getopt, - .sock_rfilter = nni_sub_sock_rfilter, +static nni_proto_sock_ops sub_sock_ops = { + .sock_init = sub_sock_init, + .sock_fini = sub_sock_fini, + .sock_open = sub_sock_open, + .sock_close = sub_sock_close, + .sock_setopt = sub_sock_setopt, + .sock_getopt = sub_sock_getopt, + .sock_rfilter = sub_sock_rfilter, }; -nni_proto nni_sub_proto = { +static nni_proto sub_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNG_PROTO_SUB_V0, "sub" }, .proto_peer = { NNG_PROTO_PUB_V0, "pub" }, .proto_flags = NNI_PROTO_FLAG_RCV, - .proto_sock_ops = &nni_sub_sock_ops, - .proto_pipe_ops = &nni_sub_pipe_ops, + .proto_sock_ops = &sub_sock_ops, + .proto_pipe_ops = &sub_pipe_ops, }; int nng_sub0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &nni_sub_proto)); + return (nni_proto_open(sidp, &sub_proto)); } |
