diff options
| -rw-r--r-- | src/platform/posix/posix_epdesc.c | 25 | ||||
| -rw-r--r-- | src/platform/posix/posix_pipedesc.c | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq.h | 4 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_poll.c | 57 | ||||
| -rw-r--r-- | src/platform/posix/posix_udp.c | 7 |
5 files changed, 78 insertions, 21 deletions
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 75383f4f..5b1d6a9c 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -102,6 +102,7 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed) switch (rv) { case 0: // Success! + nni_posix_pollq_remove(&ed->node); nni_posix_epdesc_finish(aio, 0, ed->node.fd); ed->node.fd = -1; continue; @@ -114,6 +115,7 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed) if (rv == ENOENT) { rv = ECONNREFUSED; } + nni_posix_pollq_remove(&ed->node); nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0); (void) close(ed->node.fd); ed->node.fd = -1; @@ -207,6 +209,8 @@ nni_posix_epdesc_doclose(nni_posix_epdesc *ed) nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); } + nni_posix_pollq_remove(&ed->node); + if ((fd = ed->node.fd) != -1) { ed->node.fd = -1; (void) shutdown(fd, SHUT_RDWR); @@ -253,8 +257,6 @@ nni_posix_epdesc_cb(void *arg) void nni_posix_epdesc_close(nni_posix_epdesc *ed) { - nni_posix_pollq_disarm(&ed->node, POLLIN | POLLOUT); - nni_mtx_lock(&ed->mtx); nni_posix_epdesc_doclose(ed); nni_mtx_unlock(&ed->mtx); @@ -304,6 +306,12 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed) (void) fcntl(fd, F_SETFL, O_NONBLOCK); ed->node.fd = fd; + if ((rv = nni_posix_pollq_add(&ed->node)) != 0) { + (void) close(fd); + ed->node.fd = -1; + nni_mtx_unlock(&ed->mtx); + return (rv); + } nni_mtx_unlock(&ed->mtx); return (0); } @@ -393,6 +401,13 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) // We have to submit to the pollq, because the connection is pending. ed->node.fd = fd; + if ((rv = nni_posix_pollq_add(&ed->node)) != 0) { + (void) close(fd); + nni_posix_epdesc_finish(aio, rv, 0); + nni_mtx_unlock(&ed->mtx); + return; + } + nni_aio_list_append(&ed->connectq, aio); nni_posix_pollq_arm(&ed->node, POLLOUT); nni_mtx_unlock(&ed->mtx); @@ -402,7 +417,6 @@ int nni_posix_epdesc_init(nni_posix_epdesc **edp) { nni_posix_epdesc *ed; - nni_posix_pollq * pq; int rv; if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) { @@ -423,8 +437,7 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp) nni_aio_list_init(&ed->connectq); nni_aio_list_init(&ed->acceptq); - pq = nni_posix_pollq_get(nni_random() % 0xffff); - if ((rv = nni_posix_pollq_add(pq, &ed->node)) != 0) { + if ((rv = nni_posix_pollq_init(&ed->node)) != 0) { nni_mtx_fini(&ed->mtx); NNI_FREE_STRUCT(ed); return (rv); @@ -467,7 +480,7 @@ nni_posix_epdesc_fini(nni_posix_epdesc *ed) nni_posix_epdesc_doclose(ed); } nni_mtx_unlock(&ed->mtx); - nni_posix_pollq_remove(&ed->node); + nni_posix_pollq_fini(&ed->node); nni_mtx_fini(&ed->mtx); NNI_FREE_STRUCT(ed); } diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c index f387c60c..7cbf534b 100644 --- a/src/platform/posix/posix_pipedesc.c +++ b/src/platform/posix/posix_pipedesc.c @@ -358,8 +358,8 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd) nni_aio_list_init(&pd->readq); nni_aio_list_init(&pd->writeq); - rv = nni_posix_pollq_add(nni_posix_pollq_get(fd), &pd->node); - if (rv != 0) { + if (((rv = nni_posix_pollq_init(&pd->node)) != 0) || + ((rv = nni_posix_pollq_add(&pd->node)) != 0)) { nni_mtx_fini(&pd->mtx); NNI_FREE_STRUCT(pd); return (rv); @@ -373,7 +373,7 @@ nni_posix_pipedesc_fini(nni_posix_pipedesc *pd) { // Make sure no other polling activity is pending. nni_posix_pipedesc_close(pd); - nni_posix_pollq_remove(&pd->node); + nni_posix_pollq_fini(&pd->node); if (pd->node.fd >= 0) { (void) close(pd->node.fd); } diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h index 258c6580..bb441c7b 100644 --- a/src/platform/posix/posix_pollq.h +++ b/src/platform/posix/posix_pollq.h @@ -41,7 +41,9 @@ extern nni_posix_pollq *nni_posix_pollq_get(int); extern int nni_posix_pollq_sysinit(void); extern void nni_posix_pollq_sysfini(void); -extern int nni_posix_pollq_add(nni_posix_pollq *, nni_posix_pollq_node *); +extern int nni_posix_pollq_init(nni_posix_pollq_node *); +extern void nni_posix_pollq_fini(nni_posix_pollq_node *); +extern int nni_posix_pollq_add(nni_posix_pollq_node *); extern void nni_posix_pollq_remove(nni_posix_pollq_node *); extern void nni_posix_pollq_arm(nni_posix_pollq_node *, int); extern void nni_posix_pollq_disarm(nni_posix_pollq_node *, int); diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index 9081c0d9..8b386ecd 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -189,11 +189,18 @@ nni_posix_poll_thr(void *arg) } int -nni_posix_pollq_add(nni_posix_pollq *pq, nni_posix_pollq_node *node) +nni_posix_pollq_add(nni_posix_pollq_node *node) { - int rv; + int rv; + nni_posix_pollq *pq; + NNI_ASSERT(!nni_list_node_active(&node->node)); + pq = nni_posix_pollq_get(node->fd); + if (node->pq != NULL) { + return (NNG_ESTATE); + } + nni_mtx_lock(&pq->mtx); if (pq->close) { // This shouldn't happen! @@ -211,6 +218,9 @@ nni_posix_pollq_add(nni_posix_pollq *pq, nni_posix_pollq_node *node) return (0); } +// nni_posix_pollq_remove removes the node from the pollq, but +// does not ensure that the pollq node is safe to destroy. In particular, +// this function can be called from a callback (the callback may be active). void nni_posix_pollq_remove(nni_posix_pollq_node *node) { @@ -219,6 +229,39 @@ nni_posix_pollq_remove(nni_posix_pollq_node *node) if (pq == NULL) { return; } + node->pq = NULL; + nni_mtx_lock(&pq->mtx); + if (nni_list_node_active(&node->node)) { + nni_list_node_remove(&node->node); + pq->nnodes--; + } + if (pq->close) { + nni_cv_wake(&pq->cv); + } + nni_mtx_unlock(&pq->mtx); +} + +// nni_posix_pollq_init merely ensures that the node is ready for use. +// It does not register the node with any pollq in particular. +int +nni_posix_pollq_init(nni_posix_pollq_node *node) +{ + NNI_LIST_NODE_INIT(&node->node); + return (0); +} + +// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does, +// but it also ensures that the callback is not active, so that the node +// may be deallocated. This function must not be called in a callback. +void +nni_posix_pollq_fini(nni_posix_pollq_node *node) +{ + nni_posix_pollq *pq = node->pq; + + if (pq == NULL) { + return; + } + node->pq = NULL; nni_mtx_lock(&pq->mtx); while (pq->active == node) { pq->wait = node; @@ -287,7 +330,7 @@ nni_posix_pollq_disarm(nni_posix_pollq_node *node, int events) } static void -nni_posix_pollq_fini(nni_posix_pollq *pq) +nni_posix_pollq_destroy(nni_posix_pollq *pq) { if (pq->started) { nni_mtx_lock(&pq->mtx); @@ -317,7 +360,7 @@ nni_posix_pollq_fini(nni_posix_pollq *pq) } static int -nni_posix_pollq_init(nni_posix_pollq *pq) +nni_posix_pollq_create(nni_posix_pollq *pq) { int rv; @@ -334,7 +377,7 @@ nni_posix_pollq_init(nni_posix_pollq *pq) if (((rv = nni_posix_pollq_poll_grow(pq)) != 0) || ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) || ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) { - nni_posix_pollq_fini(pq); + nni_posix_pollq_destroy(pq); return (rv); } pq->started = 1; @@ -363,14 +406,14 @@ nni_posix_pollq_sysinit(void) { int rv; - rv = nni_posix_pollq_init(&nni_posix_global_pollq); + rv = nni_posix_pollq_create(&nni_posix_global_pollq); return (rv); } void nni_posix_pollq_sysfini(void) { - nni_posix_pollq_fini(&nni_posix_global_pollq); + nni_posix_pollq_destroy(&nni_posix_global_pollq); } #endif // NNG_USE_POSIX_POLLQ_POLL diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index cde58e7c..61546667 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -244,9 +244,8 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) nni_aio_list_init(&udp->udp_recvq); nni_aio_list_init(&udp->udp_sendq); - rv = nni_posix_pollq_add( - nni_posix_pollq_get(udp->udp_fd), &udp->udp_pitem); - if (rv != 0) { + if (((rv = nni_posix_pollq_init(&udp->udp_pitem)) != 0) || + ((rv = nni_posix_pollq_add(&udp->udp_pitem)) != 0)) { (void) close(udp->udp_fd); nni_mtx_fini(&udp->udp_mtx); NNI_FREE_STRUCT(udp); @@ -261,7 +260,7 @@ void nni_plat_udp_close(nni_plat_udp *udp) { // We're no longer interested in events. - nni_posix_pollq_remove(&udp->udp_pitem); + nni_posix_pollq_fini(&udp->udp_pitem); nni_mtx_lock(&udp->udp_mtx); nni_posix_udp_doclose(udp); |
