diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-02-06 19:12:55 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-02-07 18:53:20 -0800 |
| commit | f79eb9473fbee6207b3e992aa884b8913d7cdc46 (patch) | |
| tree | 5724eee808251a6fdec51b9d7a47550cb5852845 /src/protocol/pair1/pair.c | |
| parent | 28c38d4116ffe8a05123cf98f62f7a63fdd1c920 (diff) | |
| download | nng-f79eb9473fbee6207b3e992aa884b8913d7cdc46.tar.gz nng-f79eb9473fbee6207b3e992aa884b8913d7cdc46.tar.bz2 nng-f79eb9473fbee6207b3e992aa884b8913d7cdc46.zip | |
fixes #1189 Extract and deprecate polyamorous mode
Diffstat (limited to 'src/protocol/pair1/pair.c')
| -rw-r--r-- | src/protocol/pair1/pair.c | 134 |
1 files changed, 32 insertions, 102 deletions
diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c index f564195d..b98975c3 100644 --- a/src/protocol/pair1/pair.c +++ b/src/protocol/pair1/pair.c @@ -13,16 +13,13 @@ #include "core/nng_impl.h" #include "nng/protocol/pair1/pair.h" -// Pair protocol. The PAIRv1 protocol is a simple 1:1 messaging pattern, -// usually, but it can support a polyamorous mode where a single server can -// communicate with multiple partners. +// Pair protocol. The PAIRv1 protocol is a simple 1:1 messaging pattern. #define BUMP_STAT(x) nni_stat_inc_atomic(x, 1) typedef struct pair1_pipe pair1_pipe; typedef struct pair1_sock pair1_sock; -static void pair1_sock_get_cb(void *); static void pair1_pipe_send_cb(void *); static void pair1_pipe_recv_cb(void *); static void pair1_pipe_get_cb(void *); @@ -40,8 +37,6 @@ struct pair1_sock { nni_idhash * pipes; nni_list plist; bool started; - bool poly; - nni_aio aio_get; nni_stat_item stat_poly; nni_stat_item stat_raw; nni_stat_item stat_reject_mismatch; @@ -50,13 +45,15 @@ struct pair1_sock { nni_stat_item stat_rx_malformed; nni_stat_item stat_tx_malformed; nni_stat_item stat_tx_drop; +#ifdef NNG_TEST_LIB + bool inject_header; +#endif }; // pair1_pipe is our per-pipe protocol private structure. struct pair1_pipe { nni_pipe * pipe; pair1_sock * pair; - nni_msgq * send_queue; nni_aio aio_send; nni_aio aio_recv; nni_aio aio_get; @@ -69,7 +66,6 @@ pair1_sock_fini(void *arg) { pair1_sock *s = arg; - nni_aio_fini(&s->aio_get); nni_idhash_fini(s->pipes); nni_mtx_fini(&s->mtx); } @@ -87,11 +83,8 @@ pair1_sock_init_impl(void *arg, nni_sock *sock, bool raw) // Raw mode uses this. nni_mtx_init(&s->mtx); - nni_aio_init(&s->aio_get, pair1_sock_get_cb, s); - nni_stat_init_bool( &s->stat_poly, "polyamorous", "polyamorous mode?", false); - nni_stat_set_lock(&s->stat_poly, &s->mtx); nni_sock_add_stat(sock, &s->stat_poly); nni_stat_init_bool(&s->stat_raw, "raw", "raw mode?", raw); @@ -131,7 +124,6 @@ pair1_sock_init_impl(void *arg, nni_sock *sock, bool raw) s->sock = sock; s->raw = raw; - s->poly = false; s->uwq = nni_sock_sendq(sock); s->urq = nni_sock_recvq(sock); nni_atomic_init(&s->ttl); @@ -172,25 +164,18 @@ pair1_pipe_fini(void *arg) nni_aio_fini(&p->aio_recv); nni_aio_fini(&p->aio_put); nni_aio_fini(&p->aio_get); - nni_msgq_fini(p->send_queue); } static int pair1_pipe_init(void *arg, nni_pipe *pipe, void *pair) { pair1_pipe *p = arg; - int rv; nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p); nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p); nni_aio_init(&p->aio_get, pair1_pipe_get_cb, p); nni_aio_init(&p->aio_put, pair1_pipe_put_cb, p); - if ((rv = nni_msgq_init(&p->send_queue, 2)) != 0) { - pair1_pipe_fini(p); - return (rv); - } - p->pipe = pipe; p->pair = pair; @@ -218,31 +203,19 @@ pair1_pipe_start(void *arg) nni_mtx_unlock(&s->mtx); return (rv); } - if (!s->poly) { - if (!nni_list_empty(&s->plist)) { - nni_idhash_remove(s->pipes, id); - nni_mtx_unlock(&s->mtx); - BUMP_STAT(&s->stat_reject_already); - return (NNG_EBUSY); - } - } else { - if (!s->started) { - nni_msgq_aio_get(s->uwq, &s->aio_get); - } + if (!nni_list_empty(&s->plist)) { + nni_idhash_remove(s->pipes, id); + nni_mtx_unlock(&s->mtx); + BUMP_STAT(&s->stat_reject_already); + return (NNG_EBUSY); } nni_list_append(&s->plist, p); s->started = true; nni_mtx_unlock(&s->mtx); - // Schedule a get. In polyamorous mode we get on the per pipe - // send_queue, as the socket distributes to us. In monogamous mode - // we bypass and get from the upper write queue directly (saving a - // set of context switches). - if (s->poly) { - nni_msgq_aio_get(p->send_queue, &p->aio_get); - } else { - nni_msgq_aio_get(s->uwq, &p->aio_get); - } + // Schedule a get. + nni_msgq_aio_get(s->uwq, &p->aio_get); + // And the pipe read of course. nni_pipe_recv(p->pipe, &p->aio_recv); @@ -264,8 +237,6 @@ pair1_pipe_close(void *arg) nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); nni_list_node_remove(&p->node); nni_mtx_unlock(&s->mtx); - - nni_msgq_close(p->send_queue); } static void @@ -319,47 +290,6 @@ pair1_pipe_recv_cb(void *arg) } static void -pair1_sock_get_cb(void *arg) -{ - pair1_pipe *p; - pair1_sock *s = arg; - nni_msg * msg; - uint32_t id; - - if (nni_aio_result(&s->aio_get) != 0) { - // Socket closing... - return; - } - - msg = nni_aio_get_msg(&s->aio_get); - nni_aio_set_msg(&s->aio_get, NULL); - - p = NULL; - nni_mtx_lock(&s->mtx); - // By definition we are in polyamorous mode. - NNI_ASSERT(s->poly); - // If no pipe was requested, we look for any connected peer. - if (((id = nni_msg_get_pipe(msg)) == 0) && - (!nni_list_empty(&s->plist))) { - p = nni_list_first(&s->plist); - } else { - nni_idhash_find(s->pipes, id, (void **) &p); - } - - // Try a non-blocking send. If this fails we just discard the - // message. We have to do this to avoid head-of-line blocking - // for messages sent to other pipes. Note that there is some - // buffering in the send_queue. - if ((p == NULL) || nni_msgq_tryput(p->send_queue, msg) != 0) { - BUMP_STAT(&s->stat_tx_drop); - nni_msg_free(msg); - } - - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_get(s->uwq, &s->aio_get); -} - -static void pair1_pipe_put_cb(void *arg) { pair1_pipe *p = arg; @@ -396,10 +326,15 @@ pair1_pipe_get_cb(void *arg) ((hops = nni_msg_header_trim_u32(msg)) > 254)) { BUMP_STAT(&s->stat_tx_malformed); nni_msg_free(msg); - nni_msgq_aio_get( - s->poly ? p->send_queue : s->uwq, &p->aio_get); + nni_msgq_aio_get(s->uwq, &p->aio_get); return; } +#if NNG_TEST_LIB + } else if (s->inject_header) { + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); + return; +#endif } else { // Strip off any previously existing header, such as when // replying to messages. @@ -429,9 +364,7 @@ pair1_pipe_send_cb(void *arg) return; } - // In polyamorous mode, we want to get from the send_queue; in - // monogamous we get from upper write queue. - nni_msgq_aio_get(s->poly ? p->send_queue : s->uwq, &p->aio_get); + nni_msgq_aio_get(s->uwq, &p->aio_get); } static void @@ -443,8 +376,7 @@ pair1_sock_open(void *arg) static void pair1_sock_close(void *arg) { - pair1_sock *s = arg; - nni_aio_close(&s->aio_get); + NNI_ARG_UNUSED(arg); } static int @@ -468,24 +400,19 @@ pair1_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t) return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t)); } + +#ifdef NNG_TEST_LIB static int -pair1_sock_set_poly(void *arg, const void *buf, size_t sz, nni_opt_type t) +pair1_set_test_inject_header(void *arg, const void *buf, size_t sz, nni_type t) { pair1_sock *s = arg; int rv; nni_mtx_lock(&s->mtx); - rv = s->started ? NNG_ESTATE : nni_copyin_bool(&s->poly, buf, sz, t); - nni_stat_set_value(&s->stat_poly, s->poly); + rv = nni_copyin_bool(&s->inject_header, buf, sz, t); nni_mtx_unlock(&s->mtx); return (rv); } - -static int -pair1_sock_get_poly(void *arg, void *buf, size_t *szp, nni_opt_type t) -{ - pair1_sock *s = arg; - return (nni_copyout_bool(s->poly, buf, szp, t)); -} +#endif static void pair1_sock_send(void *arg, nni_aio *aio) @@ -519,11 +446,14 @@ static nni_option pair1_sock_options[] = { .o_get = pair1_sock_get_max_ttl, .o_set = pair1_sock_set_max_ttl, }, +#ifdef NNG_TEST_LIB { - .o_name = NNG_OPT_PAIR1_POLY, - .o_get = pair1_sock_get_poly, - .o_set = pair1_sock_set_poly, + // Test only option to pass header unmolested. This allows + // us to inject bad header contents. + .o_name = "pair1_test_inject_header", + .o_set = pair1_set_test_inject_header, }, +#endif // terminate list { .o_name = NULL, |
