diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-04 13:04:38 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-04 13:04:38 -0700 |
| commit | 58c5fbb731f50a952864bc500a8efd3b7077ee65 (patch) | |
| tree | bff20e9758e04c024ef26288dff0f4edb89cf033 /src | |
| parent | 5b45db0aeb1026fcf7bbdec0e6451d1cfaac58f1 (diff) | |
| download | nng-58c5fbb731f50a952864bc500a8efd3b7077ee65.tar.gz nng-58c5fbb731f50a952864bc500a8efd3b7077ee65.tar.bz2 nng-58c5fbb731f50a952864bc500a8efd3b7077ee65.zip | |
Improved routines for list management.
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/aio.c | 29 | ||||
| -rw-r--r-- | src/core/aio.h | 9 | ||||
| -rw-r--r-- | src/core/list.c | 26 | ||||
| -rw-r--r-- | src/core/list.h | 3 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 63 | ||||
| -rw-r--r-- | src/platform/posix/posix_poll.c | 43 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 20 |
7 files changed, 114 insertions, 79 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index f4512a34..e1d7bdae 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -217,3 +217,32 @@ nni_aio_finish(nni_aio *aio, int result, size_t count) } nni_mtx_unlock(&aio->a_lk); } + + +void +nni_aio_list_init(nni_list *list) +{ + NNI_LIST_INIT(list, nni_aio, a_prov_node); +} + + +void +nni_aio_list_append(nni_list *list, nni_aio *aio) +{ + nni_aio_list_remove(aio); + nni_list_append(list, aio); +} + + +void +nni_aio_list_remove(nni_aio *aio) +{ + nni_list_node_remove(&aio->a_prov_node); +} + + +int +nni_aio_list_active(nni_aio *aio) +{ + return (nni_list_node_active(&aio->a_prov_node)); +} diff --git a/src/core/aio.h b/src/core/aio.h index 73fdf260..d3c238f6 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -88,6 +88,15 @@ extern void nni_aio_wake(nni_aio *); // block the caller indefinitely. extern void nni_aio_wait(nni_aio *); +// nni_aio_list_init creates a list suitable for use by providers using +// the a_prov_node member of the aio. These operations are not locked, +// but they do have some extra checks -- remove is idempotent for example, +// and append will perform any necessary remove first. +extern void nni_aio_list_init(nni_list *); +extern void nni_aio_list_append(nni_list *, nni_aio *); +extern void nni_aio_list_remove(nni_aio *); +extern int nni_aio_list_active(nni_aio *); + // nni_aio_finish is called by the provider when an operation is complete. // The provider gives the result code (0 for success, an NNG errno otherwise), // and the amount of data transferred (if any). diff --git a/src/core/list.c b/src/core/list.c index 1d00171b..10ec4ef8 100644 --- a/src/core/list.c +++ b/src/core/list.c @@ -158,3 +158,29 @@ nni_list_active(nni_list *list, void *item) return (node->ln_next == NULL ? 0 : 1); } + + +int +nni_list_empty(nni_list *list) +{ + return (list->ll_head.ln_next == &list->ll_head); +} + + +int +nni_list_node_active(nni_list_node *node) +{ + return (node->ln_next == NULL ? 0 : 1); +} + + +void +nni_list_node_remove(nni_list_node *node) +{ + if (node->ln_next != NULL) { + node->ln_prev->ln_next = node->ln_next; + node->ln_next->ln_prev = node->ln_prev; + node->ln_next = NULL; + node->ln_prev = NULL; + } +} diff --git a/src/core/list.h b/src/core/list.h index 9f21e4d6..9701431e 100644 --- a/src/core/list.h +++ b/src/core/list.h @@ -42,6 +42,9 @@ extern void *nni_list_next(const nni_list *, void *); extern void *nni_list_prev(const nni_list *, void *); extern void nni_list_remove(nni_list *, void *); extern int nni_list_active(nni_list *, void *); +extern int nni_list_empty(nni_list *); +extern int nni_list_node_active(nni_list_node *); +extern void nni_list_node_remove(nni_list_node *); #define NNI_LIST_FOREACH(l, it) \ for (it = nni_list_first(l); it != NULL; it = nni_list_next(l, it)) diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 93a487ad..985563ad 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -61,10 +61,10 @@ nni_msgq_init(nni_msgq **mqp, int cap) if ((mq = NNI_ALLOC_STRUCT(mq)) == NULL) { return (NNG_ENOMEM); } - NNI_LIST_INIT(&mq->mq_aio_putq, nni_aio, a_prov_node); - NNI_LIST_INIT(&mq->mq_aio_getq, nni_aio, a_prov_node); - NNI_LIST_INIT(&mq->mq_aio_notify_get, nni_aio, a_prov_node); - NNI_LIST_INIT(&mq->mq_aio_notify_put, nni_aio, a_prov_node); + nni_aio_list_init(&mq->mq_aio_putq); + nni_aio_list_init(&mq->mq_aio_getq); + nni_aio_list_init(&mq->mq_aio_notify_get); + nni_aio_list_init(&mq->mq_aio_notify_put); if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) { goto fail; @@ -135,11 +135,7 @@ nni_msgq_fini(nni_msgq *mq) static void nni_msgq_finish(nni_aio *aio, int rv) { - nni_msgq *mq = aio->a_prov_data; - - if ((mq != NULL) && nni_list_active(&mq->mq_aio_putq, aio)) { - nni_list_remove(&mq->mq_aio_putq, aio); - } + nni_aio_list_remove(aio); nni_aio_finish(aio, rv, 0); } @@ -299,15 +295,14 @@ nni_msgq_run_notify(nni_msgq *mq) if (mq->mq_closed) { return; } - if ((mq->mq_len < mq->mq_cap) || - (nni_list_first(&mq->mq_aio_getq) != NULL)) { + if ((mq->mq_len < mq->mq_cap) || !nni_list_empty(&mq->mq_aio_getq)) { NNI_LIST_FOREACH (&mq->mq_aio_notify_put, aio) { // This stays on the list. nni_aio_finish(aio, 0, 0); } } - if ((mq->mq_len != 0) || (nni_list_first(&mq->mq_aio_putq) != NULL)) { + if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) { NNI_LIST_FOREACH (&mq->mq_aio_notify_get, aio) { // This stays on the list. nni_aio_finish(aio, 0, 0); @@ -315,8 +310,7 @@ nni_msgq_run_notify(nni_msgq *mq) } if (mq->mq_draining) { - if ((mq->mq_len == 0) && - (nni_list_first(&mq->mq_aio_putq) == NULL)) { + if ((mq->mq_len == 0) && !nni_list_empty(&mq->mq_aio_putq)) { nni_cv_wake(&mq->mq_drained); } } @@ -333,10 +327,7 @@ nni_msgq_cancel(nni_aio *aio) } nni_mtx_lock(&mq->mq_lock); - // this checks if the AIO is active, it doesn't matter what list - if (nni_list_active(&mq->mq_aio_getq, aio)) { - nni_list_remove(&mq->mq_aio_getq, aio); - } + nni_aio_list_remove(aio); nni_mtx_unlock(&mq->mq_lock); } @@ -349,10 +340,7 @@ nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio) nni_mtx_unlock(&mq->mq_lock); return; } - if (nni_list_active(&mq->mq_aio_notify_put, aio)) { - nni_list_remove(&mq->mq_aio_notify_put, aio); - } - nni_list_append(&mq->mq_aio_notify_put, aio); + nni_aio_list_append(&mq->mq_aio_notify_put, aio); nni_mtx_unlock(&mq->mq_lock); } @@ -365,10 +353,7 @@ nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio) nni_mtx_unlock(&mq->mq_lock); return; } - if (nni_list_active(&mq->mq_aio_notify_get, aio)) { - nni_list_remove(&mq->mq_aio_notify_get, aio); - } - nni_list_append(&mq->mq_aio_notify_get, aio); + nni_aio_list_append(&mq->mq_aio_notify_get, aio); nni_mtx_unlock(&mq->mq_lock); } @@ -394,7 +379,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) return; } - nni_list_append(&mq->mq_aio_putq, aio); + nni_aio_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); nni_msgq_run_notify(mq); @@ -428,7 +413,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) return; } - nni_list_append(&mq->mq_aio_getq, aio); + nni_aio_list_append(&mq->mq_aio_getq, aio); nni_msgq_run_getq(mq); nni_msgq_run_notify(mq); @@ -449,8 +434,7 @@ nni_msgq_canput(nni_msgq *mq) nni_mtx_unlock(&mq->mq_lock); return (0); } - if ((mq->mq_len < mq->mq_cap) || - (nni_list_first(&mq->mq_aio_getq) != NULL)) { + if ((mq->mq_len < mq->mq_cap) || !nni_list_empty(&mq->mq_aio_getq)) { nni_mtx_unlock(&mq->mq_lock); return (1); } @@ -467,8 +451,7 @@ nni_msgq_canget(nni_msgq *mq) nni_mtx_unlock(&mq->mq_lock); return (0); } - if ((mq->mq_len != 0) || - (nni_list_first(&mq->mq_aio_putq) != NULL)) { + if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) { nni_mtx_unlock(&mq->mq_lock); return (1); } @@ -537,10 +520,10 @@ nni_msgq_run_timeout(void *arg) while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_getq, aio); if (aio->a_expire == NNI_TIME_ZERO) { - nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_list_remove(aio); nni_aio_finish(aio, NNG_EAGAIN, 0); } else if (now >= aio->a_expire) { - nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_list_remove(aio); nni_aio_finish(aio, NNG_ETIMEDOUT, 0); } else if (exp > aio->a_expire) { exp = aio->a_expire; @@ -551,10 +534,10 @@ nni_msgq_run_timeout(void *arg) while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_putq, aio); if (aio->a_expire == NNI_TIME_ZERO) { - nni_list_remove(&mq->mq_aio_putq, aio); + nni_aio_list_remove(aio); nni_aio_finish(aio, NNG_EAGAIN, 0); } else if (now >= aio->a_expire) { - nni_list_remove(&mq->mq_aio_putq, aio); + nni_aio_list_remove(aio); nni_aio_finish(aio, NNG_ETIMEDOUT, 0); } else if (exp > aio->a_expire) { exp = aio->a_expire; @@ -617,7 +600,7 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire) nni_mtx_lock(&mq->mq_lock); mq->mq_closed = 1; mq->mq_draining = 1; - while ((mq->mq_len > 0) || (nni_list_first(&mq->mq_aio_putq) != NULL)) { + while ((mq->mq_len > 0) || !nni_list_empty(&mq->mq_aio_putq)) { if (nni_cv_until(&mq->mq_drained, expire) != 0) { break; } @@ -626,7 +609,7 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire) // Also complete the putq as NNG_ECLOSED. while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) { - nni_list_remove(&mq->mq_aio_putq, aio); + nni_aio_list_remove(aio); nni_aio_finish(aio, NNG_ECLOSED, 0); } while (mq->mq_len > 0) { @@ -664,14 +647,14 @@ nni_msgq_close(nni_msgq *mq) naio = nni_list_first(&mq->mq_aio_getq); while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_getq, aio); - nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_list_remove(aio); nni_aio_finish(aio, NNG_ECLOSED, 0); } naio = nni_list_first(&mq->mq_aio_putq); while ((aio = naio) != NULL) { naio = nni_list_next(&mq->mq_aio_putq, aio); - nni_list_remove(&mq->mq_aio_putq, aio); + nni_aio_list_remove(aio); nni_aio_finish(aio, NNG_ECLOSED, 0); } diff --git a/src/platform/posix/posix_poll.c b/src/platform/posix/posix_poll.c index 973c5d21..7f9ad163 100644 --- a/src/platform/posix/posix_poll.c +++ b/src/platform/posix/posix_poll.c @@ -126,11 +126,7 @@ nni_posix_epdesc_cancel(nni_aio *aio) pq = ed->pq; nni_mtx_lock(&pq->mtx); - // This will remove the aio from either the read or the write - // list; it doesn't matter which. - if (nni_list_active(&ed->connectq, aio)) { - nni_list_remove(&ed->connectq, aio); - } + nni_list_node_remove(&aio->a_prov_node); nni_mtx_unlock(&pq->mtx); } @@ -322,8 +318,8 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) } NNI_ASSERT(!nni_list_active(&ed->connectq, aio)); - wake = nni_list_first(&ed->connectq) == NULL ? 1 : 0; - nni_list_append(&ed->connectq, aio); + wake = nni_list_empty(&ed->connectq); + nni_aio_list_append(&ed->connectq, aio); if (wake) { nni_plat_pipe_raise(pq->wakewfd); } @@ -362,8 +358,8 @@ nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio) nni_mtx_lock(&pq->mtx); } NNI_ASSERT(!nni_list_active(&ed->acceptq, aio)); - wake = nni_list_first(&ed->acceptq) == NULL ? 1 : 0; - nni_list_append(&ed->acceptq, aio); + wake = nni_list_empty(&ed->acceptq); + nni_aio_list_append(&ed->acceptq, aio); if (wake) { nni_plat_pipe_raise(pq->wakewfd); } @@ -425,12 +421,7 @@ nni_posix_epdesc_fini(nni_posix_epdesc *ed) static void nni_posix_pipedesc_finish(nni_aio *aio, int rv) { - nni_posix_pipedesc *pd; - - pd = aio->a_prov_data; - if (nni_list_active(&pd->readq, aio)) { - nni_list_remove(&pd->readq, aio); - } + nni_aio_list_remove(aio); nni_aio_finish(aio, rv, aio->a_count); } @@ -639,10 +630,10 @@ nni_posix_poll_thr(void *arg) fds[nfds].fd = pd->fd; fds[nfds].events = 0; fds[nfds].revents = 0; - if (nni_list_first(&pd->readq) != NULL) { + if (!nni_list_empty(&pd->readq)) { fds[nfds].events |= POLLIN; } - if (nni_list_first(&pd->writeq) != NULL) { + if (!nni_list_empty(&pd->writeq)) { fds[nfds].events |= POLLOUT; } pd->index = nfds; @@ -652,10 +643,10 @@ nni_posix_poll_thr(void *arg) fds[nfds].fd = ed->fd; fds[nfds].events = 0; fds[nfds].revents = 0; - if (nni_list_first(&ed->connectq) != NULL) { + if (!nni_list_empty(&ed->connectq)) { fds[nfds].events |= POLLOUT; } - if (nni_list_first(&ed->acceptq) != NULL) { + if (!nni_list_empty(&ed->acceptq)) { fds[nfds].events |= POLLIN; } ed->index = nfds; @@ -717,8 +708,8 @@ nni_posix_poll_thr(void *arg) // If we have completed all the AIOs outstanding, // then remove this pipedesc from the pollq. - if ((nni_list_first(&pd->readq) == NULL) && - (nni_list_first(&pd->writeq) == NULL)) { + if (nni_list_empty(&pd->readq) && + nni_list_empty(&pd->writeq)) { nni_list_remove(&pollq->pds, pd); pollq->npds--; } @@ -742,8 +733,8 @@ nni_posix_poll_thr(void *arg) if (fds[index].revents & (POLLHUP|POLLERR|POLLNVAL)) { nni_posix_poll_epclose(ed); } - if ((nni_list_first(&ed->connectq) == NULL) && - (nni_list_first(&ed->acceptq) == NULL)) { + if (nni_list_empty(&ed->connectq) && + nni_list_empty(&ed->acceptq)) { nni_list_remove(&pollq->eds, ed); pollq->neds--; } @@ -802,8 +793,8 @@ nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio) NNI_ASSERT(!nni_list_active(l, aio)); // Only wake if we aren't already waiting for this type of I/O on // this descriptor. - wake = nni_list_first(l) == NULL ? 1 : 0; - nni_list_append(l, aio); + wake = nni_list_empty(l); + nni_aio_list_append(l, aio); if (wake) { nni_plat_pipe_raise(pq->wakewfd); @@ -871,7 +862,7 @@ nni_posix_pollq_fini(nni_posix_pollq *pq) nni_plat_pipe_raise(pq->wakewfd); // All pipes should have been closed before this is called. - NNI_ASSERT(nni_list_first(&pq->pds) == NULL); + NNI_ASSERT(nni_list_empty(&pq->pds)); nni_mtx_unlock(&pq->mtx); } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index d41e5505..b1208e25 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -234,7 +234,7 @@ nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock) ep->closed = 0; ep->proto = nni_sock_proto(sock); NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node); - NNI_LIST_INIT(&ep->aios, nni_aio, a_prov_node); + nni_aio_list_init(&ep->aios); (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); *epp = ep; @@ -263,13 +263,10 @@ nni_inproc_conn_finish(nni_aio *aio, int rv) aio->a_pipe = NULL; } } + nni_aio_list_remove(aio); if (ep != NULL) { - if (nni_list_active(&ep->aios, aio)) { - nni_list_remove(&ep->aios, aio); - } - if ((ep->mode != NNI_INPROC_EP_LISTEN) && - (nni_list_first(&ep->aios) == NULL)) { + nni_list_empty(&ep->aios)) { if (nni_list_active(&ep->clients, ep)) { nni_list_remove(&ep->clients, ep); } @@ -316,13 +313,10 @@ nni_inproc_connect_abort(nni_aio *aio) nni_inproc_pipe_fini(aio->a_pipe); aio->a_pipe = NULL; } + nni_aio_list_remove(aio); if (ep != NULL) { - if (nni_list_active(&ep->aios, aio)) { - nni_list_remove(&ep->aios, aio); - } - if ((ep->mode != NNI_INPROC_EP_LISTEN) && - (nni_list_first(&ep->aios) == NULL)) { + nni_list_empty(&ep->aios)) { if (nni_list_active(&ep->clients, ep)) { nni_list_remove(&ep->clients, ep); } @@ -443,7 +437,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) ep->mode = NNI_INPROC_EP_DIAL; nni_list_append(&server->clients, ep); - nni_list_append(&ep->aios, aio); + nni_aio_list_append(&ep->aios, aio); nni_inproc_accept_clients(server); nni_mtx_unlock(&nni_inproc.mx); @@ -511,7 +505,7 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) // Insert us into the pending server aios, and then run the // accept list. - nni_list_append(&ep->aios, aio); + nni_aio_list_append(&ep->aios, aio); nni_inproc_accept_clients(ep); nni_mtx_unlock(&nni_inproc.mx); } |
