aboutsummaryrefslogtreecommitdiff
path: root/src/transport/tcp/tcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/tcp/tcp.c')
-rw-r--r--src/transport/tcp/tcp.c137
1 files changed, 60 insertions, 77 deletions
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 28ec9438..1504d0ee 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -130,7 +130,7 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp)
static void
nni_tcp_cancel_nego(nni_aio *aio, int rv)
{
- nni_tcp_pipe *p = aio->a_prov_data;
+ nni_tcp_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
if (p->user_negaio != aio) {
@@ -140,7 +140,7 @@ nni_tcp_cancel_nego(nni_aio *aio, int rv)
p->user_negaio = NULL;
nni_mtx_unlock(&p->mtx);
- nni_aio_cancel(p->negaio, rv);
+ nni_aio_abort(p->negaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -164,18 +164,20 @@ nni_tcp_pipe_nego_cb(void *arg)
}
if (p->gottxhead < p->wanttxhead) {
- aio->a_niov = 1;
- aio->a_iov[0].iov_len = p->wanttxhead - p->gottxhead;
- aio->a_iov[0].iov_buf = &p->txlen[p->gottxhead];
+ nni_iov iov;
+ iov.iov_len = p->wanttxhead - p->gottxhead;
+ iov.iov_buf = &p->txlen[p->gottxhead];
// send it down...
+ nni_aio_set_iov(aio, 1, &iov);
nni_plat_tcp_pipe_send(p->tpp, aio);
nni_mtx_unlock(&p->mtx);
return;
}
if (p->gotrxhead < p->wantrxhead) {
- aio->a_niov = 1;
- aio->a_iov[0].iov_len = p->wantrxhead - p->gotrxhead;
- aio->a_iov[0].iov_buf = &p->rxlen[p->gotrxhead];
+ nni_iov iov;
+ iov.iov_len = p->wantrxhead - p->gotrxhead;
+ iov.iov_buf = &p->rxlen[p->gotrxhead];
+ nni_aio_set_iov(aio, 1, &iov);
nni_plat_tcp_pipe_recv(p->tpp, aio);
nni_mtx_unlock(&p->mtx);
return;
@@ -206,7 +208,7 @@ nni_tcp_pipe_send_cb(void *arg)
int rv;
nni_aio * aio;
size_t n;
- nng_msg * msg;
+ nni_msg * msg;
nni_aio * txaio = p->txaio;
nni_mtx_lock(&p->mtx);
@@ -226,20 +228,8 @@ nni_tcp_pipe_send_cb(void *arg)
}
n = nni_aio_count(txaio);
- while (n) {
- NNI_ASSERT(txaio->a_niov != 0);
- if (txaio->a_iov[0].iov_len > n) {
- txaio->a_iov[0].iov_len -= n;
- NNI_INCPTR(txaio->a_iov[0].iov_buf, n);
- break;
- }
- n -= txaio->a_iov[0].iov_len;
- for (int i = 0; i < txaio->a_niov; i++) {
- txaio->a_iov[i] = txaio->a_iov[i + 1];
- }
- txaio->a_niov--;
- }
- if ((txaio->a_niov != 0) && (txaio->a_iov[0].iov_len != 0)) {
+ nni_aio_iov_advance(txaio, n);
+ if (nni_aio_iov_count(txaio) > 0) {
nni_plat_tcp_pipe_send(p->tpp, txaio);
nni_mtx_unlock(&p->mtx);
return;
@@ -261,7 +251,7 @@ nni_tcp_pipe_recv_cb(void *arg)
int rv;
size_t n;
nni_msg * msg;
- nni_aio * rxaio = p->rxaio;
+ nni_aio * rxaio;
nni_mtx_lock(&p->mtx);
@@ -271,26 +261,15 @@ nni_tcp_pipe_recv_cb(void *arg)
return;
}
- if ((rv = nni_aio_result(p->rxaio)) != 0) {
+ rxaio = p->rxaio;
+
+ if ((rv = nni_aio_result(rxaio)) != 0) {
goto recv_error;
}
- n = nni_aio_count(p->rxaio);
- while (n) {
- NNI_ASSERT(rxaio->a_niov != 0);
- if (rxaio->a_iov[0].iov_len > n) {
- rxaio->a_iov[0].iov_len -= n;
- NNI_INCPTR(rxaio->a_iov[0].iov_buf, n);
- break;
- }
- n -= rxaio->a_iov[0].iov_len;
- rxaio->a_niov--;
- for (int i = 0; i < rxaio->a_niov; i++) {
- rxaio->a_iov[i] = rxaio->a_iov[i + 1];
- }
- }
- // Was this a partial read? If so then resubmit for the rest.
- if ((rxaio->a_niov != 0) && (rxaio->a_iov[0].iov_len != 0)) {
+ n = nni_aio_count(rxaio);
+ nni_aio_iov_advance(rxaio, n);
+ if (nni_aio_iov_count(rxaio) > 0) {
nni_plat_tcp_pipe_recv(p->tpp, rxaio);
nni_mtx_unlock(&p->mtx);
return;
@@ -311,17 +290,18 @@ nni_tcp_pipe_recv_cb(void *arg)
goto recv_error;
}
- if ((rv = nng_msg_alloc(&p->rxmsg, (size_t) len)) != 0) {
+ if ((rv = nni_msg_alloc(&p->rxmsg, (size_t) len)) != 0) {
goto recv_error;
}
// Submit the rest of the data for a read -- we want to
// read the entire message now.
if (len != 0) {
- rxaio->a_iov[0].iov_buf = nni_msg_body(p->rxmsg);
- rxaio->a_iov[0].iov_len = (size_t) len;
- rxaio->a_niov = 1;
+ nni_iov iov;
+ iov.iov_buf = nni_msg_body(p->rxmsg);
+ iov.iov_len = (size_t) len;
+ nni_aio_set_iov(rxaio, 1, &iov);
nni_plat_tcp_pipe_recv(p->tpp, rxaio);
nni_mtx_unlock(&p->mtx);
return;
@@ -348,7 +328,7 @@ recv_error:
static void
nni_tcp_cancel_tx(nni_aio *aio, int rv)
{
- nni_tcp_pipe *p = aio->a_prov_data;
+ nni_tcp_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
if (p->user_txaio != aio) {
@@ -359,7 +339,7 @@ nni_tcp_cancel_tx(nni_aio *aio, int rv)
nni_mtx_unlock(&p->mtx);
// cancel the underlying operation.
- nni_aio_cancel(p->txaio, rv);
+ nni_aio_abort(p->txaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -371,6 +351,7 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
uint64_t len;
nni_aio * txaio;
int niov;
+ nni_iov iov[3];
len = nni_msg_len(msg) + nni_msg_header_len(msg);
@@ -385,22 +366,22 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
NNI_PUT64(p->txlen, len);
- niov = 0;
- txaio = p->txaio;
- txaio->a_iov[niov].iov_buf = p->txlen;
- txaio->a_iov[niov].iov_len = sizeof(p->txlen);
+ niov = 0;
+ txaio = p->txaio;
+ iov[niov].iov_buf = p->txlen;
+ iov[niov].iov_len = sizeof(p->txlen);
niov++;
if (nni_msg_header_len(msg) > 0) {
- txaio->a_iov[niov].iov_buf = nni_msg_header(msg);
- txaio->a_iov[niov].iov_len = nni_msg_header_len(msg);
+ iov[niov].iov_buf = nni_msg_header(msg);
+ iov[niov].iov_len = nni_msg_header_len(msg);
niov++;
}
if (nni_msg_len(msg) > 0) {
- txaio->a_iov[niov].iov_buf = nni_msg_body(msg);
- txaio->a_iov[niov].iov_len = nni_msg_len(msg);
+ iov[niov].iov_buf = nni_msg_body(msg);
+ iov[niov].iov_len = nni_msg_len(msg);
niov++;
}
- txaio->a_niov = niov;
+ nni_aio_set_iov(txaio, niov, iov);
nni_plat_tcp_pipe_send(p->tpp, txaio);
nni_mtx_unlock(&p->mtx);
@@ -409,7 +390,7 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
static void
nni_tcp_cancel_rx(nni_aio *aio, int rv)
{
- nni_tcp_pipe *p = aio->a_prov_data;
+ nni_tcp_pipe *p = nni_aio_get_prov_data(aio);
nni_mtx_lock(&p->mtx);
if (p->user_rxaio != aio) {
@@ -420,7 +401,7 @@ nni_tcp_cancel_rx(nni_aio *aio, int rv)
nni_mtx_unlock(&p->mtx);
// cancel the underlying operation.
- nni_aio_cancel(p->rxaio, rv);
+ nni_aio_abort(p->rxaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -429,6 +410,7 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
nni_aio * rxaio;
+ nni_iov iov;
nni_mtx_lock(&p->mtx);
@@ -441,10 +423,10 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio)
NNI_ASSERT(p->rxmsg == NULL);
// Schedule a read of the TCP header.
- rxaio = p->rxaio;
- rxaio->a_iov[0].iov_buf = p->rxlen;
- rxaio->a_iov[0].iov_len = sizeof(p->rxlen);
- rxaio->a_niov = 1;
+ rxaio = p->rxaio;
+ iov.iov_buf = p->rxlen;
+ iov.iov_len = sizeof(p->rxlen);
+ nni_aio_set_iov(rxaio, 1, &iov);
nni_plat_tcp_pipe_recv(p->tpp, rxaio);
nni_mtx_unlock(&p->mtx);
@@ -463,7 +445,7 @@ nni_tcp_pipe_getopt_locaddr(void *arg, void *v, size_t *szp)
{
nni_tcp_pipe *p = arg;
int rv;
- nng_sockaddr sa;
+ nni_sockaddr sa;
memset(&sa, 0, sizeof(sa));
if ((rv = nni_plat_tcp_pipe_sockname(p->tpp, &sa)) == 0) {
@@ -477,7 +459,7 @@ nni_tcp_pipe_getopt_remaddr(void *arg, void *v, size_t *szp)
{
nni_tcp_pipe *p = arg;
int rv;
- nng_sockaddr sa;
+ nni_sockaddr sa;
memset(&sa, 0, sizeof(sa));
if ((rv = nni_plat_tcp_pipe_peername(p->tpp, &sa)) == 0) {
@@ -492,6 +474,7 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
nni_aio * negaio;
+ nni_iov iov;
nni_mtx_lock(&p->mtx);
p->txlen[0] = 0;
@@ -501,15 +484,15 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
NNI_PUT16(&p->txlen[4], p->proto);
NNI_PUT16(&p->txlen[6], 0);
- p->user_negaio = aio;
- p->gotrxhead = 0;
- p->gottxhead = 0;
- p->wantrxhead = 8;
- p->wanttxhead = 8;
- negaio = p->negaio;
- negaio->a_niov = 1;
- negaio->a_iov[0].iov_len = 8;
- negaio->a_iov[0].iov_buf = &p->txlen[0];
+ p->user_negaio = aio;
+ p->gotrxhead = 0;
+ p->gottxhead = 0;
+ p->wantrxhead = 8;
+ p->wanttxhead = 8;
+ negaio = p->negaio;
+ iov.iov_len = 8;
+ iov.iov_buf = &p->txlen[0];
+ nni_aio_set_iov(negaio, 1, &iov);
if (nni_aio_start(aio, nni_tcp_cancel_nego, p) != 0) {
nni_mtx_unlock(&p->mtx);
return;
@@ -572,7 +555,7 @@ nni_tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
if (mode == NNI_EP_MODE_DIAL) {
passive = 0;
lsa.s_un.s_family = NNG_AF_UNSPEC;
- aio->a_addr = &rsa;
+ nni_aio_set_input(aio, 0, &rsa);
if ((host == NULL) || (serv == NULL)) {
nni_aio_fini(aio);
return (NNG_EADDRINVAL);
@@ -580,7 +563,7 @@ nni_tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
} else {
passive = 1;
rsa.s_un.s_family = NNG_AF_UNSPEC;
- aio->a_addr = &lsa;
+ nni_aio_set_input(aio, 0, &lsa);
}
nni_plat_tcp_resolv(host, serv, NNG_AF_UNSPEC, passive, aio);
@@ -685,7 +668,7 @@ nni_tcp_ep_cb(void *arg)
static void
nni_tcp_cancel_ep(nni_aio *aio, int rv)
{
- nni_tcp_ep *ep = aio->a_prov_data;
+ nni_tcp_ep *ep = nni_aio_get_prov_data(aio);
nni_mtx_lock(&ep->mtx);
if (ep->user_aio != aio) {
@@ -695,7 +678,7 @@ nni_tcp_cancel_ep(nni_aio *aio, int rv)
ep->user_aio = NULL;
nni_mtx_unlock(&ep->mtx);
- nni_aio_cancel(ep->aio, rv);
+ nni_aio_abort(ep->aio, rv);
nni_aio_finish_error(aio, rv);
}