diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-02-18 12:28:23 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-02-18 12:28:23 -0800 |
| commit | b4512a2ff043e09370a49f951e0ed2bd9a946913 (patch) | |
| tree | 4080029e5b4303203d70c9effc73d7a5719f5270 /src | |
| parent | ee697a28909e357f724f8dd90ff9cb27f7892ec8 (diff) | |
| download | nng-b4512a2ff043e09370a49f951e0ed2bd9a946913.tar.gz nng-b4512a2ff043e09370a49f951e0ed2bd9a946913.tar.bz2 nng-b4512a2ff043e09370a49f951e0ed2bd9a946913.zip | |
Add support for socket:// in NUTS marry function.
This also adds a HUGE test for REP using socket so that we can
discriminate failures that might exist using sockets instead of inproc.
Diffstat (limited to 'src')
| -rw-r--r-- | src/sp/protocol/reqrep0/rep_test.c | 59 | ||||
| -rw-r--r-- | src/testing/marry.c | 54 |
2 files changed, 104 insertions, 9 deletions
diff --git a/src/sp/protocol/reqrep0/rep_test.c b/src/sp/protocol/reqrep0/rep_test.c index 5f13d2f7..57da9cac 100644 --- a/src/sp/protocol/reqrep0/rep_test.c +++ b/src/sp/protocol/reqrep0/rep_test.c @@ -7,6 +7,7 @@ // found online at https://opensource.org/licenses/MIT. // +#include "nng/nng.h" #include <nuts.h> static void @@ -241,6 +242,63 @@ test_rep_huge_send(void) } void +test_rep_huge_send_socket(void) +{ + nng_socket rep; + nng_socket req; + nng_msg *m; + nng_msg *d; + nng_aio *aio; + + NUTS_PASS(nng_rep_open(&rep)); + NUTS_PASS(nng_req_open(&req)); + NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + NUTS_PASS(nng_msg_alloc(&m, 10 << 20)); // 10 MB + NUTS_PASS(nng_socket_set_size(req, NNG_OPT_RECVMAXSZ, 1 << 30)); + NUTS_PASS(nng_socket_set_size(rep, NNG_OPT_RECVMAXSZ, 1 << 30)); + NUTS_MARRY_EX(req, rep, "socket://", NULL, NULL); + char *body = nng_msg_body(m); + + NUTS_ASSERT(nng_msg_len(m) == 10 << 20); + for (size_t i = 0; i < nng_msg_len(m); i++) { + body[i] = i % 16 + 'A'; + } + NUTS_PASS(nng_msg_dup(&d, m)); + NUTS_SEND(req, "R"); + NUTS_RECV(rep, "R"); + nng_aio_set_msg(aio, m); + nng_send_aio(rep, aio); + nng_aio_wait(aio); + NUTS_PASS(nng_aio_result(aio)); + nng_aio_set_msg(aio, NULL); + m = NULL; + nng_recv_aio(req, aio); + nng_aio_wait(aio); + NUTS_PASS(nng_aio_result(aio)); + m = nng_aio_get_msg(aio); + NUTS_ASSERT(m != NULL); + NUTS_ASSERT(nng_msg_len(m) == nng_msg_len(d)); + NUTS_ASSERT( + memcmp(nng_msg_body(m), nng_msg_body(d), nng_msg_len(m)) == 0); + + // make sure other messages still flow afterwards + NUTS_SEND(req, "E"); + NUTS_RECV(rep, "E"); + NUTS_SEND(rep, "E"); + NUTS_RECV(req, "E"); + + nng_aio_free(aio); + nng_msg_free(m); + nng_msg_free(d); + NUTS_CLOSE(rep); + NUTS_CLOSE(req); +} + +void test_rep_close_pipe_before_send(void) { nng_socket rep; @@ -708,6 +766,7 @@ NUTS_TESTS = { { "rep context does not poll", test_rep_context_no_poll }, { "rep validate peer", test_rep_validate_peer }, { "rep huge send", test_rep_huge_send }, + { "rep huge send socket", test_rep_huge_send_socket }, { "rep double recv", test_rep_double_recv }, { "rep send nonblock", test_rep_send_nonblock }, { "rep close pipe before send", test_rep_close_pipe_before_send }, diff --git a/src/testing/marry.c b/src/testing/marry.c index cfaec6cc..2485a666 100644 --- a/src/testing/marry.c +++ b/src/testing/marry.c @@ -8,6 +8,7 @@ // found online at https://opensource.org/licenses/MIT. // +#include "nng/nng.h" #ifdef _WIN32 #ifndef WIN32_LEAN_AND_MEAN @@ -99,12 +100,12 @@ nuts_scratch_addr(const char *scheme, size_t sz, char *addr) uint16_t nuts_next_port(void) { - char * name; - FILE * f; + char *name; + FILE *f; uint16_t port; uint16_t base; uint16_t end; - char * str; + char *str; #ifdef _WIN32 OVERLAPPED olp; HANDLE h; @@ -180,7 +181,7 @@ nuts_next_port(void) struct marriage_notice { nng_mtx *mx; - nng_cv * cv; + nng_cv *cv; int s1; int s2; int cnt1; @@ -249,6 +250,7 @@ nuts_marry_ex( char addr[64]; nng_listener l; int port; + int fd[2]; if (url == NULL) { (void) snprintf(addr, sizeof(addr), @@ -268,14 +270,47 @@ nuts_marry_ex( ((rv = nng_pipe_notify( s1, NNG_PIPE_EV_ADD_POST, married, ¬e)) != 0) || ((rv = nng_pipe_notify( - s2, NNG_PIPE_EV_ADD_POST, married, ¬e)) != 0) || - ((rv = nng_listen(s1, url, &l, 0)) != 0)) { + s2, NNG_PIPE_EV_ADD_POST, married, ¬e)) != 0)) { goto done; } - // If a TCP port of zero was selected, let's ask for the actual - // port bound. + // If socket:// is requested we will try to use that, otherwise we + // fake it with a TCP loopback socket. + if (strcmp(url, "socket://") == 0) { + rv = nng_socket_pair(fd); + if (rv == 0) { + nng_listener l2; + if (((rv = nng_listen(s1, url, &l, 0)) != 0) || + ((rv = nng_listen(s2, url, &l2, 0)) != 0) || + ((rv = nng_listener_set_int( + l, NNG_OPT_SOCKET_FD, fd[0])) != 0) || + ((rv = nng_listener_set_int( + l2, NNG_OPT_SOCKET_FD, fd[1])) != 0)) { +#ifdef _WIN32 + CloseHandle((HANDLE) fd[0]); + CloseHandle((HANDLE) fd[1]); +#else + close(fd[0]); + close(fd[1]); +#endif + return (rv); + } + } else if (rv == NNG_ENOTSUP) { + url = "tcp://127.0.0.1:0"; + rv = 0; + } else { + return (rv); + } + } + + if (strcmp(url, "socket://") != 0) { + if ((rv = nng_listen(s1, url, &l, 0)) != 0) { + return (rv); + } + } if ((strstr(url, ":0") != NULL) && + // If a TCP port of zero was selected, let's ask for the actual + // port bound. (nng_listener_get_int(l, NNG_OPT_TCP_BOUND_PORT, &port) == 0) && (port > 0)) { replace_port_zero(url, addr, port); @@ -285,7 +320,8 @@ nuts_marry_ex( ((rv = nng_socket_set_ms(s2, NNG_OPT_RECONNMAXT, 10)) != 0)) { goto done; } - if ((rv = nng_dial(s2, url, NULL, 0)) != 0) { + if ((strcmp(url, "socket://") != 0) && + ((rv = nng_dial(s2, url, NULL, 0)) != 0)) { goto done; } |
