aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/msgqueue.c89
-rw-r--r--src/core/pollable.c30
-rw-r--r--src/core/pollable.h8
-rw-r--r--src/sp/protocol/pubsub0/pub.c15
4 files changed, 46 insertions, 96 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 2d767d45..116f0907 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -17,11 +17,11 @@
struct nni_msgq {
nni_mtx mq_lock;
- int mq_cap;
- int mq_alloc; // alloc is cap + 2...
- int mq_len;
- int mq_get;
- int mq_put;
+ unsigned mq_cap;
+ unsigned mq_alloc; // alloc is cap + 2...
+ unsigned mq_len;
+ unsigned mq_get;
+ unsigned mq_put;
bool mq_closed;
nni_msg **mq_msgs;
@@ -29,8 +29,8 @@ struct nni_msgq {
nni_list mq_aio_getq;
// Pollable status.
- nni_pollable *mq_sendable;
- nni_pollable *mq_recvable;
+ nni_pollable mq_sendable;
+ nni_pollable mq_recvable;
};
static void nni_msgq_run_notify(nni_msgq *);
@@ -39,7 +39,7 @@ int
nni_msgq_init(nni_msgq **mqp, unsigned cap)
{
struct nni_msgq *mq;
- int alloc;
+ unsigned alloc;
// We allocate 2 extra cells in the fifo. One to accommodate a
// waiting writer when cap == 0. (We can "briefly" move the message
@@ -58,15 +58,16 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
nni_aio_list_init(&mq->mq_aio_putq);
nni_aio_list_init(&mq->mq_aio_getq);
nni_mtx_init(&mq->mq_lock);
- mq->mq_cap = cap;
- mq->mq_alloc = alloc;
- mq->mq_recvable = NULL;
- mq->mq_sendable = NULL;
- mq->mq_len = 0;
- mq->mq_get = 0;
- mq->mq_put = 0;
- mq->mq_closed = 0;
- *mqp = mq;
+ nni_pollable_init(&mq->mq_recvable);
+ nni_pollable_init(&mq->mq_sendable);
+
+ mq->mq_cap = cap;
+ mq->mq_alloc = alloc;
+ mq->mq_len = 0;
+ mq->mq_get = 0;
+ mq->mq_put = 0;
+ mq->mq_closed = 0;
+ *mqp = mq;
return (0);
}
@@ -89,12 +90,8 @@ nni_msgq_fini(nni_msgq *mq)
nni_msg_free(msg);
}
- if (mq->mq_sendable) {
- nni_pollable_free(mq->mq_sendable);
- }
- if (mq->mq_recvable) {
- nni_pollable_free(mq->mq_recvable);
- }
+ nni_pollable_fini(&mq->mq_sendable);
+ nni_pollable_fini(&mq->mq_recvable);
nni_free(mq->mq_msgs, mq->mq_alloc * sizeof(nng_msg *));
NNI_FREE_STRUCT(mq);
@@ -188,14 +185,14 @@ static void
nni_msgq_run_notify(nni_msgq *mq)
{
if (mq->mq_len < mq->mq_cap || !nni_list_empty(&mq->mq_aio_getq)) {
- nni_pollable_raise(mq->mq_sendable);
+ nni_pollable_raise(&mq->mq_sendable);
} else {
- nni_pollable_clear(mq->mq_sendable);
+ nni_pollable_clear(&mq->mq_sendable);
}
if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) {
- nni_pollable_raise(mq->mq_recvable);
+ nni_pollable_raise(&mq->mq_recvable);
} else {
- nni_pollable_clear(mq->mq_recvable);
+ nni_pollable_clear(&mq->mq_recvable);
}
}
@@ -335,7 +332,7 @@ nni_msgq_cap(nni_msgq *mq)
int rv;
nni_mtx_lock(&mq->mq_lock);
- rv = mq->mq_cap;
+ rv = (int) mq->mq_cap;
nni_mtx_unlock(&mq->mq_lock);
return (rv);
}
@@ -343,12 +340,12 @@ nni_msgq_cap(nni_msgq *mq)
int
nni_msgq_resize(nni_msgq *mq, int cap)
{
- int alloc;
- nni_msg * msg;
+ unsigned alloc;
+ nni_msg *msg;
nni_msg **newq, **oldq;
- int oldget;
- int oldlen;
- int oldalloc;
+ unsigned oldget;
+ unsigned oldlen;
+ unsigned oldalloc;
alloc = cap + 2;
@@ -362,7 +359,7 @@ nni_msgq_resize(nni_msgq *mq, int cap)
}
nni_mtx_lock(&mq->mq_lock);
- while (mq->mq_len > (cap + 1)) {
+ while (mq->mq_len > ((unsigned)cap + 1)) {
// too many messages -- we allow that one for
// the case of pushback or cap == 0.
// we delete the oldest messages first
@@ -412,17 +409,10 @@ int
nni_msgq_get_recvable(nni_msgq *mq, nni_pollable **sp)
{
nni_mtx_lock(&mq->mq_lock);
- if (mq->mq_recvable == NULL) {
- int rv;
- if ((rv = nni_pollable_alloc(&mq->mq_recvable)) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return (rv);
- }
- nni_msgq_run_notify(mq);
- }
+ nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
- *sp = mq->mq_recvable;
+ *sp = &mq->mq_recvable;
return (0);
}
@@ -430,16 +420,9 @@ int
nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **sp)
{
nni_mtx_lock(&mq->mq_lock);
- if (mq->mq_sendable == NULL) {
- int rv;
- if ((rv = nni_pollable_alloc(&mq->mq_sendable)) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return (rv);
- }
- nni_msgq_run_notify(mq);
- }
+ nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
- *sp = mq->mq_sendable;
+ *sp = &mq->mq_sendable;
return (0);
}
diff --git a/src/core/pollable.c b/src/core/pollable.c
index fb6af0f5..8e5ad7c1 100644
--- a/src/core/pollable.c
+++ b/src/core/pollable.c
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -38,34 +38,9 @@ nni_pollable_fini(nni_pollable *p)
}
}
-int
-nni_pollable_alloc(nni_pollable **pp)
-{
- nni_pollable *p;
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
- nni_pollable_init(p);
- *pp = p;
- return (0);
-}
-
-void
-nni_pollable_free(nni_pollable *p)
-{
- if (p == NULL) {
- return;
- }
- nni_pollable_fini(p);
- NNI_FREE_STRUCT(p);
-}
-
void
nni_pollable_raise(nni_pollable *p)
{
- if (p == NULL) {
- return;
- }
if (!nni_atomic_swap_bool(&p->p_raised, true)) {
uint64_t fds;
if ((fds = nni_atomic_get64(&p->p_fds)) != (uint64_t) -1) {
@@ -77,9 +52,6 @@ nni_pollable_raise(nni_pollable *p)
void
nni_pollable_clear(nni_pollable *p)
{
- if (p == NULL) {
- return;
- }
if (nni_atomic_swap_bool(&p->p_raised, false)) {
uint64_t fds;
if ((fds = nni_atomic_get64(&p->p_fds)) != (uint64_t) -1) {
diff --git a/src/core/pollable.h b/src/core/pollable.h
index a71a9693..2dcce0a1 100644
--- a/src/core/pollable.h
+++ b/src/core/pollable.h
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -15,15 +15,13 @@
typedef struct nni_pollable nni_pollable;
-extern int nni_pollable_alloc(nni_pollable **);
-extern void nni_pollable_free(nni_pollable *);
extern void nni_pollable_raise(nni_pollable *);
extern void nni_pollable_clear(nni_pollable *);
extern int nni_pollable_getfd(nni_pollable *, int *);
// nni_pollable implementation details are private. Only here for inlining.
-// We have joined to the write and read file descriptors into a a single
-// atomic 64 so we can update them together (and we can use cas to be sure
+// We have joined the write and read file descriptors into a single
+// atomic 64, so we can update them together (and we can use cas to be sure
// that such updates are always safe.)
struct nni_pollable {
nni_atomic_u64 p_fds;
diff --git a/src/sp/protocol/pubsub0/pub.c b/src/sp/protocol/pubsub0/pub.c
index e3d4f16a..cfc3ed6d 100644
--- a/src/sp/protocol/pubsub0/pub.c
+++ b/src/sp/protocol/pubsub0/pub.c
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -40,7 +40,7 @@ struct pub0_sock {
nni_mtx mtx;
bool closed;
size_t sendbuf;
- nni_pollable *sendable;
+ nni_pollable sendable;
};
// pub0_pipe is our per-pipe protocol private structure.
@@ -60,7 +60,7 @@ pub0_sock_fini(void *arg)
{
pub0_sock *s = arg;
- nni_pollable_free(s->sendable);
+ nni_pollable_fini(&s->sendable);
nni_mtx_fini(&s->mtx);
}
@@ -68,12 +68,9 @@ static int
pub0_sock_init(void *arg, nni_sock *nsock)
{
pub0_sock *sock = arg;
- int rv;
NNI_ARG_UNUSED(nsock);
- if ((rv = nni_pollable_alloc(&sock->sendable)) != 0) {
- return (rv);
- }
+ nni_pollable_init(&sock->sendable);
nni_mtx_init(&sock->mtx);
NNI_LIST_INIT(&sock->pipes, pub0_pipe, node);
sock->sendbuf = 16; // fairly arbitrary
@@ -267,8 +264,8 @@ pub0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_type t)
int rv;
nni_mtx_lock(&sock->mtx);
// PUB sockets are *always* writable.
- nni_pollable_raise(sock->sendable);
- rv = nni_pollable_getfd(sock->sendable, &fd);
+ nni_pollable_raise(&sock->sendable);
+ rv = nni_pollable_getfd(&sock->sendable, &fd);
nni_mtx_unlock(&sock->mtx);
if (rv == 0) {