aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/platform/posix/posix_epdesc.c285
-rw-r--r--src/platform/posix/posix_pipedesc.c23
2 files changed, 205 insertions, 103 deletions
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 9d7cf538..f419d25a 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -10,6 +10,7 @@
#include "core/nng_impl.h"
#include "platform/posix/posix_aio.h"
#include "platform/posix/posix_pollq.h"
+#include "platform/posix/posix_socket.h"
#ifdef PLATFORM_POSIX_EPDESC
@@ -19,14 +20,20 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
+#include <sys/un.h>
#include <fcntl.h>
#include <unistd.h>
#include <poll.h>
+#ifdef SOCK_CLOEXEC
+#define NNI_STREAM_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC)
+#else
+#define NNI_STREAM_SOCKTYPE SOCK_STREAM
+#endif
+
struct nni_posix_epdesc {
int fd;
- int index;
nni_list connectq;
nni_list acceptq;
nni_posix_pollq_node node;
@@ -39,34 +46,25 @@ struct nni_posix_epdesc {
};
-#if 0
static void
nni_posix_epdesc_cancel(nni_aio *aio)
{
- nni_posix_epdesc *ed;
- nni_posix_pollq *pq;
+ nni_posix_epdesc *ed = aio->a_prov_data;
- ed = aio->a_prov_data;
- pq = ed->pq;
-
- nni_mtx_lock(&pq->mtx);
- nni_list_node_remove(&aio->a_prov_node);
- nni_mtx_unlock(&pq->mtx);
+ nni_mtx_lock(&ed->mtx);
+ nni_aio_list_remove(aio);
+ nni_mtx_unlock(&ed->mtx);
}
static void
nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd)
{
- nni_posix_epdesc *ed;
+ nni_posix_epdesc *ed = aio->a_prov_data;
nni_posix_pipedesc *pd;
- ed = aio->a_prov_data;
-
// acceptq or connectq.
- if (nni_list_active(&ed->connectq, aio)) {
- nni_list_remove(&ed->connectq, aio);
- }
+ nni_aio_list_remove(aio);
if (rv == 0) {
rv = nni_posix_pipedesc_init(&pd, newfd);
@@ -82,7 +80,7 @@ nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd)
static void
-nni_posix_poll_connect(nni_posix_epdesc *ed)
+nni_posix_epdesc_doconnect(nni_posix_epdesc *ed)
{
nni_aio *aio;
socklen_t sz;
@@ -104,6 +102,7 @@ nni_posix_poll_connect(nni_posix_epdesc *ed)
case 0:
// Success!
nni_posix_epdesc_finish(aio, 0, ed->fd);
+ ed->fd = -1;
continue;
case EINPROGRESS:
@@ -112,6 +111,8 @@ nni_posix_poll_connect(nni_posix_epdesc *ed)
default:
nni_posix_epdesc_finish(aio, nni_plat_errno(rv), 0);
+ close(ed->fd);
+ ed->fd = -1;
continue;
}
}
@@ -119,7 +120,7 @@ nni_posix_poll_connect(nni_posix_epdesc *ed)
static void
-nni_posix_poll_accept(nni_posix_epdesc *ed)
+nni_posix_epdesc_doaccept(nni_posix_epdesc *ed)
{
nni_aio *aio;
int newfd;
@@ -155,7 +156,7 @@ nni_posix_poll_accept(nni_posix_epdesc *ed)
return;
}
- if (errno == ECONNABORTED) {
+ if ((errno == ECONNABORTED) || (errno == ECONNRESET)) {
// Let's just eat this one. Perhaps it may be
// better to report it to the application, but we
// think most applications don't want to see this.
@@ -170,10 +171,18 @@ nni_posix_poll_accept(nni_posix_epdesc *ed)
static void
-nni_posix_poll_epclose(nni_posix_epdesc *ed)
+nni_posix_epdesc_doclose(nni_posix_epdesc *ed)
{
nni_aio *aio;
+ struct sockaddr_un *sun;
+ if (ed->fd != -1) {
+ (void) shutdown(ed->fd, SHUT_RDWR);
+ sun = (void *) &ed->locaddr;
+ if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) {
+ (void) unlink(sun->sun_path);
+ }
+ }
while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
}
@@ -183,115 +192,225 @@ nni_posix_poll_epclose(nni_posix_epdesc *ed)
}
+static void
+nni_posix_epdesc_cb(void *arg)
+{
+ nni_posix_epdesc *ed = arg;
+
+ nni_mtx_lock(&ed->mtx);
+
+ if (ed->node.revents & POLLIN) {
+ nni_posix_epdesc_doaccept(ed);
+ }
+ if (ed->node.revents & POLLOUT) {
+ nni_posix_epdesc_doconnect(ed);
+ }
+ if (ed->node.revents & (POLLHUP|POLLERR|POLLNVAL)) {
+ nni_posix_epdesc_doclose(ed);
+ }
+ ed->node.revents = 0;
+ ed->node.events = 0;
+
+ if (!nni_list_empty(&ed->connectq)) {
+ ed->node.events |= POLLOUT;
+ }
+ if (!nni_list_empty(&ed->acceptq)) {
+ ed->node.events |= POLLIN;
+ }
+ nni_mtx_unlock(&ed->mtx);
+}
+
+
+void
+nni_posix_epdesc_close(nni_posix_epdesc *ed)
+{
+ nni_posix_pollq_cancel(ed->pq, &ed->node);
+
+ nni_mtx_lock(&ed->mtx);
+ nni_posix_epdesc_doclose(ed);
+ nni_mtx_unlock(&ed->mtx);
+}
+
+
+// UNIX DOMAIN SOCKETS -- these have names in the file namespace.
+// We are going to check to see if there was a name already there.
+// If there was, and nothing is listening (ECONNREFUSED), then we
+// will just try to cleanup the old socket. Note that this is not
+// perfect in all scenarios, so use this with caution.
static int
-nni_posix_epdesc_add(nni_posix_pollq *pq, nni_posix_epdesc *ed)
+nni_posix_epdesc_remove_stale_ipc_socket(struct sockaddr *sa, socklen_t len)
{
- int rv;
+ int fd;
+ struct sockaddr_un *sun = (void *) sa;
- // Add epdesc to the pollq if it isn't already there.
- if (!nni_list_active(&pq->eds, ed)) {
- if ((rv = nni_posix_poll_grow(pq)) != 0) {
- return (rv);
+ if ((len == 0) || (sun->sun_family != AF_UNIX)) {
+ return (0);
+ }
+
+ if ((fd = socket(AF_UNIX, NNI_STREAM_SOCKTYPE, 0)) < 0) {
+ return (nni_plat_errno(errno));
+ }
+
+ // There is an assumption here that connect() returns immediately
+ // (even when non-blocking) when a server is absent. This seems
+ // to be true for the platforms we've tried. If it doesn't work,
+ // then the cleanup will fail. As this is supposed to be an
+ // exceptional case, don't worry.
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+ if (connect(fd, (void *) sun, len) < 0) {
+ if (errno == ECONNREFUSED) {
+ (void) unlink(sun->sun_path);
}
- nni_list_append(&pq->eds, ed);
- pq->neds++;
}
+ (void) close(fd);
return (0);
}
-void
-nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
+int
+nni_posix_epdesc_listen(nni_posix_epdesc *ed, const nni_sockaddr *saddr)
{
- // NB: We assume that the FD is already set to nonblocking mode.
+ int len;
+ struct sockaddr_storage ss;
int rv;
- nni_posix_pollq *pq = ed->pq;
- int wake;
+ int fd;
- nni_mtx_lock(&pq->mtx);
- // If we can't start, it means that the AIO was stopped.
- if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
- nni_mtx_unlock(&pq->mtx);
- return;
+ if ((len = nni_posix_to_sockaddr(&ss, saddr)) < 0) {
+ return (NNG_EADDRINVAL);
}
- if (ed->fd < 0) {
- nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
- nni_mtx_unlock(&pq->mtx);
- return;
- }
- rv = connect(ed->fd, (void *) &ed->remaddr, ed->remlen);
- if (rv == 0) {
- // Immediate connect, cool! This probably only happens on
- // loopback, and probably not on every platform.
- nni_posix_epdesc_finish(aio, 0, 0);
- nni_mtx_unlock(&pq->mtx);
- return;
+
+ if ((fd = socket(ss.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
+ return (nni_plat_errno(errno));
}
- if (errno != EINPROGRESS) {
- // Some immediate failure occurred.
- nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
- nni_mtx_unlock(&pq->mtx);
- return;
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+
+ rv = nni_posix_epdesc_remove_stale_ipc_socket((void *) &ss, len);
+ if (rv != 0) {
+ (void) close(fd);
+ return (rv);
}
- // We have to submit to the pollq, because the connection is pending.
- if ((rv = nni_posix_epdesc_add(pq, ed)) != 0) {
- nni_posix_epdesc_finish(aio, rv, 0);
- nni_mtx_unlock(&pq->mtx);
- return;
+ if (bind(fd, (struct sockaddr *) &ss, len) < 0) {
+ rv = nni_plat_errno(errno);
+ (void) close(fd);
+ return (rv);
}
- NNI_ASSERT(!nni_list_active(&ed->connectq, aio));
- wake = nni_list_empty(&ed->connectq);
- nni_aio_list_append(&ed->connectq, aio);
- if (wake) {
- nni_plat_pipe_raise(pq->wakewfd);
+ // Listen -- 128 depth is probably sufficient. If it isn't, other
+ // bad things are going to happen.
+ if (listen(fd, 128) != 0) {
+ rv = nni_plat_errno(errno);
+ (void) close(fd);
+ return (rv);
}
- nni_mtx_unlock(&pq->mtx);
+
+ ed->fd = fd;
+ return (0);
}
void
nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
{
- // NB: We assume that the FD is already set to nonblocking mode.
int rv;
- int wake;
- nni_posix_pollq *pq = ed->pq;
// Accept is simpler than the connect case. With accept we just
// need to wait for the socket to be readable to indicate an incoming
// connection is ready for us. There isn't anything else for us to
// do really, as that will have been done in listen.
- nni_mtx_lock(&pq->mtx);
+ nni_mtx_lock(&ed->mtx);
// If we can't start, it means that the AIO was stopped.
if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
- nni_mtx_unlock(&pq->mtx);
+ nni_mtx_unlock(&ed->mtx);
return;
}
if (ed->fd < 0) {
- nni_mtx_unlock(&pq->mtx);
nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
+ nni_mtx_unlock(&ed->mtx);
return;
}
- // We have to submit to the pollq, because the connection is pending.
- if ((rv = nni_posix_epdesc_add(pq, ed)) != 0) {
- nni_posix_epdesc_finish(aio, rv, 0);
- nni_mtx_lock(&pq->mtx);
- }
- NNI_ASSERT(!nni_list_active(&ed->acceptq, aio));
- wake = nni_list_empty(&ed->acceptq);
nni_aio_list_append(&ed->acceptq, aio);
- if (wake) {
- nni_plat_pipe_raise(pq->wakewfd);
+ if ((ed->node.events & POLLIN) == 0) {
+ ed->node.events |= POLLIN;
+ rv = nni_posix_pollq_submit(ed->pq, &ed->node);
+ if (rv != 0) {
+ nni_posix_epdesc_finish(aio, rv, 0);
+ nni_mtx_unlock(&ed->mtx);
+ return;
+ }
}
- nni_mtx_unlock(&pq->mtx);
+ nni_mtx_unlock(&ed->mtx);
}
-#endif
+void
+nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
+{
+ // NB: We assume that the FD is already set to nonblocking mode.
+ int rv;
+
+ nni_mtx_lock(&ed->mtx);
+ // If we can't start, it means that the AIO was stopped.
+ if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
+ nni_mtx_unlock(&ed->mtx);
+ return;
+ }
+
+ ed->fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0);
+ if (ed->fd < 0) {
+ nni_posix_epdesc_finish(aio, rv, 0);
+ return;
+ }
+
+ // Possibly bind.
+ if (ed->loclen != 0) {
+ rv = bind(ed->fd, (void *) &ed->locaddr, ed->loclen);
+ if (rv != 0) {
+ (void) close(ed->fd);
+ ed->fd = -1;
+ nni_posix_epdesc_finish(aio, rv, 0);
+ nni_mtx_unlock(&ed->mtx);
+ return;
+ }
+ }
+
+ rv = connect(ed->fd, (void *) &ed->remaddr, ed->remlen);
+ if (rv == 0) {
+ // Immediate connect, cool! This probably only happens on
+ // loopback, and probably not on every platform.
+ nni_posix_epdesc_finish(aio, 0, ed->fd);
+ ed->fd = -1;
+ nni_mtx_unlock(&ed->mtx);
+ return;
+ }
+
+ if (errno != EINPROGRESS) {
+ // Some immediate failure occurred.
+ (void) close(ed->fd);
+ ed->fd = -1;
+ nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
+ nni_mtx_unlock(&ed->mtx);
+ return;
+ }
+
+ // We have to submit to the pollq, because the connection is pending.
+ nni_aio_list_append(&ed->connectq, aio);
+ if ((ed->node.events & POLLOUT) == 0) {
+ ed->node.events |= POLLOUT;
+ rv = nni_posix_pollq_submit(ed->pq, &ed->node);
+ if (rv != 0) {
+ (void) close(ed->fd);
+ ed->fd = -1;
+ nni_posix_epdesc_finish(aio, rv, 0);
+ nni_mtx_unlock(&ed->mtx);
+ return;
+ }
+ }
+ nni_mtx_unlock(&ed->mtx);
+}
+
int
nni_posix_epdesc_init(nni_posix_epdesc **edp, int fd)
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index 14a5c8ab..c3e29c33 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -99,7 +99,6 @@ nni_posix_pipedesc_dowrite(nni_posix_pipedesc *pd)
}
// We completed the entire operation on this aioq.
- nni_list_remove(&pd->writeq, aio);
nni_posix_pipedesc_finish(aio, 0);
// Go back to start of loop to see if there is another
@@ -231,24 +230,10 @@ nni_posix_pipedesc_cb(void *arg)
void
nni_posix_pipedesc_close(nni_posix_pipedesc *pd)
{
- nni_posix_pollq *pq;
- nni_aio *aio;
-
- pq = pd->pq;
-
- nni_posix_pollq_cancel(pq, &pd->node);
+ nni_posix_pollq_cancel(pd->pq, &pd->node);
nni_mtx_lock(&pd->mtx);
- if (pd->fd != -1) {
- // Let any peer know we are closing.
- shutdown(pd->fd, SHUT_RDWR);
- }
- while ((aio = nni_list_first(&pd->readq)) != NULL) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
- }
- while ((aio = nni_list_first(&pd->writeq)) != NULL) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
- }
+ nni_posix_pipedesc_doclose(pd);
nni_mtx_unlock(&pd->mtx);
}
@@ -256,9 +241,7 @@ nni_posix_pipedesc_close(nni_posix_pipedesc *pd)
static void
nni_posix_pipedesc_cancel(nni_aio *aio)
{
- nni_posix_pipedesc *pd;
-
- pd = aio->a_prov_data;
+ nni_posix_pipedesc *pd = aio->a_prov_data;
nni_mtx_lock(&pd->mtx);
nni_aio_list_remove(aio);