diff options
| -rw-r--r-- | src/core/endpt.c | 5 | ||||
| -rw-r--r-- | src/core/pipe.c | 113 | ||||
| -rw-r--r-- | src/core/pipe.h | 9 | ||||
| -rw-r--r-- | src/core/platform.h | 6 | ||||
| -rw-r--r-- | src/core/socket.c | 4 | ||||
| -rw-r--r-- | src/core/thread.c | 17 | ||||
| -rw-r--r-- | src/core/thread.h | 4 | ||||
| -rw-r--r-- | src/platform/posix/posix_impl.h | 4 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.c | 7 | ||||
| -rw-r--r-- | src/protocol/pair/pair.c | 28 |
10 files changed, 80 insertions, 117 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index c87c8ce6..a04cfa00 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -111,7 +111,7 @@ nni_endpt_connect(nni_endpt *ep, nni_pipe **pp) } rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_trandata); if (rv != 0) { - nni_pipe_destroy(pipe); + nni_pipe_close(pipe); return (rv); } ep->ep_pipe = pipe; @@ -179,6 +179,7 @@ nni_dialer(void *arg) break; case NNG_ECLOSED: return; + default: // XXX: THIS NEEDS TO BE A PROPER BACKOFF. cooldown = 1000000; @@ -265,7 +266,7 @@ nni_endpt_accept(nni_endpt *ep, nni_pipe **pp) return (rv); } if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_trandata)) != 0) { - nni_pipe_destroy(pipe); + nni_pipe_close(pipe); return (rv); } *pp = pipe; diff --git a/src/core/pipe.c b/src/core/pipe.c index b12a7f1c..8fa41934 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -44,16 +44,15 @@ nni_pipe_close(nni_pipe *p) { nni_socket *sock = p->p_sock; - p->p_ops.p_close(p->p_trandata); + if (p->p_trandata != NULL) { + p->p_ops.p_close(p->p_trandata); + } nni_mutex_enter(&sock->s_mx); if (!p->p_reap) { // schedule deferred reap/close p->p_reap = 1; - if (p->p_active) { - nni_list_remove(&sock->s_pipes, p); - p->p_active = 0; - } + nni_list_remove(&sock->s_pipes, p); nni_list_append(&sock->s_reaps, p); nni_cond_broadcast(&sock->s_cv); } @@ -71,16 +70,12 @@ nni_pipe_peer(nni_pipe *p) void nni_pipe_destroy(nni_pipe *p) { - if (p->p_send_thr != NULL) { - nni_thread_reap(p->p_send_thr); - } - if (p->p_recv_thr != NULL) { - nni_thread_reap(p->p_recv_thr); - } + nni_thr_fini(&p->p_send_thr); + nni_thr_fini(&p->p_recv_thr); + if (p->p_trandata != NULL) { p->p_ops.p_destroy(p->p_trandata); } - nni_cond_fini(&p->p_cv); if (p->p_pdata != NULL) { nni_free(p->p_pdata, p->p_psize); } @@ -93,29 +88,41 @@ nni_pipe_create(nni_pipe **pp, nni_endpt *ep) { nni_pipe *p; nni_socket *sock = ep->ep_sock; + nni_protocol *proto = &sock->s_ops; int rv; if ((p = nni_alloc(sizeof (*p))) == NULL) { return (NNG_ENOMEM); } - p->p_send_thr = NULL; - p->p_recv_thr = NULL; + p->p_sock = sock; + p->p_ops = *ep->ep_ops.ep_pipe_ops; p->p_trandata = NULL; p->p_active = 0; - p->p_abort = 0; - if ((rv = nni_cond_init(&p->p_cv, &sock->s_mx)) != 0) { + p->p_psize = proto->proto_pipe_size; + NNI_LIST_NODE_INIT(&p->p_node); + + if ((p->p_pdata = nni_alloc(p->p_psize)) == NULL) { + nni_free(p, sizeof (*p)); + return (NNG_ENOMEM); + } + rv = nni_thr_init(&p->p_recv_thr, proto->proto_pipe_recv, p->p_pdata); + if (rv != 0) { + nni_free(p->p_pdata, p->p_psize); nni_free(p, sizeof (*p)); return (rv); } - p->p_psize = sock->s_ops.proto_pipe_size; - if ((p->p_pdata = nni_alloc(p->p_psize)) == NULL) { - nni_cond_fini(&p->p_cv); + rv = nni_thr_init(&p->p_send_thr, proto->proto_pipe_send, p->p_pdata); + if (rv != 0) { + nni_thr_fini(&p->p_recv_thr); + nni_free(p->p_pdata, p->p_psize); nni_free(p, sizeof (*p)); - return (NNG_ENOMEM); + return (rv); } - p->p_sock = sock; - p->p_ops = *ep->ep_ops.ep_pipe_ops; - NNI_LIST_NODE_INIT(&p->p_node); + p->p_psize = sock->s_ops.proto_pipe_size; + nni_mutex_enter(&sock->s_mx); + nni_list_append(&sock->s_pipes, p); + nni_mutex_exit(&sock->s_mx); + *pp = p; return (0); } @@ -132,46 +139,6 @@ nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp) } -static void -nni_pipe_sender(void *arg) -{ - nni_pipe *p = arg; - - nni_mutex_enter(&p->p_sock->s_mx); - while ((!p->p_active) && (!p->p_abort)) { - nni_cond_wait(&p->p_cv); - } - if (p->p_abort) { - nni_mutex_exit(&p->p_sock->s_mx); - return; - } - nni_mutex_exit(&p->p_sock->s_mx); - if (p->p_sock->s_ops.proto_pipe_send != NULL) { - p->p_sock->s_ops.proto_pipe_send(p->p_pdata); - } -} - - -static void -nni_pipe_receiver(void *arg) -{ - nni_pipe *p = arg; - - nni_mutex_enter(&p->p_sock->s_mx); - while ((!p->p_active) && (!p->p_abort)) { - nni_cond_wait(&p->p_cv); - } - if (p->p_abort) { - nni_mutex_exit(&p->p_sock->s_mx); - return; - } - nni_mutex_exit(&p->p_sock->s_mx); - if (p->p_sock->s_ops.proto_pipe_recv != NULL) { - p->p_sock->s_ops.proto_pipe_recv(p->p_pdata); - } -} - - int nni_pipe_start(nni_pipe *pipe) { @@ -194,41 +161,29 @@ nni_pipe_start(nni_pipe *pipe) pipe->p_id = nni_plat_nextid() & 0x7FFFFFFF; collide = 0; NNI_LIST_FOREACH (&sock->s_pipes, check) { - if (check->p_id == pipe->p_id) { + if ((pipe != check) && (check->p_id == pipe->p_id)) { collide = 1; break; } } } while (collide); - rv = nni_thread_create(&pipe->p_send_thr, nni_pipe_sender, pipe); - if (rv != 0) { - goto fail; - } - rv = nni_thread_create(&pipe->p_recv_thr, nni_pipe_receiver, pipe); - if (rv != 0) { - goto fail; - } - rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, pipe->p_pdata); if (rv != 0) { goto fail; } - nni_list_append(&sock->s_pipes, pipe); + nni_thr_run(&pipe->p_send_thr); + nni_thr_run(&pipe->p_recv_thr); pipe->p_active = 1; // XXX: Publish event - nni_cond_broadcast(&pipe->p_cv); nni_mutex_exit(&sock->s_mx); return (0); fail: - pipe->p_abort = 1; pipe->p_reap = 1; - nni_list_append(&sock->s_reaps, pipe); - nni_cond_broadcast(&sock->s_cv); - nni_cond_broadcast(&pipe->p_cv); nni_mutex_exit(&sock->s_mx); + nni_pipe_close(pipe); return (rv); } diff --git a/src/core/pipe.h b/src/core/pipe.h index d12320b2..3c2c1166 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -27,13 +27,8 @@ struct nng_pipe { nni_endpt * p_ep; int p_reap; int p_active; - int p_abort; - nni_mutex p_mx; - nni_cond p_cv; - void (*p_send)(void *); - void (*p_recv)(void *); - nni_thread * p_send_thr; - nni_thread * p_recv_thr; + nni_thr p_send_thr; + nni_thr p_recv_thr; }; // Pipe operations that protocols use. diff --git a/src/core/platform.h b/src/core/platform.h index bdec8bb6..3dd9fa29 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -63,9 +63,9 @@ extern void nni_free(void *, size_t); typedef struct nni_mutex nni_mutex; typedef struct nni_cond nni_cond; -typedef struct nni_plat_mtx nni_plat_mtx; -typedef struct nni_plat_cv nni_plat_cv; -typedef struct nni_plat_thr nni_plat_thr; +typedef struct nni_plat_mtx nni_plat_mtx; +typedef struct nni_plat_cv nni_plat_cv; +typedef struct nni_plat_thr nni_plat_thr; // Mutex handling. diff --git a/src/core/socket.c b/src/core/socket.c index 1700ca9f..2e95c229 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -53,7 +53,9 @@ nni_reaper(void *arg) nni_mutex_exit(&sock->s_mx); // This should already have been done. - pipe->p_ops.p_close(pipe->p_trandata); + if (pipe->p_trandata != NULL) { + pipe->p_ops.p_close(pipe->p_trandata); + } // Remove the pipe from the protocol. Protocols may // keep lists of pipes for managing their topologies. diff --git a/src/core/thread.c b/src/core/thread.c index 4f714341..d1f46e3f 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -15,48 +15,56 @@ nni_mtx_init(nni_mtx *mtx) return (nni_plat_mtx_init(&mtx->mtx)); } + void nni_mtx_fini(nni_mtx *mtx) { nni_plat_mtx_fini(&mtx->mtx); } + void nni_mtx_lock(nni_mtx *mtx) { nni_plat_mtx_lock(&mtx->mtx); } + void nni_mtx_unlock(nni_mtx *mtx) { nni_plat_mtx_unlock(&mtx->mtx); } + int nni_mtx_trylock(nni_mtx *mtx) { return (nni_plat_mtx_trylock(&mtx->mtx)); } + int nni_cv_init(nni_cv *cv, nni_mtx *mtx) { return (nni_plat_cv_init(&cv->cv, &mtx->mtx)); } + void nni_cv_fini(nni_cv *cv) { nni_plat_cv_fini(&cv->cv); } + void nni_cv_wait(nni_cv *cv) { nni_plat_cv_wait(&cv->cv); } + int nni_cv_until(nni_cv *cv, nni_time until) { @@ -73,12 +81,14 @@ nni_cv_until(nni_cv *cv, nni_time until) return (nni_plat_cv_until(&cv->cv, until)); } + void nni_cv_wake(nni_cv *cv) { return (nni_plat_cv_wake(&cv->cv)); } + static void nni_thr_wrap(void *arg) { @@ -90,7 +100,7 @@ nni_thr_wrap(void *arg) nni_plat_cv_wait(&thr->cv); } nni_plat_mtx_unlock(&thr->mtx); - if (!stop) { + if ((!stop) && (thr->fn != NULL)) { thr->fn(thr->arg); } nni_plat_mtx_lock(&thr->mtx); @@ -99,6 +109,7 @@ nni_thr_wrap(void *arg) nni_plat_mtx_unlock(&thr->mtx); } + int nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg) { @@ -125,6 +136,7 @@ nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg) return (0); } + void nni_thr_run(nni_thr *thr) { @@ -134,6 +146,7 @@ nni_thr_run(nni_thr *thr) nni_plat_mtx_unlock(&thr->mtx); } + void nni_thr_fini(nni_thr *thr) { @@ -147,4 +160,4 @@ nni_thr_fini(nni_thr *thr) nni_plat_thr_fini(&thr->thr); nni_plat_cv_fini(&thr->cv); nni_plat_mtx_fini(&thr->mtx); -}
\ No newline at end of file +} diff --git a/src/core/thread.h b/src/core/thread.h index 062941a1..8ef463e5 100644 --- a/src/core/thread.h +++ b/src/core/thread.h @@ -27,7 +27,7 @@ typedef struct { nni_plat_mtx mtx; nni_plat_cv cv; nni_thr_func fn; - void *arg; + void * arg; int start; int stop; int done; @@ -49,4 +49,4 @@ extern int nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg); extern void nni_thr_fini(nni_thr *thr); extern void nni_thr_run(nni_thr *thr); -#endif CORE_THREAD_H
\ No newline at end of file +#endif CORE_THREAD_H diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 5ca7812f..5c5c3798 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -54,8 +54,8 @@ struct nni_plat_thr { }; struct nni_plat_cv { - pthread_cond_t cv; - pthread_mutex_t *mtx; + pthread_cond_t cv; + pthread_mutex_t * mtx; }; #endif diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index 0c928d84..d1944879 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -175,6 +175,7 @@ nni_plat_cv_init(nni_plat_cv *cv, nni_plat_mtx *mtx) return (0); } + void nni_plat_cv_wake(nni_plat_cv *cv) { @@ -185,6 +186,7 @@ nni_plat_cv_wake(nni_plat_cv *cv) } } + void nni_plat_cv_wait(nni_plat_cv *cv) { @@ -195,6 +197,7 @@ nni_plat_cv_wait(nni_plat_cv *cv) } } + int nni_plat_cv_until(nni_plat_cv *cv, nni_time until) { @@ -217,9 +220,9 @@ nni_plat_cv_until(nni_plat_cv *cv, nni_time until) nni_panic("pthread_cond_timedwait: %d", rv); } return (0); - } + void nni_plat_cv_fini(nni_plat_cv *cv) { @@ -230,6 +233,7 @@ nni_plat_cv_fini(nni_plat_cv *cv) } } + int nni_plat_thr_init(nni_plat_thr *thr, void (*fn)(void *), void *arg) { @@ -262,7 +266,6 @@ nni_atfork_child(void) } - int nni_plat_init(int (*helper)(void)) { diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index 3f57c12b..9336af8c 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -23,7 +23,7 @@ typedef struct nni_pair_sock nni_pair_sock; struct nni_pair_sock { nni_socket * sock; nni_pair_pipe * pipe; - nni_mutex mx; + nni_mtx mx; nni_msgqueue * uwq; nni_msgqueue * urq; }; @@ -50,7 +50,7 @@ nni_pair_create(void **pairp, nni_socket *sock) if ((pair = nni_alloc(sizeof (*pair))) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mutex_init(&pair->mx)) != 0) { + if ((rv = nni_mtx_init(&pair->mx)) != 0) { nni_free(pair, sizeof (*pair)); return (rv); } @@ -72,7 +72,7 @@ nni_pair_destroy(void *arg) // this wold be the time to shut them all down. We don't, because // the socket already shut us down, and we don't have any other // threads that run. - nni_mutex_fini(&pair->mx); + nni_mtx_fini(&pair->mx); nni_free(pair, sizeof (*pair)); } @@ -88,18 +88,14 @@ nni_pair_add_pipe(void *arg, nni_pipe *pipe, void *data) pp->sigclose = 0; pp->pair = pair; - nni_mutex_enter(&pair->mx); + nni_mtx_lock(&pair->mx); if (pair->pipe != NULL) { - rv = NNG_EBUSY; // Already have a peer, denied. - goto fail; + nni_mtx_unlock(&pair->mx); + return (NNG_EBUSY); // Already have a peer, denied. } pair->pipe = pp; - nni_mutex_exit(&pair->mx); + nni_mtx_unlock(&pair->mx); return (0); - -fail: - nni_mutex_exit(&pair->mx); - return (rv); } @@ -109,13 +105,11 @@ nni_pair_rem_pipe(void *arg, void *data) nni_pair_sock *pair = arg; nni_pair_pipe *pp = data; - nni_mutex_enter(&pair->mx); - if (pair->pipe != pp) { - nni_mutex_exit(&pair->mx); - return; + nni_mtx_lock(&pair->mx); + if (pair->pipe == pp) { + pair->pipe = NULL; } - pair->pipe = NULL; - nni_mutex_exit(&pair->mx); + nni_mtx_unlock(&pair->mx); } |
