aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform/posix')
-rw-r--r--src/platform/posix/posix_epdesc.c57
-rw-r--r--src/platform/posix/posix_pipedesc.c24
-rw-r--r--src/platform/posix/posix_resolv_gai.c11
-rw-r--r--src/platform/posix/posix_udp.c20
4 files changed, 53 insertions, 59 deletions
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 16c0d09f..196b206a 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -319,26 +319,22 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
void
nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
{
- int rv;
-
// Accept is simpler than the connect case. With accept we just
// need to wait for the socket to be readable to indicate an incoming
// connection is ready for us. There isn't anything else for us to
// do really, as that will have been done in listen.
- nni_mtx_lock(&ed->mtx);
- // If we can't start, it means that the AIO was stopped.
- if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
- nni_mtx_unlock(&ed->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ed->mtx);
if (ed->closed) {
- nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
nni_mtx_unlock(&ed->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
-
nni_aio_list_append(&ed->acceptq, aio);
+ nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed);
nni_posix_pollq_arm(&ed->node, POLLIN);
nni_mtx_unlock(&ed->mtx);
}
@@ -362,39 +358,33 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
int rv;
int fd;
- nni_mtx_lock(&ed->mtx);
- // If we can't start, it means that the AIO was stopped.
- if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
- nni_mtx_unlock(&ed->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ed->mtx);
- fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0);
- if (fd < 0) {
- nni_posix_epdesc_finish(aio, rv, 0);
+ if ((fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
+ rv = nni_plat_errno(errno);
nni_mtx_unlock(&ed->mtx);
+ nni_aio_finish_error(aio, rv);
return;
}
// Possibly bind.
- if (ed->loclen != 0) {
- rv = bind(fd, (void *) &ed->locaddr, ed->loclen);
- if (rv != 0) {
- (void) close(fd);
- nni_posix_epdesc_finish(aio, rv, 0);
- nni_mtx_unlock(&ed->mtx);
- return;
- }
+ if ((ed->loclen != 0) &&
+ (bind(fd, (void *) &ed->locaddr, ed->loclen) != 0)) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&ed->mtx);
+ (void) close(fd);
+ nni_aio_finish_error(aio, rv);
+ return;
}
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
- rv = connect(fd, (void *) &ed->remaddr, ed->remlen);
-
- if (rv == 0) {
+ if ((rv = connect(fd, (void *) &ed->remaddr, ed->remlen)) == 0) {
// Immediate connect, cool! This probably only happens on
// loopback, and probably not on every platform.
-
nni_posix_epdesc_finish(aio, 0, fd);
nni_mtx_unlock(&ed->mtx);
return;
@@ -402,24 +392,27 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
if (errno != EINPROGRESS) {
// Some immediate failure occurred.
- if (errno == ENOENT) {
+ if (errno == ENOENT) { // For UNIX domain sockets
errno = ECONNREFUSED;
}
- (void) close(fd);
- nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
+ rv = nni_plat_errno(errno);
nni_mtx_unlock(&ed->mtx);
+ (void) close(fd);
+ nni_aio_finish_error(aio, rv);
return;
}
// We have to submit to the pollq, because the connection is pending.
ed->node.fd = fd;
if ((rv = nni_posix_pollq_add(&ed->node)) != 0) {
- (void) close(fd);
- nni_posix_epdesc_finish(aio, rv, 0);
+ ed->node.fd = -1;
nni_mtx_unlock(&ed->mtx);
+ (void) close(fd);
+ nni_aio_finish_error(aio, rv);
return;
}
+ nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed);
nni_aio_list_append(&ed->connectq, aio);
nni_posix_pollq_arm(&ed->node, POLLOUT);
nni_mtx_unlock(&ed->mtx);
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index aebea4b8..62531f9c 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -254,20 +254,20 @@ nni_posix_pipedesc_cancel(nni_aio *aio, int rv)
void
nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
{
- int rv;
-
- nni_mtx_lock(&pd->mtx);
- if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
- nni_mtx_unlock(&pd->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&pd->mtx);
+
if (pd->closed) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
nni_mtx_unlock(&pd->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
nni_aio_list_append(&pd->readq, aio);
+ nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd);
+
// If we are only job on the list, go ahead and try to do an immediate
// transfer. This allows for faster completions in many cases. We
// also need not arm a list if it was already armed.
@@ -285,20 +285,20 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
void
nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
{
- int rv;
-
- nni_mtx_lock(&pd->mtx);
- if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
- nni_mtx_unlock(&pd->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&pd->mtx);
+
if (pd->closed) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
nni_mtx_unlock(&pd->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
nni_aio_list_append(&pd->writeq, aio);
+ nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd);
+
if (nni_list_first(&pd->writeq) == aio) {
nni_posix_pipedesc_dowrite(pd);
// If we are still the first thing on the list, that means we
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index f3ac7a19..d86a2008 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -206,9 +206,11 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive,
int family, int proto, nni_aio *aio)
{
nni_posix_resolv_item *item;
- int rv;
sa_family_t fam;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
switch (family) {
case NNG_AF_INET:
fam = AF_INET;
@@ -241,12 +243,7 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive,
item->family = fam;
nni_mtx_lock(&nni_posix_resolv_mtx);
- // If we were stopped, we're done...
- if ((rv = nni_aio_start(aio, nni_posix_resolv_cancel, item)) != 0) {
- nni_mtx_unlock(&nni_posix_resolv_mtx);
- NNI_FREE_STRUCT(item);
- return;
- }
+ nni_aio_schedule(aio, nni_posix_resolv_cancel, item);
nni_task_dispatch(&item->task);
nni_mtx_unlock(&nni_posix_resolv_mtx);
}
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index 8a402acc..b414fa95 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -287,22 +287,26 @@ nni_plat_udp_cancel(nni_aio *aio, int rv)
void
nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio)
{
- nni_mtx_lock(&udp->udp_mtx);
- if (nni_aio_start(aio, nni_plat_udp_cancel, udp) == 0) {
- nni_list_append(&udp->udp_recvq, aio);
- nni_posix_pollq_arm(&udp->udp_pitem, POLLIN);
+ if (nni_aio_begin(aio) != 0) {
+ return;
}
+ nni_mtx_lock(&udp->udp_mtx);
+ nni_aio_schedule(aio, nni_plat_udp_cancel, udp);
+ nni_list_append(&udp->udp_recvq, aio);
+ nni_posix_pollq_arm(&udp->udp_pitem, POLLIN);
nni_mtx_unlock(&udp->udp_mtx);
}
void
nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio)
{
- nni_mtx_lock(&udp->udp_mtx);
- if (nni_aio_start(aio, nni_plat_udp_cancel, udp) == 0) {
- nni_list_append(&udp->udp_sendq, aio);
- nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT);
+ if (nni_aio_begin(aio) != 0) {
+ return;
}
+ nni_mtx_lock(&udp->udp_mtx);
+ nni_aio_schedule(aio, nni_plat_udp_cancel, udp);
+ nni_list_append(&udp->udp_sendq, aio);
+ nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT);
nni_mtx_unlock(&udp->udp_mtx);
}