aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c15
-rw-r--r--src/core/aio.h7
-rw-r--r--src/core/list.c9
-rw-r--r--src/core/list.h1
-rw-r--r--src/core/msgqueue.c168
-rw-r--r--src/core/msgqueue.h8
-rw-r--r--src/core/protocol.h5
-rw-r--r--src/core/socket.c4
-rw-r--r--src/core/taskq.c12
-rw-r--r--src/core/timer.c23
-rw-r--r--src/core/timer.h4
11 files changed, 229 insertions, 27 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index a3f4fb28..ffc5ac06 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -12,20 +12,29 @@
#define NNI_AIO_WAKE (1<<0)
-void
+int
nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
{
+ int rv;
+
if (cb == NULL) {
cb = (nni_cb) nni_aio_wake;
arg = aio;
}
memset(aio, 0, sizeof (*aio));
- nni_mtx_init(&aio->a_lk);
- nni_cv_init(&aio->a_cv, &aio->a_lk);
+ if ((rv = nni_mtx_init(&aio->a_lk)) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_cv_init(&aio->a_cv, &aio->a_lk)) != 0) {
+ nni_mtx_fini(&aio->a_lk);
+ return (rv);
+ }
aio->a_cb = cb;
aio->a_cbarg = arg;
aio->a_expire = NNI_TIME_NEVER;
nni_taskq_ent_init(&aio->a_tqe, cb, arg);
+
+ return (0);
}
diff --git a/src/core/aio.h b/src/core/aio.h
index f6096ca7..80a3ac02 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -37,12 +37,9 @@ struct nni_aio {
nni_iov a_iov[4];
int a_niov;
- // Sendmsg operation.
+ // Message operations.
nni_msg * a_msg;
- // Recvmsg operation.
- nni_msg ** a_msgp;
-
// TBD: Resolver operations.
// Provider-use fields.
@@ -54,7 +51,7 @@ struct nni_aio {
// the supplied argument when the operation is complete. If NULL is
// supplied for the callback, then nni_aio_wake is used in its place,
// and the aio is used for the argument.
-extern void nni_aio_init(nni_aio *, nni_cb, void *);
+extern int nni_aio_init(nni_aio *, nni_cb, void *);
// nni_aio_fini finalizes the aio, releasing resources (locks)
// associated with it. The caller is responsible for ensuring that any
diff --git a/src/core/list.c b/src/core/list.c
index 821e48bd..1a5a4eb1 100644
--- a/src/core/list.c
+++ b/src/core/list.c
@@ -149,3 +149,12 @@ nni_list_remove(nni_list *list, void *item)
node->ln_next = NULL;
node->ln_prev = NULL;
}
+
+
+int
+nni_list_active(nni_list *list, void *item)
+{
+ nni_list_node *node = NODE(list, item);
+
+ return (node->ln_next == NULL ? 0 : 1);
+}
diff --git a/src/core/list.h b/src/core/list.h
index dd26377e..9f21e4d6 100644
--- a/src/core/list.h
+++ b/src/core/list.h
@@ -41,6 +41,7 @@ extern void nni_list_insert_after(nni_list *, void *, void *);
extern void *nni_list_next(const nni_list *, void *);
extern void *nni_list_prev(const nni_list *, void *);
extern void nni_list_remove(nni_list *, void *);
+extern int nni_list_active(nni_list *, void *);
#define NNI_LIST_FOREACH(l, it) \
for (it = nni_list_first(l); it != NULL; it = nni_list_next(l, it))
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 8b9a7e1a..2673063d 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -39,6 +39,9 @@ struct nni_msgq {
nni_list mq_aio_putq;
nni_list mq_aio_getq;
+
+ nni_timer_node mq_timer;
+ nni_time mq_expire;
};
@@ -89,6 +92,8 @@ nni_msgq_kick(nni_msgq *mq, int sig)
}
+static void nni_msgq_run_timeout(void *);
+
int
nni_msgq_init(nni_msgq **mqp, int cap)
{
@@ -127,6 +132,8 @@ nni_msgq_init(nni_msgq **mqp, int cap)
goto fail;
}
+ nni_timer_init(&mq->mq_timer, nni_msgq_run_timeout, mq);
+
mq->mq_cap = cap;
mq->mq_alloc = alloc;
mq->mq_len = 0;
@@ -139,6 +146,7 @@ nni_msgq_init(nni_msgq **mqp, int cap)
mq->mq_rwait = 0;
mq->mq_notify_fn = NULL;
mq->mq_notify_arg = NULL;
+ mq->mq_expire = NNI_TIME_NEVER;
*mqp = mq;
return (0);
@@ -287,7 +295,7 @@ nni_msgq_run_putq(nni_msgq *mq)
nni_list_remove(&mq->mq_aio_getq, raio);
nni_list_remove(&mq->mq_aio_putq, waio);
- *raio->a_msgp = msg;
+ raio->a_msg = msg;
waio->a_msg = NULL;
nni_aio_finish(raio, 0, len);
@@ -296,7 +304,8 @@ nni_msgq_run_putq(nni_msgq *mq)
}
// Otherwise if we have room in the buffer, just queue it.
- if (mq->mq_len < mq->mq_cap) {
+ if ((mq->mq_len < mq->mq_cap) ||
+ ((mq->mq_len == mq->mq_cap) && mq->mq_rwait)) {
nni_list_remove(&mq->mq_aio_putq, waio);
mq->mq_msgs[mq->mq_put++] = msg;
if (mq->mq_put == mq->mq_alloc) {
@@ -311,6 +320,14 @@ nni_msgq_run_putq(nni_msgq *mq)
// Unable to make progress, leave the aio where it is.
break;
}
+
+ // XXX: REMOVE ME WHEN WE GO COMPLETELY ASYNC.
+ if (mq->mq_len != 0) {
+ nni_cv_wake(&mq->mq_readable);
+ }
+ if (mq->mq_len < mq->mq_cap) {
+ nni_cv_wake(&mq->mq_writeable);
+ }
}
@@ -325,13 +342,14 @@ nni_msgq_run_getq(nni_msgq *mq)
while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
// If anything is waiting in the queue, get it first.
if (mq->mq_len != 0) {
+ nni_list_remove(&mq->mq_aio_getq, raio);
msg = mq->mq_msgs[mq->mq_get++];
if (mq->mq_get > mq->mq_cap) {
mq->mq_get = 0;
}
mq->mq_len--;
len = nni_msg_len(msg);
- *(raio->a_msgp) = msg;
+ raio->a_msg = msg;
nni_aio_finish(raio, 0, len);
continue;
}
@@ -344,7 +362,7 @@ nni_msgq_run_getq(nni_msgq *mq)
msg = waio->a_msg;
len = nni_msg_len(msg);
waio->a_msg = NULL;
- *raio->a_msgp = msg;
+ raio->a_msg = msg;
nni_aio_finish(raio, 0, len);
nni_aio_finish(waio, 0, len);
continue;
@@ -363,39 +381,93 @@ nni_msgq_run_notify(nni_msgq *mq)
}
-int
+void
nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);
nni_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
+ if (aio->a_expire < mq->mq_expire) {
+ mq->mq_expire = aio->a_expire;
+ nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
+ }
nni_mtx_unlock(&mq->mq_lock);
- return (0);
}
-int
+void
nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);
nni_list_append(&mq->mq_aio_getq, aio);
nni_msgq_run_getq(mq);
nni_msgq_run_notify(mq);
+ if (aio->a_expire < mq->mq_expire) {
+ mq->mq_expire = aio->a_expire;
+ nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
+void
+nni_msgq_aio_cancel(nni_msgq *mq, nni_aio *aio)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ // NB: nni_list_active and nni_list_remove only use the list structure
+ // to determine list node offsets. Otherwise, they only look at the
+ // node's linkage structure. Therefore the following check will remove
+ // the node from either the getq or the putq list.
+ if (nni_list_active(&mq->mq_aio_getq, aio)) {
+ nni_list_remove(&mq->mq_aio_getq, aio);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
+int
+nni_msgq_canput(nni_msgq *mq)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ if ((mq->mq_len < mq->mq_cap) ||
+ (mq->mq_rwait != 0) || // XXX: REMOVE ME
+ (nni_list_first(&mq->mq_aio_getq) != NULL)) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (1);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+ return (0);
+}
+
+
+int
+nni_msgq_canget(nni_msgq *mq)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ if ((mq->mq_len != 0) ||
+ (mq->mq_wwait != 0) ||
+ (nni_list_first(&mq->mq_aio_putq) != NULL)) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (1);
+ }
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
void
-nni_msgq_run_timeout(nni_msgq *mq)
+nni_msgq_run_timeout(void *arg)
{
+ nni_msgq *mq = arg;
nni_time now;
+ nni_time exp;
nni_aio *aio;
nni_aio *naio;
int rv;
now = nni_clock();
+ exp = NNI_TIME_NEVER;
nni_mtx_lock(&mq->mq_lock);
naio = nni_list_first(&mq->mq_aio_getq);
@@ -405,6 +477,9 @@ nni_msgq_run_timeout(nni_msgq *mq)
nni_list_remove(&mq->mq_aio_getq, aio);
nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
}
+ if (exp > aio->a_expire) {
+ exp = aio->a_expire;
+ }
}
naio = nni_list_first(&mq->mq_aio_putq);
@@ -414,8 +489,15 @@ nni_msgq_run_timeout(nni_msgq *mq)
nni_list_remove(&mq->mq_aio_putq, aio);
nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
}
+ if (exp > aio->a_expire) {
+ exp = aio->a_expire;
+ }
}
+ mq->mq_expire = exp;
+ if (mq->mq_expire != NNI_TIME_NEVER) {
+ nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
+ }
nni_mtx_unlock(&mq->mq_lock);
}
@@ -495,6 +577,7 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
}
nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
+ nni_msgq_run_getq(mq);
nni_mtx_unlock(&mq->mq_lock);
return (0);
@@ -565,6 +648,7 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
}
nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
+ nni_msgq_run_putq(mq);
nni_mtx_unlock(&mq->mq_lock);
return (0);
@@ -572,6 +656,35 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
int
+nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
+{
+ nni_aio aio;
+ int rv;
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
+ aio.a_expire = expire;
+ nni_msgq_aio_get(mq, &aio);
+ nni_aio_wait(&aio);
+ if ((rv = nni_aio_result(&aio)) == 0) {
+ *msgp = aio.a_msg;
+ aio.a_msg = NULL;
+ }
+ nni_aio_fini(&aio);
+ return (rv);
+}
+
+
+int
+nni_msgq_get(nni_msgq *mq, nni_msg **msgp)
+{
+ return (nni_msgq_get_until(mq, msgp, NNI_TIME_NEVER));
+}
+
+
+#if 0
+int
nni_msgq_get(nni_msgq *mq, nni_msg **msgp)
{
nni_signal nosig = 0;
@@ -580,6 +693,9 @@ nni_msgq_get(nni_msgq *mq, nni_msg **msgp)
}
+#endif
+
+
int
nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal)
{
@@ -587,6 +703,7 @@ nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal)
}
+#if 0
int
nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
{
@@ -596,6 +713,34 @@ nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
}
+#endif
+
+int
+nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
+{
+ nni_aio aio;
+ int rv;
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
+ aio.a_expire = expire;
+ aio.a_msg = msg;
+ nni_msgq_aio_put(mq, &aio);
+ nni_aio_wait(&aio);
+ nni_aio_fini(&aio);
+ return (rv);
+}
+
+
+int
+nni_msgq_put(nni_msgq *mq, nni_msg *msg)
+{
+ return (nni_msgq_put_until(mq, msg, NNI_TIME_NEVER));
+}
+
+
+#if 0
int
nni_msgq_put(nni_msgq *mq, nni_msg *msg)
{
@@ -605,6 +750,9 @@ nni_msgq_put(nni_msgq *mq, nni_msg *msg)
}
+#endif
+
+
int
nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
{
@@ -621,6 +769,7 @@ nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal)
}
+#if 0
int
nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
{
@@ -630,6 +779,9 @@ nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
}
+#endif
+
+
void
nni_msgq_drain(nni_msgq *mq, nni_time expire)
{
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index a0af12be..de23ebac 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -35,11 +35,13 @@ extern int nni_msgq_init(nni_msgq **, int);
// messages that may be in the queue.
extern void nni_msgq_fini(nni_msgq *);
-extern int nni_msgq_aio_put(nni_msgq *, nni_aio *);
-extern int nni_msgq_aio_get(nni_msgq *, nni_aio *);
+extern int nni_msgq_canget(nni_msgq *);
+extern int nni_msgq_canput(nni_msgq *);
+extern void nni_msgq_aio_put(nni_msgq *, nni_aio *);
+extern void nni_msgq_aio_get(nni_msgq *, nni_aio *);
extern int nni_msgq_aio_notify_get(nni_msgq *, nni_aio *);
extern int nni_msgq_aio_notify_put(nni_msgq *, nni_aio *);
-extern int nni_msgq_aio_cancel(nni_msgq *, nni_aio *);
+extern void nni_msgq_aio_cancel(nni_msgq *, nni_aio *);
// nni_msgq_put puts the message to the queue. It blocks until it
// was able to do so, or the queue is closed, returning either 0 on
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 9869afc9..877eec31 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -62,6 +62,11 @@ struct nni_proto_sock_ops {
// block as needed.
void (*sock_fini)(void *);
+ // Open the protocol instance. This is run with the lock held,
+ // and intended to allow the protocol to start any asynchronous
+ // processing.
+ void (*sock_open)(void *);
+
// Close the protocol instance. This is run with the lock held,
// and intended to initiate closure of the socket. For example,
// it can signal the socket worker threads to exit.
diff --git a/src/core/socket.c b/src/core/socket.c
index 52a4d6d9..d58e64ba 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -325,6 +325,9 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
if (sops->sock_close == NULL) {
sops->sock_close = nni_sock_nullop;
}
+ if (sops->sock_open == NULL) {
+ sops->sock_open = nni_sock_nullop;
+ }
sock->s_pipe_ops = *proto->proto_pipe_ops;
pops = &sock->s_pipe_ops;
if (pops->pipe_add == NULL) {
@@ -404,6 +407,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
nni_thr_run(&sock->s_worker_thr[i]);
}
+ sops->sock_open(sock->s_data);
nni_thr_run(&sock->s_reaper);
nni_thr_run(&sock->s_notifier);
diff --git a/src/core/taskq.c b/src/core/taskq.c
index 2b2b0d1a..d473b65c 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -33,6 +33,7 @@ nni_taskq_thread(void *self)
ent->tqe_tq = NULL;
nni_mtx_unlock(&tq->tq_mtx);
ent->tqe_cb(ent->tqe_arg);
+ nni_mtx_lock(&tq->tq_mtx);
continue;
}
@@ -56,8 +57,15 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
if ((tq = NNI_ALLOC_STRUCT(tq)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&tq->tq_mtx);
- nni_cv_init(&tq->tq_cv, &tq->tq_mtx);
+ if ((rv = nni_mtx_init(&tq->tq_mtx)) != 0) {
+ NNI_FREE_STRUCT(tq);
+ return (rv);
+ }
+ if ((rv = nni_cv_init(&tq->tq_cv, &tq->tq_mtx)) != 0) {
+ nni_mtx_fini(&tq->tq_mtx);
+ NNI_FREE_STRUCT(tq);
+ return (rv);
+ }
tq->tq_close = 0;
NNI_LIST_INIT(&tq->tq_ents, nni_taskq_ent, tqe_node);
diff --git a/src/core/timer.c b/src/core/timer.c
index 59966822..778a82b7 100644
--- a/src/core/timer.c
+++ b/src/core/timer.c
@@ -12,10 +12,6 @@
#include <stdlib.h>
#include <string.h>
-extern void nni_timer_schedule(nni_timer_node *);
-extern void nni_timer_cancel(nni_timer_node *);
-extern int nni_timer_init(void);
-extern void nni_timer_fini(void);
static void nni_timer_loop(void *);
struct nni_timer {
@@ -76,6 +72,21 @@ nni_timer_sys_fini(void)
void
+nni_timer_init(nni_timer_node *node, nni_cb cb, void *arg)
+{
+ node->t_cb = cb;
+ node->t_arg = arg;
+}
+
+
+void
+nni_timer_fini(nni_timer_node *node)
+{
+ NNI_ARG_UNUSED(node);
+}
+
+
+void
nni_timer_cancel(nni_timer_node *node)
{
nni_timer *timer = &nni_global_timer;
@@ -92,12 +103,14 @@ nni_timer_cancel(nni_timer_node *node)
void
-nni_timer_schedule(nni_timer_node *node)
+nni_timer_schedule(nni_timer_node *node, nni_time when)
{
nni_timer *timer = &nni_global_timer;
nni_timer_node *srch;
int wake = 1;
+ node->t_expire = when;
+
nni_mtx_lock(&timer->t_list_mx);
srch = nni_list_first(&timer->t_entries);
diff --git a/src/core/timer.h b/src/core/timer.h
index a5bd4633..89a5cda0 100644
--- a/src/core/timer.h
+++ b/src/core/timer.h
@@ -25,7 +25,9 @@ struct nni_timer_node {
typedef struct nni_timer_node nni_timer_node;
-extern void nni_timer_schedule(nni_timer_node *);
+extern void nni_timer_init(nni_timer_node *, nni_cb, void *);
+extern void nni_timer_fini(nni_timer_node *);
+extern void nni_timer_schedule(nni_timer_node *, nni_time);
extern void nni_timer_cancel(nni_timer_node *);
extern int nni_timer_sys_init(void);
extern void nni_timer_sys_fini(void);