aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.h4
-rw-r--r--src/core/defs.h2
-rw-r--r--src/core/endpt.c4
-rw-r--r--src/core/pipe.c22
-rw-r--r--src/core/platform.h11
-rw-r--r--src/core/transport.h29
6 files changed, 45 insertions, 27 deletions
diff --git a/src/core/aio.h b/src/core/aio.h
index 96b04857..c377ca93 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -15,9 +15,7 @@
#include "core/taskq.h"
#include "core/thread.h"
-typedef struct nni_aio_ops nni_aio_ops;
-typedef struct nni_aio nni_aio;
-
+typedef struct nni_aio_ops nni_aio_ops;
// An nni_aio is an async I/O handle.
struct nni_aio {
diff --git a/src/core/defs.h b/src/core/defs.h
index 82d0dfaf..1c44e9af 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -49,6 +49,8 @@ typedef int nni_signal; // Wakeup channel.
typedef uint64_t nni_time; // Abs. time (usec).
typedef int64_t nni_duration; // Rel. time (usec).
+typedef struct nni_aio nni_aio;
+
typedef void (*nni_cb)(void *);
// Used by transports for scatter gather I/O.
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 0310783a..1108a504 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -207,7 +207,7 @@ nni_ep_connect(nni_ep *ep, nni_pipe **pp)
if ((rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran)) != 0) {
return (rv);
}
- rv = ep->ep_ops.ep_connect(ep->ep_data, pipe);
+ rv = ep->ep_ops.ep_connect(ep->ep_data, pipe->p_tran_data);
if (rv != 0) {
nni_pipe_destroy(pipe);
return (rv);
@@ -359,7 +359,7 @@ nni_ep_accept(nni_ep *ep, nni_pipe **pp)
if ((rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran)) != 0) {
return (rv);
}
- rv = ep->ep_ops.ep_accept(ep->ep_data, pipe);
+ rv = ep->ep_ops.ep_accept(ep->ep_data, pipe->p_tran_data);
if (rv != 0) {
nni_pipe_destroy(pipe);
return (rv);
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 85a0bd4a..4b37cfa9 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -39,14 +39,14 @@ nni_pipe_recv(nni_pipe *p, nng_msg **msgp)
int
nni_pipe_aio_recv(nni_pipe *p, nni_aio *aio)
{
- return (p->p_tran_ops.pipe_aio_recv(p->p_tran_data, aio));
+ return (p->p_tran_ops.p_aio_recv(p->p_tran_data, aio));
}
int
nni_pipe_aio_send(nni_pipe *p, nni_aio *aio)
{
- return (p->p_tran_ops.pipe_aio_send(p->p_tran_data, aio));
+ return (p->p_tran_ops.p_aio_send(p->p_tran_data, aio));
}
@@ -92,7 +92,7 @@ nni_pipe_close(nni_pipe *p)
// Close the underlying transport.
if (p->p_tran_data != NULL) {
- p->p_tran_ops.pipe_close(p->p_tran_data);
+ p->p_tran_ops.p_close(p->p_tran_data);
}
// Unregister our ID so nobody else can find it.
@@ -116,7 +116,7 @@ nni_pipe_close(nni_pipe *p)
uint16_t
nni_pipe_peer(nni_pipe *p)
{
- return (p->p_tran_ops.pipe_peer(p->p_tran_data));
+ return (p->p_tran_ops.p_peer(p->p_tran_data));
}
@@ -145,7 +145,15 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran)
// and we avoid an extra dereference on hot code paths.
p->p_tran_ops = *tran->tran_pipe;
+ // Initialize the transport pipe data.
+ if ((rv = p->p_tran_ops.p_init(&p->p_tran_data)) != 0) {
+ nni_mtx_fini(&p->p_mtx);
+ NNI_FREE_STRUCT(p);
+ return (rv);
+ }
+
if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
+ p->p_tran_ops.p_fini(p->p_tran_data);
nni_mtx_fini(&p->p_mtx);
NNI_FREE_STRUCT(p);
return (rv);
@@ -164,7 +172,7 @@ nni_pipe_destroy(nni_pipe *p)
// The caller is responsible for ensuring that the pipe
// is not in use by any other consumers. It must not be started
if (p->p_tran_data != NULL) {
- p->p_tran_ops.pipe_destroy(p->p_tran_data);
+ p->p_tran_ops.p_fini(p->p_tran_data);
}
nni_sock_pipe_rem(p->p_sock, p);
nni_mtx_fini(&p->p_mtx);
@@ -176,10 +184,10 @@ int
nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp)
{
/* This should only be called with the mutex held... */
- if (p->p_tran_ops.pipe_getopt == NULL) {
+ if (p->p_tran_ops.p_getopt == NULL) {
return (NNG_ENOTSUP);
}
- return (p->p_tran_ops.pipe_getopt(p->p_tran_data, opt, val, szp));
+ return (p->p_tran_ops.p_getopt(p->p_tran_data, opt, val, szp));
}
diff --git a/src/core/platform.h b/src/core/platform.h
index b1e824e2..1644973c 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -174,7 +174,7 @@ extern int nni_plat_lookup_host(const char *, nni_sockaddr *, int);
// nni_plat_tcp_init initializes the socket, for example it can
// set underlying file descriptors to -1, etc.
-extern int nni_plat_tcp_init(nni_plat_tcpsock *);
+extern int nni_plat_tcp_init(nni_plat_tcpsock **);
// nni_plat_tcp_fini just closes a TCP socket, and releases any related
// resources.
@@ -203,14 +203,17 @@ extern int nni_plat_tcp_accept(nni_plat_tcpsock *, nni_plat_tcpsock *);
extern int nni_plat_tcp_connect(nni_plat_tcpsock *, const nni_sockaddr *,
const nni_sockaddr *);
-// nni_plat_tcp_send sends data to the remote side. The platform is
-// responsible for attempting to send all of the data. The iov count
-// will never be larger than 4. THe platform may modify the iovs.
+// nni_plat_tcp_aio_send sends the data to the remote side asynchronously.
+// The data to send is stored in the a_iov field of the aio, and the array
+// of iovs will never be larger than 4. The platform may modify the iovs,
+// or the iov list.
+extern int nni_plat_tcp_aio_send(nni_plat_tcpsock *, nni_aio *);
extern int nni_plat_tcp_send(nni_plat_tcpsock *, nni_iov *, int);
// nni_plat_tcp_recv recvs data into the buffers provided by the
// iovs. The implementation does not return until the iovs are completely
// full, or an error condition occurs.
+extern int nni_plat_tcp_aio_recv(nni_plat_tcpsock *, nni_aio *);
extern int nni_plat_tcp_recv(nni_plat_tcpsock *, nni_iov *, int);
// nni_plat_ipc_init initializes the socket, for example it can
diff --git a/src/core/transport.h b/src/core/transport.h
index 65cd775f..ab43e497 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -48,8 +48,10 @@ struct nni_tran_ep {
// ep_connect establishes a connection. It can return errors
// NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, NNG_ECONNFAILED,
- // NNG_ETIMEDOUT, and NNG_EPROTO.
- int (*ep_connect)(void *, nni_pipe *);
+ // NNG_ETIMEDOUT, and NNG_EPROTO. The first argument is the
+ // transport specific endpoint, and the second is the transport
+ // specific pipe structure.
+ int (*ep_connect)(void *, void *);
// ep_bind just does the bind() and listen() work,
// reserving the address but not creating any connections.
@@ -58,8 +60,10 @@ struct nni_tran_ep {
// address, or NNG_EACCESS for permission problems.
int (*ep_bind)(void *);
- // ep_accept accepts an inbound connection.
- int (*ep_accept)(void *, nni_pipe *);
+ // ep_accept accepts an inbound connection. The first argument
+ // is the transport-specific endpoint, and the second is the
+ // transport-specific pipe (which will have already been created.)
+ int (*ep_accept)(void *, void *);
// ep_close stops the endpoint from operating altogether. It does
// not affect pipes that have already been created.
@@ -77,14 +81,17 @@ struct nni_tran_ep {
// back into the socket at this point. (Which is one reason pointers back
// to socket or even enclosing pipe state, are not provided.)
struct nni_tran_pipe {
- // p_destroy destroys the pipe. This should clean up all local
+ // p_init initializes the pipe structure, allocating the structure.
+ int (*p_init)(void **);
+
+ // p_fini destroys the pipe. This should clean up all local
// resources, including closing files and freeing memory, used by
// the pipe. After this call returns, the system will not make
// further calls on the same pipe.
- void (*pipe_destroy)(void *);
+ void (*p_fini)(void *);
- int (*pipe_aio_send)(void *, nni_aio *);
- int (*pipe_aio_recv)(void *, nni_aio *);
+ int (*p_aio_send)(void *, nni_aio *);
+ int (*p_aio_recv)(void *, nni_aio *);
// p_send sends the message. If the message cannot be received, then
// the caller may try again with the same message (or free it). If
@@ -104,15 +111,15 @@ struct nni_tran_pipe {
// p_close closes the pipe. Further recv or send operations should
// return back NNG_ECLOSED.
- void (*pipe_close)(void *);
+ void (*p_close)(void *);
// p_peer returns the peer protocol. This may arrive in whatever
// transport specific manner is appropriate.
- uint16_t (*pipe_peer)(void *);
+ uint16_t (*p_peer)(void *);
// p_getopt gets an pipe (transport-specific) property. These values
// may not be changed once the pipe is created.
- int (*pipe_getopt)(void *, int, void *, size_t *);
+ int (*p_getopt)(void *, int, void *, size_t *);
};
// These APIs are used by the framework internally, and not for use by