aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pipeline
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-31 17:59:01 -0700
committerGarrett D'Amore <garrett@damore.org>2017-09-22 11:47:07 -0700
commitd72076207a2fad96ff014a81366868fb47a0ed1b (patch)
tree5a4f67ab607ef6690e983c2d1ab2c64062027e52 /src/protocol/pipeline
parent366f3e5d14c5f891655ad1fa2b3cfa9a56b8830d (diff)
downloadnng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.gz
nng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.bz2
nng-d72076207a2fad96ff014a81366868fb47a0ed1b.zip
Allocate AIOs dynamically.
We allocate AIO structures dynamically, so that we can use them abstractly in more places without inlining them. This will be used for the ZeroTier transport to allow us to create operations consisting of just the AIO. Furthermore, we provide accessors for some of the aio members, in the hopes that we will be able to wrap these for "safe" version of the AIO capability to export to applications, and to protocol and transport implementors. While here we cleaned up the protocol details to use consistently shorter names (no nni_ prefix for static symbols needed), and we also fixed a bug in the surveyor code.
Diffstat (limited to 'src/protocol/pipeline')
-rw-r--r--src/protocol/pipeline/pull.c188
-rw-r--r--src/protocol/pipeline/push.c203
2 files changed, 199 insertions, 192 deletions
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index 1d738ec2..0d66aab8 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -15,208 +15,212 @@
// Pull protocol. The PULL protocol is the "read" side of a pipeline.
-typedef struct nni_pull_pipe nni_pull_pipe;
-typedef struct nni_pull_sock nni_pull_sock;
+typedef struct pull_pipe pull_pipe;
+typedef struct pull_sock pull_sock;
-static void nni_pull_putq_cb(void *);
-static void nni_pull_recv_cb(void *);
-static void nni_pull_putq(nni_pull_pipe *, nni_msg *);
+static void pull_putq_cb(void *);
+static void pull_recv_cb(void *);
+static void pull_putq(pull_pipe *, nni_msg *);
-// An nni_pull_sock is our per-socket protocol private structure.
-struct nni_pull_sock {
+// A pull_sock is our per-socket protocol private structure.
+struct pull_sock {
nni_msgq *urq;
int raw;
};
-// An nni_pull_pipe is our per-pipe protocol private structure.
-struct nni_pull_pipe {
- nni_pipe * pipe;
- nni_pull_sock *pull;
- nni_aio putq_aio;
- nni_aio recv_aio;
+// A pull_pipe is our per-pipe protocol private structure.
+struct pull_pipe {
+ nni_pipe * pipe;
+ pull_sock *pull;
+ nni_aio * putq_aio;
+ nni_aio * recv_aio;
};
static int
-nni_pull_sock_init(void **pullp, nni_sock *sock)
+pull_sock_init(void **sp, nni_sock *sock)
{
- nni_pull_sock *pull;
+ pull_sock *s;
- if ((pull = NNI_ALLOC_STRUCT(pull)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- pull->raw = 0;
- pull->urq = nni_sock_recvq(sock);
- *pullp = pull;
+ s->raw = 0;
+ s->urq = nni_sock_recvq(sock);
+ *sp = s;
nni_sock_senderr(sock, NNG_ENOTSUP);
return (0);
}
static void
-nni_pull_sock_fini(void *arg)
+pull_sock_fini(void *arg)
{
- nni_pull_sock *pull = arg;
+ pull_sock *s = arg;
- NNI_FREE_STRUCT(pull);
+ NNI_FREE_STRUCT(s);
+}
+
+static void
+pull_pipe_fini(void *arg)
+{
+ pull_pipe *p = arg;
+
+ nni_aio_fini(p->putq_aio);
+ nni_aio_fini(p->recv_aio);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
+pull_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- nni_pull_pipe *pp;
+ pull_pipe *p;
+ int rv;
- if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- nni_aio_init(&pp->putq_aio, nni_pull_putq_cb, pp);
- nni_aio_init(&pp->recv_aio, nni_pull_recv_cb, pp);
+ if (((rv = nni_aio_init(&p->putq_aio, pull_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->recv_aio, pull_recv_cb, p)) != 0)) {
+ pull_pipe_fini(p);
+ return (rv);
+ }
- pp->pipe = pipe;
- pp->pull = psock;
- *ppp = pp;
+ p->pipe = pipe;
+ p->pull = s;
+ *pp = p;
return (0);
}
-static void
-nni_pull_pipe_fini(void *arg)
-{
- nni_pull_pipe *pp = arg;
-
- nni_aio_fini(&pp->putq_aio);
- nni_aio_fini(&pp->recv_aio);
- NNI_FREE_STRUCT(pp);
-}
-
static int
-nni_pull_pipe_start(void *arg)
+pull_pipe_start(void *arg)
{
- nni_pull_pipe *pp = arg;
+ pull_pipe *p = arg;
// Start the pending pull...
- nni_pipe_recv(pp->pipe, &pp->recv_aio);
+ nni_pipe_recv(p->pipe, p->recv_aio);
return (0);
}
static void
-nni_pull_pipe_stop(void *arg)
+pull_pipe_stop(void *arg)
{
- nni_pull_pipe *pp = arg;
+ pull_pipe *p = arg;
- nni_aio_stop(&pp->putq_aio);
- nni_aio_stop(&pp->recv_aio);
+ nni_aio_stop(p->putq_aio);
+ nni_aio_stop(p->recv_aio);
}
static void
-nni_pull_recv_cb(void *arg)
+pull_recv_cb(void *arg)
{
- nni_pull_pipe *pp = arg;
- nni_aio * aio = &pp->recv_aio;
- nni_msg * msg;
+ pull_pipe *p = arg;
+ nni_aio * aio = p->recv_aio;
+ nni_msg * msg;
if (nni_aio_result(aio) != 0) {
// Failed to get a message, probably the pipe is closed.
- nni_pipe_stop(pp->pipe);
+ nni_pipe_stop(p->pipe);
return;
}
// Got a message... start the put to send it up to the application.
- msg = aio->a_msg;
- aio->a_msg = NULL;
- nni_pull_putq(pp, msg);
+ msg = nni_aio_get_msg(aio);
+ nni_aio_set_msg(aio, NULL);
+ pull_putq(p, msg);
}
static void
-nni_pull_putq_cb(void *arg)
+pull_putq_cb(void *arg)
{
- nni_pull_pipe *pp = arg;
- nni_aio * aio = &pp->putq_aio;
+ pull_pipe *p = arg;
+ nni_aio * aio = p->putq_aio;
if (nni_aio_result(aio) != 0) {
// If we failed to put, probably NNG_ECLOSED, nothing else
// we can do. Just close the pipe.
- nni_msg_free(aio->a_msg);
- aio->a_msg = NULL;
- nni_pipe_stop(pp->pipe);
+ nni_msg_free(nni_aio_get_msg(aio));
+ nni_aio_set_msg(aio, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
- nni_pipe_recv(pp->pipe, &pp->recv_aio);
+ nni_pipe_recv(p->pipe, p->recv_aio);
}
// nni_pull_putq schedules a put operation to the user socket (sendup).
static void
-nni_pull_putq(nni_pull_pipe *pp, nni_msg *msg)
+pull_putq(pull_pipe *p, nni_msg *msg)
{
- nni_pull_sock *pull = pp->pull;
+ pull_sock *s = p->pull;
- pp->putq_aio.a_msg = msg;
+ nni_aio_set_msg(p->putq_aio, msg);
- nni_msgq_aio_put(pull->urq, &pp->putq_aio);
+ nni_msgq_aio_put(s->urq, p->putq_aio);
}
static void
-nni_pull_sock_open(void *arg)
+pull_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-nni_pull_sock_close(void *arg)
+pull_sock_close(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static int
-nni_pull_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+pull_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_pull_sock *pull = arg;
- int rv = NNG_ENOTSUP;
+ pull_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&pull->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
}
return (rv);
}
static int
-nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_pull_sock *pull = arg;
- int rv = NNG_ENOTSUP;
+ pull_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&pull->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
-static nni_proto_pipe_ops nni_pull_pipe_ops = {
- .pipe_init = nni_pull_pipe_init,
- .pipe_fini = nni_pull_pipe_fini,
- .pipe_start = nni_pull_pipe_start,
- .pipe_stop = nni_pull_pipe_stop,
+static nni_proto_pipe_ops pull_pipe_ops = {
+ .pipe_init = pull_pipe_init,
+ .pipe_fini = pull_pipe_fini,
+ .pipe_start = pull_pipe_start,
+ .pipe_stop = pull_pipe_stop,
};
-static nni_proto_sock_ops nni_pull_sock_ops = {
- .sock_init = nni_pull_sock_init,
- .sock_fini = nni_pull_sock_fini,
- .sock_open = nni_pull_sock_open,
- .sock_close = nni_pull_sock_close,
- .sock_setopt = nni_pull_sock_setopt,
- .sock_getopt = nni_pull_sock_getopt,
+static nni_proto_sock_ops pull_sock_ops = {
+ .sock_init = pull_sock_init,
+ .sock_fini = pull_sock_fini,
+ .sock_open = pull_sock_open,
+ .sock_close = pull_sock_close,
+ .sock_setopt = pull_sock_setopt,
+ .sock_getopt = pull_sock_getopt,
};
-nni_proto nni_pull_proto = {
+static nni_proto pull_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_PULL_V0, "pull" },
.proto_peer = { NNG_PROTO_PUSH_V0, "push" },
.proto_flags = NNI_PROTO_FLAG_RCV,
- .proto_pipe_ops = &nni_pull_pipe_ops,
- .proto_sock_ops = &nni_pull_sock_ops,
+ .proto_pipe_ops = &pull_pipe_ops,
+ .proto_sock_ops = &pull_sock_ops,
};
int
nng_pull0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_pull_proto));
+ return (nni_proto_open(sidp, &pull_proto));
}
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index 10d04091..5e32efee 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -17,228 +17,231 @@
// Push distributes fairly, or tries to, by giving messages in round-robin
// order.
-typedef struct nni_push_pipe nni_push_pipe;
-typedef struct nni_push_sock nni_push_sock;
+typedef struct push_pipe push_pipe;
+typedef struct push_sock push_sock;
-static void nni_push_send_cb(void *);
-static void nni_push_recv_cb(void *);
-static void nni_push_getq_cb(void *);
+static void push_send_cb(void *);
+static void push_recv_cb(void *);
+static void push_getq_cb(void *);
// An nni_push_sock is our per-socket protocol private structure.
-struct nni_push_sock {
+struct push_sock {
nni_msgq *uwq;
int raw;
nni_sock *sock;
};
// An nni_push_pipe is our per-pipe protocol private structure.
-struct nni_push_pipe {
- nni_pipe * pipe;
- nni_push_sock *push;
- nni_list_node node;
-
- nni_aio aio_recv;
- nni_aio aio_send;
- nni_aio aio_getq;
+struct push_pipe {
+ nni_pipe * pipe;
+ push_sock * push;
+ nni_list_node node;
+
+ nni_aio *aio_recv;
+ nni_aio *aio_send;
+ nni_aio *aio_getq;
};
static int
-nni_push_sock_init(void **pushp, nni_sock *sock)
+push_sock_init(void **sp, nni_sock *sock)
{
- nni_push_sock *push;
+ push_sock *s;
- if ((push = NNI_ALLOC_STRUCT(push)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- push->raw = 0;
- push->sock = sock;
- push->uwq = nni_sock_sendq(sock);
- *pushp = push;
+ s->raw = 0;
+ s->sock = sock;
+ s->uwq = nni_sock_sendq(sock);
+ *sp = s;
nni_sock_recverr(sock, NNG_ENOTSUP);
return (0);
}
static void
-nni_push_sock_fini(void *arg)
+push_sock_fini(void *arg)
{
- nni_push_sock *push = arg;
+ push_sock *s = arg;
- NNI_FREE_STRUCT(push);
+ NNI_FREE_STRUCT(s);
}
static void
-nni_push_sock_open(void *arg)
+push_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-nni_push_sock_close(void *arg)
+push_sock_close(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-nni_push_pipe_fini(void *arg)
+push_pipe_fini(void *arg)
{
- nni_push_pipe *pp = arg;
+ push_pipe *p = arg;
- nni_aio_fini(&pp->aio_recv);
- nni_aio_fini(&pp->aio_send);
- nni_aio_fini(&pp->aio_getq);
- NNI_FREE_STRUCT(pp);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_getq);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
+push_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- nni_push_pipe *pp;
+ push_pipe *p;
+ int rv;
- if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- nni_aio_init(&pp->aio_recv, nni_push_recv_cb, pp);
- nni_aio_init(&pp->aio_send, nni_push_send_cb, pp);
- nni_aio_init(&pp->aio_getq, nni_push_getq_cb, pp);
-
- NNI_LIST_NODE_INIT(&pp->node);
- pp->pipe = pipe;
- pp->push = psock;
- *ppp = pp;
+ if (((rv = nni_aio_init(&p->aio_recv, push_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, push_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, push_getq_cb, p)) != 0)) {
+ push_pipe_fini(p);
+ return (rv);
+ }
+ NNI_LIST_NODE_INIT(&p->node);
+ p->pipe = pipe;
+ p->push = s;
+ *pp = p;
return (0);
}
static int
-nni_push_pipe_start(void *arg)
+push_pipe_start(void *arg)
{
- nni_push_pipe *pp = arg;
- nni_push_sock *push = pp->push;
+ push_pipe *p = arg;
+ push_sock *s = p->push;
- if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PULL) {
+ if (nni_pipe_peer(p->pipe) != NNG_PROTO_PULL) {
return (NNG_EPROTO);
}
// Schedule a receiver. This is mostly so that we can detect
// a closed transport pipe.
- nni_pipe_recv(pp->pipe, &pp->aio_recv);
+ nni_pipe_recv(p->pipe, p->aio_recv);
// Schedule a sender.
- nni_msgq_aio_get(push->uwq, &pp->aio_getq);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
return (0);
}
static void
-nni_push_pipe_stop(void *arg)
+push_pipe_stop(void *arg)
{
- nni_push_pipe *pp = arg;
+ push_pipe *p = arg;
- nni_aio_stop(&pp->aio_recv);
- nni_aio_stop(&pp->aio_send);
- nni_aio_stop(&pp->aio_getq);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_getq);
}
static void
-nni_push_recv_cb(void *arg)
+push_recv_cb(void *arg)
{
- nni_push_pipe *pp = arg;
+ push_pipe *p = arg;
// We normally expect to receive an error. If a pipe actually
// sends us data, we just discard it.
- if (nni_aio_result(&pp->aio_recv) != 0) {
- nni_pipe_stop(pp->pipe);
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- nni_msg_free(pp->aio_recv.a_msg);
- pp->aio_recv.a_msg = NULL;
- nni_pipe_recv(pp->pipe, &pp->aio_recv);
+ nni_msg_free(nni_aio_get_msg(p->aio_recv));
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_pipe_recv(p->pipe, p->aio_recv);
}
static void
-nni_push_send_cb(void *arg)
+push_send_cb(void *arg)
{
- nni_push_pipe *pp = arg;
- nni_push_sock *push = pp->push;
+ push_pipe *p = arg;
+ push_sock *s = p->push;
- if (nni_aio_result(&pp->aio_send) != 0) {
- nni_msg_free(pp->aio_send.a_msg);
- pp->aio_send.a_msg = NULL;
- nni_pipe_stop(pp->pipe);
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
- nni_msgq_aio_get(push->uwq, &pp->aio_getq);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
}
static void
-nni_push_getq_cb(void *arg)
+push_getq_cb(void *arg)
{
- nni_push_pipe *pp = arg;
- nni_aio * aio = &pp->aio_getq;
+ push_pipe *p = arg;
+ nni_aio * aio = p->aio_getq;
if (nni_aio_result(aio) != 0) {
// If the socket is closing, nothing else we can do.
- nni_pipe_stop(pp->pipe);
+ nni_pipe_stop(p->pipe);
return;
}
- pp->aio_send.a_msg = aio->a_msg;
- aio->a_msg = NULL;
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(aio));
+ nni_aio_set_msg(aio, NULL);
- nni_pipe_send(pp->pipe, &pp->aio_send);
+ nni_pipe_send(p->pipe, p->aio_send);
}
static int
-nni_push_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+push_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_push_sock *push = arg;
- int rv = NNG_ENOTSUP;
+ push_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&push->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
}
return (rv);
}
static int
-nni_push_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+push_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_push_sock *push = arg;
- int rv = NNG_ENOTSUP;
+ push_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&push->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
-static nni_proto_pipe_ops nni_push_pipe_ops = {
- .pipe_init = nni_push_pipe_init,
- .pipe_fini = nni_push_pipe_fini,
- .pipe_start = nni_push_pipe_start,
- .pipe_stop = nni_push_pipe_stop,
+static nni_proto_pipe_ops push_pipe_ops = {
+ .pipe_init = push_pipe_init,
+ .pipe_fini = push_pipe_fini,
+ .pipe_start = push_pipe_start,
+ .pipe_stop = push_pipe_stop,
};
-static nni_proto_sock_ops nni_push_sock_ops = {
- .sock_init = nni_push_sock_init,
- .sock_fini = nni_push_sock_fini,
- .sock_open = nni_push_sock_open,
- .sock_close = nni_push_sock_close,
- .sock_setopt = nni_push_sock_setopt,
- .sock_getopt = nni_push_sock_getopt,
+static nni_proto_sock_ops push_sock_ops = {
+ .sock_init = push_sock_init,
+ .sock_fini = push_sock_fini,
+ .sock_open = push_sock_open,
+ .sock_close = push_sock_close,
+ .sock_setopt = push_sock_setopt,
+ .sock_getopt = push_sock_getopt,
};
-nni_proto nni_push_proto = {
+static nni_proto push_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_PUSH_V0, "push" },
.proto_peer = { NNG_PROTO_PULL_V0, "pull" },
.proto_flags = NNI_PROTO_FLAG_SND,
- .proto_pipe_ops = &nni_push_pipe_ops,
- .proto_sock_ops = &nni_push_sock_ops,
+ .proto_pipe_ops = &push_pipe_ops,
+ .proto_sock_ops = &push_sock_ops,
};
int
nng_push0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_push_proto));
+ return (nni_proto_open(sidp, &push_proto));
}