diff options
Diffstat (limited to 'src/protocol')
| -rw-r--r-- | src/protocol/bus0/bus.c | 37 | ||||
| -rw-r--r-- | src/protocol/bus0/bus.h | 10 | ||||
| -rw-r--r-- | src/protocol/pair0/pair.c | 37 | ||||
| -rw-r--r-- | src/protocol/pair0/pair.h | 10 | ||||
| -rw-r--r-- | src/protocol/pair1/pair.c | 65 | ||||
| -rw-r--r-- | src/protocol/pair1/pair.h | 9 | ||||
| -rw-r--r-- | src/protocol/pipeline0/pull.c | 36 | ||||
| -rw-r--r-- | src/protocol/pipeline0/pull.h | 9 | ||||
| -rw-r--r-- | src/protocol/pipeline0/push.c | 37 | ||||
| -rw-r--r-- | src/protocol/pipeline0/push.h | 9 | ||||
| -rw-r--r-- | src/protocol/pubsub0/pub.c | 37 | ||||
| -rw-r--r-- | src/protocol/pubsub0/pub.h | 9 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 57 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.h | 10 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 70 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep.h | 9 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.c | 97 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.h | 8 | ||||
| -rw-r--r-- | src/protocol/survey0/respond.c | 68 | ||||
| -rw-r--r-- | src/protocol/survey0/respond.h | 9 | ||||
| -rw-r--r-- | src/protocol/survey0/survey.c | 76 | ||||
| -rw-r--r-- | src/protocol/survey0/survey.h | 9 |
22 files changed, 381 insertions, 337 deletions
diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c index 57fad341..2a2a1228 100644 --- a/src/protocol/bus0/bus.c +++ b/src/protocol/bus0/bus.c @@ -42,7 +42,6 @@ static void bus0_pipe_putq_cb(void *); // bus0_sock is our per-socket protocol private structure. struct bus0_sock { - bool raw; nni_aio * aio_getq; nni_list pipes; nni_mtx mtx; @@ -89,7 +88,6 @@ bus0_sock_init(void **sp, nni_sock *nsock) bus0_sock_fini(s); return (rv); } - s->raw = false; s->uwq = nni_sock_sendq(nsock); s->urq = nni_sock_recvq(nsock); @@ -333,20 +331,6 @@ bus0_pipe_recv(bus0_pipe *p) nni_pipe_recv(p->npipe, p->aio_recv); } -static int -bus0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - bus0_sock *s = arg; - return (nni_copyin_bool(&s->raw, buf, sz, typ)); -} - -static int -bus0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - bus0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - static void bus0_sock_send(void *arg, nni_aio *aio) { @@ -371,12 +355,6 @@ static nni_proto_pipe_ops bus0_pipe_ops = { }; static nni_proto_sock_option bus0_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = bus0_sock_getopt_raw, - .pso_setopt = bus0_sock_setopt_raw, - }, // terminate list { .pso_name = NULL, @@ -402,8 +380,23 @@ static nni_proto bus0_proto = { .proto_pipe_ops = &bus0_pipe_ops, }; +static nni_proto bus0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_BUS_V0, "bus" }, + .proto_peer = { NNI_PROTO_BUS_V0, "bus" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &bus0_sock_ops, + .proto_pipe_ops = &bus0_pipe_ops, +}; + int nng_bus0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &bus0_proto)); } + +int +nng_bus0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &bus0_proto_raw)); +} diff --git a/src/protocol/bus0/bus.h b/src/protocol/bus0/bus.h index 0ef3d391..c8c23d84 100644 --- a/src/protocol/bus0/bus.h +++ b/src/protocol/bus0/bus.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -17,10 +17,16 @@ extern "C" { NNG_DECL int nng_bus0_open(nng_socket *); +NNG_DECL int nng_bus0_open_raw(nng_socket *); + #ifndef nng_bus_open #define nng_bus_open nng_bus0_open #endif +#ifndef nng_bus_open_raw +#define nng_bus_open_raw nng_bus0_open_raw +#endif + #ifdef __cplusplus } #endif diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c index ccece972..e275e52c 100644 --- a/src/protocol/pair0/pair.c +++ b/src/protocol/pair0/pair.c @@ -36,7 +36,6 @@ struct pair0_sock { pair0_pipe *ppipe; nni_msgq * uwq; nni_msgq * urq; - bool raw; nni_mtx mtx; }; @@ -63,7 +62,6 @@ pair0_sock_init(void **sp, nni_sock *nsock) } nni_mtx_init(&s->mtx); s->ppipe = NULL; - s->raw = false; s->uwq = nni_sock_sendq(nsock); s->urq = nni_sock_recvq(nsock); *sp = s; @@ -231,20 +229,6 @@ pair0_sock_close(void *arg) NNI_ARG_UNUSED(arg); } -static int -pair0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - pair0_sock *s = arg; - return (nni_copyin_bool(&s->raw, buf, sz, typ)); -} - -static int -pair0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - pair0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - static void pair0_sock_send(void *arg, nni_aio *aio) { @@ -269,12 +253,6 @@ static nni_proto_pipe_ops pair0_pipe_ops = { }; static nni_proto_sock_option pair0_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = pair0_sock_getopt_raw, - .pso_setopt = pair0_sock_setopt_raw, - }, // terminate list { .pso_name = NULL, @@ -301,8 +279,23 @@ static nni_proto pair0_proto = { .proto_pipe_ops = &pair0_pipe_ops, }; +static nni_proto pair0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PAIR_V0, "pair" }, + .proto_peer = { NNI_PROTO_PAIR_V0, "pair" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &pair0_sock_ops, + .proto_pipe_ops = &pair0_pipe_ops, +}; + int nng_pair0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &pair0_proto)); } + +int +nng_pair0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &pair0_proto_raw)); +} diff --git a/src/protocol/pair0/pair.h b/src/protocol/pair0/pair.h index 6828c921..1356f1cd 100644 --- a/src/protocol/pair0/pair.h +++ b/src/protocol/pair0/pair.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -17,10 +17,16 @@ extern "C" { NNG_DECL int nng_pair0_open(nng_socket *); +NNG_DECL int nng_pair0_open_raw(nng_socket *); + #ifndef nng_pair_open #define nng_pair_open nng_pair0_open #endif +#ifndef nng_pair_open_raw +#define nng_pair_open_raw nng_pair0_open_raw +#endif + #ifdef __cplusplus } #endif diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c index becbbfa7..a3c01d46 100644 --- a/src/protocol/pair1/pair.c +++ b/src/protocol/pair1/pair.c @@ -72,7 +72,7 @@ pair1_sock_fini(void *arg) } static int -pair1_sock_init(void **sp, nni_sock *nsock) +pair1_sock_init_impl(void **sp, nni_sock *nsock, bool raw) { pair1_sock *s; int rv; @@ -94,7 +94,7 @@ pair1_sock_init(void **sp, nni_sock *nsock) return (rv); } - s->raw = false; + s->raw = raw; s->poly = false; s->uwq = nni_sock_sendq(nsock); s->urq = nni_sock_recvq(nsock); @@ -104,6 +104,18 @@ pair1_sock_init(void **sp, nni_sock *nsock) return (0); } +static int +pair1_sock_init(void **sp, nni_sock *nsock) +{ + return (pair1_sock_init_impl(sp, nsock, false)); +} + +static int +pair1_sock_init_raw(void **sp, nni_sock *nsock) +{ + return (pair1_sock_init_impl(sp, nsock, true)); +} + static void pair1_pipe_fini(void *arg) { @@ -397,24 +409,6 @@ pair1_sock_close(void *arg) } static int -pair1_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - pair1_sock *s = arg; - int rv; - nni_mtx_lock(&s->mtx); - rv = s->started ? NNG_ESTATE : nni_copyin_bool(&s->raw, buf, sz, typ); - nni_mtx_unlock(&s->mtx); - return (rv); -} - -static int -pair1_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - pair1_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - -static int pair1_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) { pair1_sock *s = arg; @@ -475,12 +469,6 @@ static nni_proto_pipe_ops pair1_pipe_ops = { static nni_proto_sock_option pair1_sock_options[] = { { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = pair1_sock_getopt_raw, - .pso_setopt = pair1_sock_setopt_raw, - }, - { .pso_name = NNG_OPT_MAXTTL, .pso_type = NNI_TYPE_INT32, .pso_getopt = pair1_sock_getopt_maxttl, @@ -522,3 +510,28 @@ nng_pair1_open(nng_socket *sidp) { return (nni_proto_open(sidp, &pair1_proto)); } + +static nni_proto_sock_ops pair1_sock_ops_raw = { + .sock_init = pair1_sock_init_raw, + .sock_fini = pair1_sock_fini, + .sock_open = pair1_sock_open, + .sock_close = pair1_sock_close, + .sock_recv = pair1_sock_recv, + .sock_send = pair1_sock_send, + .sock_options = pair1_sock_options, +}; + +static nni_proto pair1_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PAIR_V1, "pair1" }, + .proto_peer = { NNI_PROTO_PAIR_V1, "pair1" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &pair1_sock_ops_raw, + .proto_pipe_ops = &pair1_pipe_ops, +}; + +int +nng_pair1_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &pair1_proto_raw)); +} diff --git a/src/protocol/pair1/pair.h b/src/protocol/pair1/pair.h index bc519d9f..85da9d45 100644 --- a/src/protocol/pair1/pair.h +++ b/src/protocol/pair1/pair.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,11 +16,16 @@ extern "C" { #endif NNG_DECL int nng_pair1_open(nng_socket *); +NNG_DECL int nng_pair1_open_raw(nng_socket *); #ifndef nng_pair_open #define nng_pair_open nng_pair1_open #endif +#ifndef nng_pair_open_raw +#define nng_pair_open_raw nng_pair1_open_raw +#endif + #define NNG_OPT_PAIR1_POLY "pair1:polyamorous" #ifdef __cplusplus diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c index 9aa7bea9..c5017d50 100644 --- a/src/protocol/pipeline0/pull.c +++ b/src/protocol/pipeline0/pull.c @@ -53,7 +53,6 @@ pull0_sock_init(void **sp, nni_sock *sock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - s->raw = false; s->urq = nni_sock_recvq(sock); *sp = s; @@ -180,20 +179,6 @@ pull0_sock_close(void *arg) NNI_ARG_UNUSED(arg); } -static int -pull0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - pull0_sock *s = arg; - return (nni_copyin_bool(&s->raw, buf, sz, typ)); -} - -static int -pull0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - pull0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - static void pull0_sock_send(void *arg, nni_aio *aio) { @@ -217,12 +202,6 @@ static nni_proto_pipe_ops pull0_pipe_ops = { }; static nni_proto_sock_option pull0_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = pull0_sock_getopt_raw, - .pso_setopt = pull0_sock_setopt_raw, - }, // terminate list { .pso_name = NULL, @@ -248,8 +227,23 @@ static nni_proto pull0_proto = { .proto_sock_ops = &pull0_sock_ops, }; +static nni_proto pull0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PULL_V0, "pull" }, + .proto_peer = { NNI_PROTO_PUSH_V0, "push" }, + .proto_flags = NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_RAW, + .proto_pipe_ops = &pull0_pipe_ops, + .proto_sock_ops = &pull0_sock_ops, +}; + int nng_pull0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &pull0_proto)); } + +int +nng_pull0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &pull0_proto_raw)); +} diff --git a/src/protocol/pipeline0/pull.h b/src/protocol/pipeline0/pull.h index 75bded03..1c5d63e3 100644 --- a/src/protocol/pipeline0/pull.h +++ b/src/protocol/pipeline0/pull.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,11 +16,16 @@ extern "C" { #endif NNG_DECL int nng_pull0_open(nng_socket *); +NNG_DECL int nng_pull0_open_raw(nng_socket *); #ifndef nng_pull_open #define nng_pull_open nng_pull0_open #endif +#ifndef nng_pull_open_raw +#define nng_pull_open_raw nng_pull0_open_raw +#endif + #ifdef __cplusplus } #endif diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c index 8c8fa13e..2ad657b6 100644 --- a/src/protocol/pipeline0/push.c +++ b/src/protocol/pipeline0/push.c @@ -36,7 +36,6 @@ static void push0_getq_cb(void *); // push0_sock is our per-socket protocol private structure. struct push0_sock { nni_msgq *uwq; - bool raw; }; // push0_pipe is our per-pipe protocol private structure. @@ -58,7 +57,6 @@ push0_sock_init(void **sp, nni_sock *sock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - s->raw = false; s->uwq = nni_sock_sendq(sock); *sp = s; return (0); @@ -197,20 +195,6 @@ push0_getq_cb(void *arg) nni_pipe_send(p->pipe, p->aio_send); } -static int -push0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - push0_sock *s = arg; - return (nni_copyin_bool(&s->raw, buf, sz, typ)); -} - -static int -push0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - push0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - static void push0_sock_send(void *arg, nni_aio *aio) { @@ -234,12 +218,6 @@ static nni_proto_pipe_ops push0_pipe_ops = { }; static nni_proto_sock_option push0_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = push0_sock_getopt_raw, - .pso_setopt = push0_sock_setopt_raw, - }, // terminate list { .pso_name = NULL, @@ -265,8 +243,23 @@ static nni_proto push0_proto = { .proto_sock_ops = &push0_sock_ops, }; +static nni_proto push0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PUSH_V0, "push" }, + .proto_peer = { NNI_PROTO_PULL_V0, "pull" }, + .proto_flags = NNI_PROTO_FLAG_SND | NNI_PROTO_FLAG_RAW, + .proto_pipe_ops = &push0_pipe_ops, + .proto_sock_ops = &push0_sock_ops, +}; + int nng_push0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &push0_proto)); } + +int +nng_push0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &push0_proto_raw)); +} diff --git a/src/protocol/pipeline0/push.h b/src/protocol/pipeline0/push.h index c7303b92..a1384e0a 100644 --- a/src/protocol/pipeline0/push.h +++ b/src/protocol/pipeline0/push.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,11 +16,16 @@ extern "C" { #endif NNG_DECL int nng_push0_open(nng_socket *); +NNG_DECL int nng_push0_open_raw(nng_socket *); #ifndef nng_push_open #define nng_push_open nng_push0_open #endif +#ifndef nng_push_open_raw +#define nng_push_open_raw nng_push0_open_raw +#endif + #ifdef __cplusplus } #endif diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index aaa22801..45f4b7d9 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -40,7 +40,6 @@ static void pub0_pipe_fini(void *); // pub0_sock is our per-socket protocol private structure. struct pub0_sock { nni_msgq *uwq; - bool raw; nni_aio * aio_getq; nni_list pipes; nni_mtx mtx; @@ -83,7 +82,6 @@ pub0_sock_init(void **sp, nni_sock *sock) return (rv); } - s->raw = false; NNI_LIST_INIT(&s->pipes, pub0_pipe, node); s->uwq = nni_sock_sendq(sock); @@ -273,20 +271,6 @@ pub0_pipe_send_cb(void *arg) nni_msgq_aio_get(p->sendq, p->aio_getq); } -static int -pub0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - pub0_sock *s = arg; - return (nni_copyin_bool(&s->raw, buf, sz, typ)); -} - -static int -pub0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - pub0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - static void pub0_sock_recv(void *arg, nni_aio *aio) { @@ -310,12 +294,6 @@ static nni_proto_pipe_ops pub0_pipe_ops = { }; static nni_proto_sock_option pub0_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = pub0_sock_getopt_raw, - .pso_setopt = pub0_sock_setopt_raw, - }, // terminate list { .pso_name = NULL, @@ -341,8 +319,23 @@ static nni_proto pub0_proto = { .proto_pipe_ops = &pub0_pipe_ops, }; +static nni_proto pub0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PUB_V0, "pub" }, + .proto_peer = { NNI_PROTO_SUB_V0, "sub" }, + .proto_flags = NNI_PROTO_FLAG_SND | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &pub0_sock_ops, + .proto_pipe_ops = &pub0_pipe_ops, +}; + int nng_pub0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &pub0_proto)); } + +int +nng_pub0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &pub0_proto_raw)); +} diff --git a/src/protocol/pubsub0/pub.h b/src/protocol/pubsub0/pub.h index 2388a292..877f2f1c 100644 --- a/src/protocol/pubsub0/pub.h +++ b/src/protocol/pubsub0/pub.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,11 +16,16 @@ extern "C" { #endif NNG_DECL int nng_pub0_open(nng_socket *); +NNG_DECL int nng_pub0_open_raw(nng_socket *); #ifndef nng_pub_open #define nng_pub_open nng_pub0_open #endif +#ifndef nng_pub_open_raw +#define nng_pub_open_raw nng_pub0_open_raw +#endif + #ifdef __cplusplus } #endif diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index 6b1f1173..b41b33ea 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -44,7 +44,6 @@ struct sub0_topic { struct sub0_sock { nni_list topics; nni_msgq *urq; - bool raw; nni_mtx lk; }; @@ -66,7 +65,6 @@ sub0_sock_init(void **sp, nni_sock *sock) } nni_mtx_init(&s->lk); NNI_LIST_INIT(&s->topics, sub0_topic, node); - s->raw = false; s->urq = nni_sock_recvq(sock); *sp = s; @@ -277,20 +275,6 @@ sub0_unsubscribe(void *arg, const void *buf, size_t sz, int typ) return (NNG_ENOENT); } -static int -sub0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - sub0_sock *s = arg; - return (nni_copyin_bool(&s->raw, buf, sz, typ)); -} - -static int -sub0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - sub0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - static void sub0_sock_send(void *arg, nni_aio *aio) { @@ -315,16 +299,13 @@ sub0_sock_filter(void *arg, nni_msg *msg) size_t len; int match; - nni_mtx_lock(&s->lk); - if (s->raw) { - nni_mtx_unlock(&s->lk); - return (msg); - } - body = nni_msg_body(msg); len = nni_msg_len(msg); match = 0; + + nni_mtx_lock(&s->lk); + // Check to see if the message matches one of our subscriptions. NNI_LIST_FOREACH (&s->topics, topic) { if (len >= topic->len) { @@ -362,12 +343,6 @@ static nni_proto_pipe_ops sub0_pipe_ops = { static nni_proto_sock_option sub0_sock_options[] = { { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = sub0_sock_getopt_raw, - .pso_setopt = sub0_sock_setopt_raw, - }, - { .pso_name = NNG_OPT_SUB_SUBSCRIBE, .pso_type = NNI_TYPE_OPAQUE, .pso_getopt = NULL, @@ -396,6 +371,17 @@ static nni_proto_sock_ops sub0_sock_ops = { .sock_options = sub0_sock_options, }; +static nni_proto_sock_ops sub0_sock_ops_raw = { + .sock_init = sub0_sock_init, + .sock_fini = sub0_sock_fini, + .sock_open = sub0_sock_open, + .sock_close = sub0_sock_close, + .sock_send = sub0_sock_send, + .sock_recv = sub0_sock_recv, + .sock_filter = NULL, // raw does not filter + .sock_options = sub0_sock_options, +}; + static nni_proto sub0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_SUB_V0, "sub" }, @@ -405,8 +391,23 @@ static nni_proto sub0_proto = { .proto_pipe_ops = &sub0_pipe_ops, }; +static nni_proto sub0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_SUB_V0, "sub" }, + .proto_peer = { NNI_PROTO_PUB_V0, "pub" }, + .proto_flags = NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &sub0_sock_ops_raw, + .proto_pipe_ops = &sub0_pipe_ops, +}; + int nng_sub0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &sub0_proto)); } + +int +nng_sub0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &sub0_proto_raw)); +} diff --git a/src/protocol/pubsub0/sub.h b/src/protocol/pubsub0/sub.h index 1a09145d..acb5cda3 100644 --- a/src/protocol/pubsub0/sub.h +++ b/src/protocol/pubsub0/sub.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -17,10 +17,16 @@ extern "C" { NNG_DECL int nng_sub0_open(nng_socket *); +NNG_DECL int nng_sub0_open_raw(nng_socket *); + #ifndef nng_sub_open #define nng_sub_open nng_sub0_open #endif +#ifndef nng_sub_open_raw +#define nng_sub_open_raw nng_sub0_open_raw +#endif + #define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe" #define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe" diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index f62406cd..78a1f2ee 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -41,7 +41,6 @@ struct rep0_sock { nni_msgq * uwq; nni_msgq * urq; nni_mtx lk; - bool raw; int ttl; nni_idhash *pipes; char * btrace; @@ -92,7 +91,6 @@ rep0_sock_init(void **sp, nni_sock *sock) } s->ttl = 8; // Per RFC - s->raw = false; s->btrace = NULL; s->btrace_len = 0; s->uwq = nni_sock_sendq(sock); @@ -353,25 +351,6 @@ rep0_pipe_putq_cb(void *arg) } static int -rep0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - rep0_sock *s = arg; - int rv; - - nni_mtx_lock(&s->lk); - rv = nni_copyin_bool(&s->raw, buf, sz, typ); - nni_mtx_unlock(&s->lk); - return (rv); -} - -static int -rep0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - rep0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - -static int rep0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) { rep0_sock *s = arg; @@ -393,10 +372,6 @@ rep0_sock_filter(void *arg, nni_msg *msg) size_t len; nni_mtx_lock(&s->lk); - if (s->raw) { - nni_mtx_unlock(&s->lk); - return (msg); - } len = nni_msg_header_len(msg); header = nni_msg_header(msg); @@ -417,6 +392,13 @@ rep0_sock_filter(void *arg, nni_msg *msg) } static void +rep0_sock_send_raw(void *arg, nni_aio *aio) +{ + rep0_sock *s = arg; + nni_msgq_aio_put(s->uwq, aio); +} + +static void rep0_sock_send(void *arg, nni_aio *aio) { rep0_sock *s = arg; @@ -424,12 +406,6 @@ rep0_sock_send(void *arg, nni_aio *aio) nni_msg * msg; nni_mtx_lock(&s->lk); - if (s->raw) { - // Pass thru - nni_mtx_unlock(&s->lk); - nni_msgq_aio_put(s->uwq, aio); - return; - } if (s->btrace == NULL) { nni_mtx_unlock(&s->lk); nni_aio_finish_error(aio, NNG_ESTATE); @@ -475,12 +451,6 @@ static nni_proto_pipe_ops rep0_pipe_ops = { static nni_proto_sock_option rep0_sock_options[] = { { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = rep0_sock_getopt_raw, - .pso_setopt = rep0_sock_setopt_raw, - }, - { .pso_name = NNG_OPT_MAXTTL, .pso_type = NNI_TYPE_INT32, .pso_getopt = rep0_sock_getopt_maxttl, @@ -503,6 +473,17 @@ static nni_proto_sock_ops rep0_sock_ops = { .sock_recv = rep0_sock_recv, }; +static nni_proto_sock_ops rep0_sock_ops_raw = { + .sock_init = rep0_sock_init, + .sock_fini = rep0_sock_fini, + .sock_open = rep0_sock_open, + .sock_close = rep0_sock_close, + .sock_options = rep0_sock_options, + .sock_filter = NULL, // No filtering for raw mode + .sock_send = rep0_sock_send_raw, + .sock_recv = rep0_sock_recv, +}; + static nni_proto rep0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_REP_V0, "rep" }, @@ -512,8 +493,23 @@ static nni_proto rep0_proto = { .proto_pipe_ops = &rep0_pipe_ops, }; +static nni_proto rep0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_REP_V0, "rep" }, + .proto_peer = { NNI_PROTO_REQ_V0, "req" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &rep0_sock_ops_raw, + .proto_pipe_ops = &rep0_pipe_ops, +}; + int nng_rep0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &rep0_proto)); } + +int +nng_rep0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &rep0_proto_raw)); +} diff --git a/src/protocol/reqrep0/rep.h b/src/protocol/reqrep0/rep.h index 93df9379..612127a2 100644 --- a/src/protocol/reqrep0/rep.h +++ b/src/protocol/reqrep0/rep.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,11 +16,16 @@ extern "C" { #endif NNG_DECL int nng_rep0_open(nng_socket *); +NNG_DECL int nng_rep0_open_raw(nng_socket *); #ifndef nng_rep_open #define nng_rep_open nng_rep0_open #endif +#ifndef nng_rep_open +#define nng_rep_open_raw nng_rep0_open_raw +#endif + #ifdef __cplusplus } #endif diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 0a5b566a..4d35ca1f 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -78,7 +78,7 @@ static void req0_recv_cb(void *); static void req0_putq_cb(void *); static int -req0_sock_init(void **sp, nni_sock *sock) +req0_sock_init_impl(void **sp, nni_sock *sock, bool raw) { req0_sock *s; @@ -96,7 +96,7 @@ req0_sock_init(void **sp, nni_sock *sock) s->nextid = nni_random(); s->retry = NNI_SECOND * 60; s->reqmsg = NULL; - s->raw = false; + s->raw = raw; s->wantw = false; s->resend = NNI_TIME_ZERO; s->ttl = 8; @@ -107,6 +107,18 @@ req0_sock_init(void **sp, nni_sock *sock) return (0); } +static int +req0_sock_init(void **sp, nni_sock *sock) +{ + return (req0_sock_init_impl(sp, sock, false)); +} + +static int +req0_sock_init_raw(void **sp, nni_sock *sock) +{ + return (req0_sock_init_impl(sp, sock, true)); +} + static void req0_sock_open(void *arg) { @@ -250,20 +262,6 @@ req0_pipe_stop(void *arg) } static int -req0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - req0_sock *s = arg; - return (nni_copyin_bool(&s->raw, buf, sz, typ)); -} - -static int -req0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - req0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - -static int req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) { req0_sock *s = arg; @@ -513,11 +511,6 @@ req0_sock_send(void *arg, nni_aio *aio) int rv; nni_mtx_lock(&s->mtx); - if (s->raw) { - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_put(s->uwq, aio); - return; - } msg = nni_aio_get_msg(aio); len = nni_msg_len(msg); @@ -559,6 +552,14 @@ req0_sock_send(void *arg, nni_aio *aio) nni_aio_finish(aio, 0, len); } +static void +req0_sock_send_raw(void *arg, nni_aio *aio) +{ + req0_sock *s = arg; + + nni_msgq_aio_put(s->uwq, aio); +} + static nni_msg * req0_sock_filter(void *arg, nni_msg *msg) { @@ -566,11 +567,6 @@ req0_sock_filter(void *arg, nni_msg *msg) nni_msg * rmsg; nni_mtx_lock(&s->mtx); - if (s->raw) { - // Pass it unmolested - nni_mtx_unlock(&s->mtx); - return (msg); - } if (nni_msg_header_len(msg) < 4) { nni_mtx_unlock(&s->mtx); @@ -608,17 +604,23 @@ req0_sock_recv(void *arg, nni_aio *aio) req0_sock *s = arg; nni_mtx_lock(&s->mtx); - if (!s->raw) { - if (s->reqmsg == NULL) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, NNG_ESTATE); - return; - } + if (s->reqmsg == NULL) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; } nni_mtx_unlock(&s->mtx); nni_msgq_aio_get(s->urq, aio); } +static void +req0_sock_recv_raw(void *arg, nni_aio *aio) +{ + req0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + static nni_proto_pipe_ops req0_pipe_ops = { .pipe_init = req0_pipe_init, .pipe_fini = req0_pipe_fini, @@ -628,12 +630,6 @@ static nni_proto_pipe_ops req0_pipe_ops = { static nni_proto_sock_option req0_sock_options[] = { { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = req0_sock_getopt_raw, - .pso_setopt = req0_sock_setopt_raw, - }, - { .pso_name = NNG_OPT_MAXTTL, .pso_type = NNI_TYPE_INT32, .pso_getopt = req0_sock_getopt_maxttl, @@ -676,3 +672,28 @@ nng_req0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &req0_proto)); } + +static nni_proto_sock_ops req0_sock_ops_raw = { + .sock_init = req0_sock_init_raw, + .sock_fini = req0_sock_fini, + .sock_open = req0_sock_open, + .sock_close = req0_sock_close, + .sock_options = req0_sock_options, + .sock_send = req0_sock_send_raw, + .sock_recv = req0_sock_recv_raw, +}; + +static nni_proto req0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_REQ_V0, "req" }, + .proto_peer = { NNI_PROTO_REP_V0, "rep" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &req0_sock_ops_raw, + .proto_pipe_ops = &req0_pipe_ops, +}; + +int +nng_req0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &req0_proto_raw)); +}
\ No newline at end of file diff --git a/src/protocol/reqrep0/req.h b/src/protocol/reqrep0/req.h index 99c9bf62..392c7932 100644 --- a/src/protocol/reqrep0/req.h +++ b/src/protocol/reqrep0/req.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,10 +16,14 @@ extern "C" { #endif NNG_DECL int nng_req0_open(nng_socket *); +NNG_DECL int nng_req0_open_raw(nng_socket *); #ifndef nng_req_open #define nng_req_open nng_req0_open #endif +#ifndef nng_req_open_raw +#define nng_req_open_raw nng_req0_open_raw +#endif #define NNG_OPT_REQ_RESENDTIME "req:resend-time" diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index eeb09d2a..1605d9e6 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -40,7 +40,6 @@ static void resp0_pipe_fini(void *); struct resp0_sock { nni_msgq * urq; nni_msgq * uwq; - bool raw; int ttl; nni_idhash *pipes; char * btrace; @@ -93,7 +92,6 @@ resp0_sock_init(void **sp, nni_sock *nsock) } s->ttl = 8; // Per RFC - s->raw = false; s->btrace = NULL; s->btrace_len = 0; s->urq = nni_sock_recvq(nsock); @@ -347,36 +345,25 @@ resp0_putq_cb(void *arg) } static int -resp0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) +resp0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) { resp0_sock *s = arg; - int rv; - - nni_mtx_lock(&s->mtx); - rv = nni_copyin_bool(&s->raw, buf, sz, typ); - nni_mtx_unlock(&s->mtx); - return (rv); + return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ)); } static int -resp0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) +resp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ) { resp0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); + return (nni_copyout_int(s->ttl, buf, szp, typ)); } -static int -resp0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) +static void +resp0_sock_send_raw(void *arg, nni_aio *aio) { resp0_sock *s = arg; - return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ)); -} -static int -resp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ) -{ - resp0_sock *s = arg; - return (nni_copyout_int(s->ttl, buf, szp, typ)); + nni_msgq_aio_put(s->uwq, aio); } static void @@ -387,11 +374,6 @@ resp0_sock_send(void *arg, nni_aio *aio) int rv; nni_mtx_lock(&s->mtx); - if (s->raw) { - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_put(s->uwq, aio); - return; - } msg = nni_aio_get_msg(aio); @@ -428,10 +410,6 @@ resp0_sock_filter(void *arg, nni_msg *msg) size_t len; nni_mtx_lock(&s->mtx); - if (s->raw) { - nni_mtx_unlock(&s->mtx); - return (msg); - } len = nni_msg_header_len(msg); header = nni_msg_header(msg); @@ -469,12 +447,6 @@ static nni_proto_pipe_ops resp0_pipe_ops = { static nni_proto_sock_option resp0_sock_options[] = { { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = resp0_sock_getopt_raw, - .pso_setopt = resp0_sock_setopt_raw, - }, - { .pso_name = NNG_OPT_MAXTTL, .pso_type = NNI_TYPE_INT32, .pso_getopt = resp0_sock_getopt_maxttl, @@ -497,6 +469,17 @@ static nni_proto_sock_ops resp0_sock_ops = { .sock_options = resp0_sock_options, }; +static nni_proto_sock_ops resp0_sock_ops_raw = { + .sock_init = resp0_sock_init, + .sock_fini = resp0_sock_fini, + .sock_open = resp0_sock_open, + .sock_close = resp0_sock_close, + .sock_filter = NULL, // no filter for raw + .sock_send = resp0_sock_send_raw, + .sock_recv = resp0_sock_recv, + .sock_options = resp0_sock_options, +}; + static nni_proto resp0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, @@ -506,8 +489,23 @@ static nni_proto resp0_proto = { .proto_pipe_ops = &resp0_pipe_ops, }; +static nni_proto resp0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &resp0_sock_ops_raw, + .proto_pipe_ops = &resp0_pipe_ops, +}; + int nng_respondent0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &resp0_proto)); } + +int +nng_respondent0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &resp0_proto_raw)); +} diff --git a/src/protocol/survey0/respond.h b/src/protocol/survey0/respond.h index 58c65298..b865b2ac 100644 --- a/src/protocol/survey0/respond.h +++ b/src/protocol/survey0/respond.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,11 +16,16 @@ extern "C" { #endif NNG_DECL int nng_respondent0_open(nng_socket *); +NNG_DECL int nng_respondent0_open_raw(nng_socket *); #ifndef nng_respondent_open #define nng_respondent_open nng_respondent0_open #endif +#ifndef nng_respondent_open_raw +#define nng_respondent_open_raw nng_respondent0_open_raw +#endif + #ifdef __cplusplus } #endif diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index a5909015..b7158464 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -39,7 +39,6 @@ static void surv0_timeout(void *); struct surv0_sock { nni_duration survtime; nni_time expire; - bool raw; int ttl; uint32_t nextid; // next id uint32_t survid; // outstanding request ID (big endian) @@ -92,7 +91,6 @@ surv0_sock_init(void **sp, nni_sock *nsock) nni_timer_init(&s->timer, surv0_timeout, s); s->nextid = nni_random(); - s->raw = false; s->survtime = NNI_SECOND; s->expire = NNI_TIME_ZERO; s->uwq = nni_sock_sendq(nsock); @@ -275,28 +273,6 @@ failed: } static int -surv0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - surv0_sock *s = arg; - int rv; - - nni_mtx_lock(&s->mtx); - if ((rv = nni_copyin_bool(&s->raw, buf, sz, typ)) == 0) { - s->survid = 0; - nni_timer_cancel(&s->timer); - } - nni_mtx_unlock(&s->mtx); - return (rv); -} - -static int -surv0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - surv0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - -static int surv0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) { surv0_sock *s = arg; @@ -391,6 +367,14 @@ surv0_sock_recv(void *arg, nni_aio *aio) } static void +surv0_sock_send_raw(void *arg, nni_aio *aio) +{ + surv0_sock *s = arg; + + nni_msgq_aio_put(s->uwq, aio); +} + +static void surv0_sock_send(void *arg, nni_aio *aio) { surv0_sock *s = arg; @@ -398,13 +382,6 @@ surv0_sock_send(void *arg, nni_aio *aio) int rv; nni_mtx_lock(&s->mtx); - if (s->raw) { - // No automatic retry, and the request ID must - // be in the header coming down. - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_put(s->uwq, aio); - return; - } // Generate a new request ID. We always set the high // order bit so that the peer can locate the end of the @@ -437,11 +414,6 @@ surv0_sock_filter(void *arg, nni_msg *msg) surv0_sock *s = arg; nni_mtx_lock(&s->mtx); - if (s->raw) { - // Pass it unmolested - nni_mtx_unlock(&s->mtx); - return (msg); - } if ((nni_msg_header_len(msg) < sizeof(uint32_t)) || (nni_msg_header_trim_u32(msg) != s->survid)) { @@ -464,12 +436,6 @@ static nni_proto_pipe_ops surv0_pipe_ops = { static nni_proto_sock_option surv0_sock_options[] = { { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = surv0_sock_getopt_raw, - .pso_setopt = surv0_sock_setopt_raw, - }, - { .pso_name = NNG_OPT_SURVEYOR_SURVEYTIME, .pso_type = NNI_TYPE_DURATION, .pso_getopt = surv0_sock_getopt_surveytime, @@ -498,6 +464,17 @@ static nni_proto_sock_ops surv0_sock_ops = { .sock_options = surv0_sock_options, }; +static nni_proto_sock_ops surv0_sock_ops_raw = { + .sock_init = surv0_sock_init, + .sock_fini = surv0_sock_fini, + .sock_open = surv0_sock_open, + .sock_close = surv0_sock_close, + .sock_send = surv0_sock_send_raw, + .sock_recv = surv0_sock_recv, + .sock_filter = surv0_sock_filter, + .sock_options = surv0_sock_options, +}; + static nni_proto surv0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, @@ -507,8 +484,23 @@ static nni_proto surv0_proto = { .proto_pipe_ops = &surv0_pipe_ops, }; +static nni_proto surv0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, + .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &surv0_sock_ops_raw, + .proto_pipe_ops = &surv0_pipe_ops, +}; + int nng_surveyor0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &surv0_proto)); } + +int +nng_surveyor0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &surv0_proto_raw)); +} diff --git a/src/protocol/survey0/survey.h b/src/protocol/survey0/survey.h index a7b6d943..37f76fbf 100644 --- a/src/protocol/survey0/survey.h +++ b/src/protocol/survey0/survey.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,11 +16,16 @@ extern "C" { #endif NNG_DECL int nng_surveyor0_open(nng_socket *); +NNG_DECL int nng_surveyor0_open_raw(nng_socket *); #ifndef nng_surveyor_open #define nng_surveyor_open nng_surveyor0_open #endif +#ifndef nng_surveyor_open_raw +#define nng_surveyor_open_raw nng_surveyor0_open_raw +#endif + #define NNG_OPT_SURVEYOR_SURVEYTIME "surveyor:survey-time" #ifdef __cplusplus |
