aboutsummaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-10-03 20:28:09 -0700
committerGarrett D'Amore <garrett@damore.org>2017-10-05 21:25:57 -0700
commitb0f31f578b0669b598d3ded3a625685b125bef1d (patch)
tree0044b1f6924700a4fe4e557826bb79796f9e94d0 /src/platform
parent557964482f2b9d4246a2943fb1bedc6074d01e0d (diff)
downloadnng-b0f31f578b0669b598d3ded3a625685b125bef1d.tar.gz
nng-b0f31f578b0669b598d3ded3a625685b125bef1d.tar.bz2
nng-b0f31f578b0669b598d3ded3a625685b125bef1d.zip
Improve UDP test coverage, fix numerous issues found.
We introduced richer, deeper tests for UDP functionality. These tests uncovered a number of issues which this commit fixes. The Windows IOCP code needs to support multiple aios on a single nni_win_event. A redesign of the IOCP handling addresses that. The POSIX UDP code also needed fixes; foremost among them is the fact that the UDP file descriptor is not placed into non-blocking mode, leading to potential hangs. A number of race conditions and bugs along the implementation of the above items were uncovered and fixed. To the best of our knowledge the current code is bug-free.
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/posix/posix_pollq_poll.c11
-rw-r--r--src/platform/posix/posix_sockaddr.c6
-rw-r--r--src/platform/posix/posix_udp.c142
-rw-r--r--src/platform/windows/win_debug.c13
-rw-r--r--src/platform/windows/win_impl.h4
-rw-r--r--src/platform/windows/win_iocp.c125
-rw-r--r--src/platform/windows/win_sockaddr.c6
7 files changed, 158 insertions, 149 deletions
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c
index d8252af9..e6abd2d2 100644
--- a/src/platform/posix/posix_pollq_poll.c
+++ b/src/platform/posix/posix_pollq_poll.c
@@ -266,14 +266,11 @@ nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
// on the polled list. The polled list would be the case where
// the index is set to a positive value.
if ((oevents == 0) && (events != 0) && (node->index < 1)) {
- if (nni_list_node_active(&node->node)) {
- nni_list_node_remove(&node->node);
- }
+ nni_list_node_remove(&node->node);
nni_list_append(&pq->armed, node);
}
if ((events != 0) && (oevents != events)) {
- // Possibly wake up the poller since we're looking for
- // new events.
+ // Possibly wake up poller since we're looking for new events.
if (pq->inpoll) {
nni_plat_pipe_raise(pq->wakewfd);
}
@@ -295,9 +292,7 @@ nni_posix_pollq_disarm(nni_posix_pollq_node *node, int events)
oevents = node->events;
node->events &= ~events;
if ((node->events == 0) && (oevents != 0)) {
- if (nni_list_node_active(&node->node)) {
- nni_list_node_remove(&node->node);
- }
+ nni_list_node_remove(&node->node);
nni_list_append(&pq->idle, node);
}
// No need to wake anything, we might get a spurious wake up but
diff --git a/src/platform/posix/posix_sockaddr.c b/src/platform/posix/posix_sockaddr.c
index ea630b01..25953f50 100644
--- a/src/platform/posix/posix_sockaddr.c
+++ b/src/platform/posix/posix_sockaddr.c
@@ -33,6 +33,9 @@ nni_posix_nn2sockaddr(void *sa, const nni_sockaddr *na)
const nng_sockaddr_path *nspath;
size_t sz;
+ if ((sa == NULL) || (na == NULL)) {
+ return (-1);
+ }
switch (na->s_un.s_family) {
case NNG_AF_INET:
sin = (void *) sa;
@@ -80,6 +83,9 @@ nni_posix_sockaddr2nn(nni_sockaddr *na, const void *sa)
nng_sockaddr_in6 * nsin6;
nng_sockaddr_path * nspath;
+ if ((na == NULL) || (sa == NULL)) {
+ return (-1);
+ }
switch (((struct sockaddr *) sa)->sa_family) {
case AF_INET:
sin = (void *) sa;
diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c
index 05f7bea1..31ef76f6 100644
--- a/src/platform/posix/posix_udp.c
+++ b/src/platform/posix/posix_udp.c
@@ -37,7 +37,6 @@
struct nni_plat_udp {
nni_posix_pollq_node udp_pitem;
int udp_fd;
- int udp_closed;
nni_list udp_recvq;
nni_list udp_sendq;
nni_mtx udp_mtx;
@@ -48,7 +47,6 @@ nni_posix_udp_doclose(nni_plat_udp *udp)
{
nni_aio *aio;
- udp->udp_closed = 1;
while (((aio = nni_list_first(&udp->udp_recvq)) != NULL) ||
((aio = nni_list_first(&udp->udp_sendq)) != NULL)) {
nni_aio_list_remove(aio);
@@ -62,57 +60,51 @@ nni_posix_udp_dorecv(nni_plat_udp *udp)
{
nni_aio * aio;
nni_list *q = &udp->udp_recvq;
-
// While we're able to recv, do so.
while ((aio = nni_list_first(q)) != NULL) {
- nni_list_remove(q, aio);
struct iovec iov[4]; // never have more than 4
int niov;
struct sockaddr_storage ss;
struct msghdr hdr;
- int rv;
+ int rv = 0;
+ int cnt = 0;
- hdr.msg_iov = iov;
for (niov = 0; niov < aio->a_niov; niov++) {
iov[niov].iov_base = aio->a_iov[niov].iov_buf;
iov[niov].iov_len = aio->a_iov[niov].iov_len;
}
+ hdr.msg_iov = iov;
hdr.msg_iovlen = niov;
hdr.msg_name = &ss;
hdr.msg_namelen = sizeof(ss);
hdr.msg_flags = 0;
hdr.msg_control = NULL;
hdr.msg_controllen = 0;
- rv = recvmsg(udp->udp_fd, &hdr, 0);
- if (rv < 0) {
+
+ if ((cnt = recvmsg(udp->udp_fd, &hdr, 0)) < 0) {
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
- // No data available at socket. Return
- // the AIO to the head of the queue.
- nni_list_prepend(q, aio);
+ // No data available at socket. Leave
+ // the AIO at the head of the queue.
return;
}
rv = nni_plat_errno(errno);
- nni_aio_finish_error(aio, rv);
- continue;
- }
-
- // We need to store the address information.
- // It is incumbent on the AIO submitter to supply
- // storage for the address.
- if (aio->a_addr != NULL) {
+ } else if (aio->a_addr != NULL) {
+ // We need to store the address information.
+ // It is incumbent on the AIO submitter to supply
+ // storage for the address.
nni_posix_sockaddr2nn(aio->a_addr, (void *) &ss);
}
-
- nni_aio_finish(aio, 0, rv);
+ nni_list_remove(q, aio);
+ nni_aio_finish(aio, rv, cnt);
}
}
static void
nni_posix_udp_dosend(nni_plat_udp *udp)
{
- // XXX: TBD.
nni_aio * aio;
nni_list *q = &udp->udp_sendq;
+ int x = 0;
// While we're able to send, do so.
while ((aio = nni_list_first(q)) != NULL) {
@@ -120,48 +112,37 @@ nni_posix_udp_dosend(nni_plat_udp *udp)
struct msghdr hdr;
struct iovec iov[4];
int niov;
- int rv;
int len;
-
- nni_list_remove(q, aio);
-
- if (aio->a_addr == NULL) {
- // No outgoing address?
- nni_aio_finish_error(aio, NNG_EADDRINVAL);
- return;
- }
- len = nni_posix_nn2sockaddr(&ss, aio->a_addr);
- if (len < 0) {
- nni_aio_finish_error(aio, NNG_EADDRINVAL);
- return;
- }
-
- hdr.msg_iov = iov;
- for (niov = 0; niov < aio->a_niov; niov++) {
- iov[niov].iov_base = aio->a_iov[niov].iov_buf;
- iov[niov].iov_len = aio->a_iov[niov].iov_len;
- }
- hdr.msg_iovlen = niov;
- hdr.msg_name = &ss;
- hdr.msg_namelen = len;
- hdr.msg_flags = NNI_MSG_NOSIGNAL;
- hdr.msg_control = NULL;
- hdr.msg_controllen = 0;
-
- rv = sendmsg(udp->udp_fd, &hdr, 0);
- if (rv < 0) {
- if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
- // Cannot send (buffers full), return to
- // head of queue.
- nni_list_prepend(q, aio);
- return;
+ int rv = 0;
+ int cnt = 0;
+
+ if ((len = nni_posix_nn2sockaddr(&ss, aio->a_addr)) < 0) {
+ rv = NNG_EADDRINVAL;
+ } else {
+ for (niov = 0; niov < aio->a_niov; niov++) {
+ iov[niov].iov_base = aio->a_iov[niov].iov_buf;
+ iov[niov].iov_len = aio->a_iov[niov].iov_len;
+ }
+ hdr.msg_iov = iov;
+ hdr.msg_iovlen = niov;
+ hdr.msg_name = &ss;
+ hdr.msg_namelen = len;
+ hdr.msg_flags = NNI_MSG_NOSIGNAL;
+ hdr.msg_control = NULL;
+ hdr.msg_controllen = 0;
+
+ if ((cnt = sendmsg(udp->udp_fd, &hdr, 0)) < 0) {
+ if ((errno == EAGAIN) ||
+ (errno == EWOULDBLOCK)) {
+ // Cannot send now, leave at head.
+ return;
+ }
+ rv = nni_plat_errno(errno);
}
- rv = nni_plat_errno(errno);
- nni_aio_finish_error(aio, rv);
- continue;
}
- nni_aio_finish(aio, 0, rv);
+ nni_list_remove(q, aio);
+ nni_aio_finish(aio, rv, cnt);
}
}
@@ -234,6 +215,8 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
udp->udp_pitem.cb = nni_posix_udp_cb;
udp->udp_pitem.data = udp;
+ (void) fcntl(udp->udp_fd, F_SETFL, O_NONBLOCK);
+
nni_aio_list_init(&udp->udp_recvq);
nni_aio_list_init(&udp->udp_sendq);
@@ -255,17 +238,10 @@ nni_plat_udp_close(nni_plat_udp *udp)
{
nni_aio *aio;
- nni_mtx_lock(&udp->udp_mtx);
- if (udp->udp_closed) {
- // The only way this happens is in response to a callback that
- // is being canceled. Double close from user code is a bug.
- nni_mtx_unlock(&udp->udp_mtx);
- return;
- }
-
// We're no longer interested in events.
nni_posix_pollq_remove(&udp->udp_pitem);
+ nni_mtx_lock(&udp->udp_mtx);
nni_posix_udp_doclose(udp);
nni_mtx_unlock(&udp->udp_mtx);
@@ -291,19 +267,10 @@ void
nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio)
{
nni_mtx_lock(&udp->udp_mtx);
- if (nni_aio_start(aio, nni_plat_udp_cancel, udp) != 0) {
- nni_mtx_unlock(&udp->udp_mtx);
- return;
- }
-
- if (udp->udp_closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
- nni_mtx_unlock(&udp->udp_mtx);
- return;
+ if (nni_aio_start(aio, nni_plat_udp_cancel, udp) == 0) {
+ nni_list_append(&udp->udp_recvq, aio);
+ nni_posix_pollq_arm(&udp->udp_pitem, POLLIN);
}
-
- nni_list_append(&udp->udp_recvq, aio);
- nni_posix_pollq_arm(&udp->udp_pitem, POLLIN);
nni_mtx_unlock(&udp->udp_mtx);
}
@@ -311,19 +278,10 @@ void
nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio)
{
nni_mtx_lock(&udp->udp_mtx);
- if (nni_aio_start(aio, nni_plat_udp_cancel, udp) != 0) {
- nni_mtx_unlock(&udp->udp_mtx);
- return;
+ if (nni_aio_start(aio, nni_plat_udp_cancel, udp) == 0) {
+ nni_list_append(&udp->udp_sendq, aio);
+ nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT);
}
-
- if (udp->udp_closed) {
- nni_aio_finish_error(aio, NNG_ECLOSED);
- nni_mtx_unlock(&udp->udp_mtx);
- return;
- }
-
- nni_list_append(&udp->udp_sendq, aio);
- nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT);
nni_mtx_unlock(&udp->udp_mtx);
}
diff --git a/src/platform/windows/win_debug.c b/src/platform/windows/win_debug.c
index 5c6c3fb5..0113af58 100644
--- a/src/platform/windows/win_debug.c
+++ b/src/platform/windows/win_debug.c
@@ -83,8 +83,9 @@ nni_plat_errno(int errnum)
static struct {
int win_err;
int nng_err;
-} nni_win_errnos[] = {
- // clang-format off
+} nni_win_errnos[] =
+ {
+ // clang-format off
{ ERROR_FILE_NOT_FOUND, NNG_ENOENT },
{ ERROR_ACCESS_DENIED, NNG_EPERM },
{ ERROR_INVALID_HANDLE, NNG_ECLOSED },
@@ -114,6 +115,10 @@ static struct {
{ WSAENOPROTOOPT, NNG_ENOTSUP },
{ WSAEPROTONOSUPPORT, NNG_ENOTSUP },
{ WSAEPROTONOSUPPORT, NNG_ENOTSUP },
+ { WSAESOCKTNOSUPPORT, NNG_ENOTSUP },
+ { WSAEOPNOTSUPP, NNG_ENOTSUP },
+ { WSAEPFNOSUPPORT, NNG_ENOTSUP },
+ { WSAEAFNOSUPPORT, NNG_ENOTSUP },
{ WSAEADDRINUSE, NNG_EADDRINUSE },
{ WSAEADDRNOTAVAIL, NNG_EADDRINVAL },
{ WSAENETDOWN, NNG_EUNREACHABLE },
@@ -137,8 +142,8 @@ static struct {
// Must be Last!!
{ 0, 0 },
- // clang-format on
-};
+ // clang-format on
+ };
// This converts a Windows API error (from GetLastError()) to an
// nng standard error code.
diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h
index c2549266..236feb31 100644
--- a/src/platform/windows/win_impl.h
+++ b/src/platform/windows/win_impl.h
@@ -63,13 +63,15 @@ struct nni_win_event_ops {
struct nni_win_event {
OVERLAPPED olpd;
void * ptr;
- nni_aio * aio;
nni_mtx mtx;
nni_cv cv;
unsigned run : 1;
unsigned fini : 1;
+ unsigned closed : 1;
unsigned count;
int status;
+ nni_list aios;
+ nni_aio * active;
nni_win_event_ops ops;
};
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c
index 6d2438d3..0f9348d6 100644
--- a/src/platform/windows/win_iocp.c
+++ b/src/platform/windows/win_iocp.c
@@ -24,11 +24,16 @@ static HANDLE nni_win_global_iocp = NULL;
static nni_thr nni_win_iocp_thrs[NNI_WIN_IOCP_NTHREADS];
static nni_mtx nni_win_iocp_mtx;
+static void nni_win_event_start(nni_win_event *);
+
static void
-nni_win_event_finish(nni_win_event *evt, nni_aio *aio)
+nni_win_event_finish(nni_win_event *evt)
{
+ nni_aio *aio;
evt->run = 0;
- if (aio != NULL) {
+
+ if ((aio = evt->active) != NULL) {
+ evt->active = NULL;
evt->ops.wev_finish(evt, aio);
}
if (evt->fini) {
@@ -44,9 +49,7 @@ nni_win_iocp_handler(void *arg)
ULONG_PTR key;
OVERLAPPED * olpd;
nni_win_event *evt;
- int rv;
BOOL ok;
- nni_aio * aio;
NNI_ARG_UNUSED(arg);
@@ -70,16 +73,15 @@ nni_win_iocp_handler(void *arg)
nni_mtx_lock(&evt->mtx);
if (ok) {
- rv = ERROR_SUCCESS;
+ evt->status = 0;
} else if (evt->status == 0) {
evt->status = nni_win_error(GetLastError());
}
- aio = evt->aio;
- evt->aio = NULL;
evt->count = cnt;
- nni_win_event_finish(evt, aio);
+ nni_win_event_finish(evt);
+ nni_win_event_start(evt);
nni_mtx_unlock(&evt->mtx);
}
}
@@ -90,47 +92,65 @@ nni_win_event_cancel(nni_aio *aio, int rv)
nni_win_event *evt = aio->a_prov_data;
nni_mtx_lock(&evt->mtx);
- if (evt->aio == aio) {
+ if (aio == evt->active) {
evt->status = rv;
// Use provider specific cancellation.
evt->ops.wev_cancel(evt);
+ } else if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
}
nni_mtx_unlock(&evt->mtx);
}
void
-nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio)
+nni_win_event_start(nni_win_event *evt)
{
- // This is just continuation of a pre-existing AIO operation.
- // For example, continuing I/O of a multi-buffer s/g operation.
- // The lock is held.
+ nni_aio *aio;
+
+ // Lock held.
+
+ if (evt->run) {
+ // Already running.
+ return;
+ }
// Abort operation -- no further activity.
- if (evt->fini) {
- evt->run = 0;
- nni_cv_wake(&evt->cv);
+ if (evt->fini || evt->closed) {
+ while ((aio = nni_list_first(&evt->aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
return;
}
+ if ((aio = nni_list_first(&evt->aios)) == NULL) {
+ return;
+ }
+
+ nni_aio_list_remove(aio);
+ evt->active = aio;
evt->status = 0;
evt->count = 0;
if (!ResetEvent(evt->olpd.hEvent)) {
- evt->status = nni_win_error(GetLastError());
- evt->count = 0;
- nni_win_event_finish(evt, aio);
+ evt->active = NULL;
+ nni_aio_finish_error(aio, nni_win_error(GetLastError()));
return;
}
- evt->aio = aio;
evt->run = 1;
if (evt->ops.wev_start(evt, aio) != 0) {
// Start completed synchronously. It will have stored
// the count and status in the evt.
- evt->aio = NULL;
- nni_win_event_finish(evt, aio);
+ nni_win_event_finish(evt);
}
}
+void
+nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio)
+{
+ nni_aio_list_prepend(&evt->aios, aio);
+}
void
nni_win_event_submit(nni_win_event *evt, nni_aio *aio)
@@ -141,7 +161,8 @@ nni_win_event_submit(nni_win_event *evt, nni_aio *aio)
nni_mtx_unlock(&evt->mtx);
return;
}
- nni_win_event_resubmit(evt, aio);
+ nni_aio_list_append(&evt->aios, aio);
+ nni_win_event_start(evt);
nni_mtx_unlock(&evt->mtx);
}
@@ -154,12 +175,20 @@ nni_win_event_complete(nni_win_event *evt, int cnt)
void
nni_win_event_close(nni_win_event *evt)
{
- if (evt->ptr != NULL) {
- nni_mtx_lock(&evt->mtx);
- evt->status = NNG_ECLOSED;
- evt->ops.wev_cancel(evt);
- nni_mtx_unlock(&evt->mtx);
+ nni_aio *aio;
+
+ if (evt->ptr == NULL) {
+ return; // Never initialized
+ }
+ nni_mtx_lock(&evt->mtx);
+ evt->closed = 1;
+ evt->status = NNG_ECLOSED;
+ evt->ops.wev_cancel(evt);
+ while ((aio = nni_list_first(&evt->aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
}
+ nni_mtx_unlock(&evt->mtx);
}
int
@@ -175,44 +204,52 @@ int
nni_win_event_init(nni_win_event *evt, nni_win_event_ops *ops, void *ptr)
{
ZeroMemory(&evt->olpd, sizeof(evt->olpd));
- evt->olpd.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
- if (evt->olpd.hEvent == NULL) {
- return (nni_win_error(GetLastError()));
- }
nni_mtx_init(&evt->mtx);
nni_cv_init(&evt->cv, &evt->mtx);
-
+ nni_aio_list_init(&evt->aios);
evt->ops = *ops;
- evt->aio = NULL;
evt->ptr = ptr;
evt->fini = 0;
evt->run = 0;
+
+ evt->olpd.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
+ if (evt->olpd.hEvent == NULL) {
+ return (nni_win_error(GetLastError()));
+ }
+
return (0);
}
void
nni_win_event_fini(nni_win_event *evt)
{
- if (evt->ptr != NULL) {
- nni_mtx_lock(&evt->mtx);
+ nni_aio *aio;
- evt->fini = 1;
+ if (evt->ptr == NULL) {
+ return; // Never initialized
+ }
+ nni_mtx_lock(&evt->mtx);
- // Use provider specific cancellation.
- evt->ops.wev_cancel(evt);
+ evt->fini = 1;
- // Wait for everything to stop referencing this.
- while (evt->run) {
- nni_cv_wait(&evt->cv);
- }
+ // Use provider specific cancellation.
+ evt->ops.wev_cancel(evt);
- nni_mtx_unlock(&evt->mtx);
+ // Wait for everything to stop referencing this.
+ while (evt->run) {
+ nni_cv_wait(&evt->cv);
+ }
+
+ while ((aio = nni_list_first(&evt->aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
}
if (evt->olpd.hEvent != NULL) {
(void) CloseHandle(evt->olpd.hEvent);
evt->olpd.hEvent = NULL;
}
+ nni_mtx_unlock(&evt->mtx);
nni_cv_fini(&evt->cv);
nni_mtx_fini(&evt->mtx);
}
diff --git a/src/platform/windows/win_sockaddr.c b/src/platform/windows/win_sockaddr.c
index 0fa6dd51..f66542c6 100644
--- a/src/platform/windows/win_sockaddr.c
+++ b/src/platform/windows/win_sockaddr.c
@@ -20,6 +20,9 @@ nni_win_nn2sockaddr(SOCKADDR_STORAGE *ss, const nni_sockaddr *sa)
SOCKADDR_IN * sin;
SOCKADDR_IN6 *sin6;
+ if ((ss == NULL) || (sa == NULL)) {
+ return (-1);
+ }
switch (sa->s_un.s_family) {
case NNG_AF_INET:
sin = (void *) ss;
@@ -46,6 +49,9 @@ nni_win_sockaddr2nn(nni_sockaddr *sa, const SOCKADDR_STORAGE *ss)
SOCKADDR_IN * sin;
SOCKADDR_IN6 *sin6;
+ if ((ss == NULL) || (sa == NULL)) {
+ return (-1);
+ }
switch (ss->ss_family) {
case PF_INET:
sin = (void *) ss;