diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-05-17 12:54:01 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-05-17 19:29:37 -0700 |
| commit | 70d478f5d185e147ca8d3dcba4cbd8bb6da3719a (patch) | |
| tree | 443e3b0e81138d7c195660d45eca7d4d497af8ac /tests/pipe.c | |
| parent | e490aa3353f05e158a0f1f485f371cd49e70b4f5 (diff) | |
| download | nng-70d478f5d185e147ca8d3dcba4cbd8bb6da3719a.tar.gz nng-70d478f5d185e147ca8d3dcba4cbd8bb6da3719a.tar.bz2 nng-70d478f5d185e147ca8d3dcba4cbd8bb6da3719a.zip | |
fixes #449 Want more flexible pipe events
This changes the signature of nng_pipe_notify(), and the associated
events. The documentation is updated to reflect this.
We have also broken the lock up so that we don't hold the master
socket lock for some of these things, which may have beneficial
impact on performance.
Diffstat (limited to 'tests/pipe.c')
| -rw-r--r-- | tests/pipe.c | 115 |
1 files changed, 95 insertions, 20 deletions
diff --git a/tests/pipe.c b/tests/pipe.c index 3457db9b..79006fc8 100644 --- a/tests/pipe.c +++ b/tests/pipe.c @@ -27,27 +27,48 @@ struct testcase { nng_dialer d; nng_listener l; nng_pipe p; - int add; + int add_pre; + int add_post; int rem; int err; + int rej; + nng_mtx * lk; }; +static int +getval(struct testcase *t, int *vp) +{ + int rv; + nng_mtx_lock(t->lk); + rv = *vp; + nng_mtx_unlock(t->lk); + return (rv); +} + void -notify(nng_pipe p, nng_pipe_action act, void *arg) +notify(nng_pipe p, int act, void *arg) { struct testcase *t = arg; + nng_mtx_lock(t->lk); if ((nng_socket_id(nng_pipe_socket(p)) != nng_socket_id(t->s)) || (nng_listener_id(nng_pipe_listener(p)) != nng_listener_id(t->l)) || (nng_dialer_id(nng_pipe_dialer(p)) != nng_dialer_id(t->d))) { t->err++; + nng_mtx_unlock(t->lk); return; } + if (t->add_post > t->add_pre) { + t->err++; + } switch (act) { - case NNG_PIPE_ADD: - t->add++; + case NNG_PIPE_EV_ADD_PRE: + t->add_pre++; + break; + case NNG_PIPE_EV_ADD_POST: + t->add_post++; break; - case NNG_PIPE_REM: + case NNG_PIPE_EV_REM_POST: t->rem++; break; default: @@ -55,6 +76,21 @@ notify(nng_pipe p, nng_pipe_action act, void *arg) return; } t->p = p; + nng_mtx_unlock(t->lk); +} + +void +reject(nng_pipe p, int act, void *arg) +{ + struct testcase *t = arg; + notify(p, act, arg); + + nng_mtx_lock(t->lk); + if (!t->rej) { + nng_pipe_close(p); + t->rej++; + } + nng_mtx_unlock(t->lk); } char addr[64]; @@ -70,16 +106,30 @@ TestMain("Pipe notify works", { memset(&pull, 0, sizeof(pull)); memset(&push, 0, sizeof(push)); + So(nng_mtx_alloc(&push.lk) == 0); + So(nng_mtx_alloc(&pull.lk) == 0); So(nng_push_open(&push.s) == 0); So(nng_pull_open(&pull.s) == 0); Reset({ nng_close(push.s); nng_close(pull.s); + nng_mtx_free(push.lk); + nng_mtx_free(pull.lk); }); - So(nng_pipe_notify(push.s, notify, &push) == 0); - So(nng_pipe_notify(pull.s, notify, &pull) == 0); + So(nng_pipe_notify( + push.s, NNG_PIPE_EV_ADD_PRE, notify, &push) == 0); + So(nng_pipe_notify( + push.s, NNG_PIPE_EV_ADD_POST, notify, &push) == 0); + So(nng_pipe_notify( + push.s, NNG_PIPE_EV_REM_POST, notify, &push) == 0); + So(nng_pipe_notify( + pull.s, NNG_PIPE_EV_ADD_PRE, notify, &pull) == 0); + So(nng_pipe_notify( + pull.s, NNG_PIPE_EV_ADD_POST, notify, &pull) == 0); + So(nng_pipe_notify( + pull.s, NNG_PIPE_EV_REM_POST, notify, &pull) == 0); So(nng_setopt_ms(push.s, NNG_OPT_RECONNMINT, 10) == 0); So(nng_setopt_ms(push.s, NNG_OPT_RECONNMAXT, 10) == 0); @@ -92,12 +142,14 @@ TestMain("Pipe notify works", { So(nng_listener_start(pull.l, 0) == 0); So(nng_dialer_start(push.d, 0) == 0); nng_msleep(100); - So(pull.add == 1); - So(pull.rem == 0); - So(pull.err == 0); - So(push.add == 1); - So(push.rem == 0); - So(push.err == 0); + So(getval(&pull, &pull.add_pre) == 1); + So(getval(&pull, &pull.add_post) == 1); + So(getval(&pull, &pull.rem) == 0); + So(getval(&pull, &pull.err) == 0); + So(getval(&push, &push.add_pre) == 1); + So(getval(&push, &push.add_post) == 1); + So(getval(&push, &push.rem) == 0); + So(getval(&push, &push.err) == 0); Convey("We can send a frame", { nng_msg *msg; @@ -114,17 +166,20 @@ TestMain("Pipe notify works", { }); Convey("Reconnection works", { - So(pull.add == 1); + So(getval(&pull, &pull.add_pre) == 1); + So(getval(&pull, &pull.add_post) == 1); nng_pipe_close(pull.p); nng_msleep(200); - So(pull.err == 0); - So(pull.rem == 1); - So(pull.add == 2); + So(getval(&pull, &pull.err) == 0); + So(getval(&pull, &pull.rem) == 1); + So(getval(&pull, &pull.add_pre) == 2); + So(getval(&pull, &pull.add_post) == 2); - So(push.err == 0); - So(push.rem == 1); - So(push.add == 2); + So(getval(&push, &push.err) == 0); + So(getval(&push, &push.rem) == 1); + So(getval(&push, &push.add_pre) == 2); + So(getval(&push, &push.add_post) == 2); Convey("They still exchange frames", { nng_msg *msg; @@ -145,5 +200,25 @@ TestMain("Pipe notify works", { }); }); }); + + Convey("Reject works", { + So(nng_pipe_notify(pull.s, NNG_PIPE_EV_ADD_PRE, reject, + &pull) == 0); + So(nng_listener_create(&pull.l, pull.s, addr) == 0); + So(nng_dialer_create(&push.d, push.s, addr) == 0); + So(nng_listener_id(pull.l) > 0); + So(nng_dialer_id(push.d) > 0); + So(nng_listener_start(pull.l, 0) == 0); + So(nng_dialer_start(push.d, 0) == 0); + nng_msleep(100); + So(getval(&pull, &pull.add_pre) == 2); + So(getval(&pull, &pull.add_post) == 1); + So(getval(&pull, &pull.rem) == 1); + So(getval(&pull, &pull.err) == 0); + So(getval(&push, &push.add_pre) == 2); + So(getval(&push, &push.add_post) == 2); + So(getval(&push, &push.rem) == 1); + So(getval(&push, &push.err) == 0); + }); }); }) |
