aboutsummaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2019-02-24 22:04:16 -0800
committerGarrett D'Amore <garrett@damore.org>2019-02-26 21:09:54 -0800
commit5803db08e55ed9287dc59b3adc281b89c52c530f (patch)
tree9d2d65ed86be5c7b976fc3bdfc5ed5b375143641 /src/protocol
parent9cf967e9d7fdab6ccf38f80d83e4bf3d1a5e1a67 (diff)
downloadnng-5803db08e55ed9287dc59b3adc281b89c52c530f.tar.gz
nng-5803db08e55ed9287dc59b3adc281b89c52c530f.tar.bz2
nng-5803db08e55ed9287dc59b3adc281b89c52c530f.zip
fixes #461 Context support for SUB
fixes #762 Pub/Sub very slow compared with nanomsg This introduces contexts for SUB, and converts both the cooked SUB and PUB protocols to use a new lightweight message queue that has significant performance benefits over the heavy-weight message queue. We've also added a test program, pubdrop, in the perf directory, which can be used for measuring pub/sub message rates and drop rates. Note that its quite easy to overwhelm a subscriber still. The SUB socket performance is still not completely where it needs to be. There are two remainging things to improve. Firsst we need to replace the naive linked list of topics with a proper PATRICIA trie. Second, we need to work on the low level POSIX poller code. (The Windows code is already quite good, and we outperform nanomsg on Windows.)
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/pubsub0/CMakeLists.txt7
-rw-r--r--src/protocol/pubsub0/pub.c282
-rw-r--r--src/protocol/pubsub0/sub.c606
-rw-r--r--src/protocol/pubsub0/xsub.c230
4 files changed, 843 insertions, 282 deletions
diff --git a/src/protocol/pubsub0/CMakeLists.txt b/src/protocol/pubsub0/CMakeLists.txt
index 1ea0cacf..12872063 100644
--- a/src/protocol/pubsub0/CMakeLists.txt
+++ b/src/protocol/pubsub0/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
# Copyright 2018 Capitar IT Group BV <info@capitar.com>
#
# This software is supplied under the terms of the MIT License, a
@@ -26,7 +26,10 @@ endif()
if (NNG_PROTO_SUB0)
list(APPEND _DEFS -DNNG_HAVE_SUB0)
- list(APPEND _SRCS protocol/pubsub0/sub.c ${PROJECT_SOURCE_DIR}/include/nng/protocol/pubsub0/sub.h)
+ list(APPEND _SRCS
+ protocol/pubsub0/sub.c
+ protocol/pubsub0/xsub.c
+ ${PROJECT_SOURCE_DIR}/include/nng/protocol/pubsub0/sub.h)
endif()
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index 2567a5b6..bec0763f 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -32,25 +32,25 @@ typedef struct pub0_sock pub0_sock;
static void pub0_pipe_recv_cb(void *);
static void pub0_pipe_send_cb(void *);
-static void pub0_pipe_getq_cb(void *);
-static void pub0_sock_getq_cb(void *);
static void pub0_sock_fini(void *);
static void pub0_pipe_fini(void *);
// pub0_sock is our per-socket protocol private structure.
struct pub0_sock {
- nni_msgq *uwq;
- nni_aio * aio_getq;
- nni_list pipes;
- nni_mtx mtx;
+ nni_list pipes;
+ nni_mtx mtx;
+ bool closed;
+ size_t sendbuf;
+ nni_pollable *sendable;
};
// pub0_pipe is our per-pipe protocol private structure.
struct pub0_pipe {
nni_pipe * pipe;
pub0_sock * pub;
- nni_msgq * sendq;
- nni_aio * aio_getq;
+ nni_lmq sendq;
+ bool closed;
+ bool busy;
nni_aio * aio_send;
nni_aio * aio_recv;
nni_list_node node;
@@ -61,48 +61,46 @@ pub0_sock_fini(void *arg)
{
pub0_sock *s = arg;
- nni_aio_fini(s->aio_getq);
+ nni_pollable_free(s->sendable);
nni_mtx_fini(&s->mtx);
NNI_FREE_STRUCT(s);
}
static int
-pub0_sock_init(void **sp, nni_sock *sock)
+pub0_sock_init(void **sp, nni_sock *nsock)
{
- pub0_sock *s;
+ pub0_sock *sock;
int rv;
+ NNI_ARG_UNUSED(nsock);
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&s->mtx);
- if ((rv = nni_aio_init(&s->aio_getq, pub0_sock_getq_cb, s)) != 0) {
- pub0_sock_fini(s);
+ if ((rv = nni_pollable_alloc(&sock->sendable)) != 0) {
+ NNI_FREE_STRUCT(sock);
return (rv);
}
-
- NNI_LIST_INIT(&s->pipes, pub0_pipe, node);
-
- s->uwq = nni_sock_sendq(sock);
-
- *sp = s;
+ nni_mtx_init(&sock->mtx);
+ NNI_LIST_INIT(&sock->pipes, pub0_pipe, node);
+ sock->sendbuf = 16; // fairly arbitrary
+ *sp = sock;
return (0);
}
static void
pub0_sock_open(void *arg)
{
- pub0_sock *s = arg;
-
- nni_msgq_aio_get(s->uwq, s->aio_getq);
+ NNI_ARG_UNUSED(arg);
}
static void
pub0_sock_close(void *arg)
{
- pub0_sock *s = arg;
+ pub0_sock *sock = arg;
- nni_aio_close(s->aio_getq);
+ nni_mtx_lock(&sock->mtx);
+ sock->closed = true;
+ nni_mtx_unlock(&sock->mtx);
}
static void
@@ -110,7 +108,6 @@ pub0_pipe_stop(void *arg)
{
pub0_pipe *p = arg;
- nni_aio_stop(p->aio_getq);
nni_aio_stop(p->aio_send);
nni_aio_stop(p->aio_recv);
}
@@ -120,10 +117,9 @@ pub0_pipe_fini(void *arg)
{
pub0_pipe *p = arg;
- nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
- nni_msgq_fini(p->sendq);
+ nni_lmq_fini(&p->sendq);
NNI_FREE_STRUCT(p);
}
@@ -131,15 +127,20 @@ static int
pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
pub0_pipe *p;
+ pub0_sock *sock = s;
int rv;
+ size_t len;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
+ nni_mtx_lock(&sock->mtx);
+ len = sock->sendbuf;
+ nni_mtx_unlock(&sock->mtx);
+
// XXX: consider making this depth tunable
- if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, pub0_pipe_getq_cb, p)) != 0) ||
+ if (((rv = nni_lmq_init(&p->sendq, len)) != 0) ||
((rv = nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p)) != 0) ||
((rv = nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) {
@@ -147,6 +148,7 @@ pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
return (rv);
}
+ p->busy = false;
p->pipe = pipe;
p->pub = s;
*pp = p;
@@ -156,19 +158,18 @@ pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
static int
pub0_pipe_start(void *arg)
{
- pub0_pipe *p = arg;
- pub0_sock *s = p->pub;
+ pub0_pipe *p = arg;
+ pub0_sock *sock = p->pub;
if (nni_pipe_peer(p->pipe) != NNI_PROTO_SUB_V0) {
return (NNG_EPROTO);
}
- nni_mtx_lock(&s->mtx);
- nni_list_append(&s->pipes, p);
- nni_mtx_unlock(&s->mtx);
+ nni_mtx_lock(&sock->mtx);
+ nni_list_append(&sock->pipes, p);
+ nni_mtx_unlock(&sock->mtx);
- // Start the receiver and the queue reader.
+ // Start the receiver.
nni_pipe_recv(p->pipe, p->aio_recv);
- nni_msgq_aio_get(p->sendq, p->aio_getq);
return (0);
}
@@ -176,62 +177,20 @@ pub0_pipe_start(void *arg)
static void
pub0_pipe_close(void *arg)
{
- pub0_pipe *p = arg;
- pub0_sock *s = p->pub;
+ pub0_pipe *p = arg;
+ pub0_sock *sock = p->pub;
- nni_aio_close(p->aio_getq);
nni_aio_close(p->aio_send);
nni_aio_close(p->aio_recv);
- nni_msgq_close(p->sendq);
+ nni_mtx_lock(&sock->mtx);
+ p->closed = true;
+ nni_lmq_flush(&p->sendq);
- nni_mtx_lock(&s->mtx);
- if (nni_list_active(&s->pipes, p)) {
- nni_list_remove(&s->pipes, p);
+ if (nni_list_active(&sock->pipes, p)) {
+ nni_list_remove(&sock->pipes, p);
}
- nni_mtx_unlock(&s->mtx);
-}
-
-static void
-pub0_sock_getq_cb(void *arg)
-{
- pub0_sock *s = arg;
- nni_msgq * uwq = s->uwq;
- nni_msg * msg, *dup;
-
- pub0_pipe *p;
- pub0_pipe *last;
- int rv;
-
- if (nni_aio_result(s->aio_getq) != 0) {
- return;
- }
-
- msg = nni_aio_get_msg(s->aio_getq);
- nni_aio_set_msg(s->aio_getq, NULL);
-
- nni_mtx_lock(&s->mtx);
- last = nni_list_last(&s->pipes);
- NNI_LIST_FOREACH (&s->pipes, p) {
- if (p != last) {
- rv = nni_msg_dup(&dup, msg);
- if (rv != 0) {
- continue;
- }
- } else {
- dup = msg;
- }
- if ((rv = nni_msgq_tryput(p->sendq, dup)) != 0) {
- nni_msg_free(dup);
- }
- }
- nni_mtx_unlock(&s->mtx);
-
- if (last == NULL) {
- nni_msg_free(msg);
- }
-
- nni_msgq_aio_get(uwq, s->aio_getq);
+ nni_mtx_unlock(&sock->mtx);
}
static void
@@ -244,31 +203,18 @@ pub0_pipe_recv_cb(void *arg)
return;
}
+ // We should never get any messages. If we do we just dicard them.
nni_msg_free(nni_aio_get_msg(p->aio_recv));
nni_aio_set_msg(p->aio_recv, NULL);
nni_pipe_recv(p->pipe, p->aio_recv);
}
static void
-pub0_pipe_getq_cb(void *arg)
-{
- pub0_pipe *p = arg;
-
- if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_close(p->pipe);
- return;
- }
-
- nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
- nni_aio_set_msg(p->aio_getq, NULL);
-
- nni_pipe_send(p->pipe, p->aio_send);
-}
-
-static void
pub0_pipe_send_cb(void *arg)
{
- pub0_pipe *p = arg;
+ pub0_pipe *p = arg;
+ pub0_sock *sock = p->pub;
+ nni_msg * msg;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -277,23 +223,126 @@ pub0_pipe_send_cb(void *arg)
return;
}
- nni_aio_set_msg(p->aio_send, NULL);
- nni_msgq_aio_get(p->sendq, p->aio_getq);
+ nni_mtx_lock(&sock->mtx);
+ if (sock->closed || p->closed) {
+ nni_mtx_unlock(&sock->mtx);
+ return;
+ }
+ if (nni_lmq_getq(&p->sendq, &msg) == 0) {
+ nni_aio_set_msg(p->aio_send, msg);
+ nni_pipe_send(p->pipe, p->aio_send);
+ } else {
+ p->busy = false;
+ }
+ nni_mtx_unlock(&sock->mtx);
}
static void
pub0_sock_recv(void *arg, nni_aio *aio)
{
NNI_ARG_UNUSED(arg);
- nni_aio_finish_error(aio, NNG_ENOTSUP);
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_finish_error(aio, NNG_ENOTSUP);
+ }
}
static void
pub0_sock_send(void *arg, nni_aio *aio)
{
- pub0_sock *s = arg;
+ pub0_sock *sock = arg;
+ pub0_pipe *p;
+ nng_msg * msg;
+ nng_msg * dup;
+ size_t len;
+
+ msg = nni_aio_get_msg(aio);
+ len = nni_msg_len(msg);
+ nni_mtx_lock(&sock->mtx);
+ if (sock->closed) {
+ nni_mtx_unlock(&sock->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ NNI_LIST_FOREACH (&sock->pipes, p) {
+ if (p->closed) {
+ continue;
+ }
+ if (nni_lmq_full(&p->sendq)) {
+ continue;
+ }
+ if (p == nni_list_last(&sock->pipes)) {
+ dup = msg;
+ msg = NULL;
+ } else if (nni_msg_dup(&dup, msg) != 0) {
+ continue;
+ }
+ if (p->busy) {
+ nni_lmq_putq(&p->sendq, dup);
+ } else {
+ p->busy = true;
+ nni_aio_set_msg(p->aio_send, dup);
+ nni_pipe_send(p->pipe, p->aio_send);
+ }
+ }
+ nni_mtx_unlock(&sock->mtx);
+ nni_aio_finish(aio, 0, len);
+}
+
+static int
+pub0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ pub0_sock *sock = arg;
+ int fd;
+ int rv;
+ nni_mtx_lock(&sock->mtx);
+ // PUB sockets are *always* sendable.
+ nni_pollable_raise(sock->sendable);
+ rv = nni_pollable_getfd(sock->sendable, &fd);
+ nni_mtx_unlock(&sock->mtx);
+
+ if (rv == 0) {
+ rv = nni_copyout_int(fd, buf, szp, t);
+ }
+ return (rv);
+}
+
+static int
+pub0_sock_set_sendbuf(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ pub0_sock *sock = arg;
+ pub0_pipe *p;
+ size_t val;
+ int rv;
+
+ if ((rv = nni_copyin_size(&val, buf, sz, 1, 8192, t)) != 0) {
+ return (rv);
+ }
- nni_msgq_aio_put(s->uwq, aio);
+ nni_mtx_lock(&sock->mtx);
+ sock->sendbuf = val;
+ NNI_LIST_FOREACH (&sock->pipes, p) {
+ // If we fail part way thru (should only be ENOMEM), we
+ // stop short. The others would likely fail for ENOMEM as
+ // well anyway. There is a weird effect here where the
+ // buffers may have been set for *some* of the pipes, but
+ // we have no way to correct, or even report, partial failure.
+ if ((rv = nni_lmq_resize(&p->sendq, val)) != 0) {
+ break;
+ }
+ }
+ nni_mtx_unlock(&sock->mtx);
+ return (rv);
+}
+
+static int
+pub0_sock_get_sendbuf(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ pub0_sock *sock = arg;
+ size_t val;
+ nni_mtx_lock(&sock->mtx);
+ val = sock->sendbuf;
+ nni_mtx_unlock(&sock->mtx);
+ return (nni_copyout_size(val, buf, szp, t));
}
static nni_proto_pipe_ops pub0_pipe_ops = {
@@ -307,6 +356,15 @@ static nni_proto_pipe_ops pub0_pipe_ops = {
static nni_option pub0_sock_options[] = {
// terminate list
{
+ .o_name = NNG_OPT_SENDFD,
+ .o_get = pub0_sock_get_sendfd,
+ },
+ {
+ .o_name = NNG_OPT_SENDBUF,
+ .o_get = pub0_sock_get_sendbuf,
+ .o_set = pub0_sock_set_sendbuf,
+ },
+ {
.o_name = NULL,
},
};
@@ -325,7 +383,7 @@ static nni_proto pub0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_PUB_V0, "pub" },
.proto_peer = { NNI_PROTO_SUB_V0, "sub" },
- .proto_flags = NNI_PROTO_FLAG_SND,
+ .proto_flags = NNI_PROTO_FLAG_SND | NNI_PROTO_FLAG_NOMSGQ,
.proto_sock_ops = &pub0_sock_ops,
.proto_pipe_ops = &pub0_pipe_ops,
};
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index 5240fc83..fefd79a9 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -26,8 +26,12 @@
#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0)
#endif
+// By default we accept 128 messages.
+#define SUB0_DEFAULT_QLEN 128
+
typedef struct sub0_pipe sub0_pipe;
typedef struct sub0_sock sub0_sock;
+typedef struct sub0_ctx sub0_ctx;
typedef struct sub0_topic sub0_topic;
static void sub0_recv_cb(void *);
@@ -39,11 +43,32 @@ struct sub0_topic {
void * buf;
};
+// sub0_ctx is a context for a SUB socket. The advantage of contexts is
+// that different contexts can maintain different subscriptions.
+struct sub0_ctx {
+ nni_list_node node;
+ sub0_sock * sock;
+ nni_list topics; // XXX: Consider replacing with patricia trie
+ nni_list raios; // sub context could have multiple pending recvs
+ bool closed;
+ nni_lmq lmq;
+
+#if 0
+ nni_msg **recvq;
+ size_t recvqcap;
+ size_t recvqlen;
+ size_t recvqget;
+ size_t recvqput;
+#endif
+};
+
// sub0_sock is our per-socket protocol private structure.
struct sub0_sock {
- nni_list topics;
- nni_msgq *urq;
- nni_mtx lk;
+ nni_pollable *recvable;
+ sub0_ctx * ctx; // default context
+ nni_list ctxs; // all contexts
+ size_t recvbuflen;
+ nni_mtx lk;
};
// sub0_pipe is our per-pipe protocol private structure.
@@ -53,35 +78,177 @@ struct sub0_pipe {
nni_aio * aio_recv;
};
+static void
+sub0_ctx_cancel(nng_aio *aio, void *arg, int rv)
+{
+ sub0_ctx * ctx = arg;
+ sub0_sock *sock = ctx->sock;
+ nni_mtx_lock(&sock->lk);
+ if (nni_list_active(&ctx->raios, aio)) {
+ nni_list_remove(&ctx->raios, aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&sock->lk);
+}
+
+static void
+sub0_ctx_recv(void *arg, nni_aio *aio)
+{
+ sub0_ctx * ctx = arg;
+ sub0_sock *sock = ctx->sock;
+ nni_msg * msg;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ nni_mtx_lock(&sock->lk);
+
+ if (ctx->closed) {
+ nni_mtx_unlock(&sock->lk);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+
+ if (nni_lmq_empty(&ctx->lmq)) {
+ int rv;
+ if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) {
+ nni_mtx_unlock(&sock->lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_list_append(&ctx->raios, aio);
+ nni_mtx_unlock(&sock->lk);
+ return;
+ }
+
+ (void) nni_lmq_getq(&ctx->lmq, &msg);
+
+ if (nni_lmq_empty(&ctx->lmq) && (ctx == sock->ctx)) {
+ nni_pollable_clear(sock->recvable);
+ }
+ nni_aio_set_msg(aio, msg);
+ nni_mtx_unlock(&sock->lk);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
+}
+
+static void
+sub0_ctx_send(void *arg, nni_aio *aio)
+{
+ NNI_ARG_UNUSED(arg);
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_finish_error(aio, NNG_ENOTSUP);
+ }
+}
+
+static void
+sub0_ctx_close(void *arg)
+{
+ sub0_ctx * ctx = arg;
+ sub0_sock *sock = ctx->sock;
+ nni_aio * aio;
+
+ nni_mtx_lock(&sock->lk);
+ ctx->closed = true;
+ while ((aio = nni_list_first(&ctx->raios)) != NULL) {
+ nni_list_remove(&ctx->raios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&sock->lk);
+}
+
+static void
+sub0_ctx_fini(void *arg)
+{
+ sub0_ctx * ctx = arg;
+ sub0_sock * sock = ctx->sock;
+ sub0_topic *topic;
+
+ sub0_ctx_close(ctx);
+
+ nni_mtx_lock(&sock->lk);
+ nni_list_remove(&sock->ctxs, ctx);
+ nni_mtx_unlock(&sock->lk);
+
+ while ((topic = nni_list_first(&ctx->topics)) != 0) {
+ nni_list_remove(&ctx->topics, topic);
+ nni_free(topic->buf, topic->len);
+ NNI_FREE_STRUCT(topic);
+ }
+
+ nni_lmq_fini(&ctx->lmq);
+ NNI_FREE_STRUCT(ctx);
+}
+
static int
-sub0_sock_init(void **sp, nni_sock *sock)
+sub0_ctx_init(void **ctxp, void *sarg)
{
- sub0_sock *s;
+ sub0_sock *sock = sarg;
+ sub0_ctx * ctx;
+ size_t len;
+ int rv;
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&s->lk);
- NNI_LIST_INIT(&s->topics, sub0_topic, node);
- s->urq = nni_sock_recvq(sock);
- *sp = s;
+ nni_mtx_lock(&sock->lk);
+ len = sock->recvbuflen;
+
+ if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) {
+ return (rv);
+ }
+
+ nni_aio_list_init(&ctx->raios);
+ NNI_LIST_INIT(&ctx->topics, sub0_topic, node);
+
+ ctx->sock = sock;
+ *ctxp = ctx;
+
+ nni_list_append(&sock->ctxs, ctx);
+ nni_mtx_unlock(&sock->lk);
+
return (0);
}
static void
sub0_sock_fini(void *arg)
{
- sub0_sock * s = arg;
- sub0_topic *topic;
+ sub0_sock *sock = arg;
- while ((topic = nni_list_first(&s->topics)) != NULL) {
- nni_list_remove(&s->topics, topic);
- nni_free(topic->buf, topic->len);
- NNI_FREE_STRUCT(topic);
+ if (sock->ctx != NULL) {
+ sub0_ctx_fini(sock->ctx);
}
- nni_mtx_fini(&s->lk);
- NNI_FREE_STRUCT(s);
+ if (sock->recvable != NULL) {
+ nni_pollable_free(sock->recvable);
+ }
+ nni_mtx_fini(&sock->lk);
+ NNI_FREE_STRUCT(sock);
+}
+
+static int
+sub0_sock_init(void **sp, nni_sock *nsock)
+{
+ sub0_sock *sock;
+ int rv;
+
+ NNI_ARG_UNUSED(nsock);
+
+ if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_INIT(&sock->ctxs, sub0_ctx, node);
+ nni_mtx_init(&sock->lk);
+ sock->recvbuflen = SUB0_DEFAULT_QLEN;
+
+ if (((rv = sub0_ctx_init((void **) &sock->ctx, sock)) != 0) ||
+ ((rv = nni_pollable_alloc(&sock->recvable)) != 0)) {
+ sub0_sock_fini(sock);
+ return (rv);
+ }
+
+ *sp = sock;
+ return (0);
}
static void
@@ -93,7 +260,8 @@ sub0_sock_open(void *arg)
static void
sub0_sock_close(void *arg)
{
- NNI_ARG_UNUSED(arg);
+ sub0_sock *sock = arg;
+ sub0_ctx_close(sock->ctx);
}
static void
@@ -155,191 +323,302 @@ sub0_pipe_close(void *arg)
nni_aio_close(p->aio_recv);
}
+static bool
+sub0_matches(sub0_ctx *ctx, uint8_t *body, size_t len)
+{
+ sub0_topic *topic;
+
+ // This is a naive and trivial matcher. Replace with a real
+ // patricia trie later.
+ NNI_LIST_FOREACH (&ctx->topics, topic) {
+ if (len < topic->len) {
+ continue;
+ }
+ if ((topic->len == 0) ||
+ (memcmp(topic->buf, body, topic->len) == 0)) {
+ return (true);
+ }
+ }
+ return (false);
+}
+
static void
sub0_recv_cb(void *arg)
{
- sub0_pipe *p = arg;
- sub0_sock *s = p->sub;
- nni_msgq * urq = s->urq;
+ sub0_pipe *p = arg;
+ sub0_sock *sock = p->sub;
+ sub0_ctx * ctx;
nni_msg * msg;
+ size_t len;
+ uint8_t * body;
+ nni_list finish;
+ nng_aio * aio;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
return;
}
+ nni_aio_list_init(&finish);
+
msg = nni_aio_get_msg(p->aio_recv);
nni_aio_set_msg(p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
- switch (nni_msgq_tryput(urq, msg)) {
- case 0:
- break;
- case NNG_EAGAIN:
+ body = nni_msg_body(msg);
+ len = nni_msg_len(msg);
+
+ nni_mtx_lock(&sock->lk);
+ // Go through all contexts. We will try to send up.
+ NNI_LIST_FOREACH (&sock->ctxs, ctx) {
+ nni_msg *dup;
+
+ if (nni_lmq_full(&ctx->lmq)) {
+ // Cannot deliver here, as receive buffer is full.
+ continue;
+ }
+
+ if (!sub0_matches(ctx, body, len)) {
+ continue;
+ }
+
+ // Special optimization (for the case where only one context),
+ // including when no contexts are in use, we avoid duplication.
+ if (ctx == nni_list_last(&sock->ctxs)) {
+ dup = msg;
+ msg = NULL;
+ } else if (nni_msg_dup(&dup, msg) != 0) {
+ continue; // TODO: Bump a stat!
+ }
+
+ if (!nni_list_empty(&ctx->raios)) {
+ nni_aio *aio = nni_list_first(&ctx->raios);
+ nni_list_remove(&ctx->raios, aio);
+ nni_aio_set_msg(aio, dup);
+
+ // Save for synchronous completion
+ nni_list_append(&finish, aio);
+ } else {
+ (void) nni_lmq_putq(&ctx->lmq, dup);
+ }
+ }
+ nni_mtx_unlock(&sock->lk);
+
+ while ((aio = nni_list_first(&finish)) != NULL) {
+ nni_list_remove(&finish, aio);
+ nni_aio_finish_synch(aio, 0, len);
+ }
+
+ // We will toss the message if we didn't use it when delivering to
+ // the very last context.
+ if (msg != NULL) {
nni_msg_free(msg);
- break;
- default:
- // Any other error we stop the pipe for. It's probably
- // NNG_ECLOSED anyway.
- nng_msg_free(msg);
- nni_pipe_close(p->pipe);
- return;
}
+
nni_pipe_recv(p->pipe, p->aio_recv);
}
+static int
+sub0_ctx_get_recvbuf(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ sub0_ctx * ctx = arg;
+ sub0_sock *sock = ctx->sock;
+ size_t val;
+ nni_mtx_lock(&sock->lk);
+ val = nni_lmq_cap(&ctx->lmq);
+ nni_mtx_unlock(&sock->lk);
+
+ return (nni_copyout_size(val, buf, szp, t));
+}
+
+static int
+sub0_ctx_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ sub0_ctx * ctx = arg;
+ sub0_sock *sock = ctx->sock;
+ size_t val;
+ int rv;
+
+ if ((rv = nni_copyin_size(&val, buf, sz, 1, 8192, t)) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(&sock->lk);
+ if ((rv = nni_lmq_resize(&ctx->lmq, val)) != 0) {
+ nni_mtx_unlock(&sock->lk);
+ return (rv);
+ }
+
+ // If we change the socket, then this will change the queue for
+ // any new contexts. (Previously constructed contexts are unaffected.)
+ if (sock->ctx == ctx) {
+ sock->recvbuflen = val;
+ }
+ nni_mtx_unlock(&sock->lk);
+ return (0);
+}
+
// For now we maintain subscriptions on a sorted linked list. As we do not
// expect to have huge numbers of subscriptions, and as the operation is
// really O(n), we think this is acceptable. In the future we might decide
// to replace this with a patricia trie, like old nanomsg had.
static int
-sub0_subscribe(void *arg, const void *buf, size_t sz, nni_opt_type t)
+sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
- sub0_sock * s = arg;
+ sub0_ctx * ctx = arg;
+ sub0_sock * sock = ctx->sock;
sub0_topic *topic;
sub0_topic *newtopic;
NNI_ARG_UNUSED(t);
- nni_mtx_lock(&s->lk);
- NNI_LIST_FOREACH (&s->topics, topic) {
- int rv;
-
- if (topic->len >= sz) {
- rv = memcmp(topic->buf, buf, sz);
- } else {
- rv = memcmp(topic->buf, buf, topic->len);
+ nni_mtx_lock(&sock->lk);
+ NNI_LIST_FOREACH (&ctx->topics, topic) {
+ if (topic->len != sz) {
+ continue;
}
- if (rv == 0) {
- if (topic->len == sz) {
- // Already inserted.
- nni_mtx_unlock(&s->lk);
- return (0);
- }
- if (topic->len > sz) {
- break;
- }
- } else if (rv > 0) {
- break;
+ if (memcmp(topic->buf, buf, sz) == 0) {
+ // Already have it.
+ nni_mtx_unlock(&sock->lk);
+ return (0);
}
}
-
if ((newtopic = NNI_ALLOC_STRUCT(newtopic)) == NULL) {
- nni_mtx_unlock(&s->lk);
+ nni_mtx_unlock(&sock->lk);
return (NNG_ENOMEM);
}
- if (sz > 0) {
- if ((newtopic->buf = nni_alloc(sz)) == NULL) {
- nni_mtx_unlock(&s->lk);
- return (NNG_ENOMEM);
- }
- } else {
- newtopic->buf = NULL;
+ if ((sz > 0) && ((newtopic->buf = nni_alloc(sz)) == NULL)) {
+ nni_mtx_unlock(&sock->lk);
+ NNI_FREE_STRUCT(newtopic);
+ return (NNG_ENOMEM);
}
- NNI_LIST_NODE_INIT(&newtopic->node);
- newtopic->len = sz;
memcpy(newtopic->buf, buf, sz);
- if (topic != NULL) {
- nni_list_insert_before(&s->topics, newtopic, topic);
- } else {
- nni_list_append(&s->topics, newtopic);
- }
- nni_mtx_unlock(&s->lk);
+ newtopic->len = sz;
+ nni_list_append(&ctx->topics, newtopic);
+ nni_mtx_unlock(&sock->lk);
return (0);
}
static int
-sub0_unsubscribe(void *arg, const void *buf, size_t sz, nni_opt_type t)
+sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
- sub0_sock * s = arg;
+ sub0_ctx * ctx = arg;
+ sub0_sock * sock = ctx->sock;
sub0_topic *topic;
- int rv;
+ size_t len;
NNI_ARG_UNUSED(t);
- nni_mtx_lock(&s->lk);
- NNI_LIST_FOREACH (&s->topics, topic) {
- if (topic->len >= sz) {
- rv = memcmp(topic->buf, buf, sz);
- } else {
- rv = memcmp(topic->buf, buf, topic->len);
+ nni_mtx_lock(&sock->lk);
+ NNI_LIST_FOREACH (&ctx->topics, topic) {
+ if (topic->len != sz) {
+ continue;
}
- if (rv == 0) {
- if (topic->len == sz) {
- nni_list_remove(&s->topics, topic);
- nni_mtx_unlock(&s->lk);
- nni_free(topic->buf, topic->len);
- NNI_FREE_STRUCT(topic);
- return (0);
- }
- if (topic->len > sz) {
- nni_mtx_unlock(&s->lk);
- return (NNG_ENOENT);
- }
+ if (memcmp(topic->buf, buf, sz) == 0) {
+ // Matched!
+ break;
}
- if (rv > 0) {
- nni_mtx_unlock(&s->lk);
- return (NNG_ENOENT);
+ }
+ if (topic == NULL) {
+ nni_mtx_unlock(&sock->lk);
+ return (NNG_ENOENT);
+ }
+ nni_list_remove(&ctx->topics, topic);
+
+ // Now we need to make sure that any messages that are waiting still
+ // match the subscription. We basically just run through the queue
+ // and requeue those messages we need.
+ len = nni_lmq_len(&ctx->lmq);
+ for (size_t i = 0; i < len; i++) {
+ nni_msg *msg;
+
+ (void) nni_lmq_getq(&ctx->lmq, &msg);
+ if (sub0_matches(ctx, nni_msg_body(msg), nni_msg_len(msg))) {
+ (void) nni_lmq_putq(&ctx->lmq, msg);
+ } else {
+ nni_msg_free(msg);
}
}
- nni_mtx_unlock(&s->lk);
- return (NNG_ENOENT);
+ nni_mtx_unlock(&sock->lk);
+
+ nni_free(topic->buf, topic->len);
+ NNI_FREE_STRUCT(topic);
+ return (0);
}
+static nni_option sub0_ctx_options[] = {
+ {
+ .o_name = NNG_OPT_RECVBUF,
+ .o_get = sub0_ctx_get_recvbuf,
+ .o_set = sub0_ctx_set_recvbuf,
+ },
+ {
+ .o_name = NNG_OPT_SUB_SUBSCRIBE,
+ .o_set = sub0_ctx_subscribe,
+ },
+ {
+ .o_name = NNG_OPT_SUB_UNSUBSCRIBE,
+ .o_set = sub0_ctx_unsubscribe,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
static void
sub0_sock_send(void *arg, nni_aio *aio)
{
NNI_ARG_UNUSED(arg);
- nni_aio_finish_error(aio, NNG_ENOTSUP);
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_finish_error(aio, NNG_ENOTSUP);
+ }
}
static void
sub0_sock_recv(void *arg, nni_aio *aio)
{
- sub0_sock *s = arg;
+ sub0_sock *sock = arg;
- nni_msgq_aio_get(s->urq, aio);
+ sub0_ctx_recv(sock->ctx, aio);
}
-static nni_msg *
-sub0_sock_filter(void *arg, nni_msg *msg)
+static int
+sub0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
- sub0_sock * s = arg;
- sub0_topic *topic;
- char * body;
- size_t len;
- int match;
-
- body = nni_msg_body(msg);
- len = nni_msg_len(msg);
+ sub0_sock *sock = arg;
+ int rv;
+ int fd;
- match = 0;
-
- nni_mtx_lock(&s->lk);
-
- // Check to see if the message matches one of our subscriptions.
- NNI_LIST_FOREACH (&s->topics, topic) {
- if (len >= topic->len) {
- int rv = memcmp(topic->buf, body, topic->len);
- if (rv == 0) {
- // Matched!
- match = 1;
- break;
- }
- if (rv > 0) {
- match = 0;
- break;
- }
- } else if (memcmp(topic->buf, body, len) >= 0) {
- match = 0;
- break;
- }
- }
- nni_mtx_unlock(&s->lk);
- if (!match) {
- nni_msg_free(msg);
- return (NULL);
+ if ((rv = nni_pollable_getfd(sock->recvable, &fd)) != 0) {
+ return (rv);
}
- return (msg);
+ return (nni_copyout_int(fd, buf, szp, t));
+}
+
+static int
+sub0_sock_get_recvbuf(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ sub0_sock *sock = arg;
+ return (sub0_ctx_get_recvbuf(sock->ctx, buf, szp, t));
+}
+
+static int
+sub0_sock_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ sub0_sock *sock = arg;
+ return (sub0_ctx_set_recvbuf(sock->ctx, buf, sz, t));
+}
+
+static int
+sub0_sock_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ sub0_sock *sock = arg;
+ return (sub0_ctx_subscribe(sock->ctx, buf, sz, t));
+}
+
+static int
+sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ sub0_sock *sock = arg;
+ return (sub0_ctx_unsubscribe(sock->ctx, buf, sz, t));
}
// This is the global protocol structure -- our linkage to the core.
@@ -352,14 +631,31 @@ static nni_proto_pipe_ops sub0_pipe_ops = {
.pipe_stop = sub0_pipe_stop,
};
+static nni_proto_ctx_ops sub0_ctx_ops = {
+ .ctx_init = sub0_ctx_init,
+ .ctx_fini = sub0_ctx_fini,
+ .ctx_send = sub0_ctx_send,
+ .ctx_recv = sub0_ctx_recv,
+ .ctx_options = sub0_ctx_options,
+};
+
static nni_option sub0_sock_options[] = {
{
.o_name = NNG_OPT_SUB_SUBSCRIBE,
- .o_set = sub0_subscribe,
+ .o_set = sub0_sock_subscribe,
},
{
.o_name = NNG_OPT_SUB_UNSUBSCRIBE,
- .o_set = sub0_unsubscribe,
+ .o_set = sub0_sock_unsubscribe,
+ },
+ {
+ .o_name = NNG_OPT_RECVFD,
+ .o_get = sub0_sock_get_recvfd,
+ },
+ {
+ .o_name = NNG_OPT_RECVBUF,
+ .o_get = sub0_sock_get_recvbuf,
+ .o_set = sub0_sock_set_recvbuf,
},
// terminate list
{
@@ -374,18 +670,6 @@ static nni_proto_sock_ops sub0_sock_ops = {
.sock_close = sub0_sock_close,
.sock_send = sub0_sock_send,
.sock_recv = sub0_sock_recv,
- .sock_filter = sub0_sock_filter,
- .sock_options = sub0_sock_options,
-};
-
-static nni_proto_sock_ops sub0_sock_ops_raw = {
- .sock_init = sub0_sock_init,
- .sock_fini = sub0_sock_fini,
- .sock_open = sub0_sock_open,
- .sock_close = sub0_sock_close,
- .sock_send = sub0_sock_send,
- .sock_recv = sub0_sock_recv,
- .sock_filter = NULL, // raw does not filter
.sock_options = sub0_sock_options,
};
@@ -393,18 +677,10 @@ static nni_proto sub0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_SUB_V0, "sub" },
.proto_peer = { NNI_PROTO_PUB_V0, "pub" },
- .proto_flags = NNI_PROTO_FLAG_RCV,
+ .proto_flags = NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_NOMSGQ,
.proto_sock_ops = &sub0_sock_ops,
.proto_pipe_ops = &sub0_pipe_ops,
-};
-
-static nni_proto sub0_proto_raw = {
- .proto_version = NNI_PROTOCOL_VERSION,
- .proto_self = { NNI_PROTO_SUB_V0, "sub" },
- .proto_peer = { NNI_PROTO_PUB_V0, "pub" },
- .proto_flags = NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_RAW,
- .proto_sock_ops = &sub0_sock_ops_raw,
- .proto_pipe_ops = &sub0_pipe_ops,
+ .proto_ctx_ops = &sub0_ctx_ops,
};
int
@@ -412,9 +688,3 @@ nng_sub0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &sub0_proto));
}
-
-int
-nng_sub0_open_raw(nng_socket *sidp)
-{
- return (nni_proto_open(sidp, &sub0_proto_raw));
-}
diff --git a/src/protocol/pubsub0/xsub.c b/src/protocol/pubsub0/xsub.c
new file mode 100644
index 00000000..b334bf87
--- /dev/null
+++ b/src/protocol/pubsub0/xsub.c
@@ -0,0 +1,230 @@
+//
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "nng/protocol/pubsub0/sub.h"
+
+// Subscriber protocol. The SUB protocol receives messages sent to
+// it from publishers, and filters out those it is not interested in,
+// only passing up ones that match known subscriptions.
+
+#ifndef NNI_PROTO_SUB_V0
+#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1)
+#endif
+
+#ifndef NNI_PROTO_PUB_V0
+#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0)
+#endif
+
+typedef struct xsub0_pipe xsub0_pipe;
+typedef struct xsub0_sock xsub0_sock;
+
+static void xsub0_recv_cb(void *);
+static void xsub0_pipe_fini(void *);
+
+// xsub0_sock is our per-socket protocol private structure.
+struct xsub0_sock {
+ nni_msgq *urq;
+ nni_mtx lk;
+};
+
+// sub0_pipe is our per-pipe protocol private structure.
+struct xsub0_pipe {
+ nni_pipe * pipe;
+ xsub0_sock *sub;
+ nni_aio * aio_recv;
+};
+
+static int
+xsub0_sock_init(void **sp, nni_sock *sock)
+{
+ xsub0_sock *s;
+
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&s->lk);
+
+ s->urq = nni_sock_recvq(sock);
+ *sp = s;
+ return (0);
+}
+
+static void
+xsub0_sock_fini(void *arg)
+{
+ xsub0_sock *s = arg;
+ nni_mtx_fini(&s->lk);
+ NNI_FREE_STRUCT(s);
+}
+
+static void
+xsub0_sock_open(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+xsub0_sock_close(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+xsub0_pipe_stop(void *arg)
+{
+ xsub0_pipe *p = arg;
+
+ nni_aio_stop(p->aio_recv);
+}
+
+static void
+xsub0_pipe_fini(void *arg)
+{
+ xsub0_pipe *p = arg;
+
+ nni_aio_fini(p->aio_recv);
+ NNI_FREE_STRUCT(p);
+}
+
+static int
+xsub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
+{
+ xsub0_pipe *p;
+ int rv;
+
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_aio_init(&p->aio_recv, xsub0_recv_cb, p)) != 0) {
+ xsub0_pipe_fini(p);
+ return (rv);
+ }
+
+ p->pipe = pipe;
+ p->sub = s;
+ *pp = p;
+ return (0);
+}
+
+static int
+xsub0_pipe_start(void *arg)
+{
+ xsub0_pipe *p = arg;
+
+ if (nni_pipe_peer(p->pipe) != NNI_PROTO_PUB_V0) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+
+ nni_pipe_recv(p->pipe, p->aio_recv);
+ return (0);
+}
+
+static void
+xsub0_pipe_close(void *arg)
+{
+ xsub0_pipe *p = arg;
+
+ nni_aio_close(p->aio_recv);
+}
+
+static void
+xsub0_recv_cb(void *arg)
+{
+ xsub0_pipe *p = arg;
+ xsub0_sock *s = p->sub;
+ nni_msgq * urq = s->urq;
+ nni_msg * msg;
+
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_close(p->pipe);
+ return;
+ }
+
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
+
+ switch (nni_msgq_tryput(urq, msg)) {
+ case 0:
+ break;
+ case NNG_EAGAIN:
+ nni_msg_free(msg);
+ break;
+ default:
+ // Any other error we stop the pipe for. It's probably
+ // NNG_ECLOSED anyway.
+ nng_msg_free(msg);
+ nni_pipe_close(p->pipe);
+ return;
+ }
+ nni_pipe_recv(p->pipe, p->aio_recv);
+}
+
+static void
+xsub0_sock_send(void *arg, nni_aio *aio)
+{
+ NNI_ARG_UNUSED(arg);
+ nni_aio_finish_error(aio, NNG_ENOTSUP);
+}
+
+static void
+xsub0_sock_recv(void *arg, nni_aio *aio)
+{
+ xsub0_sock *s = arg;
+
+ nni_msgq_aio_get(s->urq, aio);
+}
+
+// This is the global protocol structure -- our linkage to the core.
+// This should be the only global non-static symbol in this file.
+static nni_proto_pipe_ops xsub0_pipe_ops = {
+ .pipe_init = xsub0_pipe_init,
+ .pipe_fini = xsub0_pipe_fini,
+ .pipe_start = xsub0_pipe_start,
+ .pipe_close = xsub0_pipe_close,
+ .pipe_stop = xsub0_pipe_stop,
+};
+
+static nni_option xsub0_sock_options[] = {
+ // terminate list
+ {
+ .o_name = NULL,
+ },
+};
+
+static nni_proto_sock_ops xsub0_sock_ops = {
+ .sock_init = xsub0_sock_init,
+ .sock_fini = xsub0_sock_fini,
+ .sock_open = xsub0_sock_open,
+ .sock_close = xsub0_sock_close,
+ .sock_send = xsub0_sock_send,
+ .sock_recv = xsub0_sock_recv,
+ .sock_options = xsub0_sock_options,
+};
+
+static nni_proto xsub0_proto = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_SUB_V0, "sub" },
+ .proto_peer = { NNI_PROTO_PUB_V0, "pub" },
+ .proto_flags = NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &xsub0_sock_ops,
+ .proto_pipe_ops = &xsub0_pipe_ops,
+};
+
+int
+nng_sub0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &xsub0_proto));
+}