diff options
Diffstat (limited to 'src/transport/tcp/tcp.c')
| -rw-r--r-- | src/transport/tcp/tcp.c | 137 |
1 files changed, 60 insertions, 77 deletions
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 28ec9438..1504d0ee 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -130,7 +130,7 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp) static void nni_tcp_cancel_nego(nni_aio *aio, int rv) { - nni_tcp_pipe *p = aio->a_prov_data; + nni_tcp_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (p->user_negaio != aio) { @@ -140,7 +140,7 @@ nni_tcp_cancel_nego(nni_aio *aio, int rv) p->user_negaio = NULL; nni_mtx_unlock(&p->mtx); - nni_aio_cancel(p->negaio, rv); + nni_aio_abort(p->negaio, rv); nni_aio_finish_error(aio, rv); } @@ -164,18 +164,20 @@ nni_tcp_pipe_nego_cb(void *arg) } if (p->gottxhead < p->wanttxhead) { - aio->a_niov = 1; - aio->a_iov[0].iov_len = p->wanttxhead - p->gottxhead; - aio->a_iov[0].iov_buf = &p->txlen[p->gottxhead]; + nni_iov iov; + iov.iov_len = p->wanttxhead - p->gottxhead; + iov.iov_buf = &p->txlen[p->gottxhead]; // send it down... + nni_aio_set_iov(aio, 1, &iov); nni_plat_tcp_pipe_send(p->tpp, aio); nni_mtx_unlock(&p->mtx); return; } if (p->gotrxhead < p->wantrxhead) { - aio->a_niov = 1; - aio->a_iov[0].iov_len = p->wantrxhead - p->gotrxhead; - aio->a_iov[0].iov_buf = &p->rxlen[p->gotrxhead]; + nni_iov iov; + iov.iov_len = p->wantrxhead - p->gotrxhead; + iov.iov_buf = &p->rxlen[p->gotrxhead]; + nni_aio_set_iov(aio, 1, &iov); nni_plat_tcp_pipe_recv(p->tpp, aio); nni_mtx_unlock(&p->mtx); return; @@ -206,7 +208,7 @@ nni_tcp_pipe_send_cb(void *arg) int rv; nni_aio * aio; size_t n; - nng_msg * msg; + nni_msg * msg; nni_aio * txaio = p->txaio; nni_mtx_lock(&p->mtx); @@ -226,20 +228,8 @@ nni_tcp_pipe_send_cb(void *arg) } n = nni_aio_count(txaio); - while (n) { - NNI_ASSERT(txaio->a_niov != 0); - if (txaio->a_iov[0].iov_len > n) { - txaio->a_iov[0].iov_len -= n; - NNI_INCPTR(txaio->a_iov[0].iov_buf, n); - break; - } - n -= txaio->a_iov[0].iov_len; - for (int i = 0; i < txaio->a_niov; i++) { - txaio->a_iov[i] = txaio->a_iov[i + 1]; - } - txaio->a_niov--; - } - if ((txaio->a_niov != 0) && (txaio->a_iov[0].iov_len != 0)) { + nni_aio_iov_advance(txaio, n); + if (nni_aio_iov_count(txaio) > 0) { nni_plat_tcp_pipe_send(p->tpp, txaio); nni_mtx_unlock(&p->mtx); return; @@ -261,7 +251,7 @@ nni_tcp_pipe_recv_cb(void *arg) int rv; size_t n; nni_msg * msg; - nni_aio * rxaio = p->rxaio; + nni_aio * rxaio; nni_mtx_lock(&p->mtx); @@ -271,26 +261,15 @@ nni_tcp_pipe_recv_cb(void *arg) return; } - if ((rv = nni_aio_result(p->rxaio)) != 0) { + rxaio = p->rxaio; + + if ((rv = nni_aio_result(rxaio)) != 0) { goto recv_error; } - n = nni_aio_count(p->rxaio); - while (n) { - NNI_ASSERT(rxaio->a_niov != 0); - if (rxaio->a_iov[0].iov_len > n) { - rxaio->a_iov[0].iov_len -= n; - NNI_INCPTR(rxaio->a_iov[0].iov_buf, n); - break; - } - n -= rxaio->a_iov[0].iov_len; - rxaio->a_niov--; - for (int i = 0; i < rxaio->a_niov; i++) { - rxaio->a_iov[i] = rxaio->a_iov[i + 1]; - } - } - // Was this a partial read? If so then resubmit for the rest. - if ((rxaio->a_niov != 0) && (rxaio->a_iov[0].iov_len != 0)) { + n = nni_aio_count(rxaio); + nni_aio_iov_advance(rxaio, n); + if (nni_aio_iov_count(rxaio) > 0) { nni_plat_tcp_pipe_recv(p->tpp, rxaio); nni_mtx_unlock(&p->mtx); return; @@ -311,17 +290,18 @@ nni_tcp_pipe_recv_cb(void *arg) goto recv_error; } - if ((rv = nng_msg_alloc(&p->rxmsg, (size_t) len)) != 0) { + if ((rv = nni_msg_alloc(&p->rxmsg, (size_t) len)) != 0) { goto recv_error; } // Submit the rest of the data for a read -- we want to // read the entire message now. if (len != 0) { - rxaio->a_iov[0].iov_buf = nni_msg_body(p->rxmsg); - rxaio->a_iov[0].iov_len = (size_t) len; - rxaio->a_niov = 1; + nni_iov iov; + iov.iov_buf = nni_msg_body(p->rxmsg); + iov.iov_len = (size_t) len; + nni_aio_set_iov(rxaio, 1, &iov); nni_plat_tcp_pipe_recv(p->tpp, rxaio); nni_mtx_unlock(&p->mtx); return; @@ -348,7 +328,7 @@ recv_error: static void nni_tcp_cancel_tx(nni_aio *aio, int rv) { - nni_tcp_pipe *p = aio->a_prov_data; + nni_tcp_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (p->user_txaio != aio) { @@ -359,7 +339,7 @@ nni_tcp_cancel_tx(nni_aio *aio, int rv) nni_mtx_unlock(&p->mtx); // cancel the underlying operation. - nni_aio_cancel(p->txaio, rv); + nni_aio_abort(p->txaio, rv); nni_aio_finish_error(aio, rv); } @@ -371,6 +351,7 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) uint64_t len; nni_aio * txaio; int niov; + nni_iov iov[3]; len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -385,22 +366,22 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) NNI_PUT64(p->txlen, len); - niov = 0; - txaio = p->txaio; - txaio->a_iov[niov].iov_buf = p->txlen; - txaio->a_iov[niov].iov_len = sizeof(p->txlen); + niov = 0; + txaio = p->txaio; + iov[niov].iov_buf = p->txlen; + iov[niov].iov_len = sizeof(p->txlen); niov++; if (nni_msg_header_len(msg) > 0) { - txaio->a_iov[niov].iov_buf = nni_msg_header(msg); - txaio->a_iov[niov].iov_len = nni_msg_header_len(msg); + iov[niov].iov_buf = nni_msg_header(msg); + iov[niov].iov_len = nni_msg_header_len(msg); niov++; } if (nni_msg_len(msg) > 0) { - txaio->a_iov[niov].iov_buf = nni_msg_body(msg); - txaio->a_iov[niov].iov_len = nni_msg_len(msg); + iov[niov].iov_buf = nni_msg_body(msg); + iov[niov].iov_len = nni_msg_len(msg); niov++; } - txaio->a_niov = niov; + nni_aio_set_iov(txaio, niov, iov); nni_plat_tcp_pipe_send(p->tpp, txaio); nni_mtx_unlock(&p->mtx); @@ -409,7 +390,7 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) static void nni_tcp_cancel_rx(nni_aio *aio, int rv) { - nni_tcp_pipe *p = aio->a_prov_data; + nni_tcp_pipe *p = nni_aio_get_prov_data(aio); nni_mtx_lock(&p->mtx); if (p->user_rxaio != aio) { @@ -420,7 +401,7 @@ nni_tcp_cancel_rx(nni_aio *aio, int rv) nni_mtx_unlock(&p->mtx); // cancel the underlying operation. - nni_aio_cancel(p->rxaio, rv); + nni_aio_abort(p->rxaio, rv); nni_aio_finish_error(aio, rv); } @@ -429,6 +410,7 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio) { nni_tcp_pipe *p = arg; nni_aio * rxaio; + nni_iov iov; nni_mtx_lock(&p->mtx); @@ -441,10 +423,10 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio) NNI_ASSERT(p->rxmsg == NULL); // Schedule a read of the TCP header. - rxaio = p->rxaio; - rxaio->a_iov[0].iov_buf = p->rxlen; - rxaio->a_iov[0].iov_len = sizeof(p->rxlen); - rxaio->a_niov = 1; + rxaio = p->rxaio; + iov.iov_buf = p->rxlen; + iov.iov_len = sizeof(p->rxlen); + nni_aio_set_iov(rxaio, 1, &iov); nni_plat_tcp_pipe_recv(p->tpp, rxaio); nni_mtx_unlock(&p->mtx); @@ -463,7 +445,7 @@ nni_tcp_pipe_getopt_locaddr(void *arg, void *v, size_t *szp) { nni_tcp_pipe *p = arg; int rv; - nng_sockaddr sa; + nni_sockaddr sa; memset(&sa, 0, sizeof(sa)); if ((rv = nni_plat_tcp_pipe_sockname(p->tpp, &sa)) == 0) { @@ -477,7 +459,7 @@ nni_tcp_pipe_getopt_remaddr(void *arg, void *v, size_t *szp) { nni_tcp_pipe *p = arg; int rv; - nng_sockaddr sa; + nni_sockaddr sa; memset(&sa, 0, sizeof(sa)); if ((rv = nni_plat_tcp_pipe_peername(p->tpp, &sa)) == 0) { @@ -492,6 +474,7 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) { nni_tcp_pipe *p = arg; nni_aio * negaio; + nni_iov iov; nni_mtx_lock(&p->mtx); p->txlen[0] = 0; @@ -501,15 +484,15 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) NNI_PUT16(&p->txlen[4], p->proto); NNI_PUT16(&p->txlen[6], 0); - p->user_negaio = aio; - p->gotrxhead = 0; - p->gottxhead = 0; - p->wantrxhead = 8; - p->wanttxhead = 8; - negaio = p->negaio; - negaio->a_niov = 1; - negaio->a_iov[0].iov_len = 8; - negaio->a_iov[0].iov_buf = &p->txlen[0]; + p->user_negaio = aio; + p->gotrxhead = 0; + p->gottxhead = 0; + p->wantrxhead = 8; + p->wanttxhead = 8; + negaio = p->negaio; + iov.iov_len = 8; + iov.iov_buf = &p->txlen[0]; + nni_aio_set_iov(negaio, 1, &iov); if (nni_aio_start(aio, nni_tcp_cancel_nego, p) != 0) { nni_mtx_unlock(&p->mtx); return; @@ -572,7 +555,7 @@ nni_tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) if (mode == NNI_EP_MODE_DIAL) { passive = 0; lsa.s_un.s_family = NNG_AF_UNSPEC; - aio->a_addr = &rsa; + nni_aio_set_input(aio, 0, &rsa); if ((host == NULL) || (serv == NULL)) { nni_aio_fini(aio); return (NNG_EADDRINVAL); @@ -580,7 +563,7 @@ nni_tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode) } else { passive = 1; rsa.s_un.s_family = NNG_AF_UNSPEC; - aio->a_addr = &lsa; + nni_aio_set_input(aio, 0, &lsa); } nni_plat_tcp_resolv(host, serv, NNG_AF_UNSPEC, passive, aio); @@ -685,7 +668,7 @@ nni_tcp_ep_cb(void *arg) static void nni_tcp_cancel_ep(nni_aio *aio, int rv) { - nni_tcp_ep *ep = aio->a_prov_data; + nni_tcp_ep *ep = nni_aio_get_prov_data(aio); nni_mtx_lock(&ep->mtx); if (ep->user_aio != aio) { @@ -695,7 +678,7 @@ nni_tcp_cancel_ep(nni_aio *aio, int rv) ep->user_aio = NULL; nni_mtx_unlock(&ep->mtx); - nni_aio_cancel(ep->aio, rv); + nni_aio_abort(ep->aio, rv); nni_aio_finish_error(aio, rv); } |
