aboutsummaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/bus0/CMakeLists.txt18
-rw-r--r--src/protocol/bus0/bus.c (renamed from src/protocol/bus/bus.c)205
-rw-r--r--src/protocol/bus0/bus.h28
-rw-r--r--src/protocol/pair0/CMakeLists.txt18
-rw-r--r--src/protocol/pair0/pair.c (renamed from src/protocol/pair/pair_v0.c)4
-rw-r--r--src/protocol/pair0/pair.h28
-rw-r--r--src/protocol/pair1/CMakeLists.txt18
-rw-r--r--src/protocol/pair1/pair.c (renamed from src/protocol/pair/pair_v1.c)6
-rw-r--r--src/protocol/pair1/pair.h30
-rw-r--r--src/protocol/pipeline0/CMakeLists.txt23
-rw-r--r--src/protocol/pipeline0/pull.c (renamed from src/protocol/pipeline/pull.c)146
-rw-r--r--src/protocol/pipeline0/pull.h28
-rw-r--r--src/protocol/pipeline0/push.c (renamed from src/protocol/pipeline/push.c)138
-rw-r--r--src/protocol/pipeline0/push.h28
-rw-r--r--src/protocol/pubsub0/CMakeLists.txt23
-rw-r--r--src/protocol/pubsub0/pub.c (renamed from src/protocol/pubsub/pub.c)169
-rw-r--r--src/protocol/pubsub0/pub.h28
-rw-r--r--src/protocol/pubsub0/sub.c (renamed from src/protocol/pubsub/sub.c)184
-rw-r--r--src/protocol/pubsub0/sub.h31
-rw-r--r--src/protocol/reqrep0/CMakeLists.txt23
-rw-r--r--src/protocol/reqrep0/rep.c (renamed from src/protocol/reqrep/rep.c)227
-rw-r--r--src/protocol/reqrep0/rep.h28
-rw-r--r--src/protocol/reqrep0/req.c (renamed from src/protocol/reqrep/req.c)239
-rw-r--r--src/protocol/reqrep0/req.h30
-rw-r--r--src/protocol/survey0/CMakeLists.txt23
-rw-r--r--src/protocol/survey0/respond.c (renamed from src/protocol/survey/respond.c)231
-rw-r--r--src/protocol/survey0/respond.h28
-rw-r--r--src/protocol/survey0/survey.c (renamed from src/protocol/survey/survey.c)205
-rw-r--r--src/protocol/survey0/survey.h30
29 files changed, 1380 insertions, 837 deletions
diff --git a/src/protocol/bus0/CMakeLists.txt b/src/protocol/bus0/CMakeLists.txt
new file mode 100644
index 00000000..5071054a
--- /dev/null
+++ b/src/protocol/bus0/CMakeLists.txt
@@ -0,0 +1,18 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Bus protocol
+
+if (NNG_PROTO_BUS0)
+ set(BUS0_SOURCES protocol/bus0/bus.c protocol/bus0/bus.h)
+ install(FILES bus.h DESTINATION include/nng/protocol/bus0)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${BUS0_SOURCES} PARENT_SCOPE)
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus0/bus.c
index 6ec0066b..3e15000b 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus0/bus.c
@@ -12,31 +12,36 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/bus0/bus.h"
// Bus protocol. The BUS protocol, each peer sends a message to its peers.
// However, bus protocols do not "forward" (absent a device). So in order
// for each participant to receive the message, each sender must be connected
// to every other node in the network (full mesh).
-typedef struct bus_pipe bus_pipe;
-typedef struct bus_sock bus_sock;
+#ifndef NNI_PROTO_BUS_V0
+#define NNI_PROTO_BUS_V0 NNI_PROTO(7, 0)
+#endif
-static void bus_sock_getq(bus_sock *);
-static void bus_sock_send(void *, nni_aio *);
-static void bus_sock_recv(void *, nni_aio *);
+typedef struct bus0_pipe bus0_pipe;
+typedef struct bus0_sock bus0_sock;
-static void bus_pipe_getq(bus_pipe *);
-static void bus_pipe_send(bus_pipe *);
-static void bus_pipe_recv(bus_pipe *);
+static void bus0_sock_getq(bus0_sock *);
+static void bus0_sock_send(void *, nni_aio *);
+static void bus0_sock_recv(void *, nni_aio *);
-static void bus_sock_getq_cb(void *);
-static void bus_pipe_getq_cb(void *);
-static void bus_pipe_send_cb(void *);
-static void bus_pipe_recv_cb(void *);
-static void bus_pipe_putq_cb(void *);
+static void bus0_pipe_getq(bus0_pipe *);
+static void bus0_pipe_send(bus0_pipe *);
+static void bus0_pipe_recv(bus0_pipe *);
-// A bus_sock is our per-socket protocol private structure.
-struct bus_sock {
+static void bus0_sock_getq_cb(void *);
+static void bus0_pipe_getq_cb(void *);
+static void bus0_pipe_send_cb(void *);
+static void bus0_pipe_recv_cb(void *);
+static void bus0_pipe_putq_cb(void *);
+
+// bus0_sock is our per-socket protocol private structure.
+struct bus0_sock {
int raw;
nni_aio * aio_getq;
nni_list pipes;
@@ -45,10 +50,10 @@ struct bus_sock {
nni_msgq *urq;
};
-// A bus_pipe is our per-pipe protocol private structure.
-struct bus_pipe {
+// bus0_pipe is our per-pipe protocol private structure.
+struct bus0_pipe {
nni_pipe * npipe;
- bus_sock * psock;
+ bus0_sock * psock;
nni_msgq * sendq;
nni_list_node node;
nni_aio * aio_getq;
@@ -59,9 +64,9 @@ struct bus_pipe {
};
static void
-bus_sock_fini(void *arg)
+bus0_sock_fini(void *arg)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
@@ -70,18 +75,18 @@ bus_sock_fini(void *arg)
}
static int
-bus_sock_init(void **sp, nni_sock *nsock)
+bus0_sock_init(void **sp, nni_sock *nsock)
{
- bus_sock *s;
- int rv;
+ bus0_sock *s;
+ int rv;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- NNI_LIST_INIT(&s->pipes, bus_pipe, node);
+ NNI_LIST_INIT(&s->pipes, bus0_pipe, node);
nni_mtx_init(&s->mtx);
- if ((rv = nni_aio_init(&s->aio_getq, bus_sock_getq_cb, s)) != 0) {
- bus_sock_fini(s);
+ if ((rv = nni_aio_init(&s->aio_getq, bus0_sock_getq_cb, s)) != 0) {
+ bus0_sock_fini(s);
return (rv);
}
s->raw = 0;
@@ -93,25 +98,25 @@ bus_sock_init(void **sp, nni_sock *nsock)
}
static void
-bus_sock_open(void *arg)
+bus0_sock_open(void *arg)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
- bus_sock_getq(s);
+ bus0_sock_getq(s);
}
static void
-bus_sock_close(void *arg)
+bus0_sock_close(void *arg)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-bus_pipe_fini(void *arg)
+bus0_pipe_fini(void *arg)
{
- bus_pipe *p = arg;
+ bus0_pipe *p = arg;
nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
@@ -123,10 +128,10 @@ bus_pipe_fini(void *arg)
}
static int
-bus_pipe_init(void **pp, nni_pipe *npipe, void *s)
+bus0_pipe_init(void **pp, nni_pipe *npipe, void *s)
{
- bus_pipe *p;
- int rv;
+ bus0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
@@ -134,11 +139,11 @@ bus_pipe_init(void **pp, nni_pipe *npipe, void *s)
NNI_LIST_NODE_INIT(&p->node);
nni_mtx_init(&p->mtx);
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, bus_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, bus_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, bus_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, bus_pipe_putq_cb, p)) != 0)) {
- bus_pipe_fini(p);
+ ((rv = nni_aio_init(&p->aio_getq, bus0_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, bus0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, bus0_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, bus0_pipe_putq_cb, p)) != 0)) {
+ bus0_pipe_fini(p);
return (rv);
}
@@ -149,26 +154,26 @@ bus_pipe_init(void **pp, nni_pipe *npipe, void *s)
}
static int
-bus_pipe_start(void *arg)
+bus0_pipe_start(void *arg)
{
- bus_pipe *p = arg;
- bus_sock *s = p->psock;
+ bus0_pipe *p = arg;
+ bus0_sock *s = p->psock;
nni_mtx_lock(&s->mtx);
nni_list_append(&s->pipes, p);
nni_mtx_unlock(&s->mtx);
- bus_pipe_recv(p);
- bus_pipe_getq(p);
+ bus0_pipe_recv(p);
+ bus0_pipe_getq(p);
return (0);
}
static void
-bus_pipe_stop(void *arg)
+bus0_pipe_stop(void *arg)
{
- bus_pipe *p = arg;
- bus_sock *s = p->psock;
+ bus0_pipe *p = arg;
+ bus0_sock *s = p->psock;
nni_msgq_close(p->sendq);
@@ -185,9 +190,9 @@ bus_pipe_stop(void *arg)
}
static void
-bus_pipe_getq_cb(void *arg)
+bus0_pipe_getq_cb(void *arg)
{
- bus_pipe *p = arg;
+ bus0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
// closed?
@@ -201,9 +206,9 @@ bus_pipe_getq_cb(void *arg)
}
static void
-bus_pipe_send_cb(void *arg)
+bus0_pipe_send_cb(void *arg)
{
- bus_pipe *p = arg;
+ bus0_pipe *p = arg;
if (nni_aio_result(p->aio_send) != 0) {
// closed?
@@ -213,15 +218,15 @@ bus_pipe_send_cb(void *arg)
return;
}
- bus_pipe_getq(p);
+ bus0_pipe_getq(p);
}
static void
-bus_pipe_recv_cb(void *arg)
+bus0_pipe_recv_cb(void *arg)
{
- bus_pipe *p = arg;
- bus_sock *s = p->psock;
- nni_msg * msg;
+ bus0_pipe *p = arg;
+ bus0_sock *s = p->psock;
+ nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->npipe);
@@ -243,9 +248,9 @@ bus_pipe_recv_cb(void *arg)
}
static void
-bus_pipe_putq_cb(void *arg)
+bus0_pipe_putq_cb(void *arg)
{
- bus_pipe *p = arg;
+ bus0_pipe *p = arg;
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
@@ -255,18 +260,18 @@ bus_pipe_putq_cb(void *arg)
}
// Wait for another recv.
- bus_pipe_recv(p);
+ bus0_pipe_recv(p);
}
static void
-bus_sock_getq_cb(void *arg)
+bus0_sock_getq_cb(void *arg)
{
- bus_sock *s = arg;
- bus_pipe *p;
- bus_pipe *lastp;
- nni_msg * msg;
- nni_msg * dup;
- uint32_t sender;
+ bus0_sock *s = arg;
+ bus0_pipe *p;
+ bus0_pipe *lastp;
+ nni_msg * msg;
+ nni_msg * dup;
+ uint32_t sender;
if (nni_aio_result(s->aio_getq) != 0) {
return;
@@ -307,95 +312,95 @@ bus_sock_getq_cb(void *arg)
nni_msg_free(msg);
}
- bus_sock_getq(s);
+ bus0_sock_getq(s);
}
static void
-bus_sock_getq(bus_sock *s)
+bus0_sock_getq(bus0_sock *s)
{
nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-bus_pipe_getq(bus_pipe *p)
+bus0_pipe_getq(bus0_pipe *p)
{
nni_msgq_aio_get(p->sendq, p->aio_getq);
}
static void
-bus_pipe_recv(bus_pipe *p)
+bus0_pipe_recv(bus0_pipe *p)
{
nni_pipe_recv(p->npipe, p->aio_recv);
}
static int
-bus_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+bus0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-bus_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+bus0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static void
-bus_sock_send(void *arg, nni_aio *aio)
+bus0_sock_send(void *arg, nni_aio *aio)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
nni_msgq_aio_put(s->uwq, aio);
}
static void
-bus_sock_recv(void *arg, nni_aio *aio)
+bus0_sock_recv(void *arg, nni_aio *aio)
{
- bus_sock *s = arg;
+ bus0_sock *s = arg;
nni_msgq_aio_get(s->urq, aio);
}
-static nni_proto_pipe_ops bus_pipe_ops = {
- .pipe_init = bus_pipe_init,
- .pipe_fini = bus_pipe_fini,
- .pipe_start = bus_pipe_start,
- .pipe_stop = bus_pipe_stop,
+static nni_proto_pipe_ops bus0_pipe_ops = {
+ .pipe_init = bus0_pipe_init,
+ .pipe_fini = bus0_pipe_fini,
+ .pipe_start = bus0_pipe_start,
+ .pipe_stop = bus0_pipe_stop,
};
-static nni_proto_sock_option bus_sock_options[] = {
+static nni_proto_sock_option bus0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = bus_sock_getopt_raw,
- .pso_setopt = bus_sock_setopt_raw,
+ .pso_getopt = bus0_sock_getopt_raw,
+ .pso_setopt = bus0_sock_setopt_raw,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops bus_sock_ops = {
- .sock_init = bus_sock_init,
- .sock_fini = bus_sock_fini,
- .sock_open = bus_sock_open,
- .sock_close = bus_sock_close,
- .sock_send = bus_sock_send,
- .sock_recv = bus_sock_recv,
- .sock_options = bus_sock_options,
+static nni_proto_sock_ops bus0_sock_ops = {
+ .sock_init = bus0_sock_init,
+ .sock_fini = bus0_sock_fini,
+ .sock_open = bus0_sock_open,
+ .sock_close = bus0_sock_close,
+ .sock_send = bus0_sock_send,
+ .sock_recv = bus0_sock_recv,
+ .sock_options = bus0_sock_options,
};
-static nni_proto bus_proto = {
+static nni_proto bus0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_BUS_V0, "bus" },
.proto_peer = { NNI_PROTO_BUS_V0, "bus" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &bus_sock_ops,
- .proto_pipe_ops = &bus_pipe_ops,
+ .proto_sock_ops = &bus0_sock_ops,
+ .proto_pipe_ops = &bus0_pipe_ops,
};
int
nng_bus0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &bus_proto));
+ return (nni_proto_open(sidp, &bus0_proto));
}
diff --git a/src/protocol/bus0/bus.h b/src/protocol/bus0/bus.h
new file mode 100644
index 00000000..0ef3d391
--- /dev/null
+++ b/src/protocol/bus0/bus.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_BUS0_BUS_H
+#define NNG_PROTOCOL_BUS0_BUS_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_bus0_open(nng_socket *);
+
+#ifndef nng_bus_open
+#define nng_bus_open nng_bus0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_BUS0_BUS_H
diff --git a/src/protocol/pair0/CMakeLists.txt b/src/protocol/pair0/CMakeLists.txt
new file mode 100644
index 00000000..68e7ad34
--- /dev/null
+++ b/src/protocol/pair0/CMakeLists.txt
@@ -0,0 +1,18 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# PAIRv0 protocol
+
+if (NNG_PROTO_PAIR0)
+ set(PAIR0_SOURCES protocol/pair0/pair.c protocol/pair0/pair.h)
+ install(FILES pair.h DESTINATION include/nng/protocol/pair0)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${PAIR0_SOURCES} PARENT_SCOPE)
diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair0/pair.c
index 93cd1497..bac405b8 100644
--- a/src/protocol/pair/pair_v0.c
+++ b/src/protocol/pair0/pair.c
@@ -17,6 +17,10 @@
// While a peer is connected to the server, all other peer connection
// attempts are discarded.
+#ifndef NNI_PROTO_PAIR_V0
+#define NNI_PROTO_PAIR_V0 NNI_PROTO(1, 0)
+#endif
+
typedef struct pair0_pipe pair0_pipe;
typedef struct pair0_sock pair0_sock;
diff --git a/src/protocol/pair0/pair.h b/src/protocol/pair0/pair.h
new file mode 100644
index 00000000..6828c921
--- /dev/null
+++ b/src/protocol/pair0/pair.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_PAIR0_PAIR_H
+#define NNG_PROTOCOL_PAIR0_PAIR_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_pair0_open(nng_socket *);
+
+#ifndef nng_pair_open
+#define nng_pair_open nng_pair0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_PAIR0_PAIR_H
diff --git a/src/protocol/pair1/CMakeLists.txt b/src/protocol/pair1/CMakeLists.txt
new file mode 100644
index 00000000..f35d6959
--- /dev/null
+++ b/src/protocol/pair1/CMakeLists.txt
@@ -0,0 +1,18 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# PAIRv1 protocol
+
+if (NNG_PROTO_PAIR1)
+ set(PAIR1_SOURCES protocol/pair1/pair.c protocol/pair1/pair.h)
+ install(FILES pair.h DESTINATION include/nng/protocol/pair1)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${PAIR1_SOURCES} PARENT_SCOPE)
diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair1/pair.c
index e14d06d5..3f6f63fc 100644
--- a/src/protocol/pair/pair_v1.c
+++ b/src/protocol/pair1/pair.c
@@ -13,10 +13,16 @@
#include "core/nng_impl.h"
+#include "protocol/pair1/pair.h"
+
// Pair protocol. The PAIRv1 protocol is a simple 1:1 messaging pattern,
// usually, but it can support a polyamorous mode where a single server can
// communicate with multiple partners.
+#ifndef NNI_PROTO_PAIR_V1
+#define NNI_PROTO_PAIR_V1 NNI_PROTO(1, 1)
+#endif
+
typedef struct pair1_pipe pair1_pipe;
typedef struct pair1_sock pair1_sock;
diff --git a/src/protocol/pair1/pair.h b/src/protocol/pair1/pair.h
new file mode 100644
index 00000000..bc519d9f
--- /dev/null
+++ b/src/protocol/pair1/pair.h
@@ -0,0 +1,30 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_PAIR1_PAIR_H
+#define NNG_PROTOCOL_PAIR1_PAIR_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_pair1_open(nng_socket *);
+
+#ifndef nng_pair_open
+#define nng_pair_open nng_pair1_open
+#endif
+
+#define NNG_OPT_PAIR1_POLY "pair1:polyamorous"
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_PAIR1_PAIR_H
diff --git a/src/protocol/pipeline0/CMakeLists.txt b/src/protocol/pipeline0/CMakeLists.txt
new file mode 100644
index 00000000..6153c5a7
--- /dev/null
+++ b/src/protocol/pipeline0/CMakeLists.txt
@@ -0,0 +1,23 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Pub/Sub protocol
+
+if (NNG_PROTO_PUSH0)
+ set(PUSH0_SOURCES protocol/pipeline0/push.c protocol/pipeline0/push.h)
+ install(FILES push.h DESTINATION include/nng/protocol/pipeline0)
+endif()
+
+if (NNG_PROTO_PULL0)
+ set(PULL0_SOURCES protocol/pipeline0/pull.c protocol/pipeline0/pull.h)
+ install(FILES pull.h DESTINATION include/nng/protocol/pipeline0)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${PUSH0_SOURCES} ${PULL0_SOURCES} PARENT_SCOPE) \ No newline at end of file
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline0/pull.c
index 9685f0a1..8c16cb17 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline0/pull.c
@@ -15,31 +15,39 @@
// Pull protocol. The PULL protocol is the "read" side of a pipeline.
-typedef struct pull_pipe pull_pipe;
-typedef struct pull_sock pull_sock;
+#ifndef NNI_PROTO_PULL_V0
+#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1)
+#endif
-static void pull_putq_cb(void *);
-static void pull_recv_cb(void *);
-static void pull_putq(pull_pipe *, nni_msg *);
+#ifndef NNI_PROTO_PUSH_V0
+#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0)
+#endif
-// A pull_sock is our per-socket protocol private structure.
-struct pull_sock {
+typedef struct pull0_pipe pull0_pipe;
+typedef struct pull0_sock pull0_sock;
+
+static void pull0_putq_cb(void *);
+static void pull0_recv_cb(void *);
+static void pull0_putq(pull0_pipe *, nni_msg *);
+
+// pull0_sock is our per-socket protocol private structure.
+struct pull0_sock {
nni_msgq *urq;
int raw;
};
-// A pull_pipe is our per-pipe protocol private structure.
-struct pull_pipe {
- nni_pipe * pipe;
- pull_sock *pull;
- nni_aio * putq_aio;
- nni_aio * recv_aio;
+// pull0_pipe is our per-pipe protocol private structure.
+struct pull0_pipe {
+ nni_pipe * pipe;
+ pull0_sock *pull;
+ nni_aio * putq_aio;
+ nni_aio * recv_aio;
};
static int
-pull_sock_init(void **sp, nni_sock *sock)
+pull0_sock_init(void **sp, nni_sock *sock)
{
- pull_sock *s;
+ pull0_sock *s;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
@@ -52,17 +60,17 @@ pull_sock_init(void **sp, nni_sock *sock)
}
static void
-pull_sock_fini(void *arg)
+pull0_sock_fini(void *arg)
{
- pull_sock *s = arg;
+ pull0_sock *s = arg;
NNI_FREE_STRUCT(s);
}
static void
-pull_pipe_fini(void *arg)
+pull0_pipe_fini(void *arg)
{
- pull_pipe *p = arg;
+ pull0_pipe *p = arg;
nni_aio_fini(p->putq_aio);
nni_aio_fini(p->recv_aio);
@@ -70,17 +78,17 @@ pull_pipe_fini(void *arg)
}
static int
-pull_pipe_init(void **pp, nni_pipe *pipe, void *s)
+pull0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- pull_pipe *p;
- int rv;
+ pull0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_aio_init(&p->putq_aio, pull_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->recv_aio, pull_recv_cb, p)) != 0)) {
- pull_pipe_fini(p);
+ if (((rv = nni_aio_init(&p->putq_aio, pull0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->recv_aio, pull0_recv_cb, p)) != 0)) {
+ pull0_pipe_fini(p);
return (rv);
}
@@ -91,9 +99,9 @@ pull_pipe_init(void **pp, nni_pipe *pipe, void *s)
}
static int
-pull_pipe_start(void *arg)
+pull0_pipe_start(void *arg)
{
- pull_pipe *p = arg;
+ pull0_pipe *p = arg;
// Start the pending pull...
nni_pipe_recv(p->pipe, p->recv_aio);
@@ -102,20 +110,20 @@ pull_pipe_start(void *arg)
}
static void
-pull_pipe_stop(void *arg)
+pull0_pipe_stop(void *arg)
{
- pull_pipe *p = arg;
+ pull0_pipe *p = arg;
nni_aio_stop(p->putq_aio);
nni_aio_stop(p->recv_aio);
}
static void
-pull_recv_cb(void *arg)
+pull0_recv_cb(void *arg)
{
- pull_pipe *p = arg;
- nni_aio * aio = p->recv_aio;
- nni_msg * msg;
+ pull0_pipe *p = arg;
+ nni_aio * aio = p->recv_aio;
+ nni_msg * msg;
if (nni_aio_result(aio) != 0) {
// Failed to get a message, probably the pipe is closed.
@@ -127,14 +135,14 @@ pull_recv_cb(void *arg)
msg = nni_aio_get_msg(aio);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
nni_aio_set_msg(aio, NULL);
- pull_putq(p, msg);
+ pull0_putq(p, msg);
}
static void
-pull_putq_cb(void *arg)
+pull0_putq_cb(void *arg)
{
- pull_pipe *p = arg;
- nni_aio * aio = p->putq_aio;
+ pull0_pipe *p = arg;
+ nni_aio * aio = p->putq_aio;
if (nni_aio_result(aio) != 0) {
// If we failed to put, probably NNG_ECLOSED, nothing else
@@ -148,11 +156,11 @@ pull_putq_cb(void *arg)
nni_pipe_recv(p->pipe, p->recv_aio);
}
-// nni_pull_putq schedules a put operation to the user socket (sendup).
+// pull0_putq schedules a put operation to the user socket (sendup).
static void
-pull_putq(pull_pipe *p, nni_msg *msg)
+pull0_putq(pull0_pipe *p, nni_msg *msg)
{
- pull_sock *s = p->pull;
+ pull0_sock *s = p->pull;
nni_aio_set_msg(p->putq_aio, msg);
@@ -160,83 +168,83 @@ pull_putq(pull_pipe *p, nni_msg *msg)
}
static void
-pull_sock_open(void *arg)
+pull0_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-pull_sock_close(void *arg)
+pull0_sock_close(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static int
-pull_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+pull0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- pull_sock *s = arg;
+ pull0_sock *s = arg;
return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-pull_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+pull0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- pull_sock *s = arg;
+ pull0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static void
-pull_sock_send(void *arg, nni_aio *aio)
+pull0_sock_send(void *arg, nni_aio *aio)
{
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
static void
-pull_sock_recv(void *arg, nni_aio *aio)
+pull0_sock_recv(void *arg, nni_aio *aio)
{
- pull_sock *s = arg;
+ pull0_sock *s = arg;
nni_msgq_aio_get(s->urq, aio);
}
-static nni_proto_pipe_ops pull_pipe_ops = {
- .pipe_init = pull_pipe_init,
- .pipe_fini = pull_pipe_fini,
- .pipe_start = pull_pipe_start,
- .pipe_stop = pull_pipe_stop,
+static nni_proto_pipe_ops pull0_pipe_ops = {
+ .pipe_init = pull0_pipe_init,
+ .pipe_fini = pull0_pipe_fini,
+ .pipe_start = pull0_pipe_start,
+ .pipe_stop = pull0_pipe_stop,
};
-static nni_proto_sock_option pull_sock_options[] = {
+static nni_proto_sock_option pull0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = pull_sock_getopt_raw,
- .pso_setopt = pull_sock_setopt_raw,
+ .pso_getopt = pull0_sock_getopt_raw,
+ .pso_setopt = pull0_sock_setopt_raw,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops pull_sock_ops = {
- .sock_init = pull_sock_init,
- .sock_fini = pull_sock_fini,
- .sock_open = pull_sock_open,
- .sock_close = pull_sock_close,
- .sock_send = pull_sock_send,
- .sock_recv = pull_sock_recv,
- .sock_options = pull_sock_options,
+static nni_proto_sock_ops pull0_sock_ops = {
+ .sock_init = pull0_sock_init,
+ .sock_fini = pull0_sock_fini,
+ .sock_open = pull0_sock_open,
+ .sock_close = pull0_sock_close,
+ .sock_send = pull0_sock_send,
+ .sock_recv = pull0_sock_recv,
+ .sock_options = pull0_sock_options,
};
-static nni_proto pull_proto = {
+static nni_proto pull0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_PULL_V0, "pull" },
.proto_peer = { NNI_PROTO_PUSH_V0, "push" },
.proto_flags = NNI_PROTO_FLAG_RCV,
- .proto_pipe_ops = &pull_pipe_ops,
- .proto_sock_ops = &pull_sock_ops,
+ .proto_pipe_ops = &pull0_pipe_ops,
+ .proto_sock_ops = &pull0_sock_ops,
};
int
nng_pull0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &pull_proto));
+ return (nni_proto_open(sidp, &pull0_proto));
}
diff --git a/src/protocol/pipeline0/pull.h b/src/protocol/pipeline0/pull.h
new file mode 100644
index 00000000..75bded03
--- /dev/null
+++ b/src/protocol/pipeline0/pull.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_PIPELINE0_PULL_H
+#define NNG_PROTOCOL_PIPELINE0_PULL_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_pull0_open(nng_socket *);
+
+#ifndef nng_pull_open
+#define nng_pull_open nng_pull0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_PIPELINE0_PULL_H
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline0/push.c
index 9ff74558..3dd83fe0 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline0/push.c
@@ -17,23 +17,31 @@
// Push distributes fairly, or tries to, by giving messages in round-robin
// order.
-typedef struct push_pipe push_pipe;
-typedef struct push_sock push_sock;
+#ifndef NNI_PROTO_PULL_V0
+#define NNI_PROTO_PULL_V0 NNI_PROTO(5, 1)
+#endif
-static void push_send_cb(void *);
-static void push_recv_cb(void *);
-static void push_getq_cb(void *);
+#ifndef NNI_PROTO_PUSH_V0
+#define NNI_PROTO_PUSH_V0 NNI_PROTO(5, 0)
+#endif
-// An nni_push_sock is our per-socket protocol private structure.
-struct push_sock {
+typedef struct push0_pipe push0_pipe;
+typedef struct push0_sock push0_sock;
+
+static void push0_send_cb(void *);
+static void push0_recv_cb(void *);
+static void push0_getq_cb(void *);
+
+// push0_sock is our per-socket protocol private structure.
+struct push0_sock {
nni_msgq *uwq;
int raw;
};
-// An nni_push_pipe is our per-pipe protocol private structure.
-struct push_pipe {
+// push0_pipe is our per-pipe protocol private structure.
+struct push0_pipe {
nni_pipe * pipe;
- push_sock * push;
+ push0_sock * push;
nni_list_node node;
nni_aio *aio_recv;
@@ -42,9 +50,9 @@ struct push_pipe {
};
static int
-push_sock_init(void **sp, nni_sock *sock)
+push0_sock_init(void **sp, nni_sock *sock)
{
- push_sock *s;
+ push0_sock *s;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
@@ -56,29 +64,29 @@ push_sock_init(void **sp, nni_sock *sock)
}
static void
-push_sock_fini(void *arg)
+push0_sock_fini(void *arg)
{
- push_sock *s = arg;
+ push0_sock *s = arg;
NNI_FREE_STRUCT(s);
}
static void
-push_sock_open(void *arg)
+push0_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-push_sock_close(void *arg)
+push0_sock_close(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-push_pipe_fini(void *arg)
+push0_pipe_fini(void *arg)
{
- push_pipe *p = arg;
+ push0_pipe *p = arg;
nni_aio_fini(p->aio_recv);
nni_aio_fini(p->aio_send);
@@ -87,18 +95,18 @@ push_pipe_fini(void *arg)
}
static int
-push_pipe_init(void **pp, nni_pipe *pipe, void *s)
+push0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- push_pipe *p;
- int rv;
+ push0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_aio_init(&p->aio_recv, push_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, push_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, push_getq_cb, p)) != 0)) {
- push_pipe_fini(p);
+ if (((rv = nni_aio_init(&p->aio_recv, push0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, push0_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, push0_getq_cb, p)) != 0)) {
+ push0_pipe_fini(p);
return (rv);
}
NNI_LIST_NODE_INIT(&p->node);
@@ -109,10 +117,10 @@ push_pipe_init(void **pp, nni_pipe *pipe, void *s)
}
static int
-push_pipe_start(void *arg)
+push0_pipe_start(void *arg)
{
- push_pipe *p = arg;
- push_sock *s = p->push;
+ push0_pipe *p = arg;
+ push0_sock *s = p->push;
if (nni_pipe_peer(p->pipe) != NNI_PROTO_PULL_V0) {
return (NNG_EPROTO);
@@ -129,9 +137,9 @@ push_pipe_start(void *arg)
}
static void
-push_pipe_stop(void *arg)
+push0_pipe_stop(void *arg)
{
- push_pipe *p = arg;
+ push0_pipe *p = arg;
nni_aio_stop(p->aio_recv);
nni_aio_stop(p->aio_send);
@@ -139,9 +147,9 @@ push_pipe_stop(void *arg)
}
static void
-push_recv_cb(void *arg)
+push0_recv_cb(void *arg)
{
- push_pipe *p = arg;
+ push0_pipe *p = arg;
// We normally expect to receive an error. If a pipe actually
// sends us data, we just discard it.
@@ -155,10 +163,10 @@ push_recv_cb(void *arg)
}
static void
-push_send_cb(void *arg)
+push0_send_cb(void *arg)
{
- push_pipe *p = arg;
- push_sock *s = p->push;
+ push0_pipe *p = arg;
+ push0_sock *s = p->push;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -171,10 +179,10 @@ push_send_cb(void *arg)
}
static void
-push_getq_cb(void *arg)
+push0_getq_cb(void *arg)
{
- push_pipe *p = arg;
- nni_aio * aio = p->aio_getq;
+ push0_pipe *p = arg;
+ nni_aio * aio = p->aio_getq;
if (nni_aio_result(aio) != 0) {
// If the socket is closing, nothing else we can do.
@@ -189,71 +197,71 @@ push_getq_cb(void *arg)
}
static int
-push_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+push0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- push_sock *s = arg;
+ push0_sock *s = arg;
return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-push_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+push0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- push_sock *s = arg;
+ push0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static void
-push_sock_send(void *arg, nni_aio *aio)
+push0_sock_send(void *arg, nni_aio *aio)
{
- push_sock *s = arg;
+ push0_sock *s = arg;
nni_msgq_aio_put(s->uwq, aio);
}
static void
-push_sock_recv(void *arg, nni_aio *aio)
+push0_sock_recv(void *arg, nni_aio *aio)
{
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
-static nni_proto_pipe_ops push_pipe_ops = {
- .pipe_init = push_pipe_init,
- .pipe_fini = push_pipe_fini,
- .pipe_start = push_pipe_start,
- .pipe_stop = push_pipe_stop,
+static nni_proto_pipe_ops push0_pipe_ops = {
+ .pipe_init = push0_pipe_init,
+ .pipe_fini = push0_pipe_fini,
+ .pipe_start = push0_pipe_start,
+ .pipe_stop = push0_pipe_stop,
};
-static nni_proto_sock_option push_sock_options[] = {
+static nni_proto_sock_option push0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = push_sock_getopt_raw,
- .pso_setopt = push_sock_setopt_raw,
+ .pso_getopt = push0_sock_getopt_raw,
+ .pso_setopt = push0_sock_setopt_raw,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops push_sock_ops = {
- .sock_init = push_sock_init,
- .sock_fini = push_sock_fini,
- .sock_open = push_sock_open,
- .sock_close = push_sock_close,
- .sock_options = push_sock_options,
- .sock_send = push_sock_send,
- .sock_recv = push_sock_recv,
+static nni_proto_sock_ops push0_sock_ops = {
+ .sock_init = push0_sock_init,
+ .sock_fini = push0_sock_fini,
+ .sock_open = push0_sock_open,
+ .sock_close = push0_sock_close,
+ .sock_options = push0_sock_options,
+ .sock_send = push0_sock_send,
+ .sock_recv = push0_sock_recv,
};
-static nni_proto push_proto = {
+static nni_proto push0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_PUSH_V0, "push" },
.proto_peer = { NNI_PROTO_PULL_V0, "pull" },
.proto_flags = NNI_PROTO_FLAG_SND,
- .proto_pipe_ops = &push_pipe_ops,
- .proto_sock_ops = &push_sock_ops,
+ .proto_pipe_ops = &push0_pipe_ops,
+ .proto_sock_ops = &push0_sock_ops,
};
int
nng_push0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &push_proto));
+ return (nni_proto_open(sidp, &push0_proto));
}
diff --git a/src/protocol/pipeline0/push.h b/src/protocol/pipeline0/push.h
new file mode 100644
index 00000000..c7303b92
--- /dev/null
+++ b/src/protocol/pipeline0/push.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_PIPELINE0_PUSH_H
+#define NNG_PROTOCOL_PIPELINE0_PUSH_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_push0_open(nng_socket *);
+
+#ifndef nng_push_open
+#define nng_push_open nng_push0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_PIPELINE0_PUSH_H
diff --git a/src/protocol/pubsub0/CMakeLists.txt b/src/protocol/pubsub0/CMakeLists.txt
new file mode 100644
index 00000000..4edcbfae
--- /dev/null
+++ b/src/protocol/pubsub0/CMakeLists.txt
@@ -0,0 +1,23 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Pub/Sub protocol
+
+if (NNG_PROTO_PUB0)
+ set(PUB0_SOURCES protocol/pubsub0/pub.c protocol/pubsub0/pub.h)
+ install(FILES pub.h DESTINATION include/nng/protocol/pubsub0)
+endif()
+
+if (NNG_PROTO_SUB0)
+ set(SUB0_SOURCES protocol/pubsub0/sub.c protocol/pubsub0/sub.h)
+ install(FILES sub.h DESTINATION include/nng/protocol/pubsub0)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${PUB0_SOURCES} ${SUB0_SOURCES} PARENT_SCOPE) \ No newline at end of file
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub0/pub.c
index 9e5cd67f..f4a33b77 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -12,24 +12,33 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/pubsub0/pub.h"
// Publish protocol. The PUB protocol simply sends messages out, as
// a broadcast. It has nothing more sophisticated because it does not
// perform sender-side filtering. Its best effort delivery, so anything
// that can't receive the message won't get one.
-typedef struct pub_pipe pub_pipe;
-typedef struct pub_sock pub_sock;
+#ifndef NNI_PROTO_SUB_V0
+#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1)
+#endif
-static void pub_pipe_recv_cb(void *);
-static void pub_pipe_send_cb(void *);
-static void pub_pipe_getq_cb(void *);
-static void pub_sock_getq_cb(void *);
-static void pub_sock_fini(void *);
-static void pub_pipe_fini(void *);
+#ifndef NNI_PROTO_PUB_V0
+#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0)
+#endif
-// A pub_sock is our per-socket protocol private structure.
-struct pub_sock {
+typedef struct pub0_pipe pub0_pipe;
+typedef struct pub0_sock pub0_sock;
+
+static void pub0_pipe_recv_cb(void *);
+static void pub0_pipe_send_cb(void *);
+static void pub0_pipe_getq_cb(void *);
+static void pub0_sock_getq_cb(void *);
+static void pub0_sock_fini(void *);
+static void pub0_pipe_fini(void *);
+
+// pub0_sock is our per-socket protocol private structure.
+struct pub0_sock {
nni_msgq *uwq;
int raw;
nni_aio * aio_getq;
@@ -37,10 +46,10 @@ struct pub_sock {
nni_mtx mtx;
};
-// A pub_pipe is our per-pipe protocol private structure.
-struct pub_pipe {
+// pub0_pipe is our per-pipe protocol private structure.
+struct pub0_pipe {
nni_pipe * pipe;
- pub_sock * pub;
+ pub0_sock * pub;
nni_msgq * sendq;
nni_aio * aio_getq;
nni_aio * aio_send;
@@ -49,9 +58,9 @@ struct pub_pipe {
};
static void
-pub_sock_fini(void *arg)
+pub0_sock_fini(void *arg)
{
- pub_sock *s = arg;
+ pub0_sock *s = arg;
nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
@@ -60,22 +69,22 @@ pub_sock_fini(void *arg)
}
static int
-pub_sock_init(void **sp, nni_sock *sock)
+pub0_sock_init(void **sp, nni_sock *sock)
{
- pub_sock *s;
- int rv;
+ pub0_sock *s;
+ int rv;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&s->mtx);
- if ((rv = nni_aio_init(&s->aio_getq, pub_sock_getq_cb, s)) != 0) {
- pub_sock_fini(s);
+ if ((rv = nni_aio_init(&s->aio_getq, pub0_sock_getq_cb, s)) != 0) {
+ pub0_sock_fini(s);
return (rv);
}
s->raw = 0;
- NNI_LIST_INIT(&s->pipes, pub_pipe, node);
+ NNI_LIST_INIT(&s->pipes, pub0_pipe, node);
s->uwq = nni_sock_sendq(sock);
@@ -84,25 +93,25 @@ pub_sock_init(void **sp, nni_sock *sock)
}
static void
-pub_sock_open(void *arg)
+pub0_sock_open(void *arg)
{
- pub_sock *s = arg;
+ pub0_sock *s = arg;
nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-pub_sock_close(void *arg)
+pub0_sock_close(void *arg)
{
- pub_sock *s = arg;
+ pub0_sock *s = arg;
nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-pub_pipe_fini(void *arg)
+pub0_pipe_fini(void *arg)
{
- pub_pipe *p = arg;
+ pub0_pipe *p = arg;
nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
@@ -111,10 +120,10 @@ pub_pipe_fini(void *arg)
}
static int
-pub_pipe_init(void **pp, nni_pipe *pipe, void *s)
+pub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- pub_pipe *p;
- int rv;
+ pub0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
@@ -122,11 +131,11 @@ pub_pipe_init(void **pp, nni_pipe *pipe, void *s)
// XXX: consider making this depth tunable
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, pub_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, pub_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, pub_pipe_recv_cb, p)) != 0)) {
+ ((rv = nni_aio_init(&p->aio_getq, pub0_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) {
- pub_pipe_fini(p);
+ pub0_pipe_fini(p);
return (rv);
}
@@ -137,10 +146,10 @@ pub_pipe_init(void **pp, nni_pipe *pipe, void *s)
}
static int
-pub_pipe_start(void *arg)
+pub0_pipe_start(void *arg)
{
- pub_pipe *p = arg;
- pub_sock *s = p->pub;
+ pub0_pipe *p = arg;
+ pub0_sock *s = p->pub;
if (nni_pipe_peer(p->pipe) != NNI_PROTO_SUB_V0) {
return (NNG_EPROTO);
@@ -157,10 +166,10 @@ pub_pipe_start(void *arg)
}
static void
-pub_pipe_stop(void *arg)
+pub0_pipe_stop(void *arg)
{
- pub_pipe *p = arg;
- pub_sock *s = p->pub;
+ pub0_pipe *p = arg;
+ pub0_sock *s = p->pub;
nni_aio_stop(p->aio_getq);
nni_aio_stop(p->aio_send);
@@ -176,15 +185,15 @@ pub_pipe_stop(void *arg)
}
static void
-pub_sock_getq_cb(void *arg)
+pub0_sock_getq_cb(void *arg)
{
- pub_sock *s = arg;
- nni_msgq *uwq = s->uwq;
- nni_msg * msg, *dup;
+ pub0_sock *s = arg;
+ nni_msgq * uwq = s->uwq;
+ nni_msg * msg, *dup;
- pub_pipe *p;
- pub_pipe *last;
- int rv;
+ pub0_pipe *p;
+ pub0_pipe *last;
+ int rv;
if (nni_aio_result(s->aio_getq) != 0) {
return;
@@ -218,9 +227,9 @@ pub_sock_getq_cb(void *arg)
}
static void
-pub_pipe_recv_cb(void *arg)
+pub0_pipe_recv_cb(void *arg)
{
- pub_pipe *p = arg;
+ pub0_pipe *p = arg;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->pipe);
@@ -233,9 +242,9 @@ pub_pipe_recv_cb(void *arg)
}
static void
-pub_pipe_getq_cb(void *arg)
+pub0_pipe_getq_cb(void *arg)
{
- pub_pipe *p = arg;
+ pub0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->pipe);
@@ -249,9 +258,9 @@ pub_pipe_getq_cb(void *arg)
}
static void
-pub_pipe_send_cb(void *arg)
+pub0_pipe_send_cb(void *arg)
{
- pub_pipe *p = arg;
+ pub0_pipe *p = arg;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -265,71 +274,71 @@ pub_pipe_send_cb(void *arg)
}
static int
-pub_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+pub0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- pub_sock *s = arg;
+ pub0_sock *s = arg;
return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-pub_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+pub0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- pub_sock *s = arg;
+ pub0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static void
-pub_sock_recv(void *arg, nni_aio *aio)
+pub0_sock_recv(void *arg, nni_aio *aio)
{
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
static void
-pub_sock_send(void *arg, nni_aio *aio)
+pub0_sock_send(void *arg, nni_aio *aio)
{
- pub_sock *s = arg;
+ pub0_sock *s = arg;
nni_msgq_aio_put(s->uwq, aio);
}
-static nni_proto_pipe_ops pub_pipe_ops = {
- .pipe_init = pub_pipe_init,
- .pipe_fini = pub_pipe_fini,
- .pipe_start = pub_pipe_start,
- .pipe_stop = pub_pipe_stop,
+static nni_proto_pipe_ops pub0_pipe_ops = {
+ .pipe_init = pub0_pipe_init,
+ .pipe_fini = pub0_pipe_fini,
+ .pipe_start = pub0_pipe_start,
+ .pipe_stop = pub0_pipe_stop,
};
-static nni_proto_sock_option pub_sock_options[] = {
+static nni_proto_sock_option pub0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = pub_sock_getopt_raw,
- .pso_setopt = pub_sock_setopt_raw,
+ .pso_getopt = pub0_sock_getopt_raw,
+ .pso_setopt = pub0_sock_setopt_raw,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops pub_sock_ops = {
- .sock_init = pub_sock_init,
- .sock_fini = pub_sock_fini,
- .sock_open = pub_sock_open,
- .sock_close = pub_sock_close,
- .sock_send = pub_sock_send,
- .sock_recv = pub_sock_recv,
- .sock_options = pub_sock_options,
+static nni_proto_sock_ops pub0_sock_ops = {
+ .sock_init = pub0_sock_init,
+ .sock_fini = pub0_sock_fini,
+ .sock_open = pub0_sock_open,
+ .sock_close = pub0_sock_close,
+ .sock_send = pub0_sock_send,
+ .sock_recv = pub0_sock_recv,
+ .sock_options = pub0_sock_options,
};
-static nni_proto pub_proto = {
+static nni_proto pub0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_PUB_V0, "pub" },
.proto_peer = { NNI_PROTO_SUB_V0, "sub" },
.proto_flags = NNI_PROTO_FLAG_SND,
- .proto_sock_ops = &pub_sock_ops,
- .proto_pipe_ops = &pub_pipe_ops,
+ .proto_sock_ops = &pub0_sock_ops,
+ .proto_pipe_ops = &pub0_pipe_ops,
};
int
nng_pub0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &pub_proto));
+ return (nni_proto_open(sidp, &pub0_proto));
}
diff --git a/src/protocol/pubsub0/pub.h b/src/protocol/pubsub0/pub.h
new file mode 100644
index 00000000..2388a292
--- /dev/null
+++ b/src/protocol/pubsub0/pub.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_PUBSUB0_PUB_H
+#define NNG_PROTOCOL_PUBSUB0_PUB_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_pub0_open(nng_socket *);
+
+#ifndef nng_pub_open
+#define nng_pub_open nng_pub0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_PUBSUB0_PUB_H
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub0/sub.c
index 555d528e..6c504d75 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -12,54 +12,60 @@
#include <string.h>
#include "core/nng_impl.h"
-
-const char *nng_opt_sub_subscribe = NNG_OPT_SUB_SUBSCRIBE;
-const char *nng_opt_sub_unsubscribe = NNG_OPT_SUB_UNSUBSCRIBE;
+#include "protocol/pubsub0/sub.h"
// Subscriber protocol. The SUB protocol receives messages sent to
// it from publishers, and filters out those it is not interested in,
// only passing up ones that match known subscriptions.
-typedef struct sub_pipe sub_pipe;
-typedef struct sub_sock sub_sock;
-typedef struct sub_topic sub_topic;
+#ifndef NNI_PROTO_SUB_V0
+#define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1)
+#endif
+
+#ifndef NNI_PROTO_PUB_V0
+#define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0)
+#endif
+
+typedef struct sub0_pipe sub0_pipe;
+typedef struct sub0_sock sub0_sock;
+typedef struct sub0_topic sub0_topic;
-static void sub_recv_cb(void *);
-static void sub_putq_cb(void *);
-static void sub_pipe_fini(void *);
+static void sub0_recv_cb(void *);
+static void sub0_putq_cb(void *);
+static void sub0_pipe_fini(void *);
-struct sub_topic {
+struct sub0_topic {
nni_list_node node;
size_t len;
void * buf;
};
-// An nni_rep_sock is our per-socket protocol private structure.
-struct sub_sock {
+// sub0_sock is our per-socket protocol private structure.
+struct sub0_sock {
nni_list topics;
nni_msgq *urq;
int raw;
nni_mtx lk;
};
-// An nni_rep_pipe is our per-pipe protocol private structure.
-struct sub_pipe {
- nni_pipe *pipe;
- sub_sock *sub;
- nni_aio * aio_recv;
- nni_aio * aio_putq;
+// sub0_pipe is our per-pipe protocol private structure.
+struct sub0_pipe {
+ nni_pipe * pipe;
+ sub0_sock *sub;
+ nni_aio * aio_recv;
+ nni_aio * aio_putq;
};
static int
-sub_sock_init(void **sp, nni_sock *sock)
+sub0_sock_init(void **sp, nni_sock *sock)
{
- sub_sock *s;
+ sub0_sock *s;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&s->lk);
- NNI_LIST_INIT(&s->topics, sub_topic, node);
+ NNI_LIST_INIT(&s->topics, sub0_topic, node);
s->raw = 0;
s->urq = nni_sock_recvq(sock);
@@ -68,10 +74,10 @@ sub_sock_init(void **sp, nni_sock *sock)
}
static void
-sub_sock_fini(void *arg)
+sub0_sock_fini(void *arg)
{
- sub_sock * s = arg;
- sub_topic *topic;
+ sub0_sock * s = arg;
+ sub0_topic *topic;
while ((topic = nni_list_first(&s->topics)) != NULL) {
nni_list_remove(&s->topics, topic);
@@ -83,21 +89,21 @@ sub_sock_fini(void *arg)
}
static void
-sub_sock_open(void *arg)
+sub0_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-sub_sock_close(void *arg)
+sub0_sock_close(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-sub_pipe_fini(void *arg)
+sub0_pipe_fini(void *arg)
{
- sub_pipe *p = arg;
+ sub0_pipe *p = arg;
nni_aio_fini(p->aio_putq);
nni_aio_fini(p->aio_recv);
@@ -105,17 +111,17 @@ sub_pipe_fini(void *arg)
}
static int
-sub_pipe_init(void **pp, nni_pipe *pipe, void *s)
+sub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- sub_pipe *p;
- int rv;
+ sub0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_aio_init(&p->aio_putq, sub_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, sub_recv_cb, p)) != 0)) {
- sub_pipe_fini(p);
+ if (((rv = nni_aio_init(&p->aio_putq, sub0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0)) {
+ sub0_pipe_fini(p);
return (rv);
}
@@ -126,30 +132,30 @@ sub_pipe_init(void **pp, nni_pipe *pipe, void *s)
}
static int
-sub_pipe_start(void *arg)
+sub0_pipe_start(void *arg)
{
- sub_pipe *p = arg;
+ sub0_pipe *p = arg;
nni_pipe_recv(p->pipe, p->aio_recv);
return (0);
}
static void
-sub_pipe_stop(void *arg)
+sub0_pipe_stop(void *arg)
{
- sub_pipe *p = arg;
+ sub0_pipe *p = arg;
nni_aio_stop(p->aio_putq);
nni_aio_stop(p->aio_recv);
}
static void
-sub_recv_cb(void *arg)
+sub0_recv_cb(void *arg)
{
- sub_pipe *p = arg;
- sub_sock *s = p->sub;
- nni_msgq *urq = s->urq;
- nni_msg * msg;
+ sub0_pipe *p = arg;
+ sub0_sock *s = p->sub;
+ nni_msgq * urq = s->urq;
+ nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->pipe);
@@ -164,9 +170,9 @@ sub_recv_cb(void *arg)
}
static void
-sub_putq_cb(void *arg)
+sub0_putq_cb(void *arg)
{
- sub_pipe *p = arg;
+ sub0_pipe *p = arg;
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
@@ -184,11 +190,11 @@ sub_putq_cb(void *arg)
// to replace this with a patricia trie, like old nanomsg had.
static int
-sub_subscribe(void *arg, const void *buf, size_t sz)
+sub0_subscribe(void *arg, const void *buf, size_t sz)
{
- sub_sock * s = arg;
- sub_topic *topic;
- sub_topic *newtopic;
+ sub0_sock * s = arg;
+ sub0_topic *topic;
+ sub0_topic *newtopic;
nni_mtx_lock(&s->lk);
NNI_LIST_FOREACH (&s->topics, topic) {
@@ -234,11 +240,11 @@ sub_subscribe(void *arg, const void *buf, size_t sz)
}
static int
-sub_unsubscribe(void *arg, const void *buf, size_t sz)
+sub0_unsubscribe(void *arg, const void *buf, size_t sz)
{
- sub_sock * s = arg;
- sub_topic *topic;
- int rv;
+ sub0_sock * s = arg;
+ sub0_topic *topic;
+ int rv;
nni_mtx_lock(&s->lk);
NNI_LIST_FOREACH (&s->topics, topic) {
@@ -270,41 +276,41 @@ sub_unsubscribe(void *arg, const void *buf, size_t sz)
}
static int
-sub_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+sub0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- sub_sock *s = arg;
+ sub0_sock *s = arg;
return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-sub_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+sub0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- sub_sock *s = arg;
+ sub0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static void
-sub_sock_send(void *arg, nni_aio *aio)
+sub0_sock_send(void *arg, nni_aio *aio)
{
nni_aio_finish_error(aio, NNG_ENOTSUP);
}
static void
-sub_sock_recv(void *arg, nni_aio *aio)
+sub0_sock_recv(void *arg, nni_aio *aio)
{
- sub_sock *s = arg;
+ sub0_sock *s = arg;
nni_msgq_aio_get(s->urq, aio);
}
static nni_msg *
-sub_sock_filter(void *arg, nni_msg *msg)
+sub0_sock_filter(void *arg, nni_msg *msg)
{
- sub_sock * s = arg;
- sub_topic *topic;
- char * body;
- size_t len;
- int match;
+ sub0_sock * s = arg;
+ sub0_topic *topic;
+ char * body;
+ size_t len;
+ int match;
nni_mtx_lock(&s->lk);
if (s->raw) {
@@ -344,55 +350,55 @@ sub_sock_filter(void *arg, nni_msg *msg)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe_ops sub_pipe_ops = {
- .pipe_init = sub_pipe_init,
- .pipe_fini = sub_pipe_fini,
- .pipe_start = sub_pipe_start,
- .pipe_stop = sub_pipe_stop,
+static nni_proto_pipe_ops sub0_pipe_ops = {
+ .pipe_init = sub0_pipe_init,
+ .pipe_fini = sub0_pipe_fini,
+ .pipe_start = sub0_pipe_start,
+ .pipe_stop = sub0_pipe_stop,
};
-static nni_proto_sock_option sub_sock_options[] = {
+static nni_proto_sock_option sub0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = sub_sock_getopt_raw,
- .pso_setopt = sub_sock_setopt_raw,
+ .pso_getopt = sub0_sock_getopt_raw,
+ .pso_setopt = sub0_sock_setopt_raw,
},
{
.pso_name = NNG_OPT_SUB_SUBSCRIBE,
.pso_getopt = NULL,
- .pso_setopt = sub_subscribe,
+ .pso_setopt = sub0_subscribe,
},
{
.pso_name = NNG_OPT_SUB_UNSUBSCRIBE,
.pso_getopt = NULL,
- .pso_setopt = sub_unsubscribe,
+ .pso_setopt = sub0_unsubscribe,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops sub_sock_ops = {
- .sock_init = sub_sock_init,
- .sock_fini = sub_sock_fini,
- .sock_open = sub_sock_open,
- .sock_close = sub_sock_close,
- .sock_send = sub_sock_send,
- .sock_recv = sub_sock_recv,
- .sock_filter = sub_sock_filter,
- .sock_options = sub_sock_options,
+static nni_proto_sock_ops sub0_sock_ops = {
+ .sock_init = sub0_sock_init,
+ .sock_fini = sub0_sock_fini,
+ .sock_open = sub0_sock_open,
+ .sock_close = sub0_sock_close,
+ .sock_send = sub0_sock_send,
+ .sock_recv = sub0_sock_recv,
+ .sock_filter = sub0_sock_filter,
+ .sock_options = sub0_sock_options,
};
-static nni_proto sub_proto = {
+static nni_proto sub0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_SUB_V0, "sub" },
.proto_peer = { NNI_PROTO_PUB_V0, "pub" },
.proto_flags = NNI_PROTO_FLAG_RCV,
- .proto_sock_ops = &sub_sock_ops,
- .proto_pipe_ops = &sub_pipe_ops,
+ .proto_sock_ops = &sub0_sock_ops,
+ .proto_pipe_ops = &sub0_pipe_ops,
};
int
nng_sub0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &sub_proto));
+ return (nni_proto_open(sidp, &sub0_proto));
}
diff --git a/src/protocol/pubsub0/sub.h b/src/protocol/pubsub0/sub.h
new file mode 100644
index 00000000..1a09145d
--- /dev/null
+++ b/src/protocol/pubsub0/sub.h
@@ -0,0 +1,31 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_PUBSUB0_SUB_H
+#define NNG_PROTOCOL_PUBSUB0_SUB_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_sub0_open(nng_socket *);
+
+#ifndef nng_sub_open
+#define nng_sub_open nng_sub0_open
+#endif
+
+#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe"
+#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe"
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_PUBSUB0_SUB_H
diff --git a/src/protocol/reqrep0/CMakeLists.txt b/src/protocol/reqrep0/CMakeLists.txt
new file mode 100644
index 00000000..4e82ad41
--- /dev/null
+++ b/src/protocol/reqrep0/CMakeLists.txt
@@ -0,0 +1,23 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Req/Rep protocol
+
+if (NNG_PROTO_REQ0)
+ set(REQ0_SOURCES protocol/reqrep0/req.c protocol/reqrep0/req.h)
+ install(FILES req.h DESTINATION include/nng/protocol/reqrep0)
+endif()
+
+if (NNG_PROTO_REP0)
+ set(REP0_SOURCES protocol/reqrep0/rep.c protocol/reqrep0/rep.h)
+ install(FILES rep.h DESTINATION include/nng/protocol/reqrep0)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${REQ0_SOURCES} ${REP0_SOURCES} PARENT_SCOPE) \ No newline at end of file
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep0/rep.c
index 100e739d..ee8e4277 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -12,23 +12,32 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/reqrep0/rep.h"
// Response protocol. The REP protocol is the "reply" side of a
// request-reply pair. This is useful for building RPC servers, for
// example.
-typedef struct rep_pipe rep_pipe;
-typedef struct rep_sock rep_sock;
+#ifndef NNI_PROTO_REQ_V0
+#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0)
+#endif
-static void rep_sock_getq_cb(void *);
-static void rep_pipe_getq_cb(void *);
-static void rep_pipe_putq_cb(void *);
-static void rep_pipe_send_cb(void *);
-static void rep_pipe_recv_cb(void *);
-static void rep_pipe_fini(void *);
+#ifndef NNI_PROTO_REP_V0
+#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1)
+#endif
-// A rep_sock is our per-socket protocol private structure.
-struct rep_sock {
+typedef struct rep0_pipe rep0_pipe;
+typedef struct rep0_sock rep0_sock;
+
+static void rep0_sock_getq_cb(void *);
+static void rep0_pipe_getq_cb(void *);
+static void rep0_pipe_putq_cb(void *);
+static void rep0_pipe_send_cb(void *);
+static void rep0_pipe_recv_cb(void *);
+static void rep0_pipe_fini(void *);
+
+// rep0_sock is our per-socket protocol private structure.
+struct rep0_sock {
nni_msgq * uwq;
nni_msgq * urq;
nni_mtx lk;
@@ -40,21 +49,21 @@ struct rep_sock {
nni_aio * aio_getq;
};
-// A rep_pipe is our per-pipe protocol private structure.
-struct rep_pipe {
- nni_pipe *pipe;
- rep_sock *rep;
- nni_msgq *sendq;
- nni_aio * aio_getq;
- nni_aio * aio_send;
- nni_aio * aio_recv;
- nni_aio * aio_putq;
+// rep0_pipe is our per-pipe protocol private structure.
+struct rep0_pipe {
+ nni_pipe * pipe;
+ rep0_sock *rep;
+ nni_msgq * sendq;
+ nni_aio * aio_getq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+ nni_aio * aio_putq;
};
static void
-rep_sock_fini(void *arg)
+rep0_sock_fini(void *arg)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
@@ -67,18 +76,18 @@ rep_sock_fini(void *arg)
}
static int
-rep_sock_init(void **sp, nni_sock *sock)
+rep0_sock_init(void **sp, nni_sock *sock)
{
- rep_sock *s;
- int rv;
+ rep0_sock *s;
+ int rv;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&s->lk);
if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
- ((rv = nni_aio_init(&s->aio_getq, rep_sock_getq_cb, s)) != 0)) {
- rep_sock_fini(s);
+ ((rv = nni_aio_init(&s->aio_getq, rep0_sock_getq_cb, s)) != 0)) {
+ rep0_sock_fini(s);
return (rv);
}
@@ -95,25 +104,25 @@ rep_sock_init(void **sp, nni_sock *sock)
}
static void
-rep_sock_open(void *arg)
+rep0_sock_open(void *arg)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-rep_sock_close(void *arg)
+rep0_sock_close(void *arg)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-rep_pipe_fini(void *arg)
+rep0_pipe_fini(void *arg)
{
- rep_pipe *p = arg;
+ rep0_pipe *p = arg;
nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
@@ -124,20 +133,20 @@ rep_pipe_fini(void *arg)
}
static int
-rep_pipe_init(void **pp, nni_pipe *pipe, void *s)
+rep0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- rep_pipe *p;
- int rv;
+ rep0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, rep_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, rep_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, rep_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, rep_pipe_putq_cb, p)) != 0)) {
- rep_pipe_fini(p);
+ ((rv = nni_aio_init(&p->aio_getq, rep0_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, rep0_pipe_putq_cb, p)) != 0)) {
+ rep0_pipe_fini(p);
return (rv);
}
@@ -148,11 +157,11 @@ rep_pipe_init(void **pp, nni_pipe *pipe, void *s)
}
static int
-rep_pipe_start(void *arg)
+rep0_pipe_start(void *arg)
{
- rep_pipe *p = arg;
- rep_sock *s = p->rep;
- int rv;
+ rep0_pipe *p = arg;
+ rep0_sock *s = p->rep;
+ int rv;
if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) {
return (rv);
@@ -164,10 +173,10 @@ rep_pipe_start(void *arg)
}
static void
-rep_pipe_stop(void *arg)
+rep0_pipe_stop(void *arg)
{
- rep_pipe *p = arg;
- rep_sock *s = p->rep;
+ rep0_pipe *p = arg;
+ rep0_sock *s = p->rep;
nni_msgq_close(p->sendq);
nni_aio_stop(p->aio_getq);
@@ -179,14 +188,14 @@ rep_pipe_stop(void *arg)
}
static void
-rep_sock_getq_cb(void *arg)
+rep0_sock_getq_cb(void *arg)
{
- rep_sock *s = arg;
- nni_msgq *uwq = s->uwq;
- nni_msg * msg;
- uint32_t id;
- rep_pipe *p;
- int rv;
+ rep0_sock *s = arg;
+ nni_msgq * uwq = s->uwq;
+ nni_msg * msg;
+ uint32_t id;
+ rep0_pipe *p;
+ int rv;
// This watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
@@ -228,9 +237,9 @@ rep_sock_getq_cb(void *arg)
}
static void
-rep_pipe_getq_cb(void *arg)
+rep0_pipe_getq_cb(void *arg)
{
- rep_pipe *p = arg;
+ rep0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->pipe);
@@ -244,9 +253,9 @@ rep_pipe_getq_cb(void *arg)
}
static void
-rep_pipe_send_cb(void *arg)
+rep0_pipe_send_cb(void *arg)
{
- rep_pipe *p = arg;
+ rep0_pipe *p = arg;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -259,14 +268,14 @@ rep_pipe_send_cb(void *arg)
}
static void
-rep_pipe_recv_cb(void *arg)
+rep0_pipe_recv_cb(void *arg)
{
- rep_pipe *p = arg;
- rep_sock *s = p->rep;
- nni_msg * msg;
- int rv;
- uint8_t * body;
- int hops;
+ rep0_pipe *p = arg;
+ rep0_sock *s = p->rep;
+ nni_msg * msg;
+ int rv;
+ uint8_t * body;
+ int hops;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->pipe);
@@ -329,9 +338,9 @@ drop:
}
static void
-rep_pipe_putq_cb(void *arg)
+rep0_pipe_putq_cb(void *arg)
{
- rep_pipe *p = arg;
+ rep0_pipe *p = arg;
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
@@ -344,10 +353,10 @@ rep_pipe_putq_cb(void *arg)
}
static int
-rep_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+rep0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- rep_sock *s = arg;
- int rv;
+ rep0_sock *s = arg;
+ int rv;
nni_mtx_lock(&s->lk);
rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
@@ -356,32 +365,32 @@ rep_sock_setopt_raw(void *arg, const void *buf, size_t sz)
}
static int
-rep_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+rep0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static int
-rep_sock_setopt_maxttl(void *arg, const void *buf, size_t sz)
+rep0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
return (nni_setopt_int(&s->ttl, buf, sz, 1, 255));
}
static int
-rep_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
+rep0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
return (nni_getopt_int(s->ttl, buf, szp));
}
static nni_msg *
-rep_sock_filter(void *arg, nni_msg *msg)
+rep0_sock_filter(void *arg, nni_msg *msg)
{
- rep_sock *s = arg;
- char * header;
- size_t len;
+ rep0_sock *s = arg;
+ char * header;
+ size_t len;
nni_mtx_lock(&s->lk);
if (s->raw) {
@@ -408,11 +417,11 @@ rep_sock_filter(void *arg, nni_msg *msg)
}
static void
-rep_sock_send(void *arg, nni_aio *aio)
+rep0_sock_send(void *arg, nni_aio *aio)
{
- rep_sock *s = arg;
- int rv;
- nni_msg * msg;
+ rep0_sock *s = arg;
+ int rv;
+ nni_msg * msg;
nni_mtx_lock(&s->lk);
if (s->raw) {
@@ -448,59 +457,59 @@ rep_sock_send(void *arg, nni_aio *aio)
}
static void
-rep_sock_recv(void *arg, nni_aio *aio)
+rep0_sock_recv(void *arg, nni_aio *aio)
{
- rep_sock *s = arg;
+ rep0_sock *s = arg;
nni_msgq_aio_get(s->urq, aio);
}
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe_ops rep_pipe_ops = {
- .pipe_init = rep_pipe_init,
- .pipe_fini = rep_pipe_fini,
- .pipe_start = rep_pipe_start,
- .pipe_stop = rep_pipe_stop,
+static nni_proto_pipe_ops rep0_pipe_ops = {
+ .pipe_init = rep0_pipe_init,
+ .pipe_fini = rep0_pipe_fini,
+ .pipe_start = rep0_pipe_start,
+ .pipe_stop = rep0_pipe_stop,
};
-static nni_proto_sock_option rep_sock_options[] = {
+static nni_proto_sock_option rep0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = rep_sock_getopt_raw,
- .pso_setopt = rep_sock_setopt_raw,
+ .pso_getopt = rep0_sock_getopt_raw,
+ .pso_setopt = rep0_sock_setopt_raw,
},
{
.pso_name = NNG_OPT_MAXTTL,
- .pso_getopt = rep_sock_getopt_maxttl,
- .pso_setopt = rep_sock_setopt_maxttl,
+ .pso_getopt = rep0_sock_getopt_maxttl,
+ .pso_setopt = rep0_sock_setopt_maxttl,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops rep_sock_ops = {
- .sock_init = rep_sock_init,
- .sock_fini = rep_sock_fini,
- .sock_open = rep_sock_open,
- .sock_close = rep_sock_close,
- .sock_options = rep_sock_options,
- .sock_filter = rep_sock_filter,
- .sock_send = rep_sock_send,
- .sock_recv = rep_sock_recv,
+static nni_proto_sock_ops rep0_sock_ops = {
+ .sock_init = rep0_sock_init,
+ .sock_fini = rep0_sock_fini,
+ .sock_open = rep0_sock_open,
+ .sock_close = rep0_sock_close,
+ .sock_options = rep0_sock_options,
+ .sock_filter = rep0_sock_filter,
+ .sock_send = rep0_sock_send,
+ .sock_recv = rep0_sock_recv,
};
-static nni_proto nni_rep_proto = {
+static nni_proto rep0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_REP_V0, "rep" },
.proto_peer = { NNI_PROTO_REQ_V0, "req" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &rep_sock_ops,
- .proto_pipe_ops = &rep_pipe_ops,
+ .proto_sock_ops = &rep0_sock_ops,
+ .proto_pipe_ops = &rep0_pipe_ops,
};
int
nng_rep0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_rep_proto));
+ return (nni_proto_open(sidp, &rep0_proto));
}
diff --git a/src/protocol/reqrep0/rep.h b/src/protocol/reqrep0/rep.h
new file mode 100644
index 00000000..93df9379
--- /dev/null
+++ b/src/protocol/reqrep0/rep.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_REQREP0_REP_H
+#define NNG_PROTOCOL_REQREP0_REP_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_rep0_open(nng_socket *);
+
+#ifndef nng_rep_open
+#define nng_rep_open nng_rep0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_REQREP0_REP_H
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep0/req.c
index bead1ec4..94c7f1a0 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -13,21 +13,28 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/reqrep0/req.h"
// Request protocol. The REQ protocol is the "request" side of a
// request-reply pair. This is useful for building RPC clients, for example.
-const char *nng_opt_req_resendtime = NNG_OPT_REQ_RESENDTIME;
+#ifndef NNI_PROTO_REQ_V0
+#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0)
+#endif
-typedef struct req_pipe req_pipe;
-typedef struct req_sock req_sock;
+#ifndef NNI_PROTO_REP_V0
+#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1)
+#endif
-static void req_resend(req_sock *);
-static void req_timeout(void *);
-static void req_pipe_fini(void *);
+typedef struct req0_pipe req0_pipe;
+typedef struct req0_sock req0_sock;
-// A req_sock is our per-socket protocol private structure.
-struct req_sock {
+static void req0_resend(req0_sock *);
+static void req0_timeout(void *);
+static void req0_pipe_fini(void *);
+
+// A req0_sock is our per-socket protocol private structure.
+struct req0_sock {
nni_msgq * uwq;
nni_msgq * urq;
nni_duration retry;
@@ -38,7 +45,7 @@ struct req_sock {
int ttl;
nni_msg * reqmsg;
- req_pipe *pendpipe;
+ req0_pipe *pendpipe;
nni_list readypipes;
nni_list busypipes;
@@ -51,10 +58,10 @@ struct req_sock {
nni_cv cv;
};
-// A req_pipe is our per-pipe protocol private structure.
-struct req_pipe {
+// A req0_pipe is our per-pipe protocol private structure.
+struct req0_pipe {
nni_pipe * pipe;
- req_sock * req;
+ req0_sock * req;
nni_list_node node;
nni_aio * aio_getq; // raw mode only
nni_aio * aio_sendraw; // raw mode only
@@ -64,17 +71,17 @@ struct req_pipe {
nni_mtx mtx;
};
-static void req_resender(void *);
-static void req_getq_cb(void *);
-static void req_sendraw_cb(void *);
-static void req_sendcooked_cb(void *);
-static void req_recv_cb(void *);
-static void req_putq_cb(void *);
+static void req0_resender(void *);
+static void req0_getq_cb(void *);
+static void req0_sendraw_cb(void *);
+static void req0_sendcooked_cb(void *);
+static void req0_recv_cb(void *);
+static void req0_putq_cb(void *);
static int
-req_sock_init(void **sp, nni_sock *sock)
+req0_sock_init(void **sp, nni_sock *sock)
{
- req_sock *s;
+ req0_sock *s;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
@@ -82,9 +89,9 @@ req_sock_init(void **sp, nni_sock *sock)
nni_mtx_init(&s->mtx);
nni_cv_init(&s->cv, &s->mtx);
- NNI_LIST_INIT(&s->readypipes, req_pipe, node);
- NNI_LIST_INIT(&s->busypipes, req_pipe, node);
- nni_timer_init(&s->timer, req_timeout, s);
+ NNI_LIST_INIT(&s->readypipes, req0_pipe, node);
+ NNI_LIST_INIT(&s->busypipes, req0_pipe, node);
+ nni_timer_init(&s->timer, req0_timeout, s);
// this is "semi random" start for request IDs.
s->nextid = nni_random();
@@ -102,15 +109,15 @@ req_sock_init(void **sp, nni_sock *sock)
}
static void
-req_sock_open(void *arg)
+req0_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-req_sock_close(void *arg)
+req0_sock_close(void *arg)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
nni_mtx_lock(&s->mtx);
s->closed = 1;
@@ -120,9 +127,9 @@ req_sock_close(void *arg)
}
static void
-req_sock_fini(void *arg)
+req0_sock_fini(void *arg)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
nni_mtx_lock(&s->mtx);
while ((!nni_list_empty(&s->readypipes)) ||
@@ -139,9 +146,9 @@ req_sock_fini(void *arg)
}
static void
-req_pipe_fini(void *arg)
+req0_pipe_fini(void *arg)
{
- req_pipe *p = arg;
+ req0_pipe *p = arg;
nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_putq);
@@ -153,22 +160,22 @@ req_pipe_fini(void *arg)
}
static int
-req_pipe_init(void **pp, nni_pipe *pipe, void *s)
+req0_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- req_pipe *p;
- int rv;
+ req0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);
- if (((rv = nni_aio_init(&p->aio_getq, req_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, req_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, req_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_sendraw, req_sendraw_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_sendcooked, req_sendcooked_cb, p)) !=
+ if (((rv = nni_aio_init(&p->aio_getq, req0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, req0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_sendraw, req0_sendraw_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_sendcooked, req0_sendcooked_cb, p)) !=
0)) {
- req_pipe_fini(p);
+ req0_pipe_fini(p);
return (rv);
}
@@ -180,10 +187,10 @@ req_pipe_init(void **pp, nni_pipe *pipe, void *s)
}
static int
-req_pipe_start(void *arg)
+req0_pipe_start(void *arg)
{
- req_pipe *p = arg;
- req_sock *s = p->req;
+ req0_pipe *p = arg;
+ req0_sock *s = p->req;
if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) {
return (NNG_EPROTO);
@@ -198,7 +205,7 @@ req_pipe_start(void *arg)
// If sock was waiting for somewhere to send data, go ahead and
// send it to this pipe.
if (s->wantw) {
- req_resend(s);
+ req0_resend(s);
}
nni_mtx_unlock(&s->mtx);
@@ -208,10 +215,10 @@ req_pipe_start(void *arg)
}
static void
-req_pipe_stop(void *arg)
+req0_pipe_stop(void *arg)
{
- req_pipe *p = arg;
- req_sock *s = p->req;
+ req0_pipe *p = arg;
+ req0_sock *s = p->req;
nni_aio_stop(p->aio_getq);
nni_aio_stop(p->aio_putq);
@@ -238,50 +245,50 @@ req_pipe_stop(void *arg)
s->pendpipe = NULL;
s->resend = NNI_TIME_ZERO;
s->wantw = 1;
- req_resend(s);
+ req0_resend(s);
}
nni_mtx_unlock(&s->mtx);
}
static int
-req_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+req0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
-req_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+req0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static int
-req_sock_setopt_maxttl(void *arg, const void *buf, size_t sz)
+req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
return (nni_setopt_int(&s->ttl, buf, sz, 1, 255));
}
static int
-req_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
+req0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
return (nni_getopt_int(s->ttl, buf, szp));
}
static int
-req_sock_setopt_resendtime(void *arg, const void *buf, size_t sz)
+req0_sock_setopt_resendtime(void *arg, const void *buf, size_t sz)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
return (nni_setopt_ms(&s->retry, buf, sz));
}
static int
-req_sock_getopt_resendtime(void *arg, void *buf, size_t *szp)
+req0_sock_getopt_resendtime(void *arg, void *buf, size_t *szp)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
return (nni_getopt_ms(s->retry, buf, szp));
}
@@ -302,10 +309,10 @@ req_sock_getopt_resendtime(void *arg, void *buf, size_t *szp)
// kind of priority.)
static void
-req_getq_cb(void *arg)
+req0_getq_cb(void *arg)
{
- req_pipe *p = arg;
- req_sock *s = p->req;
+ req0_pipe *p = arg;
+ req0_sock *s = p->req;
// We should be in RAW mode. Cooked mode traffic bypasses
// the upper write queue entirely, and should never end up here.
@@ -326,9 +333,9 @@ req_getq_cb(void *arg)
}
static void
-req_sendraw_cb(void *arg)
+req0_sendraw_cb(void *arg)
{
- req_pipe *p = arg;
+ req0_pipe *p = arg;
if (nni_aio_result(p->aio_sendraw) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_sendraw));
@@ -342,10 +349,10 @@ req_sendraw_cb(void *arg)
}
static void
-req_sendcooked_cb(void *arg)
+req0_sendcooked_cb(void *arg)
{
- req_pipe *p = arg;
- req_sock *s = p->req;
+ req0_pipe *p = arg;
+ req0_sock *s = p->req;
if (nni_aio_result(p->aio_sendcooked) != 0) {
// We failed to send... clean up and deal with it.
@@ -365,7 +372,7 @@ req_sendcooked_cb(void *arg)
if (nni_list_active(&s->busypipes, p)) {
nni_list_remove(&s->busypipes, p);
nni_list_append(&s->readypipes, p);
- req_resend(s);
+ req0_resend(s);
} else {
// We wind up here if stop was called from the reader
// side while we were waiting to be scheduled to run for the
@@ -377,9 +384,9 @@ req_sendcooked_cb(void *arg)
}
static void
-req_putq_cb(void *arg)
+req0_putq_cb(void *arg)
{
- req_pipe *p = arg;
+ req0_pipe *p = arg;
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
@@ -393,10 +400,10 @@ req_putq_cb(void *arg)
}
static void
-req_recv_cb(void *arg)
+req0_recv_cb(void *arg)
{
- req_pipe *p = arg;
- nni_msg * msg;
+ req0_pipe *p = arg;
+ nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->pipe);
@@ -431,23 +438,23 @@ malformed:
}
static void
-req_timeout(void *arg)
+req0_timeout(void *arg)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
nni_mtx_lock(&s->mtx);
if (s->reqmsg != NULL) {
s->wantw = 1;
- req_resend(s);
+ req0_resend(s);
}
nni_mtx_unlock(&s->mtx);
}
static void
-req_resend(req_sock *s)
+req0_resend(req0_sock *s)
{
- req_pipe *p;
- nni_msg * msg;
+ req0_pipe *p;
+ nni_msg * msg;
// Note: This routine should be called with the socket lock held.
// Also, this should only be called while handling cooked mode
@@ -499,13 +506,13 @@ req_resend(req_sock *s)
}
static void
-req_sock_send(void *arg, nni_aio *aio)
+req0_sock_send(void *arg, nni_aio *aio)
{
- req_sock *s = arg;
- uint32_t id;
- size_t len;
- nni_msg * msg;
- int rv;
+ req0_sock *s = arg;
+ uint32_t id;
+ size_t len;
+ nni_msg * msg;
+ int rv;
nni_mtx_lock(&s->mtx);
if (s->raw) {
@@ -547,7 +554,7 @@ req_sock_send(void *arg, nni_aio *aio)
s->resend = NNI_TIME_ZERO;
s->wantw = 1;
- req_resend(s);
+ req0_resend(s);
nni_mtx_unlock(&s->mtx);
@@ -555,10 +562,10 @@ req_sock_send(void *arg, nni_aio *aio)
}
static nni_msg *
-req_sock_filter(void *arg, nni_msg *msg)
+req0_sock_filter(void *arg, nni_msg *msg)
{
- req_sock *s = arg;
- nni_msg * rmsg;
+ req0_sock *s = arg;
+ nni_msg * rmsg;
nni_mtx_lock(&s->mtx);
if (s->raw) {
@@ -598,9 +605,9 @@ req_sock_filter(void *arg, nni_msg *msg)
}
static void
-req_sock_recv(void *arg, nni_aio *aio)
+req0_sock_recv(void *arg, nni_aio *aio)
{
- req_sock *s = arg;
+ req0_sock *s = arg;
nni_mtx_lock(&s->mtx);
if (!s->raw) {
@@ -614,55 +621,55 @@ req_sock_recv(void *arg, nni_aio *aio)
nni_msgq_aio_get(s->urq, aio);
}
-static nni_proto_pipe_ops req_pipe_ops = {
- .pipe_init = req_pipe_init,
- .pipe_fini = req_pipe_fini,
- .pipe_start = req_pipe_start,
- .pipe_stop = req_pipe_stop,
+static nni_proto_pipe_ops req0_pipe_ops = {
+ .pipe_init = req0_pipe_init,
+ .pipe_fini = req0_pipe_fini,
+ .pipe_start = req0_pipe_start,
+ .pipe_stop = req0_pipe_stop,
};
-static nni_proto_sock_option req_sock_options[] = {
+static nni_proto_sock_option req0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = req_sock_getopt_raw,
- .pso_setopt = req_sock_setopt_raw,
+ .pso_getopt = req0_sock_getopt_raw,
+ .pso_setopt = req0_sock_setopt_raw,
},
{
.pso_name = NNG_OPT_MAXTTL,
- .pso_getopt = req_sock_getopt_maxttl,
- .pso_setopt = req_sock_setopt_maxttl,
+ .pso_getopt = req0_sock_getopt_maxttl,
+ .pso_setopt = req0_sock_setopt_maxttl,
},
{
.pso_name = NNG_OPT_REQ_RESENDTIME,
- .pso_getopt = req_sock_getopt_resendtime,
- .pso_setopt = req_sock_setopt_resendtime,
+ .pso_getopt = req0_sock_getopt_resendtime,
+ .pso_setopt = req0_sock_setopt_resendtime,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops req_sock_ops = {
- .sock_init = req_sock_init,
- .sock_fini = req_sock_fini,
- .sock_open = req_sock_open,
- .sock_close = req_sock_close,
- .sock_options = req_sock_options,
- .sock_filter = req_sock_filter,
- .sock_send = req_sock_send,
- .sock_recv = req_sock_recv,
+static nni_proto_sock_ops req0_sock_ops = {
+ .sock_init = req0_sock_init,
+ .sock_fini = req0_sock_fini,
+ .sock_open = req0_sock_open,
+ .sock_close = req0_sock_close,
+ .sock_options = req0_sock_options,
+ .sock_filter = req0_sock_filter,
+ .sock_send = req0_sock_send,
+ .sock_recv = req0_sock_recv,
};
-static nni_proto req_proto = {
+static nni_proto req0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_REQ_V0, "req" },
.proto_peer = { NNI_PROTO_REP_V0, "rep" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &req_sock_ops,
- .proto_pipe_ops = &req_pipe_ops,
+ .proto_sock_ops = &req0_sock_ops,
+ .proto_pipe_ops = &req0_pipe_ops,
};
int
nng_req0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &req_proto));
+ return (nni_proto_open(sidp, &req0_proto));
}
diff --git a/src/protocol/reqrep0/req.h b/src/protocol/reqrep0/req.h
new file mode 100644
index 00000000..99c9bf62
--- /dev/null
+++ b/src/protocol/reqrep0/req.h
@@ -0,0 +1,30 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_REQREP0_REQ_H
+#define NNG_PROTOCOL_REQREP0_REQ_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_req0_open(nng_socket *);
+
+#ifndef nng_req_open
+#define nng_req_open nng_req0_open
+#endif
+
+#define NNG_OPT_REQ_RESENDTIME "req:resend-time"
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_REQREP0_REQ_H
diff --git a/src/protocol/survey0/CMakeLists.txt b/src/protocol/survey0/CMakeLists.txt
new file mode 100644
index 00000000..61e5aa7b
--- /dev/null
+++ b/src/protocol/survey0/CMakeLists.txt
@@ -0,0 +1,23 @@
+#
+# Copyright 2017 Garrett D'Amore <garrett@damore.org>
+# Copyright 2017 Capitar IT Group BV <info@capitar.com>
+#
+# This software is supplied under the terms of the MIT License, a
+# copy of which should be located in the distribution where this
+# file was obtained (LICENSE.txt). A copy of the license may also be
+# found online at https://opensource.org/licenses/MIT.
+#
+
+# Req/Rep protocol
+
+if (NNG_PROTO_SURVEYOR0)
+ set(SURV0_SOURCES protocol/survey0/survey.c protocol/survey0/survey.h)
+ install(FILES survey.h DESTINATION include/nng/protocol/survey0)
+endif()
+
+if (NNG_PROTO_RESPONDENT0)
+ set(RESP0_SOURCES protocol/survey0/respond.c protocol/survey0/respond.h)
+ install(FILES respond.h DESTINATION include/nng/protocol/survey0)
+endif()
+
+set(NNG_SOURCES ${NNG_SOURCES} ${SURV0_SOURCES} ${RESP0_SOURCES} PARENT_SCOPE) \ No newline at end of file
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey0/respond.c
index 94730be6..73e919c3 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -12,23 +12,32 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/survey0/respond.h"
// Respondent protocol. The RESPONDENT protocol is the "replier" side of
// the surveyor pattern. This is useful for building service discovery, or
-// voting algorithsm, for example.
+// voting algorithms, for example.
-typedef struct resp_pipe resp_pipe;
-typedef struct resp_sock resp_sock;
+#ifndef NNI_PROTO_SURVEYOR_V0
+#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2)
+#endif
-static void resp_recv_cb(void *);
-static void resp_putq_cb(void *);
-static void resp_getq_cb(void *);
-static void resp_send_cb(void *);
-static void resp_sock_getq_cb(void *);
-static void resp_pipe_fini(void *);
+#ifndef NNI_PROTO_RESPONDENT_V0
+#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3)
+#endif
-// A resp_sock is our per-socket protocol private structure.
-struct resp_sock {
+typedef struct resp0_pipe resp0_pipe;
+typedef struct resp0_sock resp0_sock;
+
+static void resp0_recv_cb(void *);
+static void resp0_putq_cb(void *);
+static void resp0_getq_cb(void *);
+static void resp0_send_cb(void *);
+static void resp0_sock_getq_cb(void *);
+static void resp0_pipe_fini(void *);
+
+// resp0_sock is our per-socket protocol private structure.
+struct resp0_sock {
nni_msgq * urq;
nni_msgq * uwq;
int raw;
@@ -40,22 +49,22 @@ struct resp_sock {
nni_mtx mtx;
};
-// A resp_pipe is our per-pipe protocol private structure.
-struct resp_pipe {
- nni_pipe * npipe;
- resp_sock *psock;
- uint32_t id;
- nni_msgq * sendq;
- nni_aio * aio_getq;
- nni_aio * aio_putq;
- nni_aio * aio_send;
- nni_aio * aio_recv;
+// resp0_pipe is our per-pipe protocol private structure.
+struct resp0_pipe {
+ nni_pipe * npipe;
+ resp0_sock *psock;
+ uint32_t id;
+ nni_msgq * sendq;
+ nni_aio * aio_getq;
+ nni_aio * aio_putq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
};
static void
-resp_sock_fini(void *arg)
+resp0_sock_fini(void *arg)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
@@ -68,18 +77,18 @@ resp_sock_fini(void *arg)
}
static int
-resp_sock_init(void **sp, nni_sock *nsock)
+resp0_sock_init(void **sp, nni_sock *nsock)
{
- resp_sock *s;
- int rv;
+ resp0_sock *s;
+ int rv;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&s->mtx);
if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
- ((rv = nni_aio_init(&s->aio_getq, resp_sock_getq_cb, s)) != 0)) {
- resp_sock_fini(s);
+ ((rv = nni_aio_init(&s->aio_getq, resp0_sock_getq_cb, s)) != 0)) {
+ resp0_sock_fini(s);
return (rv);
}
@@ -95,25 +104,25 @@ resp_sock_init(void **sp, nni_sock *nsock)
}
static void
-resp_sock_open(void *arg)
+resp0_sock_open(void *arg)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-resp_sock_close(void *arg)
+resp0_sock_close(void *arg)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-resp_pipe_fini(void *arg)
+resp0_pipe_fini(void *arg)
{
- resp_pipe *p = arg;
+ resp0_pipe *p = arg;
nni_aio_fini(p->aio_putq);
nni_aio_fini(p->aio_getq);
@@ -124,20 +133,20 @@ resp_pipe_fini(void *arg)
}
static int
-resp_pipe_init(void **pp, nni_pipe *npipe, void *s)
+resp0_pipe_init(void **pp, nni_pipe *npipe, void *s)
{
- resp_pipe *p;
- int rv;
+ resp0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, resp_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, resp_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, resp_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, resp_send_cb, p)) != 0)) {
- resp_pipe_fini(p);
+ ((rv = nni_aio_init(&p->aio_putq, resp0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, resp0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, resp0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, resp0_send_cb, p)) != 0)) {
+ resp0_pipe_fini(p);
return (rv);
}
@@ -148,11 +157,11 @@ resp_pipe_init(void **pp, nni_pipe *npipe, void *s)
}
static int
-resp_pipe_start(void *arg)
+resp0_pipe_start(void *arg)
{
- resp_pipe *p = arg;
- resp_sock *s = p->psock;
- int rv;
+ resp0_pipe *p = arg;
+ resp0_sock *s = p->psock;
+ int rv;
p->id = nni_pipe_id(p->npipe);
@@ -170,10 +179,10 @@ resp_pipe_start(void *arg)
}
static void
-resp_pipe_stop(void *arg)
+resp0_pipe_stop(void *arg)
{
- resp_pipe *p = arg;
- resp_sock *s = p->psock;
+ resp0_pipe *p = arg;
+ resp0_sock *s = p->psock;
nni_msgq_close(p->sendq);
nni_aio_stop(p->aio_putq);
@@ -189,19 +198,19 @@ resp_pipe_stop(void *arg)
}
}
-// resp_sock_send watches for messages from the upper write queue,
+// resp0_sock_send watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
// destination pipe via a separate queue. This prevents a single bad
// or slow pipe from gumming up the works for the entire socket.s
void
-resp_sock_getq_cb(void *arg)
+resp0_sock_getq_cb(void *arg)
{
- resp_sock *s = arg;
- nni_msg * msg;
- uint32_t id;
- resp_pipe *p;
- int rv;
+ resp0_sock *s = arg;
+ nni_msg * msg;
+ uint32_t id;
+ resp0_pipe *p;
+ int rv;
if (nni_aio_result(s->aio_getq) != 0) {
return;
@@ -233,9 +242,9 @@ resp_sock_getq_cb(void *arg)
}
void
-resp_getq_cb(void *arg)
+resp0_getq_cb(void *arg)
{
- resp_pipe *p = arg;
+ resp0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->npipe);
@@ -249,9 +258,9 @@ resp_getq_cb(void *arg)
}
void
-resp_send_cb(void *arg)
+resp0_send_cb(void *arg)
{
- resp_pipe *p = arg;
+ resp0_pipe *p = arg;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -264,14 +273,14 @@ resp_send_cb(void *arg)
}
static void
-resp_recv_cb(void *arg)
+resp0_recv_cb(void *arg)
{
- resp_pipe *p = arg;
- resp_sock *s = p->psock;
- nni_msgq * urq = s->urq;
- nni_msg * msg;
- int hops;
- int rv;
+ resp0_pipe *p = arg;
+ resp0_sock *s = p->psock;
+ nni_msgq * urq = s->urq;
+ nni_msg * msg;
+ int hops;
+ int rv;
if (nni_aio_result(p->aio_recv) != 0) {
goto error;
@@ -324,9 +333,9 @@ error:
}
static void
-resp_putq_cb(void *arg)
+resp0_putq_cb(void *arg)
{
- resp_pipe *p = arg;
+ resp0_pipe *p = arg;
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
@@ -338,10 +347,10 @@ resp_putq_cb(void *arg)
}
static int
-resp_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+resp0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- resp_sock *s = arg;
- int rv;
+ resp0_sock *s = arg;
+ int rv;
nni_mtx_lock(&s->mtx);
rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
@@ -350,32 +359,32 @@ resp_sock_setopt_raw(void *arg, const void *buf, size_t sz)
}
static int
-resp_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+resp0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static int
-resp_sock_setopt_maxttl(void *arg, const void *buf, size_t sz)
+resp0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
return (nni_setopt_int(&s->ttl, buf, sz, 1, 255));
}
static int
-resp_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
+resp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
return (nni_getopt_int(s->ttl, buf, szp));
}
static void
-resp_sock_send(void *arg, nni_aio *aio)
+resp0_sock_send(void *arg, nni_aio *aio)
{
- resp_sock *s = arg;
- nni_msg * msg;
- int rv;
+ resp0_sock *s = arg;
+ nni_msg * msg;
+ int rv;
nni_mtx_lock(&s->mtx);
if (s->raw) {
@@ -412,11 +421,11 @@ resp_sock_send(void *arg, nni_aio *aio)
}
static nni_msg *
-resp_sock_filter(void *arg, nni_msg *msg)
+resp0_sock_filter(void *arg, nni_msg *msg)
{
- resp_sock *s = arg;
- char * header;
- size_t len;
+ resp0_sock *s = arg;
+ char * header;
+ size_t len;
nni_mtx_lock(&s->mtx);
if (s->raw) {
@@ -444,57 +453,57 @@ resp_sock_filter(void *arg, nni_msg *msg)
}
static void
-resp_sock_recv(void *arg, nni_aio *aio)
+resp0_sock_recv(void *arg, nni_aio *aio)
{
- resp_sock *s = arg;
+ resp0_sock *s = arg;
nni_msgq_aio_get(s->urq, aio);
}
-static nni_proto_pipe_ops resp_pipe_ops = {
- .pipe_init = resp_pipe_init,
- .pipe_fini = resp_pipe_fini,
- .pipe_start = resp_pipe_start,
- .pipe_stop = resp_pipe_stop,
+static nni_proto_pipe_ops resp0_pipe_ops = {
+ .pipe_init = resp0_pipe_init,
+ .pipe_fini = resp0_pipe_fini,
+ .pipe_start = resp0_pipe_start,
+ .pipe_stop = resp0_pipe_stop,
};
-static nni_proto_sock_option resp_sock_options[] = {
+static nni_proto_sock_option resp0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = resp_sock_getopt_raw,
- .pso_setopt = resp_sock_setopt_raw,
+ .pso_getopt = resp0_sock_getopt_raw,
+ .pso_setopt = resp0_sock_setopt_raw,
},
{
.pso_name = NNG_OPT_MAXTTL,
- .pso_getopt = resp_sock_getopt_maxttl,
- .pso_setopt = resp_sock_setopt_maxttl,
+ .pso_getopt = resp0_sock_getopt_maxttl,
+ .pso_setopt = resp0_sock_setopt_maxttl,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops resp_sock_ops = {
- .sock_init = resp_sock_init,
- .sock_fini = resp_sock_fini,
- .sock_open = resp_sock_open,
- .sock_close = resp_sock_close,
- .sock_filter = resp_sock_filter,
- .sock_send = resp_sock_send,
- .sock_recv = resp_sock_recv,
- .sock_options = resp_sock_options,
+static nni_proto_sock_ops resp0_sock_ops = {
+ .sock_init = resp0_sock_init,
+ .sock_fini = resp0_sock_fini,
+ .sock_open = resp0_sock_open,
+ .sock_close = resp0_sock_close,
+ .sock_filter = resp0_sock_filter,
+ .sock_send = resp0_sock_send,
+ .sock_recv = resp0_sock_recv,
+ .sock_options = resp0_sock_options,
};
-static nni_proto resp_proto = {
+static nni_proto resp0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" },
.proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &resp_sock_ops,
- .proto_pipe_ops = &resp_pipe_ops,
+ .proto_sock_ops = &resp0_sock_ops,
+ .proto_pipe_ops = &resp0_pipe_ops,
};
int
nng_respondent0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &resp_proto));
+ return (nni_proto_open(sidp, &resp0_proto));
}
diff --git a/src/protocol/survey0/respond.h b/src/protocol/survey0/respond.h
new file mode 100644
index 00000000..58c65298
--- /dev/null
+++ b/src/protocol/survey0/respond.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_SURVEY0_RESPOND_H
+#define NNG_PROTOCOL_SURVEY0_RESPOND_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_respondent0_open(nng_socket *);
+
+#ifndef nng_respondent_open
+#define nng_respondent_open nng_respondent0_open
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_SURVEY0_RESPOND_H
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey0/survey.c
index 9dcd6664..02283436 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -12,22 +12,31 @@
#include <string.h>
#include "core/nng_impl.h"
+#include "protocol/survey0/survey.h"
// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the
// survey pattern. This is useful for building service discovery, voting, etc.
-typedef struct surv_pipe surv_pipe;
-typedef struct surv_sock surv_sock;
+#ifndef NNI_PROTO_SURVEYOR_V0
+#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2)
+#endif
-static void surv_sock_getq_cb(void *);
-static void surv_getq_cb(void *);
-static void surv_putq_cb(void *);
-static void surv_send_cb(void *);
-static void surv_recv_cb(void *);
-static void surv_timeout(void *);
+#ifndef NNI_PROTO_RESPONDENT_V0
+#define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3)
+#endif
-// A surv_sock is our per-socket protocol private structure.
-struct surv_sock {
+typedef struct surv0_pipe surv0_pipe;
+typedef struct surv0_sock surv0_sock;
+
+static void surv0_sock_getq_cb(void *);
+static void surv0_getq_cb(void *);
+static void surv0_putq_cb(void *);
+static void surv0_send_cb(void *);
+static void surv0_recv_cb(void *);
+static void surv0_timeout(void *);
+
+// surv0_sock is our per-socket protocol private structure.
+struct surv0_sock {
nni_duration survtime;
nni_time expire;
int raw;
@@ -42,10 +51,10 @@ struct surv_sock {
nni_mtx mtx;
};
-// A surv_pipe is our per-pipe protocol private structure.
-struct surv_pipe {
+// surv0_pipe is our per-pipe protocol private structure.
+struct surv0_pipe {
nni_pipe * npipe;
- surv_sock * psock;
+ surv0_sock * psock;
nni_msgq * sendq;
nni_list_node node;
nni_aio * aio_getq;
@@ -55,9 +64,9 @@ struct surv_pipe {
};
static void
-surv_sock_fini(void *arg)
+surv0_sock_fini(void *arg)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
@@ -66,21 +75,21 @@ surv_sock_fini(void *arg)
}
static int
-surv_sock_init(void **sp, nni_sock *nsock)
+surv0_sock_init(void **sp, nni_sock *nsock)
{
- surv_sock *s;
- int rv;
+ surv0_sock *s;
+ int rv;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_aio_init(&s->aio_getq, surv_sock_getq_cb, s)) != 0) {
- surv_sock_fini(s);
+ if ((rv = nni_aio_init(&s->aio_getq, surv0_sock_getq_cb, s)) != 0) {
+ surv0_sock_fini(s);
return (rv);
}
- NNI_LIST_INIT(&s->pipes, surv_pipe, node);
+ NNI_LIST_INIT(&s->pipes, surv0_pipe, node);
nni_mtx_init(&s->mtx);
- nni_timer_init(&s->timer, surv_timeout, s);
+ nni_timer_init(&s->timer, surv0_timeout, s);
s->nextid = nni_random();
s->raw = 0;
@@ -94,26 +103,26 @@ surv_sock_init(void **sp, nni_sock *nsock)
}
static void
-surv_sock_open(void *arg)
+surv0_sock_open(void *arg)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-surv_sock_close(void *arg)
+surv0_sock_close(void *arg)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
nni_timer_cancel(&s->timer);
nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-surv_pipe_fini(void *arg)
+surv0_pipe_fini(void *arg)
{
- surv_pipe *p = arg;
+ surv0_pipe *p = arg;
nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
@@ -124,21 +133,21 @@ surv_pipe_fini(void *arg)
}
static int
-surv_pipe_init(void **pp, nni_pipe *npipe, void *s)
+surv0_pipe_init(void **pp, nni_pipe *npipe, void *s)
{
- surv_pipe *p;
- int rv;
+ surv0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
// This depth could be tunable.
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, surv_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, surv_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, surv_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, surv_recv_cb, p)) != 0)) {
- surv_pipe_fini(p);
+ ((rv = nni_aio_init(&p->aio_getq, surv0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, surv0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, surv0_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, surv0_recv_cb, p)) != 0)) {
+ surv0_pipe_fini(p);
return (rv);
}
@@ -149,10 +158,10 @@ surv_pipe_init(void **pp, nni_pipe *npipe, void *s)
}
static int
-surv_pipe_start(void *arg)
+surv0_pipe_start(void *arg)
{
- surv_pipe *p = arg;
- surv_sock *s = p->psock;
+ surv0_pipe *p = arg;
+ surv0_sock *s = p->psock;
nni_mtx_lock(&s->mtx);
nni_list_append(&s->pipes, p);
@@ -164,10 +173,10 @@ surv_pipe_start(void *arg)
}
static void
-surv_pipe_stop(void *arg)
+surv0_pipe_stop(void *arg)
{
- surv_pipe *p = arg;
- surv_sock *s = p->psock;
+ surv0_pipe *p = arg;
+ surv0_sock *s = p->psock;
nni_aio_stop(p->aio_getq);
nni_aio_stop(p->aio_send);
@@ -184,9 +193,9 @@ surv_pipe_stop(void *arg)
}
static void
-surv_getq_cb(void *arg)
+surv0_getq_cb(void *arg)
{
- surv_pipe *p = arg;
+ surv0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->npipe);
@@ -200,9 +209,9 @@ surv_getq_cb(void *arg)
}
static void
-surv_send_cb(void *arg)
+surv0_send_cb(void *arg)
{
- surv_pipe *p = arg;
+ surv0_pipe *p = arg;
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
@@ -215,9 +224,9 @@ surv_send_cb(void *arg)
}
static void
-surv_putq_cb(void *arg)
+surv0_putq_cb(void *arg)
{
- surv_pipe *p = arg;
+ surv0_pipe *p = arg;
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
@@ -230,10 +239,10 @@ surv_putq_cb(void *arg)
}
static void
-surv_recv_cb(void *arg)
+surv0_recv_cb(void *arg)
{
- surv_pipe *p = arg;
- nni_msg * msg;
+ surv0_pipe *p = arg;
+ nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
goto failed;
@@ -265,10 +274,10 @@ failed:
}
static int
-surv_sock_setopt_raw(void *arg, const void *buf, size_t sz)
+surv0_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
- surv_sock *s = arg;
- int rv;
+ surv0_sock *s = arg;
+ int rv;
nni_mtx_lock(&s->mtx);
if ((rv = nni_setopt_int(&s->raw, buf, sz, 0, 1)) == 0) {
@@ -280,33 +289,33 @@ surv_sock_setopt_raw(void *arg, const void *buf, size_t sz)
}
static int
-surv_sock_getopt_raw(void *arg, void *buf, size_t *szp)
+surv0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
return (nni_getopt_int(s->raw, buf, szp));
}
static int
-surv_sock_setopt_surveytime(void *arg, const void *buf, size_t sz)
+surv0_sock_setopt_surveytime(void *arg, const void *buf, size_t sz)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
return (nni_setopt_ms(&s->survtime, buf, sz));
}
static int
-surv_sock_getopt_surveytime(void *arg, void *buf, size_t *szp)
+surv0_sock_getopt_surveytime(void *arg, void *buf, size_t *szp)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
return (nni_getopt_ms(s->survtime, buf, szp));
}
static void
-surv_sock_getq_cb(void *arg)
+surv0_sock_getq_cb(void *arg)
{
- surv_sock *s = arg;
- surv_pipe *p;
- surv_pipe *last;
- nni_msg * msg, *dup;
+ surv0_sock *s = arg;
+ surv0_pipe *p;
+ surv0_pipe *last;
+ nni_msg * msg, *dup;
if (nni_aio_result(s->aio_getq) != 0) {
// Should be NNG_ECLOSED.
@@ -338,9 +347,9 @@ surv_sock_getq_cb(void *arg)
}
static void
-surv_timeout(void *arg)
+surv0_timeout(void *arg)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
nni_mtx_lock(&s->mtx);
s->survid = 0;
@@ -349,9 +358,9 @@ surv_timeout(void *arg)
}
static void
-surv_sock_recv(void *arg, nni_aio *aio)
+surv0_sock_recv(void *arg, nni_aio *aio)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
nni_mtx_lock(&s->mtx);
if (s->survid == 0) {
@@ -364,11 +373,11 @@ surv_sock_recv(void *arg, nni_aio *aio)
}
static void
-surv_sock_send(void *arg, nni_aio *aio)
+surv0_sock_send(void *arg, nni_aio *aio)
{
- surv_sock *s = arg;
- nni_msg * msg;
- int rv;
+ surv0_sock *s = arg;
+ nni_msg * msg;
+ int rv;
nni_mtx_lock(&s->mtx);
if (s->raw) {
@@ -404,9 +413,9 @@ surv_sock_send(void *arg, nni_aio *aio)
}
static nni_msg *
-surv_sock_filter(void *arg, nni_msg *msg)
+surv0_sock_filter(void *arg, nni_msg *msg)
{
- surv_sock *s = arg;
+ surv0_sock *s = arg;
nni_mtx_lock(&s->mtx);
if (s->raw) {
@@ -426,50 +435,50 @@ surv_sock_filter(void *arg, nni_msg *msg)
return (msg);
}
-static nni_proto_pipe_ops surv_pipe_ops = {
- .pipe_init = surv_pipe_init,
- .pipe_fini = surv_pipe_fini,
- .pipe_start = surv_pipe_start,
- .pipe_stop = surv_pipe_stop,
+static nni_proto_pipe_ops surv0_pipe_ops = {
+ .pipe_init = surv0_pipe_init,
+ .pipe_fini = surv0_pipe_fini,
+ .pipe_start = surv0_pipe_start,
+ .pipe_stop = surv0_pipe_stop,
};
-static nni_proto_sock_option surv_sock_options[] = {
+static nni_proto_sock_option surv0_sock_options[] = {
{
.pso_name = NNG_OPT_RAW,
- .pso_getopt = surv_sock_getopt_raw,
- .pso_setopt = surv_sock_setopt_raw,
+ .pso_getopt = surv0_sock_getopt_raw,
+ .pso_setopt = surv0_sock_setopt_raw,
},
{
.pso_name = NNG_OPT_SURVEYOR_SURVEYTIME,
- .pso_getopt = surv_sock_getopt_surveytime,
- .pso_setopt = surv_sock_setopt_surveytime,
+ .pso_getopt = surv0_sock_getopt_surveytime,
+ .pso_setopt = surv0_sock_setopt_surveytime,
},
// terminate list
{ NULL, NULL, NULL },
};
-static nni_proto_sock_ops surv_sock_ops = {
- .sock_init = surv_sock_init,
- .sock_fini = surv_sock_fini,
- .sock_open = surv_sock_open,
- .sock_close = surv_sock_close,
- .sock_send = surv_sock_send,
- .sock_recv = surv_sock_recv,
- .sock_filter = surv_sock_filter,
- .sock_options = surv_sock_options,
+static nni_proto_sock_ops surv0_sock_ops = {
+ .sock_init = surv0_sock_init,
+ .sock_fini = surv0_sock_fini,
+ .sock_open = surv0_sock_open,
+ .sock_close = surv0_sock_close,
+ .sock_send = surv0_sock_send,
+ .sock_recv = surv0_sock_recv,
+ .sock_filter = surv0_sock_filter,
+ .sock_options = surv0_sock_options,
};
-static nni_proto surv_proto = {
+static nni_proto surv0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
.proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &surv_sock_ops,
- .proto_pipe_ops = &surv_pipe_ops,
+ .proto_sock_ops = &surv0_sock_ops,
+ .proto_pipe_ops = &surv0_pipe_ops,
};
int
nng_surveyor0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &surv_proto));
+ return (nni_proto_open(sidp, &surv0_proto));
}
diff --git a/src/protocol/survey0/survey.h b/src/protocol/survey0/survey.h
new file mode 100644
index 00000000..a7b6d943
--- /dev/null
+++ b/src/protocol/survey0/survey.h
@@ -0,0 +1,30 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef NNG_PROTOCOL_SURVEY0_SURVEY_H
+#define NNG_PROTOCOL_SURVEY0_SURVEY_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+NNG_DECL int nng_surveyor0_open(nng_socket *);
+
+#ifndef nng_surveyor_open
+#define nng_surveyor_open nng_surveyor0_open
+#endif
+
+#define NNG_OPT_SURVEYOR_SURVEYTIME "surveyor:survey-time"
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // NNG_PROTOCOL_SURVEY0_SURVEY_H