diff options
Diffstat (limited to 'src/sp/protocol')
| -rw-r--r-- | src/sp/protocol/pair1/pair.c | 471 | ||||
| -rw-r--r-- | src/sp/protocol/pair1/pair1_test.c | 210 |
2 files changed, 590 insertions, 91 deletions
diff --git a/src/sp/protocol/pair1/pair.c b/src/sp/protocol/pair1/pair.c index ba497c42..60de92e0 100644 --- a/src/sp/protocol/pair1/pair.c +++ b/src/sp/protocol/pair1/pair.c @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -26,19 +26,26 @@ typedef struct pair1_sock pair1_sock; static void pair1_pipe_send_cb(void *); static void pair1_pipe_recv_cb(void *); -static void pair1_pipe_get_cb(void *); -static void pair1_pipe_put_cb(void *); static void pair1_pipe_fini(void *); +static void pair1_send_sched(pair1_sock *); +static void pair1_pipe_send(pair1_pipe *, nni_msg *); // pair1_sock is our per-socket protocol private structure. struct pair1_sock { - nni_msgq * uwq; - nni_msgq * urq; nni_sock * sock; bool raw; pair1_pipe * p; nni_atomic_int ttl; nni_mtx mtx; + nni_lmq wmq; + nni_list waq; + nni_lmq rmq; + nni_list raq; + nni_pollable writable; + nni_pollable readable; + bool rd_ready; // pipe ready for read + bool wr_ready; // pipe ready for write + #ifdef NNG_ENABLE_STATS nni_stat_item stat_poly; nni_stat_item stat_raw; @@ -60,8 +67,6 @@ struct pair1_pipe { pair1_sock *pair; nni_aio aio_send; nni_aio aio_recv; - nni_aio aio_get; - nni_aio aio_put; }; static void @@ -69,6 +74,10 @@ pair1_sock_fini(void *arg) { pair1_sock *s = arg; + nni_lmq_fini(&s->rmq); + nni_lmq_fini(&s->wmq); + nni_pollable_fini(&s->writable); + nni_pollable_fini(&s->readable); nni_mtx_fini(&s->mtx); } @@ -90,6 +99,16 @@ pair1_sock_init_impl(void *arg, nni_sock *sock, bool raw) // Raw mode uses this. nni_mtx_init(&s->mtx); s->sock = sock; + s->raw = raw; + + nni_lmq_init(&s->rmq, 0); + nni_lmq_init(&s->wmq, 0); + nni_aio_list_init(&s->raq); + nni_aio_list_init(&s->waq); + nni_pollable_init(&s->writable); + nni_pollable_init(&s->readable); + nni_atomic_init(&s->ttl); + nni_atomic_set(&s->ttl, 8); #ifdef NNG_ENABLE_STATS static const nni_stat_info poly_info = { @@ -161,12 +180,6 @@ pair1_sock_init_impl(void *arg, nni_sock *sock, bool raw) nni_stat_set_bool(&s->stat_poly, false); #endif - s->raw = raw; - s->uwq = nni_sock_sendq(sock); - s->urq = nni_sock_recvq(sock); - nni_atomic_init(&s->ttl); - nni_atomic_set(&s->ttl, 8); - return (0); } @@ -191,12 +204,22 @@ pair1_pipe_stop(void *arg) nni_mtx_lock(&s->mtx); if (s->p == p) { s->p = NULL; + if (s->rd_ready) { + nni_msg *m = nni_aio_get_msg(&p->aio_recv); + nni_msg_free(m); + s->rd_ready = false; + } + if (s->wr_ready) { + s->wr_ready = false; + nni_pollable_clear(&s->writable); + } + if (nni_lmq_empty(&s->rmq)) { + nni_pollable_clear(&s->readable); + } } nni_mtx_unlock(&s->mtx); nni_aio_stop(&p->aio_send); nni_aio_stop(&p->aio_recv); - nni_aio_stop(&p->aio_put); - nni_aio_stop(&p->aio_get); } static void @@ -206,8 +229,6 @@ pair1_pipe_fini(void *arg) nni_aio_fini(&p->aio_send); nni_aio_fini(&p->aio_recv); - nni_aio_fini(&p->aio_put); - nni_aio_fini(&p->aio_get); } static int @@ -217,8 +238,6 @@ pair1_pipe_init(void *arg, nni_pipe *pipe, void *pair) nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p); nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p); - nni_aio_init(&p->aio_get, pair1_pipe_get_cb, p); - nni_aio_init(&p->aio_put, pair1_pipe_put_cb, p); p->pipe = pipe; p->pair = pair; @@ -226,6 +245,19 @@ pair1_pipe_init(void *arg, nni_pipe *pipe, void *pair) return (0); } +static void +pair1_cancel(nni_aio *aio, void *arg, int rv) +{ + pair1_sock *s = arg; + + nni_mtx_lock(&s->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&s->mtx); +} + static int pair1_pipe_start(void *arg) { @@ -244,11 +276,11 @@ pair1_pipe_start(void *arg) BUMP_STAT(&s->stat_reject_already); return (NNG_EBUSY); } - s->p = p; + s->p = p; + s->rd_ready = false; nni_mtx_unlock(&s->mtx); - // Schedule a get. - nni_msgq_aio_get(s->uwq, &p->aio_get); + pair1_send_sched(s); // And the pipe read of course. nni_pipe_recv(p->pipe, &p->aio_recv); @@ -263,8 +295,6 @@ pair1_pipe_close(void *arg) nni_aio_close(&p->aio_send); nni_aio_close(&p->aio_recv); - nni_aio_close(&p->aio_put); - nni_aio_close(&p->aio_get); } static void @@ -276,6 +306,7 @@ pair1_pipe_recv_cb(void *arg) uint32_t hdr; nni_pipe * pipe = p->pipe; size_t len; + nni_aio * a; if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); @@ -283,13 +314,13 @@ pair1_pipe_recv_cb(void *arg) } msg = nni_aio_get_msg(&p->aio_recv); - nni_aio_set_msg(&p->aio_recv, NULL); // Store the pipe ID. nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + len = nni_msg_len(msg); // If the message is missing the hop count header, scrap it. - if ((nni_msg_len(msg) < sizeof(uint32_t)) || + if ((len < sizeof(uint32_t)) || ((hdr = nni_msg_trim_u32(msg)) > 0xff)) { BUMP_STAT(&s->stat_rx_malformed); nni_msg_free(msg); @@ -297,93 +328,101 @@ pair1_pipe_recv_cb(void *arg) return; } - len = nni_msg_len(msg); - // If we bounced too many times, discard the message, but // keep getting more. if ((int) hdr > nni_atomic_get(&s->ttl)) { BUMP_STAT(&s->stat_ttl_drop); nni_msg_free(msg); + nni_aio_set_msg(&p->aio_recv, NULL); nni_pipe_recv(pipe, &p->aio_recv); return; } + nni_sock_bump_rx(s->sock, len); + // Store the hop count in the header. nni_msg_header_append_u32(msg, hdr); - // Send the message up. - nni_aio_set_msg(&p->aio_put, msg); - nni_sock_bump_rx(s->sock, len); - nni_msgq_aio_put(s->urq, &p->aio_put); -} - -static void -pair1_pipe_put_cb(void *arg) -{ - pair1_pipe *p = arg; + nni_mtx_lock(&s->mtx); - if (nni_aio_result(&p->aio_put) != 0) { - nni_msg_free(nni_aio_get_msg(&p->aio_put)); - nni_aio_set_msg(&p->aio_put, NULL); - nni_pipe_close(p->pipe); + // if anyone is blocking, then the lmq will be empty, and + // we should deliver it there. + if ((a = nni_list_first(&s->raq)) != NULL) { + nni_aio_list_remove(a); + nni_aio_set_msg(a, msg); + nni_pipe_recv(pipe, &p->aio_recv); + nni_mtx_unlock(&s->mtx); + nni_aio_finish_sync(a, 0, len); return; } - nni_pipe_recv(p->pipe, &p->aio_recv); + + // maybe we have room in the rmq? + if (!nni_lmq_full(&s->rmq)) { + nni_lmq_putq(&s->rmq, msg); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_pipe_recv(pipe, &p->aio_recv); + } else { + s->rd_ready = true; + } + nni_pollable_raise(&s->readable); + nni_mtx_unlock(&s->mtx); } static void -pair1_pipe_get_cb(void *arg) +pair1_send_sched(pair1_sock *s) { - pair1_pipe *p = arg; - pair1_sock *s = p->pair; - nni_msg * msg; - uint32_t hops; + pair1_pipe *p; + nni_msg * m; + nni_aio * a = NULL; + size_t l; - if (nni_aio_result(&p->aio_get) != 0) { - nni_pipe_close(p->pipe); + nni_mtx_lock(&s->mtx); + + if ((p = s->p) == NULL) { + nni_mtx_unlock(&s->mtx); return; } - msg = nni_aio_get_msg(&p->aio_get); - nni_aio_set_msg(&p->aio_get, NULL); + s->wr_ready = true; - // Raw mode messages have the header already formed, with a hop count. - // Cooked mode messages have no header so we have to add one. - if (s->raw) { - if ((nni_msg_header_len(msg) != sizeof(uint32_t)) || - ((hops = nni_msg_header_trim_u32(msg)) > 254)) { - BUMP_STAT(&s->stat_tx_malformed); - nni_msg_free(msg); - nni_msgq_aio_get(s->uwq, &p->aio_get); - return; + // if message waiting in buffered queue, then we prefer that. + if (nni_lmq_getq(&s->wmq, &m) == 0) { + pair1_pipe_send(p, m); + + if ((a = nni_list_first(&s->waq)) != NULL) { + nni_aio_list_remove(a); + m = nni_aio_get_msg(a); + l = nni_msg_len(m); + nni_lmq_putq(&s->wmq, m); } -#if NNG_TEST_LIB - } else if (s->inject_header) { - nni_aio_set_msg(&p->aio_send, msg); - nni_pipe_send(p->pipe, &p->aio_send); - return; -#endif - } else { - // Strip off any previously existing header, such as when - // replying to messages. - nni_msg_header_clear(msg); - hops = 0; + + } else if ((a = nni_list_first(&s->waq)) != NULL) { + // Looks like we had the unbuffered case, but + // someone was waiting. + nni_aio_list_remove(a); + + m = nni_aio_get_msg(a); + l = nni_msg_len(m); + pair1_pipe_send(p, m); } - hops++; + // if we were blocked before, but not now, update. + if ((!nni_lmq_full(&s->wmq)) || s->wr_ready) { + nni_pollable_raise(&s->writable); + } - // Insert the hops header. - nni_msg_header_append_u32(msg, hops); + nni_mtx_unlock(&s->mtx); - nni_aio_set_msg(&p->aio_send, msg); - nni_pipe_send(p->pipe, &p->aio_send); + if (a != NULL) { + nni_aio_set_msg(a, NULL); + nni_aio_finish_sync(a, 0, l); + } } static void pair1_pipe_send_cb(void *arg) { pair1_pipe *p = arg; - pair1_sock *s = p->pair; if (nni_aio_result(&p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(&p->aio_send)); @@ -392,7 +431,7 @@ pair1_pipe_send_cb(void *arg) return; } - nni_msgq_aio_get(s->uwq, &p->aio_get); + pair1_send_sched(p->pair); } static void @@ -404,7 +443,20 @@ pair1_sock_open(void *arg) static void pair1_sock_close(void *arg) { - NNI_ARG_UNUSED(arg); + pair1_sock *s = arg; + nni_aio * a; + nni_msg * m; + nni_mtx_lock(&s->mtx); + while (((a = nni_list_first(&s->raq)) != NULL) || + ((a = nni_list_first(&s->waq)) != NULL)) { + nni_aio_list_remove(a); + nni_aio_finish_error(a, NNG_ECLOSED); + } + while ((nni_lmq_getq(&s->rmq, &m) == 0) || + (nni_lmq_getq(&s->wmq, &m) == 0)) { + nni_msg_free(m); + } + nni_mtx_unlock(&s->mtx); } static int @@ -442,20 +494,255 @@ pair1_set_test_inject_header(void *arg, const void *buf, size_t sz, nni_type t) #endif static void +pair1_pipe_send(pair1_pipe *p, nni_msg *m) +{ + pair1_sock *s = p->pair; + // assumption: we have unique access to the message at this point. + NNI_ASSERT(!nni_msg_shared(m)); + +#if NNG_TEST_LIB + if (s->inject_header) { + goto inject; + } +#endif + NNI_ASSERT(nni_msg_header_len(m) == sizeof(uint32_t)); + nni_msg_header_poke_u32(m, nni_msg_header_peek_u32(m) + 1); + +#if NNG_TEST_LIB +inject: +#endif + + nni_aio_set_msg(&p->aio_send, m); + nni_pipe_send(p->pipe, &p->aio_send); + s->wr_ready = false; +} + +static void pair1_sock_send(void *arg, nni_aio *aio) { pair1_sock *s = arg; + nni_msg * m; + size_t len; + int rv; + + m = nni_aio_get_msg(aio); + len = nni_msg_len(m); + nni_sock_bump_tx(s->sock, len); + + if (nni_aio_begin(aio) != 0) { + return; + } + +#if NNG_TEST_LIB + if (s->inject_header) { + goto inject; + } +#endif - nni_sock_bump_tx(s->sock, nni_msg_len(nni_aio_get_msg(aio))); - nni_msgq_aio_put(s->uwq, aio); + // Raw mode messages have the header already formed, with a hop count. + // Cooked mode messages have no header so we have to add one. + if (s->raw) { + if ((nni_msg_header_len(m) != sizeof(uint32_t)) || + (nni_msg_header_peek_u32(m) >= 0xff)) { + BUMP_STAT(&s->stat_tx_malformed); + nni_aio_finish_error(aio, NNG_EPROTO); + return; + } + + } else { + // Strip off any previously existing header, such as when + // replying to messages. + nni_msg_header_clear(m); + nni_msg_header_append_u32(m, 0); + } + +#if NNG_TEST_LIB +inject: +#endif + + nni_mtx_lock(&s->mtx); + if (s->wr_ready) { + pair1_pipe *p = s->p; + if (nni_lmq_full(&s->wmq)) { + nni_pollable_clear(&s->writable); + } + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, len); + pair1_pipe_send(p, m); + nni_mtx_unlock(&s->mtx); + return; + } + + // Can we maybe queue it. + if (nni_lmq_putq(&s->wmq, m) == 0) { + // Yay, we can. So we're done. + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, len); + if (nni_lmq_full(&s->wmq)) { + nni_pollable_clear(&s->writable); + } + nni_mtx_unlock(&s->mtx); + return; + } + + if ((rv = nni_aio_schedule(aio, pair1_cancel, s)) != 0) { + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&s->mtx); + return; + } + nni_aio_list_append(&s->waq, aio); + nni_mtx_unlock(&s->mtx); } static void pair1_sock_recv(void *arg, nni_aio *aio) { pair1_sock *s = arg; + pair1_pipe *p; + nni_msg * m; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&s->mtx); + p = s->p; + + // Buffered read. If there is a message waiting for us, pick + // it up. We might need to post another read request as well. + if (nni_lmq_getq(&s->rmq, &m) == 0) { + nni_aio_set_msg(aio, m); + nni_aio_finish(aio, 0, nni_msg_len(m)); + if (s->rd_ready) { + s->rd_ready = false; + m = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_lmq_putq(&s->rmq, m); + nni_pipe_recv(p->pipe, &p->aio_recv); + } + if (nni_lmq_empty(&s->rmq)) { + nni_pollable_clear(&s->readable); + } + nni_mtx_unlock(&s->mtx); + return; + } + + // Unbuffered -- but waiting. + if (s->rd_ready) { + s->rd_ready = false; + m = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_aio_set_msg(aio, m); + nni_aio_finish(aio, 0, nni_msg_len(m)); + nni_pipe_recv(p->pipe, &p->aio_recv); + nni_pollable_clear(&s->readable); + nni_mtx_unlock(&s->mtx); + return; + } + + if ((rv = nni_aio_schedule(aio, pair1_cancel, s)) != 0) { + nni_aio_finish_error(aio, rv); + } else { + nni_aio_list_append(&s->raq, aio); + } + nni_mtx_unlock(&s->mtx); +} + +static int +pair1_set_send_buf_len(void *arg, const void *buf, size_t sz, nni_type t) +{ + pair1_sock *s = arg; + int val; + int rv; + + if ((rv = nni_copyin_int(&val, buf, sz, 0, 8192, t)) != 0) { + return (rv); + } + nni_mtx_lock(&s->mtx); + rv = nni_lmq_resize(&s->wmq, (size_t) val); + // Changing the size of the queue can affect our readiness. + if (!nni_lmq_full(&s->wmq)) { + nni_pollable_raise(&s->writable); + } else if (!s->wr_ready) { + nni_pollable_clear(&s->writable); + } + nni_mtx_unlock(&s->mtx); + return (rv); +} + +static int +pair1_get_send_buf_len(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + pair1_sock *s = arg; + int val; + + nni_mtx_lock(&s->mtx); + val = nni_lmq_cap(&s->wmq); + nni_mtx_unlock(&s->mtx); + + return (nni_copyout_int(val, buf, szp, t)); +} + +static int +pair1_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) +{ + pair1_sock *s = arg; + int val; + int rv; + + if ((rv = nni_copyin_int(&val, buf, sz, 0, 8192, t)) != 0) { + return (rv); + } + nni_mtx_lock(&s->mtx); + rv = nni_lmq_resize(&s->rmq, (size_t) val); + // Changing the size of the queue can affect our readiness. + if (!nni_lmq_empty(&s->rmq)) { + nni_pollable_raise(&s->readable); + } else if (!s->rd_ready) { + nni_pollable_clear(&s->readable); + } + nni_mtx_unlock(&s->mtx); + return (rv); +} + +static int +pair1_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + pair1_sock *s = arg; + int val; + + nni_mtx_lock(&s->mtx); + val = nni_lmq_cap(&s->rmq); + nni_mtx_unlock(&s->mtx); + + return (nni_copyout_int(val, buf, szp, t)); +} + +static int +pair1_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + pair1_sock *s = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); +} + +static int +pair1_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + pair1_sock *s = arg; + int rv; + int fd; - nni_msgq_aio_get(s->urq, aio); + if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); } static nni_proto_pipe_ops pair1_pipe_ops = { @@ -473,6 +760,24 @@ static nni_option pair1_sock_options[] = { .o_get = pair1_sock_get_max_ttl, .o_set = pair1_sock_set_max_ttl, }, + { + .o_name = NNG_OPT_RECVFD, + .o_get = pair1_sock_get_recv_fd, + }, + { + .o_name = NNG_OPT_SENDFD, + .o_get = pair1_sock_get_send_fd, + }, + { + .o_name = NNG_OPT_SENDBUF, + .o_get = pair1_get_send_buf_len, + .o_set = pair1_set_send_buf_len, + }, + { + .o_name = NNG_OPT_RECVBUF, + .o_get = pair1_get_recv_buf_len, + .o_set = pair1_set_recv_buf_len, + }, #ifdef NNG_TEST_LIB { // Test only option to pass header unmolested. This allows diff --git a/src/sp/protocol/pair1/pair1_test.c b/src/sp/protocol/pair1/pair1_test.c index 881c4ac8..3185a3a4 100644 --- a/src/sp/protocol/pair1/pair1_test.c +++ b/src/sp/protocol/pair1/pair1_test.c @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -144,6 +144,22 @@ test_mono_back_pressure(void) } void +test_send_no_peer(void) +{ + nng_socket s1; + nng_msg * msg; + nng_duration to = 100; + + NUTS_PASS(nng_pair1_open(&s1)); + NUTS_PASS(nng_setopt_ms(s1, NNG_OPT_SENDTIMEO, to)); + + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_FAIL(nng_sendmsg(s1, msg, 0), NNG_ETIMEDOUT); + nng_msg_free(msg); + NUTS_CLOSE(s1); +} + +void test_mono_raw_exchange(void) { nng_socket s1; @@ -210,8 +226,8 @@ test_mono_raw_header(void) // Missing bits in the header NUTS_PASS(nng_msg_alloc(&msg, 0)); - NUTS_PASS(nng_sendmsg(c1, msg, 0)); - NUTS_FAIL(nng_recvmsg(s1, &msg, 0), NNG_ETIMEDOUT); + NUTS_FAIL(nng_sendmsg(c1, msg, 0), NNG_EPROTO); + nng_msg_free(msg); // Valid header works NUTS_PASS(nng_msg_alloc(&msg, 0)); @@ -226,14 +242,14 @@ test_mono_raw_header(void) // Header with reserved bits set dropped NUTS_PASS(nng_msg_alloc(&msg, 0)); NUTS_PASS(nng_msg_header_append_u32(msg, 0xDEAD0000)); - NUTS_PASS(nng_sendmsg(c1, msg, 0)); - NUTS_FAIL(nng_recvmsg(s1, &msg, 0), NNG_ETIMEDOUT); + NUTS_FAIL(nng_sendmsg(c1, msg, 0), NNG_EPROTO); + nng_msg_free(msg); // Header with no chance to add another hop gets dropped NUTS_PASS(nng_msg_alloc(&msg, 0)); NUTS_PASS(nng_msg_header_append_u32(msg, 0xff)); - NUTS_PASS(nng_sendmsg(c1, msg, 0)); - NUTS_FAIL(nng_recvmsg(s1, &msg, 0), NNG_ETIMEDOUT); + NUTS_FAIL(nng_sendmsg(c1, msg, 0), NNG_EPROTO); + nng_msg_free(msg); // With the same bits clear it works NUTS_PASS(nng_msg_alloc(&msg, 0)); @@ -250,6 +266,26 @@ test_mono_raw_header(void) } void +test_pair1_send_closed_aio(void) +{ + nng_socket s1; + nng_aio * aio; + nng_msg * msg; + + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_PASS(nng_pair1_open(&s1)); + nng_aio_set_msg(aio, msg); + nng_aio_stop(aio); + nng_send_aio(s1, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + nng_msg_free(msg); + nng_aio_free(aio); + NUTS_PASS(nng_close(s1)); +} + +void test_pair1_raw(void) { nng_socket s1; @@ -408,7 +444,7 @@ test_pair1_recv_garbage(void) // ridiculous hop count NUTS_PASS(nng_msg_alloc(&m, 0)); - NUTS_PASS(nng_msg_append_u32(m, 0x1000)); + NUTS_PASS(nng_msg_header_append_u32(m, 0x1000)); NUTS_PASS(nng_sendmsg(c, m, 0)); NUTS_FAIL(nng_recvmsg(s, &m, 0), NNG_ETIMEDOUT); @@ -416,18 +452,176 @@ test_pair1_recv_garbage(void) NUTS_CLOSE(s); } +static void +test_pair1_no_context(void) +{ + nng_socket s; + nng_ctx ctx; + + NUTS_PASS(nng_pair1_open(&s)); + NUTS_FAIL(nng_ctx_open(&ctx, s), NNG_ENOTSUP); + NUTS_CLOSE(s); +} + +static void +test_pair1_send_buffer(void) +{ + nng_socket s; + int v; + bool b; + size_t sz; + + NUTS_PASS(nng_pair1_open(&s)); + NUTS_PASS(nng_getopt_int(s, NNG_OPT_SENDBUF, &v)); + NUTS_TRUE(v == 0); + NUTS_FAIL(nng_getopt_bool(s, NNG_OPT_SENDBUF, &b), NNG_EBADTYPE); + sz = 1; + NUTS_FAIL(nng_getopt(s, NNG_OPT_SENDBUF, &b, &sz), NNG_EINVAL); + NUTS_FAIL(nng_setopt_int(s, NNG_OPT_SENDBUF, -1), NNG_EINVAL); + NUTS_FAIL(nng_setopt_int(s, NNG_OPT_SENDBUF, 100000), NNG_EINVAL); + NUTS_FAIL(nng_setopt_bool(s, NNG_OPT_SENDBUF, false), NNG_EBADTYPE); + NUTS_FAIL(nng_setopt(s, NNG_OPT_SENDBUF, &b, 1), NNG_EINVAL); + NUTS_PASS(nng_setopt_int(s, NNG_OPT_SENDBUF, 100)); + NUTS_PASS(nng_getopt_int(s, NNG_OPT_SENDBUF, &v)); + NUTS_TRUE(v == 100); + NUTS_CLOSE(s); +} + +static void +test_pair1_recv_buffer(void) +{ + nng_socket s; + int v; + bool b; + size_t sz; + + NUTS_PASS(nng_pair1_open(&s)); + NUTS_PASS(nng_getopt_int(s, NNG_OPT_RECVBUF, &v)); + NUTS_TRUE(v == 0); + NUTS_FAIL(nng_getopt_bool(s, NNG_OPT_RECVBUF, &b), NNG_EBADTYPE); + sz = 1; + NUTS_FAIL(nng_getopt(s, NNG_OPT_RECVBUF, &b, &sz), NNG_EINVAL); + NUTS_FAIL(nng_setopt_int(s, NNG_OPT_RECVBUF, -1), NNG_EINVAL); + NUTS_FAIL(nng_setopt_int(s, NNG_OPT_RECVBUF, 100000), NNG_EINVAL); + NUTS_FAIL(nng_setopt_bool(s, NNG_OPT_RECVBUF, false), NNG_EBADTYPE); + NUTS_FAIL(nng_setopt(s, NNG_OPT_RECVBUF, &b, 1), NNG_EINVAL); + NUTS_PASS(nng_setopt_int(s, NNG_OPT_RECVBUF, 100)); + NUTS_PASS(nng_getopt_int(s, NNG_OPT_RECVBUF, &v)); + NUTS_TRUE(v == 100); + NUTS_CLOSE(s); +} + +static void +test_pair1_poll_readable(void) +{ + int fd; + nng_socket s1; + nng_socket s2; + + NUTS_PASS(nng_pair1_open(&s1)); + NUTS_PASS(nng_pair1_open(&s2)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s2, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_get_int(s1, NNG_OPT_RECVFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not readable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // Even after connect (no message yet) + NUTS_MARRY(s1, s2); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // But once we send messages, it is. + // We have to send a request, in order to send a reply. + NUTS_SEND(s2, "abc"); + NUTS_SLEEP(100); + NUTS_TRUE(nuts_poll_fd(fd)); + + // and receiving makes it no longer ready + NUTS_RECV(s1, "abc"); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // also let's confirm handling when we shrink the buffer size. + NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_RECVBUF, 1)); + NUTS_SEND(s2, "def"); + NUTS_SLEEP(100); + NUTS_TRUE(nuts_poll_fd(fd)); + NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_RECVBUF, 0)); + NUTS_TRUE(nuts_poll_fd(fd) == false); + // growing doesn't magically make it readable either + NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_RECVBUF, 10)); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); +} + +static void +test_pair1_poll_writable(void) +{ + int fd; + nng_socket s1; + nng_socket s2; + + NUTS_PASS(nng_pair1_open(&s1)); + NUTS_PASS(nng_pair1_open(&s2)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s2, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_get_int(s1, NNG_OPT_SENDFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not writable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // But after connect, we can. + NUTS_MARRY(s1, s2); + NUTS_TRUE(nuts_poll_fd(fd)); + + // We are unbuffered. + NUTS_SEND(s1, "abc"); // first one in the receiver + NUTS_SEND(s1, "def"); // second one on the sending pipe + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // and receiving makes it ready + NUTS_RECV(s2, "abc"); + NUTS_SLEEP(100); // time for the sender to complete + NUTS_TRUE(nuts_poll_fd(fd)); + + // close the peer for now. + NUTS_CLOSE(s2); + NUTS_SLEEP(100); + + // resize up, makes us writable. + NUTS_TRUE(nuts_poll_fd(fd) == false); + NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_SENDBUF, 1)); + NUTS_TRUE(nuts_poll_fd(fd)); + // resize down and we aren't anymore. + NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_SENDBUF, 0)); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + NUTS_CLOSE(s1); +} + NUTS_TESTS = { { "pair1 mono identity", test_mono_identity }, { "pair1 mono cooked", test_mono_cooked }, { "pair1 mono faithful", test_mono_faithful }, { "pair1 mono back pressure", test_mono_back_pressure }, + { "pair1 send no peer", test_send_no_peer }, { "pair1 mono raw exchange", test_mono_raw_exchange }, { "pair1 mono raw header", test_mono_raw_header }, + { "pair1 send closed aio", test_pair1_send_closed_aio }, { "pair1 raw", test_pair1_raw }, { "pair1 ttl", test_pair1_ttl }, { "pair1 validate peer", test_pair1_validate_peer }, { "pair1 recv no header", test_pair1_recv_no_header }, { "pair1 recv garbage", test_pair1_recv_garbage }, + { "pair1 no context", test_pair1_no_context }, + { "pair1 send buffer", test_pair1_send_buffer }, + { "pair1 recv buffer", test_pair1_recv_buffer }, + { "pair1 poll readable", test_pair1_poll_readable }, + { "pair1 poll writable", test_pair1_poll_writable }, { NULL, NULL }, }; |
