diff options
| -rw-r--r-- | docs/man/nng_sub.7.adoc | 7 | ||||
| -rw-r--r-- | include/nng/protocol/pubsub0/sub.h | 2 | ||||
| -rw-r--r-- | src/protocol/pubsub0/pub.c | 9 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 79 |
4 files changed, 93 insertions, 4 deletions
diff --git a/docs/man/nng_sub.7.adoc b/docs/man/nng_sub.7.adoc index cd9af727..aef1a9e4 100644 --- a/docs/man/nng_sub.7.adoc +++ b/docs/man/nng_sub.7.adoc @@ -73,6 +73,13 @@ TIP: To receive all messages, an empty topic (zero length) can be used. Note that if the topic was not previously subscribed to with `NNG_OPT_SUB_SUBSCRIBE` then an `NNG_ENOENT` error will result. +((`NNG_OPT_SUB_PREFNEW`)):: + + (`bool`) + This read/write option specifies the behavior of the subscriber when the queue is full. + When `true` (the default), the subscriber will make room in the queue by removing the oldest message. + When `false`, the subscriber will reject messages if the message queue does not have room. + === Protocol Headers The _sub_ protocol has no protocol-specific headers. diff --git a/include/nng/protocol/pubsub0/sub.h b/include/nng/protocol/pubsub0/sub.h index acb5cda3..81f50a80 100644 --- a/include/nng/protocol/pubsub0/sub.h +++ b/include/nng/protocol/pubsub0/sub.h @@ -30,6 +30,8 @@ NNG_DECL int nng_sub0_open_raw(nng_socket *); #define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe" #define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe" +#define NNG_OPT_SUB_PREFNEW "sub:prefnew" + #ifdef __cplusplus } #endif diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index be550439..c54274a6 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -267,9 +267,6 @@ pub0_sock_send(void *arg, nni_aio *aio) if (p->closed) { continue; } - if (nni_lmq_full(&p->sendq)) { - continue; - } if (p == nni_list_last(&sock->pipes)) { dup = msg; msg = NULL; @@ -277,6 +274,12 @@ pub0_sock_send(void *arg, nni_aio *aio) continue; } if (p->busy) { + if (nni_lmq_full(&p->sendq)) { + // Make space for the new message. + nni_msg * old; + (void) nni_lmq_getq(&p->sendq, &old); + nni_msg_free(old); + } nni_lmq_putq(&p->sendq, dup); } else { p->busy = true; diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index 938ce332..aca28f0c 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -1,6 +1,7 @@ // // Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> +// Copyright 2019 Nathan Kent <nate@nkent.net> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -30,6 +31,9 @@ // By default we accept 128 messages. #define SUB0_DEFAULT_QLEN 128 +// By default, prefer new messages when the queue is full. +#define SUB0_DEFAULT_PREFNEW true + typedef struct sub0_pipe sub0_pipe; typedef struct sub0_sock sub0_sock; typedef struct sub0_ctx sub0_ctx; @@ -53,6 +57,7 @@ struct sub0_ctx { nni_list raios; // sub context could have multiple pending recvs bool closed; nni_lmq lmq; + bool prefnew; #if 0 nni_msg **recvq; @@ -69,6 +74,7 @@ struct sub0_sock { sub0_ctx * ctx; // default context nni_list ctxs; // all contexts size_t recvbuflen; + bool prefnew; nni_mtx lk; }; @@ -187,6 +193,7 @@ sub0_ctx_init(void **ctxp, void *sarg) sub0_sock *sock = sarg; sub0_ctx * ctx; size_t len; + bool prefnew; int rv; if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) { @@ -195,10 +202,12 @@ sub0_ctx_init(void **ctxp, void *sarg) nni_mtx_lock(&sock->lk); len = sock->recvbuflen; + prefnew = sock->prefnew; if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) { return (rv); } + ctx->prefnew = prefnew; nni_aio_list_init(&ctx->raios); NNI_LIST_INIT(&ctx->topics, sub0_topic, node); @@ -241,6 +250,7 @@ sub0_sock_init(void **sp, nni_sock *nsock) NNI_LIST_INIT(&sock->ctxs, sub0_ctx, node); nni_mtx_init(&sock->lk); sock->recvbuflen = SUB0_DEFAULT_QLEN; + sock->prefnew = SUB0_DEFAULT_PREFNEW; if (((rv = sub0_ctx_init((void **) &sock->ctx, sock)) != 0) || ((rv = nni_pollable_alloc(&sock->recvable)) != 0)) { @@ -376,7 +386,7 @@ sub0_recv_cb(void *arg) NNI_LIST_FOREACH (&sock->ctxs, ctx) { nni_msg *dup; - if (nni_lmq_full(&ctx->lmq)) { + if (nni_lmq_full(&ctx->lmq) && !ctx->prefnew) { // Cannot deliver here, as receive buffer is full. continue; } @@ -405,6 +415,13 @@ sub0_recv_cb(void *arg) // Save for synchronous completion nni_list_append(&finish, aio); + } else if (nni_lmq_full(&ctx->lmq)) { + // Make space for the new message. + nni_msg * old; + (void) nni_lmq_getq(&ctx->lmq, &old); + nni_msg_free(old); + + (void) nni_lmq_putq(&ctx->lmq, dup); } else { (void) nni_lmq_putq(&ctx->lmq, dup); } @@ -555,6 +572,42 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) return (0); } +static int +sub0_ctx_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t) +{ + sub0_ctx * ctx = arg; + sub0_sock * sock = ctx->sock; + bool val; + + nni_mtx_lock(&sock->lk); + val = ctx->prefnew; + nni_mtx_unlock(&sock->lk); + + return (nni_copyout_bool(val, buf, szp, t)); +} + +static int +sub0_ctx_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t) +{ + sub0_ctx * ctx = arg; + sub0_sock * sock = ctx->sock; + bool val; + int rv; + + if ((rv = nni_copyin_bool(&val, buf, sz, t)) != 0) { + return (rv); + } + + nni_mtx_lock(&sock->lk); + ctx->prefnew = val; + if (sock->ctx == ctx) { + sock->prefnew = val; + } + nni_mtx_unlock(&sock->lk); + + return (0); +} + static nni_option sub0_ctx_options[] = { { .o_name = NNG_OPT_RECVBUF, @@ -570,6 +623,11 @@ static nni_option sub0_ctx_options[] = { .o_set = sub0_ctx_unsubscribe, }, { + .o_name = NNG_OPT_SUB_PREFNEW, + .o_get = sub0_ctx_get_prefnew, + .o_set = sub0_ctx_set_prefnew, + }, + { .o_name = NULL, }, }; @@ -632,6 +690,20 @@ sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) return (sub0_ctx_unsubscribe(sock->ctx, buf, sz, t)); } +static int +sub0_sock_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t) +{ + sub0_sock *sock = arg; + return (sub0_ctx_get_prefnew(sock->ctx, buf, szp, t)); +} + +static int +sub0_sock_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t) +{ + sub0_sock *sock = arg; + return (sub0_ctx_set_prefnew(sock->ctx, buf, sz, t)); +} + // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. static nni_proto_pipe_ops sub0_pipe_ops = { @@ -668,6 +740,11 @@ static nni_option sub0_sock_options[] = { .o_get = sub0_sock_get_recvbuf, .o_set = sub0_sock_set_recvbuf, }, + { + .o_name = NNG_OPT_SUB_PREFNEW, + .o_get = sub0_sock_get_prefnew, + .o_set = sub0_sock_set_prefnew, + }, // terminate list { .o_name = NULL, |
