diff options
| -rw-r--r-- | src/platform/posix/posix_ipc.h | 9 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipcconn.c | 63 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipcdial.c | 23 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipclisten.c | 56 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq.h | 8 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_epoll.c | 146 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_epoll.h | 4 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_kqueue.c | 128 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_kqueue.h | 4 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_poll.c | 55 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_port.c | 50 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_select.c | 82 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_select.h | 1 | ||||
| -rw-r--r-- | src/platform/posix/posix_sockfd.c | 57 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcp.h | 8 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcpconn.c | 64 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcpdial.c | 23 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcplisten.c | 77 | ||||
| -rw-r--r-- | src/platform/posix/posix_udp.c | 35 |
19 files changed, 366 insertions, 527 deletions
diff --git a/src/platform/posix/posix_ipc.h b/src/platform/posix/posix_ipc.h index 98af298b..3f3549c6 100644 --- a/src/platform/posix/posix_ipc.h +++ b/src/platform/posix/posix_ipc.h @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Devolutions <info@devolutions.net> // @@ -22,7 +22,7 @@ struct nni_ipc_conn { nng_stream stream; - nni_posix_pfd *pfd; + nni_posix_pfd pfd; nni_list readq; nni_list writeq; bool closed; @@ -44,10 +44,9 @@ struct nni_ipc_dialer { }; extern int nni_posix_ipc_alloc( - nni_ipc_conn **, nni_sockaddr *, nni_ipc_dialer *); -extern void nni_posix_ipc_init(nni_ipc_conn *, nni_posix_pfd *); -extern void nni_posix_ipc_start(nni_ipc_conn *); + nni_ipc_conn **, nni_sockaddr *, nni_ipc_dialer *, int); extern void nni_posix_ipc_dialer_rele(nni_ipc_dialer *); +extern void nni_posix_ipc_dialer_cb(void *arg, unsigned events); #endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c index a198b87f..69ce2d32 100644 --- a/src/platform/posix/posix_ipcconn.c +++ b/src/platform/posix/posix_ipcconn.c @@ -1,5 +1,5 @@ // -// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Devolutions <info@devolutions.net> // @@ -34,7 +34,7 @@ ipc_dowrite(ipc_conn *c) nni_aio *aio; int fd; - if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + if (c->closed || ((fd = nni_posix_pfd_fd(&c->pfd)) < 0)) { return; } @@ -103,7 +103,7 @@ ipc_doread(ipc_conn *c) nni_aio *aio; int fd; - if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + if (c->closed || ((fd = nni_posix_pfd_fd(&c->pfd)) < 0)) { return; } @@ -174,7 +174,7 @@ ipc_error(void *arg, int err) nni_aio_list_remove(aio); nni_aio_finish_error(aio, err); } - nni_posix_pfd_close(c->pfd); + nni_posix_pfd_close(&c->pfd); nni_mtx_unlock(&c->mtx); } @@ -191,18 +191,20 @@ ipc_close(void *arg) nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECLOSED); } - if (c->pfd != NULL) { - nni_posix_pfd_close(c->pfd); - } + nni_posix_pfd_close(&c->pfd); } nni_mtx_unlock(&c->mtx); } static void -ipc_cb(nni_posix_pfd *pfd, unsigned events, void *arg) +ipc_cb(void *arg, unsigned events) { ipc_conn *c = arg; + if (c->dial_aio != NULL) { + nni_posix_ipc_dialer_cb(arg, events); + return; + } if (events & (NNI_POLL_HUP | NNI_POLL_ERR | NNI_POLL_INVAL)) { ipc_error(c, NNG_ECONNSHUT); return; @@ -222,7 +224,7 @@ ipc_cb(nni_posix_pfd *pfd, unsigned events, void *arg) events |= NNI_POLL_IN; } if ((!c->closed) && (events != 0)) { - nni_posix_pfd_arm(pfd, events); + nni_posix_pfd_arm(&c->pfd, events); } nni_mtx_unlock(&c->mtx); } @@ -264,7 +266,7 @@ ipc_send(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->writeq) == aio) { - nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT); + nni_posix_pfd_arm(&c->pfd, NNI_POLL_OUT); } } nni_mtx_unlock(&c->mtx); @@ -298,7 +300,7 @@ ipc_recv(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->readq) == aio) { - nni_posix_pfd_arm(c->pfd, NNI_POLL_IN); + nni_posix_pfd_arm(&c->pfd, NNI_POLL_IN); } } nni_mtx_unlock(&c->mtx); @@ -312,7 +314,7 @@ ipc_get_peer_uid(void *arg, void *buf, size_t *szp, nni_type t) uint64_t ignore; uint64_t id = 0; - if ((rv = nni_posix_peerid(nni_posix_pfd_fd(c->pfd), &id, &ignore, + if ((rv = nni_posix_peerid(nni_posix_pfd_fd(&c->pfd), &id, &ignore, &ignore, &ignore)) != 0) { return (rv); } @@ -327,7 +329,7 @@ ipc_get_peer_gid(void *arg, void *buf, size_t *szp, nni_type t) uint64_t ignore; uint64_t id = 0; - if ((rv = nni_posix_peerid(nni_posix_pfd_fd(c->pfd), &ignore, &id, + if ((rv = nni_posix_peerid(nni_posix_pfd_fd(&c->pfd), &ignore, &id, &ignore, &ignore)) != 0) { return (rv); } @@ -342,7 +344,7 @@ ipc_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_type t) uint64_t ignore; uint64_t id = 0; - if ((rv = nni_posix_peerid(nni_posix_pfd_fd(c->pfd), &ignore, &ignore, + if ((rv = nni_posix_peerid(nni_posix_pfd_fd(&c->pfd), &ignore, &ignore, &ignore, &id)) != 0) { return (rv); } @@ -361,7 +363,7 @@ ipc_get_peer_pid(void *arg, void *buf, size_t *szp, nni_type t) uint64_t ignore; uint64_t id = 0; - if ((rv = nni_posix_peerid(nni_posix_pfd_fd(c->pfd), &ignore, &ignore, + if ((rv = nni_posix_peerid(nni_posix_pfd_fd(&c->pfd), &ignore, &ignore, &id, &ignore)) != 0) { return (rv); } @@ -379,27 +381,14 @@ ipc_get_addr(void *arg, void *buf, size_t *szp, nni_type t) return (nni_copyout_sockaddr(&c->sa, buf, szp, t)); } -void -nni_posix_ipc_start(nni_ipc_conn *c) -{ - nni_posix_pfd_set_cb(c->pfd, ipc_cb, c); -} - static void ipc_stop(void *arg) { - ipc_conn *c = arg; - nni_posix_pfd *pfd; + ipc_conn *c = arg; ipc_close(c); - nni_mtx_lock(&c->mtx); - pfd = c->pfd; - c->pfd = NULL; - nni_mtx_unlock(&c->mtx); - if (pfd != NULL) { - nni_posix_pfd_fini(pfd); - } + nni_posix_pfd_stop(&c->pfd); } static void @@ -408,11 +397,13 @@ ipc_reap(void *arg) ipc_conn *c = arg; ipc_stop(c); - nni_mtx_fini(&c->mtx); if (c->dialer != NULL) { nni_posix_ipc_dialer_rele(c->dialer); } + nni_posix_pfd_fini(&c->pfd); + + nni_mtx_fini(&c->mtx); NNI_FREE_STRUCT(c); } @@ -472,7 +463,8 @@ ipc_set(void *arg, const char *name, const void *val, size_t sz, nni_type t) } int -nni_posix_ipc_alloc(nni_ipc_conn **cp, nni_sockaddr *sa, nni_ipc_dialer *d) +nni_posix_ipc_alloc( + nni_ipc_conn **cp, nni_sockaddr *sa, nni_ipc_dialer *d, int fd) { ipc_conn *c; @@ -494,13 +486,8 @@ nni_posix_ipc_alloc(nni_ipc_conn **cp, nni_sockaddr *sa, nni_ipc_dialer *d) nni_mtx_init(&c->mtx); nni_aio_list_init(&c->readq); nni_aio_list_init(&c->writeq); + nni_posix_pfd_init(&c->pfd, fd, ipc_cb, c); *cp = c; return (0); } - -void -nni_posix_ipc_init(nni_ipc_conn *c, nni_posix_pfd *pfd) -{ - c->pfd = pfd; -} diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c index 667d8262..ae98c4f4 100644 --- a/src/platform/posix/posix_ipcdial.c +++ b/src/platform/posix/posix_ipcdial.c @@ -106,8 +106,8 @@ ipc_dialer_cancel(nni_aio *aio, void *arg, int rv) nng_stream_free(&c->stream); } -static void -ipc_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg) +void +nni_posix_ipc_dialer_cb(void *arg, unsigned ev) { nni_ipc_conn *c = arg; nni_ipc_dialer *d = c->dialer; @@ -126,7 +126,7 @@ ipc_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg) } else { socklen_t sz = sizeof(int); - int fd = nni_posix_pfd_fd(pfd); + int fd = nni_posix_pfd_fd(&c->pfd); if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { rv = errno; } @@ -153,7 +153,6 @@ ipc_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg) return; } - nni_posix_ipc_start(c); nni_aio_set_output(aio, 0, c); nni_aio_finish(aio, 0, 0); } @@ -188,24 +187,13 @@ ipc_dialer_dial(void *arg, nni_aio *aio) nni_atomic_inc(&d->ref); - if ((rv = nni_posix_ipc_alloc(&c, &d->sa, d)) != 0) { + if ((rv = nni_posix_ipc_alloc(&c, &d->sa, d, fd)) != 0) { (void) close(fd); nni_posix_ipc_dialer_rele(d); nni_aio_finish_error(aio, rv); return; } - // This arranges for the fd to be in non-blocking mode, and adds the - // poll fd to the list. - if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { - // the error label unlocks this - nni_mtx_lock(&d->mtx); - goto error; - } - - nni_posix_ipc_init(c, pfd); - nni_posix_pfd_set_cb(pfd, ipc_dialer_cb, c); - nni_mtx_lock(&d->mtx); if (d->closed) { rv = NNG_ECLOSED; @@ -225,10 +213,10 @@ ipc_dialer_dial(void *arg, nni_aio *aio) goto error; } // Asynchronous connect. + c->dial_aio = aio; if ((rv = nni_posix_pfd_arm(pfd, NNI_POLL_OUT)) != 0) { goto error; } - c->dial_aio = aio; nni_aio_set_prov_data(aio, c); nni_list_append(&d->connq, aio); nni_mtx_unlock(&d->mtx); @@ -238,7 +226,6 @@ ipc_dialer_dial(void *arg, nni_aio *aio) // on loop back, and probably not on every platform. nni_aio_set_prov_data(aio, NULL); nni_mtx_unlock(&d->mtx); - nni_posix_ipc_start(c); nni_aio_set_output(aio, 0, c); nni_aio_finish(aio, 0, 0); return; diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c index a122d7bb..7e0250f4 100644 --- a/src/platform/posix/posix_ipclisten.c +++ b/src/platform/posix/posix_ipclisten.c @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Devolutions <info@devolutions.net> // @@ -31,7 +31,7 @@ typedef struct { nng_stream_listener sl; - nni_posix_pfd *pfd; + nni_posix_pfd pfd; nng_sockaddr sa; nni_list acceptq; bool started; @@ -53,9 +53,7 @@ ipc_listener_doclose(ipc_listener *l) nni_aio_finish_error(aio, NNG_ECLOSED); } - if (l->pfd != NULL) { - nni_posix_pfd_close(l->pfd); - } + nni_posix_pfd_close(&l->pfd); if (l->started && ((path = l->path) != NULL)) { l->path = NULL; (void) unlink(path); @@ -75,18 +73,13 @@ ipc_listener_close(void *arg) static void ipc_listener_stop(void *arg) { - ipc_listener *l = arg; - nni_posix_pfd *pfd; + ipc_listener *l = arg; nni_mtx_lock(&l->mtx); ipc_listener_doclose(l); - pfd = l->pfd; - l->pfd = NULL; nni_mtx_unlock(&l->mtx); - if (pfd != NULL) { - nni_posix_pfd_fini(pfd); - } + nni_posix_pfd_stop(&l->pfd); } static void @@ -95,13 +88,12 @@ ipc_listener_doaccept(ipc_listener *l) nni_aio *aio; while ((aio = nni_list_first(&l->acceptq)) != NULL) { - int newfd; - int fd; - int rv; - nni_posix_pfd *pfd; - nni_ipc_conn *c; + int newfd; + int fd; + int rv; + nni_ipc_conn *c; - fd = nni_posix_pfd_fd(l->pfd); + fd = nni_posix_pfd_fd(&l->pfd); #ifdef NNG_USE_ACCEPT4 newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC); @@ -119,7 +111,7 @@ ipc_listener_doaccept(ipc_listener *l) case EWOULDBLOCK: #endif #endif - rv = nni_posix_pfd_arm(l->pfd, NNI_POLL_IN); + rv = nni_posix_pfd_arm(&l->pfd, NNI_POLL_IN); if (rv != 0) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); @@ -141,35 +133,23 @@ ipc_listener_doaccept(ipc_listener *l) } } - if ((rv = nni_posix_ipc_alloc(&c, &l->sa, NULL)) != 0) { + if ((rv = nni_posix_ipc_alloc(&c, &l->sa, NULL, newfd)) != 0) { (void) close(newfd); nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); continue; } - if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { - nng_stream_stop(&c->stream); - nng_stream_free(&c->stream); - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, rv); - continue; - } - - nni_posix_ipc_init(c, pfd); - nni_aio_list_remove(aio); - nni_posix_ipc_start(c); nni_aio_set_output(aio, 0, c); nni_aio_finish(aio, 0, 0); } } static void -ipc_listener_cb(nni_posix_pfd *pfd, unsigned events, void *arg) +ipc_listener_cb(void *arg, unsigned events) { ipc_listener *l = arg; - NNI_ARG_UNUSED(pfd); nni_mtx_lock(&l->mtx); if ((events & NNI_POLL_INVAL) != 0) { @@ -325,7 +305,6 @@ ipc_listener_listen(void *arg) struct sockaddr_storage ss; int rv; int fd; - nni_posix_pfd *pfd; char *path; if ((len = nni_posix_nn2sockaddr(&ss, &l->sa)) < sizeof(sa_family_t)) { @@ -380,7 +359,7 @@ ipc_listener_listen(void *arg) (listen(fd, 128) != 0)) { rv = nni_plat_errno(errno); } - if ((rv != 0) || ((rv = nni_posix_pfd_init(&pfd, fd)) != 0)) { + if (rv != 0) { nni_mtx_unlock(&l->mtx); (void) close(fd); if (path != NULL) { @@ -390,6 +369,8 @@ ipc_listener_listen(void *arg) return (rv); } + nni_posix_pfd_init(&l->pfd, fd, ipc_listener_cb, l); + #ifdef NNG_HAVE_ABSTRACT_SOCKETS // If the original address was for a system assigned value, // then figure out what we got. This is analogous to TCP @@ -411,9 +392,6 @@ ipc_listener_listen(void *arg) } #endif - nni_posix_pfd_set_cb(pfd, ipc_listener_cb, l); - - l->pfd = pfd; l->started = true; l->path = path; nni_mtx_unlock(&l->mtx); @@ -428,6 +406,7 @@ ipc_listener_free(void *arg) ipc_listener_stop(l); + nni_posix_pfd_fini(&l->pfd); nni_mtx_fini(&l->mtx); NNI_FREE_STRUCT(l); } @@ -512,7 +491,6 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url) nni_mtx_init(&l->mtx); nni_aio_list_init(&l->acceptq); - l->pfd = NULL; l->closed = false; l->started = false; l->perms = 0; diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h index c79a3e18..121834cf 100644 --- a/src/platform/posix/posix_pollq.h +++ b/src/platform/posix/posix_pollq.h @@ -22,7 +22,7 @@ #include "core/nng_impl.h" typedef struct nni_posix_pfd nni_posix_pfd; -typedef void (*nni_posix_pfd_cb)(nni_posix_pfd *, unsigned, void *); +typedef void (*nni_posix_pfd_cb)(void *, unsigned); #if defined(NNG_POLLQ_KQUEUE) #include "posix_pollq_kqueue.h" @@ -33,17 +33,17 @@ typedef void (*nni_posix_pfd_cb)(nni_posix_pfd *, unsigned, void *); #elif defined(NNG_POLLQ_POLL) #include "posix_pollq_epoll.h" #elif defined(NNG_POLLQ_SELECT) -#include "posix_pollq_epoll.h" +#include "posix_pollq_select.h" #else #error "No suitable poller defined" #endif -extern int nni_posix_pfd_init(nni_posix_pfd **, int); +extern void nni_posix_pfd_init(nni_posix_pfd *, int, nni_posix_pfd_cb, void *); extern void nni_posix_pfd_fini(nni_posix_pfd *); +extern void nni_posix_pfd_stop(nni_posix_pfd *); extern int nni_posix_pfd_arm(nni_posix_pfd *, unsigned); extern int nni_posix_pfd_fd(nni_posix_pfd *); extern void nni_posix_pfd_close(nni_posix_pfd *); -extern void nni_posix_pfd_set_cb(nni_posix_pfd *, nni_posix_pfd_cb, void *); #endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c index d09289e4..e789e5d2 100644 --- a/src/platform/posix/posix_pollq_epoll.c +++ b/src/platform/posix/posix_pollq_epoll.c @@ -61,32 +61,28 @@ typedef struct nni_posix_pollq { // single global instance for now. static nni_posix_pollq nni_posix_global_pollq; -int -nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) +void +nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg) { - nni_posix_pfd *pfd; nni_posix_pollq *pq; struct epoll_event ev; - int rv; pq = &nni_posix_global_pollq; (void) fcntl(fd, F_SETFD, FD_CLOEXEC); (void) fcntl(fd, F_SETFL, O_NONBLOCK); - if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) { - return (NNG_ENOMEM); - } nni_mtx_init(&pfd->mtx); nni_cv_init(&pfd->cv, &pq->mtx); + nni_atomic_flag_reset(&pfd->stopped); + nni_atomic_flag_reset(&pfd->closed); - pfd->pq = pq; - pfd->fd = fd; - pfd->cb = NULL; - pfd->arg = NULL; - pfd->events = 0; - pfd->closing = false; - pfd->closed = false; + pfd->pq = pq; + pfd->fd = fd; + pfd->cb = cb; + pfd->arg = arg; + pfd->events = 0; + pfd->reap = false; NNI_LIST_NODE_INIT(&pfd->node); @@ -95,16 +91,9 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) ev.events = 0; ev.data.ptr = pfd; - if (epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev) != 0) { - rv = nni_plat_errno(errno); - nni_cv_fini(&pfd->cv); - nni_mtx_fini(&pfd->mtx); - NNI_FREE_STRUCT(pfd); - return (rv); - } - - *pfdp = pfd; - return (0); + // if this fails the system is probably out of memory - it will fail in + // arm + (void) epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev); } int @@ -118,20 +107,18 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events) // epoll implementation. nni_mtx_lock(&pfd->mtx); - if (!pfd->closing) { - struct epoll_event ev; - pfd->events |= events; - events = pfd->events; - - memset(&ev, 0, sizeof(ev)); - ev.events = events | NNI_EPOLL_FLAGS; - ev.data.ptr = pfd; - - if (epoll_ctl(pq->epfd, EPOLL_CTL_MOD, pfd->fd, &ev) != 0) { - int rv = nni_plat_errno(errno); - nni_mtx_unlock(&pfd->mtx); - return (rv); - } + struct epoll_event ev; + pfd->events |= events; + events = pfd->events; + + memset(&ev, 0, sizeof(ev)); + ev.events = events | NNI_EPOLL_FLAGS; + ev.data.ptr = pfd; + + if (epoll_ctl(pq->epfd, EPOLL_CTL_MOD, pfd->fd, &ev) != 0) { + int rv = nni_plat_errno(errno); + nni_mtx_unlock(&pfd->mtx); + return (rv); } nni_mtx_unlock(&pfd->mtx); return (0); @@ -144,44 +131,44 @@ nni_posix_pfd_fd(nni_posix_pfd *pfd) } void -nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg) -{ - nni_mtx_lock(&pfd->mtx); - pfd->cb = cb; - pfd->arg = arg; - nni_mtx_unlock(&pfd->mtx); -} - -void nni_posix_pfd_close(nni_posix_pfd *pfd) { + nni_posix_pollq *pq = pfd->pq; + if (pq == NULL) { + return; + } + if (nni_atomic_flag_test_and_set(&pfd->closed)) { + return; + } + nni_mtx_lock(&pfd->mtx); - if (!pfd->closing) { - nni_posix_pollq *pq = pfd->pq; - struct epoll_event ev; // Not actually used. - pfd->closing = true; + struct epoll_event ev; // Not actually used. - (void) shutdown(pfd->fd, SHUT_RDWR); - (void) epoll_ctl(pq->epfd, EPOLL_CTL_DEL, pfd->fd, &ev); - } + (void) shutdown(pfd->fd, SHUT_RDWR); + (void) epoll_ctl(pq->epfd, EPOLL_CTL_DEL, pfd->fd, &ev); nni_mtx_unlock(&pfd->mtx); } void -nni_posix_pfd_fini(nni_posix_pfd *pfd) +nni_posix_pfd_stop(nni_posix_pfd *pfd) { - nni_posix_pollq *pq = pfd->pq; + nni_posix_pollq *pq = pfd->pq; + uint64_t one = 1; - nni_posix_pfd_close(pfd); + if (pq == NULL) { + return; + } + if (nni_atomic_flag_test_and_set(&pfd->stopped)) { + return; + } // We have to synchronize with the pollq thread (unless we are // on that thread!) NNI_ASSERT(!nni_thr_is_self(&pq->thr)); - uint64_t one = 1; - nni_mtx_lock(&pq->mtx); nni_list_append(&pq->reapq, pfd); + pfd->reap = true; // Wake the remote side. For now we assume this always // succeeds. The only failure modes here occur when we @@ -195,33 +182,41 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd) nni_panic("BUG! write to epoll fd incorrect!"); } - while (!pfd->closed) { + while (pfd->reap) { nni_cv_wait(&pfd->cv); } nni_mtx_unlock(&pq->mtx); +} + +void +nni_posix_pfd_fini(nni_posix_pfd *pfd) +{ + nni_posix_pollq *pq = pfd->pq; + if (pq == NULL) { + return; + } + + nni_posix_pfd_stop(pfd); // We're exclusive now. (void) close(pfd->fd); nni_cv_fini(&pfd->cv); nni_mtx_fini(&pfd->mtx); - NNI_FREE_STRUCT(pfd); } static void nni_posix_pollq_reap(nni_posix_pollq *pq) { nni_posix_pfd *pfd; - nni_mtx_lock(&pq->mtx); while ((pfd = nni_list_first(&pq->reapq)) != NULL) { nni_list_remove(&pq->reapq, pfd); // Let fini know we're done with it, and it's safe to // remove. - pfd->closed = true; + pfd->reap = false; nni_cv_wake(&pfd->cv); } - nni_mtx_unlock(&pq->mtx); } static void @@ -249,37 +244,28 @@ nni_posix_poll_thr(void *arg) if ((ev->data.ptr == NULL) && (ev->events & (unsigned) POLLIN)) { uint64_t clear; - if (read(pq->evfd, &clear, sizeof(clear)) != - sizeof(clear)) { - nni_panic("read from evfd incorrect!"); - } + (void) read(pq->evfd, &clear, sizeof(clear)); reap = true; } else { - nni_posix_pfd *pfd = ev->data.ptr; - nni_posix_pfd_cb cb; - void *cbarg; - unsigned mask; + nni_posix_pfd *pfd = ev->data.ptr; + unsigned mask; mask = ev->events & - ((unsigned) EPOLLIN | (unsigned) EPOLLOUT | - (unsigned) EPOLLERR); + ((unsigned) (EPOLLIN | EPOLLOUT | + EPOLLERR)); nni_mtx_lock(&pfd->mtx); pfd->events &= ~mask; - cb = pfd->cb; - cbarg = pfd->arg; nni_mtx_unlock(&pfd->mtx); // Execute the callback with lock released - if (cb != NULL) { - cb(pfd, mask, cbarg); - } + pfd->cb(pfd->arg, mask); } } if (reap) { - nni_posix_pollq_reap(pq); nni_mtx_lock(&pq->mtx); + nni_posix_pollq_reap(pq); if (pq->close) { nni_mtx_unlock(&pq->mtx); return; diff --git a/src/platform/posix/posix_pollq_epoll.h b/src/platform/posix/posix_pollq_epoll.h index ee60dc56..0be2a4c5 100644 --- a/src/platform/posix/posix_pollq_epoll.h +++ b/src/platform/posix/posix_pollq_epoll.h @@ -21,12 +21,12 @@ struct nni_posix_pfd { int fd; nni_posix_pfd_cb cb; void *arg; - bool closed; - bool closing; bool reap; unsigned events; nni_mtx mtx; nni_cv cv; + nni_atomic_flag stopped; + nni_atomic_flag closed; }; #define NNI_POLL_IN ((unsigned) POLLIN) diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c index e3727ed3..c754535a 100644 --- a/src/platform/posix/posix_pollq_kqueue.c +++ b/src/platform/posix/posix_pollq_kqueue.c @@ -40,10 +40,9 @@ typedef struct nni_posix_pollq { // single global instance for now static nni_posix_pollq nni_posix_global_pollq; -int -nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) +void +nni_posix_pfd_init(nni_posix_pfd *pf, int fd, nni_posix_pfd_cb cb, void *arg) { - nni_posix_pfd *pf; nni_posix_pollq *pq; struct kevent ev[2]; unsigned flags = EV_ADD | EV_DISABLE | EV_CLEAR; @@ -61,88 +60,97 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) pq = &nni_posix_global_pollq; - if ((pf = NNI_ALLOC_STRUCT(pf)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&pf->mtx); nni_cv_init(&pf->cv, &pq->mtx); pf->pq = pq; pf->fd = fd; - pf->cb = NULL; - pf->arg = NULL; + pf->cb = cb; + pf->arg = arg; pf->events = 0; - pf->closed = false; + + nni_atomic_flag_reset(&pf->closed); + nni_atomic_flag_reset(&pf->stopped); NNI_LIST_NODE_INIT(&pf->node); - *pfdp = pf; // Create entries in the kevent queue, without enabling them. EV_SET(&ev[0], (uintptr_t) fd, EVFILT_READ, flags, 0, 0, pf); EV_SET(&ev[1], (uintptr_t) fd, EVFILT_WRITE, flags, 0, 0, pf); - // We update the kqueue list, without polling for events. - if (kevent(pq->kq, ev, 2, NULL, 0, NULL) != 0) { - int rv; - rv = nni_plat_errno(errno); - nni_cv_fini(&pf->cv); - nni_mtx_fini(&pf->mtx); - NNI_FREE_STRUCT(pf); - return (rv); - } - - return (0); + // This may fail, but if it does, we get another try with + // ARM. It's an attempt to preallocate anyway. + (void) kevent(pq->kq, ev, 2, NULL, 0, NULL); } void nni_posix_pfd_close(nni_posix_pfd *pf) { nni_posix_pollq *pq = pf->pq; + struct kevent ev[2]; + if (pq == NULL) { + return; + } - nni_mtx_lock(&pq->mtx); - if (!pf->closed) { - struct kevent ev[2]; - pf->closed = true; - EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf); - EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf); - (void) shutdown(pf->fd, SHUT_RDWR); - // This should never fail -- no allocations, just deletion. - (void) kevent(pq->kq, ev, 2, NULL, 0, NULL); + if (nni_atomic_flag_test_and_set(&pf->closed)) { + return; } + + nni_mtx_lock(&pq->mtx); + EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf); + EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf); + (void) shutdown(pf->fd, SHUT_RDWR); + // This should never fail -- no allocations, just deletion. + (void) kevent(pq->kq, ev, 2, NULL, 0, NULL); nni_mtx_unlock(&pq->mtx); } void -nni_posix_pfd_fini(nni_posix_pfd *pf) +nni_posix_pfd_stop(nni_posix_pfd *pf) { - nni_posix_pollq *pq; - - pq = pf->pq; + nni_posix_pollq *pq = pf->pq; - nni_posix_pfd_close(pf); + if (pq == NULL) { + return; + } // All consumers take care to move finalization to the reap thread, // unless they are synchronous on user threads. NNI_ASSERT(!nni_thr_is_self(&pq->thr)); - nni_mtx_lock(&pq->mtx); + if (nni_atomic_flag_test_and_set(&pf->stopped)) { + return; + } - // If we're running on the callback, then don't bother to kick - // the pollq again. This is necessary because we cannot modify - // the poller while it is polling. - if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) { + nni_posix_pfd_close(pf); + nni_mtx_lock(&pq->mtx); + if (!pq->closed) { nni_list_append(&pq->reapq, pf); nni_plat_pipe_raise(pq->wake_wfd); - while (nni_list_active(&pq->reapq, pf)) { + while (nni_list_node_active(&pf->node)) { nni_cv_wait(&pf->cv); } } nni_mtx_unlock(&pq->mtx); +} + +void +nni_posix_pfd_fini(nni_posix_pfd *pf) +{ + nni_posix_pollq *pq = pf->pq; + + if (pq == NULL) { + return; + } + + // All consumers take care to move finalization to the reap thread, + // unless they are synchronous on user threads. + NNI_ASSERT(!nni_thr_is_self(&pq->thr)); + + nni_posix_pfd_stop(pf); (void) close(pf->fd); nni_cv_fini(&pf->cv); nni_mtx_fini(&pf->mtx); - NNI_FREE_STRUCT(pf); } int @@ -151,16 +159,6 @@ nni_posix_pfd_fd(nni_posix_pfd *pf) return (pf->fd); } -void -nni_posix_pfd_set_cb(nni_posix_pfd *pf, nni_posix_pfd_cb cb, void *arg) -{ - NNI_ASSERT(cb != NULL); // must not be null when established. - nni_mtx_lock(&pf->mtx); - pf->cb = cb; - pf->arg = arg; - nni_mtx_unlock(&pf->mtx); -} - int nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events) { @@ -170,12 +168,8 @@ nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events) nni_posix_pollq *pq = pf->pq; nni_mtx_lock(&pf->mtx); - if (pf->closed) { - events = 0; - } else { - pf->events |= events; - events = pf->events; - } + pf->events |= events; + events = pf->events; nni_mtx_unlock(&pf->mtx); if (events == 0) { @@ -218,12 +212,10 @@ nni_posix_poll_thr(void *arg) nni_thr_set_name(NULL, "nng:poll:kqueue"); for (;;) { - int n; - struct kevent evs[NNI_MAX_KQUEUE_EVENTS]; - nni_posix_pfd *pf; - nni_posix_pfd_cb cb; - void *cbarg; - unsigned revents; + int n; + struct kevent evs[NNI_MAX_KQUEUE_EVENTS]; + nni_posix_pfd *pf; + unsigned revents; n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL); @@ -253,14 +245,10 @@ nni_posix_poll_thr(void *arg) } nni_mtx_lock(&pf->mtx); - cb = pf->cb; - cbarg = pf->arg; pf->events &= ~(revents); nni_mtx_unlock(&pf->mtx); - if (cb != NULL) { - cb(pf, revents, cbarg); - } + pf->cb(pf->arg, revents); } } } diff --git a/src/platform/posix/posix_pollq_kqueue.h b/src/platform/posix/posix_pollq_kqueue.h index 74f8d9f8..a153b363 100644 --- a/src/platform/posix/posix_pollq_kqueue.h +++ b/src/platform/posix/posix_pollq_kqueue.h @@ -19,7 +19,9 @@ struct nni_posix_pfd { int fd; // file descriptor to poll void *arg; // user data nni_posix_pfd_cb cb; // user callback on event - bool closed; + nni_atomic_flag closed; + nni_atomic_flag stopped; + bool reaped; unsigned events; nni_cv cv; // signaled when poller has unregistered nni_mtx mtx; diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index 7adef5ed..6011ab50 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -49,10 +49,9 @@ typedef struct nni_posix_pollq { static nni_posix_pollq nni_posix_global_pollq; -int -nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) +void +nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg) { - nni_posix_pfd *pfd; nni_posix_pollq *pq = &nni_posix_global_pollq; // Set this is as soon as possible (narrow the close-exec race as @@ -68,32 +67,18 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) (void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)); #endif - if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) { - return (NNG_ENOMEM); - } NNI_LIST_NODE_INIT(&pfd->node); nni_mtx_init(&pfd->mtx); nni_cv_init(&pfd->cv, &pq->mtx); pfd->fd = fd; pfd->events = 0; - pfd->cb = NULL; - pfd->arg = NULL; + pfd->cb = cb; + pfd->arg = arg; pfd->pq = pq; nni_mtx_lock(&pq->mtx); nni_list_append(&pq->addq, pfd); nni_mtx_unlock(&pq->mtx); nni_plat_pipe_raise(pq->wakewfd); - *pfdp = pfd; - return (0); -} - -void -nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg) -{ - nni_mtx_lock(&pfd->mtx); - pfd->cb = cb; - pfd->arg = arg; - nni_mtx_unlock(&pfd->mtx); } int @@ -109,16 +94,22 @@ nni_posix_pfd_close(nni_posix_pfd *pfd) } void -nni_posix_pfd_fini(nni_posix_pfd *pfd) +nni_posix_pfd_stop(nni_posix_pfd *pfd) { nni_posix_pollq *pq = pfd->pq; + if (pq == NULL) { + return; + } + nni_posix_pfd_close(pfd); nni_mtx_lock(&pq->mtx); - if (nni_list_active(&pq->pollq, pfd)) { - nni_list_remove(&pq->pollq, pfd); + if (!nni_list_active(&pq->pollq, pfd)) { + nni_mtx_unlock(&pq->mtx); + return; } + nni_list_remove(&pq->pollq, pfd); if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) { nni_list_append(&pq->reapq, pfd); @@ -128,12 +119,22 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd) } } nni_mtx_unlock(&pq->mtx); +} + +void +nni_posix_pfd_fini(nni_posix_pfd *pfd) +{ + nni_posix_pollq *pq = pfd->pq; + + if (pq == NULL) { + return; + } + nni_posix_pfd_stop(pfd); // We're exclusive now. (void) close(pfd->fd); nni_cv_fini(&pfd->cv); nni_mtx_fini(&pfd->mtx); - NNI_FREE_STRUCT(pfd); } int @@ -241,8 +242,6 @@ nni_posix_poll_thr(void *arg) return; } } else { - nni_posix_pfd_cb cb; - void *arg; if ((events & (POLLIN | POLLOUT)) != 0) { // don't emit pollhup yet, we want @@ -250,14 +249,10 @@ nni_posix_poll_thr(void *arg) events &= ~POLLHUP; } nni_mtx_lock(&pfd->mtx); - cb = pfd->cb; - arg = pfd->arg; pfd->events &= ~events; nni_mtx_unlock(&pfd->mtx); - if (cb) { - cb(pfd, events, arg); - } + pfd->cb(pfd->arg, events); } } } diff --git a/src/platform/posix/posix_pollq_port.c b/src/platform/posix/posix_pollq_port.c index 86732e17..49658f3b 100644 --- a/src/platform/posix/posix_pollq_port.c +++ b/src/platform/posix/posix_pollq_port.c @@ -1,5 +1,5 @@ // -// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -35,17 +35,13 @@ typedef struct nni_posix_pollq { // single global instance for now static nni_posix_pollq nni_posix_global_pollq; -int -nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) +void +nni_posix_pfd_init(nni_posix_pfd *pfdp, int fd, nni_posix_pfd_cb cb, void *arg) { nni_posix_pollq *pq; - nni_posix_pfd *pfd; pq = &nni_posix_global_pollq; - if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) { - return (NNG_ENOMEM); - } (void) fcntl(fd, F_SETFD, FD_CLOEXEC); (void) fcntl(fd, F_SETFL, O_NONBLOCK); @@ -55,10 +51,9 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) pfd->closing = false; pfd->fd = fd; pfd->pq = pq; - pfd->cb = NULL; + pfd->cb = cb; + pfd->arg = arg; pfd->data = NULL; - *pfdp = pfd; - return (0); } int @@ -72,6 +67,10 @@ nni_posix_pfd_close(nni_posix_pfd *pfd) { nni_posix_pollq *pq = pfd->pq; + if (pq == NULL) { + return; + } + nni_mtx_lock(&pfd->mtx); if (!pfd->closing) { pfd->closing = true; @@ -86,11 +85,13 @@ nni_posix_pfd_close(nni_posix_pfd *pfd) } void -nni_posix_pfd_fini(nni_posix_pfd *pfd) +nni_posix_pfd_stop(nni_posix_pfd *pfd) { nni_posix_pollq *pq = pfd->pq; - nni_posix_pfd_close(pfd); + if (pq == NULL) { + return; + } NNI_ASSERT(!nni_thr_is_self(&pq->thr)); @@ -101,18 +102,28 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd) } sched_yield(); // try again later... } - nni_mtx_lock(&pfd->mtx); while (!pfd->closed) { nni_cv_wait(&pfd->cv); } nni_mtx_unlock(&pfd->mtx); +} + +void +nni_posix_pfd_fini(nni_posix_pfd *pfd) +{ + nni_posix_pollq *pq = pfd->pq; + + if (pq == NULL) { + return; + } + + nni_posix_pfd_stop(pfd); // We're exclusive now. (void) close(pfd->fd); nni_cv_fini(&pfd->cv); nni_mtx_fini(&pfd->mtx); - NNI_FREE_STRUCT(pfd); } int @@ -221,17 +232,6 @@ nni_posix_pollq_create(nni_posix_pollq *pq) return (0); } -void -nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg) -{ - NNI_ASSERT(cb != NULL); // must not be null when established. - - nni_mtx_lock(&pfd->mtx); - pfd->cb = cb; - pfd->data = arg; - nni_mtx_unlock(&pfd->mtx); -} - int nni_posix_pollq_sysinit(nng_init_params *params) { diff --git a/src/platform/posix/posix_pollq_select.c b/src/platform/posix/posix_pollq_select.c index 211c9328..0e24fa16 100644 --- a/src/platform/posix/posix_pollq_select.c +++ b/src/platform/posix/posix_pollq_select.c @@ -44,10 +44,9 @@ typedef struct nni_posix_pollq { static nni_posix_pollq nni_posix_global_pollq; -int -nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) +void +nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg) { - nni_posix_pfd *pfd; nni_posix_pollq *pq = &nni_posix_global_pollq; // Set this is as soon as possible (narrow the close-exec race as @@ -56,43 +55,24 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) (void) fcntl(fd, F_SETFD, FD_CLOEXEC); (void) fcntl(fd, F_SETFL, O_NONBLOCK); - if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) { - return (NNG_ENOMEM); - } if (fd >= FD_SETSIZE) { - return (NNG_EINVAL); + return; } nni_mtx_init(&pfd->mtx); nni_cv_init(&pfd->cv, &pq->mtx); - pfd->fd = fd; - pfd->events = 0; - pfd->cb = NULL; - pfd->arg = NULL; - pfd->pq = pq; + pfd->fd = fd; + pfd->events = 0; + pfd->cb = cb; + pfd->arg = arg; + pfd->pq = pq; + pfd->stopped = false; + pfd->reap = false; nni_mtx_lock(&pq->mtx); - if (pq->closing) { - nni_mtx_unlock(&pq->mtx); - nni_cv_fini(&pfd->cv); - nni_mtx_fini(&pfd->mtx); - NNI_FREE_STRUCT(pfd); - return (NNG_ECLOSED); - } pq->pfds[fd] = pfd; if (fd > pq->maxfd) { pq->maxfd = fd; } nni_mtx_unlock(&pq->mtx); - *pfdp = pfd; - return (0); -} - -void -nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg) -{ - nni_mtx_lock(&pfd->mtx); - pfd->cb = cb; - pfd->arg = arg; - nni_mtx_unlock(&pfd->mtx); } int @@ -104,34 +84,50 @@ nni_posix_pfd_fd(nni_posix_pfd *pfd) void nni_posix_pfd_close(nni_posix_pfd *pfd) { - (void) shutdown(pfd->fd, SHUT_RDWR); + if (pfd->pq != NULL) { + (void) shutdown(pfd->fd, SHUT_RDWR); + } } void -nni_posix_pfd_fini(nni_posix_pfd *pfd) +nni_posix_pfd_stop(nni_posix_pfd *pfd) { nni_posix_pollq *pq = pfd->pq; - int fd = pfd->fd; - + if (pq == NULL) { + return; + } nni_posix_pfd_close(pfd); + NNI_ASSERT(!nni_thr_is_self(&pq->thr)); nni_mtx_lock(&pq->mtx); - if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) { - pfd->reap = true; + if (!pfd->stopped) { + pfd->stopped = true; + pfd->reap = true; nni_plat_pipe_raise(pq->wakewfd); while (pfd->reap) { nni_cv_wait(&pfd->cv); } - } else { - pq->pfds[fd] = NULL; } nni_mtx_unlock(&pq->mtx); +} + +void +nni_posix_pfd_fini(nni_posix_pfd *pfd) +{ + nni_posix_pollq *pq = pfd->pq; + int fd = pfd->fd; + + if (pq == NULL) { + return; + } + + nni_posix_pfd_stop(pfd); + NNI_ASSERT(!nni_thr_is_self(&pq->thr)); // We're exclusive now. (void) close(fd); nni_cv_fini(&pfd->cv); nni_mtx_fini(&pfd->mtx); - NNI_FREE_STRUCT(pfd); } int @@ -247,17 +243,11 @@ nni_posix_poll_thr(void *arg) events |= NNI_POLL_HUP; } if (events != 0) { - nni_posix_pfd_cb cb = NULL; - void *arg; if ((pfd = pq->pfds[fd]) != NULL) { - cb = pfd->cb; - arg = pfd->arg; pfd->events &= ~events; - } - if (cb) { nni_mtx_unlock(&pq->mtx); - cb(pfd, events, arg); + pfd->cb(pfd->arg, events); nni_mtx_lock(&pq->mtx); } } diff --git a/src/platform/posix/posix_pollq_select.h b/src/platform/posix/posix_pollq_select.h index 3a1d48e3..012f10a0 100644 --- a/src/platform/posix/posix_pollq_select.h +++ b/src/platform/posix/posix_pollq_select.h @@ -22,6 +22,7 @@ struct nni_posix_pfd { nni_posix_pfd_cb cb; void *arg; bool reap; + bool stopped; }; #define NNI_POLL_IN (0x0001) diff --git a/src/platform/posix/posix_sockfd.c b/src/platform/posix/posix_sockfd.c index 8ccca66c..7ed95681 100644 --- a/src/platform/posix/posix_sockfd.c +++ b/src/platform/posix/posix_sockfd.c @@ -1,5 +1,5 @@ // -// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Devolutions <info@devolutions.net> // @@ -22,23 +22,23 @@ #include "platform/posix/posix_peerid.h" struct nni_sfd_conn { - nng_stream stream; - nni_posix_pfd *pfd; - int fd; - nni_list readq; - nni_list writeq; - bool closed; - nni_mtx mtx; - nni_reap_node reap; + nng_stream stream; + nni_posix_pfd pfd; + int fd; + nni_list readq; + nni_list writeq; + bool closed; + nni_mtx mtx; + nni_reap_node reap; }; static void sfd_dowrite(nni_sfd_conn *c) { nni_aio *aio; - int fd; + int fd = c->fd; - if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + if (c->closed) { return; } @@ -101,9 +101,9 @@ static void sfd_doread(nni_sfd_conn *c) { nni_aio *aio; - int fd; + int fd = c->fd; - if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + if (c->closed) { return; } @@ -174,9 +174,7 @@ sfd_error(void *arg, int err) nni_aio_list_remove(aio); nni_aio_finish_error(aio, err); } - if (c->pfd != NULL) { - nni_posix_pfd_close(c->pfd); - } + nni_posix_pfd_close(&c->pfd); nni_mtx_unlock(&c->mtx); } @@ -193,9 +191,7 @@ sfd_close(void *arg) nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECLOSED); } - if (c->pfd != NULL) { - nni_posix_pfd_close(c->pfd); - } + nni_posix_pfd_close(&c->pfd); } nni_mtx_unlock(&c->mtx); } @@ -206,11 +202,7 @@ sfd_stop(void *arg) nni_sfd_conn *c = arg; sfd_close(c); - // ideally this would *stop* without freeing the pfd - if (c->pfd != NULL) { - nni_posix_pfd_fini(c->pfd); - c->pfd = NULL; - } + nni_posix_pfd_stop(&c->pfd); } // sfd_fini may block briefly waiting for the pollq thread. @@ -220,6 +212,7 @@ sfd_fini(void *arg) { nni_sfd_conn *c = arg; sfd_stop(c); + nni_posix_pfd_fini(&c->pfd); nni_mtx_fini(&c->mtx); NNI_FREE_STRUCT(c); @@ -237,7 +230,7 @@ sfd_free(void *arg) } static void -sfd_cb(nni_posix_pfd *pfd, unsigned events, void *arg) +sfd_cb(void *arg, unsigned events) { struct nni_sfd_conn *c = arg; @@ -260,7 +253,7 @@ sfd_cb(nni_posix_pfd *pfd, unsigned events, void *arg) events |= NNI_POLL_IN; } if ((!c->closed) && (events != 0)) { - nni_posix_pfd_arm(pfd, events); + nni_posix_pfd_arm(&c->pfd, events); } nni_mtx_unlock(&c->mtx); } @@ -302,7 +295,7 @@ sfd_send(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->writeq) == aio) { - nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT); + nni_posix_pfd_arm(&c->pfd, NNI_POLL_OUT); } } nni_mtx_unlock(&c->mtx); @@ -336,7 +329,7 @@ sfd_recv(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->readq) == aio) { - nni_posix_pfd_arm(c->pfd, NNI_POLL_IN); + nni_posix_pfd_arm(&c->pfd, NNI_POLL_IN); } } nni_mtx_unlock(&c->mtx); @@ -467,14 +460,10 @@ int nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd) { nni_sfd_conn *c; - int rv; if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_posix_pfd_init(&c->pfd, fd)) != 0) { - NNI_FREE_STRUCT(c); - return (rv); - } + nni_posix_pfd_init(&c->pfd, fd, sfd_cb, c); c->closed = false; c->fd = fd; @@ -491,8 +480,6 @@ nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd) c->stream.s_get = sfd_get; c->stream.s_set = sfd_set; - nni_posix_pfd_set_cb(c->pfd, sfd_cb, c); - *cp = c; return (0); } diff --git a/src/platform/posix/posix_tcp.h b/src/platform/posix/posix_tcp.h index 20504bf5..0eec62fe 100644 --- a/src/platform/posix/posix_tcp.h +++ b/src/platform/posix/posix_tcp.h @@ -1,5 +1,5 @@ // -// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2018 Devolutions <info@devolutions.net> // @@ -18,7 +18,7 @@ struct nni_tcp_conn { nng_stream stream; - nni_posix_pfd *pfd; + nni_posix_pfd pfd; nni_list readq; nni_list writeq; bool closed; @@ -40,9 +40,9 @@ struct nni_tcp_dialer { nni_atomic_bool fini; }; -extern int nni_posix_tcp_alloc(nni_tcp_conn **, nni_tcp_dialer *); -extern void nni_posix_tcp_init(nni_tcp_conn *, nni_posix_pfd *); +extern int nni_posix_tcp_alloc(nni_tcp_conn **, nni_tcp_dialer *, int); extern void nni_posix_tcp_start(nni_tcp_conn *, int, int); extern void nni_posix_tcp_dialer_rele(nni_tcp_dialer *); +extern void nni_posix_tcp_dial_cb(void *, unsigned); #endif // PLATFORM_POSIX_TCP_H diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c index d49fc838..b1a2233c 100644 --- a/src/platform/posix/posix_tcpconn.c +++ b/src/platform/posix/posix_tcpconn.c @@ -32,9 +32,9 @@ static void tcp_dowrite(nni_tcp_conn *c) { nni_aio *aio; - int fd; + int fd = nni_posix_pfd_fd(&c->pfd); - if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + if (c->closed) { return; } @@ -101,9 +101,9 @@ static void tcp_doread(nni_tcp_conn *c) { nni_aio *aio; - int fd; + int fd = nni_posix_pfd_fd(&c->pfd); - if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + if (c->closed) { return; } @@ -174,9 +174,7 @@ tcp_error(void *arg, int err) nni_aio_list_remove(aio); nni_aio_finish_error(aio, err); } - if (c->pfd != NULL) { - nni_posix_pfd_close(c->pfd); - } + nni_posix_pfd_close(&c->pfd); nni_mtx_unlock(&c->mtx); } @@ -193,9 +191,7 @@ tcp_close(void *arg) nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECLOSED); } - if (c->pfd != NULL) { - nni_posix_pfd_close(c->pfd); - } + nni_posix_pfd_close(&c->pfd); } nni_mtx_unlock(&c->mtx); } @@ -203,18 +199,10 @@ tcp_close(void *arg) static void tcp_stop(void *arg) { - nni_tcp_conn *c = arg; - nni_posix_pfd *pfd; + nni_tcp_conn *c = arg; tcp_close(c); - nni_mtx_lock(&c->mtx); - pfd = c->pfd; - c->pfd = NULL; - nni_mtx_unlock(&c->mtx); - - if (pfd != NULL) { - nni_posix_pfd_fini(pfd); - } + nni_posix_pfd_stop(&c->pfd); } // tcp_fini may block briefly waiting for the pollq thread. @@ -224,6 +212,7 @@ tcp_fini(void *arg) { nni_tcp_conn *c = arg; tcp_stop(c); + nni_posix_pfd_fini(&c->pfd); nni_mtx_fini(&c->mtx); if (c->dialer != NULL) { @@ -245,7 +234,7 @@ tcp_free(void *arg) } static void -tcp_cb(nni_posix_pfd *pfd, unsigned events, void *arg) +tcp_cb(void *arg, unsigned events) { nni_tcp_conn *c = arg; @@ -253,6 +242,10 @@ tcp_cb(nni_posix_pfd *pfd, unsigned events, void *arg) tcp_error(c, NNG_ECONNSHUT); return; } + if (c->dial_aio != NULL) { + nni_posix_tcp_dial_cb(c, events); + return; + } nni_mtx_lock(&c->mtx); if ((events & NNI_POLL_IN) != 0) { tcp_doread(c); @@ -268,7 +261,7 @@ tcp_cb(nni_posix_pfd *pfd, unsigned events, void *arg) events |= NNI_POLL_IN; } if ((!c->closed) && (events != 0)) { - nni_posix_pfd_arm(pfd, events); + nni_posix_pfd_arm(&c->pfd, events); } nni_mtx_unlock(&c->mtx); } @@ -310,7 +303,7 @@ tcp_send(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->writeq) == aio) { - nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT); + nni_posix_pfd_arm(&c->pfd, NNI_POLL_OUT); } } nni_mtx_unlock(&c->mtx); @@ -344,7 +337,7 @@ tcp_recv(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->readq) == aio) { - nni_posix_pfd_arm(c->pfd, NNI_POLL_IN); + nni_posix_pfd_arm(&c->pfd, NNI_POLL_IN); } } nni_mtx_unlock(&c->mtx); @@ -356,7 +349,7 @@ tcp_get_peername(void *arg, void *buf, size_t *szp, nni_type t) nni_tcp_conn *c = arg; struct sockaddr_storage ss; socklen_t len = sizeof(ss); - int fd = nni_posix_pfd_fd(c->pfd); + int fd = nni_posix_pfd_fd(&c->pfd); int rv; nng_sockaddr sa; @@ -375,7 +368,7 @@ tcp_get_sockname(void *arg, void *buf, size_t *szp, nni_type t) nni_tcp_conn *c = arg; struct sockaddr_storage ss; socklen_t len = sizeof(ss); - int fd = nni_posix_pfd_fd(c->pfd); + int fd = nni_posix_pfd_fd(&c->pfd); int rv; nng_sockaddr sa; @@ -392,7 +385,7 @@ static int tcp_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t) { nni_tcp_conn *c = arg; - int fd = nni_posix_pfd_fd(c->pfd); + int fd = nni_posix_pfd_fd(&c->pfd); int val = 0; socklen_t valsz = sizeof(val); @@ -407,7 +400,7 @@ static int tcp_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t) { nni_tcp_conn *c = arg; - int fd = nni_posix_pfd_fd(c->pfd); + int fd = nni_posix_pfd_fd(&c->pfd); int val = 0; socklen_t valsz = sizeof(val); @@ -455,7 +448,7 @@ tcp_set(void *arg, const char *name, const void *buf, size_t sz, nni_type t) } int -nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d) +nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d, int fd) { nni_tcp_conn *c; if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { @@ -468,6 +461,7 @@ nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d) nni_mtx_init(&c->mtx); nni_aio_list_init(&c->readq); nni_aio_list_init(&c->writeq); + nni_posix_pfd_init(&c->pfd, fd, tcp_cb, c); c->stream.s_free = tcp_free; c->stream.s_stop = tcp_stop; @@ -482,19 +476,11 @@ nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d) } void -nni_posix_tcp_init(nni_tcp_conn *c, nni_posix_pfd *pfd) -{ - c->pfd = pfd; -} - -void nni_posix_tcp_start(nni_tcp_conn *c, int nodelay, int keepalive) { // Configure the initial socket options. - (void) setsockopt(nni_posix_pfd_fd(c->pfd), IPPROTO_TCP, TCP_NODELAY, + (void) setsockopt(nni_posix_pfd_fd(&c->pfd), IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(int)); - (void) setsockopt(nni_posix_pfd_fd(c->pfd), SOL_SOCKET, SO_KEEPALIVE, + (void) setsockopt(nni_posix_pfd_fd(&c->pfd), SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(int)); - - nni_posix_pfd_set_cb(c->pfd, tcp_cb, c); } diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c index 52ea6cff..442cd824 100644 --- a/src/platform/posix/posix_tcpdial.c +++ b/src/platform/posix/posix_tcpdial.c @@ -123,8 +123,8 @@ tcp_dialer_cancel(nni_aio *aio, void *arg, int rv) nng_stream_free(&c->stream); } -static void -tcp_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg) +void +nni_posix_tcp_dial_cb(void *arg, unsigned ev) { nni_tcp_conn *c = arg; nni_tcp_dialer *d = c->dialer; @@ -145,7 +145,7 @@ tcp_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg) } else { socklen_t sz = sizeof(int); - int fd = nni_posix_pfd_fd(pfd); + int fd = nni_posix_pfd_fd(&c->pfd); if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) { rv = errno; } @@ -185,7 +185,6 @@ void nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) { nni_tcp_conn *c; - nni_posix_pfd *pfd = NULL; struct sockaddr_storage ss; size_t sslen; int fd; @@ -210,24 +209,12 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) nni_atomic_inc(&d->ref); - if ((rv = nni_posix_tcp_alloc(&c, d)) != 0) { + if ((rv = nni_posix_tcp_alloc(&c, d, fd)) != 0) { nni_aio_finish_error(aio, rv); nni_posix_tcp_dialer_rele(d); return; } - // This arranges for the fd to be in non-blocking mode, and adds the - // poll fd to the list. - if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { - (void) close(fd); - // the error label unlocks this - nni_mtx_lock(&d->mtx); - goto error; - } - - nni_posix_tcp_init(c, pfd); - nni_posix_pfd_set_cb(pfd, tcp_dialer_cb, c); - nni_mtx_lock(&d->mtx); if (d->closed) { rv = NNG_ECLOSED; @@ -248,7 +235,7 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) goto error; } // Asynchronous connect. - if ((rv = nni_posix_pfd_arm(pfd, NNI_POLL_OUT)) != 0) { + if ((rv = nni_posix_pfd_arm(&c->pfd, NNI_POLL_OUT)) != 0) { goto error; } c->dial_aio = aio; diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c index 32f0fd60..da2e8050 100644 --- a/src/platform/posix/posix_tcplisten.c +++ b/src/platform/posix/posix_tcplisten.c @@ -34,13 +34,13 @@ #include "posix_tcp.h" struct nni_tcp_listener { - nni_posix_pfd *pfd; - nni_list acceptq; - bool started; - bool closed; - bool nodelay; - bool keepalive; - nni_mtx mtx; + nni_posix_pfd pfd; + nni_list acceptq; + bool started; + bool closed; + bool nodelay; + bool keepalive; + nni_mtx mtx; }; int @@ -53,7 +53,6 @@ nni_tcp_listener_init(nni_tcp_listener **lp) nni_mtx_init(&l->mtx); - l->pfd = NULL; l->closed = false; l->started = false; l->nodelay = true; @@ -74,9 +73,7 @@ tcp_listener_doclose(nni_tcp_listener *l) nni_aio_finish_error(aio, NNG_ECLOSED); } - if (l->pfd != NULL) { - nni_posix_pfd_close(l->pfd); - } + nni_posix_pfd_close(&l->pfd); } void @@ -93,15 +90,14 @@ tcp_listener_doaccept(nni_tcp_listener *l) nni_aio *aio; while ((aio = nni_list_first(&l->acceptq)) != NULL) { - int newfd; - int fd; - int rv; - int nd; - int ka; - nni_posix_pfd *pfd; - nni_tcp_conn *c; + int newfd; + int fd; + int rv; + int nd; + int ka; + nni_tcp_conn *c; - fd = nni_posix_pfd_fd(l->pfd); + fd = nni_posix_pfd_fd(&l->pfd); #ifdef NNG_USE_ACCEPT4 newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC); @@ -119,7 +115,7 @@ tcp_listener_doaccept(nni_tcp_listener *l) case EWOULDBLOCK: #endif #endif - rv = nni_posix_pfd_arm(l->pfd, NNI_POLL_IN); + rv = nni_posix_pfd_arm(&l->pfd, NNI_POLL_IN); if (rv != 0) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); @@ -141,24 +137,13 @@ tcp_listener_doaccept(nni_tcp_listener *l) } } - if ((rv = nni_posix_tcp_alloc(&c, NULL)) != 0) { + if ((rv = nni_posix_tcp_alloc(&c, NULL, newfd)) != 0) { close(newfd); nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); continue; } - if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) { - close(newfd); - nng_stream_stop(&c->stream); - nng_stream_free(&c->stream); - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, rv); - continue; - } - - nni_posix_tcp_init(c, pfd); - ka = l->keepalive ? 1 : 0; nd = l->nodelay ? 1 : 0; nni_aio_list_remove(aio); @@ -169,10 +154,9 @@ tcp_listener_doaccept(nni_tcp_listener *l) } static void -tcp_listener_cb(nni_posix_pfd *pfd, unsigned events, void *arg) +tcp_listener_cb(void *arg, unsigned events) { nni_tcp_listener *l = arg; - NNI_ARG_UNUSED(pfd); nni_mtx_lock(&l->mtx); if ((events & NNI_POLL_INVAL) != 0) { @@ -209,7 +193,6 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa) struct sockaddr_storage ss; int rv; int fd; - nni_posix_pfd *pfd; if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) || #ifdef NNG_ENABLE_IPV6 @@ -236,12 +219,6 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa) return (nni_plat_errno(errno)); } - if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) { - nni_mtx_unlock(&l->mtx); - (void) close(fd); - return (rv); - } - // On the Windows Subsystem for Linux, SO_REUSEADDR behaves like Windows // SO_REUSEADDR, which is almost completely different (and wrong!) from // traditional SO_REUSEADDR. @@ -257,8 +234,8 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa) if (bind(fd, (struct sockaddr *) &ss, len) < 0) { rv = nni_plat_errno(errno); + (void) close(fd); nni_mtx_unlock(&l->mtx); - nni_posix_pfd_fini(pfd); return (rv); } @@ -266,14 +243,13 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa) // bad things are going to happen. if (listen(fd, 128) != 0) { rv = nni_plat_errno(errno); + (void) close(fd); nni_mtx_unlock(&l->mtx); - nni_posix_pfd_fini(pfd); return (rv); } - nni_posix_pfd_set_cb(pfd, tcp_listener_cb, l); + nni_posix_pfd_init(&l->pfd, fd, tcp_listener_cb, l); - l->pfd = pfd; l->started = true; nni_mtx_unlock(&l->mtx); @@ -283,23 +259,18 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa) void nni_tcp_listener_stop(nni_tcp_listener *l) { - nni_posix_pfd *pfd; - nni_mtx_lock(&l->mtx); tcp_listener_doclose(l); - pfd = l->pfd; - l->pfd = NULL; nni_mtx_unlock(&l->mtx); - if (pfd != NULL) { - nni_posix_pfd_fini(pfd); - } + nni_posix_pfd_stop(&l->pfd); } void nni_tcp_listener_fini(nni_tcp_listener *l) { nni_tcp_listener_stop(l); + nni_posix_pfd_fini(&l->pfd); nni_mtx_fini(&l->mtx); NNI_FREE_STRUCT(l); } @@ -350,7 +321,7 @@ tcp_listener_get_locaddr(void *arg, void *buf, size_t *szp, nni_type t) struct sockaddr_storage ss; socklen_t len = sizeof(ss); (void) getsockname( - nni_posix_pfd_fd(l->pfd), (void *) &ss, &len); + nni_posix_pfd_fd(&l->pfd), (void *) &ss, &len); (void) nni_posix_sockaddr2nn(&sa, &ss, len); } else { sa.s_family = NNG_AF_UNSPEC; diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index 8d0d4a42..69535aa4 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -48,11 +48,11 @@ #endif struct nni_plat_udp { - nni_posix_pfd *udp_pfd; - int udp_fd; - nni_list udp_recvq; - nni_list udp_sendq; - nni_mtx udp_mtx; + nni_posix_pfd udp_pfd; + int udp_fd; + nni_list udp_recvq; + nni_list udp_sendq; + nni_mtx udp_mtx; }; static void @@ -175,10 +175,9 @@ nni_posix_udp_dosend(nni_plat_udp *udp) // This function is called by the poller on activity on the FD. static void -nni_posix_udp_cb(nni_posix_pfd *pfd, unsigned events, void *arg) +nni_posix_udp_cb(void *arg, unsigned events) { nni_plat_udp *udp = arg; - NNI_ARG_UNUSED(pfd); nni_mtx_lock(&udp->udp_mtx); if (events & NNI_POLL_IN) { @@ -199,7 +198,7 @@ nni_posix_udp_cb(nni_posix_pfd *pfd, unsigned events, void *arg) } if (events) { int rv; - rv = nni_posix_pfd_arm(udp->udp_pfd, events); + rv = nni_posix_pfd_arm(&udp->udp_pfd, events); if (rv != 0) { nni_posix_udp_doerror(udp, rv); } @@ -225,6 +224,8 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) return (NNG_ENOMEM); } nni_mtx_init(&udp->udp_mtx); + nni_aio_list_init(&udp->udp_recvq); + nni_aio_list_init(&udp->udp_sendq); udp->udp_fd = socket(sa.ss_family, SOCK_DGRAM, IPPROTO_UDP); if (udp->udp_fd < 0) { @@ -241,16 +242,8 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) NNI_FREE_STRUCT(udp); return (rv); } - if ((rv = nni_posix_pfd_init(&udp->udp_pfd, udp->udp_fd)) != 0) { - (void) close(udp->udp_fd); - nni_mtx_fini(&udp->udp_mtx); - NNI_FREE_STRUCT(udp); - return (rv); - } - nni_posix_pfd_set_cb(udp->udp_pfd, nni_posix_udp_cb, udp); - nni_aio_list_init(&udp->udp_recvq); - nni_aio_list_init(&udp->udp_sendq); + nni_posix_pfd_init(&udp->udp_pfd, udp->udp_fd, nni_posix_udp_cb, udp); *upp = udp; return (0); @@ -259,12 +252,13 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) void nni_plat_udp_close(nni_plat_udp *udp) { - nni_posix_pfd_fini(udp->udp_pfd); + nni_posix_pfd_stop(&udp->udp_pfd); nni_mtx_lock(&udp->udp_mtx); nni_posix_udp_doclose(udp); nni_mtx_unlock(&udp->udp_mtx); + nni_posix_pfd_fini(&udp->udp_pfd); (void) close(udp->udp_fd); nni_mtx_fini(&udp->udp_mtx); NNI_FREE_STRUCT(udp); @@ -298,7 +292,8 @@ nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio) } nni_list_append(&udp->udp_recvq, aio); if (nni_list_first(&udp->udp_recvq) == aio) { - if ((rv = nni_posix_pfd_arm(udp->udp_pfd, NNI_POLL_IN)) != 0) { + if ((rv = nni_posix_pfd_arm(&udp->udp_pfd, NNI_POLL_IN)) != + 0) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } @@ -321,7 +316,7 @@ nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio) } nni_list_append(&udp->udp_sendq, aio); if (nni_list_first(&udp->udp_sendq) == aio) { - if ((rv = nni_posix_pfd_arm(udp->udp_pfd, NNI_POLL_OUT)) != + if ((rv = nni_posix_pfd_arm(&udp->udp_pfd, NNI_POLL_OUT)) != 0) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); |
