diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-02-08 14:16:23 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-02-08 18:54:09 -0800 |
| commit | d606317f5c028fa8fba5d5384b0ccd90ffa4eab5 (patch) | |
| tree | 3a6a70f6f2dc81b8722134752716f1c58bd1825c /src/transport | |
| parent | e9efefca683b244b40f831c554d7c72a745b8372 (diff) | |
| download | nng-d606317f5c028fa8fba5d5384b0ccd90ffa4eab5.tar.gz nng-d606317f5c028fa8fba5d5384b0ccd90ffa4eab5.tar.bz2 nng-d606317f5c028fa8fba5d5384b0ccd90ffa4eab5.zip | |
fixes #171 Refactor aio to use generic data fields
This addresses the use of the pipe special field, and eliminates it.
The message APIs (recvmsg, sendmsg) need to be updated as well still,
but I want to handle that as part of a separate issue.
While here we fixed various compiler warnings, etc.
Diffstat (limited to 'src/transport')
| -rw-r--r-- | src/transport/inproc/inproc.c | 135 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 8 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 8 | ||||
| -rw-r--r-- | src/transport/tls/tls.c | 8 | ||||
| -rw-r--r-- | src/transport/ws/websocket.c | 16 | ||||
| -rw-r--r-- | src/transport/zerotier/zerotier.c | 12 |
6 files changed, 87 insertions, 100 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 6329a627..f9f438ab 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -217,14 +217,11 @@ nni_inproc_ep_fini(void *arg) } static void -nni_inproc_conn_finish(nni_aio *aio, int rv) +nni_inproc_conn_finish(nni_aio *aio, int rv, nni_inproc_pipe *pipe) { nni_inproc_ep *ep = nni_aio_get_prov_data(aio); - void * pipe; nni_aio_list_remove(aio); - pipe = nni_aio_get_pipe(aio); - nni_aio_set_pipe(aio, NULL); if ((ep != NULL) && (ep->mode != NNI_EP_MODE_LISTEN) && nni_list_empty(&ep->aios)) { @@ -232,11 +229,10 @@ nni_inproc_conn_finish(nni_aio *aio, int rv) } if (rv == 0) { - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); } else { - if (pipe != NULL) { - nni_inproc_pipe_fini(pipe); - } + NNI_ASSERT(pipe == NULL); nni_aio_finish_error(aio, rv); } } @@ -255,67 +251,81 @@ nni_inproc_ep_close(void *arg) // Notify any waiting clients that we are closed. while ((client = nni_list_first(&ep->clients)) != NULL) { while ((aio = nni_list_first(&client->aios)) != NULL) { - nni_inproc_conn_finish(aio, NNG_ECONNREFUSED); + nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL); } nni_list_remove(&ep->clients, client); } while ((aio = nni_list_first(&ep->aios)) != NULL) { - nni_inproc_conn_finish(aio, NNG_ECLOSED); + nni_inproc_conn_finish(aio, NNG_ECLOSED, NULL); } nni_mtx_unlock(&nni_inproc.mx); } static void -nni_inproc_accept_clients(nni_inproc_ep *server) +nni_inproc_accept_clients(nni_inproc_ep *srv) { - nni_inproc_ep * client, *nclient; - nni_aio * saio, *caio; - nni_inproc_pair *pair; - int rv; + nni_inproc_ep *cli, *nclient; + + nclient = nni_list_first(&srv->clients); + while ((cli = nclient) != NULL) { + nni_aio *caio; + nclient = nni_list_next(&srv->clients, nclient); + NNI_LIST_FOREACH (&cli->aios, caio) { - nclient = nni_list_first(&server->clients); - while ((client = nclient) != NULL) { - nclient = nni_list_next(&server->clients, nclient); - NNI_LIST_FOREACH (&client->aios, caio) { - if ((saio = nni_list_first(&server->aios)) == NULL) { + nni_inproc_pipe *cpipe; + nni_inproc_pipe *spipe; + nni_inproc_pair *pair; + nni_aio * saio; + int rv; + + if ((saio = nni_list_first(&srv->aios)) == NULL) { // No outstanding accept() calls. break; } if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) { - nni_inproc_conn_finish(caio, NNG_ENOMEM); - nni_inproc_conn_finish(saio, NNG_ENOMEM); + nni_inproc_conn_finish(caio, NNG_ENOMEM, NULL); + nni_inproc_conn_finish(saio, NNG_ENOMEM, NULL); continue; } + nni_mtx_init(&pair->mx); - if (((rv = nni_msgq_init(&pair->q[0], 4)) != 0) || + spipe = cpipe = NULL; + if (((rv = nni_inproc_pipe_init(&cpipe, cli)) != 0) || + ((rv = nni_inproc_pipe_init(&spipe, srv)) != 0) || + ((rv = nni_msgq_init(&pair->q[0], 4)) != 0) || ((rv = nni_msgq_init(&pair->q[1], 4)) != 0)) { + + if (cpipe != NULL) { + nni_inproc_pipe_fini(cpipe); + } + if (spipe != NULL) { + nni_inproc_pipe_fini(spipe); + } + nni_inproc_conn_finish(caio, rv, NULL); + nni_inproc_conn_finish(saio, rv, NULL); nni_inproc_pair_destroy(pair); - nni_inproc_conn_finish(caio, rv); - nni_inproc_conn_finish(saio, rv); continue; } - nni_mtx_init(&pair->mx); - - pair->pipes[0] = nni_aio_get_pipe(caio); - pair->pipes[1] = nni_aio_get_pipe(saio); - pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0]; - pair->pipes[1]->rq = pair->pipes[0]->wq = pair->q[1]; - pair->pipes[0]->pair = pair->pipes[1]->pair = pair; - pair->pipes[1]->peer = pair->pipes[0]->proto; - pair->pipes[0]->peer = pair->pipes[1]->proto; - pair->refcnt = 2; - - nni_inproc_conn_finish(caio, 0); - nni_inproc_conn_finish(saio, 0); + spipe->peer = cpipe->proto; + cpipe->peer = spipe->proto; + pair->pipes[0] = cpipe; + pair->pipes[1] = spipe; + pair->refcnt = 2; + cpipe->pair = spipe->pair = pair; + cpipe->rq = spipe->wq = pair->q[0]; + cpipe->wq = spipe->rq = pair->q[1]; + + nni_inproc_conn_finish(caio, 0, cpipe); + nni_inproc_conn_finish(saio, 0, spipe); } - if (nni_list_first(&client->aios) == NULL) { - // No more outstanding client connects. Normally - // there should only be one. - if (nni_list_active(&server->clients, client)) { - nni_list_remove(&server->clients, client); + if (nni_list_first(&cli->aios) == NULL) { + // No more outstanding client connects. + // Normally there should only be one. + if (nni_list_active(&srv->clients, cli)) { + nni_list_remove(&srv->clients, cli); } } } @@ -324,17 +334,12 @@ nni_inproc_accept_clients(nni_inproc_ep *server) static void nni_inproc_ep_cancel(nni_aio *aio, int rv) { - nni_inproc_ep * ep = nni_aio_get_prov_data(aio); - nni_inproc_pipe *pipe; + nni_inproc_ep *ep = nni_aio_get_prov_data(aio); nni_mtx_lock(&nni_inproc.mx); if (nni_aio_list_active(aio)) { nni_aio_list_remove(aio); nni_list_node_remove(&ep->node); - if ((pipe = nni_aio_get_pipe(aio)) != NULL) { - nni_aio_set_pipe(aio, NULL); - nni_inproc_pipe_fini(pipe); - } nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&nni_inproc.mx); @@ -343,10 +348,9 @@ nni_inproc_ep_cancel(nni_aio *aio, int rv) static void nni_inproc_ep_connect(void *arg, nni_aio *aio) { - nni_inproc_ep * ep = arg; - nni_inproc_ep * server; - int rv; - nni_inproc_pipe *pipe; + nni_inproc_ep *ep = arg; + nni_inproc_ep *server; + int rv; if (ep->mode != NNI_EP_MODE_DIAL) { nni_aio_finish_error(aio, NNG_EINVAL); @@ -359,13 +363,6 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) return; } - if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) { - nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&nni_inproc.mx); - return; - } - nni_aio_set_pipe(aio, pipe); - // Find a server. NNI_LIST_FOREACH (&nni_inproc.servers, server) { if (strcmp(server->addr, ep->addr) == 0) { @@ -373,7 +370,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) } } if (server == NULL) { - nni_inproc_conn_finish(aio, NNG_ECONNREFUSED); + nni_inproc_conn_finish(aio, NNG_ECONNREFUSED, NULL); nni_mtx_unlock(&nni_inproc.mx); return; } @@ -407,9 +404,8 @@ nni_inproc_ep_bind(void *arg) static void nni_inproc_ep_accept(void *arg, nni_aio *aio) { - nni_inproc_ep * ep = arg; - nni_inproc_pipe *pipe; - int rv; + nni_inproc_ep *ep = arg; + int rv; nni_mtx_lock(&nni_inproc.mx); @@ -419,16 +415,7 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) } // We are already on the master list of servers, thanks to bind. - - if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) { - nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&nni_inproc.mx); - return; - } - nni_aio_set_pipe(aio, pipe); - - // Insert us into the pending server aios, and then run the - // accept list. + // Insert us into pending server aios, and then run accept list. nni_aio_list_append(&ep->aios, aio); nni_inproc_accept_clients(ep); nni_mtx_unlock(&nni_inproc.mx); diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 6475e43b..9ebf3145 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -579,20 +579,20 @@ nni_ipc_ep_finish(nni_ipc_ep *ep) if ((rv = nni_aio_result(ep->aio)) != 0) { goto done; } - NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL); + NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL); // Attempt to allocate the parent pipe. If this fails we'll // drop the connection (ENOMEM probably). - rv = nni_ipc_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio)); + rv = nni_ipc_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0)); done: - nni_aio_set_pipe(ep->aio, NULL); aio = ep->user_aio; ep->user_aio = NULL; if ((aio != NULL) && (rv == 0)) { NNI_ASSERT(pipe != NULL); - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); return; } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 1504d0ee..475a77ff 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -631,19 +631,19 @@ nni_tcp_ep_finish(nni_tcp_ep *ep) if ((rv = nni_aio_result(ep->aio)) != 0) { goto done; } - NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL); + NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL); // Attempt to allocate the parent pipe. If this fails we'll // drop the connection (ENOMEM probably). - rv = nni_tcp_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio)); + rv = nni_tcp_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0)); done: - nni_aio_set_pipe(ep->aio, NULL); aio = ep->user_aio; ep->user_aio = NULL; if ((aio != NULL) && (rv == 0)) { - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); return; } if (pipe != NULL) { diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 09c59582..fc993501 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -653,19 +653,19 @@ nni_tls_ep_finish(nni_tls_ep *ep) if ((rv = nni_aio_result(ep->aio)) != 0) { goto done; } - NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL); + NNI_ASSERT(nni_aio_get_output(ep->aio, 0) != NULL); // Attempt to allocate the parent pipe. If this fails we'll // drop the connection (ENOMEM probably). - rv = nni_tls_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio)); + rv = nni_tls_pipe_init(&pipe, ep, nni_aio_get_output(ep->aio, 0)); done: - nni_aio_set_pipe(ep->aio, NULL); aio = ep->user_aio; ep->user_aio = NULL; if ((aio != NULL) && (rv == 0)) { - nni_aio_finish_pipe(aio, pipe); + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); return; } if (pipe != NULL) { diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index 2fe7804c..9740cdde 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -205,7 +205,6 @@ ws_pipe_init(ws_pipe **pipep, ws_ep *ep, void *ws) { ws_pipe *p; int rv; - nni_aio *aio; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); @@ -225,11 +224,6 @@ ws_pipe_init(ws_pipe **pipep, ws_ep *ep, void *ws) p->lproto = ep->lproto; p->ws = ws; - if ((aio = nni_list_first(&ep->aios)) != NULL) { - nni_aio_list_remove(aio); - nni_aio_finish_pipe(aio, p); - } - *pipep = p; return (0); } @@ -621,7 +615,7 @@ ws_ep_conn_cb(void *arg) nni_mtx_lock(&ep->mtx); if (nni_aio_result(caio) == 0) { - ws = nni_aio_get_pipe(caio); + ws = nni_aio_get_output(caio, 0); } if ((uaio = nni_list_first(&ep->aios)) == NULL) { // The client stopped caring about this! @@ -639,7 +633,8 @@ ws_ep_conn_cb(void *arg) nni_ws_fini(ws); nni_aio_finish_error(uaio, rv); } else { - nni_aio_finish_pipe(uaio, p); + nni_aio_set_output(uaio, 0, p); + nni_aio_finish(uaio, 0, 0); } nni_mtx_unlock(&ep->mtx); } @@ -672,7 +667,7 @@ ws_ep_acc_cb(void *arg) nni_aio_finish_error(uaio, rv); } } else { - nni_ws *ws = nni_aio_get_pipe(aaio); + nni_ws *ws = nni_aio_get_output(aaio, 0); if (uaio != NULL) { ws_pipe *p; // Make a pipe @@ -681,7 +676,8 @@ ws_ep_acc_cb(void *arg) nni_ws_close(ws); nni_aio_finish_error(uaio, rv); } else { - nni_aio_finish_pipe(uaio, p); + nni_aio_set_output(uaio, 0, p); + nni_aio_finish(uaio, 0, 0); } } } diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c index d6762d25..6dddbb9a 100644 --- a/src/transport/zerotier/zerotier.c +++ b/src/transport/zerotier/zerotier.c @@ -690,7 +690,8 @@ zt_ep_recv_conn_ack(zt_ep *ep, uint64_t raddr, const uint8_t *data, size_t len) nni_idhash_remove(ztn->zn_eps, ep->ze_laddr); ep->ze_laddr = 0; - nni_aio_finish_pipe(aio, p); + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); } static void @@ -2351,7 +2352,8 @@ zt_ep_doaccept(zt_ep *ep) } p->zp_peer = creq.cr_proto; zt_pipe_send_conn_ack(p); - nni_aio_finish_pipe(aio, p); + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); } } @@ -2398,14 +2400,16 @@ zt_ep_conn_req_cb(void *arg) ep->ze_creq_active = 0; switch ((rv = nni_aio_result(aio))) { case 0: + p = nni_aio_get_output(aio, 0); // Already canceled, or already handled? if ((uaio = nni_list_first(&ep->ze_aios)) != NULL) { nni_aio_list_remove(uaio); - nni_aio_finish_pipe(uaio, nni_aio_get_pipe(aio)); + nni_aio_set_output(uaio, 0, p); + nni_aio_finish(uaio, 0, 0); } else { // We have a pipe, but nowhere to stick it. // Just discard it. - zt_pipe_fini(nni_aio_get_pipe(aio)); + zt_pipe_fini(p); } ep->ze_creq_try = 0; break; |
