aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-01 14:34:29 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-01 14:34:29 -0800
commit3fd43c488b47874db22a87a1d87eed94bbd85725 (patch)
treeed9fe38b370c9a6162ac05596b91adfac9cb5579
parentc7b541af4a1a2c410dc63a638a17adb31d7342a3 (diff)
downloadnng-3fd43c488b47874db22a87a1d87eed94bbd85725.tar.gz
nng-3fd43c488b47874db22a87a1d87eed94bbd85725.tar.bz2
nng-3fd43c488b47874db22a87a1d87eed94bbd85725.zip
Pipe simplifications for thread management.
This may also address a race in closing down pipes. Now pipes are always registered with the socket. They also always have both a sender and receiver thread. If the protocol doesn't need one or the other, the stock thread just exits early.
-rw-r--r--src/core/endpt.c5
-rw-r--r--src/core/pipe.c113
-rw-r--r--src/core/pipe.h9
-rw-r--r--src/core/platform.h6
-rw-r--r--src/core/socket.c4
-rw-r--r--src/core/thread.c17
-rw-r--r--src/core/thread.h4
-rw-r--r--src/platform/posix/posix_impl.h4
-rw-r--r--src/platform/posix/posix_thread.c7
-rw-r--r--src/protocol/pair/pair.c28
10 files changed, 80 insertions, 117 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index c87c8ce6..a04cfa00 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -111,7 +111,7 @@ nni_endpt_connect(nni_endpt *ep, nni_pipe **pp)
}
rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_trandata);
if (rv != 0) {
- nni_pipe_destroy(pipe);
+ nni_pipe_close(pipe);
return (rv);
}
ep->ep_pipe = pipe;
@@ -179,6 +179,7 @@ nni_dialer(void *arg)
break;
case NNG_ECLOSED:
return;
+
default:
// XXX: THIS NEEDS TO BE A PROPER BACKOFF.
cooldown = 1000000;
@@ -265,7 +266,7 @@ nni_endpt_accept(nni_endpt *ep, nni_pipe **pp)
return (rv);
}
if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_trandata)) != 0) {
- nni_pipe_destroy(pipe);
+ nni_pipe_close(pipe);
return (rv);
}
*pp = pipe;
diff --git a/src/core/pipe.c b/src/core/pipe.c
index b12a7f1c..8fa41934 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -44,16 +44,15 @@ nni_pipe_close(nni_pipe *p)
{
nni_socket *sock = p->p_sock;
- p->p_ops.p_close(p->p_trandata);
+ if (p->p_trandata != NULL) {
+ p->p_ops.p_close(p->p_trandata);
+ }
nni_mutex_enter(&sock->s_mx);
if (!p->p_reap) {
// schedule deferred reap/close
p->p_reap = 1;
- if (p->p_active) {
- nni_list_remove(&sock->s_pipes, p);
- p->p_active = 0;
- }
+ nni_list_remove(&sock->s_pipes, p);
nni_list_append(&sock->s_reaps, p);
nni_cond_broadcast(&sock->s_cv);
}
@@ -71,16 +70,12 @@ nni_pipe_peer(nni_pipe *p)
void
nni_pipe_destroy(nni_pipe *p)
{
- if (p->p_send_thr != NULL) {
- nni_thread_reap(p->p_send_thr);
- }
- if (p->p_recv_thr != NULL) {
- nni_thread_reap(p->p_recv_thr);
- }
+ nni_thr_fini(&p->p_send_thr);
+ nni_thr_fini(&p->p_recv_thr);
+
if (p->p_trandata != NULL) {
p->p_ops.p_destroy(p->p_trandata);
}
- nni_cond_fini(&p->p_cv);
if (p->p_pdata != NULL) {
nni_free(p->p_pdata, p->p_psize);
}
@@ -93,29 +88,41 @@ nni_pipe_create(nni_pipe **pp, nni_endpt *ep)
{
nni_pipe *p;
nni_socket *sock = ep->ep_sock;
+ nni_protocol *proto = &sock->s_ops;
int rv;
if ((p = nni_alloc(sizeof (*p))) == NULL) {
return (NNG_ENOMEM);
}
- p->p_send_thr = NULL;
- p->p_recv_thr = NULL;
+ p->p_sock = sock;
+ p->p_ops = *ep->ep_ops.ep_pipe_ops;
p->p_trandata = NULL;
p->p_active = 0;
- p->p_abort = 0;
- if ((rv = nni_cond_init(&p->p_cv, &sock->s_mx)) != 0) {
+ p->p_psize = proto->proto_pipe_size;
+ NNI_LIST_NODE_INIT(&p->p_node);
+
+ if ((p->p_pdata = nni_alloc(p->p_psize)) == NULL) {
+ nni_free(p, sizeof (*p));
+ return (NNG_ENOMEM);
+ }
+ rv = nni_thr_init(&p->p_recv_thr, proto->proto_pipe_recv, p->p_pdata);
+ if (rv != 0) {
+ nni_free(p->p_pdata, p->p_psize);
nni_free(p, sizeof (*p));
return (rv);
}
- p->p_psize = sock->s_ops.proto_pipe_size;
- if ((p->p_pdata = nni_alloc(p->p_psize)) == NULL) {
- nni_cond_fini(&p->p_cv);
+ rv = nni_thr_init(&p->p_send_thr, proto->proto_pipe_send, p->p_pdata);
+ if (rv != 0) {
+ nni_thr_fini(&p->p_recv_thr);
+ nni_free(p->p_pdata, p->p_psize);
nni_free(p, sizeof (*p));
- return (NNG_ENOMEM);
+ return (rv);
}
- p->p_sock = sock;
- p->p_ops = *ep->ep_ops.ep_pipe_ops;
- NNI_LIST_NODE_INIT(&p->p_node);
+ p->p_psize = sock->s_ops.proto_pipe_size;
+ nni_mutex_enter(&sock->s_mx);
+ nni_list_append(&sock->s_pipes, p);
+ nni_mutex_exit(&sock->s_mx);
+
*pp = p;
return (0);
}
@@ -132,46 +139,6 @@ nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp)
}
-static void
-nni_pipe_sender(void *arg)
-{
- nni_pipe *p = arg;
-
- nni_mutex_enter(&p->p_sock->s_mx);
- while ((!p->p_active) && (!p->p_abort)) {
- nni_cond_wait(&p->p_cv);
- }
- if (p->p_abort) {
- nni_mutex_exit(&p->p_sock->s_mx);
- return;
- }
- nni_mutex_exit(&p->p_sock->s_mx);
- if (p->p_sock->s_ops.proto_pipe_send != NULL) {
- p->p_sock->s_ops.proto_pipe_send(p->p_pdata);
- }
-}
-
-
-static void
-nni_pipe_receiver(void *arg)
-{
- nni_pipe *p = arg;
-
- nni_mutex_enter(&p->p_sock->s_mx);
- while ((!p->p_active) && (!p->p_abort)) {
- nni_cond_wait(&p->p_cv);
- }
- if (p->p_abort) {
- nni_mutex_exit(&p->p_sock->s_mx);
- return;
- }
- nni_mutex_exit(&p->p_sock->s_mx);
- if (p->p_sock->s_ops.proto_pipe_recv != NULL) {
- p->p_sock->s_ops.proto_pipe_recv(p->p_pdata);
- }
-}
-
-
int
nni_pipe_start(nni_pipe *pipe)
{
@@ -194,41 +161,29 @@ nni_pipe_start(nni_pipe *pipe)
pipe->p_id = nni_plat_nextid() & 0x7FFFFFFF;
collide = 0;
NNI_LIST_FOREACH (&sock->s_pipes, check) {
- if (check->p_id == pipe->p_id) {
+ if ((pipe != check) && (check->p_id == pipe->p_id)) {
collide = 1;
break;
}
}
} while (collide);
- rv = nni_thread_create(&pipe->p_send_thr, nni_pipe_sender, pipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_thread_create(&pipe->p_recv_thr, nni_pipe_receiver, pipe);
- if (rv != 0) {
- goto fail;
- }
-
rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, pipe->p_pdata);
if (rv != 0) {
goto fail;
}
- nni_list_append(&sock->s_pipes, pipe);
+ nni_thr_run(&pipe->p_send_thr);
+ nni_thr_run(&pipe->p_recv_thr);
pipe->p_active = 1;
// XXX: Publish event
- nni_cond_broadcast(&pipe->p_cv);
nni_mutex_exit(&sock->s_mx);
return (0);
fail:
- pipe->p_abort = 1;
pipe->p_reap = 1;
- nni_list_append(&sock->s_reaps, pipe);
- nni_cond_broadcast(&sock->s_cv);
- nni_cond_broadcast(&pipe->p_cv);
nni_mutex_exit(&sock->s_mx);
+ nni_pipe_close(pipe);
return (rv);
}
diff --git a/src/core/pipe.h b/src/core/pipe.h
index d12320b2..3c2c1166 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -27,13 +27,8 @@ struct nng_pipe {
nni_endpt * p_ep;
int p_reap;
int p_active;
- int p_abort;
- nni_mutex p_mx;
- nni_cond p_cv;
- void (*p_send)(void *);
- void (*p_recv)(void *);
- nni_thread * p_send_thr;
- nni_thread * p_recv_thr;
+ nni_thr p_send_thr;
+ nni_thr p_recv_thr;
};
// Pipe operations that protocols use.
diff --git a/src/core/platform.h b/src/core/platform.h
index bdec8bb6..3dd9fa29 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -63,9 +63,9 @@ extern void nni_free(void *, size_t);
typedef struct nni_mutex nni_mutex;
typedef struct nni_cond nni_cond;
-typedef struct nni_plat_mtx nni_plat_mtx;
-typedef struct nni_plat_cv nni_plat_cv;
-typedef struct nni_plat_thr nni_plat_thr;
+typedef struct nni_plat_mtx nni_plat_mtx;
+typedef struct nni_plat_cv nni_plat_cv;
+typedef struct nni_plat_thr nni_plat_thr;
// Mutex handling.
diff --git a/src/core/socket.c b/src/core/socket.c
index 1700ca9f..2e95c229 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -53,7 +53,9 @@ nni_reaper(void *arg)
nni_mutex_exit(&sock->s_mx);
// This should already have been done.
- pipe->p_ops.p_close(pipe->p_trandata);
+ if (pipe->p_trandata != NULL) {
+ pipe->p_ops.p_close(pipe->p_trandata);
+ }
// Remove the pipe from the protocol. Protocols may
// keep lists of pipes for managing their topologies.
diff --git a/src/core/thread.c b/src/core/thread.c
index 4f714341..d1f46e3f 100644
--- a/src/core/thread.c
+++ b/src/core/thread.c
@@ -15,48 +15,56 @@ nni_mtx_init(nni_mtx *mtx)
return (nni_plat_mtx_init(&mtx->mtx));
}
+
void
nni_mtx_fini(nni_mtx *mtx)
{
nni_plat_mtx_fini(&mtx->mtx);
}
+
void
nni_mtx_lock(nni_mtx *mtx)
{
nni_plat_mtx_lock(&mtx->mtx);
}
+
void
nni_mtx_unlock(nni_mtx *mtx)
{
nni_plat_mtx_unlock(&mtx->mtx);
}
+
int
nni_mtx_trylock(nni_mtx *mtx)
{
return (nni_plat_mtx_trylock(&mtx->mtx));
}
+
int
nni_cv_init(nni_cv *cv, nni_mtx *mtx)
{
return (nni_plat_cv_init(&cv->cv, &mtx->mtx));
}
+
void
nni_cv_fini(nni_cv *cv)
{
nni_plat_cv_fini(&cv->cv);
}
+
void
nni_cv_wait(nni_cv *cv)
{
nni_plat_cv_wait(&cv->cv);
}
+
int
nni_cv_until(nni_cv *cv, nni_time until)
{
@@ -73,12 +81,14 @@ nni_cv_until(nni_cv *cv, nni_time until)
return (nni_plat_cv_until(&cv->cv, until));
}
+
void
nni_cv_wake(nni_cv *cv)
{
return (nni_plat_cv_wake(&cv->cv));
}
+
static void
nni_thr_wrap(void *arg)
{
@@ -90,7 +100,7 @@ nni_thr_wrap(void *arg)
nni_plat_cv_wait(&thr->cv);
}
nni_plat_mtx_unlock(&thr->mtx);
- if (!stop) {
+ if ((!stop) && (thr->fn != NULL)) {
thr->fn(thr->arg);
}
nni_plat_mtx_lock(&thr->mtx);
@@ -99,6 +109,7 @@ nni_thr_wrap(void *arg)
nni_plat_mtx_unlock(&thr->mtx);
}
+
int
nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg)
{
@@ -125,6 +136,7 @@ nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg)
return (0);
}
+
void
nni_thr_run(nni_thr *thr)
{
@@ -134,6 +146,7 @@ nni_thr_run(nni_thr *thr)
nni_plat_mtx_unlock(&thr->mtx);
}
+
void
nni_thr_fini(nni_thr *thr)
{
@@ -147,4 +160,4 @@ nni_thr_fini(nni_thr *thr)
nni_plat_thr_fini(&thr->thr);
nni_plat_cv_fini(&thr->cv);
nni_plat_mtx_fini(&thr->mtx);
-} \ No newline at end of file
+}
diff --git a/src/core/thread.h b/src/core/thread.h
index 062941a1..8ef463e5 100644
--- a/src/core/thread.h
+++ b/src/core/thread.h
@@ -27,7 +27,7 @@ typedef struct {
nni_plat_mtx mtx;
nni_plat_cv cv;
nni_thr_func fn;
- void *arg;
+ void * arg;
int start;
int stop;
int done;
@@ -49,4 +49,4 @@ extern int nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg);
extern void nni_thr_fini(nni_thr *thr);
extern void nni_thr_run(nni_thr *thr);
-#endif CORE_THREAD_H \ No newline at end of file
+#endif CORE_THREAD_H
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 5ca7812f..5c5c3798 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -54,8 +54,8 @@ struct nni_plat_thr {
};
struct nni_plat_cv {
- pthread_cond_t cv;
- pthread_mutex_t *mtx;
+ pthread_cond_t cv;
+ pthread_mutex_t * mtx;
};
#endif
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index 0c928d84..d1944879 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -175,6 +175,7 @@ nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx)
return (0);
}
+
void
nni_plat_cv_wake(nni_plat_cv *cv)
{
@@ -185,6 +186,7 @@ nni_plat_cv_wake(nni_plat_cv *cv)
}
}
+
void
nni_plat_cv_wait(nni_plat_cv *cv)
{
@@ -195,6 +197,7 @@ nni_plat_cv_wait(nni_plat_cv *cv)
}
}
+
int
nni_plat_cv_until(nni_plat_cv *cv, nni_time until)
{
@@ -217,9 +220,9 @@ nni_plat_cv_until(nni_plat_cv *cv, nni_time until)
nni_panic("pthread_cond_timedwait: %d", rv);
}
return (0);
-
}
+
void
nni_plat_cv_fini(nni_plat_cv *cv)
{
@@ -230,6 +233,7 @@ nni_plat_cv_fini(nni_plat_cv *cv)
}
}
+
int
nni_plat_thr_init(nni_plat_thr *thr, void (*fn)(void *), void *arg)
{
@@ -262,7 +266,6 @@ nni_atfork_child(void)
}
-
int
nni_plat_init(int (*helper)(void))
{
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index 3f57c12b..9336af8c 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -23,7 +23,7 @@ typedef struct nni_pair_sock nni_pair_sock;
struct nni_pair_sock {
nni_socket * sock;
nni_pair_pipe * pipe;
- nni_mutex mx;
+ nni_mtx mx;
nni_msgqueue * uwq;
nni_msgqueue * urq;
};
@@ -50,7 +50,7 @@ nni_pair_create(void **pairp, nni_socket *sock)
if ((pair = nni_alloc(sizeof (*pair))) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mutex_init(&pair->mx)) != 0) {
+ if ((rv = nni_mtx_init(&pair->mx)) != 0) {
nni_free(pair, sizeof (*pair));
return (rv);
}
@@ -72,7 +72,7 @@ nni_pair_destroy(void *arg)
// this wold be the time to shut them all down. We don't, because
// the socket already shut us down, and we don't have any other
// threads that run.
- nni_mutex_fini(&pair->mx);
+ nni_mtx_fini(&pair->mx);
nni_free(pair, sizeof (*pair));
}
@@ -88,18 +88,14 @@ nni_pair_add_pipe(void *arg, nni_pipe *pipe, void *data)
pp->sigclose = 0;
pp->pair = pair;
- nni_mutex_enter(&pair->mx);
+ nni_mtx_lock(&pair->mx);
if (pair->pipe != NULL) {
- rv = NNG_EBUSY; // Already have a peer, denied.
- goto fail;
+ nni_mtx_unlock(&pair->mx);
+ return (NNG_EBUSY); // Already have a peer, denied.
}
pair->pipe = pp;
- nni_mutex_exit(&pair->mx);
+ nni_mtx_unlock(&pair->mx);
return (0);
-
-fail:
- nni_mutex_exit(&pair->mx);
- return (rv);
}
@@ -109,13 +105,11 @@ nni_pair_rem_pipe(void *arg, void *data)
nni_pair_sock *pair = arg;
nni_pair_pipe *pp = data;
- nni_mutex_enter(&pair->mx);
- if (pair->pipe != pp) {
- nni_mutex_exit(&pair->mx);
- return;
+ nni_mtx_lock(&pair->mx);
+ if (pair->pipe == pp) {
+ pair->pipe = NULL;
}
- pair->pipe = NULL;
- nni_mutex_exit(&pair->mx);
+ nni_mtx_unlock(&pair->mx);
}