aboutsummaryrefslogtreecommitdiff
path: root/src/transport/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/tcp')
-rw-r--r--src/transport/tcp/tcp.c221
1 files changed, 95 insertions, 126 deletions
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 4b0f77f1..e60a1b1d 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -7,56 +7,55 @@
// found online at https://opensource.org/licenses/MIT.
//
+#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#include <stdio.h>
#include "core/nng_impl.h"
// TCP transport. Platform specific TCP operations must be
// supplied as well.
-typedef struct nni_tcp_pipe nni_tcp_pipe;
-typedef struct nni_tcp_ep nni_tcp_ep;
+typedef struct nni_tcp_pipe nni_tcp_pipe;
+typedef struct nni_tcp_ep nni_tcp_ep;
// nni_tcp_pipe is one end of a TCP connection.
struct nni_tcp_pipe {
- const char * addr;
- nni_plat_tcp_pipe * tpp;
- uint16_t peer;
- uint16_t proto;
- size_t rcvmax;
-
- nni_aio * user_txaio;
- nni_aio * user_rxaio;
- nni_aio * user_negaio;
-
- uint8_t txlen[sizeof (uint64_t)];
- uint8_t rxlen[sizeof (uint64_t)];
- int gottxhead;
- int gotrxhead;
- int wanttxhead;
- int wantrxhead;
- nni_aio txaio;
- nni_aio rxaio;
- nni_aio negaio;
- nni_msg * rxmsg;
- nni_mtx mtx;
+ const char * addr;
+ nni_plat_tcp_pipe *tpp;
+ uint16_t peer;
+ uint16_t proto;
+ size_t rcvmax;
+
+ nni_aio *user_txaio;
+ nni_aio *user_rxaio;
+ nni_aio *user_negaio;
+
+ uint8_t txlen[sizeof(uint64_t)];
+ uint8_t rxlen[sizeof(uint64_t)];
+ int gottxhead;
+ int gotrxhead;
+ int wanttxhead;
+ int wantrxhead;
+ nni_aio txaio;
+ nni_aio rxaio;
+ nni_aio negaio;
+ nni_msg *rxmsg;
+ nni_mtx mtx;
};
struct nni_tcp_ep {
- char addr[NNG_MAXADDRLEN+1];
- nni_plat_tcp_ep * tep;
- int closed;
- uint16_t proto;
- size_t rcvmax;
- int ipv4only;
- nni_aio aio;
- nni_aio * user_aio;
- nni_mtx mtx;
+ char addr[NNG_MAXADDRLEN + 1];
+ nni_plat_tcp_ep *tep;
+ int closed;
+ uint16_t proto;
+ size_t rcvmax;
+ int ipv4only;
+ nni_aio aio;
+ nni_aio * user_aio;
+ nni_mtx mtx;
};
-
static void nni_tcp_pipe_send_cb(void *);
static void nni_tcp_pipe_recv_cb(void *);
static void nni_tcp_pipe_nego_cb(void *);
@@ -68,13 +67,11 @@ nni_tcp_tran_init(void)
return (0);
}
-
static void
nni_tcp_tran_fini(void)
{
}
-
static void
nni_tcp_pipe_close(void *arg)
{
@@ -83,7 +80,6 @@ nni_tcp_pipe_close(void *arg)
nni_plat_tcp_pipe_close(pipe->tpp);
}
-
static void
nni_tcp_pipe_fini(void *arg)
{
@@ -102,12 +98,11 @@ nni_tcp_pipe_fini(void *arg)
NNI_FREE_STRUCT(pipe);
}
-
static int
nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp)
{
nni_tcp_pipe *pipe;
- int rv;
+ int rv;
if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
return (NNG_ENOMEM);
@@ -127,10 +122,10 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp)
if (rv != 0) {
goto fail;
}
- pipe->proto = ep->proto;
+ pipe->proto = ep->proto;
pipe->rcvmax = ep->rcvmax;
- pipe->tpp = tpp;
- pipe->addr = ep->addr;
+ pipe->tpp = tpp;
+ pipe->addr = ep->addr;
*pipep = pipe;
return (0);
@@ -140,7 +135,6 @@ fail:
return (rv);
}
-
static void
nni_tcp_cancel_nego(nni_aio *aio)
{
@@ -154,13 +148,12 @@ nni_tcp_cancel_nego(nni_aio *aio)
nni_mtx_unlock(&pipe->mtx);
}
-
static void
nni_tcp_pipe_nego_cb(void *arg)
{
nni_tcp_pipe *pipe = arg;
- nni_aio *aio = &pipe->negaio;
- int rv;
+ nni_aio * aio = &pipe->negaio;
+ int rv;
nni_mtx_lock(&pipe->mtx);
if ((rv = nni_aio_result(aio)) != 0) {
@@ -175,7 +168,7 @@ nni_tcp_pipe_nego_cb(void *arg)
}
if (pipe->gottxhead < pipe->wanttxhead) {
- aio->a_niov = 1;
+ aio->a_niov = 1;
aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead;
aio->a_iov[0].iov_buf = &pipe->txlen[pipe->gottxhead];
// send it down...
@@ -184,7 +177,7 @@ nni_tcp_pipe_nego_cb(void *arg)
return;
}
if (pipe->gotrxhead < pipe->wantrxhead) {
- aio->a_niov = 1;
+ aio->a_niov = 1;
aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead;
aio->a_iov[0].iov_buf = &pipe->rxlen[pipe->gotrxhead];
nni_plat_tcp_pipe_recv(pipe->tpp, aio);
@@ -193,12 +186,9 @@ nni_tcp_pipe_nego_cb(void *arg)
}
// We have both sent and received the headers. Lets check the
// receive side header.
- if ((pipe->rxlen[0] != 0) ||
- (pipe->rxlen[1] != 'S') ||
- (pipe->rxlen[2] != 'P') ||
- (pipe->rxlen[3] != 0) ||
- (pipe->rxlen[6] != 0) ||
- (pipe->rxlen[7] != 0)) {
+ if ((pipe->rxlen[0] != 0) || (pipe->rxlen[1] != 'S') ||
+ (pipe->rxlen[2] != 'P') || (pipe->rxlen[3] != 0) ||
+ (pipe->rxlen[6] != 0) || (pipe->rxlen[7] != 0)) {
rv = NNG_EPROTO;
goto done;
}
@@ -213,14 +203,13 @@ done:
nni_mtx_unlock(&pipe->mtx);
}
-
static void
nni_tcp_pipe_send_cb(void *arg)
{
nni_tcp_pipe *pipe = arg;
- int rv;
- nni_aio *aio;
- size_t len;
+ int rv;
+ nni_aio * aio;
+ size_t len;
nni_mtx_lock(&pipe->mtx);
if ((aio = pipe->user_txaio) == NULL) {
@@ -240,13 +229,12 @@ nni_tcp_pipe_send_cb(void *arg)
nni_mtx_unlock(&pipe->mtx);
}
-
static void
nni_tcp_pipe_recv_cb(void *arg)
{
nni_tcp_pipe *pipe = arg;
- nni_aio *aio;
- int rv;
+ nni_aio * aio;
+ int rv;
nni_mtx_lock(&pipe->mtx);
@@ -298,7 +286,7 @@ nni_tcp_pipe_recv_cb(void *arg)
// read the entire message now.
pipe->rxaio.a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg);
pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
- pipe->rxaio.a_niov = 1;
+ pipe->rxaio.a_niov = 1;
nni_plat_tcp_pipe_recv(pipe->tpp, &pipe->rxaio);
nni_mtx_unlock(&pipe->mtx);
@@ -308,13 +296,12 @@ nni_tcp_pipe_recv_cb(void *arg)
// Otherwise we got a message read completely. Let the user know the
// good news.
pipe->user_rxaio = NULL;
- aio->a_msg = pipe->rxmsg;
- pipe->rxmsg = NULL;
+ aio->a_msg = pipe->rxmsg;
+ pipe->rxmsg = NULL;
nni_aio_finish(aio, 0, nni_msg_len(aio->a_msg));
nni_mtx_unlock(&pipe->mtx);
}
-
static void
nni_tcp_cancel_tx(nni_aio *aio)
{
@@ -328,13 +315,12 @@ nni_tcp_cancel_tx(nni_aio *aio)
nni_aio_stop(&pipe->txaio);
}
-
static void
nni_tcp_pipe_send(void *arg, nni_aio *aio)
{
nni_tcp_pipe *pipe = arg;
- nni_msg *msg = aio->a_msg;
- uint64_t len;
+ nni_msg * msg = aio->a_msg;
+ uint64_t len;
len = nni_msg_len(msg) + nni_msg_header_len(msg);
@@ -350,18 +336,17 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
NNI_PUT64(pipe->txlen, len);
pipe->txaio.a_iov[0].iov_buf = pipe->txlen;
- pipe->txaio.a_iov[0].iov_len = sizeof (pipe->txlen);
+ pipe->txaio.a_iov[0].iov_len = sizeof(pipe->txlen);
pipe->txaio.a_iov[1].iov_buf = nni_msg_header(msg);
pipe->txaio.a_iov[1].iov_len = nni_msg_header_len(msg);
pipe->txaio.a_iov[2].iov_buf = nni_msg_body(msg);
pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg);
- pipe->txaio.a_niov = 3;
+ pipe->txaio.a_niov = 3;
nni_plat_tcp_pipe_send(pipe->tpp, &pipe->txaio);
nni_mtx_unlock(&pipe->mtx);
}
-
static void
nni_tcp_cancel_rx(nni_aio *aio)
{
@@ -375,7 +360,6 @@ nni_tcp_cancel_rx(nni_aio *aio)
nni_aio_stop(&pipe->rxaio);
}
-
static void
nni_tcp_pipe_recv(void *arg, nni_aio *aio)
{
@@ -393,14 +377,13 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio)
// Schedule a read of the TCP header.
pipe->rxaio.a_iov[0].iov_buf = pipe->rxlen;
- pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxlen);
- pipe->rxaio.a_niov = 1;
+ pipe->rxaio.a_iov[0].iov_len = sizeof(pipe->rxlen);
+ pipe->rxaio.a_niov = 1;
nni_plat_tcp_pipe_recv(pipe->tpp, &pipe->rxaio);
nni_mtx_unlock(&pipe->mtx);
}
-
static uint16_t
nni_tcp_pipe_peer(void *arg)
{
@@ -409,7 +392,6 @@ nni_tcp_pipe_peer(void *arg)
return (pipe->peer);
}
-
static int
nni_tcp_pipe_getopt(void *arg, int option, void *buf, size_t *szp)
{
@@ -433,14 +415,13 @@ nni_tcp_pipe_getopt(void *arg, int option, void *buf, size_t *szp)
return (NNG_ENOTSUP);
}
-
static int
nni_tcp_parse_pair(char *pair, char **hostp, char **servp)
{
char *host, *serv, *end;
if (pair[0] == '[') {
- host = pair+1;
+ host = pair + 1;
// IP address enclosed ... for IPv6 usually.
if ((end = strchr(host, ']')) == NULL) {
return (NNG_EADDRINVAL);
@@ -478,14 +459,13 @@ nni_tcp_parse_pair(char *pair, char **hostp, char **servp)
return (0);
}
-
// Note that the url *must* be in a modifiable buffer.
int
-nni_tcp_parse_url(char *url, char **host1, char **serv1, char **host2,
- char **serv2)
+nni_tcp_parse_url(
+ char *url, char **host1, char **serv1, char **host2, char **serv2)
{
char *h1;
- int rv;
+ int rv;
if (strncmp(url, "tcp://", strlen("tcp://")) != 0) {
return (NNG_EADDRINVAL);
@@ -516,7 +496,6 @@ nni_tcp_parse_url(char *url, char **host1, char **serv1, char **host2,
return (0);
}
-
static void
nni_tcp_pipe_start(void *arg, nni_aio *aio)
{
@@ -530,12 +509,12 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
NNI_PUT16(&pipe->txlen[4], pipe->proto);
NNI_PUT16(&pipe->txlen[6], 0);
- pipe->user_negaio = aio;
- pipe->gotrxhead = 0;
- pipe->gottxhead = 0;
- pipe->wantrxhead = 8;
- pipe->wanttxhead = 8;
- pipe->negaio.a_niov = 1;
+ pipe->user_negaio = aio;
+ pipe->gotrxhead = 0;
+ pipe->gottxhead = 0;
+ pipe->wantrxhead = 8;
+ pipe->wanttxhead = 8;
+ pipe->negaio.a_niov = 1;
pipe->negaio.a_iov[0].iov_len = 8;
pipe->negaio.a_iov[0].iov_buf = &pipe->txlen[0];
if (nni_aio_start(aio, nni_tcp_cancel_nego, pipe) != 0) {
@@ -546,7 +525,6 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
nni_mtx_unlock(&pipe->mtx);
}
-
static void
nni_tcp_ep_fini(void *arg)
{
@@ -560,12 +538,11 @@ nni_tcp_ep_fini(void *arg)
NNI_FREE_STRUCT(ep);
}
-
static int
nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
{
nni_tcp_ep *ep;
- int rv;
+ int rv;
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
@@ -577,15 +554,14 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
return (rv);
}
ep->closed = 0;
- ep->proto = nni_sock_proto(sock);
+ ep->proto = nni_sock_proto(sock);
ep->rcvmax = nni_sock_rcvmaxsz(sock);
- (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url);
+ (void) snprintf(ep->addr, sizeof(ep->addr), "%s", url);
*epp = ep;
return (0);
}
-
static void
nni_tcp_ep_close(void *arg)
{
@@ -594,7 +570,6 @@ nni_tcp_ep_close(void *arg)
nni_plat_tcp_ep_close(ep->tep);
}
-
static int
nni_tcp_ep_bind(void *arg)
{
@@ -603,13 +578,12 @@ nni_tcp_ep_bind(void *arg)
return (nni_plat_tcp_ep_listen(ep->tep));
}
-
static void
nni_tcp_ep_finish(nni_tcp_ep *ep)
{
- nni_aio *aio = ep->user_aio;
+ nni_aio * aio = ep->user_aio;
nni_tcp_pipe *pipe;
- int rv;
+ int rv;
if ((aio = ep->user_aio) == NULL) {
return;
@@ -634,7 +608,6 @@ done:
nni_aio_finish(aio, rv, 0);
}
-
static void
nni_tcp_ep_cb(void *arg)
{
@@ -645,7 +618,6 @@ nni_tcp_ep_cb(void *arg)
nni_mtx_unlock(&ep->mtx);
}
-
static void
nni_tcp_cancel_ep(nni_aio *aio)
{
@@ -659,12 +631,11 @@ nni_tcp_cancel_ep(nni_aio *aio)
nni_mtx_unlock(&ep->mtx);
}
-
static void
nni_tcp_ep_accept(void *arg, nni_aio *aio)
{
nni_tcp_ep *ep = arg;
- int rv;
+ int rv;
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
@@ -681,12 +652,11 @@ nni_tcp_ep_accept(void *arg, nni_aio *aio)
nni_mtx_unlock(&ep->mtx);
}
-
static void
nni_tcp_ep_connect(void *arg, nni_aio *aio)
{
nni_tcp_ep *ep = arg;
- int rv;
+ int rv;
nni_mtx_lock(&ep->mtx);
NNI_ASSERT(ep->user_aio == NULL);
@@ -703,34 +673,33 @@ nni_tcp_ep_connect(void *arg, nni_aio *aio)
nni_mtx_unlock(&ep->mtx);
}
-
static nni_tran_pipe nni_tcp_pipe_ops = {
- .p_fini = nni_tcp_pipe_fini,
- .p_start = nni_tcp_pipe_start,
- .p_send = nni_tcp_pipe_send,
- .p_recv = nni_tcp_pipe_recv,
- .p_close = nni_tcp_pipe_close,
- .p_peer = nni_tcp_pipe_peer,
- .p_getopt = nni_tcp_pipe_getopt,
+ .p_fini = nni_tcp_pipe_fini,
+ .p_start = nni_tcp_pipe_start,
+ .p_send = nni_tcp_pipe_send,
+ .p_recv = nni_tcp_pipe_recv,
+ .p_close = nni_tcp_pipe_close,
+ .p_peer = nni_tcp_pipe_peer,
+ .p_getopt = nni_tcp_pipe_getopt,
};
static nni_tran_ep nni_tcp_ep_ops = {
- .ep_init = nni_tcp_ep_init,
- .ep_fini = nni_tcp_ep_fini,
- .ep_connect = nni_tcp_ep_connect,
- .ep_bind = nni_tcp_ep_bind,
- .ep_accept = nni_tcp_ep_accept,
- .ep_close = nni_tcp_ep_close,
- .ep_setopt = NULL,
- .ep_getopt = NULL,
+ .ep_init = nni_tcp_ep_init,
+ .ep_fini = nni_tcp_ep_fini,
+ .ep_connect = nni_tcp_ep_connect,
+ .ep_bind = nni_tcp_ep_bind,
+ .ep_accept = nni_tcp_ep_accept,
+ .ep_close = nni_tcp_ep_close,
+ .ep_setopt = NULL,
+ .ep_getopt = NULL,
};
// This is the TCP transport linkage, and should be the only global
// symbol in this entire file.
struct nni_tran nni_tcp_tran = {
- .tran_scheme = "tcp",
- .tran_ep = &nni_tcp_ep_ops,
- .tran_pipe = &nni_tcp_pipe_ops,
- .tran_init = nni_tcp_tran_init,
- .tran_fini = nni_tcp_tran_fini,
+ .tran_scheme = "tcp",
+ .tran_ep = &nni_tcp_ep_ops,
+ .tran_pipe = &nni_tcp_pipe_ops,
+ .tran_init = nni_tcp_tran_init,
+ .tran_fini = nni_tcp_tran_fini,
};