From e9c28eed5ffb8da59ba8a6805e85936f04bdb644 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 24 Jan 2021 21:22:13 -0800 Subject: fixes #1419 pairv0 performance work This takes the performance work we did for pairv1, and provides an implementation for pairv0. The upshot should be a nice performance boost for pair v0. --- src/sp/protocol/pair0/CMakeLists.txt | 5 +- src/sp/protocol/pair0/pair.c | 478 ++++++++++++++++++++++++++++------- src/sp/protocol/pair0/pair0_test.c | 439 ++++++++++++++++++++++++++++++++ 3 files changed, 832 insertions(+), 90 deletions(-) create mode 100644 src/sp/protocol/pair0/pair0_test.c (limited to 'src') diff --git a/src/sp/protocol/pair0/CMakeLists.txt b/src/sp/protocol/pair0/CMakeLists.txt index b12583ab..2f74e139 100644 --- a/src/sp/protocol/pair0/CMakeLists.txt +++ b/src/sp/protocol/pair0/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright 2020 Staysail Systems, Inc. +# Copyright 2021 Staysail Systems, Inc. # Copyright 2018 Capitar IT Group BV # # This software is supplied under the terms of the MIT License, a @@ -13,4 +13,5 @@ nng_directory(pair0) nng_sources_if(NNG_PROTO_PAIR0 pair.c) nng_headers_if(NNG_PROTO_PAIR0 nng/protocol/pair0/pair.h) -nng_defines_if(NNG_PROTO_PAIR0 NNG_HAVE_PAIR0) \ No newline at end of file +nng_defines_if(NNG_PROTO_PAIR0 NNG_HAVE_PAIR0) +nng_test(pair0_test) diff --git a/src/sp/protocol/pair0/pair.c b/src/sp/protocol/pair0/pair.c index 41f88c7c..c2407d81 100644 --- a/src/sp/protocol/pair0/pair.c +++ b/src/sp/protocol/pair0/pair.c @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. +// Copyright 2021 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -24,18 +24,24 @@ typedef struct pair0_pipe pair0_pipe; typedef struct pair0_sock pair0_sock; -static void pair0_send_cb(void *); -static void pair0_recv_cb(void *); -static void pair0_getq_cb(void *); -static void pair0_putq_cb(void *); +static void pair0_pipe_send_cb(void *); +static void pair0_pipe_recv_cb(void *); static void pair0_pipe_fini(void *); +static void pair0_send_sched(pair0_sock *); +static void pair0_pipe_send(pair0_pipe *, nni_msg *); // pair0_sock is our per-socket protocol private structure. struct pair0_sock { - pair0_pipe *ppipe; - nni_msgq * uwq; - nni_msgq * urq; - nni_mtx mtx; + pair0_pipe * p; + nni_mtx mtx; + nni_lmq wmq; + nni_list waq; + nni_lmq rmq; + nni_list raq; + nni_pollable readable; + nni_pollable writable; + bool rd_ready; // pipe ready for read + bool wr_ready; // pipe ready for write }; // An pair0_pipe is our per-pipe protocol private structure. We keep @@ -43,23 +49,28 @@ struct pair0_sock { // pipe. The separate data structure is more like other protocols that do // manage multiple pipes. struct pair0_pipe { - nni_pipe * npipe; - pair0_sock *psock; + nni_pipe * pipe; + pair0_sock *pair; nni_aio aio_send; nni_aio aio_recv; - nni_aio aio_getq; - nni_aio aio_putq; }; static int -pair0_sock_init(void *arg, nni_sock *nsock) +pair0_sock_init(void *arg, nni_sock *sock) { pair0_sock *s = arg; + NNI_ARG_UNUSED(sock); nni_mtx_init(&s->mtx); - s->ppipe = NULL; - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); + + nni_lmq_init(&s->rmq, 0); + nni_lmq_init(&s->wmq, 0); + nni_aio_list_init(&s->raq); + nni_aio_list_init(&s->waq); + nni_pollable_init(&s->writable); + nni_pollable_init(&s->readable); + + s->p = NULL; return (0); } @@ -68,6 +79,8 @@ pair0_sock_fini(void *arg) { pair0_sock *s = arg; + nni_lmq_fini(&s->rmq); + nni_lmq_fini(&s->wmq); nni_mtx_fini(&s->mtx); } @@ -75,11 +88,28 @@ static void pair0_pipe_stop(void *arg) { pair0_pipe *p = arg; + pair0_sock *s = p->pair; + + nni_mtx_lock(&s->mtx); + if (s->p == p) { + s->p = NULL; + if (s->rd_ready) { + nni_msg *m = nni_aio_get_msg(&p->aio_recv); + nni_msg_free(m); + s->rd_ready = false; + } + if (s->wr_ready) { + s->wr_ready = false; + nni_pollable_clear(&s->writable); + } + if (nni_lmq_empty(&s->rmq)) { + nni_pollable_clear(&s->readable); + } + } + nni_mtx_unlock(&s->mtx); nni_aio_stop(&p->aio_send); nni_aio_stop(&p->aio_recv); - nni_aio_stop(&p->aio_putq); - nni_aio_stop(&p->aio_getq); } static void @@ -89,48 +119,59 @@ pair0_pipe_fini(void *arg) nni_aio_fini(&p->aio_send); nni_aio_fini(&p->aio_recv); - nni_aio_fini(&p->aio_putq); - nni_aio_fini(&p->aio_getq); } static int -pair0_pipe_init(void *arg, nni_pipe *npipe, void *psock) +pair0_pipe_init(void *arg, nni_pipe *pipe, void *pair) { pair0_pipe *p = arg; - nni_aio_init(&p->aio_send, pair0_send_cb, p); - nni_aio_init(&p->aio_recv, pair0_recv_cb, p); - nni_aio_init(&p->aio_getq, pair0_getq_cb, p); - nni_aio_init(&p->aio_putq, pair0_putq_cb, p); + nni_aio_init(&p->aio_send, pair0_pipe_send_cb, p); + nni_aio_init(&p->aio_recv, pair0_pipe_recv_cb, p); + + p->pipe = pipe; + p->pair = pair; - p->npipe = npipe; - p->psock = psock; return (0); } +static void +pair0_cancel(nni_aio *aio, void *arg, int rv) +{ + pair0_sock *s = arg; + + nni_mtx_lock(&s->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&s->mtx); +} + static int pair0_pipe_start(void *arg) { pair0_pipe *p = arg; - pair0_sock *s = p->psock; + pair0_sock *s = p->pair; - if (nni_pipe_peer(p->npipe) != NNI_PROTO_PAIR_V0) { + if (nni_pipe_peer(p->pipe) != NNI_PROTO_PAIR_V0) { // Peer protocol mismatch. return (NNG_EPROTO); } nni_mtx_lock(&s->mtx); - if (s->ppipe != NULL) { + if (s->p != NULL) { nni_mtx_unlock(&s->mtx); return (NNG_EBUSY); // Already have a peer, denied. } - s->ppipe = p; + s->p = p; + s->rd_ready = false; nni_mtx_unlock(&s->mtx); - // Schedule a getq on the upper, and a read from the pipe. - // Each of these also sets up another hold on the pipe itself. - nni_msgq_aio_get(s->uwq, &p->aio_getq); - nni_pipe_recv(p->npipe, &p->aio_recv); + pair0_send_sched(s); + + // And the pipe read of course. + nni_pipe_recv(p->pipe, &p->aio_recv); return (0); } @@ -139,83 +180,117 @@ static void pair0_pipe_close(void *arg) { pair0_pipe *p = arg; - pair0_sock *s = p->psock; nni_aio_close(&p->aio_send); nni_aio_close(&p->aio_recv); - nni_aio_close(&p->aio_putq); - nni_aio_close(&p->aio_getq); - - nni_mtx_lock(&s->mtx); - if (s->ppipe == p) { - s->ppipe = NULL; - } - nni_mtx_unlock(&s->mtx); } static void -pair0_recv_cb(void *arg) +pair0_pipe_recv_cb(void *arg) { pair0_pipe *p = arg; - pair0_sock *s = p->psock; + pair0_sock *s = p->pair; nni_msg * msg; + nni_aio * a; if (nni_aio_result(&p->aio_recv) != 0) { - nni_pipe_close(p->npipe); + nni_pipe_close(p->pipe); return; } msg = nni_aio_get_msg(&p->aio_recv); - nni_aio_set_msg(&p->aio_putq, msg); - nni_aio_set_msg(&p->aio_recv, NULL); + // Store the pipe ID. + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); - nni_msgq_aio_put(s->urq, &p->aio_putq); -} - -static void -pair0_putq_cb(void *arg) -{ - pair0_pipe *p = arg; + nni_mtx_lock(&s->mtx); - if (nni_aio_result(&p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(&p->aio_putq)); - nni_aio_set_msg(&p->aio_putq, NULL); - nni_pipe_close(p->npipe); + // if anyone is blocking, then the lmq will be empty, and + // we should deliver it there. + if ((a = nni_list_first(&s->raq)) != NULL) { + nni_aio_list_remove(a); + nni_aio_set_msg(a, msg); + nni_pipe_recv(p->pipe, &p->aio_recv); + nni_mtx_unlock(&s->mtx); + nni_aio_finish_sync(a, 0, nni_msg_len(msg)); return; } - nni_pipe_recv(p->npipe, &p->aio_recv); + + // maybe we have room in the rmq? + if (!nni_lmq_full(&s->rmq)) { + nni_lmq_putq(&s->rmq, msg); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_pipe_recv(p->pipe, &p->aio_recv); + } else { + s->rd_ready = true; + } + nni_pollable_raise(&s->readable); + nni_mtx_unlock(&s->mtx); } static void -pair0_getq_cb(void *arg) +pair0_send_sched(pair0_sock *s) { - pair0_pipe *p = arg; + pair0_pipe *p; + nni_msg * m; + nni_aio * a = NULL; + size_t l = 0; - if (nni_aio_result(&p->aio_getq) != 0) { - nni_pipe_close(p->npipe); + nni_mtx_lock(&s->mtx); + + if ((p = s->p) == NULL) { + nni_mtx_unlock(&s->mtx); return; } - nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq)); - nni_aio_set_msg(&p->aio_getq, NULL); - nni_pipe_send(p->npipe, &p->aio_send); + s->wr_ready = true; + + // if message waiting in buffered queue, then we prefer that. + if (nni_lmq_getq(&s->wmq, &m) == 0) { + pair0_pipe_send(p, m); + + if ((a = nni_list_first(&s->waq)) != NULL) { + nni_aio_list_remove(a); + m = nni_aio_get_msg(a); + l = nni_msg_len(m); + nni_lmq_putq(&s->wmq, m); + } + + } else if ((a = nni_list_first(&s->waq)) != NULL) { + // Looks like we had the unbuffered case, but + // someone was waiting. + nni_aio_list_remove(a); + + m = nni_aio_get_msg(a); + l = nni_msg_len(m); + pair0_pipe_send(p, m); + } + + // if we were blocked before, but not now, update. + if ((!nni_lmq_full(&s->wmq)) || s->wr_ready) { + nni_pollable_raise(&s->writable); + } + + nni_mtx_unlock(&s->mtx); + + if (a != NULL) { + nni_aio_set_msg(a, NULL); + nni_aio_finish_sync(a, 0, l); + } } static void -pair0_send_cb(void *arg) +pair0_pipe_send_cb(void *arg) { pair0_pipe *p = arg; - pair0_sock *s = p->psock; if (nni_aio_result(&p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(&p->aio_send)); nni_aio_set_msg(&p->aio_send, NULL); - nni_pipe_close(p->npipe); + nni_pipe_close(p->pipe); return; } - nni_msgq_aio_get(s->uwq, &p->aio_getq); + pair0_send_sched(p->pair); } static void @@ -227,25 +302,259 @@ pair0_sock_open(void *arg) static void pair0_sock_close(void *arg) { - NNI_ARG_UNUSED(arg); + pair0_sock *s = arg; + nni_aio * a; + nni_msg * m; + nni_mtx_lock(&s->mtx); + while (((a = nni_list_first(&s->raq)) != NULL) || + ((a = nni_list_first(&s->waq)) != NULL)) { + nni_aio_list_remove(a); + nni_aio_finish_error(a, NNG_ECLOSED); + } + while ((nni_lmq_getq(&s->rmq, &m) == 0) || + (nni_lmq_getq(&s->wmq, &m) == 0)) { + nni_msg_free(m); + } + nni_mtx_unlock(&s->mtx); +} + +static void +pair0_pipe_send(pair0_pipe *p, nni_msg *m) +{ + pair0_sock *s = p->pair; + // assumption: we have unique access to the message at this point. + NNI_ASSERT(!nni_msg_shared(m)); + + nni_aio_set_msg(&p->aio_send, m); + nni_pipe_send(p->pipe, &p->aio_send); + s->wr_ready = false; } static void pair0_sock_send(void *arg, nni_aio *aio) { pair0_sock *s = arg; + nni_msg * m; + size_t len; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + m = nni_aio_get_msg(aio); + len = nni_msg_len(m); + + nni_mtx_lock(&s->mtx); + if (s->wr_ready) { + pair0_pipe *p = s->p; + if (nni_lmq_full(&s->wmq)) { + nni_pollable_clear(&s->writable); + } + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, len); + pair0_pipe_send(p, m); + nni_mtx_unlock(&s->mtx); + return; + } + + // Can we maybe queue it. + if (nni_lmq_putq(&s->wmq, m) == 0) { + // Yay, we can. So we're done. + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, len); + if (nni_lmq_full(&s->wmq)) { + nni_pollable_clear(&s->writable); + } + nni_mtx_unlock(&s->mtx); + return; + } - nni_msgq_aio_put(s->uwq, aio); + if ((rv = nni_aio_schedule(aio, pair0_cancel, s)) != 0) { + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&s->mtx); + return; + } + nni_aio_list_append(&s->waq, aio); + nni_mtx_unlock(&s->mtx); } static void pair0_sock_recv(void *arg, nni_aio *aio) { pair0_sock *s = arg; + pair0_pipe *p; + nni_msg * m; + int rv; - nni_msgq_aio_get(s->urq, aio); + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&s->mtx); + p = s->p; + + // Buffered read. If there is a message waiting for us, pick + // it up. We might need to post another read request as well. + if (nni_lmq_getq(&s->rmq, &m) == 0) { + nni_aio_set_msg(aio, m); + nni_aio_finish(aio, 0, nni_msg_len(m)); + if (s->rd_ready) { + s->rd_ready = false; + m = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_lmq_putq(&s->rmq, m); + nni_pipe_recv(p->pipe, &p->aio_recv); + } + if (nni_lmq_empty(&s->rmq)) { + nni_pollable_clear(&s->readable); + } + nni_mtx_unlock(&s->mtx); + return; + } + + // Unbuffered -- but waiting. + if (s->rd_ready) { + s->rd_ready = false; + m = nni_aio_get_msg(&p->aio_recv); + nni_aio_set_msg(&p->aio_recv, NULL); + nni_aio_set_msg(aio, m); + nni_aio_finish(aio, 0, nni_msg_len(m)); + nni_pipe_recv(p->pipe, &p->aio_recv); + nni_pollable_clear(&s->readable); + nni_mtx_unlock(&s->mtx); + return; + } + + if ((rv = nni_aio_schedule(aio, pair0_cancel, s)) != 0) { + nni_aio_finish_error(aio, rv); + } else { + nni_aio_list_append(&s->raq, aio); + } + nni_mtx_unlock(&s->mtx); } +static int +pair0_set_send_buf_len(void *arg, const void *buf, size_t sz, nni_type t) +{ + pair0_sock *s = arg; + int val; + int rv; + + if ((rv = nni_copyin_int(&val, buf, sz, 0, 8192, t)) != 0) { + return (rv); + } + nni_mtx_lock(&s->mtx); + rv = nni_lmq_resize(&s->wmq, (size_t) val); + // Changing the size of the queue can affect our readiness. + if (!nni_lmq_full(&s->wmq)) { + nni_pollable_raise(&s->writable); + } else if (!s->wr_ready) { + nni_pollable_clear(&s->writable); + } + nni_mtx_unlock(&s->mtx); + return (rv); +} + +static int +pair0_get_send_buf_len(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + pair0_sock *s = arg; + int val; + + nni_mtx_lock(&s->mtx); + val = nni_lmq_cap(&s->wmq); + nni_mtx_unlock(&s->mtx); + + return (nni_copyout_int(val, buf, szp, t)); +} + +static int +pair0_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) +{ + pair0_sock *s = arg; + int val; + int rv; + + if ((rv = nni_copyin_int(&val, buf, sz, 0, 8192, t)) != 0) { + return (rv); + } + nni_mtx_lock(&s->mtx); + rv = nni_lmq_resize(&s->rmq, (size_t) val); + // Changing the size of the queue can affect our readiness. + if (!nni_lmq_empty(&s->rmq)) { + nni_pollable_raise(&s->readable); + } else if (!s->rd_ready) { + nni_pollable_clear(&s->readable); + } + nni_mtx_unlock(&s->mtx); + return (rv); +} + +static int +pair0_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + pair0_sock *s = arg; + int val; + + nni_mtx_lock(&s->mtx); + val = nni_lmq_cap(&s->rmq); + nni_mtx_unlock(&s->mtx); + + return (nni_copyout_int(val, buf, szp, t)); +} + +static int +pair0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + pair0_sock *s = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); +} + +static int +pair0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + pair0_sock *s = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, t)); +} + +static nni_option pair0_sock_options[] = { + { + .o_name = NNG_OPT_RECVFD, + .o_get = pair0_sock_get_recv_fd, + }, + { + .o_name = NNG_OPT_SENDFD, + .o_get = pair0_sock_get_send_fd, + }, + { + .o_name = NNG_OPT_SENDBUF, + .o_get = pair0_get_send_buf_len, + .o_set = pair0_set_send_buf_len, + }, + { + .o_name = NNG_OPT_RECVBUF, + .o_get = pair0_get_recv_buf_len, + .o_set = pair0_set_recv_buf_len, + }, + // terminate list + { + .o_name = NULL, + }, +}; + static nni_proto_pipe_ops pair0_pipe_ops = { .pipe_size = sizeof(pair0_pipe), .pipe_init = pair0_pipe_init, @@ -255,13 +564,6 @@ static nni_proto_pipe_ops pair0_pipe_ops = { .pipe_stop = pair0_pipe_stop, }; -static nni_option pair0_sock_options[] = { - // terminate list - { - .o_name = NULL, - } -}; - static nni_proto_sock_ops pair0_sock_ops = { .sock_size = sizeof(pair0_sock), .sock_init = pair0_sock_init, @@ -293,13 +595,13 @@ static nni_proto pair0_proto_raw = { }; int -nng_pair0_open(nng_socket *sidp) +nng_pair0_open(nng_socket *sock) { - return (nni_proto_open(sidp, &pair0_proto)); + return (nni_proto_open(sock, &pair0_proto)); } int -nng_pair0_open_raw(nng_socket *sidp) +nng_pair0_open_raw(nng_socket *sock) { - return (nni_proto_open(sidp, &pair0_proto_raw)); + return (nni_proto_open(sock, &pair0_proto_raw)); } diff --git a/src/sp/protocol/pair0/pair0_test.c b/src/sp/protocol/pair0/pair0_test.c new file mode 100644 index 00000000..20acc200 --- /dev/null +++ b/src/sp/protocol/pair0/pair0_test.c @@ -0,0 +1,439 @@ +// +// Copyright 2021 Staysail Systems, Inc. +// Copyright 2017 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include + +#define SECOND 1000 + +#define APPEND_STR(m, s) NUTS_TRUE(nng_msg_append(m, s, strlen(s)) == 0) +#define CHECK_STR(m, s) \ + NUTS_TRUE(nng_msg_len(m) == strlen(s)); \ + NUTS_TRUE(memcmp(nng_msg_body(m), s, strlen(s)) == 0) + +static void +test_identity(void) +{ + nng_socket s; + int p; + char * n; + + NUTS_PASS(nng_pair0_open(&s)); + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); + NUTS_TRUE(p == NUTS_PROTO(1u, 0u)); // 16 + NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p)); + NUTS_TRUE(p == NUTS_PROTO(1u, 0u)); // 17 + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n)); + NUTS_MATCH(n, "pair"); + nng_strfree(n); + NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n)); + NUTS_MATCH(n, "pair"); + nng_strfree(n); + NUTS_CLOSE(s); +} + +void +test_cooked(void) +{ + nng_socket s1; + nng_socket c1; + nng_msg * msg; + + NUTS_PASS(nng_pair0_open(&s1)); + NUTS_PASS(nng_pair0_open(&c1)); + NUTS_PASS(nuts_marry(s1, c1)); + + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_PASS(nng_msg_append(msg, "ALPHA", strlen("ALPHA") + 1)); + NUTS_PASS(nng_sendmsg(c1, msg, 0)); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + NUTS_TRUE(nng_msg_len(msg) == strlen("ALPHA") + 1); + NUTS_MATCH(nng_msg_body(msg), "ALPHA"); + nng_msg_free(msg); + + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_PASS(nng_msg_append(msg, "BETA", strlen("BETA") + 1)); + NUTS_PASS(nng_sendmsg(s1, msg, 0)); + NUTS_PASS(nng_recvmsg(c1, &msg, 0)); + NUTS_TRUE(nng_msg_len(msg) == strlen("BETA") + 1); + NUTS_MATCH(nng_msg_body(msg), "BETA"); + + nng_msg_free(msg); + NUTS_CLOSE(c1); + NUTS_CLOSE(s1); +} + +void +test_faithful(void) +{ + nng_socket s1; + nng_socket c1; + nng_socket c2; + nng_msg * msg; + const char *addr = "inproc://pair0_mono_faithful"; + + NUTS_PASS(nng_pair0_open(&s1)); + NUTS_PASS(nng_pair0_open(&c1)); + NUTS_PASS(nng_pair0_open(&c2)); + NUTS_PASS(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, SECOND / 4)); + NUTS_PASS(nng_setopt_ms(c1, NNG_OPT_SENDTIMEO, SECOND)); + NUTS_PASS(nng_setopt_ms(c2, NNG_OPT_SENDTIMEO, SECOND)); + NUTS_PASS(nng_setopt_int(c2, NNG_OPT_SENDBUF, 2)); + + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + NUTS_MARRY(s1, c1); + NUTS_PASS(nng_dial(c2, addr, NULL, 0)); + + NUTS_SLEEP(100); + + NUTS_PASS(nng_msg_alloc(&msg, 0)); + APPEND_STR(msg, "ONE"); + NUTS_PASS(nng_sendmsg(c1, msg, 0)); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + CHECK_STR(msg, "ONE"); + nng_msg_free(msg); + + NUTS_PASS(nng_msg_alloc(&msg, 0)); + APPEND_STR(msg, "TWO"); + NUTS_PASS(nng_sendmsg(c2, msg, 0)); + NUTS_FAIL(nng_recvmsg(s1, &msg, 0), NNG_ETIMEDOUT); + + NUTS_CLOSE(s1); + NUTS_CLOSE(c1); + NUTS_CLOSE(c2); +} + +void +test_back_pressure(void) +{ + nng_socket s1; + nng_socket c1; + int i; + int rv; + nng_msg * msg; + nng_duration to = 100; + + NUTS_PASS(nng_pair0_open(&s1)); + NUTS_PASS(nng_pair0_open(&c1)); + NUTS_PASS(nng_setopt_int(s1, NNG_OPT_RECVBUF, 1)); + NUTS_PASS(nng_setopt_int(s1, NNG_OPT_SENDBUF, 1)); + NUTS_PASS(nng_setopt_int(c1, NNG_OPT_RECVBUF, 1)); + NUTS_PASS(nng_setopt_ms(s1, NNG_OPT_SENDTIMEO, to)); + + NUTS_MARRY(s1, c1); + + // We choose to allow some buffering. In reality the + // buffer size is just 1, and we will fail after 2. + for (i = 0, rv = 0; i < 10; i++) { + NUTS_PASS(nng_msg_alloc(&msg, 0)); + if ((rv = nng_sendmsg(s1, msg, 0)) != 0) { + nng_msg_free(msg); + break; + } + } + NUTS_FAIL(rv, NNG_ETIMEDOUT); + NUTS_TRUE(i < 10); + NUTS_CLOSE(s1); + NUTS_CLOSE(c1); +} + +void +test_send_no_peer(void) +{ + nng_socket s1; + nng_msg * msg; + nng_duration to = 100; + + NUTS_PASS(nng_pair0_open(&s1)); + NUTS_PASS(nng_setopt_ms(s1, NNG_OPT_SENDTIMEO, to)); + + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_FAIL(nng_sendmsg(s1, msg, 0), NNG_ETIMEDOUT); + nng_msg_free(msg); + NUTS_CLOSE(s1); +} + +void +test_raw_exchange(void) +{ + nng_socket s1; + nng_socket c1; + + nng_msg *msg; + + NUTS_PASS(nng_pair0_open_raw(&s1)); + NUTS_PASS(nng_pair0_open_raw(&c1)); + + NUTS_PASS(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, SECOND)); + NUTS_PASS(nng_setopt_ms(c1, NNG_OPT_RECVTIMEO, SECOND)); + NUTS_MARRY(s1, c1); + + nng_pipe p = NNG_PIPE_INITIALIZER; + NUTS_PASS(nng_msg_alloc(&msg, 0)); + APPEND_STR(msg, "GAMMA"); + NUTS_PASS(nng_sendmsg(c1, msg, 0)); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + p = nng_msg_get_pipe(msg); + NUTS_TRUE(nng_pipe_id(p) > 0); + + CHECK_STR(msg, "GAMMA"); + nng_msg_free(msg); + + NUTS_PASS(nng_msg_alloc(&msg, 0)); + APPEND_STR(msg, "EPSILON"); + NUTS_PASS(nng_sendmsg(s1, msg, 0)); + NUTS_PASS(nng_recvmsg(c1, &msg, 0)); + CHECK_STR(msg, "EPSILON"); + p = nng_msg_get_pipe(msg); + NUTS_TRUE(nng_pipe_id(p) > 0); + + nng_msg_free(msg); + + NUTS_CLOSE(s1); + NUTS_CLOSE(c1); +} + +void +test_pair0_send_closed_aio(void) +{ + nng_socket s1; + nng_aio * aio; + nng_msg * msg; + + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_PASS(nng_pair0_open(&s1)); + nng_aio_set_msg(aio, msg); + nng_aio_stop(aio); + nng_send_aio(s1, aio); + nng_aio_wait(aio); + NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); + nng_msg_free(msg); + nng_aio_free(aio); + NUTS_PASS(nng_close(s1)); +} + +void +test_pair0_raw(void) +{ + nng_socket s1; + bool raw; + + NUTS_PASS(nng_pair0_open(&s1)); + NUTS_PASS(nng_getopt_bool(s1, NNG_OPT_RAW, &raw)); + NUTS_TRUE(raw == false); + NUTS_FAIL(nng_setopt_bool(s1, NNG_OPT_RAW, true), NNG_EREADONLY); + NUTS_PASS(nng_close(s1)); + + NUTS_PASS(nng_pair0_open_raw(&s1)); + NUTS_PASS(nng_getopt_bool(s1, NNG_OPT_RAW, &raw)); + NUTS_TRUE(raw == true); + NUTS_FAIL(nng_setopt_bool(s1, NNG_OPT_RAW, false), NNG_EREADONLY); + NUTS_PASS(nng_close(s1)); +} + +void +test_pair0_validate_peer(void) +{ + nng_socket s1, s2; + nng_stat * stats; + nng_stat * reject; + char * addr; + + NUTS_ADDR(addr, "inproc"); + NUTS_PASS(nng_pair0_open(&s1)); + NUTS_PASS(nng_pair1_open(&s2)); + + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK)); + + NUTS_SLEEP(100); + NUTS_PASS(nng_stats_get(&stats)); + + NUTS_TRUE(stats != NULL); + NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL); + NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL); + + NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER); + NUTS_TRUE(nng_stat_value(reject) > 0); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + nng_stats_free(stats); +} + +static void +test_pair0_no_context(void) +{ + nng_socket s; + nng_ctx ctx; + + NUTS_PASS(nng_pair0_open(&s)); + NUTS_FAIL(nng_ctx_open(&ctx, s), NNG_ENOTSUP); + NUTS_CLOSE(s); +} + +static void +test_pair0_send_buffer(void) +{ + nng_socket s; + int v; + bool b; + size_t sz; + + NUTS_PASS(nng_pair0_open(&s)); + NUTS_PASS(nng_getopt_int(s, NNG_OPT_SENDBUF, &v)); + NUTS_TRUE(v == 0); + NUTS_FAIL(nng_getopt_bool(s, NNG_OPT_SENDBUF, &b), NNG_EBADTYPE); + sz = 1; + NUTS_FAIL(nng_getopt(s, NNG_OPT_SENDBUF, &b, &sz), NNG_EINVAL); + NUTS_FAIL(nng_setopt_int(s, NNG_OPT_SENDBUF, -1), NNG_EINVAL); + NUTS_FAIL(nng_setopt_int(s, NNG_OPT_SENDBUF, 100000), NNG_EINVAL); + NUTS_FAIL(nng_setopt_bool(s, NNG_OPT_SENDBUF, false), NNG_EBADTYPE); + NUTS_FAIL(nng_setopt(s, NNG_OPT_SENDBUF, &b, 1), NNG_EINVAL); + NUTS_PASS(nng_setopt_int(s, NNG_OPT_SENDBUF, 100)); + NUTS_PASS(nng_getopt_int(s, NNG_OPT_SENDBUF, &v)); + NUTS_TRUE(v == 100); + NUTS_CLOSE(s); +} + +static void +test_pair0_recv_buffer(void) +{ + nng_socket s; + int v; + bool b; + size_t sz; + + NUTS_PASS(nng_pair0_open(&s)); + NUTS_PASS(nng_getopt_int(s, NNG_OPT_RECVBUF, &v)); + NUTS_TRUE(v == 0); + NUTS_FAIL(nng_getopt_bool(s, NNG_OPT_RECVBUF, &b), NNG_EBADTYPE); + sz = 1; + NUTS_FAIL(nng_getopt(s, NNG_OPT_RECVBUF, &b, &sz), NNG_EINVAL); + NUTS_FAIL(nng_setopt_int(s, NNG_OPT_RECVBUF, -1), NNG_EINVAL); + NUTS_FAIL(nng_setopt_int(s, NNG_OPT_RECVBUF, 100000), NNG_EINVAL); + NUTS_FAIL(nng_setopt_bool(s, NNG_OPT_RECVBUF, false), NNG_EBADTYPE); + NUTS_FAIL(nng_setopt(s, NNG_OPT_RECVBUF, &b, 1), NNG_EINVAL); + NUTS_PASS(nng_setopt_int(s, NNG_OPT_RECVBUF, 100)); + NUTS_PASS(nng_getopt_int(s, NNG_OPT_RECVBUF, &v)); + NUTS_TRUE(v == 100); + NUTS_CLOSE(s); +} + +static void +test_pair0_poll_readable(void) +{ + int fd; + nng_socket s1; + nng_socket s2; + + NUTS_PASS(nng_pair0_open(&s1)); + NUTS_PASS(nng_pair0_open(&s2)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s2, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_get_int(s1, NNG_OPT_RECVFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not readable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // Even after connect (no message yet) + NUTS_MARRY(s1, s2); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // But once we send messages, it is. + // We have to send a request, in order to send a reply. + NUTS_SEND(s2, "abc"); + NUTS_SLEEP(100); + NUTS_TRUE(nuts_poll_fd(fd)); + + // and receiving makes it no longer ready + NUTS_RECV(s1, "abc"); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // also let's confirm handling when we shrink the buffer size. + NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_RECVBUF, 1)); + NUTS_SEND(s2, "def"); + NUTS_SLEEP(100); + NUTS_TRUE(nuts_poll_fd(fd)); + NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_RECVBUF, 0)); + NUTS_TRUE(nuts_poll_fd(fd) == false); + // growing doesn't magically make it readable either + NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_RECVBUF, 10)); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); +} + +static void +test_pair0_poll_writable(void) +{ + int fd; + nng_socket s1; + nng_socket s2; + + NUTS_PASS(nng_pair0_open(&s1)); + NUTS_PASS(nng_pair0_open(&s2)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s2, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_get_int(s1, NNG_OPT_SENDFD, &fd)); + NUTS_TRUE(fd >= 0); + + // Not writable if not connected! + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // But after connect, we can. + NUTS_MARRY(s1, s2); + NUTS_TRUE(nuts_poll_fd(fd)); + + // We are unbuffered. + NUTS_SEND(s1, "abc"); // first one in the receiver + NUTS_SEND(s1, "def"); // second one on the sending pipe + NUTS_TRUE(nuts_poll_fd(fd) == false); + + // and receiving makes it ready + NUTS_RECV(s2, "abc"); + NUTS_SLEEP(100); // time for the sender to complete + NUTS_TRUE(nuts_poll_fd(fd)); + + // close the peer for now. + NUTS_CLOSE(s2); + NUTS_SLEEP(100); + + // resize up, makes us writable. + NUTS_TRUE(nuts_poll_fd(fd) == false); + NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_SENDBUF, 1)); + NUTS_TRUE(nuts_poll_fd(fd)); + // resize down and we aren't anymore. + NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_SENDBUF, 0)); + NUTS_TRUE(nuts_poll_fd(fd) == false); + + NUTS_CLOSE(s1); +} + +NUTS_TESTS = { + { "pair0 identity", test_identity }, + { "pair0 cooked", test_cooked }, + { "pair0 faithful", test_faithful }, + { "pair0 back pressure", test_back_pressure }, + { "pair0 send no peer", test_send_no_peer }, + { "pair0 raw exchange", test_raw_exchange }, + { "pair0 send closed aio", test_pair0_send_closed_aio }, + { "pair0 raw", test_pair0_raw }, + { "pair0 validate peer", test_pair0_validate_peer }, + { "pair0 no context", test_pair0_no_context }, + { "pair0 send buffer", test_pair0_send_buffer }, + { "pair0 recv buffer", test_pair0_recv_buffer }, + { "pair0 poll readable", test_pair0_poll_readable }, + { "pair0 poll writable", test_pair0_poll_writable }, + + { NULL, NULL }, +}; -- cgit v1.2.3-70-g09d2