aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pair/pair_v0.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pair/pair_v0.c')
-rw-r--r--src/protocol/pair/pair_v0.c90
1 files changed, 47 insertions, 43 deletions
diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c
index ef420051..486ce43b 100644
--- a/src/protocol/pair/pair_v0.c
+++ b/src/protocol/pair/pair_v0.c
@@ -43,10 +43,10 @@ struct pair0_sock {
struct pair0_pipe {
nni_pipe * npipe;
pair0_sock *psock;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_aio aio_getq;
- nni_aio aio_putq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+ nni_aio * aio_getq;
+ nni_aio * aio_putq;
};
static int
@@ -76,18 +76,34 @@ pair0_sock_fini(void *arg)
NNI_FREE_STRUCT(s);
}
+static void
+pair0_pipe_fini(void *arg)
+{
+ pair0_pipe *p = arg;
+
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_putq);
+ nni_aio_fini(p->aio_getq);
+ NNI_FREE_STRUCT(p);
+}
+
static int
pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
pair0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- nni_aio_init(&p->aio_send, pair0_send_cb, p);
- nni_aio_init(&p->aio_recv, pair0_recv_cb, p);
- nni_aio_init(&p->aio_getq, pair0_getq_cb, p);
- nni_aio_init(&p->aio_putq, pair0_putq_cb, p);
+ if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) {
+ pair0_pipe_fini(p);
+ return (rv);
+ }
p->npipe = npipe;
p->psock = psock;
@@ -95,18 +111,6 @@ pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock)
return (0);
}
-static void
-pair0_pipe_fini(void *arg)
-{
- pair0_pipe *p = arg;
-
- nni_aio_fini(&p->aio_send);
- nni_aio_fini(&p->aio_recv);
- nni_aio_fini(&p->aio_putq);
- nni_aio_fini(&p->aio_getq);
- NNI_FREE_STRUCT(p);
-}
-
static int
pair0_pipe_start(void *arg)
{
@@ -123,8 +127,8 @@ pair0_pipe_start(void *arg)
// Schedule a getq on the upper, and a read from the pipe.
// Each of these also sets up another hold on the pipe itself.
- nni_msgq_aio_get(s->uwq, &p->aio_getq);
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
+ nni_pipe_recv(p->npipe, p->aio_recv);
return (0);
}
@@ -135,10 +139,10 @@ pair0_pipe_stop(void *arg)
pair0_pipe *p = arg;
pair0_sock *s = p->psock;
- nni_aio_cancel(&p->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_recv, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_getq, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_send, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_recv, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_putq, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_getq, NNG_ECANCELED);
nni_mtx_lock(&s->mtx);
if (s->ppipe == p) {
@@ -154,17 +158,17 @@ pair0_recv_cb(void *arg)
pair0_sock *s = p->psock;
nni_msg * msg;
- if (nni_aio_result(&p->aio_recv) != 0) {
+ if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->npipe);
return;
}
- msg = p->aio_recv.a_msg;
- p->aio_putq.a_msg = msg;
- p->aio_recv.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_aio_set_msg(p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
- nni_msgq_aio_put(s->urq, &p->aio_putq);
+ nni_msgq_aio_put(s->urq, p->aio_putq);
}
static void
@@ -172,13 +176,13 @@ pair0_putq_cb(void *arg)
{
pair0_pipe *p = arg;
- if (nni_aio_result(&p->aio_putq) != 0) {
- nni_msg_free(p->aio_putq.a_msg);
- p->aio_putq.a_msg = NULL;
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
nni_pipe_stop(p->npipe);
return;
}
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
}
static void
@@ -187,14 +191,14 @@ pair0_getq_cb(void *arg)
pair0_pipe *p = arg;
pair0_sock *s = p->psock;
- if (nni_aio_result(&p->aio_getq) != 0) {
+ if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->npipe);
return;
}
- p->aio_send.a_msg = p->aio_getq.a_msg;
- p->aio_getq.a_msg = NULL;
- nni_pipe_send(p->npipe, &p->aio_send);
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
+ nni_pipe_send(p->npipe, p->aio_send);
}
static void
@@ -203,14 +207,14 @@ pair0_send_cb(void *arg)
pair0_pipe *p = arg;
pair0_sock *s = p->psock;
- if (nni_aio_result(&p->aio_send) != 0) {
- nni_msg_free(p->aio_send.a_msg);
- p->aio_send.a_msg = NULL;
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
nni_pipe_stop(p->npipe);
return;
}
- nni_msgq_aio_get(s->uwq, &p->aio_getq);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
}
static void