diff options
Diffstat (limited to 'src/transport')
| -rw-r--r-- | src/transport/inproc/inproc.c | 168 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 218 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 221 |
3 files changed, 261 insertions, 346 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 01ffd39a..330c1d13 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -7,9 +7,9 @@ // found online at https://opensource.org/licenses/MIT. // +#include <stdio.h> #include <stdlib.h> #include <string.h> -#include <stdio.h> #include "core/nng_impl.h" @@ -17,50 +17,46 @@ // peer to another. The inproc transport is only valid within the same // process. -typedef struct nni_inproc_pair nni_inproc_pair; -typedef struct nni_inproc_pipe nni_inproc_pipe; -typedef struct nni_inproc_ep nni_inproc_ep; +typedef struct nni_inproc_pair nni_inproc_pair; +typedef struct nni_inproc_pipe nni_inproc_pipe; +typedef struct nni_inproc_ep nni_inproc_ep; typedef struct { - nni_mtx mx; - nni_list servers; + nni_mtx mx; + nni_list servers; } nni_inproc_global; // nni_inproc_pipe represents one half of a connection. struct nni_inproc_pipe { - const char * addr; - nni_inproc_pair * pair; - nni_msgq * rq; - nni_msgq * wq; - uint16_t peer; - uint16_t proto; + const char * addr; + nni_inproc_pair *pair; + nni_msgq * rq; + nni_msgq * wq; + uint16_t peer; + uint16_t proto; }; // nni_inproc_pair represents a pair of pipes. Because we control both // sides of the pipes, we can allocate and free this in one structure. struct nni_inproc_pair { - nni_mtx mx; - int refcnt; - nni_msgq * q[2]; - nni_inproc_pipe * pipes[2]; + nni_mtx mx; + int refcnt; + nni_msgq * q[2]; + nni_inproc_pipe *pipes[2]; }; struct nni_inproc_ep { - char addr[NNG_MAXADDRLEN+1]; - int mode; - int closed; - int started; - nni_list_node node; - uint16_t proto; - nni_cv cv; - nni_list clients; - nni_list aios; + char addr[NNG_MAXADDRLEN + 1]; + int mode; + int closed; + int started; + nni_list_node node; + uint16_t proto; + nni_cv cv; + nni_list clients; + nni_list aios; }; -#define NNI_INPROC_EP_IDLE 0 -#define NNI_INPROC_EP_DIAL 1 -#define NNI_INPROC_EP_LISTEN 2 - // nni_inproc is our global state - this contains the list of active endpoints // which we use for coordinating rendezvous. static nni_inproc_global nni_inproc; @@ -79,14 +75,12 @@ nni_inproc_init(void) return (0); } - static void nni_inproc_fini(void) { nni_mtx_fini(&nni_inproc.mx); } - static void nni_inproc_pipe_close(void *arg) { @@ -100,7 +94,6 @@ nni_inproc_pipe_close(void *arg) } } - // nni_inproc_pair destroy is called when both pipe-ends of the pipe // have been destroyed. static void @@ -112,7 +105,6 @@ nni_inproc_pair_destroy(nni_inproc_pair *pair) NNI_FREE_STRUCT(pair); } - static int nni_inproc_pipe_init(nni_inproc_pipe **pipep, nni_inproc_ep *ep) { @@ -122,12 +114,11 @@ nni_inproc_pipe_init(nni_inproc_pipe **pipep, nni_inproc_ep *ep) return (NNG_ENOMEM); } pipe->proto = ep->proto; - pipe->addr = ep->addr; - *pipep = pipe; + pipe->addr = ep->addr; + *pipep = pipe; return (0); } - static void nni_inproc_pipe_fini(void *arg) { @@ -154,15 +145,14 @@ nni_inproc_pipe_fini(void *arg) NNI_FREE_STRUCT(pipe); } - static void nni_inproc_pipe_send(void *arg, nni_aio *aio) { nni_inproc_pipe *pipe = arg; - nni_msg *msg = aio->a_msg; - char *h; - size_t l; - int rv; + nni_msg * msg = aio->a_msg; + char * h; + size_t l; + int rv; // We need to move any header data to the body, because the other // side won't know what to do otherwise. @@ -176,7 +166,6 @@ nni_inproc_pipe_send(void *arg, nni_aio *aio) nni_msgq_aio_put(pipe->wq, aio); } - static void nni_inproc_pipe_recv(void *arg, nni_aio *aio) { @@ -185,7 +174,6 @@ nni_inproc_pipe_recv(void *arg, nni_aio *aio) nni_msgq_aio_get(pipe->rq, aio); } - static uint16_t nni_inproc_pipe_peer(void *arg) { @@ -194,12 +182,11 @@ nni_inproc_pipe_peer(void *arg) return (pipe->peer); } - static int nni_inproc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) { nni_inproc_pipe *pipe = arg; - size_t len; + size_t len; switch (option) { case NNG_OPT_LOCALADDR: @@ -216,32 +203,30 @@ nni_inproc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) return (NNG_ENOTSUP); } - static int nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock, int mode) { nni_inproc_ep *ep; - if (strlen(url) > NNG_MAXADDRLEN-1) { + if (strlen(url) > NNG_MAXADDRLEN - 1) { return (NNG_EINVAL); } if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); } - ep->mode = mode; - ep->closed = 0; + ep->mode = mode; + ep->closed = 0; ep->started = 0; - ep->proto = nni_sock_proto(sock); + ep->proto = nni_sock_proto(sock); NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node); nni_aio_list_init(&ep->aios); - (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); + (void) snprintf(ep->addr, sizeof(ep->addr), "%s", url); *epp = ep; return (0); } - static void nni_inproc_ep_fini(void *arg) { @@ -251,7 +236,6 @@ nni_inproc_ep_fini(void *arg) NNI_FREE_STRUCT(ep); } - static void nni_inproc_conn_finish(nni_aio *aio, int rv) { @@ -265,7 +249,7 @@ nni_inproc_conn_finish(nni_aio *aio, int rv) } nni_aio_list_remove(aio); if (ep != NULL) { - if ((ep->mode != NNI_INPROC_EP_LISTEN) && + if ((ep->mode != NNI_EP_MODE_LISTEN) && nni_list_empty(&ep->aios)) { if (nni_list_active(&ep->clients, ep)) { nni_list_remove(&ep->clients, ep); @@ -275,13 +259,12 @@ nni_inproc_conn_finish(nni_aio *aio, int rv) nni_aio_finish(aio, rv, 0); } - static void nni_inproc_ep_close(void *arg) { nni_inproc_ep *ep = arg; nni_inproc_ep *client; - nni_aio *aio; + nni_aio * aio; nni_mtx_lock(&nni_inproc.mx); ep->closed = 1; @@ -301,7 +284,6 @@ nni_inproc_ep_close(void *arg) nni_mtx_unlock(&nni_inproc.mx); } - static void nni_inproc_connect_abort(nni_aio *aio) { @@ -315,7 +297,7 @@ nni_inproc_connect_abort(nni_aio *aio) } nni_aio_list_remove(aio); if (ep != NULL) { - if ((ep->mode != NNI_INPROC_EP_LISTEN) && + if ((ep->mode != NNI_EP_MODE_LISTEN) && nni_list_empty(&ep->aios)) { if (nni_list_active(&ep->clients, ep)) { nni_list_remove(&ep->clients, ep); @@ -325,14 +307,13 @@ nni_inproc_connect_abort(nni_aio *aio) nni_mtx_unlock(&nni_inproc.mx); } - static void nni_inproc_accept_clients(nni_inproc_ep *server) { - nni_inproc_ep *client, *nclient; - nni_aio *saio, *caio; + nni_inproc_ep * client, *nclient; + nni_aio * saio, *caio; nni_inproc_pair *pair; - int rv; + int rv; nclient = nni_list_first(&server->clients); while ((client = nclient) != NULL) { @@ -358,14 +339,14 @@ nni_inproc_accept_clients(nni_inproc_ep *server) continue; } - pair->pipes[0] = caio->a_pipe; - pair->pipes[1] = saio->a_pipe; + pair->pipes[0] = caio->a_pipe; + pair->pipes[1] = saio->a_pipe; pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0]; pair->pipes[1]->rq = pair->pipes[0]->wq = pair->q[1]; pair->pipes[0]->pair = pair->pipes[1]->pair = pair; pair->pipes[1]->peer = pair->pipes[0]->proto; pair->pipes[0]->peer = pair->pipes[1]->proto; - pair->refcnt = 2; + pair->refcnt = 2; nni_inproc_conn_finish(caio, 0); nni_inproc_conn_finish(saio, 0); @@ -381,14 +362,13 @@ nni_inproc_accept_clients(nni_inproc_ep *server) } } - static void nni_inproc_ep_connect(void *arg, nni_aio *aio) { - nni_inproc_ep *ep = arg; + nni_inproc_ep * ep = arg; nni_inproc_pipe *pipe; - nni_inproc_ep *server; - int rv; + nni_inproc_ep * server; + int rv; if (ep->mode != NNI_EP_MODE_DIAL) { nni_aio_finish(aio, NNG_EINVAL, 0); @@ -446,13 +426,12 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) nni_mtx_unlock(&nni_inproc.mx); } - static int nni_inproc_ep_bind(void *arg) { nni_inproc_ep *ep = arg; nni_inproc_ep *srch; - nni_list *list = &nni_inproc.servers; + nni_list * list = &nni_inproc.servers; if (ep->mode != NNI_EP_MODE_LISTEN) { return (NNG_EINVAL); @@ -467,8 +446,7 @@ nni_inproc_ep_bind(void *arg) return (NNG_ECLOSED); } NNI_LIST_FOREACH (list, srch) { - if ((srch->mode != NNI_EP_MODE_LISTEN) || - (!srch->started)) { + if ((srch->mode != NNI_EP_MODE_LISTEN) || (!srch->started)) { continue; } if (strcmp(srch->addr, ep->addr) == 0) { @@ -482,13 +460,12 @@ nni_inproc_ep_bind(void *arg) return (0); } - static void nni_inproc_ep_accept(void *arg, nni_aio *aio) { - nni_inproc_ep *ep = arg; + nni_inproc_ep * ep = arg; nni_inproc_pipe *pipe; - int rv; + int rv; if (ep->mode != NNI_EP_MODE_LISTEN) { nni_aio_finish(aio, NNG_EINVAL, 0); @@ -520,33 +497,32 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) nni_mtx_unlock(&nni_inproc.mx); } - static nni_tran_pipe nni_inproc_pipe_ops = { - .p_fini = nni_inproc_pipe_fini, - .p_send = nni_inproc_pipe_send, - .p_recv = nni_inproc_pipe_recv, - .p_close = nni_inproc_pipe_close, - .p_peer = nni_inproc_pipe_peer, - .p_getopt = nni_inproc_pipe_getopt, + .p_fini = nni_inproc_pipe_fini, + .p_send = nni_inproc_pipe_send, + .p_recv = nni_inproc_pipe_recv, + .p_close = nni_inproc_pipe_close, + .p_peer = nni_inproc_pipe_peer, + .p_getopt = nni_inproc_pipe_getopt, }; static nni_tran_ep nni_inproc_ep_ops = { - .ep_init = nni_inproc_ep_init, - .ep_fini = nni_inproc_ep_fini, - .ep_connect = nni_inproc_ep_connect, - .ep_bind = nni_inproc_ep_bind, - .ep_accept = nni_inproc_ep_accept, - .ep_close = nni_inproc_ep_close, - .ep_setopt = NULL, - .ep_getopt = NULL, + .ep_init = nni_inproc_ep_init, + .ep_fini = nni_inproc_ep_fini, + .ep_connect = nni_inproc_ep_connect, + .ep_bind = nni_inproc_ep_bind, + .ep_accept = nni_inproc_ep_accept, + .ep_close = nni_inproc_ep_close, + .ep_setopt = NULL, + .ep_getopt = NULL, }; // This is the inproc transport linkage, and should be the only global // symbol in this entire file. struct nni_tran nni_inproc_tran = { - .tran_scheme = "inproc", - .tran_ep = &nni_inproc_ep_ops, - .tran_pipe = &nni_inproc_pipe_ops, - .tran_init = nni_inproc_init, - .tran_fini = nni_inproc_fini, + .tran_scheme = "inproc", + .tran_ep = &nni_inproc_ep_ops, + .tran_pipe = &nni_inproc_pipe_ops, + .tran_init = nni_inproc_init, + .tran_fini = nni_inproc_fini, }; diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index fcfe44ea..56ce5ecd 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -7,9 +7,9 @@ // found online at https://opensource.org/licenses/MIT. // +#include <stdio.h> #include <stdlib.h> #include <string.h> -#include <stdio.h> #include "core/nng_impl.h" @@ -17,65 +17,61 @@ // supplied as well. Normally the IPC is UNIX domain sockets or // Windows named pipes. Other platforms could use other mechanisms. -typedef struct nni_ipc_pipe nni_ipc_pipe; -typedef struct nni_ipc_ep nni_ipc_ep; +typedef struct nni_ipc_pipe nni_ipc_pipe; +typedef struct nni_ipc_ep nni_ipc_ep; // nni_ipc_pipe is one end of an IPC connection. struct nni_ipc_pipe { - const char * addr; - nni_plat_ipc_pipe * ipp; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - - uint8_t txhead[1+sizeof (uint64_t)]; - uint8_t rxhead[1+sizeof (uint64_t)]; - int gottxhead; - int gotrxhead; - int wanttxhead; - int wantrxhead; - - nni_aio * user_txaio; - nni_aio * user_rxaio; - nni_aio * user_negaio; - nni_aio txaio; - nni_aio rxaio; - nni_aio negaio; - nni_msg * rxmsg; - nni_mtx mtx; + const char * addr; + nni_plat_ipc_pipe *ipp; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + + uint8_t txhead[1 + sizeof(uint64_t)]; + uint8_t rxhead[1 + sizeof(uint64_t)]; + int gottxhead; + int gotrxhead; + int wanttxhead; + int wantrxhead; + + nni_aio *user_txaio; + nni_aio *user_rxaio; + nni_aio *user_negaio; + nni_aio txaio; + nni_aio rxaio; + nni_aio negaio; + nni_msg *rxmsg; + nni_mtx mtx; }; struct nni_ipc_ep { - char addr[NNG_MAXADDRLEN+1]; - nni_plat_ipc_ep * iep; - int closed; - uint16_t proto; - size_t rcvmax; - nni_aio aio; - nni_aio * user_aio; - nni_mtx mtx; + char addr[NNG_MAXADDRLEN + 1]; + nni_plat_ipc_ep *iep; + int closed; + uint16_t proto; + size_t rcvmax; + nni_aio aio; + nni_aio * user_aio; + nni_mtx mtx; }; - static void nni_ipc_pipe_send_cb(void *); static void nni_ipc_pipe_recv_cb(void *); static void nni_ipc_pipe_nego_cb(void *); static void nni_ipc_ep_cb(void *); - static int nni_ipc_tran_init(void) { return (0); } - static void nni_ipc_tran_fini(void) { } - static void nni_ipc_pipe_close(void *arg) { @@ -84,7 +80,6 @@ nni_ipc_pipe_close(void *arg) nni_plat_ipc_pipe_close(pipe->ipp); } - static void nni_ipc_pipe_fini(void *arg) { @@ -103,12 +98,11 @@ nni_ipc_pipe_fini(void *arg) NNI_FREE_STRUCT(pipe); } - static int nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) { nni_ipc_pipe *pipe; - int rv; + int rv; if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { return (NNG_ENOMEM); @@ -129,10 +123,10 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) goto fail; } - pipe->proto = ep->proto; + pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; - pipe->ipp = ipp; - pipe->addr = ep->addr; + pipe->ipp = ipp; + pipe->addr = ep->addr; *pipep = pipe; return (0); @@ -142,7 +136,6 @@ fail: return (rv); } - static void nni_ipc_cancel_start(nni_aio *aio) { @@ -156,13 +149,12 @@ nni_ipc_cancel_start(nni_aio *aio) nni_mtx_unlock(&pipe->mtx); } - static void nni_ipc_pipe_nego_cb(void *arg) { nni_ipc_pipe *pipe = arg; - nni_aio *aio = &pipe->negaio; - int rv; + nni_aio * aio = &pipe->negaio; + int rv; nni_mtx_lock(&pipe->mtx); if ((rv = nni_aio_result(aio)) != 0) { @@ -177,7 +169,7 @@ nni_ipc_pipe_nego_cb(void *arg) } if (pipe->gottxhead < pipe->wanttxhead) { - aio->a_niov = 1; + aio->a_niov = 1; aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead; aio->a_iov[0].iov_buf = &pipe->txhead[pipe->gottxhead]; // send it down... @@ -186,7 +178,7 @@ nni_ipc_pipe_nego_cb(void *arg) return; } if (pipe->gotrxhead < pipe->wantrxhead) { - aio->a_niov = 1; + aio->a_niov = 1; aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead; aio->a_iov[0].iov_buf = &pipe->rxhead[pipe->gotrxhead]; nni_plat_ipc_pipe_recv(pipe->ipp, aio); @@ -195,12 +187,9 @@ nni_ipc_pipe_nego_cb(void *arg) } // We have both sent and received the headers. Lets check the // receive side header. - if ((pipe->rxhead[0] != 0) || - (pipe->rxhead[1] != 'S') || - (pipe->rxhead[2] != 'P') || - (pipe->rxhead[3] != 0) || - (pipe->rxhead[6] != 0) || - (pipe->rxhead[7] != 0)) { + if ((pipe->rxhead[0] != 0) || (pipe->rxhead[1] != 'S') || + (pipe->rxhead[2] != 'P') || (pipe->rxhead[3] != 0) || + (pipe->rxhead[6] != 0) || (pipe->rxhead[7] != 0)) { rv = NNG_EPROTO; goto done; } @@ -215,14 +204,13 @@ done: nni_mtx_unlock(&pipe->mtx); } - static void nni_ipc_pipe_send_cb(void *arg) { nni_ipc_pipe *pipe = arg; - nni_aio *aio; - int rv; - size_t len; + nni_aio * aio; + int rv; + size_t len; nni_mtx_lock(&pipe->mtx); if ((aio = pipe->user_txaio) == NULL) { @@ -241,13 +229,12 @@ nni_ipc_pipe_send_cb(void *arg) nni_mtx_unlock(&pipe->mtx); } - static void nni_ipc_pipe_recv_cb(void *arg) { nni_ipc_pipe *pipe = arg; - nni_aio *aio; - int rv; + nni_aio * aio; + int rv; nni_mtx_lock(&pipe->mtx); aio = pipe->user_rxaio; @@ -285,7 +272,7 @@ nni_ipc_pipe_recv_cb(void *arg) } // We should have gotten a message header. - NNI_GET64(pipe->rxhead+1, len); + NNI_GET64(pipe->rxhead + 1, len); // Make sure the message payload is not too big. If it is // the caller will shut down the pipe. @@ -312,7 +299,7 @@ nni_ipc_pipe_recv_cb(void *arg) // read the entire message now. pipe->rxaio.a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); - pipe->rxaio.a_niov = 1; + pipe->rxaio.a_niov = 1; nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); @@ -322,13 +309,12 @@ nni_ipc_pipe_recv_cb(void *arg) // Otherwise we got a message read completely. Let the user know the // good news. pipe->user_rxaio = NULL; - aio->a_msg = pipe->rxmsg; - pipe->rxmsg = NULL; + aio->a_msg = pipe->rxmsg; + pipe->rxmsg = NULL; nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg)); nni_mtx_unlock(&pipe->mtx); } - static void nni_ipc_cancel_tx(nni_aio *aio) { @@ -342,13 +328,12 @@ nni_ipc_cancel_tx(nni_aio *aio) nni_aio_stop(&pipe->txaio); } - static void nni_ipc_pipe_send(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; - nni_msg *msg = aio->a_msg; - uint64_t len; + nni_msg * msg = aio->a_msg; + uint64_t len; len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -360,22 +345,21 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) pipe->user_txaio = aio; - pipe->txhead[0] = 1; // message type, 1. + pipe->txhead[0] = 1; // message type, 1. NNI_PUT64(pipe->txhead + 1, len); pipe->txaio.a_iov[0].iov_buf = pipe->txhead; - pipe->txaio.a_iov[0].iov_len = sizeof (pipe->txhead); + pipe->txaio.a_iov[0].iov_len = sizeof(pipe->txhead); pipe->txaio.a_iov[1].iov_buf = nni_msg_header(msg); pipe->txaio.a_iov[1].iov_len = nni_msg_header_len(msg); pipe->txaio.a_iov[2].iov_buf = nni_msg_body(msg); pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); - pipe->txaio.a_niov = 3; + pipe->txaio.a_niov = 3; nni_plat_ipc_pipe_send(pipe->ipp, &pipe->txaio); nni_mtx_unlock(&pipe->mtx); } - static void nni_ipc_cancel_rx(nni_aio *aio) { @@ -389,7 +373,6 @@ nni_ipc_cancel_rx(nni_aio *aio) nni_aio_stop(&pipe->rxaio); } - static void nni_ipc_pipe_recv(void *arg, nni_aio *aio) { @@ -407,19 +390,18 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) // Schedule a read of the IPC header. pipe->rxaio.a_iov[0].iov_buf = pipe->rxhead; - pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxhead); - pipe->rxaio.a_niov = 1; + pipe->rxaio.a_iov[0].iov_len = sizeof(pipe->rxhead); + pipe->rxaio.a_niov = 1; nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); } - static void nni_ipc_pipe_start(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; - int rv; + int rv; nni_mtx_lock(&pipe->mtx); pipe->txhead[0] = 0; @@ -429,12 +411,12 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) NNI_PUT16(&pipe->txhead[4], pipe->proto); NNI_PUT16(&pipe->txhead[6], 0); - pipe->user_negaio = aio; - pipe->gotrxhead = 0; - pipe->gottxhead = 0; - pipe->wantrxhead = 8; - pipe->wanttxhead = 8; - pipe->negaio.a_niov = 1; + pipe->user_negaio = aio; + pipe->gotrxhead = 0; + pipe->gottxhead = 0; + pipe->wantrxhead = 8; + pipe->wanttxhead = 8; + pipe->negaio.a_niov = 1; pipe->negaio.a_iov[0].iov_len = 8; pipe->negaio.a_iov[0].iov_buf = &pipe->txhead[0]; rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe); @@ -446,7 +428,6 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) nni_mtx_unlock(&pipe->mtx); } - static uint16_t nni_ipc_pipe_peer(void *arg) { @@ -455,7 +436,6 @@ nni_ipc_pipe_peer(void *arg) return (pipe->peer); } - static int nni_ipc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) { @@ -479,7 +459,6 @@ nni_ipc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) return (NNG_ENOTSUP); } - static void nni_ipc_ep_fini(void *arg) { @@ -491,14 +470,13 @@ nni_ipc_ep_fini(void *arg) NNI_FREE_STRUCT(ep); } - static int nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock, int mode) { nni_ipc_ep *ep; - int rv; + int rv; - if ((strlen(url) > NNG_MAXADDRLEN-1) || + if ((strlen(url) > NNG_MAXADDRLEN - 1) || (strncmp(url, "ipc://", strlen("ipc://")) != 0)) { return (NNG_EADDRINVAL); } @@ -513,15 +491,14 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock, int mode) return (rv); } ep->closed = 0; - ep->proto = nni_sock_proto(sock); + ep->proto = nni_sock_proto(sock); ep->rcvmax = nni_sock_rcvmaxsz(sock); - (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); + (void) snprintf(ep->addr, sizeof(ep->addr), "%s", url); *epp = ep; return (0); } - static void nni_ipc_ep_close(void *arg) { @@ -530,7 +507,6 @@ nni_ipc_ep_close(void *arg) nni_plat_ipc_ep_close(ep->iep); } - static int nni_ipc_ep_bind(void *arg) { @@ -539,13 +515,12 @@ nni_ipc_ep_bind(void *arg) return (nni_plat_ipc_ep_listen(ep->iep)); } - static void nni_ipc_ep_finish(nni_ipc_ep *ep) { - nni_aio *aio = ep->user_aio; + nni_aio * aio = ep->user_aio; nni_ipc_pipe *pipe; - int rv; + int rv; if ((aio = ep->user_aio) == NULL) { return; @@ -570,7 +545,6 @@ done: nni_aio_finish(aio, rv, 0); } - static void nni_ipc_ep_cb(void *arg) { @@ -581,7 +555,6 @@ nni_ipc_ep_cb(void *arg) nni_mtx_unlock(&ep->mtx); } - static void nni_ipc_cancel_ep(nni_aio *aio) { @@ -595,12 +568,11 @@ nni_ipc_cancel_ep(nni_aio *aio) nni_mtx_unlock(&ep->mtx); } - static void nni_ipc_ep_accept(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; - int rv; + int rv; nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); @@ -617,12 +589,11 @@ nni_ipc_ep_accept(void *arg, nni_aio *aio) nni_mtx_unlock(&ep->mtx); } - static void nni_ipc_ep_connect(void *arg, nni_aio *aio) { nni_ipc_ep *ep = arg; - int rv; + int rv; nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); @@ -639,34 +610,33 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio) nni_mtx_unlock(&ep->mtx); } - static nni_tran_pipe nni_ipc_pipe_ops = { - .p_fini = nni_ipc_pipe_fini, - .p_start = nni_ipc_pipe_start, - .p_send = nni_ipc_pipe_send, - .p_recv = nni_ipc_pipe_recv, - .p_close = nni_ipc_pipe_close, - .p_peer = nni_ipc_pipe_peer, - .p_getopt = nni_ipc_pipe_getopt, + .p_fini = nni_ipc_pipe_fini, + .p_start = nni_ipc_pipe_start, + .p_send = nni_ipc_pipe_send, + .p_recv = nni_ipc_pipe_recv, + .p_close = nni_ipc_pipe_close, + .p_peer = nni_ipc_pipe_peer, + .p_getopt = nni_ipc_pipe_getopt, }; static nni_tran_ep nni_ipc_ep_ops = { - .ep_init = nni_ipc_ep_init, - .ep_fini = nni_ipc_ep_fini, - .ep_connect = nni_ipc_ep_connect, - .ep_bind = nni_ipc_ep_bind, - .ep_accept = nni_ipc_ep_accept, - .ep_close = nni_ipc_ep_close, - .ep_setopt = NULL, - .ep_getopt = NULL, + .ep_init = nni_ipc_ep_init, + .ep_fini = nni_ipc_ep_fini, + .ep_connect = nni_ipc_ep_connect, + .ep_bind = nni_ipc_ep_bind, + .ep_accept = nni_ipc_ep_accept, + .ep_close = nni_ipc_ep_close, + .ep_setopt = NULL, + .ep_getopt = NULL, }; // This is the IPC transport linkage, and should be the only global // symbol in this entire file. struct nni_tran nni_ipc_tran = { - .tran_scheme = "ipc", - .tran_ep = &nni_ipc_ep_ops, - .tran_pipe = &nni_ipc_pipe_ops, - .tran_init = nni_ipc_tran_init, - .tran_fini = nni_ipc_tran_fini, + .tran_scheme = "ipc", + .tran_ep = &nni_ipc_ep_ops, + .tran_pipe = &nni_ipc_pipe_ops, + .tran_init = nni_ipc_tran_init, + .tran_fini = nni_ipc_tran_fini, }; diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 4b0f77f1..e60a1b1d 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -7,56 +7,55 @@ // found online at https://opensource.org/licenses/MIT. // +#include <stdio.h> #include <stdlib.h> #include <string.h> -#include <stdio.h> #include "core/nng_impl.h" // TCP transport. Platform specific TCP operations must be // supplied as well. -typedef struct nni_tcp_pipe nni_tcp_pipe; -typedef struct nni_tcp_ep nni_tcp_ep; +typedef struct nni_tcp_pipe nni_tcp_pipe; +typedef struct nni_tcp_ep nni_tcp_ep; // nni_tcp_pipe is one end of a TCP connection. struct nni_tcp_pipe { - const char * addr; - nni_plat_tcp_pipe * tpp; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - - nni_aio * user_txaio; - nni_aio * user_rxaio; - nni_aio * user_negaio; - - uint8_t txlen[sizeof (uint64_t)]; - uint8_t rxlen[sizeof (uint64_t)]; - int gottxhead; - int gotrxhead; - int wanttxhead; - int wantrxhead; - nni_aio txaio; - nni_aio rxaio; - nni_aio negaio; - nni_msg * rxmsg; - nni_mtx mtx; + const char * addr; + nni_plat_tcp_pipe *tpp; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + + nni_aio *user_txaio; + nni_aio *user_rxaio; + nni_aio *user_negaio; + + uint8_t txlen[sizeof(uint64_t)]; + uint8_t rxlen[sizeof(uint64_t)]; + int gottxhead; + int gotrxhead; + int wanttxhead; + int wantrxhead; + nni_aio txaio; + nni_aio rxaio; + nni_aio negaio; + nni_msg *rxmsg; + nni_mtx mtx; }; struct nni_tcp_ep { - char addr[NNG_MAXADDRLEN+1]; - nni_plat_tcp_ep * tep; - int closed; - uint16_t proto; - size_t rcvmax; - int ipv4only; - nni_aio aio; - nni_aio * user_aio; - nni_mtx mtx; + char addr[NNG_MAXADDRLEN + 1]; + nni_plat_tcp_ep *tep; + int closed; + uint16_t proto; + size_t rcvmax; + int ipv4only; + nni_aio aio; + nni_aio * user_aio; + nni_mtx mtx; }; - static void nni_tcp_pipe_send_cb(void *); static void nni_tcp_pipe_recv_cb(void *); static void nni_tcp_pipe_nego_cb(void *); @@ -68,13 +67,11 @@ nni_tcp_tran_init(void) return (0); } - static void nni_tcp_tran_fini(void) { } - static void nni_tcp_pipe_close(void *arg) { @@ -83,7 +80,6 @@ nni_tcp_pipe_close(void *arg) nni_plat_tcp_pipe_close(pipe->tpp); } - static void nni_tcp_pipe_fini(void *arg) { @@ -102,12 +98,11 @@ nni_tcp_pipe_fini(void *arg) NNI_FREE_STRUCT(pipe); } - static int nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp) { nni_tcp_pipe *pipe; - int rv; + int rv; if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { return (NNG_ENOMEM); @@ -127,10 +122,10 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp) if (rv != 0) { goto fail; } - pipe->proto = ep->proto; + pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; - pipe->tpp = tpp; - pipe->addr = ep->addr; + pipe->tpp = tpp; + pipe->addr = ep->addr; *pipep = pipe; return (0); @@ -140,7 +135,6 @@ fail: return (rv); } - static void nni_tcp_cancel_nego(nni_aio *aio) { @@ -154,13 +148,12 @@ nni_tcp_cancel_nego(nni_aio *aio) nni_mtx_unlock(&pipe->mtx); } - static void nni_tcp_pipe_nego_cb(void *arg) { nni_tcp_pipe *pipe = arg; - nni_aio *aio = &pipe->negaio; - int rv; + nni_aio * aio = &pipe->negaio; + int rv; nni_mtx_lock(&pipe->mtx); if ((rv = nni_aio_result(aio)) != 0) { @@ -175,7 +168,7 @@ nni_tcp_pipe_nego_cb(void *arg) } if (pipe->gottxhead < pipe->wanttxhead) { - aio->a_niov = 1; + aio->a_niov = 1; aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead; aio->a_iov[0].iov_buf = &pipe->txlen[pipe->gottxhead]; // send it down... @@ -184,7 +177,7 @@ nni_tcp_pipe_nego_cb(void *arg) return; } if (pipe->gotrxhead < pipe->wantrxhead) { - aio->a_niov = 1; + aio->a_niov = 1; aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead; aio->a_iov[0].iov_buf = &pipe->rxlen[pipe->gotrxhead]; nni_plat_tcp_pipe_recv(pipe->tpp, aio); @@ -193,12 +186,9 @@ nni_tcp_pipe_nego_cb(void *arg) } // We have both sent and received the headers. Lets check the // receive side header. - if ((pipe->rxlen[0] != 0) || - (pipe->rxlen[1] != 'S') || - (pipe->rxlen[2] != 'P') || - (pipe->rxlen[3] != 0) || - (pipe->rxlen[6] != 0) || - (pipe->rxlen[7] != 0)) { + if ((pipe->rxlen[0] != 0) || (pipe->rxlen[1] != 'S') || + (pipe->rxlen[2] != 'P') || (pipe->rxlen[3] != 0) || + (pipe->rxlen[6] != 0) || (pipe->rxlen[7] != 0)) { rv = NNG_EPROTO; goto done; } @@ -213,14 +203,13 @@ done: nni_mtx_unlock(&pipe->mtx); } - static void nni_tcp_pipe_send_cb(void *arg) { nni_tcp_pipe *pipe = arg; - int rv; - nni_aio *aio; - size_t len; + int rv; + nni_aio * aio; + size_t len; nni_mtx_lock(&pipe->mtx); if ((aio = pipe->user_txaio) == NULL) { @@ -240,13 +229,12 @@ nni_tcp_pipe_send_cb(void *arg) nni_mtx_unlock(&pipe->mtx); } - static void nni_tcp_pipe_recv_cb(void *arg) { nni_tcp_pipe *pipe = arg; - nni_aio *aio; - int rv; + nni_aio * aio; + int rv; nni_mtx_lock(&pipe->mtx); @@ -298,7 +286,7 @@ nni_tcp_pipe_recv_cb(void *arg) // read the entire message now. pipe->rxaio.a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); - pipe->rxaio.a_niov = 1; + pipe->rxaio.a_niov = 1; nni_plat_tcp_pipe_recv(pipe->tpp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); @@ -308,13 +296,12 @@ nni_tcp_pipe_recv_cb(void *arg) // Otherwise we got a message read completely. Let the user know the // good news. pipe->user_rxaio = NULL; - aio->a_msg = pipe->rxmsg; - pipe->rxmsg = NULL; + aio->a_msg = pipe->rxmsg; + pipe->rxmsg = NULL; nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg)); nni_mtx_unlock(&pipe->mtx); } - static void nni_tcp_cancel_tx(nni_aio *aio) { @@ -328,13 +315,12 @@ nni_tcp_cancel_tx(nni_aio *aio) nni_aio_stop(&pipe->txaio); } - static void nni_tcp_pipe_send(void *arg, nni_aio *aio) { nni_tcp_pipe *pipe = arg; - nni_msg *msg = aio->a_msg; - uint64_t len; + nni_msg * msg = aio->a_msg; + uint64_t len; len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -350,18 +336,17 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) NNI_PUT64(pipe->txlen, len); pipe->txaio.a_iov[0].iov_buf = pipe->txlen; - pipe->txaio.a_iov[0].iov_len = sizeof (pipe->txlen); + pipe->txaio.a_iov[0].iov_len = sizeof(pipe->txlen); pipe->txaio.a_iov[1].iov_buf = nni_msg_header(msg); pipe->txaio.a_iov[1].iov_len = nni_msg_header_len(msg); pipe->txaio.a_iov[2].iov_buf = nni_msg_body(msg); pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); - pipe->txaio.a_niov = 3; + pipe->txaio.a_niov = 3; nni_plat_tcp_pipe_send(pipe->tpp, &pipe->txaio); nni_mtx_unlock(&pipe->mtx); } - static void nni_tcp_cancel_rx(nni_aio *aio) { @@ -375,7 +360,6 @@ nni_tcp_cancel_rx(nni_aio *aio) nni_aio_stop(&pipe->rxaio); } - static void nni_tcp_pipe_recv(void *arg, nni_aio *aio) { @@ -393,14 +377,13 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio) // Schedule a read of the TCP header. pipe->rxaio.a_iov[0].iov_buf = pipe->rxlen; - pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxlen); - pipe->rxaio.a_niov = 1; + pipe->rxaio.a_iov[0].iov_len = sizeof(pipe->rxlen); + pipe->rxaio.a_niov = 1; nni_plat_tcp_pipe_recv(pipe->tpp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); } - static uint16_t nni_tcp_pipe_peer(void *arg) { @@ -409,7 +392,6 @@ nni_tcp_pipe_peer(void *arg) return (pipe->peer); } - static int nni_tcp_pipe_getopt(void *arg, int option, void *buf, size_t *szp) { @@ -433,14 +415,13 @@ nni_tcp_pipe_getopt(void *arg, int option, void *buf, size_t *szp) return (NNG_ENOTSUP); } - static int nni_tcp_parse_pair(char *pair, char **hostp, char **servp) { char *host, *serv, *end; if (pair[0] == '[') { - host = pair+1; + host = pair + 1; // IP address enclosed ... for IPv6 usually. if ((end = strchr(host, ']')) == NULL) { return (NNG_EADDRINVAL); @@ -478,14 +459,13 @@ nni_tcp_parse_pair(char *pair, char **hostp, char **servp) return (0); } - // Note that the url *must* be in a modifiable buffer. int -nni_tcp_parse_url(char *url, char **host1, char **serv1, char **host2, - char **serv2) +nni_tcp_parse_url( + char *url, char **host1, char **serv1, char **host2, char **serv2) { char *h1; - int rv; + int rv; if (strncmp(url, "tcp://", strlen("tcp://")) != 0) { return (NNG_EADDRINVAL); @@ -516,7 +496,6 @@ nni_tcp_parse_url(char *url, char **host1, char **serv1, char **host2, return (0); } - static void nni_tcp_pipe_start(void *arg, nni_aio *aio) { @@ -530,12 +509,12 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) NNI_PUT16(&pipe->txlen[4], pipe->proto); NNI_PUT16(&pipe->txlen[6], 0); - pipe->user_negaio = aio; - pipe->gotrxhead = 0; - pipe->gottxhead = 0; - pipe->wantrxhead = 8; - pipe->wanttxhead = 8; - pipe->negaio.a_niov = 1; + pipe->user_negaio = aio; + pipe->gotrxhead = 0; + pipe->gottxhead = 0; + pipe->wantrxhead = 8; + pipe->wanttxhead = 8; + pipe->negaio.a_niov = 1; pipe->negaio.a_iov[0].iov_len = 8; pipe->negaio.a_iov[0].iov_buf = &pipe->txlen[0]; if (nni_aio_start(aio, nni_tcp_cancel_nego, pipe) != 0) { @@ -546,7 +525,6 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) nni_mtx_unlock(&pipe->mtx); } - static void nni_tcp_ep_fini(void *arg) { @@ -560,12 +538,11 @@ nni_tcp_ep_fini(void *arg) NNI_FREE_STRUCT(ep); } - static int nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode) { nni_tcp_ep *ep; - int rv; + int rv; if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); @@ -577,15 +554,14 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode) return (rv); } ep->closed = 0; - ep->proto = nni_sock_proto(sock); + ep->proto = nni_sock_proto(sock); ep->rcvmax = nni_sock_rcvmaxsz(sock); - (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); + (void) snprintf(ep->addr, sizeof(ep->addr), "%s", url); *epp = ep; return (0); } - static void nni_tcp_ep_close(void *arg) { @@ -594,7 +570,6 @@ nni_tcp_ep_close(void *arg) nni_plat_tcp_ep_close(ep->tep); } - static int nni_tcp_ep_bind(void *arg) { @@ -603,13 +578,12 @@ nni_tcp_ep_bind(void *arg) return (nni_plat_tcp_ep_listen(ep->tep)); } - static void nni_tcp_ep_finish(nni_tcp_ep *ep) { - nni_aio *aio = ep->user_aio; + nni_aio * aio = ep->user_aio; nni_tcp_pipe *pipe; - int rv; + int rv; if ((aio = ep->user_aio) == NULL) { return; @@ -634,7 +608,6 @@ done: nni_aio_finish(aio, rv, 0); } - static void nni_tcp_ep_cb(void *arg) { @@ -645,7 +618,6 @@ nni_tcp_ep_cb(void *arg) nni_mtx_unlock(&ep->mtx); } - static void nni_tcp_cancel_ep(nni_aio *aio) { @@ -659,12 +631,11 @@ nni_tcp_cancel_ep(nni_aio *aio) nni_mtx_unlock(&ep->mtx); } - static void nni_tcp_ep_accept(void *arg, nni_aio *aio) { nni_tcp_ep *ep = arg; - int rv; + int rv; nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); @@ -681,12 +652,11 @@ nni_tcp_ep_accept(void *arg, nni_aio *aio) nni_mtx_unlock(&ep->mtx); } - static void nni_tcp_ep_connect(void *arg, nni_aio *aio) { nni_tcp_ep *ep = arg; - int rv; + int rv; nni_mtx_lock(&ep->mtx); NNI_ASSERT(ep->user_aio == NULL); @@ -703,34 +673,33 @@ nni_tcp_ep_connect(void *arg, nni_aio *aio) nni_mtx_unlock(&ep->mtx); } - static nni_tran_pipe nni_tcp_pipe_ops = { - .p_fini = nni_tcp_pipe_fini, - .p_start = nni_tcp_pipe_start, - .p_send = nni_tcp_pipe_send, - .p_recv = nni_tcp_pipe_recv, - .p_close = nni_tcp_pipe_close, - .p_peer = nni_tcp_pipe_peer, - .p_getopt = nni_tcp_pipe_getopt, + .p_fini = nni_tcp_pipe_fini, + .p_start = nni_tcp_pipe_start, + .p_send = nni_tcp_pipe_send, + .p_recv = nni_tcp_pipe_recv, + .p_close = nni_tcp_pipe_close, + .p_peer = nni_tcp_pipe_peer, + .p_getopt = nni_tcp_pipe_getopt, }; static nni_tran_ep nni_tcp_ep_ops = { - .ep_init = nni_tcp_ep_init, - .ep_fini = nni_tcp_ep_fini, - .ep_connect = nni_tcp_ep_connect, - .ep_bind = nni_tcp_ep_bind, - .ep_accept = nni_tcp_ep_accept, - .ep_close = nni_tcp_ep_close, - .ep_setopt = NULL, - .ep_getopt = NULL, + .ep_init = nni_tcp_ep_init, + .ep_fini = nni_tcp_ep_fini, + .ep_connect = nni_tcp_ep_connect, + .ep_bind = nni_tcp_ep_bind, + .ep_accept = nni_tcp_ep_accept, + .ep_close = nni_tcp_ep_close, + .ep_setopt = NULL, + .ep_getopt = NULL, }; // This is the TCP transport linkage, and should be the only global // symbol in this entire file. struct nni_tran nni_tcp_tran = { - .tran_scheme = "tcp", - .tran_ep = &nni_tcp_ep_ops, - .tran_pipe = &nni_tcp_pipe_ops, - .tran_init = nni_tcp_tran_init, - .tran_fini = nni_tcp_tran_fini, + .tran_scheme = "tcp", + .tran_ep = &nni_tcp_ep_ops, + .tran_pipe = &nni_tcp_pipe_ops, + .tran_init = nni_tcp_tran_init, + .tran_fini = nni_tcp_tran_fini, }; |
