diff options
Diffstat (limited to 'src/protocol/pubsub0')
| -rw-r--r-- | src/protocol/pubsub0/pub.c | 37 | ||||
| -rw-r--r-- | src/protocol/pubsub0/pub.h | 9 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 57 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.h | 10 |
4 files changed, 59 insertions, 54 deletions
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index aaa22801..45f4b7d9 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -40,7 +40,6 @@ static void pub0_pipe_fini(void *); // pub0_sock is our per-socket protocol private structure. struct pub0_sock { nni_msgq *uwq; - bool raw; nni_aio * aio_getq; nni_list pipes; nni_mtx mtx; @@ -83,7 +82,6 @@ pub0_sock_init(void **sp, nni_sock *sock) return (rv); } - s->raw = false; NNI_LIST_INIT(&s->pipes, pub0_pipe, node); s->uwq = nni_sock_sendq(sock); @@ -273,20 +271,6 @@ pub0_pipe_send_cb(void *arg) nni_msgq_aio_get(p->sendq, p->aio_getq); } -static int -pub0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - pub0_sock *s = arg; - return (nni_copyin_bool(&s->raw, buf, sz, typ)); -} - -static int -pub0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - pub0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - static void pub0_sock_recv(void *arg, nni_aio *aio) { @@ -310,12 +294,6 @@ static nni_proto_pipe_ops pub0_pipe_ops = { }; static nni_proto_sock_option pub0_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = pub0_sock_getopt_raw, - .pso_setopt = pub0_sock_setopt_raw, - }, // terminate list { .pso_name = NULL, @@ -341,8 +319,23 @@ static nni_proto pub0_proto = { .proto_pipe_ops = &pub0_pipe_ops, }; +static nni_proto pub0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PUB_V0, "pub" }, + .proto_peer = { NNI_PROTO_SUB_V0, "sub" }, + .proto_flags = NNI_PROTO_FLAG_SND | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &pub0_sock_ops, + .proto_pipe_ops = &pub0_pipe_ops, +}; + int nng_pub0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &pub0_proto)); } + +int +nng_pub0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &pub0_proto_raw)); +} diff --git a/src/protocol/pubsub0/pub.h b/src/protocol/pubsub0/pub.h index 2388a292..877f2f1c 100644 --- a/src/protocol/pubsub0/pub.h +++ b/src/protocol/pubsub0/pub.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,11 +16,16 @@ extern "C" { #endif NNG_DECL int nng_pub0_open(nng_socket *); +NNG_DECL int nng_pub0_open_raw(nng_socket *); #ifndef nng_pub_open #define nng_pub_open nng_pub0_open #endif +#ifndef nng_pub_open_raw +#define nng_pub_open_raw nng_pub0_open_raw +#endif + #ifdef __cplusplus } #endif diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index 6b1f1173..b41b33ea 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -44,7 +44,6 @@ struct sub0_topic { struct sub0_sock { nni_list topics; nni_msgq *urq; - bool raw; nni_mtx lk; }; @@ -66,7 +65,6 @@ sub0_sock_init(void **sp, nni_sock *sock) } nni_mtx_init(&s->lk); NNI_LIST_INIT(&s->topics, sub0_topic, node); - s->raw = false; s->urq = nni_sock_recvq(sock); *sp = s; @@ -277,20 +275,6 @@ sub0_unsubscribe(void *arg, const void *buf, size_t sz, int typ) return (NNG_ENOENT); } -static int -sub0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - sub0_sock *s = arg; - return (nni_copyin_bool(&s->raw, buf, sz, typ)); -} - -static int -sub0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - sub0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - static void sub0_sock_send(void *arg, nni_aio *aio) { @@ -315,16 +299,13 @@ sub0_sock_filter(void *arg, nni_msg *msg) size_t len; int match; - nni_mtx_lock(&s->lk); - if (s->raw) { - nni_mtx_unlock(&s->lk); - return (msg); - } - body = nni_msg_body(msg); len = nni_msg_len(msg); match = 0; + + nni_mtx_lock(&s->lk); + // Check to see if the message matches one of our subscriptions. NNI_LIST_FOREACH (&s->topics, topic) { if (len >= topic->len) { @@ -362,12 +343,6 @@ static nni_proto_pipe_ops sub0_pipe_ops = { static nni_proto_sock_option sub0_sock_options[] = { { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = sub0_sock_getopt_raw, - .pso_setopt = sub0_sock_setopt_raw, - }, - { .pso_name = NNG_OPT_SUB_SUBSCRIBE, .pso_type = NNI_TYPE_OPAQUE, .pso_getopt = NULL, @@ -396,6 +371,17 @@ static nni_proto_sock_ops sub0_sock_ops = { .sock_options = sub0_sock_options, }; +static nni_proto_sock_ops sub0_sock_ops_raw = { + .sock_init = sub0_sock_init, + .sock_fini = sub0_sock_fini, + .sock_open = sub0_sock_open, + .sock_close = sub0_sock_close, + .sock_send = sub0_sock_send, + .sock_recv = sub0_sock_recv, + .sock_filter = NULL, // raw does not filter + .sock_options = sub0_sock_options, +}; + static nni_proto sub0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_SUB_V0, "sub" }, @@ -405,8 +391,23 @@ static nni_proto sub0_proto = { .proto_pipe_ops = &sub0_pipe_ops, }; +static nni_proto sub0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_SUB_V0, "sub" }, + .proto_peer = { NNI_PROTO_PUB_V0, "pub" }, + .proto_flags = NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &sub0_sock_ops_raw, + .proto_pipe_ops = &sub0_pipe_ops, +}; + int nng_sub0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &sub0_proto)); } + +int +nng_sub0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &sub0_proto_raw)); +} diff --git a/src/protocol/pubsub0/sub.h b/src/protocol/pubsub0/sub.h index 1a09145d..acb5cda3 100644 --- a/src/protocol/pubsub0/sub.h +++ b/src/protocol/pubsub0/sub.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -17,10 +17,16 @@ extern "C" { NNG_DECL int nng_sub0_open(nng_socket *); +NNG_DECL int nng_sub0_open_raw(nng_socket *); + #ifndef nng_sub_open #define nng_sub_open nng_sub0_open #endif +#ifndef nng_sub_open_raw +#define nng_sub_open_raw nng_sub0_open_raw +#endif + #define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe" #define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe" |
