diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-12-12 17:55:48 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-12-12 17:55:48 -0800 |
| commit | 81f5d3c6268ff91ee9c36c4cb34f6f9bfd54740d (patch) | |
| tree | f9f21aa66bd22cfd95ae0c4b8abe57036c8fce0d /src | |
| parent | 371eedeeb6fafe628ae89b9ad2690fa3d6a57e8a (diff) | |
| download | nng-81f5d3c6268ff91ee9c36c4cb34f6f9bfd54740d.tar.gz nng-81f5d3c6268ff91ee9c36c4cb34f6f9bfd54740d.tar.bz2 nng-81f5d3c6268ff91ee9c36c4cb34f6f9bfd54740d.zip | |
streams: add explicit stop functions
This allows us to explicitly stop streams, dialers, and listeners,
before we start tearing down things. This hopefully will be useful
in resolving use-after-free bugs in http, tls, and websockets.
The new functions are not yet documented, but they are
nng_stream_stop, nng_stream_dialer_stop, and nng_stream_listener_stop.
They should be called after close, and before free. The close
functions now close without blocking, but the stop function is
allowed to block.
Diffstat (limited to 'src')
29 files changed, 415 insertions, 113 deletions
diff --git a/src/core/platform.h b/src/core/platform.h index d2409e5b..2f5b5c17 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -293,6 +293,7 @@ extern void nni_tcp_dialer_fini(nni_tcp_dialer *); // Further operations on it should return NNG_ECLOSED. // Any in-progress connection will be aborted. extern void nni_tcp_dialer_close(nni_tcp_dialer *); +extern void nni_tcp_dialer_stop(nni_tcp_dialer *); // nni_tcp_dial attempts to create an outgoing connection, // asynchronously, to the address in the aio. On success, the first (and only) @@ -318,6 +319,10 @@ extern void nni_tcp_listener_fini(nni_tcp_listener *); // any bound socket, and further operations will result in NNG_ECLOSED. extern void nni_tcp_listener_close(nni_tcp_listener *); +// nni_tcp_listener_stop is close + waits for any operations to stop, +// so there won't be any further accepts after this. +extern void nni_tcp_listener_stop(nni_tcp_listener *); + // nni_tcp_listener_listen creates the socket in listening mode, bound // to the specified address. extern int nni_tcp_listener_listen(nni_tcp_listener *, const nni_sockaddr *); diff --git a/src/core/sockfd.c b/src/core/sockfd.c index cedd7436..787a0783 100644 --- a/src/core/sockfd.c +++ b/src/core/sockfd.c @@ -63,6 +63,12 @@ sfd_listener_close(void *arg) nni_mtx_unlock(&l->mtx); } +static void +sfd_listener_stop(void *arg) +{ + sfd_listener_close(arg); +} + static int sfd_listener_listen(void *arg) { @@ -222,6 +228,7 @@ nni_sfd_listener_alloc(nng_stream_listener **lp, const nng_url *url) l->ops.sl_free = sfd_listener_free; l->ops.sl_close = sfd_listener_close; + l->ops.sl_stop = sfd_listener_stop; l->ops.sl_listen = sfd_listener_listen; l->ops.sl_accept = sfd_listener_accept; l->ops.sl_get = sfd_listener_get; diff --git a/src/core/stream.c b/src/core/stream.c index ad026586..16e11eca 100644 --- a/src/core/stream.c +++ b/src/core/stream.c @@ -116,7 +116,17 @@ static struct { void nng_stream_close(nng_stream *s) { - s->s_close(s); + if (s != NULL) { + s->s_close(s); + } +} + +void +nng_stream_stop(nng_stream *s) +{ + if (s != NULL) { + s->s_stop(s); + } } void @@ -149,7 +159,17 @@ nni_stream_get( void nng_stream_dialer_close(nng_stream_dialer *d) { - d->sd_close(d); + if (d != NULL) { + d->sd_close(d); + } +} + +void +nng_stream_dialer_stop(nng_stream_dialer *d) +{ + if (d != NULL) { + d->sd_stop(d); + } } void @@ -226,8 +246,19 @@ nni_stream_dialer_set_tls(nng_stream_dialer *d, nng_tls_config *cfg) void nng_stream_listener_close(nng_stream_listener *l) { - l->sl_close(l); + if (l != NULL) { + l->sl_close(l); + } +} + +void +nng_stream_listener_stop(nng_stream_listener *l) +{ + if (l != NULL) { + l->sl_stop(l); + } } + void nng_stream_listener_free(nng_stream_listener *l) { diff --git a/src/core/stream.h b/src/core/stream.h index 9ea65834..e09dc9e5 100644 --- a/src/core/stream.h +++ b/src/core/stream.h @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -43,6 +43,7 @@ extern int nni_stream_listener_get_tls( struct nng_stream { void (*s_free)(void *); void (*s_close)(void *); + void (*s_stop)(void *); void (*s_recv)(void *, nng_aio *); void (*s_send)(void *, nng_aio *); int (*s_get)(void *, const char *, void *, size_t *, nni_type); @@ -53,6 +54,7 @@ struct nng_stream { struct nng_stream_dialer { void (*sd_free)(void *); void (*sd_close)(void *); + void (*sd_stop)(void *); void (*sd_dial)(void *, nng_aio *); int (*sd_get)(void *, const char *, void *, size_t *, nni_type); int (*sd_set)(void *, const char *, const void *, size_t, nni_type); @@ -65,6 +67,7 @@ struct nng_stream_dialer { struct nng_stream_listener { void (*sl_free)(void *); void (*sl_close)(void *); + void (*sl_stop)(void *); int (*sl_listen)(void *); void (*sl_accept)(void *, nng_aio *); int (*sl_get)(void *, const char *, void *, size_t *, nni_type); diff --git a/src/core/tcp.c b/src/core/tcp.c index 5d324a13..d2e08493 100644 --- a/src/core/tcp.c +++ b/src/core/tcp.c @@ -104,6 +104,8 @@ tcp_dial_con_cb(void *arg) if ((d->closed) || ((aio = nni_list_first(&d->conaios)) == NULL)) { if (rv == 0) { // Make sure we discard the underlying connection. + nng_stream_close(nni_aio_get_output(d->conaio, 0)); + nng_stream_stop(nni_aio_get_output(d->conaio, 0)); nng_stream_free(nni_aio_get_output(d->conaio, 0)); nni_aio_set_output(d->conaio, 0, NULL); } @@ -127,14 +129,26 @@ tcp_dialer_close(void *arg) { tcp_dialer *d = arg; nni_aio *aio; - nni_mtx_lock(&d->mtx); - d->closed = true; - while ((aio = nni_list_first(&d->conaios)) != NULL) { - nni_list_remove(&d->conaios, aio); - nni_aio_finish_error(aio, NNG_ECLOSED); + + if (d != NULL) { + nni_mtx_lock(&d->mtx); + d->closed = true; + while ((aio = nni_list_first(&d->conaios)) != NULL) { + nni_list_remove(&d->conaios, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_tcp_dialer_close(d->d); + nni_mtx_unlock(&d->mtx); + } +} + +static void +tcp_dialer_stop(void *arg) +{ + tcp_dialer *d = arg; + if (d != NULL) { + nni_tcp_dialer_stop(d->d); } - nni_tcp_dialer_close(d->d); - nni_mtx_unlock(&d->mtx); } static void @@ -222,6 +236,7 @@ tcp_dialer_alloc(tcp_dialer **dp) d->ops.sd_close = tcp_dialer_close; d->ops.sd_free = tcp_dialer_free; + d->ops.sd_stop = tcp_dialer_stop; d->ops.sd_dial = tcp_dialer_dial; d->ops.sd_get = tcp_dialer_get; d->ops.sd_set = tcp_dialer_set; @@ -276,6 +291,13 @@ tcp_listener_close(void *arg) } static void +tcp_listener_stop(void *arg) +{ + tcp_listener *l = arg; + nni_tcp_listener_stop(l->l); +} + +static void tcp_listener_free(void *arg) { tcp_listener *l = arg; @@ -372,6 +394,7 @@ tcp_listener_alloc_addr(nng_stream_listener **lp, const nng_sockaddr *sa) l->ops.sl_free = tcp_listener_free; l->ops.sl_close = tcp_listener_close; + l->ops.sl_stop = tcp_listener_stop; l->ops.sl_listen = tcp_listener_listen; l->ops.sl_accept = tcp_listener_accept; l->ops.sl_get = tcp_listener_get; diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c index 7f979a0a..8b11b64f 100644 --- a/src/platform/posix/posix_ipcconn.c +++ b/src/platform/posix/posix_ipcconn.c @@ -386,15 +386,29 @@ nni_posix_ipc_start(nni_ipc_conn *c) } static void -ipc_reap(void *arg) +ipc_stop(void *arg) { - ipc_conn *c = arg; + ipc_conn *c = arg; + nni_posix_pfd *pfd; + ipc_close(c); - if (c->pfd != NULL) { - nni_posix_pfd_fini(c->pfd); + nni_mtx_lock(&c->mtx); + pfd = c->pfd; + c->pfd = NULL; + nni_mtx_unlock(&c->mtx); + + if (pfd != NULL) { + nni_posix_pfd_fini(pfd); } - nni_mtx_fini(&c->mtx); +} +static void +ipc_reap(void *arg) +{ + ipc_conn *c = arg; + ipc_stop(c); + + nni_mtx_fini(&c->mtx); if (c->dialer != NULL) { nni_posix_ipc_dialer_rele(c->dialer); } @@ -470,6 +484,7 @@ nni_posix_ipc_alloc(nni_ipc_conn **cp, nni_sockaddr *sa, nni_ipc_dialer *d) c->dialer = d; c->stream.s_free = ipc_free; c->stream.s_close = ipc_close; + c->stream.s_stop = ipc_stop; c->stream.s_send = ipc_send; c->stream.s_recv = ipc_recv; c->stream.s_get = ipc_get; diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c index 7799aca2..667d8262 100644 --- a/src/platform/posix/posix_ipcdial.c +++ b/src/platform/posix/posix_ipcdial.c @@ -41,6 +41,7 @@ ipc_dialer_close(void *arg) c->dial_aio = NULL; nni_aio_set_prov_data(aio, NULL); nng_stream_close(&c->stream); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); } nni_aio_finish_error(aio, NNG_ECLOSED); @@ -50,6 +51,13 @@ ipc_dialer_close(void *arg) } static void +ipc_dialer_stop(void *arg) +{ + nni_ipc_dialer *d = arg; + ipc_dialer_close(d); +} + +static void ipc_dialer_fini(ipc_dialer *d) { nni_mtx_fini(&d->mtx); @@ -61,7 +69,7 @@ ipc_dialer_free(void *arg) { ipc_dialer *d = arg; - ipc_dialer_close(d); + ipc_dialer_stop(d); nni_atomic_set_bool(&d->fini, true); nni_posix_ipc_dialer_rele(d); } @@ -94,6 +102,7 @@ ipc_dialer_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_unlock(&d->mtx); nni_aio_finish_error(aio, rv); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); } @@ -138,6 +147,7 @@ ipc_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg) if (rv != 0) { nng_stream_close(&c->stream); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); nni_aio_finish_error(aio, rv); return; @@ -236,6 +246,7 @@ ipc_dialer_dial(void *arg, nni_aio *aio) error: nni_aio_set_prov_data(aio, NULL); nni_mtx_unlock(&d->mtx); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); nni_aio_finish_error(aio, rv); } @@ -319,6 +330,7 @@ nni_ipc_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) d->closed = false; d->sd.sd_free = ipc_dialer_free; d->sd.sd_close = ipc_dialer_close; + d->sd.sd_stop = ipc_dialer_stop; d->sd.sd_dial = ipc_dialer_dial; d->sd.sd_get = ipc_dialer_get; d->sd.sd_set = ipc_dialer_set; diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c index bd938841..a122d7bb 100644 --- a/src/platform/posix/posix_ipclisten.c +++ b/src/platform/posix/posix_ipclisten.c @@ -73,6 +73,23 @@ ipc_listener_close(void *arg) } static void +ipc_listener_stop(void *arg) +{ + ipc_listener *l = arg; + nni_posix_pfd *pfd; + + nni_mtx_lock(&l->mtx); + ipc_listener_doclose(l); + pfd = l->pfd; + l->pfd = NULL; + nni_mtx_unlock(&l->mtx); + + if (pfd != NULL) { + nni_posix_pfd_fini(pfd); + } +} + +static void ipc_listener_doaccept(ipc_listener *l) { nni_aio *aio; @@ -132,6 +149,7 @@ ipc_listener_doaccept(ipc_listener *l) } if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); @@ -406,17 +424,10 @@ ipc_listener_listen(void *arg) static void ipc_listener_free(void *arg) { - ipc_listener *l = arg; - nni_posix_pfd *pfd; + ipc_listener *l = arg; - nni_mtx_lock(&l->mtx); - ipc_listener_doclose(l); - pfd = l->pfd; - nni_mtx_unlock(&l->mtx); + ipc_listener_stop(l); - if (pfd != NULL) { - nni_posix_pfd_fini(pfd); - } nni_mtx_fini(&l->mtx); NNI_FREE_STRUCT(l); } @@ -507,6 +518,7 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url) l->perms = 0; l->sl.sl_free = ipc_listener_free; l->sl.sl_close = ipc_listener_close; + l->sl.sl_stop = ipc_listener_stop; l->sl.sl_listen = ipc_listener_listen; l->sl.sl_accept = ipc_listener_accept; l->sl.sl_get = ipc_listener_get; diff --git a/src/platform/posix/posix_sockfd.c b/src/platform/posix/posix_sockfd.c index b0d88a31..997feae6 100644 --- a/src/platform/posix/posix_sockfd.c +++ b/src/platform/posix/posix_sockfd.c @@ -200,16 +200,26 @@ sfd_close(void *arg) nni_mtx_unlock(&c->mtx); } -// sfd_fini may block briefly waiting for the pollq thread. -// To get that out of our context, we simply reap this. static void -sfd_fini(void *arg) +sfd_stop(void *arg) { nni_sfd_conn *c = arg; sfd_close(c); + + // ideally this would *stop* without freeing the pfd if (c->pfd != NULL) { nni_posix_pfd_fini(c->pfd); + c->pfd = NULL; } +} + +// sfd_fini may block briefly waiting for the pollq thread. +// To get that out of our context, we simply reap this. +static void +sfd_fini(void *arg) +{ + nni_sfd_conn *c = arg; + sfd_stop(c); nni_mtx_fini(&c->mtx); NNI_FREE_STRUCT(c); @@ -475,6 +485,7 @@ nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd) c->stream.s_free = sfd_free; c->stream.s_close = sfd_close; + c->stream.s_stop = sfd_stop; c->stream.s_recv = sfd_recv; c->stream.s_send = sfd_send; c->stream.s_get = sfd_get; @@ -490,4 +501,4 @@ void nni_sfd_close_fd(int fd) { close(fd); -}
\ No newline at end of file +} diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c index 74b3371b..ce5243b0 100644 --- a/src/platform/posix/posix_tcpconn.c +++ b/src/platform/posix/posix_tcpconn.c @@ -10,6 +10,7 @@ // #include "core/nng_impl.h" +#include "platform/posix/posix_pollq.h" #include <errno.h> #include <fcntl.h> @@ -199,16 +200,30 @@ tcp_close(void *arg) nni_mtx_unlock(&c->mtx); } +static void +tcp_stop(void *arg) +{ + nni_tcp_conn *c = arg; + nni_posix_pfd *pfd; + tcp_close(c); + + nni_mtx_lock(&c->mtx); + pfd = c->pfd; + c->pfd = NULL; + nni_mtx_unlock(&c->mtx); + + if (pfd != NULL) { + nni_posix_pfd_fini(pfd); + } +} + // tcp_fini may block briefly waiting for the pollq thread. // To get that out of our context, we simply reap this. static void tcp_fini(void *arg) { nni_tcp_conn *c = arg; - tcp_close(c); - if (c->pfd != NULL) { - nni_posix_pfd_fini(c->pfd); - } + tcp_stop(c); nni_mtx_fini(&c->mtx); if (c->dialer != NULL) { @@ -221,6 +236,7 @@ static nni_reap_list tcp_reap_list = { .rl_offset = offsetof(nni_tcp_conn, reap), .rl_func = tcp_fini, }; + static void tcp_free(void *arg) { @@ -454,6 +470,7 @@ nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d) nni_aio_list_init(&c->writeq); c->stream.s_free = tcp_free; + c->stream.s_stop = tcp_stop; c->stream.s_close = tcp_close; c->stream.s_recv = tcp_recv; c->stream.s_send = tcp_send; diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c index 73ba4394..52ea6cff 100644 --- a/src/platform/posix/posix_tcpdial.c +++ b/src/platform/posix/posix_tcpdial.c @@ -77,9 +77,15 @@ tcp_dialer_fini(nni_tcp_dialer *d) } void -nni_tcp_dialer_fini(nni_tcp_dialer *d) +nni_tcp_dialer_stop(nni_tcp_dialer *d) { nni_tcp_dialer_close(d); +} + +void +nni_tcp_dialer_fini(nni_tcp_dialer *d) +{ + nni_tcp_dialer_stop(d); nni_atomic_set_bool(&d->fini, true); nni_posix_tcp_dialer_rele(d); } @@ -112,6 +118,8 @@ tcp_dialer_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_unlock(&d->mtx); nni_aio_finish_error(aio, rv); + nng_stream_close(&c->stream); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); } @@ -263,6 +271,8 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) error: nni_aio_set_prov_data(aio, NULL); nni_mtx_unlock(&d->mtx); + nng_stream_close(&c->stream); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); nni_aio_finish_error(aio, rv); } diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c index a38411c5..32f0fd60 100644 --- a/src/platform/posix/posix_tcplisten.c +++ b/src/platform/posix/posix_tcplisten.c @@ -150,6 +150,7 @@ tcp_listener_doaccept(nni_tcp_listener *l) if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { close(newfd); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); @@ -280,18 +281,25 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa) } void -nni_tcp_listener_fini(nni_tcp_listener *l) +nni_tcp_listener_stop(nni_tcp_listener *l) { nni_posix_pfd *pfd; nni_mtx_lock(&l->mtx); tcp_listener_doclose(l); - pfd = l->pfd; + pfd = l->pfd; + l->pfd = NULL; nni_mtx_unlock(&l->mtx); if (pfd != NULL) { nni_posix_pfd_fini(pfd); } +} + +void +nni_tcp_listener_fini(nni_tcp_listener *l) +{ + nni_tcp_listener_stop(l); nni_mtx_fini(&l->mtx); NNI_FREE_STRUCT(l); } diff --git a/src/platform/tcp_stream_test.c b/src/platform/tcp_stream_test.c index b46d2d1c..708120c3 100644 --- a/src/platform/tcp_stream_test.c +++ b/src/platform/tcp_stream_test.c @@ -116,8 +116,10 @@ test_tcp_stream(void) NUTS_TRUE(sa2.s_in.sa_port == sa.s_in.sa_port); nng_stream_listener_close(l); - nng_stream_listener_free(l); nng_stream_dialer_close(d); + nng_stream_listener_stop(l); + nng_stream_dialer_stop(d); + nng_stream_listener_free(l); nng_stream_dialer_free(d); nng_aio_free(aio1); nng_aio_free(aio2); @@ -125,8 +127,10 @@ test_tcp_stream(void) nng_aio_free(laio); nng_aio_free(maio); nng_stream_close(c1); - nng_stream_free(c1); nng_stream_close(c2); + nng_stream_stop(c1); + nng_stream_stop(c2); + nng_stream_free(c1); nng_stream_free(c2); } diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c index 5f540a6c..56a688a4 100644 --- a/src/platform/windows/win_ipcconn.c +++ b/src/platform/windows/win_ipcconn.c @@ -352,7 +352,7 @@ ipc_close(void *arg) } static void -ipc_free(void *arg) +ipc_stop(void *arg) { ipc_conn *c = arg; nni_aio *aio; @@ -381,6 +381,14 @@ ipc_free(void *arg) DisconnectNamedPipe(f); CloseHandle(f); } +} + +static void +ipc_free(void *arg) +{ + ipc_conn *c = arg; + + ipc_stop(c); nni_cv_fini(&c->cv); nni_mtx_fini(&c->mtx); @@ -460,6 +468,7 @@ nni_win_ipc_init( c->sa = *sa; c->stream.s_free = ipc_free; c->stream.s_close = ipc_close; + c->stream.s_stop = ipc_stop; c->stream.s_send = ipc_send; c->stream.s_recv = ipc_recv; c->stream.s_get = ipc_get; diff --git a/src/platform/windows/win_ipcdial.c b/src/platform/windows/win_ipcdial.c index cb826308..347ee39b 100644 --- a/src/platform/windows/win_ipcdial.c +++ b/src/platform/windows/win_ipcdial.c @@ -192,10 +192,17 @@ ipc_dialer_close(void *arg) } static void -ipc_dialer_free(void *arg) +ipc_dialer_stop(void *arg) { ipc_dialer *d = arg; ipc_dialer_close(d); +} + +static void +ipc_dialer_free(void *arg) +{ + ipc_dialer *d = arg; + ipc_dialer_stop(d); if (d->path) { nni_strfree(d->path); } @@ -260,6 +267,7 @@ nni_ipc_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) d->closed = false; d->sd.sd_free = ipc_dialer_free; d->sd.sd_close = ipc_dialer_close; + d->sd.sd_stop = ipc_dialer_stop; d->sd.sd_dial = ipc_dialer_dial; d->sd.sd_get = ipc_dialer_get; d->sd.sd_set = ipc_dialer_set; diff --git a/src/platform/windows/win_ipclisten.c b/src/platform/windows/win_ipclisten.c index 98cb8273..d09c98c7 100644 --- a/src/platform/windows/win_ipclisten.c +++ b/src/platform/windows/win_ipclisten.c @@ -300,8 +300,18 @@ ipc_listener_close(void *arg) nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECLOSED); } - bool accepting = l->accepting; nni_mtx_unlock(&l->mtx); +} + +static void +ipc_listener_stop(void *arg) +{ + ipc_listener *l = arg; + + ipc_listener_close(l); + + nni_mtx_lock(&l->mtx); + bool accepting = l->accepting; // This craziness because CancelIoEx on ConnectNamedPipe // seems to be incredibly unreliable. It does work, sometimes, @@ -309,6 +319,7 @@ ipc_listener_close(void *arg) // to be retired in favor of UNIX domain sockets anyway. while (accepting) { + nni_mtx_unlock(&l->mtx); if (!CancelIoEx(l->f, &l->io.olpd)) { // operation not found probably // We just inject a safety sleep to @@ -323,8 +334,8 @@ ipc_listener_close(void *arg) nng_msleep(100); nni_mtx_lock(&l->mtx); accepting = l->accepting; - nni_mtx_unlock(&l->mtx); } + nni_mtx_unlock(&l->mtx); DisconnectNamedPipe(l->f); CloseHandle(l->f); } @@ -360,6 +371,7 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url) l->sec_attr.bInheritHandle = FALSE; l->sa.s_ipc.sa_family = NNG_AF_IPC; l->sl.sl_free = ipc_listener_free; + l->sl.sl_stop = ipc_listener_stop; l->sl.sl_close = ipc_listener_close; l->sl.sl_listen = ipc_listener_listen; l->sl.sl_accept = ipc_listener_accept; diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c index 3f3acd17..2f2fd378 100644 --- a/src/platform/windows/win_tcpconn.c +++ b/src/platform/windows/win_tcpconn.c @@ -180,6 +180,7 @@ tcp_send_start(nni_tcp_conn *c) return; } } + nni_cv_wake(&c->cv); } static void @@ -266,17 +267,6 @@ tcp_close(void *arg) closesocket(s); } } - now = nni_clock(); - // wait up to a maximum of 10 seconds before assuming something is - // badly amiss. from what we can tell, this doesn't happen, and we do - // see the timer expire properly, but this safeguard can prevent a - // hang. - while ((c->recving || c->sending) && - ((nni_clock() - now) < (NNI_SECOND * 10))) { - nni_mtx_unlock(&c->mtx); - nni_msleep(1); - nni_mtx_lock(&c->mtx); - } nni_mtx_unlock(&c->mtx); } @@ -369,21 +359,28 @@ tcp_set(void *arg, const char *name, const void *buf, size_t sz, nni_type t) } static void -tcp_free(void *arg) +tcp_stop(void *arg) { nni_tcp_conn *c = arg; tcp_close(c); nni_mtx_lock(&c->mtx); - while ((!nni_list_empty(&c->recv_aios)) || + while (c->recving || c->sending || (!nni_list_empty(&c->recv_aios)) || (!nni_list_empty(&c->send_aios))) { nni_cv_wait(&c->cv); } nni_mtx_unlock(&c->mtx); - if (c->s != INVALID_SOCKET) { closesocket(c->s); } +} + +static void +tcp_free(void *arg) +{ + nni_tcp_conn *c = arg; + tcp_stop(c); + nni_cv_fini(&c->cv); nni_mtx_fini(&c->mtx); NNI_FREE_STRUCT(c); @@ -410,6 +407,7 @@ nni_win_tcp_init(nni_tcp_conn **connp, SOCKET s) nni_aio_list_init(&c->send_aios); c->conn_aio = NULL; c->ops.s_close = tcp_close; + c->ops.s_stop = tcp_stop; c->ops.s_free = tcp_free; c->ops.s_send = tcp_send; c->ops.s_recv = tcp_recv; diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c index 5492d265..c0385648 100644 --- a/src/platform/windows/win_tcpdial.c +++ b/src/platform/windows/win_tcpdial.c @@ -95,9 +95,16 @@ static nni_reap_list tcp_dialer_reap_list = { }; void -nni_tcp_dialer_fini(nni_tcp_dialer *d) +nni_tcp_dialer_stop(nni_tcp_dialer *d) { nni_tcp_dialer_close(d); + // TODO: wait for conn_io.olpd? +} + +void +nni_tcp_dialer_fini(nni_tcp_dialer *d) +{ + nni_tcp_dialer_stop(d); nni_mtx_lock(&d->mtx); if (!nni_list_empty(&d->aios)) { nni_mtx_unlock(&d->mtx); @@ -155,6 +162,8 @@ tcp_dial_cb(nni_win_io *io, int rv, size_t cnt) nni_mtx_unlock(&d->mtx); if (rv != 0) { + nng_stream_close(&c->ops); + nng_stream_stop(&c->ops); nng_stream_free(&c->ops); nni_aio_finish_error(aio, rv); } else { diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c index 021483d4..4e4fb090 100644 --- a/src/platform/windows/win_tcplisten.c +++ b/src/platform/windows/win_tcplisten.c @@ -178,6 +178,13 @@ static nni_reap_list tcp_listener_reap_list = { }; void +nni_tcp_listener_stop(nni_tcp_listener *l) +{ + nni_tcp_listener_close(l); + // TODO: maybe wait for l->l_accept_io.olpd to finish? +} + +void nni_tcp_listener_fini(nni_tcp_listener *l) { nni_tcp_listener_close(l); diff --git a/src/sp/transport/ipc/ipc.c b/src/sp/transport/ipc/ipc.c index dadb1909..c659b8fd 100644 --- a/src/sp/transport/ipc/ipc.c +++ b/src/sp/transport/ipc/ipc.c @@ -111,6 +111,7 @@ ipc_pipe_stop(void *arg) nni_aio_stop(&p->rx_aio); nni_aio_stop(&p->tx_aio); nni_aio_stop(&p->neg_aio); + nng_stream_stop(p->conn); nni_mtx_lock(&ep->mtx); nni_list_node_remove(&p->node); nni_mtx_unlock(&ep->mtx); @@ -655,6 +656,8 @@ ipc_ep_stop(void *arg) nni_aio_stop(&ep->time_aio); nni_aio_stop(&ep->conn_aio); + nng_stream_dialer_stop(ep->dialer); + nng_stream_listener_stop(ep->listener); } static void diff --git a/src/sp/transport/tcp/tcp.c b/src/sp/transport/tcp/tcp.c index 2497b899..51991cbd 100644 --- a/src/sp/transport/tcp/tcp.c +++ b/src/sp/transport/tcp/tcp.c @@ -124,6 +124,7 @@ tcptran_pipe_stop(void *arg) nni_aio_stop(p->rxaio); nni_aio_stop(p->txaio); nni_aio_stop(p->negoaio); + nng_stream_stop(p->conn); } static int @@ -674,6 +675,8 @@ tcptran_ep_stop(void *arg) nni_aio_stop(ep->timeaio); nni_aio_stop(ep->connaio); + nng_stream_dialer_stop(ep->dialer); + nng_stream_listener_stop(ep->listener); } static void diff --git a/src/sp/transport/tls/tls.c b/src/sp/transport/tls/tls.c index 7a907e8e..dcb100f5 100644 --- a/src/sp/transport/tls/tls.c +++ b/src/sp/transport/tls/tls.c @@ -657,6 +657,8 @@ tlstran_ep_stop(void *arg) nni_aio_stop(ep->timeaio); nni_aio_stop(ep->connaio); + nng_stream_dialer_stop(ep->dialer); + nng_stream_listener_stop(ep->listener); } static void diff --git a/src/sp/transport/ws/websocket.c b/src/sp/transport/ws/websocket.c index 27cc6434..16929ec9 100644 --- a/src/sp/transport/ws/websocket.c +++ b/src/sp/transport/ws/websocket.c @@ -183,6 +183,7 @@ wstran_pipe_stop(void *arg) nni_aio_stop(&p->rxaio); nni_aio_stop(&p->txaio); + nng_stream_stop(p->ws); } static int @@ -214,9 +215,7 @@ wstran_pipe_close(void *arg) nni_aio_close(&p->rxaio); nni_aio_close(&p->txaio); - nni_mtx_lock(&p->mtx); nng_stream_close(p->ws); - nni_mtx_unlock(&p->mtx); } static int @@ -373,6 +372,7 @@ wstran_dialer_stop(void *arg) ws_dialer *d = arg; nni_aio_stop(&d->connaio); + nng_stream_dialer_stop(d->dialer); } static void @@ -392,6 +392,7 @@ wstran_listener_stop(void *arg) ws_listener *l = arg; nni_aio_stop(&l->accaio); + nng_stream_listener_stop(l->listener); } static void @@ -421,6 +422,7 @@ wstran_connect_cb(void *arg) } if ((uaio = nni_list_first(&d->aios)) == NULL) { // The client stopped caring about this! + nng_stream_stop(ws); nng_stream_free(ws); nni_mtx_unlock(&d->mtx); return; @@ -430,6 +432,7 @@ wstran_connect_cb(void *arg) if ((rv = nni_aio_result(caio)) != 0) { nni_aio_finish_error(uaio, rv); } else if ((rv = wstran_pipe_alloc(&p, ws)) != 0) { + nng_stream_stop(ws); nng_stream_free(ws); nni_aio_finish_error(uaio, rv); } else { diff --git a/src/supplemental/http/http_client.c b/src/supplemental/http/http_client.c index af5bc717..03125abf 100644 --- a/src/supplemental/http/http_client.c +++ b/src/supplemental/http/http_client.c @@ -89,6 +89,8 @@ http_dial_cb(void *arg) void nni_http_client_fini(nni_http_client *c) { + nni_aio_stop(c->aio); + nng_stream_dialer_stop(c->dialer); nni_aio_free(c->aio); nng_stream_dialer_free(c->dialer); nni_mtx_fini(&c->mtx); diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c index 2240492f..0d5d9cb0 100644 --- a/src/supplemental/http/http_server.c +++ b/src/supplemental/http/http_server.c @@ -895,6 +895,7 @@ http_server_fini(nni_http_server *s) http_error *epage; nni_aio_stop(s->accaio); + nng_stream_listener_stop(s->listener); nni_mtx_lock(&s->mtx); NNI_ASSERT(nni_list_empty(&s->conns)); diff --git a/src/supplemental/tls/tls_common.c b/src/supplemental/tls/tls_common.c index c04d03a5..a871e74e 100644 --- a/src/supplemental/tls/tls_common.c +++ b/src/supplemental/tls/tls_common.c @@ -126,6 +126,14 @@ tls_dialer_free(void *arg) } } +static void +tls_dialer_stop(void *arg) +{ + tls_dialer *d = arg; + + nng_stream_dialer_stop(d->d); +} + // For dialing, we need to have our own completion callback, instead of // the user's completion callback. @@ -281,6 +289,7 @@ nni_tls_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) d->ops.sd_close = tls_dialer_close; d->ops.sd_free = tls_dialer_free; + d->ops.sd_stop = tls_dialer_stop; d->ops.sd_dial = tls_dialer_dial; d->ops.sd_get = tls_dialer_get; d->ops.sd_set = tls_dialer_set; @@ -307,6 +316,13 @@ tls_listener_close(void *arg) } static void +tls_listener_stop(void *arg) +{ + tls_listener *l = arg; + nng_stream_listener_close(l->l); +} + +static void tls_listener_free(void *arg) { tls_listener *l; @@ -436,6 +452,7 @@ nni_tls_listener_alloc(nng_stream_listener **lp, const nng_url *url) } l->ops.sl_free = tls_listener_free; l->ops.sl_close = tls_listener_close; + l->ops.sl_stop = tls_listener_stop; l->ops.sl_accept = tls_listener_accept; l->ops.sl_listen = tls_listener_listen; l->ops.sl_get = tls_listener_get; @@ -526,6 +543,18 @@ tls_close(void *arg) nng_stream_close(conn->tcp); } +static void +tls_stop(void *arg) +{ + tls_conn *conn = arg; + + tls_close(conn); + nng_stream_stop(conn->tcp); + nni_aio_stop(&conn->conn_aio); + nni_aio_stop(&conn->tcp_send); + nni_aio_stop(&conn->tcp_recv); +} + static int tls_get_verified(void *arg, void *buf, size_t *szp, nni_type t) { @@ -624,6 +653,7 @@ tls_alloc(tls_conn **conn_p, nng_tls_config *cfg, nng_aio *user_aio) conn->stream.s_close = tls_close; conn->stream.s_free = tls_free; + conn->stream.s_stop = tls_stop; conn->stream.s_send = tls_send; conn->stream.s_recv = tls_recv; conn->stream.s_get = tls_get; @@ -638,14 +668,7 @@ tls_reap(void *arg) { tls_conn *conn = arg; - // Shut it all down first. We should be freed. - if (conn->tcp != NULL) { - nng_stream_close(conn->tcp); - } - nni_aio_stop(&conn->conn_aio); - nni_aio_stop(&conn->tcp_send); - nni_aio_stop(&conn->tcp_recv); - + tls_stop(conn); conn->ops.fini((void *) (conn + 1)); nni_aio_fini(&conn->conn_aio); nni_aio_fini(&conn->tcp_send); diff --git a/src/supplemental/tls/tls_test.c b/src/supplemental/tls/tls_test.c index 43ce0c85..517be143 100644 --- a/src/supplemental/tls/tls_test.c +++ b/src/supplemental/tls/tls_test.c @@ -57,6 +57,7 @@ test_tls_conn_refused(void) NUTS_FAIL(nng_aio_result(aio), NNG_ECONNREFUSED); nng_aio_free(aio); + nng_stream_dialer_stop(dialer); nng_stream_dialer_free(dialer); } @@ -133,6 +134,10 @@ test_tls_large_message(void) nng_free(buf1, size); nng_free(buf2, size); + nng_stream_stop(s1); + nng_stream_stop(s2); + nng_stream_dialer_stop(d); + nng_stream_listener_stop(l); nng_stream_free(s1); nng_stream_free(s2); nng_stream_dialer_free(d); @@ -214,8 +219,10 @@ test_tls_ecdsa(void) NUTS_PASS(nuts_stream_wait(t2)); NUTS_TRUE(memcmp(buf1, buf2, size) == 0); - nng_free(buf1, size); - nng_free(buf2, size); + nng_stream_stop(s1); + nng_stream_stop(s2); + nng_stream_dialer_stop(d); + nng_stream_listener_stop(l); nng_stream_free(s1); nng_stream_free(s2); nng_stream_dialer_free(d); @@ -224,6 +231,8 @@ test_tls_ecdsa(void) nng_tls_config_free(c2); nng_aio_free(aio1); nng_aio_free(aio2); + nng_free(buf1, size); + nng_free(buf2, size); } void @@ -241,6 +250,7 @@ test_tls_garbled_cert(void) c1, nuts_garbled_crt, nuts_server_key, NULL), NNG_ECRYPTO); + nng_stream_listener_stop(l); nng_stream_listener_free(l); nng_tls_config_free(c1); } @@ -318,8 +328,10 @@ test_tls_psk(void) NUTS_PASS(nuts_stream_wait(t2)); NUTS_TRUE(memcmp(buf1, buf2, size) == 0); - nng_free(buf1, size); - nng_free(buf2, size); + nng_stream_stop(s1); + nng_stream_stop(s2); + nng_stream_dialer_stop(d); + nng_stream_listener_stop(l); nng_stream_free(s1); nng_stream_free(s2); nng_stream_dialer_free(d); @@ -328,6 +340,8 @@ test_tls_psk(void) nng_tls_config_free(c2); nng_aio_free(aio1); nng_aio_free(aio2); + nng_free(buf1, size); + nng_free(buf2, size); } void @@ -408,8 +422,10 @@ test_tls_psk_server_identities(void) NUTS_PASS(nuts_stream_wait(t2)); NUTS_TRUE(memcmp(buf1, buf2, size) == 0); - nng_free(buf1, size); - nng_free(buf2, size); + nng_stream_stop(s1); + nng_stream_stop(s2); + nng_stream_dialer_stop(d); + nng_stream_listener_stop(l); nng_stream_free(s1); nng_stream_free(s2); nng_stream_dialer_free(d); @@ -418,6 +434,8 @@ test_tls_psk_server_identities(void) nng_tls_config_free(c2); nng_aio_free(aio1); nng_aio_free(aio2); + nng_free(buf1, size); + nng_free(buf2, size); } void @@ -495,8 +513,10 @@ test_tls_psk_bad_identity(void) NUTS_ASSERT(nuts_stream_wait(t1) != 0); NUTS_ASSERT(nuts_stream_wait(t2) != 0); - nng_free(buf1, size); - nng_free(buf2, size); + nng_stream_stop(s1); + nng_stream_stop(s2); + nng_stream_dialer_stop(d); + nng_stream_listener_stop(l); nng_stream_free(s1); nng_stream_free(s2); nng_stream_dialer_free(d); @@ -505,6 +525,8 @@ test_tls_psk_bad_identity(void) nng_tls_config_free(c2); nng_aio_free(aio1); nng_aio_free(aio2); + nng_free(buf1, size); + nng_free(buf2, size); } void @@ -543,6 +565,7 @@ test_tls_psk_config_busy(void) NUTS_FAIL( nng_tls_config_psk(c1, "identity2", key, sizeof(key)), NNG_EBUSY); + nng_stream_listener_stop(l); nng_stream_listener_free(l); nng_aio_free(aio); nng_tls_config_free(c1); diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index 9f3f6d0b..e7372a49 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -1183,12 +1183,9 @@ ws_close_error(nni_ws *ws, uint16_t code) } static void -ws_fini(void *arg) +ws_stop(void *arg) { - nni_ws *ws = arg; - ws_frame *frame; - nng_aio *aio; - + nni_ws *ws = arg; ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE); // Give a chance for the close frame to drain. @@ -1198,7 +1195,6 @@ ws_fini(void *arg) nni_aio_stop(&ws->txaio); nni_aio_stop(&ws->closeaio); nni_aio_stop(&ws->httpaio); - nni_aio_stop(&ws->connaio); if (nni_list_node_active(&ws->node)) { nni_ws_dialer *d; @@ -1210,6 +1206,16 @@ ws_fini(void *arg) nni_mtx_unlock(&d->mtx); } } +} + +static void +ws_fini(void *arg) +{ + nni_ws *ws = arg; + ws_frame *frame; + nng_aio *aio; + + ws_stop(ws); nni_mtx_lock(&ws->mtx); while ((frame = nni_list_first(&ws->rxq)) != NULL) { @@ -1450,6 +1456,7 @@ ws_init(nni_ws **wsp) ws->ops.s_close = ws_str_close; ws->ops.s_free = ws_str_free; + ws->ops.s_stop = ws_stop; ws->ops.s_send = ws_str_send; ws->ops.s_recv = ws_str_recv; ws->ops.s_get = ws_str_get; @@ -1460,10 +1467,11 @@ ws_init(nni_ws **wsp) } static void -ws_listener_free(void *arg) +ws_listener_stop(void *arg) { - nni_ws_listener *l = arg; - ws_header *hdr; + nni_ws_listener *l = arg; + nni_http_handler *h; + nni_http_server *s; ws_listener_close(l); @@ -1471,16 +1479,27 @@ ws_listener_free(void *arg) while (!nni_list_empty(&l->reply)) { nni_cv_wait(&l->cv); } + h = l->handler; + s = l->server; + l->handler = NULL; + l->server = NULL; nni_mtx_unlock(&l->mtx); - if (l->handler != NULL) { - nni_http_handler_fini(l->handler); - l->handler = NULL; + if (h != NULL) { + nni_http_handler_fini(h); } - if (l->server != NULL) { - nni_http_server_fini(l->server); - l->server = NULL; + if (s != NULL) { + nni_http_server_fini(s); } +} + +static void +ws_listener_free(void *arg) +{ + nni_ws_listener *l = arg; + ws_header *hdr; + + ws_listener_stop(l); nni_cv_fini(&l->cv); nni_mtx_fini(&l->mtx); @@ -2148,6 +2167,7 @@ nni_ws_listener_alloc(nng_stream_listener **wslp, const nng_url *url) l->isstream = true; l->ops.sl_free = ws_listener_free; l->ops.sl_close = ws_listener_close; + l->ops.sl_stop = ws_listener_stop; l->ops.sl_accept = ws_listener_accept; l->ops.sl_listen = ws_listener_listen; l->ops.sl_set = ws_listener_set; @@ -2255,16 +2275,43 @@ err: } static void -ws_dialer_free(void *arg) +ws_dialer_close(void *arg) { nni_ws_dialer *d = arg; - ws_header *hdr; + nni_ws *ws; + nni_mtx_lock(&d->mtx); + if (d->closed) { + nni_mtx_unlock(&d->mtx); + return; + } + d->closed = true; + NNI_LIST_FOREACH (&d->wspend, ws) { + nni_aio_close(&ws->connaio); + nni_aio_close(&ws->httpaio); + } + nni_mtx_unlock(&d->mtx); +} +static void +ws_dialer_stop(void *arg) +{ + nni_ws_dialer *d = arg; + + ws_dialer_close(d); nni_mtx_lock(&d->mtx); while (!nni_list_empty(&d->wspend)) { nni_cv_wait(&d->cv); } nni_mtx_unlock(&d->mtx); +} + +static void +ws_dialer_free(void *arg) +{ + nni_ws_dialer *d = arg; + ws_header *hdr; + + ws_dialer_stop(d); nni_strfree(d->proto); while ((hdr = nni_list_first(&d->headers)) != NULL) { @@ -2285,24 +2332,6 @@ ws_dialer_free(void *arg) } static void -ws_dialer_close(void *arg) -{ - nni_ws_dialer *d = arg; - nni_ws *ws; - nni_mtx_lock(&d->mtx); - if (d->closed) { - nni_mtx_unlock(&d->mtx); - return; - } - d->closed = true; - NNI_LIST_FOREACH (&d->wspend, ws) { - nni_aio_close(&ws->connaio); - nni_aio_close(&ws->httpaio); - } - nni_mtx_unlock(&d->mtx); -} - -static void ws_dial_cancel(nni_aio *aio, void *arg, int rv) { nni_ws *ws = arg; @@ -2679,6 +2708,7 @@ nni_ws_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) d->ops.sd_free = ws_dialer_free; d->ops.sd_close = ws_dialer_close; + d->ops.sd_stop = ws_dialer_stop; d->ops.sd_dial = ws_dialer_dial; d->ops.sd_set = ws_dialer_set; d->ops.sd_get = ws_dialer_get; diff --git a/src/supplemental/websocket/websocket_test.c b/src/supplemental/websocket/websocket_test.c index 781ca1d8..9a28d69b 100644 --- a/src/supplemental/websocket/websocket_test.c +++ b/src/supplemental/websocket/websocket_test.c @@ -107,9 +107,13 @@ test_websocket_wildcard(void) NUTS_TRUE(memcmp(buf1, buf2, 5) == 0); nng_stream_close(c1); - nng_stream_free(c1); nng_stream_close(c2); + nng_stream_stop(c1); + nng_stream_stop(c2); + nng_stream_free(c1); nng_stream_free(c2); + nng_stream_listener_stop(l); + nng_stream_dialer_stop(d); nng_aio_free(daio); nng_aio_free(laio); nng_aio_free(aio1); @@ -206,9 +210,13 @@ test_websocket_conn_props(void) nng_strfree(str); nng_stream_close(c1); - nng_stream_free(c1); nng_stream_close(c2); + nng_stream_stop(c1); + nng_stream_stop(c2); + nng_stream_free(c1); nng_stream_free(c2); + nng_stream_listener_stop(l); + nng_stream_dialer_stop(d); nng_aio_free(daio); nng_aio_free(laio); nng_stream_listener_free(l); @@ -495,6 +503,7 @@ test_websocket_fragmentation(void) nng_aio_free(caio); nng_stream_close(c); + nng_stream_stop(c); nng_stream_free(c); nng_aio_free(state.aio); @@ -502,6 +511,8 @@ test_websocket_fragmentation(void) nng_cv_free(state.cv); nng_mtx_free(state.lock); + nng_stream_dialer_stop(d); + nng_stream_listener_stop(l); nng_free(send_buf, state.total); nng_free(recv_buf, state.total); nng_aio_free(daio); |
