summaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-08-20 15:52:12 -0700
committerGarrett D'Amore <garrett@damore.org>2018-08-20 16:18:15 -0700
commit5febda44a4dd207bddab52613738b2d2545ad1a9 (patch)
tree0d3d34be248c38a6bed8c0359e32ef88b122e5fc /src/platform
parent474a1f57caa2935d676f646b43f6142880517b82 (diff)
downloadnng-5febda44a4dd207bddab52613738b2d2545ad1a9.tar.gz
nng-5febda44a4dd207bddab52613738b2d2545ad1a9.tar.bz2
nng-5febda44a4dd207bddab52613738b2d2545ad1a9.zip
fixes #665 Convert Windows UDP to raw IOCP
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/windows/win_udp.c311
1 files changed, 162 insertions, 149 deletions
diff --git a/src/platform/windows/win_udp.c b/src/platform/windows/win_udp.c
index ba6b400c..e3e5a369 100644
--- a/src/platform/windows/win_udp.c
+++ b/src/platform/windows/win_udp.c
@@ -21,36 +21,20 @@
struct nni_plat_udp {
SOCKET s;
nni_mtx lk;
- nni_win_event rxev;
- nni_win_event txev;
+ nni_cv cv;
+ nni_list rxq;
+ nni_win_io rxio;
+ int cancel_rv;
+ bool closed;
SOCKADDR_STORAGE rxsa;
- SOCKADDR_STORAGE txsa;
int rxsalen;
- int txsalen;
};
-static int nni_win_udp_start_rx(nni_win_event *, nni_aio *);
-static int nni_win_udp_start_tx(nni_win_event *, nni_aio *);
-static void nni_win_udp_finish_rx(nni_win_event *, nni_aio *);
-static void nni_win_udp_finish_tx(nni_win_event *, nni_aio *);
-static void nni_win_udp_cancel(nni_win_event *);
-
-static nni_win_event_ops nni_win_udp_rxo = {
- .wev_start = nni_win_udp_start_rx,
- .wev_finish = nni_win_udp_finish_rx,
- .wev_cancel = nni_win_udp_cancel,
-};
-
-static nni_win_event_ops nni_win_udp_txo = {
- .wev_start = nni_win_udp_start_tx,
- .wev_finish = nni_win_udp_finish_tx,
- .wev_cancel = nni_win_udp_cancel,
-};
+static void udp_recv_cb(nni_win_io *, int, size_t);
+static void udp_recv_start(nni_plat_udp *);
// nni_plat_udp_open initializes a UDP socket, binding to the local
-// address specified specified in the AIO. The remote address is
-// not used. The resulting nni_plat_udp structure is returned in the
-// the aio's a_pipe.
+// address specified specified.
int
nni_plat_udp_open(nni_plat_udp **udpp, nni_sockaddr *sa)
{
@@ -67,7 +51,9 @@ nni_plat_udp_open(nni_plat_udp **udpp, nni_sockaddr *sa)
if ((u = NNI_ALLOC_STRUCT(u)) == NULL) {
return (NNG_ENOMEM);
}
+ nni_aio_list_init(&u->rxq);
nni_mtx_init(&u->lk);
+ nni_cv_init(&u->cv, &u->lk);
u->s = socket(ss.ss_family, SOCK_DGRAM, IPPROTO_UDP);
if (u->s == INVALID_SOCKET) {
@@ -81,9 +67,8 @@ nni_plat_udp_open(nni_plat_udp **udpp, nni_sockaddr *sa)
(void) setsockopt(
u->s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no));
- if (((rv = nni_win_event_init(&u->rxev, &nni_win_udp_rxo, u)) != 0) ||
- ((rv = nni_win_event_init(&u->txev, &nni_win_udp_txo, u)) != 0) ||
- ((rv = nni_win_iocp_register((HANDLE) u->s)) != 0)) {
+ if (((rv = nni_win_io_init(&u->rxio, udp_recv_cb, u)) != 0) ||
+ ((rv = nni_win_io_register((HANDLE) u->s)) != 0)) {
nni_plat_udp_close(u);
return (rv);
}
@@ -103,117 +88,167 @@ nni_plat_udp_open(nni_plat_udp **udpp, nni_sockaddr *sa)
void
nni_plat_udp_close(nni_plat_udp *u)
{
+ nni_mtx_lock(&u->lk);
+ u->closed = true;
+ if (!nni_list_empty(&u->rxq)) {
+ CancelIoEx((HANDLE) u->s, &u->rxio.olpd);
+ }
+ while (!nni_list_empty(&u->rxq)) {
+ nni_cv_wait(&u->cv);
+ }
+ nni_mtx_unlock(&u->lk);
+
if (u->s != INVALID_SOCKET) {
closesocket(u->s);
}
- nni_win_event_fini(&u->rxev);
- nni_win_event_fini(&u->txev);
+
+ nni_win_io_fini(&u->rxio);
nni_mtx_fini(&u->lk);
+ nni_cv_fini(&u->cv);
NNI_FREE_STRUCT(u);
}
// nni_plat_udp_send sends the data in the aio to the the
-// destination specified in the nni_aio. The iovs are the
-// UDP payload.
+// destination specified in the nni_aio. The iovs are the UDP payload.
void
nni_plat_udp_send(nni_plat_udp *u, nni_aio *aio)
{
- nni_win_event_submit(&u->txev, aio);
-}
-
-// nni_plat_udp_pipe_recv recvs a message, storing it in the iovs
-// from the UDP payload. If the UDP payload will not fit, then
-// NNG_EMSGSIZE results.
-void
-nni_plat_udp_recv(nni_plat_udp *u, nni_aio *aio)
-{
- nni_win_event_submit(&u->rxev, aio);
-}
+ SOCKADDR_STORAGE to;
+ int tolen;
+ nng_sockaddr * sa;
+ unsigned naiov;
+ nni_iov * aiov;
+ WSABUF * iov;
+ int rv;
+ DWORD nsent;
-static int
-nni_win_udp_start_rx(nni_win_event *evt, nni_aio *aio)
-{
- int rv;
- SOCKET s;
- DWORD flags;
- nni_plat_udp *u = evt->ptr;
- nni_iov * aiov;
- unsigned naiov;
- WSABUF * iov;
-
- if ((s = u->s) == INVALID_SOCKET) {
- evt->status = NNG_ECLOSED;
- evt->count = 0;
- return (1);
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+ sa = nni_aio_get_input(aio, 0);
+ if ((tolen = nni_win_nn2sockaddr(&to, sa)) < 0) {
+ nni_aio_finish_error(aio, NNG_EADDRINVAL);
+ return;
}
- u->rxsalen = sizeof(SOCKADDR_STORAGE);
nni_aio_get_iov(aio, &naiov, &aiov);
-
- // This is a stack allocation- it should always succeed - or
- // throw an exception if there is not sufficient stack space.
- // (Turns out it can allocate from the heap, but same semantics.)
iov = _malloca(sizeof(*iov) * naiov);
+ // NB: UDP send runs "quickly" on Windows, without any need for
+ // a blocking or asynchronous operation. If the message can't be
+ // sent immediately (or queued for it), then it is dropped.
+
+ nni_mtx_lock(&u->lk);
+ if ((u->s == INVALID_SOCKET) || u->closed) {
+ nni_mtx_unlock(&u->lk);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ _freea(iov);
+ return;
+ }
+
// Put the AIOs in Windows form.
for (unsigned i = 0; i < naiov; i++) {
iov[i].buf = aiov[i].iov_buf;
iov[i].len = (ULONG) aiov[i].iov_len;
}
- // Note that the IOVs for the event were prepared on entry
- // already. The actual aio's iov array we don't touch.
+ // We can use a "non-overlapping" send; there is little point in
+ // handling UDP send completions asynchronously.
+ rv = WSASendTo(u->s, iov, (DWORD) naiov, &nsent, 0,
+ (struct sockaddr *) &to, tolen, NULL, NULL);
- evt->count = 0;
- flags = 0;
-
- rv = WSARecvFrom(u->s, iov, (DWORD) naiov, NULL, &flags,
- (struct sockaddr *) &u->rxsa, &u->rxsalen, &evt->olpd, NULL);
+ if (rv == SOCKET_ERROR) {
+ rv = nni_win_error(GetLastError());
+ nsent = 0;
+ }
+ nni_mtx_unlock(&u->lk);
_freea(iov);
- if ((rv == SOCKET_ERROR) &&
- ((rv = GetLastError()) != ERROR_IO_PENDING)) {
- // Synchronous failure.
- evt->status = nni_win_error(rv);
- evt->count = 0;
- return (1);
- }
+ nni_aio_finish(aio, rv, nsent);
- // Wait for the I/O completion event. Note that when an I/O
- // completes immediately, the I/O completion packet is still
- // delivered.
- return (0);
+ return;
}
-static int
-nni_win_udp_start_tx(nni_win_event *evt, nni_aio *aio)
+static void
+udp_recv_cancel(nni_aio *aio, void *arg, int rv)
{
- int rv;
- SOCKET s;
- unsigned naiov;
- nni_iov * aiov;
- nni_plat_udp *u = evt->ptr;
- int salen;
+ nni_plat_udp *u = arg;
+ nni_mtx_lock(&u->lk);
+ if (aio == nni_list_first(&u->rxq)) {
+ u->cancel_rv = rv;
+ CancelIoEx((HANDLE) u->s, &u->rxio.olpd);
+ } else if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ nni_cv_wake(&u->cv);
+ }
+ nni_mtx_unlock(&u->lk);
+}
+
+static void
+udp_recv_cb(nni_win_io *io, int rv, size_t num)
+{
+ nni_plat_udp *u = io->ptr;
nni_sockaddr *sa;
- WSABUF * iov;
+ nni_aio * aio;
- if ((s = u->s) == INVALID_SOCKET) {
- evt->status = NNG_ECLOSED;
- evt->count = 0;
- return (1);
+ nni_mtx_lock(&u->lk);
+ if ((aio = nni_list_first(&u->rxq)) == NULL) {
+ // Should indicate that it was closed.
+ nni_mtx_unlock(&u->lk);
+ return;
+ }
+ if (u->cancel_rv != 0) {
+ rv = u->cancel_rv;
+ u->cancel_rv = 0;
}
- sa = nni_aio_get_input(aio, 0);
+ // convert address from Windows form...
+ if ((sa = nni_aio_get_input(aio, 0)) != NULL) {
+ if (nni_win_sockaddr2nn(sa, &u->rxsa) != 0) {
+ rv = NNG_EADDRINVAL;
+ num = 0;
+ }
+ }
+
+ nni_aio_list_remove(aio);
+ udp_recv_start(u);
+ nni_mtx_unlock(&u->lk);
+
+ nni_aio_finish_synch(aio, rv, num);
+}
+
+static void
+udp_recv_start(nni_plat_udp *u)
+{
+ int rv;
+ DWORD flags;
+ nni_iov *aiov;
+ unsigned naiov;
+ WSABUF * iov;
+ nni_aio *aio;
+
+ if ((u->s == INVALID_SOCKET) || (u->closed)) {
+ while ((aio = nni_list_first(&u->rxq)) != NULL) {
+ nni_list_remove(&u->rxq, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_cv_wake(&u->cv);
+ return;
+ }
- if ((salen = nni_win_nn2sockaddr(&u->txsa, sa)) < 0) {
- evt->status = NNG_EADDRINVAL;
- evt->count = 0;
- return (1);
+again:
+ if ((aio = nni_list_first(&u->rxq)) == NULL) {
+ return;
}
+ u->rxsalen = sizeof(SOCKADDR_STORAGE);
nni_aio_get_iov(aio, &naiov, &aiov);
+ // This is a stack allocation- it should always succeed - or
+ // throw an exception if there is not sufficient stack space.
+ // (Turns out it can allocate from the heap, but same semantics.)
iov = _malloca(sizeof(*iov) * naiov);
// Put the AIOs in Windows form.
@@ -224,69 +259,47 @@ nni_win_udp_start_tx(nni_win_event *evt, nni_aio *aio)
// Note that the IOVs for the event were prepared on entry
// already. The actual aio's iov array we don't touch.
+ flags = 0;
- evt->count = 0;
-
- rv = WSASendTo(u->s, iov, (DWORD) naiov, NULL, 0,
- (struct sockaddr *) &u->txsa, salen, &evt->olpd, NULL);
+ rv = WSARecvFrom(u->s, iov, (DWORD) naiov, NULL, &flags,
+ (struct sockaddr *) &u->rxsa, &u->rxsalen, &u->rxio.olpd, NULL);
_freea(iov);
if ((rv == SOCKET_ERROR) &&
((rv = GetLastError()) != ERROR_IO_PENDING)) {
// Synchronous failure.
- evt->status = nni_win_error(rv);
- evt->count = 0;
- return (1);
+ nni_aio_finish_error(aio, nni_win_error(rv));
+ goto again;
}
-
- // Wait for the I/O completion event. Note that when an I/O
- // completes immediately, the I/O completion packet is still
- // delivered.
- return (0);
}
-static void
-nni_win_udp_cancel(nni_win_event *evt)
-{
- nni_plat_udp *u = evt->ptr;
-
- (void) CancelIoEx((HANDLE) u->s, &evt->olpd);
-}
-
-static void
-nni_win_udp_finish_rx(nni_win_event *evt, nni_aio *aio)
+// nni_plat_udp_pipe_recv recvs a message, storing it in the iovs
+// from the UDP payload. If the UDP payload will not fit, then
+// NNG_EMSGSIZE results.
+void
+nni_plat_udp_recv(nni_plat_udp *u, nni_aio *aio)
{
- int rv;
- size_t cnt;
- nni_plat_udp *u = evt->ptr;
-
- cnt = evt->count;
- if ((rv = evt->status) == 0) {
- nni_sockaddr *sa;
- // convert address from Windows form...
- if ((sa = nni_aio_get_input(aio, 0)) != NULL) {
- if (nni_win_sockaddr2nn(sa, &u->rxsa) != 0) {
- rv = NNG_EADDRINVAL;
- cnt = 0;
- }
- }
+ int rv;
+ if (nni_aio_begin(aio) != 0) {
+ return;
}
-
- // All done; hopefully successfully.
- nni_aio_finish(aio, rv, cnt);
-}
-
-static void
-nni_win_udp_finish_tx(nni_win_event *evt, nni_aio *aio)
-{
- int rv;
- size_t cnt;
-
- cnt = evt->count;
- rv = evt->status;
-
- nni_aio_finish(aio, rv, cnt);
+ nni_mtx_lock(&u->lk);
+ if (u->closed) {
+ nni_mtx_unlock(&u->lk);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((rv = nni_aio_schedule(aio, udp_recv_cancel, u)) != 0) {
+ nni_mtx_unlock(&u->lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_list_append(&u->rxq, aio);
+ if (nni_list_first(&u->rxq) == aio) {
+ udp_recv_start(u);
+ }
+ nni_mtx_unlock(&u->lk);
}
int