aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-03-06 11:21:32 -0800
committerGarrett D'Amore <garrett@damore.org>2017-03-06 11:21:32 -0800
commit68586f0090419def344debf970402750332af098 (patch)
treebff6f606ed2f5bdf448b904725a0baaf805fbc57 /src
parent44a6de38d240143ec2b4bb6f6457bae81271820a (diff)
downloadnng-68586f0090419def344debf970402750332af098.tar.gz
nng-68586f0090419def344debf970402750332af098.tar.bz2
nng-68586f0090419def344debf970402750332af098.zip
Pub/Sub now callback driven.
Diffstat (limited to 'src')
-rw-r--r--src/core/list.c4
-rw-r--r--src/core/msgqueue.c1
-rw-r--r--src/protocol/pipeline/push.c4
-rw-r--r--src/protocol/pubsub/pub.c180
-rw-r--r--src/protocol/pubsub/sub.c75
5 files changed, 182 insertions, 82 deletions
diff --git a/src/core/list.c b/src/core/list.c
index 1a5a4eb1..1d00171b 100644
--- a/src/core/list.c
+++ b/src/core/list.c
@@ -90,7 +90,7 @@ nni_list_insert_before(nni_list *list, void *item, void *before)
nni_list_node *where = NODE(list, before);
if ((node->ln_next != NULL) || (node->ln_prev != NULL)) {
- nni_panic("prepending node already on a list or not inited");
+ nni_panic("inserting node already on a list or not inited");
}
node->ln_next = where;
node->ln_prev = where->ln_prev;
@@ -106,7 +106,7 @@ nni_list_insert_after(nni_list *list, void *item, void *after)
nni_list_node *where = NODE(list, after);
if ((node->ln_next != NULL) || (node->ln_prev != NULL)) {
- nni_panic("prepending node already on a list or not inited");
+ nni_panic("inserting node already on a list or not inited");
}
node->ln_prev = where;
node->ln_next = where->ln_next;
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 147c0e20..9607f562 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -172,6 +172,7 @@ nni_msgq_fini(nni_msgq *mq)
if (mq == NULL) {
return;
}
+ nni_timer_cancel(&mq->mq_timer);
nni_thr_fini(&mq->mq_notify_thr);
nni_cv_fini(&mq->mq_drained);
nni_cv_fini(&mq->mq_writeable);
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index fcbb6d4f..16156ee4 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -208,6 +208,8 @@ nni_push_recv_cb(void *arg)
nni_pipe_close(pp->pipe);
return;
}
+ nni_msg_free(pp->aio_recv.a_msg);
+ pp->aio_recv.a_msg = NULL;
nni_push_recv(pp);
}
@@ -220,6 +222,8 @@ nni_push_send_cb(void *arg)
nni_mtx *mx = nni_sock_mtx(push->sock);
if (nni_aio_result(&pp->aio_send) != 0) {
+ nni_msg_free(pp->aio_send.a_msg);
+ pp->aio_send.a_msg = NULL;
nni_pipe_close(pp->pipe);
return;
}
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index d148999b..e1808c2b 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -20,11 +20,19 @@
typedef struct nni_pub_pipe nni_pub_pipe;
typedef struct nni_pub_sock nni_pub_sock;
+static void nni_pub_pipe_recv_cb(void *);
+static void nni_pub_pipe_send_cb(void *);
+static void nni_pub_pipe_getq_cb(void *);
+static void nni_pub_sock_getq_cb(void *);
+static void nni_pub_sock_fini(void *);
+static void nni_pub_pipe_fini(void *);
+
// An nni_pub_sock is our per-socket protocol private structure.
struct nni_pub_sock {
nni_sock * sock;
nni_msgq * uwq;
int raw;
+ nni_aio aio_getq;
nni_list pipes;
};
@@ -33,18 +41,26 @@ struct nni_pub_pipe {
nni_pipe * pipe;
nni_pub_sock * pub;
nni_msgq * sendq;
+ nni_aio aio_getq;
+ nni_aio aio_send;
+ nni_aio aio_recv;
nni_list_node node;
- int sigclose;
};
static int
nni_pub_sock_init(void **pubp, nni_sock *sock)
{
nni_pub_sock *pub;
+ int rv;
if ((pub = NNI_ALLOC_STRUCT(pub)) == NULL) {
return (NNG_ENOMEM);
}
+ rv = nni_aio_init(&pub->aio_getq, nni_pub_sock_getq_cb, pub);
+ if (rv != 0) {
+ nni_pub_sock_fini(pub);
+ return (rv);
+ }
pub->sock = sock;
pub->raw = 0;
NNI_LIST_INIT(&pub->pipes, nni_pub_pipe, node);
@@ -63,11 +79,21 @@ nni_pub_sock_fini(void *arg)
nni_pub_sock *pub = arg;
if (pub != NULL) {
+ nni_aio_fini(&pub->aio_getq);
NNI_FREE_STRUCT(pub);
}
}
+static void
+nni_pub_sock_open(void *arg)
+{
+ nni_pub_sock *pub = arg;
+
+ nni_msgq_aio_get(pub->uwq, &pub->aio_getq);
+}
+
+
static int
nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
{
@@ -79,12 +105,29 @@ nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
}
// XXX: consider making this depth tunable
if ((rv = nni_msgq_init(&pp->sendq, 16)) != 0) {
- NNI_FREE_STRUCT(pp);
+ nni_pub_pipe_fini(pp);
+ return (rv);
+ }
+
+ rv = nni_aio_init(&pp->aio_getq, nni_pub_pipe_getq_cb, pp);
+ if (rv != 0) {
+ nni_pub_pipe_fini(pp);
+ return (rv);
+ }
+
+ rv = nni_aio_init(&pp->aio_send, nni_pub_pipe_send_cb, pp);
+ if (rv != 0) {
+ nni_pub_pipe_fini(pp);
+ return (rv);
+ }
+
+ rv = nni_aio_init(&pp->aio_recv, nni_pub_pipe_recv_cb, pp);
+ if (rv != 0) {
+ nni_pub_pipe_fini(pp);
return (rv);
}
pp->pipe = pipe;
pp->pub = psock;
- pp->sigclose = 0;
*ppp = pp;
return (0);
}
@@ -97,6 +140,9 @@ nni_pub_pipe_fini(void *arg)
if (pp != NULL) {
nni_msgq_fini(pp->sendq);
+ nni_aio_fini(&pp->aio_getq);
+ nni_aio_fini(&pp->aio_send);
+ nni_aio_fini(&pp->aio_recv);
NNI_FREE_STRUCT(pp);
}
}
@@ -112,6 +158,11 @@ nni_pub_pipe_add(void *arg)
return (NNG_EPROTO);
}
nni_list_append(&pub->pipes, pp);
+
+ // Start the receiver and the queue reader.
+ nni_pipe_aio_recv(pp->pipe, &pp->aio_recv);
+ nni_msgq_aio_get(pp->sendq, &pp->aio_getq);
+
return (0);
}
@@ -123,97 +174,102 @@ nni_pub_pipe_rem(void *arg)
nni_pub_sock *pub = pp->pub;
nni_list_remove(&pub->pipes, pp);
+ nni_msgq_aio_cancel(pp->sendq, &pp->aio_getq);
}
static void
-nni_pub_sock_send(void *arg)
+nni_pub_sock_getq_cb(void *arg)
{
nni_pub_sock *pub = arg;
nni_msgq *uwq = pub->uwq;
nni_msg *msg, *dup;
nni_mtx *mx = nni_sock_mtx(pub->sock);
- for (;;) {
- nni_pub_pipe *pp;
- nni_pub_pipe *last;
- int rv;
+ nni_pub_pipe *pp;
+ nni_pub_pipe *last;
+ int rv;
- if ((rv = nni_msgq_get(uwq, &msg)) != 0) {
- break;
- }
- nni_mtx_lock(mx);
- last = nni_list_last(&pub->pipes);
- NNI_LIST_FOREACH (&pub->pipes, pp) {
- if (pp != last) {
- rv = nni_msg_dup(&dup, msg);
- if (rv != 0) {
- continue;
- }
- } else {
- dup = msg;
- }
- if ((rv = nni_msgq_tryput(pp->sendq, dup)) != 0) {
- nni_msg_free(dup);
+ if (nni_aio_result(&pub->aio_getq) != 0) {
+ return;
+ }
+
+ msg = pub->aio_getq.a_msg;
+ pub->aio_getq.a_msg = NULL;
+
+ nni_mtx_lock(mx);
+ last = nni_list_last(&pub->pipes);
+ NNI_LIST_FOREACH (&pub->pipes, pp) {
+ if (pp != last) {
+ rv = nni_msg_dup(&dup, msg);
+ if (rv != 0) {
+ continue;
}
+ } else {
+ dup = msg;
}
- nni_mtx_unlock(mx);
-
- if (last == NULL) {
- nni_msg_free(msg);
+ if ((rv = nni_msgq_tryput(pp->sendq, dup)) != 0) {
+ nni_msg_free(dup);
}
}
+ nni_mtx_unlock(mx);
+
+ if (last == NULL) {
+ nni_msg_free(msg);
+ }
+
+ nni_msgq_aio_get(uwq, &pub->aio_getq);
}
static void
-nni_pub_pipe_send(void *arg)
+nni_pub_pipe_recv_cb(void *arg)
{
nni_pub_pipe *pp = arg;
- nni_msgq *wq = pp->sendq;
- nni_pipe *pipe = pp->pipe;
- nni_msg *msg;
- int rv;
- for (;;) {
- rv = nni_msgq_get_sig(wq, &msg, &pp->sigclose);
- if (rv != 0) {
- break;
- }
+ if (nni_aio_result(&pp->aio_recv) != 0) {
+ nni_pipe_close(pp->pipe);
+ return;
+ }
- rv = nni_pipe_send(pipe, msg);
- if (rv != 0) {
- nni_msg_free(msg);
- break;
- }
+ nni_msg_free(pp->aio_recv.a_msg);
+ pp->aio_recv.a_msg = NULL;
+ nni_pipe_aio_recv(pp->pipe, &pp->aio_recv);
+}
+
+
+static void
+nni_pub_pipe_getq_cb(void *arg)
+{
+ nni_pub_pipe *pp = arg;
+
+ if (nni_aio_result(&pp->aio_getq) != 0) {
+ nni_pipe_close(pp->pipe);
+ return;
}
- nni_pipe_close(pipe);
+
+ pp->aio_send.a_msg = pp->aio_getq.a_msg;
+ pp->aio_getq.a_msg = NULL;
+
+ nni_pipe_aio_send(pp->pipe, &pp->aio_send);
}
static void
-nni_pub_pipe_recv(void *arg)
+nni_pub_pipe_send_cb(void *arg)
{
nni_pub_pipe *pp = arg;
- nni_msgq *uwq = pp->pub->uwq;
- nni_pipe *pipe = pp->pipe;
- nni_msg *msg;
- int rv;
- // All we do is spin, waiting for the underlying transport to close.
- // We discard anything we happen to get.
- for (;;) {
- rv = nni_pipe_recv(pipe, &msg);
- if (rv != 0) {
- break;
- }
- nni_msg_free(msg);
+ if (nni_aio_result(&pp->aio_send) != 0) {
+ nni_msg_free(pp->aio_send.a_msg);
+ pp->aio_send.a_msg = NULL;
+ nni_pipe_close(pp->pipe);
+ return;
}
- nni_msgq_signal(uwq, &pp->sigclose);
- nni_msgq_signal(pp->sendq, &pp->sigclose);
- nni_pipe_close(pipe);
+ pp->aio_send.a_msg = NULL;
+ nni_msgq_aio_get(pp->sendq, &pp->aio_getq);
}
@@ -258,16 +314,14 @@ static nni_proto_pipe_ops nni_pub_pipe_ops = {
.pipe_fini = nni_pub_pipe_fini,
.pipe_add = nni_pub_pipe_add,
.pipe_rem = nni_pub_pipe_rem,
- .pipe_worker = { nni_pub_pipe_send,
- nni_pub_pipe_recv },
};
nni_proto_sock_ops nni_pub_sock_ops = {
.sock_init = nni_pub_sock_init,
.sock_fini = nni_pub_sock_fini,
+ .sock_open = nni_pub_sock_open,
.sock_setopt = nni_pub_sock_setopt,
.sock_getopt = nni_pub_sock_getopt,
- .sock_worker = { nni_pub_sock_send },
};
nni_proto nni_pub_proto = {
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 46dcfd5c..864a80c7 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -20,6 +20,10 @@ typedef struct nni_sub_pipe nni_sub_pipe;
typedef struct nni_sub_sock nni_sub_sock;
typedef struct nni_sub_topic nni_sub_topic;
+static void nni_sub_recv_cb(void *);
+static void nni_sub_putq_cb(void *);
+static void nni_sub_pipe_fini(void *);
+
struct nni_sub_topic {
nni_list_node node;
size_t len;
@@ -38,6 +42,8 @@ struct nni_sub_sock {
struct nni_sub_pipe {
nni_pipe * pipe;
nni_sub_sock * sub;
+ nni_aio aio_recv;
+ nni_aio aio_putq;
};
static int
@@ -80,10 +86,16 @@ static int
nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock)
{
nni_sub_pipe *sp;
+ int rv;
if ((sp = NNI_ALLOC_STRUCT(sp)) == NULL) {
return (NNG_ENOMEM);
}
+ if (((rv = nni_aio_init(&sp->aio_putq, nni_sub_putq_cb, sp)) != 0) ||
+ ((rv = nni_aio_init(&sp->aio_recv, nni_sub_recv_cb, sp)) != 0)) {
+ nni_sub_pipe_fini(sp);
+ return (rv);
+ }
sp->pipe = pipe;
sp->sub = ssock;
*spp = sp;
@@ -97,36 +109,64 @@ nni_sub_pipe_fini(void *arg)
nni_sub_pipe *sp = arg;
if (sp != NULL) {
+ nni_aio_fini(&sp->aio_putq);
+ nni_aio_fini(&sp->aio_recv);
NNI_FREE_STRUCT(sp);
}
}
+static int
+nni_sub_pipe_add(void *arg)
+{
+ nni_sub_pipe *sp = arg;
+
+ nni_pipe_aio_recv(sp->pipe, &sp->aio_recv);
+ return (0);
+}
+
+
+static void
+nni_sub_pipe_close(void *arg)
+{
+ nni_sub_pipe *sp = arg;
+
+ nni_msgq_aio_cancel(sp->sub->urq, &sp->aio_putq);
+}
+
+
static void
-nni_sub_pipe_recv(void *arg)
+nni_sub_recv_cb(void *arg)
{
nni_sub_pipe *sp = arg;
nni_sub_sock *sub = sp->sub;
nni_msgq *urq = sub->urq;
- nni_pipe *pipe = sp->pipe;
nni_msg *msg;
- int rv;
- for (;;) {
- rv = nni_pipe_recv(pipe, &msg);
- if (rv != 0) {
- break;
- }
+ if (nni_aio_result(&sp->aio_recv) != 0) {
+ nni_pipe_close(sp->pipe);
+ return;
+ }
- // Now send it up.
- rv = nni_msgq_put(urq, msg);
- if (rv != 0) {
- nni_msg_free(msg);
- break;
- }
+ sp->aio_putq.a_msg = sp->aio_recv.a_msg;
+ sp->aio_recv.a_msg = NULL;
+ nni_msgq_aio_put(sub->urq, &sp->aio_putq);
+}
+
+
+static void
+nni_sub_putq_cb(void *arg)
+{
+ nni_sub_pipe *sp = arg;
+
+ if (nni_aio_result(&sp->aio_putq) != 0) {
+ nni_msg_free(sp->aio_putq.a_msg);
+ sp->aio_putq.a_msg = NULL;
+ nni_pipe_close(sp->pipe);
+ return;
}
- // Nobody else to signal...
- nni_pipe_close(pipe);
+
+ nni_pipe_aio_recv(sp->pipe, &sp->aio_recv);
}
@@ -299,7 +339,8 @@ nni_sub_sock_rfilter(void *arg, nni_msg *msg)
static nni_proto_pipe_ops nni_sub_pipe_ops = {
.pipe_init = nni_sub_pipe_init,
.pipe_fini = nni_sub_pipe_fini,
- .pipe_worker = { nni_sub_pipe_recv },
+ .pipe_add = nni_sub_pipe_add,
+ .pipe_rem = nni_sub_pipe_close,
};
static nni_proto_sock_ops nni_sub_sock_ops = {