diff options
| author | Garrett D'Amore <garrett@damore.org> | 2019-12-28 10:55:13 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2019-12-28 12:41:31 -0800 |
| commit | 255a85d10f68a898c671f9387da61e0ba62a61c7 (patch) | |
| tree | 17b93ed72f14b023677234a5cb870efe4f9f4f35 | |
| parent | 50b5f060548987de6f5ac65f39fa0eecb597decb (diff) | |
| download | nng-255a85d10f68a898c671f9387da61e0ba62a61c7.tar.gz nng-255a85d10f68a898c671f9387da61e0ba62a61c7.tar.bz2 nng-255a85d10f68a898c671f9387da61e0ba62a61c7.zip | |
fixes #986 ws_read_finish_str free invalid pointer
Also, this has refactored the websocket stream test to the new
acutest.h, and includes a much deeper test of fragmentation and
reassembly of websocket streams.
| -rw-r--r-- | src/supplemental/websocket/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/supplemental/websocket/websocket.c | 31 | ||||
| -rw-r--r-- | src/supplemental/websocket/websocket_test.c | 416 | ||||
| -rw-r--r-- | tests/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | tests/testutil.c | 12 | ||||
| -rw-r--r-- | tests/testutil.h | 3 | ||||
| -rw-r--r-- | tests/wsstream.c | 204 |
7 files changed, 450 insertions, 220 deletions
diff --git a/src/supplemental/websocket/CMakeLists.txt b/src/supplemental/websocket/CMakeLists.txt index 421c8915..8effa931 100644 --- a/src/supplemental/websocket/CMakeLists.txt +++ b/src/supplemental/websocket/CMakeLists.txt @@ -16,3 +16,4 @@ if (NNG_SUPP_WEBSOCKET) else () nng_sources(stub.c) endif () +nng_test(websocket_test) diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index 29d09c0e..55f4e9e4 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -156,7 +156,8 @@ struct ws_frame { enum ws_type op; bool final; bool masked; - size_t bufsz; // allocated size + size_t asize; // allocated size + uint8_t * adata; uint8_t * buf; nng_aio * aio; }; @@ -341,8 +342,8 @@ ws_make_accept(const char *key, char *accept) static void ws_frame_fini(ws_frame *frame) { - if (frame->bufsz != 0) { - nni_free(frame->buf, frame->bufsz); + if (frame->asize != 0) { + nni_free(frame->adata, frame->asize); } NNI_FREE_STRUCT(frame); } @@ -403,7 +404,7 @@ ws_msg_init_control( frame->head[1] = len & 0x7F; frame->hlen = 2; frame->buf = frame->sdata; - frame->bufsz = 0; + frame->asize = 0; if (ws->server) { frame->masked = false; @@ -446,14 +447,15 @@ ws_frame_prep_tx(nni_ws *ws, ws_frame *frame) } // Potentially allocate space for the data if we need to. // Note that an empty message is legal. - if ((frame->bufsz < frame->len) && (frame->len > 0)) { - nni_free(frame->buf, frame->bufsz); - frame->buf = nni_alloc(frame->len); - if (frame->buf == NULL) { - frame->bufsz = 0; + if ((frame->asize < frame->len) && (frame->len > 0)) { + nni_free(frame->adata, frame->asize); + frame->adata = nni_alloc(frame->len); + if (frame->adata == NULL) { + frame->asize = 0; return (NNG_ENOMEM); } - frame->bufsz = frame->len; + frame->asize = frame->len; + frame->buf = frame->adata; } buf = frame->buf; @@ -1109,15 +1111,16 @@ ws_read_cb(void *arg) // Short frames can avoid an alloc if (frame->len < 126) { frame->buf = frame->sdata; - frame->bufsz = 0; + frame->asize = 0; } else { - frame->buf = nni_alloc(frame->len); - if (frame->buf == NULL) { + frame->adata = nni_alloc(frame->len); + if (frame->adata == NULL) { ws_close(ws, WS_CLOSE_INTERNAL); nni_mtx_unlock(&ws->mtx); return; } - frame->bufsz = frame->len; + frame->asize = frame->len; + frame->buf = frame->adata; } iov.iov_buf = frame->buf; diff --git a/src/supplemental/websocket/websocket_test.c b/src/supplemental/websocket/websocket_test.c new file mode 100644 index 00000000..831c6d43 --- /dev/null +++ b/src/supplemental/websocket/websocket_test.c @@ -0,0 +1,416 @@ +// +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// +#include <string.h> + +#include <nng/nng.h> +#include <nng/supplemental/util/platform.h> + +#include "supplemental/sha1/sha1.h" + +#include <acutest.h> +#include <testutil.h> + +void +test_websocket_wildcard(void) +{ + nng_stream_dialer * d = NULL; + nng_stream_listener *l = NULL; + nng_sockaddr sa1; + nng_sockaddr sa2; + size_t sz; + nng_aio * daio = NULL; + nng_aio * laio = NULL; + nng_aio * aio1 = NULL; + nng_aio * aio2 = NULL; + nng_stream * c1 = NULL; + nng_stream * c2 = NULL; + nng_iov iov; + char buf1[8]; + char buf2[8]; + char uri[64]; + + TEST_NNG_PASS(nng_stream_listener_alloc(&l, "ws://127.0.0.1:0/test")); + + TEST_NNG_PASS(nng_stream_listener_listen(l)); + + // Let's get the address we're going to use to dial -- also check + // that it is correct. + sz = sizeof(sa1); + TEST_NNG_PASS(nng_stream_listener_get(l, NNG_OPT_LOCADDR, &sa1, &sz)); + TEST_CHECK(sz == sizeof(sa1)); + TEST_CHECK(sa1.s_in.sa_port != 0); + TEST_CHECK(sa1.s_family == NNG_AF_INET); + TEST_CHECK(testutil_htonl(sa1.s_in.sa_addr) == 0x7F000001u); + + (void) snprintf(uri, sizeof(uri), "ws://127.0.0.1:%d/test", + testutil_htons(sa1.s_in.sa_port)); + + TEST_NNG_PASS(nng_stream_dialer_alloc(&d, uri)); + TEST_NNG_PASS(nng_aio_alloc(&daio, NULL, NULL)); + TEST_NNG_PASS(nng_aio_alloc(&laio, NULL, NULL)); + TEST_NNG_PASS(nng_aio_alloc(&aio1, NULL, NULL)); + TEST_NNG_PASS(nng_aio_alloc(&aio2, NULL, NULL)); + nng_aio_set_timeout(daio, 5000); // 5 seconds + nng_aio_set_timeout(laio, 5000); + nng_aio_set_timeout(aio1, 5000); + nng_aio_set_timeout(aio2, 5000); + + nng_stream_dialer_dial(d, daio); + nng_stream_listener_accept(l, laio); + + nng_aio_wait(laio); + nng_aio_wait(daio); + + TEST_NNG_PASS(nng_aio_result(laio)); + TEST_NNG_PASS(nng_aio_result(daio)); + c1 = nng_aio_get_output(laio, 0); + c2 = nng_aio_get_output(daio, 0); + TEST_CHECK(c1 != NULL); + TEST_CHECK(c2 != NULL); + + // Let's compare the peer addresses + TEST_NNG_PASS(nng_stream_get_addr(c2, NNG_OPT_REMADDR, &sa2)); + TEST_CHECK(sa1.s_family == sa2.s_family); + TEST_CHECK(sa1.s_in.sa_addr == sa2.s_in.sa_addr); + TEST_CHECK(sa1.s_in.sa_port == sa2.s_in.sa_port); + + TEST_NNG_PASS(nng_stream_get_addr(c1, NNG_OPT_REMADDR, &sa1)); + TEST_NNG_PASS(nng_stream_get_addr(c2, NNG_OPT_LOCADDR, &sa2)); + TEST_CHECK_(sa1.s_family == sa2.s_family, "families match %x == %x", + sa1.s_family, sa2.s_family); + TEST_CHECK_(sa1.s_in.sa_addr == sa2.s_in.sa_addr, + "addresses match %x == %x", testutil_htonl(sa1.s_in.sa_addr), + testutil_htonl(sa2.s_in.sa_addr)); + TEST_CHECK_(sa1.s_in.sa_port == sa2.s_in.sa_port, + "ports match %u == %u", testutil_htons(sa1.s_in.sa_port), + testutil_htons(sa2.s_in.sa_port)); + + // This relies on send completing for for just 5 bytes, and on + // recv doing the same. Technically this isn't/ guaranteed, but + // it would be weird to split such a small payload. + + memcpy(buf1, "TEST", 5); + memset(buf2, 0, 5); + iov.iov_buf = buf1; + iov.iov_len = 5; + TEST_NNG_PASS(nng_aio_set_iov(aio1, 1, &iov)); + + iov.iov_buf = buf2; + iov.iov_len = 5; + TEST_NNG_PASS(nng_aio_set_iov(aio2, 1, &iov)); + + nng_stream_send(c1, aio1); + nng_stream_recv(c2, aio2); + nng_aio_wait(aio1); + nng_aio_wait(aio2); + + TEST_NNG_PASS(nng_aio_result(aio1)); + TEST_CHECK(nng_aio_count(aio1) == 5); + + TEST_NNG_PASS(nng_aio_result(aio2)); + TEST_CHECK(nng_aio_count(aio2) == 5); + TEST_CHECK(memcmp(buf1, buf2, 5) == 0); + + nng_stream_close(c1); + nng_stream_free(c1); + nng_stream_close(c2); + nng_stream_free(c2); + nng_aio_free(daio); + nng_aio_free(laio); + nng_aio_free(aio1); + nng_aio_free(aio2); + nng_stream_listener_free(l); + nng_stream_dialer_free(d); +} + +void +test_websocket_conn_props(void) +{ + nng_stream_dialer * d = NULL; + nng_stream_listener *l = NULL; + nng_sockaddr sa1; + nng_sockaddr sa2; + size_t sz; + nng_aio * daio = NULL; + nng_aio * laio = NULL; + nng_stream * c1 = NULL; + nng_stream * c2 = NULL; + char uri[64]; + bool on; + char * str; + uint16_t port = testutil_next_port(); + + (void) snprintf(uri, sizeof(uri), "ws://127.0.0.1:%d/test", port); + + TEST_NNG_PASS(nng_aio_alloc(&daio, NULL, NULL)); + TEST_NNG_PASS(nng_aio_alloc(&laio, NULL, NULL)); + nng_aio_set_timeout(daio, 5000); // 5 seconds + nng_aio_set_timeout(laio, 5000); + + TEST_NNG_PASS(nng_stream_listener_alloc(&l, uri)); + TEST_NNG_PASS(nng_stream_listener_listen(l)); + TEST_NNG_PASS(nng_stream_dialer_alloc(&d, uri)); + + nng_stream_dialer_dial(d, daio); + nng_stream_listener_accept(l, laio); + + nng_aio_wait(laio); + nng_aio_wait(daio); + + TEST_NNG_PASS(nng_aio_result(laio)); + TEST_NNG_PASS(nng_aio_result(daio)); + c1 = nng_aio_get_output(laio, 0); + c2 = nng_aio_get_output(daio, 0); + TEST_CHECK(c1 != NULL); + TEST_CHECK(c2 != NULL); + + // Let's compare the peer addresses + TEST_NNG_PASS(nng_stream_get_addr(c1, NNG_OPT_LOCADDR, &sa1)); + TEST_NNG_PASS(nng_stream_get_addr(c2, NNG_OPT_REMADDR, &sa2)); + TEST_CHECK_(sa1.s_family == sa2.s_family, "families match %x == %x", + sa1.s_family, sa2.s_family); + TEST_CHECK_(sa1.s_in.sa_addr == sa2.s_in.sa_addr, + "addresses match %x == %x", testutil_htonl(sa1.s_in.sa_addr), + testutil_htonl(sa2.s_in.sa_addr)); + TEST_CHECK_(sa1.s_in.sa_port == sa2.s_in.sa_port, + "ports match %u == %u", testutil_htons(sa1.s_in.sa_port), + testutil_htons(sa2.s_in.sa_port)); + + TEST_NNG_PASS(nng_stream_get_addr(c1, NNG_OPT_REMADDR, &sa1)); + TEST_NNG_PASS(nng_stream_get_addr(c2, NNG_OPT_LOCADDR, &sa2)); + TEST_CHECK_(sa1.s_family == sa2.s_family, "families match %x == %x", + sa1.s_family, sa2.s_family); + TEST_CHECK_(sa1.s_in.sa_addr == sa2.s_in.sa_addr, + "addresses match %x == %x", testutil_htonl(sa1.s_in.sa_addr), + testutil_htonl(sa2.s_in.sa_addr)); + TEST_CHECK_(sa1.s_in.sa_port == sa2.s_in.sa_port, + "ports match %u == %u", testutil_htons(sa1.s_in.sa_port), + testutil_htons(sa2.s_in.sa_port)); + + on = true; + TEST_NNG_PASS(nng_stream_set_bool(c1, NNG_OPT_TCP_NODELAY, on)); + TEST_NNG_PASS(nng_stream_set_bool(c2, NNG_OPT_TCP_NODELAY, on)); + + TEST_NNG_PASS(nng_stream_set_bool(c1, NNG_OPT_TCP_KEEPALIVE, on)); + TEST_NNG_PASS(nng_stream_set_bool(c2, NNG_OPT_TCP_KEEPALIVE, on)); + TEST_NNG_FAIL(nng_stream_set_string(c1, NNG_OPT_TCP_KEEPALIVE, "nope"), + NNG_EBADTYPE); + + on = false; + sz = sizeof(on); + TEST_NNG_PASS(nng_stream_get(c1, NNG_OPT_TCP_NODELAY, &on, &sz)); + TEST_CHECK(sz == sizeof(on)); + TEST_CHECK(on == true); + + on = false; + sz = sizeof(on); + TEST_NNG_PASS(nng_stream_get(c2, NNG_OPT_TCP_KEEPALIVE, &on, &sz)); + TEST_CHECK(sz == sizeof(on)); + TEST_CHECK(on == true); + + TEST_NNG_FAIL( + nng_stream_get_size(c1, NNG_OPT_TCP_NODELAY, &sz), NNG_EBADTYPE); + + TEST_NNG_PASS(nng_stream_get_string( + c1, NNG_OPT_WS_REQUEST_HEADER "Sec-WebSocket-Version", &str)); + TEST_CHECK(str != NULL); + TEST_CHECK(strcmp(str, "13") == 0); + nng_strfree(str); + + nng_stream_close(c1); + nng_stream_free(c1); + nng_stream_close(c2); + nng_stream_free(c2); + nng_aio_free(daio); + nng_aio_free(laio); + nng_stream_listener_free(l); + nng_stream_dialer_free(d); +} + +typedef struct recv_state { + nng_stream * c; + int total; + int xfr; + nng_mtx * lock; + nng_cv * cv; + nng_aio * aio; + int err; + bool done; + uint8_t * send_buf; + uint8_t * buf; + nni_sha1_ctx sum; +} recv_state; + +static void +frag_recv_cb(void *arg) +{ + recv_state *s = arg; + + if ((s->err = nng_aio_result(s->aio)) == 0) { + int len = (int) nng_aio_count(s->aio); + int resid = s->total - s->xfr; + + nni_sha1_update(&s->sum, s->buf, (size_t) len); + s->buf += len; + s->xfr += len; + resid -= len; + + if (resid > 0) { + nng_iov iov; + iov.iov_buf = s->buf; + iov.iov_len = resid > 1024 ? 1024 : resid; + nng_aio_set_iov(s->aio, 1, &iov); + + nng_aio_set_timeout(s->aio, 2000); + nng_stream_recv(s->c, s->aio); + return; + } + } + + nng_mtx_lock(s->lock); + s->done = true; + nng_cv_wake(s->cv); + nng_mtx_unlock(s->lock); +} + +// This case tests some edges where receive and transmit fragmentation +// don't align. See bug 986. +void +test_websocket_fragmentation(void) +{ + nng_stream_listener *l = NULL; + nng_stream_dialer * d = NULL; + nng_stream * c = NULL; + uint16_t port; + char url[64]; + nng_aio * daio = NULL; + nng_aio * laio = NULL; + nng_aio * caio = NULL; + int resid; + recv_state state; + uint8_t sum1[20]; + uint8_t sum2[20]; + uint8_t * recv_buf; + uint8_t * send_buf; + uint8_t * buf; + nng_iov iov; + + memset(&state, 0, sizeof(state)); + state.total = 200000; // total to send + state.xfr = 0; + state.err = 0; + TEST_CHECK((recv_buf = nng_alloc(state.total)) != NULL); + TEST_CHECK((send_buf = nng_alloc(state.total)) != NULL); + TEST_NNG_PASS(nng_mtx_alloc(&state.lock)); + TEST_NNG_PASS(nng_cv_alloc(&state.cv, state.lock)); + TEST_NNG_PASS(nng_aio_alloc(&state.aio, frag_recv_cb, &state)); + nng_aio_set_timeout(state.aio, 2000); + state.buf = recv_buf; + + // Random fill the send buffer. + for (int i = 0; i < state.total; i++) { + send_buf[i] = nng_random() % 0xff; + } + + nni_sha1(send_buf, state.total, sum1); + nni_sha1_init(&state.sum); + + port = testutil_next_port(); + (void) snprintf(url, sizeof(url), "ws://127.0.0.1:%u", port); + + TEST_NNG_PASS(nng_stream_listener_alloc(&l, url)); + TEST_NNG_PASS(nng_stream_dialer_alloc(&d, url)); + TEST_NNG_PASS(nng_aio_alloc(&daio, NULL, NULL)); + TEST_NNG_PASS(nng_aio_alloc(&laio, NULL, NULL)); + TEST_NNG_PASS(nng_aio_alloc(&caio, NULL, NULL)); + + TEST_NNG_PASS( + nng_stream_listener_set_bool(l, NNG_OPT_TCP_NODELAY, true)); + TEST_NNG_PASS( + nng_stream_listener_set_size(l, NNG_OPT_WS_SENDMAXFRAME, 1000000)); + TEST_NNG_PASS(nng_stream_listener_listen(l)); + + TEST_NNG_PASS(nng_aio_alloc(&laio, NULL, NULL)); + TEST_NNG_PASS(nng_aio_alloc(&daio, NULL, NULL)); + nng_aio_set_timeout(laio, 2000); + nng_aio_set_timeout(daio, 2000); + + nng_stream_listener_accept(l, laio); + nng_stream_dialer_dial(d, daio); + + nng_aio_wait(laio); + nng_aio_wait(daio); + + TEST_NNG_PASS(nng_aio_result(laio)); + TEST_NNG_PASS(nng_aio_result(daio)); + state.c = nng_aio_get_output(daio, 0); + c = nng_aio_get_output(laio, 0); + + // start the receiver + iov.iov_buf = state.buf; + iov.iov_len = 1024; + nng_aio_set_iov(state.aio, 1, &iov); + nng_stream_recv(state.c, state.aio); + + buf = send_buf; + resid = state.total; + while (resid > 0) { + int len = resid < 9500 ? resid : 9500; + iov.iov_len = len; + iov.iov_buf = buf; + + TEST_NNG_PASS(nng_aio_set_iov(caio, 1, &iov)); + nng_stream_send(c, caio); + nng_aio_wait(caio); + TEST_NNG_PASS(nng_aio_result(caio)); + TEST_CHECK(nng_aio_count(caio) > 0); + len = nng_aio_count(caio); + + resid -= len; + buf += len; + } + + nng_mtx_lock(state.lock); + while (!state.done) { + nng_cv_wait(state.cv); + } + nng_mtx_unlock(state.lock); + + TEST_NNG_PASS(state.err); + TEST_CHECK_(state.xfr == state.total, + "send count (%d) == recv count (%d)", state.total, state.xfr); + + nni_sha1_final(&state.sum, sum2); + TEST_CHECK(memcmp(recv_buf, send_buf, state.total) == 0); + TEST_CHECK(memcmp(sum1, sum2, 20) == 0); + + nng_aio_free(caio); + nng_stream_close(c); + nng_stream_free(c); + + nng_aio_free(state.aio); + nng_stream_free(state.c); + nng_cv_free(state.cv); + nng_mtx_free(state.lock); + + nng_free(send_buf, state.total); + nng_free(recv_buf, state.total); + nng_aio_free(daio); + nng_aio_free(laio); + nng_stream_dialer_free(d); + nng_stream_listener_free(l); +} + +TEST_LIST = { + { "websocket stream wildcard", test_websocket_wildcard }, + { "websocket conn properties", test_websocket_conn_props }, + { "websocket fragmentation", test_websocket_fragmentation }, + { NULL, NULL }, +}; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index bab00692..c4856aed 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -161,9 +161,8 @@ add_nng_test(tcp6 60) add_nng_test(transport 5) add_nng_test(udp 5) add_nng_test(url 5) -add_nng_test1(ws 30 NNG_TRANSPORT_WS) +add_nng_test(ws 30) add_nng_test1(wss 30 NNG_TRANSPORT_WSS) -add_nng_test1(wsstream 10 NNG_TRANSPORT_WS) add_nng_test1(zt 60 NNG_TRANSPORT_ZEROTIER) add_nng_test(bus 5) diff --git a/tests/testutil.c b/tests/testutil.c index ef748b99..1c65c996 100644 --- a/tests/testutil.c +++ b/tests/testutil.c @@ -123,6 +123,18 @@ testutil_htons(uint16_t in) return (in); } +uint32_t +testutil_htonl(uint32_t in) +{ +#ifdef NNG_LITTLE_ENDIAN + in = ((in >> 24u) & 0xffu) | + ((in >> 8u) & 0xff00u) | + ((in << 8u) & 0xff0000u) | + ((in << 24u) & 0xff000000u); +#endif + return (in); +} + // testutil_next_port returns a "next" allocation port. // Ports are chosen by starting from a random point within a // range (normally 38000-40000, but other good places to choose diff --git a/tests/testutil.h b/tests/testutil.h index ab792557..b9acfedf 100644 --- a/tests/testutil.h +++ b/tests/testutil.h @@ -31,6 +31,9 @@ extern bool testutil_pollfd(int); // testutil_htons is just htons portably. extern uint16_t testutil_htons(uint16_t); +// testutil_htonl is just htonl portably. +extern uint32_t testutil_htonl(uint32_t); + // testutil_sleep sleeps the specified number of msec extern void testutil_sleep(int); diff --git a/tests/wsstream.c b/tests/wsstream.c deleted file mode 100644 index c51e2630..00000000 --- a/tests/wsstream.c +++ /dev/null @@ -1,204 +0,0 @@ -// -// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> -// Copyright 2018 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include <string.h> - -#include <nng/nng.h> - -#include "convey.h" -#include "stubs.h" - -TestMain("Websocket STREAM", { - atexit(nng_fini); - Convey("We can create a dialer and listener", { - nng_stream_dialer * d = NULL; - nng_stream_listener *l = NULL; - Reset({ - nng_stream_listener_free(l); - nng_stream_dialer_free(d); - l = NULL; - d = NULL; - }); - Convey("Listener listens (wildcard)", { - nng_sockaddr sa; - size_t sz; - uint8_t ip[4]; - - So(nng_stream_listener_alloc( - &l, "ws://127.0.0.1:0/test") == 0); - So(nng_stream_listener_listen(l) == 0); - - sz = sizeof(sa); - ip[0] = 127; - ip[1] = 0; - ip[2] = 0; - ip[3] = 1; - So(nng_stream_listener_get( - l, NNG_OPT_LOCADDR, &sa, &sz) == 0); - So(sz == sizeof(sa)); - So(sa.s_in.sa_port != 0); - So(memcmp(&sa.s_in.sa_addr, ip, 4) == 0); - - Convey("We can dial it", { - nng_aio * daio = NULL; - nng_aio * laio = NULL; - nng_aio * maio = NULL; - nng_stream *c1 = NULL; - nng_stream *c2 = NULL; - - char uri[64]; - snprintf(uri, sizeof(uri), - "ws://127.0.0.1:%d/test", - test_htons(sa.s_in.sa_port)); - - So(nng_stream_dialer_alloc(&d, uri) == 0); - So(nng_aio_alloc(&daio, NULL, NULL) == 0); - So(nng_aio_alloc(&laio, NULL, NULL) == 0); - So(nng_aio_alloc(&maio, NULL, NULL) == 0); - - Reset({ - nng_aio_free(daio); - nng_aio_free(laio); - nng_aio_free(maio); - if (c1 != NULL) { - nng_stream_close(c1); - nng_stream_free(c1); - } - if (c2 != NULL) { - nng_stream_close(c2); - nng_stream_free(c2); - } - }); - - nng_stream_dialer_dial(d, daio); - nng_stream_listener_accept(l, laio); - - nng_aio_wait(daio); - So(nng_aio_result(daio) == 0); - nng_aio_wait(laio); - So(nng_aio_result(laio) == 0); - - So(nng_aio_result(daio) == 0); - So(nng_aio_result(laio) == 0); - - c1 = nng_aio_get_output(daio, 0); - c2 = nng_aio_get_output(laio, 0); - So(c1 != NULL); - So(c2 != NULL); - - Convey("They exchange messages", { - nng_aio * aio1; - nng_aio * aio2; - nng_iov iov; - nng_sockaddr sa2; - char buf1[5]; - char buf2[5]; - bool on; - size_t sz; - - So(nng_aio_alloc(&aio1, NULL, NULL) == - 0); - So(nng_aio_alloc(&aio2, NULL, NULL) == - 0); - - Reset({ - nng_aio_free(aio1); - nng_aio_free(aio2); - }); - - on = true; - So(nng_stream_set(c1, - NNG_OPT_TCP_NODELAY, &on, - sizeof(on)) == 0); - So(nng_stream_set(c2, - NNG_OPT_TCP_NODELAY, &on, - sizeof(on)) == 0); - - So(nng_stream_set(c1, - NNG_OPT_TCP_KEEPALIVE, &on, - sizeof(on)) == 0); - - on = false; - sz = sizeof(on); - So(nng_stream_get(c1, - NNG_OPT_TCP_NODELAY, &on, - &sz) == 0); - So(sz == sizeof(on)); - So(on == true); - - on = false; - sz = sizeof(on); - So(nng_stream_get(c1, - NNG_OPT_TCP_KEEPALIVE, &on, - &sz) == 0); - So(sz == sizeof(on)); - So(on == true); - - // This relies on send completing for - // for just 5 bytes, and on recv doing - // the same. Technically this isn't - // guaranteed, but it would be weird - // to split such a small payload. - memcpy(buf1, "TEST", 5); - memset(buf2, 0, 5); - iov.iov_buf = buf1; - iov.iov_len = 5; - - nng_aio_set_iov(aio1, 1, &iov); - - iov.iov_buf = buf2; - iov.iov_len = 5; - nng_aio_set_iov(aio2, 1, &iov); - nng_stream_send(c1, aio1); - nng_stream_recv(c2, aio2); - nng_aio_wait(aio1); - nng_aio_wait(aio2); - - So(nng_aio_result(aio1) == 0); - So(nng_aio_count(aio1) == 5); - - So(nng_aio_result(aio2) == 0); - So(nng_aio_count(aio2) == 5); - - So(memcmp(buf1, buf2, 5) == 0); - - Convey("Socket name matches", { - sz = sizeof(sa2); - So(nng_stream_get(c2, - NNG_OPT_LOCADDR, &sa2, - &sz) == 0); - So(sz == sizeof(sa2)); - So(sa2.s_in.sa_family == - NNG_AF_INET); - - So(sa2.s_in.sa_addr == - sa.s_in.sa_addr); - So(sa2.s_in.sa_port == - sa.s_in.sa_port); - }); - - Convey("Peer name matches", { - sz = sizeof(sa2); - So(nng_stream_get(c1, - NNG_OPT_REMADDR, &sa2, - &sz) == 0); - So(sz == sizeof(sa2)); - So(sa2.s_in.sa_family == - NNG_AF_INET); - So(sa2.s_in.sa_addr == - sa.s_in.sa_addr); - So(sa2.s_in.sa_port == - sa.s_in.sa_port); - }); - }); - }); - }); - }); -}) |
