aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt49
-rw-r--r--src/core/msgqueue.c1
-rw-r--r--src/core/protocol.h35
-rw-r--r--src/core/socket.c1
-rw-r--r--src/core/transport.c40
-rw-r--r--src/nng.h36
-rw-r--r--src/nng_compat.c36
-rw-r--r--src/protocol/bus0/CMakeLists.txt18
-rw-r--r--src/protocol/bus0/bus.c (renamed from src/protocol/bus/bus.c)205
-rw-r--r--src/protocol/bus0/bus.h28
-rw-r--r--src/protocol/pair0/CMakeLists.txt18
-rw-r--r--src/protocol/pair0/pair.c (renamed from src/protocol/pair/pair_v0.c)4
-rw-r--r--src/protocol/pair0/pair.h28
-rw-r--r--src/protocol/pair1/CMakeLists.txt18
-rw-r--r--src/protocol/pair1/pair.c (renamed from src/protocol/pair/pair_v1.c)6
-rw-r--r--src/protocol/pair1/pair.h30
-rw-r--r--src/protocol/pipeline0/CMakeLists.txt23
-rw-r--r--src/protocol/pipeline0/pull.c (renamed from src/protocol/pipeline/pull.c)146
-rw-r--r--src/protocol/pipeline0/pull.h28
-rw-r--r--src/protocol/pipeline0/push.c (renamed from src/protocol/pipeline/push.c)138
-rw-r--r--src/protocol/pipeline0/push.h28
-rw-r--r--src/protocol/pubsub0/CMakeLists.txt23
-rw-r--r--src/protocol/pubsub0/pub.c (renamed from src/protocol/pubsub/pub.c)169
-rw-r--r--src/protocol/pubsub0/pub.h28
-rw-r--r--src/protocol/pubsub0/sub.c (renamed from src/protocol/pubsub/sub.c)184
-rw-r--r--src/protocol/pubsub0/sub.h31
-rw-r--r--src/protocol/reqrep0/CMakeLists.txt23
-rw-r--r--src/protocol/reqrep0/rep.c (renamed from src/protocol/reqrep/rep.c)227
-rw-r--r--src/protocol/reqrep0/rep.h28
-rw-r--r--src/protocol/reqrep0/req.c (renamed from src/protocol/reqrep/req.c)239
-rw-r--r--src/protocol/reqrep0/req.h30
-rw-r--r--src/protocol/survey0/CMakeLists.txt23
-rw-r--r--src/protocol/survey0/respond.c (renamed from src/protocol/survey/respond.c)231
-rw-r--r--src/protocol/survey0/respond.h28
-rw-r--r--src/protocol/survey0/survey.c (renamed from src/protocol/survey/survey.c)205
-rw-r--r--src/protocol/survey0/survey.h30
-rw-r--r--src/transport/inproc/CMakeLists.txt18
-rw-r--r--src/transport/inproc/inproc.c3
-rw-r--r--src/transport/ipc/CMakeLists.txt18
-rw-r--r--src/transport/ipc/ipc.c10
-rw-r--r--src/transport/ipc/ipc.h19
-rw-r--r--src/transport/tcp/CMakeLists.txt18
-rw-r--r--src/transport/tcp/tcp.c10
-rw-r--r--src/transport/tcp/tcp.h18
-rw-r--r--src/transport/zerotier/CMakeLists.txt50
45 files changed, 1640 insertions, 939 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index a5a49052..4901789b 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -74,30 +74,6 @@ set (NNG_SOURCES
core/timer.h
core/transport.c
core/transport.h
-
- protocol/bus/bus.c
-
- protocol/pair/pair_v0.c
- protocol/pair/pair_v1.c
-
- protocol/pipeline/pull.c
- protocol/pipeline/push.c
-
- protocol/pubsub/pub.c
- protocol/pubsub/sub.c
-
- protocol/reqrep/rep.c
- protocol/reqrep/req.c
-
- protocol/survey/respond.c
- protocol/survey/survey.c
-
- transport/inproc/inproc.c
-
- transport/ipc/ipc.c
-
- transport/tcp/tcp.c
-
)
if (NNG_PLATFORM_POSIX)
@@ -146,14 +122,19 @@ endif()
install(FILES transport/inproc/inproc.h
DESTINATION include/nng/transport/inproc)
-if (NNG_ENABLE_ZEROTIER)
- set (NNG_SOURCES ${NNG_SOURCES}
- transport/zerotier/zerotier.c
- transport/zerotier/zerotier.h
- )
- install(FILES transport/zerotier/zerotier.h
- DESTINATION include/nng/transport/zerotier)
-endif()
+
+add_subdirectory(protocol/bus0)
+add_subdirectory(protocol/pair0)
+add_subdirectory(protocol/pair1)
+add_subdirectory(protocol/pipeline0)
+add_subdirectory(protocol/pubsub0)
+add_subdirectory(protocol/reqrep0)
+add_subdirectory(protocol/survey0)
+
+add_subdirectory(transport/inproc)
+add_subdirectory(transport/ipc)
+add_subdirectory(transport/tcp)
+add_subdirectory(transport/zerotier)
include_directories(AFTER SYSTEM ${PROJECT_SOURCE_DIR}/src
${NNG_REQUIRED_INCLUDES})
@@ -216,3 +197,7 @@ install (TARGETS ${PROJECT_NAME} ${PROJECT_NAME}_static
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
)
+
+# Promote settings to parent
+set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} PARENT_SCOPE)
+set(NNG_REQUIRED_LFLAGS ${NNG_REQUIRED_LFLAGS} PARENT_SCOPE) \ No newline at end of file
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 3f1ffcb5..10bffaa4 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -297,7 +297,6 @@ nni_msgq_run_getq(nni_msgq *mq)
static void
nni_msgq_run_notify(nni_msgq *mq)
{
- nni_aio *aio;
if (mq->mq_cb_fn != NULL) {
int flags = 0;
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 5db259f0..39bde059 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -147,21 +147,26 @@ extern int nni_proto_open(nng_socket *, const nni_proto *);
// Protocol numbers are never more than 16 bits. Also, there will never be
// a valid protocol numbered 0 (NNG_PROTO_NONE).
#define NNI_PROTO(major, minor) (((major) *16) + (minor))
-enum nng_proto_enum {
- NNI_PROTO_NONE = NNI_PROTO(0, 0),
- NNI_PROTO_PAIR_V0 = NNI_PROTO(1, 0),
- NNI_PROTO_PAIR_V1 = NNI_PROTO(1, 1),
- NNI_PROTO_PUB_V0 = NNI_PROTO(2, 0),
- NNI_PROTO_SUB_V0 = NNI_PROTO(2, 1),
- NNI_PROTO_REQ_V0 = NNI_PROTO(3, 0),
- NNI_PROTO_REP_V0 = NNI_PROTO(3, 1),
- NNI_PROTO_PUSH_V0 = NNI_PROTO(5, 0),
- NNI_PROTO_PULL_V0 = NNI_PROTO(5, 1),
- NNI_PROTO_SURVEYOR_V0 = NNI_PROTO(6, 2),
- NNI_PROTO_RESPONDENT_V0 = NNI_PROTO(6, 3),
- NNI_PROTO_BUS_V0 = NNI_PROTO(7, 0),
- NNI_PROTO_STAR_V0 = NNI_PROTO(100, 0),
-};
+
+// Protocol major numbers. This is here for documentation only, and
+// to serve as a "registry" for managing new protocol numbers. Consider
+// updating this table when adding new protocols.
+//
+// Protocol Maj Min Name Notes
+// -------------------------------------------
+// NONE 0 0 reserved
+// PAIRv0 1 0 pair
+// PAIRv1 1 1 pair1 nng only, experimental
+// PUBv0 2 0 pub
+// SUBv0 2 1 sub
+// REQv0 3 0 req
+// REPv0 3 1 rep
+// PUSHv0 5 0 push
+// PULLv0 5 1 pull
+// SURVEYORv0 6 2 surveyor minors 0 & 1 retired
+// RESPONDENTv0 6 3 respondent
+// BUSv0 7 0 bus
+// STARv0 100 0 star mangos only, experimental
extern int nni_proto_sys_init(void);
extern void nni_proto_sys_fini(void);
diff --git a/src/core/socket.c b/src/core/socket.c
index c9b70ccb..4a84b639 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -102,7 +102,6 @@ static int
nni_sock_getopt_fd(nni_sock *s, int flag, void *val, size_t *szp)
{
int rv;
- uint32_t flags;
nni_notifyfd *fd;
nni_msgq * mq;
nni_msgq_cb cb;
diff --git a/src/core/transport.c b/src/core/transport.c
index 6dcf4538..af9c93fb 100644
--- a/src/core/transport.c
+++ b/src/core/transport.c
@@ -10,6 +10,9 @@
#include "core/nng_impl.h"
#include "transport/inproc/inproc.h"
+#include "transport/ipc/ipc.h"
+#include "transport/tcp/tcp.h"
+#include "transport/zerotier/zerotier.h"
#include <stdio.h>
#include <string.h>
@@ -51,6 +54,11 @@ nni_tran_register(const nni_tran *tran)
nni_mtx_lock(&nni_tran_lk);
// Check to see if the transport is already registered...
NNI_LIST_FOREACH (&nni_tran_list, t) {
+ if (tran->tran_init == t->t_tran.tran_init) {
+ nni_mtx_unlock(&nni_tran_lk);
+ // Same transport, duplicate registration.
+ return (0);
+ }
if (strcmp(tran->tran_scheme, t->t_tran.tran_scheme) == 0) {
nni_mtx_unlock(&nni_tran_lk);
return (NNG_ESTATE);
@@ -129,20 +137,40 @@ nni_tran_chkopt(const char *name, const void *v, size_t sz)
// nni_tran_sys_init initializes the entire transport subsystem, including
// each individual transport.
+
+typedef int (*nni_tran_ctor)(void);
+
+static nni_tran_ctor nni_tran_ctors[] = {
+#ifdef NNG_HAVE_INPROC
+ nng_inproc_register,
+#endif
+#ifdef NNG_HAVE_IPC
+ nng_ipc_register,
+#endif
+#ifdef NNG_HAVE_TCP
+ nng_tcp_register,
+#endif
+#ifdef NNI_HAVE_ZEROTIER
+ nng_zt_register,
+#endif
+ NULL,
+};
+
int
nni_tran_sys_init(void)
{
- int rv;
+ int i;
nni_tran_inited = 1;
NNI_LIST_INIT(&nni_tran_list, nni_transport, t_node);
nni_mtx_init(&nni_tran_lk);
- if (((rv = nng_inproc_register()) != 0) ||
- ((rv = nni_tran_register(&nni_ipc_tran)) != 0) ||
- ((rv = nni_tran_register(&nni_tcp_tran)) != 0)) {
- nni_tran_sys_fini();
- return (rv);
+ for (i = 0; nni_tran_ctors[i] != NULL; i++) {
+ int rv;
+ if ((rv = (nni_tran_ctors[i])()) != 0) {
+ nni_tran_sys_fini();
+ return (rv);
+ }
}
return (0);
}
diff --git a/src/nng.h b/src/nng.h
index c39ba9ee..6d375509 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -337,33 +337,6 @@ enum nng_flag_enum {
NNG_FLAG_NONBLOCK = 2, // Non-blocking operations.
};
-// Builtin protocol socket constructors.
-NNG_DECL int nng_bus0_open(nng_socket *);
-NNG_DECL int nng_pair0_open(nng_socket *);
-NNG_DECL int nng_pair1_open(nng_socket *);
-NNG_DECL int nng_pub0_open(nng_socket *);
-NNG_DECL int nng_sub0_open(nng_socket *);
-NNG_DECL int nng_push0_open(nng_socket *);
-NNG_DECL int nng_pull0_open(nng_socket *);
-NNG_DECL int nng_req0_open(nng_socket *);
-NNG_DECL int nng_rep0_open(nng_socket *);
-NNG_DECL int nng_surveyor0_open(nng_socket *);
-NNG_DECL int nng_respondent0_open(nng_socket *);
-
-// Default versions. These provide compile time defaults; note that
-// the actual protocols are baked into the binary; this should avoid
-// suprising. Choosing a new protocol should be done explicitly.
-#define nng_bus_open nng_bus0_open
-#define nng_pair_open nng_pair1_open
-#define nng_pub_open nng_pub0_open
-#define nng_sub_open nng_sub0_open
-#define nng_push_open nng_push0_open
-#define nng_pull_open nng_pull0_open
-#define nng_req_open nng_req0_open
-#define nng_rep_open nng_rep0_open
-#define nng_surveyor_open nng_surveyor0_open
-#define nng_respondent_open nng_respondent0_open
-
// Options.
#define NNG_OPT_SOCKNAME "socket-name"
#define NNG_OPT_DOMAIN "compat:domain" // legacy compat only
@@ -385,15 +358,6 @@ NNG_DECL int nng_respondent0_open(nng_socket *);
#define NNG_OPT_RECONNMINT "reconnect-time-min"
#define NNG_OPT_RECONNMAXT "reconnect-time-max"
-#define NNG_OPT_PAIR1_POLY "pair1:polyamorous"
-
-#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe"
-#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe"
-
-#define NNG_OPT_REQ_RESENDTIME "req:resend-time"
-
-#define NNG_OPT_SURVEYOR_SURVEYTIME "surveyor:survey-time"
-
// XXX: TBD: priorities, socket names, ipv4only
// Statistics. These are for informational purposes only, and subject
diff --git a/src/nng_compat.c b/src/nng_compat.c
index f8a7ceb0..482834a1 100644
--- a/src/nng_compat.c
+++ b/src/nng_compat.c
@@ -10,6 +10,16 @@
#include "nng_compat.h"
#include "nng.h"
+#include "protocol/bus0/bus.h"
+#include "protocol/pair0/pair.h"
+#include "protocol/pipeline0/pull.h"
+#include "protocol/pipeline0/push.h"
+#include "protocol/pubsub0/pub.h"
+#include "protocol/pubsub0/sub.h"
+#include "protocol/reqrep0/rep.h"
+#include "protocol/reqrep0/req.h"
+#include "protocol/survey0/respond.h"
+#include "protocol/survey0/survey.h"
#include <stdio.h>
#include <string.h>
@@ -92,17 +102,37 @@ static const struct {
uint16_t p_id;
int (*p_open)(nng_socket *);
} nn_protocols[] = {
- // clang-format off
+// clang-format off
+#ifdef NNG_HAVE_BUS0
{ NN_BUS, nng_bus0_open },
+#endif
+#ifdef NNG_HAVE_PAIR0
{ NN_PAIR, nng_pair0_open },
+#endif
+#ifdef NNG_HAVE_PUSH0
{ NN_PUSH, nng_push0_open },
+#endif
+#ifdef NNG_HAVE_PULL0
{ NN_PULL, nng_pull0_open },
+#endif
+#ifdef NNG_HAVE_PUB0
{ NN_PUB, nng_pub0_open },
+#endif
+#ifdef NNG_HAVE_SUB0
{ NN_SUB, nng_sub0_open },
+#endif
+#ifdef NNG_HAVE_REQ0
{ NN_REQ, nng_req0_open },
+#endif
+#ifdef NNG_HAVE_REP0
{ NN_REP, nng_rep0_open },
+#endif
+#ifdef NNG_HAVE_SURVEYOR0
{ NN_SURVEYOR, nng_surveyor0_open },
+#endif
+#ifdef NNG_HAVE_RESPONDENT0
{ NN_RESPONDENT, nng_respondent0_open },
+#endif
{ 0, NULL },
// clang-format on
};
@@ -115,7 +145,7 @@ nn_socket(int domain, int protocol)
int i;
if ((domain != AF_SP) && (domain != AF_SP_RAW)) {
- nn_seterror(EAFNOSUPPORT);
+ errno = EAFNOSUPPORT;
return (-1);
}
@@ -125,7 +155,7 @@ nn_socket(int domain, int protocol)
}
}
if (nn_protocols[i].p_open == NULL) {
- nn_seterror(ENOTSUP);
+ errno = ENOTSUP;
return (-1);
}
diff --git a/src/protocol/bus0/CMakeLists.txt b/src/protocol/bus0/CMakeLists.txt
new file mode 100644
index 00000000..5071054a
--- /dev/null
+++ b/src/protocol/bus0/CMakeLists.txt
@@ -0,0 +1,18 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Bus protocol
+
+if (NNG_PROTO_BUS0)
+ set(BUS0_SOURCES protocol/bus0/bus.c protocol/bus0/bus.h)
+ install(FILES bus.h DESTINATION include/nng/protocol/bus0)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${BUS0_SOURCES} PARENT_SCOPE)
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus0/bus.c
index 6ec0066b..3e15000b 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus0/bus.c
@@ -12,31 +12,36 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/bus0/bus.h"
// Bus protocol. The BUS protocol, each peer sends a message to its peers.
// However, bus protocols do not "forward" (absent a device). So in order
// for each participant to receive the message, each sender must be connected
// to every other node in the network (full mesh).
-typedef struct bus_pipe bus_pipe;
-typedef struct bus_sock bus_sock;
+#ifndef NNI_PROTO_BUS_V0
+#define NNI_PROTO_BUS_V0 NNI_PROTO(7, 0)
+#endif
-static void bus_sock_getq(bus_sock *);
-static void bus_sock_send(void *, nni_aio *);
-static void bus_sock_recv(void *, nni_aio *);
+typedef struct bus0_pipe bus0_pipe;
+typedef struct bus0_sock bus0_sock;
-static void bus_pipe_getq(bus_pipe *);
-static void bus_pipe_send(bus_pipe *);
-static void bus_pipe_recv(bus_pipe *);
+static void bus0_sock_getq(bus0_sock *);
+static void bus0_sock_send(void *, nni_aio *);
+static void bus0_sock_recv(void *, nni_aio *);
-static void bus_sock_getq_cb(void *);
-static void bus_pipe_getq_cb(void *);
-static void bus_pipe_send_cb(void *);
-static void bus_pipe_recv_cb(void *);
-static void bus_pipe_putq_cb(void *);
+static void bus0_pipe_getq(bus0_pipe *);
+static void bus0_pipe_send(bus0_pipe *);
+static void bus0_pipe_recv(bus0_pipe *);
-// A bus_sock is our per-socket protocol private structure.
-struct bus_sock {
+static void bus0_sock_getq_cb(void *);
+static void bus0_pipe_getq_cb(void *);
+static void bus0_pipe_send_cb(void *);
+static void bus0_pipe_recv_cb(void *);
+static void bus0_pipe_putq_cb(void *);
+
+// bus0_sock is our per-socket protocol private structure.
+struct bus0_sock {
int raw;
nni_aio * aio_getq;
nni_list pipes;
@@ -45,10 +50,10 @@ struct bus_sock {
nni_msgq *urq;
};
-// A bus_pipe is our per-pipe protocol private structure.
-struct bus_pipe {
+// bus0_pipe is our per-pipe protocol private structure.
+struct bus0_pipe {
nni_pipe * npipe;
- bus_sock * psock;
+ bus0_sock * psock;
nni_msgq * sendq;
nni_list_node node;
nni_aio * aio_getq;
@@ -59,9 +64,9 @@ struct bus_pipe {
};
static void
-bus_sock_fini(void *arg)
+bus0_sock_fini(void *arg)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
@@ -70,18 +75,18 @@ bus_sock_fini(void *arg)
}
static int
-bus_sock_init(void **sp, nni_sock *nsock)
+bus0_sock_init(void **sp, nni_sock *nsock)
{
- bus_sock *s;
- int rv;
+ bus0_sock *s;
+ int rv;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- NNI_LIST_INIT(&s->pipes, bus_pipe, node);
+ NNI_LIST_INIT(&s->pipes, bus0_pipe, node);
nni_mtx_init(&s->mtx);
- if ((rv = nni_aio_init(&s->aio_getq, bus_sock_getq_cb, s)) != 0) {
- bus_sock_fini(s);
+ if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) {
+ bus0_sock_fini(s);
return (rv);
}
s->raw = 0;
@@ -93,25 +98,25 @@ bus_sock_init(void **sp, nni_sock *nsock)
}
static void
-bus_sock_open(void *arg)
+bus0_sock_open(void *arg)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
- bus_sock_getq(s);
+ bus0_sock_getq(s);
}
static void
-bus_sock_close(void *arg)
+bus0_sock_close(void *arg)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-bus_pipe_fini(void *arg)
+bus0_pipe_fini(void *arg)
{
- bus_pipe *p = arg;
+ bus0_pipe *p = arg;
nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
@@ -123,10 +128,10 @@ bus_pipe_fini(void *arg)
}
static int
-bus_pipe_init(void **pp, nni_pipe *npipe, void *s)
+bus0_pipe_init(void **pp, nni_pipe *npipe, void *s)
{
- bus_pipe *p;
- int rv;
+ bus0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
@@ -134,11 +139,11 @@ bus_pipe_init(void **pp, nni_pipe *npipe, void *s)
NNI_LIST_NODE_INIT(&p->node);
nni_mtx_init(&p->mtx);
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, bus_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, bus_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, bus_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, bus_pipe_putq_cb, p)) != 0)) {
- bus_pipe_fini(p);
+ ((rv = nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) {
+ bus0_pipe_fini(p);
return (rv);
}
@@ -149,26 +154,26 @@ bus_pipe_init(void **pp, nni_pipe *npipe, void *s)
}
static int
-bus_pipe_start(void *arg)
+bus0_pipe_start(void *arg)
{
- bus_pipe *p = arg;
- bus_sock *s = p->psock;
+ bus0_pipe *p = arg;
+ bus0_sock *s = p->psock;
nni_mtx_lock(&s->mtx);
nni_list_append(&s->pipes, p);
nni_mtx_unlock(&s->mtx);
- bus_pipe_recv(p);
- bus_pipe_getq(p);
+ bus0_pipe_recv(p);
+ bus0_pipe_getq(p);
return (0);
}
static void
-bus_pipe_stop(void *arg)
+bus0_pipe_stop(void *arg)
{
- bus_pipe *p = arg;
- bus_sock *s = p->psock;
+ bus0_pipe *p = arg;
+ bus0_sock *s = p->psock;
nni_msgq_close(p->sendq);
@@ -185,9 +190,9 @@ bus_pipe_stop(void *arg)
}
static void
-bus_pipe_getq_cb(void *arg)
+bus0_pipe_getq_cb(void *arg)
{
- bus_pipe *p = arg;
+ bus0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
// closed?
@@ -201,9 +206,9 @@ bus_pipe_getq_cb(void *arg)
}
static void
-bus_pipe_send_cb(void *arg)
+bus0_pipe_send_cb(void *arg)
{
- bus_pipe *p = arg;
+ bus0_pipe *p = arg;
if (nni_aio_result(p->aio_send) != 0) {
// closed?
@@ -213,15 +218,15 @@ bus_pipe_send_cb(void *arg)
return;
}
- bus_pipe_getq(p);
+ bus0_pipe_getq(p);
}
static void
-bus_pipe_recv_cb(void *arg)
+bus0_pipe_recv_cb(void *arg)
{
- bus_pipe *p = arg;
- bus_sock *s = p->psock;
- nni_msg * msg;
+ bus0_pipe *p = arg;
+ bus0_sock *s = p->psock;
+ nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->npipe);
@@ -243,9 +248,9 @@ bus_pipe_recv_cb(void *arg)
}
static void
-bus_pipe_putq_cb(void *arg)
+bus0_pipe_putq_cb(void *arg)
{
- bus_pipe *p = arg;
+ bus0_pipe *p = arg;
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
@@ -255,18 +260,18 @@ bus_pipe_putq_cb(void *arg)
}
// Wait for another recv.
- bus_pipe_recv(p);
+ bus0_pipe_recv(p);
}
static void
-bus_sock_getq_cb(void *arg)
+bus0_sock_getq_cb(void *arg)
{
- bus_sock *s = arg;
- bus_pipe *p;
- bus_pipe *lastp;
- nni_msg * msg;
- nni_msg * dup;
- uint32_t sender;
+ bus0_sock *s = arg;
+ bus0_pipe *p;
+ bus0_pipe *lastp;
+ nni_msg * msg;
+ nni_msg * dup;
+ uint32_t sender;
if (nni_aio_result(s->aio_getq) != 0) {
return;
@@ -307,95 +312,95 @@ bus_sock_getq_cb(void *arg)
nni_msg_free(msg);
}
- bus_sock_getq(s);
+ bus0_sock_getq(s);
}
static void
-bus_sock_getq(bus_sock *s)
+bus0_sock_getq(bus0_sock *s)
{
nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-bus_pipe_getq(bus_pipe *p)
+bus0_pipe_getq(bus0_pipe *p)
{
nni_msgq_aio_get(p->sendq, p->aio_getq);
}
static void
-bus_pipe_recv(bus_pipe *p)
+bus0_pipe_recv(bus0_pipe *p)
{
nni_pipe_recv(p->npipe, p->aio_recv);
}
static int
-bus_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+bus0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-bus_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+bus0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static void
-bus_sock_send(void *arg, nni_aio *aio)
+bus0_sock_send(void *arg, nni_aio *aio)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
nni_msgq_aio_put(s->uwq, aio);
}
static void
-bus_sock_recv(void *arg, nni_aio *aio)
+bus0_sock_recv(void *arg, nni_aio *aio)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
nni_msgq_aio_get(s->urq, aio);
}
-static nni_proto_pipe_ops bus_pipe_ops = {
- .pipe_init = bus_pipe_init,
- .pipe_fini = bus_pipe_fini,
- .pipe_start = bus_pipe_start,
- .pipe_stop = bus_pipe_stop,
+static nni_proto_pipe_ops bus0_pipe_ops = {
+ .pipe_init = bus0_pipe_init,
+ .pipe_fini = bus0_pipe_fini,
+ .pipe_start = bus0_pipe_start,
+ .pipe_stop = bus0_pipe_stop,
};
-static nni_proto_sock_option bus_sock_options[] = {
+static nni_proto_sock_option bus0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = bus_sock_getopt_raw,
- .pso_setopt = bus_sock_setopt_raw,
+ .pso_getopt = bus0_sock_getopt_raw,
+ .pso_setopt = bus0_sock_setopt_raw,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops bus_sock_ops = {
- .sock_init = bus_sock_init,
- .sock_fini = bus_sock_fini,
- .sock_open = bus_sock_open,
- .sock_close = bus_sock_close,
- .sock_send = bus_sock_send,
- .sock_recv = bus_sock_recv,
- .sock_options = bus_sock_options,
+static nni_proto_sock_ops bus0_sock_ops = {
+ .sock_init = bus0_sock_init,
+ .sock_fini = bus0_sock_fini,
+ .sock_open = bus0_sock_open,
+ .sock_close = bus0_sock_close,
+ .sock_send = bus0_sock_send,
+ .sock_recv = bus0_sock_recv,
+ .sock_options = bus0_sock_options,
};
-static nni_proto bus_proto = {
+static nni_proto bus0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_BUS_V0, "bus" },
.proto_peer = { NNI_PROTO_BUS_V0, "bus" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &bus_sock_ops,
- .proto_pipe_ops = &bus_pipe_ops,
+ .proto_sock_ops = &bus0_sock_ops,
+ .proto_pipe_ops = &bus0_pipe_ops,
};
int
nng_bus0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &bus_proto));
+ return (nni_proto_open(sidp, &bus0_proto));
}
diff --git a/src/protocol/bus0/bus.h b/src/protocol/bus0/bus.h
new file mode 100644
index 00000000..0ef3d391
--- /dev/null
+++ b/src/protocol/bus0/bus.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_BUS0_BUS_H
+#define NNG_PROTOCOL_BUS0_BUS_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_bus0_open(nng_socket *);
+
+#ifndef nng_bus_open
+#define nng_bus_open nng_bus0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_BUS0_BUS_H
diff --git a/src/protocol/pair0/CMakeLists.txt b/src/protocol/pair0/CMakeLists.txt
new file mode 100644
index 00000000..68e7ad34
--- /dev/null
+++ b/src/protocol/pair0/CMakeLists.txt
@@ -0,0 +1,18 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# PAIRv0 protocol
+
+if (NNG_PROTO_PAIR0)
+ set(PAIR0_SOURCES protocol/pair0/pair.c protocol/pair0/pair.h)
+ install(FILES pair.h DESTINATION include/nng/protocol/pair0)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${PAIR0_SOURCES} PARENT_SCOPE)
diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair0/pair.c
index 93cd1497..bac405b8 100644
--- a/src/protocol/pair/pair_v0.c
+++ b/src/protocol/pair0/pair.c
@@ -17,6 +17,10 @@
// While a peer is connected to the server, all other peer connection
// attempts are discarded.
+#ifndef NNI_PROTO_PAIR_V0
+#define NNI_PROTO_PAIR_V0 NNI_PROTO(1, 0)
+#endif
+
typedef struct pair0_pipe pair0_pipe;
typedef struct pair0_sock pair0_sock;
diff --git a/src/protocol/pair0/pair.h b/src/protocol/pair0/pair.h
new file mode 100644
index 00000000..6828c921
--- /dev/null
+++ b/src/protocol/pair0/pair.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_PAIR0_PAIR_H
+#define NNG_PROTOCOL_PAIR0_PAIR_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_pair0_open(nng_socket *);
+
+#ifndef nng_pair_open
+#define nng_pair_open nng_pair0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_PAIR0_PAIR_H
diff --git a/src/protocol/pair1/CMakeLists.txt b/src/protocol/pair1/CMakeLists.txt
new file mode 100644
index 00000000..f35d6959
--- /dev/null
+++ b/src/protocol/pair1/CMakeLists.txt
@@ -0,0 +1,18 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# PAIRv1 protocol
+
+if (NNG_PROTO_PAIR1)
+ set(PAIR1_SOURCES protocol/pair1/pair.c protocol/pair1/pair.h)
+ install(FILES pair.h DESTINATION include/nng/protocol/pair1)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${PAIR1_SOURCES} PARENT_SCOPE)
diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair1/pair.c
index e14d06d5..3f6f63fc 100644
--- a/src/protocol/pair/pair_v1.c
+++ b/src/protocol/pair1/pair.c
@@ -13,10 +13,16 @@
#include "core/nng_impl.h"
+#include "protocol/pair1/pair.h"
+
// Pair protocol. The PAIRv1 protocol is a simple 1:1 messaging pattern,
// usually, but it can support a polyamorous mode where a single server can
// communicate with multiple partners.
+#ifndef NNI_PROTO_PAIR_V1
+#define NNI_PROTO_PAIR_V1 NNI_PROTO(1, 1)
+#endif
+
typedef struct pair1_pipe pair1_pipe;
typedef struct pair1_sock pair1_sock;
diff --git a/src/protocol/pair1/pair.h b/src/protocol/pair1/pair.h
new file mode 100644
index 00000000..bc519d9f
--- /dev/null
+++ b/src/protocol/pair1/pair.h
@@ -0,0 +1,30 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_PAIR1_PAIR_H
+#define NNG_PROTOCOL_PAIR1_PAIR_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_pair1_open(nng_socket *);
+
+#ifndef nng_pair_open
+#define nng_pair_open nng_pair1_open
+#endif
+
+#define NNG_OPT_PAIR1_POLY "pair1:polyamorous"
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_PAIR1_PAIR_H
diff --git a/src/protocol/pipeline0/CMakeLists.txt b/src/protocol/pipeline0/CMakeLists.txt
new file mode 100644
index 00000000..6153c5a7
--- /dev/null
+++ b/src/protocol/pipeline0/CMakeLists.txt
@@ -0,0 +1,23 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Pub/Sub protocol
+
+if (NNG_PROTO_PUSH0)
+ set(PUSH0_SOURCES protocol/pipeline0/push.c protocol/pipeline0/push.h)
+ install(FILES push.h DESTINATION include/nng/protocol/pipeline0)
+endif()
+
+if (NNG_PROTO_PULL0)
+ set(PULL0_SOURCES protocol/pipeline0/pull.c protocol/pipeline0/pull.h)
+ install(FILES pull.h DESTINATION include/nng/protocol/pipeline0)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${PUSH0_SOURCES} ${PULL0_SOURCES} PARENT_SCOPE) \ No newline at end of file
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline0/pull.c
index 9685f0a1..8c16cb17 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline0/pull.c
@@ -15,31 +15,39 @@
// Pull protocol. The PULL protocol is the "read" side of a pipeline.
-typedef struct pull_pipe pull_pipe;
-typedef struct pull_sock pull_sock;
+#ifndef NNI_PROTO_PULL_V0
+#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1)
+#endif
-static void pull_putq_cb(void *);
-static void pull_recv_cb(void *);
-static void pull_putq(pull_pipe *, nni_msg *);
+#ifndef NNI_PROTO_PUSH_V0
+#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0)
+#endif
-// A pull_sock is our per-socket protocol private structure.
-struct pull_sock {
+typedef struct pull0_pipe pull0_pipe;
+typedef struct pull0_sock pull0_sock;
+
+static void pull0_putq_cb(void *);
+static void pull0_recv_cb(void *);
+static void pull0_putq(pull0_pipe *, nni_msg *);
+
+// pull0_sock is our per-socket protocol private structure.
+struct pull0_sock {
nni_msgq *urq;
int raw;
};
-// A pull_pipe is our per-pipe protocol private structure.
-struct pull_pipe {
- nni_pipe * pipe;
- pull_sock *pull;
- nni_aio * putq_aio;
- nni_aio * recv_aio;
+// pull0_pipe is our per-pipe protocol private structure.
+struct pull0_pipe {
+ nni_pipe * pipe;
+ pull0_sock *pull;
+ nni_aio * putq_aio;
+ nni_aio * recv_aio;
};
static int
-pull_sock_init(void **sp, nni_sock *sock)
+pull0_sock_init(void **sp, nni_sock *sock)
{
- pull_sock *s;
+ pull0_sock *s;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
@@ -52,17 +60,17 @@ pull_sock_init(void **sp, nni_sock *sock)
}
static void
-pull_sock_fini(void *arg)
+pull0_sock_fini(void *arg)
{
- pull_sock *s = arg;
+ pull0_sock *s = arg;
NNI_FREE_STRUCT(s);
}
static void
-pull_pipe_fini(void *arg)
+pull0_pipe_fini(void *arg)
{
- pull_pipe *p = arg;
+ pull0_pipe *p = arg;
nni_aio_fini(p->putq_aio);
nni_aio_fini(p->recv_aio);
@@ -70,17 +78,17 @@ pull_pipe_fini(void *arg)
}
static int
-pull_pipe_init(void **pp, nni_pipe *pipe, void *s)
+pull0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- pull_pipe *p;
- int rv;
+ pull0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_aio_init(&p->putq_aio, pull_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->recv_aio, pull_recv_cb, p)) != 0)) {
- pull_pipe_fini(p);
+ if (((rv = nni_aio_init(&p->putq_aio, pull0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->recv_aio, pull0_recv_cb, p)) != 0)) {
+ pull0_pipe_fini(p);
return (rv);
}
@@ -91,9 +99,9 @@ pull_pipe_init(void **pp, nni_pipe *pipe, void *s)
}
static int
-pull_pipe_start(void *arg)
+pull0_pipe_start(void *arg)
{
- pull_pipe *p = arg;
+ pull0_pipe *p = arg;
// Start the pending pull...
nni_pipe_recv(p->pipe, p->recv_aio);
@@ -102,20 +110,20 @@ pull_pipe_start(void *arg)
}
static void
-pull_pipe_stop(void *arg)
+pull0_pipe_stop(void *arg)
{
- pull_pipe *p = arg;
+ pull0_pipe *p = arg;
nni_aio_stop(p->putq_aio);
nni_aio_stop(p->recv_aio);
}
static void
-pull_recv_cb(void *arg)
+pull0_recv_cb(void *arg)
{
- pull_pipe *p = arg;
- nni_aio * aio = p->recv_aio;
- nni_msg * msg;
+ pull0_pipe *p = arg;
+ nni_aio * aio = p->recv_aio;
+ nni_msg * msg;
if (nni_aio_result(aio) != 0) {
// Failed to get a message, probably the pipe is closed.
@@ -127,14 +135,14 @@ pull_recv_cb(void *arg)
msg = nni_aio_get_msg(aio);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
nni_aio_set_msg(aio, NULL);
- pull_putq(p, msg);
+ pull0_putq(p, msg);
}
static void
-pull_putq_cb(void *arg)
+pull0_putq_cb(void *arg)
{
- pull_pipe *p = arg;
- nni_aio * aio = p->putq_aio;
+ pull0_pipe *p = arg;
+ nni_aio * aio = p->putq_aio;
if (nni_aio_result(aio) != 0) {
// If we failed to put, probably NNG_ECLOSED, nothing else
@@ -148,11 +156,11 @@ pull_putq_cb(void *arg)
nni_pipe_recv(p->pipe, p->recv_aio);
}
-// nni_pull_putq schedules a put operation to the user socket (sendup).
+// pull0_putq schedules a put operation to the user socket (sendup).
static void
-pull_putq(pull_pipe *p, nni_msg *msg)
+pull0_putq(pull0_pipe *p, nni_msg *msg)
{
- pull_sock *s = p->pull;
+ pull0_sock *s = p->pull;
nni_aio_set_msg(p->putq_aio, msg);
@@ -160,83 +168,83 @@ pull_putq(pull_pipe *p, nni_msg *msg)
}
static void
-pull_sock_open(void *arg)
+pull0_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-pull_sock_close(void *arg)
+pull0_sock_close(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static int
-pull_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+pull0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- pull_sock *s = arg;
+ pull0_sock *s = arg;
return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-pull_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+pull0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- pull_sock *s = arg;
+ pull0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static void
-pull_sock_send(void *arg, nni_aio *aio)
+pull0_sock_send(void *arg, nni_aio *aio)
{
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
static void
-pull_sock_recv(void *arg, nni_aio *aio)
+pull0_sock_recv(void *arg, nni_aio *aio)
{
- pull_sock *s = arg;
+ pull0_sock *s = arg;
nni_msgq_aio_get(s->urq, aio);
}
-static nni_proto_pipe_ops pull_pipe_ops = {
- .pipe_init = pull_pipe_init,
- .pipe_fini = pull_pipe_fini,
- .pipe_start = pull_pipe_start,
- .pipe_stop = pull_pipe_stop,
+static nni_proto_pipe_ops pull0_pipe_ops = {
+ .pipe_init = pull0_pipe_init,
+ .pipe_fini = pull0_pipe_fini,
+ .pipe_start = pull0_pipe_start,
+ .pipe_stop = pull0_pipe_stop,
};
-static nni_proto_sock_option pull_sock_options[] = {
+static nni_proto_sock_option pull0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = pull_sock_getopt_raw,
- .pso_setopt = pull_sock_setopt_raw,
+ .pso_getopt = pull0_sock_getopt_raw,
+ .pso_setopt = pull0_sock_setopt_raw,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops pull_sock_ops = {
- .sock_init = pull_sock_init,
- .sock_fini = pull_sock_fini,
- .sock_open = pull_sock_open,
- .sock_close = pull_sock_close,
- .sock_send = pull_sock_send,
- .sock_recv = pull_sock_recv,
- .sock_options = pull_sock_options,
+static nni_proto_sock_ops pull0_sock_ops = {
+ .sock_init = pull0_sock_init,
+ .sock_fini = pull0_sock_fini,
+ .sock_open = pull0_sock_open,
+ .sock_close = pull0_sock_close,
+ .sock_send = pull0_sock_send,
+ .sock_recv = pull0_sock_recv,
+ .sock_options = pull0_sock_options,
};
-static nni_proto pull_proto = {
+static nni_proto pull0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_PULL_V0, "pull" },
.proto_peer = { NNI_PROTO_PUSH_V0, "push" },
.proto_flags = NNI_PROTO_FLAG_RCV,
- .proto_pipe_ops = &pull_pipe_ops,
- .proto_sock_ops = &pull_sock_ops,
+ .proto_pipe_ops = &pull0_pipe_ops,
+ .proto_sock_ops = &pull0_sock_ops,
};
int
nng_pull0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &pull_proto));
+ return (nni_proto_open(sidp, &pull0_proto));
}
diff --git a/src/protocol/pipeline0/pull.h b/src/protocol/pipeline0/pull.h
new file mode 100644
index 00000000..75bded03
--- /dev/null
+++ b/src/protocol/pipeline0/pull.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_PIPELINE0_PULL_H
+#define NNG_PROTOCOL_PIPELINE0_PULL_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_pull0_open(nng_socket *);
+
+#ifndef nng_pull_open
+#define nng_pull_open nng_pull0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_PIPELINE0_PULL_H
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline0/push.c
index 9ff74558..3dd83fe0 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline0/push.c
@@ -17,23 +17,31 @@
// Push distributes fairly, or tries to, by giving messages in round-robin
// order.
-typedef struct push_pipe push_pipe;
-typedef struct push_sock push_sock;
+#ifndef NNI_PROTO_PULL_V0
+#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1)
+#endif
-static void push_send_cb(void *);
-static void push_recv_cb(void *);
-static void push_getq_cb(void *);
+#ifndef NNI_PROTO_PUSH_V0
+#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0)
+#endif
-// An nni_push_sock is our per-socket protocol private structure.
-struct push_sock {
+typedef struct push0_pipe push0_pipe;
+typedef struct push0_sock push0_sock;
+
+static void push0_send_cb(void *);
+static void push0_recv_cb(void *);
+static void push0_getq_cb(void *);
+
+// push0_sock is our per-socket protocol private structure.
+struct push0_sock {
nni_msgq *uwq;
int raw;
};
-// An nni_push_pipe is our per-pipe protocol private structure.
-struct push_pipe {
+// push0_pipe is our per-pipe protocol private structure.
+struct push0_pipe {
nni_pipe * pipe;
- push_sock * push;
+ push0_sock * push;
nni_list_node node;
nni_aio *aio_recv;
@@ -42,9 +50,9 @@ struct push_pipe {
};
static int
-push_sock_init(void **sp, nni_sock *sock)
+push0_sock_init(void **sp, nni_sock *sock)
{
- push_sock *s;
+ push0_sock *s;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
@@ -56,29 +64,29 @@ push_sock_init(void **sp, nni_sock *sock)
}
static void
-push_sock_fini(void *arg)
+push0_sock_fini(void *arg)
{
- push_sock *s = arg;
+ push0_sock *s = arg;
NNI_FREE_STRUCT(s);
}
static void
-push_sock_open(void *arg)
+push0_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-push_sock_close(void *arg)
+push0_sock_close(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-push_pipe_fini(void *arg)
+push0_pipe_fini(void *arg)
{
- push_pipe *p = arg;
+ push0_pipe *p = arg;
nni_aio_fini(p->aio_recv);
nni_aio_fini(p->aio_send);
@@ -87,18 +95,18 @@ push_pipe_fini(void *arg)
}
static int
-push_pipe_init(void **pp, nni_pipe *pipe, void *s)
+push0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- push_pipe *p;
- int rv;
+ push0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_aio_init(&p->aio_recv, push_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, push_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, push_getq_cb, p)) != 0)) {
- push_pipe_fini(p);
+ if (((rv = nni_aio_init(&p->aio_recv, push0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, push0_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, push0_getq_cb, p)) != 0)) {
+ push0_pipe_fini(p);
return (rv);
}
NNI_LIST_NODE_INIT(&p->node);
@@ -109,10 +117,10 @@ push_pipe_init(void **pp, nni_pipe *pipe, void *s)
}
static int
-push_pipe_start(void *arg)
+push0_pipe_start(void *arg)
{
- push_pipe *p = arg;
- push_sock *s = p->push;
+ push0_pipe *p = arg;
+ push0_sock *s = p->push;
if (nni_pipe_peer(p->pipe) != NNI_PROTO_PULL_V0) {
return (NNG_EPROTO);
@@ -129,9 +137,9 @@ push_pipe_start(void *arg)
}
static void
-push_pipe_stop(void *arg)
+push0_pipe_stop(void *arg)
{
- push_pipe *p = arg;
+ push0_pipe *p = arg;
nni_aio_stop(p->aio_recv);
nni_aio_stop(p->aio_send);
@@ -139,9 +147,9 @@ push_pipe_stop(void *arg)
}
static void
-push_recv_cb(void *arg)
+push0_recv_cb(void *arg)
{
- push_pipe *p = arg;
+ push0_pipe *p = arg;
// We normally expect to receive an error. If a pipe actually
// sends us data, we just discard it.
@@ -155,10 +163,10 @@ push_recv_cb(void *arg)
}
static void
-push_send_cb(void *arg)
+push0_send_cb(void *arg)
{
- push_pipe *p = arg;
- push_sock *s = p->push;
+ push0_pipe *p = arg;
+ push0_sock *s = p->push;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -171,10 +179,10 @@ push_send_cb(void *arg)
}
static void
-push_getq_cb(void *arg)
+push0_getq_cb(void *arg)
{
- push_pipe *p = arg;
- nni_aio * aio = p->aio_getq;
+ push0_pipe *p = arg;
+ nni_aio * aio = p->aio_getq;
if (nni_aio_result(aio) != 0) {
// If the socket is closing, nothing else we can do.
@@ -189,71 +197,71 @@ push_getq_cb(void *arg)
}
static int
-push_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+push0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- push_sock *s = arg;
+ push0_sock *s = arg;
return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-push_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+push0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- push_sock *s = arg;
+ push0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static void
-push_sock_send(void *arg, nni_aio *aio)
+push0_sock_send(void *arg, nni_aio *aio)
{
- push_sock *s = arg;
+ push0_sock *s = arg;
nni_msgq_aio_put(s->uwq, aio);
}
static void
-push_sock_recv(void *arg, nni_aio *aio)
+push0_sock_recv(void *arg, nni_aio *aio)
{
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
-static nni_proto_pipe_ops push_pipe_ops = {
- .pipe_init = push_pipe_init,
- .pipe_fini = push_pipe_fini,
- .pipe_start = push_pipe_start,
- .pipe_stop = push_pipe_stop,
+static nni_proto_pipe_ops push0_pipe_ops = {
+ .pipe_init = push0_pipe_init,
+ .pipe_fini = push0_pipe_fini,
+ .pipe_start = push0_pipe_start,
+ .pipe_stop = push0_pipe_stop,
};
-static nni_proto_sock_option push_sock_options[] = {
+static nni_proto_sock_option push0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = push_sock_getopt_raw,
- .pso_setopt = push_sock_setopt_raw,
+ .pso_getopt = push0_sock_getopt_raw,
+ .pso_setopt = push0_sock_setopt_raw,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops push_sock_ops = {
- .sock_init = push_sock_init,
- .sock_fini = push_sock_fini,
- .sock_open = push_sock_open,
- .sock_close = push_sock_close,
- .sock_options = push_sock_options,
- .sock_send = push_sock_send,
- .sock_recv = push_sock_recv,
+static nni_proto_sock_ops push0_sock_ops = {
+ .sock_init = push0_sock_init,
+ .sock_fini = push0_sock_fini,
+ .sock_open = push0_sock_open,
+ .sock_close = push0_sock_close,
+ .sock_options = push0_sock_options,
+ .sock_send = push0_sock_send,
+ .sock_recv = push0_sock_recv,
};
-static nni_proto push_proto = {
+static nni_proto push0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_PUSH_V0, "push" },
.proto_peer = { NNI_PROTO_PULL_V0, "pull" },
.proto_flags = NNI_PROTO_FLAG_SND,
- .proto_pipe_ops = &push_pipe_ops,
- .proto_sock_ops = &push_sock_ops,
+ .proto_pipe_ops = &push0_pipe_ops,
+ .proto_sock_ops = &push0_sock_ops,
};
int
nng_push0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &push_proto));
+ return (nni_proto_open(sidp, &push0_proto));
}
diff --git a/src/protocol/pipeline0/push.h b/src/protocol/pipeline0/push.h
new file mode 100644
index 00000000..c7303b92
--- /dev/null
+++ b/src/protocol/pipeline0/push.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_PIPELINE0_PUSH_H
+#define NNG_PROTOCOL_PIPELINE0_PUSH_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_push0_open(nng_socket *);
+
+#ifndef nng_push_open
+#define nng_push_open nng_push0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_PIPELINE0_PUSH_H
diff --git a/src/protocol/pubsub0/CMakeLists.txt b/src/protocol/pubsub0/CMakeLists.txt
new file mode 100644
index 00000000..4edcbfae
--- /dev/null
+++ b/src/protocol/pubsub0/CMakeLists.txt
@@ -0,0 +1,23 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Pub/Sub protocol
+
+if (NNG_PROTO_PUB0)
+ set(PUB0_SOURCES protocol/pubsub0/pub.c protocol/pubsub0/pub.h)
+ install(FILES pub.h DESTINATION include/nng/protocol/pubsub0)
+endif()
+
+if (NNG_PROTO_SUB0)
+ set(SUB0_SOURCES protocol/pubsub0/sub.c protocol/pubsub0/sub.h)
+ install(FILES sub.h DESTINATION include/nng/protocol/pubsub0)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${PUB0_SOURCES} ${SUB0_SOURCES} PARENT_SCOPE) \ No newline at end of file
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub0/pub.c
index 9e5cd67f..f4a33b77 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -12,24 +12,33 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/pubsub0/pub.h"
// Publish protocol. The PUB protocol simply sends messages out, as
// a broadcast. It has nothing more sophisticated because it does not
// perform sender-side filtering. Its best effort delivery, so anything
// that can't receive the message won't get one.
-typedef struct pub_pipe pub_pipe;
-typedef struct pub_sock pub_sock;
+#ifndef NNI_PROTO_SUB_V0
+#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1)
+#endif
-static void pub_pipe_recv_cb(void *);
-static void pub_pipe_send_cb(void *);
-static void pub_pipe_getq_cb(void *);
-static void pub_sock_getq_cb(void *);
-static void pub_sock_fini(void *);
-static void pub_pipe_fini(void *);
+#ifndef NNI_PROTO_PUB_V0
+#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0)
+#endif
-// A pub_sock is our per-socket protocol private structure.
-struct pub_sock {
+typedef struct pub0_pipe pub0_pipe;
+typedef struct pub0_sock pub0_sock;
+
+static void pub0_pipe_recv_cb(void *);
+static void pub0_pipe_send_cb(void *);
+static void pub0_pipe_getq_cb(void *);
+static void pub0_sock_getq_cb(void *);
+static void pub0_sock_fini(void *);
+static void pub0_pipe_fini(void *);
+
+// pub0_sock is our per-socket protocol private structure.
+struct pub0_sock {
nni_msgq *uwq;
int raw;
nni_aio * aio_getq;
@@ -37,10 +46,10 @@ struct pub_sock {
nni_mtx mtx;
};
-// A pub_pipe is our per-pipe protocol private structure.
-struct pub_pipe {
+// pub0_pipe is our per-pipe protocol private structure.
+struct pub0_pipe {
nni_pipe * pipe;
- pub_sock * pub;
+ pub0_sock * pub;
nni_msgq * sendq;
nni_aio * aio_getq;
nni_aio * aio_send;
@@ -49,9 +58,9 @@ struct pub_pipe {
};
static void
-pub_sock_fini(void *arg)
+pub0_sock_fini(void *arg)
{
- pub_sock *s = arg;
+ pub0_sock *s = arg;
nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
@@ -60,22 +69,22 @@ pub_sock_fini(void *arg)
}
static int
-pub_sock_init(void **sp, nni_sock *sock)
+pub0_sock_init(void **sp, nni_sock *sock)
{
- pub_sock *s;
- int rv;
+ pub0_sock *s;
+ int rv;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&s->mtx);
- if ((rv = nni_aio_init(&s->aio_getq, pub_sock_getq_cb, s)) != 0) {
- pub_sock_fini(s);
+ if ((rv = nni_aio_init(&s->aio_getq, pub0_sock_getq_cb, s)) != 0) {
+ pub0_sock_fini(s);
return (rv);
}
s->raw = 0;
- NNI_LIST_INIT(&s->pipes, pub_pipe, node);
+ NNI_LIST_INIT(&s->pipes, pub0_pipe, node);
s->uwq = nni_sock_sendq(sock);
@@ -84,25 +93,25 @@ pub_sock_init(void **sp, nni_sock *sock)
}
static void
-pub_sock_open(void *arg)
+pub0_sock_open(void *arg)
{
- pub_sock *s = arg;
+ pub0_sock *s = arg;
nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-pub_sock_close(void *arg)
+pub0_sock_close(void *arg)
{
- pub_sock *s = arg;
+ pub0_sock *s = arg;
nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-pub_pipe_fini(void *arg)
+pub0_pipe_fini(void *arg)
{
- pub_pipe *p = arg;
+ pub0_pipe *p = arg;
nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
@@ -111,10 +120,10 @@ pub_pipe_fini(void *arg)
}
static int
-pub_pipe_init(void **pp, nni_pipe *pipe, void *s)
+pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- pub_pipe *p;
- int rv;
+ pub0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
@@ -122,11 +131,11 @@ pub_pipe_init(void **pp, nni_pipe *pipe, void *s)
// XXX: consider making this depth tunable
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, pub_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, pub_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, pub_pipe_recv_cb, p)) != 0)) {
+ ((rv = nni_aio_init(&p->aio_getq, pub0_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) {
- pub_pipe_fini(p);
+ pub0_pipe_fini(p);
return (rv);
}
@@ -137,10 +146,10 @@ pub_pipe_init(void **pp, nni_pipe *pipe, void *s)
}
static int
-pub_pipe_start(void *arg)
+pub0_pipe_start(void *arg)
{
- pub_pipe *p = arg;
- pub_sock *s = p->pub;
+ pub0_pipe *p = arg;
+ pub0_sock *s = p->pub;
if (nni_pipe_peer(p->pipe) != NNI_PROTO_SUB_V0) {
return (NNG_EPROTO);
@@ -157,10 +166,10 @@ pub_pipe_start(void *arg)
}
static void
-pub_pipe_stop(void *arg)
+pub0_pipe_stop(void *arg)
{
- pub_pipe *p = arg;
- pub_sock *s = p->pub;
+ pub0_pipe *p = arg;
+ pub0_sock *s = p->pub;
nni_aio_stop(p->aio_getq);
nni_aio_stop(p->aio_send);
@@ -176,15 +185,15 @@ pub_pipe_stop(void *arg)
}
static void
-pub_sock_getq_cb(void *arg)
+pub0_sock_getq_cb(void *arg)
{
- pub_sock *s = arg;
- nni_msgq *uwq = s->uwq;
- nni_msg * msg, *dup;
+ pub0_sock *s = arg;
+ nni_msgq * uwq = s->uwq;
+ nni_msg * msg, *dup;
- pub_pipe *p;
- pub_pipe *last;
- int rv;
+ pub0_pipe *p;
+ pub0_pipe *last;
+ int rv;
if (nni_aio_result(s->aio_getq) != 0) {
return;
@@ -218,9 +227,9 @@ pub_sock_getq_cb(void *arg)
}
static void
-pub_pipe_recv_cb(void *arg)
+pub0_pipe_recv_cb(void *arg)
{
- pub_pipe *p = arg;
+ pub0_pipe *p = arg;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->pipe);
@@ -233,9 +242,9 @@ pub_pipe_recv_cb(void *arg)
}
static void
-pub_pipe_getq_cb(void *arg)
+pub0_pipe_getq_cb(void *arg)
{
- pub_pipe *p = arg;
+ pub0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->pipe);
@@ -249,9 +258,9 @@ pub_pipe_getq_cb(void *arg)
}
static void
-pub_pipe_send_cb(void *arg)
+pub0_pipe_send_cb(void *arg)
{
- pub_pipe *p = arg;
+ pub0_pipe *p = arg;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -265,71 +274,71 @@ pub_pipe_send_cb(void *arg)
}
static int
-pub_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+pub0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- pub_sock *s = arg;
+ pub0_sock *s = arg;
return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-pub_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+pub0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- pub_sock *s = arg;
+ pub0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static void
-pub_sock_recv(void *arg, nni_aio *aio)
+pub0_sock_recv(void *arg, nni_aio *aio)
{
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
static void
-pub_sock_send(void *arg, nni_aio *aio)
+pub0_sock_send(void *arg, nni_aio *aio)
{
- pub_sock *s = arg;
+ pub0_sock *s = arg;
nni_msgq_aio_put(s->uwq, aio);
}
-static nni_proto_pipe_ops pub_pipe_ops = {
- .pipe_init = pub_pipe_init,
- .pipe_fini = pub_pipe_fini,
- .pipe_start = pub_pipe_start,
- .pipe_stop = pub_pipe_stop,
+static nni_proto_pipe_ops pub0_pipe_ops = {
+ .pipe_init = pub0_pipe_init,
+ .pipe_fini = pub0_pipe_fini,
+ .pipe_start = pub0_pipe_start,
+ .pipe_stop = pub0_pipe_stop,
};
-static nni_proto_sock_option pub_sock_options[] = {
+static nni_proto_sock_option pub0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = pub_sock_getopt_raw,
- .pso_setopt = pub_sock_setopt_raw,
+ .pso_getopt = pub0_sock_getopt_raw,
+ .pso_setopt = pub0_sock_setopt_raw,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops pub_sock_ops = {
- .sock_init = pub_sock_init,
- .sock_fini = pub_sock_fini,
- .sock_open = pub_sock_open,
- .sock_close = pub_sock_close,
- .sock_send = pub_sock_send,
- .sock_recv = pub_sock_recv,
- .sock_options = pub_sock_options,
+static nni_proto_sock_ops pub0_sock_ops = {
+ .sock_init = pub0_sock_init,
+ .sock_fini = pub0_sock_fini,
+ .sock_open = pub0_sock_open,
+ .sock_close = pub0_sock_close,
+ .sock_send = pub0_sock_send,
+ .sock_recv = pub0_sock_recv,
+ .sock_options = pub0_sock_options,
};
-static nni_proto pub_proto = {
+static nni_proto pub0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_PUB_V0, "pub" },
.proto_peer = { NNI_PROTO_SUB_V0, "sub" },
.proto_flags = NNI_PROTO_FLAG_SND,
- .proto_sock_ops = &pub_sock_ops,
- .proto_pipe_ops = &pub_pipe_ops,
+ .proto_sock_ops = &pub0_sock_ops,
+ .proto_pipe_ops = &pub0_pipe_ops,
};
int
nng_pub0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &pub_proto));
+ return (nni_proto_open(sidp, &pub0_proto));
}
diff --git a/src/protocol/pubsub0/pub.h b/src/protocol/pubsub0/pub.h
new file mode 100644
index 00000000..2388a292
--- /dev/null
+++ b/src/protocol/pubsub0/pub.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_PUBSUB0_PUB_H
+#define NNG_PROTOCOL_PUBSUB0_PUB_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_pub0_open(nng_socket *);
+
+#ifndef nng_pub_open
+#define nng_pub_open nng_pub0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_PUBSUB0_PUB_H
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub0/sub.c
index 555d528e..6c504d75 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -12,54 +12,60 @@
#include <string.h>
#include "core/nng_impl.h"
-
-const char *nng_opt_sub_subscribe = NNG_OPT_SUB_SUBSCRIBE;
-const char *nng_opt_sub_unsubscribe = NNG_OPT_SUB_UNSUBSCRIBE;
+#include "protocol/pubsub0/sub.h"
// Subscriber protocol. The SUB protocol receives messages sent to
// it from publishers, and filters out those it is not interested in,
// only passing up ones that match known subscriptions.
-typedef struct sub_pipe sub_pipe;
-typedef struct sub_sock sub_sock;
-typedef struct sub_topic sub_topic;
+#ifndef NNI_PROTO_SUB_V0
+#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1)
+#endif
+
+#ifndef NNI_PROTO_PUB_V0
+#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0)
+#endif
+
+typedef struct sub0_pipe sub0_pipe;
+typedef struct sub0_sock sub0_sock;
+typedef struct sub0_topic sub0_topic;
-static void sub_recv_cb(void *);
-static void sub_putq_cb(void *);
-static void sub_pipe_fini(void *);
+static void sub0_recv_cb(void *);
+static void sub0_putq_cb(void *);
+static void sub0_pipe_fini(void *);
-struct sub_topic {
+struct sub0_topic {
nni_list_node node;
size_t len;
void * buf;
};
-// An nni_rep_sock is our per-socket protocol private structure.
-struct sub_sock {
+// sub0_sock is our per-socket protocol private structure.
+struct sub0_sock {
nni_list topics;
nni_msgq *urq;
int raw;
nni_mtx lk;
};
-// An nni_rep_pipe is our per-pipe protocol private structure.
-struct sub_pipe {
- nni_pipe *pipe;
- sub_sock *sub;
- nni_aio * aio_recv;
- nni_aio * aio_putq;
+// sub0_pipe is our per-pipe protocol private structure.
+struct sub0_pipe {
+ nni_pipe * pipe;
+ sub0_sock *sub;
+ nni_aio * aio_recv;
+ nni_aio * aio_putq;
};
static int
-sub_sock_init(void **sp, nni_sock *sock)
+sub0_sock_init(void **sp, nni_sock *sock)
{
- sub_sock *s;
+ sub0_sock *s;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&s->lk);
- NNI_LIST_INIT(&s->topics, sub_topic, node);
+ NNI_LIST_INIT(&s->topics, sub0_topic, node);
s->raw = 0;
s->urq = nni_sock_recvq(sock);
@@ -68,10 +74,10 @@ sub_sock_init(void **sp, nni_sock *sock)
}
static void
-sub_sock_fini(void *arg)
+sub0_sock_fini(void *arg)
{
- sub_sock * s = arg;
- sub_topic *topic;
+ sub0_sock * s = arg;
+ sub0_topic *topic;
while ((topic = nni_list_first(&s->topics)) != NULL) {
nni_list_remove(&s->topics, topic);
@@ -83,21 +89,21 @@ sub_sock_fini(void *arg)
}
static void
-sub_sock_open(void *arg)
+sub0_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-sub_sock_close(void *arg)
+sub0_sock_close(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-sub_pipe_fini(void *arg)
+sub0_pipe_fini(void *arg)
{
- sub_pipe *p = arg;
+ sub0_pipe *p = arg;
nni_aio_fini(p->aio_putq);
nni_aio_fini(p->aio_recv);
@@ -105,17 +111,17 @@ sub_pipe_fini(void *arg)
}
static int
-sub_pipe_init(void **pp, nni_pipe *pipe, void *s)
+sub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- sub_pipe *p;
- int rv;
+ sub0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_aio_init(&p->aio_putq, sub_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, sub_recv_cb, p)) != 0)) {
- sub_pipe_fini(p);
+ if (((rv = nni_aio_init(&p->aio_putq, sub0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0)) {
+ sub0_pipe_fini(p);
return (rv);
}
@@ -126,30 +132,30 @@ sub_pipe_init(void **pp, nni_pipe *pipe, void *s)
}
static int
-sub_pipe_start(void *arg)
+sub0_pipe_start(void *arg)
{
- sub_pipe *p = arg;
+ sub0_pipe *p = arg;
nni_pipe_recv(p->pipe, p->aio_recv);
return (0);
}
static void
-sub_pipe_stop(void *arg)
+sub0_pipe_stop(void *arg)
{
- sub_pipe *p = arg;
+ sub0_pipe *p = arg;
nni_aio_stop(p->aio_putq);
nni_aio_stop(p->aio_recv);
}
static void
-sub_recv_cb(void *arg)
+sub0_recv_cb(void *arg)
{
- sub_pipe *p = arg;
- sub_sock *s = p->sub;
- nni_msgq *urq = s->urq;
- nni_msg * msg;
+ sub0_pipe *p = arg;
+ sub0_sock *s = p->sub;
+ nni_msgq * urq = s->urq;
+ nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->pipe);
@@ -164,9 +170,9 @@ sub_recv_cb(void *arg)
}
static void
-sub_putq_cb(void *arg)
+sub0_putq_cb(void *arg)
{
- sub_pipe *p = arg;
+ sub0_pipe *p = arg;
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
@@ -184,11 +190,11 @@ sub_putq_cb(void *arg)
// to replace this with a patricia trie, like old nanomsg had.
static int
-sub_subscribe(void *arg, const void *buf, size_t sz)
+sub0_subscribe(void *arg, const void *buf, size_t sz)
{
- sub_sock * s = arg;
- sub_topic *topic;
- sub_topic *newtopic;
+ sub0_sock * s = arg;
+ sub0_topic *topic;
+ sub0_topic *newtopic;
nni_mtx_lock(&s->lk);
NNI_LIST_FOREACH (&s->topics, topic) {
@@ -234,11 +240,11 @@ sub_subscribe(void *arg, const void *buf, size_t sz)
}
static int
-sub_unsubscribe(void *arg, const void *buf, size_t sz)
+sub0_unsubscribe(void *arg, const void *buf, size_t sz)
{
- sub_sock * s = arg;
- sub_topic *topic;
- int rv;
+ sub0_sock * s = arg;
+ sub0_topic *topic;
+ int rv;
nni_mtx_lock(&s->lk);
NNI_LIST_FOREACH (&s->topics, topic) {
@@ -270,41 +276,41 @@ sub_unsubscribe(void *arg, const void *buf, size_t sz)
}
static int
-sub_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+sub0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- sub_sock *s = arg;
+ sub0_sock *s = arg;
return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-sub_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+sub0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- sub_sock *s = arg;
+ sub0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static void
-sub_sock_send(void *arg, nni_aio *aio)
+sub0_sock_send(void *arg, nni_aio *aio)
{
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
static void
-sub_sock_recv(void *arg, nni_aio *aio)
+sub0_sock_recv(void *arg, nni_aio *aio)
{
- sub_sock *s = arg;
+ sub0_sock *s = arg;
nni_msgq_aio_get(s->urq, aio);
}
static nni_msg *
-sub_sock_filter(void *arg, nni_msg *msg)
+sub0_sock_filter(void *arg, nni_msg *msg)
{
- sub_sock * s = arg;
- sub_topic *topic;
- char * body;
- size_t len;
- int match;
+ sub0_sock * s = arg;
+ sub0_topic *topic;
+ char * body;
+ size_t len;
+ int match;
nni_mtx_lock(&s->lk);
if (s->raw) {
@@ -344,55 +350,55 @@ sub_sock_filter(void *arg, nni_msg *msg)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe_ops sub_pipe_ops = {
- .pipe_init = sub_pipe_init,
- .pipe_fini = sub_pipe_fini,
- .pipe_start = sub_pipe_start,
- .pipe_stop = sub_pipe_stop,
+static nni_proto_pipe_ops sub0_pipe_ops = {
+ .pipe_init = sub0_pipe_init,
+ .pipe_fini = sub0_pipe_fini,
+ .pipe_start = sub0_pipe_start,
+ .pipe_stop = sub0_pipe_stop,
};
-static nni_proto_sock_option sub_sock_options[] = {
+static nni_proto_sock_option sub0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = sub_sock_getopt_raw,
- .pso_setopt = sub_sock_setopt_raw,
+ .pso_getopt = sub0_sock_getopt_raw,
+ .pso_setopt = sub0_sock_setopt_raw,
},
{
.pso_name = NNG_OPT_SUB_SUBSCRIBE,
.pso_getopt = NULL,
- .pso_setopt = sub_subscribe,
+ .pso_setopt = sub0_subscribe,
},
{
.pso_name = NNG_OPT_SUB_UNSUBSCRIBE,
.pso_getopt = NULL,
- .pso_setopt = sub_unsubscribe,
+ .pso_setopt = sub0_unsubscribe,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops sub_sock_ops = {
- .sock_init = sub_sock_init,
- .sock_fini = sub_sock_fini,
- .sock_open = sub_sock_open,
- .sock_close = sub_sock_close,
- .sock_send = sub_sock_send,
- .sock_recv = sub_sock_recv,
- .sock_filter = sub_sock_filter,
- .sock_options = sub_sock_options,
+static nni_proto_sock_ops sub0_sock_ops = {
+ .sock_init = sub0_sock_init,
+ .sock_fini = sub0_sock_fini,
+ .sock_open = sub0_sock_open,
+ .sock_close = sub0_sock_close,
+ .sock_send = sub0_sock_send,
+ .sock_recv = sub0_sock_recv,
+ .sock_filter = sub0_sock_filter,
+ .sock_options = sub0_sock_options,
};
-static nni_proto sub_proto = {
+static nni_proto sub0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_SUB_V0, "sub" },
.proto_peer = { NNI_PROTO_PUB_V0, "pub" },
.proto_flags = NNI_PROTO_FLAG_RCV,
- .proto_sock_ops = &sub_sock_ops,
- .proto_pipe_ops = &sub_pipe_ops,
+ .proto_sock_ops = &sub0_sock_ops,
+ .proto_pipe_ops = &sub0_pipe_ops,
};
int
nng_sub0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &sub_proto));
+ return (nni_proto_open(sidp, &sub0_proto));
}
diff --git a/src/protocol/pubsub0/sub.h b/src/protocol/pubsub0/sub.h
new file mode 100644
index 00000000..1a09145d
--- /dev/null
+++ b/src/protocol/pubsub0/sub.h
@@ -0,0 +1,31 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_PUBSUB0_SUB_H
+#define NNG_PROTOCOL_PUBSUB0_SUB_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_sub0_open(nng_socket *);
+
+#ifndef nng_sub_open
+#define nng_sub_open nng_sub0_open
+#endif
+
+#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe"
+#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe"
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_PUBSUB0_SUB_H
diff --git a/src/protocol/reqrep0/CMakeLists.txt b/src/protocol/reqrep0/CMakeLists.txt
new file mode 100644
index 00000000..4e82ad41
--- /dev/null
+++ b/src/protocol/reqrep0/CMakeLists.txt
@@ -0,0 +1,23 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Req/Rep protocol
+
+if (NNG_PROTO_REQ0)
+ set(REQ0_SOURCES protocol/reqrep0/req.c protocol/reqrep0/req.h)
+ install(FILES req.h DESTINATION include/nng/protocol/reqrep0)
+endif()
+
+if (NNG_PROTO_REP0)
+ set(REP0_SOURCES protocol/reqrep0/rep.c protocol/reqrep0/rep.h)
+ install(FILES rep.h DESTINATION include/nng/protocol/reqrep0)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${REQ0_SOURCES} ${REP0_SOURCES} PARENT_SCOPE) \ No newline at end of file
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep0/rep.c
index 100e739d..ee8e4277 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -12,23 +12,32 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/reqrep0/rep.h"
// Response protocol. The REP protocol is the "reply" side of a
// request-reply pair. This is useful for building RPC servers, for
// example.
-typedef struct rep_pipe rep_pipe;
-typedef struct rep_sock rep_sock;
+#ifndef NNI_PROTO_REQ_V0
+#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0)
+#endif
-static void rep_sock_getq_cb(void *);
-static void rep_pipe_getq_cb(void *);
-static void rep_pipe_putq_cb(void *);
-static void rep_pipe_send_cb(void *);
-static void rep_pipe_recv_cb(void *);
-static void rep_pipe_fini(void *);
+#ifndef NNI_PROTO_REP_V0
+#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1)
+#endif
-// A rep_sock is our per-socket protocol private structure.
-struct rep_sock {
+typedef struct rep0_pipe rep0_pipe;
+typedef struct rep0_sock rep0_sock;
+
+static void rep0_sock_getq_cb(void *);
+static void rep0_pipe_getq_cb(void *);
+static void rep0_pipe_putq_cb(void *);
+static void rep0_pipe_send_cb(void *);
+static void rep0_pipe_recv_cb(void *);
+static void rep0_pipe_fini(void *);
+
+// rep0_sock is our per-socket protocol private structure.
+struct rep0_sock {
nni_msgq * uwq;
nni_msgq * urq;
nni_mtx lk;
@@ -40,21 +49,21 @@ struct rep_sock {
nni_aio * aio_getq;
};
-// A rep_pipe is our per-pipe protocol private structure.
-struct rep_pipe {
- nni_pipe *pipe;
- rep_sock *rep;
- nni_msgq *sendq;
- nni_aio * aio_getq;
- nni_aio * aio_send;
- nni_aio * aio_recv;
- nni_aio * aio_putq;
+// rep0_pipe is our per-pipe protocol private structure.
+struct rep0_pipe {
+ nni_pipe * pipe;
+ rep0_sock *rep;
+ nni_msgq * sendq;
+ nni_aio * aio_getq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+ nni_aio * aio_putq;
};
static void
-rep_sock_fini(void *arg)
+rep0_sock_fini(void *arg)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
@@ -67,18 +76,18 @@ rep_sock_fini(void *arg)
}
static int
-rep_sock_init(void **sp, nni_sock *sock)
+rep0_sock_init(void **sp, nni_sock *sock)
{
- rep_sock *s;
- int rv;
+ rep0_sock *s;
+ int rv;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&s->lk);
if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
- ((rv = nni_aio_init(&s->aio_getq, rep_sock_getq_cb, s)) != 0)) {
- rep_sock_fini(s);
+ ((rv = nni_aio_init(&s->aio_getq, rep0_sock_getq_cb, s)) != 0)) {
+ rep0_sock_fini(s);
return (rv);
}
@@ -95,25 +104,25 @@ rep_sock_init(void **sp, nni_sock *sock)
}
static void
-rep_sock_open(void *arg)
+rep0_sock_open(void *arg)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-rep_sock_close(void *arg)
+rep0_sock_close(void *arg)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-rep_pipe_fini(void *arg)
+rep0_pipe_fini(void *arg)
{
- rep_pipe *p = arg;
+ rep0_pipe *p = arg;
nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
@@ -124,20 +133,20 @@ rep_pipe_fini(void *arg)
}
static int
-rep_pipe_init(void **pp, nni_pipe *pipe, void *s)
+rep0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- rep_pipe *p;
- int rv;
+ rep0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, rep_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, rep_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, rep_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, rep_pipe_putq_cb, p)) != 0)) {
- rep_pipe_fini(p);
+ ((rv = nni_aio_init(&p->aio_getq, rep0_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, rep0_pipe_putq_cb, p)) != 0)) {
+ rep0_pipe_fini(p);
return (rv);
}
@@ -148,11 +157,11 @@ rep_pipe_init(void **pp, nni_pipe *pipe, void *s)
}
static int
-rep_pipe_start(void *arg)
+rep0_pipe_start(void *arg)
{
- rep_pipe *p = arg;
- rep_sock *s = p->rep;
- int rv;
+ rep0_pipe *p = arg;
+ rep0_sock *s = p->rep;
+ int rv;
if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) {
return (rv);
@@ -164,10 +173,10 @@ rep_pipe_start(void *arg)
}
static void
-rep_pipe_stop(void *arg)
+rep0_pipe_stop(void *arg)
{
- rep_pipe *p = arg;
- rep_sock *s = p->rep;
+ rep0_pipe *p = arg;
+ rep0_sock *s = p->rep;
nni_msgq_close(p->sendq);
nni_aio_stop(p->aio_getq);
@@ -179,14 +188,14 @@ rep_pipe_stop(void *arg)
}
static void
-rep_sock_getq_cb(void *arg)
+rep0_sock_getq_cb(void *arg)
{
- rep_sock *s = arg;
- nni_msgq *uwq = s->uwq;
- nni_msg * msg;
- uint32_t id;
- rep_pipe *p;
- int rv;
+ rep0_sock *s = arg;
+ nni_msgq * uwq = s->uwq;
+ nni_msg * msg;
+ uint32_t id;
+ rep0_pipe *p;
+ int rv;
// This watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
@@ -228,9 +237,9 @@ rep_sock_getq_cb(void *arg)
}
static void
-rep_pipe_getq_cb(void *arg)
+rep0_pipe_getq_cb(void *arg)
{
- rep_pipe *p = arg;
+ rep0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->pipe);
@@ -244,9 +253,9 @@ rep_pipe_getq_cb(void *arg)
}
static void
-rep_pipe_send_cb(void *arg)
+rep0_pipe_send_cb(void *arg)
{
- rep_pipe *p = arg;
+ rep0_pipe *p = arg;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -259,14 +268,14 @@ rep_pipe_send_cb(void *arg)
}
static void
-rep_pipe_recv_cb(void *arg)
+rep0_pipe_recv_cb(void *arg)
{
- rep_pipe *p = arg;
- rep_sock *s = p->rep;
- nni_msg * msg;
- int rv;
- uint8_t * body;
- int hops;
+ rep0_pipe *p = arg;
+ rep0_sock *s = p->rep;
+ nni_msg * msg;
+ int rv;
+ uint8_t * body;
+ int hops;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->pipe);
@@ -329,9 +338,9 @@ drop:
}
static void
-rep_pipe_putq_cb(void *arg)
+rep0_pipe_putq_cb(void *arg)
{
- rep_pipe *p = arg;
+ rep0_pipe *p = arg;
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
@@ -344,10 +353,10 @@ rep_pipe_putq_cb(void *arg)
}
static int
-rep_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+rep0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- rep_sock *s = arg;
- int rv;
+ rep0_sock *s = arg;
+ int rv;
nni_mtx_lock(&s->lk);
rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
@@ -356,32 +365,32 @@ rep_sock_setopt_raw(void *arg, const void *buf, size_t sz)
}
static int
-rep_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+rep0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static int
-rep_sock_setopt_maxttl(void *arg, const void *buf, size_t sz)
+rep0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
return (nni_setopt_int(&s->ttl, buf, sz, 1, 255));
}
static int
-rep_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
+rep0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
return (nni_getopt_int(s->ttl, buf, szp));
}
static nni_msg *
-rep_sock_filter(void *arg, nni_msg *msg)
+rep0_sock_filter(void *arg, nni_msg *msg)
{
- rep_sock *s = arg;
- char * header;
- size_t len;
+ rep0_sock *s = arg;
+ char * header;
+ size_t len;
nni_mtx_lock(&s->lk);
if (s->raw) {
@@ -408,11 +417,11 @@ rep_sock_filter(void *arg, nni_msg *msg)
}
static void
-rep_sock_send(void *arg, nni_aio *aio)
+rep0_sock_send(void *arg, nni_aio *aio)
{
- rep_sock *s = arg;
- int rv;
- nni_msg * msg;
+ rep0_sock *s = arg;
+ int rv;
+ nni_msg * msg;
nni_mtx_lock(&s->lk);
if (s->raw) {
@@ -448,59 +457,59 @@ rep_sock_send(void *arg, nni_aio *aio)
}
static void
-rep_sock_recv(void *arg, nni_aio *aio)
+rep0_sock_recv(void *arg, nni_aio *aio)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
nni_msgq_aio_get(s->urq, aio);
}
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe_ops rep_pipe_ops = {
- .pipe_init = rep_pipe_init,
- .pipe_fini = rep_pipe_fini,
- .pipe_start = rep_pipe_start,
- .pipe_stop = rep_pipe_stop,
+static nni_proto_pipe_ops rep0_pipe_ops = {
+ .pipe_init = rep0_pipe_init,
+ .pipe_fini = rep0_pipe_fini,
+ .pipe_start = rep0_pipe_start,
+ .pipe_stop = rep0_pipe_stop,
};
-static nni_proto_sock_option rep_sock_options[] = {
+static nni_proto_sock_option rep0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = rep_sock_getopt_raw,
- .pso_setopt = rep_sock_setopt_raw,
+ .pso_getopt = rep0_sock_getopt_raw,
+ .pso_setopt = rep0_sock_setopt_raw,
},
{
.pso_name = NNG_OPT_MAXTTL,
- .pso_getopt = rep_sock_getopt_maxttl,
- .pso_setopt = rep_sock_setopt_maxttl,
+ .pso_getopt = rep0_sock_getopt_maxttl,
+ .pso_setopt = rep0_sock_setopt_maxttl,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops rep_sock_ops = {
- .sock_init = rep_sock_init,
- .sock_fini = rep_sock_fini,
- .sock_open = rep_sock_open,
- .sock_close = rep_sock_close,
- .sock_options = rep_sock_options,
- .sock_filter = rep_sock_filter,
- .sock_send = rep_sock_send,
- .sock_recv = rep_sock_recv,
+static nni_proto_sock_ops rep0_sock_ops = {
+ .sock_init = rep0_sock_init,
+ .sock_fini = rep0_sock_fini,
+ .sock_open = rep0_sock_open,
+ .sock_close = rep0_sock_close,
+ .sock_options = rep0_sock_options,
+ .sock_filter = rep0_sock_filter,
+ .sock_send = rep0_sock_send,
+ .sock_recv = rep0_sock_recv,
};
-static nni_proto nni_rep_proto = {
+static nni_proto rep0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_REP_V0, "rep" },
.proto_peer = { NNI_PROTO_REQ_V0, "req" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &rep_sock_ops,
- .proto_pipe_ops = &rep_pipe_ops,
+ .proto_sock_ops = &rep0_sock_ops,
+ .proto_pipe_ops = &rep0_pipe_ops,
};
int
nng_rep0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_rep_proto));
+ return (nni_proto_open(sidp, &rep0_proto));
}
diff --git a/src/protocol/reqrep0/rep.h b/src/protocol/reqrep0/rep.h
new file mode 100644
index 00000000..93df9379
--- /dev/null
+++ b/src/protocol/reqrep0/rep.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_REQREP0_REP_H
+#define NNG_PROTOCOL_REQREP0_REP_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_rep0_open(nng_socket *);
+
+#ifndef nng_rep_open
+#define nng_rep_open nng_rep0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_REQREP0_REP_H
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep0/req.c
index bead1ec4..94c7f1a0 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -13,21 +13,28 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/reqrep0/req.h"
// Request protocol. The REQ protocol is the "request" side of a
// request-reply pair. This is useful for building RPC clients, for example.
-const char *nng_opt_req_resendtime = NNG_OPT_REQ_RESENDTIME;
+#ifndef NNI_PROTO_REQ_V0
+#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0)
+#endif
-typedef struct req_pipe req_pipe;
-typedef struct req_sock req_sock;
+#ifndef NNI_PROTO_REP_V0
+#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1)
+#endif
-static void req_resend(req_sock *);
-static void req_timeout(void *);
-static void req_pipe_fini(void *);
+typedef struct req0_pipe req0_pipe;
+typedef struct req0_sock req0_sock;
-// A req_sock is our per-socket protocol private structure.
-struct req_sock {
+static void req0_resend(req0_sock *);
+static void req0_timeout(void *);
+static void req0_pipe_fini(void *);
+
+// A req0_sock is our per-socket protocol private structure.
+struct req0_sock {
nni_msgq * uwq;
nni_msgq * urq;
nni_duration retry;
@@ -38,7 +45,7 @@ struct req_sock {
int ttl;
nni_msg * reqmsg;
- req_pipe *pendpipe;
+ req0_pipe *pendpipe;
nni_list readypipes;
nni_list busypipes;
@@ -51,10 +58,10 @@ struct req_sock {
nni_cv cv;
};
-// A req_pipe is our per-pipe protocol private structure.
-struct req_pipe {
+// A req0_pipe is our per-pipe protocol private structure.
+struct req0_pipe {
nni_pipe * pipe;
- req_sock * req;
+ req0_sock * req;
nni_list_node node;
nni_aio * aio_getq; // raw mode only
nni_aio * aio_sendraw; // raw mode only
@@ -64,17 +71,17 @@ struct req_pipe {
nni_mtx mtx;
};
-static void req_resender(void *);
-static void req_getq_cb(void *);
-static void req_sendraw_cb(void *);
-static void req_sendcooked_cb(void *);
-static void req_recv_cb(void *);
-static void req_putq_cb(void *);
+static void req0_resender(void *);
+static void req0_getq_cb(void *);
+static void req0_sendraw_cb(void *);
+static void req0_sendcooked_cb(void *);
+static void req0_recv_cb(void *);
+static void req0_putq_cb(void *);
static int
-req_sock_init(void **sp, nni_sock *sock)
+req0_sock_init(void **sp, nni_sock *sock)
{
- req_sock *s;
+ req0_sock *s;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
@@ -82,9 +89,9 @@ req_sock_init(void **sp, nni_sock *sock)
nni_mtx_init(&s->mtx);
nni_cv_init(&s->cv, &s->mtx);
- NNI_LIST_INIT(&s->readypipes, req_pipe, node);
- NNI_LIST_INIT(&s->busypipes, req_pipe, node);
- nni_timer_init(&s->timer, req_timeout, s);
+ NNI_LIST_INIT(&s->readypipes, req0_pipe, node);
+ NNI_LIST_INIT(&s->busypipes, req0_pipe, node);
+ nni_timer_init(&s->timer, req0_timeout, s);
// this is "semi random" start for request IDs.
s->nextid = nni_random();
@@ -102,15 +109,15 @@ req_sock_init(void **sp, nni_sock *sock)
}
static void
-req_sock_open(void *arg)
+req0_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-req_sock_close(void *arg)
+req0_sock_close(void *arg)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
nni_mtx_lock(&s->mtx);
s->closed = 1;
@@ -120,9 +127,9 @@ req_sock_close(void *arg)
}
static void
-req_sock_fini(void *arg)
+req0_sock_fini(void *arg)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
nni_mtx_lock(&s->mtx);
while ((!nni_list_empty(&s->readypipes)) ||
@@ -139,9 +146,9 @@ req_sock_fini(void *arg)
}
static void
-req_pipe_fini(void *arg)
+req0_pipe_fini(void *arg)
{
- req_pipe *p = arg;
+ req0_pipe *p = arg;
nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_putq);
@@ -153,22 +160,22 @@ req_pipe_fini(void *arg)
}
static int
-req_pipe_init(void **pp, nni_pipe *pipe, void *s)
+req0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- req_pipe *p;
- int rv;
+ req0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);
- if (((rv = nni_aio_init(&p->aio_getq, req_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, req_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, req_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_sendraw, req_sendraw_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_sendcooked, req_sendcooked_cb, p)) !=
+ if (((rv = nni_aio_init(&p->aio_getq, req0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, req0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_sendraw, req0_sendraw_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_sendcooked, req0_sendcooked_cb, p)) !=
0)) {
- req_pipe_fini(p);
+ req0_pipe_fini(p);
return (rv);
}
@@ -180,10 +187,10 @@ req_pipe_init(void **pp, nni_pipe *pipe, void *s)
}
static int
-req_pipe_start(void *arg)
+req0_pipe_start(void *arg)
{
- req_pipe *p = arg;
- req_sock *s = p->req;
+ req0_pipe *p = arg;
+ req0_sock *s = p->req;
if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) {
return (NNG_EPROTO);
@@ -198,7 +205,7 @@ req_pipe_start(void *arg)
// If sock was waiting for somewhere to send data, go ahead and
// send it to this pipe.
if (s->wantw) {
- req_resend(s);
+ req0_resend(s);
}
nni_mtx_unlock(&s->mtx);
@@ -208,10 +215,10 @@ req_pipe_start(void *arg)
}
static void
-req_pipe_stop(void *arg)
+req0_pipe_stop(void *arg)
{
- req_pipe *p = arg;
- req_sock *s = p->req;
+ req0_pipe *p = arg;
+ req0_sock *s = p->req;
nni_aio_stop(p->aio_getq);
nni_aio_stop(p->aio_putq);
@@ -238,50 +245,50 @@ req_pipe_stop(void *arg)
s->pendpipe = NULL;
s->resend = NNI_TIME_ZERO;
s->wantw = 1;
- req_resend(s);
+ req0_resend(s);
}
nni_mtx_unlock(&s->mtx);
}
static int
-req_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+req0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-req_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+req0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static int
-req_sock_setopt_maxttl(void *arg, const void *buf, size_t sz)
+req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
return (nni_setopt_int(&s->ttl, buf, sz, 1, 255));
}
static int
-req_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
+req0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
return (nni_getopt_int(s->ttl, buf, szp));
}
static int
-req_sock_setopt_resendtime(void *arg, const void *buf, size_t sz)
+req0_sock_setopt_resendtime(void *arg, const void *buf, size_t sz)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
return (nni_setopt_ms(&s->retry, buf, sz));
}
static int
-req_sock_getopt_resendtime(void *arg, void *buf, size_t *szp)
+req0_sock_getopt_resendtime(void *arg, void *buf, size_t *szp)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
return (nni_getopt_ms(s->retry, buf, szp));
}
@@ -302,10 +309,10 @@ req_sock_getopt_resendtime(void *arg, void *buf, size_t *szp)
// kind of priority.)
static void
-req_getq_cb(void *arg)
+req0_getq_cb(void *arg)
{
- req_pipe *p = arg;
- req_sock *s = p->req;
+ req0_pipe *p = arg;
+ req0_sock *s = p->req;
// We should be in RAW mode. Cooked mode traffic bypasses
// the upper write queue entirely, and should never end up here.
@@ -326,9 +333,9 @@ req_getq_cb(void *arg)
}
static void
-req_sendraw_cb(void *arg)
+req0_sendraw_cb(void *arg)
{
- req_pipe *p = arg;
+ req0_pipe *p = arg;
if (nni_aio_result(p->aio_sendraw) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_sendraw));
@@ -342,10 +349,10 @@ req_sendraw_cb(void *arg)
}
static void
-req_sendcooked_cb(void *arg)
+req0_sendcooked_cb(void *arg)
{
- req_pipe *p = arg;
- req_sock *s = p->req;
+ req0_pipe *p = arg;
+ req0_sock *s = p->req;
if (nni_aio_result(p->aio_sendcooked) != 0) {
// We failed to send... clean up and deal with it.
@@ -365,7 +372,7 @@ req_sendcooked_cb(void *arg)
if (nni_list_active(&s->busypipes, p)) {
nni_list_remove(&s->busypipes, p);
nni_list_append(&s->readypipes, p);
- req_resend(s);
+ req0_resend(s);
} else {
// We wind up here if stop was called from the reader
// side while we were waiting to be scheduled to run for the
@@ -377,9 +384,9 @@ req_sendcooked_cb(void *arg)
}
static void
-req_putq_cb(void *arg)
+req0_putq_cb(void *arg)
{
- req_pipe *p = arg;
+ req0_pipe *p = arg;
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
@@ -393,10 +400,10 @@ req_putq_cb(void *arg)
}
static void
-req_recv_cb(void *arg)
+req0_recv_cb(void *arg)
{
- req_pipe *p = arg;
- nni_msg * msg;
+ req0_pipe *p = arg;
+ nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->pipe);
@@ -431,23 +438,23 @@ malformed:
}
static void
-req_timeout(void *arg)
+req0_timeout(void *arg)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
nni_mtx_lock(&s->mtx);
if (s->reqmsg != NULL) {
s->wantw = 1;
- req_resend(s);
+ req0_resend(s);
}
nni_mtx_unlock(&s->mtx);
}
static void
-req_resend(req_sock *s)
+req0_resend(req0_sock *s)
{
- req_pipe *p;
- nni_msg * msg;
+ req0_pipe *p;
+ nni_msg * msg;
// Note: This routine should be called with the socket lock held.
// Also, this should only be called while handling cooked mode
@@ -499,13 +506,13 @@ req_resend(req_sock *s)
}
static void
-req_sock_send(void *arg, nni_aio *aio)
+req0_sock_send(void *arg, nni_aio *aio)
{
- req_sock *s = arg;
- uint32_t id;
- size_t len;
- nni_msg * msg;
- int rv;
+ req0_sock *s = arg;
+ uint32_t id;
+ size_t len;
+ nni_msg * msg;
+ int rv;
nni_mtx_lock(&s->mtx);
if (s->raw) {
@@ -547,7 +554,7 @@ req_sock_send(void *arg, nni_aio *aio)
s->resend = NNI_TIME_ZERO;
s->wantw = 1;
- req_resend(s);
+ req0_resend(s);
nni_mtx_unlock(&s->mtx);
@@ -555,10 +562,10 @@ req_sock_send(void *arg, nni_aio *aio)
}
static nni_msg *
-req_sock_filter(void *arg, nni_msg *msg)
+req0_sock_filter(void *arg, nni_msg *msg)
{
- req_sock *s = arg;
- nni_msg * rmsg;
+ req0_sock *s = arg;
+ nni_msg * rmsg;
nni_mtx_lock(&s->mtx);
if (s->raw) {
@@ -598,9 +605,9 @@ req_sock_filter(void *arg, nni_msg *msg)
}
static void
-req_sock_recv(void *arg, nni_aio *aio)
+req0_sock_recv(void *arg, nni_aio *aio)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
nni_mtx_lock(&s->mtx);
if (!s->raw) {
@@ -614,55 +621,55 @@ req_sock_recv(void *arg, nni_aio *aio)
nni_msgq_aio_get(s->urq, aio);
}
-static nni_proto_pipe_ops req_pipe_ops = {
- .pipe_init = req_pipe_init,
- .pipe_fini = req_pipe_fini,
- .pipe_start = req_pipe_start,
- .pipe_stop = req_pipe_stop,
+static nni_proto_pipe_ops req0_pipe_ops = {
+ .pipe_init = req0_pipe_init,
+ .pipe_fini = req0_pipe_fini,
+ .pipe_start = req0_pipe_start,
+ .pipe_stop = req0_pipe_stop,
};
-static nni_proto_sock_option req_sock_options[] = {
+static nni_proto_sock_option req0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = req_sock_getopt_raw,
- .pso_setopt = req_sock_setopt_raw,
+ .pso_getopt = req0_sock_getopt_raw,
+ .pso_setopt = req0_sock_setopt_raw,
},
{
.pso_name = NNG_OPT_MAXTTL,
- .pso_getopt = req_sock_getopt_maxttl,
- .pso_setopt = req_sock_setopt_maxttl,
+ .pso_getopt = req0_sock_getopt_maxttl,
+ .pso_setopt = req0_sock_setopt_maxttl,
},
{
.pso_name = NNG_OPT_REQ_RESENDTIME,
- .pso_getopt = req_sock_getopt_resendtime,
- .pso_setopt = req_sock_setopt_resendtime,
+ .pso_getopt = req0_sock_getopt_resendtime,
+ .pso_setopt = req0_sock_setopt_resendtime,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops req_sock_ops = {
- .sock_init = req_sock_init,
- .sock_fini = req_sock_fini,
- .sock_open = req_sock_open,
- .sock_close = req_sock_close,
- .sock_options = req_sock_options,
- .sock_filter = req_sock_filter,
- .sock_send = req_sock_send,
- .sock_recv = req_sock_recv,
+static nni_proto_sock_ops req0_sock_ops = {
+ .sock_init = req0_sock_init,
+ .sock_fini = req0_sock_fini,
+ .sock_open = req0_sock_open,
+ .sock_close = req0_sock_close,
+ .sock_options = req0_sock_options,
+ .sock_filter = req0_sock_filter,
+ .sock_send = req0_sock_send,
+ .sock_recv = req0_sock_recv,
};
-static nni_proto req_proto = {
+static nni_proto req0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_REQ_V0, "req" },
.proto_peer = { NNI_PROTO_REP_V0, "rep" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &req_sock_ops,
- .proto_pipe_ops = &req_pipe_ops,
+ .proto_sock_ops = &req0_sock_ops,
+ .proto_pipe_ops = &req0_pipe_ops,
};
int
nng_req0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &req_proto));
+ return (nni_proto_open(sidp, &req0_proto));
}
diff --git a/src/protocol/reqrep0/req.h b/src/protocol/reqrep0/req.h
new file mode 100644
index 00000000..99c9bf62
--- /dev/null
+++ b/src/protocol/reqrep0/req.h
@@ -0,0 +1,30 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_REQREP0_REQ_H
+#define NNG_PROTOCOL_REQREP0_REQ_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_req0_open(nng_socket *);
+
+#ifndef nng_req_open
+#define nng_req_open nng_req0_open
+#endif
+
+#define NNG_OPT_REQ_RESENDTIME "req:resend-time"
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_REQREP0_REQ_H
diff --git a/src/protocol/survey0/CMakeLists.txt b/src/protocol/survey0/CMakeLists.txt
new file mode 100644
index 00000000..61e5aa7b
--- /dev/null
+++ b/src/protocol/survey0/CMakeLists.txt
@@ -0,0 +1,23 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Req/Rep protocol
+
+if (NNG_PROTO_SURVEYOR0)
+ set(SURV0_SOURCES protocol/survey0/survey.c protocol/survey0/survey.h)
+ install(FILES survey.h DESTINATION include/nng/protocol/survey0)
+endif()
+
+if (NNG_PROTO_RESPONDENT0)
+ set(RESP0_SOURCES protocol/survey0/respond.c protocol/survey0/respond.h)
+ install(FILES respond.h DESTINATION include/nng/protocol/survey0)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${SURV0_SOURCES} ${RESP0_SOURCES} PARENT_SCOPE) \ No newline at end of file
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey0/respond.c
index 94730be6..73e919c3 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -12,23 +12,32 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/survey0/respond.h"
// Respondent protocol. The RESPONDENT protocol is the "replier" side of
// the surveyor pattern. This is useful for building service discovery, or
-// voting algorithsm, for example.
+// voting algorithms, for example.
-typedef struct resp_pipe resp_pipe;
-typedef struct resp_sock resp_sock;
+#ifndef NNI_PROTO_SURVEYOR_V0
+#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2)
+#endif
-static void resp_recv_cb(void *);
-static void resp_putq_cb(void *);
-static void resp_getq_cb(void *);
-static void resp_send_cb(void *);
-static void resp_sock_getq_cb(void *);
-static void resp_pipe_fini(void *);
+#ifndef NNI_PROTO_RESPONDENT_V0
+#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3)
+#endif
-// A resp_sock is our per-socket protocol private structure.
-struct resp_sock {
+typedef struct resp0_pipe resp0_pipe;
+typedef struct resp0_sock resp0_sock;
+
+static void resp0_recv_cb(void *);
+static void resp0_putq_cb(void *);
+static void resp0_getq_cb(void *);
+static void resp0_send_cb(void *);
+static void resp0_sock_getq_cb(void *);
+static void resp0_pipe_fini(void *);
+
+// resp0_sock is our per-socket protocol private structure.
+struct resp0_sock {
nni_msgq * urq;
nni_msgq * uwq;
int raw;
@@ -40,22 +49,22 @@ struct resp_sock {
nni_mtx mtx;
};
-// A resp_pipe is our per-pipe protocol private structure.
-struct resp_pipe {
- nni_pipe * npipe;
- resp_sock *psock;
- uint32_t id;
- nni_msgq * sendq;
- nni_aio * aio_getq;
- nni_aio * aio_putq;
- nni_aio * aio_send;
- nni_aio * aio_recv;
+// resp0_pipe is our per-pipe protocol private structure.
+struct resp0_pipe {
+ nni_pipe * npipe;
+ resp0_sock *psock;
+ uint32_t id;
+ nni_msgq * sendq;
+ nni_aio * aio_getq;
+ nni_aio * aio_putq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
};
static void
-resp_sock_fini(void *arg)
+resp0_sock_fini(void *arg)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
@@ -68,18 +77,18 @@ resp_sock_fini(void *arg)
}
static int
-resp_sock_init(void **sp, nni_sock *nsock)
+resp0_sock_init(void **sp, nni_sock *nsock)
{
- resp_sock *s;
- int rv;
+ resp0_sock *s;
+ int rv;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&s->mtx);
if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
- ((rv = nni_aio_init(&s->aio_getq, resp_sock_getq_cb, s)) != 0)) {
- resp_sock_fini(s);
+ ((rv = nni_aio_init(&s->aio_getq, resp0_sock_getq_cb, s)) != 0)) {
+ resp0_sock_fini(s);
return (rv);
}
@@ -95,25 +104,25 @@ resp_sock_init(void **sp, nni_sock *nsock)
}
static void
-resp_sock_open(void *arg)
+resp0_sock_open(void *arg)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-resp_sock_close(void *arg)
+resp0_sock_close(void *arg)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-resp_pipe_fini(void *arg)
+resp0_pipe_fini(void *arg)
{
- resp_pipe *p = arg;
+ resp0_pipe *p = arg;
nni_aio_fini(p->aio_putq);
nni_aio_fini(p->aio_getq);
@@ -124,20 +133,20 @@ resp_pipe_fini(void *arg)
}
static int
-resp_pipe_init(void **pp, nni_pipe *npipe, void *s)
+resp0_pipe_init(void **pp, nni_pipe *npipe, void *s)
{
- resp_pipe *p;
- int rv;
+ resp0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, resp_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, resp_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, resp_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, resp_send_cb, p)) != 0)) {
- resp_pipe_fini(p);
+ ((rv = nni_aio_init(&p->aio_putq, resp0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, resp0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, resp0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, resp0_send_cb, p)) != 0)) {
+ resp0_pipe_fini(p);
return (rv);
}
@@ -148,11 +157,11 @@ resp_pipe_init(void **pp, nni_pipe *npipe, void *s)
}
static int
-resp_pipe_start(void *arg)
+resp0_pipe_start(void *arg)
{
- resp_pipe *p = arg;
- resp_sock *s = p->psock;
- int rv;
+ resp0_pipe *p = arg;
+ resp0_sock *s = p->psock;
+ int rv;
p->id = nni_pipe_id(p->npipe);
@@ -170,10 +179,10 @@ resp_pipe_start(void *arg)
}
static void
-resp_pipe_stop(void *arg)
+resp0_pipe_stop(void *arg)
{
- resp_pipe *p = arg;
- resp_sock *s = p->psock;
+ resp0_pipe *p = arg;
+ resp0_sock *s = p->psock;
nni_msgq_close(p->sendq);
nni_aio_stop(p->aio_putq);
@@ -189,19 +198,19 @@ resp_pipe_stop(void *arg)
}
}
-// resp_sock_send watches for messages from the upper write queue,
+// resp0_sock_send watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
// destination pipe via a separate queue. This prevents a single bad
// or slow pipe from gumming up the works for the entire socket.s
void
-resp_sock_getq_cb(void *arg)
+resp0_sock_getq_cb(void *arg)
{
- resp_sock *s = arg;
- nni_msg * msg;
- uint32_t id;
- resp_pipe *p;
- int rv;
+ resp0_sock *s = arg;
+ nni_msg * msg;
+ uint32_t id;
+ resp0_pipe *p;
+ int rv;
if (nni_aio_result(s->aio_getq) != 0) {
return;
@@ -233,9 +242,9 @@ resp_sock_getq_cb(void *arg)
}
void
-resp_getq_cb(void *arg)
+resp0_getq_cb(void *arg)
{
- resp_pipe *p = arg;
+ resp0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->npipe);
@@ -249,9 +258,9 @@ resp_getq_cb(void *arg)
}
void
-resp_send_cb(void *arg)
+resp0_send_cb(void *arg)
{
- resp_pipe *p = arg;
+ resp0_pipe *p = arg;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -264,14 +273,14 @@ resp_send_cb(void *arg)
}
static void
-resp_recv_cb(void *arg)
+resp0_recv_cb(void *arg)
{
- resp_pipe *p = arg;
- resp_sock *s = p->psock;
- nni_msgq * urq = s->urq;
- nni_msg * msg;
- int hops;
- int rv;
+ resp0_pipe *p = arg;
+ resp0_sock *s = p->psock;
+ nni_msgq * urq = s->urq;
+ nni_msg * msg;
+ int hops;
+ int rv;
if (nni_aio_result(p->aio_recv) != 0) {
goto error;
@@ -324,9 +333,9 @@ error:
}
static void
-resp_putq_cb(void *arg)
+resp0_putq_cb(void *arg)
{
- resp_pipe *p = arg;
+ resp0_pipe *p = arg;
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
@@ -338,10 +347,10 @@ resp_putq_cb(void *arg)
}
static int
-resp_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+resp0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- resp_sock *s = arg;
- int rv;
+ resp0_sock *s = arg;
+ int rv;
nni_mtx_lock(&s->mtx);
rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
@@ -350,32 +359,32 @@ resp_sock_setopt_raw(void *arg, const void *buf, size_t sz)
}
static int
-resp_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+resp0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static int
-resp_sock_setopt_maxttl(void *arg, const void *buf, size_t sz)
+resp0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
return (nni_setopt_int(&s->ttl, buf, sz, 1, 255));
}
static int
-resp_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
+resp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
return (nni_getopt_int(s->ttl, buf, szp));
}
static void
-resp_sock_send(void *arg, nni_aio *aio)
+resp0_sock_send(void *arg, nni_aio *aio)
{
- resp_sock *s = arg;
- nni_msg * msg;
- int rv;
+ resp0_sock *s = arg;
+ nni_msg * msg;
+ int rv;
nni_mtx_lock(&s->mtx);
if (s->raw) {
@@ -412,11 +421,11 @@ resp_sock_send(void *arg, nni_aio *aio)
}
static nni_msg *
-resp_sock_filter(void *arg, nni_msg *msg)
+resp0_sock_filter(void *arg, nni_msg *msg)
{
- resp_sock *s = arg;
- char * header;
- size_t len;
+ resp0_sock *s = arg;
+ char * header;
+ size_t len;
nni_mtx_lock(&s->mtx);
if (s->raw) {
@@ -444,57 +453,57 @@ resp_sock_filter(void *arg, nni_msg *msg)
}
static void
-resp_sock_recv(void *arg, nni_aio *aio)
+resp0_sock_recv(void *arg, nni_aio *aio)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
nni_msgq_aio_get(s->urq, aio);
}
-static nni_proto_pipe_ops resp_pipe_ops = {
- .pipe_init = resp_pipe_init,
- .pipe_fini = resp_pipe_fini,
- .pipe_start = resp_pipe_start,
- .pipe_stop = resp_pipe_stop,
+static nni_proto_pipe_ops resp0_pipe_ops = {
+ .pipe_init = resp0_pipe_init,
+ .pipe_fini = resp0_pipe_fini,
+ .pipe_start = resp0_pipe_start,
+ .pipe_stop = resp0_pipe_stop,
};
-static nni_proto_sock_option resp_sock_options[] = {
+static nni_proto_sock_option resp0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = resp_sock_getopt_raw,
- .pso_setopt = resp_sock_setopt_raw,
+ .pso_getopt = resp0_sock_getopt_raw,
+ .pso_setopt = resp0_sock_setopt_raw,
},
{
.pso_name = NNG_OPT_MAXTTL,
- .pso_getopt = resp_sock_getopt_maxttl,
- .pso_setopt = resp_sock_setopt_maxttl,
+ .pso_getopt = resp0_sock_getopt_maxttl,
+ .pso_setopt = resp0_sock_setopt_maxttl,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops resp_sock_ops = {
- .sock_init = resp_sock_init,
- .sock_fini = resp_sock_fini,
- .sock_open = resp_sock_open,
- .sock_close = resp_sock_close,
- .sock_filter = resp_sock_filter,
- .sock_send = resp_sock_send,
- .sock_recv = resp_sock_recv,
- .sock_options = resp_sock_options,
+static nni_proto_sock_ops resp0_sock_ops = {
+ .sock_init = resp0_sock_init,
+ .sock_fini = resp0_sock_fini,
+ .sock_open = resp0_sock_open,
+ .sock_close = resp0_sock_close,
+ .sock_filter = resp0_sock_filter,
+ .sock_send = resp0_sock_send,
+ .sock_recv = resp0_sock_recv,
+ .sock_options = resp0_sock_options,
};
-static nni_proto resp_proto = {
+static nni_proto resp0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" },
.proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &resp_sock_ops,
- .proto_pipe_ops = &resp_pipe_ops,
+ .proto_sock_ops = &resp0_sock_ops,
+ .proto_pipe_ops = &resp0_pipe_ops,
};
int
nng_respondent0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &resp_proto));
+ return (nni_proto_open(sidp, &resp0_proto));
}
diff --git a/src/protocol/survey0/respond.h b/src/protocol/survey0/respond.h
new file mode 100644
index 00000000..58c65298
--- /dev/null
+++ b/src/protocol/survey0/respond.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_SURVEY0_RESPOND_H
+#define NNG_PROTOCOL_SURVEY0_RESPOND_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_respondent0_open(nng_socket *);
+
+#ifndef nng_respondent_open
+#define nng_respondent_open nng_respondent0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_SURVEY0_RESPOND_H
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey0/survey.c
index 9dcd6664..02283436 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -12,22 +12,31 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/survey0/survey.h"
// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the
// survey pattern. This is useful for building service discovery, voting, etc.
-typedef struct surv_pipe surv_pipe;
-typedef struct surv_sock surv_sock;
+#ifndef NNI_PROTO_SURVEYOR_V0
+#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2)
+#endif
-static void surv_sock_getq_cb(void *);
-static void surv_getq_cb(void *);
-static void surv_putq_cb(void *);
-static void surv_send_cb(void *);
-static void surv_recv_cb(void *);
-static void surv_timeout(void *);
+#ifndef NNI_PROTO_RESPONDENT_V0
+#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3)
+#endif
-// A surv_sock is our per-socket protocol private structure.
-struct surv_sock {
+typedef struct surv0_pipe surv0_pipe;
+typedef struct surv0_sock surv0_sock;
+
+static void surv0_sock_getq_cb(void *);
+static void surv0_getq_cb(void *);
+static void surv0_putq_cb(void *);
+static void surv0_send_cb(void *);
+static void surv0_recv_cb(void *);
+static void surv0_timeout(void *);
+
+// surv0_sock is our per-socket protocol private structure.
+struct surv0_sock {
nni_duration survtime;
nni_time expire;
int raw;
@@ -42,10 +51,10 @@ struct surv_sock {
nni_mtx mtx;
};
-// A surv_pipe is our per-pipe protocol private structure.
-struct surv_pipe {
+// surv0_pipe is our per-pipe protocol private structure.
+struct surv0_pipe {
nni_pipe * npipe;
- surv_sock * psock;
+ surv0_sock * psock;
nni_msgq * sendq;
nni_list_node node;
nni_aio * aio_getq;
@@ -55,9 +64,9 @@ struct surv_pipe {
};
static void
-surv_sock_fini(void *arg)
+surv0_sock_fini(void *arg)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
@@ -66,21 +75,21 @@ surv_sock_fini(void *arg)
}
static int
-surv_sock_init(void **sp, nni_sock *nsock)
+surv0_sock_init(void **sp, nni_sock *nsock)
{
- surv_sock *s;
- int rv;
+ surv0_sock *s;
+ int rv;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_aio_init(&s->aio_getq, surv_sock_getq_cb, s)) != 0) {
- surv_sock_fini(s);
+ if ((rv = nni_aio_init(&s->aio_getq, surv0_sock_getq_cb, s)) != 0) {
+ surv0_sock_fini(s);
return (rv);
}
- NNI_LIST_INIT(&s->pipes, surv_pipe, node);
+ NNI_LIST_INIT(&s->pipes, surv0_pipe, node);
nni_mtx_init(&s->mtx);
- nni_timer_init(&s->timer, surv_timeout, s);
+ nni_timer_init(&s->timer, surv0_timeout, s);
s->nextid = nni_random();
s->raw = 0;
@@ -94,26 +103,26 @@ surv_sock_init(void **sp, nni_sock *nsock)
}
static void
-surv_sock_open(void *arg)
+surv0_sock_open(void *arg)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-surv_sock_close(void *arg)
+surv0_sock_close(void *arg)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
nni_timer_cancel(&s->timer);
nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-surv_pipe_fini(void *arg)
+surv0_pipe_fini(void *arg)
{
- surv_pipe *p = arg;
+ surv0_pipe *p = arg;
nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
@@ -124,21 +133,21 @@ surv_pipe_fini(void *arg)
}
static int
-surv_pipe_init(void **pp, nni_pipe *npipe, void *s)
+surv0_pipe_init(void **pp, nni_pipe *npipe, void *s)
{
- surv_pipe *p;
- int rv;
+ surv0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
// This depth could be tunable.
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, surv_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, surv_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, surv_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, surv_recv_cb, p)) != 0)) {
- surv_pipe_fini(p);
+ ((rv = nni_aio_init(&p->aio_getq, surv0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, surv0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, surv0_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, surv0_recv_cb, p)) != 0)) {
+ surv0_pipe_fini(p);
return (rv);
}
@@ -149,10 +158,10 @@ surv_pipe_init(void **pp, nni_pipe *npipe, void *s)
}
static int
-surv_pipe_start(void *arg)
+surv0_pipe_start(void *arg)
{
- surv_pipe *p = arg;
- surv_sock *s = p->psock;
+ surv0_pipe *p = arg;
+ surv0_sock *s = p->psock;
nni_mtx_lock(&s->mtx);
nni_list_append(&s->pipes, p);
@@ -164,10 +173,10 @@ surv_pipe_start(void *arg)
}
static void
-surv_pipe_stop(void *arg)
+surv0_pipe_stop(void *arg)
{
- surv_pipe *p = arg;
- surv_sock *s = p->psock;
+ surv0_pipe *p = arg;
+ surv0_sock *s = p->psock;
nni_aio_stop(p->aio_getq);
nni_aio_stop(p->aio_send);
@@ -184,9 +193,9 @@ surv_pipe_stop(void *arg)
}
static void
-surv_getq_cb(void *arg)
+surv0_getq_cb(void *arg)
{
- surv_pipe *p = arg;
+ surv0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->npipe);
@@ -200,9 +209,9 @@ surv_getq_cb(void *arg)
}
static void
-surv_send_cb(void *arg)
+surv0_send_cb(void *arg)
{
- surv_pipe *p = arg;
+ surv0_pipe *p = arg;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -215,9 +224,9 @@ surv_send_cb(void *arg)
}
static void
-surv_putq_cb(void *arg)
+surv0_putq_cb(void *arg)
{
- surv_pipe *p = arg;
+ surv0_pipe *p = arg;
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
@@ -230,10 +239,10 @@ surv_putq_cb(void *arg)
}
static void
-surv_recv_cb(void *arg)
+surv0_recv_cb(void *arg)
{
- surv_pipe *p = arg;
- nni_msg * msg;
+ surv0_pipe *p = arg;
+ nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
goto failed;
@@ -265,10 +274,10 @@ failed:
}
static int
-surv_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+surv0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- surv_sock *s = arg;
- int rv;
+ surv0_sock *s = arg;
+ int rv;
nni_mtx_lock(&s->mtx);
if ((rv = nni_setopt_int(&s->raw, buf, sz, 0, 1)) == 0) {
@@ -280,33 +289,33 @@ surv_sock_setopt_raw(void *arg, const void *buf, size_t sz)
}
static int
-surv_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+surv0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static int
-surv_sock_setopt_surveytime(void *arg, const void *buf, size_t sz)
+surv0_sock_setopt_surveytime(void *arg, const void *buf, size_t sz)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
return (nni_setopt_ms(&s->survtime, buf, sz));
}
static int
-surv_sock_getopt_surveytime(void *arg, void *buf, size_t *szp)
+surv0_sock_getopt_surveytime(void *arg, void *buf, size_t *szp)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
return (nni_getopt_ms(s->survtime, buf, szp));
}
static void
-surv_sock_getq_cb(void *arg)
+surv0_sock_getq_cb(void *arg)
{
- surv_sock *s = arg;
- surv_pipe *p;
- surv_pipe *last;
- nni_msg * msg, *dup;
+ surv0_sock *s = arg;
+ surv0_pipe *p;
+ surv0_pipe *last;
+ nni_msg * msg, *dup;
if (nni_aio_result(s->aio_getq) != 0) {
// Should be NNG_ECLOSED.
@@ -338,9 +347,9 @@ surv_sock_getq_cb(void *arg)
}
static void
-surv_timeout(void *arg)
+surv0_timeout(void *arg)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
nni_mtx_lock(&s->mtx);
s->survid = 0;
@@ -349,9 +358,9 @@ surv_timeout(void *arg)
}
static void
-surv_sock_recv(void *arg, nni_aio *aio)
+surv0_sock_recv(void *arg, nni_aio *aio)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
nni_mtx_lock(&s->mtx);
if (s->survid == 0) {
@@ -364,11 +373,11 @@ surv_sock_recv(void *arg, nni_aio *aio)
}
static void
-surv_sock_send(void *arg, nni_aio *aio)
+surv0_sock_send(void *arg, nni_aio *aio)
{
- surv_sock *s = arg;
- nni_msg * msg;
- int rv;
+ surv0_sock *s = arg;
+ nni_msg * msg;
+ int rv;
nni_mtx_lock(&s->mtx);
if (s->raw) {
@@ -404,9 +413,9 @@ surv_sock_send(void *arg, nni_aio *aio)
}
static nni_msg *
-surv_sock_filter(void *arg, nni_msg *msg)
+surv0_sock_filter(void *arg, nni_msg *msg)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
nni_mtx_lock(&s->mtx);
if (s->raw) {
@@ -426,50 +435,50 @@ surv_sock_filter(void *arg, nni_msg *msg)
return (msg);
}
-static nni_proto_pipe_ops surv_pipe_ops = {
- .pipe_init = surv_pipe_init,
- .pipe_fini = surv_pipe_fini,
- .pipe_start = surv_pipe_start,
- .pipe_stop = surv_pipe_stop,
+static nni_proto_pipe_ops surv0_pipe_ops = {
+ .pipe_init = surv0_pipe_init,
+ .pipe_fini = surv0_pipe_fini,
+ .pipe_start = surv0_pipe_start,
+ .pipe_stop = surv0_pipe_stop,
};
-static nni_proto_sock_option surv_sock_options[] = {
+static nni_proto_sock_option surv0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = surv_sock_getopt_raw,
- .pso_setopt = surv_sock_setopt_raw,
+ .pso_getopt = surv0_sock_getopt_raw,
+ .pso_setopt = surv0_sock_setopt_raw,
},
{
.pso_name = NNG_OPT_SURVEYOR_SURVEYTIME,
- .pso_getopt = surv_sock_getopt_surveytime,
- .pso_setopt = surv_sock_setopt_surveytime,
+ .pso_getopt = surv0_sock_getopt_surveytime,
+ .pso_setopt = surv0_sock_setopt_surveytime,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops surv_sock_ops = {
- .sock_init = surv_sock_init,
- .sock_fini = surv_sock_fini,
- .sock_open = surv_sock_open,
- .sock_close = surv_sock_close,
- .sock_send = surv_sock_send,
- .sock_recv = surv_sock_recv,
- .sock_filter = surv_sock_filter,
- .sock_options = surv_sock_options,
+static nni_proto_sock_ops surv0_sock_ops = {
+ .sock_init = surv0_sock_init,
+ .sock_fini = surv0_sock_fini,
+ .sock_open = surv0_sock_open,
+ .sock_close = surv0_sock_close,
+ .sock_send = surv0_sock_send,
+ .sock_recv = surv0_sock_recv,
+ .sock_filter = surv0_sock_filter,
+ .sock_options = surv0_sock_options,
};
-static nni_proto surv_proto = {
+static nni_proto surv0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
.proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &surv_sock_ops,
- .proto_pipe_ops = &surv_pipe_ops,
+ .proto_sock_ops = &surv0_sock_ops,
+ .proto_pipe_ops = &surv0_pipe_ops,
};
int
nng_surveyor0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &surv_proto));
+ return (nni_proto_open(sidp, &surv0_proto));
}
diff --git a/src/protocol/survey0/survey.h b/src/protocol/survey0/survey.h
new file mode 100644
index 00000000..a7b6d943
--- /dev/null
+++ b/src/protocol/survey0/survey.h
@@ -0,0 +1,30 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_SURVEY0_SURVEY_H
+#define NNG_PROTOCOL_SURVEY0_SURVEY_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_surveyor0_open(nng_socket *);
+
+#ifndef nng_surveyor_open
+#define nng_surveyor_open nng_surveyor0_open
+#endif
+
+#define NNG_OPT_SURVEYOR_SURVEYTIME "surveyor:survey-time"
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_SURVEY0_SURVEY_H
diff --git a/src/transport/inproc/CMakeLists.txt b/src/transport/inproc/CMakeLists.txt
new file mode 100644
index 00000000..a445da85
--- /dev/null
+++ b/src/transport/inproc/CMakeLists.txt
@@ -0,0 +1,18 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# inproc protocol
+
+if (NNG_TRANSPORT_INPROC)
+ set(INPROC_SOURCES transport/inproc/inproc.c transport/inproc/inproc.h)
+ install(FILES inproc.h DESTINATION include/nng/transport/inproc)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${INPROC_SOURCES} PARENT_SCOPE)
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index c747f7dc..ae64263c 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -475,9 +475,8 @@ struct nni_tran nni_inproc_tran = {
.tran_fini = nni_inproc_fini,
};
-
int
nng_inproc_register(void)
{
- return (nni_tran_register(&nni_inproc_tran));
+ return (nni_tran_register(&nni_inproc_tran));
}
diff --git a/src/transport/ipc/CMakeLists.txt b/src/transport/ipc/CMakeLists.txt
new file mode 100644
index 00000000..1a5496cf
--- /dev/null
+++ b/src/transport/ipc/CMakeLists.txt
@@ -0,0 +1,18 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# ipc protocol
+
+if (NNG_TRANSPORT_IPC)
+ set(IPC_SOURCES transport/ipc/ipc.c transport/ipc/ipc.h)
+ install(FILES ipc.h DESTINATION include/nng/transport/ipc)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${IPC_SOURCES} PARENT_SCOPE)
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index afe8afa8..c13dbd34 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -704,9 +704,7 @@ static nni_tran_ep nni_ipc_ep_ops = {
.ep_options = nni_ipc_ep_options,
};
-// This is the IPC transport linkage, and should be the only global
-// symbol in this entire file.
-struct nni_tran nni_ipc_tran = {
+static nni_tran nni_ipc_tran = {
.tran_version = NNI_TRANSPORT_VERSION,
.tran_scheme = "ipc",
.tran_ep = &nni_ipc_ep_ops,
@@ -714,3 +712,9 @@ struct nni_tran nni_ipc_tran = {
.tran_init = nni_ipc_tran_init,
.tran_fini = nni_ipc_tran_fini,
};
+
+int
+nng_ipc_register(void)
+{
+ return (nni_tran_register(&nni_ipc_tran));
+}
diff --git a/src/transport/ipc/ipc.h b/src/transport/ipc/ipc.h
new file mode 100644
index 00000000..f19762c7
--- /dev/null
+++ b/src/transport/ipc/ipc.h
@@ -0,0 +1,19 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_TRANSPORT_IPC_IPC_H
+#define NNG_TRANSPORT_IPC_IPC_H
+
+// ipc transport. This is used for inter-process communication on
+// the same host computer.
+
+extern int nng_ipc_register(void);
+
+#endif // NNG_TRANSPORT_IPC_IPC_H
diff --git a/src/transport/tcp/CMakeLists.txt b/src/transport/tcp/CMakeLists.txt
new file mode 100644
index 00000000..305c357a
--- /dev/null
+++ b/src/transport/tcp/CMakeLists.txt
@@ -0,0 +1,18 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# TCP protocol
+
+if (NNG_TRANSPORT_TCP)
+ set(TCP_SOURCES transport/tcp/tcp.c transport/tcp/tcp.h)
+ install(FILES tcp.h DESTINATION include/nng/transport/tcp)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${TCP_SOURCES} PARENT_SCOPE)
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 43b2890d..e33db865 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -854,9 +854,7 @@ static nni_tran_ep nni_tcp_ep_ops = {
.ep_options = nni_tcp_ep_options,
};
-// This is the TCP transport linkage, and should be the only global
-// symbol in this entire file.
-struct nni_tran nni_tcp_tran = {
+static nni_tran nni_tcp_tran = {
.tran_version = NNI_TRANSPORT_VERSION,
.tran_scheme = "tcp",
.tran_ep = &nni_tcp_ep_ops,
@@ -864,3 +862,9 @@ struct nni_tran nni_tcp_tran = {
.tran_init = nni_tcp_tran_init,
.tran_fini = nni_tcp_tran_fini,
};
+
+int
+nng_tcp_register(void)
+{
+ return (nni_tran_register(&nni_tcp_tran));
+}
diff --git a/src/transport/tcp/tcp.h b/src/transport/tcp/tcp.h
new file mode 100644
index 00000000..b4c79461
--- /dev/null
+++ b/src/transport/tcp/tcp.h
@@ -0,0 +1,18 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_TRANSPORT_TCP_TCP_H
+#define NNG_TRANSPORT_TCP_TCP_H
+
+// TCP transport. This is used for communication over TCP/IP.
+
+extern int nng_tcp_register(void);
+
+#endif // NNG_TRANSPORT_TCP_TCP_H
diff --git a/src/transport/zerotier/CMakeLists.txt b/src/transport/zerotier/CMakeLists.txt
new file mode 100644
index 00000000..c1bb0c35
--- /dev/null
+++ b/src/transport/zerotier/CMakeLists.txt
@@ -0,0 +1,50 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# ZeroTier protocol
+
+set (NNG_TRANSPORT_ZEROTIER_SOURCE "" CACHE PATH "Location of ZeroTier source tree.")
+
+if (NNG_TRANSPORT_ZEROTIER)
+
+ # We use the libzerotiercore.a library, which is unfortunately a C++ object
+ # even though it exposes only public C symbols. It would be extremely
+ # helpful if libzerotiercore didn't make us carry the whole C++ runtime
+ # behind us. The user must specify the location of the ZeroTier source
+ # tree (dev branch for now, and already compiled please) by setting the
+ # NNG_ZEROTIER_SOURCE macro.
+ # NB: This needs to be the zerotierone tree, not the libzt library.
+ # This is because we don't access the API, but instead use the low
+ # level zerotiercore functionality directly.
+ # NB: As we wind up linking libzerotiercore.a into the application,
+ # this means that your application will *also* need to either be licensed
+ # under the GPLv3, or you will need to have a commercial license from
+ # ZeroTier permitting its use elsewhere.
+
+ enable_language(CXX)
+ find_library(NNG_LIBZTCORE zerotiercore PATHS ${NNG_TRANSPORT_ZEROTIER_SOURCE})
+ if (NNG_LIBZTCORE)
+ set(CMAKE_REQUIRED_INCLUDES ${NNG_TRANSPORT_ZEROTIER_SOURCE}/include)
+ message(STATUS "C++ ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES}")
+ set(CMAKE_REQUIRED_LIBRARIES ${CMAKE_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES})
+ set(NNG_REQUIRED_LIBRARIES ${NNG_REQUIRED_LIBRARIES} ${NNG_LIBZTCORE} ${CMAKE_CXX_IMPLICIT_LINK_LIBRARIES} PARENT_SCOPE)
+ set(NNG_REQUIRED_INCLUDES ${NNG_REQUIRED_INCLUDES} ${NNG_TRANSPORT_ZEROTIER_SOURCE}/include PARENT_SCOPE)
+ nng_check_sym(ZT_Node_join ZeroTierOne.h HAVE_ZTCORE)
+ endif()
+ if (NOT HAVE_ZTCORE)
+ message (FATAL_ERROR "Cannot find ZeroTier components")
+ endif()
+ message(STATUS "Found ZeroTier at ${NNG_LIBZTCORE}")
+
+ set(ZT_SOURCES transport/zerotier/zerotier.c transport/zerotier/zerotier.h)
+ install(FILES zerotier.h DESTINATION include/nng/transport/zerotier)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${ZT_SOURCES} PARENT_SCOPE)