summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-02-15 14:08:19 -0800
committerGarrett D'Amore <garrett@damore.org>2018-02-15 14:08:19 -0800
commite7f2bd6e34fa2af9f55f200cda4bb6ceecb2ee89 (patch)
treed4b08182ebe529d36059c251176d2c1066a45ead
parent45bc175ef9278c175d2fc3a0678b49b18e74c449 (diff)
downloadnng-e7f2bd6e34fa2af9f55f200cda4bb6ceecb2ee89.tar.gz
nng-e7f2bd6e34fa2af9f55f200cda4bb6ceecb2ee89.tar.bz2
nng-e7f2bd6e34fa2af9f55f200cda4bb6ceecb2ee89.zip
Simply posix pollq architecture somewhat.
This change is being made to facilitate the work done for the kqueue port. We have created two new functions, nni_posix_pollq_init and nni_posix_pollq_fini, which can be used when creating or destroying the pollq nodes. Then nodes are *added* and *removed* from the pollq structure with nni_posix_pollq_add and nni_posix_pollq_remove. The add function in particular MUST NEVER be called unless the node has a valid file descriptor.
-rw-r--r--src/platform/posix/posix_epdesc.c25
-rw-r--r--src/platform/posix/posix_pipedesc.c6
-rw-r--r--src/platform/posix/posix_pollq.h4
-rw-r--r--src/platform/posix/posix_pollq_poll.c57
-rw-r--r--src/platform/posix/posix_udp.c7
5 files changed, 78 insertions, 21 deletions
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 75383f4f..5b1d6a9c 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -102,6 +102,7 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed)
switch (rv) {
case 0:
// Success!
+ nni_posix_pollq_remove(&ed->node);
nni_posix_epdesc_finish(aio, 0, ed->node.fd);
ed->node.fd = -1;
continue;
@@ -114,6 +115,7 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed)
if (rv == ENOENT) {
rv = ECONNREFUSED;
}
+ nni_posix_pollq_remove(&ed->node);
nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0);
(void) close(ed->node.fd);
ed->node.fd = -1;
@@ -207,6 +209,8 @@ nni_posix_epdesc_doclose(nni_posix_epdesc *ed)
nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
}
+ nni_posix_pollq_remove(&ed->node);
+
if ((fd = ed->node.fd) != -1) {
ed->node.fd = -1;
(void) shutdown(fd, SHUT_RDWR);
@@ -253,8 +257,6 @@ nni_posix_epdesc_cb(void *arg)
void
nni_posix_epdesc_close(nni_posix_epdesc *ed)
{
- nni_posix_pollq_disarm(&ed->node, POLLIN | POLLOUT);
-
nni_mtx_lock(&ed->mtx);
nni_posix_epdesc_doclose(ed);
nni_mtx_unlock(&ed->mtx);
@@ -304,6 +306,12 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
ed->node.fd = fd;
+ if ((rv = nni_posix_pollq_add(&ed->node)) != 0) {
+ (void) close(fd);
+ ed->node.fd = -1;
+ nni_mtx_unlock(&ed->mtx);
+ return (rv);
+ }
nni_mtx_unlock(&ed->mtx);
return (0);
}
@@ -393,6 +401,13 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
// We have to submit to the pollq, because the connection is pending.
ed->node.fd = fd;
+ if ((rv = nni_posix_pollq_add(&ed->node)) != 0) {
+ (void) close(fd);
+ nni_posix_epdesc_finish(aio, rv, 0);
+ nni_mtx_unlock(&ed->mtx);
+ return;
+ }
+
nni_aio_list_append(&ed->connectq, aio);
nni_posix_pollq_arm(&ed->node, POLLOUT);
nni_mtx_unlock(&ed->mtx);
@@ -402,7 +417,6 @@ int
nni_posix_epdesc_init(nni_posix_epdesc **edp)
{
nni_posix_epdesc *ed;
- nni_posix_pollq * pq;
int rv;
if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) {
@@ -423,8 +437,7 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp)
nni_aio_list_init(&ed->connectq);
nni_aio_list_init(&ed->acceptq);
- pq = nni_posix_pollq_get(nni_random() % 0xffff);
- if ((rv = nni_posix_pollq_add(pq, &ed->node)) != 0) {
+ if ((rv = nni_posix_pollq_init(&ed->node)) != 0) {
nni_mtx_fini(&ed->mtx);
NNI_FREE_STRUCT(ed);
return (rv);
@@ -467,7 +480,7 @@ nni_posix_epdesc_fini(nni_posix_epdesc *ed)
nni_posix_epdesc_doclose(ed);
}
nni_mtx_unlock(&ed->mtx);
- nni_posix_pollq_remove(&ed->node);
+ nni_posix_pollq_fini(&ed->node);
nni_mtx_fini(&ed->mtx);
NNI_FREE_STRUCT(ed);
}
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index f387c60c..7cbf534b 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -358,8 +358,8 @@ nni_posix_pipedesc_init(nni_posix_pipedesc **pdp, int fd)
nni_aio_list_init(&pd->readq);
nni_aio_list_init(&pd->writeq);
- rv = nni_posix_pollq_add(nni_posix_pollq_get(fd), &pd->node);
- if (rv != 0) {
+ if (((rv = nni_posix_pollq_init(&pd->node)) != 0) ||
+ ((rv = nni_posix_pollq_add(&pd->node)) != 0)) {
nni_mtx_fini(&pd->mtx);
NNI_FREE_STRUCT(pd);
return (rv);
@@ -373,7 +373,7 @@ nni_posix_pipedesc_fini(nni_posix_pipedesc *pd)
{
// Make sure no other polling activity is pending.
nni_posix_pipedesc_close(pd);
- nni_posix_pollq_remove(&pd->node);
+ nni_posix_pollq_fini(&pd->node);
if (pd->node.fd >= 0) {
(void) close(pd->node.fd);
}
diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h
index 258c6580..bb441c7b 100644
--- a/src/platform/posix/posix_pollq.h
+++ b/src/platform/posix/posix_pollq.h
@@ -41,7 +41,9 @@ extern nni_posix_pollq *nni_posix_pollq_get(int);
extern int nni_posix_pollq_sysinit(void);
extern void nni_posix_pollq_sysfini(void);
-extern int nni_posix_pollq_add(nni_posix_pollq *, nni_posix_pollq_node *);
+extern int nni_posix_pollq_init(nni_posix_pollq_node *);
+extern void nni_posix_pollq_fini(nni_posix_pollq_node *);
+extern int nni_posix_pollq_add(nni_posix_pollq_node *);
extern void nni_posix_pollq_remove(nni_posix_pollq_node *);
extern void nni_posix_pollq_arm(nni_posix_pollq_node *, int);
extern void nni_posix_pollq_disarm(nni_posix_pollq_node *, int);
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index 9081c0d9..8b386ecd 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -189,11 +189,18 @@ nni_posix_poll_thr(void *arg)
}
int
-nni_posix_pollq_add(nni_posix_pollq *pq, nni_posix_pollq_node *node)
+nni_posix_pollq_add(nni_posix_pollq_node *node)
{
- int rv;
+ int rv;
+ nni_posix_pollq *pq;
+
NNI_ASSERT(!nni_list_node_active(&node->node));
+ pq = nni_posix_pollq_get(node->fd);
+ if (node->pq != NULL) {
+ return (NNG_ESTATE);
+ }
+
nni_mtx_lock(&pq->mtx);
if (pq->close) {
// This shouldn't happen!
@@ -211,6 +218,9 @@ nni_posix_pollq_add(nni_posix_pollq *pq, nni_posix_pollq_node *node)
return (0);
}
+// nni_posix_pollq_remove removes the node from the pollq, but
+// does not ensure that the pollq node is safe to destroy. In particular,
+// this function can be called from a callback (the callback may be active).
void
nni_posix_pollq_remove(nni_posix_pollq_node *node)
{
@@ -219,6 +229,39 @@ nni_posix_pollq_remove(nni_posix_pollq_node *node)
if (pq == NULL) {
return;
}
+ node->pq = NULL;
+ nni_mtx_lock(&pq->mtx);
+ if (nni_list_node_active(&node->node)) {
+ nni_list_node_remove(&node->node);
+ pq->nnodes--;
+ }
+ if (pq->close) {
+ nni_cv_wake(&pq->cv);
+ }
+ nni_mtx_unlock(&pq->mtx);
+}
+
+// nni_posix_pollq_init merely ensures that the node is ready for use.
+// It does not register the node with any pollq in particular.
+int
+nni_posix_pollq_init(nni_posix_pollq_node *node)
+{
+ NNI_LIST_NODE_INIT(&node->node);
+ return (0);
+}
+
+// nni_posix_pollq_fini does everything that nni_posix_pollq_remove does,
+// but it also ensures that the callback is not active, so that the node
+// may be deallocated. This function must not be called in a callback.
+void
+nni_posix_pollq_fini(nni_posix_pollq_node *node)
+{
+ nni_posix_pollq *pq = node->pq;
+
+ if (pq == NULL) {
+ return;
+ }
+ node->pq = NULL;
nni_mtx_lock(&pq->mtx);
while (pq->active == node) {
pq->wait = node;
@@ -287,7 +330,7 @@ nni_posix_pollq_disarm(nni_posix_pollq_node *node, int events)
}
static void
-nni_posix_pollq_fini(nni_posix_pollq *pq)
+nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
if (pq->started) {
nni_mtx_lock(&pq->mtx);
@@ -317,7 +360,7 @@ nni_posix_pollq_fini(nni_posix_pollq *pq)
}
static int
-nni_posix_pollq_init(nni_posix_pollq *pq)
+nni_posix_pollq_create(nni_posix_pollq *pq)
{
int rv;
@@ -334,7 +377,7 @@ nni_posix_pollq_init(nni_posix_pollq *pq)
if (((rv = nni_posix_pollq_poll_grow(pq)) != 0) ||
((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) ||
((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0)) {
- nni_posix_pollq_fini(pq);
+ nni_posix_pollq_destroy(pq);
return (rv);
}
pq->started = 1;
@@ -363,14 +406,14 @@ nni_posix_pollq_sysinit(void)
{
int rv;
- rv = nni_posix_pollq_init(&nni_posix_global_pollq);
+ rv = nni_posix_pollq_create(&nni_posix_global_pollq);
return (rv);
}
void
nni_posix_pollq_sysfini(void)
{
- nni_posix_pollq_fini(&nni_posix_global_pollq);
+ nni_posix_pollq_destroy(&nni_posix_global_pollq);
}
#endif // NNG_USE_POSIX_POLLQ_POLL
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index cde58e7c..61546667 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -244,9 +244,8 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
nni_aio_list_init(&udp->udp_recvq);
nni_aio_list_init(&udp->udp_sendq);
- rv = nni_posix_pollq_add(
- nni_posix_pollq_get(udp->udp_fd), &udp->udp_pitem);
- if (rv != 0) {
+ if (((rv = nni_posix_pollq_init(&udp->udp_pitem)) != 0) ||
+ ((rv = nni_posix_pollq_add(&udp->udp_pitem)) != 0)) {
(void) close(udp->udp_fd);
nni_mtx_fini(&udp->udp_mtx);
NNI_FREE_STRUCT(udp);
@@ -261,7 +260,7 @@ void
nni_plat_udp_close(nni_plat_udp *udp)
{
// We're no longer interested in events.
- nni_posix_pollq_remove(&udp->udp_pitem);
+ nni_posix_pollq_fini(&udp->udp_pitem);
nni_mtx_lock(&udp->udp_mtx);
nni_posix_udp_doclose(udp);