aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-08 13:44:04 -0700
committerGarrett D'Amore <garrett@damore.org>2017-08-08 13:44:04 -0700
commitfec1e51b8c193152120d22c1898d71a2a3bbc934 (patch)
treef8bc8f7bee559d9043357cc7f24d1c60373c2f7e /src
parent13cd86ec4aeb881871991a0da41f4f07473ca08b (diff)
downloadnng-fec1e51b8c193152120d22c1898d71a2a3bbc934.tar.gz
nng-fec1e51b8c193152120d22c1898d71a2a3bbc934.tar.bz2
nng-fec1e51b8c193152120d22c1898d71a2a3bbc934.zip
Simplify initialization, fix error in closed TCP endpoint.
Diffstat (limited to 'src')
-rw-r--r--src/transport/ipc/ipc.c62
-rw-r--r--src/transport/tcp/tcp.c341
2 files changed, 189 insertions, 214 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 51051681..7d976122 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -106,39 +106,27 @@ nni_ipc_pipe_fini(void *arg)
static int
nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp)
{
- nni_ipc_pipe *pipe;
+ nni_ipc_pipe *p;
int rv;
- if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&pipe->mtx)) != 0) {
- goto fail;
- }
- rv = nni_aio_init(&pipe->txaio, nni_ipc_pipe_send_cb, pipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&pipe->rxaio, nni_ipc_pipe_recv_cb, pipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&pipe->negaio, nni_ipc_pipe_nego_cb, pipe);
- if (rv != 0) {
- goto fail;
+ if (((rv = nni_mtx_init(&p->mtx)) != 0) ||
+ ((rv = nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p)) != 0)) {
+ nni_ipc_pipe_fini(p);
+ return (rv);
}
- pipe->proto = ep->proto;
- pipe->rcvmax = ep->rcvmax;
- pipe->ipp = ipp;
- pipe->addr = ep->addr;
+ p->proto = ep->proto;
+ p->rcvmax = ep->rcvmax;
+ p->ipp = ipp;
+ p->addr = ep->addr;
- *pipep = pipe;
+ *pipep = p;
return (0);
-
-fail:
- nni_ipc_pipe_fini(pipe);
- return (rv);
}
static void
@@ -619,18 +607,18 @@ nni_ipc_ep_accept(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- if (ep->closed) {
- nni_aio_finish(aio, NNG_ECLOSED, 0);
+ if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
+ ep->user_aio = NULL;
nni_mtx_unlock(&ep->mtx);
return;
}
- ep->user_aio = aio;
- if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
- ep->user_aio = NULL;
+ if (ep->closed) {
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
nni_mtx_unlock(&ep->mtx);
return;
}
+ ep->user_aio = aio;
nni_plat_ipc_ep_accept(ep->iep, &ep->aio);
nni_mtx_unlock(&ep->mtx);
@@ -645,22 +633,20 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio)
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
- if (ep->closed) {
- nni_aio_finish(aio, NNG_ECLOSED, 0);
+ // If we can't start, then its dying and we can't report either.
+ if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
return;
}
- ep->user_aio = aio;
-
- // If we can't start, then its dying and we can't report
- // either,
- if ((rv = nni_aio_start(aio, nni_ipc_cancel_ep, ep)) != 0) {
- ep->user_aio = NULL;
+ if (ep->closed) {
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
nni_mtx_unlock(&ep->mtx);
return;
}
+ ep->user_aio = aio;
+
nni_plat_ipc_ep_connect(ep->iep, &ep->aio);
nni_mtx_unlock(&ep->mtx);
}
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index a0b399c7..4d47733b 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -84,150 +84,138 @@ nni_tcp_pipe_close(void *arg)
static void
nni_tcp_pipe_fini(void *arg)
{
- nni_tcp_pipe *pipe = arg;
+ nni_tcp_pipe *p = arg;
- nni_aio_stop(&pipe->rxaio);
- nni_aio_stop(&pipe->txaio);
- nni_aio_stop(&pipe->negaio);
+ nni_aio_stop(&p->rxaio);
+ nni_aio_stop(&p->txaio);
+ nni_aio_stop(&p->negaio);
- nni_aio_fini(&pipe->rxaio);
- nni_aio_fini(&pipe->txaio);
- nni_aio_fini(&pipe->negaio);
- if (pipe->tpp != NULL) {
- nni_plat_tcp_pipe_fini(pipe->tpp);
+ nni_aio_fini(&p->rxaio);
+ nni_aio_fini(&p->txaio);
+ nni_aio_fini(&p->negaio);
+ if (p->tpp != NULL) {
+ nni_plat_tcp_pipe_fini(p->tpp);
}
- if (pipe->rxmsg) {
- nni_msg_free(pipe->rxmsg);
+ if (p->rxmsg) {
+ nni_msg_free(p->rxmsg);
}
- NNI_FREE_STRUCT(pipe);
+ NNI_FREE_STRUCT(p);
}
static int
nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp)
{
- nni_tcp_pipe *pipe;
+ nni_tcp_pipe *p;
int rv;
- if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&pipe->mtx)) != 0) {
- goto fail;
- }
- rv = nni_aio_init(&pipe->txaio, nni_tcp_pipe_send_cb, pipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&pipe->rxaio, nni_tcp_pipe_recv_cb, pipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&pipe->negaio, nni_tcp_pipe_nego_cb, pipe);
- if (rv != 0) {
- goto fail;
+ if (((rv = nni_mtx_init(&p->mtx)) != 0) ||
+ ((rv = nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p)) != 0)) {
+ nni_tcp_pipe_fini(p);
+ return (rv);
}
- pipe->proto = ep->proto;
- pipe->rcvmax = ep->rcvmax;
- pipe->tpp = tpp;
- pipe->addr = ep->addr;
+ p->proto = ep->proto;
+ p->rcvmax = ep->rcvmax;
+ p->tpp = tpp;
+ p->addr = ep->addr;
- *pipep = pipe;
+ *pipep = p;
return (0);
-
-fail:
- nni_tcp_pipe_fini(pipe);
- return (rv);
}
static void
nni_tcp_cancel_nego(nni_aio *aio, int rv)
{
- nni_tcp_pipe *pipe = aio->a_prov_data;
+ nni_tcp_pipe *p = aio->a_prov_data;
- nni_mtx_lock(&pipe->mtx);
- if (pipe->user_negaio != aio) {
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_lock(&p->mtx);
+ if (p->user_negaio != aio) {
+ nni_mtx_unlock(&p->mtx);
return;
}
- pipe->user_negaio = NULL;
- nni_mtx_unlock(&pipe->mtx);
+ p->user_negaio = NULL;
+ nni_mtx_unlock(&p->mtx);
- nni_aio_cancel(&pipe->negaio, rv);
+ nni_aio_cancel(&p->negaio, rv);
nni_aio_finish_error(aio, rv);
}
static void
nni_tcp_pipe_nego_cb(void *arg)
{
- nni_tcp_pipe *pipe = arg;
- nni_aio * aio = &pipe->negaio;
+ nni_tcp_pipe *p = arg;
+ nni_aio * aio = &p->negaio;
int rv;
- nni_mtx_lock(&pipe->mtx);
+ nni_mtx_lock(&p->mtx);
if ((rv = nni_aio_result(aio)) != 0) {
goto done;
}
// We start transmitting before we receive.
- if (pipe->gottxhead < pipe->wanttxhead) {
- pipe->gottxhead += nni_aio_count(aio);
- } else if (pipe->gotrxhead < pipe->wantrxhead) {
- pipe->gotrxhead += nni_aio_count(aio);
+ if (p->gottxhead < p->wanttxhead) {
+ p->gottxhead += nni_aio_count(aio);
+ } else if (p->gotrxhead < p->wantrxhead) {
+ p->gotrxhead += nni_aio_count(aio);
}
- if (pipe->gottxhead < pipe->wanttxhead) {
+ if (p->gottxhead < p->wanttxhead) {
aio->a_niov = 1;
- aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead;
- aio->a_iov[0].iov_buf = &pipe->txlen[pipe->gottxhead];
+ aio->a_iov[0].iov_len = p->wanttxhead - p->gottxhead;
+ aio->a_iov[0].iov_buf = &p->txlen[p->gottxhead];
// send it down...
- nni_plat_tcp_pipe_send(pipe->tpp, aio);
- nni_mtx_unlock(&pipe->mtx);
+ nni_plat_tcp_pipe_send(p->tpp, aio);
+ nni_mtx_unlock(&p->mtx);
return;
}
- if (pipe->gotrxhead < pipe->wantrxhead) {
+ if (p->gotrxhead < p->wantrxhead) {
aio->a_niov = 1;
- aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead;
- aio->a_iov[0].iov_buf = &pipe->rxlen[pipe->gotrxhead];
- nni_plat_tcp_pipe_recv(pipe->tpp, aio);
- nni_mtx_unlock(&pipe->mtx);
+ aio->a_iov[0].iov_len = p->wantrxhead - p->gotrxhead;
+ aio->a_iov[0].iov_buf = &p->rxlen[p->gotrxhead];
+ nni_plat_tcp_pipe_recv(p->tpp, aio);
+ nni_mtx_unlock(&p->mtx);
return;
}
// We have both sent and received the headers. Lets check the
// receive side header.
- if ((pipe->rxlen[0] != 0) || (pipe->rxlen[1] != 'S') ||
- (pipe->rxlen[2] != 'P') || (pipe->rxlen[3] != 0) ||
- (pipe->rxlen[6] != 0) || (pipe->rxlen[7] != 0)) {
+ if ((p->rxlen[0] != 0) || (p->rxlen[1] != 'S') ||
+ (p->rxlen[2] != 'P') || (p->rxlen[3] != 0) || (p->rxlen[6] != 0) ||
+ (p->rxlen[7] != 0)) {
rv = NNG_EPROTO;
goto done;
}
- NNI_GET16(&pipe->rxlen[4], pipe->peer);
+ NNI_GET16(&p->rxlen[4], p->peer);
done:
- if ((aio = pipe->user_negaio) != NULL) {
- pipe->user_negaio = NULL;
+ if ((aio = p->user_negaio) != NULL) {
+ p->user_negaio = NULL;
nni_aio_finish(aio, rv, 0);
}
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_unlock(&p->mtx);
}
static void
nni_tcp_pipe_send_cb(void *arg)
{
- nni_tcp_pipe *pipe = arg;
+ nni_tcp_pipe *p = arg;
int rv;
nni_aio * aio;
size_t len;
- nni_mtx_lock(&pipe->mtx);
- if ((aio = pipe->user_txaio) == NULL) {
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_lock(&p->mtx);
+ if ((aio = p->user_txaio) == NULL) {
+ nni_mtx_unlock(&p->mtx);
return;
}
- pipe->user_txaio = NULL;
+ p->user_txaio = NULL;
- if ((rv = nni_aio_result(&pipe->txaio)) != 0) {
+ if ((rv = nni_aio_result(&p->txaio)) != 0) {
len = 0;
} else {
len = nni_msg_len(aio->a_msg);
@@ -235,181 +223,180 @@ nni_tcp_pipe_send_cb(void *arg)
aio->a_msg = NULL;
}
nni_aio_finish(aio, 0, len);
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_unlock(&p->mtx);
}
static void
nni_tcp_pipe_recv_cb(void *arg)
{
- nni_tcp_pipe *pipe = arg;
+ nni_tcp_pipe *p = arg;
nni_aio * aio;
int rv;
nni_msg * msg;
- nni_mtx_lock(&pipe->mtx);
+ nni_mtx_lock(&p->mtx);
- aio = pipe->user_rxaio;
+ aio = p->user_rxaio;
if (aio == NULL) {
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_unlock(&p->mtx);
return;
}
- if ((rv = nni_aio_result(&pipe->rxaio)) != 0) {
+ if ((rv = nni_aio_result(&p->rxaio)) != 0) {
// Error on receive. This has to cause an error back
// to the user. Also, if we had allocated an rxmsg, lets
// toss it.
- if (pipe->rxmsg != NULL) {
- nni_msg_free(pipe->rxmsg);
- pipe->rxmsg = NULL;
+ if (p->rxmsg != NULL) {
+ nni_msg_free(p->rxmsg);
+ p->rxmsg = NULL;
}
- pipe->user_rxaio = NULL;
+ p->user_rxaio = NULL;
nni_aio_finish_error(aio, rv);
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_unlock(&p->mtx);
return;
}
// If we don't have a message yet, we were reading the TCP message
// header, which is just the length. This tells us the size of the
// message to allocate and how much more to expect.
- if (pipe->rxmsg == NULL) {
+ if (p->rxmsg == NULL) {
uint64_t len;
// We should have gotten a message header.
- NNI_GET64(pipe->rxlen, len);
+ NNI_GET64(p->rxlen, len);
// Make sure the message payload is not too big. If it is
// the caller will shut down the pipe.
- if (len > pipe->rcvmax) {
- pipe->user_rxaio = NULL;
+ if (len > p->rcvmax) {
+ p->user_rxaio = NULL;
nni_aio_finish_error(aio, NNG_EMSGSIZE);
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_unlock(&p->mtx);
return;
}
- if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) {
- pipe->user_rxaio = NULL;
+ if ((rv = nng_msg_alloc(&p->rxmsg, (size_t) len)) != 0) {
+ p->user_rxaio = NULL;
nni_aio_finish_error(aio, rv);
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_unlock(&p->mtx);
return;
}
// Submit the rest of the data for a read -- we want to
// read the entire message now.
- pipe->rxaio.a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg);
- pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
- pipe->rxaio.a_niov = 1;
+ p->rxaio.a_iov[0].iov_buf = nni_msg_body(p->rxmsg);
+ p->rxaio.a_iov[0].iov_len = nni_msg_len(p->rxmsg);
+ p->rxaio.a_niov = 1;
- nni_plat_tcp_pipe_recv(pipe->tpp, &pipe->rxaio);
- nni_mtx_unlock(&pipe->mtx);
+ nni_plat_tcp_pipe_recv(p->tpp, &p->rxaio);
+ nni_mtx_unlock(&p->mtx);
return;
}
- // Otherwise we got a message read completely. Let the user know the
- // good news.
- pipe->user_rxaio = NULL;
- msg = pipe->rxmsg;
- pipe->rxmsg = NULL;
+ // We read a message completely. Let the user know the good news.
+ p->user_rxaio = NULL;
+ msg = p->rxmsg;
+ p->rxmsg = NULL;
nni_aio_finish_msg(aio, msg);
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_unlock(&p->mtx);
}
static void
nni_tcp_cancel_tx(nni_aio *aio, int rv)
{
- nni_tcp_pipe *pipe = aio->a_prov_data;
+ nni_tcp_pipe *p = aio->a_prov_data;
- nni_mtx_lock(&pipe->mtx);
- if (pipe->user_txaio != aio) {
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_lock(&p->mtx);
+ if (p->user_txaio != aio) {
+ nni_mtx_unlock(&p->mtx);
return;
}
- pipe->user_txaio = NULL;
- nni_mtx_unlock(&pipe->mtx);
+ p->user_txaio = NULL;
+ nni_mtx_unlock(&p->mtx);
// cancel the underlying operation.
- nni_aio_cancel(&pipe->txaio, rv);
+ nni_aio_cancel(&p->txaio, rv);
nni_aio_finish_error(aio, rv);
}
static void
nni_tcp_pipe_send(void *arg, nni_aio *aio)
{
- nni_tcp_pipe *pipe = arg;
- nni_msg * msg = aio->a_msg;
+ nni_tcp_pipe *p = arg;
+ nni_msg * msg = aio->a_msg;
uint64_t len;
len = nni_msg_len(msg) + nni_msg_header_len(msg);
- nni_mtx_lock(&pipe->mtx);
+ nni_mtx_lock(&p->mtx);
- if (nni_aio_start(aio, nni_tcp_cancel_tx, pipe) != 0) {
- nni_mtx_unlock(&pipe->mtx);
+ if (nni_aio_start(aio, nni_tcp_cancel_tx, p) != 0) {
+ nni_mtx_unlock(&p->mtx);
return;
}
- pipe->user_txaio = aio;
+ p->user_txaio = aio;
- NNI_PUT64(pipe->txlen, len);
+ NNI_PUT64(p->txlen, len);
- pipe->txaio.a_iov[0].iov_buf = pipe->txlen;
- pipe->txaio.a_iov[0].iov_len = sizeof(pipe->txlen);
- pipe->txaio.a_iov[1].iov_buf = nni_msg_header(msg);
- pipe->txaio.a_iov[1].iov_len = nni_msg_header_len(msg);
- pipe->txaio.a_iov[2].iov_buf = nni_msg_body(msg);
- pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg);
- pipe->txaio.a_niov = 3;
+ p->txaio.a_iov[0].iov_buf = p->txlen;
+ p->txaio.a_iov[0].iov_len = sizeof(p->txlen);
+ p->txaio.a_iov[1].iov_buf = nni_msg_header(msg);
+ p->txaio.a_iov[1].iov_len = nni_msg_header_len(msg);
+ p->txaio.a_iov[2].iov_buf = nni_msg_body(msg);
+ p->txaio.a_iov[2].iov_len = nni_msg_len(msg);
+ p->txaio.a_niov = 3;
- nni_plat_tcp_pipe_send(pipe->tpp, &pipe->txaio);
- nni_mtx_unlock(&pipe->mtx);
+ nni_plat_tcp_pipe_send(p->tpp, &p->txaio);
+ nni_mtx_unlock(&p->mtx);
}
static void
nni_tcp_cancel_rx(nni_aio *aio, int rv)
{
- nni_tcp_pipe *pipe = aio->a_prov_data;
+ nni_tcp_pipe *p = aio->a_prov_data;
- nni_mtx_lock(&pipe->mtx);
- if (pipe->user_rxaio != aio) {
- nni_mtx_unlock(&pipe->mtx);
+ nni_mtx_lock(&p->mtx);
+ if (p->user_rxaio != aio) {
+ nni_mtx_unlock(&p->mtx);
return;
}
- pipe->user_rxaio = NULL;
- nni_mtx_unlock(&pipe->mtx);
+ p->user_rxaio = NULL;
+ nni_mtx_unlock(&p->mtx);
// cancel the underlying operation.
- nni_aio_cancel(&pipe->rxaio, rv);
+ nni_aio_cancel(&p->rxaio, rv);
nni_aio_finish_error(aio, rv);
}
static void
nni_tcp_pipe_recv(void *arg, nni_aio *aio)
{
- nni_tcp_pipe *pipe = arg;
+ nni_tcp_pipe *p = arg;
- nni_mtx_lock(&pipe->mtx);
+ nni_mtx_lock(&p->mtx);
- if (nni_aio_start(aio, nni_tcp_cancel_rx, pipe) != 0) {
- nni_mtx_unlock(&pipe->mtx);
+ if (nni_aio_start(aio, nni_tcp_cancel_rx, p) != 0) {
+ nni_mtx_unlock(&p->mtx);
return;
}
- pipe->user_rxaio = aio;
+ p->user_rxaio = aio;
- NNI_ASSERT(pipe->rxmsg == NULL);
+ NNI_ASSERT(p->rxmsg == NULL);
// Schedule a read of the TCP header.
- pipe->rxaio.a_iov[0].iov_buf = pipe->rxlen;
- pipe->rxaio.a_iov[0].iov_len = sizeof(pipe->rxlen);
- pipe->rxaio.a_niov = 1;
+ p->rxaio.a_iov[0].iov_buf = p->rxlen;
+ p->rxaio.a_iov[0].iov_len = sizeof(p->rxlen);
+ p->rxaio.a_niov = 1;
- nni_plat_tcp_pipe_recv(pipe->tpp, &pipe->rxaio);
- nni_mtx_unlock(&pipe->mtx);
+ nni_plat_tcp_pipe_recv(p->tpp, &p->rxaio);
+ nni_mtx_unlock(&p->mtx);
}
static uint16_t
nni_tcp_pipe_peer(void *arg)
{
- nni_tcp_pipe *pipe = arg;
+ nni_tcp_pipe *p = arg;
- return (pipe->peer);
+ return (p->peer);
}
static int
@@ -519,30 +506,30 @@ nni_tcp_parse_url(
static void
nni_tcp_pipe_start(void *arg, nni_aio *aio)
{
- nni_tcp_pipe *pipe = arg;
-
- nni_mtx_lock(&pipe->mtx);
- pipe->txlen[0] = 0;
- pipe->txlen[1] = 'S';
- pipe->txlen[2] = 'P';
- pipe->txlen[3] = 0;
- NNI_PUT16(&pipe->txlen[4], pipe->proto);
- NNI_PUT16(&pipe->txlen[6], 0);
-
- pipe->user_negaio = aio;
- pipe->gotrxhead = 0;
- pipe->gottxhead = 0;
- pipe->wantrxhead = 8;
- pipe->wanttxhead = 8;
- pipe->negaio.a_niov = 1;
- pipe->negaio.a_iov[0].iov_len = 8;
- pipe->negaio.a_iov[0].iov_buf = &pipe->txlen[0];
- if (nni_aio_start(aio, nni_tcp_cancel_nego, pipe) != 0) {
- nni_mtx_unlock(&pipe->mtx);
+ nni_tcp_pipe *p = arg;
+
+ nni_mtx_lock(&p->mtx);
+ p->txlen[0] = 0;
+ p->txlen[1] = 'S';
+ p->txlen[2] = 'P';
+ p->txlen[3] = 0;
+ NNI_PUT16(&p->txlen[4], p->proto);
+ NNI_PUT16(&p->txlen[6], 0);
+
+ p->user_negaio = aio;
+ p->gotrxhead = 0;
+ p->gottxhead = 0;
+ p->wantrxhead = 8;
+ p->wanttxhead = 8;
+ p->negaio.a_niov = 1;
+ p->negaio.a_iov[0].iov_len = 8;
+ p->negaio.a_iov[0].iov_buf = &p->txlen[0];
+ if (nni_aio_start(aio, nni_tcp_cancel_nego, p) != 0) {
+ nni_mtx_unlock(&p->mtx);
return;
}
- nni_plat_tcp_pipe_send(pipe->tpp, &pipe->negaio);
- nni_mtx_unlock(&pipe->mtx);
+ nni_plat_tcp_pipe_send(p->tpp, &p->negaio);
+ nni_mtx_unlock(&p->mtx);
}
static void
@@ -708,20 +695,22 @@ nni_tcp_ep_connect(void *arg, nni_aio *aio)
int rv;
nni_mtx_lock(&ep->mtx);
- if (ep->closed) {
- nni_aio_finish(aio, NNG_ECLOSED, 0);
- nni_mtx_unlock(&ep->mtx);
- }
NNI_ASSERT(ep->user_aio == NULL);
- ep->user_aio = aio;
- // If we can't start, then its dying and we can't report either,
+ // If we can't start, then its dying and we can't report either.
if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) {
- ep->user_aio = NULL;
nni_mtx_unlock(&ep->mtx);
return;
}
+ if (ep->closed) {
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+
+ ep->user_aio = aio;
+
nni_plat_tcp_ep_connect(ep->tep, &ep->aio);
nni_mtx_unlock(&ep->mtx);
}