diff options
Diffstat (limited to 'src')
45 files changed, 1640 insertions, 939 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a5a49052..4901789b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -74,30 +74,6 @@ set (NNG_SOURCES core/timer.h core/transport.c core/transport.h - - protocol/bus/bus.c - - protocol/pair/pair_v0.c - protocol/pair/pair_v1.c - - protocol/pipeline/pull.c - protocol/pipeline/push.c - - protocol/pubsub/pub.c - protocol/pubsub/sub.c - - protocol/reqrep/rep.c - protocol/reqrep/req.c - - protocol/survey/respond.c - protocol/survey/survey.c - - transport/inproc/inproc.c - - transport/ipc/ipc.c - - transport/tcp/tcp.c - ) if (NNG_PLATFORM_POSIX) @@ -146,14 +122,19 @@ endif() install(FILES transport/inproc/inproc.h DESTINATION include/nng/transport/inproc) -if (NNG_ENABLE_ZEROTIER) - set (NNG_SOURCES ${NNG_SOURCES} - transport/zerotier/zerotier.c - transport/zerotier/zerotier.h - ) - install(FILES transport/zerotier/zerotier.h - DESTINATION include/nng/transport/zerotier) -endif() + +add_subdirectory(protocol/bus0) +add_subdirectory(protocol/pair0) +add_subdirectory(protocol/pair1) +add_subdirectory(protocol/pipeline0) +add_subdirectory(protocol/pubsub0) +add_subdirectory(protocol/reqrep0) +add_subdirectory(protocol/survey0) + +add_subdirectory(transport/inproc) +add_subdirectory(transport/ipc) +add_subdirectory(transport/tcp) +add_subdirectory(transport/zerotier) include_directories(AFTER SYSTEM ${PROJECT_SOURCE_DIR}/src ${NNG_REQUIRED_INCLUDES}) @@ -216,3 +197,7 @@ install (TARGETS ${PROJECT_NAME} ${PROJECT_NAME}_static LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} ) + +# Promote settings to parent +set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} PARENT_SCOPE) +set(NNG_REQUIRED_LFLAGS ${NNG_REQUIRED_LFLAGS} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 3f1ffcb5..10bffaa4 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -297,7 +297,6 @@ nni_msgq_run_getq(nni_msgq *mq) static void nni_msgq_run_notify(nni_msgq *mq) { - nni_aio *aio; if (mq->mq_cb_fn != NULL) { int flags = 0; diff --git a/src/core/protocol.h b/src/core/protocol.h index 5db259f0..39bde059 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -147,21 +147,26 @@ extern int nni_proto_open(nng_socket *, const nni_proto *); // Protocol numbers are never more than 16 bits. Also, there will never be // a valid protocol numbered 0 (NNG_PROTO_NONE). #define NNI_PROTO(major, minor) (((major) *16) + (minor)) -enum nng_proto_enum { - NNI_PROTO_NONE = NNI_PROTO(0, 0), - NNI_PROTO_PAIR_V0 = NNI_PROTO(1, 0), - NNI_PROTO_PAIR_V1 = NNI_PROTO(1, 1), - NNI_PROTO_PUB_V0 = NNI_PROTO(2, 0), - NNI_PROTO_SUB_V0 = NNI_PROTO(2, 1), - NNI_PROTO_REQ_V0 = NNI_PROTO(3, 0), - NNI_PROTO_REP_V0 = NNI_PROTO(3, 1), - NNI_PROTO_PUSH_V0 = NNI_PROTO(5, 0), - NNI_PROTO_PULL_V0 = NNI_PROTO(5, 1), - NNI_PROTO_SURVEYOR_V0 = NNI_PROTO(6, 2), - NNI_PROTO_RESPONDENT_V0 = NNI_PROTO(6, 3), - NNI_PROTO_BUS_V0 = NNI_PROTO(7, 0), - NNI_PROTO_STAR_V0 = NNI_PROTO(100, 0), -}; + +// Protocol major numbers. This is here for documentation only, and +// to serve as a "registry" for managing new protocol numbers. Consider +// updating this table when adding new protocols. +// +// Protocol Maj Min Name Notes +// ------------------------------------------- +// NONE 0 0 reserved +// PAIRv0 1 0 pair +// PAIRv1 1 1 pair1 nng only, experimental +// PUBv0 2 0 pub +// SUBv0 2 1 sub +// REQv0 3 0 req +// REPv0 3 1 rep +// PUSHv0 5 0 push +// PULLv0 5 1 pull +// SURVEYORv0 6 2 surveyor minors 0 & 1 retired +// RESPONDENTv0 6 3 respondent +// BUSv0 7 0 bus +// STARv0 100 0 star mangos only, experimental extern int nni_proto_sys_init(void); extern void nni_proto_sys_fini(void); diff --git a/src/core/socket.c b/src/core/socket.c index c9b70ccb..4a84b639 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -102,7 +102,6 @@ static int nni_sock_getopt_fd(nni_sock *s, int flag, void *val, size_t *szp) { int rv; - uint32_t flags; nni_notifyfd *fd; nni_msgq * mq; nni_msgq_cb cb; diff --git a/src/core/transport.c b/src/core/transport.c index 6dcf4538..af9c93fb 100644 --- a/src/core/transport.c +++ b/src/core/transport.c @@ -10,6 +10,9 @@ #include "core/nng_impl.h" #include "transport/inproc/inproc.h" +#include "transport/ipc/ipc.h" +#include "transport/tcp/tcp.h" +#include "transport/zerotier/zerotier.h" #include <stdio.h> #include <string.h> @@ -51,6 +54,11 @@ nni_tran_register(const nni_tran *tran) nni_mtx_lock(&nni_tran_lk); // Check to see if the transport is already registered... NNI_LIST_FOREACH (&nni_tran_list, t) { + if (tran->tran_init == t->t_tran.tran_init) { + nni_mtx_unlock(&nni_tran_lk); + // Same transport, duplicate registration. + return (0); + } if (strcmp(tran->tran_scheme, t->t_tran.tran_scheme) == 0) { nni_mtx_unlock(&nni_tran_lk); return (NNG_ESTATE); @@ -129,20 +137,40 @@ nni_tran_chkopt(const char *name, const void *v, size_t sz) // nni_tran_sys_init initializes the entire transport subsystem, including // each individual transport. + +typedef int (*nni_tran_ctor)(void); + +static nni_tran_ctor nni_tran_ctors[] = { +#ifdef NNG_HAVE_INPROC + nng_inproc_register, +#endif +#ifdef NNG_HAVE_IPC + nng_ipc_register, +#endif +#ifdef NNG_HAVE_TCP + nng_tcp_register, +#endif +#ifdef NNI_HAVE_ZEROTIER + nng_zt_register, +#endif + NULL, +}; + int nni_tran_sys_init(void) { - int rv; + int i; nni_tran_inited = 1; NNI_LIST_INIT(&nni_tran_list, nni_transport, t_node); nni_mtx_init(&nni_tran_lk); - if (((rv = nng_inproc_register()) != 0) || - ((rv = nni_tran_register(&nni_ipc_tran)) != 0) || - ((rv = nni_tran_register(&nni_tcp_tran)) != 0)) { - nni_tran_sys_fini(); - return (rv); + for (i = 0; nni_tran_ctors[i] != NULL; i++) { + int rv; + if ((rv = (nni_tran_ctors[i])()) != 0) { + nni_tran_sys_fini(); + return (rv); + } } return (0); } @@ -337,33 +337,6 @@ enum nng_flag_enum { NNG_FLAG_NONBLOCK = 2, // Non-blocking operations. }; -// Builtin protocol socket constructors. -NNG_DECL int nng_bus0_open(nng_socket *); -NNG_DECL int nng_pair0_open(nng_socket *); -NNG_DECL int nng_pair1_open(nng_socket *); -NNG_DECL int nng_pub0_open(nng_socket *); -NNG_DECL int nng_sub0_open(nng_socket *); -NNG_DECL int nng_push0_open(nng_socket *); -NNG_DECL int nng_pull0_open(nng_socket *); -NNG_DECL int nng_req0_open(nng_socket *); -NNG_DECL int nng_rep0_open(nng_socket *); -NNG_DECL int nng_surveyor0_open(nng_socket *); -NNG_DECL int nng_respondent0_open(nng_socket *); - -// Default versions. These provide compile time defaults; note that -// the actual protocols are baked into the binary; this should avoid -// suprising. Choosing a new protocol should be done explicitly. -#define nng_bus_open nng_bus0_open -#define nng_pair_open nng_pair1_open -#define nng_pub_open nng_pub0_open -#define nng_sub_open nng_sub0_open -#define nng_push_open nng_push0_open -#define nng_pull_open nng_pull0_open -#define nng_req_open nng_req0_open -#define nng_rep_open nng_rep0_open -#define nng_surveyor_open nng_surveyor0_open -#define nng_respondent_open nng_respondent0_open - // Options. #define NNG_OPT_SOCKNAME "socket-name" #define NNG_OPT_DOMAIN "compat:domain" // legacy compat only @@ -385,15 +358,6 @@ NNG_DECL int nng_respondent0_open(nng_socket *); #define NNG_OPT_RECONNMINT "reconnect-time-min" #define NNG_OPT_RECONNMAXT "reconnect-time-max" -#define NNG_OPT_PAIR1_POLY "pair1:polyamorous" - -#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe" -#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe" - -#define NNG_OPT_REQ_RESENDTIME "req:resend-time" - -#define NNG_OPT_SURVEYOR_SURVEYTIME "surveyor:survey-time" - // XXX: TBD: priorities, socket names, ipv4only // Statistics. These are for informational purposes only, and subject diff --git a/src/nng_compat.c b/src/nng_compat.c index f8a7ceb0..482834a1 100644 --- a/src/nng_compat.c +++ b/src/nng_compat.c @@ -10,6 +10,16 @@ #include "nng_compat.h" #include "nng.h" +#include "protocol/bus0/bus.h" +#include "protocol/pair0/pair.h" +#include "protocol/pipeline0/pull.h" +#include "protocol/pipeline0/push.h" +#include "protocol/pubsub0/pub.h" +#include "protocol/pubsub0/sub.h" +#include "protocol/reqrep0/rep.h" +#include "protocol/reqrep0/req.h" +#include "protocol/survey0/respond.h" +#include "protocol/survey0/survey.h" #include <stdio.h> #include <string.h> @@ -92,17 +102,37 @@ static const struct { uint16_t p_id; int (*p_open)(nng_socket *); } nn_protocols[] = { - // clang-format off +// clang-format off +#ifdef NNG_HAVE_BUS0 { NN_BUS, nng_bus0_open }, +#endif +#ifdef NNG_HAVE_PAIR0 { NN_PAIR, nng_pair0_open }, +#endif +#ifdef NNG_HAVE_PUSH0 { NN_PUSH, nng_push0_open }, +#endif +#ifdef NNG_HAVE_PULL0 { NN_PULL, nng_pull0_open }, +#endif +#ifdef NNG_HAVE_PUB0 { NN_PUB, nng_pub0_open }, +#endif +#ifdef NNG_HAVE_SUB0 { NN_SUB, nng_sub0_open }, +#endif +#ifdef NNG_HAVE_REQ0 { NN_REQ, nng_req0_open }, +#endif +#ifdef NNG_HAVE_REP0 { NN_REP, nng_rep0_open }, +#endif +#ifdef NNG_HAVE_SURVEYOR0 { NN_SURVEYOR, nng_surveyor0_open }, +#endif +#ifdef NNG_HAVE_RESPONDENT0 { NN_RESPONDENT, nng_respondent0_open }, +#endif { 0, NULL }, // clang-format on }; @@ -115,7 +145,7 @@ nn_socket(int domain, int protocol) int i; if ((domain != AF_SP) && (domain != AF_SP_RAW)) { - nn_seterror(EAFNOSUPPORT); + errno = EAFNOSUPPORT; return (-1); } @@ -125,7 +155,7 @@ nn_socket(int domain, int protocol) } } if (nn_protocols[i].p_open == NULL) { - nn_seterror(ENOTSUP); + errno = ENOTSUP; return (-1); } diff --git a/src/protocol/bus0/CMakeLists.txt b/src/protocol/bus0/CMakeLists.txt new file mode 100644 index 00000000..5071054a --- /dev/null +++ b/src/protocol/bus0/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Bus protocol + +if (NNG_PROTO_BUS0) + set(BUS0_SOURCES protocol/bus0/bus.c protocol/bus0/bus.h) + install(FILES bus.h DESTINATION include/nng/protocol/bus0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${BUS0_SOURCES} PARENT_SCOPE) diff --git a/src/protocol/bus/bus.c b/src/protocol/bus0/bus.c index 6ec0066b..3e15000b 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus0/bus.c @@ -12,31 +12,36 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/bus0/bus.h" // Bus protocol. The BUS protocol, each peer sends a message to its peers. // However, bus protocols do not "forward" (absent a device). So in order // for each participant to receive the message, each sender must be connected // to every other node in the network (full mesh). -typedef struct bus_pipe bus_pipe; -typedef struct bus_sock bus_sock; +#ifndef NNI_PROTO_BUS_V0 +#define NNI_PROTO_BUS_V0 NNI_PROTO(7, 0) +#endif -static void bus_sock_getq(bus_sock *); -static void bus_sock_send(void *, nni_aio *); -static void bus_sock_recv(void *, nni_aio *); +typedef struct bus0_pipe bus0_pipe; +typedef struct bus0_sock bus0_sock; -static void bus_pipe_getq(bus_pipe *); -static void bus_pipe_send(bus_pipe *); -static void bus_pipe_recv(bus_pipe *); +static void bus0_sock_getq(bus0_sock *); +static void bus0_sock_send(void *, nni_aio *); +static void bus0_sock_recv(void *, nni_aio *); -static void bus_sock_getq_cb(void *); -static void bus_pipe_getq_cb(void *); -static void bus_pipe_send_cb(void *); -static void bus_pipe_recv_cb(void *); -static void bus_pipe_putq_cb(void *); +static void bus0_pipe_getq(bus0_pipe *); +static void bus0_pipe_send(bus0_pipe *); +static void bus0_pipe_recv(bus0_pipe *); -// A bus_sock is our per-socket protocol private structure. -struct bus_sock { +static void bus0_sock_getq_cb(void *); +static void bus0_pipe_getq_cb(void *); +static void bus0_pipe_send_cb(void *); +static void bus0_pipe_recv_cb(void *); +static void bus0_pipe_putq_cb(void *); + +// bus0_sock is our per-socket protocol private structure. +struct bus0_sock { int raw; nni_aio * aio_getq; nni_list pipes; @@ -45,10 +50,10 @@ struct bus_sock { nni_msgq *urq; }; -// A bus_pipe is our per-pipe protocol private structure. -struct bus_pipe { +// bus0_pipe is our per-pipe protocol private structure. +struct bus0_pipe { nni_pipe * npipe; - bus_sock * psock; + bus0_sock * psock; nni_msgq * sendq; nni_list_node node; nni_aio * aio_getq; @@ -59,9 +64,9 @@ struct bus_pipe { }; static void -bus_sock_fini(void *arg) +bus0_sock_fini(void *arg) { - bus_sock *s = arg; + bus0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -70,18 +75,18 @@ bus_sock_fini(void *arg) } static int -bus_sock_init(void **sp, nni_sock *nsock) +bus0_sock_init(void **sp, nni_sock *nsock) { - bus_sock *s; - int rv; + bus0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - NNI_LIST_INIT(&s->pipes, bus_pipe, node); + NNI_LIST_INIT(&s->pipes, bus0_pipe, node); nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, bus_sock_getq_cb, s)) != 0) { - bus_sock_fini(s); + if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) { + bus0_sock_fini(s); return (rv); } s->raw = 0; @@ -93,25 +98,25 @@ bus_sock_init(void **sp, nni_sock *nsock) } static void -bus_sock_open(void *arg) +bus0_sock_open(void *arg) { - bus_sock *s = arg; + bus0_sock *s = arg; - bus_sock_getq(s); + bus0_sock_getq(s); } static void -bus_sock_close(void *arg) +bus0_sock_close(void *arg) { - bus_sock *s = arg; + bus0_sock *s = arg; nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -bus_pipe_fini(void *arg) +bus0_pipe_fini(void *arg) { - bus_pipe *p = arg; + bus0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); @@ -123,10 +128,10 @@ bus_pipe_fini(void *arg) } static int -bus_pipe_init(void **pp, nni_pipe *npipe, void *s) +bus0_pipe_init(void **pp, nni_pipe *npipe, void *s) { - bus_pipe *p; - int rv; + bus0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); @@ -134,11 +139,11 @@ bus_pipe_init(void **pp, nni_pipe *npipe, void *s) NNI_LIST_NODE_INIT(&p->node); nni_mtx_init(&p->mtx); if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, bus_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, bus_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, bus_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, bus_pipe_putq_cb, p)) != 0)) { - bus_pipe_fini(p); + ((rv = nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) { + bus0_pipe_fini(p); return (rv); } @@ -149,26 +154,26 @@ bus_pipe_init(void **pp, nni_pipe *npipe, void *s) } static int -bus_pipe_start(void *arg) +bus0_pipe_start(void *arg) { - bus_pipe *p = arg; - bus_sock *s = p->psock; + bus0_pipe *p = arg; + bus0_sock *s = p->psock; nni_mtx_lock(&s->mtx); nni_list_append(&s->pipes, p); nni_mtx_unlock(&s->mtx); - bus_pipe_recv(p); - bus_pipe_getq(p); + bus0_pipe_recv(p); + bus0_pipe_getq(p); return (0); } static void -bus_pipe_stop(void *arg) +bus0_pipe_stop(void *arg) { - bus_pipe *p = arg; - bus_sock *s = p->psock; + bus0_pipe *p = arg; + bus0_sock *s = p->psock; nni_msgq_close(p->sendq); @@ -185,9 +190,9 @@ bus_pipe_stop(void *arg) } static void -bus_pipe_getq_cb(void *arg) +bus0_pipe_getq_cb(void *arg) { - bus_pipe *p = arg; + bus0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { // closed? @@ -201,9 +206,9 @@ bus_pipe_getq_cb(void *arg) } static void -bus_pipe_send_cb(void *arg) +bus0_pipe_send_cb(void *arg) { - bus_pipe *p = arg; + bus0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { // closed? @@ -213,15 +218,15 @@ bus_pipe_send_cb(void *arg) return; } - bus_pipe_getq(p); + bus0_pipe_getq(p); } static void -bus_pipe_recv_cb(void *arg) +bus0_pipe_recv_cb(void *arg) { - bus_pipe *p = arg; - bus_sock *s = p->psock; - nni_msg * msg; + bus0_pipe *p = arg; + bus0_sock *s = p->psock; + nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->npipe); @@ -243,9 +248,9 @@ bus_pipe_recv_cb(void *arg) } static void -bus_pipe_putq_cb(void *arg) +bus0_pipe_putq_cb(void *arg) { - bus_pipe *p = arg; + bus0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -255,18 +260,18 @@ bus_pipe_putq_cb(void *arg) } // Wait for another recv. - bus_pipe_recv(p); + bus0_pipe_recv(p); } static void -bus_sock_getq_cb(void *arg) +bus0_sock_getq_cb(void *arg) { - bus_sock *s = arg; - bus_pipe *p; - bus_pipe *lastp; - nni_msg * msg; - nni_msg * dup; - uint32_t sender; + bus0_sock *s = arg; + bus0_pipe *p; + bus0_pipe *lastp; + nni_msg * msg; + nni_msg * dup; + uint32_t sender; if (nni_aio_result(s->aio_getq) != 0) { return; @@ -307,95 +312,95 @@ bus_sock_getq_cb(void *arg) nni_msg_free(msg); } - bus_sock_getq(s); + bus0_sock_getq(s); } static void -bus_sock_getq(bus_sock *s) +bus0_sock_getq(bus0_sock *s) { nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -bus_pipe_getq(bus_pipe *p) +bus0_pipe_getq(bus0_pipe *p) { nni_msgq_aio_get(p->sendq, p->aio_getq); } static void -bus_pipe_recv(bus_pipe *p) +bus0_pipe_recv(bus0_pipe *p) { nni_pipe_recv(p->npipe, p->aio_recv); } static int -bus_sock_setopt_raw(void *arg, const void *buf, size_t sz) +bus0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - bus_sock *s = arg; + bus0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -bus_sock_getopt_raw(void *arg, void *buf, size_t *szp) +bus0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - bus_sock *s = arg; + bus0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -bus_sock_send(void *arg, nni_aio *aio) +bus0_sock_send(void *arg, nni_aio *aio) { - bus_sock *s = arg; + bus0_sock *s = arg; nni_msgq_aio_put(s->uwq, aio); } static void -bus_sock_recv(void *arg, nni_aio *aio) +bus0_sock_recv(void *arg, nni_aio *aio) { - bus_sock *s = arg; + bus0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } -static nni_proto_pipe_ops bus_pipe_ops = { - .pipe_init = bus_pipe_init, - .pipe_fini = bus_pipe_fini, - .pipe_start = bus_pipe_start, - .pipe_stop = bus_pipe_stop, +static nni_proto_pipe_ops bus0_pipe_ops = { + .pipe_init = bus0_pipe_init, + .pipe_fini = bus0_pipe_fini, + .pipe_start = bus0_pipe_start, + .pipe_stop = bus0_pipe_stop, }; -static nni_proto_sock_option bus_sock_options[] = { +static nni_proto_sock_option bus0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = bus_sock_getopt_raw, - .pso_setopt = bus_sock_setopt_raw, + .pso_getopt = bus0_sock_getopt_raw, + .pso_setopt = bus0_sock_setopt_raw, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops bus_sock_ops = { - .sock_init = bus_sock_init, - .sock_fini = bus_sock_fini, - .sock_open = bus_sock_open, - .sock_close = bus_sock_close, - .sock_send = bus_sock_send, - .sock_recv = bus_sock_recv, - .sock_options = bus_sock_options, +static nni_proto_sock_ops bus0_sock_ops = { + .sock_init = bus0_sock_init, + .sock_fini = bus0_sock_fini, + .sock_open = bus0_sock_open, + .sock_close = bus0_sock_close, + .sock_send = bus0_sock_send, + .sock_recv = bus0_sock_recv, + .sock_options = bus0_sock_options, }; -static nni_proto bus_proto = { +static nni_proto bus0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_BUS_V0, "bus" }, .proto_peer = { NNI_PROTO_BUS_V0, "bus" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &bus_sock_ops, - .proto_pipe_ops = &bus_pipe_ops, + .proto_sock_ops = &bus0_sock_ops, + .proto_pipe_ops = &bus0_pipe_ops, }; int nng_bus0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &bus_proto)); + return (nni_proto_open(sidp, &bus0_proto)); } diff --git a/src/protocol/bus0/bus.h b/src/protocol/bus0/bus.h new file mode 100644 index 00000000..0ef3d391 --- /dev/null +++ b/src/protocol/bus0/bus.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_BUS0_BUS_H +#define NNG_PROTOCOL_BUS0_BUS_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_bus0_open(nng_socket *); + +#ifndef nng_bus_open +#define nng_bus_open nng_bus0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_BUS0_BUS_H diff --git a/src/protocol/pair0/CMakeLists.txt b/src/protocol/pair0/CMakeLists.txt new file mode 100644 index 00000000..68e7ad34 --- /dev/null +++ b/src/protocol/pair0/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# PAIRv0 protocol + +if (NNG_PROTO_PAIR0) + set(PAIR0_SOURCES protocol/pair0/pair.c protocol/pair0/pair.h) + install(FILES pair.h DESTINATION include/nng/protocol/pair0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PAIR0_SOURCES} PARENT_SCOPE) diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair0/pair.c index 93cd1497..bac405b8 100644 --- a/src/protocol/pair/pair_v0.c +++ b/src/protocol/pair0/pair.c @@ -17,6 +17,10 @@ // While a peer is connected to the server, all other peer connection // attempts are discarded. +#ifndef NNI_PROTO_PAIR_V0 +#define NNI_PROTO_PAIR_V0 NNI_PROTO(1, 0) +#endif + typedef struct pair0_pipe pair0_pipe; typedef struct pair0_sock pair0_sock; diff --git a/src/protocol/pair0/pair.h b/src/protocol/pair0/pair.h new file mode 100644 index 00000000..6828c921 --- /dev/null +++ b/src/protocol/pair0/pair.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PAIR0_PAIR_H +#define NNG_PROTOCOL_PAIR0_PAIR_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pair0_open(nng_socket *); + +#ifndef nng_pair_open +#define nng_pair_open nng_pair0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PAIR0_PAIR_H diff --git a/src/protocol/pair1/CMakeLists.txt b/src/protocol/pair1/CMakeLists.txt new file mode 100644 index 00000000..f35d6959 --- /dev/null +++ b/src/protocol/pair1/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# PAIRv1 protocol + +if (NNG_PROTO_PAIR1) + set(PAIR1_SOURCES protocol/pair1/pair.c protocol/pair1/pair.h) + install(FILES pair.h DESTINATION include/nng/protocol/pair1) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PAIR1_SOURCES} PARENT_SCOPE) diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair1/pair.c index e14d06d5..3f6f63fc 100644 --- a/src/protocol/pair/pair_v1.c +++ b/src/protocol/pair1/pair.c @@ -13,10 +13,16 @@ #include "core/nng_impl.h" +#include "protocol/pair1/pair.h" + // Pair protocol. The PAIRv1 protocol is a simple 1:1 messaging pattern, // usually, but it can support a polyamorous mode where a single server can // communicate with multiple partners. +#ifndef NNI_PROTO_PAIR_V1 +#define NNI_PROTO_PAIR_V1 NNI_PROTO(1, 1) +#endif + typedef struct pair1_pipe pair1_pipe; typedef struct pair1_sock pair1_sock; diff --git a/src/protocol/pair1/pair.h b/src/protocol/pair1/pair.h new file mode 100644 index 00000000..bc519d9f --- /dev/null +++ b/src/protocol/pair1/pair.h @@ -0,0 +1,30 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PAIR1_PAIR_H +#define NNG_PROTOCOL_PAIR1_PAIR_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pair1_open(nng_socket *); + +#ifndef nng_pair_open +#define nng_pair_open nng_pair1_open +#endif + +#define NNG_OPT_PAIR1_POLY "pair1:polyamorous" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PAIR1_PAIR_H diff --git a/src/protocol/pipeline0/CMakeLists.txt b/src/protocol/pipeline0/CMakeLists.txt new file mode 100644 index 00000000..6153c5a7 --- /dev/null +++ b/src/protocol/pipeline0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Pub/Sub protocol + +if (NNG_PROTO_PUSH0) + set(PUSH0_SOURCES protocol/pipeline0/push.c protocol/pipeline0/push.h) + install(FILES push.h DESTINATION include/nng/protocol/pipeline0) +endif() + +if (NNG_PROTO_PULL0) + set(PULL0_SOURCES protocol/pipeline0/pull.c protocol/pipeline0/pull.h) + install(FILES pull.h DESTINATION include/nng/protocol/pipeline0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PUSH0_SOURCES} ${PULL0_SOURCES} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline0/pull.c index 9685f0a1..8c16cb17 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline0/pull.c @@ -15,31 +15,39 @@ // Pull protocol. The PULL protocol is the "read" side of a pipeline. -typedef struct pull_pipe pull_pipe; -typedef struct pull_sock pull_sock; +#ifndef NNI_PROTO_PULL_V0 +#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1) +#endif -static void pull_putq_cb(void *); -static void pull_recv_cb(void *); -static void pull_putq(pull_pipe *, nni_msg *); +#ifndef NNI_PROTO_PUSH_V0 +#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0) +#endif -// A pull_sock is our per-socket protocol private structure. -struct pull_sock { +typedef struct pull0_pipe pull0_pipe; +typedef struct pull0_sock pull0_sock; + +static void pull0_putq_cb(void *); +static void pull0_recv_cb(void *); +static void pull0_putq(pull0_pipe *, nni_msg *); + +// pull0_sock is our per-socket protocol private structure. +struct pull0_sock { nni_msgq *urq; int raw; }; -// A pull_pipe is our per-pipe protocol private structure. -struct pull_pipe { - nni_pipe * pipe; - pull_sock *pull; - nni_aio * putq_aio; - nni_aio * recv_aio; +// pull0_pipe is our per-pipe protocol private structure. +struct pull0_pipe { + nni_pipe * pipe; + pull0_sock *pull; + nni_aio * putq_aio; + nni_aio * recv_aio; }; static int -pull_sock_init(void **sp, nni_sock *sock) +pull0_sock_init(void **sp, nni_sock *sock) { - pull_sock *s; + pull0_sock *s; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); @@ -52,17 +60,17 @@ pull_sock_init(void **sp, nni_sock *sock) } static void -pull_sock_fini(void *arg) +pull0_sock_fini(void *arg) { - pull_sock *s = arg; + pull0_sock *s = arg; NNI_FREE_STRUCT(s); } static void -pull_pipe_fini(void *arg) +pull0_pipe_fini(void *arg) { - pull_pipe *p = arg; + pull0_pipe *p = arg; nni_aio_fini(p->putq_aio); nni_aio_fini(p->recv_aio); @@ -70,17 +78,17 @@ pull_pipe_fini(void *arg) } static int -pull_pipe_init(void **pp, nni_pipe *pipe, void *s) +pull0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - pull_pipe *p; - int rv; + pull0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&p->putq_aio, pull_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->recv_aio, pull_recv_cb, p)) != 0)) { - pull_pipe_fini(p); + if (((rv = nni_aio_init(&p->putq_aio, pull0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->recv_aio, pull0_recv_cb, p)) != 0)) { + pull0_pipe_fini(p); return (rv); } @@ -91,9 +99,9 @@ pull_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -pull_pipe_start(void *arg) +pull0_pipe_start(void *arg) { - pull_pipe *p = arg; + pull0_pipe *p = arg; // Start the pending pull... nni_pipe_recv(p->pipe, p->recv_aio); @@ -102,20 +110,20 @@ pull_pipe_start(void *arg) } static void -pull_pipe_stop(void *arg) +pull0_pipe_stop(void *arg) { - pull_pipe *p = arg; + pull0_pipe *p = arg; nni_aio_stop(p->putq_aio); nni_aio_stop(p->recv_aio); } static void -pull_recv_cb(void *arg) +pull0_recv_cb(void *arg) { - pull_pipe *p = arg; - nni_aio * aio = p->recv_aio; - nni_msg * msg; + pull0_pipe *p = arg; + nni_aio * aio = p->recv_aio; + nni_msg * msg; if (nni_aio_result(aio) != 0) { // Failed to get a message, probably the pipe is closed. @@ -127,14 +135,14 @@ pull_recv_cb(void *arg) msg = nni_aio_get_msg(aio); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); nni_aio_set_msg(aio, NULL); - pull_putq(p, msg); + pull0_putq(p, msg); } static void -pull_putq_cb(void *arg) +pull0_putq_cb(void *arg) { - pull_pipe *p = arg; - nni_aio * aio = p->putq_aio; + pull0_pipe *p = arg; + nni_aio * aio = p->putq_aio; if (nni_aio_result(aio) != 0) { // If we failed to put, probably NNG_ECLOSED, nothing else @@ -148,11 +156,11 @@ pull_putq_cb(void *arg) nni_pipe_recv(p->pipe, p->recv_aio); } -// nni_pull_putq schedules a put operation to the user socket (sendup). +// pull0_putq schedules a put operation to the user socket (sendup). static void -pull_putq(pull_pipe *p, nni_msg *msg) +pull0_putq(pull0_pipe *p, nni_msg *msg) { - pull_sock *s = p->pull; + pull0_sock *s = p->pull; nni_aio_set_msg(p->putq_aio, msg); @@ -160,83 +168,83 @@ pull_putq(pull_pipe *p, nni_msg *msg) } static void -pull_sock_open(void *arg) +pull0_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void -pull_sock_close(void *arg) +pull0_sock_close(void *arg) { NNI_ARG_UNUSED(arg); } static int -pull_sock_setopt_raw(void *arg, const void *buf, size_t sz) +pull0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - pull_sock *s = arg; + pull0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -pull_sock_getopt_raw(void *arg, void *buf, size_t *szp) +pull0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - pull_sock *s = arg; + pull0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -pull_sock_send(void *arg, nni_aio *aio) +pull0_sock_send(void *arg, nni_aio *aio) { nni_aio_finish_error(aio, NNG_ENOTSUP); } static void -pull_sock_recv(void *arg, nni_aio *aio) +pull0_sock_recv(void *arg, nni_aio *aio) { - pull_sock *s = arg; + pull0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } -static nni_proto_pipe_ops pull_pipe_ops = { - .pipe_init = pull_pipe_init, - .pipe_fini = pull_pipe_fini, - .pipe_start = pull_pipe_start, - .pipe_stop = pull_pipe_stop, +static nni_proto_pipe_ops pull0_pipe_ops = { + .pipe_init = pull0_pipe_init, + .pipe_fini = pull0_pipe_fini, + .pipe_start = pull0_pipe_start, + .pipe_stop = pull0_pipe_stop, }; -static nni_proto_sock_option pull_sock_options[] = { +static nni_proto_sock_option pull0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = pull_sock_getopt_raw, - .pso_setopt = pull_sock_setopt_raw, + .pso_getopt = pull0_sock_getopt_raw, + .pso_setopt = pull0_sock_setopt_raw, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops pull_sock_ops = { - .sock_init = pull_sock_init, - .sock_fini = pull_sock_fini, - .sock_open = pull_sock_open, - .sock_close = pull_sock_close, - .sock_send = pull_sock_send, - .sock_recv = pull_sock_recv, - .sock_options = pull_sock_options, +static nni_proto_sock_ops pull0_sock_ops = { + .sock_init = pull0_sock_init, + .sock_fini = pull0_sock_fini, + .sock_open = pull0_sock_open, + .sock_close = pull0_sock_close, + .sock_send = pull0_sock_send, + .sock_recv = pull0_sock_recv, + .sock_options = pull0_sock_options, }; -static nni_proto pull_proto = { +static nni_proto pull0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_PULL_V0, "pull" }, .proto_peer = { NNI_PROTO_PUSH_V0, "push" }, .proto_flags = NNI_PROTO_FLAG_RCV, - .proto_pipe_ops = &pull_pipe_ops, - .proto_sock_ops = &pull_sock_ops, + .proto_pipe_ops = &pull0_pipe_ops, + .proto_sock_ops = &pull0_sock_ops, }; int nng_pull0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &pull_proto)); + return (nni_proto_open(sidp, &pull0_proto)); } diff --git a/src/protocol/pipeline0/pull.h b/src/protocol/pipeline0/pull.h new file mode 100644 index 00000000..75bded03 --- /dev/null +++ b/src/protocol/pipeline0/pull.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PIPELINE0_PULL_H +#define NNG_PROTOCOL_PIPELINE0_PULL_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pull0_open(nng_socket *); + +#ifndef nng_pull_open +#define nng_pull_open nng_pull0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PIPELINE0_PULL_H diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline0/push.c index 9ff74558..3dd83fe0 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline0/push.c @@ -17,23 +17,31 @@ // Push distributes fairly, or tries to, by giving messages in round-robin // order. -typedef struct push_pipe push_pipe; -typedef struct push_sock push_sock; +#ifndef NNI_PROTO_PULL_V0 +#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1) +#endif -static void push_send_cb(void *); -static void push_recv_cb(void *); -static void push_getq_cb(void *); +#ifndef NNI_PROTO_PUSH_V0 +#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0) +#endif -// An nni_push_sock is our per-socket protocol private structure. -struct push_sock { +typedef struct push0_pipe push0_pipe; +typedef struct push0_sock push0_sock; + +static void push0_send_cb(void *); +static void push0_recv_cb(void *); +static void push0_getq_cb(void *); + +// push0_sock is our per-socket protocol private structure. +struct push0_sock { nni_msgq *uwq; int raw; }; -// An nni_push_pipe is our per-pipe protocol private structure. -struct push_pipe { +// push0_pipe is our per-pipe protocol private structure. +struct push0_pipe { nni_pipe * pipe; - push_sock * push; + push0_sock * push; nni_list_node node; nni_aio *aio_recv; @@ -42,9 +50,9 @@ struct push_pipe { }; static int -push_sock_init(void **sp, nni_sock *sock) +push0_sock_init(void **sp, nni_sock *sock) { - push_sock *s; + push0_sock *s; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); @@ -56,29 +64,29 @@ push_sock_init(void **sp, nni_sock *sock) } static void -push_sock_fini(void *arg) +push0_sock_fini(void *arg) { - push_sock *s = arg; + push0_sock *s = arg; NNI_FREE_STRUCT(s); } static void -push_sock_open(void *arg) +push0_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void -push_sock_close(void *arg) +push0_sock_close(void *arg) { NNI_ARG_UNUSED(arg); } static void -push_pipe_fini(void *arg) +push0_pipe_fini(void *arg) { - push_pipe *p = arg; + push0_pipe *p = arg; nni_aio_fini(p->aio_recv); nni_aio_fini(p->aio_send); @@ -87,18 +95,18 @@ push_pipe_fini(void *arg) } static int -push_pipe_init(void **pp, nni_pipe *pipe, void *s) +push0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - push_pipe *p; - int rv; + push0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&p->aio_recv, push_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, push_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, push_getq_cb, p)) != 0)) { - push_pipe_fini(p); + if (((rv = nni_aio_init(&p->aio_recv, push0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, push0_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, push0_getq_cb, p)) != 0)) { + push0_pipe_fini(p); return (rv); } NNI_LIST_NODE_INIT(&p->node); @@ -109,10 +117,10 @@ push_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -push_pipe_start(void *arg) +push0_pipe_start(void *arg) { - push_pipe *p = arg; - push_sock *s = p->push; + push0_pipe *p = arg; + push0_sock *s = p->push; if (nni_pipe_peer(p->pipe) != NNI_PROTO_PULL_V0) { return (NNG_EPROTO); @@ -129,9 +137,9 @@ push_pipe_start(void *arg) } static void -push_pipe_stop(void *arg) +push0_pipe_stop(void *arg) { - push_pipe *p = arg; + push0_pipe *p = arg; nni_aio_stop(p->aio_recv); nni_aio_stop(p->aio_send); @@ -139,9 +147,9 @@ push_pipe_stop(void *arg) } static void -push_recv_cb(void *arg) +push0_recv_cb(void *arg) { - push_pipe *p = arg; + push0_pipe *p = arg; // We normally expect to receive an error. If a pipe actually // sends us data, we just discard it. @@ -155,10 +163,10 @@ push_recv_cb(void *arg) } static void -push_send_cb(void *arg) +push0_send_cb(void *arg) { - push_pipe *p = arg; - push_sock *s = p->push; + push0_pipe *p = arg; + push0_sock *s = p->push; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -171,10 +179,10 @@ push_send_cb(void *arg) } static void -push_getq_cb(void *arg) +push0_getq_cb(void *arg) { - push_pipe *p = arg; - nni_aio * aio = p->aio_getq; + push0_pipe *p = arg; + nni_aio * aio = p->aio_getq; if (nni_aio_result(aio) != 0) { // If the socket is closing, nothing else we can do. @@ -189,71 +197,71 @@ push_getq_cb(void *arg) } static int -push_sock_setopt_raw(void *arg, const void *buf, size_t sz) +push0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - push_sock *s = arg; + push0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -push_sock_getopt_raw(void *arg, void *buf, size_t *szp) +push0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - push_sock *s = arg; + push0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -push_sock_send(void *arg, nni_aio *aio) +push0_sock_send(void *arg, nni_aio *aio) { - push_sock *s = arg; + push0_sock *s = arg; nni_msgq_aio_put(s->uwq, aio); } static void -push_sock_recv(void *arg, nni_aio *aio) +push0_sock_recv(void *arg, nni_aio *aio) { nni_aio_finish_error(aio, NNG_ENOTSUP); } -static nni_proto_pipe_ops push_pipe_ops = { - .pipe_init = push_pipe_init, - .pipe_fini = push_pipe_fini, - .pipe_start = push_pipe_start, - .pipe_stop = push_pipe_stop, +static nni_proto_pipe_ops push0_pipe_ops = { + .pipe_init = push0_pipe_init, + .pipe_fini = push0_pipe_fini, + .pipe_start = push0_pipe_start, + .pipe_stop = push0_pipe_stop, }; -static nni_proto_sock_option push_sock_options[] = { +static nni_proto_sock_option push0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = push_sock_getopt_raw, - .pso_setopt = push_sock_setopt_raw, + .pso_getopt = push0_sock_getopt_raw, + .pso_setopt = push0_sock_setopt_raw, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops push_sock_ops = { - .sock_init = push_sock_init, - .sock_fini = push_sock_fini, - .sock_open = push_sock_open, - .sock_close = push_sock_close, - .sock_options = push_sock_options, - .sock_send = push_sock_send, - .sock_recv = push_sock_recv, +static nni_proto_sock_ops push0_sock_ops = { + .sock_init = push0_sock_init, + .sock_fini = push0_sock_fini, + .sock_open = push0_sock_open, + .sock_close = push0_sock_close, + .sock_options = push0_sock_options, + .sock_send = push0_sock_send, + .sock_recv = push0_sock_recv, }; -static nni_proto push_proto = { +static nni_proto push0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_PUSH_V0, "push" }, .proto_peer = { NNI_PROTO_PULL_V0, "pull" }, .proto_flags = NNI_PROTO_FLAG_SND, - .proto_pipe_ops = &push_pipe_ops, - .proto_sock_ops = &push_sock_ops, + .proto_pipe_ops = &push0_pipe_ops, + .proto_sock_ops = &push0_sock_ops, }; int nng_push0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &push_proto)); + return (nni_proto_open(sidp, &push0_proto)); } diff --git a/src/protocol/pipeline0/push.h b/src/protocol/pipeline0/push.h new file mode 100644 index 00000000..c7303b92 --- /dev/null +++ b/src/protocol/pipeline0/push.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PIPELINE0_PUSH_H +#define NNG_PROTOCOL_PIPELINE0_PUSH_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_push0_open(nng_socket *); + +#ifndef nng_push_open +#define nng_push_open nng_push0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PIPELINE0_PUSH_H diff --git a/src/protocol/pubsub0/CMakeLists.txt b/src/protocol/pubsub0/CMakeLists.txt new file mode 100644 index 00000000..4edcbfae --- /dev/null +++ b/src/protocol/pubsub0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Pub/Sub protocol + +if (NNG_PROTO_PUB0) + set(PUB0_SOURCES protocol/pubsub0/pub.c protocol/pubsub0/pub.h) + install(FILES pub.h DESTINATION include/nng/protocol/pubsub0) +endif() + +if (NNG_PROTO_SUB0) + set(SUB0_SOURCES protocol/pubsub0/sub.c protocol/pubsub0/sub.h) + install(FILES sub.h DESTINATION include/nng/protocol/pubsub0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PUB0_SOURCES} ${SUB0_SOURCES} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub0/pub.c index 9e5cd67f..f4a33b77 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -12,24 +12,33 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/pubsub0/pub.h" // Publish protocol. The PUB protocol simply sends messages out, as // a broadcast. It has nothing more sophisticated because it does not // perform sender-side filtering. Its best effort delivery, so anything // that can't receive the message won't get one. -typedef struct pub_pipe pub_pipe; -typedef struct pub_sock pub_sock; +#ifndef NNI_PROTO_SUB_V0 +#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1) +#endif -static void pub_pipe_recv_cb(void *); -static void pub_pipe_send_cb(void *); -static void pub_pipe_getq_cb(void *); -static void pub_sock_getq_cb(void *); -static void pub_sock_fini(void *); -static void pub_pipe_fini(void *); +#ifndef NNI_PROTO_PUB_V0 +#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0) +#endif -// A pub_sock is our per-socket protocol private structure. -struct pub_sock { +typedef struct pub0_pipe pub0_pipe; +typedef struct pub0_sock pub0_sock; + +static void pub0_pipe_recv_cb(void *); +static void pub0_pipe_send_cb(void *); +static void pub0_pipe_getq_cb(void *); +static void pub0_sock_getq_cb(void *); +static void pub0_sock_fini(void *); +static void pub0_pipe_fini(void *); + +// pub0_sock is our per-socket protocol private structure. +struct pub0_sock { nni_msgq *uwq; int raw; nni_aio * aio_getq; @@ -37,10 +46,10 @@ struct pub_sock { nni_mtx mtx; }; -// A pub_pipe is our per-pipe protocol private structure. -struct pub_pipe { +// pub0_pipe is our per-pipe protocol private structure. +struct pub0_pipe { nni_pipe * pipe; - pub_sock * pub; + pub0_sock * pub; nni_msgq * sendq; nni_aio * aio_getq; nni_aio * aio_send; @@ -49,9 +58,9 @@ struct pub_pipe { }; static void -pub_sock_fini(void *arg) +pub0_sock_fini(void *arg) { - pub_sock *s = arg; + pub0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -60,22 +69,22 @@ pub_sock_fini(void *arg) } static int -pub_sock_init(void **sp, nni_sock *sock) +pub0_sock_init(void **sp, nni_sock *sock) { - pub_sock *s; - int rv; + pub0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, pub_sock_getq_cb, s)) != 0) { - pub_sock_fini(s); + if ((rv = nni_aio_init(&s->aio_getq, pub0_sock_getq_cb, s)) != 0) { + pub0_sock_fini(s); return (rv); } s->raw = 0; - NNI_LIST_INIT(&s->pipes, pub_pipe, node); + NNI_LIST_INIT(&s->pipes, pub0_pipe, node); s->uwq = nni_sock_sendq(sock); @@ -84,25 +93,25 @@ pub_sock_init(void **sp, nni_sock *sock) } static void -pub_sock_open(void *arg) +pub0_sock_open(void *arg) { - pub_sock *s = arg; + pub0_sock *s = arg; nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -pub_sock_close(void *arg) +pub0_sock_close(void *arg) { - pub_sock *s = arg; + pub0_sock *s = arg; nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -pub_pipe_fini(void *arg) +pub0_pipe_fini(void *arg) { - pub_pipe *p = arg; + pub0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); nni_aio_fini(p->aio_recv); @@ -111,10 +120,10 @@ pub_pipe_fini(void *arg) } static int -pub_pipe_init(void **pp, nni_pipe *pipe, void *s) +pub0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - pub_pipe *p; - int rv; + pub0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); @@ -122,11 +131,11 @@ pub_pipe_init(void **pp, nni_pipe *pipe, void *s) // XXX: consider making this depth tunable if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, pub_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, pub_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pub_pipe_recv_cb, p)) != 0)) { + ((rv = nni_aio_init(&p->aio_getq, pub0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) { - pub_pipe_fini(p); + pub0_pipe_fini(p); return (rv); } @@ -137,10 +146,10 @@ pub_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -pub_pipe_start(void *arg) +pub0_pipe_start(void *arg) { - pub_pipe *p = arg; - pub_sock *s = p->pub; + pub0_pipe *p = arg; + pub0_sock *s = p->pub; if (nni_pipe_peer(p->pipe) != NNI_PROTO_SUB_V0) { return (NNG_EPROTO); @@ -157,10 +166,10 @@ pub_pipe_start(void *arg) } static void -pub_pipe_stop(void *arg) +pub0_pipe_stop(void *arg) { - pub_pipe *p = arg; - pub_sock *s = p->pub; + pub0_pipe *p = arg; + pub0_sock *s = p->pub; nni_aio_stop(p->aio_getq); nni_aio_stop(p->aio_send); @@ -176,15 +185,15 @@ pub_pipe_stop(void *arg) } static void -pub_sock_getq_cb(void *arg) +pub0_sock_getq_cb(void *arg) { - pub_sock *s = arg; - nni_msgq *uwq = s->uwq; - nni_msg * msg, *dup; + pub0_sock *s = arg; + nni_msgq * uwq = s->uwq; + nni_msg * msg, *dup; - pub_pipe *p; - pub_pipe *last; - int rv; + pub0_pipe *p; + pub0_pipe *last; + int rv; if (nni_aio_result(s->aio_getq) != 0) { return; @@ -218,9 +227,9 @@ pub_sock_getq_cb(void *arg) } static void -pub_pipe_recv_cb(void *arg) +pub0_pipe_recv_cb(void *arg) { - pub_pipe *p = arg; + pub0_pipe *p = arg; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); @@ -233,9 +242,9 @@ pub_pipe_recv_cb(void *arg) } static void -pub_pipe_getq_cb(void *arg) +pub0_pipe_getq_cb(void *arg) { - pub_pipe *p = arg; + pub0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->pipe); @@ -249,9 +258,9 @@ pub_pipe_getq_cb(void *arg) } static void -pub_pipe_send_cb(void *arg) +pub0_pipe_send_cb(void *arg) { - pub_pipe *p = arg; + pub0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -265,71 +274,71 @@ pub_pipe_send_cb(void *arg) } static int -pub_sock_setopt_raw(void *arg, const void *buf, size_t sz) +pub0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - pub_sock *s = arg; + pub0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -pub_sock_getopt_raw(void *arg, void *buf, size_t *szp) +pub0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - pub_sock *s = arg; + pub0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -pub_sock_recv(void *arg, nni_aio *aio) +pub0_sock_recv(void *arg, nni_aio *aio) { nni_aio_finish_error(aio, NNG_ENOTSUP); } static void -pub_sock_send(void *arg, nni_aio *aio) +pub0_sock_send(void *arg, nni_aio *aio) { - pub_sock *s = arg; + pub0_sock *s = arg; nni_msgq_aio_put(s->uwq, aio); } -static nni_proto_pipe_ops pub_pipe_ops = { - .pipe_init = pub_pipe_init, - .pipe_fini = pub_pipe_fini, - .pipe_start = pub_pipe_start, - .pipe_stop = pub_pipe_stop, +static nni_proto_pipe_ops pub0_pipe_ops = { + .pipe_init = pub0_pipe_init, + .pipe_fini = pub0_pipe_fini, + .pipe_start = pub0_pipe_start, + .pipe_stop = pub0_pipe_stop, }; -static nni_proto_sock_option pub_sock_options[] = { +static nni_proto_sock_option pub0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = pub_sock_getopt_raw, - .pso_setopt = pub_sock_setopt_raw, + .pso_getopt = pub0_sock_getopt_raw, + .pso_setopt = pub0_sock_setopt_raw, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops pub_sock_ops = { - .sock_init = pub_sock_init, - .sock_fini = pub_sock_fini, - .sock_open = pub_sock_open, - .sock_close = pub_sock_close, - .sock_send = pub_sock_send, - .sock_recv = pub_sock_recv, - .sock_options = pub_sock_options, +static nni_proto_sock_ops pub0_sock_ops = { + .sock_init = pub0_sock_init, + .sock_fini = pub0_sock_fini, + .sock_open = pub0_sock_open, + .sock_close = pub0_sock_close, + .sock_send = pub0_sock_send, + .sock_recv = pub0_sock_recv, + .sock_options = pub0_sock_options, }; -static nni_proto pub_proto = { +static nni_proto pub0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_PUB_V0, "pub" }, .proto_peer = { NNI_PROTO_SUB_V0, "sub" }, .proto_flags = NNI_PROTO_FLAG_SND, - .proto_sock_ops = &pub_sock_ops, - .proto_pipe_ops = &pub_pipe_ops, + .proto_sock_ops = &pub0_sock_ops, + .proto_pipe_ops = &pub0_pipe_ops, }; int nng_pub0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &pub_proto)); + return (nni_proto_open(sidp, &pub0_proto)); } diff --git a/src/protocol/pubsub0/pub.h b/src/protocol/pubsub0/pub.h new file mode 100644 index 00000000..2388a292 --- /dev/null +++ b/src/protocol/pubsub0/pub.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PUBSUB0_PUB_H +#define NNG_PROTOCOL_PUBSUB0_PUB_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pub0_open(nng_socket *); + +#ifndef nng_pub_open +#define nng_pub_open nng_pub0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PUBSUB0_PUB_H diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub0/sub.c index 555d528e..6c504d75 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -12,54 +12,60 @@ #include <string.h> #include "core/nng_impl.h" - -const char *nng_opt_sub_subscribe = NNG_OPT_SUB_SUBSCRIBE; -const char *nng_opt_sub_unsubscribe = NNG_OPT_SUB_UNSUBSCRIBE; +#include "protocol/pubsub0/sub.h" // Subscriber protocol. The SUB protocol receives messages sent to // it from publishers, and filters out those it is not interested in, // only passing up ones that match known subscriptions. -typedef struct sub_pipe sub_pipe; -typedef struct sub_sock sub_sock; -typedef struct sub_topic sub_topic; +#ifndef NNI_PROTO_SUB_V0 +#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1) +#endif + +#ifndef NNI_PROTO_PUB_V0 +#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0) +#endif + +typedef struct sub0_pipe sub0_pipe; +typedef struct sub0_sock sub0_sock; +typedef struct sub0_topic sub0_topic; -static void sub_recv_cb(void *); -static void sub_putq_cb(void *); -static void sub_pipe_fini(void *); +static void sub0_recv_cb(void *); +static void sub0_putq_cb(void *); +static void sub0_pipe_fini(void *); -struct sub_topic { +struct sub0_topic { nni_list_node node; size_t len; void * buf; }; -// An nni_rep_sock is our per-socket protocol private structure. -struct sub_sock { +// sub0_sock is our per-socket protocol private structure. +struct sub0_sock { nni_list topics; nni_msgq *urq; int raw; nni_mtx lk; }; -// An nni_rep_pipe is our per-pipe protocol private structure. -struct sub_pipe { - nni_pipe *pipe; - sub_sock *sub; - nni_aio * aio_recv; - nni_aio * aio_putq; +// sub0_pipe is our per-pipe protocol private structure. +struct sub0_pipe { + nni_pipe * pipe; + sub0_sock *sub; + nni_aio * aio_recv; + nni_aio * aio_putq; }; static int -sub_sock_init(void **sp, nni_sock *sock) +sub0_sock_init(void **sp, nni_sock *sock) { - sub_sock *s; + sub0_sock *s; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->lk); - NNI_LIST_INIT(&s->topics, sub_topic, node); + NNI_LIST_INIT(&s->topics, sub0_topic, node); s->raw = 0; s->urq = nni_sock_recvq(sock); @@ -68,10 +74,10 @@ sub_sock_init(void **sp, nni_sock *sock) } static void -sub_sock_fini(void *arg) +sub0_sock_fini(void *arg) { - sub_sock * s = arg; - sub_topic *topic; + sub0_sock * s = arg; + sub0_topic *topic; while ((topic = nni_list_first(&s->topics)) != NULL) { nni_list_remove(&s->topics, topic); @@ -83,21 +89,21 @@ sub_sock_fini(void *arg) } static void -sub_sock_open(void *arg) +sub0_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void -sub_sock_close(void *arg) +sub0_sock_close(void *arg) { NNI_ARG_UNUSED(arg); } static void -sub_pipe_fini(void *arg) +sub0_pipe_fini(void *arg) { - sub_pipe *p = arg; + sub0_pipe *p = arg; nni_aio_fini(p->aio_putq); nni_aio_fini(p->aio_recv); @@ -105,17 +111,17 @@ sub_pipe_fini(void *arg) } static int -sub_pipe_init(void **pp, nni_pipe *pipe, void *s) +sub0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - sub_pipe *p; - int rv; + sub0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&p->aio_putq, sub_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, sub_recv_cb, p)) != 0)) { - sub_pipe_fini(p); + if (((rv = nni_aio_init(&p->aio_putq, sub0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0)) { + sub0_pipe_fini(p); return (rv); } @@ -126,30 +132,30 @@ sub_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -sub_pipe_start(void *arg) +sub0_pipe_start(void *arg) { - sub_pipe *p = arg; + sub0_pipe *p = arg; nni_pipe_recv(p->pipe, p->aio_recv); return (0); } static void -sub_pipe_stop(void *arg) +sub0_pipe_stop(void *arg) { - sub_pipe *p = arg; + sub0_pipe *p = arg; nni_aio_stop(p->aio_putq); nni_aio_stop(p->aio_recv); } static void -sub_recv_cb(void *arg) +sub0_recv_cb(void *arg) { - sub_pipe *p = arg; - sub_sock *s = p->sub; - nni_msgq *urq = s->urq; - nni_msg * msg; + sub0_pipe *p = arg; + sub0_sock *s = p->sub; + nni_msgq * urq = s->urq; + nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); @@ -164,9 +170,9 @@ sub_recv_cb(void *arg) } static void -sub_putq_cb(void *arg) +sub0_putq_cb(void *arg) { - sub_pipe *p = arg; + sub0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -184,11 +190,11 @@ sub_putq_cb(void *arg) // to replace this with a patricia trie, like old nanomsg had. static int -sub_subscribe(void *arg, const void *buf, size_t sz) +sub0_subscribe(void *arg, const void *buf, size_t sz) { - sub_sock * s = arg; - sub_topic *topic; - sub_topic *newtopic; + sub0_sock * s = arg; + sub0_topic *topic; + sub0_topic *newtopic; nni_mtx_lock(&s->lk); NNI_LIST_FOREACH (&s->topics, topic) { @@ -234,11 +240,11 @@ sub_subscribe(void *arg, const void *buf, size_t sz) } static int -sub_unsubscribe(void *arg, const void *buf, size_t sz) +sub0_unsubscribe(void *arg, const void *buf, size_t sz) { - sub_sock * s = arg; - sub_topic *topic; - int rv; + sub0_sock * s = arg; + sub0_topic *topic; + int rv; nni_mtx_lock(&s->lk); NNI_LIST_FOREACH (&s->topics, topic) { @@ -270,41 +276,41 @@ sub_unsubscribe(void *arg, const void *buf, size_t sz) } static int -sub_sock_setopt_raw(void *arg, const void *buf, size_t sz) +sub0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - sub_sock *s = arg; + sub0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -sub_sock_getopt_raw(void *arg, void *buf, size_t *szp) +sub0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - sub_sock *s = arg; + sub0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -sub_sock_send(void *arg, nni_aio *aio) +sub0_sock_send(void *arg, nni_aio *aio) { nni_aio_finish_error(aio, NNG_ENOTSUP); } static void -sub_sock_recv(void *arg, nni_aio *aio) +sub0_sock_recv(void *arg, nni_aio *aio) { - sub_sock *s = arg; + sub0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } static nni_msg * -sub_sock_filter(void *arg, nni_msg *msg) +sub0_sock_filter(void *arg, nni_msg *msg) { - sub_sock * s = arg; - sub_topic *topic; - char * body; - size_t len; - int match; + sub0_sock * s = arg; + sub0_topic *topic; + char * body; + size_t len; + int match; nni_mtx_lock(&s->lk); if (s->raw) { @@ -344,55 +350,55 @@ sub_sock_filter(void *arg, nni_msg *msg) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe_ops sub_pipe_ops = { - .pipe_init = sub_pipe_init, - .pipe_fini = sub_pipe_fini, - .pipe_start = sub_pipe_start, - .pipe_stop = sub_pipe_stop, +static nni_proto_pipe_ops sub0_pipe_ops = { + .pipe_init = sub0_pipe_init, + .pipe_fini = sub0_pipe_fini, + .pipe_start = sub0_pipe_start, + .pipe_stop = sub0_pipe_stop, }; -static nni_proto_sock_option sub_sock_options[] = { +static nni_proto_sock_option sub0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = sub_sock_getopt_raw, - .pso_setopt = sub_sock_setopt_raw, + .pso_getopt = sub0_sock_getopt_raw, + .pso_setopt = sub0_sock_setopt_raw, }, { .pso_name = NNG_OPT_SUB_SUBSCRIBE, .pso_getopt = NULL, - .pso_setopt = sub_subscribe, + .pso_setopt = sub0_subscribe, }, { .pso_name = NNG_OPT_SUB_UNSUBSCRIBE, .pso_getopt = NULL, - .pso_setopt = sub_unsubscribe, + .pso_setopt = sub0_unsubscribe, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops sub_sock_ops = { - .sock_init = sub_sock_init, - .sock_fini = sub_sock_fini, - .sock_open = sub_sock_open, - .sock_close = sub_sock_close, - .sock_send = sub_sock_send, - .sock_recv = sub_sock_recv, - .sock_filter = sub_sock_filter, - .sock_options = sub_sock_options, +static nni_proto_sock_ops sub0_sock_ops = { + .sock_init = sub0_sock_init, + .sock_fini = sub0_sock_fini, + .sock_open = sub0_sock_open, + .sock_close = sub0_sock_close, + .sock_send = sub0_sock_send, + .sock_recv = sub0_sock_recv, + .sock_filter = sub0_sock_filter, + .sock_options = sub0_sock_options, }; -static nni_proto sub_proto = { +static nni_proto sub0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_SUB_V0, "sub" }, .proto_peer = { NNI_PROTO_PUB_V0, "pub" }, .proto_flags = NNI_PROTO_FLAG_RCV, - .proto_sock_ops = &sub_sock_ops, - .proto_pipe_ops = &sub_pipe_ops, + .proto_sock_ops = &sub0_sock_ops, + .proto_pipe_ops = &sub0_pipe_ops, }; int nng_sub0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &sub_proto)); + return (nni_proto_open(sidp, &sub0_proto)); } diff --git a/src/protocol/pubsub0/sub.h b/src/protocol/pubsub0/sub.h new file mode 100644 index 00000000..1a09145d --- /dev/null +++ b/src/protocol/pubsub0/sub.h @@ -0,0 +1,31 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PUBSUB0_SUB_H +#define NNG_PROTOCOL_PUBSUB0_SUB_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_sub0_open(nng_socket *); + +#ifndef nng_sub_open +#define nng_sub_open nng_sub0_open +#endif + +#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe" +#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PUBSUB0_SUB_H diff --git a/src/protocol/reqrep0/CMakeLists.txt b/src/protocol/reqrep0/CMakeLists.txt new file mode 100644 index 00000000..4e82ad41 --- /dev/null +++ b/src/protocol/reqrep0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Req/Rep protocol + +if (NNG_PROTO_REQ0) + set(REQ0_SOURCES protocol/reqrep0/req.c protocol/reqrep0/req.h) + install(FILES req.h DESTINATION include/nng/protocol/reqrep0) +endif() + +if (NNG_PROTO_REP0) + set(REP0_SOURCES protocol/reqrep0/rep.c protocol/reqrep0/rep.h) + install(FILES rep.h DESTINATION include/nng/protocol/reqrep0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${REQ0_SOURCES} ${REP0_SOURCES} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep0/rep.c index 100e739d..ee8e4277 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -12,23 +12,32 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/reqrep0/rep.h" // Response protocol. The REP protocol is the "reply" side of a // request-reply pair. This is useful for building RPC servers, for // example. -typedef struct rep_pipe rep_pipe; -typedef struct rep_sock rep_sock; +#ifndef NNI_PROTO_REQ_V0 +#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) +#endif -static void rep_sock_getq_cb(void *); -static void rep_pipe_getq_cb(void *); -static void rep_pipe_putq_cb(void *); -static void rep_pipe_send_cb(void *); -static void rep_pipe_recv_cb(void *); -static void rep_pipe_fini(void *); +#ifndef NNI_PROTO_REP_V0 +#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) +#endif -// A rep_sock is our per-socket protocol private structure. -struct rep_sock { +typedef struct rep0_pipe rep0_pipe; +typedef struct rep0_sock rep0_sock; + +static void rep0_sock_getq_cb(void *); +static void rep0_pipe_getq_cb(void *); +static void rep0_pipe_putq_cb(void *); +static void rep0_pipe_send_cb(void *); +static void rep0_pipe_recv_cb(void *); +static void rep0_pipe_fini(void *); + +// rep0_sock is our per-socket protocol private structure. +struct rep0_sock { nni_msgq * uwq; nni_msgq * urq; nni_mtx lk; @@ -40,21 +49,21 @@ struct rep_sock { nni_aio * aio_getq; }; -// A rep_pipe is our per-pipe protocol private structure. -struct rep_pipe { - nni_pipe *pipe; - rep_sock *rep; - nni_msgq *sendq; - nni_aio * aio_getq; - nni_aio * aio_send; - nni_aio * aio_recv; - nni_aio * aio_putq; +// rep0_pipe is our per-pipe protocol private structure. +struct rep0_pipe { + nni_pipe * pipe; + rep0_sock *rep; + nni_msgq * sendq; + nni_aio * aio_getq; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_aio * aio_putq; }; static void -rep_sock_fini(void *arg) +rep0_sock_fini(void *arg) { - rep_sock *s = arg; + rep0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -67,18 +76,18 @@ rep_sock_fini(void *arg) } static int -rep_sock_init(void **sp, nni_sock *sock) +rep0_sock_init(void **sp, nni_sock *sock) { - rep_sock *s; - int rv; + rep0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->lk); if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, rep_sock_getq_cb, s)) != 0)) { - rep_sock_fini(s); + ((rv = nni_aio_init(&s->aio_getq, rep0_sock_getq_cb, s)) != 0)) { + rep0_sock_fini(s); return (rv); } @@ -95,25 +104,25 @@ rep_sock_init(void **sp, nni_sock *sock) } static void -rep_sock_open(void *arg) +rep0_sock_open(void *arg) { - rep_sock *s = arg; + rep0_sock *s = arg; nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -rep_sock_close(void *arg) +rep0_sock_close(void *arg) { - rep_sock *s = arg; + rep0_sock *s = arg; nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -rep_pipe_fini(void *arg) +rep0_pipe_fini(void *arg) { - rep_pipe *p = arg; + rep0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); @@ -124,20 +133,20 @@ rep_pipe_fini(void *arg) } static int -rep_pipe_init(void **pp, nni_pipe *pipe, void *s) +rep0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - rep_pipe *p; - int rv; + rep0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, rep_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, rep_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, rep_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, rep_pipe_putq_cb, p)) != 0)) { - rep_pipe_fini(p); + ((rv = nni_aio_init(&p->aio_getq, rep0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, rep0_pipe_putq_cb, p)) != 0)) { + rep0_pipe_fini(p); return (rv); } @@ -148,11 +157,11 @@ rep_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -rep_pipe_start(void *arg) +rep0_pipe_start(void *arg) { - rep_pipe *p = arg; - rep_sock *s = p->rep; - int rv; + rep0_pipe *p = arg; + rep0_sock *s = p->rep; + int rv; if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) { return (rv); @@ -164,10 +173,10 @@ rep_pipe_start(void *arg) } static void -rep_pipe_stop(void *arg) +rep0_pipe_stop(void *arg) { - rep_pipe *p = arg; - rep_sock *s = p->rep; + rep0_pipe *p = arg; + rep0_sock *s = p->rep; nni_msgq_close(p->sendq); nni_aio_stop(p->aio_getq); @@ -179,14 +188,14 @@ rep_pipe_stop(void *arg) } static void -rep_sock_getq_cb(void *arg) +rep0_sock_getq_cb(void *arg) { - rep_sock *s = arg; - nni_msgq *uwq = s->uwq; - nni_msg * msg; - uint32_t id; - rep_pipe *p; - int rv; + rep0_sock *s = arg; + nni_msgq * uwq = s->uwq; + nni_msg * msg; + uint32_t id; + rep0_pipe *p; + int rv; // This watches for messages from the upper write queue, // extracts the destination pipe, and forwards it to the appropriate @@ -228,9 +237,9 @@ rep_sock_getq_cb(void *arg) } static void -rep_pipe_getq_cb(void *arg) +rep0_pipe_getq_cb(void *arg) { - rep_pipe *p = arg; + rep0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->pipe); @@ -244,9 +253,9 @@ rep_pipe_getq_cb(void *arg) } static void -rep_pipe_send_cb(void *arg) +rep0_pipe_send_cb(void *arg) { - rep_pipe *p = arg; + rep0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -259,14 +268,14 @@ rep_pipe_send_cb(void *arg) } static void -rep_pipe_recv_cb(void *arg) +rep0_pipe_recv_cb(void *arg) { - rep_pipe *p = arg; - rep_sock *s = p->rep; - nni_msg * msg; - int rv; - uint8_t * body; - int hops; + rep0_pipe *p = arg; + rep0_sock *s = p->rep; + nni_msg * msg; + int rv; + uint8_t * body; + int hops; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); @@ -329,9 +338,9 @@ drop: } static void -rep_pipe_putq_cb(void *arg) +rep0_pipe_putq_cb(void *arg) { - rep_pipe *p = arg; + rep0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -344,10 +353,10 @@ rep_pipe_putq_cb(void *arg) } static int -rep_sock_setopt_raw(void *arg, const void *buf, size_t sz) +rep0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - rep_sock *s = arg; - int rv; + rep0_sock *s = arg; + int rv; nni_mtx_lock(&s->lk); rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); @@ -356,32 +365,32 @@ rep_sock_setopt_raw(void *arg, const void *buf, size_t sz) } static int -rep_sock_getopt_raw(void *arg, void *buf, size_t *szp) +rep0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - rep_sock *s = arg; + rep0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static int -rep_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +rep0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) { - rep_sock *s = arg; + rep0_sock *s = arg; return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); } static int -rep_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +rep0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) { - rep_sock *s = arg; + rep0_sock *s = arg; return (nni_getopt_int(s->ttl, buf, szp)); } static nni_msg * -rep_sock_filter(void *arg, nni_msg *msg) +rep0_sock_filter(void *arg, nni_msg *msg) { - rep_sock *s = arg; - char * header; - size_t len; + rep0_sock *s = arg; + char * header; + size_t len; nni_mtx_lock(&s->lk); if (s->raw) { @@ -408,11 +417,11 @@ rep_sock_filter(void *arg, nni_msg *msg) } static void -rep_sock_send(void *arg, nni_aio *aio) +rep0_sock_send(void *arg, nni_aio *aio) { - rep_sock *s = arg; - int rv; - nni_msg * msg; + rep0_sock *s = arg; + int rv; + nni_msg * msg; nni_mtx_lock(&s->lk); if (s->raw) { @@ -448,59 +457,59 @@ rep_sock_send(void *arg, nni_aio *aio) } static void -rep_sock_recv(void *arg, nni_aio *aio) +rep0_sock_recv(void *arg, nni_aio *aio) { - rep_sock *s = arg; + rep0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe_ops rep_pipe_ops = { - .pipe_init = rep_pipe_init, - .pipe_fini = rep_pipe_fini, - .pipe_start = rep_pipe_start, - .pipe_stop = rep_pipe_stop, +static nni_proto_pipe_ops rep0_pipe_ops = { + .pipe_init = rep0_pipe_init, + .pipe_fini = rep0_pipe_fini, + .pipe_start = rep0_pipe_start, + .pipe_stop = rep0_pipe_stop, }; -static nni_proto_sock_option rep_sock_options[] = { +static nni_proto_sock_option rep0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = rep_sock_getopt_raw, - .pso_setopt = rep_sock_setopt_raw, + .pso_getopt = rep0_sock_getopt_raw, + .pso_setopt = rep0_sock_setopt_raw, }, { .pso_name = NNG_OPT_MAXTTL, - .pso_getopt = rep_sock_getopt_maxttl, - .pso_setopt = rep_sock_setopt_maxttl, + .pso_getopt = rep0_sock_getopt_maxttl, + .pso_setopt = rep0_sock_setopt_maxttl, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops rep_sock_ops = { - .sock_init = rep_sock_init, - .sock_fini = rep_sock_fini, - .sock_open = rep_sock_open, - .sock_close = rep_sock_close, - .sock_options = rep_sock_options, - .sock_filter = rep_sock_filter, - .sock_send = rep_sock_send, - .sock_recv = rep_sock_recv, +static nni_proto_sock_ops rep0_sock_ops = { + .sock_init = rep0_sock_init, + .sock_fini = rep0_sock_fini, + .sock_open = rep0_sock_open, + .sock_close = rep0_sock_close, + .sock_options = rep0_sock_options, + .sock_filter = rep0_sock_filter, + .sock_send = rep0_sock_send, + .sock_recv = rep0_sock_recv, }; -static nni_proto nni_rep_proto = { +static nni_proto rep0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_REP_V0, "rep" }, .proto_peer = { NNI_PROTO_REQ_V0, "req" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &rep_sock_ops, - .proto_pipe_ops = &rep_pipe_ops, + .proto_sock_ops = &rep0_sock_ops, + .proto_pipe_ops = &rep0_pipe_ops, }; int nng_rep0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &nni_rep_proto)); + return (nni_proto_open(sidp, &rep0_proto)); } diff --git a/src/protocol/reqrep0/rep.h b/src/protocol/reqrep0/rep.h new file mode 100644 index 00000000..93df9379 --- /dev/null +++ b/src/protocol/reqrep0/rep.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_REQREP0_REP_H +#define NNG_PROTOCOL_REQREP0_REP_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_rep0_open(nng_socket *); + +#ifndef nng_rep_open +#define nng_rep_open nng_rep0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_REQREP0_REP_H diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep0/req.c index bead1ec4..94c7f1a0 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep0/req.c @@ -13,21 +13,28 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/reqrep0/req.h" // Request protocol. The REQ protocol is the "request" side of a // request-reply pair. This is useful for building RPC clients, for example. -const char *nng_opt_req_resendtime = NNG_OPT_REQ_RESENDTIME; +#ifndef NNI_PROTO_REQ_V0 +#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) +#endif -typedef struct req_pipe req_pipe; -typedef struct req_sock req_sock; +#ifndef NNI_PROTO_REP_V0 +#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) +#endif -static void req_resend(req_sock *); -static void req_timeout(void *); -static void req_pipe_fini(void *); +typedef struct req0_pipe req0_pipe; +typedef struct req0_sock req0_sock; -// A req_sock is our per-socket protocol private structure. -struct req_sock { +static void req0_resend(req0_sock *); +static void req0_timeout(void *); +static void req0_pipe_fini(void *); + +// A req0_sock is our per-socket protocol private structure. +struct req0_sock { nni_msgq * uwq; nni_msgq * urq; nni_duration retry; @@ -38,7 +45,7 @@ struct req_sock { int ttl; nni_msg * reqmsg; - req_pipe *pendpipe; + req0_pipe *pendpipe; nni_list readypipes; nni_list busypipes; @@ -51,10 +58,10 @@ struct req_sock { nni_cv cv; }; -// A req_pipe is our per-pipe protocol private structure. -struct req_pipe { +// A req0_pipe is our per-pipe protocol private structure. +struct req0_pipe { nni_pipe * pipe; - req_sock * req; + req0_sock * req; nni_list_node node; nni_aio * aio_getq; // raw mode only nni_aio * aio_sendraw; // raw mode only @@ -64,17 +71,17 @@ struct req_pipe { nni_mtx mtx; }; -static void req_resender(void *); -static void req_getq_cb(void *); -static void req_sendraw_cb(void *); -static void req_sendcooked_cb(void *); -static void req_recv_cb(void *); -static void req_putq_cb(void *); +static void req0_resender(void *); +static void req0_getq_cb(void *); +static void req0_sendraw_cb(void *); +static void req0_sendcooked_cb(void *); +static void req0_recv_cb(void *); +static void req0_putq_cb(void *); static int -req_sock_init(void **sp, nni_sock *sock) +req0_sock_init(void **sp, nni_sock *sock) { - req_sock *s; + req0_sock *s; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); @@ -82,9 +89,9 @@ req_sock_init(void **sp, nni_sock *sock) nni_mtx_init(&s->mtx); nni_cv_init(&s->cv, &s->mtx); - NNI_LIST_INIT(&s->readypipes, req_pipe, node); - NNI_LIST_INIT(&s->busypipes, req_pipe, node); - nni_timer_init(&s->timer, req_timeout, s); + NNI_LIST_INIT(&s->readypipes, req0_pipe, node); + NNI_LIST_INIT(&s->busypipes, req0_pipe, node); + nni_timer_init(&s->timer, req0_timeout, s); // this is "semi random" start for request IDs. s->nextid = nni_random(); @@ -102,15 +109,15 @@ req_sock_init(void **sp, nni_sock *sock) } static void -req_sock_open(void *arg) +req0_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void -req_sock_close(void *arg) +req0_sock_close(void *arg) { - req_sock *s = arg; + req0_sock *s = arg; nni_mtx_lock(&s->mtx); s->closed = 1; @@ -120,9 +127,9 @@ req_sock_close(void *arg) } static void -req_sock_fini(void *arg) +req0_sock_fini(void *arg) { - req_sock *s = arg; + req0_sock *s = arg; nni_mtx_lock(&s->mtx); while ((!nni_list_empty(&s->readypipes)) || @@ -139,9 +146,9 @@ req_sock_fini(void *arg) } static void -req_pipe_fini(void *arg) +req0_pipe_fini(void *arg) { - req_pipe *p = arg; + req0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_putq); @@ -153,22 +160,22 @@ req_pipe_fini(void *arg) } static int -req_pipe_init(void **pp, nni_pipe *pipe, void *s) +req0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - req_pipe *p; - int rv; + req0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - if (((rv = nni_aio_init(&p->aio_getq, req_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, req_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, req_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_sendraw, req_sendraw_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_sendcooked, req_sendcooked_cb, p)) != + if (((rv = nni_aio_init(&p->aio_getq, req0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, req0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_sendraw, req0_sendraw_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_sendcooked, req0_sendcooked_cb, p)) != 0)) { - req_pipe_fini(p); + req0_pipe_fini(p); return (rv); } @@ -180,10 +187,10 @@ req_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -req_pipe_start(void *arg) +req0_pipe_start(void *arg) { - req_pipe *p = arg; - req_sock *s = p->req; + req0_pipe *p = arg; + req0_sock *s = p->req; if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) { return (NNG_EPROTO); @@ -198,7 +205,7 @@ req_pipe_start(void *arg) // If sock was waiting for somewhere to send data, go ahead and // send it to this pipe. if (s->wantw) { - req_resend(s); + req0_resend(s); } nni_mtx_unlock(&s->mtx); @@ -208,10 +215,10 @@ req_pipe_start(void *arg) } static void -req_pipe_stop(void *arg) +req0_pipe_stop(void *arg) { - req_pipe *p = arg; - req_sock *s = p->req; + req0_pipe *p = arg; + req0_sock *s = p->req; nni_aio_stop(p->aio_getq); nni_aio_stop(p->aio_putq); @@ -238,50 +245,50 @@ req_pipe_stop(void *arg) s->pendpipe = NULL; s->resend = NNI_TIME_ZERO; s->wantw = 1; - req_resend(s); + req0_resend(s); } nni_mtx_unlock(&s->mtx); } static int -req_sock_setopt_raw(void *arg, const void *buf, size_t sz) +req0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -req_sock_getopt_raw(void *arg, void *buf, size_t *szp) +req0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static int -req_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); } static int -req_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +req0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_getopt_int(s->ttl, buf, szp)); } static int -req_sock_setopt_resendtime(void *arg, const void *buf, size_t sz) +req0_sock_setopt_resendtime(void *arg, const void *buf, size_t sz) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_setopt_ms(&s->retry, buf, sz)); } static int -req_sock_getopt_resendtime(void *arg, void *buf, size_t *szp) +req0_sock_getopt_resendtime(void *arg, void *buf, size_t *szp) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_getopt_ms(s->retry, buf, szp)); } @@ -302,10 +309,10 @@ req_sock_getopt_resendtime(void *arg, void *buf, size_t *szp) // kind of priority.) static void -req_getq_cb(void *arg) +req0_getq_cb(void *arg) { - req_pipe *p = arg; - req_sock *s = p->req; + req0_pipe *p = arg; + req0_sock *s = p->req; // We should be in RAW mode. Cooked mode traffic bypasses // the upper write queue entirely, and should never end up here. @@ -326,9 +333,9 @@ req_getq_cb(void *arg) } static void -req_sendraw_cb(void *arg) +req0_sendraw_cb(void *arg) { - req_pipe *p = arg; + req0_pipe *p = arg; if (nni_aio_result(p->aio_sendraw) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_sendraw)); @@ -342,10 +349,10 @@ req_sendraw_cb(void *arg) } static void -req_sendcooked_cb(void *arg) +req0_sendcooked_cb(void *arg) { - req_pipe *p = arg; - req_sock *s = p->req; + req0_pipe *p = arg; + req0_sock *s = p->req; if (nni_aio_result(p->aio_sendcooked) != 0) { // We failed to send... clean up and deal with it. @@ -365,7 +372,7 @@ req_sendcooked_cb(void *arg) if (nni_list_active(&s->busypipes, p)) { nni_list_remove(&s->busypipes, p); nni_list_append(&s->readypipes, p); - req_resend(s); + req0_resend(s); } else { // We wind up here if stop was called from the reader // side while we were waiting to be scheduled to run for the @@ -377,9 +384,9 @@ req_sendcooked_cb(void *arg) } static void -req_putq_cb(void *arg) +req0_putq_cb(void *arg) { - req_pipe *p = arg; + req0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -393,10 +400,10 @@ req_putq_cb(void *arg) } static void -req_recv_cb(void *arg) +req0_recv_cb(void *arg) { - req_pipe *p = arg; - nni_msg * msg; + req0_pipe *p = arg; + nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); @@ -431,23 +438,23 @@ malformed: } static void -req_timeout(void *arg) +req0_timeout(void *arg) { - req_sock *s = arg; + req0_sock *s = arg; nni_mtx_lock(&s->mtx); if (s->reqmsg != NULL) { s->wantw = 1; - req_resend(s); + req0_resend(s); } nni_mtx_unlock(&s->mtx); } static void -req_resend(req_sock *s) +req0_resend(req0_sock *s) { - req_pipe *p; - nni_msg * msg; + req0_pipe *p; + nni_msg * msg; // Note: This routine should be called with the socket lock held. // Also, this should only be called while handling cooked mode @@ -499,13 +506,13 @@ req_resend(req_sock *s) } static void -req_sock_send(void *arg, nni_aio *aio) +req0_sock_send(void *arg, nni_aio *aio) { - req_sock *s = arg; - uint32_t id; - size_t len; - nni_msg * msg; - int rv; + req0_sock *s = arg; + uint32_t id; + size_t len; + nni_msg * msg; + int rv; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -547,7 +554,7 @@ req_sock_send(void *arg, nni_aio *aio) s->resend = NNI_TIME_ZERO; s->wantw = 1; - req_resend(s); + req0_resend(s); nni_mtx_unlock(&s->mtx); @@ -555,10 +562,10 @@ req_sock_send(void *arg, nni_aio *aio) } static nni_msg * -req_sock_filter(void *arg, nni_msg *msg) +req0_sock_filter(void *arg, nni_msg *msg) { - req_sock *s = arg; - nni_msg * rmsg; + req0_sock *s = arg; + nni_msg * rmsg; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -598,9 +605,9 @@ req_sock_filter(void *arg, nni_msg *msg) } static void -req_sock_recv(void *arg, nni_aio *aio) +req0_sock_recv(void *arg, nni_aio *aio) { - req_sock *s = arg; + req0_sock *s = arg; nni_mtx_lock(&s->mtx); if (!s->raw) { @@ -614,55 +621,55 @@ req_sock_recv(void *arg, nni_aio *aio) nni_msgq_aio_get(s->urq, aio); } -static nni_proto_pipe_ops req_pipe_ops = { - .pipe_init = req_pipe_init, - .pipe_fini = req_pipe_fini, - .pipe_start = req_pipe_start, - .pipe_stop = req_pipe_stop, +static nni_proto_pipe_ops req0_pipe_ops = { + .pipe_init = req0_pipe_init, + .pipe_fini = req0_pipe_fini, + .pipe_start = req0_pipe_start, + .pipe_stop = req0_pipe_stop, }; -static nni_proto_sock_option req_sock_options[] = { +static nni_proto_sock_option req0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = req_sock_getopt_raw, - .pso_setopt = req_sock_setopt_raw, + .pso_getopt = req0_sock_getopt_raw, + .pso_setopt = req0_sock_setopt_raw, }, { .pso_name = NNG_OPT_MAXTTL, - .pso_getopt = req_sock_getopt_maxttl, - .pso_setopt = req_sock_setopt_maxttl, + .pso_getopt = req0_sock_getopt_maxttl, + .pso_setopt = req0_sock_setopt_maxttl, }, { .pso_name = NNG_OPT_REQ_RESENDTIME, - .pso_getopt = req_sock_getopt_resendtime, - .pso_setopt = req_sock_setopt_resendtime, + .pso_getopt = req0_sock_getopt_resendtime, + .pso_setopt = req0_sock_setopt_resendtime, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops req_sock_ops = { - .sock_init = req_sock_init, - .sock_fini = req_sock_fini, - .sock_open = req_sock_open, - .sock_close = req_sock_close, - .sock_options = req_sock_options, - .sock_filter = req_sock_filter, - .sock_send = req_sock_send, - .sock_recv = req_sock_recv, +static nni_proto_sock_ops req0_sock_ops = { + .sock_init = req0_sock_init, + .sock_fini = req0_sock_fini, + .sock_open = req0_sock_open, + .sock_close = req0_sock_close, + .sock_options = req0_sock_options, + .sock_filter = req0_sock_filter, + .sock_send = req0_sock_send, + .sock_recv = req0_sock_recv, }; -static nni_proto req_proto = { +static nni_proto req0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_REQ_V0, "req" }, .proto_peer = { NNI_PROTO_REP_V0, "rep" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &req_sock_ops, - .proto_pipe_ops = &req_pipe_ops, + .proto_sock_ops = &req0_sock_ops, + .proto_pipe_ops = &req0_pipe_ops, }; int nng_req0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &req_proto)); + return (nni_proto_open(sidp, &req0_proto)); } diff --git a/src/protocol/reqrep0/req.h b/src/protocol/reqrep0/req.h new file mode 100644 index 00000000..99c9bf62 --- /dev/null +++ b/src/protocol/reqrep0/req.h @@ -0,0 +1,30 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_REQREP0_REQ_H +#define NNG_PROTOCOL_REQREP0_REQ_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_req0_open(nng_socket *); + +#ifndef nng_req_open +#define nng_req_open nng_req0_open +#endif + +#define NNG_OPT_REQ_RESENDTIME "req:resend-time" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_REQREP0_REQ_H diff --git a/src/protocol/survey0/CMakeLists.txt b/src/protocol/survey0/CMakeLists.txt new file mode 100644 index 00000000..61e5aa7b --- /dev/null +++ b/src/protocol/survey0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Req/Rep protocol + +if (NNG_PROTO_SURVEYOR0) + set(SURV0_SOURCES protocol/survey0/survey.c protocol/survey0/survey.h) + install(FILES survey.h DESTINATION include/nng/protocol/survey0) +endif() + +if (NNG_PROTO_RESPONDENT0) + set(RESP0_SOURCES protocol/survey0/respond.c protocol/survey0/respond.h) + install(FILES respond.h DESTINATION include/nng/protocol/survey0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${SURV0_SOURCES} ${RESP0_SOURCES} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/protocol/survey/respond.c b/src/protocol/survey0/respond.c index 94730be6..73e919c3 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey0/respond.c @@ -12,23 +12,32 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/survey0/respond.h" // Respondent protocol. The RESPONDENT protocol is the "replier" side of // the surveyor pattern. This is useful for building service discovery, or -// voting algorithsm, for example. +// voting algorithms, for example. -typedef struct resp_pipe resp_pipe; -typedef struct resp_sock resp_sock; +#ifndef NNI_PROTO_SURVEYOR_V0 +#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) +#endif -static void resp_recv_cb(void *); -static void resp_putq_cb(void *); -static void resp_getq_cb(void *); -static void resp_send_cb(void *); -static void resp_sock_getq_cb(void *); -static void resp_pipe_fini(void *); +#ifndef NNI_PROTO_RESPONDENT_V0 +#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) +#endif -// A resp_sock is our per-socket protocol private structure. -struct resp_sock { +typedef struct resp0_pipe resp0_pipe; +typedef struct resp0_sock resp0_sock; + +static void resp0_recv_cb(void *); +static void resp0_putq_cb(void *); +static void resp0_getq_cb(void *); +static void resp0_send_cb(void *); +static void resp0_sock_getq_cb(void *); +static void resp0_pipe_fini(void *); + +// resp0_sock is our per-socket protocol private structure. +struct resp0_sock { nni_msgq * urq; nni_msgq * uwq; int raw; @@ -40,22 +49,22 @@ struct resp_sock { nni_mtx mtx; }; -// A resp_pipe is our per-pipe protocol private structure. -struct resp_pipe { - nni_pipe * npipe; - resp_sock *psock; - uint32_t id; - nni_msgq * sendq; - nni_aio * aio_getq; - nni_aio * aio_putq; - nni_aio * aio_send; - nni_aio * aio_recv; +// resp0_pipe is our per-pipe protocol private structure. +struct resp0_pipe { + nni_pipe * npipe; + resp0_sock *psock; + uint32_t id; + nni_msgq * sendq; + nni_aio * aio_getq; + nni_aio * aio_putq; + nni_aio * aio_send; + nni_aio * aio_recv; }; static void -resp_sock_fini(void *arg) +resp0_sock_fini(void *arg) { - resp_sock *s = arg; + resp0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -68,18 +77,18 @@ resp_sock_fini(void *arg) } static int -resp_sock_init(void **sp, nni_sock *nsock) +resp0_sock_init(void **sp, nni_sock *nsock) { - resp_sock *s; - int rv; + resp0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->mtx); if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, resp_sock_getq_cb, s)) != 0)) { - resp_sock_fini(s); + ((rv = nni_aio_init(&s->aio_getq, resp0_sock_getq_cb, s)) != 0)) { + resp0_sock_fini(s); return (rv); } @@ -95,25 +104,25 @@ resp_sock_init(void **sp, nni_sock *nsock) } static void -resp_sock_open(void *arg) +resp0_sock_open(void *arg) { - resp_sock *s = arg; + resp0_sock *s = arg; nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -resp_sock_close(void *arg) +resp0_sock_close(void *arg) { - resp_sock *s = arg; + resp0_sock *s = arg; nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -resp_pipe_fini(void *arg) +resp0_pipe_fini(void *arg) { - resp_pipe *p = arg; + resp0_pipe *p = arg; nni_aio_fini(p->aio_putq); nni_aio_fini(p->aio_getq); @@ -124,20 +133,20 @@ resp_pipe_fini(void *arg) } static int -resp_pipe_init(void **pp, nni_pipe *npipe, void *s) +resp0_pipe_init(void **pp, nni_pipe *npipe, void *s) { - resp_pipe *p; - int rv; + resp0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, resp_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, resp_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, resp_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, resp_send_cb, p)) != 0)) { - resp_pipe_fini(p); + ((rv = nni_aio_init(&p->aio_putq, resp0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, resp0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, resp0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, resp0_send_cb, p)) != 0)) { + resp0_pipe_fini(p); return (rv); } @@ -148,11 +157,11 @@ resp_pipe_init(void **pp, nni_pipe *npipe, void *s) } static int -resp_pipe_start(void *arg) +resp0_pipe_start(void *arg) { - resp_pipe *p = arg; - resp_sock *s = p->psock; - int rv; + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + int rv; p->id = nni_pipe_id(p->npipe); @@ -170,10 +179,10 @@ resp_pipe_start(void *arg) } static void -resp_pipe_stop(void *arg) +resp0_pipe_stop(void *arg) { - resp_pipe *p = arg; - resp_sock *s = p->psock; + resp0_pipe *p = arg; + resp0_sock *s = p->psock; nni_msgq_close(p->sendq); nni_aio_stop(p->aio_putq); @@ -189,19 +198,19 @@ resp_pipe_stop(void *arg) } } -// resp_sock_send watches for messages from the upper write queue, +// resp0_sock_send watches for messages from the upper write queue, // extracts the destination pipe, and forwards it to the appropriate // destination pipe via a separate queue. This prevents a single bad // or slow pipe from gumming up the works for the entire socket.s void -resp_sock_getq_cb(void *arg) +resp0_sock_getq_cb(void *arg) { - resp_sock *s = arg; - nni_msg * msg; - uint32_t id; - resp_pipe *p; - int rv; + resp0_sock *s = arg; + nni_msg * msg; + uint32_t id; + resp0_pipe *p; + int rv; if (nni_aio_result(s->aio_getq) != 0) { return; @@ -233,9 +242,9 @@ resp_sock_getq_cb(void *arg) } void -resp_getq_cb(void *arg) +resp0_getq_cb(void *arg) { - resp_pipe *p = arg; + resp0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->npipe); @@ -249,9 +258,9 @@ resp_getq_cb(void *arg) } void -resp_send_cb(void *arg) +resp0_send_cb(void *arg) { - resp_pipe *p = arg; + resp0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -264,14 +273,14 @@ resp_send_cb(void *arg) } static void -resp_recv_cb(void *arg) +resp0_recv_cb(void *arg) { - resp_pipe *p = arg; - resp_sock *s = p->psock; - nni_msgq * urq = s->urq; - nni_msg * msg; - int hops; - int rv; + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + nni_msgq * urq = s->urq; + nni_msg * msg; + int hops; + int rv; if (nni_aio_result(p->aio_recv) != 0) { goto error; @@ -324,9 +333,9 @@ error: } static void -resp_putq_cb(void *arg) +resp0_putq_cb(void *arg) { - resp_pipe *p = arg; + resp0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -338,10 +347,10 @@ resp_putq_cb(void *arg) } static int -resp_sock_setopt_raw(void *arg, const void *buf, size_t sz) +resp0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - resp_sock *s = arg; - int rv; + resp0_sock *s = arg; + int rv; nni_mtx_lock(&s->mtx); rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); @@ -350,32 +359,32 @@ resp_sock_setopt_raw(void *arg, const void *buf, size_t sz) } static int -resp_sock_getopt_raw(void *arg, void *buf, size_t *szp) +resp0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - resp_sock *s = arg; + resp0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static int -resp_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +resp0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) { - resp_sock *s = arg; + resp0_sock *s = arg; return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); } static int -resp_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +resp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) { - resp_sock *s = arg; + resp0_sock *s = arg; return (nni_getopt_int(s->ttl, buf, szp)); } static void -resp_sock_send(void *arg, nni_aio *aio) +resp0_sock_send(void *arg, nni_aio *aio) { - resp_sock *s = arg; - nni_msg * msg; - int rv; + resp0_sock *s = arg; + nni_msg * msg; + int rv; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -412,11 +421,11 @@ resp_sock_send(void *arg, nni_aio *aio) } static nni_msg * -resp_sock_filter(void *arg, nni_msg *msg) +resp0_sock_filter(void *arg, nni_msg *msg) { - resp_sock *s = arg; - char * header; - size_t len; + resp0_sock *s = arg; + char * header; + size_t len; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -444,57 +453,57 @@ resp_sock_filter(void *arg, nni_msg *msg) } static void -resp_sock_recv(void *arg, nni_aio *aio) +resp0_sock_recv(void *arg, nni_aio *aio) { - resp_sock *s = arg; + resp0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } -static nni_proto_pipe_ops resp_pipe_ops = { - .pipe_init = resp_pipe_init, - .pipe_fini = resp_pipe_fini, - .pipe_start = resp_pipe_start, - .pipe_stop = resp_pipe_stop, +static nni_proto_pipe_ops resp0_pipe_ops = { + .pipe_init = resp0_pipe_init, + .pipe_fini = resp0_pipe_fini, + .pipe_start = resp0_pipe_start, + .pipe_stop = resp0_pipe_stop, }; -static nni_proto_sock_option resp_sock_options[] = { +static nni_proto_sock_option resp0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = resp_sock_getopt_raw, - .pso_setopt = resp_sock_setopt_raw, + .pso_getopt = resp0_sock_getopt_raw, + .pso_setopt = resp0_sock_setopt_raw, }, { .pso_name = NNG_OPT_MAXTTL, - .pso_getopt = resp_sock_getopt_maxttl, - .pso_setopt = resp_sock_setopt_maxttl, + .pso_getopt = resp0_sock_getopt_maxttl, + .pso_setopt = resp0_sock_setopt_maxttl, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops resp_sock_ops = { - .sock_init = resp_sock_init, - .sock_fini = resp_sock_fini, - .sock_open = resp_sock_open, - .sock_close = resp_sock_close, - .sock_filter = resp_sock_filter, - .sock_send = resp_sock_send, - .sock_recv = resp_sock_recv, - .sock_options = resp_sock_options, +static nni_proto_sock_ops resp0_sock_ops = { + .sock_init = resp0_sock_init, + .sock_fini = resp0_sock_fini, + .sock_open = resp0_sock_open, + .sock_close = resp0_sock_close, + .sock_filter = resp0_sock_filter, + .sock_send = resp0_sock_send, + .sock_recv = resp0_sock_recv, + .sock_options = resp0_sock_options, }; -static nni_proto resp_proto = { +static nni_proto resp0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &resp_sock_ops, - .proto_pipe_ops = &resp_pipe_ops, + .proto_sock_ops = &resp0_sock_ops, + .proto_pipe_ops = &resp0_pipe_ops, }; int nng_respondent0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &resp_proto)); + return (nni_proto_open(sidp, &resp0_proto)); } diff --git a/src/protocol/survey0/respond.h b/src/protocol/survey0/respond.h new file mode 100644 index 00000000..58c65298 --- /dev/null +++ b/src/protocol/survey0/respond.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_SURVEY0_RESPOND_H +#define NNG_PROTOCOL_SURVEY0_RESPOND_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_respondent0_open(nng_socket *); + +#ifndef nng_respondent_open +#define nng_respondent_open nng_respondent0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_SURVEY0_RESPOND_H diff --git a/src/protocol/survey/survey.c b/src/protocol/survey0/survey.c index 9dcd6664..02283436 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey0/survey.c @@ -12,22 +12,31 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/survey0/survey.h" // Surveyor protocol. The SURVEYOR protocol is the "survey" side of the // survey pattern. This is useful for building service discovery, voting, etc. -typedef struct surv_pipe surv_pipe; -typedef struct surv_sock surv_sock; +#ifndef NNI_PROTO_SURVEYOR_V0 +#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) +#endif -static void surv_sock_getq_cb(void *); -static void surv_getq_cb(void *); -static void surv_putq_cb(void *); -static void surv_send_cb(void *); -static void surv_recv_cb(void *); -static void surv_timeout(void *); +#ifndef NNI_PROTO_RESPONDENT_V0 +#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) +#endif -// A surv_sock is our per-socket protocol private structure. -struct surv_sock { +typedef struct surv0_pipe surv0_pipe; +typedef struct surv0_sock surv0_sock; + +static void surv0_sock_getq_cb(void *); +static void surv0_getq_cb(void *); +static void surv0_putq_cb(void *); +static void surv0_send_cb(void *); +static void surv0_recv_cb(void *); +static void surv0_timeout(void *); + +// surv0_sock is our per-socket protocol private structure. +struct surv0_sock { nni_duration survtime; nni_time expire; int raw; @@ -42,10 +51,10 @@ struct surv_sock { nni_mtx mtx; }; -// A surv_pipe is our per-pipe protocol private structure. -struct surv_pipe { +// surv0_pipe is our per-pipe protocol private structure. +struct surv0_pipe { nni_pipe * npipe; - surv_sock * psock; + surv0_sock * psock; nni_msgq * sendq; nni_list_node node; nni_aio * aio_getq; @@ -55,9 +64,9 @@ struct surv_pipe { }; static void -surv_sock_fini(void *arg) +surv0_sock_fini(void *arg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -66,21 +75,21 @@ surv_sock_fini(void *arg) } static int -surv_sock_init(void **sp, nni_sock *nsock) +surv0_sock_init(void **sp, nni_sock *nsock) { - surv_sock *s; - int rv; + surv0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_aio_init(&s->aio_getq, surv_sock_getq_cb, s)) != 0) { - surv_sock_fini(s); + if ((rv = nni_aio_init(&s->aio_getq, surv0_sock_getq_cb, s)) != 0) { + surv0_sock_fini(s); return (rv); } - NNI_LIST_INIT(&s->pipes, surv_pipe, node); + NNI_LIST_INIT(&s->pipes, surv0_pipe, node); nni_mtx_init(&s->mtx); - nni_timer_init(&s->timer, surv_timeout, s); + nni_timer_init(&s->timer, surv0_timeout, s); s->nextid = nni_random(); s->raw = 0; @@ -94,26 +103,26 @@ surv_sock_init(void **sp, nni_sock *nsock) } static void -surv_sock_open(void *arg) +surv0_sock_open(void *arg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -surv_sock_close(void *arg) +surv0_sock_close(void *arg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_timer_cancel(&s->timer); nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -surv_pipe_fini(void *arg) +surv0_pipe_fini(void *arg) { - surv_pipe *p = arg; + surv0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); @@ -124,21 +133,21 @@ surv_pipe_fini(void *arg) } static int -surv_pipe_init(void **pp, nni_pipe *npipe, void *s) +surv0_pipe_init(void **pp, nni_pipe *npipe, void *s) { - surv_pipe *p; - int rv; + surv0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } // This depth could be tunable. if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, surv_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, surv_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, surv_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, surv_recv_cb, p)) != 0)) { - surv_pipe_fini(p); + ((rv = nni_aio_init(&p->aio_getq, surv0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, surv0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, surv0_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, surv0_recv_cb, p)) != 0)) { + surv0_pipe_fini(p); return (rv); } @@ -149,10 +158,10 @@ surv_pipe_init(void **pp, nni_pipe *npipe, void *s) } static int -surv_pipe_start(void *arg) +surv0_pipe_start(void *arg) { - surv_pipe *p = arg; - surv_sock *s = p->psock; + surv0_pipe *p = arg; + surv0_sock *s = p->psock; nni_mtx_lock(&s->mtx); nni_list_append(&s->pipes, p); @@ -164,10 +173,10 @@ surv_pipe_start(void *arg) } static void -surv_pipe_stop(void *arg) +surv0_pipe_stop(void *arg) { - surv_pipe *p = arg; - surv_sock *s = p->psock; + surv0_pipe *p = arg; + surv0_sock *s = p->psock; nni_aio_stop(p->aio_getq); nni_aio_stop(p->aio_send); @@ -184,9 +193,9 @@ surv_pipe_stop(void *arg) } static void -surv_getq_cb(void *arg) +surv0_getq_cb(void *arg) { - surv_pipe *p = arg; + surv0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->npipe); @@ -200,9 +209,9 @@ surv_getq_cb(void *arg) } static void -surv_send_cb(void *arg) +surv0_send_cb(void *arg) { - surv_pipe *p = arg; + surv0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -215,9 +224,9 @@ surv_send_cb(void *arg) } static void -surv_putq_cb(void *arg) +surv0_putq_cb(void *arg) { - surv_pipe *p = arg; + surv0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -230,10 +239,10 @@ surv_putq_cb(void *arg) } static void -surv_recv_cb(void *arg) +surv0_recv_cb(void *arg) { - surv_pipe *p = arg; - nni_msg * msg; + surv0_pipe *p = arg; + nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { goto failed; @@ -265,10 +274,10 @@ failed: } static int -surv_sock_setopt_raw(void *arg, const void *buf, size_t sz) +surv0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - surv_sock *s = arg; - int rv; + surv0_sock *s = arg; + int rv; nni_mtx_lock(&s->mtx); if ((rv = nni_setopt_int(&s->raw, buf, sz, 0, 1)) == 0) { @@ -280,33 +289,33 @@ surv_sock_setopt_raw(void *arg, const void *buf, size_t sz) } static int -surv_sock_getopt_raw(void *arg, void *buf, size_t *szp) +surv0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - surv_sock *s = arg; + surv0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static int -surv_sock_setopt_surveytime(void *arg, const void *buf, size_t sz) +surv0_sock_setopt_surveytime(void *arg, const void *buf, size_t sz) { - surv_sock *s = arg; + surv0_sock *s = arg; return (nni_setopt_ms(&s->survtime, buf, sz)); } static int -surv_sock_getopt_surveytime(void *arg, void *buf, size_t *szp) +surv0_sock_getopt_surveytime(void *arg, void *buf, size_t *szp) { - surv_sock *s = arg; + surv0_sock *s = arg; return (nni_getopt_ms(s->survtime, buf, szp)); } static void -surv_sock_getq_cb(void *arg) +surv0_sock_getq_cb(void *arg) { - surv_sock *s = arg; - surv_pipe *p; - surv_pipe *last; - nni_msg * msg, *dup; + surv0_sock *s = arg; + surv0_pipe *p; + surv0_pipe *last; + nni_msg * msg, *dup; if (nni_aio_result(s->aio_getq) != 0) { // Should be NNG_ECLOSED. @@ -338,9 +347,9 @@ surv_sock_getq_cb(void *arg) } static void -surv_timeout(void *arg) +surv0_timeout(void *arg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_mtx_lock(&s->mtx); s->survid = 0; @@ -349,9 +358,9 @@ surv_timeout(void *arg) } static void -surv_sock_recv(void *arg, nni_aio *aio) +surv0_sock_recv(void *arg, nni_aio *aio) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_mtx_lock(&s->mtx); if (s->survid == 0) { @@ -364,11 +373,11 @@ surv_sock_recv(void *arg, nni_aio *aio) } static void -surv_sock_send(void *arg, nni_aio *aio) +surv0_sock_send(void *arg, nni_aio *aio) { - surv_sock *s = arg; - nni_msg * msg; - int rv; + surv0_sock *s = arg; + nni_msg * msg; + int rv; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -404,9 +413,9 @@ surv_sock_send(void *arg, nni_aio *aio) } static nni_msg * -surv_sock_filter(void *arg, nni_msg *msg) +surv0_sock_filter(void *arg, nni_msg *msg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -426,50 +435,50 @@ surv_sock_filter(void *arg, nni_msg *msg) return (msg); } -static nni_proto_pipe_ops surv_pipe_ops = { - .pipe_init = surv_pipe_init, - .pipe_fini = surv_pipe_fini, - .pipe_start = surv_pipe_start, - .pipe_stop = surv_pipe_stop, +static nni_proto_pipe_ops surv0_pipe_ops = { + .pipe_init = surv0_pipe_init, + .pipe_fini = surv0_pipe_fini, + .pipe_start = surv0_pipe_start, + .pipe_stop = surv0_pipe_stop, }; -static nni_proto_sock_option surv_sock_options[] = { +static nni_proto_sock_option surv0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = surv_sock_getopt_raw, - .pso_setopt = surv_sock_setopt_raw, + .pso_getopt = surv0_sock_getopt_raw, + .pso_setopt = surv0_sock_setopt_raw, }, { .pso_name = NNG_OPT_SURVEYOR_SURVEYTIME, - .pso_getopt = surv_sock_getopt_surveytime, - .pso_setopt = surv_sock_setopt_surveytime, + .pso_getopt = surv0_sock_getopt_surveytime, + .pso_setopt = surv0_sock_setopt_surveytime, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops surv_sock_ops = { - .sock_init = surv_sock_init, - .sock_fini = surv_sock_fini, - .sock_open = surv_sock_open, - .sock_close = surv_sock_close, - .sock_send = surv_sock_send, - .sock_recv = surv_sock_recv, - .sock_filter = surv_sock_filter, - .sock_options = surv_sock_options, +static nni_proto_sock_ops surv0_sock_ops = { + .sock_init = surv0_sock_init, + .sock_fini = surv0_sock_fini, + .sock_open = surv0_sock_open, + .sock_close = surv0_sock_close, + .sock_send = surv0_sock_send, + .sock_recv = surv0_sock_recv, + .sock_filter = surv0_sock_filter, + .sock_options = surv0_sock_options, }; -static nni_proto surv_proto = { +static nni_proto surv0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &surv_sock_ops, - .proto_pipe_ops = &surv_pipe_ops, + .proto_sock_ops = &surv0_sock_ops, + .proto_pipe_ops = &surv0_pipe_ops, }; int nng_surveyor0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &surv_proto)); + return (nni_proto_open(sidp, &surv0_proto)); } diff --git a/src/protocol/survey0/survey.h b/src/protocol/survey0/survey.h new file mode 100644 index 00000000..a7b6d943 --- /dev/null +++ b/src/protocol/survey0/survey.h @@ -0,0 +1,30 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_SURVEY0_SURVEY_H +#define NNG_PROTOCOL_SURVEY0_SURVEY_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_surveyor0_open(nng_socket *); + +#ifndef nng_surveyor_open +#define nng_surveyor_open nng_surveyor0_open +#endif + +#define NNG_OPT_SURVEYOR_SURVEYTIME "surveyor:survey-time" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_SURVEY0_SURVEY_H diff --git a/src/transport/inproc/CMakeLists.txt b/src/transport/inproc/CMakeLists.txt new file mode 100644 index 00000000..a445da85 --- /dev/null +++ b/src/transport/inproc/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# inproc protocol + +if (NNG_TRANSPORT_INPROC) + set(INPROC_SOURCES transport/inproc/inproc.c transport/inproc/inproc.h) + install(FILES inproc.h DESTINATION include/nng/transport/inproc) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${INPROC_SOURCES} PARENT_SCOPE) diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index c747f7dc..ae64263c 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -475,9 +475,8 @@ struct nni_tran nni_inproc_tran = { .tran_fini = nni_inproc_fini, }; - int nng_inproc_register(void) { - return (nni_tran_register(&nni_inproc_tran)); + return (nni_tran_register(&nni_inproc_tran)); } diff --git a/src/transport/ipc/CMakeLists.txt b/src/transport/ipc/CMakeLists.txt new file mode 100644 index 00000000..1a5496cf --- /dev/null +++ b/src/transport/ipc/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# ipc protocol + +if (NNG_TRANSPORT_IPC) + set(IPC_SOURCES transport/ipc/ipc.c transport/ipc/ipc.h) + install(FILES ipc.h DESTINATION include/nng/transport/ipc) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${IPC_SOURCES} PARENT_SCOPE) diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index afe8afa8..c13dbd34 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -704,9 +704,7 @@ static nni_tran_ep nni_ipc_ep_ops = { .ep_options = nni_ipc_ep_options, }; -// This is the IPC transport linkage, and should be the only global -// symbol in this entire file. -struct nni_tran nni_ipc_tran = { +static nni_tran nni_ipc_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "ipc", .tran_ep = &nni_ipc_ep_ops, @@ -714,3 +712,9 @@ struct nni_tran nni_ipc_tran = { .tran_init = nni_ipc_tran_init, .tran_fini = nni_ipc_tran_fini, }; + +int +nng_ipc_register(void) +{ + return (nni_tran_register(&nni_ipc_tran)); +} diff --git a/src/transport/ipc/ipc.h b/src/transport/ipc/ipc.h new file mode 100644 index 00000000..f19762c7 --- /dev/null +++ b/src/transport/ipc/ipc.h @@ -0,0 +1,19 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_TRANSPORT_IPC_IPC_H +#define NNG_TRANSPORT_IPC_IPC_H + +// ipc transport. This is used for inter-process communication on +// the same host computer. + +extern int nng_ipc_register(void); + +#endif // NNG_TRANSPORT_IPC_IPC_H diff --git a/src/transport/tcp/CMakeLists.txt b/src/transport/tcp/CMakeLists.txt new file mode 100644 index 00000000..305c357a --- /dev/null +++ b/src/transport/tcp/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# TCP protocol + +if (NNG_TRANSPORT_TCP) + set(TCP_SOURCES transport/tcp/tcp.c transport/tcp/tcp.h) + install(FILES tcp.h DESTINATION include/nng/transport/tcp) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${TCP_SOURCES} PARENT_SCOPE) diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 43b2890d..e33db865 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -854,9 +854,7 @@ static nni_tran_ep nni_tcp_ep_ops = { .ep_options = nni_tcp_ep_options, }; -// This is the TCP transport linkage, and should be the only global -// symbol in this entire file. -struct nni_tran nni_tcp_tran = { +static nni_tran nni_tcp_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "tcp", .tran_ep = &nni_tcp_ep_ops, @@ -864,3 +862,9 @@ struct nni_tran nni_tcp_tran = { .tran_init = nni_tcp_tran_init, .tran_fini = nni_tcp_tran_fini, }; + +int +nng_tcp_register(void) +{ + return (nni_tran_register(&nni_tcp_tran)); +} diff --git a/src/transport/tcp/tcp.h b/src/transport/tcp/tcp.h new file mode 100644 index 00000000..b4c79461 --- /dev/null +++ b/src/transport/tcp/tcp.h @@ -0,0 +1,18 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_TRANSPORT_TCP_TCP_H +#define NNG_TRANSPORT_TCP_TCP_H + +// TCP transport. This is used for communication over TCP/IP. + +extern int nng_tcp_register(void); + +#endif // NNG_TRANSPORT_TCP_TCP_H diff --git a/src/transport/zerotier/CMakeLists.txt b/src/transport/zerotier/CMakeLists.txt new file mode 100644 index 00000000..c1bb0c35 --- /dev/null +++ b/src/transport/zerotier/CMakeLists.txt @@ -0,0 +1,50 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# ZeroTier protocol + +set (NNG_TRANSPORT_ZEROTIER_SOURCE "" CACHE PATH "Location of ZeroTier source tree.") + +if (NNG_TRANSPORT_ZEROTIER) + + # We use the libzerotiercore.a library, which is unfortunately a C++ object + # even though it exposes only public C symbols. It would be extremely + # helpful if libzerotiercore didn't make us carry the whole C++ runtime + # behind us. The user must specify the location of the ZeroTier source + # tree (dev branch for now, and already compiled please) by setting the + # NNG_ZEROTIER_SOURCE macro. + # NB: This needs to be the zerotierone tree, not the libzt library. + # This is because we don't access the API, but instead use the low + # level zerotiercore functionality directly. + # NB: As we wind up linking libzerotiercore.a into the application, + # this means that your application will *also* need to either be licensed + # under the GPLv3, or you will need to have a commercial license from + # ZeroTier permitting its use elsewhere. + + enable_language(CXX) + find_library(NNG_LIBZTCORE zerotiercore PATHS ${NNG_TRANSPORT_ZEROTIER_SOURCE}) + if (NNG_LIBZTCORE) + set(CMAKE_REQUIRED_INCLUDES ${NNG_TRANSPORT_ZEROTIER_SOURCE}/include) + message(STATUS "C++ ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}") + set(CMAKE_REQUIRED_LIBRARIES ${CMAKE_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}) + set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES} PARENT_SCOPE) + set(NNG_REQUIRED_INCLUDES ${NNG_REQUIRED_INCLUDES} ${NNG_TRANSPORT_ZEROTIER_SOURCE}/include PARENT_SCOPE) + nng_check_sym(ZT_Node_join ZeroTierOne.h HAVE_ZTCORE) + endif() + if (NOT HAVE_ZTCORE) + message (FATAL_ERROR "Cannot find ZeroTier components") + endif() + message(STATUS "Found ZeroTier at ${NNG_LIBZTCORE}") + + set(ZT_SOURCES transport/zerotier/zerotier.c transport/zerotier/zerotier.h) + install(FILES zerotier.h DESTINATION include/nng/transport/zerotier) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${ZT_SOURCES} PARENT_SCOPE) |
