diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/platform.h | 16 | ||||
| -rw-r--r-- | src/platform/posix/posix_aiothr.c | 16 | ||||
| -rw-r--r-- | src/platform/posix/posix_impl.h | 9 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipc.c | 78 | ||||
| -rw-r--r-- | src/platform/posix/posix_net.c | 7 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 248 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 3 |
7 files changed, 265 insertions, 112 deletions
diff --git a/src/core/platform.h b/src/core/platform.h index e6eac8e3..08fcdb1c 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -216,7 +216,7 @@ extern int nni_plat_tcp_aio_recv(nni_plat_tcpsock *, nni_aio *); // nni_plat_ipc_init initializes the socket, for example it can // set underlying file descriptors to -1, etc. -extern int nni_plat_ipc_init(nni_plat_ipcsock *); +extern int nni_plat_ipc_init(nni_plat_ipcsock **); // nni_plat_ipc_fini just closes an IPC socket, and releases any related // resources. @@ -241,15 +241,15 @@ extern int nni_plat_ipc_accept(nni_plat_ipcsock *, nni_plat_ipcsock *); // nni_plat_ipc_connect is the client side. extern int nni_plat_ipc_connect(nni_plat_ipcsock *, const char *); -// nni_plat_ipc_send sends data to the peer. The platform is responsible +// nni_plat_ipc_aio_send sends data to the peer. The platform is responsible // for attempting to send all of the data. The iov count will never be // larger than 4. The platform may modify the iovs. -extern int nni_plat_ipc_send(nni_plat_ipcsock *, nni_iov *, int); +extern int nni_plat_ipc_aio_send(nni_plat_ipcsock *, nni_aio *); -// nni_plat_ipc_recv recvs data into the buffers provided by the +// nni_plat_ipc_aio_recv recvs data into the buffers provided by the // iovs. The implementation does not return until the iovs are completely // full, or an error condition occurs. -extern int nni_plat_ipc_recv(nni_plat_ipcsock *, nni_iov *, int); +extern int nni_plat_ipc_aio_recv(nni_plat_ipcsock *, nni_aio *); // nni_plat_seed_prng seeds the PRNG subsystem. The specified number // of bytes of entropy should be stashed. When possible, cryptographic @@ -281,6 +281,12 @@ extern void nni_plat_pipe_clear(int); // routine. extern void nni_plat_pipe_close(int, int); +// XXX: Stuff to REMOVE +extern int nni_plat_tcp_send(nni_plat_tcpsock *, nni_iov *, int); +extern int nni_plat_tcp_recv(nni_plat_tcpsock *, nni_iov *, int); +extern int nni_plat_ipc_send(nni_plat_ipcsock *, nni_iov *, int); +extern int nni_plat_ipc_recv(nni_plat_ipcsock *, nni_iov *, int); + // Actual platforms we support. This is included up front so that we can // get the specific types that are supplied by the platform. #if defined(PLATFORM_POSIX) diff --git a/src/platform/posix/posix_aiothr.c b/src/platform/posix/posix_aiothr.c index 2c11dcb2..a01fa194 100644 --- a/src/platform/posix/posix_aiothr.c +++ b/src/platform/posix/posix_aiothr.c @@ -239,14 +239,16 @@ nni_posix_aioq_start(nni_posix_aioq *q) static void nni_posix_aioq_fini(nni_posix_aioq *q) { - nni_mtx_lock(&q->aq_lk); - q->aq_fd = -1; - nni_cv_wake(&q->aq_cv); - nni_mtx_unlock(&q->aq_lk); + if (q->aq_fd > 0) { + nni_mtx_lock(&q->aq_lk); + q->aq_fd = -1; + nni_cv_wake(&q->aq_cv); + nni_mtx_unlock(&q->aq_lk); - nni_thr_fini(&q->aq_thr); - nni_cv_fini(&q->aq_cv); - nni_mtx_fini(&q->aq_lk); + nni_thr_fini(&q->aq_thr); + nni_cv_fini(&q->aq_cv); + nni_mtx_fini(&q->aq_lk); + } } diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 5da18323..dea09fa1 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -35,15 +35,6 @@ extern int nni_plat_errno(int); #endif - -#ifdef PLATFORM_POSIX_IPC -struct nni_plat_ipcsock { - int fd; - int devnull; // used for shutting down blocking accept() - char * unlink; // path to unlink at termination -}; -#endif - // Define types that this platform uses. #ifdef PLATFORM_POSIX_THREAD diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c index 7a9c6d52..e75edeca 100644 --- a/src/platform/posix/posix_ipc.c +++ b/src/platform/posix/posix_ipc.c @@ -10,6 +10,7 @@ #include "core/nng_impl.h" #ifdef PLATFORM_POSIX_IPC +#include "platform/posix/posix_aio.h" #include <errno.h> #include <stdlib.h> @@ -29,6 +30,13 @@ #undef sun #endif +struct nni_plat_ipcsock { + int fd; + int devnull; // for shutting down accept() + char * unlink; // path to unlink at fini + nni_posix_aio_pipe aiop; +}; + #ifdef SOCK_CLOEXEC #define NNI_IPC_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC) #else @@ -59,6 +67,20 @@ nni_plat_ipc_path_to_sockaddr(struct sockaddr_un *sun, const char *path) int +nni_plat_ipc_aio_send(nni_plat_ipcsock *isp, nni_aio *aio) +{ + return (nni_posix_aio_write(&isp->aiop, aio)); +} + + +int +nni_plat_ipc_aio_recv(nni_plat_ipcsock *isp, nni_aio *aio) +{ + return (nni_posix_aio_read(&isp->aiop, aio)); +} + + +int nni_plat_ipc_send(nni_plat_ipcsock *s, nni_iov *iovs, int cnt) { struct iovec iov[4]; // We never have more than 3 at present @@ -178,36 +200,46 @@ nni_plat_ipc_setopts(int fd) int -nni_plat_ipc_init(nni_plat_ipcsock *s) +nni_plat_ipc_init(nni_plat_ipcsock **ispp) { - s->fd = -1; + nni_plat_ipcsock *isp; + + if ((isp = NNI_ALLOC_STRUCT(isp)) == NULL) { + return (NNG_ENOMEM); + } + isp->fd = -1; + *ispp = isp; return (0); } void -nni_plat_ipc_fini(nni_plat_ipcsock *s) +nni_plat_ipc_fini(nni_plat_ipcsock *isp) { - if (s->fd != -1) { - (void) close(s->fd); - s->fd = -1; + if (isp->fd != -1) { + (void) close(isp->fd); + isp->fd = -1; } - if (s->unlink != NULL) { - (void) unlink(s->unlink); - nni_free(s->unlink, strlen(s->unlink) + 1); + if (isp->unlink != NULL) { + (void) unlink(isp->unlink); + nni_free(isp->unlink, strlen(isp->unlink) + 1); } + + nni_posix_aio_pipe_fini(&isp->aiop); + + NNI_FREE_STRUCT(isp); } void -nni_plat_ipc_shutdown(nni_plat_ipcsock *s) +nni_plat_ipc_shutdown(nni_plat_ipcsock *isp) { - if (s->fd != -1) { - (void) shutdown(s->fd, SHUT_RDWR); + if (isp->fd != -1) { + (void) shutdown(isp->fd, SHUT_RDWR); // This causes the equivalent of a close. Hopefully waking // up anything that didn't get the hint with the shutdown. // (macOS does not see the shtudown). - (void) dup2(nni_plat_devnull, s->fd); + (void) dup2(nni_plat_devnull, isp->fd); } } @@ -278,7 +310,7 @@ nni_plat_ipc_listen(nni_plat_ipcsock *s, const char *path) int -nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path) +nni_plat_ipc_connect(nni_plat_ipcsock *isp, const char *path) { int fd; int len; @@ -305,15 +337,22 @@ nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path) } return (rv); } - s->fd = fd; + + if ((rv = nni_posix_aio_pipe_init(&isp->aiop, fd)) != 0) { + (void) close(fd); + return (rv); + } + + isp->fd = fd; return (0); } int -nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server) +nni_plat_ipc_accept(nni_plat_ipcsock *isp, nni_plat_ipcsock *server) { int fd; + int rv; for (;;) { #ifdef NNG_USE_ACCEPT4 @@ -341,7 +380,12 @@ nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server) nni_plat_ipc_setopts(fd); - s->fd = fd; + if ((rv = nni_posix_aio_pipe_init(&isp->aiop, fd)) != 0) { + (void) close(fd); + return (rv); + } + + isp->fd = fd; return (0); } diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c index c7651655..94dc2667 100644 --- a/src/platform/posix/posix_net.c +++ b/src/platform/posix/posix_net.c @@ -8,9 +8,9 @@ // #include "core/nng_impl.h" -#include "platform/posix/posix_aio.h" #ifdef PLATFORM_POSIX_NET +#include "platform/posix/posix_aio.h" #include <errno.h> #include <stdlib.h> @@ -31,13 +31,11 @@ #define NNI_TCP_SOCKTYPE SOCK_STREAM #endif -#ifdef PLATFORM_POSIX_NET struct nni_plat_tcpsock { int fd; - int devnull; // used for shutting down blocking accept() + int devnull; // for shutting down accept() nni_posix_aio_pipe aiop; }; -#endif static int nni_plat_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa) @@ -284,6 +282,7 @@ nni_plat_tcp_fini(nni_plat_tcpsock *tsp) (void) close(tsp->fd); tsp->fd = -1; } + nni_posix_aio_pipe_fini(&tsp->aiop); NNI_FREE_STRUCT(tsp); } diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 46003487..7ffea62d 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -23,20 +23,33 @@ typedef struct nni_ipc_ep nni_ipc_ep; // nni_ipc_pipe is one end of an IPC connection. struct nni_ipc_pipe { const char * addr; - nni_plat_ipcsock fd; + nni_plat_ipcsock * isp; uint16_t peer; uint16_t proto; size_t rcvmax; + + uint8_t txhead[1+sizeof (uint64_t)]; + uint8_t rxhead[1+sizeof (uint64_t)]; + + nni_aio * user_txaio; + nni_aio * user_rxaio; + nni_aio txaio; + nni_aio rxaio; + nni_msg * rxmsg; }; struct nni_ipc_ep { char addr[NNG_MAXADDRLEN+1]; - nni_plat_ipcsock fd; + nni_plat_ipcsock * isp; int closed; uint16_t proto; size_t rcvmax; }; + +static void nni_ipc_pipe_send_cb(void *); +static void nni_ipc_pipe_recv_cb(void *); + static int nni_ipc_tran_init(void) { @@ -55,7 +68,7 @@ nni_ipc_pipe_close(void *arg) { nni_ipc_pipe *pipe = arg; - nni_plat_ipc_shutdown(&pipe->fd); + nni_plat_ipc_shutdown(pipe->isp); } @@ -68,7 +81,21 @@ nni_ipc_pipe_init(void **argp) if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_plat_ipc_init(&pipe->fd)) != 0) { + if ((rv = nni_plat_ipc_init(&pipe->isp)) != 0) { + NNI_FREE_STRUCT(pipe); + return (rv); + } + rv = nni_aio_init(&pipe->txaio, nni_ipc_pipe_send_cb, pipe); + if (rv != 0) { + nni_plat_ipc_fini(pipe->isp); + NNI_FREE_STRUCT(pipe); + return (rv); + } + + rv = nni_aio_init(&pipe->rxaio, nni_ipc_pipe_recv_cb, pipe); + if (rv != 0) { + nni_aio_fini(&pipe->txaio); + nni_plat_ipc_fini(pipe->isp); NNI_FREE_STRUCT(pipe); return (rv); } @@ -82,81 +109,164 @@ nni_ipc_pipe_fini(void *arg) { nni_ipc_pipe *pipe = arg; - nni_plat_ipc_fini(&pipe->fd); + if (pipe->rxmsg) { + nni_msg_free(pipe->rxmsg); + } + nni_aio_fini(&pipe->rxaio); + nni_aio_fini(&pipe->txaio); + nni_plat_ipc_fini(pipe->isp); NNI_FREE_STRUCT(pipe); } -static int -nni_ipc_pipe_send(void *arg, nni_msg *msg) +static void +nni_ipc_pipe_send_cb(void *arg) { nni_ipc_pipe *pipe = arg; - uint64_t len; - uint8_t buf[sizeof (len)]; - nni_iov iov[4]; + nni_aio *aio; int rv; - uint8_t msgtype; - - msgtype = 1; // "inband", the only defined option at present + size_t len; - iov[0].iov_buf = &msgtype; - iov[0].iov_len = 1; - iov[1].iov_buf = buf; - iov[1].iov_len = sizeof (buf); - iov[2].iov_buf = nni_msg_header(msg); - iov[2].iov_len = nni_msg_header_len(msg); - iov[3].iov_buf = nni_msg_body(msg); - iov[3].iov_len = nni_msg_len(msg); + if ((aio = pipe->user_txaio) == NULL) { + NNI_ASSERT(aio != NULL); + return; + } + pipe->user_txaio = NULL; + if ((rv = nni_aio_result(&pipe->txaio)) != 0) { + nni_aio_finish(aio, rv, 0); + return; + } - len = (uint64_t) iov[2].iov_len + (uint64_t) iov[3].iov_len; - NNI_PUT64(buf, len); + len = nni_msg_len(aio->a_msg); + nni_msg_free(aio->a_msg); + aio->a_msg = NULL; - if ((rv = nni_plat_ipc_send(&pipe->fd, iov, 4)) == 0) { - nni_msg_free(msg); - } - return (rv); + nni_aio_finish(aio, 0, len); } -static int -nni_ipc_pipe_recv(void *arg, nni_msg **msgp) +static void +nni_ipc_pipe_recv_cb(void *arg) { nni_ipc_pipe *pipe = arg; - nni_msg *msg; - uint64_t len; - uint8_t buf[sizeof (len)]; - nni_iov iov[2]; + nni_aio *aio; int rv; - uint8_t msgtype; - iov[0].iov_buf = &msgtype; - iov[0].iov_len = 1; - iov[1].iov_buf = buf; - iov[1].iov_len = sizeof (buf); - if ((rv = nni_plat_ipc_recv(&pipe->fd, iov, 2)) != 0) { - return (rv); - } - if (msgtype != 1) { - return (NNG_EPROTO); - } - NNI_GET64(buf, len); - if (len > pipe->rcvmax) { - return (NNG_EMSGSIZE); + aio = pipe->user_rxaio; + if (aio == NULL) { + // This should never ever happen. + NNI_ASSERT(aio != NULL); + return; } - if ((rv = nng_msg_alloc(&msg, (size_t) len)) != 0) { - return (rv); + if ((rv = nni_aio_result(&pipe->rxaio)) != 0) { + // Error on receive. This has to cause an error back + // to the user. Also, if we had allocated an rxmsg, lets + // toss it. + if (pipe->rxmsg != NULL) { + nni_msg_free(pipe->rxmsg); + pipe->rxmsg = NULL; + } + pipe->user_rxaio = NULL; + nni_aio_finish(aio, rv, 0); + return; } - iov[0].iov_len = nng_msg_len(msg); - iov[0].iov_buf = nng_msg_body(msg); + // If we don't have a message yet, we were reading the TCP message + // header, which is just the length. This tells us the size of the + // message to allocate and how much more to expect. + if (pipe->rxmsg == NULL) { + uint64_t len; - if ((rv = nni_plat_ipc_recv(&pipe->fd, iov, 1)) == 0) { - *msgp = msg; - } else { - nni_msg_free(msg); + // Check to make sure we got msg type 1. + if (pipe->rxhead[0] != 1) { + nni_aio_finish(aio, NNG_EPROTO, 0); + return; + } + + // We should have gotten a message header. + NNI_GET64(pipe->rxhead+1, len); + + // Make sure the message payload is not too big. If it is + // the caller will shut down the pipe. + if (len > pipe->rcvmax) { + pipe->user_rxaio = NULL; + nni_aio_finish(aio, NNG_EMSGSIZE, 0); + return; + } + + if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { + pipe->user_rxaio = NULL; + nni_aio_finish(aio, rv, 0); + return; + } + + // Submit the rest of the data for a read -- we want to + // read the entire message now. + pipe->rxaio.a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); + pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); + pipe->rxaio.a_niov = 1; + + rv = nni_plat_ipc_aio_recv(pipe->isp, &pipe->rxaio); + if (rv != 0) { + pipe->user_rxaio = NULL; + nni_msg_free(pipe->rxmsg); + pipe->rxmsg = NULL; + nni_aio_finish(aio, rv, 0); + return; + } + return; } - return (rv); + + // Otherwise we got a message read completely. Let the user know the + // good news. + pipe->user_rxaio = NULL; + aio->a_msg = pipe->rxmsg; + pipe->rxmsg = NULL; + nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg)); +} + + +static int +nni_ipc_pipe_aio_send(void *arg, nni_aio *aio) +{ + nni_ipc_pipe *pipe = arg; + nni_msg *msg = aio->a_msg; + uint64_t len; + + pipe->user_txaio = aio; + + pipe->txhead[0] = 1; // message type, 1. + len = nni_msg_len(msg) + nni_msg_header_len(msg); + NNI_PUT64(pipe->txhead + 1, len); + + pipe->txaio.a_iov[0].iov_buf = pipe->txhead; + pipe->txaio.a_iov[0].iov_len = sizeof (pipe->txhead); + pipe->txaio.a_iov[1].iov_buf = nni_msg_header(msg); + pipe->txaio.a_iov[1].iov_len = nni_msg_header_len(msg); + pipe->txaio.a_iov[2].iov_buf = nni_msg_body(msg); + pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); + pipe->txaio.a_niov = 3; + + return (nni_plat_ipc_aio_send(pipe->isp, &pipe->txaio)); +} + + +static int +nni_ipc_pipe_aio_recv(void *arg, nni_aio *aio) +{ + nni_ipc_pipe *pipe = arg; + + pipe->user_rxaio = aio; + + NNI_ASSERT(pipe->rxmsg == NULL); + + // Schedule a read of the IPC header. + pipe->rxaio.a_iov[0].iov_buf = pipe->rxhead; + pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxhead); + pipe->rxaio.a_niov = 1; + + return (nni_plat_ipc_aio_recv(pipe->isp, &pipe->rxaio)); } @@ -208,7 +318,7 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock) ep->closed = 0; ep->proto = nni_sock_proto(sock); ep->rcvmax = nni_sock_rcvmaxsz(sock); - if ((rv = nni_plat_ipc_init(&ep->fd)) != 0) { + if ((rv = nni_plat_ipc_init(&ep->isp)) != 0) { NNI_FREE_STRUCT(ep); return (rv); } @@ -225,7 +335,7 @@ nni_ipc_ep_fini(void *arg) { nni_ipc_ep *ep = arg; - nni_plat_ipc_fini(&ep->fd); + nni_plat_ipc_fini(ep->isp); NNI_FREE_STRUCT(ep); } @@ -235,7 +345,7 @@ nni_ipc_ep_close(void *arg) { nni_ipc_ep *ep = arg; - nni_plat_ipc_shutdown(&ep->fd); + nni_plat_ipc_shutdown(ep->isp); } @@ -256,13 +366,13 @@ nni_ipc_negotiate(nni_ipc_pipe *pipe) iov.iov_buf = buf; iov.iov_len = 8; - if ((rv = nni_plat_ipc_send(&pipe->fd, &iov, 1)) != 0) { + if ((rv = nni_plat_ipc_send(pipe->isp, &iov, 1)) != 0) { return (rv); } iov.iov_buf = buf; iov.iov_len = 8; - if ((rv = nni_plat_ipc_recv(&pipe->fd, &iov, 1)) != 0) { + if ((rv = nni_plat_ipc_recv(pipe->isp, &iov, 1)) != 0) { return (rv); } @@ -293,15 +403,13 @@ nni_ipc_ep_connect(void *arg, void *pipearg) pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; - rv = nni_plat_ipc_connect(&pipe->fd, path); + rv = nni_plat_ipc_connect(pipe->isp, path); if (rv != 0) { - nni_plat_ipc_fini(&pipe->fd); - NNI_FREE_STRUCT(pipe); return (rv); } if ((rv = nni_ipc_negotiate(pipe)) != 0) { - nni_plat_ipc_shutdown(&pipe->fd); + nni_plat_ipc_shutdown(pipe->isp); return (rv); } return (0); @@ -321,7 +429,7 @@ nni_ipc_ep_bind(void *arg) } path = ep->addr + strlen("ipc://"); - if ((rv = nni_plat_ipc_listen(&ep->fd, path)) != 0) { + if ((rv = nni_plat_ipc_listen(ep->isp, path)) != 0) { return (rv); } return (0); @@ -338,13 +446,11 @@ nni_ipc_ep_accept(void *arg, void *pipearg) pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; - if ((rv = nni_plat_ipc_accept(&pipe->fd, &ep->fd)) != 0) { - nni_plat_ipc_fini(&pipe->fd); - NNI_FREE_STRUCT(pipe); + if ((rv = nni_plat_ipc_accept(pipe->isp, ep->isp)) != 0) { return (rv); } if ((rv = nni_ipc_negotiate(pipe)) != 0) { - nni_plat_ipc_shutdown(&pipe->fd); + nni_plat_ipc_shutdown(pipe->isp); return (rv); } return (0); @@ -354,6 +460,8 @@ nni_ipc_ep_accept(void *arg, void *pipearg) static nni_tran_pipe nni_ipc_pipe_ops = { .p_init = nni_ipc_pipe_init, .p_fini = nni_ipc_pipe_fini, + .p_aio_send = nni_ipc_pipe_aio_send, + .p_aio_recv = nni_ipc_pipe_aio_recv, .p_close = nni_ipc_pipe_close, .p_peer = nni_ipc_pipe_peer, .p_getopt = nni_ipc_pipe_getopt, diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 1cf64768..7e712cc8 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -107,6 +107,9 @@ nni_tcp_pipe_fini(void *arg) { nni_tcp_pipe *pipe = arg; + if (pipe->rxmsg) { + nni_msg_free(pipe->rxmsg); + } nni_aio_fini(&pipe->rxaio); nni_aio_fini(&pipe->txaio); nni_plat_tcp_fini(pipe->tsp); |
