aboutsummaryrefslogtreecommitdiff
path: root/src/platform
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform')
-rw-r--r--src/platform/windows/win_tcplisten.c254
1 files changed, 154 insertions, 100 deletions
diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c
index 084c30bd..021483d4 100644
--- a/src/platform/windows/win_tcplisten.c
+++ b/src/platform/windows/win_tcplisten.c
@@ -24,11 +24,15 @@ struct nni_tcp_listener {
bool started;
bool nodelay; // initial value for child conns
bool keepalive; // initial value for child conns
+ bool running;
LPFN_ACCEPTEX acceptex;
LPFN_GETACCEPTEXSOCKADDRS getacceptexsockaddrs;
SOCKADDR_STORAGE ss;
nni_mtx mtx;
nni_reap_node reap;
+ nni_win_io accept_io;
+ int accept_rv;
+ nni_tcp_conn *pend_conn;
};
// tcp_listener_funcs looks up function pointers we need for advanced accept
@@ -78,62 +82,42 @@ tcp_listener_funcs(nni_tcp_listener *l)
return (0);
}
+static void tcp_listener_accepted(nni_tcp_listener *l);
+static void tcp_listener_doaccept(nni_tcp_listener *l);
+
static void
tcp_accept_cb(nni_win_io *io, int rv, size_t cnt)
{
- nni_tcp_conn *c = io->ptr;
- nni_tcp_listener *l = c->listener;
+ nni_tcp_listener *l = io->ptr;
nni_aio *aio;
- int len1;
- int len2;
- SOCKADDR *sa1;
- SOCKADDR *sa2;
- BOOL nd;
- BOOL ka;
+ nni_tcp_conn *c;
NNI_ARG_UNUSED(cnt);
nni_mtx_lock(&l->mtx);
- if ((aio = c->conn_aio) == NULL) {
- // This case should not occur. The situation would indicate
- // a case where the connection was accepted already.
- NNI_ASSERT(false);
- nni_mtx_unlock(&l->mtx);
- return;
- }
- c->conn_aio = NULL;
- nni_aio_set_prov_data(aio, NULL);
- nni_aio_list_remove(aio);
- if (c->conn_rv != 0) {
- rv = c->conn_rv;
- }
- nd = l->nodelay ? TRUE : FALSE;
- ka = l->keepalive ? TRUE : FALSE;
- nni_mtx_unlock(&l->mtx);
- if (rv != 0) {
- nng_stream_free(&c->ops);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- len1 = (int) sizeof(c->sockname);
- len2 = (int) sizeof(c->peername);
- l->getacceptexsockaddrs(c->buf, 0, 256, 256, &sa1, &len1, &sa2, &len2);
- memcpy(&c->sockname, sa1, len1);
- memcpy(&c->peername, sa2, len2);
-
- (void) setsockopt(c->s, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
- (char *) &l->s, sizeof(l->s));
-
- (void) setsockopt(
- c->s, SOL_SOCKET, SO_KEEPALIVE, (char *) &ka, sizeof(ka));
-
- (void) setsockopt(
- c->s, IPPROTO_TCP, TCP_NODELAY, (char *) &nd, sizeof(nd));
+ l->running = false;
+ if ((rv == 0) && (!l->closed)) {
+ tcp_listener_accepted(l);
+ } else {
+ if (l->accept_rv != 0) {
+ rv = l->accept_rv;
+ l->accept_rv = 0;
+ } else if (l->closed) {
+ rv = NNG_ECLOSED;
+ }
+ if ((aio = nni_list_first(&l->aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
- nni_aio_set_output(aio, 0, c);
- nni_aio_finish(aio, 0, 0);
+ if ((c = l->pend_conn) != NULL) {
+ l->pend_conn = NULL;
+ nng_stream_free(&c->ops);
+ }
+ }
+ tcp_listener_doaccept(l);
+ nni_mtx_unlock(&l->mtx);
}
int
@@ -148,6 +132,8 @@ nni_tcp_listener_init(nni_tcp_listener **lp)
ZeroMemory(l, sizeof(*l));
nni_mtx_init(&l->mtx);
nni_aio_list_init(&l->aios);
+ nni_win_io_init(&l->accept_io, tcp_accept_cb, l);
+ l->accept_rv = 0;
if ((rv = tcp_listener_funcs(l)) != 0) {
nni_tcp_listener_fini(l);
return (rv);
@@ -165,19 +151,22 @@ nni_tcp_listener_init(nni_tcp_listener **lp)
void
nni_tcp_listener_close(nni_tcp_listener *l)
{
+ nni_aio *aio;
+ nni_tcp_conn *conn;
nni_mtx_lock(&l->mtx);
if (!l->closed) {
- nni_aio *aio;
l->closed = true;
+ if (!nni_list_empty(&l->aios)) {
+ CancelIoEx((HANDLE) l->s, &l->accept_io.olpd);
+ }
closesocket(l->s);
-
- NNI_LIST_FOREACH (&l->aios, aio) {
- nni_tcp_conn *c;
-
- if ((c = nni_aio_get_prov_data(aio)) != NULL) {
- c->conn_rv = NNG_ECLOSED;
- CancelIoEx((HANDLE) c->s, &c->conn_io.olpd);
- }
+ if ((conn = l->pend_conn) != NULL) {
+ l->pend_conn = NULL;
+ nng_stream_free(&conn->ops);
+ }
+ while ((aio = nni_list_first(&l->aios)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
}
}
nni_mtx_unlock(&l->mtx);
@@ -193,7 +182,7 @@ nni_tcp_listener_fini(nni_tcp_listener *l)
{
nni_tcp_listener_close(l);
nni_mtx_lock(&l->mtx);
- if (!nni_list_empty(&l->aios)) {
+ if (l->running) {
nni_mtx_unlock(&l->mtx);
nni_reap(&tcp_listener_reap_list, l);
return;
@@ -271,77 +260,142 @@ static void
tcp_accept_cancel(nni_aio *aio, void *arg, int rv)
{
nni_tcp_listener *l = arg;
- nni_tcp_conn *c;
nni_mtx_lock(&l->mtx);
- if ((c = nni_aio_get_prov_data(aio)) != NULL) {
- if (c->conn_rv == 0) {
- c->conn_rv = rv;
+ if (aio == nni_list_first(&l->aios)) {
+ l->accept_rv = rv;
+ CancelIoEx((HANDLE) l->s, &l->accept_io.olpd);
+ } else {
+ nni_aio *srch;
+ NNI_LIST_FOREACH (&l->aios, srch) {
+ if (srch == aio) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ break;
+ }
}
- CancelIoEx((HANDLE) c->s, &c->conn_io.olpd);
}
nni_mtx_unlock(&l->mtx);
}
-void
-nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
+static void
+tcp_listener_accepted(nni_tcp_listener *l)
{
+ int len1;
+ int len2;
+ SOCKADDR *sa1;
+ SOCKADDR *sa2;
+ BOOL nd;
+ BOOL ka;
+ nni_tcp_conn *c;
+ nni_aio *aio;
+
+ aio = nni_list_first(&l->aios);
+ c = l->pend_conn;
+ l->pend_conn = NULL;
+ len1 = (int) sizeof(c->sockname);
+ len2 = (int) sizeof(c->peername);
+ ka = l->keepalive;
+ nd = l->nodelay;
+
+ nni_aio_list_remove(aio);
+ l->getacceptexsockaddrs(c->buf, 0, 256, 256, &sa1, &len1, &sa2, &len2);
+ memcpy(&c->sockname, sa1, len1);
+ memcpy(&c->peername, sa2, len2);
+
+ (void) setsockopt(c->s, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
+ (char *) &l->s, sizeof(l->s));
+
+ (void) setsockopt(
+ c->s, SOL_SOCKET, SO_KEEPALIVE, (char *) &ka, sizeof(ka));
+
+ (void) setsockopt(
+ c->s, IPPROTO_TCP, TCP_NODELAY, (char *) &nd, sizeof(nd));
+
+ nni_aio_set_output(aio, 0, c);
+ nni_aio_finish(aio, 0, 0);
+}
+
+static void
+tcp_listener_doaccept(nni_tcp_listener *l)
+{
+ nni_aio *aio;
SOCKET s;
+ nni_tcp_conn *c;
int rv;
DWORD cnt;
- nni_tcp_conn *c;
+
+ while ((aio = nni_list_first(&l->aios)) != NULL) {
+ // Windows requires us to explicitly create the socket
+ // before calling accept on it.
+ if (l->closed) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ continue;
+ }
+ if ((s = socket(l->ss.ss_family, SOCK_STREAM, 0)) ==
+ INVALID_SOCKET) {
+ nni_aio_list_remove(aio);
+ rv = nni_win_error(GetLastError());
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+
+ if ((rv = nni_win_tcp_init(&c, s)) != 0) {
+ nni_aio_list_remove(aio);
+ closesocket(s);
+ nni_aio_finish_error(aio, rv);
+ continue;
+ }
+ c->listener = l;
+ l->pend_conn = c;
+ if (l->acceptex(l->s, s, c->buf, 0, 256, 256, &cnt,
+ &l->accept_io.olpd)) {
+ // completed synchronously
+ tcp_listener_accepted(l);
+ continue;
+ }
+
+ if ((rv = GetLastError()) == ERROR_IO_PENDING) {
+ // deferred (will be handled in callback)
+ l->running = true;
+ return;
+ }
+
+ // Fast failure (synchronous.)
+ nni_aio_list_remove(aio);
+ nng_stream_free(&c->ops);
+ nni_aio_finish_error(aio, rv);
+ }
+ l->running = false;
+}
+
+void
+nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
+{
+ int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
+
nni_mtx_lock(&l->mtx);
if (!l->started) {
nni_mtx_unlock(&l->mtx);
nni_aio_finish_error(aio, NNG_ESTATE);
return;
}
- if (l->closed) {
- nni_mtx_unlock(&l->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
-
- // Windows requires us to explicitly create the socket before
- // calling accept on it.
- if ((s = socket(l->ss.ss_family, SOCK_STREAM, 0)) == INVALID_SOCKET) {
- rv = nni_win_error(GetLastError());
- nni_mtx_unlock(&l->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
- if ((rv = nni_win_tcp_init(&c, s)) != 0) {
- nni_mtx_unlock(&l->mtx);
- closesocket(s);
- nni_aio_finish_error(aio, rv);
- return;
- }
- c->listener = l;
- c->conn_aio = aio;
- nni_aio_set_prov_data(aio, c);
- nni_win_io_init(&c->conn_io, tcp_accept_cb, c);
if ((rv = nni_aio_schedule(aio, tcp_accept_cancel, l)) != 0) {
- nni_aio_set_prov_data(aio, NULL);
nni_mtx_unlock(&l->mtx);
- nng_stream_free(&c->ops);
nni_aio_finish_error(aio, rv);
return;
}
- nni_list_append(&l->aios, aio);
- if ((!l->acceptex(
- l->s, s, c->buf, 0, 256, 256, &cnt, &c->conn_io.olpd)) &&
- ((rv = GetLastError()) != ERROR_IO_PENDING)) {
- // Fast failure (synchronous.)
- nni_aio_list_remove(aio);
- nni_mtx_unlock(&l->mtx);
- nng_stream_free(&c->ops);
- nni_aio_finish_error(aio, rv);
- return;
+
+ nni_aio_list_append(&l->aios, aio);
+
+ if (aio == nni_list_first(&l->aios)) {
+ tcp_listener_doaccept(l);
}
nni_mtx_unlock(&l->mtx);
}