diff options
76 files changed, 2686 insertions, 1161 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index c2815ca3..6cd49b9f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -93,11 +93,83 @@ option (NNG_TESTS "Build and run tests" ON) option (NNG_TOOLS "Build extra tools" OFF) option (NNG_ENABLE_NNGCAT "Enable building nngcat utility." ${NNG_TOOLS}) option (NNG_ENABLE_COVERAGE "Enable coverage reporting." OFF) -option (NNG_ENABLE_ZEROTIER "Enable ZeroTier transport (requires libzerotiercore)." OFF) -set (NNG_ZEROTIER_SOURCE "" CACHE PATH "Location of ZeroTier source tree.") # Enable access to private APIs for our own use. add_definitions (-DNNG_PRIVATE) +option (NNG_PROTO_BUS0 "Enable BUSv0 protocol." ON) +if (NNG_PROTO_BUS0) + add_definitions (-DNNG_HAVE_BUS0) +endif () + +option (NNG_PROTO_PAIR0 "Enable PAIRv0 protocol." ON) +if (NNG_PROTO_PAIR0) + add_definitions (-DNNG_HAVE_PAIR0) +endif () + +option (NNG_PROTO_PAIR1 "Enable PAIRv1 protocol." ON) +if (NNG_PROTO_PAIR1) + add_definitions (-DNNG_HAVE_PAIR1) +endif () + +option (NNG_PROTO_REQ0 "Enable REQv0 protocol." ON) +if (NNG_PROTO_REQ0) + add_definitions (-DNNG_HAVE_REQ0) +endif () + +option (NNG_PROTO_REP0 "Enable REPv0 protocol." ON) +if (NNG_PROTO_REP0) + add_definitions (-DNNG_HAVE_REP0) +endif () + +option (NNG_PROTO_PUB0 "Enable PUBv0 protocol." ON) +if (NNG_PROTO_PUB0) + add_definitions (-DNNG_HAVE_PUB0) +endif () + +option (NNG_PROTO_SUB0 "Enable SUBv0 protocol." ON) +if (NNG_PROTO_SUB0) + add_definitions (-DNNG_HAVE_SUB0) +endif () + +option (NNG_PROTO_PUSH0 "Enable PUSHv0 protocol." ON) +if (NNG_PROTO_PUSH0) + add_definitions (-DNNG_HAVE_PUSH0) +endif () + +option (NNG_PROTO_PULL0 "Enable PULLv0 protocol." ON) +if (NNG_PROTO_PULL0) + add_definitions (-DNNG_HAVE_PULL0) +endif () + +option (NNG_PROTO_SURVEYOR0 "Enable SURVEYORv0 protocol." ON) +if (NNG_PROTO_SURVEYOR0) + add_definitions (-DNNG_HAVE_SURVEYOR0) +endif () + +option (NNG_PROTO_RESPONDENT0 "Enable RESPONDENTv0 protocol." ON) +if (NNG_PROTO_RESPONDENT0) + add_definitions (-DNNG_HAVE_RESPONDENT0) +endif () + +option (NNG_TRANSPORT_INPROC "Enable inproc transport." ON) +if (NNG_TRANSPORT_INPROC) + add_definitions (-DNNG_HAVE_INPROC) +endif () + +option (NNG_TRANSPORT_IPC "Enable IPC transport." ON) +if (NNG_TRANSPORT_IPC) + add_definitions (-DNNG_HAVE_IPC) +endif () + +option (NNG_TRANSPORT_TCP "Enable TCP transport." ON) +if (NNG_TRANSPORT_TCP) + add_definitions (-DNNG_HAVE_TCP) +endif () + +option (NNG_TRANSPORT_ZEROTIER "Enable ZeroTier transport (requires libzerotiercore)." OFF) +if (NNG_TRANSPORT_ZEROTIER) + add_definitions (-DNNG_HAVE_ZEROTIER) +endif () # Platform checks. if (NNG_ENABLE_COVERAGE) @@ -247,39 +319,6 @@ nng_check_sym (strlcat string.h NNG_HAVE_STRLCAT) nng_check_sym (strlcpy string.h NNG_HAVE_STRLCPY) nng_check_sym (strnlen string.h NNG_HAVE_STRNLEN) -# Search for ZeroTier -# We use the libzerotiercore.a library, which is unfortunately a C++ object -# even though it exposes only public C symbols. It would be extremely -# helpful if libzerotiercore didn't make us carry the whole C++ runtime -# behind us. The user must specify the location of the ZeroTier source -# tree (dev branch for now, and already compiled please) by setting the -# NNG_ZEROTIER_SOURCE macro. -# NB: This needs to be the zerotierone tree, not the libzt library. -# This is because we don't access the API, but instead use the low -# level zerotiercore functionality directly. -# NB: As we wind up linking libzerotiercore.a into the application, -# this means that your application will *also* need to either be licensed -# under the GPLv3, or you will need to have a commercial license from -# ZeroTier permitting its use elsewhere. -if (NNG_ENABLE_ZEROTIER) - enable_language(CXX) - find_library(NNG_LIBZTCORE zerotiercore PATHS ${NNG_ZEROTIER_SOURCE}) - if (NNG_LIBZTCORE) - set(CMAKE_REQUIRED_INCLUDES ${NNG_ZEROTIER_SOURCE}/include) -# set(CMAKE_REQUIRED_LIBRARIES ${NNG_LIBZTCORE} c++) -# set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} c++) - message(STATUS "C++ ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}") - set(CMAKE_REQUIRED_LIBRARIES ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}) - set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}) - set(NNG_REQUIRED_INCLUDES ${NNG_REQUIRED_INCLUDES} ${NNG_ZEROTIER_SOURCE}/include) - nng_check_sym(ZT_Node_join ZeroTierOne.h NNG_HAVE_ZEROTIER) - endif() - if (NOT NNG_HAVE_ZEROTIER) - message (FATAL_ERROR "Cannot find ZeroTier components") - endif() - message(STATUS "Found ZeroTier at ${NNG_LIBZTCORE}") -endif() - add_subdirectory (src) if (NNG_TESTS) diff --git a/docs/nng_bus.adoc b/docs/nng_bus.adoc index 03c43381..dd83062b 100644 --- a/docs/nng_bus.adoc +++ b/docs/nng_bus.adoc @@ -21,11 +21,9 @@ SYNOPSIS [source,c] ---------- -#include <nng/protocol/bus/bus.h> +#include <nng/protocol/bus0/bus.h> -int nng_bus_open(nng_socket *s); int nng_bus0_open(nng_socket *s); - ---------- DESCRIPTION @@ -57,7 +55,7 @@ likely that message loss is to occur. Socket Operations ~~~~~~~~~~~~~~~~~ -The `nng_bus_open()` call creates a bus socket. This socket +The `nng_bus0_open()` call creates a bus socket. This socket may be used to send and receive messages. Sending messages will attempt to deliver to each directly connected peer. diff --git a/docs/nng_inproc.adoc b/docs/nng_inproc.adoc index 9e044f6d..a6d83c95 100644 --- a/docs/nng_inproc.adoc +++ b/docs/nng_inproc.adoc @@ -47,8 +47,7 @@ URI Format This transport uses URIs using the scheme `inproc://`, followed by an arbitrary string of text, terminated by a `NUL` byte. The -entire URI must be less than `NNG_MAXADDRLEN` bytes long, including -the terminating `NUL`. +entire URI must be less than `NNG_MAXADDRLEN` bytes long. Multiple URIs can be used within the same application, and they will not interfere with one another. @@ -65,16 +64,22 @@ When using an `nng_sockaddr` structure, the actual structure is of type [source,c] -------- -#define NNG_AF_INPROC 1 +#define NNG_AF_INPROC 1 <1> #define NNG_MAXADDRLEN 128 -struct nng_sockaddr_inproc { +typedef nng_sockaddr_inproc { + // <2> uint16_t sa_family; // must be NNG_AF_INPROC - uint32_t sa_path[NNG_MAXADDRLEN]; // arbitrary "path" + char sa_path[NNG_MAXADDRLEN]; // arbitrary "path" + // } -------- +<1> The values of these macros may change, so applications +should avoid depending upon their values and instead use them symbolically. +<2> Other members may be present, but only those listed here +are suitable for application use. -The `sa_family` member will have the value `NNG_AF_INPROC` (1). +The `sa_family` member will have the value `NNG_AF_INPROC`. The `sa_path` member is an ASCIIZ string, and may contain any characters, terminated by a `NUL` byte. diff --git a/docs/nng_ipc.adoc b/docs/nng_ipc.adoc new file mode 100644 index 00000000..c5dac4f7 --- /dev/null +++ b/docs/nng_ipc.adoc @@ -0,0 +1,110 @@ +nng_ipc(7) +========== +:doctype: manpage +:manmanual: nng +:mansource: nng +:icons: font +:source-highlighter: pygments +:copyright: Copyright 2017 Garrett D'Amore <garrett@damore.org> \ + Copyright 2017 Capitar IT Group BV <info@capitar.com> \ + This software is supplied under the terms of the MIT License, a \ + copy of which should be located in the distribution where this \ + file was obtained (LICENSE.txt). A copy of the license may also \ + be found online at https://opensource.org/licenses/MIT. + +NAME +---- +nng_ipc - IPC transport for nng + +SYNOPSIS +-------- + +[source,c] +---------- +#include <nng/transport/ipc/ipc.h> + +int nng_ipc_register(void); +---------- + +DESCRIPTION +----------- + +The _nng_ipc_ transport provides communication support between +_nng_ sockets within different processes on the same host. For POSIX +platforms, this is implemented using UNIX domain sockets. For Windows, +this is implemented using Windows Named Pipes. Other platforms may +have different implementation strategies. + +// We need to insert a reference to the nanomsg RFC. + +Registration +~~~~~~~~~~~~ + +The _ipc_ transport is generally built-in to the _nng_ core, so +no extra steps to use it should be necessary. + +URI Format +~~~~~~~~~~ + +This transport uses URIs using the scheme `ipc://`, followed by +a path name in the file system where the socket or named pipe +should be created. + +TIP: On Windows, all names are prefixed by `\.\pipe\` and do not +occupy the normal file system. On POSIX platforms, the path is +taken literally, and is relative to the current directory, unless +an extra leading `/` is provided. For example, `ipc://myname` refers +to the name `myname` in the current directory, whereas `ipc:///tmp/myname` +refers to `myname` located in `/tmp`. + +The entire URI must be less than `NNG_MAXADDRLEN` bytes long. + +Socket Address +~~~~~~~~~~~~~~ + +When using an `nng_sockaddr` structure, the actual structure is of type +`nng_sockaddr_ipc`. This is a `struct` type with the following definition: + +[source,c] +-------- +#define NNG_AF_IPC 2 <1> +#define NNG_MAXADDRLEN 128 + +typedef struct { + // ... <2> + uint16_t sa_family; // must be NNG_AF_IPC + char sa_path[NNG_MAXADDRLEN]; // arbitrary "path" + // ... +} nng_sockaddr_ipc; +-------- +<1> The values of these macros may change, so applications +should avoid depending upon their values and instead use them symbolically. +<2> Other members may be present, but only those listed here +are suitable for application use. + +The `sa_family` member will have the value `NNG_AF_IPC`. +The `sa_path` member is an ASCIIZ string, and may contain any legal +path name (platform-dependent), terminated by a `NUL` byte. + +Transport Options +~~~~~~~~~~~~~~~~~ + +The _ipc_ transport has no special +options.footnote:[Options for security attributes and credentials are planned.] + +AUTHORS +------- +link:mailto:garrett@damore.org[Garrett D'Amore] + +SEE ALSO +-------- +<<nng.adoc#,nng(7)>> + +COPYRIGHT +--------- + +Copyright 2017 mailto:garrett@damore.org[Garrett D'Amore] + +Copyright 2017 mailto:info@capitar.com[Capitar IT Group BV] + +This document is supplied under the terms of the +https://opensource.org/licenses/LICENSE.txt[MIT License]. diff --git a/docs/nng_pair.adoc b/docs/nng_pair.adoc new file mode 100644 index 00000000..b55000af --- /dev/null +++ b/docs/nng_pair.adoc @@ -0,0 +1,155 @@ +nng_pair(7) +=========== +:doctype: manpage +:manmanual: nng +:mansource: nng +:icons: font +:source-highlighter: pygments +:copyright: Copyright 2017 Garrett D'Amore <garrett@damore.org> \ + Copyright 2017 Capitar IT Group BV <info@capitar.com> \ + This software is supplied under the terms of the MIT License, a \ + copy of which should be located in the distribution where this \ + file was obtained (LICENSE.txt). A copy of the license may also \ + be found online at https://opensource.org/licenses/MIT. + +NAME +---- +nng_pair - pair protocol + +SYNOPSIS +-------- + +.Version 0 +[source,c] +---------- +#include <nng/protocol/pair0/pair.h> + +int nng_pair0_open(nng_socket *s); +---------- + +.Version 1 +[source,c] +---------- +#include <nng/protocol/pair1/pair.h> + +int nng_pair1_open(nng_socket *s); +---------- + +DESCRIPTION +----------- + +The _nng_pair_ protocol implements a peer-to-peer pattern, where +relationships between peers are one-to-one. + +Version 1 of this protocol supports an optional _polyamorous_ mode where a +peer can maintain multiple partnerships. Using this mode requires +some additional sophistication in the application. + +Socket Operations +~~~~~~~~~~~~~~~~~ + +The `nng_pair_open()` call creates a _pair_ socket. Normally, this +pattern will block when attempting to send a message, if no peer is +able to receive the message. + +NOTE: Even though this mode may appear to be "reliable", because back-pressure +prevents discarding messages most of the time, there are topologies involving +_devices_ (see <<nng_device.adoc#,nng_device(3)>>) or raw mode sockets where +messages may be discarded. Applications that require reliable delivery +semantics should consider using <<nng_req.adoc#,nng_req(7)>> sockets, or +implement their own acknowledgement layer on top of pair sockets. + +In order to avoid head-of-line blocking conditions, _polyamorous_ mode pair +sockets (version 1 only) discard messages if they are unable to deliver them +to a peer. + +Protocol Versions +~~~~~~~~~~~~~~~~~ + +Version 0 is the legacy version of this protocol. It lacks any header +information, and is suitable when building simple one-to-one topologies. + +TIP: Use version 0 if you need to communicate with other implementations, +including the legacy https://github.com/nanomsg/nanomsg[nanomsg] library or +https://github.com/go-mangos/mangos[mangos]. + +Version 1 of the protocol offers improved protection against loops when +used with <<nng_device.adoc#,nng_device(3)>>. It also offers _polyamorous_ +mode for forming multiple partnerships on a single socket. + +NOTE: Version 1 of this protocol is considered experimental at this time. + +Polyamorous Mode +~~~~~~~~~~~~~~~~ + +Normally pair sockets are for one-to-one communication, and a given peer +will reject new connections if it already has an active connection to another +peer. + +In _polyamorous_ mode, which is only available with version 1, a socket can +support many one-to-one connections. In this mode, the application must +choose the remote peer to receive an ougoing message by setting the value +of the pipe ID on the outgoing message using +the <<nng_msg_set_pipe.adoc#,nng_msg_set_pipe(3)>> function. + +Most often the value of the outgoing pipe ID will be obtained from an incoming +message using the <<nng_msg_get_pipe.adoc#,nng_msg_get_pipe(3)>> function, +such as when replying to an incoming message. + +In order to prevent head-of-line blocking, if the peer on the given pipe +is not able to receive (or the pipe is no longer available, such as if the +peer has disconnected), then the message will be discarded with no notification +to the sender. + +Protocol Options +~~~~~~~~~~~~~~~~ + +The following protocol-specific options are available. + +`NNG_OPT_PAIR1_POLY`:: + + (Version 1 only). This option enables the use of _polyamorous_ mode. + The value is read-write, and takes an integer boolean value. The default + false value (0) indicates that legacy monogamous mode should be used. + +`NNG_OPT_MAXTTL`:: + + (Version 1 only). Maximum time-to-live. This option is an integer value + between 0 and 255, + inclusive, and is the maximum number of "hops" that a message may + pass through until it is discarded. The default value is 8. A value + of 0 may be used to disable the loop protection, allowing an infinite + number of hops. ++ +TIP: Each node along a forwarding path may have it's own value for the +maximum time-to-live, and performs its own checks before forwarding a message. +Therefore it is helpful if all nodes in the topology use the same value for +this option. + +Protocol Headers +~~~~~~~~~~~~~~~~ + +Version 0 of the pair protocol has no protocol-specific headers. + +Version 1 of the pair protocol uses a single 32-bit unsigned value. The +low-order (big-endian) byte of this value contains a "hop" count, and is +used in conjuction with the `NNG_OPT_MAXTTL` option to guard against +device forwarding loops. This value is initialized to 1, and incremented +each time the message is received by a new node. + +AUTHORS +------- +link:mailto:garrett@damore.org[Garrett D'Amore] + +SEE ALSO +-------- +<<nng.adoc#,nng(7)>> + +COPYRIGHT +--------- + +Copyright 2017 mailto:garrett@damore.org[Garrett D'Amore] + +Copyright 2017 mailto:info@capitar.com[Capitar IT Group BV] + +This document is supplied under the terms of the +https://opensource.org/licenses/LICENSE.txt[MIT License]. diff --git a/docs/nng_pub.adoc b/docs/nng_pub.adoc index d4db45ae..4c3b764b 100644 --- a/docs/nng_pub.adoc +++ b/docs/nng_pub.adoc @@ -21,11 +21,9 @@ SYNOPSIS [source,c] ---------- -#include <nng/protocol/pubsub/pubsub.h> +#include <nng/protocol/pubsub0/pub.h> -int nng_pub_open(nng_socket *s); int nng_pub0_open(nng_socket *s); - ---------- DESCRIPTION @@ -51,7 +49,7 @@ accordingly. Socket Operations ~~~~~~~~~~~~~~~~~ -The `nng_pub_open()` call creates a publisher socket. This socket +The `nng_pub0_open()` call creates a publisher socket. This socket may be used to send messages, but is unable to receive them. Attempts to receive messages will result in `NNG_ENOTSUP`. diff --git a/docs/nng_pull.adoc b/docs/nng_pull.adoc new file mode 100644 index 00000000..17614a3d --- /dev/null +++ b/docs/nng_pull.adoc @@ -0,0 +1,85 @@ +nng_pull(7) +=========== +:doctype: manpage +:manmanual: nng +:mansource: nng +:icons: font +:source-highlighter: pygments +:copyright: Copyright 2017 Garrett D'Amore <garrett@damore.org> \ + Copyright 2017 Capitar IT Group BV <info@capitar.com> \ + This software is supplied under the terms of the MIT License, a \ + copy of which should be located in the distribution where this \ + file was obtained (LICENSE.txt). A copy of the license may also \ + be found online at https://opensource.org/licenses/MIT. + +NAME +---- +nng_pull - pull protocol + +SYNOPSIS +-------- + +[source,c] +---------- +#include <nng/protocol/pipeline0/pull.h> + +int nng_pull0_open(nng_socket *s); +---------- + +DESCRIPTION +----------- + +The _nng_pull_ protocol is one half of a pipeline pattern. The other half +is the <<nng_push.adoc#,nng_push(7)>> protocol. + +In the pipeline pattern, pushers distribute messages to pullers. +Each message sent +by a pusher will be sent to one of its peer pullers, +chosen in a round-robin fashion +from the set of connected peers available for receiving. +This property makes this pattern useful in load-balancing scenarios. + +Socket Operations +~~~~~~~~~~~~~~~~~ + +The `nng_pull0_open()` call creates a puller socket. This socket +may be used to receive messages, but is unable to send them. Attempts +to send messages will result in `NNG_ENOTSUP`. + +When receiving messages, the _nng_pull_ protocol accepts messages as +they arrive from peers. If two peers both have a message ready, the +order in which messages are handled is undefined. + +Protocol Versions +~~~~~~~~~~~~~~~~~ + +Only version 0 of this protocol is supported. (At the time of writing, +no other versions of this protocol have been defined.) + +Protocol Options +~~~~~~~~~~~~~~~~ + +The _nng_pull_ protocol has no protocol-specific options. + +Protocol Headers +~~~~~~~~~~~~~~~~ + +The _nng_pull_ protocol has no protocol-specific headers. + +AUTHORS +------- +link:mailto:garrett@damore.org[Garrett D'Amore] + +SEE ALSO +-------- +<<nng.adoc#,nng(7)>> +<<nng_push.adoc#,nng_push(7)>> + +COPYRIGHT +--------- + +Copyright 2017 mailto:garrett@damore.org[Garrett D'Amore] + +Copyright 2017 mailto:info@capitar.com[Capitar IT Group BV] + +This document is supplied under the terms of the +https://opensource.org/licenses/LICENSE.txt[MIT License]. diff --git a/docs/nng_push.adoc b/docs/nng_push.adoc new file mode 100644 index 00000000..faf783b6 --- /dev/null +++ b/docs/nng_push.adoc @@ -0,0 +1,93 @@ +nng_push(7) +=========== +:doctype: manpage +:manmanual: nng +:mansource: nng +:icons: font +:source-highlighter: pygments +:copyright: Copyright 2017 Garrett D'Amore <garrett@damore.org> \ + Copyright 2017 Capitar IT Group BV <info@capitar.com> \ + This software is supplied under the terms of the MIT License, a \ + copy of which should be located in the distribution where this \ + file was obtained (LICENSE.txt). A copy of the license may also \ + be found online at https://opensource.org/licenses/MIT. + +NAME +---- +nng_push - push protocol + +SYNOPSIS +-------- + +[source,c] +---------- +#include <nng/protocol/pipeline0/push.h> + +int nng_push0_open(nng_socket *s); +---------- + +DESCRIPTION +----------- + +The _nng_push_ protocol is one half of a pipeline pattern. The +other side is the <<nng_pull.adoc#,nng_pull(7)>> protocol. + +In the pipeline pattern, pushers distribute messages to pullers. +Each message sent +by a pusher will be sent to one of its peer pullers, +chosen in a round-robin fashion +from the set of connected peers available for receiving. +This property makes this pattern useful in load-balancing scenarios. + +Socket Operations +~~~~~~~~~~~~~~~~~ + +The `nng_push0_open()` call creates a pusher socket. This socket +may be used to send messages, but is unable to receive them. Attempts +to receive messages will result in `NNG_ENOTSUP`. + +Send operations will observe flow control (back-pressure), so that +only peers capable of accepting a message will be considered. If no +peer is available to receive a message, then the send operation will +wait until one is available, or the operation times out. + +NOTE: Although the pipeline protocol honors flow control, and attempts +to avoid dropping messages, no guarantee of delivery is made. Furthermore, +as there is no capability for message acknowledgement, applications that +need reliable delivery are encouraged to consider the +<<nng_req.adoc#,nng_req(7)>> protocol instead. + +Protocol Versions +~~~~~~~~~~~~~~~~~ + +Only version 0 of this protocol is supported. (At the time of writing, +no other versions of this protocol have been defined.) + +Protocol Options +~~~~~~~~~~~~~~~~ + +The _nng_push_ protocol has no protocol-specific options. + +Protocol Headers +~~~~~~~~~~~~~~~~ + +The _nng_push_ protocol has no protocol-specific headers. + +AUTHORS +------- +link:mailto:garrett@damore.org[Garrett D'Amore] + +SEE ALSO +-------- +<<nng.adoc#,nng(7)>> +<<nng_pull.adoc#,nng_pull(7)>> +<<nng_req.adoc#,nng_req(7)>> + +COPYRIGHT +--------- + +Copyright 2017 mailto:garrett@damore.org[Garrett D'Amore] + +Copyright 2017 mailto:info@capitar.com[Capitar IT Group BV] + +This document is supplied under the terms of the +https://opensource.org/licenses/LICENSE.txt[MIT License]. diff --git a/docs/nng_sub.adoc b/docs/nng_sub.adoc index 0b409904..5359acec 100644 --- a/docs/nng_sub.adoc +++ b/docs/nng_sub.adoc @@ -21,11 +21,9 @@ SYNOPSIS [source,c] ---------- -#include <nng/protocol/pubsub/pubsub.h> +#include <nng/protocol/pubsub0/sub.h> -int nng_sub_open(nng_socket *s); int nng_sub0_open(nng_socket *s); - ---------- DESCRIPTION @@ -51,7 +49,7 @@ accordingly. Socket Operations ~~~~~~~~~~~~~~~~~ -The `nng_sub_open()` call creates a subscriber socket. This socket +The `nng_sub0_open()` call creates a subscriber socket. This socket may be used to receive messages, but is unable to send them. Attempts to send messages will result in `NNG_ENOTSUP`. diff --git a/docs/nng_tcp.adoc b/docs/nng_tcp.adoc new file mode 100644 index 00000000..a1d0cdab --- /dev/null +++ b/docs/nng_tcp.adoc @@ -0,0 +1,137 @@ +nng_tcp(7) +========== +:doctype: manpage +:manmanual: nng +:mansource: nng +:icons: font +:source-highlighter: pygments +:copyright: Copyright 2017 Garrett D'Amore <garrett@damore.org> \ + Copyright 2017 Capitar IT Group BV <info@capitar.com> \ + This software is supplied under the terms of the MIT License, a \ + copy of which should be located in the distribution where this \ + file was obtained (LICENSE.txt). A copy of the license may also \ + be found online at https://opensource.org/licenses/MIT. + +NAME +---- +nng_tcp - TCP/IP transport for nng + +SYNOPSIS +-------- + +[source,c] +---------- +#include <nng/transport/tcp/tcp.h> + +int nng_tcp_register(void); +---------- + +DESCRIPTION +----------- + +The _nng_tcp_ transport provides communication support between +_nng_ sockets across a TCP/IP network. Both IPv4 and IPv6 +are supported when the underlying platform also supports it. + +// We need to insert a reference to the nanomsg RFC. + +Registration +~~~~~~~~~~~~ + +The _tcp_ transport is generally built-in to the _nng_ core, so +no extra steps to use it should be necessary. + +URI Format +~~~~~~~~~~ + +This transport uses URIs using the scheme `tcp://`, followed by +an IP address or hostname, followed by a colon and finally a +TCP port number. For example, to contact port 80 on the localhost +either of the following URIs could be used: `tcp://127.0.0.1:80` or +`tcp://localhost:80`. + +When specifying IPv6 addresses, the address must be enclosed in +square brackets (`[]`) to avoid confusion with the final colon +separating the port. + +For example, the same port 80 on the IPv6 loopback address ('::1') would +be specified as `tcp://[::1]:80`. + +NOTE: When using symbolic names, the name is resolved when the +name is first used. _nng_ won't become aware of changes in the +name resolution until restart, +usually.footnote:[This is a bug and will likely be fixed in the future.] + +The special value of 0 (`INADDR_ANY`) can be used for a listener +to indicate that it should listen on all interfaces on the host. +A short-hand for this form is to either omit the address, or specify +the asterisk (`*`) character. For example, the following three +URIs are all equivalent, and could be used to listen to port 9999 +on the host: + + 1. `tcp://0.0.0.0:9999` + 2. `tcp://*:9999` + 3. `tcp://:9999` + +The entire URI must be less than `NNG_MAXADDRLEN` bytes long. + +Socket Address +~~~~~~~~~~~~~~ + +When using an `nng_sockaddr` structure, the actual structure is either +of type `nng_sockaddr_in` (for IPv4) or `nng_sockaddr_in6` (for IPv6). +These are `struct` types with the following definitions: + +[source,c] +-------- +#define NNG_AF_INET 3 <1> +#define NNG_AF_INET6 4 +#define NNG_MAXADDRLEN 128 + +typedef struct { + // ... <2> + uint16_t sa_family; // must be NNG_AF_INET + uint16_t sa_port; // TCP port number + uint32_t sa_addr; + // ... +} nng_sockaddr_in; + +typedef struct { + // ... <2> + uint16_t sa_family; // must be NNG_AF_INET6 + uint16_t sa_port; // TCP port number + uint8_t sa_addr[16]; + // ... +} nng_sockaddr_in6; +-------- +<1> The values of these macros may change, so applications +should avoid depending upon their values and instead use them symbolically. +<2> Other members may be present, but only those listed here +are suitable for application use. + +The `sa_family` member will have the value `NNG_AF_INET` or `NNG_AF_INET6`. +The `sa_port` and `sa_addr` are the TCP port number and address, both in +network byte order (most significant byte is first). + +Transport Options +~~~~~~~~~~~~~~~~~ + +The _tcp_ transport has no special +options.footnote:[Options for TCP keepalive, linger, and nodelay are planned.] + +AUTHORS +------- +link:mailto:garrett@damore.org[Garrett D'Amore] + +SEE ALSO +-------- +<<nng.adoc#,nng(7)>> + +COPYRIGHT +--------- + +Copyright 2017 mailto:garrett@damore.org[Garrett D'Amore] + +Copyright 2017 mailto:info@capitar.com[Capitar IT Group BV] + +This document is supplied under the terms of the +https://opensource.org/licenses/LICENSE.txt[MIT License]. diff --git a/perf/perf.c b/perf/perf.c index 9333d4f5..1d89fc61 100644 --- a/perf/perf.c +++ b/perf/perf.c @@ -22,6 +22,24 @@ // change without notice, and not part of the stable API or ABI. #include "core/nng_impl.h" +#if defined(NNG_ENABLE_PAIR1) +#include "protocol/pair1/pair.h" + +#elif defined(NNG_ENABLE_PAIR0) +#include "protocol/pair0/pair.h" + +#else + +static void die(const char *, ...); + +static int +nng_pair_open(nng_socket *rv) +{ + die("No pair protocol enabled in this build!"); + return (NNG_ENOTSUP); +} +#endif // NNG_ENABLE_PAIR + static void latency_client(const char *, int, int); static void latency_server(const char *, int, int); static void throughput_client(const char *, int, int); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a5a49052..4901789b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -74,30 +74,6 @@ set (NNG_SOURCES core/timer.h core/transport.c core/transport.h - - protocol/bus/bus.c - - protocol/pair/pair_v0.c - protocol/pair/pair_v1.c - - protocol/pipeline/pull.c - protocol/pipeline/push.c - - protocol/pubsub/pub.c - protocol/pubsub/sub.c - - protocol/reqrep/rep.c - protocol/reqrep/req.c - - protocol/survey/respond.c - protocol/survey/survey.c - - transport/inproc/inproc.c - - transport/ipc/ipc.c - - transport/tcp/tcp.c - ) if (NNG_PLATFORM_POSIX) @@ -146,14 +122,19 @@ endif() install(FILES transport/inproc/inproc.h DESTINATION include/nng/transport/inproc) -if (NNG_ENABLE_ZEROTIER) - set (NNG_SOURCES ${NNG_SOURCES} - transport/zerotier/zerotier.c - transport/zerotier/zerotier.h - ) - install(FILES transport/zerotier/zerotier.h - DESTINATION include/nng/transport/zerotier) -endif() + +add_subdirectory(protocol/bus0) +add_subdirectory(protocol/pair0) +add_subdirectory(protocol/pair1) +add_subdirectory(protocol/pipeline0) +add_subdirectory(protocol/pubsub0) +add_subdirectory(protocol/reqrep0) +add_subdirectory(protocol/survey0) + +add_subdirectory(transport/inproc) +add_subdirectory(transport/ipc) +add_subdirectory(transport/tcp) +add_subdirectory(transport/zerotier) include_directories(AFTER SYSTEM ${PROJECT_SOURCE_DIR}/src ${NNG_REQUIRED_INCLUDES}) @@ -216,3 +197,7 @@ install (TARGETS ${PROJECT_NAME} ${PROJECT_NAME}_static LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} ) + +# Promote settings to parent +set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} PARENT_SCOPE) +set(NNG_REQUIRED_LFLAGS ${NNG_REQUIRED_LFLAGS} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 3f1ffcb5..10bffaa4 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -297,7 +297,6 @@ nni_msgq_run_getq(nni_msgq *mq) static void nni_msgq_run_notify(nni_msgq *mq) { - nni_aio *aio; if (mq->mq_cb_fn != NULL) { int flags = 0; diff --git a/src/core/protocol.h b/src/core/protocol.h index 5db259f0..39bde059 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -147,21 +147,26 @@ extern int nni_proto_open(nng_socket *, const nni_proto *); // Protocol numbers are never more than 16 bits. Also, there will never be // a valid protocol numbered 0 (NNG_PROTO_NONE). #define NNI_PROTO(major, minor) (((major) *16) + (minor)) -enum nng_proto_enum { - NNI_PROTO_NONE = NNI_PROTO(0, 0), - NNI_PROTO_PAIR_V0 = NNI_PROTO(1, 0), - NNI_PROTO_PAIR_V1 = NNI_PROTO(1, 1), - NNI_PROTO_PUB_V0 = NNI_PROTO(2, 0), - NNI_PROTO_SUB_V0 = NNI_PROTO(2, 1), - NNI_PROTO_REQ_V0 = NNI_PROTO(3, 0), - NNI_PROTO_REP_V0 = NNI_PROTO(3, 1), - NNI_PROTO_PUSH_V0 = NNI_PROTO(5, 0), - NNI_PROTO_PULL_V0 = NNI_PROTO(5, 1), - NNI_PROTO_SURVEYOR_V0 = NNI_PROTO(6, 2), - NNI_PROTO_RESPONDENT_V0 = NNI_PROTO(6, 3), - NNI_PROTO_BUS_V0 = NNI_PROTO(7, 0), - NNI_PROTO_STAR_V0 = NNI_PROTO(100, 0), -}; + +// Protocol major numbers. This is here for documentation only, and +// to serve as a "registry" for managing new protocol numbers. Consider +// updating this table when adding new protocols. +// +// Protocol Maj Min Name Notes +// ------------------------------------------- +// NONE 0 0 reserved +// PAIRv0 1 0 pair +// PAIRv1 1 1 pair1 nng only, experimental +// PUBv0 2 0 pub +// SUBv0 2 1 sub +// REQv0 3 0 req +// REPv0 3 1 rep +// PUSHv0 5 0 push +// PULLv0 5 1 pull +// SURVEYORv0 6 2 surveyor minors 0 & 1 retired +// RESPONDENTv0 6 3 respondent +// BUSv0 7 0 bus +// STARv0 100 0 star mangos only, experimental extern int nni_proto_sys_init(void); extern void nni_proto_sys_fini(void); diff --git a/src/core/socket.c b/src/core/socket.c index c9b70ccb..4a84b639 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -102,7 +102,6 @@ static int nni_sock_getopt_fd(nni_sock *s, int flag, void *val, size_t *szp) { int rv; - uint32_t flags; nni_notifyfd *fd; nni_msgq * mq; nni_msgq_cb cb; diff --git a/src/core/transport.c b/src/core/transport.c index 6dcf4538..af9c93fb 100644 --- a/src/core/transport.c +++ b/src/core/transport.c @@ -10,6 +10,9 @@ #include "core/nng_impl.h" #include "transport/inproc/inproc.h" +#include "transport/ipc/ipc.h" +#include "transport/tcp/tcp.h" +#include "transport/zerotier/zerotier.h" #include <stdio.h> #include <string.h> @@ -51,6 +54,11 @@ nni_tran_register(const nni_tran *tran) nni_mtx_lock(&nni_tran_lk); // Check to see if the transport is already registered... NNI_LIST_FOREACH (&nni_tran_list, t) { + if (tran->tran_init == t->t_tran.tran_init) { + nni_mtx_unlock(&nni_tran_lk); + // Same transport, duplicate registration. + return (0); + } if (strcmp(tran->tran_scheme, t->t_tran.tran_scheme) == 0) { nni_mtx_unlock(&nni_tran_lk); return (NNG_ESTATE); @@ -129,20 +137,40 @@ nni_tran_chkopt(const char *name, const void *v, size_t sz) // nni_tran_sys_init initializes the entire transport subsystem, including // each individual transport. + +typedef int (*nni_tran_ctor)(void); + +static nni_tran_ctor nni_tran_ctors[] = { +#ifdef NNG_HAVE_INPROC + nng_inproc_register, +#endif +#ifdef NNG_HAVE_IPC + nng_ipc_register, +#endif +#ifdef NNG_HAVE_TCP + nng_tcp_register, +#endif +#ifdef NNI_HAVE_ZEROTIER + nng_zt_register, +#endif + NULL, +}; + int nni_tran_sys_init(void) { - int rv; + int i; nni_tran_inited = 1; NNI_LIST_INIT(&nni_tran_list, nni_transport, t_node); nni_mtx_init(&nni_tran_lk); - if (((rv = nng_inproc_register()) != 0) || - ((rv = nni_tran_register(&nni_ipc_tran)) != 0) || - ((rv = nni_tran_register(&nni_tcp_tran)) != 0)) { - nni_tran_sys_fini(); - return (rv); + for (i = 0; nni_tran_ctors[i] != NULL; i++) { + int rv; + if ((rv = (nni_tran_ctors[i])()) != 0) { + nni_tran_sys_fini(); + return (rv); + } } return (0); } @@ -337,33 +337,6 @@ enum nng_flag_enum { NNG_FLAG_NONBLOCK = 2, // Non-blocking operations. }; -// Builtin protocol socket constructors. -NNG_DECL int nng_bus0_open(nng_socket *); -NNG_DECL int nng_pair0_open(nng_socket *); -NNG_DECL int nng_pair1_open(nng_socket *); -NNG_DECL int nng_pub0_open(nng_socket *); -NNG_DECL int nng_sub0_open(nng_socket *); -NNG_DECL int nng_push0_open(nng_socket *); -NNG_DECL int nng_pull0_open(nng_socket *); -NNG_DECL int nng_req0_open(nng_socket *); -NNG_DECL int nng_rep0_open(nng_socket *); -NNG_DECL int nng_surveyor0_open(nng_socket *); -NNG_DECL int nng_respondent0_open(nng_socket *); - -// Default versions. These provide compile time defaults; note that -// the actual protocols are baked into the binary; this should avoid -// suprising. Choosing a new protocol should be done explicitly. -#define nng_bus_open nng_bus0_open -#define nng_pair_open nng_pair1_open -#define nng_pub_open nng_pub0_open -#define nng_sub_open nng_sub0_open -#define nng_push_open nng_push0_open -#define nng_pull_open nng_pull0_open -#define nng_req_open nng_req0_open -#define nng_rep_open nng_rep0_open -#define nng_surveyor_open nng_surveyor0_open -#define nng_respondent_open nng_respondent0_open - // Options. #define NNG_OPT_SOCKNAME "socket-name" #define NNG_OPT_DOMAIN "compat:domain" // legacy compat only @@ -385,15 +358,6 @@ NNG_DECL int nng_respondent0_open(nng_socket *); #define NNG_OPT_RECONNMINT "reconnect-time-min" #define NNG_OPT_RECONNMAXT "reconnect-time-max" -#define NNG_OPT_PAIR1_POLY "pair1:polyamorous" - -#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe" -#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe" - -#define NNG_OPT_REQ_RESENDTIME "req:resend-time" - -#define NNG_OPT_SURVEYOR_SURVEYTIME "surveyor:survey-time" - // XXX: TBD: priorities, socket names, ipv4only // Statistics. These are for informational purposes only, and subject diff --git a/src/nng_compat.c b/src/nng_compat.c index f8a7ceb0..482834a1 100644 --- a/src/nng_compat.c +++ b/src/nng_compat.c @@ -10,6 +10,16 @@ #include "nng_compat.h" #include "nng.h" +#include "protocol/bus0/bus.h" +#include "protocol/pair0/pair.h" +#include "protocol/pipeline0/pull.h" +#include "protocol/pipeline0/push.h" +#include "protocol/pubsub0/pub.h" +#include "protocol/pubsub0/sub.h" +#include "protocol/reqrep0/rep.h" +#include "protocol/reqrep0/req.h" +#include "protocol/survey0/respond.h" +#include "protocol/survey0/survey.h" #include <stdio.h> #include <string.h> @@ -92,17 +102,37 @@ static const struct { uint16_t p_id; int (*p_open)(nng_socket *); } nn_protocols[] = { - // clang-format off +// clang-format off +#ifdef NNG_HAVE_BUS0 { NN_BUS, nng_bus0_open }, +#endif +#ifdef NNG_HAVE_PAIR0 { NN_PAIR, nng_pair0_open }, +#endif +#ifdef NNG_HAVE_PUSH0 { NN_PUSH, nng_push0_open }, +#endif +#ifdef NNG_HAVE_PULL0 { NN_PULL, nng_pull0_open }, +#endif +#ifdef NNG_HAVE_PUB0 { NN_PUB, nng_pub0_open }, +#endif +#ifdef NNG_HAVE_SUB0 { NN_SUB, nng_sub0_open }, +#endif +#ifdef NNG_HAVE_REQ0 { NN_REQ, nng_req0_open }, +#endif +#ifdef NNG_HAVE_REP0 { NN_REP, nng_rep0_open }, +#endif +#ifdef NNG_HAVE_SURVEYOR0 { NN_SURVEYOR, nng_surveyor0_open }, +#endif +#ifdef NNG_HAVE_RESPONDENT0 { NN_RESPONDENT, nng_respondent0_open }, +#endif { 0, NULL }, // clang-format on }; @@ -115,7 +145,7 @@ nn_socket(int domain, int protocol) int i; if ((domain != AF_SP) && (domain != AF_SP_RAW)) { - nn_seterror(EAFNOSUPPORT); + errno = EAFNOSUPPORT; return (-1); } @@ -125,7 +155,7 @@ nn_socket(int domain, int protocol) } } if (nn_protocols[i].p_open == NULL) { - nn_seterror(ENOTSUP); + errno = ENOTSUP; return (-1); } diff --git a/src/protocol/bus0/CMakeLists.txt b/src/protocol/bus0/CMakeLists.txt new file mode 100644 index 00000000..5071054a --- /dev/null +++ b/src/protocol/bus0/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Bus protocol + +if (NNG_PROTO_BUS0) + set(BUS0_SOURCES protocol/bus0/bus.c protocol/bus0/bus.h) + install(FILES bus.h DESTINATION include/nng/protocol/bus0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${BUS0_SOURCES} PARENT_SCOPE) diff --git a/src/protocol/bus/bus.c b/src/protocol/bus0/bus.c index 6ec0066b..3e15000b 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus0/bus.c @@ -12,31 +12,36 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/bus0/bus.h" // Bus protocol. The BUS protocol, each peer sends a message to its peers. // However, bus protocols do not "forward" (absent a device). So in order // for each participant to receive the message, each sender must be connected // to every other node in the network (full mesh). -typedef struct bus_pipe bus_pipe; -typedef struct bus_sock bus_sock; +#ifndef NNI_PROTO_BUS_V0 +#define NNI_PROTO_BUS_V0 NNI_PROTO(7, 0) +#endif -static void bus_sock_getq(bus_sock *); -static void bus_sock_send(void *, nni_aio *); -static void bus_sock_recv(void *, nni_aio *); +typedef struct bus0_pipe bus0_pipe; +typedef struct bus0_sock bus0_sock; -static void bus_pipe_getq(bus_pipe *); -static void bus_pipe_send(bus_pipe *); -static void bus_pipe_recv(bus_pipe *); +static void bus0_sock_getq(bus0_sock *); +static void bus0_sock_send(void *, nni_aio *); +static void bus0_sock_recv(void *, nni_aio *); -static void bus_sock_getq_cb(void *); -static void bus_pipe_getq_cb(void *); -static void bus_pipe_send_cb(void *); -static void bus_pipe_recv_cb(void *); -static void bus_pipe_putq_cb(void *); +static void bus0_pipe_getq(bus0_pipe *); +static void bus0_pipe_send(bus0_pipe *); +static void bus0_pipe_recv(bus0_pipe *); -// A bus_sock is our per-socket protocol private structure. -struct bus_sock { +static void bus0_sock_getq_cb(void *); +static void bus0_pipe_getq_cb(void *); +static void bus0_pipe_send_cb(void *); +static void bus0_pipe_recv_cb(void *); +static void bus0_pipe_putq_cb(void *); + +// bus0_sock is our per-socket protocol private structure. +struct bus0_sock { int raw; nni_aio * aio_getq; nni_list pipes; @@ -45,10 +50,10 @@ struct bus_sock { nni_msgq *urq; }; -// A bus_pipe is our per-pipe protocol private structure. -struct bus_pipe { +// bus0_pipe is our per-pipe protocol private structure. +struct bus0_pipe { nni_pipe * npipe; - bus_sock * psock; + bus0_sock * psock; nni_msgq * sendq; nni_list_node node; nni_aio * aio_getq; @@ -59,9 +64,9 @@ struct bus_pipe { }; static void -bus_sock_fini(void *arg) +bus0_sock_fini(void *arg) { - bus_sock *s = arg; + bus0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -70,18 +75,18 @@ bus_sock_fini(void *arg) } static int -bus_sock_init(void **sp, nni_sock *nsock) +bus0_sock_init(void **sp, nni_sock *nsock) { - bus_sock *s; - int rv; + bus0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - NNI_LIST_INIT(&s->pipes, bus_pipe, node); + NNI_LIST_INIT(&s->pipes, bus0_pipe, node); nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, bus_sock_getq_cb, s)) != 0) { - bus_sock_fini(s); + if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) { + bus0_sock_fini(s); return (rv); } s->raw = 0; @@ -93,25 +98,25 @@ bus_sock_init(void **sp, nni_sock *nsock) } static void -bus_sock_open(void *arg) +bus0_sock_open(void *arg) { - bus_sock *s = arg; + bus0_sock *s = arg; - bus_sock_getq(s); + bus0_sock_getq(s); } static void -bus_sock_close(void *arg) +bus0_sock_close(void *arg) { - bus_sock *s = arg; + bus0_sock *s = arg; nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -bus_pipe_fini(void *arg) +bus0_pipe_fini(void *arg) { - bus_pipe *p = arg; + bus0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); @@ -123,10 +128,10 @@ bus_pipe_fini(void *arg) } static int -bus_pipe_init(void **pp, nni_pipe *npipe, void *s) +bus0_pipe_init(void **pp, nni_pipe *npipe, void *s) { - bus_pipe *p; - int rv; + bus0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); @@ -134,11 +139,11 @@ bus_pipe_init(void **pp, nni_pipe *npipe, void *s) NNI_LIST_NODE_INIT(&p->node); nni_mtx_init(&p->mtx); if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, bus_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, bus_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, bus_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, bus_pipe_putq_cb, p)) != 0)) { - bus_pipe_fini(p); + ((rv = nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) { + bus0_pipe_fini(p); return (rv); } @@ -149,26 +154,26 @@ bus_pipe_init(void **pp, nni_pipe *npipe, void *s) } static int -bus_pipe_start(void *arg) +bus0_pipe_start(void *arg) { - bus_pipe *p = arg; - bus_sock *s = p->psock; + bus0_pipe *p = arg; + bus0_sock *s = p->psock; nni_mtx_lock(&s->mtx); nni_list_append(&s->pipes, p); nni_mtx_unlock(&s->mtx); - bus_pipe_recv(p); - bus_pipe_getq(p); + bus0_pipe_recv(p); + bus0_pipe_getq(p); return (0); } static void -bus_pipe_stop(void *arg) +bus0_pipe_stop(void *arg) { - bus_pipe *p = arg; - bus_sock *s = p->psock; + bus0_pipe *p = arg; + bus0_sock *s = p->psock; nni_msgq_close(p->sendq); @@ -185,9 +190,9 @@ bus_pipe_stop(void *arg) } static void -bus_pipe_getq_cb(void *arg) +bus0_pipe_getq_cb(void *arg) { - bus_pipe *p = arg; + bus0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { // closed? @@ -201,9 +206,9 @@ bus_pipe_getq_cb(void *arg) } static void -bus_pipe_send_cb(void *arg) +bus0_pipe_send_cb(void *arg) { - bus_pipe *p = arg; + bus0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { // closed? @@ -213,15 +218,15 @@ bus_pipe_send_cb(void *arg) return; } - bus_pipe_getq(p); + bus0_pipe_getq(p); } static void -bus_pipe_recv_cb(void *arg) +bus0_pipe_recv_cb(void *arg) { - bus_pipe *p = arg; - bus_sock *s = p->psock; - nni_msg * msg; + bus0_pipe *p = arg; + bus0_sock *s = p->psock; + nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->npipe); @@ -243,9 +248,9 @@ bus_pipe_recv_cb(void *arg) } static void -bus_pipe_putq_cb(void *arg) +bus0_pipe_putq_cb(void *arg) { - bus_pipe *p = arg; + bus0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -255,18 +260,18 @@ bus_pipe_putq_cb(void *arg) } // Wait for another recv. - bus_pipe_recv(p); + bus0_pipe_recv(p); } static void -bus_sock_getq_cb(void *arg) +bus0_sock_getq_cb(void *arg) { - bus_sock *s = arg; - bus_pipe *p; - bus_pipe *lastp; - nni_msg * msg; - nni_msg * dup; - uint32_t sender; + bus0_sock *s = arg; + bus0_pipe *p; + bus0_pipe *lastp; + nni_msg * msg; + nni_msg * dup; + uint32_t sender; if (nni_aio_result(s->aio_getq) != 0) { return; @@ -307,95 +312,95 @@ bus_sock_getq_cb(void *arg) nni_msg_free(msg); } - bus_sock_getq(s); + bus0_sock_getq(s); } static void -bus_sock_getq(bus_sock *s) +bus0_sock_getq(bus0_sock *s) { nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -bus_pipe_getq(bus_pipe *p) +bus0_pipe_getq(bus0_pipe *p) { nni_msgq_aio_get(p->sendq, p->aio_getq); } static void -bus_pipe_recv(bus_pipe *p) +bus0_pipe_recv(bus0_pipe *p) { nni_pipe_recv(p->npipe, p->aio_recv); } static int -bus_sock_setopt_raw(void *arg, const void *buf, size_t sz) +bus0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - bus_sock *s = arg; + bus0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -bus_sock_getopt_raw(void *arg, void *buf, size_t *szp) +bus0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - bus_sock *s = arg; + bus0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -bus_sock_send(void *arg, nni_aio *aio) +bus0_sock_send(void *arg, nni_aio *aio) { - bus_sock *s = arg; + bus0_sock *s = arg; nni_msgq_aio_put(s->uwq, aio); } static void -bus_sock_recv(void *arg, nni_aio *aio) +bus0_sock_recv(void *arg, nni_aio *aio) { - bus_sock *s = arg; + bus0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } -static nni_proto_pipe_ops bus_pipe_ops = { - .pipe_init = bus_pipe_init, - .pipe_fini = bus_pipe_fini, - .pipe_start = bus_pipe_start, - .pipe_stop = bus_pipe_stop, +static nni_proto_pipe_ops bus0_pipe_ops = { + .pipe_init = bus0_pipe_init, + .pipe_fini = bus0_pipe_fini, + .pipe_start = bus0_pipe_start, + .pipe_stop = bus0_pipe_stop, }; -static nni_proto_sock_option bus_sock_options[] = { +static nni_proto_sock_option bus0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = bus_sock_getopt_raw, - .pso_setopt = bus_sock_setopt_raw, + .pso_getopt = bus0_sock_getopt_raw, + .pso_setopt = bus0_sock_setopt_raw, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops bus_sock_ops = { - .sock_init = bus_sock_init, - .sock_fini = bus_sock_fini, - .sock_open = bus_sock_open, - .sock_close = bus_sock_close, - .sock_send = bus_sock_send, - .sock_recv = bus_sock_recv, - .sock_options = bus_sock_options, +static nni_proto_sock_ops bus0_sock_ops = { + .sock_init = bus0_sock_init, + .sock_fini = bus0_sock_fini, + .sock_open = bus0_sock_open, + .sock_close = bus0_sock_close, + .sock_send = bus0_sock_send, + .sock_recv = bus0_sock_recv, + .sock_options = bus0_sock_options, }; -static nni_proto bus_proto = { +static nni_proto bus0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_BUS_V0, "bus" }, .proto_peer = { NNI_PROTO_BUS_V0, "bus" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &bus_sock_ops, - .proto_pipe_ops = &bus_pipe_ops, + .proto_sock_ops = &bus0_sock_ops, + .proto_pipe_ops = &bus0_pipe_ops, }; int nng_bus0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &bus_proto)); + return (nni_proto_open(sidp, &bus0_proto)); } diff --git a/src/protocol/bus0/bus.h b/src/protocol/bus0/bus.h new file mode 100644 index 00000000..0ef3d391 --- /dev/null +++ b/src/protocol/bus0/bus.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_BUS0_BUS_H +#define NNG_PROTOCOL_BUS0_BUS_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_bus0_open(nng_socket *); + +#ifndef nng_bus_open +#define nng_bus_open nng_bus0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_BUS0_BUS_H diff --git a/src/protocol/pair0/CMakeLists.txt b/src/protocol/pair0/CMakeLists.txt new file mode 100644 index 00000000..68e7ad34 --- /dev/null +++ b/src/protocol/pair0/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# PAIRv0 protocol + +if (NNG_PROTO_PAIR0) + set(PAIR0_SOURCES protocol/pair0/pair.c protocol/pair0/pair.h) + install(FILES pair.h DESTINATION include/nng/protocol/pair0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PAIR0_SOURCES} PARENT_SCOPE) diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair0/pair.c index 93cd1497..bac405b8 100644 --- a/src/protocol/pair/pair_v0.c +++ b/src/protocol/pair0/pair.c @@ -17,6 +17,10 @@ // While a peer is connected to the server, all other peer connection // attempts are discarded. +#ifndef NNI_PROTO_PAIR_V0 +#define NNI_PROTO_PAIR_V0 NNI_PROTO(1, 0) +#endif + typedef struct pair0_pipe pair0_pipe; typedef struct pair0_sock pair0_sock; diff --git a/src/protocol/pair0/pair.h b/src/protocol/pair0/pair.h new file mode 100644 index 00000000..6828c921 --- /dev/null +++ b/src/protocol/pair0/pair.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PAIR0_PAIR_H +#define NNG_PROTOCOL_PAIR0_PAIR_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pair0_open(nng_socket *); + +#ifndef nng_pair_open +#define nng_pair_open nng_pair0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PAIR0_PAIR_H diff --git a/src/protocol/pair1/CMakeLists.txt b/src/protocol/pair1/CMakeLists.txt new file mode 100644 index 00000000..f35d6959 --- /dev/null +++ b/src/protocol/pair1/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# PAIRv1 protocol + +if (NNG_PROTO_PAIR1) + set(PAIR1_SOURCES protocol/pair1/pair.c protocol/pair1/pair.h) + install(FILES pair.h DESTINATION include/nng/protocol/pair1) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PAIR1_SOURCES} PARENT_SCOPE) diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair1/pair.c index e14d06d5..3f6f63fc 100644 --- a/src/protocol/pair/pair_v1.c +++ b/src/protocol/pair1/pair.c @@ -13,10 +13,16 @@ #include "core/nng_impl.h" +#include "protocol/pair1/pair.h" + // Pair protocol. The PAIRv1 protocol is a simple 1:1 messaging pattern, // usually, but it can support a polyamorous mode where a single server can // communicate with multiple partners. +#ifndef NNI_PROTO_PAIR_V1 +#define NNI_PROTO_PAIR_V1 NNI_PROTO(1, 1) +#endif + typedef struct pair1_pipe pair1_pipe; typedef struct pair1_sock pair1_sock; diff --git a/src/protocol/pair1/pair.h b/src/protocol/pair1/pair.h new file mode 100644 index 00000000..bc519d9f --- /dev/null +++ b/src/protocol/pair1/pair.h @@ -0,0 +1,30 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PAIR1_PAIR_H +#define NNG_PROTOCOL_PAIR1_PAIR_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pair1_open(nng_socket *); + +#ifndef nng_pair_open +#define nng_pair_open nng_pair1_open +#endif + +#define NNG_OPT_PAIR1_POLY "pair1:polyamorous" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PAIR1_PAIR_H diff --git a/src/protocol/pipeline0/CMakeLists.txt b/src/protocol/pipeline0/CMakeLists.txt new file mode 100644 index 00000000..6153c5a7 --- /dev/null +++ b/src/protocol/pipeline0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Pub/Sub protocol + +if (NNG_PROTO_PUSH0) + set(PUSH0_SOURCES protocol/pipeline0/push.c protocol/pipeline0/push.h) + install(FILES push.h DESTINATION include/nng/protocol/pipeline0) +endif() + +if (NNG_PROTO_PULL0) + set(PULL0_SOURCES protocol/pipeline0/pull.c protocol/pipeline0/pull.h) + install(FILES pull.h DESTINATION include/nng/protocol/pipeline0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PUSH0_SOURCES} ${PULL0_SOURCES} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline0/pull.c index 9685f0a1..8c16cb17 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline0/pull.c @@ -15,31 +15,39 @@ // Pull protocol. The PULL protocol is the "read" side of a pipeline. -typedef struct pull_pipe pull_pipe; -typedef struct pull_sock pull_sock; +#ifndef NNI_PROTO_PULL_V0 +#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1) +#endif -static void pull_putq_cb(void *); -static void pull_recv_cb(void *); -static void pull_putq(pull_pipe *, nni_msg *); +#ifndef NNI_PROTO_PUSH_V0 +#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0) +#endif -// A pull_sock is our per-socket protocol private structure. -struct pull_sock { +typedef struct pull0_pipe pull0_pipe; +typedef struct pull0_sock pull0_sock; + +static void pull0_putq_cb(void *); +static void pull0_recv_cb(void *); +static void pull0_putq(pull0_pipe *, nni_msg *); + +// pull0_sock is our per-socket protocol private structure. +struct pull0_sock { nni_msgq *urq; int raw; }; -// A pull_pipe is our per-pipe protocol private structure. -struct pull_pipe { - nni_pipe * pipe; - pull_sock *pull; - nni_aio * putq_aio; - nni_aio * recv_aio; +// pull0_pipe is our per-pipe protocol private structure. +struct pull0_pipe { + nni_pipe * pipe; + pull0_sock *pull; + nni_aio * putq_aio; + nni_aio * recv_aio; }; static int -pull_sock_init(void **sp, nni_sock *sock) +pull0_sock_init(void **sp, nni_sock *sock) { - pull_sock *s; + pull0_sock *s; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); @@ -52,17 +60,17 @@ pull_sock_init(void **sp, nni_sock *sock) } static void -pull_sock_fini(void *arg) +pull0_sock_fini(void *arg) { - pull_sock *s = arg; + pull0_sock *s = arg; NNI_FREE_STRUCT(s); } static void -pull_pipe_fini(void *arg) +pull0_pipe_fini(void *arg) { - pull_pipe *p = arg; + pull0_pipe *p = arg; nni_aio_fini(p->putq_aio); nni_aio_fini(p->recv_aio); @@ -70,17 +78,17 @@ pull_pipe_fini(void *arg) } static int -pull_pipe_init(void **pp, nni_pipe *pipe, void *s) +pull0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - pull_pipe *p; - int rv; + pull0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&p->putq_aio, pull_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->recv_aio, pull_recv_cb, p)) != 0)) { - pull_pipe_fini(p); + if (((rv = nni_aio_init(&p->putq_aio, pull0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->recv_aio, pull0_recv_cb, p)) != 0)) { + pull0_pipe_fini(p); return (rv); } @@ -91,9 +99,9 @@ pull_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -pull_pipe_start(void *arg) +pull0_pipe_start(void *arg) { - pull_pipe *p = arg; + pull0_pipe *p = arg; // Start the pending pull... nni_pipe_recv(p->pipe, p->recv_aio); @@ -102,20 +110,20 @@ pull_pipe_start(void *arg) } static void -pull_pipe_stop(void *arg) +pull0_pipe_stop(void *arg) { - pull_pipe *p = arg; + pull0_pipe *p = arg; nni_aio_stop(p->putq_aio); nni_aio_stop(p->recv_aio); } static void -pull_recv_cb(void *arg) +pull0_recv_cb(void *arg) { - pull_pipe *p = arg; - nni_aio * aio = p->recv_aio; - nni_msg * msg; + pull0_pipe *p = arg; + nni_aio * aio = p->recv_aio; + nni_msg * msg; if (nni_aio_result(aio) != 0) { // Failed to get a message, probably the pipe is closed. @@ -127,14 +135,14 @@ pull_recv_cb(void *arg) msg = nni_aio_get_msg(aio); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); nni_aio_set_msg(aio, NULL); - pull_putq(p, msg); + pull0_putq(p, msg); } static void -pull_putq_cb(void *arg) +pull0_putq_cb(void *arg) { - pull_pipe *p = arg; - nni_aio * aio = p->putq_aio; + pull0_pipe *p = arg; + nni_aio * aio = p->putq_aio; if (nni_aio_result(aio) != 0) { // If we failed to put, probably NNG_ECLOSED, nothing else @@ -148,11 +156,11 @@ pull_putq_cb(void *arg) nni_pipe_recv(p->pipe, p->recv_aio); } -// nni_pull_putq schedules a put operation to the user socket (sendup). +// pull0_putq schedules a put operation to the user socket (sendup). static void -pull_putq(pull_pipe *p, nni_msg *msg) +pull0_putq(pull0_pipe *p, nni_msg *msg) { - pull_sock *s = p->pull; + pull0_sock *s = p->pull; nni_aio_set_msg(p->putq_aio, msg); @@ -160,83 +168,83 @@ pull_putq(pull_pipe *p, nni_msg *msg) } static void -pull_sock_open(void *arg) +pull0_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void -pull_sock_close(void *arg) +pull0_sock_close(void *arg) { NNI_ARG_UNUSED(arg); } static int -pull_sock_setopt_raw(void *arg, const void *buf, size_t sz) +pull0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - pull_sock *s = arg; + pull0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -pull_sock_getopt_raw(void *arg, void *buf, size_t *szp) +pull0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - pull_sock *s = arg; + pull0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -pull_sock_send(void *arg, nni_aio *aio) +pull0_sock_send(void *arg, nni_aio *aio) { nni_aio_finish_error(aio, NNG_ENOTSUP); } static void -pull_sock_recv(void *arg, nni_aio *aio) +pull0_sock_recv(void *arg, nni_aio *aio) { - pull_sock *s = arg; + pull0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } -static nni_proto_pipe_ops pull_pipe_ops = { - .pipe_init = pull_pipe_init, - .pipe_fini = pull_pipe_fini, - .pipe_start = pull_pipe_start, - .pipe_stop = pull_pipe_stop, +static nni_proto_pipe_ops pull0_pipe_ops = { + .pipe_init = pull0_pipe_init, + .pipe_fini = pull0_pipe_fini, + .pipe_start = pull0_pipe_start, + .pipe_stop = pull0_pipe_stop, }; -static nni_proto_sock_option pull_sock_options[] = { +static nni_proto_sock_option pull0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = pull_sock_getopt_raw, - .pso_setopt = pull_sock_setopt_raw, + .pso_getopt = pull0_sock_getopt_raw, + .pso_setopt = pull0_sock_setopt_raw, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops pull_sock_ops = { - .sock_init = pull_sock_init, - .sock_fini = pull_sock_fini, - .sock_open = pull_sock_open, - .sock_close = pull_sock_close, - .sock_send = pull_sock_send, - .sock_recv = pull_sock_recv, - .sock_options = pull_sock_options, +static nni_proto_sock_ops pull0_sock_ops = { + .sock_init = pull0_sock_init, + .sock_fini = pull0_sock_fini, + .sock_open = pull0_sock_open, + .sock_close = pull0_sock_close, + .sock_send = pull0_sock_send, + .sock_recv = pull0_sock_recv, + .sock_options = pull0_sock_options, }; -static nni_proto pull_proto = { +static nni_proto pull0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_PULL_V0, "pull" }, .proto_peer = { NNI_PROTO_PUSH_V0, "push" }, .proto_flags = NNI_PROTO_FLAG_RCV, - .proto_pipe_ops = &pull_pipe_ops, - .proto_sock_ops = &pull_sock_ops, + .proto_pipe_ops = &pull0_pipe_ops, + .proto_sock_ops = &pull0_sock_ops, }; int nng_pull0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &pull_proto)); + return (nni_proto_open(sidp, &pull0_proto)); } diff --git a/src/protocol/pipeline0/pull.h b/src/protocol/pipeline0/pull.h new file mode 100644 index 00000000..75bded03 --- /dev/null +++ b/src/protocol/pipeline0/pull.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PIPELINE0_PULL_H +#define NNG_PROTOCOL_PIPELINE0_PULL_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pull0_open(nng_socket *); + +#ifndef nng_pull_open +#define nng_pull_open nng_pull0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PIPELINE0_PULL_H diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline0/push.c index 9ff74558..3dd83fe0 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline0/push.c @@ -17,23 +17,31 @@ // Push distributes fairly, or tries to, by giving messages in round-robin // order. -typedef struct push_pipe push_pipe; -typedef struct push_sock push_sock; +#ifndef NNI_PROTO_PULL_V0 +#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1) +#endif -static void push_send_cb(void *); -static void push_recv_cb(void *); -static void push_getq_cb(void *); +#ifndef NNI_PROTO_PUSH_V0 +#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0) +#endif -// An nni_push_sock is our per-socket protocol private structure. -struct push_sock { +typedef struct push0_pipe push0_pipe; +typedef struct push0_sock push0_sock; + +static void push0_send_cb(void *); +static void push0_recv_cb(void *); +static void push0_getq_cb(void *); + +// push0_sock is our per-socket protocol private structure. +struct push0_sock { nni_msgq *uwq; int raw; }; -// An nni_push_pipe is our per-pipe protocol private structure. -struct push_pipe { +// push0_pipe is our per-pipe protocol private structure. +struct push0_pipe { nni_pipe * pipe; - push_sock * push; + push0_sock * push; nni_list_node node; nni_aio *aio_recv; @@ -42,9 +50,9 @@ struct push_pipe { }; static int -push_sock_init(void **sp, nni_sock *sock) +push0_sock_init(void **sp, nni_sock *sock) { - push_sock *s; + push0_sock *s; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); @@ -56,29 +64,29 @@ push_sock_init(void **sp, nni_sock *sock) } static void -push_sock_fini(void *arg) +push0_sock_fini(void *arg) { - push_sock *s = arg; + push0_sock *s = arg; NNI_FREE_STRUCT(s); } static void -push_sock_open(void *arg) +push0_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void -push_sock_close(void *arg) +push0_sock_close(void *arg) { NNI_ARG_UNUSED(arg); } static void -push_pipe_fini(void *arg) +push0_pipe_fini(void *arg) { - push_pipe *p = arg; + push0_pipe *p = arg; nni_aio_fini(p->aio_recv); nni_aio_fini(p->aio_send); @@ -87,18 +95,18 @@ push_pipe_fini(void *arg) } static int -push_pipe_init(void **pp, nni_pipe *pipe, void *s) +push0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - push_pipe *p; - int rv; + push0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&p->aio_recv, push_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, push_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, push_getq_cb, p)) != 0)) { - push_pipe_fini(p); + if (((rv = nni_aio_init(&p->aio_recv, push0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, push0_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, push0_getq_cb, p)) != 0)) { + push0_pipe_fini(p); return (rv); } NNI_LIST_NODE_INIT(&p->node); @@ -109,10 +117,10 @@ push_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -push_pipe_start(void *arg) +push0_pipe_start(void *arg) { - push_pipe *p = arg; - push_sock *s = p->push; + push0_pipe *p = arg; + push0_sock *s = p->push; if (nni_pipe_peer(p->pipe) != NNI_PROTO_PULL_V0) { return (NNG_EPROTO); @@ -129,9 +137,9 @@ push_pipe_start(void *arg) } static void -push_pipe_stop(void *arg) +push0_pipe_stop(void *arg) { - push_pipe *p = arg; + push0_pipe *p = arg; nni_aio_stop(p->aio_recv); nni_aio_stop(p->aio_send); @@ -139,9 +147,9 @@ push_pipe_stop(void *arg) } static void -push_recv_cb(void *arg) +push0_recv_cb(void *arg) { - push_pipe *p = arg; + push0_pipe *p = arg; // We normally expect to receive an error. If a pipe actually // sends us data, we just discard it. @@ -155,10 +163,10 @@ push_recv_cb(void *arg) } static void -push_send_cb(void *arg) +push0_send_cb(void *arg) { - push_pipe *p = arg; - push_sock *s = p->push; + push0_pipe *p = arg; + push0_sock *s = p->push; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -171,10 +179,10 @@ push_send_cb(void *arg) } static void -push_getq_cb(void *arg) +push0_getq_cb(void *arg) { - push_pipe *p = arg; - nni_aio * aio = p->aio_getq; + push0_pipe *p = arg; + nni_aio * aio = p->aio_getq; if (nni_aio_result(aio) != 0) { // If the socket is closing, nothing else we can do. @@ -189,71 +197,71 @@ push_getq_cb(void *arg) } static int -push_sock_setopt_raw(void *arg, const void *buf, size_t sz) +push0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - push_sock *s = arg; + push0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -push_sock_getopt_raw(void *arg, void *buf, size_t *szp) +push0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - push_sock *s = arg; + push0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -push_sock_send(void *arg, nni_aio *aio) +push0_sock_send(void *arg, nni_aio *aio) { - push_sock *s = arg; + push0_sock *s = arg; nni_msgq_aio_put(s->uwq, aio); } static void -push_sock_recv(void *arg, nni_aio *aio) +push0_sock_recv(void *arg, nni_aio *aio) { nni_aio_finish_error(aio, NNG_ENOTSUP); } -static nni_proto_pipe_ops push_pipe_ops = { - .pipe_init = push_pipe_init, - .pipe_fini = push_pipe_fini, - .pipe_start = push_pipe_start, - .pipe_stop = push_pipe_stop, +static nni_proto_pipe_ops push0_pipe_ops = { + .pipe_init = push0_pipe_init, + .pipe_fini = push0_pipe_fini, + .pipe_start = push0_pipe_start, + .pipe_stop = push0_pipe_stop, }; -static nni_proto_sock_option push_sock_options[] = { +static nni_proto_sock_option push0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = push_sock_getopt_raw, - .pso_setopt = push_sock_setopt_raw, + .pso_getopt = push0_sock_getopt_raw, + .pso_setopt = push0_sock_setopt_raw, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops push_sock_ops = { - .sock_init = push_sock_init, - .sock_fini = push_sock_fini, - .sock_open = push_sock_open, - .sock_close = push_sock_close, - .sock_options = push_sock_options, - .sock_send = push_sock_send, - .sock_recv = push_sock_recv, +static nni_proto_sock_ops push0_sock_ops = { + .sock_init = push0_sock_init, + .sock_fini = push0_sock_fini, + .sock_open = push0_sock_open, + .sock_close = push0_sock_close, + .sock_options = push0_sock_options, + .sock_send = push0_sock_send, + .sock_recv = push0_sock_recv, }; -static nni_proto push_proto = { +static nni_proto push0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_PUSH_V0, "push" }, .proto_peer = { NNI_PROTO_PULL_V0, "pull" }, .proto_flags = NNI_PROTO_FLAG_SND, - .proto_pipe_ops = &push_pipe_ops, - .proto_sock_ops = &push_sock_ops, + .proto_pipe_ops = &push0_pipe_ops, + .proto_sock_ops = &push0_sock_ops, }; int nng_push0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &push_proto)); + return (nni_proto_open(sidp, &push0_proto)); } diff --git a/src/protocol/pipeline0/push.h b/src/protocol/pipeline0/push.h new file mode 100644 index 00000000..c7303b92 --- /dev/null +++ b/src/protocol/pipeline0/push.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PIPELINE0_PUSH_H +#define NNG_PROTOCOL_PIPELINE0_PUSH_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_push0_open(nng_socket *); + +#ifndef nng_push_open +#define nng_push_open nng_push0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PIPELINE0_PUSH_H diff --git a/src/protocol/pubsub0/CMakeLists.txt b/src/protocol/pubsub0/CMakeLists.txt new file mode 100644 index 00000000..4edcbfae --- /dev/null +++ b/src/protocol/pubsub0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Pub/Sub protocol + +if (NNG_PROTO_PUB0) + set(PUB0_SOURCES protocol/pubsub0/pub.c protocol/pubsub0/pub.h) + install(FILES pub.h DESTINATION include/nng/protocol/pubsub0) +endif() + +if (NNG_PROTO_SUB0) + set(SUB0_SOURCES protocol/pubsub0/sub.c protocol/pubsub0/sub.h) + install(FILES sub.h DESTINATION include/nng/protocol/pubsub0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PUB0_SOURCES} ${SUB0_SOURCES} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub0/pub.c index 9e5cd67f..f4a33b77 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -12,24 +12,33 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/pubsub0/pub.h" // Publish protocol. The PUB protocol simply sends messages out, as // a broadcast. It has nothing more sophisticated because it does not // perform sender-side filtering. Its best effort delivery, so anything // that can't receive the message won't get one. -typedef struct pub_pipe pub_pipe; -typedef struct pub_sock pub_sock; +#ifndef NNI_PROTO_SUB_V0 +#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1) +#endif -static void pub_pipe_recv_cb(void *); -static void pub_pipe_send_cb(void *); -static void pub_pipe_getq_cb(void *); -static void pub_sock_getq_cb(void *); -static void pub_sock_fini(void *); -static void pub_pipe_fini(void *); +#ifndef NNI_PROTO_PUB_V0 +#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0) +#endif -// A pub_sock is our per-socket protocol private structure. -struct pub_sock { +typedef struct pub0_pipe pub0_pipe; +typedef struct pub0_sock pub0_sock; + +static void pub0_pipe_recv_cb(void *); +static void pub0_pipe_send_cb(void *); +static void pub0_pipe_getq_cb(void *); +static void pub0_sock_getq_cb(void *); +static void pub0_sock_fini(void *); +static void pub0_pipe_fini(void *); + +// pub0_sock is our per-socket protocol private structure. +struct pub0_sock { nni_msgq *uwq; int raw; nni_aio * aio_getq; @@ -37,10 +46,10 @@ struct pub_sock { nni_mtx mtx; }; -// A pub_pipe is our per-pipe protocol private structure. -struct pub_pipe { +// pub0_pipe is our per-pipe protocol private structure. +struct pub0_pipe { nni_pipe * pipe; - pub_sock * pub; + pub0_sock * pub; nni_msgq * sendq; nni_aio * aio_getq; nni_aio * aio_send; @@ -49,9 +58,9 @@ struct pub_pipe { }; static void -pub_sock_fini(void *arg) +pub0_sock_fini(void *arg) { - pub_sock *s = arg; + pub0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -60,22 +69,22 @@ pub_sock_fini(void *arg) } static int -pub_sock_init(void **sp, nni_sock *sock) +pub0_sock_init(void **sp, nni_sock *sock) { - pub_sock *s; - int rv; + pub0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, pub_sock_getq_cb, s)) != 0) { - pub_sock_fini(s); + if ((rv = nni_aio_init(&s->aio_getq, pub0_sock_getq_cb, s)) != 0) { + pub0_sock_fini(s); return (rv); } s->raw = 0; - NNI_LIST_INIT(&s->pipes, pub_pipe, node); + NNI_LIST_INIT(&s->pipes, pub0_pipe, node); s->uwq = nni_sock_sendq(sock); @@ -84,25 +93,25 @@ pub_sock_init(void **sp, nni_sock *sock) } static void -pub_sock_open(void *arg) +pub0_sock_open(void *arg) { - pub_sock *s = arg; + pub0_sock *s = arg; nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -pub_sock_close(void *arg) +pub0_sock_close(void *arg) { - pub_sock *s = arg; + pub0_sock *s = arg; nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -pub_pipe_fini(void *arg) +pub0_pipe_fini(void *arg) { - pub_pipe *p = arg; + pub0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); nni_aio_fini(p->aio_recv); @@ -111,10 +120,10 @@ pub_pipe_fini(void *arg) } static int -pub_pipe_init(void **pp, nni_pipe *pipe, void *s) +pub0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - pub_pipe *p; - int rv; + pub0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); @@ -122,11 +131,11 @@ pub_pipe_init(void **pp, nni_pipe *pipe, void *s) // XXX: consider making this depth tunable if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, pub_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, pub_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pub_pipe_recv_cb, p)) != 0)) { + ((rv = nni_aio_init(&p->aio_getq, pub0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) { - pub_pipe_fini(p); + pub0_pipe_fini(p); return (rv); } @@ -137,10 +146,10 @@ pub_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -pub_pipe_start(void *arg) +pub0_pipe_start(void *arg) { - pub_pipe *p = arg; - pub_sock *s = p->pub; + pub0_pipe *p = arg; + pub0_sock *s = p->pub; if (nni_pipe_peer(p->pipe) != NNI_PROTO_SUB_V0) { return (NNG_EPROTO); @@ -157,10 +166,10 @@ pub_pipe_start(void *arg) } static void -pub_pipe_stop(void *arg) +pub0_pipe_stop(void *arg) { - pub_pipe *p = arg; - pub_sock *s = p->pub; + pub0_pipe *p = arg; + pub0_sock *s = p->pub; nni_aio_stop(p->aio_getq); nni_aio_stop(p->aio_send); @@ -176,15 +185,15 @@ pub_pipe_stop(void *arg) } static void -pub_sock_getq_cb(void *arg) +pub0_sock_getq_cb(void *arg) { - pub_sock *s = arg; - nni_msgq *uwq = s->uwq; - nni_msg * msg, *dup; + pub0_sock *s = arg; + nni_msgq * uwq = s->uwq; + nni_msg * msg, *dup; - pub_pipe *p; - pub_pipe *last; - int rv; + pub0_pipe *p; + pub0_pipe *last; + int rv; if (nni_aio_result(s->aio_getq) != 0) { return; @@ -218,9 +227,9 @@ pub_sock_getq_cb(void *arg) } static void -pub_pipe_recv_cb(void *arg) +pub0_pipe_recv_cb(void *arg) { - pub_pipe *p = arg; + pub0_pipe *p = arg; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); @@ -233,9 +242,9 @@ pub_pipe_recv_cb(void *arg) } static void -pub_pipe_getq_cb(void *arg) +pub0_pipe_getq_cb(void *arg) { - pub_pipe *p = arg; + pub0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->pipe); @@ -249,9 +258,9 @@ pub_pipe_getq_cb(void *arg) } static void -pub_pipe_send_cb(void *arg) +pub0_pipe_send_cb(void *arg) { - pub_pipe *p = arg; + pub0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -265,71 +274,71 @@ pub_pipe_send_cb(void *arg) } static int -pub_sock_setopt_raw(void *arg, const void *buf, size_t sz) +pub0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - pub_sock *s = arg; + pub0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -pub_sock_getopt_raw(void *arg, void *buf, size_t *szp) +pub0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - pub_sock *s = arg; + pub0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -pub_sock_recv(void *arg, nni_aio *aio) +pub0_sock_recv(void *arg, nni_aio *aio) { nni_aio_finish_error(aio, NNG_ENOTSUP); } static void -pub_sock_send(void *arg, nni_aio *aio) +pub0_sock_send(void *arg, nni_aio *aio) { - pub_sock *s = arg; + pub0_sock *s = arg; nni_msgq_aio_put(s->uwq, aio); } -static nni_proto_pipe_ops pub_pipe_ops = { - .pipe_init = pub_pipe_init, - .pipe_fini = pub_pipe_fini, - .pipe_start = pub_pipe_start, - .pipe_stop = pub_pipe_stop, +static nni_proto_pipe_ops pub0_pipe_ops = { + .pipe_init = pub0_pipe_init, + .pipe_fini = pub0_pipe_fini, + .pipe_start = pub0_pipe_start, + .pipe_stop = pub0_pipe_stop, }; -static nni_proto_sock_option pub_sock_options[] = { +static nni_proto_sock_option pub0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = pub_sock_getopt_raw, - .pso_setopt = pub_sock_setopt_raw, + .pso_getopt = pub0_sock_getopt_raw, + .pso_setopt = pub0_sock_setopt_raw, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops pub_sock_ops = { - .sock_init = pub_sock_init, - .sock_fini = pub_sock_fini, - .sock_open = pub_sock_open, - .sock_close = pub_sock_close, - .sock_send = pub_sock_send, - .sock_recv = pub_sock_recv, - .sock_options = pub_sock_options, +static nni_proto_sock_ops pub0_sock_ops = { + .sock_init = pub0_sock_init, + .sock_fini = pub0_sock_fini, + .sock_open = pub0_sock_open, + .sock_close = pub0_sock_close, + .sock_send = pub0_sock_send, + .sock_recv = pub0_sock_recv, + .sock_options = pub0_sock_options, }; -static nni_proto pub_proto = { +static nni_proto pub0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_PUB_V0, "pub" }, .proto_peer = { NNI_PROTO_SUB_V0, "sub" }, .proto_flags = NNI_PROTO_FLAG_SND, - .proto_sock_ops = &pub_sock_ops, - .proto_pipe_ops = &pub_pipe_ops, + .proto_sock_ops = &pub0_sock_ops, + .proto_pipe_ops = &pub0_pipe_ops, }; int nng_pub0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &pub_proto)); + return (nni_proto_open(sidp, &pub0_proto)); } diff --git a/src/protocol/pubsub0/pub.h b/src/protocol/pubsub0/pub.h new file mode 100644 index 00000000..2388a292 --- /dev/null +++ b/src/protocol/pubsub0/pub.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PUBSUB0_PUB_H +#define NNG_PROTOCOL_PUBSUB0_PUB_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pub0_open(nng_socket *); + +#ifndef nng_pub_open +#define nng_pub_open nng_pub0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PUBSUB0_PUB_H diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub0/sub.c index 555d528e..6c504d75 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -12,54 +12,60 @@ #include <string.h> #include "core/nng_impl.h" - -const char *nng_opt_sub_subscribe = NNG_OPT_SUB_SUBSCRIBE; -const char *nng_opt_sub_unsubscribe = NNG_OPT_SUB_UNSUBSCRIBE; +#include "protocol/pubsub0/sub.h" // Subscriber protocol. The SUB protocol receives messages sent to // it from publishers, and filters out those it is not interested in, // only passing up ones that match known subscriptions. -typedef struct sub_pipe sub_pipe; -typedef struct sub_sock sub_sock; -typedef struct sub_topic sub_topic; +#ifndef NNI_PROTO_SUB_V0 +#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1) +#endif + +#ifndef NNI_PROTO_PUB_V0 +#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0) +#endif + +typedef struct sub0_pipe sub0_pipe; +typedef struct sub0_sock sub0_sock; +typedef struct sub0_topic sub0_topic; -static void sub_recv_cb(void *); -static void sub_putq_cb(void *); -static void sub_pipe_fini(void *); +static void sub0_recv_cb(void *); +static void sub0_putq_cb(void *); +static void sub0_pipe_fini(void *); -struct sub_topic { +struct sub0_topic { nni_list_node node; size_t len; void * buf; }; -// An nni_rep_sock is our per-socket protocol private structure. -struct sub_sock { +// sub0_sock is our per-socket protocol private structure. +struct sub0_sock { nni_list topics; nni_msgq *urq; int raw; nni_mtx lk; }; -// An nni_rep_pipe is our per-pipe protocol private structure. -struct sub_pipe { - nni_pipe *pipe; - sub_sock *sub; - nni_aio * aio_recv; - nni_aio * aio_putq; +// sub0_pipe is our per-pipe protocol private structure. +struct sub0_pipe { + nni_pipe * pipe; + sub0_sock *sub; + nni_aio * aio_recv; + nni_aio * aio_putq; }; static int -sub_sock_init(void **sp, nni_sock *sock) +sub0_sock_init(void **sp, nni_sock *sock) { - sub_sock *s; + sub0_sock *s; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->lk); - NNI_LIST_INIT(&s->topics, sub_topic, node); + NNI_LIST_INIT(&s->topics, sub0_topic, node); s->raw = 0; s->urq = nni_sock_recvq(sock); @@ -68,10 +74,10 @@ sub_sock_init(void **sp, nni_sock *sock) } static void -sub_sock_fini(void *arg) +sub0_sock_fini(void *arg) { - sub_sock * s = arg; - sub_topic *topic; + sub0_sock * s = arg; + sub0_topic *topic; while ((topic = nni_list_first(&s->topics)) != NULL) { nni_list_remove(&s->topics, topic); @@ -83,21 +89,21 @@ sub_sock_fini(void *arg) } static void -sub_sock_open(void *arg) +sub0_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void -sub_sock_close(void *arg) +sub0_sock_close(void *arg) { NNI_ARG_UNUSED(arg); } static void -sub_pipe_fini(void *arg) +sub0_pipe_fini(void *arg) { - sub_pipe *p = arg; + sub0_pipe *p = arg; nni_aio_fini(p->aio_putq); nni_aio_fini(p->aio_recv); @@ -105,17 +111,17 @@ sub_pipe_fini(void *arg) } static int -sub_pipe_init(void **pp, nni_pipe *pipe, void *s) +sub0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - sub_pipe *p; - int rv; + sub0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&p->aio_putq, sub_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, sub_recv_cb, p)) != 0)) { - sub_pipe_fini(p); + if (((rv = nni_aio_init(&p->aio_putq, sub0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0)) { + sub0_pipe_fini(p); return (rv); } @@ -126,30 +132,30 @@ sub_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -sub_pipe_start(void *arg) +sub0_pipe_start(void *arg) { - sub_pipe *p = arg; + sub0_pipe *p = arg; nni_pipe_recv(p->pipe, p->aio_recv); return (0); } static void -sub_pipe_stop(void *arg) +sub0_pipe_stop(void *arg) { - sub_pipe *p = arg; + sub0_pipe *p = arg; nni_aio_stop(p->aio_putq); nni_aio_stop(p->aio_recv); } static void -sub_recv_cb(void *arg) +sub0_recv_cb(void *arg) { - sub_pipe *p = arg; - sub_sock *s = p->sub; - nni_msgq *urq = s->urq; - nni_msg * msg; + sub0_pipe *p = arg; + sub0_sock *s = p->sub; + nni_msgq * urq = s->urq; + nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); @@ -164,9 +170,9 @@ sub_recv_cb(void *arg) } static void -sub_putq_cb(void *arg) +sub0_putq_cb(void *arg) { - sub_pipe *p = arg; + sub0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -184,11 +190,11 @@ sub_putq_cb(void *arg) // to replace this with a patricia trie, like old nanomsg had. static int -sub_subscribe(void *arg, const void *buf, size_t sz) +sub0_subscribe(void *arg, const void *buf, size_t sz) { - sub_sock * s = arg; - sub_topic *topic; - sub_topic *newtopic; + sub0_sock * s = arg; + sub0_topic *topic; + sub0_topic *newtopic; nni_mtx_lock(&s->lk); NNI_LIST_FOREACH (&s->topics, topic) { @@ -234,11 +240,11 @@ sub_subscribe(void *arg, const void *buf, size_t sz) } static int -sub_unsubscribe(void *arg, const void *buf, size_t sz) +sub0_unsubscribe(void *arg, const void *buf, size_t sz) { - sub_sock * s = arg; - sub_topic *topic; - int rv; + sub0_sock * s = arg; + sub0_topic *topic; + int rv; nni_mtx_lock(&s->lk); NNI_LIST_FOREACH (&s->topics, topic) { @@ -270,41 +276,41 @@ sub_unsubscribe(void *arg, const void *buf, size_t sz) } static int -sub_sock_setopt_raw(void *arg, const void *buf, size_t sz) +sub0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - sub_sock *s = arg; + sub0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -sub_sock_getopt_raw(void *arg, void *buf, size_t *szp) +sub0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - sub_sock *s = arg; + sub0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -sub_sock_send(void *arg, nni_aio *aio) +sub0_sock_send(void *arg, nni_aio *aio) { nni_aio_finish_error(aio, NNG_ENOTSUP); } static void -sub_sock_recv(void *arg, nni_aio *aio) +sub0_sock_recv(void *arg, nni_aio *aio) { - sub_sock *s = arg; + sub0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } static nni_msg * -sub_sock_filter(void *arg, nni_msg *msg) +sub0_sock_filter(void *arg, nni_msg *msg) { - sub_sock * s = arg; - sub_topic *topic; - char * body; - size_t len; - int match; + sub0_sock * s = arg; + sub0_topic *topic; + char * body; + size_t len; + int match; nni_mtx_lock(&s->lk); if (s->raw) { @@ -344,55 +350,55 @@ sub_sock_filter(void *arg, nni_msg *msg) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe_ops sub_pipe_ops = { - .pipe_init = sub_pipe_init, - .pipe_fini = sub_pipe_fini, - .pipe_start = sub_pipe_start, - .pipe_stop = sub_pipe_stop, +static nni_proto_pipe_ops sub0_pipe_ops = { + .pipe_init = sub0_pipe_init, + .pipe_fini = sub0_pipe_fini, + .pipe_start = sub0_pipe_start, + .pipe_stop = sub0_pipe_stop, }; -static nni_proto_sock_option sub_sock_options[] = { +static nni_proto_sock_option sub0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = sub_sock_getopt_raw, - .pso_setopt = sub_sock_setopt_raw, + .pso_getopt = sub0_sock_getopt_raw, + .pso_setopt = sub0_sock_setopt_raw, }, { .pso_name = NNG_OPT_SUB_SUBSCRIBE, .pso_getopt = NULL, - .pso_setopt = sub_subscribe, + .pso_setopt = sub0_subscribe, }, { .pso_name = NNG_OPT_SUB_UNSUBSCRIBE, .pso_getopt = NULL, - .pso_setopt = sub_unsubscribe, + .pso_setopt = sub0_unsubscribe, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops sub_sock_ops = { - .sock_init = sub_sock_init, - .sock_fini = sub_sock_fini, - .sock_open = sub_sock_open, - .sock_close = sub_sock_close, - .sock_send = sub_sock_send, - .sock_recv = sub_sock_recv, - .sock_filter = sub_sock_filter, - .sock_options = sub_sock_options, +static nni_proto_sock_ops sub0_sock_ops = { + .sock_init = sub0_sock_init, + .sock_fini = sub0_sock_fini, + .sock_open = sub0_sock_open, + .sock_close = sub0_sock_close, + .sock_send = sub0_sock_send, + .sock_recv = sub0_sock_recv, + .sock_filter = sub0_sock_filter, + .sock_options = sub0_sock_options, }; -static nni_proto sub_proto = { +static nni_proto sub0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_SUB_V0, "sub" }, .proto_peer = { NNI_PROTO_PUB_V0, "pub" }, .proto_flags = NNI_PROTO_FLAG_RCV, - .proto_sock_ops = &sub_sock_ops, - .proto_pipe_ops = &sub_pipe_ops, + .proto_sock_ops = &sub0_sock_ops, + .proto_pipe_ops = &sub0_pipe_ops, }; int nng_sub0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &sub_proto)); + return (nni_proto_open(sidp, &sub0_proto)); } diff --git a/src/protocol/pubsub0/sub.h b/src/protocol/pubsub0/sub.h new file mode 100644 index 00000000..1a09145d --- /dev/null +++ b/src/protocol/pubsub0/sub.h @@ -0,0 +1,31 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PUBSUB0_SUB_H +#define NNG_PROTOCOL_PUBSUB0_SUB_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_sub0_open(nng_socket *); + +#ifndef nng_sub_open +#define nng_sub_open nng_sub0_open +#endif + +#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe" +#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PUBSUB0_SUB_H diff --git a/src/protocol/reqrep0/CMakeLists.txt b/src/protocol/reqrep0/CMakeLists.txt new file mode 100644 index 00000000..4e82ad41 --- /dev/null +++ b/src/protocol/reqrep0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Req/Rep protocol + +if (NNG_PROTO_REQ0) + set(REQ0_SOURCES protocol/reqrep0/req.c protocol/reqrep0/req.h) + install(FILES req.h DESTINATION include/nng/protocol/reqrep0) +endif() + +if (NNG_PROTO_REP0) + set(REP0_SOURCES protocol/reqrep0/rep.c protocol/reqrep0/rep.h) + install(FILES rep.h DESTINATION include/nng/protocol/reqrep0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${REQ0_SOURCES} ${REP0_SOURCES} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep0/rep.c index 100e739d..ee8e4277 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -12,23 +12,32 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/reqrep0/rep.h" // Response protocol. The REP protocol is the "reply" side of a // request-reply pair. This is useful for building RPC servers, for // example. -typedef struct rep_pipe rep_pipe; -typedef struct rep_sock rep_sock; +#ifndef NNI_PROTO_REQ_V0 +#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) +#endif -static void rep_sock_getq_cb(void *); -static void rep_pipe_getq_cb(void *); -static void rep_pipe_putq_cb(void *); -static void rep_pipe_send_cb(void *); -static void rep_pipe_recv_cb(void *); -static void rep_pipe_fini(void *); +#ifndef NNI_PROTO_REP_V0 +#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) +#endif -// A rep_sock is our per-socket protocol private structure. -struct rep_sock { +typedef struct rep0_pipe rep0_pipe; +typedef struct rep0_sock rep0_sock; + +static void rep0_sock_getq_cb(void *); +static void rep0_pipe_getq_cb(void *); +static void rep0_pipe_putq_cb(void *); +static void rep0_pipe_send_cb(void *); +static void rep0_pipe_recv_cb(void *); +static void rep0_pipe_fini(void *); + +// rep0_sock is our per-socket protocol private structure. +struct rep0_sock { nni_msgq * uwq; nni_msgq * urq; nni_mtx lk; @@ -40,21 +49,21 @@ struct rep_sock { nni_aio * aio_getq; }; -// A rep_pipe is our per-pipe protocol private structure. -struct rep_pipe { - nni_pipe *pipe; - rep_sock *rep; - nni_msgq *sendq; - nni_aio * aio_getq; - nni_aio * aio_send; - nni_aio * aio_recv; - nni_aio * aio_putq; +// rep0_pipe is our per-pipe protocol private structure. +struct rep0_pipe { + nni_pipe * pipe; + rep0_sock *rep; + nni_msgq * sendq; + nni_aio * aio_getq; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_aio * aio_putq; }; static void -rep_sock_fini(void *arg) +rep0_sock_fini(void *arg) { - rep_sock *s = arg; + rep0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -67,18 +76,18 @@ rep_sock_fini(void *arg) } static int -rep_sock_init(void **sp, nni_sock *sock) +rep0_sock_init(void **sp, nni_sock *sock) { - rep_sock *s; - int rv; + rep0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->lk); if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, rep_sock_getq_cb, s)) != 0)) { - rep_sock_fini(s); + ((rv = nni_aio_init(&s->aio_getq, rep0_sock_getq_cb, s)) != 0)) { + rep0_sock_fini(s); return (rv); } @@ -95,25 +104,25 @@ rep_sock_init(void **sp, nni_sock *sock) } static void -rep_sock_open(void *arg) +rep0_sock_open(void *arg) { - rep_sock *s = arg; + rep0_sock *s = arg; nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -rep_sock_close(void *arg) +rep0_sock_close(void *arg) { - rep_sock *s = arg; + rep0_sock *s = arg; nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -rep_pipe_fini(void *arg) +rep0_pipe_fini(void *arg) { - rep_pipe *p = arg; + rep0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); @@ -124,20 +133,20 @@ rep_pipe_fini(void *arg) } static int -rep_pipe_init(void **pp, nni_pipe *pipe, void *s) +rep0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - rep_pipe *p; - int rv; + rep0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, rep_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, rep_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, rep_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, rep_pipe_putq_cb, p)) != 0)) { - rep_pipe_fini(p); + ((rv = nni_aio_init(&p->aio_getq, rep0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, rep0_pipe_putq_cb, p)) != 0)) { + rep0_pipe_fini(p); return (rv); } @@ -148,11 +157,11 @@ rep_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -rep_pipe_start(void *arg) +rep0_pipe_start(void *arg) { - rep_pipe *p = arg; - rep_sock *s = p->rep; - int rv; + rep0_pipe *p = arg; + rep0_sock *s = p->rep; + int rv; if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) { return (rv); @@ -164,10 +173,10 @@ rep_pipe_start(void *arg) } static void -rep_pipe_stop(void *arg) +rep0_pipe_stop(void *arg) { - rep_pipe *p = arg; - rep_sock *s = p->rep; + rep0_pipe *p = arg; + rep0_sock *s = p->rep; nni_msgq_close(p->sendq); nni_aio_stop(p->aio_getq); @@ -179,14 +188,14 @@ rep_pipe_stop(void *arg) } static void -rep_sock_getq_cb(void *arg) +rep0_sock_getq_cb(void *arg) { - rep_sock *s = arg; - nni_msgq *uwq = s->uwq; - nni_msg * msg; - uint32_t id; - rep_pipe *p; - int rv; + rep0_sock *s = arg; + nni_msgq * uwq = s->uwq; + nni_msg * msg; + uint32_t id; + rep0_pipe *p; + int rv; // This watches for messages from the upper write queue, // extracts the destination pipe, and forwards it to the appropriate @@ -228,9 +237,9 @@ rep_sock_getq_cb(void *arg) } static void -rep_pipe_getq_cb(void *arg) +rep0_pipe_getq_cb(void *arg) { - rep_pipe *p = arg; + rep0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->pipe); @@ -244,9 +253,9 @@ rep_pipe_getq_cb(void *arg) } static void -rep_pipe_send_cb(void *arg) +rep0_pipe_send_cb(void *arg) { - rep_pipe *p = arg; + rep0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -259,14 +268,14 @@ rep_pipe_send_cb(void *arg) } static void -rep_pipe_recv_cb(void *arg) +rep0_pipe_recv_cb(void *arg) { - rep_pipe *p = arg; - rep_sock *s = p->rep; - nni_msg * msg; - int rv; - uint8_t * body; - int hops; + rep0_pipe *p = arg; + rep0_sock *s = p->rep; + nni_msg * msg; + int rv; + uint8_t * body; + int hops; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); @@ -329,9 +338,9 @@ drop: } static void -rep_pipe_putq_cb(void *arg) +rep0_pipe_putq_cb(void *arg) { - rep_pipe *p = arg; + rep0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -344,10 +353,10 @@ rep_pipe_putq_cb(void *arg) } static int -rep_sock_setopt_raw(void *arg, const void *buf, size_t sz) +rep0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - rep_sock *s = arg; - int rv; + rep0_sock *s = arg; + int rv; nni_mtx_lock(&s->lk); rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); @@ -356,32 +365,32 @@ rep_sock_setopt_raw(void *arg, const void *buf, size_t sz) } static int -rep_sock_getopt_raw(void *arg, void *buf, size_t *szp) +rep0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - rep_sock *s = arg; + rep0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static int -rep_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +rep0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) { - rep_sock *s = arg; + rep0_sock *s = arg; return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); } static int -rep_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +rep0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) { - rep_sock *s = arg; + rep0_sock *s = arg; return (nni_getopt_int(s->ttl, buf, szp)); } static nni_msg * -rep_sock_filter(void *arg, nni_msg *msg) +rep0_sock_filter(void *arg, nni_msg *msg) { - rep_sock *s = arg; - char * header; - size_t len; + rep0_sock *s = arg; + char * header; + size_t len; nni_mtx_lock(&s->lk); if (s->raw) { @@ -408,11 +417,11 @@ rep_sock_filter(void *arg, nni_msg *msg) } static void -rep_sock_send(void *arg, nni_aio *aio) +rep0_sock_send(void *arg, nni_aio *aio) { - rep_sock *s = arg; - int rv; - nni_msg * msg; + rep0_sock *s = arg; + int rv; + nni_msg * msg; nni_mtx_lock(&s->lk); if (s->raw) { @@ -448,59 +457,59 @@ rep_sock_send(void *arg, nni_aio *aio) } static void -rep_sock_recv(void *arg, nni_aio *aio) +rep0_sock_recv(void *arg, nni_aio *aio) { - rep_sock *s = arg; + rep0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe_ops rep_pipe_ops = { - .pipe_init = rep_pipe_init, - .pipe_fini = rep_pipe_fini, - .pipe_start = rep_pipe_start, - .pipe_stop = rep_pipe_stop, +static nni_proto_pipe_ops rep0_pipe_ops = { + .pipe_init = rep0_pipe_init, + .pipe_fini = rep0_pipe_fini, + .pipe_start = rep0_pipe_start, + .pipe_stop = rep0_pipe_stop, }; -static nni_proto_sock_option rep_sock_options[] = { +static nni_proto_sock_option rep0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = rep_sock_getopt_raw, - .pso_setopt = rep_sock_setopt_raw, + .pso_getopt = rep0_sock_getopt_raw, + .pso_setopt = rep0_sock_setopt_raw, }, { .pso_name = NNG_OPT_MAXTTL, - .pso_getopt = rep_sock_getopt_maxttl, - .pso_setopt = rep_sock_setopt_maxttl, + .pso_getopt = rep0_sock_getopt_maxttl, + .pso_setopt = rep0_sock_setopt_maxttl, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops rep_sock_ops = { - .sock_init = rep_sock_init, - .sock_fini = rep_sock_fini, - .sock_open = rep_sock_open, - .sock_close = rep_sock_close, - .sock_options = rep_sock_options, - .sock_filter = rep_sock_filter, - .sock_send = rep_sock_send, - .sock_recv = rep_sock_recv, +static nni_proto_sock_ops rep0_sock_ops = { + .sock_init = rep0_sock_init, + .sock_fini = rep0_sock_fini, + .sock_open = rep0_sock_open, + .sock_close = rep0_sock_close, + .sock_options = rep0_sock_options, + .sock_filter = rep0_sock_filter, + .sock_send = rep0_sock_send, + .sock_recv = rep0_sock_recv, }; -static nni_proto nni_rep_proto = { +static nni_proto rep0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_REP_V0, "rep" }, .proto_peer = { NNI_PROTO_REQ_V0, "req" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &rep_sock_ops, - .proto_pipe_ops = &rep_pipe_ops, + .proto_sock_ops = &rep0_sock_ops, + .proto_pipe_ops = &rep0_pipe_ops, }; int nng_rep0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &nni_rep_proto)); + return (nni_proto_open(sidp, &rep0_proto)); } diff --git a/src/protocol/reqrep0/rep.h b/src/protocol/reqrep0/rep.h new file mode 100644 index 00000000..93df9379 --- /dev/null +++ b/src/protocol/reqrep0/rep.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_REQREP0_REP_H +#define NNG_PROTOCOL_REQREP0_REP_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_rep0_open(nng_socket *); + +#ifndef nng_rep_open +#define nng_rep_open nng_rep0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_REQREP0_REP_H diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep0/req.c index bead1ec4..94c7f1a0 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep0/req.c @@ -13,21 +13,28 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/reqrep0/req.h" // Request protocol. The REQ protocol is the "request" side of a // request-reply pair. This is useful for building RPC clients, for example. -const char *nng_opt_req_resendtime = NNG_OPT_REQ_RESENDTIME; +#ifndef NNI_PROTO_REQ_V0 +#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) +#endif -typedef struct req_pipe req_pipe; -typedef struct req_sock req_sock; +#ifndef NNI_PROTO_REP_V0 +#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) +#endif -static void req_resend(req_sock *); -static void req_timeout(void *); -static void req_pipe_fini(void *); +typedef struct req0_pipe req0_pipe; +typedef struct req0_sock req0_sock; -// A req_sock is our per-socket protocol private structure. -struct req_sock { +static void req0_resend(req0_sock *); +static void req0_timeout(void *); +static void req0_pipe_fini(void *); + +// A req0_sock is our per-socket protocol private structure. +struct req0_sock { nni_msgq * uwq; nni_msgq * urq; nni_duration retry; @@ -38,7 +45,7 @@ struct req_sock { int ttl; nni_msg * reqmsg; - req_pipe *pendpipe; + req0_pipe *pendpipe; nni_list readypipes; nni_list busypipes; @@ -51,10 +58,10 @@ struct req_sock { nni_cv cv; }; -// A req_pipe is our per-pipe protocol private structure. -struct req_pipe { +// A req0_pipe is our per-pipe protocol private structure. +struct req0_pipe { nni_pipe * pipe; - req_sock * req; + req0_sock * req; nni_list_node node; nni_aio * aio_getq; // raw mode only nni_aio * aio_sendraw; // raw mode only @@ -64,17 +71,17 @@ struct req_pipe { nni_mtx mtx; }; -static void req_resender(void *); -static void req_getq_cb(void *); -static void req_sendraw_cb(void *); -static void req_sendcooked_cb(void *); -static void req_recv_cb(void *); -static void req_putq_cb(void *); +static void req0_resender(void *); +static void req0_getq_cb(void *); +static void req0_sendraw_cb(void *); +static void req0_sendcooked_cb(void *); +static void req0_recv_cb(void *); +static void req0_putq_cb(void *); static int -req_sock_init(void **sp, nni_sock *sock) +req0_sock_init(void **sp, nni_sock *sock) { - req_sock *s; + req0_sock *s; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); @@ -82,9 +89,9 @@ req_sock_init(void **sp, nni_sock *sock) nni_mtx_init(&s->mtx); nni_cv_init(&s->cv, &s->mtx); - NNI_LIST_INIT(&s->readypipes, req_pipe, node); - NNI_LIST_INIT(&s->busypipes, req_pipe, node); - nni_timer_init(&s->timer, req_timeout, s); + NNI_LIST_INIT(&s->readypipes, req0_pipe, node); + NNI_LIST_INIT(&s->busypipes, req0_pipe, node); + nni_timer_init(&s->timer, req0_timeout, s); // this is "semi random" start for request IDs. s->nextid = nni_random(); @@ -102,15 +109,15 @@ req_sock_init(void **sp, nni_sock *sock) } static void -req_sock_open(void *arg) +req0_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void -req_sock_close(void *arg) +req0_sock_close(void *arg) { - req_sock *s = arg; + req0_sock *s = arg; nni_mtx_lock(&s->mtx); s->closed = 1; @@ -120,9 +127,9 @@ req_sock_close(void *arg) } static void -req_sock_fini(void *arg) +req0_sock_fini(void *arg) { - req_sock *s = arg; + req0_sock *s = arg; nni_mtx_lock(&s->mtx); while ((!nni_list_empty(&s->readypipes)) || @@ -139,9 +146,9 @@ req_sock_fini(void *arg) } static void -req_pipe_fini(void *arg) +req0_pipe_fini(void *arg) { - req_pipe *p = arg; + req0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_putq); @@ -153,22 +160,22 @@ req_pipe_fini(void *arg) } static int -req_pipe_init(void **pp, nni_pipe *pipe, void *s) +req0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - req_pipe *p; - int rv; + req0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - if (((rv = nni_aio_init(&p->aio_getq, req_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, req_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, req_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_sendraw, req_sendraw_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_sendcooked, req_sendcooked_cb, p)) != + if (((rv = nni_aio_init(&p->aio_getq, req0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, req0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_sendraw, req0_sendraw_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_sendcooked, req0_sendcooked_cb, p)) != 0)) { - req_pipe_fini(p); + req0_pipe_fini(p); return (rv); } @@ -180,10 +187,10 @@ req_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -req_pipe_start(void *arg) +req0_pipe_start(void *arg) { - req_pipe *p = arg; - req_sock *s = p->req; + req0_pipe *p = arg; + req0_sock *s = p->req; if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) { return (NNG_EPROTO); @@ -198,7 +205,7 @@ req_pipe_start(void *arg) // If sock was waiting for somewhere to send data, go ahead and // send it to this pipe. if (s->wantw) { - req_resend(s); + req0_resend(s); } nni_mtx_unlock(&s->mtx); @@ -208,10 +215,10 @@ req_pipe_start(void *arg) } static void -req_pipe_stop(void *arg) +req0_pipe_stop(void *arg) { - req_pipe *p = arg; - req_sock *s = p->req; + req0_pipe *p = arg; + req0_sock *s = p->req; nni_aio_stop(p->aio_getq); nni_aio_stop(p->aio_putq); @@ -238,50 +245,50 @@ req_pipe_stop(void *arg) s->pendpipe = NULL; s->resend = NNI_TIME_ZERO; s->wantw = 1; - req_resend(s); + req0_resend(s); } nni_mtx_unlock(&s->mtx); } static int -req_sock_setopt_raw(void *arg, const void *buf, size_t sz) +req0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -req_sock_getopt_raw(void *arg, void *buf, size_t *szp) +req0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static int -req_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); } static int -req_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +req0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_getopt_int(s->ttl, buf, szp)); } static int -req_sock_setopt_resendtime(void *arg, const void *buf, size_t sz) +req0_sock_setopt_resendtime(void *arg, const void *buf, size_t sz) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_setopt_ms(&s->retry, buf, sz)); } static int -req_sock_getopt_resendtime(void *arg, void *buf, size_t *szp) +req0_sock_getopt_resendtime(void *arg, void *buf, size_t *szp) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_getopt_ms(s->retry, buf, szp)); } @@ -302,10 +309,10 @@ req_sock_getopt_resendtime(void *arg, void *buf, size_t *szp) // kind of priority.) static void -req_getq_cb(void *arg) +req0_getq_cb(void *arg) { - req_pipe *p = arg; - req_sock *s = p->req; + req0_pipe *p = arg; + req0_sock *s = p->req; // We should be in RAW mode. Cooked mode traffic bypasses // the upper write queue entirely, and should never end up here. @@ -326,9 +333,9 @@ req_getq_cb(void *arg) } static void -req_sendraw_cb(void *arg) +req0_sendraw_cb(void *arg) { - req_pipe *p = arg; + req0_pipe *p = arg; if (nni_aio_result(p->aio_sendraw) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_sendraw)); @@ -342,10 +349,10 @@ req_sendraw_cb(void *arg) } static void -req_sendcooked_cb(void *arg) +req0_sendcooked_cb(void *arg) { - req_pipe *p = arg; - req_sock *s = p->req; + req0_pipe *p = arg; + req0_sock *s = p->req; if (nni_aio_result(p->aio_sendcooked) != 0) { // We failed to send... clean up and deal with it. @@ -365,7 +372,7 @@ req_sendcooked_cb(void *arg) if (nni_list_active(&s->busypipes, p)) { nni_list_remove(&s->busypipes, p); nni_list_append(&s->readypipes, p); - req_resend(s); + req0_resend(s); } else { // We wind up here if stop was called from the reader // side while we were waiting to be scheduled to run for the @@ -377,9 +384,9 @@ req_sendcooked_cb(void *arg) } static void -req_putq_cb(void *arg) +req0_putq_cb(void *arg) { - req_pipe *p = arg; + req0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -393,10 +400,10 @@ req_putq_cb(void *arg) } static void -req_recv_cb(void *arg) +req0_recv_cb(void *arg) { - req_pipe *p = arg; - nni_msg * msg; + req0_pipe *p = arg; + nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); @@ -431,23 +438,23 @@ malformed: } static void -req_timeout(void *arg) +req0_timeout(void *arg) { - req_sock *s = arg; + req0_sock *s = arg; nni_mtx_lock(&s->mtx); if (s->reqmsg != NULL) { s->wantw = 1; - req_resend(s); + req0_resend(s); } nni_mtx_unlock(&s->mtx); } static void -req_resend(req_sock *s) +req0_resend(req0_sock *s) { - req_pipe *p; - nni_msg * msg; + req0_pipe *p; + nni_msg * msg; // Note: This routine should be called with the socket lock held. // Also, this should only be called while handling cooked mode @@ -499,13 +506,13 @@ req_resend(req_sock *s) } static void -req_sock_send(void *arg, nni_aio *aio) +req0_sock_send(void *arg, nni_aio *aio) { - req_sock *s = arg; - uint32_t id; - size_t len; - nni_msg * msg; - int rv; + req0_sock *s = arg; + uint32_t id; + size_t len; + nni_msg * msg; + int rv; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -547,7 +554,7 @@ req_sock_send(void *arg, nni_aio *aio) s->resend = NNI_TIME_ZERO; s->wantw = 1; - req_resend(s); + req0_resend(s); nni_mtx_unlock(&s->mtx); @@ -555,10 +562,10 @@ req_sock_send(void *arg, nni_aio *aio) } static nni_msg * -req_sock_filter(void *arg, nni_msg *msg) +req0_sock_filter(void *arg, nni_msg *msg) { - req_sock *s = arg; - nni_msg * rmsg; + req0_sock *s = arg; + nni_msg * rmsg; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -598,9 +605,9 @@ req_sock_filter(void *arg, nni_msg *msg) } static void -req_sock_recv(void *arg, nni_aio *aio) +req0_sock_recv(void *arg, nni_aio *aio) { - req_sock *s = arg; + req0_sock *s = arg; nni_mtx_lock(&s->mtx); if (!s->raw) { @@ -614,55 +621,55 @@ req_sock_recv(void *arg, nni_aio *aio) nni_msgq_aio_get(s->urq, aio); } -static nni_proto_pipe_ops req_pipe_ops = { - .pipe_init = req_pipe_init, - .pipe_fini = req_pipe_fini, - .pipe_start = req_pipe_start, - .pipe_stop = req_pipe_stop, +static nni_proto_pipe_ops req0_pipe_ops = { + .pipe_init = req0_pipe_init, + .pipe_fini = req0_pipe_fini, + .pipe_start = req0_pipe_start, + .pipe_stop = req0_pipe_stop, }; -static nni_proto_sock_option req_sock_options[] = { +static nni_proto_sock_option req0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = req_sock_getopt_raw, - .pso_setopt = req_sock_setopt_raw, + .pso_getopt = req0_sock_getopt_raw, + .pso_setopt = req0_sock_setopt_raw, }, { .pso_name = NNG_OPT_MAXTTL, - .pso_getopt = req_sock_getopt_maxttl, - .pso_setopt = req_sock_setopt_maxttl, + .pso_getopt = req0_sock_getopt_maxttl, + .pso_setopt = req0_sock_setopt_maxttl, }, { .pso_name = NNG_OPT_REQ_RESENDTIME, - .pso_getopt = req_sock_getopt_resendtime, - .pso_setopt = req_sock_setopt_resendtime, + .pso_getopt = req0_sock_getopt_resendtime, + .pso_setopt = req0_sock_setopt_resendtime, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops req_sock_ops = { - .sock_init = req_sock_init, - .sock_fini = req_sock_fini, - .sock_open = req_sock_open, - .sock_close = req_sock_close, - .sock_options = req_sock_options, - .sock_filter = req_sock_filter, - .sock_send = req_sock_send, - .sock_recv = req_sock_recv, +static nni_proto_sock_ops req0_sock_ops = { + .sock_init = req0_sock_init, + .sock_fini = req0_sock_fini, + .sock_open = req0_sock_open, + .sock_close = req0_sock_close, + .sock_options = req0_sock_options, + .sock_filter = req0_sock_filter, + .sock_send = req0_sock_send, + .sock_recv = req0_sock_recv, }; -static nni_proto req_proto = { +static nni_proto req0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_REQ_V0, "req" }, .proto_peer = { NNI_PROTO_REP_V0, "rep" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &req_sock_ops, - .proto_pipe_ops = &req_pipe_ops, + .proto_sock_ops = &req0_sock_ops, + .proto_pipe_ops = &req0_pipe_ops, }; int nng_req0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &req_proto)); + return (nni_proto_open(sidp, &req0_proto)); } diff --git a/src/protocol/reqrep0/req.h b/src/protocol/reqrep0/req.h new file mode 100644 index 00000000..99c9bf62 --- /dev/null +++ b/src/protocol/reqrep0/req.h @@ -0,0 +1,30 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_REQREP0_REQ_H +#define NNG_PROTOCOL_REQREP0_REQ_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_req0_open(nng_socket *); + +#ifndef nng_req_open +#define nng_req_open nng_req0_open +#endif + +#define NNG_OPT_REQ_RESENDTIME "req:resend-time" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_REQREP0_REQ_H diff --git a/src/protocol/survey0/CMakeLists.txt b/src/protocol/survey0/CMakeLists.txt new file mode 100644 index 00000000..61e5aa7b --- /dev/null +++ b/src/protocol/survey0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Req/Rep protocol + +if (NNG_PROTO_SURVEYOR0) + set(SURV0_SOURCES protocol/survey0/survey.c protocol/survey0/survey.h) + install(FILES survey.h DESTINATION include/nng/protocol/survey0) +endif() + +if (NNG_PROTO_RESPONDENT0) + set(RESP0_SOURCES protocol/survey0/respond.c protocol/survey0/respond.h) + install(FILES respond.h DESTINATION include/nng/protocol/survey0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${SURV0_SOURCES} ${RESP0_SOURCES} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/protocol/survey/respond.c b/src/protocol/survey0/respond.c index 94730be6..73e919c3 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey0/respond.c @@ -12,23 +12,32 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/survey0/respond.h" // Respondent protocol. The RESPONDENT protocol is the "replier" side of // the surveyor pattern. This is useful for building service discovery, or -// voting algorithsm, for example. +// voting algorithms, for example. -typedef struct resp_pipe resp_pipe; -typedef struct resp_sock resp_sock; +#ifndef NNI_PROTO_SURVEYOR_V0 +#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) +#endif -static void resp_recv_cb(void *); -static void resp_putq_cb(void *); -static void resp_getq_cb(void *); -static void resp_send_cb(void *); -static void resp_sock_getq_cb(void *); -static void resp_pipe_fini(void *); +#ifndef NNI_PROTO_RESPONDENT_V0 +#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) +#endif -// A resp_sock is our per-socket protocol private structure. -struct resp_sock { +typedef struct resp0_pipe resp0_pipe; +typedef struct resp0_sock resp0_sock; + +static void resp0_recv_cb(void *); +static void resp0_putq_cb(void *); +static void resp0_getq_cb(void *); +static void resp0_send_cb(void *); +static void resp0_sock_getq_cb(void *); +static void resp0_pipe_fini(void *); + +// resp0_sock is our per-socket protocol private structure. +struct resp0_sock { nni_msgq * urq; nni_msgq * uwq; int raw; @@ -40,22 +49,22 @@ struct resp_sock { nni_mtx mtx; }; -// A resp_pipe is our per-pipe protocol private structure. -struct resp_pipe { - nni_pipe * npipe; - resp_sock *psock; - uint32_t id; - nni_msgq * sendq; - nni_aio * aio_getq; - nni_aio * aio_putq; - nni_aio * aio_send; - nni_aio * aio_recv; +// resp0_pipe is our per-pipe protocol private structure. +struct resp0_pipe { + nni_pipe * npipe; + resp0_sock *psock; + uint32_t id; + nni_msgq * sendq; + nni_aio * aio_getq; + nni_aio * aio_putq; + nni_aio * aio_send; + nni_aio * aio_recv; }; static void -resp_sock_fini(void *arg) +resp0_sock_fini(void *arg) { - resp_sock *s = arg; + resp0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -68,18 +77,18 @@ resp_sock_fini(void *arg) } static int -resp_sock_init(void **sp, nni_sock *nsock) +resp0_sock_init(void **sp, nni_sock *nsock) { - resp_sock *s; - int rv; + resp0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->mtx); if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, resp_sock_getq_cb, s)) != 0)) { - resp_sock_fini(s); + ((rv = nni_aio_init(&s->aio_getq, resp0_sock_getq_cb, s)) != 0)) { + resp0_sock_fini(s); return (rv); } @@ -95,25 +104,25 @@ resp_sock_init(void **sp, nni_sock *nsock) } static void -resp_sock_open(void *arg) +resp0_sock_open(void *arg) { - resp_sock *s = arg; + resp0_sock *s = arg; nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -resp_sock_close(void *arg) +resp0_sock_close(void *arg) { - resp_sock *s = arg; + resp0_sock *s = arg; nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -resp_pipe_fini(void *arg) +resp0_pipe_fini(void *arg) { - resp_pipe *p = arg; + resp0_pipe *p = arg; nni_aio_fini(p->aio_putq); nni_aio_fini(p->aio_getq); @@ -124,20 +133,20 @@ resp_pipe_fini(void *arg) } static int -resp_pipe_init(void **pp, nni_pipe *npipe, void *s) +resp0_pipe_init(void **pp, nni_pipe *npipe, void *s) { - resp_pipe *p; - int rv; + resp0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, resp_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, resp_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, resp_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, resp_send_cb, p)) != 0)) { - resp_pipe_fini(p); + ((rv = nni_aio_init(&p->aio_putq, resp0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, resp0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, resp0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, resp0_send_cb, p)) != 0)) { + resp0_pipe_fini(p); return (rv); } @@ -148,11 +157,11 @@ resp_pipe_init(void **pp, nni_pipe *npipe, void *s) } static int -resp_pipe_start(void *arg) +resp0_pipe_start(void *arg) { - resp_pipe *p = arg; - resp_sock *s = p->psock; - int rv; + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + int rv; p->id = nni_pipe_id(p->npipe); @@ -170,10 +179,10 @@ resp_pipe_start(void *arg) } static void -resp_pipe_stop(void *arg) +resp0_pipe_stop(void *arg) { - resp_pipe *p = arg; - resp_sock *s = p->psock; + resp0_pipe *p = arg; + resp0_sock *s = p->psock; nni_msgq_close(p->sendq); nni_aio_stop(p->aio_putq); @@ -189,19 +198,19 @@ resp_pipe_stop(void *arg) } } -// resp_sock_send watches for messages from the upper write queue, +// resp0_sock_send watches for messages from the upper write queue, // extracts the destination pipe, and forwards it to the appropriate // destination pipe via a separate queue. This prevents a single bad // or slow pipe from gumming up the works for the entire socket.s void -resp_sock_getq_cb(void *arg) +resp0_sock_getq_cb(void *arg) { - resp_sock *s = arg; - nni_msg * msg; - uint32_t id; - resp_pipe *p; - int rv; + resp0_sock *s = arg; + nni_msg * msg; + uint32_t id; + resp0_pipe *p; + int rv; if (nni_aio_result(s->aio_getq) != 0) { return; @@ -233,9 +242,9 @@ resp_sock_getq_cb(void *arg) } void -resp_getq_cb(void *arg) +resp0_getq_cb(void *arg) { - resp_pipe *p = arg; + resp0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->npipe); @@ -249,9 +258,9 @@ resp_getq_cb(void *arg) } void -resp_send_cb(void *arg) +resp0_send_cb(void *arg) { - resp_pipe *p = arg; + resp0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -264,14 +273,14 @@ resp_send_cb(void *arg) } static void -resp_recv_cb(void *arg) +resp0_recv_cb(void *arg) { - resp_pipe *p = arg; - resp_sock *s = p->psock; - nni_msgq * urq = s->urq; - nni_msg * msg; - int hops; - int rv; + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + nni_msgq * urq = s->urq; + nni_msg * msg; + int hops; + int rv; if (nni_aio_result(p->aio_recv) != 0) { goto error; @@ -324,9 +333,9 @@ error: } static void -resp_putq_cb(void *arg) +resp0_putq_cb(void *arg) { - resp_pipe *p = arg; + resp0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -338,10 +347,10 @@ resp_putq_cb(void *arg) } static int -resp_sock_setopt_raw(void *arg, const void *buf, size_t sz) +resp0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - resp_sock *s = arg; - int rv; + resp0_sock *s = arg; + int rv; nni_mtx_lock(&s->mtx); rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); @@ -350,32 +359,32 @@ resp_sock_setopt_raw(void *arg, const void *buf, size_t sz) } static int -resp_sock_getopt_raw(void *arg, void *buf, size_t *szp) +resp0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - resp_sock *s = arg; + resp0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static int -resp_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +resp0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) { - resp_sock *s = arg; + resp0_sock *s = arg; return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); } static int -resp_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +resp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) { - resp_sock *s = arg; + resp0_sock *s = arg; return (nni_getopt_int(s->ttl, buf, szp)); } static void -resp_sock_send(void *arg, nni_aio *aio) +resp0_sock_send(void *arg, nni_aio *aio) { - resp_sock *s = arg; - nni_msg * msg; - int rv; + resp0_sock *s = arg; + nni_msg * msg; + int rv; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -412,11 +421,11 @@ resp_sock_send(void *arg, nni_aio *aio) } static nni_msg * -resp_sock_filter(void *arg, nni_msg *msg) +resp0_sock_filter(void *arg, nni_msg *msg) { - resp_sock *s = arg; - char * header; - size_t len; + resp0_sock *s = arg; + char * header; + size_t len; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -444,57 +453,57 @@ resp_sock_filter(void *arg, nni_msg *msg) } static void -resp_sock_recv(void *arg, nni_aio *aio) +resp0_sock_recv(void *arg, nni_aio *aio) { - resp_sock *s = arg; + resp0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } -static nni_proto_pipe_ops resp_pipe_ops = { - .pipe_init = resp_pipe_init, - .pipe_fini = resp_pipe_fini, - .pipe_start = resp_pipe_start, - .pipe_stop = resp_pipe_stop, +static nni_proto_pipe_ops resp0_pipe_ops = { + .pipe_init = resp0_pipe_init, + .pipe_fini = resp0_pipe_fini, + .pipe_start = resp0_pipe_start, + .pipe_stop = resp0_pipe_stop, }; -static nni_proto_sock_option resp_sock_options[] = { +static nni_proto_sock_option resp0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = resp_sock_getopt_raw, - .pso_setopt = resp_sock_setopt_raw, + .pso_getopt = resp0_sock_getopt_raw, + .pso_setopt = resp0_sock_setopt_raw, }, { .pso_name = NNG_OPT_MAXTTL, - .pso_getopt = resp_sock_getopt_maxttl, - .pso_setopt = resp_sock_setopt_maxttl, + .pso_getopt = resp0_sock_getopt_maxttl, + .pso_setopt = resp0_sock_setopt_maxttl, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops resp_sock_ops = { - .sock_init = resp_sock_init, - .sock_fini = resp_sock_fini, - .sock_open = resp_sock_open, - .sock_close = resp_sock_close, - .sock_filter = resp_sock_filter, - .sock_send = resp_sock_send, - .sock_recv = resp_sock_recv, - .sock_options = resp_sock_options, +static nni_proto_sock_ops resp0_sock_ops = { + .sock_init = resp0_sock_init, + .sock_fini = resp0_sock_fini, + .sock_open = resp0_sock_open, + .sock_close = resp0_sock_close, + .sock_filter = resp0_sock_filter, + .sock_send = resp0_sock_send, + .sock_recv = resp0_sock_recv, + .sock_options = resp0_sock_options, }; -static nni_proto resp_proto = { +static nni_proto resp0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &resp_sock_ops, - .proto_pipe_ops = &resp_pipe_ops, + .proto_sock_ops = &resp0_sock_ops, + .proto_pipe_ops = &resp0_pipe_ops, }; int nng_respondent0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &resp_proto)); + return (nni_proto_open(sidp, &resp0_proto)); } diff --git a/src/protocol/survey0/respond.h b/src/protocol/survey0/respond.h new file mode 100644 index 00000000..58c65298 --- /dev/null +++ b/src/protocol/survey0/respond.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_SURVEY0_RESPOND_H +#define NNG_PROTOCOL_SURVEY0_RESPOND_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_respondent0_open(nng_socket *); + +#ifndef nng_respondent_open +#define nng_respondent_open nng_respondent0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_SURVEY0_RESPOND_H diff --git a/src/protocol/survey/survey.c b/src/protocol/survey0/survey.c index 9dcd6664..02283436 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey0/survey.c @@ -12,22 +12,31 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/survey0/survey.h" // Surveyor protocol. The SURVEYOR protocol is the "survey" side of the // survey pattern. This is useful for building service discovery, voting, etc. -typedef struct surv_pipe surv_pipe; -typedef struct surv_sock surv_sock; +#ifndef NNI_PROTO_SURVEYOR_V0 +#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) +#endif -static void surv_sock_getq_cb(void *); -static void surv_getq_cb(void *); -static void surv_putq_cb(void *); -static void surv_send_cb(void *); -static void surv_recv_cb(void *); -static void surv_timeout(void *); +#ifndef NNI_PROTO_RESPONDENT_V0 +#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) +#endif -// A surv_sock is our per-socket protocol private structure. -struct surv_sock { +typedef struct surv0_pipe surv0_pipe; +typedef struct surv0_sock surv0_sock; + +static void surv0_sock_getq_cb(void *); +static void surv0_getq_cb(void *); +static void surv0_putq_cb(void *); +static void surv0_send_cb(void *); +static void surv0_recv_cb(void *); +static void surv0_timeout(void *); + +// surv0_sock is our per-socket protocol private structure. +struct surv0_sock { nni_duration survtime; nni_time expire; int raw; @@ -42,10 +51,10 @@ struct surv_sock { nni_mtx mtx; }; -// A surv_pipe is our per-pipe protocol private structure. -struct surv_pipe { +// surv0_pipe is our per-pipe protocol private structure. +struct surv0_pipe { nni_pipe * npipe; - surv_sock * psock; + surv0_sock * psock; nni_msgq * sendq; nni_list_node node; nni_aio * aio_getq; @@ -55,9 +64,9 @@ struct surv_pipe { }; static void -surv_sock_fini(void *arg) +surv0_sock_fini(void *arg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -66,21 +75,21 @@ surv_sock_fini(void *arg) } static int -surv_sock_init(void **sp, nni_sock *nsock) +surv0_sock_init(void **sp, nni_sock *nsock) { - surv_sock *s; - int rv; + surv0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_aio_init(&s->aio_getq, surv_sock_getq_cb, s)) != 0) { - surv_sock_fini(s); + if ((rv = nni_aio_init(&s->aio_getq, surv0_sock_getq_cb, s)) != 0) { + surv0_sock_fini(s); return (rv); } - NNI_LIST_INIT(&s->pipes, surv_pipe, node); + NNI_LIST_INIT(&s->pipes, surv0_pipe, node); nni_mtx_init(&s->mtx); - nni_timer_init(&s->timer, surv_timeout, s); + nni_timer_init(&s->timer, surv0_timeout, s); s->nextid = nni_random(); s->raw = 0; @@ -94,26 +103,26 @@ surv_sock_init(void **sp, nni_sock *nsock) } static void -surv_sock_open(void *arg) +surv0_sock_open(void *arg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -surv_sock_close(void *arg) +surv0_sock_close(void *arg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_timer_cancel(&s->timer); nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -surv_pipe_fini(void *arg) +surv0_pipe_fini(void *arg) { - surv_pipe *p = arg; + surv0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); @@ -124,21 +133,21 @@ surv_pipe_fini(void *arg) } static int -surv_pipe_init(void **pp, nni_pipe *npipe, void *s) +surv0_pipe_init(void **pp, nni_pipe *npipe, void *s) { - surv_pipe *p; - int rv; + surv0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } // This depth could be tunable. if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, surv_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, surv_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, surv_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, surv_recv_cb, p)) != 0)) { - surv_pipe_fini(p); + ((rv = nni_aio_init(&p->aio_getq, surv0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, surv0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, surv0_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, surv0_recv_cb, p)) != 0)) { + surv0_pipe_fini(p); return (rv); } @@ -149,10 +158,10 @@ surv_pipe_init(void **pp, nni_pipe *npipe, void *s) } static int -surv_pipe_start(void *arg) +surv0_pipe_start(void *arg) { - surv_pipe *p = arg; - surv_sock *s = p->psock; + surv0_pipe *p = arg; + surv0_sock *s = p->psock; nni_mtx_lock(&s->mtx); nni_list_append(&s->pipes, p); @@ -164,10 +173,10 @@ surv_pipe_start(void *arg) } static void -surv_pipe_stop(void *arg) +surv0_pipe_stop(void *arg) { - surv_pipe *p = arg; - surv_sock *s = p->psock; + surv0_pipe *p = arg; + surv0_sock *s = p->psock; nni_aio_stop(p->aio_getq); nni_aio_stop(p->aio_send); @@ -184,9 +193,9 @@ surv_pipe_stop(void *arg) } static void -surv_getq_cb(void *arg) +surv0_getq_cb(void *arg) { - surv_pipe *p = arg; + surv0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->npipe); @@ -200,9 +209,9 @@ surv_getq_cb(void *arg) } static void -surv_send_cb(void *arg) +surv0_send_cb(void *arg) { - surv_pipe *p = arg; + surv0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -215,9 +224,9 @@ surv_send_cb(void *arg) } static void -surv_putq_cb(void *arg) +surv0_putq_cb(void *arg) { - surv_pipe *p = arg; + surv0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -230,10 +239,10 @@ surv_putq_cb(void *arg) } static void -surv_recv_cb(void *arg) +surv0_recv_cb(void *arg) { - surv_pipe *p = arg; - nni_msg * msg; + surv0_pipe *p = arg; + nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { goto failed; @@ -265,10 +274,10 @@ failed: } static int -surv_sock_setopt_raw(void *arg, const void *buf, size_t sz) +surv0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - surv_sock *s = arg; - int rv; + surv0_sock *s = arg; + int rv; nni_mtx_lock(&s->mtx); if ((rv = nni_setopt_int(&s->raw, buf, sz, 0, 1)) == 0) { @@ -280,33 +289,33 @@ surv_sock_setopt_raw(void *arg, const void *buf, size_t sz) } static int -surv_sock_getopt_raw(void *arg, void *buf, size_t *szp) +surv0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - surv_sock *s = arg; + surv0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static int -surv_sock_setopt_surveytime(void *arg, const void *buf, size_t sz) +surv0_sock_setopt_surveytime(void *arg, const void *buf, size_t sz) { - surv_sock *s = arg; + surv0_sock *s = arg; return (nni_setopt_ms(&s->survtime, buf, sz)); } static int -surv_sock_getopt_surveytime(void *arg, void *buf, size_t *szp) +surv0_sock_getopt_surveytime(void *arg, void *buf, size_t *szp) { - surv_sock *s = arg; + surv0_sock *s = arg; return (nni_getopt_ms(s->survtime, buf, szp)); } static void -surv_sock_getq_cb(void *arg) +surv0_sock_getq_cb(void *arg) { - surv_sock *s = arg; - surv_pipe *p; - surv_pipe *last; - nni_msg * msg, *dup; + surv0_sock *s = arg; + surv0_pipe *p; + surv0_pipe *last; + nni_msg * msg, *dup; if (nni_aio_result(s->aio_getq) != 0) { // Should be NNG_ECLOSED. @@ -338,9 +347,9 @@ surv_sock_getq_cb(void *arg) } static void -surv_timeout(void *arg) +surv0_timeout(void *arg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_mtx_lock(&s->mtx); s->survid = 0; @@ -349,9 +358,9 @@ surv_timeout(void *arg) } static void -surv_sock_recv(void *arg, nni_aio *aio) +surv0_sock_recv(void *arg, nni_aio *aio) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_mtx_lock(&s->mtx); if (s->survid == 0) { @@ -364,11 +373,11 @@ surv_sock_recv(void *arg, nni_aio *aio) } static void -surv_sock_send(void *arg, nni_aio *aio) +surv0_sock_send(void *arg, nni_aio *aio) { - surv_sock *s = arg; - nni_msg * msg; - int rv; + surv0_sock *s = arg; + nni_msg * msg; + int rv; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -404,9 +413,9 @@ surv_sock_send(void *arg, nni_aio *aio) } static nni_msg * -surv_sock_filter(void *arg, nni_msg *msg) +surv0_sock_filter(void *arg, nni_msg *msg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -426,50 +435,50 @@ surv_sock_filter(void *arg, nni_msg *msg) return (msg); } -static nni_proto_pipe_ops surv_pipe_ops = { - .pipe_init = surv_pipe_init, - .pipe_fini = surv_pipe_fini, - .pipe_start = surv_pipe_start, - .pipe_stop = surv_pipe_stop, +static nni_proto_pipe_ops surv0_pipe_ops = { + .pipe_init = surv0_pipe_init, + .pipe_fini = surv0_pipe_fini, + .pipe_start = surv0_pipe_start, + .pipe_stop = surv0_pipe_stop, }; -static nni_proto_sock_option surv_sock_options[] = { +static nni_proto_sock_option surv0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = surv_sock_getopt_raw, - .pso_setopt = surv_sock_setopt_raw, + .pso_getopt = surv0_sock_getopt_raw, + .pso_setopt = surv0_sock_setopt_raw, }, { .pso_name = NNG_OPT_SURVEYOR_SURVEYTIME, - .pso_getopt = surv_sock_getopt_surveytime, - .pso_setopt = surv_sock_setopt_surveytime, + .pso_getopt = surv0_sock_getopt_surveytime, + .pso_setopt = surv0_sock_setopt_surveytime, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops surv_sock_ops = { - .sock_init = surv_sock_init, - .sock_fini = surv_sock_fini, - .sock_open = surv_sock_open, - .sock_close = surv_sock_close, - .sock_send = surv_sock_send, - .sock_recv = surv_sock_recv, - .sock_filter = surv_sock_filter, - .sock_options = surv_sock_options, +static nni_proto_sock_ops surv0_sock_ops = { + .sock_init = surv0_sock_init, + .sock_fini = surv0_sock_fini, + .sock_open = surv0_sock_open, + .sock_close = surv0_sock_close, + .sock_send = surv0_sock_send, + .sock_recv = surv0_sock_recv, + .sock_filter = surv0_sock_filter, + .sock_options = surv0_sock_options, }; -static nni_proto surv_proto = { +static nni_proto surv0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &surv_sock_ops, - .proto_pipe_ops = &surv_pipe_ops, + .proto_sock_ops = &surv0_sock_ops, + .proto_pipe_ops = &surv0_pipe_ops, }; int nng_surveyor0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &surv_proto)); + return (nni_proto_open(sidp, &surv0_proto)); } diff --git a/src/protocol/survey0/survey.h b/src/protocol/survey0/survey.h new file mode 100644 index 00000000..a7b6d943 --- /dev/null +++ b/src/protocol/survey0/survey.h @@ -0,0 +1,30 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_SURVEY0_SURVEY_H +#define NNG_PROTOCOL_SURVEY0_SURVEY_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_surveyor0_open(nng_socket *); + +#ifndef nng_surveyor_open +#define nng_surveyor_open nng_surveyor0_open +#endif + +#define NNG_OPT_SURVEYOR_SURVEYTIME "surveyor:survey-time" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_SURVEY0_SURVEY_H diff --git a/src/transport/inproc/CMakeLists.txt b/src/transport/inproc/CMakeLists.txt new file mode 100644 index 00000000..a445da85 --- /dev/null +++ b/src/transport/inproc/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# inproc protocol + +if (NNG_TRANSPORT_INPROC) + set(INPROC_SOURCES transport/inproc/inproc.c transport/inproc/inproc.h) + install(FILES inproc.h DESTINATION include/nng/transport/inproc) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${INPROC_SOURCES} PARENT_SCOPE) diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index c747f7dc..ae64263c 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -475,9 +475,8 @@ struct nni_tran nni_inproc_tran = { .tran_fini = nni_inproc_fini, }; - int nng_inproc_register(void) { - return (nni_tran_register(&nni_inproc_tran)); + return (nni_tran_register(&nni_inproc_tran)); } diff --git a/src/transport/ipc/CMakeLists.txt b/src/transport/ipc/CMakeLists.txt new file mode 100644 index 00000000..1a5496cf --- /dev/null +++ b/src/transport/ipc/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# ipc protocol + +if (NNG_TRANSPORT_IPC) + set(IPC_SOURCES transport/ipc/ipc.c transport/ipc/ipc.h) + install(FILES ipc.h DESTINATION include/nng/transport/ipc) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${IPC_SOURCES} PARENT_SCOPE) diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index afe8afa8..c13dbd34 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -704,9 +704,7 @@ static nni_tran_ep nni_ipc_ep_ops = { .ep_options = nni_ipc_ep_options, }; -// This is the IPC transport linkage, and should be the only global -// symbol in this entire file. -struct nni_tran nni_ipc_tran = { +static nni_tran nni_ipc_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "ipc", .tran_ep = &nni_ipc_ep_ops, @@ -714,3 +712,9 @@ struct nni_tran nni_ipc_tran = { .tran_init = nni_ipc_tran_init, .tran_fini = nni_ipc_tran_fini, }; + +int +nng_ipc_register(void) +{ + return (nni_tran_register(&nni_ipc_tran)); +} diff --git a/src/transport/ipc/ipc.h b/src/transport/ipc/ipc.h new file mode 100644 index 00000000..f19762c7 --- /dev/null +++ b/src/transport/ipc/ipc.h @@ -0,0 +1,19 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_TRANSPORT_IPC_IPC_H +#define NNG_TRANSPORT_IPC_IPC_H + +// ipc transport. This is used for inter-process communication on +// the same host computer. + +extern int nng_ipc_register(void); + +#endif // NNG_TRANSPORT_IPC_IPC_H diff --git a/src/transport/tcp/CMakeLists.txt b/src/transport/tcp/CMakeLists.txt new file mode 100644 index 00000000..305c357a --- /dev/null +++ b/src/transport/tcp/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# TCP protocol + +if (NNG_TRANSPORT_TCP) + set(TCP_SOURCES transport/tcp/tcp.c transport/tcp/tcp.h) + install(FILES tcp.h DESTINATION include/nng/transport/tcp) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${TCP_SOURCES} PARENT_SCOPE) diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 43b2890d..e33db865 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -854,9 +854,7 @@ static nni_tran_ep nni_tcp_ep_ops = { .ep_options = nni_tcp_ep_options, }; -// This is the TCP transport linkage, and should be the only global -// symbol in this entire file. -struct nni_tran nni_tcp_tran = { +static nni_tran nni_tcp_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "tcp", .tran_ep = &nni_tcp_ep_ops, @@ -864,3 +862,9 @@ struct nni_tran nni_tcp_tran = { .tran_init = nni_tcp_tran_init, .tran_fini = nni_tcp_tran_fini, }; + +int +nng_tcp_register(void) +{ + return (nni_tran_register(&nni_tcp_tran)); +} diff --git a/src/transport/tcp/tcp.h b/src/transport/tcp/tcp.h new file mode 100644 index 00000000..b4c79461 --- /dev/null +++ b/src/transport/tcp/tcp.h @@ -0,0 +1,18 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_TRANSPORT_TCP_TCP_H +#define NNG_TRANSPORT_TCP_TCP_H + +// TCP transport. This is used for communication over TCP/IP. + +extern int nng_tcp_register(void); + +#endif // NNG_TRANSPORT_TCP_TCP_H diff --git a/src/transport/zerotier/CMakeLists.txt b/src/transport/zerotier/CMakeLists.txt new file mode 100644 index 00000000..c1bb0c35 --- /dev/null +++ b/src/transport/zerotier/CMakeLists.txt @@ -0,0 +1,50 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# ZeroTier protocol + +set (NNG_TRANSPORT_ZEROTIER_SOURCE "" CACHE PATH "Location of ZeroTier source tree.") + +if (NNG_TRANSPORT_ZEROTIER) + + # We use the libzerotiercore.a library, which is unfortunately a C++ object + # even though it exposes only public C symbols. It would be extremely + # helpful if libzerotiercore didn't make us carry the whole C++ runtime + # behind us. The user must specify the location of the ZeroTier source + # tree (dev branch for now, and already compiled please) by setting the + # NNG_ZEROTIER_SOURCE macro. + # NB: This needs to be the zerotierone tree, not the libzt library. + # This is because we don't access the API, but instead use the low + # level zerotiercore functionality directly. + # NB: As we wind up linking libzerotiercore.a into the application, + # this means that your application will *also* need to either be licensed + # under the GPLv3, or you will need to have a commercial license from + # ZeroTier permitting its use elsewhere. + + enable_language(CXX) + find_library(NNG_LIBZTCORE zerotiercore PATHS ${NNG_TRANSPORT_ZEROTIER_SOURCE}) + if (NNG_LIBZTCORE) + set(CMAKE_REQUIRED_INCLUDES ${NNG_TRANSPORT_ZEROTIER_SOURCE}/include) + message(STATUS "C++ ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}") + set(CMAKE_REQUIRED_LIBRARIES ${CMAKE_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}) + set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES} PARENT_SCOPE) + set(NNG_REQUIRED_INCLUDES ${NNG_REQUIRED_INCLUDES} ${NNG_TRANSPORT_ZEROTIER_SOURCE}/include PARENT_SCOPE) + nng_check_sym(ZT_Node_join ZeroTierOne.h HAVE_ZTCORE) + endif() + if (NOT HAVE_ZTCORE) + message (FATAL_ERROR "Cannot find ZeroTier components") + endif() + message(STATUS "Found ZeroTier at ${NNG_LIBZTCORE}") + + set(ZT_SOURCES transport/zerotier/zerotier.c transport/zerotier/zerotier.h) + install(FILES zerotier.h DESTINATION include/nng/transport/zerotier) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${ZT_SOURCES} PARENT_SCOPE) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 7cb4786f..3263e0a5 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -61,20 +61,37 @@ if (NNG_TESTS) math (EXPR TEST_PORT "${TEST_PORT}+20") endmacro (add_nng_test) - macro (add_nng_compat_test NAME TIMEOUT) - list (APPEND all_tests ${NAME}) - add_executable (${NAME} ${NAME}.c) - target_link_libraries (${NAME} ${PROJECT_NAME}_static) - target_link_libraries (${NAME} ${NNG_REQUIRED_LIBRARIES}) - target_compile_definitions(${NAME} PUBLIC -DNNG_STATIC_LIB) - if (CMAKE_THREAD_LIBS_INIT) - target_link_libraries (${NAME} "${CMAKE_THREAD_LIBS_INIT}") - endif() + # Compatibility tests are only added if all of the legacy protocols + # are present. It's not worth trying to figure out which of these + # should work and which shouldn't. + if (NNG_PROTO_BUS0 AND + NNG_PROTO_PAIR0 AND + NNG_PROTO_REQ0 AND + NNG_PROTO_REP0 AND + NNG_PROTO_PUB0 AND + NNG_PROTO_SUB0 AND + NNG_PROTO_PUSH0 AND + NNG_PROTO_PULL0) + + macro (add_nng_compat_test NAME TIMEOUT) + list (APPEND all_tests ${NAME}) + add_executable (${NAME} ${NAME}.c) + target_link_libraries (${NAME} ${PROJECT_NAME}_static) + target_link_libraries (${NAME} ${NNG_REQUIRED_LIBRARIES}) + target_compile_definitions(${NAME} PUBLIC -DNNG_STATIC_LIB) + if (CMAKE_THREAD_LIBS_INIT) + target_link_libraries (${NAME} "${CMAKE_THREAD_LIBS_INIT}") + endif() - add_test (NAME ${NAME} COMMAND ${NAME} ${TEST_PORT}) - set_tests_properties (${NAME} PROPERTIES TIMEOUT ${TIMEOUT}) - math (EXPR TEST_PORT "${TEST_PORT}+10") - endmacro (add_nng_compat_test) + add_test (NAME ${NAME} COMMAND ${NAME} ${TEST_PORT}) + set_tests_properties (${NAME} PROPERTIES TIMEOUT ${TIMEOUT}) + math (EXPR TEST_PORT "${TEST_PORT}+10") + endmacro (add_nng_compat_test) + else () + macro (add_nng_compat_test NAME TIMEOUT) + endmacro (add_nng_compat_test) + message (STATUS "Compatibility tests disabled (unconfigured legacy protocols)") + endif () macro (add_nng_cpp_test NAME TIMEOUT) if (NOT NNG_ENABLE_COVERAGE) @@ -130,11 +147,13 @@ add_nng_test(device 5) add_nng_test(errors 2) add_nng_test(pair1 5) add_nng_test(udp 5) -if (NNG_HAVE_ZEROTIER) - add_nng_test(zt 60) -endif() +add_nng_test(zt 60) # compatbility tests +# We only support these if ALL the legacy protocols are supported. This +# is because we don't want to make modifications to partially enable some +# of these tests. Folks minimizing the library probably don't care too +# much about these anyway. add_nng_compat_test(compat_block 5) add_nng_compat_test(compat_bug777 5) add_nng_compat_test(compat_bus 5) diff --git a/tests/aio.c b/tests/aio.c index 2a31319d..0746fa51 100644 --- a/tests/aio.c +++ b/tests/aio.c @@ -11,6 +11,10 @@ #include "convey.h" #include "nng.h" +#include "protocol/pair1/pair.h" + +#include "stubs.h" + #include <string.h> #define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) diff --git a/tests/bus.c b/tests/bus.c index 8283bf21..88137b81 100644 --- a/tests/bus.c +++ b/tests/bus.c @@ -11,6 +11,10 @@ #include "convey.h" #include "nng.h" +#include "protocol/bus0/bus.h" + +#include "stubs.h" + #include <string.h> #define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) diff --git a/tests/cplusplus_pair.cc b/tests/cplusplus_pair.cc index 54a99737..11ce1912 100644 --- a/tests/cplusplus_pair.cc +++ b/tests/cplusplus_pair.cc @@ -8,59 +8,68 @@ // #include "nng.h" +#include "protocol/pair1/pair.h" #include <cstring> +#include <iostream> #define SOCKET_ADDRESS "inproc://c++" int main(int argc, char **argv) { - nng_socket s1; - nng_socket s2; - int rv; - size_t sz; - char buf[8]; - if ((rv = nng_pair0_open(&s1)) != 0) { - throw nng_strerror(rv); - } - if ((rv = nng_pair0_open(&s2)) != 0) { - throw nng_strerror(rv); - } - if ((rv = nng_listen(s1, SOCKET_ADDRESS, NULL, 0)) != 0) { - throw nng_strerror(rv); - } - if ((rv = nng_dial(s2, SOCKET_ADDRESS, NULL, 0)) != 0) { - throw nng_strerror(rv); - } - if ((rv = nng_send(s2, (void *)"ABC", 4, 0)) != 0) { - throw nng_strerror(rv); - } - sz = sizeof (buf); - if ((rv = nng_recv(s1, buf, &sz, 0)) != 0) { - throw nng_strerror(rv); - } - if ((sz != 4) || (memcmp(buf, "ABC", 4) != 0)) { - throw "Contents did not match"; - } - if ((rv = nng_send(s1, (void *)"DEF", 4, 0)) != 0) { - throw nng_strerror(rv); - } - sz = sizeof (buf); - if ((rv = nng_recv(s2, buf, &sz, 0)) != 0) { - throw nng_strerror(rv); - } - if ((sz != 4) || (memcmp(buf, "DEF", 4) != 0)) { - throw "Contents did not match"; - } - if ((rv = nng_close(s1)) != 0) { - throw nng_strerror(rv); - } - if ((rv = nng_close(s2)) != 0) { - throw nng_strerror(rv); - } +#if defined(NNG_HAVE_PAIR1) - return (0); -} + nng_socket s1; + nng_socket s2; + int rv; + size_t sz; + char buf[8]; + + if ((rv = nng_pair1_open(&s1)) != 0) { + throw nng_strerror(rv); + } + if ((rv = nng_pair1_open(&s2)) != 0) { + throw nng_strerror(rv); + } + if ((rv = nng_listen(s1, SOCKET_ADDRESS, NULL, 0)) != 0) { + throw nng_strerror(rv); + } + if ((rv = nng_dial(s2, SOCKET_ADDRESS, NULL, 0)) != 0) { + throw nng_strerror(rv); + } + if ((rv = nng_send(s2, (void *) "ABC", 4, 0)) != 0) { + throw nng_strerror(rv); + } + sz = sizeof(buf); + if ((rv = nng_recv(s1, buf, &sz, 0)) != 0) { + throw nng_strerror(rv); + } + if ((sz != 4) || (memcmp(buf, "ABC", 4) != 0)) { + throw "Contents did not match"; + } + if ((rv = nng_send(s1, (void *) "DEF", 4, 0)) != 0) { + throw nng_strerror(rv); + } + sz = sizeof(buf); + if ((rv = nng_recv(s2, buf, &sz, 0)) != 0) { + throw nng_strerror(rv); + } + if ((sz != 4) || (memcmp(buf, "DEF", 4) != 0)) { + throw "Contents did not match"; + } + if ((rv = nng_close(s1)) != 0) { + throw nng_strerror(rv); + } + if ((rv = nng_close(s2)) != 0) { + throw nng_strerror(rv); + } + std::cout << "Pass." << std::endl; +#else + std::cout << "Skipped (protocol unconfigured)." << std::endl; +#endif + + return (0); +} diff --git a/tests/device.c b/tests/device.c index 5c650a91..56ad2cb5 100644 --- a/tests/device.c +++ b/tests/device.c @@ -10,6 +10,8 @@ #include "convey.h" #include "nng.h" +#include "protocol/pair1/pair.h" +#include "stubs.h" #include <string.h> @@ -68,8 +70,8 @@ Main({ So(nng_listen(dev1, addr1, NULL, 0) == 0); So(nng_listen(dev2, addr2, NULL, 0) == 0); - So(nng_pair_open(&end1) == 0); - So(nng_pair_open(&end2) == 0); + So(nng_pair1_open(&end1) == 0); + So(nng_pair1_open(&end2) == 0); So(nng_dial(end1, addr1, NULL, 0) == 0); So(nng_dial(end2, addr2, NULL, 0) == 0); diff --git a/tests/pair1.c b/tests/pair1.c index 9ed73037..cd3a1035 100644 --- a/tests/pair1.c +++ b/tests/pair1.c @@ -10,8 +10,11 @@ #include "convey.h" #include "nng.h" +#include "protocol/pair1/pair.h" #include "trantest.h" +#include "stubs.h" + #include <string.h> #define SECOND(x) ((x) *1000) diff --git a/tests/pipeline.c b/tests/pipeline.c index 67be57c6..96ae3d17 100644 --- a/tests/pipeline.c +++ b/tests/pipeline.c @@ -10,6 +10,10 @@ #include "convey.h" #include "nng.h" +#include "protocol/pipeline0/pull.h" +#include "protocol/pipeline0/push.h" +#include "stubs.h" + #include <string.h> #define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) diff --git a/tests/pollfd.c b/tests/pollfd.c index e1965b09..8526c705 100644 --- a/tests/pollfd.c +++ b/tests/pollfd.c @@ -8,9 +8,6 @@ // found online at https://opensource.org/licenses/MIT. // -#include "convey.h" -#include "nng.h" - #include <string.h> #ifndef _WIN32 @@ -32,14 +29,21 @@ #endif +#include "convey.h" +#include "nng.h" +#include "protocol/pair1/pair.h" +#include "protocol/pipeline0/pull.h" +#include "protocol/pipeline0/push.h" +#include "stubs.h" + TestMain("Poll FDs", { Convey("Given a connected pair of sockets", { nng_socket s1; nng_socket s2; - So(nng_pair_open(&s1) == 0); - So(nng_pair_open(&s2) == 0); + So(nng_pair1_open(&s1) == 0); + So(nng_pair1_open(&s2) == 0); Reset({ nng_close(s1); nng_close(s2); @@ -106,26 +110,25 @@ TestMain("Poll FDs", { So(nng_getopt(s1, NNG_OPT_RECVFD, &fd, &sz) == 0); So(sz == sizeof(fd)); }); - Convey("We cannot get a send FD for PULL", { - nng_socket s3; - int fd; - size_t sz; - So(nng_pull_open(&s3) == 0); - Reset({ nng_close(s3); }); - sz = sizeof(fd); - So(nng_getopt(s3, NNG_OPT_SENDFD, &fd, &sz) == - NNG_ENOTSUP); - }); + }); - Convey("We cannot get a recv FD for PUSH", { - nng_socket s3; - int fd; - size_t sz; - So(nng_push_open(&s3) == 0); - Reset({ nng_close(s3); }); - sz = sizeof(fd); - So(nng_getopt(s3, NNG_OPT_RECVFD, &fd, &sz) == - NNG_ENOTSUP); - }); + Convey("We cannot get a send FD for PULL", { + nng_socket s3; + int fd; + size_t sz; + So(nng_pull0_open(&s3) == 0); + Reset({ nng_close(s3); }); + sz = sizeof(fd); + So(nng_getopt(s3, NNG_OPT_SENDFD, &fd, &sz) == NNG_ENOTSUP); + }); + + Convey("We cannot get a recv FD for PUSH", { + nng_socket s3; + int fd; + size_t sz; + So(nng_push0_open(&s3) == 0); + Reset({ nng_close(s3); }); + sz = sizeof(fd); + So(nng_getopt(s3, NNG_OPT_RECVFD, &fd, &sz) == NNG_ENOTSUP); }); }) diff --git a/tests/pubsub.c b/tests/pubsub.c index a8209a7d..796d951e 100644 --- a/tests/pubsub.c +++ b/tests/pubsub.c @@ -10,6 +10,9 @@ #include "convey.h" #include "nng.h" +#include "protocol/pubsub0/pub.h" +#include "protocol/pubsub0/sub.h" +#include "stubs.h" #include <string.h> @@ -34,6 +37,16 @@ TestMain("PUB/SUB pattern", { nng_msg *msg; So(nng_recvmsg(pub, &msg, 0) == NNG_ENOTSUP); }); + + Convey("It cannot subscribe", { + So(nng_setopt(pub, NNG_OPT_SUB_SUBSCRIBE, "", 0) == + NNG_ENOTSUP); + }); + + Convey("It cannot unsubscribe", { + So(nng_setopt(pub, NNG_OPT_SUB_UNSUBSCRIBE, "", 0) == + NNG_ENOTSUP); + }); }); Convey("We can create a SUB socket", { @@ -48,6 +61,23 @@ TestMain("PUB/SUB pattern", { So(nng_sendmsg(sub, msg, 0) == NNG_ENOTSUP); nng_msg_free(msg); }); + + Convey("It can subscribe", { + So(nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "ABC", 3) == + 0); + So(nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0) == 0); + Convey("And it can unsubscribe", { + So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, + "ABC", 3) == 0); + So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, "", + 0) == 0); + + So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, "", + 0) == NNG_ENOENT); + So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, + "HELLO", 0) == NNG_ENOENT); + }); + }); }); Convey("We can create a linked PUB/SUB pair", { @@ -73,28 +103,6 @@ TestMain("PUB/SUB pattern", { nng_msleep(20); // give time for connecting threads - Convey("Sub can subscribe", { - So(nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "ABC", 3) == - 0); - So(nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0) == 0); - Convey("Unsubscribe works", { - So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, - "ABC", 3) == 0); - So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, "", - 0) == 0); - - So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, "", - 0) == NNG_ENOENT); - So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, - "HELLO", 0) == NNG_ENOENT); - }); - }); - - Convey("Pub cannot subscribe", { - So(nng_setopt(pub, NNG_OPT_SUB_SUBSCRIBE, "", 0) == - NNG_ENOTSUP); - }); - Convey("Subs can receive from pubs", { nng_msg *msg; diff --git a/tests/reconnect.c b/tests/reconnect.c index 8179055d..e191f70a 100644 --- a/tests/reconnect.c +++ b/tests/reconnect.c @@ -10,6 +10,10 @@ #include "convey.h" #include "nng.h" +#include "protocol/pipeline0/pull.h" +#include "protocol/pipeline0/push.h" +#include "stubs.h" + #include <string.h> #define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) diff --git a/tests/reqrep.c b/tests/reqrep.c index df0621bc..4e837c34 100644 --- a/tests/reqrep.c +++ b/tests/reqrep.c @@ -10,6 +10,9 @@ #include "convey.h" #include "nng.h" +#include "protocol/reqrep0/rep.h" +#include "protocol/reqrep0/req.h" +#include "stubs.h" #include <string.h> diff --git a/tests/resolv.c b/tests/resolv.c index 49d258c3..0678b05f 100644 --- a/tests/resolv.c +++ b/tests/resolv.c @@ -51,26 +51,6 @@ ip6tostr(void *addr) // too much on them, since localhost can be configured weirdly. Notably // the normal assumptions on Linux do *not* hold true. #if 0 - - Convey("Localhost IPv4 resolves", { - nni_aio aio; - const char *str; - memset(&aio, 0, sizeof (aio)); - nni_aio_init(&aio, NULL, NULL); - nni_plat_tcp_resolv("localhost", "80", NNG_AF_INET, 1, - &aio); - nni_aio_wait(&aio); - So(nni_aio_result(&aio) == 0); - So(aio.a_naddrs == 1); - So(aio.a_addrs[0].s_un.s_in.sa_family == NNG_AF_INET); - So(aio.a_addrs[0].s_un.s_in.sa_port == ntohs(80)); - So(aio.a_addrs[0].s_un.s_in.sa_addr == ntohl(0x7f000001)); - str = ip4tostr(&aio.a_addrs[0].s_un.s_in.sa_addr); - So(strcmp(str, "127.0.0.1") == 0); - nni_aio_fini(&aio); - } - ); - Convey("Localhost IPv6 resolves", { nni_aio aio; memset(&aio, 0, sizeof (aio)); @@ -87,44 +67,6 @@ ip6tostr(void *addr) So(strcmp(str, "::1") == 0); nni_aio_fini(&aio); } - ); - Convey("Localhost UNSPEC resolves", { - nni_aio aio; - memset(&aio, 0, sizeof (aio)); - const char *str; - int i; - nni_aio_init(&aio, NULL, NULL); - nni_plat_tcp_resolv("localhost", "80", NNG_AF_UNSPEC, 1, - &aio); - nni_aio_wait(&aio); - So(nni_aio_result(&aio) == 0); - So(aio.a_naddrs == 2); - for (i = 0; i < 2; i++) { - switch (aio.a_addrs[i].s_un.s_family) { - case NNG_AF_INET6: - So(aio.a_addrs[i].s_un.s_in6.sa_port == - ntohs(80)); - str = - ip6tostr(&aio.a_addrs[i].s_un.s_in6.sa_addr); - So(strcmp(str, "::1") == 0); - break; - - case NNG_AF_INET: - So(aio.a_addrs[i].s_un.s_in.sa_port == - ntohs(80)); - str = - ip4tostr(&aio.a_addrs[i].s_un.s_in.sa_addr); - So(strcmp(str, "127.0.0.1") == 0); - break; - default: - So(1 == 0); - } - } - So(aio.a_addrs[0].s_un.s_family != - aio.a_addrs[1].s_un.s_family); - nni_aio_fini(&aio); - } - ); #endif TestMain("Resolver", { @@ -179,11 +121,18 @@ TestMain("Resolver", { So(strcmp(str, "8.8.4.4") == 0); nni_aio_fini(aio); }); + Convey("Numeric v6 resolves", { nni_aio * aio; const char * str; nng_sockaddr sa; + // Travis CI has moved some of their services to host that + // apparently don't support IPv6 at all. This is very sad. + if (getenv("TRAVIS") != NULL) { + ConveySkip("IPv6 missing from CI provider"); + } + nni_aio_init(&aio, NULL, NULL); aio->a_addr = &sa; nni_plat_tcp_resolv("::1", "80", NNG_AF_INET6, 1, aio); @@ -230,5 +179,51 @@ TestMain("Resolver", { nni_aio_fini(aio); }); + Convey("Localhost IPv4 resolves", { + nni_aio * aio; + const char * str; + nng_sockaddr sa; + + nni_aio_init(&aio, NULL, NULL); + aio->a_addr = &sa; + nni_plat_tcp_resolv("localhost", "80", NNG_AF_INET, 1, aio); + nni_aio_wait(aio); + So(nni_aio_result(aio) == 0); + So(sa.s_un.s_in.sa_family == NNG_AF_INET); + So(sa.s_un.s_in.sa_port == ntohs(80)); + So(sa.s_un.s_in.sa_addr == ntohl(0x7f000001)); + str = ip4tostr(&sa.s_un.s_in.sa_addr); + So(strcmp(str, "127.0.0.1") == 0); + nni_aio_fini(aio); + }); + + Convey("Localhost UNSPEC resolves", { + nni_aio * aio; + const char * str; + nng_sockaddr sa; + + nni_aio_init(&aio, NULL, NULL); + aio->a_addr = &sa; + nni_plat_tcp_resolv("localhost", "80", NNG_AF_UNSPEC, 1, aio); + nni_aio_wait(aio); + So(nni_aio_result(aio) == 0); + So((sa.s_un.s_family == NNG_AF_INET) || + (sa.s_un.s_family == NNG_AF_INET6)); + switch (sa.s_un.s_family) { + case NNG_AF_INET: + So(sa.s_un.s_in.sa_port == ntohs(80)); + So(sa.s_un.s_in.sa_addr == ntohl(0x7f000001)); + str = ip4tostr(&sa.s_un.s_in.sa_addr); + So(strcmp(str, "127.0.0.1") == 0); + break; + case NNG_AF_INET6: + So(sa.s_un.s_in6.sa_port == ntohs(80)); + str = ip6tostr(&sa.s_un.s_in6.sa_addr); + So(strcmp(str, "::1") == 0); + break; + } + nni_aio_fini(aio); + }); + nni_fini(); }) diff --git a/tests/scalability.c b/tests/scalability.c index b1a9b767..d869d5fc 100644 --- a/tests/scalability.c +++ b/tests/scalability.c @@ -11,6 +11,10 @@ #include "convey.h" #include "nng.h" +#include "protocol/reqrep0/rep.h" +#include "protocol/reqrep0/req.h" +#include "stubs.h" + #include <string.h> static int nclients = 200; diff --git a/tests/sock.c b/tests/sock.c index 1c277b46..d3af414a 100644 --- a/tests/sock.c +++ b/tests/sock.c @@ -12,6 +12,9 @@ #include "nng.h" #include "trantest.h" +#include "protocol/pubsub0/sub.h" + +#include "protocol/pair1/pair.h" #include "stubs.h" #include <string.h> diff --git a/tests/stubs.h b/tests/stubs.h index 780ac772..0641c970 100644 --- a/tests/stubs.h +++ b/tests/stubs.h @@ -44,4 +44,51 @@ getms(void) #endif } +int +nosocket(nng_socket *s) +{ + ConveySkip("Protocol unconfigured"); + return (NNG_ENOTSUP); +} + +#ifndef NNG_HAVE_REQ0 +#define nng_req0_open nosocket +#endif + +#ifndef NNG_HAVE_REP0 +#define nng_rep0_open nosocket +#endif + +#ifndef NNG_HAVE_PUB0 +#define nng_pub0_open nosocket +#endif + +#ifndef NNG_HAVE_SUB0 +#define nng_sub0_open nosocket +#endif + +#ifndef NNG_HAVE_PAIR0 +#define nng_pair0_open nosocket +#endif + +#ifndef NNG_HAVE_PAIR1 +#define nng_pair1_open nosocket +#endif + +#ifndef NNG_HAVE_PUSH0 +#define nng_push0_open nosocket +#endif + +#ifndef NNG_HAVE_PULL0 +#define nng_pull0_open nosocket +#endif + +#ifndef NNG_HAVE_SURVEYOR0 +#define nng_surveyor0_open nosocket +#endif + +#ifndef NNG_HAVE_RESPONDENT0 +#define nng_respondent0_open nosocket +#endif + #endif // STUBS_H diff --git a/tests/survey.c b/tests/survey.c index f4fab009..a3a7ba1d 100644 --- a/tests/survey.c +++ b/tests/survey.c @@ -9,7 +9,11 @@ // #include "convey.h" + #include "nng.h" +#include "protocol/survey0/respond.h" +#include "protocol/survey0/survey.h" +#include "stubs.h" #include <string.h> diff --git a/tests/tcp.c b/tests/tcp.c index 79fe1893..fdcf458c 100644 --- a/tests/tcp.c +++ b/tests/tcp.c @@ -9,8 +9,11 @@ // #include "convey.h" +#include "nng.h" +#include "protocol/pair1/pair.h" #include "trantest.h" +#include "stubs.h" // TCP tests. #ifndef _WIN32 diff --git a/tests/tcp6.c b/tests/tcp6.c index b1695c84..a3e55d18 100644 --- a/tests/tcp6.c +++ b/tests/tcp6.c @@ -13,6 +13,9 @@ #include "core/nng_impl.h" // TCP tests for IPv6. +#include "protocol/pair1/pair.h" + +#include "stubs.h" static int has_v6(void) diff --git a/tests/trantest.h b/tests/trantest.h index 37763d24..d334c257 100644 --- a/tests/trantest.h +++ b/tests/trantest.h @@ -11,6 +11,8 @@ #include "convey.h" #include "core/nng_impl.h" #include "nng.h" +#include "protocol/reqrep0/rep.h" +#include "protocol/reqrep0/req.h" #include <stdlib.h> #include <string.h> @@ -30,9 +32,54 @@ unsigned trantest_port = 0; typedef int (*trantest_proptest_t)(nng_msg *, nng_listener, nng_dialer); +#ifndef NNG_HAVE_ZEROTIER +#define nng_zt_register notransport +#endif +#ifndef NNG_HAVE_INPROC +#define nng_inproc_register notransport +#endif +#ifndef NNG_HAVE_IPC +#define nng_ipc_register notransport +#endif +#ifndef NNG_HAVE_TCP +#define nng_tcp_register notransport +#endif + +int +notransport(void) +{ + ConveySkip("Transport not configured"); + return (NNG_ENOTSUP); +} + +#define CHKTRAN(s, t) \ + if (strncmp(s, t, strlen(t)) == 0) \ + notransport() + +void +trantest_checktran(const char *url) +{ +#ifndef NNG_HAVE_ZEROTIER + CHKTRAN(url, "zt:"); +#endif +#ifndef NNG_HAVE_INPROC + CHKTRAN(url, "inproc:"); +#endif +#ifndef NNG_HAVE_IPC + CHKTRAN(url, "ipc:"); +#endif +#ifndef NNG_HAVE_TCP + CHKTRAN(url, "tcp:"); +#endif + + (void) url; +} + void trantest_next_address(char *out, const char *template) { + trantest_checktran(template); + if (trantest_port == 0) { char *pstr; trantest_port = 5555; @@ -56,11 +103,16 @@ void trantest_init(trantest *tt, const char *addr) { trantest_next_address(tt->addr, addr); + +#if defined(NNG_HAVE_REQ0) && defined(NNG_HAVE_REP0) So(nng_req_open(&tt->reqsock) == 0); So(nng_rep_open(&tt->repsock) == 0); tt->tran = nni_tran_find(addr); So(tt->tran != NULL); +#else + ConveySkip("Missing REQ or REP protocols"); +#endif } void @@ -9,9 +9,12 @@ // #include "convey.h" +#include "nng.h" +#include "protocol/pair0/pair.h" +#include "transport/zerotier/zerotier.h" #include "trantest.h" -#include "transport/zerotier/zerotier.h" +#include "stubs.h" // zerotier tests. @@ -35,6 +38,10 @@ mkdir(const char *path, int mode) #include <unistd.h> #endif // WIN32 +#ifndef NNG_HAVE_ZEROTIER +#define nng_zt_network_status_ok 0 +#endif + static int check_props(nng_msg *msg, nng_listener l, nng_dialer d) { @@ -197,6 +204,8 @@ TestMain("ZeroTier Transport", { char addr[NNG_MAXADDRLEN]; int rv; + So(nng_zt_register() == 0); + snprintf(addr, sizeof(addr), "zt://" NWID ":%u", port); So(nng_pair_open(&s) == 0); @@ -240,6 +249,8 @@ TestMain("ZeroTier Transport", { // uint64_t node = 0xb000072fa6ull; // my personal host uint64_t node = 0x2d2f619cccull; // my personal host + So(nng_zt_register() == 0); + snprintf(addr, sizeof(addr), "zt://" NWID "/%llx:%u", (unsigned long long) node, port); @@ -258,6 +269,8 @@ TestMain("ZeroTier Transport", { uint64_t node1 = 0; uint64_t node2 = 0; + So(nng_zt_register() == 0); + snprintf(addr, sizeof(addr), "zt://" NWID ":%u", port); So(nng_pair_open(&s) == 0); @@ -306,6 +319,7 @@ TestMain("ZeroTier Transport", { port = 9944; // uint64_t node = 0xb000072fa6ull; // my personal host + So(nng_zt_register() == 0); snprintf(addr1, sizeof(addr1), "zt://" NWID ":%u", port); |
