diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-05-16 21:46:56 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-05-17 09:44:39 -0700 |
| commit | e490aa3353f05e158a0f1f485f371cd49e70b4f5 (patch) | |
| tree | b435b7587b94a3f3d5d8ec127e2d63cc978622bc /src | |
| parent | eab6a4d2d96d11d3926e927c135362fc166895f0 (diff) | |
| download | nng-e490aa3353f05e158a0f1f485f371cd49e70b4f5.tar.gz nng-e490aa3353f05e158a0f1f485f371cd49e70b4f5.tar.bz2 nng-e490aa3353f05e158a0f1f485f371cd49e70b4f5.zip | |
fixes #441 Unintentional semantic in bus protocol
Diffstat (limited to 'src')
| -rw-r--r-- | src/protocol/bus0/bus.c | 81 |
1 files changed, 79 insertions, 2 deletions
diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c index d95f54ba..852de7c8 100644 --- a/src/protocol/bus0/bus.c +++ b/src/protocol/bus0/bus.c @@ -35,6 +35,7 @@ static void bus0_pipe_getq(bus0_pipe *); static void bus0_pipe_recv(bus0_pipe *); static void bus0_sock_getq_cb(void *); +static void bus0_sock_getq_cb_raw(void *); static void bus0_pipe_getq_cb(void *); static void bus0_pipe_send_cb(void *); static void bus0_pipe_recv_cb(void *); @@ -47,6 +48,7 @@ struct bus0_sock { nni_mtx mtx; nni_msgq *uwq; nni_msgq *urq; + bool raw; }; // bus0_pipe is our per-pipe protocol private structure. @@ -89,6 +91,30 @@ bus0_sock_init(void **sp, nni_sock *nsock) } s->uwq = nni_sock_sendq(nsock); s->urq = nni_sock_recvq(nsock); + s->raw = false; + + *sp = s; + return (0); +} + +static int +bus0_sock_init_raw(void **sp, nni_sock *nsock) +{ + bus0_sock *s; + int rv; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + NNI_LIST_INIT(&s->pipes, bus0_pipe, node); + nni_mtx_init(&s->mtx); + if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb_raw, s)) != 0) { + bus0_sock_fini(s); + return (rv); + } + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); + s->raw = true; *sp = s; return (0); @@ -241,7 +267,8 @@ bus0_pipe_recv_cb(void *arg) } msg = nni_aio_get_msg(p->aio_recv); - if (nni_msg_header_insert_u32(msg, nni_pipe_id(p->npipe)) != 0) { + if (s->raw && + (nni_msg_header_insert_u32(msg, nni_pipe_id(p->npipe)) != 0)) { // XXX: bump a nomemory stat nni_msg_free(msg); nni_aio_set_msg(p->aio_recv, NULL); @@ -249,6 +276,7 @@ bus0_pipe_recv_cb(void *arg) return; } + nni_msg_set_pipe(msg, nni_pipe_id(p->npipe)); nni_aio_set_msg(p->aio_putq, msg); nni_aio_set_msg(p->aio_recv, NULL); nni_msgq_aio_put(s->urq, p->aio_putq); @@ -278,6 +306,45 @@ bus0_sock_getq_cb(void *arg) bus0_pipe *lastp; nni_msg * msg; nni_msg * dup; + + if (nni_aio_result(s->aio_getq) != 0) { + return; + } + + msg = nni_aio_get_msg(s->aio_getq); + + // We ignore any headers present for cooked mode. + nni_msg_header_clear(msg); + + nni_mtx_lock(&s->mtx); + lastp = nni_list_last(&s->pipes); + NNI_LIST_FOREACH (&s->pipes, p) { + if (p != lastp) { + if (nni_msg_dup(&dup, msg) != 0) { + continue; + } + } else { + dup = msg; + msg = NULL; + } + if (nni_msgq_tryput(p->sendq, dup) != 0) { + nni_msg_free(dup); + } + } + nni_mtx_unlock(&s->mtx); + nni_msg_free(msg); + + bus0_sock_getq(s); +} + +static void +bus0_sock_getq_cb_raw(void *arg) +{ + bus0_sock *s = arg; + bus0_pipe *p; + bus0_pipe *lastp; + nni_msg * msg; + nni_msg * dup; uint32_t sender; if (nni_aio_result(s->aio_getq) != 0) { @@ -384,6 +451,16 @@ static nni_proto_sock_ops bus0_sock_ops = { .sock_options = bus0_sock_options, }; +static nni_proto_sock_ops bus0_sock_ops_raw = { + .sock_init = bus0_sock_init_raw, + .sock_fini = bus0_sock_fini, + .sock_open = bus0_sock_open, + .sock_close = bus0_sock_close, + .sock_send = bus0_sock_send, + .sock_recv = bus0_sock_recv, + .sock_options = bus0_sock_options, +}; + static nni_proto bus0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_BUS_V0, "bus" }, @@ -398,7 +475,7 @@ static nni_proto bus0_proto_raw = { .proto_self = { NNI_PROTO_BUS_V0, "bus" }, .proto_peer = { NNI_PROTO_BUS_V0, "bus" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, - .proto_sock_ops = &bus0_sock_ops, + .proto_sock_ops = &bus0_sock_ops_raw, .proto_pipe_ops = &bus0_pipe_ops, }; |
