aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c52
-rw-r--r--src/core/aio.h20
-rw-r--r--src/core/device.c8
-rw-r--r--src/core/endpt.c6
-rw-r--r--src/core/msgqueue.c60
-rw-r--r--src/core/msgqueue.h2
-rw-r--r--src/platform/posix/posix_epdesc.c57
-rw-r--r--src/platform/posix/posix_pipedesc.c24
-rw-r--r--src/platform/posix/posix_resolv_gai.c11
-rw-r--r--src/platform/posix/posix_udp.c20
-rw-r--r--src/platform/windows/win_iocp.c11
-rw-r--r--src/platform/windows/win_ipc.c8
-rw-r--r--src/platform/windows/win_resolv.c10
-rw-r--r--src/protocol/reqrep0/rep.c38
-rw-r--r--src/protocol/reqrep0/req.c32
-rw-r--r--src/supplemental/http/http_client.c3
-rw-r--r--src/supplemental/http/http_conn.c6
-rw-r--r--src/supplemental/http/http_public.c28
-rw-r--r--src/supplemental/http/http_server.c4
-rw-r--r--src/supplemental/tls/mbedtls/tls.c13
-rw-r--r--src/supplemental/websocket/websocket.c60
-rw-r--r--src/transport/inproc/inproc.c29
-rw-r--r--src/transport/ipc/ipc.c47
-rw-r--r--src/transport/tcp/tcp.c41
-rw-r--r--src/transport/tls/tls.c49
-rw-r--r--src/transport/ws/websocket.c28
-rw-r--r--src/transport/zerotier/zerotier.c118
27 files changed, 423 insertions, 362 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index fac62f12..388e6677 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -49,9 +49,9 @@ static nni_list nni_aio_expire_aios;
//
// In order to guard against aio reuse during teardown, we set a fini
// flag. Any attempt to initialize for a new operation after that point
-// will fail and the caller will get NNG_ESTATE indicating this. The
-// provider that calls nni_aio_start() MUST check the return value, and
-// if it comes back nonzero (NNG_ESTATE) then it must simply discard the
+// will fail and the caller will get NNG_ECANCELED indicating this. The
+// provider that calls nni_aio_begin() MUST check the return value, and
+// if it comes back nonzero (NNG_ECANCELED) then it must simply discard the
// request and return.
// An nni_aio is an async I/O handle.
@@ -184,7 +184,7 @@ nni_aio_fini_cb(nni_aio *aio)
// nni_aio_stop cancels any oustanding operation, and waits for the
// callback to complete, if still running. It also marks the AIO as
-// stopped, preventing further calls to nni_aio_start from succeeding.
+// stopped, preventing further calls to nni_aio_begin from succeeding.
// To correctly tear down an AIO, call stop, and make sure any other
// calles are not also stopped, before calling nni_aio_fini to release
// actual memory.
@@ -298,14 +298,11 @@ nni_aio_wait(nni_aio *aio)
}
int
-nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
+nni_aio_begin(nni_aio *aio)
{
- nni_time now = nni_clock();
-
nni_mtx_lock(&nni_aio_lk);
-
+ // We should not reschedule anything at this point.
if (aio->a_fini) {
- // We should not reschedule anything at this point.
aio->a_active = false;
aio->a_result = NNG_ECANCELED;
nni_mtx_unlock(&nni_aio_lk);
@@ -315,34 +312,52 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
aio->a_pend = false;
aio->a_result = 0;
aio->a_count = 0;
- aio->a_prov_cancel = cancelfn;
- aio->a_prov_data = data;
+ aio->a_prov_cancel = NULL;
+ aio->a_prov_data = NULL;
aio->a_active = true;
-
for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
+ nni_mtx_unlock(&nni_aio_lk);
+ return (0);
+}
+void
+nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
+{
if (!aio->a_sleep) {
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
case NNG_DURATION_ZERO:
- aio->a_expire = NNI_TIME_ZERO;
+ aio->a_expire = nni_clock();
break;
case NNG_DURATION_INFINITE:
case NNG_DURATION_DEFAULT:
aio->a_expire = NNI_TIME_NEVER;
break;
default:
- aio->a_expire = now + aio->a_timeout;
+ aio->a_expire = nni_clock() + aio->a_timeout;
break;
}
}
+ nni_mtx_lock(&nni_aio_lk);
+ aio->a_prov_cancel = cancelfn;
+ aio->a_prov_data = data;
if (aio->a_expire != NNI_TIME_NEVER) {
nni_aio_expire_add(aio);
}
nni_mtx_unlock(&nni_aio_lk);
+}
+
+int
+nni_aio_schedule_verify(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
+{
+
+ if ((!aio->a_sleep) && (aio->a_timeout == NNG_DURATION_ZERO)) {
+ return (NNG_ETIMEDOUT);
+ }
+ nni_aio_schedule(aio, cancelfn, data);
return (0);
}
@@ -651,6 +666,9 @@ nni_aio_iov_advance(nni_aio *aio, size_t n)
void
nni_sleep_aio(nng_duration ms, nng_aio *aio)
{
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
switch (aio->a_timeout) {
case NNG_DURATION_DEFAULT:
case NNG_DURATION_INFINITE:
@@ -661,13 +679,15 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
// then let it still wake up early, but with NNG_ETIMEDOUT.
if (ms > aio->a_timeout) {
aio->a_sleep = false;
- (void) nni_aio_start(aio, NULL, NULL);
+ (void) nni_aio_schedule(aio, NULL, NULL);
return;
}
}
aio->a_sleep = true;
aio->a_expire = nni_clock() + ms;
- (void) nni_aio_start(aio, NULL, NULL);
+
+ // There is no cancellation, apart from just unexpiring.
+ nni_aio_schedule(aio, NULL, NULL);
}
void
diff --git a/src/core/aio.h b/src/core/aio.h
index a0f4934f..d23ddf5b 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -123,7 +123,14 @@ extern void nni_aio_finish_msg(nni_aio *, nni_msg *);
// with the indicated result (NNG_ECLOSED or NNG_ECANCELED is recommended.)
extern void nni_aio_abort(nni_aio *, int rv);
-extern int nni_aio_start(nni_aio *, nni_aio_cancelfn, void *);
+// nni_aio_begin is called by a provider to indicate it is starting the
+// operation, and to check that the aio has not already been marked for
+// teardown. It returns 0 on success, or NNG_ECANCELED if the aio is being
+// torn down. (In that case, no operation should be aborted without any
+// call to any other functions on this AIO, most especially not the
+// nng_aio_finish family of functions.)
+extern int nni_aio_begin(nni_aio *);
+
extern void *nni_aio_get_prov_data(nni_aio *);
extern void nni_aio_set_prov_data(nni_aio *, void *);
extern void *nni_aio_get_prov_extra(nni_aio *, unsigned);
@@ -143,6 +150,17 @@ extern void nni_aio_get_iov(nni_aio *, unsigned *, nni_iov **);
extern void nni_aio_normalize_timeout(nni_aio *, nng_duration);
extern void nni_aio_bump_count(nni_aio *, size_t);
+// nni_aio_schedule indicates that the AIO has begun, and is scheduled for
+// asychronous completion. This also starts the expiration timer. Note that
+// prior to this, the aio is uncancellable.
+extern void nni_aio_schedule(nni_aio *, nni_aio_cancelfn, void *);
+
+// nni_aio_schedule_verify is like nni_aio_schedule, except that if the
+// operation has been run with a zero time (NNG_FLAG_NONBLOCK), then it
+// returns NNG_ETIMEDOUT. This is done to permit bypassing scheduling
+// if the operation could not be immediately completed.
+extern int nni_aio_schedule_verify(nni_aio *, nni_aio_cancelfn, void *);
+
extern void nni_sleep_aio(nni_duration, nni_aio *);
extern int nni_aio_sys_init(void);
diff --git a/src/core/device.c b/src/core/device.c
index 6def9f64..e3b1d220 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -188,12 +188,12 @@ nni_device_start(nni_device_data *dd, nni_aio *user)
{
int i;
- nni_mtx_lock(&dd->mtx);
- dd->user = user;
- if (nni_aio_start(user, nni_device_cancel, dd) != 0) {
- nni_mtx_unlock(&dd->mtx);
+ if (nni_aio_begin(user) != 0) {
return;
}
+ nni_mtx_lock(&dd->mtx);
+ nni_aio_schedule(user, nni_device_cancel, dd);
+ dd->user = user;
for (i = 0; i < dd->npath; i++) {
nni_device_path *p = &dd->paths[i];
p->user = user;
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 0cb59a14..2741a8e6 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -313,7 +313,7 @@ nni_ep_tmo_start(nni_ep *ep)
{
nni_duration backoff;
- if (ep->ep_closing) {
+ if (ep->ep_closing || (nni_aio_begin(ep->ep_tmo_aio) != 0)) {
return;
}
backoff = ep->ep_currtime;
@@ -334,9 +334,7 @@ nni_ep_tmo_start(nni_ep *ep)
ep->ep_tmo_aio, (backoff ? nni_random() % backoff : 0));
ep->ep_tmo_run = 1;
- if (nni_aio_start(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) {
- ep->ep_tmo_run = 0;
- }
+ nni_aio_schedule(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep);
}
static void
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 212b52c2..1bb5a762 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -22,10 +22,10 @@ struct nni_msgq {
int mq_len;
int mq_get;
int mq_put;
- int mq_closed;
int mq_puterr;
int mq_geterr;
- int mq_besteffort;
+ bool mq_besteffort;
+ bool mq_closed;
nni_msg **mq_msgs;
nni_list mq_aio_putq;
@@ -117,6 +117,10 @@ nni_msgq_set_get_error(nni_msgq *mq, int error)
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ // If we were closed, then this error trumps all others.
+ error = NNG_ECLOSED;
+ }
if (error != 0) {
while ((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
nni_aio_list_remove(aio);
@@ -134,6 +138,10 @@ nni_msgq_set_put_error(nni_msgq *mq, int error)
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ // If we were closed, then this error trumps all others.
+ error = NNG_ECLOSED;
+ }
if (error != 0) {
while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
nni_aio_list_remove(aio);
@@ -151,6 +159,10 @@ nni_msgq_set_error(nni_msgq *mq, int error)
// Let all pending blockers know we are closing the queue.
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ // If we were closed, then this error trumps all others.
+ error = NNG_ECLOSED;
+ }
if (error != 0) {
while (((aio = nni_list_first(&mq->mq_aio_getq)) != NULL) ||
((aio = nni_list_first(&mq->mq_aio_putq)) != NULL)) {
@@ -231,7 +243,7 @@ nni_msgq_run_putq(nni_msgq *mq)
}
void
-nni_msgq_set_best_effort(nni_msgq *mq, int on)
+nni_msgq_set_best_effort(nni_msgq *mq, bool on)
{
nni_mtx_lock(&mq->mq_lock);
mq->mq_besteffort = on;
@@ -325,22 +337,28 @@ nni_msgq_cancel(nni_aio *aio, int rv)
void
nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
{
- nni_mtx_lock(&mq->mq_lock);
- if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return;
- }
- if (mq->mq_closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
- nni_mtx_unlock(&mq->mq_lock);
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&mq->mq_lock);
if (mq->mq_puterr) {
nni_aio_finish_error(aio, mq->mq_puterr);
nni_mtx_unlock(&mq->mq_lock);
return;
}
+ // If this is an instantaneous poll operation, and the queue has
+ // no room, nobody is waiting to receive, and we're not best effort
+ // (best effort discards), then report the error (NNG_ETIMEDOUT).
+ rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+ if ((rv != 0) && (mq->mq_len >= mq->mq_cap) &&
+ (nni_list_empty(&mq->mq_aio_getq)) && (!mq->mq_besteffort)) {
+ nni_mtx_unlock(&mq->mq_lock);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
nni_aio_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
@@ -351,19 +369,22 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
void
nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
{
- nni_mtx_lock(&mq->mq_lock);
- if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
return;
}
- if (mq->mq_closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_geterr) {
nni_mtx_unlock(&mq->mq_lock);
+ nni_aio_finish_error(aio, mq->mq_geterr);
return;
}
- if (mq->mq_geterr) {
- nni_aio_finish_error(aio, mq->mq_geterr);
+ rv = nni_aio_schedule_verify(aio, nni_msgq_cancel, mq);
+ if ((rv != 0) && (mq->mq_len == 0) &&
+ (nni_list_empty(&mq->mq_aio_putq))) {
nni_mtx_unlock(&mq->mq_lock);
+ nni_aio_finish_error(aio, rv);
return;
}
@@ -417,7 +438,8 @@ nni_msgq_close(nni_msgq *mq)
nni_aio *aio;
nni_mtx_lock(&mq->mq_lock);
- mq->mq_closed = 1;
+ mq->mq_closed = true;
+ mq->mq_puterr = mq->mq_geterr = NNG_ECLOSED;
// Free the messages orphaned in the queue.
while (mq->mq_len > 0) {
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 2d23f448..2f1a46eb 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -63,7 +63,7 @@ extern void nni_msgq_set_get_error(nni_msgq *, int);
// What this does is treat the message queue condition as if it were
// successful, returning 0, and discarding the message. If zero is
// passed then this mode is reset to normal.
-extern void nni_msgq_set_best_effort(nni_msgq *, int);
+extern void nni_msgq_set_best_effort(nni_msgq *, bool);
// nni_msgq_filter is a callback function used to filter messages.
// The function is called on entry (put) or exit (get). The void
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 16c0d09f..196b206a 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -319,26 +319,22 @@ nni_posix_epdesc_listen(nni_posix_epdesc *ed)
void
nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
{
- int rv;
-
// Accept is simpler than the connect case. With accept we just
// need to wait for the socket to be readable to indicate an incoming
// connection is ready for us. There isn't anything else for us to
// do really, as that will have been done in listen.
- nni_mtx_lock(&ed->mtx);
- // If we can't start, it means that the AIO was stopped.
- if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
- nni_mtx_unlock(&ed->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ed->mtx);
if (ed->closed) {
- nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
nni_mtx_unlock(&ed->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
-
nni_aio_list_append(&ed->acceptq, aio);
+ nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed);
nni_posix_pollq_arm(&ed->node, POLLIN);
nni_mtx_unlock(&ed->mtx);
}
@@ -362,39 +358,33 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
int rv;
int fd;
- nni_mtx_lock(&ed->mtx);
- // If we can't start, it means that the AIO was stopped.
- if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
- nni_mtx_unlock(&ed->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ed->mtx);
- fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0);
- if (fd < 0) {
- nni_posix_epdesc_finish(aio, rv, 0);
+ if ((fd = socket(ed->remaddr.ss_family, NNI_STREAM_SOCKTYPE, 0)) < 0) {
+ rv = nni_plat_errno(errno);
nni_mtx_unlock(&ed->mtx);
+ nni_aio_finish_error(aio, rv);
return;
}
// Possibly bind.
- if (ed->loclen != 0) {
- rv = bind(fd, (void *) &ed->locaddr, ed->loclen);
- if (rv != 0) {
- (void) close(fd);
- nni_posix_epdesc_finish(aio, rv, 0);
- nni_mtx_unlock(&ed->mtx);
- return;
- }
+ if ((ed->loclen != 0) &&
+ (bind(fd, (void *) &ed->locaddr, ed->loclen) != 0)) {
+ rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&ed->mtx);
+ (void) close(fd);
+ nni_aio_finish_error(aio, rv);
+ return;
}
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
- rv = connect(fd, (void *) &ed->remaddr, ed->remlen);
-
- if (rv == 0) {
+ if ((rv = connect(fd, (void *) &ed->remaddr, ed->remlen)) == 0) {
// Immediate connect, cool! This probably only happens on
// loopback, and probably not on every platform.
-
nni_posix_epdesc_finish(aio, 0, fd);
nni_mtx_unlock(&ed->mtx);
return;
@@ -402,24 +392,27 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
if (errno != EINPROGRESS) {
// Some immediate failure occurred.
- if (errno == ENOENT) {
+ if (errno == ENOENT) { // For UNIX domain sockets
errno = ECONNREFUSED;
}
- (void) close(fd);
- nni_posix_epdesc_finish(aio, nni_plat_errno(errno), 0);
+ rv = nni_plat_errno(errno);
nni_mtx_unlock(&ed->mtx);
+ (void) close(fd);
+ nni_aio_finish_error(aio, rv);
return;
}
// We have to submit to the pollq, because the connection is pending.
ed->node.fd = fd;
if ((rv = nni_posix_pollq_add(&ed->node)) != 0) {
- (void) close(fd);
- nni_posix_epdesc_finish(aio, rv, 0);
+ ed->node.fd = -1;
nni_mtx_unlock(&ed->mtx);
+ (void) close(fd);
+ nni_aio_finish_error(aio, rv);
return;
}
+ nni_aio_schedule(aio, nni_posix_epdesc_cancel, ed);
nni_aio_list_append(&ed->connectq, aio);
nni_posix_pollq_arm(&ed->node, POLLOUT);
nni_mtx_unlock(&ed->mtx);
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index aebea4b8..62531f9c 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -254,20 +254,20 @@ nni_posix_pipedesc_cancel(nni_aio *aio, int rv)
void
nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
{
- int rv;
-
- nni_mtx_lock(&pd->mtx);
- if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
- nni_mtx_unlock(&pd->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&pd->mtx);
+
if (pd->closed) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
nni_mtx_unlock(&pd->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
nni_aio_list_append(&pd->readq, aio);
+ nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd);
+
// If we are only job on the list, go ahead and try to do an immediate
// transfer. This allows for faster completions in many cases. We
// also need not arm a list if it was already armed.
@@ -285,20 +285,20 @@ nni_posix_pipedesc_recv(nni_posix_pipedesc *pd, nni_aio *aio)
void
nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
{
- int rv;
-
- nni_mtx_lock(&pd->mtx);
- if ((rv = nni_aio_start(aio, nni_posix_pipedesc_cancel, pd)) != 0) {
- nni_mtx_unlock(&pd->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&pd->mtx);
+
if (pd->closed) {
- nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
nni_mtx_unlock(&pd->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
nni_aio_list_append(&pd->writeq, aio);
+ nni_aio_schedule(aio, nni_posix_pipedesc_cancel, pd);
+
if (nni_list_first(&pd->writeq) == aio) {
nni_posix_pipedesc_dowrite(pd);
// If we are still the first thing on the list, that means we
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index f3ac7a19..d86a2008 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -206,9 +206,11 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive,
int family, int proto, nni_aio *aio)
{
nni_posix_resolv_item *item;
- int rv;
sa_family_t fam;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
switch (family) {
case NNG_AF_INET:
fam = AF_INET;
@@ -241,12 +243,7 @@ nni_posix_resolv_ip(const char *host, const char *serv, int passive,
item->family = fam;
nni_mtx_lock(&nni_posix_resolv_mtx);
- // If we were stopped, we're done...
- if ((rv = nni_aio_start(aio, nni_posix_resolv_cancel, item)) != 0) {
- nni_mtx_unlock(&nni_posix_resolv_mtx);
- NNI_FREE_STRUCT(item);
- return;
- }
+ nni_aio_schedule(aio, nni_posix_resolv_cancel, item);
nni_task_dispatch(&item->task);
nni_mtx_unlock(&nni_posix_resolv_mtx);
}
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index 8a402acc..b414fa95 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -287,22 +287,26 @@ nni_plat_udp_cancel(nni_aio *aio, int rv)
void
nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio)
{
- nni_mtx_lock(&udp->udp_mtx);
- if (nni_aio_start(aio, nni_plat_udp_cancel, udp) == 0) {
- nni_list_append(&udp->udp_recvq, aio);
- nni_posix_pollq_arm(&udp->udp_pitem, POLLIN);
+ if (nni_aio_begin(aio) != 0) {
+ return;
}
+ nni_mtx_lock(&udp->udp_mtx);
+ nni_aio_schedule(aio, nni_plat_udp_cancel, udp);
+ nni_list_append(&udp->udp_recvq, aio);
+ nni_posix_pollq_arm(&udp->udp_pitem, POLLIN);
nni_mtx_unlock(&udp->udp_mtx);
}
void
nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio)
{
- nni_mtx_lock(&udp->udp_mtx);
- if (nni_aio_start(aio, nni_plat_udp_cancel, udp) == 0) {
- nni_list_append(&udp->udp_sendq, aio);
- nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT);
+ if (nni_aio_begin(aio) != 0) {
+ return;
}
+ nni_mtx_lock(&udp->udp_mtx);
+ nni_aio_schedule(aio, nni_plat_udp_cancel, udp);
+ nni_list_append(&udp->udp_sendq, aio);
+ nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT);
nni_mtx_unlock(&udp->udp_mtx);
}
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c
index 1189433a..a3ae3748 100644
--- a/src/platform/windows/win_iocp.c
+++ b/src/platform/windows/win_iocp.c
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -155,12 +155,11 @@ nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio)
void
nni_win_event_submit(nni_win_event *evt, nni_aio *aio)
{
- nni_mtx_lock(&evt->mtx);
- if (nni_aio_start(aio, nni_win_event_cancel, evt) != 0) {
- // the aio was aborted
- nni_mtx_unlock(&evt->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&evt->mtx);
+ nni_aio_schedule(aio, nni_win_event_cancel, evt);
nni_aio_list_append(&evt->aios, aio);
nni_win_event_start(evt);
nni_mtx_unlock(&evt->mtx);
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index 5843917c..d372f639 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -491,16 +491,16 @@ nni_plat_ipc_ep_connect(nni_plat_ipc_ep *ep, nni_aio *aio)
{
nni_win_ipc_conn_work *w = &nni_win_ipc_connecter;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&w->mtx);
NNI_ASSERT(!nni_list_active(&w->waiters, ep));
- if (nni_aio_start(aio, nni_win_ipc_conn_cancel, ep) != 0) {
- nni_mtx_unlock(&w->mtx);
- return;
- }
ep->con_aio = aio;
nni_list_append(&w->waiters, ep);
+ nni_aio_schedule(aio, nni_win_ipc_conn_cancel, ep);
nni_cv_wake(&w->cv);
nni_mtx_unlock(&w->mtx);
}
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c
index d07b4fd5..72e6b439 100644
--- a/src/platform/windows/win_resolv.c
+++ b/src/platform/windows/win_resolv.c
@@ -180,6 +180,9 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
int rv;
int fam;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
switch (family) {
case NNG_AF_INET:
fam = AF_INET;
@@ -211,12 +214,7 @@ nni_win_resolv_ip(const char *host, const char *serv, int passive, int family,
item->family = fam;
nni_mtx_lock(&nni_win_resolv_mtx);
- // If we were stopped, we're done...
- if ((rv = nni_aio_start(aio, nni_win_resolv_cancel, item)) != 0) {
- nni_mtx_unlock(&nni_win_resolv_mtx);
- NNI_FREE_STRUCT(item);
- return;
- }
+ nni_aio_schedule(aio, nni_win_resolv_cancel, item);
nni_task_dispatch(&item->task);
nni_mtx_unlock(&nni_win_resolv_mtx);
}
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index e18675ee..afc52629 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -170,6 +170,10 @@ rep0_ctx_send(void *arg, nni_aio *aio)
msg = nni_aio_get_msg(aio);
nni_msg_header_clear(msg);
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
nni_mtx_lock(&s->lk);
len = ctx->btrace_len;
p_id = ctx->pipe_id;
@@ -187,10 +191,6 @@ rep0_ctx_send(void *arg, nni_aio *aio)
nni_pollable_clear(s->sendable);
}
- if (nni_aio_start(aio, rep0_ctx_cancel_send, ctx) != 0) {
- nni_mtx_unlock(&s->lk);
- return;
- }
if (ctx->closed) {
nni_mtx_unlock(&s->lk);
nni_aio_finish_error(aio, NNG_ECLOSED);
@@ -201,12 +201,6 @@ rep0_ctx_send(void *arg, nni_aio *aio)
nni_aio_finish_error(aio, NNG_ESTATE);
return;
}
- if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) {
- nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
if ((rv = nni_idhash_find(s->pipes, p_id, (void **) &p)) != 0) {
// Pipe is gone. Make this look like a good send to avoid
// disrupting the state machine. We don't care if the peer
@@ -217,7 +211,18 @@ rep0_ctx_send(void *arg, nni_aio *aio)
nni_msg_free(msg);
return;
}
+ if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) {
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
if (p->busy) {
+ rv = nni_aio_schedule_verify(aio, rep0_ctx_cancel_send, ctx);
+ if (rv != 0) {
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ctx->saio = aio;
ctx->spipe = p;
nni_list_append(&p->sendq, ctx);
@@ -456,18 +461,23 @@ rep0_ctx_recv(void *arg, nni_aio *aio)
size_t len;
nni_msg * msg;
- nni_mtx_lock(&s->lk);
- if (nni_aio_start(aio, rep0_cancel_recv, ctx) != 0) {
- nni_mtx_unlock(&s->lk);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&s->lk);
if (ctx->closed) {
nni_mtx_unlock(&s->lk);
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
if ((p = nni_list_first(&s->recvpipes)) == NULL) {
- nni_pollable_clear(s->recvable);
+ int rv;
+ rv = nni_aio_schedule_verify(aio, rep0_cancel_recv, ctx);
+ if (rv != 0) {
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ctx->raio = aio;
nni_list_append(&s->recvq, ctx);
nni_mtx_unlock(&s->lk);
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 8149ce08..3ecc8604 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -617,11 +617,10 @@ req0_ctx_recv(void *arg, nni_aio *aio)
req0_sock *s = ctx->sock;
nni_msg * msg;
- nni_mtx_lock(&s->mtx);
- if (nni_aio_start(aio, req0_ctx_cancel_recv, ctx) != 0) {
- nni_mtx_unlock(&s->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&s->mtx);
if (s->closed) {
nni_mtx_unlock(&s->mtx);
nni_aio_finish_error(aio, NNG_ECLOSED);
@@ -638,6 +637,13 @@ req0_ctx_recv(void *arg, nni_aio *aio)
}
if ((msg = ctx->repmsg) == NULL) {
+ int rv;
+ rv = nni_aio_schedule_verify(aio, req0_ctx_cancel_recv, ctx);
+ if (rv != 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ctx->raio = aio;
nni_mtx_unlock(&s->mtx);
return;
@@ -697,14 +703,11 @@ req0_ctx_send(void *arg, nni_aio *aio)
uint64_t id;
int rv;
- nni_mtx_lock(&s->mtx);
- // Even though we always complete synchronously, this guards against
- // restarting a request that was stopped.
- if (nni_aio_start(aio, req0_ctx_cancel_send, ctx) != 0) {
- nni_mtx_unlock(&s->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
- // Sending a new requst cancels the old one, including any
+ nni_mtx_lock(&s->mtx);
+ // Sending a new request cancels the old one, including any
// outstanding reply.
if (ctx->raio != NULL) {
nni_aio_finish_error(ctx->raio, NNG_ECANCELED);
@@ -735,6 +738,15 @@ req0_ctx_send(void *arg, nni_aio *aio)
nni_aio_finish_error(aio, rv);
return;
}
+ // If no pipes are ready, and the request was a poll (no background
+ // schedule), then fail it. Should be NNG_TIMEDOUT.
+ rv = nni_aio_schedule_verify(aio, req0_ctx_cancel_send, ctx);
+ if ((rv != 0) && (nni_list_empty(&s->readypipes))) {
+ nni_idhash_remove(s->reqids, id);
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
ctx->reqlen = nni_msg_len(msg);
ctx->reqmsg = msg;
ctx->saio = aio;
@@ -743,6 +755,8 @@ req0_ctx_send(void *arg, nni_aio *aio)
// Stick us on the sendq list.
nni_list_append(&s->sendq, ctx);
+ // Note that this will be synchronous if the readypipes list was
+ // not empty.
req0_run_sendq(s, NULL);
nni_mtx_unlock(&s->mtx);
}
diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c
index c62a3c56..2a9be4bb 100644
--- a/src/supplemental/http/http_client.c
+++ b/src/supplemental/http/http_client.c
@@ -241,10 +241,11 @@ http_connect_cancel(nni_aio *aio, int rv)
void
nni_http_client_connect(nni_http_client *c, nni_aio *aio)
{
- if (nni_aio_start(aio, http_connect_cancel, aio) != 0) {
+ if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&c->mtx);
+ nni_aio_schedule(aio, http_connect_cancel, aio);
nni_list_append(&c->aios, aio);
if (nni_list_first(&c->aios) == aio) {
http_conn_start(c);
diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c
index d37c9a2f..a3039d0d 100644
--- a/src/supplemental/http/http_conn.c
+++ b/src/supplemental/http/http_conn.c
@@ -360,13 +360,14 @@ http_rd_cancel(nni_aio *aio, int rv)
static void
http_rd_submit(nni_http_conn *conn, nni_aio *aio)
{
- if (nni_aio_start(aio, http_rd_cancel, conn) != 0) {
+ if (nni_aio_begin(aio) != 0) {
return;
}
if (conn->closed) {
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
+ nni_aio_schedule(aio, http_rd_cancel, conn);
nni_list_append(&conn->rdq, aio);
if (conn->rd_uaio == NULL) {
http_rd_start(conn);
@@ -473,7 +474,7 @@ http_wr_cancel(nni_aio *aio, int rv)
static void
http_wr_submit(nni_http_conn *conn, nni_aio *aio)
{
- if (nni_aio_start(aio, http_wr_cancel, conn) != 0) {
+ if (nni_aio_begin(aio) != 0) {
return;
}
if (conn->closed) {
@@ -481,6 +482,7 @@ http_wr_submit(nni_http_conn *conn, nni_aio *aio)
return;
}
nni_list_append(&conn->wrq, aio);
+ nni_aio_schedule(aio, http_wr_cancel, conn);
if (conn->wr_uaio == NULL) {
http_wr_start(conn);
}
diff --git a/src/supplemental/http/http_public.c b/src/supplemental/http/http_public.c
index 7107c029..7aae45b2 100644
--- a/src/supplemental/http/http_public.c
+++ b/src/supplemental/http/http_public.c
@@ -9,9 +9,9 @@
//
#include "core/nng_impl.h"
-#include "supplemental/tls/tls.h"
#include "http.h"
#include "http_api.h"
+#include "supplemental/tls/tls.h"
// Symbols in this file are "public" versions of the HTTP API.
// These are suitable for exposure to applications.
@@ -382,7 +382,7 @@ nng_http_conn_read(nng_http_conn *conn, nng_aio *aio)
nni_http_read(conn, aio);
#else
NNI_ARG_UNUSED(conn);
- if (nni_aio_start(aio, NULL, NULL)) {
+ if (nni_aio_begin(aio) == 0) {
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
#endif
@@ -395,7 +395,7 @@ nng_http_conn_read_all(nng_http_conn *conn, nng_aio *aio)
nni_http_read_full(conn, aio);
#else
NNI_ARG_UNUSED(conn);
- if (nni_aio_start(aio, NULL, NULL)) {
+ if (nni_aio_begin(aio) == 0) {
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
#endif
@@ -408,7 +408,7 @@ nng_http_conn_write(nng_http_conn *conn, nng_aio *aio)
nni_http_write(conn, aio);
#else
NNI_ARG_UNUSED(conn);
- if (nni_aio_start(aio, NULL, NULL)) {
+ if (nni_aio_begin(aio) == 0) {
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
#endif
@@ -433,7 +433,7 @@ nng_http_conn_write_req(nng_http_conn *conn, nng_http_req *req, nng_aio *aio)
#else
NNI_ARG_UNUSED(conn);
NNI_ARG_UNUSED(req);
- if (nni_aio_start(aio, NULL, NULL)) {
+ if (nni_aio_begin(aio) == 0) {
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
#endif
@@ -447,7 +447,7 @@ nng_http_conn_write_res(nng_http_conn *conn, nng_http_res *res, nng_aio *aio)
#else
NNI_ARG_UNUSED(conn);
NNI_ARG_UNUSED(res);
- if (nni_aio_start(aio, NULL, NULL)) {
+ if (nni_aio_begin(aio) == 0) {
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
#endif
@@ -461,7 +461,7 @@ nng_http_conn_read_req(nng_http_conn *conn, nng_http_req *req, nng_aio *aio)
#else
NNI_ARG_UNUSED(conn);
NNI_ARG_UNUSED(req);
- if (nni_aio_start(aio, NULL, NULL)) {
+ if (nni_aio_begin(aio) == 0) {
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
#endif
@@ -475,7 +475,7 @@ nng_http_conn_read_res(nng_http_conn *conn, nng_http_res *res, nng_aio *aio)
#else
NNI_ARG_UNUSED(conn);
NNI_ARG_UNUSED(res);
- if (nni_aio_start(aio, NULL, NULL)) {
+ if (nni_aio_begin(aio) == 0) {
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
#endif
@@ -574,7 +574,8 @@ nng_http_handler_set_host(nng_http_handler *h, const char *host)
#endif
}
-int nng_http_handler_set_tree(nng_http_handler *h)
+int
+nng_http_handler_set_tree(nng_http_handler *h)
{
#ifdef NNG_SUPP_HTTP
return (nni_http_handler_set_tree(h));
@@ -698,12 +699,13 @@ nng_http_server_get_tls(nng_http_server *srv, nng_tls_config **cfgp)
#endif
}
-int nng_http_hijack(nng_http_conn * conn)
+int
+nng_http_hijack(nng_http_conn *conn)
{
#ifdef NNG_SUPP_HTTP
- return (nni_http_hijack(conn));
+ return (nni_http_hijack(conn));
#else
- NNI_ARG_UNUSED(conn);
+ NNI_ARG_UNUSED(conn);
return (NNG_ENOTSUP);
#endif
}
@@ -762,7 +764,7 @@ nng_http_client_connect(nng_http_client *cli, nng_aio *aio)
nni_http_client_connect(cli, aio);
#else
NNI_ARG_UNUSED(cli);
- if (nni_aio_start(aio, NULL, NULL)) {
+ if (nni_aio_begin(aio) == 0) {
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
#endif
diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c
index 7feadc96..c92de586 100644
--- a/src/supplemental/http/http_server.c
+++ b/src/supplemental/http/http_server.c
@@ -590,8 +590,8 @@ http_sconn_rxdone(void *arg)
// Technically, probably callback should initialize this with
// start, but we do it instead.
-
- if (nni_aio_start(sc->cbaio, NULL, NULL) != 0) {
+ // This operation cannot at present canceled or timed out.
+ if (nni_aio_begin(sc->cbaio) != 0) {
nni_mtx_unlock(&s->mtx);
return;
}
diff --git a/src/supplemental/tls/mbedtls/tls.c b/src/supplemental/tls/mbedtls/tls.c
index 8faf4e46..c2a0fb52 100644
--- a/src/supplemental/tls/mbedtls/tls.c
+++ b/src/supplemental/tls/mbedtls/tls.c
@@ -13,7 +13,6 @@
#include <stdlib.h>
#include <string.h>
-
#include "mbedtls/version.h" // Must be first in order to pick up version
#include "mbedtls/error.h"
@@ -562,16 +561,16 @@ nni_tls_net_recv(void *ctx, unsigned char *buf, size_t len)
void
nni_tls_send(nni_tls *tp, nni_aio *aio)
{
- nni_mtx_lock(&tp->lk);
- if (nni_aio_start(aio, nni_tls_cancel, tp) != 0) {
- nni_mtx_unlock(&tp->lk);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&tp->lk);
if (tp->tls_closed) {
nni_mtx_unlock(&tp->lk);
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
+ nni_aio_schedule(aio, nni_tls_cancel, tp);
nni_list_append(&tp->sends, aio);
nni_tls_do_send(tp);
nni_mtx_unlock(&tp->lk);
@@ -580,16 +579,16 @@ nni_tls_send(nni_tls *tp, nni_aio *aio)
void
nni_tls_recv(nni_tls *tp, nni_aio *aio)
{
- nni_mtx_lock(&tp->lk);
- if (nni_aio_start(aio, nni_tls_cancel, tp) != 0) {
- nni_mtx_unlock(&tp->lk);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&tp->lk);
if (tp->tls_closed) {
nni_mtx_unlock(&tp->lk);
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
+ nni_aio_schedule(aio, nni_tls_cancel, tp);
nni_list_append(&tp->recvs, aio);
nni_tls_do_recv(tp);
nni_mtx_unlock(&tp->lk);
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index 7bd698bd..74b56437 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -655,9 +655,7 @@ ws_send_close(nni_ws *ws, uint16_t code)
ws->closed = true;
aio = ws->closeaio;
- // We don't care about cancellation here. If this times out,
- // we will still shut all the physical I/O down in the callback.
- if (nni_aio_start(aio, ws_cancel_close, ws) != 0) {
+ if (nni_aio_begin(aio) != 0) {
return;
}
ws->wclose = true;
@@ -669,6 +667,7 @@ ws_send_close(nni_ws *ws, uint16_t code)
}
// Close frames get priority!
nni_list_prepend(&ws->txmsgs, wm);
+ nni_aio_schedule(aio, ws_cancel_close, ws);
ws_start_write(ws);
}
@@ -722,26 +721,22 @@ nni_ws_send_msg(nni_ws *ws, nni_aio *aio)
msg = nni_aio_get_msg(aio);
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
if ((rv = ws_msg_init_tx(&wm, ws, msg, aio)) != 0) {
- if (nni_aio_start(aio, NULL, NULL) == 0) {
- nni_aio_finish_error(aio, rv);
- }
+ nni_aio_finish_error(aio, rv);
return;
}
nni_mtx_lock(&ws->mtx);
-
- if (nni_aio_start(aio, ws_write_cancel, ws) != 0) {
- nni_mtx_unlock(&ws->mtx);
- ws_msg_fini(wm);
- return;
- }
if (ws->closed) {
nni_mtx_unlock(&ws->mtx);
nni_aio_finish_error(aio, NNG_ECLOSED);
ws_msg_fini(wm);
return;
}
+ nni_aio_schedule(aio, ws_write_cancel, ws);
nni_aio_set_prov_extra(aio, 0, wm);
nni_list_append(&ws->sendq, aio);
nni_list_append(&ws->txmsgs, wm);
@@ -1056,19 +1051,20 @@ nni_ws_recv_msg(nni_ws *ws, nni_aio *aio)
ws_msg *wm;
int rv;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
if ((rv = ws_msg_init_rx(&wm, ws, aio)) != 0) {
- if (nni_aio_start(aio, NULL, NULL)) {
- nni_aio_finish_error(aio, rv);
- }
+ nni_aio_finish_error(aio, rv);
return;
}
nni_mtx_lock(&ws->mtx);
- if (nni_aio_start(aio, ws_read_cancel, ws) == 0) {
- nni_aio_set_prov_extra(aio, 0, wm);
- nni_list_append(&ws->recvq, aio);
- nni_list_append(&ws->rxmsgs, wm);
- ws_start_read(ws);
- }
+ nni_aio_schedule(aio, ws_read_cancel, ws);
+ nni_aio_set_prov_extra(aio, 0, wm);
+ nni_list_append(&ws->recvq, aio);
+ nni_list_append(&ws->rxmsgs, wm);
+ ws_start_read(ws);
+
nni_mtx_unlock(&ws->mtx);
}
@@ -1651,11 +1647,10 @@ nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio)
{
nni_ws *ws;
- nni_mtx_lock(&l->mtx);
- if (nni_aio_start(aio, ws_accept_cancel, l) != 0) {
- nni_mtx_unlock(&l->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&l->mtx);
if (l->closed) {
nni_aio_finish_error(aio, NNG_ECLOSED);
nni_mtx_unlock(&l->mtx);
@@ -1668,11 +1663,13 @@ nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio)
}
if ((ws = nni_list_first(&l->pend)) != NULL) {
nni_list_remove(&l->pend, ws);
+ nni_mtx_unlock(&l->mtx);
nni_aio_set_output(aio, 0, ws);
nni_aio_finish(aio, 0, 0);
- } else {
- nni_list_append(&l->aios, aio);
+ return;
}
+ nni_aio_schedule(aio, ws_accept_cancel, l);
+ nni_list_append(&l->aios, aio);
nni_mtx_unlock(&l->mtx);
}
@@ -1983,19 +1980,17 @@ nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio)
nni_ws *ws;
int rv;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
if ((rv = ws_init(&ws)) != 0) {
nni_aio_finish_error(aio, rv);
return;
}
nni_mtx_lock(&d->mtx);
if (d->closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
- nni_mtx_unlock(&d->mtx);
- ws_fini(ws);
- return;
- }
- if (nni_aio_start(aio, ws_dial_cancel, ws) != 0) {
nni_mtx_unlock(&d->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
ws_fini(ws);
return;
}
@@ -2003,6 +1998,7 @@ nni_ws_dialer_dial(nni_ws_dialer *d, nni_aio *aio)
ws->useraio = aio;
ws->mode = NNI_EP_MODE_DIAL;
nni_list_append(&d->wspend, ws);
+ nni_aio_schedule(aio, ws_dial_cancel, ws);
nni_http_client_connect(d->client, ws->connaio);
nni_mtx_unlock(&d->mtx);
}
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 82f46fc4..8bfb097e 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -349,19 +349,16 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
{
nni_inproc_ep *ep = arg;
nni_inproc_ep *server;
- int rv;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
if (ep->mode != NNI_EP_MODE_DIAL) {
nni_aio_finish_error(aio, NNG_EINVAL);
return;
}
nni_mtx_lock(&nni_inproc.mx);
- if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) {
- nni_mtx_unlock(&nni_inproc.mx);
- return;
- }
-
// Find a server.
NNI_LIST_FOREACH (&nni_inproc.servers, server) {
if (strcmp(server->addr, ep->addr) == 0) {
@@ -369,11 +366,17 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
}
}
if (server == NULL) {
- nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL);
+ // nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL);
nni_mtx_unlock(&nni_inproc.mx);
+ nni_aio_finish_error(aio, NNG_ECONNREFUSED);
return;
}
+ // We don't have to worry about the case where a zero timeout
+ // on connect was specified, as there is no option to specify
+ // that in the upper API.
+ nni_aio_schedule(aio, nni_inproc_ep_cancel, ep);
+
nni_list_append(&server->clients, ep);
nni_aio_list_append(&ep->aios, aio);
@@ -404,15 +407,17 @@ static void
nni_inproc_ep_accept(void *arg, nni_aio *aio)
{
nni_inproc_ep *ep = arg;
- int rv;
- nni_mtx_lock(&nni_inproc.mx);
-
- if ((rv = nni_aio_start(aio, nni_inproc_ep_cancel, ep)) != 0) {
- nni_mtx_unlock(&nni_inproc.mx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&nni_inproc.mx);
+
+ // We need not worry about the case where a non-blocking
+ // accept was tried -- there is no API to do such a thing.
+ nni_aio_schedule(aio, nni_inproc_ep_cancel, ep);
+
// We are already on the master list of servers, thanks to bind.
// Insert us into pending server aios, and then run accept list.
nni_aio_list_append(&ep->aios, aio);
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index ecc9a962..61b89f20 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -418,11 +418,11 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
- nni_mtx_lock(&pipe->mtx);
- if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) {
- nni_mtx_unlock(&pipe->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&pipe->mtx);
+ nni_aio_schedule(aio, nni_ipc_cancel_tx, pipe);
nni_list_append(&pipe->sendq, aio);
if (nni_list_first(&pipe->sendq) == aio) {
nni_ipc_pipe_dosend(pipe, aio);
@@ -474,12 +474,13 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
- nni_mtx_lock(&pipe->mtx);
-
- if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) {
- nni_mtx_unlock(&pipe->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&pipe->mtx);
+
+ // Transports never have a zero length timeout.
+ (void) nni_aio_schedule(aio, nni_ipc_cancel_rx, pipe);
nni_list_append(&pipe->recvq, aio);
if (nni_list_first(&pipe->recvq) == aio) {
@@ -492,10 +493,12 @@ static void
nni_ipc_pipe_start(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
- int rv;
nni_aio * negaio;
nni_iov iov;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&pipe->mtx);
pipe->txhead[0] = 0;
pipe->txhead[1] = 'S';
@@ -513,11 +516,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &pipe->txhead[0];
nni_aio_set_iov(negaio, 1, &iov);
- rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe);
- if (rv != 0) {
- nni_mtx_unlock(&pipe->mtx);
- return;
- }
+ nni_aio_schedule(aio, nni_ipc_cancel_start, pipe);
nni_plat_ipc_pipe_send(pipe->ipp, negaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -680,16 +679,14 @@ static void
nni_ipc_ep_accept(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
- int rv;
-
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
- if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_ipc_cancel_ep, ep);
ep->user_aio = aio;
nni_plat_ipc_ep_accept(ep->iep, ep->aio);
@@ -700,18 +697,14 @@ static void
nni_ipc_ep_connect(void *arg, nni_aio *aio)
{
nni_ipc_ep *ep = arg;
- int rv;
-
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
- // If we can't start, then its dying and we can't report
- // either.
- if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_ipc_cancel_ep, ep);
ep->user_aio = aio;
nni_plat_ipc_ep_connect(ep->iep, ep->aio);
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 7ea77035..a551c5be 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -398,11 +398,11 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
- if (nni_aio_start(aio, nni_tcp_cancel_tx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, nni_tcp_cancel_tx, p);
nni_list_append(&p->sendq, aio);
if (nni_list_first(&p->sendq) == aio) {
nni_tcp_pipe_dosend(p, aio);
@@ -454,11 +454,11 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
- if (nni_aio_start(aio, nni_tcp_cancel_rx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, nni_tcp_cancel_rx, p);
nni_list_append(&p->recvq, aio);
if (nni_list_first(&p->recvq) == aio) {
nni_tcp_pipe_dorecv(p);
@@ -510,6 +510,9 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
nni_aio * negaio;
nni_iov iov;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&p->mtx);
p->txlen[0] = 0;
p->txlen[1] = 'S';
@@ -527,10 +530,7 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(negaio, 1, &iov);
- if (nni_aio_start(aio, nni_tcp_cancel_nego, p) != 0) {
- nni_mtx_unlock(&p->mtx);
- return;
- }
+ nni_aio_schedule(aio, nni_tcp_cancel_nego, p);
nni_plat_tcp_pipe_send(p->tpp, negaio);
nni_mtx_unlock(&p->mtx);
}
@@ -721,16 +721,14 @@ static void
nni_tcp_ep_accept(void *arg, nni_aio *aio)
{
nni_tcp_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
-
- if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_tcp_cancel_ep, ep);
ep->user_aio = aio;
nni_plat_tcp_ep_accept(ep->tep, ep->aio);
@@ -741,17 +739,14 @@ static void
nni_tcp_ep_connect(void *arg, nni_aio *aio)
{
nni_tcp_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
-
- // If we can't start, then its dying and we can't report either.
- if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_tcp_cancel_ep, ep);
ep->user_aio = aio;
nni_plat_tcp_ep_connect(ep->tep, ep->aio);
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index a852ddd8..2ea14e93 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -405,13 +405,11 @@ nni_tls_pipe_send(void *arg, nni_aio *aio)
{
nni_tls_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
-
- if (nni_aio_start(aio, nni_tls_cancel_tx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
-
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, nni_tls_cancel_tx, p);
nni_list_append(&p->sendq, aio);
if (nni_list_first(&p->sendq) == aio) {
nni_tls_pipe_dosend(p, aio);
@@ -463,12 +461,12 @@ nni_tls_pipe_recv(void *arg, nni_aio *aio)
{
nni_tls_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
-
- if (nni_aio_start(aio, nni_tls_cancel_rx, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+
+ nni_aio_schedule(aio, nni_tls_cancel_rx, p);
nni_aio_list_append(&p->recvq, aio);
if (nni_list_first(&p->recvq) == aio) {
nni_tls_pipe_dorecv(p);
@@ -519,6 +517,9 @@ nni_tls_pipe_start(void *arg, nni_aio *aio)
nni_aio * negaio;
nni_iov iov;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&p->mtx);
p->txlen[0] = 0;
p->txlen[1] = 'S';
@@ -536,10 +537,7 @@ nni_tls_pipe_start(void *arg, nni_aio *aio)
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(negaio, 1, &iov);
- if (nni_aio_start(aio, nni_tls_cancel_nego, p) != 0) {
- nni_mtx_unlock(&p->mtx);
- return;
- }
+ nni_aio_schedule(aio, nni_tls_cancel_nego, p);
nni_tls_send(p->tls, negaio);
nni_mtx_unlock(&p->mtx);
}
@@ -743,18 +741,14 @@ static void
nni_tls_ep_accept(void *arg, nni_aio *aio)
{
nni_tls_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
-
- if ((rv = nni_aio_start(aio, nni_tls_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
-
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_tls_cancel_ep, ep);
ep->user_aio = aio;
-
nni_plat_tcp_ep_accept(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
@@ -763,19 +757,14 @@ static void
nni_tls_ep_connect(void *arg, nni_aio *aio)
{
nni_tls_ep *ep = arg;
- int rv;
- nni_mtx_lock(&ep->mtx);
- NNI_ASSERT(ep->user_aio == NULL);
-
- // If we can't start, then its dying and we can't report either.
- if ((rv = nni_aio_start(aio, nni_tls_cancel_ep, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
-
+ nni_mtx_lock(&ep->mtx);
+ NNI_ASSERT(ep->user_aio == NULL);
+ nni_aio_schedule(aio, nni_tls_cancel_ep, ep);
ep->user_aio = aio;
-
nni_plat_tcp_ep_connect(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
index 0542d0c7..761ba824 100644
--- a/src/transport/ws/websocket.c
+++ b/src/transport/ws/websocket.c
@@ -129,13 +129,12 @@ ws_pipe_recv(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
- if (nni_aio_start(aio, ws_pipe_recv_cancel, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, ws_pipe_recv_cancel, p);
p->user_rxaio = aio;
-
nni_ws_recv_msg(p->ws, p->rxaio);
nni_mtx_unlock(&p->mtx);
}
@@ -160,11 +159,11 @@ ws_pipe_send(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
- nni_mtx_lock(&p->mtx);
- if (nni_aio_start(aio, ws_pipe_send_cancel, p) != 0) {
- nni_mtx_unlock(&p->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&p->mtx);
+ nni_aio_schedule(aio, ws_pipe_send_cancel, p);
p->user_txaio = aio;
nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio));
nni_aio_set_msg(aio, NULL);
@@ -294,11 +293,11 @@ ws_ep_accept(void *arg, nni_aio *aio)
// We already bound, so we just need to look for an available
// pipe (created by the handler), and match it.
// Otherwise we stick the AIO in the accept list.
- nni_mtx_lock(&ep->mtx);
- if (nni_aio_start(aio, ws_ep_cancel, ep) != 0) {
- nni_mtx_unlock(&ep->mtx);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&ep->mtx);
+ nni_aio_schedule(aio, ws_ep_cancel, ep);
nni_list_append(&ep->aios, aio);
if (aio == nni_list_first(&ep->aios)) {
nni_ws_listener_accept(ep->listener, ep->accaio);
@@ -313,6 +312,9 @@ ws_ep_connect(void *arg, nni_aio *aio)
int rv = 0;
ws_hdr *h;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
if (!ep->started) {
NNI_LIST_FOREACH (&ep->headers, h) {
rv = nni_ws_dialer_header(
@@ -327,11 +329,7 @@ ws_ep_connect(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(nni_list_empty(&ep->aios));
- // If we can't start, then its dying and we can't report either.
- if ((rv = nni_aio_start(aio, ws_ep_cancel, ep)) != 0) {
- nni_mtx_unlock(&ep->mtx);
- return;
- }
+ nni_aio_schedule(aio, ws_ep_cancel, ep);
ep->started = true;
nni_list_append(&ep->aios, aio);
nni_ws_dialer_dial(ep->dialer, ep->connaio);
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index d0790b37..05866cfb 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -1745,26 +1745,24 @@ zt_pipe_send(void *arg, nni_aio *aio)
size_t bytes;
nni_msg *m;
- nni_mtx_lock(&zt_lk);
- if (nni_aio_start(aio, NULL, p) != 0) {
- nni_mtx_unlock(&zt_lk);
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ if ((m = nni_aio_get_msg(aio)) == NULL) {
+ nni_aio_finish_error(aio, NNG_EINVAL);
return;
}
+ nni_mtx_lock(&zt_lk);
+
if (p->zp_closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
nni_mtx_unlock(&zt_lk);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
fragsz = (uint16_t)(p->zp_mtu - zt_offset_data_data);
- if ((m = nni_aio_get_msg(aio)) == NULL) {
- nni_aio_finish_error(aio, NNG_EINVAL);
- nni_mtx_unlock(&zt_lk);
- return;
- };
-
bytes = nni_msg_header_len(m) + nni_msg_len(m);
if (bytes >= (0xfffe * fragsz)) {
nni_aio_finish_error(aio, NNG_EMSGSIZE);
@@ -1821,11 +1819,16 @@ zt_pipe_send(void *arg, nni_aio *aio)
zt_send(p->zp_ztn, p->zp_nwid, zt_op_data, p->zp_raddr,
p->zp_laddr, data, fraglen + zt_offset_data_data);
} while (nni_msg_len(m) != 0);
+ nni_mtx_unlock(&zt_lk);
+
+ // NB, We never bothered to call nn_aio_sched, because we run this
+ // synchronously, relying on UDP to simply discard messages if we
+ // cannot deliver them. This means that pipe send operations with
+ // this transport are not cancellable.
nni_aio_set_msg(aio, NULL);
nni_msg_free(m);
nni_aio_finish(aio, 0, offset);
- nni_mtx_unlock(&zt_lk);
}
static void
@@ -1903,17 +1906,18 @@ zt_pipe_recv(void *arg, nni_aio *aio)
{
zt_pipe *p = arg;
- nni_mtx_lock(&zt_lk);
- if (nni_aio_start(aio, zt_pipe_cancel_recv, p) != 0) {
- nni_mtx_unlock(&zt_lk);
+ if (nni_aio_begin(aio) != 0) {
return;
}
+ nni_mtx_lock(&zt_lk);
if (p->zp_closed) {
+ nni_mtx_unlock(&zt_lk);
nni_aio_finish_error(aio, NNG_ECLOSED);
- } else {
- p->zp_user_rxaio = aio;
- zt_pipe_dorecv(p);
+ return;
}
+ nni_aio_schedule(aio, zt_pipe_cancel_recv, p);
+ p->zp_user_rxaio = aio;
+ zt_pipe_dorecv(p);
nni_mtx_unlock(&zt_lk);
}
@@ -2047,11 +2051,10 @@ zt_pipe_ping_cb(void *arg)
// use the the timer to wake us up even if we aren't
// going to send a ping. (We don't increment the try count
// unless we actually do send one though.)
- if (nni_aio_start(aio, zt_pipe_cancel_ping, p) == 0) {
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_schedule(aio, zt_pipe_cancel_ping, p);
p->zp_ping_active = 1;
if (now > (p->zp_last_recv + p->zp_ping_time)) {
- // We have to send a ping to keep the session
- // up.
p->zp_ping_try++;
zt_pipe_send_ping(p);
}
@@ -2069,6 +2072,9 @@ zt_pipe_start(void *arg, nni_aio *aio)
{
zt_pipe *p = arg;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
nni_mtx_lock(&zt_lk);
p->zp_ping_active = 0;
// send a gratuitous ping, and start the ping interval timer.
@@ -2077,8 +2083,9 @@ zt_pipe_start(void *arg, nni_aio *aio)
(p->zp_ping_aio != NULL)) {
p->zp_ping_try = 0;
nni_aio_set_timeout(aio, p->zp_ping_time);
- if (nni_aio_start(p->zp_ping_aio, zt_pipe_cancel_ping, p) ==
- 0) {
+ if (nni_aio_begin(p->zp_ping_aio) == 0) {
+ nni_aio_schedule(
+ p->zp_ping_aio, zt_pipe_cancel_ping, p);
p->zp_ping_active = 1;
zt_pipe_send_ping(p);
}
@@ -2402,11 +2409,13 @@ zt_ep_accept(void *arg, nni_aio *aio)
{
zt_ep *ep = arg;
- nni_mtx_lock(&zt_lk);
- if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) {
- nni_aio_list_append(&ep->ze_aios, aio);
- zt_ep_doaccept(ep);
+ if (nni_aio_begin(aio) != 0) {
+ return;
}
+ nni_mtx_lock(&zt_lk);
+ nni_aio_schedule(aio, zt_ep_cancel, ep);
+ nni_aio_list_append(&ep->ze_aios, aio);
+ zt_ep_doaccept(ep);
nni_mtx_unlock(&zt_lk);
}
@@ -2479,7 +2488,8 @@ zt_ep_conn_req_cb(void *arg)
if (nni_list_first(&ep->ze_aios) != NULL) {
nni_aio_set_timeout(aio, ep->ze_conn_time);
- if (nni_aio_start(aio, zt_ep_conn_req_cancel, ep) == 0) {
+ if (nni_aio_begin(aio) == 0) {
+ nni_aio_schedule(aio, zt_ep_conn_req_cancel, ep);
ep->ze_creq_active = 1;
ep->ze_creq_try++;
zt_ep_send_conn_req(ep);
@@ -2493,43 +2503,41 @@ static void
zt_ep_connect(void *arg, nni_aio *aio)
{
zt_ep *ep = arg;
+ int rv;
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
// We bind locally. We'll use the address later when we give
// it to the pipe, but this allows us to receive the initial
// ack back from the server. (This gives us an ephemeral
// address to work with.)
nni_mtx_lock(&zt_lk);
- if (nni_aio_start(aio, zt_ep_cancel, ep) == 0) {
- int rv;
-
- // Clear the port so we get an ephemeral port.
- ep->ze_laddr &= ~((uint64_t) zt_port_mask);
-
- if ((rv = zt_ep_bind_locked(ep)) != 0) {
- nni_aio_finish_error(aio, rv);
- nni_mtx_unlock(&zt_lk);
- return;
- }
-
- if ((ep->ze_raddr >> 24) == 0) {
- ep->ze_raddr |= (ep->ze_ztn->zn_self << zt_port_shift);
- }
- nni_aio_list_append(&ep->ze_aios, aio);
-
- ep->ze_running = 1;
-
- nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time);
+ // Clear the port so we get an ephemeral port.
+ ep->ze_laddr &= ~((uint64_t) zt_port_mask);
- if (nni_aio_start(
- ep->ze_creq_aio, zt_ep_conn_req_cancel, ep) == 0) {
+ if ((rv = zt_ep_bind_locked(ep)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&zt_lk);
+ return;
+ }
- // Send out the first connect message; if not
- // yet attached to network message will be dropped.
- ep->ze_creq_try = 1;
- ep->ze_creq_active = 1;
- zt_ep_send_conn_req(ep);
- }
+ if ((ep->ze_raddr >> 24) == 0) {
+ ep->ze_raddr |= (ep->ze_ztn->zn_self << zt_port_shift);
+ }
+ nni_aio_list_append(&ep->ze_aios, aio);
+ ep->ze_running = 1;
+ nni_aio_schedule(aio, zt_ep_cancel, ep);
+
+ nni_aio_set_timeout(ep->ze_creq_aio, ep->ze_conn_time);
+ if (nni_aio_begin(ep->ze_creq_aio) == 0) {
+ nni_aio_schedule(ep->ze_creq_aio, zt_ep_conn_req_cancel, ep);
+ // Send out the first connect message; if not
+ // yet attached to network message will be dropped.
+ ep->ze_creq_try = 1;
+ ep->ze_creq_active = 1;
+ zt_ep_send_conn_req(ep);
}
nni_mtx_unlock(&zt_lk);
}