aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/dialer.c66
-rw-r--r--src/core/listener.c13
-rw-r--r--src/core/socket.c73
-rw-r--r--src/core/socket.h2
4 files changed, 77 insertions, 77 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
index ee0d2916..09ecdac5 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -308,26 +308,24 @@ dialer_connect_cb(void *arg)
nni_pipe * p;
nni_aio * aio = d->d_con_aio;
int rv;
+ bool synch;
if ((rv = nni_aio_result(aio)) == 0) {
void *data = nni_aio_get_output(aio, 0);
NNI_ASSERT(data != NULL);
rv = nni_pipe_create2(&p, d->d_sock, d->d_tran, data);
}
- if ((rv == 0) && ((rv = nni_sock_pipe_add(d->d_sock, p)) != 0)) {
- nni_pipe_stop(p);
- }
-
nni_mtx_lock(&d->d_mtx);
- switch (rv) {
- case 0:
- nni_pipe_set_dialer(p, d);
- nni_list_append(&d->d_pipes, p);
+ synch = d->d_synch;
+ d->d_synch = false;
+ if (rv == 0) {
if (d->d_closing) {
nni_mtx_unlock(&d->d_mtx);
nni_pipe_stop(p);
return;
}
+ nni_pipe_set_dialer(p, d);
+ nni_list_append(&d->d_pipes, p);
// Good connect, so reset the backoff timer.
// Note that a host that accepts the connect, but drops
@@ -337,7 +335,16 @@ dialer_connect_cb(void *arg)
// trying to connect to some port that does not speak
// SP for example.
d->d_currtime = d->d_inirtime;
+ }
+ nni_mtx_unlock(&d->d_mtx);
+
+ if ((rv == 0) && ((rv = nni_sock_pipe_add(d->d_sock, p)) != 0)) {
+ nni_pipe_stop(p);
+ }
+ nni_mtx_lock(&d->d_mtx);
+ switch (rv) {
+ case 0:
// No further outgoing connects -- we will restart a
// connection from the pipe when the pipe is removed.
break;
@@ -347,17 +354,13 @@ dialer_connect_cb(void *arg)
break;
default:
// redial, but only if we are not synchronous
- if (!d->d_synch) {
+ if (!synch) {
dialer_timer_start(d);
}
break;
}
- if (d->d_synch) {
- if (rv != 0) {
- d->d_started = false;
- }
+ if (synch) {
d->d_lastrv = rv;
- d->d_synch = false;
nni_cv_wake(&d->d_cv);
}
nni_mtx_unlock(&d->d_mtx);
@@ -381,7 +384,8 @@ nni_dialer_start(nni_dialer *d, int flags)
{
int rv = 0;
- nni_sock_reconntimes(d->d_sock, &d->d_inirtime, &d->d_maxrtime);
+ // nni_sock_reconntimes(d->d_sock, &d->d_inirtime,
+ //&d->d_maxrtime);
d->d_currtime = d->d_inirtime;
nni_mtx_lock(&d->d_mtx);
@@ -413,6 +417,9 @@ nni_dialer_start(nni_dialer *d, int flags)
rv = d->d_closing ? NNG_ECLOSED : d->d_lastrv;
nni_cv_wake(&d->d_cv);
+ if (rv != 0) {
+ d->d_started = false;
+ }
nni_mtx_unlock(&d->d_mtx);
return (rv);
}
@@ -455,6 +462,20 @@ nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz,
if (strcmp(name, NNG_OPT_URL) == 0) {
return (NNG_EREADONLY);
}
+ if (strcmp(name, NNG_OPT_RECONNMAXT) == 0) {
+ int rv;
+ nni_mtx_lock(&d->d_mtx);
+ rv = nni_copyin_ms(&d->d_maxrtime, val, sz, t);
+ nni_mtx_unlock(&d->d_mtx);
+ return (rv);
+ }
+ if (strcmp(name, NNG_OPT_RECONNMINT) == 0) {
+ int rv;
+ nni_mtx_lock(&d->d_mtx);
+ rv = nni_copyin_ms(&d->d_inirtime, val, sz, t);
+ nni_mtx_unlock(&d->d_mtx);
+ return (rv);
+ }
for (o = d->d_ops.d_options; o && o->o_name; o++) {
int rv;
@@ -481,6 +502,21 @@ nni_dialer_getopt(
{
nni_tran_option *o;
+ if (strcmp(name, NNG_OPT_RECONNMAXT) == 0) {
+ int rv;
+ nni_mtx_lock(&d->d_mtx);
+ rv = nni_copyout_ms(d->d_maxrtime, valp, szp, t);
+ nni_mtx_unlock(&d->d_mtx);
+ return (rv);
+ }
+ if (strcmp(name, NNG_OPT_RECONNMINT) == 0) {
+ int rv;
+ nni_mtx_lock(&d->d_mtx);
+ rv = nni_copyout_ms(d->d_inirtime, valp, szp, t);
+ nni_mtx_unlock(&d->d_mtx);
+ return (rv);
+ }
+
for (o = d->d_ops.d_options; o && o->o_name; o++) {
int rv;
if (strcmp(o->o_name, name) != 0) {
diff --git a/src/core/listener.c b/src/core/listener.c
index 6d580cd8..31c154bc 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -282,21 +282,16 @@ listener_accept_cb(void *arg)
NNI_ASSERT(data != NULL);
rv = nni_pipe_create2(&p, l->l_sock, l->l_tran, data);
}
-
- if ((rv == 0) && ((rv = nni_sock_pipe_add(l->l_sock, p)) != 0)) {
- nni_pipe_stop(p);
- }
-
nni_mtx_lock(&l->l_mtx);
switch (rv) {
case 0:
- nni_pipe_set_listener(p, l);
- nni_list_append(&l->l_pipes, p);
if (l->l_closing) {
nni_mtx_unlock(&l->l_mtx);
nni_pipe_stop(p);
return;
}
+ nni_pipe_set_listener(p, l);
+ nni_list_append(&l->l_pipes, p);
listener_accept_start(l);
break;
case NNG_ECONNABORTED: // remote condition, no cooldown
@@ -316,6 +311,10 @@ listener_accept_cb(void *arg)
break;
}
nni_mtx_unlock(&l->l_mtx);
+
+ if ((rv == 0) && ((rv = nni_sock_pipe_add(l->l_sock, p)) != 0)) {
+ nni_pipe_stop(p);
+ }
}
static void
diff --git a/src/core/socket.c b/src/core/socket.c
index 894e4fee..4d7bbbc1 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -182,30 +182,6 @@ sock_get_sendtimeo(nni_sock *s, void *buf, size_t *szp, nni_opt_type t)
}
static int
-sock_set_reconnmint(nni_sock *s, const void *buf, size_t sz, nni_opt_type t)
-{
- return (nni_copyin_ms(&s->s_reconn, buf, sz, t));
-}
-
-static int
-sock_get_reconnmint(nni_sock *s, void *buf, size_t *szp, nni_opt_type t)
-{
- return (nni_copyout_ms(s->s_reconn, buf, szp, t));
-}
-
-static int
-sock_set_reconnmaxt(nni_sock *s, const void *buf, size_t sz, nni_opt_type t)
-{
- return (nni_copyin_ms(&s->s_reconnmax, buf, sz, t));
-}
-
-static int
-sock_get_reconnmaxt(nni_sock *s, void *buf, size_t *szp, nni_opt_type t)
-{
- return (nni_copyout_ms(s->s_reconnmax, buf, szp, t));
-}
-
-static int
sock_set_recvbuf(nni_sock *s, const void *buf, size_t sz, nni_opt_type t)
{
int len;
@@ -317,18 +293,6 @@ static const sock_option sock_options[] = {
.o_set = sock_set_sendbuf,
},
{
- .o_name = NNG_OPT_RECONNMINT,
- .o_type = NNI_TYPE_DURATION,
- .o_get = sock_get_reconnmint,
- .o_set = sock_set_reconnmint,
- },
- {
- .o_name = NNG_OPT_RECONNMAXT,
- .o_type = NNI_TYPE_DURATION,
- .o_get = sock_get_reconnmaxt,
- .o_set = sock_set_reconnmaxt,
- },
- {
.o_name = NNG_OPT_SOCKNAME,
.o_type = NNI_TYPE_STRING,
.o_get = sock_get_sockname,
@@ -914,16 +878,6 @@ nni_sock_proto_data(nni_sock *sock)
return (sock->s_data);
}
-void
-nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax)
-{
- // These two values are linked, so get them atomically.
- nni_mtx_lock(&sock->s_mx);
- *rcur = sock->s_reconn;
- *rmax = sock->s_reconnmax ? sock->s_reconnmax : sock->s_reconn;
- nni_mtx_unlock(&sock->s_mx);
-}
-
int
nni_sock_add_listener(nni_sock *s, nni_listener *l)
{
@@ -1036,8 +990,7 @@ nni_sock_setopt(
return (rv);
}
- // Some options do not go down to transports. Handle them
- // directly.
+ // Some options do not go down to transports. Handle them directly.
for (sso = sock_options; sso->o_name != NULL; sso++) {
if (strcmp(sso->o_name, name) != 0) {
continue;
@@ -1058,10 +1011,16 @@ nni_sock_setopt(
return (rv);
}
- // Validation of transport options. This is stateless, so
+ // Validation of generic and transport options. This is stateless, so
// transports should not fail to set an option later if they
// passed it here.
- if ((rv = nni_tran_chkopt(name, v, sz, t)) != 0) {
+ if ((strcmp(name, NNG_OPT_RECONNMINT) == 0) ||
+ (strcmp(name, NNG_OPT_RECONNMAXT) == 0)) {
+ nng_duration ms;
+ if ((rv = nni_copyin_ms(&ms, v, sz, t)) != 0) {
+ return (rv);
+ }
+ } else if ((rv = nni_tran_chkopt(name, v, sz, t)) != 0) {
return (rv);
}
@@ -1194,10 +1153,18 @@ nni_sock_getopt(
size_t sz = sopt->sz;
if ((sopt->typ != NNI_TYPE_OPAQUE) &&
- (t != NNI_TYPE_OPAQUE) && (t != sopt->typ)) {
- nni_mtx_unlock(&s->s_mx);
- return (NNG_EBADTYPE);
+ (t != sopt->typ)) {
+
+ if (t != NNI_TYPE_OPAQUE) {
+ nni_mtx_unlock(&s->s_mx);
+ return (NNG_EBADTYPE);
+ }
+ if (*szp != sopt->sz) {
+ nni_mtx_unlock(&s->s_mx);
+ return (NNG_EINVAL);
+ }
}
+
if (sopt->sz > *szp) {
sz = *szp;
}
diff --git a/src/core/socket.h b/src/core/socket.h
index 184cfb64..37256571 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -64,8 +64,6 @@ extern nni_msgq *nni_sock_sendq(nni_sock *);
// inject incoming messages from pipes to it.
extern nni_msgq *nni_sock_recvq(nni_sock *);
-extern void nni_sock_reconntimes(nni_sock *, nni_duration *, nni_duration *);
-
// nni_sock_flags returns the socket flags, used to indicate whether read
// and or write are appropriate for the protocol.
extern uint32_t nni_sock_flags(nni_sock *);