aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-04 02:10:13 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-04 02:10:13 -0800
commit1d650869f32c56f6d49d898c38f7525191a60bd1 (patch)
tree7a27136068de192a3166ce40ea7a541f68be9d96
parent856c5c8e2aa4e07b2b628dd194a63ae13dae7ae3 (diff)
downloadnng-1d650869f32c56f6d49d898c38f7525191a60bd1.tar.gz
nng-1d650869f32c56f6d49d898c38f7525191a60bd1.tar.bz2
nng-1d650869f32c56f6d49d898c38f7525191a60bd1.zip
Initial cut at TCP, totally untested beyond compilation.
This also adds checks in the protocols to verify that pipe peers are of the proper protocol.
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/core/defs.h77
-rw-r--r--src/core/pipe.h2
-rw-r--r--src/core/platform.h38
-rw-r--r--src/core/transport.c2
-rw-r--r--src/platform/posix/posix_impl.h5
-rw-r--r--src/platform/posix/posix_net.c374
-rw-r--r--src/protocol/pair/pair.c3
-rw-r--r--src/protocol/reqrep/rep.c3
-rw-r--r--src/protocol/reqrep/req.c7
-rw-r--r--src/transport/tcp/tcp.c448
11 files changed, 924 insertions, 37 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 58564291..a1c0fac7 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -71,6 +71,8 @@ set (NNG_SOURCES
protocol/reqrep/req.c
transport/inproc/inproc.c
+
+ transport/tcp/tcp.c
)
include_directories(AFTER SYSTEM ${PROJECT_SOURCE_DIR}/src)
diff --git a/src/core/defs.h b/src/core/defs.h
index be7f71f7..ed2e353f 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -52,43 +52,54 @@ typedef struct {
#define NNI_ALLOC_STRUCT(s) nni_alloc(sizeof (*s))
#define NNI_FREE_STRUCT(s) nni_free((s), sizeof (*s))
-#define NNI_PUT32(ptr, u) \
- do { \
- ptr[0] = (uint8_t) (((uint32_t) u) >> 24); \
- ptr[1] = (uint8_t) (((uint32_t) u) >> 16); \
- ptr[2] = (uint8_t) (((uint32_t) u) >> 8); \
- ptr[3] = (uint8_t) ((uint32_t) u); \
- } \
+#define NNI_PUT16(ptr, u) \
+ do { \
+ (ptr)[0] = (uint8_t) (((uint16_t) (u)) >> 8); \
+ (ptr)[0] = (uint8_t) ((uint16_t) (u)); \
+ } \
while (0)
-#define NNI_PUT64(ptr, u) \
- do { \
- ptr[0] = (uint8_t) (((uint64_t) u) >> 56); \
- ptr[1] = (uint8_t) (((uint64_t) u) >> 48); \
- ptr[2] = (uint8_t) (((uint64_t) u) >> 40); \
- ptr[3] = (uint8_t) (((uint64_t) u) >> 32); \
- ptr[4] = (uint8_t) (((uint64_t) u) >> 24); \
- ptr[5] = (uint8_t) (((uint64_t) u) >> 16); \
- ptr[6] = (uint8_t) (((uint64_t) u) >> 8); \
- ptr[7] = (uint8_t) ((uint64_t) u); \
- } \
+#define NNI_PUT32(ptr, u) \
+ do { \
+ (ptr)[0] = (uint8_t) (((uint32_t) (u)) >> 24); \
+ (ptr)[1] = (uint8_t) (((uint32_t) (u)) >> 16); \
+ (ptr)[2] = (uint8_t) (((uint32_t) (u)) >> 8); \
+ (ptr)[3] = (uint8_t) ((uint32_t) (u)); \
+ } \
while (0)
-#define NNI_GET32(ptr, v) \
- v = (((uint32_t) ((uint8_t) ptr[0])) << 24) + \
- (((uint32_t) ((uint8_t) ptr[1])) << 16) + \
- (((uint32_t) ((uint8_t) ptr[2])) << 8) + \
- (((uint32_t) (uint8_t) ptr[3]))
-
-#define NNI_GET64(ptr, v) \
- v = (((uint64_t) ((uint8_t) ptr[0])) << 56) + \
- (((uint64_t) ((uint8_t) ptr[1])) << 48) + \
- (((uint64_t) ((uint8_t) ptr[2])) << 40) + \
- (((uint64_t) ((uint8_t) ptr[3])) << 32) + \
- (((uint64_t) ((uint8_t) ptr[4])) << 24) + \
- (((uint64_t) ((uint8_t) ptr[5])) << 16) + \
- (((uint64_t) ((uint8_t) ptr[6])) << 8) + \
- (((uint64_t) (uint8_t) ptr[7]))
+#define NNI_PUT64(ptr, u) \
+ do { \
+ (ptr)[0] = (uint8_t) (((uint64_t) (u)) >> 56); \
+ (ptr)[1] = (uint8_t) (((uint64_t) (u)) >> 48); \
+ (ptr)[2] = (uint8_t) (((uint64_t) (u)) >> 40); \
+ (ptr)[3] = (uint8_t) (((uint64_t) (u)) >> 32); \
+ (ptr)[4] = (uint8_t) (((uint64_t) (u)) >> 24); \
+ (ptr)[5] = (uint8_t) (((uint64_t) (u)) >> 16); \
+ (ptr)[6] = (uint8_t) (((uint64_t) (u)) >> 8); \
+ (ptr)[7] = (uint8_t) ((uint64_t) (u)); \
+ } \
+ while (0)
+
+#define NNI_GET16(ptr, v) \
+ v = (((uint32_t) ((uint8_t) (ptr)[0])) << 8) + \
+ (((uint32_t) (uint8_t) (ptr)[1]))
+
+#define NNI_GET32(ptr, v) \
+ v = (((uint32_t) ((uint8_t) (ptr)[0])) << 24) + \
+ (((uint32_t) ((uint8_t) (ptr)[1])) << 16) + \
+ (((uint32_t) ((uint8_t) (ptr)[2])) << 8) + \
+ (((uint32_t) (uint8_t) (ptr)[3]))
+
+#define NNI_GET64(ptr, v) \
+ v = (((uint64_t) ((uint8_t) (ptr)[0])) << 56) + \
+ (((uint64_t) ((uint8_t) (ptr)[1])) << 48) + \
+ (((uint64_t) ((uint8_t) (ptr)[2])) << 40) + \
+ (((uint64_t) ((uint8_t) (ptr)[3])) << 32) + \
+ (((uint64_t) ((uint8_t) (ptr)[4])) << 24) + \
+ (((uint64_t) ((uint8_t) (ptr)[5])) << 16) + \
+ (((uint64_t) ((uint8_t) (ptr)[6])) << 8) + \
+ (((uint64_t) (uint8_t) (ptr)[7]))
// A few assorted other items.
#define NNI_FLAG_IPV4ONLY 1
diff --git a/src/core/pipe.h b/src/core/pipe.h
index a8d7b286..6ebb7090 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -44,6 +44,8 @@ extern int nni_pipe_create(nni_pipe **, nni_ep *);
extern void nni_pipe_destroy(nni_pipe *);
+extern uint16_t nni_pipe_proto(nni_pipe *);
+extern uint16_t nni_pipe_peer(nni_pipe *);
extern int nni_pipe_start(nni_pipe *);
extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep);
diff --git a/src/core/platform.h b/src/core/platform.h
index 2134ed97..3292d6f9 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -63,6 +63,7 @@ extern void nni_free(void *, size_t);
typedef struct nni_plat_mtx nni_plat_mtx;
typedef struct nni_plat_cv nni_plat_cv;
typedef struct nni_plat_thr nni_plat_thr;
+typedef struct nni_plat_tcpsock nni_plat_tcpsock;
// Mutex handling.
@@ -166,6 +167,43 @@ extern uint32_t nni_plat_nextid(void);
// used.)
extern const char *nni_plat_strerror(int);
+// nni_plat_lookup_host looks up a hostname in DNS, or the local hosts
+// file, or whatever. If your platform lacks support for naming, it must
+// at least cope with converting IP addresses in string form. The final
+// flags may include NNI_FLAG_IPV4ONLY to prevent IPv6 names from being
+// returned on dual stack machines.
+extern int nni_plat_lookup_host(const char *, nni_sockaddr *, int);
+
+// nni_plat_tcp_close just closes a TCP socket.
+extern void nni_plat_tcp_close(nni_plat_tcpsock *);
+
+// nni_plat_tcp_listen creates a TCP socket in listening mode, bound
+// to the specified address. Note that nni_plat_tcpsock should be defined
+// to whatever your platform uses. For most systems its just "int".
+extern int nni_plat_tcp_listen(nni_plat_tcpsock *, const nni_sockaddr *);
+
+// nni_plat_tcp_accept does the accept to accept an inbound connection.
+// The tcpsock used for the server will have been set up with the
+// nni_plat_tcp_listen.
+extern int nni_plat_tcp_accept(nni_plat_tcpsock *, nni_plat_tcpsock *);
+
+// nni_plat_tcp_connect is the client side. Two addresses are supplied,
+// as the client may specify a local address to which to bind. This
+// second address may be NULL to use ephemeral ports, which is the
+// usual default.
+extern int nni_plat_tcp_connect(nni_plat_tcpsock *, const nni_sockaddr *,
+ const nni_sockaddr *);
+
+// nni_plat_tcp_send sends data to the remote side. The platform is
+// responsible for attempting to send all of the data. The iov count
+// will never be larger than 4. THe platform may modify the iovs.
+extern int nni_plat_tcp_send(nni_plat_tcpsock *, nni_iov *, int);
+
+// nni_plat_tcp_recv recvs data into the buffers provided by the
+// iovs. The implementation does not return until the iovs are completely
+// full, or an error condition occurs.
+extern int nni_plat_tcp_recv(nni_plat_tcpsock *, nni_iov *, int);
+
// Actual platforms we support. This is included up front so that we can
// get the specific types that are supplied by the platform.
#if defined(PLATFORM_POSIX)
diff --git a/src/core/transport.c b/src/core/transport.c
index b6ebe3c2..cf9f38c3 100644
--- a/src/core/transport.c
+++ b/src/core/transport.c
@@ -14,9 +14,11 @@
// For now the list of transports is hard-wired. Adding new transports
// to the system dynamically is something that might be considered later.
extern nni_tran nni_inproc_tran;
+extern nni_tran nni_tcp_tran;
static nni_tran *transports[] = {
&nni_inproc_tran,
+ &nni_tcp_tran,
NULL
};
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 38fe27c9..2b72f574 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -32,8 +32,11 @@ extern int nni_plat_errno(int);
#endif
+
#ifdef PLATFORM_POSIX_NET
-typedef int nni_plat_tcpsock;
+struct nni_plat_tcpsock {
+ int fd;
+};
#endif
// Define types that this platform uses.
diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c
new file mode 100644
index 00000000..2fdab245
--- /dev/null
+++ b/src/platform/posix/posix_net.c
@@ -0,0 +1,374 @@
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+#ifdef PLATFORM_POSIX_NET
+
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <netdb.h>
+
+static int
+nni_plat_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa)
+{
+ struct sockaddr_in *sin;
+ struct sockaddr_in6 *sin6;
+
+ switch (sa->s_un.s_family) {
+ case NNG_AF_INET:
+ sin = (void *) ss;
+ memset(sin, 0, sizeof (*sin));
+ sin->sin_family = PF_INET;
+ sin->sin_port = sa->s_un.s_in.sa_port;
+ sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr;
+ return (sizeof (*sin));
+
+ case NNG_AF_INET6:
+ sin6 = (void *) ss;
+ memset(&sin6, 0, sizeof (sin6));
+#ifdef SIN6_LEN
+ sin6->sin6_len = sizeof (*sin6);
+#endif
+ sin6->sin6_family = PF_INET6;
+ sin6->sin6_port = sa->s_un.s_in6.sa_port;
+ memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16);
+ return (sizeof (*sin6));
+ }
+ return (-1);
+}
+
+
+static int
+nni_plat_from_sockaddr(nni_sockaddr *sa, const struct sockaddr *ss)
+{
+ const struct sockaddr_in *sin;
+ const struct sockaddr_in6 *sin6;
+
+ memset(sa, 0, sizeof (*sa));
+ switch (ss->sa_family) {
+ case PF_INET:
+ sin = (const void *) ss;
+ sa->s_un.s_in.sa_family = NNG_AF_INET;
+ sa->s_un.s_in.sa_port = sin->sin_port;
+ sa->s_un.s_in.sa_addr = sin->sin_addr.s_addr;
+ return (0);
+
+ case PF_INET6:
+ sin6 = (const void *) ss;
+ sa->s_un.s_in6.sa_family = NNG_AF_INET6;
+ sa->s_un.s_in6.sa_port = sin6->sin6_port;
+ memcpy(sa->s_un.s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16);
+ return (0);
+ }
+ return (-1);
+}
+
+
+int
+nni_plat_lookup_host(const char *host, nni_sockaddr *addr, int flags)
+{
+ struct addrinfo hint;
+ struct addrinfo *ai;
+
+ memset(&hint, 0, sizeof (hint));
+ hint.ai_flags = AI_DEFAULT | AI_PASSIVE;
+ hint.ai_family = PF_UNSPEC;
+ hint.ai_socktype = SOCK_STREAM;
+ if (flags & NNI_FLAG_IPV4ONLY) {
+ hint.ai_family = PF_INET;
+ }
+
+ if (getaddrinfo(host, NULL, &hint, &ai) != 0) {
+ return (NNG_EADDRINVAL);
+ }
+
+ if (nni_plat_from_sockaddr(addr, ai->ai_addr) < 0) {
+ freeaddrinfo(ai);
+ return (NNG_EADDRINVAL);
+ }
+ freeaddrinfo(ai);
+ return (0);
+}
+
+
+int
+nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
+{
+ struct iovec iov[4]; // We never have more than 3 at present
+ int i;
+ int offset;
+ int resid = 0;
+ int rv;
+
+ if (cnt > 4) {
+ return (NNG_EINVAL);
+ }
+
+ for (i = 0; i < cnt; i++) {
+ iov[i].iov_base = iovs[i].iov_buf;
+ iov[i].iov_len = iovs[i].iov_len;
+ resid += iov[i].iov_len;
+ }
+
+ i = 0;
+ while (resid) {
+ rv = writev(s->fd, iov, cnt);
+ if (rv < 0) {
+ if (rv == EINTR) {
+ continue;
+ }
+ return (nni_plat_errno(errno));
+ }
+ if (rv > resid) {
+ nni_panic("writev says it wrote too much!");
+ }
+
+ resid -= rv;
+ while (rv) {
+ if (iov[i].iov_len <= rv) {
+ rv -= iov[i].iov_len;
+ i++;
+ cnt--;
+ } else {
+ iov[i].iov_len -= rv;
+ iov[i].iov_base += rv;
+ rv = 0;
+ }
+ }
+ }
+
+ return (0);
+}
+
+
+int
+nni_plat_tcp_recv(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
+{
+ struct iovec iov[4]; // We never have more than 3 at present
+ int i;
+ int offset;
+ int resid = 0;
+ int rv;
+
+ if (cnt > 4) {
+ return (NNG_EINVAL);
+ }
+
+ for (i = 0; i < cnt; i++) {
+ iov[i].iov_base = iovs[i].iov_buf;
+ iov[i].iov_len = iovs[i].iov_len;
+ resid += iov[i].iov_len;
+ }
+
+ i = 0;
+ while (resid) {
+ rv = readv(s->fd, iov, cnt);
+ if (rv < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ return (nni_plat_errno(errno));
+ }
+ if (rv > resid) {
+ nni_panic("readv says it read too much!");
+ }
+
+ resid -= rv;
+ while (rv) {
+ if (iov[i].iov_len <= rv) {
+ rv -= iov[i].iov_len;
+ i++;
+ cnt--;
+ } else {
+ iov[i].iov_len -= rv;
+ iov[i].iov_base += rv;
+ rv = 0;
+ }
+ }
+ }
+
+ return (0);
+}
+
+
+static void
+nni_plat_tcp_setopts(int fd)
+{
+ int one;
+
+ // Try to ensure that both CLOEXEC is set, and that we don't
+ // generate SIGPIPE. (Note that SIGPIPE suppression in this way
+ // only works on BSD systems. Linux wants us to use sendmsg().)
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+#if defined(F_SETNOSIGPIPE)
+ (void) fcntl(fd, F_SETNOSIGPIPE, 1);
+#elif defined(SO_NOSIGPIPE)
+ one = 1;
+ (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof (one));
+#endif
+
+ // Also disable Nagle. We are careful to group data with writev,
+ // and latency is king for most of our users. (Consider adding
+ // a method to enable this later.)
+ one = 1;
+ (void) setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof (one));
+}
+
+
+void
+nni_plat_tcp_close(nni_plat_tcpsock *s)
+{
+ (void) close(s->fd);
+ s->fd = -1;
+}
+
+// nni_plat_tcp_bind creates a file descriptor bound to the given address.
+// This basically does the equivalent of socket, bind, and listen. We have
+// chosen a default value for the listen backlog of 128, which should be
+// plenty. (If it isn't, then the accept thread can't get enough resources
+// to keep up, and your clients are going to experience bad things. Normally
+// the actual backlog should hover near 0 anyway.)
+int
+nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr)
+{
+ int fd;
+ int len;
+ struct sockaddr_storage ss;
+ int rv;
+
+ len = nni_plat_to_sockaddr(&ss, addr);
+ if (len < 0) {
+ return (NNG_EADDRINVAL);
+ }
+
+#ifdef SOCK_CLOEXEC
+ fd = socket(ss.ss_family, SOCK_STREAM, SOCK_CLOEXEC);
+#else
+ fd = socket(ss.ss_family, SOCK_STREAM, 0);
+#endif
+ if (fd < 0) {
+ return (nni_plat_errno(errno));
+ }
+
+ nni_plat_tcp_setopts(fd);
+
+ if (bind(fd, (struct sockaddr *) &ss, len) < 0) {
+ rv = nni_plat_errno(errno);
+ (void) close(fd);
+ return (rv);
+ }
+
+ // Listen -- 128 depth is probably sufficient. If it isn't, other
+ // bad things are going to happen.
+ if (listen(fd, 128) != 0) {
+ rv = nni_plat_errno(errno);
+ (void) close(fd);
+ return (rv);
+ }
+
+ s->fd = fd;
+ return (0);
+}
+
+
+// nni_plat_tcp_connect establishes an outbound connection. It the
+// bind address is not null, then it will attempt to bind to the local
+// address specified first.
+int
+nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr,
+ const nni_sockaddr *bindaddr)
+{
+ int fd;
+ int len;
+ struct sockaddr_storage ss;
+ struct sockaddr_storage bss;
+ int rv;
+
+ len = nni_plat_to_sockaddr(&ss, addr);
+ if (len < 0) {
+ return (NNG_EADDRINVAL);
+ }
+
+#ifdef SOCK_CLOEXEC
+ fd = socket(ss.ss_family, SOCK_STREAM, SOCK_CLOEXEC);
+#else
+ fd = socket(ss.ss_family, SOCK_STREAM, 0);
+#endif
+ if (fd < 0) {
+ return (nni_plat_errno(errno));
+ }
+
+ if (bindaddr != NULL) {
+ if (bindaddr->s_un.s_family != addr->s_un.s_family) {
+ return (NNG_EINVAL);
+ }
+ if (nni_plat_to_sockaddr(&bss, bindaddr) < 0) {
+ return (NNG_EADDRINVAL);
+ }
+ if (bind(fd, (struct sockaddr *) &bss, len) < 0) {
+ rv = nni_plat_errno(errno);
+ (void) close(fd);
+ return (rv);
+ }
+ }
+
+ nni_plat_tcp_setopts(fd);
+
+ if (connect(fd, (struct sockaddr *) &ss, len) != 0) {
+ rv = nni_plat_errno(errno);
+ (void) close(fd);
+ return (rv);
+ }
+ s->fd = fd;
+ return (0);
+}
+
+
+int
+nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server)
+{
+ int fd;
+
+ for (;;) {
+#ifdef NNG_USE_ACCEPT4
+ fd = accept4(server, NULL, NULL, SOCK_CLOEXEC);
+ if ((fd < 0) && ((errrno == ENOSYS) || (errno == ENOTSUP))) {
+ fd = accept(server, NULL, NULL);
+ }
+#else
+ fd = accept(server->fd, NULL, NULL);
+#endif
+
+ if (fd < 0) {
+ if ((errno == EINTR) || (errno == ECONNABORTED)) {
+ // These are not fatal errors, keep trying
+ continue;
+ }
+ return (nni_plat_errno(errno));
+ } else {
+ break;
+ }
+ }
+
+ nni_plat_tcp_setopts(fd);
+ s->fd = fd;
+ return (0);
+}
+
+
+#endif // PLATFORM_POSIX_NET
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index cc906791..f86a9fa7 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -102,6 +102,9 @@ nni_pair_pipe_add(void *arg)
nni_pair_pipe *pp = arg;
nni_pair_sock *pair = pp->pair;
+ if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PAIR) {
+ return (NNG_EPROTO);
+ }
if (pair->pipe != NULL) {
return (NNG_EBUSY); // Already have a peer, denied.
}
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index ca59d799..8c375b61 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -137,6 +137,9 @@ nni_rep_pipe_add(void *arg)
nni_rep_sock *rep = rp->rep;
int rv;
+ if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REQ) {
+ return (NNG_EPROTO);
+ }
nni_mtx_lock(&rep->mx);
rv = nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp);
nni_mtx_unlock(&rep->mx);
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 4ce66ab6..b9981c3c 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -138,9 +138,10 @@ nni_req_pipe_fini(void *arg)
static int
nni_req_pipe_add(void *arg)
{
- // We have nothing to do, since we don't need to maintain a global
- // list of related pipes.
- NNI_ARG_UNUSED(arg);
+ nni_req_pipe *rp = arg;
+ if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) {
+ return (NNG_EPROTO);
+ }
return (0);
}
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
new file mode 100644
index 00000000..6c581b40
--- /dev/null
+++ b/src/transport/tcp/tcp.c
@@ -0,0 +1,448 @@
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+
+#include "core/nng_impl.h"
+
+// TCP transport. Platform specific TCP operations must be
+// supplied as well.
+
+typedef struct nni_tcp_pipe nni_tcp_pipe;
+typedef struct nni_tcp_ep nni_tcp_ep;
+
+// nni_tcp_pipe is one end of a TCP connection.
+struct nni_tcp_pipe {
+ const char * addr;
+ nni_plat_tcpsock fd;
+ uint16_t peer;
+ uint16_t proto;
+ uint32_t rcvmax;
+};
+
+struct nni_tcp_ep {
+ char addr[NNG_MAXADDRLEN+1];
+ nni_plat_tcpsock fd;
+ int closed;
+ uint16_t proto;
+ uint32_t rcvmax;
+ int ipv4only;
+};
+
+static int
+nni_tcp_tran_init(void)
+{
+ return (0);
+}
+
+
+static void
+nni_tcp_tran_fini(void)
+{
+}
+
+
+static void
+nni_tcp_pipe_close(void *arg)
+{
+ nni_tcp_pipe *pipe = arg;
+
+ nni_plat_tcp_close(&pipe->fd);
+}
+
+
+static void
+nni_tcp_pipe_destroy(void *arg)
+{
+ nni_tcp_pipe *pipe = arg;
+
+ NNI_FREE_STRUCT(pipe);
+}
+
+
+static int
+nni_tcp_pipe_send(void *arg, nni_msg *msg)
+{
+ nni_tcp_pipe *pipe = arg;
+ uint64_t len;
+ uint8_t buf[sizeof (len)];
+ nni_iov iov[3];
+ int rv;
+
+ iov[0].iov_buf = buf;
+ iov[0].iov_len = sizeof (buf);
+ iov[1].iov_buf = nni_msg_header(msg, &iov[1].iov_len);
+ iov[2].iov_buf = nni_msg_body(msg, &iov[2].iov_len);
+
+ len = (uint64_t) iov[1].iov_len + (uint64_t) iov[2].iov_len;
+ NNI_PUT64(buf, len);
+
+ if ((rv = nni_plat_tcp_send(&pipe->fd, iov, 3)) == 0) {
+ nni_msg_free(msg);
+ }
+ return (rv);
+}
+
+
+static int
+nni_tcp_pipe_recv(void *arg, nni_msg **msgp)
+{
+ nni_tcp_pipe *pipe = arg;
+ nni_msg *msg;
+ uint64_t len;
+ uint8_t buf[sizeof (len)];
+ nni_iov iov[1];
+ int rv;
+
+ iov[0].iov_buf = buf;
+ iov[0].iov_len = sizeof (buf);
+ if ((rv = nni_plat_tcp_recv(&pipe->fd, iov, 1)) != 0) {
+ return (rv);
+ }
+ NNI_GET64(buf, len);
+ // Check MAXRCVSIZE...
+ if (len > pipe->rcvmax) {
+ return (NNG_EPROTO);
+ }
+
+ if ((rv = nng_msg_alloc(&msg, len)) != 0) {
+ return (rv);
+ }
+
+ iov[0].iov_buf = nng_msg_body(msg, &iov[0].iov_len);
+
+ if ((rv = nni_plat_tcp_recv(&pipe->fd, iov, 1)) == 0) {
+ *msgp = msg;
+ } else {
+ nni_msg_free(msg);
+ }
+ return (rv);
+}
+
+
+static uint16_t
+nni_tcp_pipe_peer(void *arg)
+{
+ nni_tcp_pipe *pipe = arg;
+
+ return (pipe->peer);
+}
+
+
+static int
+nni_tcp_pipe_getopt(void *arg, int option, void *buf, size_t *szp)
+{
+#if 0
+ nni_inproc_pipe *pipe = arg;
+ size_t len;
+
+ switch (option) {
+ case NNG_OPT_LOCALADDR:
+ case NNG_OPT_REMOTEADDR:
+ len = strlen(pipe->addr) + 1;
+ if (len > *szp) {
+ (void) memcpy(buf, pipe->addr, *szp);
+ } else {
+ (void) memcpy(buf, pipe->addr, len);
+ }
+ *szp = len;
+ return (0);
+ }
+#endif
+ return (NNG_ENOTSUP);
+}
+
+
+static int
+nni_tcp_ep_init(void **epp, const char *url, uint16_t proto)
+{
+ nni_tcp_ep *ep;
+ int rv;
+
+ if (strlen(url) > NNG_MAXADDRLEN-1) {
+ return (NNG_EINVAL);
+ }
+ if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ ep->closed = 0;
+ ep->proto = proto;
+ ep->ipv4only = 0;
+
+ (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url);
+
+ *epp = ep;
+ return (0);
+}
+
+
+static void
+nni_tcp_ep_fini(void *arg)
+{
+ nni_tcp_ep *ep = arg;
+
+ NNI_FREE_STRUCT(ep);
+}
+
+
+static void
+nni_tcp_ep_close(void *arg)
+{
+ nni_tcp_ep *ep = arg;
+
+ nni_plat_tcp_close(&ep->fd);
+}
+
+
+static int
+nni_parseaddr(char *pair, char **hostp, uint16_t *portp)
+{
+ char *host, *port, *end;
+ char c;
+ int val;
+
+ if (pair[0] == '[') {
+ host = pair+1;
+ // IP address enclosed ... for IPv6 usually.
+ if ((end = strchr(host, ']')) == NULL) {
+ return (NNG_EADDRINVAL);
+ }
+ *end = '\0';
+ port = end + 1;
+ if (*port == ':') {
+ port++;
+ } else if (port != '\0') {
+ return (NNG_EADDRINVAL);
+ }
+ } else {
+ host = pair;
+ port = strchr(host, ':');
+ if (port != NULL) {
+ *port = '\0';
+ port++;
+ }
+ }
+ val = 0;
+ while ((c = *port) != '\0') {
+ val *= 10;
+ if ((c >= '0') && (c <= '9')) {
+ val += (c - '0');
+ } else {
+ return (NNG_EADDRINVAL);
+ }
+ if (val > 65535) {
+ return (NNG_EADDRINVAL);
+ }
+ port++;
+ }
+ if ((strlen(host) == 0) || (strcmp(host, "*") == 0)) {
+ *hostp = NULL;
+ } else {
+ *hostp = host;
+ }
+ // Stash the port in big endian (network) byte order.
+ NNI_PUT16((uint8_t *) portp, val);
+ return (0);
+}
+
+
+static int
+nni_tcp_negotiate(nni_tcp_pipe *pipe)
+{
+ int rv;
+ nni_iov iov;
+ uint8_t buf[8];
+ uint16_t peer;
+
+ // First send our header..
+ buf[0] = 0;
+ buf[1] = 'S';
+ buf[2] = 'P';
+ buf[3] = 0; // version
+ NNI_PUT16(&buf[4], pipe->proto);
+ NNI_PUT16(&buf[6], 0);
+
+ iov.iov_buf = buf;
+ iov.iov_len = 8;
+ if ((rv = nni_plat_tcp_send(&pipe->fd, &iov, 1)) != 0) {
+ return (rv);
+ }
+
+ iov.iov_buf = buf;
+ iov.iov_len = 8;
+ if ((rv = nni_plat_tcp_recv(&pipe->fd, &iov, 1)) != 0) {
+ return (rv);
+ }
+
+ if ((buf[0] != 0) || (buf[1] != 'S') ||
+ (buf[2] != 'P') || (buf[3] != 0) ||
+ (buf[6] != 0) || (buf[7] != 0)) {
+ return (NNG_EPROTO);
+ }
+
+ NNI_GET16((&buf[4]), pipe->peer);
+ return (0);
+}
+
+
+static int
+nni_tcp_ep_connect(void *arg, void **pipep)
+{
+ nni_tcp_ep *ep = arg;
+ nni_tcp_pipe *pipe;
+ char *host;
+ uint16_t port;
+ int flag;
+ char addr[NNG_MAXADDRLEN+1];
+ nni_sockaddr lcladdr;
+ nni_sockaddr remaddr;
+ int rv;
+
+ char *lclpart;
+ char *rempart;
+
+ flag = ep->ipv4only ? NNI_FLAG_IPV4ONLY : 0;
+ snprintf(addr, sizeof (addr), "%s", ep->addr);
+
+ if ((rempart = strchr(addr, ';')) != NULL) {
+ *rempart = '\0';
+ rempart++;
+ lclpart = addr;
+
+ if ((rv = nni_parseaddr(lclpart, &host, &port)) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_plat_lookup_host(host, &lcladdr, flag)) != 0) {
+ return (rv);
+ }
+ // The port is in the same offset for both v4 and v6.
+ lcladdr.s_un.s_in.sa_port = port;
+ } else {
+ lclpart = NULL;
+ rempart = addr;
+ }
+
+ if ((rv = nni_parseaddr(rempart, &host, &port)) != 0) {
+ return (rv);
+ }
+
+ if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ pipe->proto = ep->proto;
+
+ // Port is in the same place for both v4 and v6.
+ remaddr.s_un.s_in.sa_port = port;
+
+ rv = nni_plat_tcp_connect(&pipe->fd, &remaddr,
+ lclpart == NULL ? NULL : &lcladdr);
+ if (rv != 0) {
+ NNI_FREE_STRUCT(pipe);
+ return (rv);
+ }
+
+ if ((rv = nni_tcp_negotiate(pipe)) != 0) {
+ nni_plat_tcp_close(&pipe->fd);
+ NNI_FREE_STRUCT(pipe);
+ return (rv);
+ }
+ *pipep = pipe;
+ return (0);
+}
+
+
+static int
+nni_tcp_ep_bind(void *arg)
+{
+ nni_tcp_ep *ep = arg;
+ char addr[NNG_MAXADDRLEN+1];
+ char *host;
+ uint16_t port;
+ int flag;
+ int rv;
+ nni_sockaddr baddr;
+
+ flag = ep->ipv4only ? NNI_FLAG_IPV4ONLY : 0;
+
+ // We want to strok this, so make a copy. Skip the scheme.
+ snprintf(addr, sizeof (addr), "%s", ep->addr + strlen("tcp://"));
+
+ if ((rv = nni_parseaddr(addr, &host, &port)) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_plat_lookup_host(host, &baddr, flag)) != 0) {
+ return (rv);
+ }
+ baddr.s_un.s_in.sa_port = port;
+
+ if ((rv == nni_plat_tcp_listen(&ep->fd, &baddr)) != 0) {
+ return (rv);
+ }
+ return (0);
+}
+
+
+static int
+nni_tcp_ep_accept(void *arg, void **pipep)
+{
+ nni_tcp_ep *ep = arg;
+ nni_tcp_pipe *pipe;
+ int rv;
+
+
+ if ((pipe = NNI_ALLOC_STRUCT(pipe)) != NULL) {
+ return (NNG_ENOMEM);
+ }
+ pipe->proto = ep->proto;
+
+ if ((rv = nni_plat_tcp_accept(&pipe->fd, &ep->fd)) != 0) {
+ NNI_FREE_STRUCT(pipe);
+ return (rv);
+ }
+ if ((rv = nni_tcp_negotiate(pipe)) != 0) {
+ nni_plat_tcp_close(&pipe->fd);
+ NNI_FREE_STRUCT(pipe);
+ return (rv);
+ }
+ *pipep = pipe;
+ return (0);
+}
+
+
+static nni_tran_pipe nni_tcp_pipe_ops = {
+ .pipe_destroy = nni_tcp_pipe_destroy,
+ .pipe_send = nni_tcp_pipe_send,
+ .pipe_recv = nni_tcp_pipe_recv,
+ .pipe_close = nni_tcp_pipe_close,
+ .pipe_peer = nni_tcp_pipe_peer,
+ .pipe_getopt = nni_tcp_pipe_getopt,
+};
+
+static nni_tran_ep nni_tcp_ep_ops = {
+ .ep_init = nni_tcp_ep_init,
+ .ep_fini = nni_tcp_ep_fini,
+ .ep_connect = nni_tcp_ep_connect,
+ .ep_bind = nni_tcp_ep_bind,
+ .ep_accept = nni_tcp_ep_accept,
+ .ep_close = nni_tcp_ep_close,
+ .ep_setopt = NULL,
+ .ep_getopt = NULL,
+};
+
+// This is the TCP transport linkage, and should be the only global
+// symbol in this entire file.
+struct nni_tran nni_tcp_tran = {
+ .tran_scheme = "tcp",
+ .tran_ep = &nni_tcp_ep_ops,
+ .tran_pipe = &nni_tcp_pipe_ops,
+ .tran_init = nni_tcp_tran_init,
+ .tran_fini = nni_tcp_tran_fini,
+};