summaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2021-12-25 13:14:26 -0800
committerGarrett D'Amore <garrett@damore.org>2021-12-25 13:14:26 -0800
commit57e5c8bb2ee122c30fa14127c3b969dc858491c0 (patch)
tree7c95640e8360c3d67d6967791b24a16b00414a5d /src/core/msgqueue.c
parentbeb8eb05310ac945ede357e2e57152f0d71e38ed (diff)
downloadnng-57e5c8bb2ee122c30fa14127c3b969dc858491c0.tar.gz
nng-57e5c8bb2ee122c30fa14127c3b969dc858491c0.tar.bz2
nng-57e5c8bb2ee122c30fa14127c3b969dc858491c0.zip
Pollables can be completely inline.
This eliminates more failure paths, and brings us still closer to eliminating the possibility of failure during socket init.
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c89
1 files changed, 36 insertions, 53 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 2d767d45..116f0907 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -17,11 +17,11 @@
struct nni_msgq {
nni_mtx mq_lock;
- int mq_cap;
- int mq_alloc; // alloc is cap + 2...
- int mq_len;
- int mq_get;
- int mq_put;
+ unsigned mq_cap;
+ unsigned mq_alloc; // alloc is cap + 2...
+ unsigned mq_len;
+ unsigned mq_get;
+ unsigned mq_put;
bool mq_closed;
nni_msg **mq_msgs;
@@ -29,8 +29,8 @@ struct nni_msgq {
nni_list mq_aio_getq;
// Pollable status.
- nni_pollable *mq_sendable;
- nni_pollable *mq_recvable;
+ nni_pollable mq_sendable;
+ nni_pollable mq_recvable;
};
static void nni_msgq_run_notify(nni_msgq *);
@@ -39,7 +39,7 @@ int
nni_msgq_init(nni_msgq **mqp, unsigned cap)
{
struct nni_msgq *mq;
- int alloc;
+ unsigned alloc;
// We allocate 2 extra cells in the fifo. One to accommodate a
// waiting writer when cap == 0. (We can "briefly" move the message
@@ -58,15 +58,16 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
nni_aio_list_init(&mq->mq_aio_putq);
nni_aio_list_init(&mq->mq_aio_getq);
nni_mtx_init(&mq->mq_lock);
- mq->mq_cap = cap;
- mq->mq_alloc = alloc;
- mq->mq_recvable = NULL;
- mq->mq_sendable = NULL;
- mq->mq_len = 0;
- mq->mq_get = 0;
- mq->mq_put = 0;
- mq->mq_closed = 0;
- *mqp = mq;
+ nni_pollable_init(&mq->mq_recvable);
+ nni_pollable_init(&mq->mq_sendable);
+
+ mq->mq_cap = cap;
+ mq->mq_alloc = alloc;
+ mq->mq_len = 0;
+ mq->mq_get = 0;
+ mq->mq_put = 0;
+ mq->mq_closed = 0;
+ *mqp = mq;
return (0);
}
@@ -89,12 +90,8 @@ nni_msgq_fini(nni_msgq *mq)
nni_msg_free(msg);
}
- if (mq->mq_sendable) {
- nni_pollable_free(mq->mq_sendable);
- }
- if (mq->mq_recvable) {
- nni_pollable_free(mq->mq_recvable);
- }
+ nni_pollable_fini(&mq->mq_sendable);
+ nni_pollable_fini(&mq->mq_recvable);
nni_free(mq->mq_msgs, mq->mq_alloc * sizeof(nng_msg *));
NNI_FREE_STRUCT(mq);
@@ -188,14 +185,14 @@ static void
nni_msgq_run_notify(nni_msgq *mq)
{
if (mq->mq_len < mq->mq_cap || !nni_list_empty(&mq->mq_aio_getq)) {
- nni_pollable_raise(mq->mq_sendable);
+ nni_pollable_raise(&mq->mq_sendable);
} else {
- nni_pollable_clear(mq->mq_sendable);
+ nni_pollable_clear(&mq->mq_sendable);
}
if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) {
- nni_pollable_raise(mq->mq_recvable);
+ nni_pollable_raise(&mq->mq_recvable);
} else {
- nni_pollable_clear(mq->mq_recvable);
+ nni_pollable_clear(&mq->mq_recvable);
}
}
@@ -335,7 +332,7 @@ nni_msgq_cap(nni_msgq *mq)
int rv;
nni_mtx_lock(&mq->mq_lock);
- rv = mq->mq_cap;
+ rv = (int) mq->mq_cap;
nni_mtx_unlock(&mq->mq_lock);
return (rv);
}
@@ -343,12 +340,12 @@ nni_msgq_cap(nni_msgq *mq)
int
nni_msgq_resize(nni_msgq *mq, int cap)
{
- int alloc;
- nni_msg * msg;
+ unsigned alloc;
+ nni_msg *msg;
nni_msg **newq, **oldq;
- int oldget;
- int oldlen;
- int oldalloc;
+ unsigned oldget;
+ unsigned oldlen;
+ unsigned oldalloc;
alloc = cap + 2;
@@ -362,7 +359,7 @@ nni_msgq_resize(nni_msgq *mq, int cap)
}
nni_mtx_lock(&mq->mq_lock);
- while (mq->mq_len > (cap + 1)) {
+ while (mq->mq_len > ((unsigned)cap + 1)) {
// too many messages -- we allow that one for
// the case of pushback or cap == 0.
// we delete the oldest messages first
@@ -412,17 +409,10 @@ int
nni_msgq_get_recvable(nni_msgq *mq, nni_pollable **sp)
{
nni_mtx_lock(&mq->mq_lock);
- if (mq->mq_recvable == NULL) {
- int rv;
- if ((rv = nni_pollable_alloc(&mq->mq_recvable)) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return (rv);
- }
- nni_msgq_run_notify(mq);
- }
+ nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
- *sp = mq->mq_recvable;
+ *sp = &mq->mq_recvable;
return (0);
}
@@ -430,16 +420,9 @@ int
nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **sp)
{
nni_mtx_lock(&mq->mq_lock);
- if (mq->mq_sendable == NULL) {
- int rv;
- if ((rv = nni_pollable_alloc(&mq->mq_sendable)) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return (rv);
- }
- nni_msgq_run_notify(mq);
- }
+ nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
- *sp = mq->mq_sendable;
+ *sp = &mq->mq_sendable;
return (0);
}