aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-04-18 20:38:00 -0700
committerGarrett D'Amore <garrett@damore.org>2018-04-20 07:34:16 -0700
commit5902d02ad0a056a146231568f1293ffbcd59f61c (patch)
treebe38584c02d703ec2322ab941d4d723c752fe187 /src
parent40542e7af0f5003d7ad67876ea580a59174031ca (diff)
downloadnng-5902d02ad0a056a146231568f1293ffbcd59f61c.tar.gz
nng-5902d02ad0a056a146231568f1293ffbcd59f61c.tar.bz2
nng-5902d02ad0a056a146231568f1293ffbcd59f61c.zip
fixes #346 nng_recv() sometimes acts on null `msg` pointer
This closes a fundamental flaw in the way aio structures were handled. In paticular, aio expiration could race ahead, and fire before the aio was properly registered by the provider. This ultimately led to the possibility of duplicate completions on the same aio. The solution involved breaking up nni_aio_start into two functions. nni_aio_begin (which can be run outside of external locks) simply validates that nni_aio_fini() has not been called, and clears certain fields in the aio to make it ready for use by the provider. nni_aio_schedule does the work to register the aio with the expiration thread, and should only be called when the aio is actually scheduled for asynchronous completion. nni_aio_schedule_verify does the same thing, but returns NNG_ETIMEDOUT if the aio has a zero length timeout. This change has a small negative performance impact. We have plans to rectify that by converting nni_aio_begin to use a locklesss flag for the aio->a_fini bit. While we were here, we fixed some error paths in the POSIX subsystem, which would have returned incorrect error codes, and we made some optmizations in the message queues to reduce conditionals while holding locks in the hot code path.
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c52
-rw-r--r--src/core/aio.h20
-rw-r--r--src/core/device.c8
-rw-r--r--src/core/endpt.c6
-rw-r--r--src/core/msgqueue.c60
-rw-r--r--src/core/msgqueue.h2
-rw-r--r--src/platform/posix/posix_epdesc.c57
-rw-r--r--src/platform/posix/posix_pipedesc.c24
-rw-r--r--src/platform/posix/posix_resolv_gai.c11
-rw-r--r--src/platform/posix/posix_udp.c20
-rw-r--r--src/platform/windows/win_iocp.c11
-rw-r--r--src/platform/windows/win_ipc.c8
-rw-r--r--src/platform/windows/win_resolv.c10
-rw-r--r--src/protocol/reqrep0/rep.c38
-rw-r--r--src/protocol/reqrep0/req.c32
-rw-r--r--src/supplemental/http/http_client.c3
-rw-r--r--src/supplemental/http/http_conn.c6
-rw-r--r--src/supplemental/http/http_public.c28
-rw-r--r--src/supplemental/http/http_server.c4
-rw-r--r--src/supplemental/tls/mbedtls/tls.c13
-rw-r--r--src/supplemental/websocket/websocket.c60
-rw-r--r--src/transport/inproc/inproc.c29
-rw-r--r--src/transport/ipc/ipc.c47
-rw-r--r--src/transport/tcp/tcp.c41
-rw-r--r--src/transport/tls/tls.c49
-rw-r--r--src/transport/ws/websocket.c28
-rw-r--r--src/transport/zerotier/zerotier.c118
27 files changed, 423 insertions, 362 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index fac62f12..388e6677 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -49,9 +49,9 @@ static nni_list nni_aio_expire_aios;
//
// In order to guard against aio reuse during teardown, we set a fini
// flag. Any attempt to initialize for a new operation after that point
-// will fail and the caller will get NNG_ESTATE indicating this. The
-// provider that calls nni_aio_start() MUST check the return value, and
-// if it comes back nonzero (NNG_ESTATE) then it must simply discard the
+// will fail and the caller will get NNG_ECANCELED indicating this. The
+// provider that calls nni_aio_begin() MUST check the return value, and
+// if it comes back nonzero (NNG_ECANCELED) then it must simply discard the
// request and return.
// An nni_aio is an async I/O handle.
@@ -184,7 +184,7 @@ nni_aio_fini_cb(nni_aio *aio)
// nni_aio_stop cancels any oustanding operation, and waits for the
// callback to complete, if still running. It also marks the AIO as
-// stopped, preventing further calls to nni_aio_start from succeeding.
+// stopped, preventing further calls to nni_aio_begin from succeeding.
// To correctly tear down an AIO, call stop, and make sure any other
// calles are not also stopped, before calling nni_aio_fini to release
// actual memory.
@@ -298,14 +298,11 @@ nni_aio_wait(nni_aio *aio)
}
int
-nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
+nni_aio_begin(nni_aio *aio)
{
- nni_time now = nni_clock();
-
nni_mtx_lock(&nni_aio_lk);
-
+ // We should not reschedule anything at this point.
if (aio->a_fini) {
- // We should not reschedule anything at this point.
aio->a_active = false;
aio->a_result = NNG_ECANCELED;
nni_mtx_unlock(&nni_aio_lk);
@@ -315,34 +312,52 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
aio->a_pend = false;
aio->a_result = 0;
aio->a_count = 0;
- aio->a_prov_cancel = cancelfn;
- aio->a_prov_data = data;
+ aio->a_prov_cancel = NULL;
+ aio->a_prov_data = NULL;
aio->a_active = true;
-
for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
+ nni_mtx_unlock(&nni_aio_lk);
+ return (0);
+}
+void
+nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
+{
if (!aio->a_sleep) {
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
case NNG_DURATION_ZERO:
- aio->a_expire = NNI_TIME_ZERO;
+ aio->a_expire = nni_clock();
break;
case NNG_DURATION_INFINITE:
case NNG_DURATION_DEFAULT:
aio->a_expire = NNI_TIME_NEVER;
break;
default:
- aio->a_expire = now + aio->a_timeout;
+ aio->a_expire = nni_clock() + aio->a_timeout;
break;
}
}
+ nni_mtx_lock(&nni_aio_lk);
+ aio->a_prov_cancel = cancelfn;
+ aio->a_prov_data = data;
if (aio->a_expire != NNI_TIME_NEVER) {
nni_aio_expire_add(aio);
}
nni_mtx_unlock(&nni_aio_lk);
+}
+
+int
+nni_aio_schedule_verify(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
+{
+
+ if ((!aio->a_sleep) && (aio->a_timeout == NNG_DURATION_ZERO)) {
+ return (NNG_ETIMEDOUT);
+ }
+ nni_aio_schedule(aio, cancelfn, data);
return (0);
}
@@ -651,6 +666,9 @@ nni_aio_iov_advance(nni_aio *aio, size_t n)
void
nni_sleep_aio(nng_duration ms, nng_aio *aio)
{
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
switch (aio->a_timeout) {
case NNG_DURATION_DEFAULT:
case NNG_DURATION_INFINITE:
@@ -661,13 +679,15 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
// then let it still wake up early, but with NNG_ETIMEDOUT.
if (ms > aio->a_timeout) {
aio->a_sleep = false;
- (void) nni_aio_start(aio, NULL, NULL);
+ (void) nni_aio_schedule(aio, NULL, NULL);
return;
}
}
aio->a_sleep = true;
aio->a_expire = nni_clock() + ms;
- (void) nni_aio_start(aio, NULL, NULL);
+
+ // There is no cancellation, apart from just unexpiring.
+ nni_aio_schedule(aio, NULL, NULL);
}
void
diff --git a/src/core/aio.h b/src/core/aio.h
index a0f4934f..d23ddf5b 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -123,7 +123,14 @@ extern void nni_aio_finish_msg(nni_aio *, nni_msg *);
// with the indicated result (NNG_ECLOSED or NNG_ECANCELED is recommended.)
extern void nni_aio_abort(nni_aio *, int rv);
-extern int nni_aio_start(nni_aio *, nni_aio_cancelfn, void *);
+// nni_aio_begin is called by a provider to indicate it is starting the
+// operation, and to check that the aio has not already been marked for
+// teardown. It returns 0 on success, or NNG_ECANCELED if the aio is being
+// torn down. (In that case, no operation should be aborted without any
+// call to any other functions on this AIO, most especially not the
+// nng_aio_finish family of functions.)
+extern int nni_aio_begin(nni_aio *);
+
extern void *nni_aio_get_prov_data(nni_aio *);
extern void nni_aio_set_prov_data(nni_aio *, void *);
extern void *nni_aio_get_prov_extra(nni_aio *, unsigned);
@@ -143,6 +150,17 @@ extern void nni_aio_get_iov(nni_aio *, unsigned *, nni_iov **);
extern void nni_aio_normalize_timeout(nni_aio *, nng_duration);
extern void nni_aio_bump_count(nni_aio *, size_t);
+// nni_aio_schedule indicates that the AIO has begun, and is scheduled for
+// asychronous completion. This also starts the expiration timer. Note that
+// prior to this, the aio is uncancellable.
+extern void nni_aio_schedule(nni_aio *, nni_aio_cancelfn, void *);
+
+// nni_aio_schedule_verify is like nni_aio_schedule, except that if the
+// operation has been run with a zero time (NNG_FLAG_NONBLOCK), then it
+// returns NNG_ETIMEDOUT. This is done to permit bypassing scheduling
+// if the operation could not be immediately completed.
+extern int nni_aio_schedule_verify(nni_aio *, nni_aio_cancelfn, void *);
+
extern void nni_sleep_aio(nni_duration, nni_aio *);
extern int nni_aio_sys_init(void);
diff --git a/src/core/device.c b/src/core/device.c
index 6def9f64..e3b1d220 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -188,12 +188,12 @@ nni_device_start(nni_device_data *dd, nni_aio *user)
{
int i;
- nni_mtx_lock(&dd->mtx);
- dd->user = user;
- if (nni_aio_start(user, nni_device_cancel, dd) != 0) {
- nni_mtx_unlock(&dd->mtx);
+ if (nni_aio_begin(user) != 0) {
return;
}
+ nni_mtx_lock(&dd->mtx);
+ nni_aio_schedule(user, nni_device_cancel, dd);
+ dd->user = user;
for (i = 0; i < dd->npath; i++) {
nni_device_path *p = &dd->paths[i];
p->user = user;
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 0cb59a14..2741a8e6 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -313,7 +313,7 @@ nni_ep_tmo_start(nni_ep *ep)
{
nni_duration backoff;
- if (ep->ep_closing) {
+ if (ep->ep_closing || (nni_aio_begin(ep->ep_tmo_aio) != 0)) {
return;
}
backoff = ep->ep_currtime;
@@ -334,9 +334,7 @@ nni_ep_tmo_start(nni_ep *ep)
ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0));
ep->ep_tmo_run = 1;
- if (nni_aio_start(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) {
- ep->ep_tmo_run = 0;
- }
+ nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep);
}
static void
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 212b52c2..1bb5a762 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -22,10 +22,10 @@ struct nni_msgq {
int mq_len;
int mq_get;
int mq_put;
- int mq_closed;
int mq_puterr;
int mq_geterr;
- int mq_besteffort;
+ bool mq_besteffort;
+ bool mq_closed;
nni_msg **mq_msgs;
nni_list mq_aio_putq;
@@ -117,6 +117,10 @@ nni_msgq_set_get_error(nni_msgq *mq, int error)
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ // If we were closed, then this error trumps all others.
+ error = NNG_ECLOSED;
+ }
if (error != 0) {
while ((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
nni_aio_list_remove(aio);
@@ -134,6 +138,10 @@ nni_msgq_set_put_error(nni_msgq *mq, int error)
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ // If we were closed, then this error trumps all others.
+ error = NNG_ECLOSED;
+ }
if (error != 0) {
while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
nni_aio_list_remove(aio);
@@ -151,6 +159,10 @@ nni_msgq_set_error(nni_msgq *mq, int error)
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ // If we were closed, then this error trumps all others.
+ error = NNG_ECLOSED;
+ }
if (error != 0) {
while (((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) ||
((aio = nni_list_first(&mq->mq_aio_putq)) != NULL)) {
@@ -231,7 +243,7 @@ nni_msgq_run_putq(nni_msgq *mq)
}
void
-nni_msgq_set_best_effort(nni_msgq *mq, int on)
+nni_msgq_set_best_effort(nni_msgq *mq, bool on)
{
nni_mtx_lock(&mq->mq_lock);
mq->mq_besteffort = on;
@@ -325,22 +337,28 @@ nni_msgq_cancel(nni_aio *aio, int rv)
void
nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
{
- nni_mtx_lock(&mq->mq_lock);
- if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return;
- }
- if (mq->mq_closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
- nni_mtx_unlock(&mq->mq_lock);
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&mq->mq_lock);
if (mq->mq_puterr) {
nni_aio_finish_error(aio, mq->mq_puterr);
nni_mtx_unlock(&mq->mq_lock);
return;
}
+ // If this is an instantaneous poll operation, and the queue has
+ // no room, nobody is waiting to receive, and we're not best effort
+ // (best effort discards), then report the error (NNG_ETIMEDOUT).
+ rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+ if ((rv != 0) && (mq->mq_len >= mq->mq_cap) &&
+ (nni_list_empty(&mq->mq_aio_getq)) && (!mq->mq_besteffort)) {
+ nni_mtx_unlock(&mq->mq_lock);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
@@ -351,19 +369,22 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
void
nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
{
- nni_mtx_lock(&mq->mq_lock);
- if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
return;
}
- if (mq->mq_closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_geterr) {
nni_mtx_unlock(&mq->mq_lock);
+ nni_aio_finish_error(aio, mq->mq_geterr);
return;
}
- if (mq->mq_geterr) {
- nni_aio_finish_error(aio, mq->mq_geterr);
+ rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+ if ((rv != 0) && (mq->mq_len == 0) &&
+ (nni_list_empty(&mq->mq_aio_putq))) {
nni_mtx_unlock(&mq->mq_lock);
+ nni_aio_finish_error(aio, rv);
return;
}
@@ -417,7 +438,8 @@ nni_msgq_close(nni_msgq *mq)
nni_aio *aio;
nni_mtx_lock(&mq->mq_lock);
- mq->mq_closed = 1;
+ mq->mq_closed = true;
+ mq->mq_puterr = mq->mq_geterr = NNG_ECLOSED;
// Free the messages orphaned in the queue.
while (mq->mq_len > 0) {
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 2d23f448..2f1a46eb 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -63,7 +63,7 @@ extern void nni_msgq_set_get_error(nni_msgq *, int);
// What this does is treat the message queue condition as if it were
// successful, returning 0, and discarding the message. If zero is
// passed then this mode is reset to normal.
-extern void nni_msgq_set_best_effort(nni_msgq *, int);
+extern void nni_msgq_set_best_effort(nni_msgq *, bool);
// nni_msgq_filter is a callback function used to filter messages.
// The function is called on entry (put) or exit (get). The void
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 16c0d09f..196b206a 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -319,26 +319,22 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
void
nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
{
- int rv;
-
// Accept is simpler than the connect case. With accept we just
// need to wait for the socket to be readable to indicate an incoming
// connection is ready for us. There isn't anything else for us to
// do really, as that will have been done in listen.
- nni_mtx_lock(&ed->mtx);
- // If we can't start, it means that the AIO was stopped.
- if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
- nni_mtx_unlock(&ed->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ed->mtx);
if (ed->closed) {
- nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
nni_mtx_unlock(&ed->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
-
nni_aio_list_append(&ed->acceptq, aio);
+ nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed);
nni_posix_pollq_arm(&ed->node, POLLIN);
nni_mtx_unlock(&ed->mtx);
}
@@ -362,39 +358,33 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
int rv;
int fd;
- nni_mtx_lock(&ed->mtx);
- // If we can't start, it means that the AIO was stopped.
- if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
- nni_mtx_unlock(&ed->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ed->mtx);
- fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0);
- if (fd < 0) {
- nni_posix_epdesc_finish(aio, rv, 0);
+ if ((fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
+ rv = nni_plat_errno(errno);
nni_mtx_unlock(&ed->mtx);
+ nni_aio_finish_error(aio, rv);
return;
}
// Possibly bind.
- if (ed->loclen != 0) {
- rv = bind(fd, (void *) &ed->locaddr, ed->loclen);
- if (rv != 0) {
- (void) close(fd);
- nni_posix_epdesc_finish(aio, rv, 0);
- nni_mtx_unlock(&ed->mtx);
- return;
- }
+ if ((ed->loclen != 0) &&
+ (bind(fd, (void *) &ed->locaddr, ed->loclen) != 0)) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&ed->mtx);
+ (void) close(fd);
+ nni_aio_finish_error(aio, rv);
+ return;
}
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
- rv = connect(fd, (void *) &ed->remaddr, ed->remlen);
-
- if (rv == 0) {
+ if ((rv = connect(fd, (void *) &ed->remaddr, ed->remlen)) == 0) {
// Immediate connect, cool! This probably only happens on
// loopback, and probably not on every platform.
-
nni_posix_epdesc_finish(aio, 0, fd);
nni_mtx_unlock(&ed->mtx);
return;
@@ -402,24 +392,27 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
if (errno != EINPROGRESS) {
// Some immediate failure occurred.
- if (errno == ENOENT) {
+ if (errno == ENOENT) { // For UNIX domain sockets
errno = ECONNREFUSED;
}
- (void) close(fd);
- nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
+ rv = nni_plat_errno(errno);
nni_mtx_unlock(&ed->mtx);
+ (void) close(fd);
+ nni_aio_finish_error(aio, rv);
return;
}
// We have to submit to the pollq, because the connection is pending.
ed->node.fd = fd;
if ((rv = nni_posix_pollq_add(&ed->node)) != 0) {
- (void) close(fd);
- nni_posix_epdesc_finish(aio, rv, 0);
+ ed->node.fd = -1;
nni_mtx_unlock(&ed->mtx);
+ (void) close(fd);
+ nni_aio_finish_error(aio, rv);
return;
}
+ nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed);
nni_aio_list_append(&ed->connectq, aio);
nni_posix_pollq_arm(&ed->node, POLLOUT);
nni_mtx_unlock(&ed->mtx);
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index aebea4b8..62531f9c 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -254,20 +254,20 @@ nni_posix_pipedesc_cancel(nni_aio *aio, int rv)
void
nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
{
- int rv;
-
- nni_mtx_lock(&pd->mtx);
- if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
- nni_mtx_unlock(&pd->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&pd->mtx);
+
if (pd->closed) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
nni_mtx_unlock(&pd->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
nni_aio_list_append(&pd->readq, aio);
+ nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd);
+
// If we are only job on the list, go ahead and try to do an immediate
// transfer. This allows for faster completions in many cases. We
// also need not arm a list if it was already armed.
@@ -285,20 +285,20 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
void
nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
{
- int rv;
-
- nni_mtx_lock(&pd->mtx);
- if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
- nni_mtx_unlock(&pd->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&pd->mtx);
+
if (pd->closed) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
nni_mtx_unlock(&pd->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
nni_aio_list_append(&pd->writeq, aio);
+ nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd);
+
if (nni_list_first(&pd->writeq) == aio) {
nni_posix_pipedesc_dowrite(pd);
// If we are still the first thing on the list, that means we
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index f3ac7a19..d86a2008 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -206,9 +206,11 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive,
int family, int proto, nni_aio *aio)
{
nni_posix_resolv_item *item;
- int rv;
sa_family_t fam;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
switch (family) {
case NNG_AF_INET:
fam = AF_INET;
@@ -241,12 +243,7 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive,
item->family = fam;
nni_mtx_lock(&nni_posix_resolv_mtx);
- // If we were stopped, we're done...
- if ((rv = nni_aio_start(aio, nni_posix_resolv_cancel, item)) != 0) {
- nni_mtx_unlock(&nni_posix_resolv_mtx);
- NNI_FREE_STRUCT(item);
- return;
- }
+ nni_aio_schedule(aio, nni_posix_resolv_cancel, item);
nni_task_dispatch(&item->task);
nni_mtx_unlock(&nni_posix_resolv_mtx);
}
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index 8a402acc..b414fa95 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -287,22 +287,26 @@ nni_plat_udp_cancel(nni_aio *aio, int rv)
void
nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio)
{
- nni_mtx_lock(&udp->udp_mtx);
- if (nni_aio_start(aio, nni_plat_udp_cancel, udp) == 0) {
- nni_list_append(&udp->udp_recvq, aio);
- nni_posix_pollq_arm(&udp->udp_pitem, POLLIN);
+ if (nni_aio_begin(aio) != 0) {
+ return;
}
+ nni_mtx_lock(&udp->udp_mtx);
+ nni_aio_schedule(aio, nni_plat_udp_cancel, udp);
+ nni_list_append(&udp->udp_recvq, aio);
+ nni_posix_pollq_arm(&udp->udp_pitem, POLLIN);
nni_mtx_unlock(&udp->udp_mtx);
}
void
nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio)
{
- nni_mtx_lock(&udp->udp_mtx);
- if (nni_aio_start(aio, nni_plat_udp_cancel, udp) == 0) {
- nni_list_append(&udp->udp_sendq, aio);
- nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT);
+ if (nni_aio_begin(aio) != 0) {
+ return;
}
+ nni_mtx_lock(&udp->udp_mtx);
+ nni_aio_schedule(aio, nni_plat_udp_cancel, udp);
+ nni_list_append(&udp->udp_sendq, aio);
+ nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT);
nni_mtx_unlock(&udp->udp_mtx);
}
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c
index 1189433a..a3ae3748 100644
--- a/src/platform/windows/win_iocp.c
+++ b/src/platform/windows/win_iocp.c
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -155,12 +155,11 @@ nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio)
void
nni_win_event_submit(nni_win_event *evt, nni_aio *aio)
{
- nni_mtx_lock(&evt->mtx);
- if (nni_aio_start(aio, nni_win_event_cancel, evt) != 0) {
- // the aio was aborted
- nni_mtx_unlock(&evt->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&evt->mtx);
+ nni_aio_schedule(aio, nni_win_event_cancel, evt);
nni_aio_list_append(&evt->aios, aio);
nni_win_event_start(evt);
nni_mtx_unlock(&evt->mtx);
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index 5843917c..d372f639 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -491,16 +491,16 @@ nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
{
nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&w->mtx);
NNI_ASSERT(!nni_list_active(&w->waiters, ep));
- if (nni_aio_start(aio, nni_win_ipc_conn_cancel, ep) != 0) {
- nni_mtx_unlock(&w->mtx);
- return;
- }
ep->con_aio = aio;
nni_list_append(&w->waiters, ep);
+ nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep);
nni_cv_wake(&w->cv);
nni_mtx_unlock(&w->mtx);
}
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c
index d07b4fd5..72e6b439 100644
--- a/src/platform/windows/win_resolv.c
+++ b/src/platform/windows/win_resolv.c
@@ -180,6 +180,9 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
int rv;
int fam;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
switch (family) {
case NNG_AF_INET:
fam = AF_INET;
@@ -211,12 +214,7 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
item->family = fam;
nni_mtx_lock(&nni_win_resolv_mtx);
- // If we were stopped, we're done...
- if ((rv = nni_aio_start(aio, nni_win_resolv_cancel, item)) != 0) {
- nni_mtx_unlock(&nni_win_resolv_mtx);
- NNI_FREE_STRUCT(item);
- return;
- }
+ nni_aio_schedule(aio, nni_win_resolv_cancel, item);
nni_task_dispatch(&item->task);
nni_mtx_unlock(&nni_win_resolv_mtx);
}
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index e18675ee..afc52629 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -170,6 +170,10 @@ rep0_ctx_send(void *arg, nni_aio *aio)
msg = nni_aio_get_msg(aio);
nni_msg_header_clear(msg);
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
nni_mtx_lock(&s->lk);
len = ctx->btrace_len;
p_id = ctx->pipe_id;
@@ -187,10 +191,6 @@ rep0_ctx_send(void *arg, nni_aio *aio)
nni_pollable_clear(s->sendable);
}
- if (nni_aio_start(aio, rep0_ctx_cancel_send, ctx) != 0) {
- nni_mtx_unlock(&s->lk);
- return;
- }
if (ctx->closed) {
nni_mtx_unlock(&s->lk);
nni_aio_finish_error(aio, NNG_ECLOSED);
@@ -201,12 +201,6 @@ rep0_ctx_send(void *arg, nni_aio *aio)
nni_aio_finish_error(aio, NNG_ESTATE);
return;
}
- if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) {
- nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
if ((rv = nni_idhash_find(s->pipes, p_id, (void **) &p)) != 0) {
// Pipe is gone. Make this look like a good send to avoid
// disrupting the state machine. We don't care if the peer
@@ -217,7 +211,18 @@ rep0_ctx_send(void *arg, nni_aio *aio)
nni_msg_free(msg);
return;
}
+ if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) {
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
if (p->busy) {
+ rv = nni_aio_schedule_verify(aio, rep0_ctx_cancel_send, ctx);
+ if (rv != 0) {
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ctx->saio = aio;
ctx->spipe = p;
nni_list_append(&p->sendq, ctx);
@@ -456,18 +461,23 @@ rep0_ctx_recv(void *arg, nni_aio *aio)
size_t len;
nni_msg * msg;
- nni_mtx_lock(&s->lk);
- if (nni_aio_start(aio, rep0_cancel_recv, ctx) != 0) {
- nni_mtx_unlock(&s->lk);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&s->lk);
if (ctx->closed) {
nni_mtx_unlock(&s->lk);
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
if ((p = nni_list_first(&s->recvpipes)) == NULL) {
- nni_pollable_clear(s->recvable);
+ int rv;
+ rv = nni_aio_schedule_verify(aio, rep0_cancel_recv, ctx);
+ if (rv != 0) {
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ctx->raio = aio;
nni_list_append(&s->recvq, ctx);
nni_mtx_unlock(&s->lk);
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 8149ce08..3ecc8604 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -617,11 +617,10 @@ req0_ctx_recv(void *arg, nni_aio *aio)
req0_sock *s = ctx->sock;
nni_msg * msg;
- nni_mtx_lock(&s->mtx);
- if (nni_aio_start(aio, req0_ctx_cancel_recv, ctx) != 0) {
- nni_mtx_unlock(&s->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&s->mtx);
if (s->closed) {
nni_mtx_unlock(&s->mtx);
nni_aio_finish_error(aio, NNG_ECLOSED);
@@ -638,6 +637,13 @@ req0_ctx_recv(void *arg, nni_aio *aio)
}
if ((msg = ctx->repmsg) == NULL) {
+ int rv;
+ rv = nni_aio_schedule_verify(aio, req0_ctx_cancel_recv, ctx);
+ if (rv != 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ctx->raio = aio;
nni_mtx_unlock(&s->mtx);
return;
@@ -697,14 +703,11 @@ req0_ctx_send(void *arg, nni_aio *aio)
uint64_t id;
int rv;
- nni_mtx_lock(&s->mtx);
- // Even though we always complete synchronously, this guards against
- // restarting a request that was stopped.
- if (nni_aio_start(aio, req0_ctx_cancel_send, ctx) != 0) {
- nni_mtx_unlock(&s->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
- // Sending a new requst cancels the old one, including any
+ nni_mtx_lock(&s->mtx);
+ // Sending a new request cancels the old one, including any
// outstanding reply.
if (ctx->raio != NULL) {
nni_aio_finish_error(ctx->raio, NNG_ECANCELED);
@@ -735,6 +738,15 @@ req0_ctx_send(void *arg, nni_aio *aio)
nni_aio_finish_error(aio, rv);
return;
}
+ // If no pipes are ready, and the request was a poll (no background
+ // schedule), then fail it. Should be NNG_TIMEDOUT.
+ rv = nni_aio_schedule_verify(aio, req0_ctx_cancel_send, ctx);
+ if ((rv != 0) && (nni_list_empty(&s->readypipes))) {
+ nni_idhash_remove(s->reqids, id);
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ctx->reqlen = nni_msg_len(msg);
ctx->reqmsg = msg;
ctx->saio = aio;
@@ -743,6 +755,8 @@ req0_ctx_send(void *arg, nni_aio *aio)
// Stick us on the sendq list.
nni_list_append(&s->sendq, ctx);
+ // Note that this will be synchronous if the readypipes list was
+ // not empty.
req0_run_sendq(s, NULL);
nni_mtx_unlock(&s->mtx);
}
diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c
index c62a3c56..2a9be4bb 100644
--- a/src/supplemental/http/http_client.c
+++ b/src/supplemental/http/http_client.c
@@ -241,10 +241,11 @@ http_connect_cancel(nni_aio *aio, int rv)
void
nni_http_client_connect(nni_http_client *c, nni_aio *aio)
{
- if (nni_aio_start(aio, http_connect_cancel, aio) != 0) {
+ if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&c->mtx);
+ nni_aio_schedule(aio, http_connect_cancel, aio);
nni_list_append(&c->aios, aio);
if (nni_list_first(&c->aios) == aio) {
http_conn_start(c);
diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c
index d37c9a2f..a3039d0d 100644
--- a/src/supplemental/http/http_conn.c
+++ b/src/supplemental/http/http_conn.c
@@ -360,13 +360,14 @@ http_rd_cancel(nni_aio *aio, int rv)
static void
http_rd_submit(nni_http_conn *conn, nni_aio *aio)
{
- if (nni_aio_start(aio, http_rd_cancel, conn) != 0) {
+ if (nni_aio_begin(aio) != 0) {
return;
}
if (conn->closed) {
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
+ nni_aio_schedule(aio, http_rd_cancel, conn);
nni_list_append(&conn->rdq, aio);
if (conn->rd_uaio == NULL) {
http_rd_start(conn);
@@ -473,7 +474,7 @@ http_wr_cancel(nni_aio *aio, int rv)
static void
http_wr_submit(nni_http_conn *conn, nni_aio *aio)
{
- if (nni_aio_start(aio, http_wr_cancel, conn) != 0) {
+ if (nni_aio_begin(aio) != 0) {
return;
}
if (conn->closed) {
@@ -481,6 +482,7 @@ http_wr_submit(nni_http_conn *conn, nni_aio *aio)
return;
}
nni_list_append(&conn->wrq, aio);
+ nni_aio_schedule(aio, http_wr_cancel, conn);
if (conn->wr_uaio == NULL) {
http_wr_start(conn);
}
diff --git a/src/supplemental/http/http_public.c b/src/supplemental/http/http_public.c
index 7107c029..7aae45b2 100644
--- a/src/supplemental/http/http_public.c
+++ b/src/supplemental/http/http_public.c
@@ -9,9 +9,9 @@
//
#include "core/nng_impl.h"
-#include "supplemental/tls/tls.h"
#include "http.h"
#include "http_api.h"
+#include "supplemental/tls/tls.h"
// Symbols in this file are "public" versions of the HTTP API.
// These are suitable for exposure to applications.
@@ -382,7 +382,7 @@ nng_http_conn_read(nng_http_conn *conn, nng_aio *aio)
nni_http_read(conn, aio);
#else
NNI_ARG_UNUSED(conn);
- if (nni_aio_start(aio, NULL, NULL)) {
+ if (nni_aio_begin(aio) == 0) {
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
#endif
@@ -395,7 +395,7 @@ nng_http_conn_read_all(nng_http_conn *conn, nng_aio *aio)
nni_http_read_full(conn, aio);
#else
NNI_ARG_UNUSED(conn);
- if (nni_aio_start(aio, NULL, NULL)) {
+ if (nni_aio_begin(aio) == 0) {
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
#endif
@@ -408,7 +408,7 @@ nng_http_conn_write(nng_http_conn *conn, nng_aio *aio)
nni_http_write(conn, aio);
#else
NNI_ARG_UNUSED(conn);
- if (nni_aio_start(aio, NULL, NULL)) {
+ if (nni_aio_begin(aio) == 0) {
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
#endif
@@ -433,7 +433,7 @@ nng_http_conn_write_req(nng_http_conn *conn, nng_http_req *req, nng_aio *aio)
#else
NNI_ARG_UNUSED(conn);
NNI_ARG_UNUSED(req);
- if (nni_aio_start(aio, NULL, NULL)) {
+ if (nni_aio_begin(aio) == 0) {
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
#endif
@@ -447,7 +447,7 @@ nng_http_conn_write_res(nng_http_conn *conn, nng_http_res *res, nng_aio *aio)
#else
NNI_ARG_UNUSED(conn);
NNI_ARG_UNUSED(res);
- if (nni_aio_start(aio, NULL, NULL)) {
+ if (nni_aio_begin(aio) == 0) {
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
#endif
@@ -461,7 +461,7 @@ nng_http_conn_read_req(nng_http_conn *conn, nng_http_req *req, nng_aio *aio)
#else
NNI_ARG_UNUSED(conn);
NNI_ARG_UNUSED(req);
- if (nni_aio_start(aio, NULL, NULL)) {
+ if (nni_aio_begin(aio) == 0) {
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
#endif
@@ -475,7 +475,7 @@ nng_http_conn_read_res(nng_http_conn *conn, nng_http_res *res, nng_aio *aio)
#else
NNI_ARG_UNUSED(conn);
NNI_ARG_UNUSED(res);
- if (nni_aio_start(aio, NULL, NULL)) {
+ if (nni_aio_begin(aio) == 0) {
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
#endif
@@ -574,7 +574,8 @@ nng_http_handler_set_host(nng_http_handler *h, const char *host)
#endif
}
-int nng_http_handler_set_tree(nng_http_handler *h)
+int
+nng_http_handler_set_tree(nng_http_handler *h)
{
#ifdef NNG_SUPP_HTTP
return (nni_http_handler_set_tree(h));
@@ -698,12 +699,13 @@ nng_http_server_get_tls(nng_http_server *srv, nng_tls_config **cfgp)
#endif
}
-int nng_http_hijack(nng_http_conn * conn)
+int
+nng_http_hijack(nng_http_conn *conn)
{
#ifdef NNG_SUPP_HTTP
- return (nni_http_hijack(conn));
+ return (nni_http_hijack(conn));
#else
- NNI_ARG_UNUSED(conn);
+ NNI_ARG_UNUSED(conn);
return (NNG_ENOTSUP);
#endif
}
@@ -762,7 +764,7 @@ nng_http_client_connect(nng_http_client *cli, nng_aio *aio)
nni_http_client_connect(cli, aio);
#else
NNI_ARG_UNUSED(cli);
- if (nni_aio_start(aio, NULL, NULL)) {
+ if (nni_aio_begin(aio) == 0) {
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
#endif
diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c
index 7feadc96..c92de586 100644
--- a/src/supplemental/http/http_server.c
+++ b/src/supplemental/http/http_server.c
@@ -590,8 +590,8 @@ http_sconn_rxdone(void *arg)
// Technically, probably callback should initialize this with
// start, but we do it instead.
-
- if (nni_aio_start(sc->cbaio, NULL, NULL) != 0) {
+ // This operation cannot at present canceled or timed out.
+ if (nni_aio_begin(sc->cbaio) != 0) {
nni_mtx_unlock(&s->mtx);
return;
}
diff --git a/src/supplemental/tls/mbedtls/tls.c b/src/supplemental/tls/mbedtls/tls.c
index 8faf4e46..c2a0fb52 100644
--- a/src/supplemental/tls/mbedtls/tls.c
+++ b/src/supplemental/tls/mbedtls/tls.c
@@ -13,7 +13,6 @@
#include <stdlib.h>
#include <string.h>
-
#include "mbedtls/version.h" // Must be first in order to pick up version
#include "mbedtls/error.h"
@@ -562,16 +561,16 @@ nni_tls_net_recv(void *ctx, unsigned char *buf, size_t len)
void
nni_tls_send(nni_tls *tp, nni_aio *aio)
{
- nni_mtx_lock(&tp->lk);
- if (nni_aio_start(aio, nni_tls_cancel, tp) != 0) {
- nni_mtx_unlock(&tp->lk);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&tp->lk);
if (tp->tls_closed) {
nni_mtx_unlock(&tp->lk);
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
+ nni_aio_schedule(aio, nni_tls_cancel, tp);
nni_list_append(&tp->sends, aio);
nni_tls_do_send(tp);
nni_mtx_unlock(&tp->lk);
@@ -580,16 +579,16 @@ nni_tls_send(nni_tls *tp, nni_aio *aio)
void
nni_tls_recv(nni_tls *tp, nni_aio *aio)
{
- nni_mtx_lock(&tp->lk);
- if (nni_aio_start(aio, nni_tls_cancel, tp) != 0) {
- nni_mtx_unlock(&tp->lk);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&tp->lk);
if (tp->tls_closed) {
nni_mtx_unlock(&tp->lk);
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
+ nni_aio_schedule(aio, nni_tls_cancel, tp);
nni_list_append(&tp->recvs, aio);
nni_tls_do_recv(tp);
nni_mtx_unlock(&tp->lk);
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index 7bd698bd..74b56437 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -655,9 +655,7 @@ ws_send_close(nni_ws *ws, uint16_t code)
ws->closed = true;
aio = ws->closeaio;
- // We don't care about cancellation here. If this times out,
- // we will still shut all the physical I/O down in the callback.
- if (nni_aio_start(aio, ws_cancel_close, ws) != 0) {
+ if (nni_aio_begin(aio) != 0) {
return;
}
ws->wclose = true;
@@ -669,6 +667,7 @@ ws_send_close(nni_ws *ws, uint16_t code)
}
// Close frames get priority!
nni_list_prepend(&ws->txmsgs, wm);
+ nni_aio_schedule(aio, ws_cancel_close, ws);
ws_start_write(ws);
}
@@ -722,26 +721,22 @@ nni_ws_send_msg(nni_ws *ws, nni_aio *aio)
msg = nni_aio_get_msg(aio);
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
if ((rv = ws_msg_init_tx(&wm, ws, msg, aio)) != 0) {
- if (nni_aio_start(aio, NULL, NULL) == 0) {
- nni_aio_finish_error(aio, rv);
- }
+ nni_aio_finish_error(aio, rv);
return;
}
nni_mtx_lock(&ws->mtx);
-
- if (nni_aio_start(aio, ws_write_cancel, ws) != 0) {
- nni_mtx_unlock(&ws->mtx);
- ws_msg_fini(wm);
- return;
- }
if (ws->closed) {
nni_mtx_unlock(&ws->mtx);
nni_aio_finish_error(aio, NNG_ECLOSED);
ws_msg_fini(wm);
return;
}
+ nni_aio_schedule(aio, ws_write_cancel, ws);
nni_aio_set_prov_extra(aio, 0, wm);
nni_list_append(&ws->sendq, aio);
nni_list_append(&ws->txmsgs, wm);
@@ -1056,19 +1051,20 @@ nni_ws_recv_msg(nni_ws *ws, nni_aio *aio)
ws_msg *wm;
int rv;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
if ((rv = ws_msg_init_rx(&wm, ws, aio)) != 0) {
- if (nni_aio_start(aio, NULL, NULL)) {
- nni_aio_finish_error(aio, rv);
- }
+ nni_aio_finish_error(aio, rv);
return;
}
nni_mtx_lock(&ws->mtx);
- if (nni_aio_start(aio, ws_read_cancel, ws) == 0) {
- nni_aio_set_prov_extra(aio, 0, wm);
- nni_list_append(&ws->recvq, aio);
- nni_list_append(&ws->rxmsgs, wm);
- ws_start_read(ws);
- }
+ nni_aio_schedule(aio, ws_read_cancel, ws);
+ nni_aio_set_prov_extra(aio, 0, wm);
+ nni_list_append(&ws->recvq, aio);
+ nni_list_append(&ws->rxmsgs, wm);
+ ws_start_read(ws);
+
nni_mtx_unlock(&ws->mtx);
}
@@ -1651,11 +1647,10 @@ nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio)
{
nni_ws *ws;
- nni_mtx_lock(&l->mtx);
- if (nni_aio_start(aio, ws_accept_cancel, l) != 0) {
- nni_mtx_unlock(&l->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&l->mtx);
if (l->closed) {
nni_aio_finish_error(aio, NNG_ECLOSED);
nni_mtx_unlock(&l->mtx);
@@ -1668,11 +1663,13 @@ nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio)
}
if ((ws = nni_list_first(&l->pend)) != NULL) {
nni_list_remove(&l->pend, ws);
+ nni_mtx_unlock(&l->mtx);
nni_aio_set_output(aio, 0, ws);
nni_aio_finish(aio, 0, 0);
- } else {
- nni_list_append(&l->aios, aio);
+ return;
}
+ nni_aio_schedule(aio, ws_accept_cancel, l);
+ nni_list_append(&l->aios, aio);
nni_mtx_unlock(&l->mtx);
}
@@ -1983,19 +1980,17 @@ nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio)
nni_ws *ws;
int rv;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
if ((rv = ws_init(&ws)) != 0) {
nni_aio_finish_error(aio, rv);
return;
}
nni_mtx_lock(&d->mtx);
if (d->closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
- nni_mtx_unlock(&d->mtx);
- ws_fini(ws);
- return;
- }
- if (nni_aio_start(aio, ws_dial_cancel, ws) != 0) {
nni_mtx_unlock(&d->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
ws_fini(ws);
return;
}
@@ -2003,6 +1998,7 @@ nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio)
ws->useraio = aio;
ws->mode = NNI_EP_MODE_DIAL;
nni_list_append(&d->wspend, ws);
+ nni_aio_schedule(aio, ws_dial_cancel, ws);
nni_http_client_connect(d->client, ws->connaio);
nni_mtx_unlock(&d->mtx);
}
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 82f46fc4..8bfb097e 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -349,19 +349,16 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
{
nni_inproc_ep *ep = arg;
nni_inproc_ep *server;
- int rv;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
if (ep->mode != NNI_EP_MODE_DIAL) {
nni_aio_finish_error(aio, NNG_EINVAL);
return;
}
nni_mtx_lock(&nni_inproc.mx);
- if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) {
- nni_mtx_unlock(&nni_inproc.mx);
- return;
- }
-
// Find a server.
NNI_LIST_FOREACH (&nni_inproc.servers, server) {
if (strcmp(server->addr, ep->addr) == 0) {
@@ -369,11 +366,17 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
}
}
if (server == NULL) {
- nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL);
+ // nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL);
nni_mtx_unlock(&nni_inproc.mx);
+ nni_aio_finish_error(aio, NNG_ECONNREFUSED);
return;
}
+ // We don't have to worry about the case where a zero timeout
+ // on connect was specified, as there is no option to specify
+ // that in the upper API.
+ nni_aio_schedule(aio, nni_inproc_ep_cancel, ep);
+
nni_list_append(&server->clients, ep);
nni_aio_list_append(&ep->aios, aio);
@@ -404,15 +407,17 @@ static void
nni_inproc_ep_accept(void *arg, nni_aio *aio)
{
nni_inproc_ep *ep = arg;
- int rv;
- nni_mtx_lock(&nni_inproc.mx);
-
- if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) {
- nni_mtx_unlock(&nni_inproc.mx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&nni_inproc.mx);
+
+ // We need not worry about the case where a non-blocking
+ // accept was tried -- there is no API to do such a thing.
+ nni_aio_schedule(aio, nni_inproc_ep_cancel, ep);
+
// We are already on the master list of servers, thanks to bind.
// Insert us into pending server aios, and then run accept list.
nni_aio_list_append(&ep->aios, aio);
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index ecc9a962..61b89f20 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -418,11 +418,11 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
- nni_mtx_lock(&pipe->mtx);
- if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) {
- nni_mtx_unlock(&pipe->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&pipe->mtx);
+ nni_aio_schedule(aio, nni_ipc_cancel_tx, pipe);
nni_list_append(&pipe->sendq, aio);
if (nni_list_first(&pipe->sendq) == aio) {
nni_ipc_pipe_dosend(pipe, aio);
@@ -474,12 +474,13 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
- nni_mtx_lock(&pipe->mtx);
-
- if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) {
- nni_mtx_unlock(&pipe->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&pipe->mtx);
+
+ // Transports never have a zero length timeout.
+ (void) nni_aio_schedule(aio, nni_ipc_cancel_rx, pipe);
nni_list_append(&pipe->recvq, aio);
if (nni_list_first(&pipe->recvq) == aio) {
@@ -492,10 +493,12 @@ static void
nni_ipc_pipe_start(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
- int rv;
nni_aio * negaio;
nni_iov iov;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&pipe->mtx);
pipe->txhead[0] = 0;
pipe->txhead[1] = 'S';
@@ -513,11 +516,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &pipe->txhead[0];
nni_aio_set_iov(negaio, 1, &iov);
- rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe);
- if (rv != 0) {
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
+ nni_aio_schedule(aio, nni_ipc_cancel_start, pipe);
nni_plat_ipc_pipe_send(pipe->ipp, negaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -680,16 +679,14 @@ static void
nni_ipc_ep_accept(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
- int rv;
-
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
- if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_ipc_cancel_ep, ep);
ep->user_aio = aio;
nni_plat_ipc_ep_accept(ep->iep, ep->aio);
@@ -700,18 +697,14 @@ static void
nni_ipc_ep_connect(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
- int rv;
-
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
- // If we can't start, then its dying and we can't report
- // either.
- if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_ipc_cancel_ep, ep);
ep->user_aio = aio;
nni_plat_ipc_ep_connect(ep->iep, ep->aio);
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 7ea77035..a551c5be 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -398,11 +398,11 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
- if (nni_aio_start(aio, nni_tcp_cancel_tx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, nni_tcp_cancel_tx, p);
nni_list_append(&p->sendq, aio);
if (nni_list_first(&p->sendq) == aio) {
nni_tcp_pipe_dosend(p, aio);
@@ -454,11 +454,11 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
- if (nni_aio_start(aio, nni_tcp_cancel_rx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, nni_tcp_cancel_rx, p);
nni_list_append(&p->recvq, aio);
if (nni_list_first(&p->recvq) == aio) {
nni_tcp_pipe_dorecv(p);
@@ -510,6 +510,9 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
nni_aio * negaio;
nni_iov iov;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&p->mtx);
p->txlen[0] = 0;
p->txlen[1] = 'S';
@@ -527,10 +530,7 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(negaio, 1, &iov);
- if (nni_aio_start(aio, nni_tcp_cancel_nego, p) != 0) {
- nni_mtx_unlock(&p->mtx);
- return;
- }
+ nni_aio_schedule(aio, nni_tcp_cancel_nego, p);
nni_plat_tcp_pipe_send(p->tpp, negaio);
nni_mtx_unlock(&p->mtx);
}
@@ -721,16 +721,14 @@ static void
nni_tcp_ep_accept(void *arg, nni_aio *aio)
{
nni_tcp_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
-
- if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_tcp_cancel_ep, ep);
ep->user_aio = aio;
nni_plat_tcp_ep_accept(ep->tep, ep->aio);
@@ -741,17 +739,14 @@ static void
nni_tcp_ep_connect(void *arg, nni_aio *aio)
{
nni_tcp_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
-
- // If we can't start, then its dying and we can't report either.
- if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_tcp_cancel_ep, ep);
ep->user_aio = aio;
nni_plat_tcp_ep_connect(ep->tep, ep->aio);
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index a852ddd8..2ea14e93 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -405,13 +405,11 @@ nni_tls_pipe_send(void *arg, nni_aio *aio)
{
nni_tls_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
-
- if (nni_aio_start(aio, nni_tls_cancel_tx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
-
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, nni_tls_cancel_tx, p);
nni_list_append(&p->sendq, aio);
if (nni_list_first(&p->sendq) == aio) {
nni_tls_pipe_dosend(p, aio);
@@ -463,12 +461,12 @@ nni_tls_pipe_recv(void *arg, nni_aio *aio)
{
nni_tls_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
-
- if (nni_aio_start(aio, nni_tls_cancel_rx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+
+ nni_aio_schedule(aio, nni_tls_cancel_rx, p);
nni_aio_list_append(&p->recvq, aio);
if (nni_list_first(&p->recvq) == aio) {
nni_tls_pipe_dorecv(p);
@@ -519,6 +517,9 @@ nni_tls_pipe_start(void *arg, nni_aio *aio)
nni_aio * negaio;
nni_iov iov;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&p->mtx);
p->txlen[0] = 0;
p->txlen[1] = 'S';
@@ -536,10 +537,7 @@ nni_tls_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(negaio, 1, &iov);
- if (nni_aio_start(aio, nni_tls_cancel_nego, p) != 0) {
- nni_mtx_unlock(&p->mtx);
- return;
- }
+ nni_aio_schedule(aio, nni_tls_cancel_nego, p);
nni_tls_send(p->tls, negaio);
nni_mtx_unlock(&p->mtx);
}
@@ -743,18 +741,14 @@ static void
nni_tls_ep_accept(void *arg, nni_aio *aio)
{
nni_tls_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
-
- if ((rv = nni_aio_start(aio, nni_tls_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
-
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_tls_cancel_ep, ep);
ep->user_aio = aio;
-
nni_plat_tcp_ep_accept(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
@@ -763,19 +757,14 @@ static void
nni_tls_ep_connect(void *arg, nni_aio *aio)
{
nni_tls_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
-
- // If we can't start, then its dying and we can't report either.
- if ((rv = nni_aio_start(aio, nni_tls_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
-
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_tls_cancel_ep, ep);
ep->user_aio = aio;
-
nni_plat_tcp_ep_connect(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
index 0542d0c7..761ba824 100644
--- a/src/transport/ws/websocket.c
+++ b/src/transport/ws/websocket.c
@@ -129,13 +129,12 @@ ws_pipe_recv(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
- if (nni_aio_start(aio, ws_pipe_recv_cancel, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, ws_pipe_recv_cancel, p);
p->user_rxaio = aio;
-
nni_ws_recv_msg(p->ws, p->rxaio);
nni_mtx_unlock(&p->mtx);
}
@@ -160,11 +159,11 @@ ws_pipe_send(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
- if (nni_aio_start(aio, ws_pipe_send_cancel, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, ws_pipe_send_cancel, p);
p->user_txaio = aio;
nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio));
nni_aio_set_msg(aio, NULL);
@@ -294,11 +293,11 @@ ws_ep_accept(void *arg, nni_aio *aio)
// We already bound, so we just need to look for an available
// pipe (created by the handler), and match it.
// Otherwise we stick the AIO in the accept list.
- nni_mtx_lock(&ep->mtx);
- if (nni_aio_start(aio, ws_ep_cancel, ep) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ nni_aio_schedule(aio, ws_ep_cancel, ep);
nni_list_append(&ep->aios, aio);
if (aio == nni_list_first(&ep->aios)) {
nni_ws_listener_accept(ep->listener, ep->accaio);
@@ -313,6 +312,9 @@ ws_ep_connect(void *arg, nni_aio *aio)
int rv = 0;
ws_hdr *h;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
if (!ep->started) {
NNI_LIST_FOREACH (&ep->headers, h) {
rv = nni_ws_dialer_header(
@@ -327,11 +329,7 @@ ws_ep_connect(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(nni_list_empty(&ep->aios));
- // If we can't start, then its dying and we can't report either.
- if ((rv = nni_aio_start(aio, ws_ep_cancel, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
- return;
- }
+ nni_aio_schedule(aio, ws_ep_cancel, ep);
ep->started = true;
nni_list_append(&ep->aios, aio);
nni_ws_dialer_dial(ep->dialer, ep->connaio);
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index d0790b37..05866cfb 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -1745,26 +1745,24 @@ zt_pipe_send(void *arg, nni_aio *aio)
size_t bytes;
nni_msg *m;
- nni_mtx_lock(&zt_lk);
- if (nni_aio_start(aio, NULL, p) != 0) {
- nni_mtx_unlock(&zt_lk);
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ if ((m = nni_aio_get_msg(aio)) == NULL) {
+ nni_aio_finish_error(aio, NNG_EINVAL);
return;
}
+ nni_mtx_lock(&zt_lk);
+
if (p->zp_closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
nni_mtx_unlock(&zt_lk);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
fragsz = (uint16_t)(p->zp_mtu - zt_offset_data_data);
- if ((m = nni_aio_get_msg(aio)) == NULL) {
- nni_aio_finish_error(aio, NNG_EINVAL);
- nni_mtx_unlock(&zt_lk);
- return;
- };
-
bytes = nni_msg_header_len(m) + nni_msg_len(m);
if (bytes >= (0xfffe * fragsz)) {
nni_aio_finish_error(aio, NNG_EMSGSIZE);
@@ -1821,11 +1819,16 @@ zt_pipe_send(void *arg, nni_aio *aio)
zt_send(p->zp_ztn, p->zp_nwid, zt_op_data, p->zp_raddr,
p->zp_laddr, data, fraglen + zt_offset_data_data);
} while (nni_msg_len(m) != 0);
+ nni_mtx_unlock(&zt_lk);
+
+ // NB, We never bothered to call nn_aio_sched, because we run this
+ // synchronously, relying on UDP to simply discard messages if we
+ // cannot deliver them. This means that pipe send operations with
+ // this transport are not cancellable.
nni_aio_set_msg(aio, NULL);
nni_msg_free(m);
nni_aio_finish(aio, 0, offset);
- nni_mtx_unlock(&zt_lk);
}
static void
@@ -1903,17 +1906,18 @@ zt_pipe_recv(void *arg, nni_aio *aio)
{
zt_pipe *p = arg;
- nni_mtx_lock(&zt_lk);
- if (nni_aio_start(aio, zt_pipe_cancel_recv, p) != 0) {
- nni_mtx_unlock(&zt_lk);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&zt_lk);
if (p->zp_closed) {
+ nni_mtx_unlock(&zt_lk);
nni_aio_finish_error(aio, NNG_ECLOSED);
- } else {
- p->zp_user_rxaio = aio;
- zt_pipe_dorecv(p);
+ return;
}
+ nni_aio_schedule(aio, zt_pipe_cancel_recv, p);
+ p->zp_user_rxaio = aio;
+ zt_pipe_dorecv(p);
nni_mtx_unlock(&zt_lk);
}
@@ -2047,11 +2051,10 @@ zt_pipe_ping_cb(void *arg)
// use the the timer to wake us up even if we aren't
// going to send a ping. (We don't increment the try count
// unless we actually do send one though.)
- if (nni_aio_start(aio, zt_pipe_cancel_ping, p) == 0) {
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_schedule(aio, zt_pipe_cancel_ping, p);
p->zp_ping_active = 1;
if (now > (p->zp_last_recv + p->zp_ping_time)) {
- // We have to send a ping to keep the session
- // up.
p->zp_ping_try++;
zt_pipe_send_ping(p);
}
@@ -2069,6 +2072,9 @@ zt_pipe_start(void *arg, nni_aio *aio)
{
zt_pipe *p = arg;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&zt_lk);
p->zp_ping_active = 0;
// send a gratuitous ping, and start the ping interval timer.
@@ -2077,8 +2083,9 @@ zt_pipe_start(void *arg, nni_aio *aio)
(p->zp_ping_aio != NULL)) {
p->zp_ping_try = 0;
nni_aio_set_timeout(aio, p->zp_ping_time);
- if (nni_aio_start(p->zp_ping_aio, zt_pipe_cancel_ping, p) ==
- 0) {
+ if (nni_aio_begin(p->zp_ping_aio) == 0) {
+ nni_aio_schedule(
+ p->zp_ping_aio, zt_pipe_cancel_ping, p);
p->zp_ping_active = 1;
zt_pipe_send_ping(p);
}
@@ -2402,11 +2409,13 @@ zt_ep_accept(void *arg, nni_aio *aio)
{
zt_ep *ep = arg;
- nni_mtx_lock(&zt_lk);
- if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) {
- nni_aio_list_append(&ep->ze_aios, aio);
- zt_ep_doaccept(ep);
+ if (nni_aio_begin(aio) != 0) {
+ return;
}
+ nni_mtx_lock(&zt_lk);
+ nni_aio_schedule(aio, zt_ep_cancel, ep);
+ nni_aio_list_append(&ep->ze_aios, aio);
+ zt_ep_doaccept(ep);
nni_mtx_unlock(&zt_lk);
}
@@ -2479,7 +2488,8 @@ zt_ep_conn_req_cb(void *arg)
if (nni_list_first(&ep->ze_aios) != NULL) {
nni_aio_set_timeout(aio, ep->ze_conn_time);
- if (nni_aio_start(aio, zt_ep_conn_req_cancel, ep) == 0) {
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_schedule(aio, zt_ep_conn_req_cancel, ep);
ep->ze_creq_active = 1;
ep->ze_creq_try++;
zt_ep_send_conn_req(ep);
@@ -2493,43 +2503,41 @@ static void
zt_ep_connect(void *arg, nni_aio *aio)
{
zt_ep *ep = arg;
+ int rv;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
// We bind locally. We'll use the address later when we give
// it to the pipe, but this allows us to receive the initial
// ack back from the server. (This gives us an ephemeral
// address to work with.)
nni_mtx_lock(&zt_lk);
- if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) {
- int rv;
-
- // Clear the port so we get an ephemeral port.
- ep->ze_laddr &= ~((uint64_t) zt_port_mask);
-
- if ((rv = zt_ep_bind_locked(ep)) != 0) {
- nni_aio_finish_error(aio, rv);
- nni_mtx_unlock(&zt_lk);
- return;
- }
-
- if ((ep->ze_raddr >> 24) == 0) {
- ep->ze_raddr |= (ep->ze_ztn->zn_self << zt_port_shift);
- }
- nni_aio_list_append(&ep->ze_aios, aio);
-
- ep->ze_running = 1;
-
- nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time);
+ // Clear the port so we get an ephemeral port.
+ ep->ze_laddr &= ~((uint64_t) zt_port_mask);
- if (nni_aio_start(
- ep->ze_creq_aio, zt_ep_conn_req_cancel, ep) == 0) {
+ if ((rv = zt_ep_bind_locked(ep)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&zt_lk);
+ return;
+ }
- // Send out the first connect message; if not
- // yet attached to network message will be dropped.
- ep->ze_creq_try = 1;
- ep->ze_creq_active = 1;
- zt_ep_send_conn_req(ep);
- }
+ if ((ep->ze_raddr >> 24) == 0) {
+ ep->ze_raddr |= (ep->ze_ztn->zn_self << zt_port_shift);
+ }
+ nni_aio_list_append(&ep->ze_aios, aio);
+ ep->ze_running = 1;
+ nni_aio_schedule(aio, zt_ep_cancel, ep);
+
+ nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time);
+ if (nni_aio_begin(ep->ze_creq_aio) == 0) {
+ nni_aio_schedule(ep->ze_creq_aio, zt_ep_conn_req_cancel, ep);
+ // Send out the first connect message; if not
+ // yet attached to network message will be dropped.
+ ep->ze_creq_try = 1;
+ ep->ze_creq_active = 1;
+ zt_ep_send_conn_req(ep);
}
nni_mtx_unlock(&zt_lk);
}