aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/tcp.c7
-rw-r--r--src/platform/posix/posix_tcpconn.c18
-rw-r--r--src/platform/posix/posix_tcpdial.c20
-rw-r--r--src/platform/posix/posix_tcplisten.c9
-rw-r--r--src/platform/windows/win_tcpconn.c16
-rw-r--r--src/platform/windows/win_tcpdial.c11
-rw-r--r--src/platform/windows/win_tcplisten.c7
-rw-r--r--src/sp/transport/tcp/tcp.c39
8 files changed, 41 insertions, 86 deletions
diff --git a/src/core/tcp.c b/src/core/tcp.c
index 75b938b0..119f1f57 100644
--- a/src/core/tcp.c
+++ b/src/core/tcp.c
@@ -177,16 +177,15 @@ static void
tcp_dialer_dial(void *arg, nng_aio *aio)
{
tcp_dialer *d = arg;
- if (nni_aio_begin(aio) != 0) {
- return;
- }
+
+ nni_aio_reset(aio);
nni_mtx_lock(&d->mtx);
if (d->closed) {
nni_mtx_unlock(&d->mtx);
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
- if (!nni_aio_defer(aio, tcp_dial_cancel, d)) {
+ if (!nni_aio_start(aio, tcp_dial_cancel, d)) {
nni_mtx_unlock(&d->mtx);
return;
}
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c
index c23e1840..9837a839 100644
--- a/src/platform/posix/posix_tcpconn.c
+++ b/src/platform/posix/posix_tcpconn.c
@@ -283,16 +283,11 @@ static void
tcp_send(void *arg, nni_aio *aio)
{
nni_tcp_conn *c = arg;
- int rv;
- if (nni_aio_begin(aio) != 0) {
- return;
- }
+ nni_aio_reset(aio);
nni_mtx_lock(&c->mtx);
-
- if ((rv = nni_aio_schedule(aio, tcp_cancel, c)) != 0) {
+ if (!nni_aio_start(aio, tcp_cancel, c)) {
nni_mtx_unlock(&c->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
nni_aio_list_append(&c->writeq, aio);
@@ -313,16 +308,11 @@ static void
tcp_recv(void *arg, nni_aio *aio)
{
nni_tcp_conn *c = arg;
- int rv;
- if (nni_aio_begin(aio) != 0) {
- return;
- }
+ nni_aio_reset(aio);
nni_mtx_lock(&c->mtx);
-
- if ((rv = nni_aio_schedule(aio, tcp_cancel, c)) != 0) {
+ if (!nni_aio_start(aio, tcp_cancel, c)) {
nni_mtx_unlock(&c->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
nni_aio_list_append(&c->readq, aio);
diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c
index 41036366..e827b666 100644
--- a/src/platform/posix/posix_tcpdial.c
+++ b/src/platform/posix/posix_tcpdial.c
@@ -193,9 +193,7 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
int ka;
int nd;
- if (nni_aio_begin(aio) != 0) {
- return;
- }
+ nni_aio_reset(aio);
if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
((ss.ss_family != AF_INET) && (ss.ss_family != AF_INET6))) {
@@ -208,15 +206,22 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
return;
}
- nni_refcnt_hold(&d->ref);
-
if ((rv = nni_posix_tcp_alloc(&c, d, fd)) != 0) {
+ (void) close(fd);
nni_aio_finish_error(aio, rv);
- nni_posix_tcp_dialer_rele(d);
return;
}
+ // hold for the stream
+ nni_refcnt_hold(&d->ref);
+
nni_mtx_lock(&d->mtx);
+ if (!nni_aio_start(aio, tcp_dialer_cancel, d)) {
+ nni_mtx_unlock(&d->mtx);
+ nng_stream_free(&c->stream);
+ return;
+ }
+
if (d->closed) {
rv = NNG_ECLOSED;
goto error;
@@ -227,9 +232,6 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
goto error;
}
}
- if ((rv = nni_aio_schedule(aio, tcp_dialer_cancel, d)) != 0) {
- goto error;
- }
c->dial_aio = aio;
if (connect(fd, (void *) &ss, sslen) != 0) {
if (errno != EINPROGRESS) {
diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c
index 78e9b253..b14114b7 100644
--- a/src/platform/posix/posix_tcplisten.c
+++ b/src/platform/posix/posix_tcplisten.c
@@ -283,15 +283,11 @@ nni_tcp_listener_fini(nni_tcp_listener *l)
void
nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
{
- int rv;
-
// Accept is simpler than the connect case. With accept we just
// need to wait for the socket to be readable to indicate an incoming
// connection is ready for us. There isn't anything else for us to
// do really, as that will have been done in listen.
- if (nni_aio_begin(aio) != 0) {
- return;
- }
+ nni_aio_reset(aio);
nni_mtx_lock(&l->mtx);
if (!l->started) {
@@ -304,9 +300,8 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
- if ((rv = nni_aio_schedule(aio, tcp_listener_cancel, l)) != 0) {
+ if (!nni_aio_start(aio, tcp_listener_cancel, l)) {
nni_mtx_unlock(&l->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
nni_aio_list_append(&l->acceptq, aio);
diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c
index 2f2fd378..bd7f8504 100644
--- a/src/platform/windows/win_tcpconn.c
+++ b/src/platform/windows/win_tcpconn.c
@@ -121,15 +121,11 @@ static void
tcp_recv(void *arg, nni_aio *aio)
{
nni_tcp_conn *c = arg;
- int rv;
- if (nni_aio_begin(aio) != 0) {
- return;
- }
+ nni_aio_reset(aio);
nni_mtx_lock(&c->mtx);
- if ((rv = nni_aio_schedule(aio, tcp_recv_cancel, c)) != 0) {
+ if (!nni_aio_start(aio, tcp_recv_cancel, c)) {
nni_mtx_unlock(&c->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
nni_list_append(&c->recv_aios, aio);
@@ -230,15 +226,11 @@ static void
tcp_send(void *arg, nni_aio *aio)
{
nni_tcp_conn *c = arg;
- int rv;
- if (nni_aio_begin(aio) != 0) {
- return;
- }
+ nni_aio_reset(aio);
nni_mtx_lock(&c->mtx);
- if ((rv = nni_aio_schedule(aio, tcp_send_cancel, c)) != 0) {
+ if (!nni_aio_start(aio, tcp_send_cancel, c)) {
nni_mtx_unlock(&c->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
nni_list_append(&c->send_aios, aio);
diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c
index c0385648..474bd79f 100644
--- a/src/platform/windows/win_tcpdial.c
+++ b/src/platform/windows/win_tcpdial.c
@@ -196,9 +196,7 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
nni_tcp_conn *c;
int rv;
- if (nni_aio_begin(aio) != 0) {
- return;
- }
+ nni_aio_reset(aio);
if ((len = nni_win_nn2sockaddr(&ss, sa)) <= 0) {
nni_aio_finish_error(aio, NNG_EADDRINVAL);
@@ -246,14 +244,13 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
return;
}
- c->dialer = d;
- nni_aio_set_prov_data(aio, c);
- if ((rv = nni_aio_schedule(aio, tcp_dial_cancel, d)) != 0) {
+ if (!nni_aio_start(aio, tcp_dial_cancel, d)) {
nni_mtx_unlock(&d->mtx);
nng_stream_free(&c->ops);
- nni_aio_finish_error(aio, rv);
return;
}
+ nni_aio_set_prov_data(aio, c);
+ c->dialer = d;
c->conn_aio = aio;
nni_aio_list_append(&d->aios, aio);
diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c
index 4e4fb090..e7c7a966 100644
--- a/src/platform/windows/win_tcplisten.c
+++ b/src/platform/windows/win_tcplisten.c
@@ -382,9 +382,7 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
{
int rv;
- if (nni_aio_begin(aio) != 0) {
- return;
- }
+ nni_aio_reset(aio);
nni_mtx_lock(&l->mtx);
if (!l->started) {
@@ -393,9 +391,8 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
return;
}
- if ((rv = nni_aio_schedule(aio, tcp_accept_cancel, l)) != 0) {
+ if (!nni_aio_start(aio, tcp_accept_cancel, l)) {
nni_mtx_unlock(&l->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
diff --git a/src/sp/transport/tcp/tcp.c b/src/sp/transport/tcp/tcp.c
index 4c0f76bd..c8539c5e 100644
--- a/src/sp/transport/tcp/tcp.c
+++ b/src/sp/transport/tcp/tcp.c
@@ -473,19 +473,11 @@ static void
tcptran_pipe_send(void *arg, nni_aio *aio)
{
tcptran_pipe *p = arg;
- int rv;
- if (nni_aio_begin(aio) != 0) {
- // No way to give the message back to the protocol, so
- // we just discard it silently to prevent it from leaking.
- nni_msg_free(nni_aio_get_msg(aio));
- nni_aio_set_msg(aio, NULL);
- return;
- }
+ nni_aio_reset(aio);
nni_mtx_lock(&p->mtx);
- if ((rv = nni_aio_schedule(aio, tcptran_pipe_send_cancel, p)) != 0) {
+ if (!nni_aio_start(aio, tcptran_pipe_send_cancel, p)) {
nni_mtx_unlock(&p->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
nni_list_append(&p->sendq, aio);
@@ -549,15 +541,11 @@ static void
tcptran_pipe_recv(void *arg, nni_aio *aio)
{
tcptran_pipe *p = arg;
- int rv;
- if (nni_aio_begin(aio) != 0) {
- return;
- }
+ nni_aio_reset(aio);
nni_mtx_lock(&p->mtx);
- if ((rv = nni_aio_schedule(aio, tcptran_pipe_recv_cancel, p)) != 0) {
+ if (!nni_aio_start(aio, tcptran_pipe_recv_cancel, p)) {
nni_mtx_unlock(&p->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
@@ -715,6 +703,9 @@ error:
nni_aio_finish_error(aio, rv);
}
switch (rv) {
+ case NNG_ECLOSED:
+ case NNG_ESTOPPED:
+ break;
case NNG_ENOMEM:
case NNG_ENOFILES:
@@ -867,11 +858,8 @@ static void
tcptran_ep_connect(void *arg, nni_aio *aio)
{
tcptran_ep *ep = arg;
- int rv;
- if (nni_aio_begin(aio) != 0) {
- return;
- }
+ nni_aio_reset(aio);
nni_mtx_lock(&ep->mtx);
if (ep->closed) {
nni_mtx_unlock(&ep->mtx);
@@ -883,9 +871,8 @@ tcptran_ep_connect(void *arg, nni_aio *aio)
nni_aio_finish_error(aio, NNG_EBUSY);
return;
}
- if ((rv = nni_aio_schedule(aio, tcptran_ep_cancel, ep)) != 0) {
+ if (!nni_aio_start(aio, tcptran_ep_cancel, ep)) {
nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
ep->useraio = aio;
@@ -946,11 +933,8 @@ static void
tcptran_ep_accept(void *arg, nni_aio *aio)
{
tcptran_ep *ep = arg;
- int rv;
- if (nni_aio_begin(aio) != 0) {
- return;
- }
+ nni_aio_reset(aio);
nni_mtx_lock(&ep->mtx);
if (ep->closed) {
nni_mtx_unlock(&ep->mtx);
@@ -962,9 +946,8 @@ tcptran_ep_accept(void *arg, nni_aio *aio)
nni_aio_finish_error(aio, NNG_EBUSY);
return;
}
- if ((rv = nni_aio_schedule(aio, tcptran_ep_cancel, ep)) != 0) {
+ if (!nni_aio_start(aio, tcptran_ep_cancel, ep)) {
nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
ep->useraio = aio;