diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 15 | ||||
| -rw-r--r-- | src/core/aio.h | 7 | ||||
| -rw-r--r-- | src/core/list.c | 9 | ||||
| -rw-r--r-- | src/core/list.h | 1 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 168 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 8 | ||||
| -rw-r--r-- | src/core/protocol.h | 5 | ||||
| -rw-r--r-- | src/core/socket.c | 4 | ||||
| -rw-r--r-- | src/core/taskq.c | 12 | ||||
| -rw-r--r-- | src/core/timer.c | 23 | ||||
| -rw-r--r-- | src/core/timer.h | 4 |
11 files changed, 229 insertions, 27 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index a3f4fb28..ffc5ac06 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -12,20 +12,29 @@ #define NNI_AIO_WAKE (1<<0) -void +int nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) { + int rv; + if (cb == NULL) { cb = (nni_cb) nni_aio_wake; arg = aio; } memset(aio, 0, sizeof (*aio)); - nni_mtx_init(&aio->a_lk); - nni_cv_init(&aio->a_cv, &aio->a_lk); + if ((rv = nni_mtx_init(&aio->a_lk)) != 0) { + return (rv); + } + if ((rv = nni_cv_init(&aio->a_cv, &aio->a_lk)) != 0) { + nni_mtx_fini(&aio->a_lk); + return (rv); + } aio->a_cb = cb; aio->a_cbarg = arg; aio->a_expire = NNI_TIME_NEVER; nni_taskq_ent_init(&aio->a_tqe, cb, arg); + + return (0); } diff --git a/src/core/aio.h b/src/core/aio.h index f6096ca7..80a3ac02 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -37,12 +37,9 @@ struct nni_aio { nni_iov a_iov[4]; int a_niov; - // Sendmsg operation. + // Message operations. nni_msg * a_msg; - // Recvmsg operation. - nni_msg ** a_msgp; - // TBD: Resolver operations. // Provider-use fields. @@ -54,7 +51,7 @@ struct nni_aio { // the supplied argument when the operation is complete. If NULL is // supplied for the callback, then nni_aio_wake is used in its place, // and the aio is used for the argument. -extern void nni_aio_init(nni_aio *, nni_cb, void *); +extern int nni_aio_init(nni_aio *, nni_cb, void *); // nni_aio_fini finalizes the aio, releasing resources (locks) // associated with it. The caller is responsible for ensuring that any diff --git a/src/core/list.c b/src/core/list.c index 821e48bd..1a5a4eb1 100644 --- a/src/core/list.c +++ b/src/core/list.c @@ -149,3 +149,12 @@ nni_list_remove(nni_list *list, void *item) node->ln_next = NULL; node->ln_prev = NULL; } + + +int +nni_list_active(nni_list *list, void *item) +{ + nni_list_node *node = NODE(list, item); + + return (node->ln_next == NULL ? 0 : 1); +} diff --git a/src/core/list.h b/src/core/list.h index dd26377e..9f21e4d6 100644 --- a/src/core/list.h +++ b/src/core/list.h @@ -41,6 +41,7 @@ extern void nni_list_insert_after(nni_list *, void *, void *); extern void *nni_list_next(const nni_list *, void *); extern void *nni_list_prev(const nni_list *, void *); extern void nni_list_remove(nni_list *, void *); +extern int nni_list_active(nni_list *, void *); #define NNI_LIST_FOREACH(l, it) \ for (it = nni_list_first(l); it != NULL; it = nni_list_next(l, it)) diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 8b9a7e1a..2673063d 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -39,6 +39,9 @@ struct nni_msgq { nni_list mq_aio_putq; nni_list mq_aio_getq; + + nni_timer_node mq_timer; + nni_time mq_expire; }; @@ -89,6 +92,8 @@ nni_msgq_kick(nni_msgq *mq, int sig) } +static void nni_msgq_run_timeout(void *); + int nni_msgq_init(nni_msgq **mqp, int cap) { @@ -127,6 +132,8 @@ nni_msgq_init(nni_msgq **mqp, int cap) goto fail; } + nni_timer_init(&mq->mq_timer, nni_msgq_run_timeout, mq); + mq->mq_cap = cap; mq->mq_alloc = alloc; mq->mq_len = 0; @@ -139,6 +146,7 @@ nni_msgq_init(nni_msgq **mqp, int cap) mq->mq_rwait = 0; mq->mq_notify_fn = NULL; mq->mq_notify_arg = NULL; + mq->mq_expire = NNI_TIME_NEVER; *mqp = mq; return (0); @@ -287,7 +295,7 @@ nni_msgq_run_putq(nni_msgq *mq) nni_list_remove(&mq->mq_aio_getq, raio); nni_list_remove(&mq->mq_aio_putq, waio); - *raio->a_msgp = msg; + raio->a_msg = msg; waio->a_msg = NULL; nni_aio_finish(raio, 0, len); @@ -296,7 +304,8 @@ nni_msgq_run_putq(nni_msgq *mq) } // Otherwise if we have room in the buffer, just queue it. - if (mq->mq_len < mq->mq_cap) { + if ((mq->mq_len < mq->mq_cap) || + ((mq->mq_len == mq->mq_cap) && mq->mq_rwait)) { nni_list_remove(&mq->mq_aio_putq, waio); mq->mq_msgs[mq->mq_put++] = msg; if (mq->mq_put == mq->mq_alloc) { @@ -311,6 +320,14 @@ nni_msgq_run_putq(nni_msgq *mq) // Unable to make progress, leave the aio where it is. break; } + + // XXX: REMOVE ME WHEN WE GO COMPLETELY ASYNC. + if (mq->mq_len != 0) { + nni_cv_wake(&mq->mq_readable); + } + if (mq->mq_len < mq->mq_cap) { + nni_cv_wake(&mq->mq_writeable); + } } @@ -325,13 +342,14 @@ nni_msgq_run_getq(nni_msgq *mq) while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { // If anything is waiting in the queue, get it first. if (mq->mq_len != 0) { + nni_list_remove(&mq->mq_aio_getq, raio); msg = mq->mq_msgs[mq->mq_get++]; if (mq->mq_get > mq->mq_cap) { mq->mq_get = 0; } mq->mq_len--; len = nni_msg_len(msg); - *(raio->a_msgp) = msg; + raio->a_msg = msg; nni_aio_finish(raio, 0, len); continue; } @@ -344,7 +362,7 @@ nni_msgq_run_getq(nni_msgq *mq) msg = waio->a_msg; len = nni_msg_len(msg); waio->a_msg = NULL; - *raio->a_msgp = msg; + raio->a_msg = msg; nni_aio_finish(raio, 0, len); nni_aio_finish(waio, 0, len); continue; @@ -363,39 +381,93 @@ nni_msgq_run_notify(nni_msgq *mq) } -int +void nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) { nni_mtx_lock(&mq->mq_lock); nni_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); nni_msgq_run_notify(mq); + if (aio->a_expire < mq->mq_expire) { + mq->mq_expire = aio->a_expire; + nni_timer_schedule(&mq->mq_timer, mq->mq_expire); + } nni_mtx_unlock(&mq->mq_lock); - return (0); } -int +void nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) { nni_mtx_lock(&mq->mq_lock); nni_list_append(&mq->mq_aio_getq, aio); nni_msgq_run_getq(mq); nni_msgq_run_notify(mq); + if (aio->a_expire < mq->mq_expire) { + mq->mq_expire = aio->a_expire; + nni_timer_schedule(&mq->mq_timer, mq->mq_expire); + } + nni_mtx_unlock(&mq->mq_lock); +} + + +void +nni_msgq_aio_cancel(nni_msgq *mq, nni_aio *aio) +{ + nni_mtx_lock(&mq->mq_lock); + // NB: nni_list_active and nni_list_remove only use the list structure + // to determine list node offsets. Otherwise, they only look at the + // node's linkage structure. Therefore the following check will remove + // the node from either the getq or the putq list. + if (nni_list_active(&mq->mq_aio_getq, aio)) { + nni_list_remove(&mq->mq_aio_getq, aio); + } + nni_mtx_unlock(&mq->mq_lock); +} + + +int +nni_msgq_canput(nni_msgq *mq) +{ + nni_mtx_lock(&mq->mq_lock); + if ((mq->mq_len < mq->mq_cap) || + (mq->mq_rwait != 0) || // XXX: REMOVE ME + (nni_list_first(&mq->mq_aio_getq) != NULL)) { + nni_mtx_unlock(&mq->mq_lock); + return (1); + } + nni_mtx_unlock(&mq->mq_lock); + return (0); +} + + +int +nni_msgq_canget(nni_msgq *mq) +{ + nni_mtx_lock(&mq->mq_lock); + if ((mq->mq_len != 0) || + (mq->mq_wwait != 0) || + (nni_list_first(&mq->mq_aio_putq) != NULL)) { + nni_mtx_unlock(&mq->mq_lock); + return (1); + } nni_mtx_unlock(&mq->mq_lock); return (0); } void -nni_msgq_run_timeout(nni_msgq *mq) +nni_msgq_run_timeout(void *arg) { + nni_msgq *mq = arg; nni_time now; + nni_time exp; nni_aio *aio; nni_aio *naio; int rv; now = nni_clock(); + exp = NNI_TIME_NEVER; nni_mtx_lock(&mq->mq_lock); naio = nni_list_first(&mq->mq_aio_getq); @@ -405,6 +477,9 @@ nni_msgq_run_timeout(nni_msgq *mq) nni_list_remove(&mq->mq_aio_getq, aio); nni_aio_finish(aio, NNG_ETIMEDOUT, 0); } + if (exp > aio->a_expire) { + exp = aio->a_expire; + } } naio = nni_list_first(&mq->mq_aio_putq); @@ -414,8 +489,15 @@ nni_msgq_run_timeout(nni_msgq *mq) nni_list_remove(&mq->mq_aio_putq, aio); nni_aio_finish(aio, NNG_ETIMEDOUT, 0); } + if (exp > aio->a_expire) { + exp = aio->a_expire; + } } + mq->mq_expire = exp; + if (mq->mq_expire != NNI_TIME_NEVER) { + nni_timer_schedule(&mq->mq_timer, mq->mq_expire); + } nni_mtx_unlock(&mq->mq_lock); } @@ -495,6 +577,7 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT); } nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET); + nni_msgq_run_getq(mq); nni_mtx_unlock(&mq->mq_lock); return (0); @@ -565,6 +648,7 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET); } nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT); + nni_msgq_run_putq(mq); nni_mtx_unlock(&mq->mq_lock); return (0); @@ -572,6 +656,35 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) int +nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) +{ + nni_aio aio; + int rv; + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } + aio.a_expire = expire; + nni_msgq_aio_get(mq, &aio); + nni_aio_wait(&aio); + if ((rv = nni_aio_result(&aio)) == 0) { + *msgp = aio.a_msg; + aio.a_msg = NULL; + } + nni_aio_fini(&aio); + return (rv); +} + + +int +nni_msgq_get(nni_msgq *mq, nni_msg **msgp) +{ + return (nni_msgq_get_until(mq, msgp, NNI_TIME_NEVER)); +} + + +#if 0 +int nni_msgq_get(nni_msgq *mq, nni_msg **msgp) { nni_signal nosig = 0; @@ -580,6 +693,9 @@ nni_msgq_get(nni_msgq *mq, nni_msg **msgp) } +#endif + + int nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal) { @@ -587,6 +703,7 @@ nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal) } +#if 0 int nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) { @@ -596,6 +713,34 @@ nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) } +#endif + +int +nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) +{ + nni_aio aio; + int rv; + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } + aio.a_expire = expire; + aio.a_msg = msg; + nni_msgq_aio_put(mq, &aio); + nni_aio_wait(&aio); + nni_aio_fini(&aio); + return (rv); +} + + +int +nni_msgq_put(nni_msgq *mq, nni_msg *msg) +{ + return (nni_msgq_put_until(mq, msg, NNI_TIME_NEVER)); +} + + +#if 0 int nni_msgq_put(nni_msgq *mq, nni_msg *msg) { @@ -605,6 +750,9 @@ nni_msgq_put(nni_msgq *mq, nni_msg *msg) } +#endif + + int nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) { @@ -621,6 +769,7 @@ nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal) } +#if 0 int nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) { @@ -630,6 +779,9 @@ nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) } +#endif + + void nni_msgq_drain(nni_msgq *mq, nni_time expire) { diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index a0af12be..de23ebac 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -35,11 +35,13 @@ extern int nni_msgq_init(nni_msgq **, int); // messages that may be in the queue. extern void nni_msgq_fini(nni_msgq *); -extern int nni_msgq_aio_put(nni_msgq *, nni_aio *); -extern int nni_msgq_aio_get(nni_msgq *, nni_aio *); +extern int nni_msgq_canget(nni_msgq *); +extern int nni_msgq_canput(nni_msgq *); +extern void nni_msgq_aio_put(nni_msgq *, nni_aio *); +extern void nni_msgq_aio_get(nni_msgq *, nni_aio *); extern int nni_msgq_aio_notify_get(nni_msgq *, nni_aio *); extern int nni_msgq_aio_notify_put(nni_msgq *, nni_aio *); -extern int nni_msgq_aio_cancel(nni_msgq *, nni_aio *); +extern void nni_msgq_aio_cancel(nni_msgq *, nni_aio *); // nni_msgq_put puts the message to the queue. It blocks until it // was able to do so, or the queue is closed, returning either 0 on diff --git a/src/core/protocol.h b/src/core/protocol.h index 9869afc9..877eec31 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -62,6 +62,11 @@ struct nni_proto_sock_ops { // block as needed. void (*sock_fini)(void *); + // Open the protocol instance. This is run with the lock held, + // and intended to allow the protocol to start any asynchronous + // processing. + void (*sock_open)(void *); + // Close the protocol instance. This is run with the lock held, // and intended to initiate closure of the socket. For example, // it can signal the socket worker threads to exit. diff --git a/src/core/socket.c b/src/core/socket.c index 52a4d6d9..d58e64ba 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -325,6 +325,9 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) if (sops->sock_close == NULL) { sops->sock_close = nni_sock_nullop; } + if (sops->sock_open == NULL) { + sops->sock_open = nni_sock_nullop; + } sock->s_pipe_ops = *proto->proto_pipe_ops; pops = &sock->s_pipe_ops; if (pops->pipe_add == NULL) { @@ -404,6 +407,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) nni_thr_run(&sock->s_worker_thr[i]); } + sops->sock_open(sock->s_data); nni_thr_run(&sock->s_reaper); nni_thr_run(&sock->s_notifier); diff --git a/src/core/taskq.c b/src/core/taskq.c index 2b2b0d1a..d473b65c 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -33,6 +33,7 @@ nni_taskq_thread(void *self) ent->tqe_tq = NULL; nni_mtx_unlock(&tq->tq_mtx); ent->tqe_cb(ent->tqe_arg); + nni_mtx_lock(&tq->tq_mtx); continue; } @@ -56,8 +57,15 @@ nni_taskq_init(nni_taskq **tqp, int nthr) if ((tq = NNI_ALLOC_STRUCT(tq)) == NULL) { return (NNG_ENOMEM); } - nni_mtx_init(&tq->tq_mtx); - nni_cv_init(&tq->tq_cv, &tq->tq_mtx); + if ((rv = nni_mtx_init(&tq->tq_mtx)) != 0) { + NNI_FREE_STRUCT(tq); + return (rv); + } + if ((rv = nni_cv_init(&tq->tq_cv, &tq->tq_mtx)) != 0) { + nni_mtx_fini(&tq->tq_mtx); + NNI_FREE_STRUCT(tq); + return (rv); + } tq->tq_close = 0; NNI_LIST_INIT(&tq->tq_ents, nni_taskq_ent, tqe_node); diff --git a/src/core/timer.c b/src/core/timer.c index 59966822..778a82b7 100644 --- a/src/core/timer.c +++ b/src/core/timer.c @@ -12,10 +12,6 @@ #include <stdlib.h> #include <string.h> -extern void nni_timer_schedule(nni_timer_node *); -extern void nni_timer_cancel(nni_timer_node *); -extern int nni_timer_init(void); -extern void nni_timer_fini(void); static void nni_timer_loop(void *); struct nni_timer { @@ -76,6 +72,21 @@ nni_timer_sys_fini(void) void +nni_timer_init(nni_timer_node *node, nni_cb cb, void *arg) +{ + node->t_cb = cb; + node->t_arg = arg; +} + + +void +nni_timer_fini(nni_timer_node *node) +{ + NNI_ARG_UNUSED(node); +} + + +void nni_timer_cancel(nni_timer_node *node) { nni_timer *timer = &nni_global_timer; @@ -92,12 +103,14 @@ nni_timer_cancel(nni_timer_node *node) void -nni_timer_schedule(nni_timer_node *node) +nni_timer_schedule(nni_timer_node *node, nni_time when) { nni_timer *timer = &nni_global_timer; nni_timer_node *srch; int wake = 1; + node->t_expire = when; + nni_mtx_lock(&timer->t_list_mx); srch = nni_list_first(&timer->t_entries); diff --git a/src/core/timer.h b/src/core/timer.h index a5bd4633..89a5cda0 100644 --- a/src/core/timer.h +++ b/src/core/timer.h @@ -25,7 +25,9 @@ struct nni_timer_node { typedef struct nni_timer_node nni_timer_node; -extern void nni_timer_schedule(nni_timer_node *); +extern void nni_timer_init(nni_timer_node *, nni_cb, void *); +extern void nni_timer_fini(nni_timer_node *); +extern void nni_timer_schedule(nni_timer_node *, nni_time); extern void nni_timer_cancel(nni_timer_node *); extern int nni_timer_sys_init(void); extern void nni_timer_sys_fini(void); |
