aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pair/pair_v1.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-31 17:59:01 -0700
committerGarrett D'Amore <garrett@damore.org>2017-09-22 11:47:07 -0700
commitd72076207a2fad96ff014a81366868fb47a0ed1b (patch)
tree5a4f67ab607ef6690e983c2d1ab2c64062027e52 /src/protocol/pair/pair_v1.c
parent366f3e5d14c5f891655ad1fa2b3cfa9a56b8830d (diff)
downloadnng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.gz
nng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.bz2
nng-d72076207a2fad96ff014a81366868fb47a0ed1b.zip
Allocate AIOs dynamically.
We allocate AIO structures dynamically, so that we can use them abstractly in more places without inlining them. This will be used for the ZeroTier transport to allow us to create operations consisting of just the AIO. Furthermore, we provide accessors for some of the aio members, in the hopes that we will be able to wrap these for "safe" version of the AIO capability to export to applications, and to protocol and transport implementors. While here we cleaned up the protocol details to use consistently shorter names (no nni_ prefix for static symbols needed), and we also fixed a bug in the surveyor code.
Diffstat (limited to 'src/protocol/pair/pair_v1.c')
-rw-r--r--src/protocol/pair/pair_v1.c130
1 files changed, 63 insertions, 67 deletions
diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c
index a737402f..d6c8ea75 100644
--- a/src/protocol/pair/pair_v1.c
+++ b/src/protocol/pair/pair_v1.c
@@ -41,7 +41,7 @@ struct pair1_sock {
nni_list plist;
int started;
int poly;
- nni_aio aio_getq;
+ nni_aio * aio_getq;
};
// pair1_pipe is our per-pipe protocol private structure.
@@ -49,10 +49,10 @@ struct pair1_pipe {
nni_pipe * npipe;
pair1_sock * psock;
nni_msgq * sendq;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_aio aio_getq;
- nni_aio aio_putq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+ nni_aio * aio_getq;
+ nni_aio * aio_putq;
nni_list_node node;
};
@@ -61,7 +61,7 @@ pair1_sock_fini(void *arg)
{
pair1_sock *s = arg;
- nni_aio_fini(&s->aio_getq);
+ nni_aio_fini(s->aio_getq);
nni_idhash_fini(s->pipes);
nni_mtx_fini(&s->mtx);
@@ -85,10 +85,10 @@ pair1_sock_init(void **sp, nni_sock *nsock)
NNI_LIST_INIT(&s->plist, pair1_pipe, node);
// Raw mode uses this.
- nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s);
nni_mtx_init(&s->mtx);
- if ((rv = nni_option_register("polyamorous", &poly)) != 0) {
+ if (((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) ||
+ ((rv = nni_option_register("polyamorous", &poly)) != 0)) {
pair1_sock_fini(s);
return (rv);
}
@@ -104,6 +104,18 @@ pair1_sock_init(void **sp, nni_sock *nsock)
return (0);
}
+static void
+pair1_pipe_fini(void *arg)
+{
+ pair1_pipe *p = arg;
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_putq);
+ nni_aio_fini(p->aio_getq);
+ nni_msgq_fini(p->sendq);
+ NNI_FREE_STRUCT(p);
+}
+
static int
pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
@@ -113,14 +125,14 @@ pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_msgq_init(&p->sendq, 2)) != 0) {
- NNI_FREE_STRUCT(p);
+ if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) {
+ pair1_pipe_fini(p);
return (NNG_ENOMEM);
}
- nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p);
- nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p);
- nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p);
- nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p);
p->npipe = npipe;
p->psock = psock;
@@ -129,18 +141,6 @@ pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock)
return (rv);
}
-static void
-pair1_pipe_fini(void *arg)
-{
- pair1_pipe *p = arg;
- nni_aio_fini(&p->aio_send);
- nni_aio_fini(&p->aio_recv);
- nni_aio_fini(&p->aio_putq);
- nni_aio_fini(&p->aio_getq);
- nni_msgq_fini(p->sendq);
- NNI_FREE_STRUCT(p);
-}
-
static int
pair1_pipe_start(void *arg)
{
@@ -163,7 +163,7 @@ pair1_pipe_start(void *arg)
}
} else {
if (!s->started) {
- nni_msgq_aio_get(s->uwq, &s->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
}
nni_list_append(&s->plist, p);
@@ -171,16 +171,16 @@ pair1_pipe_start(void *arg)
nni_mtx_unlock(&s->mtx);
// Schedule a getq. In polyamorous mode we get on the per pipe
- // sendq, as the socket distributes to us. In monogamous mode we
- // bypass and get from the upper writeq directly (saving a set of
- // context switches).
+ // sendq, as the socket distributes to us. In monogamous mode
+ // we bypass and get from the upper writeq directly (saving a
+ // set of context switches).
if (s->poly) {
- nni_msgq_aio_get(p->sendq, &p->aio_getq);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
} else {
- nni_msgq_aio_get(s->uwq, &p->aio_getq);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
}
// And the pipe read of course.
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
return (0);
}
@@ -197,10 +197,10 @@ pair1_pipe_stop(void *arg)
nni_mtx_unlock(&s->mtx);
nni_msgq_close(p->sendq);
- nni_aio_cancel(&p->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_recv, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_getq, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_send, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_recv, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_putq, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_getq, NNG_ECANCELED);
}
static void
@@ -213,13 +213,13 @@ pair1_pipe_recv_cb(void *arg)
nni_pipe * npipe = p->npipe;
int rv;
- if (nni_aio_result(&p->aio_recv) != 0) {
+ if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->npipe);
return;
}
- msg = p->aio_recv.a_msg;
- p->aio_recv.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
// Store the pipe ID.
nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
@@ -241,20 +241,20 @@ pair1_pipe_recv_cb(void *arg)
// keep getting more.
if (hdr > (unsigned) s->ttl) {
nni_msg_free(msg);
- nni_pipe_recv(npipe, &p->aio_recv);
+ nni_pipe_recv(npipe, p->aio_recv);
return;
}
// Store the pipe id followed by the hop count.
if ((rv = nni_msg_header_append_u32(msg, hdr)) != 0) {
nni_msg_free(msg);
- nni_pipe_recv(npipe, &p->aio_recv);
+ nni_pipe_recv(npipe, p->aio_recv);
return;
}
// Send the message up.
- p->aio_putq.a_msg = msg;
- nni_msgq_aio_put(s->urq, &p->aio_putq);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_msgq_aio_put(s->urq, p->aio_putq);
}
static void
@@ -265,13 +265,13 @@ pair1_sock_getq_cb(void *arg)
nni_msg * msg;
uint32_t id;
- if (nni_aio_result(&s->aio_getq) != 0) {
+ if (nni_aio_result(s->aio_getq) != 0) {
// Socket closing...
return;
}
- msg = s->aio_getq.a_msg;
- s->aio_getq.a_msg = NULL;
+ msg = nni_aio_get_msg(s->aio_getq);
+ nni_aio_set_msg(s->aio_getq, NULL);
// By definition we are in polyamorous mode.
NNI_ASSERT(s->poly);
@@ -289,7 +289,7 @@ pair1_sock_getq_cb(void *arg)
// Pipe not present!
nni_mtx_unlock(&s->mtx);
nni_msg_free(msg);
- nni_msgq_aio_get(s->uwq, &s->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
return;
}
@@ -302,7 +302,7 @@ pair1_sock_getq_cb(void *arg)
}
nni_mtx_unlock(&s->mtx);
- nni_msgq_aio_get(s->uwq, &s->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
@@ -310,13 +310,13 @@ pair1_pipe_putq_cb(void *arg)
{
pair1_pipe *p = arg;
- if (nni_aio_result(&p->aio_putq) != 0) {
- nni_msg_free(p->aio_putq.a_msg);
- p->aio_putq.a_msg = NULL;
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
nni_pipe_stop(p->npipe);
return;
}
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
}
static void
@@ -327,13 +327,13 @@ pair1_pipe_getq_cb(void *arg)
nni_msg * msg;
uint32_t hops;
- if (nni_aio_result(&p->aio_getq) != 0) {
+ if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->npipe);
return;
}
- msg = p->aio_getq.a_msg;
- p->aio_getq.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_getq);
+ nni_aio_set_msg(p->aio_getq, NULL);
// Raw mode messages have the header already formed, with
// a hop count. Cooked mode messages have no
@@ -354,13 +354,13 @@ pair1_pipe_getq_cb(void *arg)
goto badmsg;
}
- p->aio_send.a_msg = msg;
- nni_pipe_send(p->npipe, &p->aio_send);
+ nni_aio_set_msg(p->aio_send, msg);
+ nni_pipe_send(p->npipe, p->aio_send);
return;
badmsg:
nni_msg_free(msg);
- nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, &p->aio_getq);
+ nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, p->aio_getq);
}
static void
@@ -369,20 +369,16 @@ pair1_pipe_send_cb(void *arg)
pair1_pipe *p = arg;
pair1_sock *s = p->psock;
- if (nni_aio_result(&p->aio_send) != 0) {
- nni_msg_free(p->aio_send.a_msg);
- p->aio_send.a_msg = NULL;
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
nni_pipe_stop(p->npipe);
return;
}
// In polyamorous mode, we want to get from the sendq; in
// monogamous we get from upper writeq.
- if (s->poly) {
- nni_msgq_aio_get(p->sendq, &p->aio_getq);
- } else {
- nni_msgq_aio_get(s->uwq, &p->aio_getq);
- }
+ nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, p->aio_getq);
}
static void