diff options
Diffstat (limited to 'src/platform')
| -rw-r--r-- | src/platform/posix/posix_pollq_poll.c | 11 | ||||
| -rw-r--r-- | src/platform/posix/posix_sockaddr.c | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_udp.c | 142 | ||||
| -rw-r--r-- | src/platform/windows/win_debug.c | 13 | ||||
| -rw-r--r-- | src/platform/windows/win_impl.h | 4 | ||||
| -rw-r--r-- | src/platform/windows/win_iocp.c | 125 | ||||
| -rw-r--r-- | src/platform/windows/win_sockaddr.c | 6 |
7 files changed, 158 insertions, 149 deletions
diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index d8252af9..e6abd2d2 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -266,14 +266,11 @@ nni_posix_pollq_arm(nni_posix_pollq_node *node, int events) // on the polled list. The polled list would be the case where // the index is set to a positive value. if ((oevents == 0) && (events != 0) && (node->index < 1)) { - if (nni_list_node_active(&node->node)) { - nni_list_node_remove(&node->node); - } + nni_list_node_remove(&node->node); nni_list_append(&pq->armed, node); } if ((events != 0) && (oevents != events)) { - // Possibly wake up the poller since we're looking for - // new events. + // Possibly wake up poller since we're looking for new events. if (pq->inpoll) { nni_plat_pipe_raise(pq->wakewfd); } @@ -295,9 +292,7 @@ nni_posix_pollq_disarm(nni_posix_pollq_node *node, int events) oevents = node->events; node->events &= ~events; if ((node->events == 0) && (oevents != 0)) { - if (nni_list_node_active(&node->node)) { - nni_list_node_remove(&node->node); - } + nni_list_node_remove(&node->node); nni_list_append(&pq->idle, node); } // No need to wake anything, we might get a spurious wake up but diff --git a/src/platform/posix/posix_sockaddr.c b/src/platform/posix/posix_sockaddr.c index ea630b01..25953f50 100644 --- a/src/platform/posix/posix_sockaddr.c +++ b/src/platform/posix/posix_sockaddr.c @@ -33,6 +33,9 @@ nni_posix_nn2sockaddr(void *sa, const nni_sockaddr *na) const nng_sockaddr_path *nspath; size_t sz; + if ((sa == NULL) || (na == NULL)) { + return (-1); + } switch (na->s_un.s_family) { case NNG_AF_INET: sin = (void *) sa; @@ -80,6 +83,9 @@ nni_posix_sockaddr2nn(nni_sockaddr *na, const void *sa) nng_sockaddr_in6 * nsin6; nng_sockaddr_path * nspath; + if ((na == NULL) || (sa == NULL)) { + return (-1); + } switch (((struct sockaddr *) sa)->sa_family) { case AF_INET: sin = (void *) sa; diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index 05f7bea1..31ef76f6 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -37,7 +37,6 @@ struct nni_plat_udp { nni_posix_pollq_node udp_pitem; int udp_fd; - int udp_closed; nni_list udp_recvq; nni_list udp_sendq; nni_mtx udp_mtx; @@ -48,7 +47,6 @@ nni_posix_udp_doclose(nni_plat_udp *udp) { nni_aio *aio; - udp->udp_closed = 1; while (((aio = nni_list_first(&udp->udp_recvq)) != NULL) || ((aio = nni_list_first(&udp->udp_sendq)) != NULL)) { nni_aio_list_remove(aio); @@ -62,57 +60,51 @@ nni_posix_udp_dorecv(nni_plat_udp *udp) { nni_aio * aio; nni_list *q = &udp->udp_recvq; - // While we're able to recv, do so. while ((aio = nni_list_first(q)) != NULL) { - nni_list_remove(q, aio); struct iovec iov[4]; // never have more than 4 int niov; struct sockaddr_storage ss; struct msghdr hdr; - int rv; + int rv = 0; + int cnt = 0; - hdr.msg_iov = iov; for (niov = 0; niov < aio->a_niov; niov++) { iov[niov].iov_base = aio->a_iov[niov].iov_buf; iov[niov].iov_len = aio->a_iov[niov].iov_len; } + hdr.msg_iov = iov; hdr.msg_iovlen = niov; hdr.msg_name = &ss; hdr.msg_namelen = sizeof(ss); hdr.msg_flags = 0; hdr.msg_control = NULL; hdr.msg_controllen = 0; - rv = recvmsg(udp->udp_fd, &hdr, 0); - if (rv < 0) { + + if ((cnt = recvmsg(udp->udp_fd, &hdr, 0)) < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { - // No data available at socket. Return - // the AIO to the head of the queue. - nni_list_prepend(q, aio); + // No data available at socket. Leave + // the AIO at the head of the queue. return; } rv = nni_plat_errno(errno); - nni_aio_finish_error(aio, rv); - continue; - } - - // We need to store the address information. - // It is incumbent on the AIO submitter to supply - // storage for the address. - if (aio->a_addr != NULL) { + } else if (aio->a_addr != NULL) { + // We need to store the address information. + // It is incumbent on the AIO submitter to supply + // storage for the address. nni_posix_sockaddr2nn(aio->a_addr, (void *) &ss); } - - nni_aio_finish(aio, 0, rv); + nni_list_remove(q, aio); + nni_aio_finish(aio, rv, cnt); } } static void nni_posix_udp_dosend(nni_plat_udp *udp) { - // XXX: TBD. nni_aio * aio; nni_list *q = &udp->udp_sendq; + int x = 0; // While we're able to send, do so. while ((aio = nni_list_first(q)) != NULL) { @@ -120,48 +112,37 @@ nni_posix_udp_dosend(nni_plat_udp *udp) struct msghdr hdr; struct iovec iov[4]; int niov; - int rv; int len; - - nni_list_remove(q, aio); - - if (aio->a_addr == NULL) { - // No outgoing address? - nni_aio_finish_error(aio, NNG_EADDRINVAL); - return; - } - len = nni_posix_nn2sockaddr(&ss, aio->a_addr); - if (len < 0) { - nni_aio_finish_error(aio, NNG_EADDRINVAL); - return; - } - - hdr.msg_iov = iov; - for (niov = 0; niov < aio->a_niov; niov++) { - iov[niov].iov_base = aio->a_iov[niov].iov_buf; - iov[niov].iov_len = aio->a_iov[niov].iov_len; - } - hdr.msg_iovlen = niov; - hdr.msg_name = &ss; - hdr.msg_namelen = len; - hdr.msg_flags = NNI_MSG_NOSIGNAL; - hdr.msg_control = NULL; - hdr.msg_controllen = 0; - - rv = sendmsg(udp->udp_fd, &hdr, 0); - if (rv < 0) { - if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { - // Cannot send (buffers full), return to - // head of queue. - nni_list_prepend(q, aio); - return; + int rv = 0; + int cnt = 0; + + if ((len = nni_posix_nn2sockaddr(&ss, aio->a_addr)) < 0) { + rv = NNG_EADDRINVAL; + } else { + for (niov = 0; niov < aio->a_niov; niov++) { + iov[niov].iov_base = aio->a_iov[niov].iov_buf; + iov[niov].iov_len = aio->a_iov[niov].iov_len; + } + hdr.msg_iov = iov; + hdr.msg_iovlen = niov; + hdr.msg_name = &ss; + hdr.msg_namelen = len; + hdr.msg_flags = NNI_MSG_NOSIGNAL; + hdr.msg_control = NULL; + hdr.msg_controllen = 0; + + if ((cnt = sendmsg(udp->udp_fd, &hdr, 0)) < 0) { + if ((errno == EAGAIN) || + (errno == EWOULDBLOCK)) { + // Cannot send now, leave at head. + return; + } + rv = nni_plat_errno(errno); } - rv = nni_plat_errno(errno); - nni_aio_finish_error(aio, rv); - continue; } - nni_aio_finish(aio, 0, rv); + nni_list_remove(q, aio); + nni_aio_finish(aio, rv, cnt); } } @@ -234,6 +215,8 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr) udp->udp_pitem.cb = nni_posix_udp_cb; udp->udp_pitem.data = udp; + (void) fcntl(udp->udp_fd, F_SETFL, O_NONBLOCK); + nni_aio_list_init(&udp->udp_recvq); nni_aio_list_init(&udp->udp_sendq); @@ -255,17 +238,10 @@ nni_plat_udp_close(nni_plat_udp *udp) { nni_aio *aio; - nni_mtx_lock(&udp->udp_mtx); - if (udp->udp_closed) { - // The only way this happens is in response to a callback that - // is being canceled. Double close from user code is a bug. - nni_mtx_unlock(&udp->udp_mtx); - return; - } - // We're no longer interested in events. nni_posix_pollq_remove(&udp->udp_pitem); + nni_mtx_lock(&udp->udp_mtx); nni_posix_udp_doclose(udp); nni_mtx_unlock(&udp->udp_mtx); @@ -291,19 +267,10 @@ void nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio) { nni_mtx_lock(&udp->udp_mtx); - if (nni_aio_start(aio, nni_plat_udp_cancel, udp) != 0) { - nni_mtx_unlock(&udp->udp_mtx); - return; - } - - if (udp->udp_closed) { - nni_aio_finish_error(aio, NNG_ECLOSED); - nni_mtx_unlock(&udp->udp_mtx); - return; + if (nni_aio_start(aio, nni_plat_udp_cancel, udp) == 0) { + nni_list_append(&udp->udp_recvq, aio); + nni_posix_pollq_arm(&udp->udp_pitem, POLLIN); } - - nni_list_append(&udp->udp_recvq, aio); - nni_posix_pollq_arm(&udp->udp_pitem, POLLIN); nni_mtx_unlock(&udp->udp_mtx); } @@ -311,19 +278,10 @@ void nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio) { nni_mtx_lock(&udp->udp_mtx); - if (nni_aio_start(aio, nni_plat_udp_cancel, udp) != 0) { - nni_mtx_unlock(&udp->udp_mtx); - return; + if (nni_aio_start(aio, nni_plat_udp_cancel, udp) == 0) { + nni_list_append(&udp->udp_sendq, aio); + nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT); } - - if (udp->udp_closed) { - nni_aio_finish_error(aio, NNG_ECLOSED); - nni_mtx_unlock(&udp->udp_mtx); - return; - } - - nni_list_append(&udp->udp_sendq, aio); - nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT); nni_mtx_unlock(&udp->udp_mtx); } diff --git a/src/platform/windows/win_debug.c b/src/platform/windows/win_debug.c index 5c6c3fb5..0113af58 100644 --- a/src/platform/windows/win_debug.c +++ b/src/platform/windows/win_debug.c @@ -83,8 +83,9 @@ nni_plat_errno(int errnum) static struct { int win_err; int nng_err; -} nni_win_errnos[] = { - // clang-format off +} nni_win_errnos[] = + { + // clang-format off { ERROR_FILE_NOT_FOUND, NNG_ENOENT }, { ERROR_ACCESS_DENIED, NNG_EPERM }, { ERROR_INVALID_HANDLE, NNG_ECLOSED }, @@ -114,6 +115,10 @@ static struct { { WSAENOPROTOOPT, NNG_ENOTSUP }, { WSAEPROTONOSUPPORT, NNG_ENOTSUP }, { WSAEPROTONOSUPPORT, NNG_ENOTSUP }, + { WSAESOCKTNOSUPPORT, NNG_ENOTSUP }, + { WSAEOPNOTSUPP, NNG_ENOTSUP }, + { WSAEPFNOSUPPORT, NNG_ENOTSUP }, + { WSAEAFNOSUPPORT, NNG_ENOTSUP }, { WSAEADDRINUSE, NNG_EADDRINUSE }, { WSAEADDRNOTAVAIL, NNG_EADDRINVAL }, { WSAENETDOWN, NNG_EUNREACHABLE }, @@ -137,8 +142,8 @@ static struct { // Must be Last!! { 0, 0 }, - // clang-format on -}; + // clang-format on + }; // This converts a Windows API error (from GetLastError()) to an // nng standard error code. diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index c2549266..236feb31 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -63,13 +63,15 @@ struct nni_win_event_ops { struct nni_win_event { OVERLAPPED olpd; void * ptr; - nni_aio * aio; nni_mtx mtx; nni_cv cv; unsigned run : 1; unsigned fini : 1; + unsigned closed : 1; unsigned count; int status; + nni_list aios; + nni_aio * active; nni_win_event_ops ops; }; diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index 6d2438d3..0f9348d6 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -24,11 +24,16 @@ static HANDLE nni_win_global_iocp = NULL; static nni_thr nni_win_iocp_thrs[NNI_WIN_IOCP_NTHREADS]; static nni_mtx nni_win_iocp_mtx; +static void nni_win_event_start(nni_win_event *); + static void -nni_win_event_finish(nni_win_event *evt, nni_aio *aio) +nni_win_event_finish(nni_win_event *evt) { + nni_aio *aio; evt->run = 0; - if (aio != NULL) { + + if ((aio = evt->active) != NULL) { + evt->active = NULL; evt->ops.wev_finish(evt, aio); } if (evt->fini) { @@ -44,9 +49,7 @@ nni_win_iocp_handler(void *arg) ULONG_PTR key; OVERLAPPED * olpd; nni_win_event *evt; - int rv; BOOL ok; - nni_aio * aio; NNI_ARG_UNUSED(arg); @@ -70,16 +73,15 @@ nni_win_iocp_handler(void *arg) nni_mtx_lock(&evt->mtx); if (ok) { - rv = ERROR_SUCCESS; + evt->status = 0; } else if (evt->status == 0) { evt->status = nni_win_error(GetLastError()); } - aio = evt->aio; - evt->aio = NULL; evt->count = cnt; - nni_win_event_finish(evt, aio); + nni_win_event_finish(evt); + nni_win_event_start(evt); nni_mtx_unlock(&evt->mtx); } } @@ -90,47 +92,65 @@ nni_win_event_cancel(nni_aio *aio, int rv) nni_win_event *evt = aio->a_prov_data; nni_mtx_lock(&evt->mtx); - if (evt->aio == aio) { + if (aio == evt->active) { evt->status = rv; // Use provider specific cancellation. evt->ops.wev_cancel(evt); + } else if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&evt->mtx); } void -nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio) +nni_win_event_start(nni_win_event *evt) { - // This is just continuation of a pre-existing AIO operation. - // For example, continuing I/O of a multi-buffer s/g operation. - // The lock is held. + nni_aio *aio; + + // Lock held. + + if (evt->run) { + // Already running. + return; + } // Abort operation -- no further activity. - if (evt->fini) { - evt->run = 0; - nni_cv_wake(&evt->cv); + if (evt->fini || evt->closed) { + while ((aio = nni_list_first(&evt->aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } return; } + if ((aio = nni_list_first(&evt->aios)) == NULL) { + return; + } + + nni_aio_list_remove(aio); + evt->active = aio; evt->status = 0; evt->count = 0; if (!ResetEvent(evt->olpd.hEvent)) { - evt->status = nni_win_error(GetLastError()); - evt->count = 0; - nni_win_event_finish(evt, aio); + evt->active = NULL; + nni_aio_finish_error(aio, nni_win_error(GetLastError())); return; } - evt->aio = aio; evt->run = 1; if (evt->ops.wev_start(evt, aio) != 0) { // Start completed synchronously. It will have stored // the count and status in the evt. - evt->aio = NULL; - nni_win_event_finish(evt, aio); + nni_win_event_finish(evt); } } +void +nni_win_event_resubmit(nni_win_event *evt, nni_aio *aio) +{ + nni_aio_list_prepend(&evt->aios, aio); +} void nni_win_event_submit(nni_win_event *evt, nni_aio *aio) @@ -141,7 +161,8 @@ nni_win_event_submit(nni_win_event *evt, nni_aio *aio) nni_mtx_unlock(&evt->mtx); return; } - nni_win_event_resubmit(evt, aio); + nni_aio_list_append(&evt->aios, aio); + nni_win_event_start(evt); nni_mtx_unlock(&evt->mtx); } @@ -154,12 +175,20 @@ nni_win_event_complete(nni_win_event *evt, int cnt) void nni_win_event_close(nni_win_event *evt) { - if (evt->ptr != NULL) { - nni_mtx_lock(&evt->mtx); - evt->status = NNG_ECLOSED; - evt->ops.wev_cancel(evt); - nni_mtx_unlock(&evt->mtx); + nni_aio *aio; + + if (evt->ptr == NULL) { + return; // Never initialized + } + nni_mtx_lock(&evt->mtx); + evt->closed = 1; + evt->status = NNG_ECLOSED; + evt->ops.wev_cancel(evt); + while ((aio = nni_list_first(&evt->aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); } + nni_mtx_unlock(&evt->mtx); } int @@ -175,44 +204,52 @@ int nni_win_event_init(nni_win_event *evt, nni_win_event_ops *ops, void *ptr) { ZeroMemory(&evt->olpd, sizeof(evt->olpd)); - evt->olpd.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL); - if (evt->olpd.hEvent == NULL) { - return (nni_win_error(GetLastError())); - } nni_mtx_init(&evt->mtx); nni_cv_init(&evt->cv, &evt->mtx); - + nni_aio_list_init(&evt->aios); evt->ops = *ops; - evt->aio = NULL; evt->ptr = ptr; evt->fini = 0; evt->run = 0; + + evt->olpd.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL); + if (evt->olpd.hEvent == NULL) { + return (nni_win_error(GetLastError())); + } + return (0); } void nni_win_event_fini(nni_win_event *evt) { - if (evt->ptr != NULL) { - nni_mtx_lock(&evt->mtx); + nni_aio *aio; - evt->fini = 1; + if (evt->ptr == NULL) { + return; // Never initialized + } + nni_mtx_lock(&evt->mtx); - // Use provider specific cancellation. - evt->ops.wev_cancel(evt); + evt->fini = 1; - // Wait for everything to stop referencing this. - while (evt->run) { - nni_cv_wait(&evt->cv); - } + // Use provider specific cancellation. + evt->ops.wev_cancel(evt); - nni_mtx_unlock(&evt->mtx); + // Wait for everything to stop referencing this. + while (evt->run) { + nni_cv_wait(&evt->cv); + } + + while ((aio = nni_list_first(&evt->aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); } if (evt->olpd.hEvent != NULL) { (void) CloseHandle(evt->olpd.hEvent); evt->olpd.hEvent = NULL; } + nni_mtx_unlock(&evt->mtx); nni_cv_fini(&evt->cv); nni_mtx_fini(&evt->mtx); } diff --git a/src/platform/windows/win_sockaddr.c b/src/platform/windows/win_sockaddr.c index 0fa6dd51..f66542c6 100644 --- a/src/platform/windows/win_sockaddr.c +++ b/src/platform/windows/win_sockaddr.c @@ -20,6 +20,9 @@ nni_win_nn2sockaddr(SOCKADDR_STORAGE *ss, const nni_sockaddr *sa) SOCKADDR_IN * sin; SOCKADDR_IN6 *sin6; + if ((ss == NULL) || (sa == NULL)) { + return (-1); + } switch (sa->s_un.s_family) { case NNG_AF_INET: sin = (void *) ss; @@ -46,6 +49,9 @@ nni_win_sockaddr2nn(nni_sockaddr *sa, const SOCKADDR_STORAGE *ss) SOCKADDR_IN * sin; SOCKADDR_IN6 *sin6; + if ((ss == NULL) || (sa == NULL)) { + return (-1); + } switch (ss->ss_family) { case PF_INET: sin = (void *) ss; |
