aboutsummaryrefslogtreecommitdiff
path: root/src/transport/tcp
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2019-02-18 20:15:16 -0800
committerGarrett D'Amore <garrett@damore.org>2019-02-23 11:50:17 -0800
commit64e784237d143aa032311942bc44abd22e1e4114 (patch)
treee3ea529d5e5adfd022773ab207622cc2247f057c /src/transport/tcp
parentd210ef96517c1462bc058c95bced8c27b5e19c4f (diff)
downloadnng-64e784237d143aa032311942bc44abd22e1e4114.tar.gz
nng-64e784237d143aa032311942bc44abd22e1e4114.tar.bz2
nng-64e784237d143aa032311942bc44abd22e1e4114.zip
fixes #848 server hang waiting for client handshake
fixes #698 Need TCP stats fixes #699 Need IPC stats fixes #701 Need TLS stats This commit addresses a problem when negotiating using one of the stream based negotiation APIs -- a slow or misbehaving peer can prevent well behaved ones from establishing a connection. The fix is a fairly significant change in how these transports link up, and it does rely on the fact that the socket only has a single accept() or connect() pending at a time (on a given endpoint that is). While here, we have completely revamped the way transport statistics are done, offering a standard API for collecting these statistics. Unfortunately, this completely borks the statistics for inproc. As we are planning to change the way inproc works soon, in order to provide more control and work on performance fixes for the message queue, we feel this is an acceptable trade-off. Furthermore, almost nobody uses inproc for anything, and even fewer people are making use of the statistics at this time.
Diffstat (limited to 'src/transport/tcp')
-rw-r--r--src/transport/tcp/tcp.c444
1 files changed, 312 insertions, 132 deletions
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index f8b1ce54..7748cd8f 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -46,7 +46,6 @@ struct tcptran_pipe {
nni_aio * txaio;
nni_aio * rxaio;
nni_aio * negoaio;
- nni_aio * connaio;
nni_msg * rxmsg;
nni_mtx mtx;
};
@@ -57,22 +56,30 @@ struct tcptran_ep {
uint16_t proto;
size_t rcvmax;
bool fini;
+ bool started;
+ bool closed;
nng_url * url;
const char * host; // for dialers
nng_sockaddr src;
- nni_list pipes;
+ int refcnt; // active pipes
+ nni_aio * useraio;
+ nni_aio * connaio;
+ nni_aio * timeaio;
+ nni_list busypipes; // busy pipes -- ones passed to socket
+ nni_list waitpipes; // pipes waiting to match to socket
+ nni_list negopipes; // pipes busy negotiating
nni_reap_item reap;
nng_stream_dialer * dialer;
nng_stream_listener *listener;
nni_dialer * ndialer;
nni_listener * nlistener;
+ nni_stat_item st_rcvmaxsz;
};
static void tcptran_pipe_send_start(tcptran_pipe *);
static void tcptran_pipe_recv_start(tcptran_pipe *);
static void tcptran_pipe_send_cb(void *);
static void tcptran_pipe_recv_cb(void *);
-static void tcptran_pipe_conn_cb(void *);
static void tcptran_pipe_nego_cb(void *);
static void tcptran_ep_fini(void *);
@@ -99,7 +106,6 @@ tcptran_pipe_close(void *arg)
nni_aio_close(p->rxaio);
nni_aio_close(p->txaio);
nni_aio_close(p->negoaio);
- nni_aio_close(p->connaio);
nng_stream_close(p->conn);
}
@@ -112,7 +118,6 @@ tcptran_pipe_stop(void *arg)
nni_aio_stop(p->rxaio);
nni_aio_stop(p->txaio);
nni_aio_stop(p->negoaio);
- nni_aio_stop(p->connaio);
}
static int
@@ -120,6 +125,7 @@ tcptran_pipe_init(void *arg, nni_pipe *npipe)
{
tcptran_pipe *p = arg;
p->npipe = npipe;
+
return (0);
}
@@ -132,8 +138,9 @@ tcptran_pipe_fini(void *arg)
tcptran_pipe_stop(p);
if ((ep = p->ep) != NULL) {
nni_mtx_lock(&ep->mtx);
- nni_list_remove(&ep->pipes, p);
- if (ep->fini && nni_list_empty(&ep->pipes)) {
+ nni_list_node_remove(&p->node);
+ ep->refcnt--;
+ if (ep->fini && (ep->refcnt == 0)) {
nni_reap(&ep->reap, tcptran_ep_fini, ep);
}
nni_mtx_unlock(&ep->mtx);
@@ -142,7 +149,6 @@ tcptran_pipe_fini(void *arg)
nni_aio_fini(p->rxaio);
nni_aio_fini(p->txaio);
nni_aio_fini(p->negoaio);
- nni_aio_fini(p->connaio);
nng_stream_free(p->conn);
nni_msg_free(p->rxmsg);
nni_mtx_fini(&p->mtx);
@@ -161,7 +167,7 @@ tcptran_pipe_reap(tcptran_pipe *p)
}
static int
-tcptran_pipe_alloc(tcptran_pipe **pipep, tcptran_ep *ep)
+tcptran_pipe_alloc(tcptran_pipe **pipep)
{
tcptran_pipe *p;
int rv;
@@ -172,7 +178,6 @@ tcptran_pipe_alloc(tcptran_pipe **pipep, tcptran_ep *ep)
nni_mtx_init(&p->mtx);
if (((rv = nni_aio_init(&p->txaio, tcptran_pipe_send_cb, p)) != 0) ||
((rv = nni_aio_init(&p->rxaio, tcptran_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->connaio, tcptran_pipe_conn_cb, p)) != 0) ||
((rv = nni_aio_init(&p->negoaio, tcptran_pipe_nego_cb, p)) != 0)) {
tcptran_pipe_fini(p);
return (rv);
@@ -180,77 +185,28 @@ tcptran_pipe_alloc(tcptran_pipe **pipep, tcptran_ep *ep)
nni_aio_list_init(&p->recvq);
nni_aio_list_init(&p->sendq);
nni_atomic_flag_reset(&p->reaped);
- nni_list_append(&ep->pipes, p);
- p->rcvmax = ep->rcvmax;
- p->proto = ep->proto;
- p->ep = ep;
- *pipep = p;
+ *pipep = p;
return (0);
}
static void
-tcptran_pipe_conn_cancel(nni_aio *aio, void *arg, int rv)
-{
- tcptran_pipe *p = arg;
-
- nni_mtx_lock(&p->ep->mtx);
- if (aio == p->useraio) {
- nni_aio_close(p->negoaio);
- nni_aio_close(p->connaio);
- p->useraio = NULL;
- nni_aio_finish_error(aio, rv);
- tcptran_pipe_reap(p);
- }
- nni_mtx_unlock(&p->ep->mtx);
-}
-
-static void
-tcptran_pipe_conn_cb(void *arg)
+tcptran_ep_match(tcptran_ep *ep)
{
- tcptran_pipe *p = arg;
- tcptran_ep * ep = p->ep;
- nni_aio * aio = p->connaio;
- nni_aio * uaio;
- nni_iov iov;
- int rv;
-
- nni_mtx_lock(&ep->mtx);
- uaio = p->useraio;
- if ((rv = nni_aio_result(aio)) == 0) {
- p->conn = nni_aio_get_output(aio, 0);
- }
-
- if ((uaio = p->useraio) == NULL) {
- nni_mtx_unlock(&ep->mtx);
- tcptran_pipe_reap(p);
- return;
- }
+ nni_aio * aio;
+ tcptran_pipe *p;
- if (rv != 0) {
- p->useraio = NULL;
- nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(uaio, rv);
- tcptran_pipe_reap(p);
+ if (((aio = ep->useraio) == NULL) ||
+ ((p = nni_list_first(&ep->waitpipes)) == NULL)) {
return;
}
- p->txlen[0] = 0;
- p->txlen[1] = 'S';
- p->txlen[2] = 'P';
- p->txlen[3] = 0;
- NNI_PUT16(&p->txlen[4], p->proto);
- NNI_PUT16(&p->txlen[6], 0);
-
- p->gotrxhead = 0;
- p->gottxhead = 0;
- p->wantrxhead = 8;
- p->wanttxhead = 8;
- iov.iov_len = 8;
- iov.iov_buf = &p->txlen[0];
- nni_aio_set_iov(p->negoaio, 1, &iov);
- nng_stream_send(p->conn, p->negoaio);
- nni_mtx_unlock(&ep->mtx);
+ nni_list_remove(&ep->waitpipes, p);
+ nni_list_append(&ep->busypipes, p);
+ ep->useraio = NULL;
+ p->rcvmax = ep->rcvmax;
+ nni_aio_set_output(aio, 0, p);
+ nni_aio_finish(aio, 0, 0);
}
static void
@@ -263,11 +219,6 @@ tcptran_pipe_nego_cb(void *arg)
int rv;
nni_mtx_lock(&ep->mtx);
- if ((uaio = p->useraio) == NULL) {
- nni_mtx_unlock(&ep->mtx);
- tcptran_pipe_reap(p);
- return;
- }
if ((rv = nni_aio_result(aio)) != 0) {
goto error;
@@ -309,18 +260,33 @@ tcptran_pipe_nego_cb(void *arg)
}
NNI_GET16(&p->rxlen[4], p->peer);
- p->useraio = NULL;
+ // We are all ready now. We put this in the wait list, and
+ // then try to run the matcher.
+ nni_list_remove(&ep->negopipes, p);
+ nni_list_append(&ep->waitpipes, p);
+
+ tcptran_ep_match(ep);
nni_mtx_unlock(&ep->mtx);
- nni_aio_set_output(uaio, 0, p);
- nni_aio_finish(uaio, 0, 0);
return;
error:
- p->useraio = NULL;
+ if (ep->ndialer != NULL) {
+ nni_dialer_bump_error(ep->ndialer, rv);
+ } else {
+ nni_listener_bump_error(ep->nlistener, rv);
+ }
+
+ nng_stream_close(p->conn);
+
+ // If we are waiting to negotiate on a client side, then a failure
+ // here has to be passed to the user app.
+ if ((ep->dialer != NULL) && ((uaio = ep->useraio) != NULL)) {
+ ep->useraio = NULL;
+ nni_aio_finish_error(uaio, rv);
+ }
nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(uaio, rv);
tcptran_pipe_reap(p);
}
@@ -338,6 +304,7 @@ tcptran_pipe_send_cb(void *arg)
aio = nni_list_first(&p->sendq);
if ((rv = nni_aio_result(txaio)) != 0) {
+ nni_pipe_bump_error(p->npipe, rv);
// Intentionally we do not queue up another transfer.
// There's an excellent chance that the pipe is no longer
// usable, with a partial transfer.
@@ -360,10 +327,11 @@ tcptran_pipe_send_cb(void *arg)
nni_aio_list_remove(aio);
tcptran_pipe_send_start(p);
- nni_mtx_unlock(&p->mtx);
-
msg = nni_aio_get_msg(aio);
n = nni_msg_len(msg);
+ nni_pipe_bump_tx(p->npipe, n);
+ nni_mtx_unlock(&p->mtx);
+
nni_aio_set_msg(aio, NULL);
nni_msg_free(msg);
nni_aio_finish_synch(aio, 0, n);
@@ -431,17 +399,21 @@ tcptran_pipe_recv_cb(void *arg)
nni_aio_list_remove(aio);
msg = p->rxmsg;
p->rxmsg = NULL;
+ n = nni_msg_len(msg);
+
+ nni_pipe_bump_rx(p->npipe, n);
tcptran_pipe_recv_start(p);
nni_mtx_unlock(&p->mtx);
nni_aio_set_msg(aio, msg);
- nni_aio_finish_synch(aio, 0, nni_msg_len(msg));
+ nni_aio_finish_synch(aio, 0, n);
return;
recv_error:
nni_aio_list_remove(aio);
msg = p->rxmsg;
p->rxmsg = NULL;
+ nni_pipe_bump_error(p->npipe, rv);
// Intentionally, we do not queue up another receive.
// The protocol should notice this error and close the pipe.
nni_mtx_unlock(&p->mtx);
@@ -634,19 +606,55 @@ tcptran_pipe_getopt(
}
static void
+tcptran_pipe_start(tcptran_pipe *p, nng_stream *conn, tcptran_ep *ep)
+{
+ nni_iov iov;
+
+ ep->refcnt++;
+
+ p->conn = conn;
+ p->ep = ep;
+ p->proto = ep->proto;
+
+ p->txlen[0] = 0;
+ p->txlen[1] = 'S';
+ p->txlen[2] = 'P';
+ p->txlen[3] = 0;
+ NNI_PUT16(&p->txlen[4], p->proto);
+ NNI_PUT16(&p->txlen[6], 0);
+
+ p->gotrxhead = 0;
+ p->gottxhead = 0;
+ p->wantrxhead = 8;
+ p->wanttxhead = 8;
+ iov.iov_len = 8;
+ iov.iov_buf = &p->txlen[0];
+ nni_aio_set_iov(p->negoaio, 1, &iov);
+ nni_list_append(&ep->negopipes, p);
+
+ nni_aio_set_timeout(p->negoaio, 10000); // 10 sec timeout to negotiate
+ nng_stream_send(p->conn, p->negoaio);
+}
+
+static void
tcptran_ep_fini(void *arg)
{
tcptran_ep *ep = arg;
nni_mtx_lock(&ep->mtx);
ep->fini = true;
- if (!nni_list_empty(&ep->pipes)) {
+ if (ep->refcnt != 0) {
nni_mtx_unlock(&ep->mtx);
return;
}
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_stop(ep->timeaio);
+ nni_aio_stop(ep->connaio);
nng_stream_dialer_free(ep->dialer);
nng_stream_listener_free(ep->listener);
- nni_mtx_unlock(&ep->mtx);
+ nni_aio_fini(ep->timeaio);
+ nni_aio_fini(ep->connaio);
+
nni_mtx_fini(&ep->mtx);
NNI_FREE_STRUCT(ep);
}
@@ -658,21 +666,29 @@ tcptran_ep_close(void *arg)
tcptran_pipe *p;
nni_mtx_lock(&ep->mtx);
- NNI_LIST_FOREACH (&ep->pipes, p) {
- nni_aio_close(p->negoaio);
- nni_aio_close(p->connaio);
- nni_aio_close(p->txaio);
- nni_aio_close(p->rxaio);
- if (p->conn != NULL) {
- nng_stream_close(p->conn);
- }
- }
+
+ ep->closed = true;
+ nni_aio_close(ep->timeaio);
if (ep->dialer != NULL) {
nng_stream_dialer_close(ep->dialer);
}
if (ep->listener != NULL) {
nng_stream_listener_close(ep->listener);
}
+ NNI_LIST_FOREACH (&ep->negopipes, p) {
+ tcptran_pipe_close(p);
+ }
+ NNI_LIST_FOREACH (&ep->waitpipes, p) {
+ tcptran_pipe_close(p);
+ }
+ NNI_LIST_FOREACH (&ep->busypipes, p) {
+ tcptran_pipe_close(p);
+ }
+ if (ep->useraio != NULL) {
+ nni_aio_finish_error(ep->useraio, NNG_ECLOSED);
+ ep->useraio = NULL;
+ }
+
nni_mtx_unlock(&ep->mtx);
}
@@ -735,6 +751,130 @@ tcptran_url_parse_source(nng_url *url, nng_sockaddr *sa, const nng_url *surl)
return (rv);
}
+static void
+tcptran_timer_cb(void *arg)
+{
+ tcptran_ep *ep = arg;
+ if (nni_aio_result(ep->timeaio) == 0) {
+ nng_stream_listener_accept(ep->listener, ep->connaio);
+ }
+}
+
+static void
+tcptran_accept_cb(void *arg)
+{
+ tcptran_ep * ep = arg;
+ nni_aio * aio = ep->connaio;
+ tcptran_pipe *p;
+ int rv;
+ nng_stream * conn;
+
+ nni_mtx_lock(&ep->mtx);
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ goto error;
+ }
+
+ conn = nni_aio_get_output(aio, 0);
+ if ((rv = tcptran_pipe_alloc(&p)) != 0) {
+ nng_stream_free(conn);
+ goto error;
+ }
+
+ if (ep->closed) {
+ tcptran_pipe_fini(p);
+ nng_stream_free(conn);
+ rv = NNG_ECLOSED;
+ goto error;
+ }
+ tcptran_pipe_start(p, conn, ep);
+ nng_stream_listener_accept(ep->listener, ep->connaio);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+
+error:
+ nni_listener_bump_error(ep->nlistener, rv);
+ switch (rv) {
+
+ case NNG_ENOMEM:
+ nng_sleep_aio(10, ep->timeaio);
+ break;
+
+ default:
+ if (!ep->closed) {
+ nng_stream_listener_accept(ep->listener, ep->connaio);
+ }
+ break;
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+tcptran_dial_cb(void *arg)
+{
+ tcptran_ep * ep = arg;
+ nni_aio * aio = ep->connaio;
+ tcptran_pipe *p;
+ int rv;
+ nng_stream * conn;
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ goto error;
+ }
+
+ conn = nni_aio_get_output(aio, 0);
+ if ((rv = tcptran_pipe_alloc(&p)) != 0) {
+ nng_stream_free(conn);
+ goto error;
+ }
+ nni_mtx_lock(&ep->mtx);
+ if (ep->closed) {
+ tcptran_pipe_fini(p);
+ nng_stream_free(conn);
+ rv = NNG_ECLOSED;
+ } else {
+ tcptran_pipe_start(p, conn, ep);
+ }
+ nni_mtx_unlock(&ep->mtx);
+ return;
+
+error:
+ // Error connecting. We need to pass this straight back
+ // to the user.
+ nni_dialer_bump_error(ep->ndialer, rv);
+ nni_mtx_lock(&ep->mtx);
+ if ((aio = ep->useraio) != NULL) {
+ ep->useraio = NULL;
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&ep->mtx);
+ return;
+}
+
+static int
+tcptran_ep_init(tcptran_ep **epp, nng_url *url, nni_sock *sock)
+{
+ tcptran_ep *ep;
+
+ if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&ep->mtx);
+ NNI_LIST_INIT(&ep->busypipes, tcptran_pipe, node);
+ NNI_LIST_INIT(&ep->waitpipes, tcptran_pipe, node);
+ NNI_LIST_INIT(&ep->negopipes, tcptran_pipe, node);
+
+ ep->proto = nni_sock_proto_id(sock);
+ ep->url = url;
+
+ nni_stat_init(&ep->st_rcvmaxsz, "rcvmaxsz", "maximum receive size");
+ nni_stat_set_type(&ep->st_rcvmaxsz, NNG_STAT_LEVEL);
+ nni_stat_set_unit(&ep->st_rcvmaxsz, NNG_UNIT_BYTES);
+
+ *epp = ep;
+ return (0);
+}
+
static int
tcptran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
{
@@ -758,17 +898,13 @@ tcptran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
return (NNG_EADDRINVAL);
}
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
+ if ((rv = tcptran_ep_init(&ep, url, sock)) != 0) {
+ return (rv);
}
- nni_mtx_init(&ep->mtx);
- NNI_LIST_INIT(&ep->pipes, tcptran_pipe, node);
-
- ep->proto = nni_sock_proto_id(sock);
- ep->url = url;
ep->ndialer = ndialer;
if ((rv != 0) ||
+ ((rv = nni_aio_init(&ep->connaio, tcptran_dial_cb, ep)) != 0) ||
((rv = nng_stream_dialer_alloc_url(&ep->dialer, &myurl)) != 0)) {
tcptran_ep_fini(ep);
return (rv);
@@ -779,9 +915,12 @@ tcptran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
tcptran_ep_fini(ep);
return (rv);
}
+
+ nni_dialer_add_stat(ndialer, &ep->st_rcvmaxsz);
*dp = ep;
return (0);
}
+
static int
tcptran_listener_init(void **lp, nng_url *url, nni_listener *nlistener)
{
@@ -798,48 +937,71 @@ tcptran_listener_init(void **lp, nng_url *url, nni_listener *nlistener)
return (NNG_EADDRINVAL);
}
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
+ if ((rv = tcptran_ep_init(&ep, url, sock)) != 0) {
+ return (rv);
}
- nni_mtx_init(&ep->mtx);
- NNI_LIST_INIT(&ep->pipes, tcptran_pipe, node);
- ep->proto = nni_sock_proto_id(sock);
- ep->url = url;
ep->nlistener = nlistener;
- if ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0) {
+ if (((rv = nni_aio_init(&ep->connaio, tcptran_accept_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->timeaio, tcptran_timer_cb, ep)) != 0) ||
+ ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) {
tcptran_ep_fini(ep);
return (rv);
}
+ nni_listener_add_stat(nlistener, &ep->st_rcvmaxsz);
*lp = ep;
return (0);
}
static void
+tcptran_ep_cancel(nni_aio *aio, void *arg, int rv)
+{
+ tcptran_ep *ep = arg;
+ nni_mtx_lock(&ep->mtx);
+ if (ep->useraio == aio) {
+ ep->useraio = NULL;
+ nni_aio_finish_error(aio, rv);
+ if (ep->ndialer) {
+ nni_dialer_bump_error(ep->ndialer, rv);
+ } else {
+ nni_listener_bump_error(ep->nlistener, rv);
+ }
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
tcptran_ep_connect(void *arg, nni_aio *aio)
{
- tcptran_ep * ep = arg;
- tcptran_pipe *p;
- int rv;
+ tcptran_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = tcptran_pipe_alloc(&p, ep)) != 0) {
+ if (ep->closed) {
nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(aio, rv);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_dialer_bump_error(ep->ndialer, NNG_ECLOSED);
+ return;
+ }
+ if (ep->useraio != NULL) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, NNG_EBUSY);
+ nni_dialer_bump_error(ep->ndialer, NNG_EBUSY);
return;
}
- if ((rv = nni_aio_schedule(aio, tcptran_pipe_conn_cancel, p)) != 0) {
+ if ((rv = nni_aio_schedule(aio, tcptran_ep_cancel, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
+ nni_dialer_bump_error(ep->ndialer, rv);
nni_aio_finish_error(aio, rv);
- tcptran_pipe_reap(p);
return;
}
- p->useraio = aio;
- nng_stream_dialer_dial(ep->dialer, p->connaio);
+ ep->useraio = aio;
+
+ nng_stream_dialer_dial(ep->dialer, ep->connaio);
nni_mtx_unlock(&ep->mtx);
}
@@ -881,14 +1043,20 @@ tcptran_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
tcptran_ep *ep = arg;
size_t val;
int rv;
- if (((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) &&
- (ep != NULL)) {
+ if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) {
tcptran_pipe *p;
nni_mtx_lock(&ep->mtx);
ep->rcvmax = val;
- NNI_LIST_FOREACH (&ep->pipes, p) {
+ NNI_LIST_FOREACH (&ep->waitpipes, p) {
+ p->rcvmax = val;
+ }
+ NNI_LIST_FOREACH (&ep->negopipes, p) {
+ p->rcvmax = val;
+ }
+ NNI_LIST_FOREACH (&ep->busypipes, p) {
p->rcvmax = val;
}
+ nni_stat_set_value(&ep->st_rcvmaxsz, val);
nni_mtx_unlock(&ep->mtx);
}
return (rv);
@@ -902,6 +1070,9 @@ tcptran_ep_bind(void *arg)
nni_mtx_lock(&ep->mtx);
rv = nng_stream_listener_listen(ep->listener);
+ if (rv != 0) {
+ nni_listener_bump_error(ep->nlistener, rv);
+ }
nni_mtx_unlock(&ep->mtx);
return (rv);
@@ -910,29 +1081,38 @@ tcptran_ep_bind(void *arg)
static void
tcptran_ep_accept(void *arg, nni_aio *aio)
{
- tcptran_ep * ep = arg;
- tcptran_pipe *p;
- int rv;
+ tcptran_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = tcptran_pipe_alloc(&p, ep)) != 0) {
+ if (ep->closed) {
nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(aio, rv);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_listener_bump_error(ep->nlistener, NNG_ECLOSED);
+ return;
+ }
+ if (ep->useraio != NULL) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, NNG_EBUSY);
+ nni_listener_bump_error(ep->nlistener, NNG_EBUSY);
return;
}
- if ((rv = nni_aio_schedule(aio, tcptran_pipe_conn_cancel, p)) != 0) {
- nni_list_remove(&ep->pipes, p);
- p->ep = NULL;
+ if ((rv = nni_aio_schedule(aio, tcptran_ep_cancel, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
- tcptran_pipe_reap(p);
+ nni_listener_bump_error(ep->nlistener, rv);
return;
}
- p->useraio = aio;
- nng_stream_listener_accept(ep->listener, p->connaio);
+ ep->useraio = aio;
+ if (!ep->started) {
+ ep->started = true;
+ nng_stream_listener_accept(ep->listener, ep->connaio);
+ } else {
+ tcptran_ep_match(ep);
+ }
nni_mtx_unlock(&ep->mtx);
}