summaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c151
1 files changed, 63 insertions, 88 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index a69af47c..9f7ff7fd 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -15,37 +15,36 @@
// side can close, and they may be closed more than once.
struct nni_msgq {
- nni_mtx mq_lock;
- nni_cv mq_drained;
- int mq_cap;
- int mq_alloc; // alloc is cap + 2...
- int mq_len;
- int mq_get;
- int mq_put;
- int mq_closed;
- int mq_puterr;
- int mq_geterr;
- int mq_draining;
- nni_msg ** mq_msgs;
-
- nni_list mq_aio_putq;
- nni_list mq_aio_getq;
- nni_list mq_aio_notify_get;
- nni_list mq_aio_notify_put;
-
- nni_timer_node mq_timer;
- nni_time mq_expire;
+ nni_mtx mq_lock;
+ nni_cv mq_drained;
+ int mq_cap;
+ int mq_alloc; // alloc is cap + 2...
+ int mq_len;
+ int mq_get;
+ int mq_put;
+ int mq_closed;
+ int mq_puterr;
+ int mq_geterr;
+ int mq_draining;
+ nni_msg **mq_msgs;
+
+ nni_list mq_aio_putq;
+ nni_list mq_aio_getq;
+ nni_list mq_aio_notify_get;
+ nni_list mq_aio_notify_put;
+
+ nni_timer_node mq_timer;
+ nni_time mq_expire;
};
-
static void nni_msgq_run_timeout(void *);
int
nni_msgq_init(nni_msgq **mqp, int cap)
{
struct nni_msgq *mq;
- int rv;
- int alloc;
+ int rv;
+ int alloc;
if (cap < 0) {
return (NNG_EINVAL);
@@ -72,24 +71,24 @@ nni_msgq_init(nni_msgq **mqp, int cap)
if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) {
goto fail;
}
- if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) {
+ if ((mq->mq_msgs = nni_alloc(sizeof(nng_msg *) * alloc)) == NULL) {
rv = NNG_ENOMEM;
goto fail;
}
nni_timer_init(&mq->mq_timer, nni_msgq_run_timeout, mq);
- mq->mq_cap = cap;
- mq->mq_alloc = alloc;
- mq->mq_len = 0;
- mq->mq_get = 0;
- mq->mq_put = 0;
- mq->mq_closed = 0;
- mq->mq_puterr = 0;
- mq->mq_geterr = 0;
- mq->mq_expire = NNI_TIME_NEVER;
+ mq->mq_cap = cap;
+ mq->mq_alloc = alloc;
+ mq->mq_len = 0;
+ mq->mq_get = 0;
+ mq->mq_put = 0;
+ mq->mq_closed = 0;
+ mq->mq_puterr = 0;
+ mq->mq_geterr = 0;
+ mq->mq_expire = NNI_TIME_NEVER;
mq->mq_draining = 0;
- *mqp = mq;
+ *mqp = mq;
return (0);
@@ -97,13 +96,12 @@ fail:
nni_cv_fini(&mq->mq_drained);
nni_mtx_fini(&mq->mq_lock);
if (mq->mq_msgs != NULL) {
- nni_free(mq->mq_msgs, sizeof (nng_msg *) * alloc);
+ nni_free(mq->mq_msgs, sizeof(nng_msg *) * alloc);
}
NNI_FREE_STRUCT(mq);
return (rv);
}
-
void
nni_msgq_fini(nni_msgq *mq)
{
@@ -127,11 +125,10 @@ nni_msgq_fini(nni_msgq *mq)
nni_msg_free(msg);
}
- nni_free(mq->mq_msgs, mq->mq_alloc * sizeof (nng_msg *));
+ nni_free(mq->mq_msgs, mq->mq_alloc * sizeof(nng_msg *));
NNI_FREE_STRUCT(mq);
}
-
static void
nni_msgq_finish(nni_aio *aio, int rv)
{
@@ -139,7 +136,6 @@ nni_msgq_finish(nni_aio *aio, int rv)
nni_aio_finish(aio, rv, 0);
}
-
void
nni_msgq_set_get_error(nni_msgq *mq, int error)
{
@@ -159,7 +155,6 @@ nni_msgq_set_get_error(nni_msgq *mq, int error)
nni_mtx_unlock(&mq->mq_lock);
}
-
void
nni_msgq_set_put_error(nni_msgq *mq, int error)
{
@@ -179,7 +174,6 @@ nni_msgq_set_put_error(nni_msgq *mq, int error)
nni_mtx_unlock(&mq->mq_lock);
}
-
void
nni_msgq_set_error(nni_msgq *mq, int error)
{
@@ -205,14 +199,13 @@ nni_msgq_set_error(nni_msgq *mq, int error)
nni_mtx_unlock(&mq->mq_lock);
}
-
static void
nni_msgq_run_putq(nni_msgq *mq)
{
nni_aio *waio;
nni_aio *raio;
nni_msg *msg;
- size_t len;
+ size_t len;
while ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
msg = waio->a_msg;
@@ -248,7 +241,6 @@ nni_msgq_run_putq(nni_msgq *mq)
}
}
-
static void
nni_msgq_run_getq(nni_msgq *mq)
{
@@ -272,7 +264,7 @@ nni_msgq_run_getq(nni_msgq *mq)
// Nothing queued (unbuffered?), maybe a writer is waiting.
if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
- msg = waio->a_msg;
+ msg = waio->a_msg;
waio->a_msg = NULL;
raio->a_msg = msg;
nni_msgq_finish(raio, 0);
@@ -286,7 +278,6 @@ nni_msgq_run_getq(nni_msgq *mq)
}
}
-
static void
nni_msgq_run_notify(nni_msgq *mq)
{
@@ -316,7 +307,6 @@ nni_msgq_run_notify(nni_msgq *mq)
}
}
-
static void
nni_msgq_cancel(nni_aio *aio)
{
@@ -331,7 +321,6 @@ nni_msgq_cancel(nni_aio *aio)
nni_mtx_unlock(&mq->mq_lock);
}
-
void
nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio)
{
@@ -344,7 +333,6 @@ nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio)
nni_mtx_unlock(&mq->mq_lock);
}
-
void
nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio)
{
@@ -357,7 +345,6 @@ nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio)
nni_mtx_unlock(&mq->mq_lock);
}
-
void
nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
{
@@ -391,7 +378,6 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
nni_mtx_unlock(&mq->mq_lock);
}
-
void
nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
{
@@ -425,7 +411,6 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_mtx_unlock(&mq->mq_lock);
}
-
int
nni_msgq_canput(nni_msgq *mq)
{
@@ -442,7 +427,6 @@ nni_msgq_canput(nni_msgq *mq)
return (0);
}
-
int
nni_msgq_canget(nni_msgq *mq)
{
@@ -459,12 +443,11 @@ nni_msgq_canget(nni_msgq *mq)
return (0);
}
-
int
nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
{
nni_aio *raio;
- size_t len = nni_msg_len(msg);
+ size_t len = nni_msg_len(msg);
nni_mtx_lock(&mq->mq_lock);
if (mq->mq_closed) {
@@ -500,16 +483,15 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
return (NNG_EAGAIN);
}
-
// XXX: Move this to generic AIO timeout...
void
nni_msgq_run_timeout(void *arg)
{
nni_msgq *mq = arg;
- nni_time now;
- nni_time exp;
- nni_aio *aio;
- nni_aio *naio;
+ nni_time now;
+ nni_time exp;
+ nni_aio * aio;
+ nni_aio * naio;
now = nni_clock();
exp = NNI_TIME_NEVER;
@@ -550,12 +532,11 @@ nni_msgq_run_timeout(void *arg)
nni_mtx_unlock(&mq->mq_lock);
}
-
int
nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
{
nni_aio aio;
- int rv;
+ int rv;
if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
return (rv);
@@ -564,25 +545,24 @@ nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
nni_msgq_aio_get(mq, &aio);
nni_aio_wait(&aio);
if ((rv = nni_aio_result(&aio)) == 0) {
- *msgp = aio.a_msg;
+ *msgp = aio.a_msg;
aio.a_msg = NULL;
}
nni_aio_fini(&aio);
return (rv);
}
-
int
nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
{
nni_aio aio;
- int rv;
+ int rv;
if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
return (rv);
}
aio.a_expire = expire;
- aio.a_msg = msg;
+ aio.a_msg = msg;
nni_msgq_aio_put(mq, &aio);
nni_aio_wait(&aio);
rv = nni_aio_result(&aio);
@@ -590,14 +570,13 @@ nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
return (rv);
}
-
void
nni_msgq_drain(nni_msgq *mq, nni_time expire)
{
nni_aio *aio;
nni_mtx_lock(&mq->mq_lock);
- mq->mq_closed = 1;
+ mq->mq_closed = 1;
mq->mq_draining = 1;
while ((mq->mq_len > 0) || !nni_list_empty(&mq->mq_aio_putq)) {
if (nni_cv_until(&mq->mq_drained, expire) != 0) {
@@ -622,7 +601,6 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
nni_mtx_unlock(&mq->mq_lock);
}
-
void
nni_msgq_close(nni_msgq *mq)
{
@@ -660,7 +638,6 @@ nni_msgq_close(nni_msgq *mq)
nni_mtx_unlock(&mq->mq_lock);
}
-
int
nni_msgq_len(nni_msgq *mq)
{
@@ -672,7 +649,6 @@ nni_msgq_len(nni_msgq *mq)
return (rv);
}
-
int
nni_msgq_cap(nni_msgq *mq)
{
@@ -684,23 +660,22 @@ nni_msgq_cap(nni_msgq *mq)
return (rv);
}
-
int
nni_msgq_resize(nni_msgq *mq, int cap)
{
- int alloc;
- nni_msg *msg;
+ int alloc;
+ nni_msg * msg;
nni_msg **newq, **oldq;
- int oldget;
- int oldput;
- int oldcap;
- int oldlen;
- int oldalloc;
+ int oldget;
+ int oldput;
+ int oldcap;
+ int oldlen;
+ int oldalloc;
alloc = cap + 2;
if (alloc > mq->mq_alloc) {
- newq = nni_alloc(sizeof (nni_msg *) * alloc);
+ newq = nni_alloc(sizeof(nni_msg *) * alloc);
if (newq == NULL) {
return (NNG_ENOMEM);
}
@@ -726,17 +701,17 @@ nni_msgq_resize(nni_msgq *mq, int cap)
goto out;
}
- oldq = mq->mq_msgs;
- oldget = mq->mq_get;
- oldput = mq->mq_put;
- oldcap = mq->mq_cap;
+ oldq = mq->mq_msgs;
+ oldget = mq->mq_get;
+ oldput = mq->mq_put;
+ oldcap = mq->mq_cap;
oldalloc = mq->mq_alloc;
- oldlen = mq->mq_len;
+ oldlen = mq->mq_len;
mq->mq_msgs = newq;
mq->mq_len = mq->mq_get = mq->mq_put = 0;
- mq->mq_cap = cap;
- mq->mq_alloc = alloc;
+ mq->mq_cap = cap;
+ mq->mq_alloc = alloc;
while (oldlen) {
mq->mq_msgs[mq->mq_put++] = oldq[oldget++];
@@ -749,7 +724,7 @@ nni_msgq_resize(nni_msgq *mq, int cap)
mq->mq_len++;
oldlen--;
}
- nni_free(oldq, sizeof (nni_msg *) * oldalloc);
+ nni_free(oldq, sizeof(nni_msg *) * oldalloc);
out:
// Wake everyone up -- we changed everything.