aboutsummaryrefslogtreecommitdiff
path: root/src/transport/inproc
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-03-29 13:07:35 -0700
committerGarrett D'Amore <garrett@damore.org>2017-03-29 13:07:35 -0700
commit374f93a18edca2e0656c337a5b54927169ec31fa (patch)
treecbaef995db10cfafd795953be203de744dc688c9 /src/transport/inproc
parent6091cf7e1c030417e1fd29c66160e71bcbe4f984 (diff)
downloadnng-374f93a18edca2e0656c337a5b54927169ec31fa.tar.gz
nng-374f93a18edca2e0656c337a5b54927169ec31fa.tar.bz2
nng-374f93a18edca2e0656c337a5b54927169ec31fa.zip
TCP (POSIX) async send/recv working. Other changes.
Transport-level pipe initialization is now sepearate and explicit. The POSIX send/recv logic still uses threads under the hood, but makes use of the AIO framework for send/recv. This is a key stepping stone towards enabling poll() or similar async I/O approaches.
Diffstat (limited to 'src/transport/inproc')
-rw-r--r--src/transport/inproc/inproc.c162
1 files changed, 96 insertions, 66 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index de9b5e91..c6f908f8 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -38,22 +38,22 @@ struct nni_inproc_pipe {
// nni_inproc_pair represents a pair of pipes. Because we control both
// sides of the pipes, we can allocate and free this in one structure.
struct nni_inproc_pair {
- nni_mtx mx;
- int refcnt;
- nni_msgq * q[2];
- nni_inproc_pipe pipe[2];
- char addr[NNG_MAXADDRLEN+1];
+ nni_mtx mx;
+ int refcnt;
+ nni_msgq * q[2];
+ nni_inproc_pipe * pipes[2];
+ char addr[NNG_MAXADDRLEN+1];
};
struct nni_inproc_ep {
- char addr[NNG_MAXADDRLEN+1];
- int mode;
- int closed;
- nni_list_node node;
- uint16_t proto;
- nni_cv cv;
- nni_list clients;
- void * cpipe; // connected pipe (DIAL only)
+ char addr[NNG_MAXADDRLEN+1];
+ int mode;
+ int closed;
+ nni_list_node node;
+ uint16_t proto;
+ nni_cv cv;
+ nni_list clients;
+ nni_inproc_pipe * cpipe; // connected pipe (DIAL only)
};
#define NNI_INPROC_EP_IDLE 0
@@ -108,23 +108,45 @@ nni_inproc_pair_destroy(nni_inproc_pair *pair)
}
+static int
+nni_inproc_pipe_init(void **argp)
+{
+ nni_inproc_pipe *pipe;
+
+ if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ *argp = pipe;
+ return (0);
+}
+
+
static void
-nni_inproc_pipe_destroy(void *arg)
+nni_inproc_pipe_fini(void *arg)
{
nni_inproc_pipe *pipe = arg;
- nni_inproc_pair *pair = pipe->pair;
+ nni_inproc_pair *pair;
// We could assert the pipe closed...
- // If we are the last peer, then toss the pair structure.
- nni_mtx_lock(&pair->mx);
- pair->refcnt--;
- if (pair->refcnt == 0) {
- nni_mtx_unlock(&pair->mx);
- nni_inproc_pair_destroy(pair);
- } else {
- nni_mtx_unlock(&pair->mx);
+ if ((pair = pipe->pair) != NULL) {
+ // If we are the last peer, then toss the pair structure.
+ nni_mtx_lock(&pair->mx);
+ if (pair->pipes[0] == pipe) {
+ pair->pipes[0] = NULL;
+ } else if (pair->pipes[1] == pipe) {
+ pair->pipes[1] = NULL;
+ }
+ pair->refcnt--;
+ if (pair->refcnt == 0) {
+ nni_mtx_unlock(&pair->mx);
+ nni_inproc_pair_destroy(pair);
+ } else {
+ nni_mtx_unlock(&pair->mx);
+ }
}
+
+ NNI_FREE_STRUCT(pipe);
}
@@ -262,49 +284,54 @@ nni_inproc_ep_close(void *arg)
static int
-nni_inproc_ep_connect(void *arg, nni_pipe *npipe)
+nni_inproc_ep_connect(void *arg, void *pipearg)
{
+ nni_inproc_pipe *pipe = pipearg;
nni_inproc_ep *ep = arg;
+ nni_inproc_ep *server;
if (ep->mode != NNI_INPROC_EP_IDLE) {
return (NNG_EINVAL);
}
nni_mtx_lock(&nni_inproc.mx);
- for (;;) {
- nni_inproc_ep *server;
- if (ep->closed) {
- nni_mtx_unlock(&nni_inproc.mx);
- return (NNG_ECLOSED);
+ // Find a server.
+ NNI_LIST_FOREACH (&nni_inproc.servers, server) {
+ if (server->mode != NNI_INPROC_EP_LISTEN) {
+ continue;
}
- if (ep->cpipe != NULL) {
+ if (strcmp(server->addr, ep->addr) == 0) {
break;
}
- // Find a server.
- NNI_LIST_FOREACH (&nni_inproc.servers, server) {
- if (server->mode != NNI_INPROC_EP_LISTEN) {
- continue;
- }
- if (strcmp(server->addr, ep->addr) == 0) {
- break;
- }
- }
- if (server == NULL) {
+ }
+ if (server == NULL) {
+ nni_mtx_unlock(&nni_inproc.mx);
+ return (NNG_ECONNREFUSED);
+ }
+
+ ep->mode = NNI_INPROC_EP_DIAL;
+ ep->cpipe = pipe;
+ nni_list_append(&server->clients, ep);
+
+ while (ep->mode != NNI_INPROC_EP_IDLE) {
+ if (ep->closed) {
+ nni_list_remove(&server->clients, ep);
nni_mtx_unlock(&nni_inproc.mx);
- return (NNG_ECONNREFUSED);
+ return (NNG_ECLOSED);
}
-
- ep->mode = NNI_INPROC_EP_DIAL;
- nni_list_append(&server->clients, ep);
nni_cv_wake(&server->cv);
nni_cv_wait(&ep->cv);
- if (ep->mode == NNI_INPROC_EP_DIAL) {
- ep->mode = NNI_INPROC_EP_IDLE;
- nni_list_remove(&server->clients, ep);
- }
}
- nni_pipe_set_tran_data(npipe, ep->cpipe);
- ep->cpipe = NULL;
+
+ // If we got here, either we connected successfully, or the far end
+ // server closed on us. In the former case our cpipe will be NULL,
+ // having been cleared by the server. In the latter, the cpipe will
+ // still be set, indicating server shutdown.
+ if (ep->cpipe != NULL) {
+ nni_mtx_unlock(&nni_inproc.mx);
+ return (NNG_ECONNRESET);
+ }
+
nni_mtx_unlock(&nni_inproc.mx);
return (0);
}
@@ -342,9 +369,10 @@ nni_inproc_ep_bind(void *arg)
static int
-nni_inproc_ep_accept(void *arg, nni_pipe *npipe)
+nni_inproc_ep_accept(void *arg, void *pipearg)
{
nni_inproc_ep *ep = arg;
+ nni_inproc_pipe *pipe = pipearg;
nni_inproc_ep *client;
nni_inproc_pair *pair;
int rv;
@@ -383,17 +411,18 @@ nni_inproc_ep_accept(void *arg, nni_pipe *npipe)
}
nni_list_remove(&ep->clients, client);
- client->mode = NNI_INPROC_EP_IDLE;
+ pair->pipes[0] = client->cpipe;
+ pair->pipes[1] = pipe;
(void) snprintf(pair->addr, sizeof (pair->addr), "%s", ep->addr);
- pair->pipe[0].rq = pair->pipe[1].wq = pair->q[0];
- pair->pipe[1].rq = pair->pipe[0].wq = pair->q[1];
- pair->pipe[0].pair = pair->pipe[1].pair = pair;
- pair->pipe[0].addr = pair->pipe[1].addr = pair->addr;
- pair->pipe[1].peer = client->proto;
- pair->pipe[0].peer = ep->proto;
+ pair->pipes[0]->rq = pair->pipes[1]->wq = pair->q[0];
+ pair->pipes[1]->rq = pair->pipes[0]->wq = pair->q[1];
+ pair->pipes[0]->pair = pair->pipes[1]->pair = pair;
+ pair->pipes[0]->addr = pair->pipes[1]->addr = pair->addr;
+ pair->pipes[1]->peer = client->proto;
+ pair->pipes[0]->peer = ep->proto;
pair->refcnt = 2;
- client->cpipe = &pair->pipe[0];
- nni_pipe_set_tran_data(npipe, &pair->pipe[1]);
+ client->mode = NNI_INPROC_EP_IDLE;
+ client->cpipe = NULL;
nni_cv_wake(&client->cv);
nni_mtx_unlock(&nni_inproc.mx);
@@ -403,12 +432,13 @@ nni_inproc_ep_accept(void *arg, nni_pipe *npipe)
static nni_tran_pipe nni_inproc_pipe_ops = {
- .pipe_destroy = nni_inproc_pipe_destroy,
- .pipe_aio_send = nni_inproc_pipe_aio_send,
- .pipe_aio_recv = nni_inproc_pipe_aio_recv,
- .pipe_close = nni_inproc_pipe_close,
- .pipe_peer = nni_inproc_pipe_peer,
- .pipe_getopt = nni_inproc_pipe_getopt,
+ .p_init = nni_inproc_pipe_init,
+ .p_fini = nni_inproc_pipe_fini,
+ .p_aio_send = nni_inproc_pipe_aio_send,
+ .p_aio_recv = nni_inproc_pipe_aio_recv,
+ .p_close = nni_inproc_pipe_close,
+ .p_peer = nni_inproc_pipe_peer,
+ .p_getopt = nni_inproc_pipe_getopt,
};
static nni_tran_ep nni_inproc_ep_ops = {