aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2020-01-08 20:34:26 -0800
committerGarrett D'Amore <garrett@damore.org>2020-01-08 21:16:30 -0800
commitb21d7805523a407a14567017edbdef57ca81781f (patch)
treee07f08bdc047ee4dfb057b670766e3de5bf2f981 /src
parent8479b4c8861c77cfd9eb64e724615605bdd1cbcb (diff)
downloadnng-b21d7805523a407a14567017edbdef57ca81781f.tar.gz
nng-b21d7805523a407a14567017edbdef57ca81781f.tar.bz2
nng-b21d7805523a407a14567017edbdef57ca81781f.zip
fixes #1094 Consider in-lining task and aio
This only does it for rep, but it also has changes that should increase the overall test coverage for the REP protocol
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c134
-rw-r--r--src/core/aio.h74
-rw-r--r--src/core/device.c10
-rw-r--r--src/core/dialer.c12
-rw-r--r--src/core/listener.c8
-rw-r--r--src/nng.c4
-rw-r--r--src/protocol/bus0/bus.c22
-rw-r--r--src/protocol/pair0/pair.c16
-rw-r--r--src/protocol/pair1/pair.c20
-rw-r--r--src/protocol/pipeline0/pull.c8
-rw-r--r--src/protocol/pipeline0/push.c12
-rw-r--r--src/protocol/pubsub0/pub.c8
-rw-r--r--src/protocol/pubsub0/sub.c4
-rw-r--r--src/protocol/pubsub0/xsub.c4
-rw-r--r--src/protocol/reqrep0/rep.c80
-rw-r--r--src/protocol/reqrep0/rep_test.c21
-rw-r--r--src/protocol/reqrep0/req.c8
-rw-r--r--src/protocol/reqrep0/xrep.c20
-rw-r--r--src/protocol/reqrep0/xreq.c16
-rw-r--r--src/protocol/survey0/respond.c8
-rw-r--r--src/protocol/survey0/survey.c12
-rw-r--r--src/protocol/survey0/xrespond.c20
-rw-r--r--src/protocol/survey0/xsurvey.c20
-rw-r--r--src/supplemental/http/http_client.c10
-rw-r--r--src/supplemental/http/http_conn.c8
-rw-r--r--src/supplemental/http/http_server.c20
-rw-r--r--src/supplemental/tcp/tcp.c12
-rw-r--r--src/supplemental/tls/mbedtls/tls.c10
-rw-r--r--src/supplemental/tls/tls_common.c6
-rw-r--r--src/supplemental/websocket/websocket.c20
-rw-r--r--src/transport/ipc/ipc.c22
-rw-r--r--src/transport/tcp/tcp.c26
-rw-r--r--src/transport/tls/tls.c32
-rw-r--r--src/transport/ws/websocket.c16
-rw-r--r--src/transport/zerotier/zerotier.c22
35 files changed, 395 insertions, 350 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 9f025f4c..a15bb47b 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -61,107 +61,75 @@ static nni_aio *nni_aio_expire_aio;
// operations from starting, without waiting for any existing one to
// complete, call nni_aio_close.
-// An nni_aio is an async I/O handle.
-struct nng_aio {
- int a_result; // Result code (nng_errno)
- size_t a_count; // Bytes transferred (I/O only)
- nni_time a_expire; // Absolute timeout
- nni_duration a_timeout; // Relative timeout
-
- bool a_stop; // shutting down (no new operations)
- bool a_sleep; // sleeping with no action
- bool a_expire_ok; // expire from sleep is ok
- nni_task a_task;
-
- // Read/write operations.
- nni_iov a_iov[8];
- unsigned a_niov;
-
- // Message operations.
- nni_msg *a_msg;
-
- // User scratch data. Consumers may store values here, which
- // must be preserved by providers and the framework.
- void *a_user_data[4];
-
- // Operation inputs & outputs. Up to 4 inputs and 4 outputs may be
- // specified. The semantics of these will vary, and depend on the
- // specific operation.
- void *a_inputs[4];
- void *a_outputs[4];
-
- // Provider-use fields.
- nni_aio_cancelfn a_cancel_fn;
- void * a_cancel_arg;
- nni_list_node a_prov_node;
- void * a_prov_extra[4]; // Extra data used by provider
-
- // Socket address. This turns out to be very useful, as we wind up
- // needing socket addresses for numerous connection related routines.
- // It would be cleaner to not have this and avoid burning the space,
- // but having this hear dramatically simplifies lots of code.
- nng_sockaddr a_sockaddr;
-
- // Expire node.
- nni_list_node a_expire_node;
-};
-
static void nni_aio_expire_add(nni_aio *);
-int
-nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg)
+void
+nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
{
- nni_aio *aio;
-
- if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) {
- return (NNG_ENOMEM);
- }
+ memset(aio, 0, sizeof(*aio));
nni_task_init(&aio->a_task, NULL, cb, arg);
aio->a_expire = NNI_TIME_NEVER;
aio->a_timeout = NNG_DURATION_INFINITE;
- *aiop = aio;
- return (0);
}
void
nni_aio_fini(nni_aio *aio)
{
- if (aio != NULL) {
+ nni_aio_cancelfn fn;
+ void * arg;
- nni_aio_cancelfn fn;
- void * arg;
+ // TODO: This probably could just use nni_aio_stop.
- // This is like aio_close, but we don't want to dispatch
- // the task. And unlike aio_stop, we don't want to wait
- // for the task. (Because we implicitly do task_fini.)
- nni_mtx_lock(&nni_aio_lk);
- fn = aio->a_cancel_fn;
- arg = aio->a_cancel_arg;
- aio->a_cancel_fn = NULL;
- aio->a_cancel_arg = NULL;
- aio->a_stop = true;
- nni_mtx_unlock(&nni_aio_lk);
+ // This is like aio_close, but we don't want to dispatch
+ // the task. And unlike aio_stop, we don't want to wait
+ // for the task. (Because we implicitly do task_fini.)
+ nni_mtx_lock(&nni_aio_lk);
+ fn = aio->a_cancel_fn;
+ arg = aio->a_cancel_arg;
+ aio->a_cancel_fn = NULL;
+ aio->a_cancel_arg = NULL;
+ aio->a_stop = true;
+ nni_mtx_unlock(&nni_aio_lk);
- if (fn != NULL) {
- fn(aio, arg, NNG_ECLOSED);
- }
+ if (fn != NULL) {
+ fn(aio, arg, NNG_ECLOSED);
+ }
- // Wait for the aio to be "done"; this ensures that we don't
- // destroy an aio from a "normal" completion callback while
- // the expiration thread is working.
+ // Wait for the aio to be "done"; this ensures that we don't
+ // destroy an aio from a "normal" completion callback while
+ // the expiration thread is working.
- nni_mtx_lock(&nni_aio_lk);
- while (nni_aio_expire_aio == aio) {
- if (nni_thr_is_self(&nni_aio_expire_thr)) {
- nni_aio_expire_aio = NULL;
- break;
- }
- nni_cv_wait(&nni_aio_expire_cv);
+ nni_mtx_lock(&nni_aio_lk);
+ while (nni_aio_expire_aio == aio) {
+ // TODO: It should be possible to remove this check!
+ if (nni_thr_is_self(&nni_aio_expire_thr)) {
+ nni_aio_expire_aio = NULL;
+ break;
}
- nni_mtx_unlock(&nni_aio_lk);
+ nni_cv_wait(&nni_aio_expire_cv);
+ }
+ nni_mtx_unlock(&nni_aio_lk);
+ nni_task_fini(&aio->a_task);
+}
- nni_task_fini(&aio->a_task);
+int
+nni_aio_alloc(nni_aio **aiop, nni_cb cb, void *arg)
+{
+ nni_aio *aio;
+
+ if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_aio_init(aio, cb, arg);
+ *aiop = aio;
+ return (0);
+}
+void
+nni_aio_free(nni_aio *aio)
+{
+ if (aio != NULL) {
+ nni_aio_fini(aio);
NNI_FREE_STRUCT(aio);
}
}
@@ -189,7 +157,7 @@ nni_aio_set_iov(nni_aio *aio, unsigned niov, const nni_iov *iov)
// callback to complete, if still running. It also marks the AIO as
// stopped, preventing further calls to nni_aio_begin from succeeding.
// To correctly tear down an AIO, call stop, and make sure any other
-// callers are not also stopped, before calling nni_aio_fini to release
+// callers are not also stopped, before calling nni_aio_free to release
// actual memory.
void
nni_aio_stop(nni_aio *aio)
diff --git a/src/core/aio.h b/src/core/aio.h
index 83c068b5..c2776bc0 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -22,19 +22,29 @@ typedef void (*nni_aio_cancelfn)(nni_aio *, void *, int);
// the supplied argument when the operation is complete. If NULL is
// supplied for the callback, then nni_aio_wake is used in its place,
// and the aio is used for the argument.
-extern int nni_aio_init(nni_aio **, nni_cb, void *);
+extern void nni_aio_init(nni_aio *, nni_cb, void *arg);
-// nni_aio_fini finalizes the aio, releasing resources (locks)
-// associated with it. The caller is responsible for ensuring that any
-// associated I/O is unscheduled or complete. This is safe to call
-// on zero'd memory.
+// nni_aio_fini finalizes an aio object, releasing associated resources.
+// It waits for the callback to complete.
extern void nni_aio_fini(nni_aio *);
+// nni_aio_alloc allocates an aio object and initializes it. The callback
+// is called with the supplied argument when the operation is complete.
+// If NULL is supplied for the callback, then nni_aio_wake is used in its
+// place, and the aio is used for the argument.
+extern int nni_aio_alloc(nni_aio **, nni_cb, void *arg);
+
+// nni_aio_free frees the aio, releasing resources (locks)
+// associated with it. This is safe to call on zero'd memory.
+// This must only be called on an object that was allocated
+// with nni_aio_allocate.
+extern void nni_aio_free(nni_aio *aio);
+
// nni_aio_stop cancels any unfinished I/O, running completion callbacks,
// but also prevents any new operations from starting (nni_aio_start will
-// return NNG_ESTATE). This should be called before nni_aio_fini(). The
+// return NNG_ESTATE). This should be called before nni_aio_free(). The
// best pattern is to call nni_aio_stop on all linked aios, before calling
-// nni_aio_fini on any of them. This function will block until any
+// nni_aio_free on any of them. This function will block until any
// callbacks are executed, and therefore it should never be executed
// from a callback itself. (To abort operations without blocking
// use nni_aio_cancel instead.)
@@ -162,4 +172,54 @@ extern void nni_sleep_aio(nni_duration, nni_aio *);
extern int nni_aio_sys_init(void);
extern void nni_aio_sys_fini(void);
+
+// An nni_aio is an async I/O handle. The details of this aio structure
+// are private to the AIO framework. The structure has the public name
+// (nng_aio) so that we minimize the pollution in the public API namespace.
+// It is a coding error for anything out side of the AIO framework to access
+// any of these members -- the definition is provided here to facilitate
+// inlining, but that should be the only use.
+struct nng_aio {
+ size_t a_count; // Bytes transferred (I/O only)
+ nni_time a_expire; // Absolute timeout
+ nni_duration a_timeout; // Relative timeout
+ int a_result; // Result code (nng_errno)
+ bool a_stop; // Shutting down (no new operations)
+ bool a_sleep; // Sleeping with no action
+ bool a_expire_ok; // Expire from sleep is ok
+ nni_task a_task;
+
+ // Read/write operations.
+ nni_iov a_iov[8];
+ unsigned a_niov;
+
+ // Message operations.
+ nni_msg *a_msg;
+
+ // User scratch data. Consumers may store values here, which
+ // must be preserved by providers and the framework.
+ void *a_user_data[4];
+
+ // Operation inputs & outputs. Up to 4 inputs and 4 outputs may be
+ // specified. The semantics of these will vary, and depend on the
+ // specific operation.
+ void *a_inputs[4];
+ void *a_outputs[4];
+
+ // Provider-use fields.
+ nni_aio_cancelfn a_cancel_fn;
+ void * a_cancel_arg;
+ nni_list_node a_prov_node; // Linkage on provider list.
+ void * a_prov_extra[4]; // Extra data used by provider
+
+ // Socket address. This turns out to be very useful, as we wind up
+ // needing socket addresses for numerous connection related routines.
+ // It would be cleaner to not have this and avoid burning the space,
+ // but having this hear dramatically simplifies lots of code.
+ nng_sockaddr a_sockaddr;
+
+ // Expire node.
+ nni_list_node a_expire_node;
+};
+
#endif // CORE_AIO_H
diff --git a/src/core/device.c b/src/core/device.c
index fee108a8..71480bbc 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -94,7 +94,7 @@ nni_device_fini(nni_device_data *dd)
}
for (i = 0; i < dd->npath; i++) {
nni_device_path *p = &dd->paths[i];
- nni_aio_fini(p->aio);
+ nni_aio_free(p->aio);
}
nni_mtx_fini(&dd->mtx);
NNI_FREE_STRUCT(dd);
@@ -172,7 +172,7 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2)
p->dst = i == 0 ? s2 : s1;
p->state = NNI_DEVICE_STATE_INIT;
- if ((rv = nni_aio_init(&p->aio, nni_device_cb, p)) != 0) {
+ if ((rv = nni_aio_alloc(&p->aio, nni_device_cb, p)) != 0) {
nni_device_fini(dd);
return (rv);
}
@@ -221,11 +221,11 @@ nni_device(nni_sock *s1, nni_sock *s2)
nni_aio * aio;
int rv;
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) {
return (rv);
}
if ((rv = nni_device_init(&dd, s1, s2)) != 0) {
- nni_aio_fini(aio);
+ nni_aio_free(aio);
return (rv);
}
nni_device_start(dd, aio);
@@ -233,6 +233,6 @@ nni_device(nni_sock *s1, nni_sock *s2)
rv = nni_aio_result(aio);
nni_device_fini(dd);
- nni_aio_fini(aio);
+ nni_aio_free(aio);
return (rv);
}
diff --git a/src/core/dialer.c b/src/core/dialer.c
index e1178783..8a463452 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -60,8 +60,8 @@ nni_dialer_destroy(nni_dialer *d)
nni_aio_stop(d->d_con_aio);
nni_aio_stop(d->d_tmo_aio);
- nni_aio_fini(d->d_con_aio);
- nni_aio_fini(d->d_tmo_aio);
+ nni_aio_free(d->d_con_aio);
+ nni_aio_free(d->d_tmo_aio);
if (d->d_data != NULL) {
d->d_ops.d_fini(d->d_data);
@@ -204,8 +204,8 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
nni_mtx_init(&d->d_mtx);
dialer_stats_init(d);
- if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) ||
- ((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) ||
+ if (((rv = nni_aio_alloc(&d->d_con_aio, dialer_connect_cb, d)) != 0) ||
+ ((rv = nni_aio_alloc(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) ||
((rv = d->d_ops.d_init(&d->d_data, url, d)) != 0) ||
((rv = nni_idhash_alloc32(dialers, &d->d_id, d)) != 0) ||
((rv = nni_sock_add_dialer(s, d)) != 0)) {
@@ -382,7 +382,7 @@ nni_dialer_start(nni_dialer *d, int flags)
if ((flags & NNG_FLAG_NONBLOCK) != 0) {
aio = NULL;
} else {
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) {
nni_atomic_flag_reset(&d->d_started);
return (rv);
}
@@ -397,7 +397,7 @@ nni_dialer_start(nni_dialer *d, int flags)
if (aio != NULL) {
nni_aio_wait(aio);
rv = nni_aio_result(aio);
- nni_aio_fini(aio);
+ nni_aio_free(aio);
}
return (rv);
diff --git a/src/core/listener.c b/src/core/listener.c
index d44a5456..87d2c532 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -61,8 +61,8 @@ nni_listener_destroy(nni_listener *l)
nni_aio_stop(l->l_acc_aio);
nni_aio_stop(l->l_tmo_aio);
- nni_aio_fini(l->l_acc_aio);
- nni_aio_fini(l->l_tmo_aio);
+ nni_aio_free(l->l_acc_aio);
+ nni_aio_free(l->l_tmo_aio);
if (l->l_data != NULL) {
l->l_ops.l_fini(l->l_data);
@@ -196,8 +196,8 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *url_str)
NNI_LIST_INIT(&l->l_pipes, nni_pipe, p_ep_node);
listener_stats_init(l);
- if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) ||
- ((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) ||
+ if (((rv = nni_aio_alloc(&l->l_acc_aio, listener_accept_cb, l)) != 0) ||
+ ((rv = nni_aio_alloc(&l->l_tmo_aio, listener_timer_cb, l)) != 0) ||
((rv = l->l_ops.l_init(&l->l_data, url, l)) != 0) ||
((rv = nni_idhash_alloc32(listeners, &l->l_id, l)) != 0) ||
((rv = nni_sock_add_listener(s, l)) != 0)) {
diff --git a/src/nng.c b/src/nng.c
index e601f785..b3179988 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -1116,7 +1116,7 @@ nng_aio_alloc(nng_aio **app, void (*cb)(void *), void *arg)
if ((rv = nni_init()) != 0) {
return (rv);
}
- if ((rv = nni_aio_init(&aio, (nni_cb) cb, arg)) == 0) {
+ if ((rv = nni_aio_alloc(&aio, (nni_cb) cb, arg)) == 0) {
nng_aio_set_timeout(aio, NNG_DURATION_DEFAULT);
*app = aio;
}
@@ -1126,7 +1126,7 @@ nng_aio_alloc(nng_aio **app, void (*cb)(void *), void *arg)
void
nng_aio_free(nng_aio *aio)
{
- nni_aio_fini(aio);
+ nni_aio_free(aio);
}
void
diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c
index afb12ef6..dea228a1 100644
--- a/src/protocol/bus0/bus.c
+++ b/src/protocol/bus0/bus.c
@@ -68,7 +68,7 @@ bus0_sock_fini(void *arg)
{
bus0_sock *s = arg;
- nni_aio_fini(s->aio_getq);
+ nni_aio_free(s->aio_getq);
nni_mtx_fini(&s->mtx);
}
@@ -80,7 +80,7 @@ bus0_sock_init(void *arg, nni_sock *nsock)
NNI_LIST_INIT(&s->pipes, bus0_pipe, node);
nni_mtx_init(&s->mtx);
- if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) {
+ if ((rv = nni_aio_alloc(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) {
bus0_sock_fini(s);
return (rv);
}
@@ -99,7 +99,7 @@ bus0_sock_init_raw(void *arg, nni_sock *nsock)
NNI_LIST_INIT(&s->pipes, bus0_pipe, node);
nni_mtx_init(&s->mtx);
- if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb_raw, s)) != 0) {
+ if ((rv = nni_aio_alloc(&s->aio_getq, bus0_sock_getq_cb_raw, s)) != 0) {
bus0_sock_fini(s);
return (rv);
}
@@ -142,10 +142,10 @@ bus0_pipe_fini(void *arg)
{
bus0_pipe *p = arg;
- nni_aio_fini(p->aio_getq);
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_putq);
+ nni_aio_free(p->aio_getq);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_recv);
+ nni_aio_free(p->aio_putq);
nni_msgq_fini(p->sendq);
nni_mtx_fini(&p->mtx);
}
@@ -159,10 +159,10 @@ bus0_pipe_init(void *arg, nni_pipe *npipe, void *s)
NNI_LIST_NODE_INIT(&p->node);
nni_mtx_init(&p->mtx);
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) {
+ ((rv = nni_aio_alloc(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_send, bus0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) {
bus0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c
index 860ac17f..730e5f5e 100644
--- a/src/protocol/pair0/pair.c
+++ b/src/protocol/pair0/pair.c
@@ -88,10 +88,10 @@ pair0_pipe_fini(void *arg)
{
pair0_pipe *p = arg;
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_putq);
- nni_aio_fini(p->aio_getq);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_recv);
+ nni_aio_free(p->aio_putq);
+ nni_aio_free(p->aio_getq);
}
static int
@@ -100,10 +100,10 @@ pair0_pipe_init(void *arg, nni_pipe *npipe, void *psock)
pair0_pipe *p = arg;
int rv;
- if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) {
+ if (((rv = nni_aio_alloc(&p->aio_send, pair0_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_recv, pair0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_getq, pair0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_putq, pair0_putq_cb, p)) != 0)) {
pair0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c
index 2838cb5d..b3b64a79 100644
--- a/src/protocol/pair1/pair.c
+++ b/src/protocol/pair1/pair.c
@@ -69,7 +69,7 @@ pair1_sock_fini(void *arg)
{
pair1_sock *s = arg;
- nni_aio_fini(s->aio_getq);
+ nni_aio_free(s->aio_getq);
nni_idhash_fini(s->pipes);
nni_mtx_fini(&s->mtx);
}
@@ -88,7 +88,7 @@ pair1_sock_init_impl(void *arg, nni_sock *nsock, bool raw)
// Raw mode uses this.
nni_mtx_init(&s->mtx);
- if ((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) {
+ if ((rv = nni_aio_alloc(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) {
pair1_sock_fini(s);
return (rv);
}
@@ -147,10 +147,10 @@ pair1_pipe_fini(void *arg)
{
pair1_pipe *p = arg;
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_putq);
- nni_aio_fini(p->aio_getq);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_recv);
+ nni_aio_free(p->aio_putq);
+ nni_aio_free(p->aio_getq);
nni_msgq_fini(p->sendq);
}
@@ -161,10 +161,10 @@ pair1_pipe_init(void *arg, nni_pipe *npipe, void *psock)
int rv;
if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) {
+ ((rv = nni_aio_alloc(&p->aio_send, pair1_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) {
pair1_pipe_fini(p);
return (NNG_ENOMEM);
}
diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c
index 64b47cef..8feb08b8 100644
--- a/src/protocol/pipeline0/pull.c
+++ b/src/protocol/pipeline0/pull.c
@@ -73,8 +73,8 @@ pull0_pipe_fini(void *arg)
{
pull0_pipe *p = arg;
- nni_aio_fini(p->putq_aio);
- nni_aio_fini(p->recv_aio);
+ nni_aio_free(p->putq_aio);
+ nni_aio_free(p->recv_aio);
}
static int
@@ -83,8 +83,8 @@ pull0_pipe_init(void *arg, nni_pipe *pipe, void *s)
pull0_pipe *p = arg;
int rv;
- if (((rv = nni_aio_init(&p->putq_aio, pull0_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->recv_aio, pull0_recv_cb, p)) != 0)) {
+ if (((rv = nni_aio_alloc(&p->putq_aio, pull0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->recv_aio, pull0_recv_cb, p)) != 0)) {
pull0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c
index 5a932ece..90c94af9 100644
--- a/src/protocol/pipeline0/push.c
+++ b/src/protocol/pipeline0/push.c
@@ -89,9 +89,9 @@ push0_pipe_fini(void *arg)
{
push0_pipe *p = arg;
- nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_getq);
+ nni_aio_free(p->aio_recv);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_getq);
}
static int
@@ -100,9 +100,9 @@ push0_pipe_init(void *arg, nni_pipe *pipe, void *s)
push0_pipe *p = arg;
int rv;
- if (((rv = nni_aio_init(&p->aio_recv, push0_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, push0_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, push0_getq_cb, p)) != 0)) {
+ if (((rv = nni_aio_alloc(&p->aio_recv, push0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_send, push0_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_getq, push0_getq_cb, p)) != 0)) {
push0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index a42e95ff..9b995c33 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -111,8 +111,8 @@ pub0_pipe_fini(void *arg)
{
pub0_pipe *p = arg;
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_recv);
nni_lmq_fini(&p->sendq);
}
@@ -130,8 +130,8 @@ pub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
// XXX: consider making this depth tunable
if (((rv = nni_lmq_init(&p->sendq, len)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) {
+ ((rv = nni_aio_alloc(&p->aio_send, pub0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) {
pub0_pipe_fini(p);
return (rv);
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index 56da98f8..c5b84313 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -265,7 +265,7 @@ sub0_pipe_fini(void *arg)
{
sub0_pipe *p = arg;
- nni_aio_fini(p->aio_recv);
+ nni_aio_free(p->aio_recv);
}
static int
@@ -274,7 +274,7 @@ sub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
sub0_pipe *p = arg;
int rv;
- if ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0) {
+ if ((rv = nni_aio_alloc(&p->aio_recv, sub0_recv_cb, p)) != 0) {
sub0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/pubsub0/xsub.c b/src/protocol/pubsub0/xsub.c
index be300df4..baa4f8eb 100644
--- a/src/protocol/pubsub0/xsub.c
+++ b/src/protocol/pubsub0/xsub.c
@@ -85,7 +85,7 @@ xsub0_pipe_fini(void *arg)
{
xsub0_pipe *p = arg;
- nni_aio_fini(p->aio_recv);
+ nni_aio_free(p->aio_recv);
}
static int
@@ -94,7 +94,7 @@ xsub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
xsub0_pipe *p = arg;
int rv;
- if ((rv = nni_aio_init(&p->aio_recv, xsub0_recv_cb, p)) != 0) {
+ if ((rv = nni_aio_alloc(&p->aio_recv, xsub0_recv_cb, p)) != 0) {
xsub0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index a715ab59..a29c3120 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -48,14 +48,14 @@ struct rep0_ctx {
// rep0_sock is our per-socket protocol private structure.
struct rep0_sock {
- nni_mtx lk;
- int ttl;
- nni_idhash * pipes;
- nni_list recvpipes; // list of pipes with data to receive
- nni_list recvq;
- rep0_ctx ctx;
- nni_pollable readable;
- nni_pollable writable;
+ nni_mtx lk;
+ int ttl;
+ nni_idhash * pipes;
+ nni_list recvpipes; // list of pipes with data to receive
+ nni_list recvq;
+ rep0_ctx ctx;
+ nni_pollable readable;
+ nni_pollable writable;
};
// rep0_pipe is our per-pipe protocol private structure.
@@ -63,8 +63,8 @@ struct rep0_pipe {
nni_pipe * pipe;
rep0_sock * rep;
uint32_t id;
- nni_aio * aio_send;
- nni_aio * aio_recv;
+ nni_aio aio_send;
+ nni_aio aio_recv;
nni_list_node rnode; // receivable list linkage
nni_list sendq; // contexts waiting to send
bool busy;
@@ -193,8 +193,8 @@ rep0_ctx_send(void *arg, nni_aio *aio)
if (!p->busy) {
p->busy = true;
len = nni_msg_len(msg);
- nni_aio_set_msg(p->aio_send, msg);
- nni_pipe_send(p->pipe, p->aio_send);
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
nni_mtx_unlock(&s->lk);
nni_aio_set_msg(aio, NULL);
@@ -273,8 +273,8 @@ rep0_pipe_stop(void *arg)
{
rep0_pipe *p = arg;
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
+ nni_aio_stop(&p->aio_send);
+ nni_aio_stop(&p->aio_recv);
}
static void
@@ -283,26 +283,22 @@ rep0_pipe_fini(void *arg)
rep0_pipe *p = arg;
nng_msg * msg;
- if ((msg = nni_aio_get_msg(p->aio_recv)) != NULL) {
- nni_aio_set_msg(p->aio_recv, NULL);
+ if ((msg = nni_aio_get_msg(&p->aio_recv)) != NULL) {
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_msg_free(msg);
}
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
+ nni_aio_fini(&p->aio_send);
+ nni_aio_fini(&p->aio_recv);
}
static int
rep0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
rep0_pipe *p = arg;
- int rv;
- if (((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0)) {
- rep0_pipe_fini(p);
- return (rv);
- }
+ nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p);
+ nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p);
NNI_LIST_INIT(&p->sendq, rep0_ctx, sqnode);
@@ -329,7 +325,7 @@ rep0_pipe_start(void *arg)
}
// By definition, we have not received a request yet on this pipe,
// so it cannot cause us to become writable.
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
return (0);
}
@@ -340,8 +336,8 @@ rep0_pipe_close(void *arg)
rep0_sock *s = p->rep;
rep0_ctx * ctx;
- nni_aio_close(p->aio_send);
- nni_aio_close(p->aio_recv);
+ nni_aio_close(&p->aio_send);
+ nni_aio_close(&p->aio_recv);
nni_mtx_lock(&s->lk);
if (nni_list_active(&s->recvpipes, p)) {
@@ -380,9 +376,9 @@ rep0_pipe_send_cb(void *arg)
nni_msg * msg;
size_t len;
- if (nni_aio_result(p->aio_send) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_send));
- nni_aio_set_msg(p->aio_send, NULL);
+ if (nni_aio_result(&p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_send));
+ nni_aio_set_msg(&p->aio_send, NULL);
nni_pipe_close(p->pipe);
return;
}
@@ -406,8 +402,8 @@ rep0_pipe_send_cb(void *arg)
msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg);
nni_aio_set_msg(aio, NULL);
- nni_aio_set_msg(p->aio_send, msg);
- nni_pipe_send(p->pipe, p->aio_send);
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
nni_mtx_unlock(&s->lk);
@@ -462,13 +458,13 @@ rep0_ctx_recv(void *arg, nni_aio *aio)
nni_mtx_unlock(&s->lk);
return;
}
- msg = nni_aio_get_msg(p->aio_recv);
- nni_aio_set_msg(p->aio_recv, NULL);
+ msg = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_list_remove(&s->recvpipes, p);
if (nni_list_empty(&s->recvpipes)) {
nni_pollable_clear(&s->readable);
}
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
if ((ctx == &s->ctx) && !p->busy) {
nni_pollable_raise(&s->writable);
}
@@ -496,12 +492,12 @@ rep0_pipe_recv_cb(void *arg)
size_t len;
int hops;
- if (nni_aio_result(p->aio_recv) != 0) {
+ if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
return;
}
- msg = nni_aio_get_msg(p->aio_recv);
+ msg = nni_aio_get_msg(&p->aio_recv);
nni_msg_set_pipe(msg, p->id);
@@ -521,7 +517,7 @@ rep0_pipe_recv_cb(void *arg)
if (nni_msg_len(msg) < 4) {
// Peer is speaking garbage. Kick it.
nni_msg_free(msg);
- nni_aio_set_msg(p->aio_recv, NULL);
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_pipe_close(p->pipe);
return;
}
@@ -552,13 +548,13 @@ rep0_pipe_recv_cb(void *arg)
nni_list_remove(&s->recvq, ctx);
aio = ctx->raio;
ctx->raio = NULL;
- nni_aio_set_msg(p->aio_recv, NULL);
+ nni_aio_set_msg(&p->aio_recv, NULL);
if ((ctx == &s->ctx) && !p->busy) {
nni_pollable_raise(&s->writable);
}
// schedule another receive
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
ctx->btrace_len = len;
memcpy(ctx->btrace, nni_msg_header(msg), len);
@@ -573,8 +569,8 @@ rep0_pipe_recv_cb(void *arg)
drop:
nni_msg_free(msg);
- nni_aio_set_msg(p->aio_recv, NULL);
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
}
static int
diff --git a/src/protocol/reqrep0/rep_test.c b/src/protocol/reqrep0/rep_test.c
index 879d6ae4..f339e68d 100644
--- a/src/protocol/reqrep0/rep_test.c
+++ b/src/protocol/reqrep0/rep_test.c
@@ -273,6 +273,26 @@ test_rep_close_pipe_during_send(void)
}
void
+test_rep_ctx_recv_aio_stopped(void)
+{
+ nng_socket rep;
+ nng_ctx ctx;
+ nng_aio * aio;
+
+ TEST_NNG_PASS(nng_rep0_open(&rep));
+ TEST_NNG_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ TEST_NNG_PASS(nng_ctx_open(&ctx, rep));
+
+ nng_aio_stop(aio);
+ nng_ctx_recv(ctx, aio);
+ nng_aio_wait(aio);
+ TEST_NNG_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ TEST_NNG_PASS(nng_ctx_close(ctx));
+ TEST_NNG_PASS(nng_close(rep));
+ nng_aio_free(aio);
+}
+
+void
test_rep_close_pipe_context_send(void)
{
nng_socket rep;
@@ -424,6 +444,7 @@ TEST_LIST = {
{ "rep double recv", test_rep_double_recv },
{ "rep close pipe before send", test_rep_close_pipe_before_send },
{ "rep close pipe during send", test_rep_close_pipe_during_send },
+ { "rep recv aio ctx stopped", test_rep_ctx_recv_aio_stopped },
{ "rep close pipe context send", test_rep_close_pipe_context_send },
{ "rep close context send", test_rep_close_context_send },
{ "rep recv garbage", test_rep_recv_garbage },
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 33629abc..14da7143 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -184,8 +184,8 @@ req0_pipe_fini(void *arg)
{
req0_pipe *p = arg;
- nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_send);
+ nni_aio_free(p->aio_recv);
+ nni_aio_free(p->aio_send);
}
static int
@@ -194,8 +194,8 @@ req0_pipe_init(void *arg, nni_pipe *pipe, void *s)
req0_pipe *p = arg;
int rv;
- if (((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, req0_send_cb, p)) != 0)) {
+ if (((rv = nni_aio_alloc(&p->aio_recv, req0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_send, req0_send_cb, p)) != 0)) {
req0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c
index 48f74075..308c0f0e 100644
--- a/src/protocol/reqrep0/xrep.c
+++ b/src/protocol/reqrep0/xrep.c
@@ -62,7 +62,7 @@ xrep0_sock_fini(void *arg)
{
xrep0_sock *s = arg;
- nni_aio_fini(s->aio_getq);
+ nni_aio_free(s->aio_getq);
nni_idhash_fini(s->pipes);
nni_mtx_fini(&s->lk);
}
@@ -75,7 +75,7 @@ xrep0_sock_init(void *arg, nni_sock *sock)
nni_mtx_init(&s->lk);
if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
- ((rv = nni_aio_init(&s->aio_getq, xrep0_sock_getq_cb, s)) != 0)) {
+ ((rv = nni_aio_alloc(&s->aio_getq, xrep0_sock_getq_cb, s)) != 0)) {
xrep0_sock_fini(s);
return (rv);
}
@@ -120,10 +120,10 @@ xrep0_pipe_fini(void *arg)
{
xrep0_pipe *p = arg;
- nni_aio_fini(p->aio_getq);
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_putq);
+ nni_aio_free(p->aio_getq);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_recv);
+ nni_aio_free(p->aio_putq);
nni_msgq_fini(p->sendq);
}
@@ -146,10 +146,10 @@ xrep0_pipe_init(void *arg, nni_pipe *pipe, void *s)
// willing to receive replies. Something to think about for the
// future.)
if (((rv = nni_msgq_init(&p->sendq, 64)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, xrep0_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, xrep0_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, xrep0_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, xrep0_pipe_putq_cb, p)) != 0)) {
+ ((rv = nni_aio_alloc(&p->aio_getq, xrep0_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_send, xrep0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_recv, xrep0_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_putq, xrep0_pipe_putq_cb, p)) != 0)) {
xrep0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c
index 7455c986..15652f4f 100644
--- a/src/protocol/reqrep0/xreq.c
+++ b/src/protocol/reqrep0/xreq.c
@@ -96,10 +96,10 @@ xreq0_pipe_fini(void *arg)
{
xreq0_pipe *p = arg;
- nni_aio_fini(p->aio_getq);
- nni_aio_fini(p->aio_putq);
- nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_send);
+ nni_aio_free(p->aio_getq);
+ nni_aio_free(p->aio_putq);
+ nni_aio_free(p->aio_recv);
+ nni_aio_free(p->aio_send);
}
static int
@@ -108,10 +108,10 @@ xreq0_pipe_init(void *arg, nni_pipe *pipe, void *s)
xreq0_pipe *p = arg;
int rv;
- if (((rv = nni_aio_init(&p->aio_getq, xreq0_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, xreq0_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, xreq0_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, xreq0_send_cb, p)) != 0)) {
+ if (((rv = nni_aio_alloc(&p->aio_getq, xreq0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_putq, xreq0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_recv, xreq0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_send, xreq0_send_cb, p)) != 0)) {
xreq0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c
index b4ffc917..06010d99 100644
--- a/src/protocol/survey0/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -281,8 +281,8 @@ resp0_pipe_fini(void *arg)
nni_aio_set_msg(p->aio_recv, NULL);
nni_msg_free(msg);
}
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_recv);
}
static int
@@ -291,8 +291,8 @@ resp0_pipe_init(void *arg, nni_pipe *npipe, void *s)
resp0_pipe *p = arg;
int rv;
- if (((rv = nni_aio_init(&p->aio_recv, resp0_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, resp0_pipe_send_cb, p)) != 0)) {
+ if (((rv = nni_aio_alloc(&p->aio_recv, resp0_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_send, resp0_pipe_send_cb, p)) != 0)) {
resp0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c
index 8aa05dd4..35a14de7 100644
--- a/src/protocol/survey0/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -286,9 +286,9 @@ surv0_pipe_fini(void *arg)
{
surv0_pipe *p = arg;
- nni_aio_fini(p->aio_getq);
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
+ nni_aio_free(p->aio_getq);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_recv);
nni_msgq_fini(p->sendq);
}
@@ -303,9 +303,9 @@ surv0_pipe_init(void *arg, nni_pipe *npipe, void *s)
// is best effort, and a deep queue doesn't really do much for us.
// Note that surveys can be *outstanding*, but not yet put on the wire.
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, surv0_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, surv0_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, surv0_pipe_recv_cb, p)) != 0)) {
+ ((rv = nni_aio_alloc(&p->aio_getq, surv0_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_send, surv0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_recv, surv0_pipe_recv_cb, p)) != 0)) {
surv0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c
index 66b340ee..6318fe8b 100644
--- a/src/protocol/survey0/xrespond.c
+++ b/src/protocol/survey0/xrespond.c
@@ -62,7 +62,7 @@ xresp0_sock_fini(void *arg)
{
xresp0_sock *s = arg;
- nni_aio_fini(s->aio_getq);
+ nni_aio_free(s->aio_getq);
nni_idhash_fini(s->pipes);
nni_mtx_fini(&s->mtx);
}
@@ -75,7 +75,7 @@ xresp0_sock_init(void *arg, nni_sock *nsock)
nni_mtx_init(&s->mtx);
if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
- ((rv = nni_aio_init(&s->aio_getq, xresp0_sock_getq_cb, s)) != 0)) {
+ ((rv = nni_aio_alloc(&s->aio_getq, xresp0_sock_getq_cb, s)) != 0)) {
xresp0_sock_fini(s);
return (rv);
}
@@ -119,10 +119,10 @@ xresp0_pipe_fini(void *arg)
{
xresp0_pipe *p = arg;
- nni_aio_fini(p->aio_putq);
- nni_aio_fini(p->aio_getq);
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
+ nni_aio_free(p->aio_putq);
+ nni_aio_free(p->aio_getq);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_recv);
nni_msgq_fini(p->sendq);
}
@@ -133,10 +133,10 @@ xresp0_pipe_init(void *arg, nni_pipe *npipe, void *s)
int rv;
if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, xresp0_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, xresp0_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, xresp0_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, xresp0_send_cb, p)) != 0)) {
+ ((rv = nni_aio_alloc(&p->aio_putq, xresp0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_recv, xresp0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_getq, xresp0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_send, xresp0_send_cb, p)) != 0)) {
xresp0_pipe_fini(p);
return (rv);
}
diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c
index 43c83793..86f912a2 100644
--- a/src/protocol/survey0/xsurvey.c
+++ b/src/protocol/survey0/xsurvey.c
@@ -60,7 +60,7 @@ xsurv0_sock_fini(void *arg)
{
xsurv0_sock *s = arg;
- nni_aio_fini(s->aio_getq);
+ nni_aio_free(s->aio_getq);
nni_mtx_fini(&s->mtx);
}
@@ -70,7 +70,7 @@ xsurv0_sock_init(void *arg, nni_sock *nsock)
xsurv0_sock *s = arg;
int rv;
- if ((rv = nni_aio_init(&s->aio_getq, xsurv0_sock_getq_cb, s)) != 0) {
+ if ((rv = nni_aio_alloc(&s->aio_getq, xsurv0_sock_getq_cb, s)) != 0) {
xsurv0_sock_fini(s);
return (rv);
}
@@ -116,10 +116,10 @@ xsurv0_pipe_fini(void *arg)
{
xsurv0_pipe *p = arg;
- nni_aio_fini(p->aio_getq);
- nni_aio_fini(p->aio_send);
- nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_putq);
+ nni_aio_free(p->aio_getq);
+ nni_aio_free(p->aio_send);
+ nni_aio_free(p->aio_recv);
+ nni_aio_free(p->aio_putq);
nni_msgq_fini(p->sendq);
}
@@ -136,10 +136,10 @@ xsurv0_pipe_init(void *arg, nni_pipe *npipe, void *s)
// an expiration with them, so that we could discard any that are
// not delivered before their expiration date.
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, xsurv0_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, xsurv0_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, xsurv0_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, xsurv0_recv_cb, p)) != 0)) {
+ ((rv = nni_aio_alloc(&p->aio_getq, xsurv0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_putq, xsurv0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_send, xsurv0_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->aio_recv, xsurv0_recv_cb, p)) != 0)) {
xsurv0_pipe_fini(p);
return (rv);
}
diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c
index 3c60bd46..ecba84ae 100644
--- a/src/supplemental/http/http_client.c
+++ b/src/supplemental/http/http_client.c
@@ -91,7 +91,7 @@ http_dial_cb(void *arg)
void
nni_http_client_fini(nni_http_client *c)
{
- nni_aio_fini(c->aio);
+ nni_aio_free(c->aio);
nng_stream_dialer_free(c->dialer);
nni_mtx_fini(&c->mtx);
NNI_FREE_STRUCT(c);
@@ -132,7 +132,7 @@ nni_http_client_init(nni_http_client **cp, const nni_url *url)
return (rv);
}
- if ((rv = nni_aio_init(&c->aio, http_dial_cb, c)) != 0) {
+ if ((rv = nni_aio_alloc(&c->aio, http_dial_cb, c)) != 0) {
nni_http_client_fini(c);
return (rv);
}
@@ -251,7 +251,7 @@ http_txn_reap(void *arg)
}
}
nni_http_chunks_free(txn->chunks);
- nni_aio_fini(txn->aio);
+ nni_aio_free(txn->aio);
NNI_FREE_STRUCT(txn);
}
@@ -405,7 +405,7 @@ nni_http_transact_conn(
nni_aio_finish_error(aio, NNG_ENOMEM);
return;
}
- if ((rv = nni_aio_init(&txn->aio, http_txn_cb, txn)) != 0) {
+ if ((rv = nni_aio_alloc(&txn->aio, http_txn_cb, txn)) != 0) {
NNI_FREE_STRUCT(txn);
nni_aio_finish_error(aio, rv);
return;
@@ -450,7 +450,7 @@ nni_http_transact(nni_http_client *client, nni_http_req *req,
nni_aio_finish_error(aio, NNG_ENOMEM);
return;
}
- if ((rv = nni_aio_init(&txn->aio, http_txn_cb, txn)) != 0) {
+ if ((rv = nni_aio_alloc(&txn->aio, http_txn_cb, txn)) != 0) {
NNI_FREE_STRUCT(txn);
nni_aio_finish_error(aio, rv);
return;
diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c
index 1fc2c34e..583f5eee 100644
--- a/src/supplemental/http/http_conn.c
+++ b/src/supplemental/http/http_conn.c
@@ -690,8 +690,8 @@ nni_http_conn_fini(nni_http_conn *conn)
}
nni_mtx_unlock(&conn->mtx);
- nni_aio_fini(conn->wr_aio);
- nni_aio_fini(conn->rd_aio);
+ nni_aio_free(conn->wr_aio);
+ nni_aio_free(conn->rd_aio);
nni_free(conn->rd_buf, conn->rd_bufsz);
nni_mtx_fini(&conn->mtx);
NNI_FREE_STRUCT(conn);
@@ -716,8 +716,8 @@ http_init(nni_http_conn **connp, nng_stream *data)
}
conn->rd_bufsz = HTTP_BUFSIZE;
- if (((rv = nni_aio_init(&conn->wr_aio, http_wr_cb, conn)) != 0) ||
- ((rv = nni_aio_init(&conn->rd_aio, http_rd_cb, conn)) != 0)) {
+ if (((rv = nni_aio_alloc(&conn->wr_aio, http_wr_cb, conn)) != 0) ||
+ ((rv = nni_aio_alloc(&conn->rd_aio, http_rd_cb, conn)) != 0)) {
nni_http_conn_fini(conn);
return (rv);
}
diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c
index 06a93553..9a4ff2af 100644
--- a/src/supplemental/http/http_server.c
+++ b/src/supplemental/http/http_server.c
@@ -243,10 +243,10 @@ http_sconn_reap(void *arg)
}
nni_http_req_free(sc->req);
nni_http_res_free(sc->res);
- nni_aio_fini(sc->rxaio);
- nni_aio_fini(sc->txaio);
- nni_aio_fini(sc->txdataio);
- nni_aio_fini(sc->cbaio);
+ nni_aio_free(sc->rxaio);
+ nni_aio_free(sc->txaio);
+ nni_aio_free(sc->txdataio);
+ nni_aio_free(sc->cbaio);
// Now it is safe to release our reference on the server.
nni_mtx_lock(&s->mtx);
@@ -746,11 +746,11 @@ http_sconn_init(http_sconn **scp, nng_stream *stream)
}
if (((rv = nni_http_req_alloc(&sc->req, NULL)) != 0) ||
- ((rv = nni_aio_init(&sc->rxaio, http_sconn_rxdone, sc)) != 0) ||
- ((rv = nni_aio_init(&sc->txaio, http_sconn_txdone, sc)) != 0) ||
- ((rv = nni_aio_init(&sc->txdataio, http_sconn_txdatdone, sc)) !=
+ ((rv = nni_aio_alloc(&sc->rxaio, http_sconn_rxdone, sc)) != 0) ||
+ ((rv = nni_aio_alloc(&sc->txaio, http_sconn_txdone, sc)) != 0) ||
+ ((rv = nni_aio_alloc(&sc->txdataio, http_sconn_txdatdone, sc)) !=
0) ||
- ((rv = nni_aio_init(&sc->cbaio, http_sconn_cbdone, sc)) != 0)) {
+ ((rv = nni_aio_alloc(&sc->cbaio, http_sconn_cbdone, sc)) != 0)) {
// Can't even accept the incoming request. Hard close.
http_sconn_close(sc);
return (rv);
@@ -838,7 +838,7 @@ http_server_fini(nni_http_server *s)
nni_mtx_unlock(&s->errors_mtx);
nni_mtx_fini(&s->errors_mtx);
- nni_aio_fini(s->accaio);
+ nni_aio_free(s->accaio);
nni_mtx_fini(&s->mtx);
nni_strfree(s->hostname);
NNI_FREE_STRUCT(s);
@@ -874,7 +874,7 @@ http_server_init(nni_http_server **serverp, const nni_url *url)
nni_mtx_init(&s->errors_mtx);
NNI_LIST_INIT(&s->errors, http_error, node);
- if ((rv = nni_aio_init(&s->accaio, http_server_acccb, s)) != 0) {
+ if ((rv = nni_aio_alloc(&s->accaio, http_server_acccb, s)) != 0) {
http_server_fini(s);
return (rv);
}
diff --git a/src/supplemental/tcp/tcp.c b/src/supplemental/tcp/tcp.c
index 02d5d6ce..dd6f28ff 100644
--- a/src/supplemental/tcp/tcp.c
+++ b/src/supplemental/tcp/tcp.c
@@ -157,8 +157,8 @@ tcp_dialer_free(void *arg)
nni_aio_stop(d->resaio);
nni_aio_stop(d->conaio);
- nni_aio_fini(d->resaio);
- nni_aio_fini(d->conaio);
+ nni_aio_free(d->resaio);
+ nni_aio_free(d->conaio);
if (d->d != NULL) {
nni_tcp_dialer_close(d->d);
@@ -234,8 +234,8 @@ tcp_dialer_alloc(tcp_dialer **dp)
nni_aio_list_init(&d->resaios);
nni_aio_list_init(&d->conaios);
- if (((rv = nni_aio_init(&d->resaio, tcp_dial_res_cb, d)) != 0) ||
- ((rv = nni_aio_init(&d->conaio, tcp_dial_con_cb, d)) != 0) ||
+ if (((rv = nni_aio_alloc(&d->resaio, tcp_dial_res_cb, d)) != 0) ||
+ ((rv = nni_aio_alloc(&d->conaio, tcp_dial_con_cb, d)) != 0) ||
((rv = nni_tcp_dialer_init(&d->d)) != 0)) {
tcp_dialer_free(d);
return (rv);
@@ -446,11 +446,11 @@ nni_tcp_listener_alloc(nng_stream_listener **lp, const nng_url *url)
nni_aio_wait(aio);
if ((rv = nni_aio_result(aio)) != 0) {
- nni_aio_fini(aio);
+ nni_aio_free(aio);
return (rv);
}
nni_aio_get_sockaddr(aio, &sa);
- nni_aio_fini(aio);
+ nni_aio_free(aio);
return (tcp_listener_alloc_addr(lp, &sa));
}
diff --git a/src/supplemental/tls/mbedtls/tls.c b/src/supplemental/tls/mbedtls/tls.c
index d49c9de5..b0ac6ec5 100644
--- a/src/supplemental/tls/mbedtls/tls.c
+++ b/src/supplemental/tls/mbedtls/tls.c
@@ -290,12 +290,12 @@ tls_reap(void *arg)
}
nni_aio_stop(tls->tcp_send);
nni_aio_stop(tls->tcp_recv);
- nni_aio_fini(tls->com.aio);
+ nni_aio_free(tls->com.aio);
// And finalize / free everything.
nng_stream_free(tls->tcp);
- nni_aio_fini(tls->tcp_send);
- nni_aio_fini(tls->tcp_recv);
+ nni_aio_free(tls->tcp_send);
+ nni_aio_free(tls->tcp_recv);
mbedtls_ssl_free(&tls->ctx);
nng_tls_config_free(tls->com.cfg);
@@ -349,8 +349,8 @@ nni_tls_start(nng_stream *arg, nng_stream *tcp)
tp->tcp = tcp;
- if (((rv = nni_aio_init(&tp->tcp_send, tls_send_cb, tp)) != 0) ||
- ((rv = nni_aio_init(&tp->tcp_recv, tls_recv_cb, tp)) != 0)) {
+ if (((rv = nni_aio_alloc(&tp->tcp_send, tls_send_cb, tp)) != 0) ||
+ ((rv = nni_aio_alloc(&tp->tcp_recv, tls_recv_cb, tp)) != 0)) {
return (rv);
}
diff --git a/src/supplemental/tls/tls_common.c b/src/supplemental/tls/tls_common.c
index 39a4bff0..7cccc1e9 100644
--- a/src/supplemental/tls/tls_common.c
+++ b/src/supplemental/tls/tls_common.c
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
@@ -110,7 +110,7 @@ tls_dialer_dial(void *arg, nng_aio *aio)
}
com = (void *) tls;
- if ((rv = nni_aio_init(&com->aio, tls_conn_cb, tls)) != 0) {
+ if ((rv = nni_aio_alloc(&com->aio, tls_conn_cb, tls)) != 0) {
nni_aio_finish_error(aio, rv);
nng_stream_free(tls);
return;
@@ -394,7 +394,7 @@ tls_listener_accept(void *arg, nng_aio *aio)
return;
}
com = (void *) tls;
- if ((rv = nni_aio_init(&com->aio, tls_conn_cb, tls)) != 0) {
+ if ((rv = nni_aio_alloc(&com->aio, tls_conn_cb, tls)) != 0) {
nni_aio_finish_error(aio, rv);
nng_stream_free(tls);
return;
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index c7d3622c..df4a0834 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -1230,11 +1230,11 @@ ws_fini(void *arg)
nni_strfree(ws->reqhdrs);
nni_strfree(ws->reshdrs);
- nni_aio_fini(ws->rxaio);
- nni_aio_fini(ws->txaio);
- nni_aio_fini(ws->closeaio);
- nni_aio_fini(ws->httpaio);
- nni_aio_fini(ws->connaio);
+ nni_aio_free(ws->rxaio);
+ nni_aio_free(ws->txaio);
+ nni_aio_free(ws->closeaio);
+ nni_aio_free(ws->httpaio);
+ nni_aio_free(ws->connaio);
nni_mtx_fini(&ws->mtx);
NNI_FREE_STRUCT(ws);
}
@@ -1411,11 +1411,11 @@ ws_init(nni_ws **wsp)
nni_aio_list_init(&ws->sendq);
nni_aio_list_init(&ws->recvq);
- if (((rv = nni_aio_init(&ws->closeaio, ws_close_cb, ws)) != 0) ||
- ((rv = nni_aio_init(&ws->txaio, ws_write_cb, ws)) != 0) ||
- ((rv = nni_aio_init(&ws->rxaio, ws_read_cb, ws)) != 0) ||
- ((rv = nni_aio_init(&ws->httpaio, ws_http_cb, ws)) != 0) ||
- ((rv = nni_aio_init(&ws->connaio, ws_conn_cb, ws)) != 0)) {
+ if (((rv = nni_aio_alloc(&ws->closeaio, ws_close_cb, ws)) != 0) ||
+ ((rv = nni_aio_alloc(&ws->txaio, ws_write_cb, ws)) != 0) ||
+ ((rv = nni_aio_alloc(&ws->rxaio, ws_read_cb, ws)) != 0) ||
+ ((rv = nni_aio_alloc(&ws->httpaio, ws_http_cb, ws)) != 0) ||
+ ((rv = nni_aio_alloc(&ws->connaio, ws_conn_cb, ws)) != 0)) {
ws_fini(ws);
return (rv);
}
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 0d8f12ae..4d5db82d 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -145,9 +145,9 @@ ipctran_pipe_fini(void *arg)
}
nni_mtx_unlock(&ep->mtx);
}
- nni_aio_fini(p->rxaio);
- nni_aio_fini(p->txaio);
- nni_aio_fini(p->negoaio);
+ nni_aio_free(p->rxaio);
+ nni_aio_free(p->txaio);
+ nni_aio_free(p->negoaio);
nng_stream_free(p->conn);
if (p->rxmsg) {
nni_msg_free(p->rxmsg);
@@ -177,9 +177,9 @@ ipctran_pipe_alloc(ipctran_pipe **pipep)
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);
- if (((rv = nni_aio_init(&p->txaio, ipctran_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->rxaio, ipctran_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->negoaio, ipctran_pipe_nego_cb, p)) != 0)) {
+ if (((rv = nni_aio_alloc(&p->txaio, ipctran_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->rxaio, ipctran_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->negoaio, ipctran_pipe_nego_cb, p)) != 0)) {
ipctran_pipe_fini(p);
return (rv);
}
@@ -699,8 +699,8 @@ ipctran_ep_fini(void *arg)
nni_aio_stop(ep->connaio);
nng_stream_dialer_free(ep->dialer);
nng_stream_listener_free(ep->listener);
- nni_aio_fini(ep->timeaio);
- nni_aio_fini(ep->connaio);
+ nni_aio_free(ep->timeaio);
+ nni_aio_free(ep->connaio);
nni_mtx_fini(&ep->mtx);
NNI_FREE_STRUCT(ep);
}
@@ -840,7 +840,7 @@ ipctran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer)
}
ep->ndialer = ndialer;
- if (((rv = nni_aio_init(&ep->connaio, ipctran_dial_cb, ep)) != 0) ||
+ if (((rv = nni_aio_alloc(&ep->connaio, ipctran_dial_cb, ep)) != 0) ||
((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0)) {
ipctran_ep_fini(ep);
return (rv);
@@ -863,8 +863,8 @@ ipctran_ep_init_listener(void **dp, nni_url *url, nni_listener *nlistener)
}
ep->nlistener = nlistener;
- if (((rv = nni_aio_init(&ep->connaio, ipctran_accept_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->timeaio, ipctran_timer_cb, ep)) != 0) ||
+ if (((rv = nni_aio_alloc(&ep->connaio, ipctran_accept_cb, ep)) != 0) ||
+ ((rv = nni_aio_alloc(&ep->timeaio, ipctran_timer_cb, ep)) != 0) ||
((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) {
ipctran_ep_fini(ep);
return (rv);
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 7748cd8f..32df5102 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -146,9 +146,9 @@ tcptran_pipe_fini(void *arg)
nni_mtx_unlock(&ep->mtx);
}
- nni_aio_fini(p->rxaio);
- nni_aio_fini(p->txaio);
- nni_aio_fini(p->negoaio);
+ nni_aio_free(p->rxaio);
+ nni_aio_free(p->txaio);
+ nni_aio_free(p->negoaio);
nng_stream_free(p->conn);
nni_msg_free(p->rxmsg);
nni_mtx_fini(&p->mtx);
@@ -176,9 +176,9 @@ tcptran_pipe_alloc(tcptran_pipe **pipep)
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);
- if (((rv = nni_aio_init(&p->txaio, tcptran_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->rxaio, tcptran_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->negoaio, tcptran_pipe_nego_cb, p)) != 0)) {
+ if (((rv = nni_aio_alloc(&p->txaio, tcptran_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->rxaio, tcptran_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->negoaio, tcptran_pipe_nego_cb, p)) != 0)) {
tcptran_pipe_fini(p);
return (rv);
}
@@ -652,8 +652,8 @@ tcptran_ep_fini(void *arg)
nni_aio_stop(ep->connaio);
nng_stream_dialer_free(ep->dialer);
nng_stream_listener_free(ep->listener);
- nni_aio_fini(ep->timeaio);
- nni_aio_fini(ep->connaio);
+ nni_aio_free(ep->timeaio);
+ nni_aio_free(ep->connaio);
nni_mtx_fini(&ep->mtx);
NNI_FREE_STRUCT(ep);
@@ -736,7 +736,7 @@ tcptran_url_parse_source(nng_url *url, nng_sockaddr *sa, const nng_url *surl)
memcpy(src, surl->u_hostname, len);
src[len] = '\0';
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) {
nni_free(src, len + 1);
return (rv);
}
@@ -746,7 +746,7 @@ tcptran_url_parse_source(nng_url *url, nng_sockaddr *sa, const nng_url *surl)
if ((rv = nni_aio_result(aio)) == 0) {
nni_aio_get_sockaddr(aio, sa);
}
- nni_aio_fini(aio);
+ nni_aio_free(aio);
nni_free(src, len + 1);
return (rv);
}
@@ -904,7 +904,7 @@ tcptran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
ep->ndialer = ndialer;
if ((rv != 0) ||
- ((rv = nni_aio_init(&ep->connaio, tcptran_dial_cb, ep)) != 0) ||
+ ((rv = nni_aio_alloc(&ep->connaio, tcptran_dial_cb, ep)) != 0) ||
((rv = nng_stream_dialer_alloc_url(&ep->dialer, &myurl)) != 0)) {
tcptran_ep_fini(ep);
return (rv);
@@ -942,8 +942,8 @@ tcptran_listener_init(void **lp, nng_url *url, nni_listener *nlistener)
}
ep->nlistener = nlistener;
- if (((rv = nni_aio_init(&ep->connaio, tcptran_accept_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->timeaio, tcptran_timer_cb, ep)) != 0) ||
+ if (((rv = nni_aio_alloc(&ep->connaio, tcptran_accept_cb, ep)) != 0) ||
+ ((rv = nni_aio_alloc(&ep->timeaio, tcptran_timer_cb, ep)) != 0) ||
((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) {
tcptran_ep_fini(ep);
return (rv);
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index 58b43f2c..7869c380 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
@@ -151,9 +151,9 @@ tlstran_pipe_fini(void *arg)
}
nni_mtx_unlock(&ep->mtx);
}
- nni_aio_fini(p->rxaio);
- nni_aio_fini(p->txaio);
- nni_aio_fini(p->negoaio);
+ nni_aio_free(p->rxaio);
+ nni_aio_free(p->txaio);
+ nni_aio_free(p->negoaio);
nng_stream_free(p->tls);
nni_msg_free(p->rxmsg);
NNI_FREE_STRUCT(p);
@@ -170,9 +170,9 @@ tlstran_pipe_alloc(tlstran_pipe **pipep)
}
nni_mtx_init(&p->mtx);
- if (((rv = nni_aio_init(&p->txaio, tlstran_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->rxaio, tlstran_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->negoaio, tlstran_pipe_nego_cb, p)) != 0)) {
+ if (((rv = nni_aio_alloc(&p->txaio, tlstran_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->rxaio, tlstran_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->negoaio, tlstran_pipe_nego_cb, p)) != 0)) {
tlstran_pipe_fini(p);
return (rv);
}
@@ -626,8 +626,8 @@ tlstran_ep_fini(void *arg)
nni_aio_stop(ep->connaio);
nng_stream_dialer_free(ep->dialer);
nng_stream_listener_free(ep->listener);
- nni_aio_fini(ep->timeaio);
- nni_aio_fini(ep->connaio);
+ nni_aio_free(ep->timeaio);
+ nni_aio_free(ep->connaio);
nni_mtx_fini(&ep->mtx);
NNI_FREE_STRUCT(ep);
@@ -709,7 +709,7 @@ tlstran_url_parse_source(nni_url *url, nng_sockaddr *sa, const nni_url *surl)
memcpy(src, surl->u_hostname, len);
src[len] = '\0';
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) {
nni_free(src, len + 1);
return (rv);
}
@@ -719,7 +719,7 @@ tlstran_url_parse_source(nni_url *url, nng_sockaddr *sa, const nni_url *surl)
if ((rv = nni_aio_result(aio)) == 0) {
nni_aio_get_sockaddr(aio, sa);
}
- nni_aio_fini(aio);
+ nni_aio_free(aio);
nni_free(src, len + 1);
return (rv);
}
@@ -872,7 +872,7 @@ tlstran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer)
}
if (((rv = tlstran_ep_init(&ep, url, sock)) != 0) ||
- ((rv = nni_aio_init(&ep->connaio, tlstran_dial_cb, ep)) != 0)) {
+ ((rv = nni_aio_alloc(&ep->connaio, tlstran_dial_cb, ep)) != 0)) {
return (rv);
}
ep->authmode = NNG_TLS_AUTH_MODE_REQUIRED;
@@ -923,8 +923,8 @@ tlstran_ep_init_listener(void **lp, nni_url *url, nni_listener *nlistener)
return (NNG_EADDRINVAL);
}
if (((rv = tlstran_ep_init(&ep, url, sock)) != 0) ||
- ((rv = nni_aio_init(&ep->connaio, tlstran_accept_cb, ep)) != 0) ||
- ((rv = nni_aio_init(&ep->timeaio, tlstran_timer_cb, ep)) != 0)) {
+ ((rv = nni_aio_alloc(&ep->connaio, tlstran_accept_cb, ep)) != 0) ||
+ ((rv = nni_aio_alloc(&ep->timeaio, tlstran_timer_cb, ep)) != 0)) {
return (rv);
}
@@ -942,7 +942,7 @@ tlstran_ep_init_listener(void **lp, nni_url *url, nni_listener *nlistener)
// be worse than the cost of just waiting here. We always recommend
// using local IP addresses rather than names when possible.
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ if ((rv = nni_aio_alloc(&aio, NULL, NULL)) != 0) {
tlstran_ep_fini(ep);
return (rv);
}
@@ -950,7 +950,7 @@ tlstran_ep_init_listener(void **lp, nni_url *url, nni_listener *nlistener)
nni_tcp_resolv(host, url->u_port, af, 1, aio);
nni_aio_wait(aio);
rv = nni_aio_result(aio);
- nni_aio_fini(aio);
+ nni_aio_free(aio);
if ((rv != 0) ||
((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0) ||
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
index 3424480a..e70b1fd4 100644
--- a/src/transport/ws/websocket.c
+++ b/src/transport/ws/websocket.c
@@ -205,8 +205,8 @@ wstran_pipe_fini(void *arg)
{
ws_pipe *p = arg;
- nni_aio_fini(p->rxaio);
- nni_aio_fini(p->txaio);
+ nni_aio_free(p->rxaio);
+ nni_aio_free(p->txaio);
nng_stream_free(p->ws);
nni_mtx_fini(&p->mtx);
@@ -238,8 +238,8 @@ wstran_pipe_alloc(ws_pipe **pipep, void *ws)
nni_mtx_init(&p->mtx);
// Initialize AIOs.
- if (((rv = nni_aio_init(&p->txaio, wstran_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->rxaio, wstran_pipe_recv_cb, p)) != 0)) {
+ if (((rv = nni_aio_alloc(&p->txaio, wstran_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_alloc(&p->rxaio, wstran_pipe_recv_cb, p)) != 0)) {
wstran_pipe_fini(p);
return (rv);
}
@@ -381,7 +381,7 @@ wstran_dialer_fini(void *arg)
nni_aio_stop(d->connaio);
nng_stream_dialer_free(d->dialer);
- nni_aio_fini(d->connaio);
+ nni_aio_free(d->connaio);
nni_mtx_fini(&d->mtx);
NNI_FREE_STRUCT(d);
}
@@ -393,7 +393,7 @@ wstran_listener_fini(void *arg)
nni_aio_stop(l->accaio);
nng_stream_listener_free(l->listener);
- nni_aio_fini(l->accaio);
+ nni_aio_free(l->accaio);
nni_mtx_fini(&l->mtx);
NNI_FREE_STRUCT(l);
}
@@ -515,7 +515,7 @@ wstran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
nni_sock_peer_name(s));
if (((rv = nni_ws_dialer_alloc(&d->dialer, url)) != 0) ||
- ((rv = nni_aio_init(&d->connaio, wstran_connect_cb, d)) != 0) ||
+ ((rv = nni_aio_alloc(&d->connaio, wstran_connect_cb, d)) != 0) ||
((rv = nng_stream_dialer_set_bool(
d->dialer, NNI_OPT_WS_MSGMODE, true)) != 0) ||
((rv = nng_stream_dialer_set_string(
@@ -551,7 +551,7 @@ wstran_listener_init(void **lp, nng_url *url, nni_listener *nlistener)
nni_sock_proto_name(s));
if (((rv = nni_ws_listener_alloc(&l->listener, url)) != 0) ||
- ((rv = nni_aio_init(&l->accaio, wstran_accept_cb, l)) != 0) ||
+ ((rv = nni_aio_alloc(&l->accaio, wstran_accept_cb, l)) != 0) ||
((rv = nng_stream_listener_set_bool(
l->listener, NNI_OPT_WS_MSGMODE, true)) != 0) ||
((rv = nng_stream_listener_set_string(
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index 027d46c2..9d166ad2 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -1327,12 +1327,12 @@ zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket,
return (-1);
}
- if (nni_aio_init(&aio, NULL, NULL) != 0) {
+ if (nni_aio_alloc(&aio, NULL, NULL) != 0) {
// Out of memory
return (-1);
}
if ((buf = nni_alloc(sizeof(*hdr) + len)) == NULL) {
- nni_aio_fini(aio);
+ nni_aio_free(aio);
return (-1);
}
@@ -1359,7 +1359,7 @@ zt_wire_packet_send(ZT_Node *node, void *userptr, void *thr, int64_t socket,
// care which. (There may be a few thread context switches, but
// none of them are going to have to wait for some unbounded time.)
nni_aio_wait(aio);
- nni_aio_fini(aio);
+ nni_aio_free(aio);
nni_free(hdr, hdr->len + sizeof(*hdr));
return (0);
@@ -1406,8 +1406,8 @@ zt_node_destroy(zt_node *ztn)
if (ztn->zn_flock != NULL) {
nni_file_unlock(ztn->zn_flock);
}
- nni_aio_fini(ztn->zn_rcv4_aio);
- nni_aio_fini(ztn->zn_rcv6_aio);
+ nni_aio_free(ztn->zn_rcv4_aio);
+ nni_aio_free(ztn->zn_rcv6_aio);
nni_idhash_fini(ztn->zn_eps);
nni_idhash_fini(ztn->zn_lpipes);
nni_idhash_fini(ztn->zn_rpipes);
@@ -1440,8 +1440,8 @@ zt_node_create(zt_node **ztnp, const char *path)
NNI_LIST_INIT(&ztn->zn_eplist, zt_ep, ze_link);
NNI_LIST_INIT(&ztn->zn_plist, zt_pipe, zp_link);
nni_cv_init(&ztn->zn_bgcv, &zt_lk);
- nni_aio_init(&ztn->zn_rcv4_aio, zt_node_rcv4_cb, ztn);
- nni_aio_init(&ztn->zn_rcv6_aio, zt_node_rcv6_cb, ztn);
+ nni_aio_alloc(&ztn->zn_rcv4_aio, zt_node_rcv4_cb, ztn);
+ nni_aio_alloc(&ztn->zn_rcv6_aio, zt_node_rcv6_cb, ztn);
if (((ztn->zn_rcv4_buf = nni_alloc(zt_rcv_bufsize)) == NULL) ||
((ztn->zn_rcv6_buf = nni_alloc(zt_rcv_bufsize)) == NULL)) {
@@ -1642,7 +1642,7 @@ zt_pipe_fini(void *arg)
zt_pipe *p = arg;
zt_node *ztn = p->zp_ztn;
- nni_aio_fini(p->zp_ping_aio);
+ nni_aio_free(p->zp_ping_aio);
// This tosses the connection details and all state.
nni_mtx_lock(&zt_lk);
@@ -1705,7 +1705,7 @@ zt_pipe_alloc(
rv = nni_idhash_insert(ztn->zn_lpipes, laddr, p);
}
if ((rv != 0) ||
- ((rv = nni_aio_init(&p->zp_ping_aio, zt_pipe_ping_cb, p)) != 0)) {
+ ((rv = nni_aio_alloc(&p->zp_ping_aio, zt_pipe_ping_cb, p)) != 0)) {
zt_pipe_reap(p);
return (rv);
}
@@ -2078,7 +2078,7 @@ zt_ep_fini(void *arg)
{
zt_ep *ep = arg;
nni_aio_stop(ep->ze_creq_aio);
- nni_aio_fini(ep->ze_creq_aio);
+ nni_aio_free(ep->ze_creq_aio);
NNI_FREE_STRUCT(ep);
}
@@ -2154,7 +2154,7 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, nni_dialer *ndialer,
nni_aio_list_init(&ep->ze_aios);
- rv = nni_aio_init(&ep->ze_creq_aio, zt_ep_conn_req_cb, ep);
+ rv = nni_aio_alloc(&ep->ze_creq_aio, zt_ep_conn_req_cb, ep);
if (rv != 0) {
zt_ep_fini(ep);
return (rv);