diff options
| -rw-r--r-- | src/core/platform.h | 7 | ||||
| -rw-r--r-- | src/platform/posix/posix_net.c | 1 | ||||
| -rw-r--r-- | src/platform/windows/win_debug.c | 35 | ||||
| -rw-r--r-- | src/platform/windows/win_impl.h | 10 | ||||
| -rw-r--r-- | src/platform/windows/win_iocp.c | 57 | ||||
| -rw-r--r-- | src/platform/windows/win_ipc.c | 55 | ||||
| -rw-r--r-- | src/platform/windows/win_net.c | 1030 | ||||
| -rw-r--r-- | src/platform/windows/win_pipe.c | 2 | ||||
| -rw-r--r-- | src/platform/windows/win_thread.c | 19 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 23 |
10 files changed, 661 insertions, 578 deletions
diff --git a/src/core/platform.h b/src/core/platform.h index b3fc7572..ed32c550 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -187,13 +187,6 @@ extern int nni_plat_init(int (*)(void)); // will be called until nni_platform_init is called. extern void nni_plat_fini(void); -// nni_plat_lookup_host looks up a hostname in DNS, or the local hosts -// file, or whatever. If your platform lacks support for naming, it must -// at least cope with converting IP addresses in string form. The final -// flags may include NNI_FLAG_IPV4ONLY to prevent IPv6 names from being -// returned on dual stack machines. -extern int nni_plat_lookup_host(const char *, nni_sockaddr *, int); - // // TCP Support. // diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c index 78e1e6ee..69c0e772 100644 --- a/src/platform/posix/posix_net.c +++ b/src/platform/posix/posix_net.c @@ -125,6 +125,7 @@ nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const char *url, int mode) len = nni_posix_tcp_addr(&ss, &aio.a_addrs[0]); nni_posix_epdesc_set_local(ed, &ss, len); } + nni_aio_fini(&aio); *epp = (void *) ed; return (0); diff --git a/src/platform/windows/win_debug.c b/src/platform/windows/win_debug.c index cc2adb88..cbf0a61c 100644 --- a/src/platform/windows/win_debug.c +++ b/src/platform/windows/win_debug.c @@ -77,7 +77,9 @@ nni_plat_errno(int errnum) // Windows has infinite numbers of error codes it seems. We only bother // with the ones that are relevant to us (we think). Note that there is -// no overlap between errnos and GetLastError values. +// no overlap between errnos and GetLastError values. Note also that +// the WinSock errors are basically in the same number space as other +// errors, and WSAGetLastError() is an alias for GetLastError(). static struct { int win_err; int nng_err; @@ -101,6 +103,37 @@ static struct { { ERROR_PIPE_NOT_CONNECTED, NNG_ECLOSED }, { ERROR_OPERATION_ABORTED, NNG_ECLOSED }, { WAIT_TIMEOUT, NNG_ETIMEDOUT }, + { WSAEINTR, NNG_EINTR }, + { WSAEBADF, NNG_ECLOSED }, + { WSAEACCES, NNG_EPERM }, + { WSAEWOULDBLOCK, NNG_EAGAIN }, + { WSAEINPROGRESS, NNG_EAGAIN }, + { WSAENOTSOCK, NNG_ECLOSED }, + { WSAEMSGSIZE, NNG_EMSGSIZE }, + { WSAENOPROTOOPT, NNG_ENOTSUP }, + { WSAEPROTONOSUPPORT, NNG_ENOTSUP }, + { WSAEPROTONOSUPPORT, NNG_ENOTSUP }, + { WSAEADDRINUSE, NNG_EADDRINUSE }, + { WSAEADDRNOTAVAIL, NNG_EADDRINVAL }, + { WSAENETDOWN, NNG_EUNREACHABLE }, + { WSAENETUNREACH, NNG_EUNREACHABLE }, + { WSAECONNABORTED, NNG_ETIMEDOUT }, + { WSAECONNRESET, NNG_ECLOSED }, + { WSAENOBUFS, NNG_ENOMEM }, + { WSAENOTCONN, NNG_ECLOSED }, + { WSAESHUTDOWN, NNG_ECLOSED }, + { WSAETIMEDOUT, NNG_ETIMEDOUT }, + { WSAECONNREFUSED, NNG_ECONNREFUSED }, + { WSAEHOSTDOWN, NNG_EUNREACHABLE }, + { WSAEHOSTUNREACH, NNG_EUNREACHABLE }, + { WSAVERNOTSUPPORTED, NNG_ENOTSUP }, + { WSAEDISCON, NNG_ECLOSED }, + { WSAECANCELLED, NNG_ECANCELED }, + { WSA_E_CANCELLED, NNG_ECANCELED }, + { WSAHOST_NOT_FOUND, NNG_EADDRINVAL }, + { WSATRY_AGAIN, NNG_EAGAIN }, + { WSANO_DATA, NNG_EADDRINVAL }, + // Must be Last!! { 0, 0 }, // clang-format on diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index 9049a81d..a77fcf0b 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -57,11 +57,10 @@ typedef struct nni_win_event_ops nni_win_event_ops; struct nni_win_event_ops { int (*wev_start)(nni_win_event *, nni_aio *); void (*wev_finish)(nni_win_event *, nni_aio *); - void (*wev_cancel)(nni_win_event *, nni_aio *); + void (*wev_cancel)(nni_win_event *); }; struct nni_win_event { OVERLAPPED olpd; - HANDLE h; void * ptr; nni_aio * aio; nni_mtx mtx; @@ -78,10 +77,8 @@ enum nni_win_event_flags { }; extern int nni_win_error(int); -extern int nni_winsock_error(int); -extern int nni_win_event_init( - nni_win_event *, nni_win_event_ops *, void *, HANDLE); +extern int nni_win_event_init(nni_win_event *, nni_win_event_ops *, void *); extern void nni_win_event_fini(nni_win_event *); extern void nni_win_event_submit(nni_win_event *, nni_aio *); extern void nni_win_event_resubmit(nni_win_event *, nni_aio *); @@ -96,6 +93,9 @@ extern void nni_win_iocp_sysfini(void); extern int nni_win_ipc_sysinit(void); extern void nni_win_ipc_sysfini(void); +extern int nni_win_tcp_sysinit(void); +extern void nni_win_tcp_sysfini(void); + extern int nni_win_resolv_sysinit(void); extern void nni_win_resolv_sysfini(void); diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c index 306f029c..2635a4fc 100644 --- a/src/platform/windows/win_iocp.c +++ b/src/platform/windows/win_iocp.c @@ -89,7 +89,7 @@ nni_win_event_cancel(nni_aio *aio) evt->aio = NULL; // Use provider specific cancellation. - evt->ops.wev_cancel(evt, aio); + evt->ops.wev_cancel(evt); // Wait for everything to stop referencing this. while (evt->flags & NNI_WIN_EVENT_RUNNING) { @@ -154,23 +154,21 @@ void nni_win_event_close(nni_win_event *evt) { nni_aio *aio; - nni_mtx_lock(&evt->mtx); - if (evt->h != NULL) { - if (CancelIoEx(evt->h, &evt->olpd)) { - DWORD cnt; - // Stall waiting for the I/O to complete. - GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); + + if (evt->ptr != NULL) { + nni_mtx_lock(&evt->mtx); + evt->flags |= NNI_WIN_EVENT_ABORT; + evt->ops.wev_cancel(evt); + if ((aio = evt->aio) != NULL) { + evt->aio = NULL; + // We really don't care if we transferred data or not. + // The caller indicates they have closed the pipe. + evt->status = ERROR_INVALID_HANDLE; + evt->count = 0; + evt->ops.wev_finish(evt, aio); } + nni_mtx_unlock(&evt->mtx); } - if ((aio = evt->aio) != NULL) { - evt->aio = NULL; - // We really don't care if we transferred data or not. - // The caller indicates they have closed the pipe. - evt->status = ERROR_INVALID_HANDLE; - evt->count = 0; - evt->ops.wev_finish(evt, aio); - } - nni_mtx_unlock(&evt->mtx); } int @@ -183,8 +181,7 @@ nni_win_iocp_register(HANDLE h) } int -nni_win_event_init( - nni_win_event *evt, nni_win_event_ops *ops, void *ptr, HANDLE h) +nni_win_event_init(nni_win_event *evt, nni_win_event_ops *ops, void *ptr) { int rv; @@ -200,7 +197,6 @@ nni_win_event_init( evt->ops = *ops; evt->aio = NULL; evt->ptr = ptr; - evt->h = h; return (0); } @@ -208,20 +204,23 @@ void nni_win_event_fini(nni_win_event *evt) { nni_aio *aio; - nni_mtx_lock(&evt->mtx); - if ((aio = evt->aio) != NULL) { - evt->flags |= NNI_WIN_EVENT_ABORT; - evt->aio = NULL; - // Use provider specific cancellation. - evt->ops.wev_cancel(evt, aio); + if (evt->ptr != NULL) { + nni_mtx_lock(&evt->mtx); + if ((aio = evt->aio) != NULL) { + evt->flags |= NNI_WIN_EVENT_ABORT; + evt->aio = NULL; + + // Use provider specific cancellation. + evt->ops.wev_cancel(evt); - // Wait for everything to stop referencing this. - while (evt->flags & NNI_WIN_EVENT_RUNNING) { - nni_cv_wait(&evt->cv); + // Wait for everything to stop referencing this. + while (evt->flags & NNI_WIN_EVENT_RUNNING) { + nni_cv_wait(&evt->cv); + } } + nni_mtx_unlock(&evt->mtx); } - nni_mtx_unlock(&evt->mtx); if (evt->olpd.hEvent != NULL) { (void) CloseHandle(evt->olpd.hEvent); diff --git a/src/platform/windows/win_ipc.c b/src/platform/windows/win_ipc.c index dff65941..f9c9f0d3 100644 --- a/src/platform/windows/win_ipc.c +++ b/src/platform/windows/win_ipc.c @@ -23,7 +23,6 @@ struct nni_plat_ipc_ep { char path[256]; int mode; int started; - nni_list aios; HANDLE p; // accept side only nni_win_event acc_ev; // accept side only nni_aio * con_aio; // conn side only @@ -32,7 +31,7 @@ struct nni_plat_ipc_ep { static int nni_win_ipc_pipe_start(nni_win_event *, nni_aio *); static void nni_win_ipc_pipe_finish(nni_win_event *, nni_aio *); -static void nni_win_ipc_pipe_cancel(nni_win_event *, nni_aio *); +static void nni_win_ipc_pipe_cancel(nni_win_event *); static nni_win_event_ops nni_win_ipc_pipe_ops = { .wev_start = nni_win_ipc_pipe_start, @@ -42,7 +41,7 @@ static nni_win_event_ops nni_win_ipc_pipe_ops = { static int nni_win_ipc_acc_start(nni_win_event *, nni_aio *); static void nni_win_ipc_acc_finish(nni_win_event *, nni_aio *); -static void nni_win_ipc_acc_cancel(nni_win_event *, nni_aio *); +static void nni_win_ipc_acc_cancel(nni_win_event *); static nni_win_event_ops nni_win_ipc_acc_ops = { .wev_start = nni_win_ipc_acc_start, @@ -64,7 +63,7 @@ nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio) NNI_ASSERT(aio->a_iov[0].iov_len > 0); NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); - if (evt->h == INVALID_HANDLE_VALUE) { + if (pipe->p == INVALID_HANDLE_VALUE) { evt->status = ERROR_INVALID_HANDLE; evt->count = 0; return (1); @@ -86,13 +85,13 @@ nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio) evt->count = 0; if (evt == &pipe->snd_ev) { - ok = WriteFile(evt->h, buf, len, NULL, &evt->olpd); + ok = WriteFile(pipe->p, buf, len, NULL, &evt->olpd); } else { - ok = ReadFile(evt->h, buf, len, NULL, &evt->olpd); + ok = ReadFile(pipe->p, buf, len, NULL, &evt->olpd); } if ((!ok) && ((rv = GetLastError()) != ERROR_IO_PENDING)) { // Synchronous failure. - evt->status = GetLastError(); + evt->status = rv; evt->count = 0; return (1); } @@ -104,23 +103,23 @@ nni_win_ipc_pipe_start(nni_win_event *evt, nni_aio *aio) } static void -nni_win_ipc_pipe_cancel(nni_win_event *evt, nni_aio *aio) +nni_win_ipc_pipe_cancel(nni_win_event *evt) { - NNI_ARG_UNUSED(aio); + nni_plat_ipc_pipe *pipe = evt->ptr; - if (CancelIoEx(evt->h, &evt->olpd)) { + if (CancelIoEx(pipe->p, &evt->olpd)) { DWORD cnt; // If we canceled, make sure that we've completely // finished with the overlapped. - GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); + GetOverlappedResult(pipe->p, &evt->olpd, &cnt, TRUE); } } static void nni_win_ipc_pipe_finish(nni_win_event *evt, nni_aio *aio) { - int rv = 0; + int rv; DWORD cnt; cnt = evt->count; @@ -158,12 +157,12 @@ nni_win_ipc_pipe_init(nni_plat_ipc_pipe **pipep, HANDLE p) if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { return (NNG_ENOMEM); } - rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_ipc_pipe_ops, pipe, p); + rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_ipc_pipe_ops, pipe); if (rv != 0) { nni_plat_ipc_pipe_fini(pipe); return (rv); } - rv = nni_win_event_init(&pipe->snd_ev, &nni_win_ipc_pipe_ops, pipe, p); + rv = nni_win_event_init(&pipe->snd_ev, &nni_win_ipc_pipe_ops, pipe); if (rv != 0) { nni_plat_ipc_pipe_fini(pipe); return (rv); @@ -191,12 +190,13 @@ nni_plat_ipc_pipe_close(nni_plat_ipc_pipe *pipe) { HANDLE p; + nni_win_event_close(&pipe->snd_ev); + nni_win_event_close(&pipe->rcv_ev); + if ((p = pipe->p) != INVALID_HANDLE_VALUE) { pipe->p = INVALID_HANDLE_VALUE; CloseHandle(p); } - nni_win_event_close(&pipe->snd_ev); - nni_win_event_close(&pipe->rcv_ev); } void @@ -262,7 +262,7 @@ nni_plat_ipc_ep_listen(nni_plat_ipc_ep *ep) } goto failed; } - rv = nni_win_event_init(&ep->acc_ev, &nni_win_ipc_acc_ops, ep, p); + rv = nni_win_event_init(&ep->acc_ev, &nni_win_ipc_acc_ops, ep); if (rv != 0) { goto failed; } @@ -320,9 +320,8 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio) return; } - oldp = ep->p; - ep->p = newp; - evt->h = newp; + oldp = ep->p; + ep->p = newp; if ((rv = nni_win_ipc_pipe_init(&pipe, oldp)) != 0) { // The new pipe is already fine for us. Discard @@ -338,25 +337,27 @@ nni_win_ipc_acc_finish(nni_win_event *evt, nni_aio *aio) } static void -nni_win_ipc_acc_cancel(nni_win_event *evt, nni_aio *aio) +nni_win_ipc_acc_cancel(nni_win_event *evt) { - NNI_ARG_UNUSED(aio); + nni_plat_ipc_ep *ep = evt->ptr; - if (CancelIoEx(evt->h, &evt->olpd)) { + if (CancelIoEx(ep->p, &evt->olpd)) { DWORD cnt; // If we canceled, make sure that we've completely // finished with the overlapped. - GetOverlappedResult(evt->h, &evt->olpd, &cnt, TRUE); + GetOverlappedResult(ep->p, &evt->olpd, &cnt, TRUE); } // Just to be sure. - (void) DisconnectNamedPipe(evt->h); + (void) DisconnectNamedPipe(ep->p); } static int nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio) { - if (!ConnectNamedPipe(evt->h, &evt->olpd)) { + nni_plat_ipc_ep *ep = evt->ptr; + + if (!ConnectNamedPipe(ep->p, &evt->olpd)) { int rv = GetLastError(); switch (rv) { case ERROR_PIPE_CONNECTED: @@ -372,7 +373,7 @@ nni_win_ipc_acc_start(nni_win_event *evt, nni_aio *aio) default: // Fast-fail (synchronous). - evt->status = GetLastError(); + evt->status = rv; evt->count = 0; return (1); } diff --git a/src/platform/windows/win_net.c b/src/platform/windows/win_net.c index 19491d02..9ddccdb4 100644 --- a/src/platform/windows/win_net.c +++ b/src/platform/windows/win_net.c @@ -13,21 +13,27 @@ #include <stdio.h> -static LPFN_CONNECTEX nni_win_connectex; -static LPFN_ACCEPTEX nni_win_acceptex; - struct nni_plat_tcp_pipe { SOCKET s; - nni_win_event recv_evt; - nni_win_event send_evt; - WSAOVERLAPPED recv_olpd; - WSAOVERLAPPED send_olpd; + nni_win_event rcv_ev; + nni_win_event snd_ev; }; struct nni_plat_tcp_ep { SOCKET s; - nni_win_event evt; - WSAOVERLAPPED olpd; + SOCKET acc_s; + nni_win_event con_ev; + nni_win_event acc_ev; + int mode; + int started; + int bound; + + SOCKADDR_STORAGE remaddr; + int remlen; + SOCKADDR_STORAGE locaddr; + int loclen; + + char buf[512]; // to hold acceptex results // We have to lookup some function pointers using ioctls. Winsock, // gotta love it. @@ -35,649 +41,685 @@ struct nni_plat_tcp_ep { LPFN_ACCEPTEX acceptex; }; -// Windows has infinite numbers of error codes it seems. -static struct { - int wsa_err; - int nng_err; -} nni_plat_wsa_errnos[] = { - // clang-format off - { WSA_INVALID_HANDLE, NNG_ECLOSED }, - { WSA_NOT_ENOUGH_MEMORY, NNG_ENOMEM }, - { WSA_INVALID_PARAMETER, NNG_EINVAL }, - { WSA_OPERATION_ABORTED, NNG_ECLOSED }, - { WSA_IO_INCOMPLETE, NNG_EAGAIN }, - - { WSAEINTR, NNG_EINTR }, - { WSAEBADF, NNG_ECLOSED }, - { WSAEACCES, NNG_EPERM }, - { WSAEFAULT, NNG_ESYSERR + WSAEFAULT }, - { WSAEWOULDBLOCK, NNG_EAGAIN }, - { WSAEINPROGRESS, NNG_EAGAIN }, - { WSAEALREADY, NNG_ESYSERR + WSAEALREADY }, - { WSAENOTSOCK, NNG_ECLOSED }, - { WSAEMSGSIZE, NNG_EMSGSIZE }, - { WSAEPROTOTYPE, NNG_ESYSERR + WSAEPROTOTYPE }, - { WSAENOPROTOOPT, NNG_ENOTSUP }, - { WSAEPROTONOSUPPORT, NNG_ENOTSUP }, - { WSAEPROTONOSUPPORT, NNG_ENOTSUP }, - { WSAEADDRINUSE, NNG_EADDRINUSE }, - { WSAEADDRNOTAVAIL, NNG_EADDRINVAL }, - { WSAENETDOWN, NNG_EUNREACHABLE }, - { WSAENETUNREACH, NNG_EUNREACHABLE }, - { WSAECONNABORTED, NNG_ETIMEDOUT }, - { WSAECONNRESET, NNG_ECLOSED }, - { WSAENOBUFS, NNG_ENOMEM }, - { WSAEISCONN, NNG_ESYSERR + WSAEISCONN }, - { WSAENOTCONN, NNG_ECLOSED }, - { WSAESHUTDOWN, NNG_ECLOSED }, - { WSAETOOMANYREFS, NNG_ESYSERR + WSAETOOMANYREFS }, - { WSAETIMEDOUT, NNG_ETIMEDOUT }, - { WSAECONNREFUSED, NNG_ECONNREFUSED }, - { WSAELOOP, NNG_ESYSERR + WSAELOOP }, - { WSAENAMETOOLONG, NNG_ESYSERR + WSAENAMETOOLONG }, - { WSAEHOSTDOWN, NNG_EUNREACHABLE }, - { WSAEHOSTUNREACH, NNG_EUNREACHABLE }, - { WSAENOTEMPTY, NNG_ESYSERR + WSAENOTEMPTY }, - { WSAEPROCLIM, NNG_ESYSERR + WSAEPROCLIM }, - { WSAEUSERS, NNG_ESYSERR + WSAEUSERS }, - { WSAEDQUOT, NNG_ESYSERR + WSAEDQUOT }, - { WSAESTALE, NNG_ESYSERR + WSAESTALE }, - { WSAEREMOTE, NNG_ESYSERR + WSAEREMOTE }, - { WSASYSNOTREADY, NNG_ESYSERR + WSASYSNOTREADY }, - { WSAVERNOTSUPPORTED, NNG_ENOTSUP }, - { WSANOTINITIALISED, NNG_ESYSERR + WSANOTINITIALISED }, - { WSAEDISCON, NNG_ECLOSED }, - { WSAENOMORE, NNG_ESYSERR + WSAENOMORE }, - { WSAECANCELLED, NNG_ESYSERR + WSAECANCELLED }, - { WSAEINVALIDPROVIDER, NNG_ESYSERR + WSAEINVALIDPROVIDER }, - { WSAEPROVIDERFAILEDINIT, NNG_ESYSERR + WSAEPROVIDERFAILEDINIT }, - { WSASYSCALLFAILURE, NNG_ESYSERR + WSASYSCALLFAILURE }, - { WSASERVICE_NOT_FOUND, NNG_ESYSERR + WSASERVICE_NOT_FOUND }, - { WSATYPE_NOT_FOUND, NNG_ESYSERR + WSATYPE_NOT_FOUND }, - { WSA_E_CANCELLED, NNG_ESYSERR + WSA_E_CANCELLED }, - { WSAEREFUSED, NNG_ESYSERR + WSAEREFUSED }, - { WSAHOST_NOT_FOUND, NNG_EADDRINVAL }, - { WSATRY_AGAIN, NNG_EAGAIN }, - { WSANO_RECOVERY, NNG_ESYSERR + WSANO_RECOVERY }, - { WSANO_DATA, NNG_EADDRINVAL }, - // Eliding all the QoS related errors. - // Must be Last!! - { 0, 0 }, - // clang-format on +static int nni_win_tcp_pipe_start(nni_win_event *, nni_aio *); +static void nni_win_tcp_pipe_finish(nni_win_event *, nni_aio *); +static void nni_win_tcp_pipe_cancel(nni_win_event *); + +static nni_win_event_ops nni_win_tcp_pipe_ops = { + .wev_start = nni_win_tcp_pipe_start, + .wev_finish = nni_win_tcp_pipe_finish, + .wev_cancel = nni_win_tcp_pipe_cancel, +}; + +static int nni_win_tcp_acc_start(nni_win_event *, nni_aio *); +static void nni_win_tcp_acc_finish(nni_win_event *, nni_aio *); +static void nni_win_tcp_acc_cancel(nni_win_event *); + +static nni_win_event_ops nni_win_tcp_acc_ops = { + .wev_start = nni_win_tcp_acc_start, + .wev_finish = nni_win_tcp_acc_finish, + .wev_cancel = nni_win_tcp_acc_cancel, }; +static int nni_win_tcp_con_start(nni_win_event *, nni_aio *); +static void nni_win_tcp_con_finish(nni_win_event *, nni_aio *); +static void nni_win_tcp_con_cancel(nni_win_event *); -int -nni_winsock_error(int werr) +static nni_win_event_ops nni_win_tcp_con_ops = { + .wev_start = nni_win_tcp_con_start, + .wev_finish = nni_win_tcp_con_finish, + .wev_cancel = nni_win_tcp_con_cancel, +}; + +static void +nni_win_tcp_sockinit(SOCKET s) { - int i; + BOOL yes; + DWORD no; - if (werr == 0) { - return (0); - } + // Don't inherit the handle (CLOEXEC really). + SetHandleInformation((HANDLE) s, HANDLE_FLAG_INHERIT, 0); - for (i = 0; nni_plat_wsa_errnos[i].nng_err != 0; i++) { - if (werr == nni_plat_wsa_errnos[i].wsa_err) { - return (nni_plat_wsa_errnos[i].nng_err); - } - } - // Other system errno. - return (NNG_ESYSERR + werr); -} + no = 0; + (void) setsockopt( + s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no)); + // Also disable Nagle. We are careful to group data with WSASend, + // and latency is king for most of our users. (Consider adding + // a method to enable this later.) + yes = 1; + (void) setsockopt( + s, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, sizeof(yes)); +} static int -nni_plat_to_sockaddr(SOCKADDR_STORAGE *ss, const nni_sockaddr *sa) +nni_win_tcp_addr(SOCKADDR_STORAGE *ss, const nni_sockaddr *sa) { - struct sockaddr_in *sin; - struct sockaddr_in6 *sin6; + SOCKADDR_IN * sin; + SOCKADDR_IN6 *sin6; switch (sa->s_un.s_family) { case NNG_AF_INET: sin = (void *) ss; - memset(sin, 0, sizeof (*sin)); - sin->sin_family = PF_INET; - sin->sin_port = sa->s_un.s_in.sa_port; + memset(sin, 0, sizeof(*sin)); + sin->sin_family = PF_INET; + sin->sin_port = sa->s_un.s_in.sa_port; sin->sin_addr.s_addr = sa->s_un.s_in.sa_addr; - return (sizeof (*sin)); + return (sizeof(*sin)); case NNG_AF_INET6: sin6 = (void *) ss; - memset(sin6, 0, sizeof (*sin6)); + memset(sin6, 0, sizeof(*sin6)); sin6->sin6_family = PF_INET6; - sin6->sin6_port = sa->s_un.s_in6.sa_port; + sin6->sin6_port = sa->s_un.s_in6.sa_port; memcpy(sin6->sin6_addr.s6_addr, sa->s_un.s_in6.sa_addr, 16); - return (sizeof (*sin6)); + return (sizeof(*sin6)); } return (-1); } - static int -nni_plat_from_sockaddr(nni_sockaddr *sa, const struct sockaddr *ss) +nni_win_tcp_pipe_start(nni_win_event *evt, nni_aio *aio) { - const struct sockaddr_in *sin; - const struct sockaddr_in6 *sin6; - - memset(sa, 0, sizeof (*sa)); - switch (ss->sa_family) { - case PF_INET: - sin = (const void *) ss; - sa->s_un.s_in.sa_family = NNG_AF_INET; - sa->s_un.s_in.sa_port = sin->sin_port; - sa->s_un.s_in.sa_addr = sin->sin_addr.s_addr; - return (0); - - case PF_INET6: - sin6 = (const void *) ss; - sa->s_un.s_in6.sa_family = NNG_AF_INET6; - sa->s_un.s_in6.sa_port = sin6->sin6_port; - memcpy(sa->s_un.s_in6.sa_addr, sin6->sin6_addr.s6_addr, 16); - return (0); - } - return (-1); -} + int rv; + SOCKET s; + WSABUF iov[4]; + DWORD niov; + DWORD flags; + nni_plat_tcp_pipe *pipe = evt->ptr; + int i; + NNI_ASSERT(aio->a_niov > 0); + NNI_ASSERT(aio->a_niov <= 4); + NNI_ASSERT(aio->a_iov[0].iov_len > 0); + NNI_ASSERT(aio->a_iov[0].iov_buf != NULL); -int -nni_plat_lookup_host(const char *host, nni_sockaddr *addr, int flags) -{ - ADDRINFO hint; - ADDRINFO *ai; - - ZeroMemory(&hint, sizeof (hint)); - hint.ai_flags = AI_PASSIVE | AI_ALL; - hint.ai_socktype = SOCK_STREAM; - hint.ai_protocol = IPPROTO_TCP; - - // XXX: For some reason, using IPv6 (AF_INET6) leads to surprising - // results, particularly for the AI_PASSIVE NULL host, where the - // documented behavior is that a single zeroed address works. That - // does not seem to be what we get back, and attempts to bind to that - // specifically also seem to fail. Investigation is called for. - // For now we juse use AF_INET if HOST == NULL. - if (flags & NNI_FLAG_IPV4ONLY) { - hint.ai_family = AF_INET; - } else { - hint.ai_family = AF_UNSPEC; - } - if (host == NULL) { - // See above about why we had to do this terrible thing. - // We need to remove this before 1.0. - hint.ai_family = AF_INET; - } + niov = aio->a_niov; - if (getaddrinfo(host, "1", &hint, &ai) != 0) { - return (NNG_EADDRINVAL); - } - if (nni_plat_from_sockaddr(addr, ai->ai_addr) < 0) { - freeaddrinfo(ai); - return (NNG_EADDRINVAL); + // Put the AIOs in Windows form. + for (i = 0; i < aio->a_niov; i++) { + iov[i].buf = aio->a_iov[i].iov_buf; + iov[i].len = aio->a_iov[i].iov_len; } - freeaddrinfo(ai); - return (0); -} + if ((s = pipe->s) == INVALID_SOCKET) { + evt->status = ERROR_INVALID_HANDLE; + evt->count = 0; + return (1); + } -int -nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const char *url, int mode) -{ - nni_plat_tcp_ep *ep; + // Note that the IOVs for the event were prepared on entry already. + // The actual aio's iov array we don't touch. - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); + evt->count = 0; + flags = 0; + if (evt == &pipe->snd_ev) { + rv = WSASend(s, iov, niov, NULL, flags, &evt->olpd, NULL); + } else { + rv = WSARecv(s, iov, niov, NULL, &flags, &evt->olpd, NULL); } - ZeroMemory(ep, sizeof (*ep)); - ep->s = INVALID_SOCKET; - // XXX: no need to create event? - ep->olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); - if (ep->olpd.hEvent == INVALID_HANDLE_VALUE) { - return (nni_win_error(GetLastError())); + if ((rv == SOCKET_ERROR) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + evt->status = rv; + evt->count = 0; + return (1); } - // XXX: save URL, do lookups? etc. etc. - - *epp = ep; - return (NNG_ENOTSUP); + // Wait for the I/O completion event. Note that when an I/O + // completes immediately, the I/O completion packet is still + // delivered. + return (0); } - -void -nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep) +static void +nni_win_tcp_pipe_cancel(nni_win_event *evt) { -} + nni_plat_tcp_pipe *pipe = evt->ptr; + if (CancelIoEx((HANDLE) pipe->s, &evt->olpd)) { + DWORD cnt; -void -nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep) -{ + // If we canceled, make sure that we've completely + // finished with the overlapped. + GetOverlappedResult((HANDLE) pipe->s, &evt->olpd, &cnt, TRUE); + } } - -extern int -nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep) +static void +nni_win_tcp_pipe_finish(nni_win_event *evt, nni_aio *aio) { - return (NNG_ENOTSUP); + int rv; + DWORD cnt; + + cnt = evt->count; + nni_win_error(evt->status), evt->count); + if ((rv = evt->status) == 0) { + int i; + aio->a_count += cnt; + + while (cnt > 0) { + // If we didn't write the first full iov, + // then we're done for now. Record progress + // and move on. + if (cnt < aio->a_iov[0].iov_len) { + aio->a_iov[0].iov_len -= cnt; + aio->a_iov[0].iov_buf = + (char *) aio->a_iov[0].iov_buf + cnt; + break; + } + + // We consumed the full iov, so just move the + // remaininng ones up, and decrement count handled. + cnt -= aio->a_iov[0].iov_len; + for (i = 1; i < aio->a_niov; i++) { + aio->a_iov[i - 1] = aio->a_iov[i]; + } + NNI_ASSERT(aio->a_niov > 0); + aio->a_niov--; + } + + if (aio->a_niov > 0) { + // If we have more to do, submit it! + nni_win_event_resubmit(evt, aio); + return; + } + } + + // All done; hopefully successfully. + nni_aio_finish(aio, nni_win_error(rv), aio->a_count); } - -extern void -nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) +static int +nni_win_tcp_pipe_init(nni_plat_tcp_pipe **pipep, SOCKET s) { -} + nni_plat_tcp_pipe *pipe; + int rv; - -extern void -nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) -{ + if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { + return (NNG_ENOMEM); + } + rv = nni_win_event_init(&pipe->rcv_ev, &nni_win_tcp_pipe_ops, pipe); + if (rv != 0) { + nni_plat_tcp_pipe_fini(pipe); + return (rv); + } + rv = nni_win_event_init(&pipe->snd_ev, &nni_win_tcp_pipe_ops, pipe); + if (rv != 0) { + nni_plat_tcp_pipe_fini(pipe); + return (rv); + } + nni_win_tcp_sockinit(s); + pipe->s = s; + *pipep = pipe; + return (0); } - void -nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *p, nni_aio *aio) +nni_plat_tcp_pipe_send(nni_plat_tcp_pipe *pipe, nni_aio *aio) { + nni_win_event_submit(&pipe->snd_ev, aio); } - void -nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *p, nni_aio *aio) +nni_plat_tcp_pipe_recv(nni_plat_tcp_pipe *pipe, nni_aio *aio) { + nni_win_event_submit(&pipe->rcv_ev, aio); } - void -nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *p) +nni_plat_tcp_pipe_close(nni_plat_tcp_pipe *pipe) { -} + SOCKET s; + + nni_win_event_close(&pipe->rcv_ev); + if ((s = pipe->s) != INVALID_SOCKET) { + pipe->s = INVALID_SOCKET; + closesocket(s); + } +} void -nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *p) +nni_plat_tcp_pipe_fini(nni_plat_tcp_pipe *pipe) { -} + nni_plat_tcp_pipe_close(pipe); + nni_win_event_fini(&pipe->snd_ev); + nni_win_event_fini(&pipe->rcv_ev); + NNI_FREE_STRUCT(pipe); +} -#if 0 +extern int nni_tcp_parse_url(char *, char **, char **, char **, char **); int -nni_plat_tcp_send_old(nni_plat_tcpsock *s, nni_iov *iovs, int cnt) +nni_plat_tcp_ep_init(nni_plat_tcp_ep **epp, const char *url, int mode) { - WSABUF iov[4]; // We never have more than 3 at present - int i; - int rv; - DWORD nsent; - DWORD resid; - DWORD flags; - WSAOVERLAPPED *olp = &s->send_olpd; + char buf[NNG_MAXADDRLEN]; + nni_plat_tcp_ep *ep; + char * rhost; + char * rserv; + char * lhost; + char * lserv; + int rv; + nni_aio aio; + SOCKET s; + DWORD nbytes; + GUID guid1 = WSAID_CONNECTEX; + GUID guid2 = WSAID_ACCEPTEX; - if (cnt > 4) { - return (NNG_EINVAL); + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); } + ZeroMemory(ep, sizeof(ep)); - for (i = 0, resid = 0; i < cnt; resid += iov[i].len, i++) { - iov[i].buf = iovs[i].iov_buf; - iov[i].len = (DWORD) iovs[i].iov_len; - } + ep->mode = mode; + ep->s = INVALID_SOCKET; + + nni_aio_init(&aio, NULL, NULL); - i = 0; - while (resid) { - flags = 0; - rv = WSASend(s->s, &iov[i], cnt, &nsent, flags, olp, NULL); - if (rv == SOCKET_ERROR) { - if ((rv = WSAGetLastError()) != WSA_IO_PENDING) { - return (nni_winsock_error(rv)); - } + snprintf(buf, sizeof(buf), "%s", url); + if (mode == NNI_EP_MODE_DIAL) { + rv = nni_tcp_parse_url(buf, &rhost, &rserv, &lhost, &lserv); + if (rv != 0) { + goto fail; } - flags = 0; - if (!WSAGetOverlappedResult(s->s, olp, &nsent, TRUE, &flags)) { - return (nni_winsock_error(WSAGetLastError())); + // Have to ahve a remote destination. + if ((rhost == NULL) || (rserv == NULL)) { + rv = NNG_EADDRINVAL; + goto fail; } + } else { + rv = nni_tcp_parse_url(buf, &lhost, &lserv, &rhost, &rserv); + if (rv != 0) { + goto fail; + } + // Remote destination makes no sense when listening. + if ((rhost != NULL) || (rserv != NULL)) { + rv = NNG_EADDRINVAL; + goto fail; + } + if (lserv == NULL) { + // missing port to listen on! + rv = NNG_EADDRINVAL; + goto fail; + } + } - if (nsent > resid) { - nni_panic("WSASend says it sent too much"); + if ((rserv != NULL) || (rhost != NULL)) { + nni_plat_tcp_resolv(rhost, rserv, NNG_AF_INET6, 0, &aio); + nni_aio_wait(&aio); + if ((rv = nni_aio_result(&aio)) != 0) { + goto fail; } + ep->remlen = nni_win_tcp_addr(&ep->remaddr, &aio.a_addrs[0]); + } - resid -= nsent; - while (nsent) { - if (iov[i].len <= nsent) { - nsent -= iov[i].len; - i++; - cnt--; - } else { - iov[i].len -= nsent; - iov[i].buf += nsent; - nsent = 0; - } + if ((lserv != NULL) || (lhost != NULL)) { + nni_plat_tcp_resolv(lhost, lserv, NNG_AF_INET6, 1, &aio); + nni_aio_wait(&aio); + if ((rv = nni_aio_result(&aio)) != 0) { + goto fail; } + ep->loclen = nni_win_tcp_addr(&ep->locaddr, &aio.a_addrs[0]); + } + + // Create a scratch socket for use with ioctl. + s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); + if (s == INVALID_SOCKET) { + rv = nni_win_error(GetLastError()); + goto fail; + } + + // Look up the function pointer. + if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid1, + sizeof(guid1), &ep->connectex, sizeof(ep->connectex), &nbytes, + NULL, NULL) == SOCKET_ERROR) { + rv = nni_win_error(GetLastError()); + goto fail; + } + if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid2, + sizeof(guid2), &ep->acceptex, sizeof(ep->acceptex), &nbytes, + NULL, NULL) == SOCKET_ERROR) { + rv = nni_win_error(GetLastError()); + goto fail; + } + closesocket(s); + s = INVALID_SOCKET; + + // Now initialize the win events for later use. + rv = nni_win_event_init(&ep->acc_ev, &nni_win_tcp_acc_ops, ep); + if (rv != 0) { + goto fail; + } + rv = nni_win_event_init(&ep->con_ev, &nni_win_tcp_con_ops, ep); + if (rv != 0) { + goto fail; } + nni_aio_fini(&aio); + *epp = ep; return (0); + +fail: + if (s != INVALID_SOCKET) { + closesocket(s); + } + nni_plat_tcp_ep_fini(ep); + nni_aio_fini(&aio); + return (rv); } +void +nni_plat_tcp_ep_close(nni_plat_tcp_ep *ep) +{ + nni_win_event_close(&ep->acc_ev); + nni_win_event_close(&ep->con_ev); + if (ep->s != INVALID_SOCKET) { + closesocket(ep->s); + ep->s = INVALID_SOCKET; + } + if (ep->acc_s != INVALID_SOCKET) { + closesocket(ep->acc_s); + } +} -int -nni_plat_tcp_recv_old(nni_plat_tcpsock *s, nni_iov *iovs, int cnt) +void +nni_plat_tcp_ep_fini(nni_plat_tcp_ep *ep) { - WSABUF iov[4]; // We never have more than 3 at present - int i; - int rv; - DWORD resid; - DWORD nrecv; - DWORD flags; - WSAOVERLAPPED *olp = &s->recv_olpd; + nni_plat_tcp_ep_close(ep); + NNI_FREE_STRUCT(ep); +} - if (cnt > 4) { +static int +nni_win_tcp_listen(nni_plat_tcp_ep *ep) +{ + int rv; + BOOL yes; + SOCKET s; + + if (ep->mode != NNI_EP_MODE_LISTEN) { return (NNG_EINVAL); } - - for (i = 0, resid = 0; i < cnt; resid += iov[i].len, i++) { - iov[i].buf = iovs[i].iov_buf; - iov[i].len = (DWORD) iovs[i].iov_len; + if (ep->started) { + return (NNG_EBUSY); } - i = 0; - while (resid) { - flags = 0; - rv = WSARecv(s->s, &iov[i], cnt, &nrecv, &flags, olp, NULL); - if (rv == SOCKET_ERROR) { - if ((rv = WSAGetLastError()) != WSA_IO_PENDING) { - return (nni_winsock_error(rv)); - } - } - flags = 0; - if (!WSAGetOverlappedResult(s->s, olp, &nrecv, TRUE, &flags)) { - return (nni_winsock_error(WSAGetLastError())); - } + s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); + if (s == INVALID_SOCKET) { + rv = nni_win_error(GetLastError()); + goto fail; + } - if (nrecv > resid) { - nni_panic("WSARecv says it read too much!"); - } + nni_win_tcp_sockinit(s); - resid -= nrecv; - while (nrecv) { - if (iov[i].len <= nrecv) { - nrecv -= iov[i].len; - i++; - cnt--; - } else { - iov[i].len -= nrecv; - iov[i].buf += nrecv; - nrecv = 0; - } - } + if ((rv = nni_win_iocp_register((HANDLE) s)) != 0) { + goto fail; } - return (0); -} + // Make sure that we use the address exclusively. Windows lets + // others hijack us by default. + yes = 1; + rv = setsockopt( + s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (char *) &yes, sizeof(yes)); + if (rv != 0) { + rv = nni_win_error(GetLastError()); + goto fail; + } + if (bind(s, (struct sockaddr *) &ep->locaddr, ep->loclen) != 0) { + rv = nni_win_error(GetLastError()); + goto fail; + } -static void -nni_plat_tcp_setopts(SOCKET fd) -{ - BOOL yes; - DWORD no; + if (listen(s, SOMAXCONN) != 0) { + rv = nni_win_error(GetLastError()); + goto fail; + } - // Don't inherit the handle (CLOEXEC really). - SetHandleInformation((HANDLE) fd, HANDLE_FLAG_INHERIT, 0); + ep->s = s; + ep->started = 1; - no = 0; - (void) setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, - sizeof (no)); + return (0); - // Also disable Nagle. We are careful to group data with WSASend, - // and latency is king for most of our users. (Consider adding - // a method to enable this later.) - yes = 1; - (void) setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &yes, - sizeof (yes)); +fail: + if (s != INVALID_SOCKET) { + closesocket(s); + } + return (rv); } - int -nni_plat_tcp_init(nni_plat_tcpsock *s) +nni_plat_tcp_ep_listen(nni_plat_tcp_ep *ep) { int rv; - ZeroMemory(s, sizeof (*s)); - s->s = INVALID_SOCKET; - s->recv_olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); - if (s->recv_olpd.hEvent == INVALID_HANDLE_VALUE) { - rv = GetLastError(); - return (NNG_ESYSERR+rv); - } - s->send_olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); - if (s->send_olpd.hEvent == INVALID_HANDLE_VALUE) { - rv = GetLastError(); - CloseHandle(s->recv_olpd.hEvent); - return (NNG_ESYSERR+rv); - } - s->conn_olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); - if (s->conn_olpd.hEvent == INVALID_HANDLE_VALUE) { - rv = GetLastError(); - CloseHandle(s->send_olpd.hEvent); - CloseHandle(s->recv_olpd.hEvent); - return (NNG_ESYSERR+rv); - } - return (0); + nni_mtx_lock(&ep->acc_ev.mtx); + rv = nni_win_tcp_listen(ep); + nni_mtx_unlock(&ep->acc_ev.mtx); + return (rv); } - -static int -nni_plat_tcp_open(nni_plat_tcpsock *s, int fam) +static void +nni_win_tcp_acc_cancel(nni_win_event *evt) { - int rv; - DWORD nbytes; - GUID guid1 = WSAID_CONNECTEX; - GUID guid2 = WSAID_ACCEPTEX; + nni_plat_tcp_ep *ep = evt->ptr; + SOCKET s = ep->s; - s->s = socket(fam, SOCK_STREAM, IPPROTO_TCP); - if (s->s == INVALID_SOCKET) { - rv = WSAGetLastError(); - return (nni_winsock_error(rv)); - } + if ((s != INVALID_SOCKET) && CancelIoEx((HANDLE) s, &evt->olpd)) { + DWORD cnt; - if (WSAIoctl(s->s, SIO_GET_EXTENSION_FUNCTION_POINTER, - &guid1, sizeof (guid1), &s->connectex, sizeof (s->connectex), - &nbytes, NULL, NULL) == SOCKET_ERROR) { - nni_panic("failed lookup for ConnectEx function"); - } - if (WSAIoctl(s->s, SIO_GET_EXTENSION_FUNCTION_POINTER, - &guid2, sizeof (guid2), &s->acceptex, sizeof (s->acceptex), - &nbytes, NULL, NULL) == SOCKET_ERROR) { - nni_panic("failed lookup for AcceptEx function"); + // If we canceled, make sure that we've completely + // finished with the overlapped. + GetOverlappedResult((HANDLE) s, &evt->olpd, &cnt, TRUE); } - - nni_plat_tcp_setopts(s->s); - s->family = fam; - - return (0); } - static void -nni_plat_tcp_close(nni_plat_tcpsock *s) +nni_win_tcp_acc_finish(nni_win_event *evt, nni_aio *aio) { - SOCKET fd; + nni_plat_tcp_ep * ep = evt->ptr; + nni_plat_tcp_pipe *pipe; + SOCKET s; + int rv; + + s = ep->acc_s; + ep->acc_s = INVALID_SOCKET; - if ((fd = s->s) != INVALID_SOCKET) { - s->s = INVALID_SOCKET; - (void) shutdown(fd, SD_BOTH); - (void) CancelIoEx((HANDLE) fd, &s->conn_olpd); - (void) CancelIoEx((HANDLE) fd, &s->recv_olpd); - (void) CancelIoEx((HANDLE) fd, &s->send_olpd); - (void) closesocket(fd); + if (s == INVALID_SOCKET) { + return; } -} + if ((rv = evt->status) != 0) { + closesocket(s); + nni_aio_finish(aio, nni_win_error(rv), 0); + return; + } -void -nni_plat_tcp_fini(nni_plat_tcpsock *s) -{ - SOCKET fd; - - if ((fd = s->s) != INVALID_SOCKET) { - s->s = INVALID_SOCKET; - (void) CancelIoEx((HANDLE) fd, &s->conn_olpd); - (void) CancelIoEx((HANDLE) fd, &s->recv_olpd); - (void) CancelIoEx((HANDLE) fd, &s->send_olpd); - (void) closesocket(fd); - } - CloseHandle(s->recv_olpd.hEvent); - CloseHandle(s->send_olpd.hEvent); - CloseHandle(s->conn_olpd.hEvent); + if (((rv = nni_win_iocp_register((HANDLE) s)) != 0) || + ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0)) { + closesocket(s); + nni_aio_finish(aio, rv, 0); + return; + } + + aio->a_pipe = pipe; + nni_aio_finish(aio, 0, 0); } +static int +nni_win_tcp_acc_start(nni_win_event *evt, nni_aio *aio) +{ + nni_plat_tcp_ep *ep = evt->ptr; + SOCKET s = ep->s; + SOCKET acc_s; + DWORD cnt; + + acc_s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); + if (acc_s == INVALID_SOCKET) { + evt->status = GetLastError(); + evt->count = 0; + return (1); + } + ep->acc_s = acc_s; + + if (!ep->acceptex(s, acc_s, ep->buf, 0, 256, 256, &cnt, &evt->olpd)) { + int rv = GetLastError(); + switch (rv) { + case ERROR_IO_PENDING: + // Normal asynchronous operation. Wait for + // completion. + return (0); + + default: + // Fast-fail (synchronous). + evt->status = rv; + evt->count = 0; + return (1); + } + } + + // Synch completion right now. I/O completion packet delivered + // already. + return (0); +} void -nni_plat_tcp_shutdown(nni_plat_tcpsock *s) +nni_plat_tcp_ep_accept(nni_plat_tcp_ep *ep, nni_aio *aio) { - nni_plat_tcp_close(s); + aio->a_pipe = NULL; + nni_win_event_submit(&ep->acc_ev, aio); } - -// nni_plat_tcp_listen creates a file descriptor bound to the given address. -// This basically does the equivalent of socket, bind, and listen. We have -// chosen a default value for the listen backlog of 128, which should be -// plenty. (If it isn't, then the accept thread can't get enough resources -// to keep up, and your clients are going to experience bad things. Normally -// the actual backlog should hover near 0 anyway.) -int -nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr) +static void +nni_win_tcp_con_cancel(nni_win_event *evt) { - int len; - SOCKADDR_STORAGE ss; - ULONG yes; - int rv; + nni_plat_tcp_ep *ep = evt->ptr; + SOCKET s = ep->s; - len = nni_plat_to_sockaddr(&ss, addr); - if (len < 0) { - return (NNG_EADDRINVAL); - } + if ((s != INVALID_SOCKET) && CancelIoEx((HANDLE) s, &evt->olpd)) { + DWORD cnt; - if ((rv = nni_plat_tcp_open(s, ss.ss_family)) != 0) { - return (rv); + // If we canceled, make sure that we've completely + // finished with the overlapped. + GetOverlappedResult((HANDLE) s, &evt->olpd, &cnt, TRUE); } +} - // Make sure that we use the address exclusively. Windows lets - // others hijack us by default. - yes = 1; - if (setsockopt(s->s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (char *) &yes, - sizeof (yes)) == SOCKET_ERROR) { - rv = WSAGetLastError(); - nni_plat_tcp_close(s); - return (nni_winsock_error(rv)); - } - if (bind(s->s, (struct sockaddr *) &ss, len) != 0) { - rv = WSAGetLastError(); - nni_plat_tcp_close(s); - return (nni_winsock_error(rv)); +static void +nni_win_tcp_con_finish(nni_win_event *evt, nni_aio *aio) +{ + nni_plat_tcp_ep * ep = evt->ptr; + nni_plat_tcp_pipe *pipe; + SOCKET s; + int rv; + + s = ep->s; + ep->s = INVALID_SOCKET; + + if ((rv = evt->status) != 0) { + closesocket(s); + nni_aio_finish(aio, nni_win_error(rv), 0); + return; } + // The socket was already registere with the IOCP. - // Listen -- 128 depth is probably sufficient. If it isn't, other - // bad things are going to happen. - if (listen(s->s, 128) != 0) { - rv = WSAGetLastError(); - nni_plat_tcp_close(s); - return (nni_winsock_error(rv)); + if ((rv = nni_win_tcp_pipe_init(&pipe, s)) != 0) { + // The new pipe is already fine for us. Discard + // the old one, since failed to be able to use it. + closesocket(s); + nni_aio_finish(aio, rv, 0); + return; } - return (0); + aio->a_pipe = pipe; + nni_aio_finish(aio, 0, 0); } - -// nni_plat_tcp_connect establishes an outbound connection. It the -// bind address is not null, then it will attempt to bind to the local -// address specified first. -int -nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr, - const nni_sockaddr *bindaddr) +static int +nni_win_tcp_con_start(nni_win_event *evt, nni_aio *aio) { - int len; - SOCKADDR_STORAGE ss; + nni_plat_tcp_ep *ep = evt->ptr; + SOCKET s; SOCKADDR_STORAGE bss; - WSAOVERLAPPED *olp = &s->conn_olpd; - DWORD nbytes; - DWORD flags; - int rv; + int len; + int rv; - len = nni_plat_to_sockaddr(&ss, addr); - if (len < 0) { - return (NNG_EADDRINVAL); + s = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); + if (s == INVALID_SOCKET) { + evt->status = GetLastError(); + evt->count = 0; + return (1); } - if (bindaddr != NULL) { - if (bindaddr->s_un.s_family != addr->s_un.s_family) { - return (NNG_EADDRINVAL); - } - if (nni_plat_to_sockaddr(&bss, bindaddr) < 0) { - return (NNG_EADDRINVAL); - } - } else { - ZeroMemory(&bss, sizeof (bss)); - bss.ss_family = ss.ss_family; - } - - if ((rv = nni_plat_tcp_open(s, ss.ss_family)) != 0) { - return (rv); - } + nni_win_tcp_sockinit(s); - // ConnectEx must always be bound first. - if (bind(s->s, (struct sockaddr *) &bss, len) < 0) { - rv = WSAGetLastError(); - nni_plat_tcp_close(s); - return (nni_winsock_error(rv)); - } - - if (!s->connectex(s->s, (struct sockaddr *) &ss, len, NULL, 0, NULL, - olp)) { - if ((rv = WSAGetLastError()) != ERROR_IO_PENDING) { - nni_plat_tcp_close(s); - return (nni_winsock_error(rv)); + // Windows ConnectEx requires the socket to be bound first. + if (ep->loclen != 0) { + bss = ep->locaddr; + len = ep->loclen; + } else { + ZeroMemory(&bss, sizeof(bss)); + bss.ss_family = ep->remaddr.ss_family; + len = ep->remlen; + } + if (bind(s, (struct sockaddr *) &bss, len) < 0) { + evt->status = GetLastError(); + evt->count = 0; + closesocket(s); + return (1); + } + // Register with the I/O completion port so we can get the + // events for the next call. + if ((rv = nni_win_iocp_register((HANDLE) s)) != 0) { + closesocket(s); + evt->status = rv; + evt->count = 0; + return (1); + } + + ep->s = s; + if (!ep->connectex(s, (struct sockaddr *) &ep->remaddr, ep->remlen, + NULL, 0, NULL, &evt->olpd)) { + if ((rv = GetLastError()) != ERROR_IO_PENDING) { + closesocket(s); + ep->s = INVALID_SOCKET; + evt->status = rv; + evt->count = 0; + return (1); } } - nbytes = flags = 0; - if (!WSAGetOverlappedResult(s->s, olp, &nbytes, TRUE, &flags)) { - rv = WSAGetLastError(); - nni_plat_tcp_close(s); - return (nni_winsock_error(rv)); - } return (0); } +extern void +nni_plat_tcp_ep_connect(nni_plat_tcp_ep *ep, nni_aio *aio) +{ + aio->a_pipe = NULL; + nni_win_event_submit(&ep->con_ev, aio); +} int -nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server) +nni_win_tcp_sysinit(void) { - DWORD nbytes; - DWORD flags; - WSAOVERLAPPED *olp = &s->conn_olpd; - char ainfo[512]; - int rv; - - if ((rv = nni_plat_tcp_open(s, server->family)) != 0) { - return (rv); - } - - // 256 > (sizeof (SOCKADDR_STORAGE) + 16) - nbytes = 0; - if (!s->acceptex(server->s, s->s, ainfo, 0, 256, 256, &nbytes, olp)) { - if ((rv = WSAGetLastError()) != ERROR_IO_PENDING) { - return (nni_winsock_error(rv)); - } - } - nbytes = flags = 0; - if (!WSAGetOverlappedResult(server->s, olp, &nbytes, TRUE, &flags)) { - rv = WSAGetLastError(); - return (nni_winsock_error(rv)); + WSADATA data; + WORD ver; + ver = MAKEWORD(2, 2); + if (WSAStartup(MAKEWORD(2, 2), &data) != 0) { + NNI_ASSERT(LOBYTE(data.wVersion) == 2); + NNI_ASSERT(HIBYTE(data.wVersion) == 2); + return (nni_win_error(GetLastError())); } return (0); } - -#endif - +void +nni_win_tcp_sysfini(void) +{ + WSACleanup(); +} #else diff --git a/src/platform/windows/win_pipe.c b/src/platform/windows/win_pipe.c index ed6149c0..861fbc76 100644 --- a/src/platform/windows/win_pipe.c +++ b/src/platform/windows/win_pipe.c @@ -95,7 +95,7 @@ nni_plat_pipe_open(int *wfdp, int *rfdp) return (0); fail: - rv = nni_winsock_error(WSAGetLastError()); + rv = nni_win_error(GetLastError()); if (afd != INVALID_SOCKET) { closesocket(afd); } diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c index 693516bd..0859b4a4 100644 --- a/src/platform/windows/win_thread.c +++ b/src/platform/windows/win_thread.c @@ -148,24 +148,16 @@ nni_plat_init(int (*helper)(void)) AcquireSRWLockExclusive(&lock); if (!inited) { - WSADATA data; - WORD ver; - ver = MAKEWORD(2, 2); - if (WSAStartup(MAKEWORD(2, 2), &data) != 0) { - if ((LOBYTE(data.wVersion) != 2) || - (HIBYTE(data.wVersion) != 2)) { - nni_panic("got back wrong winsock ver"); - } - rv = NNG_EINVAL; + if ((rv = nni_win_iocp_sysinit()) != 0) { goto out; } - if ((rv = nni_win_iocp_sysinit()) != 0) { + if ((rv = nni_win_ipc_sysinit()) != 0) { goto out; } - if ((rv = nni_win_resolv_sysinit()) != 0) { + if ((rv = nni_win_tcp_sysinit()) != 0) { goto out; } - if ((rv = nni_win_ipc_sysinit()) != 0) { + if ((rv = nni_win_resolv_sysinit()) != 0) { goto out; } @@ -182,8 +174,9 @@ out: void nni_plat_fini(void) { - nni_win_ipc_sysfini(); nni_win_resolv_sysfini(); + nni_win_ipc_sysfini(); + nni_win_tcp_sysfini(); nni_win_iocp_sysfini(); WSACleanup(); } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index e60a1b1d..875eb71a 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -567,15 +567,28 @@ nni_tcp_ep_close(void *arg) { nni_tcp_ep *ep = arg; + nni_mtx_lock(&ep->mtx); + ep->closed = 1; nni_plat_tcp_ep_close(ep->tep); + nni_mtx_unlock(&ep->mtx); } static int nni_tcp_ep_bind(void *arg) { nni_tcp_ep *ep = arg; + int rv; + + nni_mtx_lock(&ep->mtx); + if (ep->closed) { + nni_mtx_unlock(&ep->mtx); + return (NNG_ECLOSED); + } - return (nni_plat_tcp_ep_listen(ep->tep)); + rv = nni_plat_tcp_ep_listen(ep->tep); + nni_mtx_unlock(&ep->mtx); + + return (rv); } static void @@ -638,6 +651,10 @@ nni_tcp_ep_accept(void *arg, nni_aio *aio) int rv; nni_mtx_lock(&ep->mtx); + if (ep->closed) { + nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_mtx_unlock(&ep->mtx); + } NNI_ASSERT(ep->user_aio == NULL); ep->user_aio = aio; @@ -659,6 +676,10 @@ nni_tcp_ep_connect(void *arg, nni_aio *aio) int rv; nni_mtx_lock(&ep->mtx); + if (ep->closed) { + nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_mtx_unlock(&ep->mtx); + } NNI_ASSERT(ep->user_aio == NULL); ep->user_aio = aio; |
