author    Garrett D'Amore <garrett@damore.org>  2020-01-04 10:24:05 -0800
committer Garrett D'Amore <garrett@damore.org>  2020-01-04 10:56:40 -0800
commit    382b4cff3abd5ccb282ba420ef1f7c7d171ec91a (patch)
tree      6860e1cceb52a7dab2763001eb27edf95a0e7246 /src/protocol/pubsub0
parent    bcc3814b58e9b198344bdaf6e7a916a354841275 (diff)
download  nng-382b4cff3abd5ccb282ba420ef1f7c7d171ec91a.tar.gz
          nng-382b4cff3abd5ccb282ba420ef1f7c7d171ec91a.tar.bz2
          nng-382b4cff3abd5ccb282ba420ef1f7c7d171ec91a.zip
fixes #1105 pollable can be inlined, and use atomics
This also introduces an nni_atomic_cas64 to help with lock-free designs. Some mechanical renaming was done in some of the protocols for spelling.
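Context for the change below: the sub socket's pollable backs the NNG_OPT_RECVFD option (see sub0_sock_get_recv_fd in the diff), which applications can poll to learn when a message is ready. The following is an illustrative sketch of that usage, not part of this commit, assuming the NNG 1.x public option API (nng_getopt_int) and POSIX poll():

    #include <poll.h>
    #include <nng/nng.h>

    // Wait until the SUB socket has a message ready, using the receive
    // file descriptor exposed through NNG_OPT_RECVFD (backed by the
    // nni_pollable that this commit inlines into the socket structure).
    static int
    wait_readable(nng_socket sock, int timeout_ms)
    {
        int           fd;
        int           rv;
        struct pollfd pfd;

        if ((rv = nng_getopt_int(sock, NNG_OPT_RECVFD, &fd)) != 0) {
            return (rv);
        }
        pfd.fd      = fd;
        pfd.events  = POLLIN;
        pfd.revents = 0;
        if (poll(&pfd, 1, timeout_ms) > 0) {
            return (0); // readable: a receive should not block now
        }
        return (NNG_ETIMEDOUT); // timed out (or poll error)
    }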
Diffstat (limited to 'src/protocol/pubsub0')
-rw-r--r--  src/protocol/pubsub0/pub.c |   2
-rw-r--r--  src/protocol/pubsub0/sub.c | 174
2 files changed, 87 insertions(+), 89 deletions(-)
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index 22195c52..a42e95ff 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -289,7 +289,7 @@ pub0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_type t)
int fd;
int rv;
nni_mtx_lock(&sock->mtx);
- // PUB sockets are *always* sendable.
+ // PUB sockets are *always* writable.
nni_pollable_raise(sock->sendable);
rv = nni_pollable_getfd(sock->sendable, &fd);
nni_mtx_unlock(&sock->mtx);
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index 0a367216..56da98f8 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -29,10 +29,10 @@
#endif
// By default we accept 128 messages.
-#define SUB0_DEFAULT_QLEN 128
+#define SUB0_DEFAULT_RECV_BUF_LEN 128
// By default, prefer new messages when the queue is full.
-#define SUB0_DEFAULT_PREFNEW true
+#define SUB0_DEFAULT_PREFER_NEW true
typedef struct sub0_pipe sub0_pipe;
typedef struct sub0_sock sub0_sock;
@@ -53,21 +53,21 @@ struct sub0_topic {
struct sub0_ctx {
nni_list_node node;
sub0_sock * sock;
- nni_list topics; // XXX: Consider replacing with patricia trie
- nni_list raios; // sub context could have multiple pending recvs
+ nni_list topics; // TODO: Consider patricia trie
+ nni_list recv_queue; // can have multiple pending receives
bool closed;
nni_lmq lmq;
- bool prefnew;
+ bool prefer_new;
};
// sub0_sock is our per-socket protocol private structure.
struct sub0_sock {
- nni_pollable *recvable;
- sub0_ctx ctx; // default context
- nni_list ctxs; // all contexts
- size_t recvbuflen;
- bool prefnew;
- nni_mtx lk;
+ nni_pollable readable;
+ sub0_ctx master; // default context
+ nni_list contexts; // all contexts
+ size_t recv_buf_len;
+ bool prefer_new;
+ nni_mtx lk;
};
// sub0_pipe is our per-pipe protocol private structure.
@@ -83,8 +83,8 @@ sub0_ctx_cancel(nng_aio *aio, void *arg, int rv)
sub0_ctx * ctx = arg;
sub0_sock *sock = ctx->sock;
nni_mtx_lock(&sock->lk);
- if (nni_list_active(&ctx->raios, aio)) {
- nni_list_remove(&ctx->raios, aio);
+ if (nni_list_active(&ctx->recv_queue, aio)) {
+ nni_list_remove(&ctx->recv_queue, aio);
nni_aio_finish_error(aio, rv);
}
nni_mtx_unlock(&sock->lk);
@@ -116,15 +116,15 @@ sub0_ctx_recv(void *arg, nni_aio *aio)
nni_aio_finish_error(aio, rv);
return;
}
- nni_list_append(&ctx->raios, aio);
+ nni_list_append(&ctx->recv_queue, aio);
nni_mtx_unlock(&sock->lk);
return;
}
(void) nni_lmq_getq(&ctx->lmq, &msg);
- if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->ctx)) {
- nni_pollable_clear(sock->recvable);
+ if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) {
+ nni_pollable_clear(&sock->readable);
}
nni_aio_set_msg(aio, msg);
nni_mtx_unlock(&sock->lk);
@@ -149,8 +149,8 @@ sub0_ctx_close(void *arg)
nni_mtx_lock(&sock->lk);
ctx->closed = true;
- while ((aio = nni_list_first(&ctx->raios)) != NULL) {
- nni_list_remove(&ctx->raios, aio);
+ while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) {
+ nni_list_remove(&ctx->recv_queue, aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
nni_mtx_unlock(&sock->lk);
@@ -166,7 +166,7 @@ sub0_ctx_fini(void *arg)
sub0_ctx_close(ctx);
nni_mtx_lock(&sock->lk);
- nni_list_remove(&sock->ctxs, ctx);
+ nni_list_remove(&sock->contexts, ctx);
nni_mtx_unlock(&sock->lk);
while ((topic = nni_list_first(&ctx->topics)) != 0) {
@@ -179,29 +179,29 @@ sub0_ctx_fini(void *arg)
}
static int
-sub0_ctx_init(void *carg, void *sarg)
+sub0_ctx_init(void *ctx_arg, void *sock_arg)
{
- sub0_sock *sock = sarg;
- sub0_ctx * ctx = carg;
+ sub0_sock *sock = sock_arg;
+ sub0_ctx * ctx = ctx_arg;
size_t len;
- bool prefnew;
+ bool prefer_new;
int rv;
nni_mtx_lock(&sock->lk);
- len = sock->recvbuflen;
- prefnew = sock->prefnew;
+ len = sock->recv_buf_len;
+ prefer_new = sock->prefer_new;
if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) {
return (rv);
}
- ctx->prefnew = prefnew;
+ ctx->prefer_new = prefer_new;
- nni_aio_list_init(&ctx->raios);
+ nni_aio_list_init(&ctx->recv_queue);
NNI_LIST_INIT(&ctx->topics, sub0_topic, node);
ctx->sock = sock;
- nni_list_append(&sock->ctxs, ctx);
+ nni_list_append(&sock->contexts, ctx);
nni_mtx_unlock(&sock->lk);
return (0);
@@ -212,28 +212,26 @@ sub0_sock_fini(void *arg)
{
sub0_sock *sock = arg;
- sub0_ctx_fini(&sock->ctx);
- if (sock->recvable != NULL) {
- nni_pollable_free(sock->recvable);
- }
+ sub0_ctx_fini(&sock->master);
+ nni_pollable_fini(&sock->readable);
nni_mtx_fini(&sock->lk);
}
static int
-sub0_sock_init(void *arg, nni_sock *nsock)
+sub0_sock_init(void *arg, nni_sock *unused)
{
sub0_sock *sock = arg;
int rv;
- NNI_ARG_UNUSED(nsock);
+ NNI_ARG_UNUSED(unused);
- NNI_LIST_INIT(&sock->ctxs, sub0_ctx, node);
+ NNI_LIST_INIT(&sock->contexts, sub0_ctx, node);
nni_mtx_init(&sock->lk);
- sock->recvbuflen = SUB0_DEFAULT_QLEN;
- sock->prefnew = SUB0_DEFAULT_PREFNEW;
+ sock->recv_buf_len = SUB0_DEFAULT_RECV_BUF_LEN;
+ sock->prefer_new = SUB0_DEFAULT_PREFER_NEW;
+ nni_pollable_init(&sock->readable);
- if (((rv = sub0_ctx_init(&sock->ctx, sock)) != 0) ||
- ((rv = nni_pollable_alloc(&sock->recvable)) != 0)) {
+ if ((rv = sub0_ctx_init(&sock->master, sock)) != 0) {
sub0_sock_fini(sock);
return (rv);
}
@@ -251,7 +249,7 @@ static void
sub0_sock_close(void *arg)
{
sub0_sock *sock = arg;
- sub0_ctx_close(&sock->ctx);
+ sub0_ctx_close(&sock->master);
}
static void
@@ -357,10 +355,10 @@ sub0_recv_cb(void *arg)
nni_mtx_lock(&sock->lk);
// Go through all contexts. We will try to send up.
- NNI_LIST_FOREACH (&sock->ctxs, ctx) {
+ NNI_LIST_FOREACH (&sock->contexts, ctx) {
nni_msg *dup;
- if (nni_lmq_full(&ctx->lmq) && !ctx->prefnew) {
+ if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) {
// Cannot deliver here, as receive buffer is full.
continue;
}
@@ -371,7 +369,7 @@ sub0_recv_cb(void *arg)
// Special optimization (for the case where only one context),
// including when no contexts are in use, we avoid duplication.
- if (ctx == nni_list_last(&sock->ctxs)) {
+ if (ctx == nni_list_last(&sock->contexts)) {
dup = msg;
msg = NULL;
} else if (nni_msg_dup(&dup, msg) != 0) {
@@ -382,9 +380,9 @@ sub0_recv_cb(void *arg)
// message and it is intended for us.
submatch = true;
- if (!nni_list_empty(&ctx->raios)) {
- aio = nni_list_first(&ctx->raios);
- nni_list_remove(&ctx->raios, aio);
+ if (!nni_list_empty(&ctx->recv_queue)) {
+ aio = nni_list_first(&ctx->recv_queue);
+ nni_list_remove(&ctx->recv_queue, aio);
nni_aio_set_msg(aio, dup);
// Save for synchronous completion
@@ -414,14 +412,14 @@ sub0_recv_cb(void *arg)
}
if (submatch) {
- nni_pollable_raise(sock->recvable);
+ nni_pollable_raise(&sock->readable);
}
nni_pipe_recv(p->pipe, p->aio_recv);
}
static int
-sub0_ctx_get_recvbuf(void *arg, void *buf, size_t *szp, nni_type t)
+sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t)
{
sub0_ctx * ctx = arg;
sub0_sock *sock = ctx->sock;
@@ -434,7 +432,7 @@ sub0_ctx_get_recvbuf(void *arg, void *buf, size_t *szp, nni_type t)
}
static int
-sub0_ctx_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t)
+sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_ctx * ctx = arg;
sub0_sock *sock = ctx->sock;
@@ -452,8 +450,8 @@ sub0_ctx_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t)
// If we change the socket, then this will change the queue for
// any new contexts. (Previously constructed contexts are unaffected.)
- if (&sock->ctx == ctx) {
- sock->recvbuflen = (size_t) val;
+ if (&sock->master == ctx) {
+ sock->recv_buf_len = (size_t) val;
}
nni_mtx_unlock(&sock->lk);
return (0);
@@ -470,7 +468,7 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
sub0_ctx * ctx = arg;
sub0_sock * sock = ctx->sock;
sub0_topic *topic;
- sub0_topic *newtopic;
+ sub0_topic *new_topic;
NNI_ARG_UNUSED(t);
nni_mtx_lock(&sock->lk);
@@ -484,18 +482,18 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
return (0);
}
}
- if ((newtopic = NNI_ALLOC_STRUCT(newtopic)) == NULL) {
+ if ((new_topic = NNI_ALLOC_STRUCT(new_topic)) == NULL) {
nni_mtx_unlock(&sock->lk);
return (NNG_ENOMEM);
}
- if ((sz > 0) && ((newtopic->buf = nni_alloc(sz)) == NULL)) {
+ if ((sz > 0) && ((new_topic->buf = nni_alloc(sz)) == NULL)) {
nni_mtx_unlock(&sock->lk);
- NNI_FREE_STRUCT(newtopic);
+ NNI_FREE_STRUCT(new_topic);
return (NNG_ENOMEM);
}
- memcpy(newtopic->buf, buf, sz);
- newtopic->len = sz;
- nni_list_append(&ctx->topics, newtopic);
+ memcpy(new_topic->buf, buf, sz);
+ new_topic->len = sz;
+ nni_list_append(&ctx->topics, new_topic);
nni_mtx_unlock(&sock->lk);
return (0);
}
@@ -547,21 +545,21 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
}
static int
-sub0_ctx_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t)
+sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t)
{
sub0_ctx * ctx = arg;
sub0_sock *sock = ctx->sock;
bool val;
nni_mtx_lock(&sock->lk);
- val = ctx->prefnew;
+ val = ctx->prefer_new;
nni_mtx_unlock(&sock->lk);
return (nni_copyout_bool(val, buf, szp, t));
}
static int
-sub0_ctx_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t)
+sub0_ctx_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_ctx * ctx = arg;
sub0_sock *sock = ctx->sock;
@@ -573,9 +571,9 @@ sub0_ctx_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t)
}
nni_mtx_lock(&sock->lk);
- ctx->prefnew = val;
- if (&sock->ctx == ctx) {
- sock->prefnew = val;
+ ctx->prefer_new = val;
+ if (&sock->master == ctx) {
+ sock->prefer_new = val;
}
nni_mtx_unlock(&sock->lk);
@@ -585,8 +583,8 @@ sub0_ctx_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t)
static nni_option sub0_ctx_options[] = {
{
.o_name = NNG_OPT_RECVBUF,
- .o_get = sub0_ctx_get_recvbuf,
- .o_set = sub0_ctx_set_recvbuf,
+ .o_get = sub0_ctx_get_recv_buf_len,
+ .o_set = sub0_ctx_set_recv_buf_len,
},
{
.o_name = NNG_OPT_SUB_SUBSCRIBE,
@@ -598,8 +596,8 @@ static nni_option sub0_ctx_options[] = {
},
{
.o_name = NNG_OPT_SUB_PREFNEW,
- .o_get = sub0_ctx_get_prefnew,
- .o_set = sub0_ctx_set_prefnew,
+ .o_get = sub0_ctx_get_prefer_new,
+ .o_set = sub0_ctx_set_prefer_new,
},
{
.o_name = NULL,
@@ -620,62 +618,62 @@ sub0_sock_recv(void *arg, nni_aio *aio)
{
sub0_sock *sock = arg;
- sub0_ctx_recv(&sock->ctx, aio);
+ sub0_ctx_recv(&sock->master, aio);
}
static int
-sub0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t)
+sub0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
sub0_sock *sock = arg;
int rv;
int fd;
- if ((rv = nni_pollable_getfd(sock->recvable, &fd)) != 0) {
+ if ((rv = nni_pollable_getfd(&sock->readable, &fd)) != 0) {
return (rv);
}
return (nni_copyout_int(fd, buf, szp, t));
}
static int
-sub0_sock_get_recvbuf(void *arg, void *buf, size_t *szp, nni_type t)
+sub0_sock_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t)
{
sub0_sock *sock = arg;
- return (sub0_ctx_get_recvbuf(&sock->ctx, buf, szp, t));
+ return (sub0_ctx_get_recv_buf_len(&sock->master, buf, szp, t));
}
static int
-sub0_sock_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t)
+sub0_sock_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_sock *sock = arg;
- return (sub0_ctx_set_recvbuf(&sock->ctx, buf, sz, t));
+ return (sub0_ctx_set_recv_buf_len(&sock->master, buf, sz, t));
}
static int
sub0_sock_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_sock *sock = arg;
- return (sub0_ctx_subscribe(&sock->ctx, buf, sz, t));
+ return (sub0_ctx_subscribe(&sock->master, buf, sz, t));
}
static int
sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_sock *sock = arg;
- return (sub0_ctx_unsubscribe(&sock->ctx, buf, sz, t));
+ return (sub0_ctx_unsubscribe(&sock->master, buf, sz, t));
}
static int
-sub0_sock_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t)
+sub0_sock_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t)
{
sub0_sock *sock = arg;
- return (sub0_ctx_get_prefnew(&sock->ctx, buf, szp, t));
+ return (sub0_ctx_get_prefer_new(&sock->master, buf, szp, t));
}
static int
-sub0_sock_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t)
+sub0_sock_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_sock *sock = arg;
- return (sub0_ctx_set_prefnew(&sock->ctx, buf, sz, t));
+ return (sub0_ctx_set_prefer_new(&sock->master, buf, sz, t));
}
// This is the global protocol structure -- our linkage to the core.
@@ -709,17 +707,17 @@ static nni_option sub0_sock_options[] = {
},
{
.o_name = NNG_OPT_RECVFD,
- .o_get = sub0_sock_get_recvfd,
+ .o_get = sub0_sock_get_recv_fd,
},
{
.o_name = NNG_OPT_RECVBUF,
- .o_get = sub0_sock_get_recvbuf,
- .o_set = sub0_sock_set_recvbuf,
+ .o_get = sub0_sock_get_recv_buf_len,
+ .o_set = sub0_sock_set_recv_buf_len,
},
{
.o_name = NNG_OPT_SUB_PREFNEW,
- .o_get = sub0_sock_get_prefnew,
- .o_set = sub0_sock_set_prefnew,
+ .o_get = sub0_sock_get_prefer_new,
+ .o_set = sub0_sock_set_prefer_new,
},
// terminate list
{
@@ -749,7 +747,7 @@ static nni_proto sub0_proto = {
};
int
-nng_sub0_open(nng_socket *sidp)
+nng_sub0_open(nng_socket *sock)
{
- return (nni_proto_open(sidp, &sub0_proto));
+ return (nni_proto_open(sock, &sub0_proto));
}
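For reference, the options whose internal handlers are renamed above surface through the public SUB API roughly as follows. This is an illustrative sketch, not part of the commit, assuming the NNG 1.x option functions (nng_setopt, nng_setopt_int, nng_setopt_bool):

    #include <string.h>
    #include <nng/nng.h>
    #include <nng/protocol/pubsub0/sub.h>

    // Open a SUB socket and exercise the options handled in sub.c:
    // NNG_OPT_SUB_SUBSCRIBE, NNG_OPT_RECVBUF, and NNG_OPT_SUB_PREFNEW.
    static int
    open_subscriber(nng_socket *sockp)
    {
        int rv;

        if ((rv = nng_sub0_open(sockp)) != 0) {
            return (rv);
        }
        // Subscribe to every message whose body starts with "sensor/".
        if ((rv = nng_setopt(*sockp, NNG_OPT_SUB_SUBSCRIBE, "sensor/",
                 strlen("sensor/"))) != 0) {
            return (rv);
        }
        // Depth of the per-context receive buffer
        // (default 128, SUB0_DEFAULT_RECV_BUF_LEN above).
        if ((rv = nng_setopt_int(*sockp, NNG_OPT_RECVBUF, 64)) != 0) {
            return (rv);
        }
        // When the buffer is full, drop the oldest message in favor of
        // the newest (the default, SUB0_DEFAULT_PREFER_NEW above).
        return (nng_setopt_bool(*sockp, NNG_OPT_SUB_PREFNEW, true));
    }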