aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-04 13:04:38 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-04 13:04:38 -0700
commit58c5fbb731f50a952864bc500a8efd3b7077ee65 (patch)
treebff20e9758e04c024ef26288dff0f4edb89cf033
parent5b45db0aeb1026fcf7bbdec0e6451d1cfaac58f1 (diff)
downloadnng-58c5fbb731f50a952864bc500a8efd3b7077ee65.tar.gz
nng-58c5fbb731f50a952864bc500a8efd3b7077ee65.tar.bz2
nng-58c5fbb731f50a952864bc500a8efd3b7077ee65.zip
Improved routines for list management.
-rw-r--r--src/core/aio.c29
-rw-r--r--src/core/aio.h9
-rw-r--r--src/core/list.c26
-rw-r--r--src/core/list.h3
-rw-r--r--src/core/msgqueue.c63
-rw-r--r--src/platform/posix/posix_poll.c43
-rw-r--r--src/transport/inproc/inproc.c20
7 files changed, 114 insertions, 79 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index f4512a34..e1d7bdae 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -217,3 +217,32 @@ nni_aio_finish(nni_aio *aio, int result, size_t count)
}
nni_mtx_unlock(&aio->a_lk);
}
+
+
+void
+nni_aio_list_init(nni_list *list)
+{
+ NNI_LIST_INIT(list, nni_aio, a_prov_node);
+}
+
+
+void
+nni_aio_list_append(nni_list *list, nni_aio *aio)
+{
+ nni_aio_list_remove(aio);
+ nni_list_append(list, aio);
+}
+
+
+void
+nni_aio_list_remove(nni_aio *aio)
+{
+ nni_list_node_remove(&aio->a_prov_node);
+}
+
+
+int
+nni_aio_list_active(nni_aio *aio)
+{
+ return (nni_list_node_active(&aio->a_prov_node));
+}
diff --git a/src/core/aio.h b/src/core/aio.h
index 73fdf260..d3c238f6 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -88,6 +88,15 @@ extern void nni_aio_wake(nni_aio *);
// block the caller indefinitely.
extern void nni_aio_wait(nni_aio *);
+// nni_aio_list_init creates a list suitable for use by providers using
+// the a_prov_node member of the aio. These operations are not locked,
+// but they do have some extra checks -- remove is idempotent for example,
+// and append will perform any necessary remove first.
+extern void nni_aio_list_init(nni_list *);
+extern void nni_aio_list_append(nni_list *, nni_aio *);
+extern void nni_aio_list_remove(nni_aio *);
+extern int nni_aio_list_active(nni_aio *);
+
// nni_aio_finish is called by the provider when an operation is complete.
// The provider gives the result code (0 for success, an NNG errno otherwise),
// and the amount of data transferred (if any).
diff --git a/src/core/list.c b/src/core/list.c
index 1d00171b..10ec4ef8 100644
--- a/src/core/list.c
+++ b/src/core/list.c
@@ -158,3 +158,29 @@ nni_list_active(nni_list *list, void *item)
return (node->ln_next == NULL ? 0 : 1);
}
+
+
+int
+nni_list_empty(nni_list *list)
+{
+ return (list->ll_head.ln_next == &list->ll_head);
+}
+
+
+int
+nni_list_node_active(nni_list_node *node)
+{
+ return (node->ln_next == NULL ? 0 : 1);
+}
+
+
+void
+nni_list_node_remove(nni_list_node *node)
+{
+ if (node->ln_next != NULL) {
+ node->ln_prev->ln_next = node->ln_next;
+ node->ln_next->ln_prev = node->ln_prev;
+ node->ln_next = NULL;
+ node->ln_prev = NULL;
+ }
+}
diff --git a/src/core/list.h b/src/core/list.h
index 9f21e4d6..9701431e 100644
--- a/src/core/list.h
+++ b/src/core/list.h
@@ -42,6 +42,9 @@ extern void *nni_list_next(const nni_list *, void *);
extern void *nni_list_prev(const nni_list *, void *);
extern void nni_list_remove(nni_list *, void *);
extern int nni_list_active(nni_list *, void *);
+extern int nni_list_empty(nni_list *);
+extern int nni_list_node_active(nni_list_node *);
+extern void nni_list_node_remove(nni_list_node *);
#define NNI_LIST_FOREACH(l, it) \
for (it = nni_list_first(l); it != NULL; it = nni_list_next(l, it))
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 93a487ad..985563ad 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -61,10 +61,10 @@ nni_msgq_init(nni_msgq **mqp, int cap)
if ((mq = NNI_ALLOC_STRUCT(mq)) == NULL) {
return (NNG_ENOMEM);
}
- NNI_LIST_INIT(&mq->mq_aio_putq, nni_aio, a_prov_node);
- NNI_LIST_INIT(&mq->mq_aio_getq, nni_aio, a_prov_node);
- NNI_LIST_INIT(&mq->mq_aio_notify_get, nni_aio, a_prov_node);
- NNI_LIST_INIT(&mq->mq_aio_notify_put, nni_aio, a_prov_node);
+ nni_aio_list_init(&mq->mq_aio_putq);
+ nni_aio_list_init(&mq->mq_aio_getq);
+ nni_aio_list_init(&mq->mq_aio_notify_get);
+ nni_aio_list_init(&mq->mq_aio_notify_put);
if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) {
goto fail;
@@ -135,11 +135,7 @@ nni_msgq_fini(nni_msgq *mq)
static void
nni_msgq_finish(nni_aio *aio, int rv)
{
- nni_msgq *mq = aio->a_prov_data;
-
- if ((mq != NULL) && nni_list_active(&mq->mq_aio_putq, aio)) {
- nni_list_remove(&mq->mq_aio_putq, aio);
- }
+ nni_aio_list_remove(aio);
nni_aio_finish(aio, rv, 0);
}
@@ -299,15 +295,14 @@ nni_msgq_run_notify(nni_msgq *mq)
if (mq->mq_closed) {
return;
}
- if ((mq->mq_len < mq->mq_cap) ||
- (nni_list_first(&mq->mq_aio_getq) != NULL)) {
+ if ((mq->mq_len < mq->mq_cap) || !nni_list_empty(&mq->mq_aio_getq)) {
NNI_LIST_FOREACH (&mq->mq_aio_notify_put, aio) {
// This stays on the list.
nni_aio_finish(aio, 0, 0);
}
}
- if ((mq->mq_len != 0) || (nni_list_first(&mq->mq_aio_putq) != NULL)) {
+ if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) {
NNI_LIST_FOREACH (&mq->mq_aio_notify_get, aio) {
// This stays on the list.
nni_aio_finish(aio, 0, 0);
@@ -315,8 +310,7 @@ nni_msgq_run_notify(nni_msgq *mq)
}
if (mq->mq_draining) {
- if ((mq->mq_len == 0) &&
- (nni_list_first(&mq->mq_aio_putq) == NULL)) {
+ if ((mq->mq_len == 0) && !nni_list_empty(&mq->mq_aio_putq)) {
nni_cv_wake(&mq->mq_drained);
}
}
@@ -333,10 +327,7 @@ nni_msgq_cancel(nni_aio *aio)
}
nni_mtx_lock(&mq->mq_lock);
- // this checks if the AIO is active, it doesn't matter what list
- if (nni_list_active(&mq->mq_aio_getq, aio)) {
- nni_list_remove(&mq->mq_aio_getq, aio);
- }
+ nni_aio_list_remove(aio);
nni_mtx_unlock(&mq->mq_lock);
}
@@ -349,10 +340,7 @@ nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio)
nni_mtx_unlock(&mq->mq_lock);
return;
}
- if (nni_list_active(&mq->mq_aio_notify_put, aio)) {
- nni_list_remove(&mq->mq_aio_notify_put, aio);
- }
- nni_list_append(&mq->mq_aio_notify_put, aio);
+ nni_aio_list_append(&mq->mq_aio_notify_put, aio);
nni_mtx_unlock(&mq->mq_lock);
}
@@ -365,10 +353,7 @@ nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio)
nni_mtx_unlock(&mq->mq_lock);
return;
}
- if (nni_list_active(&mq->mq_aio_notify_get, aio)) {
- nni_list_remove(&mq->mq_aio_notify_get, aio);
- }
- nni_list_append(&mq->mq_aio_notify_get, aio);
+ nni_aio_list_append(&mq->mq_aio_notify_get, aio);
nni_mtx_unlock(&mq->mq_lock);
}
@@ -394,7 +379,7 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
return;
}
- nni_list_append(&mq->mq_aio_putq, aio);
+ nni_aio_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
@@ -428,7 +413,7 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
return;
}
- nni_list_append(&mq->mq_aio_getq, aio);
+ nni_aio_list_append(&mq->mq_aio_getq, aio);
nni_msgq_run_getq(mq);
nni_msgq_run_notify(mq);
@@ -449,8 +434,7 @@ nni_msgq_canput(nni_msgq *mq)
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
- if ((mq->mq_len < mq->mq_cap) ||
- (nni_list_first(&mq->mq_aio_getq) != NULL)) {
+ if ((mq->mq_len < mq->mq_cap) || !nni_list_empty(&mq->mq_aio_getq)) {
nni_mtx_unlock(&mq->mq_lock);
return (1);
}
@@ -467,8 +451,7 @@ nni_msgq_canget(nni_msgq *mq)
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
- if ((mq->mq_len != 0) ||
- (nni_list_first(&mq->mq_aio_putq) != NULL)) {
+ if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) {
nni_mtx_unlock(&mq->mq_lock);
return (1);
}
@@ -537,10 +520,10 @@ nni_msgq_run_timeout(void *arg)
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_getq, aio);
if (aio->a_expire == NNI_TIME_ZERO) {
- nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_list_remove(aio);
nni_aio_finish(aio, NNG_EAGAIN, 0);
} else if (now >= aio->a_expire) {
- nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_list_remove(aio);
nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
} else if (exp > aio->a_expire) {
exp = aio->a_expire;
@@ -551,10 +534,10 @@ nni_msgq_run_timeout(void *arg)
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_putq, aio);
if (aio->a_expire == NNI_TIME_ZERO) {
- nni_list_remove(&mq->mq_aio_putq, aio);
+ nni_aio_list_remove(aio);
nni_aio_finish(aio, NNG_EAGAIN, 0);
} else if (now >= aio->a_expire) {
- nni_list_remove(&mq->mq_aio_putq, aio);
+ nni_aio_list_remove(aio);
nni_aio_finish(aio, NNG_ETIMEDOUT, 0);
} else if (exp > aio->a_expire) {
exp = aio->a_expire;
@@ -617,7 +600,7 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = 1;
mq->mq_draining = 1;
- while ((mq->mq_len > 0) || (nni_list_first(&mq->mq_aio_putq) != NULL)) {
+ while ((mq->mq_len > 0) || !nni_list_empty(&mq->mq_aio_putq)) {
if (nni_cv_until(&mq->mq_drained, expire) != 0) {
break;
}
@@ -626,7 +609,7 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
// Also complete the putq as NNG_ECLOSED.
while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
- nni_list_remove(&mq->mq_aio_putq, aio);
+ nni_aio_list_remove(aio);
nni_aio_finish(aio, NNG_ECLOSED, 0);
}
while (mq->mq_len > 0) {
@@ -664,14 +647,14 @@ nni_msgq_close(nni_msgq *mq)
naio = nni_list_first(&mq->mq_aio_getq);
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_getq, aio);
- nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_list_remove(aio);
nni_aio_finish(aio, NNG_ECLOSED, 0);
}
naio = nni_list_first(&mq->mq_aio_putq);
while ((aio = naio) != NULL) {
naio = nni_list_next(&mq->mq_aio_putq, aio);
- nni_list_remove(&mq->mq_aio_putq, aio);
+ nni_aio_list_remove(aio);
nni_aio_finish(aio, NNG_ECLOSED, 0);
}
diff --git a/src/platform/posix/posix_poll.c b/src/platform/posix/posix_poll.c
index 973c5d21..7f9ad163 100644
--- a/src/platform/posix/posix_poll.c
+++ b/src/platform/posix/posix_poll.c
@@ -126,11 +126,7 @@ nni_posix_epdesc_cancel(nni_aio *aio)
pq = ed->pq;
nni_mtx_lock(&pq->mtx);
- // This will remove the aio from either the read or the write
- // list; it doesn't matter which.
- if (nni_list_active(&ed->connectq, aio)) {
- nni_list_remove(&ed->connectq, aio);
- }
+ nni_list_node_remove(&aio->a_prov_node);
nni_mtx_unlock(&pq->mtx);
}
@@ -322,8 +318,8 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
}
NNI_ASSERT(!nni_list_active(&ed->connectq, aio));
- wake = nni_list_first(&ed->connectq) == NULL ? 1 : 0;
- nni_list_append(&ed->connectq, aio);
+ wake = nni_list_empty(&ed->connectq);
+ nni_aio_list_append(&ed->connectq, aio);
if (wake) {
nni_plat_pipe_raise(pq->wakewfd);
}
@@ -362,8 +358,8 @@ nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
nni_mtx_lock(&pq->mtx);
}
NNI_ASSERT(!nni_list_active(&ed->acceptq, aio));
- wake = nni_list_first(&ed->acceptq) == NULL ? 1 : 0;
- nni_list_append(&ed->acceptq, aio);
+ wake = nni_list_empty(&ed->acceptq);
+ nni_aio_list_append(&ed->acceptq, aio);
if (wake) {
nni_plat_pipe_raise(pq->wakewfd);
}
@@ -425,12 +421,7 @@ nni_posix_epdesc_fini(nni_posix_epdesc *ed)
static void
nni_posix_pipedesc_finish(nni_aio *aio, int rv)
{
- nni_posix_pipedesc *pd;
-
- pd = aio->a_prov_data;
- if (nni_list_active(&pd->readq, aio)) {
- nni_list_remove(&pd->readq, aio);
- }
+ nni_aio_list_remove(aio);
nni_aio_finish(aio, rv, aio->a_count);
}
@@ -639,10 +630,10 @@ nni_posix_poll_thr(void *arg)
fds[nfds].fd = pd->fd;
fds[nfds].events = 0;
fds[nfds].revents = 0;
- if (nni_list_first(&pd->readq) != NULL) {
+ if (!nni_list_empty(&pd->readq)) {
fds[nfds].events |= POLLIN;
}
- if (nni_list_first(&pd->writeq) != NULL) {
+ if (!nni_list_empty(&pd->writeq)) {
fds[nfds].events |= POLLOUT;
}
pd->index = nfds;
@@ -652,10 +643,10 @@ nni_posix_poll_thr(void *arg)
fds[nfds].fd = ed->fd;
fds[nfds].events = 0;
fds[nfds].revents = 0;
- if (nni_list_first(&ed->connectq) != NULL) {
+ if (!nni_list_empty(&ed->connectq)) {
fds[nfds].events |= POLLOUT;
}
- if (nni_list_first(&ed->acceptq) != NULL) {
+ if (!nni_list_empty(&ed->acceptq)) {
fds[nfds].events |= POLLIN;
}
ed->index = nfds;
@@ -717,8 +708,8 @@ nni_posix_poll_thr(void *arg)
// If we have completed all the AIOs outstanding,
// then remove this pipedesc from the pollq.
- if ((nni_list_first(&pd->readq) == NULL) &&
- (nni_list_first(&pd->writeq) == NULL)) {
+ if (nni_list_empty(&pd->readq) &&
+ nni_list_empty(&pd->writeq)) {
nni_list_remove(&pollq->pds, pd);
pollq->npds--;
}
@@ -742,8 +733,8 @@ nni_posix_poll_thr(void *arg)
if (fds[index].revents & (POLLHUP|POLLERR|POLLNVAL)) {
nni_posix_poll_epclose(ed);
}
- if ((nni_list_first(&ed->connectq) == NULL) &&
- (nni_list_first(&ed->acceptq) == NULL)) {
+ if (nni_list_empty(&ed->connectq) &&
+ nni_list_empty(&ed->acceptq)) {
nni_list_remove(&pollq->eds, ed);
pollq->neds--;
}
@@ -802,8 +793,8 @@ nni_posix_pipedesc_submit(nni_posix_pipedesc *pd, nni_list *l, nni_aio *aio)
NNI_ASSERT(!nni_list_active(l, aio));
// Only wake if we aren't already waiting for this type of I/O on
// this descriptor.
- wake = nni_list_first(l) == NULL ? 1 : 0;
- nni_list_append(l, aio);
+ wake = nni_list_empty(l);
+ nni_aio_list_append(l, aio);
if (wake) {
nni_plat_pipe_raise(pq->wakewfd);
@@ -871,7 +862,7 @@ nni_posix_pollq_fini(nni_posix_pollq *pq)
nni_plat_pipe_raise(pq->wakewfd);
// All pipes should have been closed before this is called.
- NNI_ASSERT(nni_list_first(&pq->pds) == NULL);
+ NNI_ASSERT(nni_list_empty(&pq->pds));
nni_mtx_unlock(&pq->mtx);
}
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index d41e5505..b1208e25 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -234,7 +234,7 @@ nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock)
ep->closed = 0;
ep->proto = nni_sock_proto(sock);
NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node);
- NNI_LIST_INIT(&ep->aios, nni_aio, a_prov_node);
+ nni_aio_list_init(&ep->aios);
(void) snprintf(ep->addr, sizeof (ep->addr), "%s", url);
*epp = ep;
@@ -263,13 +263,10 @@ nni_inproc_conn_finish(nni_aio *aio, int rv)
aio->a_pipe = NULL;
}
}
+ nni_aio_list_remove(aio);
if (ep != NULL) {
- if (nni_list_active(&ep->aios, aio)) {
- nni_list_remove(&ep->aios, aio);
- }
-
if ((ep->mode != NNI_INPROC_EP_LISTEN) &&
- (nni_list_first(&ep->aios) == NULL)) {
+ nni_list_empty(&ep->aios)) {
if (nni_list_active(&ep->clients, ep)) {
nni_list_remove(&ep->clients, ep);
}
@@ -316,13 +313,10 @@ nni_inproc_connect_abort(nni_aio *aio)
nni_inproc_pipe_fini(aio->a_pipe);
aio->a_pipe = NULL;
}
+ nni_aio_list_remove(aio);
if (ep != NULL) {
- if (nni_list_active(&ep->aios, aio)) {
- nni_list_remove(&ep->aios, aio);
- }
-
if ((ep->mode != NNI_INPROC_EP_LISTEN) &&
- (nni_list_first(&ep->aios) == NULL)) {
+ nni_list_empty(&ep->aios)) {
if (nni_list_active(&ep->clients, ep)) {
nni_list_remove(&ep->clients, ep);
}
@@ -443,7 +437,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
ep->mode = NNI_INPROC_EP_DIAL;
nni_list_append(&server->clients, ep);
- nni_list_append(&ep->aios, aio);
+ nni_aio_list_append(&ep->aios, aio);
nni_inproc_accept_clients(server);
nni_mtx_unlock(&nni_inproc.mx);
@@ -511,7 +505,7 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio)
// Insert us into the pending server aios, and then run the
// accept list.
- nni_list_append(&ep->aios, aio);
+ nni_aio_list_append(&ep->aios, aio);
nni_inproc_accept_clients(ep);
nni_mtx_unlock(&nni_inproc.mx);
}