aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix/posix_tcpconn.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2019-01-21 22:40:10 -0800
committerGarrett D'Amore <garrett@damore.org>2019-02-16 19:22:27 -0800
commit5cf750697624d4fd63cfe26921209d7c30e1a2d2 (patch)
treebf11695e5f1ec5e400c87da0cc6ff23935a2eeff /src/platform/posix/posix_tcpconn.c
parentca655b9db689ee0e655248b1a9f166b8db6cc984 (diff)
downloadnng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.tar.gz
nng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.tar.bz2
nng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.zip
fixes #872 create unified nng_stream API
This is a major change, and includes changes to use a polymorphic stream API for all transports. There have been related bugs fixed along the way. Additionally the man pages have changed. The old non-polymorphic APIs are removed now. This is a breaking change, but the old APIs were never part of any released public API.
Diffstat (limited to 'src/platform/posix/posix_tcpconn.c')
-rw-r--r--src/platform/posix/posix_tcpconn.c186
1 files changed, 77 insertions, 109 deletions
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c
index ef6ee8e3..0d3c274d 100644
--- a/src/platform/posix/posix_tcpconn.c
+++ b/src/platform/posix/posix_tcpconn.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
@@ -36,7 +36,7 @@
#include "posix_tcp.h"
static void
-tcp_conn_dowrite(nni_tcp_conn *c)
+tcp_dowrite(nni_tcp_conn *c)
{
nni_aio *aio;
int fd;
@@ -118,7 +118,7 @@ tcp_conn_dowrite(nni_tcp_conn *c)
}
static void
-tcp_conn_doread(nni_tcp_conn *c)
+tcp_doread(nni_tcp_conn *c)
{
nni_aio *aio;
int fd;
@@ -195,9 +195,10 @@ tcp_conn_doread(nni_tcp_conn *c)
}
}
-void
-nni_tcp_conn_close(nni_tcp_conn *c)
+static void
+tcp_close(void *arg)
{
+ nni_tcp_conn *c = arg;
nni_mtx_lock(&c->mtx);
if (!c->closed) {
nni_aio *aio;
@@ -212,21 +213,44 @@ nni_tcp_conn_close(nni_tcp_conn *c)
nni_mtx_unlock(&c->mtx);
}
+// tcp_fini may block briefly waiting for the pollq thread.
+// To get that out of our context, we simply reap this.
+static void
+tcp_fini(void *arg)
+{
+ nni_tcp_conn *c = arg;
+ tcp_close(c);
+ nni_posix_pfd_fini(c->pfd);
+ nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN
+ c->pfd = NULL;
+ nni_mtx_unlock(&c->mtx);
+ nni_mtx_fini(&c->mtx);
+
+ NNI_FREE_STRUCT(c);
+}
+
static void
-tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg)
+tcp_free(void *arg)
+{
+ nni_tcp_conn *c = arg;
+ nni_reap(&c->reap, tcp_fini, arg);
+}
+
+static void
+tcp_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_tcp_conn *c = arg;
if (events & (POLLHUP | POLLERR | POLLNVAL)) {
- nni_tcp_conn_close(c);
+ tcp_close(c);
return;
}
nni_mtx_lock(&c->mtx);
if (events & POLLIN) {
- tcp_conn_doread(c);
+ tcp_doread(c);
}
if (events & POLLOUT) {
- tcp_conn_dowrite(c);
+ tcp_dowrite(c);
}
events = 0;
if (!nni_list_empty(&c->writeq)) {
@@ -242,7 +266,7 @@ tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg)
}
static void
-tcp_conn_cancel(nni_aio *aio, void *arg, int rv)
+tcp_cancel(nni_aio *aio, void *arg, int rv)
{
nni_tcp_conn *c = arg;
@@ -254,18 +278,18 @@ tcp_conn_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&c->mtx);
}
-void
-nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio)
+static void
+tcp_send(void *arg, nni_aio *aio)
{
-
- int rv;
+ nni_tcp_conn *c = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&c->mtx);
- if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) {
+ if ((rv = nni_aio_schedule(aio, tcp_cancel, c)) != 0) {
nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -273,7 +297,7 @@ nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio)
nni_aio_list_append(&c->writeq, aio);
if (nni_list_first(&c->writeq) == aio) {
- tcp_conn_dowrite(c);
+ tcp_dowrite(c);
// If we are still the first thing on the list, that
// means we didn't finish the job, so arm the poller to
// complete us.
@@ -284,17 +308,18 @@ nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio)
nni_mtx_unlock(&c->mtx);
}
-void
-nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio)
+static void
+tcp_recv(void *arg, nni_aio *aio)
{
- int rv;
+ nni_tcp_conn *c = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&c->mtx);
- if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) {
+ if ((rv = nni_aio_schedule(aio, tcp_cancel, c)) != 0) {
nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -306,7 +331,7 @@ nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio)
// many cases. We also need not arm a list if it was already
// armed.
if (nni_list_first(&c->readq) == aio) {
- tcp_conn_doread(c);
+ tcp_doread(c);
// If we are still the first thing on the list, that
// means we didn't finish the job, so arm the poller to
// complete us.
@@ -317,59 +342,8 @@ nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio)
nni_mtx_unlock(&c->mtx);
}
-int
-nni_tcp_conn_peername(nni_tcp_conn *c, nni_sockaddr *sa)
-{
- struct sockaddr_storage ss;
- socklen_t sslen = sizeof(ss);
- int fd = nni_posix_pfd_fd(c->pfd);
-
- if (getpeername(fd, (void *) &ss, &sslen) != 0) {
- return (nni_plat_errno(errno));
- }
- return (nni_posix_sockaddr2nn(sa, &ss));
-}
-
-int
-nni_tcp_conn_sockname(nni_tcp_conn *c, nni_sockaddr *sa)
-{
- struct sockaddr_storage ss;
- socklen_t sslen = sizeof(ss);
- int fd = nni_posix_pfd_fd(c->pfd);
-
- if (getsockname(fd, (void *) &ss, &sslen) != 0) {
- return (nni_plat_errno(errno));
- }
- return (nni_posix_sockaddr2nn(sa, &ss));
-}
-
-int
-nni_tcp_conn_set_keepalive(nni_tcp_conn *c, bool keep)
-{
- int val = keep ? 1 : 0;
- int fd = nni_posix_pfd_fd(c->pfd);
-
- if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) {
- return (nni_plat_errno(errno));
- }
- return (0);
-}
-
-int
-nni_tcp_conn_set_nodelay(nni_tcp_conn *c, bool nodelay)
-{
-
- int val = nodelay ? 1 : 0;
- int fd = nni_posix_pfd_fd(c->pfd);
-
- if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) {
- return (nni_plat_errno(errno));
- }
- return (0);
-}
-
static int
-tcp_conn_get_peername(void *arg, void *buf, size_t *szp, nni_type t)
+tcp_get_peername(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn * c = arg;
struct sockaddr_storage ss;
@@ -388,7 +362,7 @@ tcp_conn_get_peername(void *arg, void *buf, size_t *szp, nni_type t)
}
static int
-tcp_conn_get_sockname(void *arg, void *buf, size_t *szp, nni_type t)
+tcp_get_sockname(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn * c = arg;
struct sockaddr_storage ss;
@@ -407,7 +381,7 @@ tcp_conn_get_sockname(void *arg, void *buf, size_t *szp, nni_type t)
}
static int
-tcp_conn_set_nodelay(void *arg, const void *buf, size_t sz, nni_type t)
+tcp_set_nodelay(void *arg, const void *buf, size_t sz, nni_type t)
{
nni_tcp_conn *c = arg;
int fd;
@@ -427,7 +401,7 @@ tcp_conn_set_nodelay(void *arg, const void *buf, size_t sz, nni_type t)
}
static int
-tcp_conn_set_keepalive(void *arg, const void *buf, size_t sz, nni_type t)
+tcp_set_keepalive(void *arg, const void *buf, size_t sz, nni_type t)
{
nni_tcp_conn *c = arg;
int fd;
@@ -447,7 +421,7 @@ tcp_conn_set_keepalive(void *arg, const void *buf, size_t sz, nni_type t)
}
static int
-tcp_conn_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t)
+tcp_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn *c = arg;
int fd = nni_posix_pfd_fd(c->pfd);
@@ -462,7 +436,7 @@ tcp_conn_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t)
}
static int
-tcp_conn_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t)
+tcp_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn *c = arg;
int fd = nni_posix_pfd_fd(c->pfd);
@@ -476,46 +450,46 @@ tcp_conn_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t)
return (nni_copyout_bool(val, buf, szp, t));
}
-static const nni_option tcp_conn_options[] = {
+static const nni_option tcp_options[] = {
{
.o_name = NNG_OPT_REMADDR,
- .o_get = tcp_conn_get_peername,
+ .o_get = tcp_get_peername,
},
{
.o_name = NNG_OPT_LOCADDR,
- .o_get = tcp_conn_get_sockname,
+ .o_get = tcp_get_sockname,
},
{
.o_name = NNG_OPT_TCP_NODELAY,
- .o_get = tcp_conn_get_nodelay,
- .o_set = tcp_conn_set_nodelay,
+ .o_get = tcp_get_nodelay,
+ .o_set = tcp_set_nodelay,
},
{
.o_name = NNG_OPT_TCP_KEEPALIVE,
- .o_get = tcp_conn_get_keepalive,
- .o_set = tcp_conn_set_keepalive,
+ .o_get = tcp_get_keepalive,
+ .o_set = tcp_set_keepalive,
},
{
.o_name = NULL,
},
};
-int
-nni_tcp_conn_getopt(
- nni_tcp_conn *c, const char *name, void *buf, size_t *szp, nni_type t)
+static int
+tcp_getx(void *arg, const char *name, void *buf, size_t *szp, nni_type t)
{
- return (nni_getopt(tcp_conn_options, name, c, buf, szp, t));
+ nni_tcp_conn *c = arg;
+ return (nni_getopt(tcp_options, name, c, buf, szp, t));
}
-int
-nni_tcp_conn_setopt(
- nni_tcp_conn *c, const char *name, const void *buf, size_t sz, nni_type t)
+static int
+tcp_setx(void *arg, const char *name, const void *buf, size_t sz, nni_type t)
{
- return (nni_setopt(tcp_conn_options, name, c, buf, sz, t));
+ nni_tcp_conn *c = arg;
+ return (nni_setopt(tcp_options, name, c, buf, sz, t));
}
int
-nni_posix_tcp_conn_init(nni_tcp_conn **cp, nni_posix_pfd *pfd)
+nni_posix_tcp_init(nni_tcp_conn **cp, nni_posix_pfd *pfd)
{
nni_tcp_conn *c;
@@ -530,12 +504,19 @@ nni_posix_tcp_conn_init(nni_tcp_conn **cp, nni_posix_pfd *pfd)
nni_aio_list_init(&c->readq);
nni_aio_list_init(&c->writeq);
+ c->stream.s_free = tcp_free;
+ c->stream.s_close = tcp_close;
+ c->stream.s_recv = tcp_recv;
+ c->stream.s_send = tcp_send;
+ c->stream.s_getx = tcp_getx;
+ c->stream.s_setx = tcp_setx;
+
*cp = c;
return (0);
}
void
-nni_posix_tcp_conn_start(nni_tcp_conn *c, int nodelay, int keepalive)
+nni_posix_tcp_start(nni_tcp_conn *c, int nodelay, int keepalive)
{
// Configure the initial socket options.
(void) setsockopt(nni_posix_pfd_fd(c->pfd), IPPROTO_TCP, TCP_NODELAY,
@@ -543,18 +524,5 @@ nni_posix_tcp_conn_start(nni_tcp_conn *c, int nodelay, int keepalive)
(void) setsockopt(nni_posix_pfd_fd(c->pfd), SOL_SOCKET, SO_KEEPALIVE,
&keepalive, sizeof(int));
- nni_posix_pfd_set_cb(c->pfd, tcp_conn_cb, c);
-}
-
-void
-nni_tcp_conn_fini(nni_tcp_conn *c)
-{
- nni_tcp_conn_close(c);
- nni_posix_pfd_fini(c->pfd);
- nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN
- c->pfd = NULL;
- nni_mtx_unlock(&c->mtx);
- nni_mtx_fini(&c->mtx);
-
- NNI_FREE_STRUCT(c);
+ nni_posix_pfd_set_cb(c->pfd, tcp_cb, c);
}