aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-19 23:50:13 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-20 08:12:42 -0800
commit20e625137a38232c87871661c684953ef2cfc5f8 (patch)
treeaf75f9c134ea89160a1ccbe80d17b0f3582c8239 /src
parent60f63557d87528497fe1392fa6a676b2a51efb16 (diff)
downloadnng-20e625137a38232c87871661c684953ef2cfc5f8.tar.gz
nng-20e625137a38232c87871661c684953ef2cfc5f8.tar.bz2
nng-20e625137a38232c87871661c684953ef2cfc5f8.zip
posix pollers: inline the pfd and make callbacks constant
This change moves the posix pollers to inline the PFD and makes the callbacks constant, so that we can dispense with tests, failures, and locks. It is anticipated that this will reduce lock based pressure on the bus and increase performance modestly.
Diffstat (limited to 'src')
-rw-r--r--src/platform/posix/posix_ipc.h9
-rw-r--r--src/platform/posix/posix_ipcconn.c63
-rw-r--r--src/platform/posix/posix_ipcdial.c23
-rw-r--r--src/platform/posix/posix_ipclisten.c56
-rw-r--r--src/platform/posix/posix_pollq.h8
-rw-r--r--src/platform/posix/posix_pollq_epoll.c146
-rw-r--r--src/platform/posix/posix_pollq_epoll.h4
-rw-r--r--src/platform/posix/posix_pollq_kqueue.c128
-rw-r--r--src/platform/posix/posix_pollq_kqueue.h4
-rw-r--r--src/platform/posix/posix_pollq_poll.c55
-rw-r--r--src/platform/posix/posix_pollq_port.c50
-rw-r--r--src/platform/posix/posix_pollq_select.c82
-rw-r--r--src/platform/posix/posix_pollq_select.h1
-rw-r--r--src/platform/posix/posix_sockfd.c57
-rw-r--r--src/platform/posix/posix_tcp.h8
-rw-r--r--src/platform/posix/posix_tcpconn.c64
-rw-r--r--src/platform/posix/posix_tcpdial.c23
-rw-r--r--src/platform/posix/posix_tcplisten.c77
-rw-r--r--src/platform/posix/posix_udp.c35
19 files changed, 366 insertions, 527 deletions
diff --git a/src/platform/posix/posix_ipc.h b/src/platform/posix/posix_ipc.h
index 98af298b..3f3549c6 100644
--- a/src/platform/posix/posix_ipc.h
+++ b/src/platform/posix/posix_ipc.h
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
@@ -22,7 +22,7 @@
struct nni_ipc_conn {
nng_stream stream;
- nni_posix_pfd *pfd;
+ nni_posix_pfd pfd;
nni_list readq;
nni_list writeq;
bool closed;
@@ -44,10 +44,9 @@ struct nni_ipc_dialer {
};
extern int nni_posix_ipc_alloc(
- nni_ipc_conn **, nni_sockaddr *, nni_ipc_dialer *);
-extern void nni_posix_ipc_init(nni_ipc_conn *, nni_posix_pfd *);
-extern void nni_posix_ipc_start(nni_ipc_conn *);
+ nni_ipc_conn **, nni_sockaddr *, nni_ipc_dialer *, int);
extern void nni_posix_ipc_dialer_rele(nni_ipc_dialer *);
+extern void nni_posix_ipc_dialer_cb(void *arg, unsigned events);
#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c
index a198b87f..69ce2d32 100644
--- a/src/platform/posix/posix_ipcconn.c
+++ b/src/platform/posix/posix_ipcconn.c
@@ -1,5 +1,5 @@
//
-// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
@@ -34,7 +34,7 @@ ipc_dowrite(ipc_conn *c)
nni_aio *aio;
int fd;
- if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ if (c->closed || ((fd = nni_posix_pfd_fd(&c->pfd)) < 0)) {
return;
}
@@ -103,7 +103,7 @@ ipc_doread(ipc_conn *c)
nni_aio *aio;
int fd;
- if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ if (c->closed || ((fd = nni_posix_pfd_fd(&c->pfd)) < 0)) {
return;
}
@@ -174,7 +174,7 @@ ipc_error(void *arg, int err)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, err);
}
- nni_posix_pfd_close(c->pfd);
+ nni_posix_pfd_close(&c->pfd);
nni_mtx_unlock(&c->mtx);
}
@@ -191,18 +191,20 @@ ipc_close(void *arg)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
- if (c->pfd != NULL) {
- nni_posix_pfd_close(c->pfd);
- }
+ nni_posix_pfd_close(&c->pfd);
}
nni_mtx_unlock(&c->mtx);
}
static void
-ipc_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
+ipc_cb(void *arg, unsigned events)
{
ipc_conn *c = arg;
+ if (c->dial_aio != NULL) {
+ nni_posix_ipc_dialer_cb(arg, events);
+ return;
+ }
if (events & (NNI_POLL_HUP | NNI_POLL_ERR | NNI_POLL_INVAL)) {
ipc_error(c, NNG_ECONNSHUT);
return;
@@ -222,7 +224,7 @@ ipc_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
events |= NNI_POLL_IN;
}
if ((!c->closed) && (events != 0)) {
- nni_posix_pfd_arm(pfd, events);
+ nni_posix_pfd_arm(&c->pfd, events);
}
nni_mtx_unlock(&c->mtx);
}
@@ -264,7 +266,7 @@ ipc_send(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->writeq) == aio) {
- nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT);
+ nni_posix_pfd_arm(&c->pfd, NNI_POLL_OUT);
}
}
nni_mtx_unlock(&c->mtx);
@@ -298,7 +300,7 @@ ipc_recv(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->readq) == aio) {
- nni_posix_pfd_arm(c->pfd, NNI_POLL_IN);
+ nni_posix_pfd_arm(&c->pfd, NNI_POLL_IN);
}
}
nni_mtx_unlock(&c->mtx);
@@ -312,7 +314,7 @@ ipc_get_peer_uid(void *arg, void *buf, size_t *szp, nni_type t)
uint64_t ignore;
uint64_t id = 0;
- if ((rv = nni_posix_peerid(nni_posix_pfd_fd(c->pfd), &id, &ignore,
+ if ((rv = nni_posix_peerid(nni_posix_pfd_fd(&c->pfd), &id, &ignore,
&ignore, &ignore)) != 0) {
return (rv);
}
@@ -327,7 +329,7 @@ ipc_get_peer_gid(void *arg, void *buf, size_t *szp, nni_type t)
uint64_t ignore;
uint64_t id = 0;
- if ((rv = nni_posix_peerid(nni_posix_pfd_fd(c->pfd), &ignore, &id,
+ if ((rv = nni_posix_peerid(nni_posix_pfd_fd(&c->pfd), &ignore, &id,
&ignore, &ignore)) != 0) {
return (rv);
}
@@ -342,7 +344,7 @@ ipc_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_type t)
uint64_t ignore;
uint64_t id = 0;
- if ((rv = nni_posix_peerid(nni_posix_pfd_fd(c->pfd), &ignore, &ignore,
+ if ((rv = nni_posix_peerid(nni_posix_pfd_fd(&c->pfd), &ignore, &ignore,
&ignore, &id)) != 0) {
return (rv);
}
@@ -361,7 +363,7 @@ ipc_get_peer_pid(void *arg, void *buf, size_t *szp, nni_type t)
uint64_t ignore;
uint64_t id = 0;
- if ((rv = nni_posix_peerid(nni_posix_pfd_fd(c->pfd), &ignore, &ignore,
+ if ((rv = nni_posix_peerid(nni_posix_pfd_fd(&c->pfd), &ignore, &ignore,
&id, &ignore)) != 0) {
return (rv);
}
@@ -379,27 +381,14 @@ ipc_get_addr(void *arg, void *buf, size_t *szp, nni_type t)
return (nni_copyout_sockaddr(&c->sa, buf, szp, t));
}
-void
-nni_posix_ipc_start(nni_ipc_conn *c)
-{
- nni_posix_pfd_set_cb(c->pfd, ipc_cb, c);
-}
-
static void
ipc_stop(void *arg)
{
- ipc_conn *c = arg;
- nni_posix_pfd *pfd;
+ ipc_conn *c = arg;
ipc_close(c);
- nni_mtx_lock(&c->mtx);
- pfd = c->pfd;
- c->pfd = NULL;
- nni_mtx_unlock(&c->mtx);
- if (pfd != NULL) {
- nni_posix_pfd_fini(pfd);
- }
+ nni_posix_pfd_stop(&c->pfd);
}
static void
@@ -408,11 +397,13 @@ ipc_reap(void *arg)
ipc_conn *c = arg;
ipc_stop(c);
- nni_mtx_fini(&c->mtx);
if (c->dialer != NULL) {
nni_posix_ipc_dialer_rele(c->dialer);
}
+ nni_posix_pfd_fini(&c->pfd);
+
+ nni_mtx_fini(&c->mtx);
NNI_FREE_STRUCT(c);
}
@@ -472,7 +463,8 @@ ipc_set(void *arg, const char *name, const void *val, size_t sz, nni_type t)
}
int
-nni_posix_ipc_alloc(nni_ipc_conn **cp, nni_sockaddr *sa, nni_ipc_dialer *d)
+nni_posix_ipc_alloc(
+ nni_ipc_conn **cp, nni_sockaddr *sa, nni_ipc_dialer *d, int fd)
{
ipc_conn *c;
@@ -494,13 +486,8 @@ nni_posix_ipc_alloc(nni_ipc_conn **cp, nni_sockaddr *sa, nni_ipc_dialer *d)
nni_mtx_init(&c->mtx);
nni_aio_list_init(&c->readq);
nni_aio_list_init(&c->writeq);
+ nni_posix_pfd_init(&c->pfd, fd, ipc_cb, c);
*cp = c;
return (0);
}
-
-void
-nni_posix_ipc_init(nni_ipc_conn *c, nni_posix_pfd *pfd)
-{
- c->pfd = pfd;
-}
diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c
index 667d8262..ae98c4f4 100644
--- a/src/platform/posix/posix_ipcdial.c
+++ b/src/platform/posix/posix_ipcdial.c
@@ -106,8 +106,8 @@ ipc_dialer_cancel(nni_aio *aio, void *arg, int rv)
nng_stream_free(&c->stream);
}
-static void
-ipc_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg)
+void
+nni_posix_ipc_dialer_cb(void *arg, unsigned ev)
{
nni_ipc_conn *c = arg;
nni_ipc_dialer *d = c->dialer;
@@ -126,7 +126,7 @@ ipc_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg)
} else {
socklen_t sz = sizeof(int);
- int fd = nni_posix_pfd_fd(pfd);
+ int fd = nni_posix_pfd_fd(&c->pfd);
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
rv = errno;
}
@@ -153,7 +153,6 @@ ipc_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg)
return;
}
- nni_posix_ipc_start(c);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
}
@@ -188,24 +187,13 @@ ipc_dialer_dial(void *arg, nni_aio *aio)
nni_atomic_inc(&d->ref);
- if ((rv = nni_posix_ipc_alloc(&c, &d->sa, d)) != 0) {
+ if ((rv = nni_posix_ipc_alloc(&c, &d->sa, d, fd)) != 0) {
(void) close(fd);
nni_posix_ipc_dialer_rele(d);
nni_aio_finish_error(aio, rv);
return;
}
- // This arranges for the fd to be in non-blocking mode, and adds the
- // poll fd to the list.
- if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
- // the error label unlocks this
- nni_mtx_lock(&d->mtx);
- goto error;
- }
-
- nni_posix_ipc_init(c, pfd);
- nni_posix_pfd_set_cb(pfd, ipc_dialer_cb, c);
-
nni_mtx_lock(&d->mtx);
if (d->closed) {
rv = NNG_ECLOSED;
@@ -225,10 +213,10 @@ ipc_dialer_dial(void *arg, nni_aio *aio)
goto error;
}
// Asynchronous connect.
+ c->dial_aio = aio;
if ((rv = nni_posix_pfd_arm(pfd, NNI_POLL_OUT)) != 0) {
goto error;
}
- c->dial_aio = aio;
nni_aio_set_prov_data(aio, c);
nni_list_append(&d->connq, aio);
nni_mtx_unlock(&d->mtx);
@@ -238,7 +226,6 @@ ipc_dialer_dial(void *arg, nni_aio *aio)
// on loop back, and probably not on every platform.
nni_aio_set_prov_data(aio, NULL);
nni_mtx_unlock(&d->mtx);
- nni_posix_ipc_start(c);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
return;
diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c
index a122d7bb..7e0250f4 100644
--- a/src/platform/posix/posix_ipclisten.c
+++ b/src/platform/posix/posix_ipclisten.c
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
@@ -31,7 +31,7 @@
typedef struct {
nng_stream_listener sl;
- nni_posix_pfd *pfd;
+ nni_posix_pfd pfd;
nng_sockaddr sa;
nni_list acceptq;
bool started;
@@ -53,9 +53,7 @@ ipc_listener_doclose(ipc_listener *l)
nni_aio_finish_error(aio, NNG_ECLOSED);
}
- if (l->pfd != NULL) {
- nni_posix_pfd_close(l->pfd);
- }
+ nni_posix_pfd_close(&l->pfd);
if (l->started && ((path = l->path) != NULL)) {
l->path = NULL;
(void) unlink(path);
@@ -75,18 +73,13 @@ ipc_listener_close(void *arg)
static void
ipc_listener_stop(void *arg)
{
- ipc_listener *l = arg;
- nni_posix_pfd *pfd;
+ ipc_listener *l = arg;
nni_mtx_lock(&l->mtx);
ipc_listener_doclose(l);
- pfd = l->pfd;
- l->pfd = NULL;
nni_mtx_unlock(&l->mtx);
- if (pfd != NULL) {
- nni_posix_pfd_fini(pfd);
- }
+ nni_posix_pfd_stop(&l->pfd);
}
static void
@@ -95,13 +88,12 @@ ipc_listener_doaccept(ipc_listener *l)
nni_aio *aio;
while ((aio = nni_list_first(&l->acceptq)) != NULL) {
- int newfd;
- int fd;
- int rv;
- nni_posix_pfd *pfd;
- nni_ipc_conn *c;
+ int newfd;
+ int fd;
+ int rv;
+ nni_ipc_conn *c;
- fd = nni_posix_pfd_fd(l->pfd);
+ fd = nni_posix_pfd_fd(&l->pfd);
#ifdef NNG_USE_ACCEPT4
newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC);
@@ -119,7 +111,7 @@ ipc_listener_doaccept(ipc_listener *l)
case EWOULDBLOCK:
#endif
#endif
- rv = nni_posix_pfd_arm(l->pfd, NNI_POLL_IN);
+ rv = nni_posix_pfd_arm(&l->pfd, NNI_POLL_IN);
if (rv != 0) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -141,35 +133,23 @@ ipc_listener_doaccept(ipc_listener *l)
}
}
- if ((rv = nni_posix_ipc_alloc(&c, &l->sa, NULL)) != 0) {
+ if ((rv = nni_posix_ipc_alloc(&c, &l->sa, NULL, newfd)) != 0) {
(void) close(newfd);
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
continue;
}
- if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
- nng_stream_stop(&c->stream);
- nng_stream_free(&c->stream);
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, rv);
- continue;
- }
-
- nni_posix_ipc_init(c, pfd);
-
nni_aio_list_remove(aio);
- nni_posix_ipc_start(c);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
}
}
static void
-ipc_listener_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
+ipc_listener_cb(void *arg, unsigned events)
{
ipc_listener *l = arg;
- NNI_ARG_UNUSED(pfd);
nni_mtx_lock(&l->mtx);
if ((events & NNI_POLL_INVAL) != 0) {
@@ -325,7 +305,6 @@ ipc_listener_listen(void *arg)
struct sockaddr_storage ss;
int rv;
int fd;
- nni_posix_pfd *pfd;
char *path;
if ((len = nni_posix_nn2sockaddr(&ss, &l->sa)) < sizeof(sa_family_t)) {
@@ -380,7 +359,7 @@ ipc_listener_listen(void *arg)
(listen(fd, 128) != 0)) {
rv = nni_plat_errno(errno);
}
- if ((rv != 0) || ((rv = nni_posix_pfd_init(&pfd, fd)) != 0)) {
+ if (rv != 0) {
nni_mtx_unlock(&l->mtx);
(void) close(fd);
if (path != NULL) {
@@ -390,6 +369,8 @@ ipc_listener_listen(void *arg)
return (rv);
}
+ nni_posix_pfd_init(&l->pfd, fd, ipc_listener_cb, l);
+
#ifdef NNG_HAVE_ABSTRACT_SOCKETS
// If the original address was for a system assigned value,
// then figure out what we got. This is analogous to TCP
@@ -411,9 +392,6 @@ ipc_listener_listen(void *arg)
}
#endif
- nni_posix_pfd_set_cb(pfd, ipc_listener_cb, l);
-
- l->pfd = pfd;
l->started = true;
l->path = path;
nni_mtx_unlock(&l->mtx);
@@ -428,6 +406,7 @@ ipc_listener_free(void *arg)
ipc_listener_stop(l);
+ nni_posix_pfd_fini(&l->pfd);
nni_mtx_fini(&l->mtx);
NNI_FREE_STRUCT(l);
}
@@ -512,7 +491,6 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url)
nni_mtx_init(&l->mtx);
nni_aio_list_init(&l->acceptq);
- l->pfd = NULL;
l->closed = false;
l->started = false;
l->perms = 0;
diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h
index c79a3e18..121834cf 100644
--- a/src/platform/posix/posix_pollq.h
+++ b/src/platform/posix/posix_pollq.h
@@ -22,7 +22,7 @@
#include "core/nng_impl.h"
typedef struct nni_posix_pfd nni_posix_pfd;
-typedef void (*nni_posix_pfd_cb)(nni_posix_pfd *, unsigned, void *);
+typedef void (*nni_posix_pfd_cb)(void *, unsigned);
#if defined(NNG_POLLQ_KQUEUE)
#include "posix_pollq_kqueue.h"
@@ -33,17 +33,17 @@ typedef void (*nni_posix_pfd_cb)(nni_posix_pfd *, unsigned, void *);
#elif defined(NNG_POLLQ_POLL)
#include "posix_pollq_epoll.h"
#elif defined(NNG_POLLQ_SELECT)
-#include "posix_pollq_epoll.h"
+#include "posix_pollq_select.h"
#else
#error "No suitable poller defined"
#endif
-extern int nni_posix_pfd_init(nni_posix_pfd **, int);
+extern void nni_posix_pfd_init(nni_posix_pfd *, int, nni_posix_pfd_cb, void *);
extern void nni_posix_pfd_fini(nni_posix_pfd *);
+extern void nni_posix_pfd_stop(nni_posix_pfd *);
extern int nni_posix_pfd_arm(nni_posix_pfd *, unsigned);
extern int nni_posix_pfd_fd(nni_posix_pfd *);
extern void nni_posix_pfd_close(nni_posix_pfd *);
-extern void nni_posix_pfd_set_cb(nni_posix_pfd *, nni_posix_pfd_cb, void *);
#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c
index d09289e4..e789e5d2 100644
--- a/src/platform/posix/posix_pollq_epoll.c
+++ b/src/platform/posix/posix_pollq_epoll.c
@@ -61,32 +61,28 @@ typedef struct nni_posix_pollq {
// single global instance for now.
static nni_posix_pollq nni_posix_global_pollq;
-int
-nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
+void
+nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg)
{
- nni_posix_pfd *pfd;
nni_posix_pollq *pq;
struct epoll_event ev;
- int rv;
pq = &nni_posix_global_pollq;
(void) fcntl(fd, F_SETFD, FD_CLOEXEC);
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
- if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
- return (NNG_ENOMEM);
- }
nni_mtx_init(&pfd->mtx);
nni_cv_init(&pfd->cv, &pq->mtx);
+ nni_atomic_flag_reset(&pfd->stopped);
+ nni_atomic_flag_reset(&pfd->closed);
- pfd->pq = pq;
- pfd->fd = fd;
- pfd->cb = NULL;
- pfd->arg = NULL;
- pfd->events = 0;
- pfd->closing = false;
- pfd->closed = false;
+ pfd->pq = pq;
+ pfd->fd = fd;
+ pfd->cb = cb;
+ pfd->arg = arg;
+ pfd->events = 0;
+ pfd->reap = false;
NNI_LIST_NODE_INIT(&pfd->node);
@@ -95,16 +91,9 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
ev.events = 0;
ev.data.ptr = pfd;
- if (epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
- rv = nni_plat_errno(errno);
- nni_cv_fini(&pfd->cv);
- nni_mtx_fini(&pfd->mtx);
- NNI_FREE_STRUCT(pfd);
- return (rv);
- }
-
- *pfdp = pfd;
- return (0);
+ // if this fails the system is probably out of memory - it will fail in
+ // arm
+ (void) epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev);
}
int
@@ -118,20 +107,18 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events)
// epoll implementation.
nni_mtx_lock(&pfd->mtx);
- if (!pfd->closing) {
- struct epoll_event ev;
- pfd->events |= events;
- events = pfd->events;
-
- memset(&ev, 0, sizeof(ev));
- ev.events = events | NNI_EPOLL_FLAGS;
- ev.data.ptr = pfd;
-
- if (epoll_ctl(pq->epfd, EPOLL_CTL_MOD, pfd->fd, &ev) != 0) {
- int rv = nni_plat_errno(errno);
- nni_mtx_unlock(&pfd->mtx);
- return (rv);
- }
+ struct epoll_event ev;
+ pfd->events |= events;
+ events = pfd->events;
+
+ memset(&ev, 0, sizeof(ev));
+ ev.events = events | NNI_EPOLL_FLAGS;
+ ev.data.ptr = pfd;
+
+ if (epoll_ctl(pq->epfd, EPOLL_CTL_MOD, pfd->fd, &ev) != 0) {
+ int rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&pfd->mtx);
+ return (rv);
}
nni_mtx_unlock(&pfd->mtx);
return (0);
@@ -144,44 +131,44 @@ nni_posix_pfd_fd(nni_posix_pfd *pfd)
}
void
-nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg)
-{
- nni_mtx_lock(&pfd->mtx);
- pfd->cb = cb;
- pfd->arg = arg;
- nni_mtx_unlock(&pfd->mtx);
-}
-
-void
nni_posix_pfd_close(nni_posix_pfd *pfd)
{
+ nni_posix_pollq *pq = pfd->pq;
+ if (pq == NULL) {
+ return;
+ }
+ if (nni_atomic_flag_test_and_set(&pfd->closed)) {
+ return;
+ }
+
nni_mtx_lock(&pfd->mtx);
- if (!pfd->closing) {
- nni_posix_pollq *pq = pfd->pq;
- struct epoll_event ev; // Not actually used.
- pfd->closing = true;
+ struct epoll_event ev; // Not actually used.
- (void) shutdown(pfd->fd, SHUT_RDWR);
- (void) epoll_ctl(pq->epfd, EPOLL_CTL_DEL, pfd->fd, &ev);
- }
+ (void) shutdown(pfd->fd, SHUT_RDWR);
+ (void) epoll_ctl(pq->epfd, EPOLL_CTL_DEL, pfd->fd, &ev);
nni_mtx_unlock(&pfd->mtx);
}
void
-nni_posix_pfd_fini(nni_posix_pfd *pfd)
+nni_posix_pfd_stop(nni_posix_pfd *pfd)
{
- nni_posix_pollq *pq = pfd->pq;
+ nni_posix_pollq *pq = pfd->pq;
+ uint64_t one = 1;
- nni_posix_pfd_close(pfd);
+ if (pq == NULL) {
+ return;
+ }
+ if (nni_atomic_flag_test_and_set(&pfd->stopped)) {
+ return;
+ }
// We have to synchronize with the pollq thread (unless we are
// on that thread!)
NNI_ASSERT(!nni_thr_is_self(&pq->thr));
- uint64_t one = 1;
-
nni_mtx_lock(&pq->mtx);
nni_list_append(&pq->reapq, pfd);
+ pfd->reap = true;
// Wake the remote side. For now we assume this always
// succeeds. The only failure modes here occur when we
@@ -195,33 +182,41 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd)
nni_panic("BUG! write to epoll fd incorrect!");
}
- while (!pfd->closed) {
+ while (pfd->reap) {
nni_cv_wait(&pfd->cv);
}
nni_mtx_unlock(&pq->mtx);
+}
+
+void
+nni_posix_pfd_fini(nni_posix_pfd *pfd)
+{
+ nni_posix_pollq *pq = pfd->pq;
+ if (pq == NULL) {
+ return;
+ }
+
+ nni_posix_pfd_stop(pfd);
// We're exclusive now.
(void) close(pfd->fd);
nni_cv_fini(&pfd->cv);
nni_mtx_fini(&pfd->mtx);
- NNI_FREE_STRUCT(pfd);
}
static void
nni_posix_pollq_reap(nni_posix_pollq *pq)
{
nni_posix_pfd *pfd;
- nni_mtx_lock(&pq->mtx);
while ((pfd = nni_list_first(&pq->reapq)) != NULL) {
nni_list_remove(&pq->reapq, pfd);
// Let fini know we're done with it, and it's safe to
// remove.
- pfd->closed = true;
+ pfd->reap = false;
nni_cv_wake(&pfd->cv);
}
- nni_mtx_unlock(&pq->mtx);
}
static void
@@ -249,37 +244,28 @@ nni_posix_poll_thr(void *arg)
if ((ev->data.ptr == NULL) &&
(ev->events & (unsigned) POLLIN)) {
uint64_t clear;
- if (read(pq->evfd, &clear, sizeof(clear)) !=
- sizeof(clear)) {
- nni_panic("read from evfd incorrect!");
- }
+ (void) read(pq->evfd, &clear, sizeof(clear));
reap = true;
} else {
- nni_posix_pfd *pfd = ev->data.ptr;
- nni_posix_pfd_cb cb;
- void *cbarg;
- unsigned mask;
+ nni_posix_pfd *pfd = ev->data.ptr;
+ unsigned mask;
mask = ev->events &
- ((unsigned) EPOLLIN | (unsigned) EPOLLOUT |
- (unsigned) EPOLLERR);
+ ((unsigned) (EPOLLIN | EPOLLOUT |
+ EPOLLERR));
nni_mtx_lock(&pfd->mtx);
pfd->events &= ~mask;
- cb = pfd->cb;
- cbarg = pfd->arg;
nni_mtx_unlock(&pfd->mtx);
// Execute the callback with lock released
- if (cb != NULL) {
- cb(pfd, mask, cbarg);
- }
+ pfd->cb(pfd->arg, mask);
}
}
if (reap) {
- nni_posix_pollq_reap(pq);
nni_mtx_lock(&pq->mtx);
+ nni_posix_pollq_reap(pq);
if (pq->close) {
nni_mtx_unlock(&pq->mtx);
return;
diff --git a/src/platform/posix/posix_pollq_epoll.h b/src/platform/posix/posix_pollq_epoll.h
index ee60dc56..0be2a4c5 100644
--- a/src/platform/posix/posix_pollq_epoll.h
+++ b/src/platform/posix/posix_pollq_epoll.h
@@ -21,12 +21,12 @@ struct nni_posix_pfd {
int fd;
nni_posix_pfd_cb cb;
void *arg;
- bool closed;
- bool closing;
bool reap;
unsigned events;
nni_mtx mtx;
nni_cv cv;
+ nni_atomic_flag stopped;
+ nni_atomic_flag closed;
};
#define NNI_POLL_IN ((unsigned) POLLIN)
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c
index e3727ed3..c754535a 100644
--- a/src/platform/posix/posix_pollq_kqueue.c
+++ b/src/platform/posix/posix_pollq_kqueue.c
@@ -40,10 +40,9 @@ typedef struct nni_posix_pollq {
// single global instance for now
static nni_posix_pollq nni_posix_global_pollq;
-int
-nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
+void
+nni_posix_pfd_init(nni_posix_pfd *pf, int fd, nni_posix_pfd_cb cb, void *arg)
{
- nni_posix_pfd *pf;
nni_posix_pollq *pq;
struct kevent ev[2];
unsigned flags = EV_ADD | EV_DISABLE | EV_CLEAR;
@@ -61,88 +60,97 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
pq = &nni_posix_global_pollq;
- if ((pf = NNI_ALLOC_STRUCT(pf)) == NULL) {
- return (NNG_ENOMEM);
- }
-
nni_mtx_init(&pf->mtx);
nni_cv_init(&pf->cv, &pq->mtx);
pf->pq = pq;
pf->fd = fd;
- pf->cb = NULL;
- pf->arg = NULL;
+ pf->cb = cb;
+ pf->arg = arg;
pf->events = 0;
- pf->closed = false;
+
+ nni_atomic_flag_reset(&pf->closed);
+ nni_atomic_flag_reset(&pf->stopped);
NNI_LIST_NODE_INIT(&pf->node);
- *pfdp = pf;
// Create entries in the kevent queue, without enabling them.
EV_SET(&ev[0], (uintptr_t) fd, EVFILT_READ, flags, 0, 0, pf);
EV_SET(&ev[1], (uintptr_t) fd, EVFILT_WRITE, flags, 0, 0, pf);
- // We update the kqueue list, without polling for events.
- if (kevent(pq->kq, ev, 2, NULL, 0, NULL) != 0) {
- int rv;
- rv = nni_plat_errno(errno);
- nni_cv_fini(&pf->cv);
- nni_mtx_fini(&pf->mtx);
- NNI_FREE_STRUCT(pf);
- return (rv);
- }
-
- return (0);
+ // This may fail, but if it does, we get another try with
+ // ARM. It's an attempt to preallocate anyway.
+ (void) kevent(pq->kq, ev, 2, NULL, 0, NULL);
}
void
nni_posix_pfd_close(nni_posix_pfd *pf)
{
nni_posix_pollq *pq = pf->pq;
+ struct kevent ev[2];
+ if (pq == NULL) {
+ return;
+ }
- nni_mtx_lock(&pq->mtx);
- if (!pf->closed) {
- struct kevent ev[2];
- pf->closed = true;
- EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf);
- EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf);
- (void) shutdown(pf->fd, SHUT_RDWR);
- // This should never fail -- no allocations, just deletion.
- (void) kevent(pq->kq, ev, 2, NULL, 0, NULL);
+ if (nni_atomic_flag_test_and_set(&pf->closed)) {
+ return;
}
+
+ nni_mtx_lock(&pq->mtx);
+ EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf);
+ EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf);
+ (void) shutdown(pf->fd, SHUT_RDWR);
+ // This should never fail -- no allocations, just deletion.
+ (void) kevent(pq->kq, ev, 2, NULL, 0, NULL);
nni_mtx_unlock(&pq->mtx);
}
void
-nni_posix_pfd_fini(nni_posix_pfd *pf)
+nni_posix_pfd_stop(nni_posix_pfd *pf)
{
- nni_posix_pollq *pq;
-
- pq = pf->pq;
+ nni_posix_pollq *pq = pf->pq;
- nni_posix_pfd_close(pf);
+ if (pq == NULL) {
+ return;
+ }
// All consumers take care to move finalization to the reap thread,
// unless they are synchronous on user threads.
NNI_ASSERT(!nni_thr_is_self(&pq->thr));
- nni_mtx_lock(&pq->mtx);
+ if (nni_atomic_flag_test_and_set(&pf->stopped)) {
+ return;
+ }
- // If we're running on the callback, then don't bother to kick
- // the pollq again. This is necessary because we cannot modify
- // the poller while it is polling.
- if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) {
+ nni_posix_pfd_close(pf);
+ nni_mtx_lock(&pq->mtx);
+ if (!pq->closed) {
nni_list_append(&pq->reapq, pf);
nni_plat_pipe_raise(pq->wake_wfd);
- while (nni_list_active(&pq->reapq, pf)) {
+ while (nni_list_node_active(&pf->node)) {
nni_cv_wait(&pf->cv);
}
}
nni_mtx_unlock(&pq->mtx);
+}
+
+void
+nni_posix_pfd_fini(nni_posix_pfd *pf)
+{
+ nni_posix_pollq *pq = pf->pq;
+
+ if (pq == NULL) {
+ return;
+ }
+
+ // All consumers take care to move finalization to the reap thread,
+ // unless they are synchronous on user threads.
+ NNI_ASSERT(!nni_thr_is_self(&pq->thr));
+
+ nni_posix_pfd_stop(pf);
(void) close(pf->fd);
nni_cv_fini(&pf->cv);
nni_mtx_fini(&pf->mtx);
- NNI_FREE_STRUCT(pf);
}
int
@@ -151,16 +159,6 @@ nni_posix_pfd_fd(nni_posix_pfd *pf)
return (pf->fd);
}
-void
-nni_posix_pfd_set_cb(nni_posix_pfd *pf, nni_posix_pfd_cb cb, void *arg)
-{
- NNI_ASSERT(cb != NULL); // must not be null when established.
- nni_mtx_lock(&pf->mtx);
- pf->cb = cb;
- pf->arg = arg;
- nni_mtx_unlock(&pf->mtx);
-}
-
int
nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events)
{
@@ -170,12 +168,8 @@ nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events)
nni_posix_pollq *pq = pf->pq;
nni_mtx_lock(&pf->mtx);
- if (pf->closed) {
- events = 0;
- } else {
- pf->events |= events;
- events = pf->events;
- }
+ pf->events |= events;
+ events = pf->events;
nni_mtx_unlock(&pf->mtx);
if (events == 0) {
@@ -218,12 +212,10 @@ nni_posix_poll_thr(void *arg)
nni_thr_set_name(NULL, "nng:poll:kqueue");
for (;;) {
- int n;
- struct kevent evs[NNI_MAX_KQUEUE_EVENTS];
- nni_posix_pfd *pf;
- nni_posix_pfd_cb cb;
- void *cbarg;
- unsigned revents;
+ int n;
+ struct kevent evs[NNI_MAX_KQUEUE_EVENTS];
+ nni_posix_pfd *pf;
+ unsigned revents;
n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL);
@@ -253,14 +245,10 @@ nni_posix_poll_thr(void *arg)
}
nni_mtx_lock(&pf->mtx);
- cb = pf->cb;
- cbarg = pf->arg;
pf->events &= ~(revents);
nni_mtx_unlock(&pf->mtx);
- if (cb != NULL) {
- cb(pf, revents, cbarg);
- }
+ pf->cb(pf->arg, revents);
}
}
}
diff --git a/src/platform/posix/posix_pollq_kqueue.h b/src/platform/posix/posix_pollq_kqueue.h
index 74f8d9f8..a153b363 100644
--- a/src/platform/posix/posix_pollq_kqueue.h
+++ b/src/platform/posix/posix_pollq_kqueue.h
@@ -19,7 +19,9 @@ struct nni_posix_pfd {
int fd; // file descriptor to poll
void *arg; // user data
nni_posix_pfd_cb cb; // user callback on event
- bool closed;
+ nni_atomic_flag closed;
+ nni_atomic_flag stopped;
+ bool reaped;
unsigned events;
nni_cv cv; // signaled when poller has unregistered
nni_mtx mtx;
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index 7adef5ed..6011ab50 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -49,10 +49,9 @@ typedef struct nni_posix_pollq {
static nni_posix_pollq nni_posix_global_pollq;
-int
-nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
+void
+nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg)
{
- nni_posix_pfd *pfd;
nni_posix_pollq *pq = &nni_posix_global_pollq;
// Set this is as soon as possible (narrow the close-exec race as
@@ -68,32 +67,18 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
(void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
#endif
- if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
- return (NNG_ENOMEM);
- }
NNI_LIST_NODE_INIT(&pfd->node);
nni_mtx_init(&pfd->mtx);
nni_cv_init(&pfd->cv, &pq->mtx);
pfd->fd = fd;
pfd->events = 0;
- pfd->cb = NULL;
- pfd->arg = NULL;
+ pfd->cb = cb;
+ pfd->arg = arg;
pfd->pq = pq;
nni_mtx_lock(&pq->mtx);
nni_list_append(&pq->addq, pfd);
nni_mtx_unlock(&pq->mtx);
nni_plat_pipe_raise(pq->wakewfd);
- *pfdp = pfd;
- return (0);
-}
-
-void
-nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg)
-{
- nni_mtx_lock(&pfd->mtx);
- pfd->cb = cb;
- pfd->arg = arg;
- nni_mtx_unlock(&pfd->mtx);
}
int
@@ -109,16 +94,22 @@ nni_posix_pfd_close(nni_posix_pfd *pfd)
}
void
-nni_posix_pfd_fini(nni_posix_pfd *pfd)
+nni_posix_pfd_stop(nni_posix_pfd *pfd)
{
nni_posix_pollq *pq = pfd->pq;
+ if (pq == NULL) {
+ return;
+ }
+
nni_posix_pfd_close(pfd);
nni_mtx_lock(&pq->mtx);
- if (nni_list_active(&pq->pollq, pfd)) {
- nni_list_remove(&pq->pollq, pfd);
+ if (!nni_list_active(&pq->pollq, pfd)) {
+ nni_mtx_unlock(&pq->mtx);
+ return;
}
+ nni_list_remove(&pq->pollq, pfd);
if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) {
nni_list_append(&pq->reapq, pfd);
@@ -128,12 +119,22 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd)
}
}
nni_mtx_unlock(&pq->mtx);
+}
+
+void
+nni_posix_pfd_fini(nni_posix_pfd *pfd)
+{
+ nni_posix_pollq *pq = pfd->pq;
+
+ if (pq == NULL) {
+ return;
+ }
+ nni_posix_pfd_stop(pfd);
// We're exclusive now.
(void) close(pfd->fd);
nni_cv_fini(&pfd->cv);
nni_mtx_fini(&pfd->mtx);
- NNI_FREE_STRUCT(pfd);
}
int
@@ -241,8 +242,6 @@ nni_posix_poll_thr(void *arg)
return;
}
} else {
- nni_posix_pfd_cb cb;
- void *arg;
if ((events & (POLLIN | POLLOUT)) != 0) {
// don't emit pollhup yet, we want
@@ -250,14 +249,10 @@ nni_posix_poll_thr(void *arg)
events &= ~POLLHUP;
}
nni_mtx_lock(&pfd->mtx);
- cb = pfd->cb;
- arg = pfd->arg;
pfd->events &= ~events;
nni_mtx_unlock(&pfd->mtx);
- if (cb) {
- cb(pfd, events, arg);
- }
+ pfd->cb(pfd->arg, events);
}
}
}
diff --git a/src/platform/posix/posix_pollq_port.c b/src/platform/posix/posix_pollq_port.c
index 86732e17..49658f3b 100644
--- a/src/platform/posix/posix_pollq_port.c
+++ b/src/platform/posix/posix_pollq_port.c
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -35,17 +35,13 @@ typedef struct nni_posix_pollq {
// single global instance for now
static nni_posix_pollq nni_posix_global_pollq;
-int
-nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
+void
+nni_posix_pfd_init(nni_posix_pfd *pfdp, int fd, nni_posix_pfd_cb cb, void *arg)
{
nni_posix_pollq *pq;
- nni_posix_pfd *pfd;
pq = &nni_posix_global_pollq;
- if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
- return (NNG_ENOMEM);
- }
(void) fcntl(fd, F_SETFD, FD_CLOEXEC);
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
@@ -55,10 +51,9 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
pfd->closing = false;
pfd->fd = fd;
pfd->pq = pq;
- pfd->cb = NULL;
+ pfd->cb = cb;
+ pfd->arg = arg;
pfd->data = NULL;
- *pfdp = pfd;
- return (0);
}
int
@@ -72,6 +67,10 @@ nni_posix_pfd_close(nni_posix_pfd *pfd)
{
nni_posix_pollq *pq = pfd->pq;
+ if (pq == NULL) {
+ return;
+ }
+
nni_mtx_lock(&pfd->mtx);
if (!pfd->closing) {
pfd->closing = true;
@@ -86,11 +85,13 @@ nni_posix_pfd_close(nni_posix_pfd *pfd)
}
void
-nni_posix_pfd_fini(nni_posix_pfd *pfd)
+nni_posix_pfd_stop(nni_posix_pfd *pfd)
{
nni_posix_pollq *pq = pfd->pq;
- nni_posix_pfd_close(pfd);
+ if (pq == NULL) {
+ return;
+ }
NNI_ASSERT(!nni_thr_is_self(&pq->thr));
@@ -101,18 +102,28 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd)
}
sched_yield(); // try again later...
}
-
nni_mtx_lock(&pfd->mtx);
while (!pfd->closed) {
nni_cv_wait(&pfd->cv);
}
nni_mtx_unlock(&pfd->mtx);
+}
+
+void
+nni_posix_pfd_fini(nni_posix_pfd *pfd)
+{
+ nni_posix_pollq *pq = pfd->pq;
+
+ if (pq == NULL) {
+ return;
+ }
+
+ nni_posix_pfd_stop(pfd);
// We're exclusive now.
(void) close(pfd->fd);
nni_cv_fini(&pfd->cv);
nni_mtx_fini(&pfd->mtx);
- NNI_FREE_STRUCT(pfd);
}
int
@@ -221,17 +232,6 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
return (0);
}
-void
-nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg)
-{
- NNI_ASSERT(cb != NULL); // must not be null when established.
-
- nni_mtx_lock(&pfd->mtx);
- pfd->cb = cb;
- pfd->data = arg;
- nni_mtx_unlock(&pfd->mtx);
-}
-
int
nni_posix_pollq_sysinit(nng_init_params *params)
{
diff --git a/src/platform/posix/posix_pollq_select.c b/src/platform/posix/posix_pollq_select.c
index 211c9328..0e24fa16 100644
--- a/src/platform/posix/posix_pollq_select.c
+++ b/src/platform/posix/posix_pollq_select.c
@@ -44,10 +44,9 @@ typedef struct nni_posix_pollq {
static nni_posix_pollq nni_posix_global_pollq;
-int
-nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
+void
+nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg)
{
- nni_posix_pfd *pfd;
nni_posix_pollq *pq = &nni_posix_global_pollq;
// Set this is as soon as possible (narrow the close-exec race as
@@ -56,43 +55,24 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
(void) fcntl(fd, F_SETFD, FD_CLOEXEC);
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
- if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
- return (NNG_ENOMEM);
- }
if (fd >= FD_SETSIZE) {
- return (NNG_EINVAL);
+ return;
}
nni_mtx_init(&pfd->mtx);
nni_cv_init(&pfd->cv, &pq->mtx);
- pfd->fd = fd;
- pfd->events = 0;
- pfd->cb = NULL;
- pfd->arg = NULL;
- pfd->pq = pq;
+ pfd->fd = fd;
+ pfd->events = 0;
+ pfd->cb = cb;
+ pfd->arg = arg;
+ pfd->pq = pq;
+ pfd->stopped = false;
+ pfd->reap = false;
nni_mtx_lock(&pq->mtx);
- if (pq->closing) {
- nni_mtx_unlock(&pq->mtx);
- nni_cv_fini(&pfd->cv);
- nni_mtx_fini(&pfd->mtx);
- NNI_FREE_STRUCT(pfd);
- return (NNG_ECLOSED);
- }
pq->pfds[fd] = pfd;
if (fd > pq->maxfd) {
pq->maxfd = fd;
}
nni_mtx_unlock(&pq->mtx);
- *pfdp = pfd;
- return (0);
-}
-
-void
-nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg)
-{
- nni_mtx_lock(&pfd->mtx);
- pfd->cb = cb;
- pfd->arg = arg;
- nni_mtx_unlock(&pfd->mtx);
}
int
@@ -104,34 +84,50 @@ nni_posix_pfd_fd(nni_posix_pfd *pfd)
void
nni_posix_pfd_close(nni_posix_pfd *pfd)
{
- (void) shutdown(pfd->fd, SHUT_RDWR);
+ if (pfd->pq != NULL) {
+ (void) shutdown(pfd->fd, SHUT_RDWR);
+ }
}
void
-nni_posix_pfd_fini(nni_posix_pfd *pfd)
+nni_posix_pfd_stop(nni_posix_pfd *pfd)
{
nni_posix_pollq *pq = pfd->pq;
- int fd = pfd->fd;
-
+ if (pq == NULL) {
+ return;
+ }
nni_posix_pfd_close(pfd);
+ NNI_ASSERT(!nni_thr_is_self(&pq->thr));
nni_mtx_lock(&pq->mtx);
- if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) {
- pfd->reap = true;
+ if (!pfd->stopped) {
+ pfd->stopped = true;
+ pfd->reap = true;
nni_plat_pipe_raise(pq->wakewfd);
while (pfd->reap) {
nni_cv_wait(&pfd->cv);
}
- } else {
- pq->pfds[fd] = NULL;
}
nni_mtx_unlock(&pq->mtx);
+}
+
+void
+nni_posix_pfd_fini(nni_posix_pfd *pfd)
+{
+ nni_posix_pollq *pq = pfd->pq;
+ int fd = pfd->fd;
+
+ if (pq == NULL) {
+ return;
+ }
+
+ nni_posix_pfd_stop(pfd);
+ NNI_ASSERT(!nni_thr_is_self(&pq->thr));
// We're exclusive now.
(void) close(fd);
nni_cv_fini(&pfd->cv);
nni_mtx_fini(&pfd->mtx);
- NNI_FREE_STRUCT(pfd);
}
int
@@ -247,17 +243,11 @@ nni_posix_poll_thr(void *arg)
events |= NNI_POLL_HUP;
}
if (events != 0) {
- nni_posix_pfd_cb cb = NULL;
- void *arg;
if ((pfd = pq->pfds[fd]) != NULL) {
- cb = pfd->cb;
- arg = pfd->arg;
pfd->events &= ~events;
- }
- if (cb) {
nni_mtx_unlock(&pq->mtx);
- cb(pfd, events, arg);
+ pfd->cb(pfd->arg, events);
nni_mtx_lock(&pq->mtx);
}
}
diff --git a/src/platform/posix/posix_pollq_select.h b/src/platform/posix/posix_pollq_select.h
index 3a1d48e3..012f10a0 100644
--- a/src/platform/posix/posix_pollq_select.h
+++ b/src/platform/posix/posix_pollq_select.h
@@ -22,6 +22,7 @@ struct nni_posix_pfd {
nni_posix_pfd_cb cb;
void *arg;
bool reap;
+ bool stopped;
};
#define NNI_POLL_IN (0x0001)
diff --git a/src/platform/posix/posix_sockfd.c b/src/platform/posix/posix_sockfd.c
index 8ccca66c..7ed95681 100644
--- a/src/platform/posix/posix_sockfd.c
+++ b/src/platform/posix/posix_sockfd.c
@@ -1,5 +1,5 @@
//
-// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
@@ -22,23 +22,23 @@
#include "platform/posix/posix_peerid.h"
struct nni_sfd_conn {
- nng_stream stream;
- nni_posix_pfd *pfd;
- int fd;
- nni_list readq;
- nni_list writeq;
- bool closed;
- nni_mtx mtx;
- nni_reap_node reap;
+ nng_stream stream;
+ nni_posix_pfd pfd;
+ int fd;
+ nni_list readq;
+ nni_list writeq;
+ bool closed;
+ nni_mtx mtx;
+ nni_reap_node reap;
};
static void
sfd_dowrite(nni_sfd_conn *c)
{
nni_aio *aio;
- int fd;
+ int fd = c->fd;
- if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ if (c->closed) {
return;
}
@@ -101,9 +101,9 @@ static void
sfd_doread(nni_sfd_conn *c)
{
nni_aio *aio;
- int fd;
+ int fd = c->fd;
- if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ if (c->closed) {
return;
}
@@ -174,9 +174,7 @@ sfd_error(void *arg, int err)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, err);
}
- if (c->pfd != NULL) {
- nni_posix_pfd_close(c->pfd);
- }
+ nni_posix_pfd_close(&c->pfd);
nni_mtx_unlock(&c->mtx);
}
@@ -193,9 +191,7 @@ sfd_close(void *arg)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
- if (c->pfd != NULL) {
- nni_posix_pfd_close(c->pfd);
- }
+ nni_posix_pfd_close(&c->pfd);
}
nni_mtx_unlock(&c->mtx);
}
@@ -206,11 +202,7 @@ sfd_stop(void *arg)
nni_sfd_conn *c = arg;
sfd_close(c);
- // ideally this would *stop* without freeing the pfd
- if (c->pfd != NULL) {
- nni_posix_pfd_fini(c->pfd);
- c->pfd = NULL;
- }
+ nni_posix_pfd_stop(&c->pfd);
}
// sfd_fini may block briefly waiting for the pollq thread.
@@ -220,6 +212,7 @@ sfd_fini(void *arg)
{
nni_sfd_conn *c = arg;
sfd_stop(c);
+ nni_posix_pfd_fini(&c->pfd);
nni_mtx_fini(&c->mtx);
NNI_FREE_STRUCT(c);
@@ -237,7 +230,7 @@ sfd_free(void *arg)
}
static void
-sfd_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
+sfd_cb(void *arg, unsigned events)
{
struct nni_sfd_conn *c = arg;
@@ -260,7 +253,7 @@ sfd_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
events |= NNI_POLL_IN;
}
if ((!c->closed) && (events != 0)) {
- nni_posix_pfd_arm(pfd, events);
+ nni_posix_pfd_arm(&c->pfd, events);
}
nni_mtx_unlock(&c->mtx);
}
@@ -302,7 +295,7 @@ sfd_send(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->writeq) == aio) {
- nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT);
+ nni_posix_pfd_arm(&c->pfd, NNI_POLL_OUT);
}
}
nni_mtx_unlock(&c->mtx);
@@ -336,7 +329,7 @@ sfd_recv(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->readq) == aio) {
- nni_posix_pfd_arm(c->pfd, NNI_POLL_IN);
+ nni_posix_pfd_arm(&c->pfd, NNI_POLL_IN);
}
}
nni_mtx_unlock(&c->mtx);
@@ -467,14 +460,10 @@ int
nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd)
{
nni_sfd_conn *c;
- int rv;
if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_posix_pfd_init(&c->pfd, fd)) != 0) {
- NNI_FREE_STRUCT(c);
- return (rv);
- }
+ nni_posix_pfd_init(&c->pfd, fd, sfd_cb, c);
c->closed = false;
c->fd = fd;
@@ -491,8 +480,6 @@ nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd)
c->stream.s_get = sfd_get;
c->stream.s_set = sfd_set;
- nni_posix_pfd_set_cb(c->pfd, sfd_cb, c);
-
*cp = c;
return (0);
}
diff --git a/src/platform/posix/posix_tcp.h b/src/platform/posix/posix_tcp.h
index 20504bf5..0eec62fe 100644
--- a/src/platform/posix/posix_tcp.h
+++ b/src/platform/posix/posix_tcp.h
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2018 Devolutions <info@devolutions.net>
//
@@ -18,7 +18,7 @@
struct nni_tcp_conn {
nng_stream stream;
- nni_posix_pfd *pfd;
+ nni_posix_pfd pfd;
nni_list readq;
nni_list writeq;
bool closed;
@@ -40,9 +40,9 @@ struct nni_tcp_dialer {
nni_atomic_bool fini;
};
-extern int nni_posix_tcp_alloc(nni_tcp_conn **, nni_tcp_dialer *);
-extern void nni_posix_tcp_init(nni_tcp_conn *, nni_posix_pfd *);
+extern int nni_posix_tcp_alloc(nni_tcp_conn **, nni_tcp_dialer *, int);
extern void nni_posix_tcp_start(nni_tcp_conn *, int, int);
extern void nni_posix_tcp_dialer_rele(nni_tcp_dialer *);
+extern void nni_posix_tcp_dial_cb(void *, unsigned);
#endif // PLATFORM_POSIX_TCP_H
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c
index d49fc838..b1a2233c 100644
--- a/src/platform/posix/posix_tcpconn.c
+++ b/src/platform/posix/posix_tcpconn.c
@@ -32,9 +32,9 @@ static void
tcp_dowrite(nni_tcp_conn *c)
{
nni_aio *aio;
- int fd;
+ int fd = nni_posix_pfd_fd(&c->pfd);
- if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ if (c->closed) {
return;
}
@@ -101,9 +101,9 @@ static void
tcp_doread(nni_tcp_conn *c)
{
nni_aio *aio;
- int fd;
+ int fd = nni_posix_pfd_fd(&c->pfd);
- if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ if (c->closed) {
return;
}
@@ -174,9 +174,7 @@ tcp_error(void *arg, int err)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, err);
}
- if (c->pfd != NULL) {
- nni_posix_pfd_close(c->pfd);
- }
+ nni_posix_pfd_close(&c->pfd);
nni_mtx_unlock(&c->mtx);
}
@@ -193,9 +191,7 @@ tcp_close(void *arg)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
- if (c->pfd != NULL) {
- nni_posix_pfd_close(c->pfd);
- }
+ nni_posix_pfd_close(&c->pfd);
}
nni_mtx_unlock(&c->mtx);
}
@@ -203,18 +199,10 @@ tcp_close(void *arg)
static void
tcp_stop(void *arg)
{
- nni_tcp_conn *c = arg;
- nni_posix_pfd *pfd;
+ nni_tcp_conn *c = arg;
tcp_close(c);
- nni_mtx_lock(&c->mtx);
- pfd = c->pfd;
- c->pfd = NULL;
- nni_mtx_unlock(&c->mtx);
-
- if (pfd != NULL) {
- nni_posix_pfd_fini(pfd);
- }
+ nni_posix_pfd_stop(&c->pfd);
}
// tcp_fini may block briefly waiting for the pollq thread.
@@ -224,6 +212,7 @@ tcp_fini(void *arg)
{
nni_tcp_conn *c = arg;
tcp_stop(c);
+ nni_posix_pfd_fini(&c->pfd);
nni_mtx_fini(&c->mtx);
if (c->dialer != NULL) {
@@ -245,7 +234,7 @@ tcp_free(void *arg)
}
static void
-tcp_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
+tcp_cb(void *arg, unsigned events)
{
nni_tcp_conn *c = arg;
@@ -253,6 +242,10 @@ tcp_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
tcp_error(c, NNG_ECONNSHUT);
return;
}
+ if (c->dial_aio != NULL) {
+ nni_posix_tcp_dial_cb(c, events);
+ return;
+ }
nni_mtx_lock(&c->mtx);
if ((events & NNI_POLL_IN) != 0) {
tcp_doread(c);
@@ -268,7 +261,7 @@ tcp_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
events |= NNI_POLL_IN;
}
if ((!c->closed) && (events != 0)) {
- nni_posix_pfd_arm(pfd, events);
+ nni_posix_pfd_arm(&c->pfd, events);
}
nni_mtx_unlock(&c->mtx);
}
@@ -310,7 +303,7 @@ tcp_send(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->writeq) == aio) {
- nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT);
+ nni_posix_pfd_arm(&c->pfd, NNI_POLL_OUT);
}
}
nni_mtx_unlock(&c->mtx);
@@ -344,7 +337,7 @@ tcp_recv(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->readq) == aio) {
- nni_posix_pfd_arm(c->pfd, NNI_POLL_IN);
+ nni_posix_pfd_arm(&c->pfd, NNI_POLL_IN);
}
}
nni_mtx_unlock(&c->mtx);
@@ -356,7 +349,7 @@ tcp_get_peername(void *arg, void *buf, size_t *szp, nni_type t)
nni_tcp_conn *c = arg;
struct sockaddr_storage ss;
socklen_t len = sizeof(ss);
- int fd = nni_posix_pfd_fd(c->pfd);
+ int fd = nni_posix_pfd_fd(&c->pfd);
int rv;
nng_sockaddr sa;
@@ -375,7 +368,7 @@ tcp_get_sockname(void *arg, void *buf, size_t *szp, nni_type t)
nni_tcp_conn *c = arg;
struct sockaddr_storage ss;
socklen_t len = sizeof(ss);
- int fd = nni_posix_pfd_fd(c->pfd);
+ int fd = nni_posix_pfd_fd(&c->pfd);
int rv;
nng_sockaddr sa;
@@ -392,7 +385,7 @@ static int
tcp_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn *c = arg;
- int fd = nni_posix_pfd_fd(c->pfd);
+ int fd = nni_posix_pfd_fd(&c->pfd);
int val = 0;
socklen_t valsz = sizeof(val);
@@ -407,7 +400,7 @@ static int
tcp_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn *c = arg;
- int fd = nni_posix_pfd_fd(c->pfd);
+ int fd = nni_posix_pfd_fd(&c->pfd);
int val = 0;
socklen_t valsz = sizeof(val);
@@ -455,7 +448,7 @@ tcp_set(void *arg, const char *name, const void *buf, size_t sz, nni_type t)
}
int
-nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d)
+nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d, int fd)
{
nni_tcp_conn *c;
if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
@@ -468,6 +461,7 @@ nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d)
nni_mtx_init(&c->mtx);
nni_aio_list_init(&c->readq);
nni_aio_list_init(&c->writeq);
+ nni_posix_pfd_init(&c->pfd, fd, tcp_cb, c);
c->stream.s_free = tcp_free;
c->stream.s_stop = tcp_stop;
@@ -482,19 +476,11 @@ nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d)
}
void
-nni_posix_tcp_init(nni_tcp_conn *c, nni_posix_pfd *pfd)
-{
- c->pfd = pfd;
-}
-
-void
nni_posix_tcp_start(nni_tcp_conn *c, int nodelay, int keepalive)
{
// Configure the initial socket options.
- (void) setsockopt(nni_posix_pfd_fd(c->pfd), IPPROTO_TCP, TCP_NODELAY,
+ (void) setsockopt(nni_posix_pfd_fd(&c->pfd), IPPROTO_TCP, TCP_NODELAY,
&nodelay, sizeof(int));
- (void) setsockopt(nni_posix_pfd_fd(c->pfd), SOL_SOCKET, SO_KEEPALIVE,
+ (void) setsockopt(nni_posix_pfd_fd(&c->pfd), SOL_SOCKET, SO_KEEPALIVE,
&keepalive, sizeof(int));
-
- nni_posix_pfd_set_cb(c->pfd, tcp_cb, c);
}
diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c
index 52ea6cff..442cd824 100644
--- a/src/platform/posix/posix_tcpdial.c
+++ b/src/platform/posix/posix_tcpdial.c
@@ -123,8 +123,8 @@ tcp_dialer_cancel(nni_aio *aio, void *arg, int rv)
nng_stream_free(&c->stream);
}
-static void
-tcp_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg)
+void
+nni_posix_tcp_dial_cb(void *arg, unsigned ev)
{
nni_tcp_conn *c = arg;
nni_tcp_dialer *d = c->dialer;
@@ -145,7 +145,7 @@ tcp_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg)
} else {
socklen_t sz = sizeof(int);
- int fd = nni_posix_pfd_fd(pfd);
+ int fd = nni_posix_pfd_fd(&c->pfd);
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
rv = errno;
}
@@ -185,7 +185,6 @@ void
nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
{
nni_tcp_conn *c;
- nni_posix_pfd *pfd = NULL;
struct sockaddr_storage ss;
size_t sslen;
int fd;
@@ -210,24 +209,12 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
nni_atomic_inc(&d->ref);
- if ((rv = nni_posix_tcp_alloc(&c, d)) != 0) {
+ if ((rv = nni_posix_tcp_alloc(&c, d, fd)) != 0) {
nni_aio_finish_error(aio, rv);
nni_posix_tcp_dialer_rele(d);
return;
}
- // This arranges for the fd to be in non-blocking mode, and adds the
- // poll fd to the list.
- if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
- (void) close(fd);
- // the error label unlocks this
- nni_mtx_lock(&d->mtx);
- goto error;
- }
-
- nni_posix_tcp_init(c, pfd);
- nni_posix_pfd_set_cb(pfd, tcp_dialer_cb, c);
-
nni_mtx_lock(&d->mtx);
if (d->closed) {
rv = NNG_ECLOSED;
@@ -248,7 +235,7 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
goto error;
}
// Asynchronous connect.
- if ((rv = nni_posix_pfd_arm(pfd, NNI_POLL_OUT)) != 0) {
+ if ((rv = nni_posix_pfd_arm(&c->pfd, NNI_POLL_OUT)) != 0) {
goto error;
}
c->dial_aio = aio;
diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c
index 32f0fd60..da2e8050 100644
--- a/src/platform/posix/posix_tcplisten.c
+++ b/src/platform/posix/posix_tcplisten.c
@@ -34,13 +34,13 @@
#include "posix_tcp.h"
struct nni_tcp_listener {
- nni_posix_pfd *pfd;
- nni_list acceptq;
- bool started;
- bool closed;
- bool nodelay;
- bool keepalive;
- nni_mtx mtx;
+ nni_posix_pfd pfd;
+ nni_list acceptq;
+ bool started;
+ bool closed;
+ bool nodelay;
+ bool keepalive;
+ nni_mtx mtx;
};
int
@@ -53,7 +53,6 @@ nni_tcp_listener_init(nni_tcp_listener **lp)
nni_mtx_init(&l->mtx);
- l->pfd = NULL;
l->closed = false;
l->started = false;
l->nodelay = true;
@@ -74,9 +73,7 @@ tcp_listener_doclose(nni_tcp_listener *l)
nni_aio_finish_error(aio, NNG_ECLOSED);
}
- if (l->pfd != NULL) {
- nni_posix_pfd_close(l->pfd);
- }
+ nni_posix_pfd_close(&l->pfd);
}
void
@@ -93,15 +90,14 @@ tcp_listener_doaccept(nni_tcp_listener *l)
nni_aio *aio;
while ((aio = nni_list_first(&l->acceptq)) != NULL) {
- int newfd;
- int fd;
- int rv;
- int nd;
- int ka;
- nni_posix_pfd *pfd;
- nni_tcp_conn *c;
+ int newfd;
+ int fd;
+ int rv;
+ int nd;
+ int ka;
+ nni_tcp_conn *c;
- fd = nni_posix_pfd_fd(l->pfd);
+ fd = nni_posix_pfd_fd(&l->pfd);
#ifdef NNG_USE_ACCEPT4
newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC);
@@ -119,7 +115,7 @@ tcp_listener_doaccept(nni_tcp_listener *l)
case EWOULDBLOCK:
#endif
#endif
- rv = nni_posix_pfd_arm(l->pfd, NNI_POLL_IN);
+ rv = nni_posix_pfd_arm(&l->pfd, NNI_POLL_IN);
if (rv != 0) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -141,24 +137,13 @@ tcp_listener_doaccept(nni_tcp_listener *l)
}
}
- if ((rv = nni_posix_tcp_alloc(&c, NULL)) != 0) {
+ if ((rv = nni_posix_tcp_alloc(&c, NULL, newfd)) != 0) {
close(newfd);
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
continue;
}
- if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
- close(newfd);
- nng_stream_stop(&c->stream);
- nng_stream_free(&c->stream);
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, rv);
- continue;
- }
-
- nni_posix_tcp_init(c, pfd);
-
ka = l->keepalive ? 1 : 0;
nd = l->nodelay ? 1 : 0;
nni_aio_list_remove(aio);
@@ -169,10 +154,9 @@ tcp_listener_doaccept(nni_tcp_listener *l)
}
static void
-tcp_listener_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
+tcp_listener_cb(void *arg, unsigned events)
{
nni_tcp_listener *l = arg;
- NNI_ARG_UNUSED(pfd);
nni_mtx_lock(&l->mtx);
if ((events & NNI_POLL_INVAL) != 0) {
@@ -209,7 +193,6 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa)
struct sockaddr_storage ss;
int rv;
int fd;
- nni_posix_pfd *pfd;
if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
#ifdef NNG_ENABLE_IPV6
@@ -236,12 +219,6 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa)
return (nni_plat_errno(errno));
}
- if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
- nni_mtx_unlock(&l->mtx);
- (void) close(fd);
- return (rv);
- }
-
// On the Windows Subsystem for Linux, SO_REUSEADDR behaves like Windows
// SO_REUSEADDR, which is almost completely different (and wrong!) from
// traditional SO_REUSEADDR.
@@ -257,8 +234,8 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa)
if (bind(fd, (struct sockaddr *) &ss, len) < 0) {
rv = nni_plat_errno(errno);
+ (void) close(fd);
nni_mtx_unlock(&l->mtx);
- nni_posix_pfd_fini(pfd);
return (rv);
}
@@ -266,14 +243,13 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa)
// bad things are going to happen.
if (listen(fd, 128) != 0) {
rv = nni_plat_errno(errno);
+ (void) close(fd);
nni_mtx_unlock(&l->mtx);
- nni_posix_pfd_fini(pfd);
return (rv);
}
- nni_posix_pfd_set_cb(pfd, tcp_listener_cb, l);
+ nni_posix_pfd_init(&l->pfd, fd, tcp_listener_cb, l);
- l->pfd = pfd;
l->started = true;
nni_mtx_unlock(&l->mtx);
@@ -283,23 +259,18 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa)
void
nni_tcp_listener_stop(nni_tcp_listener *l)
{
- nni_posix_pfd *pfd;
-
nni_mtx_lock(&l->mtx);
tcp_listener_doclose(l);
- pfd = l->pfd;
- l->pfd = NULL;
nni_mtx_unlock(&l->mtx);
- if (pfd != NULL) {
- nni_posix_pfd_fini(pfd);
- }
+ nni_posix_pfd_stop(&l->pfd);
}
void
nni_tcp_listener_fini(nni_tcp_listener *l)
{
nni_tcp_listener_stop(l);
+ nni_posix_pfd_fini(&l->pfd);
nni_mtx_fini(&l->mtx);
NNI_FREE_STRUCT(l);
}
@@ -350,7 +321,7 @@ tcp_listener_get_locaddr(void *arg, void *buf, size_t *szp, nni_type t)
struct sockaddr_storage ss;
socklen_t len = sizeof(ss);
(void) getsockname(
- nni_posix_pfd_fd(l->pfd), (void *) &ss, &len);
+ nni_posix_pfd_fd(&l->pfd), (void *) &ss, &len);
(void) nni_posix_sockaddr2nn(&sa, &ss, len);
} else {
sa.s_family = NNG_AF_UNSPEC;
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index 8d0d4a42..69535aa4 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -48,11 +48,11 @@
#endif
struct nni_plat_udp {
- nni_posix_pfd *udp_pfd;
- int udp_fd;
- nni_list udp_recvq;
- nni_list udp_sendq;
- nni_mtx udp_mtx;
+ nni_posix_pfd udp_pfd;
+ int udp_fd;
+ nni_list udp_recvq;
+ nni_list udp_sendq;
+ nni_mtx udp_mtx;
};
static void
@@ -175,10 +175,9 @@ nni_posix_udp_dosend(nni_plat_udp *udp)
// This function is called by the poller on activity on the FD.
static void
-nni_posix_udp_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
+nni_posix_udp_cb(void *arg, unsigned events)
{
nni_plat_udp *udp = arg;
- NNI_ARG_UNUSED(pfd);
nni_mtx_lock(&udp->udp_mtx);
if (events & NNI_POLL_IN) {
@@ -199,7 +198,7 @@ nni_posix_udp_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
}
if (events) {
int rv;
- rv = nni_posix_pfd_arm(udp->udp_pfd, events);
+ rv = nni_posix_pfd_arm(&udp->udp_pfd, events);
if (rv != 0) {
nni_posix_udp_doerror(udp, rv);
}
@@ -225,6 +224,8 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
return (NNG_ENOMEM);
}
nni_mtx_init(&udp->udp_mtx);
+ nni_aio_list_init(&udp->udp_recvq);
+ nni_aio_list_init(&udp->udp_sendq);
udp->udp_fd = socket(sa.ss_family, SOCK_DGRAM, IPPROTO_UDP);
if (udp->udp_fd < 0) {
@@ -241,16 +242,8 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
NNI_FREE_STRUCT(udp);
return (rv);
}
- if ((rv = nni_posix_pfd_init(&udp->udp_pfd, udp->udp_fd)) != 0) {
- (void) close(udp->udp_fd);
- nni_mtx_fini(&udp->udp_mtx);
- NNI_FREE_STRUCT(udp);
- return (rv);
- }
- nni_posix_pfd_set_cb(udp->udp_pfd, nni_posix_udp_cb, udp);
- nni_aio_list_init(&udp->udp_recvq);
- nni_aio_list_init(&udp->udp_sendq);
+ nni_posix_pfd_init(&udp->udp_pfd, udp->udp_fd, nni_posix_udp_cb, udp);
*upp = udp;
return (0);
@@ -259,12 +252,13 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
void
nni_plat_udp_close(nni_plat_udp *udp)
{
- nni_posix_pfd_fini(udp->udp_pfd);
+ nni_posix_pfd_stop(&udp->udp_pfd);
nni_mtx_lock(&udp->udp_mtx);
nni_posix_udp_doclose(udp);
nni_mtx_unlock(&udp->udp_mtx);
+ nni_posix_pfd_fini(&udp->udp_pfd);
(void) close(udp->udp_fd);
nni_mtx_fini(&udp->udp_mtx);
NNI_FREE_STRUCT(udp);
@@ -298,7 +292,8 @@ nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio)
}
nni_list_append(&udp->udp_recvq, aio);
if (nni_list_first(&udp->udp_recvq) == aio) {
- if ((rv = nni_posix_pfd_arm(udp->udp_pfd, NNI_POLL_IN)) != 0) {
+ if ((rv = nni_posix_pfd_arm(&udp->udp_pfd, NNI_POLL_IN)) !=
+ 0) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
}
@@ -321,7 +316,7 @@ nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio)
}
nni_list_append(&udp->udp_sendq, aio);
if (nni_list_first(&udp->udp_sendq) == aio) {
- if ((rv = nni_posix_pfd_arm(udp->udp_pfd, NNI_POLL_OUT)) !=
+ if ((rv = nni_posix_pfd_arm(&udp->udp_pfd, NNI_POLL_OUT)) !=
0) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);