diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-03-10 14:39:21 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-03-10 14:39:21 -0800 |
| commit | c436e174d0ed8c5dc14af060e994b97a83df7750 (patch) | |
| tree | 9eeb7ef18ad6eb1a975ab6aaa7a68bcd3ee81c9a /src/protocol/reqrep | |
| parent | f5c259eec0cd3fa5cd623e159cbfec83b4a500d5 (diff) | |
| download | nng-c436e174d0ed8c5dc14af060e994b97a83df7750.tar.gz nng-c436e174d0ed8c5dc14af060e994b97a83df7750.tar.bz2 nng-c436e174d0ed8c5dc14af060e994b97a83df7750.zip | |
Start of close related race fixes. Scalability test.
Diffstat (limited to 'src/protocol/reqrep')
| -rw-r--r-- | src/protocol/reqrep/rep.c | 14 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 29 |
2 files changed, 38 insertions, 5 deletions
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 2c658ae8..751a851b 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -182,7 +182,9 @@ nni_rep_pipe_add(void *arg) return (rv); } + nni_pipe_incref(rp->pipe); nni_msgq_aio_get(rp->sendq, &rp->aio_getq); + nni_pipe_incref(rp->pipe); nni_pipe_aio_recv(rp->pipe, &rp->aio_recv); return (0); } @@ -194,7 +196,7 @@ nni_rep_pipe_rem(void *arg) nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - nni_msgq_aio_cancel(rp->sendq, &rp->aio_getq); + nni_msgq_close(rp->sendq); nni_msgq_aio_cancel(rep->urq, &rp->aio_putq); nni_idhash_remove(&rep->pipes, nni_pipe_id(rp->pipe)); } @@ -205,7 +207,6 @@ nni_rep_sock_getq_cb(void *arg) { nni_rep_sock *rep = arg; nni_msgq *uwq = rep->uwq; - nni_mtx *mx = nni_sock_mtx(rep->sock); nni_msg *msg; uint8_t *header; uint32_t id; @@ -241,12 +242,12 @@ nni_rep_sock_getq_cb(void *arg) // Look for the pipe, and attempt to put the message there // (nonblocking) if we can. If we can't for any reason, then we // free the message. - nni_mtx_lock(mx); + nni_sock_lock(rep->sock); rv = nni_idhash_find(&rep->pipes, id, (void **) &rp); if (rv == 0) { rv = nni_msgq_tryput(rp->sendq, msg); } - nni_mtx_unlock(mx); + nni_sock_unlock(rep->sock); if (rv != 0) { nni_msg_free(msg); } @@ -263,6 +264,7 @@ nni_rep_pipe_getq_cb(void *arg) if (nni_aio_result(&rp->aio_getq) != 0) { nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } @@ -282,6 +284,7 @@ nni_rep_pipe_send_cb(void *arg) nni_msg_free(rp->aio_send.a_msg); rp->aio_send.a_msg = NULL; nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } @@ -303,6 +306,7 @@ nni_rep_pipe_recv_cb(void *arg) if (nni_aio_result(&rp->aio_recv) != 0) { nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } @@ -353,6 +357,7 @@ malformed: // Failures here are bad enough to warrant to dropping the conn. nni_msg_free(msg); nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); } @@ -365,6 +370,7 @@ nni_rep_pipe_putq_cb(void *arg) nni_msg_free(rp->aio_putq.a_msg); rp->aio_putq.a_msg = NULL; nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index f28db1df..553ef0bf 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -188,7 +188,9 @@ nni_req_pipe_add(void *arg) nni_req_resend(req); } + nni_pipe_incref(rp->pipe); nni_msgq_aio_get(req->uwq, &rp->aio_getq); + nni_pipe_incref(rp->pipe); nni_pipe_aio_recv(rp->pipe, &rp->aio_recv); return (0); } @@ -282,10 +284,12 @@ nni_req_getq_cb(void *arg) // We should be in RAW mode. Cooked mode traffic bypasses // the upper write queue entirely, and should never end up here. // If the mode changes, we may briefly deliver a message, but - // that's ok (there's an inherent race anyway). + // that's ok (there's an inherent race anyway). (One minor + // exception: we wind up here in error state when the uwq is closed.) if (nni_aio_result(&rp->aio_getq) != 0) { nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } @@ -301,6 +305,15 @@ static void nni_req_sendraw_cb(void *arg) { nni_req_pipe *rp = arg; + nni_msg *msg; + + if (nni_aio_result(&rp->aio_sendraw) != 0) { + nni_msg_free(rp->aio_sendraw.a_msg); + rp->aio_sendraw.a_msg = NULL; + nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); + return; + } // Sent a message so we just need to look for another one. nni_msgq_aio_get(rp->req->uwq, &rp->aio_getq); @@ -314,6 +327,17 @@ nni_req_sendcooked_cb(void *arg) nni_req_sock *req = rp->req; nni_mtx *mx = nni_sock_mtx(req->sock); + if (nni_aio_result(&rp->aio_sendcooked) != 0) { + // We failed to send... clean up and deal with it. + // We leave ourselves on the busy list for now, which + // means no new asynchronous traffic can occur here. + nni_msg_free(rp->aio_sendcooked.a_msg); + rp->aio_sendcooked.a_msg = NULL; + nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); + return; + } + // Cooked mode. We completed a cooked send, so we need to // reinsert ourselves in the ready list, and possibly schedule // a resend. @@ -335,6 +359,7 @@ nni_req_putq_cb(void *arg) if (nni_aio_result(&rp->aio_putq) != 0) { nni_msg_free(rp->aio_putq.a_msg); nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } rp->aio_putq.a_msg = NULL; @@ -351,6 +376,7 @@ nni_req_recv_cb(void *arg) if (nni_aio_result(&rp->aio_recv) != 0) { nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } @@ -381,6 +407,7 @@ nni_req_recv_cb(void *arg) malformed: nni_msg_free(msg); nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); } |
