aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/ipc/ipc.c133
-rw-r--r--src/transport/tcp/tcp.c163
2 files changed, 163 insertions, 133 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index b2c382fb..f10f80b4 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -39,9 +39,9 @@ struct nni_ipc_pipe {
nni_aio *user_txaio;
nni_aio *user_rxaio;
nni_aio *user_negaio;
- nni_aio txaio;
- nni_aio rxaio;
- nni_aio negaio;
+ nni_aio *txaio;
+ nni_aio *rxaio;
+ nni_aio *negaio;
nni_msg *rxmsg;
nni_mtx mtx;
};
@@ -51,7 +51,7 @@ struct nni_ipc_ep {
nni_plat_ipc_ep *iep;
uint16_t proto;
size_t rcvmax;
- nni_aio aio;
+ nni_aio * aio;
nni_aio * user_aio;
nni_mtx mtx;
};
@@ -94,13 +94,13 @@ nni_ipc_pipe_fini(void *arg)
{
nni_ipc_pipe *pipe = arg;
- nni_aio_stop(&pipe->rxaio);
- nni_aio_stop(&pipe->txaio);
- nni_aio_stop(&pipe->negaio);
+ nni_aio_stop(pipe->rxaio);
+ nni_aio_stop(pipe->txaio);
+ nni_aio_stop(pipe->negaio);
- nni_aio_fini(&pipe->rxaio);
- nni_aio_fini(&pipe->txaio);
- nni_aio_fini(&pipe->negaio);
+ nni_aio_fini(pipe->rxaio);
+ nni_aio_fini(pipe->txaio);
+ nni_aio_fini(pipe->negaio);
if (pipe->ipp != NULL) {
nni_plat_ipc_pipe_fini(pipe->ipp);
}
@@ -115,14 +115,18 @@ static int
nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp)
{
nni_ipc_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);
- nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p);
- nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p);
- nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p);
+ if (((rv = nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p)) != 0)) {
+ nni_ipc_pipe_fini(p);
+ return (rv);
+ }
p->proto = ep->proto;
p->rcvmax = ep->rcvmax;
@@ -146,7 +150,7 @@ nni_ipc_cancel_start(nni_aio *aio, int rv)
pipe->user_negaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(&pipe->negaio, rv);
+ nni_aio_cancel(pipe->negaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -154,7 +158,7 @@ static void
nni_ipc_pipe_nego_cb(void *arg)
{
nni_ipc_pipe *pipe = arg;
- nni_aio * aio = &pipe->negaio;
+ nni_aio * aio = pipe->negaio;
int rv;
nni_mtx_lock(&pipe->mtx);
@@ -219,12 +223,13 @@ nni_ipc_pipe_send_cb(void *arg)
return;
}
pipe->user_txaio = NULL;
- if ((rv = nni_aio_result(&pipe->txaio)) != 0) {
+ if ((rv = nni_aio_result(pipe->txaio)) != 0) {
len = 0;
} else {
- len = nni_msg_len(aio->a_msg);
- nni_msg_free(aio->a_msg);
- aio->a_msg = NULL;
+ nni_msg *msg = nni_aio_get_msg(aio);
+ len = nni_msg_len(msg);
+ nni_msg_free(msg);
+ nni_aio_set_msg(aio, NULL);
}
nni_aio_finish(aio, rv, len);
nni_mtx_unlock(&pipe->mtx);
@@ -245,7 +250,7 @@ nni_ipc_pipe_recv_cb(void *arg)
return;
}
- if ((rv = nni_aio_result(&pipe->rxaio)) != 0) {
+ if ((rv = nni_aio_result(pipe->rxaio)) != 0) {
// Error on receive. This has to cause an error back
// to the user. Also, if we had allocated an rxmsg, lets
// toss it.
@@ -264,6 +269,7 @@ nni_ipc_pipe_recv_cb(void *arg)
// message to allocate and how much more to expect.
if (pipe->rxmsg == NULL) {
uint64_t len;
+ nni_aio *rxaio;
// Check to make sure we got msg type 1.
if (pipe->rxhead[0] != 1) {
@@ -298,11 +304,12 @@ nni_ipc_pipe_recv_cb(void *arg)
// Submit the rest of the data for a read -- we want to
// read the entire message now.
- pipe->rxaio.a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg);
- pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
- pipe->rxaio.a_niov = 1;
+ rxaio = pipe->rxaio;
+ rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg);
+ rxaio->a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
+ rxaio->a_niov = 1;
- nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio);
+ nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -329,7 +336,7 @@ nni_ipc_cancel_tx(nni_aio *aio, int rv)
pipe->user_txaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(&pipe->txaio, rv);
+ nni_aio_cancel(pipe->txaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -337,8 +344,9 @@ static void
nni_ipc_pipe_send(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
- nni_msg * msg = aio->a_msg;
+ nni_msg * msg = nni_aio_get_msg(aio);
uint64_t len;
+ nni_aio * txaio;
len = nni_msg_len(msg) + nni_msg_header_len(msg);
@@ -353,15 +361,16 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
pipe->txhead[0] = 1; // message type, 1.
NNI_PUT64(pipe->txhead + 1, len);
- pipe->txaio.a_iov[0].iov_buf = pipe->txhead;
- pipe->txaio.a_iov[0].iov_len = sizeof(pipe->txhead);
- pipe->txaio.a_iov[1].iov_buf = nni_msg_header(msg);
- pipe->txaio.a_iov[1].iov_len = nni_msg_header_len(msg);
- pipe->txaio.a_iov[2].iov_buf = nni_msg_body(msg);
- pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg);
- pipe->txaio.a_niov = 3;
+ txaio = pipe->txaio;
+ txaio->a_iov[0].iov_buf = pipe->txhead;
+ txaio->a_iov[0].iov_len = sizeof(pipe->txhead);
+ txaio->a_iov[1].iov_buf = nni_msg_header(msg);
+ txaio->a_iov[1].iov_len = nni_msg_header_len(msg);
+ txaio->a_iov[2].iov_buf = nni_msg_body(msg);
+ txaio->a_iov[2].iov_len = nni_msg_len(msg);
+ txaio->a_niov = 3;
- nni_plat_ipc_pipe_send(pipe->ipp, &pipe->txaio);
+ nni_plat_ipc_pipe_send(pipe->ipp, txaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -378,7 +387,7 @@ nni_ipc_cancel_rx(nni_aio *aio, int rv)
pipe->user_rxaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(&pipe->rxaio, rv);
+ nni_aio_cancel(pipe->rxaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -386,6 +395,7 @@ static void
nni_ipc_pipe_recv(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
+ nni_aio * rxaio;
nni_mtx_lock(&pipe->mtx);
@@ -398,11 +408,12 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio)
NNI_ASSERT(pipe->rxmsg == NULL);
// Schedule a read of the IPC header.
- pipe->rxaio.a_iov[0].iov_buf = pipe->rxhead;
- pipe->rxaio.a_iov[0].iov_len = sizeof(pipe->rxhead);
- pipe->rxaio.a_niov = 1;
+ rxaio = pipe->rxaio;
+ rxaio->a_iov[0].iov_buf = pipe->rxhead;
+ rxaio->a_iov[0].iov_len = sizeof(pipe->rxhead);
+ rxaio->a_niov = 1;
- nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio);
+ nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -411,6 +422,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
int rv;
+ nni_aio * negaio;
nni_mtx_lock(&pipe->mtx);
pipe->txhead[0] = 0;
@@ -420,20 +432,21 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
NNI_PUT16(&pipe->txhead[4], pipe->proto);
NNI_PUT16(&pipe->txhead[6], 0);
- pipe->user_negaio = aio;
- pipe->gotrxhead = 0;
- pipe->gottxhead = 0;
- pipe->wantrxhead = 8;
- pipe->wanttxhead = 8;
- pipe->negaio.a_niov = 1;
- pipe->negaio.a_iov[0].iov_len = 8;
- pipe->negaio.a_iov[0].iov_buf = &pipe->txhead[0];
+ pipe->user_negaio = aio;
+ pipe->gotrxhead = 0;
+ pipe->gottxhead = 0;
+ pipe->wantrxhead = 8;
+ pipe->wanttxhead = 8;
+ negaio = pipe->negaio;
+ negaio->a_niov = 1;
+ negaio->a_iov[0].iov_len = 8;
+ negaio->a_iov[0].iov_buf = &pipe->txhead[0];
rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe);
if (rv != 0) {
nni_mtx_unlock(&pipe->mtx);
return;
}
- nni_plat_ipc_pipe_send(pipe->ipp, &pipe->negaio);
+ nni_plat_ipc_pipe_send(pipe->ipp, negaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -473,9 +486,9 @@ nni_ipc_ep_fini(void *arg)
{
nni_ipc_ep *ep = arg;
- nni_aio_stop(&ep->aio);
+ nni_aio_stop(ep->aio);
nni_plat_ipc_ep_fini(ep->iep);
- nni_aio_fini(&ep->aio);
+ nni_aio_fini(ep->aio);
nni_mtx_fini(&ep->mtx);
NNI_FREE_STRUCT(ep);
}
@@ -532,7 +545,7 @@ nni_ipc_ep_close(void *arg)
nni_plat_ipc_ep_close(ep->iep);
nni_mtx_unlock(&ep->mtx);
- nni_aio_stop(&ep->aio);
+ nni_aio_stop(ep->aio);
}
static int
@@ -554,19 +567,19 @@ nni_ipc_ep_finish(nni_ipc_ep *ep)
int rv;
nni_ipc_pipe *pipe = NULL;
- if ((rv = nni_aio_result(&ep->aio)) != 0) {
+ if ((rv = nni_aio_result(ep->aio)) != 0) {
goto done;
}
- NNI_ASSERT(ep->aio.a_pipe != NULL);
+ NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL);
// Attempt to allocate the parent pipe. If this fails we'll
// drop the connection (ENOMEM probably).
- rv = nni_ipc_pipe_init(&pipe, ep, ep->aio.a_pipe);
+ rv = nni_ipc_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio));
done:
- ep->aio.a_pipe = NULL;
- aio = ep->user_aio;
- ep->user_aio = NULL;
+ nni_aio_set_pipe(ep->aio, NULL);
+ aio = ep->user_aio;
+ ep->user_aio = NULL;
if ((aio != NULL) && (rv == 0)) {
NNI_ASSERT(pipe != NULL);
@@ -607,7 +620,7 @@ nni_ipc_cancel_ep(nni_aio *aio, int rv)
ep->user_aio = NULL;
nni_mtx_unlock(&ep->mtx);
- nni_aio_cancel(&ep->aio, rv);
+ nni_aio_cancel(ep->aio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -627,7 +640,7 @@ nni_ipc_ep_accept(void *arg, nni_aio *aio)
ep->user_aio = aio;
- nni_plat_ipc_ep_accept(ep->iep, &ep->aio);
+ nni_plat_ipc_ep_accept(ep->iep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
@@ -648,7 +661,7 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio)
ep->user_aio = aio;
- nni_plat_ipc_ep_connect(ep->iep, &ep->aio);
+ nni_plat_ipc_ep_connect(ep->iep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index f0f07592..759e20f5 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -38,9 +38,9 @@ struct nni_tcp_pipe {
size_t gotrxhead;
size_t wanttxhead;
size_t wantrxhead;
- nni_aio txaio;
- nni_aio rxaio;
- nni_aio negaio;
+ nni_aio *txaio;
+ nni_aio *rxaio;
+ nni_aio *negaio;
nni_msg *rxmsg;
nni_mtx mtx;
};
@@ -52,7 +52,7 @@ struct nni_tcp_ep {
size_t rcvmax;
nni_duration linger;
int ipv4only;
- nni_aio aio;
+ nni_aio * aio;
nni_aio * user_aio;
nni_mtx mtx;
};
@@ -98,13 +98,13 @@ nni_tcp_pipe_fini(void *arg)
{
nni_tcp_pipe *p = arg;
- nni_aio_stop(&p->rxaio);
- nni_aio_stop(&p->txaio);
- nni_aio_stop(&p->negaio);
+ nni_aio_stop(p->rxaio);
+ nni_aio_stop(p->txaio);
+ nni_aio_stop(p->negaio);
- nni_aio_fini(&p->rxaio);
- nni_aio_fini(&p->txaio);
- nni_aio_fini(&p->negaio);
+ nni_aio_fini(p->rxaio);
+ nni_aio_fini(p->txaio);
+ nni_aio_fini(p->negaio);
if (p->tpp != NULL) {
nni_plat_tcp_pipe_fini(p->tpp);
}
@@ -119,14 +119,18 @@ static int
nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp)
{
nni_tcp_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);
- nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p);
- nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p);
- nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p);
+ if (((rv = nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p)) != 0)) {
+ nni_tcp_pipe_fini(p);
+ return (rv);
+ }
p->proto = ep->proto;
p->rcvmax = ep->rcvmax;
@@ -150,7 +154,7 @@ nni_tcp_cancel_nego(nni_aio *aio, int rv)
p->user_negaio = NULL;
nni_mtx_unlock(&p->mtx);
- nni_aio_cancel(&p->negaio, rv);
+ nni_aio_cancel(p->negaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -158,7 +162,7 @@ static void
nni_tcp_pipe_nego_cb(void *arg)
{
nni_tcp_pipe *p = arg;
- nni_aio * aio = &p->negaio;
+ nni_aio * aio = p->negaio;
int rv;
nni_mtx_lock(&p->mtx);
@@ -224,12 +228,12 @@ nni_tcp_pipe_send_cb(void *arg)
}
p->user_txaio = NULL;
- if ((rv = nni_aio_result(&p->txaio)) != 0) {
+ if ((rv = nni_aio_result(p->txaio)) != 0) {
len = 0;
} else {
len = nni_msg_len(aio->a_msg);
- nni_msg_free(aio->a_msg);
- aio->a_msg = NULL;
+ nni_msg_free(nni_aio_get_msg(aio));
+ nni_aio_set_msg(aio, NULL);
}
nni_aio_finish(aio, 0, len);
nni_mtx_unlock(&p->mtx);
@@ -251,7 +255,7 @@ nni_tcp_pipe_recv_cb(void *arg)
return;
}
- if ((rv = nni_aio_result(&p->rxaio)) != 0) {
+ if ((rv = nni_aio_result(p->rxaio)) != 0) {
// Error on receive. This has to cause an error back
// to the user. Also, if we had allocated an rxmsg, lets
// toss it.
@@ -269,6 +273,7 @@ nni_tcp_pipe_recv_cb(void *arg)
// header, which is just the length. This tells us the size of the
// message to allocate and how much more to expect.
if (p->rxmsg == NULL) {
+ nni_aio *rxaio;
uint64_t len;
// We should have gotten a message header.
NNI_GET64(p->rxlen, len);
@@ -291,11 +296,12 @@ nni_tcp_pipe_recv_cb(void *arg)
// Submit the rest of the data for a read -- we want to
// read the entire message now.
- p->rxaio.a_iov[0].iov_buf = nni_msg_body(p->rxmsg);
- p->rxaio.a_iov[0].iov_len = nni_msg_len(p->rxmsg);
- p->rxaio.a_niov = 1;
+ rxaio = p->rxaio;
+ rxaio->a_iov[0].iov_buf = nni_msg_body(p->rxmsg);
+ rxaio->a_iov[0].iov_len = nni_msg_len(p->rxmsg);
+ rxaio->a_niov = 1;
- nni_plat_tcp_pipe_recv(p->tpp, &p->rxaio);
+ nni_plat_tcp_pipe_recv(p->tpp, rxaio);
nni_mtx_unlock(&p->mtx);
return;
}
@@ -322,7 +328,7 @@ nni_tcp_cancel_tx(nni_aio *aio, int rv)
nni_mtx_unlock(&p->mtx);
// cancel the underlying operation.
- nni_aio_cancel(&p->txaio, rv);
+ nni_aio_cancel(p->txaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -330,8 +336,9 @@ static void
nni_tcp_pipe_send(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
- nni_msg * msg = aio->a_msg;
+ nni_msg * msg = nni_aio_get_msg(aio);
uint64_t len;
+ nni_aio * txaio;
len = nni_msg_len(msg) + nni_msg_header_len(msg);
@@ -346,15 +353,16 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
NNI_PUT64(p->txlen, len);
- p->txaio.a_iov[0].iov_buf = p->txlen;
- p->txaio.a_iov[0].iov_len = sizeof(p->txlen);
- p->txaio.a_iov[1].iov_buf = nni_msg_header(msg);
- p->txaio.a_iov[1].iov_len = nni_msg_header_len(msg);
- p->txaio.a_iov[2].iov_buf = nni_msg_body(msg);
- p->txaio.a_iov[2].iov_len = nni_msg_len(msg);
- p->txaio.a_niov = 3;
+ txaio = p->txaio;
+ txaio->a_iov[0].iov_buf = p->txlen;
+ txaio->a_iov[0].iov_len = sizeof(p->txlen);
+ txaio->a_iov[1].iov_buf = nni_msg_header(msg);
+ txaio->a_iov[1].iov_len = nni_msg_header_len(msg);
+ txaio->a_iov[2].iov_buf = nni_msg_body(msg);
+ txaio->a_iov[2].iov_len = nni_msg_len(msg);
+ txaio->a_niov = 3;
- nni_plat_tcp_pipe_send(p->tpp, &p->txaio);
+ nni_plat_tcp_pipe_send(p->tpp, txaio);
nni_mtx_unlock(&p->mtx);
}
@@ -372,7 +380,7 @@ nni_tcp_cancel_rx(nni_aio *aio, int rv)
nni_mtx_unlock(&p->mtx);
// cancel the underlying operation.
- nni_aio_cancel(&p->rxaio, rv);
+ nni_aio_cancel(p->rxaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -380,6 +388,7 @@ static void
nni_tcp_pipe_recv(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
+ nni_aio * rxaio;
nni_mtx_lock(&p->mtx);
@@ -392,11 +401,12 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio)
NNI_ASSERT(p->rxmsg == NULL);
// Schedule a read of the TCP header.
- p->rxaio.a_iov[0].iov_buf = p->rxlen;
- p->rxaio.a_iov[0].iov_len = sizeof(p->rxlen);
- p->rxaio.a_niov = 1;
+ rxaio = p->rxaio;
+ rxaio->a_iov[0].iov_buf = p->rxlen;
+ rxaio->a_iov[0].iov_len = sizeof(p->rxlen);
+ rxaio->a_niov = 1;
- nni_plat_tcp_pipe_recv(p->tpp, &p->rxaio);
+ nni_plat_tcp_pipe_recv(p->tpp, rxaio);
nni_mtx_unlock(&p->mtx);
}
@@ -525,6 +535,7 @@ static void
nni_tcp_pipe_start(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
+ nni_aio * negaio;
nni_mtx_lock(&p->mtx);
p->txlen[0] = 0;
@@ -534,19 +545,20 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
NNI_PUT16(&p->txlen[4], p->proto);
NNI_PUT16(&p->txlen[6], 0);
- p->user_negaio = aio;
- p->gotrxhead = 0;
- p->gottxhead = 0;
- p->wantrxhead = 8;
- p->wanttxhead = 8;
- p->negaio.a_niov = 1;
- p->negaio.a_iov[0].iov_len = 8;
- p->negaio.a_iov[0].iov_buf = &p->txlen[0];
+ p->user_negaio = aio;
+ p->gotrxhead = 0;
+ p->gottxhead = 0;
+ p->wantrxhead = 8;
+ p->wanttxhead = 8;
+ negaio = p->negaio;
+ negaio->a_niov = 1;
+ negaio->a_iov[0].iov_len = 8;
+ negaio->a_iov[0].iov_buf = &p->txlen[0];
if (nni_aio_start(aio, nni_tcp_cancel_nego, p) != 0) {
nni_mtx_unlock(&p->mtx);
return;
}
- nni_plat_tcp_pipe_send(p->tpp, &p->negaio);
+ nni_plat_tcp_pipe_send(p->tpp, negaio);
nni_mtx_unlock(&p->mtx);
}
@@ -555,11 +567,11 @@ nni_tcp_ep_fini(void *arg)
{
nni_tcp_ep *ep = arg;
- nni_aio_stop(&ep->aio);
+ nni_aio_stop(ep->aio);
if (ep->tep != NULL) {
nni_plat_tcp_ep_fini(ep->tep);
}
- nni_aio_fini(&ep->aio);
+ nni_aio_fini(ep->aio);
nni_mtx_fini(&ep->mtx);
NNI_FREE_STRUCT(ep);
}
@@ -575,7 +587,7 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
char * lhost;
char * lserv;
nni_sockaddr rsa, lsa;
- nni_aio aio;
+ nni_aio * aio;
int passive;
// Make a copy of the url (to allow for destructive operations)
@@ -590,17 +602,19 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
}
passive = (mode == NNI_EP_MODE_DIAL ? 0 : 1);
- nni_aio_init(&aio, NULL, NULL);
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
// XXX: arguably we could defer this part to the point we do a bind
// or connect!
if ((rhost != NULL) || (rserv != NULL)) {
- aio.a_addr = &rsa;
- nni_plat_tcp_resolv(
- rhost, rserv, NNG_AF_UNSPEC, passive, &aio);
- nni_aio_wait(&aio);
- if ((rv = nni_aio_result(&aio)) != 0) {
+ aio->a_addr = &rsa;
+ nni_plat_tcp_resolv(rhost, rserv, NNG_AF_UNSPEC, passive, aio);
+ nni_aio_wait(aio);
+ if ((rv = nni_aio_result(aio)) != 0) {
+ nni_aio_fini(aio);
return (rv);
}
} else {
@@ -608,16 +622,17 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
}
if ((lhost != NULL) || (lserv != NULL)) {
- aio.a_addr = &lsa;
- nni_plat_tcp_resolv(
- lhost, lserv, NNG_AF_UNSPEC, passive, &aio);
- nni_aio_wait(&aio);
- if ((rv = nni_aio_result(&aio)) != 0) {
+ aio->a_addr = &lsa;
+ nni_plat_tcp_resolv(lhost, lserv, NNG_AF_UNSPEC, passive, aio);
+ nni_aio_wait(aio);
+ if ((rv = nni_aio_result(aio)) != 0) {
+ nni_aio_fini(aio);
return (rv);
}
} else {
lsa.s_un.s_family = NNG_AF_UNSPEC;
}
+ nni_aio_fini(aio);
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
@@ -633,8 +648,10 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
}
nni_mtx_init(&ep->mtx);
- nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep);
-
+ if ((rv = nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep)) != 0) {
+ nni_tcp_ep_fini(ep);
+ return (rv);
+ }
ep->proto = nni_sock_proto(sock);
*epp = ep;
@@ -650,7 +667,7 @@ nni_tcp_ep_close(void *arg)
nni_plat_tcp_ep_close(ep->tep);
nni_mtx_unlock(&ep->mtx);
- nni_aio_stop(&ep->aio);
+ nni_aio_stop(ep->aio);
}
static int
@@ -673,19 +690,19 @@ nni_tcp_ep_finish(nni_tcp_ep *ep)
int rv;
nni_tcp_pipe *pipe = NULL;
- if ((rv = nni_aio_result(&ep->aio)) != 0) {
+ if ((rv = nni_aio_result(ep->aio)) != 0) {
goto done;
}
- NNI_ASSERT(ep->aio.a_pipe != NULL);
+ NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL);
// Attempt to allocate the parent pipe. If this fails we'll
// drop the connection (ENOMEM probably).
- rv = nni_tcp_pipe_init(&pipe, ep, ep->aio.a_pipe);
+ rv = nni_tcp_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio));
done:
- ep->aio.a_pipe = NULL;
- aio = ep->user_aio;
- ep->user_aio = NULL;
+ nni_aio_set_pipe(ep->aio, NULL);
+ aio = ep->user_aio;
+ ep->user_aio = NULL;
if ((aio != NULL) && (rv == 0)) {
nni_aio_finish_pipe(aio, pipe);
@@ -723,7 +740,7 @@ nni_tcp_cancel_ep(nni_aio *aio, int rv)
ep->user_aio = NULL;
nni_mtx_unlock(&ep->mtx);
- nni_aio_cancel(&ep->aio, rv);
+ nni_aio_cancel(ep->aio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -743,7 +760,7 @@ nni_tcp_ep_accept(void *arg, nni_aio *aio)
ep->user_aio = aio;
- nni_plat_tcp_ep_accept(ep->tep, &ep->aio);
+ nni_plat_tcp_ep_accept(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
@@ -764,7 +781,7 @@ nni_tcp_ep_connect(void *arg, nni_aio *aio)
ep->user_aio = aio;
- nni_plat_tcp_ep_connect(ep->tep, &ep->aio);
+ nni_plat_tcp_ep_connect(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}