aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pair
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pair')
-rw-r--r--src/protocol/pair/pair_v0.c90
-rw-r--r--src/protocol/pair/pair_v1.c130
2 files changed, 110 insertions, 110 deletions
diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c
index ef420051..486ce43b 100644
--- a/src/protocol/pair/pair_v0.c
+++ b/src/protocol/pair/pair_v0.c
@@ -43,10 +43,10 @@ struct pair0_sock {
struct pair0_pipe {
nni_pipe * npipe;
pair0_sock *psock;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_aio aio_getq;
- nni_aio aio_putq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+ nni_aio * aio_getq;
+ nni_aio * aio_putq;
};
static int
@@ -76,18 +76,34 @@ pair0_sock_fini(void *arg)
NNI_FREE_STRUCT(s);
}
+static void
+pair0_pipe_fini(void *arg)
+{
+ pair0_pipe *p = arg;
+
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_putq);
+ nni_aio_fini(p->aio_getq);
+ NNI_FREE_STRUCT(p);
+}
+
static int
pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
pair0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- nni_aio_init(&p->aio_send, pair0_send_cb, p);
- nni_aio_init(&p->aio_recv, pair0_recv_cb, p);
- nni_aio_init(&p->aio_getq, pair0_getq_cb, p);
- nni_aio_init(&p->aio_putq, pair0_putq_cb, p);
+ if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) {
+ pair0_pipe_fini(p);
+ return (rv);
+ }
p->npipe = npipe;
p->psock = psock;
@@ -95,18 +111,6 @@ pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock)
return (0);
}
-static void
-pair0_pipe_fini(void *arg)
-{
- pair0_pipe *p = arg;
-
- nni_aio_fini(&p->aio_send);
- nni_aio_fini(&p->aio_recv);
- nni_aio_fini(&p->aio_putq);
- nni_aio_fini(&p->aio_getq);
- NNI_FREE_STRUCT(p);
-}
-
static int
pair0_pipe_start(void *arg)
{
@@ -123,8 +127,8 @@ pair0_pipe_start(void *arg)
// Schedule a getq on the upper, and a read from the pipe.
// Each of these also sets up another hold on the pipe itself.
- nni_msgq_aio_get(s->uwq, &p->aio_getq);
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
+ nni_pipe_recv(p->npipe, p->aio_recv);
return (0);
}
@@ -135,10 +139,10 @@ pair0_pipe_stop(void *arg)
pair0_pipe *p = arg;
pair0_sock *s = p->psock;
- nni_aio_cancel(&p->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_recv, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_getq, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_send, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_recv, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_putq, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_getq, NNG_ECANCELED);
nni_mtx_lock(&s->mtx);
if (s->ppipe == p) {
@@ -154,17 +158,17 @@ pair0_recv_cb(void *arg)
pair0_sock *s = p->psock;
nni_msg * msg;
- if (nni_aio_result(&p->aio_recv) != 0) {
+ if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->npipe);
return;
}
- msg = p->aio_recv.a_msg;
- p->aio_putq.a_msg = msg;
- p->aio_recv.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_aio_set_msg(p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
- nni_msgq_aio_put(s->urq, &p->aio_putq);
+ nni_msgq_aio_put(s->urq, p->aio_putq);
}
static void
@@ -172,13 +176,13 @@ pair0_putq_cb(void *arg)
{
pair0_pipe *p = arg;
- if (nni_aio_result(&p->aio_putq) != 0) {
- nni_msg_free(p->aio_putq.a_msg);
- p->aio_putq.a_msg = NULL;
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
nni_pipe_stop(p->npipe);
return;
}
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
}
static void
@@ -187,14 +191,14 @@ pair0_getq_cb(void *arg)
pair0_pipe *p = arg;
pair0_sock *s = p->psock;
- if (nni_aio_result(&p->aio_getq) != 0) {
+ if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->npipe);
return;
}
- p->aio_send.a_msg = p->aio_getq.a_msg;
- p->aio_getq.a_msg = NULL;
- nni_pipe_send(p->npipe, &p->aio_send);
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
+ nni_pipe_send(p->npipe, p->aio_send);
}
static void
@@ -203,14 +207,14 @@ pair0_send_cb(void *arg)
pair0_pipe *p = arg;
pair0_sock *s = p->psock;
- if (nni_aio_result(&p->aio_send) != 0) {
- nni_msg_free(p->aio_send.a_msg);
- p->aio_send.a_msg = NULL;
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
nni_pipe_stop(p->npipe);
return;
}
- nni_msgq_aio_get(s->uwq, &p->aio_getq);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
}
static void
diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c
index a737402f..d6c8ea75 100644
--- a/src/protocol/pair/pair_v1.c
+++ b/src/protocol/pair/pair_v1.c
@@ -41,7 +41,7 @@ struct pair1_sock {
nni_list plist;
int started;
int poly;
- nni_aio aio_getq;
+ nni_aio * aio_getq;
};
// pair1_pipe is our per-pipe protocol private structure.
@@ -49,10 +49,10 @@ struct pair1_pipe {
nni_pipe * npipe;
pair1_sock * psock;
nni_msgq * sendq;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_aio aio_getq;
- nni_aio aio_putq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+ nni_aio * aio_getq;
+ nni_aio * aio_putq;
nni_list_node node;
};
@@ -61,7 +61,7 @@ pair1_sock_fini(void *arg)
{
pair1_sock *s = arg;
- nni_aio_fini(&s->aio_getq);
+ nni_aio_fini(s->aio_getq);
nni_idhash_fini(s->pipes);
nni_mtx_fini(&s->mtx);
@@ -85,10 +85,10 @@ pair1_sock_init(void **sp, nni_sock *nsock)
NNI_LIST_INIT(&s->plist, pair1_pipe, node);
// Raw mode uses this.
- nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s);
nni_mtx_init(&s->mtx);
- if ((rv = nni_option_register("polyamorous", &poly)) != 0) {
+ if (((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) ||
+ ((rv = nni_option_register("polyamorous", &poly)) != 0)) {
pair1_sock_fini(s);
return (rv);
}
@@ -104,6 +104,18 @@ pair1_sock_init(void **sp, nni_sock *nsock)
return (0);
}
+static void
+pair1_pipe_fini(void *arg)
+{
+ pair1_pipe *p = arg;
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_putq);
+ nni_aio_fini(p->aio_getq);
+ nni_msgq_fini(p->sendq);
+ NNI_FREE_STRUCT(p);
+}
+
static int
pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
@@ -113,14 +125,14 @@ pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_msgq_init(&p->sendq, 2)) != 0) {
- NNI_FREE_STRUCT(p);
+ if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) {
+ pair1_pipe_fini(p);
return (NNG_ENOMEM);
}
- nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p);
- nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p);
- nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p);
- nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p);
p->npipe = npipe;
p->psock = psock;
@@ -129,18 +141,6 @@ pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock)
return (rv);
}
-static void
-pair1_pipe_fini(void *arg)
-{
- pair1_pipe *p = arg;
- nni_aio_fini(&p->aio_send);
- nni_aio_fini(&p->aio_recv);
- nni_aio_fini(&p->aio_putq);
- nni_aio_fini(&p->aio_getq);
- nni_msgq_fini(p->sendq);
- NNI_FREE_STRUCT(p);
-}
-
static int
pair1_pipe_start(void *arg)
{
@@ -163,7 +163,7 @@ pair1_pipe_start(void *arg)
}
} else {
if (!s->started) {
- nni_msgq_aio_get(s->uwq, &s->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
}
nni_list_append(&s->plist, p);
@@ -171,16 +171,16 @@ pair1_pipe_start(void *arg)
nni_mtx_unlock(&s->mtx);
// Schedule a getq. In polyamorous mode we get on the per pipe
- // sendq, as the socket distributes to us. In monogamous mode we
- // bypass and get from the upper writeq directly (saving a set of
- // context switches).
+ // sendq, as the socket distributes to us. In monogamous mode
+ // we bypass and get from the upper writeq directly (saving a
+ // set of context switches).
if (s->poly) {
- nni_msgq_aio_get(p->sendq, &p->aio_getq);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
} else {
- nni_msgq_aio_get(s->uwq, &p->aio_getq);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
}
// And the pipe read of course.
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
return (0);
}
@@ -197,10 +197,10 @@ pair1_pipe_stop(void *arg)
nni_mtx_unlock(&s->mtx);
nni_msgq_close(p->sendq);
- nni_aio_cancel(&p->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_recv, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_getq, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_send, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_recv, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_putq, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_getq, NNG_ECANCELED);
}
static void
@@ -213,13 +213,13 @@ pair1_pipe_recv_cb(void *arg)
nni_pipe * npipe = p->npipe;
int rv;
- if (nni_aio_result(&p->aio_recv) != 0) {
+ if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->npipe);
return;
}
- msg = p->aio_recv.a_msg;
- p->aio_recv.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
// Store the pipe ID.
nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
@@ -241,20 +241,20 @@ pair1_pipe_recv_cb(void *arg)
// keep getting more.
if (hdr > (unsigned) s->ttl) {
nni_msg_free(msg);
- nni_pipe_recv(npipe, &p->aio_recv);
+ nni_pipe_recv(npipe, p->aio_recv);
return;
}
// Store the pipe id followed by the hop count.
if ((rv = nni_msg_header_append_u32(msg, hdr)) != 0) {
nni_msg_free(msg);
- nni_pipe_recv(npipe, &p->aio_recv);
+ nni_pipe_recv(npipe, p->aio_recv);
return;
}
// Send the message up.
- p->aio_putq.a_msg = msg;
- nni_msgq_aio_put(s->urq, &p->aio_putq);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_msgq_aio_put(s->urq, p->aio_putq);
}
static void
@@ -265,13 +265,13 @@ pair1_sock_getq_cb(void *arg)
nni_msg * msg;
uint32_t id;
- if (nni_aio_result(&s->aio_getq) != 0) {
+ if (nni_aio_result(s->aio_getq) != 0) {
// Socket closing...
return;
}
- msg = s->aio_getq.a_msg;
- s->aio_getq.a_msg = NULL;
+ msg = nni_aio_get_msg(s->aio_getq);
+ nni_aio_set_msg(s->aio_getq, NULL);
// By definition we are in polyamorous mode.
NNI_ASSERT(s->poly);
@@ -289,7 +289,7 @@ pair1_sock_getq_cb(void *arg)
// Pipe not present!
nni_mtx_unlock(&s->mtx);
nni_msg_free(msg);
- nni_msgq_aio_get(s->uwq, &s->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
return;
}
@@ -302,7 +302,7 @@ pair1_sock_getq_cb(void *arg)
}
nni_mtx_unlock(&s->mtx);
- nni_msgq_aio_get(s->uwq, &s->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
@@ -310,13 +310,13 @@ pair1_pipe_putq_cb(void *arg)
{
pair1_pipe *p = arg;
- if (nni_aio_result(&p->aio_putq) != 0) {
- nni_msg_free(p->aio_putq.a_msg);
- p->aio_putq.a_msg = NULL;
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
nni_pipe_stop(p->npipe);
return;
}
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
}
static void
@@ -327,13 +327,13 @@ pair1_pipe_getq_cb(void *arg)
nni_msg * msg;
uint32_t hops;
- if (nni_aio_result(&p->aio_getq) != 0) {
+ if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->npipe);
return;
}
- msg = p->aio_getq.a_msg;
- p->aio_getq.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_getq);
+ nni_aio_set_msg(p->aio_getq, NULL);
// Raw mode messages have the header already formed, with
// a hop count. Cooked mode messages have no
@@ -354,13 +354,13 @@ pair1_pipe_getq_cb(void *arg)
goto badmsg;
}
- p->aio_send.a_msg = msg;
- nni_pipe_send(p->npipe, &p->aio_send);
+ nni_aio_set_msg(p->aio_send, msg);
+ nni_pipe_send(p->npipe, p->aio_send);
return;
badmsg:
nni_msg_free(msg);
- nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, &p->aio_getq);
+ nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, p->aio_getq);
}
static void
@@ -369,20 +369,16 @@ pair1_pipe_send_cb(void *arg)
pair1_pipe *p = arg;
pair1_sock *s = p->psock;
- if (nni_aio_result(&p->aio_send) != 0) {
- nni_msg_free(p->aio_send.a_msg);
- p->aio_send.a_msg = NULL;
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
nni_pipe_stop(p->npipe);
return;
}
// In polyamorous mode, we want to get from the sendq; in
// monogamous we get from upper writeq.
- if (s->poly) {
- nni_msgq_aio_get(p->sendq, &p->aio_getq);
- } else {
- nni_msgq_aio_get(s->uwq, &p->aio_getq);
- }
+ nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, p->aio_getq);
}
static void