diff options
Diffstat (limited to 'src/transport/ipc')
| -rw-r--r-- | src/transport/ipc/ipc.c | 218 |
1 files changed, 94 insertions, 124 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index fcfe44ea..56ce5ecd 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -7,9 +7,9 @@ // found online at https://opensource.org/licenses/MIT. // +#include <stdio.h> #include <stdlib.h> #include <string.h> -#include <stdio.h> #include "core/nng_impl.h" @@ -17,65 +17,61 @@ // supplied as well. Normally the IPC is UNIX domain sockets or // Windows named pipes. Other platforms could use other mechanisms. -typedef struct nni_ipc_pipe nni_ipc_pipe; -typedef struct nni_ipc_ep nni_ipc_ep; +typedef struct nni_ipc_pipe nni_ipc_pipe; +typedef struct nni_ipc_ep nni_ipc_ep; // nni_ipc_pipe is one end of an IPC connection. struct nni_ipc_pipe { - const char * addr; - nni_plat_ipc_pipe * ipp; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - - uint8_t txhead[1+sizeof (uint64_t)]; - uint8_t rxhead[1+sizeof (uint64_t)]; - int gottxhead; - int gotrxhead; - int wanttxhead; - int wantrxhead; - - nni_aio * user_txaio; - nni_aio * user_rxaio; - nni_aio * user_negaio; - nni_aio txaio; - nni_aio rxaio; - nni_aio negaio; - nni_msg * rxmsg; - nni_mtx mtx; + const char * addr; + nni_plat_ipc_pipe *ipp; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + + uint8_t txhead[1 + sizeof(uint64_t)]; + uint8_t rxhead[1 + sizeof(uint64_t)]; + int gottxhead; + int gotrxhead; + int wanttxhead; + int wantrxhead; + + nni_aio *user_txaio; + nni_aio *user_rxaio; + nni_aio *user_negaio; + nni_aio txaio; + nni_aio rxaio; + nni_aio negaio; + nni_msg *rxmsg; + nni_mtx mtx; }; struct nni_ipc_ep { - char addr[NNG_MAXADDRLEN+1]; - nni_plat_ipc_ep * iep; - int closed; - uint16_t proto; - size_t rcvmax; - nni_aio aio; - nni_aio * user_aio; - nni_mtx mtx; + char addr[NNG_MAXADDRLEN + 1]; + nni_plat_ipc_ep *iep; + int closed; + uint16_t proto; + size_t rcvmax; + nni_aio aio; + nni_aio * user_aio; + nni_mtx mtx; }; - static void nni_ipc_pipe_send_cb(void *); static void nni_ipc_pipe_recv_cb(void *); static void nni_ipc_pipe_nego_cb(void *); static void nni_ipc_ep_cb(void *); - static int nni_ipc_tran_init(void) { return (0); } - static void nni_ipc_tran_fini(void) { } - static void nni_ipc_pipe_close(void *arg) { @@ -84,7 +80,6 @@ nni_ipc_pipe_close(void *arg) nni_plat_ipc_pipe_close(pipe->ipp); } - static void nni_ipc_pipe_fini(void *arg) { @@ -103,12 +98,11 @@ nni_ipc_pipe_fini(void *arg) NNI_FREE_STRUCT(pipe); } - static int nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) { nni_ipc_pipe *pipe; - int rv; + int rv; if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { return (NNG_ENOMEM); @@ -129,10 +123,10 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) goto fail; } - pipe->proto = ep->proto; + pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; - pipe->ipp = ipp; - pipe->addr = ep->addr; + pipe->ipp = ipp; + pipe->addr = ep->addr; *pipep = pipe; return (0); @@ -142,7 +136,6 @@ fail: return (rv); } - static void nni_ipc_cancel_start(nni_aio *aio) { @@ -156,13 +149,12 @@ nni_ipc_cancel_start(nni_aio *aio) nni_mtx_unlock(&pipe->mtx); } - static void nni_ipc_pipe_nego_cb(void *arg) { nni_ipc_pipe *pipe = arg; - nni_aio *aio = &pipe->negaio; - int rv; + nni_aio * aio = &pipe->negaio; + int rv; nni_mtx_lock(&pipe->mtx); if ((rv = nni_aio_result(aio)) != 0) { @@ -177,7 +169,7 @@ nni_ipc_pipe_nego_cb(void *arg) } if (pipe->gottxhead < pipe->wanttxhead) { - aio->a_niov = 1; + aio->a_niov = 1; aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead; aio->a_iov[0].iov_buf = &pipe->txhead[pipe->gottxhead]; // send it down... @@ -186,7 +178,7 @@ nni_ipc_pipe_nego_cb(void *arg) return; } if (pipe->gotrxhead < pipe->wantrxhead) { - aio->a_niov = 1; + aio->a_niov = 1; aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead; aio->a_iov[0].iov_buf = &pipe->rxhead[pipe->gotrxhead]; nni_plat_ipc_pipe_recv(pipe->ipp, aio); @@ -195,12 +187,9 @@ nni_ipc_pipe_nego_cb(void *arg) } // We have both sent and received the headers. Lets check the // receive side header. - if ((pipe->rxhead[0] != 0) || - (pipe->rxhead[1] != 'S') || - (pipe->rxhead[2] != 'P') || - (pipe->rxhead[3] != 0) || - (pipe->rxhead[6] != 0) || - (pipe->rxhead[7] != 0)) { + if ((pipe->rxhead[0] != 0) || (pipe->rxhead[1] != 'S') || + (pipe->rxhead[2] != 'P') || (pipe->rxhead[3] != 0) || + (pipe->rxhead[6] != 0) || (pipe->rxhead[7] != 0)) { rv = NNG_EPROTO; goto done; } @@ -215,14 +204,13 @@ done: nni_mtx_unlock(&pipe->mtx); } - static void nni_ipc_pipe_send_cb(void *arg) { nni_ipc_pipe *pipe = arg; - nni_aio *aio; - int rv; - size_t len; + nni_aio * aio; + int rv; + size_t len; nni_mtx_lock(&pipe->mtx); if ((aio = pipe->user_txaio) == NULL) { @@ -241,13 +229,12 @@ nni_ipc_pipe_send_cb(void *arg) nni_mtx_unlock(&pipe->mtx); } - static void nni_ipc_pipe_recv_cb(void *arg) { nni_ipc_pipe *pipe = arg; - nni_aio *aio; - int rv; + nni_aio * aio; + int rv; nni_mtx_lock(&pipe->mtx); aio = pipe->user_rxaio; @@ -285,7 +272,7 @@ nni_ipc_pipe_recv_cb(void *arg) } // We should have gotten a message header. - NNI_GET64(pipe->rxhead+1, len); + NNI_GET64(pipe->rxhead + 1, len); // Make sure the message payload is not too big. If it is // the caller will shut down the pipe. @@ -312,7 +299,7 @@ nni_ipc_pipe_recv_cb(void *arg) // read the entire message now. pipe->rxaio.a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); - pipe->rxaio.a_niov = 1; + pipe->rxaio.a_niov = 1; nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); @@ -322,13 +309,12 @@ nni_ipc_pipe_recv_cb(void *arg) // Otherwise we got a message read completely. Let the user know the // good news. pipe->user_rxaio = NULL; - aio->a_msg = pipe->rxmsg; - pipe->rxmsg = NULL; + aio->a_msg = pipe->rxmsg; + pipe->rxmsg = NULL; nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg)); nni_mtx_unlock(&pipe->mtx); } - static void nni_ipc_cancel_tx(nni_aio *aio) { @@ -342,13 +328,12 @@ nni_ipc_cancel_tx(nni_aio *aio) nni_aio_stop(&pipe->txaio); } - static void nni_ipc_pipe_send(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; - nni_msg *msg = aio->a_msg; - uint64_t len; + nni_msg * msg = aio->a_msg; + uint64_t len; len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -360,22 +345,21 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) pipe->user_txaio = aio; - pipe->txhead[0] = 1; // message type, 1. + pipe->txhead[0] = 1; // message type, 1. NNI_PUT64(pipe->txhead + 1, len); pipe->txaio.a_iov[0].iov_buf = pipe->txhead; - pipe->txaio.a_iov[0].iov_len = sizeof (pipe->txhead); + pipe->txaio.a_iov[0].iov_len = sizeof(pipe->txhead); pipe->txaio.a_iov[1].iov_buf = nni_msg_header(msg); pipe->txaio.a_iov[1].iov_len = nni_msg_header_len(msg); pipe->txaio.a_iov[2].iov_buf = nni_msg_body(msg); pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); - pipe->txaio.a_niov = 3; + pipe->txaio.a_niov = 3; nni_plat_ipc_pipe_send(pipe->ipp, &pipe->txaio); nni_mtx_unlock(&pipe->mtx); } - static void nni_ipc_cancel_rx(nni_aio *aio) { @@ -389,7 +373,6 @@ nni_ipc_cancel_rx(nni_aio *aio) nni_aio_stop(&pipe->rxaio); } - static void nni_ipc_pipe_recv(void *arg, nni_aio *aio) { @@ -407,19 +390,18 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) // Schedule a read of the IPC header. pipe->rxaio.a_iov[0].iov_buf = pipe->rxhead; - pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxhead); - pipe->rxaio.a_niov = 1; + pipe->rxaio.a_iov[0].iov_len = sizeof(pipe->rxhead); + pipe->rxaio.a_niov = 1; nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); } - static void nni_ipc_pipe_start(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; - int rv; + int rv; nni_mtx_lock(&pipe->mtx); pipe->txhead[0] = 0; @@ -429,12 +411,12 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) NNI_PUT16(&pipe->txhead[4], pipe->proto); NNI_PUT16(&pipe->txhead[6], 0); - pipe->user_negaio = aio; - pipe->gotrxhead = 0; - pipe->gottxhead = 0; - pipe->wantrxhead = 8; - pipe->wanttxhead = 8; - pipe->negaio.a_niov = 1; + pipe->user_negaio = aio; + pipe->gotrxhead = 0; + pipe->gottxhead = 0; + pipe->wantrxhead = 8; + pipe->wanttxhead = 8; + pipe->negaio.a_niov = 1; pipe->negaio.a_iov[0].iov_len = 8; pipe->negaio.a_iov[0].iov_buf = &pipe->txhead[0]; rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe); @@ -446,7 +428,6 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) nni_mtx_unlock(&pipe->mtx); } - static uint16_t nni_ipc_pipe_peer(void *arg) { @@ -455,7 +436,6 @@ nni_ipc_pipe_peer(void *arg) return (pipe->peer); } - static int nni_ipc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) { @@ -479,7 +459,6 @@ nni_ipc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) return (NNG_ENOTSUP); } - static void nni_ipc_ep_fini(void *arg) { @@ -491,14 +470,13 @@ nni_ipc_ep_fini(void *arg) NNI_FREE_STRUCT(ep); } - static int nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock, int mode) { nni_ipc_ep *ep; - int rv; + int rv; - if ((strlen(url) > NNG_MAXADDRLEN-1) || + if ((strlen(url) > NNG_MAXADDRLEN - 1) || (strncmp(url, "ipc://", strlen("ipc://")) != 0)) { return (NNG_EADDRINVAL); } @@ -513,15 +491,14 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock, int mode) return (rv); } ep->closed = 0; - ep->proto = nni_sock_proto(sock); + ep->proto = nni_sock_proto(sock); ep->rcvmax = nni_sock_rcvmaxsz(sock); - (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); + (void) snprintf(ep->addr, sizeof(ep->addr), "%s", url); *epp = ep; return (0); } - static void nni_ipc_ep_close(void *arg) { @@ -530,7 +507,6 @@ nni_ipc_ep_close(void *arg) nni_plat_ipc_ep_close(ep->iep); } - static int nni_ipc_ep_bind(void *arg) { @@ -539,13 +515,12 @@ nni_ipc_ep_bind(void *arg) return (nni_plat_ipc_ep_listen(ep->iep)); } - static void nni_ipc_ep_finish(nni_ipc_ep *ep) { - nni_aio *aio = ep->user_aio; + nni_aio * aio = ep->user_aio; nni_ipc_pipe *pipe; - int rv; + int rv; if ((aio = ep->user_aio) == NULL) { return; @@ -570,7 +545,6 @@ done: nni_aio_finish(aio, rv, 0); } - static void nni_ipc_ep_cb(void *arg) { @@ -581,7 +555,6 @@ nni_ipc_ep_cb(void *arg) nni_mtx_unlock(&ep->mtx); } - static void nni_ipc_cancel_ep(nni_aio *aio) { @@ -595,12 +568,11 @@ nni_ipc_cancel_ep(nni_aio *aio) nni_mtx_unlock(&ep->mtx); } - static void nni_ipc_ep_accept(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; - int rv; + int rv; nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); @@ -617,12 +589,11 @@ nni_ipc_ep_accept(void *arg, nni_aio *aio) nni_mtx_unlock(&ep->mtx); } - static void nni_ipc_ep_connect(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; - int rv; + int rv; nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); @@ -639,34 +610,33 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio) nni_mtx_unlock(&ep->mtx); } - static nni_tran_pipe nni_ipc_pipe_ops = { - .p_fini = nni_ipc_pipe_fini, - .p_start = nni_ipc_pipe_start, - .p_send = nni_ipc_pipe_send, - .p_recv = nni_ipc_pipe_recv, - .p_close = nni_ipc_pipe_close, - .p_peer = nni_ipc_pipe_peer, - .p_getopt = nni_ipc_pipe_getopt, + .p_fini = nni_ipc_pipe_fini, + .p_start = nni_ipc_pipe_start, + .p_send = nni_ipc_pipe_send, + .p_recv = nni_ipc_pipe_recv, + .p_close = nni_ipc_pipe_close, + .p_peer = nni_ipc_pipe_peer, + .p_getopt = nni_ipc_pipe_getopt, }; static nni_tran_ep nni_ipc_ep_ops = { - .ep_init = nni_ipc_ep_init, - .ep_fini = nni_ipc_ep_fini, - .ep_connect = nni_ipc_ep_connect, - .ep_bind = nni_ipc_ep_bind, - .ep_accept = nni_ipc_ep_accept, - .ep_close = nni_ipc_ep_close, - .ep_setopt = NULL, - .ep_getopt = NULL, + .ep_init = nni_ipc_ep_init, + .ep_fini = nni_ipc_ep_fini, + .ep_connect = nni_ipc_ep_connect, + .ep_bind = nni_ipc_ep_bind, + .ep_accept = nni_ipc_ep_accept, + .ep_close = nni_ipc_ep_close, + .ep_setopt = NULL, + .ep_getopt = NULL, }; // This is the IPC transport linkage, and should be the only global // symbol in this entire file. struct nni_tran nni_ipc_tran = { - .tran_scheme = "ipc", - .tran_ep = &nni_ipc_ep_ops, - .tran_pipe = &nni_ipc_pipe_ops, - .tran_init = nni_ipc_tran_init, - .tran_fini = nni_ipc_tran_fini, + .tran_scheme = "ipc", + .tran_ep = &nni_ipc_ep_ops, + .tran_pipe = &nni_ipc_pipe_ops, + .tran_init = nni_ipc_tran_init, + .tran_fini = nni_ipc_tran_fini, }; |
