summaryrefslogtreecommitdiff
path: root/src/platform/posix/posix_epdesc.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-18 19:52:08 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-18 19:52:08 -0700
commit5fb832e06fd4ded6ccc45f943837fd374a9cea7a (patch)
tree41c306c297911d740e92f38b98685207f77758c6 /src/platform/posix/posix_epdesc.c
parent3eb60946ae8b5ad7d8a95233ffe946432acdb837 (diff)
downloadnng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.tar.gz
nng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.tar.bz2
nng-5fb832e06fd4ded6ccc45f943837fd374a9cea7a.zip
Fixes most of the raaces in posix; but at least one remains outstanding.
Apparently there are circumstances when a pipedesc may get orphaned form the pollq. This triggers an assertion failure when it occurs. I am still trying to understand how this can occur. Stay tuned.
Diffstat (limited to 'src/platform/posix/posix_epdesc.c')
-rw-r--r--src/platform/posix/posix_epdesc.c127
1 files changed, 57 insertions, 70 deletions
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 8cae2565..b89af982 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -33,11 +33,10 @@
#endif
struct nni_posix_epdesc {
- int fd;
+ nni_posix_pollq_node node;
nni_list connectq;
nni_list acceptq;
- nni_posix_pollq_node node;
- nni_posix_pollq * pq;
+ int closed;
struct sockaddr_storage locaddr;
struct sockaddr_storage remaddr;
socklen_t loclen;
@@ -53,6 +52,7 @@ nni_posix_epdesc_cancel(nni_aio *aio)
nni_mtx_lock(&ed->mtx);
nni_aio_list_remove(aio);
+ NNI_ASSERT(aio->a_pipe == NULL);
nni_mtx_unlock(&ed->mtx);
}
@@ -60,20 +60,17 @@ static void
nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd)
{
nni_posix_epdesc * ed = aio->a_prov_data;
- nni_posix_pipedesc *pd;
+ nni_posix_pipedesc *pd = NULL;
// acceptq or connectq.
nni_aio_list_remove(aio);
if (rv == 0) {
- rv = nni_posix_pipedesc_init(&pd, newfd);
- if (rv != 0) {
+ if ((rv = nni_posix_pipedesc_init(&pd, newfd)) != 0) {
(void) close(newfd);
- } else {
- aio->a_pipe = pd;
}
}
- if ((nni_aio_finish(aio, rv, 0) != 0) && (rv == 0)) {
+ if ((nni_aio_finish_pipe(aio, rv, pd) != 0) && (pd != NULL)) {
nni_posix_pipedesc_fini(pd);
}
}
@@ -94,14 +91,15 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed)
while ((aio = nni_list_first(&ed->connectq)) != NULL) {
rv = -1;
sz = sizeof(rv);
- if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
+ if (getsockopt(ed->node.fd, SOL_SOCKET, SO_ERROR, &rv, &sz) <
+ 0) {
rv = errno;
}
switch (rv) {
case 0:
// Success!
- nni_posix_epdesc_finish(aio, 0, ed->fd);
- ed->fd = -1;
+ nni_posix_epdesc_finish(aio, 0, ed->node.fd);
+ ed->node.fd = -1;
continue;
case EINPROGRESS:
@@ -113,8 +111,8 @@ nni_posix_epdesc_doconnect(nni_posix_epdesc *ed)
rv = ECONNREFUSED;
}
nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0);
- close(ed->fd);
- ed->fd = -1;
+ (void) close(ed->node.fd);
+ ed->node.fd = -1;
continue;
}
}
@@ -134,12 +132,12 @@ nni_posix_epdesc_doaccept(nni_posix_epdesc *ed)
// do getpeername().
#ifdef NNG_USE_ACCEPT4
- newfd = accept4(ed->fd, NULL, NULL, SOCK_CLOEXEC);
+ newfd = accept4(ed->node.fd, NULL, NULL, SOCK_CLOEXEC);
if ((newfd < 0) && ((errno == ENOSYS) || (errno == ENOTSUP))) {
- newfd = accept(ed->fd, NULL, NULL);
+ newfd = accept(ed->node.fd, NULL, NULL);
}
#else
- newfd = accept(ed->fd, NULL, NULL);
+ newfd = accept(ed->node.fd, NULL, NULL);
#endif
if (newfd >= 0) {
@@ -176,7 +174,7 @@ nni_posix_epdesc_doerror(nni_posix_epdesc *ed)
int rv = 1;
socklen_t sz = sizeof(rv);
- if (getsockopt(ed->fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
+ if (getsockopt(ed->node.fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
rv = errno;
}
if (rv == 0) {
@@ -198,27 +196,30 @@ nni_posix_epdesc_doclose(nni_posix_epdesc *ed)
nni_aio * aio;
struct sockaddr_un *sun;
- if (ed->fd != -1) {
- (void) shutdown(ed->fd, SHUT_RDWR);
- sun = (void *) &ed->locaddr;
- if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) {
- (void) unlink(sun->sun_path);
- }
- (void) close(ed->fd);
- ed->fd = -1;
- }
+ ed->closed = 1;
while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
}
while ((aio = nni_list_first(&ed->connectq)) != NULL) {
nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
}
+
+ if (ed->node.fd != -1) {
+ (void) shutdown(ed->node.fd, SHUT_RDWR);
+ sun = (void *) &ed->locaddr;
+ if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) {
+ (void) unlink(sun->sun_path);
+ }
+ (void) close(ed->node.fd);
+ ed->node.fd = -1;
+ }
}
static void
nni_posix_epdesc_cb(void *arg)
{
nni_posix_epdesc *ed = arg;
+ int events;
nni_mtx_lock(&ed->mtx);
@@ -234,22 +235,22 @@ nni_posix_epdesc_cb(void *arg)
if (ed->node.revents & POLLNVAL) {
nni_posix_epdesc_doclose(ed);
}
- ed->node.revents = 0;
- ed->node.events = 0;
+ events = 0;
if (!nni_list_empty(&ed->connectq)) {
- ed->node.events |= POLLOUT;
+ events |= POLLOUT;
}
if (!nni_list_empty(&ed->acceptq)) {
- ed->node.events |= POLLIN;
+ events |= POLLIN;
}
+ nni_posix_pollq_arm(&ed->node, events);
nni_mtx_unlock(&ed->mtx);
}
void
nni_posix_epdesc_close(nni_posix_epdesc *ed)
{
- nni_posix_pollq_cancel(ed->pq, &ed->node);
+ nni_posix_pollq_disarm(&ed->node, POLLIN | POLLOUT);
nni_mtx_lock(&ed->mtx);
nni_posix_epdesc_doclose(ed);
@@ -343,7 +344,6 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
- ed->fd = fd;
ed->node.fd = fd;
nni_mtx_unlock(&ed->mtx);
return (0);
@@ -365,22 +365,14 @@ nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
return;
}
- if (ed->fd < 0) {
+ if (ed->closed) {
nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
nni_mtx_unlock(&ed->mtx);
return;
}
nni_aio_list_append(&ed->acceptq, aio);
- if ((ed->node.events & POLLIN) == 0) {
- ed->node.events |= POLLIN;
- rv = nni_posix_pollq_submit(ed->pq, &ed->node);
- if (rv != 0) {
- nni_posix_epdesc_finish(aio, rv, 0);
- nni_mtx_unlock(&ed->mtx);
- return;
- }
- }
+ nni_posix_pollq_arm(&ed->node, POLLIN);
nni_mtx_unlock(&ed->mtx);
}
@@ -389,6 +381,7 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
{
// NB: We assume that the FD is already set to nonblocking mode.
int rv;
+ int fd;
nni_mtx_lock(&ed->mtx);
// If we can't start, it means that the AIO was stopped.
@@ -397,35 +390,33 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
return;
}
- ed->fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0);
- if (ed->fd < 0) {
+ fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0);
+ if (fd < 0) {
nni_posix_epdesc_finish(aio, rv, 0);
+ nni_mtx_unlock(&ed->mtx);
return;
}
- ed->node.fd = ed->fd;
// Possibly bind.
if (ed->loclen != 0) {
- rv = bind(ed->fd, (void *) &ed->locaddr, ed->loclen);
+ rv = bind(fd, (void *) &ed->locaddr, ed->loclen);
if (rv != 0) {
- (void) close(ed->fd);
- ed->fd = -1;
+ (void) close(fd);
nni_posix_epdesc_finish(aio, rv, 0);
nni_mtx_unlock(&ed->mtx);
return;
}
}
- (void) fcntl(ed->fd, F_SETFL, O_NONBLOCK);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
- rv = connect(ed->fd, (void *) &ed->remaddr, ed->remlen);
+ rv = connect(fd, (void *) &ed->remaddr, ed->remlen);
if (rv == 0) {
// Immediate connect, cool! This probably only happens on
// loopback, and probably not on every platform.
- nni_posix_epdesc_finish(aio, 0, ed->fd);
- ed->fd = -1;
+ nni_posix_epdesc_finish(aio, 0, fd);
nni_mtx_unlock(&ed->mtx);
return;
}
@@ -435,26 +426,16 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
if (errno == ENOENT) {
errno = ECONNREFUSED;
}
- (void) close(ed->fd);
- ed->fd = -1;
+ (void) close(fd);
nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
nni_mtx_unlock(&ed->mtx);
return;
}
// We have to submit to the pollq, because the connection is pending.
+ ed->node.fd = fd;
nni_aio_list_append(&ed->connectq, aio);
- if ((ed->node.events & POLLOUT) == 0) {
- ed->node.events |= POLLOUT;
- rv = nni_posix_pollq_submit(ed->pq, &ed->node);
- if (rv != 0) {
- (void) close(ed->fd);
- ed->fd = -1;
- nni_posix_epdesc_finish(aio, rv, 0);
- nni_mtx_unlock(&ed->mtx);
- return;
- }
- }
+ nni_posix_pollq_arm(&ed->node, POLLOUT);
nni_mtx_unlock(&ed->mtx);
}
@@ -462,6 +443,7 @@ int
nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url)
{
nni_posix_epdesc *ed;
+ nni_posix_pollq * pq;
int rv;
if ((ed = NNI_ALLOC_STRUCT(ed)) == NULL) {
@@ -478,8 +460,6 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url)
// one. For now we just have a global pollq. Note that by tying
// the ed to a single pollq we may get some kind of cache warmth.
- ed->pq = nni_posix_pollq_get((int) nni_random());
- ed->fd = -1;
ed->node.index = 0;
ed->node.cb = nni_posix_epdesc_cb;
ed->node.data = ed;
@@ -488,6 +468,12 @@ nni_posix_epdesc_init(nni_posix_epdesc **edp, const char *url)
nni_aio_list_init(&ed->connectq);
nni_aio_list_init(&ed->acceptq);
+ pq = nni_posix_pollq_get(nni_random() % 0xffff);
+ if ((rv = nni_posix_pollq_add(pq, &ed->node)) != 0) {
+ nni_mtx_fini(&ed->mtx);
+ NNI_FREE_STRUCT(ed);
+ return (rv);
+ }
*edp = ed;
return (0);
}
@@ -525,9 +511,10 @@ nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, int len)
void
nni_posix_epdesc_fini(nni_posix_epdesc *ed)
{
- if (ed->fd >= 0) {
- (void) close(ed->fd);
+ if (ed->node.fd >= 0) {
+ (void) close(ed->node.fd);
}
+ nni_posix_pollq_remove(&ed->node);
nni_mtx_fini(&ed->mtx);
NNI_FREE_STRUCT(ed);
}