aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/reqrep/req.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-10-20 17:03:12 -0700
committerGarrett D'Amore <garrett@damore.org>2017-10-23 16:14:53 -0700
commit3585000ca027740dbdb4599f4991cd2bf562e2f2 (patch)
treea45b4c1bcc2d11777dde0e38d4b742d121d55e45 /src/protocol/reqrep/req.c
parentfdb73b69a887d868f8e976ef8a990a5d7f6687f9 (diff)
downloadnng-3585000ca027740dbdb4599f4991cd2bf562e2f2.tar.gz
nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.tar.bz2
nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.zip
fixes #112 Need to move some stuff from socket to message queues
Diffstat (limited to 'src/protocol/reqrep/req.c')
-rw-r--r--src/protocol/reqrep/req.c91
1 files changed, 59 insertions, 32 deletions
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 24d01df2..81abd306 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -99,7 +99,7 @@ req_sock_init(void **sp, nni_sock *sock)
s->uwq = nni_sock_sendq(sock);
s->urq = nni_sock_recvq(sock);
*sp = s;
- nni_sock_recverr(sock, NNG_ESTATE);
+
return (0);
}
@@ -249,12 +249,7 @@ static int
req_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
req_sock *s = arg;
- int rv;
- rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
- if (rv == 0) {
- nni_sock_recverr(s->sock, s->raw ? 0 : NNG_ESTATE);
- }
- return (rv);
+ return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
@@ -505,35 +500,46 @@ req_resend(req_sock *s)
}
}
-static nni_msg *
-req_sock_sfilter(void *arg, nni_msg *msg)
+static void
+req_sock_send(void *arg, nni_aio *aio)
{
req_sock *s = arg;
uint32_t id;
+ size_t len;
+ nni_msg * msg;
+ int rv;
+ nni_mtx_lock(&s->mtx);
if (s->raw) {
- // No automatic retry, and the request ID must
- // be in the header coming down.
- return (msg);
+ nni_mtx_unlock(&s->mtx);
+ nni_sock_send_pending(s->sock);
+ nni_msgq_aio_put(s->uwq, aio);
+ return;
}
+ msg = nni_aio_get_msg(aio);
+ len = nni_msg_len(msg);
+
+ // In cooked mode, because we need to manage our own resend logic,
+ // we bypass the upper writeq entirely.
+
// Generate a new request ID. We always set the high
// order bit so that the peer can locate the end of the
// backtrace. (Pipe IDs have the high order bit clear.)
id = (s->nextid++) | 0x80000000u;
-
// Request ID is in big endian format.
NNI_PUT32(s->reqid, id);
- if (nni_msg_header_append(msg, s->reqid, 4) != 0) {
- // Should be ENOMEM.
- nni_msg_free(msg);
- return (NULL);
+ if ((rv = nni_msg_header_append(msg, s->reqid, 4)) != 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
}
- // NB: The socket lock is also held, so this is always self-serialized.
- // But we have to serialize against other async callbacks.
- nni_mtx_lock(&s->mtx);
+ // XXX: I think we should just not do this... and leave the
+ // socket "permanently writeable". This does screw up all the
+ // backpressure.
+ // nni_sock_send_pending(s->sock);
// If another message is there, this cancels it.
if (s->reqmsg != NULL) {
@@ -541,6 +547,8 @@ req_sock_sfilter(void *arg, nni_msg *msg)
s->reqmsg = NULL;
}
+ nni_aio_set_msg(aio, NULL);
+
// Make a duplicate message... for retries.
s->reqmsg = msg;
// Schedule for immediate send
@@ -548,40 +556,41 @@ req_sock_sfilter(void *arg, nni_msg *msg)
s->wantw = 1;
req_resend(s);
- nni_mtx_unlock(&s->mtx);
- // Clear the error condition.
- nni_sock_recverr(s->sock, 0);
+ nni_mtx_unlock(&s->mtx);
- return (NULL);
+ nni_aio_finish(aio, 0, len);
}
static nni_msg *
-req_sock_rfilter(void *arg, nni_msg *msg)
+req_sock_filter(void *arg, nni_msg *msg)
{
req_sock *s = arg;
nni_msg * rmsg;
+ nni_mtx_lock(&s->mtx);
if (s->raw) {
// Pass it unmolested
+ nni_mtx_unlock(&s->mtx);
return (msg);
}
if (nni_msg_header_len(msg) < 4) {
+ nni_mtx_unlock(&s->mtx);
nni_msg_free(msg);
return (NULL);
}
- nni_mtx_lock(&s->mtx);
-
if ((rmsg = s->reqmsg) == NULL) {
- // We had no outstanding request.
+ // We had no outstanding request. (Perhaps canceled,
+ // or duplicate response.)
nni_mtx_unlock(&s->mtx);
nni_msg_free(msg);
return (NULL);
}
+
if (memcmp(nni_msg_header(msg), s->reqid, 4) != 0) {
- // Wrong request id
+ // Wrong request id.
nni_mtx_unlock(&s->mtx);
nni_msg_free(msg);
return (NULL);
@@ -591,12 +600,29 @@ req_sock_rfilter(void *arg, nni_msg *msg)
s->pendpipe = NULL;
nni_mtx_unlock(&s->mtx);
- nni_sock_recverr(s->sock, NNG_ESTATE);
nni_msg_free(rmsg);
return (msg);
}
+static void
+req_sock_recv(void *arg, nni_aio *aio)
+{
+ req_sock *s = arg;
+
+ nni_mtx_lock(&s->mtx);
+ if (!s->raw) {
+ if (s->reqmsg == NULL) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ }
+ nni_mtx_unlock(&s->mtx);
+ nni_sock_recv_pending(s->sock);
+ nni_msgq_aio_get(s->urq, aio);
+}
+
static nni_proto_pipe_ops req_pipe_ops = {
.pipe_init = req_pipe_init,
.pipe_fini = req_pipe_fini,
@@ -630,8 +656,9 @@ static nni_proto_sock_ops req_sock_ops = {
.sock_open = req_sock_open,
.sock_close = req_sock_close,
.sock_options = req_sock_options,
- .sock_rfilter = req_sock_rfilter,
- .sock_sfilter = req_sock_sfilter,
+ .sock_filter = req_sock_filter,
+ .sock_send = req_sock_send,
+ .sock_recv = req_sock_recv,
};
static nni_proto req_proto = {