diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-22 13:22:18 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2016-12-22 13:22:18 -0800 |
| commit | ee969ad99dc1e07e1c38876223e7aed13463b121 (patch) | |
| tree | 40bde325d041532661e7dcc3441185aca7701e53 /src/transport | |
| parent | 6c1325a2b17548a4249d26a846bc32b95b7d747d (diff) | |
| download | nng-ee969ad99dc1e07e1c38876223e7aed13463b121.tar.gz nng-ee969ad99dc1e07e1c38876223e7aed13463b121.tar.bz2 nng-ee969ad99dc1e07e1c38876223e7aed13463b121.zip | |
Synchronization enhancements - inproc & msgqueue. Absolute waits...
Diffstat (limited to 'src/transport')
| -rw-r--r-- | src/transport/inproc/inproc.c | 103 |
1 files changed, 53 insertions, 50 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 029825b1..cb79d65a 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -22,8 +22,8 @@ typedef struct nni_inproc_pipe nni_inproc_pipe; typedef struct nni_inproc_ep nni_inproc_ep; typedef struct { - nni_mutex_t mx; - nni_cond_t cv; + nni_mutex mx; + nni_cond cv; nni_list_t eps; } nni_inproc_global; @@ -31,17 +31,17 @@ typedef struct { struct nni_inproc_pipe { const char * addr; nni_inproc_pair * pair; - nni_msgqueue_t rq; - nni_msgqueue_t wq; + nni_msgqueue * rq; + nni_msgqueue * wq; uint16_t peer; }; // nni_inproc_pair represents a pair of pipes. Because we control both // sides of the pipes, we can allocate and free this in one structure. struct nni_inproc_pair { - nni_mutex_t mx; + nni_mutex mx; int refcnt; - nni_msgqueue_t q[2]; + nni_msgqueue * q[2]; nni_inproc_pipe pipe[2]; char addr[NNG_MAXADDRLEN+1]; }; @@ -68,11 +68,11 @@ nni_inproc_init(void) { int rv; - if ((rv = nni_mutex_create(&nni_inproc.mx)) != 0) { + if ((rv = nni_mutex_init(&nni_inproc.mx)) != 0) { return (rv); } - if ((rv = nni_cond_create(&nni_inproc.cv, nni_inproc.mx)) != 0) { - nni_mutex_destroy(nni_inproc.mx); + if ((rv = nni_cond_init(&nni_inproc.cv, &nni_inproc.mx)) != 0) { + nni_mutex_fini(&nni_inproc.mx); return (rv); } NNI_LIST_INIT(&nni_inproc.eps, nni_inproc_ep, node); @@ -84,8 +84,8 @@ nni_inproc_init(void) static void nni_inproc_fini(void) { - nni_cond_destroy(nni_inproc.cv); - nni_mutex_destroy(nni_inproc.mx); + nni_cond_fini(&nni_inproc.cv); + nni_mutex_fini(&nni_inproc.mx); } @@ -104,18 +104,13 @@ nni_inproc_pipe_close(void *arg) static void nni_inproc_pair_destroy(nni_inproc_pair *pair) { - if (pair == NULL) { - return; - } if (pair->q[0]) { nni_msgqueue_destroy(pair->q[0]); } if (pair->q[1]) { nni_msgqueue_destroy(pair->q[1]); } - if (pair->mx) { - nni_mutex_destroy(pair->mx); - } + nni_mutex_fini(&pair->mx); nni_free(pair, sizeof (*pair)); } @@ -129,19 +124,19 @@ nni_inproc_pipe_destroy(void *arg) // We could assert the pipe closed... // If we are the last peer, then toss the pair structure. - nni_mutex_enter(pair->mx); + nni_mutex_enter(&pair->mx); pair->refcnt--; if (pair->refcnt == 0) { - nni_mutex_exit(pair->mx); + nni_mutex_exit(&pair->mx); nni_inproc_pair_destroy(pair); } else { - nni_mutex_exit(pair->mx); + nni_mutex_exit(&pair->mx); } } static int -nni_inproc_pipe_send(void *arg, nng_msg_t msg) +nni_inproc_pipe_send(void *arg, nni_msg *msg) { nni_inproc_pipe *pipe = arg; @@ -152,7 +147,7 @@ nni_inproc_pipe_send(void *arg, nng_msg_t msg) static int -nni_inproc_pipe_recv(void *arg, nng_msg_t *msgp) +nni_inproc_pipe_recv(void *arg, nni_msg **msgp) { nni_inproc_pipe *pipe = arg; @@ -230,13 +225,13 @@ nni_inproc_ep_close(void *arg) { nni_inproc_ep *ep = arg; - nni_mutex_enter(nni_inproc.mx); + nni_mutex_enter(&nni_inproc.mx); if (!ep->closed) { ep->closed = 1; nni_list_remove(&nni_inproc.eps, ep); - nni_cond_broadcast(nni_inproc.cv); + nni_cond_broadcast(&nni_inproc.cv); } - nni_mutex_exit(nni_inproc.mx); + nni_mutex_exit(&nni_inproc.mx); } @@ -250,7 +245,7 @@ nni_inproc_ep_dial(void *arg, void **pipep) if (ep->mode != NNI_INPROC_EP_IDLE) { return (NNG_EINVAL); } - nni_mutex_enter(nni_inproc.mx); + nni_mutex_enter(&nni_inproc.mx); NNI_LIST_FOREACH (list, srch) { if (srch->mode != NNI_INPROC_EP_LISTEN) { continue; @@ -261,27 +256,27 @@ nni_inproc_ep_dial(void *arg, void **pipep) } if (srch == NULL) { // No listeners available. - nni_mutex_exit(nni_inproc.mx); + nni_mutex_exit(&nni_inproc.mx); return (NNG_ECONNREFUSED); } ep->mode = NNI_INPROC_EP_DIAL; nni_list_append(list, ep); - nni_cond_broadcast(nni_inproc.cv); + nni_cond_broadcast(&nni_inproc.cv); for (;;) { if (ep->closed) { // Closer will have removed us from list. - nni_mutex_exit(nni_inproc.mx); + nni_mutex_exit(&nni_inproc.mx); return (NNG_ECLOSED); } if (ep->cpipe != NULL) { break; } - nni_cond_wait(nni_inproc.cv); + nni_cond_wait(&nni_inproc.cv); } // NB: The acceptor or closer removes us from the list. ep->mode = NNI_INPROC_EP_IDLE; *pipep = ep->cpipe; - nni_mutex_exit(nni_inproc.mx); + nni_mutex_exit(&nni_inproc.mx); return (ep->closed ? NNG_ECLOSED : 0); } @@ -296,9 +291,9 @@ nni_inproc_ep_listen(void *arg) if (ep->mode != NNI_INPROC_EP_IDLE) { return (NNG_EINVAL); } - nni_mutex_enter(nni_inproc.mx); + nni_mutex_enter(&nni_inproc.mx); if (ep->closed) { - nni_mutex_exit(nni_inproc.mx); + nni_mutex_exit(&nni_inproc.mx); return (NNG_ECLOSED); } NNI_LIST_FOREACH (list, srch) { @@ -306,13 +301,13 @@ nni_inproc_ep_listen(void *arg) continue; } if (strcmp(srch->addr, ep->addr) == 0) { - nni_mutex_exit(nni_inproc.mx); + nni_mutex_exit(&nni_inproc.mx); return (NNG_EADDRINUSE); } } ep->mode = NNI_INPROC_EP_LISTEN; nni_list_append(list, ep); - nni_mutex_exit(nni_inproc.mx); + nni_mutex_exit(&nni_inproc.mx); return (0); } @@ -326,14 +321,31 @@ nni_inproc_ep_accept(void *arg, void **pipep) nni_list_t *list = &nni_inproc.eps; int rv; - nni_mutex_enter(nni_inproc.mx); if (ep->mode != NNI_INPROC_EP_LISTEN) { - nni_mutex_exit(nni_inproc.mx); return (NNG_EINVAL); } + + // Preallocate the pair, so we don't do it while holding a lock + if ((pair = nni_alloc(sizeof (*pair))) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_mutex_init(&pair->mx)) != 0) { + nni_free(pair, sizeof (*pair)); + return (rv); + } + if (((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0) || + ((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0)) { + nni_inproc_pair_destroy(pair); + return (rv); + } + + nni_mutex_enter(&nni_inproc.mx); for (;;) { if (ep->closed) { - nni_mutex_exit(nni_inproc.mx); + // This is the only possible error path from the + // time we acquired the lock. + nni_mutex_exit(&nni_inproc.mx); + nni_inproc_pair_destroy(pair); return (NNG_ECLOSED); } NNI_LIST_FOREACH (list, srch) { @@ -347,16 +359,7 @@ nni_inproc_ep_accept(void *arg, void **pipep) if (srch != NULL) { break; } - nni_cond_wait(nni_inproc.cv); - } - if ((pair = nni_alloc(sizeof (*pair))) == NULL) { - nni_mutex_exit(nni_inproc.mx); - return (NNG_ENOMEM); - } - if (((rv = nni_mutex_create(&pair->mx)) != 0) || - ((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0) || - ((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0)) { - nni_inproc_pair_destroy(pair); + nni_cond_wait(&nni_inproc.cv); } (void) snprintf(pair->addr, sizeof (pair->addr), "%s", ep->addr); pair->pipe[0].rq = pair->pipe[1].wq = pair->q[0]; @@ -368,9 +371,9 @@ nni_inproc_ep_accept(void *arg, void **pipep) pair->refcnt = 2; srch->cpipe = &pair->pipe[0]; *pipep = &pair->pipe[1]; - nni_cond_broadcast(nni_inproc.cv); + nni_cond_broadcast(&nni_inproc.cv); - nni_mutex_exit(nni_inproc.mx); + nni_mutex_exit(&nni_inproc.mx); return (0); } |
