diff options
| author | Garrett D'Amore <garrett@damore.org> | 2021-01-01 11:30:03 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2021-01-01 12:46:17 -0800 |
| commit | ed542ac45e00c9b2faa0b41f3c00de6e291e5678 (patch) | |
| tree | 673924ff077d468e6756529c2c204698d3faa47c /src/protocol/pipeline0 | |
| parent | 1413b2421a82cd9b9cde178d44fb60c7893176b0 (diff) | |
| download | nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.gz nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.bz2 nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.zip | |
fixes #1345 Restructure the source tree
This is not quite complete, but it sets the stage for other
protocols (such as zmq or mqtt) to be added to the project.
Diffstat (limited to 'src/protocol/pipeline0')
| -rw-r--r-- | src/protocol/pipeline0/CMakeLists.txt | 23 | ||||
| -rw-r--r-- | src/protocol/pipeline0/pull.c | 325 | ||||
| -rw-r--r-- | src/protocol/pipeline0/pull_test.c | 264 | ||||
| -rw-r--r-- | src/protocol/pipeline0/push.c | 442 | ||||
| -rw-r--r-- | src/protocol/pipeline0/push_test.c | 525 |
5 files changed, 0 insertions, 1579 deletions
diff --git a/src/protocol/pipeline0/CMakeLists.txt b/src/protocol/pipeline0/CMakeLists.txt deleted file mode 100644 index 8a10eab7..00000000 --- a/src/protocol/pipeline0/CMakeLists.txt +++ /dev/null @@ -1,23 +0,0 @@ -# -# Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> -# Copyright 2018 Capitar IT Group BV <info@capitar.com> -# -# This software is supplied under the terms of the MIT License, a -# copy of which should be located in the distribution where this -# file was obtained (LICENSE.txt). A copy of the license may also be -# found online at https://opensource.org/licenses/MIT. -# - -# Pipeline protocol -nng_directory(pipeline0) - -nng_sources_if(NNG_PROTO_PUSH0 push.c) -nng_headers_if(NNG_PROTO_PUSH0 nng/protocol/pipeline0/push.h) -nng_defines_if(NNG_PROTO_PUSH0 NNG_HAVE_PUSH0) - -nng_sources_if(NNG_PROTO_PULL0 pull.c) -nng_headers_if(NNG_PROTO_PULL0 nng/protocol/pipeline0/pull.h) -nng_defines_if(NNG_PROTO_PULL0 NNG_HAVE_PULL0) - -nng_test(pull_test) -nng_test(push_test) diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c deleted file mode 100644 index 616b0817..00000000 --- a/src/protocol/pipeline0/pull.c +++ /dev/null @@ -1,325 +0,0 @@ -// -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include <stdlib.h> - -#include "core/nng_impl.h" -#include "nng/protocol/pipeline0/pull.h" - -// Pull protocol. The PULL protocol is the "read" side of a pipeline. - -#ifndef NNI_PROTO_PULL_V0 -#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1) -#endif - -#ifndef NNI_PROTO_PUSH_V0 -#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0) -#endif - -typedef struct pull0_pipe pull0_pipe; -typedef struct pull0_sock pull0_sock; - -static void pull0_recv_cb(void *); - -// pull0_sock is our per-socket protocol private structure. -struct pull0_sock { - bool raw; - nni_list pl; // pipe list (pipes with data ready) - nni_list rq; // recv queue (aio list) - nni_mtx m; - nni_pollable readable; -}; - -// pull0_pipe is our per-pipe protocol private structure. -struct pull0_pipe { - nni_pipe * p; - pull0_sock * s; - nni_msg * m; - nni_aio aio; - bool closed; - nni_list_node node; -}; - -static int -pull0_sock_init(void *arg, nni_sock *sock) -{ - pull0_sock *s = arg; - NNI_ARG_UNUSED(sock); - - nni_aio_list_init(&s->rq); - NNI_LIST_INIT(&s->pl, pull0_pipe, node); - nni_mtx_init(&s->m); - nni_pollable_init(&s->readable); - return (0); -} - -static void -pull0_sock_fini(void *arg) -{ - pull0_sock *s = arg; - nni_mtx_fini(&s->m); - nni_pollable_fini(&s->readable); -} - -static void -pull0_pipe_stop(void *arg) -{ - pull0_pipe *p = arg; - - nni_aio_stop(&p->aio); -} - -static void -pull0_pipe_fini(void *arg) -{ - pull0_pipe *p = arg; - - nni_aio_fini(&p->aio); - if (p->m) { - nni_msg_free(p->m); - } -} - -static int -pull0_pipe_init(void *arg, nni_pipe *pipe, void *s) -{ - pull0_pipe *p = arg; - - nni_aio_init(&p->aio, pull0_recv_cb, p); - p->p = pipe; - p->s = s; - return (0); -} - -static int -pull0_pipe_start(void *arg) -{ - pull0_pipe *p = arg; - - if (nni_pipe_peer(p->p) != NNI_PROTO_PUSH_V0) { - // Peer protocol mismatch. - return (NNG_EPROTO); - } - - // Start the pending receive... - nni_pipe_recv(p->p, &p->aio); - - return (0); -} - -static void -pull0_pipe_close(void *arg) -{ - pull0_pipe *p = arg; - pull0_sock *s = p->s; - - nni_mtx_lock(&s->m); - p->closed = true; - if (nni_list_node_active(&p->node)) { - nni_list_node_remove(&p->node); - if (nni_list_empty(&s->pl)) { - nni_pollable_clear(&s->readable); - } - } - nni_mtx_unlock(&s->m); - - nni_aio_close(&p->aio); -} - -static void -pull0_recv_cb(void *arg) -{ - pull0_pipe *p = arg; - pull0_sock *s = p->s; - nni_aio * ap = &p->aio; - nni_aio * as; - nni_msg * m; - - if (nni_aio_result(ap) != 0) { - // Failed to get a message, probably the pipe is closed. - nni_pipe_close(p->p); - return; - } - - // Got a message... start the put to send it up to the application. - m = nni_aio_get_msg(ap); - nni_aio_set_msg(ap, NULL); - nni_msg_set_pipe(m, nni_pipe_id(p->p)); - - nni_mtx_lock(&s->m); - if (p->closed) { - nni_mtx_unlock(&s->m); - nni_msg_free(m); - return; - } - if (nni_list_empty(&s->rq)) { - nni_list_append(&s->pl, p); - if (nni_list_first(&s->pl) == p) { - nni_pollable_raise(&s->readable); - } - p->m = m; - nni_mtx_unlock(&s->m); - return; - } - nni_pipe_recv(p->p, ap); - as = nni_list_first(&s->rq); - nni_aio_list_remove(as); - nni_mtx_unlock(&s->m); - nni_aio_set_msg(as, m); - nni_aio_finish_sync(as, 0, nni_msg_len(m)); -} - -static void -pull0_sock_open(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - -static void -pull0_sock_close(void *arg) -{ - pull0_sock *s = arg; - nni_aio * a; - nni_mtx_lock(&s->m); - while ((a = nni_list_first(&s->rq)) != NULL) { - nni_aio_list_remove(a); - nni_aio_finish_error(a, NNG_ECLOSED); - } - // NB: The common socket framework closes pipes before this. - nni_mtx_unlock(&s->m); -} - -static void -pull0_sock_send(void *arg, nni_aio *aio) -{ - NNI_ARG_UNUSED(arg); - nni_aio_finish_error(aio, NNG_ENOTSUP); -} - -static void -pull0_cancel(nni_aio *aio, void *arg, int rv) -{ - pull0_sock *s = arg; - nni_mtx_lock(&s->m); - if (nni_aio_list_active(aio)) { - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, rv); - } - nni_mtx_unlock(&s->m); -} - -static void -pull0_sock_recv(void *arg, nni_aio *aio) -{ - pull0_sock *s = arg; - pull0_pipe *p; - - if (nni_aio_begin(aio) != 0) { - return; - } - - nni_mtx_lock(&s->m); - if ((p = nni_list_first(&s->pl)) == NULL) { - - int rv; - if ((rv = nni_aio_schedule(aio, pull0_cancel, s)) != 0) { - nni_mtx_unlock(&s->m); - nni_aio_finish_error(aio, rv); - return; - } - - nni_aio_list_append(&s->rq, aio); - nni_mtx_unlock(&s->m); - return; - } - - nni_list_remove(&s->pl, p); - if (nni_list_empty(&s->pl)) { - nni_pollable_clear(&s->readable); - } - nni_aio_finish_msg(aio, p->m); - p->m = NULL; - nni_pipe_recv(p->p, &p->aio); - nni_mtx_unlock(&s->m); -} - -static int -pull0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) -{ - pull0_sock *s = arg; - int rv; - int fd; - - if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) { - return (rv); - } - return (nni_copyout_int(fd, buf, szp, t)); -} - -static nni_option pull0_sock_options[] = { - { - .o_name = NNG_OPT_RECVFD, - .o_get = pull0_sock_get_recv_fd, - }, - // terminate list - { - .o_name = NULL, - }, -}; - -static nni_proto_pipe_ops pull0_pipe_ops = { - .pipe_size = sizeof(pull0_pipe), - .pipe_init = pull0_pipe_init, - .pipe_fini = pull0_pipe_fini, - .pipe_start = pull0_pipe_start, - .pipe_close = pull0_pipe_close, - .pipe_stop = pull0_pipe_stop, -}; - -static nni_proto_sock_ops pull0_sock_ops = { - .sock_size = sizeof(pull0_sock), - .sock_init = pull0_sock_init, - .sock_fini = pull0_sock_fini, - .sock_open = pull0_sock_open, - .sock_close = pull0_sock_close, - .sock_send = pull0_sock_send, - .sock_recv = pull0_sock_recv, - .sock_options = pull0_sock_options, -}; - -static nni_proto pull0_proto = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_PULL_V0, "pull" }, - .proto_peer = { NNI_PROTO_PUSH_V0, "push" }, - .proto_flags = NNI_PROTO_FLAG_RCV, - .proto_pipe_ops = &pull0_pipe_ops, - .proto_sock_ops = &pull0_sock_ops, -}; - -static nni_proto pull0_proto_raw = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_PULL_V0, "pull" }, - .proto_peer = { NNI_PROTO_PUSH_V0, "push" }, - .proto_flags = NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_RAW, - .proto_pipe_ops = &pull0_pipe_ops, - .proto_sock_ops = &pull0_sock_ops, -}; - -int -nng_pull0_open(nng_socket *s) -{ - return (nni_proto_open(s, &pull0_proto)); -} - -int -nng_pull0_open_raw(nng_socket *s) -{ - return (nni_proto_open(s, &pull0_proto_raw)); -} diff --git a/src/protocol/pipeline0/pull_test.c b/src/protocol/pipeline0/pull_test.c deleted file mode 100644 index 25066093..00000000 --- a/src/protocol/pipeline0/pull_test.c +++ /dev/null @@ -1,264 +0,0 @@ -// -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include <nuts.h> - -static void -test_pull_identity(void) -{ - nng_socket s; - int p; - char * n; - - NUTS_PASS(nng_pull0_open(&s)); - NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); - NUTS_TRUE(p == NUTS_PROTO(5u, 1u)); // 81 - NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p)); - NUTS_TRUE(p == NUTS_PROTO(5u, 0u)); // 80 - NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n)); - NUTS_MATCH(n, "pull"); - nng_strfree(n); - NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n)); - NUTS_MATCH(n, "push"); - nng_strfree(n); - NUTS_CLOSE(s); -} - -static void -test_pull_cannot_send(void) -{ - nng_socket s; - - NUTS_PASS(nng_pull0_open(&s)); - NUTS_FAIL(nng_send(s, "", 0, 0), NNG_ENOTSUP); - NUTS_CLOSE(s); -} - -static void -test_pull_no_context(void) -{ - nng_socket s; - nng_ctx ctx; - - NUTS_PASS(nng_pull0_open(&s)); - NUTS_FAIL(nng_ctx_open(&ctx, s), NNG_ENOTSUP); - NUTS_CLOSE(s); -} - -static void -test_pull_not_writeable(void) -{ - int fd; - nng_socket s; - - NUTS_PASS(nng_pull0_open(&s)); - NUTS_FAIL(nng_socket_get_int(s, NNG_OPT_SENDFD, &fd), NNG_ENOTSUP); - NUTS_CLOSE(s); -} - -static void -test_pull_poll_readable(void) -{ - int fd; - nng_socket pull; - nng_socket push; - - NUTS_PASS(nng_pull0_open(&pull)); - NUTS_PASS(nng_push0_open(&push)); - NUTS_PASS(nng_socket_set_ms(pull, NNG_OPT_RECVTIMEO, 1000)); - NUTS_PASS(nng_socket_set_ms(push, NNG_OPT_SENDTIMEO, 1000)); - NUTS_PASS(nng_socket_get_int(pull, NNG_OPT_RECVFD, &fd)); - NUTS_TRUE(fd >= 0); - - // Not readable if not connected! - NUTS_TRUE(nuts_poll_fd(fd) == false); - - // Even after connect (no message yet) - NUTS_MARRY(pull, push); - NUTS_TRUE(nuts_poll_fd(fd) == false); - - // But once we send messages, it is. - // We have to send a request, in order to send a reply. - NUTS_SEND(push, "abc"); - NUTS_SLEEP(100); - NUTS_TRUE(nuts_poll_fd(fd)); - - // and receiving makes it no longer ready - NUTS_RECV(pull, "abc"); - NUTS_TRUE(nuts_poll_fd(fd) == false); - - NUTS_CLOSE(pull); - NUTS_CLOSE(push); -} - -static void -test_pull_close_pending(void) -{ - int fd; - nng_socket pull; - nng_socket push; - nng_pipe p1, p2; - char * addr; - - NUTS_ADDR(addr, "inproc"); - - NUTS_PASS(nng_pull0_open(&pull)); - NUTS_PASS(nng_push0_open(&push)); - NUTS_PASS(nng_socket_get_int(pull, NNG_OPT_RECVFD, &fd)); - NUTS_TRUE(fd >= 0); - NUTS_MARRY_EX(pull, push, addr, &p1, &p2); - - // Send a message -- it's ready for reading. - NUTS_SEND(push, "abc"); - NUTS_SLEEP(100); - NUTS_TRUE(nuts_poll_fd(fd)); - - // NB: We have to close the pipe instead of the socket. - // This is because the socket won't notice the remote pipe - // disconnect until we collect the message and start another - // receive operation. - nng_pipe_close(p1); - nng_pipe_close(p2); - - NUTS_SLEEP(100); - NUTS_TRUE(nuts_poll_fd(fd) == false); - - NUTS_CLOSE(pull); - NUTS_CLOSE(push); -} - -void -test_pull_validate_peer(void) -{ - nng_socket s1, s2; - nng_stat * stats; - nng_stat * reject; - char * addr; - - NUTS_ADDR(addr, "inproc"); - - NUTS_PASS(nng_pull0_open(&s1)); - NUTS_PASS(nng_pull0_open(&s2)); - - NUTS_PASS(nng_listen(s1, addr, NULL, 0)); - NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK)); - - NUTS_SLEEP(100); - NUTS_PASS(nng_stats_get(&stats)); - - NUTS_TRUE(stats != NULL); - NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL); - NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL); - - NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER); - NUTS_TRUE(nng_stat_value(reject) > 0); - - NUTS_CLOSE(s1); - NUTS_CLOSE(s2); - nng_stats_free(stats); -} - -static void -test_pull_recv_aio_stopped(void) -{ - nng_socket s; - nng_aio * aio; - - NUTS_PASS(nng_pull0_open(&s)); - NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - - nng_aio_stop(aio); - nng_recv_aio(s, aio); - nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); - NUTS_CLOSE(s); - nng_aio_free(aio); -} - -static void -test_pull_close_recv(void) -{ - nng_socket s; - nng_aio * aio; - - NUTS_PASS(nng_pull0_open(&s)); - NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - nng_aio_set_timeout(aio, 1000); - nng_recv_aio(s, aio); - NUTS_PASS(nng_close(s)); - nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED); - - nng_aio_free(aio); -} - -static void -test_pull_recv_nonblock(void) -{ - nng_socket s; - nng_aio * aio; - - NUTS_PASS(nng_pull0_open(&s)); - NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - - nng_aio_set_timeout(aio, 0); // Instant timeout - nng_recv_aio(s, aio); - - nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT); - NUTS_CLOSE(s); - nng_aio_free(aio); -} - -static void -test_pull_recv_cancel(void) -{ - nng_socket s; - nng_aio * aio; - - NUTS_PASS(nng_pull0_open(&s)); - NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - - nng_aio_set_timeout(aio, 1000); - nng_recv_aio(s, aio); - nng_aio_abort(aio, NNG_ECANCELED); - - nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); - NUTS_CLOSE(s); - nng_aio_free(aio); -} - -static void -test_pull_cooked(void) -{ - nng_socket s; - bool b; - - NUTS_PASS(nng_pull0_open(&s)); - NUTS_PASS(nng_socket_get_bool(s, NNG_OPT_RAW, &b)); - NUTS_TRUE(!b); - NUTS_CLOSE(s); -} - -TEST_LIST = { - { "pull identity", test_pull_identity }, - { "pull cannot send", test_pull_cannot_send }, - { "pull no context", test_pull_no_context }, - { "pull not writeable", test_pull_not_writeable }, - { "pull poll readable", test_pull_poll_readable }, - { "pull close pending", test_pull_close_pending }, - { "pull validate peer", test_pull_validate_peer }, - { "pull recv aio stopped", test_pull_recv_aio_stopped }, - { "pull close recv", test_pull_close_recv }, - { "pull recv nonblock", test_pull_recv_nonblock }, - { "pull recv cancel", test_pull_recv_cancel }, - { "pull cooked", test_pull_cooked }, - { NULL, NULL }, -}; diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c deleted file mode 100644 index ad43d967..00000000 --- a/src/protocol/pipeline0/push.c +++ /dev/null @@ -1,442 +0,0 @@ -// -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include <stdlib.h> - -#include "core/nng_impl.h" -#include "nng/protocol/pipeline0/push.h" - -// Push protocol. The PUSH protocol is the "write" side of a pipeline. -// Push distributes fairly, or tries to, by giving messages in round-robin -// order. - -#ifndef NNI_PROTO_PULL_V0 -#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1) -#endif - -#ifndef NNI_PROTO_PUSH_V0 -#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0) -#endif - -typedef struct push0_pipe push0_pipe; -typedef struct push0_sock push0_sock; - -static void push0_send_cb(void *); -static void push0_recv_cb(void *); -static void push0_pipe_ready(push0_pipe *); - -// push0_sock is our per-socket protocol private structure. -struct push0_sock { - nni_lmq wq; // list of messages queued - nni_list aq; // list of aio senders waiting - nni_list pl; // list of pipes ready to send - nni_pollable writable; - nni_mtx m; -}; - -// push0_pipe is our per-pipe protocol private structure. -struct push0_pipe { - nni_pipe * pipe; - push0_sock * push; - nni_list_node node; - - nni_aio aio_recv; - nni_aio aio_send; -}; - -static int -push0_sock_init(void *arg, nni_sock *sock) -{ - push0_sock *s = arg; - NNI_ARG_UNUSED(sock); - - nni_mtx_init(&s->m); - nni_aio_list_init(&s->aq); - NNI_LIST_INIT(&s->pl, push0_pipe, node); - nni_lmq_init(&s->wq, 0); // initially we start unbuffered. - nni_pollable_init(&s->writable); - - return (0); -} - -static void -push0_sock_fini(void *arg) -{ - push0_sock *s = arg; - nni_pollable_fini(&s->writable); - nni_lmq_fini(&s->wq); - nni_mtx_fini(&s->m); -} - -static void -push0_sock_open(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - -static void -push0_sock_close(void *arg) -{ - push0_sock *s = arg; - nni_aio * a; - nni_mtx_lock(&s->m); - while ((a = nni_list_first(&s->aq)) != NULL) { - nni_aio_list_remove(a); - nni_aio_finish_error(a, NNG_ECLOSED); - } - nni_mtx_unlock(&s->m); -} - -static void -push0_pipe_stop(void *arg) -{ - push0_pipe *p = arg; - - nni_aio_stop(&p->aio_recv); - nni_aio_stop(&p->aio_send); -} - -static void -push0_pipe_fini(void *arg) -{ - push0_pipe *p = arg; - - nni_aio_fini(&p->aio_recv); - nni_aio_fini(&p->aio_send); -} - -static int -push0_pipe_init(void *arg, nni_pipe *pipe, void *s) -{ - push0_pipe *p = arg; - - nni_aio_init(&p->aio_recv, push0_recv_cb, p); - nni_aio_init(&p->aio_send, push0_send_cb, p); - NNI_LIST_NODE_INIT(&p->node); - p->pipe = pipe; - p->push = s; - return (0); -} - -static int -push0_pipe_start(void *arg) -{ - push0_pipe *p = arg; - - if (nni_pipe_peer(p->pipe) != NNI_PROTO_PULL_V0) { - return (NNG_EPROTO); - } - - // Schedule a receiver. This is mostly so that we can detect - // a closed transport pipe. - nni_pipe_recv(p->pipe, &p->aio_recv); - push0_pipe_ready(p); - - return (0); -} - -static void -push0_pipe_close(void *arg) -{ - push0_pipe *p = arg; - push0_sock *s = p->push; - - nni_aio_close(&p->aio_recv); - nni_aio_close(&p->aio_send); - - nni_mtx_lock(&s->m); - if (nni_list_node_active(&p->node)) { - nni_list_node_remove(&p->node); - - if (nni_list_empty(&s->pl) && nni_lmq_full(&s->wq)) { - nni_pollable_clear(&s->writable); - } - } - nni_mtx_unlock(&s->m); -} - -static void -push0_recv_cb(void *arg) -{ - push0_pipe *p = arg; - - // We normally expect to receive an error. If a pipe actually - // sends us data, we just discard it. - if (nni_aio_result(&p->aio_recv) != 0) { - nni_pipe_close(p->pipe); - return; - } - nni_msg_free(nni_aio_get_msg(&p->aio_recv)); - nni_aio_set_msg(&p->aio_recv, NULL); - nni_pipe_recv(p->pipe, &p->aio_recv); -} - -static void -push0_pipe_ready(push0_pipe *p) -{ - push0_sock *s = p->push; - nni_msg * m; - nni_aio * a = NULL; - size_t l; - bool blocked; - - nni_mtx_lock(&s->m); - - blocked = nni_lmq_full(&s->wq) && nni_list_empty(&s->pl); - - // if message is waiting in the buffered queue - // then we prefer that. - if (nni_lmq_getq(&s->wq, &m) == 0) { - nni_aio_set_msg(&p->aio_send, m); - nni_pipe_send(p->pipe, &p->aio_send); - - if ((a = nni_list_first(&s->aq)) != NULL) { - nni_aio_list_remove(a); - m = nni_aio_get_msg(a); - l = nni_msg_len(m); - nni_lmq_putq(&s->wq, m); - } - - } else if ((a = nni_list_first(&s->aq)) != NULL) { - // Looks like we had the unbuffered case, but - // someone was waiting. - nni_aio_list_remove(a); - m = nni_aio_get_msg(a); - l = nni_msg_len(m); - - nni_aio_set_msg(&p->aio_send, m); - nni_pipe_send(p->pipe, &p->aio_send); - } else { - // We had nothing to send. Just put us in the ready list. - nni_list_append(&s->pl, p); - } - - if (blocked) { - // if we blocked, then toggle the status. - if ((!nni_lmq_full(&s->wq)) || (!nni_list_empty(&s->pl))) { - nni_pollable_raise(&s->writable); - } - } - - nni_mtx_unlock(&s->m); - - if (a != NULL) { - nni_aio_set_msg(a, NULL); - nni_aio_finish_sync(a, 0, l); - } -} - -static void -push0_send_cb(void *arg) -{ - push0_pipe *p = arg; - - if (nni_aio_result(&p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(&p->aio_send)); - nni_aio_set_msg(&p->aio_send, NULL); - nni_pipe_close(p->pipe); - return; - } - - push0_pipe_ready(p); -} - -static void -push0_cancel(nni_aio *aio, void *arg, int rv) -{ - push0_sock *s = arg; - - nni_mtx_lock(&s->m); - if (nni_aio_list_active(aio)) { - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, rv); - } - nni_mtx_unlock(&s->m); -} - -static void -push0_sock_send(void *arg, nni_aio *aio) -{ - push0_sock *s = arg; - push0_pipe *p; - nni_msg * m; - size_t l; - int rv; - - if (nni_aio_begin(aio) != 0) { - return; - } - - m = nni_aio_get_msg(aio); - l = nni_msg_len(m); - - nni_mtx_lock(&s->m); - - // First we want to see if we can send it right now. - // Note that we don't block the sender until the read is complete, - // only until we have committed to send it. - if ((p = nni_list_first(&s->pl)) != NULL) { - nni_list_remove(&s->pl, p); - // NB: We won't have had any waiters in the message queue - // or the aio queue, because we would not put the pipe - // in the ready list in that case. Note though that the - // wq may be "full" if we are unbuffered. - if (nni_list_empty(&s->pl) && (nni_lmq_full(&s->wq))) { - nni_pollable_clear(&s->writable); - } - nni_aio_set_msg(aio, NULL); - nni_aio_finish(aio, 0, l); - nni_aio_set_msg(&p->aio_send, m); - nni_pipe_send(p->pipe, &p->aio_send); - nni_mtx_unlock(&s->m); - return; - } - - // Can we maybe queue it. - if (nni_lmq_putq(&s->wq, m) == 0) { - // Yay, we can. So we're done. - nni_aio_set_msg(aio, NULL); - nni_aio_finish(aio, 0, l); - if (nni_lmq_full(&s->wq)) { - nni_pollable_clear(&s->writable); - } - nni_mtx_unlock(&s->m); - return; - } - - if ((rv = nni_aio_schedule(aio, push0_cancel, s)) != 0) { - nni_aio_finish_error(aio, rv); - nni_mtx_unlock(&s->m); - return; - } - nni_aio_list_append(&s->aq, aio); - nni_mtx_unlock(&s->m); -} - -static void -push0_sock_recv(void *arg, nni_aio *aio) -{ - NNI_ARG_UNUSED(arg); - nni_aio_finish_error(aio, NNG_ENOTSUP); -} - -static int -push0_set_send_buf_len(void *arg, const void *buf, size_t sz, nni_type t) -{ - push0_sock *s = arg; - int val; - int rv; - - if ((rv = nni_copyin_int(&val, buf, sz, 0, 8192, t)) != 0) { - return (rv); - } - nni_mtx_lock(&s->m); - rv = nni_lmq_resize(&s->wq, (size_t) val); - // Changing the size of the queue can affect our readiness. - if (!nni_lmq_full(&s->wq)) { - nni_pollable_raise(&s->writable); - } else if (nni_list_empty(&s->pl)) { - nni_pollable_clear(&s->writable); - } - nni_mtx_unlock(&s->m); - return (rv); -} - -static int -push0_get_send_buf_len(void *arg, void *buf, size_t *szp, nni_opt_type t) -{ - push0_sock *s = arg; - int val; - - nni_mtx_lock(&s->m); - val = nni_lmq_cap(&s->wq); - nni_mtx_unlock(&s->m); - - return (nni_copyout_int(val, buf, szp, t)); -} - -static int -push0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) -{ - push0_sock *s = arg; - int rv; - int fd; - - if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) { - return (rv); - } - return (nni_copyout_int(fd, buf, szp, t)); -} - -static nni_proto_pipe_ops push0_pipe_ops = { - .pipe_size = sizeof(push0_pipe), - .pipe_init = push0_pipe_init, - .pipe_fini = push0_pipe_fini, - .pipe_start = push0_pipe_start, - .pipe_close = push0_pipe_close, - .pipe_stop = push0_pipe_stop, -}; - -static nni_option push0_sock_options[] = { - { - .o_name = NNG_OPT_SENDFD, - .o_get = push0_sock_get_send_fd, - }, - { - .o_name = NNG_OPT_SENDBUF, - .o_get = push0_get_send_buf_len, - .o_set = push0_set_send_buf_len, - }, - // terminate list - { - .o_name = NULL, - }, -}; - -static nni_proto_sock_ops push0_sock_ops = { - .sock_size = sizeof(push0_sock), - .sock_init = push0_sock_init, - .sock_fini = push0_sock_fini, - .sock_open = push0_sock_open, - .sock_close = push0_sock_close, - .sock_options = push0_sock_options, - .sock_send = push0_sock_send, - .sock_recv = push0_sock_recv, -}; - -static nni_proto push0_proto = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_PUSH_V0, "push" }, - .proto_peer = { NNI_PROTO_PULL_V0, "pull" }, - .proto_flags = NNI_PROTO_FLAG_SND, - .proto_pipe_ops = &push0_pipe_ops, - .proto_sock_ops = &push0_sock_ops, -}; - -static nni_proto push0_proto_raw = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_PUSH_V0, "push" }, - .proto_peer = { NNI_PROTO_PULL_V0, "pull" }, - .proto_flags = NNI_PROTO_FLAG_SND | NNI_PROTO_FLAG_RAW, - .proto_pipe_ops = &push0_pipe_ops, - .proto_sock_ops = &push0_sock_ops, -}; - -int -nng_push0_open(nng_socket *s) -{ - return (nni_proto_open(s, &push0_proto)); -} - -int -nng_push0_open_raw(nng_socket *s) -{ - return (nni_proto_open(s, &push0_proto_raw)); -} diff --git a/src/protocol/pipeline0/push_test.c b/src/protocol/pipeline0/push_test.c deleted file mode 100644 index d22ccaa4..00000000 --- a/src/protocol/pipeline0/push_test.c +++ /dev/null @@ -1,525 +0,0 @@ -// -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include <nuts.h> - -static void -test_push_identity(void) -{ - nng_socket s; - int p; - char * n; - - NUTS_PASS(nng_push0_open(&s)); - NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p)); - NUTS_TRUE(p == NUTS_PROTO(5u, 0u)); // 80 - NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p)); - NUTS_TRUE(p == NUTS_PROTO(5u, 1u)); // 81 - NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n)); - NUTS_MATCH(n, "push"); - nng_strfree(n); - NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n)); - NUTS_MATCH(n, "pull"); - nng_strfree(n); - NUTS_CLOSE(s); -} - -static void -test_push_cannot_recv(void) -{ - nng_socket s; - nng_msg * m = NULL; - - NUTS_PASS(nng_push0_open(&s)); - NUTS_FAIL(nng_recvmsg(s, &m, 0), NNG_ENOTSUP); - NUTS_CLOSE(s); -} - -static void -test_push_no_context(void) -{ - nng_socket s; - nng_ctx ctx; - - NUTS_PASS(nng_push0_open(&s)); - NUTS_FAIL(nng_ctx_open(&ctx, s), NNG_ENOTSUP); - NUTS_CLOSE(s); -} - -static void -test_push_not_readable(void) -{ - int fd; - nng_socket s; - - NUTS_PASS(nng_push0_open(&s)); - NUTS_FAIL(nng_socket_get_int(s, NNG_OPT_RECVFD, &fd), NNG_ENOTSUP); - NUTS_CLOSE(s); -} - -static void -test_push_poll_writable(void) -{ - int fd; - nng_socket pull; - nng_socket push; - - NUTS_PASS(nng_pull0_open(&pull)); - NUTS_PASS(nng_push0_open(&push)); - NUTS_PASS(nng_socket_set_ms(pull, NNG_OPT_RECVTIMEO, 1000)); - NUTS_PASS(nng_socket_set_ms(push, NNG_OPT_SENDTIMEO, 1000)); - NUTS_PASS(nng_socket_get_int(push, NNG_OPT_SENDFD, &fd)); - NUTS_TRUE(fd >= 0); - - // This tests unbuffered sockets for now. - // Note that for this we are using unbuffered inproc. - // If using TCP or similar, then transport buffering will - // break assumptions in this test. - - // Not writable if not connected! - NUTS_TRUE(nuts_poll_fd(fd) == false); - - // After connect we can write. - NUTS_MARRY(pull, push); - NUTS_TRUE(nuts_poll_fd(fd) == true); - - // But once we send a message, it is not anymore. - NUTS_SEND(push, "abc"); - // Have to send a second message, because the remote socket - // will have consumed the first one. - NUTS_SEND(push, "def"); - NUTS_SLEEP(100); - NUTS_TRUE(nuts_poll_fd(fd) == false); - - // and receiving receiving the message makes it possible again. - NUTS_RECV(pull, "abc"); - NUTS_SLEEP(100); - NUTS_TRUE(nuts_poll_fd(fd)); - - NUTS_CLOSE(pull); - NUTS_CLOSE(push); -} - -static void -test_push_poll_buffered(void) -{ - int fd; - nng_socket pull; - nng_socket push; - - NUTS_PASS(nng_pull0_open(&pull)); - NUTS_PASS(nng_push0_open(&push)); - NUTS_PASS(nng_socket_set_ms(pull, NNG_OPT_RECVTIMEO, 1000)); - NUTS_PASS(nng_socket_set_ms(push, NNG_OPT_SENDTIMEO, 1000)); - NUTS_PASS(nng_socket_set_int(push, NNG_OPT_SENDBUF, 2)); - NUTS_PASS(nng_socket_get_int(push, NNG_OPT_SENDFD, &fd)); - NUTS_TRUE(fd >= 0); - - // We can write two message while unbuffered. - NUTS_TRUE(nuts_poll_fd(fd)); - NUTS_SEND(push, "abc"); - NUTS_TRUE(nuts_poll_fd(fd)); - NUTS_SEND(push, "def"); - NUTS_TRUE(nuts_poll_fd(fd) == false); - - // After connect we remote end will pick up one of them. - // Also, the local pipe itself will pick up one. So we - // have two. - NUTS_MARRY(pull, push); - NUTS_SLEEP(100); - NUTS_TRUE(nuts_poll_fd(fd)); - NUTS_SEND(push, "ghi"); - NUTS_SLEEP(100); - NUTS_TRUE(nuts_poll_fd(fd)); - NUTS_SEND(push, "jkl"); - // Now it should be full. - NUTS_TRUE(nuts_poll_fd(fd) == false); - - // and receiving receiving the message makes it possible again. - NUTS_RECV(pull, "abc"); - NUTS_SLEEP(100); - NUTS_TRUE(nuts_poll_fd(fd)); - NUTS_RECV(pull, "def"); - NUTS_RECV(pull, "ghi"); - NUTS_RECV(pull, "jkl"); - - NUTS_CLOSE(pull); - NUTS_CLOSE(push); -} - -static void -test_push_poll_truncate(void) -{ - int fd; - nng_socket pull; - nng_socket push; - - // This test starts with a buffer and then truncates it to see - // that shortening the buffer has an impact. - - NUTS_PASS(nng_pull0_open(&pull)); - NUTS_PASS(nng_push0_open(&push)); - NUTS_PASS(nng_socket_set_ms(pull, NNG_OPT_RECVTIMEO, 1000)); - NUTS_PASS(nng_socket_set_ms(push, NNG_OPT_SENDTIMEO, 1000)); - NUTS_PASS(nng_socket_set_int(push, NNG_OPT_SENDBUF, 3)); - NUTS_PASS(nng_socket_get_int(push, NNG_OPT_SENDFD, &fd)); - NUTS_TRUE(fd >= 0); - - // We can write two message while unbuffered. - NUTS_TRUE(nuts_poll_fd(fd)); - NUTS_SEND(push, "abc"); - NUTS_TRUE(nuts_poll_fd(fd)); - NUTS_SEND(push, "def"); - NUTS_TRUE(nuts_poll_fd(fd)); - - NUTS_PASS(nng_socket_set_int(push, NNG_OPT_SENDBUF, 1)); - NUTS_TRUE(nuts_poll_fd(fd) == false); - - NUTS_MARRY(pull, push); - NUTS_RECV(pull, "abc"); - // def got dropped - NUTS_SEND(push, "ghi"); - NUTS_RECV(pull, "ghi"); - - NUTS_CLOSE(pull); - NUTS_SLEEP(100); - - // We have a buffer of one. - NUTS_SEND(push, "jkl"); - // Resize to 0 (unbuffered) - NUTS_PASS(nng_socket_set_int(push, NNG_OPT_SENDBUF, 0)); - - // reopen the pull socket and connect it - NUTS_PASS(nng_pull0_open(&pull)); - NUTS_MARRY(push, pull); - - // jkl got dropped. - NUTS_SEND(push, "mno"); - NUTS_RECV(pull, "mno"); - - NUTS_CLOSE(pull); - NUTS_CLOSE(push); -} - -void -test_push_validate_peer(void) -{ - nng_socket s1, s2; - nng_stat * stats; - nng_stat * reject; - char * addr; - - NUTS_ADDR(addr, "inproc"); - - NUTS_PASS(nng_push0_open(&s1)); - NUTS_PASS(nng_push0_open(&s2)); - - NUTS_PASS(nng_listen(s1, addr, NULL, 0)); - NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK)); - - NUTS_SLEEP(100); - NUTS_PASS(nng_stats_get(&stats)); - - NUTS_TRUE(stats != NULL); - NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL); - NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL); - - NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER); - NUTS_TRUE(nng_stat_value(reject) > 0); - - NUTS_CLOSE(s1); - NUTS_CLOSE(s2); - nng_stats_free(stats); -} - -static void -test_push_send_aio_stopped(void) -{ - nng_socket s; - nng_aio * aio; - nng_msg * m; - - NUTS_PASS(nng_push0_open(&s)); - NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - NUTS_PASS(nng_msg_alloc(&m, 0)); - - nng_aio_set_msg(aio, m); - nng_aio_stop(aio); - nng_send_aio(s, aio); - nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); - NUTS_CLOSE(s); - nng_aio_free(aio); - nng_msg_free(m); -} - -static void -test_push_close_send(void) -{ - nng_socket s; - nng_aio * aio; - nng_msg * m; - - NUTS_PASS(nng_push0_open(&s)); - NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - NUTS_PASS(nng_msg_alloc(&m, 0)); - nng_aio_set_timeout(aio, 1000); - nng_aio_set_msg(aio, m); - nng_send_aio(s, aio); - NUTS_PASS(nng_close(s)); - nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ECLOSED); - - nng_aio_free(aio); - nng_msg_free(m); -} - -static void -test_push_send_nonblock(void) -{ - nng_socket s; - nng_aio * aio; - nng_msg * m; - - NUTS_PASS(nng_push0_open(&s)); - NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - NUTS_PASS(nng_msg_alloc(&m, 0)); - - nng_aio_set_timeout(aio, 0); // Instant timeout - nng_aio_set_msg(aio, m); - nng_send_aio(s, aio); - - nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT); - NUTS_CLOSE(s); - nng_aio_free(aio); - nng_msg_free(m); -} - -static void -test_push_send_timeout(void) -{ - nng_socket s; - nng_aio * aio; - nng_msg * m; - - NUTS_PASS(nng_push0_open(&s)); - NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - NUTS_PASS(nng_msg_alloc(&m, 0)); - - nng_aio_set_timeout(aio, 10); - nng_aio_set_msg(aio, m); - nng_send_aio(s, aio); - - nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT); - NUTS_CLOSE(s); - nng_aio_free(aio); - nng_msg_free(m); -} - -static void -test_push_send_cancel(void) -{ - nng_socket s; - nng_aio * aio; - nng_msg * m; - - NUTS_PASS(nng_push0_open(&s)); - NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - NUTS_PASS(nng_msg_alloc(&m, 0)); - - nng_aio_set_timeout(aio, 1000); - nng_aio_set_msg(aio, m); - nng_send_aio(s, aio); - nng_aio_abort(aio, NNG_ECANCELED); - - nng_aio_wait(aio); - NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED); - NUTS_CLOSE(s); - nng_aio_free(aio); - nng_msg_free(m); -} - -static void -test_push_send_late_unbuffered(void) -{ - nng_socket s; - nng_socket pull; - nng_aio * aio; - nng_msg * m; - - NUTS_PASS(nng_push0_open(&s)); - NUTS_PASS(nng_pull0_open(&pull)); - NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - NUTS_PASS(nng_msg_alloc(&m, 0)); - NUTS_PASS(nng_msg_append(m, "123\0", 4)); - - nng_aio_set_timeout(aio, 1000); - nng_aio_set_msg(aio, m); - nng_send_aio(s, aio); - - NUTS_MARRY(s, pull); - - NUTS_RECV(pull, "123"); - - nng_aio_wait(aio); - NUTS_PASS(nng_aio_result(aio)); - NUTS_CLOSE(s); - nng_aio_free(aio); -} - - -static void -test_push_send_late_buffered(void) -{ - nng_socket s; - nng_socket pull; - nng_aio * aio; - nng_msg * m; - - NUTS_PASS(nng_push0_open(&s)); - NUTS_PASS(nng_pull0_open(&pull)); - NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); - NUTS_PASS(nng_setopt_int(s, NNG_OPT_SENDBUF, 2)); - NUTS_PASS(nng_msg_alloc(&m, 0)); - NUTS_PASS(nng_msg_append(m, "123\0", 4)); - - nng_aio_set_timeout(aio, 1000); - nng_aio_set_msg(aio, m); - nng_send_aio(s, aio); - - NUTS_MARRY(s, pull); - - NUTS_RECV(pull, "123"); - - nng_aio_wait(aio); - NUTS_PASS(nng_aio_result(aio)); - NUTS_CLOSE(s); - nng_aio_free(aio); -} - -static void -test_push_cooked(void) -{ - nng_socket s; - bool b; - - NUTS_PASS(nng_push0_open(&s)); - NUTS_PASS(nng_socket_get_bool(s, NNG_OPT_RAW, &b)); - NUTS_TRUE(!b); - NUTS_CLOSE(s); -} - -static void -test_push_load_balance_buffered(void) -{ - nng_socket s; - nng_socket pull1; - nng_socket pull2; - nng_socket pull3; - - NUTS_PASS(nng_push0_open(&s)); - NUTS_PASS(nng_pull0_open(&pull1)); - NUTS_PASS(nng_pull0_open(&pull2)); - NUTS_PASS(nng_pull0_open(&pull3)); - NUTS_PASS(nng_setopt_int(s, NNG_OPT_SENDBUF, 4)); - NUTS_MARRY(s, pull1); - NUTS_MARRY(s, pull2); - NUTS_MARRY(s, pull3); - NUTS_SLEEP(100); - NUTS_SEND(s, "one"); - NUTS_SEND(s, "two"); - NUTS_SEND(s, "three"); - NUTS_RECV(pull1, "one"); - NUTS_RECV(pull2, "two"); - NUTS_RECV(pull3, "three"); - NUTS_CLOSE(s); - NUTS_CLOSE(pull1); - NUTS_CLOSE(pull2); - NUTS_CLOSE(pull3); -} - -static void -test_push_load_balance_unbuffered(void) -{ - nng_socket s; - nng_socket pull1; - nng_socket pull2; - nng_socket pull3; - - NUTS_PASS(nng_push0_open(&s)); - NUTS_PASS(nng_pull0_open(&pull1)); - NUTS_PASS(nng_pull0_open(&pull2)); - NUTS_PASS(nng_pull0_open(&pull3)); - NUTS_MARRY(s, pull1); - NUTS_MARRY(s, pull2); - NUTS_MARRY(s, pull3); - NUTS_SLEEP(100); - NUTS_SEND(s, "one"); - NUTS_SEND(s, "two"); - NUTS_SEND(s, "three"); - NUTS_RECV(pull1, "one"); - NUTS_RECV(pull2, "two"); - NUTS_RECV(pull3, "three"); - // Loop around is unpredictable somewhat, because the the - // pull sockets can take different periods of time to return - // back to readiness. - NUTS_CLOSE(s); - NUTS_CLOSE(pull1); - NUTS_CLOSE(pull2); - NUTS_CLOSE(pull3); -} - -static void -test_push_send_buffer(void) -{ - nng_socket s; - int v; - bool b; - size_t sz; - - NUTS_PASS(nng_push0_open(&s)); - NUTS_PASS(nng_getopt_int(s, NNG_OPT_SENDBUF, &v)); - NUTS_TRUE(v == 0); - NUTS_FAIL(nng_getopt_bool(s, NNG_OPT_SENDBUF, &b), NNG_EBADTYPE); - sz = 1; - NUTS_FAIL(nng_getopt(s, NNG_OPT_SENDBUF, &b, &sz), NNG_EINVAL); - NUTS_FAIL(nng_setopt_int(s, NNG_OPT_SENDBUF, -1), NNG_EINVAL); - NUTS_FAIL(nng_setopt_int(s, NNG_OPT_SENDBUF, 100000), NNG_EINVAL); - NUTS_FAIL(nng_setopt_bool(s, NNG_OPT_SENDBUF, false), NNG_EBADTYPE); - NUTS_FAIL(nng_setopt(s, NNG_OPT_SENDBUF, &b, 1), NNG_EINVAL); - NUTS_PASS(nng_setopt_int(s, NNG_OPT_SENDBUF, 100)); - NUTS_PASS(nng_getopt_int(s, NNG_OPT_SENDBUF, &v)); - NUTS_TRUE(v == 100); - NUTS_CLOSE(s); -} - -TEST_LIST = { - { "push identity", test_push_identity }, - { "push cannot recv", test_push_cannot_recv }, - { "push no context", test_push_no_context }, - { "push not readable", test_push_not_readable }, - { "push poll writable", test_push_poll_writable }, - { "push poll buffered", test_push_poll_buffered }, - { "push poll truncate", test_push_poll_truncate }, - { "push validate peer", test_push_validate_peer }, - { "push send aio stopped", test_push_send_aio_stopped }, - { "push close send", test_push_close_send }, - { "push send nonblock", test_push_send_nonblock }, - { "push send timeout", test_push_send_timeout }, - { "push send cancel", test_push_send_cancel }, - { "push send late buffered", test_push_send_late_buffered }, - { "push send late unbuffered", test_push_send_late_unbuffered }, - { "push cooked", test_push_cooked }, - { "push load balance buffered", test_push_load_balance_buffered }, - { "push load balance unbuffered", test_push_load_balance_unbuffered }, - { "push send buffer", test_push_send_buffer }, - { NULL, NULL }, -}; |
