aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-15 15:36:48 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-15 16:06:27 -0800
commit1a552b12afb509c260e52cae4df5e1de6ba00e88 (patch)
tree19a75fd5cf10f1a022299974bdf3229c43ee968d /src
parentf3b5b0fbac1a328b29b6d4cfa3639a94b0118275 (diff)
downloadnng-1a552b12afb509c260e52cae4df5e1de6ba00e88.tar.gz
nng-1a552b12afb509c260e52cae4df5e1de6ba00e88.tar.bz2
nng-1a552b12afb509c260e52cae4df5e1de6ba00e88.zip
tcp transport: inline aios (eliminate failure paths)
Diffstat (limited to 'src')
-rw-r--r--src/sp/transport/tcp/tcp.c101
1 files changed, 48 insertions, 53 deletions
diff --git a/src/sp/transport/tcp/tcp.c b/src/sp/transport/tcp/tcp.c
index 51991cbd..2a4bb9e2 100644
--- a/src/sp/transport/tcp/tcp.c
+++ b/src/sp/transport/tcp/tcp.c
@@ -41,9 +41,9 @@ struct tcptran_pipe {
size_t wantrxhead;
nni_list recvq;
nni_list sendq;
- nni_aio *txaio;
- nni_aio *rxaio;
- nni_aio *negoaio;
+ nni_aio txaio;
+ nni_aio rxaio;
+ nni_aio negoaio;
nni_msg *rxmsg;
nni_mtx mtx;
};
@@ -58,8 +58,8 @@ struct tcptran_ep {
const char *host; // for dialers
int refcnt; // active pipes
nni_aio *useraio;
- nni_aio *connaio;
- nni_aio *timeaio;
+ nni_aio connaio;
+ nni_aio timeaio;
nni_list busypipes; // busy pipes -- ones passed to socket
nni_list waitpipes; // pipes waiting to match to socket
nni_list negopipes; // pipes busy negotiating
@@ -109,9 +109,9 @@ tcptran_pipe_close(void *arg)
p->closed = true;
nni_mtx_unlock(&p->mtx);
- nni_aio_close(p->rxaio);
- nni_aio_close(p->txaio);
- nni_aio_close(p->negoaio);
+ nni_aio_close(&p->rxaio);
+ nni_aio_close(&p->txaio);
+ nni_aio_close(&p->negoaio);
nng_stream_close(p->conn);
}
@@ -121,9 +121,9 @@ tcptran_pipe_stop(void *arg)
{
tcptran_pipe *p = arg;
- nni_aio_stop(p->rxaio);
- nni_aio_stop(p->txaio);
- nni_aio_stop(p->negoaio);
+ nni_aio_stop(&p->rxaio);
+ nni_aio_stop(&p->txaio);
+ nni_aio_stop(&p->negoaio);
nng_stream_stop(p->conn);
}
@@ -154,9 +154,9 @@ tcptran_pipe_fini(void *arg)
}
nng_stream_free(p->conn);
- nni_aio_free(p->rxaio);
- nni_aio_free(p->txaio);
- nni_aio_free(p->negoaio);
+ nni_aio_fini(&p->rxaio);
+ nni_aio_fini(&p->txaio);
+ nni_aio_fini(&p->negoaio);
nni_msg_free(p->rxmsg);
nni_mtx_fini(&p->mtx);
NNI_FREE_STRUCT(p);
@@ -177,19 +177,14 @@ static int
tcptran_pipe_alloc(tcptran_pipe **pipep)
{
tcptran_pipe *p;
- int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);
- if (((rv = nni_aio_alloc(&p->txaio, tcptran_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_alloc(&p->rxaio, tcptran_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_alloc(&p->negoaio, tcptran_pipe_nego_cb, p)) !=
- 0)) {
- tcptran_pipe_fini(p);
- return (rv);
- }
+ nni_aio_init(&p->txaio, tcptran_pipe_send_cb, p);
+ nni_aio_init(&p->rxaio, tcptran_pipe_recv_cb, p);
+ nni_aio_init(&p->negoaio, tcptran_pipe_nego_cb, p);
nni_aio_list_init(&p->recvq);
nni_aio_list_init(&p->sendq);
nni_atomic_flag_reset(&p->reaped);
@@ -222,7 +217,7 @@ tcptran_pipe_nego_cb(void *arg)
{
tcptran_pipe *p = arg;
tcptran_ep *ep = p->ep;
- nni_aio *aio = p->negoaio;
+ nni_aio *aio = &p->negoaio;
nni_aio *uaio;
int rv;
@@ -307,7 +302,7 @@ tcptran_pipe_send_cb(void *arg)
nni_aio *aio;
size_t n;
nni_msg *msg;
- nni_aio *txaio = p->txaio;
+ nni_aio *txaio = &p->txaio;
nni_mtx_lock(&p->mtx);
aio = nni_list_first(&p->sendq);
@@ -354,7 +349,7 @@ tcptran_pipe_recv_cb(void *arg)
int rv;
size_t n;
nni_msg *msg;
- nni_aio *rxaio = p->rxaio;
+ nni_aio *rxaio = &p->rxaio;
nni_mtx_lock(&p->mtx);
aio = nni_list_first(&p->recvq);
@@ -464,7 +459,7 @@ tcptran_pipe_send_cancel(nni_aio *aio, void *arg, int rv)
// The callback on the txaio will cause the user aio to
// be canceled too.
if (nni_list_first(&p->sendq) == aio) {
- nni_aio_abort(p->txaio, rv);
+ nni_aio_abort(&p->txaio, rv);
nni_mtx_unlock(&p->mtx);
return;
}
@@ -502,7 +497,7 @@ tcptran_pipe_send_start(tcptran_pipe *p)
NNI_PUT64(p->txlen, len);
- txaio = p->txaio;
+ txaio = &p->txaio;
niov = 0;
iov[0].iov_buf = p->txlen;
iov[0].iov_len = sizeof(p->txlen);
@@ -561,7 +556,7 @@ tcptran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv)
// The callback on the rxaio will cause the user aio to
// be canceled too.
if (nni_list_first(&p->recvq) == aio) {
- nni_aio_abort(p->rxaio, rv);
+ nni_aio_abort(&p->rxaio, rv);
nni_mtx_unlock(&p->mtx);
return;
}
@@ -573,7 +568,7 @@ tcptran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv)
static void
tcptran_pipe_recv_start(tcptran_pipe *p)
{
- nni_aio *rxaio;
+ nni_aio *rxaio = &p->rxaio;
nni_iov iov;
NNI_ASSERT(p->rxmsg == NULL);
@@ -590,7 +585,6 @@ tcptran_pipe_recv_start(tcptran_pipe *p)
}
// Schedule a read of the header.
- rxaio = p->rxaio;
iov.iov_buf = p->rxlen;
iov.iov_len = sizeof(p->rxlen);
nni_aio_set_iov(rxaio, 1, &iov);
@@ -661,11 +655,11 @@ tcptran_pipe_start(tcptran_pipe *p, nng_stream *conn, tcptran_ep *ep)
p->wanttxhead = 8;
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
- nni_aio_set_iov(p->negoaio, 1, &iov);
+ nni_aio_set_iov(&p->negoaio, 1, &iov);
nni_list_append(&ep->negopipes, p);
- nni_aio_set_timeout(p->negoaio, 10000); // 10 sec timeout to negotiate
- nng_stream_send(p->conn, p->negoaio);
+ nni_aio_set_timeout(&p->negoaio, 10000); // 10 sec timeout to negotiate
+ nng_stream_send(p->conn, &p->negoaio);
}
static void
@@ -673,8 +667,8 @@ tcptran_ep_stop(void *arg)
{
tcptran_ep *ep = arg;
- nni_aio_stop(ep->timeaio);
- nni_aio_stop(ep->connaio);
+ nni_aio_stop(&ep->timeaio);
+ nni_aio_stop(&ep->connaio);
nng_stream_dialer_stop(ep->dialer);
nng_stream_listener_stop(ep->listener);
}
@@ -693,8 +687,8 @@ tcptran_ep_fini(void *arg)
nni_mtx_unlock(&ep->mtx);
nng_stream_dialer_free(ep->dialer);
nng_stream_listener_free(ep->listener);
- nni_aio_free(ep->timeaio);
- nni_aio_free(ep->connaio);
+ nni_aio_fini(&ep->timeaio);
+ nni_aio_fini(&ep->connaio);
nni_mtx_fini(&ep->mtx);
NNI_FREE_STRUCT(ep);
@@ -709,7 +703,7 @@ tcptran_ep_close(void *arg)
nni_mtx_lock(&ep->mtx);
ep->closed = true;
- nni_aio_close(ep->timeaio);
+ nni_aio_close(&ep->timeaio);
if (ep->dialer != NULL) {
nng_stream_dialer_close(ep->dialer);
}
@@ -737,8 +731,8 @@ static void
tcptran_timer_cb(void *arg)
{
tcptran_ep *ep = arg;
- if (nni_aio_result(ep->timeaio) == 0) {
- nng_stream_listener_accept(ep->listener, ep->connaio);
+ if (nni_aio_result(&ep->timeaio) == 0) {
+ nng_stream_listener_accept(ep->listener, &ep->connaio);
}
}
@@ -746,7 +740,7 @@ static void
tcptran_accept_cb(void *arg)
{
tcptran_ep *ep = arg;
- nni_aio *aio = ep->connaio;
+ nni_aio *aio = &ep->connaio;
tcptran_pipe *p;
int rv;
nng_stream *conn;
@@ -770,7 +764,7 @@ tcptran_accept_cb(void *arg)
goto error;
}
tcptran_pipe_start(p, conn, ep);
- nng_stream_listener_accept(ep->listener, ep->connaio);
+ nng_stream_listener_accept(ep->listener, &ep->connaio);
nni_mtx_unlock(&ep->mtx);
return;
@@ -785,12 +779,12 @@ error:
case NNG_ENOMEM:
case NNG_ENOFILES:
- nng_sleep_aio(10, ep->timeaio);
+ nng_sleep_aio(10, &ep->timeaio);
break;
default:
if (!ep->closed) {
- nng_stream_listener_accept(ep->listener, ep->connaio);
+ nng_stream_listener_accept(ep->listener, &ep->connaio);
}
break;
}
@@ -801,7 +795,7 @@ static void
tcptran_dial_cb(void *arg)
{
tcptran_ep *ep = arg;
- nni_aio *aio = ep->connaio;
+ nni_aio *aio = &ep->connaio;
tcptran_pipe *p;
int rv;
nng_stream *conn;
@@ -891,9 +885,9 @@ tcptran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
return (rv);
}
- if ((rv != 0) ||
- ((rv = nni_aio_alloc(&ep->connaio, tcptran_dial_cb, ep)) != 0) ||
- ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0)) {
+ nni_aio_init(&ep->connaio, tcptran_dial_cb, ep);
+
+ if ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0) {
tcptran_ep_fini(ep);
return (rv);
}
@@ -925,9 +919,10 @@ tcptran_listener_init(void **lp, nng_url *url, nni_listener *nlistener)
return (rv);
}
- if (((rv = nni_aio_alloc(&ep->connaio, tcptran_accept_cb, ep)) != 0) ||
- ((rv = nni_aio_alloc(&ep->timeaio, tcptran_timer_cb, ep)) != 0) ||
- ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) {
+ nni_aio_init(&ep->connaio, tcptran_accept_cb, ep);
+ nni_aio_init(&ep->timeaio, tcptran_timer_cb, ep);
+
+ if ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0) {
tcptran_ep_fini(ep);
return (rv);
}
@@ -978,7 +973,7 @@ tcptran_ep_connect(void *arg, nni_aio *aio)
}
ep->useraio = aio;
- nng_stream_dialer_dial(ep->dialer, ep->connaio);
+ nng_stream_dialer_dial(ep->dialer, &ep->connaio);
nni_mtx_unlock(&ep->mtx);
}
@@ -1058,7 +1053,7 @@ tcptran_ep_accept(void *arg, nni_aio *aio)
ep->useraio = aio;
if (!ep->started) {
ep->started = true;
- nng_stream_listener_accept(ep->listener, ep->connaio);
+ nng_stream_listener_accept(ep->listener, &ep->connaio);
} else {
tcptran_ep_match(ep);
}