diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-03-04 02:46:40 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-03-04 02:46:40 -0800 |
| commit | fb6550a242bb1742ec62202a99d0604ee9069795 (patch) | |
| tree | bd235a8ddf6766dc54c3e47b31dbc7a59802d5da | |
| parent | c17a1dd3f5333da59355ecc3f8788a0396a8f72d (diff) | |
| download | nng-fb6550a242bb1742ec62202a99d0604ee9069795.tar.gz nng-fb6550a242bb1742ec62202a99d0604ee9069795.tar.bz2 nng-fb6550a242bb1742ec62202a99d0604ee9069795.zip | |
Pipeline protocol now entirely callback driven.
| -rw-r--r-- | src/core/aio.c | 15 | ||||
| -rw-r--r-- | src/core/aio.h | 7 | ||||
| -rw-r--r-- | src/core/list.c | 9 | ||||
| -rw-r--r-- | src/core/list.h | 1 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 168 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 8 | ||||
| -rw-r--r-- | src/core/protocol.h | 5 | ||||
| -rw-r--r-- | src/core/socket.c | 4 | ||||
| -rw-r--r-- | src/core/taskq.c | 12 | ||||
| -rw-r--r-- | src/core/timer.c | 23 | ||||
| -rw-r--r-- | src/core/timer.h | 4 | ||||
| -rw-r--r-- | src/protocol/pipeline/pull.c | 107 | ||||
| -rw-r--r-- | src/protocol/pipeline/push.c | 197 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 34 |
14 files changed, 472 insertions, 122 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index a3f4fb28..ffc5ac06 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -12,20 +12,29 @@ #define NNI_AIO_WAKE (1<<0) -void +int nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) { + int rv; + if (cb == NULL) { cb = (nni_cb) nni_aio_wake; arg = aio; } memset(aio, 0, sizeof (*aio)); - nni_mtx_init(&aio->a_lk); - nni_cv_init(&aio->a_cv, &aio->a_lk); + if ((rv = nni_mtx_init(&aio->a_lk)) != 0) { + return (rv); + } + if ((rv = nni_cv_init(&aio->a_cv, &aio->a_lk)) != 0) { + nni_mtx_fini(&aio->a_lk); + return (rv); + } aio->a_cb = cb; aio->a_cbarg = arg; aio->a_expire = NNI_TIME_NEVER; nni_taskq_ent_init(&aio->a_tqe, cb, arg); + + return (0); } diff --git a/src/core/aio.h b/src/core/aio.h index f6096ca7..80a3ac02 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -37,12 +37,9 @@ struct nni_aio { nni_iov a_iov[4]; int a_niov; - // Sendmsg operation. + // Message operations. nni_msg * a_msg; - // Recvmsg operation. - nni_msg ** a_msgp; - // TBD: Resolver operations. // Provider-use fields. @@ -54,7 +51,7 @@ struct nni_aio { // the supplied argument when the operation is complete. If NULL is // supplied for the callback, then nni_aio_wake is used in its place, // and the aio is used for the argument. -extern void nni_aio_init(nni_aio *, nni_cb, void *); +extern int nni_aio_init(nni_aio *, nni_cb, void *); // nni_aio_fini finalizes the aio, releasing resources (locks) // associated with it. The caller is responsible for ensuring that any diff --git a/src/core/list.c b/src/core/list.c index 821e48bd..1a5a4eb1 100644 --- a/src/core/list.c +++ b/src/core/list.c @@ -149,3 +149,12 @@ nni_list_remove(nni_list *list, void *item) node->ln_next = NULL; node->ln_prev = NULL; } + + +int +nni_list_active(nni_list *list, void *item) +{ + nni_list_node *node = NODE(list, item); + + return (node->ln_next == NULL ? 0 : 1); +} diff --git a/src/core/list.h b/src/core/list.h index dd26377e..9f21e4d6 100644 --- a/src/core/list.h +++ b/src/core/list.h @@ -41,6 +41,7 @@ extern void nni_list_insert_after(nni_list *, void *, void *); extern void *nni_list_next(const nni_list *, void *); extern void *nni_list_prev(const nni_list *, void *); extern void nni_list_remove(nni_list *, void *); +extern int nni_list_active(nni_list *, void *); #define NNI_LIST_FOREACH(l, it) \ for (it = nni_list_first(l); it != NULL; it = nni_list_next(l, it)) diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 8b9a7e1a..2673063d 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -39,6 +39,9 @@ struct nni_msgq { nni_list mq_aio_putq; nni_list mq_aio_getq; + + nni_timer_node mq_timer; + nni_time mq_expire; }; @@ -89,6 +92,8 @@ nni_msgq_kick(nni_msgq *mq, int sig) } +static void nni_msgq_run_timeout(void *); + int nni_msgq_init(nni_msgq **mqp, int cap) { @@ -127,6 +132,8 @@ nni_msgq_init(nni_msgq **mqp, int cap) goto fail; } + nni_timer_init(&mq->mq_timer, nni_msgq_run_timeout, mq); + mq->mq_cap = cap; mq->mq_alloc = alloc; mq->mq_len = 0; @@ -139,6 +146,7 @@ nni_msgq_init(nni_msgq **mqp, int cap) mq->mq_rwait = 0; mq->mq_notify_fn = NULL; mq->mq_notify_arg = NULL; + mq->mq_expire = NNI_TIME_NEVER; *mqp = mq; return (0); @@ -287,7 +295,7 @@ nni_msgq_run_putq(nni_msgq *mq) nni_list_remove(&mq->mq_aio_getq, raio); nni_list_remove(&mq->mq_aio_putq, waio); - *raio->a_msgp = msg; + raio->a_msg = msg; waio->a_msg = NULL; nni_aio_finish(raio, 0, len); @@ -296,7 +304,8 @@ nni_msgq_run_putq(nni_msgq *mq) } // Otherwise if we have room in the buffer, just queue it. - if (mq->mq_len < mq->mq_cap) { + if ((mq->mq_len < mq->mq_cap) || + ((mq->mq_len == mq->mq_cap) && mq->mq_rwait)) { nni_list_remove(&mq->mq_aio_putq, waio); mq->mq_msgs[mq->mq_put++] = msg; if (mq->mq_put == mq->mq_alloc) { @@ -311,6 +320,14 @@ nni_msgq_run_putq(nni_msgq *mq) // Unable to make progress, leave the aio where it is. break; } + + // XXX: REMOVE ME WHEN WE GO COMPLETELY ASYNC. + if (mq->mq_len != 0) { + nni_cv_wake(&mq->mq_readable); + } + if (mq->mq_len < mq->mq_cap) { + nni_cv_wake(&mq->mq_writeable); + } } @@ -325,13 +342,14 @@ nni_msgq_run_getq(nni_msgq *mq) while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { // If anything is waiting in the queue, get it first. if (mq->mq_len != 0) { + nni_list_remove(&mq->mq_aio_getq, raio); msg = mq->mq_msgs[mq->mq_get++]; if (mq->mq_get > mq->mq_cap) { mq->mq_get = 0; } mq->mq_len--; len = nni_msg_len(msg); - *(raio->a_msgp) = msg; + raio->a_msg = msg; nni_aio_finish(raio, 0, len); continue; } @@ -344,7 +362,7 @@ nni_msgq_run_getq(nni_msgq *mq) msg = waio->a_msg; len = nni_msg_len(msg); waio->a_msg = NULL; - *raio->a_msgp = msg; + raio->a_msg = msg; nni_aio_finish(raio, 0, len); nni_aio_finish(waio, 0, len); continue; @@ -363,39 +381,93 @@ nni_msgq_run_notify(nni_msgq *mq) } -int +void nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) { nni_mtx_lock(&mq->mq_lock); nni_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); nni_msgq_run_notify(mq); + if (aio->a_expire < mq->mq_expire) { + mq->mq_expire = aio->a_expire; + nni_timer_schedule(&mq->mq_timer, mq->mq_expire); + } nni_mtx_unlock(&mq->mq_lock); - return (0); } -int +void nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) { nni_mtx_lock(&mq->mq_lock); nni_list_append(&mq->mq_aio_getq, aio); nni_msgq_run_getq(mq); nni_msgq_run_notify(mq); + if (aio->a_expire < mq->mq_expire) { + mq->mq_expire = aio->a_expire; + nni_timer_schedule(&mq->mq_timer, mq->mq_expire); + } + nni_mtx_unlock(&mq->mq_lock); +} + + +void +nni_msgq_aio_cancel(nni_msgq *mq, nni_aio *aio) +{ + nni_mtx_lock(&mq->mq_lock); + // NB: nni_list_active and nni_list_remove only use the list structure + // to determine list node offsets. Otherwise, they only look at the + // node's linkage structure. Therefore the following check will remove + // the node from either the getq or the putq list. + if (nni_list_active(&mq->mq_aio_getq, aio)) { + nni_list_remove(&mq->mq_aio_getq, aio); + } + nni_mtx_unlock(&mq->mq_lock); +} + + +int +nni_msgq_canput(nni_msgq *mq) +{ + nni_mtx_lock(&mq->mq_lock); + if ((mq->mq_len < mq->mq_cap) || + (mq->mq_rwait != 0) || // XXX: REMOVE ME + (nni_list_first(&mq->mq_aio_getq) != NULL)) { + nni_mtx_unlock(&mq->mq_lock); + return (1); + } + nni_mtx_unlock(&mq->mq_lock); + return (0); +} + + +int +nni_msgq_canget(nni_msgq *mq) +{ + nni_mtx_lock(&mq->mq_lock); + if ((mq->mq_len != 0) || + (mq->mq_wwait != 0) || + (nni_list_first(&mq->mq_aio_putq) != NULL)) { + nni_mtx_unlock(&mq->mq_lock); + return (1); + } nni_mtx_unlock(&mq->mq_lock); return (0); } void -nni_msgq_run_timeout(nni_msgq *mq) +nni_msgq_run_timeout(void *arg) { + nni_msgq *mq = arg; nni_time now; + nni_time exp; nni_aio *aio; nni_aio *naio; int rv; now = nni_clock(); + exp = NNI_TIME_NEVER; nni_mtx_lock(&mq->mq_lock); naio = nni_list_first(&mq->mq_aio_getq); @@ -405,6 +477,9 @@ nni_msgq_run_timeout(nni_msgq *mq) nni_list_remove(&mq->mq_aio_getq, aio); nni_aio_finish(aio, NNG_ETIMEDOUT, 0); } + if (exp > aio->a_expire) { + exp = aio->a_expire; + } } naio = nni_list_first(&mq->mq_aio_putq); @@ -414,8 +489,15 @@ nni_msgq_run_timeout(nni_msgq *mq) nni_list_remove(&mq->mq_aio_putq, aio); nni_aio_finish(aio, NNG_ETIMEDOUT, 0); } + if (exp > aio->a_expire) { + exp = aio->a_expire; + } } + mq->mq_expire = exp; + if (mq->mq_expire != NNI_TIME_NEVER) { + nni_timer_schedule(&mq->mq_timer, mq->mq_expire); + } nni_mtx_unlock(&mq->mq_lock); } @@ -495,6 +577,7 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT); } nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET); + nni_msgq_run_getq(mq); nni_mtx_unlock(&mq->mq_lock); return (0); @@ -565,6 +648,7 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET); } nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT); + nni_msgq_run_putq(mq); nni_mtx_unlock(&mq->mq_lock); return (0); @@ -572,6 +656,35 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) int +nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) +{ + nni_aio aio; + int rv; + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } + aio.a_expire = expire; + nni_msgq_aio_get(mq, &aio); + nni_aio_wait(&aio); + if ((rv = nni_aio_result(&aio)) == 0) { + *msgp = aio.a_msg; + aio.a_msg = NULL; + } + nni_aio_fini(&aio); + return (rv); +} + + +int +nni_msgq_get(nni_msgq *mq, nni_msg **msgp) +{ + return (nni_msgq_get_until(mq, msgp, NNI_TIME_NEVER)); +} + + +#if 0 +int nni_msgq_get(nni_msgq *mq, nni_msg **msgp) { nni_signal nosig = 0; @@ -580,6 +693,9 @@ nni_msgq_get(nni_msgq *mq, nni_msg **msgp) } +#endif + + int nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal) { @@ -587,6 +703,7 @@ nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal) } +#if 0 int nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) { @@ -596,6 +713,34 @@ nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) } +#endif + +int +nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) +{ + nni_aio aio; + int rv; + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } + aio.a_expire = expire; + aio.a_msg = msg; + nni_msgq_aio_put(mq, &aio); + nni_aio_wait(&aio); + nni_aio_fini(&aio); + return (rv); +} + + +int +nni_msgq_put(nni_msgq *mq, nni_msg *msg) +{ + return (nni_msgq_put_until(mq, msg, NNI_TIME_NEVER)); +} + + +#if 0 int nni_msgq_put(nni_msgq *mq, nni_msg *msg) { @@ -605,6 +750,9 @@ nni_msgq_put(nni_msgq *mq, nni_msg *msg) } +#endif + + int nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) { @@ -621,6 +769,7 @@ nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal) } +#if 0 int nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) { @@ -630,6 +779,9 @@ nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) } +#endif + + void nni_msgq_drain(nni_msgq *mq, nni_time expire) { diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index a0af12be..de23ebac 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -35,11 +35,13 @@ extern int nni_msgq_init(nni_msgq **, int); // messages that may be in the queue. extern void nni_msgq_fini(nni_msgq *); -extern int nni_msgq_aio_put(nni_msgq *, nni_aio *); -extern int nni_msgq_aio_get(nni_msgq *, nni_aio *); +extern int nni_msgq_canget(nni_msgq *); +extern int nni_msgq_canput(nni_msgq *); +extern void nni_msgq_aio_put(nni_msgq *, nni_aio *); +extern void nni_msgq_aio_get(nni_msgq *, nni_aio *); extern int nni_msgq_aio_notify_get(nni_msgq *, nni_aio *); extern int nni_msgq_aio_notify_put(nni_msgq *, nni_aio *); -extern int nni_msgq_aio_cancel(nni_msgq *, nni_aio *); +extern void nni_msgq_aio_cancel(nni_msgq *, nni_aio *); // nni_msgq_put puts the message to the queue. It blocks until it // was able to do so, or the queue is closed, returning either 0 on diff --git a/src/core/protocol.h b/src/core/protocol.h index 9869afc9..877eec31 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -62,6 +62,11 @@ struct nni_proto_sock_ops { // block as needed. void (*sock_fini)(void *); + // Open the protocol instance. This is run with the lock held, + // and intended to allow the protocol to start any asynchronous + // processing. + void (*sock_open)(void *); + // Close the protocol instance. This is run with the lock held, // and intended to initiate closure of the socket. For example, // it can signal the socket worker threads to exit. diff --git a/src/core/socket.c b/src/core/socket.c index 52a4d6d9..d58e64ba 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -325,6 +325,9 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) if (sops->sock_close == NULL) { sops->sock_close = nni_sock_nullop; } + if (sops->sock_open == NULL) { + sops->sock_open = nni_sock_nullop; + } sock->s_pipe_ops = *proto->proto_pipe_ops; pops = &sock->s_pipe_ops; if (pops->pipe_add == NULL) { @@ -404,6 +407,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) nni_thr_run(&sock->s_worker_thr[i]); } + sops->sock_open(sock->s_data); nni_thr_run(&sock->s_reaper); nni_thr_run(&sock->s_notifier); diff --git a/src/core/taskq.c b/src/core/taskq.c index 2b2b0d1a..d473b65c 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -33,6 +33,7 @@ nni_taskq_thread(void *self) ent->tqe_tq = NULL; nni_mtx_unlock(&tq->tq_mtx); ent->tqe_cb(ent->tqe_arg); + nni_mtx_lock(&tq->tq_mtx); continue; } @@ -56,8 +57,15 @@ nni_taskq_init(nni_taskq **tqp, int nthr) if ((tq = NNI_ALLOC_STRUCT(tq)) == NULL) { return (NNG_ENOMEM); } - nni_mtx_init(&tq->tq_mtx); - nni_cv_init(&tq->tq_cv, &tq->tq_mtx); + if ((rv = nni_mtx_init(&tq->tq_mtx)) != 0) { + NNI_FREE_STRUCT(tq); + return (rv); + } + if ((rv = nni_cv_init(&tq->tq_cv, &tq->tq_mtx)) != 0) { + nni_mtx_fini(&tq->tq_mtx); + NNI_FREE_STRUCT(tq); + return (rv); + } tq->tq_close = 0; NNI_LIST_INIT(&tq->tq_ents, nni_taskq_ent, tqe_node); diff --git a/src/core/timer.c b/src/core/timer.c index 59966822..778a82b7 100644 --- a/src/core/timer.c +++ b/src/core/timer.c @@ -12,10 +12,6 @@ #include <stdlib.h> #include <string.h> -extern void nni_timer_schedule(nni_timer_node *); -extern void nni_timer_cancel(nni_timer_node *); -extern int nni_timer_init(void); -extern void nni_timer_fini(void); static void nni_timer_loop(void *); struct nni_timer { @@ -76,6 +72,21 @@ nni_timer_sys_fini(void) void +nni_timer_init(nni_timer_node *node, nni_cb cb, void *arg) +{ + node->t_cb = cb; + node->t_arg = arg; +} + + +void +nni_timer_fini(nni_timer_node *node) +{ + NNI_ARG_UNUSED(node); +} + + +void nni_timer_cancel(nni_timer_node *node) { nni_timer *timer = &nni_global_timer; @@ -92,12 +103,14 @@ nni_timer_cancel(nni_timer_node *node) void -nni_timer_schedule(nni_timer_node *node) +nni_timer_schedule(nni_timer_node *node, nni_time when) { nni_timer *timer = &nni_global_timer; nni_timer_node *srch; int wake = 1; + node->t_expire = when; + nni_mtx_lock(&timer->t_list_mx); srch = nni_list_first(&timer->t_entries); diff --git a/src/core/timer.h b/src/core/timer.h index a5bd4633..89a5cda0 100644 --- a/src/core/timer.h +++ b/src/core/timer.h @@ -25,7 +25,9 @@ struct nni_timer_node { typedef struct nni_timer_node nni_timer_node; -extern void nni_timer_schedule(nni_timer_node *); +extern void nni_timer_init(nni_timer_node *, nni_cb, void *); +extern void nni_timer_fini(nni_timer_node *); +extern void nni_timer_schedule(nni_timer_node *, nni_time); extern void nni_timer_cancel(nni_timer_node *); extern int nni_timer_sys_init(void); extern void nni_timer_sys_fini(void); diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index 6f2d716b..ec66fab6 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -17,6 +17,11 @@ typedef struct nni_pull_pipe nni_pull_pipe; typedef struct nni_pull_sock nni_pull_sock; +static void nni_pull_putq_cb(void *); +static void nni_pull_recv_cb(void *); +static void nni_pull_recv(nni_pull_pipe *); +static void nni_pull_putq(nni_pull_pipe *, nni_msg *); + // An nni_pull_sock is our per-socket protocol private structure. struct nni_pull_sock { nni_msgq * urq; @@ -27,6 +32,8 @@ struct nni_pull_sock { struct nni_pull_pipe { nni_pipe * pipe; nni_pull_sock * pull; + nni_aio putq_aio; + nni_aio recv_aio; }; static int @@ -60,10 +67,20 @@ static int nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock) { nni_pull_pipe *pp; + int rv; if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) { return (NNG_ENOMEM); } + if (((rv = nni_aio_init(&pp->putq_aio, nni_pull_putq_cb, pp))) != 0) { + NNI_FREE_STRUCT(pp); + return (rv); + } + if (((rv = nni_aio_init(&pp->recv_aio, nni_pull_recv_cb, pp))) != 0) { + nni_aio_fini(&pp->putq_aio); + NNI_FREE_STRUCT(pp); + return (rv); + } pp->pipe = pipe; pp->pull = psock; *ppp = pp; @@ -77,28 +94,93 @@ nni_pull_pipe_fini(void *arg) nni_pull_pipe *pp = arg; if (pp != NULL) { + nni_aio_fini(&pp->putq_aio); + nni_aio_fini(&pp->recv_aio); NNI_FREE_STRUCT(pp); } } +static int +nni_pull_pipe_start(void *arg) +{ + nni_pull_pipe *pp = arg; + + // Start the pending pull... + nni_pull_recv(pp); + + return (0); +} + + static void -nni_pull_pipe_recv(void *arg) +nni_pull_pipe_stop(void *arg) { nni_pull_pipe *pp = arg; - nni_pull_sock *pull = pp->pull; + + // Cancel any pending sendup. + nni_msgq_aio_cancel(pp->pull->urq, &pp->putq_aio); +} + + +static void +nni_pull_recv_cb(void *arg) +{ + nni_pull_pipe *pp = arg; + nni_aio *aio = &pp->recv_aio; nni_msg *msg; - for (;;) { - if (nni_pipe_recv(pp->pipe, &msg) != 0) { - break; - } - if (nni_msgq_put(pull->urq, msg) != 0) { - nni_msg_free(msg); - break; - } + if (nni_aio_result(aio) != 0) { + // Failed to get a message, probably the pipe is closed. + nni_pipe_close(pp->pipe); + return; + } + + // Got a message... start the put to send it up to the application. + msg = aio->a_msg; + aio->a_msg = NULL; + nni_pull_putq(pp, msg); +} + + +static void +nni_pull_putq_cb(void *arg) +{ + nni_pull_pipe *pp = arg; + nni_aio *aio = &pp->putq_aio; + int rv; + + if (nni_aio_result(aio) != 0) { + // If we failed to put, probably NNG_ECLOSED, nothing else + // we can do. Just close the pipe. + nni_pipe_close(pp->pipe); + return; + } + + nni_pull_recv(pp); +} + + +// nni_pull_recv is called to schedule a pending recv on the incoming pipe. +static void +nni_pull_recv(nni_pull_pipe *pp) +{ + // Schedule the aio with callback. + if (nni_pipe_aio_recv(pp->pipe, &pp->recv_aio) != 0) { + nni_pipe_close(pp->pipe); } - nni_pipe_close(pp->pipe); +} + + +// nni_pull_putq schedules a put operation to the user socket (sendup). +static void +nni_pull_putq(nni_pull_pipe *pp, nni_msg *msg) +{ + nni_pull_sock *pull = pp->pull; + + pp->putq_aio.a_msg = msg; + + nni_msgq_aio_put(pull->urq, &pp->putq_aio); } @@ -141,7 +223,8 @@ nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp) static nni_proto_pipe_ops nni_pull_pipe_ops = { .pipe_init = nni_pull_pipe_init, .pipe_fini = nni_pull_pipe_fini, - .pipe_worker = { nni_pull_pipe_recv }, + .pipe_add = nni_pull_pipe_start, + .pipe_rem = nni_pull_pipe_stop, }; static nni_proto_sock_ops nni_pull_sock_ops = { diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index 3c3164d5..fcbb6d4f 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -19,27 +19,34 @@ typedef struct nni_push_pipe nni_push_pipe; typedef struct nni_push_sock nni_push_sock; +static void nni_push_send_cb(void *); +static void nni_push_recv_cb(void *); +static void nni_push_getq_cb(void *); +static void nni_push_recv(nni_push_pipe *); +static void nni_push_send(nni_push_sock *); + // An nni_push_sock is our per-socket protocol private structure. struct nni_push_sock { - nni_cv cv; nni_msgq * uwq; + nni_msg * msg; // pending message int raw; - int closing; - int wantw; nni_list pipes; nni_push_pipe * nextpipe; int npipes; nni_sock * sock; + + nni_aio aio_getq; }; // An nni_push_pipe is our per-pipe protocol private structure. struct nni_push_pipe { nni_pipe * pipe; nni_push_sock * push; - nni_msgq * mq; - int sigclose; int wantr; nni_list_node node; + + nni_aio aio_recv; + nni_aio aio_send; }; static int @@ -51,14 +58,13 @@ nni_push_sock_init(void **pushp, nni_sock *sock) if ((push = NNI_ALLOC_STRUCT(push)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_cv_init(&push->cv, nni_sock_mtx(sock))) != 0) { + if ((rv = nni_aio_init(&push->aio_getq, nni_push_getq_cb, push)) != 0) { NNI_FREE_STRUCT(push); return (rv); } NNI_LIST_INIT(&push->pipes, nni_push_pipe, node); push->raw = 0; push->npipes = 0; - push->wantw = 0; push->nextpipe = NULL; push->sock = sock; push->uwq = nni_sock_sendq(sock); @@ -69,14 +75,20 @@ nni_push_sock_init(void **pushp, nni_sock *sock) static void +nni_push_sock_open(void *arg) +{ + nni_push_sock *push = arg; + + nni_msgq_aio_get(push->uwq, &push->aio_getq); +} + + +static void nni_push_sock_close(void *arg) { nni_push_sock *push = arg; - // Shut down the resender. We request it to exit by clearing - // its old value, then kick it. - push->closing = 1; - nni_cv_wake(&push->cv); + nni_msgq_aio_cancel(push->uwq, &push->aio_getq); } @@ -86,7 +98,9 @@ nni_push_sock_fini(void *arg) nni_push_sock *push = arg; if (push != NULL) { - nni_cv_fini(&push->cv); + if (push->msg != NULL) { + nni_msg_free(push->msg); + } NNI_FREE_STRUCT(push); } } @@ -101,13 +115,17 @@ nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock) if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_msgq_init(&pp->mq, 0)) != 0) { + if ((rv = nni_aio_init(&pp->aio_recv, nni_push_recv_cb, pp)) != 0) { + NNI_FREE_STRUCT(pp); + return (rv); + } + if ((rv = nni_aio_init(&pp->aio_send, nni_push_send_cb, pp)) != 0) { + nni_aio_fini(&pp->aio_recv); NNI_FREE_STRUCT(pp); return (rv); } NNI_LIST_NODE_INIT(&pp->node); pp->pipe = pipe; - pp->sigclose = 0; pp->push = psock; pp->wantr = 0; *ppp = pp; @@ -121,7 +139,8 @@ nni_push_pipe_fini(void *arg) nni_push_pipe *pp = arg; if (pp != NULL) { - nni_msgq_fini(pp->mq); + nni_aio_fini(&pp->aio_recv); + nni_aio_fini(&pp->aio_send); NNI_FREE_STRUCT(pp); } } @@ -140,9 +159,19 @@ nni_push_pipe_add(void *arg) // The end makes our test cases easier. nni_list_append(&push->pipes, pp); + // We start out wanting data to read. + pp->wantr = 1; + // Wake the top sender, as we can accept a job. push->npipes++; - nni_cv_wake(&push->cv); + + // Schedule a receiver. This is mostly so that we can detect + // a closed transport pipe. + nni_pipe_aio_recv(pp->pipe, &pp->aio_recv); + + // Possibly schedule the sender. + nni_push_send(pp->push); + return (0); } @@ -162,46 +191,66 @@ nni_push_pipe_rem(void *arg) static void -nni_push_pipe_send(void *arg) +nni_push_recv(nni_push_pipe *pp) +{ + nni_pipe_aio_recv(pp->pipe, &pp->aio_recv); +} + + +static void +nni_push_recv_cb(void *arg) +{ + nni_push_pipe *pp = arg; + + // We normally expect to receive an error. If a pipe actually + // sends us data, we just discard it. + if (nni_aio_result(&pp->aio_recv) != 0) { + nni_pipe_close(pp->pipe); + return; + } + nni_push_recv(pp); +} + + +static void +nni_push_send_cb(void *arg) { nni_push_pipe *pp = arg; nni_push_sock *push = pp->push; nni_mtx *mx = nni_sock_mtx(push->sock); - nni_msg *msg; - for (;;) { - nni_mtx_lock(mx); - pp->wantr = 1; - if (push->wantw) { - nni_cv_wake(&push->cv); - } - nni_mtx_unlock(mx); - if (nni_msgq_get_sig(pp->mq, &msg, &pp->sigclose) != 0) { - break; - } - if (nni_pipe_send(pp->pipe, msg) != 0) { - nni_msg_free(msg); - break; - } + if (nni_aio_result(&pp->aio_send) != 0) { + nni_pipe_close(pp->pipe); + return; } - nni_pipe_close(pp->pipe); + + nni_mtx_lock(mx); + pp->wantr = 1; + + // This effectively kicks off a pull down. + nni_push_send(pp->push); + nni_mtx_unlock(mx); } static void -nni_push_pipe_recv(void *arg) +nni_push_getq_cb(void *arg) { - nni_push_pipe *pp = arg; - nni_msg *msg; + nni_push_sock *push = arg; + nni_mtx *mx = nni_sock_mtx(push->sock); + nni_aio *aio = &push->aio_getq; - for (;;) { - if (nni_pipe_recv(pp->pipe, &msg) != 0) { - break; - } - nni_msg_free(msg); + if (nni_aio_result(aio) != 0) { + // If the socket is closing, nothing else we can do. + return; } - nni_msgq_signal(pp->mq, &pp->sigclose); - nni_pipe_close(pp->pipe); + + nni_mtx_lock(mx); + push->msg = aio->a_msg; + aio->a_msg = NULL; + + nni_push_send(push); + nni_mtx_unlock(mx); } @@ -240,51 +289,35 @@ nni_push_sock_getopt(void *arg, int opt, void *buf, size_t *szp) static void -nni_push_sock_send(void *arg) +nni_push_send(nni_push_sock *push) { - nni_push_sock *push = arg; nni_push_pipe *pp; - nni_msgq *uwq = push->uwq; nni_msg *msg = NULL; - nni_mtx *mx = nni_sock_mtx(push->sock); int i; - for (;;) { - if ((msg == NULL) && (nni_msgq_get(uwq, &msg) != 0)) { - // Should only be NNG_ECLOSED - return; - } + if ((msg = push->msg) == NULL) { + // Nothing to send... bail... + return; + } - nni_mtx_lock(mx); - if (push->closing) { - if (msg != NULL) { - nni_mtx_unlock(mx); - nni_msg_free(msg); - return; - } - } - push->wantw = 0; - for (i = 0; i < push->npipes; i++) { - pp = push->nextpipe; - if (pp == NULL) { - pp = nni_list_first(&push->pipes); - } - push->nextpipe = nni_list_next(&push->pipes, pp); - if (pp->wantr) { - pp->wantr = 0; - if (nni_msgq_put(pp->mq, msg) == 0) { - msg = NULL; - break; - } - } + // Let's try to send it. + for (i = 0; i < push->npipes; i++) { + pp = push->nextpipe; + if (pp == NULL) { + pp = nni_list_first(&push->pipes); } - if (msg != NULL) { - // We weren't able to deliver it, so keep it and - // wait for a sender to let us know its ready. - push->wantw = 1; - nni_cv_wait(&push->cv); + push->nextpipe = nni_list_next(&push->pipes, pp); + if (pp->wantr) { + pp->aio_send.a_msg = msg; + push->msg = NULL; + + // Schedule outbound pipe delivery... + nni_pipe_aio_send(pp->pipe, &pp->aio_send); + + // And schedule getting another message for send. + nni_msgq_aio_get(push->uwq, &push->aio_getq); + break; } - nni_mtx_unlock(mx); } } @@ -296,17 +329,15 @@ static nni_proto_pipe_ops nni_push_pipe_ops = { .pipe_fini = nni_push_pipe_fini, .pipe_add = nni_push_pipe_add, .pipe_rem = nni_push_pipe_rem, - .pipe_worker = { nni_push_pipe_send, - nni_push_pipe_recv }, }; static nni_proto_sock_ops nni_push_sock_ops = { .sock_init = nni_push_sock_init, .sock_fini = nni_push_sock_fini, + .sock_open = nni_push_sock_open, .sock_close = nni_push_sock_close, .sock_setopt = nni_push_sock_setopt, .sock_getopt = nni_push_sock_getopt, - .sock_worker = { nni_push_sock_send }, }; nni_proto nni_push_proto = { diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 66f076ea..0cc208d4 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -129,6 +129,38 @@ nni_inproc_pipe_destroy(void *arg) static int +nni_inproc_pipe_aio_send(void *arg, nni_aio *aio) +{ + nni_inproc_pipe *pipe = arg; + nni_msg *msg = aio->a_msg; + char *h; + size_t l; + int rv; + + // We need to move any header data to the body, because the other + // side won't know what to do otherwise. + h = nni_msg_header(msg); + l = nni_msg_header_len(msg); + if ((rv = nni_msg_prepend(msg, h, l)) != 0) { + return (rv); + } + nni_msg_trunc_header(msg, l); + nni_msgq_aio_put(pipe->wq, aio); + return (0); +} + + +static int +nni_inproc_pipe_aio_recv(void *arg, nni_aio *aio) +{ + nni_inproc_pipe *pipe = arg; + + nni_msgq_aio_get(pipe->rq, aio); + return (0); +} + + +static int nni_inproc_pipe_send(void *arg, nni_msg *msg) { nni_inproc_pipe *pipe = arg; @@ -403,6 +435,8 @@ static nni_tran_pipe nni_inproc_pipe_ops = { .pipe_destroy = nni_inproc_pipe_destroy, .pipe_send = nni_inproc_pipe_send, .pipe_recv = nni_inproc_pipe_recv, + .pipe_aio_send = nni_inproc_pipe_aio_send, + .pipe_aio_recv = nni_inproc_pipe_aio_recv, .pipe_close = nni_inproc_pipe_close, .pipe_peer = nni_inproc_pipe_peer, .pipe_getopt = nni_inproc_pipe_getopt, |
