diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-10-31 13:06:38 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-11-02 16:10:26 -0700 |
| commit | 7bf591e20a94b8d926f92ab9b320f3b75d342345 (patch) | |
| tree | d67ce7cab328a004346419047feede7d579dad77 /src/protocol | |
| parent | d340af7dc250388f48d36c5078c4857c51bb6121 (diff) | |
| download | nng-7bf591e20a94b8d926f92ab9b320f3b75d342345.tar.gz nng-7bf591e20a94b8d926f92ab9b320f3b75d342345.tar.bz2 nng-7bf591e20a94b8d926f92ab9b320f3b75d342345.zip | |
fixes #143 Protocols and transports should be "configurable"
This makes all the protocols and transports optional. All
of them except ZeroTier are enabled by default, but you can
now disable them (remove from the build) with cmake options.
The test suite is modified so that tests still run as much
as they can, but skip over things caused by missing functionality
from the library (due to configuration).
Further, the constant definitions and prototypes for functions
that are specific to transports or protocols are moved into
appropriate headers, which should be included directly by
applications wishing to use these.
We have also added and improved documentation -- all of the
transports are documented, and several more man pages for
protocols have been added. (Req/Rep and Surveyor are still
missing.)
Diffstat (limited to 'src/protocol')
| -rw-r--r-- | src/protocol/bus0/CMakeLists.txt | 18 | ||||
| -rw-r--r-- | src/protocol/bus0/bus.c (renamed from src/protocol/bus/bus.c) | 205 | ||||
| -rw-r--r-- | src/protocol/bus0/bus.h | 28 | ||||
| -rw-r--r-- | src/protocol/pair0/CMakeLists.txt | 18 | ||||
| -rw-r--r-- | src/protocol/pair0/pair.c (renamed from src/protocol/pair/pair_v0.c) | 4 | ||||
| -rw-r--r-- | src/protocol/pair0/pair.h | 28 | ||||
| -rw-r--r-- | src/protocol/pair1/CMakeLists.txt | 18 | ||||
| -rw-r--r-- | src/protocol/pair1/pair.c (renamed from src/protocol/pair/pair_v1.c) | 6 | ||||
| -rw-r--r-- | src/protocol/pair1/pair.h | 30 | ||||
| -rw-r--r-- | src/protocol/pipeline0/CMakeLists.txt | 23 | ||||
| -rw-r--r-- | src/protocol/pipeline0/pull.c (renamed from src/protocol/pipeline/pull.c) | 146 | ||||
| -rw-r--r-- | src/protocol/pipeline0/pull.h | 28 | ||||
| -rw-r--r-- | src/protocol/pipeline0/push.c (renamed from src/protocol/pipeline/push.c) | 138 | ||||
| -rw-r--r-- | src/protocol/pipeline0/push.h | 28 | ||||
| -rw-r--r-- | src/protocol/pubsub0/CMakeLists.txt | 23 | ||||
| -rw-r--r-- | src/protocol/pubsub0/pub.c (renamed from src/protocol/pubsub/pub.c) | 169 | ||||
| -rw-r--r-- | src/protocol/pubsub0/pub.h | 28 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.c (renamed from src/protocol/pubsub/sub.c) | 184 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.h | 31 | ||||
| -rw-r--r-- | src/protocol/reqrep0/CMakeLists.txt | 23 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep.c (renamed from src/protocol/reqrep/rep.c) | 227 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep.h | 28 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.c (renamed from src/protocol/reqrep/req.c) | 239 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.h | 30 | ||||
| -rw-r--r-- | src/protocol/survey0/CMakeLists.txt | 23 | ||||
| -rw-r--r-- | src/protocol/survey0/respond.c (renamed from src/protocol/survey/respond.c) | 231 | ||||
| -rw-r--r-- | src/protocol/survey0/respond.h | 28 | ||||
| -rw-r--r-- | src/protocol/survey0/survey.c (renamed from src/protocol/survey/survey.c) | 205 | ||||
| -rw-r--r-- | src/protocol/survey0/survey.h | 30 |
29 files changed, 1380 insertions, 837 deletions
diff --git a/src/protocol/bus0/CMakeLists.txt b/src/protocol/bus0/CMakeLists.txt new file mode 100644 index 00000000..5071054a --- /dev/null +++ b/src/protocol/bus0/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Bus protocol + +if (NNG_PROTO_BUS0) + set(BUS0_SOURCES protocol/bus0/bus.c protocol/bus0/bus.h) + install(FILES bus.h DESTINATION include/nng/protocol/bus0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${BUS0_SOURCES} PARENT_SCOPE) diff --git a/src/protocol/bus/bus.c b/src/protocol/bus0/bus.c index 6ec0066b..3e15000b 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus0/bus.c @@ -12,31 +12,36 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/bus0/bus.h" // Bus protocol. The BUS protocol, each peer sends a message to its peers. // However, bus protocols do not "forward" (absent a device). So in order // for each participant to receive the message, each sender must be connected // to every other node in the network (full mesh). -typedef struct bus_pipe bus_pipe; -typedef struct bus_sock bus_sock; +#ifndef NNI_PROTO_BUS_V0 +#define NNI_PROTO_BUS_V0 NNI_PROTO(7, 0) +#endif -static void bus_sock_getq(bus_sock *); -static void bus_sock_send(void *, nni_aio *); -static void bus_sock_recv(void *, nni_aio *); +typedef struct bus0_pipe bus0_pipe; +typedef struct bus0_sock bus0_sock; -static void bus_pipe_getq(bus_pipe *); -static void bus_pipe_send(bus_pipe *); -static void bus_pipe_recv(bus_pipe *); +static void bus0_sock_getq(bus0_sock *); +static void bus0_sock_send(void *, nni_aio *); +static void bus0_sock_recv(void *, nni_aio *); -static void bus_sock_getq_cb(void *); -static void bus_pipe_getq_cb(void *); -static void bus_pipe_send_cb(void *); -static void bus_pipe_recv_cb(void *); -static void bus_pipe_putq_cb(void *); +static void bus0_pipe_getq(bus0_pipe *); +static void bus0_pipe_send(bus0_pipe *); +static void bus0_pipe_recv(bus0_pipe *); -// A bus_sock is our per-socket protocol private structure. -struct bus_sock { +static void bus0_sock_getq_cb(void *); +static void bus0_pipe_getq_cb(void *); +static void bus0_pipe_send_cb(void *); +static void bus0_pipe_recv_cb(void *); +static void bus0_pipe_putq_cb(void *); + +// bus0_sock is our per-socket protocol private structure. +struct bus0_sock { int raw; nni_aio * aio_getq; nni_list pipes; @@ -45,10 +50,10 @@ struct bus_sock { nni_msgq *urq; }; -// A bus_pipe is our per-pipe protocol private structure. -struct bus_pipe { +// bus0_pipe is our per-pipe protocol private structure. +struct bus0_pipe { nni_pipe * npipe; - bus_sock * psock; + bus0_sock * psock; nni_msgq * sendq; nni_list_node node; nni_aio * aio_getq; @@ -59,9 +64,9 @@ struct bus_pipe { }; static void -bus_sock_fini(void *arg) +bus0_sock_fini(void *arg) { - bus_sock *s = arg; + bus0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -70,18 +75,18 @@ bus_sock_fini(void *arg) } static int -bus_sock_init(void **sp, nni_sock *nsock) +bus0_sock_init(void **sp, nni_sock *nsock) { - bus_sock *s; - int rv; + bus0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - NNI_LIST_INIT(&s->pipes, bus_pipe, node); + NNI_LIST_INIT(&s->pipes, bus0_pipe, node); nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, bus_sock_getq_cb, s)) != 0) { - bus_sock_fini(s); + if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) { + bus0_sock_fini(s); return (rv); } s->raw = 0; @@ -93,25 +98,25 @@ bus_sock_init(void **sp, nni_sock *nsock) } static void -bus_sock_open(void *arg) +bus0_sock_open(void *arg) { - bus_sock *s = arg; + bus0_sock *s = arg; - bus_sock_getq(s); + bus0_sock_getq(s); } static void -bus_sock_close(void *arg) +bus0_sock_close(void *arg) { - bus_sock *s = arg; + bus0_sock *s = arg; nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -bus_pipe_fini(void *arg) +bus0_pipe_fini(void *arg) { - bus_pipe *p = arg; + bus0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); @@ -123,10 +128,10 @@ bus_pipe_fini(void *arg) } static int -bus_pipe_init(void **pp, nni_pipe *npipe, void *s) +bus0_pipe_init(void **pp, nni_pipe *npipe, void *s) { - bus_pipe *p; - int rv; + bus0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); @@ -134,11 +139,11 @@ bus_pipe_init(void **pp, nni_pipe *npipe, void *s) NNI_LIST_NODE_INIT(&p->node); nni_mtx_init(&p->mtx); if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, bus_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, bus_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, bus_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, bus_pipe_putq_cb, p)) != 0)) { - bus_pipe_fini(p); + ((rv = nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) { + bus0_pipe_fini(p); return (rv); } @@ -149,26 +154,26 @@ bus_pipe_init(void **pp, nni_pipe *npipe, void *s) } static int -bus_pipe_start(void *arg) +bus0_pipe_start(void *arg) { - bus_pipe *p = arg; - bus_sock *s = p->psock; + bus0_pipe *p = arg; + bus0_sock *s = p->psock; nni_mtx_lock(&s->mtx); nni_list_append(&s->pipes, p); nni_mtx_unlock(&s->mtx); - bus_pipe_recv(p); - bus_pipe_getq(p); + bus0_pipe_recv(p); + bus0_pipe_getq(p); return (0); } static void -bus_pipe_stop(void *arg) +bus0_pipe_stop(void *arg) { - bus_pipe *p = arg; - bus_sock *s = p->psock; + bus0_pipe *p = arg; + bus0_sock *s = p->psock; nni_msgq_close(p->sendq); @@ -185,9 +190,9 @@ bus_pipe_stop(void *arg) } static void -bus_pipe_getq_cb(void *arg) +bus0_pipe_getq_cb(void *arg) { - bus_pipe *p = arg; + bus0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { // closed? @@ -201,9 +206,9 @@ bus_pipe_getq_cb(void *arg) } static void -bus_pipe_send_cb(void *arg) +bus0_pipe_send_cb(void *arg) { - bus_pipe *p = arg; + bus0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { // closed? @@ -213,15 +218,15 @@ bus_pipe_send_cb(void *arg) return; } - bus_pipe_getq(p); + bus0_pipe_getq(p); } static void -bus_pipe_recv_cb(void *arg) +bus0_pipe_recv_cb(void *arg) { - bus_pipe *p = arg; - bus_sock *s = p->psock; - nni_msg * msg; + bus0_pipe *p = arg; + bus0_sock *s = p->psock; + nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->npipe); @@ -243,9 +248,9 @@ bus_pipe_recv_cb(void *arg) } static void -bus_pipe_putq_cb(void *arg) +bus0_pipe_putq_cb(void *arg) { - bus_pipe *p = arg; + bus0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -255,18 +260,18 @@ bus_pipe_putq_cb(void *arg) } // Wait for another recv. - bus_pipe_recv(p); + bus0_pipe_recv(p); } static void -bus_sock_getq_cb(void *arg) +bus0_sock_getq_cb(void *arg) { - bus_sock *s = arg; - bus_pipe *p; - bus_pipe *lastp; - nni_msg * msg; - nni_msg * dup; - uint32_t sender; + bus0_sock *s = arg; + bus0_pipe *p; + bus0_pipe *lastp; + nni_msg * msg; + nni_msg * dup; + uint32_t sender; if (nni_aio_result(s->aio_getq) != 0) { return; @@ -307,95 +312,95 @@ bus_sock_getq_cb(void *arg) nni_msg_free(msg); } - bus_sock_getq(s); + bus0_sock_getq(s); } static void -bus_sock_getq(bus_sock *s) +bus0_sock_getq(bus0_sock *s) { nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -bus_pipe_getq(bus_pipe *p) +bus0_pipe_getq(bus0_pipe *p) { nni_msgq_aio_get(p->sendq, p->aio_getq); } static void -bus_pipe_recv(bus_pipe *p) +bus0_pipe_recv(bus0_pipe *p) { nni_pipe_recv(p->npipe, p->aio_recv); } static int -bus_sock_setopt_raw(void *arg, const void *buf, size_t sz) +bus0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - bus_sock *s = arg; + bus0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -bus_sock_getopt_raw(void *arg, void *buf, size_t *szp) +bus0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - bus_sock *s = arg; + bus0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -bus_sock_send(void *arg, nni_aio *aio) +bus0_sock_send(void *arg, nni_aio *aio) { - bus_sock *s = arg; + bus0_sock *s = arg; nni_msgq_aio_put(s->uwq, aio); } static void -bus_sock_recv(void *arg, nni_aio *aio) +bus0_sock_recv(void *arg, nni_aio *aio) { - bus_sock *s = arg; + bus0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } -static nni_proto_pipe_ops bus_pipe_ops = { - .pipe_init = bus_pipe_init, - .pipe_fini = bus_pipe_fini, - .pipe_start = bus_pipe_start, - .pipe_stop = bus_pipe_stop, +static nni_proto_pipe_ops bus0_pipe_ops = { + .pipe_init = bus0_pipe_init, + .pipe_fini = bus0_pipe_fini, + .pipe_start = bus0_pipe_start, + .pipe_stop = bus0_pipe_stop, }; -static nni_proto_sock_option bus_sock_options[] = { +static nni_proto_sock_option bus0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = bus_sock_getopt_raw, - .pso_setopt = bus_sock_setopt_raw, + .pso_getopt = bus0_sock_getopt_raw, + .pso_setopt = bus0_sock_setopt_raw, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops bus_sock_ops = { - .sock_init = bus_sock_init, - .sock_fini = bus_sock_fini, - .sock_open = bus_sock_open, - .sock_close = bus_sock_close, - .sock_send = bus_sock_send, - .sock_recv = bus_sock_recv, - .sock_options = bus_sock_options, +static nni_proto_sock_ops bus0_sock_ops = { + .sock_init = bus0_sock_init, + .sock_fini = bus0_sock_fini, + .sock_open = bus0_sock_open, + .sock_close = bus0_sock_close, + .sock_send = bus0_sock_send, + .sock_recv = bus0_sock_recv, + .sock_options = bus0_sock_options, }; -static nni_proto bus_proto = { +static nni_proto bus0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_BUS_V0, "bus" }, .proto_peer = { NNI_PROTO_BUS_V0, "bus" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &bus_sock_ops, - .proto_pipe_ops = &bus_pipe_ops, + .proto_sock_ops = &bus0_sock_ops, + .proto_pipe_ops = &bus0_pipe_ops, }; int nng_bus0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &bus_proto)); + return (nni_proto_open(sidp, &bus0_proto)); } diff --git a/src/protocol/bus0/bus.h b/src/protocol/bus0/bus.h new file mode 100644 index 00000000..0ef3d391 --- /dev/null +++ b/src/protocol/bus0/bus.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_BUS0_BUS_H +#define NNG_PROTOCOL_BUS0_BUS_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_bus0_open(nng_socket *); + +#ifndef nng_bus_open +#define nng_bus_open nng_bus0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_BUS0_BUS_H diff --git a/src/protocol/pair0/CMakeLists.txt b/src/protocol/pair0/CMakeLists.txt new file mode 100644 index 00000000..68e7ad34 --- /dev/null +++ b/src/protocol/pair0/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# PAIRv0 protocol + +if (NNG_PROTO_PAIR0) + set(PAIR0_SOURCES protocol/pair0/pair.c protocol/pair0/pair.h) + install(FILES pair.h DESTINATION include/nng/protocol/pair0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PAIR0_SOURCES} PARENT_SCOPE) diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair0/pair.c index 93cd1497..bac405b8 100644 --- a/src/protocol/pair/pair_v0.c +++ b/src/protocol/pair0/pair.c @@ -17,6 +17,10 @@ // While a peer is connected to the server, all other peer connection // attempts are discarded. +#ifndef NNI_PROTO_PAIR_V0 +#define NNI_PROTO_PAIR_V0 NNI_PROTO(1, 0) +#endif + typedef struct pair0_pipe pair0_pipe; typedef struct pair0_sock pair0_sock; diff --git a/src/protocol/pair0/pair.h b/src/protocol/pair0/pair.h new file mode 100644 index 00000000..6828c921 --- /dev/null +++ b/src/protocol/pair0/pair.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PAIR0_PAIR_H +#define NNG_PROTOCOL_PAIR0_PAIR_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pair0_open(nng_socket *); + +#ifndef nng_pair_open +#define nng_pair_open nng_pair0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PAIR0_PAIR_H diff --git a/src/protocol/pair1/CMakeLists.txt b/src/protocol/pair1/CMakeLists.txt new file mode 100644 index 00000000..f35d6959 --- /dev/null +++ b/src/protocol/pair1/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# PAIRv1 protocol + +if (NNG_PROTO_PAIR1) + set(PAIR1_SOURCES protocol/pair1/pair.c protocol/pair1/pair.h) + install(FILES pair.h DESTINATION include/nng/protocol/pair1) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PAIR1_SOURCES} PARENT_SCOPE) diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair1/pair.c index e14d06d5..3f6f63fc 100644 --- a/src/protocol/pair/pair_v1.c +++ b/src/protocol/pair1/pair.c @@ -13,10 +13,16 @@ #include "core/nng_impl.h" +#include "protocol/pair1/pair.h" + // Pair protocol. The PAIRv1 protocol is a simple 1:1 messaging pattern, // usually, but it can support a polyamorous mode where a single server can // communicate with multiple partners. +#ifndef NNI_PROTO_PAIR_V1 +#define NNI_PROTO_PAIR_V1 NNI_PROTO(1, 1) +#endif + typedef struct pair1_pipe pair1_pipe; typedef struct pair1_sock pair1_sock; diff --git a/src/protocol/pair1/pair.h b/src/protocol/pair1/pair.h new file mode 100644 index 00000000..bc519d9f --- /dev/null +++ b/src/protocol/pair1/pair.h @@ -0,0 +1,30 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PAIR1_PAIR_H +#define NNG_PROTOCOL_PAIR1_PAIR_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pair1_open(nng_socket *); + +#ifndef nng_pair_open +#define nng_pair_open nng_pair1_open +#endif + +#define NNG_OPT_PAIR1_POLY "pair1:polyamorous" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PAIR1_PAIR_H diff --git a/src/protocol/pipeline0/CMakeLists.txt b/src/protocol/pipeline0/CMakeLists.txt new file mode 100644 index 00000000..6153c5a7 --- /dev/null +++ b/src/protocol/pipeline0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Pub/Sub protocol + +if (NNG_PROTO_PUSH0) + set(PUSH0_SOURCES protocol/pipeline0/push.c protocol/pipeline0/push.h) + install(FILES push.h DESTINATION include/nng/protocol/pipeline0) +endif() + +if (NNG_PROTO_PULL0) + set(PULL0_SOURCES protocol/pipeline0/pull.c protocol/pipeline0/pull.h) + install(FILES pull.h DESTINATION include/nng/protocol/pipeline0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PUSH0_SOURCES} ${PULL0_SOURCES} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline0/pull.c index 9685f0a1..8c16cb17 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline0/pull.c @@ -15,31 +15,39 @@ // Pull protocol. The PULL protocol is the "read" side of a pipeline. -typedef struct pull_pipe pull_pipe; -typedef struct pull_sock pull_sock; +#ifndef NNI_PROTO_PULL_V0 +#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1) +#endif -static void pull_putq_cb(void *); -static void pull_recv_cb(void *); -static void pull_putq(pull_pipe *, nni_msg *); +#ifndef NNI_PROTO_PUSH_V0 +#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0) +#endif -// A pull_sock is our per-socket protocol private structure. -struct pull_sock { +typedef struct pull0_pipe pull0_pipe; +typedef struct pull0_sock pull0_sock; + +static void pull0_putq_cb(void *); +static void pull0_recv_cb(void *); +static void pull0_putq(pull0_pipe *, nni_msg *); + +// pull0_sock is our per-socket protocol private structure. +struct pull0_sock { nni_msgq *urq; int raw; }; -// A pull_pipe is our per-pipe protocol private structure. -struct pull_pipe { - nni_pipe * pipe; - pull_sock *pull; - nni_aio * putq_aio; - nni_aio * recv_aio; +// pull0_pipe is our per-pipe protocol private structure. +struct pull0_pipe { + nni_pipe * pipe; + pull0_sock *pull; + nni_aio * putq_aio; + nni_aio * recv_aio; }; static int -pull_sock_init(void **sp, nni_sock *sock) +pull0_sock_init(void **sp, nni_sock *sock) { - pull_sock *s; + pull0_sock *s; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); @@ -52,17 +60,17 @@ pull_sock_init(void **sp, nni_sock *sock) } static void -pull_sock_fini(void *arg) +pull0_sock_fini(void *arg) { - pull_sock *s = arg; + pull0_sock *s = arg; NNI_FREE_STRUCT(s); } static void -pull_pipe_fini(void *arg) +pull0_pipe_fini(void *arg) { - pull_pipe *p = arg; + pull0_pipe *p = arg; nni_aio_fini(p->putq_aio); nni_aio_fini(p->recv_aio); @@ -70,17 +78,17 @@ pull_pipe_fini(void *arg) } static int -pull_pipe_init(void **pp, nni_pipe *pipe, void *s) +pull0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - pull_pipe *p; - int rv; + pull0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&p->putq_aio, pull_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->recv_aio, pull_recv_cb, p)) != 0)) { - pull_pipe_fini(p); + if (((rv = nni_aio_init(&p->putq_aio, pull0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->recv_aio, pull0_recv_cb, p)) != 0)) { + pull0_pipe_fini(p); return (rv); } @@ -91,9 +99,9 @@ pull_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -pull_pipe_start(void *arg) +pull0_pipe_start(void *arg) { - pull_pipe *p = arg; + pull0_pipe *p = arg; // Start the pending pull... nni_pipe_recv(p->pipe, p->recv_aio); @@ -102,20 +110,20 @@ pull_pipe_start(void *arg) } static void -pull_pipe_stop(void *arg) +pull0_pipe_stop(void *arg) { - pull_pipe *p = arg; + pull0_pipe *p = arg; nni_aio_stop(p->putq_aio); nni_aio_stop(p->recv_aio); } static void -pull_recv_cb(void *arg) +pull0_recv_cb(void *arg) { - pull_pipe *p = arg; - nni_aio * aio = p->recv_aio; - nni_msg * msg; + pull0_pipe *p = arg; + nni_aio * aio = p->recv_aio; + nni_msg * msg; if (nni_aio_result(aio) != 0) { // Failed to get a message, probably the pipe is closed. @@ -127,14 +135,14 @@ pull_recv_cb(void *arg) msg = nni_aio_get_msg(aio); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); nni_aio_set_msg(aio, NULL); - pull_putq(p, msg); + pull0_putq(p, msg); } static void -pull_putq_cb(void *arg) +pull0_putq_cb(void *arg) { - pull_pipe *p = arg; - nni_aio * aio = p->putq_aio; + pull0_pipe *p = arg; + nni_aio * aio = p->putq_aio; if (nni_aio_result(aio) != 0) { // If we failed to put, probably NNG_ECLOSED, nothing else @@ -148,11 +156,11 @@ pull_putq_cb(void *arg) nni_pipe_recv(p->pipe, p->recv_aio); } -// nni_pull_putq schedules a put operation to the user socket (sendup). +// pull0_putq schedules a put operation to the user socket (sendup). static void -pull_putq(pull_pipe *p, nni_msg *msg) +pull0_putq(pull0_pipe *p, nni_msg *msg) { - pull_sock *s = p->pull; + pull0_sock *s = p->pull; nni_aio_set_msg(p->putq_aio, msg); @@ -160,83 +168,83 @@ pull_putq(pull_pipe *p, nni_msg *msg) } static void -pull_sock_open(void *arg) +pull0_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void -pull_sock_close(void *arg) +pull0_sock_close(void *arg) { NNI_ARG_UNUSED(arg); } static int -pull_sock_setopt_raw(void *arg, const void *buf, size_t sz) +pull0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - pull_sock *s = arg; + pull0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -pull_sock_getopt_raw(void *arg, void *buf, size_t *szp) +pull0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - pull_sock *s = arg; + pull0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -pull_sock_send(void *arg, nni_aio *aio) +pull0_sock_send(void *arg, nni_aio *aio) { nni_aio_finish_error(aio, NNG_ENOTSUP); } static void -pull_sock_recv(void *arg, nni_aio *aio) +pull0_sock_recv(void *arg, nni_aio *aio) { - pull_sock *s = arg; + pull0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } -static nni_proto_pipe_ops pull_pipe_ops = { - .pipe_init = pull_pipe_init, - .pipe_fini = pull_pipe_fini, - .pipe_start = pull_pipe_start, - .pipe_stop = pull_pipe_stop, +static nni_proto_pipe_ops pull0_pipe_ops = { + .pipe_init = pull0_pipe_init, + .pipe_fini = pull0_pipe_fini, + .pipe_start = pull0_pipe_start, + .pipe_stop = pull0_pipe_stop, }; -static nni_proto_sock_option pull_sock_options[] = { +static nni_proto_sock_option pull0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = pull_sock_getopt_raw, - .pso_setopt = pull_sock_setopt_raw, + .pso_getopt = pull0_sock_getopt_raw, + .pso_setopt = pull0_sock_setopt_raw, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops pull_sock_ops = { - .sock_init = pull_sock_init, - .sock_fini = pull_sock_fini, - .sock_open = pull_sock_open, - .sock_close = pull_sock_close, - .sock_send = pull_sock_send, - .sock_recv = pull_sock_recv, - .sock_options = pull_sock_options, +static nni_proto_sock_ops pull0_sock_ops = { + .sock_init = pull0_sock_init, + .sock_fini = pull0_sock_fini, + .sock_open = pull0_sock_open, + .sock_close = pull0_sock_close, + .sock_send = pull0_sock_send, + .sock_recv = pull0_sock_recv, + .sock_options = pull0_sock_options, }; -static nni_proto pull_proto = { +static nni_proto pull0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_PULL_V0, "pull" }, .proto_peer = { NNI_PROTO_PUSH_V0, "push" }, .proto_flags = NNI_PROTO_FLAG_RCV, - .proto_pipe_ops = &pull_pipe_ops, - .proto_sock_ops = &pull_sock_ops, + .proto_pipe_ops = &pull0_pipe_ops, + .proto_sock_ops = &pull0_sock_ops, }; int nng_pull0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &pull_proto)); + return (nni_proto_open(sidp, &pull0_proto)); } diff --git a/src/protocol/pipeline0/pull.h b/src/protocol/pipeline0/pull.h new file mode 100644 index 00000000..75bded03 --- /dev/null +++ b/src/protocol/pipeline0/pull.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PIPELINE0_PULL_H +#define NNG_PROTOCOL_PIPELINE0_PULL_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pull0_open(nng_socket *); + +#ifndef nng_pull_open +#define nng_pull_open nng_pull0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PIPELINE0_PULL_H diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline0/push.c index 9ff74558..3dd83fe0 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline0/push.c @@ -17,23 +17,31 @@ // Push distributes fairly, or tries to, by giving messages in round-robin // order. -typedef struct push_pipe push_pipe; -typedef struct push_sock push_sock; +#ifndef NNI_PROTO_PULL_V0 +#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1) +#endif -static void push_send_cb(void *); -static void push_recv_cb(void *); -static void push_getq_cb(void *); +#ifndef NNI_PROTO_PUSH_V0 +#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0) +#endif -// An nni_push_sock is our per-socket protocol private structure. -struct push_sock { +typedef struct push0_pipe push0_pipe; +typedef struct push0_sock push0_sock; + +static void push0_send_cb(void *); +static void push0_recv_cb(void *); +static void push0_getq_cb(void *); + +// push0_sock is our per-socket protocol private structure. +struct push0_sock { nni_msgq *uwq; int raw; }; -// An nni_push_pipe is our per-pipe protocol private structure. -struct push_pipe { +// push0_pipe is our per-pipe protocol private structure. +struct push0_pipe { nni_pipe * pipe; - push_sock * push; + push0_sock * push; nni_list_node node; nni_aio *aio_recv; @@ -42,9 +50,9 @@ struct push_pipe { }; static int -push_sock_init(void **sp, nni_sock *sock) +push0_sock_init(void **sp, nni_sock *sock) { - push_sock *s; + push0_sock *s; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); @@ -56,29 +64,29 @@ push_sock_init(void **sp, nni_sock *sock) } static void -push_sock_fini(void *arg) +push0_sock_fini(void *arg) { - push_sock *s = arg; + push0_sock *s = arg; NNI_FREE_STRUCT(s); } static void -push_sock_open(void *arg) +push0_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void -push_sock_close(void *arg) +push0_sock_close(void *arg) { NNI_ARG_UNUSED(arg); } static void -push_pipe_fini(void *arg) +push0_pipe_fini(void *arg) { - push_pipe *p = arg; + push0_pipe *p = arg; nni_aio_fini(p->aio_recv); nni_aio_fini(p->aio_send); @@ -87,18 +95,18 @@ push_pipe_fini(void *arg) } static int -push_pipe_init(void **pp, nni_pipe *pipe, void *s) +push0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - push_pipe *p; - int rv; + push0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&p->aio_recv, push_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, push_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, push_getq_cb, p)) != 0)) { - push_pipe_fini(p); + if (((rv = nni_aio_init(&p->aio_recv, push0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, push0_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, push0_getq_cb, p)) != 0)) { + push0_pipe_fini(p); return (rv); } NNI_LIST_NODE_INIT(&p->node); @@ -109,10 +117,10 @@ push_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -push_pipe_start(void *arg) +push0_pipe_start(void *arg) { - push_pipe *p = arg; - push_sock *s = p->push; + push0_pipe *p = arg; + push0_sock *s = p->push; if (nni_pipe_peer(p->pipe) != NNI_PROTO_PULL_V0) { return (NNG_EPROTO); @@ -129,9 +137,9 @@ push_pipe_start(void *arg) } static void -push_pipe_stop(void *arg) +push0_pipe_stop(void *arg) { - push_pipe *p = arg; + push0_pipe *p = arg; nni_aio_stop(p->aio_recv); nni_aio_stop(p->aio_send); @@ -139,9 +147,9 @@ push_pipe_stop(void *arg) } static void -push_recv_cb(void *arg) +push0_recv_cb(void *arg) { - push_pipe *p = arg; + push0_pipe *p = arg; // We normally expect to receive an error. If a pipe actually // sends us data, we just discard it. @@ -155,10 +163,10 @@ push_recv_cb(void *arg) } static void -push_send_cb(void *arg) +push0_send_cb(void *arg) { - push_pipe *p = arg; - push_sock *s = p->push; + push0_pipe *p = arg; + push0_sock *s = p->push; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -171,10 +179,10 @@ push_send_cb(void *arg) } static void -push_getq_cb(void *arg) +push0_getq_cb(void *arg) { - push_pipe *p = arg; - nni_aio * aio = p->aio_getq; + push0_pipe *p = arg; + nni_aio * aio = p->aio_getq; if (nni_aio_result(aio) != 0) { // If the socket is closing, nothing else we can do. @@ -189,71 +197,71 @@ push_getq_cb(void *arg) } static int -push_sock_setopt_raw(void *arg, const void *buf, size_t sz) +push0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - push_sock *s = arg; + push0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -push_sock_getopt_raw(void *arg, void *buf, size_t *szp) +push0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - push_sock *s = arg; + push0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -push_sock_send(void *arg, nni_aio *aio) +push0_sock_send(void *arg, nni_aio *aio) { - push_sock *s = arg; + push0_sock *s = arg; nni_msgq_aio_put(s->uwq, aio); } static void -push_sock_recv(void *arg, nni_aio *aio) +push0_sock_recv(void *arg, nni_aio *aio) { nni_aio_finish_error(aio, NNG_ENOTSUP); } -static nni_proto_pipe_ops push_pipe_ops = { - .pipe_init = push_pipe_init, - .pipe_fini = push_pipe_fini, - .pipe_start = push_pipe_start, - .pipe_stop = push_pipe_stop, +static nni_proto_pipe_ops push0_pipe_ops = { + .pipe_init = push0_pipe_init, + .pipe_fini = push0_pipe_fini, + .pipe_start = push0_pipe_start, + .pipe_stop = push0_pipe_stop, }; -static nni_proto_sock_option push_sock_options[] = { +static nni_proto_sock_option push0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = push_sock_getopt_raw, - .pso_setopt = push_sock_setopt_raw, + .pso_getopt = push0_sock_getopt_raw, + .pso_setopt = push0_sock_setopt_raw, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops push_sock_ops = { - .sock_init = push_sock_init, - .sock_fini = push_sock_fini, - .sock_open = push_sock_open, - .sock_close = push_sock_close, - .sock_options = push_sock_options, - .sock_send = push_sock_send, - .sock_recv = push_sock_recv, +static nni_proto_sock_ops push0_sock_ops = { + .sock_init = push0_sock_init, + .sock_fini = push0_sock_fini, + .sock_open = push0_sock_open, + .sock_close = push0_sock_close, + .sock_options = push0_sock_options, + .sock_send = push0_sock_send, + .sock_recv = push0_sock_recv, }; -static nni_proto push_proto = { +static nni_proto push0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_PUSH_V0, "push" }, .proto_peer = { NNI_PROTO_PULL_V0, "pull" }, .proto_flags = NNI_PROTO_FLAG_SND, - .proto_pipe_ops = &push_pipe_ops, - .proto_sock_ops = &push_sock_ops, + .proto_pipe_ops = &push0_pipe_ops, + .proto_sock_ops = &push0_sock_ops, }; int nng_push0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &push_proto)); + return (nni_proto_open(sidp, &push0_proto)); } diff --git a/src/protocol/pipeline0/push.h b/src/protocol/pipeline0/push.h new file mode 100644 index 00000000..c7303b92 --- /dev/null +++ b/src/protocol/pipeline0/push.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PIPELINE0_PUSH_H +#define NNG_PROTOCOL_PIPELINE0_PUSH_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_push0_open(nng_socket *); + +#ifndef nng_push_open +#define nng_push_open nng_push0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PIPELINE0_PUSH_H diff --git a/src/protocol/pubsub0/CMakeLists.txt b/src/protocol/pubsub0/CMakeLists.txt new file mode 100644 index 00000000..4edcbfae --- /dev/null +++ b/src/protocol/pubsub0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Pub/Sub protocol + +if (NNG_PROTO_PUB0) + set(PUB0_SOURCES protocol/pubsub0/pub.c protocol/pubsub0/pub.h) + install(FILES pub.h DESTINATION include/nng/protocol/pubsub0) +endif() + +if (NNG_PROTO_SUB0) + set(SUB0_SOURCES protocol/pubsub0/sub.c protocol/pubsub0/sub.h) + install(FILES sub.h DESTINATION include/nng/protocol/pubsub0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PUB0_SOURCES} ${SUB0_SOURCES} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub0/pub.c index 9e5cd67f..f4a33b77 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -12,24 +12,33 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/pubsub0/pub.h" // Publish protocol. The PUB protocol simply sends messages out, as // a broadcast. It has nothing more sophisticated because it does not // perform sender-side filtering. Its best effort delivery, so anything // that can't receive the message won't get one. -typedef struct pub_pipe pub_pipe; -typedef struct pub_sock pub_sock; +#ifndef NNI_PROTO_SUB_V0 +#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1) +#endif -static void pub_pipe_recv_cb(void *); -static void pub_pipe_send_cb(void *); -static void pub_pipe_getq_cb(void *); -static void pub_sock_getq_cb(void *); -static void pub_sock_fini(void *); -static void pub_pipe_fini(void *); +#ifndef NNI_PROTO_PUB_V0 +#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0) +#endif -// A pub_sock is our per-socket protocol private structure. -struct pub_sock { +typedef struct pub0_pipe pub0_pipe; +typedef struct pub0_sock pub0_sock; + +static void pub0_pipe_recv_cb(void *); +static void pub0_pipe_send_cb(void *); +static void pub0_pipe_getq_cb(void *); +static void pub0_sock_getq_cb(void *); +static void pub0_sock_fini(void *); +static void pub0_pipe_fini(void *); + +// pub0_sock is our per-socket protocol private structure. +struct pub0_sock { nni_msgq *uwq; int raw; nni_aio * aio_getq; @@ -37,10 +46,10 @@ struct pub_sock { nni_mtx mtx; }; -// A pub_pipe is our per-pipe protocol private structure. -struct pub_pipe { +// pub0_pipe is our per-pipe protocol private structure. +struct pub0_pipe { nni_pipe * pipe; - pub_sock * pub; + pub0_sock * pub; nni_msgq * sendq; nni_aio * aio_getq; nni_aio * aio_send; @@ -49,9 +58,9 @@ struct pub_pipe { }; static void -pub_sock_fini(void *arg) +pub0_sock_fini(void *arg) { - pub_sock *s = arg; + pub0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -60,22 +69,22 @@ pub_sock_fini(void *arg) } static int -pub_sock_init(void **sp, nni_sock *sock) +pub0_sock_init(void **sp, nni_sock *sock) { - pub_sock *s; - int rv; + pub0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, pub_sock_getq_cb, s)) != 0) { - pub_sock_fini(s); + if ((rv = nni_aio_init(&s->aio_getq, pub0_sock_getq_cb, s)) != 0) { + pub0_sock_fini(s); return (rv); } s->raw = 0; - NNI_LIST_INIT(&s->pipes, pub_pipe, node); + NNI_LIST_INIT(&s->pipes, pub0_pipe, node); s->uwq = nni_sock_sendq(sock); @@ -84,25 +93,25 @@ pub_sock_init(void **sp, nni_sock *sock) } static void -pub_sock_open(void *arg) +pub0_sock_open(void *arg) { - pub_sock *s = arg; + pub0_sock *s = arg; nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -pub_sock_close(void *arg) +pub0_sock_close(void *arg) { - pub_sock *s = arg; + pub0_sock *s = arg; nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -pub_pipe_fini(void *arg) +pub0_pipe_fini(void *arg) { - pub_pipe *p = arg; + pub0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); nni_aio_fini(p->aio_recv); @@ -111,10 +120,10 @@ pub_pipe_fini(void *arg) } static int -pub_pipe_init(void **pp, nni_pipe *pipe, void *s) +pub0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - pub_pipe *p; - int rv; + pub0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); @@ -122,11 +131,11 @@ pub_pipe_init(void **pp, nni_pipe *pipe, void *s) // XXX: consider making this depth tunable if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, pub_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, pub_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pub_pipe_recv_cb, p)) != 0)) { + ((rv = nni_aio_init(&p->aio_getq, pub0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) { - pub_pipe_fini(p); + pub0_pipe_fini(p); return (rv); } @@ -137,10 +146,10 @@ pub_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -pub_pipe_start(void *arg) +pub0_pipe_start(void *arg) { - pub_pipe *p = arg; - pub_sock *s = p->pub; + pub0_pipe *p = arg; + pub0_sock *s = p->pub; if (nni_pipe_peer(p->pipe) != NNI_PROTO_SUB_V0) { return (NNG_EPROTO); @@ -157,10 +166,10 @@ pub_pipe_start(void *arg) } static void -pub_pipe_stop(void *arg) +pub0_pipe_stop(void *arg) { - pub_pipe *p = arg; - pub_sock *s = p->pub; + pub0_pipe *p = arg; + pub0_sock *s = p->pub; nni_aio_stop(p->aio_getq); nni_aio_stop(p->aio_send); @@ -176,15 +185,15 @@ pub_pipe_stop(void *arg) } static void -pub_sock_getq_cb(void *arg) +pub0_sock_getq_cb(void *arg) { - pub_sock *s = arg; - nni_msgq *uwq = s->uwq; - nni_msg * msg, *dup; + pub0_sock *s = arg; + nni_msgq * uwq = s->uwq; + nni_msg * msg, *dup; - pub_pipe *p; - pub_pipe *last; - int rv; + pub0_pipe *p; + pub0_pipe *last; + int rv; if (nni_aio_result(s->aio_getq) != 0) { return; @@ -218,9 +227,9 @@ pub_sock_getq_cb(void *arg) } static void -pub_pipe_recv_cb(void *arg) +pub0_pipe_recv_cb(void *arg) { - pub_pipe *p = arg; + pub0_pipe *p = arg; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); @@ -233,9 +242,9 @@ pub_pipe_recv_cb(void *arg) } static void -pub_pipe_getq_cb(void *arg) +pub0_pipe_getq_cb(void *arg) { - pub_pipe *p = arg; + pub0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->pipe); @@ -249,9 +258,9 @@ pub_pipe_getq_cb(void *arg) } static void -pub_pipe_send_cb(void *arg) +pub0_pipe_send_cb(void *arg) { - pub_pipe *p = arg; + pub0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -265,71 +274,71 @@ pub_pipe_send_cb(void *arg) } static int -pub_sock_setopt_raw(void *arg, const void *buf, size_t sz) +pub0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - pub_sock *s = arg; + pub0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -pub_sock_getopt_raw(void *arg, void *buf, size_t *szp) +pub0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - pub_sock *s = arg; + pub0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -pub_sock_recv(void *arg, nni_aio *aio) +pub0_sock_recv(void *arg, nni_aio *aio) { nni_aio_finish_error(aio, NNG_ENOTSUP); } static void -pub_sock_send(void *arg, nni_aio *aio) +pub0_sock_send(void *arg, nni_aio *aio) { - pub_sock *s = arg; + pub0_sock *s = arg; nni_msgq_aio_put(s->uwq, aio); } -static nni_proto_pipe_ops pub_pipe_ops = { - .pipe_init = pub_pipe_init, - .pipe_fini = pub_pipe_fini, - .pipe_start = pub_pipe_start, - .pipe_stop = pub_pipe_stop, +static nni_proto_pipe_ops pub0_pipe_ops = { + .pipe_init = pub0_pipe_init, + .pipe_fini = pub0_pipe_fini, + .pipe_start = pub0_pipe_start, + .pipe_stop = pub0_pipe_stop, }; -static nni_proto_sock_option pub_sock_options[] = { +static nni_proto_sock_option pub0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = pub_sock_getopt_raw, - .pso_setopt = pub_sock_setopt_raw, + .pso_getopt = pub0_sock_getopt_raw, + .pso_setopt = pub0_sock_setopt_raw, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops pub_sock_ops = { - .sock_init = pub_sock_init, - .sock_fini = pub_sock_fini, - .sock_open = pub_sock_open, - .sock_close = pub_sock_close, - .sock_send = pub_sock_send, - .sock_recv = pub_sock_recv, - .sock_options = pub_sock_options, +static nni_proto_sock_ops pub0_sock_ops = { + .sock_init = pub0_sock_init, + .sock_fini = pub0_sock_fini, + .sock_open = pub0_sock_open, + .sock_close = pub0_sock_close, + .sock_send = pub0_sock_send, + .sock_recv = pub0_sock_recv, + .sock_options = pub0_sock_options, }; -static nni_proto pub_proto = { +static nni_proto pub0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_PUB_V0, "pub" }, .proto_peer = { NNI_PROTO_SUB_V0, "sub" }, .proto_flags = NNI_PROTO_FLAG_SND, - .proto_sock_ops = &pub_sock_ops, - .proto_pipe_ops = &pub_pipe_ops, + .proto_sock_ops = &pub0_sock_ops, + .proto_pipe_ops = &pub0_pipe_ops, }; int nng_pub0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &pub_proto)); + return (nni_proto_open(sidp, &pub0_proto)); } diff --git a/src/protocol/pubsub0/pub.h b/src/protocol/pubsub0/pub.h new file mode 100644 index 00000000..2388a292 --- /dev/null +++ b/src/protocol/pubsub0/pub.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PUBSUB0_PUB_H +#define NNG_PROTOCOL_PUBSUB0_PUB_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pub0_open(nng_socket *); + +#ifndef nng_pub_open +#define nng_pub_open nng_pub0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PUBSUB0_PUB_H diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub0/sub.c index 555d528e..6c504d75 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -12,54 +12,60 @@ #include <string.h> #include "core/nng_impl.h" - -const char *nng_opt_sub_subscribe = NNG_OPT_SUB_SUBSCRIBE; -const char *nng_opt_sub_unsubscribe = NNG_OPT_SUB_UNSUBSCRIBE; +#include "protocol/pubsub0/sub.h" // Subscriber protocol. The SUB protocol receives messages sent to // it from publishers, and filters out those it is not interested in, // only passing up ones that match known subscriptions. -typedef struct sub_pipe sub_pipe; -typedef struct sub_sock sub_sock; -typedef struct sub_topic sub_topic; +#ifndef NNI_PROTO_SUB_V0 +#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1) +#endif + +#ifndef NNI_PROTO_PUB_V0 +#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0) +#endif + +typedef struct sub0_pipe sub0_pipe; +typedef struct sub0_sock sub0_sock; +typedef struct sub0_topic sub0_topic; -static void sub_recv_cb(void *); -static void sub_putq_cb(void *); -static void sub_pipe_fini(void *); +static void sub0_recv_cb(void *); +static void sub0_putq_cb(void *); +static void sub0_pipe_fini(void *); -struct sub_topic { +struct sub0_topic { nni_list_node node; size_t len; void * buf; }; -// An nni_rep_sock is our per-socket protocol private structure. -struct sub_sock { +// sub0_sock is our per-socket protocol private structure. +struct sub0_sock { nni_list topics; nni_msgq *urq; int raw; nni_mtx lk; }; -// An nni_rep_pipe is our per-pipe protocol private structure. -struct sub_pipe { - nni_pipe *pipe; - sub_sock *sub; - nni_aio * aio_recv; - nni_aio * aio_putq; +// sub0_pipe is our per-pipe protocol private structure. +struct sub0_pipe { + nni_pipe * pipe; + sub0_sock *sub; + nni_aio * aio_recv; + nni_aio * aio_putq; }; static int -sub_sock_init(void **sp, nni_sock *sock) +sub0_sock_init(void **sp, nni_sock *sock) { - sub_sock *s; + sub0_sock *s; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->lk); - NNI_LIST_INIT(&s->topics, sub_topic, node); + NNI_LIST_INIT(&s->topics, sub0_topic, node); s->raw = 0; s->urq = nni_sock_recvq(sock); @@ -68,10 +74,10 @@ sub_sock_init(void **sp, nni_sock *sock) } static void -sub_sock_fini(void *arg) +sub0_sock_fini(void *arg) { - sub_sock * s = arg; - sub_topic *topic; + sub0_sock * s = arg; + sub0_topic *topic; while ((topic = nni_list_first(&s->topics)) != NULL) { nni_list_remove(&s->topics, topic); @@ -83,21 +89,21 @@ sub_sock_fini(void *arg) } static void -sub_sock_open(void *arg) +sub0_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void -sub_sock_close(void *arg) +sub0_sock_close(void *arg) { NNI_ARG_UNUSED(arg); } static void -sub_pipe_fini(void *arg) +sub0_pipe_fini(void *arg) { - sub_pipe *p = arg; + sub0_pipe *p = arg; nni_aio_fini(p->aio_putq); nni_aio_fini(p->aio_recv); @@ -105,17 +111,17 @@ sub_pipe_fini(void *arg) } static int -sub_pipe_init(void **pp, nni_pipe *pipe, void *s) +sub0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - sub_pipe *p; - int rv; + sub0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&p->aio_putq, sub_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, sub_recv_cb, p)) != 0)) { - sub_pipe_fini(p); + if (((rv = nni_aio_init(&p->aio_putq, sub0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0)) { + sub0_pipe_fini(p); return (rv); } @@ -126,30 +132,30 @@ sub_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -sub_pipe_start(void *arg) +sub0_pipe_start(void *arg) { - sub_pipe *p = arg; + sub0_pipe *p = arg; nni_pipe_recv(p->pipe, p->aio_recv); return (0); } static void -sub_pipe_stop(void *arg) +sub0_pipe_stop(void *arg) { - sub_pipe *p = arg; + sub0_pipe *p = arg; nni_aio_stop(p->aio_putq); nni_aio_stop(p->aio_recv); } static void -sub_recv_cb(void *arg) +sub0_recv_cb(void *arg) { - sub_pipe *p = arg; - sub_sock *s = p->sub; - nni_msgq *urq = s->urq; - nni_msg * msg; + sub0_pipe *p = arg; + sub0_sock *s = p->sub; + nni_msgq * urq = s->urq; + nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); @@ -164,9 +170,9 @@ sub_recv_cb(void *arg) } static void -sub_putq_cb(void *arg) +sub0_putq_cb(void *arg) { - sub_pipe *p = arg; + sub0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -184,11 +190,11 @@ sub_putq_cb(void *arg) // to replace this with a patricia trie, like old nanomsg had. static int -sub_subscribe(void *arg, const void *buf, size_t sz) +sub0_subscribe(void *arg, const void *buf, size_t sz) { - sub_sock * s = arg; - sub_topic *topic; - sub_topic *newtopic; + sub0_sock * s = arg; + sub0_topic *topic; + sub0_topic *newtopic; nni_mtx_lock(&s->lk); NNI_LIST_FOREACH (&s->topics, topic) { @@ -234,11 +240,11 @@ sub_subscribe(void *arg, const void *buf, size_t sz) } static int -sub_unsubscribe(void *arg, const void *buf, size_t sz) +sub0_unsubscribe(void *arg, const void *buf, size_t sz) { - sub_sock * s = arg; - sub_topic *topic; - int rv; + sub0_sock * s = arg; + sub0_topic *topic; + int rv; nni_mtx_lock(&s->lk); NNI_LIST_FOREACH (&s->topics, topic) { @@ -270,41 +276,41 @@ sub_unsubscribe(void *arg, const void *buf, size_t sz) } static int -sub_sock_setopt_raw(void *arg, const void *buf, size_t sz) +sub0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - sub_sock *s = arg; + sub0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -sub_sock_getopt_raw(void *arg, void *buf, size_t *szp) +sub0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - sub_sock *s = arg; + sub0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -sub_sock_send(void *arg, nni_aio *aio) +sub0_sock_send(void *arg, nni_aio *aio) { nni_aio_finish_error(aio, NNG_ENOTSUP); } static void -sub_sock_recv(void *arg, nni_aio *aio) +sub0_sock_recv(void *arg, nni_aio *aio) { - sub_sock *s = arg; + sub0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } static nni_msg * -sub_sock_filter(void *arg, nni_msg *msg) +sub0_sock_filter(void *arg, nni_msg *msg) { - sub_sock * s = arg; - sub_topic *topic; - char * body; - size_t len; - int match; + sub0_sock * s = arg; + sub0_topic *topic; + char * body; + size_t len; + int match; nni_mtx_lock(&s->lk); if (s->raw) { @@ -344,55 +350,55 @@ sub_sock_filter(void *arg, nni_msg *msg) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe_ops sub_pipe_ops = { - .pipe_init = sub_pipe_init, - .pipe_fini = sub_pipe_fini, - .pipe_start = sub_pipe_start, - .pipe_stop = sub_pipe_stop, +static nni_proto_pipe_ops sub0_pipe_ops = { + .pipe_init = sub0_pipe_init, + .pipe_fini = sub0_pipe_fini, + .pipe_start = sub0_pipe_start, + .pipe_stop = sub0_pipe_stop, }; -static nni_proto_sock_option sub_sock_options[] = { +static nni_proto_sock_option sub0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = sub_sock_getopt_raw, - .pso_setopt = sub_sock_setopt_raw, + .pso_getopt = sub0_sock_getopt_raw, + .pso_setopt = sub0_sock_setopt_raw, }, { .pso_name = NNG_OPT_SUB_SUBSCRIBE, .pso_getopt = NULL, - .pso_setopt = sub_subscribe, + .pso_setopt = sub0_subscribe, }, { .pso_name = NNG_OPT_SUB_UNSUBSCRIBE, .pso_getopt = NULL, - .pso_setopt = sub_unsubscribe, + .pso_setopt = sub0_unsubscribe, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops sub_sock_ops = { - .sock_init = sub_sock_init, - .sock_fini = sub_sock_fini, - .sock_open = sub_sock_open, - .sock_close = sub_sock_close, - .sock_send = sub_sock_send, - .sock_recv = sub_sock_recv, - .sock_filter = sub_sock_filter, - .sock_options = sub_sock_options, +static nni_proto_sock_ops sub0_sock_ops = { + .sock_init = sub0_sock_init, + .sock_fini = sub0_sock_fini, + .sock_open = sub0_sock_open, + .sock_close = sub0_sock_close, + .sock_send = sub0_sock_send, + .sock_recv = sub0_sock_recv, + .sock_filter = sub0_sock_filter, + .sock_options = sub0_sock_options, }; -static nni_proto sub_proto = { +static nni_proto sub0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_SUB_V0, "sub" }, .proto_peer = { NNI_PROTO_PUB_V0, "pub" }, .proto_flags = NNI_PROTO_FLAG_RCV, - .proto_sock_ops = &sub_sock_ops, - .proto_pipe_ops = &sub_pipe_ops, + .proto_sock_ops = &sub0_sock_ops, + .proto_pipe_ops = &sub0_pipe_ops, }; int nng_sub0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &sub_proto)); + return (nni_proto_open(sidp, &sub0_proto)); } diff --git a/src/protocol/pubsub0/sub.h b/src/protocol/pubsub0/sub.h new file mode 100644 index 00000000..1a09145d --- /dev/null +++ b/src/protocol/pubsub0/sub.h @@ -0,0 +1,31 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PUBSUB0_SUB_H +#define NNG_PROTOCOL_PUBSUB0_SUB_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_sub0_open(nng_socket *); + +#ifndef nng_sub_open +#define nng_sub_open nng_sub0_open +#endif + +#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe" +#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PUBSUB0_SUB_H diff --git a/src/protocol/reqrep0/CMakeLists.txt b/src/protocol/reqrep0/CMakeLists.txt new file mode 100644 index 00000000..4e82ad41 --- /dev/null +++ b/src/protocol/reqrep0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Req/Rep protocol + +if (NNG_PROTO_REQ0) + set(REQ0_SOURCES protocol/reqrep0/req.c protocol/reqrep0/req.h) + install(FILES req.h DESTINATION include/nng/protocol/reqrep0) +endif() + +if (NNG_PROTO_REP0) + set(REP0_SOURCES protocol/reqrep0/rep.c protocol/reqrep0/rep.h) + install(FILES rep.h DESTINATION include/nng/protocol/reqrep0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${REQ0_SOURCES} ${REP0_SOURCES} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep0/rep.c index 100e739d..ee8e4277 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -12,23 +12,32 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/reqrep0/rep.h" // Response protocol. The REP protocol is the "reply" side of a // request-reply pair. This is useful for building RPC servers, for // example. -typedef struct rep_pipe rep_pipe; -typedef struct rep_sock rep_sock; +#ifndef NNI_PROTO_REQ_V0 +#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) +#endif -static void rep_sock_getq_cb(void *); -static void rep_pipe_getq_cb(void *); -static void rep_pipe_putq_cb(void *); -static void rep_pipe_send_cb(void *); -static void rep_pipe_recv_cb(void *); -static void rep_pipe_fini(void *); +#ifndef NNI_PROTO_REP_V0 +#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) +#endif -// A rep_sock is our per-socket protocol private structure. -struct rep_sock { +typedef struct rep0_pipe rep0_pipe; +typedef struct rep0_sock rep0_sock; + +static void rep0_sock_getq_cb(void *); +static void rep0_pipe_getq_cb(void *); +static void rep0_pipe_putq_cb(void *); +static void rep0_pipe_send_cb(void *); +static void rep0_pipe_recv_cb(void *); +static void rep0_pipe_fini(void *); + +// rep0_sock is our per-socket protocol private structure. +struct rep0_sock { nni_msgq * uwq; nni_msgq * urq; nni_mtx lk; @@ -40,21 +49,21 @@ struct rep_sock { nni_aio * aio_getq; }; -// A rep_pipe is our per-pipe protocol private structure. -struct rep_pipe { - nni_pipe *pipe; - rep_sock *rep; - nni_msgq *sendq; - nni_aio * aio_getq; - nni_aio * aio_send; - nni_aio * aio_recv; - nni_aio * aio_putq; +// rep0_pipe is our per-pipe protocol private structure. +struct rep0_pipe { + nni_pipe * pipe; + rep0_sock *rep; + nni_msgq * sendq; + nni_aio * aio_getq; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_aio * aio_putq; }; static void -rep_sock_fini(void *arg) +rep0_sock_fini(void *arg) { - rep_sock *s = arg; + rep0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -67,18 +76,18 @@ rep_sock_fini(void *arg) } static int -rep_sock_init(void **sp, nni_sock *sock) +rep0_sock_init(void **sp, nni_sock *sock) { - rep_sock *s; - int rv; + rep0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->lk); if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, rep_sock_getq_cb, s)) != 0)) { - rep_sock_fini(s); + ((rv = nni_aio_init(&s->aio_getq, rep0_sock_getq_cb, s)) != 0)) { + rep0_sock_fini(s); return (rv); } @@ -95,25 +104,25 @@ rep_sock_init(void **sp, nni_sock *sock) } static void -rep_sock_open(void *arg) +rep0_sock_open(void *arg) { - rep_sock *s = arg; + rep0_sock *s = arg; nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -rep_sock_close(void *arg) +rep0_sock_close(void *arg) { - rep_sock *s = arg; + rep0_sock *s = arg; nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -rep_pipe_fini(void *arg) +rep0_pipe_fini(void *arg) { - rep_pipe *p = arg; + rep0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); @@ -124,20 +133,20 @@ rep_pipe_fini(void *arg) } static int -rep_pipe_init(void **pp, nni_pipe *pipe, void *s) +rep0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - rep_pipe *p; - int rv; + rep0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, rep_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, rep_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, rep_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, rep_pipe_putq_cb, p)) != 0)) { - rep_pipe_fini(p); + ((rv = nni_aio_init(&p->aio_getq, rep0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, rep0_pipe_putq_cb, p)) != 0)) { + rep0_pipe_fini(p); return (rv); } @@ -148,11 +157,11 @@ rep_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -rep_pipe_start(void *arg) +rep0_pipe_start(void *arg) { - rep_pipe *p = arg; - rep_sock *s = p->rep; - int rv; + rep0_pipe *p = arg; + rep0_sock *s = p->rep; + int rv; if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) { return (rv); @@ -164,10 +173,10 @@ rep_pipe_start(void *arg) } static void -rep_pipe_stop(void *arg) +rep0_pipe_stop(void *arg) { - rep_pipe *p = arg; - rep_sock *s = p->rep; + rep0_pipe *p = arg; + rep0_sock *s = p->rep; nni_msgq_close(p->sendq); nni_aio_stop(p->aio_getq); @@ -179,14 +188,14 @@ rep_pipe_stop(void *arg) } static void -rep_sock_getq_cb(void *arg) +rep0_sock_getq_cb(void *arg) { - rep_sock *s = arg; - nni_msgq *uwq = s->uwq; - nni_msg * msg; - uint32_t id; - rep_pipe *p; - int rv; + rep0_sock *s = arg; + nni_msgq * uwq = s->uwq; + nni_msg * msg; + uint32_t id; + rep0_pipe *p; + int rv; // This watches for messages from the upper write queue, // extracts the destination pipe, and forwards it to the appropriate @@ -228,9 +237,9 @@ rep_sock_getq_cb(void *arg) } static void -rep_pipe_getq_cb(void *arg) +rep0_pipe_getq_cb(void *arg) { - rep_pipe *p = arg; + rep0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->pipe); @@ -244,9 +253,9 @@ rep_pipe_getq_cb(void *arg) } static void -rep_pipe_send_cb(void *arg) +rep0_pipe_send_cb(void *arg) { - rep_pipe *p = arg; + rep0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -259,14 +268,14 @@ rep_pipe_send_cb(void *arg) } static void -rep_pipe_recv_cb(void *arg) +rep0_pipe_recv_cb(void *arg) { - rep_pipe *p = arg; - rep_sock *s = p->rep; - nni_msg * msg; - int rv; - uint8_t * body; - int hops; + rep0_pipe *p = arg; + rep0_sock *s = p->rep; + nni_msg * msg; + int rv; + uint8_t * body; + int hops; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); @@ -329,9 +338,9 @@ drop: } static void -rep_pipe_putq_cb(void *arg) +rep0_pipe_putq_cb(void *arg) { - rep_pipe *p = arg; + rep0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -344,10 +353,10 @@ rep_pipe_putq_cb(void *arg) } static int -rep_sock_setopt_raw(void *arg, const void *buf, size_t sz) +rep0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - rep_sock *s = arg; - int rv; + rep0_sock *s = arg; + int rv; nni_mtx_lock(&s->lk); rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); @@ -356,32 +365,32 @@ rep_sock_setopt_raw(void *arg, const void *buf, size_t sz) } static int -rep_sock_getopt_raw(void *arg, void *buf, size_t *szp) +rep0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - rep_sock *s = arg; + rep0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static int -rep_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +rep0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) { - rep_sock *s = arg; + rep0_sock *s = arg; return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); } static int -rep_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +rep0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) { - rep_sock *s = arg; + rep0_sock *s = arg; return (nni_getopt_int(s->ttl, buf, szp)); } static nni_msg * -rep_sock_filter(void *arg, nni_msg *msg) +rep0_sock_filter(void *arg, nni_msg *msg) { - rep_sock *s = arg; - char * header; - size_t len; + rep0_sock *s = arg; + char * header; + size_t len; nni_mtx_lock(&s->lk); if (s->raw) { @@ -408,11 +417,11 @@ rep_sock_filter(void *arg, nni_msg *msg) } static void -rep_sock_send(void *arg, nni_aio *aio) +rep0_sock_send(void *arg, nni_aio *aio) { - rep_sock *s = arg; - int rv; - nni_msg * msg; + rep0_sock *s = arg; + int rv; + nni_msg * msg; nni_mtx_lock(&s->lk); if (s->raw) { @@ -448,59 +457,59 @@ rep_sock_send(void *arg, nni_aio *aio) } static void -rep_sock_recv(void *arg, nni_aio *aio) +rep0_sock_recv(void *arg, nni_aio *aio) { - rep_sock *s = arg; + rep0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe_ops rep_pipe_ops = { - .pipe_init = rep_pipe_init, - .pipe_fini = rep_pipe_fini, - .pipe_start = rep_pipe_start, - .pipe_stop = rep_pipe_stop, +static nni_proto_pipe_ops rep0_pipe_ops = { + .pipe_init = rep0_pipe_init, + .pipe_fini = rep0_pipe_fini, + .pipe_start = rep0_pipe_start, + .pipe_stop = rep0_pipe_stop, }; -static nni_proto_sock_option rep_sock_options[] = { +static nni_proto_sock_option rep0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = rep_sock_getopt_raw, - .pso_setopt = rep_sock_setopt_raw, + .pso_getopt = rep0_sock_getopt_raw, + .pso_setopt = rep0_sock_setopt_raw, }, { .pso_name = NNG_OPT_MAXTTL, - .pso_getopt = rep_sock_getopt_maxttl, - .pso_setopt = rep_sock_setopt_maxttl, + .pso_getopt = rep0_sock_getopt_maxttl, + .pso_setopt = rep0_sock_setopt_maxttl, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops rep_sock_ops = { - .sock_init = rep_sock_init, - .sock_fini = rep_sock_fini, - .sock_open = rep_sock_open, - .sock_close = rep_sock_close, - .sock_options = rep_sock_options, - .sock_filter = rep_sock_filter, - .sock_send = rep_sock_send, - .sock_recv = rep_sock_recv, +static nni_proto_sock_ops rep0_sock_ops = { + .sock_init = rep0_sock_init, + .sock_fini = rep0_sock_fini, + .sock_open = rep0_sock_open, + .sock_close = rep0_sock_close, + .sock_options = rep0_sock_options, + .sock_filter = rep0_sock_filter, + .sock_send = rep0_sock_send, + .sock_recv = rep0_sock_recv, }; -static nni_proto nni_rep_proto = { +static nni_proto rep0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_REP_V0, "rep" }, .proto_peer = { NNI_PROTO_REQ_V0, "req" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &rep_sock_ops, - .proto_pipe_ops = &rep_pipe_ops, + .proto_sock_ops = &rep0_sock_ops, + .proto_pipe_ops = &rep0_pipe_ops, }; int nng_rep0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &nni_rep_proto)); + return (nni_proto_open(sidp, &rep0_proto)); } diff --git a/src/protocol/reqrep0/rep.h b/src/protocol/reqrep0/rep.h new file mode 100644 index 00000000..93df9379 --- /dev/null +++ b/src/protocol/reqrep0/rep.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_REQREP0_REP_H +#define NNG_PROTOCOL_REQREP0_REP_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_rep0_open(nng_socket *); + +#ifndef nng_rep_open +#define nng_rep_open nng_rep0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_REQREP0_REP_H diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep0/req.c index bead1ec4..94c7f1a0 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep0/req.c @@ -13,21 +13,28 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/reqrep0/req.h" // Request protocol. The REQ protocol is the "request" side of a // request-reply pair. This is useful for building RPC clients, for example. -const char *nng_opt_req_resendtime = NNG_OPT_REQ_RESENDTIME; +#ifndef NNI_PROTO_REQ_V0 +#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) +#endif -typedef struct req_pipe req_pipe; -typedef struct req_sock req_sock; +#ifndef NNI_PROTO_REP_V0 +#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) +#endif -static void req_resend(req_sock *); -static void req_timeout(void *); -static void req_pipe_fini(void *); +typedef struct req0_pipe req0_pipe; +typedef struct req0_sock req0_sock; -// A req_sock is our per-socket protocol private structure. -struct req_sock { +static void req0_resend(req0_sock *); +static void req0_timeout(void *); +static void req0_pipe_fini(void *); + +// A req0_sock is our per-socket protocol private structure. +struct req0_sock { nni_msgq * uwq; nni_msgq * urq; nni_duration retry; @@ -38,7 +45,7 @@ struct req_sock { int ttl; nni_msg * reqmsg; - req_pipe *pendpipe; + req0_pipe *pendpipe; nni_list readypipes; nni_list busypipes; @@ -51,10 +58,10 @@ struct req_sock { nni_cv cv; }; -// A req_pipe is our per-pipe protocol private structure. -struct req_pipe { +// A req0_pipe is our per-pipe protocol private structure. +struct req0_pipe { nni_pipe * pipe; - req_sock * req; + req0_sock * req; nni_list_node node; nni_aio * aio_getq; // raw mode only nni_aio * aio_sendraw; // raw mode only @@ -64,17 +71,17 @@ struct req_pipe { nni_mtx mtx; }; -static void req_resender(void *); -static void req_getq_cb(void *); -static void req_sendraw_cb(void *); -static void req_sendcooked_cb(void *); -static void req_recv_cb(void *); -static void req_putq_cb(void *); +static void req0_resender(void *); +static void req0_getq_cb(void *); +static void req0_sendraw_cb(void *); +static void req0_sendcooked_cb(void *); +static void req0_recv_cb(void *); +static void req0_putq_cb(void *); static int -req_sock_init(void **sp, nni_sock *sock) +req0_sock_init(void **sp, nni_sock *sock) { - req_sock *s; + req0_sock *s; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); @@ -82,9 +89,9 @@ req_sock_init(void **sp, nni_sock *sock) nni_mtx_init(&s->mtx); nni_cv_init(&s->cv, &s->mtx); - NNI_LIST_INIT(&s->readypipes, req_pipe, node); - NNI_LIST_INIT(&s->busypipes, req_pipe, node); - nni_timer_init(&s->timer, req_timeout, s); + NNI_LIST_INIT(&s->readypipes, req0_pipe, node); + NNI_LIST_INIT(&s->busypipes, req0_pipe, node); + nni_timer_init(&s->timer, req0_timeout, s); // this is "semi random" start for request IDs. s->nextid = nni_random(); @@ -102,15 +109,15 @@ req_sock_init(void **sp, nni_sock *sock) } static void -req_sock_open(void *arg) +req0_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void -req_sock_close(void *arg) +req0_sock_close(void *arg) { - req_sock *s = arg; + req0_sock *s = arg; nni_mtx_lock(&s->mtx); s->closed = 1; @@ -120,9 +127,9 @@ req_sock_close(void *arg) } static void -req_sock_fini(void *arg) +req0_sock_fini(void *arg) { - req_sock *s = arg; + req0_sock *s = arg; nni_mtx_lock(&s->mtx); while ((!nni_list_empty(&s->readypipes)) || @@ -139,9 +146,9 @@ req_sock_fini(void *arg) } static void -req_pipe_fini(void *arg) +req0_pipe_fini(void *arg) { - req_pipe *p = arg; + req0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_putq); @@ -153,22 +160,22 @@ req_pipe_fini(void *arg) } static int -req_pipe_init(void **pp, nni_pipe *pipe, void *s) +req0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - req_pipe *p; - int rv; + req0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - if (((rv = nni_aio_init(&p->aio_getq, req_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, req_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, req_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_sendraw, req_sendraw_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_sendcooked, req_sendcooked_cb, p)) != + if (((rv = nni_aio_init(&p->aio_getq, req0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, req0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_sendraw, req0_sendraw_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_sendcooked, req0_sendcooked_cb, p)) != 0)) { - req_pipe_fini(p); + req0_pipe_fini(p); return (rv); } @@ -180,10 +187,10 @@ req_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -req_pipe_start(void *arg) +req0_pipe_start(void *arg) { - req_pipe *p = arg; - req_sock *s = p->req; + req0_pipe *p = arg; + req0_sock *s = p->req; if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) { return (NNG_EPROTO); @@ -198,7 +205,7 @@ req_pipe_start(void *arg) // If sock was waiting for somewhere to send data, go ahead and // send it to this pipe. if (s->wantw) { - req_resend(s); + req0_resend(s); } nni_mtx_unlock(&s->mtx); @@ -208,10 +215,10 @@ req_pipe_start(void *arg) } static void -req_pipe_stop(void *arg) +req0_pipe_stop(void *arg) { - req_pipe *p = arg; - req_sock *s = p->req; + req0_pipe *p = arg; + req0_sock *s = p->req; nni_aio_stop(p->aio_getq); nni_aio_stop(p->aio_putq); @@ -238,50 +245,50 @@ req_pipe_stop(void *arg) s->pendpipe = NULL; s->resend = NNI_TIME_ZERO; s->wantw = 1; - req_resend(s); + req0_resend(s); } nni_mtx_unlock(&s->mtx); } static int -req_sock_setopt_raw(void *arg, const void *buf, size_t sz) +req0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -req_sock_getopt_raw(void *arg, void *buf, size_t *szp) +req0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static int -req_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); } static int -req_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +req0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_getopt_int(s->ttl, buf, szp)); } static int -req_sock_setopt_resendtime(void *arg, const void *buf, size_t sz) +req0_sock_setopt_resendtime(void *arg, const void *buf, size_t sz) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_setopt_ms(&s->retry, buf, sz)); } static int -req_sock_getopt_resendtime(void *arg, void *buf, size_t *szp) +req0_sock_getopt_resendtime(void *arg, void *buf, size_t *szp) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_getopt_ms(s->retry, buf, szp)); } @@ -302,10 +309,10 @@ req_sock_getopt_resendtime(void *arg, void *buf, size_t *szp) // kind of priority.) static void -req_getq_cb(void *arg) +req0_getq_cb(void *arg) { - req_pipe *p = arg; - req_sock *s = p->req; + req0_pipe *p = arg; + req0_sock *s = p->req; // We should be in RAW mode. Cooked mode traffic bypasses // the upper write queue entirely, and should never end up here. @@ -326,9 +333,9 @@ req_getq_cb(void *arg) } static void -req_sendraw_cb(void *arg) +req0_sendraw_cb(void *arg) { - req_pipe *p = arg; + req0_pipe *p = arg; if (nni_aio_result(p->aio_sendraw) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_sendraw)); @@ -342,10 +349,10 @@ req_sendraw_cb(void *arg) } static void -req_sendcooked_cb(void *arg) +req0_sendcooked_cb(void *arg) { - req_pipe *p = arg; - req_sock *s = p->req; + req0_pipe *p = arg; + req0_sock *s = p->req; if (nni_aio_result(p->aio_sendcooked) != 0) { // We failed to send... clean up and deal with it. @@ -365,7 +372,7 @@ req_sendcooked_cb(void *arg) if (nni_list_active(&s->busypipes, p)) { nni_list_remove(&s->busypipes, p); nni_list_append(&s->readypipes, p); - req_resend(s); + req0_resend(s); } else { // We wind up here if stop was called from the reader // side while we were waiting to be scheduled to run for the @@ -377,9 +384,9 @@ req_sendcooked_cb(void *arg) } static void -req_putq_cb(void *arg) +req0_putq_cb(void *arg) { - req_pipe *p = arg; + req0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -393,10 +400,10 @@ req_putq_cb(void *arg) } static void -req_recv_cb(void *arg) +req0_recv_cb(void *arg) { - req_pipe *p = arg; - nni_msg * msg; + req0_pipe *p = arg; + nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); @@ -431,23 +438,23 @@ malformed: } static void -req_timeout(void *arg) +req0_timeout(void *arg) { - req_sock *s = arg; + req0_sock *s = arg; nni_mtx_lock(&s->mtx); if (s->reqmsg != NULL) { s->wantw = 1; - req_resend(s); + req0_resend(s); } nni_mtx_unlock(&s->mtx); } static void -req_resend(req_sock *s) +req0_resend(req0_sock *s) { - req_pipe *p; - nni_msg * msg; + req0_pipe *p; + nni_msg * msg; // Note: This routine should be called with the socket lock held. // Also, this should only be called while handling cooked mode @@ -499,13 +506,13 @@ req_resend(req_sock *s) } static void -req_sock_send(void *arg, nni_aio *aio) +req0_sock_send(void *arg, nni_aio *aio) { - req_sock *s = arg; - uint32_t id; - size_t len; - nni_msg * msg; - int rv; + req0_sock *s = arg; + uint32_t id; + size_t len; + nni_msg * msg; + int rv; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -547,7 +554,7 @@ req_sock_send(void *arg, nni_aio *aio) s->resend = NNI_TIME_ZERO; s->wantw = 1; - req_resend(s); + req0_resend(s); nni_mtx_unlock(&s->mtx); @@ -555,10 +562,10 @@ req_sock_send(void *arg, nni_aio *aio) } static nni_msg * -req_sock_filter(void *arg, nni_msg *msg) +req0_sock_filter(void *arg, nni_msg *msg) { - req_sock *s = arg; - nni_msg * rmsg; + req0_sock *s = arg; + nni_msg * rmsg; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -598,9 +605,9 @@ req_sock_filter(void *arg, nni_msg *msg) } static void -req_sock_recv(void *arg, nni_aio *aio) +req0_sock_recv(void *arg, nni_aio *aio) { - req_sock *s = arg; + req0_sock *s = arg; nni_mtx_lock(&s->mtx); if (!s->raw) { @@ -614,55 +621,55 @@ req_sock_recv(void *arg, nni_aio *aio) nni_msgq_aio_get(s->urq, aio); } -static nni_proto_pipe_ops req_pipe_ops = { - .pipe_init = req_pipe_init, - .pipe_fini = req_pipe_fini, - .pipe_start = req_pipe_start, - .pipe_stop = req_pipe_stop, +static nni_proto_pipe_ops req0_pipe_ops = { + .pipe_init = req0_pipe_init, + .pipe_fini = req0_pipe_fini, + .pipe_start = req0_pipe_start, + .pipe_stop = req0_pipe_stop, }; -static nni_proto_sock_option req_sock_options[] = { +static nni_proto_sock_option req0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = req_sock_getopt_raw, - .pso_setopt = req_sock_setopt_raw, + .pso_getopt = req0_sock_getopt_raw, + .pso_setopt = req0_sock_setopt_raw, }, { .pso_name = NNG_OPT_MAXTTL, - .pso_getopt = req_sock_getopt_maxttl, - .pso_setopt = req_sock_setopt_maxttl, + .pso_getopt = req0_sock_getopt_maxttl, + .pso_setopt = req0_sock_setopt_maxttl, }, { .pso_name = NNG_OPT_REQ_RESENDTIME, - .pso_getopt = req_sock_getopt_resendtime, - .pso_setopt = req_sock_setopt_resendtime, + .pso_getopt = req0_sock_getopt_resendtime, + .pso_setopt = req0_sock_setopt_resendtime, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops req_sock_ops = { - .sock_init = req_sock_init, - .sock_fini = req_sock_fini, - .sock_open = req_sock_open, - .sock_close = req_sock_close, - .sock_options = req_sock_options, - .sock_filter = req_sock_filter, - .sock_send = req_sock_send, - .sock_recv = req_sock_recv, +static nni_proto_sock_ops req0_sock_ops = { + .sock_init = req0_sock_init, + .sock_fini = req0_sock_fini, + .sock_open = req0_sock_open, + .sock_close = req0_sock_close, + .sock_options = req0_sock_options, + .sock_filter = req0_sock_filter, + .sock_send = req0_sock_send, + .sock_recv = req0_sock_recv, }; -static nni_proto req_proto = { +static nni_proto req0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_REQ_V0, "req" }, .proto_peer = { NNI_PROTO_REP_V0, "rep" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &req_sock_ops, - .proto_pipe_ops = &req_pipe_ops, + .proto_sock_ops = &req0_sock_ops, + .proto_pipe_ops = &req0_pipe_ops, }; int nng_req0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &req_proto)); + return (nni_proto_open(sidp, &req0_proto)); } diff --git a/src/protocol/reqrep0/req.h b/src/protocol/reqrep0/req.h new file mode 100644 index 00000000..99c9bf62 --- /dev/null +++ b/src/protocol/reqrep0/req.h @@ -0,0 +1,30 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_REQREP0_REQ_H +#define NNG_PROTOCOL_REQREP0_REQ_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_req0_open(nng_socket *); + +#ifndef nng_req_open +#define nng_req_open nng_req0_open +#endif + +#define NNG_OPT_REQ_RESENDTIME "req:resend-time" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_REQREP0_REQ_H diff --git a/src/protocol/survey0/CMakeLists.txt b/src/protocol/survey0/CMakeLists.txt new file mode 100644 index 00000000..61e5aa7b --- /dev/null +++ b/src/protocol/survey0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Req/Rep protocol + +if (NNG_PROTO_SURVEYOR0) + set(SURV0_SOURCES protocol/survey0/survey.c protocol/survey0/survey.h) + install(FILES survey.h DESTINATION include/nng/protocol/survey0) +endif() + +if (NNG_PROTO_RESPONDENT0) + set(RESP0_SOURCES protocol/survey0/respond.c protocol/survey0/respond.h) + install(FILES respond.h DESTINATION include/nng/protocol/survey0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${SURV0_SOURCES} ${RESP0_SOURCES} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/protocol/survey/respond.c b/src/protocol/survey0/respond.c index 94730be6..73e919c3 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey0/respond.c @@ -12,23 +12,32 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/survey0/respond.h" // Respondent protocol. The RESPONDENT protocol is the "replier" side of // the surveyor pattern. This is useful for building service discovery, or -// voting algorithsm, for example. +// voting algorithms, for example. -typedef struct resp_pipe resp_pipe; -typedef struct resp_sock resp_sock; +#ifndef NNI_PROTO_SURVEYOR_V0 +#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) +#endif -static void resp_recv_cb(void *); -static void resp_putq_cb(void *); -static void resp_getq_cb(void *); -static void resp_send_cb(void *); -static void resp_sock_getq_cb(void *); -static void resp_pipe_fini(void *); +#ifndef NNI_PROTO_RESPONDENT_V0 +#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) +#endif -// A resp_sock is our per-socket protocol private structure. -struct resp_sock { +typedef struct resp0_pipe resp0_pipe; +typedef struct resp0_sock resp0_sock; + +static void resp0_recv_cb(void *); +static void resp0_putq_cb(void *); +static void resp0_getq_cb(void *); +static void resp0_send_cb(void *); +static void resp0_sock_getq_cb(void *); +static void resp0_pipe_fini(void *); + +// resp0_sock is our per-socket protocol private structure. +struct resp0_sock { nni_msgq * urq; nni_msgq * uwq; int raw; @@ -40,22 +49,22 @@ struct resp_sock { nni_mtx mtx; }; -// A resp_pipe is our per-pipe protocol private structure. -struct resp_pipe { - nni_pipe * npipe; - resp_sock *psock; - uint32_t id; - nni_msgq * sendq; - nni_aio * aio_getq; - nni_aio * aio_putq; - nni_aio * aio_send; - nni_aio * aio_recv; +// resp0_pipe is our per-pipe protocol private structure. +struct resp0_pipe { + nni_pipe * npipe; + resp0_sock *psock; + uint32_t id; + nni_msgq * sendq; + nni_aio * aio_getq; + nni_aio * aio_putq; + nni_aio * aio_send; + nni_aio * aio_recv; }; static void -resp_sock_fini(void *arg) +resp0_sock_fini(void *arg) { - resp_sock *s = arg; + resp0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -68,18 +77,18 @@ resp_sock_fini(void *arg) } static int -resp_sock_init(void **sp, nni_sock *nsock) +resp0_sock_init(void **sp, nni_sock *nsock) { - resp_sock *s; - int rv; + resp0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->mtx); if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, resp_sock_getq_cb, s)) != 0)) { - resp_sock_fini(s); + ((rv = nni_aio_init(&s->aio_getq, resp0_sock_getq_cb, s)) != 0)) { + resp0_sock_fini(s); return (rv); } @@ -95,25 +104,25 @@ resp_sock_init(void **sp, nni_sock *nsock) } static void -resp_sock_open(void *arg) +resp0_sock_open(void *arg) { - resp_sock *s = arg; + resp0_sock *s = arg; nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -resp_sock_close(void *arg) +resp0_sock_close(void *arg) { - resp_sock *s = arg; + resp0_sock *s = arg; nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -resp_pipe_fini(void *arg) +resp0_pipe_fini(void *arg) { - resp_pipe *p = arg; + resp0_pipe *p = arg; nni_aio_fini(p->aio_putq); nni_aio_fini(p->aio_getq); @@ -124,20 +133,20 @@ resp_pipe_fini(void *arg) } static int -resp_pipe_init(void **pp, nni_pipe *npipe, void *s) +resp0_pipe_init(void **pp, nni_pipe *npipe, void *s) { - resp_pipe *p; - int rv; + resp0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, resp_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, resp_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, resp_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, resp_send_cb, p)) != 0)) { - resp_pipe_fini(p); + ((rv = nni_aio_init(&p->aio_putq, resp0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, resp0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, resp0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, resp0_send_cb, p)) != 0)) { + resp0_pipe_fini(p); return (rv); } @@ -148,11 +157,11 @@ resp_pipe_init(void **pp, nni_pipe *npipe, void *s) } static int -resp_pipe_start(void *arg) +resp0_pipe_start(void *arg) { - resp_pipe *p = arg; - resp_sock *s = p->psock; - int rv; + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + int rv; p->id = nni_pipe_id(p->npipe); @@ -170,10 +179,10 @@ resp_pipe_start(void *arg) } static void -resp_pipe_stop(void *arg) +resp0_pipe_stop(void *arg) { - resp_pipe *p = arg; - resp_sock *s = p->psock; + resp0_pipe *p = arg; + resp0_sock *s = p->psock; nni_msgq_close(p->sendq); nni_aio_stop(p->aio_putq); @@ -189,19 +198,19 @@ resp_pipe_stop(void *arg) } } -// resp_sock_send watches for messages from the upper write queue, +// resp0_sock_send watches for messages from the upper write queue, // extracts the destination pipe, and forwards it to the appropriate // destination pipe via a separate queue. This prevents a single bad // or slow pipe from gumming up the works for the entire socket.s void -resp_sock_getq_cb(void *arg) +resp0_sock_getq_cb(void *arg) { - resp_sock *s = arg; - nni_msg * msg; - uint32_t id; - resp_pipe *p; - int rv; + resp0_sock *s = arg; + nni_msg * msg; + uint32_t id; + resp0_pipe *p; + int rv; if (nni_aio_result(s->aio_getq) != 0) { return; @@ -233,9 +242,9 @@ resp_sock_getq_cb(void *arg) } void -resp_getq_cb(void *arg) +resp0_getq_cb(void *arg) { - resp_pipe *p = arg; + resp0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->npipe); @@ -249,9 +258,9 @@ resp_getq_cb(void *arg) } void -resp_send_cb(void *arg) +resp0_send_cb(void *arg) { - resp_pipe *p = arg; + resp0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -264,14 +273,14 @@ resp_send_cb(void *arg) } static void -resp_recv_cb(void *arg) +resp0_recv_cb(void *arg) { - resp_pipe *p = arg; - resp_sock *s = p->psock; - nni_msgq * urq = s->urq; - nni_msg * msg; - int hops; - int rv; + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + nni_msgq * urq = s->urq; + nni_msg * msg; + int hops; + int rv; if (nni_aio_result(p->aio_recv) != 0) { goto error; @@ -324,9 +333,9 @@ error: } static void -resp_putq_cb(void *arg) +resp0_putq_cb(void *arg) { - resp_pipe *p = arg; + resp0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -338,10 +347,10 @@ resp_putq_cb(void *arg) } static int -resp_sock_setopt_raw(void *arg, const void *buf, size_t sz) +resp0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - resp_sock *s = arg; - int rv; + resp0_sock *s = arg; + int rv; nni_mtx_lock(&s->mtx); rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); @@ -350,32 +359,32 @@ resp_sock_setopt_raw(void *arg, const void *buf, size_t sz) } static int -resp_sock_getopt_raw(void *arg, void *buf, size_t *szp) +resp0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - resp_sock *s = arg; + resp0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static int -resp_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +resp0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) { - resp_sock *s = arg; + resp0_sock *s = arg; return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); } static int -resp_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +resp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) { - resp_sock *s = arg; + resp0_sock *s = arg; return (nni_getopt_int(s->ttl, buf, szp)); } static void -resp_sock_send(void *arg, nni_aio *aio) +resp0_sock_send(void *arg, nni_aio *aio) { - resp_sock *s = arg; - nni_msg * msg; - int rv; + resp0_sock *s = arg; + nni_msg * msg; + int rv; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -412,11 +421,11 @@ resp_sock_send(void *arg, nni_aio *aio) } static nni_msg * -resp_sock_filter(void *arg, nni_msg *msg) +resp0_sock_filter(void *arg, nni_msg *msg) { - resp_sock *s = arg; - char * header; - size_t len; + resp0_sock *s = arg; + char * header; + size_t len; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -444,57 +453,57 @@ resp_sock_filter(void *arg, nni_msg *msg) } static void -resp_sock_recv(void *arg, nni_aio *aio) +resp0_sock_recv(void *arg, nni_aio *aio) { - resp_sock *s = arg; + resp0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } -static nni_proto_pipe_ops resp_pipe_ops = { - .pipe_init = resp_pipe_init, - .pipe_fini = resp_pipe_fini, - .pipe_start = resp_pipe_start, - .pipe_stop = resp_pipe_stop, +static nni_proto_pipe_ops resp0_pipe_ops = { + .pipe_init = resp0_pipe_init, + .pipe_fini = resp0_pipe_fini, + .pipe_start = resp0_pipe_start, + .pipe_stop = resp0_pipe_stop, }; -static nni_proto_sock_option resp_sock_options[] = { +static nni_proto_sock_option resp0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = resp_sock_getopt_raw, - .pso_setopt = resp_sock_setopt_raw, + .pso_getopt = resp0_sock_getopt_raw, + .pso_setopt = resp0_sock_setopt_raw, }, { .pso_name = NNG_OPT_MAXTTL, - .pso_getopt = resp_sock_getopt_maxttl, - .pso_setopt = resp_sock_setopt_maxttl, + .pso_getopt = resp0_sock_getopt_maxttl, + .pso_setopt = resp0_sock_setopt_maxttl, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops resp_sock_ops = { - .sock_init = resp_sock_init, - .sock_fini = resp_sock_fini, - .sock_open = resp_sock_open, - .sock_close = resp_sock_close, - .sock_filter = resp_sock_filter, - .sock_send = resp_sock_send, - .sock_recv = resp_sock_recv, - .sock_options = resp_sock_options, +static nni_proto_sock_ops resp0_sock_ops = { + .sock_init = resp0_sock_init, + .sock_fini = resp0_sock_fini, + .sock_open = resp0_sock_open, + .sock_close = resp0_sock_close, + .sock_filter = resp0_sock_filter, + .sock_send = resp0_sock_send, + .sock_recv = resp0_sock_recv, + .sock_options = resp0_sock_options, }; -static nni_proto resp_proto = { +static nni_proto resp0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &resp_sock_ops, - .proto_pipe_ops = &resp_pipe_ops, + .proto_sock_ops = &resp0_sock_ops, + .proto_pipe_ops = &resp0_pipe_ops, }; int nng_respondent0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &resp_proto)); + return (nni_proto_open(sidp, &resp0_proto)); } diff --git a/src/protocol/survey0/respond.h b/src/protocol/survey0/respond.h new file mode 100644 index 00000000..58c65298 --- /dev/null +++ b/src/protocol/survey0/respond.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_SURVEY0_RESPOND_H +#define NNG_PROTOCOL_SURVEY0_RESPOND_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_respondent0_open(nng_socket *); + +#ifndef nng_respondent_open +#define nng_respondent_open nng_respondent0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_SURVEY0_RESPOND_H diff --git a/src/protocol/survey/survey.c b/src/protocol/survey0/survey.c index 9dcd6664..02283436 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey0/survey.c @@ -12,22 +12,31 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/survey0/survey.h" // Surveyor protocol. The SURVEYOR protocol is the "survey" side of the // survey pattern. This is useful for building service discovery, voting, etc. -typedef struct surv_pipe surv_pipe; -typedef struct surv_sock surv_sock; +#ifndef NNI_PROTO_SURVEYOR_V0 +#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) +#endif -static void surv_sock_getq_cb(void *); -static void surv_getq_cb(void *); -static void surv_putq_cb(void *); -static void surv_send_cb(void *); -static void surv_recv_cb(void *); -static void surv_timeout(void *); +#ifndef NNI_PROTO_RESPONDENT_V0 +#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) +#endif -// A surv_sock is our per-socket protocol private structure. -struct surv_sock { +typedef struct surv0_pipe surv0_pipe; +typedef struct surv0_sock surv0_sock; + +static void surv0_sock_getq_cb(void *); +static void surv0_getq_cb(void *); +static void surv0_putq_cb(void *); +static void surv0_send_cb(void *); +static void surv0_recv_cb(void *); +static void surv0_timeout(void *); + +// surv0_sock is our per-socket protocol private structure. +struct surv0_sock { nni_duration survtime; nni_time expire; int raw; @@ -42,10 +51,10 @@ struct surv_sock { nni_mtx mtx; }; -// A surv_pipe is our per-pipe protocol private structure. -struct surv_pipe { +// surv0_pipe is our per-pipe protocol private structure. +struct surv0_pipe { nni_pipe * npipe; - surv_sock * psock; + surv0_sock * psock; nni_msgq * sendq; nni_list_node node; nni_aio * aio_getq; @@ -55,9 +64,9 @@ struct surv_pipe { }; static void -surv_sock_fini(void *arg) +surv0_sock_fini(void *arg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -66,21 +75,21 @@ surv_sock_fini(void *arg) } static int -surv_sock_init(void **sp, nni_sock *nsock) +surv0_sock_init(void **sp, nni_sock *nsock) { - surv_sock *s; - int rv; + surv0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_aio_init(&s->aio_getq, surv_sock_getq_cb, s)) != 0) { - surv_sock_fini(s); + if ((rv = nni_aio_init(&s->aio_getq, surv0_sock_getq_cb, s)) != 0) { + surv0_sock_fini(s); return (rv); } - NNI_LIST_INIT(&s->pipes, surv_pipe, node); + NNI_LIST_INIT(&s->pipes, surv0_pipe, node); nni_mtx_init(&s->mtx); - nni_timer_init(&s->timer, surv_timeout, s); + nni_timer_init(&s->timer, surv0_timeout, s); s->nextid = nni_random(); s->raw = 0; @@ -94,26 +103,26 @@ surv_sock_init(void **sp, nni_sock *nsock) } static void -surv_sock_open(void *arg) +surv0_sock_open(void *arg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -surv_sock_close(void *arg) +surv0_sock_close(void *arg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_timer_cancel(&s->timer); nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -surv_pipe_fini(void *arg) +surv0_pipe_fini(void *arg) { - surv_pipe *p = arg; + surv0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); @@ -124,21 +133,21 @@ surv_pipe_fini(void *arg) } static int -surv_pipe_init(void **pp, nni_pipe *npipe, void *s) +surv0_pipe_init(void **pp, nni_pipe *npipe, void *s) { - surv_pipe *p; - int rv; + surv0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } // This depth could be tunable. if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, surv_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, surv_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, surv_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, surv_recv_cb, p)) != 0)) { - surv_pipe_fini(p); + ((rv = nni_aio_init(&p->aio_getq, surv0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, surv0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, surv0_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, surv0_recv_cb, p)) != 0)) { + surv0_pipe_fini(p); return (rv); } @@ -149,10 +158,10 @@ surv_pipe_init(void **pp, nni_pipe *npipe, void *s) } static int -surv_pipe_start(void *arg) +surv0_pipe_start(void *arg) { - surv_pipe *p = arg; - surv_sock *s = p->psock; + surv0_pipe *p = arg; + surv0_sock *s = p->psock; nni_mtx_lock(&s->mtx); nni_list_append(&s->pipes, p); @@ -164,10 +173,10 @@ surv_pipe_start(void *arg) } static void -surv_pipe_stop(void *arg) +surv0_pipe_stop(void *arg) { - surv_pipe *p = arg; - surv_sock *s = p->psock; + surv0_pipe *p = arg; + surv0_sock *s = p->psock; nni_aio_stop(p->aio_getq); nni_aio_stop(p->aio_send); @@ -184,9 +193,9 @@ surv_pipe_stop(void *arg) } static void -surv_getq_cb(void *arg) +surv0_getq_cb(void *arg) { - surv_pipe *p = arg; + surv0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->npipe); @@ -200,9 +209,9 @@ surv_getq_cb(void *arg) } static void -surv_send_cb(void *arg) +surv0_send_cb(void *arg) { - surv_pipe *p = arg; + surv0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -215,9 +224,9 @@ surv_send_cb(void *arg) } static void -surv_putq_cb(void *arg) +surv0_putq_cb(void *arg) { - surv_pipe *p = arg; + surv0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -230,10 +239,10 @@ surv_putq_cb(void *arg) } static void -surv_recv_cb(void *arg) +surv0_recv_cb(void *arg) { - surv_pipe *p = arg; - nni_msg * msg; + surv0_pipe *p = arg; + nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { goto failed; @@ -265,10 +274,10 @@ failed: } static int -surv_sock_setopt_raw(void *arg, const void *buf, size_t sz) +surv0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - surv_sock *s = arg; - int rv; + surv0_sock *s = arg; + int rv; nni_mtx_lock(&s->mtx); if ((rv = nni_setopt_int(&s->raw, buf, sz, 0, 1)) == 0) { @@ -280,33 +289,33 @@ surv_sock_setopt_raw(void *arg, const void *buf, size_t sz) } static int -surv_sock_getopt_raw(void *arg, void *buf, size_t *szp) +surv0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - surv_sock *s = arg; + surv0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static int -surv_sock_setopt_surveytime(void *arg, const void *buf, size_t sz) +surv0_sock_setopt_surveytime(void *arg, const void *buf, size_t sz) { - surv_sock *s = arg; + surv0_sock *s = arg; return (nni_setopt_ms(&s->survtime, buf, sz)); } static int -surv_sock_getopt_surveytime(void *arg, void *buf, size_t *szp) +surv0_sock_getopt_surveytime(void *arg, void *buf, size_t *szp) { - surv_sock *s = arg; + surv0_sock *s = arg; return (nni_getopt_ms(s->survtime, buf, szp)); } static void -surv_sock_getq_cb(void *arg) +surv0_sock_getq_cb(void *arg) { - surv_sock *s = arg; - surv_pipe *p; - surv_pipe *last; - nni_msg * msg, *dup; + surv0_sock *s = arg; + surv0_pipe *p; + surv0_pipe *last; + nni_msg * msg, *dup; if (nni_aio_result(s->aio_getq) != 0) { // Should be NNG_ECLOSED. @@ -338,9 +347,9 @@ surv_sock_getq_cb(void *arg) } static void -surv_timeout(void *arg) +surv0_timeout(void *arg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_mtx_lock(&s->mtx); s->survid = 0; @@ -349,9 +358,9 @@ surv_timeout(void *arg) } static void -surv_sock_recv(void *arg, nni_aio *aio) +surv0_sock_recv(void *arg, nni_aio *aio) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_mtx_lock(&s->mtx); if (s->survid == 0) { @@ -364,11 +373,11 @@ surv_sock_recv(void *arg, nni_aio *aio) } static void -surv_sock_send(void *arg, nni_aio *aio) +surv0_sock_send(void *arg, nni_aio *aio) { - surv_sock *s = arg; - nni_msg * msg; - int rv; + surv0_sock *s = arg; + nni_msg * msg; + int rv; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -404,9 +413,9 @@ surv_sock_send(void *arg, nni_aio *aio) } static nni_msg * -surv_sock_filter(void *arg, nni_msg *msg) +surv0_sock_filter(void *arg, nni_msg *msg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -426,50 +435,50 @@ surv_sock_filter(void *arg, nni_msg *msg) return (msg); } -static nni_proto_pipe_ops surv_pipe_ops = { - .pipe_init = surv_pipe_init, - .pipe_fini = surv_pipe_fini, - .pipe_start = surv_pipe_start, - .pipe_stop = surv_pipe_stop, +static nni_proto_pipe_ops surv0_pipe_ops = { + .pipe_init = surv0_pipe_init, + .pipe_fini = surv0_pipe_fini, + .pipe_start = surv0_pipe_start, + .pipe_stop = surv0_pipe_stop, }; -static nni_proto_sock_option surv_sock_options[] = { +static nni_proto_sock_option surv0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = surv_sock_getopt_raw, - .pso_setopt = surv_sock_setopt_raw, + .pso_getopt = surv0_sock_getopt_raw, + .pso_setopt = surv0_sock_setopt_raw, }, { .pso_name = NNG_OPT_SURVEYOR_SURVEYTIME, - .pso_getopt = surv_sock_getopt_surveytime, - .pso_setopt = surv_sock_setopt_surveytime, + .pso_getopt = surv0_sock_getopt_surveytime, + .pso_setopt = surv0_sock_setopt_surveytime, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops surv_sock_ops = { - .sock_init = surv_sock_init, - .sock_fini = surv_sock_fini, - .sock_open = surv_sock_open, - .sock_close = surv_sock_close, - .sock_send = surv_sock_send, - .sock_recv = surv_sock_recv, - .sock_filter = surv_sock_filter, - .sock_options = surv_sock_options, +static nni_proto_sock_ops surv0_sock_ops = { + .sock_init = surv0_sock_init, + .sock_fini = surv0_sock_fini, + .sock_open = surv0_sock_open, + .sock_close = surv0_sock_close, + .sock_send = surv0_sock_send, + .sock_recv = surv0_sock_recv, + .sock_filter = surv0_sock_filter, + .sock_options = surv0_sock_options, }; -static nni_proto surv_proto = { +static nni_proto surv0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &surv_sock_ops, - .proto_pipe_ops = &surv_pipe_ops, + .proto_sock_ops = &surv0_sock_ops, + .proto_pipe_ops = &surv0_pipe_ops, }; int nng_surveyor0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &surv_proto)); + return (nni_proto_open(sidp, &surv0_proto)); } diff --git a/src/protocol/survey0/survey.h b/src/protocol/survey0/survey.h new file mode 100644 index 00000000..a7b6d943 --- /dev/null +++ b/src/protocol/survey0/survey.h @@ -0,0 +1,30 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_SURVEY0_SURVEY_H +#define NNG_PROTOCOL_SURVEY0_SURVEY_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_surveyor0_open(nng_socket *); + +#ifndef nng_surveyor_open +#define nng_surveyor_open nng_surveyor0_open +#endif + +#define NNG_OPT_SURVEYOR_SURVEYTIME "surveyor:survey-time" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_SURVEY0_SURVEY_H |
