| | | |
|---|---|---|
| author | Garrett D'Amore <garrett@damore.org> | 2017-03-10 14:39:21 -0800 |
| committer | Garrett D'Amore <garrett@damore.org> | 2017-03-10 14:39:21 -0800 |
| commit | c436e174d0ed8c5dc14af060e994b97a83df7750 (patch) | |
| tree | 9eeb7ef18ad6eb1a975ab6aaa7a68bcd3ee81c9a /src/protocol/reqrep/req.c | |
| parent | f5c259eec0cd3fa5cd623e159cbfec83b4a500d5 (diff) | |
| download | nng-c436e174d0ed8c5dc14af060e994b97a83df7750.tar.gz nng-c436e174d0ed8c5dc14af060e994b97a83df7750.tar.bz2 nng-c436e174d0ed8c5dc14af060e994b97a83df7750.zip | |
Start of close related race fixes. Scalability test.
Diffstat (limited to 'src/protocol/reqrep/req.c')
| | | |
|---|---|---|
| -rw-r--r-- | src/protocol/reqrep/req.c | 29 |

1 file changed, 28 insertions, 1 deletion
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index f28db1df..553ef0bf 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -188,7 +188,9 @@ nni_req_pipe_add(void *arg)
 		nni_req_resend(req);
 	}
 
+	nni_pipe_incref(rp->pipe);
 	nni_msgq_aio_get(req->uwq, &rp->aio_getq);
+	nni_pipe_incref(rp->pipe);
 	nni_pipe_aio_recv(rp->pipe, &rp->aio_recv);
 	return (0);
 }
@@ -282,10 +284,12 @@ nni_req_getq_cb(void *arg)
 	// We should be in RAW mode. Cooked mode traffic bypasses
 	// the upper write queue entirely, and should never end up here.
 	// If the mode changes, we may briefly deliver a message, but
-	// that's ok (there's an inherent race anyway).
+	// that's ok (there's an inherent race anyway). (One minor
+	// exception: we wind up here in error state when the uwq is closed.)
 
 	if (nni_aio_result(&rp->aio_getq) != 0) {
 		nni_pipe_close(rp->pipe);
+		nni_pipe_decref(rp->pipe);
 		return;
 	}
@@ -301,6 +305,15 @@ static void
 nni_req_sendraw_cb(void *arg)
 {
 	nni_req_pipe *rp = arg;
+	nni_msg *msg;
+
+	if (nni_aio_result(&rp->aio_sendraw) != 0) {
+		nni_msg_free(rp->aio_sendraw.a_msg);
+		rp->aio_sendraw.a_msg = NULL;
+		nni_pipe_close(rp->pipe);
+		nni_pipe_decref(rp->pipe);
+		return;
+	}
 
 	// Sent a message so we just need to look for another one.
 	nni_msgq_aio_get(rp->req->uwq, &rp->aio_getq);
@@ -314,6 +327,17 @@ nni_req_sendcooked_cb(void *arg)
 	nni_req_sock *req = rp->req;
 	nni_mtx *mx = nni_sock_mtx(req->sock);
 
+	if (nni_aio_result(&rp->aio_sendcooked) != 0) {
+		// We failed to send... clean up and deal with it.
+		// We leave ourselves on the busy list for now, which
+		// means no new asynchronous traffic can occur here.
+		nni_msg_free(rp->aio_sendcooked.a_msg);
+		rp->aio_sendcooked.a_msg = NULL;
+		nni_pipe_close(rp->pipe);
+		nni_pipe_decref(rp->pipe);
+		return;
+	}
+
 	// Cooked mode. We completed a cooked send, so we need to
 	// reinsert ourselves in the ready list, and possibly schedule
 	// a resend.
@@ -335,6 +359,7 @@ nni_req_putq_cb(void *arg)
 	if (nni_aio_result(&rp->aio_putq) != 0) {
 		nni_msg_free(rp->aio_putq.a_msg);
 		nni_pipe_close(rp->pipe);
+		nni_pipe_decref(rp->pipe);
 		return;
 	}
 	rp->aio_putq.a_msg = NULL;
@@ -351,6 +376,7 @@ nni_req_recv_cb(void *arg)
 
 	if (nni_aio_result(&rp->aio_recv) != 0) {
 		nni_pipe_close(rp->pipe);
+		nni_pipe_decref(rp->pipe);
 		return;
 	}
 
@@ -381,6 +407,7 @@ nni_req_recv_cb(void *arg)
 
 malformed:
 	nni_msg_free(msg);
 	nni_pipe_close(rp->pipe);
+	nni_pipe_decref(rp->pipe);
 }
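
The pattern repeated throughout this change is the same: a reference is taken on the pipe (`nni_pipe_incref`) before an asynchronous operation is started against it, and dropped (`nni_pipe_decref`) on the error path where the callback chain ends, so a concurrent close cannot free the pipe while a callback still holds a pointer to it. Below is a minimal, self-contained sketch of that reference-counting idea; the names (`pipe_t`, `pipe_hold`, `pipe_rele`, `send_cb`) are hypothetical stand-ins and not the real nng internals.

```c
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

/* A stand-in for the pipe object; the real nni_pipe is far more involved. */
typedef struct {
	atomic_int refcnt;
} pipe_t;

static pipe_t *
pipe_alloc(void)
{
	pipe_t *p = malloc(sizeof(*p));
	atomic_init(&p->refcnt, 1); /* reference owned by the creator */
	return (p);
}

static void
pipe_hold(pipe_t *p)
{
	atomic_fetch_add(&p->refcnt, 1);
}

static void
pipe_rele(pipe_t *p)
{
	/* Whichever thread drops the last reference frees the object. */
	if (atomic_fetch_sub(&p->refcnt, 1) == 1) {
		free(p);
		printf("pipe freed\n");
	}
}

/*
 * Completion callback for an asynchronous send.  On error the callback
 * chain stops, so the reference taken when the send was submitted is
 * dropped here -- mirroring the nni_pipe_close()/nni_pipe_decref()
 * pairs added in the diff above.
 */
static void
send_cb(pipe_t *p, int result)
{
	if (result != 0) {
		pipe_rele(p);
		return;
	}
	/* On success the real code would resubmit another operation and
	 * keep the reference; this demo simply releases it. */
	pipe_rele(p);
}

int
main(void)
{
	pipe_t *p = pipe_alloc();

	pipe_hold(p);   /* reference held on behalf of the pending send */
	send_cb(p, -1); /* simulate the aio completing with an error */

	pipe_rele(p);   /* creator drops its own reference; pipe is freed */
	return (0);
}
```

The key invariant is that every hold is paired with exactly one release, and the pipe is destroyed only when the count reaches zero, regardless of which thread (the closer or a completion callback) drops the last reference.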
