aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/transport/inproc/inproc.c110
1 files changed, 59 insertions, 51 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 5a9c0472..d15b642e 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -23,8 +23,7 @@ typedef struct nni_inproc_ep nni_inproc_ep;
typedef struct {
nni_mutex mx;
- nni_cond cv;
- nni_list eps;
+ nni_list servers;
} nni_inproc_global;
// nni_inproc_pipe represents one half of a connection.
@@ -52,6 +51,8 @@ struct nni_inproc_ep {
int closed;
nni_list_node node;
uint16_t proto;
+ nni_cond cv;
+ nni_list clients;
void * cpipe; // connected pipe (DIAL only)
};
@@ -71,11 +72,7 @@ nni_inproc_init(void)
if ((rv = nni_mutex_init(&nni_inproc.mx)) != 0) {
return (rv);
}
- if ((rv = nni_cond_init(&nni_inproc.cv, &nni_inproc.mx)) != 0) {
- nni_mutex_fini(&nni_inproc.mx);
- return (rv);
- }
- NNI_LIST_INIT(&nni_inproc.eps, nni_inproc_ep, node);
+ NNI_LIST_INIT(&nni_inproc.servers, nni_inproc_ep, node);
return (0);
}
@@ -84,7 +81,6 @@ nni_inproc_init(void)
static void
nni_inproc_fini(void)
{
- nni_cond_fini(&nni_inproc.cv);
nni_mutex_fini(&nni_inproc.mx);
}
@@ -188,6 +184,7 @@ static int
nni_inproc_ep_create(void **epp, const char *url, uint16_t proto)
{
nni_inproc_ep *ep;
+ int rv;
if (strlen(url) > NNG_MAXADDRLEN-1) {
return (NNG_EINVAL);
@@ -195,11 +192,17 @@ nni_inproc_ep_create(void **epp, const char *url, uint16_t proto)
if ((ep = nni_alloc(sizeof (*ep))) == NULL) {
return (NNG_ENOMEM);
}
+ if ((rv = nni_cond_init(&ep->cv, &nni_inproc.mx)) != 0) {
+ nni_free(ep, sizeof (*ep));
+ return (rv);
+ }
ep->mode = NNI_INPROC_EP_IDLE;
ep->closed = 0;
ep->proto = proto;
NNI_LIST_NODE_INIT(&ep->node);
+ NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node);
+
(void) snprintf(ep->addr, sizeof (ep->addr), "%s", url);
*epp = ep;
return (0);
@@ -214,6 +217,7 @@ nni_inproc_ep_destroy(void *arg)
if (!ep->closed) {
nni_panic("inproc_ep_destroy while not closed!");
}
+ nni_cond_fini(&ep->cv);
nni_free(ep, sizeof (*free));
}
@@ -226,11 +230,21 @@ nni_inproc_ep_close(void *arg)
nni_mutex_enter(&nni_inproc.mx);
if (!ep->closed) {
ep->closed = 1;
- if ((ep->mode == NNI_INPROC_EP_LISTEN) ||
- (ep->mode == NNI_INPROC_EP_DIAL)) {
- nni_list_remove(&nni_inproc.eps, ep);
+ if (ep->mode == NNI_INPROC_EP_LISTEN) {
+ nni_list_remove(&nni_inproc.servers, ep);
+ for (;;) {
+ // Notify our clients that we are closed.
+ nni_inproc_ep *client;
+ client = nni_list_first(&ep->clients);
+ if (client == NULL) {
+ break;
+ }
+ nni_list_remove(&ep->clients, client);
+ client->mode = NNI_INPROC_EP_IDLE;
+ nni_cond_broadcast(&client->cv);
+ }
}
- nni_cond_broadcast(&nni_inproc.cv);
+ nni_cond_broadcast(&ep->cv);
}
nni_mutex_exit(&nni_inproc.mx);
}
@@ -240,41 +254,44 @@ static int
nni_inproc_ep_connect(void *arg, void **pipep)
{
nni_inproc_ep *ep = arg;
- nni_inproc_ep *srch;
- nni_list *list = &nni_inproc.eps;
if (ep->mode != NNI_INPROC_EP_IDLE) {
return (NNG_EINVAL);
}
nni_mutex_enter(&nni_inproc.mx);
- NNI_LIST_FOREACH (list, srch) {
- if (srch->mode != NNI_INPROC_EP_LISTEN) {
- continue;
- }
- if (strcmp(srch->addr, ep->addr) == 0) {
- break;
- }
- }
- if (srch == NULL) {
- // No listeners available.
- nni_mutex_exit(&nni_inproc.mx);
- return (NNG_ECONNREFUSED);
- }
- ep->mode = NNI_INPROC_EP_DIAL;
- nni_list_append(list, ep);
- nni_cond_broadcast(&nni_inproc.cv);
for (;;) {
+ nni_inproc_ep *server;
+ nni_list *list;
+
if (ep->closed) {
- // Closer will have removed us from list.
nni_mutex_exit(&nni_inproc.mx);
return (NNG_ECLOSED);
}
if (ep->cpipe != NULL) {
break;
}
- nni_cond_wait(&nni_inproc.cv);
+ // Find a server.
+ NNI_LIST_FOREACH (&nni_inproc.servers, server) {
+ if (server->mode != NNI_INPROC_EP_LISTEN) {
+ continue;
+ }
+ if (strcmp(server->addr, ep->addr) == 0) {
+ break;
+ }
+ }
+ if (server == NULL) {
+ nni_mutex_exit(&nni_inproc.mx);
+ return (NNG_ECONNREFUSED);
+ }
+ ep->mode = NNI_INPROC_EP_DIAL;
+ nni_list_append(&server->clients, ep);
+ nni_cond_broadcast(&server->cv);
+ nni_cond_wait(&ep->cv);
+ if (ep->mode == NNI_INPROC_EP_DIAL) {
+ ep->mode = NNI_INPROC_EP_IDLE;
+ nni_list_remove(&server->clients, ep);
+ }
}
- // NB: The acceptor or closer removes us from the list.
*pipep = ep->cpipe;
nni_mutex_exit(&nni_inproc.mx);
return (ep->closed ? NNG_ECLOSED : 0);
@@ -286,7 +303,7 @@ nni_inproc_ep_bind(void *arg)
{
nni_inproc_ep *ep = arg;
nni_inproc_ep *srch;
- nni_list *list = &nni_inproc.eps;
+ nni_list *list = &nni_inproc.servers;
if (ep->mode != NNI_INPROC_EP_IDLE) {
return (NNG_EINVAL);
@@ -316,9 +333,8 @@ static int
nni_inproc_ep_accept(void *arg, void **pipep)
{
nni_inproc_ep *ep = arg;
- nni_inproc_ep *srch;
+ nni_inproc_ep *client;
nni_inproc_pair *pair;
- nni_list *list = &nni_inproc.eps;
int rv;
if (ep->mode != NNI_INPROC_EP_LISTEN) {
@@ -348,33 +364,25 @@ nni_inproc_ep_accept(void *arg, void **pipep)
nni_inproc_pair_destroy(pair);
return (NNG_ECLOSED);
}
- NNI_LIST_FOREACH (list, srch) {
- if (srch->mode != NNI_INPROC_EP_DIAL) {
- continue;
- }
- if (strcmp(srch->addr, ep->addr) == 0) {
- break;
- }
- }
- if (srch != NULL) {
+ if ((client = nni_list_first(&ep->clients)) != NULL) {
break;
}
- nni_cond_wait(&nni_inproc.cv);
+ nni_cond_wait(&ep->cv);
}
- nni_list_remove(&nni_inproc.eps, srch);
- srch->mode = NNI_INPROC_EP_IDLE;
+ nni_list_remove(&ep->clients, client);
+ client->mode = NNI_INPROC_EP_IDLE;
(void) snprintf(pair->addr, sizeof (pair->addr), "%s", ep->addr);
pair->pipe[0].rq = pair->pipe[1].wq = pair->q[0];
pair->pipe[1].rq = pair->pipe[0].wq = pair->q[1];
pair->pipe[0].pair = pair->pipe[1].pair = pair;
pair->pipe[0].addr = pair->pipe[1].addr = pair->addr;
- pair->pipe[1].peer = srch->proto;
+ pair->pipe[1].peer = client->proto;
pair->pipe[0].peer = ep->proto;
pair->refcnt = 2;
- srch->cpipe = &pair->pipe[0];
+ client->cpipe = &pair->pipe[0];
*pipep = &pair->pipe[1];
- nni_cond_broadcast(&nni_inproc.cv);
+ nni_cond_broadcast(&client->cv);
nni_mutex_exit(&nni_inproc.mx);