aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-03-29 13:07:35 -0700
committerGarrett D'Amore <garrett@damore.org>2017-03-29 13:07:35 -0700
commit374f93a18edca2e0656c337a5b54927169ec31fa (patch)
treecbaef995db10cfafd795953be203de744dc688c9 /src
parent6091cf7e1c030417e1fd29c66160e71bcbe4f984 (diff)
downloadnng-374f93a18edca2e0656c337a5b54927169ec31fa.tar.gz
nng-374f93a18edca2e0656c337a5b54927169ec31fa.tar.bz2
nng-374f93a18edca2e0656c337a5b54927169ec31fa.zip
TCP (POSIX) async send/recv working. Other changes.
Transport-level pipe initialization is now sepearate and explicit. The POSIX send/recv logic still uses threads under the hood, but makes use of the AIO framework for send/recv. This is a key stepping stone towards enabling poll() or similar async I/O approaches.
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.h4
-rw-r--r--src/core/defs.h2
-rw-r--r--src/core/endpt.c4
-rw-r--r--src/core/pipe.c22
-rw-r--r--src/core/platform.h11
-rw-r--r--src/core/transport.h29
-rw-r--r--src/platform/posix/posix_aio.h4
-rw-r--r--src/platform/posix/posix_aiothr.c15
-rw-r--r--src/platform/posix/posix_impl.h6
-rw-r--r--src/platform/posix/posix_net.c72
-rw-r--r--src/transport/inproc/inproc.c162
-rw-r--r--src/transport/ipc/ipc.c60
-rw-r--r--src/transport/tcp/tcp.c276
13 files changed, 458 insertions, 209 deletions
diff --git a/src/core/aio.h b/src/core/aio.h
index 96b04857..c377ca93 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -15,9 +15,7 @@
#include "core/taskq.h"
#include "core/thread.h"
-typedef struct nni_aio_ops nni_aio_ops;
-typedef struct nni_aio nni_aio;
-
+typedef struct nni_aio_ops nni_aio_ops;
// An nni_aio is an async I/O handle.
struct nni_aio {
diff --git a/src/core/defs.h b/src/core/defs.h
index 82d0dfaf..1c44e9af 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -49,6 +49,8 @@ typedef int nni_signal; // Wakeup channel.
typedef uint64_t nni_time; // Abs. time (usec).
typedef int64_t nni_duration; // Rel. time (usec).
+typedef struct nni_aio nni_aio;
+
typedef void (*nni_cb)(void *);
// Used by transports for scatter gather I/O.
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 0310783a..1108a504 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -207,7 +207,7 @@ nni_ep_connect(nni_ep *ep, nni_pipe **pp)
if ((rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran)) != 0) {
return (rv);
}
- rv = ep->ep_ops.ep_connect(ep->ep_data, pipe);
+ rv = ep->ep_ops.ep_connect(ep->ep_data, pipe->p_tran_data);
if (rv != 0) {
nni_pipe_destroy(pipe);
return (rv);
@@ -359,7 +359,7 @@ nni_ep_accept(nni_ep *ep, nni_pipe **pp)
if ((rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran)) != 0) {
return (rv);
}
- rv = ep->ep_ops.ep_accept(ep->ep_data, pipe);
+ rv = ep->ep_ops.ep_accept(ep->ep_data, pipe->p_tran_data);
if (rv != 0) {
nni_pipe_destroy(pipe);
return (rv);
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 85a0bd4a..4b37cfa9 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -39,14 +39,14 @@ nni_pipe_recv(nni_pipe *p, nng_msg **msgp)
int
nni_pipe_aio_recv(nni_pipe *p, nni_aio *aio)
{
- return (p->p_tran_ops.pipe_aio_recv(p->p_tran_data, aio));
+ return (p->p_tran_ops.p_aio_recv(p->p_tran_data, aio));
}
int
nni_pipe_aio_send(nni_pipe *p, nni_aio *aio)
{
- return (p->p_tran_ops.pipe_aio_send(p->p_tran_data, aio));
+ return (p->p_tran_ops.p_aio_send(p->p_tran_data, aio));
}
@@ -92,7 +92,7 @@ nni_pipe_close(nni_pipe *p)
// Close the underlying transport.
if (p->p_tran_data != NULL) {
- p->p_tran_ops.pipe_close(p->p_tran_data);
+ p->p_tran_ops.p_close(p->p_tran_data);
}
// Unregister our ID so nobody else can find it.
@@ -116,7 +116,7 @@ nni_pipe_close(nni_pipe *p)
uint16_t
nni_pipe_peer(nni_pipe *p)
{
- return (p->p_tran_ops.pipe_peer(p->p_tran_data));
+ return (p->p_tran_ops.p_peer(p->p_tran_data));
}
@@ -145,7 +145,15 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran)
// and we avoid an extra dereference on hot code paths.
p->p_tran_ops = *tran->tran_pipe;
+ // Initialize the transport pipe data.
+ if ((rv = p->p_tran_ops.p_init(&p->p_tran_data)) != 0) {
+ nni_mtx_fini(&p->p_mtx);
+ NNI_FREE_STRUCT(p);
+ return (rv);
+ }
+
if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
+ p->p_tran_ops.p_fini(p->p_tran_data);
nni_mtx_fini(&p->p_mtx);
NNI_FREE_STRUCT(p);
return (rv);
@@ -164,7 +172,7 @@ nni_pipe_destroy(nni_pipe *p)
// The caller is responsible for ensuring that the pipe
// is not in use by any other consumers. It must not be started
if (p->p_tran_data != NULL) {
- p->p_tran_ops.pipe_destroy(p->p_tran_data);
+ p->p_tran_ops.p_fini(p->p_tran_data);
}
nni_sock_pipe_rem(p->p_sock, p);
nni_mtx_fini(&p->p_mtx);
@@ -176,10 +184,10 @@ int
nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp)
{
/* This should only be called with the mutex held... */
- if (p->p_tran_ops.pipe_getopt == NULL) {
+ if (p->p_tran_ops.p_getopt == NULL) {
return (NNG_ENOTSUP);
}
- return (p->p_tran_ops.pipe_getopt(p->p_tran_data, opt, val, szp));
+ return (p->p_tran_ops.p_getopt(p->p_tran_data, opt, val, szp));
}
diff --git a/src/core/platform.h b/src/core/platform.h
index b1e824e2..1644973c 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -174,7 +174,7 @@ extern int nni_plat_lookup_host(const char *, nni_sockaddr *, int);
// nni_plat_tcp_init initializes the socket, for example it can
// set underlying file descriptors to -1, etc.
-extern int nni_plat_tcp_init(nni_plat_tcpsock *);
+extern int nni_plat_tcp_init(nni_plat_tcpsock **);
// nni_plat_tcp_fini just closes a TCP socket, and releases any related
// resources.
@@ -203,14 +203,17 @@ extern int nni_plat_tcp_accept(nni_plat_tcpsock *, nni_plat_tcpsock *);
extern int nni_plat_tcp_connect(nni_plat_tcpsock *, const nni_sockaddr *,
const nni_sockaddr *);
-// nni_plat_tcp_send sends data to the remote side. The platform is
-// responsible for attempting to send all of the data. The iov count
-// will never be larger than 4. THe platform may modify the iovs.
+// nni_plat_tcp_aio_send sends the data to the remote side asynchronously.
+// The data to send is stored in the a_iov field of the aio, and the array
+// of iovs will never be larger than 4. The platform may modify the iovs,
+// or the iov list.
+extern int nni_plat_tcp_aio_send(nni_plat_tcpsock *, nni_aio *);
extern int nni_plat_tcp_send(nni_plat_tcpsock *, nni_iov *, int);
// nni_plat_tcp_recv recvs data into the buffers provided by the
// iovs. The implementation does not return until the iovs are completely
// full, or an error condition occurs.
+extern int nni_plat_tcp_aio_recv(nni_plat_tcpsock *, nni_aio *);
extern int nni_plat_tcp_recv(nni_plat_tcpsock *, nni_iov *, int);
// nni_plat_ipc_init initializes the socket, for example it can
diff --git a/src/core/transport.h b/src/core/transport.h
index 65cd775f..ab43e497 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -48,8 +48,10 @@ struct nni_tran_ep {
// ep_connect establishes a connection. It can return errors
// NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, NNG_ECONNFAILED,
- // NNG_ETIMEDOUT, and NNG_EPROTO.
- int (*ep_connect)(void *, nni_pipe *);
+ // NNG_ETIMEDOUT, and NNG_EPROTO. The first argument is the
+ // transport specific endpoint, and the second is the transport
+ // specific pipe structure.
+ int (*ep_connect)(void *, void *);
// ep_bind just does the bind() and listen() work,
// reserving the address but not creating any connections.
@@ -58,8 +60,10 @@ struct nni_tran_ep {
// address, or NNG_EACCESS for permission problems.
int (*ep_bind)(void *);
- // ep_accept accepts an inbound connection.
- int (*ep_accept)(void *, nni_pipe *);
+ // ep_accept accepts an inbound connection. The first argument
+ // is the transport-specific endpoint, and the second is the
+ // transport-specific pipe (which will have already been created.)
+ int (*ep_accept)(void *, void *);
// ep_close stops the endpoint from operating altogether. It does
// not affect pipes that have already been created.
@@ -77,14 +81,17 @@ struct nni_tran_ep {
// back into the socket at this point. (Which is one reason pointers back
// to socket or even enclosing pipe state, are not provided.)
struct nni_tran_pipe {
- // p_destroy destroys the pipe. This should clean up all local
+ // p_init initializes the pipe structure, allocating the structure.
+ int (*p_init)(void **);
+
+ // p_fini destroys the pipe. This should clean up all local
// resources, including closing files and freeing memory, used by
// the pipe. After this call returns, the system will not make
// further calls on the same pipe.
- void (*pipe_destroy)(void *);
+ void (*p_fini)(void *);
- int (*pipe_aio_send)(void *, nni_aio *);
- int (*pipe_aio_recv)(void *, nni_aio *);
+ int (*p_aio_send)(void *, nni_aio *);
+ int (*p_aio_recv)(void *, nni_aio *);
// p_send sends the message. If the message cannot be received, then
// the caller may try again with the same message (or free it). If
@@ -104,15 +111,15 @@ struct nni_tran_pipe {
// p_close closes the pipe. Further recv or send operations should
// return back NNG_ECLOSED.
- void (*pipe_close)(void *);
+ void (*p_close)(void *);
// p_peer returns the peer protocol. This may arrive in whatever
// transport specific manner is appropriate.
- uint16_t (*pipe_peer)(void *);
+ uint16_t (*p_peer)(void *);
// p_getopt gets an pipe (transport-specific) property. These values
// may not be changed once the pipe is created.
- int (*pipe_getopt)(void *, int, void *, size_t *);
+ int (*p_getopt)(void *, int, void *, size_t *);
};
// These APIs are used by the framework internally, and not for use by
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
index 08c731bc..797f9e43 100644
--- a/src/platform/posix/posix_aio.h
+++ b/src/platform/posix/posix_aio.h
@@ -51,8 +51,8 @@ extern void nni_posix_aio_pipe_fini(nni_posix_aio_pipe *);
// extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int);
// extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *);
-extern int nni_posix_aio_read(nni_posix_aioq *, nni_aio *);
-extern int nni_posix_aio_write(nni_posix_aioq *, nni_aio *);
+extern int nni_posix_aio_read(nni_posix_aio_pipe *, nni_aio *);
+extern int nni_posix_aio_write(nni_posix_aio_pipe *, nni_aio *);
// extern int nni_posix_aio_connect();
// extern int nni_posix_aio_accept();
diff --git a/src/platform/posix/posix_aiothr.c b/src/platform/posix/posix_aiothr.c
index 013e4599..2d31aede 100644
--- a/src/platform/posix/posix_aiothr.c
+++ b/src/platform/posix/posix_aiothr.c
@@ -178,16 +178,17 @@ nni_plat_aiothr_dothr(nni_posix_aioq *q, int (*fn)(int, nni_aio *))
}
nni_list_remove(&q->aq_aios, aio);
- nni_mtx_unlock(&q->aq_lk);
+ //nni_mtx_unlock(&q->aq_lk);
// Call the callback.
nni_aio_finish(aio, rv, aio->a_count);
}
while ((aio = nni_list_first(&q->aq_aios)) != NULL) {
- nni_mtx_unlock(&q->aq_lk);
+ nni_list_remove(&q->aq_aios, aio);
+ //nni_mtx_unlock(&q->aq_lk);
nni_aio_finish(aio, NNG_ECLOSED, aio->a_count);
- nni_mtx_lock(&q->aq_lk);
+ //nni_mtx_lock(&q->aq_lk);
}
nni_mtx_unlock(&q->aq_lk);
@@ -299,16 +300,16 @@ nni_posix_aio_submit(nni_posix_aioq *q, nni_aio *aio)
int
-nni_posix_aio_read(nni_posix_aioq *q, nni_aio *aio)
+nni_posix_aio_read(nni_posix_aio_pipe *p, nni_aio *aio)
{
- return (nni_posix_aio_submit(q, aio));
+ return (nni_posix_aio_submit(&p->ap_readq, aio));
}
int
-nni_posix_aio_write(nni_posix_aioq *q, nni_aio *aio)
+nni_posix_aio_write(nni_posix_aio_pipe *p, nni_aio *aio)
{
- return (nni_posix_aio_submit(q, aio));
+ return (nni_posix_aio_submit(&p->ap_writeq, aio));
}
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 56cc5937..5da18323 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -35,12 +35,6 @@ extern int nni_plat_errno(int);
#endif
-#ifdef PLATFORM_POSIX_NET
-struct nni_plat_tcpsock {
- int fd;
- int devnull; // used for shutting down blocking accept()
-};
-#endif
#ifdef PLATFORM_POSIX_IPC
struct nni_plat_ipcsock {
diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c
index 24b45819..c7651655 100644
--- a/src/platform/posix/posix_net.c
+++ b/src/platform/posix/posix_net.c
@@ -8,6 +8,7 @@
//
#include "core/nng_impl.h"
+#include "platform/posix/posix_aio.h"
#ifdef PLATFORM_POSIX_NET
@@ -30,6 +31,14 @@
#define NNI_TCP_SOCKTYPE SOCK_STREAM
#endif
+#ifdef PLATFORM_POSIX_NET
+struct nni_plat_tcpsock {
+ int fd;
+ int devnull; // used for shutting down blocking accept()
+ nni_posix_aio_pipe aiop;
+};
+#endif
+
static int
nni_plat_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa)
{
@@ -164,6 +173,20 @@ nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
int
+nni_plat_tcp_aio_send(nni_plat_tcpsock *s, nni_aio *aio)
+{
+ return (nni_posix_aio_write(&s->aiop, aio));
+}
+
+
+int
+nni_plat_tcp_aio_recv(nni_plat_tcpsock *s, nni_aio *aio)
+{
+ return (nni_posix_aio_read(&s->aiop, aio));
+}
+
+
+int
nni_plat_tcp_recv(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
{
struct iovec iov[4]; // We never have more than 3 at present
@@ -241,32 +264,39 @@ nni_plat_tcp_setopts(int fd)
int
-nni_plat_tcp_init(nni_plat_tcpsock *s)
+nni_plat_tcp_init(nni_plat_tcpsock **tspp)
{
- s->fd = -1;
+ nni_plat_tcpsock *tsp;
+
+ if ((tsp = NNI_ALLOC_STRUCT(tsp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ tsp->fd = -1;
+ *tspp = tsp;
return (0);
}
void
-nni_plat_tcp_fini(nni_plat_tcpsock *s)
+nni_plat_tcp_fini(nni_plat_tcpsock *tsp)
{
- if (s->fd != -1) {
- (void) close(s->fd);
- s->fd = -1;
+ if (tsp->fd != -1) {
+ (void) close(tsp->fd);
+ tsp->fd = -1;
}
+ NNI_FREE_STRUCT(tsp);
}
void
-nni_plat_tcp_shutdown(nni_plat_tcpsock *s)
+nni_plat_tcp_shutdown(nni_plat_tcpsock *tsp)
{
- if (s->fd != -1) {
- (void) shutdown(s->fd, SHUT_RDWR);
+ if (tsp->fd != -1) {
+ (void) shutdown(tsp->fd, SHUT_RDWR);
// This causes the equivalent of a close. Hopefully waking
// up anything that didn't get the hint with the shutdown.
// (macOS does not see the shtudown).
- (void) dup2(nni_plat_devnull, s->fd);
+ (void) dup2(nni_plat_devnull, tsp->fd);
}
}
@@ -278,7 +308,7 @@ nni_plat_tcp_shutdown(nni_plat_tcpsock *s)
// to keep up, and your clients are going to experience bad things. Normally
// the actual backlog should hover near 0 anyway.)
int
-nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr)
+nni_plat_tcp_listen(nni_plat_tcpsock *tsp, const nni_sockaddr *addr)
{
int fd;
int len;
@@ -310,7 +340,7 @@ nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr)
return (rv);
}
- s->fd = fd;
+ tsp->fd = fd;
return (0);
}
@@ -319,7 +349,7 @@ nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr)
// bind address is not null, then it will attempt to bind to the local
// address specified first.
int
-nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr,
+nni_plat_tcp_connect(nni_plat_tcpsock *tsp, const nni_sockaddr *addr,
const nni_sockaddr *bindaddr)
{
int fd;
@@ -358,15 +388,20 @@ nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr,
(void) close(fd);
return (rv);
}
- s->fd = fd;
+ if ((rv = nni_posix_aio_pipe_init(&tsp->aiop, fd)) != 0) {
+ (void) close(fd);
+ return (rv);
+ }
+ tsp->fd = fd;
return (0);
}
int
-nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server)
+nni_plat_tcp_accept(nni_plat_tcpsock *tsp, nni_plat_tcpsock *server)
{
int fd;
+ int rv;
for (;;) {
#ifdef NNG_USE_ACCEPT4
@@ -387,7 +422,12 @@ nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server)
nni_plat_tcp_setopts(fd);
- s->fd = fd;
+ if ((rv = nni_posix_aio_pipe_init(&tsp->aiop, fd)) != 0) {
+ close(fd);
+ return (rv);
+ }
+
+ tsp->fd = fd;
return (0);
}
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index de9b5e91..c6f908f8 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -38,22 +38,22 @@ struct nni_inproc_pipe {
// nni_inproc_pair represents a pair of pipes. Because we control both
// sides of the pipes, we can allocate and free this in one structure.
struct nni_inproc_pair {
- nni_mtx mx;
- int refcnt;
- nni_msgq * q[2];
- nni_inproc_pipe pipe[2];
- char addr[NNG_MAXADDRLEN+1];
+ nni_mtx mx;
+ int refcnt;
+ nni_msgq * q[2];
+ nni_inproc_pipe * pipes[2];
+ char addr[NNG_MAXADDRLEN+1];
};
struct nni_inproc_ep {
- char addr[NNG_MAXADDRLEN+1];
- int mode;
- int closed;
- nni_list_node node;
- uint16_t proto;
- nni_cv cv;
- nni_list clients;
- void * cpipe; // connected pipe (DIAL only)
+ char addr[NNG_MAXADDRLEN+1];
+ int mode;
+ int closed;
+ nni_list_node node;
+ uint16_t proto;
+ nni_cv cv;
+ nni_list clients;
+ nni_inproc_pipe * cpipe; // connected pipe (DIAL only)
};
#define NNI_INPROC_EP_IDLE 0
@@ -108,23 +108,45 @@ nni_inproc_pair_destroy(nni_inproc_pair *pair)
}
+static int
+nni_inproc_pipe_init(void **argp)
+{
+ nni_inproc_pipe *pipe;
+
+ if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ *argp = pipe;
+ return (0);
+}
+
+
static void
-nni_inproc_pipe_destroy(void *arg)
+nni_inproc_pipe_fini(void *arg)
{
nni_inproc_pipe *pipe = arg;
- nni_inproc_pair *pair = pipe->pair;
+ nni_inproc_pair *pair;
// We could assert the pipe closed...
- // If we are the last peer, then toss the pair structure.
- nni_mtx_lock(&pair->mx);
- pair->refcnt--;
- if (pair->refcnt == 0) {
- nni_mtx_unlock(&pair->mx);
- nni_inproc_pair_destroy(pair);
- } else {
- nni_mtx_unlock(&pair->mx);
+ if ((pair = pipe->pair) != NULL) {
+ // If we are the last peer, then toss the pair structure.
+ nni_mtx_lock(&pair->mx);
+ if (pair->pipes[0] == pipe) {
+ pair->pipes[0] = NULL;
+ } else if (pair->pipes[1] == pipe) {
+ pair->pipes[1] = NULL;
+ }
+ pair->refcnt--;
+ if (pair->refcnt == 0) {
+ nni_mtx_unlock(&pair->mx);
+ nni_inproc_pair_destroy(pair);
+ } else {
+ nni_mtx_unlock(&pair->mx);
+ }
}
+
+ NNI_FREE_STRUCT(pipe);
}
@@ -262,49 +284,54 @@ nni_inproc_ep_close(void *arg)
static int
-nni_inproc_ep_connect(void *arg, nni_pipe *npipe)
+nni_inproc_ep_connect(void *arg, void *pipearg)
{
+ nni_inproc_pipe *pipe = pipearg;
nni_inproc_ep *ep = arg;
+ nni_inproc_ep *server;
if (ep->mode != NNI_INPROC_EP_IDLE) {
return (NNG_EINVAL);
}
nni_mtx_lock(&nni_inproc.mx);
- for (;;) {
- nni_inproc_ep *server;
- if (ep->closed) {
- nni_mtx_unlock(&nni_inproc.mx);
- return (NNG_ECLOSED);
+ // Find a server.
+ NNI_LIST_FOREACH (&nni_inproc.servers, server) {
+ if (server->mode != NNI_INPROC_EP_LISTEN) {
+ continue;
}
- if (ep->cpipe != NULL) {
+ if (strcmp(server->addr, ep->addr) == 0) {
break;
}
- // Find a server.
- NNI_LIST_FOREACH (&nni_inproc.servers, server) {
- if (server->mode != NNI_INPROC_EP_LISTEN) {
- continue;
- }
- if (strcmp(server->addr, ep->addr) == 0) {
- break;
- }
- }
- if (server == NULL) {
+ }
+ if (server == NULL) {
+ nni_mtx_unlock(&nni_inproc.mx);
+ return (NNG_ECONNREFUSED);
+ }
+
+ ep->mode = NNI_INPROC_EP_DIAL;
+ ep->cpipe = pipe;
+ nni_list_append(&server->clients, ep);
+
+ while (ep->mode != NNI_INPROC_EP_IDLE) {
+ if (ep->closed) {
+ nni_list_remove(&server->clients, ep);
nni_mtx_unlock(&nni_inproc.mx);
- return (NNG_ECONNREFUSED);
+ return (NNG_ECLOSED);
}
-
- ep->mode = NNI_INPROC_EP_DIAL;
- nni_list_append(&server->clients, ep);
nni_cv_wake(&server->cv);
nni_cv_wait(&ep->cv);
- if (ep->mode == NNI_INPROC_EP_DIAL) {
- ep->mode = NNI_INPROC_EP_IDLE;
- nni_list_remove(&server->clients, ep);
- }
}
- nni_pipe_set_tran_data(npipe, ep->cpipe);
- ep->cpipe = NULL;
+
+ // If we got here, either we connected successfully, or the far end
+ // server closed on us. In the former case our cpipe will be NULL,
+ // having been cleared by the server. In the latter, the cpipe will
+ // still be set, indicating server shutdown.
+ if (ep->cpipe != NULL) {
+ nni_mtx_unlock(&nni_inproc.mx);
+ return (NNG_ECONNRESET);
+ }
+
nni_mtx_unlock(&nni_inproc.mx);
return (0);
}
@@ -342,9 +369,10 @@ nni_inproc_ep_bind(void *arg)
static int
-nni_inproc_ep_accept(void *arg, nni_pipe *npipe)
+nni_inproc_ep_accept(void *arg, void *pipearg)
{
nni_inproc_ep *ep = arg;
+ nni_inproc_pipe *pipe = pipearg;
nni_inproc_ep *client;
nni_inproc_pair *pair;
int rv;
@@ -383,17 +411,18 @@ nni_inproc_ep_accept(void *arg, nni_pipe *npipe)
}
nni_list_remove(&ep->clients, client);
- client->mode = NNI_INPROC_EP_IDLE;
+ pair->pipes[0] = client->cpipe;
+ pair->pipes[1] = pipe;
(void) snprintf(pair->addr, sizeof (pair->addr), "%s", ep->addr);
- pair->pipe[0].rq = pair->pipe[1].wq = pair->q[0];
- pair->pipe[1].rq = pair->pipe[0].wq = pair->q[1];
- pair->pipe[0].pair = pair->pipe[1].pair = pair;
- pair->pipe[0].addr = pair->pipe[1].addr = pair->addr;
- pair->pipe[1].peer = client->proto;
- pair->pipe[0].peer = ep->proto;
+ pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0];
+ pair->pipes[1]->rq = pair->pipes[0]->wq = pair->q[1];
+ pair->pipes[0]->pair = pair->pipes[1]->pair = pair;
+ pair->pipes[0]->addr = pair->pipes[1]->addr = pair->addr;
+ pair->pipes[1]->peer = client->proto;
+ pair->pipes[0]->peer = ep->proto;
pair->refcnt = 2;
- client->cpipe = &pair->pipe[0];
- nni_pipe_set_tran_data(npipe, &pair->pipe[1]);
+ client->mode = NNI_INPROC_EP_IDLE;
+ client->cpipe = NULL;
nni_cv_wake(&client->cv);
nni_mtx_unlock(&nni_inproc.mx);
@@ -403,12 +432,13 @@ nni_inproc_ep_accept(void *arg, nni_pipe *npipe)
static nni_tran_pipe nni_inproc_pipe_ops = {
- .pipe_destroy = nni_inproc_pipe_destroy,
- .pipe_aio_send = nni_inproc_pipe_aio_send,
- .pipe_aio_recv = nni_inproc_pipe_aio_recv,
- .pipe_close = nni_inproc_pipe_close,
- .pipe_peer = nni_inproc_pipe_peer,
- .pipe_getopt = nni_inproc_pipe_getopt,
+ .p_init = nni_inproc_pipe_init,
+ .p_fini = nni_inproc_pipe_fini,
+ .p_aio_send = nni_inproc_pipe_aio_send,
+ .p_aio_recv = nni_inproc_pipe_aio_recv,
+ .p_close = nni_inproc_pipe_close,
+ .p_peer = nni_inproc_pipe_peer,
+ .p_getopt = nni_inproc_pipe_getopt,
};
static nni_tran_ep nni_inproc_ep_ops = {
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 11965d17..46003487 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -59,8 +59,26 @@ nni_ipc_pipe_close(void *arg)
}
+static int
+nni_ipc_pipe_init(void **argp)
+{
+ nni_ipc_pipe *pipe;
+ int rv;
+
+ if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_plat_ipc_init(&pipe->fd)) != 0) {
+ NNI_FREE_STRUCT(pipe);
+ return (rv);
+ }
+ *argp = pipe;
+ return (0);
+}
+
+
static void
-nni_ipc_pipe_destroy(void *arg)
+nni_ipc_pipe_fini(void *arg)
{
nni_ipc_pipe *pipe = arg;
@@ -260,10 +278,10 @@ nni_ipc_negotiate(nni_ipc_pipe *pipe)
static int
-nni_ipc_ep_connect(void *arg, nni_pipe *npipe)
+nni_ipc_ep_connect(void *arg, void *pipearg)
{
nni_ipc_ep *ep = arg;
- nni_ipc_pipe *pipe;
+ nni_ipc_pipe *pipe = pipearg;
int rv;
const char *path;
@@ -272,13 +290,6 @@ nni_ipc_ep_connect(void *arg, nni_pipe *npipe)
}
path = ep->addr + strlen("ipc://");
- if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
- return (NNG_ENOMEM);
- }
- if ((rv = nni_plat_ipc_init(&pipe->fd)) != 0) {
- NNI_FREE_STRUCT(pipe);
- return (rv);
- }
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
@@ -291,11 +302,8 @@ nni_ipc_ep_connect(void *arg, nni_pipe *npipe)
if ((rv = nni_ipc_negotiate(pipe)) != 0) {
nni_plat_ipc_shutdown(&pipe->fd);
- nni_plat_ipc_fini(&pipe->fd);
- NNI_FREE_STRUCT(pipe);
return (rv);
}
- nni_pipe_set_tran_data(npipe, pipe);
return (0);
}
@@ -321,22 +329,14 @@ nni_ipc_ep_bind(void *arg)
static int
-nni_ipc_ep_accept(void *arg, nni_pipe *npipe)
+nni_ipc_ep_accept(void *arg, void *pipearg)
{
nni_ipc_ep *ep = arg;
- nni_ipc_pipe *pipe;
+ nni_ipc_pipe *pipe = pipearg;
int rv;
-
- if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
- return (NNG_ENOMEM);
- }
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
- if ((rv = nni_plat_ipc_init(&pipe->fd)) != 0) {
- NNI_FREE_STRUCT(pipe);
- return (rv);
- }
if ((rv = nni_plat_ipc_accept(&pipe->fd, &ep->fd)) != 0) {
nni_plat_ipc_fini(&pipe->fd);
@@ -345,22 +345,18 @@ nni_ipc_ep_accept(void *arg, nni_pipe *npipe)
}
if ((rv = nni_ipc_negotiate(pipe)) != 0) {
nni_plat_ipc_shutdown(&pipe->fd);
- nni_plat_ipc_fini(&pipe->fd);
- NNI_FREE_STRUCT(pipe);
return (rv);
}
- nni_pipe_set_tran_data(npipe, pipe);
return (0);
}
static nni_tran_pipe nni_ipc_pipe_ops = {
- .pipe_destroy = nni_ipc_pipe_destroy,
- .pipe_send = nni_ipc_pipe_send,
- .pipe_recv = nni_ipc_pipe_recv,
- .pipe_close = nni_ipc_pipe_close,
- .pipe_peer = nni_ipc_pipe_peer,
- .pipe_getopt = nni_ipc_pipe_getopt,
+ .p_init = nni_ipc_pipe_init,
+ .p_fini = nni_ipc_pipe_fini,
+ .p_close = nni_ipc_pipe_close,
+ .p_peer = nni_ipc_pipe_peer,
+ .p_getopt = nni_ipc_pipe_getopt,
};
static nni_tran_ep nni_ipc_ep_ops = {
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index e198a927..f220e49e 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -22,21 +22,34 @@ typedef struct nni_tcp_ep nni_tcp_ep;
// nni_tcp_pipe is one end of a TCP connection.
struct nni_tcp_pipe {
const char * addr;
- nni_plat_tcpsock fd;
+ nni_plat_tcpsock * tsp;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
+
+ nni_aio * user_txaio;
+ nni_aio * user_rxaio;
+
+ uint8_t txlen[sizeof (uint64_t)];
+ uint8_t rxlen[sizeof (uint64_t)];
+ nni_aio txaio;
+ nni_aio rxaio;
+ nni_msg * rxmsg;
};
struct nni_tcp_ep {
char addr[NNG_MAXADDRLEN+1];
- nni_plat_tcpsock fd;
+ nni_plat_tcpsock * tsp;
int closed;
uint16_t proto;
size_t rcvmax;
int ipv4only;
};
+
+static void nni_tcp_pipe_send_cb(void *);
+static void nni_tcp_pipe_recv_cb(void *);
+
static int
nni_tcp_tran_init(void)
{
@@ -55,23 +68,201 @@ nni_tcp_pipe_close(void *arg)
{
nni_tcp_pipe *pipe = arg;
- nni_plat_tcp_shutdown(&pipe->fd);
+ nni_plat_tcp_shutdown(pipe->tsp);
+}
+
+
+static int
+nni_tcp_pipe_init(void **argp)
+{
+ nni_tcp_pipe *pipe;
+ int rv;
+
+ if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_plat_tcp_init(&pipe->tsp)) != 0) {
+ NNI_FREE_STRUCT(pipe);
+ return (rv);
+ }
+ rv = nni_aio_init(&pipe->txaio, nni_tcp_pipe_send_cb, pipe);
+ if (rv != 0) {
+ nni_plat_tcp_fini(pipe->tsp);
+ NNI_FREE_STRUCT(pipe);
+ return (rv);
+ }
+ rv = nni_aio_init(&pipe->rxaio, nni_tcp_pipe_recv_cb, pipe);
+ if (rv != 0) {
+ nni_aio_fini(&pipe->txaio);
+ nni_plat_tcp_fini(pipe->tsp);
+ NNI_FREE_STRUCT(pipe);
+ }
+ *argp = pipe;
+ return (0);
}
static void
-nni_tcp_pipe_destroy(void *arg)
+nni_tcp_pipe_fini(void *arg)
{
nni_tcp_pipe *pipe = arg;
- nni_plat_tcp_fini(&pipe->fd);
+ nni_aio_fini(&pipe->rxaio);
+ nni_aio_fini(&pipe->txaio);
+ nni_plat_tcp_fini(pipe->tsp);
NNI_FREE_STRUCT(pipe);
}
+static void
+nni_tcp_pipe_send_cb(void *arg)
+{
+ nni_tcp_pipe *pipe = arg;
+ int rv;
+ nni_aio *aio;
+ size_t len;
+
+ if ((aio = pipe->user_txaio) == NULL) {
+ // This should never ever happen.
+ NNI_ASSERT(aio != NULL);
+ return;
+ }
+ pipe->user_txaio = NULL;
+
+ if ((rv = nni_aio_result(&pipe->txaio)) != 0) {
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
+
+ len = nni_msg_len(aio->a_msg);
+ nni_msg_free(aio->a_msg);
+ aio->a_msg = NULL;
+
+ nni_aio_finish(aio, 0, len);
+}
+
+
+static void
+nni_tcp_pipe_recv_cb(void *arg)
+{
+ nni_tcp_pipe *pipe = arg;
+ nni_aio *aio;
+ int rv;
+
+ aio = pipe->user_rxaio;
+ if (aio == NULL) {
+ // This should never ever happen.
+ NNI_ASSERT(aio != NULL);
+ return;
+ }
+
+ if ((rv = nni_aio_result(&pipe->rxaio)) != 0) {
+ // Error on receive. This has to cause an error back
+ // to the user. Also, if we had allocated an rxmsg, lets
+ // toss it.
+ if (pipe->rxmsg != NULL) {
+ nni_msg_free(pipe->rxmsg);
+ pipe->rxmsg = NULL;
+ }
+ pipe->user_rxaio = NULL;
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
+
+ // If we don't have a message yet, we were reading the TCP message
+ // header, which is just the length. This tells us the size of the
+ // message to allocate and how much more to expect.
+ if (pipe->rxmsg == NULL) {
+ uint64_t len;
+ // We should have gotten a message header.
+ NNI_GET64(pipe->rxlen, len);
+
+ // Make sure the message payload is not too big. If it is
+ // the caller will shut down the pipe.
+ if (len > pipe->rcvmax) {
+ pipe->user_rxaio = NULL;
+ nni_aio_finish(aio, NNG_EMSGSIZE, 0);
+ return;
+ }
+
+ if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) {
+ pipe->user_rxaio = NULL;
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
+
+ // Submit the rest of the data for a read -- we want to
+ // read the entire message now.
+ pipe->rxaio.a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg);
+ pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
+ pipe->rxaio.a_niov = 1;
+
+ rv = nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio);
+ if (rv != 0) {
+ pipe->user_rxaio = NULL;
+ nni_msg_free(pipe->rxmsg);
+ pipe->rxmsg = NULL;
+ nni_aio_finish(aio, rv, 0);
+ return;
+ }
+ return;
+ }
+
+ // Otherwise we got a message read completely. Let the user know the
+ // good news.
+ pipe->user_rxaio = NULL;
+ aio->a_msg = pipe->rxmsg;
+ pipe->rxmsg = NULL;
+ nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg));
+}
+
+
+static int
+nni_tcp_pipe_aio_send(void *arg, nni_aio *aio)
+{
+ nni_tcp_pipe *pipe = arg;
+ nni_msg *msg = aio->a_msg;
+ uint64_t len;
+
+ pipe->user_txaio = aio;
+
+ len = nni_msg_len(msg) + nni_msg_header_len(msg);
+ NNI_PUT64(pipe->txlen, len);
+
+ pipe->txaio.a_iov[0].iov_buf = pipe->txlen;
+ pipe->txaio.a_iov[0].iov_len = sizeof (pipe->txlen);
+ pipe->txaio.a_iov[1].iov_buf = nni_msg_header(msg);
+ pipe->txaio.a_iov[1].iov_len = nni_msg_header_len(msg);
+ pipe->txaio.a_iov[2].iov_buf = nni_msg_body(msg);
+ pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg);
+ pipe->txaio.a_niov = 3;
+
+ return (nni_plat_tcp_aio_send(pipe->tsp, &pipe->txaio));
+}
+
+
+static int
+nni_tcp_pipe_aio_recv(void *arg, nni_aio *aio)
+{
+ nni_tcp_pipe *pipe = arg;
+
+ pipe->user_rxaio = aio;
+
+ NNI_ASSERT(pipe->rxmsg == NULL);
+
+ // Schedule a read of the TCP header.
+ pipe->rxaio.a_iov[0].iov_buf = pipe->rxlen;
+ pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxlen);
+ pipe->rxaio.a_niov = 1;
+
+ return (nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio));
+}
+
+
static int
nni_tcp_pipe_send(void *arg, nni_msg *msg)
{
+#if 0
nni_tcp_pipe *pipe = arg;
uint64_t len;
uint8_t buf[sizeof (len)];
@@ -88,10 +279,13 @@ nni_tcp_pipe_send(void *arg, nni_msg *msg)
len = (uint64_t) iov[1].iov_len + (uint64_t) iov[2].iov_len;
NNI_PUT64(buf, len);
- if ((rv = nni_plat_tcp_send(&pipe->fd, iov, 3)) == 0) {
+ if ((rv = nni_plat_tcp_send(pipe->tsp, iov, 3)) == 0) {
nni_msg_free(msg);
}
return (rv);
+
+#endif
+ return (NNG_EINVAL);
}
@@ -107,7 +301,7 @@ nni_tcp_pipe_recv(void *arg, nni_msg **msgp)
iov[0].iov_buf = buf;
iov[0].iov_len = sizeof (buf);
- if ((rv = nni_plat_tcp_recv(&pipe->fd, iov, 1)) != 0) {
+ if ((rv = nni_plat_tcp_recv(pipe->tsp, iov, 1)) != 0) {
return (rv);
}
NNI_GET64(buf, len);
@@ -122,7 +316,7 @@ nni_tcp_pipe_recv(void *arg, nni_msg **msgp)
iov[0].iov_len = nng_msg_len(msg);
iov[0].iov_buf = nng_msg_body(msg);
- if ((rv = nni_plat_tcp_recv(&pipe->fd, iov, 1)) == 0) {
+ if ((rv = nni_plat_tcp_recv(pipe->tsp, iov, 1)) == 0) {
*msgp = msg;
} else {
nni_msg_free(msg);
@@ -181,7 +375,7 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock)
ep->ipv4only = 0; // XXX: FIXME
ep->rcvmax = nni_sock_rcvmaxsz(sock);
- if ((rv = nni_plat_tcp_init(&ep->fd)) != 0) {
+ if ((rv = nni_plat_tcp_init(&ep->tsp)) != 0) {
NNI_FREE_STRUCT(ep);
return (rv);
}
@@ -198,7 +392,7 @@ nni_tcp_ep_fini(void *arg)
{
nni_tcp_ep *ep = arg;
- nni_plat_tcp_fini(&ep->fd);
+ nni_plat_tcp_fini(ep->tsp);
NNI_FREE_STRUCT(ep);
}
@@ -208,7 +402,7 @@ nni_tcp_ep_close(void *arg)
{
nni_tcp_ep *ep = arg;
- nni_plat_tcp_shutdown(&ep->fd);
+ nni_plat_tcp_shutdown(ep->tsp);
}
@@ -281,13 +475,13 @@ nni_tcp_negotiate(nni_tcp_pipe *pipe)
iov.iov_buf = buf;
iov.iov_len = 8;
- if ((rv = nni_plat_tcp_send(&pipe->fd, &iov, 1)) != 0) {
+ if ((rv = nni_plat_tcp_send(pipe->tsp, &iov, 1)) != 0) {
return (rv);
}
iov.iov_buf = buf;
iov.iov_len = 8;
- if ((rv = nni_plat_tcp_recv(&pipe->fd, &iov, 1)) != 0) {
+ if ((rv = nni_plat_tcp_recv(pipe->tsp, &iov, 1)) != 0) {
return (rv);
}
@@ -303,10 +497,10 @@ nni_tcp_negotiate(nni_tcp_pipe *pipe)
static int
-nni_tcp_ep_connect(void *arg, nni_pipe *npipe)
+nni_tcp_ep_connect(void *arg, void *pipearg)
{
nni_tcp_ep *ep = arg;
- nni_tcp_pipe *pipe;
+ nni_tcp_pipe *pipe = pipearg;
char *host;
uint16_t port;
int flag;
@@ -350,13 +544,6 @@ nni_tcp_ep_connect(void *arg, nni_pipe *npipe)
return (rv);
}
- if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
- return (NNG_ENOMEM);
- }
- if ((rv = nni_plat_tcp_init(&pipe->fd)) != 0) {
- NNI_FREE_STRUCT(pipe);
- return (rv);
- }
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
@@ -364,20 +551,15 @@ nni_tcp_ep_connect(void *arg, nni_pipe *npipe)
remaddr.s_un.s_in.sa_port = port;
bindaddr = lclpart == NULL ? NULL : &lcladdr;
- rv = nni_plat_tcp_connect(&pipe->fd, &remaddr, bindaddr);
+ rv = nni_plat_tcp_connect(pipe->tsp, &remaddr, bindaddr);
if (rv != 0) {
- nni_plat_tcp_fini(&pipe->fd);
- NNI_FREE_STRUCT(pipe);
return (rv);
}
if ((rv = nni_tcp_negotiate(pipe)) != 0) {
- nni_plat_tcp_shutdown(&pipe->fd);
- nni_plat_tcp_fini(&pipe->fd);
- NNI_FREE_STRUCT(pipe);
+ nni_plat_tcp_shutdown(pipe->tsp);
return (rv);
}
- nni_pipe_set_tran_data(npipe, pipe);
return (0);
}
@@ -406,7 +588,7 @@ nni_tcp_ep_bind(void *arg)
}
baddr.s_un.s_in.sa_port = port;
- if ((rv = nni_plat_tcp_listen(&ep->fd, &baddr)) != 0) {
+ if ((rv = nni_plat_tcp_listen(ep->tsp, &baddr)) != 0) {
return (rv);
}
return (0);
@@ -414,46 +596,34 @@ nni_tcp_ep_bind(void *arg)
static int
-nni_tcp_ep_accept(void *arg, nni_pipe *npipe)
+nni_tcp_ep_accept(void *arg, void *pipearg)
{
nni_tcp_ep *ep = arg;
- nni_tcp_pipe *pipe;
+ nni_tcp_pipe *pipe = pipearg;
int rv;
-
- if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
- return (NNG_ENOMEM);
- }
pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
- if ((rv = nni_plat_tcp_init(&pipe->fd)) != 0) {
- NNI_FREE_STRUCT(pipe);
- }
-
- if ((rv = nni_plat_tcp_accept(&pipe->fd, &ep->fd)) != 0) {
- nni_plat_tcp_fini(&pipe->fd);
- NNI_FREE_STRUCT(pipe);
+ if ((rv = nni_plat_tcp_accept(pipe->tsp, ep->tsp)) != 0) {
return (rv);
}
if ((rv = nni_tcp_negotiate(pipe)) != 0) {
- nni_plat_tcp_shutdown(&pipe->fd);
- nni_plat_tcp_fini(&pipe->fd);
- NNI_FREE_STRUCT(pipe);
+ nni_plat_tcp_shutdown(pipe->tsp);
return (rv);
}
- nni_pipe_set_tran_data(npipe, pipe);
return (0);
}
static nni_tran_pipe nni_tcp_pipe_ops = {
- .pipe_destroy = nni_tcp_pipe_destroy,
- .pipe_send = nni_tcp_pipe_send,
- .pipe_recv = nni_tcp_pipe_recv,
- .pipe_close = nni_tcp_pipe_close,
- .pipe_peer = nni_tcp_pipe_peer,
- .pipe_getopt = nni_tcp_pipe_getopt,
+ .p_init = nni_tcp_pipe_init,
+ .p_fini = nni_tcp_pipe_fini,
+ .p_aio_send = nni_tcp_pipe_aio_send,
+ .p_aio_recv = nni_tcp_pipe_aio_recv,
+ .p_close = nni_tcp_pipe_close,
+ .p_peer = nni_tcp_pipe_peer,
+ .p_getopt = nni_tcp_pipe_getopt,
};
static nni_tran_ep nni_tcp_ep_ops = {