aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pubsub/sub.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-03-06 11:21:32 -0800
committerGarrett D'Amore <garrett@damore.org>2017-03-06 11:21:32 -0800
commit68586f0090419def344debf970402750332af098 (patch)
treebff6f606ed2f5bdf448b904725a0baaf805fbc57 /src/protocol/pubsub/sub.c
parent44a6de38d240143ec2b4bb6f6457bae81271820a (diff)
downloadnng-68586f0090419def344debf970402750332af098.tar.gz
nng-68586f0090419def344debf970402750332af098.tar.bz2
nng-68586f0090419def344debf970402750332af098.zip
Pub/Sub now callback driven.
Diffstat (limited to 'src/protocol/pubsub/sub.c')
-rw-r--r--src/protocol/pubsub/sub.c75
1 files changed, 58 insertions, 17 deletions
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 46dcfd5c..864a80c7 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -20,6 +20,10 @@ typedef struct nni_sub_pipe nni_sub_pipe;
typedef struct nni_sub_sock nni_sub_sock;
typedef struct nni_sub_topic nni_sub_topic;
+static void nni_sub_recv_cb(void *);
+static void nni_sub_putq_cb(void *);
+static void nni_sub_pipe_fini(void *);
+
struct nni_sub_topic {
nni_list_node node;
size_t len;
@@ -38,6 +42,8 @@ struct nni_sub_sock {
struct nni_sub_pipe {
nni_pipe * pipe;
nni_sub_sock * sub;
+ nni_aio aio_recv;
+ nni_aio aio_putq;
};
static int
@@ -80,10 +86,16 @@ static int
nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock)
{
nni_sub_pipe *sp;
+ int rv;
if ((sp = NNI_ALLOC_STRUCT(sp)) == NULL) {
return (NNG_ENOMEM);
}
+ if (((rv = nni_aio_init(&sp->aio_putq, nni_sub_putq_cb, sp)) != 0) ||
+ ((rv = nni_aio_init(&sp->aio_recv, nni_sub_recv_cb, sp)) != 0)) {
+ nni_sub_pipe_fini(sp);
+ return (rv);
+ }
sp->pipe = pipe;
sp->sub = ssock;
*spp = sp;
@@ -97,36 +109,64 @@ nni_sub_pipe_fini(void *arg)
nni_sub_pipe *sp = arg;
if (sp != NULL) {
+ nni_aio_fini(&sp->aio_putq);
+ nni_aio_fini(&sp->aio_recv);
NNI_FREE_STRUCT(sp);
}
}
+static int
+nni_sub_pipe_add(void *arg)
+{
+ nni_sub_pipe *sp = arg;
+
+ nni_pipe_aio_recv(sp->pipe, &sp->aio_recv);
+ return (0);
+}
+
+
+static void
+nni_sub_pipe_close(void *arg)
+{
+ nni_sub_pipe *sp = arg;
+
+ nni_msgq_aio_cancel(sp->sub->urq, &sp->aio_putq);
+}
+
+
static void
-nni_sub_pipe_recv(void *arg)
+nni_sub_recv_cb(void *arg)
{
nni_sub_pipe *sp = arg;
nni_sub_sock *sub = sp->sub;
nni_msgq *urq = sub->urq;
- nni_pipe *pipe = sp->pipe;
nni_msg *msg;
- int rv;
- for (;;) {
- rv = nni_pipe_recv(pipe, &msg);
- if (rv != 0) {
- break;
- }
+ if (nni_aio_result(&sp->aio_recv) != 0) {
+ nni_pipe_close(sp->pipe);
+ return;
+ }
- // Now send it up.
- rv = nni_msgq_put(urq, msg);
- if (rv != 0) {
- nni_msg_free(msg);
- break;
- }
+ sp->aio_putq.a_msg = sp->aio_recv.a_msg;
+ sp->aio_recv.a_msg = NULL;
+ nni_msgq_aio_put(sub->urq, &sp->aio_putq);
+}
+
+
+static void
+nni_sub_putq_cb(void *arg)
+{
+ nni_sub_pipe *sp = arg;
+
+ if (nni_aio_result(&sp->aio_putq) != 0) {
+ nni_msg_free(sp->aio_putq.a_msg);
+ sp->aio_putq.a_msg = NULL;
+ nni_pipe_close(sp->pipe);
+ return;
}
- // Nobody else to signal...
- nni_pipe_close(pipe);
+
+ nni_pipe_aio_recv(sp->pipe, &sp->aio_recv);
}
@@ -299,7 +339,8 @@ nni_sub_sock_rfilter(void *arg, nni_msg *msg)
static nni_proto_pipe_ops nni_sub_pipe_ops = {
.pipe_init = nni_sub_pipe_init,
.pipe_fini = nni_sub_pipe_fini,
- .pipe_worker = { nni_sub_pipe_recv },
+ .pipe_add = nni_sub_pipe_add,
+ .pipe_rem = nni_sub_pipe_close,
};
static nni_proto_sock_ops nni_sub_sock_ops = {