aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt109
-rw-r--r--docs/nng_bus.adoc6
-rw-r--r--docs/nng_inproc.adoc17
-rw-r--r--docs/nng_ipc.adoc110
-rw-r--r--docs/nng_pair.adoc155
-rw-r--r--docs/nng_pub.adoc6
-rw-r--r--docs/nng_pull.adoc85
-rw-r--r--docs/nng_push.adoc93
-rw-r--r--docs/nng_sub.adoc6
-rw-r--r--docs/nng_tcp.adoc137
-rw-r--r--perf/perf.c18
-rw-r--r--src/CMakeLists.txt49
-rw-r--r--src/core/msgqueue.c1
-rw-r--r--src/core/protocol.h35
-rw-r--r--src/core/socket.c1
-rw-r--r--src/core/transport.c40
-rw-r--r--src/nng.h36
-rw-r--r--src/nng_compat.c36
-rw-r--r--src/protocol/bus0/CMakeLists.txt18
-rw-r--r--src/protocol/bus0/bus.c (renamed from src/protocol/bus/bus.c)205
-rw-r--r--src/protocol/bus0/bus.h28
-rw-r--r--src/protocol/pair0/CMakeLists.txt18
-rw-r--r--src/protocol/pair0/pair.c (renamed from src/protocol/pair/pair_v0.c)4
-rw-r--r--src/protocol/pair0/pair.h28
-rw-r--r--src/protocol/pair1/CMakeLists.txt18
-rw-r--r--src/protocol/pair1/pair.c (renamed from src/protocol/pair/pair_v1.c)6
-rw-r--r--src/protocol/pair1/pair.h30
-rw-r--r--src/protocol/pipeline0/CMakeLists.txt23
-rw-r--r--src/protocol/pipeline0/pull.c (renamed from src/protocol/pipeline/pull.c)146
-rw-r--r--src/protocol/pipeline0/pull.h28
-rw-r--r--src/protocol/pipeline0/push.c (renamed from src/protocol/pipeline/push.c)138
-rw-r--r--src/protocol/pipeline0/push.h28
-rw-r--r--src/protocol/pubsub0/CMakeLists.txt23
-rw-r--r--src/protocol/pubsub0/pub.c (renamed from src/protocol/pubsub/pub.c)169
-rw-r--r--src/protocol/pubsub0/pub.h28
-rw-r--r--src/protocol/pubsub0/sub.c (renamed from src/protocol/pubsub/sub.c)184
-rw-r--r--src/protocol/pubsub0/sub.h31
-rw-r--r--src/protocol/reqrep0/CMakeLists.txt23
-rw-r--r--src/protocol/reqrep0/rep.c (renamed from src/protocol/reqrep/rep.c)227
-rw-r--r--src/protocol/reqrep0/rep.h28
-rw-r--r--src/protocol/reqrep0/req.c (renamed from src/protocol/reqrep/req.c)239
-rw-r--r--src/protocol/reqrep0/req.h30
-rw-r--r--src/protocol/survey0/CMakeLists.txt23
-rw-r--r--src/protocol/survey0/respond.c (renamed from src/protocol/survey/respond.c)231
-rw-r--r--src/protocol/survey0/respond.h28
-rw-r--r--src/protocol/survey0/survey.c (renamed from src/protocol/survey/survey.c)205
-rw-r--r--src/protocol/survey0/survey.h30
-rw-r--r--src/transport/inproc/CMakeLists.txt18
-rw-r--r--src/transport/inproc/inproc.c3
-rw-r--r--src/transport/ipc/CMakeLists.txt18
-rw-r--r--src/transport/ipc/ipc.c10
-rw-r--r--src/transport/ipc/ipc.h19
-rw-r--r--src/transport/tcp/CMakeLists.txt18
-rw-r--r--src/transport/tcp/tcp.c10
-rw-r--r--src/transport/tcp/tcp.h18
-rw-r--r--src/transport/zerotier/CMakeLists.txt50
-rw-r--r--tests/CMakeLists.txt51
-rw-r--r--tests/aio.c4
-rw-r--r--tests/bus.c4
-rw-r--r--tests/cplusplus_pair.cc99
-rw-r--r--tests/device.c6
-rw-r--r--tests/pair1.c3
-rw-r--r--tests/pipeline.c4
-rw-r--r--tests/pollfd.c53
-rw-r--r--tests/pubsub.c52
-rw-r--r--tests/reconnect.c4
-rw-r--r--tests/reqrep.c3
-rw-r--r--tests/resolv.c111
-rw-r--r--tests/scalability.c4
-rw-r--r--tests/sock.c3
-rw-r--r--tests/stubs.h47
-rw-r--r--tests/survey.c4
-rw-r--r--tests/tcp.c3
-rw-r--r--tests/tcp6.c3
-rw-r--r--tests/trantest.h52
-rw-r--r--tests/zt.c16
76 files changed, 2686 insertions, 1161 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c2815ca3..6cd49b9f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -93,11 +93,83 @@ option (NNG_TESTS "Build and run tests" ON)
option (NNG_TOOLS "Build extra tools" OFF)
option (NNG_ENABLE_NNGCAT "Enable building nngcat utility." ${NNG_TOOLS})
option (NNG_ENABLE_COVERAGE "Enable coverage reporting." OFF)
-option (NNG_ENABLE_ZEROTIER "Enable ZeroTier transport (requires libzerotiercore)." OFF)
-set (NNG_ZEROTIER_SOURCE "" CACHE PATH "Location of ZeroTier source tree.")
# Enable access to private APIs for our own use.
add_definitions (-DNNG_PRIVATE)
+option (NNG_PROTO_BUS0 "Enable BUSv0 protocol." ON)
+if (NNG_PROTO_BUS0)
+ add_definitions (-DNNG_HAVE_BUS0)
+endif ()
+
+option (NNG_PROTO_PAIR0 "Enable PAIRv0 protocol." ON)
+if (NNG_PROTO_PAIR0)
+ add_definitions (-DNNG_HAVE_PAIR0)
+endif ()
+
+option (NNG_PROTO_PAIR1 "Enable PAIRv1 protocol." ON)
+if (NNG_PROTO_PAIR1)
+ add_definitions (-DNNG_HAVE_PAIR1)
+endif ()
+
+option (NNG_PROTO_REQ0 "Enable REQv0 protocol." ON)
+if (NNG_PROTO_REQ0)
+ add_definitions (-DNNG_HAVE_REQ0)
+endif ()
+
+option (NNG_PROTO_REP0 "Enable REPv0 protocol." ON)
+if (NNG_PROTO_REP0)
+ add_definitions (-DNNG_HAVE_REP0)
+endif ()
+
+option (NNG_PROTO_PUB0 "Enable PUBv0 protocol." ON)
+if (NNG_PROTO_PUB0)
+ add_definitions (-DNNG_HAVE_PUB0)
+endif ()
+
+option (NNG_PROTO_SUB0 "Enable SUBv0 protocol." ON)
+if (NNG_PROTO_SUB0)
+ add_definitions (-DNNG_HAVE_SUB0)
+endif ()
+
+option (NNG_PROTO_PUSH0 "Enable PUSHv0 protocol." ON)
+if (NNG_PROTO_PUSH0)
+ add_definitions (-DNNG_HAVE_PUSH0)
+endif ()
+
+option (NNG_PROTO_PULL0 "Enable PULLv0 protocol." ON)
+if (NNG_PROTO_PULL0)
+ add_definitions (-DNNG_HAVE_PULL0)
+endif ()
+
+option (NNG_PROTO_SURVEYOR0 "Enable SURVEYORv0 protocol." ON)
+if (NNG_PROTO_SURVEYOR0)
+ add_definitions (-DNNG_HAVE_SURVEYOR0)
+endif ()
+
+option (NNG_PROTO_RESPONDENT0 "Enable RESPONDENTv0 protocol." ON)
+if (NNG_PROTO_RESPONDENT0)
+ add_definitions (-DNNG_HAVE_RESPONDENT0)
+endif ()
+
+option (NNG_TRANSPORT_INPROC "Enable inproc transport." ON)
+if (NNG_TRANSPORT_INPROC)
+ add_definitions (-DNNG_HAVE_INPROC)
+endif ()
+
+option (NNG_TRANSPORT_IPC "Enable IPC transport." ON)
+if (NNG_TRANSPORT_IPC)
+ add_definitions (-DNNG_HAVE_IPC)
+endif ()
+
+option (NNG_TRANSPORT_TCP "Enable TCP transport." ON)
+if (NNG_TRANSPORT_TCP)
+ add_definitions (-DNNG_HAVE_TCP)
+endif ()
+
+option (NNG_TRANSPORT_ZEROTIER "Enable ZeroTier transport (requires libzerotiercore)." OFF)
+if (NNG_TRANSPORT_ZEROTIER)
+ add_definitions (-DNNG_HAVE_ZEROTIER)
+endif ()
# Platform checks.
if (NNG_ENABLE_COVERAGE)
@@ -247,39 +319,6 @@ nng_check_sym (strlcat string.h NNG_HAVE_STRLCAT)
nng_check_sym (strlcpy string.h NNG_HAVE_STRLCPY)
nng_check_sym (strnlen string.h NNG_HAVE_STRNLEN)
-# Search for ZeroTier
-# We use the libzerotiercore.a library, which is unfortunately a C++ object
-# even though it exposes only public C symbols. It would be extremely
-# helpful if libzerotiercore didn't make us carry the whole C++ runtime
-# behind us. The user must specify the location of the ZeroTier source
-# tree (dev branch for now, and already compiled please) by setting the
-# NNG_ZEROTIER_SOURCE macro.
-# NB: This needs to be the zerotierone tree, not the libzt library.
-# This is because we don't access the API, but instead use the low
-# level zerotiercore functionality directly.
-# NB: As we wind up linking libzerotiercore.a into the application,
-# this means that your application will *also* need to either be licensed
-# under the GPLv3, or you will need to have a commercial license from
-# ZeroTier permitting its use elsewhere.
-if (NNG_ENABLE_ZEROTIER)
- enable_language(CXX)
- find_library(NNG_LIBZTCORE zerotiercore PATHS ${NNG_ZEROTIER_SOURCE})
- if (NNG_LIBZTCORE)
- set(CMAKE_REQUIRED_INCLUDES ${NNG_ZEROTIER_SOURCE}/include)
-# set(CMAKE_REQUIRED_LIBRARIES ${NNG_LIBZTCORE} c++)
-# set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} c++)
- message(STATUS "C++ ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}")
- set(CMAKE_REQUIRED_LIBRARIES ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES})
- set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES})
- set(NNG_REQUIRED_INCLUDES ${NNG_REQUIRED_INCLUDES} ${NNG_ZEROTIER_SOURCE}/include)
- nng_check_sym(ZT_Node_join ZeroTierOne.h NNG_HAVE_ZEROTIER)
- endif()
- if (NOT NNG_HAVE_ZEROTIER)
- message (FATAL_ERROR "Cannot find ZeroTier components")
- endif()
- message(STATUS "Found ZeroTier at ${NNG_LIBZTCORE}")
-endif()
-
add_subdirectory (src)
if (NNG_TESTS)
diff --git a/docs/nng_bus.adoc b/docs/nng_bus.adoc
index 03c43381..dd83062b 100644
--- a/docs/nng_bus.adoc
+++ b/docs/nng_bus.adoc
@@ -21,11 +21,9 @@ SYNOPSIS
[source,c]
----------
-#include <nng/protocol/bus/bus.h>
+#include <nng/protocol/bus0/bus.h>
-int nng_bus_open(nng_socket *s);
int nng_bus0_open(nng_socket *s);
-
----------
DESCRIPTION
@@ -57,7 +55,7 @@ likely that message loss is to occur.
Socket Operations
~~~~~~~~~~~~~~~~~
-The `nng_bus_open()` call creates a bus socket. This socket
+The `nng_bus0_open()` call creates a bus socket. This socket
may be used to send and receive messages. Sending messages will
attempt to deliver to each directly connected peer.
diff --git a/docs/nng_inproc.adoc b/docs/nng_inproc.adoc
index 9e044f6d..a6d83c95 100644
--- a/docs/nng_inproc.adoc
+++ b/docs/nng_inproc.adoc
@@ -47,8 +47,7 @@ URI Format
This transport uses URIs using the scheme `inproc://`, followed by
an arbitrary string of text, terminated by a `NUL` byte. The
-entire URI must be less than `NNG_MAXADDRLEN` bytes long, including
-the terminating `NUL`.
+entire URI must be less than `NNG_MAXADDRLEN` bytes long.
Multiple URIs can be used within the
same application, and they will not interfere with one another.
@@ -65,16 +64,22 @@ When using an `nng_sockaddr` structure, the actual structure is of type
[source,c]
--------
-#define NNG_AF_INPROC 1
+#define NNG_AF_INPROC 1 <1>
#define NNG_MAXADDRLEN 128
-struct nng_sockaddr_inproc {
+typedef nng_sockaddr_inproc {
+ // <2>
uint16_t sa_family; // must be NNG_AF_INPROC
- uint32_t sa_path[NNG_MAXADDRLEN]; // arbitrary "path"
+ char sa_path[NNG_MAXADDRLEN]; // arbitrary "path"
+ //
}
--------
+<1> The values of these macros may change, so applications
+should avoid depending upon their values and instead use them symbolically.
+<2> Other members may be present, but only those listed here
+are suitable for application use.
-The `sa_family` member will have the value `NNG_AF_INPROC` (1).
+The `sa_family` member will have the value `NNG_AF_INPROC`.
The `sa_path` member is an ASCIIZ string, and may contain any characters,
terminated by a `NUL` byte.
diff --git a/docs/nng_ipc.adoc b/docs/nng_ipc.adoc
new file mode 100644
index 00000000..c5dac4f7
--- /dev/null
+++ b/docs/nng_ipc.adoc
@@ -0,0 +1,110 @@
+nng_ipc(7)
+==========
+:doctype: manpage
+:manmanual: nng
+:mansource: nng
+:icons: font
+:source-highlighter: pygments
+:copyright: Copyright 2017 Garrett D'Amore <garrett@damore.org> \
+ Copyright 2017 Capitar IT Group BV <info@capitar.com> \
+ This software is supplied under the terms of the MIT License, a \
+ copy of which should be located in the distribution where this \
+ file was obtained (LICENSE.txt). A copy of the license may also \
+ be found online at https://opensource.org/licenses/MIT.
+
+NAME
+----
+nng_ipc - IPC transport for nng
+
+SYNOPSIS
+--------
+
+[source,c]
+----------
+#include <nng/transport/ipc/ipc.h>
+
+int nng_ipc_register(void);
+----------
+
+DESCRIPTION
+-----------
+
+The _nng_ipc_ transport provides communication support between
+_nng_ sockets within different processes on the same host. For POSIX
+platforms, this is implemented using UNIX domain sockets. For Windows,
+this is implemented using Windows Named Pipes. Other platforms may
+have different implementation strategies.
+
+// We need to insert a reference to the nanomsg RFC.
+
+Registration
+~~~~~~~~~~~~
+
+The _ipc_ transport is generally built-in to the _nng_ core, so
+no extra steps to use it should be necessary.
+
+URI Format
+~~~~~~~~~~
+
+This transport uses URIs using the scheme `ipc://`, followed by
+a path name in the file system where the socket or named pipe
+should be created.
+
+TIP: On Windows, all names are prefixed by `\.\pipe\` and do not
+occupy the normal file system. On POSIX platforms, the path is
+taken literally, and is relative to the current directory, unless
+an extra leading `/` is provided. For example, `ipc://myname` refers
+to the name `myname` in the current directory, whereas `ipc:///tmp/myname`
+refers to `myname` located in `/tmp`.
+
+The entire URI must be less than `NNG_MAXADDRLEN` bytes long.
+
+Socket Address
+~~~~~~~~~~~~~~
+
+When using an `nng_sockaddr` structure, the actual structure is of type
+`nng_sockaddr_ipc`. This is a `struct` type with the following definition:
+
+[source,c]
+--------
+#define NNG_AF_IPC 2 <1>
+#define NNG_MAXADDRLEN 128
+
+typedef struct {
+ // ... <2>
+ uint16_t sa_family; // must be NNG_AF_IPC
+ char sa_path[NNG_MAXADDRLEN]; // arbitrary "path"
+ // ...
+} nng_sockaddr_ipc;
+--------
+<1> The values of these macros may change, so applications
+should avoid depending upon their values and instead use them symbolically.
+<2> Other members may be present, but only those listed here
+are suitable for application use.
+
+The `sa_family` member will have the value `NNG_AF_IPC`.
+The `sa_path` member is an ASCIIZ string, and may contain any legal
+path name (platform-dependent), terminated by a `NUL` byte.
+
+Transport Options
+~~~~~~~~~~~~~~~~~
+
+The _ipc_ transport has no special
+options.footnote:[Options for security attributes and credentials are planned.]
+
+AUTHORS
+-------
+link:mailto:garrett@damore.org[Garrett D'Amore]
+
+SEE ALSO
+--------
+<<nng.adoc#,nng(7)>>
+
+COPYRIGHT
+---------
+
+Copyright 2017 mailto:garrett@damore.org[Garrett D'Amore] +
+Copyright 2017 mailto:info@capitar.com[Capitar IT Group BV]
+
+This document is supplied under the terms of the
+https://opensource.org/licenses/LICENSE.txt[MIT License].
diff --git a/docs/nng_pair.adoc b/docs/nng_pair.adoc
new file mode 100644
index 00000000..b55000af
--- /dev/null
+++ b/docs/nng_pair.adoc
@@ -0,0 +1,155 @@
+nng_pair(7)
+===========
+:doctype: manpage
+:manmanual: nng
+:mansource: nng
+:icons: font
+:source-highlighter: pygments
+:copyright: Copyright 2017 Garrett D'Amore <garrett@damore.org> \
+ Copyright 2017 Capitar IT Group BV <info@capitar.com> \
+ This software is supplied under the terms of the MIT License, a \
+ copy of which should be located in the distribution where this \
+ file was obtained (LICENSE.txt). A copy of the license may also \
+ be found online at https://opensource.org/licenses/MIT.
+
+NAME
+----
+nng_pair - pair protocol
+
+SYNOPSIS
+--------
+
+.Version 0
+[source,c]
+----------
+#include <nng/protocol/pair0/pair.h>
+
+int nng_pair0_open(nng_socket *s);
+----------
+
+.Version 1
+[source,c]
+----------
+#include <nng/protocol/pair1/pair.h>
+
+int nng_pair1_open(nng_socket *s);
+----------
+
+DESCRIPTION
+-----------
+
+The _nng_pair_ protocol implements a peer-to-peer pattern, where
+relationships between peers are one-to-one.
+
+Version 1 of this protocol supports an optional _polyamorous_ mode where a
+peer can maintain multiple partnerships. Using this mode requires
+some additional sophistication in the application.
+
+Socket Operations
+~~~~~~~~~~~~~~~~~
+
+The `nng_pair_open()` call creates a _pair_ socket. Normally, this
+pattern will block when attempting to send a message, if no peer is
+able to receive the message.
+
+NOTE: Even though this mode may appear to be "reliable", because back-pressure
+prevents discarding messages most of the time, there are topologies involving
+_devices_ (see <<nng_device.adoc#,nng_device(3)>>) or raw mode sockets where
+messages may be discarded. Applications that require reliable delivery
+semantics should consider using <<nng_req.adoc#,nng_req(7)>> sockets, or
+implement their own acknowledgement layer on top of pair sockets.
+
+In order to avoid head-of-line blocking conditions, _polyamorous_ mode pair
+sockets (version 1 only) discard messages if they are unable to deliver them
+to a peer.
+
+Protocol Versions
+~~~~~~~~~~~~~~~~~
+
+Version 0 is the legacy version of this protocol. It lacks any header
+information, and is suitable when building simple one-to-one topologies.
+
+TIP: Use version 0 if you need to communicate with other implementations,
+including the legacy https://github.com/nanomsg/nanomsg[nanomsg] library or
+https://github.com/go-mangos/mangos[mangos].
+
+Version 1 of the protocol offers improved protection against loops when
+used with <<nng_device.adoc#,nng_device(3)>>. It also offers _polyamorous_
+mode for forming multiple partnerships on a single socket.
+
+NOTE: Version 1 of this protocol is considered experimental at this time.
+
+Polyamorous Mode
+~~~~~~~~~~~~~~~~
+
+Normally pair sockets are for one-to-one communication, and a given peer
+will reject new connections if it already has an active connection to another
+peer.
+
+In _polyamorous_ mode, which is only available with version 1, a socket can
+support many one-to-one connections. In this mode, the application must
+choose the remote peer to receive an ougoing message by setting the value
+of the pipe ID on the outgoing message using
+the <<nng_msg_set_pipe.adoc#,nng_msg_set_pipe(3)>> function.
+
+Most often the value of the outgoing pipe ID will be obtained from an incoming
+message using the <<nng_msg_get_pipe.adoc#,nng_msg_get_pipe(3)>> function,
+such as when replying to an incoming message.
+
+In order to prevent head-of-line blocking, if the peer on the given pipe
+is not able to receive (or the pipe is no longer available, such as if the
+peer has disconnected), then the message will be discarded with no notification
+to the sender.
+
+Protocol Options
+~~~~~~~~~~~~~~~~
+
+The following protocol-specific options are available.
+
+`NNG_OPT_PAIR1_POLY`::
+
+ (Version 1 only). This option enables the use of _polyamorous_ mode.
+ The value is read-write, and takes an integer boolean value. The default
+ false value (0) indicates that legacy monogamous mode should be used.
+
+`NNG_OPT_MAXTTL`::
+
+ (Version 1 only). Maximum time-to-live. This option is an integer value
+ between 0 and 255,
+ inclusive, and is the maximum number of "hops" that a message may
+ pass through until it is discarded. The default value is 8. A value
+ of 0 may be used to disable the loop protection, allowing an infinite
+ number of hops.
++
+TIP: Each node along a forwarding path may have it's own value for the
+maximum time-to-live, and performs its own checks before forwarding a message.
+Therefore it is helpful if all nodes in the topology use the same value for
+this option.
+
+Protocol Headers
+~~~~~~~~~~~~~~~~
+
+Version 0 of the pair protocol has no protocol-specific headers.
+
+Version 1 of the pair protocol uses a single 32-bit unsigned value. The
+low-order (big-endian) byte of this value contains a "hop" count, and is
+used in conjuction with the `NNG_OPT_MAXTTL` option to guard against
+device forwarding loops. This value is initialized to 1, and incremented
+each time the message is received by a new node.
+
+AUTHORS
+-------
+link:mailto:garrett@damore.org[Garrett D'Amore]
+
+SEE ALSO
+--------
+<<nng.adoc#,nng(7)>>
+
+COPYRIGHT
+---------
+
+Copyright 2017 mailto:garrett@damore.org[Garrett D'Amore] +
+Copyright 2017 mailto:info@capitar.com[Capitar IT Group BV]
+
+This document is supplied under the terms of the
+https://opensource.org/licenses/LICENSE.txt[MIT License].
diff --git a/docs/nng_pub.adoc b/docs/nng_pub.adoc
index d4db45ae..4c3b764b 100644
--- a/docs/nng_pub.adoc
+++ b/docs/nng_pub.adoc
@@ -21,11 +21,9 @@ SYNOPSIS
[source,c]
----------
-#include <nng/protocol/pubsub/pubsub.h>
+#include <nng/protocol/pubsub0/pub.h>
-int nng_pub_open(nng_socket *s);
int nng_pub0_open(nng_socket *s);
-
----------
DESCRIPTION
@@ -51,7 +49,7 @@ accordingly.
Socket Operations
~~~~~~~~~~~~~~~~~
-The `nng_pub_open()` call creates a publisher socket. This socket
+The `nng_pub0_open()` call creates a publisher socket. This socket
may be used to send messages, but is unable to receive them. Attempts
to receive messages will result in `NNG_ENOTSUP`.
diff --git a/docs/nng_pull.adoc b/docs/nng_pull.adoc
new file mode 100644
index 00000000..17614a3d
--- /dev/null
+++ b/docs/nng_pull.adoc
@@ -0,0 +1,85 @@
+nng_pull(7)
+===========
+:doctype: manpage
+:manmanual: nng
+:mansource: nng
+:icons: font
+:source-highlighter: pygments
+:copyright: Copyright 2017 Garrett D'Amore <garrett@damore.org> \
+ Copyright 2017 Capitar IT Group BV <info@capitar.com> \
+ This software is supplied under the terms of the MIT License, a \
+ copy of which should be located in the distribution where this \
+ file was obtained (LICENSE.txt). A copy of the license may also \
+ be found online at https://opensource.org/licenses/MIT.
+
+NAME
+----
+nng_pull - pull protocol
+
+SYNOPSIS
+--------
+
+[source,c]
+----------
+#include <nng/protocol/pipeline0/pull.h>
+
+int nng_pull0_open(nng_socket *s);
+----------
+
+DESCRIPTION
+-----------
+
+The _nng_pull_ protocol is one half of a pipeline pattern. The other half
+is the <<nng_push.adoc#,nng_push(7)>> protocol.
+
+In the pipeline pattern, pushers distribute messages to pullers.
+Each message sent
+by a pusher will be sent to one of its peer pullers,
+chosen in a round-robin fashion
+from the set of connected peers available for receiving.
+This property makes this pattern useful in load-balancing scenarios.
+
+Socket Operations
+~~~~~~~~~~~~~~~~~
+
+The `nng_pull0_open()` call creates a puller socket. This socket
+may be used to receive messages, but is unable to send them. Attempts
+to send messages will result in `NNG_ENOTSUP`.
+
+When receiving messages, the _nng_pull_ protocol accepts messages as
+they arrive from peers. If two peers both have a message ready, the
+order in which messages are handled is undefined.
+
+Protocol Versions
+~~~~~~~~~~~~~~~~~
+
+Only version 0 of this protocol is supported. (At the time of writing,
+no other versions of this protocol have been defined.)
+
+Protocol Options
+~~~~~~~~~~~~~~~~
+
+The _nng_pull_ protocol has no protocol-specific options.
+
+Protocol Headers
+~~~~~~~~~~~~~~~~
+
+The _nng_pull_ protocol has no protocol-specific headers.
+
+AUTHORS
+-------
+link:mailto:garrett@damore.org[Garrett D'Amore]
+
+SEE ALSO
+--------
+<<nng.adoc#,nng(7)>>
+<<nng_push.adoc#,nng_push(7)>>
+
+COPYRIGHT
+---------
+
+Copyright 2017 mailto:garrett@damore.org[Garrett D'Amore] +
+Copyright 2017 mailto:info@capitar.com[Capitar IT Group BV]
+
+This document is supplied under the terms of the
+https://opensource.org/licenses/LICENSE.txt[MIT License].
diff --git a/docs/nng_push.adoc b/docs/nng_push.adoc
new file mode 100644
index 00000000..faf783b6
--- /dev/null
+++ b/docs/nng_push.adoc
@@ -0,0 +1,93 @@
+nng_push(7)
+===========
+:doctype: manpage
+:manmanual: nng
+:mansource: nng
+:icons: font
+:source-highlighter: pygments
+:copyright: Copyright 2017 Garrett D'Amore <garrett@damore.org> \
+ Copyright 2017 Capitar IT Group BV <info@capitar.com> \
+ This software is supplied under the terms of the MIT License, a \
+ copy of which should be located in the distribution where this \
+ file was obtained (LICENSE.txt). A copy of the license may also \
+ be found online at https://opensource.org/licenses/MIT.
+
+NAME
+----
+nng_push - push protocol
+
+SYNOPSIS
+--------
+
+[source,c]
+----------
+#include <nng/protocol/pipeline0/push.h>
+
+int nng_push0_open(nng_socket *s);
+----------
+
+DESCRIPTION
+-----------
+
+The _nng_push_ protocol is one half of a pipeline pattern. The
+other side is the <<nng_pull.adoc#,nng_pull(7)>> protocol.
+
+In the pipeline pattern, pushers distribute messages to pullers.
+Each message sent
+by a pusher will be sent to one of its peer pullers,
+chosen in a round-robin fashion
+from the set of connected peers available for receiving.
+This property makes this pattern useful in load-balancing scenarios.
+
+Socket Operations
+~~~~~~~~~~~~~~~~~
+
+The `nng_push0_open()` call creates a pusher socket. This socket
+may be used to send messages, but is unable to receive them. Attempts
+to receive messages will result in `NNG_ENOTSUP`.
+
+Send operations will observe flow control (back-pressure), so that
+only peers capable of accepting a message will be considered. If no
+peer is available to receive a message, then the send operation will
+wait until one is available, or the operation times out.
+
+NOTE: Although the pipeline protocol honors flow control, and attempts
+to avoid dropping messages, no guarantee of delivery is made. Furthermore,
+as there is no capability for message acknowledgement, applications that
+need reliable delivery are encouraged to consider the
+<<nng_req.adoc#,nng_req(7)>> protocol instead.
+
+Protocol Versions
+~~~~~~~~~~~~~~~~~
+
+Only version 0 of this protocol is supported. (At the time of writing,
+no other versions of this protocol have been defined.)
+
+Protocol Options
+~~~~~~~~~~~~~~~~
+
+The _nng_push_ protocol has no protocol-specific options.
+
+Protocol Headers
+~~~~~~~~~~~~~~~~
+
+The _nng_push_ protocol has no protocol-specific headers.
+
+AUTHORS
+-------
+link:mailto:garrett@damore.org[Garrett D'Amore]
+
+SEE ALSO
+--------
+<<nng.adoc#,nng(7)>>
+<<nng_pull.adoc#,nng_pull(7)>>
+<<nng_req.adoc#,nng_req(7)>>
+
+COPYRIGHT
+---------
+
+Copyright 2017 mailto:garrett@damore.org[Garrett D'Amore] +
+Copyright 2017 mailto:info@capitar.com[Capitar IT Group BV]
+
+This document is supplied under the terms of the
+https://opensource.org/licenses/LICENSE.txt[MIT License].
diff --git a/docs/nng_sub.adoc b/docs/nng_sub.adoc
index 0b409904..5359acec 100644
--- a/docs/nng_sub.adoc
+++ b/docs/nng_sub.adoc
@@ -21,11 +21,9 @@ SYNOPSIS
[source,c]
----------
-#include <nng/protocol/pubsub/pubsub.h>
+#include <nng/protocol/pubsub0/sub.h>
-int nng_sub_open(nng_socket *s);
int nng_sub0_open(nng_socket *s);
-
----------
DESCRIPTION
@@ -51,7 +49,7 @@ accordingly.
Socket Operations
~~~~~~~~~~~~~~~~~
-The `nng_sub_open()` call creates a subscriber socket. This socket
+The `nng_sub0_open()` call creates a subscriber socket. This socket
may be used to receive messages, but is unable to send them. Attempts
to send messages will result in `NNG_ENOTSUP`.
diff --git a/docs/nng_tcp.adoc b/docs/nng_tcp.adoc
new file mode 100644
index 00000000..a1d0cdab
--- /dev/null
+++ b/docs/nng_tcp.adoc
@@ -0,0 +1,137 @@
+nng_tcp(7)
+==========
+:doctype: manpage
+:manmanual: nng
+:mansource: nng
+:icons: font
+:source-highlighter: pygments
+:copyright: Copyright 2017 Garrett D'Amore <garrett@damore.org> \
+ Copyright 2017 Capitar IT Group BV <info@capitar.com> \
+ This software is supplied under the terms of the MIT License, a \
+ copy of which should be located in the distribution where this \
+ file was obtained (LICENSE.txt). A copy of the license may also \
+ be found online at https://opensource.org/licenses/MIT.
+
+NAME
+----
+nng_tcp - TCP/IP transport for nng
+
+SYNOPSIS
+--------
+
+[source,c]
+----------
+#include <nng/transport/tcp/tcp.h>
+
+int nng_tcp_register(void);
+----------
+
+DESCRIPTION
+-----------
+
+The _nng_tcp_ transport provides communication support between
+_nng_ sockets across a TCP/IP network. Both IPv4 and IPv6
+are supported when the underlying platform also supports it.
+
+// We need to insert a reference to the nanomsg RFC.
+
+Registration
+~~~~~~~~~~~~
+
+The _tcp_ transport is generally built-in to the _nng_ core, so
+no extra steps to use it should be necessary.
+
+URI Format
+~~~~~~~~~~
+
+This transport uses URIs using the scheme `tcp://`, followed by
+an IP address or hostname, followed by a colon and finally a
+TCP port number. For example, to contact port 80 on the localhost
+either of the following URIs could be used: `tcp://127.0.0.1:80` or
+`tcp://localhost:80`.
+
+When specifying IPv6 addresses, the address must be enclosed in
+square brackets (`[]`) to avoid confusion with the final colon
+separating the port.
+
+For example, the same port 80 on the IPv6 loopback address ('::1') would
+be specified as `tcp://[::1]:80`.
+
+NOTE: When using symbolic names, the name is resolved when the
+name is first used. _nng_ won't become aware of changes in the
+name resolution until restart,
+usually.footnote:[This is a bug and will likely be fixed in the future.]
+
+The special value of 0 (`INADDR_ANY`) can be used for a listener
+to indicate that it should listen on all interfaces on the host.
+A short-hand for this form is to either omit the address, or specify
+the asterisk (`*`) character. For example, the following three
+URIs are all equivalent, and could be used to listen to port 9999
+on the host:
+
+ 1. `tcp://0.0.0.0:9999`
+ 2. `tcp://*:9999`
+ 3. `tcp://:9999`
+
+The entire URI must be less than `NNG_MAXADDRLEN` bytes long.
+
+Socket Address
+~~~~~~~~~~~~~~
+
+When using an `nng_sockaddr` structure, the actual structure is either
+of type `nng_sockaddr_in` (for IPv4) or `nng_sockaddr_in6` (for IPv6).
+These are `struct` types with the following definitions:
+
+[source,c]
+--------
+#define NNG_AF_INET 3 <1>
+#define NNG_AF_INET6 4
+#define NNG_MAXADDRLEN 128
+
+typedef struct {
+ // ... <2>
+ uint16_t sa_family; // must be NNG_AF_INET
+ uint16_t sa_port; // TCP port number
+ uint32_t sa_addr;
+ // ...
+} nng_sockaddr_in;
+
+typedef struct {
+ // ... <2>
+ uint16_t sa_family; // must be NNG_AF_INET6
+ uint16_t sa_port; // TCP port number
+ uint8_t sa_addr[16];
+ // ...
+} nng_sockaddr_in6;
+--------
+<1> The values of these macros may change, so applications
+should avoid depending upon their values and instead use them symbolically.
+<2> Other members may be present, but only those listed here
+are suitable for application use.
+
+The `sa_family` member will have the value `NNG_AF_INET` or `NNG_AF_INET6`.
+The `sa_port` and `sa_addr` are the TCP port number and address, both in
+network byte order (most significant byte is first).
+
+Transport Options
+~~~~~~~~~~~~~~~~~
+
+The _tcp_ transport has no special
+options.footnote:[Options for TCP keepalive, linger, and nodelay are planned.]
+
+AUTHORS
+-------
+link:mailto:garrett@damore.org[Garrett D'Amore]
+
+SEE ALSO
+--------
+<<nng.adoc#,nng(7)>>
+
+COPYRIGHT
+---------
+
+Copyright 2017 mailto:garrett@damore.org[Garrett D'Amore] +
+Copyright 2017 mailto:info@capitar.com[Capitar IT Group BV]
+
+This document is supplied under the terms of the
+https://opensource.org/licenses/LICENSE.txt[MIT License].
diff --git a/perf/perf.c b/perf/perf.c
index 9333d4f5..1d89fc61 100644
--- a/perf/perf.c
+++ b/perf/perf.c
@@ -22,6 +22,24 @@
// change without notice, and not part of the stable API or ABI.
#include "core/nng_impl.h"
+#if defined(NNG_ENABLE_PAIR1)
+#include "protocol/pair1/pair.h"
+
+#elif defined(NNG_ENABLE_PAIR0)
+#include "protocol/pair0/pair.h"
+
+#else
+
+static void die(const char *, ...);
+
+static int
+nng_pair_open(nng_socket *rv)
+{
+ die("No pair protocol enabled in this build!");
+ return (NNG_ENOTSUP);
+}
+#endif // NNG_ENABLE_PAIR
+
static void latency_client(const char *, int, int);
static void latency_server(const char *, int, int);
static void throughput_client(const char *, int, int);
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index a5a49052..4901789b 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -74,30 +74,6 @@ set (NNG_SOURCES
core/timer.h
core/transport.c
core/transport.h
-
- protocol/bus/bus.c
-
- protocol/pair/pair_v0.c
- protocol/pair/pair_v1.c
-
- protocol/pipeline/pull.c
- protocol/pipeline/push.c
-
- protocol/pubsub/pub.c
- protocol/pubsub/sub.c
-
- protocol/reqrep/rep.c
- protocol/reqrep/req.c
-
- protocol/survey/respond.c
- protocol/survey/survey.c
-
- transport/inproc/inproc.c
-
- transport/ipc/ipc.c
-
- transport/tcp/tcp.c
-
)
if (NNG_PLATFORM_POSIX)
@@ -146,14 +122,19 @@ endif()
install(FILES transport/inproc/inproc.h
DESTINATION include/nng/transport/inproc)
-if (NNG_ENABLE_ZEROTIER)
- set (NNG_SOURCES ${NNG_SOURCES}
- transport/zerotier/zerotier.c
- transport/zerotier/zerotier.h
- )
- install(FILES transport/zerotier/zerotier.h
- DESTINATION include/nng/transport/zerotier)
-endif()
+
+add_subdirectory(protocol/bus0)
+add_subdirectory(protocol/pair0)
+add_subdirectory(protocol/pair1)
+add_subdirectory(protocol/pipeline0)
+add_subdirectory(protocol/pubsub0)
+add_subdirectory(protocol/reqrep0)
+add_subdirectory(protocol/survey0)
+
+add_subdirectory(transport/inproc)
+add_subdirectory(transport/ipc)
+add_subdirectory(transport/tcp)
+add_subdirectory(transport/zerotier)
include_directories(AFTER SYSTEM ${PROJECT_SOURCE_DIR}/src
${NNG_REQUIRED_INCLUDES})
@@ -216,3 +197,7 @@ install (TARGETS ${PROJECT_NAME} ${PROJECT_NAME}_static
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
)
+
+# Promote settings to parent
+set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} PARENT_SCOPE)
+set(NNG_REQUIRED_LFLAGS ${NNG_REQUIRED_LFLAGS} PARENT_SCOPE) \ No newline at end of file
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 3f1ffcb5..10bffaa4 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -297,7 +297,6 @@ nni_msgq_run_getq(nni_msgq *mq)
static void
nni_msgq_run_notify(nni_msgq *mq)
{
- nni_aio *aio;
if (mq->mq_cb_fn != NULL) {
int flags = 0;
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 5db259f0..39bde059 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -147,21 +147,26 @@ extern int nni_proto_open(nng_socket *, const nni_proto *);
// Protocol numbers are never more than 16 bits. Also, there will never be
// a valid protocol numbered 0 (NNG_PROTO_NONE).
#define NNI_PROTO(major, minor) (((major) *16) + (minor))
-enum nng_proto_enum {
- NNI_PROTO_NONE = NNI_PROTO(0, 0),
- NNI_PROTO_PAIR_V0 = NNI_PROTO(1, 0),
- NNI_PROTO_PAIR_V1 = NNI_PROTO(1, 1),
- NNI_PROTO_PUB_V0 = NNI_PROTO(2, 0),
- NNI_PROTO_SUB_V0 = NNI_PROTO(2, 1),
- NNI_PROTO_REQ_V0 = NNI_PROTO(3, 0),
- NNI_PROTO_REP_V0 = NNI_PROTO(3, 1),
- NNI_PROTO_PUSH_V0 = NNI_PROTO(5, 0),
- NNI_PROTO_PULL_V0 = NNI_PROTO(5, 1),
- NNI_PROTO_SURVEYOR_V0 = NNI_PROTO(6, 2),
- NNI_PROTO_RESPONDENT_V0 = NNI_PROTO(6, 3),
- NNI_PROTO_BUS_V0 = NNI_PROTO(7, 0),
- NNI_PROTO_STAR_V0 = NNI_PROTO(100, 0),
-};
+
+// Protocol major numbers. This is here for documentation only, and
+// to serve as a "registry" for managing new protocol numbers. Consider
+// updating this table when adding new protocols.
+//
+// Protocol Maj Min Name Notes
+// -------------------------------------------
+// NONE 0 0 reserved
+// PAIRv0 1 0 pair
+// PAIRv1 1 1 pair1 nng only, experimental
+// PUBv0 2 0 pub
+// SUBv0 2 1 sub
+// REQv0 3 0 req
+// REPv0 3 1 rep
+// PUSHv0 5 0 push
+// PULLv0 5 1 pull
+// SURVEYORv0 6 2 surveyor minors 0 & 1 retired
+// RESPONDENTv0 6 3 respondent
+// BUSv0 7 0 bus
+// STARv0 100 0 star mangos only, experimental
extern int nni_proto_sys_init(void);
extern void nni_proto_sys_fini(void);
diff --git a/src/core/socket.c b/src/core/socket.c
index c9b70ccb..4a84b639 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -102,7 +102,6 @@ static int
nni_sock_getopt_fd(nni_sock *s, int flag, void *val, size_t *szp)
{
int rv;
- uint32_t flags;
nni_notifyfd *fd;
nni_msgq * mq;
nni_msgq_cb cb;
diff --git a/src/core/transport.c b/src/core/transport.c
index 6dcf4538..af9c93fb 100644
--- a/src/core/transport.c
+++ b/src/core/transport.c
@@ -10,6 +10,9 @@
#include "core/nng_impl.h"
#include "transport/inproc/inproc.h"
+#include "transport/ipc/ipc.h"
+#include "transport/tcp/tcp.h"
+#include "transport/zerotier/zerotier.h"
#include <stdio.h>
#include <string.h>
@@ -51,6 +54,11 @@ nni_tran_register(const nni_tran *tran)
nni_mtx_lock(&nni_tran_lk);
// Check to see if the transport is already registered...
NNI_LIST_FOREACH (&nni_tran_list, t) {
+ if (tran->tran_init == t->t_tran.tran_init) {
+ nni_mtx_unlock(&nni_tran_lk);
+ // Same transport, duplicate registration.
+ return (0);
+ }
if (strcmp(tran->tran_scheme, t->t_tran.tran_scheme) == 0) {
nni_mtx_unlock(&nni_tran_lk);
return (NNG_ESTATE);
@@ -129,20 +137,40 @@ nni_tran_chkopt(const char *name, const void *v, size_t sz)
// nni_tran_sys_init initializes the entire transport subsystem, including
// each individual transport.
+
+typedef int (*nni_tran_ctor)(void);
+
+static nni_tran_ctor nni_tran_ctors[] = {
+#ifdef NNG_HAVE_INPROC
+ nng_inproc_register,
+#endif
+#ifdef NNG_HAVE_IPC
+ nng_ipc_register,
+#endif
+#ifdef NNG_HAVE_TCP
+ nng_tcp_register,
+#endif
+#ifdef NNI_HAVE_ZEROTIER
+ nng_zt_register,
+#endif
+ NULL,
+};
+
int
nni_tran_sys_init(void)
{
- int rv;
+ int i;
nni_tran_inited = 1;
NNI_LIST_INIT(&nni_tran_list, nni_transport, t_node);
nni_mtx_init(&nni_tran_lk);
- if (((rv = nng_inproc_register()) != 0) ||
- ((rv = nni_tran_register(&nni_ipc_tran)) != 0) ||
- ((rv = nni_tran_register(&nni_tcp_tran)) != 0)) {
- nni_tran_sys_fini();
- return (rv);
+ for (i = 0; nni_tran_ctors[i] != NULL; i++) {
+ int rv;
+ if ((rv = (nni_tran_ctors[i])()) != 0) {
+ nni_tran_sys_fini();
+ return (rv);
+ }
}
return (0);
}
diff --git a/src/nng.h b/src/nng.h
index c39ba9ee..6d375509 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -337,33 +337,6 @@ enum nng_flag_enum {
NNG_FLAG_NONBLOCK = 2, // Non-blocking operations.
};
-// Builtin protocol socket constructors.
-NNG_DECL int nng_bus0_open(nng_socket *);
-NNG_DECL int nng_pair0_open(nng_socket *);
-NNG_DECL int nng_pair1_open(nng_socket *);
-NNG_DECL int nng_pub0_open(nng_socket *);
-NNG_DECL int nng_sub0_open(nng_socket *);
-NNG_DECL int nng_push0_open(nng_socket *);
-NNG_DECL int nng_pull0_open(nng_socket *);
-NNG_DECL int nng_req0_open(nng_socket *);
-NNG_DECL int nng_rep0_open(nng_socket *);
-NNG_DECL int nng_surveyor0_open(nng_socket *);
-NNG_DECL int nng_respondent0_open(nng_socket *);
-
-// Default versions. These provide compile time defaults; note that
-// the actual protocols are baked into the binary; this should avoid
-// suprising. Choosing a new protocol should be done explicitly.
-#define nng_bus_open nng_bus0_open
-#define nng_pair_open nng_pair1_open
-#define nng_pub_open nng_pub0_open
-#define nng_sub_open nng_sub0_open
-#define nng_push_open nng_push0_open
-#define nng_pull_open nng_pull0_open
-#define nng_req_open nng_req0_open
-#define nng_rep_open nng_rep0_open
-#define nng_surveyor_open nng_surveyor0_open
-#define nng_respondent_open nng_respondent0_open
-
// Options.
#define NNG_OPT_SOCKNAME "socket-name"
#define NNG_OPT_DOMAIN "compat:domain" // legacy compat only
@@ -385,15 +358,6 @@ NNG_DECL int nng_respondent0_open(nng_socket *);
#define NNG_OPT_RECONNMINT "reconnect-time-min"
#define NNG_OPT_RECONNMAXT "reconnect-time-max"
-#define NNG_OPT_PAIR1_POLY "pair1:polyamorous"
-
-#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe"
-#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe"
-
-#define NNG_OPT_REQ_RESENDTIME "req:resend-time"
-
-#define NNG_OPT_SURVEYOR_SURVEYTIME "surveyor:survey-time"
-
// XXX: TBD: priorities, socket names, ipv4only
// Statistics. These are for informational purposes only, and subject
diff --git a/src/nng_compat.c b/src/nng_compat.c
index f8a7ceb0..482834a1 100644
--- a/src/nng_compat.c
+++ b/src/nng_compat.c
@@ -10,6 +10,16 @@
#include "nng_compat.h"
#include "nng.h"
+#include "protocol/bus0/bus.h"
+#include "protocol/pair0/pair.h"
+#include "protocol/pipeline0/pull.h"
+#include "protocol/pipeline0/push.h"
+#include "protocol/pubsub0/pub.h"
+#include "protocol/pubsub0/sub.h"
+#include "protocol/reqrep0/rep.h"
+#include "protocol/reqrep0/req.h"
+#include "protocol/survey0/respond.h"
+#include "protocol/survey0/survey.h"
#include <stdio.h>
#include <string.h>
@@ -92,17 +102,37 @@ static const struct {
uint16_t p_id;
int (*p_open)(nng_socket *);
} nn_protocols[] = {
- // clang-format off
+// clang-format off
+#ifdef NNG_HAVE_BUS0
{ NN_BUS, nng_bus0_open },
+#endif
+#ifdef NNG_HAVE_PAIR0
{ NN_PAIR, nng_pair0_open },
+#endif
+#ifdef NNG_HAVE_PUSH0
{ NN_PUSH, nng_push0_open },
+#endif
+#ifdef NNG_HAVE_PULL0
{ NN_PULL, nng_pull0_open },
+#endif
+#ifdef NNG_HAVE_PUB0
{ NN_PUB, nng_pub0_open },
+#endif
+#ifdef NNG_HAVE_SUB0
{ NN_SUB, nng_sub0_open },
+#endif
+#ifdef NNG_HAVE_REQ0
{ NN_REQ, nng_req0_open },
+#endif
+#ifdef NNG_HAVE_REP0
{ NN_REP, nng_rep0_open },
+#endif
+#ifdef NNG_HAVE_SURVEYOR0
{ NN_SURVEYOR, nng_surveyor0_open },
+#endif
+#ifdef NNG_HAVE_RESPONDENT0
{ NN_RESPONDENT, nng_respondent0_open },
+#endif
{ 0, NULL },
// clang-format on
};
@@ -115,7 +145,7 @@ nn_socket(int domain, int protocol)
int i;
if ((domain != AF_SP) && (domain != AF_SP_RAW)) {
- nn_seterror(EAFNOSUPPORT);
+ errno = EAFNOSUPPORT;
return (-1);
}
@@ -125,7 +155,7 @@ nn_socket(int domain, int protocol)
}
}
if (nn_protocols[i].p_open == NULL) {
- nn_seterror(ENOTSUP);
+ errno = ENOTSUP;
return (-1);
}
diff --git a/src/protocol/bus0/CMakeLists.txt b/src/protocol/bus0/CMakeLists.txt
new file mode 100644
index 00000000..5071054a
--- /dev/null
+++ b/src/protocol/bus0/CMakeLists.txt
@@ -0,0 +1,18 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Bus protocol
+
+if (NNG_PROTO_BUS0)
+ set(BUS0_SOURCES protocol/bus0/bus.c protocol/bus0/bus.h)
+ install(FILES bus.h DESTINATION include/nng/protocol/bus0)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${BUS0_SOURCES} PARENT_SCOPE)
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus0/bus.c
index 6ec0066b..3e15000b 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus0/bus.c
@@ -12,31 +12,36 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/bus0/bus.h"
// Bus protocol. The BUS protocol, each peer sends a message to its peers.
// However, bus protocols do not "forward" (absent a device). So in order
// for each participant to receive the message, each sender must be connected
// to every other node in the network (full mesh).
-typedef struct bus_pipe bus_pipe;
-typedef struct bus_sock bus_sock;
+#ifndef NNI_PROTO_BUS_V0
+#define NNI_PROTO_BUS_V0 NNI_PROTO(7, 0)
+#endif
-static void bus_sock_getq(bus_sock *);
-static void bus_sock_send(void *, nni_aio *);
-static void bus_sock_recv(void *, nni_aio *);
+typedef struct bus0_pipe bus0_pipe;
+typedef struct bus0_sock bus0_sock;
-static void bus_pipe_getq(bus_pipe *);
-static void bus_pipe_send(bus_pipe *);
-static void bus_pipe_recv(bus_pipe *);
+static void bus0_sock_getq(bus0_sock *);
+static void bus0_sock_send(void *, nni_aio *);
+static void bus0_sock_recv(void *, nni_aio *);
-static void bus_sock_getq_cb(void *);
-static void bus_pipe_getq_cb(void *);
-static void bus_pipe_send_cb(void *);
-static void bus_pipe_recv_cb(void *);
-static void bus_pipe_putq_cb(void *);
+static void bus0_pipe_getq(bus0_pipe *);
+static void bus0_pipe_send(bus0_pipe *);
+static void bus0_pipe_recv(bus0_pipe *);
-// A bus_sock is our per-socket protocol private structure.
-struct bus_sock {
+static void bus0_sock_getq_cb(void *);
+static void bus0_pipe_getq_cb(void *);
+static void bus0_pipe_send_cb(void *);
+static void bus0_pipe_recv_cb(void *);
+static void bus0_pipe_putq_cb(void *);
+
+// bus0_sock is our per-socket protocol private structure.
+struct bus0_sock {
int raw;
nni_aio * aio_getq;
nni_list pipes;
@@ -45,10 +50,10 @@ struct bus_sock {
nni_msgq *urq;
};
-// A bus_pipe is our per-pipe protocol private structure.
-struct bus_pipe {
+// bus0_pipe is our per-pipe protocol private structure.
+struct bus0_pipe {
nni_pipe * npipe;
- bus_sock * psock;
+ bus0_sock * psock;
nni_msgq * sendq;
nni_list_node node;
nni_aio * aio_getq;
@@ -59,9 +64,9 @@ struct bus_pipe {
};
static void
-bus_sock_fini(void *arg)
+bus0_sock_fini(void *arg)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
@@ -70,18 +75,18 @@ bus_sock_fini(void *arg)
}
static int
-bus_sock_init(void **sp, nni_sock *nsock)
+bus0_sock_init(void **sp, nni_sock *nsock)
{
- bus_sock *s;
- int rv;
+ bus0_sock *s;
+ int rv;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- NNI_LIST_INIT(&s->pipes, bus_pipe, node);
+ NNI_LIST_INIT(&s->pipes, bus0_pipe, node);
nni_mtx_init(&s->mtx);
- if ((rv = nni_aio_init(&s->aio_getq, bus_sock_getq_cb, s)) != 0) {
- bus_sock_fini(s);
+ if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) {
+ bus0_sock_fini(s);
return (rv);
}
s->raw = 0;
@@ -93,25 +98,25 @@ bus_sock_init(void **sp, nni_sock *nsock)
}
static void
-bus_sock_open(void *arg)
+bus0_sock_open(void *arg)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
- bus_sock_getq(s);
+ bus0_sock_getq(s);
}
static void
-bus_sock_close(void *arg)
+bus0_sock_close(void *arg)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-bus_pipe_fini(void *arg)
+bus0_pipe_fini(void *arg)
{
- bus_pipe *p = arg;
+ bus0_pipe *p = arg;
nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
@@ -123,10 +128,10 @@ bus_pipe_fini(void *arg)
}
static int
-bus_pipe_init(void **pp, nni_pipe *npipe, void *s)
+bus0_pipe_init(void **pp, nni_pipe *npipe, void *s)
{
- bus_pipe *p;
- int rv;
+ bus0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
@@ -134,11 +139,11 @@ bus_pipe_init(void **pp, nni_pipe *npipe, void *s)
NNI_LIST_NODE_INIT(&p->node);
nni_mtx_init(&p->mtx);
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, bus_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, bus_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, bus_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, bus_pipe_putq_cb, p)) != 0)) {
- bus_pipe_fini(p);
+ ((rv = nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) {
+ bus0_pipe_fini(p);
return (rv);
}
@@ -149,26 +154,26 @@ bus_pipe_init(void **pp, nni_pipe *npipe, void *s)
}
static int
-bus_pipe_start(void *arg)
+bus0_pipe_start(void *arg)
{
- bus_pipe *p = arg;
- bus_sock *s = p->psock;
+ bus0_pipe *p = arg;
+ bus0_sock *s = p->psock;
nni_mtx_lock(&s->mtx);
nni_list_append(&s->pipes, p);
nni_mtx_unlock(&s->mtx);
- bus_pipe_recv(p);
- bus_pipe_getq(p);
+ bus0_pipe_recv(p);
+ bus0_pipe_getq(p);
return (0);
}
static void
-bus_pipe_stop(void *arg)
+bus0_pipe_stop(void *arg)
{
- bus_pipe *p = arg;
- bus_sock *s = p->psock;
+ bus0_pipe *p = arg;
+ bus0_sock *s = p->psock;
nni_msgq_close(p->sendq);
@@ -185,9 +190,9 @@ bus_pipe_stop(void *arg)
}
static void
-bus_pipe_getq_cb(void *arg)
+bus0_pipe_getq_cb(void *arg)
{
- bus_pipe *p = arg;
+ bus0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
// closed?
@@ -201,9 +206,9 @@ bus_pipe_getq_cb(void *arg)
}
static void
-bus_pipe_send_cb(void *arg)
+bus0_pipe_send_cb(void *arg)
{
- bus_pipe *p = arg;
+ bus0_pipe *p = arg;
if (nni_aio_result(p->aio_send) != 0) {
// closed?
@@ -213,15 +218,15 @@ bus_pipe_send_cb(void *arg)
return;
}
- bus_pipe_getq(p);
+ bus0_pipe_getq(p);
}
static void
-bus_pipe_recv_cb(void *arg)
+bus0_pipe_recv_cb(void *arg)
{
- bus_pipe *p = arg;
- bus_sock *s = p->psock;
- nni_msg * msg;
+ bus0_pipe *p = arg;
+ bus0_sock *s = p->psock;
+ nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->npipe);
@@ -243,9 +248,9 @@ bus_pipe_recv_cb(void *arg)
}
static void
-bus_pipe_putq_cb(void *arg)
+bus0_pipe_putq_cb(void *arg)
{
- bus_pipe *p = arg;
+ bus0_pipe *p = arg;
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
@@ -255,18 +260,18 @@ bus_pipe_putq_cb(void *arg)
}
// Wait for another recv.
- bus_pipe_recv(p);
+ bus0_pipe_recv(p);
}
static void
-bus_sock_getq_cb(void *arg)
+bus0_sock_getq_cb(void *arg)
{
- bus_sock *s = arg;
- bus_pipe *p;
- bus_pipe *lastp;
- nni_msg * msg;
- nni_msg * dup;
- uint32_t sender;
+ bus0_sock *s = arg;
+ bus0_pipe *p;
+ bus0_pipe *lastp;
+ nni_msg * msg;
+ nni_msg * dup;
+ uint32_t sender;
if (nni_aio_result(s->aio_getq) != 0) {
return;
@@ -307,95 +312,95 @@ bus_sock_getq_cb(void *arg)
nni_msg_free(msg);
}
- bus_sock_getq(s);
+ bus0_sock_getq(s);
}
static void
-bus_sock_getq(bus_sock *s)
+bus0_sock_getq(bus0_sock *s)
{
nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-bus_pipe_getq(bus_pipe *p)
+bus0_pipe_getq(bus0_pipe *p)
{
nni_msgq_aio_get(p->sendq, p->aio_getq);
}
static void
-bus_pipe_recv(bus_pipe *p)
+bus0_pipe_recv(bus0_pipe *p)
{
nni_pipe_recv(p->npipe, p->aio_recv);
}
static int
-bus_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+bus0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-bus_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+bus0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static void
-bus_sock_send(void *arg, nni_aio *aio)
+bus0_sock_send(void *arg, nni_aio *aio)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
nni_msgq_aio_put(s->uwq, aio);
}
static void
-bus_sock_recv(void *arg, nni_aio *aio)
+bus0_sock_recv(void *arg, nni_aio *aio)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
nni_msgq_aio_get(s->urq, aio);
}
-static nni_proto_pipe_ops bus_pipe_ops = {
- .pipe_init = bus_pipe_init,
- .pipe_fini = bus_pipe_fini,
- .pipe_start = bus_pipe_start,
- .pipe_stop = bus_pipe_stop,
+static nni_proto_pipe_ops bus0_pipe_ops = {
+ .pipe_init = bus0_pipe_init,
+ .pipe_fini = bus0_pipe_fini,
+ .pipe_start = bus0_pipe_start,
+ .pipe_stop = bus0_pipe_stop,
};
-static nni_proto_sock_option bus_sock_options[] = {
+static nni_proto_sock_option bus0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = bus_sock_getopt_raw,
- .pso_setopt = bus_sock_setopt_raw,
+ .pso_getopt = bus0_sock_getopt_raw,
+ .pso_setopt = bus0_sock_setopt_raw,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops bus_sock_ops = {
- .sock_init = bus_sock_init,
- .sock_fini = bus_sock_fini,
- .sock_open = bus_sock_open,
- .sock_close = bus_sock_close,
- .sock_send = bus_sock_send,
- .sock_recv = bus_sock_recv,
- .sock_options = bus_sock_options,
+static nni_proto_sock_ops bus0_sock_ops = {
+ .sock_init = bus0_sock_init,
+ .sock_fini = bus0_sock_fini,
+ .sock_open = bus0_sock_open,
+ .sock_close = bus0_sock_close,
+ .sock_send = bus0_sock_send,
+ .sock_recv = bus0_sock_recv,
+ .sock_options = bus0_sock_options,
};
-static nni_proto bus_proto = {
+static nni_proto bus0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_BUS_V0, "bus" },
.proto_peer = { NNI_PROTO_BUS_V0, "bus" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &bus_sock_ops,
- .proto_pipe_ops = &bus_pipe_ops,
+ .proto_sock_ops = &bus0_sock_ops,
+ .proto_pipe_ops = &bus0_pipe_ops,
};
int
nng_bus0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &bus_proto));
+ return (nni_proto_open(sidp, &bus0_proto));
}
diff --git a/src/protocol/bus0/bus.h b/src/protocol/bus0/bus.h
new file mode 100644
index 00000000..0ef3d391
--- /dev/null
+++ b/src/protocol/bus0/bus.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_BUS0_BUS_H
+#define NNG_PROTOCOL_BUS0_BUS_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_bus0_open(nng_socket *);
+
+#ifndef nng_bus_open
+#define nng_bus_open nng_bus0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_BUS0_BUS_H
diff --git a/src/protocol/pair0/CMakeLists.txt b/src/protocol/pair0/CMakeLists.txt
new file mode 100644
index 00000000..68e7ad34
--- /dev/null
+++ b/src/protocol/pair0/CMakeLists.txt
@@ -0,0 +1,18 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# PAIRv0 protocol
+
+if (NNG_PROTO_PAIR0)
+ set(PAIR0_SOURCES protocol/pair0/pair.c protocol/pair0/pair.h)
+ install(FILES pair.h DESTINATION include/nng/protocol/pair0)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${PAIR0_SOURCES} PARENT_SCOPE)
diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair0/pair.c
index 93cd1497..bac405b8 100644
--- a/src/protocol/pair/pair_v0.c
+++ b/src/protocol/pair0/pair.c
@@ -17,6 +17,10 @@
// While a peer is connected to the server, all other peer connection
// attempts are discarded.
+#ifndef NNI_PROTO_PAIR_V0
+#define NNI_PROTO_PAIR_V0 NNI_PROTO(1, 0)
+#endif
+
typedef struct pair0_pipe pair0_pipe;
typedef struct pair0_sock pair0_sock;
diff --git a/src/protocol/pair0/pair.h b/src/protocol/pair0/pair.h
new file mode 100644
index 00000000..6828c921
--- /dev/null
+++ b/src/protocol/pair0/pair.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_PAIR0_PAIR_H
+#define NNG_PROTOCOL_PAIR0_PAIR_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_pair0_open(nng_socket *);
+
+#ifndef nng_pair_open
+#define nng_pair_open nng_pair0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_PAIR0_PAIR_H
diff --git a/src/protocol/pair1/CMakeLists.txt b/src/protocol/pair1/CMakeLists.txt
new file mode 100644
index 00000000..f35d6959
--- /dev/null
+++ b/src/protocol/pair1/CMakeLists.txt
@@ -0,0 +1,18 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# PAIRv1 protocol
+
+if (NNG_PROTO_PAIR1)
+ set(PAIR1_SOURCES protocol/pair1/pair.c protocol/pair1/pair.h)
+ install(FILES pair.h DESTINATION include/nng/protocol/pair1)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${PAIR1_SOURCES} PARENT_SCOPE)
diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair1/pair.c
index e14d06d5..3f6f63fc 100644
--- a/src/protocol/pair/pair_v1.c
+++ b/src/protocol/pair1/pair.c
@@ -13,10 +13,16 @@
#include "core/nng_impl.h"
+#include "protocol/pair1/pair.h"
+
// Pair protocol. The PAIRv1 protocol is a simple 1:1 messaging pattern,
// usually, but it can support a polyamorous mode where a single server can
// communicate with multiple partners.
+#ifndef NNI_PROTO_PAIR_V1
+#define NNI_PROTO_PAIR_V1 NNI_PROTO(1, 1)
+#endif
+
typedef struct pair1_pipe pair1_pipe;
typedef struct pair1_sock pair1_sock;
diff --git a/src/protocol/pair1/pair.h b/src/protocol/pair1/pair.h
new file mode 100644
index 00000000..bc519d9f
--- /dev/null
+++ b/src/protocol/pair1/pair.h
@@ -0,0 +1,30 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_PAIR1_PAIR_H
+#define NNG_PROTOCOL_PAIR1_PAIR_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_pair1_open(nng_socket *);
+
+#ifndef nng_pair_open
+#define nng_pair_open nng_pair1_open
+#endif
+
+#define NNG_OPT_PAIR1_POLY "pair1:polyamorous"
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_PAIR1_PAIR_H
diff --git a/src/protocol/pipeline0/CMakeLists.txt b/src/protocol/pipeline0/CMakeLists.txt
new file mode 100644
index 00000000..6153c5a7
--- /dev/null
+++ b/src/protocol/pipeline0/CMakeLists.txt
@@ -0,0 +1,23 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Pub/Sub protocol
+
+if (NNG_PROTO_PUSH0)
+ set(PUSH0_SOURCES protocol/pipeline0/push.c protocol/pipeline0/push.h)
+ install(FILES push.h DESTINATION include/nng/protocol/pipeline0)
+endif()
+
+if (NNG_PROTO_PULL0)
+ set(PULL0_SOURCES protocol/pipeline0/pull.c protocol/pipeline0/pull.h)
+ install(FILES pull.h DESTINATION include/nng/protocol/pipeline0)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${PUSH0_SOURCES} ${PULL0_SOURCES} PARENT_SCOPE) \ No newline at end of file
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline0/pull.c
index 9685f0a1..8c16cb17 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline0/pull.c
@@ -15,31 +15,39 @@
// Pull protocol. The PULL protocol is the "read" side of a pipeline.
-typedef struct pull_pipe pull_pipe;
-typedef struct pull_sock pull_sock;
+#ifndef NNI_PROTO_PULL_V0
+#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1)
+#endif
-static void pull_putq_cb(void *);
-static void pull_recv_cb(void *);
-static void pull_putq(pull_pipe *, nni_msg *);
+#ifndef NNI_PROTO_PUSH_V0
+#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0)
+#endif
-// A pull_sock is our per-socket protocol private structure.
-struct pull_sock {
+typedef struct pull0_pipe pull0_pipe;
+typedef struct pull0_sock pull0_sock;
+
+static void pull0_putq_cb(void *);
+static void pull0_recv_cb(void *);
+static void pull0_putq(pull0_pipe *, nni_msg *);
+
+// pull0_sock is our per-socket protocol private structure.
+struct pull0_sock {
nni_msgq *urq;
int raw;
};
-// A pull_pipe is our per-pipe protocol private structure.
-struct pull_pipe {
- nni_pipe * pipe;
- pull_sock *pull;
- nni_aio * putq_aio;
- nni_aio * recv_aio;
+// pull0_pipe is our per-pipe protocol private structure.
+struct pull0_pipe {
+ nni_pipe * pipe;
+ pull0_sock *pull;
+ nni_aio * putq_aio;
+ nni_aio * recv_aio;
};
static int
-pull_sock_init(void **sp, nni_sock *sock)
+pull0_sock_init(void **sp, nni_sock *sock)
{
- pull_sock *s;
+ pull0_sock *s;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
@@ -52,17 +60,17 @@ pull_sock_init(void **sp, nni_sock *sock)
}
static void
-pull_sock_fini(void *arg)
+pull0_sock_fini(void *arg)
{
- pull_sock *s = arg;
+ pull0_sock *s = arg;
NNI_FREE_STRUCT(s);
}
static void
-pull_pipe_fini(void *arg)
+pull0_pipe_fini(void *arg)
{
- pull_pipe *p = arg;
+ pull0_pipe *p = arg;
nni_aio_fini(p->putq_aio);
nni_aio_fini(p->recv_aio);
@@ -70,17 +78,17 @@ pull_pipe_fini(void *arg)
}
static int
-pull_pipe_init(void **pp, nni_pipe *pipe, void *s)
+pull0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- pull_pipe *p;
- int rv;
+ pull0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_aio_init(&p->putq_aio, pull_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->recv_aio, pull_recv_cb, p)) != 0)) {
- pull_pipe_fini(p);
+ if (((rv = nni_aio_init(&p->putq_aio, pull0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->recv_aio, pull0_recv_cb, p)) != 0)) {
+ pull0_pipe_fini(p);
return (rv);
}
@@ -91,9 +99,9 @@ pull_pipe_init(void **pp, nni_pipe *pipe, void *s)
}
static int
-pull_pipe_start(void *arg)
+pull0_pipe_start(void *arg)
{
- pull_pipe *p = arg;
+ pull0_pipe *p = arg;
// Start the pending pull...
nni_pipe_recv(p->pipe, p->recv_aio);
@@ -102,20 +110,20 @@ pull_pipe_start(void *arg)
}
static void
-pull_pipe_stop(void *arg)
+pull0_pipe_stop(void *arg)
{
- pull_pipe *p = arg;
+ pull0_pipe *p = arg;
nni_aio_stop(p->putq_aio);
nni_aio_stop(p->recv_aio);
}
static void
-pull_recv_cb(void *arg)
+pull0_recv_cb(void *arg)
{
- pull_pipe *p = arg;
- nni_aio * aio = p->recv_aio;
- nni_msg * msg;
+ pull0_pipe *p = arg;
+ nni_aio * aio = p->recv_aio;
+ nni_msg * msg;
if (nni_aio_result(aio) != 0) {
// Failed to get a message, probably the pipe is closed.
@@ -127,14 +135,14 @@ pull_recv_cb(void *arg)
msg = nni_aio_get_msg(aio);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
nni_aio_set_msg(aio, NULL);
- pull_putq(p, msg);
+ pull0_putq(p, msg);
}
static void
-pull_putq_cb(void *arg)
+pull0_putq_cb(void *arg)
{
- pull_pipe *p = arg;
- nni_aio * aio = p->putq_aio;
+ pull0_pipe *p = arg;
+ nni_aio * aio = p->putq_aio;
if (nni_aio_result(aio) != 0) {
// If we failed to put, probably NNG_ECLOSED, nothing else
@@ -148,11 +156,11 @@ pull_putq_cb(void *arg)
nni_pipe_recv(p->pipe, p->recv_aio);
}
-// nni_pull_putq schedules a put operation to the user socket (sendup).
+// pull0_putq schedules a put operation to the user socket (sendup).
static void
-pull_putq(pull_pipe *p, nni_msg *msg)
+pull0_putq(pull0_pipe *p, nni_msg *msg)
{
- pull_sock *s = p->pull;
+ pull0_sock *s = p->pull;
nni_aio_set_msg(p->putq_aio, msg);
@@ -160,83 +168,83 @@ pull_putq(pull_pipe *p, nni_msg *msg)
}
static void
-pull_sock_open(void *arg)
+pull0_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-pull_sock_close(void *arg)
+pull0_sock_close(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static int
-pull_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+pull0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- pull_sock *s = arg;
+ pull0_sock *s = arg;
return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-pull_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+pull0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- pull_sock *s = arg;
+ pull0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static void
-pull_sock_send(void *arg, nni_aio *aio)
+pull0_sock_send(void *arg, nni_aio *aio)
{
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
static void
-pull_sock_recv(void *arg, nni_aio *aio)
+pull0_sock_recv(void *arg, nni_aio *aio)
{
- pull_sock *s = arg;
+ pull0_sock *s = arg;
nni_msgq_aio_get(s->urq, aio);
}
-static nni_proto_pipe_ops pull_pipe_ops = {
- .pipe_init = pull_pipe_init,
- .pipe_fini = pull_pipe_fini,
- .pipe_start = pull_pipe_start,
- .pipe_stop = pull_pipe_stop,
+static nni_proto_pipe_ops pull0_pipe_ops = {
+ .pipe_init = pull0_pipe_init,
+ .pipe_fini = pull0_pipe_fini,
+ .pipe_start = pull0_pipe_start,
+ .pipe_stop = pull0_pipe_stop,
};
-static nni_proto_sock_option pull_sock_options[] = {
+static nni_proto_sock_option pull0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = pull_sock_getopt_raw,
- .pso_setopt = pull_sock_setopt_raw,
+ .pso_getopt = pull0_sock_getopt_raw,
+ .pso_setopt = pull0_sock_setopt_raw,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops pull_sock_ops = {
- .sock_init = pull_sock_init,
- .sock_fini = pull_sock_fini,
- .sock_open = pull_sock_open,
- .sock_close = pull_sock_close,
- .sock_send = pull_sock_send,
- .sock_recv = pull_sock_recv,
- .sock_options = pull_sock_options,
+static nni_proto_sock_ops pull0_sock_ops = {
+ .sock_init = pull0_sock_init,
+ .sock_fini = pull0_sock_fini,
+ .sock_open = pull0_sock_open,
+ .sock_close = pull0_sock_close,
+ .sock_send = pull0_sock_send,
+ .sock_recv = pull0_sock_recv,
+ .sock_options = pull0_sock_options,
};
-static nni_proto pull_proto = {
+static nni_proto pull0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_PULL_V0, "pull" },
.proto_peer = { NNI_PROTO_PUSH_V0, "push" },
.proto_flags = NNI_PROTO_FLAG_RCV,
- .proto_pipe_ops = &pull_pipe_ops,
- .proto_sock_ops = &pull_sock_ops,
+ .proto_pipe_ops = &pull0_pipe_ops,
+ .proto_sock_ops = &pull0_sock_ops,
};
int
nng_pull0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &pull_proto));
+ return (nni_proto_open(sidp, &pull0_proto));
}
diff --git a/src/protocol/pipeline0/pull.h b/src/protocol/pipeline0/pull.h
new file mode 100644
index 00000000..75bded03
--- /dev/null
+++ b/src/protocol/pipeline0/pull.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_PIPELINE0_PULL_H
+#define NNG_PROTOCOL_PIPELINE0_PULL_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_pull0_open(nng_socket *);
+
+#ifndef nng_pull_open
+#define nng_pull_open nng_pull0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_PIPELINE0_PULL_H
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline0/push.c
index 9ff74558..3dd83fe0 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline0/push.c
@@ -17,23 +17,31 @@
// Push distributes fairly, or tries to, by giving messages in round-robin
// order.
-typedef struct push_pipe push_pipe;
-typedef struct push_sock push_sock;
+#ifndef NNI_PROTO_PULL_V0
+#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1)
+#endif
-static void push_send_cb(void *);
-static void push_recv_cb(void *);
-static void push_getq_cb(void *);
+#ifndef NNI_PROTO_PUSH_V0
+#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0)
+#endif
-// An nni_push_sock is our per-socket protocol private structure.
-struct push_sock {
+typedef struct push0_pipe push0_pipe;
+typedef struct push0_sock push0_sock;
+
+static void push0_send_cb(void *);
+static void push0_recv_cb(void *);
+static void push0_getq_cb(void *);
+
+// push0_sock is our per-socket protocol private structure.
+struct push0_sock {
nni_msgq *uwq;
int raw;
};
-// An nni_push_pipe is our per-pipe protocol private structure.
-struct push_pipe {
+// push0_pipe is our per-pipe protocol private structure.
+struct push0_pipe {
nni_pipe * pipe;
- push_sock * push;
+ push0_sock * push;
nni_list_node node;
nni_aio *aio_recv;
@@ -42,9 +50,9 @@ struct push_pipe {
};
static int
-push_sock_init(void **sp, nni_sock *sock)
+push0_sock_init(void **sp, nni_sock *sock)
{
- push_sock *s;
+ push0_sock *s;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
@@ -56,29 +64,29 @@ push_sock_init(void **sp, nni_sock *sock)
}
static void
-push_sock_fini(void *arg)
+push0_sock_fini(void *arg)
{
- push_sock *s = arg;
+ push0_sock *s = arg;
NNI_FREE_STRUCT(s);
}
static void
-push_sock_open(void *arg)
+push0_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-push_sock_close(void *arg)
+push0_sock_close(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-push_pipe_fini(void *arg)
+push0_pipe_fini(void *arg)
{
- push_pipe *p = arg;
+ push0_pipe *p = arg;
nni_aio_fini(p->aio_recv);
nni_aio_fini(p->aio_send);
@@ -87,18 +95,18 @@ push_pipe_fini(void *arg)
}
static int
-push_pipe_init(void **pp, nni_pipe *pipe, void *s)
+push0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- push_pipe *p;
- int rv;
+ push0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_aio_init(&p->aio_recv, push_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, push_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, push_getq_cb, p)) != 0)) {
- push_pipe_fini(p);
+ if (((rv = nni_aio_init(&p->aio_recv, push0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, push0_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, push0_getq_cb, p)) != 0)) {
+ push0_pipe_fini(p);
return (rv);
}
NNI_LIST_NODE_INIT(&p->node);
@@ -109,10 +117,10 @@ push_pipe_init(void **pp, nni_pipe *pipe, void *s)
}
static int
-push_pipe_start(void *arg)
+push0_pipe_start(void *arg)
{
- push_pipe *p = arg;
- push_sock *s = p->push;
+ push0_pipe *p = arg;
+ push0_sock *s = p->push;
if (nni_pipe_peer(p->pipe) != NNI_PROTO_PULL_V0) {
return (NNG_EPROTO);
@@ -129,9 +137,9 @@ push_pipe_start(void *arg)
}
static void
-push_pipe_stop(void *arg)
+push0_pipe_stop(void *arg)
{
- push_pipe *p = arg;
+ push0_pipe *p = arg;
nni_aio_stop(p->aio_recv);
nni_aio_stop(p->aio_send);
@@ -139,9 +147,9 @@ push_pipe_stop(void *arg)
}
static void
-push_recv_cb(void *arg)
+push0_recv_cb(void *arg)
{
- push_pipe *p = arg;
+ push0_pipe *p = arg;
// We normally expect to receive an error. If a pipe actually
// sends us data, we just discard it.
@@ -155,10 +163,10 @@ push_recv_cb(void *arg)
}
static void
-push_send_cb(void *arg)
+push0_send_cb(void *arg)
{
- push_pipe *p = arg;
- push_sock *s = p->push;
+ push0_pipe *p = arg;
+ push0_sock *s = p->push;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -171,10 +179,10 @@ push_send_cb(void *arg)
}
static void
-push_getq_cb(void *arg)
+push0_getq_cb(void *arg)
{
- push_pipe *p = arg;
- nni_aio * aio = p->aio_getq;
+ push0_pipe *p = arg;
+ nni_aio * aio = p->aio_getq;
if (nni_aio_result(aio) != 0) {
// If the socket is closing, nothing else we can do.
@@ -189,71 +197,71 @@ push_getq_cb(void *arg)
}
static int
-push_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+push0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- push_sock *s = arg;
+ push0_sock *s = arg;
return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-push_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+push0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- push_sock *s = arg;
+ push0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static void
-push_sock_send(void *arg, nni_aio *aio)
+push0_sock_send(void *arg, nni_aio *aio)
{
- push_sock *s = arg;
+ push0_sock *s = arg;
nni_msgq_aio_put(s->uwq, aio);
}
static void
-push_sock_recv(void *arg, nni_aio *aio)
+push0_sock_recv(void *arg, nni_aio *aio)
{
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
-static nni_proto_pipe_ops push_pipe_ops = {
- .pipe_init = push_pipe_init,
- .pipe_fini = push_pipe_fini,
- .pipe_start = push_pipe_start,
- .pipe_stop = push_pipe_stop,
+static nni_proto_pipe_ops push0_pipe_ops = {
+ .pipe_init = push0_pipe_init,
+ .pipe_fini = push0_pipe_fini,
+ .pipe_start = push0_pipe_start,
+ .pipe_stop = push0_pipe_stop,
};
-static nni_proto_sock_option push_sock_options[] = {
+static nni_proto_sock_option push0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = push_sock_getopt_raw,
- .pso_setopt = push_sock_setopt_raw,
+ .pso_getopt = push0_sock_getopt_raw,
+ .pso_setopt = push0_sock_setopt_raw,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops push_sock_ops = {
- .sock_init = push_sock_init,
- .sock_fini = push_sock_fini,
- .sock_open = push_sock_open,
- .sock_close = push_sock_close,
- .sock_options = push_sock_options,
- .sock_send = push_sock_send,
- .sock_recv = push_sock_recv,
+static nni_proto_sock_ops push0_sock_ops = {
+ .sock_init = push0_sock_init,
+ .sock_fini = push0_sock_fini,
+ .sock_open = push0_sock_open,
+ .sock_close = push0_sock_close,
+ .sock_options = push0_sock_options,
+ .sock_send = push0_sock_send,
+ .sock_recv = push0_sock_recv,
};
-static nni_proto push_proto = {
+static nni_proto push0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_PUSH_V0, "push" },
.proto_peer = { NNI_PROTO_PULL_V0, "pull" },
.proto_flags = NNI_PROTO_FLAG_SND,
- .proto_pipe_ops = &push_pipe_ops,
- .proto_sock_ops = &push_sock_ops,
+ .proto_pipe_ops = &push0_pipe_ops,
+ .proto_sock_ops = &push0_sock_ops,
};
int
nng_push0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &push_proto));
+ return (nni_proto_open(sidp, &push0_proto));
}
diff --git a/src/protocol/pipeline0/push.h b/src/protocol/pipeline0/push.h
new file mode 100644
index 00000000..c7303b92
--- /dev/null
+++ b/src/protocol/pipeline0/push.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_PIPELINE0_PUSH_H
+#define NNG_PROTOCOL_PIPELINE0_PUSH_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_push0_open(nng_socket *);
+
+#ifndef nng_push_open
+#define nng_push_open nng_push0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_PIPELINE0_PUSH_H
diff --git a/src/protocol/pubsub0/CMakeLists.txt b/src/protocol/pubsub0/CMakeLists.txt
new file mode 100644
index 00000000..4edcbfae
--- /dev/null
+++ b/src/protocol/pubsub0/CMakeLists.txt
@@ -0,0 +1,23 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Pub/Sub protocol
+
+if (NNG_PROTO_PUB0)
+ set(PUB0_SOURCES protocol/pubsub0/pub.c protocol/pubsub0/pub.h)
+ install(FILES pub.h DESTINATION include/nng/protocol/pubsub0)
+endif()
+
+if (NNG_PROTO_SUB0)
+ set(SUB0_SOURCES protocol/pubsub0/sub.c protocol/pubsub0/sub.h)
+ install(FILES sub.h DESTINATION include/nng/protocol/pubsub0)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${PUB0_SOURCES} ${SUB0_SOURCES} PARENT_SCOPE) \ No newline at end of file
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub0/pub.c
index 9e5cd67f..f4a33b77 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -12,24 +12,33 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/pubsub0/pub.h"
// Publish protocol. The PUB protocol simply sends messages out, as
// a broadcast. It has nothing more sophisticated because it does not
// perform sender-side filtering. Its best effort delivery, so anything
// that can't receive the message won't get one.
-typedef struct pub_pipe pub_pipe;
-typedef struct pub_sock pub_sock;
+#ifndef NNI_PROTO_SUB_V0
+#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1)
+#endif
-static void pub_pipe_recv_cb(void *);
-static void pub_pipe_send_cb(void *);
-static void pub_pipe_getq_cb(void *);
-static void pub_sock_getq_cb(void *);
-static void pub_sock_fini(void *);
-static void pub_pipe_fini(void *);
+#ifndef NNI_PROTO_PUB_V0
+#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0)
+#endif
-// A pub_sock is our per-socket protocol private structure.
-struct pub_sock {
+typedef struct pub0_pipe pub0_pipe;
+typedef struct pub0_sock pub0_sock;
+
+static void pub0_pipe_recv_cb(void *);
+static void pub0_pipe_send_cb(void *);
+static void pub0_pipe_getq_cb(void *);
+static void pub0_sock_getq_cb(void *);
+static void pub0_sock_fini(void *);
+static void pub0_pipe_fini(void *);
+
+// pub0_sock is our per-socket protocol private structure.
+struct pub0_sock {
nni_msgq *uwq;
int raw;
nni_aio * aio_getq;
@@ -37,10 +46,10 @@ struct pub_sock {
nni_mtx mtx;
};
-// A pub_pipe is our per-pipe protocol private structure.
-struct pub_pipe {
+// pub0_pipe is our per-pipe protocol private structure.
+struct pub0_pipe {
nni_pipe * pipe;
- pub_sock * pub;
+ pub0_sock * pub;
nni_msgq * sendq;
nni_aio * aio_getq;
nni_aio * aio_send;
@@ -49,9 +58,9 @@ struct pub_pipe {
};
static void
-pub_sock_fini(void *arg)
+pub0_sock_fini(void *arg)
{
- pub_sock *s = arg;
+ pub0_sock *s = arg;
nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
@@ -60,22 +69,22 @@ pub_sock_fini(void *arg)
}
static int
-pub_sock_init(void **sp, nni_sock *sock)
+pub0_sock_init(void **sp, nni_sock *sock)
{
- pub_sock *s;
- int rv;
+ pub0_sock *s;
+ int rv;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&s->mtx);
- if ((rv = nni_aio_init(&s->aio_getq, pub_sock_getq_cb, s)) != 0) {
- pub_sock_fini(s);
+ if ((rv = nni_aio_init(&s->aio_getq, pub0_sock_getq_cb, s)) != 0) {
+ pub0_sock_fini(s);
return (rv);
}
s->raw = 0;
- NNI_LIST_INIT(&s->pipes, pub_pipe, node);
+ NNI_LIST_INIT(&s->pipes, pub0_pipe, node);
s->uwq = nni_sock_sendq(sock);
@@ -84,25 +93,25 @@ pub_sock_init(void **sp, nni_sock *sock)
}
static void
-pub_sock_open(void *arg)
+pub0_sock_open(void *arg)
{
- pub_sock *s = arg;
+ pub0_sock *s = arg;
nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-pub_sock_close(void *arg)
+pub0_sock_close(void *arg)
{
- pub_sock *s = arg;
+ pub0_sock *s = arg;
nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-pub_pipe_fini(void *arg)
+pub0_pipe_fini(void *arg)
{
- pub_pipe *p = arg;
+ pub0_pipe *p = arg;
nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
@@ -111,10 +120,10 @@ pub_pipe_fini(void *arg)
}
static int
-pub_pipe_init(void **pp, nni_pipe *pipe, void *s)
+pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- pub_pipe *p;
- int rv;
+ pub0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
@@ -122,11 +131,11 @@ pub_pipe_init(void **pp, nni_pipe *pipe, void *s)
// XXX: consider making this depth tunable
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, pub_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, pub_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, pub_pipe_recv_cb, p)) != 0)) {
+ ((rv = nni_aio_init(&p->aio_getq, pub0_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) {
- pub_pipe_fini(p);
+ pub0_pipe_fini(p);
return (rv);
}
@@ -137,10 +146,10 @@ pub_pipe_init(void **pp, nni_pipe *pipe, void *s)
}
static int
-pub_pipe_start(void *arg)
+pub0_pipe_start(void *arg)
{
- pub_pipe *p = arg;
- pub_sock *s = p->pub;
+ pub0_pipe *p = arg;
+ pub0_sock *s = p->pub;
if (nni_pipe_peer(p->pipe) != NNI_PROTO_SUB_V0) {
return (NNG_EPROTO);
@@ -157,10 +166,10 @@ pub_pipe_start(void *arg)
}
static void
-pub_pipe_stop(void *arg)
+pub0_pipe_stop(void *arg)
{
- pub_pipe *p = arg;
- pub_sock *s = p->pub;
+ pub0_pipe *p = arg;
+ pub0_sock *s = p->pub;
nni_aio_stop(p->aio_getq);
nni_aio_stop(p->aio_send);
@@ -176,15 +185,15 @@ pub_pipe_stop(void *arg)
}
static void
-pub_sock_getq_cb(void *arg)
+pub0_sock_getq_cb(void *arg)
{
- pub_sock *s = arg;
- nni_msgq *uwq = s->uwq;
- nni_msg * msg, *dup;
+ pub0_sock *s = arg;
+ nni_msgq * uwq = s->uwq;
+ nni_msg * msg, *dup;
- pub_pipe *p;
- pub_pipe *last;
- int rv;
+ pub0_pipe *p;
+ pub0_pipe *last;
+ int rv;
if (nni_aio_result(s->aio_getq) != 0) {
return;
@@ -218,9 +227,9 @@ pub_sock_getq_cb(void *arg)
}
static void
-pub_pipe_recv_cb(void *arg)
+pub0_pipe_recv_cb(void *arg)
{
- pub_pipe *p = arg;
+ pub0_pipe *p = arg;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->pipe);
@@ -233,9 +242,9 @@ pub_pipe_recv_cb(void *arg)
}
static void
-pub_pipe_getq_cb(void *arg)
+pub0_pipe_getq_cb(void *arg)
{
- pub_pipe *p = arg;
+ pub0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->pipe);
@@ -249,9 +258,9 @@ pub_pipe_getq_cb(void *arg)
}
static void
-pub_pipe_send_cb(void *arg)
+pub0_pipe_send_cb(void *arg)
{
- pub_pipe *p = arg;
+ pub0_pipe *p = arg;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -265,71 +274,71 @@ pub_pipe_send_cb(void *arg)
}
static int
-pub_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+pub0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- pub_sock *s = arg;
+ pub0_sock *s = arg;
return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-pub_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+pub0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- pub_sock *s = arg;
+ pub0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static void
-pub_sock_recv(void *arg, nni_aio *aio)
+pub0_sock_recv(void *arg, nni_aio *aio)
{
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
static void
-pub_sock_send(void *arg, nni_aio *aio)
+pub0_sock_send(void *arg, nni_aio *aio)
{
- pub_sock *s = arg;
+ pub0_sock *s = arg;
nni_msgq_aio_put(s->uwq, aio);
}
-static nni_proto_pipe_ops pub_pipe_ops = {
- .pipe_init = pub_pipe_init,
- .pipe_fini = pub_pipe_fini,
- .pipe_start = pub_pipe_start,
- .pipe_stop = pub_pipe_stop,
+static nni_proto_pipe_ops pub0_pipe_ops = {
+ .pipe_init = pub0_pipe_init,
+ .pipe_fini = pub0_pipe_fini,
+ .pipe_start = pub0_pipe_start,
+ .pipe_stop = pub0_pipe_stop,
};
-static nni_proto_sock_option pub_sock_options[] = {
+static nni_proto_sock_option pub0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = pub_sock_getopt_raw,
- .pso_setopt = pub_sock_setopt_raw,
+ .pso_getopt = pub0_sock_getopt_raw,
+ .pso_setopt = pub0_sock_setopt_raw,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops pub_sock_ops = {
- .sock_init = pub_sock_init,
- .sock_fini = pub_sock_fini,
- .sock_open = pub_sock_open,
- .sock_close = pub_sock_close,
- .sock_send = pub_sock_send,
- .sock_recv = pub_sock_recv,
- .sock_options = pub_sock_options,
+static nni_proto_sock_ops pub0_sock_ops = {
+ .sock_init = pub0_sock_init,
+ .sock_fini = pub0_sock_fini,
+ .sock_open = pub0_sock_open,
+ .sock_close = pub0_sock_close,
+ .sock_send = pub0_sock_send,
+ .sock_recv = pub0_sock_recv,
+ .sock_options = pub0_sock_options,
};
-static nni_proto pub_proto = {
+static nni_proto pub0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_PUB_V0, "pub" },
.proto_peer = { NNI_PROTO_SUB_V0, "sub" },
.proto_flags = NNI_PROTO_FLAG_SND,
- .proto_sock_ops = &pub_sock_ops,
- .proto_pipe_ops = &pub_pipe_ops,
+ .proto_sock_ops = &pub0_sock_ops,
+ .proto_pipe_ops = &pub0_pipe_ops,
};
int
nng_pub0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &pub_proto));
+ return (nni_proto_open(sidp, &pub0_proto));
}
diff --git a/src/protocol/pubsub0/pub.h b/src/protocol/pubsub0/pub.h
new file mode 100644
index 00000000..2388a292
--- /dev/null
+++ b/src/protocol/pubsub0/pub.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_PUBSUB0_PUB_H
+#define NNG_PROTOCOL_PUBSUB0_PUB_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_pub0_open(nng_socket *);
+
+#ifndef nng_pub_open
+#define nng_pub_open nng_pub0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_PUBSUB0_PUB_H
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub0/sub.c
index 555d528e..6c504d75 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -12,54 +12,60 @@
#include <string.h>
#include "core/nng_impl.h"
-
-const char *nng_opt_sub_subscribe = NNG_OPT_SUB_SUBSCRIBE;
-const char *nng_opt_sub_unsubscribe = NNG_OPT_SUB_UNSUBSCRIBE;
+#include "protocol/pubsub0/sub.h"
// Subscriber protocol. The SUB protocol receives messages sent to
// it from publishers, and filters out those it is not interested in,
// only passing up ones that match known subscriptions.
-typedef struct sub_pipe sub_pipe;
-typedef struct sub_sock sub_sock;
-typedef struct sub_topic sub_topic;
+#ifndef NNI_PROTO_SUB_V0
+#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1)
+#endif
+
+#ifndef NNI_PROTO_PUB_V0
+#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0)
+#endif
+
+typedef struct sub0_pipe sub0_pipe;
+typedef struct sub0_sock sub0_sock;
+typedef struct sub0_topic sub0_topic;
-static void sub_recv_cb(void *);
-static void sub_putq_cb(void *);
-static void sub_pipe_fini(void *);
+static void sub0_recv_cb(void *);
+static void sub0_putq_cb(void *);
+static void sub0_pipe_fini(void *);
-struct sub_topic {
+struct sub0_topic {
nni_list_node node;
size_t len;
void * buf;
};
-// An nni_rep_sock is our per-socket protocol private structure.
-struct sub_sock {
+// sub0_sock is our per-socket protocol private structure.
+struct sub0_sock {
nni_list topics;
nni_msgq *urq;
int raw;
nni_mtx lk;
};
-// An nni_rep_pipe is our per-pipe protocol private structure.
-struct sub_pipe {
- nni_pipe *pipe;
- sub_sock *sub;
- nni_aio * aio_recv;
- nni_aio * aio_putq;
+// sub0_pipe is our per-pipe protocol private structure.
+struct sub0_pipe {
+ nni_pipe * pipe;
+ sub0_sock *sub;
+ nni_aio * aio_recv;
+ nni_aio * aio_putq;
};
static int
-sub_sock_init(void **sp, nni_sock *sock)
+sub0_sock_init(void **sp, nni_sock *sock)
{
- sub_sock *s;
+ sub0_sock *s;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&s->lk);
- NNI_LIST_INIT(&s->topics, sub_topic, node);
+ NNI_LIST_INIT(&s->topics, sub0_topic, node);
s->raw = 0;
s->urq = nni_sock_recvq(sock);
@@ -68,10 +74,10 @@ sub_sock_init(void **sp, nni_sock *sock)
}
static void
-sub_sock_fini(void *arg)
+sub0_sock_fini(void *arg)
{
- sub_sock * s = arg;
- sub_topic *topic;
+ sub0_sock * s = arg;
+ sub0_topic *topic;
while ((topic = nni_list_first(&s->topics)) != NULL) {
nni_list_remove(&s->topics, topic);
@@ -83,21 +89,21 @@ sub_sock_fini(void *arg)
}
static void
-sub_sock_open(void *arg)
+sub0_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-sub_sock_close(void *arg)
+sub0_sock_close(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-sub_pipe_fini(void *arg)
+sub0_pipe_fini(void *arg)
{
- sub_pipe *p = arg;
+ sub0_pipe *p = arg;
nni_aio_fini(p->aio_putq);
nni_aio_fini(p->aio_recv);
@@ -105,17 +111,17 @@ sub_pipe_fini(void *arg)
}
static int
-sub_pipe_init(void **pp, nni_pipe *pipe, void *s)
+sub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- sub_pipe *p;
- int rv;
+ sub0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_aio_init(&p->aio_putq, sub_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, sub_recv_cb, p)) != 0)) {
- sub_pipe_fini(p);
+ if (((rv = nni_aio_init(&p->aio_putq, sub0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0)) {
+ sub0_pipe_fini(p);
return (rv);
}
@@ -126,30 +132,30 @@ sub_pipe_init(void **pp, nni_pipe *pipe, void *s)
}
static int
-sub_pipe_start(void *arg)
+sub0_pipe_start(void *arg)
{
- sub_pipe *p = arg;
+ sub0_pipe *p = arg;
nni_pipe_recv(p->pipe, p->aio_recv);
return (0);
}
static void
-sub_pipe_stop(void *arg)
+sub0_pipe_stop(void *arg)
{
- sub_pipe *p = arg;
+ sub0_pipe *p = arg;
nni_aio_stop(p->aio_putq);
nni_aio_stop(p->aio_recv);
}
static void
-sub_recv_cb(void *arg)
+sub0_recv_cb(void *arg)
{
- sub_pipe *p = arg;
- sub_sock *s = p->sub;
- nni_msgq *urq = s->urq;
- nni_msg * msg;
+ sub0_pipe *p = arg;
+ sub0_sock *s = p->sub;
+ nni_msgq * urq = s->urq;
+ nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->pipe);
@@ -164,9 +170,9 @@ sub_recv_cb(void *arg)
}
static void
-sub_putq_cb(void *arg)
+sub0_putq_cb(void *arg)
{
- sub_pipe *p = arg;
+ sub0_pipe *p = arg;
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
@@ -184,11 +190,11 @@ sub_putq_cb(void *arg)
// to replace this with a patricia trie, like old nanomsg had.
static int
-sub_subscribe(void *arg, const void *buf, size_t sz)
+sub0_subscribe(void *arg, const void *buf, size_t sz)
{
- sub_sock * s = arg;
- sub_topic *topic;
- sub_topic *newtopic;
+ sub0_sock * s = arg;
+ sub0_topic *topic;
+ sub0_topic *newtopic;
nni_mtx_lock(&s->lk);
NNI_LIST_FOREACH (&s->topics, topic) {
@@ -234,11 +240,11 @@ sub_subscribe(void *arg, const void *buf, size_t sz)
}
static int
-sub_unsubscribe(void *arg, const void *buf, size_t sz)
+sub0_unsubscribe(void *arg, const void *buf, size_t sz)
{
- sub_sock * s = arg;
- sub_topic *topic;
- int rv;
+ sub0_sock * s = arg;
+ sub0_topic *topic;
+ int rv;
nni_mtx_lock(&s->lk);
NNI_LIST_FOREACH (&s->topics, topic) {
@@ -270,41 +276,41 @@ sub_unsubscribe(void *arg, const void *buf, size_t sz)
}
static int
-sub_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+sub0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- sub_sock *s = arg;
+ sub0_sock *s = arg;
return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-sub_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+sub0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- sub_sock *s = arg;
+ sub0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static void
-sub_sock_send(void *arg, nni_aio *aio)
+sub0_sock_send(void *arg, nni_aio *aio)
{
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
static void
-sub_sock_recv(void *arg, nni_aio *aio)
+sub0_sock_recv(void *arg, nni_aio *aio)
{
- sub_sock *s = arg;
+ sub0_sock *s = arg;
nni_msgq_aio_get(s->urq, aio);
}
static nni_msg *
-sub_sock_filter(void *arg, nni_msg *msg)
+sub0_sock_filter(void *arg, nni_msg *msg)
{
- sub_sock * s = arg;
- sub_topic *topic;
- char * body;
- size_t len;
- int match;
+ sub0_sock * s = arg;
+ sub0_topic *topic;
+ char * body;
+ size_t len;
+ int match;
nni_mtx_lock(&s->lk);
if (s->raw) {
@@ -344,55 +350,55 @@ sub_sock_filter(void *arg, nni_msg *msg)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe_ops sub_pipe_ops = {
- .pipe_init = sub_pipe_init,
- .pipe_fini = sub_pipe_fini,
- .pipe_start = sub_pipe_start,
- .pipe_stop = sub_pipe_stop,
+static nni_proto_pipe_ops sub0_pipe_ops = {
+ .pipe_init = sub0_pipe_init,
+ .pipe_fini = sub0_pipe_fini,
+ .pipe_start = sub0_pipe_start,
+ .pipe_stop = sub0_pipe_stop,
};
-static nni_proto_sock_option sub_sock_options[] = {
+static nni_proto_sock_option sub0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = sub_sock_getopt_raw,
- .pso_setopt = sub_sock_setopt_raw,
+ .pso_getopt = sub0_sock_getopt_raw,
+ .pso_setopt = sub0_sock_setopt_raw,
},
{
.pso_name = NNG_OPT_SUB_SUBSCRIBE,
.pso_getopt = NULL,
- .pso_setopt = sub_subscribe,
+ .pso_setopt = sub0_subscribe,
},
{
.pso_name = NNG_OPT_SUB_UNSUBSCRIBE,
.pso_getopt = NULL,
- .pso_setopt = sub_unsubscribe,
+ .pso_setopt = sub0_unsubscribe,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops sub_sock_ops = {
- .sock_init = sub_sock_init,
- .sock_fini = sub_sock_fini,
- .sock_open = sub_sock_open,
- .sock_close = sub_sock_close,
- .sock_send = sub_sock_send,
- .sock_recv = sub_sock_recv,
- .sock_filter = sub_sock_filter,
- .sock_options = sub_sock_options,
+static nni_proto_sock_ops sub0_sock_ops = {
+ .sock_init = sub0_sock_init,
+ .sock_fini = sub0_sock_fini,
+ .sock_open = sub0_sock_open,
+ .sock_close = sub0_sock_close,
+ .sock_send = sub0_sock_send,
+ .sock_recv = sub0_sock_recv,
+ .sock_filter = sub0_sock_filter,
+ .sock_options = sub0_sock_options,
};
-static nni_proto sub_proto = {
+static nni_proto sub0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_SUB_V0, "sub" },
.proto_peer = { NNI_PROTO_PUB_V0, "pub" },
.proto_flags = NNI_PROTO_FLAG_RCV,
- .proto_sock_ops = &sub_sock_ops,
- .proto_pipe_ops = &sub_pipe_ops,
+ .proto_sock_ops = &sub0_sock_ops,
+ .proto_pipe_ops = &sub0_pipe_ops,
};
int
nng_sub0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &sub_proto));
+ return (nni_proto_open(sidp, &sub0_proto));
}
diff --git a/src/protocol/pubsub0/sub.h b/src/protocol/pubsub0/sub.h
new file mode 100644
index 00000000..1a09145d
--- /dev/null
+++ b/src/protocol/pubsub0/sub.h
@@ -0,0 +1,31 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_PUBSUB0_SUB_H
+#define NNG_PROTOCOL_PUBSUB0_SUB_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_sub0_open(nng_socket *);
+
+#ifndef nng_sub_open
+#define nng_sub_open nng_sub0_open
+#endif
+
+#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe"
+#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe"
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_PUBSUB0_SUB_H
diff --git a/src/protocol/reqrep0/CMakeLists.txt b/src/protocol/reqrep0/CMakeLists.txt
new file mode 100644
index 00000000..4e82ad41
--- /dev/null
+++ b/src/protocol/reqrep0/CMakeLists.txt
@@ -0,0 +1,23 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Req/Rep protocol
+
+if (NNG_PROTO_REQ0)
+ set(REQ0_SOURCES protocol/reqrep0/req.c protocol/reqrep0/req.h)
+ install(FILES req.h DESTINATION include/nng/protocol/reqrep0)
+endif()
+
+if (NNG_PROTO_REP0)
+ set(REP0_SOURCES protocol/reqrep0/rep.c protocol/reqrep0/rep.h)
+ install(FILES rep.h DESTINATION include/nng/protocol/reqrep0)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${REQ0_SOURCES} ${REP0_SOURCES} PARENT_SCOPE) \ No newline at end of file
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep0/rep.c
index 100e739d..ee8e4277 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -12,23 +12,32 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/reqrep0/rep.h"
// Response protocol. The REP protocol is the "reply" side of a
// request-reply pair. This is useful for building RPC servers, for
// example.
-typedef struct rep_pipe rep_pipe;
-typedef struct rep_sock rep_sock;
+#ifndef NNI_PROTO_REQ_V0
+#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0)
+#endif
-static void rep_sock_getq_cb(void *);
-static void rep_pipe_getq_cb(void *);
-static void rep_pipe_putq_cb(void *);
-static void rep_pipe_send_cb(void *);
-static void rep_pipe_recv_cb(void *);
-static void rep_pipe_fini(void *);
+#ifndef NNI_PROTO_REP_V0
+#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1)
+#endif
-// A rep_sock is our per-socket protocol private structure.
-struct rep_sock {
+typedef struct rep0_pipe rep0_pipe;
+typedef struct rep0_sock rep0_sock;
+
+static void rep0_sock_getq_cb(void *);
+static void rep0_pipe_getq_cb(void *);
+static void rep0_pipe_putq_cb(void *);
+static void rep0_pipe_send_cb(void *);
+static void rep0_pipe_recv_cb(void *);
+static void rep0_pipe_fini(void *);
+
+// rep0_sock is our per-socket protocol private structure.
+struct rep0_sock {
nni_msgq * uwq;
nni_msgq * urq;
nni_mtx lk;
@@ -40,21 +49,21 @@ struct rep_sock {
nni_aio * aio_getq;
};
-// A rep_pipe is our per-pipe protocol private structure.
-struct rep_pipe {
- nni_pipe *pipe;
- rep_sock *rep;
- nni_msgq *sendq;
- nni_aio * aio_getq;
- nni_aio * aio_send;
- nni_aio * aio_recv;
- nni_aio * aio_putq;
+// rep0_pipe is our per-pipe protocol private structure.
+struct rep0_pipe {
+ nni_pipe * pipe;
+ rep0_sock *rep;
+ nni_msgq * sendq;
+ nni_aio * aio_getq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+ nni_aio * aio_putq;
};
static void
-rep_sock_fini(void *arg)
+rep0_sock_fini(void *arg)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
@@ -67,18 +76,18 @@ rep_sock_fini(void *arg)
}
static int
-rep_sock_init(void **sp, nni_sock *sock)
+rep0_sock_init(void **sp, nni_sock *sock)
{
- rep_sock *s;
- int rv;
+ rep0_sock *s;
+ int rv;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&s->lk);
if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
- ((rv = nni_aio_init(&s->aio_getq, rep_sock_getq_cb, s)) != 0)) {
- rep_sock_fini(s);
+ ((rv = nni_aio_init(&s->aio_getq, rep0_sock_getq_cb, s)) != 0)) {
+ rep0_sock_fini(s);
return (rv);
}
@@ -95,25 +104,25 @@ rep_sock_init(void **sp, nni_sock *sock)
}
static void
-rep_sock_open(void *arg)
+rep0_sock_open(void *arg)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-rep_sock_close(void *arg)
+rep0_sock_close(void *arg)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-rep_pipe_fini(void *arg)
+rep0_pipe_fini(void *arg)
{
- rep_pipe *p = arg;
+ rep0_pipe *p = arg;
nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
@@ -124,20 +133,20 @@ rep_pipe_fini(void *arg)
}
static int
-rep_pipe_init(void **pp, nni_pipe *pipe, void *s)
+rep0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- rep_pipe *p;
- int rv;
+ rep0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, rep_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, rep_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, rep_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, rep_pipe_putq_cb, p)) != 0)) {
- rep_pipe_fini(p);
+ ((rv = nni_aio_init(&p->aio_getq, rep0_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, rep0_pipe_putq_cb, p)) != 0)) {
+ rep0_pipe_fini(p);
return (rv);
}
@@ -148,11 +157,11 @@ rep_pipe_init(void **pp, nni_pipe *pipe, void *s)
}
static int
-rep_pipe_start(void *arg)
+rep0_pipe_start(void *arg)
{
- rep_pipe *p = arg;
- rep_sock *s = p->rep;
- int rv;
+ rep0_pipe *p = arg;
+ rep0_sock *s = p->rep;
+ int rv;
if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) {
return (rv);
@@ -164,10 +173,10 @@ rep_pipe_start(void *arg)
}
static void
-rep_pipe_stop(void *arg)
+rep0_pipe_stop(void *arg)
{
- rep_pipe *p = arg;
- rep_sock *s = p->rep;
+ rep0_pipe *p = arg;
+ rep0_sock *s = p->rep;
nni_msgq_close(p->sendq);
nni_aio_stop(p->aio_getq);
@@ -179,14 +188,14 @@ rep_pipe_stop(void *arg)
}
static void
-rep_sock_getq_cb(void *arg)
+rep0_sock_getq_cb(void *arg)
{
- rep_sock *s = arg;
- nni_msgq *uwq = s->uwq;
- nni_msg * msg;
- uint32_t id;
- rep_pipe *p;
- int rv;
+ rep0_sock *s = arg;
+ nni_msgq * uwq = s->uwq;
+ nni_msg * msg;
+ uint32_t id;
+ rep0_pipe *p;
+ int rv;
// This watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
@@ -228,9 +237,9 @@ rep_sock_getq_cb(void *arg)
}
static void
-rep_pipe_getq_cb(void *arg)
+rep0_pipe_getq_cb(void *arg)
{
- rep_pipe *p = arg;
+ rep0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->pipe);
@@ -244,9 +253,9 @@ rep_pipe_getq_cb(void *arg)
}
static void
-rep_pipe_send_cb(void *arg)
+rep0_pipe_send_cb(void *arg)
{
- rep_pipe *p = arg;
+ rep0_pipe *p = arg;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -259,14 +268,14 @@ rep_pipe_send_cb(void *arg)
}
static void
-rep_pipe_recv_cb(void *arg)
+rep0_pipe_recv_cb(void *arg)
{
- rep_pipe *p = arg;
- rep_sock *s = p->rep;
- nni_msg * msg;
- int rv;
- uint8_t * body;
- int hops;
+ rep0_pipe *p = arg;
+ rep0_sock *s = p->rep;
+ nni_msg * msg;
+ int rv;
+ uint8_t * body;
+ int hops;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->pipe);
@@ -329,9 +338,9 @@ drop:
}
static void
-rep_pipe_putq_cb(void *arg)
+rep0_pipe_putq_cb(void *arg)
{
- rep_pipe *p = arg;
+ rep0_pipe *p = arg;
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
@@ -344,10 +353,10 @@ rep_pipe_putq_cb(void *arg)
}
static int
-rep_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+rep0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- rep_sock *s = arg;
- int rv;
+ rep0_sock *s = arg;
+ int rv;
nni_mtx_lock(&s->lk);
rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
@@ -356,32 +365,32 @@ rep_sock_setopt_raw(void *arg, const void *buf, size_t sz)
}
static int
-rep_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+rep0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static int
-rep_sock_setopt_maxttl(void *arg, const void *buf, size_t sz)
+rep0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
return (nni_setopt_int(&s->ttl, buf, sz, 1, 255));
}
static int
-rep_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
+rep0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
return (nni_getopt_int(s->ttl, buf, szp));
}
static nni_msg *
-rep_sock_filter(void *arg, nni_msg *msg)
+rep0_sock_filter(void *arg, nni_msg *msg)
{
- rep_sock *s = arg;
- char * header;
- size_t len;
+ rep0_sock *s = arg;
+ char * header;
+ size_t len;
nni_mtx_lock(&s->lk);
if (s->raw) {
@@ -408,11 +417,11 @@ rep_sock_filter(void *arg, nni_msg *msg)
}
static void
-rep_sock_send(void *arg, nni_aio *aio)
+rep0_sock_send(void *arg, nni_aio *aio)
{
- rep_sock *s = arg;
- int rv;
- nni_msg * msg;
+ rep0_sock *s = arg;
+ int rv;
+ nni_msg * msg;
nni_mtx_lock(&s->lk);
if (s->raw) {
@@ -448,59 +457,59 @@ rep_sock_send(void *arg, nni_aio *aio)
}
static void
-rep_sock_recv(void *arg, nni_aio *aio)
+rep0_sock_recv(void *arg, nni_aio *aio)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
nni_msgq_aio_get(s->urq, aio);
}
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe_ops rep_pipe_ops = {
- .pipe_init = rep_pipe_init,
- .pipe_fini = rep_pipe_fini,
- .pipe_start = rep_pipe_start,
- .pipe_stop = rep_pipe_stop,
+static nni_proto_pipe_ops rep0_pipe_ops = {
+ .pipe_init = rep0_pipe_init,
+ .pipe_fini = rep0_pipe_fini,
+ .pipe_start = rep0_pipe_start,
+ .pipe_stop = rep0_pipe_stop,
};
-static nni_proto_sock_option rep_sock_options[] = {
+static nni_proto_sock_option rep0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = rep_sock_getopt_raw,
- .pso_setopt = rep_sock_setopt_raw,
+ .pso_getopt = rep0_sock_getopt_raw,
+ .pso_setopt = rep0_sock_setopt_raw,
},
{
.pso_name = NNG_OPT_MAXTTL,
- .pso_getopt = rep_sock_getopt_maxttl,
- .pso_setopt = rep_sock_setopt_maxttl,
+ .pso_getopt = rep0_sock_getopt_maxttl,
+ .pso_setopt = rep0_sock_setopt_maxttl,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops rep_sock_ops = {
- .sock_init = rep_sock_init,
- .sock_fini = rep_sock_fini,
- .sock_open = rep_sock_open,
- .sock_close = rep_sock_close,
- .sock_options = rep_sock_options,
- .sock_filter = rep_sock_filter,
- .sock_send = rep_sock_send,
- .sock_recv = rep_sock_recv,
+static nni_proto_sock_ops rep0_sock_ops = {
+ .sock_init = rep0_sock_init,
+ .sock_fini = rep0_sock_fini,
+ .sock_open = rep0_sock_open,
+ .sock_close = rep0_sock_close,
+ .sock_options = rep0_sock_options,
+ .sock_filter = rep0_sock_filter,
+ .sock_send = rep0_sock_send,
+ .sock_recv = rep0_sock_recv,
};
-static nni_proto nni_rep_proto = {
+static nni_proto rep0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_REP_V0, "rep" },
.proto_peer = { NNI_PROTO_REQ_V0, "req" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &rep_sock_ops,
- .proto_pipe_ops = &rep_pipe_ops,
+ .proto_sock_ops = &rep0_sock_ops,
+ .proto_pipe_ops = &rep0_pipe_ops,
};
int
nng_rep0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_rep_proto));
+ return (nni_proto_open(sidp, &rep0_proto));
}
diff --git a/src/protocol/reqrep0/rep.h b/src/protocol/reqrep0/rep.h
new file mode 100644
index 00000000..93df9379
--- /dev/null
+++ b/src/protocol/reqrep0/rep.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_REQREP0_REP_H
+#define NNG_PROTOCOL_REQREP0_REP_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_rep0_open(nng_socket *);
+
+#ifndef nng_rep_open
+#define nng_rep_open nng_rep0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_REQREP0_REP_H
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep0/req.c
index bead1ec4..94c7f1a0 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -13,21 +13,28 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/reqrep0/req.h"
// Request protocol. The REQ protocol is the "request" side of a
// request-reply pair. This is useful for building RPC clients, for example.
-const char *nng_opt_req_resendtime = NNG_OPT_REQ_RESENDTIME;
+#ifndef NNI_PROTO_REQ_V0
+#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0)
+#endif
-typedef struct req_pipe req_pipe;
-typedef struct req_sock req_sock;
+#ifndef NNI_PROTO_REP_V0
+#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1)
+#endif
-static void req_resend(req_sock *);
-static void req_timeout(void *);
-static void req_pipe_fini(void *);
+typedef struct req0_pipe req0_pipe;
+typedef struct req0_sock req0_sock;
-// A req_sock is our per-socket protocol private structure.
-struct req_sock {
+static void req0_resend(req0_sock *);
+static void req0_timeout(void *);
+static void req0_pipe_fini(void *);
+
+// A req0_sock is our per-socket protocol private structure.
+struct req0_sock {
nni_msgq * uwq;
nni_msgq * urq;
nni_duration retry;
@@ -38,7 +45,7 @@ struct req_sock {
int ttl;
nni_msg * reqmsg;
- req_pipe *pendpipe;
+ req0_pipe *pendpipe;
nni_list readypipes;
nni_list busypipes;
@@ -51,10 +58,10 @@ struct req_sock {
nni_cv cv;
};
-// A req_pipe is our per-pipe protocol private structure.
-struct req_pipe {
+// A req0_pipe is our per-pipe protocol private structure.
+struct req0_pipe {
nni_pipe * pipe;
- req_sock * req;
+ req0_sock * req;
nni_list_node node;
nni_aio * aio_getq; // raw mode only
nni_aio * aio_sendraw; // raw mode only
@@ -64,17 +71,17 @@ struct req_pipe {
nni_mtx mtx;
};
-static void req_resender(void *);
-static void req_getq_cb(void *);
-static void req_sendraw_cb(void *);
-static void req_sendcooked_cb(void *);
-static void req_recv_cb(void *);
-static void req_putq_cb(void *);
+static void req0_resender(void *);
+static void req0_getq_cb(void *);
+static void req0_sendraw_cb(void *);
+static void req0_sendcooked_cb(void *);
+static void req0_recv_cb(void *);
+static void req0_putq_cb(void *);
static int
-req_sock_init(void **sp, nni_sock *sock)
+req0_sock_init(void **sp, nni_sock *sock)
{
- req_sock *s;
+ req0_sock *s;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
@@ -82,9 +89,9 @@ req_sock_init(void **sp, nni_sock *sock)
nni_mtx_init(&s->mtx);
nni_cv_init(&s->cv, &s->mtx);
- NNI_LIST_INIT(&s->readypipes, req_pipe, node);
- NNI_LIST_INIT(&s->busypipes, req_pipe, node);
- nni_timer_init(&s->timer, req_timeout, s);
+ NNI_LIST_INIT(&s->readypipes, req0_pipe, node);
+ NNI_LIST_INIT(&s->busypipes, req0_pipe, node);
+ nni_timer_init(&s->timer, req0_timeout, s);
// this is "semi random" start for request IDs.
s->nextid = nni_random();
@@ -102,15 +109,15 @@ req_sock_init(void **sp, nni_sock *sock)
}
static void
-req_sock_open(void *arg)
+req0_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-req_sock_close(void *arg)
+req0_sock_close(void *arg)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
nni_mtx_lock(&s->mtx);
s->closed = 1;
@@ -120,9 +127,9 @@ req_sock_close(void *arg)
}
static void
-req_sock_fini(void *arg)
+req0_sock_fini(void *arg)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
nni_mtx_lock(&s->mtx);
while ((!nni_list_empty(&s->readypipes)) ||
@@ -139,9 +146,9 @@ req_sock_fini(void *arg)
}
static void
-req_pipe_fini(void *arg)
+req0_pipe_fini(void *arg)
{
- req_pipe *p = arg;
+ req0_pipe *p = arg;
nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_putq);
@@ -153,22 +160,22 @@ req_pipe_fini(void *arg)
}
static int
-req_pipe_init(void **pp, nni_pipe *pipe, void *s)
+req0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- req_pipe *p;
- int rv;
+ req0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);
- if (((rv = nni_aio_init(&p->aio_getq, req_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, req_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, req_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_sendraw, req_sendraw_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_sendcooked, req_sendcooked_cb, p)) !=
+ if (((rv = nni_aio_init(&p->aio_getq, req0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, req0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_sendraw, req0_sendraw_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_sendcooked, req0_sendcooked_cb, p)) !=
0)) {
- req_pipe_fini(p);
+ req0_pipe_fini(p);
return (rv);
}
@@ -180,10 +187,10 @@ req_pipe_init(void **pp, nni_pipe *pipe, void *s)
}
static int
-req_pipe_start(void *arg)
+req0_pipe_start(void *arg)
{
- req_pipe *p = arg;
- req_sock *s = p->req;
+ req0_pipe *p = arg;
+ req0_sock *s = p->req;
if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) {
return (NNG_EPROTO);
@@ -198,7 +205,7 @@ req_pipe_start(void *arg)
// If sock was waiting for somewhere to send data, go ahead and
// send it to this pipe.
if (s->wantw) {
- req_resend(s);
+ req0_resend(s);
}
nni_mtx_unlock(&s->mtx);
@@ -208,10 +215,10 @@ req_pipe_start(void *arg)
}
static void
-req_pipe_stop(void *arg)
+req0_pipe_stop(void *arg)
{
- req_pipe *p = arg;
- req_sock *s = p->req;
+ req0_pipe *p = arg;
+ req0_sock *s = p->req;
nni_aio_stop(p->aio_getq);
nni_aio_stop(p->aio_putq);
@@ -238,50 +245,50 @@ req_pipe_stop(void *arg)
s->pendpipe = NULL;
s->resend = NNI_TIME_ZERO;
s->wantw = 1;
- req_resend(s);
+ req0_resend(s);
}
nni_mtx_unlock(&s->mtx);
}
static int
-req_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+req0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-req_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+req0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static int
-req_sock_setopt_maxttl(void *arg, const void *buf, size_t sz)
+req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
return (nni_setopt_int(&s->ttl, buf, sz, 1, 255));
}
static int
-req_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
+req0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
return (nni_getopt_int(s->ttl, buf, szp));
}
static int
-req_sock_setopt_resendtime(void *arg, const void *buf, size_t sz)
+req0_sock_setopt_resendtime(void *arg, const void *buf, size_t sz)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
return (nni_setopt_ms(&s->retry, buf, sz));
}
static int
-req_sock_getopt_resendtime(void *arg, void *buf, size_t *szp)
+req0_sock_getopt_resendtime(void *arg, void *buf, size_t *szp)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
return (nni_getopt_ms(s->retry, buf, szp));
}
@@ -302,10 +309,10 @@ req_sock_getopt_resendtime(void *arg, void *buf, size_t *szp)
// kind of priority.)
static void
-req_getq_cb(void *arg)
+req0_getq_cb(void *arg)
{
- req_pipe *p = arg;
- req_sock *s = p->req;
+ req0_pipe *p = arg;
+ req0_sock *s = p->req;
// We should be in RAW mode. Cooked mode traffic bypasses
// the upper write queue entirely, and should never end up here.
@@ -326,9 +333,9 @@ req_getq_cb(void *arg)
}
static void
-req_sendraw_cb(void *arg)
+req0_sendraw_cb(void *arg)
{
- req_pipe *p = arg;
+ req0_pipe *p = arg;
if (nni_aio_result(p->aio_sendraw) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_sendraw));
@@ -342,10 +349,10 @@ req_sendraw_cb(void *arg)
}
static void
-req_sendcooked_cb(void *arg)
+req0_sendcooked_cb(void *arg)
{
- req_pipe *p = arg;
- req_sock *s = p->req;
+ req0_pipe *p = arg;
+ req0_sock *s = p->req;
if (nni_aio_result(p->aio_sendcooked) != 0) {
// We failed to send... clean up and deal with it.
@@ -365,7 +372,7 @@ req_sendcooked_cb(void *arg)
if (nni_list_active(&s->busypipes, p)) {
nni_list_remove(&s->busypipes, p);
nni_list_append(&s->readypipes, p);
- req_resend(s);
+ req0_resend(s);
} else {
// We wind up here if stop was called from the reader
// side while we were waiting to be scheduled to run for the
@@ -377,9 +384,9 @@ req_sendcooked_cb(void *arg)
}
static void
-req_putq_cb(void *arg)
+req0_putq_cb(void *arg)
{
- req_pipe *p = arg;
+ req0_pipe *p = arg;
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
@@ -393,10 +400,10 @@ req_putq_cb(void *arg)
}
static void
-req_recv_cb(void *arg)
+req0_recv_cb(void *arg)
{
- req_pipe *p = arg;
- nni_msg * msg;
+ req0_pipe *p = arg;
+ nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->pipe);
@@ -431,23 +438,23 @@ malformed:
}
static void
-req_timeout(void *arg)
+req0_timeout(void *arg)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
nni_mtx_lock(&s->mtx);
if (s->reqmsg != NULL) {
s->wantw = 1;
- req_resend(s);
+ req0_resend(s);
}
nni_mtx_unlock(&s->mtx);
}
static void
-req_resend(req_sock *s)
+req0_resend(req0_sock *s)
{
- req_pipe *p;
- nni_msg * msg;
+ req0_pipe *p;
+ nni_msg * msg;
// Note: This routine should be called with the socket lock held.
// Also, this should only be called while handling cooked mode
@@ -499,13 +506,13 @@ req_resend(req_sock *s)
}
static void
-req_sock_send(void *arg, nni_aio *aio)
+req0_sock_send(void *arg, nni_aio *aio)
{
- req_sock *s = arg;
- uint32_t id;
- size_t len;
- nni_msg * msg;
- int rv;
+ req0_sock *s = arg;
+ uint32_t id;
+ size_t len;
+ nni_msg * msg;
+ int rv;
nni_mtx_lock(&s->mtx);
if (s->raw) {
@@ -547,7 +554,7 @@ req_sock_send(void *arg, nni_aio *aio)
s->resend = NNI_TIME_ZERO;
s->wantw = 1;
- req_resend(s);
+ req0_resend(s);
nni_mtx_unlock(&s->mtx);
@@ -555,10 +562,10 @@ req_sock_send(void *arg, nni_aio *aio)
}
static nni_msg *
-req_sock_filter(void *arg, nni_msg *msg)
+req0_sock_filter(void *arg, nni_msg *msg)
{
- req_sock *s = arg;
- nni_msg * rmsg;
+ req0_sock *s = arg;
+ nni_msg * rmsg;
nni_mtx_lock(&s->mtx);
if (s->raw) {
@@ -598,9 +605,9 @@ req_sock_filter(void *arg, nni_msg *msg)
}
static void
-req_sock_recv(void *arg, nni_aio *aio)
+req0_sock_recv(void *arg, nni_aio *aio)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
nni_mtx_lock(&s->mtx);
if (!s->raw) {
@@ -614,55 +621,55 @@ req_sock_recv(void *arg, nni_aio *aio)
nni_msgq_aio_get(s->urq, aio);
}
-static nni_proto_pipe_ops req_pipe_ops = {
- .pipe_init = req_pipe_init,
- .pipe_fini = req_pipe_fini,
- .pipe_start = req_pipe_start,
- .pipe_stop = req_pipe_stop,
+static nni_proto_pipe_ops req0_pipe_ops = {
+ .pipe_init = req0_pipe_init,
+ .pipe_fini = req0_pipe_fini,
+ .pipe_start = req0_pipe_start,
+ .pipe_stop = req0_pipe_stop,
};
-static nni_proto_sock_option req_sock_options[] = {
+static nni_proto_sock_option req0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = req_sock_getopt_raw,
- .pso_setopt = req_sock_setopt_raw,
+ .pso_getopt = req0_sock_getopt_raw,
+ .pso_setopt = req0_sock_setopt_raw,
},
{
.pso_name = NNG_OPT_MAXTTL,
- .pso_getopt = req_sock_getopt_maxttl,
- .pso_setopt = req_sock_setopt_maxttl,
+ .pso_getopt = req0_sock_getopt_maxttl,
+ .pso_setopt = req0_sock_setopt_maxttl,
},
{
.pso_name = NNG_OPT_REQ_RESENDTIME,
- .pso_getopt = req_sock_getopt_resendtime,
- .pso_setopt = req_sock_setopt_resendtime,
+ .pso_getopt = req0_sock_getopt_resendtime,
+ .pso_setopt = req0_sock_setopt_resendtime,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops req_sock_ops = {
- .sock_init = req_sock_init,
- .sock_fini = req_sock_fini,
- .sock_open = req_sock_open,
- .sock_close = req_sock_close,
- .sock_options = req_sock_options,
- .sock_filter = req_sock_filter,
- .sock_send = req_sock_send,
- .sock_recv = req_sock_recv,
+static nni_proto_sock_ops req0_sock_ops = {
+ .sock_init = req0_sock_init,
+ .sock_fini = req0_sock_fini,
+ .sock_open = req0_sock_open,
+ .sock_close = req0_sock_close,
+ .sock_options = req0_sock_options,
+ .sock_filter = req0_sock_filter,
+ .sock_send = req0_sock_send,
+ .sock_recv = req0_sock_recv,
};
-static nni_proto req_proto = {
+static nni_proto req0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_REQ_V0, "req" },
.proto_peer = { NNI_PROTO_REP_V0, "rep" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &req_sock_ops,
- .proto_pipe_ops = &req_pipe_ops,
+ .proto_sock_ops = &req0_sock_ops,
+ .proto_pipe_ops = &req0_pipe_ops,
};
int
nng_req0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &req_proto));
+ return (nni_proto_open(sidp, &req0_proto));
}
diff --git a/src/protocol/reqrep0/req.h b/src/protocol/reqrep0/req.h
new file mode 100644
index 00000000..99c9bf62
--- /dev/null
+++ b/src/protocol/reqrep0/req.h
@@ -0,0 +1,30 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_REQREP0_REQ_H
+#define NNG_PROTOCOL_REQREP0_REQ_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_req0_open(nng_socket *);
+
+#ifndef nng_req_open
+#define nng_req_open nng_req0_open
+#endif
+
+#define NNG_OPT_REQ_RESENDTIME "req:resend-time"
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_REQREP0_REQ_H
diff --git a/src/protocol/survey0/CMakeLists.txt b/src/protocol/survey0/CMakeLists.txt
new file mode 100644
index 00000000..61e5aa7b
--- /dev/null
+++ b/src/protocol/survey0/CMakeLists.txt
@@ -0,0 +1,23 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Req/Rep protocol
+
+if (NNG_PROTO_SURVEYOR0)
+ set(SURV0_SOURCES protocol/survey0/survey.c protocol/survey0/survey.h)
+ install(FILES survey.h DESTINATION include/nng/protocol/survey0)
+endif()
+
+if (NNG_PROTO_RESPONDENT0)
+ set(RESP0_SOURCES protocol/survey0/respond.c protocol/survey0/respond.h)
+ install(FILES respond.h DESTINATION include/nng/protocol/survey0)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${SURV0_SOURCES} ${RESP0_SOURCES} PARENT_SCOPE) \ No newline at end of file
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey0/respond.c
index 94730be6..73e919c3 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -12,23 +12,32 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/survey0/respond.h"
// Respondent protocol. The RESPONDENT protocol is the "replier" side of
// the surveyor pattern. This is useful for building service discovery, or
-// voting algorithsm, for example.
+// voting algorithms, for example.
-typedef struct resp_pipe resp_pipe;
-typedef struct resp_sock resp_sock;
+#ifndef NNI_PROTO_SURVEYOR_V0
+#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2)
+#endif
-static void resp_recv_cb(void *);
-static void resp_putq_cb(void *);
-static void resp_getq_cb(void *);
-static void resp_send_cb(void *);
-static void resp_sock_getq_cb(void *);
-static void resp_pipe_fini(void *);
+#ifndef NNI_PROTO_RESPONDENT_V0
+#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3)
+#endif
-// A resp_sock is our per-socket protocol private structure.
-struct resp_sock {
+typedef struct resp0_pipe resp0_pipe;
+typedef struct resp0_sock resp0_sock;
+
+static void resp0_recv_cb(void *);
+static void resp0_putq_cb(void *);
+static void resp0_getq_cb(void *);
+static void resp0_send_cb(void *);
+static void resp0_sock_getq_cb(void *);
+static void resp0_pipe_fini(void *);
+
+// resp0_sock is our per-socket protocol private structure.
+struct resp0_sock {
nni_msgq * urq;
nni_msgq * uwq;
int raw;
@@ -40,22 +49,22 @@ struct resp_sock {
nni_mtx mtx;
};
-// A resp_pipe is our per-pipe protocol private structure.
-struct resp_pipe {
- nni_pipe * npipe;
- resp_sock *psock;
- uint32_t id;
- nni_msgq * sendq;
- nni_aio * aio_getq;
- nni_aio * aio_putq;
- nni_aio * aio_send;
- nni_aio * aio_recv;
+// resp0_pipe is our per-pipe protocol private structure.
+struct resp0_pipe {
+ nni_pipe * npipe;
+ resp0_sock *psock;
+ uint32_t id;
+ nni_msgq * sendq;
+ nni_aio * aio_getq;
+ nni_aio * aio_putq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
};
static void
-resp_sock_fini(void *arg)
+resp0_sock_fini(void *arg)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
@@ -68,18 +77,18 @@ resp_sock_fini(void *arg)
}
static int
-resp_sock_init(void **sp, nni_sock *nsock)
+resp0_sock_init(void **sp, nni_sock *nsock)
{
- resp_sock *s;
- int rv;
+ resp0_sock *s;
+ int rv;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&s->mtx);
if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
- ((rv = nni_aio_init(&s->aio_getq, resp_sock_getq_cb, s)) != 0)) {
- resp_sock_fini(s);
+ ((rv = nni_aio_init(&s->aio_getq, resp0_sock_getq_cb, s)) != 0)) {
+ resp0_sock_fini(s);
return (rv);
}
@@ -95,25 +104,25 @@ resp_sock_init(void **sp, nni_sock *nsock)
}
static void
-resp_sock_open(void *arg)
+resp0_sock_open(void *arg)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-resp_sock_close(void *arg)
+resp0_sock_close(void *arg)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-resp_pipe_fini(void *arg)
+resp0_pipe_fini(void *arg)
{
- resp_pipe *p = arg;
+ resp0_pipe *p = arg;
nni_aio_fini(p->aio_putq);
nni_aio_fini(p->aio_getq);
@@ -124,20 +133,20 @@ resp_pipe_fini(void *arg)
}
static int
-resp_pipe_init(void **pp, nni_pipe *npipe, void *s)
+resp0_pipe_init(void **pp, nni_pipe *npipe, void *s)
{
- resp_pipe *p;
- int rv;
+ resp0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, resp_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, resp_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, resp_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, resp_send_cb, p)) != 0)) {
- resp_pipe_fini(p);
+ ((rv = nni_aio_init(&p->aio_putq, resp0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, resp0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, resp0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, resp0_send_cb, p)) != 0)) {
+ resp0_pipe_fini(p);
return (rv);
}
@@ -148,11 +157,11 @@ resp_pipe_init(void **pp, nni_pipe *npipe, void *s)
}
static int
-resp_pipe_start(void *arg)
+resp0_pipe_start(void *arg)
{
- resp_pipe *p = arg;
- resp_sock *s = p->psock;
- int rv;
+ resp0_pipe *p = arg;
+ resp0_sock *s = p->psock;
+ int rv;
p->id = nni_pipe_id(p->npipe);
@@ -170,10 +179,10 @@ resp_pipe_start(void *arg)
}
static void
-resp_pipe_stop(void *arg)
+resp0_pipe_stop(void *arg)
{
- resp_pipe *p = arg;
- resp_sock *s = p->psock;
+ resp0_pipe *p = arg;
+ resp0_sock *s = p->psock;
nni_msgq_close(p->sendq);
nni_aio_stop(p->aio_putq);
@@ -189,19 +198,19 @@ resp_pipe_stop(void *arg)
}
}
-// resp_sock_send watches for messages from the upper write queue,
+// resp0_sock_send watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
// destination pipe via a separate queue. This prevents a single bad
// or slow pipe from gumming up the works for the entire socket.s
void
-resp_sock_getq_cb(void *arg)
+resp0_sock_getq_cb(void *arg)
{
- resp_sock *s = arg;
- nni_msg * msg;
- uint32_t id;
- resp_pipe *p;
- int rv;
+ resp0_sock *s = arg;
+ nni_msg * msg;
+ uint32_t id;
+ resp0_pipe *p;
+ int rv;
if (nni_aio_result(s->aio_getq) != 0) {
return;
@@ -233,9 +242,9 @@ resp_sock_getq_cb(void *arg)
}
void
-resp_getq_cb(void *arg)
+resp0_getq_cb(void *arg)
{
- resp_pipe *p = arg;
+ resp0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->npipe);
@@ -249,9 +258,9 @@ resp_getq_cb(void *arg)
}
void
-resp_send_cb(void *arg)
+resp0_send_cb(void *arg)
{
- resp_pipe *p = arg;
+ resp0_pipe *p = arg;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -264,14 +273,14 @@ resp_send_cb(void *arg)
}
static void
-resp_recv_cb(void *arg)
+resp0_recv_cb(void *arg)
{
- resp_pipe *p = arg;
- resp_sock *s = p->psock;
- nni_msgq * urq = s->urq;
- nni_msg * msg;
- int hops;
- int rv;
+ resp0_pipe *p = arg;
+ resp0_sock *s = p->psock;
+ nni_msgq * urq = s->urq;
+ nni_msg * msg;
+ int hops;
+ int rv;
if (nni_aio_result(p->aio_recv) != 0) {
goto error;
@@ -324,9 +333,9 @@ error:
}
static void
-resp_putq_cb(void *arg)
+resp0_putq_cb(void *arg)
{
- resp_pipe *p = arg;
+ resp0_pipe *p = arg;
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
@@ -338,10 +347,10 @@ resp_putq_cb(void *arg)
}
static int
-resp_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+resp0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- resp_sock *s = arg;
- int rv;
+ resp0_sock *s = arg;
+ int rv;
nni_mtx_lock(&s->mtx);
rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
@@ -350,32 +359,32 @@ resp_sock_setopt_raw(void *arg, const void *buf, size_t sz)
}
static int
-resp_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+resp0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static int
-resp_sock_setopt_maxttl(void *arg, const void *buf, size_t sz)
+resp0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
return (nni_setopt_int(&s->ttl, buf, sz, 1, 255));
}
static int
-resp_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
+resp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
return (nni_getopt_int(s->ttl, buf, szp));
}
static void
-resp_sock_send(void *arg, nni_aio *aio)
+resp0_sock_send(void *arg, nni_aio *aio)
{
- resp_sock *s = arg;
- nni_msg * msg;
- int rv;
+ resp0_sock *s = arg;
+ nni_msg * msg;
+ int rv;
nni_mtx_lock(&s->mtx);
if (s->raw) {
@@ -412,11 +421,11 @@ resp_sock_send(void *arg, nni_aio *aio)
}
static nni_msg *
-resp_sock_filter(void *arg, nni_msg *msg)
+resp0_sock_filter(void *arg, nni_msg *msg)
{
- resp_sock *s = arg;
- char * header;
- size_t len;
+ resp0_sock *s = arg;
+ char * header;
+ size_t len;
nni_mtx_lock(&s->mtx);
if (s->raw) {
@@ -444,57 +453,57 @@ resp_sock_filter(void *arg, nni_msg *msg)
}
static void
-resp_sock_recv(void *arg, nni_aio *aio)
+resp0_sock_recv(void *arg, nni_aio *aio)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
nni_msgq_aio_get(s->urq, aio);
}
-static nni_proto_pipe_ops resp_pipe_ops = {
- .pipe_init = resp_pipe_init,
- .pipe_fini = resp_pipe_fini,
- .pipe_start = resp_pipe_start,
- .pipe_stop = resp_pipe_stop,
+static nni_proto_pipe_ops resp0_pipe_ops = {
+ .pipe_init = resp0_pipe_init,
+ .pipe_fini = resp0_pipe_fini,
+ .pipe_start = resp0_pipe_start,
+ .pipe_stop = resp0_pipe_stop,
};
-static nni_proto_sock_option resp_sock_options[] = {
+static nni_proto_sock_option resp0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = resp_sock_getopt_raw,
- .pso_setopt = resp_sock_setopt_raw,
+ .pso_getopt = resp0_sock_getopt_raw,
+ .pso_setopt = resp0_sock_setopt_raw,
},
{
.pso_name = NNG_OPT_MAXTTL,
- .pso_getopt = resp_sock_getopt_maxttl,
- .pso_setopt = resp_sock_setopt_maxttl,
+ .pso_getopt = resp0_sock_getopt_maxttl,
+ .pso_setopt = resp0_sock_setopt_maxttl,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops resp_sock_ops = {
- .sock_init = resp_sock_init,
- .sock_fini = resp_sock_fini,
- .sock_open = resp_sock_open,
- .sock_close = resp_sock_close,
- .sock_filter = resp_sock_filter,
- .sock_send = resp_sock_send,
- .sock_recv = resp_sock_recv,
- .sock_options = resp_sock_options,
+static nni_proto_sock_ops resp0_sock_ops = {
+ .sock_init = resp0_sock_init,
+ .sock_fini = resp0_sock_fini,
+ .sock_open = resp0_sock_open,
+ .sock_close = resp0_sock_close,
+ .sock_filter = resp0_sock_filter,
+ .sock_send = resp0_sock_send,
+ .sock_recv = resp0_sock_recv,
+ .sock_options = resp0_sock_options,
};
-static nni_proto resp_proto = {
+static nni_proto resp0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" },
.proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &resp_sock_ops,
- .proto_pipe_ops = &resp_pipe_ops,
+ .proto_sock_ops = &resp0_sock_ops,
+ .proto_pipe_ops = &resp0_pipe_ops,
};
int
nng_respondent0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &resp_proto));
+ return (nni_proto_open(sidp, &resp0_proto));
}
diff --git a/src/protocol/survey0/respond.h b/src/protocol/survey0/respond.h
new file mode 100644
index 00000000..58c65298
--- /dev/null
+++ b/src/protocol/survey0/respond.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_SURVEY0_RESPOND_H
+#define NNG_PROTOCOL_SURVEY0_RESPOND_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_respondent0_open(nng_socket *);
+
+#ifndef nng_respondent_open
+#define nng_respondent_open nng_respondent0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_SURVEY0_RESPOND_H
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey0/survey.c
index 9dcd6664..02283436 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -12,22 +12,31 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/survey0/survey.h"
// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the
// survey pattern. This is useful for building service discovery, voting, etc.
-typedef struct surv_pipe surv_pipe;
-typedef struct surv_sock surv_sock;
+#ifndef NNI_PROTO_SURVEYOR_V0
+#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2)
+#endif
-static void surv_sock_getq_cb(void *);
-static void surv_getq_cb(void *);
-static void surv_putq_cb(void *);
-static void surv_send_cb(void *);
-static void surv_recv_cb(void *);
-static void surv_timeout(void *);
+#ifndef NNI_PROTO_RESPONDENT_V0
+#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3)
+#endif
-// A surv_sock is our per-socket protocol private structure.
-struct surv_sock {
+typedef struct surv0_pipe surv0_pipe;
+typedef struct surv0_sock surv0_sock;
+
+static void surv0_sock_getq_cb(void *);
+static void surv0_getq_cb(void *);
+static void surv0_putq_cb(void *);
+static void surv0_send_cb(void *);
+static void surv0_recv_cb(void *);
+static void surv0_timeout(void *);
+
+// surv0_sock is our per-socket protocol private structure.
+struct surv0_sock {
nni_duration survtime;
nni_time expire;
int raw;
@@ -42,10 +51,10 @@ struct surv_sock {
nni_mtx mtx;
};
-// A surv_pipe is our per-pipe protocol private structure.
-struct surv_pipe {
+// surv0_pipe is our per-pipe protocol private structure.
+struct surv0_pipe {
nni_pipe * npipe;
- surv_sock * psock;
+ surv0_sock * psock;
nni_msgq * sendq;
nni_list_node node;
nni_aio * aio_getq;
@@ -55,9 +64,9 @@ struct surv_pipe {
};
static void
-surv_sock_fini(void *arg)
+surv0_sock_fini(void *arg)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
@@ -66,21 +75,21 @@ surv_sock_fini(void *arg)
}
static int
-surv_sock_init(void **sp, nni_sock *nsock)
+surv0_sock_init(void **sp, nni_sock *nsock)
{
- surv_sock *s;
- int rv;
+ surv0_sock *s;
+ int rv;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_aio_init(&s->aio_getq, surv_sock_getq_cb, s)) != 0) {
- surv_sock_fini(s);
+ if ((rv = nni_aio_init(&s->aio_getq, surv0_sock_getq_cb, s)) != 0) {
+ surv0_sock_fini(s);
return (rv);
}
- NNI_LIST_INIT(&s->pipes, surv_pipe, node);
+ NNI_LIST_INIT(&s->pipes, surv0_pipe, node);
nni_mtx_init(&s->mtx);
- nni_timer_init(&s->timer, surv_timeout, s);
+ nni_timer_init(&s->timer, surv0_timeout, s);
s->nextid = nni_random();
s->raw = 0;
@@ -94,26 +103,26 @@ surv_sock_init(void **sp, nni_sock *nsock)
}
static void
-surv_sock_open(void *arg)
+surv0_sock_open(void *arg)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-surv_sock_close(void *arg)
+surv0_sock_close(void *arg)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
nni_timer_cancel(&s->timer);
nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-surv_pipe_fini(void *arg)
+surv0_pipe_fini(void *arg)
{
- surv_pipe *p = arg;
+ surv0_pipe *p = arg;
nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
@@ -124,21 +133,21 @@ surv_pipe_fini(void *arg)
}
static int
-surv_pipe_init(void **pp, nni_pipe *npipe, void *s)
+surv0_pipe_init(void **pp, nni_pipe *npipe, void *s)
{
- surv_pipe *p;
- int rv;
+ surv0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
// This depth could be tunable.
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, surv_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, surv_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, surv_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, surv_recv_cb, p)) != 0)) {
- surv_pipe_fini(p);
+ ((rv = nni_aio_init(&p->aio_getq, surv0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, surv0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, surv0_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, surv0_recv_cb, p)) != 0)) {
+ surv0_pipe_fini(p);
return (rv);
}
@@ -149,10 +158,10 @@ surv_pipe_init(void **pp, nni_pipe *npipe, void *s)
}
static int
-surv_pipe_start(void *arg)
+surv0_pipe_start(void *arg)
{
- surv_pipe *p = arg;
- surv_sock *s = p->psock;
+ surv0_pipe *p = arg;
+ surv0_sock *s = p->psock;
nni_mtx_lock(&s->mtx);
nni_list_append(&s->pipes, p);
@@ -164,10 +173,10 @@ surv_pipe_start(void *arg)
}
static void
-surv_pipe_stop(void *arg)
+surv0_pipe_stop(void *arg)
{
- surv_pipe *p = arg;
- surv_sock *s = p->psock;
+ surv0_pipe *p = arg;
+ surv0_sock *s = p->psock;
nni_aio_stop(p->aio_getq);
nni_aio_stop(p->aio_send);
@@ -184,9 +193,9 @@ surv_pipe_stop(void *arg)
}
static void
-surv_getq_cb(void *arg)
+surv0_getq_cb(void *arg)
{
- surv_pipe *p = arg;
+ surv0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->npipe);
@@ -200,9 +209,9 @@ surv_getq_cb(void *arg)
}
static void
-surv_send_cb(void *arg)
+surv0_send_cb(void *arg)
{
- surv_pipe *p = arg;
+ surv0_pipe *p = arg;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -215,9 +224,9 @@ surv_send_cb(void *arg)
}
static void
-surv_putq_cb(void *arg)
+surv0_putq_cb(void *arg)
{
- surv_pipe *p = arg;
+ surv0_pipe *p = arg;
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
@@ -230,10 +239,10 @@ surv_putq_cb(void *arg)
}
static void
-surv_recv_cb(void *arg)
+surv0_recv_cb(void *arg)
{
- surv_pipe *p = arg;
- nni_msg * msg;
+ surv0_pipe *p = arg;
+ nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
goto failed;
@@ -265,10 +274,10 @@ failed:
}
static int
-surv_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+surv0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- surv_sock *s = arg;
- int rv;
+ surv0_sock *s = arg;
+ int rv;
nni_mtx_lock(&s->mtx);
if ((rv = nni_setopt_int(&s->raw, buf, sz, 0, 1)) == 0) {
@@ -280,33 +289,33 @@ surv_sock_setopt_raw(void *arg, const void *buf, size_t sz)
}
static int
-surv_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+surv0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static int
-surv_sock_setopt_surveytime(void *arg, const void *buf, size_t sz)
+surv0_sock_setopt_surveytime(void *arg, const void *buf, size_t sz)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
return (nni_setopt_ms(&s->survtime, buf, sz));
}
static int
-surv_sock_getopt_surveytime(void *arg, void *buf, size_t *szp)
+surv0_sock_getopt_surveytime(void *arg, void *buf, size_t *szp)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
return (nni_getopt_ms(s->survtime, buf, szp));
}
static void
-surv_sock_getq_cb(void *arg)
+surv0_sock_getq_cb(void *arg)
{
- surv_sock *s = arg;
- surv_pipe *p;
- surv_pipe *last;
- nni_msg * msg, *dup;
+ surv0_sock *s = arg;
+ surv0_pipe *p;
+ surv0_pipe *last;
+ nni_msg * msg, *dup;
if (nni_aio_result(s->aio_getq) != 0) {
// Should be NNG_ECLOSED.
@@ -338,9 +347,9 @@ surv_sock_getq_cb(void *arg)
}
static void
-surv_timeout(void *arg)
+surv0_timeout(void *arg)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
nni_mtx_lock(&s->mtx);
s->survid = 0;
@@ -349,9 +358,9 @@ surv_timeout(void *arg)
}
static void
-surv_sock_recv(void *arg, nni_aio *aio)
+surv0_sock_recv(void *arg, nni_aio *aio)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
nni_mtx_lock(&s->mtx);
if (s->survid == 0) {
@@ -364,11 +373,11 @@ surv_sock_recv(void *arg, nni_aio *aio)
}
static void
-surv_sock_send(void *arg, nni_aio *aio)
+surv0_sock_send(void *arg, nni_aio *aio)
{
- surv_sock *s = arg;
- nni_msg * msg;
- int rv;
+ surv0_sock *s = arg;
+ nni_msg * msg;
+ int rv;
nni_mtx_lock(&s->mtx);
if (s->raw) {
@@ -404,9 +413,9 @@ surv_sock_send(void *arg, nni_aio *aio)
}
static nni_msg *
-surv_sock_filter(void *arg, nni_msg *msg)
+surv0_sock_filter(void *arg, nni_msg *msg)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
nni_mtx_lock(&s->mtx);
if (s->raw) {
@@ -426,50 +435,50 @@ surv_sock_filter(void *arg, nni_msg *msg)
return (msg);
}
-static nni_proto_pipe_ops surv_pipe_ops = {
- .pipe_init = surv_pipe_init,
- .pipe_fini = surv_pipe_fini,
- .pipe_start = surv_pipe_start,
- .pipe_stop = surv_pipe_stop,
+static nni_proto_pipe_ops surv0_pipe_ops = {
+ .pipe_init = surv0_pipe_init,
+ .pipe_fini = surv0_pipe_fini,
+ .pipe_start = surv0_pipe_start,
+ .pipe_stop = surv0_pipe_stop,
};
-static nni_proto_sock_option surv_sock_options[] = {
+static nni_proto_sock_option surv0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = surv_sock_getopt_raw,
- .pso_setopt = surv_sock_setopt_raw,
+ .pso_getopt = surv0_sock_getopt_raw,
+ .pso_setopt = surv0_sock_setopt_raw,
},
{
.pso_name = NNG_OPT_SURVEYOR_SURVEYTIME,
- .pso_getopt = surv_sock_getopt_surveytime,
- .pso_setopt = surv_sock_setopt_surveytime,
+ .pso_getopt = surv0_sock_getopt_surveytime,
+ .pso_setopt = surv0_sock_setopt_surveytime,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops surv_sock_ops = {
- .sock_init = surv_sock_init,
- .sock_fini = surv_sock_fini,
- .sock_open = surv_sock_open,
- .sock_close = surv_sock_close,
- .sock_send = surv_sock_send,
- .sock_recv = surv_sock_recv,
- .sock_filter = surv_sock_filter,
- .sock_options = surv_sock_options,
+static nni_proto_sock_ops surv0_sock_ops = {
+ .sock_init = surv0_sock_init,
+ .sock_fini = surv0_sock_fini,
+ .sock_open = surv0_sock_open,
+ .sock_close = surv0_sock_close,
+ .sock_send = surv0_sock_send,
+ .sock_recv = surv0_sock_recv,
+ .sock_filter = surv0_sock_filter,
+ .sock_options = surv0_sock_options,
};
-static nni_proto surv_proto = {
+static nni_proto surv0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
.proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &surv_sock_ops,
- .proto_pipe_ops = &surv_pipe_ops,
+ .proto_sock_ops = &surv0_sock_ops,
+ .proto_pipe_ops = &surv0_pipe_ops,
};
int
nng_surveyor0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &surv_proto));
+ return (nni_proto_open(sidp, &surv0_proto));
}
diff --git a/src/protocol/survey0/survey.h b/src/protocol/survey0/survey.h
new file mode 100644
index 00000000..a7b6d943
--- /dev/null
+++ b/src/protocol/survey0/survey.h
@@ -0,0 +1,30 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_SURVEY0_SURVEY_H
+#define NNG_PROTOCOL_SURVEY0_SURVEY_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_surveyor0_open(nng_socket *);
+
+#ifndef nng_surveyor_open
+#define nng_surveyor_open nng_surveyor0_open
+#endif
+
+#define NNG_OPT_SURVEYOR_SURVEYTIME "surveyor:survey-time"
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_SURVEY0_SURVEY_H
diff --git a/src/transport/inproc/CMakeLists.txt b/src/transport/inproc/CMakeLists.txt
new file mode 100644
index 00000000..a445da85
--- /dev/null
+++ b/src/transport/inproc/CMakeLists.txt
@@ -0,0 +1,18 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# inproc protocol
+
+if (NNG_TRANSPORT_INPROC)
+ set(INPROC_SOURCES transport/inproc/inproc.c transport/inproc/inproc.h)
+ install(FILES inproc.h DESTINATION include/nng/transport/inproc)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${INPROC_SOURCES} PARENT_SCOPE)
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index c747f7dc..ae64263c 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -475,9 +475,8 @@ struct nni_tran nni_inproc_tran = {
.tran_fini = nni_inproc_fini,
};
-
int
nng_inproc_register(void)
{
- return (nni_tran_register(&nni_inproc_tran));
+ return (nni_tran_register(&nni_inproc_tran));
}
diff --git a/src/transport/ipc/CMakeLists.txt b/src/transport/ipc/CMakeLists.txt
new file mode 100644
index 00000000..1a5496cf
--- /dev/null
+++ b/src/transport/ipc/CMakeLists.txt
@@ -0,0 +1,18 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# ipc protocol
+
+if (NNG_TRANSPORT_IPC)
+ set(IPC_SOURCES transport/ipc/ipc.c transport/ipc/ipc.h)
+ install(FILES ipc.h DESTINATION include/nng/transport/ipc)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${IPC_SOURCES} PARENT_SCOPE)
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index afe8afa8..c13dbd34 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -704,9 +704,7 @@ static nni_tran_ep nni_ipc_ep_ops = {
.ep_options = nni_ipc_ep_options,
};
-// This is the IPC transport linkage, and should be the only global
-// symbol in this entire file.
-struct nni_tran nni_ipc_tran = {
+static nni_tran nni_ipc_tran = {
.tran_version = NNI_TRANSPORT_VERSION,
.tran_scheme = "ipc",
.tran_ep = &nni_ipc_ep_ops,
@@ -714,3 +712,9 @@ struct nni_tran nni_ipc_tran = {
.tran_init = nni_ipc_tran_init,
.tran_fini = nni_ipc_tran_fini,
};
+
+int
+nng_ipc_register(void)
+{
+ return (nni_tran_register(&nni_ipc_tran));
+}
diff --git a/src/transport/ipc/ipc.h b/src/transport/ipc/ipc.h
new file mode 100644
index 00000000..f19762c7
--- /dev/null
+++ b/src/transport/ipc/ipc.h
@@ -0,0 +1,19 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_TRANSPORT_IPC_IPC_H
+#define NNG_TRANSPORT_IPC_IPC_H
+
+// ipc transport. This is used for inter-process communication on
+// the same host computer.
+
+extern int nng_ipc_register(void);
+
+#endif // NNG_TRANSPORT_IPC_IPC_H
diff --git a/src/transport/tcp/CMakeLists.txt b/src/transport/tcp/CMakeLists.txt
new file mode 100644
index 00000000..305c357a
--- /dev/null
+++ b/src/transport/tcp/CMakeLists.txt
@@ -0,0 +1,18 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# TCP protocol
+
+if (NNG_TRANSPORT_TCP)
+ set(TCP_SOURCES transport/tcp/tcp.c transport/tcp/tcp.h)
+ install(FILES tcp.h DESTINATION include/nng/transport/tcp)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${TCP_SOURCES} PARENT_SCOPE)
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 43b2890d..e33db865 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -854,9 +854,7 @@ static nni_tran_ep nni_tcp_ep_ops = {
.ep_options = nni_tcp_ep_options,
};
-// This is the TCP transport linkage, and should be the only global
-// symbol in this entire file.
-struct nni_tran nni_tcp_tran = {
+static nni_tran nni_tcp_tran = {
.tran_version = NNI_TRANSPORT_VERSION,
.tran_scheme = "tcp",
.tran_ep = &nni_tcp_ep_ops,
@@ -864,3 +862,9 @@ struct nni_tran nni_tcp_tran = {
.tran_init = nni_tcp_tran_init,
.tran_fini = nni_tcp_tran_fini,
};
+
+int
+nng_tcp_register(void)
+{
+ return (nni_tran_register(&nni_tcp_tran));
+}
diff --git a/src/transport/tcp/tcp.h b/src/transport/tcp/tcp.h
new file mode 100644
index 00000000..b4c79461
--- /dev/null
+++ b/src/transport/tcp/tcp.h
@@ -0,0 +1,18 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_TRANSPORT_TCP_TCP_H
+#define NNG_TRANSPORT_TCP_TCP_H
+
+// TCP transport. This is used for communication over TCP/IP.
+
+extern int nng_tcp_register(void);
+
+#endif // NNG_TRANSPORT_TCP_TCP_H
diff --git a/src/transport/zerotier/CMakeLists.txt b/src/transport/zerotier/CMakeLists.txt
new file mode 100644
index 00000000..c1bb0c35
--- /dev/null
+++ b/src/transport/zerotier/CMakeLists.txt
@@ -0,0 +1,50 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# ZeroTier protocol
+
+set (NNG_TRANSPORT_ZEROTIER_SOURCE "" CACHE PATH "Location of ZeroTier source tree.")
+
+if (NNG_TRANSPORT_ZEROTIER)
+
+ # We use the libzerotiercore.a library, which is unfortunately a C++ object
+ # even though it exposes only public C symbols. It would be extremely
+ # helpful if libzerotiercore didn't make us carry the whole C++ runtime
+ # behind us. The user must specify the location of the ZeroTier source
+ # tree (dev branch for now, and already compiled please) by setting the
+ # NNG_ZEROTIER_SOURCE macro.
+ # NB: This needs to be the zerotierone tree, not the libzt library.
+ # This is because we don't access the API, but instead use the low
+ # level zerotiercore functionality directly.
+ # NB: As we wind up linking libzerotiercore.a into the application,
+ # this means that your application will *also* need to either be licensed
+ # under the GPLv3, or you will need to have a commercial license from
+ # ZeroTier permitting its use elsewhere.
+
+ enable_language(CXX)
+ find_library(NNG_LIBZTCORE zerotiercore PATHS ${NNG_TRANSPORT_ZEROTIER_SOURCE})
+ if (NNG_LIBZTCORE)
+ set(CMAKE_REQUIRED_INCLUDES ${NNG_TRANSPORT_ZEROTIER_SOURCE}/include)
+ message(STATUS "C++ ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}")
+ set(CMAKE_REQUIRED_LIBRARIES ${CMAKE_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES})
+ set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES} PARENT_SCOPE)
+ set(NNG_REQUIRED_INCLUDES ${NNG_REQUIRED_INCLUDES} ${NNG_TRANSPORT_ZEROTIER_SOURCE}/include PARENT_SCOPE)
+ nng_check_sym(ZT_Node_join ZeroTierOne.h HAVE_ZTCORE)
+ endif()
+ if (NOT HAVE_ZTCORE)
+ message (FATAL_ERROR "Cannot find ZeroTier components")
+ endif()
+ message(STATUS "Found ZeroTier at ${NNG_LIBZTCORE}")
+
+ set(ZT_SOURCES transport/zerotier/zerotier.c transport/zerotier/zerotier.h)
+ install(FILES zerotier.h DESTINATION include/nng/transport/zerotier)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${ZT_SOURCES} PARENT_SCOPE)
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 7cb4786f..3263e0a5 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -61,20 +61,37 @@ if (NNG_TESTS)
math (EXPR TEST_PORT "${TEST_PORT}+20")
endmacro (add_nng_test)
- macro (add_nng_compat_test NAME TIMEOUT)
- list (APPEND all_tests ${NAME})
- add_executable (${NAME} ${NAME}.c)
- target_link_libraries (${NAME} ${PROJECT_NAME}_static)
- target_link_libraries (${NAME} ${NNG_REQUIRED_LIBRARIES})
- target_compile_definitions(${NAME} PUBLIC -DNNG_STATIC_LIB)
- if (CMAKE_THREAD_LIBS_INIT)
- target_link_libraries (${NAME} "${CMAKE_THREAD_LIBS_INIT}")
- endif()
+ # Compatibility tests are only added if all of the legacy protocols
+ # are present. It's not worth trying to figure out which of these
+ # should work and which shouldn't.
+ if (NNG_PROTO_BUS0 AND
+ NNG_PROTO_PAIR0 AND
+ NNG_PROTO_REQ0 AND
+ NNG_PROTO_REP0 AND
+ NNG_PROTO_PUB0 AND
+ NNG_PROTO_SUB0 AND
+ NNG_PROTO_PUSH0 AND
+ NNG_PROTO_PULL0)
+
+ macro (add_nng_compat_test NAME TIMEOUT)
+ list (APPEND all_tests ${NAME})
+ add_executable (${NAME} ${NAME}.c)
+ target_link_libraries (${NAME} ${PROJECT_NAME}_static)
+ target_link_libraries (${NAME} ${NNG_REQUIRED_LIBRARIES})
+ target_compile_definitions(${NAME} PUBLIC -DNNG_STATIC_LIB)
+ if (CMAKE_THREAD_LIBS_INIT)
+ target_link_libraries (${NAME} "${CMAKE_THREAD_LIBS_INIT}")
+ endif()
- add_test (NAME ${NAME} COMMAND ${NAME} ${TEST_PORT})
- set_tests_properties (${NAME} PROPERTIES TIMEOUT ${TIMEOUT})
- math (EXPR TEST_PORT "${TEST_PORT}+10")
- endmacro (add_nng_compat_test)
+ add_test (NAME ${NAME} COMMAND ${NAME} ${TEST_PORT})
+ set_tests_properties (${NAME} PROPERTIES TIMEOUT ${TIMEOUT})
+ math (EXPR TEST_PORT "${TEST_PORT}+10")
+ endmacro (add_nng_compat_test)
+ else ()
+ macro (add_nng_compat_test NAME TIMEOUT)
+ endmacro (add_nng_compat_test)
+ message (STATUS "Compatibility tests disabled (unconfigured legacy protocols)")
+ endif ()
macro (add_nng_cpp_test NAME TIMEOUT)
if (NOT NNG_ENABLE_COVERAGE)
@@ -130,11 +147,13 @@ add_nng_test(device 5)
add_nng_test(errors 2)
add_nng_test(pair1 5)
add_nng_test(udp 5)
-if (NNG_HAVE_ZEROTIER)
- add_nng_test(zt 60)
-endif()
+add_nng_test(zt 60)
# compatbility tests
+# We only support these if ALL the legacy protocols are supported. This
+# is because we don't want to make modifications to partially enable some
+# of these tests. Folks minimizing the library probably don't care too
+# much about these anyway.
add_nng_compat_test(compat_block 5)
add_nng_compat_test(compat_bug777 5)
add_nng_compat_test(compat_bus 5)
diff --git a/tests/aio.c b/tests/aio.c
index 2a31319d..0746fa51 100644
--- a/tests/aio.c
+++ b/tests/aio.c
@@ -11,6 +11,10 @@
#include "convey.h"
#include "nng.h"
+#include "protocol/pair1/pair.h"
+
+#include "stubs.h"
+
#include <string.h>
#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
diff --git a/tests/bus.c b/tests/bus.c
index 8283bf21..88137b81 100644
--- a/tests/bus.c
+++ b/tests/bus.c
@@ -11,6 +11,10 @@
#include "convey.h"
#include "nng.h"
+#include "protocol/bus0/bus.h"
+
+#include "stubs.h"
+
#include <string.h>
#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
diff --git a/tests/cplusplus_pair.cc b/tests/cplusplus_pair.cc
index 54a99737..11ce1912 100644
--- a/tests/cplusplus_pair.cc
+++ b/tests/cplusplus_pair.cc
@@ -8,59 +8,68 @@
//
#include "nng.h"
+#include "protocol/pair1/pair.h"
#include <cstring>
+#include <iostream>
#define SOCKET_ADDRESS "inproc://c++"
int
main(int argc, char **argv)
{
- nng_socket s1;
- nng_socket s2;
- int rv;
- size_t sz;
- char buf[8];
- if ((rv = nng_pair0_open(&s1)) != 0) {
- throw nng_strerror(rv);
- }
- if ((rv = nng_pair0_open(&s2)) != 0) {
- throw nng_strerror(rv);
- }
- if ((rv = nng_listen(s1, SOCKET_ADDRESS, NULL, 0)) != 0) {
- throw nng_strerror(rv);
- }
- if ((rv = nng_dial(s2, SOCKET_ADDRESS, NULL, 0)) != 0) {
- throw nng_strerror(rv);
- }
- if ((rv = nng_send(s2, (void *)"ABC", 4, 0)) != 0) {
- throw nng_strerror(rv);
- }
- sz = sizeof (buf);
- if ((rv = nng_recv(s1, buf, &sz, 0)) != 0) {
- throw nng_strerror(rv);
- }
- if ((sz != 4) || (memcmp(buf, "ABC", 4) != 0)) {
- throw "Contents did not match";
- }
- if ((rv = nng_send(s1, (void *)"DEF", 4, 0)) != 0) {
- throw nng_strerror(rv);
- }
- sz = sizeof (buf);
- if ((rv = nng_recv(s2, buf, &sz, 0)) != 0) {
- throw nng_strerror(rv);
- }
- if ((sz != 4) || (memcmp(buf, "DEF", 4) != 0)) {
- throw "Contents did not match";
- }
- if ((rv = nng_close(s1)) != 0) {
- throw nng_strerror(rv);
- }
- if ((rv = nng_close(s2)) != 0) {
- throw nng_strerror(rv);
- }
+#if defined(NNG_HAVE_PAIR1)
- return (0);
-}
+ nng_socket s1;
+ nng_socket s2;
+ int rv;
+ size_t sz;
+ char buf[8];
+
+ if ((rv = nng_pair1_open(&s1)) != 0) {
+ throw nng_strerror(rv);
+ }
+ if ((rv = nng_pair1_open(&s2)) != 0) {
+ throw nng_strerror(rv);
+ }
+ if ((rv = nng_listen(s1, SOCKET_ADDRESS, NULL, 0)) != 0) {
+ throw nng_strerror(rv);
+ }
+ if ((rv = nng_dial(s2, SOCKET_ADDRESS, NULL, 0)) != 0) {
+ throw nng_strerror(rv);
+ }
+ if ((rv = nng_send(s2, (void *) "ABC", 4, 0)) != 0) {
+ throw nng_strerror(rv);
+ }
+ sz = sizeof(buf);
+ if ((rv = nng_recv(s1, buf, &sz, 0)) != 0) {
+ throw nng_strerror(rv);
+ }
+ if ((sz != 4) || (memcmp(buf, "ABC", 4) != 0)) {
+ throw "Contents did not match";
+ }
+ if ((rv = nng_send(s1, (void *) "DEF", 4, 0)) != 0) {
+ throw nng_strerror(rv);
+ }
+ sz = sizeof(buf);
+ if ((rv = nng_recv(s2, buf, &sz, 0)) != 0) {
+ throw nng_strerror(rv);
+ }
+ if ((sz != 4) || (memcmp(buf, "DEF", 4) != 0)) {
+ throw "Contents did not match";
+ }
+ if ((rv = nng_close(s1)) != 0) {
+ throw nng_strerror(rv);
+ }
+ if ((rv = nng_close(s2)) != 0) {
+ throw nng_strerror(rv);
+ }
+ std::cout << "Pass." << std::endl;
+#else
+ std::cout << "Skipped (protocol unconfigured)." << std::endl;
+#endif
+
+ return (0);
+}
diff --git a/tests/device.c b/tests/device.c
index 5c650a91..56ad2cb5 100644
--- a/tests/device.c
+++ b/tests/device.c
@@ -10,6 +10,8 @@
#include "convey.h"
#include "nng.h"
+#include "protocol/pair1/pair.h"
+#include "stubs.h"
#include <string.h>
@@ -68,8 +70,8 @@ Main({
So(nng_listen(dev1, addr1, NULL, 0) == 0);
So(nng_listen(dev2, addr2, NULL, 0) == 0);
- So(nng_pair_open(&end1) == 0);
- So(nng_pair_open(&end2) == 0);
+ So(nng_pair1_open(&end1) == 0);
+ So(nng_pair1_open(&end2) == 0);
So(nng_dial(end1, addr1, NULL, 0) == 0);
So(nng_dial(end2, addr2, NULL, 0) == 0);
diff --git a/tests/pair1.c b/tests/pair1.c
index 9ed73037..cd3a1035 100644
--- a/tests/pair1.c
+++ b/tests/pair1.c
@@ -10,8 +10,11 @@
#include "convey.h"
#include "nng.h"
+#include "protocol/pair1/pair.h"
#include "trantest.h"
+#include "stubs.h"
+
#include <string.h>
#define SECOND(x) ((x) *1000)
diff --git a/tests/pipeline.c b/tests/pipeline.c
index 67be57c6..96ae3d17 100644
--- a/tests/pipeline.c
+++ b/tests/pipeline.c
@@ -10,6 +10,10 @@
#include "convey.h"
#include "nng.h"
+#include "protocol/pipeline0/pull.h"
+#include "protocol/pipeline0/push.h"
+#include "stubs.h"
+
#include <string.h>
#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
diff --git a/tests/pollfd.c b/tests/pollfd.c
index e1965b09..8526c705 100644
--- a/tests/pollfd.c
+++ b/tests/pollfd.c
@@ -8,9 +8,6 @@
// found online at https://opensource.org/licenses/MIT.
//
-#include "convey.h"
-#include "nng.h"
-
#include <string.h>
#ifndef _WIN32
@@ -32,14 +29,21 @@
#endif
+#include "convey.h"
+#include "nng.h"
+#include "protocol/pair1/pair.h"
+#include "protocol/pipeline0/pull.h"
+#include "protocol/pipeline0/push.h"
+#include "stubs.h"
+
TestMain("Poll FDs", {
Convey("Given a connected pair of sockets", {
nng_socket s1;
nng_socket s2;
- So(nng_pair_open(&s1) == 0);
- So(nng_pair_open(&s2) == 0);
+ So(nng_pair1_open(&s1) == 0);
+ So(nng_pair1_open(&s2) == 0);
Reset({
nng_close(s1);
nng_close(s2);
@@ -106,26 +110,25 @@ TestMain("Poll FDs", {
So(nng_getopt(s1, NNG_OPT_RECVFD, &fd, &sz) == 0);
So(sz == sizeof(fd));
});
- Convey("We cannot get a send FD for PULL", {
- nng_socket s3;
- int fd;
- size_t sz;
- So(nng_pull_open(&s3) == 0);
- Reset({ nng_close(s3); });
- sz = sizeof(fd);
- So(nng_getopt(s3, NNG_OPT_SENDFD, &fd, &sz) ==
- NNG_ENOTSUP);
- });
+ });
- Convey("We cannot get a recv FD for PUSH", {
- nng_socket s3;
- int fd;
- size_t sz;
- So(nng_push_open(&s3) == 0);
- Reset({ nng_close(s3); });
- sz = sizeof(fd);
- So(nng_getopt(s3, NNG_OPT_RECVFD, &fd, &sz) ==
- NNG_ENOTSUP);
- });
+ Convey("We cannot get a send FD for PULL", {
+ nng_socket s3;
+ int fd;
+ size_t sz;
+ So(nng_pull0_open(&s3) == 0);
+ Reset({ nng_close(s3); });
+ sz = sizeof(fd);
+ So(nng_getopt(s3, NNG_OPT_SENDFD, &fd, &sz) == NNG_ENOTSUP);
+ });
+
+ Convey("We cannot get a recv FD for PUSH", {
+ nng_socket s3;
+ int fd;
+ size_t sz;
+ So(nng_push0_open(&s3) == 0);
+ Reset({ nng_close(s3); });
+ sz = sizeof(fd);
+ So(nng_getopt(s3, NNG_OPT_RECVFD, &fd, &sz) == NNG_ENOTSUP);
});
})
diff --git a/tests/pubsub.c b/tests/pubsub.c
index a8209a7d..796d951e 100644
--- a/tests/pubsub.c
+++ b/tests/pubsub.c
@@ -10,6 +10,9 @@
#include "convey.h"
#include "nng.h"
+#include "protocol/pubsub0/pub.h"
+#include "protocol/pubsub0/sub.h"
+#include "stubs.h"
#include <string.h>
@@ -34,6 +37,16 @@ TestMain("PUB/SUB pattern", {
nng_msg *msg;
So(nng_recvmsg(pub, &msg, 0) == NNG_ENOTSUP);
});
+
+ Convey("It cannot subscribe", {
+ So(nng_setopt(pub, NNG_OPT_SUB_SUBSCRIBE, "", 0) ==
+ NNG_ENOTSUP);
+ });
+
+ Convey("It cannot unsubscribe", {
+ So(nng_setopt(pub, NNG_OPT_SUB_UNSUBSCRIBE, "", 0) ==
+ NNG_ENOTSUP);
+ });
});
Convey("We can create a SUB socket", {
@@ -48,6 +61,23 @@ TestMain("PUB/SUB pattern", {
So(nng_sendmsg(sub, msg, 0) == NNG_ENOTSUP);
nng_msg_free(msg);
});
+
+ Convey("It can subscribe", {
+ So(nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "ABC", 3) ==
+ 0);
+ So(nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0) == 0);
+ Convey("And it can unsubscribe", {
+ So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE,
+ "ABC", 3) == 0);
+ So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, "",
+ 0) == 0);
+
+ So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, "",
+ 0) == NNG_ENOENT);
+ So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE,
+ "HELLO", 0) == NNG_ENOENT);
+ });
+ });
});
Convey("We can create a linked PUB/SUB pair", {
@@ -73,28 +103,6 @@ TestMain("PUB/SUB pattern", {
nng_msleep(20); // give time for connecting threads
- Convey("Sub can subscribe", {
- So(nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "ABC", 3) ==
- 0);
- So(nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0) == 0);
- Convey("Unsubscribe works", {
- So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE,
- "ABC", 3) == 0);
- So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, "",
- 0) == 0);
-
- So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE, "",
- 0) == NNG_ENOENT);
- So(nng_setopt(sub, NNG_OPT_SUB_UNSUBSCRIBE,
- "HELLO", 0) == NNG_ENOENT);
- });
- });
-
- Convey("Pub cannot subscribe", {
- So(nng_setopt(pub, NNG_OPT_SUB_SUBSCRIBE, "", 0) ==
- NNG_ENOTSUP);
- });
-
Convey("Subs can receive from pubs", {
nng_msg *msg;
diff --git a/tests/reconnect.c b/tests/reconnect.c
index 8179055d..e191f70a 100644
--- a/tests/reconnect.c
+++ b/tests/reconnect.c
@@ -10,6 +10,10 @@
#include "convey.h"
#include "nng.h"
+#include "protocol/pipeline0/pull.h"
+#include "protocol/pipeline0/push.h"
+#include "stubs.h"
+
#include <string.h>
#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
diff --git a/tests/reqrep.c b/tests/reqrep.c
index df0621bc..4e837c34 100644
--- a/tests/reqrep.c
+++ b/tests/reqrep.c
@@ -10,6 +10,9 @@
#include "convey.h"
#include "nng.h"
+#include "protocol/reqrep0/rep.h"
+#include "protocol/reqrep0/req.h"
+#include "stubs.h"
#include <string.h>
diff --git a/tests/resolv.c b/tests/resolv.c
index 49d258c3..0678b05f 100644
--- a/tests/resolv.c
+++ b/tests/resolv.c
@@ -51,26 +51,6 @@ ip6tostr(void *addr)
// too much on them, since localhost can be configured weirdly. Notably
// the normal assumptions on Linux do *not* hold true.
#if 0
-
- Convey("Localhost IPv4 resolves", {
- nni_aio aio;
- const char *str;
- memset(&aio, 0, sizeof (aio));
- nni_aio_init(&aio, NULL, NULL);
- nni_plat_tcp_resolv("localhost", "80", NNG_AF_INET, 1,
- &aio);
- nni_aio_wait(&aio);
- So(nni_aio_result(&aio) == 0);
- So(aio.a_naddrs == 1);
- So(aio.a_addrs[0].s_un.s_in.sa_family == NNG_AF_INET);
- So(aio.a_addrs[0].s_un.s_in.sa_port == ntohs(80));
- So(aio.a_addrs[0].s_un.s_in.sa_addr == ntohl(0x7f000001));
- str = ip4tostr(&aio.a_addrs[0].s_un.s_in.sa_addr);
- So(strcmp(str, "127.0.0.1") == 0);
- nni_aio_fini(&aio);
- }
- );
-
Convey("Localhost IPv6 resolves", {
nni_aio aio;
memset(&aio, 0, sizeof (aio));
@@ -87,44 +67,6 @@ ip6tostr(void *addr)
So(strcmp(str, "::1") == 0);
nni_aio_fini(&aio);
}
- );
- Convey("Localhost UNSPEC resolves", {
- nni_aio aio;
- memset(&aio, 0, sizeof (aio));
- const char *str;
- int i;
- nni_aio_init(&aio, NULL, NULL);
- nni_plat_tcp_resolv("localhost", "80", NNG_AF_UNSPEC, 1,
- &aio);
- nni_aio_wait(&aio);
- So(nni_aio_result(&aio) == 0);
- So(aio.a_naddrs == 2);
- for (i = 0; i < 2; i++) {
- switch (aio.a_addrs[i].s_un.s_family) {
- case NNG_AF_INET6:
- So(aio.a_addrs[i].s_un.s_in6.sa_port ==
- ntohs(80));
- str =
- ip6tostr(&aio.a_addrs[i].s_un.s_in6.sa_addr);
- So(strcmp(str, "::1") == 0);
- break;
-
- case NNG_AF_INET:
- So(aio.a_addrs[i].s_un.s_in.sa_port ==
- ntohs(80));
- str =
- ip4tostr(&aio.a_addrs[i].s_un.s_in.sa_addr);
- So(strcmp(str, "127.0.0.1") == 0);
- break;
- default:
- So(1 == 0);
- }
- }
- So(aio.a_addrs[0].s_un.s_family !=
- aio.a_addrs[1].s_un.s_family);
- nni_aio_fini(&aio);
- }
- );
#endif
TestMain("Resolver", {
@@ -179,11 +121,18 @@ TestMain("Resolver", {
So(strcmp(str, "8.8.4.4") == 0);
nni_aio_fini(aio);
});
+
Convey("Numeric v6 resolves", {
nni_aio * aio;
const char * str;
nng_sockaddr sa;
+ // Travis CI has moved some of their services to host that
+ // apparently don't support IPv6 at all. This is very sad.
+ if (getenv("TRAVIS") != NULL) {
+ ConveySkip("IPv6 missing from CI provider");
+ }
+
nni_aio_init(&aio, NULL, NULL);
aio->a_addr = &sa;
nni_plat_tcp_resolv("::1", "80", NNG_AF_INET6, 1, aio);
@@ -230,5 +179,51 @@ TestMain("Resolver", {
nni_aio_fini(aio);
});
+ Convey("Localhost IPv4 resolves", {
+ nni_aio * aio;
+ const char * str;
+ nng_sockaddr sa;
+
+ nni_aio_init(&aio, NULL, NULL);
+ aio->a_addr = &sa;
+ nni_plat_tcp_resolv("localhost", "80", NNG_AF_INET, 1, aio);
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
+ So(sa.s_un.s_in.sa_family == NNG_AF_INET);
+ So(sa.s_un.s_in.sa_port == ntohs(80));
+ So(sa.s_un.s_in.sa_addr == ntohl(0x7f000001));
+ str = ip4tostr(&sa.s_un.s_in.sa_addr);
+ So(strcmp(str, "127.0.0.1") == 0);
+ nni_aio_fini(aio);
+ });
+
+ Convey("Localhost UNSPEC resolves", {
+ nni_aio * aio;
+ const char * str;
+ nng_sockaddr sa;
+
+ nni_aio_init(&aio, NULL, NULL);
+ aio->a_addr = &sa;
+ nni_plat_tcp_resolv("localhost", "80", NNG_AF_UNSPEC, 1, aio);
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
+ So((sa.s_un.s_family == NNG_AF_INET) ||
+ (sa.s_un.s_family == NNG_AF_INET6));
+ switch (sa.s_un.s_family) {
+ case NNG_AF_INET:
+ So(sa.s_un.s_in.sa_port == ntohs(80));
+ So(sa.s_un.s_in.sa_addr == ntohl(0x7f000001));
+ str = ip4tostr(&sa.s_un.s_in.sa_addr);
+ So(strcmp(str, "127.0.0.1") == 0);
+ break;
+ case NNG_AF_INET6:
+ So(sa.s_un.s_in6.sa_port == ntohs(80));
+ str = ip6tostr(&sa.s_un.s_in6.sa_addr);
+ So(strcmp(str, "::1") == 0);
+ break;
+ }
+ nni_aio_fini(aio);
+ });
+
nni_fini();
})
diff --git a/tests/scalability.c b/tests/scalability.c
index b1a9b767..d869d5fc 100644
--- a/tests/scalability.c
+++ b/tests/scalability.c
@@ -11,6 +11,10 @@
#include "convey.h"
#include "nng.h"
+#include "protocol/reqrep0/rep.h"
+#include "protocol/reqrep0/req.h"
+#include "stubs.h"
+
#include <string.h>
static int nclients = 200;
diff --git a/tests/sock.c b/tests/sock.c
index 1c277b46..d3af414a 100644
--- a/tests/sock.c
+++ b/tests/sock.c
@@ -12,6 +12,9 @@
#include "nng.h"
#include "trantest.h"
+#include "protocol/pubsub0/sub.h"
+
+#include "protocol/pair1/pair.h"
#include "stubs.h"
#include <string.h>
diff --git a/tests/stubs.h b/tests/stubs.h
index 780ac772..0641c970 100644
--- a/tests/stubs.h
+++ b/tests/stubs.h
@@ -44,4 +44,51 @@ getms(void)
#endif
}
+int
+nosocket(nng_socket *s)
+{
+ ConveySkip("Protocol unconfigured");
+ return (NNG_ENOTSUP);
+}
+
+#ifndef NNG_HAVE_REQ0
+#define nng_req0_open nosocket
+#endif
+
+#ifndef NNG_HAVE_REP0
+#define nng_rep0_open nosocket
+#endif
+
+#ifndef NNG_HAVE_PUB0
+#define nng_pub0_open nosocket
+#endif
+
+#ifndef NNG_HAVE_SUB0
+#define nng_sub0_open nosocket
+#endif
+
+#ifndef NNG_HAVE_PAIR0
+#define nng_pair0_open nosocket
+#endif
+
+#ifndef NNG_HAVE_PAIR1
+#define nng_pair1_open nosocket
+#endif
+
+#ifndef NNG_HAVE_PUSH0
+#define nng_push0_open nosocket
+#endif
+
+#ifndef NNG_HAVE_PULL0
+#define nng_pull0_open nosocket
+#endif
+
+#ifndef NNG_HAVE_SURVEYOR0
+#define nng_surveyor0_open nosocket
+#endif
+
+#ifndef NNG_HAVE_RESPONDENT0
+#define nng_respondent0_open nosocket
+#endif
+
#endif // STUBS_H
diff --git a/tests/survey.c b/tests/survey.c
index f4fab009..a3a7ba1d 100644
--- a/tests/survey.c
+++ b/tests/survey.c
@@ -9,7 +9,11 @@
//
#include "convey.h"
+
#include "nng.h"
+#include "protocol/survey0/respond.h"
+#include "protocol/survey0/survey.h"
+#include "stubs.h"
#include <string.h>
diff --git a/tests/tcp.c b/tests/tcp.c
index 79fe1893..fdcf458c 100644
--- a/tests/tcp.c
+++ b/tests/tcp.c
@@ -9,8 +9,11 @@
//
#include "convey.h"
+#include "nng.h"
+#include "protocol/pair1/pair.h"
#include "trantest.h"
+#include "stubs.h"
// TCP tests.
#ifndef _WIN32
diff --git a/tests/tcp6.c b/tests/tcp6.c
index b1695c84..a3e55d18 100644
--- a/tests/tcp6.c
+++ b/tests/tcp6.c
@@ -13,6 +13,9 @@
#include "core/nng_impl.h"
// TCP tests for IPv6.
+#include "protocol/pair1/pair.h"
+
+#include "stubs.h"
static int
has_v6(void)
diff --git a/tests/trantest.h b/tests/trantest.h
index 37763d24..d334c257 100644
--- a/tests/trantest.h
+++ b/tests/trantest.h
@@ -11,6 +11,8 @@
#include "convey.h"
#include "core/nng_impl.h"
#include "nng.h"
+#include "protocol/reqrep0/rep.h"
+#include "protocol/reqrep0/req.h"
#include <stdlib.h>
#include <string.h>
@@ -30,9 +32,54 @@ unsigned trantest_port = 0;
typedef int (*trantest_proptest_t)(nng_msg *, nng_listener, nng_dialer);
+#ifndef NNG_HAVE_ZEROTIER
+#define nng_zt_register notransport
+#endif
+#ifndef NNG_HAVE_INPROC
+#define nng_inproc_register notransport
+#endif
+#ifndef NNG_HAVE_IPC
+#define nng_ipc_register notransport
+#endif
+#ifndef NNG_HAVE_TCP
+#define nng_tcp_register notransport
+#endif
+
+int
+notransport(void)
+{
+ ConveySkip("Transport not configured");
+ return (NNG_ENOTSUP);
+}
+
+#define CHKTRAN(s, t) \
+ if (strncmp(s, t, strlen(t)) == 0) \
+ notransport()
+
+void
+trantest_checktran(const char *url)
+{
+#ifndef NNG_HAVE_ZEROTIER
+ CHKTRAN(url, "zt:");
+#endif
+#ifndef NNG_HAVE_INPROC
+ CHKTRAN(url, "inproc:");
+#endif
+#ifndef NNG_HAVE_IPC
+ CHKTRAN(url, "ipc:");
+#endif
+#ifndef NNG_HAVE_TCP
+ CHKTRAN(url, "tcp:");
+#endif
+
+ (void) url;
+}
+
void
trantest_next_address(char *out, const char *template)
{
+ trantest_checktran(template);
+
if (trantest_port == 0) {
char *pstr;
trantest_port = 5555;
@@ -56,11 +103,16 @@ void
trantest_init(trantest *tt, const char *addr)
{
trantest_next_address(tt->addr, addr);
+
+#if defined(NNG_HAVE_REQ0) && defined(NNG_HAVE_REP0)
So(nng_req_open(&tt->reqsock) == 0);
So(nng_rep_open(&tt->repsock) == 0);
tt->tran = nni_tran_find(addr);
So(tt->tran != NULL);
+#else
+ ConveySkip("Missing REQ or REP protocols");
+#endif
}
void
diff --git a/tests/zt.c b/tests/zt.c
index 562f4ee1..9680f82d 100644
--- a/tests/zt.c
+++ b/tests/zt.c
@@ -9,9 +9,12 @@
//
#include "convey.h"
+#include "nng.h"
+#include "protocol/pair0/pair.h"
+#include "transport/zerotier/zerotier.h"
#include "trantest.h"
-#include "transport/zerotier/zerotier.h"
+#include "stubs.h"
// zerotier tests.
@@ -35,6 +38,10 @@ mkdir(const char *path, int mode)
#include <unistd.h>
#endif // WIN32
+#ifndef NNG_HAVE_ZEROTIER
+#define nng_zt_network_status_ok 0
+#endif
+
static int
check_props(nng_msg *msg, nng_listener l, nng_dialer d)
{
@@ -197,6 +204,8 @@ TestMain("ZeroTier Transport", {
char addr[NNG_MAXADDRLEN];
int rv;
+ So(nng_zt_register() == 0);
+
snprintf(addr, sizeof(addr), "zt://" NWID ":%u", port);
So(nng_pair_open(&s) == 0);
@@ -240,6 +249,8 @@ TestMain("ZeroTier Transport", {
// uint64_t node = 0xb000072fa6ull; // my personal host
uint64_t node = 0x2d2f619cccull; // my personal host
+ So(nng_zt_register() == 0);
+
snprintf(addr, sizeof(addr), "zt://" NWID "/%llx:%u",
(unsigned long long) node, port);
@@ -258,6 +269,8 @@ TestMain("ZeroTier Transport", {
uint64_t node1 = 0;
uint64_t node2 = 0;
+ So(nng_zt_register() == 0);
+
snprintf(addr, sizeof(addr), "zt://" NWID ":%u", port);
So(nng_pair_open(&s) == 0);
@@ -306,6 +319,7 @@ TestMain("ZeroTier Transport", {
port = 9944;
// uint64_t node = 0xb000072fa6ull; // my personal host
+ So(nng_zt_register() == 0);
snprintf(addr1, sizeof(addr1), "zt://" NWID ":%u", port);