aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pubsub0/sub.c
diff options
context:
space:
mode:
authorNathan Kent <nate@nkent.net>2019-09-25 18:46:38 -0400
committerGarrett D'Amore <garrett@damore.org>2019-11-03 10:24:47 -0800
commit96b7677ddc8e49da045ca770703df065f10db280 (patch)
tree1302c0cf0298033bc408b1806a4c2260bb4b74b4 /src/protocol/pubsub0/sub.c
parent32d17517b87713e4d584555c35ac48d010243910 (diff)
downloadnng-96b7677ddc8e49da045ca770703df065f10db280.tar.gz
nng-96b7677ddc8e49da045ca770703df065f10db280.tar.bz2
nng-96b7677ddc8e49da045ca770703df065f10db280.zip
Add option for preferring new messages on SUB0
Diffstat (limited to 'src/protocol/pubsub0/sub.c')
-rw-r--r--src/protocol/pubsub0/sub.c79
1 files changed, 78 insertions, 1 deletions
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index 938ce332..aca28f0c 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -1,6 +1,7 @@
//
// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+// Copyright 2019 Nathan Kent <nate@nkent.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -30,6 +31,9 @@
// By default we accept 128 messages.
#define SUB0_DEFAULT_QLEN 128
+// By default, prefer new messages when the queue is full.
+#define SUB0_DEFAULT_PREFNEW true
+
typedef struct sub0_pipe sub0_pipe;
typedef struct sub0_sock sub0_sock;
typedef struct sub0_ctx sub0_ctx;
@@ -53,6 +57,7 @@ struct sub0_ctx {
nni_list raios; // sub context could have multiple pending recvs
bool closed;
nni_lmq lmq;
+ bool prefnew;
#if 0
nni_msg **recvq;
@@ -69,6 +74,7 @@ struct sub0_sock {
sub0_ctx * ctx; // default context
nni_list ctxs; // all contexts
size_t recvbuflen;
+ bool prefnew;
nni_mtx lk;
};
@@ -187,6 +193,7 @@ sub0_ctx_init(void **ctxp, void *sarg)
sub0_sock *sock = sarg;
sub0_ctx * ctx;
size_t len;
+ bool prefnew;
int rv;
if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
@@ -195,10 +202,12 @@ sub0_ctx_init(void **ctxp, void *sarg)
nni_mtx_lock(&sock->lk);
len = sock->recvbuflen;
+ prefnew = sock->prefnew;
if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) {
return (rv);
}
+ ctx->prefnew = prefnew;
nni_aio_list_init(&ctx->raios);
NNI_LIST_INIT(&ctx->topics, sub0_topic, node);
@@ -241,6 +250,7 @@ sub0_sock_init(void **sp, nni_sock *nsock)
NNI_LIST_INIT(&sock->ctxs, sub0_ctx, node);
nni_mtx_init(&sock->lk);
sock->recvbuflen = SUB0_DEFAULT_QLEN;
+ sock->prefnew = SUB0_DEFAULT_PREFNEW;
if (((rv = sub0_ctx_init((void **) &sock->ctx, sock)) != 0) ||
((rv = nni_pollable_alloc(&sock->recvable)) != 0)) {
@@ -376,7 +386,7 @@ sub0_recv_cb(void *arg)
NNI_LIST_FOREACH (&sock->ctxs, ctx) {
nni_msg *dup;
- if (nni_lmq_full(&ctx->lmq)) {
+ if (nni_lmq_full(&ctx->lmq) && !ctx->prefnew) {
// Cannot deliver here, as receive buffer is full.
continue;
}
@@ -405,6 +415,13 @@ sub0_recv_cb(void *arg)
// Save for synchronous completion
nni_list_append(&finish, aio);
+ } else if (nni_lmq_full(&ctx->lmq)) {
+ // Make space for the new message.
+ nni_msg * old;
+ (void) nni_lmq_getq(&ctx->lmq, &old);
+ nni_msg_free(old);
+
+ (void) nni_lmq_putq(&ctx->lmq, dup);
} else {
(void) nni_lmq_putq(&ctx->lmq, dup);
}
@@ -555,6 +572,42 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
return (0);
}
+static int
+sub0_ctx_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ sub0_ctx * ctx = arg;
+ sub0_sock * sock = ctx->sock;
+ bool val;
+
+ nni_mtx_lock(&sock->lk);
+ val = ctx->prefnew;
+ nni_mtx_unlock(&sock->lk);
+
+ return (nni_copyout_bool(val, buf, szp, t));
+}
+
+static int
+sub0_ctx_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ sub0_ctx * ctx = arg;
+ sub0_sock * sock = ctx->sock;
+ bool val;
+ int rv;
+
+ if ((rv = nni_copyin_bool(&val, buf, sz, t)) != 0) {
+ return (rv);
+ }
+
+ nni_mtx_lock(&sock->lk);
+ ctx->prefnew = val;
+ if (sock->ctx == ctx) {
+ sock->prefnew = val;
+ }
+ nni_mtx_unlock(&sock->lk);
+
+ return (0);
+}
+
static nni_option sub0_ctx_options[] = {
{
.o_name = NNG_OPT_RECVBUF,
@@ -570,6 +623,11 @@ static nni_option sub0_ctx_options[] = {
.o_set = sub0_ctx_unsubscribe,
},
{
+ .o_name = NNG_OPT_SUB_PREFNEW,
+ .o_get = sub0_ctx_get_prefnew,
+ .o_set = sub0_ctx_set_prefnew,
+ },
+ {
.o_name = NULL,
},
};
@@ -632,6 +690,20 @@ sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
return (sub0_ctx_unsubscribe(sock->ctx, buf, sz, t));
}
+static int
+sub0_sock_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ sub0_sock *sock = arg;
+ return (sub0_ctx_get_prefnew(sock->ctx, buf, szp, t));
+}
+
+static int
+sub0_sock_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ sub0_sock *sock = arg;
+ return (sub0_ctx_set_prefnew(sock->ctx, buf, sz, t));
+}
+
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops sub0_pipe_ops = {
@@ -668,6 +740,11 @@ static nni_option sub0_sock_options[] = {
.o_get = sub0_sock_get_recvbuf,
.o_set = sub0_sock_set_recvbuf,
},
+ {
+ .o_name = NNG_OPT_SUB_PREFNEW,
+ .o_get = sub0_sock_get_prefnew,
+ .o_set = sub0_sock_set_prefnew,
+ },
// terminate list
{
.o_name = NULL,