From bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 7 Jan 2017 21:49:48 -0800 Subject: Simplify locking for protocols. In an attempt to simplify the protocol implementation, and hopefully track down a close related race, we've made it so that most protocols need not worry about locks, and can access the socket lock if they do need a lock. They also let the socket manage their workers, for the most part. (The req protocol is special, since it needs a top level work distributor, *and* a resender.) --- src/protocol/reqrep/req.c | 93 ++++++++++++++++++++--------------------------- 1 file changed, 40 insertions(+), 53 deletions(-) (limited to 'src/protocol/reqrep/req.c') diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 1c58d7a1..d8104342 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -22,7 +22,6 @@ typedef struct nni_req_sock nni_req_sock; // An nni_req_sock is our per-socket protocol private structure. struct nni_req_sock { nni_sock * sock; - nni_mtx mx; nni_cv cv; nni_msgq * uwq; nni_msgq * urq; @@ -48,7 +47,7 @@ struct nni_req_pipe { static void nni_req_resender(void *); static int -nni_req_init(void **reqp, nni_sock *sock) +nni_req_sock_init(void **reqp, nni_sock *sock) { nni_req_sock *req; int rv; @@ -56,12 +55,7 @@ nni_req_init(void **reqp, nni_sock *sock) if ((req = NNI_ALLOC_STRUCT(req)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&req->mx)) != 0) { - NNI_FREE_STRUCT(req); - return (rv); - } - if ((rv = nni_cv_init(&req->cv, &req->mx)) != 0) { - nni_mtx_fini(&req->mx); + if ((rv = nni_cv_init(&req->cv, nni_sock_mtx(sock))) != 0) { NNI_FREE_STRUCT(req); return (rv); } @@ -81,7 +75,6 @@ nni_req_init(void **reqp, nni_sock *sock) rv = nni_thr_init(&req->resender, nni_req_resender, req); if (rv != 0) { nni_cv_fini(&req->cv); - nni_mtx_fini(&req->mx); NNI_FREE_STRUCT(req); return (rv); } @@ -91,20 +84,24 @@ nni_req_init(void **reqp, nni_sock *sock) static void -nni_req_fini(void *arg) +nni_req_sock_close(void *arg) { nni_req_sock *req = arg; // Shut down the resender. We request it to exit by clearing // its old value, then kick it. - nni_mtx_lock(&req->mx); req->closing = 1; nni_cv_wake(&req->cv); - nni_mtx_unlock(&req->mx); +} + + +static void +nni_req_sock_fini(void *arg) +{ + nni_req_sock *req = arg; nni_thr_fini(&req->resender); nni_cv_fini(&req->cv); - nni_mtx_fini(&req->mx); if (req->reqmsg != NULL) { nni_msg_free(req->reqmsg); } @@ -168,15 +165,16 @@ nni_req_pipe_send(void *arg) nni_msgq *uwq = req->uwq; nni_msgq *urq = req->urq; nni_pipe *pipe = rp->pipe; + nni_mtx *mx = nni_sock_mtx(req->sock); nni_msg *msg; int rv; for (;;) { - nni_mtx_lock(&req->mx); + nni_mtx_lock(mx); if ((msg = req->retrymsg) != NULL) { req->retrymsg = NULL; } - nni_mtx_unlock(&req->mx); + nni_mtx_unlock(mx); if (msg == NULL) { rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose); if (rv != 0) { @@ -237,21 +235,17 @@ nni_req_pipe_recv(void *arg) static int -nni_req_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_req_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_req_sock *req = arg; int rv; switch (opt) { case NNG_OPT_RESENDTIME: - nni_mtx_lock(&req->mx); rv = nni_setopt_duration(&req->retry, buf, sz); - nni_mtx_unlock(&req->mx); break; case NNG_OPT_RAW: - nni_mtx_lock(&req->mx); rv = nni_setopt_int(&req->raw, buf, sz, 0, 1); - nni_mtx_unlock(&req->mx); break; default: rv = NNG_ENOTSUP; @@ -261,21 +255,17 @@ nni_req_setopt(void *arg, int opt, const void *buf, size_t sz) static int -nni_req_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_req_sock *req = arg; int rv; switch (opt) { case NNG_OPT_RESENDTIME: - nni_mtx_lock(&req->mx); rv = nni_getopt_duration(&req->retry, buf, szp); - nni_mtx_unlock(&req->mx); break; case NNG_OPT_RAW: - nni_mtx_lock(&req->mx); rv = nni_getopt_int(&req->raw, buf, szp); - nni_mtx_unlock(&req->mx); break; default: rv = NNG_ENOTSUP; @@ -288,17 +278,18 @@ static void nni_req_resender(void *arg) { nni_req_sock *req = arg; + nni_mtx *mx = nni_sock_mtx(req->sock); int rv; for (;;) { - nni_mtx_lock(&req->mx); + nni_mtx_lock(mx); if (req->closing) { - nni_mtx_unlock(&req->mx); + nni_mtx_unlock(mx); return; } if (req->reqmsg == NULL) { nni_cv_wait(&req->cv); - nni_mtx_unlock(&req->mx); + nni_mtx_unlock(mx); continue; } rv = nni_cv_until(&req->cv, req->resend); @@ -309,22 +300,20 @@ nni_req_resender(void *arg) } req->resend = nni_clock() + req->retry; } - nni_mtx_unlock(&req->mx); + nni_mtx_unlock(mx); } } static nni_msg * -nni_req_sendfilter(void *arg, nni_msg *msg) +nni_req_sock_sfilter(void *arg, nni_msg *msg) { nni_req_sock *req = arg; uint32_t id; - nni_mtx_lock(&req->mx); if (req->raw) { // No automatic retry, and the request ID must // be in the header coming down. - nni_mtx_unlock(&req->mx); return (msg); } @@ -338,7 +327,6 @@ nni_req_sendfilter(void *arg, nni_msg *msg) if (nni_msg_append_header(msg, req->reqid, 4) != 0) { // Should be ENOMEM. - nni_mtx_unlock(&req->mx); nni_msg_free(msg); return (NULL); } @@ -351,7 +339,6 @@ nni_req_sendfilter(void *arg, nni_msg *msg) // Make a duplicate message... for retries. if (nni_msg_dup(&req->reqmsg, msg) != 0) { - nni_mtx_unlock(&req->mx); nni_msg_free(msg); return (NULL); } @@ -362,39 +349,33 @@ nni_req_sendfilter(void *arg, nni_msg *msg) // Clear the error condition. nni_sock_recverr(req->sock, 0); - nni_mtx_unlock(&req->mx); return (msg); } static nni_msg * -nni_req_recvfilter(void *arg, nni_msg *msg) +nni_req_sock_rfilter(void *arg, nni_msg *msg) { nni_req_sock *req = arg; - nni_mtx_lock(&req->mx); if (req->raw) { // Pass it unmolested - nni_mtx_unlock(&req->mx); return (msg); } if (nni_msg_header_len(msg) < 4) { - nni_mtx_unlock(&req->mx); nni_msg_free(msg); return (NULL); } if (req->reqmsg == NULL) { // We had no outstanding request. - nni_mtx_unlock(&req->mx); nni_msg_free(msg); return (NULL); } if (memcmp(nni_msg_header(msg), req->reqid, 4) != 0) { // Wrong request id - nni_mtx_unlock(&req->mx); nni_msg_free(msg); return (NULL); } @@ -403,14 +384,13 @@ nni_req_recvfilter(void *arg, nni_msg *msg) nni_msg_free(req->reqmsg); req->reqmsg = NULL; nni_cv_wake(&req->cv); - nni_mtx_unlock(&req->mx); return (msg); } // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_req_proto_pipe = { +static nni_proto_pipe_ops nni_req_pipe_ops = { .pipe_init = nni_req_pipe_init, .pipe_fini = nni_req_pipe_fini, .pipe_add = nni_req_pipe_add, @@ -419,15 +399,22 @@ static nni_proto_pipe nni_req_proto_pipe = { .pipe_recv = nni_req_pipe_recv, }; +static nni_proto_sock_ops nni_req_sock_ops = { + .sock_init = nni_req_sock_init, + .sock_fini = nni_req_sock_fini, + .sock_close = nni_req_sock_close, + .sock_setopt = nni_req_sock_setopt, + .sock_getopt = nni_req_sock_getopt, + .sock_rfilter = nni_req_sock_rfilter, + .sock_sfilter = nni_req_sock_sfilter, + .sock_send = NULL, + .sock_recv = NULL, +}; + nni_proto nni_req_proto = { - .proto_self = NNG_PROTO_REQ, - .proto_peer = NNG_PROTO_REP, - .proto_name = "req", - .proto_pipe = &nni_req_proto_pipe, - .proto_init = nni_req_init, - .proto_fini = nni_req_fini, - .proto_setopt = nni_req_setopt, - .proto_getopt = nni_req_getopt, - .proto_recv_filter = nni_req_recvfilter, - .proto_send_filter = nni_req_sendfilter, + .proto_self = NNG_PROTO_REQ, + .proto_peer = NNG_PROTO_REP, + .proto_name = "req", + .proto_sock_ops = &nni_req_sock_ops, + .proto_pipe_ops = &nni_req_pipe_ops, }; -- cgit v1.2.3-70-g09d2