diff options
| -rw-r--r-- | src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/core/defs.h | 77 | ||||
| -rw-r--r-- | src/core/pipe.h | 2 | ||||
| -rw-r--r-- | src/core/platform.h | 38 | ||||
| -rw-r--r-- | src/core/transport.c | 2 | ||||
| -rw-r--r-- | src/platform/posix/posix_impl.h | 5 | ||||
| -rw-r--r-- | src/platform/posix/posix_net.c | 374 | ||||
| -rw-r--r-- | src/protocol/pair/pair.c | 3 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 3 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 7 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 448 |
11 files changed, 924 insertions, 37 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 58564291..a1c0fac7 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -71,6 +71,8 @@ set (NNG_SOURCES protocol/reqrep/req.c transport/inproc/inproc.c + + transport/tcp/tcp.c ) include_directories(AFTER SYSTEM ${PROJECT_SOURCE_DIR}/src) diff --git a/src/core/defs.h b/src/core/defs.h index be7f71f7..ed2e353f 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -52,43 +52,54 @@ typedef struct { #define NNI_ALLOC_STRUCT(s) nni_alloc(sizeof (*s)) #define NNI_FREE_STRUCT(s) nni_free((s), sizeof (*s)) -#define NNI_PUT32(ptr, u) \ - do { \ - ptr[0] = (uint8_t) (((uint32_t) u) >> 24); \ - ptr[1] = (uint8_t) (((uint32_t) u) >> 16); \ - ptr[2] = (uint8_t) (((uint32_t) u) >> 8); \ - ptr[3] = (uint8_t) ((uint32_t) u); \ - } \ +#define NNI_PUT16(ptr, u) \ + do { \ + (ptr)[0] = (uint8_t) (((uint16_t) (u)) >> 8); \ + (ptr)[0] = (uint8_t) ((uint16_t) (u)); \ + } \ while (0) -#define NNI_PUT64(ptr, u) \ - do { \ - ptr[0] = (uint8_t) (((uint64_t) u) >> 56); \ - ptr[1] = (uint8_t) (((uint64_t) u) >> 48); \ - ptr[2] = (uint8_t) (((uint64_t) u) >> 40); \ - ptr[3] = (uint8_t) (((uint64_t) u) >> 32); \ - ptr[4] = (uint8_t) (((uint64_t) u) >> 24); \ - ptr[5] = (uint8_t) (((uint64_t) u) >> 16); \ - ptr[6] = (uint8_t) (((uint64_t) u) >> 8); \ - ptr[7] = (uint8_t) ((uint64_t) u); \ - } \ +#define NNI_PUT32(ptr, u) \ + do { \ + (ptr)[0] = (uint8_t) (((uint32_t) (u)) >> 24); \ + (ptr)[1] = (uint8_t) (((uint32_t) (u)) >> 16); \ + (ptr)[2] = (uint8_t) (((uint32_t) (u)) >> 8); \ + (ptr)[3] = (uint8_t) ((uint32_t) (u)); \ + } \ while (0) -#define NNI_GET32(ptr, v) \ - v = (((uint32_t) ((uint8_t) ptr[0])) << 24) + \ - (((uint32_t) ((uint8_t) ptr[1])) << 16) + \ - (((uint32_t) ((uint8_t) ptr[2])) << 8) + \ - (((uint32_t) (uint8_t) ptr[3])) - -#define NNI_GET64(ptr, v) \ - v = (((uint64_t) ((uint8_t) ptr[0])) << 56) + \ - (((uint64_t) ((uint8_t) ptr[1])) << 48) + \ - (((uint64_t) ((uint8_t) ptr[2])) << 40) + \ - (((uint64_t) ((uint8_t) ptr[3])) << 32) + \ - (((uint64_t) ((uint8_t) ptr[4])) << 24) + \ - (((uint64_t) ((uint8_t) ptr[5])) << 16) + \ - (((uint64_t) ((uint8_t) ptr[6])) << 8) + \ - (((uint64_t) (uint8_t) ptr[7])) +#define NNI_PUT64(ptr, u) \ + do { \ + (ptr)[0] = (uint8_t) (((uint64_t) (u)) >> 56); \ + (ptr)[1] = (uint8_t) (((uint64_t) (u)) >> 48); \ + (ptr)[2] = (uint8_t) (((uint64_t) (u)) >> 40); \ + (ptr)[3] = (uint8_t) (((uint64_t) (u)) >> 32); \ + (ptr)[4] = (uint8_t) (((uint64_t) (u)) >> 24); \ + (ptr)[5] = (uint8_t) (((uint64_t) (u)) >> 16); \ + (ptr)[6] = (uint8_t) (((uint64_t) (u)) >> 8); \ + (ptr)[7] = (uint8_t) ((uint64_t) (u)); \ + } \ + while (0) + +#define NNI_GET16(ptr, v) \ + v = (((uint32_t) ((uint8_t) (ptr)[0])) << 8) + \ + (((uint32_t) (uint8_t) (ptr)[1])) + +#define NNI_GET32(ptr, v) \ + v = (((uint32_t) ((uint8_t) (ptr)[0])) << 24) + \ + (((uint32_t) ((uint8_t) (ptr)[1])) << 16) + \ + (((uint32_t) ((uint8_t) (ptr)[2])) << 8) + \ + (((uint32_t) (uint8_t) (ptr)[3])) + +#define NNI_GET64(ptr, v) \ + v = (((uint64_t) ((uint8_t) (ptr)[0])) << 56) + \ + (((uint64_t) ((uint8_t) (ptr)[1])) << 48) + \ + (((uint64_t) ((uint8_t) (ptr)[2])) << 40) + \ + (((uint64_t) ((uint8_t) (ptr)[3])) << 32) + \ + (((uint64_t) ((uint8_t) (ptr)[4])) << 24) + \ + (((uint64_t) ((uint8_t) (ptr)[5])) << 16) + \ + (((uint64_t) ((uint8_t) (ptr)[6])) << 8) + \ + (((uint64_t) (uint8_t) (ptr)[7])) // A few assorted other items. #define NNI_FLAG_IPV4ONLY 1 diff --git a/src/core/pipe.h b/src/core/pipe.h index a8d7b286..6ebb7090 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -44,6 +44,8 @@ extern int nni_pipe_create(nni_pipe **, nni_ep *); extern void nni_pipe_destroy(nni_pipe *); +extern uint16_t nni_pipe_proto(nni_pipe *); +extern uint16_t nni_pipe_peer(nni_pipe *); extern int nni_pipe_start(nni_pipe *); extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep); diff --git a/src/core/platform.h b/src/core/platform.h index 2134ed97..3292d6f9 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -63,6 +63,7 @@ extern void nni_free(void *, size_t); typedef struct nni_plat_mtx nni_plat_mtx; typedef struct nni_plat_cv nni_plat_cv; typedef struct nni_plat_thr nni_plat_thr; +typedef struct nni_plat_tcpsock nni_plat_tcpsock; // Mutex handling. @@ -166,6 +167,43 @@ extern uint32_t nni_plat_nextid(void); // used.) extern const char *nni_plat_strerror(int); +// nni_plat_lookup_host looks up a hostname in DNS, or the local hosts +// file, or whatever. If your platform lacks support for naming, it must +// at least cope with converting IP addresses in string form. The final +// flags may include NNI_FLAG_IPV4ONLY to prevent IPv6 names from being +// returned on dual stack machines. +extern int nni_plat_lookup_host(const char *, nni_sockaddr *, int); + +// nni_plat_tcp_close just closes a TCP socket. +extern void nni_plat_tcp_close(nni_plat_tcpsock *); + +// nni_plat_tcp_listen creates a TCP socket in listening mode, bound +// to the specified address. Note that nni_plat_tcpsock should be defined +// to whatever your platform uses. For most systems its just "int". +extern int nni_plat_tcp_listen(nni_plat_tcpsock *, const nni_sockaddr *); + +// nni_plat_tcp_accept does the accept to accept an inbound connection. +// The tcpsock used for the server will have been set up with the +// nni_plat_tcp_listen. +extern int nni_plat_tcp_accept(nni_plat_tcpsock *, nni_plat_tcpsock *); + +// nni_plat_tcp_connect is the client side. Two addresses are supplied, +// as the client may specify a local address to which to bind. This +// second address may be NULL to use ephemeral ports, which is the +// usual default. +extern int nni_plat_tcp_connect(nni_plat_tcpsock *, const nni_sockaddr *, + const nni_sockaddr *); + +// nni_plat_tcp_send sends data to the remote side. The platform is +// responsible for attempting to send all of the data. The iov count +// will never be larger than 4. THe platform may modify the iovs. +extern int nni_plat_tcp_send(nni_plat_tcpsock *, nni_iov *, int); + +// nni_plat_tcp_recv recvs data into the buffers provided by the +// iovs. The implementation does not return until the iovs are completely +// full, or an error condition occurs. +extern int nni_plat_tcp_recv(nni_plat_tcpsock *, nni_iov *, int); + // Actual platforms we support. This is included up front so that we can // get the specific types that are supplied by the platform. #if defined(PLATFORM_POSIX) diff --git a/src/core/transport.c b/src/core/transport.c index b6ebe3c2..cf9f38c3 100644 --- a/src/core/transport.c +++ b/src/core/transport.c @@ -14,9 +14,11 @@ // For now the list of transports is hard-wired. Adding new transports // to the system dynamically is something that might be considered later. extern nni_tran nni_inproc_tran; +extern nni_tran nni_tcp_tran; static nni_tran *transports[] = { &nni_inproc_tran, + &nni_tcp_tran, NULL }; diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 38fe27c9..2b72f574 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -32,8 +32,11 @@ extern int nni_plat_errno(int); #endif + #ifdef PLATFORM_POSIX_NET -typedef int nni_plat_tcpsock; +struct nni_plat_tcpsock { + int fd; +}; #endif // Define types that this platform uses. diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c new file mode 100644 index 00000000..2fdab245 --- /dev/null +++ b/src/platform/posix/posix_net.c @@ -0,0 +1,374 @@ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#ifdef PLATFORM_POSIX_NET + +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/uio.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <arpa/inet.h> +#include <fcntl.h> +#include <unistd.h> +#include <netdb.h> + +static int +nni_plat_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa) +{ + struct sockaddr_in *sin; + struct sockaddr_in6 *sin6; + + switch (sa->s_un.s_family) { + case NNG_AF_INET: + sin = (void *) ss; + memset(sin, 0, sizeof (*sin)); + sin->sin_family = PF_INET; + sin->sin_port = sa->s_un.s_in.sa_port; + sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr; + return (sizeof (*sin)); + + case NNG_AF_INET6: + sin6 = (void *) ss; + memset(&sin6, 0, sizeof (sin6)); +#ifdef SIN6_LEN + sin6->sin6_len = sizeof (*sin6); +#endif + sin6->sin6_family = PF_INET6; + sin6->sin6_port = sa->s_un.s_in6.sa_port; + memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16); + return (sizeof (*sin6)); + } + return (-1); +} + + +static int +nni_plat_from_sockaddr(nni_sockaddr *sa, const struct sockaddr *ss) +{ + const struct sockaddr_in *sin; + const struct sockaddr_in6 *sin6; + + memset(sa, 0, sizeof (*sa)); + switch (ss->sa_family) { + case PF_INET: + sin = (const void *) ss; + sa->s_un.s_in.sa_family = NNG_AF_INET; + sa->s_un.s_in.sa_port = sin->sin_port; + sa->s_un.s_in.sa_addr = sin->sin_addr.s_addr; + return (0); + + case PF_INET6: + sin6 = (const void *) ss; + sa->s_un.s_in6.sa_family = NNG_AF_INET6; + sa->s_un.s_in6.sa_port = sin6->sin6_port; + memcpy(sa->s_un.s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16); + return (0); + } + return (-1); +} + + +int +nni_plat_lookup_host(const char *host, nni_sockaddr *addr, int flags) +{ + struct addrinfo hint; + struct addrinfo *ai; + + memset(&hint, 0, sizeof (hint)); + hint.ai_flags = AI_DEFAULT | AI_PASSIVE; + hint.ai_family = PF_UNSPEC; + hint.ai_socktype = SOCK_STREAM; + if (flags & NNI_FLAG_IPV4ONLY) { + hint.ai_family = PF_INET; + } + + if (getaddrinfo(host, NULL, &hint, &ai) != 0) { + return (NNG_EADDRINVAL); + } + + if (nni_plat_from_sockaddr(addr, ai->ai_addr) < 0) { + freeaddrinfo(ai); + return (NNG_EADDRINVAL); + } + freeaddrinfo(ai); + return (0); +} + + +int +nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt) +{ + struct iovec iov[4]; // We never have more than 3 at present + int i; + int offset; + int resid = 0; + int rv; + + if (cnt > 4) { + return (NNG_EINVAL); + } + + for (i = 0; i < cnt; i++) { + iov[i].iov_base = iovs[i].iov_buf; + iov[i].iov_len = iovs[i].iov_len; + resid += iov[i].iov_len; + } + + i = 0; + while (resid) { + rv = writev(s->fd, iov, cnt); + if (rv < 0) { + if (rv == EINTR) { + continue; + } + return (nni_plat_errno(errno)); + } + if (rv > resid) { + nni_panic("writev says it wrote too much!"); + } + + resid -= rv; + while (rv) { + if (iov[i].iov_len <= rv) { + rv -= iov[i].iov_len; + i++; + cnt--; + } else { + iov[i].iov_len -= rv; + iov[i].iov_base += rv; + rv = 0; + } + } + } + + return (0); +} + + +int +nni_plat_tcp_recv(nni_plat_tcpsock *s, nni_iov *iovs, int cnt) +{ + struct iovec iov[4]; // We never have more than 3 at present + int i; + int offset; + int resid = 0; + int rv; + + if (cnt > 4) { + return (NNG_EINVAL); + } + + for (i = 0; i < cnt; i++) { + iov[i].iov_base = iovs[i].iov_buf; + iov[i].iov_len = iovs[i].iov_len; + resid += iov[i].iov_len; + } + + i = 0; + while (resid) { + rv = readv(s->fd, iov, cnt); + if (rv < 0) { + if (errno == EINTR) { + continue; + } + return (nni_plat_errno(errno)); + } + if (rv > resid) { + nni_panic("readv says it read too much!"); + } + + resid -= rv; + while (rv) { + if (iov[i].iov_len <= rv) { + rv -= iov[i].iov_len; + i++; + cnt--; + } else { + iov[i].iov_len -= rv; + iov[i].iov_base += rv; + rv = 0; + } + } + } + + return (0); +} + + +static void +nni_plat_tcp_setopts(int fd) +{ + int one; + + // Try to ensure that both CLOEXEC is set, and that we don't + // generate SIGPIPE. (Note that SIGPIPE suppression in this way + // only works on BSD systems. Linux wants us to use sendmsg().) + (void) fcntl(fd, F_SETFD, FD_CLOEXEC); +#if defined(F_SETNOSIGPIPE) + (void) fcntl(fd, F_SETNOSIGPIPE, 1); +#elif defined(SO_NOSIGPIPE) + one = 1; + (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof (one)); +#endif + + // Also disable Nagle. We are careful to group data with writev, + // and latency is king for most of our users. (Consider adding + // a method to enable this later.) + one = 1; + (void) setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof (one)); +} + + +void +nni_plat_tcp_close(nni_plat_tcpsock *s) +{ + (void) close(s->fd); + s->fd = -1; +} + +// nni_plat_tcp_bind creates a file descriptor bound to the given address. +// This basically does the equivalent of socket, bind, and listen. We have +// chosen a default value for the listen backlog of 128, which should be +// plenty. (If it isn't, then the accept thread can't get enough resources +// to keep up, and your clients are going to experience bad things. Normally +// the actual backlog should hover near 0 anyway.) +int +nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr) +{ + int fd; + int len; + struct sockaddr_storage ss; + int rv; + + len = nni_plat_to_sockaddr(&ss, addr); + if (len < 0) { + return (NNG_EADDRINVAL); + } + +#ifdef SOCK_CLOEXEC + fd = socket(ss.ss_family, SOCK_STREAM, SOCK_CLOEXEC); +#else + fd = socket(ss.ss_family, SOCK_STREAM, 0); +#endif + if (fd < 0) { + return (nni_plat_errno(errno)); + } + + nni_plat_tcp_setopts(fd); + + if (bind(fd, (struct sockaddr *) &ss, len) < 0) { + rv = nni_plat_errno(errno); + (void) close(fd); + return (rv); + } + + // Listen -- 128 depth is probably sufficient. If it isn't, other + // bad things are going to happen. + if (listen(fd, 128) != 0) { + rv = nni_plat_errno(errno); + (void) close(fd); + return (rv); + } + + s->fd = fd; + return (0); +} + + +// nni_plat_tcp_connect establishes an outbound connection. It the +// bind address is not null, then it will attempt to bind to the local +// address specified first. +int +nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr, + const nni_sockaddr *bindaddr) +{ + int fd; + int len; + struct sockaddr_storage ss; + struct sockaddr_storage bss; + int rv; + + len = nni_plat_to_sockaddr(&ss, addr); + if (len < 0) { + return (NNG_EADDRINVAL); + } + +#ifdef SOCK_CLOEXEC + fd = socket(ss.ss_family, SOCK_STREAM, SOCK_CLOEXEC); +#else + fd = socket(ss.ss_family, SOCK_STREAM, 0); +#endif + if (fd < 0) { + return (nni_plat_errno(errno)); + } + + if (bindaddr != NULL) { + if (bindaddr->s_un.s_family != addr->s_un.s_family) { + return (NNG_EINVAL); + } + if (nni_plat_to_sockaddr(&bss, bindaddr) < 0) { + return (NNG_EADDRINVAL); + } + if (bind(fd, (struct sockaddr *) &bss, len) < 0) { + rv = nni_plat_errno(errno); + (void) close(fd); + return (rv); + } + } + + nni_plat_tcp_setopts(fd); + + if (connect(fd, (struct sockaddr *) &ss, len) != 0) { + rv = nni_plat_errno(errno); + (void) close(fd); + return (rv); + } + s->fd = fd; + return (0); +} + + +int +nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server) +{ + int fd; + + for (;;) { +#ifdef NNG_USE_ACCEPT4 + fd = accept4(server, NULL, NULL, SOCK_CLOEXEC); + if ((fd < 0) && ((errrno == ENOSYS) || (errno == ENOTSUP))) { + fd = accept(server, NULL, NULL); + } +#else + fd = accept(server->fd, NULL, NULL); +#endif + + if (fd < 0) { + if ((errno == EINTR) || (errno == ECONNABORTED)) { + // These are not fatal errors, keep trying + continue; + } + return (nni_plat_errno(errno)); + } else { + break; + } + } + + nni_plat_tcp_setopts(fd); + s->fd = fd; + return (0); +} + + +#endif // PLATFORM_POSIX_NET diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index cc906791..f86a9fa7 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -102,6 +102,9 @@ nni_pair_pipe_add(void *arg) nni_pair_pipe *pp = arg; nni_pair_sock *pair = pp->pair; + if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PAIR) { + return (NNG_EPROTO); + } if (pair->pipe != NULL) { return (NNG_EBUSY); // Already have a peer, denied. } diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index ca59d799..8c375b61 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -137,6 +137,9 @@ nni_rep_pipe_add(void *arg) nni_rep_sock *rep = rp->rep; int rv; + if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REQ) { + return (NNG_EPROTO); + } nni_mtx_lock(&rep->mx); rv = nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp); nni_mtx_unlock(&rep->mx); diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 4ce66ab6..b9981c3c 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -138,9 +138,10 @@ nni_req_pipe_fini(void *arg) static int nni_req_pipe_add(void *arg) { - // We have nothing to do, since we don't need to maintain a global - // list of related pipes. - NNI_ARG_UNUSED(arg); + nni_req_pipe *rp = arg; + if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) { + return (NNG_EPROTO); + } return (0); } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c new file mode 100644 index 00000000..6c581b40 --- /dev/null +++ b/src/transport/tcp/tcp.c @@ -0,0 +1,448 @@ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <stdlib.h> +#include <string.h> +#include <stdio.h> + +#include "core/nng_impl.h" + +// TCP transport. Platform specific TCP operations must be +// supplied as well. + +typedef struct nni_tcp_pipe nni_tcp_pipe; +typedef struct nni_tcp_ep nni_tcp_ep; + +// nni_tcp_pipe is one end of a TCP connection. +struct nni_tcp_pipe { + const char * addr; + nni_plat_tcpsock fd; + uint16_t peer; + uint16_t proto; + uint32_t rcvmax; +}; + +struct nni_tcp_ep { + char addr[NNG_MAXADDRLEN+1]; + nni_plat_tcpsock fd; + int closed; + uint16_t proto; + uint32_t rcvmax; + int ipv4only; +}; + +static int +nni_tcp_tran_init(void) +{ + return (0); +} + + +static void +nni_tcp_tran_fini(void) +{ +} + + +static void +nni_tcp_pipe_close(void *arg) +{ + nni_tcp_pipe *pipe = arg; + + nni_plat_tcp_close(&pipe->fd); +} + + +static void +nni_tcp_pipe_destroy(void *arg) +{ + nni_tcp_pipe *pipe = arg; + + NNI_FREE_STRUCT(pipe); +} + + +static int +nni_tcp_pipe_send(void *arg, nni_msg *msg) +{ + nni_tcp_pipe *pipe = arg; + uint64_t len; + uint8_t buf[sizeof (len)]; + nni_iov iov[3]; + int rv; + + iov[0].iov_buf = buf; + iov[0].iov_len = sizeof (buf); + iov[1].iov_buf = nni_msg_header(msg, &iov[1].iov_len); + iov[2].iov_buf = nni_msg_body(msg, &iov[2].iov_len); + + len = (uint64_t) iov[1].iov_len + (uint64_t) iov[2].iov_len; + NNI_PUT64(buf, len); + + if ((rv = nni_plat_tcp_send(&pipe->fd, iov, 3)) == 0) { + nni_msg_free(msg); + } + return (rv); +} + + +static int +nni_tcp_pipe_recv(void *arg, nni_msg **msgp) +{ + nni_tcp_pipe *pipe = arg; + nni_msg *msg; + uint64_t len; + uint8_t buf[sizeof (len)]; + nni_iov iov[1]; + int rv; + + iov[0].iov_buf = buf; + iov[0].iov_len = sizeof (buf); + if ((rv = nni_plat_tcp_recv(&pipe->fd, iov, 1)) != 0) { + return (rv); + } + NNI_GET64(buf, len); + // Check MAXRCVSIZE... + if (len > pipe->rcvmax) { + return (NNG_EPROTO); + } + + if ((rv = nng_msg_alloc(&msg, len)) != 0) { + return (rv); + } + + iov[0].iov_buf = nng_msg_body(msg, &iov[0].iov_len); + + if ((rv = nni_plat_tcp_recv(&pipe->fd, iov, 1)) == 0) { + *msgp = msg; + } else { + nni_msg_free(msg); + } + return (rv); +} + + +static uint16_t +nni_tcp_pipe_peer(void *arg) +{ + nni_tcp_pipe *pipe = arg; + + return (pipe->peer); +} + + +static int +nni_tcp_pipe_getopt(void *arg, int option, void *buf, size_t *szp) +{ +#if 0 + nni_inproc_pipe *pipe = arg; + size_t len; + + switch (option) { + case NNG_OPT_LOCALADDR: + case NNG_OPT_REMOTEADDR: + len = strlen(pipe->addr) + 1; + if (len > *szp) { + (void) memcpy(buf, pipe->addr, *szp); + } else { + (void) memcpy(buf, pipe->addr, len); + } + *szp = len; + return (0); + } +#endif + return (NNG_ENOTSUP); +} + + +static int +nni_tcp_ep_init(void **epp, const char *url, uint16_t proto) +{ + nni_tcp_ep *ep; + int rv; + + if (strlen(url) > NNG_MAXADDRLEN-1) { + return (NNG_EINVAL); + } + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + ep->closed = 0; + ep->proto = proto; + ep->ipv4only = 0; + + (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); + + *epp = ep; + return (0); +} + + +static void +nni_tcp_ep_fini(void *arg) +{ + nni_tcp_ep *ep = arg; + + NNI_FREE_STRUCT(ep); +} + + +static void +nni_tcp_ep_close(void *arg) +{ + nni_tcp_ep *ep = arg; + + nni_plat_tcp_close(&ep->fd); +} + + +static int +nni_parseaddr(char *pair, char **hostp, uint16_t *portp) +{ + char *host, *port, *end; + char c; + int val; + + if (pair[0] == '[') { + host = pair+1; + // IP address enclosed ... for IPv6 usually. + if ((end = strchr(host, ']')) == NULL) { + return (NNG_EADDRINVAL); + } + *end = '\0'; + port = end + 1; + if (*port == ':') { + port++; + } else if (port != '\0') { + return (NNG_EADDRINVAL); + } + } else { + host = pair; + port = strchr(host, ':'); + if (port != NULL) { + *port = '\0'; + port++; + } + } + val = 0; + while ((c = *port) != '\0') { + val *= 10; + if ((c >= '0') && (c <= '9')) { + val += (c - '0'); + } else { + return (NNG_EADDRINVAL); + } + if (val > 65535) { + return (NNG_EADDRINVAL); + } + port++; + } + if ((strlen(host) == 0) || (strcmp(host, "*") == 0)) { + *hostp = NULL; + } else { + *hostp = host; + } + // Stash the port in big endian (network) byte order. + NNI_PUT16((uint8_t *) portp, val); + return (0); +} + + +static int +nni_tcp_negotiate(nni_tcp_pipe *pipe) +{ + int rv; + nni_iov iov; + uint8_t buf[8]; + uint16_t peer; + + // First send our header.. + buf[0] = 0; + buf[1] = 'S'; + buf[2] = 'P'; + buf[3] = 0; // version + NNI_PUT16(&buf[4], pipe->proto); + NNI_PUT16(&buf[6], 0); + + iov.iov_buf = buf; + iov.iov_len = 8; + if ((rv = nni_plat_tcp_send(&pipe->fd, &iov, 1)) != 0) { + return (rv); + } + + iov.iov_buf = buf; + iov.iov_len = 8; + if ((rv = nni_plat_tcp_recv(&pipe->fd, &iov, 1)) != 0) { + return (rv); + } + + if ((buf[0] != 0) || (buf[1] != 'S') || + (buf[2] != 'P') || (buf[3] != 0) || + (buf[6] != 0) || (buf[7] != 0)) { + return (NNG_EPROTO); + } + + NNI_GET16((&buf[4]), pipe->peer); + return (0); +} + + +static int +nni_tcp_ep_connect(void *arg, void **pipep) +{ + nni_tcp_ep *ep = arg; + nni_tcp_pipe *pipe; + char *host; + uint16_t port; + int flag; + char addr[NNG_MAXADDRLEN+1]; + nni_sockaddr lcladdr; + nni_sockaddr remaddr; + int rv; + + char *lclpart; + char *rempart; + + flag = ep->ipv4only ? NNI_FLAG_IPV4ONLY : 0; + snprintf(addr, sizeof (addr), "%s", ep->addr); + + if ((rempart = strchr(addr, ';')) != NULL) { + *rempart = '\0'; + rempart++; + lclpart = addr; + + if ((rv = nni_parseaddr(lclpart, &host, &port)) != 0) { + return (rv); + } + if ((rv = nni_plat_lookup_host(host, &lcladdr, flag)) != 0) { + return (rv); + } + // The port is in the same offset for both v4 and v6. + lcladdr.s_un.s_in.sa_port = port; + } else { + lclpart = NULL; + rempart = addr; + } + + if ((rv = nni_parseaddr(rempart, &host, &port)) != 0) { + return (rv); + } + + if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { + return (NNG_ENOMEM); + } + pipe->proto = ep->proto; + + // Port is in the same place for both v4 and v6. + remaddr.s_un.s_in.sa_port = port; + + rv = nni_plat_tcp_connect(&pipe->fd, &remaddr, + lclpart == NULL ? NULL : &lcladdr); + if (rv != 0) { + NNI_FREE_STRUCT(pipe); + return (rv); + } + + if ((rv = nni_tcp_negotiate(pipe)) != 0) { + nni_plat_tcp_close(&pipe->fd); + NNI_FREE_STRUCT(pipe); + return (rv); + } + *pipep = pipe; + return (0); +} + + +static int +nni_tcp_ep_bind(void *arg) +{ + nni_tcp_ep *ep = arg; + char addr[NNG_MAXADDRLEN+1]; + char *host; + uint16_t port; + int flag; + int rv; + nni_sockaddr baddr; + + flag = ep->ipv4only ? NNI_FLAG_IPV4ONLY : 0; + + // We want to strok this, so make a copy. Skip the scheme. + snprintf(addr, sizeof (addr), "%s", ep->addr + strlen("tcp://")); + + if ((rv = nni_parseaddr(addr, &host, &port)) != 0) { + return (rv); + } + if ((rv = nni_plat_lookup_host(host, &baddr, flag)) != 0) { + return (rv); + } + baddr.s_un.s_in.sa_port = port; + + if ((rv == nni_plat_tcp_listen(&ep->fd, &baddr)) != 0) { + return (rv); + } + return (0); +} + + +static int +nni_tcp_ep_accept(void *arg, void **pipep) +{ + nni_tcp_ep *ep = arg; + nni_tcp_pipe *pipe; + int rv; + + + if ((pipe = NNI_ALLOC_STRUCT(pipe)) != NULL) { + return (NNG_ENOMEM); + } + pipe->proto = ep->proto; + + if ((rv = nni_plat_tcp_accept(&pipe->fd, &ep->fd)) != 0) { + NNI_FREE_STRUCT(pipe); + return (rv); + } + if ((rv = nni_tcp_negotiate(pipe)) != 0) { + nni_plat_tcp_close(&pipe->fd); + NNI_FREE_STRUCT(pipe); + return (rv); + } + *pipep = pipe; + return (0); +} + + +static nni_tran_pipe nni_tcp_pipe_ops = { + .pipe_destroy = nni_tcp_pipe_destroy, + .pipe_send = nni_tcp_pipe_send, + .pipe_recv = nni_tcp_pipe_recv, + .pipe_close = nni_tcp_pipe_close, + .pipe_peer = nni_tcp_pipe_peer, + .pipe_getopt = nni_tcp_pipe_getopt, +}; + +static nni_tran_ep nni_tcp_ep_ops = { + .ep_init = nni_tcp_ep_init, + .ep_fini = nni_tcp_ep_fini, + .ep_connect = nni_tcp_ep_connect, + .ep_bind = nni_tcp_ep_bind, + .ep_accept = nni_tcp_ep_accept, + .ep_close = nni_tcp_ep_close, + .ep_setopt = NULL, + .ep_getopt = NULL, +}; + +// This is the TCP transport linkage, and should be the only global +// symbol in this entire file. +struct nni_tran nni_tcp_tran = { + .tran_scheme = "tcp", + .tran_ep = &nni_tcp_ep_ops, + .tran_pipe = &nni_tcp_pipe_ops, + .tran_init = nni_tcp_tran_init, + .tran_fini = nni_tcp_tran_fini, +}; |
