diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-31 11:21:49 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2016-12-31 11:21:49 -0800 |
| commit | a432e040d620e7d1315018ea6717f81e61463608 (patch) | |
| tree | d24059611fdd99cd065f8e689ff4b6e79cbcda5c | |
| parent | c142c01a110be7d4677d3bfff4777be5e0bb1cf5 (diff) | |
| download | nng-a432e040d620e7d1315018ea6717f81e61463608.tar.gz nng-a432e040d620e7d1315018ea6717f81e61463608.tar.bz2 nng-a432e040d620e7d1315018ea6717f81e61463608.zip | |
More directed wakeups (and hopefully resolve races) for inproc.
| -rw-r--r-- | src/transport/inproc/inproc.c | 110 |
1 files changed, 59 insertions, 51 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 5a9c0472..d15b642e 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -23,8 +23,7 @@ typedef struct nni_inproc_ep nni_inproc_ep; typedef struct { nni_mutex mx; - nni_cond cv; - nni_list eps; + nni_list servers; } nni_inproc_global; // nni_inproc_pipe represents one half of a connection. @@ -52,6 +51,8 @@ struct nni_inproc_ep { int closed; nni_list_node node; uint16_t proto; + nni_cond cv; + nni_list clients; void * cpipe; // connected pipe (DIAL only) }; @@ -71,11 +72,7 @@ nni_inproc_init(void) if ((rv = nni_mutex_init(&nni_inproc.mx)) != 0) { return (rv); } - if ((rv = nni_cond_init(&nni_inproc.cv, &nni_inproc.mx)) != 0) { - nni_mutex_fini(&nni_inproc.mx); - return (rv); - } - NNI_LIST_INIT(&nni_inproc.eps, nni_inproc_ep, node); + NNI_LIST_INIT(&nni_inproc.servers, nni_inproc_ep, node); return (0); } @@ -84,7 +81,6 @@ nni_inproc_init(void) static void nni_inproc_fini(void) { - nni_cond_fini(&nni_inproc.cv); nni_mutex_fini(&nni_inproc.mx); } @@ -188,6 +184,7 @@ static int nni_inproc_ep_create(void **epp, const char *url, uint16_t proto) { nni_inproc_ep *ep; + int rv; if (strlen(url) > NNG_MAXADDRLEN-1) { return (NNG_EINVAL); @@ -195,11 +192,17 @@ nni_inproc_ep_create(void **epp, const char *url, uint16_t proto) if ((ep = nni_alloc(sizeof (*ep))) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_cond_init(&ep->cv, &nni_inproc.mx)) != 0) { + nni_free(ep, sizeof (*ep)); + return (rv); + } ep->mode = NNI_INPROC_EP_IDLE; ep->closed = 0; ep->proto = proto; NNI_LIST_NODE_INIT(&ep->node); + NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node); + (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); *epp = ep; return (0); @@ -214,6 +217,7 @@ nni_inproc_ep_destroy(void *arg) if (!ep->closed) { nni_panic("inproc_ep_destroy while not closed!"); } + nni_cond_fini(&ep->cv); nni_free(ep, sizeof (*free)); } @@ -226,11 +230,21 @@ nni_inproc_ep_close(void *arg) nni_mutex_enter(&nni_inproc.mx); if (!ep->closed) { ep->closed = 1; - if ((ep->mode == NNI_INPROC_EP_LISTEN) || - (ep->mode == NNI_INPROC_EP_DIAL)) { - nni_list_remove(&nni_inproc.eps, ep); + if (ep->mode == NNI_INPROC_EP_LISTEN) { + nni_list_remove(&nni_inproc.servers, ep); + for (;;) { + // Notify our clients that we are closed. + nni_inproc_ep *client; + client = nni_list_first(&ep->clients); + if (client == NULL) { + break; + } + nni_list_remove(&ep->clients, client); + client->mode = NNI_INPROC_EP_IDLE; + nni_cond_broadcast(&client->cv); + } } - nni_cond_broadcast(&nni_inproc.cv); + nni_cond_broadcast(&ep->cv); } nni_mutex_exit(&nni_inproc.mx); } @@ -240,41 +254,44 @@ static int nni_inproc_ep_connect(void *arg, void **pipep) { nni_inproc_ep *ep = arg; - nni_inproc_ep *srch; - nni_list *list = &nni_inproc.eps; if (ep->mode != NNI_INPROC_EP_IDLE) { return (NNG_EINVAL); } nni_mutex_enter(&nni_inproc.mx); - NNI_LIST_FOREACH (list, srch) { - if (srch->mode != NNI_INPROC_EP_LISTEN) { - continue; - } - if (strcmp(srch->addr, ep->addr) == 0) { - break; - } - } - if (srch == NULL) { - // No listeners available. - nni_mutex_exit(&nni_inproc.mx); - return (NNG_ECONNREFUSED); - } - ep->mode = NNI_INPROC_EP_DIAL; - nni_list_append(list, ep); - nni_cond_broadcast(&nni_inproc.cv); for (;;) { + nni_inproc_ep *server; + nni_list *list; + if (ep->closed) { - // Closer will have removed us from list. nni_mutex_exit(&nni_inproc.mx); return (NNG_ECLOSED); } if (ep->cpipe != NULL) { break; } - nni_cond_wait(&nni_inproc.cv); + // Find a server. + NNI_LIST_FOREACH (&nni_inproc.servers, server) { + if (server->mode != NNI_INPROC_EP_LISTEN) { + continue; + } + if (strcmp(server->addr, ep->addr) == 0) { + break; + } + } + if (server == NULL) { + nni_mutex_exit(&nni_inproc.mx); + return (NNG_ECONNREFUSED); + } + ep->mode = NNI_INPROC_EP_DIAL; + nni_list_append(&server->clients, ep); + nni_cond_broadcast(&server->cv); + nni_cond_wait(&ep->cv); + if (ep->mode == NNI_INPROC_EP_DIAL) { + ep->mode = NNI_INPROC_EP_IDLE; + nni_list_remove(&server->clients, ep); + } } - // NB: The acceptor or closer removes us from the list. *pipep = ep->cpipe; nni_mutex_exit(&nni_inproc.mx); return (ep->closed ? NNG_ECLOSED : 0); @@ -286,7 +303,7 @@ nni_inproc_ep_bind(void *arg) { nni_inproc_ep *ep = arg; nni_inproc_ep *srch; - nni_list *list = &nni_inproc.eps; + nni_list *list = &nni_inproc.servers; if (ep->mode != NNI_INPROC_EP_IDLE) { return (NNG_EINVAL); @@ -316,9 +333,8 @@ static int nni_inproc_ep_accept(void *arg, void **pipep) { nni_inproc_ep *ep = arg; - nni_inproc_ep *srch; + nni_inproc_ep *client; nni_inproc_pair *pair; - nni_list *list = &nni_inproc.eps; int rv; if (ep->mode != NNI_INPROC_EP_LISTEN) { @@ -348,33 +364,25 @@ nni_inproc_ep_accept(void *arg, void **pipep) nni_inproc_pair_destroy(pair); return (NNG_ECLOSED); } - NNI_LIST_FOREACH (list, srch) { - if (srch->mode != NNI_INPROC_EP_DIAL) { - continue; - } - if (strcmp(srch->addr, ep->addr) == 0) { - break; - } - } - if (srch != NULL) { + if ((client = nni_list_first(&ep->clients)) != NULL) { break; } - nni_cond_wait(&nni_inproc.cv); + nni_cond_wait(&ep->cv); } - nni_list_remove(&nni_inproc.eps, srch); - srch->mode = NNI_INPROC_EP_IDLE; + nni_list_remove(&ep->clients, client); + client->mode = NNI_INPROC_EP_IDLE; (void) snprintf(pair->addr, sizeof (pair->addr), "%s", ep->addr); pair->pipe[0].rq = pair->pipe[1].wq = pair->q[0]; pair->pipe[1].rq = pair->pipe[0].wq = pair->q[1]; pair->pipe[0].pair = pair->pipe[1].pair = pair; pair->pipe[0].addr = pair->pipe[1].addr = pair->addr; - pair->pipe[1].peer = srch->proto; + pair->pipe[1].peer = client->proto; pair->pipe[0].peer = ep->proto; pair->refcnt = 2; - srch->cpipe = &pair->pipe[0]; + client->cpipe = &pair->pipe[0]; *pipep = &pair->pipe[1]; - nni_cond_broadcast(&nni_inproc.cv); + nni_cond_broadcast(&client->cv); nni_mutex_exit(&nni_inproc.mx); |
