author     Garrett D'Amore <garrett@damore.org>  2017-03-07 11:41:27 -0800
committer  Garrett D'Amore <garrett@damore.org>  2017-03-07 11:41:27 -0800
commit     f5c259eec0cd3fa5cd623e159cbfec83b4a500d5 (patch)
tree       623e47d37a45968c9e06dc58223bf75abbabd6f0 /src
parent     f397fec0e0524ff73ee501642a5dca5cf22064e2 (diff)
Req/Rep now callback driven.
Diffstat (limited to 'src')
-rw-r--r--  src/core/timer.c             4
-rw-r--r--  src/protocol/reqrep/rep.c    2
-rw-r--r--  src/protocol/reqrep/req.c  449
3 files changed, 244 insertions, 211 deletions
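
This commit replaces the dedicated pipe_worker/sock_worker threads with
asynchronous I/O objects whose completions invoke callbacks, so nothing
blocks on a condition variable any more. As a rough, self-contained model of
that shape (the aio struct and helpers below are illustrative stand-ins, not
nng's internal nni_aio API):

    #include <stdio.h>
    #include <stddef.h>

    /* Illustrative stand-in for the asynchronous I/O objects in the diff. */
    typedef struct aio {
        void (*cb)(void *);  /* completion callback              */
        void  *arg;          /* argument passed to the callback  */
        int    result;       /* 0 on success, nonzero on failure */
        void  *msg;          /* message moving through the pipe  */
    } aio;

    static void aio_init(aio *a, void (*cb)(void *), void *arg)
    {
        a->cb = cb;
        a->arg = arg;
        a->result = 0;
        a->msg = NULL;
    }

    /* Completing an operation records the result and fires the callback.
     * In nng this happens from the msgqueue or the transport layer. */
    static void aio_finish(aio *a, int result, void *msg)
    {
        a->result = result;
        a->msg = msg;
        a->cb(a->arg);
    }

    /* A pipe whose callbacks chain "get from queue" into "send", the way
     * nni_req_getq_cb hands off to nni_req_sendraw_cb below. */
    typedef struct pipe_state {
        aio aio_getq;
        aio aio_send;
    } pipe_state;

    static void send_cb(void *arg)
    {
        (void) arg;
        printf("send done; the real callback re-arms the queue get\n");
    }

    static void getq_cb(void *arg)
    {
        pipe_state *p = arg;

        if (p->aio_getq.result != 0) {
            printf("getq failed; the real callback closes the pipe\n");
            return;
        }
        p->aio_send.msg = p->aio_getq.msg;   /* take ownership of the msg */
        p->aio_getq.msg = NULL;
        aio_finish(&p->aio_send, 0, p->aio_send.msg);
    }

    int main(void)
    {
        pipe_state p;

        aio_init(&p.aio_getq, getq_cb, &p);
        aio_init(&p.aio_send, send_cb, &p);
        aio_finish(&p.aio_getq, 0, "request");  /* pretend a message arrived */
        return (0);
    }

Each real callback either re-arms another asynchronous operation or closes
the pipe, which is exactly the chain the req.c changes below build out of
getq, send, recv, and putq callbacks.
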
diff --git a/src/core/timer.c b/src/core/timer.c
index 778a82b7..f64c1294 100644
--- a/src/core/timer.c
+++ b/src/core/timer.c
@@ -113,6 +113,10 @@ nni_timer_schedule(nni_timer_node *node, nni_time when)
nni_mtx_lock(&timer->t_list_mx);
+ if (nni_list_active(&timer->t_entries, node)) {
+ nni_list_remove(&timer->t_entries, node);
+ }
+
srch = nni_list_first(&timer->t_entries);
while ((srch != NULL) && (srch->t_expire < node->t_expire)) {
srch = nni_list_next(&timer->t_entries, srch);
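
The check added above makes nni_timer_schedule safe to call on a node that
may already be on the timer list: an active node is unlinked first and then
re-inserted at its sorted position, so rescheduling moves the entry rather
than linking it twice. The REQ retry path below reschedules the same timer
node on every resend, which is why this matters. A minimal stand-alone
illustration of the remove-before-insert idiom, using a plain singly linked
list rather than nng's list primitives:

    #include <stdio.h>
    #include <stdint.h>
    #include <stddef.h>

    /* Illustrative timer node; "expire" plays the role of t_expire. */
    typedef struct node {
        uint64_t     expire;
        struct node *next;
        int          linked;   /* stand-in for nni_list_active() */
    } node;

    static node *timers;

    static void list_remove(node *n)
    {
        node **pp = &timers;

        while ((*pp != NULL) && (*pp != n)) {
            pp = &(*pp)->next;
        }
        if (*pp == n) {
            *pp = n->next;
            n->linked = 0;
        }
    }

    /* Scheduling a node that might already be queued: unlink it first so
     * a second call moves the entry instead of inserting it twice. */
    static void timer_schedule(node *n, uint64_t when)
    {
        node **pp = &timers;

        if (n->linked) {
            list_remove(n);
        }
        n->expire = when;
        while ((*pp != NULL) && ((*pp)->expire < n->expire)) {
            pp = &(*pp)->next;
        }
        n->next = *pp;
        *pp = n;
        n->linked = 1;
    }

    int main(void)
    {
        node a = { 0 }, b = { 0 };

        timer_schedule(&a, 100);
        timer_schedule(&b, 50);
        timer_schedule(&a, 10);   /* reschedules a; list stays consistent */
        for (node *n = timers; n != NULL; n = n->next) {
            printf("%llu\n", (unsigned long long) n->expire);
        }
        return (0);
    }
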
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 154a40f7..2c658ae8 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -267,7 +267,7 @@ nni_rep_pipe_getq_cb(void *arg)
}
rp->aio_send.a_msg = rp->aio_getq.a_msg;
- rp->aio_send.a_msg = NULL;
+ rp->aio_getq.a_msg = NULL;
nni_pipe_aio_send(rp->pipe, &rp->aio_send);
}
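
The one-line rep.c fix is the usual ownership hand-off between two aio
slots: copy the message pointer into the send aio, then clear the source
(the getq aio), not the destination. With the old line the send was issued
with a NULL message while the getq slot still appeared to own it. Sketched
in isolation (hypothetical structs, not nng's nni_aio):

    #include <stddef.h>

    struct aio_slot { void *a_msg; };

    struct rep_pipe {
        struct aio_slot aio_getq;   /* owns the message after the queue get */
        struct aio_slot aio_send;   /* must own it while the send runs      */
    };

    static void hand_off(struct rep_pipe *rp)
    {
        rp->aio_send.a_msg = rp->aio_getq.a_msg;
        rp->aio_getq.a_msg = NULL;   /* the old code cleared aio_send here */
    }

    int main(void)
    {
        int msg = 42;
        struct rep_pipe rp = { { &msg }, { NULL } };

        hand_off(&rp);
        return ((rp.aio_send.a_msg == &msg) &&
            (rp.aio_getq.a_msg == NULL)) ? 0 : 1;
    }
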
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index fe61584c..f28db1df 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -20,24 +20,27 @@
typedef struct nni_req_pipe nni_req_pipe;
typedef struct nni_req_sock nni_req_sock;
+static void nni_req_resend(nni_req_sock *);
+static void nni_req_timeout(void *);
+static void nni_req_pipe_fini(void *);
+
// An nni_req_sock is our per-socket protocol private structure.
struct nni_req_sock {
nni_sock * sock;
- nni_cv cv;
nni_msgq * uwq;
nni_msgq * urq;
nni_duration retry;
nni_time resend;
int raw;
- int closing;
int wantw;
nni_msg * reqmsg;
- nni_msg * retrymsg;
- nni_list pipes;
- nni_req_pipe * nextpipe;
nni_req_pipe * pendpipe;
- int npipes;
+
+ nni_list readypipes;
+ nni_list busypipes;
+
+ nni_timer_node timer;
uint32_t nextid; // next id
uint8_t reqid[4]; // outstanding request ID (big endian)
@@ -47,12 +50,20 @@ struct nni_req_sock {
struct nni_req_pipe {
nni_pipe * pipe;
nni_req_sock * req;
- nni_msgq * mq;
- int sigclose;
nni_list_node node;
+ nni_aio aio_getq; // raw mode only
+ nni_aio aio_sendraw; // raw mode only
+ nni_aio aio_sendcooked; // cooked mode only
+ nni_aio aio_recv;
+ nni_aio aio_putq;
};
static void nni_req_resender(void *);
+static void nni_req_getq_cb(void *);
+static void nni_req_sendraw_cb(void *);
+static void nni_req_sendcooked_cb(void *);
+static void nni_req_recv_cb(void *);
+static void nni_req_putq_cb(void *);
static int
nni_req_sock_init(void **reqp, nni_sock *sock)
@@ -63,19 +74,16 @@ nni_req_sock_init(void **reqp, nni_sock *sock)
if ((req = NNI_ALLOC_STRUCT(req)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_cv_init(&req->cv, nni_sock_mtx(sock))) != 0) {
- NNI_FREE_STRUCT(req);
- return (rv);
- }
+ NNI_LIST_INIT(&req->readypipes, nni_req_pipe, node);
+ NNI_LIST_INIT(&req->busypipes, nni_req_pipe, node);
+ nni_timer_init(&req->timer, nni_req_timeout, req);
+
// this is "semi random" start for request IDs.
- NNI_LIST_INIT(&req->pipes, nni_req_pipe, node);
- req->nextpipe = NULL;
- req->npipes = 0;
req->nextid = nni_random();
+
req->retry = NNI_SECOND * 60;
req->sock = sock;
req->reqmsg = NULL;
- req->retrymsg = NULL;
req->raw = 0;
req->wantw = 0;
req->resend = NNI_TIME_ZERO;
@@ -93,8 +101,7 @@ nni_req_sock_close(void *arg)
{
nni_req_sock *req = arg;
- req->closing = 1;
- nni_cv_wake(&req->cv);
+ nni_timer_cancel(&req->timer);
}
@@ -104,13 +111,9 @@ nni_req_sock_fini(void *arg)
nni_req_sock *req = arg;
if (req != NULL) {
- nni_cv_fini(&req->cv);
if (req->reqmsg != NULL) {
nni_msg_free(req->reqmsg);
}
- if (req->retrymsg != NULL) {
- nni_msg_free(req->retrymsg);
- }
NNI_FREE_STRUCT(req);
}
}
@@ -125,16 +128,33 @@ nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_msgq_init(&rp->mq, 0)) != 0) {
- NNI_FREE_STRUCT(rp);
- return (rv);
+ if ((rv = nni_aio_init(&rp->aio_getq, nni_req_getq_cb, rp)) != 0) {
+ goto failed;
}
+ if ((rv = nni_aio_init(&rp->aio_putq, nni_req_putq_cb, rp)) != 0) {
+ goto failed;
+ }
+ if ((rv = nni_aio_init(&rp->aio_recv, nni_req_recv_cb, rp)) != 0) {
+ goto failed;
+ }
+ rv = nni_aio_init(&rp->aio_sendraw, nni_req_sendraw_cb, rp);
+ if (rv != 0) {
+ goto failed;
+ }
+ rv = nni_aio_init(&rp->aio_sendcooked, nni_req_sendcooked_cb, rp);
+ if (rv != 0) {
+ goto failed;
+ }
+
NNI_LIST_NODE_INIT(&rp->node);
rp->pipe = pipe;
- rp->sigclose = 0;
rp->req = rsock;
*rpp = rp;
return (0);
+
+failed:
+ nni_req_pipe_fini(rp);
+ return (rv);
}
@@ -144,7 +164,11 @@ nni_req_pipe_fini(void *arg)
nni_req_pipe *rp = arg;
if (rp != NULL) {
- nni_msgq_fini(rp->mq);
+ nni_aio_fini(&rp->aio_getq);
+ nni_aio_fini(&rp->aio_putq);
+ nni_aio_fini(&rp->aio_recv);
+ nni_aio_fini(&rp->aio_sendcooked);
+ nni_aio_fini(&rp->aio_sendraw);
NNI_FREE_STRUCT(rp);
}
}
@@ -159,9 +183,13 @@ nni_req_pipe_add(void *arg)
if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) {
return (NNG_EPROTO);
}
- nni_list_append(&req->pipes, rp);
- req->npipes++;
- nni_cv_wake(&req->cv); // Wake the top sender, new job candidate!
+ nni_list_append(&req->readypipes, rp);
+ if (req->wantw) {
+ nni_req_resend(req);
+ }
+
+ nni_msgq_aio_get(req->uwq, &rp->aio_getq);
+ nni_pipe_aio_recv(rp->pipe, &rp->aio_recv);
return (0);
}
@@ -172,88 +200,20 @@ nni_req_pipe_rem(void *arg)
nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
- if (rp == req->nextpipe) {
- req->nextpipe = nni_list_next(&req->pipes, rp);
- }
+ // This removes the node from either busypipes or readypipes.
+ // It doesn't much matter which.
+ nni_list_remove(&req->readypipes, rp);
+
if ((rp == req->pendpipe) && (req->reqmsg != NULL)) {
// we are removing the pipe we sent the last request on...
// schedule immediate resend.
req->resend = NNI_TIME_ZERO;
- nni_cv_wake(&req->cv);
+ req->wantw = 1;
+ nni_req_resend(req);
}
- req->npipes--;
- nni_list_remove(&req->pipes, rp);
-}
-
-static void
-nni_req_pipe_send(void *arg)
-{
- nni_req_pipe *rp = arg;
- nni_req_sock *req = rp->req;
- nni_mtx *mx = nni_sock_mtx(req->sock);
- nni_msg *msg;
- int rv;
-
- for (;;) {
- nni_mtx_lock(mx);
- if (req->wantw) {
- nni_cv_wake(&req->cv);
- }
- nni_mtx_unlock(mx);
- if (nni_msgq_get_sig(rp->mq, &msg, &rp->sigclose) != 0) {
- break;
- }
- rv = nni_pipe_send(rp->pipe, msg);
- if (rv != 0) {
- nni_msg_free(msg);
- break;
- }
- }
- nni_msgq_signal(req->urq, &rp->sigclose);
- nni_pipe_close(rp->pipe);
-}
-
-
-static void
-nni_req_pipe_recv(void *arg)
-{
- nni_req_pipe *rp = arg;
- nni_req_sock *req = rp->req;
- nni_msgq *urq = req->urq;
- nni_msgq *uwq = req->uwq;
- nni_pipe *pipe = rp->pipe;
- nni_msg *msg;
- int rv;
-
- for (;;) {
- rv = nni_pipe_recv(pipe, &msg);
- if (rv != 0) {
- break;
- }
- // We yank 4 bytes of body, and move them to the header.
- if (nni_msg_len(msg) < 4) {
- // Not enough data, just toss it.
- nni_msg_free(msg);
- continue;
- }
- if (nni_msg_append_header(msg, nni_msg_body(msg), 4) != 0) {
- // Should be NNG_ENOMEM
- nni_msg_free(msg);
- continue;
- }
- if (nni_msg_trim(msg, 4) != 0) {
- // This should never happen - could be an assert.
- nni_panic("Failed to trim REQ header from body");
- }
- rv = nni_msgq_put_sig(urq, msg, &rp->sigclose);
- if (rv != 0) {
- nni_msg_free(msg);
- break;
- }
- }
- nni_msgq_signal(rp->mq, &rp->sigclose);
- nni_pipe_close(pipe);
+ nni_msgq_aio_cancel(req->uwq, &rp->aio_getq);
+ nni_msgq_aio_cancel(req->urq, &rp->aio_putq);
}
@@ -297,127 +257,199 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
}
+// Raw and cooked mode differ in the way they send messages out.
+//
+// For cooked mode, we have a getq callback on the upper write queue, which
+// when it finds a message, cancels any current processing, and saves a copy
+// of the message, and then tries to "resend" the message, looking for a
+// suitable available outgoing pipe. If no suitable pipe is available,
+// a flag is set, so that as soon as such a pipe is available we trigger
+// a resend attempt. We also trigger the attempt on either timeout, or if
+// the underlying pipe we chose disconnects.
+//
+// For raw mode we can just let the pipes "contend" via getq to get a
+// message from the upper write queue. The msgqueue implementation
+// actually provides ordering, so load will be spread automatically.
+// (NB: We may have to revise this in the future if we want to provide some
+// kind of priority.)
+
static void
-nni_req_sock_send(void *arg)
+nni_req_getq_cb(void *arg)
{
- nni_req_sock *req = arg;
- nni_req_pipe *rp;
- nni_msgq *uwq = req->uwq;
- nni_msg *msg;
+ nni_req_pipe *rp = arg;
+ nni_req_sock *req = rp->req;
+
+ // We should be in RAW mode. Cooked mode traffic bypasses
+ // the upper write queue entirely, and should never end up here.
+ // If the mode changes, we may briefly deliver a message, but
+ // that's ok (there's an inherent race anyway).
+
+ if (nni_aio_result(&rp->aio_getq) != 0) {
+ nni_pipe_close(rp->pipe);
+ return;
+ }
+
+ rp->aio_sendraw.a_msg = rp->aio_getq.a_msg;
+ rp->aio_getq.a_msg = NULL;
+
+ // Send the message, but use the raw mode aio.
+ nni_pipe_aio_send(rp->pipe, &rp->aio_sendraw);
+}
+
+
+static void
+nni_req_sendraw_cb(void *arg)
+{
+ nni_req_pipe *rp = arg;
+
+ // Sent a message so we just need to look for another one.
+ nni_msgq_aio_get(rp->req->uwq, &rp->aio_getq);
+}
+
+
+static void
+nni_req_sendcooked_cb(void *arg)
+{
+ nni_req_pipe *rp = arg;
+ nni_req_sock *req = rp->req;
nni_mtx *mx = nni_sock_mtx(req->sock);
- int i;
- msg = NULL;
+ // Cooked mode. We completed a cooked send, so we need to
+ // reinsert ourselves in the ready list, and possibly schedule
+ // a resend.
- for (;;) {
- if ((msg == NULL) && (nni_msgq_get(uwq, &msg) != 0)) {
- // ECLOSED? Should be!
- return;
- }
+ nni_mtx_lock(mx);
+ nni_list_remove(&req->busypipes, rp);
+ nni_list_append(&req->readypipes, rp);
- nni_mtx_lock(mx);
- if (!req->raw) {
- nni_mtx_unlock(mx);
- // Cooked messages come another path... just toss
- // this (shouldn't happen actually!)
- if (msg != NULL) {
- nni_msg_free(msg);
- msg = NULL;
- }
- continue;
- }
+ nni_req_resend(req);
+ nni_mtx_unlock(mx);
+}
- if (req->closing) {
- if (msg != NULL) {
- nni_mtx_unlock(mx);
- nni_msg_free(msg);
- return;
- }
- }
- req->wantw = 0;
- for (i = 0; i < req->npipes; i++) {
- rp = req->nextpipe;
- if (rp == NULL) {
- rp = nni_list_first(&req->pipes);
- }
- req->nextpipe = nni_list_next(&req->pipes, rp);
- if (nni_msgq_tryput(rp->mq, msg) == 0) {
- msg = NULL;
- break;
- }
- }
- // We weren't able to deliver it. We have two choices:
- // 1) drop the message and let the originator resend, or
- // 2) apply pushback. There is value in pushback, since it
- // will cause senders to slow down, or redistribute the work.
- // So, let's try that.
- if (msg != NULL) {
- req->wantw = 1;
- nni_cv_wait(&req->cv);
- }
- nni_mtx_unlock(mx);
+
+static void
+nni_req_putq_cb(void *arg)
+{
+ nni_req_pipe *rp = arg;
+
+ if (nni_aio_result(&rp->aio_putq) != 0) {
+ nni_msg_free(rp->aio_putq.a_msg);
+ nni_pipe_close(rp->pipe);
+ return;
}
+ rp->aio_putq.a_msg = NULL;
+
+ nni_pipe_aio_recv(rp->pipe, &rp->aio_recv);
}
static void
-nni_req_sock_resend(void *arg)
+nni_req_recv_cb(void *arg)
+{
+ nni_req_pipe *rp = arg;
+ nni_msg *msg;
+
+ if (nni_aio_result(&rp->aio_recv) != 0) {
+ nni_pipe_close(rp->pipe);
+ return;
+ }
+
+ msg = rp->aio_recv.a_msg;
+ rp->aio_recv.a_msg = NULL;
+
+ // We yank 4 bytes of body, and move them to the header.
+ if (nni_msg_len(msg) < 4) {
+ // Malformed message.
+ goto malformed;
+ }
+ if (nni_msg_append_header(msg, nni_msg_body(msg), 4) != 0) {
+ // Arguably we could just discard and carry on. But
+ // dropping the connection is probably more helpful since
+ // it lets the other side see that a problem occurred.
+ // Plus it gives us a chance to reclaim some memory.
+ goto malformed;
+ }
+ if (nni_msg_trim(msg, 4) != 0) {
+ // This should never happen - could be an assert.
+ nni_panic("Failed to trim REQ header from body");
+ }
+
+ rp->aio_putq.a_msg = msg;
+ nni_msgq_aio_put(rp->req->urq, &rp->aio_putq);
+ return;
+
+malformed:
+ nni_msg_free(msg);
+ nni_pipe_close(rp->pipe);
+}
+
+
+static void
+nni_req_timeout(void *arg)
{
nni_req_sock *req = arg;
+ nni_mtx *mx = nni_sock_mtx(req->sock);
+
+ nni_mtx_lock(mx);
+ if (req->reqmsg != NULL) {
+ req->wantw = 1;
+ nni_req_resend(req);
+ }
+ nni_mtx_unlock(mx);
+}
+
+
+static void
+nni_req_resend(nni_req_sock *req)
+{
nni_req_pipe *rp;
nni_mtx *mx = nni_sock_mtx(req->sock);
nni_msg *msg;
int i;
- for (;;) {
- nni_mtx_lock(mx);
- if (req->closing) {
- nni_mtx_unlock(mx);
+ // Note: This routine should be called with the socket lock held.
+ // Also, this should only be called while handling cooked mode
+ // requests.
+ if (req->reqmsg == NULL) {
+ return;
+ }
+
+ if (req->wantw) {
+ req->wantw = 0;
+
+ if (nni_msg_dup(&msg, req->reqmsg) != 0) {
+ // Failed to alloc message, reschedule it. Also,
+ // mark that we have a message we want to resend,
+ // in case something comes available.
+ req->wantw = 1;
+ nni_timer_schedule(&req->timer,
+ nni_clock() + req->retry);
return;
}
- if (req->reqmsg == NULL) {
- nni_cv_wait(&req->cv);
- nni_mtx_unlock(mx);
- continue;
- }
- if ((req->wantw) || (nni_clock() >= req->resend)) {
- req->wantw = 0;
-
- if (nni_msg_dup(&msg, req->reqmsg) != 0) {
- // Failed to alloc message, just wait for next
- // retry.
- req->resend = nni_clock() + req->retry;
- nni_mtx_unlock(mx);
- continue;
- }
-
- // Now we iterate across all possible outpipes, until
- // one accepts it.
- for (i = 0; i < req->npipes; i++) {
- rp = req->nextpipe;
- if (rp == NULL) {
- rp = nni_list_first(&req->pipes);
- }
- req->nextpipe = nni_list_next(&req->pipes, rp);
- if (nni_msgq_tryput(rp->mq, msg) == 0) {
- req->pendpipe = rp;
- msg = NULL;
- break;
- }
- }
- if (msg == NULL) {
- // Message was published, update the timeout.
- req->resend = nni_clock() + req->retry;
- } else {
- // No suitable outbound destination found,
- // so alert us when we get one.
- req->wantw = 1;
- nni_msg_free(msg);
- }
+ // Now we iterate across all possible outpipes, until
+ // one accepts it.
+ rp = nni_list_first(&req->readypipes);
+ if (rp == NULL) {
+ // No pipes ready to process us. Note that we have
+ // something to send, and schedule it.
+ nni_msg_free(msg);
+ req->wantw = 1;
+ return;
}
- nni_cv_until(&req->cv, req->resend);
- nni_mtx_unlock(mx);
+
+ nni_list_remove(&req->readypipes, rp);
+ nni_list_append(&req->busypipes, rp);
+
+ req->pendpipe = rp;
+ req->resend = nni_clock() + req->retry;
+ rp->aio_sendcooked.a_msg = msg;
+
+ // Note that because we were ready rather than busy, we
+ // should not have any I/O outstanding and hence the aio
+ // object will be available for our use.
+ nni_pipe_aio_send(rp->pipe, &rp->aio_sendcooked);
+ nni_timer_schedule(&req->timer, req->resend);
}
}
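
Putting the cooked-mode pieces together: nni_req_sock_sfilter stores the
request and marks wantw, nni_req_resend duplicates it onto the first ready
pipe (moving that pipe to the busy list and arming the retry timer), and
nni_req_timeout simply sets wantw again and calls resend. A compact
stand-alone model of that flag-plus-ready-list scheduling (all names
illustrative):

    #include <stdio.h>
    #include <stddef.h>

    enum { NPIPES = 2 };

    /* Illustrative socket state: one pending request, a want-write flag,
     * and pipes that are either ready or busy. */
    struct req_state {
        const char *reqmsg;         /* pending request, NULL if none     */
        int         wantw;          /* "try to (re)send when possible"   */
        int         ready[NPIPES];  /* 1 = ready, 0 = busy or absent     */
    };

    static void resend(struct req_state *s)
    {
        if ((s->reqmsg == NULL) || (!s->wantw)) {
            return;
        }
        s->wantw = 0;
        for (int i = 0; i < NPIPES; i++) {
            if (s->ready[i]) {
                s->ready[i] = 0;   /* move the pipe to "busy" */
                printf("send %s on pipe %d; arm retry timer\n",
                    s->reqmsg, i);
                return;
            }
        }
        s->wantw = 1;   /* nothing ready: retry when a pipe shows up */
    }

    /* Events that re-trigger the resend attempt in the real code. */
    static void on_timeout(struct req_state *s)
    {
        s->wantw = 1;
        resend(s);
    }

    static void on_pipe_ready(struct req_state *s, int i)
    {
        s->ready[i] = 1;            /* pipe added, or a send completed */
        if (s->wantw) {
            resend(s);
        }
    }

    int main(void)
    {
        struct req_state s = { "hello", 1, { 0, 0 } };

        resend(&s);            /* no pipes yet: stays pending            */
        on_pipe_ready(&s, 0);  /* pipe appears: request goes out         */
        on_timeout(&s);        /* retry fires: no ready pipe, keep flag  */
        on_pipe_ready(&s, 1);  /* another pipe: the retry is sent        */
        return (0);
    }

The same resend entry point is reached from pipe_add, from the cooked send
completion, and from the timer, so the scheduling logic stays in one place.
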
@@ -458,7 +490,9 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg)
req->reqmsg = msg;
// Schedule for immediate send
req->resend = NNI_TIME_ZERO;
- nni_cv_wake(&req->cv);
+ req->wantw = 1;
+
+ nni_req_resend(req);
// Clear the error condition.
nni_sock_recverr(req->sock, 0);
@@ -497,7 +531,6 @@ nni_req_sock_rfilter(void *arg, nni_msg *msg)
nni_msg_free(req->reqmsg);
req->reqmsg = NULL;
req->pendpipe = NULL;
- nni_cv_wake(&req->cv);
return (msg);
}
@@ -509,8 +542,6 @@ static nni_proto_pipe_ops nni_req_pipe_ops = {
.pipe_fini = nni_req_pipe_fini,
.pipe_add = nni_req_pipe_add,
.pipe_rem = nni_req_pipe_rem,
- .pipe_worker = { nni_req_pipe_send,
- nni_req_pipe_recv },
};
static nni_proto_sock_ops nni_req_sock_ops = {
@@ -521,8 +552,6 @@ static nni_proto_sock_ops nni_req_sock_ops = {
.sock_getopt = nni_req_sock_getopt,
.sock_rfilter = nni_req_sock_rfilter,
.sock_sfilter = nni_req_sock_sfilter,
- .sock_worker = { nni_req_sock_send,
- nni_req_sock_resend },
};
nni_proto nni_req_proto = {
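
Finally, the body-to-header rewrite in nni_req_recv_cb above: replies arrive
with the four-byte request ID at the front of the body, and the callback
moves those bytes into the header before putting the message on the upper
read queue; anything shorter than four bytes is treated as malformed and the
pipe is closed. The same transformation in isolation, with plain buffers
instead of nni_msg (illustrative only):

    #include <stdio.h>
    #include <string.h>
    #include <stdint.h>

    /* Move the first 4 body bytes (the request ID) into the header, the
     * way nni_req_recv_cb does with nni_msg_append_header/nni_msg_trim. */
    static int split_req_header(uint8_t *body, size_t *bodylen,
        uint8_t header[4])
    {
        if (*bodylen < 4) {
            return (-1);                       /* malformed: no room for ID */
        }
        memcpy(header, body, 4);               /* append to header   */
        memmove(body, body + 4, *bodylen - 4); /* trim from body     */
        *bodylen -= 4;
        return (0);
    }

    int main(void)
    {
        uint8_t body[] = { 0x80, 0x00, 0x00, 0x07, 'o', 'k' };
        size_t  len    = sizeof (body);
        uint8_t hdr[4];

        if (split_req_header(body, &len, hdr) != 0) {
            return (1);
        }
        printf("id %02x%02x%02x%02x, payload %.*s\n",
            hdr[0], hdr[1], hdr[2], hdr[3], (int) len, body);
        return (0);
    }
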