aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-12 17:55:48 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-12 17:55:48 -0800
commit81f5d3c6268ff91ee9c36c4cb34f6f9bfd54740d (patch)
treef9f21aa66bd22cfd95ae0c4b8abe57036c8fce0d /src
parent371eedeeb6fafe628ae89b9ad2690fa3d6a57e8a (diff)
downloadnng-81f5d3c6268ff91ee9c36c4cb34f6f9bfd54740d.tar.gz
nng-81f5d3c6268ff91ee9c36c4cb34f6f9bfd54740d.tar.bz2
nng-81f5d3c6268ff91ee9c36c4cb34f6f9bfd54740d.zip
streams: add explicit stop functions
This allows us to explicitly stop streams, dialers, and listeners, before we start tearing down things. This hopefully will be useful in resolving use-after-free bugs in http, tls, and websockets. The new functions are not yet documented, but they are nng_stream_stop, nng_stream_dialer_stop, and nng_stream_listener_stop. They should be called after close, and before free. The close functions now close without blocking, but the stop function is allowed to block.
Diffstat (limited to 'src')
-rw-r--r--src/core/platform.h5
-rw-r--r--src/core/sockfd.c7
-rw-r--r--src/core/stream.c37
-rw-r--r--src/core/stream.h5
-rw-r--r--src/core/tcp.c37
-rw-r--r--src/platform/posix/posix_ipcconn.c25
-rw-r--r--src/platform/posix/posix_ipcdial.c14
-rw-r--r--src/platform/posix/posix_ipclisten.c30
-rw-r--r--src/platform/posix/posix_sockfd.c19
-rw-r--r--src/platform/posix/posix_tcpconn.c25
-rw-r--r--src/platform/posix/posix_tcpdial.c12
-rw-r--r--src/platform/posix/posix_tcplisten.c12
-rw-r--r--src/platform/tcp_stream_test.c8
-rw-r--r--src/platform/windows/win_ipcconn.c11
-rw-r--r--src/platform/windows/win_ipcdial.c10
-rw-r--r--src/platform/windows/win_ipclisten.c16
-rw-r--r--src/platform/windows/win_tcpconn.c26
-rw-r--r--src/platform/windows/win_tcpdial.c11
-rw-r--r--src/platform/windows/win_tcplisten.c7
-rw-r--r--src/sp/transport/ipc/ipc.c3
-rw-r--r--src/sp/transport/tcp/tcp.c3
-rw-r--r--src/sp/transport/tls/tls.c2
-rw-r--r--src/sp/transport/ws/websocket.c7
-rw-r--r--src/supplemental/http/http_client.c2
-rw-r--r--src/supplemental/http/http_server.c1
-rw-r--r--src/supplemental/tls/tls_common.c39
-rw-r--r--src/supplemental/tls/tls_test.c39
-rw-r--r--src/supplemental/websocket/websocket.c100
-rw-r--r--src/supplemental/websocket/websocket_test.c15
29 files changed, 415 insertions, 113 deletions
diff --git a/src/core/platform.h b/src/core/platform.h
index d2409e5b..2f5b5c17 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -293,6 +293,7 @@ extern void nni_tcp_dialer_fini(nni_tcp_dialer *);
// Further operations on it should return NNG_ECLOSED.
// Any in-progress connection will be aborted.
extern void nni_tcp_dialer_close(nni_tcp_dialer *);
+extern void nni_tcp_dialer_stop(nni_tcp_dialer *);
// nni_tcp_dial attempts to create an outgoing connection,
// asynchronously, to the address in the aio. On success, the first (and only)
@@ -318,6 +319,10 @@ extern void nni_tcp_listener_fini(nni_tcp_listener *);
// any bound socket, and further operations will result in NNG_ECLOSED.
extern void nni_tcp_listener_close(nni_tcp_listener *);
+// nni_tcp_listener_stop is close + waits for any operations to stop,
+// so there won't be any further accepts after this.
+extern void nni_tcp_listener_stop(nni_tcp_listener *);
+
// nni_tcp_listener_listen creates the socket in listening mode, bound
// to the specified address.
extern int nni_tcp_listener_listen(nni_tcp_listener *, const nni_sockaddr *);
diff --git a/src/core/sockfd.c b/src/core/sockfd.c
index cedd7436..787a0783 100644
--- a/src/core/sockfd.c
+++ b/src/core/sockfd.c
@@ -63,6 +63,12 @@ sfd_listener_close(void *arg)
nni_mtx_unlock(&l->mtx);
}
+static void
+sfd_listener_stop(void *arg)
+{
+ sfd_listener_close(arg);
+}
+
static int
sfd_listener_listen(void *arg)
{
@@ -222,6 +228,7 @@ nni_sfd_listener_alloc(nng_stream_listener **lp, const nng_url *url)
l->ops.sl_free = sfd_listener_free;
l->ops.sl_close = sfd_listener_close;
+ l->ops.sl_stop = sfd_listener_stop;
l->ops.sl_listen = sfd_listener_listen;
l->ops.sl_accept = sfd_listener_accept;
l->ops.sl_get = sfd_listener_get;
diff --git a/src/core/stream.c b/src/core/stream.c
index ad026586..16e11eca 100644
--- a/src/core/stream.c
+++ b/src/core/stream.c
@@ -116,7 +116,17 @@ static struct {
void
nng_stream_close(nng_stream *s)
{
- s->s_close(s);
+ if (s != NULL) {
+ s->s_close(s);
+ }
+}
+
+void
+nng_stream_stop(nng_stream *s)
+{
+ if (s != NULL) {
+ s->s_stop(s);
+ }
}
void
@@ -149,7 +159,17 @@ nni_stream_get(
void
nng_stream_dialer_close(nng_stream_dialer *d)
{
- d->sd_close(d);
+ if (d != NULL) {
+ d->sd_close(d);
+ }
+}
+
+void
+nng_stream_dialer_stop(nng_stream_dialer *d)
+{
+ if (d != NULL) {
+ d->sd_stop(d);
+ }
}
void
@@ -226,8 +246,19 @@ nni_stream_dialer_set_tls(nng_stream_dialer *d, nng_tls_config *cfg)
void
nng_stream_listener_close(nng_stream_listener *l)
{
- l->sl_close(l);
+ if (l != NULL) {
+ l->sl_close(l);
+ }
+}
+
+void
+nng_stream_listener_stop(nng_stream_listener *l)
+{
+ if (l != NULL) {
+ l->sl_stop(l);
+ }
}
+
void
nng_stream_listener_free(nng_stream_listener *l)
{
diff --git a/src/core/stream.h b/src/core/stream.h
index 9ea65834..e09dc9e5 100644
--- a/src/core/stream.h
+++ b/src/core/stream.h
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -43,6 +43,7 @@ extern int nni_stream_listener_get_tls(
struct nng_stream {
void (*s_free)(void *);
void (*s_close)(void *);
+ void (*s_stop)(void *);
void (*s_recv)(void *, nng_aio *);
void (*s_send)(void *, nng_aio *);
int (*s_get)(void *, const char *, void *, size_t *, nni_type);
@@ -53,6 +54,7 @@ struct nng_stream {
struct nng_stream_dialer {
void (*sd_free)(void *);
void (*sd_close)(void *);
+ void (*sd_stop)(void *);
void (*sd_dial)(void *, nng_aio *);
int (*sd_get)(void *, const char *, void *, size_t *, nni_type);
int (*sd_set)(void *, const char *, const void *, size_t, nni_type);
@@ -65,6 +67,7 @@ struct nng_stream_dialer {
struct nng_stream_listener {
void (*sl_free)(void *);
void (*sl_close)(void *);
+ void (*sl_stop)(void *);
int (*sl_listen)(void *);
void (*sl_accept)(void *, nng_aio *);
int (*sl_get)(void *, const char *, void *, size_t *, nni_type);
diff --git a/src/core/tcp.c b/src/core/tcp.c
index 5d324a13..d2e08493 100644
--- a/src/core/tcp.c
+++ b/src/core/tcp.c
@@ -104,6 +104,8 @@ tcp_dial_con_cb(void *arg)
if ((d->closed) || ((aio = nni_list_first(&d->conaios)) == NULL)) {
if (rv == 0) {
// Make sure we discard the underlying connection.
+ nng_stream_close(nni_aio_get_output(d->conaio, 0));
+ nng_stream_stop(nni_aio_get_output(d->conaio, 0));
nng_stream_free(nni_aio_get_output(d->conaio, 0));
nni_aio_set_output(d->conaio, 0, NULL);
}
@@ -127,14 +129,26 @@ tcp_dialer_close(void *arg)
{
tcp_dialer *d = arg;
nni_aio *aio;
- nni_mtx_lock(&d->mtx);
- d->closed = true;
- while ((aio = nni_list_first(&d->conaios)) != NULL) {
- nni_list_remove(&d->conaios, aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
+
+ if (d != NULL) {
+ nni_mtx_lock(&d->mtx);
+ d->closed = true;
+ while ((aio = nni_list_first(&d->conaios)) != NULL) {
+ nni_list_remove(&d->conaios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_tcp_dialer_close(d->d);
+ nni_mtx_unlock(&d->mtx);
+ }
+}
+
+static void
+tcp_dialer_stop(void *arg)
+{
+ tcp_dialer *d = arg;
+ if (d != NULL) {
+ nni_tcp_dialer_stop(d->d);
}
- nni_tcp_dialer_close(d->d);
- nni_mtx_unlock(&d->mtx);
}
static void
@@ -222,6 +236,7 @@ tcp_dialer_alloc(tcp_dialer **dp)
d->ops.sd_close = tcp_dialer_close;
d->ops.sd_free = tcp_dialer_free;
+ d->ops.sd_stop = tcp_dialer_stop;
d->ops.sd_dial = tcp_dialer_dial;
d->ops.sd_get = tcp_dialer_get;
d->ops.sd_set = tcp_dialer_set;
@@ -276,6 +291,13 @@ tcp_listener_close(void *arg)
}
static void
+tcp_listener_stop(void *arg)
+{
+ tcp_listener *l = arg;
+ nni_tcp_listener_stop(l->l);
+}
+
+static void
tcp_listener_free(void *arg)
{
tcp_listener *l = arg;
@@ -372,6 +394,7 @@ tcp_listener_alloc_addr(nng_stream_listener **lp, const nng_sockaddr *sa)
l->ops.sl_free = tcp_listener_free;
l->ops.sl_close = tcp_listener_close;
+ l->ops.sl_stop = tcp_listener_stop;
l->ops.sl_listen = tcp_listener_listen;
l->ops.sl_accept = tcp_listener_accept;
l->ops.sl_get = tcp_listener_get;
diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c
index 7f979a0a..8b11b64f 100644
--- a/src/platform/posix/posix_ipcconn.c
+++ b/src/platform/posix/posix_ipcconn.c
@@ -386,15 +386,29 @@ nni_posix_ipc_start(nni_ipc_conn *c)
}
static void
-ipc_reap(void *arg)
+ipc_stop(void *arg)
{
- ipc_conn *c = arg;
+ ipc_conn *c = arg;
+ nni_posix_pfd *pfd;
+
ipc_close(c);
- if (c->pfd != NULL) {
- nni_posix_pfd_fini(c->pfd);
+ nni_mtx_lock(&c->mtx);
+ pfd = c->pfd;
+ c->pfd = NULL;
+ nni_mtx_unlock(&c->mtx);
+
+ if (pfd != NULL) {
+ nni_posix_pfd_fini(pfd);
}
- nni_mtx_fini(&c->mtx);
+}
+static void
+ipc_reap(void *arg)
+{
+ ipc_conn *c = arg;
+ ipc_stop(c);
+
+ nni_mtx_fini(&c->mtx);
if (c->dialer != NULL) {
nni_posix_ipc_dialer_rele(c->dialer);
}
@@ -470,6 +484,7 @@ nni_posix_ipc_alloc(nni_ipc_conn **cp, nni_sockaddr *sa, nni_ipc_dialer *d)
c->dialer = d;
c->stream.s_free = ipc_free;
c->stream.s_close = ipc_close;
+ c->stream.s_stop = ipc_stop;
c->stream.s_send = ipc_send;
c->stream.s_recv = ipc_recv;
c->stream.s_get = ipc_get;
diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c
index 7799aca2..667d8262 100644
--- a/src/platform/posix/posix_ipcdial.c
+++ b/src/platform/posix/posix_ipcdial.c
@@ -41,6 +41,7 @@ ipc_dialer_close(void *arg)
c->dial_aio = NULL;
nni_aio_set_prov_data(aio, NULL);
nng_stream_close(&c->stream);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
}
nni_aio_finish_error(aio, NNG_ECLOSED);
@@ -50,6 +51,13 @@ ipc_dialer_close(void *arg)
}
static void
+ipc_dialer_stop(void *arg)
+{
+ nni_ipc_dialer *d = arg;
+ ipc_dialer_close(d);
+}
+
+static void
ipc_dialer_fini(ipc_dialer *d)
{
nni_mtx_fini(&d->mtx);
@@ -61,7 +69,7 @@ ipc_dialer_free(void *arg)
{
ipc_dialer *d = arg;
- ipc_dialer_close(d);
+ ipc_dialer_stop(d);
nni_atomic_set_bool(&d->fini, true);
nni_posix_ipc_dialer_rele(d);
}
@@ -94,6 +102,7 @@ ipc_dialer_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&d->mtx);
nni_aio_finish_error(aio, rv);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
}
@@ -138,6 +147,7 @@ ipc_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg)
if (rv != 0) {
nng_stream_close(&c->stream);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
return;
@@ -236,6 +246,7 @@ ipc_dialer_dial(void *arg, nni_aio *aio)
error:
nni_aio_set_prov_data(aio, NULL);
nni_mtx_unlock(&d->mtx);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
}
@@ -319,6 +330,7 @@ nni_ipc_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
d->closed = false;
d->sd.sd_free = ipc_dialer_free;
d->sd.sd_close = ipc_dialer_close;
+ d->sd.sd_stop = ipc_dialer_stop;
d->sd.sd_dial = ipc_dialer_dial;
d->sd.sd_get = ipc_dialer_get;
d->sd.sd_set = ipc_dialer_set;
diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c
index bd938841..a122d7bb 100644
--- a/src/platform/posix/posix_ipclisten.c
+++ b/src/platform/posix/posix_ipclisten.c
@@ -73,6 +73,23 @@ ipc_listener_close(void *arg)
}
static void
+ipc_listener_stop(void *arg)
+{
+ ipc_listener *l = arg;
+ nni_posix_pfd *pfd;
+
+ nni_mtx_lock(&l->mtx);
+ ipc_listener_doclose(l);
+ pfd = l->pfd;
+ l->pfd = NULL;
+ nni_mtx_unlock(&l->mtx);
+
+ if (pfd != NULL) {
+ nni_posix_pfd_fini(pfd);
+ }
+}
+
+static void
ipc_listener_doaccept(ipc_listener *l)
{
nni_aio *aio;
@@ -132,6 +149,7 @@ ipc_listener_doaccept(ipc_listener *l)
}
if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -406,17 +424,10 @@ ipc_listener_listen(void *arg)
static void
ipc_listener_free(void *arg)
{
- ipc_listener *l = arg;
- nni_posix_pfd *pfd;
+ ipc_listener *l = arg;
- nni_mtx_lock(&l->mtx);
- ipc_listener_doclose(l);
- pfd = l->pfd;
- nni_mtx_unlock(&l->mtx);
+ ipc_listener_stop(l);
- if (pfd != NULL) {
- nni_posix_pfd_fini(pfd);
- }
nni_mtx_fini(&l->mtx);
NNI_FREE_STRUCT(l);
}
@@ -507,6 +518,7 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url)
l->perms = 0;
l->sl.sl_free = ipc_listener_free;
l->sl.sl_close = ipc_listener_close;
+ l->sl.sl_stop = ipc_listener_stop;
l->sl.sl_listen = ipc_listener_listen;
l->sl.sl_accept = ipc_listener_accept;
l->sl.sl_get = ipc_listener_get;
diff --git a/src/platform/posix/posix_sockfd.c b/src/platform/posix/posix_sockfd.c
index b0d88a31..997feae6 100644
--- a/src/platform/posix/posix_sockfd.c
+++ b/src/platform/posix/posix_sockfd.c
@@ -200,16 +200,26 @@ sfd_close(void *arg)
nni_mtx_unlock(&c->mtx);
}
-// sfd_fini may block briefly waiting for the pollq thread.
-// To get that out of our context, we simply reap this.
static void
-sfd_fini(void *arg)
+sfd_stop(void *arg)
{
nni_sfd_conn *c = arg;
sfd_close(c);
+
+ // ideally this would *stop* without freeing the pfd
if (c->pfd != NULL) {
nni_posix_pfd_fini(c->pfd);
+ c->pfd = NULL;
}
+}
+
+// sfd_fini may block briefly waiting for the pollq thread.
+// To get that out of our context, we simply reap this.
+static void
+sfd_fini(void *arg)
+{
+ nni_sfd_conn *c = arg;
+ sfd_stop(c);
nni_mtx_fini(&c->mtx);
NNI_FREE_STRUCT(c);
@@ -475,6 +485,7 @@ nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd)
c->stream.s_free = sfd_free;
c->stream.s_close = sfd_close;
+ c->stream.s_stop = sfd_stop;
c->stream.s_recv = sfd_recv;
c->stream.s_send = sfd_send;
c->stream.s_get = sfd_get;
@@ -490,4 +501,4 @@ void
nni_sfd_close_fd(int fd)
{
close(fd);
-} \ No newline at end of file
+}
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c
index 74b3371b..ce5243b0 100644
--- a/src/platform/posix/posix_tcpconn.c
+++ b/src/platform/posix/posix_tcpconn.c
@@ -10,6 +10,7 @@
//
#include "core/nng_impl.h"
+#include "platform/posix/posix_pollq.h"
#include <errno.h>
#include <fcntl.h>
@@ -199,16 +200,30 @@ tcp_close(void *arg)
nni_mtx_unlock(&c->mtx);
}
+static void
+tcp_stop(void *arg)
+{
+ nni_tcp_conn *c = arg;
+ nni_posix_pfd *pfd;
+ tcp_close(c);
+
+ nni_mtx_lock(&c->mtx);
+ pfd = c->pfd;
+ c->pfd = NULL;
+ nni_mtx_unlock(&c->mtx);
+
+ if (pfd != NULL) {
+ nni_posix_pfd_fini(pfd);
+ }
+}
+
// tcp_fini may block briefly waiting for the pollq thread.
// To get that out of our context, we simply reap this.
static void
tcp_fini(void *arg)
{
nni_tcp_conn *c = arg;
- tcp_close(c);
- if (c->pfd != NULL) {
- nni_posix_pfd_fini(c->pfd);
- }
+ tcp_stop(c);
nni_mtx_fini(&c->mtx);
if (c->dialer != NULL) {
@@ -221,6 +236,7 @@ static nni_reap_list tcp_reap_list = {
.rl_offset = offsetof(nni_tcp_conn, reap),
.rl_func = tcp_fini,
};
+
static void
tcp_free(void *arg)
{
@@ -454,6 +470,7 @@ nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d)
nni_aio_list_init(&c->writeq);
c->stream.s_free = tcp_free;
+ c->stream.s_stop = tcp_stop;
c->stream.s_close = tcp_close;
c->stream.s_recv = tcp_recv;
c->stream.s_send = tcp_send;
diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c
index 73ba4394..52ea6cff 100644
--- a/src/platform/posix/posix_tcpdial.c
+++ b/src/platform/posix/posix_tcpdial.c
@@ -77,9 +77,15 @@ tcp_dialer_fini(nni_tcp_dialer *d)
}
void
-nni_tcp_dialer_fini(nni_tcp_dialer *d)
+nni_tcp_dialer_stop(nni_tcp_dialer *d)
{
nni_tcp_dialer_close(d);
+}
+
+void
+nni_tcp_dialer_fini(nni_tcp_dialer *d)
+{
+ nni_tcp_dialer_stop(d);
nni_atomic_set_bool(&d->fini, true);
nni_posix_tcp_dialer_rele(d);
}
@@ -112,6 +118,8 @@ tcp_dialer_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&d->mtx);
nni_aio_finish_error(aio, rv);
+ nng_stream_close(&c->stream);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
}
@@ -263,6 +271,8 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
error:
nni_aio_set_prov_data(aio, NULL);
nni_mtx_unlock(&d->mtx);
+ nng_stream_close(&c->stream);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
}
diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c
index a38411c5..32f0fd60 100644
--- a/src/platform/posix/posix_tcplisten.c
+++ b/src/platform/posix/posix_tcplisten.c
@@ -150,6 +150,7 @@ tcp_listener_doaccept(nni_tcp_listener *l)
if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
close(newfd);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -280,18 +281,25 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa)
}
void
-nni_tcp_listener_fini(nni_tcp_listener *l)
+nni_tcp_listener_stop(nni_tcp_listener *l)
{
nni_posix_pfd *pfd;
nni_mtx_lock(&l->mtx);
tcp_listener_doclose(l);
- pfd = l->pfd;
+ pfd = l->pfd;
+ l->pfd = NULL;
nni_mtx_unlock(&l->mtx);
if (pfd != NULL) {
nni_posix_pfd_fini(pfd);
}
+}
+
+void
+nni_tcp_listener_fini(nni_tcp_listener *l)
+{
+ nni_tcp_listener_stop(l);
nni_mtx_fini(&l->mtx);
NNI_FREE_STRUCT(l);
}
diff --git a/src/platform/tcp_stream_test.c b/src/platform/tcp_stream_test.c
index b46d2d1c..708120c3 100644
--- a/src/platform/tcp_stream_test.c
+++ b/src/platform/tcp_stream_test.c
@@ -116,8 +116,10 @@ test_tcp_stream(void)
NUTS_TRUE(sa2.s_in.sa_port == sa.s_in.sa_port);
nng_stream_listener_close(l);
- nng_stream_listener_free(l);
nng_stream_dialer_close(d);
+ nng_stream_listener_stop(l);
+ nng_stream_dialer_stop(d);
+ nng_stream_listener_free(l);
nng_stream_dialer_free(d);
nng_aio_free(aio1);
nng_aio_free(aio2);
@@ -125,8 +127,10 @@ test_tcp_stream(void)
nng_aio_free(laio);
nng_aio_free(maio);
nng_stream_close(c1);
- nng_stream_free(c1);
nng_stream_close(c2);
+ nng_stream_stop(c1);
+ nng_stream_stop(c2);
+ nng_stream_free(c1);
nng_stream_free(c2);
}
diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c
index 5f540a6c..56a688a4 100644
--- a/src/platform/windows/win_ipcconn.c
+++ b/src/platform/windows/win_ipcconn.c
@@ -352,7 +352,7 @@ ipc_close(void *arg)
}
static void
-ipc_free(void *arg)
+ipc_stop(void *arg)
{
ipc_conn *c = arg;
nni_aio *aio;
@@ -381,6 +381,14 @@ ipc_free(void *arg)
DisconnectNamedPipe(f);
CloseHandle(f);
}
+}
+
+static void
+ipc_free(void *arg)
+{
+ ipc_conn *c = arg;
+
+ ipc_stop(c);
nni_cv_fini(&c->cv);
nni_mtx_fini(&c->mtx);
@@ -460,6 +468,7 @@ nni_win_ipc_init(
c->sa = *sa;
c->stream.s_free = ipc_free;
c->stream.s_close = ipc_close;
+ c->stream.s_stop = ipc_stop;
c->stream.s_send = ipc_send;
c->stream.s_recv = ipc_recv;
c->stream.s_get = ipc_get;
diff --git a/src/platform/windows/win_ipcdial.c b/src/platform/windows/win_ipcdial.c
index cb826308..347ee39b 100644
--- a/src/platform/windows/win_ipcdial.c
+++ b/src/platform/windows/win_ipcdial.c
@@ -192,10 +192,17 @@ ipc_dialer_close(void *arg)
}
static void
-ipc_dialer_free(void *arg)
+ipc_dialer_stop(void *arg)
{
ipc_dialer *d = arg;
ipc_dialer_close(d);
+}
+
+static void
+ipc_dialer_free(void *arg)
+{
+ ipc_dialer *d = arg;
+ ipc_dialer_stop(d);
if (d->path) {
nni_strfree(d->path);
}
@@ -260,6 +267,7 @@ nni_ipc_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
d->closed = false;
d->sd.sd_free = ipc_dialer_free;
d->sd.sd_close = ipc_dialer_close;
+ d->sd.sd_stop = ipc_dialer_stop;
d->sd.sd_dial = ipc_dialer_dial;
d->sd.sd_get = ipc_dialer_get;
d->sd.sd_set = ipc_dialer_set;
diff --git a/src/platform/windows/win_ipclisten.c b/src/platform/windows/win_ipclisten.c
index 98cb8273..d09c98c7 100644
--- a/src/platform/windows/win_ipclisten.c
+++ b/src/platform/windows/win_ipclisten.c
@@ -300,8 +300,18 @@ ipc_listener_close(void *arg)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
- bool accepting = l->accepting;
nni_mtx_unlock(&l->mtx);
+}
+
+static void
+ipc_listener_stop(void *arg)
+{
+ ipc_listener *l = arg;
+
+ ipc_listener_close(l);
+
+ nni_mtx_lock(&l->mtx);
+ bool accepting = l->accepting;
// This craziness because CancelIoEx on ConnectNamedPipe
// seems to be incredibly unreliable. It does work, sometimes,
@@ -309,6 +319,7 @@ ipc_listener_close(void *arg)
// to be retired in favor of UNIX domain sockets anyway.
while (accepting) {
+ nni_mtx_unlock(&l->mtx);
if (!CancelIoEx(l->f, &l->io.olpd)) {
// operation not found probably
// We just inject a safety sleep to
@@ -323,8 +334,8 @@ ipc_listener_close(void *arg)
nng_msleep(100);
nni_mtx_lock(&l->mtx);
accepting = l->accepting;
- nni_mtx_unlock(&l->mtx);
}
+ nni_mtx_unlock(&l->mtx);
DisconnectNamedPipe(l->f);
CloseHandle(l->f);
}
@@ -360,6 +371,7 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url)
l->sec_attr.bInheritHandle = FALSE;
l->sa.s_ipc.sa_family = NNG_AF_IPC;
l->sl.sl_free = ipc_listener_free;
+ l->sl.sl_stop = ipc_listener_stop;
l->sl.sl_close = ipc_listener_close;
l->sl.sl_listen = ipc_listener_listen;
l->sl.sl_accept = ipc_listener_accept;
diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c
index 3f3acd17..2f2fd378 100644
--- a/src/platform/windows/win_tcpconn.c
+++ b/src/platform/windows/win_tcpconn.c
@@ -180,6 +180,7 @@ tcp_send_start(nni_tcp_conn *c)
return;
}
}
+ nni_cv_wake(&c->cv);
}
static void
@@ -266,17 +267,6 @@ tcp_close(void *arg)
closesocket(s);
}
}
- now = nni_clock();
- // wait up to a maximum of 10 seconds before assuming something is
- // badly amiss. from what we can tell, this doesn't happen, and we do
- // see the timer expire properly, but this safeguard can prevent a
- // hang.
- while ((c->recving || c->sending) &&
- ((nni_clock() - now) < (NNI_SECOND * 10))) {
- nni_mtx_unlock(&c->mtx);
- nni_msleep(1);
- nni_mtx_lock(&c->mtx);
- }
nni_mtx_unlock(&c->mtx);
}
@@ -369,21 +359,28 @@ tcp_set(void *arg, const char *name, const void *buf, size_t sz, nni_type t)
}
static void
-tcp_free(void *arg)
+tcp_stop(void *arg)
{
nni_tcp_conn *c = arg;
tcp_close(c);
nni_mtx_lock(&c->mtx);
- while ((!nni_list_empty(&c->recv_aios)) ||
+ while (c->recving || c->sending || (!nni_list_empty(&c->recv_aios)) ||
(!nni_list_empty(&c->send_aios))) {
nni_cv_wait(&c->cv);
}
nni_mtx_unlock(&c->mtx);
-
if (c->s != INVALID_SOCKET) {
closesocket(c->s);
}
+}
+
+static void
+tcp_free(void *arg)
+{
+ nni_tcp_conn *c = arg;
+ tcp_stop(c);
+
nni_cv_fini(&c->cv);
nni_mtx_fini(&c->mtx);
NNI_FREE_STRUCT(c);
@@ -410,6 +407,7 @@ nni_win_tcp_init(nni_tcp_conn **connp, SOCKET s)
nni_aio_list_init(&c->send_aios);
c->conn_aio = NULL;
c->ops.s_close = tcp_close;
+ c->ops.s_stop = tcp_stop;
c->ops.s_free = tcp_free;
c->ops.s_send = tcp_send;
c->ops.s_recv = tcp_recv;
diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c
index 5492d265..c0385648 100644
--- a/src/platform/windows/win_tcpdial.c
+++ b/src/platform/windows/win_tcpdial.c
@@ -95,9 +95,16 @@ static nni_reap_list tcp_dialer_reap_list = {
};
void
-nni_tcp_dialer_fini(nni_tcp_dialer *d)
+nni_tcp_dialer_stop(nni_tcp_dialer *d)
{
nni_tcp_dialer_close(d);
+ // TODO: wait for conn_io.olpd?
+}
+
+void
+nni_tcp_dialer_fini(nni_tcp_dialer *d)
+{
+ nni_tcp_dialer_stop(d);
nni_mtx_lock(&d->mtx);
if (!nni_list_empty(&d->aios)) {
nni_mtx_unlock(&d->mtx);
@@ -155,6 +162,8 @@ tcp_dial_cb(nni_win_io *io, int rv, size_t cnt)
nni_mtx_unlock(&d->mtx);
if (rv != 0) {
+ nng_stream_close(&c->ops);
+ nng_stream_stop(&c->ops);
nng_stream_free(&c->ops);
nni_aio_finish_error(aio, rv);
} else {
diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c
index 021483d4..4e4fb090 100644
--- a/src/platform/windows/win_tcplisten.c
+++ b/src/platform/windows/win_tcplisten.c
@@ -178,6 +178,13 @@ static nni_reap_list tcp_listener_reap_list = {
};
void
+nni_tcp_listener_stop(nni_tcp_listener *l)
+{
+ nni_tcp_listener_close(l);
+ // TODO: maybe wait for l->l_accept_io.olpd to finish?
+}
+
+void
nni_tcp_listener_fini(nni_tcp_listener *l)
{
nni_tcp_listener_close(l);
diff --git a/src/sp/transport/ipc/ipc.c b/src/sp/transport/ipc/ipc.c
index dadb1909..c659b8fd 100644
--- a/src/sp/transport/ipc/ipc.c
+++ b/src/sp/transport/ipc/ipc.c
@@ -111,6 +111,7 @@ ipc_pipe_stop(void *arg)
nni_aio_stop(&p->rx_aio);
nni_aio_stop(&p->tx_aio);
nni_aio_stop(&p->neg_aio);
+ nng_stream_stop(p->conn);
nni_mtx_lock(&ep->mtx);
nni_list_node_remove(&p->node);
nni_mtx_unlock(&ep->mtx);
@@ -655,6 +656,8 @@ ipc_ep_stop(void *arg)
nni_aio_stop(&ep->time_aio);
nni_aio_stop(&ep->conn_aio);
+ nng_stream_dialer_stop(ep->dialer);
+ nng_stream_listener_stop(ep->listener);
}
static void
diff --git a/src/sp/transport/tcp/tcp.c b/src/sp/transport/tcp/tcp.c
index 2497b899..51991cbd 100644
--- a/src/sp/transport/tcp/tcp.c
+++ b/src/sp/transport/tcp/tcp.c
@@ -124,6 +124,7 @@ tcptran_pipe_stop(void *arg)
nni_aio_stop(p->rxaio);
nni_aio_stop(p->txaio);
nni_aio_stop(p->negoaio);
+ nng_stream_stop(p->conn);
}
static int
@@ -674,6 +675,8 @@ tcptran_ep_stop(void *arg)
nni_aio_stop(ep->timeaio);
nni_aio_stop(ep->connaio);
+ nng_stream_dialer_stop(ep->dialer);
+ nng_stream_listener_stop(ep->listener);
}
static void
diff --git a/src/sp/transport/tls/tls.c b/src/sp/transport/tls/tls.c
index 7a907e8e..dcb100f5 100644
--- a/src/sp/transport/tls/tls.c
+++ b/src/sp/transport/tls/tls.c
@@ -657,6 +657,8 @@ tlstran_ep_stop(void *arg)
nni_aio_stop(ep->timeaio);
nni_aio_stop(ep->connaio);
+ nng_stream_dialer_stop(ep->dialer);
+ nng_stream_listener_stop(ep->listener);
}
static void
diff --git a/src/sp/transport/ws/websocket.c b/src/sp/transport/ws/websocket.c
index 27cc6434..16929ec9 100644
--- a/src/sp/transport/ws/websocket.c
+++ b/src/sp/transport/ws/websocket.c
@@ -183,6 +183,7 @@ wstran_pipe_stop(void *arg)
nni_aio_stop(&p->rxaio);
nni_aio_stop(&p->txaio);
+ nng_stream_stop(p->ws);
}
static int
@@ -214,9 +215,7 @@ wstran_pipe_close(void *arg)
nni_aio_close(&p->rxaio);
nni_aio_close(&p->txaio);
- nni_mtx_lock(&p->mtx);
nng_stream_close(p->ws);
- nni_mtx_unlock(&p->mtx);
}
static int
@@ -373,6 +372,7 @@ wstran_dialer_stop(void *arg)
ws_dialer *d = arg;
nni_aio_stop(&d->connaio);
+ nng_stream_dialer_stop(d->dialer);
}
static void
@@ -392,6 +392,7 @@ wstran_listener_stop(void *arg)
ws_listener *l = arg;
nni_aio_stop(&l->accaio);
+ nng_stream_listener_stop(l->listener);
}
static void
@@ -421,6 +422,7 @@ wstran_connect_cb(void *arg)
}
if ((uaio = nni_list_first(&d->aios)) == NULL) {
// The client stopped caring about this!
+ nng_stream_stop(ws);
nng_stream_free(ws);
nni_mtx_unlock(&d->mtx);
return;
@@ -430,6 +432,7 @@ wstran_connect_cb(void *arg)
if ((rv = nni_aio_result(caio)) != 0) {
nni_aio_finish_error(uaio, rv);
} else if ((rv = wstran_pipe_alloc(&p, ws)) != 0) {
+ nng_stream_stop(ws);
nng_stream_free(ws);
nni_aio_finish_error(uaio, rv);
} else {
diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c
index af5bc717..03125abf 100644
--- a/src/supplemental/http/http_client.c
+++ b/src/supplemental/http/http_client.c
@@ -89,6 +89,8 @@ http_dial_cb(void *arg)
void
nni_http_client_fini(nni_http_client *c)
{
+ nni_aio_stop(c->aio);
+ nng_stream_dialer_stop(c->dialer);
nni_aio_free(c->aio);
nng_stream_dialer_free(c->dialer);
nni_mtx_fini(&c->mtx);
diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c
index 2240492f..0d5d9cb0 100644
--- a/src/supplemental/http/http_server.c
+++ b/src/supplemental/http/http_server.c
@@ -895,6 +895,7 @@ http_server_fini(nni_http_server *s)
http_error *epage;
nni_aio_stop(s->accaio);
+ nng_stream_listener_stop(s->listener);
nni_mtx_lock(&s->mtx);
NNI_ASSERT(nni_list_empty(&s->conns));
diff --git a/src/supplemental/tls/tls_common.c b/src/supplemental/tls/tls_common.c
index c04d03a5..a871e74e 100644
--- a/src/supplemental/tls/tls_common.c
+++ b/src/supplemental/tls/tls_common.c
@@ -126,6 +126,14 @@ tls_dialer_free(void *arg)
}
}
+static void
+tls_dialer_stop(void *arg)
+{
+ tls_dialer *d = arg;
+
+ nng_stream_dialer_stop(d->d);
+}
+
// For dialing, we need to have our own completion callback, instead of
// the user's completion callback.
@@ -281,6 +289,7 @@ nni_tls_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
d->ops.sd_close = tls_dialer_close;
d->ops.sd_free = tls_dialer_free;
+ d->ops.sd_stop = tls_dialer_stop;
d->ops.sd_dial = tls_dialer_dial;
d->ops.sd_get = tls_dialer_get;
d->ops.sd_set = tls_dialer_set;
@@ -307,6 +316,13 @@ tls_listener_close(void *arg)
}
static void
+tls_listener_stop(void *arg)
+{
+ tls_listener *l = arg;
+ nng_stream_listener_close(l->l);
+}
+
+static void
tls_listener_free(void *arg)
{
tls_listener *l;
@@ -436,6 +452,7 @@ nni_tls_listener_alloc(nng_stream_listener **lp, const nng_url *url)
}
l->ops.sl_free = tls_listener_free;
l->ops.sl_close = tls_listener_close;
+ l->ops.sl_stop = tls_listener_stop;
l->ops.sl_accept = tls_listener_accept;
l->ops.sl_listen = tls_listener_listen;
l->ops.sl_get = tls_listener_get;
@@ -526,6 +543,18 @@ tls_close(void *arg)
nng_stream_close(conn->tcp);
}
+static void
+tls_stop(void *arg)
+{
+ tls_conn *conn = arg;
+
+ tls_close(conn);
+ nng_stream_stop(conn->tcp);
+ nni_aio_stop(&conn->conn_aio);
+ nni_aio_stop(&conn->tcp_send);
+ nni_aio_stop(&conn->tcp_recv);
+}
+
static int
tls_get_verified(void *arg, void *buf, size_t *szp, nni_type t)
{
@@ -624,6 +653,7 @@ tls_alloc(tls_conn **conn_p, nng_tls_config *cfg, nng_aio *user_aio)
conn->stream.s_close = tls_close;
conn->stream.s_free = tls_free;
+ conn->stream.s_stop = tls_stop;
conn->stream.s_send = tls_send;
conn->stream.s_recv = tls_recv;
conn->stream.s_get = tls_get;
@@ -638,14 +668,7 @@ tls_reap(void *arg)
{
tls_conn *conn = arg;
- // Shut it all down first. We should be freed.
- if (conn->tcp != NULL) {
- nng_stream_close(conn->tcp);
- }
- nni_aio_stop(&conn->conn_aio);
- nni_aio_stop(&conn->tcp_send);
- nni_aio_stop(&conn->tcp_recv);
-
+ tls_stop(conn);
conn->ops.fini((void *) (conn + 1));
nni_aio_fini(&conn->conn_aio);
nni_aio_fini(&conn->tcp_send);
diff --git a/src/supplemental/tls/tls_test.c b/src/supplemental/tls/tls_test.c
index 43ce0c85..517be143 100644
--- a/src/supplemental/tls/tls_test.c
+++ b/src/supplemental/tls/tls_test.c
@@ -57,6 +57,7 @@ test_tls_conn_refused(void)
NUTS_FAIL(nng_aio_result(aio), NNG_ECONNREFUSED);
nng_aio_free(aio);
+ nng_stream_dialer_stop(dialer);
nng_stream_dialer_free(dialer);
}
@@ -133,6 +134,10 @@ test_tls_large_message(void)
nng_free(buf1, size);
nng_free(buf2, size);
+ nng_stream_stop(s1);
+ nng_stream_stop(s2);
+ nng_stream_dialer_stop(d);
+ nng_stream_listener_stop(l);
nng_stream_free(s1);
nng_stream_free(s2);
nng_stream_dialer_free(d);
@@ -214,8 +219,10 @@ test_tls_ecdsa(void)
NUTS_PASS(nuts_stream_wait(t2));
NUTS_TRUE(memcmp(buf1, buf2, size) == 0);
- nng_free(buf1, size);
- nng_free(buf2, size);
+ nng_stream_stop(s1);
+ nng_stream_stop(s2);
+ nng_stream_dialer_stop(d);
+ nng_stream_listener_stop(l);
nng_stream_free(s1);
nng_stream_free(s2);
nng_stream_dialer_free(d);
@@ -224,6 +231,8 @@ test_tls_ecdsa(void)
nng_tls_config_free(c2);
nng_aio_free(aio1);
nng_aio_free(aio2);
+ nng_free(buf1, size);
+ nng_free(buf2, size);
}
void
@@ -241,6 +250,7 @@ test_tls_garbled_cert(void)
c1, nuts_garbled_crt, nuts_server_key, NULL),
NNG_ECRYPTO);
+ nng_stream_listener_stop(l);
nng_stream_listener_free(l);
nng_tls_config_free(c1);
}
@@ -318,8 +328,10 @@ test_tls_psk(void)
NUTS_PASS(nuts_stream_wait(t2));
NUTS_TRUE(memcmp(buf1, buf2, size) == 0);
- nng_free(buf1, size);
- nng_free(buf2, size);
+ nng_stream_stop(s1);
+ nng_stream_stop(s2);
+ nng_stream_dialer_stop(d);
+ nng_stream_listener_stop(l);
nng_stream_free(s1);
nng_stream_free(s2);
nng_stream_dialer_free(d);
@@ -328,6 +340,8 @@ test_tls_psk(void)
nng_tls_config_free(c2);
nng_aio_free(aio1);
nng_aio_free(aio2);
+ nng_free(buf1, size);
+ nng_free(buf2, size);
}
void
@@ -408,8 +422,10 @@ test_tls_psk_server_identities(void)
NUTS_PASS(nuts_stream_wait(t2));
NUTS_TRUE(memcmp(buf1, buf2, size) == 0);
- nng_free(buf1, size);
- nng_free(buf2, size);
+ nng_stream_stop(s1);
+ nng_stream_stop(s2);
+ nng_stream_dialer_stop(d);
+ nng_stream_listener_stop(l);
nng_stream_free(s1);
nng_stream_free(s2);
nng_stream_dialer_free(d);
@@ -418,6 +434,8 @@ test_tls_psk_server_identities(void)
nng_tls_config_free(c2);
nng_aio_free(aio1);
nng_aio_free(aio2);
+ nng_free(buf1, size);
+ nng_free(buf2, size);
}
void
@@ -495,8 +513,10 @@ test_tls_psk_bad_identity(void)
NUTS_ASSERT(nuts_stream_wait(t1) != 0);
NUTS_ASSERT(nuts_stream_wait(t2) != 0);
- nng_free(buf1, size);
- nng_free(buf2, size);
+ nng_stream_stop(s1);
+ nng_stream_stop(s2);
+ nng_stream_dialer_stop(d);
+ nng_stream_listener_stop(l);
nng_stream_free(s1);
nng_stream_free(s2);
nng_stream_dialer_free(d);
@@ -505,6 +525,8 @@ test_tls_psk_bad_identity(void)
nng_tls_config_free(c2);
nng_aio_free(aio1);
nng_aio_free(aio2);
+ nng_free(buf1, size);
+ nng_free(buf2, size);
}
void
@@ -543,6 +565,7 @@ test_tls_psk_config_busy(void)
NUTS_FAIL(
nng_tls_config_psk(c1, "identity2", key, sizeof(key)), NNG_EBUSY);
+ nng_stream_listener_stop(l);
nng_stream_listener_free(l);
nng_aio_free(aio);
nng_tls_config_free(c1);
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index 9f3f6d0b..e7372a49 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -1183,12 +1183,9 @@ ws_close_error(nni_ws *ws, uint16_t code)
}
static void
-ws_fini(void *arg)
+ws_stop(void *arg)
{
- nni_ws *ws = arg;
- ws_frame *frame;
- nng_aio *aio;
-
+ nni_ws *ws = arg;
ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE);
// Give a chance for the close frame to drain.
@@ -1198,7 +1195,6 @@ ws_fini(void *arg)
nni_aio_stop(&ws->txaio);
nni_aio_stop(&ws->closeaio);
nni_aio_stop(&ws->httpaio);
- nni_aio_stop(&ws->connaio);
if (nni_list_node_active(&ws->node)) {
nni_ws_dialer *d;
@@ -1210,6 +1206,16 @@ ws_fini(void *arg)
nni_mtx_unlock(&d->mtx);
}
}
+}
+
+static void
+ws_fini(void *arg)
+{
+ nni_ws *ws = arg;
+ ws_frame *frame;
+ nng_aio *aio;
+
+ ws_stop(ws);
nni_mtx_lock(&ws->mtx);
while ((frame = nni_list_first(&ws->rxq)) != NULL) {
@@ -1450,6 +1456,7 @@ ws_init(nni_ws **wsp)
ws->ops.s_close = ws_str_close;
ws->ops.s_free = ws_str_free;
+ ws->ops.s_stop = ws_stop;
ws->ops.s_send = ws_str_send;
ws->ops.s_recv = ws_str_recv;
ws->ops.s_get = ws_str_get;
@@ -1460,10 +1467,11 @@ ws_init(nni_ws **wsp)
}
static void
-ws_listener_free(void *arg)
+ws_listener_stop(void *arg)
{
- nni_ws_listener *l = arg;
- ws_header *hdr;
+ nni_ws_listener *l = arg;
+ nni_http_handler *h;
+ nni_http_server *s;
ws_listener_close(l);
@@ -1471,16 +1479,27 @@ ws_listener_free(void *arg)
while (!nni_list_empty(&l->reply)) {
nni_cv_wait(&l->cv);
}
+ h = l->handler;
+ s = l->server;
+ l->handler = NULL;
+ l->server = NULL;
nni_mtx_unlock(&l->mtx);
- if (l->handler != NULL) {
- nni_http_handler_fini(l->handler);
- l->handler = NULL;
+ if (h != NULL) {
+ nni_http_handler_fini(h);
}
- if (l->server != NULL) {
- nni_http_server_fini(l->server);
- l->server = NULL;
+ if (s != NULL) {
+ nni_http_server_fini(s);
}
+}
+
+static void
+ws_listener_free(void *arg)
+{
+ nni_ws_listener *l = arg;
+ ws_header *hdr;
+
+ ws_listener_stop(l);
nni_cv_fini(&l->cv);
nni_mtx_fini(&l->mtx);
@@ -2148,6 +2167,7 @@ nni_ws_listener_alloc(nng_stream_listener **wslp, const nng_url *url)
l->isstream = true;
l->ops.sl_free = ws_listener_free;
l->ops.sl_close = ws_listener_close;
+ l->ops.sl_stop = ws_listener_stop;
l->ops.sl_accept = ws_listener_accept;
l->ops.sl_listen = ws_listener_listen;
l->ops.sl_set = ws_listener_set;
@@ -2255,16 +2275,43 @@ err:
}
static void
-ws_dialer_free(void *arg)
+ws_dialer_close(void *arg)
{
nni_ws_dialer *d = arg;
- ws_header *hdr;
+ nni_ws *ws;
+ nni_mtx_lock(&d->mtx);
+ if (d->closed) {
+ nni_mtx_unlock(&d->mtx);
+ return;
+ }
+ d->closed = true;
+ NNI_LIST_FOREACH (&d->wspend, ws) {
+ nni_aio_close(&ws->connaio);
+ nni_aio_close(&ws->httpaio);
+ }
+ nni_mtx_unlock(&d->mtx);
+}
+static void
+ws_dialer_stop(void *arg)
+{
+ nni_ws_dialer *d = arg;
+
+ ws_dialer_close(d);
nni_mtx_lock(&d->mtx);
while (!nni_list_empty(&d->wspend)) {
nni_cv_wait(&d->cv);
}
nni_mtx_unlock(&d->mtx);
+}
+
+static void
+ws_dialer_free(void *arg)
+{
+ nni_ws_dialer *d = arg;
+ ws_header *hdr;
+
+ ws_dialer_stop(d);
nni_strfree(d->proto);
while ((hdr = nni_list_first(&d->headers)) != NULL) {
@@ -2285,24 +2332,6 @@ ws_dialer_free(void *arg)
}
static void
-ws_dialer_close(void *arg)
-{
- nni_ws_dialer *d = arg;
- nni_ws *ws;
- nni_mtx_lock(&d->mtx);
- if (d->closed) {
- nni_mtx_unlock(&d->mtx);
- return;
- }
- d->closed = true;
- NNI_LIST_FOREACH (&d->wspend, ws) {
- nni_aio_close(&ws->connaio);
- nni_aio_close(&ws->httpaio);
- }
- nni_mtx_unlock(&d->mtx);
-}
-
-static void
ws_dial_cancel(nni_aio *aio, void *arg, int rv)
{
nni_ws *ws = arg;
@@ -2679,6 +2708,7 @@ nni_ws_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
d->ops.sd_free = ws_dialer_free;
d->ops.sd_close = ws_dialer_close;
+ d->ops.sd_stop = ws_dialer_stop;
d->ops.sd_dial = ws_dialer_dial;
d->ops.sd_set = ws_dialer_set;
d->ops.sd_get = ws_dialer_get;
diff --git a/src/supplemental/websocket/websocket_test.c b/src/supplemental/websocket/websocket_test.c
index 781ca1d8..9a28d69b 100644
--- a/src/supplemental/websocket/websocket_test.c
+++ b/src/supplemental/websocket/websocket_test.c
@@ -107,9 +107,13 @@ test_websocket_wildcard(void)
NUTS_TRUE(memcmp(buf1, buf2, 5) == 0);
nng_stream_close(c1);
- nng_stream_free(c1);
nng_stream_close(c2);
+ nng_stream_stop(c1);
+ nng_stream_stop(c2);
+ nng_stream_free(c1);
nng_stream_free(c2);
+ nng_stream_listener_stop(l);
+ nng_stream_dialer_stop(d);
nng_aio_free(daio);
nng_aio_free(laio);
nng_aio_free(aio1);
@@ -206,9 +210,13 @@ test_websocket_conn_props(void)
nng_strfree(str);
nng_stream_close(c1);
- nng_stream_free(c1);
nng_stream_close(c2);
+ nng_stream_stop(c1);
+ nng_stream_stop(c2);
+ nng_stream_free(c1);
nng_stream_free(c2);
+ nng_stream_listener_stop(l);
+ nng_stream_dialer_stop(d);
nng_aio_free(daio);
nng_aio_free(laio);
nng_stream_listener_free(l);
@@ -495,6 +503,7 @@ test_websocket_fragmentation(void)
nng_aio_free(caio);
nng_stream_close(c);
+ nng_stream_stop(c);
nng_stream_free(c);
nng_aio_free(state.aio);
@@ -502,6 +511,8 @@ test_websocket_fragmentation(void)
nng_cv_free(state.cv);
nng_mtx_free(state.lock);
+ nng_stream_dialer_stop(d);
+ nng_stream_listener_stop(l);
nng_free(send_buf, state.total);
nng_free(recv_buf, state.total);
nng_aio_free(daio);