aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-03-04 02:46:40 -0800
committerGarrett D'Amore <garrett@damore.org>2017-03-04 02:46:40 -0800
commitfb6550a242bb1742ec62202a99d0604ee9069795 (patch)
treebd235a8ddf6766dc54c3e47b31dbc7a59802d5da /src
parentc17a1dd3f5333da59355ecc3f8788a0396a8f72d (diff)
downloadnng-fb6550a242bb1742ec62202a99d0604ee9069795.tar.gz
nng-fb6550a242bb1742ec62202a99d0604ee9069795.tar.bz2
nng-fb6550a242bb1742ec62202a99d0604ee9069795.zip
Pipeline protocol now entirely callback driven.
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c15
-rw-r--r--src/core/aio.h7
-rw-r--r--src/core/list.c9
-rw-r--r--src/core/list.h1
-rw-r--r--src/core/msgqueue.c168
-rw-r--r--src/core/msgqueue.h8
-rw-r--r--src/core/protocol.h5
-rw-r--r--src/core/socket.c4
-rw-r--r--src/core/taskq.c12
-rw-r--r--src/core/timer.c23
-rw-r--r--src/core/timer.h4
-rw-r--r--src/protocol/pipeline/pull.c107
-rw-r--r--src/protocol/pipeline/push.c197
-rw-r--r--src/transport/inproc/inproc.c34
14 files changed, 472 insertions, 122 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index a3f4fb28..ffc5ac06 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -12,20 +12,29 @@
#define NNI_AIO_WAKE (1<<0)
-void
+int
nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
{
+ int rv;
+
if (cb == NULL) {
cb = (nni_cb) nni_aio_wake;
arg = aio;
}
memset(aio, 0, sizeof (*aio));
- nni_mtx_init(&aio->a_lk);
- nni_cv_init(&aio->a_cv, &aio->a_lk);
+ if ((rv = nni_mtx_init(&aio->a_lk)) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_cv_init(&aio->a_cv, &aio->a_lk)) != 0) {
+ nni_mtx_fini(&aio->a_lk);
+ return (rv);
+ }
aio->a_cb = cb;
aio->a_cbarg = arg;
aio->a_expire = NNI_TIME_NEVER;
nni_taskq_ent_init(&aio->a_tqe, cb, arg);
+
+ return (0);
}
diff --git a/src/core/aio.h b/src/core/aio.h
index f6096ca7..80a3ac02 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -37,12 +37,9 @@ struct nni_aio {
nni_iov a_iov[4];
int a_niov;
- // Sendmsg operation.
+ // Message operations.
nni_msg * a_msg;
- // Recvmsg operation.
- nni_msg ** a_msgp;
-
// TBD: Resolver operations.
// Provider-use fields.
@@ -54,7 +51,7 @@ struct nni_aio {
// the supplied argument when the operation is complete. If NULL is
// supplied for the callback, then nni_aio_wake is used in its place,
// and the aio is used for the argument.
-extern void nni_aio_init(nni_aio *, nni_cb, void *);
+extern int nni_aio_init(nni_aio *, nni_cb, void *);
// nni_aio_fini finalizes the aio, releasing resources (locks)
// associated with it. The caller is responsible for ensuring that any
diff --git a/src/core/list.c b/src/core/list.c
index 821e48bd..1a5a4eb1 100644
--- a/src/core/list.c
+++ b/src/core/list.c
@@ -149,3 +149,12 @@ nni_list_remove(nni_list *list, void *item)
node->ln_next = NULL;
node->ln_prev = NULL;
}
+
+
+int
+nni_list_active(nni_list *list, void *item)
+{
+ nni_list_node *node = NODE(list, item);
+
+ return (node->ln_next == NULL ? 0 : 1);
+}
diff --git a/src/core/list.h b/src/core/list.h
index dd26377e..9f21e4d6 100644
--- a/src/core/list.h
+++ b/src/core/list.h
@@ -41,6 +41,7 @@ extern void nni_list_insert_after(nni_list *, void *, void *);
extern void *nni_list_next(const nni_list *, void *);
extern void *nni_list_prev(const nni_list *, void *);
extern void nni_list_remove(nni_list *, void *);
+extern int nni_list_active(nni_list *, void *);
#define NNI_LIST_FOREACH(l, it) \
for (it = nni_list_first(l); it != NULL; it = nni_list_next(l, it))
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 8b9a7e1a..2673063d 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -39,6 +39,9 @@ struct nni_msgq {
nni_list mq_aio_putq;
nni_list mq_aio_getq;
+
+ nni_timer_node mq_timer;
+ nni_time mq_expire;
};
@@ -89,6 +92,8 @@ nni_msgq_kick(nni_msgq *mq, int sig)
}
+static void nni_msgq_run_timeout(void *);
+
int
nni_msgq_init(nni_msgq **mqp, int cap)
{
@@ -127,6 +132,8 @@ nni_msgq_init(nni_msgq **mqp, int cap)
goto fail;
}
+ nni_timer_init(&mq->mq_timer, nni_msgq_run_timeout, mq);
+
mq->mq_cap = cap;
mq->mq_alloc = alloc;
mq->mq_len = 0;
@@ -139,6 +146,7 @@ nni_msgq_init(nni_msgq **mqp, int cap)
mq->mq_rwait = 0;
mq->mq_notify_fn = NULL;
mq->mq_notify_arg = NULL;
+ mq->mq_expire = NNI_TIME_NEVER;
*mqp = mq;
return (0);
@@ -287,7 +295,7 @@ nni_msgq_run_putq(nni_msgq *mq)
nni_list_remove(&mq->mq_aio_getq, raio);
nni_list_remove(&mq->mq_aio_putq, waio);
- *raio->a_msgp = msg;
+ raio->a_msg = msg;
waio->a_msg = NULL;
nni_aio_finish(raio, 0, len);
@@ -296,7 +304,8 @@ nni_msgq_run_putq(nni_msgq *mq)
}
// Otherwise if we have room in the buffer, just queue it.
- if (mq->mq_len < mq->mq_cap) {
+ if ((mq->mq_len < mq->mq_cap) ||
+ ((mq->mq_len == mq->mq_cap) && mq->mq_rwait)) {
nni_list_remove(&mq->mq_aio_putq, waio);
mq->mq_msgs[mq->mq_put++] = msg;
if (mq->mq_put == mq->mq_alloc) {
@@ -311,6 +320,14 @@ nni_msgq_run_putq(nni_msgq *mq)
// Unable to make progress, leave the aio where it is.
break;
}
+
+ // XXX: REMOVE ME WHEN WE GO COMPLETELY ASYNC.
+ if (mq->mq_len != 0) {
+ nni_cv_wake(&mq->mq_readable);
+ }
+ if (mq->mq_len < mq->mq_cap) {
+ nni_cv_wake(&mq->mq_writeable);
+ }
}
@@ -325,13 +342,14 @@ nni_msgq_run_getq(nni_msgq *mq)
while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
// If anything is waiting in the queue, get it first.
if (mq->mq_len != 0) {
+ nni_list_remove(&mq->mq_aio_getq, raio);
msg = mq->mq_msgs[mq->mq_get++];
if (mq->mq_get > mq->mq_cap) {
mq->mq_get = 0;
}
mq->mq_len--;
len = nni_msg_len(msg);
- *(raio->a_msgp) = msg;
+ raio->a_msg = msg;
nni_aio_finish(raio, 0, len);
continue;
}
@@ -344,7 +362,7 @@ nni_msgq_run_getq(nni_msgq *mq)
msg = waio->a_msg;
len = nni_msg_len(msg);
waio->a_msg = NULL;
- *raio->a_msgp = msg;
+ raio->a_msg = msg;
nni_aio_finish(raio, 0, len);
nni_aio_finish(waio, 0, len);
continue;
@@ -363,39 +381,93 @@ nni_msgq_run_notify(nni_msgq *mq)
}
-int
+void
nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);
nni_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
+ if (aio->a_expire < mq->mq_expire) {
+ mq->mq_expire = aio->a_expire;
+ nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
+ }
nni_mtx_unlock(&mq->mq_lock);
- return (0);
}
-int
+void
nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);
nni_list_append(&mq->mq_aio_getq, aio);
nni_msgq_run_getq(mq);
nni_msgq_run_notify(mq);
+ if (aio->a_expire < mq->mq_expire) {
+ mq->mq_expire = aio->a_expire;
+ nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
+void
+nni_msgq_aio_cancel(nni_msgq *mq, nni_aio *aio)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ // NB: nni_list_active and nni_list_remove only use the list structure
+ // to determine list node offsets. Otherwise, they only look at the
+ // node's linkage structure. Therefore the following check will remove
+ // the node from either the getq or the putq list.
+ if (nni_list_active(&mq->mq_aio_getq, aio)) {
+ nni_list_remove(&mq->mq_aio_getq, aio);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
+int
+nni_msgq_canput(nni_msgq *mq)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ if ((mq->mq_len < mq->mq_cap) ||
+ (mq->mq_rwait != 0) || // XXX: REMOVE ME
+ (nni_list_first(&mq->mq_aio_getq) != NULL)) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (1);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+ return (0);
+}
+
+
+int
+nni_msgq_canget(nni_msgq *mq)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ if ((mq->mq_len != 0) ||
+ (mq->mq_wwait != 0) ||
+ (nni_list_first(&mq->mq_aio_putq) != NULL)) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (1);
+ }
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
void
-nni_msgq_run_timeout(nni_msgq *mq)
+nni_msgq_run_timeout(void *arg)
{
+ nni_msgq *mq = arg;
nni_time now;
+ nni_time exp;
nni_aio *aio;
nni_aio *naio;
int rv;
now = nni_clock();
+ exp = NNI_TIME_NEVER;
nni_mtx_lock(&mq->mq_lock);
naio = nni_list_first(&mq->mq_aio_getq);
@@ -405,6 +477,9 @@ nni_msgq_run_timeout(nni_msgq *mq)
nni_list_remove(&mq->mq_aio_getq, aio);
nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
}
+ if (exp > aio->a_expire) {
+ exp = aio->a_expire;
+ }
}
naio = nni_list_first(&mq->mq_aio_putq);
@@ -414,8 +489,15 @@ nni_msgq_run_timeout(nni_msgq *mq)
nni_list_remove(&mq->mq_aio_putq, aio);
nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
}
+ if (exp > aio->a_expire) {
+ exp = aio->a_expire;
+ }
}
+ mq->mq_expire = exp;
+ if (mq->mq_expire != NNI_TIME_NEVER) {
+ nni_timer_schedule(&mq->mq_timer, mq->mq_expire);
+ }
nni_mtx_unlock(&mq->mq_lock);
}
@@ -495,6 +577,7 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
}
nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
+ nni_msgq_run_getq(mq);
nni_mtx_unlock(&mq->mq_lock);
return (0);
@@ -565,6 +648,7 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
}
nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
+ nni_msgq_run_putq(mq);
nni_mtx_unlock(&mq->mq_lock);
return (0);
@@ -572,6 +656,35 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
int
+nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
+{
+ nni_aio aio;
+ int rv;
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
+ aio.a_expire = expire;
+ nni_msgq_aio_get(mq, &aio);
+ nni_aio_wait(&aio);
+ if ((rv = nni_aio_result(&aio)) == 0) {
+ *msgp = aio.a_msg;
+ aio.a_msg = NULL;
+ }
+ nni_aio_fini(&aio);
+ return (rv);
+}
+
+
+int
+nni_msgq_get(nni_msgq *mq, nni_msg **msgp)
+{
+ return (nni_msgq_get_until(mq, msgp, NNI_TIME_NEVER));
+}
+
+
+#if 0
+int
nni_msgq_get(nni_msgq *mq, nni_msg **msgp)
{
nni_signal nosig = 0;
@@ -580,6 +693,9 @@ nni_msgq_get(nni_msgq *mq, nni_msg **msgp)
}
+#endif
+
+
int
nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal)
{
@@ -587,6 +703,7 @@ nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal)
}
+#if 0
int
nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
{
@@ -596,6 +713,34 @@ nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
}
+#endif
+
+int
+nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
+{
+ nni_aio aio;
+ int rv;
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
+ aio.a_expire = expire;
+ aio.a_msg = msg;
+ nni_msgq_aio_put(mq, &aio);
+ nni_aio_wait(&aio);
+ nni_aio_fini(&aio);
+ return (rv);
+}
+
+
+int
+nni_msgq_put(nni_msgq *mq, nni_msg *msg)
+{
+ return (nni_msgq_put_until(mq, msg, NNI_TIME_NEVER));
+}
+
+
+#if 0
int
nni_msgq_put(nni_msgq *mq, nni_msg *msg)
{
@@ -605,6 +750,9 @@ nni_msgq_put(nni_msgq *mq, nni_msg *msg)
}
+#endif
+
+
int
nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
{
@@ -621,6 +769,7 @@ nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal)
}
+#if 0
int
nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
{
@@ -630,6 +779,9 @@ nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
}
+#endif
+
+
void
nni_msgq_drain(nni_msgq *mq, nni_time expire)
{
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index a0af12be..de23ebac 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -35,11 +35,13 @@ extern int nni_msgq_init(nni_msgq **, int);
// messages that may be in the queue.
extern void nni_msgq_fini(nni_msgq *);
-extern int nni_msgq_aio_put(nni_msgq *, nni_aio *);
-extern int nni_msgq_aio_get(nni_msgq *, nni_aio *);
+extern int nni_msgq_canget(nni_msgq *);
+extern int nni_msgq_canput(nni_msgq *);
+extern void nni_msgq_aio_put(nni_msgq *, nni_aio *);
+extern void nni_msgq_aio_get(nni_msgq *, nni_aio *);
extern int nni_msgq_aio_notify_get(nni_msgq *, nni_aio *);
extern int nni_msgq_aio_notify_put(nni_msgq *, nni_aio *);
-extern int nni_msgq_aio_cancel(nni_msgq *, nni_aio *);
+extern void nni_msgq_aio_cancel(nni_msgq *, nni_aio *);
// nni_msgq_put puts the message to the queue. It blocks until it
// was able to do so, or the queue is closed, returning either 0 on
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 9869afc9..877eec31 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -62,6 +62,11 @@ struct nni_proto_sock_ops {
// block as needed.
void (*sock_fini)(void *);
+ // Open the protocol instance. This is run with the lock held,
+ // and intended to allow the protocol to start any asynchronous
+ // processing.
+ void (*sock_open)(void *);
+
// Close the protocol instance. This is run with the lock held,
// and intended to initiate closure of the socket. For example,
// it can signal the socket worker threads to exit.
diff --git a/src/core/socket.c b/src/core/socket.c
index 52a4d6d9..d58e64ba 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -325,6 +325,9 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
if (sops->sock_close == NULL) {
sops->sock_close = nni_sock_nullop;
}
+ if (sops->sock_open == NULL) {
+ sops->sock_open = nni_sock_nullop;
+ }
sock->s_pipe_ops = *proto->proto_pipe_ops;
pops = &sock->s_pipe_ops;
if (pops->pipe_add == NULL) {
@@ -404,6 +407,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
nni_thr_run(&sock->s_worker_thr[i]);
}
+ sops->sock_open(sock->s_data);
nni_thr_run(&sock->s_reaper);
nni_thr_run(&sock->s_notifier);
diff --git a/src/core/taskq.c b/src/core/taskq.c
index 2b2b0d1a..d473b65c 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -33,6 +33,7 @@ nni_taskq_thread(void *self)
ent->tqe_tq = NULL;
nni_mtx_unlock(&tq->tq_mtx);
ent->tqe_cb(ent->tqe_arg);
+ nni_mtx_lock(&tq->tq_mtx);
continue;
}
@@ -56,8 +57,15 @@ nni_taskq_init(nni_taskq **tqp, int nthr)
if ((tq = NNI_ALLOC_STRUCT(tq)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&tq->tq_mtx);
- nni_cv_init(&tq->tq_cv, &tq->tq_mtx);
+ if ((rv = nni_mtx_init(&tq->tq_mtx)) != 0) {
+ NNI_FREE_STRUCT(tq);
+ return (rv);
+ }
+ if ((rv = nni_cv_init(&tq->tq_cv, &tq->tq_mtx)) != 0) {
+ nni_mtx_fini(&tq->tq_mtx);
+ NNI_FREE_STRUCT(tq);
+ return (rv);
+ }
tq->tq_close = 0;
NNI_LIST_INIT(&tq->tq_ents, nni_taskq_ent, tqe_node);
diff --git a/src/core/timer.c b/src/core/timer.c
index 59966822..778a82b7 100644
--- a/src/core/timer.c
+++ b/src/core/timer.c
@@ -12,10 +12,6 @@
#include <stdlib.h>
#include <string.h>
-extern void nni_timer_schedule(nni_timer_node *);
-extern void nni_timer_cancel(nni_timer_node *);
-extern int nni_timer_init(void);
-extern void nni_timer_fini(void);
static void nni_timer_loop(void *);
struct nni_timer {
@@ -76,6 +72,21 @@ nni_timer_sys_fini(void)
void
+nni_timer_init(nni_timer_node *node, nni_cb cb, void *arg)
+{
+ node->t_cb = cb;
+ node->t_arg = arg;
+}
+
+
+void
+nni_timer_fini(nni_timer_node *node)
+{
+ NNI_ARG_UNUSED(node);
+}
+
+
+void
nni_timer_cancel(nni_timer_node *node)
{
nni_timer *timer = &nni_global_timer;
@@ -92,12 +103,14 @@ nni_timer_cancel(nni_timer_node *node)
void
-nni_timer_schedule(nni_timer_node *node)
+nni_timer_schedule(nni_timer_node *node, nni_time when)
{
nni_timer *timer = &nni_global_timer;
nni_timer_node *srch;
int wake = 1;
+ node->t_expire = when;
+
nni_mtx_lock(&timer->t_list_mx);
srch = nni_list_first(&timer->t_entries);
diff --git a/src/core/timer.h b/src/core/timer.h
index a5bd4633..89a5cda0 100644
--- a/src/core/timer.h
+++ b/src/core/timer.h
@@ -25,7 +25,9 @@ struct nni_timer_node {
typedef struct nni_timer_node nni_timer_node;
-extern void nni_timer_schedule(nni_timer_node *);
+extern void nni_timer_init(nni_timer_node *, nni_cb, void *);
+extern void nni_timer_fini(nni_timer_node *);
+extern void nni_timer_schedule(nni_timer_node *, nni_time);
extern void nni_timer_cancel(nni_timer_node *);
extern int nni_timer_sys_init(void);
extern void nni_timer_sys_fini(void);
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index 6f2d716b..ec66fab6 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -17,6 +17,11 @@
typedef struct nni_pull_pipe nni_pull_pipe;
typedef struct nni_pull_sock nni_pull_sock;
+static void nni_pull_putq_cb(void *);
+static void nni_pull_recv_cb(void *);
+static void nni_pull_recv(nni_pull_pipe *);
+static void nni_pull_putq(nni_pull_pipe *, nni_msg *);
+
// An nni_pull_sock is our per-socket protocol private structure.
struct nni_pull_sock {
nni_msgq * urq;
@@ -27,6 +32,8 @@ struct nni_pull_sock {
struct nni_pull_pipe {
nni_pipe * pipe;
nni_pull_sock * pull;
+ nni_aio putq_aio;
+ nni_aio recv_aio;
};
static int
@@ -60,10 +67,20 @@ static int
nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
{
nni_pull_pipe *pp;
+ int rv;
if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
return (NNG_ENOMEM);
}
+ if (((rv = nni_aio_init(&pp->putq_aio, nni_pull_putq_cb, pp))) != 0) {
+ NNI_FREE_STRUCT(pp);
+ return (rv);
+ }
+ if (((rv = nni_aio_init(&pp->recv_aio, nni_pull_recv_cb, pp))) != 0) {
+ nni_aio_fini(&pp->putq_aio);
+ NNI_FREE_STRUCT(pp);
+ return (rv);
+ }
pp->pipe = pipe;
pp->pull = psock;
*ppp = pp;
@@ -77,28 +94,93 @@ nni_pull_pipe_fini(void *arg)
nni_pull_pipe *pp = arg;
if (pp != NULL) {
+ nni_aio_fini(&pp->putq_aio);
+ nni_aio_fini(&pp->recv_aio);
NNI_FREE_STRUCT(pp);
}
}
+static int
+nni_pull_pipe_start(void *arg)
+{
+ nni_pull_pipe *pp = arg;
+
+ // Start the pending pull...
+ nni_pull_recv(pp);
+
+ return (0);
+}
+
+
static void
-nni_pull_pipe_recv(void *arg)
+nni_pull_pipe_stop(void *arg)
{
nni_pull_pipe *pp = arg;
- nni_pull_sock *pull = pp->pull;
+
+ // Cancel any pending sendup.
+ nni_msgq_aio_cancel(pp->pull->urq, &pp->putq_aio);
+}
+
+
+static void
+nni_pull_recv_cb(void *arg)
+{
+ nni_pull_pipe *pp = arg;
+ nni_aio *aio = &pp->recv_aio;
nni_msg *msg;
- for (;;) {
- if (nni_pipe_recv(pp->pipe, &msg) != 0) {
- break;
- }
- if (nni_msgq_put(pull->urq, msg) != 0) {
- nni_msg_free(msg);
- break;
- }
+ if (nni_aio_result(aio) != 0) {
+ // Failed to get a message, probably the pipe is closed.
+ nni_pipe_close(pp->pipe);
+ return;
+ }
+
+ // Got a message... start the put to send it up to the application.
+ msg = aio->a_msg;
+ aio->a_msg = NULL;
+ nni_pull_putq(pp, msg);
+}
+
+
+static void
+nni_pull_putq_cb(void *arg)
+{
+ nni_pull_pipe *pp = arg;
+ nni_aio *aio = &pp->putq_aio;
+ int rv;
+
+ if (nni_aio_result(aio) != 0) {
+ // If we failed to put, probably NNG_ECLOSED, nothing else
+ // we can do. Just close the pipe.
+ nni_pipe_close(pp->pipe);
+ return;
+ }
+
+ nni_pull_recv(pp);
+}
+
+
+// nni_pull_recv is called to schedule a pending recv on the incoming pipe.
+static void
+nni_pull_recv(nni_pull_pipe *pp)
+{
+ // Schedule the aio with callback.
+ if (nni_pipe_aio_recv(pp->pipe, &pp->recv_aio) != 0) {
+ nni_pipe_close(pp->pipe);
}
- nni_pipe_close(pp->pipe);
+}
+
+
+// nni_pull_putq schedules a put operation to the user socket (sendup).
+static void
+nni_pull_putq(nni_pull_pipe *pp, nni_msg *msg)
+{
+ nni_pull_sock *pull = pp->pull;
+
+ pp->putq_aio.a_msg = msg;
+
+ nni_msgq_aio_put(pull->urq, &pp->putq_aio);
}
@@ -141,7 +223,8 @@ nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
static nni_proto_pipe_ops nni_pull_pipe_ops = {
.pipe_init = nni_pull_pipe_init,
.pipe_fini = nni_pull_pipe_fini,
- .pipe_worker = { nni_pull_pipe_recv },
+ .pipe_add = nni_pull_pipe_start,
+ .pipe_rem = nni_pull_pipe_stop,
};
static nni_proto_sock_ops nni_pull_sock_ops = {
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index 3c3164d5..fcbb6d4f 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -19,27 +19,34 @@
typedef struct nni_push_pipe nni_push_pipe;
typedef struct nni_push_sock nni_push_sock;
+static void nni_push_send_cb(void *);
+static void nni_push_recv_cb(void *);
+static void nni_push_getq_cb(void *);
+static void nni_push_recv(nni_push_pipe *);
+static void nni_push_send(nni_push_sock *);
+
// An nni_push_sock is our per-socket protocol private structure.
struct nni_push_sock {
- nni_cv cv;
nni_msgq * uwq;
+ nni_msg * msg; // pending message
int raw;
- int closing;
- int wantw;
nni_list pipes;
nni_push_pipe * nextpipe;
int npipes;
nni_sock * sock;
+
+ nni_aio aio_getq;
};
// An nni_push_pipe is our per-pipe protocol private structure.
struct nni_push_pipe {
nni_pipe * pipe;
nni_push_sock * push;
- nni_msgq * mq;
- int sigclose;
int wantr;
nni_list_node node;
+
+ nni_aio aio_recv;
+ nni_aio aio_send;
};
static int
@@ -51,14 +58,13 @@ nni_push_sock_init(void **pushp, nni_sock *sock)
if ((push = NNI_ALLOC_STRUCT(push)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_cv_init(&push->cv, nni_sock_mtx(sock))) != 0) {
+ if ((rv = nni_aio_init(&push->aio_getq, nni_push_getq_cb, push)) != 0) {
NNI_FREE_STRUCT(push);
return (rv);
}
NNI_LIST_INIT(&push->pipes, nni_push_pipe, node);
push->raw = 0;
push->npipes = 0;
- push->wantw = 0;
push->nextpipe = NULL;
push->sock = sock;
push->uwq = nni_sock_sendq(sock);
@@ -69,14 +75,20 @@ nni_push_sock_init(void **pushp, nni_sock *sock)
static void
+nni_push_sock_open(void *arg)
+{
+ nni_push_sock *push = arg;
+
+ nni_msgq_aio_get(push->uwq, &push->aio_getq);
+}
+
+
+static void
nni_push_sock_close(void *arg)
{
nni_push_sock *push = arg;
- // Shut down the resender. We request it to exit by clearing
- // its old value, then kick it.
- push->closing = 1;
- nni_cv_wake(&push->cv);
+ nni_msgq_aio_cancel(push->uwq, &push->aio_getq);
}
@@ -86,7 +98,9 @@ nni_push_sock_fini(void *arg)
nni_push_sock *push = arg;
if (push != NULL) {
- nni_cv_fini(&push->cv);
+ if (push->msg != NULL) {
+ nni_msg_free(push->msg);
+ }
NNI_FREE_STRUCT(push);
}
}
@@ -101,13 +115,17 @@ nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_msgq_init(&pp->mq, 0)) != 0) {
+ if ((rv = nni_aio_init(&pp->aio_recv, nni_push_recv_cb, pp)) != 0) {
+ NNI_FREE_STRUCT(pp);
+ return (rv);
+ }
+ if ((rv = nni_aio_init(&pp->aio_send, nni_push_send_cb, pp)) != 0) {
+ nni_aio_fini(&pp->aio_recv);
NNI_FREE_STRUCT(pp);
return (rv);
}
NNI_LIST_NODE_INIT(&pp->node);
pp->pipe = pipe;
- pp->sigclose = 0;
pp->push = psock;
pp->wantr = 0;
*ppp = pp;
@@ -121,7 +139,8 @@ nni_push_pipe_fini(void *arg)
nni_push_pipe *pp = arg;
if (pp != NULL) {
- nni_msgq_fini(pp->mq);
+ nni_aio_fini(&pp->aio_recv);
+ nni_aio_fini(&pp->aio_send);
NNI_FREE_STRUCT(pp);
}
}
@@ -140,9 +159,19 @@ nni_push_pipe_add(void *arg)
// The end makes our test cases easier.
nni_list_append(&push->pipes, pp);
+ // We start out wanting data to read.
+ pp->wantr = 1;
+
// Wake the top sender, as we can accept a job.
push->npipes++;
- nni_cv_wake(&push->cv);
+
+ // Schedule a receiver. This is mostly so that we can detect
+ // a closed transport pipe.
+ nni_pipe_aio_recv(pp->pipe, &pp->aio_recv);
+
+ // Possibly schedule the sender.
+ nni_push_send(pp->push);
+
return (0);
}
@@ -162,46 +191,66 @@ nni_push_pipe_rem(void *arg)
static void
-nni_push_pipe_send(void *arg)
+nni_push_recv(nni_push_pipe *pp)
+{
+ nni_pipe_aio_recv(pp->pipe, &pp->aio_recv);
+}
+
+
+static void
+nni_push_recv_cb(void *arg)
+{
+ nni_push_pipe *pp = arg;
+
+ // We normally expect to receive an error. If a pipe actually
+ // sends us data, we just discard it.
+ if (nni_aio_result(&pp->aio_recv) != 0) {
+ nni_pipe_close(pp->pipe);
+ return;
+ }
+ nni_push_recv(pp);
+}
+
+
+static void
+nni_push_send_cb(void *arg)
{
nni_push_pipe *pp = arg;
nni_push_sock *push = pp->push;
nni_mtx *mx = nni_sock_mtx(push->sock);
- nni_msg *msg;
- for (;;) {
- nni_mtx_lock(mx);
- pp->wantr = 1;
- if (push->wantw) {
- nni_cv_wake(&push->cv);
- }
- nni_mtx_unlock(mx);
- if (nni_msgq_get_sig(pp->mq, &msg, &pp->sigclose) != 0) {
- break;
- }
- if (nni_pipe_send(pp->pipe, msg) != 0) {
- nni_msg_free(msg);
- break;
- }
+ if (nni_aio_result(&pp->aio_send) != 0) {
+ nni_pipe_close(pp->pipe);
+ return;
}
- nni_pipe_close(pp->pipe);
+
+ nni_mtx_lock(mx);
+ pp->wantr = 1;
+
+ // This effectively kicks off a pull down.
+ nni_push_send(pp->push);
+ nni_mtx_unlock(mx);
}
static void
-nni_push_pipe_recv(void *arg)
+nni_push_getq_cb(void *arg)
{
- nni_push_pipe *pp = arg;
- nni_msg *msg;
+ nni_push_sock *push = arg;
+ nni_mtx *mx = nni_sock_mtx(push->sock);
+ nni_aio *aio = &push->aio_getq;
- for (;;) {
- if (nni_pipe_recv(pp->pipe, &msg) != 0) {
- break;
- }
- nni_msg_free(msg);
+ if (nni_aio_result(aio) != 0) {
+ // If the socket is closing, nothing else we can do.
+ return;
}
- nni_msgq_signal(pp->mq, &pp->sigclose);
- nni_pipe_close(pp->pipe);
+
+ nni_mtx_lock(mx);
+ push->msg = aio->a_msg;
+ aio->a_msg = NULL;
+
+ nni_push_send(push);
+ nni_mtx_unlock(mx);
}
@@ -240,51 +289,35 @@ nni_push_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
static void
-nni_push_sock_send(void *arg)
+nni_push_send(nni_push_sock *push)
{
- nni_push_sock *push = arg;
nni_push_pipe *pp;
- nni_msgq *uwq = push->uwq;
nni_msg *msg = NULL;
- nni_mtx *mx = nni_sock_mtx(push->sock);
int i;
- for (;;) {
- if ((msg == NULL) && (nni_msgq_get(uwq, &msg) != 0)) {
- // Should only be NNG_ECLOSED
- return;
- }
+ if ((msg = push->msg) == NULL) {
+ // Nothing to send... bail...
+ return;
+ }
- nni_mtx_lock(mx);
- if (push->closing) {
- if (msg != NULL) {
- nni_mtx_unlock(mx);
- nni_msg_free(msg);
- return;
- }
- }
- push->wantw = 0;
- for (i = 0; i < push->npipes; i++) {
- pp = push->nextpipe;
- if (pp == NULL) {
- pp = nni_list_first(&push->pipes);
- }
- push->nextpipe = nni_list_next(&push->pipes, pp);
- if (pp->wantr) {
- pp->wantr = 0;
- if (nni_msgq_put(pp->mq, msg) == 0) {
- msg = NULL;
- break;
- }
- }
+ // Let's try to send it.
+ for (i = 0; i < push->npipes; i++) {
+ pp = push->nextpipe;
+ if (pp == NULL) {
+ pp = nni_list_first(&push->pipes);
}
- if (msg != NULL) {
- // We weren't able to deliver it, so keep it and
- // wait for a sender to let us know its ready.
- push->wantw = 1;
- nni_cv_wait(&push->cv);
+ push->nextpipe = nni_list_next(&push->pipes, pp);
+ if (pp->wantr) {
+ pp->aio_send.a_msg = msg;
+ push->msg = NULL;
+
+ // Schedule outbound pipe delivery...
+ nni_pipe_aio_send(pp->pipe, &pp->aio_send);
+
+ // And schedule getting another message for send.
+ nni_msgq_aio_get(push->uwq, &push->aio_getq);
+ break;
}
- nni_mtx_unlock(mx);
}
}
@@ -296,17 +329,15 @@ static nni_proto_pipe_ops nni_push_pipe_ops = {
.pipe_fini = nni_push_pipe_fini,
.pipe_add = nni_push_pipe_add,
.pipe_rem = nni_push_pipe_rem,
- .pipe_worker = { nni_push_pipe_send,
- nni_push_pipe_recv },
};
static nni_proto_sock_ops nni_push_sock_ops = {
.sock_init = nni_push_sock_init,
.sock_fini = nni_push_sock_fini,
+ .sock_open = nni_push_sock_open,
.sock_close = nni_push_sock_close,
.sock_setopt = nni_push_sock_setopt,
.sock_getopt = nni_push_sock_getopt,
- .sock_worker = { nni_push_sock_send },
};
nni_proto nni_push_proto = {
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 66f076ea..0cc208d4 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -129,6 +129,38 @@ nni_inproc_pipe_destroy(void *arg)
static int
+nni_inproc_pipe_aio_send(void *arg, nni_aio *aio)
+{
+ nni_inproc_pipe *pipe = arg;
+ nni_msg *msg = aio->a_msg;
+ char *h;
+ size_t l;
+ int rv;
+
+ // We need to move any header data to the body, because the other
+ // side won't know what to do otherwise.
+ h = nni_msg_header(msg);
+ l = nni_msg_header_len(msg);
+ if ((rv = nni_msg_prepend(msg, h, l)) != 0) {
+ return (rv);
+ }
+ nni_msg_trunc_header(msg, l);
+ nni_msgq_aio_put(pipe->wq, aio);
+ return (0);
+}
+
+
+static int
+nni_inproc_pipe_aio_recv(void *arg, nni_aio *aio)
+{
+ nni_inproc_pipe *pipe = arg;
+
+ nni_msgq_aio_get(pipe->rq, aio);
+ return (0);
+}
+
+
+static int
nni_inproc_pipe_send(void *arg, nni_msg *msg)
{
nni_inproc_pipe *pipe = arg;
@@ -403,6 +435,8 @@ static nni_tran_pipe nni_inproc_pipe_ops = {
.pipe_destroy = nni_inproc_pipe_destroy,
.pipe_send = nni_inproc_pipe_send,
.pipe_recv = nni_inproc_pipe_recv,
+ .pipe_aio_send = nni_inproc_pipe_aio_send,
+ .pipe_aio_recv = nni_inproc_pipe_aio_recv,
.pipe_close = nni_inproc_pipe_close,
.pipe_peer = nni_inproc_pipe_peer,
.pipe_getopt = nni_inproc_pipe_getopt,