aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/man/nng_sub.7.adoc7
-rw-r--r--include/nng/protocol/pubsub0/sub.h2
-rw-r--r--src/protocol/pubsub0/pub.c9
-rw-r--r--src/protocol/pubsub0/sub.c79
4 files changed, 93 insertions, 4 deletions
diff --git a/docs/man/nng_sub.7.adoc b/docs/man/nng_sub.7.adoc
index cd9af727..aef1a9e4 100644
--- a/docs/man/nng_sub.7.adoc
+++ b/docs/man/nng_sub.7.adoc
@@ -73,6 +73,13 @@ TIP: To receive all messages, an empty topic (zero length) can be used.
Note that if the topic was not previously subscribed to with
`NNG_OPT_SUB_SUBSCRIBE` then an `NNG_ENOENT` error will result.
+((`NNG_OPT_SUB_PREFNEW`))::
+
+ (`bool`)
+ This read/write option specifies the behavior of the subscriber when the queue is full.
+ When `true` (the default), the subscriber will make room in the queue by removing the oldest message.
+ When `false`, the subscriber will reject messages if the message queue does not have room.
+
=== Protocol Headers
The _sub_ protocol has no protocol-specific headers.
diff --git a/include/nng/protocol/pubsub0/sub.h b/include/nng/protocol/pubsub0/sub.h
index acb5cda3..81f50a80 100644
--- a/include/nng/protocol/pubsub0/sub.h
+++ b/include/nng/protocol/pubsub0/sub.h
@@ -30,6 +30,8 @@ NNG_DECL int nng_sub0_open_raw(nng_socket *);
#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe"
#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe"
+#define NNG_OPT_SUB_PREFNEW "sub:prefnew"
+
#ifdef __cplusplus
}
#endif
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index be550439..c54274a6 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -267,9 +267,6 @@ pub0_sock_send(void *arg, nni_aio *aio)
if (p->closed) {
continue;
}
- if (nni_lmq_full(&p->sendq)) {
- continue;
- }
if (p == nni_list_last(&sock->pipes)) {
dup = msg;
msg = NULL;
@@ -277,6 +274,12 @@ pub0_sock_send(void *arg, nni_aio *aio)
continue;
}
if (p->busy) {
+ if (nni_lmq_full(&p->sendq)) {
+ // Make space for the new message.
+ nni_msg * old;
+ (void) nni_lmq_getq(&p->sendq, &old);
+ nni_msg_free(old);
+ }
nni_lmq_putq(&p->sendq, dup);
} else {
p->busy = true;
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index 938ce332..aca28f0c 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -1,6 +1,7 @@
//
// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+// Copyright 2019 Nathan Kent <nate@nkent.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -30,6 +31,9 @@
// By default we accept 128 messages.
#define SUB0_DEFAULT_QLEN 128
+// By default, prefer new messages when the queue is full.
+#define SUB0_DEFAULT_PREFNEW true
+
typedef struct sub0_pipe sub0_pipe;
typedef struct sub0_sock sub0_sock;
typedef struct sub0_ctx sub0_ctx;
@@ -53,6 +57,7 @@ struct sub0_ctx {
nni_list raios; // sub context could have multiple pending recvs
bool closed;
nni_lmq lmq;
+ bool prefnew;
#if 0
nni_msg **recvq;
@@ -69,6 +74,7 @@ struct sub0_sock {
sub0_ctx * ctx; // default context
nni_list ctxs; // all contexts
size_t recvbuflen;
+ bool prefnew;
nni_mtx lk;
};
@@ -187,6 +193,7 @@ sub0_ctx_init(void **ctxp, void *sarg)
sub0_sock *sock = sarg;
sub0_ctx * ctx;
size_t len;
+ bool prefnew;
int rv;
if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
@@ -195,10 +202,12 @@ sub0_ctx_init(void **ctxp, void *sarg)
nni_mtx_lock(&sock->lk);
len = sock->recvbuflen;
+ prefnew = sock->prefnew;
if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) {
return (rv);
}
+ ctx->prefnew = prefnew;
nni_aio_list_init(&ctx->raios);
NNI_LIST_INIT(&ctx->topics, sub0_topic, node);
@@ -241,6 +250,7 @@ sub0_sock_init(void **sp, nni_sock *nsock)
NNI_LIST_INIT(&sock->ctxs, sub0_ctx, node);
nni_mtx_init(&sock->lk);
sock->recvbuflen = SUB0_DEFAULT_QLEN;
+ sock->prefnew = SUB0_DEFAULT_PREFNEW;
if (((rv = sub0_ctx_init((void **) &sock->ctx, sock)) != 0) ||
((rv = nni_pollable_alloc(&sock->recvable)) != 0)) {
@@ -376,7 +386,7 @@ sub0_recv_cb(void *arg)
NNI_LIST_FOREACH (&sock->ctxs, ctx) {
nni_msg *dup;
- if (nni_lmq_full(&ctx->lmq)) {
+ if (nni_lmq_full(&ctx->lmq) && !ctx->prefnew) {
// Cannot deliver here, as receive buffer is full.
continue;
}
@@ -405,6 +415,13 @@ sub0_recv_cb(void *arg)
// Save for synchronous completion
nni_list_append(&finish, aio);
+ } else if (nni_lmq_full(&ctx->lmq)) {
+ // Make space for the new message.
+ nni_msg * old;
+ (void) nni_lmq_getq(&ctx->lmq, &old);
+ nni_msg_free(old);
+
+ (void) nni_lmq_putq(&ctx->lmq, dup);
} else {
(void) nni_lmq_putq(&ctx->lmq, dup);
}
@@ -555,6 +572,42 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
return (0);
}
+static int
+sub0_ctx_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ sub0_ctx * ctx = arg;
+ sub0_sock * sock = ctx->sock;
+ bool val;
+
+ nni_mtx_lock(&sock->lk);
+ val = ctx->prefnew;
+ nni_mtx_unlock(&sock->lk);
+
+ return (nni_copyout_bool(val, buf, szp, t));
+}
+
+static int
+sub0_ctx_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ sub0_ctx * ctx = arg;
+ sub0_sock * sock = ctx->sock;
+ bool val;
+ int rv;
+
+ if ((rv = nni_copyin_bool(&val, buf, sz, t)) != 0) {
+ return (rv);
+ }
+
+ nni_mtx_lock(&sock->lk);
+ ctx->prefnew = val;
+ if (sock->ctx == ctx) {
+ sock->prefnew = val;
+ }
+ nni_mtx_unlock(&sock->lk);
+
+ return (0);
+}
+
static nni_option sub0_ctx_options[] = {
{
.o_name = NNG_OPT_RECVBUF,
@@ -570,6 +623,11 @@ static nni_option sub0_ctx_options[] = {
.o_set = sub0_ctx_unsubscribe,
},
{
+ .o_name = NNG_OPT_SUB_PREFNEW,
+ .o_get = sub0_ctx_get_prefnew,
+ .o_set = sub0_ctx_set_prefnew,
+ },
+ {
.o_name = NULL,
},
};
@@ -632,6 +690,20 @@ sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
return (sub0_ctx_unsubscribe(sock->ctx, buf, sz, t));
}
+static int
+sub0_sock_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t)
+{
+ sub0_sock *sock = arg;
+ return (sub0_ctx_get_prefnew(sock->ctx, buf, szp, t));
+}
+
+static int
+sub0_sock_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ sub0_sock *sock = arg;
+ return (sub0_ctx_set_prefnew(sock->ctx, buf, sz, t));
+}
+
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops sub0_pipe_ops = {
@@ -668,6 +740,11 @@ static nni_option sub0_sock_options[] = {
.o_get = sub0_sock_get_recvbuf,
.o_set = sub0_sock_set_recvbuf,
},
+ {
+ .o_name = NNG_OPT_SUB_PREFNEW,
+ .o_get = sub0_sock_get_prefnew,
+ .o_set = sub0_sock_set_prefnew,
+ },
// terminate list
{
.o_name = NULL,