diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-03-10 14:39:21 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-03-10 14:39:21 -0800 |
| commit | c436e174d0ed8c5dc14af060e994b97a83df7750 (patch) | |
| tree | 9eeb7ef18ad6eb1a975ab6aaa7a68bcd3ee81c9a /src/protocol | |
| parent | f5c259eec0cd3fa5cd623e159cbfec83b4a500d5 (diff) | |
| download | nng-c436e174d0ed8c5dc14af060e994b97a83df7750.tar.gz nng-c436e174d0ed8c5dc14af060e994b97a83df7750.tar.bz2 nng-c436e174d0ed8c5dc14af060e994b97a83df7750.zip | |
Start of close related race fixes. Scalability test.
Diffstat (limited to 'src/protocol')
| -rw-r--r-- | src/protocol/pair/pair.c | 39 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 14 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 29 |
3 files changed, 61 insertions, 21 deletions
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index e5e2e17b..65eabd87 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -23,6 +23,7 @@ static void nni_pair_send_cb(void *); static void nni_pair_recv_cb(void *); static void nni_pair_getq_cb(void *); static void nni_pair_putq_cb(void *); +static void nni_pair_pipe_fini(void *); // An nni_pair_sock is our per-socket protocol private structure. struct nni_pair_sock { @@ -44,11 +45,10 @@ struct nni_pair_pipe { nni_aio aio_recv; nni_aio aio_getq; nni_aio aio_putq; + int busy; + int closed; }; -static void nni_pair_receiver(void *); -static void nni_pair_sender(void *); - static int nni_pair_sock_init(void **sp, nni_sock *nsock) { @@ -90,22 +90,22 @@ nni_pair_pipe_init(void **pp, nni_pipe *npipe, void *psock) rv = nni_aio_init(&ppipe->aio_send, nni_pair_send_cb, ppipe); if (rv != 0) { - nni_pair_sock_fini(ppipe); + nni_pair_pipe_fini(ppipe); return (rv); } rv = nni_aio_init(&ppipe->aio_recv, nni_pair_recv_cb, ppipe); if (rv != 0) { - nni_pair_sock_fini(ppipe); + nni_pair_pipe_fini(ppipe); return (rv); } rv = nni_aio_init(&ppipe->aio_getq, nni_pair_getq_cb, ppipe); if (rv != 0) { - nni_pair_sock_fini(ppipe); + nni_pair_pipe_fini(ppipe); return (rv); } rv = nni_aio_init(&ppipe->aio_putq, nni_pair_putq_cb, ppipe); if (rv != 0) { - nni_pair_sock_fini(ppipe); + nni_pair_pipe_fini(ppipe); return (rv); } ppipe->npipe = npipe; @@ -120,13 +120,12 @@ nni_pair_pipe_fini(void *arg) { nni_pair_pipe *ppipe = arg; - if (ppipe != NULL) { - nni_aio_fini(&ppipe->aio_send); - nni_aio_fini(&ppipe->aio_recv); - nni_aio_fini(&ppipe->aio_putq); - nni_aio_fini(&ppipe->aio_getq); - NNI_FREE_STRUCT(ppipe); - } + NNI_ASSERT(ppipe->busy >= 0); + nni_aio_fini(&ppipe->aio_send); + nni_aio_fini(&ppipe->aio_recv); + nni_aio_fini(&ppipe->aio_putq); + nni_aio_fini(&ppipe->aio_getq); + NNI_FREE_STRUCT(ppipe); } @@ -142,7 +141,10 @@ nni_pair_pipe_add(void *arg) psock->ppipe = ppipe; // Schedule a getq on the upper, and a read from the pipe. + // Each of these also sets up another hold on the pipe itself. + nni_pipe_incref(ppipe->npipe); nni_msgq_aio_get(psock->uwq, &ppipe->aio_getq); + nni_pipe_incref(ppipe->npipe); nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv); return (0); @@ -155,9 +157,10 @@ nni_pair_pipe_rem(void *arg) nni_pair_pipe *ppipe = arg; nni_pair_sock *psock = ppipe->psock; + nni_msgq_aio_cancel(psock->uwq, &ppipe->aio_getq); + nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq); + if (psock->ppipe == ppipe) { - nni_msgq_aio_cancel(psock->uwq, &ppipe->aio_getq); - nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq); psock->ppipe = NULL; } } @@ -171,6 +174,7 @@ nni_pair_recv_cb(void *arg) if (nni_aio_result(&ppipe->aio_recv) != 0) { nni_pipe_close(ppipe->npipe); + nni_pipe_decref(ppipe->npipe); return; } @@ -189,6 +193,7 @@ nni_pair_putq_cb(void *arg) nni_msg_free(ppipe->aio_putq.a_msg); ppipe->aio_putq.a_msg = NULL; nni_pipe_close(ppipe->npipe); + nni_pipe_decref(ppipe->npipe); return; } nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv); @@ -204,6 +209,7 @@ nni_pair_getq_cb(void *arg) if (nni_aio_result(&ppipe->aio_getq) != 0) { nni_pipe_close(ppipe->npipe); + nni_pipe_decref(ppipe->npipe); return; } @@ -223,6 +229,7 @@ nni_pair_send_cb(void *arg) nni_msg_free(ppipe->aio_send.a_msg); ppipe->aio_send.a_msg = NULL; nni_pipe_close(ppipe->npipe); + nni_pipe_decref(ppipe->npipe); return; } diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 2c658ae8..751a851b 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -182,7 +182,9 @@ nni_rep_pipe_add(void *arg) return (rv); } + nni_pipe_incref(rp->pipe); nni_msgq_aio_get(rp->sendq, &rp->aio_getq); + nni_pipe_incref(rp->pipe); nni_pipe_aio_recv(rp->pipe, &rp->aio_recv); return (0); } @@ -194,7 +196,7 @@ nni_rep_pipe_rem(void *arg) nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - nni_msgq_aio_cancel(rp->sendq, &rp->aio_getq); + nni_msgq_close(rp->sendq); nni_msgq_aio_cancel(rep->urq, &rp->aio_putq); nni_idhash_remove(&rep->pipes, nni_pipe_id(rp->pipe)); } @@ -205,7 +207,6 @@ nni_rep_sock_getq_cb(void *arg) { nni_rep_sock *rep = arg; nni_msgq *uwq = rep->uwq; - nni_mtx *mx = nni_sock_mtx(rep->sock); nni_msg *msg; uint8_t *header; uint32_t id; @@ -241,12 +242,12 @@ nni_rep_sock_getq_cb(void *arg) // Look for the pipe, and attempt to put the message there // (nonblocking) if we can. If we can't for any reason, then we // free the message. - nni_mtx_lock(mx); + nni_sock_lock(rep->sock); rv = nni_idhash_find(&rep->pipes, id, (void **) &rp); if (rv == 0) { rv = nni_msgq_tryput(rp->sendq, msg); } - nni_mtx_unlock(mx); + nni_sock_unlock(rep->sock); if (rv != 0) { nni_msg_free(msg); } @@ -263,6 +264,7 @@ nni_rep_pipe_getq_cb(void *arg) if (nni_aio_result(&rp->aio_getq) != 0) { nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } @@ -282,6 +284,7 @@ nni_rep_pipe_send_cb(void *arg) nni_msg_free(rp->aio_send.a_msg); rp->aio_send.a_msg = NULL; nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } @@ -303,6 +306,7 @@ nni_rep_pipe_recv_cb(void *arg) if (nni_aio_result(&rp->aio_recv) != 0) { nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } @@ -353,6 +357,7 @@ malformed: // Failures here are bad enough to warrant to dropping the conn. nni_msg_free(msg); nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); } @@ -365,6 +370,7 @@ nni_rep_pipe_putq_cb(void *arg) nni_msg_free(rp->aio_putq.a_msg); rp->aio_putq.a_msg = NULL; nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index f28db1df..553ef0bf 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -188,7 +188,9 @@ nni_req_pipe_add(void *arg) nni_req_resend(req); } + nni_pipe_incref(rp->pipe); nni_msgq_aio_get(req->uwq, &rp->aio_getq); + nni_pipe_incref(rp->pipe); nni_pipe_aio_recv(rp->pipe, &rp->aio_recv); return (0); } @@ -282,10 +284,12 @@ nni_req_getq_cb(void *arg) // We should be in RAW mode. Cooked mode traffic bypasses // the upper write queue entirely, and should never end up here. // If the mode changes, we may briefly deliver a message, but - // that's ok (there's an inherent race anyway). + // that's ok (there's an inherent race anyway). (One minor + // exception: we wind up here in error state when the uwq is closed.) if (nni_aio_result(&rp->aio_getq) != 0) { nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } @@ -301,6 +305,15 @@ static void nni_req_sendraw_cb(void *arg) { nni_req_pipe *rp = arg; + nni_msg *msg; + + if (nni_aio_result(&rp->aio_sendraw) != 0) { + nni_msg_free(rp->aio_sendraw.a_msg); + rp->aio_sendraw.a_msg = NULL; + nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); + return; + } // Sent a message so we just need to look for another one. nni_msgq_aio_get(rp->req->uwq, &rp->aio_getq); @@ -314,6 +327,17 @@ nni_req_sendcooked_cb(void *arg) nni_req_sock *req = rp->req; nni_mtx *mx = nni_sock_mtx(req->sock); + if (nni_aio_result(&rp->aio_sendcooked) != 0) { + // We failed to send... clean up and deal with it. + // We leave ourselves on the busy list for now, which + // means no new asynchronous traffic can occur here. + nni_msg_free(rp->aio_sendcooked.a_msg); + rp->aio_sendcooked.a_msg = NULL; + nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); + return; + } + // Cooked mode. We completed a cooked send, so we need to // reinsert ourselves in the ready list, and possibly schedule // a resend. @@ -335,6 +359,7 @@ nni_req_putq_cb(void *arg) if (nni_aio_result(&rp->aio_putq) != 0) { nni_msg_free(rp->aio_putq.a_msg); nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } rp->aio_putq.a_msg = NULL; @@ -351,6 +376,7 @@ nni_req_recv_cb(void *arg) if (nni_aio_result(&rp->aio_recv) != 0) { nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } @@ -381,6 +407,7 @@ nni_req_recv_cb(void *arg) malformed: nni_msg_free(msg); nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); } |
