diff options
Diffstat (limited to 'src/sp/protocol/pair1/pair.c')
| -rw-r--r-- | src/sp/protocol/pair1/pair.c | 471 |
1 files changed, 388 insertions, 83 deletions
diff --git a/src/sp/protocol/pair1/pair.c b/src/sp/protocol/pair1/pair.c index ba497c42..60de92e0 100644 --- a/src/sp/protocol/pair1/pair.c +++ b/src/sp/protocol/pair1/pair.c @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -26,19 +26,26 @@ typedef struct pair1_sock pair1_sock; static void pair1_pipe_send_cb(void *); static void pair1_pipe_recv_cb(void *); -static void pair1_pipe_get_cb(void *); -static void pair1_pipe_put_cb(void *); static void pair1_pipe_fini(void *); +static void pair1_send_sched(pair1_sock *); +static void pair1_pipe_send(pair1_pipe *, nni_msg *); // pair1_sock is our per-socket protocol private structure. struct pair1_sock { - nni_msgq * uwq; - nni_msgq * urq; nni_sock * sock; bool raw; pair1_pipe * p; nni_atomic_int ttl; nni_mtx mtx; + nni_lmq wmq; + nni_list waq; + nni_lmq rmq; + nni_list raq; + nni_pollable writable; + nni_pollable readable; + bool rd_ready; // pipe ready for read + bool wr_ready; // pipe ready for write + #ifdef NNG_ENABLE_STATS nni_stat_item stat_poly; nni_stat_item stat_raw; @@ -60,8 +67,6 @@ struct pair1_pipe { pair1_sock *pair; nni_aio aio_send; nni_aio aio_recv; - nni_aio aio_get; - nni_aio aio_put; }; static void @@ -69,6 +74,10 @@ pair1_sock_fini(void *arg) { pair1_sock *s = arg; + nni_lmq_fini(&s->rmq); + nni_lmq_fini(&s->wmq); + nni_pollable_fini(&s->writable); + nni_pollable_fini(&s->readable); nni_mtx_fini(&s->mtx); } @@ -90,6 +99,16 @@ pair1_sock_init_impl(void *arg, nni_sock *sock, bool raw) // Raw mode uses this. nni_mtx_init(&s->mtx); s->sock = sock; + s->raw = raw; + + nni_lmq_init(&s->rmq, 0); + nni_lmq_init(&s->wmq, 0); + nni_aio_list_init(&s->raq); + nni_aio_list_init(&s->waq); + nni_pollable_init(&s->writable); + nni_pollable_init(&s->readable); + nni_atomic_init(&s->ttl); + nni_atomic_set(&s->ttl, 8); #ifdef NNG_ENABLE_STATS static const nni_stat_info poly_info = { @@ -161,12 +180,6 @@ pair1_sock_init_impl(void *arg, nni_sock *sock, bool raw) nni_stat_set_bool(&s->stat_poly, false); #endif - s->raw = raw; - s->uwq = nni_sock_sendq(sock); - s->urq = nni_sock_recvq(sock); - nni_atomic_init(&s->ttl); - nni_atomic_set(&s->ttl, 8); - return (0); } @@ -191,12 +204,22 @@ pair1_pipe_stop(void *arg) nni_mtx_lock(&s->mtx); if (s->p == p) { s->p = NULL; + if (s->rd_ready) { + nni_msg *m = nni_aio_get_msg(&p->aio_recv); + nni_msg_free(m); + s->rd_ready = false; + } + if (s->wr_ready) { + s->wr_ready = false; + nni_pollable_clear(&s->writable); + } + if (nni_lmq_empty(&s->rmq)) { + nni_pollable_clear(&s->readable); + } } nni_mtx_unlock(&s->mtx); nni_aio_stop(&p->aio_send); nni_aio_stop(&p->aio_recv); - nni_aio_stop(&p->aio_put); - nni_aio_stop(&p->aio_get); } static void @@ -206,8 +229,6 @@ pair1_pipe_fini(void *arg) nni_aio_fini(&p->aio_send); nni_aio_fini(&p->aio_recv); - nni_aio_fini(&p->aio_put); - nni_aio_fini(&p->aio_get); } static int @@ -217,8 +238,6 @@ pair1_pipe_init(void *arg, nni_pipe *pipe, void *pair) nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p); nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p); - nni_aio_init(&p->aio_get, pair1_pipe_get_cb, p); - nni_aio_init(&p->aio_put, pair1_pipe_put_cb, p); p->pipe = pipe; p->pair = pair; @@ -226,6 +245,19 @@ pair1_pipe_init(void *arg, nni_pipe *pipe, void *pair) return (0); } +static void +pair1_cancel(nni_aio *aio, void *arg, int rv) +{ + pair1_sock *s = arg; + + nni_mtx_lock(&s->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&s->mtx); +} + static int pair1_pipe_start(void *arg) { @@ -244,11 +276,11 @@ pair1_pipe_start(void *arg) BUMP_STAT(&s->stat_reject_already); return (NNG_EBUSY); } - s->p = p; + s->p = p; + s->rd_ready = false; nni_mtx_unlock(&s->mtx); - // Schedule a get. - nni_msgq_aio_get(s->uwq, &p->aio_get); + pair1_send_sched(s); // And the pipe read of course. nni_pipe_recv(p->pipe, &p->aio_recv); @@ -263,8 +295,6 @@ pair1_pipe_close(void *arg) nni_aio_close(&p->aio_send); nni_aio_close(&p->aio_recv); - nni_aio_close(&p->aio_put); - nni_aio_close(&p->aio_get); } static void @@ -276,6 +306,7 @@ pair1_pipe_recv_cb(void *arg) uint32_t hdr; nni_pipe * pipe = p->pipe; size_t len; + nni_aio * a; if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); @@ -283,13 +314,13 @@ pair1_pipe_recv_cb(void *arg) } msg = nni_aio_get_msg(&p->aio_recv); - nni_aio_set_msg(&p->aio_recv, NULL); // Store the pipe ID. nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + len = nni_msg_len(msg); // If the message is missing the hop count header, scrap it. - if ((nni_msg_len(msg) < sizeof(uint32_t)) || + if ((len < sizeof(uint32_t)) || ((hdr = nni_msg_trim_u32(msg)) > 0xff)) { BUMP_STAT(&s->stat_rx_malformed); nni_msg_free(msg); @@ -297,93 +328,101 @@ pair1_pipe_recv_cb(void *arg) return; } - len = nni_msg_len(msg); - // If we bounced too many times, discard the message, but // keep getting more. if ((int) hdr > nni_atomic_get(&s->ttl)) { BUMP_STAT(&s->stat_ttl_drop); nni_msg_free(msg); + nni_aio_set_msg(&p->aio_recv, NULL); nni_pipe_recv(pipe, &p->aio_recv); return; } + nni_sock_bump_rx(s->sock, len); + // Store the hop count in the header. nni_msg_header_append_u32(msg, hdr); - // Send the message up. - nni_aio_set_msg(&p->aio_put, msg); - nni_sock_bump_rx(s->sock, len); - nni_msgq_aio_put(s->urq, &p->aio_put); -} - -static void -pair1_pipe_put_cb(void *arg) -{ - pair1_pipe *p = arg; + nni_mtx_lock(&s->mtx); - if (nni_aio_result(&p->aio_put) != 0) { - nni_msg_free(nni_aio_get_msg(&p->aio_put)); - nni_aio_set_msg(&p->aio_put, NULL); - nni_pipe_close(p->pipe); + // if anyone is blocking, then the lmq will be empty, and + // we should deliver it there. + if ((a = nni_list_first(&s->raq)) != NULL) { + nni_aio_list_remove(a); + nni_aio_set_msg(a, msg); + nni_pipe_recv(pipe, &p->aio_recv); + nni_mtx_unlock(&s->mtx); + nni_aio_finish_sync(a, 0, len); return; } - nni_pipe_recv(p->pipe, &p->aio_recv); + + // maybe we have room in the rmq? + if (!nni_lmq_full(&s->rmq)) { + nni_lmq_putq(&s->rmq, msg); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_pipe_recv(pipe, &p->aio_recv); + } else { + s->rd_ready = true; + } + nni_pollable_raise(&s->readable); + nni_mtx_unlock(&s->mtx); } static void -pair1_pipe_get_cb(void *arg) +pair1_send_sched(pair1_sock *s) { - pair1_pipe *p = arg; - pair1_sock *s = p->pair; - nni_msg * msg; - uint32_t hops; + pair1_pipe *p; + nni_msg * m; + nni_aio * a = NULL; + size_t l; - if (nni_aio_result(&p->aio_get) != 0) { - nni_pipe_close(p->pipe); + nni_mtx_lock(&s->mtx); + + if ((p = s->p) == NULL) { + nni_mtx_unlock(&s->mtx); return; } - msg = nni_aio_get_msg(&p->aio_get); - nni_aio_set_msg(&p->aio_get, NULL); + s->wr_ready = true; - // Raw mode messages have the header already formed, with a hop count. - // Cooked mode messages have no header so we have to add one. - if (s->raw) { - if ((nni_msg_header_len(msg) != sizeof(uint32_t)) || - ((hops = nni_msg_header_trim_u32(msg)) > 254)) { - BUMP_STAT(&s->stat_tx_malformed); - nni_msg_free(msg); - nni_msgq_aio_get(s->uwq, &p->aio_get); - return; + // if message waiting in buffered queue, then we prefer that. + if (nni_lmq_getq(&s->wmq, &m) == 0) { + pair1_pipe_send(p, m); + + if ((a = nni_list_first(&s->waq)) != NULL) { + nni_aio_list_remove(a); + m = nni_aio_get_msg(a); + l = nni_msg_len(m); + nni_lmq_putq(&s->wmq, m); } -#if NNG_TEST_LIB - } else if (s->inject_header) { - nni_aio_set_msg(&p->aio_send, msg); - nni_pipe_send(p->pipe, &p->aio_send); - return; -#endif - } else { - // Strip off any previously existing header, such as when - // replying to messages. - nni_msg_header_clear(msg); - hops = 0; + + } else if ((a = nni_list_first(&s->waq)) != NULL) { + // Looks like we had the unbuffered case, but + // someone was waiting. + nni_aio_list_remove(a); + + m = nni_aio_get_msg(a); + l = nni_msg_len(m); + pair1_pipe_send(p, m); } - hops++; + // if we were blocked before, but not now, update. + if ((!nni_lmq_full(&s->wmq)) || s->wr_ready) { + nni_pollable_raise(&s->writable); + } - // Insert the hops header. - nni_msg_header_append_u32(msg, hops); + nni_mtx_unlock(&s->mtx); - nni_aio_set_msg(&p->aio_send, msg); - nni_pipe_send(p->pipe, &p->aio_send); + if (a != NULL) { + nni_aio_set_msg(a, NULL); + nni_aio_finish_sync(a, 0, l); + } } static void pair1_pipe_send_cb(void *arg) { pair1_pipe *p = arg; - pair1_sock *s = p->pair; if (nni_aio_result(&p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(&p->aio_send)); @@ -392,7 +431,7 @@ pair1_pipe_send_cb(void *arg) return; } - nni_msgq_aio_get(s->uwq, &p->aio_get); + pair1_send_sched(p->pair); } static void @@ -404,7 +443,20 @@ pair1_sock_open(void *arg) static void pair1_sock_close(void *arg) { - NNI_ARG_UNUSED(arg); + pair1_sock *s = arg; + nni_aio * a; + nni_msg * m; + nni_mtx_lock(&s->mtx); + while (((a = nni_list_first(&s->raq)) != NULL) || + ((a = nni_list_first(&s->waq)) != NULL)) { + nni_aio_list_remove(a); + nni_aio_finish_error(a, NNG_ECLOSED); + } + while ((nni_lmq_getq(&s->rmq, &m) == 0) || + (nni_lmq_getq(&s->wmq, &m) == 0)) { + nni_msg_free(m); + } + nni_mtx_unlock(&s->mtx); } static int @@ -442,20 +494,255 @@ pair1_set_test_inject_header(void *arg, const void *buf, size_t sz, nni_type t) #endif static void +pair1_pipe_send(pair1_pipe *p, nni_msg *m) +{ + pair1_sock *s = p->pair; + // assumption: we have unique access to the message at this point. + NNI_ASSERT(!nni_msg_shared(m)); + +#if NNG_TEST_LIB + if (s->inject_header) { + goto inject; + } +#endif + NNI_ASSERT(nni_msg_header_len(m) == sizeof(uint32_t)); + nni_msg_header_poke_u32(m, nni_msg_header_peek_u32(m) + 1); + +#if NNG_TEST_LIB +inject: +#endif + + nni_aio_set_msg(&p->aio_send, m); + nni_pipe_send(p->pipe, &p->aio_send); + s->wr_ready = false; +} + +static void pair1_sock_send(void *arg, nni_aio *aio) { pair1_sock *s = arg; + nni_msg * m; + size_t len; + int rv; + + m = nni_aio_get_msg(aio); + len = nni_msg_len(m); + nni_sock_bump_tx(s->sock, len); + + if (nni_aio_begin(aio) != 0) { + return; + } + +#if NNG_TEST_LIB + if (s->inject_header) { + goto inject; + } +#endif - nni_sock_bump_tx(s->sock, nni_msg_len(nni_aio_get_msg(aio))); - nni_msgq_aio_put(s->uwq, aio); + // Raw mode messages have the header already formed, with a hop count. + // Cooked mode messages have no header so we have to add one. + if (s->raw) { + if ((nni_msg_header_len(m) != sizeof(uint32_t)) || + (nni_msg_header_peek_u32(m) >= 0xff)) { + BUMP_STAT(&s->stat_tx_malformed); + nni_aio_finish_error(aio, NNG_EPROTO); + return; + } + + } else { + // Strip off any previously existing header, such as when + // replying to messages. + nni_msg_header_clear(m); + nni_msg_header_append_u32(m, 0); + } + +#if NNG_TEST_LIB +inject: +#endif + + nni_mtx_lock(&s->mtx); + if (s->wr_ready) { + pair1_pipe *p = s->p; + if (nni_lmq_full(&s->wmq)) { + nni_pollable_clear(&s->writable); + } + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, len); + pair1_pipe_send(p, m); + nni_mtx_unlock(&s->mtx); + return; + } + + // Can we maybe queue it. + if (nni_lmq_putq(&s->wmq, m) == 0) { + // Yay, we can. So we're done. + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, len); + if (nni_lmq_full(&s->wmq)) { + nni_pollable_clear(&s->writable); + } + nni_mtx_unlock(&s->mtx); + return; + } + + if ((rv = nni_aio_schedule(aio, pair1_cancel, s)) != 0) { + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&s->mtx); + return; + } + nni_aio_list_append(&s->waq, aio); + nni_mtx_unlock(&s->mtx); } static void pair1_sock_recv(void *arg, nni_aio *aio) { pair1_sock *s = arg; + pair1_pipe *p; + nni_msg * m; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&s->mtx); + p = s->p; + + // Buffered read. If there is a message waiting for us, pick + // it up. We might need to post another read request as well. + if (nni_lmq_getq(&s->rmq, &m) == 0) { + nni_aio_set_msg(aio, m); + nni_aio_finish(aio, 0, nni_msg_len(m)); + if (s->rd_ready) { + s->rd_ready = false; + m = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_lmq_putq(&s->rmq, m); + nni_pipe_recv(p->pipe, &p->aio_recv); + } + if (nni_lmq_empty(&s->rmq)) { + nni_pollable_clear(&s->readable); + } + nni_mtx_unlock(&s->mtx); + return; + } + + // Unbuffered -- but waiting. + if (s->rd_ready) { + s->rd_ready = false; + m = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_aio_set_msg(aio, m); + nni_aio_finish(aio, 0, nni_msg_len(m)); + nni_pipe_recv(p->pipe, &p->aio_recv); + nni_pollable_clear(&s->readable); + nni_mtx_unlock(&s->mtx); + return; + } + + if ((rv = nni_aio_schedule(aio, pair1_cancel, s)) != 0) { + nni_aio_finish_error(aio, rv); + } else { + nni_aio_list_append(&s->raq, aio); + } + nni_mtx_unlock(&s->mtx); +} + +static int +pair1_set_send_buf_len(void *arg, const void *buf, size_t sz, nni_type t) +{ + pair1_sock *s = arg; + int val; + int rv; + + if ((rv = nni_copyin_int(&val, buf, sz, 0, 8192, t)) != 0) { + return (rv); + } + nni_mtx_lock(&s->mtx); + rv = nni_lmq_resize(&s->wmq, (size_t) val); + // Changing the size of the queue can affect our readiness. + if (!nni_lmq_full(&s->wmq)) { + nni_pollable_raise(&s->writable); + } else if (!s->wr_ready) { + nni_pollable_clear(&s->writable); + } + nni_mtx_unlock(&s->mtx); + return (rv); +} + +static int +pair1_get_send_buf_len(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + pair1_sock *s = arg; + int val; + + nni_mtx_lock(&s->mtx); + val = nni_lmq_cap(&s->wmq); + nni_mtx_unlock(&s->mtx); + + return (nni_copyout_int(val, buf, szp, t)); +} + +static int +pair1_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) +{ + pair1_sock *s = arg; + int val; + int rv; + + if ((rv = nni_copyin_int(&val, buf, sz, 0, 8192, t)) != 0) { + return (rv); + } + nni_mtx_lock(&s->mtx); + rv = nni_lmq_resize(&s->rmq, (size_t) val); + // Changing the size of the queue can affect our readiness. + if (!nni_lmq_empty(&s->rmq)) { + nni_pollable_raise(&s->readable); + } else if (!s->rd_ready) { + nni_pollable_clear(&s->readable); + } + nni_mtx_unlock(&s->mtx); + return (rv); +} + +static int +pair1_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + pair1_sock *s = arg; + int val; + + nni_mtx_lock(&s->mtx); + val = nni_lmq_cap(&s->rmq); + nni_mtx_unlock(&s->mtx); + + return (nni_copyout_int(val, buf, szp, t)); +} + +static int +pair1_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + pair1_sock *s = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); +} + +static int +pair1_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + pair1_sock *s = arg; + int rv; + int fd; - nni_msgq_aio_get(s->urq, aio); + if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); } static nni_proto_pipe_ops pair1_pipe_ops = { @@ -473,6 +760,24 @@ static nni_option pair1_sock_options[] = { .o_get = pair1_sock_get_max_ttl, .o_set = pair1_sock_set_max_ttl, }, + { + .o_name = NNG_OPT_RECVFD, + .o_get = pair1_sock_get_recv_fd, + }, + { + .o_name = NNG_OPT_SENDFD, + .o_get = pair1_sock_get_send_fd, + }, + { + .o_name = NNG_OPT_SENDBUF, + .o_get = pair1_get_send_buf_len, + .o_set = pair1_set_send_buf_len, + }, + { + .o_name = NNG_OPT_RECVBUF, + .o_get = pair1_get_recv_buf_len, + .o_set = pair1_set_recv_buf_len, + }, #ifdef NNG_TEST_LIB { // Test only option to pass header unmolested. This allows |
