aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/protocol/bus0/bus.c81
1 files changed, 79 insertions, 2 deletions
diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c
index d95f54ba..852de7c8 100644
--- a/src/protocol/bus0/bus.c
+++ b/src/protocol/bus0/bus.c
@@ -35,6 +35,7 @@ static void bus0_pipe_getq(bus0_pipe *);
static void bus0_pipe_recv(bus0_pipe *);
static void bus0_sock_getq_cb(void *);
+static void bus0_sock_getq_cb_raw(void *);
static void bus0_pipe_getq_cb(void *);
static void bus0_pipe_send_cb(void *);
static void bus0_pipe_recv_cb(void *);
@@ -47,6 +48,7 @@ struct bus0_sock {
nni_mtx mtx;
nni_msgq *uwq;
nni_msgq *urq;
+ bool raw;
};
// bus0_pipe is our per-pipe protocol private structure.
@@ -89,6 +91,30 @@ bus0_sock_init(void **sp, nni_sock *nsock)
}
s->uwq = nni_sock_sendq(nsock);
s->urq = nni_sock_recvq(nsock);
+ s->raw = false;
+
+ *sp = s;
+ return (0);
+}
+
+static int
+bus0_sock_init_raw(void **sp, nni_sock *nsock)
+{
+ bus0_sock *s;
+ int rv;
+
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_INIT(&s->pipes, bus0_pipe, node);
+ nni_mtx_init(&s->mtx);
+ if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb_raw, s)) != 0) {
+ bus0_sock_fini(s);
+ return (rv);
+ }
+ s->uwq = nni_sock_sendq(nsock);
+ s->urq = nni_sock_recvq(nsock);
+ s->raw = true;
*sp = s;
return (0);
@@ -241,7 +267,8 @@ bus0_pipe_recv_cb(void *arg)
}
msg = nni_aio_get_msg(p->aio_recv);
- if (nni_msg_header_insert_u32(msg, nni_pipe_id(p->npipe)) != 0) {
+ if (s->raw &&
+ (nni_msg_header_insert_u32(msg, nni_pipe_id(p->npipe)) != 0)) {
// XXX: bump a nomemory stat
nni_msg_free(msg);
nni_aio_set_msg(p->aio_recv, NULL);
@@ -249,6 +276,7 @@ bus0_pipe_recv_cb(void *arg)
return;
}
+ nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
nni_aio_set_msg(p->aio_putq, msg);
nni_aio_set_msg(p->aio_recv, NULL);
nni_msgq_aio_put(s->urq, p->aio_putq);
@@ -278,6 +306,45 @@ bus0_sock_getq_cb(void *arg)
bus0_pipe *lastp;
nni_msg * msg;
nni_msg * dup;
+
+ if (nni_aio_result(s->aio_getq) != 0) {
+ return;
+ }
+
+ msg = nni_aio_get_msg(s->aio_getq);
+
+ // We ignore any headers present for cooked mode.
+ nni_msg_header_clear(msg);
+
+ nni_mtx_lock(&s->mtx);
+ lastp = nni_list_last(&s->pipes);
+ NNI_LIST_FOREACH (&s->pipes, p) {
+ if (p != lastp) {
+ if (nni_msg_dup(&dup, msg) != 0) {
+ continue;
+ }
+ } else {
+ dup = msg;
+ msg = NULL;
+ }
+ if (nni_msgq_tryput(p->sendq, dup) != 0) {
+ nni_msg_free(dup);
+ }
+ }
+ nni_mtx_unlock(&s->mtx);
+ nni_msg_free(msg);
+
+ bus0_sock_getq(s);
+}
+
+static void
+bus0_sock_getq_cb_raw(void *arg)
+{
+ bus0_sock *s = arg;
+ bus0_pipe *p;
+ bus0_pipe *lastp;
+ nni_msg * msg;
+ nni_msg * dup;
uint32_t sender;
if (nni_aio_result(s->aio_getq) != 0) {
@@ -384,6 +451,16 @@ static nni_proto_sock_ops bus0_sock_ops = {
.sock_options = bus0_sock_options,
};
+static nni_proto_sock_ops bus0_sock_ops_raw = {
+ .sock_init = bus0_sock_init_raw,
+ .sock_fini = bus0_sock_fini,
+ .sock_open = bus0_sock_open,
+ .sock_close = bus0_sock_close,
+ .sock_send = bus0_sock_send,
+ .sock_recv = bus0_sock_recv,
+ .sock_options = bus0_sock_options,
+};
+
static nni_proto bus0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_BUS_V0, "bus" },
@@ -398,7 +475,7 @@ static nni_proto bus0_proto_raw = {
.proto_self = { NNI_PROTO_BUS_V0, "bus" },
.proto_peer = { NNI_PROTO_BUS_V0, "bus" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
- .proto_sock_ops = &bus0_sock_ops,
+ .proto_sock_ops = &bus0_sock_ops_raw,
.proto_pipe_ops = &bus0_pipe_ops,
};