diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-11-01 22:05:35 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-11-08 17:50:24 -0800 |
| commit | fc6882305f0b5e06e58a0a25740f422d133015b5 (patch) | |
| tree | 714b1fa4656253c8731a8f3f0861c24715440f95 /src/transport/ipc | |
| parent | 4bf06d03f6ebead7f4e0603a2da3b1b891887878 (diff) | |
| download | nng-fc6882305f0b5e06e58a0a25740f422d133015b5.tar.gz nng-fc6882305f0b5e06e58a0a25740f422d133015b5.tar.bz2 nng-fc6882305f0b5e06e58a0a25740f422d133015b5.zip | |
fixes #1041 Abstract socket address for IPC
fixes #1326 Linux IPC could use fchmod
fixes #1327 getsockname on ipc may not work
This introduces an abstract:// style transport, which on Linux
results in using the abstract socket with the given name (not
including the leading NULL byte). A new NNG_AF_ABSTRACT is
provided. Auto bind abstract sockets are also supported.
While here we have inlined the aios for the POSIX ipc pipe
objects, eliminating at least one set of failure paths, and
have also performed various other cleanups.
A unix:// alias is available on POSIX systems, which acts just
like ipc:// (and is fact just an alias). This is supplied so
that in the future we can add support for AF_UNIX on Windows.
We've also absorbed the ipcperms test into the new ipc_test suite.
Finally we are now enforcing that IPC path names on Windows are
not over the maximum size, rather than just silently truncating
them.
Diffstat (limited to 'src/transport/ipc')
| -rw-r--r-- | src/transport/ipc/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 849 | ||||
| -rw-r--r-- | src/transport/ipc/ipc_test.c | 318 |
3 files changed, 758 insertions, 412 deletions
diff --git a/src/transport/ipc/CMakeLists.txt b/src/transport/ipc/CMakeLists.txt index 7405e765..d9b771a9 100644 --- a/src/transport/ipc/CMakeLists.txt +++ b/src/transport/ipc/CMakeLists.txt @@ -14,4 +14,5 @@ mark_as_advanced(NNG_TRANSPORT_IPC) nng_sources_if(NNG_TRANSPORT_IPC ipc.c) nng_headers_if(NNG_TRANSPORT_IPC nng/transport/ipc/ipc.h) -nng_defines_if(NNG_TRANSPORT_IPC NNG_TRANSPORT_IPC)
\ No newline at end of file +nng_defines_if(NNG_TRANSPORT_IPC NNG_TRANSPORT_IPC) +nng_test(ipc_test)
\ No newline at end of file diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index c09b56be..a0b22506 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -21,198 +21,193 @@ // Windows named pipes. Other platforms could use other mechanisms, // but all implementations on the platform must use the same mechanism. -typedef struct ipctran_pipe ipctran_pipe; -typedef struct ipctran_ep ipctran_ep; +typedef struct ipc_pipe ipc_pipe; +typedef struct ipc_ep ipc_ep; // ipc_pipe is one end of an IPC connection. -struct ipctran_pipe { +struct ipc_pipe { nng_stream * conn; uint16_t peer; uint16_t proto; - size_t rcvmax; + size_t rcv_max; bool closed; nni_sockaddr sa; - ipctran_ep * ep; - nni_pipe * npipe; + ipc_ep * ep; + nni_pipe * pipe; nni_list_node node; nni_atomic_flag reaped; nni_reap_item reap; - uint8_t txhead[1 + sizeof(uint64_t)]; - uint8_t rxhead[1 + sizeof(uint64_t)]; - size_t gottxhead; - size_t gotrxhead; - size_t wanttxhead; - size_t wantrxhead; - nni_list recvq; - nni_list sendq; - nni_aio * txaio; - nni_aio * rxaio; - nni_aio * negoaio; - nni_msg * rxmsg; + uint8_t tx_head[1 + sizeof(uint64_t)]; + uint8_t rx_head[1 + sizeof(uint64_t)]; + size_t got_tx_head; + size_t got_rx_head; + size_t want_tx_head; + size_t want_rx_head; + nni_list recv_q; + nni_list send_q; + nni_aio tx_aio; + nni_aio rx_aio; + nni_aio neg_aio; + nni_msg * rx_msg; nni_mtx mtx; }; -struct ipctran_ep { +struct ipc_ep { nni_mtx mtx; nni_sockaddr sa; - size_t rcvmax; + size_t rcv_max; uint16_t proto; bool started; bool closed; bool fini; - int refcnt; + int ref_cnt; nng_stream_dialer * dialer; nng_stream_listener *listener; - nni_aio * useraio; - nni_aio * connaio; - nni_aio * timeaio; - nni_list busypipes; // busy pipes -- ones passed to socket - nni_list waitpipes; // pipes waiting to match to socket - nni_list negopipes; // pipes busy negotiating + nni_aio * user_aio; + nni_aio * conn_aio; + nni_aio * time_aio; + nni_list busy_pipes; // busy pipes -- ones passed to socket + nni_list wait_pipes; // pipes waiting to match to socket + nni_list neg_pipes; // pipes busy negotiating nni_reap_item reap; - nni_stat_item st_rcvmaxsz; + nni_stat_item st_rcv_max; }; -static void ipctran_pipe_send_start(ipctran_pipe *); -static void ipctran_pipe_recv_start(ipctran_pipe *); -static void ipctran_pipe_send_cb(void *); -static void ipctran_pipe_recv_cb(void *); -static void ipctran_pipe_nego_cb(void *); -static void ipctran_ep_fini(void *); +static void ipc_pipe_send_start(ipc_pipe *p); +static void ipc_pipe_recv_start(ipc_pipe *p); +static void ipc_pipe_send_cb(void *); +static void ipc_pipe_recv_cb(void *); +static void ipc_pipe_neg_cb(void *); +static void ipc_ep_fini(void *); static int -ipctran_init(void) +ipc_tran_init(void) { return (0); } static void -ipctran_fini(void) +ipc_tran_fini(void) { } static void -ipctran_pipe_close(void *arg) +ipc_pipe_close(void *arg) { - ipctran_pipe *p = arg; + ipc_pipe *p = arg; nni_mtx_lock(&p->mtx); p->closed = true; nni_mtx_unlock(&p->mtx); - nni_aio_close(p->rxaio); - nni_aio_close(p->txaio); - nni_aio_close(p->negoaio); + nni_aio_close(&p->rx_aio); + nni_aio_close(&p->tx_aio); + nni_aio_close(&p->neg_aio); nng_stream_close(p->conn); } static void -ipctran_pipe_stop(void *arg) +ipc_pipe_stop(void *arg) { - ipctran_pipe *p = arg; + ipc_pipe *p = arg; - nni_aio_stop(p->rxaio); - nni_aio_stop(p->txaio); - nni_aio_stop(p->negoaio); + nni_aio_stop(&p->rx_aio); + nni_aio_stop(&p->tx_aio); + nni_aio_stop(&p->neg_aio); } static int -ipctran_pipe_init(void *arg, nni_pipe *npipe) +ipc_pipe_init(void *arg, nni_pipe *pipe) { - ipctran_pipe *p = arg; - p->npipe = npipe; + ipc_pipe *p = arg; + p->pipe = pipe; return (0); } static void -ipctran_pipe_fini(void *arg) +ipc_pipe_fini(void *arg) { - ipctran_pipe *p = arg; - ipctran_ep * ep; + ipc_pipe *p = arg; + ipc_ep * ep; - ipctran_pipe_stop(p); + ipc_pipe_stop(p); if ((ep = p->ep) != NULL) { nni_mtx_lock(&ep->mtx); nni_list_node_remove(&p->node); - ep->refcnt--; - if (ep->fini && (ep->refcnt == 0)) { - nni_reap(&ep->reap, ipctran_ep_fini, ep); + ep->ref_cnt--; + if (ep->fini && (ep->ref_cnt == 0)) { + nni_reap(&ep->reap, ipc_ep_fini, ep); } nni_mtx_unlock(&ep->mtx); } - nni_aio_free(p->rxaio); - nni_aio_free(p->txaio); - nni_aio_free(p->negoaio); + nni_aio_fini(&p->rx_aio); + nni_aio_fini(&p->tx_aio); + nni_aio_fini(&p->neg_aio); nng_stream_free(p->conn); - if (p->rxmsg) { - nni_msg_free(p->rxmsg); + if (p->rx_msg) { + nni_msg_free(p->rx_msg); } nni_mtx_fini(&p->mtx); NNI_FREE_STRUCT(p); } static void -ipctran_pipe_reap(ipctran_pipe *p) +ipc_pipe_reap(ipc_pipe *p) { if (!nni_atomic_flag_test_and_set(&p->reaped)) { if (p->conn != NULL) { nng_stream_close(p->conn); } - nni_reap(&p->reap, ipctran_pipe_fini, p); + nni_reap(&p->reap, ipc_pipe_fini, p); } } static int -ipctran_pipe_alloc(ipctran_pipe **pipep) +ipc_pipe_alloc(ipc_pipe **pipe_p) { - ipctran_pipe *p; - int rv; + ipc_pipe *p; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - if (((rv = nni_aio_alloc(&p->txaio, ipctran_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->rxaio, ipctran_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->negoaio, ipctran_pipe_nego_cb, p)) != - 0)) { - ipctran_pipe_fini(p); - return (rv); - } - nni_aio_list_init(&p->sendq); - nni_aio_list_init(&p->recvq); + nni_aio_init(&p->tx_aio, ipc_pipe_send_cb, p); + nni_aio_init(&p->rx_aio, ipc_pipe_recv_cb, p); + nni_aio_init(&p->neg_aio, ipc_pipe_neg_cb, p); + nni_aio_list_init(&p->send_q); + nni_aio_list_init(&p->recv_q); nni_atomic_flag_reset(&p->reaped); - *pipep = p; + *pipe_p = p; return (0); } static void -ipctran_ep_match(ipctran_ep *ep) +ipc_ep_match(ipc_ep *ep) { - nni_aio * aio; - ipctran_pipe *p; + nni_aio * aio; + ipc_pipe *p; - if (((aio = ep->useraio) == NULL) || - ((p = nni_list_first(&ep->waitpipes)) == NULL)) { + if (((aio = ep->user_aio) == NULL) || + ((p = nni_list_first(&ep->wait_pipes)) == NULL)) { return; } - nni_list_remove(&ep->waitpipes, p); - nni_list_append(&ep->busypipes, p); - ep->useraio = NULL; - p->rcvmax = ep->rcvmax; + nni_list_remove(&ep->wait_pipes, p); + nni_list_append(&ep->busy_pipes, p); + ep->user_aio = NULL; + p->rcv_max = ep->rcv_max; nni_aio_set_output(aio, 0, p); nni_aio_finish(aio, 0, 0); } static void -ipctran_pipe_nego_cb(void *arg) +ipc_pipe_neg_cb(void *arg) { - ipctran_pipe *p = arg; - ipctran_ep * ep = p->ep; - nni_aio * aio = p->negoaio; - nni_aio * uaio; - int rv; + ipc_pipe *p = arg; + ipc_ep * ep = p->ep; + nni_aio * aio = &p->neg_aio; + nni_aio * user_aio; + int rv; nni_mtx_lock(&ep->mtx); if ((rv = nni_aio_result(aio)) != 0) { @@ -220,25 +215,25 @@ ipctran_pipe_nego_cb(void *arg) } // We start transmitting before we receive. - if (p->gottxhead < p->wanttxhead) { - p->gottxhead += nni_aio_count(aio); - } else if (p->gotrxhead < p->wantrxhead) { - p->gotrxhead += nni_aio_count(aio); + if (p->got_tx_head < p->want_tx_head) { + p->got_tx_head += nni_aio_count(aio); + } else if (p->got_rx_head < p->want_rx_head) { + p->got_rx_head += nni_aio_count(aio); } - if (p->gottxhead < p->wanttxhead) { + if (p->got_tx_head < p->want_tx_head) { nni_iov iov; - iov.iov_len = p->wanttxhead - p->gottxhead; - iov.iov_buf = &p->txhead[p->gottxhead]; + iov.iov_len = p->want_tx_head - p->got_tx_head; + iov.iov_buf = &p->tx_head[p->got_tx_head]; nni_aio_set_iov(aio, 1, &iov); // send it down... nng_stream_send(p->conn, aio); nni_mtx_unlock(&p->ep->mtx); return; } - if (p->gotrxhead < p->wantrxhead) { + if (p->got_rx_head < p->want_rx_head) { nni_iov iov; - iov.iov_len = p->wantrxhead - p->gotrxhead; - iov.iov_buf = &p->rxhead[p->gotrxhead]; + iov.iov_len = p->want_rx_head - p->got_rx_head; + iov.iov_buf = &p->rx_head[p->got_rx_head]; nni_aio_set_iov(aio, 1, &iov); nng_stream_recv(p->conn, aio); nni_mtx_unlock(&p->ep->mtx); @@ -246,21 +241,21 @@ ipctran_pipe_nego_cb(void *arg) } // We have both sent and received the headers. Lets check the // receive side header. - if ((p->rxhead[0] != 0) || (p->rxhead[1] != 'S') || - (p->rxhead[2] != 'P') || (p->rxhead[3] != 0) || - (p->rxhead[6] != 0) || (p->rxhead[7] != 0)) { + if ((p->rx_head[0] != 0) || (p->rx_head[1] != 'S') || + (p->rx_head[2] != 'P') || (p->rx_head[3] != 0) || + (p->rx_head[6] != 0) || (p->rx_head[7] != 0)) { rv = NNG_EPROTO; goto error; } - NNI_GET16(&p->rxhead[4], p->peer); + NNI_GET16(&p->rx_head[4], p->peer); // We are all ready now. We put this in the wait list, and // then try to run the matcher. - nni_list_remove(&ep->negopipes, p); - nni_list_append(&ep->waitpipes, p); + nni_list_remove(&ep->neg_pipes, p); + nni_list_append(&ep->wait_pipes, p); - ipctran_ep_match(ep); + ipc_ep_match(ep); nni_mtx_unlock(&ep->mtx); return; @@ -269,34 +264,34 @@ error: nng_stream_close(p->conn); // If we are waiting to negotiate on a client side, then a failure // here has to be passed to the user app. - if ((uaio = ep->useraio) != NULL) { - ep->useraio = NULL; - nni_aio_finish_error(uaio, rv); + if ((user_aio = ep->user_aio) != NULL) { + ep->user_aio = NULL; + nni_aio_finish_error(user_aio, rv); } nni_mtx_unlock(&ep->mtx); - ipctran_pipe_reap(p); + ipc_pipe_reap(p); } static void -ipctran_pipe_send_cb(void *arg) +ipc_pipe_send_cb(void *arg) { - ipctran_pipe *p = arg; - int rv; - nni_aio * aio; - size_t n; - nni_msg * msg; - nni_aio * txaio = p->txaio; + ipc_pipe *p = arg; + int rv; + nni_aio * aio; + size_t n; + nni_msg * msg; + nni_aio * tx_aio = &p->tx_aio; nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_result(txaio)) != 0) { - nni_pipe_bump_error(p->npipe, rv); + if ((rv = nni_aio_result(tx_aio)) != 0) { + nni_pipe_bump_error(p->pipe, rv); // Intentionally we do not queue up another transfer. // There's an excellent chance that the pipe is no longer // usable, with a partial transfer. // The protocol should see this error, and close the // pipe itself, we hope. - while ((aio = nni_list_first(&p->sendq)) != NULL) { + while ((aio = nni_list_first(&p->send_q)) != NULL) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } @@ -304,21 +299,21 @@ ipctran_pipe_send_cb(void *arg) return; } - n = nni_aio_count(txaio); - nni_aio_iov_advance(txaio, n); - if (nni_aio_iov_count(txaio) != 0) { - nng_stream_send(p->conn, txaio); + n = nni_aio_count(tx_aio); + nni_aio_iov_advance(tx_aio, n); + if (nni_aio_iov_count(tx_aio) != 0) { + nng_stream_send(p->conn, tx_aio); nni_mtx_unlock(&p->mtx); return; } - aio = nni_list_first(&p->sendq); + aio = nni_list_first(&p->send_q); nni_aio_list_remove(aio); - ipctran_pipe_send_start(p); + ipc_pipe_send_start(p); msg = nni_aio_get_msg(aio); n = nni_msg_len(msg); - nni_pipe_bump_tx(p->npipe, n); + nni_pipe_bump_tx(p->pipe, n); nni_mtx_unlock(&p->mtx); nni_aio_set_msg(aio, NULL); @@ -327,29 +322,29 @@ ipctran_pipe_send_cb(void *arg) } static void -ipctran_pipe_recv_cb(void *arg) +ipc_pipe_recv_cb(void *arg) { - ipctran_pipe *p = arg; - nni_aio * aio; - int rv; - size_t n; - nni_msg * msg; - nni_aio * rxaio = p->rxaio; + ipc_pipe *p = arg; + nni_aio * aio; + int rv; + size_t n; + nni_msg * msg; + nni_aio * rx_aio = &p->rx_aio; nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_result(rxaio)) != 0) { + if ((rv = nni_aio_result(rx_aio)) != 0) { // Error on receive. This has to cause an error back - // to the user. Also, if we had allocated an rxmsg, lets + // to the user. Also, if we had allocated an rx_msg, lets // toss it. goto error; } - n = nni_aio_count(rxaio); - nni_aio_iov_advance(rxaio, n); - if (nni_aio_iov_count(rxaio) != 0) { + n = nni_aio_count(rx_aio); + nni_aio_iov_advance(rx_aio, n); + if (nni_aio_iov_count(rx_aio) != 0) { // Was this a partial read? If so then resubmit for the rest. - nng_stream_recv(p->conn, rxaio); + nng_stream_recv(p->conn, rx_aio); nni_mtx_unlock(&p->mtx); return; } @@ -357,21 +352,21 @@ ipctran_pipe_recv_cb(void *arg) // If we don't have a message yet, we were reading the message // header, which is just the length. This tells us the size of the // message to allocate and how much more to expect. - if (p->rxmsg == NULL) { + if (p->rx_msg == NULL) { uint64_t len; // Check to make sure we got msg type 1. - if (p->rxhead[0] != 1) { + if (p->rx_head[0] != 1) { rv = NNG_EPROTO; goto error; } // We should have gotten a message header. - NNI_GET64(p->rxhead + 1, len); + NNI_GET64(p->rx_head + 1, len); // Make sure the message payload is not too big. If it is // the caller will shut down the pipe. - if ((len > p->rcvmax) && (p->rcvmax > 0)) { + if ((len > p->rcv_max) && (p->rcv_max > 0)) { rv = NNG_EMSGSIZE; goto error; } @@ -381,7 +376,7 @@ ipctran_pipe_recv_cb(void *arg) // lock for the read side in the future, so that we allow // transmits to proceed normally. In practice this is // unlikely to be much of an issue though. - if ((rv = nni_msg_alloc(&p->rxmsg, (size_t) len)) != 0) { + if ((rv = nni_msg_alloc(&p->rx_msg, (size_t) len)) != 0) { goto error; } @@ -389,11 +384,11 @@ ipctran_pipe_recv_cb(void *arg) nni_iov iov; // Submit the rest of the data for a read -- we want to // read the entire message now. - iov.iov_buf = nni_msg_body(p->rxmsg); + iov.iov_buf = nni_msg_body(p->rx_msg); iov.iov_len = (size_t) len; - nni_aio_set_iov(rxaio, 1, &iov); - nng_stream_recv(p->conn, rxaio); + nni_aio_set_iov(rx_aio, 1, &iov); + nng_stream_recv(p->conn, rx_aio); nni_mtx_unlock(&p->mtx); return; } @@ -402,13 +397,13 @@ ipctran_pipe_recv_cb(void *arg) // Otherwise we got a message read completely. Let the user know the // good news. - aio = nni_list_first(&p->recvq); + aio = nni_list_first(&p->recv_q); nni_aio_list_remove(aio); - msg = p->rxmsg; - p->rxmsg = NULL; - n = nni_msg_len(msg); - nni_pipe_bump_rx(p->npipe, n); - ipctran_pipe_recv_start(p); + msg = p->rx_msg; + p->rx_msg = NULL; + n = nni_msg_len(msg); + nni_pipe_bump_rx(p->pipe, n); + ipc_pipe_recv_start(p); nni_mtx_unlock(&p->mtx); nni_aio_set_msg(aio, msg); @@ -416,13 +411,13 @@ ipctran_pipe_recv_cb(void *arg) return; error: - while ((aio = nni_list_first(&p->recvq)) != NULL) { + while ((aio = nni_list_first(&p->recv_q)) != NULL) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } - msg = p->rxmsg; - p->rxmsg = NULL; - nni_pipe_bump_error(p->npipe, rv); + msg = p->rx_msg; + p->rx_msg = NULL; + nni_pipe_bump_error(p->pipe, rv); // Intentionally, we do not queue up another receive. // The protocol should notice this error and close the pipe. nni_mtx_unlock(&p->mtx); @@ -431,9 +426,9 @@ error: } static void -ipctran_pipe_send_cancel(nni_aio *aio, void *arg, int rv) +ipc_pipe_send_cancel(nni_aio *aio, void *arg, int rv) { - ipctran_pipe *p = arg; + ipc_pipe *p = arg; nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { @@ -441,10 +436,10 @@ ipctran_pipe_send_cancel(nni_aio *aio, void *arg, int rv) return; } // If this is being sent, then cancel the pending transfer. - // The callback on the txaio will cause the user aio to + // The callback on the tx_aio will cause the user aio to // be canceled too. - if (nni_list_first(&p->sendq) == aio) { - nni_aio_abort(p->txaio, rv); + if (nni_list_first(&p->send_q) == aio) { + nni_aio_abort(&p->tx_aio, rv); nni_mtx_unlock(&p->mtx); return; } @@ -455,23 +450,22 @@ ipctran_pipe_send_cancel(nni_aio *aio, void *arg, int rv) } static void -ipctran_pipe_send_start(ipctran_pipe *p) +ipc_pipe_send_start(ipc_pipe *p) { nni_aio *aio; - nni_aio *txaio; nni_msg *msg; - int niov; + int nio; nni_iov iov[3]; uint64_t len; if (p->closed) { - while ((aio = nni_list_first(&p->sendq)) != NULL) { - nni_list_remove(&p->sendq, aio); + while ((aio = nni_list_first(&p->send_q)) != NULL) { + nni_list_remove(&p->send_q, aio); nni_aio_finish_error(aio, NNG_ECLOSED); } return; } - if ((aio = nni_list_first(&p->sendq)) == NULL) { + if ((aio = nni_list_first(&p->send_q)) == NULL) { return; } @@ -479,54 +473,53 @@ ipctran_pipe_send_start(ipctran_pipe *p) msg = nni_aio_get_msg(aio); len = nni_msg_len(msg) + nni_msg_header_len(msg); - p->txhead[0] = 1; // message type, 1. - NNI_PUT64(p->txhead + 1, len); + p->tx_head[0] = 1; // message type, 1. + NNI_PUT64(p->tx_head + 1, len); - txaio = p->txaio; - niov = 0; - iov[0].iov_buf = p->txhead; - iov[0].iov_len = sizeof(p->txhead); - niov++; + nio = 0; + iov[0].iov_buf = p->tx_head; + iov[0].iov_len = sizeof(p->tx_head); + nio++; if (nni_msg_header_len(msg) > 0) { - iov[niov].iov_buf = nni_msg_header(msg); - iov[niov].iov_len = nni_msg_header_len(msg); - niov++; + iov[nio].iov_buf = nni_msg_header(msg); + iov[nio].iov_len = nni_msg_header_len(msg); + nio++; } if (nni_msg_len(msg) > 0) { - iov[niov].iov_buf = nni_msg_body(msg); - iov[niov].iov_len = nni_msg_len(msg); - niov++; + iov[nio].iov_buf = nni_msg_body(msg); + iov[nio].iov_len = nni_msg_len(msg); + nio++; } - nni_aio_set_iov(txaio, niov, iov); - nng_stream_send(p->conn, txaio); + nni_aio_set_iov(&p->tx_aio, nio, iov); + nng_stream_send(p->conn, &p->tx_aio); } static void -ipctran_pipe_send(void *arg, nni_aio *aio) +ipc_pipe_send(void *arg, nni_aio *aio) { - ipctran_pipe *p = arg; - int rv; + ipc_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, ipctran_pipe_send_cancel, p)) != 0) { + if ((rv = nni_aio_schedule(aio, ipc_pipe_send_cancel, p)) != 0) { nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; } - nni_list_append(&p->sendq, aio); - if (nni_list_first(&p->sendq) == aio) { - ipctran_pipe_send_start(p); + nni_list_append(&p->send_q, aio); + if (nni_list_first(&p->send_q) == aio) { + ipc_pipe_send_start(p); } nni_mtx_unlock(&p->mtx); } static void -ipctran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) +ipc_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) { - ipctran_pipe *p = arg; + ipc_pipe *p = arg; nni_mtx_lock(&p->mtx); if (!nni_aio_list_active(aio)) { @@ -534,10 +527,10 @@ ipctran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) return; } // If receive in progress, then cancel the pending transfer. - // The callback on the rxaio will cause the user aio to + // The callback on the rx_aio will cause the user aio to // be canceled too. - if (nni_list_first(&p->recvq) == aio) { - nni_aio_abort(p->rxaio, rv); + if (nni_list_first(&p->recv_q) == aio) { + nni_aio_abort(&p->rx_aio, rv); nni_mtx_unlock(&p->mtx); return; } @@ -547,38 +540,36 @@ ipctran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) } static void -ipctran_pipe_recv_start(ipctran_pipe *p) +ipc_pipe_recv_start(ipc_pipe *p) { - nni_aio *rxaio; - nni_iov iov; - NNI_ASSERT(p->rxmsg == NULL); + nni_iov iov; + NNI_ASSERT(p->rx_msg == NULL); if (p->closed) { nni_aio *aio; - while ((aio = nni_list_first(&p->recvq)) != NULL) { - nni_list_remove(&p->recvq, aio); + while ((aio = nni_list_first(&p->recv_q)) != NULL) { + nni_list_remove(&p->recv_q, aio); nni_aio_finish_error(aio, NNG_ECLOSED); } return; } - if (nni_list_empty(&p->recvq)) { + if (nni_list_empty(&p->recv_q)) { return; } // Schedule a read of the IPC header. - rxaio = p->rxaio; - iov.iov_buf = p->rxhead; - iov.iov_len = sizeof(p->rxhead); - nni_aio_set_iov(rxaio, 1, &iov); + iov.iov_buf = p->rx_head; + iov.iov_len = sizeof(p->rx_head); + nni_aio_set_iov(&p->rx_aio, 1, &iov); - nng_stream_recv(p->conn, rxaio); + nng_stream_recv(p->conn, &p->rx_aio); } static void -ipctran_pipe_recv(void *arg, nni_aio *aio) +ipc_pipe_recv(void *arg, nni_aio *aio) { - ipctran_pipe *p = arg; - int rv; + ipc_pipe *p = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -589,130 +580,130 @@ ipctran_pipe_recv(void *arg, nni_aio *aio) nni_aio_finish_error(aio, NNG_ECLOSED); return; } - if ((rv = nni_aio_schedule(aio, ipctran_pipe_recv_cancel, p)) != 0) { + if ((rv = nni_aio_schedule(aio, ipc_pipe_recv_cancel, p)) != 0) { nni_mtx_unlock(&p->mtx); nni_aio_finish_error(aio, rv); return; } - nni_list_append(&p->recvq, aio); - if (nni_list_first(&p->recvq) == aio) { - ipctran_pipe_recv_start(p); + nni_list_append(&p->recv_q, aio); + if (nni_list_first(&p->recv_q) == aio) { + ipc_pipe_recv_start(p); } nni_mtx_unlock(&p->mtx); } static uint16_t -ipctran_pipe_peer(void *arg) +ipc_pipe_peer(void *arg) { - ipctran_pipe *p = arg; + ipc_pipe *p = arg; return (p->peer); } static void -ipctran_pipe_start(ipctran_pipe *p, nng_stream *conn, ipctran_ep *ep) +ipc_pipe_start(ipc_pipe *p, nng_stream *conn, ipc_ep *ep) { nni_iov iov; - ep->refcnt++; + ep->ref_cnt++; p->conn = conn; p->ep = ep; p->proto = ep->proto; - p->txhead[0] = 0; - p->txhead[1] = 'S'; - p->txhead[2] = 'P'; - p->txhead[3] = 0; - NNI_PUT16(&p->txhead[4], p->proto); - NNI_PUT16(&p->txhead[6], 0); - - p->gotrxhead = 0; - p->gottxhead = 0; - p->wantrxhead = 8; - p->wanttxhead = 8; - iov.iov_len = 8; - iov.iov_buf = &p->txhead[0]; - nni_aio_set_iov(p->negoaio, 1, &iov); - nni_list_append(&ep->negopipes, p); - - nni_aio_set_timeout(p->negoaio, 10000); // 10 sec timeout to negotiate - nng_stream_send(p->conn, p->negoaio); + p->tx_head[0] = 0; + p->tx_head[1] = 'S'; + p->tx_head[2] = 'P'; + p->tx_head[3] = 0; + NNI_PUT16(&p->tx_head[4], p->proto); + NNI_PUT16(&p->tx_head[6], 0); + + p->got_rx_head = 0; + p->got_tx_head = 0; + p->want_rx_head = 8; + p->want_tx_head = 8; + iov.iov_len = 8; + iov.iov_buf = &p->tx_head[0]; + nni_aio_set_iov(&p->neg_aio, 1, &iov); + nni_list_append(&ep->neg_pipes, p); + + nni_aio_set_timeout(&p->neg_aio, 10000); // 10 sec timeout to negotiate + nng_stream_send(p->conn, &p->neg_aio); } static void -ipctran_ep_close(void *arg) +ipc_ep_close(void *arg) { - ipctran_ep * ep = arg; - ipctran_pipe *p; + ipc_ep * ep = arg; + ipc_pipe *p; nni_mtx_lock(&ep->mtx); ep->closed = true; - nni_aio_close(ep->timeaio); + nni_aio_close(ep->time_aio); if (ep->dialer != NULL) { nng_stream_dialer_close(ep->dialer); } if (ep->listener != NULL) { nng_stream_listener_close(ep->listener); } - NNI_LIST_FOREACH (&ep->negopipes, p) { - ipctran_pipe_close(p); + NNI_LIST_FOREACH (&ep->neg_pipes, p) { + ipc_pipe_close(p); } - NNI_LIST_FOREACH (&ep->waitpipes, p) { - ipctran_pipe_close(p); + NNI_LIST_FOREACH (&ep->wait_pipes, p) { + ipc_pipe_close(p); } - NNI_LIST_FOREACH (&ep->busypipes, p) { - ipctran_pipe_close(p); + NNI_LIST_FOREACH (&ep->busy_pipes, p) { + ipc_pipe_close(p); } - if (ep->useraio != NULL) { - nni_aio_finish_error(ep->useraio, NNG_ECLOSED); - ep->useraio = NULL; + if (ep->user_aio != NULL) { + nni_aio_finish_error(ep->user_aio, NNG_ECLOSED); + ep->user_aio = NULL; } nni_mtx_unlock(&ep->mtx); } static void -ipctran_ep_fini(void *arg) +ipc_ep_fini(void *arg) { - ipctran_ep *ep = arg; + ipc_ep *ep = arg; nni_mtx_lock(&ep->mtx); ep->fini = true; - if (ep->refcnt != 0) { + if (ep->ref_cnt != 0) { nni_mtx_unlock(&ep->mtx); return; } nni_mtx_unlock(&ep->mtx); - nni_aio_stop(ep->timeaio); - nni_aio_stop(ep->connaio); + nni_aio_stop(ep->time_aio); + nni_aio_stop(ep->conn_aio); nng_stream_dialer_free(ep->dialer); nng_stream_listener_free(ep->listener); - nni_aio_free(ep->timeaio); - nni_aio_free(ep->connaio); + nni_aio_free(ep->time_aio); + nni_aio_free(ep->conn_aio); nni_mtx_fini(&ep->mtx); NNI_FREE_STRUCT(ep); } static void -ipctran_timer_cb(void *arg) +ipc_ep_timer_cb(void *arg) { - ipctran_ep *ep = arg; + ipc_ep *ep = arg; nni_mtx_lock(&ep->mtx); - if (nni_aio_result(ep->timeaio) == 0) { - nng_stream_listener_accept(ep->listener, ep->connaio); + if (nni_aio_result(ep->time_aio) == 0) { + nng_stream_listener_accept(ep->listener, ep->conn_aio); } nni_mtx_unlock(&ep->mtx); } static void -ipctran_accept_cb(void *arg) +ipc_ep_accept_cb(void *arg) { - ipctran_ep * ep = arg; - nni_aio * aio = ep->connaio; - ipctran_pipe *p; - int rv; - nng_stream * conn; + ipc_ep * ep = arg; + nni_aio * aio = ep->conn_aio; + ipc_pipe * p; + int rv; + nng_stream *conn; nni_mtx_lock(&ep->mtx); if ((rv = nni_aio_result(aio)) != 0) { @@ -720,26 +711,26 @@ ipctran_accept_cb(void *arg) } conn = nni_aio_get_output(aio, 0); - if ((rv = ipctran_pipe_alloc(&p)) != 0) { + if ((rv = ipc_pipe_alloc(&p)) != 0) { nng_stream_free(conn); goto error; } if (ep->closed) { - ipctran_pipe_fini(p); + ipc_pipe_fini(p); nng_stream_free(conn); rv = NNG_ECLOSED; goto error; } - ipctran_pipe_start(p, conn, ep); - nng_stream_listener_accept(ep->listener, ep->connaio); + ipc_pipe_start(p, conn, ep); + nng_stream_listener_accept(ep->listener, ep->conn_aio); nni_mtx_unlock(&ep->mtx); return; error: // When an error here occurs, let's send a notice up to the consumer. // That way it can be reported properly. - if ((aio = ep->useraio) != NULL) { - ep->useraio = NULL; + if ((aio = ep->user_aio) != NULL) { + ep->user_aio = NULL; nni_aio_finish_error(aio, rv); } @@ -747,12 +738,12 @@ error: case NNG_ENOMEM: case NNG_ENOFILES: - nng_sleep_aio(10, ep->timeaio); + nng_sleep_aio(10, ep->time_aio); break; default: if (!ep->closed) { - nng_stream_listener_accept(ep->listener, ep->connaio); + nng_stream_listener_accept(ep->listener, ep->conn_aio); } break; } @@ -760,32 +751,32 @@ error: } static void -ipctran_dial_cb(void *arg) +ipc_ep_dial_cb(void *arg) { - ipctran_ep * ep = arg; - nni_aio * aio = ep->connaio; - ipctran_pipe *p; - int rv; - nng_stream * conn; + ipc_ep * ep = arg; + nni_aio * aio = ep->conn_aio; + ipc_pipe * p; + int rv; + nng_stream *conn; if ((rv = nni_aio_result(aio)) != 0) { goto error; } conn = nni_aio_get_output(aio, 0); - if ((rv = ipctran_pipe_alloc(&p)) != 0) { + if ((rv = ipc_pipe_alloc(&p)) != 0) { nng_stream_free(conn); goto error; } nni_mtx_lock(&ep->mtx); if (ep->closed) { - ipctran_pipe_fini(p); + ipc_pipe_fini(p); nng_stream_free(conn); rv = NNG_ECLOSED; nni_mtx_unlock(&ep->mtx); goto error; } else { - ipctran_pipe_start(p, conn, ep); + ipc_pipe_start(p, conn, ep); } nni_mtx_unlock(&ep->mtx); return; @@ -794,98 +785,98 @@ error: // Error connecting. We need to pass this straight back // to the user. nni_mtx_lock(&ep->mtx); - if ((aio = ep->useraio) != NULL) { - ep->useraio = NULL; + if ((aio = ep->user_aio) != NULL) { + ep->user_aio = NULL; nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&ep->mtx); } static int -ipctran_ep_init(ipctran_ep **epp, nni_sock *sock) +ipc_ep_init(ipc_ep **epp, nni_sock *sock) { - ipctran_ep *ep; + ipc_ep *ep; if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&ep->mtx); - NNI_LIST_INIT(&ep->busypipes, ipctran_pipe, node); - NNI_LIST_INIT(&ep->waitpipes, ipctran_pipe, node); - NNI_LIST_INIT(&ep->negopipes, ipctran_pipe, node); + NNI_LIST_INIT(&ep->busy_pipes, ipc_pipe, node); + NNI_LIST_INIT(&ep->wait_pipes, ipc_pipe, node); + NNI_LIST_INIT(&ep->neg_pipes, ipc_pipe, node); ep->proto = nni_sock_proto_id(sock); - nni_stat_init(&ep->st_rcvmaxsz, "rcvmaxsz", "maximum receive size"); - nni_stat_set_type(&ep->st_rcvmaxsz, NNG_STAT_LEVEL); - nni_stat_set_unit(&ep->st_rcvmaxsz, NNG_UNIT_BYTES); + nni_stat_init(&ep->st_rcv_max, "rcvmaxsz", "maximum receive size"); + nni_stat_set_type(&ep->st_rcv_max, NNG_STAT_LEVEL); + nni_stat_set_unit(&ep->st_rcv_max, NNG_UNIT_BYTES); *epp = ep; return (0); } static int -ipctran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer) +ipc_ep_init_dialer(void **dp, nni_url *url, nni_dialer *dialer) { - ipctran_ep *ep; - int rv; - nni_sock * sock = nni_dialer_sock(ndialer); + ipc_ep * ep; + int rv; + nni_sock *sock = nni_dialer_sock(dialer); - if ((rv = ipctran_ep_init(&ep, sock)) != 0) { + if ((rv = ipc_ep_init(&ep, sock)) != 0) { return (rv); } - if (((rv = nni_aio_alloc(&ep->connaio, ipctran_dial_cb, ep)) != 0) || + if (((rv = nni_aio_alloc(&ep->conn_aio, ipc_ep_dial_cb, ep)) != 0) || ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0)) { - ipctran_ep_fini(ep); + ipc_ep_fini(ep); return (rv); } - nni_dialer_add_stat(ndialer, &ep->st_rcvmaxsz); + nni_dialer_add_stat(dialer, &ep->st_rcv_max); *dp = ep; return (0); } static int -ipctran_ep_init_listener(void **dp, nni_url *url, nni_listener *nlistener) +ipc_ep_init_listener(void **dp, nni_url *url, nni_listener *listener) { - ipctran_ep *ep; - int rv; - nni_sock * sock = nni_listener_sock(nlistener); + ipc_ep * ep; + int rv; + nni_sock *sock = nni_listener_sock(listener); - if ((rv = ipctran_ep_init(&ep, sock)) != 0) { + if ((rv = ipc_ep_init(&ep, sock)) != 0) { return (rv); } - if (((rv = nni_aio_alloc(&ep->connaio, ipctran_accept_cb, ep)) != 0) || - ((rv = nni_aio_alloc(&ep->timeaio, ipctran_timer_cb, ep)) != 0) || + if (((rv = nni_aio_alloc(&ep->conn_aio, ipc_ep_accept_cb, ep)) != 0) || + ((rv = nni_aio_alloc(&ep->time_aio, ipc_ep_timer_cb, ep)) != 0) || ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) { - ipctran_ep_fini(ep); + ipc_ep_fini(ep); return (rv); } - nni_listener_add_stat(nlistener, &ep->st_rcvmaxsz); + nni_listener_add_stat(listener, &ep->st_rcv_max); *dp = ep; return (0); } static void -ipctran_ep_cancel(nni_aio *aio, void *arg, int rv) +ipc_ep_cancel(nni_aio *aio, void *arg, int rv) { - ipctran_ep *ep = arg; + ipc_ep *ep = arg; nni_mtx_lock(&ep->mtx); - if (aio == ep->useraio) { - ep->useraio = NULL; + if (aio == ep->user_aio) { + ep->user_aio = NULL; nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&ep->mtx); } static void -ipctran_ep_connect(void *arg, nni_aio *aio) +ipc_ep_connect(void *arg, nni_aio *aio) { - ipctran_ep *ep = arg; - int rv; + ipc_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -896,64 +887,64 @@ ipctran_ep_connect(void *arg, nni_aio *aio) nni_aio_finish_error(aio, NNG_ECLOSED); return; } - if (ep->useraio != NULL) { + if (ep->user_aio != NULL) { nni_mtx_unlock(&ep->mtx); nni_aio_finish_error(aio, NNG_EBUSY); return; } - if ((rv = nni_aio_schedule(aio, ipctran_ep_cancel, ep)) != 0) { + if ((rv = nni_aio_schedule(aio, ipc_ep_cancel, ep)) != 0) { nni_mtx_unlock(&ep->mtx); nni_aio_finish_error(aio, rv); return; } - ep->useraio = aio; - nng_stream_dialer_dial(ep->dialer, ep->connaio); + ep->user_aio = aio; + nng_stream_dialer_dial(ep->dialer, ep->conn_aio); nni_mtx_unlock(&ep->mtx); } static int -ipctran_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_type t) +ipc_ep_get_recv_max_sz(void *arg, void *v, size_t *szp, nni_type t) { - ipctran_ep *ep = arg; - int rv; + ipc_ep *ep = arg; + int rv; nni_mtx_lock(&ep->mtx); - rv = nni_copyout_size(ep->rcvmax, v, szp, t); + rv = nni_copyout_size(ep->rcv_max, v, szp, t); nni_mtx_unlock(&ep->mtx); return (rv); } static int -ipctran_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_type t) +ipc_ep_set_recv_max_sz(void *arg, const void *v, size_t sz, nni_type t) { - ipctran_ep *ep = arg; - size_t val; - int rv; + ipc_ep *ep = arg; + size_t val; + int rv; if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { - ipctran_pipe *p; + ipc_pipe *p; nni_mtx_lock(&ep->mtx); - ep->rcvmax = val; - NNI_LIST_FOREACH (&ep->waitpipes, p) { - p->rcvmax = val; + ep->rcv_max = val; + NNI_LIST_FOREACH (&ep->wait_pipes, p) { + p->rcv_max = val; } - NNI_LIST_FOREACH (&ep->negopipes, p) { - p->rcvmax = val; + NNI_LIST_FOREACH (&ep->neg_pipes, p) { + p->rcv_max = val; } - NNI_LIST_FOREACH (&ep->busypipes, p) { - p->rcvmax = val; + NNI_LIST_FOREACH (&ep->busy_pipes, p) { + p->rcv_max = val; } - nni_stat_set_value(&ep->st_rcvmaxsz, val); + nni_stat_set_value(&ep->st_rcv_max, val); nni_mtx_unlock(&ep->mtx); } return (rv); } static int -ipctran_ep_bind(void *arg) +ipc_ep_bind(void *arg) { - ipctran_ep *ep = arg; - int rv; + ipc_ep *ep = arg; + int rv; nni_mtx_lock(&ep->mtx); rv = nng_stream_listener_listen(ep->listener); @@ -962,10 +953,10 @@ ipctran_ep_bind(void *arg) } static void -ipctran_ep_accept(void *arg, nni_aio *aio) +ipc_ep_accept(void *arg, nni_aio *aio) { - ipctran_ep *ep = arg; - int rv; + ipc_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -976,52 +967,51 @@ ipctran_ep_accept(void *arg, nni_aio *aio) nni_mtx_unlock(&ep->mtx); return; } - if (ep->useraio != NULL) { + if (ep->user_aio != NULL) { nni_aio_finish_error(aio, NNG_EBUSY); nni_mtx_unlock(&ep->mtx); return; } - if ((rv = nni_aio_schedule(aio, ipctran_ep_cancel, ep)) != 0) { + if ((rv = nni_aio_schedule(aio, ipc_ep_cancel, ep)) != 0) { nni_mtx_unlock(&ep->mtx); nni_aio_finish_error(aio, rv); return; } - ep->useraio = aio; + ep->user_aio = aio; if (!ep->started) { ep->started = true; - nng_stream_listener_accept(ep->listener, ep->connaio); + nng_stream_listener_accept(ep->listener, ep->conn_aio); } else { - ipctran_ep_match(ep); + ipc_ep_match(ep); } nni_mtx_unlock(&ep->mtx); } static int -ipctran_pipe_getopt( - void *arg, const char *name, void *buf, size_t *szp, nni_type t) +ipc_pipe_get(void *arg, const char *name, void *buf, size_t *szp, nni_type t) { - ipctran_pipe *p = arg; + ipc_pipe *p = arg; return (nni_stream_getx(p->conn, name, buf, szp, t)); } -static nni_tran_pipe_ops ipctran_pipe_ops = { - .p_init = ipctran_pipe_init, - .p_fini = ipctran_pipe_fini, - .p_stop = ipctran_pipe_stop, - .p_send = ipctran_pipe_send, - .p_recv = ipctran_pipe_recv, - .p_close = ipctran_pipe_close, - .p_peer = ipctran_pipe_peer, - .p_getopt = ipctran_pipe_getopt, +static nni_tran_pipe_ops ipc_tran_pipe_ops = { + .p_init = ipc_pipe_init, + .p_fini = ipc_pipe_fini, + .p_stop = ipc_pipe_stop, + .p_send = ipc_pipe_send, + .p_recv = ipc_pipe_recv, + .p_close = ipc_pipe_close, + .p_peer = ipc_pipe_peer, + .p_getopt = ipc_pipe_get, }; -static const nni_option ipctran_ep_options[] = { +static const nni_option ipc_ep_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, - .o_get = ipctran_ep_get_recvmaxsz, - .o_set = ipctran_ep_set_recvmaxsz, + .o_get = ipc_ep_get_recv_max_sz, + .o_set = ipc_ep_set_recv_max_sz, }, // terminate list { @@ -1030,13 +1020,12 @@ static const nni_option ipctran_ep_options[] = { }; static int -ipctran_dialer_getopt( - void *arg, const char *name, void *buf, size_t *szp, nni_type t) +ipc_dialer_get(void *arg, const char *name, void *buf, size_t *szp, nni_type t) { - ipctran_ep *ep = arg; - int rv; + ipc_ep *ep = arg; + int rv; - rv = nni_getopt(ipctran_ep_options, name, ep, buf, szp, t); + rv = nni_getopt(ipc_ep_options, name, ep, buf, szp, t); if (rv == NNG_ENOTSUP) { rv = nni_stream_dialer_getx(ep->dialer, name, buf, szp, t); } @@ -1044,13 +1033,13 @@ ipctran_dialer_getopt( } static int -ipctran_dialer_setopt( +ipc_dialer_set( void *arg, const char *name, const void *buf, size_t sz, nni_type t) { - ipctran_ep *ep = arg; - int rv; + ipc_ep *ep = arg; + int rv; - rv = nni_setopt(ipctran_ep_options, name, ep, buf, sz, t); + rv = nni_setopt(ipc_ep_options, name, ep, buf, sz, t); if (rv == NNG_ENOTSUP) { rv = nni_stream_dialer_setx(ep->dialer, name, buf, sz, t); } @@ -1058,13 +1047,13 @@ ipctran_dialer_setopt( } static int -ipctran_listener_getopt( +ipc_listener_get( void *arg, const char *name, void *buf, size_t *szp, nni_type t) { - ipctran_ep *ep = arg; - int rv; + ipc_ep *ep = arg; + int rv; - rv = nni_getopt(ipctran_ep_options, name, ep, buf, szp, t); + rv = nni_getopt(ipc_ep_options, name, ep, buf, szp, t); if (rv == NNG_ENOTSUP) { rv = nni_stream_listener_getx(ep->listener, name, buf, szp, t); } @@ -1072,13 +1061,13 @@ ipctran_listener_getopt( } static int -ipctran_listener_setopt( +ipc_listener_set( void *arg, const char *name, const void *buf, size_t sz, nni_type t) { - ipctran_ep *ep = arg; - int rv; + ipc_ep *ep = arg; + int rv; - rv = nni_setopt(ipctran_ep_options, name, ep, buf, sz, t); + rv = nni_setopt(ipc_ep_options, name, ep, buf, sz, t); if (rv == NNG_ENOTSUP) { rv = nni_stream_listener_setx(ep->listener, name, buf, sz, t); } @@ -1086,15 +1075,15 @@ ipctran_listener_setopt( } static int -ipctran_check_recvmaxsz(const void *v, size_t sz, nni_type t) +ipc_check_recv_max_sz(const void *v, size_t sz, nni_type t) { return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t)); } -static nni_chkoption ipctran_checkopts[] = { +static nni_chkoption ipc_check_opts[] = { { .o_name = NNG_OPT_RECVMAXSZ, - .o_check = ipctran_check_recvmaxsz, + .o_check = ipc_check_recv_max_sz, }, { .o_name = NULL, @@ -1102,48 +1091,86 @@ static nni_chkoption ipctran_checkopts[] = { }; static int -ipctran_checkopt(const char *name, const void *buf, size_t sz, nni_type t) +ipc_check_opt(const char *name, const void *buf, size_t sz, nni_type t) { int rv; - rv = nni_chkopt(ipctran_checkopts, name, buf, sz, t); + rv = nni_chkopt(ipc_check_opts, name, buf, sz, t); if (rv == NNG_ENOTSUP) { rv = nni_stream_checkopt("ipc", name, buf, sz, t); } return (rv); } -static nni_tran_dialer_ops ipctran_dialer_ops = { - .d_init = ipctran_ep_init_dialer, - .d_fini = ipctran_ep_fini, - .d_connect = ipctran_ep_connect, - .d_close = ipctran_ep_close, - .d_getopt = ipctran_dialer_getopt, - .d_setopt = ipctran_dialer_setopt, +static nni_tran_dialer_ops ipc_dialer_ops = { + .d_init = ipc_ep_init_dialer, + .d_fini = ipc_ep_fini, + .d_connect = ipc_ep_connect, + .d_close = ipc_ep_close, + .d_getopt = ipc_dialer_get, + .d_setopt = ipc_dialer_set, }; -static nni_tran_listener_ops ipctran_listener_ops = { - .l_init = ipctran_ep_init_listener, - .l_fini = ipctran_ep_fini, - .l_bind = ipctran_ep_bind, - .l_accept = ipctran_ep_accept, - .l_close = ipctran_ep_close, - .l_getopt = ipctran_listener_getopt, - .l_setopt = ipctran_listener_setopt, +static nni_tran_listener_ops ipc_listener_ops = { + .l_init = ipc_ep_init_listener, + .l_fini = ipc_ep_fini, + .l_bind = ipc_ep_bind, + .l_accept = ipc_ep_accept, + .l_close = ipc_ep_close, + .l_getopt = ipc_listener_get, + .l_setopt = ipc_listener_set, }; static nni_tran ipc_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "ipc", - .tran_dialer = &ipctran_dialer_ops, - .tran_listener = &ipctran_listener_ops, - .tran_pipe = &ipctran_pipe_ops, - .tran_init = ipctran_init, - .tran_fini = ipctran_fini, - .tran_checkopt = ipctran_checkopt, + .tran_dialer = &ipc_dialer_ops, + .tran_listener = &ipc_listener_ops, + .tran_pipe = &ipc_tran_pipe_ops, + .tran_init = ipc_tran_init, + .tran_fini = ipc_tran_fini, + .tran_checkopt = ipc_check_opt, +}; + +#ifdef NNG_PLATFORM_POSIX +static nni_tran ipc_tran_unix = { + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "unix", + .tran_dialer = &ipc_dialer_ops, + .tran_listener = &ipc_listener_ops, + .tran_pipe = &ipc_tran_pipe_ops, + .tran_init = ipc_tran_init, + .tran_fini = ipc_tran_fini, + .tran_checkopt = ipc_check_opt, +}; +#endif + +#ifdef NNG_HAVE_ABSTRACT_SOCKETS +static nni_tran ipc_tran_abstract = { + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "abstract", + .tran_dialer = &ipc_dialer_ops, + .tran_listener = &ipc_listener_ops, + .tran_pipe = &ipc_tran_pipe_ops, + .tran_init = ipc_tran_init, + .tran_fini = ipc_tran_fini, + .tran_checkopt = ipc_check_opt, }; +#endif int nng_ipc_register(void) { - return (nni_tran_register(&ipc_tran)); + int rv; + if (((rv = nni_tran_register(&ipc_tran)) != 0) +#ifdef NNG_PLATFORM_POSIX + || ((rv = nni_tran_register(&ipc_tran_unix)) != 0) +#endif +#ifdef NNG_HAVE_ABSTRACT_SOCKETS + || ((rv = nni_tran_register(&ipc_tran_abstract)) != 0) +#endif + ) { + return (rv); + } + + return (0); } diff --git a/src/transport/ipc/ipc_test.c b/src/transport/ipc/ipc_test.c new file mode 100644 index 00000000..23353387 --- /dev/null +++ b/src/transport/ipc/ipc_test.c @@ -0,0 +1,318 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <nng/nng.h> +#include <nng/protocol/pair0/pair.h> +#include <nng/supplemental/util/platform.h> + +#include <testutil.h> + +#include <acutest.h> + +#ifdef NNG_PLATFORM_POSIX +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> +#endif + +void +test_path_too_long(void) +{ + nng_socket s1; + char addr[256]; + + // All our names have to be less than 128 bytes. + memset(addr, 'a', 255); + addr[255] = 0; + memcpy(addr, "ipc://", strlen("ipc://")); + + TEST_ASSERT(strlen(addr) == 255); + TEST_NNG_PASS(nng_pair0_open(&s1)); + TEST_NNG_PASS(nng_setopt_ms(s1, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_FAIL(nng_listen(s1, addr, NULL, 0), NNG_EADDRINVAL); + TEST_NNG_FAIL( + nng_dial(s1, addr, NULL, NNG_FLAG_NONBLOCK), NNG_EADDRINVAL); + + TEST_NNG_PASS(nng_close(s1)); +} + +void +test_ipc_dialer_perms(void) +{ + nng_socket s; + nng_dialer d; + char addr[64]; + + testutil_scratch_addr("ipc", sizeof(addr), addr); + + TEST_NNG_PASS(nng_pair0_open(&s)); + TEST_NNG_PASS(nng_dialer_create(&d, s, addr)); + TEST_NNG_FAIL(nng_dialer_setopt_int(d, NNG_OPT_IPC_PERMISSIONS, 0444), + NNG_ENOTSUP); + + TEST_NNG_PASS(nng_close(s)); +} + +void +test_ipc_listener_perms(void) +{ + nng_socket s; + nng_listener l; + char addr[64]; +#ifndef _WIN32 + char * path; + struct stat st; +#endif + + testutil_scratch_addr("ipc", sizeof(addr), addr); + + TEST_NNG_PASS(nng_pair0_open(&s)); + TEST_NNG_PASS(nng_listener_create(&l, s, addr)); + +#ifdef _WIN32 + TEST_NNG_FAIL( + nng_listener_setopt_int(l, NNG_OPT_IPC_PERMISSIONS, 0444), + NNG_ENOTSUP); +#else + path = &addr[strlen("ipc://")]; + + // Attempt to set invalid permissions fails. + TEST_NNG_FAIL( + nng_listener_setopt_int(l, NNG_OPT_IPC_PERMISSIONS, S_IFREG), + NNG_EINVAL); + + TEST_NNG_PASS( + nng_listener_setopt_int(l, NNG_OPT_IPC_PERMISSIONS, 0444)); + TEST_NNG_PASS(nng_listener_start(l, 0)); + TEST_CHECK(stat(path, &st) == 0); + TEST_CHECK((st.st_mode & 0777) == 0444); + + // Now that it's running, we cannot set it. + TEST_NNG_FAIL( + nng_listener_setopt_int(l, NNG_OPT_IPC_PERMISSIONS, 0644), + NNG_EBUSY); +#endif + + TEST_NNG_PASS(nng_close(s)); +} + +void +test_abstract_sockets(void) +{ +#ifdef NNG_HAVE_ABSTRACT_SOCKETS + nng_socket s1; + nng_socket s2; + char addr[64]; + nng_pipe p1; + nng_pipe p2; + nng_sockaddr sa1; + nng_sockaddr sa2; + char * prefix = "abstract://"; + testutil_scratch_addr("abstract", sizeof(addr), addr); + + TEST_NNG_PASS(nng_pair0_open(&s1)); + TEST_NNG_PASS(nng_pair0_open(&s2)); + TEST_NNG_PASS(testutil_marry_ex(s1, s2, addr, &p1, &p2)); + TEST_NNG_PASS(nng_pipe_get_addr(p1, NNG_OPT_REMADDR, &sa1)); + TEST_NNG_PASS(nng_pipe_get_addr(p2, NNG_OPT_LOCADDR, &sa2)); + TEST_CHECK(sa1.s_family == sa2.s_family); + TEST_CHECK(sa1.s_family == NNG_AF_ABSTRACT); + TEST_CHECK(sa1.s_abstract.sa_len == strlen(addr) - strlen(prefix)); + TEST_CHECK(sa2.s_abstract.sa_len == strlen(addr) - strlen(prefix)); + TEST_NNG_SEND_STR(s1, "ping"); + TEST_NNG_RECV_STR(s2, "ping"); + TEST_NNG_PASS(nng_close(s1)); + TEST_NNG_PASS(nng_close(s2)); +#endif +} + +void +test_abstract_auto_bind(void) +{ +#ifdef NNG_HAVE_ABSTRACT_SOCKETS + nng_socket s1; + nng_socket s2; + char addr[40]; + char name[12]; + nng_sockaddr sa; + nng_listener l; + size_t len; + + snprintf(addr, sizeof(addr), "abstract://"); + + TEST_NNG_PASS(nng_pair0_open(&s1)); + TEST_NNG_PASS(nng_pair0_open(&s2)); + TEST_NNG_PASS(nng_setopt_ms(s1, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(s2, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(s2, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_listen(s1, addr, &l, 0)); + + TEST_NNG_PASS(nng_listener_get_addr(l, NNG_OPT_LOCADDR, &sa)); + // Under linux there are either 8 or 5 hex characters. + TEST_CHECK(sa.s_family == NNG_AF_ABSTRACT); + TEST_CHECK(sa.s_abstract.sa_len < 10); + + len = sa.s_abstract.sa_len; + memcpy(name, sa.s_abstract.sa_name, len); + name[len] = '\0'; + TEST_CHECK(strlen(name) == len); + + (void) snprintf(addr, sizeof(addr), "abstract://%s", name); + TEST_NNG_PASS(nng_dial(s2, addr, NULL, 0)); + + // first send the ping + TEST_NNG_SEND_STR(s1, "ping"); + TEST_NNG_RECV_STR(s2, "ping"); + + TEST_NNG_SEND_STR(s2, "pong"); + TEST_NNG_RECV_STR(s1, "pong"); + + TEST_NNG_PASS(nng_close(s1)); + TEST_NNG_PASS(nng_close(s2)); +#endif +} + +void +test_abstract_too_long(void) +{ +#ifdef NNG_HAVE_ABSTRACT_SOCKETS + nng_socket s1; + char addr[256]; + + // All our names have to be less than 128 bytes. + memset(addr, 'a', 255); + addr[255] = 0; + memcpy(addr, "abstract://", strlen("abstract://")); + + TEST_ASSERT(strlen(addr) == 255); + TEST_NNG_PASS(nng_pair0_open(&s1)); + TEST_NNG_PASS(nng_setopt_ms(s1, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_FAIL(nng_listen(s1, addr, NULL, 0), NNG_EADDRINVAL); + TEST_NNG_FAIL( + nng_dial(s1, addr, NULL, NNG_FLAG_NONBLOCK), NNG_EADDRINVAL); + + TEST_NNG_PASS(nng_close(s1)); +#endif +} + +void +test_abstract_null(void) +{ +#ifdef NNG_HAVE_ABSTRACT_SOCKETS + nng_socket s1; + nng_socket s2; + char addr[64]; + char name[40]; + char rng[20]; + + nng_sockaddr sa; + nng_listener l; + size_t len; + + snprintf(rng, sizeof(rng), "%08x%08x", nng_random(), nng_random()); + snprintf(name, sizeof(name), "a%%00b_%s", rng); + snprintf(addr, sizeof(addr), "abstract://%s", name); + + TEST_NNG_PASS(nng_pair0_open(&s1)); + TEST_NNG_PASS(nng_pair0_open(&s2)); + TEST_NNG_PASS(nng_setopt_ms(s1, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(s2, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(s2, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_listen(s1, addr, &l, 0)); + + TEST_NNG_PASS(nng_listener_get_addr(l, NNG_OPT_LOCADDR, &sa)); + // Under linux there are either 8 or 5 hex characters. + TEST_CHECK(sa.s_family == NNG_AF_ABSTRACT); + TEST_CHECK(sa.s_abstract.sa_len < 32); + len = sa.s_abstract.sa_len; + TEST_CHECK(len == 20); + TEST_CHECK(sa.s_abstract.sa_name[0] == 'a'); + TEST_CHECK(sa.s_abstract.sa_name[1] == '\0'); + TEST_CHECK(sa.s_abstract.sa_name[2] == 'b'); + TEST_CHECK(sa.s_abstract.sa_name[3] == '_'); + TEST_CHECK(memcmp(&sa.s_abstract.sa_name[4], rng, 16) == 0); + + TEST_NNG_PASS(nng_dial(s2, addr, NULL, 0)); + + // first send the ping + TEST_NNG_SEND_STR(s1, "1234"); + TEST_NNG_RECV_STR(s2, "1234"); + + TEST_NNG_SEND_STR(s2, "5678"); + TEST_NNG_RECV_STR(s1, "5678"); + + TEST_NNG_PASS(nng_close(s1)); + TEST_NNG_PASS(nng_close(s2)); +#endif +} + +void +test_unix_alias(void) +{ +#ifdef NNG_PLATFORM_POSIX + nng_socket s1; + nng_socket s2; + char addr1[32]; + char addr2[32]; + char rng[20]; + nng_sockaddr sa1; + nng_sockaddr sa2; + nng_msg * msg; + nng_pipe p; + + // Presumes /tmp. + + (void) snprintf( + rng, sizeof(rng), "%08x%08x", nng_random(), nng_random()); + snprintf(addr1, sizeof(addr1), "ipc:///tmp/%s", rng); + snprintf(addr2, sizeof(addr2), "unix:///tmp/%s", rng); + + TEST_NNG_PASS(nng_pair0_open(&s1)); + TEST_NNG_PASS(nng_pair0_open(&s2)); + TEST_NNG_PASS(nng_setopt_ms(s1, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(s2, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(s2, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_listen(s1, addr1, NULL, 0)); + TEST_NNG_PASS(nng_dial(s2, addr2, NULL, 0)); + + // first send the ping + TEST_NNG_SEND_STR(s1, "ping"); + TEST_NNG_PASS(nng_recvmsg(s2, &msg, 0)); + TEST_ASSERT(msg != NULL); + TEST_CHECK(nng_msg_len(msg) == 5); + TEST_STREQUAL(nng_msg_body(msg), "ping"); + p = nng_msg_get_pipe(msg); + TEST_NNG_PASS(nng_pipe_get_addr(p, NNG_OPT_REMADDR, &sa1)); + TEST_NNG_PASS(nng_pipe_get_addr(p, NNG_OPT_REMADDR, &sa2)); + TEST_CHECK(sa1.s_family == sa2.s_family); + TEST_CHECK(sa1.s_family == NNG_AF_IPC); + TEST_STREQUAL(sa1.s_ipc.sa_path, sa2.s_ipc.sa_path); + nng_msg_free(msg); + + TEST_NNG_PASS(nng_close(s1)); + TEST_NNG_PASS(nng_close(s2)); +#endif +} + +TEST_LIST = { + { "ipc path too long", test_path_too_long }, + { "ipc dialer perms", test_ipc_dialer_perms }, + { "ipc listener perms", test_ipc_listener_perms }, + { "ipc abstract sockets", test_abstract_sockets }, + { "ipc abstract auto bind", test_abstract_auto_bind }, + { "ipc abstract name too long", test_abstract_too_long }, + { "ipc abstract embedded null", test_abstract_null }, + { "ipc unix alias", test_unix_alias }, + { NULL, NULL }, +};
\ No newline at end of file |
