Diffstat (limited to 'src')
-rw-r--r--  src/CMakeLists.txt                    |   2
-rw-r--r--  src/core/lmq.c                        | 158
-rw-r--r--  src/core/lmq.h                        |  39
-rw-r--r--  src/core/nng_impl.h                   |   1
-rw-r--r--  src/protocol/pubsub0/CMakeLists.txt   |   7
-rw-r--r--  src/protocol/pubsub0/pub.c            | 282
-rw-r--r--  src/protocol/pubsub0/sub.c            | 606
-rw-r--r--  src/protocol/pubsub0/xsub.c           | 230
8 files changed, 1043 insertions, 282 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 3ee94e18..b828537e 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -48,6 +48,8 @@ set (NNG_SRCS
core/list.h
core/listener.c
core/listener.h
+ core/lmq.c
+ core/lmq.h
core/message.c
core/message.h
core/msgqueue.c
diff --git a/src/core/lmq.c b/src/core/lmq.c
new file mode 100644
index 00000000..9772e78e
--- /dev/null
+++ b/src/core/lmq.c
@@ -0,0 +1,158 @@
+//
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "nng_impl.h"
+
+// Light-weight message queue. These are derived from our heavy-weight
+// message queues, but are less featureful and better suited to
+// performance-sensitive contexts. Locking must be done by the caller.
+
+int
+nni_lmq_init(nni_lmq *lmq, size_t cap)
+{
+ size_t alloc;
+
+ // We prefer to round the allocation up to a power of 2, so that
+ // the modulo operation can be done with a cheap mask. This can
+ // waste some space, but never more than 2x. Consumers should try
+ // for powers of two if they are concerned about efficiency.
+ alloc = 1;
+ while (alloc < cap) {
+ alloc *= 2;
+ }
+ if ((lmq->lmq_msgs = nni_zalloc(sizeof(nng_msg *) * alloc)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ lmq->lmq_cap = cap;
+ lmq->lmq_alloc = alloc;
+ lmq->lmq_mask = (alloc - 1);
+ lmq->lmq_len = 0;
+ lmq->lmq_get = 0;
+ lmq->lmq_put = 0;
+
+ return (0);
+}
+
+void
+nni_lmq_fini(nni_lmq *lmq)
+{
+ if (lmq == NULL) {
+ return;
+ }
+
+ // Free any orphaned messages.
+ while (lmq->lmq_len > 0) {
+ nng_msg *msg = lmq->lmq_msgs[lmq->lmq_get++];
+ lmq->lmq_get &= lmq->lmq_mask;
+ lmq->lmq_len--;
+ nni_msg_free(msg);
+ }
+
+ nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *));
+}
+
+void
+nni_lmq_flush(nni_lmq *lmq)
+{
+ while (lmq->lmq_len > 0) {
+ nng_msg *msg = lmq->lmq_msgs[lmq->lmq_get++];
+ lmq->lmq_get &= lmq->lmq_mask;
+ lmq->lmq_len--;
+ nni_msg_free(msg);
+ }
+}
+
+size_t
+nni_lmq_len(nni_lmq *lmq)
+{
+ return (lmq->lmq_len);
+}
+
+size_t
+nni_lmq_cap(nni_lmq *lmq)
+{
+ return (lmq->lmq_cap);
+}
+
+bool
+nni_lmq_full(nni_lmq *lmq)
+{
+ return (lmq->lmq_len >= lmq->lmq_cap);
+}
+
+bool
+nni_lmq_empty(nni_lmq *lmq)
+{
+ return (lmq->lmq_len == 0);
+}
+
+int
+nni_lmq_putq(nni_lmq *lmq, nng_msg *msg)
+{
+ if (lmq->lmq_len >= lmq->lmq_cap) {
+ return (NNG_EAGAIN);
+ }
+ lmq->lmq_msgs[lmq->lmq_put++] = msg;
+ lmq->lmq_len++;
+ lmq->lmq_put &= lmq->lmq_mask;
+ return (0);
+}
+
+int
+nni_lmq_getq(nni_lmq *lmq, nng_msg **msgp)
+{
+ nng_msg *msg;
+ if (lmq->lmq_len == 0) {
+ return (NNG_EAGAIN);
+ }
+ msg = lmq->lmq_msgs[lmq->lmq_get++];
+ lmq->lmq_get &= lmq->lmq_mask;
+ lmq->lmq_len--;
+ *msgp = msg;
+ return (0);
+}
+
+int
+nni_lmq_resize(nni_lmq *lmq, size_t cap)
+{
+ nng_msg * msg;
+ nng_msg **newq;
+ size_t alloc;
+ size_t len;
+
+ alloc = 1;
+ while (alloc < cap) {
+ alloc *= 2;
+ }
+
+ newq = nni_alloc(sizeof(nng_msg *) * alloc);
+ if (newq == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ len = 0;
+ while ((len < cap) && (nni_lmq_getq(lmq, &msg) == 0)) {
+ newq[len++] = msg;
+ }
+
+ // Flush anything left over.
+ nni_lmq_flush(lmq);
+
+ nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *));
+ lmq->lmq_msgs = newq;
+ lmq->lmq_cap = cap;
+ lmq->lmq_alloc = alloc;
+ lmq->lmq_mask = alloc - 1;
+ lmq->lmq_len = len;
+ lmq->lmq_put = len;
+ lmq->lmq_get = 0;
+
+ return (0);
+}
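The init and resize code above relies on the usual power-of-two trick: once the capacity is rounded up to a power of two, masking the get/put indices with (alloc - 1) is equivalent to taking them modulo the allocation size, which is also why lmq_mask must be derived from lmq_alloc rather than the requested cap. A tiny standalone illustration (not part of the diff, hypothetical names):

    #include <assert.h>
    #include <stddef.h>

    int
    main(void)
    {
    	size_t alloc = 8;         // power of two, as nni_lmq_init rounds up
    	size_t mask  = alloc - 1; // 0b0111
    	for (size_t i = 0; i < 100; i++) {
    		// Masking wraps the ring index exactly like a modulo would.
    		assert((i & mask) == (i % alloc));
    	}
    	return (0);
    }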
diff --git a/src/core/lmq.h b/src/core/lmq.h
new file mode 100644
index 00000000..0a64c984
--- /dev/null
+++ b/src/core/lmq.h
@@ -0,0 +1,39 @@
+//
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_LMQ_H
+#define CORE_LMQ_H
+
+#include "nng_impl.h"
+
+// nni_lmq is a very lightweight message queue. Defining it this way allows
+// us to share some common code. Locking must be supplied by the caller.
+// For performance reasons, this is allocated inline.
+typedef struct nni_lmq {
+ size_t lmq_cap;
+ size_t lmq_alloc; // alloc is cap, rounded up to power of 2
+ size_t lmq_mask;
+ size_t lmq_len;
+ size_t lmq_get;
+ size_t lmq_put;
+ nng_msg **lmq_msgs;
+} nni_lmq;
+
+extern int nni_lmq_init(nni_lmq *, size_t);
+extern void nni_lmq_fini(nni_lmq *);
+extern void nni_lmq_flush(nni_lmq *);
+extern size_t nni_lmq_len(nni_lmq *);
+extern size_t nni_lmq_cap(nni_lmq *);
+extern int nni_lmq_putq(nni_lmq *, nng_msg *);
+extern int nni_lmq_getq(nni_lmq *, nng_msg **);
+extern int nni_lmq_resize(nni_lmq *, size_t);
+extern bool nni_lmq_full(nni_lmq *);
+extern bool nni_lmq_empty(nni_lmq *);
+
+#endif // CORE_LMQ_H
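A minimal sketch of how a consumer might embed this queue, using only the nni_lmq and nni_mtx internals declared above; the my_endpoint type and its functions are hypothetical, and the caller supplies all locking as the header requires:

    // Hypothetical consumer of nni_lmq: the queue is embedded inline and
    // protected by a caller-owned mutex.
    typedef struct my_endpoint {
    	nni_mtx mtx;   // caller-supplied lock protecting the lmq
    	nni_lmq sendq; // embedded light-weight message queue
    } my_endpoint;

    static int
    my_endpoint_init(my_endpoint *ep, size_t depth)
    {
    	int rv;
    	nni_mtx_init(&ep->mtx);
    	if ((rv = nni_lmq_init(&ep->sendq, depth)) != 0) {
    		nni_mtx_fini(&ep->mtx);
    		return (rv);
    	}
    	return (0);
    }

    static int
    my_endpoint_enqueue(my_endpoint *ep, nng_msg *msg)
    {
    	int rv;
    	nni_mtx_lock(&ep->mtx);
    	rv = nni_lmq_putq(&ep->sendq, msg); // NNG_EAGAIN when full
    	nni_mtx_unlock(&ep->mtx);
    	return (rv);
    }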
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index 07fe44f5..0dcc976f 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -33,6 +33,7 @@
#include "core/idhash.h"
#include "core/init.h"
#include "core/list.h"
+#include "core/lmq.h"
#include "core/message.h"
#include "core/msgqueue.h"
#include "core/options.h"
diff --git a/src/protocol/pubsub0/CMakeLists.txt b/src/protocol/pubsub0/CMakeLists.txt
index 1ea0cacf..12872063 100644
--- a/src/protocol/pubsub0/CMakeLists.txt
+++ b/src/protocol/pubsub0/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
# Copyright 2018 Capitar IT Group BV <info@capitar.com>
#
# This software is supplied under the terms of the MIT License, a
@@ -26,7 +26,10 @@ endif()
if (NNG_PROTO_SUB0)
list(APPEND _DEFS -DNNG_HAVE_SUB0)
- list(APPEND _SRCS protocol/pubsub0/sub.c ${PROJECT_SOURCE_DIR}/include/nng/protocol/pubsub0/sub.h)
+ list(APPEND _SRCS
+ protocol/pubsub0/sub.c
+ protocol/pubsub0/xsub.c
+ ${PROJECT_SOURCE_DIR}/include/nng/protocol/pubsub0/sub.h)
endif()
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index 2567a5b6..bec0763f 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -32,25 +32,25 @@ typedef struct pub0_sock pub0_sock;
static void pub0_pipe_recv_cb(void *);
static void pub0_pipe_send_cb(void *);
-static void pub0_pipe_getq_cb(void *);
-static void pub0_sock_getq_cb(void *);
static void pub0_sock_fini(void *);
static void pub0_pipe_fini(void *);
// pub0_sock is our per-socket protocol private structure.
struct pub0_sock {
- nni_msgq *uwq;
- nni_aio * aio_getq;
- nni_list pipes;
- nni_mtx mtx;
+ nni_list pipes;
+ nni_mtx mtx;
+ bool closed;
+ size_t sendbuf;
+ nni_pollable *sendable;
};
// pub0_pipe is our per-pipe protocol private structure.
struct pub0_pipe {
nni_pipe * pipe;
pub0_sock * pub;
- nni_msgq * sendq;
- nni_aio * aio_getq;
+ nni_lmq sendq;
+ bool closed;
+ bool busy;
nni_aio * aio_send;
nni_aio * aio_recv;
nni_list_node node;
@@ -61,48 +61,46 @@ pub0_sock_fini(void *arg)
{
pub0_sock *s = arg;
- nni_aio_fini(s->aio_getq);
+ nni_pollable_free(s->sendable);
nni_mtx_fini(&s->mtx);
NNI_FREE_STRUCT(s);
}
static int
-pub0_sock_init(void **sp, nni_sock *sock)
+pub0_sock_init(void **sp, nni_sock *nsock)
{
- pub0_sock *s;
+ pub0_sock *sock;
int rv;
+ NNI_ARG_UNUSED(nsock);
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&s->mtx);
- if ((rv = nni_aio_init(&s->aio_getq, pub0_sock_getq_cb, s)) != 0) {
- pub0_sock_fini(s);
+ if ((rv = nni_pollable_alloc(&sock->sendable)) != 0) {
+ NNI_FREE_STRUCT(sock);
return (rv);
}
-
- NNI_LIST_INIT(&s->pipes, pub0_pipe, node);
-
- s->uwq = nni_sock_sendq(sock);
-
- *sp = s;
+ nni_mtx_init(&sock->mtx);
+ NNI_LIST_INIT(&sock->pipes, pub0_pipe, node);
+ sock->sendbuf = 16; // fairly arbitrary
+ *sp = sock;
return (0);
}
static void
pub0_sock_open(void *arg)
{
- pub0_sock *s = arg;
-
- nni_msgq_aio_get(s->uwq, s->aio_getq);
+ NNI_ARG_UNUSED(arg);
}
static void
pub0_sock_close(void *arg)
{
- pub0_sock *s = arg;
+ pub0_sock *sock = arg;
- nni_aio_close(s->aio_getq);
+ nni_mtx_lock(&sock->mtx);
+ sock->closed = true;
+ nni_mtx_unlock(&sock->mtx);
}
static void
@@ -110,7 +108,6 @@ pub0_pipe_stop(void *arg)
{
pub0_pipe *p = arg;
- nni_aio_stop(p->aio_getq);
nni_aio_stop(p->aio_send);
nni_aio_stop(p->aio_recv);
}
@@ -120,10 +117,9 @@ pub0_pipe_fini(void *arg)
{
pub0_pipe *p = arg;
- nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
- nni_msgq_fini(p->sendq);
+ nni_lmq_fini(&p->sendq);
NNI_FREE_STRUCT(p);
}
@@ -131,15 +127,20 @@ static int
pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
pub0_pipe *p;
+ pub0_sock *sock = s;
int rv;
+ size_t len;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
+ nni_mtx_lock(&sock->mtx);
+ len = sock->sendbuf;
+ nni_mtx_unlock(&sock->mtx);
+
// XXX: consider making this depth tunable
- if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, pub0_pipe_getq_cb, p)) != 0) ||
+ if (((rv = nni_lmq_init(&p->sendq, len)) != 0) ||
((rv = nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p)) != 0) ||
((rv = nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) {
@@ -147,6 +148,7 @@ pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
return (rv);
}
+ p->busy = false;
p->pipe = pipe;
p->pub = s;
*pp = p;
@@ -156,19 +158,18 @@ pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
static int
pub0_pipe_start(void *arg)
{
- pub0_pipe *p = arg;
- pub0_sock *s = p->pub;
+ pub0_pipe *p = arg;
+ pub0_sock *sock = p->pub;
if (nni_pipe_peer(p->pipe) != NNI_PROTO_SUB_V0) {
return (NNG_EPROTO);
}
- nni_mtx_lock(&s->mtx);
- nni_list_append(&s->pipes, p);
- nni_mtx_unlock(&s->mtx);
+ nni_mtx_lock(&sock->mtx);
+ nni_list_append(&sock->pipes, p);
+ nni_mtx_unlock(&sock->mtx);
- // Start the receiver and the queue reader.
+ // Start the receiver.
nni_pipe_recv(p->pipe, p->aio_recv);
- nni_msgq_aio_get(p->sendq, p->aio_getq);
return (0);
}
@@ -176,62 +177,20 @@ pub0_pipe_start(void *arg)
static void
pub0_pipe_close(void *arg)
{
- pub0_pipe *p = arg;
- pub0_sock *s = p->pub;
+ pub0_pipe *p = arg;
+ pub0_sock *sock = p->pub;
- nni_aio_close(p->aio_getq);
nni_aio_close(p->aio_send);
nni_aio_close(p->aio_recv);
- nni_msgq_close(p->sendq);
+ nni_mtx_lock(&sock->mtx);
+ p->closed = true;
+ nni_lmq_flush(&p->sendq);
- nni_mtx_lock(&s->mtx);
- if (nni_list_active(&s->pipes, p)) {
- nni_list_remove(&s->pipes, p);
+ if (nni_list_active(&sock->pipes, p)) {
+ nni_list_remove(&sock->pipes, p);
}
- nni_mtx_unlock(&s->mtx);
-}
-
-static void
-pub0_sock_getq_cb(void *arg)
-{
- pub0_sock *s = arg;
- nni_msgq * uwq = s->uwq;
- nni_msg * msg, *dup;
-
- pub0_pipe *p;
- pub0_pipe *last;
- int rv;
-
- if (nni_aio_result(s->aio_getq) != 0) {
- return;
- }
-
- msg = nni_aio_get_msg(s->aio_getq);
- nni_aio_set_msg(s->aio_getq, NULL);
-
- nni_mtx_lock(&s->mtx);
- last = nni_list_last(&s->pipes);
- NNI_LIST_FOREACH (&s->pipes, p) {
- if (p != last) {
- rv = nni_msg_dup(&dup, msg);
- if (rv != 0) {
- continue;
- }
- } else {
- dup = msg;
- }
- if ((rv = nni_msgq_tryput(p->sendq, dup)) != 0) {
- nni_msg_free(dup);
- }
- }
- nni_mtx_unlock(&s->mtx);
-
- if (last == NULL) {
- nni_msg_free(msg);
- }
-
- nni_msgq_aio_get(uwq, s->aio_getq);
+ nni_mtx_unlock(&sock->mtx);
}
static void
@@ -244,31 +203,18 @@ pub0_pipe_recv_cb(void *arg)
return;
}
+ // We should never get any messages. If we do, we just discard them.
nni_msg_free(nni_aio_get_msg(p->aio_recv));
nni_aio_set_msg(p->aio_recv, NULL);
nni_pipe_recv(p->pipe, p->aio_recv);
}
static void
-pub0_pipe_getq_cb(void *arg)
-{
- pub0_pipe *p = arg;
-
- if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_close(p->pipe);
- return;
- }
-
- nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
- nni_aio_set_msg(p->aio_getq, NULL);
-
- nni_pipe_send(p->pipe, p->aio_send);
-}
-
-static void
pub0_pipe_send_cb(void *arg)
{
- pub0_pipe *p = arg;
+ pub0_pipe *p = arg;
+ pub0_sock *sock = p->pub;
+ nni_msg * msg;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -277,23 +223,126 @@ pub0_pipe_send_cb(void *arg)
return;
}
- nni_aio_set_msg(p->aio_send, NULL);
- nni_msgq_aio_get(p->sendq, p->aio_getq);
+ nni_mtx_lock(&sock->mtx);
+ if (sock->closed || p->closed) {
+ nni_mtx_unlock(&sock->mtx);
+ return;
+ }
+ if (nni_lmq_getq(&p->sendq, &msg) == 0) {
+ nni_aio_set_msg(p->aio_send, msg);
+ nni_pipe_send(p->pipe, p->aio_send);
+ } else {
+ p->busy = false;
+ }
+ nni_mtx_unlock(&sock->mtx);
}
static void
pub0_sock_recv(void *arg, nni_aio *aio)
{
NNI_ARG_UNUSED(arg);
- nni_aio_finish_error(aio, NNG_ENOTSUP);
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_finish_error(aio, NNG_ENOTSUP);
+ }
}
static void
pub0_sock_send(void *arg, nni_aio *aio)
{
- pub0_sock *s = arg;
+ pub0_sock *sock = arg;
+ pub0_pipe *p;
+ nng_msg * msg;
+ nng_msg * dup;
+ size_t len;
+
+ msg = nni_aio_get_msg(aio);
+ len = nni_msg_len(msg);
+ nni_mtx_lock(&sock->mtx);
+ if (sock->closed) {
+ nni_mtx_unlock(&sock->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ NNI_LIST_FOREACH (&sock->pipes, p) {
+ if (p->closed) {
+ continue;
+ }
+ if (nni_lmq_full(&p->sendq)) {
+ continue;
+ }
+ if (p == nni_list_last(&sock->pipes)) {
+ dup = msg;
+ msg = NULL;
+ } else if (nni_msg_dup(&dup, msg) != 0) {
+ continue;
+ }
+ if (p->busy) {
+ nni_lmq_putq(&p->sendq, dup);
+ } else {
+ p->busy = true;
+ nni_aio_set_msg(p->aio_send, dup);
+ nni_pipe_send(p->pipe, p->aio_send);
+ }
+ }
+ nni_mtx_unlock(&sock->mtx);
+ nni_aio_finish(aio, 0, len);
+}
+
+static int
+pub0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ pub0_sock *sock = arg;
+ int fd;
+ int rv;
+ nni_mtx_lock(&sock->mtx);
+ // PUB sockets are *always* sendable.
+ nni_pollable_raise(sock->sendable);
+ rv = nni_pollable_getfd(sock->sendable, &fd);
+ nni_mtx_unlock(&sock->mtx);
+
+ if (rv == 0) {
+ rv = nni_copyout_int(fd, buf, szp, t);
+ }
+ return (rv);
+}
+
+static int
+pub0_sock_set_sendbuf(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ pub0_sock *sock = arg;
+ pub0_pipe *p;
+ size_t val;
+ int rv;
+
+ if ((rv = nni_copyin_size(&val, buf, sz, 1, 8192, t)) != 0) {
+ return (rv);
+ }
- nni_msgq_aio_put(s->uwq, aio);
+ nni_mtx_lock(&sock->mtx);
+ sock->sendbuf = val;
+ NNI_LIST_FOREACH (&sock->pipes, p) {
+ // If we fail part way through (which should only happen on
+ // NNG_ENOMEM), we stop short; the remaining resizes would most
+ // likely fail for the same reason anyway. The odd consequence is
+ // that the buffers may have been resized for only *some* of the
+ // pipes, and we have no way to correct, or even report, that
+ // partial failure.
+ if ((rv = nni_lmq_resize(&p->sendq, val)) != 0) {
+ break;
+ }
+ }
+ nni_mtx_unlock(&sock->mtx);
+ return (rv);
+}
+
+static int
+pub0_sock_get_sendbuf(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ pub0_sock *sock = arg;
+ size_t val;
+ nni_mtx_lock(&sock->mtx);
+ val = sock->sendbuf;
+ nni_mtx_unlock(&sock->mtx);
+ return (nni_copyout_size(val, buf, szp, t));
}
static nni_proto_pipe_ops pub0_pipe_ops = {
@@ -307,6 +356,15 @@ static nni_proto_pipe_ops pub0_pipe_ops = {
static nni_option pub0_sock_options[] = {
// terminate list
{
+ .o_name = NNG_OPT_SENDFD,
+ .o_get = pub0_sock_get_sendfd,
+ },
+ {
+ .o_name = NNG_OPT_SENDBUF,
+ .o_get = pub0_sock_get_sendbuf,
+ .o_set = pub0_sock_set_sendbuf,
+ },
+ {
.o_name = NULL,
},
};
@@ -325,7 +383,7 @@ static nni_proto pub0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_PUB_V0, "pub" },
.proto_peer = { NNI_PROTO_SUB_V0, "sub" },
- .proto_flags = NNI_PROTO_FLAG_SND,
+ .proto_flags = NNI_PROTO_FLAG_SND | NNI_PROTO_FLAG_NOMSGQ,
.proto_sock_ops = &pub0_sock_ops,
.proto_pipe_ops = &pub0_pipe_ops,
};
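With this change a PUB socket keeps a small nni_lmq per pipe instead of a shared upper write queue, NNG_OPT_SENDBUF resizes those per-pipe queues (1 to 8192), and NNG_OPT_SENDFD exposes a descriptor that always polls writable. A minimal usage sketch, assuming the public nng v1 setopt/getopt API of this era (the pub_example function is hypothetical):

    #include <nng/nng.h>
    #include <nng/protocol/pubsub0/pub.h>

    static int
    pub_example(void)
    {
    	nng_socket pub;
    	int        fd;
    	int        rv;

    	if ((rv = nng_pub0_open(&pub)) != 0) {
    		return (rv);
    	}
    	// Resize every connected pipe's send queue (and the default for
    	// new pipes) to 64 messages.
    	if ((rv = nng_setopt_size(pub, NNG_OPT_SENDBUF, 64)) != 0) {
    		return (rv);
    	}
    	// PUB sockets are always sendable, so this fd polls as ready.
    	if ((rv = nng_getopt_int(pub, NNG_OPT_SENDFD, &fd)) != 0) {
    		return (rv);
    	}
    	return (nng_listen(pub, "tcp://127.0.0.1:5555", NULL, 0));
    }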
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index 5240fc83..fefd79a9 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -26,8 +26,12 @@
#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0)
#endif
+// By default we accept 128 messages.
+#define SUB0_DEFAULT_QLEN 128
+
typedef struct sub0_pipe sub0_pipe;
typedef struct sub0_sock sub0_sock;
+typedef struct sub0_ctx sub0_ctx;
typedef struct sub0_topic sub0_topic;
static void sub0_recv_cb(void *);
@@ -39,11 +43,32 @@ struct sub0_topic {
void * buf;
};
+// sub0_ctx is a context for a SUB socket. The advantage of contexts is
+// that different contexts can maintain different subscriptions.
+struct sub0_ctx {
+ nni_list_node node;
+ sub0_sock * sock;
+ nni_list topics; // XXX: Consider replacing with patricia trie
+ nni_list raios; // sub context could have multiple pending recvs
+ bool closed;
+ nni_lmq lmq;
+
+#if 0
+ nni_msg **recvq;
+ size_t recvqcap;
+ size_t recvqlen;
+ size_t recvqget;
+ size_t recvqput;
+#endif
+};
+
// sub0_sock is our per-socket protocol private structure.
struct sub0_sock {
- nni_list topics;
- nni_msgq *urq;
- nni_mtx lk;
+ nni_pollable *recvable;
+ sub0_ctx * ctx; // default context
+ nni_list ctxs; // all contexts
+ size_t recvbuflen;
+ nni_mtx lk;
};
// sub0_pipe is our per-pipe protocol private structure.
@@ -53,35 +78,177 @@ struct sub0_pipe {
nni_aio * aio_recv;
};
+static void
+sub0_ctx_cancel(nng_aio *aio, void *arg, int rv)
+{
+ sub0_ctx * ctx = arg;
+ sub0_sock *sock = ctx->sock;
+ nni_mtx_lock(&sock->lk);
+ if (nni_list_active(&ctx->raios, aio)) {
+ nni_list_remove(&ctx->raios, aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&sock->lk);
+}
+
+static void
+sub0_ctx_recv(void *arg, nni_aio *aio)
+{
+ sub0_ctx * ctx = arg;
+ sub0_sock *sock = ctx->sock;
+ nni_msg * msg;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ nni_mtx_lock(&sock->lk);
+
+ if (ctx->closed) {
+ nni_mtx_unlock(&sock->lk);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+
+ if (nni_lmq_empty(&ctx->lmq)) {
+ int rv;
+ if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) {
+ nni_mtx_unlock(&sock->lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_list_append(&ctx->raios, aio);
+ nni_mtx_unlock(&sock->lk);
+ return;
+ }
+
+ (void) nni_lmq_getq(&ctx->lmq, &msg);
+
+ if (nni_lmq_empty(&ctx->lmq) && (ctx == sock->ctx)) {
+ nni_pollable_clear(sock->recvable);
+ }
+ nni_aio_set_msg(aio, msg);
+ nni_mtx_unlock(&sock->lk);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
+}
+
+static void
+sub0_ctx_send(void *arg, nni_aio *aio)
+{
+ NNI_ARG_UNUSED(arg);
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_finish_error(aio, NNG_ENOTSUP);
+ }
+}
+
+static void
+sub0_ctx_close(void *arg)
+{
+ sub0_ctx * ctx = arg;
+ sub0_sock *sock = ctx->sock;
+ nni_aio * aio;
+
+ nni_mtx_lock(&sock->lk);
+ ctx->closed = true;
+ while ((aio = nni_list_first(&ctx->raios)) != NULL) {
+ nni_list_remove(&ctx->raios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&sock->lk);
+}
+
+static void
+sub0_ctx_fini(void *arg)
+{
+ sub0_ctx * ctx = arg;
+ sub0_sock * sock = ctx->sock;
+ sub0_topic *topic;
+
+ sub0_ctx_close(ctx);
+
+ nni_mtx_lock(&sock->lk);
+ nni_list_remove(&sock->ctxs, ctx);
+ nni_mtx_unlock(&sock->lk);
+
+ while ((topic = nni_list_first(&ctx->topics)) != 0) {
+ nni_list_remove(&ctx->topics, topic);
+ nni_free(topic->buf, topic->len);
+ NNI_FREE_STRUCT(topic);
+ }
+
+ nni_lmq_fini(&ctx->lmq);
+ NNI_FREE_STRUCT(ctx);
+}
+
static int
-sub0_sock_init(void **sp, nni_sock *sock)
+sub0_ctx_init(void **ctxp, void *sarg)
{
- sub0_sock *s;
+ sub0_sock *sock = sarg;
+ sub0_ctx * ctx;
+ size_t len;
+ int rv;
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&s->lk);
- NNI_LIST_INIT(&s->topics, sub0_topic, node);
- s->urq = nni_sock_recvq(sock);
- *sp = s;
+ nni_mtx_lock(&sock->lk);
+ len = sock->recvbuflen;
+
+ if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) {
+ nni_mtx_unlock(&sock->lk);
+ NNI_FREE_STRUCT(ctx);
+ return (rv);
+ }
+
+ nni_aio_list_init(&ctx->raios);
+ NNI_LIST_INIT(&ctx->topics, sub0_topic, node);
+
+ ctx->sock = sock;
+ *ctxp = ctx;
+
+ nni_list_append(&sock->ctxs, ctx);
+ nni_mtx_unlock(&sock->lk);
+
return (0);
}
static void
sub0_sock_fini(void *arg)
{
- sub0_sock * s = arg;
- sub0_topic *topic;
+ sub0_sock *sock = arg;
- while ((topic = nni_list_first(&s->topics)) != NULL) {
- nni_list_remove(&s->topics, topic);
- nni_free(topic->buf, topic->len);
- NNI_FREE_STRUCT(topic);
+ if (sock->ctx != NULL) {
+ sub0_ctx_fini(sock->ctx);
}
- nni_mtx_fini(&s->lk);
- NNI_FREE_STRUCT(s);
+ if (sock->recvable != NULL) {
+ nni_pollable_free(sock->recvable);
+ }
+ nni_mtx_fini(&sock->lk);
+ NNI_FREE_STRUCT(sock);
+}
+
+static int
+sub0_sock_init(void **sp, nni_sock *nsock)
+{
+ sub0_sock *sock;
+ int rv;
+
+ NNI_ARG_UNUSED(nsock);
+
+ if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_INIT(&sock->ctxs, sub0_ctx, node);
+ nni_mtx_init(&sock->lk);
+ sock->recvbuflen = SUB0_DEFAULT_QLEN;
+
+ if (((rv = sub0_ctx_init((void **) &sock->ctx, sock)) != 0) ||
+ ((rv = nni_pollable_alloc(&sock->recvable)) != 0)) {
+ sub0_sock_fini(sock);
+ return (rv);
+ }
+
+ *sp = sock;
+ return (0);
}
static void
@@ -93,7 +260,8 @@ sub0_sock_open(void *arg)
static void
sub0_sock_close(void *arg)
{
- NNI_ARG_UNUSED(arg);
+ sub0_sock *sock = arg;
+ sub0_ctx_close(sock->ctx);
}
static void
@@ -155,191 +323,302 @@ sub0_pipe_close(void *arg)
nni_aio_close(p->aio_recv);
}
+static bool
+sub0_matches(sub0_ctx *ctx, uint8_t *body, size_t len)
+{
+ sub0_topic *topic;
+
+ // This is a naive and trivial matcher. Replace with a real
+ // patricia trie later.
+ NNI_LIST_FOREACH (&ctx->topics, topic) {
+ if (len < topic->len) {
+ continue;
+ }
+ if ((topic->len == 0) ||
+ (memcmp(topic->buf, body, topic->len) == 0)) {
+ return (true);
+ }
+ }
+ return (false);
+}
+
static void
sub0_recv_cb(void *arg)
{
- sub0_pipe *p = arg;
- sub0_sock *s = p->sub;
- nni_msgq * urq = s->urq;
+ sub0_pipe *p = arg;
+ sub0_sock *sock = p->sub;
+ sub0_ctx * ctx;
nni_msg * msg;
+ size_t len;
+ uint8_t * body;
+ nni_list finish;
+ nng_aio * aio;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
return;
}
+ nni_aio_list_init(&finish);
+
msg = nni_aio_get_msg(p->aio_recv);
nni_aio_set_msg(p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
- switch (nni_msgq_tryput(urq, msg)) {
- case 0:
- break;
- case NNG_EAGAIN:
+ body = nni_msg_body(msg);
+ len = nni_msg_len(msg);
+
+ nni_mtx_lock(&sock->lk);
+ // Go through all contexts and try to deliver the message to each one.
+ NNI_LIST_FOREACH (&sock->ctxs, ctx) {
+ nni_msg *dup;
+
+ if (nni_lmq_full(&ctx->lmq)) {
+ // Cannot deliver here, as receive buffer is full.
+ continue;
+ }
+
+ if (!sub0_matches(ctx, body, len)) {
+ continue;
+ }
+
+ // Special optimization: for the last context (which covers the
+ // common case where only one context is in use), we avoid
+ // duplicating the message.
+ if (ctx == nni_list_last(&sock->ctxs)) {
+ dup = msg;
+ msg = NULL;
+ } else if (nni_msg_dup(&dup, msg) != 0) {
+ continue; // TODO: Bump a stat!
+ }
+
+ if (!nni_list_empty(&ctx->raios)) {
+ nni_aio *aio = nni_list_first(&ctx->raios);
+ nni_list_remove(&ctx->raios, aio);
+ nni_aio_set_msg(aio, dup);
+
+ // Save for synchronous completion
+ nni_list_append(&finish, aio);
+ } else {
+ (void) nni_lmq_putq(&ctx->lmq, dup);
+ }
+ }
+ nni_mtx_unlock(&sock->lk);
+
+ while ((aio = nni_list_first(&finish)) != NULL) {
+ nni_list_remove(&finish, aio);
+ nni_aio_finish_synch(aio, 0, len);
+ }
+
+ // Free the message if it was not handed off to the very last
+ // context (for example, because no context matched it).
+ if (msg != NULL) {
nni_msg_free(msg);
- break;
- default:
- // Any other error we stop the pipe for. It's probably
- // NNG_ECLOSED anyway.
- nng_msg_free(msg);
- nni_pipe_close(p->pipe);
- return;
}
+
nni_pipe_recv(p->pipe, p->aio_recv);
}
+static int
+sub0_ctx_get_recvbuf(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ sub0_ctx * ctx = arg;
+ sub0_sock *sock = ctx->sock;
+ size_t val;
+ nni_mtx_lock(&sock->lk);
+ val = nni_lmq_cap(&ctx->lmq);
+ nni_mtx_unlock(&sock->lk);
+
+ return (nni_copyout_size(val, buf, szp, t));
+}
+
+static int
+sub0_ctx_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ sub0_ctx * ctx = arg;
+ sub0_sock *sock = ctx->sock;
+ size_t val;
+ int rv;
+
+ if ((rv = nni_copyin_size(&val, buf, sz, 1, 8192, t)) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(&sock->lk);
+ if ((rv = nni_lmq_resize(&ctx->lmq, val)) != 0) {
+ nni_mtx_unlock(&sock->lk);
+ return (rv);
+ }
+
+ // If this is the default (socket) context, update the socket-level
+ // default so that newly created contexts inherit the new depth.
+ // (Previously constructed contexts are unaffected.)
+ if (sock->ctx == ctx) {
+ sock->recvbuflen = val;
+ }
+ nni_mtx_unlock(&sock->lk);
+ return (0);
+}
+
// For now we maintain subscriptions on a sorted linked list. As we do not
// expect to have huge numbers of subscriptions, and as the operation is
// really O(n), we think this is acceptable. In the future we might decide
// to replace this with a patricia trie, like old nanomsg had.
static int
-sub0_subscribe(void *arg, const void *buf, size_t sz, nni_opt_type t)
+sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
- sub0_sock * s = arg;
+ sub0_ctx * ctx = arg;
+ sub0_sock * sock = ctx->sock;
sub0_topic *topic;
sub0_topic *newtopic;
NNI_ARG_UNUSED(t);
- nni_mtx_lock(&s->lk);
- NNI_LIST_FOREACH (&s->topics, topic) {
- int rv;
-
- if (topic->len >= sz) {
- rv = memcmp(topic->buf, buf, sz);
- } else {
- rv = memcmp(topic->buf, buf, topic->len);
+ nni_mtx_lock(&sock->lk);
+ NNI_LIST_FOREACH (&ctx->topics, topic) {
+ if (topic->len != sz) {
+ continue;
}
- if (rv == 0) {
- if (topic->len == sz) {
- // Already inserted.
- nni_mtx_unlock(&s->lk);
- return (0);
- }
- if (topic->len > sz) {
- break;
- }
- } else if (rv > 0) {
- break;
+ if (memcmp(topic->buf, buf, sz) == 0) {
+ // Already have it.
+ nni_mtx_unlock(&sock->lk);
+ return (0);
}
}
-
if ((newtopic = NNI_ALLOC_STRUCT(newtopic)) == NULL) {
- nni_mtx_unlock(&s->lk);
+ nni_mtx_unlock(&sock->lk);
return (NNG_ENOMEM);
}
- if (sz > 0) {
- if ((newtopic->buf = nni_alloc(sz)) == NULL) {
- nni_mtx_unlock(&s->lk);
- return (NNG_ENOMEM);
- }
- } else {
- newtopic->buf = NULL;
+ if ((sz > 0) && ((newtopic->buf = nni_alloc(sz)) == NULL)) {
+ nni_mtx_unlock(&sock->lk);
+ NNI_FREE_STRUCT(newtopic);
+ return (NNG_ENOMEM);
}
- NNI_LIST_NODE_INIT(&newtopic->node);
- newtopic->len = sz;
memcpy(newtopic->buf, buf, sz);
- if (topic != NULL) {
- nni_list_insert_before(&s->topics, newtopic, topic);
- } else {
- nni_list_append(&s->topics, newtopic);
- }
- nni_mtx_unlock(&s->lk);
+ newtopic->len = sz;
+ nni_list_append(&ctx->topics, newtopic);
+ nni_mtx_unlock(&sock->lk);
return (0);
}
static int
-sub0_unsubscribe(void *arg, const void *buf, size_t sz, nni_opt_type t)
+sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
- sub0_sock * s = arg;
+ sub0_ctx * ctx = arg;
+ sub0_sock * sock = ctx->sock;
sub0_topic *topic;
- int rv;
+ size_t len;
NNI_ARG_UNUSED(t);
- nni_mtx_lock(&s->lk);
- NNI_LIST_FOREACH (&s->topics, topic) {
- if (topic->len >= sz) {
- rv = memcmp(topic->buf, buf, sz);
- } else {
- rv = memcmp(topic->buf, buf, topic->len);
+ nni_mtx_lock(&sock->lk);
+ NNI_LIST_FOREACH (&ctx->topics, topic) {
+ if (topic->len != sz) {
+ continue;
}
- if (rv == 0) {
- if (topic->len == sz) {
- nni_list_remove(&s->topics, topic);
- nni_mtx_unlock(&s->lk);
- nni_free(topic->buf, topic->len);
- NNI_FREE_STRUCT(topic);
- return (0);
- }
- if (topic->len > sz) {
- nni_mtx_unlock(&s->lk);
- return (NNG_ENOENT);
- }
+ if (memcmp(topic->buf, buf, sz) == 0) {
+ // Matched!
+ break;
}
- if (rv > 0) {
- nni_mtx_unlock(&s->lk);
- return (NNG_ENOENT);
+ }
+ if (topic == NULL) {
+ nni_mtx_unlock(&sock->lk);
+ return (NNG_ENOENT);
+ }
+ nni_list_remove(&ctx->topics, topic);
+
+ // Now we need to make sure that any waiting messages still match
+ // one of the remaining subscriptions. We simply run through the
+ // queue, requeue the messages that still match, and free the rest.
+ len = nni_lmq_len(&ctx->lmq);
+ for (size_t i = 0; i < len; i++) {
+ nni_msg *msg;
+
+ (void) nni_lmq_getq(&ctx->lmq, &msg);
+ if (sub0_matches(ctx, nni_msg_body(msg), nni_msg_len(msg))) {
+ (void) nni_lmq_putq(&ctx->lmq, msg);
+ } else {
+ nni_msg_free(msg);
}
}
- nni_mtx_unlock(&s->lk);
- return (NNG_ENOENT);
+ nni_mtx_unlock(&sock->lk);
+
+ nni_free(topic->buf, topic->len);
+ NNI_FREE_STRUCT(topic);
+ return (0);
}
+static nni_option sub0_ctx_options[] = {
+ {
+ .o_name = NNG_OPT_RECVBUF,
+ .o_get = sub0_ctx_get_recvbuf,
+ .o_set = sub0_ctx_set_recvbuf,
+ },
+ {
+ .o_name = NNG_OPT_SUB_SUBSCRIBE,
+ .o_set = sub0_ctx_subscribe,
+ },
+ {
+ .o_name = NNG_OPT_SUB_UNSUBSCRIBE,
+ .o_set = sub0_ctx_unsubscribe,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
static void
sub0_sock_send(void *arg, nni_aio *aio)
{
NNI_ARG_UNUSED(arg);
- nni_aio_finish_error(aio, NNG_ENOTSUP);
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_finish_error(aio, NNG_ENOTSUP);
+ }
}
static void
sub0_sock_recv(void *arg, nni_aio *aio)
{
- sub0_sock *s = arg;
+ sub0_sock *sock = arg;
- nni_msgq_aio_get(s->urq, aio);
+ sub0_ctx_recv(sock->ctx, aio);
}
-static nni_msg *
-sub0_sock_filter(void *arg, nni_msg *msg)
+static int
+sub0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
- sub0_sock * s = arg;
- sub0_topic *topic;
- char * body;
- size_t len;
- int match;
-
- body = nni_msg_body(msg);
- len = nni_msg_len(msg);
+ sub0_sock *sock = arg;
+ int rv;
+ int fd;
- match = 0;
-
- nni_mtx_lock(&s->lk);
-
- // Check to see if the message matches one of our subscriptions.
- NNI_LIST_FOREACH (&s->topics, topic) {
- if (len >= topic->len) {
- int rv = memcmp(topic->buf, body, topic->len);
- if (rv == 0) {
- // Matched!
- match = 1;
- break;
- }
- if (rv > 0) {
- match = 0;
- break;
- }
- } else if (memcmp(topic->buf, body, len) >= 0) {
- match = 0;
- break;
- }
- }
- nni_mtx_unlock(&s->lk);
- if (!match) {
- nni_msg_free(msg);
- return (NULL);
+ if ((rv = nni_pollable_getfd(sock->recvable, &fd)) != 0) {
+ return (rv);
}
- return (msg);
+ return (nni_copyout_int(fd, buf, szp, t));
+}
+
+static int
+sub0_sock_get_recvbuf(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ sub0_sock *sock = arg;
+ return (sub0_ctx_get_recvbuf(sock->ctx, buf, szp, t));
+}
+
+static int
+sub0_sock_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ sub0_sock *sock = arg;
+ return (sub0_ctx_set_recvbuf(sock->ctx, buf, sz, t));
+}
+
+static int
+sub0_sock_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ sub0_sock *sock = arg;
+ return (sub0_ctx_subscribe(sock->ctx, buf, sz, t));
+}
+
+static int
+sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ sub0_sock *sock = arg;
+ return (sub0_ctx_unsubscribe(sock->ctx, buf, sz, t));
}
// This is the global protocol structure -- our linkage to the core.
@@ -352,14 +631,31 @@ static nni_proto_pipe_ops sub0_pipe_ops = {
.pipe_stop = sub0_pipe_stop,
};
+static nni_proto_ctx_ops sub0_ctx_ops = {
+ .ctx_init = sub0_ctx_init,
+ .ctx_fini = sub0_ctx_fini,
+ .ctx_send = sub0_ctx_send,
+ .ctx_recv = sub0_ctx_recv,
+ .ctx_options = sub0_ctx_options,
+};
+
static nni_option sub0_sock_options[] = {
{
.o_name = NNG_OPT_SUB_SUBSCRIBE,
- .o_set = sub0_subscribe,
+ .o_set = sub0_sock_subscribe,
},
{
.o_name = NNG_OPT_SUB_UNSUBSCRIBE,
- .o_set = sub0_unsubscribe,
+ .o_set = sub0_sock_unsubscribe,
+ },
+ {
+ .o_name = NNG_OPT_RECVFD,
+ .o_get = sub0_sock_get_recvfd,
+ },
+ {
+ .o_name = NNG_OPT_RECVBUF,
+ .o_get = sub0_sock_get_recvbuf,
+ .o_set = sub0_sock_set_recvbuf,
},
// terminate list
{
@@ -374,18 +670,6 @@ static nni_proto_sock_ops sub0_sock_ops = {
.sock_close = sub0_sock_close,
.sock_send = sub0_sock_send,
.sock_recv = sub0_sock_recv,
- .sock_filter = sub0_sock_filter,
- .sock_options = sub0_sock_options,
-};
-
-static nni_proto_sock_ops sub0_sock_ops_raw = {
- .sock_init = sub0_sock_init,
- .sock_fini = sub0_sock_fini,
- .sock_open = sub0_sock_open,
- .sock_close = sub0_sock_close,
- .sock_send = sub0_sock_send,
- .sock_recv = sub0_sock_recv,
- .sock_filter = NULL, // raw does not filter
.sock_options = sub0_sock_options,
};
@@ -393,18 +677,10 @@ static nni_proto sub0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_SUB_V0, "sub" },
.proto_peer = { NNI_PROTO_PUB_V0, "pub" },
- .proto_flags = NNI_PROTO_FLAG_RCV,
+ .proto_flags = NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_NOMSGQ,
.proto_sock_ops = &sub0_sock_ops,
.proto_pipe_ops = &sub0_pipe_ops,
-};
-
-static nni_proto sub0_proto_raw = {
- .proto_version = NNI_PROTOCOL_VERSION,
- .proto_self = { NNI_PROTO_SUB_V0, "sub" },
- .proto_peer = { NNI_PROTO_PUB_V0, "pub" },
- .proto_flags = NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_RAW,
- .proto_sock_ops = &sub0_sock_ops_raw,
- .proto_pipe_ops = &sub0_pipe_ops,
+ .proto_ctx_ops = &sub0_ctx_ops,
};
int
@@ -412,9 +688,3 @@ nng_sub0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &sub0_proto));
}
-
-int
-nng_sub0_open_raw(nng_socket *sidp)
-{
- return (nni_proto_open(sidp, &sub0_proto_raw));
-}
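The reworked SUB side gives each context its own subscription list and receive queue, so two contexts on one socket can filter different topics independently. A minimal usage sketch, assuming the public nng v1 context API (nng_ctx_open, nng_ctx_setopt, nng_ctx_recv); the sub_ctx_example function and the "wx." topic are hypothetical, and error cleanup is omitted for brevity:

    #include <nng/nng.h>
    #include <nng/protocol/pubsub0/sub.h>

    static int
    sub_ctx_example(nng_socket sub) // an already-dialed SUB socket
    {
    	nng_ctx  weather;
    	nng_aio *aio;
    	int      rv;

    	if (((rv = nng_ctx_open(&weather, sub)) != 0) ||
    	    ((rv = nng_aio_alloc(&aio, NULL, NULL)) != 0)) {
    		return (rv);
    	}
    	// Subscribe this context (only) to messages starting with "wx.".
    	if ((rv = nng_ctx_setopt(
    	         weather, NNG_OPT_SUB_SUBSCRIBE, "wx.", 3)) != 0) {
    		return (rv);
    	}
    	nng_ctx_recv(weather, aio); // served from the context's own queue
    	nng_aio_wait(aio);
    	if ((rv = nng_aio_result(aio)) == 0) {
    		nng_msg_free(nng_aio_get_msg(aio));
    	}
    	nng_aio_free(aio);
    	return (rv);
    }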
diff --git a/src/protocol/pubsub0/xsub.c b/src/protocol/pubsub0/xsub.c
new file mode 100644
index 00000000..b334bf87
--- /dev/null
+++ b/src/protocol/pubsub0/xsub.c
@@ -0,0 +1,230 @@
+//
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "nng/protocol/pubsub0/sub.h"
+
+// Raw (XSUB) subscriber protocol. The raw SUB protocol receives
+// messages sent to it from publishers and passes them all up to the
+// application; subscription filtering is left to the consumer.
+
+#ifndef NNI_PROTO_SUB_V0
+#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1)
+#endif
+
+#ifndef NNI_PROTO_PUB_V0
+#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0)
+#endif
+
+typedef struct xsub0_pipe xsub0_pipe;
+typedef struct xsub0_sock xsub0_sock;
+
+static void xsub0_recv_cb(void *);
+static void xsub0_pipe_fini(void *);
+
+// xsub0_sock is our per-socket protocol private structure.
+struct xsub0_sock {
+ nni_msgq *urq;
+ nni_mtx lk;
+};
+
+// xsub0_pipe is our per-pipe protocol private structure.
+struct xsub0_pipe {
+ nni_pipe * pipe;
+ xsub0_sock *sub;
+ nni_aio * aio_recv;
+};
+
+static int
+xsub0_sock_init(void **sp, nni_sock *sock)
+{
+ xsub0_sock *s;
+
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&s->lk);
+
+ s->urq = nni_sock_recvq(sock);
+ *sp = s;
+ return (0);
+}
+
+static void
+xsub0_sock_fini(void *arg)
+{
+ xsub0_sock *s = arg;
+ nni_mtx_fini(&s->lk);
+ NNI_FREE_STRUCT(s);
+}
+
+static void
+xsub0_sock_open(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+xsub0_sock_close(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+xsub0_pipe_stop(void *arg)
+{
+ xsub0_pipe *p = arg;
+
+ nni_aio_stop(p->aio_recv);
+}
+
+static void
+xsub0_pipe_fini(void *arg)
+{
+ xsub0_pipe *p = arg;
+
+ nni_aio_fini(p->aio_recv);
+ NNI_FREE_STRUCT(p);
+}
+
+static int
+xsub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
+{
+ xsub0_pipe *p;
+ int rv;
+
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_aio_init(&p->aio_recv, xsub0_recv_cb, p)) != 0) {
+ xsub0_pipe_fini(p);
+ return (rv);
+ }
+
+ p->pipe = pipe;
+ p->sub = s;
+ *pp = p;
+ return (0);
+}
+
+static int
+xsub0_pipe_start(void *arg)
+{
+ xsub0_pipe *p = arg;
+
+ if (nni_pipe_peer(p->pipe) != NNI_PROTO_PUB_V0) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+
+ nni_pipe_recv(p->pipe, p->aio_recv);
+ return (0);
+}
+
+static void
+xsub0_pipe_close(void *arg)
+{
+ xsub0_pipe *p = arg;
+
+ nni_aio_close(p->aio_recv);
+}
+
+static void
+xsub0_recv_cb(void *arg)
+{
+ xsub0_pipe *p = arg;
+ xsub0_sock *s = p->sub;
+ nni_msgq * urq = s->urq;
+ nni_msg * msg;
+
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_close(p->pipe);
+ return;
+ }
+
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
+
+ switch (nni_msgq_tryput(urq, msg)) {
+ case 0:
+ break;
+ case NNG_EAGAIN:
+ nni_msg_free(msg);
+ break;
+ default:
+ // Any other error we stop the pipe for. It's probably
+ // NNG_ECLOSED anyway.
+ nni_msg_free(msg);
+ nni_pipe_close(p->pipe);
+ return;
+ }
+ nni_pipe_recv(p->pipe, p->aio_recv);
+}
+
+static void
+xsub0_sock_send(void *arg, nni_aio *aio)
+{
+ NNI_ARG_UNUSED(arg);
+ nni_aio_finish_error(aio, NNG_ENOTSUP);
+}
+
+static void
+xsub0_sock_recv(void *arg, nni_aio *aio)
+{
+ xsub0_sock *s = arg;
+
+ nni_msgq_aio_get(s->urq, aio);
+}
+
+// This is the global protocol structure -- our linkage to the core.
+// This should be the only global non-static symbol in this file.
+static nni_proto_pipe_ops xsub0_pipe_ops = {
+ .pipe_init = xsub0_pipe_init,
+ .pipe_fini = xsub0_pipe_fini,
+ .pipe_start = xsub0_pipe_start,
+ .pipe_close = xsub0_pipe_close,
+ .pipe_stop = xsub0_pipe_stop,
+};
+
+static nni_option xsub0_sock_options[] = {
+ // terminate list
+ {
+ .o_name = NULL,
+ },
+};
+
+static nni_proto_sock_ops xsub0_sock_ops = {
+ .sock_init = xsub0_sock_init,
+ .sock_fini = xsub0_sock_fini,
+ .sock_open = xsub0_sock_open,
+ .sock_close = xsub0_sock_close,
+ .sock_send = xsub0_sock_send,
+ .sock_recv = xsub0_sock_recv,
+ .sock_options = xsub0_sock_options,
+};
+
+static nni_proto xsub0_proto = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_SUB_V0, "sub" },
+ .proto_peer = { NNI_PROTO_PUB_V0, "pub" },
+ .proto_flags = NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &xsub0_sock_ops,
+ .proto_pipe_ops = &xsub0_pipe_ops,
+};
+
+int
+nng_sub0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &xsub0_proto));
+}
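Because the raw variant does no filtering, an application using nng_sub0_open_raw() must match topic prefixes itself after receiving. A minimal sketch, assuming the public nng v1 API; the xsub_recv_one helper and its NNG_EAGAIN "no match" return are hypothetical:

    #include <string.h>
    #include <nng/nng.h>
    #include <nng/protocol/pubsub0/sub.h>

    static int
    xsub_recv_one(nng_socket rawsub, const char *topic, size_t tlen)
    {
    	nng_msg *msg;
    	int      rv;

    	if ((rv = nng_recvmsg(rawsub, &msg, 0)) != 0) {
    		return (rv);
    	}
    	if ((nng_msg_len(msg) < tlen) ||
    	    (memcmp(nng_msg_body(msg), topic, tlen) != 0)) {
    		nng_msg_free(msg);   // not interested; drop it ourselves
    		return (NNG_EAGAIN); // hypothetical "no match" signal
    	}
    	// ... process msg here, then release it ...
    	nng_msg_free(msg);
    	return (0);
    }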