diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-03-29 13:07:35 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-03-29 13:07:35 -0700 |
| commit | 374f93a18edca2e0656c337a5b54927169ec31fa (patch) | |
| tree | cbaef995db10cfafd795953be203de744dc688c9 /src/transport/inproc | |
| parent | 6091cf7e1c030417e1fd29c66160e71bcbe4f984 (diff) | |
| download | nng-374f93a18edca2e0656c337a5b54927169ec31fa.tar.gz nng-374f93a18edca2e0656c337a5b54927169ec31fa.tar.bz2 nng-374f93a18edca2e0656c337a5b54927169ec31fa.zip | |
TCP (POSIX) async send/recv working. Other changes.
Transport-level pipe initialization is now sepearate and explicit.
The POSIX send/recv logic still uses threads under the hood, but
makes use of the AIO framework for send/recv. This is a key stepping
stone towards enabling poll() or similar async I/O approaches.
Diffstat (limited to 'src/transport/inproc')
| -rw-r--r-- | src/transport/inproc/inproc.c | 162 |
1 files changed, 96 insertions, 66 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index de9b5e91..c6f908f8 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -38,22 +38,22 @@ struct nni_inproc_pipe { // nni_inproc_pair represents a pair of pipes. Because we control both // sides of the pipes, we can allocate and free this in one structure. struct nni_inproc_pair { - nni_mtx mx; - int refcnt; - nni_msgq * q[2]; - nni_inproc_pipe pipe[2]; - char addr[NNG_MAXADDRLEN+1]; + nni_mtx mx; + int refcnt; + nni_msgq * q[2]; + nni_inproc_pipe * pipes[2]; + char addr[NNG_MAXADDRLEN+1]; }; struct nni_inproc_ep { - char addr[NNG_MAXADDRLEN+1]; - int mode; - int closed; - nni_list_node node; - uint16_t proto; - nni_cv cv; - nni_list clients; - void * cpipe; // connected pipe (DIAL only) + char addr[NNG_MAXADDRLEN+1]; + int mode; + int closed; + nni_list_node node; + uint16_t proto; + nni_cv cv; + nni_list clients; + nni_inproc_pipe * cpipe; // connected pipe (DIAL only) }; #define NNI_INPROC_EP_IDLE 0 @@ -108,23 +108,45 @@ nni_inproc_pair_destroy(nni_inproc_pair *pair) } +static int +nni_inproc_pipe_init(void **argp) +{ + nni_inproc_pipe *pipe; + + if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { + return (NNG_ENOMEM); + } + *argp = pipe; + return (0); +} + + static void -nni_inproc_pipe_destroy(void *arg) +nni_inproc_pipe_fini(void *arg) { nni_inproc_pipe *pipe = arg; - nni_inproc_pair *pair = pipe->pair; + nni_inproc_pair *pair; // We could assert the pipe closed... - // If we are the last peer, then toss the pair structure. - nni_mtx_lock(&pair->mx); - pair->refcnt--; - if (pair->refcnt == 0) { - nni_mtx_unlock(&pair->mx); - nni_inproc_pair_destroy(pair); - } else { - nni_mtx_unlock(&pair->mx); + if ((pair = pipe->pair) != NULL) { + // If we are the last peer, then toss the pair structure. + nni_mtx_lock(&pair->mx); + if (pair->pipes[0] == pipe) { + pair->pipes[0] = NULL; + } else if (pair->pipes[1] == pipe) { + pair->pipes[1] = NULL; + } + pair->refcnt--; + if (pair->refcnt == 0) { + nni_mtx_unlock(&pair->mx); + nni_inproc_pair_destroy(pair); + } else { + nni_mtx_unlock(&pair->mx); + } } + + NNI_FREE_STRUCT(pipe); } @@ -262,49 +284,54 @@ nni_inproc_ep_close(void *arg) static int -nni_inproc_ep_connect(void *arg, nni_pipe *npipe) +nni_inproc_ep_connect(void *arg, void *pipearg) { + nni_inproc_pipe *pipe = pipearg; nni_inproc_ep *ep = arg; + nni_inproc_ep *server; if (ep->mode != NNI_INPROC_EP_IDLE) { return (NNG_EINVAL); } nni_mtx_lock(&nni_inproc.mx); - for (;;) { - nni_inproc_ep *server; - if (ep->closed) { - nni_mtx_unlock(&nni_inproc.mx); - return (NNG_ECLOSED); + // Find a server. + NNI_LIST_FOREACH (&nni_inproc.servers, server) { + if (server->mode != NNI_INPROC_EP_LISTEN) { + continue; } - if (ep->cpipe != NULL) { + if (strcmp(server->addr, ep->addr) == 0) { break; } - // Find a server. - NNI_LIST_FOREACH (&nni_inproc.servers, server) { - if (server->mode != NNI_INPROC_EP_LISTEN) { - continue; - } - if (strcmp(server->addr, ep->addr) == 0) { - break; - } - } - if (server == NULL) { + } + if (server == NULL) { + nni_mtx_unlock(&nni_inproc.mx); + return (NNG_ECONNREFUSED); + } + + ep->mode = NNI_INPROC_EP_DIAL; + ep->cpipe = pipe; + nni_list_append(&server->clients, ep); + + while (ep->mode != NNI_INPROC_EP_IDLE) { + if (ep->closed) { + nni_list_remove(&server->clients, ep); nni_mtx_unlock(&nni_inproc.mx); - return (NNG_ECONNREFUSED); + return (NNG_ECLOSED); } - - ep->mode = NNI_INPROC_EP_DIAL; - nni_list_append(&server->clients, ep); nni_cv_wake(&server->cv); nni_cv_wait(&ep->cv); - if (ep->mode == NNI_INPROC_EP_DIAL) { - ep->mode = NNI_INPROC_EP_IDLE; - nni_list_remove(&server->clients, ep); - } } - nni_pipe_set_tran_data(npipe, ep->cpipe); - ep->cpipe = NULL; + + // If we got here, either we connected successfully, or the far end + // server closed on us. In the former case our cpipe will be NULL, + // having been cleared by the server. In the latter, the cpipe will + // still be set, indicating server shutdown. + if (ep->cpipe != NULL) { + nni_mtx_unlock(&nni_inproc.mx); + return (NNG_ECONNRESET); + } + nni_mtx_unlock(&nni_inproc.mx); return (0); } @@ -342,9 +369,10 @@ nni_inproc_ep_bind(void *arg) static int -nni_inproc_ep_accept(void *arg, nni_pipe *npipe) +nni_inproc_ep_accept(void *arg, void *pipearg) { nni_inproc_ep *ep = arg; + nni_inproc_pipe *pipe = pipearg; nni_inproc_ep *client; nni_inproc_pair *pair; int rv; @@ -383,17 +411,18 @@ nni_inproc_ep_accept(void *arg, nni_pipe *npipe) } nni_list_remove(&ep->clients, client); - client->mode = NNI_INPROC_EP_IDLE; + pair->pipes[0] = client->cpipe; + pair->pipes[1] = pipe; (void) snprintf(pair->addr, sizeof (pair->addr), "%s", ep->addr); - pair->pipe[0].rq = pair->pipe[1].wq = pair->q[0]; - pair->pipe[1].rq = pair->pipe[0].wq = pair->q[1]; - pair->pipe[0].pair = pair->pipe[1].pair = pair; - pair->pipe[0].addr = pair->pipe[1].addr = pair->addr; - pair->pipe[1].peer = client->proto; - pair->pipe[0].peer = ep->proto; + pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0]; + pair->pipes[1]->rq = pair->pipes[0]->wq = pair->q[1]; + pair->pipes[0]->pair = pair->pipes[1]->pair = pair; + pair->pipes[0]->addr = pair->pipes[1]->addr = pair->addr; + pair->pipes[1]->peer = client->proto; + pair->pipes[0]->peer = ep->proto; pair->refcnt = 2; - client->cpipe = &pair->pipe[0]; - nni_pipe_set_tran_data(npipe, &pair->pipe[1]); + client->mode = NNI_INPROC_EP_IDLE; + client->cpipe = NULL; nni_cv_wake(&client->cv); nni_mtx_unlock(&nni_inproc.mx); @@ -403,12 +432,13 @@ nni_inproc_ep_accept(void *arg, nni_pipe *npipe) static nni_tran_pipe nni_inproc_pipe_ops = { - .pipe_destroy = nni_inproc_pipe_destroy, - .pipe_aio_send = nni_inproc_pipe_aio_send, - .pipe_aio_recv = nni_inproc_pipe_aio_recv, - .pipe_close = nni_inproc_pipe_close, - .pipe_peer = nni_inproc_pipe_peer, - .pipe_getopt = nni_inproc_pipe_getopt, + .p_init = nni_inproc_pipe_init, + .p_fini = nni_inproc_pipe_fini, + .p_aio_send = nni_inproc_pipe_aio_send, + .p_aio_recv = nni_inproc_pipe_aio_recv, + .p_close = nni_inproc_pipe_close, + .p_peer = nni_inproc_pipe_peer, + .p_getopt = nni_inproc_pipe_getopt, }; static nni_tran_ep nni_inproc_ep_ops = { |
