diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-07 00:08:24 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-07 00:08:24 -0700 |
| commit | 3730260da3744b549aaa1fe13946a674f924f63c (patch) | |
| tree | 902866876ee71246a299370cbe8f6580d758525c /src/transport/inproc | |
| parent | 3b19940dfcd5d3585b1fb1dcf7915a748ae67289 (diff) | |
| download | nng-3730260da3744b549aaa1fe13946a674f924f63c.tar.gz nng-3730260da3744b549aaa1fe13946a674f924f63c.tar.bz2 nng-3730260da3744b549aaa1fe13946a674f924f63c.zip | |
TCP asynchronous working now.
It turns out that I had to fix a number of subtle asynchronous
handling bugs, but now TCP is fully asynchronous. We need to
change the high-level dial and listen interfaces to be async
as well.
Some of the transport APIs have changed here, and I've elected
to change what we expose to consumers as endpoints into seperate
dialers and listeners. Under the hood they are the same, but
it turns out that its helpful to know the intended use of the
endpoint at initialization time.
Scalability still occasionally hangs on Linux. Investigation
pending.
Diffstat (limited to 'src/transport/inproc')
| -rw-r--r-- | src/transport/inproc/inproc.c | 38 |
1 files changed, 25 insertions, 13 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 52ed582a..0489ea38 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -49,6 +49,7 @@ struct nni_inproc_ep { char addr[NNG_MAXADDRLEN+1]; int mode; int closed; + int started; nni_list_node node; uint16_t proto; nni_cv cv; @@ -217,7 +218,7 @@ nni_inproc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) static int -nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock) +nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock, int mode) { nni_inproc_ep *ep; int rv; @@ -229,8 +230,9 @@ nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock) return (NNG_ENOMEM); } - ep->mode = NNI_INPROC_EP_IDLE; + ep->mode = mode; ep->closed = 0; + ep->started = 0; ep->proto = nni_sock_proto(sock); NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node); nni_aio_list_init(&ep->aios); @@ -365,7 +367,6 @@ nni_inproc_accept_clients(nni_inproc_ep *server) pair->pipes[1]->peer = pair->pipes[0]->proto; pair->pipes[0]->peer = pair->pipes[1]->proto; pair->refcnt = 2; - client->mode = NNI_INPROC_EP_IDLE; // XXX: ?? nni_inproc_conn_finish(caio, 0); nni_inproc_conn_finish(saio, 0); @@ -390,12 +391,16 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) nni_inproc_ep *server; int rv; - if (ep->mode != NNI_INPROC_EP_IDLE) { + if (ep->mode != NNI_EP_MODE_DIAL) { nni_aio_finish(aio, NNG_EINVAL, 0); return; } + if (ep->started) { + nni_aio_finish(aio, NNG_EBUSY, 0); + return; + } if (nni_list_active(&ep->clients, ep)) { - nni_aio_finish(aio, NNG_EINVAL, 0); + nni_aio_finish(aio, NNG_EBUSY, 0); return; } @@ -421,7 +426,8 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) // Find a server. NNI_LIST_FOREACH (&nni_inproc.servers, server) { - if (server->mode != NNI_INPROC_EP_LISTEN) { + if ((server->mode != NNI_EP_MODE_LISTEN) || + (server->started == 0)) { continue; } if (strcmp(server->addr, ep->addr) == 0) { @@ -434,7 +440,6 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) return; } - ep->mode = NNI_INPROC_EP_DIAL; nni_list_append(&server->clients, ep); nni_aio_list_append(&ep->aios, aio); @@ -450,16 +455,21 @@ nni_inproc_ep_bind(void *arg) nni_inproc_ep *srch; nni_list *list = &nni_inproc.servers; - if (ep->mode != NNI_INPROC_EP_IDLE) { + if (ep->mode != NNI_EP_MODE_LISTEN) { return (NNG_EINVAL); } nni_mtx_lock(&nni_inproc.mx); + if (ep->started) { + nni_mtx_unlock(&nni_inproc.mx); + return (NNG_EBUSY); + } if (ep->closed) { nni_mtx_unlock(&nni_inproc.mx); return (NNG_ECLOSED); } NNI_LIST_FOREACH (list, srch) { - if (srch->mode != NNI_INPROC_EP_LISTEN) { + if ((srch->mode != NNI_EP_MODE_LISTEN) || + (!srch->started)) { continue; } if (strcmp(srch->addr, ep->addr) == 0) { @@ -467,7 +477,7 @@ nni_inproc_ep_bind(void *arg) return (NNG_EADDRINUSE); } } - ep->mode = NNI_INPROC_EP_LISTEN; + ep->started = 1; nni_list_append(list, ep); nni_mtx_unlock(&nni_inproc.mx); return (0); @@ -481,10 +491,9 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) nni_inproc_pipe *pipe; int rv; - if (ep->mode != NNI_INPROC_EP_LISTEN) { + if (ep->mode != NNI_EP_MODE_LISTEN) { nni_aio_finish(aio, NNG_EINVAL, 0); } - if ((rv = nni_inproc_pipe_init(&pipe, ep)) != 0) { nni_aio_finish(aio, rv, 0); return; @@ -494,13 +503,16 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) aio->a_pipe = pipe; // We are already on the master list of servers, thanks to bind. - if (ep->closed) { // This is the only possible error path from the // time we acquired the lock. nni_inproc_conn_finish(aio, NNG_ECLOSED); return; } + if (!ep->started) { + nni_inproc_conn_finish(aio, NNG_ESTATE); + return; + } // Insert us into the pending server aios, and then run the // accept list. |
