aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pipeline/pull.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pipeline/pull.c')
-rw-r--r--src/protocol/pipeline/pull.c107
1 files changed, 95 insertions, 12 deletions
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index 6f2d716b..ec66fab6 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -17,6 +17,11 @@
typedef struct nni_pull_pipe nni_pull_pipe;
typedef struct nni_pull_sock nni_pull_sock;
+static void nni_pull_putq_cb(void *);
+static void nni_pull_recv_cb(void *);
+static void nni_pull_recv(nni_pull_pipe *);
+static void nni_pull_putq(nni_pull_pipe *, nni_msg *);
+
// An nni_pull_sock is our per-socket protocol private structure.
struct nni_pull_sock {
nni_msgq * urq;
@@ -27,6 +32,8 @@ struct nni_pull_sock {
struct nni_pull_pipe {
nni_pipe * pipe;
nni_pull_sock * pull;
+ nni_aio putq_aio;
+ nni_aio recv_aio;
};
static int
@@ -60,10 +67,20 @@ static int
nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
{
nni_pull_pipe *pp;
+ int rv;
if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
return (NNG_ENOMEM);
}
+ if (((rv = nni_aio_init(&pp->putq_aio, nni_pull_putq_cb, pp))) != 0) {
+ NNI_FREE_STRUCT(pp);
+ return (rv);
+ }
+ if (((rv = nni_aio_init(&pp->recv_aio, nni_pull_recv_cb, pp))) != 0) {
+ nni_aio_fini(&pp->putq_aio);
+ NNI_FREE_STRUCT(pp);
+ return (rv);
+ }
pp->pipe = pipe;
pp->pull = psock;
*ppp = pp;
@@ -77,28 +94,93 @@ nni_pull_pipe_fini(void *arg)
nni_pull_pipe *pp = arg;
if (pp != NULL) {
+ nni_aio_fini(&pp->putq_aio);
+ nni_aio_fini(&pp->recv_aio);
NNI_FREE_STRUCT(pp);
}
}
+static int
+nni_pull_pipe_start(void *arg)
+{
+ nni_pull_pipe *pp = arg;
+
+ // Start the pending pull...
+ nni_pull_recv(pp);
+
+ return (0);
+}
+
+
static void
-nni_pull_pipe_recv(void *arg)
+nni_pull_pipe_stop(void *arg)
{
nni_pull_pipe *pp = arg;
- nni_pull_sock *pull = pp->pull;
+
+ // Cancel any pending sendup.
+ nni_msgq_aio_cancel(pp->pull->urq, &pp->putq_aio);
+}
+
+
+static void
+nni_pull_recv_cb(void *arg)
+{
+ nni_pull_pipe *pp = arg;
+ nni_aio *aio = &pp->recv_aio;
nni_msg *msg;
- for (;;) {
- if (nni_pipe_recv(pp->pipe, &msg) != 0) {
- break;
- }
- if (nni_msgq_put(pull->urq, msg) != 0) {
- nni_msg_free(msg);
- break;
- }
+ if (nni_aio_result(aio) != 0) {
+ // Failed to get a message, probably the pipe is closed.
+ nni_pipe_close(pp->pipe);
+ return;
+ }
+
+ // Got a message... start the put to send it up to the application.
+ msg = aio->a_msg;
+ aio->a_msg = NULL;
+ nni_pull_putq(pp, msg);
+}
+
+
+static void
+nni_pull_putq_cb(void *arg)
+{
+ nni_pull_pipe *pp = arg;
+ nni_aio *aio = &pp->putq_aio;
+ int rv;
+
+ if (nni_aio_result(aio) != 0) {
+ // If we failed to put, probably NNG_ECLOSED, nothing else
+ // we can do. Just close the pipe.
+ nni_pipe_close(pp->pipe);
+ return;
+ }
+
+ nni_pull_recv(pp);
+}
+
+
+// nni_pull_recv is called to schedule a pending recv on the incoming pipe.
+static void
+nni_pull_recv(nni_pull_pipe *pp)
+{
+ // Schedule the aio with callback.
+ if (nni_pipe_aio_recv(pp->pipe, &pp->recv_aio) != 0) {
+ nni_pipe_close(pp->pipe);
}
- nni_pipe_close(pp->pipe);
+}
+
+
+// nni_pull_putq schedules a put operation to the user socket (sendup).
+static void
+nni_pull_putq(nni_pull_pipe *pp, nni_msg *msg)
+{
+ nni_pull_sock *pull = pp->pull;
+
+ pp->putq_aio.a_msg = msg;
+
+ nni_msgq_aio_put(pull->urq, &pp->putq_aio);
}
@@ -141,7 +223,8 @@ nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
static nni_proto_pipe_ops nni_pull_pipe_ops = {
.pipe_init = nni_pull_pipe_init,
.pipe_fini = nni_pull_pipe_fini,
- .pipe_worker = { nni_pull_pipe_recv },
+ .pipe_add = nni_pull_pipe_start,
+ .pipe_rem = nni_pull_pipe_stop,
};
static nni_proto_sock_ops nni_pull_sock_ops = {