diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-07-02 12:44:42 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-07-02 12:44:42 -0700 |
| commit | a772bcc6ebe198f939889abbda18eded2a326941 (patch) | |
| tree | ab48eefaefdb242682fbe84c89a0557307242946 | |
| parent | 63418ae95eb9d61d2cffa56f7e8fbdb48aaaf703 (diff) | |
| download | nng-a772bcc6ebe198f939889abbda18eded2a326941.tar.gz nng-a772bcc6ebe198f939889abbda18eded2a326941.tar.bz2 nng-a772bcc6ebe198f939889abbda18eded2a326941.zip | |
fixes #564 Race regression caused by#522
fixes #565 Option getting should validate sizes more aggressively
fixes #563 Reconnect timeouts should be settable on dialers
fixes #562 pipe test is fragile
| -rw-r--r-- | src/core/dialer.c | 66 | ||||
| -rw-r--r-- | src/core/listener.c | 13 | ||||
| -rw-r--r-- | src/core/socket.c | 73 | ||||
| -rw-r--r-- | src/core/socket.h | 2 | ||||
| -rw-r--r-- | tests/pipe.c | 100 | ||||
| -rw-r--r-- | tests/sock.c | 2 |
6 files changed, 141 insertions, 115 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c index ee0d2916..09ecdac5 100644 --- a/src/core/dialer.c +++ b/src/core/dialer.c @@ -308,26 +308,24 @@ dialer_connect_cb(void *arg) nni_pipe * p; nni_aio * aio = d->d_con_aio; int rv; + bool synch; if ((rv = nni_aio_result(aio)) == 0) { void *data = nni_aio_get_output(aio, 0); NNI_ASSERT(data != NULL); rv = nni_pipe_create2(&p, d->d_sock, d->d_tran, data); } - if ((rv == 0) && ((rv = nni_sock_pipe_add(d->d_sock, p)) != 0)) { - nni_pipe_stop(p); - } - nni_mtx_lock(&d->d_mtx); - switch (rv) { - case 0: - nni_pipe_set_dialer(p, d); - nni_list_append(&d->d_pipes, p); + synch = d->d_synch; + d->d_synch = false; + if (rv == 0) { if (d->d_closing) { nni_mtx_unlock(&d->d_mtx); nni_pipe_stop(p); return; } + nni_pipe_set_dialer(p, d); + nni_list_append(&d->d_pipes, p); // Good connect, so reset the backoff timer. // Note that a host that accepts the connect, but drops @@ -337,7 +335,16 @@ dialer_connect_cb(void *arg) // trying to connect to some port that does not speak // SP for example. d->d_currtime = d->d_inirtime; + } + nni_mtx_unlock(&d->d_mtx); + + if ((rv == 0) && ((rv = nni_sock_pipe_add(d->d_sock, p)) != 0)) { + nni_pipe_stop(p); + } + nni_mtx_lock(&d->d_mtx); + switch (rv) { + case 0: // No further outgoing connects -- we will restart a // connection from the pipe when the pipe is removed. break; @@ -347,17 +354,13 @@ dialer_connect_cb(void *arg) break; default: // redial, but only if we are not synchronous - if (!d->d_synch) { + if (!synch) { dialer_timer_start(d); } break; } - if (d->d_synch) { - if (rv != 0) { - d->d_started = false; - } + if (synch) { d->d_lastrv = rv; - d->d_synch = false; nni_cv_wake(&d->d_cv); } nni_mtx_unlock(&d->d_mtx); @@ -381,7 +384,8 @@ nni_dialer_start(nni_dialer *d, int flags) { int rv = 0; - nni_sock_reconntimes(d->d_sock, &d->d_inirtime, &d->d_maxrtime); + // nni_sock_reconntimes(d->d_sock, &d->d_inirtime, + //&d->d_maxrtime); d->d_currtime = d->d_inirtime; nni_mtx_lock(&d->d_mtx); @@ -413,6 +417,9 @@ nni_dialer_start(nni_dialer *d, int flags) rv = d->d_closing ? NNG_ECLOSED : d->d_lastrv; nni_cv_wake(&d->d_cv); + if (rv != 0) { + d->d_started = false; + } nni_mtx_unlock(&d->d_mtx); return (rv); } @@ -455,6 +462,20 @@ nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz, if (strcmp(name, NNG_OPT_URL) == 0) { return (NNG_EREADONLY); } + if (strcmp(name, NNG_OPT_RECONNMAXT) == 0) { + int rv; + nni_mtx_lock(&d->d_mtx); + rv = nni_copyin_ms(&d->d_maxrtime, val, sz, t); + nni_mtx_unlock(&d->d_mtx); + return (rv); + } + if (strcmp(name, NNG_OPT_RECONNMINT) == 0) { + int rv; + nni_mtx_lock(&d->d_mtx); + rv = nni_copyin_ms(&d->d_inirtime, val, sz, t); + nni_mtx_unlock(&d->d_mtx); + return (rv); + } for (o = d->d_ops.d_options; o && o->o_name; o++) { int rv; @@ -481,6 +502,21 @@ nni_dialer_getopt( { nni_tran_option *o; + if (strcmp(name, NNG_OPT_RECONNMAXT) == 0) { + int rv; + nni_mtx_lock(&d->d_mtx); + rv = nni_copyout_ms(d->d_maxrtime, valp, szp, t); + nni_mtx_unlock(&d->d_mtx); + return (rv); + } + if (strcmp(name, NNG_OPT_RECONNMINT) == 0) { + int rv; + nni_mtx_lock(&d->d_mtx); + rv = nni_copyout_ms(d->d_inirtime, valp, szp, t); + nni_mtx_unlock(&d->d_mtx); + return (rv); + } + for (o = d->d_ops.d_options; o && o->o_name; o++) { int rv; if (strcmp(o->o_name, name) != 0) { diff --git a/src/core/listener.c b/src/core/listener.c index 6d580cd8..31c154bc 100644 --- a/src/core/listener.c +++ b/src/core/listener.c @@ -282,21 +282,16 @@ listener_accept_cb(void *arg) NNI_ASSERT(data != NULL); rv = nni_pipe_create2(&p, l->l_sock, l->l_tran, data); } - - if ((rv == 0) && ((rv = nni_sock_pipe_add(l->l_sock, p)) != 0)) { - nni_pipe_stop(p); - } - nni_mtx_lock(&l->l_mtx); switch (rv) { case 0: - nni_pipe_set_listener(p, l); - nni_list_append(&l->l_pipes, p); if (l->l_closing) { nni_mtx_unlock(&l->l_mtx); nni_pipe_stop(p); return; } + nni_pipe_set_listener(p, l); + nni_list_append(&l->l_pipes, p); listener_accept_start(l); break; case NNG_ECONNABORTED: // remote condition, no cooldown @@ -316,6 +311,10 @@ listener_accept_cb(void *arg) break; } nni_mtx_unlock(&l->l_mtx); + + if ((rv == 0) && ((rv = nni_sock_pipe_add(l->l_sock, p)) != 0)) { + nni_pipe_stop(p); + } } static void diff --git a/src/core/socket.c b/src/core/socket.c index 894e4fee..4d7bbbc1 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -182,30 +182,6 @@ sock_get_sendtimeo(nni_sock *s, void *buf, size_t *szp, nni_opt_type t) } static int -sock_set_reconnmint(nni_sock *s, const void *buf, size_t sz, nni_opt_type t) -{ - return (nni_copyin_ms(&s->s_reconn, buf, sz, t)); -} - -static int -sock_get_reconnmint(nni_sock *s, void *buf, size_t *szp, nni_opt_type t) -{ - return (nni_copyout_ms(s->s_reconn, buf, szp, t)); -} - -static int -sock_set_reconnmaxt(nni_sock *s, const void *buf, size_t sz, nni_opt_type t) -{ - return (nni_copyin_ms(&s->s_reconnmax, buf, sz, t)); -} - -static int -sock_get_reconnmaxt(nni_sock *s, void *buf, size_t *szp, nni_opt_type t) -{ - return (nni_copyout_ms(s->s_reconnmax, buf, szp, t)); -} - -static int sock_set_recvbuf(nni_sock *s, const void *buf, size_t sz, nni_opt_type t) { int len; @@ -317,18 +293,6 @@ static const sock_option sock_options[] = { .o_set = sock_set_sendbuf, }, { - .o_name = NNG_OPT_RECONNMINT, - .o_type = NNI_TYPE_DURATION, - .o_get = sock_get_reconnmint, - .o_set = sock_set_reconnmint, - }, - { - .o_name = NNG_OPT_RECONNMAXT, - .o_type = NNI_TYPE_DURATION, - .o_get = sock_get_reconnmaxt, - .o_set = sock_set_reconnmaxt, - }, - { .o_name = NNG_OPT_SOCKNAME, .o_type = NNI_TYPE_STRING, .o_get = sock_get_sockname, @@ -914,16 +878,6 @@ nni_sock_proto_data(nni_sock *sock) return (sock->s_data); } -void -nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax) -{ - // These two values are linked, so get them atomically. - nni_mtx_lock(&sock->s_mx); - *rcur = sock->s_reconn; - *rmax = sock->s_reconnmax ? sock->s_reconnmax : sock->s_reconn; - nni_mtx_unlock(&sock->s_mx); -} - int nni_sock_add_listener(nni_sock *s, nni_listener *l) { @@ -1036,8 +990,7 @@ nni_sock_setopt( return (rv); } - // Some options do not go down to transports. Handle them - // directly. + // Some options do not go down to transports. Handle them directly. for (sso = sock_options; sso->o_name != NULL; sso++) { if (strcmp(sso->o_name, name) != 0) { continue; @@ -1058,10 +1011,16 @@ nni_sock_setopt( return (rv); } - // Validation of transport options. This is stateless, so + // Validation of generic and transport options. This is stateless, so // transports should not fail to set an option later if they // passed it here. - if ((rv = nni_tran_chkopt(name, v, sz, t)) != 0) { + if ((strcmp(name, NNG_OPT_RECONNMINT) == 0) || + (strcmp(name, NNG_OPT_RECONNMAXT) == 0)) { + nng_duration ms; + if ((rv = nni_copyin_ms(&ms, v, sz, t)) != 0) { + return (rv); + } + } else if ((rv = nni_tran_chkopt(name, v, sz, t)) != 0) { return (rv); } @@ -1194,10 +1153,18 @@ nni_sock_getopt( size_t sz = sopt->sz; if ((sopt->typ != NNI_TYPE_OPAQUE) && - (t != NNI_TYPE_OPAQUE) && (t != sopt->typ)) { - nni_mtx_unlock(&s->s_mx); - return (NNG_EBADTYPE); + (t != sopt->typ)) { + + if (t != NNI_TYPE_OPAQUE) { + nni_mtx_unlock(&s->s_mx); + return (NNG_EBADTYPE); + } + if (*szp != sopt->sz) { + nni_mtx_unlock(&s->s_mx); + return (NNG_EINVAL); + } } + if (sopt->sz > *szp) { sz = *szp; } diff --git a/src/core/socket.h b/src/core/socket.h index 184cfb64..37256571 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -64,8 +64,6 @@ extern nni_msgq *nni_sock_sendq(nni_sock *); // inject incoming messages from pipes to it. extern nni_msgq *nni_sock_recvq(nni_sock *); -extern void nni_sock_reconntimes(nni_sock *, nni_duration *, nni_duration *); - // nni_sock_flags returns the socket flags, used to indicate whether read // and or write are appropriate for the protocol. extern uint32_t nni_sock_flags(nni_sock *); diff --git a/tests/pipe.c b/tests/pipe.c index 79006fc8..dfebbd56 100644 --- a/tests/pipe.c +++ b/tests/pipe.c @@ -15,6 +15,7 @@ #include "supplemental/util/platform.h" #include "stubs.h" +#include <stdbool.h> #include <string.h> #define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) @@ -33,16 +34,27 @@ struct testcase { int err; int rej; nng_mtx * lk; + nng_cv * cv; }; -static int -getval(struct testcase *t, int *vp) +static bool +expect(struct testcase *t, int *vp, int v) { - int rv; + bool ok; + nng_time when = nng_clock() + 5000; // five seconds + nng_mtx_lock(t->lk); - rv = *vp; + while (*vp != v) { + if (nng_cv_until(t->cv, when) == NNG_ETIMEDOUT) { + break; + } + } + ok = (*vp == v) ? true : false; + if (!ok) { + printf("Expected %d but got %d\n", v, *vp); + } nng_mtx_unlock(t->lk); - return (rv); + return (ok); } void @@ -55,6 +67,7 @@ notify(nng_pipe p, int act, void *arg) (nng_listener_id(nng_pipe_listener(p)) != nng_listener_id(t->l)) || (nng_dialer_id(nng_pipe_dialer(p)) != nng_dialer_id(t->d))) { t->err++; + nng_cv_wake(t->cv); nng_mtx_unlock(t->lk); return; } @@ -73,9 +86,12 @@ notify(nng_pipe p, int act, void *arg) break; default: t->err++; + nng_cv_wake(t->cv); + nng_mtx_unlock(t->lk); return; } t->p = p; + nng_cv_wake(t->cv); nng_mtx_unlock(t->lk); } @@ -107,17 +123,26 @@ TestMain("Pipe notify works", { memset(&pull, 0, sizeof(pull)); memset(&push, 0, sizeof(push)); So(nng_mtx_alloc(&push.lk) == 0); + So(nng_cv_alloc(&push.cv, push.lk) == 0); So(nng_mtx_alloc(&pull.lk) == 0); + So(nng_cv_alloc(&pull.cv, pull.lk) == 0); So(nng_push_open(&push.s) == 0); So(nng_pull_open(&pull.s) == 0); Reset({ nng_close(push.s); nng_close(pull.s); + nng_cv_free(push.cv); + nng_cv_free(pull.cv); nng_mtx_free(push.lk); nng_mtx_free(pull.lk); }); + So(nng_setopt_ms(push.s, NNG_OPT_RECONNMINT, 10) == 0); + So(nng_setopt_ms(push.s, NNG_OPT_RECONNMAXT, 10) == 0); + So(nng_setopt_ms(pull.s, NNG_OPT_RECONNMINT, 10) == 0); + So(nng_setopt_ms(pull.s, NNG_OPT_RECONNMAXT, 10) == 0); + So(nng_pipe_notify( push.s, NNG_PIPE_EV_ADD_PRE, notify, &push) == 0); So(nng_pipe_notify( @@ -131,25 +156,27 @@ TestMain("Pipe notify works", { So(nng_pipe_notify( pull.s, NNG_PIPE_EV_REM_POST, notify, &pull) == 0); - So(nng_setopt_ms(push.s, NNG_OPT_RECONNMINT, 10) == 0); - So(nng_setopt_ms(push.s, NNG_OPT_RECONNMAXT, 10) == 0); - Convey("Dialing works", { So(nng_listener_create(&pull.l, pull.s, addr) == 0); So(nng_dialer_create(&push.d, push.s, addr) == 0); So(nng_listener_id(pull.l) > 0); So(nng_dialer_id(push.d) > 0); + So(nng_dialer_setopt_ms( + push.d, NNG_OPT_RECONNMINT, 10) == 0); + So(nng_dialer_setopt_ms( + push.d, NNG_OPT_RECONNMAXT, 10) == 0); So(nng_listener_start(pull.l, 0) == 0); So(nng_dialer_start(push.d, 0) == 0); - nng_msleep(100); - So(getval(&pull, &pull.add_pre) == 1); - So(getval(&pull, &pull.add_post) == 1); - So(getval(&pull, &pull.rem) == 0); - So(getval(&pull, &pull.err) == 0); - So(getval(&push, &push.add_pre) == 1); - So(getval(&push, &push.add_post) == 1); - So(getval(&push, &push.rem) == 0); - So(getval(&push, &push.err) == 0); + So(expect(&pull, &pull.add_pre, 1)); + So(expect(&pull, &pull.add_post, 1)); + So(expect(&pull, &pull.add_pre, 1)); + So(expect(&pull, &pull.add_post, 1)); + So(expect(&pull, &pull.rem, 0)); + So(expect(&pull, &pull.err, 0)); + So(expect(&push, &push.add_pre, 1)); + So(expect(&push, &push.add_post, 1)); + So(expect(&push, &push.rem, 0)); + So(expect(&push, &push.err, 0)); Convey("We can send a frame", { nng_msg *msg; @@ -166,20 +193,19 @@ TestMain("Pipe notify works", { }); Convey("Reconnection works", { - So(getval(&pull, &pull.add_pre) == 1); - So(getval(&pull, &pull.add_post) == 1); + So(expect(&pull, &pull.add_pre, 1)); + So(expect(&pull, &pull.add_post, 1)); nng_pipe_close(pull.p); - nng_msleep(200); - So(getval(&pull, &pull.err) == 0); - So(getval(&pull, &pull.rem) == 1); - So(getval(&pull, &pull.add_pre) == 2); - So(getval(&pull, &pull.add_post) == 2); + So(expect(&pull, &pull.rem, 1)); + So(expect(&pull, &pull.err, 0)); + So(expect(&pull, &pull.add_pre, 2)); + So(expect(&pull, &pull.add_post, 2)); - So(getval(&push, &push.err) == 0); - So(getval(&push, &push.rem) == 1); - So(getval(&push, &push.add_pre) == 2); - So(getval(&push, &push.add_post) == 2); + So(expect(&push, &push.rem, 1)); + So(expect(&push, &push.err, 0)); + So(expect(&push, &push.add_pre, 2)); + So(expect(&push, &push.add_post, 2)); Convey("They still exchange frames", { nng_msg *msg; @@ -209,16 +235,16 @@ TestMain("Pipe notify works", { So(nng_listener_id(pull.l) > 0); So(nng_dialer_id(push.d) > 0); So(nng_listener_start(pull.l, 0) == 0); - So(nng_dialer_start(push.d, 0) == 0); nng_msleep(100); - So(getval(&pull, &pull.add_pre) == 2); - So(getval(&pull, &pull.add_post) == 1); - So(getval(&pull, &pull.rem) == 1); - So(getval(&pull, &pull.err) == 0); - So(getval(&push, &push.add_pre) == 2); - So(getval(&push, &push.add_post) == 2); - So(getval(&push, &push.rem) == 1); - So(getval(&push, &push.err) == 0); + So(nng_dialer_start(push.d, 0) == 0); + So(expect(&pull, &pull.add_pre, 2)); + So(expect(&pull, &pull.add_post, 1)); + So(expect(&pull, &pull.rem, 1)); + So(expect(&pull, &pull.err, 0)); + So(expect(&push, &push.add_pre, 2)); + So(expect(&push, &push.add_post, 2)); + So(expect(&push, &push.rem, 1) == 1); + So(expect(&push, &push.err, 0)); }); }); }) diff --git a/tests/sock.c b/tests/sock.c index 3792090b..002ae944 100644 --- a/tests/sock.c +++ b/tests/sock.c @@ -387,7 +387,7 @@ TestMain("Socket Operations", { // Not appropriate for dialer. So(nng_dialer_setopt_bool( ep, NNG_OPT_RAW, true) == NNG_ENOTSUP); - So(nng_dialer_setopt_ms(ep, NNG_OPT_RECONNMINT, + So(nng_dialer_setopt_ms(ep, NNG_OPT_SENDTIMEO, 1) == NNG_ENOTSUP); So(nng_dialer_setopt_string(ep, NNG_OPT_SOCKNAME, |
