From 5febda44a4dd207bddab52613738b2d2545ad1a9 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 20 Aug 2018 15:52:12 -0700 Subject: fixes #665 Convert Windows UDP to raw IOCP --- src/platform/windows/win_udp.c | 311 +++++++++++++++++++++-------------------- 1 file changed, 162 insertions(+), 149 deletions(-) diff --git a/src/platform/windows/win_udp.c b/src/platform/windows/win_udp.c index ba6b400c..e3e5a369 100644 --- a/src/platform/windows/win_udp.c +++ b/src/platform/windows/win_udp.c @@ -21,36 +21,20 @@ struct nni_plat_udp { SOCKET s; nni_mtx lk; - nni_win_event rxev; - nni_win_event txev; + nni_cv cv; + nni_list rxq; + nni_win_io rxio; + int cancel_rv; + bool closed; SOCKADDR_STORAGE rxsa; - SOCKADDR_STORAGE txsa; int rxsalen; - int txsalen; }; -static int nni_win_udp_start_rx(nni_win_event *, nni_aio *); -static int nni_win_udp_start_tx(nni_win_event *, nni_aio *); -static void nni_win_udp_finish_rx(nni_win_event *, nni_aio *); -static void nni_win_udp_finish_tx(nni_win_event *, nni_aio *); -static void nni_win_udp_cancel(nni_win_event *); - -static nni_win_event_ops nni_win_udp_rxo = { - .wev_start = nni_win_udp_start_rx, - .wev_finish = nni_win_udp_finish_rx, - .wev_cancel = nni_win_udp_cancel, -}; - -static nni_win_event_ops nni_win_udp_txo = { - .wev_start = nni_win_udp_start_tx, - .wev_finish = nni_win_udp_finish_tx, - .wev_cancel = nni_win_udp_cancel, -}; +static void udp_recv_cb(nni_win_io *, int, size_t); +static void udp_recv_start(nni_plat_udp *); // nni_plat_udp_open initializes a UDP socket, binding to the local -// address specified specified in the AIO. The remote address is -// not used. The resulting nni_plat_udp structure is returned in the -// the aio's a_pipe. +// address specified specified. int nni_plat_udp_open(nni_plat_udp **udpp, nni_sockaddr *sa) { @@ -67,7 +51,9 @@ nni_plat_udp_open(nni_plat_udp **udpp, nni_sockaddr *sa) if ((u = NNI_ALLOC_STRUCT(u)) == NULL) { return (NNG_ENOMEM); } + nni_aio_list_init(&u->rxq); nni_mtx_init(&u->lk); + nni_cv_init(&u->cv, &u->lk); u->s = socket(ss.ss_family, SOCK_DGRAM, IPPROTO_UDP); if (u->s == INVALID_SOCKET) { @@ -81,9 +67,8 @@ nni_plat_udp_open(nni_plat_udp **udpp, nni_sockaddr *sa) (void) setsockopt( u->s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no)); - if (((rv = nni_win_event_init(&u->rxev, &nni_win_udp_rxo, u)) != 0) || - ((rv = nni_win_event_init(&u->txev, &nni_win_udp_txo, u)) != 0) || - ((rv = nni_win_iocp_register((HANDLE) u->s)) != 0)) { + if (((rv = nni_win_io_init(&u->rxio, udp_recv_cb, u)) != 0) || + ((rv = nni_win_io_register((HANDLE) u->s)) != 0)) { nni_plat_udp_close(u); return (rv); } @@ -103,117 +88,167 @@ nni_plat_udp_open(nni_plat_udp **udpp, nni_sockaddr *sa) void nni_plat_udp_close(nni_plat_udp *u) { + nni_mtx_lock(&u->lk); + u->closed = true; + if (!nni_list_empty(&u->rxq)) { + CancelIoEx((HANDLE) u->s, &u->rxio.olpd); + } + while (!nni_list_empty(&u->rxq)) { + nni_cv_wait(&u->cv); + } + nni_mtx_unlock(&u->lk); + if (u->s != INVALID_SOCKET) { closesocket(u->s); } - nni_win_event_fini(&u->rxev); - nni_win_event_fini(&u->txev); + + nni_win_io_fini(&u->rxio); nni_mtx_fini(&u->lk); + nni_cv_fini(&u->cv); NNI_FREE_STRUCT(u); } // nni_plat_udp_send sends the data in the aio to the the -// destination specified in the nni_aio. The iovs are the -// UDP payload. +// destination specified in the nni_aio. The iovs are the UDP payload. void nni_plat_udp_send(nni_plat_udp *u, nni_aio *aio) { - nni_win_event_submit(&u->txev, aio); -} - -// nni_plat_udp_pipe_recv recvs a message, storing it in the iovs -// from the UDP payload. If the UDP payload will not fit, then -// NNG_EMSGSIZE results. -void -nni_plat_udp_recv(nni_plat_udp *u, nni_aio *aio) -{ - nni_win_event_submit(&u->rxev, aio); -} + SOCKADDR_STORAGE to; + int tolen; + nng_sockaddr * sa; + unsigned naiov; + nni_iov * aiov; + WSABUF * iov; + int rv; + DWORD nsent; -static int -nni_win_udp_start_rx(nni_win_event *evt, nni_aio *aio) -{ - int rv; - SOCKET s; - DWORD flags; - nni_plat_udp *u = evt->ptr; - nni_iov * aiov; - unsigned naiov; - WSABUF * iov; - - if ((s = u->s) == INVALID_SOCKET) { - evt->status = NNG_ECLOSED; - evt->count = 0; - return (1); + if (nni_aio_begin(aio) != 0) { + return; + } + sa = nni_aio_get_input(aio, 0); + if ((tolen = nni_win_nn2sockaddr(&to, sa)) < 0) { + nni_aio_finish_error(aio, NNG_EADDRINVAL); + return; } - u->rxsalen = sizeof(SOCKADDR_STORAGE); nni_aio_get_iov(aio, &naiov, &aiov); - - // This is a stack allocation- it should always succeed - or - // throw an exception if there is not sufficient stack space. - // (Turns out it can allocate from the heap, but same semantics.) iov = _malloca(sizeof(*iov) * naiov); + // NB: UDP send runs "quickly" on Windows, without any need for + // a blocking or asynchronous operation. If the message can't be + // sent immediately (or queued for it), then it is dropped. + + nni_mtx_lock(&u->lk); + if ((u->s == INVALID_SOCKET) || u->closed) { + nni_mtx_unlock(&u->lk); + nni_aio_finish_error(aio, NNG_ECLOSED); + _freea(iov); + return; + } + // Put the AIOs in Windows form. for (unsigned i = 0; i < naiov; i++) { iov[i].buf = aiov[i].iov_buf; iov[i].len = (ULONG) aiov[i].iov_len; } - // Note that the IOVs for the event were prepared on entry - // already. The actual aio's iov array we don't touch. + // We can use a "non-overlapping" send; there is little point in + // handling UDP send completions asynchronously. + rv = WSASendTo(u->s, iov, (DWORD) naiov, &nsent, 0, + (struct sockaddr *) &to, tolen, NULL, NULL); - evt->count = 0; - flags = 0; - - rv = WSARecvFrom(u->s, iov, (DWORD) naiov, NULL, &flags, - (struct sockaddr *) &u->rxsa, &u->rxsalen, &evt->olpd, NULL); + if (rv == SOCKET_ERROR) { + rv = nni_win_error(GetLastError()); + nsent = 0; + } + nni_mtx_unlock(&u->lk); _freea(iov); - if ((rv == SOCKET_ERROR) && - ((rv = GetLastError()) != ERROR_IO_PENDING)) { - // Synchronous failure. - evt->status = nni_win_error(rv); - evt->count = 0; - return (1); - } + nni_aio_finish(aio, rv, nsent); - // Wait for the I/O completion event. Note that when an I/O - // completes immediately, the I/O completion packet is still - // delivered. - return (0); + return; } -static int -nni_win_udp_start_tx(nni_win_event *evt, nni_aio *aio) +static void +udp_recv_cancel(nni_aio *aio, void *arg, int rv) { - int rv; - SOCKET s; - unsigned naiov; - nni_iov * aiov; - nni_plat_udp *u = evt->ptr; - int salen; + nni_plat_udp *u = arg; + nni_mtx_lock(&u->lk); + if (aio == nni_list_first(&u->rxq)) { + u->cancel_rv = rv; + CancelIoEx((HANDLE) u->s, &u->rxio.olpd); + } else if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_cv_wake(&u->cv); + } + nni_mtx_unlock(&u->lk); +} + +static void +udp_recv_cb(nni_win_io *io, int rv, size_t num) +{ + nni_plat_udp *u = io->ptr; nni_sockaddr *sa; - WSABUF * iov; + nni_aio * aio; - if ((s = u->s) == INVALID_SOCKET) { - evt->status = NNG_ECLOSED; - evt->count = 0; - return (1); + nni_mtx_lock(&u->lk); + if ((aio = nni_list_first(&u->rxq)) == NULL) { + // Should indicate that it was closed. + nni_mtx_unlock(&u->lk); + return; + } + if (u->cancel_rv != 0) { + rv = u->cancel_rv; + u->cancel_rv = 0; } - sa = nni_aio_get_input(aio, 0); + // convert address from Windows form... + if ((sa = nni_aio_get_input(aio, 0)) != NULL) { + if (nni_win_sockaddr2nn(sa, &u->rxsa) != 0) { + rv = NNG_EADDRINVAL; + num = 0; + } + } + + nni_aio_list_remove(aio); + udp_recv_start(u); + nni_mtx_unlock(&u->lk); + + nni_aio_finish_synch(aio, rv, num); +} + +static void +udp_recv_start(nni_plat_udp *u) +{ + int rv; + DWORD flags; + nni_iov *aiov; + unsigned naiov; + WSABUF * iov; + nni_aio *aio; + + if ((u->s == INVALID_SOCKET) || (u->closed)) { + while ((aio = nni_list_first(&u->rxq)) != NULL) { + nni_list_remove(&u->rxq, aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_cv_wake(&u->cv); + return; + } - if ((salen = nni_win_nn2sockaddr(&u->txsa, sa)) < 0) { - evt->status = NNG_EADDRINVAL; - evt->count = 0; - return (1); +again: + if ((aio = nni_list_first(&u->rxq)) == NULL) { + return; } + u->rxsalen = sizeof(SOCKADDR_STORAGE); nni_aio_get_iov(aio, &naiov, &aiov); + // This is a stack allocation- it should always succeed - or + // throw an exception if there is not sufficient stack space. + // (Turns out it can allocate from the heap, but same semantics.) iov = _malloca(sizeof(*iov) * naiov); // Put the AIOs in Windows form. @@ -224,69 +259,47 @@ nni_win_udp_start_tx(nni_win_event *evt, nni_aio *aio) // Note that the IOVs for the event were prepared on entry // already. The actual aio's iov array we don't touch. + flags = 0; - evt->count = 0; - - rv = WSASendTo(u->s, iov, (DWORD) naiov, NULL, 0, - (struct sockaddr *) &u->txsa, salen, &evt->olpd, NULL); + rv = WSARecvFrom(u->s, iov, (DWORD) naiov, NULL, &flags, + (struct sockaddr *) &u->rxsa, &u->rxsalen, &u->rxio.olpd, NULL); _freea(iov); if ((rv == SOCKET_ERROR) && ((rv = GetLastError()) != ERROR_IO_PENDING)) { // Synchronous failure. - evt->status = nni_win_error(rv); - evt->count = 0; - return (1); + nni_aio_finish_error(aio, nni_win_error(rv)); + goto again; } - - // Wait for the I/O completion event. Note that when an I/O - // completes immediately, the I/O completion packet is still - // delivered. - return (0); } -static void -nni_win_udp_cancel(nni_win_event *evt) -{ - nni_plat_udp *u = evt->ptr; - - (void) CancelIoEx((HANDLE) u->s, &evt->olpd); -} - -static void -nni_win_udp_finish_rx(nni_win_event *evt, nni_aio *aio) +// nni_plat_udp_pipe_recv recvs a message, storing it in the iovs +// from the UDP payload. If the UDP payload will not fit, then +// NNG_EMSGSIZE results. +void +nni_plat_udp_recv(nni_plat_udp *u, nni_aio *aio) { - int rv; - size_t cnt; - nni_plat_udp *u = evt->ptr; - - cnt = evt->count; - if ((rv = evt->status) == 0) { - nni_sockaddr *sa; - // convert address from Windows form... - if ((sa = nni_aio_get_input(aio, 0)) != NULL) { - if (nni_win_sockaddr2nn(sa, &u->rxsa) != 0) { - rv = NNG_EADDRINVAL; - cnt = 0; - } - } + int rv; + if (nni_aio_begin(aio) != 0) { + return; } - - // All done; hopefully successfully. - nni_aio_finish(aio, rv, cnt); -} - -static void -nni_win_udp_finish_tx(nni_win_event *evt, nni_aio *aio) -{ - int rv; - size_t cnt; - - cnt = evt->count; - rv = evt->status; - - nni_aio_finish(aio, rv, cnt); + nni_mtx_lock(&u->lk); + if (u->closed) { + nni_mtx_unlock(&u->lk); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, udp_recv_cancel, u)) != 0) { + nni_mtx_unlock(&u->lk); + nni_aio_finish_error(aio, rv); + return; + } + nni_list_append(&u->rxq, aio); + if (nni_list_first(&u->rxq) == aio) { + udp_recv_start(u); + } + nni_mtx_unlock(&u->lk); } int -- cgit v1.2.3-70-g09d2