From 0a51aa7bfc88d55b98fdde0d497b072e6911457d Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 24 Jun 2017 14:11:35 -0700 Subject: Protocols keep their own reference counts. --- src/protocol/reqrep/rep.c | 100 +++++++++++++++++++++++++--------------- src/protocol/reqrep/req.c | 113 ++++++++++++++++++++++++++++++---------------- 2 files changed, 136 insertions(+), 77 deletions(-) (limited to 'src/protocol/reqrep') diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 822758ef..507edf66 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -38,6 +38,7 @@ struct nni_rep_sock { char * btrace; size_t btrace_len; nni_aio aio_getq; + nni_mtx mtx; }; // An nni_rep_pipe is our per-pipe protocol private structure. @@ -51,8 +52,25 @@ struct nni_rep_pipe { nni_aio aio_recv; nni_aio aio_putq; int running; + int refcnt; + nni_mtx mtx; }; +static void +nni_rep_sock_fini(void *arg) +{ + nni_rep_sock *rep = arg; + + nni_aio_fini(&rep->aio_getq); + nni_idhash_fini(&rep->pipes); + if (rep->btrace != NULL) { + nni_free(rep->btrace, rep->btrace_len); + } + nni_mtx_fini(&rep->mtx); + NNI_FREE_STRUCT(rep); +} + + static int nni_rep_sock_init(void **repp, nni_sock *sock) { @@ -67,15 +85,14 @@ nni_rep_sock_init(void **repp, nni_sock *sock) rep->raw = 0; rep->btrace = NULL; rep->btrace_len = 0; - if ((rv = nni_idhash_init(&rep->pipes)) != 0) { - NNI_FREE_STRUCT(rep); - return (rv); + if (((rv = nni_mtx_init(&rep->mtx)) != 0) || + ((rv = nni_idhash_init(&rep->pipes)) != 0)) { + goto fail; } rv = nni_aio_init(&rep->aio_getq, nni_rep_sock_getq_cb, rep); if (rv != 0) { - nni_idhash_fini(&rep->pipes); - return (rv); + goto fail; } rep->uwq = nni_sock_sendq(sock); @@ -85,6 +102,10 @@ nni_rep_sock_init(void **repp, nni_sock *sock) nni_sock_senderr(sock, NNG_ESTATE); return (0); + +fail: + nni_rep_sock_fini(rep); + return (rv); } @@ -106,20 +127,6 @@ nni_rep_sock_close(void *arg) } -static void -nni_rep_sock_fini(void *arg) -{ - nni_rep_sock *rep = arg; - - nni_aio_fini(&rep->aio_getq); - nni_idhash_fini(&rep->pipes); - if (rep->btrace != NULL) { - nni_free(rep->btrace, rep->btrace_len); - } - NNI_FREE_STRUCT(rep); -} - - static int nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) { @@ -129,7 +136,8 @@ nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) { + if (((rv = nni_msgq_init(&rp->sendq, 2)) != 0) || + ((rv = nni_mtx_init(&rp->mtx)) != 0)) { goto fail; } if ((rv = nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp)) != 0) { @@ -165,6 +173,7 @@ nni_rep_pipe_fini(void *arg) nni_aio_fini(&rp->aio_send); nni_aio_fini(&rp->aio_recv); nni_aio_fini(&rp->aio_putq); + nni_mtx_fini(&rp->mtx); NNI_FREE_STRUCT(rp); } @@ -177,31 +186,53 @@ nni_rep_pipe_start(void *arg) int rv; rp->id = nni_pipe_id(rp->pipe); + + nni_mtx_lock(&rep->mtx); rv = nni_idhash_insert(&rep->pipes, rp->id, rp); + nni_mtx_unlock(&rep->mtx); if (rv != 0) { return (rv); } - nni_pipe_hold(rp->pipe); + nni_mtx_lock(&rp->mtx); + rp->refcnt = 2; + rp->running = 1; + nni_mtx_unlock(&rp->mtx); + nni_msgq_aio_get(rp->sendq, &rp->aio_getq); - nni_pipe_hold(rp->pipe); nni_pipe_aio_recv(rp->pipe, &rp->aio_recv); - rp->running = 1; return (0); } static void -nni_rep_pipe_stop(void *arg) +nni_rep_pipe_stop(nni_rep_pipe *rp) { - nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; + int refcnt; + uint32_t id; + nni_mtx_lock(&rp->mtx); + NNI_ASSERT(rp->refcnt > 0); + rp->refcnt--; + refcnt = rp->refcnt; + id = rp->id; + rp->id = 0; if (rp->running) { rp->running = 0; nni_msgq_close(rp->sendq); nni_msgq_aio_cancel(rep->urq, &rp->aio_putq); - nni_idhash_remove(&rep->pipes, rp->id); + } + nni_mtx_unlock(&rp->mtx); + + if (id != 0) { + nni_mtx_lock(&rep->mtx); + nni_idhash_remove(&rep->pipes, id); + nni_mtx_unlock(&rep->mtx); + } + + if (refcnt == 0) { + nni_pipe_remove(rp->pipe); } } @@ -246,12 +277,12 @@ nni_rep_sock_getq_cb(void *arg) // Look for the pipe, and attempt to put the message there // (nonblocking) if we can. If we can't for any reason, then we // free the message. - nni_sock_lock(rep->sock); + nni_mtx_lock(&rep->mtx); rv = nni_idhash_find(&rep->pipes, id, (void **) &rp); + nni_mtx_unlock(&rep->mtx); if (rv == 0) { rv = nni_msgq_tryput(rp->sendq, msg); } - nni_sock_unlock(rep->sock); if (rv != 0) { nni_msg_free(msg); } @@ -267,8 +298,7 @@ nni_rep_pipe_getq_cb(void *arg) nni_rep_pipe *rp = arg; if (nni_aio_result(&rp->aio_getq) != 0) { - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_rep_pipe_stop(rp); return; } @@ -287,8 +317,7 @@ nni_rep_pipe_send_cb(void *arg) if (nni_aio_result(&rp->aio_send) != 0) { nni_msg_free(rp->aio_send.a_msg); rp->aio_send.a_msg = NULL; - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_rep_pipe_stop(rp); return; } @@ -308,8 +337,7 @@ nni_rep_pipe_recv_cb(void *arg) int hops; if (nni_aio_result(&rp->aio_recv) != 0) { - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_rep_pipe_stop(rp); return; } @@ -371,8 +399,7 @@ nni_rep_pipe_putq_cb(void *arg) if (nni_aio_result(&rp->aio_putq) != 0) { nni_msg_free(rp->aio_putq.a_msg); rp->aio_putq.a_msg = NULL; - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_rep_pipe_stop(rp); return; } @@ -494,7 +521,6 @@ static nni_proto_pipe_ops nni_rep_pipe_ops = { .pipe_init = nni_rep_pipe_init, .pipe_fini = nni_rep_pipe_fini, .pipe_start = nni_rep_pipe_start, - .pipe_stop = nni_rep_pipe_stop, }; static nni_proto_sock_ops nni_rep_sock_ops = { diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 8268ecd6..519b224e 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -44,6 +44,7 @@ struct nni_req_sock { uint32_t nextid; // next id uint8_t reqid[4]; // outstanding request ID (big endian) + nni_mtx mtx; }; // An nni_req_pipe is our per-pipe protocol private structure. @@ -57,6 +58,8 @@ struct nni_req_pipe { nni_aio aio_recv; nni_aio aio_putq; int running; + int refcnt; + nni_mtx mtx; }; static void nni_req_resender(void *); @@ -75,6 +78,11 @@ nni_req_sock_init(void **reqp, nni_sock *sock) if ((req = NNI_ALLOC_STRUCT(req)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_mtx_init(&req->mtx)) != 0) { + NNI_FREE_STRUCT(req); + return (rv); + } + NNI_LIST_INIT(&req->readypipes, nni_req_pipe, node); NNI_LIST_INIT(&req->busypipes, nni_req_pipe, node); nni_timer_init(&req->timer, nni_req_timeout, req); @@ -114,6 +122,7 @@ nni_req_sock_fini(void *arg) if (req->reqmsg != NULL) { nni_msg_free(req->reqmsg); } + nni_mtx_fini(&req->mtx); NNI_FREE_STRUCT(req); } @@ -127,6 +136,9 @@ nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_mtx_init(&rp->mtx)) != 0) { + goto failed; + } if ((rv = nni_aio_init(&rp->aio_getq, nni_req_getq_cb, rp)) != 0) { goto failed; } @@ -168,6 +180,7 @@ nni_req_pipe_fini(void *arg) nni_aio_fini(&rp->aio_recv); nni_aio_fini(&rp->aio_sendcooked); nni_aio_fini(&rp->aio_sendraw); + nni_mtx_fini(&rp->mtx); NNI_FREE_STRUCT(rp); } } @@ -182,45 +195,61 @@ nni_req_pipe_start(void *arg) if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) { return (NNG_EPROTO); } + nni_mtx_lock(&req->mtx); nni_list_append(&req->readypipes, rp); if (req->wantw) { nni_req_resend(req); } + nni_mtx_unlock(&req->mtx); + + nni_mtx_lock(&rp->mtx); + rp->refcnt = 2; + rp->running = 1; + nni_mtx_unlock(&rp->mtx); - nni_pipe_hold(rp->pipe); nni_msgq_aio_get(req->uwq, &rp->aio_getq); - nni_pipe_hold(rp->pipe); nni_pipe_aio_recv(rp->pipe, &rp->aio_recv); - rp->running = 1; return (0); } static void -nni_req_pipe_stop(void *arg) +nni_req_pipe_stop(nni_req_pipe *rp) { - nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; + int refcnt; + int running; - if (!rp->running) { - return; - } + nni_mtx_lock(&rp->mtx); + running = rp->running; rp->running = 0; + NNI_ASSERT(rp->refcnt > 0); + rp->refcnt--; + refcnt = rp->refcnt; + nni_mtx_unlock(&rp->mtx); + + if (running) { + nni_mtx_lock(&req->mtx); + // This removes the node from either busypipes or readypipes. + // It doesn't much matter which. + nni_list_remove(&req->readypipes, rp); - // This removes the node from either busypipes or readypipes. - // It doesn't much matter which. - nni_list_remove(&req->readypipes, rp); - - if ((rp == req->pendpipe) && (req->reqmsg != NULL)) { - // we are removing the pipe we sent the last request on... - // schedule immediate resend. - req->resend = NNI_TIME_ZERO; - req->wantw = 1; - nni_req_resend(req); + if ((rp == req->pendpipe) && (req->reqmsg != NULL)) { + // removing the pipe we sent the last request on... + // schedule immediate resend. + req->resend = NNI_TIME_ZERO; + req->wantw = 1; + nni_req_resend(req); + } + nni_mtx_unlock(&req->mtx); } nni_msgq_aio_cancel(req->uwq, &rp->aio_getq); nni_msgq_aio_cancel(req->urq, &rp->aio_putq); + + if (refcnt == 0) { + nni_pipe_remove(rp->pipe); + } } @@ -293,8 +322,7 @@ nni_req_getq_cb(void *arg) // exception: we wind up here in error state when the uwq is closed.) if (nni_aio_result(&rp->aio_getq) != 0) { - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_req_pipe_stop(rp); return; } @@ -315,8 +343,7 @@ nni_req_sendraw_cb(void *arg) if (nni_aio_result(&rp->aio_sendraw) != 0) { nni_msg_free(rp->aio_sendraw.a_msg); rp->aio_sendraw.a_msg = NULL; - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_req_pipe_stop(rp); return; } @@ -338,8 +365,7 @@ nni_req_sendcooked_cb(void *arg) // means no new asynchronous traffic can occur here. nni_msg_free(rp->aio_sendcooked.a_msg); rp->aio_sendcooked.a_msg = NULL; - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_req_pipe_stop(rp); return; } @@ -347,12 +373,12 @@ nni_req_sendcooked_cb(void *arg) // reinsert ourselves in the ready list, and possibly schedule // a resend. - nni_mtx_lock(mx); + nni_mtx_lock(&req->mtx); nni_list_remove(&req->busypipes, rp); nni_list_append(&req->readypipes, rp); nni_req_resend(req); - nni_mtx_unlock(mx); + nni_mtx_unlock(&req->mtx); } @@ -363,8 +389,7 @@ nni_req_putq_cb(void *arg) if (nni_aio_result(&rp->aio_putq) != 0) { nni_msg_free(rp->aio_putq.a_msg); - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_req_pipe_stop(rp); return; } rp->aio_putq.a_msg = NULL; @@ -380,8 +405,7 @@ nni_req_recv_cb(void *arg) nni_msg *msg; if (nni_aio_result(&rp->aio_recv) != 0) { - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_req_pipe_stop(rp); return; } @@ -411,8 +435,7 @@ nni_req_recv_cb(void *arg) malformed: nni_msg_free(msg); - nni_pipe_close(rp->pipe); - nni_pipe_rele(rp->pipe); + nni_req_pipe_stop(rp); } @@ -420,14 +443,13 @@ static void nni_req_timeout(void *arg) { nni_req_sock *req = arg; - nni_mtx *mx = nni_sock_mtx(req->sock); - nni_mtx_lock(mx); + nni_mtx_lock(&req->mtx); if (req->reqmsg != NULL) { req->wantw = 1; nni_req_resend(req); } - nni_mtx_unlock(mx); + nni_mtx_unlock(&req->mtx); } @@ -435,7 +457,6 @@ static void nni_req_resend(nni_req_sock *req) { nni_req_pipe *rp; - nni_mtx *mx = nni_sock_mtx(req->sock); nni_msg *msg; int i; @@ -512,6 +533,10 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg) return (NULL); } + // NB: The socket lock is also held, so this is always self-serialized. + // But we have to serialize against other async callbacks. + nni_mtx_lock(&req->mtx); + // If another message is there, this cancels it. if (req->reqmsg != NULL) { nni_msg_free(req->reqmsg); @@ -525,6 +550,7 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg) req->wantw = 1; nni_req_resend(req); + nni_mtx_unlock(&req->mtx); // Clear the error condition. nni_sock_recverr(req->sock, 0); @@ -537,6 +563,7 @@ static nni_msg * nni_req_sock_rfilter(void *arg, nni_msg *msg) { nni_req_sock *req = arg; + nni_msg *rmsg; if (req->raw) { // Pass it unmolested @@ -548,21 +575,28 @@ nni_req_sock_rfilter(void *arg, nni_msg *msg) return (NULL); } - if (req->reqmsg == NULL) { + nni_mtx_lock(&req->mtx); + + if ((rmsg = req->reqmsg) == NULL) { // We had no outstanding request. + nni_mtx_unlock(&req->mtx); nni_msg_free(msg); return (NULL); } if (memcmp(nni_msg_header(msg), req->reqid, 4) != 0) { // Wrong request id + nni_mtx_unlock(&req->mtx); nni_msg_free(msg); return (NULL); } - nni_sock_recverr(req->sock, NNG_ESTATE); - nni_msg_free(req->reqmsg); req->reqmsg = NULL; req->pendpipe = NULL; + nni_mtx_unlock(&req->mtx); + + nni_sock_recverr(req->sock, NNG_ESTATE); + nni_msg_free(rmsg); + return (msg); } @@ -573,7 +607,6 @@ static nni_proto_pipe_ops nni_req_pipe_ops = { .pipe_init = nni_req_pipe_init, .pipe_fini = nni_req_pipe_fini, .pipe_start = nni_req_pipe_start, - .pipe_stop = nni_req_pipe_stop, }; static nni_proto_sock_ops nni_req_sock_ops = { -- cgit v1.2.3-70-g09d2