diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-10-03 20:28:09 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-10-05 21:25:57 -0700 |
| commit | b0f31f578b0669b598d3ded3a625685b125bef1d (patch) | |
| tree | 0044b1f6924700a4fe4e557826bb79796f9e94d0 /src/platform/posix | |
| parent | 557964482f2b9d4246a2943fb1bedc6074d01e0d (diff) | |
| download | nng-b0f31f578b0669b598d3ded3a625685b125bef1d.tar.gz nng-b0f31f578b0669b598d3ded3a625685b125bef1d.tar.bz2 nng-b0f31f578b0669b598d3ded3a625685b125bef1d.zip | |
Improve UDP test coverage, fix numerous issues found.
We introduced richer, deeper tests for UDP functionality.
These tests uncovered a number of issues which this commit fixes.
The Windows IOCP code needs to support multiple aios on a single
nni_win_event. A redesign of the IOCP handling addresses that.
The POSIX UDP code also needed fixes; foremost among them is the
fact that the UDP file descriptor is not placed into non-blocking
mode, leading to potential hangs.
A number of race conditions and bugs along the implementation of
the above items were uncovered and fixed. To the best of our knowledge
the current code is bug-free.
Diffstat (limited to 'src/platform/posix')
| -rw-r--r-- | src/platform/posix/posix_pollq_poll.c | 11 | ||||
| -rw-r--r-- | src/platform/posix/posix_sockaddr.c | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_udp.c | 142 |
3 files changed, 59 insertions, 100 deletions
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index d8252af9..e6abd2d2 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -266,14 +266,11 @@ nni_posix_pollq_arm(nni_posix_pollq_node *node, int events) // on the polled list. The polled list would be the case where // the index is set to a positive value. if ((oevents == 0) && (events != 0) && (node->index < 1)) { - if (nni_list_node_active(&node->node)) { - nni_list_node_remove(&node->node); - } + nni_list_node_remove(&node->node); nni_list_append(&pq->armed, node); } if ((events != 0) && (oevents != events)) { - // Possibly wake up the poller since we're looking for - // new events. + // Possibly wake up poller since we're looking for new events. if (pq->inpoll) { nni_plat_pipe_raise(pq->wakewfd); } @@ -295,9 +292,7 @@ nni_posix_pollq_disarm(nni_posix_pollq_node *node, int events) oevents = node->events; node->events &= ~events; if ((node->events == 0) && (oevents != 0)) { - if (nni_list_node_active(&node->node)) { - nni_list_node_remove(&node->node); - } + nni_list_node_remove(&node->node); nni_list_append(&pq->idle, node); } // No need to wake anything, we might get a spurious wake up but diff --git a/src/platform/posix/posix_sockaddr.c b/src/platform/posix/posix_sockaddr.c index ea630b01..25953f50 100644 --- a/src/platform/posix/posix_sockaddr.c +++ b/src/platform/posix/posix_sockaddr.c @@ -33,6 +33,9 @@ nni_posix_nn2sockaddr(void *sa, const nni_sockaddr *na) const nng_sockaddr_path *nspath; size_t sz; + if ((sa == NULL) || (na == NULL)) { + return (-1); + } switch (na->s_un.s_family) { case NNG_AF_INET: sin = (void *) sa; @@ -80,6 +83,9 @@ nni_posix_sockaddr2nn(nni_sockaddr *na, const void *sa) nng_sockaddr_in6 * nsin6; nng_sockaddr_path * nspath; + if ((na == NULL) || (sa == NULL)) { + return (-1); + } switch (((struct sockaddr *) sa)->sa_family) { case AF_INET: sin = (void *) sa; diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index 05f7bea1..31ef76f6 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -37,7 +37,6 @@ struct nni_plat_udp { nni_posix_pollq_node udp_pitem; int udp_fd; - int udp_closed; nni_list udp_recvq; nni_list udp_sendq; nni_mtx udp_mtx; @@ -48,7 +47,6 @@ nni_posix_udp_doclose(nni_plat_udp *udp) { nni_aio *aio; - udp->udp_closed = 1; while (((aio = nni_list_first(&udp->udp_recvq)) != NULL) || ((aio = nni_list_first(&udp->udp_sendq)) != NULL)) { nni_aio_list_remove(aio); @@ -62,57 +60,51 @@ nni_posix_udp_dorecv(nni_plat_udp *udp) { nni_aio * aio; nni_list *q = &udp->udp_recvq; - // While we're able to recv, do so. while ((aio = nni_list_first(q)) != NULL) { - nni_list_remove(q, aio); struct iovec iov[4]; // never have more than 4 int niov; struct sockaddr_storage ss; struct msghdr hdr; - int rv; + int rv = 0; + int cnt = 0; - hdr.msg_iov = iov; for (niov = 0; niov < aio->a_niov; niov++) { iov[niov].iov_base = aio->a_iov[niov].iov_buf; iov[niov].iov_len = aio->a_iov[niov].iov_len; } + hdr.msg_iov = iov; hdr.msg_iovlen = niov; hdr.msg_name = &ss; hdr.msg_namelen = sizeof(ss); hdr.msg_flags = 0; hdr.msg_control = NULL; hdr.msg_controllen = 0; - rv = recvmsg(udp->udp_fd, &hdr, 0); - if (rv < 0) { + + if ((cnt = recvmsg(udp->udp_fd, &hdr, 0)) < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { - // No data available at socket. Return - // the AIO to the head of the queue. - nni_list_prepend(q, aio); + // No data available at socket. Leave + // the AIO at the head of the queue. return; } rv = nni_plat_errno(errno); - nni_aio_finish_error(aio, rv); - continue; - } - - // We need to store the address information. - // It is incumbent on the AIO submitter to supply - // storage for the address. - if (aio->a_addr != NULL) { + } else if (aio->a_addr != NULL) { + // We need to store the address information. + // It is incumbent on the AIO submitter to supply + // storage for the address. nni_posix_sockaddr2nn(aio->a_addr, (void *) &ss); } - - nni_aio_finish(aio, 0, rv); + nni_list_remove(q, aio); + nni_aio_finish(aio, rv, cnt); } } static void nni_posix_udp_dosend(nni_plat_udp *udp) { - // XXX: TBD. nni_aio * aio; nni_list *q = &udp->udp_sendq; + int x = 0; // While we're able to send, do so. while ((aio = nni_list_first(q)) != NULL) { @@ -120,48 +112,37 @@ nni_posix_udp_dosend(nni_plat_udp *udp) struct msghdr hdr; struct iovec iov[4]; int niov; - int rv; int len; - - nni_list_remove(q, aio); - - if (aio->a_addr == NULL) { - // No outgoing address? - nni_aio_finish_error(aio, NNG_EADDRINVAL); - return; - } - len = nni_posix_nn2sockaddr(&ss, aio->a_addr); - if (len < 0) { - nni_aio_finish_error(aio, NNG_EADDRINVAL); - return; - } - - hdr.msg_iov = iov; - for (niov = 0; niov < aio->a_niov; niov++) { - iov[niov].iov_base = aio->a_iov[niov].iov_buf; - iov[niov].iov_len = aio->a_iov[niov].iov_len; - } - hdr.msg_iovlen = niov; - hdr.msg_name = &ss; - hdr.msg_namelen = len; - hdr.msg_flags = NNI_MSG_NOSIGNAL; - hdr.msg_control = NULL; - hdr.msg_controllen = 0; - - rv = sendmsg(udp->udp_fd, &hdr, 0); - if (rv < 0) { - if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { - // Cannot send (buffers full), return to - // head of queue. - nni_list_prepend(q, aio); - return; + int rv = 0; + int cnt = 0; + + if ((len = nni_posix_nn2sockaddr(&ss, aio->a_addr)) < 0) { + rv = NNG_EADDRINVAL; + } else { + for (niov = 0; niov < aio->a_niov; niov++) { + iov[niov].iov_base = aio->a_iov[niov].iov_buf; + iov[niov].iov_len = aio->a_iov[niov].iov_len; + } + hdr.msg_iov = iov; + hdr.msg_iovlen = niov; + hdr.msg_name = &ss; + hdr.msg_namelen = len; + hdr.msg_flags = NNI_MSG_NOSIGNAL; + hdr.msg_control = NULL; + hdr.msg_controllen = 0; + + if ((cnt = sendmsg(udp->udp_fd, &hdr, 0)) < 0) { + if ((errno == EAGAIN) || + (errno == EWOULDBLOCK)) { + // Cannot send now, leave at head. + return; + } + rv = nni_plat_errno(errno); } - rv = nni_plat_errno(errno); - nni_aio_finish_error(aio, rv); - continue; } - nni_aio_finish(aio, 0, rv); + nni_list_remove(q, aio); + nni_aio_finish(aio, rv, cnt); } } @@ -234,6 +215,8 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) udp->udp_pitem.cb = nni_posix_udp_cb; udp->udp_pitem.data = udp; + (void) fcntl(udp->udp_fd, F_SETFL, O_NONBLOCK); + nni_aio_list_init(&udp->udp_recvq); nni_aio_list_init(&udp->udp_sendq); @@ -255,17 +238,10 @@ nni_plat_udp_close(nni_plat_udp *udp) { nni_aio *aio; - nni_mtx_lock(&udp->udp_mtx); - if (udp->udp_closed) { - // The only way this happens is in response to a callback that - // is being canceled. Double close from user code is a bug. - nni_mtx_unlock(&udp->udp_mtx); - return; - } - // We're no longer interested in events. nni_posix_pollq_remove(&udp->udp_pitem); + nni_mtx_lock(&udp->udp_mtx); nni_posix_udp_doclose(udp); nni_mtx_unlock(&udp->udp_mtx); @@ -291,19 +267,10 @@ void nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio) { nni_mtx_lock(&udp->udp_mtx); - if (nni_aio_start(aio, nni_plat_udp_cancel, udp) != 0) { - nni_mtx_unlock(&udp->udp_mtx); - return; - } - - if (udp->udp_closed) { - nni_aio_finish_error(aio, NNG_ECLOSED); - nni_mtx_unlock(&udp->udp_mtx); - return; + if (nni_aio_start(aio, nni_plat_udp_cancel, udp) == 0) { + nni_list_append(&udp->udp_recvq, aio); + nni_posix_pollq_arm(&udp->udp_pitem, POLLIN); } - - nni_list_append(&udp->udp_recvq, aio); - nni_posix_pollq_arm(&udp->udp_pitem, POLLIN); nni_mtx_unlock(&udp->udp_mtx); } @@ -311,19 +278,10 @@ void nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio) { nni_mtx_lock(&udp->udp_mtx); - if (nni_aio_start(aio, nni_plat_udp_cancel, udp) != 0) { - nni_mtx_unlock(&udp->udp_mtx); - return; + if (nni_aio_start(aio, nni_plat_udp_cancel, udp) == 0) { + nni_list_append(&udp->udp_sendq, aio); + nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT); } - - if (udp->udp_closed) { - nni_aio_finish_error(aio, NNG_ECLOSED); - nni_mtx_unlock(&udp->udp_mtx); - return; - } - - nni_list_append(&udp->udp_sendq, aio); - nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT); nni_mtx_unlock(&udp->udp_mtx); } |
