diff options
Diffstat (limited to 'src/platform/windows')
| -rw-r--r-- | src/platform/windows/win_tcplisten.c | 254 |
1 files changed, 154 insertions, 100 deletions
diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c index 084c30bd..021483d4 100644 --- a/src/platform/windows/win_tcplisten.c +++ b/src/platform/windows/win_tcplisten.c @@ -24,11 +24,15 @@ struct nni_tcp_listener { bool started; bool nodelay; // initial value for child conns bool keepalive; // initial value for child conns + bool running; LPFN_ACCEPTEX acceptex; LPFN_GETACCEPTEXSOCKADDRS getacceptexsockaddrs; SOCKADDR_STORAGE ss; nni_mtx mtx; nni_reap_node reap; + nni_win_io accept_io; + int accept_rv; + nni_tcp_conn *pend_conn; }; // tcp_listener_funcs looks up function pointers we need for advanced accept @@ -78,62 +82,42 @@ tcp_listener_funcs(nni_tcp_listener *l) return (0); } +static void tcp_listener_accepted(nni_tcp_listener *l); +static void tcp_listener_doaccept(nni_tcp_listener *l); + static void tcp_accept_cb(nni_win_io *io, int rv, size_t cnt) { - nni_tcp_conn *c = io->ptr; - nni_tcp_listener *l = c->listener; + nni_tcp_listener *l = io->ptr; nni_aio *aio; - int len1; - int len2; - SOCKADDR *sa1; - SOCKADDR *sa2; - BOOL nd; - BOOL ka; + nni_tcp_conn *c; NNI_ARG_UNUSED(cnt); nni_mtx_lock(&l->mtx); - if ((aio = c->conn_aio) == NULL) { - // This case should not occur. The situation would indicate - // a case where the connection was accepted already. - NNI_ASSERT(false); - nni_mtx_unlock(&l->mtx); - return; - } - c->conn_aio = NULL; - nni_aio_set_prov_data(aio, NULL); - nni_aio_list_remove(aio); - if (c->conn_rv != 0) { - rv = c->conn_rv; - } - nd = l->nodelay ? TRUE : FALSE; - ka = l->keepalive ? TRUE : FALSE; - nni_mtx_unlock(&l->mtx); - if (rv != 0) { - nng_stream_free(&c->ops); - nni_aio_finish_error(aio, rv); - return; - } - - len1 = (int) sizeof(c->sockname); - len2 = (int) sizeof(c->peername); - l->getacceptexsockaddrs(c->buf, 0, 256, 256, &sa1, &len1, &sa2, &len2); - memcpy(&c->sockname, sa1, len1); - memcpy(&c->peername, sa2, len2); - - (void) setsockopt(c->s, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, - (char *) &l->s, sizeof(l->s)); - - (void) setsockopt( - c->s, SOL_SOCKET, SO_KEEPALIVE, (char *) &ka, sizeof(ka)); - - (void) setsockopt( - c->s, IPPROTO_TCP, TCP_NODELAY, (char *) &nd, sizeof(nd)); + l->running = false; + if ((rv == 0) && (!l->closed)) { + tcp_listener_accepted(l); + } else { + if (l->accept_rv != 0) { + rv = l->accept_rv; + l->accept_rv = 0; + } else if (l->closed) { + rv = NNG_ECLOSED; + } + if ((aio = nni_list_first(&l->aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } - nni_aio_set_output(aio, 0, c); - nni_aio_finish(aio, 0, 0); + if ((c = l->pend_conn) != NULL) { + l->pend_conn = NULL; + nng_stream_free(&c->ops); + } + } + tcp_listener_doaccept(l); + nni_mtx_unlock(&l->mtx); } int @@ -148,6 +132,8 @@ nni_tcp_listener_init(nni_tcp_listener **lp) ZeroMemory(l, sizeof(*l)); nni_mtx_init(&l->mtx); nni_aio_list_init(&l->aios); + nni_win_io_init(&l->accept_io, tcp_accept_cb, l); + l->accept_rv = 0; if ((rv = tcp_listener_funcs(l)) != 0) { nni_tcp_listener_fini(l); return (rv); @@ -165,19 +151,22 @@ nni_tcp_listener_init(nni_tcp_listener **lp) void nni_tcp_listener_close(nni_tcp_listener *l) { + nni_aio *aio; + nni_tcp_conn *conn; nni_mtx_lock(&l->mtx); if (!l->closed) { - nni_aio *aio; l->closed = true; + if (!nni_list_empty(&l->aios)) { + CancelIoEx((HANDLE) l->s, &l->accept_io.olpd); + } closesocket(l->s); - - NNI_LIST_FOREACH (&l->aios, aio) { - nni_tcp_conn *c; - - if ((c = nni_aio_get_prov_data(aio)) != NULL) { - c->conn_rv = NNG_ECLOSED; - CancelIoEx((HANDLE) c->s, &c->conn_io.olpd); - } + if ((conn = l->pend_conn) != NULL) { + l->pend_conn = NULL; + nng_stream_free(&conn->ops); + } + while ((aio = nni_list_first(&l->aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); } } nni_mtx_unlock(&l->mtx); @@ -193,7 +182,7 @@ nni_tcp_listener_fini(nni_tcp_listener *l) { nni_tcp_listener_close(l); nni_mtx_lock(&l->mtx); - if (!nni_list_empty(&l->aios)) { + if (l->running) { nni_mtx_unlock(&l->mtx); nni_reap(&tcp_listener_reap_list, l); return; @@ -271,77 +260,142 @@ static void tcp_accept_cancel(nni_aio *aio, void *arg, int rv) { nni_tcp_listener *l = arg; - nni_tcp_conn *c; nni_mtx_lock(&l->mtx); - if ((c = nni_aio_get_prov_data(aio)) != NULL) { - if (c->conn_rv == 0) { - c->conn_rv = rv; + if (aio == nni_list_first(&l->aios)) { + l->accept_rv = rv; + CancelIoEx((HANDLE) l->s, &l->accept_io.olpd); + } else { + nni_aio *srch; + NNI_LIST_FOREACH (&l->aios, srch) { + if (srch == aio) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + break; + } } - CancelIoEx((HANDLE) c->s, &c->conn_io.olpd); } nni_mtx_unlock(&l->mtx); } -void -nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio) +static void +tcp_listener_accepted(nni_tcp_listener *l) { + int len1; + int len2; + SOCKADDR *sa1; + SOCKADDR *sa2; + BOOL nd; + BOOL ka; + nni_tcp_conn *c; + nni_aio *aio; + + aio = nni_list_first(&l->aios); + c = l->pend_conn; + l->pend_conn = NULL; + len1 = (int) sizeof(c->sockname); + len2 = (int) sizeof(c->peername); + ka = l->keepalive; + nd = l->nodelay; + + nni_aio_list_remove(aio); + l->getacceptexsockaddrs(c->buf, 0, 256, 256, &sa1, &len1, &sa2, &len2); + memcpy(&c->sockname, sa1, len1); + memcpy(&c->peername, sa2, len2); + + (void) setsockopt(c->s, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, + (char *) &l->s, sizeof(l->s)); + + (void) setsockopt( + c->s, SOL_SOCKET, SO_KEEPALIVE, (char *) &ka, sizeof(ka)); + + (void) setsockopt( + c->s, IPPROTO_TCP, TCP_NODELAY, (char *) &nd, sizeof(nd)); + + nni_aio_set_output(aio, 0, c); + nni_aio_finish(aio, 0, 0); +} + +static void +tcp_listener_doaccept(nni_tcp_listener *l) +{ + nni_aio *aio; SOCKET s; + nni_tcp_conn *c; int rv; DWORD cnt; - nni_tcp_conn *c; + + while ((aio = nni_list_first(&l->aios)) != NULL) { + // Windows requires us to explicitly create the socket + // before calling accept on it. + if (l->closed) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + continue; + } + if ((s = socket(l->ss.ss_family, SOCK_STREAM, 0)) == + INVALID_SOCKET) { + nni_aio_list_remove(aio); + rv = nni_win_error(GetLastError()); + nni_aio_finish_error(aio, rv); + continue; + } + + if ((rv = nni_win_tcp_init(&c, s)) != 0) { + nni_aio_list_remove(aio); + closesocket(s); + nni_aio_finish_error(aio, rv); + continue; + } + c->listener = l; + l->pend_conn = c; + if (l->acceptex(l->s, s, c->buf, 0, 256, 256, &cnt, + &l->accept_io.olpd)) { + // completed synchronously + tcp_listener_accepted(l); + continue; + } + + if ((rv = GetLastError()) == ERROR_IO_PENDING) { + // deferred (will be handled in callback) + l->running = true; + return; + } + + // Fast failure (synchronous.) + nni_aio_list_remove(aio); + nng_stream_free(&c->ops); + nni_aio_finish_error(aio, rv); + } + l->running = false; +} + +void +nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio) +{ + int rv; if (nni_aio_begin(aio) != 0) { return; } + nni_mtx_lock(&l->mtx); if (!l->started) { nni_mtx_unlock(&l->mtx); nni_aio_finish_error(aio, NNG_ESTATE); return; } - if (l->closed) { - nni_mtx_unlock(&l->mtx); - nni_aio_finish_error(aio, NNG_ECLOSED); - return; - } - - // Windows requires us to explicitly create the socket before - // calling accept on it. - if ((s = socket(l->ss.ss_family, SOCK_STREAM, 0)) == INVALID_SOCKET) { - rv = nni_win_error(GetLastError()); - nni_mtx_unlock(&l->mtx); - nni_aio_finish_error(aio, rv); - return; - } - if ((rv = nni_win_tcp_init(&c, s)) != 0) { - nni_mtx_unlock(&l->mtx); - closesocket(s); - nni_aio_finish_error(aio, rv); - return; - } - c->listener = l; - c->conn_aio = aio; - nni_aio_set_prov_data(aio, c); - nni_win_io_init(&c->conn_io, tcp_accept_cb, c); if ((rv = nni_aio_schedule(aio, tcp_accept_cancel, l)) != 0) { - nni_aio_set_prov_data(aio, NULL); nni_mtx_unlock(&l->mtx); - nng_stream_free(&c->ops); nni_aio_finish_error(aio, rv); return; } - nni_list_append(&l->aios, aio); - if ((!l->acceptex( - l->s, s, c->buf, 0, 256, 256, &cnt, &c->conn_io.olpd)) && - ((rv = GetLastError()) != ERROR_IO_PENDING)) { - // Fast failure (synchronous.) - nni_aio_list_remove(aio); - nni_mtx_unlock(&l->mtx); - nng_stream_free(&c->ops); - nni_aio_finish_error(aio, rv); - return; + + nni_aio_list_append(&l->aios, aio); + + if (aio == nni_list_first(&l->aios)) { + tcp_listener_doaccept(l); } nni_mtx_unlock(&l->mtx); } |
