diff options
Diffstat (limited to 'src/protocol/survey')
| -rw-r--r-- | src/protocol/survey/respond.c | 33 | ||||
| -rw-r--r-- | src/protocol/survey/survey.c | 10 |
2 files changed, 28 insertions, 15 deletions
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 4a4c8741..089e730e 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -38,6 +38,7 @@ struct nni_resp_sock { char * btrace; size_t btrace_len; nni_aio aio_getq; + nni_mtx mtx; }; // An nni_resp_pipe is our per-pipe protocol private structure. @@ -57,14 +58,14 @@ nni_resp_sock_fini(void *arg) { nni_resp_sock *psock = arg; - if (psock != NULL) { - nni_aio_fini(&psock->aio_getq); - nni_idhash_fini(psock->pipes); - if (psock->btrace != NULL) { - nni_free(psock->btrace, psock->btrace_len); - } - NNI_FREE_STRUCT(psock); + nni_aio_stop(&psock->aio_getq); + nni_aio_fini(&psock->aio_getq); + nni_idhash_fini(psock->pipes); + if (psock->btrace != NULL) { + nni_free(psock->btrace, psock->btrace_len); } + nni_mtx_fini(&psock->mtx); + NNI_FREE_STRUCT(psock); } static int @@ -83,6 +84,10 @@ nni_resp_sock_init(void **pp, nni_sock *nsock) psock->btrace_len = 0; psock->urq = nni_sock_recvq(nsock); psock->uwq = nni_sock_sendq(nsock); + + if ((rv = nni_mtx_init(&psock->mtx)) != 0) { + goto fail; + } if ((rv = nni_idhash_init(&psock->pipes)) != 0) { goto fail; } @@ -177,7 +182,9 @@ nni_resp_pipe_start(void *arg) ppipe->id = nni_pipe_id(ppipe->npipe); + nni_mtx_lock(&psock->mtx); rv = nni_idhash_insert(psock->pipes, ppipe->id, ppipe); + nni_mtx_unlock(&psock->mtx); if (rv != 0) { return (rv); } @@ -195,13 +202,15 @@ nni_resp_pipe_stop(void *arg) nni_resp_sock *psock = ppipe->psock; nni_msgq_close(ppipe->sendq); - nni_aio_cancel(&ppipe->aio_putq, NNG_ECANCELED); - nni_aio_cancel(&ppipe->aio_getq, NNG_ECANCELED); - nni_aio_cancel(&ppipe->aio_send, NNG_ECANCELED); - nni_aio_cancel(&ppipe->aio_recv, NNG_ECANCELED); + nni_aio_stop(&ppipe->aio_putq); + nni_aio_stop(&ppipe->aio_getq); + nni_aio_stop(&ppipe->aio_send); + nni_aio_stop(&ppipe->aio_recv); if (ppipe->id != 0) { + nni_mtx_lock(&psock->mtx); nni_idhash_remove(psock->pipes, ppipe->id); + nni_mtx_unlock(&psock->mtx); ppipe->id = 0; } } @@ -239,6 +248,7 @@ nni_resp_sock_getq_cb(void *arg) NNI_GET32(header, id); nni_msg_trim_header(msg, 4); + nni_mtx_lock(&psock->mtx); rv = nni_idhash_find(psock->pipes, id, (void **) &ppipe); if (rv != 0) { @@ -250,6 +260,7 @@ nni_resp_sock_getq_cb(void *arg) nni_msg_free(msg); } } + nni_mtx_unlock(&psock->mtx); } void diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index 85657f57..633e1491 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -60,6 +60,7 @@ nni_surv_sock_fini(void *arg) { nni_surv_sock *psock = arg; + nni_aio_stop(&psock->aio_getq); nni_aio_fini(&psock->aio_getq); nni_mtx_fini(&psock->mtx); NNI_FREE_STRUCT(psock); @@ -191,10 +192,11 @@ nni_surv_pipe_stop(void *arg) nni_surv_pipe *ppipe = arg; nni_surv_sock *psock = ppipe->psock; - nni_aio_cancel(&ppipe->aio_getq, NNG_ECANCELED); - nni_aio_cancel(&ppipe->aio_send, NNG_ECANCELED); - nni_aio_cancel(&ppipe->aio_recv, NNG_ECANCELED); - nni_aio_cancel(&ppipe->aio_putq, NNG_ECANCELED); + nni_aio_stop(&ppipe->aio_getq); + nni_aio_stop(&ppipe->aio_send); + nni_aio_stop(&ppipe->aio_recv); + nni_aio_stop(&ppipe->aio_putq); + nni_msgq_close(ppipe->sendq); nni_mtx_lock(&psock->mtx); |
