diff options
| author | Garrett D'Amore <garrett@damore.org> | 2023-12-18 01:12:01 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2023-12-29 15:20:21 -0800 |
| commit | 9caabf76621ba81e7fed5df42971f355b648ff59 (patch) | |
| tree | 2f243965e202862f36c9d57c3053f57806bf70cf /src/sp | |
| parent | e5261536d4f72dccbf1a424bfe426f9635b9d1c3 (diff) | |
| download | nng-9caabf76621ba81e7fed5df42971f355b648ff59.tar.gz nng-9caabf76621ba81e7fed5df42971f355b648ff59.tar.bz2 nng-9caabf76621ba81e7fed5df42971f355b648ff59.zip | |
fixes #1746 Create a new socket:// transport for socketpair() based connections
This transport only listens, and creates connections when
the application calls setopt on the lister with NNG_OPT_SOCKET_FD,
to pass a file descriptor. The FD is turned into an nng_stream,
and utilized for SP. The protocol over the descriptor is identical
to the TCP protocol (not the IPC protocol).
The options for peer information are borrowed from the IPC transport,
as they may be useful for these purposes.
This includes a test suite and full documentation.
Diffstat (limited to 'src/sp')
| -rw-r--r-- | src/sp/transport.c | 8 | ||||
| -rw-r--r-- | src/sp/transport/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | src/sp/transport/socket/CMakeLists.txt | 15 | ||||
| -rw-r--r-- | src/sp/transport/socket/sockfd.c | 1011 | ||||
| -rw-r--r-- | src/sp/transport/socket/sockfd_test.c | 461 |
5 files changed, 1496 insertions, 2 deletions
diff --git a/src/sp/transport.c b/src/sp/transport.c index 9f4c6522..d0a75b22 100644 --- a/src/sp/transport.c +++ b/src/sp/transport.c @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Devolutions <info@devolutions.net> // @@ -70,6 +70,9 @@ extern void nni_sp_wss_register(void); #ifdef NNG_TRANSPORT_ZEROTIER extern void nni_sp_zt_register(void); #endif +#ifdef NNG_TRANSPORT_FDC +extern void nni_sp_sfd_register(void); +#endif void nni_sp_tran_sys_init(void) @@ -95,6 +98,9 @@ nni_sp_tran_sys_init(void) #ifdef NNG_TRANSPORT_ZEROTIER nni_sp_zt_register(); #endif +#ifdef NNG_TRANSPORT_FDC + nni_sp_sfd_register(); +#endif } // nni_sp_tran_sys_fini finalizes the entire transport system, including all diff --git a/src/sp/transport/CMakeLists.txt b/src/sp/transport/CMakeLists.txt index add8a9c9..0de80015 100644 --- a/src/sp/transport/CMakeLists.txt +++ b/src/sp/transport/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright 2020 Staysail Systems, Inc. <info@staystail.tech> +# Copyright 2023 Staysail Systems, Inc. <info@staystail.tech> # # This software is supplied under the terms of the MIT License, a # copy of which should be located in the distribution where this @@ -10,6 +10,7 @@ # Transports. nng_directory(transport) +add_subdirectory(socket) add_subdirectory(inproc) add_subdirectory(ipc) add_subdirectory(tcp) diff --git a/src/sp/transport/socket/CMakeLists.txt b/src/sp/transport/socket/CMakeLists.txt new file mode 100644 index 00000000..d79b261e --- /dev/null +++ b/src/sp/transport/socket/CMakeLists.txt @@ -0,0 +1,15 @@ +# +# Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# File Descriptor (or Handle) based connections +nng_directory(socket) + +nng_sources_if(NNG_TRANSPORT_FDC sockfd.c) +nng_defines_if(NNG_TRANSPORT_FDC NNG_TRANSPORT_FDC) +nng_test(sockfd_test)
\ No newline at end of file diff --git a/src/sp/transport/socket/sockfd.c b/src/sp/transport/socket/sockfd.c new file mode 100644 index 00000000..2db052ac --- /dev/null +++ b/src/sp/transport/socket/sockfd.c @@ -0,0 +1,1011 @@ +// +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// Copyright 2019 Devolutions <info@devolutions.net> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> +#include <string.h> + +#include "core/nng_impl.h" + +// Socket transport. This takes sockets that may have been +// created via another mechanism (usually socketpair) and +// builds a connection using them. The socket is passed in to the listener +// using the NNG_OPT_SOCKFD_FD option. + +typedef struct sfd_tran_pipe sfd_tran_pipe; +typedef struct sfd_tran_ep sfd_tran_ep; + +// sfd_tran_pipe wraps an open file descriptor +struct sfd_tran_pipe { + nng_stream *conn; + nni_pipe *npipe; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + bool closed; + nni_list_node node; + sfd_tran_ep *ep; + nni_atomic_flag reaped; + nni_reap_node reap; + uint8_t txlen[sizeof(uint64_t)]; + uint8_t rxlen[sizeof(uint64_t)]; + size_t gottxhead; + size_t gotrxhead; + size_t wanttxhead; + size_t wantrxhead; + nni_list recvq; + nni_list sendq; + nni_aio txaio; + nni_aio rxaio; + nni_aio negoaio; + nni_msg *rxmsg; + nni_mtx mtx; +}; + +struct sfd_tran_ep { + nni_mtx mtx; + uint16_t proto; + size_t rcvmax; + bool fini; + bool started; + bool closed; + nng_sockaddr src; + int refcnt; // active pipes + nni_aio *useraio; + nni_aio *connaio; + nni_aio *timeaio; + nni_list busypipes; // busy pipes -- ones passed to socket + nni_list waitpipes; // pipes waiting to match to socket + nni_list negopipes; // pipes busy negotiating + nni_reap_node reap; + nng_stream_listener *listener; + +#ifdef NNG_ENABLE_STATS + nni_stat_item st_rcv_max; +#endif +}; + +static void sfd_tran_pipe_send_start(sfd_tran_pipe *); +static void sfd_tran_pipe_recv_start(sfd_tran_pipe *); +static void sfd_tran_pipe_send_cb(void *); +static void sfd_tran_pipe_recv_cb(void *); +static void sfd_tran_pipe_nego_cb(void *); +static void sfd_tran_ep_fini(void *); +static void sfd_tran_pipe_fini(void *); + +static nni_reap_list sfd_tran_ep_reap_list = { + .rl_offset = offsetof(sfd_tran_ep, reap), + .rl_func = sfd_tran_ep_fini, +}; + +static nni_reap_list sfd_tran_pipe_reap_list = { + .rl_offset = offsetof(sfd_tran_pipe, reap), + .rl_func = sfd_tran_pipe_fini, +}; + +static void +sfd_tran_init(void) +{ +} + +static void +sfd_tran_fini(void) +{ +} + +static void +sfd_tran_pipe_close(void *arg) +{ + sfd_tran_pipe *p = arg; + + nni_mtx_lock(&p->mtx); + p->closed = true; + nni_mtx_unlock(&p->mtx); + + nni_aio_close(&p->rxaio); + nni_aio_close(&p->txaio); + nni_aio_close(&p->negoaio); + + nng_stream_close(p->conn); +} + +static void +sfd_tran_pipe_stop(void *arg) +{ + sfd_tran_pipe *p = arg; + + nni_aio_stop(&p->rxaio); + nni_aio_stop(&p->txaio); + nni_aio_stop(&p->negoaio); +} + +static int +sfd_tran_pipe_init(void *arg, nni_pipe *npipe) +{ + sfd_tran_pipe *p = arg; + p->npipe = npipe; + + return (0); +} + +static void +sfd_tran_pipe_fini(void *arg) +{ + sfd_tran_pipe *p = arg; + sfd_tran_ep *ep; + + sfd_tran_pipe_stop(p); + if ((ep = p->ep) != NULL) { + nni_mtx_lock(&ep->mtx); + nni_list_node_remove(&p->node); + ep->refcnt--; + if (ep->fini && (ep->refcnt == 0)) { + nni_reap(&sfd_tran_ep_reap_list, ep); + } + nni_mtx_unlock(&ep->mtx); + } + + nni_aio_fini(&p->rxaio); + nni_aio_fini(&p->txaio); + nni_aio_fini(&p->negoaio); + nng_stream_free(p->conn); + nni_msg_free(p->rxmsg); + nni_mtx_fini(&p->mtx); + NNI_FREE_STRUCT(p); +} + +static void +sfd_tran_pipe_reap(sfd_tran_pipe *p) +{ + if (!nni_atomic_flag_test_and_set(&p->reaped)) { + if (p->conn != NULL) { + nng_stream_close(p->conn); + } + nni_reap(&sfd_tran_pipe_reap_list, p); + } +} + +static int +sfd_tran_pipe_alloc(sfd_tran_pipe **pipep) +{ + sfd_tran_pipe *p; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&p->mtx); + nni_aio_init(&p->txaio, sfd_tran_pipe_send_cb, p); + nni_aio_init(&p->rxaio, sfd_tran_pipe_recv_cb, p); + nni_aio_init(&p->negoaio, sfd_tran_pipe_nego_cb, p); + nni_aio_list_init(&p->recvq); + nni_aio_list_init(&p->sendq); + nni_atomic_flag_reset(&p->reaped); + + *pipep = p; + + return (0); +} + +static void +sfd_tran_ep_match(sfd_tran_ep *ep) +{ + nni_aio *aio; + sfd_tran_pipe *p; + + if (((aio = ep->useraio) == NULL) || + ((p = nni_list_first(&ep->waitpipes)) == NULL)) { + return; + } + nni_list_remove(&ep->waitpipes, p); + nni_list_append(&ep->busypipes, p); + ep->useraio = NULL; + p->rcvmax = ep->rcvmax; + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); +} + +static void +sfd_tran_pipe_nego_cb(void *arg) +{ + sfd_tran_pipe *p = arg; + sfd_tran_ep *ep = p->ep; + nni_aio *aio = &p->negoaio; + nni_aio *uaio; + int rv; + + nni_mtx_lock(&ep->mtx); + + if ((rv = nni_aio_result(aio)) != 0) { + goto error; + } + + // We start transmitting before we receive. + if (p->gottxhead < p->wanttxhead) { + p->gottxhead += nni_aio_count(aio); + } else if (p->gotrxhead < p->wantrxhead) { + p->gotrxhead += nni_aio_count(aio); + } + + if (p->gottxhead < p->wanttxhead) { + nni_iov iov; + iov.iov_len = p->wanttxhead - p->gottxhead; + iov.iov_buf = &p->txlen[p->gottxhead]; + // send it down... + nni_aio_set_iov(aio, 1, &iov); + nng_stream_send(p->conn, aio); + nni_mtx_unlock(&ep->mtx); + return; + } + if (p->gotrxhead < p->wantrxhead) { + nni_iov iov; + iov.iov_len = p->wantrxhead - p->gotrxhead; + iov.iov_buf = &p->rxlen[p->gotrxhead]; + nni_aio_set_iov(aio, 1, &iov); + nng_stream_recv(p->conn, aio); + nni_mtx_unlock(&ep->mtx); + return; + } + // We have both sent and received the headers. Let's check the + // receiver. + if ((p->rxlen[0] != 0) || (p->rxlen[1] != 'S') || + (p->rxlen[2] != 'P') || (p->rxlen[3] != 0) || (p->rxlen[6] != 0) || + (p->rxlen[7] != 0)) { + rv = NNG_EPROTO; + goto error; + } + + NNI_GET16(&p->rxlen[4], p->peer); + + // We are ready now. We put this in the wait list, and + // then try to run the matcher. + nni_list_remove(&ep->negopipes, p); + nni_list_append(&ep->waitpipes, p); + + sfd_tran_ep_match(ep); + nni_mtx_unlock(&ep->mtx); + + return; + +error: + // If the connection is closed, we need to pass back a different + // error code. This is necessary to avoid a problem where the + // closed status is confused with the accept file descriptor + // being closed. + if (rv == NNG_ECLOSED) { + rv = NNG_ECONNSHUT; + } + nng_stream_close(p->conn); + + if ((uaio = ep->useraio) != NULL) { + ep->useraio = NULL; + nni_aio_finish_error(uaio, rv); + } + nni_mtx_unlock(&ep->mtx); + sfd_tran_pipe_reap(p); +} + +static void +sfd_tran_pipe_send_cb(void *arg) +{ + sfd_tran_pipe *p = arg; + int rv; + nni_aio *aio; + size_t n; + nni_msg *msg; + nni_aio *txaio = &p->txaio; + + nni_mtx_lock(&p->mtx); + aio = nni_list_first(&p->sendq); + + if ((rv = nni_aio_result(txaio)) != 0) { + nni_pipe_bump_error(p->npipe, rv); + // Intentionally we do not queue up another transfer. + // There's an excellent chance that the pipe is no longer + // usable, with a partial transfer. + // The protocol should see this error, and close the + // pipe itself, we hope. + nni_aio_list_remove(aio); + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + n = nni_aio_count(txaio); + nni_aio_iov_advance(txaio, n); + if (nni_aio_iov_count(txaio) > 0) { + nng_stream_send(p->conn, txaio); + nni_mtx_unlock(&p->mtx); + return; + } + + nni_aio_list_remove(aio); + sfd_tran_pipe_send_start(p); + + msg = nni_aio_get_msg(aio); + n = nni_msg_len(msg); + nni_pipe_bump_tx(p->npipe, n); + nni_mtx_unlock(&p->mtx); + + nni_aio_set_msg(aio, NULL); + nni_msg_free(msg); + nni_aio_finish_sync(aio, 0, n); +} + +static void +sfd_tran_pipe_recv_cb(void *arg) +{ + sfd_tran_pipe *p = arg; + nni_aio *aio; + int rv; + size_t n; + nni_msg *msg; + nni_aio *rxaio = &p->rxaio; + + nni_mtx_lock(&p->mtx); + aio = nni_list_first(&p->recvq); + + if ((rv = nni_aio_result(rxaio)) != 0) { + goto recv_error; + } + + if (p->closed) { + rv = NNG_ECLOSED; + goto recv_error; + } + + n = nni_aio_count(rxaio); + nni_aio_iov_advance(rxaio, n); + if (nni_aio_iov_count(rxaio) > 0) { + nng_stream_recv(p->conn, rxaio); + nni_mtx_unlock(&p->mtx); + return; + } + + // If we don't have a message yet, we were reading the message + // header, which is just the length. This tells us the size of the + // message to allocate and how much more to expect. + if (p->rxmsg == NULL) { + uint64_t len; + // We should have gotten a message header. + NNI_GET64(p->rxlen, len); + + // Make sure the message payload is not too big. If it is + // the caller will shut down the pipe. + if ((len > p->rcvmax) && (p->rcvmax > 0)) { + rv = NNG_EMSGSIZE; + goto recv_error; + } + + if ((rv = nni_msg_alloc(&p->rxmsg, (size_t) len)) != 0) { + goto recv_error; + } + + // Submit the rest of the data for a read -- we want to + // read the entire message now. + if (len != 0) { + nni_iov iov; + iov.iov_buf = nni_msg_body(p->rxmsg); + iov.iov_len = (size_t) len; + + nni_aio_set_iov(rxaio, 1, &iov); + nng_stream_recv(p->conn, rxaio); + nni_mtx_unlock(&p->mtx); + return; + } + } + + // We read a message completely. Let the user know the good news. + nni_aio_list_remove(aio); + msg = p->rxmsg; + p->rxmsg = NULL; + n = nni_msg_len(msg); + + nni_pipe_bump_rx(p->npipe, n); + sfd_tran_pipe_recv_start(p); + nni_mtx_unlock(&p->mtx); + + nni_aio_set_msg(aio, msg); + nni_aio_finish_sync(aio, 0, n); + return; + +recv_error: + nni_aio_list_remove(aio); + msg = p->rxmsg; + p->rxmsg = NULL; + nni_pipe_bump_error(p->npipe, rv); + // Intentionally, we do not queue up another receive. + // The protocol should notice this error and close the pipe. + nni_mtx_unlock(&p->mtx); + + nni_msg_free(msg); + nni_aio_finish_error(aio, rv); +} + +static void +sfd_tran_pipe_send_cancel(nni_aio *aio, void *arg, int rv) +{ + sfd_tran_pipe *p = arg; + + nni_mtx_lock(&p->mtx); + if (!nni_aio_list_active(aio)) { + nni_mtx_unlock(&p->mtx); + return; + } + // If this is being sent, then cancel the pending transfer. + // The callback on the txaio will cause the user aio to + // be canceled too. + if (nni_list_first(&p->sendq) == aio) { + nni_aio_abort(&p->txaio, rv); + nni_mtx_unlock(&p->mtx); + return; + } + nni_aio_list_remove(aio); + nni_mtx_unlock(&p->mtx); + + nni_aio_finish_error(aio, rv); +} + +static void +sfd_tran_pipe_send_start(sfd_tran_pipe *p) +{ + nni_aio *aio; + nni_aio *txaio; + nni_msg *msg; + int niov; + nni_iov iov[3]; + uint64_t len; + + if (p->closed) { + while ((aio = nni_list_first(&p->sendq)) != NULL) { + nni_list_remove(&p->sendq, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + return; + } + + if ((aio = nni_list_first(&p->sendq)) == NULL) { + return; + } + + // This runs to send the message. + msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg) + nni_msg_header_len(msg); + + NNI_PUT64(p->txlen, len); + + txaio = &p->txaio; + niov = 0; + iov[0].iov_buf = p->txlen; + iov[0].iov_len = sizeof(p->txlen); + niov++; + if (nni_msg_header_len(msg) > 0) { + iov[niov].iov_buf = nni_msg_header(msg); + iov[niov].iov_len = nni_msg_header_len(msg); + niov++; + } + if (nni_msg_len(msg) > 0) { + iov[niov].iov_buf = nni_msg_body(msg); + iov[niov].iov_len = nni_msg_len(msg); + niov++; + } + nni_aio_set_iov(txaio, niov, iov); + nng_stream_send(p->conn, txaio); +} + +static void +sfd_tran_pipe_send(void *arg, nni_aio *aio) +{ + sfd_tran_pipe *p = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + // No way to give the message back to the protocol, so + // we just discard it silently to prevent it from leaking. + nni_msg_free(nni_aio_get_msg(aio)); + nni_aio_set_msg(aio, NULL); + return; + } + nni_mtx_lock(&p->mtx); + if ((rv = nni_aio_schedule(aio, sfd_tran_pipe_send_cancel, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&p->sendq, aio); + if (nni_list_first(&p->sendq) == aio) { + sfd_tran_pipe_send_start(p); + } + nni_mtx_unlock(&p->mtx); +} + +static void +sfd_tran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) +{ + sfd_tran_pipe *p = arg; + + nni_mtx_lock(&p->mtx); + if (!nni_aio_list_active(aio)) { + nni_mtx_unlock(&p->mtx); + return; + } + // If receive in progress, then cancel the pending transfer. + // The callback on the rxaio will cause the user aio to + // be canceled too. + if (nni_list_first(&p->recvq) == aio) { + nni_aio_abort(&p->rxaio, rv); + nni_mtx_unlock(&p->mtx); + return; + } + nni_aio_list_remove(aio); + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); +} + +static void +sfd_tran_pipe_recv_start(sfd_tran_pipe *p) +{ + nni_aio *rxaio; + nni_iov iov; + NNI_ASSERT(p->rxmsg == NULL); + + if (p->closed) { + nni_aio *aio; + while ((aio = nni_list_first(&p->recvq)) != NULL) { + nni_list_remove(&p->recvq, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + return; + } + if (nni_list_empty(&p->recvq)) { + return; + } + + // Schedule a read of the header. + rxaio = &p->rxaio; + iov.iov_buf = p->rxlen; + iov.iov_len = sizeof(p->rxlen); + nni_aio_set_iov(rxaio, 1, &iov); + + nng_stream_recv(p->conn, rxaio); +} + +static void +sfd_tran_pipe_recv(void *arg, nni_aio *aio) +{ + sfd_tran_pipe *p = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&p->mtx); + if ((rv = nni_aio_schedule(aio, sfd_tran_pipe_recv_cancel, p)) != 0) { + nni_mtx_unlock(&p->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + nni_list_append(&p->recvq, aio); + if (nni_list_first(&p->recvq) == aio) { + sfd_tran_pipe_recv_start(p); + } + nni_mtx_unlock(&p->mtx); +} + +static uint16_t +sfd_tran_pipe_peer(void *arg) +{ + sfd_tran_pipe *p = arg; + + return (p->peer); +} + +static int +sfd_tran_pipe_getopt( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + sfd_tran_pipe *p = arg; + return (nni_stream_get(p->conn, name, buf, szp, t)); +} + +static void +sfd_tran_pipe_start(sfd_tran_pipe *p, nng_stream *conn, sfd_tran_ep *ep) +{ + nni_iov iov; + + ep->refcnt++; + + p->conn = conn; + p->ep = ep; + p->proto = ep->proto; + + p->txlen[0] = 0; + p->txlen[1] = 'S'; + p->txlen[2] = 'P'; + p->txlen[3] = 0; + NNI_PUT16(&p->txlen[4], p->proto); + NNI_PUT16(&p->txlen[6], 0); + + p->gotrxhead = 0; + p->gottxhead = 0; + p->wantrxhead = 8; + p->wanttxhead = 8; + iov.iov_len = 8; + iov.iov_buf = &p->txlen[0]; + nni_aio_set_iov(&p->negoaio, 1, &iov); + nni_list_append(&ep->negopipes, p); + + // No timeouts here -- the purpose of timeouts was to guard against + // untrusted callers forcing us to burn files. In this case the + // application is providing us with a file, and should be reasonably + // trusted not to be doing a DoS against itself! :-) Part of the + // rationale is that it may take a while for a child process to reach + // the point where it is ready to negotiate the other side of a + // connection. + nni_aio_set_timeout(&p->negoaio, NNG_DURATION_INFINITE); + nng_stream_send(p->conn, &p->negoaio); +} + +static void +sfd_tran_ep_fini(void *arg) +{ + sfd_tran_ep *ep = arg; + + nni_mtx_lock(&ep->mtx); + ep->fini = true; + if (ep->refcnt != 0) { + nni_mtx_unlock(&ep->mtx); + return; + } + nni_mtx_unlock(&ep->mtx); + nni_aio_stop(ep->timeaio); + nni_aio_stop(ep->connaio); + nng_stream_listener_free(ep->listener); + nni_aio_free(ep->timeaio); + nni_aio_free(ep->connaio); + + nni_mtx_fini(&ep->mtx); + NNI_FREE_STRUCT(ep); +} + +static void +sfd_tran_ep_close(void *arg) +{ + sfd_tran_ep *ep = arg; + sfd_tran_pipe *p; + + nni_mtx_lock(&ep->mtx); + + ep->closed = true; + nni_aio_close(ep->timeaio); + if (ep->listener != NULL) { + nng_stream_listener_close(ep->listener); + } + NNI_LIST_FOREACH (&ep->negopipes, p) { + sfd_tran_pipe_close(p); + } + NNI_LIST_FOREACH (&ep->waitpipes, p) { + sfd_tran_pipe_close(p); + } + NNI_LIST_FOREACH (&ep->busypipes, p) { + sfd_tran_pipe_close(p); + } + if (ep->useraio != NULL) { + nni_aio_finish_error(ep->useraio, NNG_ECLOSED); + ep->useraio = NULL; + } + + nni_mtx_unlock(&ep->mtx); +} + +static void +sfd_tran_timer_cb(void *arg) +{ + sfd_tran_ep *ep = arg; + if (nni_aio_result(ep->timeaio) == 0) { + nng_stream_listener_accept(ep->listener, ep->connaio); + } +} + +static void +sfd_tran_accept_cb(void *arg) +{ + sfd_tran_ep *ep = arg; + nni_aio *aio = ep->connaio; + sfd_tran_pipe *p; + int rv; + nng_stream *conn; + + nni_mtx_lock(&ep->mtx); + + if ((rv = nni_aio_result(aio)) != 0) { + goto error; + } + + conn = nni_aio_get_output(aio, 0); + if ((rv = sfd_tran_pipe_alloc(&p)) != 0) { + nng_stream_free(conn); + goto error; + } + + if (ep->closed) { + sfd_tran_pipe_fini(p); + nng_stream_free(conn); + rv = NNG_ECLOSED; + goto error; + } + sfd_tran_pipe_start(p, conn, ep); + nng_stream_listener_accept(ep->listener, ep->connaio); + nni_mtx_unlock(&ep->mtx); + return; + +error: + // When an error here occurs, let's send a notice up to the consumer. + // That way it can be reported properly. + if ((aio = ep->useraio) != NULL) { + ep->useraio = NULL; + nni_aio_finish_error(aio, rv); + } + switch (rv) { + + case NNG_ENOMEM: + case NNG_ENOFILES: + nng_sleep_aio(10, ep->timeaio); + break; + + default: + if (!ep->closed) { + nng_stream_listener_accept(ep->listener, ep->connaio); + } + break; + } + nni_mtx_unlock(&ep->mtx); +} + +static int +sfd_tran_ep_init(sfd_tran_ep **epp, nng_url *url, nni_sock *sock) +{ + sfd_tran_ep *ep; + NNI_ARG_UNUSED(url); + + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&ep->mtx); + NNI_LIST_INIT(&ep->busypipes, sfd_tran_pipe, node); + NNI_LIST_INIT(&ep->waitpipes, sfd_tran_pipe, node); + NNI_LIST_INIT(&ep->negopipes, sfd_tran_pipe, node); + + ep->proto = nni_sock_proto_id(sock); + +#ifdef NNG_ENABLE_STATS + static const nni_stat_info rcv_max_info = { + .si_name = "rcv_max", + .si_desc = "maximum receive size", + .si_type = NNG_STAT_LEVEL, + .si_unit = NNG_UNIT_BYTES, + .si_atomic = true, + }; + nni_stat_init(&ep->st_rcv_max, &rcv_max_info); +#endif + + *epp = ep; + return (0); +} + +static int +sfd_tran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) +{ + NNI_ARG_UNUSED(dp); + NNI_ARG_UNUSED(url); + NNI_ARG_UNUSED(ndialer); + return (NNG_ENOTSUP); +} + +static int +sfd_tran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) +{ + sfd_tran_ep *ep; + int rv; + nni_sock *sock = nni_listener_sock(nlistener); + + // Check for invalid URL components -- we only accept a bare scheme + if ((strlen(url->u_host) != 0) || (strlen(url->u_path) != 0) || + (url->u_fragment != NULL) || (url->u_userinfo != NULL) || + (url->u_query != NULL)) { + return (NNG_EADDRINVAL); + } + + if ((rv = sfd_tran_ep_init(&ep, url, sock)) != 0) { + return (rv); + } + + if (((rv = nni_aio_alloc(&ep->connaio, sfd_tran_accept_cb, ep)) != + 0) || + ((rv = nni_aio_alloc(&ep->timeaio, sfd_tran_timer_cb, ep)) != 0) || + ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) { + sfd_tran_ep_fini(ep); + return (rv); + } +#ifdef NNG_ENABLE_STATS + nni_listener_add_stat(nlistener, &ep->st_rcv_max); +#endif + + *lp = ep; + return (0); +} + +static void +sfd_tran_ep_cancel(nni_aio *aio, void *arg, int rv) +{ + sfd_tran_ep *ep = arg; + nni_mtx_lock(&ep->mtx); + if (ep->useraio == aio) { + ep->useraio = NULL; + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&ep->mtx); +} + +static int +sfd_tran_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + sfd_tran_ep *ep = arg; + int rv; + + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_size(ep->rcvmax, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); +} + +static int +sfd_tran_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) +{ + sfd_tran_ep *ep = arg; + size_t val; + int rv; + if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&ep->mtx); + ep->rcvmax = val; + nni_mtx_unlock(&ep->mtx); +#ifdef NNG_ENABLE_STATS + nni_stat_set_value(&ep->st_rcv_max, val); +#endif + } + return (rv); +} + +static int +sfd_tran_ep_bind(void *arg) +{ + sfd_tran_ep *ep = arg; + return (nng_stream_listener_listen(ep->listener)); +} + +static void +sfd_tran_ep_accept(void *arg, nni_aio *aio) +{ + sfd_tran_ep *ep = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&ep->mtx); + if (ep->closed) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if (ep->useraio != NULL) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, NNG_EBUSY); + return; + } + if ((rv = nni_aio_schedule(aio, sfd_tran_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } + ep->useraio = aio; + if (!ep->started) { + ep->started = true; + nng_stream_listener_accept(ep->listener, ep->connaio); + } else { + sfd_tran_ep_match(ep); + } + nni_mtx_unlock(&ep->mtx); +} + +static nni_sp_pipe_ops sfd_tran_pipe_ops = { + .p_init = sfd_tran_pipe_init, + .p_fini = sfd_tran_pipe_fini, + .p_stop = sfd_tran_pipe_stop, + .p_send = sfd_tran_pipe_send, + .p_recv = sfd_tran_pipe_recv, + .p_close = sfd_tran_pipe_close, + .p_peer = sfd_tran_pipe_peer, + .p_getopt = sfd_tran_pipe_getopt, +}; + +static const nni_option sfd_tran_ep_opts[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_get = sfd_tran_ep_get_recvmaxsz, + .o_set = sfd_tran_ep_set_recvmaxsz, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static int +sfd_tran_listener_getopt( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + sfd_tran_ep *ep = arg; + int rv; + + rv = nni_stream_listener_get(ep->listener, name, buf, szp, t); + if (rv == NNG_ENOTSUP) { + rv = nni_getopt(sfd_tran_ep_opts, name, ep, buf, szp, t); + } + return (rv); +} + +static int +sfd_tran_listener_setopt( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + sfd_tran_ep *ep = arg; + int rv; + + rv = nni_stream_listener_set(ep->listener, name, buf, sz, t); + if (rv == NNG_ENOTSUP) { + rv = nni_setopt(sfd_tran_ep_opts, name, ep, buf, sz, t); + } + return (rv); +} + +static nni_sp_dialer_ops sfd_tran_dialer_ops = { + .d_init = sfd_tran_dialer_init, + .d_fini = NULL, + .d_connect = NULL, + .d_close = NULL, + .d_getopt = NULL, + .d_setopt = NULL, +}; + +static nni_sp_listener_ops sfd_tran_listener_ops = { + .l_init = sfd_tran_listener_init, + .l_fini = sfd_tran_ep_fini, + .l_bind = sfd_tran_ep_bind, + .l_accept = sfd_tran_ep_accept, + .l_close = sfd_tran_ep_close, + .l_getopt = sfd_tran_listener_getopt, + .l_setopt = sfd_tran_listener_setopt, +}; + +static nni_sp_tran sfd_tran = { + .tran_scheme = "socket", + .tran_dialer = &sfd_tran_dialer_ops, + .tran_listener = &sfd_tran_listener_ops, + .tran_pipe = &sfd_tran_pipe_ops, + .tran_init = sfd_tran_init, + .tran_fini = sfd_tran_fini, +}; + +void +nni_sp_sfd_register(void) +{ + nni_sp_tran_register(&sfd_tran); +} diff --git a/src/sp/transport/socket/sockfd_test.c b/src/sp/transport/socket/sockfd_test.c new file mode 100644 index 00000000..c12d4466 --- /dev/null +++ b/src/sp/transport/socket/sockfd_test.c @@ -0,0 +1,461 @@ +// +// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Devolutions <info@devolutions.net> +// Copyright 2018 Cody Piersall <cody.piersall@gmail.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <nuts.h> + +#ifdef NNG_PLATFORM_POSIX +#include <unistd.h> +#include <fcntl.h> +#ifdef NNG_PLATFORM_SUNOS +#include <zone.h> +#endif +#endif + +// FDC tests. +static void +test_sfd_connect_fail(void) +{ + nng_socket s; + + NUTS_OPEN(s); + NUTS_FAIL(nng_dial(s, "socket://", NULL, 0), NNG_ENOTSUP); + NUTS_CLOSE(s); +} + +void +test_sfd_malformed_address(void) +{ + nng_socket s1; + + NUTS_OPEN(s1); + NUTS_FAIL(nng_listen(s1, "socket://junk", NULL, 0), NNG_EADDRINVAL); + NUTS_CLOSE(s1); +} + +void +test_sfd_listen(void) +{ + nng_socket s1; + + NUTS_OPEN(s1); + NUTS_PASS(nng_listen(s1, "socket://", NULL, 0)); + NUTS_CLOSE(s1); +} + +void +test_sfd_accept(void) +{ + nng_socket s1, s2; + nng_listener l; + int fds[2]; + + NUTS_PASS(nng_socket_pair(fds)); + // make sure we won't have to deal with SIGPIPE - EPIPE is better + NUTS_OPEN(s1); + NUTS_OPEN(s2); + NUTS_PASS(nng_listener_create(&l, s1, "socket://")); + NUTS_PASS(nng_listener_start(l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[0])); + NUTS_SLEEP(10); + NUTS_CLOSE(s1); + close(fds[1]); +} + +void +test_sfd_exchange(void) +{ + nng_socket s1, s2; + nng_listener l1, l2; + int fds[2]; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s1); + NUTS_OPEN(s2); + NUTS_PASS(nng_listener_create(&l1, s1, "socket://")); + NUTS_PASS(nng_listener_start(l1, 0)); + NUTS_PASS(nng_listener_set_int(l1, NNG_OPT_SOCKET_FD, fds[0])); + NUTS_PASS(nng_listener_create(&l2, s2, "socket://")); + NUTS_PASS(nng_listener_start(l2, 0)); + NUTS_PASS(nng_listener_set_int(l2, NNG_OPT_SOCKET_FD, fds[1])); + NUTS_SLEEP(10); + NUTS_SEND(s1, "hello"); + NUTS_RECV(s2, "hello"); + NUTS_SEND(s2, "there"); + NUTS_RECV(s1, "there"); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + close(fds[1]); +} + +void +test_sfd_exchange_late(void) +{ + nng_socket s1, s2; + nng_listener l1, l2; + int fds[2]; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s1); + NUTS_OPEN(s2); + NUTS_PASS(nng_listener_create(&l1, s1, "socket://")); + NUTS_PASS(nng_listener_set_int(l1, NNG_OPT_SOCKET_FD, fds[0])); + NUTS_PASS(nng_listener_start(l1, 0)); + NUTS_PASS(nng_listener_create(&l2, s2, "socket://")); + NUTS_PASS(nng_listener_set_int(l2, NNG_OPT_SOCKET_FD, fds[1])); + NUTS_PASS(nng_listener_start(l2, 0)); + NUTS_SLEEP(10); + NUTS_SEND(s1, "hello"); + NUTS_RECV(s2, "hello"); + NUTS_SEND(s2, "there"); + NUTS_RECV(s1, "there"); + + NUTS_CLOSE(s1); + NUTS_CLOSE(s2); + close(fds[1]); +} + +void +test_sfd_recv_max(void) +{ + char msg[256]; + char buf[256]; + nng_socket s0; + nng_socket s1; + nng_listener l0; + nng_listener l1; + size_t sz; + size_t scratch; + int fds[2]; + + NUTS_PASS(nng_socket_pair(fds)); + + NUTS_OPEN(s0); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_socket_set_size(s0, NNG_OPT_RECVMAXSZ, 200)); + NUTS_PASS(nng_listener_create(&l0, s0, "socket://")); + NUTS_PASS(nng_socket_get_size(s0, NNG_OPT_RECVMAXSZ, &sz)); + NUTS_TRUE(sz == 200); + NUTS_PASS(nng_listener_set_size(l0, NNG_OPT_RECVMAXSZ, 100)); + NUTS_PASS(nng_listener_get_size(l0, NNG_OPT_RECVMAXSZ, &scratch)); + NUTS_ASSERT(scratch == 100); + NUTS_PASS(nng_listener_start(l0, 0)); + NUTS_PASS(nng_listener_set_int(l0, NNG_OPT_SOCKET_FD, fds[0])); + + NUTS_OPEN(s1); + NUTS_PASS(nng_listener_create(&l1, s1, "socket://")); + NUTS_PASS(nng_listener_start(l1, 0)); + NUTS_PASS(nng_listener_set_int(l1, NNG_OPT_SOCKET_FD, fds[1])); + NUTS_PASS(nng_send(s1, msg, 95, 0)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 100)); + NUTS_PASS(nng_recv(s0, buf, &sz, 0)); + NUTS_TRUE(sz == 95); + NUTS_PASS(nng_send(s1, msg, 150, 0)); + NUTS_FAIL(nng_recv(s0, buf, &sz, 0), NNG_ETIMEDOUT); + NUTS_CLOSE(s0); + NUTS_CLOSE(s1); +} + +void +test_sfd_large(void) +{ + char *buf; + nng_socket s0; + nng_socket s1; + nng_listener l0; + nng_listener l1; + size_t sz; + nng_msg *msg; + int fds[2]; + + sz = 1U << 20; + buf = nng_alloc(sz); // a MB + buf[sz - 1] = 0; + memset(buf, 'A', sz - 1); + NUTS_PASS(nng_socket_pair(fds)); + NUTS_PASS(nng_msg_alloc(&msg, sz)); + + NUTS_OPEN(s0); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_size(s0, NNG_OPT_RECVMAXSZ, 2U << 20)); + NUTS_PASS(nng_listener_create(&l0, s0, "socket://")); + NUTS_PASS(nng_listener_start(l0, 0)); + NUTS_PASS(nng_listener_set_int(l0, NNG_OPT_SOCKET_FD, fds[0])); + + NUTS_OPEN(s1); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_size(s1, NNG_OPT_RECVMAXSZ, 2U << 20)); + NUTS_PASS(nng_listener_create(&l1, s1, "socket://")); + NUTS_PASS(nng_listener_start(l1, 0)); + NUTS_PASS(nng_listener_set_int(l1, NNG_OPT_SOCKET_FD, fds[1])); + nng_msleep(100); + + nng_msg_clear(msg); + NUTS_PASS(nng_msg_append(msg, buf, sz)); + NUTS_PASS(nng_sendmsg(s0, msg, 0)); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + NUTS_ASSERT(strcmp(nng_msg_body(msg), buf) == 0); + + memset(nng_msg_body(msg), 'B', sz - 1); + memset(buf, 'B', sz - 1); + + NUTS_PASS(nng_sendmsg(s1, msg, 0)); + NUTS_PASS(nng_recvmsg(s0, &msg, 0)); + NUTS_ASSERT(strcmp(nng_msg_body(msg), buf) == 0); + + nng_msg_clear(msg); + NUTS_PASS(nng_msg_append(msg, buf, sz)); + NUTS_PASS(nng_sendmsg(s0, msg, 0)); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + NUTS_ASSERT(strcmp(nng_msg_body(msg), buf) == 0); + + NUTS_CLOSE(s0); + NUTS_CLOSE(s1); + nng_msg_free(msg); + nng_free(buf, sz); +} + +void +test_sockfd_close_pending(void) +{ + // this test verifies that closing a socket pair that has not + // started negotiation with the other side still works. + int fds[2]; + nng_socket s0; + nng_listener l; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s0); + nng_listen(s0, "socket://", &l, 0); + nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[0]); + nng_msleep(10); + NUTS_CLOSE(s0); + close(fds[1]); +} + +void +test_sockfd_close_peer(void) +{ + // this test verifies that closing a socket peer + // during negotiation is ok. + int fds[2]; + nng_socket s0; + nng_listener l; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s0); + NUTS_PASS(nng_listen(s0, "socket://", &l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[0])); + close(fds[1]); + nng_msleep(100); + NUTS_CLOSE(s0); +} + +void +test_sockfd_listener_sockaddr(void) +{ + // this test verifies that closing a socket peer + // during negotiation is ok. + int fds[2]; + nng_socket s0; + nng_listener l; + nng_sockaddr sa; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s0); + NUTS_PASS(nng_listen(s0, "socket://", &l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[0])); + NUTS_PASS(nng_listener_get_addr(l, NNG_OPT_LOCADDR, &sa)); + NUTS_ASSERT(sa.s_family == NNG_AF_UNSPEC); + close(fds[1]); + NUTS_CLOSE(s0); +} + +void +test_sockfd_pipe_sockaddr(void) +{ + // this test verifies that closing a socket peer + // during negotiation is ok. + int fds[2]; + nng_socket s0, s1; + nng_listener l; + nng_sockaddr sa; + nng_msg *msg; + nng_pipe p; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s0); + NUTS_OPEN(s1); + NUTS_PASS(nng_listen(s0, "socket://", &l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[0])); + NUTS_PASS(nng_listen(s1, "socket://", &l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[1])); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + + NUTS_SEND(s0, "something"); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + p = nng_msg_get_pipe(msg); + NUTS_PASS(nng_pipe_get_addr(p, NNG_OPT_LOCADDR, &sa)); + NUTS_ASSERT(sa.s_family == NNG_AF_UNSPEC); + NUTS_PASS(nng_pipe_get_addr(p, NNG_OPT_REMADDR, &sa)); + NUTS_ASSERT(sa.s_family == NNG_AF_UNSPEC); + NUTS_CLOSE(s0); + NUTS_CLOSE(s1); + nng_msg_free(msg); +} + +void +test_sockfd_pipe_peer(void) +{ + // this test verifies that closing a socket peer + // during negotiation is ok. + int fds[2]; + nng_socket s0, s1; + nng_listener l; + nng_msg *msg; + nng_pipe p; + uint64_t id; + + NUTS_PASS(nng_socket_pair(fds)); + NUTS_OPEN(s0); + NUTS_OPEN(s1); + NUTS_PASS(nng_listen(s0, "socket://", &l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[0])); + NUTS_PASS(nng_listen(s1, "socket://", &l, 0)); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[1])); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 1000)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + + NUTS_SEND(s0, "something"); + NUTS_PASS(nng_recvmsg(s1, &msg, 0)); + p = nng_msg_get_pipe(msg); + NUTS_ASSERT(nng_pipe_id(p) != -1); +#if defined(NNG_PLATFORM_DARWIN) || defined(NNG_PLATFORM_LINUX) + NUTS_PASS(nng_pipe_get_uint64(p, NNG_OPT_PEER_PID, &id)); + NUTS_ASSERT(id == (uint64_t) getpid()); +#endif +#if defined(NNG_PLATFORM_DARWIN) || defined(NNG_PLATFORM_LINUX) + NUTS_PASS(nng_pipe_get_uint64(p, NNG_OPT_PEER_UID, &id)); + NUTS_ASSERT(id == (uint64_t) getuid()); +#endif +#if defined(NNG_PLATFORM_DARWIN) || defined(NNG_PLATFORM_LINUX) + NUTS_PASS(nng_pipe_get_uint64(p, NNG_OPT_PEER_GID, &id)); + NUTS_ASSERT(id == (uint64_t) getgid()); +#endif +#if defined(NNG_PLATFORM_SUNOS) + NUTS_PASS(nng_pipe_get_uint64(p, NNG_OPT_PEER_ZONEID, &id)); + NUTS_ASSERT(id == (uint64_t) getzoneid()); +#else + NUTS_FAIL( + nng_pipe_get_uint64(p, NNG_OPT_PEER_ZONEID, &id), NNG_ENOTSUP); +#endif + + nng_msg_free(msg); + NUTS_CLOSE(s0); + NUTS_CLOSE(s1); +} + +void +test_sfd_listen_full(void) +{ +#ifndef NNG_SFD_LISTEN_QUEUE +#define NNG_SFD_LISTEN_QUEUE 16 +#endif + + int fds[NNG_SFD_LISTEN_QUEUE * 2]; + nng_socket s; + int i; + nng_listener l; + for (i = 0; i < NNG_SFD_LISTEN_QUEUE * 2; i += 2) { + int pair[2]; + NUTS_PASS(nng_socket_pair(pair)); + fds[i] = pair[0]; + fds[i+1] = pair[1]; + } + NUTS_OPEN(s); + NUTS_PASS(nng_listener_create(&l, s, "socket://")); + for (i = 0; i < NNG_SFD_LISTEN_QUEUE * 2; i++) { + if (i < NNG_SFD_LISTEN_QUEUE) { + NUTS_PASS(nng_listener_set_int( + l, NNG_OPT_SOCKET_FD, fds[i])); + } else { + NUTS_FAIL( + nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fds[i]), + NNG_ENOSPC); + } + } + for (i = 0; i < NNG_SFD_LISTEN_QUEUE * 2; i++) { + close(fds[i]); + } + NUTS_CLOSE(s); +} + +void +test_sfd_fd_option_type(void) +{ + nng_socket s; + nng_listener l; + + NUTS_OPEN(s); + NUTS_PASS(nng_listener_create(&l, s, "socket://")); + NUTS_FAIL(nng_listener_set_bool(l, NNG_OPT_SOCKET_FD, false), NNG_EBADTYPE); + NUTS_CLOSE(s); +} + +void +test_sfd_fd_dev_zero(void) +{ +#ifdef NNG_PLATFORM_POSIX + nng_socket s; + nng_listener l; + int fd; + + // dev/zero produces a stream of zero bytes leading to protocol error + NUTS_ASSERT((fd = open("/dev/zero", O_RDONLY, 0777)) >= 0); + + NUTS_OPEN(s); + NUTS_PASS(nng_listener_create(&l, s, "socket://")); + NUTS_PASS(nng_listener_set_int(l, NNG_OPT_SOCKET_FD, fd)); + nng_msleep(100); + NUTS_CLOSE(s); +#endif +} + +NUTS_TESTS = { + { "socket connect fail", test_sfd_connect_fail }, + { "socket malformed address", test_sfd_malformed_address }, +#ifdef NNG_HAVE_SOCKETPAIR + { "socket listen", test_sfd_listen }, + { "socket accept", test_sfd_accept }, + { "socket exchange", test_sfd_exchange }, + { "socket exchange late", test_sfd_exchange_late }, + { "socket recv max", test_sfd_recv_max }, + { "socket exchange large", test_sfd_large }, + { "socket close pending", test_sockfd_close_pending }, + { "socket close peer", test_sockfd_close_peer }, + { "socket listener address", test_sockfd_listener_sockaddr }, + { "socket pipe address", test_sockfd_pipe_sockaddr }, + { "socket pipe peer id", test_sockfd_pipe_peer }, + { "socket listen full", test_sfd_listen_full }, + { "socket bad fd type", test_sfd_fd_option_type }, + { "socket dev zero", test_sfd_fd_dev_zero }, +#endif + + { NULL, NULL }, +};
\ No newline at end of file |
