diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-06-26 18:27:06 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-06-26 18:27:06 -0700 |
| commit | ac80ef7c3b1caa2f1fe3b093bef825363675bcb3 (patch) | |
| tree | 567857cedd1b618de2daabab26bb672abc055f40 | |
| parent | e095c624882b94141eca64a4746ac186683556f3 (diff) | |
| download | nng-ac80ef7c3b1caa2f1fe3b093bef825363675bcb3.tar.gz nng-ac80ef7c3b1caa2f1fe3b093bef825363675bcb3.tar.bz2 nng-ac80ef7c3b1caa2f1fe3b093bef825363675bcb3.zip | |
More race condition fixes.
| -rw-r--r-- | src/core/endpt.c | 12 | ||||
| -rw-r--r-- | src/core/panic.c | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_debug.c | 3 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 62 |
4 files changed, 49 insertions, 34 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index ab0d8cb9..74bd0314 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -253,7 +253,9 @@ nni_ep_connect(nni_ep *ep) nni_pipe_remove(pipe); return (rv); } + nni_mtx_lock(&ep->ep_mtx); ep->ep_pipe = pipe; + nni_mtx_unlock(&ep->ep_mtx); return (0); } @@ -277,17 +279,19 @@ nni_ep_pipe_add(nni_ep *ep, nni_pipe *pipe) void nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe) { - if ((ep != NULL) && (nni_list_active(&ep->ep_pipes, pipe))) { - nni_mtx_lock(&ep->ep_mtx); + if (ep == NULL) { + return; + } + nni_mtx_lock(&ep->ep_mtx); + if (nni_list_active(&ep->ep_pipes, pipe)) { nni_list_remove(&ep->ep_pipes, pipe); if (ep->ep_pipe == pipe) { ep->ep_pipe = NULL; } nni_cv_wake(&ep->ep_cv); - - nni_mtx_unlock(&ep->ep_mtx); } + nni_mtx_unlock(&ep->ep_mtx); } diff --git a/src/core/panic.c b/src/core/panic.c index 2205779f..b3d64dc3 100644 --- a/src/core/panic.c +++ b/src/core/panic.c @@ -55,10 +55,12 @@ nni_panic(const char *fmt, ...) va_list va; va_start(va, fmt); - (void) snprintf(fbuf, sizeof (fbuf), "panic: %s", fmt); - (void) vsnprintf(buf, sizeof (buf), fbuf, va); + (void) vsnprintf(fbuf, sizeof (fbuf), fmt, va); va_end(va); + + (void) snprintf(buf, sizeof (buf), "panic: %s", fbuf); + nni_println(buf); nni_println("This message is indicative of a BUG."); nni_println("Report this at http://github.com/nanomsg/nanomsg"); diff --git a/src/platform/posix/posix_debug.c b/src/platform/posix/posix_debug.c index 56699d02..b9321ea5 100644 --- a/src/platform/posix/posix_debug.c +++ b/src/platform/posix/posix_debug.c @@ -26,7 +26,8 @@ nni_plat_abort(void) void nni_plat_println(const char *message) { - (void) fprintf(stderr, "%s\n", message); + fputs(message, stderr); + fputc('\n', stderr); } diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 519b224e..1ce1156c 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -57,7 +57,6 @@ struct nni_req_pipe { nni_aio aio_sendcooked; // cooked mode only nni_aio aio_recv; nni_aio aio_putq; - int running; int refcnt; nni_mtx mtx; }; @@ -119,9 +118,11 @@ nni_req_sock_fini(void *arg) { nni_req_sock *req = arg; + nni_mtx_lock(&req->mtx); if (req->reqmsg != NULL) { nni_msg_free(req->reqmsg); } + nni_mtx_unlock(&req->mtx); nni_mtx_fini(&req->mtx); NNI_FREE_STRUCT(req); } @@ -195,6 +196,11 @@ nni_req_pipe_start(void *arg) if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) { return (NNG_EPROTO); } + + nni_mtx_lock(&rp->mtx); + rp->refcnt = 2; + nni_mtx_unlock(&rp->mtx); + nni_mtx_lock(&req->mtx); nni_list_append(&req->readypipes, rp); if (req->wantw) { @@ -202,10 +208,6 @@ nni_req_pipe_start(void *arg) } nni_mtx_unlock(&req->mtx); - nni_mtx_lock(&rp->mtx); - rp->refcnt = 2; - rp->running = 1; - nni_mtx_unlock(&rp->mtx); nni_msgq_aio_get(req->uwq, &rp->aio_getq); nni_pipe_aio_recv(rp->pipe, &rp->aio_recv); @@ -218,34 +220,33 @@ nni_req_pipe_stop(nni_req_pipe *rp) { nni_req_sock *req = rp->req; int refcnt; - int running; nni_mtx_lock(&rp->mtx); - running = rp->running; - rp->running = 0; NNI_ASSERT(rp->refcnt > 0); rp->refcnt--; refcnt = rp->refcnt; nni_mtx_unlock(&rp->mtx); - if (running) { - nni_mtx_lock(&req->mtx); - // This removes the node from either busypipes or readypipes. - // It doesn't much matter which. + nni_msgq_aio_cancel(req->uwq, &rp->aio_getq); + nni_msgq_aio_cancel(req->urq, &rp->aio_putq); + + nni_mtx_lock(&req->mtx); + // This removes the node from either busypipes or readypipes. + // It doesn't much matter which. + if (nni_list_active(&req->readypipes, rp)) { nni_list_remove(&req->readypipes, rp); + } - if ((rp == req->pendpipe) && (req->reqmsg != NULL)) { - // removing the pipe we sent the last request on... - // schedule immediate resend. - req->resend = NNI_TIME_ZERO; - req->wantw = 1; - nni_req_resend(req); - } - nni_mtx_unlock(&req->mtx); + if ((rp == req->pendpipe) && (req->reqmsg != NULL)) { + // removing the pipe we sent the last request on... + // schedule immediate resend. + req->pendpipe = NULL; + req->resend = NNI_TIME_ZERO; + req->wantw = 1; + nni_req_resend(req); } - nni_msgq_aio_cancel(req->uwq, &rp->aio_getq); - nni_msgq_aio_cancel(req->urq, &rp->aio_putq); + nni_mtx_unlock(&req->mtx); if (refcnt == 0) { nni_pipe_remove(rp->pipe); @@ -357,8 +358,8 @@ nni_req_sendcooked_cb(void *arg) { nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; - nni_mtx *mx = nni_sock_mtx(req->sock); + NNI_ASSERT(rp->refcnt > 0); if (nni_aio_result(&rp->aio_sendcooked) != 0) { // We failed to send... clean up and deal with it. // We leave ourselves on the busy list for now, which @@ -374,10 +375,17 @@ nni_req_sendcooked_cb(void *arg) // a resend. nni_mtx_lock(&req->mtx); - nni_list_remove(&req->busypipes, rp); - nni_list_append(&req->readypipes, rp); - - nni_req_resend(req); + if (nni_list_active(&req->busypipes, rp)) { + nni_list_remove(&req->busypipes, rp); + nni_list_append(&req->readypipes, rp); + nni_req_resend(req); + } else { + // We wind up here if stop was called from the reader + // side while we were waiting to be scheduled to run for the + // writer side. In this case we can't complete the operation, + // and we have to abort. + nni_req_pipe_stop(rp); + } nni_mtx_unlock(&req->mtx); } |
