aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/platform/posix/posix_ipc.h9
-rw-r--r--src/platform/posix/posix_ipcconn.c63
-rw-r--r--src/platform/posix/posix_ipcdial.c23
-rw-r--r--src/platform/posix/posix_ipclisten.c56
-rw-r--r--src/platform/posix/posix_pollq.h8
-rw-r--r--src/platform/posix/posix_pollq_epoll.c146
-rw-r--r--src/platform/posix/posix_pollq_epoll.h4
-rw-r--r--src/platform/posix/posix_pollq_kqueue.c128
-rw-r--r--src/platform/posix/posix_pollq_kqueue.h4
-rw-r--r--src/platform/posix/posix_pollq_poll.c55
-rw-r--r--src/platform/posix/posix_pollq_port.c50
-rw-r--r--src/platform/posix/posix_pollq_select.c82
-rw-r--r--src/platform/posix/posix_pollq_select.h1
-rw-r--r--src/platform/posix/posix_sockfd.c57
-rw-r--r--src/platform/posix/posix_tcp.h8
-rw-r--r--src/platform/posix/posix_tcpconn.c64
-rw-r--r--src/platform/posix/posix_tcpdial.c23
-rw-r--r--src/platform/posix/posix_tcplisten.c77
-rw-r--r--src/platform/posix/posix_udp.c35
19 files changed, 366 insertions, 527 deletions
diff --git a/src/platform/posix/posix_ipc.h b/src/platform/posix/posix_ipc.h
index 98af298b..3f3549c6 100644
--- a/src/platform/posix/posix_ipc.h
+++ b/src/platform/posix/posix_ipc.h
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
@@ -22,7 +22,7 @@
struct nni_ipc_conn {
nng_stream stream;
- nni_posix_pfd *pfd;
+ nni_posix_pfd pfd;
nni_list readq;
nni_list writeq;
bool closed;
@@ -44,10 +44,9 @@ struct nni_ipc_dialer {
};
extern int nni_posix_ipc_alloc(
- nni_ipc_conn **, nni_sockaddr *, nni_ipc_dialer *);
-extern void nni_posix_ipc_init(nni_ipc_conn *, nni_posix_pfd *);
-extern void nni_posix_ipc_start(nni_ipc_conn *);
+ nni_ipc_conn **, nni_sockaddr *, nni_ipc_dialer *, int);
extern void nni_posix_ipc_dialer_rele(nni_ipc_dialer *);
+extern void nni_posix_ipc_dialer_cb(void *arg, unsigned events);
#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c
index a198b87f..69ce2d32 100644
--- a/src/platform/posix/posix_ipcconn.c
+++ b/src/platform/posix/posix_ipcconn.c
@@ -1,5 +1,5 @@
//
-// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
@@ -34,7 +34,7 @@ ipc_dowrite(ipc_conn *c)
nni_aio *aio;
int fd;
- if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ if (c->closed || ((fd = nni_posix_pfd_fd(&c->pfd)) < 0)) {
return;
}
@@ -103,7 +103,7 @@ ipc_doread(ipc_conn *c)
nni_aio *aio;
int fd;
- if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ if (c->closed || ((fd = nni_posix_pfd_fd(&c->pfd)) < 0)) {
return;
}
@@ -174,7 +174,7 @@ ipc_error(void *arg, int err)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, err);
}
- nni_posix_pfd_close(c->pfd);
+ nni_posix_pfd_close(&c->pfd);
nni_mtx_unlock(&c->mtx);
}
@@ -191,18 +191,20 @@ ipc_close(void *arg)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
- if (c->pfd != NULL) {
- nni_posix_pfd_close(c->pfd);
- }
+ nni_posix_pfd_close(&c->pfd);
}
nni_mtx_unlock(&c->mtx);
}
static void
-ipc_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
+ipc_cb(void *arg, unsigned events)
{
ipc_conn *c = arg;
+ if (c->dial_aio != NULL) {
+ nni_posix_ipc_dialer_cb(arg, events);
+ return;
+ }
if (events & (NNI_POLL_HUP | NNI_POLL_ERR | NNI_POLL_INVAL)) {
ipc_error(c, NNG_ECONNSHUT);
return;
@@ -222,7 +224,7 @@ ipc_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
events |= NNI_POLL_IN;
}
if ((!c->closed) && (events != 0)) {
- nni_posix_pfd_arm(pfd, events);
+ nni_posix_pfd_arm(&c->pfd, events);
}
nni_mtx_unlock(&c->mtx);
}
@@ -264,7 +266,7 @@ ipc_send(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->writeq) == aio) {
- nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT);
+ nni_posix_pfd_arm(&c->pfd, NNI_POLL_OUT);
}
}
nni_mtx_unlock(&c->mtx);
@@ -298,7 +300,7 @@ ipc_recv(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->readq) == aio) {
- nni_posix_pfd_arm(c->pfd, NNI_POLL_IN);
+ nni_posix_pfd_arm(&c->pfd, NNI_POLL_IN);
}
}
nni_mtx_unlock(&c->mtx);
@@ -312,7 +314,7 @@ ipc_get_peer_uid(void *arg, void *buf, size_t *szp, nni_type t)
uint64_t ignore;
uint64_t id = 0;
- if ((rv = nni_posix_peerid(nni_posix_pfd_fd(c->pfd), &id, &ignore,
+ if ((rv = nni_posix_peerid(nni_posix_pfd_fd(&c->pfd), &id, &ignore,
&ignore, &ignore)) != 0) {
return (rv);
}
@@ -327,7 +329,7 @@ ipc_get_peer_gid(void *arg, void *buf, size_t *szp, nni_type t)
uint64_t ignore;
uint64_t id = 0;
- if ((rv = nni_posix_peerid(nni_posix_pfd_fd(c->pfd), &ignore, &id,
+ if ((rv = nni_posix_peerid(nni_posix_pfd_fd(&c->pfd), &ignore, &id,
&ignore, &ignore)) != 0) {
return (rv);
}
@@ -342,7 +344,7 @@ ipc_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_type t)
uint64_t ignore;
uint64_t id = 0;
- if ((rv = nni_posix_peerid(nni_posix_pfd_fd(c->pfd), &ignore, &ignore,
+ if ((rv = nni_posix_peerid(nni_posix_pfd_fd(&c->pfd), &ignore, &ignore,
&ignore, &id)) != 0) {
return (rv);
}
@@ -361,7 +363,7 @@ ipc_get_peer_pid(void *arg, void *buf, size_t *szp, nni_type t)
uint64_t ignore;
uint64_t id = 0;
- if ((rv = nni_posix_peerid(nni_posix_pfd_fd(c->pfd), &ignore, &ignore,
+ if ((rv = nni_posix_peerid(nni_posix_pfd_fd(&c->pfd), &ignore, &ignore,
&id, &ignore)) != 0) {
return (rv);
}
@@ -379,27 +381,14 @@ ipc_get_addr(void *arg, void *buf, size_t *szp, nni_type t)
return (nni_copyout_sockaddr(&c->sa, buf, szp, t));
}
-void
-nni_posix_ipc_start(nni_ipc_conn *c)
-{
- nni_posix_pfd_set_cb(c->pfd, ipc_cb, c);
-}
-
static void
ipc_stop(void *arg)
{
- ipc_conn *c = arg;
- nni_posix_pfd *pfd;
+ ipc_conn *c = arg;
ipc_close(c);
- nni_mtx_lock(&c->mtx);
- pfd = c->pfd;
- c->pfd = NULL;
- nni_mtx_unlock(&c->mtx);
- if (pfd != NULL) {
- nni_posix_pfd_fini(pfd);
- }
+ nni_posix_pfd_stop(&c->pfd);
}
static void
@@ -408,11 +397,13 @@ ipc_reap(void *arg)
ipc_conn *c = arg;
ipc_stop(c);
- nni_mtx_fini(&c->mtx);
if (c->dialer != NULL) {
nni_posix_ipc_dialer_rele(c->dialer);
}
+ nni_posix_pfd_fini(&c->pfd);
+
+ nni_mtx_fini(&c->mtx);
NNI_FREE_STRUCT(c);
}
@@ -472,7 +463,8 @@ ipc_set(void *arg, const char *name, const void *val, size_t sz, nni_type t)
}
int
-nni_posix_ipc_alloc(nni_ipc_conn **cp, nni_sockaddr *sa, nni_ipc_dialer *d)
+nni_posix_ipc_alloc(
+ nni_ipc_conn **cp, nni_sockaddr *sa, nni_ipc_dialer *d, int fd)
{
ipc_conn *c;
@@ -494,13 +486,8 @@ nni_posix_ipc_alloc(nni_ipc_conn **cp, nni_sockaddr *sa, nni_ipc_dialer *d)
nni_mtx_init(&c->mtx);
nni_aio_list_init(&c->readq);
nni_aio_list_init(&c->writeq);
+ nni_posix_pfd_init(&c->pfd, fd, ipc_cb, c);
*cp = c;
return (0);
}
-
-void
-nni_posix_ipc_init(nni_ipc_conn *c, nni_posix_pfd *pfd)
-{
- c->pfd = pfd;
-}
diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c
index 667d8262..ae98c4f4 100644
--- a/src/platform/posix/posix_ipcdial.c
+++ b/src/platform/posix/posix_ipcdial.c
@@ -106,8 +106,8 @@ ipc_dialer_cancel(nni_aio *aio, void *arg, int rv)
nng_stream_free(&c->stream);
}
-static void
-ipc_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg)
+void
+nni_posix_ipc_dialer_cb(void *arg, unsigned ev)
{
nni_ipc_conn *c = arg;
nni_ipc_dialer *d = c->dialer;
@@ -126,7 +126,7 @@ ipc_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg)
} else {
socklen_t sz = sizeof(int);
- int fd = nni_posix_pfd_fd(pfd);
+ int fd = nni_posix_pfd_fd(&c->pfd);
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
rv = errno;
}
@@ -153,7 +153,6 @@ ipc_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg)
return;
}
- nni_posix_ipc_start(c);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
}
@@ -188,24 +187,13 @@ ipc_dialer_dial(void *arg, nni_aio *aio)
nni_atomic_inc(&d->ref);
- if ((rv = nni_posix_ipc_alloc(&c, &d->sa, d)) != 0) {
+ if ((rv = nni_posix_ipc_alloc(&c, &d->sa, d, fd)) != 0) {
(void) close(fd);
nni_posix_ipc_dialer_rele(d);
nni_aio_finish_error(aio, rv);
return;
}
- // This arranges for the fd to be in non-blocking mode, and adds the
- // poll fd to the list.
- if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
- // the error label unlocks this
- nni_mtx_lock(&d->mtx);
- goto error;
- }
-
- nni_posix_ipc_init(c, pfd);
- nni_posix_pfd_set_cb(pfd, ipc_dialer_cb, c);
-
nni_mtx_lock(&d->mtx);
if (d->closed) {
rv = NNG_ECLOSED;
@@ -225,10 +213,10 @@ ipc_dialer_dial(void *arg, nni_aio *aio)
goto error;
}
// Asynchronous connect.
+ c->dial_aio = aio;
if ((rv = nni_posix_pfd_arm(pfd, NNI_POLL_OUT)) != 0) {
goto error;
}
- c->dial_aio = aio;
nni_aio_set_prov_data(aio, c);
nni_list_append(&d->connq, aio);
nni_mtx_unlock(&d->mtx);
@@ -238,7 +226,6 @@ ipc_dialer_dial(void *arg, nni_aio *aio)
// on loop back, and probably not on every platform.
nni_aio_set_prov_data(aio, NULL);
nni_mtx_unlock(&d->mtx);
- nni_posix_ipc_start(c);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
return;
diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c
index a122d7bb..7e0250f4 100644
--- a/src/platform/posix/posix_ipclisten.c
+++ b/src/platform/posix/posix_ipclisten.c
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
@@ -31,7 +31,7 @@
typedef struct {
nng_stream_listener sl;
- nni_posix_pfd *pfd;
+ nni_posix_pfd pfd;
nng_sockaddr sa;
nni_list acceptq;
bool started;
@@ -53,9 +53,7 @@ ipc_listener_doclose(ipc_listener *l)
nni_aio_finish_error(aio, NNG_ECLOSED);
}
- if (l->pfd != NULL) {
- nni_posix_pfd_close(l->pfd);
- }
+ nni_posix_pfd_close(&l->pfd);
if (l->started && ((path = l->path) != NULL)) {
l->path = NULL;
(void) unlink(path);
@@ -75,18 +73,13 @@ ipc_listener_close(void *arg)
static void
ipc_listener_stop(void *arg)
{
- ipc_listener *l = arg;
- nni_posix_pfd *pfd;
+ ipc_listener *l = arg;
nni_mtx_lock(&l->mtx);
ipc_listener_doclose(l);
- pfd = l->pfd;
- l->pfd = NULL;
nni_mtx_unlock(&l->mtx);
- if (pfd != NULL) {
- nni_posix_pfd_fini(pfd);
- }
+ nni_posix_pfd_stop(&l->pfd);
}
static void
@@ -95,13 +88,12 @@ ipc_listener_doaccept(ipc_listener *l)
nni_aio *aio;
while ((aio = nni_list_first(&l->acceptq)) != NULL) {
- int newfd;
- int fd;
- int rv;
- nni_posix_pfd *pfd;
- nni_ipc_conn *c;
+ int newfd;
+ int fd;
+ int rv;
+ nni_ipc_conn *c;
- fd = nni_posix_pfd_fd(l->pfd);
+ fd = nni_posix_pfd_fd(&l->pfd);
#ifdef NNG_USE_ACCEPT4
newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC);
@@ -119,7 +111,7 @@ ipc_listener_doaccept(ipc_listener *l)
case EWOULDBLOCK:
#endif
#endif
- rv = nni_posix_pfd_arm(l->pfd, NNI_POLL_IN);
+ rv = nni_posix_pfd_arm(&l->pfd, NNI_POLL_IN);
if (rv != 0) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -141,35 +133,23 @@ ipc_listener_doaccept(ipc_listener *l)
}
}
- if ((rv = nni_posix_ipc_alloc(&c, &l->sa, NULL)) != 0) {
+ if ((rv = nni_posix_ipc_alloc(&c, &l->sa, NULL, newfd)) != 0) {
(void) close(newfd);
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
continue;
}
- if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
- nng_stream_stop(&c->stream);
- nng_stream_free(&c->stream);
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, rv);
- continue;
- }
-
- nni_posix_ipc_init(c, pfd);
-
nni_aio_list_remove(aio);
- nni_posix_ipc_start(c);
nni_aio_set_output(aio, 0, c);
nni_aio_finish(aio, 0, 0);
}
}
static void
-ipc_listener_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
+ipc_listener_cb(void *arg, unsigned events)
{
ipc_listener *l = arg;
- NNI_ARG_UNUSED(pfd);
nni_mtx_lock(&l->mtx);
if ((events & NNI_POLL_INVAL) != 0) {
@@ -325,7 +305,6 @@ ipc_listener_listen(void *arg)
struct sockaddr_storage ss;
int rv;
int fd;
- nni_posix_pfd *pfd;
char *path;
if ((len = nni_posix_nn2sockaddr(&ss, &l->sa)) < sizeof(sa_family_t)) {
@@ -380,7 +359,7 @@ ipc_listener_listen(void *arg)
(listen(fd, 128) != 0)) {
rv = nni_plat_errno(errno);
}
- if ((rv != 0) || ((rv = nni_posix_pfd_init(&pfd, fd)) != 0)) {
+ if (rv != 0) {
nni_mtx_unlock(&l->mtx);
(void) close(fd);
if (path != NULL) {
@@ -390,6 +369,8 @@ ipc_listener_listen(void *arg)
return (rv);
}
+ nni_posix_pfd_init(&l->pfd, fd, ipc_listener_cb, l);
+
#ifdef NNG_HAVE_ABSTRACT_SOCKETS
// If the original address was for a system assigned value,
// then figure out what we got. This is analogous to TCP
@@ -411,9 +392,6 @@ ipc_listener_listen(void *arg)
}
#endif
- nni_posix_pfd_set_cb(pfd, ipc_listener_cb, l);
-
- l->pfd = pfd;
l->started = true;
l->path = path;
nni_mtx_unlock(&l->mtx);
@@ -428,6 +406,7 @@ ipc_listener_free(void *arg)
ipc_listener_stop(l);
+ nni_posix_pfd_fini(&l->pfd);
nni_mtx_fini(&l->mtx);
NNI_FREE_STRUCT(l);
}
@@ -512,7 +491,6 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url)
nni_mtx_init(&l->mtx);
nni_aio_list_init(&l->acceptq);
- l->pfd = NULL;
l->closed = false;
l->started = false;
l->perms = 0;
diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h
index c79a3e18..121834cf 100644
--- a/src/platform/posix/posix_pollq.h
+++ b/src/platform/posix/posix_pollq.h
@@ -22,7 +22,7 @@
#include "core/nng_impl.h"
typedef struct nni_posix_pfd nni_posix_pfd;
-typedef void (*nni_posix_pfd_cb)(nni_posix_pfd *, unsigned, void *);
+typedef void (*nni_posix_pfd_cb)(void *, unsigned);
#if defined(NNG_POLLQ_KQUEUE)
#include "posix_pollq_kqueue.h"
@@ -33,17 +33,17 @@ typedef void (*nni_posix_pfd_cb)(nni_posix_pfd *, unsigned, void *);
#elif defined(NNG_POLLQ_POLL)
#include "posix_pollq_epoll.h"
#elif defined(NNG_POLLQ_SELECT)
-#include "posix_pollq_epoll.h"
+#include "posix_pollq_select.h"
#else
#error "No suitable poller defined"
#endif
-extern int nni_posix_pfd_init(nni_posix_pfd **, int);
+extern void nni_posix_pfd_init(nni_posix_pfd *, int, nni_posix_pfd_cb, void *);
extern void nni_posix_pfd_fini(nni_posix_pfd *);
+extern void nni_posix_pfd_stop(nni_posix_pfd *);
extern int nni_posix_pfd_arm(nni_posix_pfd *, unsigned);
extern int nni_posix_pfd_fd(nni_posix_pfd *);
extern void nni_posix_pfd_close(nni_posix_pfd *);
-extern void nni_posix_pfd_set_cb(nni_posix_pfd *, nni_posix_pfd_cb, void *);
#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c
index d09289e4..e789e5d2 100644
--- a/src/platform/posix/posix_pollq_epoll.c
+++ b/src/platform/posix/posix_pollq_epoll.c
@@ -61,32 +61,28 @@ typedef struct nni_posix_pollq {
// single global instance for now.
static nni_posix_pollq nni_posix_global_pollq;
-int
-nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
+void
+nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg)
{
- nni_posix_pfd *pfd;
nni_posix_pollq *pq;
struct epoll_event ev;
- int rv;
pq = &nni_posix_global_pollq;
(void) fcntl(fd, F_SETFD, FD_CLOEXEC);
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
- if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
- return (NNG_ENOMEM);
- }
nni_mtx_init(&pfd->mtx);
nni_cv_init(&pfd->cv, &pq->mtx);
+ nni_atomic_flag_reset(&pfd->stopped);
+ nni_atomic_flag_reset(&pfd->closed);
- pfd->pq = pq;
- pfd->fd = fd;
- pfd->cb = NULL;
- pfd->arg = NULL;
- pfd->events = 0;
- pfd->closing = false;
- pfd->closed = false;
+ pfd->pq = pq;
+ pfd->fd = fd;
+ pfd->cb = cb;
+ pfd->arg = arg;
+ pfd->events = 0;
+ pfd->reap = false;
NNI_LIST_NODE_INIT(&pfd->node);
@@ -95,16 +91,9 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
ev.events = 0;
ev.data.ptr = pfd;
- if (epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
- rv = nni_plat_errno(errno);
- nni_cv_fini(&pfd->cv);
- nni_mtx_fini(&pfd->mtx);
- NNI_FREE_STRUCT(pfd);
- return (rv);
- }
-
- *pfdp = pfd;
- return (0);
+ // if this fails the system is probably out of memory - it will fail in
+ // arm
+ (void) epoll_ctl(pq->epfd, EPOLL_CTL_ADD, fd, &ev);
}
int
@@ -118,20 +107,18 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events)
// epoll implementation.
nni_mtx_lock(&pfd->mtx);
- if (!pfd->closing) {
- struct epoll_event ev;
- pfd->events |= events;
- events = pfd->events;
-
- memset(&ev, 0, sizeof(ev));
- ev.events = events | NNI_EPOLL_FLAGS;
- ev.data.ptr = pfd;
-
- if (epoll_ctl(pq->epfd, EPOLL_CTL_MOD, pfd->fd, &ev) != 0) {
- int rv = nni_plat_errno(errno);
- nni_mtx_unlock(&pfd->mtx);
- return (rv);
- }
+ struct epoll_event ev;
+ pfd->events |= events;
+ events = pfd->events;
+
+ memset(&ev, 0, sizeof(ev));
+ ev.events = events | NNI_EPOLL_FLAGS;
+ ev.data.ptr = pfd;
+
+ if (epoll_ctl(pq->epfd, EPOLL_CTL_MOD, pfd->fd, &ev) != 0) {
+ int rv = nni_plat_errno(errno);
+ nni_mtx_unlock(&pfd->mtx);
+ return (rv);
}
nni_mtx_unlock(&pfd->mtx);
return (0);
@@ -144,44 +131,44 @@ nni_posix_pfd_fd(nni_posix_pfd *pfd)
}
void
-nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg)
-{
- nni_mtx_lock(&pfd->mtx);
- pfd->cb = cb;
- pfd->arg = arg;
- nni_mtx_unlock(&pfd->mtx);
-}
-
-void
nni_posix_pfd_close(nni_posix_pfd *pfd)
{
+ nni_posix_pollq *pq = pfd->pq;
+ if (pq == NULL) {
+ return;
+ }
+ if (nni_atomic_flag_test_and_set(&pfd->closed)) {
+ return;
+ }
+
nni_mtx_lock(&pfd->mtx);
- if (!pfd->closing) {
- nni_posix_pollq *pq = pfd->pq;
- struct epoll_event ev; // Not actually used.
- pfd->closing = true;
+ struct epoll_event ev; // Not actually used.
- (void) shutdown(pfd->fd, SHUT_RDWR);
- (void) epoll_ctl(pq->epfd, EPOLL_CTL_DEL, pfd->fd, &ev);
- }
+ (void) shutdown(pfd->fd, SHUT_RDWR);
+ (void) epoll_ctl(pq->epfd, EPOLL_CTL_DEL, pfd->fd, &ev);
nni_mtx_unlock(&pfd->mtx);
}
void
-nni_posix_pfd_fini(nni_posix_pfd *pfd)
+nni_posix_pfd_stop(nni_posix_pfd *pfd)
{
- nni_posix_pollq *pq = pfd->pq;
+ nni_posix_pollq *pq = pfd->pq;
+ uint64_t one = 1;
- nni_posix_pfd_close(pfd);
+ if (pq == NULL) {
+ return;
+ }
+ if (nni_atomic_flag_test_and_set(&pfd->stopped)) {
+ return;
+ }
// We have to synchronize with the pollq thread (unless we are
// on that thread!)
NNI_ASSERT(!nni_thr_is_self(&pq->thr));
- uint64_t one = 1;
-
nni_mtx_lock(&pq->mtx);
nni_list_append(&pq->reapq, pfd);
+ pfd->reap = true;
// Wake the remote side. For now we assume this always
// succeeds. The only failure modes here occur when we
@@ -195,33 +182,41 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd)
nni_panic("BUG! write to epoll fd incorrect!");
}
- while (!pfd->closed) {
+ while (pfd->reap) {
nni_cv_wait(&pfd->cv);
}
nni_mtx_unlock(&pq->mtx);
+}
+
+void
+nni_posix_pfd_fini(nni_posix_pfd *pfd)
+{
+ nni_posix_pollq *pq = pfd->pq;
+ if (pq == NULL) {
+ return;
+ }
+
+ nni_posix_pfd_stop(pfd);
// We're exclusive now.
(void) close(pfd->fd);
nni_cv_fini(&pfd->cv);
nni_mtx_fini(&pfd->mtx);
- NNI_FREE_STRUCT(pfd);
}
static void
nni_posix_pollq_reap(nni_posix_pollq *pq)
{
nni_posix_pfd *pfd;
- nni_mtx_lock(&pq->mtx);
while ((pfd = nni_list_first(&pq->reapq)) != NULL) {
nni_list_remove(&pq->reapq, pfd);
// Let fini know we're done with it, and it's safe to
// remove.
- pfd->closed = true;
+ pfd->reap = false;
nni_cv_wake(&pfd->cv);
}
- nni_mtx_unlock(&pq->mtx);
}
static void
@@ -249,37 +244,28 @@ nni_posix_poll_thr(void *arg)
if ((ev->data.ptr == NULL) &&
(ev->events & (unsigned) POLLIN)) {
uint64_t clear;
- if (read(pq->evfd, &clear, sizeof(clear)) !=
- sizeof(clear)) {
- nni_panic("read from evfd incorrect!");
- }
+ (void) read(pq->evfd, &clear, sizeof(clear));
reap = true;
} else {
- nni_posix_pfd *pfd = ev->data.ptr;
- nni_posix_pfd_cb cb;
- void *cbarg;
- unsigned mask;
+ nni_posix_pfd *pfd = ev->data.ptr;
+ unsigned mask;
mask = ev->events &
- ((unsigned) EPOLLIN | (unsigned) EPOLLOUT |
- (unsigned) EPOLLERR);
+ ((unsigned) (EPOLLIN | EPOLLOUT |
+ EPOLLERR));
nni_mtx_lock(&pfd->mtx);
pfd->events &= ~mask;
- cb = pfd->cb;
- cbarg = pfd->arg;
nni_mtx_unlock(&pfd->mtx);
// Execute the callback with lock released
- if (cb != NULL) {
- cb(pfd, mask, cbarg);
- }
+ pfd->cb(pfd->arg, mask);
}
}
if (reap) {
- nni_posix_pollq_reap(pq);
nni_mtx_lock(&pq->mtx);
+ nni_posix_pollq_reap(pq);
if (pq->close) {
nni_mtx_unlock(&pq->mtx);
return;
diff --git a/src/platform/posix/posix_pollq_epoll.h b/src/platform/posix/posix_pollq_epoll.h
index ee60dc56..0be2a4c5 100644
--- a/src/platform/posix/posix_pollq_epoll.h
+++ b/src/platform/posix/posix_pollq_epoll.h
@@ -21,12 +21,12 @@ struct nni_posix_pfd {
int fd;
nni_posix_pfd_cb cb;
void *arg;
- bool closed;
- bool closing;
bool reap;
unsigned events;
nni_mtx mtx;
nni_cv cv;
+ nni_atomic_flag stopped;
+ nni_atomic_flag closed;
};
#define NNI_POLL_IN ((unsigned) POLLIN)
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c
index e3727ed3..c754535a 100644
--- a/src/platform/posix/posix_pollq_kqueue.c
+++ b/src/platform/posix/posix_pollq_kqueue.c
@@ -40,10 +40,9 @@ typedef struct nni_posix_pollq {
// single global instance for now
static nni_posix_pollq nni_posix_global_pollq;
-int
-nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
+void
+nni_posix_pfd_init(nni_posix_pfd *pf, int fd, nni_posix_pfd_cb cb, void *arg)
{
- nni_posix_pfd *pf;
nni_posix_pollq *pq;
struct kevent ev[2];
unsigned flags = EV_ADD | EV_DISABLE | EV_CLEAR;
@@ -61,88 +60,97 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
pq = &nni_posix_global_pollq;
- if ((pf = NNI_ALLOC_STRUCT(pf)) == NULL) {
- return (NNG_ENOMEM);
- }
-
nni_mtx_init(&pf->mtx);
nni_cv_init(&pf->cv, &pq->mtx);
pf->pq = pq;
pf->fd = fd;
- pf->cb = NULL;
- pf->arg = NULL;
+ pf->cb = cb;
+ pf->arg = arg;
pf->events = 0;
- pf->closed = false;
+
+ nni_atomic_flag_reset(&pf->closed);
+ nni_atomic_flag_reset(&pf->stopped);
NNI_LIST_NODE_INIT(&pf->node);
- *pfdp = pf;
// Create entries in the kevent queue, without enabling them.
EV_SET(&ev[0], (uintptr_t) fd, EVFILT_READ, flags, 0, 0, pf);
EV_SET(&ev[1], (uintptr_t) fd, EVFILT_WRITE, flags, 0, 0, pf);
- // We update the kqueue list, without polling for events.
- if (kevent(pq->kq, ev, 2, NULL, 0, NULL) != 0) {
- int rv;
- rv = nni_plat_errno(errno);
- nni_cv_fini(&pf->cv);
- nni_mtx_fini(&pf->mtx);
- NNI_FREE_STRUCT(pf);
- return (rv);
- }
-
- return (0);
+ // This may fail, but if it does, we get another try with
+ // ARM. It's an attempt to preallocate anyway.
+ (void) kevent(pq->kq, ev, 2, NULL, 0, NULL);
}
void
nni_posix_pfd_close(nni_posix_pfd *pf)
{
nni_posix_pollq *pq = pf->pq;
+ struct kevent ev[2];
+ if (pq == NULL) {
+ return;
+ }
- nni_mtx_lock(&pq->mtx);
- if (!pf->closed) {
- struct kevent ev[2];
- pf->closed = true;
- EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf);
- EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf);
- (void) shutdown(pf->fd, SHUT_RDWR);
- // This should never fail -- no allocations, just deletion.
- (void) kevent(pq->kq, ev, 2, NULL, 0, NULL);
+ if (nni_atomic_flag_test_and_set(&pf->closed)) {
+ return;
}
+
+ nni_mtx_lock(&pq->mtx);
+ EV_SET(&ev[0], pf->fd, EVFILT_READ, EV_DELETE, 0, 0, pf);
+ EV_SET(&ev[1], pf->fd, EVFILT_WRITE, EV_DELETE, 0, 0, pf);
+ (void) shutdown(pf->fd, SHUT_RDWR);
+ // This should never fail -- no allocations, just deletion.
+ (void) kevent(pq->kq, ev, 2, NULL, 0, NULL);
nni_mtx_unlock(&pq->mtx);
}
void
-nni_posix_pfd_fini(nni_posix_pfd *pf)
+nni_posix_pfd_stop(nni_posix_pfd *pf)
{
- nni_posix_pollq *pq;
-
- pq = pf->pq;
+ nni_posix_pollq *pq = pf->pq;
- nni_posix_pfd_close(pf);
+ if (pq == NULL) {
+ return;
+ }
// All consumers take care to move finalization to the reap thread,
// unless they are synchronous on user threads.
NNI_ASSERT(!nni_thr_is_self(&pq->thr));
- nni_mtx_lock(&pq->mtx);
+ if (nni_atomic_flag_test_and_set(&pf->stopped)) {
+ return;
+ }
- // If we're running on the callback, then don't bother to kick
- // the pollq again. This is necessary because we cannot modify
- // the poller while it is polling.
- if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) {
+ nni_posix_pfd_close(pf);
+ nni_mtx_lock(&pq->mtx);
+ if (!pq->closed) {
nni_list_append(&pq->reapq, pf);
nni_plat_pipe_raise(pq->wake_wfd);
- while (nni_list_active(&pq->reapq, pf)) {
+ while (nni_list_node_active(&pf->node)) {
nni_cv_wait(&pf->cv);
}
}
nni_mtx_unlock(&pq->mtx);
+}
+
+void
+nni_posix_pfd_fini(nni_posix_pfd *pf)
+{
+ nni_posix_pollq *pq = pf->pq;
+
+ if (pq == NULL) {
+ return;
+ }
+
+ // All consumers take care to move finalization to the reap thread,
+ // unless they are synchronous on user threads.
+ NNI_ASSERT(!nni_thr_is_self(&pq->thr));
+
+ nni_posix_pfd_stop(pf);
(void) close(pf->fd);
nni_cv_fini(&pf->cv);
nni_mtx_fini(&pf->mtx);
- NNI_FREE_STRUCT(pf);
}
int
@@ -151,16 +159,6 @@ nni_posix_pfd_fd(nni_posix_pfd *pf)
return (pf->fd);
}
-void
-nni_posix_pfd_set_cb(nni_posix_pfd *pf, nni_posix_pfd_cb cb, void *arg)
-{
- NNI_ASSERT(cb != NULL); // must not be null when established.
- nni_mtx_lock(&pf->mtx);
- pf->cb = cb;
- pf->arg = arg;
- nni_mtx_unlock(&pf->mtx);
-}
-
int
nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events)
{
@@ -170,12 +168,8 @@ nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events)
nni_posix_pollq *pq = pf->pq;
nni_mtx_lock(&pf->mtx);
- if (pf->closed) {
- events = 0;
- } else {
- pf->events |= events;
- events = pf->events;
- }
+ pf->events |= events;
+ events = pf->events;
nni_mtx_unlock(&pf->mtx);
if (events == 0) {
@@ -218,12 +212,10 @@ nni_posix_poll_thr(void *arg)
nni_thr_set_name(NULL, "nng:poll:kqueue");
for (;;) {
- int n;
- struct kevent evs[NNI_MAX_KQUEUE_EVENTS];
- nni_posix_pfd *pf;
- nni_posix_pfd_cb cb;
- void *cbarg;
- unsigned revents;
+ int n;
+ struct kevent evs[NNI_MAX_KQUEUE_EVENTS];
+ nni_posix_pfd *pf;
+ unsigned revents;
n = kevent(pq->kq, NULL, 0, evs, NNI_MAX_KQUEUE_EVENTS, NULL);
@@ -253,14 +245,10 @@ nni_posix_poll_thr(void *arg)
}
nni_mtx_lock(&pf->mtx);
- cb = pf->cb;
- cbarg = pf->arg;
pf->events &= ~(revents);
nni_mtx_unlock(&pf->mtx);
- if (cb != NULL) {
- cb(pf, revents, cbarg);
- }
+ pf->cb(pf->arg, revents);
}
}
}
diff --git a/src/platform/posix/posix_pollq_kqueue.h b/src/platform/posix/posix_pollq_kqueue.h
index 74f8d9f8..a153b363 100644
--- a/src/platform/posix/posix_pollq_kqueue.h
+++ b/src/platform/posix/posix_pollq_kqueue.h
@@ -19,7 +19,9 @@ struct nni_posix_pfd {
int fd; // file descriptor to poll
void *arg; // user data
nni_posix_pfd_cb cb; // user callback on event
- bool closed;
+ nni_atomic_flag closed;
+ nni_atomic_flag stopped;
+ bool reaped;
unsigned events;
nni_cv cv; // signaled when poller has unregistered
nni_mtx mtx;
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index 7adef5ed..6011ab50 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -49,10 +49,9 @@ typedef struct nni_posix_pollq {
static nni_posix_pollq nni_posix_global_pollq;
-int
-nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
+void
+nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg)
{
- nni_posix_pfd *pfd;
nni_posix_pollq *pq = &nni_posix_global_pollq;
// Set this is as soon as possible (narrow the close-exec race as
@@ -68,32 +67,18 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
(void) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
#endif
- if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
- return (NNG_ENOMEM);
- }
NNI_LIST_NODE_INIT(&pfd->node);
nni_mtx_init(&pfd->mtx);
nni_cv_init(&pfd->cv, &pq->mtx);
pfd->fd = fd;
pfd->events = 0;
- pfd->cb = NULL;
- pfd->arg = NULL;
+ pfd->cb = cb;
+ pfd->arg = arg;
pfd->pq = pq;
nni_mtx_lock(&pq->mtx);
nni_list_append(&pq->addq, pfd);
nni_mtx_unlock(&pq->mtx);
nni_plat_pipe_raise(pq->wakewfd);
- *pfdp = pfd;
- return (0);
-}
-
-void
-nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg)
-{
- nni_mtx_lock(&pfd->mtx);
- pfd->cb = cb;
- pfd->arg = arg;
- nni_mtx_unlock(&pfd->mtx);
}
int
@@ -109,16 +94,22 @@ nni_posix_pfd_close(nni_posix_pfd *pfd)
}
void
-nni_posix_pfd_fini(nni_posix_pfd *pfd)
+nni_posix_pfd_stop(nni_posix_pfd *pfd)
{
nni_posix_pollq *pq = pfd->pq;
+ if (pq == NULL) {
+ return;
+ }
+
nni_posix_pfd_close(pfd);
nni_mtx_lock(&pq->mtx);
- if (nni_list_active(&pq->pollq, pfd)) {
- nni_list_remove(&pq->pollq, pfd);
+ if (!nni_list_active(&pq->pollq, pfd)) {
+ nni_mtx_unlock(&pq->mtx);
+ return;
}
+ nni_list_remove(&pq->pollq, pfd);
if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) {
nni_list_append(&pq->reapq, pfd);
@@ -128,12 +119,22 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd)
}
}
nni_mtx_unlock(&pq->mtx);
+}
+
+void
+nni_posix_pfd_fini(nni_posix_pfd *pfd)
+{
+ nni_posix_pollq *pq = pfd->pq;
+
+ if (pq == NULL) {
+ return;
+ }
+ nni_posix_pfd_stop(pfd);
// We're exclusive now.
(void) close(pfd->fd);
nni_cv_fini(&pfd->cv);
nni_mtx_fini(&pfd->mtx);
- NNI_FREE_STRUCT(pfd);
}
int
@@ -241,8 +242,6 @@ nni_posix_poll_thr(void *arg)
return;
}
} else {
- nni_posix_pfd_cb cb;
- void *arg;
if ((events & (POLLIN | POLLOUT)) != 0) {
// don't emit pollhup yet, we want
@@ -250,14 +249,10 @@ nni_posix_poll_thr(void *arg)
events &= ~POLLHUP;
}
nni_mtx_lock(&pfd->mtx);
- cb = pfd->cb;
- arg = pfd->arg;
pfd->events &= ~events;
nni_mtx_unlock(&pfd->mtx);
- if (cb) {
- cb(pfd, events, arg);
- }
+ pfd->cb(pfd->arg, events);
}
}
}
diff --git a/src/platform/posix/posix_pollq_port.c b/src/platform/posix/posix_pollq_port.c
index 86732e17..49658f3b 100644
--- a/src/platform/posix/posix_pollq_port.c
+++ b/src/platform/posix/posix_pollq_port.c
@@ -1,5 +1,5 @@
//
-// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -35,17 +35,13 @@ typedef struct nni_posix_pollq {
// single global instance for now
static nni_posix_pollq nni_posix_global_pollq;
-int
-nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
+void
+nni_posix_pfd_init(nni_posix_pfd *pfdp, int fd, nni_posix_pfd_cb cb, void *arg)
{
nni_posix_pollq *pq;
- nni_posix_pfd *pfd;
pq = &nni_posix_global_pollq;
- if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
- return (NNG_ENOMEM);
- }
(void) fcntl(fd, F_SETFD, FD_CLOEXEC);
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
@@ -55,10 +51,9 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
pfd->closing = false;
pfd->fd = fd;
pfd->pq = pq;
- pfd->cb = NULL;
+ pfd->cb = cb;
+ pfd->arg = arg;
pfd->data = NULL;
- *pfdp = pfd;
- return (0);
}
int
@@ -72,6 +67,10 @@ nni_posix_pfd_close(nni_posix_pfd *pfd)
{
nni_posix_pollq *pq = pfd->pq;
+ if (pq == NULL) {
+ return;
+ }
+
nni_mtx_lock(&pfd->mtx);
if (!pfd->closing) {
pfd->closing = true;
@@ -86,11 +85,13 @@ nni_posix_pfd_close(nni_posix_pfd *pfd)
}
void
-nni_posix_pfd_fini(nni_posix_pfd *pfd)
+nni_posix_pfd_stop(nni_posix_pfd *pfd)
{
nni_posix_pollq *pq = pfd->pq;
- nni_posix_pfd_close(pfd);
+ if (pq == NULL) {
+ return;
+ }
NNI_ASSERT(!nni_thr_is_self(&pq->thr));
@@ -101,18 +102,28 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd)
}
sched_yield(); // try again later...
}
-
nni_mtx_lock(&pfd->mtx);
while (!pfd->closed) {
nni_cv_wait(&pfd->cv);
}
nni_mtx_unlock(&pfd->mtx);
+}
+
+void
+nni_posix_pfd_fini(nni_posix_pfd *pfd)
+{
+ nni_posix_pollq *pq = pfd->pq;
+
+ if (pq == NULL) {
+ return;
+ }
+
+ nni_posix_pfd_stop(pfd);
// We're exclusive now.
(void) close(pfd->fd);
nni_cv_fini(&pfd->cv);
nni_mtx_fini(&pfd->mtx);
- NNI_FREE_STRUCT(pfd);
}
int
@@ -221,17 +232,6 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
return (0);
}
-void
-nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg)
-{
- NNI_ASSERT(cb != NULL); // must not be null when established.
-
- nni_mtx_lock(&pfd->mtx);
- pfd->cb = cb;
- pfd->data = arg;
- nni_mtx_unlock(&pfd->mtx);
-}
-
int
nni_posix_pollq_sysinit(nng_init_params *params)
{
diff --git a/src/platform/posix/posix_pollq_select.c b/src/platform/posix/posix_pollq_select.c
index 211c9328..0e24fa16 100644
--- a/src/platform/posix/posix_pollq_select.c
+++ b/src/platform/posix/posix_pollq_select.c
@@ -44,10 +44,9 @@ typedef struct nni_posix_pollq {
static nni_posix_pollq nni_posix_global_pollq;
-int
-nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
+void
+nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg)
{
- nni_posix_pfd *pfd;
nni_posix_pollq *pq = &nni_posix_global_pollq;
// Set this is as soon as possible (narrow the close-exec race as
@@ -56,43 +55,24 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
(void) fcntl(fd, F_SETFD, FD_CLOEXEC);
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
- if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
- return (NNG_ENOMEM);
- }
if (fd >= FD_SETSIZE) {
- return (NNG_EINVAL);
+ return;
}
nni_mtx_init(&pfd->mtx);
nni_cv_init(&pfd->cv, &pq->mtx);
- pfd->fd = fd;
- pfd->events = 0;
- pfd->cb = NULL;
- pfd->arg = NULL;
- pfd->pq = pq;
+ pfd->fd = fd;
+ pfd->events = 0;
+ pfd->cb = cb;
+ pfd->arg = arg;
+ pfd->pq = pq;
+ pfd->stopped = false;
+ pfd->reap = false;
nni_mtx_lock(&pq->mtx);
- if (pq->closing) {
- nni_mtx_unlock(&pq->mtx);
- nni_cv_fini(&pfd->cv);
- nni_mtx_fini(&pfd->mtx);
- NNI_FREE_STRUCT(pfd);
- return (NNG_ECLOSED);
- }
pq->pfds[fd] = pfd;
if (fd > pq->maxfd) {
pq->maxfd = fd;
}
nni_mtx_unlock(&pq->mtx);
- *pfdp = pfd;
- return (0);
-}
-
-void
-nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg)
-{
- nni_mtx_lock(&pfd->mtx);
- pfd->cb = cb;
- pfd->arg = arg;
- nni_mtx_unlock(&pfd->mtx);
}
int
@@ -104,34 +84,50 @@ nni_posix_pfd_fd(nni_posix_pfd *pfd)
void
nni_posix_pfd_close(nni_posix_pfd *pfd)
{
- (void) shutdown(pfd->fd, SHUT_RDWR);
+ if (pfd->pq != NULL) {
+ (void) shutdown(pfd->fd, SHUT_RDWR);
+ }
}
void
-nni_posix_pfd_fini(nni_posix_pfd *pfd)
+nni_posix_pfd_stop(nni_posix_pfd *pfd)
{
nni_posix_pollq *pq = pfd->pq;
- int fd = pfd->fd;
-
+ if (pq == NULL) {
+ return;
+ }
nni_posix_pfd_close(pfd);
+ NNI_ASSERT(!nni_thr_is_self(&pq->thr));
nni_mtx_lock(&pq->mtx);
- if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) {
- pfd->reap = true;
+ if (!pfd->stopped) {
+ pfd->stopped = true;
+ pfd->reap = true;
nni_plat_pipe_raise(pq->wakewfd);
while (pfd->reap) {
nni_cv_wait(&pfd->cv);
}
- } else {
- pq->pfds[fd] = NULL;
}
nni_mtx_unlock(&pq->mtx);
+}
+
+void
+nni_posix_pfd_fini(nni_posix_pfd *pfd)
+{
+ nni_posix_pollq *pq = pfd->pq;
+ int fd = pfd->fd;
+
+ if (pq == NULL) {
+ return;
+ }
+
+ nni_posix_pfd_stop(pfd);
+ NNI_ASSERT(!nni_thr_is_self(&pq->thr));
// We're exclusive now.
(void) close(fd);
nni_cv_fini(&pfd->cv);
nni_mtx_fini(&pfd->mtx);
- NNI_FREE_STRUCT(pfd);
}
int
@@ -247,17 +243,11 @@ nni_posix_poll_thr(void *arg)
events |= NNI_POLL_HUP;
}
if (events != 0) {
- nni_posix_pfd_cb cb = NULL;
- void *arg;
if ((pfd = pq->pfds[fd]) != NULL) {
- cb = pfd->cb;
- arg = pfd->arg;
pfd->events &= ~events;
- }
- if (cb) {
nni_mtx_unlock(&pq->mtx);
- cb(pfd, events, arg);
+ pfd->cb(pfd->arg, events);
nni_mtx_lock(&pq->mtx);
}
}
diff --git a/src/platform/posix/posix_pollq_select.h b/src/platform/posix/posix_pollq_select.h
index 3a1d48e3..012f10a0 100644
--- a/src/platform/posix/posix_pollq_select.h
+++ b/src/platform/posix/posix_pollq_select.h
@@ -22,6 +22,7 @@ struct nni_posix_pfd {
nni_posix_pfd_cb cb;
void *arg;
bool reap;
+ bool stopped;
};
#define NNI_POLL_IN (0x0001)
diff --git a/src/platform/posix/posix_sockfd.c b/src/platform/posix/posix_sockfd.c
index 8ccca66c..7ed95681 100644
--- a/src/platform/posix/posix_sockfd.c
+++ b/src/platform/posix/posix_sockfd.c
@@ -1,5 +1,5 @@
//
-// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
@@ -22,23 +22,23 @@
#include "platform/posix/posix_peerid.h"
struct nni_sfd_conn {
- nng_stream stream;
- nni_posix_pfd *pfd;
- int fd;
- nni_list readq;
- nni_list writeq;
- bool closed;
- nni_mtx mtx;
- nni_reap_node reap;
+ nng_stream stream;
+ nni_posix_pfd pfd;
+ int fd;
+ nni_list readq;
+ nni_list writeq;
+ bool closed;
+ nni_mtx mtx;
+ nni_reap_node reap;
};
static void
sfd_dowrite(nni_sfd_conn *c)
{
nni_aio *aio;
- int fd;
+ int fd = c->fd;
- if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ if (c->closed) {
return;
}
@@ -101,9 +101,9 @@ static void
sfd_doread(nni_sfd_conn *c)
{
nni_aio *aio;
- int fd;
+ int fd = c->fd;
- if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ if (c->closed) {
return;
}
@@ -174,9 +174,7 @@ sfd_error(void *arg, int err)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, err);
}
- if (c->pfd != NULL) {
- nni_posix_pfd_close(c->pfd);
- }
+ nni_posix_pfd_close(&c->pfd);
nni_mtx_unlock(&c->mtx);
}
@@ -193,9 +191,7 @@ sfd_close(void *arg)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
- if (c->pfd != NULL) {
- nni_posix_pfd_close(c->pfd);
- }
+ nni_posix_pfd_close(&c->pfd);
}
nni_mtx_unlock(&c->mtx);
}
@@ -206,11 +202,7 @@ sfd_stop(void *arg)
nni_sfd_conn *c = arg;
sfd_close(c);
- // ideally this would *stop* without freeing the pfd
- if (c->pfd != NULL) {
- nni_posix_pfd_fini(c->pfd);
- c->pfd = NULL;
- }
+ nni_posix_pfd_stop(&c->pfd);
}
// sfd_fini may block briefly waiting for the pollq thread.
@@ -220,6 +212,7 @@ sfd_fini(void *arg)
{
nni_sfd_conn *c = arg;
sfd_stop(c);
+ nni_posix_pfd_fini(&c->pfd);
nni_mtx_fini(&c->mtx);
NNI_FREE_STRUCT(c);
@@ -237,7 +230,7 @@ sfd_free(void *arg)
}
static void
-sfd_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
+sfd_cb(void *arg, unsigned events)
{
struct nni_sfd_conn *c = arg;
@@ -260,7 +253,7 @@ sfd_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
events |= NNI_POLL_IN;
}
if ((!c->closed) && (events != 0)) {
- nni_posix_pfd_arm(pfd, events);
+ nni_posix_pfd_arm(&c->pfd, events);
}
nni_mtx_unlock(&c->mtx);
}
@@ -302,7 +295,7 @@ sfd_send(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->writeq) == aio) {
- nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT);
+ nni_posix_pfd_arm(&c->pfd, NNI_POLL_OUT);
}
}
nni_mtx_unlock(&c->mtx);
@@ -336,7 +329,7 @@ sfd_recv(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->readq) == aio) {
- nni_posix_pfd_arm(c->pfd, NNI_POLL_IN);
+ nni_posix_pfd_arm(&c->pfd, NNI_POLL_IN);
}
}
nni_mtx_unlock(&c->mtx);
@@ -467,14 +460,10 @@ int
nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd)
{
nni_sfd_conn *c;
- int rv;
if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_posix_pfd_init(&c->pfd, fd)) != 0) {
- NNI_FREE_STRUCT(c);
- return (rv);
- }
+ nni_posix_pfd_init(&c->pfd, fd, sfd_cb, c);
c->closed = false;
c->fd = fd;
@@ -491,8 +480,6 @@ nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd)
c->stream.s_get = sfd_get;
c->stream.s_set = sfd_set;
- nni_posix_pfd_set_cb(c->pfd, sfd_cb, c);
-
*cp = c;
return (0);
}
diff --git a/src/platform/posix/posix_tcp.h b/src/platform/posix/posix_tcp.h
index 20504bf5..0eec62fe 100644
--- a/src/platform/posix/posix_tcp.h
+++ b/src/platform/posix/posix_tcp.h
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2018 Devolutions <info@devolutions.net>
//
@@ -18,7 +18,7 @@
struct nni_tcp_conn {
nng_stream stream;
- nni_posix_pfd *pfd;
+ nni_posix_pfd pfd;
nni_list readq;
nni_list writeq;
bool closed;
@@ -40,9 +40,9 @@ struct nni_tcp_dialer {
nni_atomic_bool fini;
};
-extern int nni_posix_tcp_alloc(nni_tcp_conn **, nni_tcp_dialer *);
-extern void nni_posix_tcp_init(nni_tcp_conn *, nni_posix_pfd *);
+extern int nni_posix_tcp_alloc(nni_tcp_conn **, nni_tcp_dialer *, int);
extern void nni_posix_tcp_start(nni_tcp_conn *, int, int);
extern void nni_posix_tcp_dialer_rele(nni_tcp_dialer *);
+extern void nni_posix_tcp_dial_cb(void *, unsigned);
#endif // PLATFORM_POSIX_TCP_H
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c
index d49fc838..b1a2233c 100644
--- a/src/platform/posix/posix_tcpconn.c
+++ b/src/platform/posix/posix_tcpconn.c
@@ -32,9 +32,9 @@ static void
tcp_dowrite(nni_tcp_conn *c)
{
nni_aio *aio;
- int fd;
+ int fd = nni_posix_pfd_fd(&c->pfd);
- if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ if (c->closed) {
return;
}
@@ -101,9 +101,9 @@ static void
tcp_doread(nni_tcp_conn *c)
{
nni_aio *aio;
- int fd;
+ int fd = nni_posix_pfd_fd(&c->pfd);
- if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ if (c->closed) {
return;
}
@@ -174,9 +174,7 @@ tcp_error(void *arg, int err)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, err);
}
- if (c->pfd != NULL) {
- nni_posix_pfd_close(c->pfd);
- }
+ nni_posix_pfd_close(&c->pfd);
nni_mtx_unlock(&c->mtx);
}
@@ -193,9 +191,7 @@ tcp_close(void *arg)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
- if (c->pfd != NULL) {
- nni_posix_pfd_close(c->pfd);
- }
+ nni_posix_pfd_close(&c->pfd);
}
nni_mtx_unlock(&c->mtx);
}
@@ -203,18 +199,10 @@ tcp_close(void *arg)
static void
tcp_stop(void *arg)
{
- nni_tcp_conn *c = arg;
- nni_posix_pfd *pfd;
+ nni_tcp_conn *c = arg;
tcp_close(c);
- nni_mtx_lock(&c->mtx);
- pfd = c->pfd;
- c->pfd = NULL;
- nni_mtx_unlock(&c->mtx);
-
- if (pfd != NULL) {
- nni_posix_pfd_fini(pfd);
- }
+ nni_posix_pfd_stop(&c->pfd);
}
// tcp_fini may block briefly waiting for the pollq thread.
@@ -224,6 +212,7 @@ tcp_fini(void *arg)
{
nni_tcp_conn *c = arg;
tcp_stop(c);
+ nni_posix_pfd_fini(&c->pfd);
nni_mtx_fini(&c->mtx);
if (c->dialer != NULL) {
@@ -245,7 +234,7 @@ tcp_free(void *arg)
}
static void
-tcp_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
+tcp_cb(void *arg, unsigned events)
{
nni_tcp_conn *c = arg;
@@ -253,6 +242,10 @@ tcp_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
tcp_error(c, NNG_ECONNSHUT);
return;
}
+ if (c->dial_aio != NULL) {
+ nni_posix_tcp_dial_cb(c, events);
+ return;
+ }
nni_mtx_lock(&c->mtx);
if ((events & NNI_POLL_IN) != 0) {
tcp_doread(c);
@@ -268,7 +261,7 @@ tcp_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
events |= NNI_POLL_IN;
}
if ((!c->closed) && (events != 0)) {
- nni_posix_pfd_arm(pfd, events);
+ nni_posix_pfd_arm(&c->pfd, events);
}
nni_mtx_unlock(&c->mtx);
}
@@ -310,7 +303,7 @@ tcp_send(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->writeq) == aio) {
- nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT);
+ nni_posix_pfd_arm(&c->pfd, NNI_POLL_OUT);
}
}
nni_mtx_unlock(&c->mtx);
@@ -344,7 +337,7 @@ tcp_recv(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->readq) == aio) {
- nni_posix_pfd_arm(c->pfd, NNI_POLL_IN);
+ nni_posix_pfd_arm(&c->pfd, NNI_POLL_IN);
}
}
nni_mtx_unlock(&c->mtx);
@@ -356,7 +349,7 @@ tcp_get_peername(void *arg, void *buf, size_t *szp, nni_type t)
nni_tcp_conn *c = arg;
struct sockaddr_storage ss;
socklen_t len = sizeof(ss);
- int fd = nni_posix_pfd_fd(c->pfd);
+ int fd = nni_posix_pfd_fd(&c->pfd);
int rv;
nng_sockaddr sa;
@@ -375,7 +368,7 @@ tcp_get_sockname(void *arg, void *buf, size_t *szp, nni_type t)
nni_tcp_conn *c = arg;
struct sockaddr_storage ss;
socklen_t len = sizeof(ss);
- int fd = nni_posix_pfd_fd(c->pfd);
+ int fd = nni_posix_pfd_fd(&c->pfd);
int rv;
nng_sockaddr sa;
@@ -392,7 +385,7 @@ static int
tcp_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn *c = arg;
- int fd = nni_posix_pfd_fd(c->pfd);
+ int fd = nni_posix_pfd_fd(&c->pfd);
int val = 0;
socklen_t valsz = sizeof(val);
@@ -407,7 +400,7 @@ static int
tcp_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn *c = arg;
- int fd = nni_posix_pfd_fd(c->pfd);
+ int fd = nni_posix_pfd_fd(&c->pfd);
int val = 0;
socklen_t valsz = sizeof(val);
@@ -455,7 +448,7 @@ tcp_set(void *arg, const char *name, const void *buf, size_t sz, nni_type t)
}
int
-nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d)
+nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d, int fd)
{
nni_tcp_conn *c;
if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
@@ -468,6 +461,7 @@ nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d)
nni_mtx_init(&c->mtx);
nni_aio_list_init(&c->readq);
nni_aio_list_init(&c->writeq);
+ nni_posix_pfd_init(&c->pfd, fd, tcp_cb, c);
c->stream.s_free = tcp_free;
c->stream.s_stop = tcp_stop;
@@ -482,19 +476,11 @@ nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d)
}
void
-nni_posix_tcp_init(nni_tcp_conn *c, nni_posix_pfd *pfd)
-{
- c->pfd = pfd;
-}
-
-void
nni_posix_tcp_start(nni_tcp_conn *c, int nodelay, int keepalive)
{
// Configure the initial socket options.
- (void) setsockopt(nni_posix_pfd_fd(c->pfd), IPPROTO_TCP, TCP_NODELAY,
+ (void) setsockopt(nni_posix_pfd_fd(&c->pfd), IPPROTO_TCP, TCP_NODELAY,
&nodelay, sizeof(int));
- (void) setsockopt(nni_posix_pfd_fd(c->pfd), SOL_SOCKET, SO_KEEPALIVE,
+ (void) setsockopt(nni_posix_pfd_fd(&c->pfd), SOL_SOCKET, SO_KEEPALIVE,
&keepalive, sizeof(int));
-
- nni_posix_pfd_set_cb(c->pfd, tcp_cb, c);
}
diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c
index 52ea6cff..442cd824 100644
--- a/src/platform/posix/posix_tcpdial.c
+++ b/src/platform/posix/posix_tcpdial.c
@@ -123,8 +123,8 @@ tcp_dialer_cancel(nni_aio *aio, void *arg, int rv)
nng_stream_free(&c->stream);
}
-static void
-tcp_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg)
+void
+nni_posix_tcp_dial_cb(void *arg, unsigned ev)
{
nni_tcp_conn *c = arg;
nni_tcp_dialer *d = c->dialer;
@@ -145,7 +145,7 @@ tcp_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg)
} else {
socklen_t sz = sizeof(int);
- int fd = nni_posix_pfd_fd(pfd);
+ int fd = nni_posix_pfd_fd(&c->pfd);
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &rv, &sz) < 0) {
rv = errno;
}
@@ -185,7 +185,6 @@ void
nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
{
nni_tcp_conn *c;
- nni_posix_pfd *pfd = NULL;
struct sockaddr_storage ss;
size_t sslen;
int fd;
@@ -210,24 +209,12 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
nni_atomic_inc(&d->ref);
- if ((rv = nni_posix_tcp_alloc(&c, d)) != 0) {
+ if ((rv = nni_posix_tcp_alloc(&c, d, fd)) != 0) {
nni_aio_finish_error(aio, rv);
nni_posix_tcp_dialer_rele(d);
return;
}
- // This arranges for the fd to be in non-blocking mode, and adds the
- // poll fd to the list.
- if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
- (void) close(fd);
- // the error label unlocks this
- nni_mtx_lock(&d->mtx);
- goto error;
- }
-
- nni_posix_tcp_init(c, pfd);
- nni_posix_pfd_set_cb(pfd, tcp_dialer_cb, c);
-
nni_mtx_lock(&d->mtx);
if (d->closed) {
rv = NNG_ECLOSED;
@@ -248,7 +235,7 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
goto error;
}
// Asynchronous connect.
- if ((rv = nni_posix_pfd_arm(pfd, NNI_POLL_OUT)) != 0) {
+ if ((rv = nni_posix_pfd_arm(&c->pfd, NNI_POLL_OUT)) != 0) {
goto error;
}
c->dial_aio = aio;
diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c
index 32f0fd60..da2e8050 100644
--- a/src/platform/posix/posix_tcplisten.c
+++ b/src/platform/posix/posix_tcplisten.c
@@ -34,13 +34,13 @@
#include "posix_tcp.h"
struct nni_tcp_listener {
- nni_posix_pfd *pfd;
- nni_list acceptq;
- bool started;
- bool closed;
- bool nodelay;
- bool keepalive;
- nni_mtx mtx;
+ nni_posix_pfd pfd;
+ nni_list acceptq;
+ bool started;
+ bool closed;
+ bool nodelay;
+ bool keepalive;
+ nni_mtx mtx;
};
int
@@ -53,7 +53,6 @@ nni_tcp_listener_init(nni_tcp_listener **lp)
nni_mtx_init(&l->mtx);
- l->pfd = NULL;
l->closed = false;
l->started = false;
l->nodelay = true;
@@ -74,9 +73,7 @@ tcp_listener_doclose(nni_tcp_listener *l)
nni_aio_finish_error(aio, NNG_ECLOSED);
}
- if (l->pfd != NULL) {
- nni_posix_pfd_close(l->pfd);
- }
+ nni_posix_pfd_close(&l->pfd);
}
void
@@ -93,15 +90,14 @@ tcp_listener_doaccept(nni_tcp_listener *l)
nni_aio *aio;
while ((aio = nni_list_first(&l->acceptq)) != NULL) {
- int newfd;
- int fd;
- int rv;
- int nd;
- int ka;
- nni_posix_pfd *pfd;
- nni_tcp_conn *c;
+ int newfd;
+ int fd;
+ int rv;
+ int nd;
+ int ka;
+ nni_tcp_conn *c;
- fd = nni_posix_pfd_fd(l->pfd);
+ fd = nni_posix_pfd_fd(&l->pfd);
#ifdef NNG_USE_ACCEPT4
newfd = accept4(fd, NULL, NULL, SOCK_CLOEXEC);
@@ -119,7 +115,7 @@ tcp_listener_doaccept(nni_tcp_listener *l)
case EWOULDBLOCK:
#endif
#endif
- rv = nni_posix_pfd_arm(l->pfd, NNI_POLL_IN);
+ rv = nni_posix_pfd_arm(&l->pfd, NNI_POLL_IN);
if (rv != 0) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -141,24 +137,13 @@ tcp_listener_doaccept(nni_tcp_listener *l)
}
}
- if ((rv = nni_posix_tcp_alloc(&c, NULL)) != 0) {
+ if ((rv = nni_posix_tcp_alloc(&c, NULL, newfd)) != 0) {
close(newfd);
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
continue;
}
- if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
- close(newfd);
- nng_stream_stop(&c->stream);
- nng_stream_free(&c->stream);
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, rv);
- continue;
- }
-
- nni_posix_tcp_init(c, pfd);
-
ka = l->keepalive ? 1 : 0;
nd = l->nodelay ? 1 : 0;
nni_aio_list_remove(aio);
@@ -169,10 +154,9 @@ tcp_listener_doaccept(nni_tcp_listener *l)
}
static void
-tcp_listener_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
+tcp_listener_cb(void *arg, unsigned events)
{
nni_tcp_listener *l = arg;
- NNI_ARG_UNUSED(pfd);
nni_mtx_lock(&l->mtx);
if ((events & NNI_POLL_INVAL) != 0) {
@@ -209,7 +193,6 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa)
struct sockaddr_storage ss;
int rv;
int fd;
- nni_posix_pfd *pfd;
if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) ||
#ifdef NNG_ENABLE_IPV6
@@ -236,12 +219,6 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa)
return (nni_plat_errno(errno));
}
- if ((rv = nni_posix_pfd_init(&pfd, fd)) != 0) {
- nni_mtx_unlock(&l->mtx);
- (void) close(fd);
- return (rv);
- }
-
// On the Windows Subsystem for Linux, SO_REUSEADDR behaves like Windows
// SO_REUSEADDR, which is almost completely different (and wrong!) from
// traditional SO_REUSEADDR.
@@ -257,8 +234,8 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa)
if (bind(fd, (struct sockaddr *) &ss, len) < 0) {
rv = nni_plat_errno(errno);
+ (void) close(fd);
nni_mtx_unlock(&l->mtx);
- nni_posix_pfd_fini(pfd);
return (rv);
}
@@ -266,14 +243,13 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa)
// bad things are going to happen.
if (listen(fd, 128) != 0) {
rv = nni_plat_errno(errno);
+ (void) close(fd);
nni_mtx_unlock(&l->mtx);
- nni_posix_pfd_fini(pfd);
return (rv);
}
- nni_posix_pfd_set_cb(pfd, tcp_listener_cb, l);
+ nni_posix_pfd_init(&l->pfd, fd, tcp_listener_cb, l);
- l->pfd = pfd;
l->started = true;
nni_mtx_unlock(&l->mtx);
@@ -283,23 +259,18 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa)
void
nni_tcp_listener_stop(nni_tcp_listener *l)
{
- nni_posix_pfd *pfd;
-
nni_mtx_lock(&l->mtx);
tcp_listener_doclose(l);
- pfd = l->pfd;
- l->pfd = NULL;
nni_mtx_unlock(&l->mtx);
- if (pfd != NULL) {
- nni_posix_pfd_fini(pfd);
- }
+ nni_posix_pfd_stop(&l->pfd);
}
void
nni_tcp_listener_fini(nni_tcp_listener *l)
{
nni_tcp_listener_stop(l);
+ nni_posix_pfd_fini(&l->pfd);
nni_mtx_fini(&l->mtx);
NNI_FREE_STRUCT(l);
}
@@ -350,7 +321,7 @@ tcp_listener_get_locaddr(void *arg, void *buf, size_t *szp, nni_type t)
struct sockaddr_storage ss;
socklen_t len = sizeof(ss);
(void) getsockname(
- nni_posix_pfd_fd(l->pfd), (void *) &ss, &len);
+ nni_posix_pfd_fd(&l->pfd), (void *) &ss, &len);
(void) nni_posix_sockaddr2nn(&sa, &ss, len);
} else {
sa.s_family = NNG_AF_UNSPEC;
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index 8d0d4a42..69535aa4 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -48,11 +48,11 @@
#endif
struct nni_plat_udp {
- nni_posix_pfd *udp_pfd;
- int udp_fd;
- nni_list udp_recvq;
- nni_list udp_sendq;
- nni_mtx udp_mtx;
+ nni_posix_pfd udp_pfd;
+ int udp_fd;
+ nni_list udp_recvq;
+ nni_list udp_sendq;
+ nni_mtx udp_mtx;
};
static void
@@ -175,10 +175,9 @@ nni_posix_udp_dosend(nni_plat_udp *udp)
// This function is called by the poller on activity on the FD.
static void
-nni_posix_udp_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
+nni_posix_udp_cb(void *arg, unsigned events)
{
nni_plat_udp *udp = arg;
- NNI_ARG_UNUSED(pfd);
nni_mtx_lock(&udp->udp_mtx);
if (events & NNI_POLL_IN) {
@@ -199,7 +198,7 @@ nni_posix_udp_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
}
if (events) {
int rv;
- rv = nni_posix_pfd_arm(udp->udp_pfd, events);
+ rv = nni_posix_pfd_arm(&udp->udp_pfd, events);
if (rv != 0) {
nni_posix_udp_doerror(udp, rv);
}
@@ -225,6 +224,8 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
return (NNG_ENOMEM);
}
nni_mtx_init(&udp->udp_mtx);
+ nni_aio_list_init(&udp->udp_recvq);
+ nni_aio_list_init(&udp->udp_sendq);
udp->udp_fd = socket(sa.ss_family, SOCK_DGRAM, IPPROTO_UDP);
if (udp->udp_fd < 0) {
@@ -241,16 +242,8 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
NNI_FREE_STRUCT(udp);
return (rv);
}
- if ((rv = nni_posix_pfd_init(&udp->udp_pfd, udp->udp_fd)) != 0) {
- (void) close(udp->udp_fd);
- nni_mtx_fini(&udp->udp_mtx);
- NNI_FREE_STRUCT(udp);
- return (rv);
- }
- nni_posix_pfd_set_cb(udp->udp_pfd, nni_posix_udp_cb, udp);
- nni_aio_list_init(&udp->udp_recvq);
- nni_aio_list_init(&udp->udp_sendq);
+ nni_posix_pfd_init(&udp->udp_pfd, udp->udp_fd, nni_posix_udp_cb, udp);
*upp = udp;
return (0);
@@ -259,12 +252,13 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
void
nni_plat_udp_close(nni_plat_udp *udp)
{
- nni_posix_pfd_fini(udp->udp_pfd);
+ nni_posix_pfd_stop(&udp->udp_pfd);
nni_mtx_lock(&udp->udp_mtx);
nni_posix_udp_doclose(udp);
nni_mtx_unlock(&udp->udp_mtx);
+ nni_posix_pfd_fini(&udp->udp_pfd);
(void) close(udp->udp_fd);
nni_mtx_fini(&udp->udp_mtx);
NNI_FREE_STRUCT(udp);
@@ -298,7 +292,8 @@ nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio)
}
nni_list_append(&udp->udp_recvq, aio);
if (nni_list_first(&udp->udp_recvq) == aio) {
- if ((rv = nni_posix_pfd_arm(udp->udp_pfd, NNI_POLL_IN)) != 0) {
+ if ((rv = nni_posix_pfd_arm(&udp->udp_pfd, NNI_POLL_IN)) !=
+ 0) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
}
@@ -321,7 +316,7 @@ nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio)
}
nni_list_append(&udp->udp_sendq, aio);
if (nni_list_first(&udp->udp_sendq) == aio) {
- if ((rv = nni_posix_pfd_arm(udp->udp_pfd, NNI_POLL_OUT)) !=
+ if ((rv = nni_posix_pfd_arm(&udp->udp_pfd, NNI_POLL_OUT)) !=
0) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);