aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-03-29 17:05:08 -0700
committerGarrett D'Amore <garrett@damore.org>2017-03-29 17:05:08 -0700
commit86eaf052cc535658783dd5c3d5925f58fd70f983 (patch)
treee7675f617b749a5a4c5aeb8ca00e0b9be6f84e2a /src
parent84990c6ecb35ef322b74b8cc9e74ad5964b66ee5 (diff)
downloadnng-86eaf052cc535658783dd5c3d5925f58fd70f983.tar.gz
nng-86eaf052cc535658783dd5c3d5925f58fd70f983.tar.bz2
nng-86eaf052cc535658783dd5c3d5925f58fd70f983.zip
IPC send/recv works asynchronously for POSIX.
As with TCP, we're still using threads under the hood. But this completes the send/recv logic conversion for POSIX to our AIO framework, and hence represents a substantial milestone towards full asyncronous operation. We still need to do accept/connect operations asynchronously, then making. Windows overlapped IO work properly. After that, poll/epoll/kqueue, etc.
Diffstat (limited to 'src')
-rw-r--r--src/core/platform.h16
-rw-r--r--src/platform/posix/posix_aiothr.c16
-rw-r--r--src/platform/posix/posix_impl.h9
-rw-r--r--src/platform/posix/posix_ipc.c78
-rw-r--r--src/platform/posix/posix_net.c7
-rw-r--r--src/transport/ipc/ipc.c248
-rw-r--r--src/transport/tcp/tcp.c3
7 files changed, 265 insertions, 112 deletions
diff --git a/src/core/platform.h b/src/core/platform.h
index e6eac8e3..08fcdb1c 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -216,7 +216,7 @@ extern int nni_plat_tcp_aio_recv(nni_plat_tcpsock *, nni_aio *);
// nni_plat_ipc_init initializes the socket, for example it can
// set underlying file descriptors to -1, etc.
-extern int nni_plat_ipc_init(nni_plat_ipcsock *);
+extern int nni_plat_ipc_init(nni_plat_ipcsock **);
// nni_plat_ipc_fini just closes an IPC socket, and releases any related
// resources.
@@ -241,15 +241,15 @@ extern int nni_plat_ipc_accept(nni_plat_ipcsock *, nni_plat_ipcsock *);
// nni_plat_ipc_connect is the client side.
extern int nni_plat_ipc_connect(nni_plat_ipcsock *, const char *);
-// nni_plat_ipc_send sends data to the peer. The platform is responsible
+// nni_plat_ipc_aio_send sends data to the peer. The platform is responsible
// for attempting to send all of the data. The iov count will never be
// larger than 4. The platform may modify the iovs.
-extern int nni_plat_ipc_send(nni_plat_ipcsock *, nni_iov *, int);
+extern int nni_plat_ipc_aio_send(nni_plat_ipcsock *, nni_aio *);
-// nni_plat_ipc_recv recvs data into the buffers provided by the
+// nni_plat_ipc_aio_recv recvs data into the buffers provided by the
// iovs. The implementation does not return until the iovs are completely
// full, or an error condition occurs.
-extern int nni_plat_ipc_recv(nni_plat_ipcsock *, nni_iov *, int);
+extern int nni_plat_ipc_aio_recv(nni_plat_ipcsock *, nni_aio *);
// nni_plat_seed_prng seeds the PRNG subsystem. The specified number
// of bytes of entropy should be stashed. When possible, cryptographic
@@ -281,6 +281,12 @@ extern void nni_plat_pipe_clear(int);
// routine.
extern void nni_plat_pipe_close(int, int);
+// XXX: Stuff to REMOVE
+extern int nni_plat_tcp_send(nni_plat_tcpsock *, nni_iov *, int);
+extern int nni_plat_tcp_recv(nni_plat_tcpsock *, nni_iov *, int);
+extern int nni_plat_ipc_send(nni_plat_ipcsock *, nni_iov *, int);
+extern int nni_plat_ipc_recv(nni_plat_ipcsock *, nni_iov *, int);
+
// Actual platforms we support. This is included up front so that we can
// get the specific types that are supplied by the platform.
#if defined(PLATFORM_POSIX)
diff --git a/src/platform/posix/posix_aiothr.c b/src/platform/posix/posix_aiothr.c
index 2c11dcb2..a01fa194 100644
--- a/src/platform/posix/posix_aiothr.c
+++ b/src/platform/posix/posix_aiothr.c
@@ -239,14 +239,16 @@ nni_posix_aioq_start(nni_posix_aioq *q)
static void
nni_posix_aioq_fini(nni_posix_aioq *q)
{
- nni_mtx_lock(&q->aq_lk);
- q->aq_fd = -1;
- nni_cv_wake(&q->aq_cv);
- nni_mtx_unlock(&q->aq_lk);
+ if (q->aq_fd > 0) {
+ nni_mtx_lock(&q->aq_lk);
+ q->aq_fd = -1;
+ nni_cv_wake(&q->aq_cv);
+ nni_mtx_unlock(&q->aq_lk);
- nni_thr_fini(&q->aq_thr);
- nni_cv_fini(&q->aq_cv);
- nni_mtx_fini(&q->aq_lk);
+ nni_thr_fini(&q->aq_thr);
+ nni_cv_fini(&q->aq_cv);
+ nni_mtx_fini(&q->aq_lk);
+ }
}
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 5da18323..dea09fa1 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -35,15 +35,6 @@ extern int nni_plat_errno(int);
#endif
-
-#ifdef PLATFORM_POSIX_IPC
-struct nni_plat_ipcsock {
- int fd;
- int devnull; // used for shutting down blocking accept()
- char * unlink; // path to unlink at termination
-};
-#endif
-
// Define types that this platform uses.
#ifdef PLATFORM_POSIX_THREAD
diff --git a/src/platform/posix/posix_ipc.c b/src/platform/posix/posix_ipc.c
index 7a9c6d52..e75edeca 100644
--- a/src/platform/posix/posix_ipc.c
+++ b/src/platform/posix/posix_ipc.c
@@ -10,6 +10,7 @@
#include "core/nng_impl.h"
#ifdef PLATFORM_POSIX_IPC
+#include "platform/posix/posix_aio.h"
#include <errno.h>
#include <stdlib.h>
@@ -29,6 +30,13 @@
#undef sun
#endif
+struct nni_plat_ipcsock {
+ int fd;
+ int devnull; // for shutting down accept()
+ char * unlink; // path to unlink at fini
+ nni_posix_aio_pipe aiop;
+};
+
#ifdef SOCK_CLOEXEC
#define NNI_IPC_SOCKTYPE (SOCK_STREAM | SOCK_CLOEXEC)
#else
@@ -59,6 +67,20 @@ nni_plat_ipc_path_to_sockaddr(struct sockaddr_un *sun, const char *path)
int
+nni_plat_ipc_aio_send(nni_plat_ipcsock *isp, nni_aio *aio)
+{
+ return (nni_posix_aio_write(&isp->aiop, aio));
+}
+
+
+int
+nni_plat_ipc_aio_recv(nni_plat_ipcsock *isp, nni_aio *aio)
+{
+ return (nni_posix_aio_read(&isp->aiop, aio));
+}
+
+
+int
nni_plat_ipc_send(nni_plat_ipcsock *s, nni_iov *iovs, int cnt)
{
struct iovec iov[4]; // We never have more than 3 at present
@@ -178,36 +200,46 @@ nni_plat_ipc_setopts(int fd)
int
-nni_plat_ipc_init(nni_plat_ipcsock *s)
+nni_plat_ipc_init(nni_plat_ipcsock **ispp)
{
- s->fd = -1;
+ nni_plat_ipcsock *isp;
+
+ if ((isp = NNI_ALLOC_STRUCT(isp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ isp->fd = -1;
+ *ispp = isp;
return (0);
}
void
-nni_plat_ipc_fini(nni_plat_ipcsock *s)
+nni_plat_ipc_fini(nni_plat_ipcsock *isp)
{
- if (s->fd != -1) {
- (void) close(s->fd);
- s->fd = -1;
+ if (isp->fd != -1) {
+ (void) close(isp->fd);
+ isp->fd = -1;
}
- if (s->unlink != NULL) {
- (void) unlink(s->unlink);
- nni_free(s->unlink, strlen(s->unlink) + 1);
+ if (isp->unlink != NULL) {
+ (void) unlink(isp->unlink);
+ nni_free(isp->unlink, strlen(isp->unlink) + 1);
}
+
+ nni_posix_aio_pipe_fini(&isp->aiop);
+
+ NNI_FREE_STRUCT(isp);
}
void
-nni_plat_ipc_shutdown(nni_plat_ipcsock *s)
+nni_plat_ipc_shutdown(nni_plat_ipcsock *isp)
{
- if (s->fd != -1) {
- (void) shutdown(s->fd, SHUT_RDWR);
+ if (isp->fd != -1) {
+ (void) shutdown(isp->fd, SHUT_RDWR);
// This causes the equivalent of a close. Hopefully waking
// up anything that didn't get the hint with the shutdown.
// (macOS does not see the shtudown).
- (void) dup2(nni_plat_devnull, s->fd);
+ (void) dup2(nni_plat_devnull, isp->fd);
}
}
@@ -278,7 +310,7 @@ nni_plat_ipc_listen(nni_plat_ipcsock *s, const char *path)
int
-nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path)
+nni_plat_ipc_connect(nni_plat_ipcsock *isp, const char *path)
{
int fd;
int len;
@@ -305,15 +337,22 @@ nni_plat_ipc_connect(nni_plat_ipcsock *s, const char *path)
}
return (rv);
}
- s->fd = fd;
+
+ if ((rv = nni_posix_aio_pipe_init(&isp->aiop, fd)) != 0) {
+ (void) close(fd);
+ return (rv);
+ }
+
+ isp->fd = fd;
return (0);
}
int
-nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server)
+nni_plat_ipc_accept(nni_plat_ipcsock *isp, nni_plat_ipcsock *server)
{
int fd;
+ int rv;
for (;;) {
#ifdef NNG_USE_ACCEPT4
@@ -341,7 +380,12 @@ nni_plat_ipc_accept(nni_plat_ipcsock *s, nni_plat_ipcsock *server)
nni_plat_ipc_setopts(fd);
- s->fd = fd;
+ if ((rv = nni_posix_aio_pipe_init(&isp->aiop, fd)) != 0) {
+ (void) close(fd);
+ return (rv);
+ }
+
+ isp->fd = fd;
return (0);
}
diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c
index c7651655..94dc2667 100644
--- a/src/platform/posix/posix_net.c
+++ b/src/platform/posix/posix_net.c
@@ -8,9 +8,9 @@
//
#include "core/nng_impl.h"
-#include "platform/posix/posix_aio.h"
#ifdef PLATFORM_POSIX_NET
+#include "platform/posix/posix_aio.h"
#include <errno.h>
#include <stdlib.h>
@@ -31,13 +31,11 @@
#define NNI_TCP_SOCKTYPE SOCK_STREAM
#endif
-#ifdef PLATFORM_POSIX_NET
struct nni_plat_tcpsock {
int fd;
- int devnull; // used for shutting down blocking accept()
+ int devnull; // for shutting down accept()
nni_posix_aio_pipe aiop;
};
-#endif
static int
nni_plat_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa)
@@ -284,6 +282,7 @@ nni_plat_tcp_fini(nni_plat_tcpsock *tsp)
(void) close(tsp->fd);
tsp->fd = -1;
}
+ nni_posix_aio_pipe_fini(&tsp->aiop);
NNI_FREE_STRUCT(tsp);
}
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 46003487..7ffea62d 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -23,20 +23,33 @@ typedef struct nni_ipc_ep nni_ipc_ep;
// nni_ipc_pipe is one end of an IPC connection.
struct nni_ipc_pipe {
const char * addr;
- nni_plat_ipcsock fd;
+ nni_plat_ipcsock * isp;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
+
+ uint8_t txhead[1+sizeof (uint64_t)];
+ uint8_t rxhead[1+sizeof (uint64_t)];
+
+ nni_aio * user_txaio;
+ nni_aio * user_rxaio;
+ nni_aio txaio;
+ nni_aio rxaio;
+ nni_msg * rxmsg;
};
struct nni_ipc_ep {
char addr[NNG_MAXADDRLEN+1];
- nni_plat_ipcsock fd;
+ nni_plat_ipcsock * isp;
int closed;
uint16_t proto;
size_t rcvmax;
};
+
+static void nni_ipc_pipe_send_cb(void *);
+static void nni_ipc_pipe_recv_cb(void *);
+
static int
nni_ipc_tran_init(void)
{
@@ -55,7 +68,7 @@ nni_ipc_pipe_close(void *arg)
{
nni_ipc_pipe *pipe = arg;
- nni_plat_ipc_shutdown(&pipe->fd);
+ nni_plat_ipc_shutdown(pipe->isp);
}
@@ -68,7 +81,21 @@ nni_ipc_pipe_init(void **argp)
if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_plat_ipc_init(&pipe->fd)) != 0) {
+ if ((rv = nni_plat_ipc_init(&pipe->isp)) != 0) {
+ NNI_FREE_STRUCT(pipe);
+ return (rv);
+ }
+ rv = nni_aio_init(&pipe->txaio, nni_ipc_pipe_send_cb, pipe);
+ if (rv != 0) {
+ nni_plat_ipc_fini(pipe->isp);
+ NNI_FREE_STRUCT(pipe);
+ return (rv);
+ }
+
+ rv = nni_aio_init(&pipe->rxaio, nni_ipc_pipe_recv_cb, pipe);
+ if (rv != 0) {
+ nni_aio_fini(&pipe->txaio);
+ nni_plat_ipc_fini(pipe->isp);
NNI_FREE_STRUCT(pipe);
return (rv);
}
@@ -82,81 +109,164 @@ nni_ipc_pipe_fini(void *arg)
{
nni_ipc_pipe *pipe = arg;
- nni_plat_ipc_fini(&pipe->fd);
+ if (pipe->rxmsg) {
+ nni_msg_free(pipe->rxmsg);
+ }
+ nni_aio_fini(&pipe->rxaio);
+ nni_aio_fini(&pipe->txaio);
+ nni_plat_ipc_fini(pipe->isp);
NNI_FREE_STRUCT(pipe);
}
-static int
-nni_ipc_pipe_send(void *arg, nni_msg *msg)
+static void
+nni_ipc_pipe_send_cb(void *arg)
{
nni_ipc_pipe *pipe = arg;
- uint64_t len;
- uint8_t buf[sizeof (len)];
- nni_iov iov[4];
+ nni_aio *aio;
int rv;
- uint8_t msgtype;
-
- msgtype = 1; // "inband", the only defined option at present
+ size_t len;
- iov[0].iov_buf = &msgtype;
- iov[0].iov_len = 1;
- iov[1].iov_buf = buf;
- iov[1].iov_len = sizeof (buf);
- iov[2].iov_buf = nni_msg_header(msg);
- iov[2].iov_len = nni_msg_header_len(msg);
- iov[3].iov_buf = nni_msg_body(msg);
- iov[3].iov_len = nni_msg_len(msg);
+ if ((aio = pipe->user_txaio) == NULL) {
+ NNI_ASSERT(aio != NULL);
+ return;
+ }
+ pipe->user_txaio = NULL;
+ if ((rv = nni_aio_result(&pipe->txaio)) != 0) {
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
- len = (uint64_t) iov[2].iov_len + (uint64_t) iov[3].iov_len;
- NNI_PUT64(buf, len);
+ len = nni_msg_len(aio->a_msg);
+ nni_msg_free(aio->a_msg);
+ aio->a_msg = NULL;
- if ((rv = nni_plat_ipc_send(&pipe->fd, iov, 4)) == 0) {
- nni_msg_free(msg);
- }
- return (rv);
+ nni_aio_finish(aio, 0, len);
}
-static int
-nni_ipc_pipe_recv(void *arg, nni_msg **msgp)
+static void
+nni_ipc_pipe_recv_cb(void *arg)
{
nni_ipc_pipe *pipe = arg;
- nni_msg *msg;
- uint64_t len;
- uint8_t buf[sizeof (len)];
- nni_iov iov[2];
+ nni_aio *aio;
int rv;
- uint8_t msgtype;
- iov[0].iov_buf = &msgtype;
- iov[0].iov_len = 1;
- iov[1].iov_buf = buf;
- iov[1].iov_len = sizeof (buf);
- if ((rv = nni_plat_ipc_recv(&pipe->fd, iov, 2)) != 0) {
- return (rv);
- }
- if (msgtype != 1) {
- return (NNG_EPROTO);
- }
- NNI_GET64(buf, len);
- if (len > pipe->rcvmax) {
- return (NNG_EMSGSIZE);
+ aio = pipe->user_rxaio;
+ if (aio == NULL) {
+ // This should never ever happen.
+ NNI_ASSERT(aio != NULL);
+ return;
}
- if ((rv = nng_msg_alloc(&msg, (size_t) len)) != 0) {
- return (rv);
+ if ((rv = nni_aio_result(&pipe->rxaio)) != 0) {
+ // Error on receive. This has to cause an error back
+ // to the user. Also, if we had allocated an rxmsg, lets
+ // toss it.
+ if (pipe->rxmsg != NULL) {
+ nni_msg_free(pipe->rxmsg);
+ pipe->rxmsg = NULL;
+ }
+ pipe->user_rxaio = NULL;
+ nni_aio_finish(aio, rv, 0);
+ return;
}
- iov[0].iov_len = nng_msg_len(msg);
- iov[0].iov_buf = nng_msg_body(msg);
+ // If we don't have a message yet, we were reading the TCP message
+ // header, which is just the length. This tells us the size of the
+ // message to allocate and how much more to expect.
+ if (pipe->rxmsg == NULL) {
+ uint64_t len;
- if ((rv = nni_plat_ipc_recv(&pipe->fd, iov, 1)) == 0) {
- *msgp = msg;
- } else {
- nni_msg_free(msg);
+ // Check to make sure we got msg type 1.
+ if (pipe->rxhead[0] != 1) {
+ nni_aio_finish(aio, NNG_EPROTO, 0);
+ return;
+ }
+
+ // We should have gotten a message header.
+ NNI_GET64(pipe->rxhead+1, len);
+
+ // Make sure the message payload is not too big. If it is
+ // the caller will shut down the pipe.
+ if (len > pipe->rcvmax) {
+ pipe->user_rxaio = NULL;
+ nni_aio_finish(aio, NNG_EMSGSIZE, 0);
+ return;
+ }
+
+ if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) {
+ pipe->user_rxaio = NULL;
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
+
+ // Submit the rest of the data for a read -- we want to
+ // read the entire message now.
+ pipe->rxaio.a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg);
+ pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
+ pipe->rxaio.a_niov = 1;
+
+ rv = nni_plat_ipc_aio_recv(pipe->isp, &pipe->rxaio);
+ if (rv != 0) {
+ pipe->user_rxaio = NULL;
+ nni_msg_free(pipe->rxmsg);
+ pipe->rxmsg = NULL;
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
+ return;
}
- return (rv);
+
+ // Otherwise we got a message read completely. Let the user know the
+ // good news.
+ pipe->user_rxaio = NULL;
+ aio->a_msg = pipe->rxmsg;
+ pipe->rxmsg = NULL;
+ nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg));
+}
+
+
+static int
+nni_ipc_pipe_aio_send(void *arg, nni_aio *aio)
+{
+ nni_ipc_pipe *pipe = arg;
+ nni_msg *msg = aio->a_msg;
+ uint64_t len;
+
+ pipe->user_txaio = aio;
+
+ pipe->txhead[0] = 1; // message type, 1.
+ len = nni_msg_len(msg) + nni_msg_header_len(msg);
+ NNI_PUT64(pipe->txhead + 1, len);
+
+ pipe->txaio.a_iov[0].iov_buf = pipe->txhead;
+ pipe->txaio.a_iov[0].iov_len = sizeof (pipe->txhead);
+ pipe->txaio.a_iov[1].iov_buf = nni_msg_header(msg);
+ pipe->txaio.a_iov[1].iov_len = nni_msg_header_len(msg);
+ pipe->txaio.a_iov[2].iov_buf = nni_msg_body(msg);
+ pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg);
+ pipe->txaio.a_niov = 3;
+
+ return (nni_plat_ipc_aio_send(pipe->isp, &pipe->txaio));
+}
+
+
+static int
+nni_ipc_pipe_aio_recv(void *arg, nni_aio *aio)
+{
+ nni_ipc_pipe *pipe = arg;
+
+ pipe->user_rxaio = aio;
+
+ NNI_ASSERT(pipe->rxmsg == NULL);
+
+ // Schedule a read of the IPC header.
+ pipe->rxaio.a_iov[0].iov_buf = pipe->rxhead;
+ pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxhead);
+ pipe->rxaio.a_niov = 1;
+
+ return (nni_plat_ipc_aio_recv(pipe->isp, &pipe->rxaio));
}
@@ -208,7 +318,7 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock)
ep->closed = 0;
ep->proto = nni_sock_proto(sock);
ep->rcvmax = nni_sock_rcvmaxsz(sock);
- if ((rv = nni_plat_ipc_init(&ep->fd)) != 0) {
+ if ((rv = nni_plat_ipc_init(&ep->isp)) != 0) {
NNI_FREE_STRUCT(ep);
return (rv);
}
@@ -225,7 +335,7 @@ nni_ipc_ep_fini(void *arg)
{
nni_ipc_ep *ep = arg;
- nni_plat_ipc_fini(&ep->fd);
+ nni_plat_ipc_fini(ep->isp);
NNI_FREE_STRUCT(ep);
}
@@ -235,7 +345,7 @@ nni_ipc_ep_close(void *arg)
{
nni_ipc_ep *ep = arg;
- nni_plat_ipc_shutdown(&ep->fd);
+ nni_plat_ipc_shutdown(ep->isp);
}
@@ -256,13 +366,13 @@ nni_ipc_negotiate(nni_ipc_pipe *pipe)
iov.iov_buf = buf;
iov.iov_len = 8;
- if ((rv = nni_plat_ipc_send(&pipe->fd, &iov, 1)) != 0) {
+ if ((rv = nni_plat_ipc_send(pipe->isp, &iov, 1)) != 0) {
return (rv);
}
iov.iov_buf = buf;
iov.iov_len = 8;
- if ((rv = nni_plat_ipc_recv(&pipe->fd, &iov, 1)) != 0) {
+ if ((rv = nni_plat_ipc_recv(pipe->isp, &iov, 1)) != 0) {
return (rv);
}
@@ -293,15 +403,13 @@ nni_ipc_ep_connect(void *arg, void *pipearg)
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
- rv = nni_plat_ipc_connect(&pipe->fd, path);
+ rv = nni_plat_ipc_connect(pipe->isp, path);
if (rv != 0) {
- nni_plat_ipc_fini(&pipe->fd);
- NNI_FREE_STRUCT(pipe);
return (rv);
}
if ((rv = nni_ipc_negotiate(pipe)) != 0) {
- nni_plat_ipc_shutdown(&pipe->fd);
+ nni_plat_ipc_shutdown(pipe->isp);
return (rv);
}
return (0);
@@ -321,7 +429,7 @@ nni_ipc_ep_bind(void *arg)
}
path = ep->addr + strlen("ipc://");
- if ((rv = nni_plat_ipc_listen(&ep->fd, path)) != 0) {
+ if ((rv = nni_plat_ipc_listen(ep->isp, path)) != 0) {
return (rv);
}
return (0);
@@ -338,13 +446,11 @@ nni_ipc_ep_accept(void *arg, void *pipearg)
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
- if ((rv = nni_plat_ipc_accept(&pipe->fd, &ep->fd)) != 0) {
- nni_plat_ipc_fini(&pipe->fd);
- NNI_FREE_STRUCT(pipe);
+ if ((rv = nni_plat_ipc_accept(pipe->isp, ep->isp)) != 0) {
return (rv);
}
if ((rv = nni_ipc_negotiate(pipe)) != 0) {
- nni_plat_ipc_shutdown(&pipe->fd);
+ nni_plat_ipc_shutdown(pipe->isp);
return (rv);
}
return (0);
@@ -354,6 +460,8 @@ nni_ipc_ep_accept(void *arg, void *pipearg)
static nni_tran_pipe nni_ipc_pipe_ops = {
.p_init = nni_ipc_pipe_init,
.p_fini = nni_ipc_pipe_fini,
+ .p_aio_send = nni_ipc_pipe_aio_send,
+ .p_aio_recv = nni_ipc_pipe_aio_recv,
.p_close = nni_ipc_pipe_close,
.p_peer = nni_ipc_pipe_peer,
.p_getopt = nni_ipc_pipe_getopt,
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 1cf64768..7e712cc8 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -107,6 +107,9 @@ nni_tcp_pipe_fini(void *arg)
{
nni_tcp_pipe *pipe = arg;
+ if (pipe->rxmsg) {
+ nni_msg_free(pipe->rxmsg);
+ }
nni_aio_fini(&pipe->rxaio);
nni_aio_fini(&pipe->txaio);
nni_plat_tcp_fini(pipe->tsp);