diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-07 00:08:24 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-07 00:08:24 -0700 |
| commit | 3730260da3744b549aaa1fe13946a674f924f63c (patch) | |
| tree | 902866876ee71246a299370cbe8f6580d758525c /src/transport | |
| parent | 3b19940dfcd5d3585b1fb1dcf7915a748ae67289 (diff) | |
| download | nng-3730260da3744b549aaa1fe13946a674f924f63c.tar.gz nng-3730260da3744b549aaa1fe13946a674f924f63c.tar.bz2 nng-3730260da3744b549aaa1fe13946a674f924f63c.zip | |
TCP asynchronous working now.
It turns out that I had to fix a number of subtle asynchronous
handling bugs, but now TCP is fully asynchronous. We need to
change the high-level dial and listen interfaces to be async
as well.
Some of the transport APIs have changed here, and I've elected
to change what we expose to consumers as endpoints into seperate
dialers and listeners. Under the hood they are the same, but
it turns out that its helpful to know the intended use of the
endpoint at initialization time.
Scalability still occasionally hangs on Linux. Investigation
pending.
Diffstat (limited to 'src/transport')
| -rw-r--r-- | src/transport/inproc/inproc.c | 38 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 4 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 336 |
3 files changed, 259 insertions, 119 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 52ed582a..0489ea38 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -49,6 +49,7 @@ struct nni_inproc_ep { char addr[NNG_MAXADDRLEN+1]; int mode; int closed; + int started; nni_list_node node; uint16_t proto; nni_cv cv; @@ -217,7 +218,7 @@ nni_inproc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) static int -nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock) +nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock, int mode) { nni_inproc_ep *ep; int rv; @@ -229,8 +230,9 @@ nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock) return (NNG_ENOMEM); } - ep->mode = NNI_INPROC_EP_IDLE; + ep->mode = mode; ep->closed = 0; + ep->started = 0; ep->proto = nni_sock_proto(sock); NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node); nni_aio_list_init(&ep->aios); @@ -365,7 +367,6 @@ nni_inproc_accept_clients(nni_inproc_ep *server) pair->pipes[1]->peer = pair->pipes[0]->proto; pair->pipes[0]->peer = pair->pipes[1]->proto; pair->refcnt = 2; - client->mode = NNI_INPROC_EP_IDLE; // XXX: ?? nni_inproc_conn_finish(caio, 0); nni_inproc_conn_finish(saio, 0); @@ -390,12 +391,16 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) nni_inproc_ep *server; int rv; - if (ep->mode != NNI_INPROC_EP_IDLE) { + if (ep->mode != NNI_EP_MODE_DIAL) { nni_aio_finish(aio, NNG_EINVAL, 0); return; } + if (ep->started) { + nni_aio_finish(aio, NNG_EBUSY, 0); + return; + } if (nni_list_active(&ep->clients, ep)) { - nni_aio_finish(aio, NNG_EINVAL, 0); + nni_aio_finish(aio, NNG_EBUSY, 0); return; } @@ -421,7 +426,8 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) // Find a server. NNI_LIST_FOREACH (&nni_inproc.servers, server) { - if (server->mode != NNI_INPROC_EP_LISTEN) { + if ((server->mode != NNI_EP_MODE_LISTEN) || + (server->started == 0)) { continue; } if (strcmp(server->addr, ep->addr) == 0) { @@ -434,7 +440,6 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) return; } - ep->mode = NNI_INPROC_EP_DIAL; nni_list_append(&server->clients, ep); nni_aio_list_append(&ep->aios, aio); @@ -450,16 +455,21 @@ nni_inproc_ep_bind(void *arg) nni_inproc_ep *srch; nni_list *list = &nni_inproc.servers; - if (ep->mode != NNI_INPROC_EP_IDLE) { + if (ep->mode != NNI_EP_MODE_LISTEN) { return (NNG_EINVAL); } nni_mtx_lock(&nni_inproc.mx); + if (ep->started) { + nni_mtx_unlock(&nni_inproc.mx); + return (NNG_EBUSY); + } if (ep->closed) { nni_mtx_unlock(&nni_inproc.mx); return (NNG_ECLOSED); } NNI_LIST_FOREACH (list, srch) { - if (srch->mode != NNI_INPROC_EP_LISTEN) { + if ((srch->mode != NNI_EP_MODE_LISTEN) || + (!srch->started)) { continue; } if (strcmp(srch->addr, ep->addr) == 0) { @@ -467,7 +477,7 @@ nni_inproc_ep_bind(void *arg) return (NNG_EADDRINUSE); } } - ep->mode = NNI_INPROC_EP_LISTEN; + ep->started = 1; nni_list_append(list, ep); nni_mtx_unlock(&nni_inproc.mx); return (0); @@ -481,10 +491,9 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) nni_inproc_pipe *pipe; int rv; - if (ep->mode != NNI_INPROC_EP_LISTEN) { + if (ep->mode != NNI_EP_MODE_LISTEN) { nni_aio_finish(aio, NNG_EINVAL, 0); } - if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) { nni_aio_finish(aio, rv, 0); return; @@ -494,13 +503,16 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) aio->a_pipe = pipe; // We are already on the master list of servers, thanks to bind. - if (ep->closed) { // This is the only possible error path from the // time we acquired the lock. nni_inproc_conn_finish(aio, NNG_ECLOSED); return; } + if (!ep->started) { + nni_inproc_conn_finish(aio, NNG_ESTATE); + return; + } // Insert us into the pending server aios, and then run the // accept list. diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index d9ddeb2d..fcfe44ea 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -493,7 +493,7 @@ nni_ipc_ep_fini(void *arg) static int -nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock) +nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock, int mode) { nni_ipc_ep *ep; int rv; @@ -508,7 +508,7 @@ nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock) } if (((rv = nni_mtx_init(&ep->mtx)) != 0) || ((rv = nni_aio_init(&ep->aio, nni_ipc_ep_cb, ep)) != 0) || - ((rv = nni_plat_ipc_ep_init(&ep->iep, url)) != 0)) { + ((rv = nni_plat_ipc_ep_init(&ep->iep, url, mode)) != 0)) { nni_ipc_ep_fini(ep); return (rv); } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index a43763e2..237b0c07 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -22,7 +22,7 @@ typedef struct nni_tcp_ep nni_tcp_ep; // nni_tcp_pipe is one end of a TCP connection. struct nni_tcp_pipe { const char * addr; - nni_plat_tcpsock * tsp; + nni_plat_tcp_pipe * tpp; uint16_t peer; uint16_t proto; size_t rcvmax; @@ -46,17 +46,21 @@ struct nni_tcp_pipe { struct nni_tcp_ep { char addr[NNG_MAXADDRLEN+1]; - nni_plat_tcpsock * tsp; + nni_plat_tcp_ep * tep; int closed; uint16_t proto; size_t rcvmax; int ipv4only; + nni_aio aio; + nni_aio * user_aio; + nni_mtx mtx; }; static void nni_tcp_pipe_send_cb(void *); static void nni_tcp_pipe_recv_cb(void *); static void nni_tcp_pipe_nego_cb(void *); +static void nni_tcp_ep_cb(void *arg); static int nni_tcp_tran_init(void) @@ -76,7 +80,7 @@ nni_tcp_pipe_close(void *arg) { nni_tcp_pipe *pipe = arg; - nni_plat_tcp_shutdown(pipe->tsp); + nni_plat_tcp_pipe_close(pipe->tpp); } @@ -88,8 +92,8 @@ nni_tcp_pipe_fini(void *arg) nni_aio_fini(&pipe->rxaio); nni_aio_fini(&pipe->txaio); nni_aio_fini(&pipe->negaio); - if (pipe->tsp != NULL) { - nni_plat_tcp_fini(pipe->tsp); + if (pipe->tpp != NULL) { + nni_plat_tcp_pipe_fini(pipe->tpp); } if (pipe->rxmsg) { nni_msg_free(pipe->rxmsg); @@ -100,7 +104,7 @@ nni_tcp_pipe_fini(void *arg) static int -nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep) +nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp) { nni_tcp_pipe *pipe; int rv; @@ -111,9 +115,6 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep) if ((rv = nni_mtx_init(&pipe->mtx)) != 0) { goto fail; } - if ((rv = nni_plat_tcp_init(&pipe->tsp)) != 0) { - goto fail; - } rv = nni_aio_init(&pipe->txaio, nni_tcp_pipe_send_cb, pipe); if (rv != 0) { goto fail; @@ -128,6 +129,9 @@ nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep) } pipe->proto = ep->proto; pipe->rcvmax = ep->rcvmax; + pipe->tpp = tpp; + pipe->addr = ep->addr; + *pipep = pipe; return (0); @@ -175,7 +179,7 @@ nni_tcp_pipe_nego_cb(void *arg) aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead; aio->a_iov[0].iov_buf = &pipe->txlen[pipe->gottxhead]; // send it down... - nni_plat_tcp_aio_send(pipe->tsp, aio); + nni_plat_tcp_pipe_send(pipe->tpp, aio); nni_mtx_unlock(&pipe->mtx); return; } @@ -183,7 +187,7 @@ nni_tcp_pipe_nego_cb(void *arg) aio->a_niov = 1; aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead; aio->a_iov[0].iov_buf = &pipe->rxlen[pipe->gotrxhead]; - nni_plat_tcp_aio_recv(pipe->tsp, aio); + nni_plat_tcp_pipe_recv(pipe->tpp, aio); nni_mtx_unlock(&pipe->mtx); return; } @@ -296,7 +300,7 @@ nni_tcp_pipe_recv_cb(void *arg) pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg); pipe->rxaio.a_niov = 1; - nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio); + nni_plat_tcp_pipe_recv(pipe->tpp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); return; } @@ -353,7 +357,7 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg); pipe->txaio.a_niov = 3; - nni_plat_tcp_aio_send(pipe->tsp, &pipe->txaio); + nni_plat_tcp_pipe_send(pipe->tpp, &pipe->txaio); nni_mtx_unlock(&pipe->mtx); } @@ -392,7 +396,7 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio) pipe->rxaio.a_iov[0].iov_len = sizeof (pipe->rxlen); pipe->rxaio.a_niov = 1; - nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio); + nni_plat_tcp_pipe_recv(pipe->tpp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); } @@ -431,59 +435,10 @@ nni_tcp_pipe_getopt(void *arg, int option, void *buf, size_t *szp) static int -nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock) -{ - nni_tcp_ep *ep; - int rv; - - if (strlen(url) > NNG_MAXADDRLEN-1) { - return (NNG_EADDRINVAL); - } - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); - } - ep->closed = 0; - ep->proto = nni_sock_proto(sock); - ep->ipv4only = 0; // XXX: FIXME - ep->rcvmax = nni_sock_rcvmaxsz(sock); - - if ((rv = nni_plat_tcp_init(&ep->tsp)) != 0) { - NNI_FREE_STRUCT(ep); - return (rv); - } - - (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); - - *epp = ep; - return (0); -} - - -static void -nni_tcp_ep_fini(void *arg) +nni_tcp_parse_pair(char *pair, char **hostp, char **servp) { - nni_tcp_ep *ep = arg; - - nni_plat_tcp_fini(ep->tsp); - NNI_FREE_STRUCT(ep); -} - - -static void -nni_tcp_ep_close(void *arg) -{ - nni_tcp_ep *ep = arg; - - nni_plat_tcp_shutdown(ep->tsp); -} - - -static int -nni_parseaddr(char *pair, char **hostp, uint16_t *portp) -{ - char *host, *port, *end; + char *host, *serv, *end; char c; - int val; if (pair[0] == '[') { host = pair+1; @@ -492,40 +447,73 @@ nni_parseaddr(char *pair, char **hostp, uint16_t *portp) return (NNG_EADDRINVAL); } *end = '\0'; - port = end + 1; - if (*port == ':') { - port++; - } else if (port != '\0') { + serv = end + 1; + if (*serv == ':') { + serv++; + } else if (serv != '\0') { return (NNG_EADDRINVAL); } } else { host = pair; - port = strchr(host, ':'); - if (port != NULL) { - *port = '\0'; - port++; + serv = strchr(host, ':'); + if (serv != NULL) { + *serv = '\0'; + serv++; } } - val = 0; - while ((c = *port) != '\0') { - val *= 10; - if ((c >= '0') && (c <= '9')) { - val += (c - '0'); + if (hostp != NULL) { + if ((strlen(host) == 0) || (strcmp(host, "*") == 0)) { + *hostp = NULL; } else { - return (NNG_EADDRINVAL); + *hostp = host; } - if (val > 65535) { - return (NNG_EADDRINVAL); + } + if (servp != NULL) { + if (strlen(serv) == 0) { + *servp = NULL; + } else { + *servp = serv; } - port++; } - if ((strlen(host) == 0) || (strcmp(host, "*") == 0)) { - *hostp = NULL; + // Stash the port in big endian (network) byte order. + return (0); +} + + +// Note that the url *must* be in a modifiable buffer. +int +nni_tcp_parse_url(char *url, char **host1, char **serv1, char **host2, + char **serv2) +{ + char *h1; + int rv; + + if (strncmp(url, "tcp://", strlen("tcp://")) != 0) { + return (NNG_EADDRINVAL); + } + url += strlen("tcp://"); + if ((h1 = strchr(url, ';')) != 0) { + // For these we want the second part first, because + // the "primary" address is the remote address, and the + // "secondary" is the local (bind) address. This is only + // used for dial side. + *h1 = '\0'; + h1++; + if (((rv = nni_tcp_parse_pair(h1, host1, serv1)) != 0) || + ((rv = nni_tcp_parse_pair(url, host2, serv2)) != 0)) { + return (rv); + } } else { - *hostp = host; + if (host2 != NULL) { + *host2 = NULL; + } + if (serv2 != NULL) { + *serv2 = NULL; + } + if ((rv = nni_tcp_parse_pair(url, host1, serv1)) != 0) { + return (rv); + } } - // Stash the port in big endian (network) byte order. - NNI_PUT16((uint8_t *) portp, val); return (0); } @@ -556,11 +544,12 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio) nni_mtx_unlock(&pipe->mtx); return; } - nni_plat_tcp_aio_send(pipe->tsp, &pipe->negaio); + nni_plat_tcp_pipe_send(pipe->tpp, &pipe->negaio); nni_mtx_unlock(&pipe->mtx); } +#if 0 static int nni_tcp_ep_connect_sync(void *arg, void **pipep) { @@ -659,23 +648,162 @@ nni_tcp_ep_bind(void *arg) } +#endif + +static void +nni_tcp_ep_fini(void *arg) +{ + nni_tcp_ep *ep = arg; + + if (ep->tep != NULL) { + nni_plat_tcp_ep_fini(ep->tep); + } + nni_aio_fini(&ep->aio); + nni_mtx_fini(&ep->mtx); + NNI_FREE_STRUCT(ep); +} + + static int -nni_tcp_ep_accept_sync(void *arg, void **pipep) +nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode) +{ + nni_tcp_ep *ep; + int rv; + + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + if (((rv = nni_mtx_init(&ep->mtx)) != 0) || + ((rv = nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep)) != 0) || + ((rv = nni_plat_tcp_ep_init(&ep->tep, url, mode)) != 0)) { + nni_tcp_ep_fini(ep); + return (rv); + } + ep->closed = 0; + ep->proto = nni_sock_proto(sock); + ep->rcvmax = nni_sock_rcvmaxsz(sock); + (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); + + *epp = ep; + return (0); +} + + +static void +nni_tcp_ep_close(void *arg) { nni_tcp_ep *ep = arg; + + nni_plat_tcp_ep_close(ep->tep); +} + + +static int +nni_tcp_ep_bind(void *arg) +{ + nni_tcp_ep *ep = arg; + + return (nni_plat_tcp_ep_listen(ep->tep)); +} + + +static void +nni_tcp_ep_finish(nni_tcp_ep *ep) +{ + nni_aio *aio = ep->user_aio; nni_tcp_pipe *pipe; int rv; - if ((rv = nni_tcp_pipe_init(&pipe, ep)) != 0) { - return (rv); + if ((aio = ep->user_aio) == NULL) { + return; + } + ep->user_aio = NULL; + if ((rv = nni_aio_result(&ep->aio)) != 0) { + goto done; } + NNI_ASSERT(ep->aio.a_pipe != NULL); - if ((rv = nni_plat_tcp_accept(pipe->tsp, ep->tsp)) != 0) { - nni_tcp_pipe_fini(pipe); - return (rv); + // Attempt to allocate the parent pipe. If this fails we'll + // drop the connection (ENOMEM probably). + if ((rv = nni_tcp_pipe_init(&pipe, ep, ep->aio.a_pipe)) != 0) { + nni_plat_tcp_pipe_fini(ep->aio.a_pipe); + goto done; } - *pipep = pipe; - return (0); + + aio->a_pipe = pipe; + +done: + ep->aio.a_pipe = NULL; + nni_aio_finish(aio, rv, 0); +} + + +static void +nni_tcp_ep_cb(void *arg) +{ + nni_tcp_ep *ep = arg; + + nni_mtx_lock(&ep->mtx); + nni_tcp_ep_finish(ep); + nni_mtx_unlock(&ep->mtx); +} + + +static void +nni_tcp_cancel_ep(nni_aio *aio) +{ + nni_tcp_ep *ep = aio->a_prov_data; + + nni_mtx_lock(&ep->mtx); + if (ep->user_aio == aio) { + ep->user_aio = NULL; + } + nni_aio_stop(&ep->aio); + nni_mtx_unlock(&ep->mtx); +} + + +static void +nni_tcp_ep_accept(void *arg, nni_aio *aio) +{ + nni_tcp_ep *ep = arg; + int rv; + + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + ep->user_aio = aio; + + // If we can't start, then its dying and we can't report either, + if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) { + ep->user_aio = NULL; + nni_mtx_unlock(&ep->mtx); + return; + } + + nni_plat_tcp_ep_accept(ep->tep, &ep->aio); + nni_mtx_unlock(&ep->mtx); +} + + +static void +nni_tcp_ep_connect(void *arg, nni_aio *aio) +{ + nni_tcp_ep *ep = arg; + int rv; + + nni_mtx_lock(&ep->mtx); + NNI_ASSERT(ep->user_aio == NULL); + ep->user_aio = aio; + + // If we can't start, then its dying and we can't report either, + if ((rv = nni_aio_start(aio, nni_tcp_cancel_ep, ep)) != 0) { + ep->user_aio = NULL; + nni_mtx_unlock(&ep->mtx); + return; + } + + nni_plat_tcp_ep_connect(ep->tep, &ep->aio); + nni_mtx_unlock(&ep->mtx); } @@ -690,14 +818,14 @@ static nni_tran_pipe nni_tcp_pipe_ops = { }; static nni_tran_ep nni_tcp_ep_ops = { - .ep_init = nni_tcp_ep_init, - .ep_fini = nni_tcp_ep_fini, - .ep_connect_sync = nni_tcp_ep_connect_sync, - .ep_bind = nni_tcp_ep_bind, - .ep_accept_sync = nni_tcp_ep_accept_sync, - .ep_close = nni_tcp_ep_close, - .ep_setopt = NULL, - .ep_getopt = NULL, + .ep_init = nni_tcp_ep_init, + .ep_fini = nni_tcp_ep_fini, + .ep_connect = nni_tcp_ep_connect, + .ep_bind = nni_tcp_ep_bind, + .ep_accept = nni_tcp_ep_accept, + .ep_close = nni_tcp_ep_close, + .ep_setopt = NULL, + .ep_getopt = NULL, }; // This is the TCP transport linkage, and should be the only global |
