aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/bus/bus.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/bus/bus.c')
-rw-r--r--src/protocol/bus/bus.c28
1 files changed, 27 insertions, 1 deletions
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index cad21989..046aa19a 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -22,6 +22,9 @@ typedef struct bus_pipe bus_pipe;
typedef struct bus_sock bus_sock;
static void bus_sock_getq(bus_sock *);
+static void bus_sock_send(void *, nni_aio *);
+static void bus_sock_recv(void *, nni_aio *);
+
static void bus_pipe_getq(bus_pipe *);
static void bus_pipe_send(bus_pipe *);
static void bus_pipe_recv(bus_pipe *);
@@ -39,6 +42,8 @@ struct bus_sock {
nni_aio * aio_getq;
nni_list pipes;
nni_mtx mtx;
+ nni_msgq *uwq;
+ nni_msgq *urq;
};
// A bus_pipe is our per-pipe protocol private structure.
@@ -82,6 +87,8 @@ bus_sock_init(void **sp, nni_sock *nsock)
}
s->nsock = nsock;
s->raw = 0;
+ s->uwq = nni_sock_sendq(nsock);
+ s->urq = nni_sock_recvq(nsock);
*sp = s;
return (0);
@@ -259,7 +266,6 @@ bus_sock_getq_cb(void *arg)
bus_sock *s = arg;
bus_pipe *p;
bus_pipe *lastp;
- nni_msgq *uwq = nni_sock_sendq(s->nsock);
nni_msg * msg;
nni_msg * dup;
uint32_t sender;
@@ -338,6 +344,24 @@ bus_sock_getopt_raw(void *arg, void *buf, size_t *szp)
return (nni_getopt_int(s->raw, buf, szp));
}
+static void
+bus_sock_send(void *arg, nni_aio *aio)
+{
+ bus_sock *s = arg;
+
+ nni_sock_send_pending(s->nsock);
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
+static void
+bus_sock_recv(void *arg, nni_aio *aio)
+{
+ bus_sock *s = arg;
+
+ nni_sock_recv_pending(s->nsock);
+ nni_msgq_aio_get(s->urq, aio);
+}
+
static nni_proto_pipe_ops bus_pipe_ops = {
.pipe_init = bus_pipe_init,
.pipe_fini = bus_pipe_fini,
@@ -360,6 +384,8 @@ static nni_proto_sock_ops bus_sock_ops = {
.sock_fini = bus_sock_fini,
.sock_open = bus_sock_open,
.sock_close = bus_sock_close,
+ .sock_send = bus_sock_send,
+ .sock_recv = bus_sock_recv,
.sock_options = bus_sock_options,
};