diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-07-20 14:34:51 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-07-20 14:34:51 -0700 |
| commit | a37093079b492e966344416445aae354b147d30e (patch) | |
| tree | 2f21fc2bc716f2423ba02f4713b25038c429ec4e /src/protocol | |
| parent | 88fb04f61918b06e6e269c1960058c3df5e0a0ef (diff) | |
| download | nng-a37093079b492e966344416445aae354b147d30e.tar.gz nng-a37093079b492e966344416445aae354b147d30e.tar.bz2 nng-a37093079b492e966344416445aae354b147d30e.zip | |
Yet more race condition fixes.
We need to remember that protocol stops can run synchronously, and
therefore we need to wait for the aio to complete. Further, we need
to break apart shutting down aio activity from deallocation, as we need
to shut down *all* async activity before deallocating *anything*.
Noticed that we had a pipe race in the surveyor pattern too.
Diffstat (limited to 'src/protocol')
| -rw-r--r-- | src/protocol/bus/bus.c | 25 | ||||
| -rw-r--r-- | src/protocol/pair/pair.c | 1 | ||||
| -rw-r--r-- | src/protocol/pipeline/pull.c | 12 | ||||
| -rw-r--r-- | src/protocol/pipeline/push.c | 9 | ||||
| -rw-r--r-- | src/protocol/pubsub/pub.c | 9 | ||||
| -rw-r--r-- | src/protocol/pubsub/sub.c | 4 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 9 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 59 | ||||
| -rw-r--r-- | src/protocol/survey/respond.c | 33 | ||||
| -rw-r--r-- | src/protocol/survey/survey.c | 10 |
10 files changed, 103 insertions, 68 deletions
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index 17ef03bb..3070b90d 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -60,6 +60,7 @@ nni_bus_sock_fini(void *arg) nni_bus_sock *psock = arg; if (psock != NULL) { + nni_aio_stop(&psock->aio_getq); nni_aio_fini(&psock->aio_getq); nni_mtx_fini(&psock->mtx); NNI_FREE_STRUCT(psock); @@ -107,15 +108,13 @@ nni_bus_pipe_fini(void *arg) { nni_bus_pipe *ppipe = arg; - if (ppipe != NULL) { - nni_mtx_fini(&ppipe->mtx); - nni_aio_fini(&ppipe->aio_getq); - nni_aio_fini(&ppipe->aio_send); - nni_aio_fini(&ppipe->aio_recv); - nni_aio_fini(&ppipe->aio_putq); - nni_msgq_fini(ppipe->sendq); - NNI_FREE_STRUCT(ppipe); - } + nni_aio_fini(&ppipe->aio_getq); + nni_aio_fini(&ppipe->aio_send); + nni_aio_fini(&ppipe->aio_recv); + nni_aio_fini(&ppipe->aio_putq); + nni_msgq_fini(ppipe->sendq); + nni_mtx_fini(&ppipe->mtx); + NNI_FREE_STRUCT(ppipe); } static int @@ -183,10 +182,10 @@ nni_bus_pipe_stop(void *arg) nni_msgq_close(ppipe->sendq); - nni_aio_cancel(&ppipe->aio_getq, NNG_ECLOSED); - nni_aio_cancel(&ppipe->aio_send, NNG_ECLOSED); - nni_aio_cancel(&ppipe->aio_recv, NNG_ECLOSED); - nni_aio_cancel(&ppipe->aio_putq, NNG_ECLOSED); + nni_aio_stop(&ppipe->aio_getq); + nni_aio_stop(&ppipe->aio_send); + nni_aio_stop(&ppipe->aio_recv); + nni_aio_stop(&ppipe->aio_putq); nni_mtx_lock(&ppipe->psock->mtx); if (nni_list_active(&psock->pipes, ppipe)) { diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index f5fec540..55ce5aa9 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -122,7 +122,6 @@ static void nni_pair_pipe_fini(void *arg) { nni_pair_pipe *ppipe = arg; - nni_aio_fini(&ppipe->aio_send); nni_aio_fini(&ppipe->aio_recv); nni_aio_fini(&ppipe->aio_putq); diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index 39e809e6..cde79824 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -90,11 +90,9 @@ nni_pull_pipe_fini(void *arg) { nni_pull_pipe *pp = arg; - if (pp != NULL) { - nni_aio_fini(&pp->putq_aio); - nni_aio_fini(&pp->recv_aio); - NNI_FREE_STRUCT(pp); - } + nni_aio_fini(&pp->putq_aio); + nni_aio_fini(&pp->recv_aio); + NNI_FREE_STRUCT(pp); } static int @@ -113,8 +111,8 @@ nni_pull_pipe_stop(void *arg) { nni_pull_pipe *pp = arg; - nni_aio_cancel(&pp->putq_aio, NNG_ECANCELED); - nni_aio_cancel(&pp->recv_aio, NNG_ECANCELED); + nni_aio_stop(&pp->putq_aio); + nni_aio_stop(&pp->recv_aio); } static void diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index 43c0feaf..b7d4322c 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -132,12 +132,11 @@ nni_push_pipe_start(void *arg) static void nni_push_pipe_stop(void *arg) { - nni_push_pipe *pp = arg; - nni_push_sock *push = pp->push; + nni_push_pipe *pp = arg; - nni_aio_cancel(&pp->aio_recv, NNG_ECANCELED); - nni_aio_cancel(&pp->aio_send, NNG_ECANCELED); - nni_aio_cancel(&pp->aio_getq, NNG_ECANCELED); + nni_aio_stop(&pp->aio_recv); + nni_aio_stop(&pp->aio_send); + nni_aio_stop(&pp->aio_getq); } static void diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index 316cbf50..e32f179a 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -83,6 +83,7 @@ nni_pub_sock_fini(void *arg) { nni_pub_sock *pub = arg; + nni_aio_stop(&pub->aio_getq); nni_aio_fini(&pub->aio_getq); nni_mtx_fini(&pub->mtx); NNI_FREE_STRUCT(pub); @@ -100,7 +101,6 @@ static void nni_pub_pipe_fini(void *arg) { nni_pub_pipe *pp = arg; - nni_aio_fini(&pp->aio_getq); nni_aio_fini(&pp->aio_send); nni_aio_fini(&pp->aio_recv); @@ -172,9 +172,10 @@ nni_pub_pipe_stop(void *arg) nni_pub_pipe *pp = arg; nni_pub_sock *pub = pp->pub; - nni_aio_cancel(&pp->aio_getq, NNG_ECANCELED); - nni_aio_cancel(&pp->aio_send, NNG_ECANCELED); - nni_aio_cancel(&pp->aio_recv, NNG_ECANCELED); + nni_aio_stop(&pp->aio_getq); + nni_aio_stop(&pp->aio_send); + nni_aio_stop(&pp->aio_recv); + nni_msgq_close(pp->sendq); nni_mtx_lock(&pub->mtx); diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 36a42c49..03d76e2d 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -123,8 +123,8 @@ nni_sub_pipe_stop(void *arg) { nni_sub_pipe *sp = arg; - nni_aio_cancel(&sp->aio_putq, NNG_ECANCELED); - nni_aio_cancel(&sp->aio_recv, NNG_ECANCELED); + nni_aio_stop(&sp->aio_putq); + nni_aio_stop(&sp->aio_recv); } static void diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 013b02fb..049b1422 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -56,6 +56,7 @@ nni_rep_sock_fini(void *arg) { nni_rep_sock *rep = arg; + nni_aio_stop(&rep->aio_getq); nni_aio_fini(&rep->aio_getq); nni_idhash_fini(rep->pipes); if (rep->btrace != NULL) { @@ -192,10 +193,10 @@ nni_rep_pipe_stop(void *arg) nni_rep_sock *rep = rp->rep; nni_msgq_close(rp->sendq); - nni_aio_cancel(&rp->aio_getq, NNG_ECANCELED); - nni_aio_cancel(&rp->aio_putq, NNG_ECANCELED); - nni_aio_cancel(&rp->aio_send, NNG_ECANCELED); - nni_aio_cancel(&rp->aio_recv, NNG_ECANCELED); + nni_aio_stop(&rp->aio_getq); + nni_aio_stop(&rp->aio_send); + nni_aio_stop(&rp->aio_recv); + nni_aio_stop(&rp->aio_putq); nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe)); } diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index f13094ff..8e7056f5 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -34,6 +34,7 @@ struct nni_req_sock { nni_time resend; int raw; int wantw; + int closed; nni_msg * reqmsg; nni_req_pipe *pendpipe; @@ -46,6 +47,7 @@ struct nni_req_sock { uint32_t nextid; // next id uint8_t reqid[4]; // outstanding request ID (big endian) nni_mtx mtx; + nni_cv cv; }; // An nni_req_pipe is our per-pipe protocol private structure. @@ -81,6 +83,10 @@ nni_req_sock_init(void **reqp, nni_sock *sock) NNI_FREE_STRUCT(req); return (rv); } + if ((rv = nni_cv_init(&req->cv, &req->mtx)) != 0) { + nni_mtx_fini(&req->mtx); + NNI_FREE_STRUCT(req); + } NNI_LIST_INIT(&req->readypipes, nni_req_pipe, node); NNI_LIST_INIT(&req->busypipes, nni_req_pipe, node); @@ -108,6 +114,10 @@ nni_req_sock_close(void *arg) { nni_req_sock *req = arg; + nni_mtx_lock(&req->mtx); + req->closed = 1; + nni_mtx_unlock(&req->mtx); + nni_timer_cancel(&req->timer); } @@ -117,10 +127,15 @@ nni_req_sock_fini(void *arg) nni_req_sock *req = arg; nni_mtx_lock(&req->mtx); + while ((!nni_list_empty(&req->readypipes)) || + (!nni_list_empty(&req->busypipes))) { + nni_cv_wait(&req->cv); + } if (req->reqmsg != NULL) { nni_msg_free(req->reqmsg); } nni_mtx_unlock(&req->mtx); + nni_cv_fini(&req->cv); nni_mtx_fini(&req->mtx); NNI_FREE_STRUCT(req); } @@ -171,15 +186,13 @@ nni_req_pipe_fini(void *arg) { nni_req_pipe *rp = arg; - if (rp != NULL) { - nni_aio_fini(&rp->aio_getq); - nni_aio_fini(&rp->aio_putq); - nni_aio_fini(&rp->aio_recv); - nni_aio_fini(&rp->aio_sendcooked); - nni_aio_fini(&rp->aio_sendraw); - nni_mtx_fini(&rp->mtx); - NNI_FREE_STRUCT(rp); - } + nni_aio_fini(&rp->aio_getq); + nni_aio_fini(&rp->aio_putq); + nni_aio_fini(&rp->aio_recv); + nni_aio_fini(&rp->aio_sendcooked); + nni_aio_fini(&rp->aio_sendraw); + nni_mtx_fini(&rp->mtx); + NNI_FREE_STRUCT(rp); } static int @@ -193,6 +206,10 @@ nni_req_pipe_start(void *arg) } nni_mtx_lock(&req->mtx); + if (req->closed) { + nni_mtx_unlock(&req->mtx); + return (NNG_ECLOSED); + } nni_list_append(&req->readypipes, rp); if (req->wantw) { nni_req_resend(req); @@ -210,11 +227,11 @@ nni_req_pipe_stop(void *arg) nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; - nni_aio_cancel(&rp->aio_getq, NNG_ECANCELED); - nni_aio_cancel(&rp->aio_putq, NNG_ECANCELED); - nni_aio_cancel(&rp->aio_recv, NNG_ECANCELED); - nni_aio_cancel(&rp->aio_sendcooked, NNG_ECANCELED); - nni_aio_cancel(&rp->aio_sendraw, NNG_ECANCELED); + nni_aio_stop(&rp->aio_getq); + nni_aio_stop(&rp->aio_putq); + nni_aio_stop(&rp->aio_recv); + nni_aio_stop(&rp->aio_sendcooked); + nni_aio_stop(&rp->aio_sendraw); // At this point there should not be any further AIOs running. // Further, any completion tasks have completed. @@ -222,8 +239,11 @@ nni_req_pipe_stop(void *arg) nni_mtx_lock(&req->mtx); // This removes the node from either busypipes or readypipes. // It doesn't much matter which. - if (nni_list_active(&req->readypipes, rp)) { - nni_list_remove(&req->readypipes, rp); + if (nni_list_node_active(&rp->node)) { + nni_list_node_remove(&rp->node); + if (req->closed) { + nni_cv_wake(&req->cv); + } } if ((rp == req->pendpipe) && (req->reqmsg != NULL)) { @@ -443,10 +463,15 @@ nni_req_resend(nni_req_sock *req) // Note: This routine should be called with the socket lock held. // Also, this should only be called while handling cooked mode // requests. - if (req->reqmsg == NULL) { + if ((msg = req->reqmsg) == NULL) { return; } + if (req->closed) { + req->reqmsg = NULL; + nni_msg_free(msg); + } + if (req->wantw) { req->wantw = 0; diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 4a4c8741..089e730e 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -38,6 +38,7 @@ struct nni_resp_sock { char * btrace; size_t btrace_len; nni_aio aio_getq; + nni_mtx mtx; }; // An nni_resp_pipe is our per-pipe protocol private structure. @@ -57,14 +58,14 @@ nni_resp_sock_fini(void *arg) { nni_resp_sock *psock = arg; - if (psock != NULL) { - nni_aio_fini(&psock->aio_getq); - nni_idhash_fini(psock->pipes); - if (psock->btrace != NULL) { - nni_free(psock->btrace, psock->btrace_len); - } - NNI_FREE_STRUCT(psock); + nni_aio_stop(&psock->aio_getq); + nni_aio_fini(&psock->aio_getq); + nni_idhash_fini(psock->pipes); + if (psock->btrace != NULL) { + nni_free(psock->btrace, psock->btrace_len); } + nni_mtx_fini(&psock->mtx); + NNI_FREE_STRUCT(psock); } static int @@ -83,6 +84,10 @@ nni_resp_sock_init(void **pp, nni_sock *nsock) psock->btrace_len = 0; psock->urq = nni_sock_recvq(nsock); psock->uwq = nni_sock_sendq(nsock); + + if ((rv = nni_mtx_init(&psock->mtx)) != 0) { + goto fail; + } if ((rv = nni_idhash_init(&psock->pipes)) != 0) { goto fail; } @@ -177,7 +182,9 @@ nni_resp_pipe_start(void *arg) ppipe->id = nni_pipe_id(ppipe->npipe); + nni_mtx_lock(&psock->mtx); rv = nni_idhash_insert(psock->pipes, ppipe->id, ppipe); + nni_mtx_unlock(&psock->mtx); if (rv != 0) { return (rv); } @@ -195,13 +202,15 @@ nni_resp_pipe_stop(void *arg) nni_resp_sock *psock = ppipe->psock; nni_msgq_close(ppipe->sendq); - nni_aio_cancel(&ppipe->aio_putq, NNG_ECANCELED); - nni_aio_cancel(&ppipe->aio_getq, NNG_ECANCELED); - nni_aio_cancel(&ppipe->aio_send, NNG_ECANCELED); - nni_aio_cancel(&ppipe->aio_recv, NNG_ECANCELED); + nni_aio_stop(&ppipe->aio_putq); + nni_aio_stop(&ppipe->aio_getq); + nni_aio_stop(&ppipe->aio_send); + nni_aio_stop(&ppipe->aio_recv); if (ppipe->id != 0) { + nni_mtx_lock(&psock->mtx); nni_idhash_remove(psock->pipes, ppipe->id); + nni_mtx_unlock(&psock->mtx); ppipe->id = 0; } } @@ -239,6 +248,7 @@ nni_resp_sock_getq_cb(void *arg) NNI_GET32(header, id); nni_msg_trim_header(msg, 4); + nni_mtx_lock(&psock->mtx); rv = nni_idhash_find(psock->pipes, id, (void **) &ppipe); if (rv != 0) { @@ -250,6 +260,7 @@ nni_resp_sock_getq_cb(void *arg) nni_msg_free(msg); } } + nni_mtx_unlock(&psock->mtx); } void diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index 85657f57..633e1491 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -60,6 +60,7 @@ nni_surv_sock_fini(void *arg) { nni_surv_sock *psock = arg; + nni_aio_stop(&psock->aio_getq); nni_aio_fini(&psock->aio_getq); nni_mtx_fini(&psock->mtx); NNI_FREE_STRUCT(psock); @@ -191,10 +192,11 @@ nni_surv_pipe_stop(void *arg) nni_surv_pipe *ppipe = arg; nni_surv_sock *psock = ppipe->psock; - nni_aio_cancel(&ppipe->aio_getq, NNG_ECANCELED); - nni_aio_cancel(&ppipe->aio_send, NNG_ECANCELED); - nni_aio_cancel(&ppipe->aio_recv, NNG_ECANCELED); - nni_aio_cancel(&ppipe->aio_putq, NNG_ECANCELED); + nni_aio_stop(&ppipe->aio_getq); + nni_aio_stop(&ppipe->aio_send); + nni_aio_stop(&ppipe->aio_recv); + nni_aio_stop(&ppipe->aio_putq); + nni_msgq_close(ppipe->sendq); nni_mtx_lock(&psock->mtx); |
