aboutsummaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/posix/posix_ipc.h28
-rw-r--r--src/platform/posix/posix_ipcconn.c197
-rw-r--r--src/platform/posix/posix_ipcdial.c100
-rw-r--r--src/platform/posix/posix_ipclisten.c173
-rw-r--r--src/platform/posix/posix_resolv_gai.c7
-rw-r--r--src/platform/posix/posix_tcp.h28
-rw-r--r--src/platform/posix/posix_tcpconn.c186
-rw-r--r--src/platform/posix/posix_tcpdial.c35
-rw-r--r--src/platform/posix/posix_tcplisten.c14
-rw-r--r--src/platform/windows/win_ipc.h43
-rw-r--r--src/platform/windows/win_ipcconn.c145
-rw-r--r--src/platform/windows/win_ipcdial.c149
-rw-r--r--src/platform/windows/win_ipclisten.c195
-rw-r--r--src/platform/windows/win_resolv.c29
-rw-r--r--src/platform/windows/win_tcp.h15
-rw-r--r--src/platform/windows/win_tcpconn.c195
-rw-r--r--src/platform/windows/win_tcpdial.c34
-rw-r--r--src/platform/windows/win_tcplisten.c8
18 files changed, 834 insertions, 747 deletions
diff --git a/src/platform/posix/posix_ipc.h b/src/platform/posix/posix_ipc.h
index bbe11f0d..f570b172 100644
--- a/src/platform/posix/posix_ipc.h
+++ b/src/platform/posix/posix_ipc.h
@@ -1,7 +1,7 @@
//
// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -10,6 +10,7 @@
//
#include "core/nng_impl.h"
+#include "core/stream.h"
#ifdef NNG_PLATFORM_POSIX
#include "platform/posix/posix_aio.h"
@@ -17,6 +18,7 @@
#include <sys/types.h> // For mode_t
struct nni_ipc_conn {
+ nng_stream stream;
nni_posix_pfd * pfd;
nni_list readq;
nni_list writeq;
@@ -24,27 +26,17 @@ struct nni_ipc_conn {
nni_mtx mtx;
nni_aio * dial_aio;
nni_ipc_dialer *dialer;
- nni_reap_item reap;
};
struct nni_ipc_dialer {
- nni_list connq; // pending connections
- bool closed;
- nni_mtx mtx;
+ nng_stream_dialer sd;
+ nni_list connq; // pending connections
+ bool closed;
+ nni_mtx mtx;
+ nng_sockaddr sa;
};
-struct nni_ipc_listener {
- nni_posix_pfd *pfd;
- nng_sockaddr sa;
- nni_list acceptq;
- bool started;
- bool closed;
- char * path;
- mode_t perms;
- nni_mtx mtx;
-};
-
-extern int nni_posix_ipc_conn_init(nni_ipc_conn **, nni_posix_pfd *);
-extern void nni_posix_ipc_conn_start(nni_ipc_conn *);
+extern int nni_posix_ipc_init(nni_ipc_conn **, nni_posix_pfd *);
+extern void nni_posix_ipc_start(nni_ipc_conn *);
#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c
index 48bd75a4..07ec6213 100644
--- a/src/platform/posix/posix_ipcconn.c
+++ b/src/platform/posix/posix_ipcconn.c
@@ -1,7 +1,7 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -39,8 +39,10 @@
#include "posix_ipc.h"
+typedef struct nni_ipc_conn ipc_conn;
+
static void
-ipc_conn_dowrite(nni_ipc_conn *c)
+ipc_dowrite(ipc_conn *c)
{
nni_aio *aio;
int fd;
@@ -122,7 +124,7 @@ ipc_conn_dowrite(nni_ipc_conn *c)
}
static void
-ipc_conn_doread(nni_ipc_conn *c)
+ipc_doread(ipc_conn *c)
{
nni_aio *aio;
int fd;
@@ -199,9 +201,10 @@ ipc_conn_doread(nni_ipc_conn *c)
}
}
-void
-nni_ipc_conn_close(nni_ipc_conn *c)
+static void
+ipc_close(void *arg)
{
+ ipc_conn *c = arg;
nni_mtx_lock(&c->mtx);
if (!c->closed) {
nni_aio *aio;
@@ -217,20 +220,20 @@ nni_ipc_conn_close(nni_ipc_conn *c)
}
static void
-ipc_conn_cb(nni_posix_pfd *pfd, int events, void *arg)
+ipc_cb(nni_posix_pfd *pfd, int events, void *arg)
{
- nni_ipc_conn *c = arg;
+ ipc_conn *c = arg;
if (events & (POLLHUP | POLLERR | POLLNVAL)) {
- nni_ipc_conn_close(c);
+ ipc_close(c);
return;
}
nni_mtx_lock(&c->mtx);
if (events & POLLIN) {
- ipc_conn_doread(c);
+ ipc_doread(c);
}
if (events & POLLOUT) {
- ipc_conn_dowrite(c);
+ ipc_dowrite(c);
}
events = 0;
if (!nni_list_empty(&c->writeq)) {
@@ -246,9 +249,9 @@ ipc_conn_cb(nni_posix_pfd *pfd, int events, void *arg)
}
static void
-ipc_conn_cancel(nni_aio *aio, void *arg, int rv)
+ipc_cancel(nni_aio *aio, void *arg, int rv)
{
- nni_ipc_conn *c = arg;
+ ipc_conn *c = arg;
nni_mtx_lock(&c->mtx);
if (nni_aio_list_active(aio)) {
@@ -258,18 +261,18 @@ ipc_conn_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&c->mtx);
}
-void
-nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio)
+static void
+ipc_send(void *arg, nni_aio *aio)
{
-
- int rv;
+ ipc_conn *c = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&c->mtx);
- if ((rv = nni_aio_schedule(aio, ipc_conn_cancel, c)) != 0) {
+ if ((rv = nni_aio_schedule(aio, ipc_cancel, c)) != 0) {
nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -277,7 +280,7 @@ nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio)
nni_aio_list_append(&c->writeq, aio);
if (nni_list_first(&c->writeq) == aio) {
- ipc_conn_dowrite(c);
+ ipc_dowrite(c);
// If we are still the first thing on the list, that
// means we didn't finish the job, so arm the poller to
// complete us.
@@ -288,17 +291,18 @@ nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio)
nni_mtx_unlock(&c->mtx);
}
-void
-nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio)
+static void
+ipc_recv(void *arg, nni_aio *aio)
{
- int rv;
+ ipc_conn *c = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&c->mtx);
- if ((rv = nni_aio_schedule(aio, ipc_conn_cancel, c)) != 0) {
+ if ((rv = nni_aio_schedule(aio, ipc_cancel, c)) != 0) {
nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -310,7 +314,7 @@ nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio)
// many cases. We also need not arm a list if it was already
// armed.
if (nni_list_first(&c->readq) == aio) {
- ipc_conn_doread(c);
+ ipc_doread(c);
// If we are still the first thing on the list, that
// means we didn't finish the job, so arm the poller to
// complete us.
@@ -322,8 +326,8 @@ nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio)
}
static int
-ipc_conn_peerid(nni_ipc_conn *c, uint64_t *euid, uint64_t *egid,
- uint64_t *prid, uint64_t *znid)
+ipc_peerid(ipc_conn *c, uint64_t *euid, uint64_t *egid, uint64_t *prid,
+ uint64_t *znid)
{
int fd = nni_posix_pfd_fd(c->pfd);
#if defined(NNG_HAVE_GETPEEREID)
@@ -403,43 +407,43 @@ ipc_conn_peerid(nni_ipc_conn *c, uint64_t *euid, uint64_t *egid,
#endif
}
-int
-ipc_conn_get_peer_uid(void *arg, void *buf, size_t *szp, nni_type t)
+static int
+ipc_get_peer_uid(void *arg, void *buf, size_t *szp, nni_type t)
{
- nni_ipc_conn *c = arg;
- int rv;
- uint64_t ignore;
- uint64_t id;
+ ipc_conn *c = arg;
+ int rv;
+ uint64_t ignore;
+ uint64_t id;
- if ((rv = ipc_conn_peerid(c, &id, &ignore, &ignore, &ignore)) != 0) {
+ if ((rv = ipc_peerid(c, &id, &ignore, &ignore, &ignore)) != 0) {
return (rv);
}
return (nni_copyout_u64(id, buf, szp, t));
}
static int
-ipc_conn_get_peer_gid(void *arg, void *buf, size_t *szp, nni_type t)
+ipc_get_peer_gid(void *arg, void *buf, size_t *szp, nni_type t)
{
- nni_ipc_conn *c = arg;
- int rv;
- uint64_t ignore;
- uint64_t id;
+ ipc_conn *c = arg;
+ int rv;
+ uint64_t ignore;
+ uint64_t id;
- if ((rv = ipc_conn_peerid(c, &ignore, &id, &ignore, &ignore)) != 0) {
+ if ((rv = ipc_peerid(c, &ignore, &id, &ignore, &ignore)) != 0) {
return (rv);
}
return (nni_copyout_u64(id, buf, szp, t));
}
static int
-ipc_conn_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_type t)
+ipc_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_type t)
{
- nni_ipc_conn *c = arg;
- int rv;
- uint64_t ignore;
- uint64_t id;
+ ipc_conn *c = arg;
+ int rv;
+ uint64_t ignore;
+ uint64_t id;
- if ((rv = ipc_conn_peerid(c, &ignore, &ignore, &ignore, &id)) != 0) {
+ if ((rv = ipc_peerid(c, &ignore, &ignore, &ignore, &id)) != 0) {
return (rv);
}
if (id == (uint64_t) -1) {
@@ -450,14 +454,14 @@ ipc_conn_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_type t)
}
static int
-ipc_conn_get_peer_pid(void *arg, void *buf, size_t *szp, nni_type t)
+ipc_get_peer_pid(void *arg, void *buf, size_t *szp, nni_type t)
{
- nni_ipc_conn *c = arg;
- int rv;
- uint64_t ignore;
- uint64_t id;
+ ipc_conn *c = arg;
+ int rv;
+ uint64_t ignore;
+ uint64_t id;
- if ((rv = ipc_conn_peerid(c, &ignore, &ignore, &id, &ignore)) != 0) {
+ if ((rv = ipc_peerid(c, &ignore, &ignore, &id, &ignore)) != 0) {
return (rv);
}
if (id == (uint64_t) -1) {
@@ -468,9 +472,9 @@ ipc_conn_get_peer_pid(void *arg, void *buf, size_t *szp, nni_type t)
}
static int
-ipc_conn_get_addr(void *arg, void *buf, size_t *szp, nni_type t)
+ipc_get_addr(void *arg, void *buf, size_t *szp, nni_type t)
{
- nni_ipc_conn * c = arg;
+ ipc_conn * c = arg;
nni_sockaddr sa;
struct sockaddr_storage ss;
socklen_t sslen = sizeof(ss);
@@ -486,36 +490,17 @@ ipc_conn_get_addr(void *arg, void *buf, size_t *szp, nni_type t)
return (nni_copyout_sockaddr(&sa, buf, szp, t));
}
-int
-nni_posix_ipc_conn_init(nni_ipc_conn **cp, nni_posix_pfd *pfd)
-{
- nni_ipc_conn *c;
-
- if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- c->closed = false;
- c->pfd = pfd;
-
- nni_mtx_init(&c->mtx);
- nni_aio_list_init(&c->readq);
- nni_aio_list_init(&c->writeq);
-
- *cp = c;
- return (0);
-}
-
void
-nni_posix_ipc_conn_start(nni_ipc_conn *c)
+nni_posix_ipc_start(nni_ipc_conn *c)
{
- nni_posix_pfd_set_cb(c->pfd, ipc_conn_cb, c);
+ nni_posix_pfd_set_cb(c->pfd, ipc_cb, c);
}
-void
-nni_ipc_conn_fini(nni_ipc_conn *c)
+static void
+ipc_free(void *arg)
{
- nni_ipc_conn_close(c);
+ ipc_conn *c = arg;
+ ipc_close(c);
nni_posix_pfd_fini(c->pfd);
nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN
c->pfd = NULL;
@@ -525,46 +510,72 @@ nni_ipc_conn_fini(nni_ipc_conn *c)
NNI_FREE_STRUCT(c);
}
-static const nni_option ipc_conn_options[] = {
+static const nni_option ipc_options[] = {
{
.o_name = NNG_OPT_LOCADDR,
- .o_get = ipc_conn_get_addr,
+ .o_get = ipc_get_addr,
},
{
.o_name = NNG_OPT_REMADDR,
- .o_get = ipc_conn_get_addr,
+ .o_get = ipc_get_addr,
},
{
.o_name = NNG_OPT_IPC_PEER_PID,
- .o_get = ipc_conn_get_peer_pid,
+ .o_get = ipc_get_peer_pid,
},
{
.o_name = NNG_OPT_IPC_PEER_UID,
- .o_get = ipc_conn_get_peer_uid,
+ .o_get = ipc_get_peer_uid,
},
{
.o_name = NNG_OPT_IPC_PEER_GID,
- .o_get = ipc_conn_get_peer_gid,
+ .o_get = ipc_get_peer_gid,
},
{
.o_name = NNG_OPT_IPC_PEER_ZONEID,
- .o_get = ipc_conn_get_peer_zoneid,
+ .o_get = ipc_get_peer_zoneid,
},
{
.o_name = NULL,
},
};
-int
-nni_ipc_conn_getopt(
- nni_ipc_conn *c, const char *name, void *val, size_t *szp, nni_type t)
+static int
+ipc_getx(void *arg, const char *name, void *val, size_t *szp, nni_type t)
{
- return (nni_getopt(ipc_conn_options, name, c, val, szp, t));
+ ipc_conn *c = arg;
+ return (nni_getopt(ipc_options, name, c, val, szp, t));
+}
+
+static int
+ipc_setx(void *arg, const char *name, const void *val, size_t sz, nni_type t)
+{
+ ipc_conn *c = arg;
+ return (nni_setopt(ipc_options, name, c, val, sz, t));
}
int
-nni_ipc_conn_setopt(
- nni_ipc_conn *c, const char *name, const void *val, size_t sz, nni_type t)
+nni_posix_ipc_init(nni_ipc_conn **cp, nni_posix_pfd *pfd)
{
- return (nni_setopt(ipc_conn_options, name, c, val, sz, t));
-} \ No newline at end of file
+ ipc_conn *c;
+
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ c->closed = false;
+ c->pfd = pfd;
+ c->stream.s_free = ipc_free;
+ c->stream.s_close = ipc_close;
+ c->stream.s_send = ipc_send;
+ c->stream.s_recv = ipc_recv;
+ c->stream.s_getx = ipc_getx;
+ c->stream.s_setx = ipc_setx;
+
+ nni_mtx_init(&c->mtx);
+ nni_aio_list_init(&c->readq);
+ nni_aio_list_init(&c->writeq);
+
+ *cp = c;
+ return (0);
+}
diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c
index d3dc2109..3182b390 100644
--- a/src/platform/posix/posix_ipcdial.c
+++ b/src/platform/posix/posix_ipcdial.c
@@ -1,7 +1,7 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -29,25 +29,13 @@
#include "posix_ipc.h"
-// Dialer stuff.
-int
-nni_ipc_dialer_init(nni_ipc_dialer **dp)
-{
- nni_ipc_dialer *d;
+typedef struct nni_ipc_dialer ipc_dialer;
- if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
- return (NNG_ENOMEM);
- }
- nni_mtx_init(&d->mtx);
- d->closed = false;
- nni_aio_list_init(&d->connq);
- *dp = d;
- return (0);
-}
-
-void
-nni_ipc_dialer_close(nni_ipc_dialer *d)
+// Dialer stuff.
+static void
+ipc_dialer_close(void *arg)
{
+ ipc_dialer *d = arg;
nni_mtx_lock(&d->mtx);
if (!d->closed) {
nni_aio *aio;
@@ -58,9 +46,8 @@ nni_ipc_dialer_close(nni_ipc_dialer *d)
if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
c->dial_aio = NULL;
nni_aio_set_prov_extra(aio, 0, NULL);
- nni_ipc_conn_close(c);
- nni_reap(
- &c->reap, (nni_cb) nni_ipc_conn_fini, c);
+ nng_stream_close(&c->stream);
+ nng_stream_free(&c->stream);
}
nni_aio_finish_error(aio, NNG_ECLOSED);
}
@@ -68,10 +55,11 @@ nni_ipc_dialer_close(nni_ipc_dialer *d)
nni_mtx_unlock(&d->mtx);
}
-void
-nni_ipc_dialer_fini(nni_ipc_dialer *d)
+static void
+ipc_dialer_free(void *arg)
{
- nni_ipc_dialer_close(d);
+ ipc_dialer *d = arg;
+ ipc_dialer_close(d);
nni_mtx_fini(&d->mtx);
NNI_FREE_STRUCT(d);
}
@@ -94,7 +82,7 @@ ipc_dialer_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&d->mtx);
nni_aio_finish_error(aio, rv);
- nni_ipc_conn_fini(c);
+ nng_stream_free(&c->stream);
}
static void
@@ -137,13 +125,13 @@ ipc_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg)
nni_mtx_unlock(&d->mtx);
if (rv != 0) {
- nni_ipc_conn_close(c);
- nni_ipc_conn_fini(c);
+ nng_stream_close(&c->stream);
+ nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
return;
}
- nni_posix_ipc_conn_start(c);
+ nni_posix_ipc_start(c);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
}
@@ -151,8 +139,9 @@ ipc_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg)
// We don't give local address binding support. Outbound dialers always
// get an ephemeral port.
void
-nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
+ipc_dialer_dial(void *arg, nni_aio *aio)
{
+ ipc_dialer * d = arg;
nni_ipc_conn * c;
nni_posix_pfd * pfd = NULL;
struct sockaddr_storage ss;
@@ -164,7 +153,7 @@ nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
return;
}
- if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
+ if (((sslen = nni_posix_nn2sockaddr(&ss, &d->sa)) == 0) ||
(ss.ss_family != AF_UNIX)) {
nni_aio_finish_error(aio, NNG_EADDRINVAL);
return;
@@ -182,7 +171,7 @@ nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
nni_aio_finish_error(aio, rv);
return;
}
- if ((rv = nni_posix_ipc_conn_init(&c, pfd)) != 0) {
+ if ((rv = nni_posix_ipc_init(&c, pfd)) != 0) {
nni_posix_pfd_fini(pfd);
nni_aio_finish_error(aio, rv);
return;
@@ -222,7 +211,7 @@ nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
// on loopback, and probably not on every platform.
nni_aio_set_prov_extra(aio, 0, NULL);
nni_mtx_unlock(&d->mtx);
- nni_posix_ipc_conn_start(c);
+ nni_posix_ipc_start(c);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
return;
@@ -230,7 +219,7 @@ nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
error:
nni_aio_set_prov_extra(aio, 0, NULL);
nni_mtx_unlock(&d->mtx);
- nni_reap(&c->reap, (nni_cb) nni_ipc_conn_fini, c);
+ nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
}
@@ -241,15 +230,44 @@ static const nni_option ipc_dialer_options[] = {
};
int
-nni_ipc_dialer_getopt(
- nni_ipc_dialer *d, const char *name, void *buf, size_t *szp, nni_type t)
+ipc_dialer_getx(void *arg, const char *nm, void *buf, size_t *szp, nni_type t)
+{
+ ipc_dialer *d = arg;
+ return (nni_getopt(ipc_dialer_options, nm, d, buf, szp, t));
+}
+
+int
+ipc_dialer_setx(
+ void *arg, const char *nm, const void *buf, size_t sz, nni_type t)
{
- return (nni_getopt(ipc_dialer_options, name, d, buf, szp, t));
+ ipc_dialer *d = arg;
+ return (nni_setopt(ipc_dialer_options, nm, d, buf, sz, t));
}
int
-nni_ipc_dialer_setopt(nni_ipc_dialer *d, const char *name, const void *buf,
- size_t sz, nni_type t)
+nni_ipc_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
{
- return (nni_setopt(ipc_dialer_options, name, d, buf, sz, t));
-} \ No newline at end of file
+ ipc_dialer *d;
+
+ if ((strcmp(url->u_scheme, "ipc") != 0) || (url->u_path == NULL) ||
+ (strlen(url->u_path) == 0) ||
+ (strlen(url->u_path) >= NNG_MAXADDRLEN)) {
+ return (NNG_EADDRINVAL);
+ }
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&d->mtx);
+ nni_aio_list_init(&d->connq);
+ d->closed = false;
+ d->sa.s_ipc.sa_family = NNG_AF_IPC;
+ strcpy(d->sa.s_ipc.sa_path, url->u_path);
+ d->sd.sd_free = ipc_dialer_free;
+ d->sd.sd_close = ipc_dialer_close;
+ d->sd.sd_dial = ipc_dialer_dial;
+ d->sd.sd_getx = ipc_dialer_getx;
+ d->sd.sd_setx = ipc_dialer_setx;
+
+ *dp = (void *) d;
+ return (0);
+}
diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c
index 11b56ab0..2b5d0edd 100644
--- a/src/platform/posix/posix_ipclisten.c
+++ b/src/platform/posix/posix_ipclisten.c
@@ -1,7 +1,7 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -29,28 +29,20 @@
#include "posix_ipc.h"
-int
-nni_ipc_listener_init(nni_ipc_listener **lp)
-{
- nni_ipc_listener *l;
- if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- nni_mtx_init(&l->mtx);
-
- l->pfd = NULL;
- l->closed = false;
- l->started = false;
- l->perms = 0;
-
- nni_aio_list_init(&l->acceptq);
- *lp = l;
- return (0);
-}
+typedef struct {
+ nng_stream_listener sl;
+ nni_posix_pfd * pfd;
+ nng_sockaddr sa;
+ nni_list acceptq;
+ bool started;
+ bool closed;
+ char * path;
+ mode_t perms;
+ nni_mtx mtx;
+} ipc_listener;
static void
-ipc_listener_doclose(nni_ipc_listener *l)
+ipc_listener_doclose(ipc_listener *l)
{
nni_aio *aio;
char * path;
@@ -71,16 +63,17 @@ ipc_listener_doclose(nni_ipc_listener *l)
}
}
-void
-nni_ipc_listener_close(nni_ipc_listener *l)
+static void
+ipc_listener_close(void *arg)
{
+ ipc_listener *l = arg;
nni_mtx_lock(&l->mtx);
ipc_listener_doclose(l);
nni_mtx_unlock(&l->mtx);
}
static void
-ipc_listener_doaccept(nni_ipc_listener *l)
+ipc_listener_doaccept(ipc_listener *l)
{
nni_aio *aio;
@@ -138,7 +131,7 @@ ipc_listener_doaccept(nni_ipc_listener *l)
continue;
}
- if ((rv = nni_posix_ipc_conn_init(&c, pfd)) != 0) {
+ if ((rv = nni_posix_ipc_init(&c, pfd)) != 0) {
nni_posix_pfd_fini(pfd);
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -146,7 +139,7 @@ ipc_listener_doaccept(nni_ipc_listener *l)
}
nni_aio_list_remove(aio);
- nni_posix_ipc_conn_start(c);
+ nni_posix_ipc_start(c);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
}
@@ -155,7 +148,7 @@ ipc_listener_doaccept(nni_ipc_listener *l)
static void
ipc_listener_cb(nni_posix_pfd *pfd, int events, void *arg)
{
- nni_ipc_listener *l = arg;
+ ipc_listener *l = arg;
NNI_ARG_UNUSED(pfd);
nni_mtx_lock(&l->mtx);
@@ -173,7 +166,7 @@ ipc_listener_cb(nni_posix_pfd *pfd, int events, void *arg)
static void
ipc_listener_cancel(nni_aio *aio, void *arg, int rv)
{
- nni_ipc_listener *l = arg;
+ ipc_listener *l = arg;
// This is dead easy, because we'll ignore the completion if there
// isn't anything to do the accept on!
@@ -222,16 +215,16 @@ ipc_remove_stale(const char *path)
static int
ipc_listener_get_addr(void *arg, void *buf, size_t *szp, nni_type t)
{
- nni_ipc_listener *l = arg;
+ ipc_listener *l = arg;
return (nni_copyout_sockaddr(&l->sa, buf, szp, t));
}
static int
ipc_listener_set_perms(void *arg, const void *buf, size_t sz, nni_type t)
{
- nni_ipc_listener *l = arg;
- int mode;
- int rv;
+ ipc_listener *l = arg;
+ int mode;
+ int rv;
if ((rv = nni_copyin_int(&mode, buf, sz, 0, S_IFMT, t)) != 0) {
return (rv);
@@ -239,16 +232,14 @@ ipc_listener_set_perms(void *arg, const void *buf, size_t sz, nni_type t)
if ((mode & S_IFMT) != 0) {
return (NNG_EINVAL);
}
- if (l != NULL) {
- mode |= S_IFSOCK; // set IFSOCK to ensure non-zero
- nni_mtx_lock(&l->mtx);
- if (l->started) {
- nni_mtx_unlock(&l->mtx);
- return (NNG_EBUSY);
- }
- l->perms = mode;
+ mode |= S_IFSOCK; // set IFSOCK to ensure non-zero
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
nni_mtx_unlock(&l->mtx);
+ return (NNG_EBUSY);
}
+ l->perms = mode;
+ nni_mtx_unlock(&l->mtx);
return (0);
}
@@ -266,23 +257,26 @@ static const nni_option ipc_listener_options[] = {
},
};
-int
-nni_ipc_listener_getopt(
- nni_ipc_listener *l, const char *name, void *buf, size_t *szp, nni_type t)
+static int
+ipc_listener_getx(
+ void *arg, const char *name, void *buf, size_t *szp, nni_type t)
{
+ ipc_listener *l = arg;
return (nni_getopt(ipc_listener_options, name, l, buf, szp, t));
}
-int
-nni_ipc_listener_setopt(nni_ipc_listener *l, const char *name, const void *buf,
- size_t sz, nni_type t)
+static int
+ipc_listener_setx(
+ void *arg, const char *name, const void *buf, size_t sz, nni_type t)
{
+ ipc_listener *l = arg;
return (nni_setopt(ipc_listener_options, name, l, buf, sz, t));
}
int
-nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
+ipc_listener_listen(void *arg)
{
+ ipc_listener * l = arg;
socklen_t len;
struct sockaddr_storage ss;
int rv;
@@ -290,7 +284,7 @@ nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
nni_posix_pfd * pfd;
char * path;
- if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
+ if (((len = nni_posix_nn2sockaddr(&ss, &l->sa)) == 0) ||
(ss.ss_family != AF_UNIX)) {
return (NNG_EADDRINVAL);
}
@@ -304,7 +298,7 @@ nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
nni_mtx_unlock(&l->mtx);
return (NNG_ECLOSED);
}
- path = nni_strdup(sa->s_ipc.sa_path);
+ path = nni_strdup(l->sa.s_ipc.sa_path);
if (path == NULL) {
return (NNG_ENOMEM);
}
@@ -352,15 +346,15 @@ nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
l->pfd = pfd;
l->started = true;
l->path = path;
- l->sa = *sa;
nni_mtx_unlock(&l->mtx);
return (0);
}
-void
-nni_ipc_listener_fini(nni_ipc_listener *l)
+static void
+ipc_listener_free(void *arg)
{
+ ipc_listener * l = arg;
nni_posix_pfd *pfd;
nni_mtx_lock(&l->mtx);
@@ -375,10 +369,11 @@ nni_ipc_listener_fini(nni_ipc_listener *l)
NNI_FREE_STRUCT(l);
}
-void
-nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio)
+static void
+ipc_listener_accept(void *arg, nni_aio *aio)
{
- int rv;
+ ipc_listener *l = arg;
+ int rv;
// Accept is simpler than the connect case. With accept we just
// need to wait for the socket to be readable to indicate an incoming
@@ -410,3 +405,69 @@ nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio)
}
nni_mtx_unlock(&l->mtx);
}
+
+int
+nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url)
+{
+ ipc_listener *l;
+
+ if ((strcmp(url->u_scheme, "ipc") != 0) || (url->u_path == NULL) ||
+ (strlen(url->u_path) == 0) ||
+ (strlen(url->u_path) >= NNG_MAXADDRLEN)) {
+ return (NNG_EADDRINVAL);
+ }
+
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_mtx_init(&l->mtx);
+ nni_aio_list_init(&l->acceptq);
+
+ l->pfd = NULL;
+ l->closed = false;
+ l->started = false;
+ l->perms = 0;
+ l->sa.s_ipc.sa_family = NNG_AF_IPC;
+ strcpy(l->sa.s_ipc.sa_path, url->u_path);
+ l->sl.sl_free = ipc_listener_free;
+ l->sl.sl_close = ipc_listener_close;
+ l->sl.sl_listen = ipc_listener_listen;
+ l->sl.sl_accept = ipc_listener_accept;
+ l->sl.sl_getx = ipc_listener_getx;
+ l->sl.sl_setx = ipc_listener_setx;
+
+ *lp = (void *) l;
+ return (0);
+}
+
+static int
+ipc_check_perms(const void *buf, size_t sz, nni_type t)
+{
+ int32_t mode;
+ int rv;
+
+ if ((rv = nni_copyin_int(&mode, buf, sz, 0, S_IFMT, t)) != 0) {
+ return (rv);
+ }
+ if ((mode & S_IFMT) != 0) {
+ return (NNG_EINVAL);
+ }
+ return (0);
+}
+
+static const nni_chkoption ipc_chkopts[] = {
+ {
+ .o_name = NNG_OPT_IPC_PERMISSIONS,
+ .o_check = ipc_check_perms,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+int
+nni_ipc_checkopt(const char *name, const void *data, size_t sz, nni_type t)
+{
+ return (nni_chkopt(ipc_chkopts, name, data, sz, t));
+}
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index b4d63b59..bb6db188 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -321,10 +321,11 @@ resolv_worker(void *notused)
// Check to make sure we were not canceled.
if ((aio = item->aio) != NULL) {
- nng_sockaddr *sa = nni_aio_get_input(aio, 0);
+
nni_aio_set_prov_extra(aio, 0, NULL);
item->aio = NULL;
- memcpy(sa, &item->sa, sizeof(*sa));
+
+ nni_aio_set_sockaddr(aio, &item->sa);
nni_aio_finish(aio, rv, 0);
NNI_FREE_STRUCT(item);
diff --git a/src/platform/posix/posix_tcp.h b/src/platform/posix/posix_tcp.h
index 788fcbf8..1638df61 100644
--- a/src/platform/posix/posix_tcp.h
+++ b/src/platform/posix/posix_tcp.h
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2018 Devolutions <info@devolutions.net>
//
@@ -14,6 +14,7 @@
#include "platform/posix/posix_aio.h"
struct nni_tcp_conn {
+ nng_stream stream;
nni_posix_pfd * pfd;
nni_list readq;
nni_list writeq;
@@ -23,26 +24,5 @@ struct nni_tcp_conn {
nni_tcp_dialer *dialer;
nni_reap_item reap;
};
-
-struct nni_tcp_dialer {
- nni_list connq; // pending connections
- bool closed;
- bool nodelay;
- bool keepalive;
- struct sockaddr_storage src;
- size_t srclen;
- nni_mtx mtx;
-};
-
-struct nni_tcp_listener {
- nni_posix_pfd *pfd;
- nni_list acceptq;
- bool started;
- bool closed;
- bool nodelay;
- bool keepalive;
- nni_mtx mtx;
-};
-
-extern int nni_posix_tcp_conn_init(nni_tcp_conn **, nni_posix_pfd *);
-extern void nni_posix_tcp_conn_start(nni_tcp_conn *, int, int);
+extern int nni_posix_tcp_init(nni_tcp_conn **, nni_posix_pfd *);
+extern void nni_posix_tcp_start(nni_tcp_conn *, int, int);
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c
index ef6ee8e3..0d3c274d 100644
--- a/src/platform/posix/posix_tcpconn.c
+++ b/src/platform/posix/posix_tcpconn.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
@@ -36,7 +36,7 @@
#include "posix_tcp.h"
static void
-tcp_conn_dowrite(nni_tcp_conn *c)
+tcp_dowrite(nni_tcp_conn *c)
{
nni_aio *aio;
int fd;
@@ -118,7 +118,7 @@ tcp_conn_dowrite(nni_tcp_conn *c)
}
static void
-tcp_conn_doread(nni_tcp_conn *c)
+tcp_doread(nni_tcp_conn *c)
{
nni_aio *aio;
int fd;
@@ -195,9 +195,10 @@ tcp_conn_doread(nni_tcp_conn *c)
}
}
-void
-nni_tcp_conn_close(nni_tcp_conn *c)
+static void
+tcp_close(void *arg)
{
+ nni_tcp_conn *c = arg;
nni_mtx_lock(&c->mtx);
if (!c->closed) {
nni_aio *aio;
@@ -212,21 +213,44 @@ nni_tcp_conn_close(nni_tcp_conn *c)
nni_mtx_unlock(&c->mtx);
}
+// tcp_fini may block briefly waiting for the pollq thread.
+// To get that out of our context, we simply reap this.
+static void
+tcp_fini(void *arg)
+{
+ nni_tcp_conn *c = arg;
+ tcp_close(c);
+ nni_posix_pfd_fini(c->pfd);
+ nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN
+ c->pfd = NULL;
+ nni_mtx_unlock(&c->mtx);
+ nni_mtx_fini(&c->mtx);
+
+ NNI_FREE_STRUCT(c);
+}
+
static void
-tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg)
+tcp_free(void *arg)
+{
+ nni_tcp_conn *c = arg;
+ nni_reap(&c->reap, tcp_fini, arg);
+}
+
+static void
+tcp_cb(nni_posix_pfd *pfd, int events, void *arg)
{
nni_tcp_conn *c = arg;
if (events & (POLLHUP | POLLERR | POLLNVAL)) {
- nni_tcp_conn_close(c);
+ tcp_close(c);
return;
}
nni_mtx_lock(&c->mtx);
if (events & POLLIN) {
- tcp_conn_doread(c);
+ tcp_doread(c);
}
if (events & POLLOUT) {
- tcp_conn_dowrite(c);
+ tcp_dowrite(c);
}
events = 0;
if (!nni_list_empty(&c->writeq)) {
@@ -242,7 +266,7 @@ tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg)
}
static void
-tcp_conn_cancel(nni_aio *aio, void *arg, int rv)
+tcp_cancel(nni_aio *aio, void *arg, int rv)
{
nni_tcp_conn *c = arg;
@@ -254,18 +278,18 @@ tcp_conn_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&c->mtx);
}
-void
-nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio)
+static void
+tcp_send(void *arg, nni_aio *aio)
{
-
- int rv;
+ nni_tcp_conn *c = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&c->mtx);
- if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) {
+ if ((rv = nni_aio_schedule(aio, tcp_cancel, c)) != 0) {
nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -273,7 +297,7 @@ nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio)
nni_aio_list_append(&c->writeq, aio);
if (nni_list_first(&c->writeq) == aio) {
- tcp_conn_dowrite(c);
+ tcp_dowrite(c);
// If we are still the first thing on the list, that
// means we didn't finish the job, so arm the poller to
// complete us.
@@ -284,17 +308,18 @@ nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio)
nni_mtx_unlock(&c->mtx);
}
-void
-nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio)
+static void
+tcp_recv(void *arg, nni_aio *aio)
{
- int rv;
+ nni_tcp_conn *c = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&c->mtx);
- if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) {
+ if ((rv = nni_aio_schedule(aio, tcp_cancel, c)) != 0) {
nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
return;
@@ -306,7 +331,7 @@ nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio)
// many cases. We also need not arm a list if it was already
// armed.
if (nni_list_first(&c->readq) == aio) {
- tcp_conn_doread(c);
+ tcp_doread(c);
// If we are still the first thing on the list, that
// means we didn't finish the job, so arm the poller to
// complete us.
@@ -317,59 +342,8 @@ nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio)
nni_mtx_unlock(&c->mtx);
}
-int
-nni_tcp_conn_peername(nni_tcp_conn *c, nni_sockaddr *sa)
-{
- struct sockaddr_storage ss;
- socklen_t sslen = sizeof(ss);
- int fd = nni_posix_pfd_fd(c->pfd);
-
- if (getpeername(fd, (void *) &ss, &sslen) != 0) {
- return (nni_plat_errno(errno));
- }
- return (nni_posix_sockaddr2nn(sa, &ss));
-}
-
-int
-nni_tcp_conn_sockname(nni_tcp_conn *c, nni_sockaddr *sa)
-{
- struct sockaddr_storage ss;
- socklen_t sslen = sizeof(ss);
- int fd = nni_posix_pfd_fd(c->pfd);
-
- if (getsockname(fd, (void *) &ss, &sslen) != 0) {
- return (nni_plat_errno(errno));
- }
- return (nni_posix_sockaddr2nn(sa, &ss));
-}
-
-int
-nni_tcp_conn_set_keepalive(nni_tcp_conn *c, bool keep)
-{
- int val = keep ? 1 : 0;
- int fd = nni_posix_pfd_fd(c->pfd);
-
- if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) {
- return (nni_plat_errno(errno));
- }
- return (0);
-}
-
-int
-nni_tcp_conn_set_nodelay(nni_tcp_conn *c, bool nodelay)
-{
-
- int val = nodelay ? 1 : 0;
- int fd = nni_posix_pfd_fd(c->pfd);
-
- if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) {
- return (nni_plat_errno(errno));
- }
- return (0);
-}
-
static int
-tcp_conn_get_peername(void *arg, void *buf, size_t *szp, nni_type t)
+tcp_get_peername(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn * c = arg;
struct sockaddr_storage ss;
@@ -388,7 +362,7 @@ tcp_conn_get_peername(void *arg, void *buf, size_t *szp, nni_type t)
}
static int
-tcp_conn_get_sockname(void *arg, void *buf, size_t *szp, nni_type t)
+tcp_get_sockname(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn * c = arg;
struct sockaddr_storage ss;
@@ -407,7 +381,7 @@ tcp_conn_get_sockname(void *arg, void *buf, size_t *szp, nni_type t)
}
static int
-tcp_conn_set_nodelay(void *arg, const void *buf, size_t sz, nni_type t)
+tcp_set_nodelay(void *arg, const void *buf, size_t sz, nni_type t)
{
nni_tcp_conn *c = arg;
int fd;
@@ -427,7 +401,7 @@ tcp_conn_set_nodelay(void *arg, const void *buf, size_t sz, nni_type t)
}
static int
-tcp_conn_set_keepalive(void *arg, const void *buf, size_t sz, nni_type t)
+tcp_set_keepalive(void *arg, const void *buf, size_t sz, nni_type t)
{
nni_tcp_conn *c = arg;
int fd;
@@ -447,7 +421,7 @@ tcp_conn_set_keepalive(void *arg, const void *buf, size_t sz, nni_type t)
}
static int
-tcp_conn_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t)
+tcp_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn *c = arg;
int fd = nni_posix_pfd_fd(c->pfd);
@@ -462,7 +436,7 @@ tcp_conn_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t)
}
static int
-tcp_conn_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t)
+tcp_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn *c = arg;
int fd = nni_posix_pfd_fd(c->pfd);
@@ -476,46 +450,46 @@ tcp_conn_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t)
return (nni_copyout_bool(val, buf, szp, t));
}
-static const nni_option tcp_conn_options[] = {
+static const nni_option tcp_options[] = {
{
.o_name = NNG_OPT_REMADDR,
- .o_get = tcp_conn_get_peername,
+ .o_get = tcp_get_peername,
},
{
.o_name = NNG_OPT_LOCADDR,
- .o_get = tcp_conn_get_sockname,
+ .o_get = tcp_get_sockname,
},
{
.o_name = NNG_OPT_TCP_NODELAY,
- .o_get = tcp_conn_get_nodelay,
- .o_set = tcp_conn_set_nodelay,
+ .o_get = tcp_get_nodelay,
+ .o_set = tcp_set_nodelay,
},
{
.o_name = NNG_OPT_TCP_KEEPALIVE,
- .o_get = tcp_conn_get_keepalive,
- .o_set = tcp_conn_set_keepalive,
+ .o_get = tcp_get_keepalive,
+ .o_set = tcp_set_keepalive,
},
{
.o_name = NULL,
},
};
-int
-nni_tcp_conn_getopt(
- nni_tcp_conn *c, const char *name, void *buf, size_t *szp, nni_type t)
+static int
+tcp_getx(void *arg, const char *name, void *buf, size_t *szp, nni_type t)
{
- return (nni_getopt(tcp_conn_options, name, c, buf, szp, t));
+ nni_tcp_conn *c = arg;
+ return (nni_getopt(tcp_options, name, c, buf, szp, t));
}
-int
-nni_tcp_conn_setopt(
- nni_tcp_conn *c, const char *name, const void *buf, size_t sz, nni_type t)
+static int
+tcp_setx(void *arg, const char *name, const void *buf, size_t sz, nni_type t)
{
- return (nni_setopt(tcp_conn_options, name, c, buf, sz, t));
+ nni_tcp_conn *c = arg;
+ return (nni_setopt(tcp_options, name, c, buf, sz, t));
}
int
-nni_posix_tcp_conn_init(nni_tcp_conn **cp, nni_posix_pfd *pfd)
+nni_posix_tcp_init(nni_tcp_conn **cp, nni_posix_pfd *pfd)
{
nni_tcp_conn *c;
@@ -530,12 +504,19 @@ nni_posix_tcp_conn_init(nni_tcp_conn **cp, nni_posix_pfd *pfd)
nni_aio_list_init(&c->readq);
nni_aio_list_init(&c->writeq);
+ c->stream.s_free = tcp_free;
+ c->stream.s_close = tcp_close;
+ c->stream.s_recv = tcp_recv;
+ c->stream.s_send = tcp_send;
+ c->stream.s_getx = tcp_getx;
+ c->stream.s_setx = tcp_setx;
+
*cp = c;
return (0);
}
void
-nni_posix_tcp_conn_start(nni_tcp_conn *c, int nodelay, int keepalive)
+nni_posix_tcp_start(nni_tcp_conn *c, int nodelay, int keepalive)
{
// Configure the initial socket options.
(void) setsockopt(nni_posix_pfd_fd(c->pfd), IPPROTO_TCP, TCP_NODELAY,
@@ -543,18 +524,5 @@ nni_posix_tcp_conn_start(nni_tcp_conn *c, int nodelay, int keepalive)
(void) setsockopt(nni_posix_pfd_fd(c->pfd), SOL_SOCKET, SO_KEEPALIVE,
&keepalive, sizeof(int));
- nni_posix_pfd_set_cb(c->pfd, tcp_conn_cb, c);
-}
-
-void
-nni_tcp_conn_fini(nni_tcp_conn *c)
-{
- nni_tcp_conn_close(c);
- nni_posix_pfd_fini(c->pfd);
- nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN
- c->pfd = NULL;
- nni_mtx_unlock(&c->mtx);
- nni_mtx_fini(&c->mtx);
-
- NNI_FREE_STRUCT(c);
+ nni_posix_pfd_set_cb(c->pfd, tcp_cb, c);
}
diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c
index cfb3482c..21ad862d 100644
--- a/src/platform/posix/posix_tcpdial.c
+++ b/src/platform/posix/posix_tcpdial.c
@@ -29,6 +29,16 @@
#include "posix_tcp.h"
+struct nni_tcp_dialer {
+ nni_list connq; // pending connections
+ bool closed;
+ bool nodelay;
+ bool keepalive;
+ struct sockaddr_storage src;
+ size_t srclen;
+ nni_mtx mtx;
+};
+
// Dialer stuff.
int
nni_tcp_dialer_init(nni_tcp_dialer **dp)
@@ -58,9 +68,8 @@ nni_tcp_dialer_close(nni_tcp_dialer *d)
if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) {
c->dial_aio = NULL;
nni_aio_set_prov_extra(aio, 0, NULL);
- nni_tcp_conn_close(c);
- nni_reap(
- &c->reap, (nni_cb) nni_tcp_conn_fini, c);
+ nng_stream_close(&c->stream);
+ nng_stream_free(&c->stream);
}
nni_aio_finish_error(aio, NNG_ECLOSED);
}
@@ -94,7 +103,7 @@ tcp_dialer_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&d->mtx);
nni_aio_finish_error(aio, rv);
- nni_tcp_conn_fini(c);
+ nng_stream_free(&c->stream);
}
static void
@@ -142,13 +151,13 @@ tcp_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg)
nni_mtx_unlock(&d->mtx);
if (rv != 0) {
- nni_tcp_conn_close(c);
- nni_tcp_conn_fini(c);
+ nng_stream_close(&c->stream);
+ nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
return;
}
- nni_posix_tcp_conn_start(c, nd, ka);
+ nni_posix_tcp_start(c, nd, ka);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
}
@@ -156,7 +165,7 @@ tcp_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg)
// We don't give local address binding support. Outbound dialers always
// get an ephemeral port.
void
-nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
+nni_tcp_dial(nni_tcp_dialer *d, nni_aio *aio)
{
nni_tcp_conn * c;
nni_posix_pfd * pfd = NULL;
@@ -166,12 +175,14 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
int rv;
int ka;
int nd;
+ nng_sockaddr sa;
if (nni_aio_begin(aio) != 0) {
return;
}
- if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
+ nni_aio_get_sockaddr(aio, &sa);
+ if (((sslen = nni_posix_nn2sockaddr(&ss, &sa)) == 0) ||
((ss.ss_family != AF_INET) && (ss.ss_family != AF_INET6))) {
nni_aio_finish_error(aio, NNG_EADDRINVAL);
return;
@@ -189,7 +200,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
nni_aio_finish_error(aio, rv);
return;
}
- if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) {
+ if ((rv = nni_posix_tcp_init(&c, pfd)) != 0) {
nni_posix_pfd_fini(pfd);
nni_aio_finish_error(aio, rv);
return;
@@ -232,7 +243,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
nd = d->nodelay ? 1 : 0;
ka = d->keepalive ? 1 : 0;
nni_mtx_unlock(&d->mtx);
- nni_posix_tcp_conn_start(c, nd, ka);
+ nni_posix_tcp_start(c, nd, ka);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
return;
@@ -240,7 +251,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
error:
nni_aio_set_prov_extra(aio, 0, NULL);
nni_mtx_unlock(&d->mtx);
- nni_reap(&c->reap, (nni_cb) nni_tcp_conn_fini, c);
+ nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
}
diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c
index 1e1b84b1..1edeccbc 100644
--- a/src/platform/posix/posix_tcplisten.c
+++ b/src/platform/posix/posix_tcplisten.c
@@ -29,6 +29,16 @@
#include "posix_tcp.h"
+struct nni_tcp_listener {
+ nni_posix_pfd *pfd;
+ nni_list acceptq;
+ bool started;
+ bool closed;
+ bool nodelay;
+ bool keepalive;
+ nni_mtx mtx;
+};
+
int
nni_tcp_listener_init(nni_tcp_listener **lp)
{
@@ -133,7 +143,7 @@ tcp_listener_doaccept(nni_tcp_listener *l)
continue;
}
- if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) {
+ if ((rv = nni_posix_tcp_init(&c, pfd)) != 0) {
nni_posix_pfd_fini(pfd);
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -143,7 +153,7 @@ tcp_listener_doaccept(nni_tcp_listener *l)
ka = l->keepalive ? 1 : 0;
nd = l->nodelay ? 1 : 0;
nni_aio_list_remove(aio);
- nni_posix_tcp_conn_start(c, nd, ka);
+ nni_posix_tcp_start(c, nd, ka);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
}
diff --git a/src/platform/windows/win_ipc.h b/src/platform/windows/win_ipc.h
index e8e83957..d410b980 100644
--- a/src/platform/windows/win_ipc.h
+++ b/src/platform/windows/win_ipc.h
@@ -1,7 +1,7 @@
//
// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -19,45 +19,6 @@
#define IPC_PIPE_PREFIX "\\\\.\\pipe\\"
-struct nni_ipc_conn {
- HANDLE f;
- nni_win_io recv_io;
- nni_win_io send_io;
- nni_win_io conn_io;
- nni_list recv_aios;
- nni_list send_aios;
- nni_aio * conn_aio;
- nng_sockaddr sa;
- bool dialer;
- int recv_rv;
- int send_rv;
- int conn_rv;
- bool closed;
- nni_mtx mtx;
- nni_cv cv;
- nni_reap_item reap;
-};
-
-struct nni_ipc_dialer {
- bool closed; // dialers are locked by the worker lock
- nni_list aios;
- nni_list_node node; // node on worker list
-};
-
-struct nni_ipc_listener {
- char * path;
- bool started;
- bool closed;
- HANDLE f;
- SECURITY_ATTRIBUTES sec_attr;
- nni_list aios;
- nni_mtx mtx;
- nni_cv cv;
- nni_win_io io;
- nni_sockaddr sa;
- int rv;
-};
-
-extern int nni_win_ipc_conn_init(nni_ipc_conn **, HANDLE);
+extern int nni_win_ipc_init(nng_stream **, HANDLE, const nng_sockaddr *, bool);
#endif // NNG_PLATFORM_WIN_WINIPC_H
diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c
index ded9ed76..4d267dd9 100644
--- a/src/platform/windows/win_ipcconn.c
+++ b/src/platform/windows/win_ipcconn.c
@@ -1,7 +1,7 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -15,10 +15,30 @@
#include <stdio.h>
-#define CONN(c) ((nni_ipc_conn *) (c))
+#define CONN(c) ((ipc_conn *) (c))
+
+typedef struct ipc_conn {
+ nng_stream stream;
+ HANDLE f;
+ nni_win_io recv_io;
+ nni_win_io send_io;
+ nni_win_io conn_io;
+ nni_list recv_aios;
+ nni_list send_aios;
+ nni_aio * conn_aio;
+ nng_sockaddr sa;
+ bool dialer;
+ int recv_rv;
+ int send_rv;
+ int conn_rv;
+ bool closed;
+ nni_mtx mtx;
+ nni_cv cv;
+ nni_reap_item reap;
+} ipc_conn;
static void
-ipc_recv_start(nni_ipc_conn *c)
+ipc_recv_start(ipc_conn *c)
{
nni_aio *aio;
unsigned idx;
@@ -75,8 +95,8 @@ again:
static void
ipc_recv_cb(nni_win_io *io, int rv, size_t num)
{
- nni_aio * aio;
- nni_ipc_conn *c = io->ptr;
+ nni_aio * aio;
+ ipc_conn *c = io->ptr;
nni_mtx_lock(&c->mtx);
if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
// Should indicate that it was closed.
@@ -103,7 +123,7 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num)
static void
ipc_recv_cancel(nni_aio *aio, void *arg, int rv)
{
- nni_ipc_conn *c = arg;
+ ipc_conn *c = arg;
nni_mtx_lock(&c->mtx);
if (aio == nni_list_first(&c->recv_aios)) {
c->recv_rv = rv;
@@ -116,10 +136,11 @@ ipc_recv_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&c->mtx);
}
-void
-nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio)
+static void
+ipc_recv(void *arg, nni_aio *aio)
{
- int rv;
+ ipc_conn *c = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -143,7 +164,7 @@ nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio)
}
static void
-ipc_send_start(nni_ipc_conn *c)
+ipc_send_start(ipc_conn *c)
{
nni_aio *aio;
unsigned idx;
@@ -200,8 +221,8 @@ again:
static void
ipc_send_cb(nni_win_io *io, int rv, size_t num)
{
- nni_aio * aio;
- nni_ipc_conn *c = io->ptr;
+ nni_aio * aio;
+ ipc_conn *c = io->ptr;
nni_mtx_lock(&c->mtx);
if ((aio = nni_list_first(&c->send_aios)) == NULL) {
// Should indicate that it was closed.
@@ -229,7 +250,7 @@ ipc_send_cb(nni_win_io *io, int rv, size_t num)
static void
ipc_send_cancel(nni_aio *aio, void *arg, int rv)
{
- nni_ipc_conn *c = arg;
+ ipc_conn *c = arg;
nni_mtx_lock(&c->mtx);
if (aio == nni_list_first(&c->send_aios)) {
c->send_rv = rv;
@@ -242,10 +263,11 @@ ipc_send_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&c->mtx);
}
-void
-nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio)
+static void
+ipc_send(void *arg, nni_aio *aio)
{
- int rv;
+ ipc_conn *c = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -268,35 +290,10 @@ nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio)
nni_mtx_unlock(&c->mtx);
}
-int
-nni_win_ipc_conn_init(nni_ipc_conn **connp, HANDLE p)
-{
- nni_ipc_conn *c;
- int rv;
-
- if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
- return (NNG_ENOMEM);
- }
- c->f = INVALID_HANDLE_VALUE;
- nni_mtx_init(&c->mtx);
- nni_cv_init(&c->cv, &c->mtx);
- nni_aio_list_init(&c->recv_aios);
- nni_aio_list_init(&c->send_aios);
-
- if (((rv = nni_win_io_init(&c->recv_io, ipc_recv_cb, c)) != 0) ||
- ((rv = nni_win_io_init(&c->send_io, ipc_send_cb, c)) != 0)) {
- nni_ipc_conn_fini(c);
- return (rv);
- }
-
- c->f = p;
- *connp = c;
- return (0);
-}
-
-void
-nni_ipc_conn_close(nni_ipc_conn *c)
+static void
+ipc_close(void *arg)
{
+ ipc_conn *c = arg;
nni_mtx_lock(&c->mtx);
if (!c->closed) {
c->closed = true;
@@ -316,7 +313,7 @@ nni_ipc_conn_close(nni_ipc_conn *c)
}
static void
-ipc_conn_reap(nni_ipc_conn *c)
+ipc_conn_reap(ipc_conn *c)
{
nni_mtx_lock(&c->mtx);
while ((!nni_list_empty(&c->recv_aios)) ||
@@ -337,10 +334,11 @@ ipc_conn_reap(nni_ipc_conn *c)
NNI_FREE_STRUCT(c);
}
-void
-nni_ipc_conn_fini(nni_ipc_conn *c)
+static void
+ipc_free(void *arg)
{
- nni_ipc_conn_close(c);
+ ipc_conn *c = arg;
+ ipc_close(c);
nni_reap(&c->reap, (nni_cb) ipc_conn_reap, CONN(c));
}
@@ -386,16 +384,51 @@ static const nni_option ipc_conn_options[] = {
},
};
-int
-nni_ipc_conn_setopt(nni_ipc_conn *c, const char *name, const void *val,
- size_t sz, nni_opt_type t)
+static int
+ipc_setx(void *arg, const char *nm, const void *val, size_t sz, nni_opt_type t)
+{
+ ipc_conn *c = arg;
+ return (nni_setopt(ipc_conn_options, nm, c, val, sz, t));
+}
+
+static int
+ipc_getx(void *arg, const char *nm, void *val, size_t *szp, nni_opt_type t)
{
- return (nni_setopt(ipc_conn_options, name, c, val, sz, t));
+ ipc_conn *c = arg;
+ return (nni_getopt(ipc_conn_options, nm, c, val, szp, t));
}
int
-nni_ipc_conn_getopt(
- nni_ipc_conn *c, const char *name, void *val, size_t *szp, nni_opt_type t)
+nni_win_ipc_init(
+ nng_stream **connp, HANDLE p, const nng_sockaddr *sa, bool dialer)
{
- return (nni_getopt(ipc_conn_options, name, c, val, szp, t));
+ ipc_conn *c;
+ int rv;
+
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ c->f = INVALID_HANDLE_VALUE;
+ nni_mtx_init(&c->mtx);
+ nni_cv_init(&c->cv, &c->mtx);
+ nni_aio_list_init(&c->recv_aios);
+ nni_aio_list_init(&c->send_aios);
+ c->dialer = dialer;
+ c->sa = *sa;
+ c->stream.s_free = ipc_free;
+ c->stream.s_close = ipc_close;
+ c->stream.s_send = ipc_send;
+ c->stream.s_recv = ipc_recv;
+ c->stream.s_getx = ipc_getx;
+ c->stream.s_setx = ipc_setx;
+
+ if (((rv = nni_win_io_init(&c->recv_io, ipc_recv_cb, c)) != 0) ||
+ ((rv = nni_win_io_init(&c->send_io, ipc_send_cb, c)) != 0)) {
+ ipc_free(c);
+ return (rv);
+ }
+
+ c->f = p;
+ *connp = (void *) c;
+ return (0);
}
diff --git a/src/platform/windows/win_ipcdial.c b/src/platform/windows/win_ipcdial.c
index 98d848ae..be2a82b3 100644
--- a/src/platform/windows/win_ipcdial.c
+++ b/src/platform/windows/win_ipcdial.c
@@ -1,7 +1,7 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -15,19 +15,14 @@
#include <stdio.h>
-int
-nni_ipc_dialer_init(nni_ipc_dialer **dp)
-{
- nni_ipc_dialer *d;
-
- if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
- return (NNG_ENOMEM);
- }
- d->closed = false;
- nni_aio_list_init(&d->aios);
- *dp = d;
- return (0);
-}
+typedef struct ipc_dialer {
+ nng_stream_dialer sd;
+ bool closed; // dialers are locked by the worker lock
+ nni_list aios;
+ nni_list_node node; // node on worker list
+ char * path;
+ nni_sockaddr sa;
+} ipc_dialer;
// Windows IPC is a bit different on the client side. There is no
// support for asynchronous connection, but we can fake it with a
@@ -52,7 +47,7 @@ ipc_dial_thr(void *arg)
nni_mtx_lock(&w->mtx);
for (;;) {
- nni_ipc_dialer *d;
+ ipc_dialer *d;
if (w->exit) {
break;
@@ -63,21 +58,19 @@ ipc_dial_thr(void *arg)
}
while ((d = nni_list_first(&w->workers)) != NULL) {
- nni_ipc_conn *c;
- nni_aio * aio;
- HANDLE f;
- int rv;
- char * path;
+ nng_stream *c;
+ nni_aio * aio;
+ HANDLE f;
+ int rv;
if ((aio = nni_list_first(&d->aios)) == NULL) {
nni_list_remove(&w->workers, d);
continue;
}
- path = nni_aio_get_prov_extra(aio, 0);
-
- f = CreateFileA(path, GENERIC_READ | GENERIC_WRITE, 0,
- NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
+ f = CreateFileA(d->path, GENERIC_READ | GENERIC_WRITE,
+ 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED,
+ NULL);
if (f == INVALID_HANDLE_VALUE) {
switch ((rv = GetLastError())) {
@@ -99,29 +92,20 @@ ipc_dial_thr(void *arg)
break;
}
nni_list_remove(&d->aios, aio);
- nni_aio_set_prov_extra(aio, 0, NULL);
- nni_strfree(path);
nni_aio_finish_error(aio, rv);
continue;
}
nni_list_remove(&d->aios, aio);
- nni_aio_set_prov_extra(aio, 0, NULL);
if (((rv = nni_win_io_register(f)) != 0) ||
- ((rv = nni_win_ipc_conn_init(&c, f)) != 0)) {
+ ((rv = nni_win_ipc_init(&c, f, &d->sa, true)) !=
+ 0)) {
DisconnectNamedPipe(f);
CloseHandle(f);
nni_aio_finish_error(aio, rv);
- nni_strfree(path);
continue;
}
- c->dialer = true;
- c->sa.s_ipc.sa_family = NNG_AF_IPC;
- snprintf(c->sa.s_ipc.sa_path,
- sizeof(c->sa.s_ipc.sa_path), "%s",
- path + strlen(IPC_PIPE_PREFIX));
- nni_strfree(path);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
}
@@ -140,27 +124,23 @@ ipc_dial_thr(void *arg)
static void
ipc_dial_cancel(nni_aio *aio, void *arg, int rv)
{
- nni_ipc_dialer *d = arg;
- ipc_dial_work * w = &ipc_connecter;
+ ipc_dialer * d = arg;
+ ipc_dial_work *w = &ipc_connecter;
nni_mtx_lock(&w->mtx);
if (nni_aio_list_active(aio)) {
- char *path;
if (nni_list_active(&w->waiters, d)) {
nni_list_remove(&w->waiters, d);
nni_cv_wake(&w->cv);
}
nni_aio_list_remove(aio);
- path = nni_aio_get_prov_extra(aio, 0);
- nni_aio_set_prov_extra(aio, 0, NULL);
- nni_strfree(path);
nni_aio_finish_error(aio, rv);
}
nni_mtx_unlock(&w->mtx);
}
-void
-nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
+static void
+ipc_dialer_dial(ipc_dialer *d, nni_aio *aio)
{
ipc_dial_work *w = &ipc_connecter;
char * path;
@@ -169,12 +149,8 @@ nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
if (nni_aio_begin(aio) != 0) {
return;
}
- if (sa->s_family != NNG_AF_IPC) {
- nni_aio_finish_error(aio, NNG_EADDRINVAL);
- return;
- }
if ((rv = nni_asprintf(
- &path, IPC_PIPE_PREFIX "%s", sa->s_ipc.sa_path)) != 0) {
+ &path, IPC_PIPE_PREFIX "%s", d->sa.s_ipc.sa_path)) != 0) {
nni_aio_finish_error(aio, rv);
return;
}
@@ -182,19 +158,16 @@ nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
nni_mtx_lock(&w->mtx);
if ((rv = nni_aio_schedule(aio, ipc_dial_cancel, d)) != 0) {
nni_mtx_unlock(&w->mtx);
- nni_strfree(path);
nni_aio_finish_error(aio, rv);
return;
}
if (d->closed) {
nni_mtx_unlock(&w->mtx);
- nni_strfree(path);
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
- nni_aio_set_prov_extra(aio, 0, path);
nni_list_append(&d->aios, aio);
if (nni_list_first(&d->aios) == aio) {
nni_list_append(&w->waiters, d);
@@ -203,16 +176,10 @@ nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
nni_mtx_unlock(&w->mtx);
}
-void
-nni_ipc_dialer_fini(nni_ipc_dialer *d)
-{
- nni_ipc_dialer_close(d);
- NNI_FREE_STRUCT(d);
-}
-
-void
-nni_ipc_dialer_close(nni_ipc_dialer *d)
+static void
+ipc_dialer_close(void *arg)
{
+ ipc_dialer * d = arg;
ipc_dial_work *w = &ipc_connecter;
nni_aio * aio;
@@ -228,6 +195,17 @@ nni_ipc_dialer_close(nni_ipc_dialer *d)
nni_mtx_unlock(&w->mtx);
}
+static void
+ipc_dialer_free(void *arg)
+{
+ ipc_dialer *d = arg;
+ ipc_dialer_close(d);
+ if (d->path) {
+ nni_strfree(d->path);
+ }
+ NNI_FREE_STRUCT(d);
+}
+
static const nni_option ipc_dialer_options[] = {
{
.o_name = NULL,
@@ -235,17 +213,50 @@ static const nni_option ipc_dialer_options[] = {
};
int
-nni_ipc_dialer_setopt(nni_ipc_dialer *d, const char *name, const void *buf,
- size_t sz, nni_type t)
+ipc_dialer_setx(
+ void *arg, const char *nm, const void *buf, size_t sz, nni_type t)
+{
+ ipc_dialer *d = arg;
+ return (nni_setopt(ipc_dialer_options, nm, d, buf, sz, t));
+}
+
+int
+ipc_dialer_getx(void *arg, const char *nm, void *buf, size_t *szp, nni_type t)
{
- return (nni_setopt(ipc_dialer_options, name, d, buf, sz, t));
+ ipc_dialer *d = arg;
+ return (nni_getopt(ipc_dialer_options, nm, d, buf, szp, t));
}
int
-nni_ipc_dialer_getopt(
- nni_ipc_dialer *d, const char *name, void *buf, size_t *szp, nni_type t)
+nni_ipc_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
{
- return (nni_getopt(ipc_dialer_options, name, d, buf, szp, t));
+ ipc_dialer *d;
+ int rv;
+
+ if ((strcmp(url->u_scheme, "ipc") != 0) || (url->u_path == NULL) ||
+ (strlen(url->u_path) == 0)) {
+ return (NNG_EADDRINVAL);
+ }
+ if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ if ((rv = nni_asprintf(&d->path, IPC_PIPE_PREFIX "%s", url->u_path)) !=
+ 0) {
+ NNI_FREE_STRUCT(d);
+ return (rv);
+ }
+ snprintf(d->sa.s_ipc.sa_path, NNG_MAXADDRLEN, "%s", url->u_path);
+ d->sa.s_ipc.sa_family = NNG_AF_IPC;
+ d->closed = false;
+ d->sd.sd_free = ipc_dialer_free;
+ d->sd.sd_close = ipc_dialer_close;
+ d->sd.sd_dial = ipc_dialer_dial;
+ d->sd.sd_getx = ipc_dialer_getx;
+ d->sd.sd_setx = ipc_dialer_setx;
+ nni_aio_list_init(&d->aios);
+ *dp = (void *) d;
+ return (0);
}
int
@@ -254,8 +265,8 @@ nni_win_ipc_sysinit(void)
int rv;
ipc_dial_work *worker = &ipc_connecter;
- NNI_LIST_INIT(&worker->workers, nni_ipc_dialer, node);
- NNI_LIST_INIT(&worker->waiters, nni_ipc_dialer, node);
+ NNI_LIST_INIT(&worker->workers, ipc_dialer, node);
+ NNI_LIST_INIT(&worker->waiters, ipc_dialer, node);
nni_mtx_init(&worker->mtx);
nni_cv_init(&worker->cv, &worker->mtx);
diff --git a/src/platform/windows/win_ipclisten.c b/src/platform/windows/win_ipclisten.c
index 4b3660ec..a3922d06 100644
--- a/src/platform/windows/win_ipclisten.c
+++ b/src/platform/windows/win_ipclisten.c
@@ -1,7 +1,7 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -15,12 +15,27 @@
#include <stdio.h>
+typedef struct {
+ nng_stream_listener sl;
+ char * path;
+ bool started;
+ bool closed;
+ HANDLE f;
+ SECURITY_ATTRIBUTES sec_attr;
+ nni_list aios;
+ nni_mtx mtx;
+ nni_cv cv;
+ nni_win_io io;
+ nni_sockaddr sa;
+ int rv;
+} ipc_listener;
+
static void
-ipc_accept_done(nni_ipc_listener *l, int rv)
+ipc_accept_done(ipc_listener *l, int rv)
{
- nni_aio * aio;
- HANDLE f;
- nni_ipc_conn *c;
+ nni_aio * aio;
+ HANDLE f;
+ nng_stream *c;
aio = nni_list_first(&l->aios);
nni_list_remove(&l->aios, aio);
@@ -50,24 +65,21 @@ ipc_accept_done(nni_ipc_listener *l, int rv)
}
if (((rv = nni_win_io_register(f)) != 0) ||
- ((rv = nni_win_ipc_conn_init(&c, l->f)) != 0)) {
+ ((rv = nni_win_ipc_init(&c, l->f, &l->sa, false)) != 0)) {
DisconnectNamedPipe(l->f);
DisconnectNamedPipe(f);
CloseHandle(f);
nni_aio_finish_error(aio, rv);
return;
}
- l->f = f;
- c->sa.s_ipc.sa_family = NNG_AF_IPC;
- snprintf(c->sa.s_ipc.sa_path, sizeof(c->sa.s_ipc.sa_path), "%s",
- l->path + strlen(IPC_PIPE_PREFIX));
- c->dialer = false;
+ // Install the replacement pipe.
+ l->f = f;
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
}
static void
-ipc_accept_start(nni_ipc_listener *l)
+ipc_accept_start(ipc_listener *l)
{
nni_aio *aio;
@@ -102,7 +114,7 @@ ipc_accept_start(nni_ipc_listener *l)
static void
ipc_accept_cb(nni_win_io *io, int rv, size_t cnt)
{
- nni_ipc_listener *l = io->ptr;
+ ipc_listener *l = io->ptr;
NNI_ARG_UNUSED(cnt);
@@ -122,37 +134,12 @@ ipc_accept_cb(nni_win_io *io, int rv, size_t cnt)
nni_mtx_unlock(&l->mtx);
}
-int
-nni_ipc_listener_init(nni_ipc_listener **lp)
-{
- nni_ipc_listener *l;
- int rv;
-
- if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
- return (NNG_ENOMEM);
- }
- if ((rv = nni_win_io_init(&l->io, ipc_accept_cb, l)) != 0) {
- NNI_FREE_STRUCT(l);
- return (rv);
- }
- l->started = false;
- l->closed = false;
- l->sec_attr.nLength = sizeof(l->sec_attr);
- l->sec_attr.lpSecurityDescriptor = NULL;
- l->sec_attr.bInheritHandle = FALSE;
- nni_aio_list_init(&l->aios);
- nni_mtx_init(&l->mtx);
- nni_cv_init(&l->cv, &l->mtx);
- *lp = l;
- return (0);
-}
-
static int
ipc_listener_set_sec_desc(void *arg, const void *buf, size_t sz, nni_type t)
{
- nni_ipc_listener *l = arg;
- void * desc;
- int rv;
+ ipc_listener *l = arg;
+ void * desc;
+ int rv;
if ((rv = nni_copyin_ptr(&desc, buf, sz, t)) != 0) {
return (rv);
@@ -160,22 +147,20 @@ ipc_listener_set_sec_desc(void *arg, const void *buf, size_t sz, nni_type t)
if (!IsValidSecurityDescriptor((SECURITY_DESCRIPTOR *) desc)) {
return (NNG_EINVAL);
}
- if (l != NULL) {
- nni_mtx_lock(&l->mtx);
- if (l->started) {
- nni_mtx_unlock(&l->mtx);
- return (NNG_EBUSY);
- }
- l->sec_attr.lpSecurityDescriptor = desc;
+ nni_mtx_lock(&l->mtx);
+ if (l->started) {
nni_mtx_unlock(&l->mtx);
+ return (NNG_EBUSY);
}
+ l->sec_attr.lpSecurityDescriptor = desc;
+ nni_mtx_unlock(&l->mtx);
return (0);
}
static int
ipc_listener_get_addr(void *arg, void *buf, size_t *szp, nni_type t)
{
- nni_ipc_listener *l = arg;
+ ipc_listener *l = arg;
return ((nni_copyout_sockaddr(&l->sa, buf, szp, t)));
}
@@ -194,25 +179,28 @@ static const nni_option ipc_listener_options[] = {
};
int
-nni_ipc_listener_setopt(nni_ipc_listener *l, const char *name, const void *buf,
- size_t sz, nni_type t)
+ipc_listener_setx(
+ void *arg, const char *name, const void *buf, size_t sz, nni_type t)
{
+ ipc_listener *l = arg;
return (nni_setopt(ipc_listener_options, name, l, buf, sz, t));
}
int
-nni_ipc_listener_getopt(
- nni_ipc_listener *l, const char *name, void *buf, size_t *szp, nni_type t)
+ipc_listener_getx(
+ void *arg, const char *name, void *buf, size_t *szp, nni_type t)
{
+ ipc_listener *l = arg;
return (nni_getopt(ipc_listener_options, name, l, buf, szp, t));
}
-int
-nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
+static int
+ipc_listener_listen(void *arg)
{
- int rv;
- HANDLE f;
- char * path;
+ ipc_listener *l = arg;
+ int rv;
+ HANDLE f;
+ char * path;
nni_mtx_lock(&l->mtx);
if (l->started) {
@@ -223,7 +211,7 @@ nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
nni_mtx_unlock(&l->mtx);
return (NNG_ECLOSED);
}
- rv = nni_asprintf(&path, IPC_PIPE_PREFIX "%s", sa->s_ipc.sa_path);
+ rv = nni_asprintf(&path, IPC_PIPE_PREFIX "%s", l->sa.s_ipc.sa_path);
if (rv != 0) {
nni_mtx_unlock(&l->mtx);
return (rv);
@@ -255,7 +243,6 @@ nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
l->f = f;
l->path = path;
l->started = true;
- l->sa = *sa;
nni_mtx_unlock(&l->mtx);
return (0);
}
@@ -263,7 +250,7 @@ nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa)
static void
ipc_accept_cancel(nni_aio *aio, void *arg, int rv)
{
- nni_ipc_listener *l = arg;
+ ipc_listener *l = arg;
nni_mtx_unlock(&l->mtx);
if (aio == nni_list_first(&l->aios)) {
@@ -277,9 +264,10 @@ ipc_accept_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&l->mtx);
}
-void
-nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio)
+static void
+ipc_listener_accept(void *arg, nni_aio *aio)
{
+ ipc_listener *l = arg;
if (nni_aio_begin(aio) != 0) {
return;
}
@@ -301,9 +289,10 @@ nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio)
nni_mtx_unlock(&l->mtx);
}
-void
-nni_ipc_listener_close(nni_ipc_listener *l)
+static void
+ipc_listener_close(void *arg)
{
+ ipc_listener *l = arg;
nni_mtx_lock(&l->mtx);
if (!l->closed) {
l->closed = true;
@@ -316,9 +305,10 @@ nni_ipc_listener_close(nni_ipc_listener *l)
nni_mtx_unlock(&l->mtx);
}
-void
-nni_ipc_listener_fini(nni_ipc_listener *l)
+static void
+ipc_listener_free(void *arg)
{
+ ipc_listener *l = arg;
nni_mtx_lock(&l->mtx);
while (!nni_list_empty(&l->aios)) {
nni_cv_wait(&l->cv);
@@ -330,3 +320,72 @@ nni_ipc_listener_fini(nni_ipc_listener *l)
nni_mtx_fini(&l->mtx);
NNI_FREE_STRUCT(l);
}
+
+int
+nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url)
+{
+ ipc_listener *l;
+ int rv;
+
+ if ((strcmp(url->u_scheme, "ipc") != 0) || (url->u_path == NULL) ||
+ (strlen(url->u_path) == 0)) {
+ return (NNG_EADDRINVAL);
+ }
+ if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_win_io_init(&l->io, ipc_accept_cb, l)) != 0) {
+ NNI_FREE_STRUCT(l);
+ return (rv);
+ }
+ l->started = false;
+ l->closed = false;
+ l->sec_attr.nLength = sizeof(l->sec_attr);
+ l->sec_attr.lpSecurityDescriptor = NULL;
+ l->sec_attr.bInheritHandle = FALSE;
+ l->sa.s_ipc.sa_family = NNG_AF_IPC;
+ l->sl.sl_free = ipc_listener_free;
+ l->sl.sl_close = ipc_listener_close;
+ l->sl.sl_listen = ipc_listener_listen;
+ l->sl.sl_accept = ipc_listener_accept;
+ l->sl.sl_getx = ipc_listener_getx;
+ l->sl.sl_setx = ipc_listener_setx;
+ snprintf(l->sa.s_ipc.sa_path, NNG_MAXADDRLEN, "%s", url->u_path);
+ nni_aio_list_init(&l->aios);
+ nni_mtx_init(&l->mtx);
+ nni_cv_init(&l->cv, &l->mtx);
+ *lp = (void *) l;
+ return (0);
+}
+
+static int
+ipc_check_sec_desc(const void *buf, size_t sz, nni_type t)
+{
+ void *desc;
+ int rv;
+
+ if ((rv = nni_copyin_ptr(&desc, buf, sz, t)) != 0) {
+ return (rv);
+ }
+ if (!IsValidSecurityDescriptor((SECURITY_DESCRIPTOR *) desc)) {
+ return (NNG_EINVAL);
+ }
+
+ return (0);
+}
+
+static const nni_chkoption ipc_chkopts[] = {
+ {
+ .o_name = NNG_OPT_IPC_SECURITY_DESCRIPTOR,
+ .o_check = ipc_check_sec_desc,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+int
+nni_ipc_checkopt(const char *name, const void *data, size_t sz, nni_type t)
+{
+ return (nni_chkopt(ipc_chkopts, name, data, sz, t));
+}
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c
index d361a1e8..e01dba3b 100644
--- a/src/platform/windows/win_resolv.c
+++ b/src/platform/windows/win_resolv.c
@@ -134,25 +134,27 @@ resolv_task(resolv_item *item)
}
}
- if (probe != NULL) {
+ if ((probe != NULL) && (item->aio != NULL)) {
struct sockaddr_in * sin;
struct sockaddr_in6 *sin6;
- nni_sockaddr * sa = &item->sa;
+ nni_sockaddr sa;
switch (probe->ai_addr->sa_family) {
case AF_INET:
- rv = 0;
- sin = (void *) probe->ai_addr;
- sa->s_in.sa_family = NNG_AF_INET;
- sa->s_in.sa_port = item->port;
- sa->s_in.sa_addr = sin->sin_addr.s_addr;
+ rv = 0;
+ sin = (void *) probe->ai_addr;
+ sa.s_in.sa_family = NNG_AF_INET;
+ sa.s_in.sa_port = item->port;
+ sa.s_in.sa_addr = sin->sin_addr.s_addr;
+ nni_aio_set_sockaddr(item->aio, &sa);
break;
case AF_INET6:
- rv = 0;
- sin6 = (void *) probe->ai_addr;
- sa->s_in6.sa_family = NNG_AF_INET6;
- sa->s_in6.sa_port = item->port;
- memcpy(sa->s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16);
+ rv = 0;
+ sin6 = (void *) probe->ai_addr;
+ sa.s_in6.sa_family = NNG_AF_INET6;
+ sa.s_in6.sa_port = item->port;
+ memcpy(sa.s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16);
+ nni_aio_set_sockaddr(item->aio, &sa);
break;
}
}
@@ -294,10 +296,9 @@ resolv_worker(void *notused)
// Check to make sure we were not canceled.
if ((aio = item->aio) != NULL) {
- nng_sockaddr *sa = nni_aio_get_input(aio, 0);
nni_aio_set_prov_extra(aio, 0, NULL);
item->aio = NULL;
- memcpy(sa, &item->sa, sizeof(*sa));
+
nni_aio_finish(aio, rv, 0);
NNI_FREE_STRUCT(item);
diff --git a/src/platform/windows/win_tcp.h b/src/platform/windows/win_tcp.h
index 1b34aa29..b37b2353 100644
--- a/src/platform/windows/win_tcp.h
+++ b/src/platform/windows/win_tcp.h
@@ -1,7 +1,7 @@
//
// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
-// Copyright 2018 Devolutions <info@devolutions.net>
+// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -17,6 +17,7 @@
#include "core/nng_impl.h"
struct nni_tcp_conn {
+ nng_stream ops;
SOCKET s;
nni_win_io recv_io;
nni_win_io send_io;
@@ -37,18 +38,6 @@ struct nni_tcp_conn {
nni_cv cv;
};
-struct nni_tcp_dialer {
- LPFN_CONNECTEX connectex; // looked up name via ioctl
- nni_list aios; // in flight connections
- bool closed;
- bool nodelay; // initial value for child conns
- bool keepalive; // initial value for child conns
- SOCKADDR_STORAGE src;
- size_t srclen;
- nni_mtx mtx;
- nni_reap_item reap;
-};
-
struct nni_tcp_listener {
SOCKET s;
nni_list aios;
diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c
index 54d22dea..c77bbc72 100644
--- a/src/platform/windows/win_tcpconn.c
+++ b/src/platform/windows/win_tcpconn.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2018 Devolutions <info@devolutions.net>
//
@@ -110,8 +110,8 @@ tcp_recv_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&c->mtx);
}
-void
-nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio)
+static void
+tcp_recv(nni_tcp_conn *c, nni_aio *aio)
{
int rv;
@@ -225,10 +225,11 @@ tcp_send_cb(nni_win_io *io, int rv, size_t num)
nni_aio_finish_synch(aio, rv, num);
}
-void
-nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio)
+static void
+tcp_send(void *arg, nni_aio *aio)
{
- int rv;
+ nni_tcp_conn *c = arg;
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
@@ -251,49 +252,10 @@ nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio)
nni_mtx_unlock(&c->mtx);
}
-int
-nni_win_tcp_conn_init(nni_tcp_conn **connp, SOCKET s)
-{
- nni_tcp_conn *c;
- int rv;
- BOOL yes;
- DWORD no;
-
- // Don't inherit the handle (CLOEXEC really).
- SetHandleInformation((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
-
- if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
- return (NNG_ENOMEM);
- }
- c->s = INVALID_SOCKET;
- nni_mtx_init(&c->mtx);
- nni_cv_init(&c->cv, &c->mtx);
- nni_aio_list_init(&c->recv_aios);
- nni_aio_list_init(&c->send_aios);
- c->conn_aio = NULL;
-
- if (((rv = nni_win_io_init(&c->recv_io, tcp_recv_cb, c)) != 0) ||
- ((rv = nni_win_io_init(&c->send_io, tcp_send_cb, c)) != 0) ||
- ((rv = nni_win_io_register((HANDLE) s)) != 0)) {
- nni_tcp_conn_fini(c);
- return (rv);
- }
-
- no = 0;
- (void) setsockopt(
- s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no));
- yes = 1;
- (void) setsockopt(
- s, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, sizeof(yes));
-
- c->s = s;
- *connp = c;
- return (0);
-}
-
-void
-nni_tcp_conn_close(nni_tcp_conn *c)
+static void
+tcp_close(void *arg)
{
+ nni_tcp_conn *c = arg;
nni_mtx_lock(&c->mtx);
if (!c->closed) {
c->closed = true;
@@ -310,50 +272,8 @@ nni_tcp_conn_close(nni_tcp_conn *c)
nni_mtx_unlock(&c->mtx);
}
-int
-nni_tcp_conn_peername(nni_tcp_conn *c, nni_sockaddr *sa)
-{
- if (nni_win_sockaddr2nn(sa, &c->peername) < 0) {
- return (NNG_EADDRINVAL);
- }
- return (0);
-}
-
-int
-nni_tcp_conn_sockname(nni_tcp_conn *c, nni_sockaddr *sa)
-{
- if (nni_win_sockaddr2nn(sa, &c->sockname) < 0) {
- return (NNG_EADDRINVAL);
- }
- return (0);
-}
-
-int
-nni_tcp_conn_set_nodelay(nni_tcp_conn *c, bool val)
-{
- BOOL b;
- b = val ? TRUE : FALSE;
- if (setsockopt(
- c->s, IPPROTO_TCP, TCP_NODELAY, (void *) &b, sizeof(b)) != 0) {
- return (nni_win_error(WSAGetLastError()));
- }
- return (0);
-}
-
-int
-nni_tcp_conn_set_keepalive(nni_tcp_conn *c, bool val)
-{
- BOOL b;
- b = val ? TRUE : FALSE;
- if (setsockopt(
- c->s, SOL_SOCKET, SO_KEEPALIVE, (void *) &b, sizeof(b)) != 0) {
- return (nni_win_error(WSAGetLastError()));
- }
- return (0);
-}
-
static int
-tcp_conn_get_peername(void *arg, void *buf, size_t *szp, nni_type t)
+tcp_get_peername(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn *c = arg;
nng_sockaddr sa;
@@ -365,7 +285,7 @@ tcp_conn_get_peername(void *arg, void *buf, size_t *szp, nni_type t)
}
static int
-tcp_conn_get_sockname(void *arg, void *buf, size_t *szp, nni_type t)
+tcp_get_sockname(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn *c = arg;
nng_sockaddr sa;
@@ -377,7 +297,7 @@ tcp_conn_get_sockname(void *arg, void *buf, size_t *szp, nni_type t)
}
static int
-tcp_conn_set_nodelay(void *arg, const void *buf, size_t sz, nni_type t)
+tcp_set_nodelay(void *arg, const void *buf, size_t sz, nni_type t)
{
nni_tcp_conn *c = arg;
bool val;
@@ -395,7 +315,7 @@ tcp_conn_set_nodelay(void *arg, const void *buf, size_t sz, nni_type t)
}
static int
-tcp_conn_set_keepalive(void *arg, const void *buf, size_t sz, nni_type t)
+tcp_set_keepalive(void *arg, const void *buf, size_t sz, nni_type t)
{
nni_tcp_conn *c = arg;
bool val;
@@ -414,7 +334,7 @@ tcp_conn_set_keepalive(void *arg, const void *buf, size_t sz, nni_type t)
}
static int
-tcp_conn_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t)
+tcp_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn *c = arg;
BOOL b = 0;
@@ -428,7 +348,7 @@ tcp_conn_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t)
}
static int
-tcp_conn_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t)
+tcp_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn *c = arg;
BOOL b = 0;
@@ -441,48 +361,49 @@ tcp_conn_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t)
return (nni_copyout_bool(b, buf, szp, t));
}
-static const nni_option tcp_conn_options[] = {
+static const nni_option tcp_options[] = {
{
.o_name = NNG_OPT_REMADDR,
- .o_get = tcp_conn_get_peername,
+ .o_get = tcp_get_peername,
},
{
.o_name = NNG_OPT_LOCADDR,
- .o_get = tcp_conn_get_sockname,
+ .o_get = tcp_get_sockname,
},
{
.o_name = NNG_OPT_TCP_NODELAY,
- .o_get = tcp_conn_get_nodelay,
- .o_set = tcp_conn_set_nodelay,
+ .o_get = tcp_get_nodelay,
+ .o_set = tcp_set_nodelay,
},
{
.o_name = NNG_OPT_TCP_KEEPALIVE,
- .o_get = tcp_conn_get_keepalive,
- .o_set = tcp_conn_set_keepalive,
+ .o_get = tcp_get_keepalive,
+ .o_set = tcp_set_keepalive,
},
{
.o_name = NULL,
},
};
-int
-nni_tcp_conn_getopt(
- nni_tcp_conn *c, const char *name, void *buf, size_t *szp, nni_type t)
+static int
+tcp_getx(void *arg, const char *name, void *buf, size_t *szp, nni_type t)
{
- return (nni_getopt(tcp_conn_options, name, c, buf, szp, t));
+ nni_tcp_conn *c = arg;
+ return (nni_getopt(tcp_options, name, c, buf, szp, t));
}
-int
-nni_tcp_conn_setopt(
- nni_tcp_conn *c, const char *name, const void *buf, size_t sz, nni_type t)
+static int
+tcp_setx(void *arg, const char *name, const void *buf, size_t sz, nni_type t)
{
- return (nni_setopt(tcp_conn_options, name, c, buf, sz, t));
+ nni_tcp_conn *c = arg;
+ return (nni_setopt(tcp_options, name, c, buf, sz, t));
}
-void
-nni_tcp_conn_fini(nni_tcp_conn *c)
+static void
+tcp_free(void *arg)
{
- nni_tcp_conn_close(c);
+ nni_tcp_conn *c = arg;
+ tcp_close(c);
nni_mtx_lock(&c->mtx);
while ((!nni_list_empty(&c->recv_aios)) ||
@@ -502,3 +423,49 @@ nni_tcp_conn_fini(nni_tcp_conn *c)
nni_mtx_fini(&c->mtx);
NNI_FREE_STRUCT(c);
}
+
+int
+nni_win_tcp_conn_init(nni_tcp_conn **connp, SOCKET s)
+{
+ nni_tcp_conn *c;
+ int rv;
+ BOOL yes;
+ DWORD no;
+
+ // Don't inherit the handle (CLOEXEC really).
+ SetHandleInformation((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
+
+ if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ c->s = INVALID_SOCKET;
+ nni_mtx_init(&c->mtx);
+ nni_cv_init(&c->cv, &c->mtx);
+ nni_aio_list_init(&c->recv_aios);
+ nni_aio_list_init(&c->send_aios);
+ c->conn_aio = NULL;
+ c->ops.s_close = tcp_close;
+ c->ops.s_free = tcp_free;
+ c->ops.s_send = tcp_send;
+ c->ops.s_recv = tcp_recv;
+ c->ops.s_getx = tcp_getx;
+ c->ops.s_setx = tcp_setx;
+
+ if (((rv = nni_win_io_init(&c->recv_io, tcp_recv_cb, c)) != 0) ||
+ ((rv = nni_win_io_init(&c->send_io, tcp_send_cb, c)) != 0) ||
+ ((rv = nni_win_io_register((HANDLE) s)) != 0)) {
+ tcp_free(c);
+ return (rv);
+ }
+
+ no = 0;
+ (void) setsockopt(
+ s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no));
+ yes = 1;
+ (void) setsockopt(
+ s, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, sizeof(yes));
+
+ c->s = s;
+ *connp = c;
+ return (0);
+}
diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c
index 64b4e800..6bb3d92a 100644
--- a/src/platform/windows/win_tcpdial.c
+++ b/src/platform/windows/win_tcpdial.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2018 Devolutions <info@devolutions.net>
//
@@ -16,6 +16,18 @@
#include <malloc.h>
#include <stdio.h>
+struct nni_tcp_dialer {
+ LPFN_CONNECTEX connectex; // looked up name via ioctl
+ nni_list aios; // in flight connections
+ bool closed;
+ bool nodelay; // initial value for child conns
+ bool keepalive; // initial value for child conns
+ SOCKADDR_STORAGE src; // source address
+ size_t srclen;
+ nni_mtx mtx;
+ nni_reap_item reap;
+};
+
int
nni_tcp_dialer_init(nni_tcp_dialer **dp)
{
@@ -137,7 +149,7 @@ tcp_dial_cb(nni_win_io *io, int rv, size_t cnt)
nni_mtx_unlock(&d->mtx);
if (rv != 0) {
- nni_tcp_conn_fini(c);
+ nng_stream_free(&c->ops);
nni_aio_finish_error(aio, rv);
} else {
DWORD yes = 1;
@@ -156,19 +168,22 @@ tcp_dial_cb(nni_win_io *io, int rv, size_t cnt)
}
void
-nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
+nni_tcp_dial(nni_tcp_dialer *d, nni_aio *aio)
{
SOCKET s;
SOCKADDR_STORAGE ss;
int len;
nni_tcp_conn * c;
int rv;
+ nng_sockaddr sa;
+
+ nni_aio_get_sockaddr(aio, &sa);
if (nni_aio_begin(aio) != 0) {
return;
}
- if ((len = nni_win_nn2sockaddr(&ss, sa)) <= 0) {
+ if ((len = nni_win_nn2sockaddr(&ss, &sa)) <= 0) {
nni_aio_finish_error(aio, NNG_EADDRINVAL);
return;
}
@@ -179,7 +194,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
}
if ((rv = nni_win_tcp_conn_init(&c, s)) != 0) {
- nni_tcp_conn_fini(c);
+ nng_stream_free(&c->ops);
nni_aio_finish_error(aio, rv);
return;
}
@@ -194,7 +209,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
nni_mtx_lock(&d->mtx);
if (d->closed) {
nni_mtx_unlock(&d->mtx);
- nni_tcp_conn_fini(c);
+ nng_stream_free(&c->ops);
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
@@ -212,7 +227,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
if (bind(s, (SOCKADDR *) &c->sockname, len) != 0) {
rv = nni_win_error(GetLastError());
nni_mtx_unlock(&d->mtx);
- nni_tcp_conn_fini(c);
+ nng_stream_free(&c->ops);
nni_aio_finish_error(aio, rv);
return;
}
@@ -221,7 +236,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
nni_aio_set_prov_extra(aio, 0, c);
if ((rv = nni_aio_schedule(aio, tcp_dial_cancel, d)) != 0) {
nni_mtx_unlock(&d->mtx);
- nni_tcp_conn_fini(c);
+ nng_stream_free(&c->ops);
nni_aio_finish_error(aio, rv);
return;
}
@@ -234,8 +249,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
if ((rv = GetLastError()) != ERROR_IO_PENDING) {
nni_aio_list_remove(aio);
nni_mtx_unlock(&d->mtx);
-
- nni_tcp_conn_fini(c);
+ nng_stream_free(&c->ops);
nni_aio_finish_error(aio, rv);
return;
}
diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c
index 9cf16985..e98a0c37 100644
--- a/src/platform/windows/win_tcplisten.c
+++ b/src/platform/windows/win_tcplisten.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2018 Devolutions <info@devolutions.net>
//
@@ -98,7 +98,7 @@ tcp_accept_cb(nni_win_io *io, int rv, size_t cnt)
nni_mtx_unlock(&l->mtx);
if (rv != 0) {
- nni_tcp_conn_fini(c);
+ nng_stream_free(&c->ops);
nni_aio_finish_error(aio, rv);
return;
}
@@ -309,7 +309,7 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
((rv = nni_aio_schedule(aio, tcp_accept_cancel, l)) != 0)) {
nni_aio_set_prov_extra(aio, 0, NULL);
nni_mtx_unlock(&l->mtx);
- nni_tcp_conn_fini(c);
+ nng_stream_free(&c->ops);
nni_aio_finish_error(aio, rv);
return;
}
@@ -320,7 +320,7 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
// Fast failure (synchronous.)
nni_aio_list_remove(aio);
nni_mtx_unlock(&l->mtx);
- nni_tcp_conn_fini(c);
+ nng_stream_free(&c->ops);
nni_aio_finish_error(aio, rv);
return;
}