aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2019-02-18 20:15:16 -0800
committerGarrett D'Amore <garrett@damore.org>2019-02-23 11:50:17 -0800
commit64e784237d143aa032311942bc44abd22e1e4114 (patch)
treee3ea529d5e5adfd022773ab207622cc2247f057c /src
parentd210ef96517c1462bc058c95bced8c27b5e19c4f (diff)
downloadnng-64e784237d143aa032311942bc44abd22e1e4114.tar.gz
nng-64e784237d143aa032311942bc44abd22e1e4114.tar.bz2
nng-64e784237d143aa032311942bc44abd22e1e4114.zip
fixes #848 server hang waiting for client handshake
fixes #698 Need TCP stats fixes #699 Need IPC stats fixes #701 Need TLS stats This commit addresses a problem when negotiating using one of the stream based negotiation APIs -- a slow or misbehaving peer can prevent well behaved ones from establishing a connection. The fix is a fairly significant change in how these transports link up, and it does rely on the fact that the socket only has a single accept() or connect() pending at a time (on a given endpoint that is). While here, we have completely revamped the way transport statistics are done, offering a standard API for collecting these statistics. Unfortunately, this completely borks the statistics for inproc. As we are planning to change the way inproc works soon, in order to provide more control and work on performance fixes for the message queue, we feel this is an acceptable trade-off. Furthermore, almost nobody uses inproc for anything, and even fewer people are making use of the statistics at this time.
Diffstat (limited to 'src')
-rw-r--r--src/core/dialer.c76
-rw-r--r--src/core/dialer.h3
-rw-r--r--src/core/listener.c69
-rw-r--r--src/core/listener.h1
-rw-r--r--src/core/pipe.c38
-rw-r--r--src/core/pipe.h8
-rw-r--r--src/core/socket.c30
-rw-r--r--src/core/sockimpl.h25
-rw-r--r--src/supplemental/tcp/tcp.c6
-rw-r--r--src/transport/inproc/inproc.c186
-rw-r--r--src/transport/ipc/ipc.c476
-rw-r--r--src/transport/tcp/tcp.c444
-rw-r--r--src/transport/tls/tls.c441
13 files changed, 1182 insertions, 621 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
index f0589275..ffbef8a7 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2018 Devolutions <info@devolutions.net>
//
@@ -94,31 +94,69 @@ dialer_stats_init(nni_dialer *d)
nni_stat_init_atomic(&st->s_npipes, "npipes", "open pipes");
nni_stat_append(root, &st->s_npipes);
- nni_stat_init_atomic(&st->s_connok, "connok", "connections made");
- nni_stat_append(root, &st->s_connok);
-
nni_stat_init_atomic(
- &st->s_canceled, "canceled", "connections canceled");
- nni_stat_append(root, &st->s_canceled);
+ &st->s_connok, "connect", "connections established");
+ nni_stat_append(root, &st->s_connok);
nni_stat_init_atomic(&st->s_refused, "refused", "connections refused");
nni_stat_append(root, &st->s_refused);
- nni_stat_init_atomic(
- &st->s_timedout, "timedout", "connections timed out");
- nni_stat_append(root, &st->s_timedout);
+ nni_stat_init_atomic(&st->s_discon, "discon", "remote disconnects");
+ nni_stat_append(root, &st->s_discon);
- nni_stat_init_atomic(
- &st->s_othererr, "othererr", "other connection errors");
+ nni_stat_init_atomic(&st->s_canceled, "canceled", "canceled");
+ nni_stat_append(root, &st->s_canceled);
+
+ nni_stat_init_atomic(&st->s_othererr, "othererr", "other errors");
nni_stat_append(root, &st->s_othererr);
- nni_stat_init_atomic(
- &st->s_protorej, "protoreject", "pipes rejected by protocol");
- nni_stat_append(root, &st->s_protorej);
+ nni_stat_init_atomic(&st->s_etimedout, "timedout", "timed out");
+ nni_stat_append(root, &st->s_etimedout);
- nni_stat_init_atomic(
- &st->s_apprej, "appreject", "pipes rejected by application");
- nni_stat_append(root, &st->s_apprej);
+ nni_stat_init_atomic(&st->s_eproto, "protoerr", "protcol errors");
+ nni_stat_append(root, &st->s_eproto);
+
+ nni_stat_init_atomic(&st->s_eauth, "autherr", "auth errors");
+ nni_stat_append(root, &st->s_eauth);
+
+ nni_stat_init_atomic(&st->s_enomem, "nomem", "out of memory");
+ nni_stat_append(root, &st->s_enomem);
+
+ nni_stat_init_atomic(&st->s_reject, "reject", "pipes rejected");
+ nni_stat_append(root, &st->s_reject);
+}
+
+void
+nni_dialer_bump_error(nni_dialer *d, int err)
+{
+ switch (err) {
+ case NNG_ECONNABORTED:
+ case NNG_ECONNRESET:
+ BUMPSTAT(&d->d_stats.s_discon);
+ break;
+ case NNG_ECONNREFUSED:
+ BUMPSTAT(&d->d_stats.s_refused);
+ break;
+ case NNG_ECANCELED:
+ BUMPSTAT(&d->d_stats.s_canceled);
+ break;
+ case NNG_ETIMEDOUT:
+ BUMPSTAT(&d->d_stats.s_etimedout);
+ break;
+ case NNG_EPROTO:
+ BUMPSTAT(&d->d_stats.s_eproto);
+ break;
+ case NNG_EPEERAUTH:
+ case NNG_ECRYPTO:
+ BUMPSTAT(&d->d_stats.s_eauth);
+ break;
+ case NNG_ENOMEM:
+ BUMPSTAT(&d->d_stats.s_enomem);
+ break;
+ default:
+ BUMPSTAT(&d->d_stats.s_othererr);
+ break;
+ }
}
int
@@ -304,10 +342,8 @@ dialer_connect_cb(void *arg)
break;
case NNG_ECLOSED: // No further action.
case NNG_ECANCELED: // No further action.
- BUMPSTAT(&d->d_stats.s_canceled);
break;
case NNG_ECONNREFUSED:
- BUMPSTAT(&d->d_stats.s_refused);
if (uaio == NULL) {
nni_dialer_timer_start(d);
} else {
@@ -316,7 +352,6 @@ dialer_connect_cb(void *arg)
break;
case NNG_ETIMEDOUT:
- BUMPSTAT(&d->d_stats.s_timedout);
if (uaio == NULL) {
nni_dialer_timer_start(d);
} else {
@@ -325,7 +360,6 @@ dialer_connect_cb(void *arg)
break;
default:
- BUMPSTAT(&d->d_stats.s_othererr);
if (uaio == NULL) {
nni_dialer_timer_start(d);
} else {
diff --git a/src/core/dialer.h b/src/core/dialer.h
index af715955..200ad01d 100644
--- a/src/core/dialer.h
+++ b/src/core/dialer.h
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2018 Devolutions <info@devolutions.net>
//
@@ -28,5 +28,6 @@ extern int nni_dialer_setopt(
extern int nni_dialer_getopt(
nni_dialer *, const char *, void *, size_t *, nni_type);
extern void nni_dialer_add_stat(nni_dialer *, nni_stat_item *);
+extern void nni_dialer_bump_error(nni_dialer *, int);
#endif // CORE_DIALER_H
diff --git a/src/core/listener.c b/src/core/listener.c
index df6dab11..58758cf3 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2018 Devolutions <info@devolutions.net>
//
@@ -98,27 +98,59 @@ listener_stats_init(nni_listener *l)
nni_stat_init_atomic(&st->s_accept, "accept", "connections accepted");
nni_stat_append(root, &st->s_accept);
- nni_stat_init_atomic(
- &st->s_aborted, "aborted", "accepts aborted remotely");
- nni_stat_append(root, &st->s_aborted);
+ nni_stat_init_atomic(&st->s_discon, "discon", "remote disconnects");
+ nni_stat_append(root, &st->s_discon);
- nni_stat_init_atomic(&st->s_timedout, "timedout", "accepts timed out");
- nni_stat_append(root, &st->s_timedout);
-
- nni_stat_init_atomic(&st->s_canceled, "canceled", "accepts canceled");
+ nni_stat_init_atomic(&st->s_canceled, "canceled", "canceled");
nni_stat_append(root, &st->s_canceled);
- nni_stat_init_atomic(
- &st->s_othererr, "othererr", "other accept errors");
+ nni_stat_init_atomic(&st->s_othererr, "othererr", "other errors");
nni_stat_append(root, &st->s_othererr);
- nni_stat_init_atomic(
- &st->s_protorej, "protoreject", "pipes rejected by protocol");
- nni_stat_append(root, &st->s_protorej);
+ nni_stat_init_atomic(&st->s_etimedout, "timedout", "timed out");
+ nni_stat_append(root, &st->s_etimedout);
+
+ nni_stat_init_atomic(&st->s_eproto, "protoerr", "protcol errors");
+ nni_stat_append(root, &st->s_eproto);
+
+ nni_stat_init_atomic(&st->s_eauth, "autherr", "auth errors");
+ nni_stat_append(root, &st->s_eauth);
- nni_stat_init_atomic(
- &st->s_apprej, "appreject", "pipes rejected by application");
- nni_stat_append(root, &st->s_apprej);
+ nni_stat_init_atomic(&st->s_enomem, "nomem", "out of memory");
+ nni_stat_append(root, &st->s_enomem);
+
+ nni_stat_init_atomic(&st->s_reject, "reject", "pipes rejected");
+ nni_stat_append(root, &st->s_reject);
+}
+
+void
+nni_listener_bump_error(nni_listener *l, int err)
+{
+ switch (err) {
+ case NNG_ECONNABORTED:
+ case NNG_ECONNRESET:
+ BUMPSTAT(&l->l_stats.s_discon);
+ break;
+ case NNG_ECANCELED:
+ BUMPSTAT(&l->l_stats.s_canceled);
+ break;
+ case NNG_ETIMEDOUT:
+ BUMPSTAT(&l->l_stats.s_etimedout);
+ break;
+ case NNG_EPROTO:
+ BUMPSTAT(&l->l_stats.s_eproto);
+ break;
+ case NNG_EPEERAUTH:
+ case NNG_ECRYPTO:
+ BUMPSTAT(&l->l_stats.s_eauth);
+ break;
+ case NNG_ENOMEM:
+ BUMPSTAT(&l->l_stats.s_enomem);
+ break;
+ default:
+ BUMPSTAT(&l->l_stats.s_othererr);
+ break;
+ }
}
int
@@ -298,24 +330,19 @@ listener_accept_cb(void *arg)
break;
case NNG_ECONNABORTED: // remote condition, no cooldown
case NNG_ECONNRESET: // remote condition, no cooldown
- BUMPSTAT(&l->l_stats.s_aborted);
listener_accept_start(l);
break;
case NNG_ETIMEDOUT:
// No need to sleep since we timed out already.
- BUMPSTAT(&l->l_stats.s_timedout);
listener_accept_start(l);
break;
case NNG_EPEERAUTH: // peer validation failure
- BUMPSTAT(&l->l_stats.s_othererr);
listener_accept_start(l);
break;
case NNG_ECLOSED: // no further action
case NNG_ECANCELED: // no further action
- BUMPSTAT(&l->l_stats.s_canceled);
break;
default:
- BUMPSTAT(&l->l_stats.s_othererr);
// We don't really know why we failed, but we backoff
// here. This is because errors here are probably due
// to system failures (resource exhaustion) and we hope
diff --git a/src/core/listener.h b/src/core/listener.h
index 12974880..ef87a930 100644
--- a/src/core/listener.h
+++ b/src/core/listener.h
@@ -28,5 +28,6 @@ extern int nni_listener_setopt(
extern int nni_listener_getopt(
nni_listener *, const char *, void *, size_t *, nni_type);
extern void nni_listener_add_stat(nni_listener *, nni_stat_item *);
+extern void nni_listener_bump_error(nni_listener *, int);
#endif // CORE_LISTENER_H
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 6ad62fa4..44957def 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2018 Devolutions <info@devolutions.net>
//
@@ -228,6 +228,18 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
nni_stat_init_id(&st->s_sock_id, "socket", "socket for pipe",
nni_sock_id(p->p_sock));
nni_stat_append(&st->s_root, &st->s_sock_id);
+ nni_stat_init_atomic(&st->s_rxmsgs, "rxmsgs", "messages received");
+ nni_stat_set_unit(&st->s_rxmsgs, NNG_UNIT_MESSAGES);
+ nni_stat_append(&st->s_root, &st->s_rxmsgs);
+ nni_stat_init_atomic(&st->s_txmsgs, "txmsgs", "messages sent");
+ nni_stat_set_unit(&st->s_txmsgs, NNG_UNIT_MESSAGES);
+ nni_stat_append(&st->s_root, &st->s_txmsgs);
+ nni_stat_init_atomic(&st->s_rxbytes, "rxbytes", "bytes received");
+ nni_stat_set_unit(&st->s_rxbytes, NNG_UNIT_BYTES);
+ nni_stat_append(&st->s_root, &st->s_rxbytes);
+ nni_stat_init_atomic(&st->s_txbytes, "txbytes", "bytes sent");
+ nni_stat_set_unit(&st->s_txbytes, NNG_UNIT_BYTES);
+ nni_stat_append(&st->s_root, &st->s_txbytes);
if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tdata, p)) != 0) ||
((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) {
@@ -337,3 +349,27 @@ nni_pipe_add_stat(nni_pipe *p, nni_stat_item *item)
{
nni_stat_append(&p->p_stats.s_root, item);
}
+
+void
+nni_pipe_bump_rx(nni_pipe *p, size_t nbytes)
+{
+ nni_stat_inc_atomic(&p->p_stats.s_rxbytes, nbytes);
+ nni_stat_inc_atomic(&p->p_stats.s_rxmsgs, 1);
+}
+
+void
+nni_pipe_bump_tx(nni_pipe *p, size_t nbytes)
+{
+ nni_stat_inc_atomic(&p->p_stats.s_txbytes, nbytes);
+ nni_stat_inc_atomic(&p->p_stats.s_txmsgs, 1);
+}
+
+void
+nni_pipe_bump_error(nni_pipe *p, int err)
+{
+ if (p->p_dialer != NULL) {
+ nni_dialer_bump_error(p->p_dialer, err);
+ } else {
+ nni_listener_bump_error(p->p_listener, err);
+ }
+} \ No newline at end of file
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 5a83059f..20d34e82 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -63,6 +63,10 @@ extern uint32_t nni_pipe_dialer_id(nni_pipe *);
extern void nni_pipe_rele(nni_pipe *);
// nni_pipe_add_stat adds a statistic to the pipe
-extern void nni_pipe_add_stat(nni_pipe *p, nni_stat_item *);
+extern void nni_pipe_add_stat(nni_pipe *, nni_stat_item *);
+
+extern void nni_pipe_bump_rx(nni_pipe *, size_t);
+extern void nni_pipe_bump_tx(nni_pipe *, size_t);
+extern void nni_pipe_bump_error(nni_pipe *, int);
#endif // CORE_PIPE_H
diff --git a/src/core/socket.c b/src/core/socket.c
index 387796a6..dadc9073 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -58,8 +58,7 @@ typedef struct sock_stats {
nni_stat_item s_txbytes; // number of bytes received
nni_stat_item s_rxmsgs; // number of msgs received
nni_stat_item s_txmsgs; // number of msgs sent
- nni_stat_item s_protorej; // pipes rejected by protocol
- nni_stat_item s_apprej; // pipes rejected by application
+ nni_stat_item s_reject; // pipes rejected
} sock_stats;
struct nni_socket {
@@ -465,13 +464,8 @@ sock_stats_init(nni_sock *s)
nni_stat_set_unit(&st->s_txmsgs, NNG_UNIT_MESSAGES);
nni_stat_append(root, &st->s_txmsgs);
- nni_stat_init_atomic(
- &st->s_protorej, "protoreject", "pipes rejected by protocol");
- nni_stat_append(root, &st->s_protorej);
-
- nni_stat_init_atomic(
- &st->s_apprej, "appreject", "pipes rejected by application");
- nni_stat_append(root, &st->s_apprej);
+ nni_stat_init_atomic(&st->s_reject, "reject", "pipes rejected");
+ nni_stat_append(root, &st->s_reject);
#else
NNI_ARG_UNUSED(s);
#endif
@@ -1441,15 +1435,15 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
nni_mtx_lock(&s->s_mx);
if (p->p_closed) {
nni_mtx_unlock(&s->s_mx);
- nni_stat_inc_atomic(&d->d_stats.s_apprej, 1);
- nni_stat_inc_atomic(&s->s_stats.s_apprej, 1);
+ nni_stat_inc_atomic(&d->d_stats.s_reject, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_reject, 1);
nni_pipe_rele(p);
return;
}
if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
nni_mtx_unlock(&s->s_mx);
- nni_stat_inc_atomic(&d->d_stats.s_protorej, 1);
- nni_stat_inc_atomic(&s->s_stats.s_protorej, 1);
+ nni_stat_inc_atomic(&d->d_stats.s_reject, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_reject, 1);
nni_pipe_close(p);
nni_pipe_rele(p);
return;
@@ -1550,15 +1544,15 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
nni_mtx_lock(&s->s_mx);
if (p->p_closed) {
nni_mtx_unlock(&s->s_mx);
- nni_stat_inc_atomic(&l->l_stats.s_apprej, 1);
- nni_stat_inc_atomic(&s->s_stats.s_apprej, 1);
+ nni_stat_inc_atomic(&l->l_stats.s_reject, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_reject, 1);
nni_pipe_rele(p);
return;
}
if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
nni_mtx_unlock(&s->s_mx);
- nni_stat_inc_atomic(&l->l_stats.s_protorej, 1);
- nni_stat_inc_atomic(&s->s_stats.s_protorej, 1);
+ nni_stat_inc_atomic(&l->l_stats.s_reject, 1);
+ nni_stat_inc_atomic(&s->s_stats.s_reject, 1);
nni_pipe_close(p);
nni_pipe_rele(p);
return;
diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h
index 5a9a2589..ffe6c6e8 100644
--- a/src/core/sockimpl.h
+++ b/src/core/sockimpl.h
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -23,11 +23,14 @@ typedef struct nni_dialer_stats {
nni_stat_item s_npipes;
nni_stat_item s_connok;
nni_stat_item s_refused;
+ nni_stat_item s_discon;
nni_stat_item s_canceled;
- nni_stat_item s_timedout;
nni_stat_item s_othererr;
- nni_stat_item s_protorej;
- nni_stat_item s_apprej;
+ nni_stat_item s_etimedout;
+ nni_stat_item s_eproto; // protocol error
+ nni_stat_item s_eauth;
+ nni_stat_item s_enomem;
+ nni_stat_item s_reject;
char s_scope[24]; // scope name for stats
} nni_dialer_stats;
@@ -64,12 +67,14 @@ typedef struct nni_listener_stats {
nni_stat_item s_url;
nni_stat_item s_npipes;
nni_stat_item s_accept;
- nni_stat_item s_aborted; // aborted remotely
- nni_stat_item s_timedout;
+ nni_stat_item s_discon; // aborted remotely
nni_stat_item s_canceled;
nni_stat_item s_othererr;
- nni_stat_item s_protorej;
- nni_stat_item s_apprej;
+ nni_stat_item s_etimedout;
+ nni_stat_item s_eproto; // protocol error
+ nni_stat_item s_eauth;
+ nni_stat_item s_enomem;
+ nni_stat_item s_reject;
char s_scope[24]; // scope name for stats
} nni_listener_stats;
@@ -97,6 +102,10 @@ typedef struct nni_pipe_stats {
nni_stat_item s_id;
nni_stat_item s_ep_id;
nni_stat_item s_sock_id;
+ nni_stat_item s_rxmsgs;
+ nni_stat_item s_txmsgs;
+ nni_stat_item s_rxbytes;
+ nni_stat_item s_txbytes;
char s_scope[16]; // scope name for stats ("pipe" is short)
} nni_pipe_stats;
diff --git a/src/supplemental/tcp/tcp.c b/src/supplemental/tcp/tcp.c
index 21922ef6..78a6d7e0 100644
--- a/src/supplemental/tcp/tcp.c
+++ b/src/supplemental/tcp/tcp.c
@@ -468,6 +468,12 @@ static const nni_chkoption tcp_chkopts[] = {
.o_check = tcp_check_bool,
},
{
+ .o_name = NNG_OPT_TCP_BOUND_PORT,
+ },
+ {
+ .o_name = NNG_OPT_LOCADDR,
+ },
+ {
.o_name = NULL,
},
};
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 40ab9107..6618d794 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2018 Devolutions <info@devolutions.net>
//
@@ -30,24 +30,14 @@ typedef struct {
// inproc_pipe represents one half of a connection.
struct inproc_pipe {
- const char * addr;
- inproc_pair * pair;
- nni_msgq * rq;
- nni_msgq * wq;
- nni_pipe * npipe;
- uint16_t peer;
- uint16_t proto;
- size_t rcvmax;
- nni_stat_item st_rxbytes;
- nni_stat_item st_txbytes;
- nni_stat_item st_rxmsgs;
- nni_stat_item st_txmsgs;
- nni_stat_item st_rxdiscards;
- nni_stat_item st_txdiscards;
- nni_stat_item st_rxerrs;
- nni_stat_item st_txerrs;
- nni_stat_item st_rxoversize;
- nni_stat_item st_rcvmaxsz;
+ const char * addr;
+ inproc_pair *pair;
+ nni_msgq * rq;
+ nni_msgq * wq;
+ nni_pipe * npipe;
+ uint16_t peer;
+ uint16_t proto;
+ size_t rcvmax;
};
// inproc_pair represents a pair of pipes. Because we control both
@@ -135,120 +125,12 @@ inproc_pipe_alloc(inproc_pipe **pipep, inproc_ep *ep)
return (0);
}
-#ifdef NNG_ENABLE_STATS
-static void
-inproc_get_rxbytes(nni_stat_item *st, void *arg)
-{
- nni_msgq *mq = arg;
- nni_stat_set_value(st, nni_msgq_stat_get_bytes(mq));
-}
-
-static void
-inproc_get_rxmsgs(nni_stat_item *st, void *arg)
-{
- nni_msgq *mq = arg;
- nni_stat_set_value(st, nni_msgq_stat_get_msgs(mq));
-}
-
-static void
-inproc_get_txbytes(nni_stat_item *st, void *arg)
-{
- nni_msgq *mq = arg;
- nni_stat_set_value(st, nni_msgq_stat_put_bytes(mq));
-}
-
-static void
-inproc_get_txmsgs(nni_stat_item *st, void *arg)
-{
- nni_msgq *mq = arg;
- nni_stat_set_value(st, nni_msgq_stat_put_msgs(mq));
-}
-
-static void
-inproc_get_discards(nni_stat_item *st, void *arg)
-{
- nni_msgq *mq = arg;
- nni_stat_set_value(st, nni_msgq_stat_discards(mq));
-}
-
-static void
-inproc_get_txerrs(nni_stat_item *st, void *arg)
-{
- nni_msgq *mq = arg;
- nni_stat_set_value(st, nni_msgq_stat_put_errs(mq));
-}
-
-static void
-inproc_get_rxerrs(nni_stat_item *st, void *arg)
-{
- nni_msgq *mq = arg;
- nni_stat_set_value(st, nni_msgq_stat_get_errs(mq));
-}
-#else
-#undef nni_stat_set_update
-#define nni_stat_set_update(p, x, f)
-#endif
-
static int
inproc_pipe_init(void *arg, nni_pipe *p)
{
inproc_pipe *pipe = arg;
pipe->npipe = p;
- nni_stat_init(&pipe->st_rxbytes, "rxbytes", "bytes received (raw)");
- nni_stat_set_update(&pipe->st_rxbytes, inproc_get_rxbytes, pipe->rq);
- nni_stat_set_unit(&pipe->st_rxbytes, NNG_UNIT_BYTES);
- nni_pipe_add_stat(p, &pipe->st_rxbytes);
-
- nni_stat_init(&pipe->st_txbytes, "txbytes", "bytes sent (raw)");
- nni_stat_set_update(&pipe->st_txbytes, inproc_get_txbytes, pipe->wq);
- nni_stat_set_unit(&pipe->st_txbytes, NNG_UNIT_BYTES);
- nni_pipe_add_stat(p, &pipe->st_txbytes);
-
- nni_stat_init(&pipe->st_rxmsgs, "rxmsgs", "msgs received");
- nni_stat_set_update(&pipe->st_rxmsgs, inproc_get_rxmsgs, pipe->rq);
- nni_stat_set_unit(&pipe->st_rxmsgs, NNG_UNIT_MESSAGES);
- nni_pipe_add_stat(p, &pipe->st_rxmsgs);
-
- nni_stat_init(&pipe->st_txmsgs, "txmsgs", "msgs sent");
- nni_stat_set_update(&pipe->st_txmsgs, inproc_get_txmsgs, pipe->wq);
- nni_stat_set_unit(&pipe->st_txmsgs, NNG_UNIT_MESSAGES);
- nni_pipe_add_stat(p, &pipe->st_txmsgs);
-
- nni_stat_init(
- &pipe->st_rxdiscards, "rxdiscards", "receives discarded");
- nni_stat_set_update(
- &pipe->st_rxdiscards, inproc_get_discards, pipe->rq);
- nni_stat_set_unit(&pipe->st_rxdiscards, NNG_UNIT_MESSAGES);
- nni_pipe_add_stat(p, &pipe->st_rxdiscards);
-
- nni_stat_init(&pipe->st_txdiscards, "txdiscards", "sends discarded");
- nni_stat_set_update(
- &pipe->st_txdiscards, inproc_get_discards, pipe->wq);
- nni_stat_set_unit(&pipe->st_txdiscards, NNG_UNIT_MESSAGES);
- nni_pipe_add_stat(p, &pipe->st_txdiscards);
-
- nni_stat_init(&pipe->st_rxerrs, "rxerrs", "receive errors");
- nni_stat_set_update(&pipe->st_rxerrs, inproc_get_rxerrs, pipe->rq);
- nni_stat_set_unit(&pipe->st_rxerrs, NNG_UNIT_MESSAGES);
- nni_pipe_add_stat(p, &pipe->st_rxerrs);
-
- nni_stat_init(&pipe->st_txerrs, "txerrs", "send errors");
- nni_stat_set_update(&pipe->st_txerrs, inproc_get_txerrs, pipe->wq);
- nni_stat_set_unit(&pipe->st_txerrs, NNG_UNIT_MESSAGES);
- nni_pipe_add_stat(p, &pipe->st_txerrs);
-
- nni_stat_init_atomic(&pipe->st_rxoversize, "rxoversize",
- "oversize msgs received (dropped)");
- nni_stat_set_unit(&pipe->st_rxoversize, NNG_UNIT_MESSAGES);
- nni_pipe_add_stat(p, &pipe->st_rxoversize);
-
- nni_stat_init(&pipe->st_rcvmaxsz, "rcvmaxsz", "maximum receive size");
- nni_stat_set_type(&pipe->st_rcvmaxsz, NNG_UNIT_BYTES);
- nni_stat_set_unit(&pipe->st_rcvmaxsz, NNG_UNIT_BYTES);
- nni_stat_set_value(&pipe->st_rcvmaxsz, pipe->rcvmax);
- nni_pipe_add_stat(p, &pipe->st_rcvmaxsz);
-
return (0);
}
@@ -409,6 +291,11 @@ inproc_conn_finish(nni_aio *aio, int rv, inproc_ep *ep, inproc_pipe *pipe)
nni_aio_set_output(aio, 0, pipe);
nni_aio_finish(aio, 0, 0);
} else {
+ if (ep->ndialer != NULL) {
+ nni_dialer_bump_error(ep->ndialer, rv);
+ } else {
+ nni_listener_bump_error(ep->nlistener, rv);
+ }
NNI_ASSERT(pipe == NULL);
nni_aio_finish_error(aio, rv);
}
@@ -419,7 +306,7 @@ inproc_filter(void *arg, nni_msg *msg)
{
inproc_pipe *p = arg;
if (p->rcvmax && (nni_msg_len(msg) > p->rcvmax)) {
- nni_stat_inc_atomic(&p->st_rxoversize, 1);
+ nni_pipe_bump_error(p->npipe, NNG_EMSGSIZE);
nni_msg_free(msg);
return (NULL);
}
@@ -535,6 +422,11 @@ inproc_ep_cancel(nni_aio *aio, void *arg, int rv)
nni_list_node_remove(&ep->node);
nni_aio_finish_error(aio, rv);
}
+ if (ep->ndialer != NULL) {
+ nni_dialer_bump_error(ep->ndialer, rv);
+ } else {
+ nni_listener_bump_error(ep->nlistener, rv);
+ }
nni_mtx_unlock(&nni_inproc.mx);
}
@@ -559,6 +451,7 @@ inproc_ep_connect(void *arg, nni_aio *aio)
}
if (server == NULL) {
nni_mtx_unlock(&nni_inproc.mx);
+ nni_dialer_bump_error(ep->ndialer, NNG_ECONNREFUSED);
nni_aio_finish_error(aio, NNG_ECONNREFUSED);
return;
}
@@ -568,6 +461,7 @@ inproc_ep_connect(void *arg, nni_aio *aio)
// that in the upper API.
if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) {
nni_mtx_unlock(&nni_inproc.mx);
+ nni_dialer_bump_error(ep->ndialer, rv);
nni_aio_finish_error(aio, rv);
return;
}
@@ -590,6 +484,7 @@ inproc_ep_bind(void *arg)
NNI_LIST_FOREACH (list, srch) {
if (strcmp(srch->addr, ep->addr) == 0) {
nni_mtx_unlock(&nni_inproc.mx);
+ nni_listener_bump_error(ep->nlistener, NNG_EADDRINUSE);
return (NNG_EADDRINUSE);
}
}
@@ -614,6 +509,7 @@ inproc_ep_accept(void *arg, nni_aio *aio)
// accept was tried -- there is no API to do such a thing.
if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) {
nni_mtx_unlock(&nni_inproc.mx);
+ nni_listener_bump_error(ep->nlistener, rv);
nni_aio_finish_error(aio, rv);
return;
}
@@ -642,8 +538,7 @@ inproc_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
inproc_ep *ep = arg;
size_t val;
int rv;
- if (((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) &&
- (ep != NULL)) {
+ if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) {
nni_mtx_lock(&ep->mtx);
ep->rcvmax = val;
nni_stat_set_value(&ep->st_rcvmaxsz, val);
@@ -728,6 +623,36 @@ inproc_ep_setopt(
return (nni_setopt(inproc_ep_options, name, arg, v, sz, t));
}
+static int
+inproc_check_recvmaxsz(const void *v, size_t sz, nni_type t)
+{
+ return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t));
+}
+
+static nni_chkoption inproc_checkopts[] = {
+ {
+ .o_name = NNG_OPT_RECVMAXSZ,
+ .o_check = inproc_check_recvmaxsz,
+ },
+ {
+ .o_name = NNG_OPT_LOCADDR,
+ },
+ {
+ .o_name = NNG_OPT_REMADDR,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+static int
+inproc_checkopt(const char *name, const void *buf, size_t sz, nni_type t)
+{
+ int rv;
+ rv = nni_chkopt(inproc_checkopts, name, buf, sz, t);
+ return (rv);
+}
+
static nni_tran_dialer_ops inproc_dialer_ops = {
.d_init = inproc_dialer_init,
.d_fini = inproc_ep_fini,
@@ -757,6 +682,7 @@ struct nni_tran nni_inproc_tran = {
.tran_pipe = &inproc_pipe_ops,
.tran_init = inproc_init,
.tran_fini = inproc_fini,
+ .tran_checkopt = inproc_checkopt,
};
int
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index e0d83be0..0d8f12ae 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -38,23 +38,20 @@ struct ipctran_pipe {
nni_list_node node;
nni_atomic_flag reaped;
nni_reap_item reap;
-
- uint8_t txhead[1 + sizeof(uint64_t)];
- uint8_t rxhead[1 + sizeof(uint64_t)];
- size_t gottxhead;
- size_t gotrxhead;
- size_t wanttxhead;
- size_t wantrxhead;
-
- nni_list recvq;
- nni_list sendq;
- nni_aio *useraio;
- nni_aio *txaio;
- nni_aio *rxaio;
- nni_aio *negoaio;
- nni_aio *connaio;
- nni_msg *rxmsg;
- nni_mtx mtx;
+ uint8_t txhead[1 + sizeof(uint64_t)];
+ uint8_t rxhead[1 + sizeof(uint64_t)];
+ size_t gottxhead;
+ size_t gotrxhead;
+ size_t wanttxhead;
+ size_t wantrxhead;
+ nni_list recvq;
+ nni_list sendq;
+ nni_aio * useraio;
+ nni_aio * txaio;
+ nni_aio * rxaio;
+ nni_aio * negoaio;
+ nni_msg * rxmsg;
+ nni_mtx mtx;
};
struct ipctran_ep {
@@ -62,13 +59,22 @@ struct ipctran_ep {
nni_sockaddr sa;
size_t rcvmax;
uint16_t proto;
- nni_list pipes;
+ bool started;
+ bool closed;
bool fini;
+ int refcnt;
nng_stream_dialer * dialer;
nng_stream_listener *listener;
+ nni_aio * useraio;
+ nni_aio * connaio;
+ nni_aio * timeaio;
+ nni_list busypipes; // busy pipes -- ones passed to socket
+ nni_list waitpipes; // pipes waiting to match to socket
+ nni_list negopipes; // pipes busy negotiating
nni_reap_item reap;
nni_dialer * ndialer;
nni_listener * nlistener;
+ nni_stat_item st_rcvmaxsz;
};
static void ipctran_pipe_send_start(ipctran_pipe *);
@@ -76,7 +82,6 @@ static void ipctran_pipe_recv_start(ipctran_pipe *);
static void ipctran_pipe_send_cb(void *);
static void ipctran_pipe_recv_cb(void *);
static void ipctran_pipe_nego_cb(void *);
-static void ipctran_pipe_conn_cb(void *);
static void ipctran_ep_fini(void *);
static int
@@ -102,7 +107,6 @@ ipctran_pipe_close(void *arg)
nni_aio_close(p->rxaio);
nni_aio_close(p->txaio);
nni_aio_close(p->negoaio);
- nni_aio_close(p->connaio);
nng_stream_close(p->conn);
}
@@ -115,7 +119,6 @@ ipctran_pipe_stop(void *arg)
nni_aio_stop(p->rxaio);
nni_aio_stop(p->txaio);
nni_aio_stop(p->negoaio);
- nni_aio_stop(p->connaio);
}
static int
@@ -135,8 +138,9 @@ ipctran_pipe_fini(void *arg)
ipctran_pipe_stop(p);
if ((ep = p->ep) != NULL) {
nni_mtx_lock(&ep->mtx);
- nni_list_remove(&ep->pipes, p);
- if (ep->fini && nni_list_empty(&ep->pipes)) {
+ nni_list_node_remove(&p->node);
+ ep->refcnt--;
+ if (ep->fini && (ep->refcnt == 0)) {
nni_reap(&ep->reap, ipctran_ep_fini, ep);
}
nni_mtx_unlock(&ep->mtx);
@@ -144,7 +148,6 @@ ipctran_pipe_fini(void *arg)
nni_aio_fini(p->rxaio);
nni_aio_fini(p->txaio);
nni_aio_fini(p->negoaio);
- nni_aio_fini(p->connaio);
nng_stream_free(p->conn);
if (p->rxmsg) {
nni_msg_free(p->rxmsg);
@@ -165,7 +168,7 @@ ipctran_pipe_reap(ipctran_pipe *p)
}
static int
-ipctran_pipe_alloc(ipctran_pipe **pipep, ipctran_ep *ep)
+ipctran_pipe_alloc(ipctran_pipe **pipep)
{
ipctran_pipe *p;
int rv;
@@ -176,7 +179,6 @@ ipctran_pipe_alloc(ipctran_pipe **pipep, ipctran_ep *ep)
nni_mtx_init(&p->mtx);
if (((rv = nni_aio_init(&p->txaio, ipctran_pipe_send_cb, p)) != 0) ||
((rv = nni_aio_init(&p->rxaio, ipctran_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->connaio, ipctran_pipe_conn_cb, p)) != 0) ||
((rv = nni_aio_init(&p->negoaio, ipctran_pipe_nego_cb, p)) != 0)) {
ipctran_pipe_fini(p);
return (rv);
@@ -184,81 +186,38 @@ ipctran_pipe_alloc(ipctran_pipe **pipep, ipctran_ep *ep)
nni_aio_list_init(&p->sendq);
nni_aio_list_init(&p->recvq);
nni_atomic_flag_reset(&p->reaped);
- nni_list_append(&ep->pipes, p);
-
- // 5 seconds each for connection and negotiation; should be more than
- // sufficient.
- nni_aio_set_timeout(p->connaio, 5000);
- nni_aio_set_timeout(p->negoaio, 5000);
-
- p->proto = ep->proto;
- p->rcvmax = ep->rcvmax;
- p->ep = ep;
-
*pipep = p;
return (0);
}
static void
-ipctran_pipe_conn_cb(void *arg)
+ipctran_ep_match(ipctran_ep *ep)
{
- ipctran_pipe *p = arg;
- ipctran_ep * ep = p->ep;
- nni_aio * aio = p->connaio;
- nni_aio * uaio;
- nni_iov iov;
- int rv;
+ nni_aio * aio;
+ ipctran_pipe *p;
- nni_mtx_lock(&ep->mtx);
- uaio = p->useraio;
- if ((rv = nni_aio_result(aio)) == 0) {
- p->conn = nni_aio_get_output(aio, 0);
- }
- if (uaio == NULL) {
- nni_mtx_unlock(&ep->mtx);
- ipctran_pipe_reap(p);
+ if (((aio = ep->useraio) == NULL) ||
+ ((p = nni_list_first(&ep->waitpipes)) == NULL)) {
return;
}
- if (rv != 0) {
- p->useraio = NULL;
- nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(uaio, rv);
- ipctran_pipe_reap(p);
- return;
- }
- p->conn = nni_aio_get_output(aio, 0);
- p->txhead[0] = 0;
- p->txhead[1] = 'S';
- p->txhead[2] = 'P';
- p->txhead[3] = 0;
- NNI_PUT16(&p->txhead[4], p->proto);
- NNI_PUT16(&p->txhead[6], 0);
-
- p->gotrxhead = 0;
- p->gottxhead = 0;
- p->wantrxhead = 8;
- p->wanttxhead = 8;
- iov.iov_len = 8;
- iov.iov_buf = &p->txhead[0];
- nni_aio_set_iov(p->negoaio, 1, &iov);
- nng_stream_send(p->conn, p->negoaio);
- nni_mtx_unlock(&ep->mtx);
+ nni_list_remove(&ep->waitpipes, p);
+ nni_list_append(&ep->busypipes, p);
+ ep->useraio = NULL;
+ p->rcvmax = ep->rcvmax;
+ nni_aio_set_output(aio, 0, p);
+ nni_aio_finish(aio, 0, 0);
}
static void
ipctran_pipe_nego_cb(void *arg)
{
ipctran_pipe *p = arg;
+ ipctran_ep * ep = p->ep;
nni_aio * aio = p->negoaio;
nni_aio * uaio;
int rv;
- nni_mtx_lock(&p->ep->mtx);
- if ((uaio = p->useraio) == NULL) {
- nni_mtx_unlock(&p->ep->mtx);
- ipctran_pipe_reap(p);
- return;
- }
+ nni_mtx_lock(&ep->mtx);
if ((rv = nni_aio_result(aio)) != 0) {
goto error;
}
@@ -298,16 +257,33 @@ ipctran_pipe_nego_cb(void *arg)
}
NNI_GET16(&p->rxhead[4], p->peer);
- p->useraio = NULL;
- nni_mtx_unlock(&p->ep->mtx);
- nni_aio_set_output(uaio, 0, p);
- nni_aio_finish(uaio, 0, 0);
+
+ // We are all ready now. We put this in the wait list, and
+ // then try to run the matcher.
+ nni_list_remove(&ep->negopipes, p);
+ nni_list_append(&ep->waitpipes, p);
+
+ ipctran_ep_match(ep);
+ nni_mtx_unlock(&ep->mtx);
return;
error:
p->useraio = NULL;
- nni_mtx_unlock(&p->ep->mtx);
- nni_aio_finish_error(uaio, rv);
+
+ if (ep->ndialer != NULL) {
+ nni_dialer_bump_error(ep->ndialer, rv);
+ } else {
+ nni_listener_bump_error(ep->nlistener, rv);
+ }
+
+ nng_stream_close(p->conn);
+ // If we are waiting to negotiate on a client side, then a failure
+ // here has to be passed to the user app.
+ if ((ep->dialer != NULL) && ((uaio = ep->useraio) != NULL)) {
+ ep->useraio = NULL;
+ nni_aio_finish_error(uaio, rv);
+ }
+ nni_mtx_unlock(&ep->mtx);
ipctran_pipe_reap(p);
}
@@ -323,6 +299,7 @@ ipctran_pipe_send_cb(void *arg)
nni_mtx_lock(&p->mtx);
if ((rv = nni_aio_result(txaio)) != 0) {
+ nni_pipe_bump_error(p->npipe, rv);
// Intentionally we do not queue up another transfer.
// There's an excellent chance that the pipe is no longer
// usable, with a partial transfer.
@@ -349,10 +326,11 @@ ipctran_pipe_send_cb(void *arg)
nni_aio_list_remove(aio);
ipctran_pipe_send_start(p);
- nni_mtx_unlock(&p->mtx);
-
msg = nni_aio_get_msg(aio);
n = nni_msg_len(msg);
+ nni_pipe_bump_tx(p->npipe, n);
+ nni_mtx_unlock(&p->mtx);
+
nni_aio_set_msg(aio, NULL);
nni_msg_free(msg);
nni_aio_finish_synch(aio, 0, n);
@@ -439,11 +417,13 @@ ipctran_pipe_recv_cb(void *arg)
nni_aio_list_remove(aio);
msg = p->rxmsg;
p->rxmsg = NULL;
+ n = nni_msg_len(msg);
+ nni_pipe_bump_rx(p->npipe, n);
ipctran_pipe_recv_start(p);
nni_mtx_unlock(&p->mtx);
nni_aio_set_msg(aio, msg);
- nni_aio_finish_synch(aio, 0, nni_msg_len(msg));
+ nni_aio_finish_synch(aio, 0, n);
return;
error:
@@ -453,6 +433,7 @@ error:
}
msg = p->rxmsg;
p->rxmsg = NULL;
+ nni_pipe_bump_error(p->npipe, rv);
// Intentionally, we do not queue up another receive.
// The protocol should notice this error and close the pipe.
nni_mtx_unlock(&p->mtx);
@@ -641,19 +622,65 @@ ipctran_pipe_peer(void *arg)
}
static void
-ipctran_pipe_conn_cancel(nni_aio *aio, void *arg, int rv)
+ipctran_pipe_start(ipctran_pipe *p, nng_stream *conn, ipctran_ep *ep)
{
- ipctran_pipe *p = arg;
+ nni_iov iov;
- nni_mtx_lock(&p->ep->mtx);
- if (aio == p->useraio) {
- nni_aio_close(p->negoaio);
- nni_aio_close(p->connaio);
- p->useraio = NULL;
- nni_aio_finish_error(aio, rv);
- ipctran_pipe_reap(p);
+ ep->refcnt++;
+
+ p->conn = conn;
+ p->ep = ep;
+ p->proto = ep->proto;
+
+ p->txhead[0] = 0;
+ p->txhead[1] = 'S';
+ p->txhead[2] = 'P';
+ p->txhead[3] = 0;
+ NNI_PUT16(&p->txhead[4], p->proto);
+ NNI_PUT16(&p->txhead[6], 0);
+
+ p->gotrxhead = 0;
+ p->gottxhead = 0;
+ p->wantrxhead = 8;
+ p->wanttxhead = 8;
+ iov.iov_len = 8;
+ iov.iov_buf = &p->txhead[0];
+ nni_aio_set_iov(p->negoaio, 1, &iov);
+ nni_list_append(&ep->negopipes, p);
+
+ nni_aio_set_timeout(p->negoaio, 10000); // 10 sec timeout to negotiate
+ nng_stream_send(p->conn, p->negoaio);
+}
+
+static void
+ipctran_ep_close(void *arg)
+{
+ ipctran_ep * ep = arg;
+ ipctran_pipe *p;
+
+ nni_mtx_lock(&ep->mtx);
+ ep->closed = true;
+ nni_aio_close(ep->timeaio);
+ if (ep->dialer != NULL) {
+ nng_stream_dialer_close(ep->dialer);
+ }
+ if (ep->listener != NULL) {
+ nng_stream_listener_close(ep->listener);
+ }
+ NNI_LIST_FOREACH (&ep->negopipes, p) {
+ ipctran_pipe_close(p);
+ }
+ NNI_LIST_FOREACH (&ep->waitpipes, p) {
+ ipctran_pipe_close(p);
+ }
+ NNI_LIST_FOREACH (&ep->busypipes, p) {
+ ipctran_pipe_close(p);
}
- nni_mtx_unlock(&p->ep->mtx);
+ if (ep->useraio != NULL) {
+ nni_aio_finish_error(ep->useraio, NNG_ECLOSED);
+ ep->useraio = NULL;
+ }
+ nni_mtx_unlock(&ep->mtx);
}
static void
@@ -663,63 +690,163 @@ ipctran_ep_fini(void *arg)
nni_mtx_lock(&ep->mtx);
ep->fini = true;
- if (!nni_list_empty(&ep->pipes)) {
+ if (ep->refcnt != 0) {
nni_mtx_unlock(&ep->mtx);
return;
}
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_stop(ep->timeaio);
+ nni_aio_stop(ep->connaio);
nng_stream_dialer_free(ep->dialer);
nng_stream_listener_free(ep->listener);
- nni_mtx_unlock(&ep->mtx);
+ nni_aio_fini(ep->timeaio);
+ nni_aio_fini(ep->connaio);
nni_mtx_fini(&ep->mtx);
NNI_FREE_STRUCT(ep);
}
static void
-ipctran_ep_close(void *arg)
+ipctran_timer_cb(void *arg)
{
- ipctran_ep * ep = arg;
+ ipctran_ep *ep = arg;
+ nni_mtx_lock(&ep->mtx);
+ if (nni_aio_result(ep->timeaio) == 0) {
+ nng_stream_listener_accept(ep->listener, ep->connaio);
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+ipctran_accept_cb(void *arg)
+{
+ ipctran_ep * ep = arg;
+ nni_aio * aio = ep->connaio;
ipctran_pipe *p;
+ int rv;
+ nng_stream * conn;
nni_mtx_lock(&ep->mtx);
- NNI_LIST_FOREACH (&ep->pipes, p) {
- nni_aio_close(p->negoaio);
- nni_aio_close(p->connaio);
- nni_aio_close(p->txaio);
- nni_aio_close(p->rxaio);
- if (p->conn != NULL) {
- nng_stream_close(p->conn);
+ if ((rv = nni_aio_result(aio)) != 0) {
+ goto error;
+ }
+
+ conn = nni_aio_get_output(aio, 0);
+ if ((rv = ipctran_pipe_alloc(&p)) != 0) {
+ nng_stream_free(conn);
+ goto error;
+ }
+ if (ep->closed) {
+ ipctran_pipe_fini(p);
+ nng_stream_free(conn);
+ rv = NNG_ECLOSED;
+ goto error;
+ }
+ ipctran_pipe_start(p, conn, ep);
+ nng_stream_listener_accept(ep->listener, ep->connaio);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+
+error:
+ nni_listener_bump_error(ep->nlistener, rv);
+ switch (rv) {
+
+ case NNG_ENOMEM:
+ nng_sleep_aio(10, ep->timeaio);
+ break;
+
+ default:
+ if (!ep->closed) {
+ nng_stream_listener_accept(ep->listener, ep->connaio);
}
+ break;
}
- if (ep->dialer != NULL) {
- nng_stream_dialer_close(ep->dialer);
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+ipctran_dial_cb(void *arg)
+{
+ ipctran_ep * ep = arg;
+ nni_aio * aio = ep->connaio;
+ ipctran_pipe *p;
+ int rv;
+ nng_stream * conn;
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ goto error;
}
- if (ep->listener != NULL) {
- nng_stream_listener_close(ep->listener);
+
+ conn = nni_aio_get_output(aio, 0);
+ if ((rv = ipctran_pipe_alloc(&p)) != 0) {
+ nng_stream_free(conn);
+ goto error;
+ }
+ nni_mtx_lock(&ep->mtx);
+ if (ep->closed) {
+ ipctran_pipe_fini(p);
+ nng_stream_free(conn);
+ rv = NNG_ECLOSED;
+ } else {
+ ipctran_pipe_start(p, conn, ep);
+ }
+ nni_mtx_unlock(&ep->mtx);
+ return;
+
+error:
+ // Error connecting. We need to pass this straight back
+ // to the user.
+ nni_dialer_bump_error(ep->ndialer, rv);
+ nni_mtx_lock(&ep->mtx);
+ if ((aio = ep->useraio) != NULL) {
+ ep->useraio = NULL;
+ nni_aio_finish_error(aio, rv);
}
nni_mtx_unlock(&ep->mtx);
+ return;
}
static int
-ipctran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer)
+ipctran_ep_init(ipctran_ep **epp, nni_sock *sock)
{
ipctran_ep *ep;
- int rv;
- nni_sock * sock = nni_dialer_sock(ndialer);
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&ep->mtx);
- NNI_LIST_INIT(&ep->pipes, ipctran_pipe, node);
+ NNI_LIST_INIT(&ep->busypipes, ipctran_pipe, node);
+ NNI_LIST_INIT(&ep->waitpipes, ipctran_pipe, node);
+ NNI_LIST_INIT(&ep->negopipes, ipctran_pipe, node);
+
+ ep->proto = nni_sock_proto_id(sock);
+
+ nni_stat_init(&ep->st_rcvmaxsz, "rcvmaxsz", "maximum receive size");
+ nni_stat_set_type(&ep->st_rcvmaxsz, NNG_STAT_LEVEL);
+ nni_stat_set_unit(&ep->st_rcvmaxsz, NNG_UNIT_BYTES);
+
+ *epp = ep;
+ return (0);
+}
+
+static int
+ipctran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer)
+{
+ ipctran_ep *ep;
+ int rv;
+ nni_sock * sock = nni_dialer_sock(ndialer);
- ep->proto = nni_sock_proto_id(sock);
+ if ((rv = ipctran_ep_init(&ep, sock)) != 0) {
+ return (rv);
+ }
ep->ndialer = ndialer;
- if ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0) {
+ if (((rv = nni_aio_init(&ep->connaio, ipctran_dial_cb, ep)) != 0) ||
+ ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0)) {
ipctran_ep_fini(ep);
return (rv);
}
+ nni_dialer_add_stat(ndialer, &ep->st_rcvmaxsz);
*dp = ep;
return (0);
}
@@ -731,50 +858,71 @@ ipctran_ep_init_listener(void **dp, nni_url *url, nni_listener *nlistener)
int rv;
nni_sock * sock = nni_listener_sock(nlistener);
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
+ if ((rv = ipctran_ep_init(&ep, sock)) != 0) {
+ return (rv);
}
- nni_mtx_init(&ep->mtx);
- NNI_LIST_INIT(&ep->pipes, ipctran_pipe, node);
-
- ep->proto = nni_sock_proto_id(sock);
ep->nlistener = nlistener;
- if ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0) {
+ if (((rv = nni_aio_init(&ep->connaio, ipctran_accept_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->timeaio, ipctran_timer_cb, ep)) != 0) ||
+ ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) {
ipctran_ep_fini(ep);
return (rv);
}
+ nni_listener_add_stat(nlistener, &ep->st_rcvmaxsz);
*dp = ep;
return (0);
}
static void
+ipctran_ep_cancel(nni_aio *aio, void *arg, int rv)
+{
+ ipctran_ep *ep = arg;
+ nni_mtx_lock(&ep->mtx);
+ if (aio == ep->useraio) {
+ ep->useraio = NULL;
+ nni_aio_finish_error(aio, rv);
+ if (ep->ndialer) {
+ nni_dialer_bump_error(ep->ndialer, rv);
+ } else {
+ nni_listener_bump_error(ep->nlistener, rv);
+ }
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
ipctran_ep_connect(void *arg, nni_aio *aio)
{
- ipctran_ep * ep = arg;
- ipctran_pipe *p = NULL;
- int rv;
+ ipctran_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = ipctran_pipe_alloc(&p, ep)) != 0) {
+ if (ep->closed) {
nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(aio, rv);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_dialer_bump_error(ep->ndialer, NNG_ECLOSED);
return;
}
- if ((rv = nni_aio_schedule(aio, ipctran_pipe_conn_cancel, p)) != 0) {
- nni_list_remove(&ep->pipes, p);
- p->ep = NULL;
+ if (ep->useraio != NULL) {
nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, NNG_EBUSY);
+ nni_dialer_bump_error(ep->ndialer, NNG_EBUSY);
+ return;
+ }
+
+ if ((rv = nni_aio_schedule(aio, ipctran_ep_cancel, ep)) != 0) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_dialer_bump_error(ep->ndialer, NNG_EBUSY);
nni_aio_finish_error(aio, rv);
- ipctran_pipe_fini(p);
return;
}
- p->useraio = aio;
- nng_stream_dialer_dial(ep->dialer, p->connaio);
+ ep->useraio = aio;
+ nng_stream_dialer_dial(ep->dialer, ep->connaio);
nni_mtx_unlock(&ep->mtx);
}
@@ -795,15 +943,21 @@ ipctran_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_type t)
ipctran_ep *ep = arg;
size_t val;
int rv;
- if (((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) &&
- (ep != NULL)) {
+ if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) {
ipctran_pipe *p;
nni_mtx_lock(&ep->mtx);
ep->rcvmax = val;
- NNI_LIST_FOREACH (&ep->pipes, p) {
+ NNI_LIST_FOREACH (&ep->waitpipes, p) {
+ p->rcvmax = val;
+ }
+ NNI_LIST_FOREACH (&ep->negopipes, p) {
+ p->rcvmax = val;
+ }
+ NNI_LIST_FOREACH (&ep->busypipes, p) {
p->rcvmax = val;
}
+ nni_stat_set_value(&ep->st_rcvmaxsz, val);
nni_mtx_unlock(&ep->mtx);
}
return (rv);
@@ -816,7 +970,9 @@ ipctran_ep_bind(void *arg)
int rv;
nni_mtx_lock(&ep->mtx);
- rv = nng_stream_listener_listen(ep->listener);
+ if ((rv = nng_stream_listener_listen(ep->listener)) != 0) {
+ nni_listener_bump_error(ep->nlistener, rv);
+ }
nni_mtx_unlock(&ep->mtx);
return (rv);
}
@@ -824,29 +980,39 @@ ipctran_ep_bind(void *arg)
static void
ipctran_ep_accept(void *arg, nni_aio *aio)
{
- ipctran_ep * ep = arg;
- ipctran_pipe *p = NULL;
- int rv;
+ ipctran_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = ipctran_pipe_alloc(&p, ep)) != 0) {
+ if (ep->closed) {
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_listener_bump_error(ep->nlistener, NNG_ECLOSED);
nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
- if ((rv = nni_aio_schedule(aio, ipctran_pipe_conn_cancel, p)) != 0) {
- nni_list_remove(&ep->pipes, p);
- p->ep = NULL;
+ if (ep->useraio != NULL) {
+ nni_aio_finish_error(aio, NNG_EBUSY);
+ nni_listener_bump_error(ep->nlistener, NNG_EBUSY);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, ipctran_ep_cancel, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
- ipctran_pipe_fini(p);
+ nni_listener_bump_error(ep->nlistener, rv);
return;
}
- p->useraio = aio;
- nng_stream_listener_accept(ep->listener, p->connaio);
+ ep->useraio = aio;
+ if (!ep->started) {
+ ep->started = true;
+ nng_stream_listener_accept(ep->listener, ep->connaio);
+ } else {
+ ipctran_ep_match(ep);
+ }
+
nni_mtx_unlock(&ep->mtx);
}
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index f8b1ce54..7748cd8f 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -46,7 +46,6 @@ struct tcptran_pipe {
nni_aio * txaio;
nni_aio * rxaio;
nni_aio * negoaio;
- nni_aio * connaio;
nni_msg * rxmsg;
nni_mtx mtx;
};
@@ -57,22 +56,30 @@ struct tcptran_ep {
uint16_t proto;
size_t rcvmax;
bool fini;
+ bool started;
+ bool closed;
nng_url * url;
const char * host; // for dialers
nng_sockaddr src;
- nni_list pipes;
+ int refcnt; // active pipes
+ nni_aio * useraio;
+ nni_aio * connaio;
+ nni_aio * timeaio;
+ nni_list busypipes; // busy pipes -- ones passed to socket
+ nni_list waitpipes; // pipes waiting to match to socket
+ nni_list negopipes; // pipes busy negotiating
nni_reap_item reap;
nng_stream_dialer * dialer;
nng_stream_listener *listener;
nni_dialer * ndialer;
nni_listener * nlistener;
+ nni_stat_item st_rcvmaxsz;
};
static void tcptran_pipe_send_start(tcptran_pipe *);
static void tcptran_pipe_recv_start(tcptran_pipe *);
static void tcptran_pipe_send_cb(void *);
static void tcptran_pipe_recv_cb(void *);
-static void tcptran_pipe_conn_cb(void *);
static void tcptran_pipe_nego_cb(void *);
static void tcptran_ep_fini(void *);
@@ -99,7 +106,6 @@ tcptran_pipe_close(void *arg)
nni_aio_close(p->rxaio);
nni_aio_close(p->txaio);
nni_aio_close(p->negoaio);
- nni_aio_close(p->connaio);
nng_stream_close(p->conn);
}
@@ -112,7 +118,6 @@ tcptran_pipe_stop(void *arg)
nni_aio_stop(p->rxaio);
nni_aio_stop(p->txaio);
nni_aio_stop(p->negoaio);
- nni_aio_stop(p->connaio);
}
static int
@@ -120,6 +125,7 @@ tcptran_pipe_init(void *arg, nni_pipe *npipe)
{
tcptran_pipe *p = arg;
p->npipe = npipe;
+
return (0);
}
@@ -132,8 +138,9 @@ tcptran_pipe_fini(void *arg)
tcptran_pipe_stop(p);
if ((ep = p->ep) != NULL) {
nni_mtx_lock(&ep->mtx);
- nni_list_remove(&ep->pipes, p);
- if (ep->fini && nni_list_empty(&ep->pipes)) {
+ nni_list_node_remove(&p->node);
+ ep->refcnt--;
+ if (ep->fini && (ep->refcnt == 0)) {
nni_reap(&ep->reap, tcptran_ep_fini, ep);
}
nni_mtx_unlock(&ep->mtx);
@@ -142,7 +149,6 @@ tcptran_pipe_fini(void *arg)
nni_aio_fini(p->rxaio);
nni_aio_fini(p->txaio);
nni_aio_fini(p->negoaio);
- nni_aio_fini(p->connaio);
nng_stream_free(p->conn);
nni_msg_free(p->rxmsg);
nni_mtx_fini(&p->mtx);
@@ -161,7 +167,7 @@ tcptran_pipe_reap(tcptran_pipe *p)
}
static int
-tcptran_pipe_alloc(tcptran_pipe **pipep, tcptran_ep *ep)
+tcptran_pipe_alloc(tcptran_pipe **pipep)
{
tcptran_pipe *p;
int rv;
@@ -172,7 +178,6 @@ tcptran_pipe_alloc(tcptran_pipe **pipep, tcptran_ep *ep)
nni_mtx_init(&p->mtx);
if (((rv = nni_aio_init(&p->txaio, tcptran_pipe_send_cb, p)) != 0) ||
((rv = nni_aio_init(&p->rxaio, tcptran_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->connaio, tcptran_pipe_conn_cb, p)) != 0) ||
((rv = nni_aio_init(&p->negoaio, tcptran_pipe_nego_cb, p)) != 0)) {
tcptran_pipe_fini(p);
return (rv);
@@ -180,77 +185,28 @@ tcptran_pipe_alloc(tcptran_pipe **pipep, tcptran_ep *ep)
nni_aio_list_init(&p->recvq);
nni_aio_list_init(&p->sendq);
nni_atomic_flag_reset(&p->reaped);
- nni_list_append(&ep->pipes, p);
- p->rcvmax = ep->rcvmax;
- p->proto = ep->proto;
- p->ep = ep;
- *pipep = p;
+ *pipep = p;
return (0);
}
static void
-tcptran_pipe_conn_cancel(nni_aio *aio, void *arg, int rv)
-{
- tcptran_pipe *p = arg;
-
- nni_mtx_lock(&p->ep->mtx);
- if (aio == p->useraio) {
- nni_aio_close(p->negoaio);
- nni_aio_close(p->connaio);
- p->useraio = NULL;
- nni_aio_finish_error(aio, rv);
- tcptran_pipe_reap(p);
- }
- nni_mtx_unlock(&p->ep->mtx);
-}
-
-static void
-tcptran_pipe_conn_cb(void *arg)
+tcptran_ep_match(tcptran_ep *ep)
{
- tcptran_pipe *p = arg;
- tcptran_ep * ep = p->ep;
- nni_aio * aio = p->connaio;
- nni_aio * uaio;
- nni_iov iov;
- int rv;
-
- nni_mtx_lock(&ep->mtx);
- uaio = p->useraio;
- if ((rv = nni_aio_result(aio)) == 0) {
- p->conn = nni_aio_get_output(aio, 0);
- }
-
- if ((uaio = p->useraio) == NULL) {
- nni_mtx_unlock(&ep->mtx);
- tcptran_pipe_reap(p);
- return;
- }
+ nni_aio * aio;
+ tcptran_pipe *p;
- if (rv != 0) {
- p->useraio = NULL;
- nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(uaio, rv);
- tcptran_pipe_reap(p);
+ if (((aio = ep->useraio) == NULL) ||
+ ((p = nni_list_first(&ep->waitpipes)) == NULL)) {
return;
}
- p->txlen[0] = 0;
- p->txlen[1] = 'S';
- p->txlen[2] = 'P';
- p->txlen[3] = 0;
- NNI_PUT16(&p->txlen[4], p->proto);
- NNI_PUT16(&p->txlen[6], 0);
-
- p->gotrxhead = 0;
- p->gottxhead = 0;
- p->wantrxhead = 8;
- p->wanttxhead = 8;
- iov.iov_len = 8;
- iov.iov_buf = &p->txlen[0];
- nni_aio_set_iov(p->negoaio, 1, &iov);
- nng_stream_send(p->conn, p->negoaio);
- nni_mtx_unlock(&ep->mtx);
+ nni_list_remove(&ep->waitpipes, p);
+ nni_list_append(&ep->busypipes, p);
+ ep->useraio = NULL;
+ p->rcvmax = ep->rcvmax;
+ nni_aio_set_output(aio, 0, p);
+ nni_aio_finish(aio, 0, 0);
}
static void
@@ -263,11 +219,6 @@ tcptran_pipe_nego_cb(void *arg)
int rv;
nni_mtx_lock(&ep->mtx);
- if ((uaio = p->useraio) == NULL) {
- nni_mtx_unlock(&ep->mtx);
- tcptran_pipe_reap(p);
- return;
- }
if ((rv = nni_aio_result(aio)) != 0) {
goto error;
@@ -309,18 +260,33 @@ tcptran_pipe_nego_cb(void *arg)
}
NNI_GET16(&p->rxlen[4], p->peer);
- p->useraio = NULL;
+ // We are all ready now. We put this in the wait list, and
+ // then try to run the matcher.
+ nni_list_remove(&ep->negopipes, p);
+ nni_list_append(&ep->waitpipes, p);
+
+ tcptran_ep_match(ep);
nni_mtx_unlock(&ep->mtx);
- nni_aio_set_output(uaio, 0, p);
- nni_aio_finish(uaio, 0, 0);
return;
error:
- p->useraio = NULL;
+ if (ep->ndialer != NULL) {
+ nni_dialer_bump_error(ep->ndialer, rv);
+ } else {
+ nni_listener_bump_error(ep->nlistener, rv);
+ }
+
+ nng_stream_close(p->conn);
+
+ // If we are waiting to negotiate on a client side, then a failure
+ // here has to be passed to the user app.
+ if ((ep->dialer != NULL) && ((uaio = ep->useraio) != NULL)) {
+ ep->useraio = NULL;
+ nni_aio_finish_error(uaio, rv);
+ }
nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(uaio, rv);
tcptran_pipe_reap(p);
}
@@ -338,6 +304,7 @@ tcptran_pipe_send_cb(void *arg)
aio = nni_list_first(&p->sendq);
if ((rv = nni_aio_result(txaio)) != 0) {
+ nni_pipe_bump_error(p->npipe, rv);
// Intentionally we do not queue up another transfer.
// There's an excellent chance that the pipe is no longer
// usable, with a partial transfer.
@@ -360,10 +327,11 @@ tcptran_pipe_send_cb(void *arg)
nni_aio_list_remove(aio);
tcptran_pipe_send_start(p);
- nni_mtx_unlock(&p->mtx);
-
msg = nni_aio_get_msg(aio);
n = nni_msg_len(msg);
+ nni_pipe_bump_tx(p->npipe, n);
+ nni_mtx_unlock(&p->mtx);
+
nni_aio_set_msg(aio, NULL);
nni_msg_free(msg);
nni_aio_finish_synch(aio, 0, n);
@@ -431,17 +399,21 @@ tcptran_pipe_recv_cb(void *arg)
nni_aio_list_remove(aio);
msg = p->rxmsg;
p->rxmsg = NULL;
+ n = nni_msg_len(msg);
+
+ nni_pipe_bump_rx(p->npipe, n);
tcptran_pipe_recv_start(p);
nni_mtx_unlock(&p->mtx);
nni_aio_set_msg(aio, msg);
- nni_aio_finish_synch(aio, 0, nni_msg_len(msg));
+ nni_aio_finish_synch(aio, 0, n);
return;
recv_error:
nni_aio_list_remove(aio);
msg = p->rxmsg;
p->rxmsg = NULL;
+ nni_pipe_bump_error(p->npipe, rv);
// Intentionally, we do not queue up another receive.
// The protocol should notice this error and close the pipe.
nni_mtx_unlock(&p->mtx);
@@ -634,19 +606,55 @@ tcptran_pipe_getopt(
}
static void
+tcptran_pipe_start(tcptran_pipe *p, nng_stream *conn, tcptran_ep *ep)
+{
+ nni_iov iov;
+
+ ep->refcnt++;
+
+ p->conn = conn;
+ p->ep = ep;
+ p->proto = ep->proto;
+
+ p->txlen[0] = 0;
+ p->txlen[1] = 'S';
+ p->txlen[2] = 'P';
+ p->txlen[3] = 0;
+ NNI_PUT16(&p->txlen[4], p->proto);
+ NNI_PUT16(&p->txlen[6], 0);
+
+ p->gotrxhead = 0;
+ p->gottxhead = 0;
+ p->wantrxhead = 8;
+ p->wanttxhead = 8;
+ iov.iov_len = 8;
+ iov.iov_buf = &p->txlen[0];
+ nni_aio_set_iov(p->negoaio, 1, &iov);
+ nni_list_append(&ep->negopipes, p);
+
+ nni_aio_set_timeout(p->negoaio, 10000); // 10 sec timeout to negotiate
+ nng_stream_send(p->conn, p->negoaio);
+}
+
+static void
tcptran_ep_fini(void *arg)
{
tcptran_ep *ep = arg;
nni_mtx_lock(&ep->mtx);
ep->fini = true;
- if (!nni_list_empty(&ep->pipes)) {
+ if (ep->refcnt != 0) {
nni_mtx_unlock(&ep->mtx);
return;
}
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_stop(ep->timeaio);
+ nni_aio_stop(ep->connaio);
nng_stream_dialer_free(ep->dialer);
nng_stream_listener_free(ep->listener);
- nni_mtx_unlock(&ep->mtx);
+ nni_aio_fini(ep->timeaio);
+ nni_aio_fini(ep->connaio);
+
nni_mtx_fini(&ep->mtx);
NNI_FREE_STRUCT(ep);
}
@@ -658,21 +666,29 @@ tcptran_ep_close(void *arg)
tcptran_pipe *p;
nni_mtx_lock(&ep->mtx);
- NNI_LIST_FOREACH (&ep->pipes, p) {
- nni_aio_close(p->negoaio);
- nni_aio_close(p->connaio);
- nni_aio_close(p->txaio);
- nni_aio_close(p->rxaio);
- if (p->conn != NULL) {
- nng_stream_close(p->conn);
- }
- }
+
+ ep->closed = true;
+ nni_aio_close(ep->timeaio);
if (ep->dialer != NULL) {
nng_stream_dialer_close(ep->dialer);
}
if (ep->listener != NULL) {
nng_stream_listener_close(ep->listener);
}
+ NNI_LIST_FOREACH (&ep->negopipes, p) {
+ tcptran_pipe_close(p);
+ }
+ NNI_LIST_FOREACH (&ep->waitpipes, p) {
+ tcptran_pipe_close(p);
+ }
+ NNI_LIST_FOREACH (&ep->busypipes, p) {
+ tcptran_pipe_close(p);
+ }
+ if (ep->useraio != NULL) {
+ nni_aio_finish_error(ep->useraio, NNG_ECLOSED);
+ ep->useraio = NULL;
+ }
+
nni_mtx_unlock(&ep->mtx);
}
@@ -735,6 +751,130 @@ tcptran_url_parse_source(nng_url *url, nng_sockaddr *sa, const nng_url *surl)
return (rv);
}
+static void
+tcptran_timer_cb(void *arg)
+{
+ tcptran_ep *ep = arg;
+ if (nni_aio_result(ep->timeaio) == 0) {
+ nng_stream_listener_accept(ep->listener, ep->connaio);
+ }
+}
+
+static void
+tcptran_accept_cb(void *arg)
+{
+ tcptran_ep * ep = arg;
+ nni_aio * aio = ep->connaio;
+ tcptran_pipe *p;
+ int rv;
+ nng_stream * conn;
+
+ nni_mtx_lock(&ep->mtx);
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ goto error;
+ }
+
+ conn = nni_aio_get_output(aio, 0);
+ if ((rv = tcptran_pipe_alloc(&p)) != 0) {
+ nng_stream_free(conn);
+ goto error;
+ }
+
+ if (ep->closed) {
+ tcptran_pipe_fini(p);
+ nng_stream_free(conn);
+ rv = NNG_ECLOSED;
+ goto error;
+ }
+ tcptran_pipe_start(p, conn, ep);
+ nng_stream_listener_accept(ep->listener, ep->connaio);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+
+error:
+ nni_listener_bump_error(ep->nlistener, rv);
+ switch (rv) {
+
+ case NNG_ENOMEM:
+ nng_sleep_aio(10, ep->timeaio);
+ break;
+
+ default:
+ if (!ep->closed) {
+ nng_stream_listener_accept(ep->listener, ep->connaio);
+ }
+ break;
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+tcptran_dial_cb(void *arg)
+{
+ tcptran_ep * ep = arg;
+ nni_aio * aio = ep->connaio;
+ tcptran_pipe *p;
+ int rv;
+ nng_stream * conn;
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ goto error;
+ }
+
+ conn = nni_aio_get_output(aio, 0);
+ if ((rv = tcptran_pipe_alloc(&p)) != 0) {
+ nng_stream_free(conn);
+ goto error;
+ }
+ nni_mtx_lock(&ep->mtx);
+ if (ep->closed) {
+ tcptran_pipe_fini(p);
+ nng_stream_free(conn);
+ rv = NNG_ECLOSED;
+ } else {
+ tcptran_pipe_start(p, conn, ep);
+ }
+ nni_mtx_unlock(&ep->mtx);
+ return;
+
+error:
+ // Error connecting. We need to pass this straight back
+ // to the user.
+ nni_dialer_bump_error(ep->ndialer, rv);
+ nni_mtx_lock(&ep->mtx);
+ if ((aio = ep->useraio) != NULL) {
+ ep->useraio = NULL;
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&ep->mtx);
+ return;
+}
+
+static int
+tcptran_ep_init(tcptran_ep **epp, nng_url *url, nni_sock *sock)
+{
+ tcptran_ep *ep;
+
+ if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&ep->mtx);
+ NNI_LIST_INIT(&ep->busypipes, tcptran_pipe, node);
+ NNI_LIST_INIT(&ep->waitpipes, tcptran_pipe, node);
+ NNI_LIST_INIT(&ep->negopipes, tcptran_pipe, node);
+
+ ep->proto = nni_sock_proto_id(sock);
+ ep->url = url;
+
+ nni_stat_init(&ep->st_rcvmaxsz, "rcvmaxsz", "maximum receive size");
+ nni_stat_set_type(&ep->st_rcvmaxsz, NNG_STAT_LEVEL);
+ nni_stat_set_unit(&ep->st_rcvmaxsz, NNG_UNIT_BYTES);
+
+ *epp = ep;
+ return (0);
+}
+
static int
tcptran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
{
@@ -758,17 +898,13 @@ tcptran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
return (NNG_EADDRINVAL);
}
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
+ if ((rv = tcptran_ep_init(&ep, url, sock)) != 0) {
+ return (rv);
}
- nni_mtx_init(&ep->mtx);
- NNI_LIST_INIT(&ep->pipes, tcptran_pipe, node);
-
- ep->proto = nni_sock_proto_id(sock);
- ep->url = url;
ep->ndialer = ndialer;
if ((rv != 0) ||
+ ((rv = nni_aio_init(&ep->connaio, tcptran_dial_cb, ep)) != 0) ||
((rv = nng_stream_dialer_alloc_url(&ep->dialer, &myurl)) != 0)) {
tcptran_ep_fini(ep);
return (rv);
@@ -779,9 +915,12 @@ tcptran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
tcptran_ep_fini(ep);
return (rv);
}
+
+ nni_dialer_add_stat(ndialer, &ep->st_rcvmaxsz);
*dp = ep;
return (0);
}
+
static int
tcptran_listener_init(void **lp, nng_url *url, nni_listener *nlistener)
{
@@ -798,48 +937,71 @@ tcptran_listener_init(void **lp, nng_url *url, nni_listener *nlistener)
return (NNG_EADDRINVAL);
}
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
+ if ((rv = tcptran_ep_init(&ep, url, sock)) != 0) {
+ return (rv);
}
- nni_mtx_init(&ep->mtx);
- NNI_LIST_INIT(&ep->pipes, tcptran_pipe, node);
- ep->proto = nni_sock_proto_id(sock);
- ep->url = url;
ep->nlistener = nlistener;
- if ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0) {
+ if (((rv = nni_aio_init(&ep->connaio, tcptran_accept_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->timeaio, tcptran_timer_cb, ep)) != 0) ||
+ ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) {
tcptran_ep_fini(ep);
return (rv);
}
+ nni_listener_add_stat(nlistener, &ep->st_rcvmaxsz);
*lp = ep;
return (0);
}
static void
+tcptran_ep_cancel(nni_aio *aio, void *arg, int rv)
+{
+ tcptran_ep *ep = arg;
+ nni_mtx_lock(&ep->mtx);
+ if (ep->useraio == aio) {
+ ep->useraio = NULL;
+ nni_aio_finish_error(aio, rv);
+ if (ep->ndialer) {
+ nni_dialer_bump_error(ep->ndialer, rv);
+ } else {
+ nni_listener_bump_error(ep->nlistener, rv);
+ }
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
tcptran_ep_connect(void *arg, nni_aio *aio)
{
- tcptran_ep * ep = arg;
- tcptran_pipe *p;
- int rv;
+ tcptran_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = tcptran_pipe_alloc(&p, ep)) != 0) {
+ if (ep->closed) {
nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(aio, rv);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_dialer_bump_error(ep->ndialer, NNG_ECLOSED);
+ return;
+ }
+ if (ep->useraio != NULL) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, NNG_EBUSY);
+ nni_dialer_bump_error(ep->ndialer, NNG_EBUSY);
return;
}
- if ((rv = nni_aio_schedule(aio, tcptran_pipe_conn_cancel, p)) != 0) {
+ if ((rv = nni_aio_schedule(aio, tcptran_ep_cancel, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
+ nni_dialer_bump_error(ep->ndialer, rv);
nni_aio_finish_error(aio, rv);
- tcptran_pipe_reap(p);
return;
}
- p->useraio = aio;
- nng_stream_dialer_dial(ep->dialer, p->connaio);
+ ep->useraio = aio;
+
+ nng_stream_dialer_dial(ep->dialer, ep->connaio);
nni_mtx_unlock(&ep->mtx);
}
@@ -881,14 +1043,20 @@ tcptran_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
tcptran_ep *ep = arg;
size_t val;
int rv;
- if (((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) &&
- (ep != NULL)) {
+ if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) {
tcptran_pipe *p;
nni_mtx_lock(&ep->mtx);
ep->rcvmax = val;
- NNI_LIST_FOREACH (&ep->pipes, p) {
+ NNI_LIST_FOREACH (&ep->waitpipes, p) {
+ p->rcvmax = val;
+ }
+ NNI_LIST_FOREACH (&ep->negopipes, p) {
+ p->rcvmax = val;
+ }
+ NNI_LIST_FOREACH (&ep->busypipes, p) {
p->rcvmax = val;
}
+ nni_stat_set_value(&ep->st_rcvmaxsz, val);
nni_mtx_unlock(&ep->mtx);
}
return (rv);
@@ -902,6 +1070,9 @@ tcptran_ep_bind(void *arg)
nni_mtx_lock(&ep->mtx);
rv = nng_stream_listener_listen(ep->listener);
+ if (rv != 0) {
+ nni_listener_bump_error(ep->nlistener, rv);
+ }
nni_mtx_unlock(&ep->mtx);
return (rv);
@@ -910,29 +1081,38 @@ tcptran_ep_bind(void *arg)
static void
tcptran_ep_accept(void *arg, nni_aio *aio)
{
- tcptran_ep * ep = arg;
- tcptran_pipe *p;
- int rv;
+ tcptran_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = tcptran_pipe_alloc(&p, ep)) != 0) {
+ if (ep->closed) {
nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(aio, rv);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_listener_bump_error(ep->nlistener, NNG_ECLOSED);
+ return;
+ }
+ if (ep->useraio != NULL) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, NNG_EBUSY);
+ nni_listener_bump_error(ep->nlistener, NNG_EBUSY);
return;
}
- if ((rv = nni_aio_schedule(aio, tcptran_pipe_conn_cancel, p)) != 0) {
- nni_list_remove(&ep->pipes, p);
- p->ep = NULL;
+ if ((rv = nni_aio_schedule(aio, tcptran_ep_cancel, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
- tcptran_pipe_reap(p);
+ nni_listener_bump_error(ep->nlistener, rv);
return;
}
- p->useraio = aio;
- nng_stream_listener_accept(ep->listener, p->connaio);
+ ep->useraio = aio;
+ if (!ep->started) {
+ ep->started = true;
+ nng_stream_listener_accept(ep->listener, ep->connaio);
+ } else {
+ tcptran_ep_match(ep);
+ }
nni_mtx_unlock(&ep->mtx);
}
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index 8b02702f..58b43f2c 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -53,7 +53,6 @@ struct tlstran_pipe {
nni_aio * txaio;
nni_aio * rxaio;
nni_aio * negoaio;
- nni_aio * connaio;
nni_msg * rxmsg;
nni_mtx mtx;
};
@@ -64,25 +63,34 @@ struct tlstran_ep {
uint16_t af;
uint16_t proto;
size_t rcvmax;
+ bool started;
+ bool closed;
bool fini;
+ int refcnt;
int authmode;
nni_url * url;
nni_list pipes;
nni_reap_item reap;
nng_stream_dialer * dialer;
nng_stream_listener *listener;
+ nni_aio * useraio;
+ nni_aio * connaio;
+ nni_aio * timeaio;
+ nni_list busypipes; // busy pipes -- ones passed to socket
+ nni_list waitpipes; // pipes waiting to match to socket
+ nni_list negopipes; // pipes busy negotiating
const char * host;
nng_sockaddr src;
nng_sockaddr sa;
nni_dialer * ndialer;
nni_listener * nlistener;
+ nni_stat_item st_rcvmaxsz;
};
static void tlstran_pipe_send_start(tlstran_pipe *);
static void tlstran_pipe_recv_start(tlstran_pipe *);
static void tlstran_pipe_send_cb(void *);
static void tlstran_pipe_recv_cb(void *);
-static void tlstran_pipe_conn_cb(void *);
static void tlstran_pipe_nego_cb(void *);
static void tlstran_ep_fini(void *);
@@ -105,7 +113,6 @@ tlstran_pipe_close(void *arg)
nni_aio_close(p->rxaio);
nni_aio_close(p->txaio);
nni_aio_close(p->negoaio);
- nni_aio_close(p->connaio);
nng_stream_close(p->tls);
}
@@ -118,7 +125,6 @@ tlstran_pipe_stop(void *arg)
nni_aio_stop(p->rxaio);
nni_aio_stop(p->txaio);
nni_aio_stop(p->negoaio);
- nni_aio_stop(p->connaio);
}
static int
@@ -138,8 +144,9 @@ tlstran_pipe_fini(void *arg)
tlstran_pipe_stop(p);
if ((ep = p->ep) != NULL) {
nni_mtx_lock(&ep->mtx);
- nni_list_remove(&ep->pipes, p);
- if (ep->fini && nni_list_empty(&ep->pipes)) {
+ nni_list_node_remove(&p->node);
+ ep->refcnt--;
+ if (ep->fini && (ep->refcnt == 0)) {
nni_reap(&ep->reap, tlstran_ep_fini, ep);
}
nni_mtx_unlock(&ep->mtx);
@@ -147,14 +154,13 @@ tlstran_pipe_fini(void *arg)
nni_aio_fini(p->rxaio);
nni_aio_fini(p->txaio);
nni_aio_fini(p->negoaio);
- nni_aio_fini(p->connaio);
nng_stream_free(p->tls);
nni_msg_free(p->rxmsg);
NNI_FREE_STRUCT(p);
}
static int
-tlstran_pipe_alloc(tlstran_pipe **pipep, tlstran_ep *ep)
+tlstran_pipe_alloc(tlstran_pipe **pipep)
{
tlstran_pipe *p;
int rv;
@@ -166,7 +172,6 @@ tlstran_pipe_alloc(tlstran_pipe **pipep, tlstran_ep *ep)
if (((rv = nni_aio_init(&p->txaio, tlstran_pipe_send_cb, p)) != 0) ||
((rv = nni_aio_init(&p->rxaio, tlstran_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->connaio, tlstran_pipe_conn_cb, p)) != 0) ||
((rv = nni_aio_init(&p->negoaio, tlstran_pipe_nego_cb, p)) != 0)) {
tlstran_pipe_fini(p);
return (rv);
@@ -174,12 +179,8 @@ tlstran_pipe_alloc(tlstran_pipe **pipep, tlstran_ep *ep)
nni_aio_list_init(&p->recvq);
nni_aio_list_init(&p->sendq);
nni_atomic_flag_reset(&p->reaped);
- nni_list_append(&ep->pipes, p);
- p->rcvmax = ep->rcvmax;
- p->proto = ep->proto;
- p->ep = ep;
- *pipep = p;
+ *pipep = p;
return (0);
}
@@ -195,67 +196,21 @@ tlstran_pipe_reap(tlstran_pipe *p)
}
static void
-tlstran_pipe_conn_cancel(nni_aio *aio, void *arg, int rv)
+tlstran_ep_match(tlstran_ep *ep)
{
- tlstran_pipe *p = arg;
-
- nni_mtx_lock(&p->ep->mtx);
- if (aio == p->useraio) {
- p->useraio = NULL;
- nni_aio_close(p->negoaio);
- nni_aio_close(p->connaio);
- nni_aio_finish_error(aio, rv);
- tlstran_pipe_reap(p);
- }
- nni_mtx_unlock(&p->ep->mtx);
-}
-
-static void
-tlstran_pipe_conn_cb(void *arg)
-{
- tlstran_pipe *p = arg;
- tlstran_ep * ep = p->ep;
- nni_aio * aio = p->connaio;
- nni_aio * uaio;
- nni_iov iov;
- int rv;
-
- nni_mtx_lock(&ep->mtx);
- if ((rv = nni_aio_result(aio)) == 0) {
- p->tls = nni_aio_get_output(aio, 0);
- } else {
- p->tls = NULL;
- }
-
- if ((uaio = p->useraio) == NULL) {
- nni_mtx_unlock(&ep->mtx);
- tlstran_pipe_reap(p);
- return;
- }
+ nni_aio * aio;
+ tlstran_pipe *p;
- if (rv != 0) {
- p->useraio = NULL;
- nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(uaio, rv);
- tlstran_pipe_reap(p);
+ if (((aio = ep->useraio) == NULL) ||
+ ((p = nni_list_first(&ep->waitpipes)) == NULL)) {
return;
}
- p->txlen[0] = 0;
- p->txlen[1] = 'S';
- p->txlen[2] = 'P';
- p->txlen[3] = 0;
- NNI_PUT16(&p->txlen[4], p->proto);
- NNI_PUT16(&p->txlen[6], 0);
-
- p->gotrxhead = 0;
- p->gottxhead = 0;
- p->wantrxhead = 8;
- p->wanttxhead = 8;
- iov.iov_len = 8;
- iov.iov_buf = &p->txlen[0];
- nni_aio_set_iov(p->negoaio, 1, &iov);
- nng_stream_send(p->tls, p->negoaio);
- nni_mtx_unlock(&ep->mtx);
+ nni_list_remove(&ep->waitpipes, p);
+ nni_list_append(&ep->busypipes, p);
+ ep->useraio = NULL;
+ p->rcvmax = ep->rcvmax;
+ nni_aio_set_output(aio, 0, p);
+ nni_aio_finish(aio, 0, 0);
}
static void
@@ -268,12 +223,6 @@ tlstran_pipe_nego_cb(void *arg)
int rv;
nni_mtx_lock(&ep->mtx);
- if ((uaio = p->useraio) == NULL) {
- nni_mtx_unlock(&ep->mtx);
- tlstran_pipe_reap(p);
- return;
- }
-
if ((rv = nni_aio_result(aio)) != 0) {
goto error;
}
@@ -314,17 +263,33 @@ tlstran_pipe_nego_cb(void *arg)
}
NNI_GET16(&p->rxlen[4], p->peer);
- p->useraio = NULL;
+
+ // We are all ready now. We put this in the wait list, and
+ // then try to run the matcher.
+ nni_list_remove(&ep->negopipes, p);
+ nni_list_append(&ep->waitpipes, p);
+
+ tlstran_ep_match(ep);
nni_mtx_unlock(&ep->mtx);
- nni_aio_set_output(uaio, 0, p);
- nni_aio_finish(uaio, 0, 0);
return;
error:
- p->useraio = NULL;
+ if (ep->ndialer != NULL) {
+ nni_dialer_bump_error(ep->ndialer, rv);
+ } else {
+ nni_listener_bump_error(ep->nlistener, rv);
+ }
+
+ nng_stream_close(p->tls);
+
+ // If we are waiting to negotiate on a client side, then a failure
+ // here has to be passed to the user app.
+ if ((ep->dialer != NULL) && ((uaio = ep->useraio) != NULL)) {
+ ep->useraio = NULL;
+ nni_aio_finish_error(uaio, rv);
+ }
nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(uaio, rv);
tlstran_pipe_reap(p);
}
@@ -350,6 +315,7 @@ tlstran_pipe_send_cb(void *arg)
nni_aio_list_remove(aio);
nni_mtx_unlock(&p->mtx);
nni_aio_finish_error(aio, rv);
+ nni_pipe_bump_error(p->npipe, rv);
return;
}
@@ -362,10 +328,11 @@ tlstran_pipe_send_cb(void *arg)
}
nni_aio_list_remove(aio);
tlstran_pipe_send_start(p);
- nni_mtx_unlock(&p->mtx);
msg = nni_aio_get_msg(aio);
n = nni_msg_len(msg);
+ nni_pipe_bump_tx(p->npipe, n);
+ nni_mtx_unlock(&p->mtx);
nni_aio_set_msg(aio, NULL);
nni_msg_free(msg);
nni_aio_finish_synch(aio, 0, n);
@@ -434,19 +401,22 @@ tlstran_pipe_recv_cb(void *arg)
nni_aio_list_remove(aio);
msg = p->rxmsg;
p->rxmsg = NULL;
+ n = nni_msg_len(msg);
if (!nni_list_empty(&p->recvq)) {
tlstran_pipe_recv_start(p);
}
+ nni_pipe_bump_rx(p->npipe, n);
nni_mtx_unlock(&p->mtx);
nni_aio_set_msg(aio, msg);
- nni_aio_finish_synch(aio, 0, nni_msg_len(msg));
+ nni_aio_finish_synch(aio, 0, n);
return;
recv_error:
nni_aio_list_remove(aio);
msg = p->rxmsg;
p->rxmsg = NULL;
+ nni_pipe_bump_error(p->npipe, rv);
// Intentionally, we do not queue up another receive.
// The protocol should notice this error and close the pipe.
nni_mtx_unlock(&p->mtx);
@@ -610,19 +580,54 @@ tlstran_pipe_peer(void *arg)
}
static void
+tlstran_pipe_start(tlstran_pipe *p, nng_stream *conn, tlstran_ep *ep)
+{
+ nni_iov iov;
+
+ ep->refcnt++;
+
+ p->tls = conn;
+ p->ep = ep;
+ p->proto = ep->proto;
+
+ p->txlen[0] = 0;
+ p->txlen[1] = 'S';
+ p->txlen[2] = 'P';
+ p->txlen[3] = 0;
+ NNI_PUT16(&p->txlen[4], p->proto);
+ NNI_PUT16(&p->txlen[6], 0);
+
+ p->gotrxhead = 0;
+ p->gottxhead = 0;
+ p->wantrxhead = 8;
+ p->wanttxhead = 8;
+ iov.iov_len = 8;
+ iov.iov_buf = &p->txlen[0];
+ nni_aio_set_iov(p->negoaio, 1, &iov);
+ nni_list_append(&ep->negopipes, p);
+
+ nni_aio_set_timeout(p->negoaio, 10000); // 10 sec timeout to negotiate
+ nng_stream_send(p->tls, p->negoaio);
+}
+
+static void
tlstran_ep_fini(void *arg)
{
tlstran_ep *ep = arg;
nni_mtx_lock(&ep->mtx);
ep->fini = true;
- if (!nni_list_empty(&ep->pipes)) {
+ if (ep->refcnt != 0) {
nni_mtx_unlock(&ep->mtx);
return;
}
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_stop(ep->timeaio);
+ nni_aio_stop(ep->connaio);
nng_stream_dialer_free(ep->dialer);
nng_stream_listener_free(ep->listener);
- nni_mtx_unlock(&ep->mtx);
+ nni_aio_fini(ep->timeaio);
+ nni_aio_fini(ep->connaio);
nni_mtx_fini(&ep->mtx);
NNI_FREE_STRUCT(ep);
@@ -635,21 +640,28 @@ tlstran_ep_close(void *arg)
tlstran_pipe *p;
nni_mtx_lock(&ep->mtx);
- NNI_LIST_FOREACH (&ep->pipes, p) {
- nni_aio_close(p->negoaio);
- nni_aio_close(p->connaio);
- nni_aio_close(p->txaio);
- nni_aio_close(p->rxaio);
- if (p->tls != NULL) {
- nng_stream_close(p->tls);
- }
- }
+ ep->closed = true;
+ nni_aio_close(ep->timeaio);
+
if (ep->dialer != NULL) {
nng_stream_dialer_close(ep->dialer);
}
if (ep->listener != NULL) {
nng_stream_listener_close(ep->listener);
}
+ NNI_LIST_FOREACH (&ep->negopipes, p) {
+ tlstran_pipe_close(p);
+ }
+ NNI_LIST_FOREACH (&ep->waitpipes, p) {
+ tlstran_pipe_close(p);
+ }
+ NNI_LIST_FOREACH (&ep->busypipes, p) {
+ tlstran_pipe_close(p);
+ }
+ if (ep->useraio != NULL) {
+ nni_aio_finish_error(ep->useraio, NNG_ECLOSED);
+ ep->useraio = NULL;
+ }
nni_mtx_unlock(&ep->mtx);
}
@@ -712,6 +724,130 @@ tlstran_url_parse_source(nni_url *url, nng_sockaddr *sa, const nni_url *surl)
return (rv);
}
+static void
+tlstran_timer_cb(void *arg)
+{
+ tlstran_ep *ep = arg;
+ if (nni_aio_result(ep->timeaio) == 0) {
+ nng_stream_listener_accept(ep->listener, ep->connaio);
+ }
+}
+
+static void
+tlstran_accept_cb(void *arg)
+{
+ tlstran_ep * ep = arg;
+ nni_aio * aio = ep->connaio;
+ tlstran_pipe *p;
+ int rv;
+ nng_stream * conn;
+
+ nni_mtx_lock(&ep->mtx);
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ goto error;
+ }
+
+ conn = nni_aio_get_output(aio, 0);
+ if ((rv = tlstran_pipe_alloc(&p)) != 0) {
+ nng_stream_free(conn);
+ goto error;
+ }
+
+ if (ep->closed) {
+ tlstran_pipe_fini(p);
+ nng_stream_free(conn);
+ rv = NNG_ECLOSED;
+ goto error;
+ }
+ tlstran_pipe_start(p, conn, ep);
+ nng_stream_listener_accept(ep->listener, ep->connaio);
+ nni_mtx_unlock(&ep->mtx);
+ return;
+
+error:
+ nni_listener_bump_error(ep->nlistener, rv);
+ switch (rv) {
+
+ case NNG_ENOMEM:
+ nng_sleep_aio(10, ep->timeaio);
+ break;
+
+ default:
+ if (!ep->closed) {
+ nng_stream_listener_accept(ep->listener, ep->connaio);
+ }
+ break;
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
+tlstran_dial_cb(void *arg)
+{
+ tlstran_ep * ep = arg;
+ nni_aio * aio = ep->connaio;
+ tlstran_pipe *p;
+ int rv;
+ nng_stream * conn;
+
+ if ((rv = nni_aio_result(aio)) != 0) {
+ goto error;
+ }
+
+ conn = nni_aio_get_output(aio, 0);
+ if ((rv = tlstran_pipe_alloc(&p)) != 0) {
+ nng_stream_free(conn);
+ goto error;
+ }
+ nni_mtx_lock(&ep->mtx);
+ if (ep->closed) {
+ tlstran_pipe_fini(p);
+ nng_stream_free(conn);
+ rv = NNG_ECLOSED;
+ } else {
+ tlstran_pipe_start(p, conn, ep);
+ }
+ nni_mtx_unlock(&ep->mtx);
+ return;
+
+error:
+ // Error connecting. We need to pass this straight back
+ // to the user.
+ nni_dialer_bump_error(ep->ndialer, rv);
+ nni_mtx_lock(&ep->mtx);
+ if ((aio = ep->useraio) != NULL) {
+ ep->useraio = NULL;
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&ep->mtx);
+ return;
+}
+
+static int
+tlstran_ep_init(tlstran_ep **epp, nng_url *url, nni_sock *sock)
+{
+ tlstran_ep *ep;
+
+ if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&ep->mtx);
+ NNI_LIST_INIT(&ep->busypipes, tlstran_pipe, node);
+ NNI_LIST_INIT(&ep->waitpipes, tlstran_pipe, node);
+ NNI_LIST_INIT(&ep->negopipes, tlstran_pipe, node);
+
+ ep->proto = nni_sock_proto_id(sock);
+ ep->url = url;
+
+ nni_stat_init(&ep->st_rcvmaxsz, "rcvmaxsz", "maximum receive size");
+ nni_stat_set_type(&ep->st_rcvmaxsz, NNG_STAT_LEVEL);
+ nni_stat_set_unit(&ep->st_rcvmaxsz, NNG_UNIT_BYTES);
+
+ *epp = ep;
+ return (0);
+}
+
static int
tlstran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer)
{
@@ -735,16 +871,11 @@ tlstran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer)
return (NNG_EADDRINVAL);
}
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
+ if (((rv = tlstran_ep_init(&ep, url, sock)) != 0) ||
+ ((rv = nni_aio_init(&ep->connaio, tlstran_dial_cb, ep)) != 0)) {
+ return (rv);
}
-
- nni_mtx_init(&ep->mtx);
- NNI_LIST_INIT(&ep->pipes, tlstran_pipe, node);
-
ep->authmode = NNG_TLS_AUTH_MODE_REQUIRED;
- ep->url = url;
- ep->proto = nni_sock_proto_id(sock);
ep->ndialer = ndialer;
if ((rv != 0) ||
@@ -758,6 +889,7 @@ tlstran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer)
tlstran_ep_fini(ep);
return (rv);
}
+ nni_dialer_add_stat(ndialer, &ep->st_rcvmaxsz);
*dp = ep;
return (0);
}
@@ -790,17 +922,14 @@ tlstran_ep_init_listener(void **lp, nni_url *url, nni_listener *nlistener)
(url->u_query != NULL)) {
return (NNG_EADDRINVAL);
}
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
+ if (((rv = tlstran_ep_init(&ep, url, sock)) != 0) ||
+ ((rv = nni_aio_init(&ep->connaio, tlstran_accept_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->timeaio, tlstran_timer_cb, ep)) != 0)) {
+ return (rv);
}
- nni_mtx_init(&ep->mtx);
- NNI_LIST_INIT(&ep->pipes, tlstran_pipe, node);
-
ep->authmode = NNG_TLS_AUTH_MODE_NONE;
- ep->url = url;
ep->af = af;
- ep->proto = nni_sock_proto_id(sock);
ep->nlistener = nlistener;
if (strlen(host) == 0) {
@@ -832,35 +961,60 @@ tlstran_ep_init_listener(void **lp, nni_url *url, nni_listener *nlistener)
return (rv);
}
+ nni_listener_add_stat(nlistener, &ep->st_rcvmaxsz);
*lp = ep;
return (0);
}
static void
+tlstran_ep_cancel(nni_aio *aio, void *arg, int rv)
+{
+ tlstran_ep *ep = arg;
+ nni_mtx_lock(&ep->mtx);
+ if (ep->useraio == aio) {
+ ep->useraio = NULL;
+ nni_aio_finish_error(aio, rv);
+ if (ep->ndialer) {
+ nni_dialer_bump_error(ep->ndialer, rv);
+ } else {
+ nni_listener_bump_error(ep->nlistener, rv);
+ }
+ }
+ nni_mtx_unlock(&ep->mtx);
+}
+
+static void
tlstran_ep_connect(void *arg, nni_aio *aio)
{
- tlstran_ep * ep = arg;
- tlstran_pipe *p;
- int rv;
+ tlstran_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
+
nni_mtx_lock(&ep->mtx);
- if ((rv = tlstran_pipe_alloc(&p, ep)) != 0) {
+ if (ep->closed) {
nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(aio, rv);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_dialer_bump_error(ep->ndialer, NNG_ECLOSED);
return;
}
-
- if ((rv = nni_aio_schedule(aio, tlstran_pipe_conn_cancel, p)) != 0) {
+ if (ep->useraio != NULL) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, NNG_EBUSY);
+ nni_dialer_bump_error(ep->ndialer, NNG_EBUSY);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, tlstran_ep_cancel, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
+ nni_dialer_bump_error(ep->ndialer, rv);
nni_aio_finish_error(aio, rv);
- tlstran_pipe_reap(p);
return;
}
- p->useraio = aio;
- nng_stream_dialer_dial(ep->dialer, p->connaio);
+ ep->useraio = aio;
+
+ nng_stream_dialer_dial(ep->dialer, ep->connaio);
nni_mtx_unlock(&ep->mtx);
}
@@ -871,7 +1025,9 @@ tlstran_ep_bind(void *arg)
int rv;
nni_mtx_lock(&ep->mtx);
- rv = nng_stream_listener_listen(ep->listener);
+ if ((rv = nng_stream_listener_listen(ep->listener)) != 0) {
+ nni_listener_bump_error(ep->nlistener, rv);
+ }
nni_mtx_unlock(&ep->mtx);
return (rv);
@@ -880,29 +1036,38 @@ tlstran_ep_bind(void *arg)
static void
tlstran_ep_accept(void *arg, nni_aio *aio)
{
- tlstran_ep * ep = arg;
- tlstran_pipe *p;
- int rv;
+ tlstran_ep *ep = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&ep->mtx);
- if ((rv = tlstran_pipe_alloc(&p, ep)) != 0) {
+ if (ep->closed) {
nni_mtx_unlock(&ep->mtx);
- nni_aio_finish_error(aio, rv);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ nni_listener_bump_error(ep->nlistener, NNG_ECLOSED);
return;
}
-
- if ((rv = nni_aio_schedule(aio, tlstran_pipe_conn_cancel, p)) != 0) {
+ if (ep->useraio != NULL) {
+ nni_mtx_unlock(&ep->mtx);
+ nni_aio_finish_error(aio, NNG_EBUSY);
+ nni_listener_bump_error(ep->nlistener, NNG_EBUSY);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, tlstran_ep_cancel, ep)) != 0) {
nni_mtx_unlock(&ep->mtx);
nni_aio_finish_error(aio, rv);
- tlstran_pipe_reap(p);
+ nni_listener_bump_error(ep->nlistener, rv);
return;
}
-
- p->useraio = aio;
- nng_stream_listener_accept(ep->listener, p->connaio);
+ ep->useraio = aio;
+ if (!ep->started) {
+ ep->started = true;
+ nng_stream_listener_accept(ep->listener, ep->connaio);
+ } else {
+ tlstran_ep_match(ep);
+ }
nni_mtx_unlock(&ep->mtx);
}
@@ -913,8 +1078,20 @@ tlstran_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_type t)
size_t val;
int rv;
if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) {
+ tlstran_pipe *p;
nni_mtx_lock(&ep->mtx);
ep->rcvmax = val;
+ NNI_LIST_FOREACH (&ep->waitpipes, p) {
+ p->rcvmax = val;
+ }
+ NNI_LIST_FOREACH (&ep->negopipes, p) {
+ p->rcvmax = val;
+ }
+ NNI_LIST_FOREACH (&ep->busypipes, p) {
+ p->rcvmax = val;
+ }
+ nni_stat_set_value(&ep->st_rcvmaxsz, val);
+
nni_mtx_unlock(&ep->mtx);
}
return (rv);