diff options
Diffstat (limited to 'src/protocol/pubsub/sub.c')
| -rw-r--r-- | src/protocol/pubsub/sub.c | 40 |
1 files changed, 36 insertions, 4 deletions
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 8b2ed209..323bbb2f 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -40,6 +40,7 @@ struct sub_sock { nni_list topics; nni_msgq *urq; int raw; + nni_mtx lk; }; // An nni_rep_pipe is our per-pipe protocol private structure. @@ -58,13 +59,13 @@ sub_sock_init(void **sp, nni_sock *sock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } + nni_mtx_init(&s->lk); NNI_LIST_INIT(&s->topics, sub_topic, node); s->sock = sock; s->raw = 0; s->urq = nni_sock_recvq(sock); - nni_sock_senderr(sock, NNG_ENOTSUP); - *sp = s; + *sp = s; return (0); } @@ -79,6 +80,7 @@ sub_sock_fini(void *arg) nni_free(topic->buf, topic->len); NNI_FREE_STRUCT(topic); } + nni_mtx_fini(&s->lk); NNI_FREE_STRUCT(s); } @@ -190,6 +192,7 @@ sub_subscribe(void *arg, const void *buf, size_t sz) sub_topic *topic; sub_topic *newtopic; + nni_mtx_lock(&s->lk); NNI_LIST_FOREACH (&s->topics, topic) { int rv; @@ -201,6 +204,7 @@ sub_subscribe(void *arg, const void *buf, size_t sz) if (rv == 0) { if (topic->len == sz) { // Already inserted. + nni_mtx_unlock(&s->lk); return (0); } if (topic->len > sz) { @@ -212,9 +216,11 @@ sub_subscribe(void *arg, const void *buf, size_t sz) } if ((newtopic = NNI_ALLOC_STRUCT(newtopic)) == NULL) { + nni_mtx_unlock(&s->lk); return (NNG_ENOMEM); } if ((newtopic->buf = nni_alloc(sz)) == NULL) { + nni_mtx_unlock(&s->lk); return (NNG_ENOMEM); } NNI_LIST_NODE_INIT(&newtopic->node); @@ -225,6 +231,7 @@ sub_subscribe(void *arg, const void *buf, size_t sz) } else { nni_list_append(&s->topics, newtopic); } + nni_mtx_unlock(&s->lk); return (0); } @@ -235,6 +242,7 @@ sub_unsubscribe(void *arg, const void *buf, size_t sz) sub_topic *topic; int rv; + nni_mtx_lock(&s->lk); NNI_LIST_FOREACH (&s->topics, topic) { if (topic->len >= sz) { rv = memcmp(topic->buf, buf, sz); @@ -244,18 +252,22 @@ sub_unsubscribe(void *arg, const void *buf, size_t sz) if (rv == 0) { if (topic->len == sz) { nni_list_remove(&s->topics, topic); + nni_mtx_unlock(&s->lk); nni_free(topic->buf, topic->len); NNI_FREE_STRUCT(topic); return (0); } if (topic->len > sz) { + nni_mtx_unlock(&s->lk); return (NNG_ENOENT); } } if (rv > 0) { + nni_mtx_unlock(&s->lk); return (NNG_ENOENT); } } + nni_mtx_unlock(&s->lk); return (NNG_ENOENT); } @@ -273,8 +285,23 @@ sub_sock_getopt_raw(void *arg, void *buf, size_t *szp) return (nni_getopt_int(s->raw, buf, szp)); } +static void +sub_sock_send(void *arg, nni_aio *aio) +{ + nni_aio_finish_error(aio, NNG_ENOTSUP); +} + +static void +sub_sock_recv(void *arg, nni_aio *aio) +{ + sub_sock *s = arg; + + nni_sock_recv_pending(s->sock); + nni_msgq_aio_get(s->urq, aio); +} + static nni_msg * -sub_sock_rfilter(void *arg, nni_msg *msg) +sub_sock_filter(void *arg, nni_msg *msg) { sub_sock * s = arg; sub_topic *topic; @@ -282,7 +309,9 @@ sub_sock_rfilter(void *arg, nni_msg *msg) size_t len; int match; + nni_mtx_lock(&s->lk); if (s->raw) { + nni_mtx_unlock(&s->lk); return (msg); } @@ -308,6 +337,7 @@ sub_sock_rfilter(void *arg, nni_msg *msg) break; } } + nni_mtx_unlock(&s->lk); if (!match) { nni_msg_free(msg); return (NULL); @@ -349,8 +379,10 @@ static nni_proto_sock_ops sub_sock_ops = { .sock_fini = sub_sock_fini, .sock_open = sub_sock_open, .sock_close = sub_sock_close, + .sock_send = sub_sock_send, + .sock_recv = sub_sock_recv, + .sock_filter = sub_sock_filter, .sock_options = sub_sock_options, - .sock_rfilter = sub_sock_rfilter, }; static nni_proto sub_proto = { |
