summaryrefslogtreecommitdiff
path: root/src/protocol/pubsub
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-31 17:59:01 -0700
committerGarrett D'Amore <garrett@damore.org>2017-09-22 11:47:07 -0700
commitd72076207a2fad96ff014a81366868fb47a0ed1b (patch)
tree5a4f67ab607ef6690e983c2d1ab2c64062027e52 /src/protocol/pubsub
parent366f3e5d14c5f891655ad1fa2b3cfa9a56b8830d (diff)
downloadnng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.gz
nng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.bz2
nng-d72076207a2fad96ff014a81366868fb47a0ed1b.zip
Allocate AIOs dynamically.
We allocate AIO structures dynamically, so that we can use them abstractly in more places without inlining them. This will be used for the ZeroTier transport to allow us to create operations consisting of just the AIO. Furthermore, we provide accessors for some of the aio members, in the hopes that we will be able to wrap these for "safe" version of the AIO capability to export to applications, and to protocol and transport implementors. While here we cleaned up the protocol details to use consistently shorter names (no nni_ prefix for static symbols needed), and we also fixed a bug in the surveyor code.
Diffstat (limited to 'src/protocol/pubsub')
-rw-r--r--src/protocol/pubsub/pub.c305
-rw-r--r--src/protocol/pubsub/sub.c236
2 files changed, 274 insertions, 267 deletions
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index bbca1ecd..b7ac361e 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -18,183 +18,188 @@
// perform sender-side filtering. Its best effort delivery, so anything
// that can't receive the message won't get one.
-typedef struct nni_pub_pipe nni_pub_pipe;
-typedef struct nni_pub_sock nni_pub_sock;
-
-static void nni_pub_pipe_recv_cb(void *);
-static void nni_pub_pipe_send_cb(void *);
-static void nni_pub_pipe_getq_cb(void *);
-static void nni_pub_sock_getq_cb(void *);
-static void nni_pub_sock_fini(void *);
-static void nni_pub_pipe_fini(void *);
-
-// An nni_pub_sock is our per-socket protocol private structure.
-struct nni_pub_sock {
+typedef struct pub_pipe pub_pipe;
+typedef struct pub_sock pub_sock;
+
+static void pub_pipe_recv_cb(void *);
+static void pub_pipe_send_cb(void *);
+static void pub_pipe_getq_cb(void *);
+static void pub_sock_getq_cb(void *);
+static void pub_sock_fini(void *);
+static void pub_pipe_fini(void *);
+
+// A pub_sock is our per-socket protocol private structure.
+struct pub_sock {
nni_sock *sock;
nni_msgq *uwq;
int raw;
- nni_aio aio_getq;
+ nni_aio * aio_getq;
nni_list pipes;
nni_mtx mtx;
};
-// An nni_pub_pipe is our per-pipe protocol private structure.
-struct nni_pub_pipe {
+// A pub_pipe is our per-pipe protocol private structure.
+struct pub_pipe {
nni_pipe * pipe;
- nni_pub_sock *pub;
+ pub_sock * pub;
nni_msgq * sendq;
- nni_aio aio_getq;
- nni_aio aio_send;
- nni_aio aio_recv;
+ nni_aio * aio_getq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
nni_list_node node;
};
+static void
+pub_sock_fini(void *arg)
+{
+ pub_sock *s = arg;
+
+ nni_aio_stop(s->aio_getq);
+ nni_aio_fini(s->aio_getq);
+ nni_mtx_fini(&s->mtx);
+ NNI_FREE_STRUCT(s);
+}
+
static int
-nni_pub_sock_init(void **pubp, nni_sock *sock)
+pub_sock_init(void **sp, nni_sock *sock)
{
- nni_pub_sock *pub;
+ pub_sock *s;
+ int rv;
- if ((pub = NNI_ALLOC_STRUCT(pub)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&pub->mtx);
- nni_aio_init(&pub->aio_getq, nni_pub_sock_getq_cb, pub);
+ nni_mtx_init(&s->mtx);
+ if ((rv = nni_aio_init(&s->aio_getq, pub_sock_getq_cb, s)) != 0) {
+ pub_sock_fini(s);
+ return (rv);
+ }
- pub->sock = sock;
- pub->raw = 0;
- NNI_LIST_INIT(&pub->pipes, nni_pub_pipe, node);
+ s->sock = sock;
+ s->raw = 0;
+ NNI_LIST_INIT(&s->pipes, pub_pipe, node);
- pub->uwq = nni_sock_sendq(sock);
+ s->uwq = nni_sock_sendq(sock);
- *pubp = pub;
+ *sp = s;
nni_sock_recverr(sock, NNG_ENOTSUP);
return (0);
}
static void
-nni_pub_sock_fini(void *arg)
+pub_sock_open(void *arg)
{
- nni_pub_sock *pub = arg;
+ pub_sock *s = arg;
- nni_aio_stop(&pub->aio_getq);
- nni_aio_fini(&pub->aio_getq);
- nni_mtx_fini(&pub->mtx);
- NNI_FREE_STRUCT(pub);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-nni_pub_sock_open(void *arg)
+pub_sock_close(void *arg)
{
- nni_pub_sock *pub = arg;
+ pub_sock *s = arg;
- nni_msgq_aio_get(pub->uwq, &pub->aio_getq);
+ nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-nni_pub_sock_close(void *arg)
+pub_pipe_fini(void *arg)
{
- nni_pub_sock *pub = arg;
-
- nni_aio_cancel(&pub->aio_getq, NNG_ECLOSED);
-}
-
-static void
-nni_pub_pipe_fini(void *arg)
-{
- nni_pub_pipe *pp = arg;
- nni_aio_fini(&pp->aio_getq);
- nni_aio_fini(&pp->aio_send);
- nni_aio_fini(&pp->aio_recv);
- nni_msgq_fini(pp->sendq);
- NNI_FREE_STRUCT(pp);
+ pub_pipe *p = arg;
+ nni_aio_fini(p->aio_getq);
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_msgq_fini(p->sendq);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
+pub_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- nni_pub_pipe *pp;
- int rv;
+ pub_pipe *p;
+ int rv;
- if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
+
// XXX: consider making this depth tunable
- if ((rv = nni_msgq_init(&pp->sendq, 16)) != 0) {
- NNI_FREE_STRUCT(pp);
+ if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, pub_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, pub_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, pub_pipe_recv_cb, p)) != 0)) {
+
+ pub_pipe_fini(p);
return (rv);
}
- nni_aio_init(&pp->aio_getq, nni_pub_pipe_getq_cb, pp);
- nni_aio_init(&pp->aio_send, nni_pub_pipe_send_cb, pp);
- nni_aio_init(&pp->aio_recv, nni_pub_pipe_recv_cb, pp);
-
- pp->pipe = pipe;
- pp->pub = psock;
- *ppp = pp;
+ p->pipe = pipe;
+ p->pub = s;
+ *pp = p;
return (0);
}
static int
-nni_pub_pipe_start(void *arg)
+pub_pipe_start(void *arg)
{
- nni_pub_pipe *pp = arg;
- nni_pub_sock *pub = pp->pub;
+ pub_pipe *p = arg;
+ pub_sock *s = p->pub;
- if (nni_pipe_peer(pp->pipe) != NNG_PROTO_SUB) {
+ if (nni_pipe_peer(p->pipe) != NNG_PROTO_SUB) {
return (NNG_EPROTO);
}
- nni_mtx_lock(&pub->mtx);
- nni_list_append(&pub->pipes, pp);
- nni_mtx_unlock(&pub->mtx);
+ nni_mtx_lock(&s->mtx);
+ nni_list_append(&s->pipes, p);
+ nni_mtx_unlock(&s->mtx);
// Start the receiver and the queue reader.
- nni_pipe_recv(pp->pipe, &pp->aio_recv);
- nni_msgq_aio_get(pp->sendq, &pp->aio_getq);
+ nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
return (0);
}
static void
-nni_pub_pipe_stop(void *arg)
+pub_pipe_stop(void *arg)
{
- nni_pub_pipe *pp = arg;
- nni_pub_sock *pub = pp->pub;
+ pub_pipe *p = arg;
+ pub_sock *s = p->pub;
- nni_aio_stop(&pp->aio_getq);
- nni_aio_stop(&pp->aio_send);
- nni_aio_stop(&pp->aio_recv);
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
- nni_msgq_close(pp->sendq);
+ nni_msgq_close(p->sendq);
- nni_mtx_lock(&pub->mtx);
- if (nni_list_active(&pub->pipes, pp)) {
- nni_list_remove(&pub->pipes, pp);
+ nni_mtx_lock(&s->mtx);
+ if (nni_list_active(&s->pipes, p)) {
+ nni_list_remove(&s->pipes, p);
}
- nni_mtx_unlock(&pub->mtx);
+ nni_mtx_unlock(&s->mtx);
}
static void
-nni_pub_sock_getq_cb(void *arg)
+pub_sock_getq_cb(void *arg)
{
- nni_pub_sock *pub = arg;
- nni_msgq * uwq = pub->uwq;
- nni_msg * msg, *dup;
+ pub_sock *s = arg;
+ nni_msgq *uwq = s->uwq;
+ nni_msg * msg, *dup;
- nni_pub_pipe *pp;
- nni_pub_pipe *last;
- int rv;
+ pub_pipe *p;
+ pub_pipe *last;
+ int rv;
- if (nni_aio_result(&pub->aio_getq) != 0) {
+ if (nni_aio_result(s->aio_getq) != 0) {
return;
}
- msg = pub->aio_getq.a_msg;
- pub->aio_getq.a_msg = NULL;
+ msg = nni_aio_get_msg(s->aio_getq);
+ nni_aio_set_msg(s->aio_getq, NULL);
- nni_mtx_lock(&pub->mtx);
- last = nni_list_last(&pub->pipes);
- NNI_LIST_FOREACH (&pub->pipes, pp) {
- if (pp != last) {
+ nni_mtx_lock(&s->mtx);
+ last = nni_list_last(&s->pipes);
+ NNI_LIST_FOREACH (&s->pipes, p) {
+ if (p != last) {
rv = nni_msg_dup(&dup, msg);
if (rv != 0) {
continue;
@@ -202,119 +207,117 @@ nni_pub_sock_getq_cb(void *arg)
} else {
dup = msg;
}
- if ((rv = nni_msgq_tryput(pp->sendq, dup)) != 0) {
+ if ((rv = nni_msgq_tryput(p->sendq, dup)) != 0) {
nni_msg_free(dup);
}
}
- nni_mtx_unlock(&pub->mtx);
+ nni_mtx_unlock(&s->mtx);
if (last == NULL) {
nni_msg_free(msg);
}
- nni_msgq_aio_get(uwq, &pub->aio_getq);
+ nni_msgq_aio_get(uwq, s->aio_getq);
}
static void
-nni_pub_pipe_recv_cb(void *arg)
+pub_pipe_recv_cb(void *arg)
{
- nni_pub_pipe *pp = arg;
+ pub_pipe *p = arg;
- if (nni_aio_result(&pp->aio_recv) != 0) {
- nni_pipe_stop(pp->pipe);
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- nni_msg_free(pp->aio_recv.a_msg);
- pp->aio_recv.a_msg = NULL;
- nni_pipe_recv(pp->pipe, &pp->aio_recv);
+ nni_msg_free(nni_aio_get_msg(p->aio_recv));
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_pipe_recv(p->pipe, p->aio_recv);
}
static void
-nni_pub_pipe_getq_cb(void *arg)
+pub_pipe_getq_cb(void *arg)
{
- nni_pub_pipe *pp = arg;
+ pub_pipe *p = arg;
- if (nni_aio_result(&pp->aio_getq) != 0) {
- nni_pipe_stop(pp->pipe);
+ if (nni_aio_result(p->aio_getq) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- pp->aio_send.a_msg = pp->aio_getq.a_msg;
- pp->aio_getq.a_msg = NULL;
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
- nni_pipe_send(pp->pipe, &pp->aio_send);
+ nni_pipe_send(p->pipe, p->aio_send);
}
static void
-nni_pub_pipe_send_cb(void *arg)
+pub_pipe_send_cb(void *arg)
{
- nni_pub_pipe *pp = arg;
+ pub_pipe *p = arg;
- if (nni_aio_result(&pp->aio_send) != 0) {
- nni_msg_free(pp->aio_send.a_msg);
- pp->aio_send.a_msg = NULL;
- nni_pipe_stop(pp->pipe);
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
- pp->aio_send.a_msg = NULL;
- nni_msgq_aio_get(pp->sendq, &pp->aio_getq);
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
}
static int
-nni_pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_pub_sock *pub = arg;
- int rv = NNG_ENOTSUP;
+ pub_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&pub->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
}
return (rv);
}
static int
-nni_pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_pub_sock *pub = arg;
- int rv = NNG_ENOTSUP;
+ pub_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&pub->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
-// This is the global protocol structure -- our linkage to the core.
-// This should be the only global non-static symbol in this file.
-static nni_proto_pipe_ops nni_pub_pipe_ops = {
- .pipe_init = nni_pub_pipe_init,
- .pipe_fini = nni_pub_pipe_fini,
- .pipe_start = nni_pub_pipe_start,
- .pipe_stop = nni_pub_pipe_stop,
+static nni_proto_pipe_ops pub_pipe_ops = {
+ .pipe_init = pub_pipe_init,
+ .pipe_fini = pub_pipe_fini,
+ .pipe_start = pub_pipe_start,
+ .pipe_stop = pub_pipe_stop,
};
-nni_proto_sock_ops nni_pub_sock_ops = {
- .sock_init = nni_pub_sock_init,
- .sock_fini = nni_pub_sock_fini,
- .sock_open = nni_pub_sock_open,
- .sock_close = nni_pub_sock_close,
- .sock_setopt = nni_pub_sock_setopt,
- .sock_getopt = nni_pub_sock_getopt,
+static nni_proto_sock_ops pub_sock_ops = {
+ .sock_init = pub_sock_init,
+ .sock_fini = pub_sock_fini,
+ .sock_open = pub_sock_open,
+ .sock_close = pub_sock_close,
+ .sock_setopt = pub_sock_setopt,
+ .sock_getopt = pub_sock_getopt,
};
-nni_proto nni_pub_proto = {
+static nni_proto pub_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_PUB_V0, "pub" },
.proto_peer = { NNG_PROTO_SUB_V0, "sub" },
.proto_flags = NNI_PROTO_FLAG_SND,
- .proto_sock_ops = &nni_pub_sock_ops,
- .proto_pipe_ops = &nni_pub_pipe_ops,
+ .proto_sock_ops = &pub_sock_ops,
+ .proto_pipe_ops = &pub_pipe_ops,
};
int
nng_pub0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_pub_proto));
+ return (nni_proto_open(sidp, &pub_proto));
}
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 0563b764..0dbad081 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -17,22 +17,22 @@
// it from publishers, and filters out those it is not interested in,
// only passing up ones that match known subscriptions.
-typedef struct nni_sub_pipe nni_sub_pipe;
-typedef struct nni_sub_sock nni_sub_sock;
-typedef struct nni_sub_topic nni_sub_topic;
+typedef struct sub_pipe sub_pipe;
+typedef struct sub_sock sub_sock;
+typedef struct sub_topic sub_topic;
-static void nni_sub_recv_cb(void *);
-static void nni_sub_putq_cb(void *);
-static void nni_sub_pipe_fini(void *);
+static void sub_recv_cb(void *);
+static void sub_putq_cb(void *);
+static void sub_pipe_fini(void *);
-struct nni_sub_topic {
+struct sub_topic {
nni_list_node node;
size_t len;
void * buf;
};
// An nni_rep_sock is our per-socket protocol private structure.
-struct nni_sub_sock {
+struct sub_sock {
nni_sock *sock;
nni_list topics;
nni_msgq *urq;
@@ -40,132 +40,136 @@ struct nni_sub_sock {
};
// An nni_rep_pipe is our per-pipe protocol private structure.
-struct nni_sub_pipe {
- nni_pipe * pipe;
- nni_sub_sock *sub;
- nni_aio aio_recv;
- nni_aio aio_putq;
+struct sub_pipe {
+ nni_pipe *pipe;
+ sub_sock *sub;
+ nni_aio * aio_recv;
+ nni_aio * aio_putq;
};
static int
-nni_sub_sock_init(void **subp, nni_sock *sock)
+sub_sock_init(void **sp, nni_sock *sock)
{
- nni_sub_sock *sub;
+ sub_sock *s;
- if ((sub = NNI_ALLOC_STRUCT(sub)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- NNI_LIST_INIT(&sub->topics, nni_sub_topic, node);
- sub->sock = sock;
- sub->raw = 0;
+ NNI_LIST_INIT(&s->topics, sub_topic, node);
+ s->sock = sock;
+ s->raw = 0;
- sub->urq = nni_sock_recvq(sock);
+ s->urq = nni_sock_recvq(sock);
nni_sock_senderr(sock, NNG_ENOTSUP);
- *subp = sub;
+ *sp = s;
return (0);
}
static void
-nni_sub_sock_fini(void *arg)
+sub_sock_fini(void *arg)
{
- nni_sub_sock * sub = arg;
- nni_sub_topic *topic;
+ sub_sock * s = arg;
+ sub_topic *topic;
- while ((topic = nni_list_first(&sub->topics)) != NULL) {
- nni_list_remove(&sub->topics, topic);
+ while ((topic = nni_list_first(&s->topics)) != NULL) {
+ nni_list_remove(&s->topics, topic);
nni_free(topic->buf, topic->len);
NNI_FREE_STRUCT(topic);
}
- NNI_FREE_STRUCT(sub);
+ NNI_FREE_STRUCT(s);
}
static void
-nni_sub_sock_open(void *arg)
+sub_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-nni_sub_sock_close(void *arg)
+sub_sock_close(void *arg)
{
NNI_ARG_UNUSED(arg);
}
+static void
+sub_pipe_fini(void *arg)
+{
+ sub_pipe *p = arg;
+
+ nni_aio_fini(p->aio_putq);
+ nni_aio_fini(p->aio_recv);
+ NNI_FREE_STRUCT(p);
+}
+
static int
-nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock)
+sub_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- nni_sub_pipe *sp;
+ sub_pipe *p;
+ int rv;
- if ((sp = NNI_ALLOC_STRUCT(sp)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- nni_aio_init(&sp->aio_putq, nni_sub_putq_cb, sp);
- nni_aio_init(&sp->aio_recv, nni_sub_recv_cb, sp);
+ if (((rv = nni_aio_init(&p->aio_putq, sub_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, sub_recv_cb, p)) != 0)) {
+ sub_pipe_fini(p);
+ return (rv);
+ }
- sp->pipe = pipe;
- sp->sub = ssock;
- *spp = sp;
+ p->pipe = pipe;
+ p->sub = s;
+ *pp = p;
return (0);
}
-static void
-nni_sub_pipe_fini(void *arg)
-{
- nni_sub_pipe *sp = arg;
-
- nni_aio_fini(&sp->aio_putq);
- nni_aio_fini(&sp->aio_recv);
- NNI_FREE_STRUCT(sp);
-}
-
static int
-nni_sub_pipe_start(void *arg)
+sub_pipe_start(void *arg)
{
- nni_sub_pipe *sp = arg;
+ sub_pipe *p = arg;
- nni_pipe_recv(sp->pipe, &sp->aio_recv);
+ nni_pipe_recv(p->pipe, p->aio_recv);
return (0);
}
static void
-nni_sub_pipe_stop(void *arg)
+sub_pipe_stop(void *arg)
{
- nni_sub_pipe *sp = arg;
+ sub_pipe *p = arg;
- nni_aio_stop(&sp->aio_putq);
- nni_aio_stop(&sp->aio_recv);
+ nni_aio_stop(p->aio_putq);
+ nni_aio_stop(p->aio_recv);
}
static void
-nni_sub_recv_cb(void *arg)
+sub_recv_cb(void *arg)
{
- nni_sub_pipe *sp = arg;
- nni_sub_sock *sub = sp->sub;
- nni_msgq * urq = sub->urq;
+ sub_pipe *p = arg;
+ sub_sock *s = p->sub;
+ nni_msgq *urq = s->urq;
- if (nni_aio_result(&sp->aio_recv) != 0) {
- nni_pipe_stop(sp->pipe);
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- sp->aio_putq.a_msg = sp->aio_recv.a_msg;
- sp->aio_recv.a_msg = NULL;
- nni_msgq_aio_put(sub->urq, &sp->aio_putq);
+ nni_aio_set_msg(p->aio_putq, nni_aio_get_msg(p->aio_recv));
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_msgq_aio_put(urq, p->aio_putq);
}
static void
-nni_sub_putq_cb(void *arg)
+sub_putq_cb(void *arg)
{
- nni_sub_pipe *sp = arg;
+ sub_pipe *p = arg;
- if (nni_aio_result(&sp->aio_putq) != 0) {
- nni_msg_free(sp->aio_putq.a_msg);
- sp->aio_putq.a_msg = NULL;
- nni_pipe_stop(sp->pipe);
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
- nni_pipe_recv(sp->pipe, &sp->aio_recv);
+ nni_pipe_recv(p->pipe, p->aio_recv);
}
// For now we maintain subscriptions on a sorted linked list. As we do not
@@ -174,12 +178,12 @@ nni_sub_putq_cb(void *arg)
// to replace this with a patricia trie, like old nanomsg had.
static int
-nni_sub_subscribe(nni_sub_sock *sub, const void *buf, size_t sz)
+sub_subscribe(sub_sock *s, const void *buf, size_t sz)
{
- nni_sub_topic *topic;
- nni_sub_topic *newtopic;
+ sub_topic *topic;
+ sub_topic *newtopic;
- NNI_LIST_FOREACH (&sub->topics, topic) {
+ NNI_LIST_FOREACH (&s->topics, topic) {
int rv;
if (topic->len >= sz) {
@@ -210,20 +214,20 @@ nni_sub_subscribe(nni_sub_sock *sub, const void *buf, size_t sz)
newtopic->len = sz;
memcpy(newtopic->buf, buf, sz);
if (topic != NULL) {
- nni_list_insert_before(&sub->topics, newtopic, topic);
+ nni_list_insert_before(&s->topics, newtopic, topic);
} else {
- nni_list_append(&sub->topics, newtopic);
+ nni_list_append(&s->topics, newtopic);
}
return (0);
}
static int
-nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz)
+sub_unsubscribe(sub_sock *s, const void *buf, size_t sz)
{
- nni_sub_topic *topic;
- int rv;
+ sub_topic *topic;
+ int rv;
- NNI_LIST_FOREACH (&sub->topics, topic) {
+ NNI_LIST_FOREACH (&s->topics, topic) {
if (topic->len >= sz) {
rv = memcmp(topic->buf, buf, sz);
} else {
@@ -231,7 +235,7 @@ nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz)
}
if (rv == 0) {
if (topic->len == sz) {
- nni_list_remove(&sub->topics, topic);
+ nni_list_remove(&s->topics, topic);
nni_free(topic->buf, topic->len);
NNI_FREE_STRUCT(topic);
return (0);
@@ -248,43 +252,43 @@ nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz)
}
static int
-nni_sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_sub_sock *sub = arg;
- int rv = NNG_ENOTSUP;
+ sub_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&sub->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
} else if (opt == nng_optid_sub_subscribe) {
- rv = nni_sub_subscribe(sub, buf, sz);
+ rv = sub_subscribe(s, buf, sz);
} else if (opt == nng_optid_sub_unsubscribe) {
- rv = nni_sub_unsubscribe(sub, buf, sz);
+ rv = sub_unsubscribe(s, buf, sz);
}
return (rv);
}
static int
-nni_sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_sub_sock *sub = arg;
- int rv = NNG_ENOTSUP;
+ sub_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&sub->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
static nni_msg *
-nni_sub_sock_rfilter(void *arg, nni_msg *msg)
+sub_sock_rfilter(void *arg, nni_msg *msg)
{
- nni_sub_sock * sub = arg;
- nni_sub_topic *topic;
- char * body;
- size_t len;
- int match;
+ sub_sock * s = arg;
+ sub_topic *topic;
+ char * body;
+ size_t len;
+ int match;
- if (sub->raw) {
+ if (s->raw) {
return (msg);
}
@@ -293,7 +297,7 @@ nni_sub_sock_rfilter(void *arg, nni_msg *msg)
match = 0;
// Check to see if the message matches one of our subscriptions.
- NNI_LIST_FOREACH (&sub->topics, topic) {
+ NNI_LIST_FOREACH (&s->topics, topic) {
if (len >= topic->len) {
int rv = memcmp(topic->buf, body, topic->len);
if (rv == 0) {
@@ -319,34 +323,34 @@ nni_sub_sock_rfilter(void *arg, nni_msg *msg)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe_ops nni_sub_pipe_ops = {
- .pipe_init = nni_sub_pipe_init,
- .pipe_fini = nni_sub_pipe_fini,
- .pipe_start = nni_sub_pipe_start,
- .pipe_stop = nni_sub_pipe_stop,
+static nni_proto_pipe_ops sub_pipe_ops = {
+ .pipe_init = sub_pipe_init,
+ .pipe_fini = sub_pipe_fini,
+ .pipe_start = sub_pipe_start,
+ .pipe_stop = sub_pipe_stop,
};
-static nni_proto_sock_ops nni_sub_sock_ops = {
- .sock_init = nni_sub_sock_init,
- .sock_fini = nni_sub_sock_fini,
- .sock_open = nni_sub_sock_open,
- .sock_close = nni_sub_sock_close,
- .sock_setopt = nni_sub_sock_setopt,
- .sock_getopt = nni_sub_sock_getopt,
- .sock_rfilter = nni_sub_sock_rfilter,
+static nni_proto_sock_ops sub_sock_ops = {
+ .sock_init = sub_sock_init,
+ .sock_fini = sub_sock_fini,
+ .sock_open = sub_sock_open,
+ .sock_close = sub_sock_close,
+ .sock_setopt = sub_sock_setopt,
+ .sock_getopt = sub_sock_getopt,
+ .sock_rfilter = sub_sock_rfilter,
};
-nni_proto nni_sub_proto = {
+static nni_proto sub_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_SUB_V0, "sub" },
.proto_peer = { NNG_PROTO_PUB_V0, "pub" },
.proto_flags = NNI_PROTO_FLAG_RCV,
- .proto_sock_ops = &nni_sub_sock_ops,
- .proto_pipe_ops = &nni_sub_pipe_ops,
+ .proto_sock_ops = &sub_sock_ops,
+ .proto_pipe_ops = &sub_pipe_ops,
};
int
nng_sub0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_sub_proto));
+ return (nni_proto_open(sidp, &sub_proto));
}