aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-17 22:56:41 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-17 22:56:41 -0800
commit6c949de1cb46182303a85864bad753c12142fa97 (patch)
tree9b66786e0a22d4d38089d8c269d6e096ef0a8617
parentd83f5aea789f896c90208567a9e56599a439e90a (diff)
downloadnng-6c949de1cb46182303a85864bad753c12142fa97.tar.gz
nng-6c949de1cb46182303a85864bad753c12142fa97.tar.bz2
nng-6c949de1cb46182303a85864bad753c12142fa97.zip
POSIX poller: add support for select, and for choosing the poller
Some platforms or configurations may not have more modern options like kqueue or epoll, or may be constrained by policy.
-rw-r--r--src/platform/posix/CMakeLists.txt44
-rw-r--r--src/platform/posix/posix_ipcconn.c4
-rw-r--r--src/platform/posix/posix_pollq.h9
-rw-r--r--src/platform/posix/posix_pollq_kqueue.c10
-rw-r--r--src/platform/posix/posix_pollq_select.c331
-rw-r--r--src/platform/posix/posix_sockfd.c4
-rw-r--r--src/platform/posix/posix_tcpconn.c4
-rw-r--r--src/platform/posix/posix_udp.c16
8 files changed, 399 insertions, 23 deletions
diff --git a/src/platform/posix/CMakeLists.txt b/src/platform/posix/CMakeLists.txt
index 6f3dfcaf..aeedddb9 100644
--- a/src/platform/posix/CMakeLists.txt
+++ b/src/platform/posix/CMakeLists.txt
@@ -60,6 +60,8 @@ if (NNG_PLATFORM_POSIX)
nng_check_sym(port_create port.h NNG_HAVE_PORT_CREATE)
nng_check_sym(epoll_create sys/epoll.h NNG_HAVE_EPOLL)
nng_check_sym(epoll_create1 sys/epoll.h NNG_HAVE_EPOLL_CREATE1)
+ nng_check_sym(poll poll.h NNG_HAVE_POLL)
+ nng_check_sym(select sys/select.h NNG_HAVE_SELECT)
nng_check_sym(getpeereid unistd.h NNG_HAVE_GETPEEREID)
nng_check_sym(SO_PEERCRED sys/socket.h NNG_HAVE_SOPEERCRED)
nng_check_struct_member(sockpeercred uid sys/socket.h NNG_HAVE_SOCKPEERCRED)
@@ -101,14 +103,48 @@ if (NNG_PLATFORM_POSIX)
posix_udp.c
)
- if (NNG_HAVE_PORT_CREATE)
- nng_sources(posix_pollq_port.c)
+ set(NNG_POLLQ_POLLER "auto" CACHE STRING "Poller used for multiplexing I/O")
+ set_property(CACHE NNG_POLLQ_POLLER PROPERTY STRINGS auto ports kqueue epoll poll select)
+ mark_as_advanced(NNG_POLLQ_POLLER)
+ if (NNG_POLLQ_POLLER STREQUAL "ports")
+ nng_defines(NNG_POLLQ_PORTS)
+ elseif (NNG_POLLQ_POLLER STREQUAL "kqueue")
+ nng_defines(NNG_POLLQ_KQUEUE)
+ elseif (NNG_POLLQ_POLLER STREQUAL "epoll")
+ nng_defines(NNG_POLLQ_EPOLL)
+ elseif (NNG_POLLQ_POLLER STREQUAL "poll")
+ nng_defines(NNG_POLLQ_POLL)
+ elseif (NNG_POLLQ_POLLER STREQUAL "select")
+ set(NNG_POLLQ_SELECT ON)
+ elseif (NNG_HAVE_PORT_CREATE)
+ set(NNG_POLLQ_PORTS ON)
elseif (NNG_HAVE_KQUEUE)
- nng_sources(posix_pollq_kqueue.c)
+ set(NNG_POLLQ_KQUEUE ON)
elseif (NNG_HAVE_EPOLL AND NNG_HAVE_EVENTFD)
+ set(NNG_POLLQ_EPOLL ON)
+ elseif (NNG_HAVE_POLL)
+ set(NNG_POLLQ_POLL ON)
+ elseif (NNG_HAVE_SELECT)
+ set(NNG_POLLQ_SELECT TRUE)
+ endif()
+
+ if (NNG_POLLQ_PORTS)
+ message(STATUS "Using port events for multiplexing I/O.")
+ nng_sources(posix_pollq_port.c)
+ elseif (NNG_POLLQ_KQUEUE)
+ message(STATUS "Using kqueue for multiplexing I/O.")
+ nng_sources(posix_pollq_kqueue.c)
+ elseif (NNG_POLLQ_EPOLL)
+ message(DEBUG "Using epoll for multiplexing I/O.")
nng_sources(posix_pollq_epoll.c)
- else ()
+ elseif (NNG_POLLQ_POLL)
+ message(STATUS "Using poll for multiplexing I/O.")
nng_sources(posix_pollq_poll.c)
+ elseif (NNG_POLLQ_SELECT)
+ message(STATUS "Using select for multiplexing I/O.")
+ nng_sources(posix_pollq_select.c)
+ else()
+ message(FATAL_ERROR "No suitable poller found for multiplexing I/O.")
endif ()
if (NNG_HAVE_ARC4RANDOM)
diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c
index 8b11b64f..a198b87f 100644
--- a/src/platform/posix/posix_ipcconn.c
+++ b/src/platform/posix/posix_ipcconn.c
@@ -264,7 +264,7 @@ ipc_send(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->writeq) == aio) {
- nni_posix_pfd_arm(c->pfd, POLLOUT);
+ nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT);
}
}
nni_mtx_unlock(&c->mtx);
@@ -298,7 +298,7 @@ ipc_recv(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->readq) == aio) {
- nni_posix_pfd_arm(c->pfd, POLLIN);
+ nni_posix_pfd_arm(c->pfd, NNI_POLL_IN);
}
}
nni_mtx_unlock(&c->mtx);
diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h
index 76105acc..8dc92fb1 100644
--- a/src/platform/posix/posix_pollq.h
+++ b/src/platform/posix/posix_pollq.h
@@ -32,11 +32,20 @@ extern int nni_posix_pfd_fd(nni_posix_pfd *);
extern void nni_posix_pfd_close(nni_posix_pfd *);
extern void nni_posix_pfd_set_cb(nni_posix_pfd *, nni_posix_pfd_cb, void *);
+#ifdef POLLIN
#define NNI_POLL_IN ((unsigned) POLLIN)
#define NNI_POLL_OUT ((unsigned) POLLOUT)
#define NNI_POLL_HUP ((unsigned) POLLHUP)
#define NNI_POLL_ERR ((unsigned) POLLERR)
#define NNI_POLL_INVAL ((unsigned) POLLNVAL)
+#else
+// maybe using select
+#define NNI_POLL_IN (0x0001)
+#define NNI_POLL_OUT (0x0010)
+#define NNI_POLL_HUP (0x0004)
+#define NNI_POLL_ERR (0x0008)
+#define NNI_POLL_INVAL (0x0020)
+#endif // POLLIN
#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c
index ae1b2f47..562c888e 100644
--- a/src/platform/posix/posix_pollq_kqueue.c
+++ b/src/platform/posix/posix_pollq_kqueue.c
@@ -198,10 +198,10 @@ nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events)
return (0);
}
- if (events & POLLIN) {
+ if (events & NNI_POLL_IN) {
EV_SET(&ev[nev++], pf->fd, EVFILT_READ, flags, 0, 0, pf);
}
- if (events & POLLOUT) {
+ if (events & NNI_POLL_OUT) {
EV_SET(&ev[nev++], pf->fd, EVFILT_WRITE, flags, 0, 0, pf);
}
while (kevent(pq->kq, ev, nev, NULL, 0, NULL) != 0) {
@@ -254,10 +254,10 @@ nni_posix_poll_thr(void *arg)
switch (ev->filter) {
case EVFILT_READ:
- revents = POLLIN;
+ revents = NNI_POLL_IN;
break;
case EVFILT_WRITE:
- revents = POLLOUT;
+ revents = NNI_POLL_OUT;
break;
}
if (ev->udata == NULL) {
@@ -267,7 +267,7 @@ nni_posix_poll_thr(void *arg)
}
pf = (void *) ev->udata;
if (ev->flags & EV_ERROR) {
- revents |= POLLHUP;
+ revents |= NNI_POLL_HUP;
}
nni_mtx_lock(&pf->mtx);
diff --git a/src/platform/posix/posix_pollq_select.c b/src/platform/posix/posix_pollq_select.c
new file mode 100644
index 00000000..3213aa11
--- /dev/null
+++ b/src/platform/posix/posix_pollq_select.c
@@ -0,0 +1,331 @@
+//
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/defs.h"
+#include "core/nng_impl.h"
+#include "platform/posix/posix_pollq.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/select.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+// POSIX AIO using select(). We use a single poll thread to perform
+// I/O operations for the entire system. This is the worst form of
+// I/O multiplexing, but short of using threads or spin-polling from
+// a single thread, this is our only reasonable solution.
+//
+// Note that select() is not scalable, and we will be limited to a small
+// number of open files/sockets. As such it is is not suitable for use
+// on large servers. However, this may be enough for use in constrained
+// systems that are not likely to have many open files anyway.
+//
+
+typedef struct nni_posix_pollq nni_posix_pollq;
+
+struct nni_posix_pollq {
+ nni_mtx mtx;
+ int wakewfd; // write side of waker pipe
+ int wakerfd; // read side of waker pipe
+ bool closing; // request for worker to exit
+ bool closed;
+ nni_thr thr; // worker thread
+ int maxfd;
+ struct nni_posix_pfd *pfds[FD_SETSIZE];
+};
+
+struct nni_posix_pfd {
+ nni_posix_pollq *pq;
+ int fd;
+ nni_cv cv;
+ nni_mtx mtx;
+ unsigned events;
+ nni_posix_pfd_cb cb;
+ void *arg;
+ bool reap;
+};
+
+static nni_posix_pollq nni_posix_global_pollq;
+
+int
+nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
+{
+ nni_posix_pfd *pfd;
+ nni_posix_pollq *pq = &nni_posix_global_pollq;
+
+ // Set this is as soon as possible (narrow the close-exec race as
+ // much as we can; better options are system calls that suppress
+ // this behavior from descriptor creation.)
+ (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
+ (void) fcntl(fd, F_SETFL, O_NONBLOCK);
+
+ if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if (fd >= FD_SETSIZE) {
+ return (NNG_EINVAL);
+ }
+ nni_mtx_init(&pfd->mtx);
+ nni_cv_init(&pfd->cv, &pq->mtx);
+ pfd->fd = fd;
+ pfd->events = 0;
+ pfd->cb = NULL;
+ pfd->arg = NULL;
+ pfd->pq = pq;
+ nni_mtx_lock(&pq->mtx);
+ if (pq->closing) {
+ nni_mtx_unlock(&pq->mtx);
+ nni_cv_fini(&pfd->cv);
+ nni_mtx_fini(&pfd->mtx);
+ NNI_FREE_STRUCT(pfd);
+ return (NNG_ECLOSED);
+ }
+ pq->pfds[fd] = pfd;
+ if (fd > pq->maxfd) {
+ pq->maxfd = fd;
+ }
+ nni_mtx_unlock(&pq->mtx);
+ *pfdp = pfd;
+ return (0);
+}
+
+void
+nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg)
+{
+ nni_mtx_lock(&pfd->mtx);
+ pfd->cb = cb;
+ pfd->arg = arg;
+ nni_mtx_unlock(&pfd->mtx);
+}
+
+int
+nni_posix_pfd_fd(nni_posix_pfd *pfd)
+{
+ return (pfd->fd);
+}
+
+void
+nni_posix_pfd_close(nni_posix_pfd *pfd)
+{
+ (void) shutdown(pfd->fd, SHUT_RDWR);
+}
+
+void
+nni_posix_pfd_fini(nni_posix_pfd *pfd)
+{
+ nni_posix_pollq *pq = pfd->pq;
+ int fd = pfd->fd;
+
+ nni_posix_pfd_close(pfd);
+
+ nni_mtx_lock(&pq->mtx);
+ if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) {
+ pfd->reap = true;
+ nni_plat_pipe_raise(pq->wakewfd);
+ while (pfd->reap) {
+ nni_cv_wait(&pfd->cv);
+ }
+ } else {
+ pq->pfds[fd] = NULL;
+ }
+ nni_mtx_unlock(&pq->mtx);
+
+ // We're exclusive now.
+ (void) close(fd);
+ nni_cv_fini(&pfd->cv);
+ nni_mtx_fini(&pfd->mtx);
+ NNI_FREE_STRUCT(pfd);
+}
+
+int
+nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events)
+{
+ nni_posix_pollq *pq = pfd->pq;
+
+ nni_mtx_lock(&pq->mtx);
+ pfd->events |= events;
+ nni_mtx_unlock(&pq->mtx);
+
+ // If we're running on the callback, then don't bother to kick
+ // the pollq again. This is necessary because we cannot modify
+ // the poller while it is polling.
+ if (!nni_thr_is_self(&pq->thr)) {
+ nni_plat_pipe_raise(pq->wakewfd);
+ }
+ return (0);
+}
+
+static void
+nni_posix_poll_thr(void *arg)
+{
+ nni_posix_pollq *pq = arg;
+ fd_set rfds;
+ fd_set wfds;
+ fd_set efds;
+ int maxfd;
+
+ for (;;) {
+ unsigned events;
+ nni_posix_pfd *pfd;
+
+ FD_ZERO(&rfds);
+ FD_ZERO(&wfds);
+ FD_ZERO(&efds);
+
+ // The waker pipe is set up so that we will be woken
+ // when it is written (this allows us to be signaled).
+ FD_SET(pq->wakerfd, &rfds);
+ FD_SET(pq->wakerfd, &efds);
+
+ nni_plat_pipe_clear(pq->wakerfd);
+ nni_mtx_lock(&pq->mtx);
+
+ // If we're closing down, bail now. This is done *after* we
+ // have ensured that the reapq is empty. Anything still in
+ // the pollq is not going to receive further callbacks.
+ if (pq->closing) {
+ for (int fd = 0; fd <= pq->maxfd; fd++) {
+ if ((pfd = pq->pfds[fd]) != NULL) {
+ pq->pfds[fd] = NULL;
+ pfd->reap = false;
+ nni_cv_wake(&pfd->cv);
+ }
+ }
+ pq->closed = true;
+ nni_mtx_unlock(&pq->mtx);
+ break;
+ }
+
+ // Set up the poll list.
+ maxfd = pq->wakerfd;
+ for (int fd = 0; fd <= pq->maxfd; fd++) {
+ if ((pfd = pq->pfds[fd]) == NULL) {
+ continue;
+ }
+ NNI_ASSERT(pfd->fd == fd);
+ if (pfd->reap) {
+ pq->pfds[fd] = NULL;
+ pfd->reap = false;
+ nni_cv_wake(&pfd->cv);
+ continue;
+ }
+ events = pfd->events;
+
+ if (events != 0) {
+ if (events & NNI_POLL_IN) {
+ FD_SET(fd, &rfds);
+ }
+ if (events & NNI_POLL_OUT) {
+ FD_SET(fd, &wfds);
+ }
+ FD_SET(fd, &efds);
+ if (maxfd < fd) {
+ maxfd = fd;
+ }
+ }
+ }
+ while (pq->maxfd > 0 && (pq->pfds[pq->maxfd] == NULL)) {
+ pq->maxfd--;
+ }
+ nni_mtx_unlock(&pq->mtx);
+
+ // We could get the result from poll, and avoid iterating
+ // over the entire set of pollfds, but since on average we
+ // will be walking half the list, doubling the work we do
+ // (the condition with a potential pipeline stall) seems like
+ // adding complexity with no real benefit. It also makes the
+ // worst case even worse.
+ (void) select(maxfd + 1, &rfds, &wfds, &efds, NULL);
+
+ nni_mtx_lock(&pq->mtx);
+ for (int fd = 0; fd <= maxfd; fd++) {
+ events = 0;
+ if (FD_ISSET(fd, &rfds)) {
+ events |= NNI_POLL_IN;
+ }
+ if (FD_ISSET(fd, &wfds)) {
+ events |= NNI_POLL_OUT;
+ }
+ if (FD_ISSET(fd, &efds)) {
+ events |= NNI_POLL_HUP;
+ }
+ if (events != 0) {
+ nni_posix_pfd_cb cb = NULL;
+ void *arg;
+ if ((pfd = pq->pfds[fd]) != NULL) {
+ cb = pfd->cb;
+ arg = pfd->arg;
+ pfd->events &= ~events;
+ }
+
+ if (cb) {
+ nni_mtx_unlock(&pq->mtx);
+ cb(pfd, events, arg);
+ nni_mtx_lock(&pq->mtx);
+ }
+ }
+ }
+ nni_mtx_unlock(&pq->mtx);
+ }
+}
+
+static void
+nni_posix_pollq_destroy(nni_posix_pollq *pq)
+{
+ nni_mtx_lock(&pq->mtx);
+ pq->closing = true;
+ nni_mtx_unlock(&pq->mtx);
+
+ nni_plat_pipe_raise(pq->wakewfd);
+
+ close(pq->wakewfd);
+ nni_thr_fini(&pq->thr);
+ close(pq->wakerfd);
+ // nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
+ nni_mtx_fini(&pq->mtx);
+}
+
+static int
+nni_posix_pollq_create(nni_posix_pollq *pq)
+{
+ int rv;
+
+ pq->closing = false;
+ pq->closed = false;
+
+ if ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) {
+ nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
+ return (rv);
+ }
+ nni_thr_set_name(&pq->thr, "nng:poll:select");
+ nni_mtx_init(&pq->mtx);
+ nni_thr_run(&pq->thr);
+ return (0);
+}
+
+int
+nni_posix_pollq_sysinit(nng_init_params *params)
+{
+ NNI_ARG_UNUSED(params);
+ return (nni_posix_pollq_create(&nni_posix_global_pollq));
+}
+
+void
+nni_posix_pollq_sysfini(void)
+{
+ nni_posix_pollq_destroy(&nni_posix_global_pollq);
+}
diff --git a/src/platform/posix/posix_sockfd.c b/src/platform/posix/posix_sockfd.c
index 997feae6..8ccca66c 100644
--- a/src/platform/posix/posix_sockfd.c
+++ b/src/platform/posix/posix_sockfd.c
@@ -302,7 +302,7 @@ sfd_send(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->writeq) == aio) {
- nni_posix_pfd_arm(c->pfd, POLLOUT);
+ nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT);
}
}
nni_mtx_unlock(&c->mtx);
@@ -336,7 +336,7 @@ sfd_recv(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->readq) == aio) {
- nni_posix_pfd_arm(c->pfd, POLLIN);
+ nni_posix_pfd_arm(c->pfd, NNI_POLL_IN);
}
}
nni_mtx_unlock(&c->mtx);
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c
index ce5243b0..d49fc838 100644
--- a/src/platform/posix/posix_tcpconn.c
+++ b/src/platform/posix/posix_tcpconn.c
@@ -310,7 +310,7 @@ tcp_send(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->writeq) == aio) {
- nni_posix_pfd_arm(c->pfd, POLLOUT);
+ nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT);
}
}
nni_mtx_unlock(&c->mtx);
@@ -344,7 +344,7 @@ tcp_recv(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->readq) == aio) {
- nni_posix_pfd_arm(c->pfd, POLLIN);
+ nni_posix_pfd_arm(c->pfd, NNI_POLL_IN);
}
}
nni_mtx_unlock(&c->mtx);
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index a7eb7144..8d0d4a42 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -181,22 +181,21 @@ nni_posix_udp_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
NNI_ARG_UNUSED(pfd);
nni_mtx_lock(&udp->udp_mtx);
- if (events & (unsigned) POLLIN) {
+ if (events & NNI_POLL_IN) {
nni_posix_udp_dorecv(udp);
}
- if (events & (unsigned) POLLOUT) {
+ if (events & NNI_POLL_OUT) {
nni_posix_udp_dosend(udp);
}
- if (events &
- ((unsigned) POLLHUP | (unsigned) POLLERR | (unsigned) POLLNVAL)) {
+ if (events & (NNI_POLL_HUP | NNI_POLL_ERR | NNI_POLL_INVAL)) {
nni_posix_udp_doclose(udp);
} else {
events = 0;
if (!nni_list_empty(&udp->udp_sendq)) {
- events |= (unsigned) POLLOUT;
+ events |= NNI_POLL_OUT;
}
if (!nni_list_empty(&udp->udp_recvq)) {
- events |= (unsigned) POLLIN;
+ events |= NNI_POLL_IN;
}
if (events) {
int rv;
@@ -299,7 +298,7 @@ nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio)
}
nni_list_append(&udp->udp_recvq, aio);
if (nni_list_first(&udp->udp_recvq) == aio) {
- if ((rv = nni_posix_pfd_arm(udp->udp_pfd, POLLIN)) != 0) {
+ if ((rv = nni_posix_pfd_arm(udp->udp_pfd, NNI_POLL_IN)) != 0) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
}
@@ -322,7 +321,8 @@ nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio)
}
nni_list_append(&udp->udp_sendq, aio);
if (nni_list_first(&udp->udp_sendq) == aio) {
- if ((rv = nni_posix_pfd_arm(udp->udp_pfd, POLLOUT)) != 0) {
+ if ((rv = nni_posix_pfd_arm(udp->udp_pfd, NNI_POLL_OUT)) !=
+ 0) {
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
}