From 81f5d3c6268ff91ee9c36c4cb34f6f9bfd54740d Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Thu, 12 Dec 2024 17:55:48 -0800 Subject: streams: add explicit stop functions This allows us to explicitly stop streams, dialers, and listeners, before we start tearing down things. This hopefully will be useful in resolving use-after-free bugs in http, tls, and websockets. The new functions are not yet documented, but they are nng_stream_stop, nng_stream_dialer_stop, and nng_stream_listener_stop. They should be called after close, and before free. The close functions now close without blocking, but the stop function is allowed to block. --- demo/stream/stream.c | 7 +- include/nng/nng.h | 3 + src/core/platform.h | 5 ++ src/core/sockfd.c | 7 ++ src/core/stream.c | 37 +++++++++- src/core/stream.h | 5 +- src/core/tcp.c | 37 ++++++++-- src/platform/posix/posix_ipcconn.c | 25 +++++-- src/platform/posix/posix_ipcdial.c | 14 +++- src/platform/posix/posix_ipclisten.c | 30 ++++++--- src/platform/posix/posix_sockfd.c | 19 ++++-- src/platform/posix/posix_tcpconn.c | 25 +++++-- src/platform/posix/posix_tcpdial.c | 12 +++- src/platform/posix/posix_tcplisten.c | 12 +++- src/platform/tcp_stream_test.c | 8 ++- src/platform/windows/win_ipcconn.c | 11 ++- src/platform/windows/win_ipcdial.c | 10 ++- src/platform/windows/win_ipclisten.c | 16 ++++- src/platform/windows/win_tcpconn.c | 26 ++++---- src/platform/windows/win_tcpdial.c | 11 ++- src/platform/windows/win_tcplisten.c | 7 ++ src/sp/transport/ipc/ipc.c | 3 + src/sp/transport/tcp/tcp.c | 3 + src/sp/transport/tls/tls.c | 2 + src/sp/transport/ws/websocket.c | 7 +- src/supplemental/http/http_client.c | 2 + src/supplemental/http/http_server.c | 1 + src/supplemental/tls/tls_common.c | 39 ++++++++--- src/supplemental/tls/tls_test.c | 39 ++++++++--- src/supplemental/websocket/websocket.c | 100 ++++++++++++++++++---------- src/supplemental/websocket/websocket_test.c | 15 ++++- 31 files changed, 424 insertions(+), 114 deletions(-) diff --git a/demo/stream/stream.c b/demo/stream/stream.c index c6cc9b23..5a4995e6 100644 --- a/demo/stream/stream.c +++ b/demo/stream/stream.c @@ -55,7 +55,7 @@ int client(const char *url) { nng_stream_dialer *dialer; - nng_aio * aio; + nng_aio *aio; nng_iov iov; int rv; @@ -102,6 +102,11 @@ client(const char *url) // Send ELCOSE to send/recv associated wit this stream free(iov.iov_buf); + + // stop everything before freeing + nng_stream_stop(c1); + nng_stream_dialer_stop(dialer); + nng_stream_free(c1); nng_aio_free(aio); nng_stream_dialer_free(dialer); diff --git a/include/nng/nng.h b/include/nng/nng.h index 0ce5f46c..d087805d 100644 --- a/include/nng/nng.h +++ b/include/nng/nng.h @@ -1137,6 +1137,7 @@ typedef struct nng_stream_listener nng_stream_listener; NNG_DECL void nng_stream_free(nng_stream *); NNG_DECL void nng_stream_close(nng_stream *); +NNG_DECL void nng_stream_stop(nng_stream *); NNG_DECL void nng_stream_send(nng_stream *, nng_aio *); NNG_DECL void nng_stream_recv(nng_stream *, nng_aio *); NNG_DECL int nng_stream_get_bool(nng_stream *, const char *, bool *); @@ -1152,6 +1153,7 @@ NNG_DECL int nng_stream_dialer_alloc_url( nng_stream_dialer **, const nng_url *); NNG_DECL void nng_stream_dialer_free(nng_stream_dialer *); NNG_DECL void nng_stream_dialer_close(nng_stream_dialer *); +NNG_DECL void nng_stream_dialer_stop(nng_stream_dialer *); NNG_DECL void nng_stream_dialer_dial(nng_stream_dialer *, nng_aio *); NNG_DECL int nng_stream_dialer_get_bool( nng_stream_dialer *, const char *, bool *); @@ -1193,6 +1195,7 @@ NNG_DECL int nng_stream_listener_alloc_url( nng_stream_listener **, const nng_url *); NNG_DECL void nng_stream_listener_free(nng_stream_listener *); NNG_DECL void nng_stream_listener_close(nng_stream_listener *); +NNG_DECL void nng_stream_listener_stop(nng_stream_listener *); NNG_DECL int nng_stream_listener_listen(nng_stream_listener *); NNG_DECL void nng_stream_listener_accept(nng_stream_listener *, nng_aio *); NNG_DECL int nng_stream_listener_get_bool( diff --git a/src/core/platform.h b/src/core/platform.h index d2409e5b..2f5b5c17 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -293,6 +293,7 @@ extern void nni_tcp_dialer_fini(nni_tcp_dialer *); // Further operations on it should return NNG_ECLOSED. // Any in-progress connection will be aborted. extern void nni_tcp_dialer_close(nni_tcp_dialer *); +extern void nni_tcp_dialer_stop(nni_tcp_dialer *); // nni_tcp_dial attempts to create an outgoing connection, // asynchronously, to the address in the aio. On success, the first (and only) @@ -318,6 +319,10 @@ extern void nni_tcp_listener_fini(nni_tcp_listener *); // any bound socket, and further operations will result in NNG_ECLOSED. extern void nni_tcp_listener_close(nni_tcp_listener *); +// nni_tcp_listener_stop is close + waits for any operations to stop, +// so there won't be any further accepts after this. +extern void nni_tcp_listener_stop(nni_tcp_listener *); + // nni_tcp_listener_listen creates the socket in listening mode, bound // to the specified address. extern int nni_tcp_listener_listen(nni_tcp_listener *, const nni_sockaddr *); diff --git a/src/core/sockfd.c b/src/core/sockfd.c index cedd7436..787a0783 100644 --- a/src/core/sockfd.c +++ b/src/core/sockfd.c @@ -63,6 +63,12 @@ sfd_listener_close(void *arg) nni_mtx_unlock(&l->mtx); } +static void +sfd_listener_stop(void *arg) +{ + sfd_listener_close(arg); +} + static int sfd_listener_listen(void *arg) { @@ -222,6 +228,7 @@ nni_sfd_listener_alloc(nng_stream_listener **lp, const nng_url *url) l->ops.sl_free = sfd_listener_free; l->ops.sl_close = sfd_listener_close; + l->ops.sl_stop = sfd_listener_stop; l->ops.sl_listen = sfd_listener_listen; l->ops.sl_accept = sfd_listener_accept; l->ops.sl_get = sfd_listener_get; diff --git a/src/core/stream.c b/src/core/stream.c index ad026586..16e11eca 100644 --- a/src/core/stream.c +++ b/src/core/stream.c @@ -116,7 +116,17 @@ static struct { void nng_stream_close(nng_stream *s) { - s->s_close(s); + if (s != NULL) { + s->s_close(s); + } +} + +void +nng_stream_stop(nng_stream *s) +{ + if (s != NULL) { + s->s_stop(s); + } } void @@ -149,7 +159,17 @@ nni_stream_get( void nng_stream_dialer_close(nng_stream_dialer *d) { - d->sd_close(d); + if (d != NULL) { + d->sd_close(d); + } +} + +void +nng_stream_dialer_stop(nng_stream_dialer *d) +{ + if (d != NULL) { + d->sd_stop(d); + } } void @@ -226,8 +246,19 @@ nni_stream_dialer_set_tls(nng_stream_dialer *d, nng_tls_config *cfg) void nng_stream_listener_close(nng_stream_listener *l) { - l->sl_close(l); + if (l != NULL) { + l->sl_close(l); + } +} + +void +nng_stream_listener_stop(nng_stream_listener *l) +{ + if (l != NULL) { + l->sl_stop(l); + } } + void nng_stream_listener_free(nng_stream_listener *l) { diff --git a/src/core/stream.h b/src/core/stream.h index 9ea65834..e09dc9e5 100644 --- a/src/core/stream.h +++ b/src/core/stream.h @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. +// Copyright 2024 Staysail Systems, Inc. // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -43,6 +43,7 @@ extern int nni_stream_listener_get_tls( struct nng_stream { void (*s_free)(void *); void (*s_close)(void *); + void (*s_stop)(void *); void (*s_recv)(void *, nng_aio *); void (*s_send)(void *, nng_aio *); int (*s_get)(void *, const char *, void *, size_t *, nni_type); @@ -53,6 +54,7 @@ struct nng_stream { struct nng_stream_dialer { void (*sd_free)(void *); void (*sd_close)(void *); + void (*sd_stop)(void *); void (*sd_dial)(void *, nng_aio *); int (*sd_get)(void *, const char *, void *, size_t *, nni_type); int (*sd_set)(void *, const char *, const void *, size_t, nni_type); @@ -65,6 +67,7 @@ struct nng_stream_dialer { struct nng_stream_listener { void (*sl_free)(void *); void (*sl_close)(void *); + void (*sl_stop)(void *); int (*sl_listen)(void *); void (*sl_accept)(void *, nng_aio *); int (*sl_get)(void *, const char *, void *, size_t *, nni_type); diff --git a/src/core/tcp.c b/src/core/tcp.c index 5d324a13..d2e08493 100644 --- a/src/core/tcp.c +++ b/src/core/tcp.c @@ -104,6 +104,8 @@ tcp_dial_con_cb(void *arg) if ((d->closed) || ((aio = nni_list_first(&d->conaios)) == NULL)) { if (rv == 0) { // Make sure we discard the underlying connection. + nng_stream_close(nni_aio_get_output(d->conaio, 0)); + nng_stream_stop(nni_aio_get_output(d->conaio, 0)); nng_stream_free(nni_aio_get_output(d->conaio, 0)); nni_aio_set_output(d->conaio, 0, NULL); } @@ -127,14 +129,26 @@ tcp_dialer_close(void *arg) { tcp_dialer *d = arg; nni_aio *aio; - nni_mtx_lock(&d->mtx); - d->closed = true; - while ((aio = nni_list_first(&d->conaios)) != NULL) { - nni_list_remove(&d->conaios, aio); - nni_aio_finish_error(aio, NNG_ECLOSED); + + if (d != NULL) { + nni_mtx_lock(&d->mtx); + d->closed = true; + while ((aio = nni_list_first(&d->conaios)) != NULL) { + nni_list_remove(&d->conaios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_tcp_dialer_close(d->d); + nni_mtx_unlock(&d->mtx); + } +} + +static void +tcp_dialer_stop(void *arg) +{ + tcp_dialer *d = arg; + if (d != NULL) { + nni_tcp_dialer_stop(d->d); } - nni_tcp_dialer_close(d->d); - nni_mtx_unlock(&d->mtx); } static void @@ -222,6 +236,7 @@ tcp_dialer_alloc(tcp_dialer **dp) d->ops.sd_close = tcp_dialer_close; d->ops.sd_free = tcp_dialer_free; + d->ops.sd_stop = tcp_dialer_stop; d->ops.sd_dial = tcp_dialer_dial; d->ops.sd_get = tcp_dialer_get; d->ops.sd_set = tcp_dialer_set; @@ -275,6 +290,13 @@ tcp_listener_close(void *arg) nni_tcp_listener_close(l->l); } +static void +tcp_listener_stop(void *arg) +{ + tcp_listener *l = arg; + nni_tcp_listener_stop(l->l); +} + static void tcp_listener_free(void *arg) { @@ -372,6 +394,7 @@ tcp_listener_alloc_addr(nng_stream_listener **lp, const nng_sockaddr *sa) l->ops.sl_free = tcp_listener_free; l->ops.sl_close = tcp_listener_close; + l->ops.sl_stop = tcp_listener_stop; l->ops.sl_listen = tcp_listener_listen; l->ops.sl_accept = tcp_listener_accept; l->ops.sl_get = tcp_listener_get; diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c index 7f979a0a..8b11b64f 100644 --- a/src/platform/posix/posix_ipcconn.c +++ b/src/platform/posix/posix_ipcconn.c @@ -386,15 +386,29 @@ nni_posix_ipc_start(nni_ipc_conn *c) } static void -ipc_reap(void *arg) +ipc_stop(void *arg) { - ipc_conn *c = arg; + ipc_conn *c = arg; + nni_posix_pfd *pfd; + ipc_close(c); - if (c->pfd != NULL) { - nni_posix_pfd_fini(c->pfd); + nni_mtx_lock(&c->mtx); + pfd = c->pfd; + c->pfd = NULL; + nni_mtx_unlock(&c->mtx); + + if (pfd != NULL) { + nni_posix_pfd_fini(pfd); } - nni_mtx_fini(&c->mtx); +} +static void +ipc_reap(void *arg) +{ + ipc_conn *c = arg; + ipc_stop(c); + + nni_mtx_fini(&c->mtx); if (c->dialer != NULL) { nni_posix_ipc_dialer_rele(c->dialer); } @@ -470,6 +484,7 @@ nni_posix_ipc_alloc(nni_ipc_conn **cp, nni_sockaddr *sa, nni_ipc_dialer *d) c->dialer = d; c->stream.s_free = ipc_free; c->stream.s_close = ipc_close; + c->stream.s_stop = ipc_stop; c->stream.s_send = ipc_send; c->stream.s_recv = ipc_recv; c->stream.s_get = ipc_get; diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c index 7799aca2..667d8262 100644 --- a/src/platform/posix/posix_ipcdial.c +++ b/src/platform/posix/posix_ipcdial.c @@ -41,6 +41,7 @@ ipc_dialer_close(void *arg) c->dial_aio = NULL; nni_aio_set_prov_data(aio, NULL); nng_stream_close(&c->stream); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); } nni_aio_finish_error(aio, NNG_ECLOSED); @@ -49,6 +50,13 @@ ipc_dialer_close(void *arg) nni_mtx_unlock(&d->mtx); } +static void +ipc_dialer_stop(void *arg) +{ + nni_ipc_dialer *d = arg; + ipc_dialer_close(d); +} + static void ipc_dialer_fini(ipc_dialer *d) { @@ -61,7 +69,7 @@ ipc_dialer_free(void *arg) { ipc_dialer *d = arg; - ipc_dialer_close(d); + ipc_dialer_stop(d); nni_atomic_set_bool(&d->fini, true); nni_posix_ipc_dialer_rele(d); } @@ -94,6 +102,7 @@ ipc_dialer_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_unlock(&d->mtx); nni_aio_finish_error(aio, rv); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); } @@ -138,6 +147,7 @@ ipc_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg) if (rv != 0) { nng_stream_close(&c->stream); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); nni_aio_finish_error(aio, rv); return; @@ -236,6 +246,7 @@ ipc_dialer_dial(void *arg, nni_aio *aio) error: nni_aio_set_prov_data(aio, NULL); nni_mtx_unlock(&d->mtx); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); nni_aio_finish_error(aio, rv); } @@ -319,6 +330,7 @@ nni_ipc_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) d->closed = false; d->sd.sd_free = ipc_dialer_free; d->sd.sd_close = ipc_dialer_close; + d->sd.sd_stop = ipc_dialer_stop; d->sd.sd_dial = ipc_dialer_dial; d->sd.sd_get = ipc_dialer_get; d->sd.sd_set = ipc_dialer_set; diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c index bd938841..a122d7bb 100644 --- a/src/platform/posix/posix_ipclisten.c +++ b/src/platform/posix/posix_ipclisten.c @@ -72,6 +72,23 @@ ipc_listener_close(void *arg) nni_mtx_unlock(&l->mtx); } +static void +ipc_listener_stop(void *arg) +{ + ipc_listener *l = arg; + nni_posix_pfd *pfd; + + nni_mtx_lock(&l->mtx); + ipc_listener_doclose(l); + pfd = l->pfd; + l->pfd = NULL; + nni_mtx_unlock(&l->mtx); + + if (pfd != NULL) { + nni_posix_pfd_fini(pfd); + } +} + static void ipc_listener_doaccept(ipc_listener *l) { @@ -132,6 +149,7 @@ ipc_listener_doaccept(ipc_listener *l) } if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); @@ -406,17 +424,10 @@ ipc_listener_listen(void *arg) static void ipc_listener_free(void *arg) { - ipc_listener *l = arg; - nni_posix_pfd *pfd; + ipc_listener *l = arg; - nni_mtx_lock(&l->mtx); - ipc_listener_doclose(l); - pfd = l->pfd; - nni_mtx_unlock(&l->mtx); + ipc_listener_stop(l); - if (pfd != NULL) { - nni_posix_pfd_fini(pfd); - } nni_mtx_fini(&l->mtx); NNI_FREE_STRUCT(l); } @@ -507,6 +518,7 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url) l->perms = 0; l->sl.sl_free = ipc_listener_free; l->sl.sl_close = ipc_listener_close; + l->sl.sl_stop = ipc_listener_stop; l->sl.sl_listen = ipc_listener_listen; l->sl.sl_accept = ipc_listener_accept; l->sl.sl_get = ipc_listener_get; diff --git a/src/platform/posix/posix_sockfd.c b/src/platform/posix/posix_sockfd.c index b0d88a31..997feae6 100644 --- a/src/platform/posix/posix_sockfd.c +++ b/src/platform/posix/posix_sockfd.c @@ -200,16 +200,26 @@ sfd_close(void *arg) nni_mtx_unlock(&c->mtx); } -// sfd_fini may block briefly waiting for the pollq thread. -// To get that out of our context, we simply reap this. static void -sfd_fini(void *arg) +sfd_stop(void *arg) { nni_sfd_conn *c = arg; sfd_close(c); + + // ideally this would *stop* without freeing the pfd if (c->pfd != NULL) { nni_posix_pfd_fini(c->pfd); + c->pfd = NULL; } +} + +// sfd_fini may block briefly waiting for the pollq thread. +// To get that out of our context, we simply reap this. +static void +sfd_fini(void *arg) +{ + nni_sfd_conn *c = arg; + sfd_stop(c); nni_mtx_fini(&c->mtx); NNI_FREE_STRUCT(c); @@ -475,6 +485,7 @@ nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd) c->stream.s_free = sfd_free; c->stream.s_close = sfd_close; + c->stream.s_stop = sfd_stop; c->stream.s_recv = sfd_recv; c->stream.s_send = sfd_send; c->stream.s_get = sfd_get; @@ -490,4 +501,4 @@ void nni_sfd_close_fd(int fd) { close(fd); -} \ No newline at end of file +} diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c index 74b3371b..ce5243b0 100644 --- a/src/platform/posix/posix_tcpconn.c +++ b/src/platform/posix/posix_tcpconn.c @@ -10,6 +10,7 @@ // #include "core/nng_impl.h" +#include "platform/posix/posix_pollq.h" #include #include @@ -199,16 +200,30 @@ tcp_close(void *arg) nni_mtx_unlock(&c->mtx); } +static void +tcp_stop(void *arg) +{ + nni_tcp_conn *c = arg; + nni_posix_pfd *pfd; + tcp_close(c); + + nni_mtx_lock(&c->mtx); + pfd = c->pfd; + c->pfd = NULL; + nni_mtx_unlock(&c->mtx); + + if (pfd != NULL) { + nni_posix_pfd_fini(pfd); + } +} + // tcp_fini may block briefly waiting for the pollq thread. // To get that out of our context, we simply reap this. static void tcp_fini(void *arg) { nni_tcp_conn *c = arg; - tcp_close(c); - if (c->pfd != NULL) { - nni_posix_pfd_fini(c->pfd); - } + tcp_stop(c); nni_mtx_fini(&c->mtx); if (c->dialer != NULL) { @@ -221,6 +236,7 @@ static nni_reap_list tcp_reap_list = { .rl_offset = offsetof(nni_tcp_conn, reap), .rl_func = tcp_fini, }; + static void tcp_free(void *arg) { @@ -454,6 +470,7 @@ nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d) nni_aio_list_init(&c->writeq); c->stream.s_free = tcp_free; + c->stream.s_stop = tcp_stop; c->stream.s_close = tcp_close; c->stream.s_recv = tcp_recv; c->stream.s_send = tcp_send; diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c index 73ba4394..52ea6cff 100644 --- a/src/platform/posix/posix_tcpdial.c +++ b/src/platform/posix/posix_tcpdial.c @@ -77,9 +77,15 @@ tcp_dialer_fini(nni_tcp_dialer *d) } void -nni_tcp_dialer_fini(nni_tcp_dialer *d) +nni_tcp_dialer_stop(nni_tcp_dialer *d) { nni_tcp_dialer_close(d); +} + +void +nni_tcp_dialer_fini(nni_tcp_dialer *d) +{ + nni_tcp_dialer_stop(d); nni_atomic_set_bool(&d->fini, true); nni_posix_tcp_dialer_rele(d); } @@ -112,6 +118,8 @@ tcp_dialer_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_unlock(&d->mtx); nni_aio_finish_error(aio, rv); + nng_stream_close(&c->stream); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); } @@ -263,6 +271,8 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) error: nni_aio_set_prov_data(aio, NULL); nni_mtx_unlock(&d->mtx); + nng_stream_close(&c->stream); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); nni_aio_finish_error(aio, rv); } diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c index a38411c5..32f0fd60 100644 --- a/src/platform/posix/posix_tcplisten.c +++ b/src/platform/posix/posix_tcplisten.c @@ -150,6 +150,7 @@ tcp_listener_doaccept(nni_tcp_listener *l) if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { close(newfd); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); @@ -280,18 +281,25 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa) } void -nni_tcp_listener_fini(nni_tcp_listener *l) +nni_tcp_listener_stop(nni_tcp_listener *l) { nni_posix_pfd *pfd; nni_mtx_lock(&l->mtx); tcp_listener_doclose(l); - pfd = l->pfd; + pfd = l->pfd; + l->pfd = NULL; nni_mtx_unlock(&l->mtx); if (pfd != NULL) { nni_posix_pfd_fini(pfd); } +} + +void +nni_tcp_listener_fini(nni_tcp_listener *l) +{ + nni_tcp_listener_stop(l); nni_mtx_fini(&l->mtx); NNI_FREE_STRUCT(l); } diff --git a/src/platform/tcp_stream_test.c b/src/platform/tcp_stream_test.c index b46d2d1c..708120c3 100644 --- a/src/platform/tcp_stream_test.c +++ b/src/platform/tcp_stream_test.c @@ -116,8 +116,10 @@ test_tcp_stream(void) NUTS_TRUE(sa2.s_in.sa_port == sa.s_in.sa_port); nng_stream_listener_close(l); - nng_stream_listener_free(l); nng_stream_dialer_close(d); + nng_stream_listener_stop(l); + nng_stream_dialer_stop(d); + nng_stream_listener_free(l); nng_stream_dialer_free(d); nng_aio_free(aio1); nng_aio_free(aio2); @@ -125,8 +127,10 @@ test_tcp_stream(void) nng_aio_free(laio); nng_aio_free(maio); nng_stream_close(c1); - nng_stream_free(c1); nng_stream_close(c2); + nng_stream_stop(c1); + nng_stream_stop(c2); + nng_stream_free(c1); nng_stream_free(c2); } diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c index 5f540a6c..56a688a4 100644 --- a/src/platform/windows/win_ipcconn.c +++ b/src/platform/windows/win_ipcconn.c @@ -352,7 +352,7 @@ ipc_close(void *arg) } static void -ipc_free(void *arg) +ipc_stop(void *arg) { ipc_conn *c = arg; nni_aio *aio; @@ -381,6 +381,14 @@ ipc_free(void *arg) DisconnectNamedPipe(f); CloseHandle(f); } +} + +static void +ipc_free(void *arg) +{ + ipc_conn *c = arg; + + ipc_stop(c); nni_cv_fini(&c->cv); nni_mtx_fini(&c->mtx); @@ -460,6 +468,7 @@ nni_win_ipc_init( c->sa = *sa; c->stream.s_free = ipc_free; c->stream.s_close = ipc_close; + c->stream.s_stop = ipc_stop; c->stream.s_send = ipc_send; c->stream.s_recv = ipc_recv; c->stream.s_get = ipc_get; diff --git a/src/platform/windows/win_ipcdial.c b/src/platform/windows/win_ipcdial.c index cb826308..347ee39b 100644 --- a/src/platform/windows/win_ipcdial.c +++ b/src/platform/windows/win_ipcdial.c @@ -192,10 +192,17 @@ ipc_dialer_close(void *arg) } static void -ipc_dialer_free(void *arg) +ipc_dialer_stop(void *arg) { ipc_dialer *d = arg; ipc_dialer_close(d); +} + +static void +ipc_dialer_free(void *arg) +{ + ipc_dialer *d = arg; + ipc_dialer_stop(d); if (d->path) { nni_strfree(d->path); } @@ -260,6 +267,7 @@ nni_ipc_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) d->closed = false; d->sd.sd_free = ipc_dialer_free; d->sd.sd_close = ipc_dialer_close; + d->sd.sd_stop = ipc_dialer_stop; d->sd.sd_dial = ipc_dialer_dial; d->sd.sd_get = ipc_dialer_get; d->sd.sd_set = ipc_dialer_set; diff --git a/src/platform/windows/win_ipclisten.c b/src/platform/windows/win_ipclisten.c index 98cb8273..d09c98c7 100644 --- a/src/platform/windows/win_ipclisten.c +++ b/src/platform/windows/win_ipclisten.c @@ -300,8 +300,18 @@ ipc_listener_close(void *arg) nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECLOSED); } - bool accepting = l->accepting; nni_mtx_unlock(&l->mtx); +} + +static void +ipc_listener_stop(void *arg) +{ + ipc_listener *l = arg; + + ipc_listener_close(l); + + nni_mtx_lock(&l->mtx); + bool accepting = l->accepting; // This craziness because CancelIoEx on ConnectNamedPipe // seems to be incredibly unreliable. It does work, sometimes, @@ -309,6 +319,7 @@ ipc_listener_close(void *arg) // to be retired in favor of UNIX domain sockets anyway. while (accepting) { + nni_mtx_unlock(&l->mtx); if (!CancelIoEx(l->f, &l->io.olpd)) { // operation not found probably // We just inject a safety sleep to @@ -323,8 +334,8 @@ ipc_listener_close(void *arg) nng_msleep(100); nni_mtx_lock(&l->mtx); accepting = l->accepting; - nni_mtx_unlock(&l->mtx); } + nni_mtx_unlock(&l->mtx); DisconnectNamedPipe(l->f); CloseHandle(l->f); } @@ -360,6 +371,7 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url) l->sec_attr.bInheritHandle = FALSE; l->sa.s_ipc.sa_family = NNG_AF_IPC; l->sl.sl_free = ipc_listener_free; + l->sl.sl_stop = ipc_listener_stop; l->sl.sl_close = ipc_listener_close; l->sl.sl_listen = ipc_listener_listen; l->sl.sl_accept = ipc_listener_accept; diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c index 3f3acd17..2f2fd378 100644 --- a/src/platform/windows/win_tcpconn.c +++ b/src/platform/windows/win_tcpconn.c @@ -180,6 +180,7 @@ tcp_send_start(nni_tcp_conn *c) return; } } + nni_cv_wake(&c->cv); } static void @@ -266,17 +267,6 @@ tcp_close(void *arg) closesocket(s); } } - now = nni_clock(); - // wait up to a maximum of 10 seconds before assuming something is - // badly amiss. from what we can tell, this doesn't happen, and we do - // see the timer expire properly, but this safeguard can prevent a - // hang. - while ((c->recving || c->sending) && - ((nni_clock() - now) < (NNI_SECOND * 10))) { - nni_mtx_unlock(&c->mtx); - nni_msleep(1); - nni_mtx_lock(&c->mtx); - } nni_mtx_unlock(&c->mtx); } @@ -369,21 +359,28 @@ tcp_set(void *arg, const char *name, const void *buf, size_t sz, nni_type t) } static void -tcp_free(void *arg) +tcp_stop(void *arg) { nni_tcp_conn *c = arg; tcp_close(c); nni_mtx_lock(&c->mtx); - while ((!nni_list_empty(&c->recv_aios)) || + while (c->recving || c->sending || (!nni_list_empty(&c->recv_aios)) || (!nni_list_empty(&c->send_aios))) { nni_cv_wait(&c->cv); } nni_mtx_unlock(&c->mtx); - if (c->s != INVALID_SOCKET) { closesocket(c->s); } +} + +static void +tcp_free(void *arg) +{ + nni_tcp_conn *c = arg; + tcp_stop(c); + nni_cv_fini(&c->cv); nni_mtx_fini(&c->mtx); NNI_FREE_STRUCT(c); @@ -410,6 +407,7 @@ nni_win_tcp_init(nni_tcp_conn **connp, SOCKET s) nni_aio_list_init(&c->send_aios); c->conn_aio = NULL; c->ops.s_close = tcp_close; + c->ops.s_stop = tcp_stop; c->ops.s_free = tcp_free; c->ops.s_send = tcp_send; c->ops.s_recv = tcp_recv; diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c index 5492d265..c0385648 100644 --- a/src/platform/windows/win_tcpdial.c +++ b/src/platform/windows/win_tcpdial.c @@ -95,9 +95,16 @@ static nni_reap_list tcp_dialer_reap_list = { }; void -nni_tcp_dialer_fini(nni_tcp_dialer *d) +nni_tcp_dialer_stop(nni_tcp_dialer *d) { nni_tcp_dialer_close(d); + // TODO: wait for conn_io.olpd? +} + +void +nni_tcp_dialer_fini(nni_tcp_dialer *d) +{ + nni_tcp_dialer_stop(d); nni_mtx_lock(&d->mtx); if (!nni_list_empty(&d->aios)) { nni_mtx_unlock(&d->mtx); @@ -155,6 +162,8 @@ tcp_dial_cb(nni_win_io *io, int rv, size_t cnt) nni_mtx_unlock(&d->mtx); if (rv != 0) { + nng_stream_close(&c->ops); + nng_stream_stop(&c->ops); nng_stream_free(&c->ops); nni_aio_finish_error(aio, rv); } else { diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c index 021483d4..4e4fb090 100644 --- a/src/platform/windows/win_tcplisten.c +++ b/src/platform/windows/win_tcplisten.c @@ -177,6 +177,13 @@ static nni_reap_list tcp_listener_reap_list = { .rl_func = (nni_cb) nni_tcp_listener_fini, }; +void +nni_tcp_listener_stop(nni_tcp_listener *l) +{ + nni_tcp_listener_close(l); + // TODO: maybe wait for l->l_accept_io.olpd to finish? +} + void nni_tcp_listener_fini(nni_tcp_listener *l) { diff --git a/src/sp/transport/ipc/ipc.c b/src/sp/transport/ipc/ipc.c index dadb1909..c659b8fd 100644 --- a/src/sp/transport/ipc/ipc.c +++ b/src/sp/transport/ipc/ipc.c @@ -111,6 +111,7 @@ ipc_pipe_stop(void *arg) nni_aio_stop(&p->rx_aio); nni_aio_stop(&p->tx_aio); nni_aio_stop(&p->neg_aio); + nng_stream_stop(p->conn); nni_mtx_lock(&ep->mtx); nni_list_node_remove(&p->node); nni_mtx_unlock(&ep->mtx); @@ -655,6 +656,8 @@ ipc_ep_stop(void *arg) nni_aio_stop(&ep->time_aio); nni_aio_stop(&ep->conn_aio); + nng_stream_dialer_stop(ep->dialer); + nng_stream_listener_stop(ep->listener); } static void diff --git a/src/sp/transport/tcp/tcp.c b/src/sp/transport/tcp/tcp.c index 2497b899..51991cbd 100644 --- a/src/sp/transport/tcp/tcp.c +++ b/src/sp/transport/tcp/tcp.c @@ -124,6 +124,7 @@ tcptran_pipe_stop(void *arg) nni_aio_stop(p->rxaio); nni_aio_stop(p->txaio); nni_aio_stop(p->negoaio); + nng_stream_stop(p->conn); } static int @@ -674,6 +675,8 @@ tcptran_ep_stop(void *arg) nni_aio_stop(ep->timeaio); nni_aio_stop(ep->connaio); + nng_stream_dialer_stop(ep->dialer); + nng_stream_listener_stop(ep->listener); } static void diff --git a/src/sp/transport/tls/tls.c b/src/sp/transport/tls/tls.c index 7a907e8e..dcb100f5 100644 --- a/src/sp/transport/tls/tls.c +++ b/src/sp/transport/tls/tls.c @@ -657,6 +657,8 @@ tlstran_ep_stop(void *arg) nni_aio_stop(ep->timeaio); nni_aio_stop(ep->connaio); + nng_stream_dialer_stop(ep->dialer); + nng_stream_listener_stop(ep->listener); } static void diff --git a/src/sp/transport/ws/websocket.c b/src/sp/transport/ws/websocket.c index 27cc6434..16929ec9 100644 --- a/src/sp/transport/ws/websocket.c +++ b/src/sp/transport/ws/websocket.c @@ -183,6 +183,7 @@ wstran_pipe_stop(void *arg) nni_aio_stop(&p->rxaio); nni_aio_stop(&p->txaio); + nng_stream_stop(p->ws); } static int @@ -214,9 +215,7 @@ wstran_pipe_close(void *arg) nni_aio_close(&p->rxaio); nni_aio_close(&p->txaio); - nni_mtx_lock(&p->mtx); nng_stream_close(p->ws); - nni_mtx_unlock(&p->mtx); } static int @@ -373,6 +372,7 @@ wstran_dialer_stop(void *arg) ws_dialer *d = arg; nni_aio_stop(&d->connaio); + nng_stream_dialer_stop(d->dialer); } static void @@ -392,6 +392,7 @@ wstran_listener_stop(void *arg) ws_listener *l = arg; nni_aio_stop(&l->accaio); + nng_stream_listener_stop(l->listener); } static void @@ -421,6 +422,7 @@ wstran_connect_cb(void *arg) } if ((uaio = nni_list_first(&d->aios)) == NULL) { // The client stopped caring about this! + nng_stream_stop(ws); nng_stream_free(ws); nni_mtx_unlock(&d->mtx); return; @@ -430,6 +432,7 @@ wstran_connect_cb(void *arg) if ((rv = nni_aio_result(caio)) != 0) { nni_aio_finish_error(uaio, rv); } else if ((rv = wstran_pipe_alloc(&p, ws)) != 0) { + nng_stream_stop(ws); nng_stream_free(ws); nni_aio_finish_error(uaio, rv); } else { diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c index af5bc717..03125abf 100644 --- a/src/supplemental/http/http_client.c +++ b/src/supplemental/http/http_client.c @@ -89,6 +89,8 @@ http_dial_cb(void *arg) void nni_http_client_fini(nni_http_client *c) { + nni_aio_stop(c->aio); + nng_stream_dialer_stop(c->dialer); nni_aio_free(c->aio); nng_stream_dialer_free(c->dialer); nni_mtx_fini(&c->mtx); diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c index 2240492f..0d5d9cb0 100644 --- a/src/supplemental/http/http_server.c +++ b/src/supplemental/http/http_server.c @@ -895,6 +895,7 @@ http_server_fini(nni_http_server *s) http_error *epage; nni_aio_stop(s->accaio); + nng_stream_listener_stop(s->listener); nni_mtx_lock(&s->mtx); NNI_ASSERT(nni_list_empty(&s->conns)); diff --git a/src/supplemental/tls/tls_common.c b/src/supplemental/tls/tls_common.c index c04d03a5..a871e74e 100644 --- a/src/supplemental/tls/tls_common.c +++ b/src/supplemental/tls/tls_common.c @@ -126,6 +126,14 @@ tls_dialer_free(void *arg) } } +static void +tls_dialer_stop(void *arg) +{ + tls_dialer *d = arg; + + nng_stream_dialer_stop(d->d); +} + // For dialing, we need to have our own completion callback, instead of // the user's completion callback. @@ -281,6 +289,7 @@ nni_tls_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) d->ops.sd_close = tls_dialer_close; d->ops.sd_free = tls_dialer_free; + d->ops.sd_stop = tls_dialer_stop; d->ops.sd_dial = tls_dialer_dial; d->ops.sd_get = tls_dialer_get; d->ops.sd_set = tls_dialer_set; @@ -306,6 +315,13 @@ tls_listener_close(void *arg) nng_stream_listener_close(l->l); } +static void +tls_listener_stop(void *arg) +{ + tls_listener *l = arg; + nng_stream_listener_close(l->l); +} + static void tls_listener_free(void *arg) { @@ -436,6 +452,7 @@ nni_tls_listener_alloc(nng_stream_listener **lp, const nng_url *url) } l->ops.sl_free = tls_listener_free; l->ops.sl_close = tls_listener_close; + l->ops.sl_stop = tls_listener_stop; l->ops.sl_accept = tls_listener_accept; l->ops.sl_listen = tls_listener_listen; l->ops.sl_get = tls_listener_get; @@ -526,6 +543,18 @@ tls_close(void *arg) nng_stream_close(conn->tcp); } +static void +tls_stop(void *arg) +{ + tls_conn *conn = arg; + + tls_close(conn); + nng_stream_stop(conn->tcp); + nni_aio_stop(&conn->conn_aio); + nni_aio_stop(&conn->tcp_send); + nni_aio_stop(&conn->tcp_recv); +} + static int tls_get_verified(void *arg, void *buf, size_t *szp, nni_type t) { @@ -624,6 +653,7 @@ tls_alloc(tls_conn **conn_p, nng_tls_config *cfg, nng_aio *user_aio) conn->stream.s_close = tls_close; conn->stream.s_free = tls_free; + conn->stream.s_stop = tls_stop; conn->stream.s_send = tls_send; conn->stream.s_recv = tls_recv; conn->stream.s_get = tls_get; @@ -638,14 +668,7 @@ tls_reap(void *arg) { tls_conn *conn = arg; - // Shut it all down first. We should be freed. - if (conn->tcp != NULL) { - nng_stream_close(conn->tcp); - } - nni_aio_stop(&conn->conn_aio); - nni_aio_stop(&conn->tcp_send); - nni_aio_stop(&conn->tcp_recv); - + tls_stop(conn); conn->ops.fini((void *) (conn + 1)); nni_aio_fini(&conn->conn_aio); nni_aio_fini(&conn->tcp_send); diff --git a/src/supplemental/tls/tls_test.c b/src/supplemental/tls/tls_test.c index 43ce0c85..517be143 100644 --- a/src/supplemental/tls/tls_test.c +++ b/src/supplemental/tls/tls_test.c @@ -57,6 +57,7 @@ test_tls_conn_refused(void) NUTS_FAIL(nng_aio_result(aio), NNG_ECONNREFUSED); nng_aio_free(aio); + nng_stream_dialer_stop(dialer); nng_stream_dialer_free(dialer); } @@ -133,6 +134,10 @@ test_tls_large_message(void) nng_free(buf1, size); nng_free(buf2, size); + nng_stream_stop(s1); + nng_stream_stop(s2); + nng_stream_dialer_stop(d); + nng_stream_listener_stop(l); nng_stream_free(s1); nng_stream_free(s2); nng_stream_dialer_free(d); @@ -214,8 +219,10 @@ test_tls_ecdsa(void) NUTS_PASS(nuts_stream_wait(t2)); NUTS_TRUE(memcmp(buf1, buf2, size) == 0); - nng_free(buf1, size); - nng_free(buf2, size); + nng_stream_stop(s1); + nng_stream_stop(s2); + nng_stream_dialer_stop(d); + nng_stream_listener_stop(l); nng_stream_free(s1); nng_stream_free(s2); nng_stream_dialer_free(d); @@ -224,6 +231,8 @@ test_tls_ecdsa(void) nng_tls_config_free(c2); nng_aio_free(aio1); nng_aio_free(aio2); + nng_free(buf1, size); + nng_free(buf2, size); } void @@ -241,6 +250,7 @@ test_tls_garbled_cert(void) c1, nuts_garbled_crt, nuts_server_key, NULL), NNG_ECRYPTO); + nng_stream_listener_stop(l); nng_stream_listener_free(l); nng_tls_config_free(c1); } @@ -318,8 +328,10 @@ test_tls_psk(void) NUTS_PASS(nuts_stream_wait(t2)); NUTS_TRUE(memcmp(buf1, buf2, size) == 0); - nng_free(buf1, size); - nng_free(buf2, size); + nng_stream_stop(s1); + nng_stream_stop(s2); + nng_stream_dialer_stop(d); + nng_stream_listener_stop(l); nng_stream_free(s1); nng_stream_free(s2); nng_stream_dialer_free(d); @@ -328,6 +340,8 @@ test_tls_psk(void) nng_tls_config_free(c2); nng_aio_free(aio1); nng_aio_free(aio2); + nng_free(buf1, size); + nng_free(buf2, size); } void @@ -408,8 +422,10 @@ test_tls_psk_server_identities(void) NUTS_PASS(nuts_stream_wait(t2)); NUTS_TRUE(memcmp(buf1, buf2, size) == 0); - nng_free(buf1, size); - nng_free(buf2, size); + nng_stream_stop(s1); + nng_stream_stop(s2); + nng_stream_dialer_stop(d); + nng_stream_listener_stop(l); nng_stream_free(s1); nng_stream_free(s2); nng_stream_dialer_free(d); @@ -418,6 +434,8 @@ test_tls_psk_server_identities(void) nng_tls_config_free(c2); nng_aio_free(aio1); nng_aio_free(aio2); + nng_free(buf1, size); + nng_free(buf2, size); } void @@ -495,8 +513,10 @@ test_tls_psk_bad_identity(void) NUTS_ASSERT(nuts_stream_wait(t1) != 0); NUTS_ASSERT(nuts_stream_wait(t2) != 0); - nng_free(buf1, size); - nng_free(buf2, size); + nng_stream_stop(s1); + nng_stream_stop(s2); + nng_stream_dialer_stop(d); + nng_stream_listener_stop(l); nng_stream_free(s1); nng_stream_free(s2); nng_stream_dialer_free(d); @@ -505,6 +525,8 @@ test_tls_psk_bad_identity(void) nng_tls_config_free(c2); nng_aio_free(aio1); nng_aio_free(aio2); + nng_free(buf1, size); + nng_free(buf2, size); } void @@ -543,6 +565,7 @@ test_tls_psk_config_busy(void) NUTS_FAIL( nng_tls_config_psk(c1, "identity2", key, sizeof(key)), NNG_EBUSY); + nng_stream_listener_stop(l); nng_stream_listener_free(l); nng_aio_free(aio); nng_tls_config_free(c1); diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index 9f3f6d0b..e7372a49 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -1183,12 +1183,9 @@ ws_close_error(nni_ws *ws, uint16_t code) } static void -ws_fini(void *arg) +ws_stop(void *arg) { - nni_ws *ws = arg; - ws_frame *frame; - nng_aio *aio; - + nni_ws *ws = arg; ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE); // Give a chance for the close frame to drain. @@ -1198,7 +1195,6 @@ ws_fini(void *arg) nni_aio_stop(&ws->txaio); nni_aio_stop(&ws->closeaio); nni_aio_stop(&ws->httpaio); - nni_aio_stop(&ws->connaio); if (nni_list_node_active(&ws->node)) { nni_ws_dialer *d; @@ -1210,6 +1206,16 @@ ws_fini(void *arg) nni_mtx_unlock(&d->mtx); } } +} + +static void +ws_fini(void *arg) +{ + nni_ws *ws = arg; + ws_frame *frame; + nng_aio *aio; + + ws_stop(ws); nni_mtx_lock(&ws->mtx); while ((frame = nni_list_first(&ws->rxq)) != NULL) { @@ -1450,6 +1456,7 @@ ws_init(nni_ws **wsp) ws->ops.s_close = ws_str_close; ws->ops.s_free = ws_str_free; + ws->ops.s_stop = ws_stop; ws->ops.s_send = ws_str_send; ws->ops.s_recv = ws_str_recv; ws->ops.s_get = ws_str_get; @@ -1460,10 +1467,11 @@ ws_init(nni_ws **wsp) } static void -ws_listener_free(void *arg) +ws_listener_stop(void *arg) { - nni_ws_listener *l = arg; - ws_header *hdr; + nni_ws_listener *l = arg; + nni_http_handler *h; + nni_http_server *s; ws_listener_close(l); @@ -1471,16 +1479,27 @@ ws_listener_free(void *arg) while (!nni_list_empty(&l->reply)) { nni_cv_wait(&l->cv); } + h = l->handler; + s = l->server; + l->handler = NULL; + l->server = NULL; nni_mtx_unlock(&l->mtx); - if (l->handler != NULL) { - nni_http_handler_fini(l->handler); - l->handler = NULL; + if (h != NULL) { + nni_http_handler_fini(h); } - if (l->server != NULL) { - nni_http_server_fini(l->server); - l->server = NULL; + if (s != NULL) { + nni_http_server_fini(s); } +} + +static void +ws_listener_free(void *arg) +{ + nni_ws_listener *l = arg; + ws_header *hdr; + + ws_listener_stop(l); nni_cv_fini(&l->cv); nni_mtx_fini(&l->mtx); @@ -2148,6 +2167,7 @@ nni_ws_listener_alloc(nng_stream_listener **wslp, const nng_url *url) l->isstream = true; l->ops.sl_free = ws_listener_free; l->ops.sl_close = ws_listener_close; + l->ops.sl_stop = ws_listener_stop; l->ops.sl_accept = ws_listener_accept; l->ops.sl_listen = ws_listener_listen; l->ops.sl_set = ws_listener_set; @@ -2255,16 +2275,43 @@ err: } static void -ws_dialer_free(void *arg) +ws_dialer_close(void *arg) { nni_ws_dialer *d = arg; - ws_header *hdr; + nni_ws *ws; + nni_mtx_lock(&d->mtx); + if (d->closed) { + nni_mtx_unlock(&d->mtx); + return; + } + d->closed = true; + NNI_LIST_FOREACH (&d->wspend, ws) { + nni_aio_close(&ws->connaio); + nni_aio_close(&ws->httpaio); + } + nni_mtx_unlock(&d->mtx); +} +static void +ws_dialer_stop(void *arg) +{ + nni_ws_dialer *d = arg; + + ws_dialer_close(d); nni_mtx_lock(&d->mtx); while (!nni_list_empty(&d->wspend)) { nni_cv_wait(&d->cv); } nni_mtx_unlock(&d->mtx); +} + +static void +ws_dialer_free(void *arg) +{ + nni_ws_dialer *d = arg; + ws_header *hdr; + + ws_dialer_stop(d); nni_strfree(d->proto); while ((hdr = nni_list_first(&d->headers)) != NULL) { @@ -2284,24 +2331,6 @@ ws_dialer_free(void *arg) NNI_FREE_STRUCT(d); } -static void -ws_dialer_close(void *arg) -{ - nni_ws_dialer *d = arg; - nni_ws *ws; - nni_mtx_lock(&d->mtx); - if (d->closed) { - nni_mtx_unlock(&d->mtx); - return; - } - d->closed = true; - NNI_LIST_FOREACH (&d->wspend, ws) { - nni_aio_close(&ws->connaio); - nni_aio_close(&ws->httpaio); - } - nni_mtx_unlock(&d->mtx); -} - static void ws_dial_cancel(nni_aio *aio, void *arg, int rv) { @@ -2679,6 +2708,7 @@ nni_ws_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) d->ops.sd_free = ws_dialer_free; d->ops.sd_close = ws_dialer_close; + d->ops.sd_stop = ws_dialer_stop; d->ops.sd_dial = ws_dialer_dial; d->ops.sd_set = ws_dialer_set; d->ops.sd_get = ws_dialer_get; diff --git a/src/supplemental/websocket/websocket_test.c b/src/supplemental/websocket/websocket_test.c index 781ca1d8..9a28d69b 100644 --- a/src/supplemental/websocket/websocket_test.c +++ b/src/supplemental/websocket/websocket_test.c @@ -107,9 +107,13 @@ test_websocket_wildcard(void) NUTS_TRUE(memcmp(buf1, buf2, 5) == 0); nng_stream_close(c1); - nng_stream_free(c1); nng_stream_close(c2); + nng_stream_stop(c1); + nng_stream_stop(c2); + nng_stream_free(c1); nng_stream_free(c2); + nng_stream_listener_stop(l); + nng_stream_dialer_stop(d); nng_aio_free(daio); nng_aio_free(laio); nng_aio_free(aio1); @@ -206,9 +210,13 @@ test_websocket_conn_props(void) nng_strfree(str); nng_stream_close(c1); - nng_stream_free(c1); nng_stream_close(c2); + nng_stream_stop(c1); + nng_stream_stop(c2); + nng_stream_free(c1); nng_stream_free(c2); + nng_stream_listener_stop(l); + nng_stream_dialer_stop(d); nng_aio_free(daio); nng_aio_free(laio); nng_stream_listener_free(l); @@ -495,6 +503,7 @@ test_websocket_fragmentation(void) nng_aio_free(caio); nng_stream_close(c); + nng_stream_stop(c); nng_stream_free(c); nng_aio_free(state.aio); @@ -502,6 +511,8 @@ test_websocket_fragmentation(void) nng_cv_free(state.cv); nng_mtx_free(state.lock); + nng_stream_dialer_stop(d); + nng_stream_listener_stop(l); nng_free(send_buf, state.total); nng_free(recv_buf, state.total); nng_aio_free(daio); -- cgit v1.2.3-70-g09d2