summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/aio.c45
-rw-r--r--src/core/aio.h3
-rw-r--r--src/core/endpt.c9
-rw-r--r--src/core/url.c4
-rw-r--r--src/nng.c2
-rw-r--r--src/platform/posix/posix_epdesc.c5
-rw-r--r--src/platform/windows/win_ipc.c6
-rw-r--r--src/platform/windows/win_tcp.c10
-rw-r--r--src/supplemental/http/http_client.c2
-rw-r--r--src/supplemental/http/http_public.c2
-rw-r--r--src/supplemental/http/http_server.c2
-rw-r--r--src/supplemental/websocket/websocket.c9
-rw-r--r--src/transport/inproc/inproc.c135
-rw-r--r--src/transport/ipc/ipc.c8
-rw-r--r--src/transport/tcp/tcp.c8
-rw-r--r--src/transport/tls/tls.c8
-rw-r--r--src/transport/ws/websocket.c16
-rw-r--r--src/transport/zerotier/zerotier.c12
-rw-r--r--tests/httpserver.c2
-rw-r--r--tests/resolv.c22
20 files changed, 137 insertions, 173 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index ee1ddf2a..1c293020 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -74,17 +74,14 @@ struct nng_aio {
// Read/write operations.
nni_iov *a_iov;
- int a_niov;
+ unsigned a_niov;
nni_iov a_iovinl[4]; // inline IOVs - when the IOV list is short
nni_iov *a_iovalloc; // dynamically allocated IOVs
- int a_niovalloc; // number of allocated IOVs
+ unsigned a_niovalloc; // number of allocated IOVs
// Message operations.
nni_msg *a_msg;
- // Connect/accept operations.
- void *a_pipe; // opaque pipe handle
-
// User scratch data. Consumers may store values here, which
// must be preserved by providers and the framework.
void *a_user_data[4];
@@ -218,18 +215,6 @@ nni_aio_get_msg(nni_aio *aio)
}
void
-nni_aio_set_pipe(nni_aio *aio, void *p)
-{
- aio->a_pipe = p;
-}
-
-void *
-nni_aio_get_pipe(nni_aio *aio)
-{
- return (aio->a_pipe);
-}
-
-void
nni_aio_set_data(nni_aio *aio, int index, void *data)
{
if ((index >= 0) && (index < NNI_NUM_ELEMENTS(aio->a_user_data))) {
@@ -330,6 +315,9 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
aio->a_prov_cancel = cancelfn;
aio->a_prov_data = data;
aio->a_active = 1;
+ for (int i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
+ aio->a_outputs[i] = NULL;
+ }
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
@@ -370,8 +358,7 @@ nni_aio_abort(nni_aio *aio, int rv)
// I/O provider related functions.
static void
-nni_aio_finish_impl(
- nni_aio *aio, int result, size_t count, void *pipe, nni_msg *msg)
+nni_aio_finish_impl(nni_aio *aio, int rv, size_t count, nni_msg *msg)
{
nni_mtx_lock(&nni_aio_lk);
@@ -380,12 +367,9 @@ nni_aio_finish_impl(
nni_list_node_remove(&aio->a_expire_node);
aio->a_pend = 1;
- aio->a_result = result;
+ aio->a_result = rv;
aio->a_count = count;
aio->a_prov_cancel = NULL;
- if (pipe) {
- aio->a_pipe = pipe;
- }
if (msg) {
aio->a_msg = msg;
}
@@ -409,27 +393,20 @@ nni_aio_finish_impl(
void
nni_aio_finish(nni_aio *aio, int result, size_t count)
{
- nni_aio_finish_impl(aio, result, count, NULL, NULL);
+ nni_aio_finish_impl(aio, result, count, NULL);
}
void
nni_aio_finish_error(nni_aio *aio, int result)
{
- nni_aio_finish_impl(aio, result, 0, NULL, NULL);
-}
-
-void
-nni_aio_finish_pipe(nni_aio *aio, void *pipe)
-{
- NNI_ASSERT(pipe != NULL);
- nni_aio_finish_impl(aio, 0, 0, pipe, NULL);
+ nni_aio_finish_impl(aio, result, 0, NULL);
}
void
nni_aio_finish_msg(nni_aio *aio, nni_msg *msg)
{
NNI_ASSERT(msg != NULL);
- nni_aio_finish_impl(aio, 0, nni_msg_len(msg), NULL, msg);
+ nni_aio_finish_impl(aio, 0, nni_msg_len(msg), msg);
}
void
@@ -609,7 +586,7 @@ nni_aio_iov_count(nni_aio *aio)
{
size_t resid = 0;
- for (int i = 0; i < aio->a_niov; i++) {
+ for (unsigned i = 0; i < aio->a_niov; i++) {
resid += aio->a_iov[i].iov_len;
}
return (resid);
diff --git a/src/core/aio.h b/src/core/aio.h
index b17c8e97..718eeb91 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -78,8 +78,6 @@ extern void *nni_aio_get_output(nni_aio *, int);
// XXX: These should be refactored in terms of generic inputs and outputs.
extern void nni_aio_set_msg(nni_aio *, nni_msg *);
extern nni_msg *nni_aio_get_msg(nni_aio *);
-extern void nni_aio_set_pipe(nni_aio *, void *);
-extern void * nni_aio_get_pipe(nni_aio *);
// nni_aio_set_synch sets a synchronous completion flag on the AIO.
// When this is set, the next time the AIO is completed, the callback
@@ -126,7 +124,6 @@ extern int nni_aio_list_active(nni_aio *);
// nni_aio_finish is called by the provider when an operation is complete.
extern void nni_aio_finish(nni_aio *, int, size_t);
extern void nni_aio_finish_error(nni_aio *, int);
-extern void nni_aio_finish_pipe(nni_aio *, void *);
extern void nni_aio_finish_msg(nni_aio *, nni_msg *);
// nni_aio_abort is used to abort an operation. Any pending I/O or
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 4a2c3097..b7167ad7 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -364,7 +364,7 @@ nni_ep_con_cb(void *arg)
int rv;
if ((rv = nni_aio_result(aio)) == 0) {
- rv = nni_pipe_create(ep, nni_aio_get_pipe(aio));
+ rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0));
}
nni_mtx_lock(&ep->ep_mtx);
switch (rv) {
@@ -448,7 +448,7 @@ nni_ep_dial(nni_ep *ep, int flags)
// As we're synchronous, we also have to handle the completion.
if (((rv = nni_aio_result(aio)) != 0) ||
- ((rv = nni_pipe_create(ep, nni_aio_get_pipe(aio))) != 0)) {
+ ((rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0))) != 0)) {
nni_mtx_lock(&ep->ep_mtx);
ep->ep_started = 0;
nni_mtx_unlock(&ep->ep_mtx);
@@ -464,8 +464,8 @@ nni_ep_acc_cb(void *arg)
int rv;
if ((rv = nni_aio_result(aio)) == 0) {
- NNI_ASSERT(nni_aio_get_pipe(aio) != NULL);
- rv = nni_pipe_create(ep, nni_aio_get_pipe(aio));
+ NNI_ASSERT(nni_aio_get_output(aio, 0) != NULL);
+ rv = nni_pipe_create(ep, nni_aio_get_output(aio, 0));
}
nni_mtx_lock(&ep->ep_mtx);
@@ -503,7 +503,6 @@ nni_ep_acc_start(nni_ep *ep)
if (ep->ep_closing) {
return;
}
- nni_aio_set_pipe(aio, NULL);
ep->ep_ops.ep_accept(ep->ep_data, aio);
}
diff --git a/src/core/url.c b/src/core/url.c
index 6a2a4e53..2cdb43c2 100644
--- a/src/core/url.c
+++ b/src/core/url.c
@@ -284,7 +284,7 @@ nni_url_parse(nni_url **urlp, const char *raw)
rv = NNG_ENOMEM;
goto error;
}
- for (int i = 0; i < len; i++) {
+ for (size_t i = 0; i < len; i++) {
url->u_scheme[i] = tolower(s[i]);
}
url->u_scheme[len] = '\0';
@@ -334,7 +334,7 @@ nni_url_parse(nni_url **urlp, const char *raw)
}
// Copy the host portion, but make it lower case (hostnames are
// case insensitive).
- for (int i = 0; i < len; i++) {
+ for (size_t i = 0; i < len; i++) {
url->u_host[i] = tolower(s[i]);
}
url->u_host[len] = '\0';
diff --git a/src/nng.c b/src/nng.c
index ef5b5292..f552e26b 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -1140,7 +1140,7 @@ void
nng_aio_finish(nng_aio *aio, int rv)
{
// Preserve the count.
- return (nni_aio_finish(aio, rv, nni_aio_count(aio)));
+ nni_aio_finish(aio, rv, nni_aio_count(aio));
}
#if 0
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index 931ed052..b5e65826 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -74,7 +74,8 @@ nni_posix_epdesc_finish(nni_aio *aio, int rv, int newfd)
if (rv != 0) {
nni_aio_finish_error(aio, rv);
} else {
- nni_aio_finish_pipe(aio, pd);
+ nni_aio_set_output(aio, 0, pd);
+ nni_aio_finish(aio, 0, 0);
}
}
@@ -317,7 +318,6 @@ nni_posix_epdesc_accept(nni_posix_epdesc *ed, nni_aio *aio)
// connection is ready for us. There isn't anything else for us to
// do really, as that will have been done in listen.
nni_mtx_lock(&ed->mtx);
- nni_aio_set_pipe(aio, NULL);
// If we can't start, it means that the AIO was stopped.
if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
nni_mtx_unlock(&ed->mtx);
@@ -343,7 +343,6 @@ nni_posix_epdesc_connect(nni_posix_epdesc *ed, nni_aio *aio)
int fd;
nni_mtx_lock(&ed->mtx);
- nni_aio_set_pipe(aio, NULL);
// If we can't start, it means that the AIO was stopped.
if ((rv = nni_aio_start(aio, nni_posix_epdesc_cancel, ed)) != 0) {
nni_mtx_unlock(&ed->mtx);
diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c
index 63ba8a67..9851af25 100644
--- a/src/platform/windows/win_ipc.c
+++ b/src/platform/windows/win_ipc.c
@@ -321,7 +321,8 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio)
return;
}
- nni_aio_finish_pipe(aio, pipe);
+ nni_aio_set_output(aio, 0, pipe);
+ nni_aio_finish(aio, 0, 0);
}
static void
@@ -448,7 +449,8 @@ nni_win_ipc_conn_thr(void *arg)
((rv = nni_win_iocp_register(p)) != 0)) {
goto fail;
}
- nni_aio_finish_pipe(aio, pipe);
+ nni_aio_set_output(aio, 0, pipe);
+ nni_aio_finish(aio, 0, 0);
continue;
fail:
diff --git a/src/platform/windows/win_tcp.c b/src/platform/windows/win_tcp.c
index 254cf40b..6d58d495 100644
--- a/src/platform/windows/win_tcp.c
+++ b/src/platform/windows/win_tcp.c
@@ -105,7 +105,7 @@ nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio)
DWORD niov;
DWORD flags;
nni_plat_tcp_pipe *pipe = evt->ptr;
- int i;
+ unsigned i;
unsigned naiov;
nni_iov * aiov;
WSABUF * iov;
@@ -469,7 +469,8 @@ nni_win_tcp_acc_finish(nni_win_event *evt, nni_aio *aio)
memcpy(&pipe->sockname, sa1, len1);
memcpy(&pipe->peername, sa2, len2);
- nni_aio_finish_pipe(aio, pipe);
+ nni_aio_set_output(aio, 0, pipe);
+ nni_aio_finish(aio, 0, 0);
}
static int
@@ -512,7 +513,6 @@ nni_win_tcp_acc_start(nni_win_event *evt, nni_aio *aio)
void
nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio)
{
- nni_aio_set_pipe(aio, NULL);
nni_win_event_submit(&ep->acc_ev, aio);
}
@@ -562,7 +562,8 @@ nni_win_tcp_con_finish(nni_win_event *evt, nni_aio *aio)
len = sizeof(pipe->sockname);
(void) getsockname(s, (SOCKADDR *) &pipe->sockname, &len);
- nni_aio_finish_pipe(aio, pipe);
+ nni_aio_set_output(aio, 0, pipe);
+ nni_aio_finish(aio, 0, 0);
}
static int
@@ -632,7 +633,6 @@ nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio)
extern void
nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio)
{
- nni_aio_set_pipe(aio, NULL);
nni_win_event_submit(&ep->con_ev, aio);
}
diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c
index 918b7b09..9058b6e2 100644
--- a/src/supplemental/http/http_client.c
+++ b/src/supplemental/http/http_client.c
@@ -44,7 +44,7 @@ http_conn_done(void *arg)
nni_mtx_lock(&c->mtx);
rv = nni_aio_result(c->connaio);
- p = rv == 0 ? nni_aio_get_pipe(c->connaio) : NULL;
+ p = rv == 0 ? nni_aio_get_output(c->connaio, 0) : NULL;
if ((aio = nni_list_first(&c->aios)) == NULL) {
if (p != NULL) {
nni_plat_tcp_pipe_fini(p);
diff --git a/src/supplemental/http/http_public.c b/src/supplemental/http/http_public.c
index 9d23ed8e..5437a000 100644
--- a/src/supplemental/http/http_public.c
+++ b/src/supplemental/http/http_public.c
@@ -737,7 +737,7 @@ void
nng_http_client_connect(nng_http_client *cli, nng_aio *aio)
{
#ifdef NNG_SUPP_HTTP
- return (nni_http_client_connect(cli, aio));
+ nni_http_client_connect(cli, aio);
#else
NNI_ARG_UNUSED(cli);
if (nni_aio_start(aio, NULL, NULL)) {
diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c
index 4a426c8c..4c89708d 100644
--- a/src/supplemental/http/http_server.c
+++ b/src/supplemental/http/http_server.c
@@ -719,7 +719,7 @@ http_server_acccb(void *arg)
nni_mtx_unlock(&s->mtx);
return;
}
- tcp = nni_aio_get_pipe(aio);
+ tcp = nni_aio_get_output(aio, 0);
if (s->closed) {
// If we're closing, then reject this one.
nni_plat_tcp_pipe_fini(tcp);
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index ad4ce196..05a2c62c 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -1174,7 +1174,8 @@ ws_http_cb_listener(nni_ws *ws, nni_aio *aio)
ws->ready = true;
if ((aio = nni_list_first(&l->aios)) != NULL) {
nni_list_remove(&l->aios, aio);
- nni_aio_finish_pipe(aio, ws);
+ nni_aio_set_output(aio, 0, ws);
+ nni_aio_finish(aio, 0, 0);
} else {
nni_list_append(&l->pend, ws);
}
@@ -1269,7 +1270,8 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio)
nni_list_remove(&d->wspend, ws);
ws->ready = true;
ws->useraio = NULL;
- nni_aio_finish_pipe(uaio, ws);
+ nni_aio_set_output(uaio, 0, ws);
+ nni_aio_finish(uaio, 0, 0);
nni_mtx_unlock(&d->mtx);
return;
err:
@@ -1621,7 +1623,8 @@ nni_ws_listener_accept(nni_ws_listener *l, nni_aio *aio)
}
if ((ws = nni_list_first(&l->pend)) != NULL) {
nni_list_remove(&l->pend, ws);
- nni_aio_finish_pipe(aio, ws);
+ nni_aio_set_output(aio, 0, ws);
+ nni_aio_finish(aio, 0, 0);
} else {
nni_list_append(&l->aios, aio);
}
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 6329a627..f9f438ab 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -217,14 +217,11 @@ nni_inproc_ep_fini(void *arg)
}
static void
-nni_inproc_conn_finish(nni_aio *aio, int rv)
+nni_inproc_conn_finish(nni_aio *aio, int rv, nni_inproc_pipe *pipe)
{
nni_inproc_ep *ep = nni_aio_get_prov_data(aio);
- void * pipe;
nni_aio_list_remove(aio);
- pipe = nni_aio_get_pipe(aio);
- nni_aio_set_pipe(aio, NULL);
if ((ep != NULL) && (ep->mode != NNI_EP_MODE_LISTEN) &&
nni_list_empty(&ep->aios)) {
@@ -232,11 +229,10 @@ nni_inproc_conn_finish(nni_aio *aio, int rv)
}
if (rv == 0) {
- nni_aio_finish_pipe(aio, pipe);
+ nni_aio_set_output(aio, 0, pipe);
+ nni_aio_finish(aio, 0, 0);
} else {
- if (pipe != NULL) {
- nni_inproc_pipe_fini(pipe);
- }
+ NNI_ASSERT(pipe == NULL);
nni_aio_finish_error(aio, rv);
}
}
@@ -255,67 +251,81 @@ nni_inproc_ep_close(void *arg)
// Notify any waiting clients that we are closed.
while ((client = nni_list_first(&ep->clients)) != NULL) {
while ((aio = nni_list_first(&client->aios)) != NULL) {
- nni_inproc_conn_finish(aio, NNG_ECONNREFUSED);
+ nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL);
}
nni_list_remove(&ep->clients, client);
}
while ((aio = nni_list_first(&ep->aios)) != NULL) {
- nni_inproc_conn_finish(aio, NNG_ECLOSED);
+ nni_inproc_conn_finish(aio, NNG_ECLOSED, NULL);
}
nni_mtx_unlock(&nni_inproc.mx);
}
static void
-nni_inproc_accept_clients(nni_inproc_ep *server)
+nni_inproc_accept_clients(nni_inproc_ep *srv)
{
- nni_inproc_ep * client, *nclient;
- nni_aio * saio, *caio;
- nni_inproc_pair *pair;
- int rv;
+ nni_inproc_ep *cli, *nclient;
+
+ nclient = nni_list_first(&srv->clients);
+ while ((cli = nclient) != NULL) {
+ nni_aio *caio;
+ nclient = nni_list_next(&srv->clients, nclient);
+ NNI_LIST_FOREACH (&cli->aios, caio) {
- nclient = nni_list_first(&server->clients);
- while ((client = nclient) != NULL) {
- nclient = nni_list_next(&server->clients, nclient);
- NNI_LIST_FOREACH (&client->aios, caio) {
- if ((saio = nni_list_first(&server->aios)) == NULL) {
+ nni_inproc_pipe *cpipe;
+ nni_inproc_pipe *spipe;
+ nni_inproc_pair *pair;
+ nni_aio * saio;
+ int rv;
+
+ if ((saio = nni_list_first(&srv->aios)) == NULL) {
// No outstanding accept() calls.
break;
}
if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) {
- nni_inproc_conn_finish(caio, NNG_ENOMEM);
- nni_inproc_conn_finish(saio, NNG_ENOMEM);
+ nni_inproc_conn_finish(caio, NNG_ENOMEM, NULL);
+ nni_inproc_conn_finish(saio, NNG_ENOMEM, NULL);
continue;
}
+ nni_mtx_init(&pair->mx);
- if (((rv = nni_msgq_init(&pair->q[0], 4)) != 0) ||
+ spipe = cpipe = NULL;
+ if (((rv = nni_inproc_pipe_init(&cpipe, cli)) != 0) ||
+ ((rv = nni_inproc_pipe_init(&spipe, srv)) != 0) ||
+ ((rv = nni_msgq_init(&pair->q[0], 4)) != 0) ||
((rv = nni_msgq_init(&pair->q[1], 4)) != 0)) {
+
+ if (cpipe != NULL) {
+ nni_inproc_pipe_fini(cpipe);
+ }
+ if (spipe != NULL) {
+ nni_inproc_pipe_fini(spipe);
+ }
+ nni_inproc_conn_finish(caio, rv, NULL);
+ nni_inproc_conn_finish(saio, rv, NULL);
nni_inproc_pair_destroy(pair);
- nni_inproc_conn_finish(caio, rv);
- nni_inproc_conn_finish(saio, rv);
continue;
}
- nni_mtx_init(&pair->mx);
-
- pair->pipes[0] = nni_aio_get_pipe(caio);
- pair->pipes[1] = nni_aio_get_pipe(saio);
- pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0];
- pair->pipes[1]->rq = pair->pipes[0]->wq = pair->q[1];
- pair->pipes[0]->pair = pair->pipes[1]->pair = pair;
- pair->pipes[1]->peer = pair->pipes[0]->proto;
- pair->pipes[0]->peer = pair->pipes[1]->proto;
- pair->refcnt = 2;
-
- nni_inproc_conn_finish(caio, 0);
- nni_inproc_conn_finish(saio, 0);
+ spipe->peer = cpipe->proto;
+ cpipe->peer = spipe->proto;
+ pair->pipes[0] = cpipe;
+ pair->pipes[1] = spipe;
+ pair->refcnt = 2;
+ cpipe->pair = spipe->pair = pair;
+ cpipe->rq = spipe->wq = pair->q[0];
+ cpipe->wq = spipe->rq = pair->q[1];
+
+ nni_inproc_conn_finish(caio, 0, cpipe);
+ nni_inproc_conn_finish(saio, 0, spipe);
}
- if (nni_list_first(&client->aios) == NULL) {
- // No more outstanding client connects. Normally
- // there should only be one.
- if (nni_list_active(&server->clients, client)) {
- nni_list_remove(&server->clients, client);
+ if (nni_list_first(&cli->aios) == NULL) {
+ // No more outstanding client connects.
+ // Normally there should only be one.
+ if (nni_list_active(&srv->clients, cli)) {
+ nni_list_remove(&srv->clients, cli);
}
}
}
@@ -324,17 +334,12 @@ nni_inproc_accept_clients(nni_inproc_ep *server)
static void
nni_inproc_ep_cancel(nni_aio *aio, int rv)
{
- nni_inproc_ep * ep = nni_aio_get_prov_data(aio);
- nni_inproc_pipe *pipe;
+ nni_inproc_ep *ep = nni_aio_get_prov_data(aio);
nni_mtx_lock(&nni_inproc.mx);
if (nni_aio_list_active(aio)) {
nni_aio_list_remove(aio);
nni_list_node_remove(&ep->node);
- if ((pipe = nni_aio_get_pipe(aio)) != NULL) {
- nni_aio_set_pipe(aio, NULL);
- nni_inproc_pipe_fini(pipe);
- }
nni_aio_finish_error(aio, rv);
}
nni_mtx_unlock(&nni_inproc.mx);
@@ -343,10 +348,9 @@ nni_inproc_ep_cancel(nni_aio *aio, int rv)
static void
nni_inproc_ep_connect(void *arg, nni_aio *aio)
{
- nni_inproc_ep * ep = arg;
- nni_inproc_ep * server;
- int rv;
- nni_inproc_pipe *pipe;
+ nni_inproc_ep *ep = arg;
+ nni_inproc_ep *server;
+ int rv;
if (ep->mode != NNI_EP_MODE_DIAL) {
nni_aio_finish_error(aio, NNG_EINVAL);
@@ -359,13 +363,6 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
return;
}
- if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) {
- nni_aio_finish_error(aio, rv);
- nni_mtx_unlock(&nni_inproc.mx);
- return;
- }
- nni_aio_set_pipe(aio, pipe);
-
// Find a server.
NNI_LIST_FOREACH (&nni_inproc.servers, server) {
if (strcmp(server->addr, ep->addr) == 0) {
@@ -373,7 +370,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio)
}
}
if (server == NULL) {
- nni_inproc_conn_finish(aio, NNG_ECONNREFUSED);
+ nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL);
nni_mtx_unlock(&nni_inproc.mx);
return;
}
@@ -407,9 +404,8 @@ nni_inproc_ep_bind(void *arg)
static void
nni_inproc_ep_accept(void *arg, nni_aio *aio)
{
- nni_inproc_ep * ep = arg;
- nni_inproc_pipe *pipe;
- int rv;
+ nni_inproc_ep *ep = arg;
+ int rv;
nni_mtx_lock(&nni_inproc.mx);
@@ -419,16 +415,7 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio)
}
// We are already on the master list of servers, thanks to bind.
-
- if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) {
- nni_aio_finish_error(aio, rv);
- nni_mtx_unlock(&nni_inproc.mx);
- return;
- }
- nni_aio_set_pipe(aio, pipe);
-
- // Insert us into the pending server aios, and then run the
- // accept list.
+ // Insert us into pending server aios, and then run accept list.
nni_aio_list_append(&ep->aios, aio);
nni_inproc_accept_clients(ep);
nni_mtx_unlock(&nni_inproc.mx);
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 6475e43b..9ebf3145 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -579,20 +579,20 @@ nni_ipc_ep_finish(nni_ipc_ep *ep)
if ((rv = nni_aio_result(ep->aio)) != 0) {
goto done;
}
- NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL);
+ NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL);
// Attempt to allocate the parent pipe. If this fails we'll
// drop the connection (ENOMEM probably).
- rv = nni_ipc_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio));
+ rv = nni_ipc_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0));
done:
- nni_aio_set_pipe(ep->aio, NULL);
aio = ep->user_aio;
ep->user_aio = NULL;
if ((aio != NULL) && (rv == 0)) {
NNI_ASSERT(pipe != NULL);
- nni_aio_finish_pipe(aio, pipe);
+ nni_aio_set_output(aio, 0, pipe);
+ nni_aio_finish(aio, 0, 0);
return;
}
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 1504d0ee..475a77ff 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -631,19 +631,19 @@ nni_tcp_ep_finish(nni_tcp_ep *ep)
if ((rv = nni_aio_result(ep->aio)) != 0) {
goto done;
}
- NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL);
+ NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL);
// Attempt to allocate the parent pipe. If this fails we'll
// drop the connection (ENOMEM probably).
- rv = nni_tcp_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio));
+ rv = nni_tcp_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0));
done:
- nni_aio_set_pipe(ep->aio, NULL);
aio = ep->user_aio;
ep->user_aio = NULL;
if ((aio != NULL) && (rv == 0)) {
- nni_aio_finish_pipe(aio, pipe);
+ nni_aio_set_output(aio, 0, pipe);
+ nni_aio_finish(aio, 0, 0);
return;
}
if (pipe != NULL) {
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index 09c59582..fc993501 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -653,19 +653,19 @@ nni_tls_ep_finish(nni_tls_ep *ep)
if ((rv = nni_aio_result(ep->aio)) != 0) {
goto done;
}
- NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL);
+ NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL);
// Attempt to allocate the parent pipe. If this fails we'll
// drop the connection (ENOMEM probably).
- rv = nni_tls_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio));
+ rv = nni_tls_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0));
done:
- nni_aio_set_pipe(ep->aio, NULL);
aio = ep->user_aio;
ep->user_aio = NULL;
if ((aio != NULL) && (rv == 0)) {
- nni_aio_finish_pipe(aio, pipe);
+ nni_aio_set_output(aio, 0, pipe);
+ nni_aio_finish(aio, 0, 0);
return;
}
if (pipe != NULL) {
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
index 2fe7804c..9740cdde 100644
--- a/src/transport/ws/websocket.c
+++ b/src/transport/ws/websocket.c
@@ -205,7 +205,6 @@ ws_pipe_init(ws_pipe **pipep, ws_ep *ep, void *ws)
{
ws_pipe *p;
int rv;
- nni_aio *aio;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
@@ -225,11 +224,6 @@ ws_pipe_init(ws_pipe **pipep, ws_ep *ep, void *ws)
p->lproto = ep->lproto;
p->ws = ws;
- if ((aio = nni_list_first(&ep->aios)) != NULL) {
- nni_aio_list_remove(aio);
- nni_aio_finish_pipe(aio, p);
- }
-
*pipep = p;
return (0);
}
@@ -621,7 +615,7 @@ ws_ep_conn_cb(void *arg)
nni_mtx_lock(&ep->mtx);
if (nni_aio_result(caio) == 0) {
- ws = nni_aio_get_pipe(caio);
+ ws = nni_aio_get_output(caio, 0);
}
if ((uaio = nni_list_first(&ep->aios)) == NULL) {
// The client stopped caring about this!
@@ -639,7 +633,8 @@ ws_ep_conn_cb(void *arg)
nni_ws_fini(ws);
nni_aio_finish_error(uaio, rv);
} else {
- nni_aio_finish_pipe(uaio, p);
+ nni_aio_set_output(uaio, 0, p);
+ nni_aio_finish(uaio, 0, 0);
}
nni_mtx_unlock(&ep->mtx);
}
@@ -672,7 +667,7 @@ ws_ep_acc_cb(void *arg)
nni_aio_finish_error(uaio, rv);
}
} else {
- nni_ws *ws = nni_aio_get_pipe(aaio);
+ nni_ws *ws = nni_aio_get_output(aaio, 0);
if (uaio != NULL) {
ws_pipe *p;
// Make a pipe
@@ -681,7 +676,8 @@ ws_ep_acc_cb(void *arg)
nni_ws_close(ws);
nni_aio_finish_error(uaio, rv);
} else {
- nni_aio_finish_pipe(uaio, p);
+ nni_aio_set_output(uaio, 0, p);
+ nni_aio_finish(uaio, 0, 0);
}
}
}
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index d6762d25..6dddbb9a 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -690,7 +690,8 @@ zt_ep_recv_conn_ack(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len)
nni_idhash_remove(ztn->zn_eps, ep->ze_laddr);
ep->ze_laddr = 0;
- nni_aio_finish_pipe(aio, p);
+ nni_aio_set_output(aio, 0, p);
+ nni_aio_finish(aio, 0, 0);
}
static void
@@ -2351,7 +2352,8 @@ zt_ep_doaccept(zt_ep *ep)
}
p->zp_peer = creq.cr_proto;
zt_pipe_send_conn_ack(p);
- nni_aio_finish_pipe(aio, p);
+ nni_aio_set_output(aio, 0, p);
+ nni_aio_finish(aio, 0, 0);
}
}
@@ -2398,14 +2400,16 @@ zt_ep_conn_req_cb(void *arg)
ep->ze_creq_active = 0;
switch ((rv = nni_aio_result(aio))) {
case 0:
+ p = nni_aio_get_output(aio, 0);
// Already canceled, or already handled?
if ((uaio = nni_list_first(&ep->ze_aios)) != NULL) {
nni_aio_list_remove(uaio);
- nni_aio_finish_pipe(uaio, nni_aio_get_pipe(aio));
+ nni_aio_set_output(uaio, 0, p);
+ nni_aio_finish(uaio, 0, 0);
} else {
// We have a pipe, but nowhere to stick it.
// Just discard it.
- zt_pipe_fini(nni_aio_get_pipe(aio));
+ zt_pipe_fini(p);
}
ep->ze_creq_try = 0;
break;
diff --git a/tests/httpserver.c b/tests/httpserver.c
index a62bae25..bbdd8423 100644
--- a/tests/httpserver.c
+++ b/tests/httpserver.c
@@ -200,7 +200,7 @@ TestMain("HTTP Server", {
nng_aio_wait(aio);
So(nng_aio_result(aio) == 0);
- h = nni_aio_get_output(aio, 0);
+ h = nng_aio_get_output(aio, 0);
So(h != NULL);
So(nng_http_req_alloc(&req, url) == 0);
So(nng_http_res_alloc(&res) == 0);
diff --git a/tests/resolv.c b/tests/resolv.c
index 607b3b11..9d83e6df 100644
--- a/tests/resolv.c
+++ b/tests/resolv.c
@@ -52,20 +52,20 @@ ip6tostr(void *addr)
// the normal assumptions on Linux do *not* hold true.
#if 0
Convey("Localhost IPv6 resolves", {
- nni_aio aio;
- memset(&aio, 0, sizeof (aio));
+ nng_aio *aio;
const char *str;
- nni_aio_init(&aio, NULL, NULL);
+ nng_sockaddr sa;
+ So(nng_aio_alloc(&aio, NULL, NULL) == 0);
+ So(nng_aio_set_input(aio, 0, &sa) == 0);
nni_plat_tcp_resolv("localhost", "80", NNG_AF_INET6, 1,
- &aio);
- nni_aio_wait(&aio);
- So(nni_aio_result(&aio) == 0);
- So(aio.a_naddrs == 1);
- So(aio.a_addrs[0].s_un.s_in6.sa_family == NNG_AF_INET6);
- So(aio.a_addrs[0].s_un.s_in6.sa_port == ntohs(80));
- str = ip6tostr(&aio.a_addrs[0].s_un.s_in6.sa_addr);
+ aio);
+ nng_aio_wait(aio);
+ So(nng_aio_result(aio) == 0);
+ So(sa.s_un.s_in6.sa_family == NNG_AF_INET6);
+ So(sa.s_un.s_in6.sa_port == ntohs(80));
+ str = ip6tostr(&sa.s_un.s_in6.sa_addr);
So(strcmp(str, "::1") == 0);
- nni_aio_fini(&aio);
+ nng_aio_free(aio);
}
#endif