diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-01 14:34:29 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-01 14:34:29 -0800 |
| commit | 3fd43c488b47874db22a87a1d87eed94bbd85725 (patch) | |
| tree | ed9fe38b370c9a6162ac05596b91adfac9cb5579 /src/core | |
| parent | c7b541af4a1a2c410dc63a638a17adb31d7342a3 (diff) | |
| download | nng-3fd43c488b47874db22a87a1d87eed94bbd85725.tar.gz nng-3fd43c488b47874db22a87a1d87eed94bbd85725.tar.bz2 nng-3fd43c488b47874db22a87a1d87eed94bbd85725.zip | |
Pipe simplifications for thread management.
This may also address a race in closing down pipes. Now pipes are always
registered with the socket. They also always have both a sender and receiver
thread. If the protocol doesn't need one or the other, the stock thread just
exits early.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/endpt.c | 5 | ||||
| -rw-r--r-- | src/core/pipe.c | 113 | ||||
| -rw-r--r-- | src/core/pipe.h | 9 | ||||
| -rw-r--r-- | src/core/platform.h | 6 | ||||
| -rw-r--r-- | src/core/socket.c | 4 | ||||
| -rw-r--r-- | src/core/thread.c | 17 | ||||
| -rw-r--r-- | src/core/thread.h | 4 |
7 files changed, 62 insertions, 96 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index c87c8ce6..a04cfa00 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -111,7 +111,7 @@ nni_endpt_connect(nni_endpt *ep, nni_pipe **pp) } rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_trandata); if (rv != 0) { - nni_pipe_destroy(pipe); + nni_pipe_close(pipe); return (rv); } ep->ep_pipe = pipe; @@ -179,6 +179,7 @@ nni_dialer(void *arg) break; case NNG_ECLOSED: return; + default: // XXX: THIS NEEDS TO BE A PROPER BACKOFF. cooldown = 1000000; @@ -265,7 +266,7 @@ nni_endpt_accept(nni_endpt *ep, nni_pipe **pp) return (rv); } if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_trandata)) != 0) { - nni_pipe_destroy(pipe); + nni_pipe_close(pipe); return (rv); } *pp = pipe; diff --git a/src/core/pipe.c b/src/core/pipe.c index b12a7f1c..8fa41934 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -44,16 +44,15 @@ nni_pipe_close(nni_pipe *p) { nni_socket *sock = p->p_sock; - p->p_ops.p_close(p->p_trandata); + if (p->p_trandata != NULL) { + p->p_ops.p_close(p->p_trandata); + } nni_mutex_enter(&sock->s_mx); if (!p->p_reap) { // schedule deferred reap/close p->p_reap = 1; - if (p->p_active) { - nni_list_remove(&sock->s_pipes, p); - p->p_active = 0; - } + nni_list_remove(&sock->s_pipes, p); nni_list_append(&sock->s_reaps, p); nni_cond_broadcast(&sock->s_cv); } @@ -71,16 +70,12 @@ nni_pipe_peer(nni_pipe *p) void nni_pipe_destroy(nni_pipe *p) { - if (p->p_send_thr != NULL) { - nni_thread_reap(p->p_send_thr); - } - if (p->p_recv_thr != NULL) { - nni_thread_reap(p->p_recv_thr); - } + nni_thr_fini(&p->p_send_thr); + nni_thr_fini(&p->p_recv_thr); + if (p->p_trandata != NULL) { p->p_ops.p_destroy(p->p_trandata); } - nni_cond_fini(&p->p_cv); if (p->p_pdata != NULL) { nni_free(p->p_pdata, p->p_psize); } @@ -93,29 +88,41 @@ nni_pipe_create(nni_pipe **pp, nni_endpt *ep) { nni_pipe *p; nni_socket *sock = ep->ep_sock; + nni_protocol *proto = &sock->s_ops; int rv; if ((p = nni_alloc(sizeof (*p))) == NULL) { return (NNG_ENOMEM); } - p->p_send_thr = NULL; - p->p_recv_thr = NULL; + p->p_sock = sock; + p->p_ops = *ep->ep_ops.ep_pipe_ops; p->p_trandata = NULL; p->p_active = 0; - p->p_abort = 0; - if ((rv = nni_cond_init(&p->p_cv, &sock->s_mx)) != 0) { + p->p_psize = proto->proto_pipe_size; + NNI_LIST_NODE_INIT(&p->p_node); + + if ((p->p_pdata = nni_alloc(p->p_psize)) == NULL) { + nni_free(p, sizeof (*p)); + return (NNG_ENOMEM); + } + rv = nni_thr_init(&p->p_recv_thr, proto->proto_pipe_recv, p->p_pdata); + if (rv != 0) { + nni_free(p->p_pdata, p->p_psize); nni_free(p, sizeof (*p)); return (rv); } - p->p_psize = sock->s_ops.proto_pipe_size; - if ((p->p_pdata = nni_alloc(p->p_psize)) == NULL) { - nni_cond_fini(&p->p_cv); + rv = nni_thr_init(&p->p_send_thr, proto->proto_pipe_send, p->p_pdata); + if (rv != 0) { + nni_thr_fini(&p->p_recv_thr); + nni_free(p->p_pdata, p->p_psize); nni_free(p, sizeof (*p)); - return (NNG_ENOMEM); + return (rv); } - p->p_sock = sock; - p->p_ops = *ep->ep_ops.ep_pipe_ops; - NNI_LIST_NODE_INIT(&p->p_node); + p->p_psize = sock->s_ops.proto_pipe_size; + nni_mutex_enter(&sock->s_mx); + nni_list_append(&sock->s_pipes, p); + nni_mutex_exit(&sock->s_mx); + *pp = p; return (0); } @@ -132,46 +139,6 @@ nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp) } -static void -nni_pipe_sender(void *arg) -{ - nni_pipe *p = arg; - - nni_mutex_enter(&p->p_sock->s_mx); - while ((!p->p_active) && (!p->p_abort)) { - nni_cond_wait(&p->p_cv); - } - if (p->p_abort) { - nni_mutex_exit(&p->p_sock->s_mx); - return; - } - nni_mutex_exit(&p->p_sock->s_mx); - if (p->p_sock->s_ops.proto_pipe_send != NULL) { - p->p_sock->s_ops.proto_pipe_send(p->p_pdata); - } -} - - -static void -nni_pipe_receiver(void *arg) -{ - nni_pipe *p = arg; - - nni_mutex_enter(&p->p_sock->s_mx); - while ((!p->p_active) && (!p->p_abort)) { - nni_cond_wait(&p->p_cv); - } - if (p->p_abort) { - nni_mutex_exit(&p->p_sock->s_mx); - return; - } - nni_mutex_exit(&p->p_sock->s_mx); - if (p->p_sock->s_ops.proto_pipe_recv != NULL) { - p->p_sock->s_ops.proto_pipe_recv(p->p_pdata); - } -} - - int nni_pipe_start(nni_pipe *pipe) { @@ -194,41 +161,29 @@ nni_pipe_start(nni_pipe *pipe) pipe->p_id = nni_plat_nextid() & 0x7FFFFFFF; collide = 0; NNI_LIST_FOREACH (&sock->s_pipes, check) { - if (check->p_id == pipe->p_id) { + if ((pipe != check) && (check->p_id == pipe->p_id)) { collide = 1; break; } } } while (collide); - rv = nni_thread_create(&pipe->p_send_thr, nni_pipe_sender, pipe); - if (rv != 0) { - goto fail; - } - rv = nni_thread_create(&pipe->p_recv_thr, nni_pipe_receiver, pipe); - if (rv != 0) { - goto fail; - } - rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, pipe->p_pdata); if (rv != 0) { goto fail; } - nni_list_append(&sock->s_pipes, pipe); + nni_thr_run(&pipe->p_send_thr); + nni_thr_run(&pipe->p_recv_thr); pipe->p_active = 1; // XXX: Publish event - nni_cond_broadcast(&pipe->p_cv); nni_mutex_exit(&sock->s_mx); return (0); fail: - pipe->p_abort = 1; pipe->p_reap = 1; - nni_list_append(&sock->s_reaps, pipe); - nni_cond_broadcast(&sock->s_cv); - nni_cond_broadcast(&pipe->p_cv); nni_mutex_exit(&sock->s_mx); + nni_pipe_close(pipe); return (rv); } diff --git a/src/core/pipe.h b/src/core/pipe.h index d12320b2..3c2c1166 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -27,13 +27,8 @@ struct nng_pipe { nni_endpt * p_ep; int p_reap; int p_active; - int p_abort; - nni_mutex p_mx; - nni_cond p_cv; - void (*p_send)(void *); - void (*p_recv)(void *); - nni_thread * p_send_thr; - nni_thread * p_recv_thr; + nni_thr p_send_thr; + nni_thr p_recv_thr; }; // Pipe operations that protocols use. diff --git a/src/core/platform.h b/src/core/platform.h index bdec8bb6..3dd9fa29 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -63,9 +63,9 @@ extern void nni_free(void *, size_t); typedef struct nni_mutex nni_mutex; typedef struct nni_cond nni_cond; -typedef struct nni_plat_mtx nni_plat_mtx; -typedef struct nni_plat_cv nni_plat_cv; -typedef struct nni_plat_thr nni_plat_thr; +typedef struct nni_plat_mtx nni_plat_mtx; +typedef struct nni_plat_cv nni_plat_cv; +typedef struct nni_plat_thr nni_plat_thr; // Mutex handling. diff --git a/src/core/socket.c b/src/core/socket.c index 1700ca9f..2e95c229 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -53,7 +53,9 @@ nni_reaper(void *arg) nni_mutex_exit(&sock->s_mx); // This should already have been done. - pipe->p_ops.p_close(pipe->p_trandata); + if (pipe->p_trandata != NULL) { + pipe->p_ops.p_close(pipe->p_trandata); + } // Remove the pipe from the protocol. Protocols may // keep lists of pipes for managing their topologies. diff --git a/src/core/thread.c b/src/core/thread.c index 4f714341..d1f46e3f 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -15,48 +15,56 @@ nni_mtx_init(nni_mtx *mtx) return (nni_plat_mtx_init(&mtx->mtx)); } + void nni_mtx_fini(nni_mtx *mtx) { nni_plat_mtx_fini(&mtx->mtx); } + void nni_mtx_lock(nni_mtx *mtx) { nni_plat_mtx_lock(&mtx->mtx); } + void nni_mtx_unlock(nni_mtx *mtx) { nni_plat_mtx_unlock(&mtx->mtx); } + int nni_mtx_trylock(nni_mtx *mtx) { return (nni_plat_mtx_trylock(&mtx->mtx)); } + int nni_cv_init(nni_cv *cv, nni_mtx *mtx) { return (nni_plat_cv_init(&cv->cv, &mtx->mtx)); } + void nni_cv_fini(nni_cv *cv) { nni_plat_cv_fini(&cv->cv); } + void nni_cv_wait(nni_cv *cv) { nni_plat_cv_wait(&cv->cv); } + int nni_cv_until(nni_cv *cv, nni_time until) { @@ -73,12 +81,14 @@ nni_cv_until(nni_cv *cv, nni_time until) return (nni_plat_cv_until(&cv->cv, until)); } + void nni_cv_wake(nni_cv *cv) { return (nni_plat_cv_wake(&cv->cv)); } + static void nni_thr_wrap(void *arg) { @@ -90,7 +100,7 @@ nni_thr_wrap(void *arg) nni_plat_cv_wait(&thr->cv); } nni_plat_mtx_unlock(&thr->mtx); - if (!stop) { + if ((!stop) && (thr->fn != NULL)) { thr->fn(thr->arg); } nni_plat_mtx_lock(&thr->mtx); @@ -99,6 +109,7 @@ nni_thr_wrap(void *arg) nni_plat_mtx_unlock(&thr->mtx); } + int nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg) { @@ -125,6 +136,7 @@ nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg) return (0); } + void nni_thr_run(nni_thr *thr) { @@ -134,6 +146,7 @@ nni_thr_run(nni_thr *thr) nni_plat_mtx_unlock(&thr->mtx); } + void nni_thr_fini(nni_thr *thr) { @@ -147,4 +160,4 @@ nni_thr_fini(nni_thr *thr) nni_plat_thr_fini(&thr->thr); nni_plat_cv_fini(&thr->cv); nni_plat_mtx_fini(&thr->mtx); -}
\ No newline at end of file +} diff --git a/src/core/thread.h b/src/core/thread.h index 062941a1..8ef463e5 100644 --- a/src/core/thread.h +++ b/src/core/thread.h @@ -27,7 +27,7 @@ typedef struct { nni_plat_mtx mtx; nni_plat_cv cv; nni_thr_func fn; - void *arg; + void * arg; int start; int stop; int done; @@ -49,4 +49,4 @@ extern int nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg); extern void nni_thr_fini(nni_thr *thr); extern void nni_thr_run(nni_thr *thr); -#endif CORE_THREAD_H
\ No newline at end of file +#endif CORE_THREAD_H |
