aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-07 21:49:48 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-07 21:52:30 -0800
commitbc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1 (patch)
tree55ca7c800e9dfa54bb58b3f2323b1cb5996fab09 /src/protocol/reqrep
parentffdceebc19214f384f1b1b6b358f1b2301384135 (diff)
downloadnng-bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1.tar.gz
nng-bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1.tar.bz2
nng-bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1.zip
Simplify locking for protocols.
In an attempt to simplify the protocol implementation, and hopefully track down a close related race, we've made it so that most protocols need not worry about locks, and can access the socket lock if they do need a lock. They also let the socket manage their workers, for the most part. (The req protocol is special, since it needs a top level work distributor, *and* a resender.)
Diffstat (limited to 'src/protocol/reqrep')
-rw-r--r--src/protocol/reqrep/rep.c99
-rw-r--r--src/protocol/reqrep/req.c93
2 files changed, 71 insertions, 121 deletions
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 56ee2367..8de196c4 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -22,7 +22,6 @@ typedef struct nni_rep_sock nni_rep_sock;
// An nni_rep_sock is our per-socket protocol private structure.
struct nni_rep_sock {
nni_sock * sock;
- nni_mtx mx;
nni_msgq * uwq;
nni_msgq * urq;
int raw;
@@ -44,7 +43,7 @@ struct nni_rep_pipe {
static void nni_rep_topsender(void *);
static int
-nni_rep_init(void **repp, nni_sock *sock)
+nni_rep_sock_init(void **repp, nni_sock *sock)
{
nni_rep_sock *rep;
int rv;
@@ -52,17 +51,12 @@ nni_rep_init(void **repp, nni_sock *sock)
if ((rep = NNI_ALLOC_STRUCT(rep)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&rep->mx)) != 0) {
- NNI_FREE_STRUCT(rep);
- return (rv);
- }
rep->ttl = 8; // Per RFC
rep->sock = sock;
rep->raw = 0;
rep->btrace = NULL;
rep->btrace_len = 0;
if ((rv = nni_idhash_create(&rep->pipes)) != 0) {
- nni_mtx_fini(&rep->mx);
NNI_FREE_STRUCT(rep);
return (rv);
}
@@ -70,28 +64,18 @@ nni_rep_init(void **repp, nni_sock *sock)
rep->uwq = nni_sock_sendq(sock);
rep->urq = nni_sock_recvq(sock);
- rv = nni_thr_init(&rep->sender, nni_rep_topsender, rep);
- if (rv != 0) {
- nni_idhash_destroy(rep->pipes);
- nni_mtx_fini(&rep->mx);
- NNI_FREE_STRUCT(rep);
- return (rv);
- }
*repp = rep;
nni_sock_senderr(sock, NNG_ESTATE);
- nni_thr_run(&rep->sender);
return (0);
}
static void
-nni_rep_fini(void *arg)
+nni_rep_sock_fini(void *arg)
{
nni_rep_sock *rep = arg;
- nni_thr_fini(&rep->sender);
nni_idhash_destroy(rep->pipes);
- nni_mtx_fini(&rep->mx);
if (rep->btrace != NULL) {
nni_free(rep->btrace, rep->btrace_len);
}
@@ -135,16 +119,8 @@ nni_rep_pipe_add(void *arg)
{
nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- int rv;
-
- if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REQ) {
- return (NNG_EPROTO);
- }
- nni_mtx_lock(&rep->mx);
- rv = nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp);
- nni_mtx_unlock(&rep->mx);
- return (rv);
+ return (nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp));
}
@@ -154,22 +130,21 @@ nni_rep_pipe_rem(void *arg)
nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- nni_mtx_lock(&rep->mx);
nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe));
- nni_mtx_unlock(&rep->mx);
}
-// nni_rep_topsender watches for messages from the upper write queue,
+// nni_rep_sock_send watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
// destination pipe via a separate queue. This prevents a single bad
// or slow pipe from gumming up the works for the entire socket.
static void
-nni_rep_topsender(void *arg)
+nni_rep_sock_send(void *arg)
{
nni_rep_sock *rep = arg;
nni_msgq *uwq = rep->uwq;
nni_msgq *urq = rep->urq;
+ nni_mtx *mx = nni_sock_mtx(rep->sock);
nni_msg *msg;
for (;;) {
@@ -190,9 +165,9 @@ nni_rep_topsender(void *arg)
NNI_GET32(header, id);
nni_msg_trim_header(msg, 4);
- nni_mtx_lock(&rep->mx);
+ nni_mtx_lock(mx);
if (nni_idhash_find(rep->pipes, id, (void **) &rp) != 0) {
- nni_mtx_unlock(&rep->mx);
+ nni_mtx_unlock(mx);
nni_msg_free(msg);
continue;
}
@@ -204,7 +179,7 @@ nni_rep_topsender(void *arg)
// circumstances.
nni_msg_free(msg);
}
- nni_mtx_unlock(&rep->mx);
+ nni_mtx_unlock(mx);
}
}
@@ -218,8 +193,6 @@ nni_rep_pipe_send(void *arg)
nni_msgq *wq = rp->sendq;
nni_pipe *pipe = rp->pipe;
nni_msg *msg;
- uint8_t *body;
- size_t size;
int rv;
for (;;) {
@@ -311,21 +284,17 @@ again:
static int
-nni_rep_setopt(void *arg, int opt, const void *buf, size_t sz)
+nni_rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_rep_sock *rep = arg;
int rv;
switch (opt) {
case NNG_OPT_MAXTTL:
- nni_mtx_lock(&rep->mx);
rv = nni_setopt_int(&rep->ttl, buf, sz, 1, 255);
- nni_mtx_unlock(&rep->mx);
break;
case NNG_OPT_RAW:
- nni_mtx_lock(&rep->mx);
rv = nni_setopt_int(&rep->raw, buf, sz, 0, 1);
- nni_mtx_unlock(&rep->mx);
break;
default:
rv = NNG_ENOTSUP;
@@ -335,21 +304,17 @@ nni_rep_setopt(void *arg, int opt, const void *buf, size_t sz)
static int
-nni_rep_getopt(void *arg, int opt, void *buf, size_t *szp)
+nni_rep_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_rep_sock *rep = arg;
int rv;
switch (opt) {
case NNG_OPT_MAXTTL:
- nni_mtx_lock(&rep->mx);
rv = nni_getopt_int(&rep->ttl, buf, szp);
- nni_mtx_unlock(&rep->mx);
break;
case NNG_OPT_RAW:
- nni_mtx_lock(&rep->mx);
rv = nni_getopt_int(&rep->raw, buf, szp);
- nni_mtx_unlock(&rep->mx);
break;
default:
rv = NNG_ENOTSUP;
@@ -359,14 +324,12 @@ nni_rep_getopt(void *arg, int opt, void *buf, size_t *szp)
static nni_msg *
-nni_rep_sendfilter(void *arg, nni_msg *msg)
+nni_rep_sock_sfilter(void *arg, nni_msg *msg)
{
nni_rep_sock *rep = arg;
size_t len;
- nni_mtx_lock(&rep->mx);
if (rep->raw) {
- nni_mtx_unlock(&rep->mx);
return (msg);
}
@@ -376,7 +339,6 @@ nni_rep_sendfilter(void *arg, nni_msg *msg)
// If we have a stored backtrace, append it to the header...
// if we don't have a backtrace, discard the message.
if (rep->btrace == NULL) {
- nni_mtx_unlock(&rep->mx);
nni_msg_free(msg);
return (NULL);
}
@@ -388,7 +350,6 @@ nni_rep_sendfilter(void *arg, nni_msg *msg)
nni_free(rep->btrace, rep->btrace_len);
rep->btrace = NULL;
rep->btrace_len = 0;
- nni_mtx_unlock(&rep->mx);
nni_msg_free(msg);
return (NULL);
}
@@ -396,21 +357,18 @@ nni_rep_sendfilter(void *arg, nni_msg *msg)
nni_free(rep->btrace, rep->btrace_len);
rep->btrace = NULL;
rep->btrace_len = 0;
- nni_mtx_unlock(&rep->mx);
return (msg);
}
static nni_msg *
-nni_rep_recvfilter(void *arg, nni_msg *msg)
+nni_rep_sock_rfilter(void *arg, nni_msg *msg)
{
nni_rep_sock *rep = arg;
char *header;
size_t len;
- nni_mtx_lock(&rep->mx);
if (rep->raw) {
- nni_mtx_unlock(&rep->mx);
return (msg);
}
@@ -423,21 +381,19 @@ nni_rep_recvfilter(void *arg, nni_msg *msg)
rep->btrace_len = 0;
}
if ((rep->btrace = nni_alloc(len)) == NULL) {
- nni_mtx_unlock(&rep->mx);
nni_msg_free(msg);
return (NULL);
}
rep->btrace_len = len;
memcpy(rep->btrace, header, len);
nni_msg_trunc_header(msg, len);
- nni_mtx_unlock(&rep->mx);
return (msg);
}
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe nni_rep_proto_pipe = {
+static nni_proto_pipe_ops nni_rep_pipe_ops = {
.pipe_init = nni_rep_pipe_init,
.pipe_fini = nni_rep_pipe_fini,
.pipe_add = nni_rep_pipe_add,
@@ -446,15 +402,22 @@ static nni_proto_pipe nni_rep_proto_pipe = {
.pipe_recv = nni_rep_pipe_recv,
};
+static nni_proto_sock_ops nni_rep_sock_ops = {
+ .sock_init = nni_rep_sock_init,
+ .sock_fini = nni_rep_sock_fini,
+ .sock_close = NULL,
+ .sock_setopt = nni_rep_sock_setopt,
+ .sock_getopt = nni_rep_sock_getopt,
+ .sock_rfilter = nni_rep_sock_rfilter,
+ .sock_sfilter = nni_rep_sock_sfilter,
+ .sock_send = nni_rep_sock_send,
+ .sock_recv = NULL,
+};
+
nni_proto nni_rep_proto = {
- .proto_self = NNG_PROTO_REP,
- .proto_peer = NNG_PROTO_REQ,
- .proto_name = "rep",
- .proto_pipe = &nni_rep_proto_pipe,
- .proto_init = nni_rep_init,
- .proto_fini = nni_rep_fini,
- .proto_setopt = nni_rep_setopt,
- .proto_getopt = nni_rep_getopt,
- .proto_recv_filter = nni_rep_recvfilter,
- .proto_send_filter = nni_rep_sendfilter,
+ .proto_self = NNG_PROTO_REP,
+ .proto_peer = NNG_PROTO_REQ,
+ .proto_name = "rep",
+ .proto_sock_ops = &nni_rep_sock_ops,
+ .proto_pipe_ops = &nni_rep_pipe_ops,
};
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 1c58d7a1..d8104342 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -22,7 +22,6 @@ typedef struct nni_req_sock nni_req_sock;
// An nni_req_sock is our per-socket protocol private structure.
struct nni_req_sock {
nni_sock * sock;
- nni_mtx mx;
nni_cv cv;
nni_msgq * uwq;
nni_msgq * urq;
@@ -48,7 +47,7 @@ struct nni_req_pipe {
static void nni_req_resender(void *);
static int
-nni_req_init(void **reqp, nni_sock *sock)
+nni_req_sock_init(void **reqp, nni_sock *sock)
{
nni_req_sock *req;
int rv;
@@ -56,12 +55,7 @@ nni_req_init(void **reqp, nni_sock *sock)
if ((req = NNI_ALLOC_STRUCT(req)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&req->mx)) != 0) {
- NNI_FREE_STRUCT(req);
- return (rv);
- }
- if ((rv = nni_cv_init(&req->cv, &req->mx)) != 0) {
- nni_mtx_fini(&req->mx);
+ if ((rv = nni_cv_init(&req->cv, nni_sock_mtx(sock))) != 0) {
NNI_FREE_STRUCT(req);
return (rv);
}
@@ -81,7 +75,6 @@ nni_req_init(void **reqp, nni_sock *sock)
rv = nni_thr_init(&req->resender, nni_req_resender, req);
if (rv != 0) {
nni_cv_fini(&req->cv);
- nni_mtx_fini(&req->mx);
NNI_FREE_STRUCT(req);
return (rv);
}
@@ -91,20 +84,24 @@ nni_req_init(void **reqp, nni_sock *sock)
static void
-nni_req_fini(void *arg)
+nni_req_sock_close(void *arg)
{
nni_req_sock *req = arg;
// Shut down the resender. We request it to exit by clearing
// its old value, then kick it.
- nni_mtx_lock(&req->mx);
req->closing = 1;
nni_cv_wake(&req->cv);
- nni_mtx_unlock(&req->mx);
+}
+
+
+static void
+nni_req_sock_fini(void *arg)
+{
+ nni_req_sock *req = arg;
nni_thr_fini(&req->resender);
nni_cv_fini(&req->cv);
- nni_mtx_fini(&req->mx);
if (req->reqmsg != NULL) {
nni_msg_free(req->reqmsg);
}
@@ -168,15 +165,16 @@ nni_req_pipe_send(void *arg)
nni_msgq *uwq = req->uwq;
nni_msgq *urq = req->urq;
nni_pipe *pipe = rp->pipe;
+ nni_mtx *mx = nni_sock_mtx(req->sock);
nni_msg *msg;
int rv;
for (;;) {
- nni_mtx_lock(&req->mx);
+ nni_mtx_lock(mx);
if ((msg = req->retrymsg) != NULL) {
req->retrymsg = NULL;
}
- nni_mtx_unlock(&req->mx);
+ nni_mtx_unlock(mx);
if (msg == NULL) {
rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose);
if (rv != 0) {
@@ -237,21 +235,17 @@ nni_req_pipe_recv(void *arg)
static int
-nni_req_setopt(void *arg, int opt, const void *buf, size_t sz)
+nni_req_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_req_sock *req = arg;
int rv;
switch (opt) {
case NNG_OPT_RESENDTIME:
- nni_mtx_lock(&req->mx);
rv = nni_setopt_duration(&req->retry, buf, sz);
- nni_mtx_unlock(&req->mx);
break;
case NNG_OPT_RAW:
- nni_mtx_lock(&req->mx);
rv = nni_setopt_int(&req->raw, buf, sz, 0, 1);
- nni_mtx_unlock(&req->mx);
break;
default:
rv = NNG_ENOTSUP;
@@ -261,21 +255,17 @@ nni_req_setopt(void *arg, int opt, const void *buf, size_t sz)
static int
-nni_req_getopt(void *arg, int opt, void *buf, size_t *szp)
+nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_req_sock *req = arg;
int rv;
switch (opt) {
case NNG_OPT_RESENDTIME:
- nni_mtx_lock(&req->mx);
rv = nni_getopt_duration(&req->retry, buf, szp);
- nni_mtx_unlock(&req->mx);
break;
case NNG_OPT_RAW:
- nni_mtx_lock(&req->mx);
rv = nni_getopt_int(&req->raw, buf, szp);
- nni_mtx_unlock(&req->mx);
break;
default:
rv = NNG_ENOTSUP;
@@ -288,17 +278,18 @@ static void
nni_req_resender(void *arg)
{
nni_req_sock *req = arg;
+ nni_mtx *mx = nni_sock_mtx(req->sock);
int rv;
for (;;) {
- nni_mtx_lock(&req->mx);
+ nni_mtx_lock(mx);
if (req->closing) {
- nni_mtx_unlock(&req->mx);
+ nni_mtx_unlock(mx);
return;
}
if (req->reqmsg == NULL) {
nni_cv_wait(&req->cv);
- nni_mtx_unlock(&req->mx);
+ nni_mtx_unlock(mx);
continue;
}
rv = nni_cv_until(&req->cv, req->resend);
@@ -309,22 +300,20 @@ nni_req_resender(void *arg)
}
req->resend = nni_clock() + req->retry;
}
- nni_mtx_unlock(&req->mx);
+ nni_mtx_unlock(mx);
}
}
static nni_msg *
-nni_req_sendfilter(void *arg, nni_msg *msg)
+nni_req_sock_sfilter(void *arg, nni_msg *msg)
{
nni_req_sock *req = arg;
uint32_t id;
- nni_mtx_lock(&req->mx);
if (req->raw) {
// No automatic retry, and the request ID must
// be in the header coming down.
- nni_mtx_unlock(&req->mx);
return (msg);
}
@@ -338,7 +327,6 @@ nni_req_sendfilter(void *arg, nni_msg *msg)
if (nni_msg_append_header(msg, req->reqid, 4) != 0) {
// Should be ENOMEM.
- nni_mtx_unlock(&req->mx);
nni_msg_free(msg);
return (NULL);
}
@@ -351,7 +339,6 @@ nni_req_sendfilter(void *arg, nni_msg *msg)
// Make a duplicate message... for retries.
if (nni_msg_dup(&req->reqmsg, msg) != 0) {
- nni_mtx_unlock(&req->mx);
nni_msg_free(msg);
return (NULL);
}
@@ -362,39 +349,33 @@ nni_req_sendfilter(void *arg, nni_msg *msg)
// Clear the error condition.
nni_sock_recverr(req->sock, 0);
- nni_mtx_unlock(&req->mx);
return (msg);
}
static nni_msg *
-nni_req_recvfilter(void *arg, nni_msg *msg)
+nni_req_sock_rfilter(void *arg, nni_msg *msg)
{
nni_req_sock *req = arg;
- nni_mtx_lock(&req->mx);
if (req->raw) {
// Pass it unmolested
- nni_mtx_unlock(&req->mx);
return (msg);
}
if (nni_msg_header_len(msg) < 4) {
- nni_mtx_unlock(&req->mx);
nni_msg_free(msg);
return (NULL);
}
if (req->reqmsg == NULL) {
// We had no outstanding request.
- nni_mtx_unlock(&req->mx);
nni_msg_free(msg);
return (NULL);
}
if (memcmp(nni_msg_header(msg), req->reqid, 4) != 0) {
// Wrong request id
- nni_mtx_unlock(&req->mx);
nni_msg_free(msg);
return (NULL);
}
@@ -403,14 +384,13 @@ nni_req_recvfilter(void *arg, nni_msg *msg)
nni_msg_free(req->reqmsg);
req->reqmsg = NULL;
nni_cv_wake(&req->cv);
- nni_mtx_unlock(&req->mx);
return (msg);
}
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe nni_req_proto_pipe = {
+static nni_proto_pipe_ops nni_req_pipe_ops = {
.pipe_init = nni_req_pipe_init,
.pipe_fini = nni_req_pipe_fini,
.pipe_add = nni_req_pipe_add,
@@ -419,15 +399,22 @@ static nni_proto_pipe nni_req_proto_pipe = {
.pipe_recv = nni_req_pipe_recv,
};
+static nni_proto_sock_ops nni_req_sock_ops = {
+ .sock_init = nni_req_sock_init,
+ .sock_fini = nni_req_sock_fini,
+ .sock_close = nni_req_sock_close,
+ .sock_setopt = nni_req_sock_setopt,
+ .sock_getopt = nni_req_sock_getopt,
+ .sock_rfilter = nni_req_sock_rfilter,
+ .sock_sfilter = nni_req_sock_sfilter,
+ .sock_send = NULL,
+ .sock_recv = NULL,
+};
+
nni_proto nni_req_proto = {
- .proto_self = NNG_PROTO_REQ,
- .proto_peer = NNG_PROTO_REP,
- .proto_name = "req",
- .proto_pipe = &nni_req_proto_pipe,
- .proto_init = nni_req_init,
- .proto_fini = nni_req_fini,
- .proto_setopt = nni_req_setopt,
- .proto_getopt = nni_req_getopt,
- .proto_recv_filter = nni_req_recvfilter,
- .proto_send_filter = nni_req_sendfilter,
+ .proto_self = NNG_PROTO_REQ,
+ .proto_peer = NNG_PROTO_REP,
+ .proto_name = "req",
+ .proto_sock_ops = &nni_req_sock_ops,
+ .proto_pipe_ops = &nni_req_pipe_ops,
};