aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2019-01-21 22:40:10 -0800
committerGarrett D'Amore <garrett@damore.org>2019-02-16 19:22:27 -0800
commit5cf750697624d4fd63cfe26921209d7c30e1a2d2 (patch)
treebf11695e5f1ec5e400c87da0cc6ff23935a2eeff /src/platform/posix
parentca655b9db689ee0e655248b1a9f166b8db6cc984 (diff)
downloadnng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.tar.gz
nng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.tar.bz2
nng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.zip
fixes #872 create unified nng_stream API
This is a major change, and includes changes to use a polymorphic stream API for all transports. There have been related bugs fixed along the way. Additionally the man pages have changed. The old non-polymorphic APIs are removed now. This is a breaking change, but the old APIs were never part of any released public API.
Diffstat (limited to 'src/platform/posix')
-rw-r--r--src/platform/posix/posix_ipc.h28
-rw-r--r--src/platform/posix/posix_ipcconn.c197
-rw-r--r--src/platform/posix/posix_ipcdial.c100
-rw-r--r--src/platform/posix/posix_ipclisten.c173
-rw-r--r--src/platform/posix/posix_resolv_gai.c7
-rw-r--r--src/platform/posix/posix_tcp.h28
-rw-r--r--src/platform/posix/posix_tcpconn.c186
-rw-r--r--src/platform/posix/posix_tcpdial.c35
-rw-r--r--src/platform/posix/posix_tcplisten.c14
9 files changed, 410 insertions, 358 deletions
diff --git a/src/platform/posix/posix_ipc.h b/src/platform/posix/posix_ipc.h
index bbe11f0d..f570b172 100644
--- a/src/platform/posix/posix_ipc.h
+++ b/src/platform/posix/posix_ipc.h
@@ -1,7 +1,7 @@
//
// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -10,6 +10,7 @@
//
#include "core/nng_impl.h"
+#include "core/stream.h"
#ifdef NNG_PLATFORM_POSIX
#include "platform/posix/posix_aio.h"
@@ -17,6 +18,7 @@
#include <sys/types.h> // For mode_t
struct nni_ipc_conn {
+ nng_stream stream;
nni_posix_pfd * pfd;
nni_list readq;
nni_list writeq;
@@ -24,27 +26,17 @@ struct nni_ipc_conn {
nni_mtx mtx;
nni_aio * dial_aio;
nni_ipc_dialer *dialer;
- nni_reap_item reap;
};
struct nni_ipc_dialer {
- nni_list connq; // pending connections
- bool closed;
- nni_mtx mtx;
+ nng_stream_dialer sd;
+ nni_list connq; // pending connections
+ bool closed;
+ nni_mtx mtx;
+ nng_sockaddr sa;
};
-struct nni_ipc_listener {
- nni_posix_pfd *pfd;
- nng_sockaddr sa;
- nni_list acceptq;
- bool started;
- bool closed;
- char * path;
- mode_t perms;
- nni_mtx mtx;
-};
-
-extern int nni_posix_ipc_conn_init(nni_ipc_conn **, nni_posix_pfd *);
-extern void nni_posix_ipc_conn_start(nni_ipc_conn *);
+extern int nni_posix_ipc_init(nni_ipc_conn **, nni_posix_pfd *);
+extern void nni_posix_ipc_start(nni_ipc_conn *);
#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c
index 48bd75a4..07ec6213 100644
--- a/src/platform/posix/posix_ipcconn.c
+++ b/src/platform/posix/posix_ipcconn.c
@@ -1,7 +1,7 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -39,8 +39,10 @@
#include "posix_ipc.h"
+typedef struct nni_ipc_conn ipc_conn;
+
static void
-ipc_conn_dowrite(nni_ipc_conn *c)
+ipc_dowrite(ipc_conn *c)
{
nni_aio *aio;
int fd;
@@ -122,7 +124,7 @@ ipc_conn_dowrite(nni_ipc_conn *c)
}
static void
-ipc_conn_doread(nni_ipc_conn *c)
+ipc_doread(ipc_conn *c)
{
nni_aio *aio;
int fd;
@@ -199,9 +201,10 @@ ipc_conn_doread(nni_ipc_conn *c)
}
}
-void
-nni_ipc_conn_close(nni_ipc_conn *c)
+static void
+ipc_close(void *arg)
{
+ ipc_conn *c = arg;
nni_mtx_lock(&c->mtx);
if (!c->closed) {
nni_aio *aio;
@@ -217,20 +220,20 @@ nni_ipc_conn_close(nni_ipc_conn *c)
}
static void
-ipc_conn_cb(nni_posix_pfd *pfd, int events, void *arg)
+ipc_cb(nni_posix_pfd *pfd, int events, void *arg)
{
- nni_ipc_conn *c = arg;
+ ipc_conn *c = arg;
if (events & (POLLHUP | POLLERR | POLLNVAL)) {
- nni_ipc_conn_close(c);
+ ipc_close(c);
return;
}
nni_mtx_lock(&c->mtx);
if (events & POLLIN) {
- ipc_conn_doread(c);
+ ipc_doread(c);
}
if (events & POLLOUT) {
- ipc_conn_dowrite(c);
+ ipc_dowrite(c);
}
events = 0;
if (!nni_list_empty(&c->writeq)) {
@@ -246,9 +249,9 @@ ipc_conn_cb(nni_posix_pfd *pfd, int events, void *arg)
}
static void
-ipc_conn_cancel(nni_aio *aio, void *arg, int rv)
+ipc_cancel(nni_aio *aio, void *arg, int rv)
{
- nni_ipc_conn *c = arg;
+ ipc_conn *c = arg;
nni_mtx_lock(&c->mtx);
if (nni_aio_list_active(aio)) {
@@ -258,18 +261,18 @@ ipc_conn_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&c->mtx);
}
-void
-nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio)
+static void
+ipc_send(void *arg, nni_aio *aio)
{
-
- int rv;
+ ipc_conn *c = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&c->mtx);
- if ((rv = nni_aio_schedule(aio, ipc_conn_cancel, c)) != 0) {
+ if ((rv = nni_aio_schedule(aio, ipc_cancel, c)) != 0) {
nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -277,7 +280,7 @@ nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio)
nni_aio_list_append(&c->writeq, aio);
if (nni_list_first(&c->writeq) == aio) {
- ipc_conn_dowrite(c);
+ ipc_dowrite(c);
// If we are still the first thing on the list, that
// means we didn't finish the job, so arm the poller to
// complete us.
@@ -288,17 +291,18 @@ nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio)
nni_mtx_unlock(&c->mtx);
}
-void
-nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio)
+static void
+ipc_recv(void *arg, nni_aio *aio)
{
- int rv;
+ ipc_conn *c = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&c->mtx);
- if ((rv = nni_aio_schedule(aio, ipc_conn_cancel, c)) != 0) {
+ if ((rv = nni_aio_schedule(aio, ipc_cancel, c)) != 0) {
nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -310,7 +314,7 @@ nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio)
// many cases. We also need not arm a list if it was already
// armed.
if (nni_list_first(&c->readq) == aio) {
- ipc_conn_doread(c);
+ ipc_doread(c);
// If we are still the first thing on the list, that
// means we didn't finish the job, so arm the poller to
// complete us.
@@ -322,8 +326,8 @@ nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio)
}
static int
-ipc_conn_peerid(nni_ipc_conn *c, uint64_t *euid, uint64_t *egid,
- uint64_t *prid, uint64_t *znid)
+ipc_peerid(ipc_conn *c, uint64_t *euid, uint64_t *egid, uint64_t *prid,
+ uint64_t *znid)
{
int fd = nni_posix_pfd_fd(c->pfd);
#if defined(NNG_HAVE_GETPEEREID)
@@ -403,43 +407,43 @@ ipc_conn_peerid(nni_ipc_conn *c, uint64_t *euid, uint64_t *egid,
#endif
}
-int
-ipc_conn_get_peer_uid(void *arg, void *buf, size_t *szp, nni_type t)
+static int
+ipc_get_peer_uid(void *arg, void *buf, size_t *szp, nni_type t)
{
- nni_ipc_conn *c = arg;
- int rv;
- uint64_t ignore;
- uint64_t id;
+ ipc_conn *c = arg;
+ int rv;
+ uint64_t ignore;
+ uint64_t id;
- if ((rv = ipc_conn_peerid(c, &id, &ignore, &ignore, &ignore)) != 0) {
+ if ((rv = ipc_peerid(c, &id, &ignore, &ignore, &ignore)) != 0) {
return (rv);
}
return (nni_copyout_u64(id, buf, szp, t));
}
static int
-ipc_conn_get_peer_gid(void *arg, void *buf, size_t *szp, nni_type t)
+ipc_get_peer_gid(void *arg, void *buf, size_t *szp, nni_type t)
{
- nni_ipc_conn *c = arg;
- int rv;
- uint64_t ignore;
- uint64_t id;
+ ipc_conn *c = arg;
+ int rv;
+ uint64_t ignore;
+ uint64_t id;
- if ((rv = ipc_conn_peerid(c, &ignore, &id, &ignore, &ignore)) != 0) {
+ if ((rv = ipc_peerid(c, &ignore, &id, &ignore, &ignore)) != 0) {
return (rv);
}
return (nni_copyout_u64(id, buf, szp, t));
}
static int
-ipc_conn_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_type t)
+ipc_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_type t)
{
- nni_ipc_conn *c = arg;
- int rv;
- uint64_t ignore;
- uint64_t id;
+ ipc_conn *c = arg;
+ int rv;
+ uint64_t ignore;
+ uint64_t id;
- if ((rv = ipc_conn_peerid(c, &ignore, &ignore, &ignore, &id)) != 0) {
+ if ((rv = ipc_peerid(c, &ignore, &ignore, &ignore, &id)) != 0) {
return (rv);
}
if (id == (uint64_t) -1) {
@@ -450,14 +454,14 @@ ipc_conn_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_type t)
}
static int
-ipc_conn_get_peer_pid(void *arg, void *buf, size_t *szp, nni_type t)
+ipc_get_peer_pid(void *arg, void *buf, size_t *szp, nni_type t)
{
- nni_ipc_conn *c = arg;
- int rv;
- uint64_t ignore;
- uint64_t id;
+ ipc_conn *c = arg;
+ int rv;
+ uint64_t ignore;
+ uint64_t id;
- if ((rv = ipc_conn_peerid(c, &ignore, &ignore, &id, &ignore)) != 0) {
+ if ((rv = ipc_peerid(c, &ignore, &ignore, &id, &ignore)) != 0) {
return (rv);
}
if (id == (uint64_t) -1) {
@@ -468,9 +472,9 @@ ipc_conn_get_peer_pid(void *arg, void *buf, size_t *szp, nni_type t)
}
static int
-ipc_conn_get_addr(void *arg, void *buf, size_t *szp, nni_type t)
+ipc_get_addr(void *arg, void *buf, size_t *szp, nni_type t)
{
- nni_ipc_conn * c = arg;
+ ipc_conn * c = arg;
nni_sockaddr sa;
struct sockaddr_storage ss;
socklen_t sslen = sizeof(ss);
@@ -486,36 +490,17 @@ ipc_conn_get_addr(void *arg, void *buf, size_t *szp, nni_type t)
return (nni_copyout_sockaddr(&sa, buf, szp, t));
}
-int
-nni_posix_ipc_conn_init(nni_ipc_conn **cp, nni_posix_pfd *pfd)
-{
- nni_ipc_conn *c;
-
- if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- c->closed = false;
- c->pfd = pfd;
-
- nni_mtx_init(&c->mtx);
- nni_aio_list_init(&c->readq);
- nni_aio_list_init(&c->writeq);
-
- *cp = c;
- return (0);
-}
-
void
-nni_posix_ipc_conn_start(nni_ipc_conn *c)
+nni_posix_ipc_start(nni_ipc_conn *c)
{
- nni_posix_pfd_set_cb(c->pfd, ipc_conn_cb, c);
+ nni_posix_pfd_set_cb(c->pfd, ipc_cb, c);
}
-void
-nni_ipc_conn_fini(nni_ipc_conn *c)
+static void
+ipc_free(void *arg)
{
- nni_ipc_conn_close(c);
+ ipc_conn *c = arg;
+ ipc_close(c);
nni_posix_pfd_fini(c->pfd);
nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN
c->pfd = NULL;
@@ -525,46 +510,72 @@ nni_ipc_conn_fini(nni_ipc_conn *c)
NNI_FREE_STRUCT(c);
}
-static const nni_option ipc_conn_options[] = {
+static const nni_option ipc_options[] = {
{
.o_name = NNG_OPT_LOCADDR,
- .o_get = ipc_conn_get_addr,
+ .o_get = ipc_get_addr,
},
{
.o_name = NNG_OPT_REMADDR,
- .o_get = ipc_conn_get_addr,
+ .o_get = ipc_get_addr,
},
{
.o_name = NNG_OPT_IPC_PEER_PID,
- .o_get = ipc_conn_get_peer_pid,
+ .o_get = ipc_get_peer_pid,
},
{
.o_name = NNG_OPT_IPC_PEER_UID,
- .o_get = ipc_conn_get_peer_uid,
+ .o_get = ipc_get_peer_uid,
},
{
.o_name = NNG_OPT_IPC_PEER_GID,
- .o_get = ipc_conn_get_peer_gid,
+ .o_get = ipc_get_peer_gid,
},
{
.o_name = NNG_OPT_IPC_PEER_ZONEID,
- .o_get = ipc_conn_get_peer_zoneid,
+ .o_get = ipc_get_peer_zoneid,
},
{
.o_name = NULL,
},
};
-int
-nni_ipc_conn_getopt(
- nni_ipc_conn *c, const char *name, void *val, size_t *szp, nni_type t)
+static int
+ipc_getx(void *arg, const char *name, void *val, size_t *szp, nni_type t)
{
- return (nni_getopt(ipc_conn_options, name, c, val, szp, t));
+ ipc_conn *c = arg;
+ return (nni_getopt(ipc_options, name, c, val, szp, t));
+}
+
+static int
+ipc_setx(void *arg, const char *name, const void *val, size_t sz, nni_type t)
+{
+ ipc_conn *c = arg;
+ return (nni_setopt(ipc_options, name, c, val, sz, t));
}
int
-nni_ipc_conn_setopt(
- nni_ipc_conn *c, const char *name, const void *val, size_t sz, nni_type t)
+nni_posix_ipc_init(nni_ipc_conn **cp, nni_posix_pfd *pfd)
{
- return (nni_setopt(ipc_conn_options, name, c, val, sz, t));
-} \ No newline at end of file
+ ipc_conn *c;
+
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ c->closed = false;
+ c->pfd = pfd;
+ c->stream.s_free = ipc_free;
+ c->stream.s_close = ipc_close;
+ c->stream.s_send = ipc_send;
+ c->stream.s_recv = ipc_recv;
+ c->stream.s_getx = ipc_getx;
+ c->stream.s_setx = ipc_setx;
+
+ nni_mtx_init(&c->mtx);
+ nni_aio_list_init(&c->readq);
+ nni_aio_list_init(&c->writeq);
+
+ *cp = c;
+ return (0);
+}
diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c
index d3dc2109..3182b390 100644
--- a/src/platform/posix/posix_ipcdial.c
+++ b/src/platform/posix/posix_ipcdial.c
@@ -1,7 +1,7 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -29,25 +29,13 @@
#include "posix_ipc.h"
-// Dialer stuff.
-int
-nni_ipc_dialer_init(nni_ipc_dialer **dp)
-{
- nni_ipc_dialer *d;
+typedef struct nni_ipc_dialer ipc_dialer;
- if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
- return (NNG_ENOMEM);
- }
- nni_mtx_init(&d->mtx);
- d->closed = false;
- nni_aio_list_init(&d->connq);
- *dp = d;
- return (0);
-}
-
-void
-nni_ipc_dialer_close(nni_ipc_dialer *d)
+// Dialer stuff.
+static void
+ipc_dialer_close(void *arg)
{
+ ipc_dialer *d = arg;
nni_mtx_lock(&d->mtx);
if (!d->closed) {
nni_aio *aio;
@@ -58,9 +46,8 @@ nni_ipc_dialer_close(nni_ipc_dialer *d)
if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
c->dial_aio = NULL;
nni_aio_set_prov_extra(aio, 0, NULL);
- nni_ipc_conn_close(c);
- nni_reap(
- &c->reap, (nni_cb) nni_ipc_conn_fini, c);
+ nng_stream_close(&c->stream);
+ nng_stream_free(&c->stream);
}
nni_aio_finish_error(aio, NNG_ECLOSED);
}
@@ -68,10 +55,11 @@ nni_ipc_dialer_close(nni_ipc_dialer *d)
nni_mtx_unlock(&d->mtx);
}
-void
-nni_ipc_dialer_fini(nni_ipc_dialer *d)
+static void
+ipc_dialer_free(void *arg)
{
- nni_ipc_dialer_close(d);
+ ipc_dialer *d = arg;
+ ipc_dialer_close(d);
nni_mtx_fini(&d->mtx);
NNI_FREE_STRUCT(d);
}
@@ -94,7 +82,7 @@ ipc_dialer_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&d->mtx);
nni_aio_finish_error(aio, rv);
- nni_ipc_conn_fini(c);
+ nng_stream_free(&c->stream);
}
static void
@@ -137,13 +125,13 @@ ipc_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg)
nni_mtx_unlock(&d->mtx);
if (rv != 0) {
- nni_ipc_conn_close(c);
- nni_ipc_conn_fini(c);
+ nng_stream_close(&c->stream);
+ nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
return;
}
- nni_posix_ipc_conn_start(c);
+ nni_posix_ipc_start(c);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
}
@@ -151,8 +139,9 @@ ipc_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg)
// We don't give local address binding support. Outbound dialers always
// get an ephemeral port.
void
-nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
+ipc_dialer_dial(void *arg, nni_aio *aio)
{
+ ipc_dialer * d = arg;
nni_ipc_conn * c;
nni_posix_pfd * pfd = NULL;
struct sockaddr_storage ss;
@@ -164,7 +153,7 @@ nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
return;
}
- if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
+ if (((sslen = nni_posix_nn2sockaddr(&ss, &d->sa)) == 0) ||
(ss.ss_family != AF_UNIX)) {
nni_aio_finish_error(aio, NNG_EADDRINVAL);
return;
@@ -182,7 +171,7 @@ nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
nni_aio_finish_error(aio, rv);
return;
}
- if ((rv = nni_posix_ipc_conn_init(&c, pfd)) != 0) {
+ if ((rv = nni_posix_ipc_init(&c, pfd)) != 0) {
nni_posix_pfd_fini(pfd);
nni_aio_finish_error(aio, rv);
return;
@@ -222,7 +211,7 @@ nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
// on loopback, and probably not on every platform.
nni_aio_set_prov_extra(aio, 0, NULL);
nni_mtx_unlock(&d->mtx);
- nni_posix_ipc_conn_start(c);
+ nni_posix_ipc_start(c);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
return;
@@ -230,7 +219,7 @@ nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
error:
nni_aio_set_prov_extra(aio, 0, NULL);
nni_mtx_unlock(&d->mtx);
- nni_reap(&c->reap, (nni_cb) nni_ipc_conn_fini, c);
+ nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
}
@@ -241,15 +230,44 @@ static const nni_option ipc_dialer_options[] = {
};
int
-nni_ipc_dialer_getopt(
- nni_ipc_dialer *d, const char *name, void *buf, size_t *szp, nni_type t)
+ipc_dialer_getx(void *arg, const char *nm, void *buf, size_t *szp, nni_type t)
+{
+ ipc_dialer *d = arg;
+ return (nni_getopt(ipc_dialer_options, nm, d, buf, szp, t));
+}
+
+int
+ipc_dialer_setx(
+ void *arg, const char *nm, const void *buf, size_t sz, nni_type t)
{
- return (nni_getopt(ipc_dialer_options, name, d, buf, szp, t));
+ ipc_dialer *d = arg;
+ return (nni_setopt(ipc_dialer_options, nm, d, buf, sz, t));
}
int
-nni_ipc_dialer_setopt(nni_ipc_dialer *d, const char *name, const void *buf,
- size_t sz, nni_type t)
+nni_ipc_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
{
- return (nni_setopt(ipc_dialer_options, name, d, buf, sz, t));
-} \ No newline at end of file
+ ipc_dialer *d;
+
+ if ((strcmp(url->u_scheme, "ipc") != 0) || (url->u_path == NULL) ||
+ (strlen(url->u_path) == 0) ||
+ (strlen(url->u_path) >= NNG_MAXADDRLEN)) {
+ return (NNG_EADDRINVAL);
+ }
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&d->mtx);
+ nni_aio_list_init(&d->connq);
+ d->closed = false;
+ d->sa.s_ipc.sa_family = NNG_AF_IPC;
+ strcpy(d->sa.s_ipc.sa_path, url->u_path);
+ d->sd.sd_free = ipc_dialer_free;
+ d->sd.sd_close = ipc_dialer_close;
+ d->sd.sd_dial = ipc_dialer_dial;
+ d->sd.sd_getx = ipc_dialer_getx;
+ d->sd.sd_setx = ipc_dialer_setx;
+
+ *dp = (void *) d;
+ return (0);
+}
diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c
index 11b56ab0..2b5d0edd 100644
--- a/src/platform/posix/posix_ipclisten.c
+++ b/src/platform/posix/posix_ipclisten.c
@@ -1,7 +1,7 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -29,28 +29,20 @@
#include "posix_ipc.h"
-int
-nni_ipc_listener_init(nni_ipc_listener **lp)
-{
- nni_ipc_listener *l;
- if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- nni_mtx_init(&l->mtx);
-
- l->pfd = NULL;
- l->closed = false;
- l->started = false;
- l->perms = 0;
-
- nni_aio_list_init(&l->acceptq);
- *lp = l;
- return (0);
-}
+typedef struct {
+ nng_stream_listener sl;
+ nni_posix_pfd * pfd;
+ nng_sockaddr sa;
+ nni_list acceptq;
+ bool started;
+ bool closed;
+ char * path;
+ mode_t perms;
+ nni_mtx mtx;
+} ipc_listener;
static void
-ipc_listener_doclose(nni_ipc_listener *l)
+ipc_listener_doclose(ipc_listener *l)
{
nni_aio *aio;
char * path;
@@ -71,16 +63,17 @@ ipc_listener_doclose(nni_ipc_listener *l)
}
}
-void
-nni_ipc_listener_close(nni_ipc_listener *l)
+static void
+ipc_listener_close(void *arg)
{
+ ipc_listener *l = arg;
nni_mtx_lock(&l->mtx);
ipc_listener_doclose(l);
nni_mtx_unlock(&l->mtx);
}
static void
-ipc_listener_doaccept(nni_ipc_listener *l)
+ipc_listener_doaccept(ipc_listener *l)
{
nni_aio *aio;
@@ -138,7 +131,7 @@ ipc_listener_doaccept(nni_ipc_listener *l)
continue;
}
- if ((rv = nni_posix_ipc_conn_init(&c, pfd)) != 0) {
+ if ((rv = nni_posix_ipc_init(&c, pfd)) != 0) {
nni_posix_pfd_fini(pfd);
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -146,7 +139,7 @@ ipc_listener_doaccept(nni_ipc_listener *l)
}
nni_aio_list_remove(aio);
- nni_posix_ipc_conn_start(c);
+ nni_posix_ipc_start(c);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
}
@@ -155,7 +148,7 @@ ipc_listener_doaccept(nni_ipc_listener *l)
static void
ipc_listener_cb(nni_posix_pfd *pfd, int events, void *arg)
{
- nni_ipc_listener *l = arg;
+ ipc_listener *l = arg;
NNI_ARG_UNUSED(pfd);
nni_mtx_lock(&l->mtx);
@@ -173,7 +166,7 @@ ipc_listener_cb(nni_posix_pfd *pfd, int events, void *arg)
static void
ipc_listener_cancel(nni_aio *aio, void *arg, int rv)
{
- nni_ipc_listener *l = arg;
+ ipc_listener *l = arg;
// This is dead easy, because we'll ignore the completion if there
// isn't anything to do the accept on!
@@ -222,16 +215,16 @@ ipc_remove_stale(const char *path)
static int
ipc_listener_get_addr(void *arg, void *buf, size_t *szp, nni_type t)
{
- nni_ipc_listener *l = arg;
+ ipc_listener *l = arg;
return (nni_copyout_sockaddr(&l->sa, buf, szp, t));
}
static int
ipc_listener_set_perms(void *arg, const void *buf, size_t sz, nni_type t)
{
- nni_ipc_listener *l = arg;
- int mode;
- int rv;
+ ipc_listener *l = arg;
+ int mode;
+ int rv;
if ((rv = nni_copyin_int(&mode, buf, sz, 0, S_IFMT, t)) != 0) {
return (rv);
@@ -239,16 +232,14 @@ ipc_listener_set_perms(void *arg, const void *buf, size_t sz, nni_type t)
if ((mode & S_IFMT) != 0) {
return (NNG_EINVAL);
}
- if (l != NULL) {
- mode |= S_IFSOCK; // set IFSOCK to ensure non-zero
- nni_mtx_lock(&l->mtx);
- if (l->started) {
- nni_mtx_unlock(&l->mtx);
- return (NNG_EBUSY);
- }
- l->perms = mode;
+ mode |= S_IFSOCK; // set IFSOCK to ensure non-zero
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
nni_mtx_unlock(&l->mtx);
+ return (NNG_EBUSY);
}
+ l->perms = mode;
+ nni_mtx_unlock(&l->mtx);
return (0);
}
@@ -266,23 +257,26 @@ static const nni_option ipc_listener_options[] = {
},
};
-int
-nni_ipc_listener_getopt(
- nni_ipc_listener *l, const char *name, void *buf, size_t *szp, nni_type t)
+static int
+ipc_listener_getx(
+ void *arg, const char *name, void *buf, size_t *szp, nni_type t)
{
+ ipc_listener *l = arg;
return (nni_getopt(ipc_listener_options, name, l, buf, szp, t));
}
-int
-nni_ipc_listener_setopt(nni_ipc_listener *l, const char *name, const void *buf,
- size_t sz, nni_type t)
+static int
+ipc_listener_setx(
+ void *arg, const char *name, const void *buf, size_t sz, nni_type t)
{
+ ipc_listener *l = arg;
return (nni_setopt(ipc_listener_options, name, l, buf, sz, t));
}
int
-nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
+ipc_listener_listen(void *arg)
{
+ ipc_listener * l = arg;
socklen_t len;
struct sockaddr_storage ss;
int rv;
@@ -290,7 +284,7 @@ nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
nni_posix_pfd * pfd;
char * path;
- if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
+ if (((len = nni_posix_nn2sockaddr(&ss, &l->sa)) == 0) ||
(ss.ss_family != AF_UNIX)) {
return (NNG_EADDRINVAL);
}
@@ -304,7 +298,7 @@ nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
nni_mtx_unlock(&l->mtx);
return (NNG_ECLOSED);
}
- path = nni_strdup(sa->s_ipc.sa_path);
+ path = nni_strdup(l->sa.s_ipc.sa_path);
if (path == NULL) {
return (NNG_ENOMEM);
}
@@ -352,15 +346,15 @@ nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
l->pfd = pfd;
l->started = true;
l->path = path;
- l->sa = *sa;
nni_mtx_unlock(&l->mtx);
return (0);
}
-void
-nni_ipc_listener_fini(nni_ipc_listener *l)
+static void
+ipc_listener_free(void *arg)
{
+ ipc_listener * l = arg;
nni_posix_pfd *pfd;
nni_mtx_lock(&l->mtx);
@@ -375,10 +369,11 @@ nni_ipc_listener_fini(nni_ipc_listener *l)
NNI_FREE_STRUCT(l);
}
-void
-nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio)
+static void
+ipc_listener_accept(void *arg, nni_aio *aio)
{
- int rv;
+ ipc_listener *l = arg;
+ int rv;
// Accept is simpler than the connect case. With accept we just
// need to wait for the socket to be readable to indicate an incoming
@@ -410,3 +405,69 @@ nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio)
}
nni_mtx_unlock(&l->mtx);
}
+
+int
+nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url)
+{
+ ipc_listener *l;
+
+ if ((strcmp(url->u_scheme, "ipc") != 0) || (url->u_path == NULL) ||
+ (strlen(url->u_path) == 0) ||
+ (strlen(url->u_path) >= NNG_MAXADDRLEN)) {
+ return (NNG_EADDRINVAL);
+ }
+
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_mtx_init(&l->mtx);
+ nni_aio_list_init(&l->acceptq);
+
+ l->pfd = NULL;
+ l->closed = false;
+ l->started = false;
+ l->perms = 0;
+ l->sa.s_ipc.sa_family = NNG_AF_IPC;
+ strcpy(l->sa.s_ipc.sa_path, url->u_path);
+ l->sl.sl_free = ipc_listener_free;
+ l->sl.sl_close = ipc_listener_close;
+ l->sl.sl_listen = ipc_listener_listen;
+ l->sl.sl_accept = ipc_listener_accept;
+ l->sl.sl_getx = ipc_listener_getx;
+ l->sl.sl_setx = ipc_listener_setx;
+
+ *lp = (void *) l;
+ return (0);
+}
+
+static int
+ipc_check_perms(const void *buf, size_t sz, nni_type t)
+{
+ int32_t mode;
+ int rv;
+
+ if ((rv = nni_copyin_int(&mode, buf, sz, 0, S_IFMT, t)) != 0) {
+ return (rv);
+ }
+ if ((mode & S_IFMT) != 0) {
+ return (NNG_EINVAL);
+ }
+ return (0);
+}
+
+static const nni_chkoption ipc_chkopts[] = {
+ {
+ .o_name = NNG_OPT_IPC_PERMISSIONS,
+ .o_check = ipc_check_perms,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+int
+nni_ipc_checkopt(const char *name, const void *data, size_t sz, nni_type t)
+{
+ return (nni_chkopt(ipc_chkopts, name, data, sz, t));
+}
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index b4d63b59..bb6db188 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -321,10 +321,11 @@ resolv_worker(void *notused)
// Check to make sure we were not canceled.
if ((aio = item->aio) != NULL) {
- nng_sockaddr *sa = nni_aio_get_input(aio, 0);
+
nni_aio_set_prov_extra(aio, 0, NULL);
item->aio = NULL;
- memcpy(sa, &item->sa, sizeof(*sa));
+
+ nni_aio_set_sockaddr(aio, &item->sa);
nni_aio_finish(aio, rv, 0);
NNI_FREE_STRUCT(item);
diff --git a/src/platform/posix/posix_tcp.h b/src/platform/posix/posix_tcp.h
index 788fcbf8..1638df61 100644
--- a/src/platform/posix/posix_tcp.h
+++ b/src/platform/posix/posix_tcp.h
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2018 Devolutions <info@devolutions.net>
//
@@ -14,6 +14,7 @@
#include "platform/posix/posix_aio.h"
struct nni_tcp_conn {
+ nng_stream stream;
nni_posix_pfd * pfd;
nni_list readq;
nni_list writeq;
@@ -23,26 +24,5 @@ struct nni_tcp_conn {
nni_tcp_dialer *dialer;
nni_reap_item reap;
};
-
-struct nni_tcp_dialer {
- nni_list connq; // pending connections
- bool closed;
- bool nodelay;
- bool keepalive;
- struct sockaddr_storage src;
- size_t srclen;
- nni_mtx mtx;
-};
-
-struct nni_tcp_listener {
- nni_posix_pfd *pfd;
- nni_list acceptq;
- bool started;
- bool closed;
- bool nodelay;
- bool keepalive;
- nni_mtx mtx;
-};
-
-extern int nni_posix_tcp_conn_init(nni_tcp_conn **, nni_posix_pfd *);
-extern void nni_posix_tcp_conn_start(nni_tcp_conn *, int, int);
+extern int nni_posix_tcp_init(nni_tcp_conn **, nni_posix_pfd *);
+extern void nni_posix_tcp_start(nni_tcp_conn *, int, int);
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c
index ef6ee8e3..0d3c274d 100644
--- a/src/platform/posix/posix_tcpconn.c
+++ b/src/platform/posix/posix_tcpconn.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
@@ -36,7 +36,7 @@
#include "posix_tcp.h"
static void
-tcp_conn_dowrite(nni_tcp_conn *c)
+tcp_dowrite(nni_tcp_conn *c)
{
nni_aio *aio;
int fd;
@@ -118,7 +118,7 @@ tcp_conn_dowrite(nni_tcp_conn *c)
}
static void
-tcp_conn_doread(nni_tcp_conn *c)
+tcp_doread(nni_tcp_conn *c)
{
nni_aio *aio;
int fd;
@@ -195,9 +195,10 @@ tcp_conn_doread(nni_tcp_conn *c)
}
}
-void
-nni_tcp_conn_close(nni_tcp_conn *c)
+static void
+tcp_close(void *arg)
{
+ nni_tcp_conn *c = arg;
nni_mtx_lock(&c->mtx);
if (!c->closed) {
nni_aio *aio;
@@ -212,21 +213,44 @@ nni_tcp_conn_close(nni_tcp_conn *c)
nni_mtx_unlock(&c->mtx);
}
+// tcp_fini may block briefly waiting for the pollq thread.
+// To get that out of our context, we simply reap this.
+static void
+tcp_fini(void *arg)
+{
+ nni_tcp_conn *c = arg;
+ tcp_close(c);
+ nni_posix_pfd_fini(c->pfd);
+ nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN
+ c->pfd = NULL;
+ nni_mtx_unlock(&c->mtx);
+ nni_mtx_fini(&c->mtx);
+
+ NNI_FREE_STRUCT(c);
+}
+
static void
-tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg)
+tcp_free(void *arg)
+{
+ nni_tcp_conn *c = arg;
+ nni_reap(&c->reap, tcp_fini, arg);
+}
+
+static void
+tcp_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_tcp_conn *c = arg;
if (events & (POLLHUP | POLLERR | POLLNVAL)) {
- nni_tcp_conn_close(c);
+ tcp_close(c);
return;
}
nni_mtx_lock(&c->mtx);
if (events & POLLIN) {
- tcp_conn_doread(c);
+ tcp_doread(c);
}
if (events & POLLOUT) {
- tcp_conn_dowrite(c);
+ tcp_dowrite(c);
}
events = 0;
if (!nni_list_empty(&c->writeq)) {
@@ -242,7 +266,7 @@ tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg)
}
static void
-tcp_conn_cancel(nni_aio *aio, void *arg, int rv)
+tcp_cancel(nni_aio *aio, void *arg, int rv)
{
nni_tcp_conn *c = arg;
@@ -254,18 +278,18 @@ tcp_conn_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&c->mtx);
}
-void
-nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio)
+static void
+tcp_send(void *arg, nni_aio *aio)
{
-
- int rv;
+ nni_tcp_conn *c = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&c->mtx);
- if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) {
+ if ((rv = nni_aio_schedule(aio, tcp_cancel, c)) != 0) {
nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -273,7 +297,7 @@ nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio)
nni_aio_list_append(&c->writeq, aio);
if (nni_list_first(&c->writeq) == aio) {
- tcp_conn_dowrite(c);
+ tcp_dowrite(c);
// If we are still the first thing on the list, that
// means we didn't finish the job, so arm the poller to
// complete us.
@@ -284,17 +308,18 @@ nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio)
nni_mtx_unlock(&c->mtx);
}
-void
-nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio)
+static void
+tcp_recv(void *arg, nni_aio *aio)
{
- int rv;
+ nni_tcp_conn *c = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&c->mtx);
- if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) {
+ if ((rv = nni_aio_schedule(aio, tcp_cancel, c)) != 0) {
nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -306,7 +331,7 @@ nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio)
// many cases. We also need not arm a list if it was already
// armed.
if (nni_list_first(&c->readq) == aio) {
- tcp_conn_doread(c);
+ tcp_doread(c);
// If we are still the first thing on the list, that
// means we didn't finish the job, so arm the poller to
// complete us.
@@ -317,59 +342,8 @@ nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio)
nni_mtx_unlock(&c->mtx);
}
-int
-nni_tcp_conn_peername(nni_tcp_conn *c, nni_sockaddr *sa)
-{
- struct sockaddr_storage ss;
- socklen_t sslen = sizeof(ss);
- int fd = nni_posix_pfd_fd(c->pfd);
-
- if (getpeername(fd, (void *) &ss, &sslen) != 0) {
- return (nni_plat_errno(errno));
- }
- return (nni_posix_sockaddr2nn(sa, &ss));
-}
-
-int
-nni_tcp_conn_sockname(nni_tcp_conn *c, nni_sockaddr *sa)
-{
- struct sockaddr_storage ss;
- socklen_t sslen = sizeof(ss);
- int fd = nni_posix_pfd_fd(c->pfd);
-
- if (getsockname(fd, (void *) &ss, &sslen) != 0) {
- return (nni_plat_errno(errno));
- }
- return (nni_posix_sockaddr2nn(sa, &ss));
-}
-
-int
-nni_tcp_conn_set_keepalive(nni_tcp_conn *c, bool keep)
-{
- int val = keep ? 1 : 0;
- int fd = nni_posix_pfd_fd(c->pfd);
-
- if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) {
- return (nni_plat_errno(errno));
- }
- return (0);
-}
-
-int
-nni_tcp_conn_set_nodelay(nni_tcp_conn *c, bool nodelay)
-{
-
- int val = nodelay ? 1 : 0;
- int fd = nni_posix_pfd_fd(c->pfd);
-
- if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) {
- return (nni_plat_errno(errno));
- }
- return (0);
-}
-
static int
-tcp_conn_get_peername(void *arg, void *buf, size_t *szp, nni_type t)
+tcp_get_peername(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn * c = arg;
struct sockaddr_storage ss;
@@ -388,7 +362,7 @@ tcp_conn_get_peername(void *arg, void *buf, size_t *szp, nni_type t)
}
static int
-tcp_conn_get_sockname(void *arg, void *buf, size_t *szp, nni_type t)
+tcp_get_sockname(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn * c = arg;
struct sockaddr_storage ss;
@@ -407,7 +381,7 @@ tcp_conn_get_sockname(void *arg, void *buf, size_t *szp, nni_type t)
}
static int
-tcp_conn_set_nodelay(void *arg, const void *buf, size_t sz, nni_type t)
+tcp_set_nodelay(void *arg, const void *buf, size_t sz, nni_type t)
{
nni_tcp_conn *c = arg;
int fd;
@@ -427,7 +401,7 @@ tcp_conn_set_nodelay(void *arg, const void *buf, size_t sz, nni_type t)
}
static int
-tcp_conn_set_keepalive(void *arg, const void *buf, size_t sz, nni_type t)
+tcp_set_keepalive(void *arg, const void *buf, size_t sz, nni_type t)
{
nni_tcp_conn *c = arg;
int fd;
@@ -447,7 +421,7 @@ tcp_conn_set_keepalive(void *arg, const void *buf, size_t sz, nni_type t)
}
static int
-tcp_conn_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t)
+tcp_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn *c = arg;
int fd = nni_posix_pfd_fd(c->pfd);
@@ -462,7 +436,7 @@ tcp_conn_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t)
}
static int
-tcp_conn_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t)
+tcp_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn *c = arg;
int fd = nni_posix_pfd_fd(c->pfd);
@@ -476,46 +450,46 @@ tcp_conn_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t)
return (nni_copyout_bool(val, buf, szp, t));
}
-static const nni_option tcp_conn_options[] = {
+static const nni_option tcp_options[] = {
{
.o_name = NNG_OPT_REMADDR,
- .o_get = tcp_conn_get_peername,
+ .o_get = tcp_get_peername,
},
{
.o_name = NNG_OPT_LOCADDR,
- .o_get = tcp_conn_get_sockname,
+ .o_get = tcp_get_sockname,
},
{
.o_name = NNG_OPT_TCP_NODELAY,
- .o_get = tcp_conn_get_nodelay,
- .o_set = tcp_conn_set_nodelay,
+ .o_get = tcp_get_nodelay,
+ .o_set = tcp_set_nodelay,
},
{
.o_name = NNG_OPT_TCP_KEEPALIVE,
- .o_get = tcp_conn_get_keepalive,
- .o_set = tcp_conn_set_keepalive,
+ .o_get = tcp_get_keepalive,
+ .o_set = tcp_set_keepalive,
},
{
.o_name = NULL,
},
};
-int
-nni_tcp_conn_getopt(
- nni_tcp_conn *c, const char *name, void *buf, size_t *szp, nni_type t)
+static int
+tcp_getx(void *arg, const char *name, void *buf, size_t *szp, nni_type t)
{
- return (nni_getopt(tcp_conn_options, name, c, buf, szp, t));
+ nni_tcp_conn *c = arg;
+ return (nni_getopt(tcp_options, name, c, buf, szp, t));
}
-int
-nni_tcp_conn_setopt(
- nni_tcp_conn *c, const char *name, const void *buf, size_t sz, nni_type t)
+static int
+tcp_setx(void *arg, const char *name, const void *buf, size_t sz, nni_type t)
{
- return (nni_setopt(tcp_conn_options, name, c, buf, sz, t));
+ nni_tcp_conn *c = arg;
+ return (nni_setopt(tcp_options, name, c, buf, sz, t));
}
int
-nni_posix_tcp_conn_init(nni_tcp_conn **cp, nni_posix_pfd *pfd)
+nni_posix_tcp_init(nni_tcp_conn **cp, nni_posix_pfd *pfd)
{
nni_tcp_conn *c;
@@ -530,12 +504,19 @@ nni_posix_tcp_conn_init(nni_tcp_conn **cp, nni_posix_pfd *pfd)
nni_aio_list_init(&c->readq);
nni_aio_list_init(&c->writeq);
+ c->stream.s_free = tcp_free;
+ c->stream.s_close = tcp_close;
+ c->stream.s_recv = tcp_recv;
+ c->stream.s_send = tcp_send;
+ c->stream.s_getx = tcp_getx;
+ c->stream.s_setx = tcp_setx;
+
*cp = c;
return (0);
}
void
-nni_posix_tcp_conn_start(nni_tcp_conn *c, int nodelay, int keepalive)
+nni_posix_tcp_start(nni_tcp_conn *c, int nodelay, int keepalive)
{
// Configure the initial socket options.
(void) setsockopt(nni_posix_pfd_fd(c->pfd), IPPROTO_TCP, TCP_NODELAY,
@@ -543,18 +524,5 @@ nni_posix_tcp_conn_start(nni_tcp_conn *c, int nodelay, int keepalive)
(void) setsockopt(nni_posix_pfd_fd(c->pfd), SOL_SOCKET, SO_KEEPALIVE,
&keepalive, sizeof(int));
- nni_posix_pfd_set_cb(c->pfd, tcp_conn_cb, c);
-}
-
-void
-nni_tcp_conn_fini(nni_tcp_conn *c)
-{
- nni_tcp_conn_close(c);
- nni_posix_pfd_fini(c->pfd);
- nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN
- c->pfd = NULL;
- nni_mtx_unlock(&c->mtx);
- nni_mtx_fini(&c->mtx);
-
- NNI_FREE_STRUCT(c);
+ nni_posix_pfd_set_cb(c->pfd, tcp_cb, c);
}
diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c
index cfb3482c..21ad862d 100644
--- a/src/platform/posix/posix_tcpdial.c
+++ b/src/platform/posix/posix_tcpdial.c
@@ -29,6 +29,16 @@
#include "posix_tcp.h"
+struct nni_tcp_dialer {
+ nni_list connq; // pending connections
+ bool closed;
+ bool nodelay;
+ bool keepalive;
+ struct sockaddr_storage src;
+ size_t srclen;
+ nni_mtx mtx;
+};
+
// Dialer stuff.
int
nni_tcp_dialer_init(nni_tcp_dialer **dp)
@@ -58,9 +68,8 @@ nni_tcp_dialer_close(nni_tcp_dialer *d)
if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
c->dial_aio = NULL;
nni_aio_set_prov_extra(aio, 0, NULL);
- nni_tcp_conn_close(c);
- nni_reap(
- &c->reap, (nni_cb) nni_tcp_conn_fini, c);
+ nng_stream_close(&c->stream);
+ nng_stream_free(&c->stream);
}
nni_aio_finish_error(aio, NNG_ECLOSED);
}
@@ -94,7 +103,7 @@ tcp_dialer_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&d->mtx);
nni_aio_finish_error(aio, rv);
- nni_tcp_conn_fini(c);
+ nng_stream_free(&c->stream);
}
static void
@@ -142,13 +151,13 @@ tcp_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg)
nni_mtx_unlock(&d->mtx);
if (rv != 0) {
- nni_tcp_conn_close(c);
- nni_tcp_conn_fini(c);
+ nng_stream_close(&c->stream);
+ nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
return;
}
- nni_posix_tcp_conn_start(c, nd, ka);
+ nni_posix_tcp_start(c, nd, ka);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
}
@@ -156,7 +165,7 @@ tcp_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg)
// We don't give local address binding support. Outbound dialers always
// get an ephemeral port.
void
-nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
+nni_tcp_dial(nni_tcp_dialer *d, nni_aio *aio)
{
nni_tcp_conn * c;
nni_posix_pfd * pfd = NULL;
@@ -166,12 +175,14 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
int rv;
int ka;
int nd;
+ nng_sockaddr sa;
if (nni_aio_begin(aio) != 0) {
return;
}
- if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
+ nni_aio_get_sockaddr(aio, &sa);
+ if (((sslen = nni_posix_nn2sockaddr(&ss, &sa)) == 0) ||
((ss.ss_family != AF_INET) && (ss.ss_family != AF_INET6))) {
nni_aio_finish_error(aio, NNG_EADDRINVAL);
return;
@@ -189,7 +200,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
nni_aio_finish_error(aio, rv);
return;
}
- if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) {
+ if ((rv = nni_posix_tcp_init(&c, pfd)) != 0) {
nni_posix_pfd_fini(pfd);
nni_aio_finish_error(aio, rv);
return;
@@ -232,7 +243,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
nd = d->nodelay ? 1 : 0;
ka = d->keepalive ? 1 : 0;
nni_mtx_unlock(&d->mtx);
- nni_posix_tcp_conn_start(c, nd, ka);
+ nni_posix_tcp_start(c, nd, ka);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
return;
@@ -240,7 +251,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
error:
nni_aio_set_prov_extra(aio, 0, NULL);
nni_mtx_unlock(&d->mtx);
- nni_reap(&c->reap, (nni_cb) nni_tcp_conn_fini, c);
+ nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
}
diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c
index 1e1b84b1..1edeccbc 100644
--- a/src/platform/posix/posix_tcplisten.c
+++ b/src/platform/posix/posix_tcplisten.c
@@ -29,6 +29,16 @@
#include "posix_tcp.h"
+struct nni_tcp_listener {
+ nni_posix_pfd *pfd;
+ nni_list acceptq;
+ bool started;
+ bool closed;
+ bool nodelay;
+ bool keepalive;
+ nni_mtx mtx;
+};
+
int
nni_tcp_listener_init(nni_tcp_listener **lp)
{
@@ -133,7 +143,7 @@ tcp_listener_doaccept(nni_tcp_listener *l)
continue;
}
- if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) {
+ if ((rv = nni_posix_tcp_init(&c, pfd)) != 0) {
nni_posix_pfd_fini(pfd);
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -143,7 +153,7 @@ tcp_listener_doaccept(nni_tcp_listener *l)
ka = l->keepalive ? 1 : 0;
nd = l->nodelay ? 1 : 0;
nni_aio_list_remove(aio);
- nni_posix_tcp_conn_start(c, nd, ka);
+ nni_posix_tcp_start(c, nd, ka);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
}