aboutsummaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/posix/posix_ipcconn.c25
-rw-r--r--src/platform/posix/posix_ipcdial.c14
-rw-r--r--src/platform/posix/posix_ipclisten.c30
-rw-r--r--src/platform/posix/posix_sockfd.c19
-rw-r--r--src/platform/posix/posix_tcpconn.c25
-rw-r--r--src/platform/posix/posix_tcpdial.c12
-rw-r--r--src/platform/posix/posix_tcplisten.c12
-rw-r--r--src/platform/tcp_stream_test.c8
-rw-r--r--src/platform/windows/win_ipcconn.c11
-rw-r--r--src/platform/windows/win_ipcdial.c10
-rw-r--r--src/platform/windows/win_ipclisten.c16
-rw-r--r--src/platform/windows/win_tcpconn.c26
-rw-r--r--src/platform/windows/win_tcpdial.c11
-rw-r--r--src/platform/windows/win_tcplisten.c7
14 files changed, 179 insertions, 47 deletions
diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c
index 7f979a0a..8b11b64f 100644
--- a/src/platform/posix/posix_ipcconn.c
+++ b/src/platform/posix/posix_ipcconn.c
@@ -386,15 +386,29 @@ nni_posix_ipc_start(nni_ipc_conn *c)
}
static void
-ipc_reap(void *arg)
+ipc_stop(void *arg)
{
- ipc_conn *c = arg;
+ ipc_conn *c = arg;
+ nni_posix_pfd *pfd;
+
ipc_close(c);
- if (c->pfd != NULL) {
- nni_posix_pfd_fini(c->pfd);
+ nni_mtx_lock(&c->mtx);
+ pfd = c->pfd;
+ c->pfd = NULL;
+ nni_mtx_unlock(&c->mtx);
+
+ if (pfd != NULL) {
+ nni_posix_pfd_fini(pfd);
}
- nni_mtx_fini(&c->mtx);
+}
+static void
+ipc_reap(void *arg)
+{
+ ipc_conn *c = arg;
+ ipc_stop(c);
+
+ nni_mtx_fini(&c->mtx);
if (c->dialer != NULL) {
nni_posix_ipc_dialer_rele(c->dialer);
}
@@ -470,6 +484,7 @@ nni_posix_ipc_alloc(nni_ipc_conn **cp, nni_sockaddr *sa, nni_ipc_dialer *d)
c->dialer = d;
c->stream.s_free = ipc_free;
c->stream.s_close = ipc_close;
+ c->stream.s_stop = ipc_stop;
c->stream.s_send = ipc_send;
c->stream.s_recv = ipc_recv;
c->stream.s_get = ipc_get;
diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c
index 7799aca2..667d8262 100644
--- a/src/platform/posix/posix_ipcdial.c
+++ b/src/platform/posix/posix_ipcdial.c
@@ -41,6 +41,7 @@ ipc_dialer_close(void *arg)
c->dial_aio = NULL;
nni_aio_set_prov_data(aio, NULL);
nng_stream_close(&c->stream);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
}
nni_aio_finish_error(aio, NNG_ECLOSED);
@@ -50,6 +51,13 @@ ipc_dialer_close(void *arg)
}
static void
+ipc_dialer_stop(void *arg)
+{
+ nni_ipc_dialer *d = arg;
+ ipc_dialer_close(d);
+}
+
+static void
ipc_dialer_fini(ipc_dialer *d)
{
nni_mtx_fini(&d->mtx);
@@ -61,7 +69,7 @@ ipc_dialer_free(void *arg)
{
ipc_dialer *d = arg;
- ipc_dialer_close(d);
+ ipc_dialer_stop(d);
nni_atomic_set_bool(&d->fini, true);
nni_posix_ipc_dialer_rele(d);
}
@@ -94,6 +102,7 @@ ipc_dialer_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&d->mtx);
nni_aio_finish_error(aio, rv);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
}
@@ -138,6 +147,7 @@ ipc_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg)
if (rv != 0) {
nng_stream_close(&c->stream);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
return;
@@ -236,6 +246,7 @@ ipc_dialer_dial(void *arg, nni_aio *aio)
error:
nni_aio_set_prov_data(aio, NULL);
nni_mtx_unlock(&d->mtx);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
}
@@ -319,6 +330,7 @@ nni_ipc_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
d->closed = false;
d->sd.sd_free = ipc_dialer_free;
d->sd.sd_close = ipc_dialer_close;
+ d->sd.sd_stop = ipc_dialer_stop;
d->sd.sd_dial = ipc_dialer_dial;
d->sd.sd_get = ipc_dialer_get;
d->sd.sd_set = ipc_dialer_set;
diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c
index bd938841..a122d7bb 100644
--- a/src/platform/posix/posix_ipclisten.c
+++ b/src/platform/posix/posix_ipclisten.c
@@ -73,6 +73,23 @@ ipc_listener_close(void *arg)
}
static void
+ipc_listener_stop(void *arg)
+{
+ ipc_listener *l = arg;
+ nni_posix_pfd *pfd;
+
+ nni_mtx_lock(&l->mtx);
+ ipc_listener_doclose(l);
+ pfd = l->pfd;
+ l->pfd = NULL;
+ nni_mtx_unlock(&l->mtx);
+
+ if (pfd != NULL) {
+ nni_posix_pfd_fini(pfd);
+ }
+}
+
+static void
ipc_listener_doaccept(ipc_listener *l)
{
nni_aio *aio;
@@ -132,6 +149,7 @@ ipc_listener_doaccept(ipc_listener *l)
}
if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -406,17 +424,10 @@ ipc_listener_listen(void *arg)
static void
ipc_listener_free(void *arg)
{
- ipc_listener *l = arg;
- nni_posix_pfd *pfd;
+ ipc_listener *l = arg;
- nni_mtx_lock(&l->mtx);
- ipc_listener_doclose(l);
- pfd = l->pfd;
- nni_mtx_unlock(&l->mtx);
+ ipc_listener_stop(l);
- if (pfd != NULL) {
- nni_posix_pfd_fini(pfd);
- }
nni_mtx_fini(&l->mtx);
NNI_FREE_STRUCT(l);
}
@@ -507,6 +518,7 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url)
l->perms = 0;
l->sl.sl_free = ipc_listener_free;
l->sl.sl_close = ipc_listener_close;
+ l->sl.sl_stop = ipc_listener_stop;
l->sl.sl_listen = ipc_listener_listen;
l->sl.sl_accept = ipc_listener_accept;
l->sl.sl_get = ipc_listener_get;
diff --git a/src/platform/posix/posix_sockfd.c b/src/platform/posix/posix_sockfd.c
index b0d88a31..997feae6 100644
--- a/src/platform/posix/posix_sockfd.c
+++ b/src/platform/posix/posix_sockfd.c
@@ -200,16 +200,26 @@ sfd_close(void *arg)
nni_mtx_unlock(&c->mtx);
}
-// sfd_fini may block briefly waiting for the pollq thread.
-// To get that out of our context, we simply reap this.
static void
-sfd_fini(void *arg)
+sfd_stop(void *arg)
{
nni_sfd_conn *c = arg;
sfd_close(c);
+
+ // ideally this would *stop* without freeing the pfd
if (c->pfd != NULL) {
nni_posix_pfd_fini(c->pfd);
+ c->pfd = NULL;
}
+}
+
+// sfd_fini may block briefly waiting for the pollq thread.
+// To get that out of our context, we simply reap this.
+static void
+sfd_fini(void *arg)
+{
+ nni_sfd_conn *c = arg;
+ sfd_stop(c);
nni_mtx_fini(&c->mtx);
NNI_FREE_STRUCT(c);
@@ -475,6 +485,7 @@ nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd)
c->stream.s_free = sfd_free;
c->stream.s_close = sfd_close;
+ c->stream.s_stop = sfd_stop;
c->stream.s_recv = sfd_recv;
c->stream.s_send = sfd_send;
c->stream.s_get = sfd_get;
@@ -490,4 +501,4 @@ void
nni_sfd_close_fd(int fd)
{
close(fd);
-} \ No newline at end of file
+}
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c
index 74b3371b..ce5243b0 100644
--- a/src/platform/posix/posix_tcpconn.c
+++ b/src/platform/posix/posix_tcpconn.c
@@ -10,6 +10,7 @@
//
#include "core/nng_impl.h"
+#include "platform/posix/posix_pollq.h"
#include <errno.h>
#include <fcntl.h>
@@ -199,16 +200,30 @@ tcp_close(void *arg)
nni_mtx_unlock(&c->mtx);
}
+static void
+tcp_stop(void *arg)
+{
+ nni_tcp_conn *c = arg;
+ nni_posix_pfd *pfd;
+ tcp_close(c);
+
+ nni_mtx_lock(&c->mtx);
+ pfd = c->pfd;
+ c->pfd = NULL;
+ nni_mtx_unlock(&c->mtx);
+
+ if (pfd != NULL) {
+ nni_posix_pfd_fini(pfd);
+ }
+}
+
// tcp_fini may block briefly waiting for the pollq thread.
// To get that out of our context, we simply reap this.
static void
tcp_fini(void *arg)
{
nni_tcp_conn *c = arg;
- tcp_close(c);
- if (c->pfd != NULL) {
- nni_posix_pfd_fini(c->pfd);
- }
+ tcp_stop(c);
nni_mtx_fini(&c->mtx);
if (c->dialer != NULL) {
@@ -221,6 +236,7 @@ static nni_reap_list tcp_reap_list = {
.rl_offset = offsetof(nni_tcp_conn, reap),
.rl_func = tcp_fini,
};
+
static void
tcp_free(void *arg)
{
@@ -454,6 +470,7 @@ nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d)
nni_aio_list_init(&c->writeq);
c->stream.s_free = tcp_free;
+ c->stream.s_stop = tcp_stop;
c->stream.s_close = tcp_close;
c->stream.s_recv = tcp_recv;
c->stream.s_send = tcp_send;
diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c
index 73ba4394..52ea6cff 100644
--- a/src/platform/posix/posix_tcpdial.c
+++ b/src/platform/posix/posix_tcpdial.c
@@ -77,9 +77,15 @@ tcp_dialer_fini(nni_tcp_dialer *d)
}
void
-nni_tcp_dialer_fini(nni_tcp_dialer *d)
+nni_tcp_dialer_stop(nni_tcp_dialer *d)
{
nni_tcp_dialer_close(d);
+}
+
+void
+nni_tcp_dialer_fini(nni_tcp_dialer *d)
+{
+ nni_tcp_dialer_stop(d);
nni_atomic_set_bool(&d->fini, true);
nni_posix_tcp_dialer_rele(d);
}
@@ -112,6 +118,8 @@ tcp_dialer_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&d->mtx);
nni_aio_finish_error(aio, rv);
+ nng_stream_close(&c->stream);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
}
@@ -263,6 +271,8 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
error:
nni_aio_set_prov_data(aio, NULL);
nni_mtx_unlock(&d->mtx);
+ nng_stream_close(&c->stream);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
}
diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c
index a38411c5..32f0fd60 100644
--- a/src/platform/posix/posix_tcplisten.c
+++ b/src/platform/posix/posix_tcplisten.c
@@ -150,6 +150,7 @@ tcp_listener_doaccept(nni_tcp_listener *l)
if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
close(newfd);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -280,18 +281,25 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa)
}
void
-nni_tcp_listener_fini(nni_tcp_listener *l)
+nni_tcp_listener_stop(nni_tcp_listener *l)
{
nni_posix_pfd *pfd;
nni_mtx_lock(&l->mtx);
tcp_listener_doclose(l);
- pfd = l->pfd;
+ pfd = l->pfd;
+ l->pfd = NULL;
nni_mtx_unlock(&l->mtx);
if (pfd != NULL) {
nni_posix_pfd_fini(pfd);
}
+}
+
+void
+nni_tcp_listener_fini(nni_tcp_listener *l)
+{
+ nni_tcp_listener_stop(l);
nni_mtx_fini(&l->mtx);
NNI_FREE_STRUCT(l);
}
diff --git a/src/platform/tcp_stream_test.c b/src/platform/tcp_stream_test.c
index b46d2d1c..708120c3 100644
--- a/src/platform/tcp_stream_test.c
+++ b/src/platform/tcp_stream_test.c
@@ -116,8 +116,10 @@ test_tcp_stream(void)
NUTS_TRUE(sa2.s_in.sa_port == sa.s_in.sa_port);
nng_stream_listener_close(l);
- nng_stream_listener_free(l);
nng_stream_dialer_close(d);
+ nng_stream_listener_stop(l);
+ nng_stream_dialer_stop(d);
+ nng_stream_listener_free(l);
nng_stream_dialer_free(d);
nng_aio_free(aio1);
nng_aio_free(aio2);
@@ -125,8 +127,10 @@ test_tcp_stream(void)
nng_aio_free(laio);
nng_aio_free(maio);
nng_stream_close(c1);
- nng_stream_free(c1);
nng_stream_close(c2);
+ nng_stream_stop(c1);
+ nng_stream_stop(c2);
+ nng_stream_free(c1);
nng_stream_free(c2);
}
diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c
index 5f540a6c..56a688a4 100644
--- a/src/platform/windows/win_ipcconn.c
+++ b/src/platform/windows/win_ipcconn.c
@@ -352,7 +352,7 @@ ipc_close(void *arg)
}
static void
-ipc_free(void *arg)
+ipc_stop(void *arg)
{
ipc_conn *c = arg;
nni_aio *aio;
@@ -381,6 +381,14 @@ ipc_free(void *arg)
DisconnectNamedPipe(f);
CloseHandle(f);
}
+}
+
+static void
+ipc_free(void *arg)
+{
+ ipc_conn *c = arg;
+
+ ipc_stop(c);
nni_cv_fini(&c->cv);
nni_mtx_fini(&c->mtx);
@@ -460,6 +468,7 @@ nni_win_ipc_init(
c->sa = *sa;
c->stream.s_free = ipc_free;
c->stream.s_close = ipc_close;
+ c->stream.s_stop = ipc_stop;
c->stream.s_send = ipc_send;
c->stream.s_recv = ipc_recv;
c->stream.s_get = ipc_get;
diff --git a/src/platform/windows/win_ipcdial.c b/src/platform/windows/win_ipcdial.c
index cb826308..347ee39b 100644
--- a/src/platform/windows/win_ipcdial.c
+++ b/src/platform/windows/win_ipcdial.c
@@ -192,10 +192,17 @@ ipc_dialer_close(void *arg)
}
static void
-ipc_dialer_free(void *arg)
+ipc_dialer_stop(void *arg)
{
ipc_dialer *d = arg;
ipc_dialer_close(d);
+}
+
+static void
+ipc_dialer_free(void *arg)
+{
+ ipc_dialer *d = arg;
+ ipc_dialer_stop(d);
if (d->path) {
nni_strfree(d->path);
}
@@ -260,6 +267,7 @@ nni_ipc_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
d->closed = false;
d->sd.sd_free = ipc_dialer_free;
d->sd.sd_close = ipc_dialer_close;
+ d->sd.sd_stop = ipc_dialer_stop;
d->sd.sd_dial = ipc_dialer_dial;
d->sd.sd_get = ipc_dialer_get;
d->sd.sd_set = ipc_dialer_set;
diff --git a/src/platform/windows/win_ipclisten.c b/src/platform/windows/win_ipclisten.c
index 98cb8273..d09c98c7 100644
--- a/src/platform/windows/win_ipclisten.c
+++ b/src/platform/windows/win_ipclisten.c
@@ -300,8 +300,18 @@ ipc_listener_close(void *arg)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
- bool accepting = l->accepting;
nni_mtx_unlock(&l->mtx);
+}
+
+static void
+ipc_listener_stop(void *arg)
+{
+ ipc_listener *l = arg;
+
+ ipc_listener_close(l);
+
+ nni_mtx_lock(&l->mtx);
+ bool accepting = l->accepting;
// This craziness because CancelIoEx on ConnectNamedPipe
// seems to be incredibly unreliable. It does work, sometimes,
@@ -309,6 +319,7 @@ ipc_listener_close(void *arg)
// to be retired in favor of UNIX domain sockets anyway.
while (accepting) {
+ nni_mtx_unlock(&l->mtx);
if (!CancelIoEx(l->f, &l->io.olpd)) {
// operation not found probably
// We just inject a safety sleep to
@@ -323,8 +334,8 @@ ipc_listener_close(void *arg)
nng_msleep(100);
nni_mtx_lock(&l->mtx);
accepting = l->accepting;
- nni_mtx_unlock(&l->mtx);
}
+ nni_mtx_unlock(&l->mtx);
DisconnectNamedPipe(l->f);
CloseHandle(l->f);
}
@@ -360,6 +371,7 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url)
l->sec_attr.bInheritHandle = FALSE;
l->sa.s_ipc.sa_family = NNG_AF_IPC;
l->sl.sl_free = ipc_listener_free;
+ l->sl.sl_stop = ipc_listener_stop;
l->sl.sl_close = ipc_listener_close;
l->sl.sl_listen = ipc_listener_listen;
l->sl.sl_accept = ipc_listener_accept;
diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c
index 3f3acd17..2f2fd378 100644
--- a/src/platform/windows/win_tcpconn.c
+++ b/src/platform/windows/win_tcpconn.c
@@ -180,6 +180,7 @@ tcp_send_start(nni_tcp_conn *c)
return;
}
}
+ nni_cv_wake(&c->cv);
}
static void
@@ -266,17 +267,6 @@ tcp_close(void *arg)
closesocket(s);
}
}
- now = nni_clock();
- // wait up to a maximum of 10 seconds before assuming something is
- // badly amiss. from what we can tell, this doesn't happen, and we do
- // see the timer expire properly, but this safeguard can prevent a
- // hang.
- while ((c->recving || c->sending) &&
- ((nni_clock() - now) < (NNI_SECOND * 10))) {
- nni_mtx_unlock(&c->mtx);
- nni_msleep(1);
- nni_mtx_lock(&c->mtx);
- }
nni_mtx_unlock(&c->mtx);
}
@@ -369,21 +359,28 @@ tcp_set(void *arg, const char *name, const void *buf, size_t sz, nni_type t)
}
static void
-tcp_free(void *arg)
+tcp_stop(void *arg)
{
nni_tcp_conn *c = arg;
tcp_close(c);
nni_mtx_lock(&c->mtx);
- while ((!nni_list_empty(&c->recv_aios)) ||
+ while (c->recving || c->sending || (!nni_list_empty(&c->recv_aios)) ||
(!nni_list_empty(&c->send_aios))) {
nni_cv_wait(&c->cv);
}
nni_mtx_unlock(&c->mtx);
-
if (c->s != INVALID_SOCKET) {
closesocket(c->s);
}
+}
+
+static void
+tcp_free(void *arg)
+{
+ nni_tcp_conn *c = arg;
+ tcp_stop(c);
+
nni_cv_fini(&c->cv);
nni_mtx_fini(&c->mtx);
NNI_FREE_STRUCT(c);
@@ -410,6 +407,7 @@ nni_win_tcp_init(nni_tcp_conn **connp, SOCKET s)
nni_aio_list_init(&c->send_aios);
c->conn_aio = NULL;
c->ops.s_close = tcp_close;
+ c->ops.s_stop = tcp_stop;
c->ops.s_free = tcp_free;
c->ops.s_send = tcp_send;
c->ops.s_recv = tcp_recv;
diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c
index 5492d265..c0385648 100644
--- a/src/platform/windows/win_tcpdial.c
+++ b/src/platform/windows/win_tcpdial.c
@@ -95,9 +95,16 @@ static nni_reap_list tcp_dialer_reap_list = {
};
void
-nni_tcp_dialer_fini(nni_tcp_dialer *d)
+nni_tcp_dialer_stop(nni_tcp_dialer *d)
{
nni_tcp_dialer_close(d);
+ // TODO: wait for conn_io.olpd?
+}
+
+void
+nni_tcp_dialer_fini(nni_tcp_dialer *d)
+{
+ nni_tcp_dialer_stop(d);
nni_mtx_lock(&d->mtx);
if (!nni_list_empty(&d->aios)) {
nni_mtx_unlock(&d->mtx);
@@ -155,6 +162,8 @@ tcp_dial_cb(nni_win_io *io, int rv, size_t cnt)
nni_mtx_unlock(&d->mtx);
if (rv != 0) {
+ nng_stream_close(&c->ops);
+ nng_stream_stop(&c->ops);
nng_stream_free(&c->ops);
nni_aio_finish_error(aio, rv);
} else {
diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c
index 021483d4..4e4fb090 100644
--- a/src/platform/windows/win_tcplisten.c
+++ b/src/platform/windows/win_tcplisten.c
@@ -178,6 +178,13 @@ static nni_reap_list tcp_listener_reap_list = {
};
void
+nni_tcp_listener_stop(nni_tcp_listener *l)
+{
+ nni_tcp_listener_close(l);
+ // TODO: maybe wait for l->l_accept_io.olpd to finish?
+}
+
+void
nni_tcp_listener_fini(nni_tcp_listener *l)
{
nni_tcp_listener_close(l);