diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-10-31 13:06:38 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-11-02 16:10:26 -0700 |
| commit | 7bf591e20a94b8d926f92ab9b320f3b75d342345 (patch) | |
| tree | d67ce7cab328a004346419047feede7d579dad77 | |
| parent | d340af7dc250388f48d36c5078c4857c51bb6121 (diff) | |
| download | nng-7bf591e20a94b8d926f92ab9b320f3b75d342345.tar.gz nng-7bf591e20a94b8d926f92ab9b320f3b75d342345.tar.bz2 nng-7bf591e20a94b8d926f92ab9b320f3b75d342345.zip | |
fixes #143 Protocols and transports should be "configurable"
This makes all the protocols and transports optional. All
of them except ZeroTier are enabled by default, but you can
now disable them (remove from the build) with cmake options.
The test suite is modified so that tests still run as much
as they can, but skip over things caused by missing functionality
from the library (due to configuration).
Further, the constant definitions and prototypes for functions
that are specific to transports or protocols are moved into
appropriate headers, which should be included directly by
applications wishing to use these.
We have also added and improved documentation -- all of the
transports are documented, and several more man pages for
protocols have been added. (Req/Rep and Surveyor are still
missing.)
76 files changed, 2686 insertions, 1161 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index c2815ca3..6cd49b9f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -93,11 +93,83 @@ option (NNG_TESTS "Build and run tests" ON) option (NNG_TOOLS "Build extra tools" OFF) option (NNG_ENABLE_NNGCAT "Enable building nngcat utility." ${NNG_TOOLS}) option (NNG_ENABLE_COVERAGE "Enable coverage reporting." OFF) -option (NNG_ENABLE_ZEROTIER "Enable ZeroTier transport (requires libzerotiercore)." OFF) -set (NNG_ZEROTIER_SOURCE "" CACHE PATH "Location of ZeroTier source tree.") # Enable access to private APIs for our own use. add_definitions (-DNNG_PRIVATE) +option (NNG_PROTO_BUS0 "Enable BUSv0 protocol." ON) +if (NNG_PROTO_BUS0) + add_definitions (-DNNG_HAVE_BUS0) +endif () + +option (NNG_PROTO_PAIR0 "Enable PAIRv0 protocol." ON) +if (NNG_PROTO_PAIR0) + add_definitions (-DNNG_HAVE_PAIR0) +endif () + +option (NNG_PROTO_PAIR1 "Enable PAIRv1 protocol." ON) +if (NNG_PROTO_PAIR1) + add_definitions (-DNNG_HAVE_PAIR1) +endif () + +option (NNG_PROTO_REQ0 "Enable REQv0 protocol." ON) +if (NNG_PROTO_REQ0) + add_definitions (-DNNG_HAVE_REQ0) +endif () + +option (NNG_PROTO_REP0 "Enable REPv0 protocol." ON) +if (NNG_PROTO_REP0) + add_definitions (-DNNG_HAVE_REP0) +endif () + +option (NNG_PROTO_PUB0 "Enable PUBv0 protocol." ON) +if (NNG_PROTO_PUB0) + add_definitions (-DNNG_HAVE_PUB0) +endif () + +option (NNG_PROTO_SUB0 "Enable SUBv0 protocol." ON) +if (NNG_PROTO_SUB0) + add_definitions (-DNNG_HAVE_SUB0) +endif () + +option (NNG_PROTO_PUSH0 "Enable PUSHv0 protocol." ON) +if (NNG_PROTO_PUSH0) + add_definitions (-DNNG_HAVE_PUSH0) +endif () + +option (NNG_PROTO_PULL0 "Enable PULLv0 protocol." ON) +if (NNG_PROTO_PULL0) + add_definitions (-DNNG_HAVE_PULL0) +endif () + +option (NNG_PROTO_SURVEYOR0 "Enable SURVEYORv0 protocol." ON) +if (NNG_PROTO_SURVEYOR0) + add_definitions (-DNNG_HAVE_SURVEYOR0) +endif () + +option (NNG_PROTO_RESPONDENT0 "Enable RESPONDENTv0 protocol." ON) +if (NNG_PROTO_RESPONDENT0) + add_definitions (-DNNG_HAVE_RESPONDENT0) +endif () + +option (NNG_TRANSPORT_INPROC "Enable inproc transport." ON) +if (NNG_TRANSPORT_INPROC) + add_definitions (-DNNG_HAVE_INPROC) +endif () + +option (NNG_TRANSPORT_IPC "Enable IPC transport." ON) +if (NNG_TRANSPORT_IPC) + add_definitions (-DNNG_HAVE_IPC) +endif () + +option (NNG_TRANSPORT_TCP "Enable TCP transport." ON) +if (NNG_TRANSPORT_TCP) + add_definitions (-DNNG_HAVE_TCP) +endif () + +option (NNG_TRANSPORT_ZEROTIER "Enable ZeroTier transport (requires libzerotiercore)." OFF) +if (NNG_TRANSPORT_ZEROTIER) + add_definitions (-DNNG_HAVE_ZEROTIER) +endif () # Platform checks. if (NNG_ENABLE_COVERAGE) @@ -247,39 +319,6 @@ nng_check_sym (strlcat string.h NNG_HAVE_STRLCAT) nng_check_sym (strlcpy string.h NNG_HAVE_STRLCPY) nng_check_sym (strnlen string.h NNG_HAVE_STRNLEN) -# Search for ZeroTier -# We use the libzerotiercore.a library, which is unfortunately a C++ object -# even though it exposes only public C symbols. It would be extremely -# helpful if libzerotiercore didn't make us carry the whole C++ runtime -# behind us. The user must specify the location of the ZeroTier source -# tree (dev branch for now, and already compiled please) by setting the -# NNG_ZEROTIER_SOURCE macro. -# NB: This needs to be the zerotierone tree, not the libzt library. -# This is because we don't access the API, but instead use the low -# level zerotiercore functionality directly. -# NB: As we wind up linking libzerotiercore.a into the application, -# this means that your application will *also* need to either be licensed -# under the GPLv3, or you will need to have a commercial license from -# ZeroTier permitting its use elsewhere. -if (NNG_ENABLE_ZEROTIER) - enable_language(CXX) - find_library(NNG_LIBZTCORE zerotiercore PATHS ${NNG_ZEROTIER_SOURCE}) - if (NNG_LIBZTCORE) - set(CMAKE_REQUIRED_INCLUDES ${NNG_ZEROTIER_SOURCE}/include) -# set(CMAKE_REQUIRED_LIBRARIES ${NNG_LIBZTCORE} c++) -# set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} c++) - message(STATUS "C++ ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}") - set(CMAKE_REQUIRED_LIBRARIES ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}) - set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}) - set(NNG_REQUIRED_INCLUDES ${NNG_REQUIRED_INCLUDES} ${NNG_ZEROTIER_SOURCE}/include) - nng_check_sym(ZT_Node_join ZeroTierOne.h NNG_HAVE_ZEROTIER) - endif() - if (NOT NNG_HAVE_ZEROTIER) - message (FATAL_ERROR "Cannot find ZeroTier components") - endif() - message(STATUS "Found ZeroTier at ${NNG_LIBZTCORE}") -endif() - add_subdirectory (src) if (NNG_TESTS) diff --git a/docs/nng_bus.adoc b/docs/nng_bus.adoc index 03c43381..dd83062b 100644 --- a/docs/nng_bus.adoc +++ b/docs/nng_bus.adoc @@ -21,11 +21,9 @@ SYNOPSIS [source,c] ---------- -#include <nng/protocol/bus/bus.h> +#include <nng/protocol/bus0/bus.h> -int nng_bus_open(nng_socket *s); int nng_bus0_open(nng_socket *s); - ---------- DESCRIPTION @@ -57,7 +55,7 @@ likely that message loss is to occur. Socket Operations ~~~~~~~~~~~~~~~~~ -The `nng_bus_open()` call creates a bus socket. This socket +The `nng_bus0_open()` call creates a bus socket. This socket may be used to send and receive messages. Sending messages will attempt to deliver to each directly connected peer. diff --git a/docs/nng_inproc.adoc b/docs/nng_inproc.adoc index 9e044f6d..a6d83c95 100644 --- a/docs/nng_inproc.adoc +++ b/docs/nng_inproc.adoc @@ -47,8 +47,7 @@ URI Format This transport uses URIs using the scheme `inproc://`, followed by an arbitrary string of text, terminated by a `NUL` byte. The -entire URI must be less than `NNG_MAXADDRLEN` bytes long, including -the terminating `NUL`. +entire URI must be less than `NNG_MAXADDRLEN` bytes long. Multiple URIs can be used within the same application, and they will not interfere with one another. @@ -65,16 +64,22 @@ When using an `nng_sockaddr` structure, the actual structure is of type [source,c] -------- -#define NNG_AF_INPROC 1 +#define NNG_AF_INPROC 1 <1> #define NNG_MAXADDRLEN 128 -struct nng_sockaddr_inproc { +typedef nng_sockaddr_inproc { + // <2> uint16_t sa_family; // must be NNG_AF_INPROC - uint32_t sa_path[NNG_MAXADDRLEN]; // arbitrary "path" + char sa_path[NNG_MAXADDRLEN]; // arbitrary "path" + // } -------- +<1> The values of these macros may change, so applications +should avoid depending upon their values and instead use them symbolically. +<2> Other members may be present, but only those listed here +are suitable for application use. -The `sa_family` member will have the value `NNG_AF_INPROC` (1). +The `sa_family` member will have the value `NNG_AF_INPROC`. The `sa_path` member is an ASCIIZ string, and may contain any characters, terminated by a `NUL` byte. diff --git a/docs/nng_ipc.adoc b/docs/nng_ipc.adoc new file mode 100644 index 00000000..c5dac4f7 --- /dev/null +++ b/docs/nng_ipc.adoc @@ -0,0 +1,110 @@ +nng_ipc(7) +========== +:doctype: manpage +:manmanual: nng +:mansource: nng +:icons: font +:source-highlighter: pygments +:copyright: Copyright 2017 Garrett D'Amore <garrett@damore.org> \ + Copyright 2017 Capitar IT Group BV <info@capitar.com> \ + This software is supplied under the terms of the MIT License, a \ + copy of which should be located in the distribution where this \ + file was obtained (LICENSE.txt). A copy of the license may also \ + be found online at https://opensource.org/licenses/MIT. + +NAME +---- +nng_ipc - IPC transport for nng + +SYNOPSIS +-------- + +[source,c] +---------- +#include <nng/transport/ipc/ipc.h> + +int nng_ipc_register(void); +---------- + +DESCRIPTION +----------- + +The _nng_ipc_ transport provides communication support between +_nng_ sockets within different processes on the same host. For POSIX +platforms, this is implemented using UNIX domain sockets. For Windows, +this is implemented using Windows Named Pipes. Other platforms may +have different implementation strategies. + +// We need to insert a reference to the nanomsg RFC. + +Registration +~~~~~~~~~~~~ + +The _ipc_ transport is generally built-in to the _nng_ core, so +no extra steps to use it should be necessary. + +URI Format +~~~~~~~~~~ + +This transport uses URIs using the scheme `ipc://`, followed by +a path name in the file system where the socket or named pipe +should be created. + +TIP: On Windows, all names are prefixed by `\.\pipe\` and do not +occupy the normal file system. On POSIX platforms, the path is +taken literally, and is relative to the current directory, unless +an extra leading `/` is provided. For example, `ipc://myname` refers +to the name `myname` in the current directory, whereas `ipc:///tmp/myname` +refers to `myname` located in `/tmp`. + +The entire URI must be less than `NNG_MAXADDRLEN` bytes long. + +Socket Address +~~~~~~~~~~~~~~ + +When using an `nng_sockaddr` structure, the actual structure is of type +`nng_sockaddr_ipc`. This is a `struct` type with the following definition: + +[source,c] +-------- +#define NNG_AF_IPC 2 <1> +#define NNG_MAXADDRLEN 128 + +typedef struct { + // ... <2> + uint16_t sa_family; // must be NNG_AF_IPC + char sa_path[NNG_MAXADDRLEN]; // arbitrary "path" + // ... +} nng_sockaddr_ipc; +-------- +<1> The values of these macros may change, so applications +should avoid depending upon their values and instead use them symbolically. +<2> Other members may be present, but only those listed here +are suitable for application use. + +The `sa_family` member will have the value `NNG_AF_IPC`. +The `sa_path` member is an ASCIIZ string, and may contain any legal +path name (platform-dependent), terminated by a `NUL` byte. + +Transport Options +~~~~~~~~~~~~~~~~~ + +The _ipc_ transport has no special +options.footnote:[Options for security attributes and credentials are planned.] + +AUTHORS +------- +link:mailto:garrett@damore.org[Garrett D'Amore] + +SEE ALSO +-------- +<<nng.adoc#,nng(7)>> + +COPYRIGHT +--------- + +Copyright 2017 mailto:garrett@damore.org[Garrett D'Amore] + +Copyright 2017 mailto:info@capitar.com[Capitar IT Group BV] + +This document is supplied under the terms of the +https://opensource.org/licenses/LICENSE.txt[MIT License]. diff --git a/docs/nng_pair.adoc b/docs/nng_pair.adoc new file mode 100644 index 00000000..b55000af --- /dev/null +++ b/docs/nng_pair.adoc @@ -0,0 +1,155 @@ +nng_pair(7) +=========== +:doctype: manpage +:manmanual: nng +:mansource: nng +:icons: font +:source-highlighter: pygments +:copyright: Copyright 2017 Garrett D'Amore <garrett@damore.org> \ + Copyright 2017 Capitar IT Group BV <info@capitar.com> \ + This software is supplied under the terms of the MIT License, a \ + copy of which should be located in the distribution where this \ + file was obtained (LICENSE.txt). A copy of the license may also \ + be found online at https://opensource.org/licenses/MIT. + +NAME +---- +nng_pair - pair protocol + +SYNOPSIS +-------- + +.Version 0 +[source,c] +---------- +#include <nng/protocol/pair0/pair.h> + +int nng_pair0_open(nng_socket *s); +---------- + +.Version 1 +[source,c] +---------- +#include <nng/protocol/pair1/pair.h> + +int nng_pair1_open(nng_socket *s); +---------- + +DESCRIPTION +----------- + +The _nng_pair_ protocol implements a peer-to-peer pattern, where +relationships between peers are one-to-one. + +Version 1 of this protocol supports an optional _polyamorous_ mode where a +peer can maintain multiple partnerships. Using this mode requires +some additional sophistication in the application. + +Socket Operations +~~~~~~~~~~~~~~~~~ + +The `nng_pair_open()` call creates a _pair_ socket. Normally, this +pattern will block when attempting to send a message, if no peer is +able to receive the message. + +NOTE: Even though this mode may appear to be "reliable", because back-pressure +prevents discarding messages most of the time, there are topologies involving +_devices_ (see <<nng_device.adoc#,nng_device(3)>>) or raw mode sockets where +messages may be discarded. Applications that require reliable delivery +semantics should consider using <<nng_req.adoc#,nng_req(7)>> sockets, or +implement their own acknowledgement layer on top of pair sockets. + +In order to avoid head-of-line blocking conditions, _polyamorous_ mode pair +sockets (version 1 only) discard messages if they are unable to deliver them +to a peer. + +Protocol Versions +~~~~~~~~~~~~~~~~~ + +Version 0 is the legacy version of this protocol. It lacks any header +information, and is suitable when building simple one-to-one topologies. + +TIP: Use version 0 if you need to communicate with other implementations, +including the legacy https://github.com/nanomsg/nanomsg[nanomsg] library or +https://github.com/go-mangos/mangos[mangos]. + +Version 1 of the protocol offers improved protection against loops when +used with <<nng_device.adoc#,nng_device(3)>>. It also offers _polyamorous_ +mode for forming multiple partnerships on a single socket. + +NOTE: Version 1 of this protocol is considered experimental at this time. + +Polyamorous Mode +~~~~~~~~~~~~~~~~ + +Normally pair sockets are for one-to-one communication, and a given peer +will reject new connections if it already has an active connection to another +peer. + +In _polyamorous_ mode, which is only available with version 1, a socket can +support many one-to-one connections. In this mode, the application must +choose the remote peer to receive an ougoing message by setting the value +of the pipe ID on the outgoing message using +the <<nng_msg_set_pipe.adoc#,nng_msg_set_pipe(3)>> function. + +Most often the value of the outgoing pipe ID will be obtained from an incoming +message using the <<nng_msg_get_pipe.adoc#,nng_msg_get_pipe(3)>> function, +such as when replying to an incoming message. + +In order to prevent head-of-line blocking, if the peer on the given pipe +is not able to receive (or the pipe is no longer available, such as if the +peer has disconnected), then the message will be discarded with no notification +to the sender. + +Protocol Options +~~~~~~~~~~~~~~~~ + +The following protocol-specific options are available. + +`NNG_OPT_PAIR1_POLY`:: + + (Version 1 only). This option enables the use of _polyamorous_ mode. + The value is read-write, and takes an integer boolean value. The default + false value (0) indicates that legacy monogamous mode should be used. + +`NNG_OPT_MAXTTL`:: + + (Version 1 only). Maximum time-to-live. This option is an integer value + between 0 and 255, + inclusive, and is the maximum number of "hops" that a message may + pass through until it is discarded. The default value is 8. A value + of 0 may be used to disable the loop protection, allowing an infinite + number of hops. ++ +TIP: Each node along a forwarding path may have it's own value for the +maximum time-to-live, and performs its own checks before forwarding a message. +Therefore it is helpful if all nodes in the topology use the same value for +this option. + +Protocol Headers +~~~~~~~~~~~~~~~~ + +Version 0 of the pair protocol has no protocol-specific headers. + +Version 1 of the pair protocol uses a single 32-bit unsigned value. The +low-order (big-endian) byte of this value contains a "hop" count, and is +used in conjuction with the `NNG_OPT_MAXTTL` option to guard against +device forwarding loops. This value is initialized to 1, and incremented +each time the message is received by a new node. + +AUTHORS +------- +link:mailto:garrett@damore.org[Garrett D'Amore] + +SEE ALSO +-------- +<<nng.adoc#,nng(7)>> + +COPYRIGHT +--------- + +Copyright 2017 mailto:garrett@damore.org[Garrett D'Amore] + +Copyright 2017 mailto:info@capitar.com[Capitar IT Group BV] + +This document is supplied under the terms of the +https://opensource.org/licenses/LICENSE.txt[MIT License]. diff --git a/docs/nng_pub.adoc b/docs/nng_pub.adoc index d4db45ae..4c3b764b 100644 --- a/docs/nng_pub.adoc +++ b/docs/nng_pub.adoc @@ -21,11 +21,9 @@ SYNOPSIS [source,c] ---------- -#include <nng/protocol/pubsub/pubsub.h> +#include <nng/protocol/pubsub0/pub.h> -int nng_pub_open(nng_socket *s); int nng_pub0_open(nng_socket *s); - ---------- DESCRIPTION @@ -51,7 +49,7 @@ accordingly. Socket Operations ~~~~~~~~~~~~~~~~~ -The `nng_pub_open()` call creates a publisher socket. This socket +The `nng_pub0_open()` call creates a publisher socket. This socket may be used to send messages, but is unable to receive them. Attempts to receive messages will result in `NNG_ENOTSUP`. diff --git a/docs/nng_pull.adoc b/docs/nng_pull.adoc new file mode 100644 index 00000000..17614a3d --- /dev/null +++ b/docs/nng_pull.adoc @@ -0,0 +1,85 @@ +nng_pull(7) +=========== +:doctype: manpage +:manmanual: nng +:mansource: nng +:icons: font +:source-highlighter: pygments +:copyright: Copyright 2017 Garrett D'Amore <garrett@damore.org> \ + Copyright 2017 Capitar IT Group BV <info@capitar.com> \ + This software is supplied under the terms of the MIT License, a \ + copy of which should be located in the distribution where this \ + file was obtained (LICENSE.txt). A copy of the license may also \ + be found online at https://opensource.org/licenses/MIT. + +NAME +---- +nng_pull - pull protocol + +SYNOPSIS +-------- + +[source,c] +---------- +#include <nng/protocol/pipeline0/pull.h> + +int nng_pull0_open(nng_socket *s); +---------- + +DESCRIPTION +----------- + +The _nng_pull_ protocol is one half of a pipeline pattern. The other half +is the <<nng_push.adoc#,nng_push(7)>> protocol. + +In the pipeline pattern, pushers distribute messages to pullers. +Each message sent +by a pusher will be sent to one of its peer pullers, +chosen in a round-robin fashion +from the set of connected peers available for receiving. +This property makes this pattern useful in load-balancing scenarios. + +Socket Operations +~~~~~~~~~~~~~~~~~ + +The `nng_pull0_open()` call creates a puller socket. This socket +may be used to receive messages, but is unable to send them. Attempts +to send messages will result in `NNG_ENOTSUP`. + +When receiving messages, the _nng_pull_ protocol accepts messages as +they arrive from peers. If two peers both have a message ready, the +order in which messages are handled is undefined. + +Protocol Versions +~~~~~~~~~~~~~~~~~ + +Only version 0 of this protocol is supported. (At the time of writing, +no other versions of this protocol have been defined.) + +Protocol Options +~~~~~~~~~~~~~~~~ + +The _nng_pull_ protocol has no protocol-specific options. + +Protocol Headers +~~~~~~~~~~~~~~~~ + +The _nng_pull_ protocol has no protocol-specific headers. + +AUTHORS +------- +link:mailto:garrett@damore.org[Garrett D'Amore] + +SEE ALSO +-------- +<<nng.adoc#,nng(7)>> +<<nng_push.adoc#,nng_push(7)>> + +COPYRIGHT +--------- + +Copyright 2017 mailto:garrett@damore.org[Garrett D'Amore] + +Copyright 2017 mailto:info@capitar.com[Capitar IT Group BV] + +This document is supplied under the terms of the +https://opensource.org/licenses/LICENSE.txt[MIT License]. diff --git a/docs/nng_push.adoc b/docs/nng_push.adoc new file mode 100644 index 00000000..faf783b6 --- /dev/null +++ b/docs/nng_push.adoc @@ -0,0 +1,93 @@ +nng_push(7) +=========== +:doctype: manpage +:manmanual: nng +:mansource: nng +:icons: font +:source-highlighter: pygments +:copyright: Copyright 2017 Garrett D'Amore <garrett@damore.org> \ + Copyright 2017 Capitar IT Group BV <info@capitar.com> \ + This software is supplied under the terms of the MIT License, a \ + copy of which should be located in the distribution where this \ + file was obtained (LICENSE.txt). A copy of the license may also \ + be found online at https://opensource.org/licenses/MIT. + +NAME +---- +nng_push - push protocol + +SYNOPSIS +-------- + +[source,c] +---------- +#include <nng/protocol/pipeline0/push.h> + +int nng_push0_open(nng_socket *s); +---------- + +DESCRIPTION +----------- + +The _nng_push_ protocol is one half of a pipeline pattern. The +other side is the <<nng_pull.adoc#,nng_pull(7)>> protocol. + +In the pipeline pattern, pushers distribute messages to pullers. +Each message sent +by a pusher will be sent to one of its peer pullers, +chosen in a round-robin fashion +from the set of connected peers available for receiving. +This property makes this pattern useful in load-balancing scenarios. + +Socket Operations +~~~~~~~~~~~~~~~~~ + +The `nng_push0_open()` call creates a pusher socket. This socket +may be used to send messages, but is unable to receive them. Attempts +to receive messages will result in `NNG_ENOTSUP`. + +Send operations will observe flow control (back-pressure), so that +only peers capable of accepting a message will be considered. If no +peer is available to receive a message, then the send operation will +wait until one is available, or the operation times out. + +NOTE: Although the pipeline protocol honors flow control, and attempts +to avoid dropping messages, no guarantee of delivery is made. Furthermore, +as there is no capability for message acknowledgement, applications that +need reliable delivery are encouraged to consider the +<<nng_req.adoc#,nng_req(7)>> protocol instead. + +Protocol Versions +~~~~~~~~~~~~~~~~~ + +Only version 0 of this protocol is supported. (At the time of writing, +no other versions of this protocol have been defined.) + +Protocol Options +~~~~~~~~~~~~~~~~ + +The _nng_push_ protocol has no protocol-specific options. + +Protocol Headers +~~~~~~~~~~~~~~~~ + +The _nng_push_ protocol has no protocol-specific headers. + +AUTHORS +------- +link:mailto:garrett@damore.org[Garrett D'Amore] + +SEE ALSO +-------- +<<nng.adoc#,nng(7)>> +<<nng_pull.adoc#,nng_pull(7)>> +<<nng_req.adoc#,nng_req(7)>> + +COPYRIGHT +--------- + +Copyright 2017 mailto:garrett@damore.org[Garrett D'Amore] + +Copyright 2017 mailto:info@capitar.com[Capitar IT Group BV] + +This document is supplied under the terms of the +https://opensource.org/licenses/LICENSE.txt[MIT License]. diff --git a/docs/nng_sub.adoc b/docs/nng_sub.adoc index 0b409904..5359acec 100644 --- a/docs/nng_sub.adoc +++ b/docs/nng_sub.adoc @@ -21,11 +21,9 @@ SYNOPSIS [source,c] ---------- -#include <nng/protocol/pubsub/pubsub.h> +#include <nng/protocol/pubsub0/sub.h> -int nng_sub_open(nng_socket *s); int nng_sub0_open(nng_socket *s); - ---------- DESCRIPTION @@ -51,7 +49,7 @@ accordingly. Socket Operations ~~~~~~~~~~~~~~~~~ -The `nng_sub_open()` call creates a subscriber socket. This socket +The `nng_sub0_open()` call creates a subscriber socket. This socket may be used to receive messages, but is unable to send them. Attempts to send messages will result in `NNG_ENOTSUP`. diff --git a/docs/nng_tcp.adoc b/docs/nng_tcp.adoc new file mode 100644 index 00000000..a1d0cdab --- /dev/null +++ b/docs/nng_tcp.adoc @@ -0,0 +1,137 @@ +nng_tcp(7) +========== +:doctype: manpage +:manmanual: nng +:mansource: nng +:icons: font +:source-highlighter: pygments +:copyright: Copyright 2017 Garrett D'Amore <garrett@damore.org> \ + Copyright 2017 Capitar IT Group BV <info@capitar.com> \ + This software is supplied under the terms of the MIT License, a \ + copy of which should be located in the distribution where this \ + file was obtained (LICENSE.txt). A copy of the license may also \ + be found online at https://opensource.org/licenses/MIT. + +NAME +---- +nng_tcp - TCP/IP transport for nng + +SYNOPSIS +-------- + +[source,c] +---------- +#include <nng/transport/tcp/tcp.h> + +int nng_tcp_register(void); +---------- + +DESCRIPTION +----------- + +The _nng_tcp_ transport provides communication support between +_nng_ sockets across a TCP/IP network. Both IPv4 and IPv6 +are supported when the underlying platform also supports it. + +// We need to insert a reference to the nanomsg RFC. + +Registration +~~~~~~~~~~~~ + +The _tcp_ transport is generally built-in to the _nng_ core, so +no extra steps to use it should be necessary. + +URI Format +~~~~~~~~~~ + +This transport uses URIs using the scheme `tcp://`, followed by +an IP address or hostname, followed by a colon and finally a +TCP port number. For example, to contact port 80 on the localhost +either of the following URIs could be used: `tcp://127.0.0.1:80` or +`tcp://localhost:80`. + +When specifying IPv6 addresses, the address must be enclosed in +square brackets (`[]`) to avoid confusion with the final colon +separating the port. + +For example, the same port 80 on the IPv6 loopback address ('::1') would +be specified as `tcp://[::1]:80`. + +NOTE: When using symbolic names, the name is resolved when the +name is first used. _nng_ won't become aware of changes in the +name resolution until restart, +usually.footnote:[This is a bug and will likely be fixed in the future.] + +The special value of 0 (`INADDR_ANY`) can be used for a listener +to indicate that it should listen on all interfaces on the host. +A short-hand for this form is to either omit the address, or specify +the asterisk (`*`) character. For example, the following three +URIs are all equivalent, and could be used to listen to port 9999 +on the host: + + 1. `tcp://0.0.0.0:9999` + 2. `tcp://*:9999` + 3. `tcp://:9999` + +The entire URI must be less than `NNG_MAXADDRLEN` bytes long. + +Socket Address +~~~~~~~~~~~~~~ + +When using an `nng_sockaddr` structure, the actual structure is either +of type `nng_sockaddr_in` (for IPv4) or `nng_sockaddr_in6` (for IPv6). +These are `struct` types with the following definitions: + +[source,c] +-------- +#define NNG_AF_INET 3 <1> +#define NNG_AF_INET6 4 +#define NNG_MAXADDRLEN 128 + +typedef struct { + // ... <2> + uint16_t sa_family; // must be NNG_AF_INET + uint16_t sa_port; // TCP port number + uint32_t sa_addr; + // ... +} nng_sockaddr_in; + +typedef struct { + // ... <2> + uint16_t sa_family; // must be NNG_AF_INET6 + uint16_t sa_port; // TCP port number + uint8_t sa_addr[16]; + // ... +} nng_sockaddr_in6; +-------- +<1> The values of these macros may change, so applications +should avoid depending upon their values and instead use them symbolically. +<2> Other members may be present, but only those listed here +are suitable for application use. + +The `sa_family` member will have the value `NNG_AF_INET` or `NNG_AF_INET6`. +The `sa_port` and `sa_addr` are the TCP port number and address, both in +network byte order (most significant byte is first). + +Transport Options +~~~~~~~~~~~~~~~~~ + +The _tcp_ transport has no special +options.footnote:[Options for TCP keepalive, linger, and nodelay are planned.] + +AUTHORS +------- +link:mailto:garrett@damore.org[Garrett D'Amore] + +SEE ALSO +-------- +<<nng.adoc#,nng(7)>> + +COPYRIGHT +--------- + +Copyright 2017 mailto:garrett@damore.org[Garrett D'Amore] + +Copyright 2017 mailto:info@capitar.com[Capitar IT Group BV] + +This document is supplied under the terms of the +https://opensource.org/licenses/LICENSE.txt[MIT License]. diff --git a/perf/perf.c b/perf/perf.c index 9333d4f5..1d89fc61 100644 --- a/perf/perf.c +++ b/perf/perf.c @@ -22,6 +22,24 @@ // change without notice, and not part of the stable API or ABI. #include "core/nng_impl.h" +#if defined(NNG_ENABLE_PAIR1) +#include "protocol/pair1/pair.h" + +#elif defined(NNG_ENABLE_PAIR0) +#include "protocol/pair0/pair.h" + +#else + +static void die(const char *, ...); + +static int +nng_pair_open(nng_socket *rv) +{ + die("No pair protocol enabled in this build!"); + return (NNG_ENOTSUP); +} +#endif // NNG_ENABLE_PAIR + static void latency_client(const char *, int, int); static void latency_server(const char *, int, int); static void throughput_client(const char *, int, int); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a5a49052..4901789b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -74,30 +74,6 @@ set (NNG_SOURCES core/timer.h core/transport.c core/transport.h - - protocol/bus/bus.c - - protocol/pair/pair_v0.c - protocol/pair/pair_v1.c - - protocol/pipeline/pull.c - protocol/pipeline/push.c - - protocol/pubsub/pub.c - protocol/pubsub/sub.c - - protocol/reqrep/rep.c - protocol/reqrep/req.c - - protocol/survey/respond.c - protocol/survey/survey.c - - transport/inproc/inproc.c - - transport/ipc/ipc.c - - transport/tcp/tcp.c - ) if (NNG_PLATFORM_POSIX) @@ -146,14 +122,19 @@ endif() install(FILES transport/inproc/inproc.h DESTINATION include/nng/transport/inproc) -if (NNG_ENABLE_ZEROTIER) - set (NNG_SOURCES ${NNG_SOURCES} - transport/zerotier/zerotier.c - transport/zerotier/zerotier.h - ) - install(FILES transport/zerotier/zerotier.h - DESTINATION include/nng/transport/zerotier) -endif() + +add_subdirectory(protocol/bus0) +add_subdirectory(protocol/pair0) +add_subdirectory(protocol/pair1) +add_subdirectory(protocol/pipeline0) +add_subdirectory(protocol/pubsub0) +add_subdirectory(protocol/reqrep0) +add_subdirectory(protocol/survey0) + +add_subdirectory(transport/inproc) +add_subdirectory(transport/ipc) +add_subdirectory(transport/tcp) +add_subdirectory(transport/zerotier) include_directories(AFTER SYSTEM ${PROJECT_SOURCE_DIR}/src ${NNG_REQUIRED_INCLUDES}) @@ -216,3 +197,7 @@ install (TARGETS ${PROJECT_NAME} ${PROJECT_NAME}_static LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} ) + +# Promote settings to parent +set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} PARENT_SCOPE) +set(NNG_REQUIRED_LFLAGS ${NNG_REQUIRED_LFLAGS} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 3f1ffcb5..10bffaa4 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -297,7 +297,6 @@ nni_msgq_run_getq(nni_msgq *mq) static void nni_msgq_run_notify(nni_msgq *mq) { - nni_aio *aio; if (mq->mq_cb_fn != NULL) { int flags = 0; diff --git a/src/core/protocol.h b/src/core/protocol.h index 5db259f0..39bde059 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -147,21 +147,26 @@ extern int nni_proto_open(nng_socket *, const nni_proto *); // Protocol numbers are never more than 16 bits. Also, there will never be // a valid protocol numbered 0 (NNG_PROTO_NONE). #define NNI_PROTO(major, minor) (((major) *16) + (minor)) -enum nng_proto_enum { - NNI_PROTO_NONE = NNI_PROTO(0, 0), - NNI_PROTO_PAIR_V0 = NNI_PROTO(1, 0), - NNI_PROTO_PAIR_V1 = NNI_PROTO(1, 1), - NNI_PROTO_PUB_V0 = NNI_PROTO(2, 0), - NNI_PROTO_SUB_V0 = NNI_PROTO(2, 1), - NNI_PROTO_REQ_V0 = NNI_PROTO(3, 0), - NNI_PROTO_REP_V0 = NNI_PROTO(3, 1), - NNI_PROTO_PUSH_V0 = NNI_PROTO(5, 0), - NNI_PROTO_PULL_V0 = NNI_PROTO(5, 1), - NNI_PROTO_SURVEYOR_V0 = NNI_PROTO(6, 2), - NNI_PROTO_RESPONDENT_V0 = NNI_PROTO(6, 3), - NNI_PROTO_BUS_V0 = NNI_PROTO(7, 0), - NNI_PROTO_STAR_V0 = NNI_PROTO(100, 0), -}; + +// Protocol major numbers. This is here for documentation only, and +// to serve as a "registry" for managing new protocol numbers. Consider +// updating this table when adding new protocols. +// +// Protocol Maj Min Name Notes +// ------------------------------------------- +// NONE 0 0 reserved +// PAIRv0 1 0 pair +// PAIRv1 1 1 pair1 nng only, experimental +// PUBv0 2 0 pub +// SUBv0 2 1 sub +// REQv0 3 0 req +// REPv0 3 1 rep +// PUSHv0 5 0 push +// PULLv0 5 1 pull +// SURVEYORv0 6 2 surveyor minors 0 & 1 retired +// RESPONDENTv0 6 3 respondent +// BUSv0 7 0 bus +// STARv0 100 0 star mangos only, experimental extern int nni_proto_sys_init(void); extern void nni_proto_sys_fini(void); diff --git a/src/core/socket.c b/src/core/socket.c index c9b70ccb..4a84b639 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -102,7 +102,6 @@ static int nni_sock_getopt_fd(nni_sock *s, int flag, void *val, size_t *szp) { int rv; - uint32_t flags; nni_notifyfd *fd; nni_msgq * mq; nni_msgq_cb cb; diff --git a/src/core/transport.c b/src/core/transport.c index 6dcf4538..af9c93fb 100644 --- a/src/core/transport.c +++ b/src/core/transport.c @@ -10,6 +10,9 @@ #include "core/nng_impl.h" #include "transport/inproc/inproc.h" +#include "transport/ipc/ipc.h" +#include "transport/tcp/tcp.h" +#include "transport/zerotier/zerotier.h" #include <stdio.h> #include <string.h> @@ -51,6 +54,11 @@ nni_tran_register(const nni_tran *tran) nni_mtx_lock(&nni_tran_lk); // Check to see if the transport is already registered... NNI_LIST_FOREACH (&nni_tran_list, t) { + if (tran->tran_init == t->t_tran.tran_init) { + nni_mtx_unlock(&nni_tran_lk); + // Same transport, duplicate registration. + return (0); + } if (strcmp(tran->tran_scheme, t->t_tran.tran_scheme) == 0) { nni_mtx_unlock(&nni_tran_lk); return (NNG_ESTATE); @@ -129,20 +137,40 @@ nni_tran_chkopt(const char *name, const void *v, size_t sz) // nni_tran_sys_init initializes the entire transport subsystem, including // each individual transport. + +typedef int (*nni_tran_ctor)(void); + +static nni_tran_ctor nni_tran_ctors[] = { +#ifdef NNG_HAVE_INPROC + nng_inproc_register, +#endif +#ifdef NNG_HAVE_IPC + nng_ipc_register, +#endif +#ifdef NNG_HAVE_TCP + nng_tcp_register, +#endif +#ifdef NNI_HAVE_ZEROTIER + nng_zt_register, +#endif + NULL, +}; + int nni_tran_sys_init(void) { - int rv; + int i; nni_tran_inited = 1; NNI_LIST_INIT(&nni_tran_list, nni_transport, t_node); nni_mtx_init(&nni_tran_lk); - if (((rv = nng_inproc_register()) != 0) || - ((rv = nni_tran_register(&nni_ipc_tran)) != 0) || - ((rv = nni_tran_register(&nni_tcp_tran)) != 0)) { - nni_tran_sys_fini(); - return (rv); + for (i = 0; nni_tran_ctors[i] != NULL; i++) { + int rv; + if ((rv = (nni_tran_ctors[i])()) != 0) { + nni_tran_sys_fini(); + return (rv); + } } return (0); } @@ -337,33 +337,6 @@ enum nng_flag_enum { NNG_FLAG_NONBLOCK = 2, // Non-blocking operations. }; -// Builtin protocol socket constructors. -NNG_DECL int nng_bus0_open(nng_socket *); -NNG_DECL int nng_pair0_open(nng_socket *); -NNG_DECL int nng_pair1_open(nng_socket *); -NNG_DECL int nng_pub0_open(nng_socket *); -NNG_DECL int nng_sub0_open(nng_socket *); -NNG_DECL int nng_push0_open(nng_socket *); -NNG_DECL int nng_pull0_open(nng_socket *); -NNG_DECL int nng_req0_open(nng_socket *); -NNG_DECL int nng_rep0_open(nng_socket *); -NNG_DECL int nng_surveyor0_open(nng_socket *); -NNG_DECL int nng_respondent0_open(nng_socket *); - -// Default versions. These provide compile time defaults; note that -// the actual protocols are baked into the binary; this should avoid -// suprising. Choosing a new protocol should be done explicitly. -#define nng_bus_open nng_bus0_open -#define nng_pair_open nng_pair1_open -#define nng_pub_open nng_pub0_open -#define nng_sub_open nng_sub0_open -#define nng_push_open nng_push0_open -#define nng_pull_open nng_pull0_open -#define nng_req_open nng_req0_open -#define nng_rep_open nng_rep0_open -#define nng_surveyor_open nng_surveyor0_open -#define nng_respondent_open nng_respondent0_open - // Options. #define NNG_OPT_SOCKNAME "socket-name" #define NNG_OPT_DOMAIN "compat:domain" // legacy compat only @@ -385,15 +358,6 @@ NNG_DECL int nng_respondent0_open(nng_socket *); #define NNG_OPT_RECONNMINT "reconnect-time-min" #define NNG_OPT_RECONNMAXT "reconnect-time-max" -#define NNG_OPT_PAIR1_POLY "pair1:polyamorous" - -#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe" -#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe" - -#define NNG_OPT_REQ_RESENDTIME "req:resend-time" - -#define NNG_OPT_SURVEYOR_SURVEYTIME "surveyor:survey-time" - // XXX: TBD: priorities, socket names, ipv4only // Statistics. These are for informational purposes only, and subject diff --git a/src/nng_compat.c b/src/nng_compat.c index f8a7ceb0..482834a1 100644 --- a/src/nng_compat.c +++ b/src/nng_compat.c @@ -10,6 +10,16 @@ #include "nng_compat.h" #include "nng.h" +#include "protocol/bus0/bus.h" +#include "protocol/pair0/pair.h" +#include "protocol/pipeline0/pull.h" +#include "protocol/pipeline0/push.h" +#include "protocol/pubsub0/pub.h" +#include "protocol/pubsub0/sub.h" +#include "protocol/reqrep0/rep.h" +#include "protocol/reqrep0/req.h" +#include "protocol/survey0/respond.h" +#include "protocol/survey0/survey.h" #include <stdio.h> #include <string.h> @@ -92,17 +102,37 @@ static const struct { uint16_t p_id; int (*p_open)(nng_socket *); } nn_protocols[] = { - // clang-format off +// clang-format off +#ifdef NNG_HAVE_BUS0 { NN_BUS, nng_bus0_open }, +#endif +#ifdef NNG_HAVE_PAIR0 { NN_PAIR, nng_pair0_open }, +#endif +#ifdef NNG_HAVE_PUSH0 { NN_PUSH, nng_push0_open }, +#endif +#ifdef NNG_HAVE_PULL0 { NN_PULL, nng_pull0_open }, +#endif +#ifdef NNG_HAVE_PUB0 { NN_PUB, nng_pub0_open }, +#endif +#ifdef NNG_HAVE_SUB0 { NN_SUB, nng_sub0_open }, +#endif +#ifdef NNG_HAVE_REQ0 { NN_REQ, nng_req0_open }, +#endif +#ifdef NNG_HAVE_REP0 { NN_REP, nng_rep0_open }, +#endif +#ifdef NNG_HAVE_SURVEYOR0 { NN_SURVEYOR, nng_surveyor0_open }, +#endif +#ifdef NNG_HAVE_RESPONDENT0 { NN_RESPONDENT, nng_respondent0_open }, +#endif { 0, NULL }, // clang-format on }; @@ -115,7 +145,7 @@ nn_socket(int domain, int protocol) int i; if ((domain != AF_SP) && (domain != AF_SP_RAW)) { - nn_seterror(EAFNOSUPPORT); + errno = EAFNOSUPPORT; return (-1); } @@ -125,7 +155,7 @@ nn_socket(int domain, int protocol) } } if (nn_protocols[i].p_open == NULL) { - nn_seterror(ENOTSUP); + errno = ENOTSUP; return (-1); } diff --git a/src/protocol/bus0/CMakeLists.txt b/src/protocol/bus0/CMakeLists.txt new file mode 100644 index 00000000..5071054a --- /dev/null +++ b/src/protocol/bus0/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Bus protocol + +if (NNG_PROTO_BUS0) + set(BUS0_SOURCES protocol/bus0/bus.c protocol/bus0/bus.h) + install(FILES bus.h DESTINATION include/nng/protocol/bus0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${BUS0_SOURCES} PARENT_SCOPE) diff --git a/src/protocol/bus/bus.c b/src/protocol/bus0/bus.c index 6ec0066b..3e15000b 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus0/bus.c @@ -12,31 +12,36 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/bus0/bus.h" // Bus protocol. The BUS protocol, each peer sends a message to its peers. // However, bus protocols do not "forward" (absent a device). So in order // for each participant to receive the message, each sender must be connected // to every other node in the network (full mesh). -typedef struct bus_pipe bus_pipe; -typedef struct bus_sock bus_sock; +#ifndef NNI_PROTO_BUS_V0 +#define NNI_PROTO_BUS_V0 NNI_PROTO(7, 0) +#endif -static void bus_sock_getq(bus_sock *); -static void bus_sock_send(void *, nni_aio *); -static void bus_sock_recv(void *, nni_aio *); +typedef struct bus0_pipe bus0_pipe; +typedef struct bus0_sock bus0_sock; -static void bus_pipe_getq(bus_pipe *); -static void bus_pipe_send(bus_pipe *); -static void bus_pipe_recv(bus_pipe *); +static void bus0_sock_getq(bus0_sock *); +static void bus0_sock_send(void *, nni_aio *); +static void bus0_sock_recv(void *, nni_aio *); -static void bus_sock_getq_cb(void *); -static void bus_pipe_getq_cb(void *); -static void bus_pipe_send_cb(void *); -static void bus_pipe_recv_cb(void *); -static void bus_pipe_putq_cb(void *); +static void bus0_pipe_getq(bus0_pipe *); +static void bus0_pipe_send(bus0_pipe *); +static void bus0_pipe_recv(bus0_pipe *); -// A bus_sock is our per-socket protocol private structure. -struct bus_sock { +static void bus0_sock_getq_cb(void *); +static void bus0_pipe_getq_cb(void *); +static void bus0_pipe_send_cb(void *); +static void bus0_pipe_recv_cb(void *); +static void bus0_pipe_putq_cb(void *); + +// bus0_sock is our per-socket protocol private structure. +struct bus0_sock { int raw; nni_aio * aio_getq; nni_list pipes; @@ -45,10 +50,10 @@ struct bus_sock { nni_msgq *urq; }; -// A bus_pipe is our per-pipe protocol private structure. -struct bus_pipe { +// bus0_pipe is our per-pipe protocol private structure. +struct bus0_pipe { nni_pipe * npipe; - bus_sock * psock; + bus0_sock * psock; nni_msgq * sendq; nni_list_node node; nni_aio * aio_getq; @@ -59,9 +64,9 @@ struct bus_pipe { }; static void -bus_sock_fini(void *arg) +bus0_sock_fini(void *arg) { - bus_sock *s = arg; + bus0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -70,18 +75,18 @@ bus_sock_fini(void *arg) } static int -bus_sock_init(void **sp, nni_sock *nsock) +bus0_sock_init(void **sp, nni_sock *nsock) { - bus_sock *s; - int rv; + bus0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - NNI_LIST_INIT(&s->pipes, bus_pipe, node); + NNI_LIST_INIT(&s->pipes, bus0_pipe, node); nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, bus_sock_getq_cb, s)) != 0) { - bus_sock_fini(s); + if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) { + bus0_sock_fini(s); return (rv); } s->raw = 0; @@ -93,25 +98,25 @@ bus_sock_init(void **sp, nni_sock *nsock) } static void -bus_sock_open(void *arg) +bus0_sock_open(void *arg) { - bus_sock *s = arg; + bus0_sock *s = arg; - bus_sock_getq(s); + bus0_sock_getq(s); } static void -bus_sock_close(void *arg) +bus0_sock_close(void *arg) { - bus_sock *s = arg; + bus0_sock *s = arg; nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -bus_pipe_fini(void *arg) +bus0_pipe_fini(void *arg) { - bus_pipe *p = arg; + bus0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); @@ -123,10 +128,10 @@ bus_pipe_fini(void *arg) } static int -bus_pipe_init(void **pp, nni_pipe *npipe, void *s) +bus0_pipe_init(void **pp, nni_pipe *npipe, void *s) { - bus_pipe *p; - int rv; + bus0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); @@ -134,11 +139,11 @@ bus_pipe_init(void **pp, nni_pipe *npipe, void *s) NNI_LIST_NODE_INIT(&p->node); nni_mtx_init(&p->mtx); if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, bus_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, bus_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, bus_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, bus_pipe_putq_cb, p)) != 0)) { - bus_pipe_fini(p); + ((rv = nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) { + bus0_pipe_fini(p); return (rv); } @@ -149,26 +154,26 @@ bus_pipe_init(void **pp, nni_pipe *npipe, void *s) } static int -bus_pipe_start(void *arg) +bus0_pipe_start(void *arg) { - bus_pipe *p = arg; - bus_sock *s = p->psock; + bus0_pipe *p = arg; + bus0_sock *s = p->psock; nni_mtx_lock(&s->mtx); nni_list_append(&s->pipes, p); nni_mtx_unlock(&s->mtx); - bus_pipe_recv(p); - bus_pipe_getq(p); + bus0_pipe_recv(p); + bus0_pipe_getq(p); return (0); } static void -bus_pipe_stop(void *arg) +bus0_pipe_stop(void *arg) { - bus_pipe *p = arg; - bus_sock *s = p->psock; + bus0_pipe *p = arg; + bus0_sock *s = p->psock; nni_msgq_close(p->sendq); @@ -185,9 +190,9 @@ bus_pipe_stop(void *arg) } static void -bus_pipe_getq_cb(void *arg) +bus0_pipe_getq_cb(void *arg) { - bus_pipe *p = arg; + bus0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { // closed? @@ -201,9 +206,9 @@ bus_pipe_getq_cb(void *arg) } static void -bus_pipe_send_cb(void *arg) +bus0_pipe_send_cb(void *arg) { - bus_pipe *p = arg; + bus0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { // closed? @@ -213,15 +218,15 @@ bus_pipe_send_cb(void *arg) return; } - bus_pipe_getq(p); + bus0_pipe_getq(p); } static void -bus_pipe_recv_cb(void *arg) +bus0_pipe_recv_cb(void *arg) { - bus_pipe *p = arg; - bus_sock *s = p->psock; - nni_msg * msg; + bus0_pipe *p = arg; + bus0_sock *s = p->psock; + nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->npipe); @@ -243,9 +248,9 @@ bus_pipe_recv_cb(void *arg) } static void -bus_pipe_putq_cb(void *arg) +bus0_pipe_putq_cb(void *arg) { - bus_pipe *p = arg; + bus0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -255,18 +260,18 @@ bus_pipe_putq_cb(void *arg) } // Wait for another recv. - bus_pipe_recv(p); + bus0_pipe_recv(p); } static void -bus_sock_getq_cb(void *arg) +bus0_sock_getq_cb(void *arg) { - bus_sock *s = arg; - bus_pipe *p; - bus_pipe *lastp; - nni_msg * msg; - nni_msg * dup; - uint32_t sender; + bus0_sock *s = arg; + bus0_pipe *p; + bus0_pipe *lastp; + nni_msg * msg; + nni_msg * dup; + uint32_t sender; if (nni_aio_result(s->aio_getq) != 0) { return; @@ -307,95 +312,95 @@ bus_sock_getq_cb(void *arg) nni_msg_free(msg); } - bus_sock_getq(s); + bus0_sock_getq(s); } static void -bus_sock_getq(bus_sock *s) +bus0_sock_getq(bus0_sock *s) { nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -bus_pipe_getq(bus_pipe *p) +bus0_pipe_getq(bus0_pipe *p) { nni_msgq_aio_get(p->sendq, p->aio_getq); } static void -bus_pipe_recv(bus_pipe *p) +bus0_pipe_recv(bus0_pipe *p) { nni_pipe_recv(p->npipe, p->aio_recv); } static int -bus_sock_setopt_raw(void *arg, const void *buf, size_t sz) +bus0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - bus_sock *s = arg; + bus0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -bus_sock_getopt_raw(void *arg, void *buf, size_t *szp) +bus0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - bus_sock *s = arg; + bus0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -bus_sock_send(void *arg, nni_aio *aio) +bus0_sock_send(void *arg, nni_aio *aio) { - bus_sock *s = arg; + bus0_sock *s = arg; nni_msgq_aio_put(s->uwq, aio); } static void -bus_sock_recv(void *arg, nni_aio *aio) +bus0_sock_recv(void *arg, nni_aio *aio) { - bus_sock *s = arg; + bus0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } -static nni_proto_pipe_ops bus_pipe_ops = { - .pipe_init = bus_pipe_init, - .pipe_fini = bus_pipe_fini, - .pipe_start = bus_pipe_start, - .pipe_stop = bus_pipe_stop, +static nni_proto_pipe_ops bus0_pipe_ops = { + .pipe_init = bus0_pipe_init, + .pipe_fini = bus0_pipe_fini, + .pipe_start = bus0_pipe_start, + .pipe_stop = bus0_pipe_stop, }; -static nni_proto_sock_option bus_sock_options[] = { +static nni_proto_sock_option bus0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = bus_sock_getopt_raw, - .pso_setopt = bus_sock_setopt_raw, + .pso_getopt = bus0_sock_getopt_raw, + .pso_setopt = bus0_sock_setopt_raw, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops bus_sock_ops = { - .sock_init = bus_sock_init, - .sock_fini = bus_sock_fini, - .sock_open = bus_sock_open, - .sock_close = bus_sock_close, - .sock_send = bus_sock_send, - .sock_recv = bus_sock_recv, - .sock_options = bus_sock_options, +static nni_proto_sock_ops bus0_sock_ops = { + .sock_init = bus0_sock_init, + .sock_fini = bus0_sock_fini, + .sock_open = bus0_sock_open, + .sock_close = bus0_sock_close, + .sock_send = bus0_sock_send, + .sock_recv = bus0_sock_recv, + .sock_options = bus0_sock_options, }; -static nni_proto bus_proto = { +static nni_proto bus0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_BUS_V0, "bus" }, .proto_peer = { NNI_PROTO_BUS_V0, "bus" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &bus_sock_ops, - .proto_pipe_ops = &bus_pipe_ops, + .proto_sock_ops = &bus0_sock_ops, + .proto_pipe_ops = &bus0_pipe_ops, }; int nng_bus0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &bus_proto)); + return (nni_proto_open(sidp, &bus0_proto)); } diff --git a/src/protocol/bus0/bus.h b/src/protocol/bus0/bus.h new file mode 100644 index 00000000..0ef3d391 --- /dev/null +++ b/src/protocol/bus0/bus.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_BUS0_BUS_H +#define NNG_PROTOCOL_BUS0_BUS_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_bus0_open(nng_socket *); + +#ifndef nng_bus_open +#define nng_bus_open nng_bus0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_BUS0_BUS_H diff --git a/src/protocol/pair0/CMakeLists.txt b/src/protocol/pair0/CMakeLists.txt new file mode 100644 index 00000000..68e7ad34 --- /dev/null +++ b/src/protocol/pair0/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# PAIRv0 protocol + +if (NNG_PROTO_PAIR0) + set(PAIR0_SOURCES protocol/pair0/pair.c protocol/pair0/pair.h) + install(FILES pair.h DESTINATION include/nng/protocol/pair0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PAIR0_SOURCES} PARENT_SCOPE) diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair0/pair.c index 93cd1497..bac405b8 100644 --- a/src/protocol/pair/pair_v0.c +++ b/src/protocol/pair0/pair.c @@ -17,6 +17,10 @@ // While a peer is connected to the server, all other peer connection // attempts are discarded. +#ifndef NNI_PROTO_PAIR_V0 +#define NNI_PROTO_PAIR_V0 NNI_PROTO(1, 0) +#endif + typedef struct pair0_pipe pair0_pipe; typedef struct pair0_sock pair0_sock; diff --git a/src/protocol/pair0/pair.h b/src/protocol/pair0/pair.h new file mode 100644 index 00000000..6828c921 --- /dev/null +++ b/src/protocol/pair0/pair.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PAIR0_PAIR_H +#define NNG_PROTOCOL_PAIR0_PAIR_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pair0_open(nng_socket *); + +#ifndef nng_pair_open +#define nng_pair_open nng_pair0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PAIR0_PAIR_H diff --git a/src/protocol/pair1/CMakeLists.txt b/src/protocol/pair1/CMakeLists.txt new file mode 100644 index 00000000..f35d6959 --- /dev/null +++ b/src/protocol/pair1/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# PAIRv1 protocol + +if (NNG_PROTO_PAIR1) + set(PAIR1_SOURCES protocol/pair1/pair.c protocol/pair1/pair.h) + install(FILES pair.h DESTINATION include/nng/protocol/pair1) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PAIR1_SOURCES} PARENT_SCOPE) diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair1/pair.c index e14d06d5..3f6f63fc 100644 --- a/src/protocol/pair/pair_v1.c +++ b/src/protocol/pair1/pair.c @@ -13,10 +13,16 @@ #include "core/nng_impl.h" +#include "protocol/pair1/pair.h" + // Pair protocol. The PAIRv1 protocol is a simple 1:1 messaging pattern, // usually, but it can support a polyamorous mode where a single server can // communicate with multiple partners. +#ifndef NNI_PROTO_PAIR_V1 +#define NNI_PROTO_PAIR_V1 NNI_PROTO(1, 1) +#endif + typedef struct pair1_pipe pair1_pipe; typedef struct pair1_sock pair1_sock; diff --git a/src/protocol/pair1/pair.h b/src/protocol/pair1/pair.h new file mode 100644 index 00000000..bc519d9f --- /dev/null +++ b/src/protocol/pair1/pair.h @@ -0,0 +1,30 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PAIR1_PAIR_H +#define NNG_PROTOCOL_PAIR1_PAIR_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pair1_open(nng_socket *); + +#ifndef nng_pair_open +#define nng_pair_open nng_pair1_open +#endif + +#define NNG_OPT_PAIR1_POLY "pair1:polyamorous" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PAIR1_PAIR_H diff --git a/src/protocol/pipeline0/CMakeLists.txt b/src/protocol/pipeline0/CMakeLists.txt new file mode 100644 index 00000000..6153c5a7 --- /dev/null +++ b/src/protocol/pipeline0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Pub/Sub protocol + +if (NNG_PROTO_PUSH0) + set(PUSH0_SOURCES protocol/pipeline0/push.c protocol/pipeline0/push.h) + install(FILES push.h DESTINATION include/nng/protocol/pipeline0) +endif() + +if (NNG_PROTO_PULL0) + set(PULL0_SOURCES protocol/pipeline0/pull.c protocol/pipeline0/pull.h) + install(FILES pull.h DESTINATION include/nng/protocol/pipeline0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PUSH0_SOURCES} ${PULL0_SOURCES} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline0/pull.c index 9685f0a1..8c16cb17 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline0/pull.c @@ -15,31 +15,39 @@ // Pull protocol. The PULL protocol is the "read" side of a pipeline. -typedef struct pull_pipe pull_pipe; -typedef struct pull_sock pull_sock; +#ifndef NNI_PROTO_PULL_V0 +#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1) +#endif -static void pull_putq_cb(void *); -static void pull_recv_cb(void *); -static void pull_putq(pull_pipe *, nni_msg *); +#ifndef NNI_PROTO_PUSH_V0 +#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0) +#endif -// A pull_sock is our per-socket protocol private structure. -struct pull_sock { +typedef struct pull0_pipe pull0_pipe; +typedef struct pull0_sock pull0_sock; + +static void pull0_putq_cb(void *); +static void pull0_recv_cb(void *); +static void pull0_putq(pull0_pipe *, nni_msg *); + +// pull0_sock is our per-socket protocol private structure. +struct pull0_sock { nni_msgq *urq; int raw; }; -// A pull_pipe is our per-pipe protocol private structure. -struct pull_pipe { - nni_pipe * pipe; - pull_sock *pull; - nni_aio * putq_aio; - nni_aio * recv_aio; +// pull0_pipe is our per-pipe protocol private structure. +struct pull0_pipe { + nni_pipe * pipe; + pull0_sock *pull; + nni_aio * putq_aio; + nni_aio * recv_aio; }; static int -pull_sock_init(void **sp, nni_sock *sock) +pull0_sock_init(void **sp, nni_sock *sock) { - pull_sock *s; + pull0_sock *s; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); @@ -52,17 +60,17 @@ pull_sock_init(void **sp, nni_sock *sock) } static void -pull_sock_fini(void *arg) +pull0_sock_fini(void *arg) { - pull_sock *s = arg; + pull0_sock *s = arg; NNI_FREE_STRUCT(s); } static void -pull_pipe_fini(void *arg) +pull0_pipe_fini(void *arg) { - pull_pipe *p = arg; + pull0_pipe *p = arg; nni_aio_fini(p->putq_aio); nni_aio_fini(p->recv_aio); @@ -70,17 +78,17 @@ pull_pipe_fini(void *arg) } static int -pull_pipe_init(void **pp, nni_pipe *pipe, void *s) +pull0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - pull_pipe *p; - int rv; + pull0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&p->putq_aio, pull_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->recv_aio, pull_recv_cb, p)) != 0)) { - pull_pipe_fini(p); + if (((rv = nni_aio_init(&p->putq_aio, pull0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->recv_aio, pull0_recv_cb, p)) != 0)) { + pull0_pipe_fini(p); return (rv); } @@ -91,9 +99,9 @@ pull_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -pull_pipe_start(void *arg) +pull0_pipe_start(void *arg) { - pull_pipe *p = arg; + pull0_pipe *p = arg; // Start the pending pull... nni_pipe_recv(p->pipe, p->recv_aio); @@ -102,20 +110,20 @@ pull_pipe_start(void *arg) } static void -pull_pipe_stop(void *arg) +pull0_pipe_stop(void *arg) { - pull_pipe *p = arg; + pull0_pipe *p = arg; nni_aio_stop(p->putq_aio); nni_aio_stop(p->recv_aio); } static void -pull_recv_cb(void *arg) +pull0_recv_cb(void *arg) { - pull_pipe *p = arg; - nni_aio * aio = p->recv_aio; - nni_msg * msg; + pull0_pipe *p = arg; + nni_aio * aio = p->recv_aio; + nni_msg * msg; if (nni_aio_result(aio) != 0) { // Failed to get a message, probably the pipe is closed. @@ -127,14 +135,14 @@ pull_recv_cb(void *arg) msg = nni_aio_get_msg(aio); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); nni_aio_set_msg(aio, NULL); - pull_putq(p, msg); + pull0_putq(p, msg); } static void -pull_putq_cb(void *arg) +pull0_putq_cb(void *arg) { - pull_pipe *p = arg; - nni_aio * aio = p->putq_aio; + pull0_pipe *p = arg; + nni_aio * aio = p->putq_aio; if (nni_aio_result(aio) != 0) { // If we failed to put, probably NNG_ECLOSED, nothing else @@ -148,11 +156,11 @@ pull_putq_cb(void *arg) nni_pipe_recv(p->pipe, p->recv_aio); } -// nni_pull_putq schedules a put operation to the user socket (sendup). +// pull0_putq schedules a put operation to the user socket (sendup). static void -pull_putq(pull_pipe *p, nni_msg *msg) +pull0_putq(pull0_pipe *p, nni_msg *msg) { - pull_sock *s = p->pull; + pull0_sock *s = p->pull; nni_aio_set_msg(p->putq_aio, msg); @@ -160,83 +168,83 @@ pull_putq(pull_pipe *p, nni_msg *msg) } static void -pull_sock_open(void *arg) +pull0_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void -pull_sock_close(void *arg) +pull0_sock_close(void *arg) { NNI_ARG_UNUSED(arg); } static int -pull_sock_setopt_raw(void *arg, const void *buf, size_t sz) +pull0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - pull_sock *s = arg; + pull0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -pull_sock_getopt_raw(void *arg, void *buf, size_t *szp) +pull0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - pull_sock *s = arg; + pull0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -pull_sock_send(void *arg, nni_aio *aio) +pull0_sock_send(void *arg, nni_aio *aio) { nni_aio_finish_error(aio, NNG_ENOTSUP); } static void -pull_sock_recv(void *arg, nni_aio *aio) +pull0_sock_recv(void *arg, nni_aio *aio) { - pull_sock *s = arg; + pull0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } -static nni_proto_pipe_ops pull_pipe_ops = { - .pipe_init = pull_pipe_init, - .pipe_fini = pull_pipe_fini, - .pipe_start = pull_pipe_start, - .pipe_stop = pull_pipe_stop, +static nni_proto_pipe_ops pull0_pipe_ops = { + .pipe_init = pull0_pipe_init, + .pipe_fini = pull0_pipe_fini, + .pipe_start = pull0_pipe_start, + .pipe_stop = pull0_pipe_stop, }; -static nni_proto_sock_option pull_sock_options[] = { +static nni_proto_sock_option pull0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = pull_sock_getopt_raw, - .pso_setopt = pull_sock_setopt_raw, + .pso_getopt = pull0_sock_getopt_raw, + .pso_setopt = pull0_sock_setopt_raw, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops pull_sock_ops = { - .sock_init = pull_sock_init, - .sock_fini = pull_sock_fini, - .sock_open = pull_sock_open, - .sock_close = pull_sock_close, - .sock_send = pull_sock_send, - .sock_recv = pull_sock_recv, - .sock_options = pull_sock_options, +static nni_proto_sock_ops pull0_sock_ops = { + .sock_init = pull0_sock_init, + .sock_fini = pull0_sock_fini, + .sock_open = pull0_sock_open, + .sock_close = pull0_sock_close, + .sock_send = pull0_sock_send, + .sock_recv = pull0_sock_recv, + .sock_options = pull0_sock_options, }; -static nni_proto pull_proto = { +static nni_proto pull0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_PULL_V0, "pull" }, .proto_peer = { NNI_PROTO_PUSH_V0, "push" }, .proto_flags = NNI_PROTO_FLAG_RCV, - .proto_pipe_ops = &pull_pipe_ops, - .proto_sock_ops = &pull_sock_ops, + .proto_pipe_ops = &pull0_pipe_ops, + .proto_sock_ops = &pull0_sock_ops, }; int nng_pull0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &pull_proto)); + return (nni_proto_open(sidp, &pull0_proto)); } diff --git a/src/protocol/pipeline0/pull.h b/src/protocol/pipeline0/pull.h new file mode 100644 index 00000000..75bded03 --- /dev/null +++ b/src/protocol/pipeline0/pull.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PIPELINE0_PULL_H +#define NNG_PROTOCOL_PIPELINE0_PULL_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pull0_open(nng_socket *); + +#ifndef nng_pull_open +#define nng_pull_open nng_pull0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PIPELINE0_PULL_H diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline0/push.c index 9ff74558..3dd83fe0 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline0/push.c @@ -17,23 +17,31 @@ // Push distributes fairly, or tries to, by giving messages in round-robin // order. -typedef struct push_pipe push_pipe; -typedef struct push_sock push_sock; +#ifndef NNI_PROTO_PULL_V0 +#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1) +#endif -static void push_send_cb(void *); -static void push_recv_cb(void *); -static void push_getq_cb(void *); +#ifndef NNI_PROTO_PUSH_V0 +#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0) +#endif -// An nni_push_sock is our per-socket protocol private structure. -struct push_sock { +typedef struct push0_pipe push0_pipe; +typedef struct push0_sock push0_sock; + +static void push0_send_cb(void *); +static void push0_recv_cb(void *); +static void push0_getq_cb(void *); + +// push0_sock is our per-socket protocol private structure. +struct push0_sock { nni_msgq *uwq; int raw; }; -// An nni_push_pipe is our per-pipe protocol private structure. -struct push_pipe { +// push0_pipe is our per-pipe protocol private structure. +struct push0_pipe { nni_pipe * pipe; - push_sock * push; + push0_sock * push; nni_list_node node; nni_aio *aio_recv; @@ -42,9 +50,9 @@ struct push_pipe { }; static int -push_sock_init(void **sp, nni_sock *sock) +push0_sock_init(void **sp, nni_sock *sock) { - push_sock *s; + push0_sock *s; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); @@ -56,29 +64,29 @@ push_sock_init(void **sp, nni_sock *sock) } static void -push_sock_fini(void *arg) +push0_sock_fini(void *arg) { - push_sock *s = arg; + push0_sock *s = arg; NNI_FREE_STRUCT(s); } static void -push_sock_open(void *arg) +push0_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void -push_sock_close(void *arg) +push0_sock_close(void *arg) { NNI_ARG_UNUSED(arg); } static void -push_pipe_fini(void *arg) +push0_pipe_fini(void *arg) { - push_pipe *p = arg; + push0_pipe *p = arg; nni_aio_fini(p->aio_recv); nni_aio_fini(p->aio_send); @@ -87,18 +95,18 @@ push_pipe_fini(void *arg) } static int -push_pipe_init(void **pp, nni_pipe *pipe, void *s) +push0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - push_pipe *p; - int rv; + push0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&p->aio_recv, push_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, push_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, push_getq_cb, p)) != 0)) { - push_pipe_fini(p); + if (((rv = nni_aio_init(&p->aio_recv, push0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, push0_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, push0_getq_cb, p)) != 0)) { + push0_pipe_fini(p); return (rv); } NNI_LIST_NODE_INIT(&p->node); @@ -109,10 +117,10 @@ push_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -push_pipe_start(void *arg) +push0_pipe_start(void *arg) { - push_pipe *p = arg; - push_sock *s = p->push; + push0_pipe *p = arg; + push0_sock *s = p->push; if (nni_pipe_peer(p->pipe) != NNI_PROTO_PULL_V0) { return (NNG_EPROTO); @@ -129,9 +137,9 @@ push_pipe_start(void *arg) } static void -push_pipe_stop(void *arg) +push0_pipe_stop(void *arg) { - push_pipe *p = arg; + push0_pipe *p = arg; nni_aio_stop(p->aio_recv); nni_aio_stop(p->aio_send); @@ -139,9 +147,9 @@ push_pipe_stop(void *arg) } static void -push_recv_cb(void *arg) +push0_recv_cb(void *arg) { - push_pipe *p = arg; + push0_pipe *p = arg; // We normally expect to receive an error. If a pipe actually // sends us data, we just discard it. @@ -155,10 +163,10 @@ push_recv_cb(void *arg) } static void -push_send_cb(void *arg) +push0_send_cb(void *arg) { - push_pipe *p = arg; - push_sock *s = p->push; + push0_pipe *p = arg; + push0_sock *s = p->push; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -171,10 +179,10 @@ push_send_cb(void *arg) } static void -push_getq_cb(void *arg) +push0_getq_cb(void *arg) { - push_pipe *p = arg; - nni_aio * aio = p->aio_getq; + push0_pipe *p = arg; + nni_aio * aio = p->aio_getq; if (nni_aio_result(aio) != 0) { // If the socket is closing, nothing else we can do. @@ -189,71 +197,71 @@ push_getq_cb(void *arg) } static int -push_sock_setopt_raw(void *arg, const void *buf, size_t sz) +push0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - push_sock *s = arg; + push0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -push_sock_getopt_raw(void *arg, void *buf, size_t *szp) +push0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - push_sock *s = arg; + push0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -push_sock_send(void *arg, nni_aio *aio) +push0_sock_send(void *arg, nni_aio *aio) { - push_sock *s = arg; + push0_sock *s = arg; nni_msgq_aio_put(s->uwq, aio); } static void -push_sock_recv(void *arg, nni_aio *aio) +push0_sock_recv(void *arg, nni_aio *aio) { nni_aio_finish_error(aio, NNG_ENOTSUP); } -static nni_proto_pipe_ops push_pipe_ops = { - .pipe_init = push_pipe_init, - .pipe_fini = push_pipe_fini, - .pipe_start = push_pipe_start, - .pipe_stop = push_pipe_stop, +static nni_proto_pipe_ops push0_pipe_ops = { + .pipe_init = push0_pipe_init, + .pipe_fini = push0_pipe_fini, + .pipe_start = push0_pipe_start, + .pipe_stop = push0_pipe_stop, }; -static nni_proto_sock_option push_sock_options[] = { +static nni_proto_sock_option push0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = push_sock_getopt_raw, - .pso_setopt = push_sock_setopt_raw, + .pso_getopt = push0_sock_getopt_raw, + .pso_setopt = push0_sock_setopt_raw, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops push_sock_ops = { - .sock_init = push_sock_init, - .sock_fini = push_sock_fini, - .sock_open = push_sock_open, - .sock_close = push_sock_close, - .sock_options = push_sock_options, - .sock_send = push_sock_send, - .sock_recv = push_sock_recv, +static nni_proto_sock_ops push0_sock_ops = { + .sock_init = push0_sock_init, + .sock_fini = push0_sock_fini, + .sock_open = push0_sock_open, + .sock_close = push0_sock_close, + .sock_options = push0_sock_options, + .sock_send = push0_sock_send, + .sock_recv = push0_sock_recv, }; -static nni_proto push_proto = { +static nni_proto push0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_PUSH_V0, "push" }, .proto_peer = { NNI_PROTO_PULL_V0, "pull" }, .proto_flags = NNI_PROTO_FLAG_SND, - .proto_pipe_ops = &push_pipe_ops, - .proto_sock_ops = &push_sock_ops, + .proto_pipe_ops = &push0_pipe_ops, + .proto_sock_ops = &push0_sock_ops, }; int nng_push0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &push_proto)); + return (nni_proto_open(sidp, &push0_proto)); } diff --git a/src/protocol/pipeline0/push.h b/src/protocol/pipeline0/push.h new file mode 100644 index 00000000..c7303b92 --- /dev/null +++ b/src/protocol/pipeline0/push.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PIPELINE0_PUSH_H +#define NNG_PROTOCOL_PIPELINE0_PUSH_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_push0_open(nng_socket *); + +#ifndef nng_push_open +#define nng_push_open nng_push0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PIPELINE0_PUSH_H diff --git a/src/protocol/pubsub0/CMakeLists.txt b/src/protocol/pubsub0/CMakeLists.txt new file mode 100644 index 00000000..4edcbfae --- /dev/null +++ b/src/protocol/pubsub0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Pub/Sub protocol + +if (NNG_PROTO_PUB0) + set(PUB0_SOURCES protocol/pubsub0/pub.c protocol/pubsub0/pub.h) + install(FILES pub.h DESTINATION include/nng/protocol/pubsub0) +endif() + +if (NNG_PROTO_SUB0) + set(SUB0_SOURCES protocol/pubsub0/sub.c protocol/pubsub0/sub.h) + install(FILES sub.h DESTINATION include/nng/protocol/pubsub0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${PUB0_SOURCES} ${SUB0_SOURCES} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub0/pub.c index 9e5cd67f..f4a33b77 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -12,24 +12,33 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/pubsub0/pub.h" // Publish protocol. The PUB protocol simply sends messages out, as // a broadcast. It has nothing more sophisticated because it does not // perform sender-side filtering. Its best effort delivery, so anything // that can't receive the message won't get one. -typedef struct pub_pipe pub_pipe; -typedef struct pub_sock pub_sock; +#ifndef NNI_PROTO_SUB_V0 +#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1) +#endif -static void pub_pipe_recv_cb(void *); -static void pub_pipe_send_cb(void *); -static void pub_pipe_getq_cb(void *); -static void pub_sock_getq_cb(void *); -static void pub_sock_fini(void *); -static void pub_pipe_fini(void *); +#ifndef NNI_PROTO_PUB_V0 +#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0) +#endif -// A pub_sock is our per-socket protocol private structure. -struct pub_sock { +typedef struct pub0_pipe pub0_pipe; +typedef struct pub0_sock pub0_sock; + +static void pub0_pipe_recv_cb(void *); +static void pub0_pipe_send_cb(void *); +static void pub0_pipe_getq_cb(void *); +static void pub0_sock_getq_cb(void *); +static void pub0_sock_fini(void *); +static void pub0_pipe_fini(void *); + +// pub0_sock is our per-socket protocol private structure. +struct pub0_sock { nni_msgq *uwq; int raw; nni_aio * aio_getq; @@ -37,10 +46,10 @@ struct pub_sock { nni_mtx mtx; }; -// A pub_pipe is our per-pipe protocol private structure. -struct pub_pipe { +// pub0_pipe is our per-pipe protocol private structure. +struct pub0_pipe { nni_pipe * pipe; - pub_sock * pub; + pub0_sock * pub; nni_msgq * sendq; nni_aio * aio_getq; nni_aio * aio_send; @@ -49,9 +58,9 @@ struct pub_pipe { }; static void -pub_sock_fini(void *arg) +pub0_sock_fini(void *arg) { - pub_sock *s = arg; + pub0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -60,22 +69,22 @@ pub_sock_fini(void *arg) } static int -pub_sock_init(void **sp, nni_sock *sock) +pub0_sock_init(void **sp, nni_sock *sock) { - pub_sock *s; - int rv; + pub0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->mtx); - if ((rv = nni_aio_init(&s->aio_getq, pub_sock_getq_cb, s)) != 0) { - pub_sock_fini(s); + if ((rv = nni_aio_init(&s->aio_getq, pub0_sock_getq_cb, s)) != 0) { + pub0_sock_fini(s); return (rv); } s->raw = 0; - NNI_LIST_INIT(&s->pipes, pub_pipe, node); + NNI_LIST_INIT(&s->pipes, pub0_pipe, node); s->uwq = nni_sock_sendq(sock); @@ -84,25 +93,25 @@ pub_sock_init(void **sp, nni_sock *sock) } static void -pub_sock_open(void *arg) +pub0_sock_open(void *arg) { - pub_sock *s = arg; + pub0_sock *s = arg; nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -pub_sock_close(void *arg) +pub0_sock_close(void *arg) { - pub_sock *s = arg; + pub0_sock *s = arg; nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -pub_pipe_fini(void *arg) +pub0_pipe_fini(void *arg) { - pub_pipe *p = arg; + pub0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); nni_aio_fini(p->aio_recv); @@ -111,10 +120,10 @@ pub_pipe_fini(void *arg) } static int -pub_pipe_init(void **pp, nni_pipe *pipe, void *s) +pub0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - pub_pipe *p; - int rv; + pub0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); @@ -122,11 +131,11 @@ pub_pipe_init(void **pp, nni_pipe *pipe, void *s) // XXX: consider making this depth tunable if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, pub_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, pub_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, pub_pipe_recv_cb, p)) != 0)) { + ((rv = nni_aio_init(&p->aio_getq, pub0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) { - pub_pipe_fini(p); + pub0_pipe_fini(p); return (rv); } @@ -137,10 +146,10 @@ pub_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -pub_pipe_start(void *arg) +pub0_pipe_start(void *arg) { - pub_pipe *p = arg; - pub_sock *s = p->pub; + pub0_pipe *p = arg; + pub0_sock *s = p->pub; if (nni_pipe_peer(p->pipe) != NNI_PROTO_SUB_V0) { return (NNG_EPROTO); @@ -157,10 +166,10 @@ pub_pipe_start(void *arg) } static void -pub_pipe_stop(void *arg) +pub0_pipe_stop(void *arg) { - pub_pipe *p = arg; - pub_sock *s = p->pub; + pub0_pipe *p = arg; + pub0_sock *s = p->pub; nni_aio_stop(p->aio_getq); nni_aio_stop(p->aio_send); @@ -176,15 +185,15 @@ pub_pipe_stop(void *arg) } static void -pub_sock_getq_cb(void *arg) +pub0_sock_getq_cb(void *arg) { - pub_sock *s = arg; - nni_msgq *uwq = s->uwq; - nni_msg * msg, *dup; + pub0_sock *s = arg; + nni_msgq * uwq = s->uwq; + nni_msg * msg, *dup; - pub_pipe *p; - pub_pipe *last; - int rv; + pub0_pipe *p; + pub0_pipe *last; + int rv; if (nni_aio_result(s->aio_getq) != 0) { return; @@ -218,9 +227,9 @@ pub_sock_getq_cb(void *arg) } static void -pub_pipe_recv_cb(void *arg) +pub0_pipe_recv_cb(void *arg) { - pub_pipe *p = arg; + pub0_pipe *p = arg; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); @@ -233,9 +242,9 @@ pub_pipe_recv_cb(void *arg) } static void -pub_pipe_getq_cb(void *arg) +pub0_pipe_getq_cb(void *arg) { - pub_pipe *p = arg; + pub0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->pipe); @@ -249,9 +258,9 @@ pub_pipe_getq_cb(void *arg) } static void -pub_pipe_send_cb(void *arg) +pub0_pipe_send_cb(void *arg) { - pub_pipe *p = arg; + pub0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -265,71 +274,71 @@ pub_pipe_send_cb(void *arg) } static int -pub_sock_setopt_raw(void *arg, const void *buf, size_t sz) +pub0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - pub_sock *s = arg; + pub0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -pub_sock_getopt_raw(void *arg, void *buf, size_t *szp) +pub0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - pub_sock *s = arg; + pub0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -pub_sock_recv(void *arg, nni_aio *aio) +pub0_sock_recv(void *arg, nni_aio *aio) { nni_aio_finish_error(aio, NNG_ENOTSUP); } static void -pub_sock_send(void *arg, nni_aio *aio) +pub0_sock_send(void *arg, nni_aio *aio) { - pub_sock *s = arg; + pub0_sock *s = arg; nni_msgq_aio_put(s->uwq, aio); } -static nni_proto_pipe_ops pub_pipe_ops = { - .pipe_init = pub_pipe_init, - .pipe_fini = pub_pipe_fini, - .pipe_start = pub_pipe_start, - .pipe_stop = pub_pipe_stop, +static nni_proto_pipe_ops pub0_pipe_ops = { + .pipe_init = pub0_pipe_init, + .pipe_fini = pub0_pipe_fini, + .pipe_start = pub0_pipe_start, + .pipe_stop = pub0_pipe_stop, }; -static nni_proto_sock_option pub_sock_options[] = { +static nni_proto_sock_option pub0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = pub_sock_getopt_raw, - .pso_setopt = pub_sock_setopt_raw, + .pso_getopt = pub0_sock_getopt_raw, + .pso_setopt = pub0_sock_setopt_raw, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops pub_sock_ops = { - .sock_init = pub_sock_init, - .sock_fini = pub_sock_fini, - .sock_open = pub_sock_open, - .sock_close = pub_sock_close, - .sock_send = pub_sock_send, - .sock_recv = pub_sock_recv, - .sock_options = pub_sock_options, +static nni_proto_sock_ops pub0_sock_ops = { + .sock_init = pub0_sock_init, + .sock_fini = pub0_sock_fini, + .sock_open = pub0_sock_open, + .sock_close = pub0_sock_close, + .sock_send = pub0_sock_send, + .sock_recv = pub0_sock_recv, + .sock_options = pub0_sock_options, }; -static nni_proto pub_proto = { +static nni_proto pub0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_PUB_V0, "pub" }, .proto_peer = { NNI_PROTO_SUB_V0, "sub" }, .proto_flags = NNI_PROTO_FLAG_SND, - .proto_sock_ops = &pub_sock_ops, - .proto_pipe_ops = &pub_pipe_ops, + .proto_sock_ops = &pub0_sock_ops, + .proto_pipe_ops = &pub0_pipe_ops, }; int nng_pub0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &pub_proto)); + return (nni_proto_open(sidp, &pub0_proto)); } diff --git a/src/protocol/pubsub0/pub.h b/src/protocol/pubsub0/pub.h new file mode 100644 index 00000000..2388a292 --- /dev/null +++ b/src/protocol/pubsub0/pub.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PUBSUB0_PUB_H +#define NNG_PROTOCOL_PUBSUB0_PUB_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_pub0_open(nng_socket *); + +#ifndef nng_pub_open +#define nng_pub_open nng_pub0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PUBSUB0_PUB_H diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub0/sub.c index 555d528e..6c504d75 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -12,54 +12,60 @@ #include <string.h> #include "core/nng_impl.h" - -const char *nng_opt_sub_subscribe = NNG_OPT_SUB_SUBSCRIBE; -const char *nng_opt_sub_unsubscribe = NNG_OPT_SUB_UNSUBSCRIBE; +#include "protocol/pubsub0/sub.h" // Subscriber protocol. The SUB protocol receives messages sent to // it from publishers, and filters out those it is not interested in, // only passing up ones that match known subscriptions. -typedef struct sub_pipe sub_pipe; -typedef struct sub_sock sub_sock; -typedef struct sub_topic sub_topic; +#ifndef NNI_PROTO_SUB_V0 +#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1) +#endif + +#ifndef NNI_PROTO_PUB_V0 +#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0) +#endif + +typedef struct sub0_pipe sub0_pipe; +typedef struct sub0_sock sub0_sock; +typedef struct sub0_topic sub0_topic; -static void sub_recv_cb(void *); -static void sub_putq_cb(void *); -static void sub_pipe_fini(void *); +static void sub0_recv_cb(void *); +static void sub0_putq_cb(void *); +static void sub0_pipe_fini(void *); -struct sub_topic { +struct sub0_topic { nni_list_node node; size_t len; void * buf; }; -// An nni_rep_sock is our per-socket protocol private structure. -struct sub_sock { +// sub0_sock is our per-socket protocol private structure. +struct sub0_sock { nni_list topics; nni_msgq *urq; int raw; nni_mtx lk; }; -// An nni_rep_pipe is our per-pipe protocol private structure. -struct sub_pipe { - nni_pipe *pipe; - sub_sock *sub; - nni_aio * aio_recv; - nni_aio * aio_putq; +// sub0_pipe is our per-pipe protocol private structure. +struct sub0_pipe { + nni_pipe * pipe; + sub0_sock *sub; + nni_aio * aio_recv; + nni_aio * aio_putq; }; static int -sub_sock_init(void **sp, nni_sock *sock) +sub0_sock_init(void **sp, nni_sock *sock) { - sub_sock *s; + sub0_sock *s; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->lk); - NNI_LIST_INIT(&s->topics, sub_topic, node); + NNI_LIST_INIT(&s->topics, sub0_topic, node); s->raw = 0; s->urq = nni_sock_recvq(sock); @@ -68,10 +74,10 @@ sub_sock_init(void **sp, nni_sock *sock) } static void -sub_sock_fini(void *arg) +sub0_sock_fini(void *arg) { - sub_sock * s = arg; - sub_topic *topic; + sub0_sock * s = arg; + sub0_topic *topic; while ((topic = nni_list_first(&s->topics)) != NULL) { nni_list_remove(&s->topics, topic); @@ -83,21 +89,21 @@ sub_sock_fini(void *arg) } static void -sub_sock_open(void *arg) +sub0_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void -sub_sock_close(void *arg) +sub0_sock_close(void *arg) { NNI_ARG_UNUSED(arg); } static void -sub_pipe_fini(void *arg) +sub0_pipe_fini(void *arg) { - sub_pipe *p = arg; + sub0_pipe *p = arg; nni_aio_fini(p->aio_putq); nni_aio_fini(p->aio_recv); @@ -105,17 +111,17 @@ sub_pipe_fini(void *arg) } static int -sub_pipe_init(void **pp, nni_pipe *pipe, void *s) +sub0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - sub_pipe *p; - int rv; + sub0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_aio_init(&p->aio_putq, sub_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, sub_recv_cb, p)) != 0)) { - sub_pipe_fini(p); + if (((rv = nni_aio_init(&p->aio_putq, sub0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0)) { + sub0_pipe_fini(p); return (rv); } @@ -126,30 +132,30 @@ sub_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -sub_pipe_start(void *arg) +sub0_pipe_start(void *arg) { - sub_pipe *p = arg; + sub0_pipe *p = arg; nni_pipe_recv(p->pipe, p->aio_recv); return (0); } static void -sub_pipe_stop(void *arg) +sub0_pipe_stop(void *arg) { - sub_pipe *p = arg; + sub0_pipe *p = arg; nni_aio_stop(p->aio_putq); nni_aio_stop(p->aio_recv); } static void -sub_recv_cb(void *arg) +sub0_recv_cb(void *arg) { - sub_pipe *p = arg; - sub_sock *s = p->sub; - nni_msgq *urq = s->urq; - nni_msg * msg; + sub0_pipe *p = arg; + sub0_sock *s = p->sub; + nni_msgq * urq = s->urq; + nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); @@ -164,9 +170,9 @@ sub_recv_cb(void *arg) } static void -sub_putq_cb(void *arg) +sub0_putq_cb(void *arg) { - sub_pipe *p = arg; + sub0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -184,11 +190,11 @@ sub_putq_cb(void *arg) // to replace this with a patricia trie, like old nanomsg had. static int -sub_subscribe(void *arg, const void *buf, size_t sz) +sub0_subscribe(void *arg, const void *buf, size_t sz) { - sub_sock * s = arg; - sub_topic *topic; - sub_topic *newtopic; + sub0_sock * s = arg; + sub0_topic *topic; + sub0_topic *newtopic; nni_mtx_lock(&s->lk); NNI_LIST_FOREACH (&s->topics, topic) { @@ -234,11 +240,11 @@ sub_subscribe(void *arg, const void *buf, size_t sz) } static int -sub_unsubscribe(void *arg, const void *buf, size_t sz) +sub0_unsubscribe(void *arg, const void *buf, size_t sz) { - sub_sock * s = arg; - sub_topic *topic; - int rv; + sub0_sock * s = arg; + sub0_topic *topic; + int rv; nni_mtx_lock(&s->lk); NNI_LIST_FOREACH (&s->topics, topic) { @@ -270,41 +276,41 @@ sub_unsubscribe(void *arg, const void *buf, size_t sz) } static int -sub_sock_setopt_raw(void *arg, const void *buf, size_t sz) +sub0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - sub_sock *s = arg; + sub0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -sub_sock_getopt_raw(void *arg, void *buf, size_t *szp) +sub0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - sub_sock *s = arg; + sub0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static void -sub_sock_send(void *arg, nni_aio *aio) +sub0_sock_send(void *arg, nni_aio *aio) { nni_aio_finish_error(aio, NNG_ENOTSUP); } static void -sub_sock_recv(void *arg, nni_aio *aio) +sub0_sock_recv(void *arg, nni_aio *aio) { - sub_sock *s = arg; + sub0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } static nni_msg * -sub_sock_filter(void *arg, nni_msg *msg) +sub0_sock_filter(void *arg, nni_msg *msg) { - sub_sock * s = arg; - sub_topic *topic; - char * body; - size_t len; - int match; + sub0_sock * s = arg; + sub0_topic *topic; + char * body; + size_t len; + int match; nni_mtx_lock(&s->lk); if (s->raw) { @@ -344,55 +350,55 @@ sub_sock_filter(void *arg, nni_msg *msg) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe_ops sub_pipe_ops = { - .pipe_init = sub_pipe_init, - .pipe_fini = sub_pipe_fini, - .pipe_start = sub_pipe_start, - .pipe_stop = sub_pipe_stop, +static nni_proto_pipe_ops sub0_pipe_ops = { + .pipe_init = sub0_pipe_init, + .pipe_fini = sub0_pipe_fini, + .pipe_start = sub0_pipe_start, + .pipe_stop = sub0_pipe_stop, }; -static nni_proto_sock_option sub_sock_options[] = { +static nni_proto_sock_option sub0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = sub_sock_getopt_raw, - .pso_setopt = sub_sock_setopt_raw, + .pso_getopt = sub0_sock_getopt_raw, + .pso_setopt = sub0_sock_setopt_raw, }, { .pso_name = NNG_OPT_SUB_SUBSCRIBE, .pso_getopt = NULL, - .pso_setopt = sub_subscribe, + .pso_setopt = sub0_subscribe, }, { .pso_name = NNG_OPT_SUB_UNSUBSCRIBE, .pso_getopt = NULL, - .pso_setopt = sub_unsubscribe, + .pso_setopt = sub0_unsubscribe, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops sub_sock_ops = { - .sock_init = sub_sock_init, - .sock_fini = sub_sock_fini, - .sock_open = sub_sock_open, - .sock_close = sub_sock_close, - .sock_send = sub_sock_send, - .sock_recv = sub_sock_recv, - .sock_filter = sub_sock_filter, - .sock_options = sub_sock_options, +static nni_proto_sock_ops sub0_sock_ops = { + .sock_init = sub0_sock_init, + .sock_fini = sub0_sock_fini, + .sock_open = sub0_sock_open, + .sock_close = sub0_sock_close, + .sock_send = sub0_sock_send, + .sock_recv = sub0_sock_recv, + .sock_filter = sub0_sock_filter, + .sock_options = sub0_sock_options, }; -static nni_proto sub_proto = { +static nni_proto sub0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_SUB_V0, "sub" }, .proto_peer = { NNI_PROTO_PUB_V0, "pub" }, .proto_flags = NNI_PROTO_FLAG_RCV, - .proto_sock_ops = &sub_sock_ops, - .proto_pipe_ops = &sub_pipe_ops, + .proto_sock_ops = &sub0_sock_ops, + .proto_pipe_ops = &sub0_pipe_ops, }; int nng_sub0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &sub_proto)); + return (nni_proto_open(sidp, &sub0_proto)); } diff --git a/src/protocol/pubsub0/sub.h b/src/protocol/pubsub0/sub.h new file mode 100644 index 00000000..1a09145d --- /dev/null +++ b/src/protocol/pubsub0/sub.h @@ -0,0 +1,31 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_PUBSUB0_SUB_H +#define NNG_PROTOCOL_PUBSUB0_SUB_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_sub0_open(nng_socket *); + +#ifndef nng_sub_open +#define nng_sub_open nng_sub0_open +#endif + +#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe" +#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_PUBSUB0_SUB_H diff --git a/src/protocol/reqrep0/CMakeLists.txt b/src/protocol/reqrep0/CMakeLists.txt new file mode 100644 index 00000000..4e82ad41 --- /dev/null +++ b/src/protocol/reqrep0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Req/Rep protocol + +if (NNG_PROTO_REQ0) + set(REQ0_SOURCES protocol/reqrep0/req.c protocol/reqrep0/req.h) + install(FILES req.h DESTINATION include/nng/protocol/reqrep0) +endif() + +if (NNG_PROTO_REP0) + set(REP0_SOURCES protocol/reqrep0/rep.c protocol/reqrep0/rep.h) + install(FILES rep.h DESTINATION include/nng/protocol/reqrep0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${REQ0_SOURCES} ${REP0_SOURCES} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep0/rep.c index 100e739d..ee8e4277 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -12,23 +12,32 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/reqrep0/rep.h" // Response protocol. The REP protocol is the "reply" side of a // request-reply pair. This is useful for building RPC servers, for // example. -typedef struct rep_pipe rep_pipe; -typedef struct rep_sock rep_sock; +#ifndef NNI_PROTO_REQ_V0 +#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) +#endif -static void rep_sock_getq_cb(void *); -static void rep_pipe_getq_cb(void *); -static void rep_pipe_putq_cb(void *); -static void rep_pipe_send_cb(void *); -static void rep_pipe_recv_cb(void *); -static void rep_pipe_fini(void *); +#ifndef NNI_PROTO_REP_V0 +#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) +#endif -// A rep_sock is our per-socket protocol private structure. -struct rep_sock { +typedef struct rep0_pipe rep0_pipe; +typedef struct rep0_sock rep0_sock; + +static void rep0_sock_getq_cb(void *); +static void rep0_pipe_getq_cb(void *); +static void rep0_pipe_putq_cb(void *); +static void rep0_pipe_send_cb(void *); +static void rep0_pipe_recv_cb(void *); +static void rep0_pipe_fini(void *); + +// rep0_sock is our per-socket protocol private structure. +struct rep0_sock { nni_msgq * uwq; nni_msgq * urq; nni_mtx lk; @@ -40,21 +49,21 @@ struct rep_sock { nni_aio * aio_getq; }; -// A rep_pipe is our per-pipe protocol private structure. -struct rep_pipe { - nni_pipe *pipe; - rep_sock *rep; - nni_msgq *sendq; - nni_aio * aio_getq; - nni_aio * aio_send; - nni_aio * aio_recv; - nni_aio * aio_putq; +// rep0_pipe is our per-pipe protocol private structure. +struct rep0_pipe { + nni_pipe * pipe; + rep0_sock *rep; + nni_msgq * sendq; + nni_aio * aio_getq; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_aio * aio_putq; }; static void -rep_sock_fini(void *arg) +rep0_sock_fini(void *arg) { - rep_sock *s = arg; + rep0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -67,18 +76,18 @@ rep_sock_fini(void *arg) } static int -rep_sock_init(void **sp, nni_sock *sock) +rep0_sock_init(void **sp, nni_sock *sock) { - rep_sock *s; - int rv; + rep0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->lk); if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, rep_sock_getq_cb, s)) != 0)) { - rep_sock_fini(s); + ((rv = nni_aio_init(&s->aio_getq, rep0_sock_getq_cb, s)) != 0)) { + rep0_sock_fini(s); return (rv); } @@ -95,25 +104,25 @@ rep_sock_init(void **sp, nni_sock *sock) } static void -rep_sock_open(void *arg) +rep0_sock_open(void *arg) { - rep_sock *s = arg; + rep0_sock *s = arg; nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -rep_sock_close(void *arg) +rep0_sock_close(void *arg) { - rep_sock *s = arg; + rep0_sock *s = arg; nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -rep_pipe_fini(void *arg) +rep0_pipe_fini(void *arg) { - rep_pipe *p = arg; + rep0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); @@ -124,20 +133,20 @@ rep_pipe_fini(void *arg) } static int -rep_pipe_init(void **pp, nni_pipe *pipe, void *s) +rep0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - rep_pipe *p; - int rv; + rep0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, rep_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, rep_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, rep_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, rep_pipe_putq_cb, p)) != 0)) { - rep_pipe_fini(p); + ((rv = nni_aio_init(&p->aio_getq, rep0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, rep0_pipe_putq_cb, p)) != 0)) { + rep0_pipe_fini(p); return (rv); } @@ -148,11 +157,11 @@ rep_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -rep_pipe_start(void *arg) +rep0_pipe_start(void *arg) { - rep_pipe *p = arg; - rep_sock *s = p->rep; - int rv; + rep0_pipe *p = arg; + rep0_sock *s = p->rep; + int rv; if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) { return (rv); @@ -164,10 +173,10 @@ rep_pipe_start(void *arg) } static void -rep_pipe_stop(void *arg) +rep0_pipe_stop(void *arg) { - rep_pipe *p = arg; - rep_sock *s = p->rep; + rep0_pipe *p = arg; + rep0_sock *s = p->rep; nni_msgq_close(p->sendq); nni_aio_stop(p->aio_getq); @@ -179,14 +188,14 @@ rep_pipe_stop(void *arg) } static void -rep_sock_getq_cb(void *arg) +rep0_sock_getq_cb(void *arg) { - rep_sock *s = arg; - nni_msgq *uwq = s->uwq; - nni_msg * msg; - uint32_t id; - rep_pipe *p; - int rv; + rep0_sock *s = arg; + nni_msgq * uwq = s->uwq; + nni_msg * msg; + uint32_t id; + rep0_pipe *p; + int rv; // This watches for messages from the upper write queue, // extracts the destination pipe, and forwards it to the appropriate @@ -228,9 +237,9 @@ rep_sock_getq_cb(void *arg) } static void -rep_pipe_getq_cb(void *arg) +rep0_pipe_getq_cb(void *arg) { - rep_pipe *p = arg; + rep0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->pipe); @@ -244,9 +253,9 @@ rep_pipe_getq_cb(void *arg) } static void -rep_pipe_send_cb(void *arg) +rep0_pipe_send_cb(void *arg) { - rep_pipe *p = arg; + rep0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -259,14 +268,14 @@ rep_pipe_send_cb(void *arg) } static void -rep_pipe_recv_cb(void *arg) +rep0_pipe_recv_cb(void *arg) { - rep_pipe *p = arg; - rep_sock *s = p->rep; - nni_msg * msg; - int rv; - uint8_t * body; - int hops; + rep0_pipe *p = arg; + rep0_sock *s = p->rep; + nni_msg * msg; + int rv; + uint8_t * body; + int hops; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); @@ -329,9 +338,9 @@ drop: } static void -rep_pipe_putq_cb(void *arg) +rep0_pipe_putq_cb(void *arg) { - rep_pipe *p = arg; + rep0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -344,10 +353,10 @@ rep_pipe_putq_cb(void *arg) } static int -rep_sock_setopt_raw(void *arg, const void *buf, size_t sz) +rep0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - rep_sock *s = arg; - int rv; + rep0_sock *s = arg; + int rv; nni_mtx_lock(&s->lk); rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); @@ -356,32 +365,32 @@ rep_sock_setopt_raw(void *arg, const void *buf, size_t sz) } static int -rep_sock_getopt_raw(void *arg, void *buf, size_t *szp) +rep0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - rep_sock *s = arg; + rep0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static int -rep_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +rep0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) { - rep_sock *s = arg; + rep0_sock *s = arg; return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); } static int -rep_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +rep0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) { - rep_sock *s = arg; + rep0_sock *s = arg; return (nni_getopt_int(s->ttl, buf, szp)); } static nni_msg * -rep_sock_filter(void *arg, nni_msg *msg) +rep0_sock_filter(void *arg, nni_msg *msg) { - rep_sock *s = arg; - char * header; - size_t len; + rep0_sock *s = arg; + char * header; + size_t len; nni_mtx_lock(&s->lk); if (s->raw) { @@ -408,11 +417,11 @@ rep_sock_filter(void *arg, nni_msg *msg) } static void -rep_sock_send(void *arg, nni_aio *aio) +rep0_sock_send(void *arg, nni_aio *aio) { - rep_sock *s = arg; - int rv; - nni_msg * msg; + rep0_sock *s = arg; + int rv; + nni_msg * msg; nni_mtx_lock(&s->lk); if (s->raw) { @@ -448,59 +457,59 @@ rep_sock_send(void *arg, nni_aio *aio) } static void -rep_sock_recv(void *arg, nni_aio *aio) +rep0_sock_recv(void *arg, nni_aio *aio) { - rep_sock *s = arg; + rep0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe_ops rep_pipe_ops = { - .pipe_init = rep_pipe_init, - .pipe_fini = rep_pipe_fini, - .pipe_start = rep_pipe_start, - .pipe_stop = rep_pipe_stop, +static nni_proto_pipe_ops rep0_pipe_ops = { + .pipe_init = rep0_pipe_init, + .pipe_fini = rep0_pipe_fini, + .pipe_start = rep0_pipe_start, + .pipe_stop = rep0_pipe_stop, }; -static nni_proto_sock_option rep_sock_options[] = { +static nni_proto_sock_option rep0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = rep_sock_getopt_raw, - .pso_setopt = rep_sock_setopt_raw, + .pso_getopt = rep0_sock_getopt_raw, + .pso_setopt = rep0_sock_setopt_raw, }, { .pso_name = NNG_OPT_MAXTTL, - .pso_getopt = rep_sock_getopt_maxttl, - .pso_setopt = rep_sock_setopt_maxttl, + .pso_getopt = rep0_sock_getopt_maxttl, + .pso_setopt = rep0_sock_setopt_maxttl, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops rep_sock_ops = { - .sock_init = rep_sock_init, - .sock_fini = rep_sock_fini, - .sock_open = rep_sock_open, - .sock_close = rep_sock_close, - .sock_options = rep_sock_options, - .sock_filter = rep_sock_filter, - .sock_send = rep_sock_send, - .sock_recv = rep_sock_recv, +static nni_proto_sock_ops rep0_sock_ops = { + .sock_init = rep0_sock_init, + .sock_fini = rep0_sock_fini, + .sock_open = rep0_sock_open, + .sock_close = rep0_sock_close, + .sock_options = rep0_sock_options, + .sock_filter = rep0_sock_filter, + .sock_send = rep0_sock_send, + .sock_recv = rep0_sock_recv, }; -static nni_proto nni_rep_proto = { +static nni_proto rep0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_REP_V0, "rep" }, .proto_peer = { NNI_PROTO_REQ_V0, "req" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &rep_sock_ops, - .proto_pipe_ops = &rep_pipe_ops, + .proto_sock_ops = &rep0_sock_ops, + .proto_pipe_ops = &rep0_pipe_ops, }; int nng_rep0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &nni_rep_proto)); + return (nni_proto_open(sidp, &rep0_proto)); } diff --git a/src/protocol/reqrep0/rep.h b/src/protocol/reqrep0/rep.h new file mode 100644 index 00000000..93df9379 --- /dev/null +++ b/src/protocol/reqrep0/rep.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_REQREP0_REP_H +#define NNG_PROTOCOL_REQREP0_REP_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_rep0_open(nng_socket *); + +#ifndef nng_rep_open +#define nng_rep_open nng_rep0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_REQREP0_REP_H diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep0/req.c index bead1ec4..94c7f1a0 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep0/req.c @@ -13,21 +13,28 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/reqrep0/req.h" // Request protocol. The REQ protocol is the "request" side of a // request-reply pair. This is useful for building RPC clients, for example. -const char *nng_opt_req_resendtime = NNG_OPT_REQ_RESENDTIME; +#ifndef NNI_PROTO_REQ_V0 +#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) +#endif -typedef struct req_pipe req_pipe; -typedef struct req_sock req_sock; +#ifndef NNI_PROTO_REP_V0 +#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) +#endif -static void req_resend(req_sock *); -static void req_timeout(void *); -static void req_pipe_fini(void *); +typedef struct req0_pipe req0_pipe; +typedef struct req0_sock req0_sock; -// A req_sock is our per-socket protocol private structure. -struct req_sock { +static void req0_resend(req0_sock *); +static void req0_timeout(void *); +static void req0_pipe_fini(void *); + +// A req0_sock is our per-socket protocol private structure. +struct req0_sock { nni_msgq * uwq; nni_msgq * urq; nni_duration retry; @@ -38,7 +45,7 @@ struct req_sock { int ttl; nni_msg * reqmsg; - req_pipe *pendpipe; + req0_pipe *pendpipe; nni_list readypipes; nni_list busypipes; @@ -51,10 +58,10 @@ struct req_sock { nni_cv cv; }; -// A req_pipe is our per-pipe protocol private structure. -struct req_pipe { +// A req0_pipe is our per-pipe protocol private structure. +struct req0_pipe { nni_pipe * pipe; - req_sock * req; + req0_sock * req; nni_list_node node; nni_aio * aio_getq; // raw mode only nni_aio * aio_sendraw; // raw mode only @@ -64,17 +71,17 @@ struct req_pipe { nni_mtx mtx; }; -static void req_resender(void *); -static void req_getq_cb(void *); -static void req_sendraw_cb(void *); -static void req_sendcooked_cb(void *); -static void req_recv_cb(void *); -static void req_putq_cb(void *); +static void req0_resender(void *); +static void req0_getq_cb(void *); +static void req0_sendraw_cb(void *); +static void req0_sendcooked_cb(void *); +static void req0_recv_cb(void *); +static void req0_putq_cb(void *); static int -req_sock_init(void **sp, nni_sock *sock) +req0_sock_init(void **sp, nni_sock *sock) { - req_sock *s; + req0_sock *s; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); @@ -82,9 +89,9 @@ req_sock_init(void **sp, nni_sock *sock) nni_mtx_init(&s->mtx); nni_cv_init(&s->cv, &s->mtx); - NNI_LIST_INIT(&s->readypipes, req_pipe, node); - NNI_LIST_INIT(&s->busypipes, req_pipe, node); - nni_timer_init(&s->timer, req_timeout, s); + NNI_LIST_INIT(&s->readypipes, req0_pipe, node); + NNI_LIST_INIT(&s->busypipes, req0_pipe, node); + nni_timer_init(&s->timer, req0_timeout, s); // this is "semi random" start for request IDs. s->nextid = nni_random(); @@ -102,15 +109,15 @@ req_sock_init(void **sp, nni_sock *sock) } static void -req_sock_open(void *arg) +req0_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void -req_sock_close(void *arg) +req0_sock_close(void *arg) { - req_sock *s = arg; + req0_sock *s = arg; nni_mtx_lock(&s->mtx); s->closed = 1; @@ -120,9 +127,9 @@ req_sock_close(void *arg) } static void -req_sock_fini(void *arg) +req0_sock_fini(void *arg) { - req_sock *s = arg; + req0_sock *s = arg; nni_mtx_lock(&s->mtx); while ((!nni_list_empty(&s->readypipes)) || @@ -139,9 +146,9 @@ req_sock_fini(void *arg) } static void -req_pipe_fini(void *arg) +req0_pipe_fini(void *arg) { - req_pipe *p = arg; + req0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_putq); @@ -153,22 +160,22 @@ req_pipe_fini(void *arg) } static int -req_pipe_init(void **pp, nni_pipe *pipe, void *s) +req0_pipe_init(void **pp, nni_pipe *pipe, void *s) { - req_pipe *p; - int rv; + req0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - if (((rv = nni_aio_init(&p->aio_getq, req_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, req_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, req_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_sendraw, req_sendraw_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_sendcooked, req_sendcooked_cb, p)) != + if (((rv = nni_aio_init(&p->aio_getq, req0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, req0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_sendraw, req0_sendraw_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_sendcooked, req0_sendcooked_cb, p)) != 0)) { - req_pipe_fini(p); + req0_pipe_fini(p); return (rv); } @@ -180,10 +187,10 @@ req_pipe_init(void **pp, nni_pipe *pipe, void *s) } static int -req_pipe_start(void *arg) +req0_pipe_start(void *arg) { - req_pipe *p = arg; - req_sock *s = p->req; + req0_pipe *p = arg; + req0_sock *s = p->req; if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) { return (NNG_EPROTO); @@ -198,7 +205,7 @@ req_pipe_start(void *arg) // If sock was waiting for somewhere to send data, go ahead and // send it to this pipe. if (s->wantw) { - req_resend(s); + req0_resend(s); } nni_mtx_unlock(&s->mtx); @@ -208,10 +215,10 @@ req_pipe_start(void *arg) } static void -req_pipe_stop(void *arg) +req0_pipe_stop(void *arg) { - req_pipe *p = arg; - req_sock *s = p->req; + req0_pipe *p = arg; + req0_sock *s = p->req; nni_aio_stop(p->aio_getq); nni_aio_stop(p->aio_putq); @@ -238,50 +245,50 @@ req_pipe_stop(void *arg) s->pendpipe = NULL; s->resend = NNI_TIME_ZERO; s->wantw = 1; - req_resend(s); + req0_resend(s); } nni_mtx_unlock(&s->mtx); } static int -req_sock_setopt_raw(void *arg, const void *buf, size_t sz) +req0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int -req_sock_getopt_raw(void *arg, void *buf, size_t *szp) +req0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static int -req_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); } static int -req_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +req0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_getopt_int(s->ttl, buf, szp)); } static int -req_sock_setopt_resendtime(void *arg, const void *buf, size_t sz) +req0_sock_setopt_resendtime(void *arg, const void *buf, size_t sz) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_setopt_ms(&s->retry, buf, sz)); } static int -req_sock_getopt_resendtime(void *arg, void *buf, size_t *szp) +req0_sock_getopt_resendtime(void *arg, void *buf, size_t *szp) { - req_sock *s = arg; + req0_sock *s = arg; return (nni_getopt_ms(s->retry, buf, szp)); } @@ -302,10 +309,10 @@ req_sock_getopt_resendtime(void *arg, void *buf, size_t *szp) // kind of priority.) static void -req_getq_cb(void *arg) +req0_getq_cb(void *arg) { - req_pipe *p = arg; - req_sock *s = p->req; + req0_pipe *p = arg; + req0_sock *s = p->req; // We should be in RAW mode. Cooked mode traffic bypasses // the upper write queue entirely, and should never end up here. @@ -326,9 +333,9 @@ req_getq_cb(void *arg) } static void -req_sendraw_cb(void *arg) +req0_sendraw_cb(void *arg) { - req_pipe *p = arg; + req0_pipe *p = arg; if (nni_aio_result(p->aio_sendraw) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_sendraw)); @@ -342,10 +349,10 @@ req_sendraw_cb(void *arg) } static void -req_sendcooked_cb(void *arg) +req0_sendcooked_cb(void *arg) { - req_pipe *p = arg; - req_sock *s = p->req; + req0_pipe *p = arg; + req0_sock *s = p->req; if (nni_aio_result(p->aio_sendcooked) != 0) { // We failed to send... clean up and deal with it. @@ -365,7 +372,7 @@ req_sendcooked_cb(void *arg) if (nni_list_active(&s->busypipes, p)) { nni_list_remove(&s->busypipes, p); nni_list_append(&s->readypipes, p); - req_resend(s); + req0_resend(s); } else { // We wind up here if stop was called from the reader // side while we were waiting to be scheduled to run for the @@ -377,9 +384,9 @@ req_sendcooked_cb(void *arg) } static void -req_putq_cb(void *arg) +req0_putq_cb(void *arg) { - req_pipe *p = arg; + req0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -393,10 +400,10 @@ req_putq_cb(void *arg) } static void -req_recv_cb(void *arg) +req0_recv_cb(void *arg) { - req_pipe *p = arg; - nni_msg * msg; + req0_pipe *p = arg; + nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); @@ -431,23 +438,23 @@ malformed: } static void -req_timeout(void *arg) +req0_timeout(void *arg) { - req_sock *s = arg; + req0_sock *s = arg; nni_mtx_lock(&s->mtx); if (s->reqmsg != NULL) { s->wantw = 1; - req_resend(s); + req0_resend(s); } nni_mtx_unlock(&s->mtx); } static void -req_resend(req_sock *s) +req0_resend(req0_sock *s) { - req_pipe *p; - nni_msg * msg; + req0_pipe *p; + nni_msg * msg; // Note: This routine should be called with the socket lock held. // Also, this should only be called while handling cooked mode @@ -499,13 +506,13 @@ req_resend(req_sock *s) } static void -req_sock_send(void *arg, nni_aio *aio) +req0_sock_send(void *arg, nni_aio *aio) { - req_sock *s = arg; - uint32_t id; - size_t len; - nni_msg * msg; - int rv; + req0_sock *s = arg; + uint32_t id; + size_t len; + nni_msg * msg; + int rv; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -547,7 +554,7 @@ req_sock_send(void *arg, nni_aio *aio) s->resend = NNI_TIME_ZERO; s->wantw = 1; - req_resend(s); + req0_resend(s); nni_mtx_unlock(&s->mtx); @@ -555,10 +562,10 @@ req_sock_send(void *arg, nni_aio *aio) } static nni_msg * -req_sock_filter(void *arg, nni_msg *msg) +req0_sock_filter(void *arg, nni_msg *msg) { - req_sock *s = arg; - nni_msg * rmsg; + req0_sock *s = arg; + nni_msg * rmsg; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -598,9 +605,9 @@ req_sock_filter(void *arg, nni_msg *msg) } static void -req_sock_recv(void *arg, nni_aio *aio) +req0_sock_recv(void *arg, nni_aio *aio) { - req_sock *s = arg; + req0_sock *s = arg; nni_mtx_lock(&s->mtx); if (!s->raw) { @@ -614,55 +621,55 @@ req_sock_recv(void *arg, nni_aio *aio) nni_msgq_aio_get(s->urq, aio); } -static nni_proto_pipe_ops req_pipe_ops = { - .pipe_init = req_pipe_init, - .pipe_fini = req_pipe_fini, - .pipe_start = req_pipe_start, - .pipe_stop = req_pipe_stop, +static nni_proto_pipe_ops req0_pipe_ops = { + .pipe_init = req0_pipe_init, + .pipe_fini = req0_pipe_fini, + .pipe_start = req0_pipe_start, + .pipe_stop = req0_pipe_stop, }; -static nni_proto_sock_option req_sock_options[] = { +static nni_proto_sock_option req0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = req_sock_getopt_raw, - .pso_setopt = req_sock_setopt_raw, + .pso_getopt = req0_sock_getopt_raw, + .pso_setopt = req0_sock_setopt_raw, }, { .pso_name = NNG_OPT_MAXTTL, - .pso_getopt = req_sock_getopt_maxttl, - .pso_setopt = req_sock_setopt_maxttl, + .pso_getopt = req0_sock_getopt_maxttl, + .pso_setopt = req0_sock_setopt_maxttl, }, { .pso_name = NNG_OPT_REQ_RESENDTIME, - .pso_getopt = req_sock_getopt_resendtime, - .pso_setopt = req_sock_setopt_resendtime, + .pso_getopt = req0_sock_getopt_resendtime, + .pso_setopt = req0_sock_setopt_resendtime, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops req_sock_ops = { - .sock_init = req_sock_init, - .sock_fini = req_sock_fini, - .sock_open = req_sock_open, - .sock_close = req_sock_close, - .sock_options = req_sock_options, - .sock_filter = req_sock_filter, - .sock_send = req_sock_send, - .sock_recv = req_sock_recv, +static nni_proto_sock_ops req0_sock_ops = { + .sock_init = req0_sock_init, + .sock_fini = req0_sock_fini, + .sock_open = req0_sock_open, + .sock_close = req0_sock_close, + .sock_options = req0_sock_options, + .sock_filter = req0_sock_filter, + .sock_send = req0_sock_send, + .sock_recv = req0_sock_recv, }; -static nni_proto req_proto = { +static nni_proto req0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_REQ_V0, "req" }, .proto_peer = { NNI_PROTO_REP_V0, "rep" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &req_sock_ops, - .proto_pipe_ops = &req_pipe_ops, + .proto_sock_ops = &req0_sock_ops, + .proto_pipe_ops = &req0_pipe_ops, }; int nng_req0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &req_proto)); + return (nni_proto_open(sidp, &req0_proto)); } diff --git a/src/protocol/reqrep0/req.h b/src/protocol/reqrep0/req.h new file mode 100644 index 00000000..99c9bf62 --- /dev/null +++ b/src/protocol/reqrep0/req.h @@ -0,0 +1,30 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_REQREP0_REQ_H +#define NNG_PROTOCOL_REQREP0_REQ_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_req0_open(nng_socket *); + +#ifndef nng_req_open +#define nng_req_open nng_req0_open +#endif + +#define NNG_OPT_REQ_RESENDTIME "req:resend-time" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_REQREP0_REQ_H diff --git a/src/protocol/survey0/CMakeLists.txt b/src/protocol/survey0/CMakeLists.txt new file mode 100644 index 00000000..61e5aa7b --- /dev/null +++ b/src/protocol/survey0/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# Req/Rep protocol + +if (NNG_PROTO_SURVEYOR0) + set(SURV0_SOURCES protocol/survey0/survey.c protocol/survey0/survey.h) + install(FILES survey.h DESTINATION include/nng/protocol/survey0) +endif() + +if (NNG_PROTO_RESPONDENT0) + set(RESP0_SOURCES protocol/survey0/respond.c protocol/survey0/respond.h) + install(FILES respond.h DESTINATION include/nng/protocol/survey0) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${SURV0_SOURCES} ${RESP0_SOURCES} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/protocol/survey/respond.c b/src/protocol/survey0/respond.c index 94730be6..73e919c3 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey0/respond.c @@ -12,23 +12,32 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/survey0/respond.h" // Respondent protocol. The RESPONDENT protocol is the "replier" side of // the surveyor pattern. This is useful for building service discovery, or -// voting algorithsm, for example. +// voting algorithms, for example. -typedef struct resp_pipe resp_pipe; -typedef struct resp_sock resp_sock; +#ifndef NNI_PROTO_SURVEYOR_V0 +#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) +#endif -static void resp_recv_cb(void *); -static void resp_putq_cb(void *); -static void resp_getq_cb(void *); -static void resp_send_cb(void *); -static void resp_sock_getq_cb(void *); -static void resp_pipe_fini(void *); +#ifndef NNI_PROTO_RESPONDENT_V0 +#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) +#endif -// A resp_sock is our per-socket protocol private structure. -struct resp_sock { +typedef struct resp0_pipe resp0_pipe; +typedef struct resp0_sock resp0_sock; + +static void resp0_recv_cb(void *); +static void resp0_putq_cb(void *); +static void resp0_getq_cb(void *); +static void resp0_send_cb(void *); +static void resp0_sock_getq_cb(void *); +static void resp0_pipe_fini(void *); + +// resp0_sock is our per-socket protocol private structure. +struct resp0_sock { nni_msgq * urq; nni_msgq * uwq; int raw; @@ -40,22 +49,22 @@ struct resp_sock { nni_mtx mtx; }; -// A resp_pipe is our per-pipe protocol private structure. -struct resp_pipe { - nni_pipe * npipe; - resp_sock *psock; - uint32_t id; - nni_msgq * sendq; - nni_aio * aio_getq; - nni_aio * aio_putq; - nni_aio * aio_send; - nni_aio * aio_recv; +// resp0_pipe is our per-pipe protocol private structure. +struct resp0_pipe { + nni_pipe * npipe; + resp0_sock *psock; + uint32_t id; + nni_msgq * sendq; + nni_aio * aio_getq; + nni_aio * aio_putq; + nni_aio * aio_send; + nni_aio * aio_recv; }; static void -resp_sock_fini(void *arg) +resp0_sock_fini(void *arg) { - resp_sock *s = arg; + resp0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -68,18 +77,18 @@ resp_sock_fini(void *arg) } static int -resp_sock_init(void **sp, nni_sock *nsock) +resp0_sock_init(void **sp, nni_sock *nsock) { - resp_sock *s; - int rv; + resp0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->mtx); if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, resp_sock_getq_cb, s)) != 0)) { - resp_sock_fini(s); + ((rv = nni_aio_init(&s->aio_getq, resp0_sock_getq_cb, s)) != 0)) { + resp0_sock_fini(s); return (rv); } @@ -95,25 +104,25 @@ resp_sock_init(void **sp, nni_sock *nsock) } static void -resp_sock_open(void *arg) +resp0_sock_open(void *arg) { - resp_sock *s = arg; + resp0_sock *s = arg; nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -resp_sock_close(void *arg) +resp0_sock_close(void *arg) { - resp_sock *s = arg; + resp0_sock *s = arg; nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -resp_pipe_fini(void *arg) +resp0_pipe_fini(void *arg) { - resp_pipe *p = arg; + resp0_pipe *p = arg; nni_aio_fini(p->aio_putq); nni_aio_fini(p->aio_getq); @@ -124,20 +133,20 @@ resp_pipe_fini(void *arg) } static int -resp_pipe_init(void **pp, nni_pipe *npipe, void *s) +resp0_pipe_init(void **pp, nni_pipe *npipe, void *s) { - resp_pipe *p; - int rv; + resp0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, resp_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, resp_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, resp_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, resp_send_cb, p)) != 0)) { - resp_pipe_fini(p); + ((rv = nni_aio_init(&p->aio_putq, resp0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, resp0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, resp0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, resp0_send_cb, p)) != 0)) { + resp0_pipe_fini(p); return (rv); } @@ -148,11 +157,11 @@ resp_pipe_init(void **pp, nni_pipe *npipe, void *s) } static int -resp_pipe_start(void *arg) +resp0_pipe_start(void *arg) { - resp_pipe *p = arg; - resp_sock *s = p->psock; - int rv; + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + int rv; p->id = nni_pipe_id(p->npipe); @@ -170,10 +179,10 @@ resp_pipe_start(void *arg) } static void -resp_pipe_stop(void *arg) +resp0_pipe_stop(void *arg) { - resp_pipe *p = arg; - resp_sock *s = p->psock; + resp0_pipe *p = arg; + resp0_sock *s = p->psock; nni_msgq_close(p->sendq); nni_aio_stop(p->aio_putq); @@ -189,19 +198,19 @@ resp_pipe_stop(void *arg) } } -// resp_sock_send watches for messages from the upper write queue, +// resp0_sock_send watches for messages from the upper write queue, // extracts the destination pipe, and forwards it to the appropriate // destination pipe via a separate queue. This prevents a single bad // or slow pipe from gumming up the works for the entire socket.s void -resp_sock_getq_cb(void *arg) +resp0_sock_getq_cb(void *arg) { - resp_sock *s = arg; - nni_msg * msg; - uint32_t id; - resp_pipe *p; - int rv; + resp0_sock *s = arg; + nni_msg * msg; + uint32_t id; + resp0_pipe *p; + int rv; if (nni_aio_result(s->aio_getq) != 0) { return; @@ -233,9 +242,9 @@ resp_sock_getq_cb(void *arg) } void -resp_getq_cb(void *arg) +resp0_getq_cb(void *arg) { - resp_pipe *p = arg; + resp0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->npipe); @@ -249,9 +258,9 @@ resp_getq_cb(void *arg) } void -resp_send_cb(void *arg) +resp0_send_cb(void *arg) { - resp_pipe *p = arg; + resp0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -264,14 +273,14 @@ resp_send_cb(void *arg) } static void -resp_recv_cb(void *arg) +resp0_recv_cb(void *arg) { - resp_pipe *p = arg; - resp_sock *s = p->psock; - nni_msgq * urq = s->urq; - nni_msg * msg; - int hops; - int rv; + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + nni_msgq * urq = s->urq; + nni_msg * msg; + int hops; + int rv; if (nni_aio_result(p->aio_recv) != 0) { goto error; @@ -324,9 +333,9 @@ error: } static void -resp_putq_cb(void *arg) +resp0_putq_cb(void *arg) { - resp_pipe *p = arg; + resp0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -338,10 +347,10 @@ resp_putq_cb(void *arg) } static int -resp_sock_setopt_raw(void *arg, const void *buf, size_t sz) +resp0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - resp_sock *s = arg; - int rv; + resp0_sock *s = arg; + int rv; nni_mtx_lock(&s->mtx); rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); @@ -350,32 +359,32 @@ resp_sock_setopt_raw(void *arg, const void *buf, size_t sz) } static int -resp_sock_getopt_raw(void *arg, void *buf, size_t *szp) +resp0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - resp_sock *s = arg; + resp0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static int -resp_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) +resp0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz) { - resp_sock *s = arg; + resp0_sock *s = arg; return (nni_setopt_int(&s->ttl, buf, sz, 1, 255)); } static int -resp_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) +resp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) { - resp_sock *s = arg; + resp0_sock *s = arg; return (nni_getopt_int(s->ttl, buf, szp)); } static void -resp_sock_send(void *arg, nni_aio *aio) +resp0_sock_send(void *arg, nni_aio *aio) { - resp_sock *s = arg; - nni_msg * msg; - int rv; + resp0_sock *s = arg; + nni_msg * msg; + int rv; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -412,11 +421,11 @@ resp_sock_send(void *arg, nni_aio *aio) } static nni_msg * -resp_sock_filter(void *arg, nni_msg *msg) +resp0_sock_filter(void *arg, nni_msg *msg) { - resp_sock *s = arg; - char * header; - size_t len; + resp0_sock *s = arg; + char * header; + size_t len; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -444,57 +453,57 @@ resp_sock_filter(void *arg, nni_msg *msg) } static void -resp_sock_recv(void *arg, nni_aio *aio) +resp0_sock_recv(void *arg, nni_aio *aio) { - resp_sock *s = arg; + resp0_sock *s = arg; nni_msgq_aio_get(s->urq, aio); } -static nni_proto_pipe_ops resp_pipe_ops = { - .pipe_init = resp_pipe_init, - .pipe_fini = resp_pipe_fini, - .pipe_start = resp_pipe_start, - .pipe_stop = resp_pipe_stop, +static nni_proto_pipe_ops resp0_pipe_ops = { + .pipe_init = resp0_pipe_init, + .pipe_fini = resp0_pipe_fini, + .pipe_start = resp0_pipe_start, + .pipe_stop = resp0_pipe_stop, }; -static nni_proto_sock_option resp_sock_options[] = { +static nni_proto_sock_option resp0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = resp_sock_getopt_raw, - .pso_setopt = resp_sock_setopt_raw, + .pso_getopt = resp0_sock_getopt_raw, + .pso_setopt = resp0_sock_setopt_raw, }, { .pso_name = NNG_OPT_MAXTTL, - .pso_getopt = resp_sock_getopt_maxttl, - .pso_setopt = resp_sock_setopt_maxttl, + .pso_getopt = resp0_sock_getopt_maxttl, + .pso_setopt = resp0_sock_setopt_maxttl, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops resp_sock_ops = { - .sock_init = resp_sock_init, - .sock_fini = resp_sock_fini, - .sock_open = resp_sock_open, - .sock_close = resp_sock_close, - .sock_filter = resp_sock_filter, - .sock_send = resp_sock_send, - .sock_recv = resp_sock_recv, - .sock_options = resp_sock_options, +static nni_proto_sock_ops resp0_sock_ops = { + .sock_init = resp0_sock_init, + .sock_fini = resp0_sock_fini, + .sock_open = resp0_sock_open, + .sock_close = resp0_sock_close, + .sock_filter = resp0_sock_filter, + .sock_send = resp0_sock_send, + .sock_recv = resp0_sock_recv, + .sock_options = resp0_sock_options, }; -static nni_proto resp_proto = { +static nni_proto resp0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &resp_sock_ops, - .proto_pipe_ops = &resp_pipe_ops, + .proto_sock_ops = &resp0_sock_ops, + .proto_pipe_ops = &resp0_pipe_ops, }; int nng_respondent0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &resp_proto)); + return (nni_proto_open(sidp, &resp0_proto)); } diff --git a/src/protocol/survey0/respond.h b/src/protocol/survey0/respond.h new file mode 100644 index 00000000..58c65298 --- /dev/null +++ b/src/protocol/survey0/respond.h @@ -0,0 +1,28 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_SURVEY0_RESPOND_H +#define NNG_PROTOCOL_SURVEY0_RESPOND_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_respondent0_open(nng_socket *); + +#ifndef nng_respondent_open +#define nng_respondent_open nng_respondent0_open +#endif + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_SURVEY0_RESPOND_H diff --git a/src/protocol/survey/survey.c b/src/protocol/survey0/survey.c index 9dcd6664..02283436 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey0/survey.c @@ -12,22 +12,31 @@ #include <string.h> #include "core/nng_impl.h" +#include "protocol/survey0/survey.h" // Surveyor protocol. The SURVEYOR protocol is the "survey" side of the // survey pattern. This is useful for building service discovery, voting, etc. -typedef struct surv_pipe surv_pipe; -typedef struct surv_sock surv_sock; +#ifndef NNI_PROTO_SURVEYOR_V0 +#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) +#endif -static void surv_sock_getq_cb(void *); -static void surv_getq_cb(void *); -static void surv_putq_cb(void *); -static void surv_send_cb(void *); -static void surv_recv_cb(void *); -static void surv_timeout(void *); +#ifndef NNI_PROTO_RESPONDENT_V0 +#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3) +#endif -// A surv_sock is our per-socket protocol private structure. -struct surv_sock { +typedef struct surv0_pipe surv0_pipe; +typedef struct surv0_sock surv0_sock; + +static void surv0_sock_getq_cb(void *); +static void surv0_getq_cb(void *); +static void surv0_putq_cb(void *); +static void surv0_send_cb(void *); +static void surv0_recv_cb(void *); +static void surv0_timeout(void *); + +// surv0_sock is our per-socket protocol private structure. +struct surv0_sock { nni_duration survtime; nni_time expire; int raw; @@ -42,10 +51,10 @@ struct surv_sock { nni_mtx mtx; }; -// A surv_pipe is our per-pipe protocol private structure. -struct surv_pipe { +// surv0_pipe is our per-pipe protocol private structure. +struct surv0_pipe { nni_pipe * npipe; - surv_sock * psock; + surv0_sock * psock; nni_msgq * sendq; nni_list_node node; nni_aio * aio_getq; @@ -55,9 +64,9 @@ struct surv_pipe { }; static void -surv_sock_fini(void *arg) +surv0_sock_fini(void *arg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); @@ -66,21 +75,21 @@ surv_sock_fini(void *arg) } static int -surv_sock_init(void **sp, nni_sock *nsock) +surv0_sock_init(void **sp, nni_sock *nsock) { - surv_sock *s; - int rv; + surv0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_aio_init(&s->aio_getq, surv_sock_getq_cb, s)) != 0) { - surv_sock_fini(s); + if ((rv = nni_aio_init(&s->aio_getq, surv0_sock_getq_cb, s)) != 0) { + surv0_sock_fini(s); return (rv); } - NNI_LIST_INIT(&s->pipes, surv_pipe, node); + NNI_LIST_INIT(&s->pipes, surv0_pipe, node); nni_mtx_init(&s->mtx); - nni_timer_init(&s->timer, surv_timeout, s); + nni_timer_init(&s->timer, surv0_timeout, s); s->nextid = nni_random(); s->raw = 0; @@ -94,26 +103,26 @@ surv_sock_init(void **sp, nni_sock *nsock) } static void -surv_sock_open(void *arg) +surv0_sock_open(void *arg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_msgq_aio_get(s->uwq, s->aio_getq); } static void -surv_sock_close(void *arg) +surv0_sock_close(void *arg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_timer_cancel(&s->timer); nni_aio_cancel(s->aio_getq, NNG_ECLOSED); } static void -surv_pipe_fini(void *arg) +surv0_pipe_fini(void *arg) { - surv_pipe *p = arg; + surv0_pipe *p = arg; nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); @@ -124,21 +133,21 @@ surv_pipe_fini(void *arg) } static int -surv_pipe_init(void **pp, nni_pipe *npipe, void *s) +surv0_pipe_init(void **pp, nni_pipe *npipe, void *s) { - surv_pipe *p; - int rv; + surv0_pipe *p; + int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } // This depth could be tunable. if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, surv_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, surv_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, surv_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, surv_recv_cb, p)) != 0)) { - surv_pipe_fini(p); + ((rv = nni_aio_init(&p->aio_getq, surv0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, surv0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, surv0_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, surv0_recv_cb, p)) != 0)) { + surv0_pipe_fini(p); return (rv); } @@ -149,10 +158,10 @@ surv_pipe_init(void **pp, nni_pipe *npipe, void *s) } static int -surv_pipe_start(void *arg) +surv0_pipe_start(void *arg) { - surv_pipe *p = arg; - surv_sock *s = p->psock; + surv0_pipe *p = arg; + surv0_sock *s = p->psock; nni_mtx_lock(&s->mtx); nni_list_append(&s->pipes, p); @@ -164,10 +173,10 @@ surv_pipe_start(void *arg) } static void -surv_pipe_stop(void *arg) +surv0_pipe_stop(void *arg) { - surv_pipe *p = arg; - surv_sock *s = p->psock; + surv0_pipe *p = arg; + surv0_sock *s = p->psock; nni_aio_stop(p->aio_getq); nni_aio_stop(p->aio_send); @@ -184,9 +193,9 @@ surv_pipe_stop(void *arg) } static void -surv_getq_cb(void *arg) +surv0_getq_cb(void *arg) { - surv_pipe *p = arg; + surv0_pipe *p = arg; if (nni_aio_result(p->aio_getq) != 0) { nni_pipe_stop(p->npipe); @@ -200,9 +209,9 @@ surv_getq_cb(void *arg) } static void -surv_send_cb(void *arg) +surv0_send_cb(void *arg) { - surv_pipe *p = arg; + surv0_pipe *p = arg; if (nni_aio_result(p->aio_send) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -215,9 +224,9 @@ surv_send_cb(void *arg) } static void -surv_putq_cb(void *arg) +surv0_putq_cb(void *arg) { - surv_pipe *p = arg; + surv0_pipe *p = arg; if (nni_aio_result(p->aio_putq) != 0) { nni_msg_free(nni_aio_get_msg(p->aio_putq)); @@ -230,10 +239,10 @@ surv_putq_cb(void *arg) } static void -surv_recv_cb(void *arg) +surv0_recv_cb(void *arg) { - surv_pipe *p = arg; - nni_msg * msg; + surv0_pipe *p = arg; + nni_msg * msg; if (nni_aio_result(p->aio_recv) != 0) { goto failed; @@ -265,10 +274,10 @@ failed: } static int -surv_sock_setopt_raw(void *arg, const void *buf, size_t sz) +surv0_sock_setopt_raw(void *arg, const void *buf, size_t sz) { - surv_sock *s = arg; - int rv; + surv0_sock *s = arg; + int rv; nni_mtx_lock(&s->mtx); if ((rv = nni_setopt_int(&s->raw, buf, sz, 0, 1)) == 0) { @@ -280,33 +289,33 @@ surv_sock_setopt_raw(void *arg, const void *buf, size_t sz) } static int -surv_sock_getopt_raw(void *arg, void *buf, size_t *szp) +surv0_sock_getopt_raw(void *arg, void *buf, size_t *szp) { - surv_sock *s = arg; + surv0_sock *s = arg; return (nni_getopt_int(s->raw, buf, szp)); } static int -surv_sock_setopt_surveytime(void *arg, const void *buf, size_t sz) +surv0_sock_setopt_surveytime(void *arg, const void *buf, size_t sz) { - surv_sock *s = arg; + surv0_sock *s = arg; return (nni_setopt_ms(&s->survtime, buf, sz)); } static int -surv_sock_getopt_surveytime(void *arg, void *buf, size_t *szp) +surv0_sock_getopt_surveytime(void *arg, void *buf, size_t *szp) { - surv_sock *s = arg; + surv0_sock *s = arg; return (nni_getopt_ms(s->survtime, buf, szp)); } static void -surv_sock_getq_cb(void *arg) +surv0_sock_getq_cb(void *arg) { - surv_sock *s = arg; - surv_pipe *p; - surv_pipe *last; - nni_msg * msg, *dup; + surv0_sock *s = arg; + surv0_pipe *p; + surv0_pipe *last; + nni_msg * msg, *dup; if (nni_aio_result(s->aio_getq) != 0) { // Should be NNG_ECLOSED. @@ -338,9 +347,9 @@ surv_sock_getq_cb(void *arg) } static void -surv_timeout(void *arg) +surv0_timeout(void *arg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_mtx_lock(&s->mtx); s->survid = 0; @@ -349,9 +358,9 @@ surv_timeout(void *arg) } static void -surv_sock_recv(void *arg, nni_aio *aio) +surv0_sock_recv(void *arg, nni_aio *aio) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_mtx_lock(&s->mtx); if (s->survid == 0) { @@ -364,11 +373,11 @@ surv_sock_recv(void *arg, nni_aio *aio) } static void -surv_sock_send(void *arg, nni_aio *aio) +surv0_sock_send(void *arg, nni_aio *aio) { - surv_sock *s = arg; - nni_msg * msg; - int rv; + surv0_sock *s = arg; + nni_msg * msg; + int rv; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -404,9 +413,9 @@ surv_sock_send(void *arg, nni_aio *aio) } static nni_msg * -surv_sock_filter(void *arg, nni_msg *msg) +surv0_sock_filter(void *arg, nni_msg *msg) { - surv_sock *s = arg; + surv0_sock *s = arg; nni_mtx_lock(&s->mtx); if (s->raw) { @@ -426,50 +435,50 @@ surv_sock_filter(void *arg, nni_msg *msg) return (msg); } -static nni_proto_pipe_ops surv_pipe_ops = { - .pipe_init = surv_pipe_init, - .pipe_fini = surv_pipe_fini, - .pipe_start = surv_pipe_start, - .pipe_stop = surv_pipe_stop, +static nni_proto_pipe_ops surv0_pipe_ops = { + .pipe_init = surv0_pipe_init, + .pipe_fini = surv0_pipe_fini, + .pipe_start = surv0_pipe_start, + .pipe_stop = surv0_pipe_stop, }; -static nni_proto_sock_option surv_sock_options[] = { +static nni_proto_sock_option surv0_sock_options[] = { { .pso_name = NNG_OPT_RAW, - .pso_getopt = surv_sock_getopt_raw, - .pso_setopt = surv_sock_setopt_raw, + .pso_getopt = surv0_sock_getopt_raw, + .pso_setopt = surv0_sock_setopt_raw, }, { .pso_name = NNG_OPT_SURVEYOR_SURVEYTIME, - .pso_getopt = surv_sock_getopt_surveytime, - .pso_setopt = surv_sock_setopt_surveytime, + .pso_getopt = surv0_sock_getopt_surveytime, + .pso_setopt = surv0_sock_setopt_surveytime, }, // terminate list { NULL, NULL, NULL }, }; -static nni_proto_sock_ops surv_sock_ops = { - .sock_init = surv_sock_init, - .sock_fini = surv_sock_fini, - .sock_open = surv_sock_open, - .sock_close = surv_sock_close, - .sock_send = surv_sock_send, - .sock_recv = surv_sock_recv, - .sock_filter = surv_sock_filter, - .sock_options = surv_sock_options, +static nni_proto_sock_ops surv0_sock_ops = { + .sock_init = surv0_sock_init, + .sock_fini = surv0_sock_fini, + .sock_open = surv0_sock_open, + .sock_close = surv0_sock_close, + .sock_send = surv0_sock_send, + .sock_recv = surv0_sock_recv, + .sock_filter = surv0_sock_filter, + .sock_options = surv0_sock_options, }; -static nni_proto surv_proto = { +static nni_proto surv0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, .proto_flags = NNI_PROTO_FLAG_SNDRCV, - .proto_sock_ops = &surv_sock_ops, - .proto_pipe_ops = &surv_pipe_ops, + .proto_sock_ops = &surv0_sock_ops, + .proto_pipe_ops = &surv0_pipe_ops, }; int nng_surveyor0_open(nng_socket *sidp) { - return (nni_proto_open(sidp, &surv_proto)); + return (nni_proto_open(sidp, &surv0_proto)); } diff --git a/src/protocol/survey0/survey.h b/src/protocol/survey0/survey.h new file mode 100644 index 00000000..a7b6d943 --- /dev/null +++ b/src/protocol/survey0/survey.h @@ -0,0 +1,30 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_PROTOCOL_SURVEY0_SURVEY_H +#define NNG_PROTOCOL_SURVEY0_SURVEY_H + +#ifdef __cplusplus +extern "C" { +#endif + +NNG_DECL int nng_surveyor0_open(nng_socket *); + +#ifndef nng_surveyor_open +#define nng_surveyor_open nng_surveyor0_open +#endif + +#define NNG_OPT_SURVEYOR_SURVEYTIME "surveyor:survey-time" + +#ifdef __cplusplus +} +#endif + +#endif // NNG_PROTOCOL_SURVEY0_SURVEY_H diff --git a/src/transport/inproc/CMakeLists.txt b/src/transport/inproc/CMakeLists.txt new file mode 100644 index 00000000..a445da85 --- /dev/null +++ b/src/transport/inproc/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# inproc protocol + +if (NNG_TRANSPORT_INPROC) + set(INPROC_SOURCES transport/inproc/inproc.c transport/inproc/inproc.h) + install(FILES inproc.h DESTINATION include/nng/transport/inproc) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${INPROC_SOURCES} PARENT_SCOPE) diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index c747f7dc..ae64263c 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -475,9 +475,8 @@ struct nni_tran nni_inproc_tran = { .tran_fini = nni_inproc_fini, }; - int nng_inproc_register(void) { - return (nni_tran_register(&nni_inproc_tran)); + return (nni_tran_register(&nni_inproc_tran)); } diff --git a/src/transport/ipc/CMakeLists.txt b/src/transport/ipc/CMakeLists.txt new file mode 100644 index 00000000..1a5496cf --- /dev/null +++ b/src/transport/ipc/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# ipc protocol + +if (NNG_TRANSPORT_IPC) + set(IPC_SOURCES transport/ipc/ipc.c transport/ipc/ipc.h) + install(FILES ipc.h DESTINATION include/nng/transport/ipc) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${IPC_SOURCES} PARENT_SCOPE) diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index afe8afa8..c13dbd34 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -704,9 +704,7 @@ static nni_tran_ep nni_ipc_ep_ops = { .ep_options = nni_ipc_ep_options, }; -// This is the IPC transport linkage, and should be the only global -// symbol in this entire file. -struct nni_tran nni_ipc_tran = { +static nni_tran nni_ipc_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "ipc", .tran_ep = &nni_ipc_ep_ops, @@ -714,3 +712,9 @@ struct nni_tran nni_ipc_tran = { .tran_init = nni_ipc_tran_init, .tran_fini = nni_ipc_tran_fini, }; + +int +nng_ipc_register(void) +{ + return (nni_tran_register(&nni_ipc_tran)); +} diff --git a/src/transport/ipc/ipc.h b/src/transport/ipc/ipc.h new file mode 100644 index 00000000..f19762c7 --- /dev/null +++ b/src/transport/ipc/ipc.h @@ -0,0 +1,19 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_TRANSPORT_IPC_IPC_H +#define NNG_TRANSPORT_IPC_IPC_H + +// ipc transport. This is used for inter-process communication on +// the same host computer. + +extern int nng_ipc_register(void); + +#endif // NNG_TRANSPORT_IPC_IPC_H diff --git a/src/transport/tcp/CMakeLists.txt b/src/transport/tcp/CMakeLists.txt new file mode 100644 index 00000000..305c357a --- /dev/null +++ b/src/transport/tcp/CMakeLists.txt @@ -0,0 +1,18 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# TCP protocol + +if (NNG_TRANSPORT_TCP) + set(TCP_SOURCES transport/tcp/tcp.c transport/tcp/tcp.h) + install(FILES tcp.h DESTINATION include/nng/transport/tcp) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${TCP_SOURCES} PARENT_SCOPE) diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 43b2890d..e33db865 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -854,9 +854,7 @@ static nni_tran_ep nni_tcp_ep_ops = { .ep_options = nni_tcp_ep_options, }; -// This is the TCP transport linkage, and should be the only global -// symbol in this entire file. -struct nni_tran nni_tcp_tran = { +static nni_tran nni_tcp_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "tcp", .tran_ep = &nni_tcp_ep_ops, @@ -864,3 +862,9 @@ struct nni_tran nni_tcp_tran = { .tran_init = nni_tcp_tran_init, .tran_fini = nni_tcp_tran_fini, }; + +int +nng_tcp_register(void) +{ + return (nni_tran_register(&nni_tcp_tran)); +} diff --git a/src/transport/tcp/tcp.h b/src/transport/tcp/tcp.h new file mode 100644 index 00000000..b4c79461 --- /dev/null +++ b/src/transport/tcp/tcp.h @@ -0,0 +1,18 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef NNG_TRANSPORT_TCP_TCP_H +#define NNG_TRANSPORT_TCP_TCP_H + +// TCP transport. This is used for communication over TCP/IP. + +extern int nng_tcp_register(void); + +#endif // NNG_TRANSPORT_TCP_TCP_H diff --git a/src/transport/zerotier/CMakeLists.txt b/src/transport/zerotier/CMakeLists.txt new file mode 100644 index 00000000..c1bb0c35 --- /dev/null +++ b/src/transport/zerotier/CMakeLists.txt @@ -0,0 +1,50 @@ +# +# Copyright 2017 Garrett D'Amore <garrett@damore.org> +# Copyright 2017 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# ZeroTier protocol + +set (NNG_TRANSPORT_ZEROTIER_SOURCE "" CACHE PATH "Location of ZeroTier source tree.") + +if (NNG_TRANSPORT_ZEROTIER) + + # We use the libzerotiercore.a library, which is unfortunately a C++ object + # even though it exposes only public C symbols. It would be extremely + # helpful if libzerotiercore didn't make us carry the whole C++ runtime + # behind us. The user must specify the location of the ZeroTier source + # tree (dev branch for now, and already compiled please) by setting the + # NNG_ZEROTIER_SOURCE macro. + # NB: This needs to be the zerotierone tree, not the libzt library. + # This is because we don't access the API, but instead use the low + # level zerotiercore functionality directly. + # NB: As we wind up linking libzerotiercore.a into the application, + # this means that your application will *also* need to either be licensed + # under the GPLv3, or you will need to have a commercial license from + # ZeroTier permitting its use elsewhere. + + enable_language(CXX) + find_library(NNG_LIBZTCORE zerotiercore PATHS ${NNG_TRANSPORT_ZEROTIER_SOURCE}) + if (NNG_LIBZTCORE) + set(CMAKE_REQUIRED_INCLUDES ${NNG_TRANSPORT_ZEROTIER_SOURCE}/include) + message(STATUS "C++ ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}") + set(CMAKE_REQUIRED_LIBRARIES ${CMAKE_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}) + set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES} PARENT_SCOPE) + set(NNG_REQUIRED_INCLUDES ${NNG_REQUIRED_INCLUDES} ${NNG_TRANSPORT_ZEROTIER_SOURCE}/include PARENT_SCOPE) + nng_check_sym(ZT_Node_join ZeroTierOne.h HAVE_ZTCORE) + endif() + if (NOT HAVE_ZTCORE) + message (FATAL_ERROR "Cannot find ZeroTier components") + endif() + message(STATUS "Found ZeroTier at ${NNG_LIBZTCORE}") + + set(ZT_SOURCES transport/zerotier/zerotier.c transport/zerotier/zerotier.h) + install(FILES zerotier.h DESTINATION include/nng/transport/zerotier) +endif() + +set(NNG_SOURCES ${NNG_SOURCES} ${ZT_SOURCES} PARENT_SCOPE) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 7cb4786f..3263e0a5 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -61,20 +61,37 @@ if (NNG_TESTS) math (EXPR TEST_PORT "${TEST_PORT}+20") endmacro (add_nng_test) - macro (add_nng_compat_test NAME TIMEOUT) - list (APPEND all_tests ${NAME}) - add_executable (${NAME} ${NAME}.c) - target_link_libraries (${NAME} ${PROJECT_NAME}_static) - target_link_libraries (${NAME} ${NNG_REQUIRED_LIBRARIES}) - target_compile_definitions(${NAME} PUBLIC -DNNG_STATIC_LIB) - if (CMAKE_THREAD_LIBS_INIT) - target_link_libraries (${NAME} "${CMAKE_THREAD_LIBS_INIT}") - endif() + # Compatibility tests are only added if all of the legacy protocols + # are present. It's not worth trying to figure out which of these + # should work and which shouldn't. + if (NNG_PROTO_BUS0 AND + NNG_PROTO_PAIR0 AND + NNG_PROTO_REQ0 AND + NNG_PROTO_REP0 AND + NNG_PROTO_PUB0 AND + NNG_PROTO_SUB0 AND + NNG_PROTO_PUSH0 AND + NNG_PROTO_PULL0) + + macro (add_nng_compat_test NAME TIMEOUT) + list (APPEND all_tests ${NAME}) + add_executable (${NAME} ${NAME}.c) + target_link_libraries (${NAME} ${PROJECT_NAME}_static) + target_link_libraries (${NAME} ${NNG_REQUIRED_LIBRARIES}) + target_compile_definitions(${NAME} PUBLIC -DNNG_STATIC_LIB) + if (CMAKE_THREAD_LIBS_INIT) + target_link_libraries (${NAME} "${CMAKE_THREAD_LIBS_INIT}") + endif() - add_test (NAME ${NAME} COMMAND ${NAME} ${TEST_PORT}) - set_tests_properties (${NAME} PROPERTIES TIMEOUT ${TIMEOUT}) - math (EXPR TEST_PORT "${TEST_PORT}+10") - endmacro (add_nng_compat_test) + add_test (NAME ${NAME} COMMAND ${NAME} ${TEST_PORT}) + set_tests_properties (${NAME} PROPERTIES TIMEOUT ${TIMEOUT}) + math (EXPR TEST_PORT "${TEST_PORT}+10") + endmacro (add_nng_compat_test) + else () + macro (add_nng_compat_test NAME TIMEOUT) + endmacro (add_nng_compat_test) + message (STATUS "Compatibility tests disabled (unconfigured legacy protocols)") + endif () macro (add_nng_cpp_test NAME TIMEOUT) if (NOT NNG_ENABLE_COVERAGE) @@ -130,11 +147,13 @@ add_nng_test(device 5) add_nng_test(errors 2) add_nng_test(pair1 5) add_nng_test(udp 5) -if (NNG_HAVE_ZEROTIER) - add_nng_test(zt 60) -endif() +add_nng_test(zt 60) # compatbility tests +# We only support these if ALL the legacy protocols are supported. This +# is because we don't want to make modifications to partially enable some +# of these tests. Folks minimizing the library probably don't care too +# much about these anyway. add_nng_compat_test(compat_block 5) add_nng_compat_test(compat_bug777 5) add_nng_compat_test(compat_bus 5) diff --git a/tests/aio.c b/tests/aio.c index 2a31319d..0746fa51 100644 --- a/tests/aio.c +++ b/tests/aio.c @@ -11,6 +11,10 @@ #include "convey.h" #include "nng.h" +#include "protocol/pair1/pair.h" + +#include "stubs.h" + #include <string.h> #define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) diff --git a/tests/bus.c b/tests/bus.c index 8283bf21..88137b81 100644 --- a/tests/bus.c +++ b/tests/bus.c @@ -11,6 +11,10 @@ #include "convey.h" #include "nng.h" +#include "protocol/bus0/bus.h" + +#include "stubs.h" + #include <string.h> #define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) diff --git a/tests/cplusplus_pair.cc b/tests/cplusplus_pair.cc index 54a99737..11ce1912 100644 --- a/tests/cplusplus_pair.cc +++ b/tests/cplusplus_pair.cc @@ -8,59 +8,68 @@ // #include "nng.h" +#include "protocol/pair1/pair.h" #include <cstring> +#include <iostream> #define SOCKET_ADDRESS "inproc://c++" int main(int argc, char **argv) { - nng_socket s1; - nng_socket s2; - int rv; - size_t sz; - char buf[8]; - if ((rv = nng_pair0_open(&s1)) != 0) { - throw nng_strerror(rv); - } - if ((rv = nng_pair0_open(&s2)) != 0) { - throw nng_strerror(rv); - } - if ((rv = nng_listen(s1, SOCKET_ADDRESS, NULL, 0)) != 0) { - throw nng_strerror(rv); - } - if ((rv = nng_dial(s2, SOCKET_ADDRESS, NULL, 0)) != 0) { - throw nng_strerror(rv); - } - if ((rv = nng_send(s2, (void *)"ABC", 4, 0)) != 0) { - throw nng_strerror(rv); - } - sz = sizeof (buf); - if ((rv = nng_recv(s1, buf, &sz, 0)) != 0) { - throw nng_strerror(rv); - } - if ((sz != 4) || (memcmp(buf, "ABC", 4) != 0)) { - throw "Contents did not match"; - } - if ((rv = nng_send(s1, (void *)"DEF", 4, 0)) != 0) { - throw nng_strerror(rv); - } - sz = sizeof (buf); - if ((rv = nng_recv(s2, buf, &sz, 0)) != 0) { - throw nng_strerror(rv); - } - if ((sz != 4) || (memcmp(buf, "DEF", 4) != 0)) { - throw "Contents did not match"; - } - if ((rv = nng_close(s1)) != 0) { - throw nng_strerror(rv); - } - if ((rv = nng_close(s2)) != 0) { - throw nng_strerror(rv); - } +#if defined(NNG_HAVE_PAIR1) - return (0); -} + nng_socket s1; + nng_socket s2; + int rv; + size_t sz; + char buf[8]; + + if ((rv = nng_pair1_open(&s1)) != 0) { + throw nng_strerror(rv); + } + if ((rv = nng_pair1_open(&s2)) != 0) { + throw nng_strerror(rv); + } + if ((rv = nng_listen(s1, SOCKET_ADDRESS, NULL, 0)) != 0) { + throw nng_strerror(rv); + } + if ((rv = nng_dial(s2, SOCKET_ADDRESS, NULL, 0)) != 0) { + throw nng_strerror(rv); + } + if ((rv = nng_send(s2, (void *) "ABC", 4, 0)) != 0) { + throw nng_strerror(rv); + } + sz = sizeof(buf); + if ((rv = nng_recv(s1, buf, &sz, 0)) != 0) { + throw nng_strerror(rv); + } + if ((sz != 4) || (memcmp(buf, "ABC", 4) != 0)) { + throw "Contents did not match"; + } + if ((rv = nng_send(s1, (void *) "DEF", 4, 0)) != 0) { + throw nng_strerror(rv); + } + sz = sizeof(buf); + if ((rv = nng_recv(s2, buf, &sz, 0)) != 0) { + throw nng_strerror(rv); + } + if ((sz != 4) || (memcmp(buf, "DEF", 4) != 0)) { + throw "Contents did not match"; + } + if ((rv = nng_close(s1)) != 0) { + throw nng_strerror(rv); + } + if ((rv = nng_close(s2)) != 0) { + throw nng_strerror(rv); + } + std::cout << "Pass." << std::endl; +#else + std::cout << "Skipped (protocol unconfigured)." << std::endl; +#endif + + return (0); +} diff --git a/tests/device.c b/tests/device.c index 5c650a91..56ad2cb5 100644 --- a/tests/device.c +++ b/tests/device.c @@ -10,6 +10,8 @@ #include "convey.h" #include "nng.h" +#include "protocol/pair1/pair.h" +#include "stubs.h" #include <string.h> @@ -68,8 +70,8 @@ Main({ So(nng_listen(dev1, addr1, NULL, 0) == 0); So(nng_listen(dev2, addr2, NULL, 0) == 0); - So(nng_pair_open(&end1) == 0); - So(nng_pair_open(&end2) == 0); + So(nng_pair1_open(&end1) == 0); + So(nng_pair1_open(&end2) == 0); So(nng_dial(end1, addr1, NULL, 0) == 0); So(nng_dial(end2, addr2, NULL, 0) == 0); diff --git a/tests/pair1.c b/tests/pair1.c index 9ed73037..cd3a1035 100644 --- a/tests/pair1.c +++ b/tests/pair1.c @@ -10,8 +10,11 @@ #include "convey.h" #include "nng.h" +#include "protocol/pair1/pair.h" #include "trantest.h" +#include "stubs.h" + #include <string.h> #define SECOND(x) ((x) *1000) diff --git a/tests/pipeline.c b/tests/pipeline.c index 67be57c6..96ae3d17 100644 --- a/tests/pipeline.c +++ b/tests/pipeline.c @@ -10,6 +10,10 @@ #include "convey.h" #include "nng.h" +#include "protocol/pipeline0/pull.h" +#include "protocol/pipeline0/push.h" +#include "stubs.h" + #include <string.h> #define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) diff --git a/tests/pollfd.c b/tests/pollfd.c index e1965b09..8526c705 100644 --- a/tests/pollfd.c +++ b/tests/pollfd.c @@ -8,9 +8,6 @@ // found online at https://opensource.org/licenses/MIT. // -#include "convey.h" -#include "nng.h" - #include <string.h> #ifndef _WIN32 @@ -32,14 +29,21 @@ #endif +#include "convey.h" +#include "nng.h" +#include "protocol/pair1/pair.h" +#include "protocol/pipeline0/pull.h" +#include "protocol/pipeline0/push.h" +#include "stubs.h" + TestMain("Poll FDs", { Convey("Given a connected pair of sockets", { nng_socket s1; nng_socket s2; - So(nng_pair_open(&s1) == 0); - So(nng_pair_open(&s2) == 0); + So(nng_pair1_open(&s1) == 0); + So(nng_pair1_open(&s2) == 0); Reset({ nng_close(s1); nng_close(s2); @@ -106,26 +110,25 @@ TestMain("Poll FDs", { So(nng_getopt(s1, NNG_OPT_RECVFD, &fd, &sz) == 0); So(sz == sizeof(fd)); }); - Convey("We cannot get a send FD for PULL", { - nng_socket s3; - int fd; - size_t sz; - So(nng_pull_open(&s3) == 0); - Reset({ nng_close(s3); }); - sz = sizeof(fd); - So(nng_getopt(s3, NNG_OPT_SENDFD, &fd, &sz) == - NNG_ENOTSUP); - }); + }); - Convey("We cannot get a recv FD for PUSH", { - nng_socket s3; - int fd; - size_t sz; - So(nng_push_open(&s3) == 0); - Reset({ nng_close(s3); }); - sz = sizeof(fd); - So(nng_getopt(s3, NNG_OPT_RECVFD, &fd, &sz) == - NNG_ENOTSUP); - }); + Convey("We cannot get a send FD for PULL", { + nng_socket s3; + int fd; + size_t sz; + So(nng_pull0_open(&s3) == 0); + Reset({ nng_close(s3); }); + sz = sizeof(fd); + So(nng_getopt(s3, NNG_OPT_SENDFD, &fd, &sz) == NNG_ENOTSUP); + }); + + Convey("We cannot get a recv FD for PUSH", { + nng_socket s3; + int fd; + size_t sz; + So(nng_push0_open(&s3) == 0); + Reset({ nng_close(s3); }); + sz = sizeof(fd); + So(nng_getopt(s3, NNG_OPT_RECVFD, &fd, &sz) == NNG_ENOTSUP); }); }) diff --git a/tests/pubsub.c b/tests/pubsub.c index a8209a7d..796d951e 100644 --- a/tests/pubsub.c +++ b/tests/pubsub.c @@ -10,6 +10,9 @@ #include "convey.h" #include "nng.h" +#include "protocol/pubsub0/pub.h" +#include "protocol/pubsub0/sub.h" +#include "stubs.h" #include <string.h> @@ -34,6 +37,16 @@ TestMain("PUB/SUB pattern", { nng_msg *msg; So(nng_recvmsg(pub, &msg, 0) == NNG_ENOTSUP); }); + + Convey("It cannot subscribe", { + So(nng_setopt(pub, NNG_OPT_SUB_SUBSCRIBE, "", 0) == + NNG_ENOTSUP); + }); + + Convey("It cannot unsubscribe", { + So(nng_setopt(pub, NNG_OPT_SUB_UNSUBSCRIBE, "", 0) == + NNG_ENOTSUP); + }); }); Convey("We can create a SUB socket", { @@ -48,6 +61,23 @@ TestMain("PUB/SUB pattern", { So(nng_sendmsg(sub, msg, 0) == NNG_ENOTSUP); nng_msg_free(msg); }); + + Convey("It can subscribe", { + So(nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "ABC", 3) == + 0); + So(nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0) == 0); + Convey("And it can unsubscribe", { + So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, + "ABC", 3) == 0); + So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, "", + 0) == 0); + + So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, "", + 0) == NNG_ENOENT); + So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, + "HELLO", 0) == NNG_ENOENT); + }); + }); }); Convey("We can create a linked PUB/SUB pair", { @@ -73,28 +103,6 @@ TestMain("PUB/SUB pattern", { nng_msleep(20); // give time for connecting threads - Convey("Sub can subscribe", { - So(nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "ABC", 3) == - 0); - So(nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0) == 0); - Convey("Unsubscribe works", { - So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, - "ABC", 3) == 0); - So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, "", - 0) == 0); - - So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, "", - 0) == NNG_ENOENT); - So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, - "HELLO", 0) == NNG_ENOENT); - }); - }); - - Convey("Pub cannot subscribe", { - So(nng_setopt(pub, NNG_OPT_SUB_SUBSCRIBE, "", 0) == - NNG_ENOTSUP); - }); - Convey("Subs can receive from pubs", { nng_msg *msg; diff --git a/tests/reconnect.c b/tests/reconnect.c index 8179055d..e191f70a 100644 --- a/tests/reconnect.c +++ b/tests/reconnect.c @@ -10,6 +10,10 @@ #include "convey.h" #include "nng.h" +#include "protocol/pipeline0/pull.h" +#include "protocol/pipeline0/push.h" +#include "stubs.h" + #include <string.h> #define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) diff --git a/tests/reqrep.c b/tests/reqrep.c index df0621bc..4e837c34 100644 --- a/tests/reqrep.c +++ b/tests/reqrep.c @@ -10,6 +10,9 @@ #include "convey.h" #include "nng.h" +#include "protocol/reqrep0/rep.h" +#include "protocol/reqrep0/req.h" +#include "stubs.h" #include <string.h> diff --git a/tests/resolv.c b/tests/resolv.c index 49d258c3..0678b05f 100644 --- a/tests/resolv.c +++ b/tests/resolv.c @@ -51,26 +51,6 @@ ip6tostr(void *addr) // too much on them, since localhost can be configured weirdly. Notably // the normal assumptions on Linux do *not* hold true. #if 0 - - Convey("Localhost IPv4 resolves", { - nni_aio aio; - const char *str; - memset(&aio, 0, sizeof (aio)); - nni_aio_init(&aio, NULL, NULL); - nni_plat_tcp_resolv("localhost", "80", NNG_AF_INET, 1, - &aio); - nni_aio_wait(&aio); - So(nni_aio_result(&aio) == 0); - So(aio.a_naddrs == 1); - So(aio.a_addrs[0].s_un.s_in.sa_family == NNG_AF_INET); - So(aio.a_addrs[0].s_un.s_in.sa_port == ntohs(80)); - So(aio.a_addrs[0].s_un.s_in.sa_addr == ntohl(0x7f000001)); - str = ip4tostr(&aio.a_addrs[0].s_un.s_in.sa_addr); - So(strcmp(str, "127.0.0.1") == 0); - nni_aio_fini(&aio); - } - ); - Convey("Localhost IPv6 resolves", { nni_aio aio; memset(&aio, 0, sizeof (aio)); @@ -87,44 +67,6 @@ ip6tostr(void *addr) So(strcmp(str, "::1") == 0); nni_aio_fini(&aio); } - ); - Convey("Localhost UNSPEC resolves", { - nni_aio aio; - memset(&aio, 0, sizeof (aio)); - const char *str; - int i; - nni_aio_init(&aio, NULL, NULL); - nni_plat_tcp_resolv("localhost", "80", NNG_AF_UNSPEC, 1, - &aio); - nni_aio_wait(&aio); - So(nni_aio_result(&aio) == 0); - So(aio.a_naddrs == 2); - for (i = 0; i < 2; i++) { - switch (aio.a_addrs[i].s_un.s_family) { - case NNG_AF_INET6: - So(aio.a_addrs[i].s_un.s_in6.sa_port == - ntohs(80)); - str = - ip6tostr(&aio.a_addrs[i].s_un.s_in6.sa_addr); - So(strcmp(str, "::1") == 0); - break; - - case NNG_AF_INET: - So(aio.a_addrs[i].s_un.s_in.sa_port == - ntohs(80)); - str = - ip4tostr(&aio.a_addrs[i].s_un.s_in.sa_addr); - So(strcmp(str, "127.0.0.1") == 0); - break; - default: - So(1 == 0); - } - } - So(aio.a_addrs[0].s_un.s_family != - aio.a_addrs[1].s_un.s_family); - nni_aio_fini(&aio); - } - ); #endif TestMain("Resolver", { @@ -179,11 +121,18 @@ TestMain("Resolver", { So(strcmp(str, "8.8.4.4") == 0); nni_aio_fini(aio); }); + Convey("Numeric v6 resolves", { nni_aio * aio; const char * str; nng_sockaddr sa; + // Travis CI has moved some of their services to host that + // apparently don't support IPv6 at all. This is very sad. + if (getenv("TRAVIS") != NULL) { + ConveySkip("IPv6 missing from CI provider"); + } + nni_aio_init(&aio, NULL, NULL); aio->a_addr = &sa; nni_plat_tcp_resolv("::1", "80", NNG_AF_INET6, 1, aio); @@ -230,5 +179,51 @@ TestMain("Resolver", { nni_aio_fini(aio); }); + Convey("Localhost IPv4 resolves", { + nni_aio * aio; + const char * str; + nng_sockaddr sa; + + nni_aio_init(&aio, NULL, NULL); + aio->a_addr = &sa; + nni_plat_tcp_resolv("localhost", "80", NNG_AF_INET, 1, aio); + nni_aio_wait(aio); + So(nni_aio_result(aio) == 0); + So(sa.s_un.s_in.sa_family == NNG_AF_INET); + So(sa.s_un.s_in.sa_port == ntohs(80)); + So(sa.s_un.s_in.sa_addr == ntohl(0x7f000001)); + str = ip4tostr(&sa.s_un.s_in.sa_addr); + So(strcmp(str, "127.0.0.1") == 0); + nni_aio_fini(aio); + }); + + Convey("Localhost UNSPEC resolves", { + nni_aio * aio; + const char * str; + nng_sockaddr sa; + + nni_aio_init(&aio, NULL, NULL); + aio->a_addr = &sa; + nni_plat_tcp_resolv("localhost", "80", NNG_AF_UNSPEC, 1, aio); + nni_aio_wait(aio); + So(nni_aio_result(aio) == 0); + So((sa.s_un.s_family == NNG_AF_INET) || + (sa.s_un.s_family == NNG_AF_INET6)); + switch (sa.s_un.s_family) { + case NNG_AF_INET: + So(sa.s_un.s_in.sa_port == ntohs(80)); + So(sa.s_un.s_in.sa_addr == ntohl(0x7f000001)); + str = ip4tostr(&sa.s_un.s_in.sa_addr); + So(strcmp(str, "127.0.0.1") == 0); + break; + case NNG_AF_INET6: + So(sa.s_un.s_in6.sa_port == ntohs(80)); + str = ip6tostr(&sa.s_un.s_in6.sa_addr); + So(strcmp(str, "::1") == 0); + break; + } + nni_aio_fini(aio); + }); + nni_fini(); }) diff --git a/tests/scalability.c b/tests/scalability.c index b1a9b767..d869d5fc 100644 --- a/tests/scalability.c +++ b/tests/scalability.c @@ -11,6 +11,10 @@ #include "convey.h" #include "nng.h" +#include "protocol/reqrep0/rep.h" +#include "protocol/reqrep0/req.h" +#include "stubs.h" + #include <string.h> static int nclients = 200; diff --git a/tests/sock.c b/tests/sock.c index 1c277b46..d3af414a 100644 --- a/tests/sock.c +++ b/tests/sock.c @@ -12,6 +12,9 @@ #include "nng.h" #include "trantest.h" +#include "protocol/pubsub0/sub.h" + +#include "protocol/pair1/pair.h" #include "stubs.h" #include <string.h> diff --git a/tests/stubs.h b/tests/stubs.h index 780ac772..0641c970 100644 --- a/tests/stubs.h +++ b/tests/stubs.h @@ -44,4 +44,51 @@ getms(void) #endif } +int +nosocket(nng_socket *s) +{ + ConveySkip("Protocol unconfigured"); + return (NNG_ENOTSUP); +} + +#ifndef NNG_HAVE_REQ0 +#define nng_req0_open nosocket +#endif + +#ifndef NNG_HAVE_REP0 +#define nng_rep0_open nosocket +#endif + +#ifndef NNG_HAVE_PUB0 +#define nng_pub0_open nosocket +#endif + +#ifndef NNG_HAVE_SUB0 +#define nng_sub0_open nosocket +#endif + +#ifndef NNG_HAVE_PAIR0 +#define nng_pair0_open nosocket +#endif + +#ifndef NNG_HAVE_PAIR1 +#define nng_pair1_open nosocket +#endif + +#ifndef NNG_HAVE_PUSH0 +#define nng_push0_open nosocket +#endif + +#ifndef NNG_HAVE_PULL0 +#define nng_pull0_open nosocket +#endif + +#ifndef NNG_HAVE_SURVEYOR0 +#define nng_surveyor0_open nosocket +#endif + +#ifndef NNG_HAVE_RESPONDENT0 +#define nng_respondent0_open nosocket +#endif + #endif // STUBS_H diff --git a/tests/survey.c b/tests/survey.c index f4fab009..a3a7ba1d 100644 --- a/tests/survey.c +++ b/tests/survey.c @@ -9,7 +9,11 @@ // #include "convey.h" + #include "nng.h" +#include "protocol/survey0/respond.h" +#include "protocol/survey0/survey.h" +#include "stubs.h" #include <string.h> diff --git a/tests/tcp.c b/tests/tcp.c index 79fe1893..fdcf458c 100644 --- a/tests/tcp.c +++ b/tests/tcp.c @@ -9,8 +9,11 @@ // #include "convey.h" +#include "nng.h" +#include "protocol/pair1/pair.h" #include "trantest.h" +#include "stubs.h" // TCP tests. #ifndef _WIN32 diff --git a/tests/tcp6.c b/tests/tcp6.c index b1695c84..a3e55d18 100644 --- a/tests/tcp6.c +++ b/tests/tcp6.c @@ -13,6 +13,9 @@ #include "core/nng_impl.h" // TCP tests for IPv6. +#include "protocol/pair1/pair.h" + +#include "stubs.h" static int has_v6(void) diff --git a/tests/trantest.h b/tests/trantest.h index 37763d24..d334c257 100644 --- a/tests/trantest.h +++ b/tests/trantest.h @@ -11,6 +11,8 @@ #include "convey.h" #include "core/nng_impl.h" #include "nng.h" +#include "protocol/reqrep0/rep.h" +#include "protocol/reqrep0/req.h" #include <stdlib.h> #include <string.h> @@ -30,9 +32,54 @@ unsigned trantest_port = 0; typedef int (*trantest_proptest_t)(nng_msg *, nng_listener, nng_dialer); +#ifndef NNG_HAVE_ZEROTIER +#define nng_zt_register notransport +#endif +#ifndef NNG_HAVE_INPROC +#define nng_inproc_register notransport +#endif +#ifndef NNG_HAVE_IPC +#define nng_ipc_register notransport +#endif +#ifndef NNG_HAVE_TCP +#define nng_tcp_register notransport +#endif + +int +notransport(void) +{ + ConveySkip("Transport not configured"); + return (NNG_ENOTSUP); +} + +#define CHKTRAN(s, t) \ + if (strncmp(s, t, strlen(t)) == 0) \ + notransport() + +void +trantest_checktran(const char *url) +{ +#ifndef NNG_HAVE_ZEROTIER + CHKTRAN(url, "zt:"); +#endif +#ifndef NNG_HAVE_INPROC + CHKTRAN(url, "inproc:"); +#endif +#ifndef NNG_HAVE_IPC + CHKTRAN(url, "ipc:"); +#endif +#ifndef NNG_HAVE_TCP + CHKTRAN(url, "tcp:"); +#endif + + (void) url; +} + void trantest_next_address(char *out, const char *template) { + trantest_checktran(template); + if (trantest_port == 0) { char *pstr; trantest_port = 5555; @@ -56,11 +103,16 @@ void trantest_init(trantest *tt, const char *addr) { trantest_next_address(tt->addr, addr); + +#if defined(NNG_HAVE_REQ0) && defined(NNG_HAVE_REP0) So(nng_req_open(&tt->reqsock) == 0); So(nng_rep_open(&tt->repsock) == 0); tt->tran = nni_tran_find(addr); So(tt->tran != NULL); +#else + ConveySkip("Missing REQ or REP protocols"); +#endif } void @@ -9,9 +9,12 @@ // #include "convey.h" +#include "nng.h" +#include "protocol/pair0/pair.h" +#include "transport/zerotier/zerotier.h" #include "trantest.h" -#include "transport/zerotier/zerotier.h" +#include "stubs.h" // zerotier tests. @@ -35,6 +38,10 @@ mkdir(const char *path, int mode) #include <unistd.h> #endif // WIN32 +#ifndef NNG_HAVE_ZEROTIER +#define nng_zt_network_status_ok 0 +#endif + static int check_props(nng_msg *msg, nng_listener l, nng_dialer d) { @@ -197,6 +204,8 @@ TestMain("ZeroTier Transport", { char addr[NNG_MAXADDRLEN]; int rv; + So(nng_zt_register() == 0); + snprintf(addr, sizeof(addr), "zt://" NWID ":%u", port); So(nng_pair_open(&s) == 0); @@ -240,6 +249,8 @@ TestMain("ZeroTier Transport", { // uint64_t node = 0xb000072fa6ull; // my personal host uint64_t node = 0x2d2f619cccull; // my personal host + So(nng_zt_register() == 0); + snprintf(addr, sizeof(addr), "zt://" NWID "/%llx:%u", (unsigned long long) node, port); @@ -258,6 +269,8 @@ TestMain("ZeroTier Transport", { uint64_t node1 = 0; uint64_t node2 = 0; + So(nng_zt_register() == 0); + snprintf(addr, sizeof(addr), "zt://" NWID ":%u", port); So(nng_pair_open(&s) == 0); @@ -306,6 +319,7 @@ TestMain("ZeroTier Transport", { port = 9944; // uint64_t node = 0xb000072fa6ull; // my personal host + So(nng_zt_register() == 0); snprintf(addr1, sizeof(addr1), "zt://" NWID ":%u", port); |
