diff options
Diffstat (limited to 'src/protocol/reqrep0')
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 70 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep.h | 9 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.c | 97 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.h | 8 |
4 files changed, 105 insertions, 79 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index f62406cd..78a1f2ee 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -41,7 +41,6 @@ struct rep0_sock { nni_msgq * uwq; nni_msgq * urq; nni_mtx lk; - bool raw; int ttl; nni_idhash *pipes; char * btrace; @@ -92,7 +91,6 @@ rep0_sock_init(void **sp, nni_sock *sock) } s->ttl = 8; // Per RFC - s->raw = false; s->btrace = NULL; s->btrace_len = 0; s->uwq = nni_sock_sendq(sock); @@ -353,25 +351,6 @@ rep0_pipe_putq_cb(void *arg) } static int -rep0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - rep0_sock *s = arg; - int rv; - - nni_mtx_lock(&s->lk); - rv = nni_copyin_bool(&s->raw, buf, sz, typ); - nni_mtx_unlock(&s->lk); - return (rv); -} - -static int -rep0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - rep0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - -static int rep0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) { rep0_sock *s = arg; @@ -393,10 +372,6 @@ rep0_sock_filter(void *arg, nni_msg *msg) size_t len; nni_mtx_lock(&s->lk); - if (s->raw) { - nni_mtx_unlock(&s->lk); - return (msg); - } len = nni_msg_header_len(msg); header = nni_msg_header(msg); @@ -417,6 +392,13 @@ rep0_sock_filter(void *arg, nni_msg *msg) } static void +rep0_sock_send_raw(void *arg, nni_aio *aio) +{ + rep0_sock *s = arg; + nni_msgq_aio_put(s->uwq, aio); +} + +static void rep0_sock_send(void *arg, nni_aio *aio) { rep0_sock *s = arg; @@ -424,12 +406,6 @@ rep0_sock_send(void *arg, nni_aio *aio) nni_msg * msg; nni_mtx_lock(&s->lk); - if (s->raw) { - // Pass thru - nni_mtx_unlock(&s->lk); - nni_msgq_aio_put(s->uwq, aio); - return; - } if (s->btrace == NULL) { nni_mtx_unlock(&s->lk); nni_aio_finish_error(aio, NNG_ESTATE); @@ -475,12 +451,6 @@ static nni_proto_pipe_ops rep0_pipe_ops = { static nni_proto_sock_option rep0_sock_options[] = { { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = rep0_sock_getopt_raw, - .pso_setopt = rep0_sock_setopt_raw, - }, - { .pso_name = NNG_OPT_MAXTTL, .pso_type = NNI_TYPE_INT32, .pso_getopt = rep0_sock_getopt_maxttl, @@ -503,6 +473,17 @@ static nni_proto_sock_ops rep0_sock_ops = { .sock_recv = rep0_sock_recv, }; +static nni_proto_sock_ops rep0_sock_ops_raw = { + .sock_init = rep0_sock_init, + .sock_fini = rep0_sock_fini, + .sock_open = rep0_sock_open, + .sock_close = rep0_sock_close, + .sock_options = rep0_sock_options, + .sock_filter = NULL, // No filtering for raw mode + .sock_send = rep0_sock_send_raw, + .sock_recv = rep0_sock_recv, +}; + static nni_proto rep0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_REP_V0, "rep" }, @@ -512,8 +493,23 @@ static nni_proto rep0_proto = { .proto_pipe_ops = &rep0_pipe_ops, }; +static nni_proto rep0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_REP_V0, "rep" }, + .proto_peer = { NNI_PROTO_REQ_V0, "req" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &rep0_sock_ops_raw, + .proto_pipe_ops = &rep0_pipe_ops, +}; + int nng_rep0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &rep0_proto)); } + +int +nng_rep0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &rep0_proto_raw)); +} diff --git a/src/protocol/reqrep0/rep.h b/src/protocol/reqrep0/rep.h index 93df9379..612127a2 100644 --- a/src/protocol/reqrep0/rep.h +++ b/src/protocol/reqrep0/rep.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,11 +16,16 @@ extern "C" { #endif NNG_DECL int nng_rep0_open(nng_socket *); +NNG_DECL int nng_rep0_open_raw(nng_socket *); #ifndef nng_rep_open #define nng_rep_open nng_rep0_open #endif +#ifndef nng_rep_open +#define nng_rep_open_raw nng_rep0_open_raw +#endif + #ifdef __cplusplus } #endif diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 0a5b566a..4d35ca1f 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -78,7 +78,7 @@ static void req0_recv_cb(void *); static void req0_putq_cb(void *); static int -req0_sock_init(void **sp, nni_sock *sock) +req0_sock_init_impl(void **sp, nni_sock *sock, bool raw) { req0_sock *s; @@ -96,7 +96,7 @@ req0_sock_init(void **sp, nni_sock *sock) s->nextid = nni_random(); s->retry = NNI_SECOND * 60; s->reqmsg = NULL; - s->raw = false; + s->raw = raw; s->wantw = false; s->resend = NNI_TIME_ZERO; s->ttl = 8; @@ -107,6 +107,18 @@ req0_sock_init(void **sp, nni_sock *sock) return (0); } +static int +req0_sock_init(void **sp, nni_sock *sock) +{ + return (req0_sock_init_impl(sp, sock, false)); +} + +static int +req0_sock_init_raw(void **sp, nni_sock *sock) +{ + return (req0_sock_init_impl(sp, sock, true)); +} + static void req0_sock_open(void *arg) { @@ -250,20 +262,6 @@ req0_pipe_stop(void *arg) } static int -req0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - req0_sock *s = arg; - return (nni_copyin_bool(&s->raw, buf, sz, typ)); -} - -static int -req0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - req0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - -static int req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) { req0_sock *s = arg; @@ -513,11 +511,6 @@ req0_sock_send(void *arg, nni_aio *aio) int rv; nni_mtx_lock(&s->mtx); - if (s->raw) { - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_put(s->uwq, aio); - return; - } msg = nni_aio_get_msg(aio); len = nni_msg_len(msg); @@ -559,6 +552,14 @@ req0_sock_send(void *arg, nni_aio *aio) nni_aio_finish(aio, 0, len); } +static void +req0_sock_send_raw(void *arg, nni_aio *aio) +{ + req0_sock *s = arg; + + nni_msgq_aio_put(s->uwq, aio); +} + static nni_msg * req0_sock_filter(void *arg, nni_msg *msg) { @@ -566,11 +567,6 @@ req0_sock_filter(void *arg, nni_msg *msg) nni_msg * rmsg; nni_mtx_lock(&s->mtx); - if (s->raw) { - // Pass it unmolested - nni_mtx_unlock(&s->mtx); - return (msg); - } if (nni_msg_header_len(msg) < 4) { nni_mtx_unlock(&s->mtx); @@ -608,17 +604,23 @@ req0_sock_recv(void *arg, nni_aio *aio) req0_sock *s = arg; nni_mtx_lock(&s->mtx); - if (!s->raw) { - if (s->reqmsg == NULL) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, NNG_ESTATE); - return; - } + if (s->reqmsg == NULL) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; } nni_mtx_unlock(&s->mtx); nni_msgq_aio_get(s->urq, aio); } +static void +req0_sock_recv_raw(void *arg, nni_aio *aio) +{ + req0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + static nni_proto_pipe_ops req0_pipe_ops = { .pipe_init = req0_pipe_init, .pipe_fini = req0_pipe_fini, @@ -628,12 +630,6 @@ static nni_proto_pipe_ops req0_pipe_ops = { static nni_proto_sock_option req0_sock_options[] = { { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = req0_sock_getopt_raw, - .pso_setopt = req0_sock_setopt_raw, - }, - { .pso_name = NNG_OPT_MAXTTL, .pso_type = NNI_TYPE_INT32, .pso_getopt = req0_sock_getopt_maxttl, @@ -676,3 +672,28 @@ nng_req0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &req0_proto)); } + +static nni_proto_sock_ops req0_sock_ops_raw = { + .sock_init = req0_sock_init_raw, + .sock_fini = req0_sock_fini, + .sock_open = req0_sock_open, + .sock_close = req0_sock_close, + .sock_options = req0_sock_options, + .sock_send = req0_sock_send_raw, + .sock_recv = req0_sock_recv_raw, +}; + +static nni_proto req0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_REQ_V0, "req" }, + .proto_peer = { NNI_PROTO_REP_V0, "rep" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &req0_sock_ops_raw, + .proto_pipe_ops = &req0_pipe_ops, +}; + +int +nng_req0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &req0_proto_raw)); +}
\ No newline at end of file diff --git a/src/protocol/reqrep0/req.h b/src/protocol/reqrep0/req.h index 99c9bf62..392c7932 100644 --- a/src/protocol/reqrep0/req.h +++ b/src/protocol/reqrep0/req.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,10 +16,14 @@ extern "C" { #endif NNG_DECL int nng_req0_open(nng_socket *); +NNG_DECL int nng_req0_open_raw(nng_socket *); #ifndef nng_req_open #define nng_req_open nng_req0_open #endif +#ifndef nng_req_open_raw +#define nng_req_open_raw nng_req0_open_raw +#endif #define NNG_OPT_REQ_RESENDTIME "req:resend-time" |
