diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-05 01:05:18 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-05 01:05:18 -0700 |
| commit | 33f7dcc44cc76260712f9be0f8533e7e81657b45 (patch) | |
| tree | c6b8b2349e6935b1f857b6bee377cd19b1c040af | |
| parent | 16a43040ef29f77375d226f669770e64a42d278c (diff) | |
| download | nng-33f7dcc44cc76260712f9be0f8533e7e81657b45.tar.gz nng-33f7dcc44cc76260712f9be0f8533e7e81657b45.tar.bz2 nng-33f7dcc44cc76260712f9be0f8533e7e81657b45.zip | |
epdesc functionality.
| -rw-r--r-- | src/platform/posix/posix_epdesc.c | 285 | ||||
| -rw-r--r-- | src/platform/posix/posix_pipedesc.c | 23 |
2 files changed, 205 insertions, 103 deletions
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index 9d7cf538..f419d25a 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -10,6 +10,7 @@ #include "core/nng_impl.h" #include "platform/posix/posix_aio.h" #include "platform/posix/posix_pollq.h" +#include "platform/posix/posix_socket.h" #ifdef PLATFORM_POSIX_EPDESC @@ -19,14 +20,20 @@ #include <sys/types.h> #include <sys/socket.h> #include <sys/uio.h> +#include <sys/un.h> #include <fcntl.h> #include <unistd.h> #include <poll.h> +#ifdef SOCK_CLOEXEC +#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC) +#else +#define NNI_STREAM_SOCKTYPE SOCK_STREAM +#endif + struct nni_posix_epdesc { int fd; - int index; nni_list connectq; nni_list acceptq; nni_posix_pollq_node node; @@ -39,34 +46,25 @@ struct nni_posix_epdesc { }; -#if 0 static void nni_posix_epdesc_cancel(nni_aio *aio) { - nni_posix_epdesc *ed; - nni_posix_pollq *pq; + nni_posix_epdesc *ed = aio->a_prov_data; - ed = aio->a_prov_data; - pq = ed->pq; - - nni_mtx_lock(&pq->mtx); - nni_list_node_remove(&aio->a_prov_node); - nni_mtx_unlock(&pq->mtx); + nni_mtx_lock(&ed->mtx); + nni_aio_list_remove(aio); + nni_mtx_unlock(&ed->mtx); } static void nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd) { - nni_posix_epdesc *ed; + nni_posix_epdesc *ed = aio->a_prov_data; nni_posix_pipedesc *pd; - ed = aio->a_prov_data; - // acceptq or connectq. - if (nni_list_active(&ed->connectq, aio)) { - nni_list_remove(&ed->connectq, aio); - } + nni_aio_list_remove(aio); if (rv == 0) { rv = nni_posix_pipedesc_init(&pd, newfd); @@ -82,7 +80,7 @@ nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd) static void -nni_posix_poll_connect(nni_posix_epdesc *ed) +nni_posix_epdesc_doconnect(nni_posix_epdesc *ed) { nni_aio *aio; socklen_t sz; @@ -104,6 +102,7 @@ nni_posix_poll_connect(nni_posix_epdesc *ed) case 0: // Success! nni_posix_epdesc_finish(aio, 0, ed->fd); + ed->fd = -1; continue; case EINPROGRESS: @@ -112,6 +111,8 @@ nni_posix_poll_connect(nni_posix_epdesc *ed) default: nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0); + close(ed->fd); + ed->fd = -1; continue; } } @@ -119,7 +120,7 @@ nni_posix_poll_connect(nni_posix_epdesc *ed) static void -nni_posix_poll_accept(nni_posix_epdesc *ed) +nni_posix_epdesc_doaccept(nni_posix_epdesc *ed) { nni_aio *aio; int newfd; @@ -155,7 +156,7 @@ nni_posix_poll_accept(nni_posix_epdesc *ed) return; } - if (errno == ECONNABORTED) { + if ((errno == ECONNABORTED) || (errno == ECONNRESET)) { // Let's just eat this one. Perhaps it may be // better to report it to the application, but we // think most applications don't want to see this. @@ -170,10 +171,18 @@ nni_posix_poll_accept(nni_posix_epdesc *ed) static void -nni_posix_poll_epclose(nni_posix_epdesc *ed) +nni_posix_epdesc_doclose(nni_posix_epdesc *ed) { nni_aio *aio; + struct sockaddr_un *sun; + if (ed->fd != -1) { + (void) shutdown(ed->fd, SHUT_RDWR); + sun = (void *) &ed->locaddr; + if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) { + (void) unlink(sun->sun_path); + } + } while ((aio = nni_list_first(&ed->acceptq)) != NULL) { nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); } @@ -183,115 +192,225 @@ nni_posix_poll_epclose(nni_posix_epdesc *ed) } +static void +nni_posix_epdesc_cb(void *arg) +{ + nni_posix_epdesc *ed = arg; + + nni_mtx_lock(&ed->mtx); + + if (ed->node.revents & POLLIN) { + nni_posix_epdesc_doaccept(ed); + } + if (ed->node.revents & POLLOUT) { + nni_posix_epdesc_doconnect(ed); + } + if (ed->node.revents & (POLLHUP|POLLERR|POLLNVAL)) { + nni_posix_epdesc_doclose(ed); + } + ed->node.revents = 0; + ed->node.events = 0; + + if (!nni_list_empty(&ed->connectq)) { + ed->node.events |= POLLOUT; + } + if (!nni_list_empty(&ed->acceptq)) { + ed->node.events |= POLLIN; + } + nni_mtx_unlock(&ed->mtx); +} + + +void +nni_posix_epdesc_close(nni_posix_epdesc *ed) +{ + nni_posix_pollq_cancel(ed->pq, &ed->node); + + nni_mtx_lock(&ed->mtx); + nni_posix_epdesc_doclose(ed); + nni_mtx_unlock(&ed->mtx); +} + + +// UNIX DOMAIN SOCKETS -- these have names in the file namespace. +// We are going to check to see if there was a name already there. +// If there was, and nothing is listening (ECONNREFUSED), then we +// will just try to cleanup the old socket. Note that this is not +// perfect in all scenarios, so use this with caution. static int -nni_posix_epdesc_add(nni_posix_pollq *pq, nni_posix_epdesc *ed) +nni_posix_epdesc_remove_stale_ipc_socket(struct sockaddr *sa, socklen_t len) { - int rv; + int fd; + struct sockaddr_un *sun = (void *) sa; - // Add epdesc to the pollq if it isn't already there. - if (!nni_list_active(&pq->eds, ed)) { - if ((rv = nni_posix_poll_grow(pq)) != 0) { - return (rv); + if ((len == 0) || (sun->sun_family != AF_UNIX)) { + return (0); + } + + if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) { + return (nni_plat_errno(errno)); + } + + // There is an assumption here that connect() returns immediately + // (even when non-blocking) when a server is absent. This seems + // to be true for the platforms we've tried. If it doesn't work, + // then the cleanup will fail. As this is supposed to be an + // exceptional case, don't worry. + (void) fcntl(fd, F_SETFL, O_NONBLOCK); + if (connect(fd, (void *) sun, len) < 0) { + if (errno == ECONNREFUSED) { + (void) unlink(sun->sun_path); } - nni_list_append(&pq->eds, ed); - pq->neds++; } + (void) close(fd); return (0); } -void -nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) +int +nni_posix_epdesc_listen(nni_posix_epdesc *ed, const nni_sockaddr *saddr) { - // NB: We assume that the FD is already set to nonblocking mode. + int len; + struct sockaddr_storage ss; int rv; - nni_posix_pollq *pq = ed->pq; - int wake; + int fd; - nni_mtx_lock(&pq->mtx); - // If we can't start, it means that the AIO was stopped. - if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) { - nni_mtx_unlock(&pq->mtx); - return; + if ((len = nni_posix_to_sockaddr(&ss, saddr)) < 0) { + return (NNG_EADDRINVAL); } - if (ed->fd < 0) { - nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); - nni_mtx_unlock(&pq->mtx); - return; - } - rv = connect(ed->fd, (void *) &ed->remaddr, ed->remlen); - if (rv == 0) { - // Immediate connect, cool! This probably only happens on - // loopback, and probably not on every platform. - nni_posix_epdesc_finish(aio, 0, 0); - nni_mtx_unlock(&pq->mtx); - return; + + if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) { + return (nni_plat_errno(errno)); } - if (errno != EINPROGRESS) { - // Some immediate failure occurred. - nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0); - nni_mtx_unlock(&pq->mtx); - return; + (void) fcntl(fd, F_SETFD, FD_CLOEXEC); + + rv = nni_posix_epdesc_remove_stale_ipc_socket((void *) &ss, len); + if (rv != 0) { + (void) close(fd); + return (rv); } - // We have to submit to the pollq, because the connection is pending. - if ((rv = nni_posix_epdesc_add(pq, ed)) != 0) { - nni_posix_epdesc_finish(aio, rv, 0); - nni_mtx_unlock(&pq->mtx); - return; + if (bind(fd, (struct sockaddr *) &ss, len) < 0) { + rv = nni_plat_errno(errno); + (void) close(fd); + return (rv); } - NNI_ASSERT(!nni_list_active(&ed->connectq, aio)); - wake = nni_list_empty(&ed->connectq); - nni_aio_list_append(&ed->connectq, aio); - if (wake) { - nni_plat_pipe_raise(pq->wakewfd); + // Listen -- 128 depth is probably sufficient. If it isn't, other + // bad things are going to happen. + if (listen(fd, 128) != 0) { + rv = nni_plat_errno(errno); + (void) close(fd); + return (rv); } - nni_mtx_unlock(&pq->mtx); + + ed->fd = fd; + return (0); } void nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio) { - // NB: We assume that the FD is already set to nonblocking mode. int rv; - int wake; - nni_posix_pollq *pq = ed->pq; // Accept is simpler than the connect case. With accept we just // need to wait for the socket to be readable to indicate an incoming // connection is ready for us. There isn't anything else for us to // do really, as that will have been done in listen. - nni_mtx_lock(&pq->mtx); + nni_mtx_lock(&ed->mtx); // If we can't start, it means that the AIO was stopped. if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) { - nni_mtx_unlock(&pq->mtx); + nni_mtx_unlock(&ed->mtx); return; } if (ed->fd < 0) { - nni_mtx_unlock(&pq->mtx); nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0); + nni_mtx_unlock(&ed->mtx); return; } - // We have to submit to the pollq, because the connection is pending. - if ((rv = nni_posix_epdesc_add(pq, ed)) != 0) { - nni_posix_epdesc_finish(aio, rv, 0); - nni_mtx_lock(&pq->mtx); - } - NNI_ASSERT(!nni_list_active(&ed->acceptq, aio)); - wake = nni_list_empty(&ed->acceptq); nni_aio_list_append(&ed->acceptq, aio); - if (wake) { - nni_plat_pipe_raise(pq->wakewfd); + if ((ed->node.events & POLLIN) == 0) { + ed->node.events |= POLLIN; + rv = nni_posix_pollq_submit(ed->pq, &ed->node); + if (rv != 0) { + nni_posix_epdesc_finish(aio, rv, 0); + nni_mtx_unlock(&ed->mtx); + return; + } } - nni_mtx_unlock(&pq->mtx); + nni_mtx_unlock(&ed->mtx); } -#endif +void +nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio) +{ + // NB: We assume that the FD is already set to nonblocking mode. + int rv; + + nni_mtx_lock(&ed->mtx); + // If we can't start, it means that the AIO was stopped. + if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) { + nni_mtx_unlock(&ed->mtx); + return; + } + + ed->fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0); + if (ed->fd < 0) { + nni_posix_epdesc_finish(aio, rv, 0); + return; + } + + // Possibly bind. + if (ed->loclen != 0) { + rv = bind(ed->fd, (void *) &ed->locaddr, ed->loclen); + if (rv != 0) { + (void) close(ed->fd); + ed->fd = -1; + nni_posix_epdesc_finish(aio, rv, 0); + nni_mtx_unlock(&ed->mtx); + return; + } + } + + rv = connect(ed->fd, (void *) &ed->remaddr, ed->remlen); + if (rv == 0) { + // Immediate connect, cool! This probably only happens on + // loopback, and probably not on every platform. + nni_posix_epdesc_finish(aio, 0, ed->fd); + ed->fd = -1; + nni_mtx_unlock(&ed->mtx); + return; + } + + if (errno != EINPROGRESS) { + // Some immediate failure occurred. + (void) close(ed->fd); + ed->fd = -1; + nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0); + nni_mtx_unlock(&ed->mtx); + return; + } + + // We have to submit to the pollq, because the connection is pending. + nni_aio_list_append(&ed->connectq, aio); + if ((ed->node.events & POLLOUT) == 0) { + ed->node.events |= POLLOUT; + rv = nni_posix_pollq_submit(ed->pq, &ed->node); + if (rv != 0) { + (void) close(ed->fd); + ed->fd = -1; + nni_posix_epdesc_finish(aio, rv, 0); + nni_mtx_unlock(&ed->mtx); + return; + } + } + nni_mtx_unlock(&ed->mtx); +} + int nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd) diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c index 14a5c8ab..c3e29c33 100644 --- a/src/platform/posix/posix_pipedesc.c +++ b/src/platform/posix/posix_pipedesc.c @@ -99,7 +99,6 @@ nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd) } // We completed the entire operation on this aioq. - nni_list_remove(&pd->writeq, aio); nni_posix_pipedesc_finish(aio, 0); // Go back to start of loop to see if there is another @@ -231,24 +230,10 @@ nni_posix_pipedesc_cb(void *arg) void nni_posix_pipedesc_close(nni_posix_pipedesc *pd) { - nni_posix_pollq *pq; - nni_aio *aio; - - pq = pd->pq; - - nni_posix_pollq_cancel(pq, &pd->node); + nni_posix_pollq_cancel(pd->pq, &pd->node); nni_mtx_lock(&pd->mtx); - if (pd->fd != -1) { - // Let any peer know we are closing. - shutdown(pd->fd, SHUT_RDWR); - } - while ((aio = nni_list_first(&pd->readq)) != NULL) { - nni_posix_pipedesc_finish(aio, NNG_ECLOSED); - } - while ((aio = nni_list_first(&pd->writeq)) != NULL) { - nni_posix_pipedesc_finish(aio, NNG_ECLOSED); - } + nni_posix_pipedesc_doclose(pd); nni_mtx_unlock(&pd->mtx); } @@ -256,9 +241,7 @@ nni_posix_pipedesc_close(nni_posix_pipedesc *pd) static void nni_posix_pipedesc_cancel(nni_aio *aio) { - nni_posix_pipedesc *pd; - - pd = aio->a_prov_data; + nni_posix_pipedesc *pd = aio->a_prov_data; nni_mtx_lock(&pd->mtx); nni_aio_list_remove(aio); |
