diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/sp/transport/tcp/tcp.c | 101 |
1 files changed, 48 insertions, 53 deletions
diff --git a/src/sp/transport/tcp/tcp.c b/src/sp/transport/tcp/tcp.c index 51991cbd..2a4bb9e2 100644 --- a/src/sp/transport/tcp/tcp.c +++ b/src/sp/transport/tcp/tcp.c @@ -41,9 +41,9 @@ struct tcptran_pipe { size_t wantrxhead; nni_list recvq; nni_list sendq; - nni_aio *txaio; - nni_aio *rxaio; - nni_aio *negoaio; + nni_aio txaio; + nni_aio rxaio; + nni_aio negoaio; nni_msg *rxmsg; nni_mtx mtx; }; @@ -58,8 +58,8 @@ struct tcptran_ep { const char *host; // for dialers int refcnt; // active pipes nni_aio *useraio; - nni_aio *connaio; - nni_aio *timeaio; + nni_aio connaio; + nni_aio timeaio; nni_list busypipes; // busy pipes -- ones passed to socket nni_list waitpipes; // pipes waiting to match to socket nni_list negopipes; // pipes busy negotiating @@ -109,9 +109,9 @@ tcptran_pipe_close(void *arg) p->closed = true; nni_mtx_unlock(&p->mtx); - nni_aio_close(p->rxaio); - nni_aio_close(p->txaio); - nni_aio_close(p->negoaio); + nni_aio_close(&p->rxaio); + nni_aio_close(&p->txaio); + nni_aio_close(&p->negoaio); nng_stream_close(p->conn); } @@ -121,9 +121,9 @@ tcptran_pipe_stop(void *arg) { tcptran_pipe *p = arg; - nni_aio_stop(p->rxaio); - nni_aio_stop(p->txaio); - nni_aio_stop(p->negoaio); + nni_aio_stop(&p->rxaio); + nni_aio_stop(&p->txaio); + nni_aio_stop(&p->negoaio); nng_stream_stop(p->conn); } @@ -154,9 +154,9 @@ tcptran_pipe_fini(void *arg) } nng_stream_free(p->conn); - nni_aio_free(p->rxaio); - nni_aio_free(p->txaio); - nni_aio_free(p->negoaio); + nni_aio_fini(&p->rxaio); + nni_aio_fini(&p->txaio); + nni_aio_fini(&p->negoaio); nni_msg_free(p->rxmsg); nni_mtx_fini(&p->mtx); NNI_FREE_STRUCT(p); @@ -177,19 +177,14 @@ static int tcptran_pipe_alloc(tcptran_pipe **pipep) { tcptran_pipe *p; - int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - if (((rv = nni_aio_alloc(&p->txaio, tcptran_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->rxaio, tcptran_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->negoaio, tcptran_pipe_nego_cb, p)) != - 0)) { - tcptran_pipe_fini(p); - return (rv); - } + nni_aio_init(&p->txaio, tcptran_pipe_send_cb, p); + nni_aio_init(&p->rxaio, tcptran_pipe_recv_cb, p); + nni_aio_init(&p->negoaio, tcptran_pipe_nego_cb, p); nni_aio_list_init(&p->recvq); nni_aio_list_init(&p->sendq); nni_atomic_flag_reset(&p->reaped); @@ -222,7 +217,7 @@ tcptran_pipe_nego_cb(void *arg) { tcptran_pipe *p = arg; tcptran_ep *ep = p->ep; - nni_aio *aio = p->negoaio; + nni_aio *aio = &p->negoaio; nni_aio *uaio; int rv; @@ -307,7 +302,7 @@ tcptran_pipe_send_cb(void *arg) nni_aio *aio; size_t n; nni_msg *msg; - nni_aio *txaio = p->txaio; + nni_aio *txaio = &p->txaio; nni_mtx_lock(&p->mtx); aio = nni_list_first(&p->sendq); @@ -354,7 +349,7 @@ tcptran_pipe_recv_cb(void *arg) int rv; size_t n; nni_msg *msg; - nni_aio *rxaio = p->rxaio; + nni_aio *rxaio = &p->rxaio; nni_mtx_lock(&p->mtx); aio = nni_list_first(&p->recvq); @@ -464,7 +459,7 @@ tcptran_pipe_send_cancel(nni_aio *aio, void *arg, int rv) // The callback on the txaio will cause the user aio to // be canceled too. if (nni_list_first(&p->sendq) == aio) { - nni_aio_abort(p->txaio, rv); + nni_aio_abort(&p->txaio, rv); nni_mtx_unlock(&p->mtx); return; } @@ -502,7 +497,7 @@ tcptran_pipe_send_start(tcptran_pipe *p) NNI_PUT64(p->txlen, len); - txaio = p->txaio; + txaio = &p->txaio; niov = 0; iov[0].iov_buf = p->txlen; iov[0].iov_len = sizeof(p->txlen); @@ -561,7 +556,7 @@ tcptran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) // The callback on the rxaio will cause the user aio to // be canceled too. if (nni_list_first(&p->recvq) == aio) { - nni_aio_abort(p->rxaio, rv); + nni_aio_abort(&p->rxaio, rv); nni_mtx_unlock(&p->mtx); return; } @@ -573,7 +568,7 @@ tcptran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) static void tcptran_pipe_recv_start(tcptran_pipe *p) { - nni_aio *rxaio; + nni_aio *rxaio = &p->rxaio; nni_iov iov; NNI_ASSERT(p->rxmsg == NULL); @@ -590,7 +585,6 @@ tcptran_pipe_recv_start(tcptran_pipe *p) } // Schedule a read of the header. - rxaio = p->rxaio; iov.iov_buf = p->rxlen; iov.iov_len = sizeof(p->rxlen); nni_aio_set_iov(rxaio, 1, &iov); @@ -661,11 +655,11 @@ tcptran_pipe_start(tcptran_pipe *p, nng_stream *conn, tcptran_ep *ep) p->wanttxhead = 8; iov.iov_len = 8; iov.iov_buf = &p->txlen[0]; - nni_aio_set_iov(p->negoaio, 1, &iov); + nni_aio_set_iov(&p->negoaio, 1, &iov); nni_list_append(&ep->negopipes, p); - nni_aio_set_timeout(p->negoaio, 10000); // 10 sec timeout to negotiate - nng_stream_send(p->conn, p->negoaio); + nni_aio_set_timeout(&p->negoaio, 10000); // 10 sec timeout to negotiate + nng_stream_send(p->conn, &p->negoaio); } static void @@ -673,8 +667,8 @@ tcptran_ep_stop(void *arg) { tcptran_ep *ep = arg; - nni_aio_stop(ep->timeaio); - nni_aio_stop(ep->connaio); + nni_aio_stop(&ep->timeaio); + nni_aio_stop(&ep->connaio); nng_stream_dialer_stop(ep->dialer); nng_stream_listener_stop(ep->listener); } @@ -693,8 +687,8 @@ tcptran_ep_fini(void *arg) nni_mtx_unlock(&ep->mtx); nng_stream_dialer_free(ep->dialer); nng_stream_listener_free(ep->listener); - nni_aio_free(ep->timeaio); - nni_aio_free(ep->connaio); + nni_aio_fini(&ep->timeaio); + nni_aio_fini(&ep->connaio); nni_mtx_fini(&ep->mtx); NNI_FREE_STRUCT(ep); @@ -709,7 +703,7 @@ tcptran_ep_close(void *arg) nni_mtx_lock(&ep->mtx); ep->closed = true; - nni_aio_close(ep->timeaio); + nni_aio_close(&ep->timeaio); if (ep->dialer != NULL) { nng_stream_dialer_close(ep->dialer); } @@ -737,8 +731,8 @@ static void tcptran_timer_cb(void *arg) { tcptran_ep *ep = arg; - if (nni_aio_result(ep->timeaio) == 0) { - nng_stream_listener_accept(ep->listener, ep->connaio); + if (nni_aio_result(&ep->timeaio) == 0) { + nng_stream_listener_accept(ep->listener, &ep->connaio); } } @@ -746,7 +740,7 @@ static void tcptran_accept_cb(void *arg) { tcptran_ep *ep = arg; - nni_aio *aio = ep->connaio; + nni_aio *aio = &ep->connaio; tcptran_pipe *p; int rv; nng_stream *conn; @@ -770,7 +764,7 @@ tcptran_accept_cb(void *arg) goto error; } tcptran_pipe_start(p, conn, ep); - nng_stream_listener_accept(ep->listener, ep->connaio); + nng_stream_listener_accept(ep->listener, &ep->connaio); nni_mtx_unlock(&ep->mtx); return; @@ -785,12 +779,12 @@ error: case NNG_ENOMEM: case NNG_ENOFILES: - nng_sleep_aio(10, ep->timeaio); + nng_sleep_aio(10, &ep->timeaio); break; default: if (!ep->closed) { - nng_stream_listener_accept(ep->listener, ep->connaio); + nng_stream_listener_accept(ep->listener, &ep->connaio); } break; } @@ -801,7 +795,7 @@ static void tcptran_dial_cb(void *arg) { tcptran_ep *ep = arg; - nni_aio *aio = ep->connaio; + nni_aio *aio = &ep->connaio; tcptran_pipe *p; int rv; nng_stream *conn; @@ -891,9 +885,9 @@ tcptran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) return (rv); } - if ((rv != 0) || - ((rv = nni_aio_alloc(&ep->connaio, tcptran_dial_cb, ep)) != 0) || - ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0)) { + nni_aio_init(&ep->connaio, tcptran_dial_cb, ep); + + if ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0) { tcptran_ep_fini(ep); return (rv); } @@ -925,9 +919,10 @@ tcptran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) return (rv); } - if (((rv = nni_aio_alloc(&ep->connaio, tcptran_accept_cb, ep)) != 0) || - ((rv = nni_aio_alloc(&ep->timeaio, tcptran_timer_cb, ep)) != 0) || - ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) { + nni_aio_init(&ep->connaio, tcptran_accept_cb, ep); + nni_aio_init(&ep->timeaio, tcptran_timer_cb, ep); + + if ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0) { tcptran_ep_fini(ep); return (rv); } @@ -978,7 +973,7 @@ tcptran_ep_connect(void *arg, nni_aio *aio) } ep->useraio = aio; - nng_stream_dialer_dial(ep->dialer, ep->connaio); + nng_stream_dialer_dial(ep->dialer, &ep->connaio); nni_mtx_unlock(&ep->mtx); } @@ -1058,7 +1053,7 @@ tcptran_ep_accept(void *arg, nni_aio *aio) ep->useraio = aio; if (!ep->started) { ep->started = true; - nng_stream_listener_accept(ep->listener, ep->connaio); + nng_stream_listener_accept(ep->listener, &ep->connaio); } else { tcptran_ep_match(ep); } |
