aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-22 13:22:18 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-22 13:22:18 -0800
commitee969ad99dc1e07e1c38876223e7aed13463b121 (patch)
tree40bde325d041532661e7dcc3441185aca7701e53 /src/transport
parent6c1325a2b17548a4249d26a846bc32b95b7d747d (diff)
downloadnng-ee969ad99dc1e07e1c38876223e7aed13463b121.tar.gz
nng-ee969ad99dc1e07e1c38876223e7aed13463b121.tar.bz2
nng-ee969ad99dc1e07e1c38876223e7aed13463b121.zip
Synchronization enhancements - inproc & msgqueue. Absolute waits...
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/inproc/inproc.c103
1 files changed, 53 insertions, 50 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 029825b1..cb79d65a 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -22,8 +22,8 @@ typedef struct nni_inproc_pipe nni_inproc_pipe;
typedef struct nni_inproc_ep nni_inproc_ep;
typedef struct {
- nni_mutex_t mx;
- nni_cond_t cv;
+ nni_mutex mx;
+ nni_cond cv;
nni_list_t eps;
} nni_inproc_global;
@@ -31,17 +31,17 @@ typedef struct {
struct nni_inproc_pipe {
const char * addr;
nni_inproc_pair * pair;
- nni_msgqueue_t rq;
- nni_msgqueue_t wq;
+ nni_msgqueue * rq;
+ nni_msgqueue * wq;
uint16_t peer;
};
// nni_inproc_pair represents a pair of pipes. Because we control both
// sides of the pipes, we can allocate and free this in one structure.
struct nni_inproc_pair {
- nni_mutex_t mx;
+ nni_mutex mx;
int refcnt;
- nni_msgqueue_t q[2];
+ nni_msgqueue * q[2];
nni_inproc_pipe pipe[2];
char addr[NNG_MAXADDRLEN+1];
};
@@ -68,11 +68,11 @@ nni_inproc_init(void)
{
int rv;
- if ((rv = nni_mutex_create(&nni_inproc.mx)) != 0) {
+ if ((rv = nni_mutex_init(&nni_inproc.mx)) != 0) {
return (rv);
}
- if ((rv = nni_cond_create(&nni_inproc.cv, nni_inproc.mx)) != 0) {
- nni_mutex_destroy(nni_inproc.mx);
+ if ((rv = nni_cond_init(&nni_inproc.cv, &nni_inproc.mx)) != 0) {
+ nni_mutex_fini(&nni_inproc.mx);
return (rv);
}
NNI_LIST_INIT(&nni_inproc.eps, nni_inproc_ep, node);
@@ -84,8 +84,8 @@ nni_inproc_init(void)
static void
nni_inproc_fini(void)
{
- nni_cond_destroy(nni_inproc.cv);
- nni_mutex_destroy(nni_inproc.mx);
+ nni_cond_fini(&nni_inproc.cv);
+ nni_mutex_fini(&nni_inproc.mx);
}
@@ -104,18 +104,13 @@ nni_inproc_pipe_close(void *arg)
static void
nni_inproc_pair_destroy(nni_inproc_pair *pair)
{
- if (pair == NULL) {
- return;
- }
if (pair->q[0]) {
nni_msgqueue_destroy(pair->q[0]);
}
if (pair->q[1]) {
nni_msgqueue_destroy(pair->q[1]);
}
- if (pair->mx) {
- nni_mutex_destroy(pair->mx);
- }
+ nni_mutex_fini(&pair->mx);
nni_free(pair, sizeof (*pair));
}
@@ -129,19 +124,19 @@ nni_inproc_pipe_destroy(void *arg)
// We could assert the pipe closed...
// If we are the last peer, then toss the pair structure.
- nni_mutex_enter(pair->mx);
+ nni_mutex_enter(&pair->mx);
pair->refcnt--;
if (pair->refcnt == 0) {
- nni_mutex_exit(pair->mx);
+ nni_mutex_exit(&pair->mx);
nni_inproc_pair_destroy(pair);
} else {
- nni_mutex_exit(pair->mx);
+ nni_mutex_exit(&pair->mx);
}
}
static int
-nni_inproc_pipe_send(void *arg, nng_msg_t msg)
+nni_inproc_pipe_send(void *arg, nni_msg *msg)
{
nni_inproc_pipe *pipe = arg;
@@ -152,7 +147,7 @@ nni_inproc_pipe_send(void *arg, nng_msg_t msg)
static int
-nni_inproc_pipe_recv(void *arg, nng_msg_t *msgp)
+nni_inproc_pipe_recv(void *arg, nni_msg **msgp)
{
nni_inproc_pipe *pipe = arg;
@@ -230,13 +225,13 @@ nni_inproc_ep_close(void *arg)
{
nni_inproc_ep *ep = arg;
- nni_mutex_enter(nni_inproc.mx);
+ nni_mutex_enter(&nni_inproc.mx);
if (!ep->closed) {
ep->closed = 1;
nni_list_remove(&nni_inproc.eps, ep);
- nni_cond_broadcast(nni_inproc.cv);
+ nni_cond_broadcast(&nni_inproc.cv);
}
- nni_mutex_exit(nni_inproc.mx);
+ nni_mutex_exit(&nni_inproc.mx);
}
@@ -250,7 +245,7 @@ nni_inproc_ep_dial(void *arg, void **pipep)
if (ep->mode != NNI_INPROC_EP_IDLE) {
return (NNG_EINVAL);
}
- nni_mutex_enter(nni_inproc.mx);
+ nni_mutex_enter(&nni_inproc.mx);
NNI_LIST_FOREACH (list, srch) {
if (srch->mode != NNI_INPROC_EP_LISTEN) {
continue;
@@ -261,27 +256,27 @@ nni_inproc_ep_dial(void *arg, void **pipep)
}
if (srch == NULL) {
// No listeners available.
- nni_mutex_exit(nni_inproc.mx);
+ nni_mutex_exit(&nni_inproc.mx);
return (NNG_ECONNREFUSED);
}
ep->mode = NNI_INPROC_EP_DIAL;
nni_list_append(list, ep);
- nni_cond_broadcast(nni_inproc.cv);
+ nni_cond_broadcast(&nni_inproc.cv);
for (;;) {
if (ep->closed) {
// Closer will have removed us from list.
- nni_mutex_exit(nni_inproc.mx);
+ nni_mutex_exit(&nni_inproc.mx);
return (NNG_ECLOSED);
}
if (ep->cpipe != NULL) {
break;
}
- nni_cond_wait(nni_inproc.cv);
+ nni_cond_wait(&nni_inproc.cv);
}
// NB: The acceptor or closer removes us from the list.
ep->mode = NNI_INPROC_EP_IDLE;
*pipep = ep->cpipe;
- nni_mutex_exit(nni_inproc.mx);
+ nni_mutex_exit(&nni_inproc.mx);
return (ep->closed ? NNG_ECLOSED : 0);
}
@@ -296,9 +291,9 @@ nni_inproc_ep_listen(void *arg)
if (ep->mode != NNI_INPROC_EP_IDLE) {
return (NNG_EINVAL);
}
- nni_mutex_enter(nni_inproc.mx);
+ nni_mutex_enter(&nni_inproc.mx);
if (ep->closed) {
- nni_mutex_exit(nni_inproc.mx);
+ nni_mutex_exit(&nni_inproc.mx);
return (NNG_ECLOSED);
}
NNI_LIST_FOREACH (list, srch) {
@@ -306,13 +301,13 @@ nni_inproc_ep_listen(void *arg)
continue;
}
if (strcmp(srch->addr, ep->addr) == 0) {
- nni_mutex_exit(nni_inproc.mx);
+ nni_mutex_exit(&nni_inproc.mx);
return (NNG_EADDRINUSE);
}
}
ep->mode = NNI_INPROC_EP_LISTEN;
nni_list_append(list, ep);
- nni_mutex_exit(nni_inproc.mx);
+ nni_mutex_exit(&nni_inproc.mx);
return (0);
}
@@ -326,14 +321,31 @@ nni_inproc_ep_accept(void *arg, void **pipep)
nni_list_t *list = &nni_inproc.eps;
int rv;
- nni_mutex_enter(nni_inproc.mx);
if (ep->mode != NNI_INPROC_EP_LISTEN) {
- nni_mutex_exit(nni_inproc.mx);
return (NNG_EINVAL);
}
+
+ // Preallocate the pair, so we don't do it while holding a lock
+ if ((pair = nni_alloc(sizeof (*pair))) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_mutex_init(&pair->mx)) != 0) {
+ nni_free(pair, sizeof (*pair));
+ return (rv);
+ }
+ if (((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0) ||
+ ((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0)) {
+ nni_inproc_pair_destroy(pair);
+ return (rv);
+ }
+
+ nni_mutex_enter(&nni_inproc.mx);
for (;;) {
if (ep->closed) {
- nni_mutex_exit(nni_inproc.mx);
+ // This is the only possible error path from the
+ // time we acquired the lock.
+ nni_mutex_exit(&nni_inproc.mx);
+ nni_inproc_pair_destroy(pair);
return (NNG_ECLOSED);
}
NNI_LIST_FOREACH (list, srch) {
@@ -347,16 +359,7 @@ nni_inproc_ep_accept(void *arg, void **pipep)
if (srch != NULL) {
break;
}
- nni_cond_wait(nni_inproc.cv);
- }
- if ((pair = nni_alloc(sizeof (*pair))) == NULL) {
- nni_mutex_exit(nni_inproc.mx);
- return (NNG_ENOMEM);
- }
- if (((rv = nni_mutex_create(&pair->mx)) != 0) ||
- ((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0) ||
- ((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0)) {
- nni_inproc_pair_destroy(pair);
+ nni_cond_wait(&nni_inproc.cv);
}
(void) snprintf(pair->addr, sizeof (pair->addr), "%s", ep->addr);
pair->pipe[0].rq = pair->pipe[1].wq = pair->q[0];
@@ -368,9 +371,9 @@ nni_inproc_ep_accept(void *arg, void **pipep)
pair->refcnt = 2;
srch->cpipe = &pair->pipe[0];
*pipep = &pair->pipe[1];
- nni_cond_broadcast(nni_inproc.cv);
+ nni_cond_broadcast(&nni_inproc.cv);
- nni_mutex_exit(nni_inproc.mx);
+ nni_mutex_exit(&nni_inproc.mx);
return (0);
}