aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pubsub0/pub.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pubsub0/pub.c')
-rw-r--r--src/protocol/pubsub0/pub.c282
1 files changed, 170 insertions, 112 deletions
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index 2567a5b6..bec0763f 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -32,25 +32,25 @@ typedef struct pub0_sock pub0_sock;
static void pub0_pipe_recv_cb(void *);
static void pub0_pipe_send_cb(void *);
-static void pub0_pipe_getq_cb(void *);
-static void pub0_sock_getq_cb(void *);
static void pub0_sock_fini(void *);
static void pub0_pipe_fini(void *);
// pub0_sock is our per-socket protocol private structure.
struct pub0_sock {
- nni_msgq *uwq;
- nni_aio * aio_getq;
- nni_list pipes;
- nni_mtx mtx;
+ nni_list pipes;
+ nni_mtx mtx;
+ bool closed;
+ size_t sendbuf;
+ nni_pollable *sendable;
};
// pub0_pipe is our per-pipe protocol private structure.
struct pub0_pipe {
nni_pipe * pipe;
pub0_sock * pub;
- nni_msgq * sendq;
- nni_aio * aio_getq;
+ nni_lmq sendq;
+ bool closed;
+ bool busy;
nni_aio * aio_send;
nni_aio * aio_recv;
nni_list_node node;
@@ -61,48 +61,46 @@ pub0_sock_fini(void *arg)
{
pub0_sock *s = arg;
- nni_aio_fini(s->aio_getq);
+ nni_pollable_free(s->sendable);
nni_mtx_fini(&s->mtx);
NNI_FREE_STRUCT(s);
}
static int
-pub0_sock_init(void **sp, nni_sock *sock)
+pub0_sock_init(void **sp, nni_sock *nsock)
{
- pub0_sock *s;
+ pub0_sock *sock;
int rv;
+ NNI_ARG_UNUSED(nsock);
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&s->mtx);
- if ((rv = nni_aio_init(&s->aio_getq, pub0_sock_getq_cb, s)) != 0) {
- pub0_sock_fini(s);
+ if ((rv = nni_pollable_alloc(&sock->sendable)) != 0) {
+ NNI_FREE_STRUCT(sock);
return (rv);
}
-
- NNI_LIST_INIT(&s->pipes, pub0_pipe, node);
-
- s->uwq = nni_sock_sendq(sock);
-
- *sp = s;
+ nni_mtx_init(&sock->mtx);
+ NNI_LIST_INIT(&sock->pipes, pub0_pipe, node);
+ sock->sendbuf = 16; // fairly arbitrary
+ *sp = sock;
return (0);
}
static void
pub0_sock_open(void *arg)
{
- pub0_sock *s = arg;
-
- nni_msgq_aio_get(s->uwq, s->aio_getq);
+ NNI_ARG_UNUSED(arg);
}
static void
pub0_sock_close(void *arg)
{
- pub0_sock *s = arg;
+ pub0_sock *sock = arg;
- nni_aio_close(s->aio_getq);
+ nni_mtx_lock(&sock->mtx);
+ sock->closed = true;
+ nni_mtx_unlock(&sock->mtx);
}
static void
@@ -110,7 +108,6 @@ pub0_pipe_stop(void *arg)
{
pub0_pipe *p = arg;
- nni_aio_stop(p->aio_getq);
nni_aio_stop(p->aio_send);
nni_aio_stop(p->aio_recv);
}
@@ -120,10 +117,9 @@ pub0_pipe_fini(void *arg)
{
pub0_pipe *p = arg;
- nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
- nni_msgq_fini(p->sendq);
+ nni_lmq_fini(&p->sendq);
NNI_FREE_STRUCT(p);
}
@@ -131,15 +127,20 @@ static int
pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
pub0_pipe *p;
+ pub0_sock *sock = s;
int rv;
+ size_t len;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
+ nni_mtx_lock(&sock->mtx);
+ len = sock->sendbuf;
+ nni_mtx_unlock(&sock->mtx);
+
// XXX: consider making this depth tunable
- if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, pub0_pipe_getq_cb, p)) != 0) ||
+ if (((rv = nni_lmq_init(&p->sendq, len)) != 0) ||
((rv = nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p)) != 0) ||
((rv = nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) {
@@ -147,6 +148,7 @@ pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
return (rv);
}
+ p->busy = false;
p->pipe = pipe;
p->pub = s;
*pp = p;
@@ -156,19 +158,18 @@ pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
static int
pub0_pipe_start(void *arg)
{
- pub0_pipe *p = arg;
- pub0_sock *s = p->pub;
+ pub0_pipe *p = arg;
+ pub0_sock *sock = p->pub;
if (nni_pipe_peer(p->pipe) != NNI_PROTO_SUB_V0) {
return (NNG_EPROTO);
}
- nni_mtx_lock(&s->mtx);
- nni_list_append(&s->pipes, p);
- nni_mtx_unlock(&s->mtx);
+ nni_mtx_lock(&sock->mtx);
+ nni_list_append(&sock->pipes, p);
+ nni_mtx_unlock(&sock->mtx);
- // Start the receiver and the queue reader.
+ // Start the receiver.
nni_pipe_recv(p->pipe, p->aio_recv);
- nni_msgq_aio_get(p->sendq, p->aio_getq);
return (0);
}
@@ -176,62 +177,20 @@ pub0_pipe_start(void *arg)
static void
pub0_pipe_close(void *arg)
{
- pub0_pipe *p = arg;
- pub0_sock *s = p->pub;
+ pub0_pipe *p = arg;
+ pub0_sock *sock = p->pub;
- nni_aio_close(p->aio_getq);
nni_aio_close(p->aio_send);
nni_aio_close(p->aio_recv);
- nni_msgq_close(p->sendq);
+ nni_mtx_lock(&sock->mtx);
+ p->closed = true;
+ nni_lmq_flush(&p->sendq);
- nni_mtx_lock(&s->mtx);
- if (nni_list_active(&s->pipes, p)) {
- nni_list_remove(&s->pipes, p);
+ if (nni_list_active(&sock->pipes, p)) {
+ nni_list_remove(&sock->pipes, p);
}
- nni_mtx_unlock(&s->mtx);
-}
-
-static void
-pub0_sock_getq_cb(void *arg)
-{
- pub0_sock *s = arg;
- nni_msgq * uwq = s->uwq;
- nni_msg * msg, *dup;
-
- pub0_pipe *p;
- pub0_pipe *last;
- int rv;
-
- if (nni_aio_result(s->aio_getq) != 0) {
- return;
- }
-
- msg = nni_aio_get_msg(s->aio_getq);
- nni_aio_set_msg(s->aio_getq, NULL);
-
- nni_mtx_lock(&s->mtx);
- last = nni_list_last(&s->pipes);
- NNI_LIST_FOREACH (&s->pipes, p) {
- if (p != last) {
- rv = nni_msg_dup(&dup, msg);
- if (rv != 0) {
- continue;
- }
- } else {
- dup = msg;
- }
- if ((rv = nni_msgq_tryput(p->sendq, dup)) != 0) {
- nni_msg_free(dup);
- }
- }
- nni_mtx_unlock(&s->mtx);
-
- if (last == NULL) {
- nni_msg_free(msg);
- }
-
- nni_msgq_aio_get(uwq, s->aio_getq);
+ nni_mtx_unlock(&sock->mtx);
}
static void
@@ -244,31 +203,18 @@ pub0_pipe_recv_cb(void *arg)
return;
}
+ // We should never get any messages. If we do we just dicard them.
nni_msg_free(nni_aio_get_msg(p->aio_recv));
nni_aio_set_msg(p->aio_recv, NULL);
nni_pipe_recv(p->pipe, p->aio_recv);
}
static void
-pub0_pipe_getq_cb(void *arg)
-{
- pub0_pipe *p = arg;
-
- if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_close(p->pipe);
- return;
- }
-
- nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
- nni_aio_set_msg(p->aio_getq, NULL);
-
- nni_pipe_send(p->pipe, p->aio_send);
-}
-
-static void
pub0_pipe_send_cb(void *arg)
{
- pub0_pipe *p = arg;
+ pub0_pipe *p = arg;
+ pub0_sock *sock = p->pub;
+ nni_msg * msg;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -277,23 +223,126 @@ pub0_pipe_send_cb(void *arg)
return;
}
- nni_aio_set_msg(p->aio_send, NULL);
- nni_msgq_aio_get(p->sendq, p->aio_getq);
+ nni_mtx_lock(&sock->mtx);
+ if (sock->closed || p->closed) {
+ nni_mtx_unlock(&sock->mtx);
+ return;
+ }
+ if (nni_lmq_getq(&p->sendq, &msg) == 0) {
+ nni_aio_set_msg(p->aio_send, msg);
+ nni_pipe_send(p->pipe, p->aio_send);
+ } else {
+ p->busy = false;
+ }
+ nni_mtx_unlock(&sock->mtx);
}
static void
pub0_sock_recv(void *arg, nni_aio *aio)
{
NNI_ARG_UNUSED(arg);
- nni_aio_finish_error(aio, NNG_ENOTSUP);
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_finish_error(aio, NNG_ENOTSUP);
+ }
}
static void
pub0_sock_send(void *arg, nni_aio *aio)
{
- pub0_sock *s = arg;
+ pub0_sock *sock = arg;
+ pub0_pipe *p;
+ nng_msg * msg;
+ nng_msg * dup;
+ size_t len;
+
+ msg = nni_aio_get_msg(aio);
+ len = nni_msg_len(msg);
+ nni_mtx_lock(&sock->mtx);
+ if (sock->closed) {
+ nni_mtx_unlock(&sock->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ NNI_LIST_FOREACH (&sock->pipes, p) {
+ if (p->closed) {
+ continue;
+ }
+ if (nni_lmq_full(&p->sendq)) {
+ continue;
+ }
+ if (p == nni_list_last(&sock->pipes)) {
+ dup = msg;
+ msg = NULL;
+ } else if (nni_msg_dup(&dup, msg) != 0) {
+ continue;
+ }
+ if (p->busy) {
+ nni_lmq_putq(&p->sendq, dup);
+ } else {
+ p->busy = true;
+ nni_aio_set_msg(p->aio_send, dup);
+ nni_pipe_send(p->pipe, p->aio_send);
+ }
+ }
+ nni_mtx_unlock(&sock->mtx);
+ nni_aio_finish(aio, 0, len);
+}
+
+static int
+pub0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ pub0_sock *sock = arg;
+ int fd;
+ int rv;
+ nni_mtx_lock(&sock->mtx);
+ // PUB sockets are *always* sendable.
+ nni_pollable_raise(sock->sendable);
+ rv = nni_pollable_getfd(sock->sendable, &fd);
+ nni_mtx_unlock(&sock->mtx);
+
+ if (rv == 0) {
+ rv = nni_copyout_int(fd, buf, szp, t);
+ }
+ return (rv);
+}
+
+static int
+pub0_sock_set_sendbuf(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ pub0_sock *sock = arg;
+ pub0_pipe *p;
+ size_t val;
+ int rv;
+
+ if ((rv = nni_copyin_size(&val, buf, sz, 1, 8192, t)) != 0) {
+ return (rv);
+ }
- nni_msgq_aio_put(s->uwq, aio);
+ nni_mtx_lock(&sock->mtx);
+ sock->sendbuf = val;
+ NNI_LIST_FOREACH (&sock->pipes, p) {
+ // If we fail part way thru (should only be ENOMEM), we
+ // stop short. The others would likely fail for ENOMEM as
+ // well anyway. There is a weird effect here where the
+ // buffers may have been set for *some* of the pipes, but
+ // we have no way to correct, or even report, partial failure.
+ if ((rv = nni_lmq_resize(&p->sendq, val)) != 0) {
+ break;
+ }
+ }
+ nni_mtx_unlock(&sock->mtx);
+ return (rv);
+}
+
+static int
+pub0_sock_get_sendbuf(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ pub0_sock *sock = arg;
+ size_t val;
+ nni_mtx_lock(&sock->mtx);
+ val = sock->sendbuf;
+ nni_mtx_unlock(&sock->mtx);
+ return (nni_copyout_size(val, buf, szp, t));
}
static nni_proto_pipe_ops pub0_pipe_ops = {
@@ -307,6 +356,15 @@ static nni_proto_pipe_ops pub0_pipe_ops = {
static nni_option pub0_sock_options[] = {
// terminate list
{
+ .o_name = NNG_OPT_SENDFD,
+ .o_get = pub0_sock_get_sendfd,
+ },
+ {
+ .o_name = NNG_OPT_SENDBUF,
+ .o_get = pub0_sock_get_sendbuf,
+ .o_set = pub0_sock_set_sendbuf,
+ },
+ {
.o_name = NULL,
},
};
@@ -325,7 +383,7 @@ static nni_proto pub0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_PUB_V0, "pub" },
.proto_peer = { NNI_PROTO_SUB_V0, "sub" },
- .proto_flags = NNI_PROTO_FLAG_SND,
+ .proto_flags = NNI_PROTO_FLAG_SND | NNI_PROTO_FLAG_NOMSGQ,
.proto_sock_ops = &pub0_sock_ops,
.proto_pipe_ops = &pub0_pipe_ops,
};