diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-12-12 17:55:48 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-12-12 17:55:48 -0800 |
| commit | 81f5d3c6268ff91ee9c36c4cb34f6f9bfd54740d (patch) | |
| tree | f9f21aa66bd22cfd95ae0c4b8abe57036c8fce0d /src/platform | |
| parent | 371eedeeb6fafe628ae89b9ad2690fa3d6a57e8a (diff) | |
| download | nng-81f5d3c6268ff91ee9c36c4cb34f6f9bfd54740d.tar.gz nng-81f5d3c6268ff91ee9c36c4cb34f6f9bfd54740d.tar.bz2 nng-81f5d3c6268ff91ee9c36c4cb34f6f9bfd54740d.zip | |
streams: add explicit stop functions
This allows us to explicitly stop streams, dialers, and listeners,
before we start tearing down things. This hopefully will be useful
in resolving use-after-free bugs in http, tls, and websockets.
The new functions are not yet documented, but they are
nng_stream_stop, nng_stream_dialer_stop, and nng_stream_listener_stop.
They should be called after close, and before free. The close
functions now close without blocking, but the stop function is
allowed to block.
Diffstat (limited to 'src/platform')
| -rw-r--r-- | src/platform/posix/posix_ipcconn.c | 25 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipcdial.c | 14 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipclisten.c | 30 | ||||
| -rw-r--r-- | src/platform/posix/posix_sockfd.c | 19 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcpconn.c | 25 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcpdial.c | 12 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcplisten.c | 12 | ||||
| -rw-r--r-- | src/platform/tcp_stream_test.c | 8 | ||||
| -rw-r--r-- | src/platform/windows/win_ipcconn.c | 11 | ||||
| -rw-r--r-- | src/platform/windows/win_ipcdial.c | 10 | ||||
| -rw-r--r-- | src/platform/windows/win_ipclisten.c | 16 | ||||
| -rw-r--r-- | src/platform/windows/win_tcpconn.c | 26 | ||||
| -rw-r--r-- | src/platform/windows/win_tcpdial.c | 11 | ||||
| -rw-r--r-- | src/platform/windows/win_tcplisten.c | 7 |
14 files changed, 179 insertions, 47 deletions
diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c index 7f979a0a..8b11b64f 100644 --- a/src/platform/posix/posix_ipcconn.c +++ b/src/platform/posix/posix_ipcconn.c @@ -386,15 +386,29 @@ nni_posix_ipc_start(nni_ipc_conn *c) } static void -ipc_reap(void *arg) +ipc_stop(void *arg) { - ipc_conn *c = arg; + ipc_conn *c = arg; + nni_posix_pfd *pfd; + ipc_close(c); - if (c->pfd != NULL) { - nni_posix_pfd_fini(c->pfd); + nni_mtx_lock(&c->mtx); + pfd = c->pfd; + c->pfd = NULL; + nni_mtx_unlock(&c->mtx); + + if (pfd != NULL) { + nni_posix_pfd_fini(pfd); } - nni_mtx_fini(&c->mtx); +} +static void +ipc_reap(void *arg) +{ + ipc_conn *c = arg; + ipc_stop(c); + + nni_mtx_fini(&c->mtx); if (c->dialer != NULL) { nni_posix_ipc_dialer_rele(c->dialer); } @@ -470,6 +484,7 @@ nni_posix_ipc_alloc(nni_ipc_conn **cp, nni_sockaddr *sa, nni_ipc_dialer *d) c->dialer = d; c->stream.s_free = ipc_free; c->stream.s_close = ipc_close; + c->stream.s_stop = ipc_stop; c->stream.s_send = ipc_send; c->stream.s_recv = ipc_recv; c->stream.s_get = ipc_get; diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c index 7799aca2..667d8262 100644 --- a/src/platform/posix/posix_ipcdial.c +++ b/src/platform/posix/posix_ipcdial.c @@ -41,6 +41,7 @@ ipc_dialer_close(void *arg) c->dial_aio = NULL; nni_aio_set_prov_data(aio, NULL); nng_stream_close(&c->stream); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); } nni_aio_finish_error(aio, NNG_ECLOSED); @@ -50,6 +51,13 @@ ipc_dialer_close(void *arg) } static void +ipc_dialer_stop(void *arg) +{ + nni_ipc_dialer *d = arg; + ipc_dialer_close(d); +} + +static void ipc_dialer_fini(ipc_dialer *d) { nni_mtx_fini(&d->mtx); @@ -61,7 +69,7 @@ ipc_dialer_free(void *arg) { ipc_dialer *d = arg; - ipc_dialer_close(d); + ipc_dialer_stop(d); nni_atomic_set_bool(&d->fini, true); nni_posix_ipc_dialer_rele(d); } @@ -94,6 +102,7 @@ ipc_dialer_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_unlock(&d->mtx); nni_aio_finish_error(aio, rv); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); } @@ -138,6 +147,7 @@ ipc_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg) if (rv != 0) { nng_stream_close(&c->stream); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); nni_aio_finish_error(aio, rv); return; @@ -236,6 +246,7 @@ ipc_dialer_dial(void *arg, nni_aio *aio) error: nni_aio_set_prov_data(aio, NULL); nni_mtx_unlock(&d->mtx); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); nni_aio_finish_error(aio, rv); } @@ -319,6 +330,7 @@ nni_ipc_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) d->closed = false; d->sd.sd_free = ipc_dialer_free; d->sd.sd_close = ipc_dialer_close; + d->sd.sd_stop = ipc_dialer_stop; d->sd.sd_dial = ipc_dialer_dial; d->sd.sd_get = ipc_dialer_get; d->sd.sd_set = ipc_dialer_set; diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c index bd938841..a122d7bb 100644 --- a/src/platform/posix/posix_ipclisten.c +++ b/src/platform/posix/posix_ipclisten.c @@ -73,6 +73,23 @@ ipc_listener_close(void *arg) } static void +ipc_listener_stop(void *arg) +{ + ipc_listener *l = arg; + nni_posix_pfd *pfd; + + nni_mtx_lock(&l->mtx); + ipc_listener_doclose(l); + pfd = l->pfd; + l->pfd = NULL; + nni_mtx_unlock(&l->mtx); + + if (pfd != NULL) { + nni_posix_pfd_fini(pfd); + } +} + +static void ipc_listener_doaccept(ipc_listener *l) { nni_aio *aio; @@ -132,6 +149,7 @@ ipc_listener_doaccept(ipc_listener *l) } if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); @@ -406,17 +424,10 @@ ipc_listener_listen(void *arg) static void ipc_listener_free(void *arg) { - ipc_listener *l = arg; - nni_posix_pfd *pfd; + ipc_listener *l = arg; - nni_mtx_lock(&l->mtx); - ipc_listener_doclose(l); - pfd = l->pfd; - nni_mtx_unlock(&l->mtx); + ipc_listener_stop(l); - if (pfd != NULL) { - nni_posix_pfd_fini(pfd); - } nni_mtx_fini(&l->mtx); NNI_FREE_STRUCT(l); } @@ -507,6 +518,7 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url) l->perms = 0; l->sl.sl_free = ipc_listener_free; l->sl.sl_close = ipc_listener_close; + l->sl.sl_stop = ipc_listener_stop; l->sl.sl_listen = ipc_listener_listen; l->sl.sl_accept = ipc_listener_accept; l->sl.sl_get = ipc_listener_get; diff --git a/src/platform/posix/posix_sockfd.c b/src/platform/posix/posix_sockfd.c index b0d88a31..997feae6 100644 --- a/src/platform/posix/posix_sockfd.c +++ b/src/platform/posix/posix_sockfd.c @@ -200,16 +200,26 @@ sfd_close(void *arg) nni_mtx_unlock(&c->mtx); } -// sfd_fini may block briefly waiting for the pollq thread. -// To get that out of our context, we simply reap this. static void -sfd_fini(void *arg) +sfd_stop(void *arg) { nni_sfd_conn *c = arg; sfd_close(c); + + // ideally this would *stop* without freeing the pfd if (c->pfd != NULL) { nni_posix_pfd_fini(c->pfd); + c->pfd = NULL; } +} + +// sfd_fini may block briefly waiting for the pollq thread. +// To get that out of our context, we simply reap this. +static void +sfd_fini(void *arg) +{ + nni_sfd_conn *c = arg; + sfd_stop(c); nni_mtx_fini(&c->mtx); NNI_FREE_STRUCT(c); @@ -475,6 +485,7 @@ nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd) c->stream.s_free = sfd_free; c->stream.s_close = sfd_close; + c->stream.s_stop = sfd_stop; c->stream.s_recv = sfd_recv; c->stream.s_send = sfd_send; c->stream.s_get = sfd_get; @@ -490,4 +501,4 @@ void nni_sfd_close_fd(int fd) { close(fd); -}
\ No newline at end of file +} diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c index 74b3371b..ce5243b0 100644 --- a/src/platform/posix/posix_tcpconn.c +++ b/src/platform/posix/posix_tcpconn.c @@ -10,6 +10,7 @@ // #include "core/nng_impl.h" +#include "platform/posix/posix_pollq.h" #include <errno.h> #include <fcntl.h> @@ -199,16 +200,30 @@ tcp_close(void *arg) nni_mtx_unlock(&c->mtx); } +static void +tcp_stop(void *arg) +{ + nni_tcp_conn *c = arg; + nni_posix_pfd *pfd; + tcp_close(c); + + nni_mtx_lock(&c->mtx); + pfd = c->pfd; + c->pfd = NULL; + nni_mtx_unlock(&c->mtx); + + if (pfd != NULL) { + nni_posix_pfd_fini(pfd); + } +} + // tcp_fini may block briefly waiting for the pollq thread. // To get that out of our context, we simply reap this. static void tcp_fini(void *arg) { nni_tcp_conn *c = arg; - tcp_close(c); - if (c->pfd != NULL) { - nni_posix_pfd_fini(c->pfd); - } + tcp_stop(c); nni_mtx_fini(&c->mtx); if (c->dialer != NULL) { @@ -221,6 +236,7 @@ static nni_reap_list tcp_reap_list = { .rl_offset = offsetof(nni_tcp_conn, reap), .rl_func = tcp_fini, }; + static void tcp_free(void *arg) { @@ -454,6 +470,7 @@ nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d) nni_aio_list_init(&c->writeq); c->stream.s_free = tcp_free; + c->stream.s_stop = tcp_stop; c->stream.s_close = tcp_close; c->stream.s_recv = tcp_recv; c->stream.s_send = tcp_send; diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c index 73ba4394..52ea6cff 100644 --- a/src/platform/posix/posix_tcpdial.c +++ b/src/platform/posix/posix_tcpdial.c @@ -77,9 +77,15 @@ tcp_dialer_fini(nni_tcp_dialer *d) } void -nni_tcp_dialer_fini(nni_tcp_dialer *d) +nni_tcp_dialer_stop(nni_tcp_dialer *d) { nni_tcp_dialer_close(d); +} + +void +nni_tcp_dialer_fini(nni_tcp_dialer *d) +{ + nni_tcp_dialer_stop(d); nni_atomic_set_bool(&d->fini, true); nni_posix_tcp_dialer_rele(d); } @@ -112,6 +118,8 @@ tcp_dialer_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_unlock(&d->mtx); nni_aio_finish_error(aio, rv); + nng_stream_close(&c->stream); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); } @@ -263,6 +271,8 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) error: nni_aio_set_prov_data(aio, NULL); nni_mtx_unlock(&d->mtx); + nng_stream_close(&c->stream); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); nni_aio_finish_error(aio, rv); } diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c index a38411c5..32f0fd60 100644 --- a/src/platform/posix/posix_tcplisten.c +++ b/src/platform/posix/posix_tcplisten.c @@ -150,6 +150,7 @@ tcp_listener_doaccept(nni_tcp_listener *l) if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { close(newfd); + nng_stream_stop(&c->stream); nng_stream_free(&c->stream); nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); @@ -280,18 +281,25 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa) } void -nni_tcp_listener_fini(nni_tcp_listener *l) +nni_tcp_listener_stop(nni_tcp_listener *l) { nni_posix_pfd *pfd; nni_mtx_lock(&l->mtx); tcp_listener_doclose(l); - pfd = l->pfd; + pfd = l->pfd; + l->pfd = NULL; nni_mtx_unlock(&l->mtx); if (pfd != NULL) { nni_posix_pfd_fini(pfd); } +} + +void +nni_tcp_listener_fini(nni_tcp_listener *l) +{ + nni_tcp_listener_stop(l); nni_mtx_fini(&l->mtx); NNI_FREE_STRUCT(l); } diff --git a/src/platform/tcp_stream_test.c b/src/platform/tcp_stream_test.c index b46d2d1c..708120c3 100644 --- a/src/platform/tcp_stream_test.c +++ b/src/platform/tcp_stream_test.c @@ -116,8 +116,10 @@ test_tcp_stream(void) NUTS_TRUE(sa2.s_in.sa_port == sa.s_in.sa_port); nng_stream_listener_close(l); - nng_stream_listener_free(l); nng_stream_dialer_close(d); + nng_stream_listener_stop(l); + nng_stream_dialer_stop(d); + nng_stream_listener_free(l); nng_stream_dialer_free(d); nng_aio_free(aio1); nng_aio_free(aio2); @@ -125,8 +127,10 @@ test_tcp_stream(void) nng_aio_free(laio); nng_aio_free(maio); nng_stream_close(c1); - nng_stream_free(c1); nng_stream_close(c2); + nng_stream_stop(c1); + nng_stream_stop(c2); + nng_stream_free(c1); nng_stream_free(c2); } diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c index 5f540a6c..56a688a4 100644 --- a/src/platform/windows/win_ipcconn.c +++ b/src/platform/windows/win_ipcconn.c @@ -352,7 +352,7 @@ ipc_close(void *arg) } static void -ipc_free(void *arg) +ipc_stop(void *arg) { ipc_conn *c = arg; nni_aio *aio; @@ -381,6 +381,14 @@ ipc_free(void *arg) DisconnectNamedPipe(f); CloseHandle(f); } +} + +static void +ipc_free(void *arg) +{ + ipc_conn *c = arg; + + ipc_stop(c); nni_cv_fini(&c->cv); nni_mtx_fini(&c->mtx); @@ -460,6 +468,7 @@ nni_win_ipc_init( c->sa = *sa; c->stream.s_free = ipc_free; c->stream.s_close = ipc_close; + c->stream.s_stop = ipc_stop; c->stream.s_send = ipc_send; c->stream.s_recv = ipc_recv; c->stream.s_get = ipc_get; diff --git a/src/platform/windows/win_ipcdial.c b/src/platform/windows/win_ipcdial.c index cb826308..347ee39b 100644 --- a/src/platform/windows/win_ipcdial.c +++ b/src/platform/windows/win_ipcdial.c @@ -192,10 +192,17 @@ ipc_dialer_close(void *arg) } static void -ipc_dialer_free(void *arg) +ipc_dialer_stop(void *arg) { ipc_dialer *d = arg; ipc_dialer_close(d); +} + +static void +ipc_dialer_free(void *arg) +{ + ipc_dialer *d = arg; + ipc_dialer_stop(d); if (d->path) { nni_strfree(d->path); } @@ -260,6 +267,7 @@ nni_ipc_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) d->closed = false; d->sd.sd_free = ipc_dialer_free; d->sd.sd_close = ipc_dialer_close; + d->sd.sd_stop = ipc_dialer_stop; d->sd.sd_dial = ipc_dialer_dial; d->sd.sd_get = ipc_dialer_get; d->sd.sd_set = ipc_dialer_set; diff --git a/src/platform/windows/win_ipclisten.c b/src/platform/windows/win_ipclisten.c index 98cb8273..d09c98c7 100644 --- a/src/platform/windows/win_ipclisten.c +++ b/src/platform/windows/win_ipclisten.c @@ -300,8 +300,18 @@ ipc_listener_close(void *arg) nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECLOSED); } - bool accepting = l->accepting; nni_mtx_unlock(&l->mtx); +} + +static void +ipc_listener_stop(void *arg) +{ + ipc_listener *l = arg; + + ipc_listener_close(l); + + nni_mtx_lock(&l->mtx); + bool accepting = l->accepting; // This craziness because CancelIoEx on ConnectNamedPipe // seems to be incredibly unreliable. It does work, sometimes, @@ -309,6 +319,7 @@ ipc_listener_close(void *arg) // to be retired in favor of UNIX domain sockets anyway. while (accepting) { + nni_mtx_unlock(&l->mtx); if (!CancelIoEx(l->f, &l->io.olpd)) { // operation not found probably // We just inject a safety sleep to @@ -323,8 +334,8 @@ ipc_listener_close(void *arg) nng_msleep(100); nni_mtx_lock(&l->mtx); accepting = l->accepting; - nni_mtx_unlock(&l->mtx); } + nni_mtx_unlock(&l->mtx); DisconnectNamedPipe(l->f); CloseHandle(l->f); } @@ -360,6 +371,7 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url) l->sec_attr.bInheritHandle = FALSE; l->sa.s_ipc.sa_family = NNG_AF_IPC; l->sl.sl_free = ipc_listener_free; + l->sl.sl_stop = ipc_listener_stop; l->sl.sl_close = ipc_listener_close; l->sl.sl_listen = ipc_listener_listen; l->sl.sl_accept = ipc_listener_accept; diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c index 3f3acd17..2f2fd378 100644 --- a/src/platform/windows/win_tcpconn.c +++ b/src/platform/windows/win_tcpconn.c @@ -180,6 +180,7 @@ tcp_send_start(nni_tcp_conn *c) return; } } + nni_cv_wake(&c->cv); } static void @@ -266,17 +267,6 @@ tcp_close(void *arg) closesocket(s); } } - now = nni_clock(); - // wait up to a maximum of 10 seconds before assuming something is - // badly amiss. from what we can tell, this doesn't happen, and we do - // see the timer expire properly, but this safeguard can prevent a - // hang. - while ((c->recving || c->sending) && - ((nni_clock() - now) < (NNI_SECOND * 10))) { - nni_mtx_unlock(&c->mtx); - nni_msleep(1); - nni_mtx_lock(&c->mtx); - } nni_mtx_unlock(&c->mtx); } @@ -369,21 +359,28 @@ tcp_set(void *arg, const char *name, const void *buf, size_t sz, nni_type t) } static void -tcp_free(void *arg) +tcp_stop(void *arg) { nni_tcp_conn *c = arg; tcp_close(c); nni_mtx_lock(&c->mtx); - while ((!nni_list_empty(&c->recv_aios)) || + while (c->recving || c->sending || (!nni_list_empty(&c->recv_aios)) || (!nni_list_empty(&c->send_aios))) { nni_cv_wait(&c->cv); } nni_mtx_unlock(&c->mtx); - if (c->s != INVALID_SOCKET) { closesocket(c->s); } +} + +static void +tcp_free(void *arg) +{ + nni_tcp_conn *c = arg; + tcp_stop(c); + nni_cv_fini(&c->cv); nni_mtx_fini(&c->mtx); NNI_FREE_STRUCT(c); @@ -410,6 +407,7 @@ nni_win_tcp_init(nni_tcp_conn **connp, SOCKET s) nni_aio_list_init(&c->send_aios); c->conn_aio = NULL; c->ops.s_close = tcp_close; + c->ops.s_stop = tcp_stop; c->ops.s_free = tcp_free; c->ops.s_send = tcp_send; c->ops.s_recv = tcp_recv; diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c index 5492d265..c0385648 100644 --- a/src/platform/windows/win_tcpdial.c +++ b/src/platform/windows/win_tcpdial.c @@ -95,9 +95,16 @@ static nni_reap_list tcp_dialer_reap_list = { }; void -nni_tcp_dialer_fini(nni_tcp_dialer *d) +nni_tcp_dialer_stop(nni_tcp_dialer *d) { nni_tcp_dialer_close(d); + // TODO: wait for conn_io.olpd? +} + +void +nni_tcp_dialer_fini(nni_tcp_dialer *d) +{ + nni_tcp_dialer_stop(d); nni_mtx_lock(&d->mtx); if (!nni_list_empty(&d->aios)) { nni_mtx_unlock(&d->mtx); @@ -155,6 +162,8 @@ tcp_dial_cb(nni_win_io *io, int rv, size_t cnt) nni_mtx_unlock(&d->mtx); if (rv != 0) { + nng_stream_close(&c->ops); + nng_stream_stop(&c->ops); nng_stream_free(&c->ops); nni_aio_finish_error(aio, rv); } else { diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c index 021483d4..4e4fb090 100644 --- a/src/platform/windows/win_tcplisten.c +++ b/src/platform/windows/win_tcplisten.c @@ -178,6 +178,13 @@ static nni_reap_list tcp_listener_reap_list = { }; void +nni_tcp_listener_stop(nni_tcp_listener *l) +{ + nni_tcp_listener_close(l); + // TODO: maybe wait for l->l_accept_io.olpd to finish? +} + +void nni_tcp_listener_fini(nni_tcp_listener *l) { nni_tcp_listener_close(l); |
