aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/bus
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-10-20 17:03:12 -0700
committerGarrett D'Amore <garrett@damore.org>2017-10-23 16:14:53 -0700
commit3585000ca027740dbdb4599f4991cd2bf562e2f2 (patch)
treea45b4c1bcc2d11777dde0e38d4b742d121d55e45 /src/protocol/bus
parentfdb73b69a887d868f8e976ef8a990a5d7f6687f9 (diff)
downloadnng-3585000ca027740dbdb4599f4991cd2bf562e2f2.tar.gz
nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.tar.bz2
nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.zip
fixes #112 Need to move some stuff from socket to message queues
Diffstat (limited to 'src/protocol/bus')
-rw-r--r--src/protocol/bus/bus.c28
1 files changed, 27 insertions, 1 deletions
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index cad21989..046aa19a 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -22,6 +22,9 @@ typedef struct bus_pipe bus_pipe;
typedef struct bus_sock bus_sock;
static void bus_sock_getq(bus_sock *);
+static void bus_sock_send(void *, nni_aio *);
+static void bus_sock_recv(void *, nni_aio *);
+
static void bus_pipe_getq(bus_pipe *);
static void bus_pipe_send(bus_pipe *);
static void bus_pipe_recv(bus_pipe *);
@@ -39,6 +42,8 @@ struct bus_sock {
nni_aio * aio_getq;
nni_list pipes;
nni_mtx mtx;
+ nni_msgq *uwq;
+ nni_msgq *urq;
};
// A bus_pipe is our per-pipe protocol private structure.
@@ -82,6 +87,8 @@ bus_sock_init(void **sp, nni_sock *nsock)
}
s->nsock = nsock;
s->raw = 0;
+ s->uwq = nni_sock_sendq(nsock);
+ s->urq = nni_sock_recvq(nsock);
*sp = s;
return (0);
@@ -259,7 +266,6 @@ bus_sock_getq_cb(void *arg)
bus_sock *s = arg;
bus_pipe *p;
bus_pipe *lastp;
- nni_msgq *uwq = nni_sock_sendq(s->nsock);
nni_msg * msg;
nni_msg * dup;
uint32_t sender;
@@ -338,6 +344,24 @@ bus_sock_getopt_raw(void *arg, void *buf, size_t *szp)
return (nni_getopt_int(s->raw, buf, szp));
}
+static void
+bus_sock_send(void *arg, nni_aio *aio)
+{
+ bus_sock *s = arg;
+
+ nni_sock_send_pending(s->nsock);
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
+static void
+bus_sock_recv(void *arg, nni_aio *aio)
+{
+ bus_sock *s = arg;
+
+ nni_sock_recv_pending(s->nsock);
+ nni_msgq_aio_get(s->urq, aio);
+}
+
static nni_proto_pipe_ops bus_pipe_ops = {
.pipe_init = bus_pipe_init,
.pipe_fini = bus_pipe_fini,
@@ -360,6 +384,8 @@ static nni_proto_sock_ops bus_sock_ops = {
.sock_fini = bus_sock_fini,
.sock_open = bus_sock_open,
.sock_close = bus_sock_close,
+ .sock_send = bus_sock_send,
+ .sock_recv = bus_sock_recv,
.sock_options = bus_sock_options,
};