aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pubsub
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pubsub')
-rw-r--r--src/protocol/pubsub/pub.c18
-rw-r--r--src/protocol/pubsub/sub.c40
2 files changed, 53 insertions, 5 deletions
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index 03f4603a..4604d0ff 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -82,7 +82,6 @@ pub_sock_init(void **sp, nni_sock *sock)
s->uwq = nni_sock_sendq(sock);
*sp = s;
- nni_sock_recverr(sock, NNG_ENOTSUP);
return (0);
}
@@ -281,6 +280,21 @@ pub_sock_getopt_raw(void *arg, void *buf, size_t *szp)
return (nni_getopt_int(s->raw, buf, szp));
}
+static void
+pub_sock_recv(void *arg, nni_aio *aio)
+{
+ nni_aio_finish_error(aio, NNG_ENOTSUP);
+}
+
+static void
+pub_sock_send(void *arg, nni_aio *aio)
+{
+ pub_sock *s = arg;
+
+ nni_sock_send_pending(s->sock);
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
static nni_proto_pipe_ops pub_pipe_ops = {
.pipe_init = pub_pipe_init,
.pipe_fini = pub_pipe_fini,
@@ -303,6 +317,8 @@ static nni_proto_sock_ops pub_sock_ops = {
.sock_fini = pub_sock_fini,
.sock_open = pub_sock_open,
.sock_close = pub_sock_close,
+ .sock_send = pub_sock_send,
+ .sock_recv = pub_sock_recv,
.sock_options = pub_sock_options,
};
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 8b2ed209..323bbb2f 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -40,6 +40,7 @@ struct sub_sock {
nni_list topics;
nni_msgq *urq;
int raw;
+ nni_mtx lk;
};
// An nni_rep_pipe is our per-pipe protocol private structure.
@@ -58,13 +59,13 @@ sub_sock_init(void **sp, nni_sock *sock)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
+ nni_mtx_init(&s->lk);
NNI_LIST_INIT(&s->topics, sub_topic, node);
s->sock = sock;
s->raw = 0;
s->urq = nni_sock_recvq(sock);
- nni_sock_senderr(sock, NNG_ENOTSUP);
- *sp = s;
+ *sp = s;
return (0);
}
@@ -79,6 +80,7 @@ sub_sock_fini(void *arg)
nni_free(topic->buf, topic->len);
NNI_FREE_STRUCT(topic);
}
+ nni_mtx_fini(&s->lk);
NNI_FREE_STRUCT(s);
}
@@ -190,6 +192,7 @@ sub_subscribe(void *arg, const void *buf, size_t sz)
sub_topic *topic;
sub_topic *newtopic;
+ nni_mtx_lock(&s->lk);
NNI_LIST_FOREACH (&s->topics, topic) {
int rv;
@@ -201,6 +204,7 @@ sub_subscribe(void *arg, const void *buf, size_t sz)
if (rv == 0) {
if (topic->len == sz) {
// Already inserted.
+ nni_mtx_unlock(&s->lk);
return (0);
}
if (topic->len > sz) {
@@ -212,9 +216,11 @@ sub_subscribe(void *arg, const void *buf, size_t sz)
}
if ((newtopic = NNI_ALLOC_STRUCT(newtopic)) == NULL) {
+ nni_mtx_unlock(&s->lk);
return (NNG_ENOMEM);
}
if ((newtopic->buf = nni_alloc(sz)) == NULL) {
+ nni_mtx_unlock(&s->lk);
return (NNG_ENOMEM);
}
NNI_LIST_NODE_INIT(&newtopic->node);
@@ -225,6 +231,7 @@ sub_subscribe(void *arg, const void *buf, size_t sz)
} else {
nni_list_append(&s->topics, newtopic);
}
+ nni_mtx_unlock(&s->lk);
return (0);
}
@@ -235,6 +242,7 @@ sub_unsubscribe(void *arg, const void *buf, size_t sz)
sub_topic *topic;
int rv;
+ nni_mtx_lock(&s->lk);
NNI_LIST_FOREACH (&s->topics, topic) {
if (topic->len >= sz) {
rv = memcmp(topic->buf, buf, sz);
@@ -244,18 +252,22 @@ sub_unsubscribe(void *arg, const void *buf, size_t sz)
if (rv == 0) {
if (topic->len == sz) {
nni_list_remove(&s->topics, topic);
+ nni_mtx_unlock(&s->lk);
nni_free(topic->buf, topic->len);
NNI_FREE_STRUCT(topic);
return (0);
}
if (topic->len > sz) {
+ nni_mtx_unlock(&s->lk);
return (NNG_ENOENT);
}
}
if (rv > 0) {
+ nni_mtx_unlock(&s->lk);
return (NNG_ENOENT);
}
}
+ nni_mtx_unlock(&s->lk);
return (NNG_ENOENT);
}
@@ -273,8 +285,23 @@ sub_sock_getopt_raw(void *arg, void *buf, size_t *szp)
return (nni_getopt_int(s->raw, buf, szp));
}
+static void
+sub_sock_send(void *arg, nni_aio *aio)
+{
+ nni_aio_finish_error(aio, NNG_ENOTSUP);
+}
+
+static void
+sub_sock_recv(void *arg, nni_aio *aio)
+{
+ sub_sock *s = arg;
+
+ nni_sock_recv_pending(s->sock);
+ nni_msgq_aio_get(s->urq, aio);
+}
+
static nni_msg *
-sub_sock_rfilter(void *arg, nni_msg *msg)
+sub_sock_filter(void *arg, nni_msg *msg)
{
sub_sock * s = arg;
sub_topic *topic;
@@ -282,7 +309,9 @@ sub_sock_rfilter(void *arg, nni_msg *msg)
size_t len;
int match;
+ nni_mtx_lock(&s->lk);
if (s->raw) {
+ nni_mtx_unlock(&s->lk);
return (msg);
}
@@ -308,6 +337,7 @@ sub_sock_rfilter(void *arg, nni_msg *msg)
break;
}
}
+ nni_mtx_unlock(&s->lk);
if (!match) {
nni_msg_free(msg);
return (NULL);
@@ -349,8 +379,10 @@ static nni_proto_sock_ops sub_sock_ops = {
.sock_fini = sub_sock_fini,
.sock_open = sub_sock_open,
.sock_close = sub_sock_close,
+ .sock_send = sub_sock_send,
+ .sock_recv = sub_sock_recv,
+ .sock_filter = sub_sock_filter,
.sock_options = sub_sock_options,
- .sock_rfilter = sub_sock_rfilter,
};
static nni_proto sub_proto = {