summaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/bus0/bus.c37
-rw-r--r--src/protocol/bus0/bus.h10
-rw-r--r--src/protocol/pair0/pair.c37
-rw-r--r--src/protocol/pair0/pair.h10
-rw-r--r--src/protocol/pair1/pair.c65
-rw-r--r--src/protocol/pair1/pair.h9
-rw-r--r--src/protocol/pipeline0/pull.c36
-rw-r--r--src/protocol/pipeline0/pull.h9
-rw-r--r--src/protocol/pipeline0/push.c37
-rw-r--r--src/protocol/pipeline0/push.h9
-rw-r--r--src/protocol/pubsub0/pub.c37
-rw-r--r--src/protocol/pubsub0/pub.h9
-rw-r--r--src/protocol/pubsub0/sub.c57
-rw-r--r--src/protocol/pubsub0/sub.h10
-rw-r--r--src/protocol/reqrep0/rep.c70
-rw-r--r--src/protocol/reqrep0/rep.h9
-rw-r--r--src/protocol/reqrep0/req.c97
-rw-r--r--src/protocol/reqrep0/req.h8
-rw-r--r--src/protocol/survey0/respond.c68
-rw-r--r--src/protocol/survey0/respond.h9
-rw-r--r--src/protocol/survey0/survey.c76
-rw-r--r--src/protocol/survey0/survey.h9
22 files changed, 381 insertions, 337 deletions
diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c
index 57fad341..2a2a1228 100644
--- a/src/protocol/bus0/bus.c
+++ b/src/protocol/bus0/bus.c
@@ -42,7 +42,6 @@ static void bus0_pipe_putq_cb(void *);
// bus0_sock is our per-socket protocol private structure.
struct bus0_sock {
- bool raw;
nni_aio * aio_getq;
nni_list pipes;
nni_mtx mtx;
@@ -89,7 +88,6 @@ bus0_sock_init(void **sp, nni_sock *nsock)
bus0_sock_fini(s);
return (rv);
}
- s->raw = false;
s->uwq = nni_sock_sendq(nsock);
s->urq = nni_sock_recvq(nsock);
@@ -333,20 +331,6 @@ bus0_pipe_recv(bus0_pipe *p)
nni_pipe_recv(p->npipe, p->aio_recv);
}
-static int
-bus0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ)
-{
- bus0_sock *s = arg;
- return (nni_copyin_bool(&s->raw, buf, sz, typ));
-}
-
-static int
-bus0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ)
-{
- bus0_sock *s = arg;
- return (nni_copyout_bool(s->raw, buf, szp, typ));
-}
-
static void
bus0_sock_send(void *arg, nni_aio *aio)
{
@@ -371,12 +355,6 @@ static nni_proto_pipe_ops bus0_pipe_ops = {
};
static nni_proto_sock_option bus0_sock_options[] = {
- {
- .pso_name = NNG_OPT_RAW,
- .pso_type = NNI_TYPE_BOOL,
- .pso_getopt = bus0_sock_getopt_raw,
- .pso_setopt = bus0_sock_setopt_raw,
- },
// terminate list
{
.pso_name = NULL,
@@ -402,8 +380,23 @@ static nni_proto bus0_proto = {
.proto_pipe_ops = &bus0_pipe_ops,
};
+static nni_proto bus0_proto_raw = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_BUS_V0, "bus" },
+ .proto_peer = { NNI_PROTO_BUS_V0, "bus" },
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &bus0_sock_ops,
+ .proto_pipe_ops = &bus0_pipe_ops,
+};
+
int
nng_bus0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &bus0_proto));
}
+
+int
+nng_bus0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &bus0_proto_raw));
+}
diff --git a/src/protocol/bus0/bus.h b/src/protocol/bus0/bus.h
index 0ef3d391..c8c23d84 100644
--- a/src/protocol/bus0/bus.h
+++ b/src/protocol/bus0/bus.h
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -17,10 +17,16 @@ extern "C" {
NNG_DECL int nng_bus0_open(nng_socket *);
+NNG_DECL int nng_bus0_open_raw(nng_socket *);
+
#ifndef nng_bus_open
#define nng_bus_open nng_bus0_open
#endif
+#ifndef nng_bus_open_raw
+#define nng_bus_open_raw nng_bus0_open_raw
+#endif
+
#ifdef __cplusplus
}
#endif
diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c
index ccece972..e275e52c 100644
--- a/src/protocol/pair0/pair.c
+++ b/src/protocol/pair0/pair.c
@@ -36,7 +36,6 @@ struct pair0_sock {
pair0_pipe *ppipe;
nni_msgq * uwq;
nni_msgq * urq;
- bool raw;
nni_mtx mtx;
};
@@ -63,7 +62,6 @@ pair0_sock_init(void **sp, nni_sock *nsock)
}
nni_mtx_init(&s->mtx);
s->ppipe = NULL;
- s->raw = false;
s->uwq = nni_sock_sendq(nsock);
s->urq = nni_sock_recvq(nsock);
*sp = s;
@@ -231,20 +229,6 @@ pair0_sock_close(void *arg)
NNI_ARG_UNUSED(arg);
}
-static int
-pair0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ)
-{
- pair0_sock *s = arg;
- return (nni_copyin_bool(&s->raw, buf, sz, typ));
-}
-
-static int
-pair0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ)
-{
- pair0_sock *s = arg;
- return (nni_copyout_bool(s->raw, buf, szp, typ));
-}
-
static void
pair0_sock_send(void *arg, nni_aio *aio)
{
@@ -269,12 +253,6 @@ static nni_proto_pipe_ops pair0_pipe_ops = {
};
static nni_proto_sock_option pair0_sock_options[] = {
- {
- .pso_name = NNG_OPT_RAW,
- .pso_type = NNI_TYPE_BOOL,
- .pso_getopt = pair0_sock_getopt_raw,
- .pso_setopt = pair0_sock_setopt_raw,
- },
// terminate list
{
.pso_name = NULL,
@@ -301,8 +279,23 @@ static nni_proto pair0_proto = {
.proto_pipe_ops = &pair0_pipe_ops,
};
+static nni_proto pair0_proto_raw = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_PAIR_V0, "pair" },
+ .proto_peer = { NNI_PROTO_PAIR_V0, "pair" },
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &pair0_sock_ops,
+ .proto_pipe_ops = &pair0_pipe_ops,
+};
+
int
nng_pair0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &pair0_proto));
}
+
+int
+nng_pair0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &pair0_proto_raw));
+}
diff --git a/src/protocol/pair0/pair.h b/src/protocol/pair0/pair.h
index 6828c921..1356f1cd 100644
--- a/src/protocol/pair0/pair.h
+++ b/src/protocol/pair0/pair.h
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -17,10 +17,16 @@ extern "C" {
NNG_DECL int nng_pair0_open(nng_socket *);
+NNG_DECL int nng_pair0_open_raw(nng_socket *);
+
#ifndef nng_pair_open
#define nng_pair_open nng_pair0_open
#endif
+#ifndef nng_pair_open_raw
+#define nng_pair_open_raw nng_pair0_open_raw
+#endif
+
#ifdef __cplusplus
}
#endif
diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c
index becbbfa7..a3c01d46 100644
--- a/src/protocol/pair1/pair.c
+++ b/src/protocol/pair1/pair.c
@@ -72,7 +72,7 @@ pair1_sock_fini(void *arg)
}
static int
-pair1_sock_init(void **sp, nni_sock *nsock)
+pair1_sock_init_impl(void **sp, nni_sock *nsock, bool raw)
{
pair1_sock *s;
int rv;
@@ -94,7 +94,7 @@ pair1_sock_init(void **sp, nni_sock *nsock)
return (rv);
}
- s->raw = false;
+ s->raw = raw;
s->poly = false;
s->uwq = nni_sock_sendq(nsock);
s->urq = nni_sock_recvq(nsock);
@@ -104,6 +104,18 @@ pair1_sock_init(void **sp, nni_sock *nsock)
return (0);
}
+static int
+pair1_sock_init(void **sp, nni_sock *nsock)
+{
+ return (pair1_sock_init_impl(sp, nsock, false));
+}
+
+static int
+pair1_sock_init_raw(void **sp, nni_sock *nsock)
+{
+ return (pair1_sock_init_impl(sp, nsock, true));
+}
+
static void
pair1_pipe_fini(void *arg)
{
@@ -397,24 +409,6 @@ pair1_sock_close(void *arg)
}
static int
-pair1_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ)
-{
- pair1_sock *s = arg;
- int rv;
- nni_mtx_lock(&s->mtx);
- rv = s->started ? NNG_ESTATE : nni_copyin_bool(&s->raw, buf, sz, typ);
- nni_mtx_unlock(&s->mtx);
- return (rv);
-}
-
-static int
-pair1_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ)
-{
- pair1_sock *s = arg;
- return (nni_copyout_bool(s->raw, buf, szp, typ));
-}
-
-static int
pair1_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ)
{
pair1_sock *s = arg;
@@ -475,12 +469,6 @@ static nni_proto_pipe_ops pair1_pipe_ops = {
static nni_proto_sock_option pair1_sock_options[] = {
{
- .pso_name = NNG_OPT_RAW,
- .pso_type = NNI_TYPE_BOOL,
- .pso_getopt = pair1_sock_getopt_raw,
- .pso_setopt = pair1_sock_setopt_raw,
- },
- {
.pso_name = NNG_OPT_MAXTTL,
.pso_type = NNI_TYPE_INT32,
.pso_getopt = pair1_sock_getopt_maxttl,
@@ -522,3 +510,28 @@ nng_pair1_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &pair1_proto));
}
+
+static nni_proto_sock_ops pair1_sock_ops_raw = {
+ .sock_init = pair1_sock_init_raw,
+ .sock_fini = pair1_sock_fini,
+ .sock_open = pair1_sock_open,
+ .sock_close = pair1_sock_close,
+ .sock_recv = pair1_sock_recv,
+ .sock_send = pair1_sock_send,
+ .sock_options = pair1_sock_options,
+};
+
+static nni_proto pair1_proto_raw = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_PAIR_V1, "pair1" },
+ .proto_peer = { NNI_PROTO_PAIR_V1, "pair1" },
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &pair1_sock_ops_raw,
+ .proto_pipe_ops = &pair1_pipe_ops,
+};
+
+int
+nng_pair1_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &pair1_proto_raw));
+}
diff --git a/src/protocol/pair1/pair.h b/src/protocol/pair1/pair.h
index bc519d9f..85da9d45 100644
--- a/src/protocol/pair1/pair.h
+++ b/src/protocol/pair1/pair.h
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -16,11 +16,16 @@ extern "C" {
#endif
NNG_DECL int nng_pair1_open(nng_socket *);
+NNG_DECL int nng_pair1_open_raw(nng_socket *);
#ifndef nng_pair_open
#define nng_pair_open nng_pair1_open
#endif
+#ifndef nng_pair_open_raw
+#define nng_pair_open_raw nng_pair1_open_raw
+#endif
+
#define NNG_OPT_PAIR1_POLY "pair1:polyamorous"
#ifdef __cplusplus
diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c
index 9aa7bea9..c5017d50 100644
--- a/src/protocol/pipeline0/pull.c
+++ b/src/protocol/pipeline0/pull.c
@@ -53,7 +53,6 @@ pull0_sock_init(void **sp, nni_sock *sock)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- s->raw = false;
s->urq = nni_sock_recvq(sock);
*sp = s;
@@ -180,20 +179,6 @@ pull0_sock_close(void *arg)
NNI_ARG_UNUSED(arg);
}
-static int
-pull0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ)
-{
- pull0_sock *s = arg;
- return (nni_copyin_bool(&s->raw, buf, sz, typ));
-}
-
-static int
-pull0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ)
-{
- pull0_sock *s = arg;
- return (nni_copyout_bool(s->raw, buf, szp, typ));
-}
-
static void
pull0_sock_send(void *arg, nni_aio *aio)
{
@@ -217,12 +202,6 @@ static nni_proto_pipe_ops pull0_pipe_ops = {
};
static nni_proto_sock_option pull0_sock_options[] = {
- {
- .pso_name = NNG_OPT_RAW,
- .pso_type = NNI_TYPE_BOOL,
- .pso_getopt = pull0_sock_getopt_raw,
- .pso_setopt = pull0_sock_setopt_raw,
- },
// terminate list
{
.pso_name = NULL,
@@ -248,8 +227,23 @@ static nni_proto pull0_proto = {
.proto_sock_ops = &pull0_sock_ops,
};
+static nni_proto pull0_proto_raw = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_PULL_V0, "pull" },
+ .proto_peer = { NNI_PROTO_PUSH_V0, "push" },
+ .proto_flags = NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_RAW,
+ .proto_pipe_ops = &pull0_pipe_ops,
+ .proto_sock_ops = &pull0_sock_ops,
+};
+
int
nng_pull0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &pull0_proto));
}
+
+int
+nng_pull0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &pull0_proto_raw));
+}
diff --git a/src/protocol/pipeline0/pull.h b/src/protocol/pipeline0/pull.h
index 75bded03..1c5d63e3 100644
--- a/src/protocol/pipeline0/pull.h
+++ b/src/protocol/pipeline0/pull.h
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -16,11 +16,16 @@ extern "C" {
#endif
NNG_DECL int nng_pull0_open(nng_socket *);
+NNG_DECL int nng_pull0_open_raw(nng_socket *);
#ifndef nng_pull_open
#define nng_pull_open nng_pull0_open
#endif
+#ifndef nng_pull_open_raw
+#define nng_pull_open_raw nng_pull0_open_raw
+#endif
+
#ifdef __cplusplus
}
#endif
diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c
index 8c8fa13e..2ad657b6 100644
--- a/src/protocol/pipeline0/push.c
+++ b/src/protocol/pipeline0/push.c
@@ -36,7 +36,6 @@ static void push0_getq_cb(void *);
// push0_sock is our per-socket protocol private structure.
struct push0_sock {
nni_msgq *uwq;
- bool raw;
};
// push0_pipe is our per-pipe protocol private structure.
@@ -58,7 +57,6 @@ push0_sock_init(void **sp, nni_sock *sock)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- s->raw = false;
s->uwq = nni_sock_sendq(sock);
*sp = s;
return (0);
@@ -197,20 +195,6 @@ push0_getq_cb(void *arg)
nni_pipe_send(p->pipe, p->aio_send);
}
-static int
-push0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ)
-{
- push0_sock *s = arg;
- return (nni_copyin_bool(&s->raw, buf, sz, typ));
-}
-
-static int
-push0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ)
-{
- push0_sock *s = arg;
- return (nni_copyout_bool(s->raw, buf, szp, typ));
-}
-
static void
push0_sock_send(void *arg, nni_aio *aio)
{
@@ -234,12 +218,6 @@ static nni_proto_pipe_ops push0_pipe_ops = {
};
static nni_proto_sock_option push0_sock_options[] = {
- {
- .pso_name = NNG_OPT_RAW,
- .pso_type = NNI_TYPE_BOOL,
- .pso_getopt = push0_sock_getopt_raw,
- .pso_setopt = push0_sock_setopt_raw,
- },
// terminate list
{
.pso_name = NULL,
@@ -265,8 +243,23 @@ static nni_proto push0_proto = {
.proto_sock_ops = &push0_sock_ops,
};
+static nni_proto push0_proto_raw = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_PUSH_V0, "push" },
+ .proto_peer = { NNI_PROTO_PULL_V0, "pull" },
+ .proto_flags = NNI_PROTO_FLAG_SND | NNI_PROTO_FLAG_RAW,
+ .proto_pipe_ops = &push0_pipe_ops,
+ .proto_sock_ops = &push0_sock_ops,
+};
+
int
nng_push0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &push0_proto));
}
+
+int
+nng_push0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &push0_proto_raw));
+}
diff --git a/src/protocol/pipeline0/push.h b/src/protocol/pipeline0/push.h
index c7303b92..a1384e0a 100644
--- a/src/protocol/pipeline0/push.h
+++ b/src/protocol/pipeline0/push.h
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -16,11 +16,16 @@ extern "C" {
#endif
NNG_DECL int nng_push0_open(nng_socket *);
+NNG_DECL int nng_push0_open_raw(nng_socket *);
#ifndef nng_push_open
#define nng_push_open nng_push0_open
#endif
+#ifndef nng_push_open_raw
+#define nng_push_open_raw nng_push0_open_raw
+#endif
+
#ifdef __cplusplus
}
#endif
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index aaa22801..45f4b7d9 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -40,7 +40,6 @@ static void pub0_pipe_fini(void *);
// pub0_sock is our per-socket protocol private structure.
struct pub0_sock {
nni_msgq *uwq;
- bool raw;
nni_aio * aio_getq;
nni_list pipes;
nni_mtx mtx;
@@ -83,7 +82,6 @@ pub0_sock_init(void **sp, nni_sock *sock)
return (rv);
}
- s->raw = false;
NNI_LIST_INIT(&s->pipes, pub0_pipe, node);
s->uwq = nni_sock_sendq(sock);
@@ -273,20 +271,6 @@ pub0_pipe_send_cb(void *arg)
nni_msgq_aio_get(p->sendq, p->aio_getq);
}
-static int
-pub0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ)
-{
- pub0_sock *s = arg;
- return (nni_copyin_bool(&s->raw, buf, sz, typ));
-}
-
-static int
-pub0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ)
-{
- pub0_sock *s = arg;
- return (nni_copyout_bool(s->raw, buf, szp, typ));
-}
-
static void
pub0_sock_recv(void *arg, nni_aio *aio)
{
@@ -310,12 +294,6 @@ static nni_proto_pipe_ops pub0_pipe_ops = {
};
static nni_proto_sock_option pub0_sock_options[] = {
- {
- .pso_name = NNG_OPT_RAW,
- .pso_type = NNI_TYPE_BOOL,
- .pso_getopt = pub0_sock_getopt_raw,
- .pso_setopt = pub0_sock_setopt_raw,
- },
// terminate list
{
.pso_name = NULL,
@@ -341,8 +319,23 @@ static nni_proto pub0_proto = {
.proto_pipe_ops = &pub0_pipe_ops,
};
+static nni_proto pub0_proto_raw = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_PUB_V0, "pub" },
+ .proto_peer = { NNI_PROTO_SUB_V0, "sub" },
+ .proto_flags = NNI_PROTO_FLAG_SND | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &pub0_sock_ops,
+ .proto_pipe_ops = &pub0_pipe_ops,
+};
+
int
nng_pub0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &pub0_proto));
}
+
+int
+nng_pub0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &pub0_proto_raw));
+}
diff --git a/src/protocol/pubsub0/pub.h b/src/protocol/pubsub0/pub.h
index 2388a292..877f2f1c 100644
--- a/src/protocol/pubsub0/pub.h
+++ b/src/protocol/pubsub0/pub.h
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -16,11 +16,16 @@ extern "C" {
#endif
NNG_DECL int nng_pub0_open(nng_socket *);
+NNG_DECL int nng_pub0_open_raw(nng_socket *);
#ifndef nng_pub_open
#define nng_pub_open nng_pub0_open
#endif
+#ifndef nng_pub_open_raw
+#define nng_pub_open_raw nng_pub0_open_raw
+#endif
+
#ifdef __cplusplus
}
#endif
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index 6b1f1173..b41b33ea 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -44,7 +44,6 @@ struct sub0_topic {
struct sub0_sock {
nni_list topics;
nni_msgq *urq;
- bool raw;
nni_mtx lk;
};
@@ -66,7 +65,6 @@ sub0_sock_init(void **sp, nni_sock *sock)
}
nni_mtx_init(&s->lk);
NNI_LIST_INIT(&s->topics, sub0_topic, node);
- s->raw = false;
s->urq = nni_sock_recvq(sock);
*sp = s;
@@ -277,20 +275,6 @@ sub0_unsubscribe(void *arg, const void *buf, size_t sz, int typ)
return (NNG_ENOENT);
}
-static int
-sub0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ)
-{
- sub0_sock *s = arg;
- return (nni_copyin_bool(&s->raw, buf, sz, typ));
-}
-
-static int
-sub0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ)
-{
- sub0_sock *s = arg;
- return (nni_copyout_bool(s->raw, buf, szp, typ));
-}
-
static void
sub0_sock_send(void *arg, nni_aio *aio)
{
@@ -315,16 +299,13 @@ sub0_sock_filter(void *arg, nni_msg *msg)
size_t len;
int match;
- nni_mtx_lock(&s->lk);
- if (s->raw) {
- nni_mtx_unlock(&s->lk);
- return (msg);
- }
-
body = nni_msg_body(msg);
len = nni_msg_len(msg);
match = 0;
+
+ nni_mtx_lock(&s->lk);
+
// Check to see if the message matches one of our subscriptions.
NNI_LIST_FOREACH (&s->topics, topic) {
if (len >= topic->len) {
@@ -362,12 +343,6 @@ static nni_proto_pipe_ops sub0_pipe_ops = {
static nni_proto_sock_option sub0_sock_options[] = {
{
- .pso_name = NNG_OPT_RAW,
- .pso_type = NNI_TYPE_BOOL,
- .pso_getopt = sub0_sock_getopt_raw,
- .pso_setopt = sub0_sock_setopt_raw,
- },
- {
.pso_name = NNG_OPT_SUB_SUBSCRIBE,
.pso_type = NNI_TYPE_OPAQUE,
.pso_getopt = NULL,
@@ -396,6 +371,17 @@ static nni_proto_sock_ops sub0_sock_ops = {
.sock_options = sub0_sock_options,
};
+static nni_proto_sock_ops sub0_sock_ops_raw = {
+ .sock_init = sub0_sock_init,
+ .sock_fini = sub0_sock_fini,
+ .sock_open = sub0_sock_open,
+ .sock_close = sub0_sock_close,
+ .sock_send = sub0_sock_send,
+ .sock_recv = sub0_sock_recv,
+ .sock_filter = NULL, // raw does not filter
+ .sock_options = sub0_sock_options,
+};
+
static nni_proto sub0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_SUB_V0, "sub" },
@@ -405,8 +391,23 @@ static nni_proto sub0_proto = {
.proto_pipe_ops = &sub0_pipe_ops,
};
+static nni_proto sub0_proto_raw = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_SUB_V0, "sub" },
+ .proto_peer = { NNI_PROTO_PUB_V0, "pub" },
+ .proto_flags = NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &sub0_sock_ops_raw,
+ .proto_pipe_ops = &sub0_pipe_ops,
+};
+
int
nng_sub0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &sub0_proto));
}
+
+int
+nng_sub0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &sub0_proto_raw));
+}
diff --git a/src/protocol/pubsub0/sub.h b/src/protocol/pubsub0/sub.h
index 1a09145d..acb5cda3 100644
--- a/src/protocol/pubsub0/sub.h
+++ b/src/protocol/pubsub0/sub.h
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -17,10 +17,16 @@ extern "C" {
NNG_DECL int nng_sub0_open(nng_socket *);
+NNG_DECL int nng_sub0_open_raw(nng_socket *);
+
#ifndef nng_sub_open
#define nng_sub_open nng_sub0_open
#endif
+#ifndef nng_sub_open_raw
+#define nng_sub_open_raw nng_sub0_open_raw
+#endif
+
#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe"
#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe"
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index f62406cd..78a1f2ee 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -41,7 +41,6 @@ struct rep0_sock {
nni_msgq * uwq;
nni_msgq * urq;
nni_mtx lk;
- bool raw;
int ttl;
nni_idhash *pipes;
char * btrace;
@@ -92,7 +91,6 @@ rep0_sock_init(void **sp, nni_sock *sock)
}
s->ttl = 8; // Per RFC
- s->raw = false;
s->btrace = NULL;
s->btrace_len = 0;
s->uwq = nni_sock_sendq(sock);
@@ -353,25 +351,6 @@ rep0_pipe_putq_cb(void *arg)
}
static int
-rep0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ)
-{
- rep0_sock *s = arg;
- int rv;
-
- nni_mtx_lock(&s->lk);
- rv = nni_copyin_bool(&s->raw, buf, sz, typ);
- nni_mtx_unlock(&s->lk);
- return (rv);
-}
-
-static int
-rep0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ)
-{
- rep0_sock *s = arg;
- return (nni_copyout_bool(s->raw, buf, szp, typ));
-}
-
-static int
rep0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ)
{
rep0_sock *s = arg;
@@ -393,10 +372,6 @@ rep0_sock_filter(void *arg, nni_msg *msg)
size_t len;
nni_mtx_lock(&s->lk);
- if (s->raw) {
- nni_mtx_unlock(&s->lk);
- return (msg);
- }
len = nni_msg_header_len(msg);
header = nni_msg_header(msg);
@@ -417,6 +392,13 @@ rep0_sock_filter(void *arg, nni_msg *msg)
}
static void
+rep0_sock_send_raw(void *arg, nni_aio *aio)
+{
+ rep0_sock *s = arg;
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
+static void
rep0_sock_send(void *arg, nni_aio *aio)
{
rep0_sock *s = arg;
@@ -424,12 +406,6 @@ rep0_sock_send(void *arg, nni_aio *aio)
nni_msg * msg;
nni_mtx_lock(&s->lk);
- if (s->raw) {
- // Pass thru
- nni_mtx_unlock(&s->lk);
- nni_msgq_aio_put(s->uwq, aio);
- return;
- }
if (s->btrace == NULL) {
nni_mtx_unlock(&s->lk);
nni_aio_finish_error(aio, NNG_ESTATE);
@@ -475,12 +451,6 @@ static nni_proto_pipe_ops rep0_pipe_ops = {
static nni_proto_sock_option rep0_sock_options[] = {
{
- .pso_name = NNG_OPT_RAW,
- .pso_type = NNI_TYPE_BOOL,
- .pso_getopt = rep0_sock_getopt_raw,
- .pso_setopt = rep0_sock_setopt_raw,
- },
- {
.pso_name = NNG_OPT_MAXTTL,
.pso_type = NNI_TYPE_INT32,
.pso_getopt = rep0_sock_getopt_maxttl,
@@ -503,6 +473,17 @@ static nni_proto_sock_ops rep0_sock_ops = {
.sock_recv = rep0_sock_recv,
};
+static nni_proto_sock_ops rep0_sock_ops_raw = {
+ .sock_init = rep0_sock_init,
+ .sock_fini = rep0_sock_fini,
+ .sock_open = rep0_sock_open,
+ .sock_close = rep0_sock_close,
+ .sock_options = rep0_sock_options,
+ .sock_filter = NULL, // No filtering for raw mode
+ .sock_send = rep0_sock_send_raw,
+ .sock_recv = rep0_sock_recv,
+};
+
static nni_proto rep0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_REP_V0, "rep" },
@@ -512,8 +493,23 @@ static nni_proto rep0_proto = {
.proto_pipe_ops = &rep0_pipe_ops,
};
+static nni_proto rep0_proto_raw = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_REP_V0, "rep" },
+ .proto_peer = { NNI_PROTO_REQ_V0, "req" },
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &rep0_sock_ops_raw,
+ .proto_pipe_ops = &rep0_pipe_ops,
+};
+
int
nng_rep0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &rep0_proto));
}
+
+int
+nng_rep0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &rep0_proto_raw));
+}
diff --git a/src/protocol/reqrep0/rep.h b/src/protocol/reqrep0/rep.h
index 93df9379..612127a2 100644
--- a/src/protocol/reqrep0/rep.h
+++ b/src/protocol/reqrep0/rep.h
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -16,11 +16,16 @@ extern "C" {
#endif
NNG_DECL int nng_rep0_open(nng_socket *);
+NNG_DECL int nng_rep0_open_raw(nng_socket *);
#ifndef nng_rep_open
#define nng_rep_open nng_rep0_open
#endif
+#ifndef nng_rep_open
+#define nng_rep_open_raw nng_rep0_open_raw
+#endif
+
#ifdef __cplusplus
}
#endif
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 0a5b566a..4d35ca1f 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -78,7 +78,7 @@ static void req0_recv_cb(void *);
static void req0_putq_cb(void *);
static int
-req0_sock_init(void **sp, nni_sock *sock)
+req0_sock_init_impl(void **sp, nni_sock *sock, bool raw)
{
req0_sock *s;
@@ -96,7 +96,7 @@ req0_sock_init(void **sp, nni_sock *sock)
s->nextid = nni_random();
s->retry = NNI_SECOND * 60;
s->reqmsg = NULL;
- s->raw = false;
+ s->raw = raw;
s->wantw = false;
s->resend = NNI_TIME_ZERO;
s->ttl = 8;
@@ -107,6 +107,18 @@ req0_sock_init(void **sp, nni_sock *sock)
return (0);
}
+static int
+req0_sock_init(void **sp, nni_sock *sock)
+{
+ return (req0_sock_init_impl(sp, sock, false));
+}
+
+static int
+req0_sock_init_raw(void **sp, nni_sock *sock)
+{
+ return (req0_sock_init_impl(sp, sock, true));
+}
+
static void
req0_sock_open(void *arg)
{
@@ -250,20 +262,6 @@ req0_pipe_stop(void *arg)
}
static int
-req0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ)
-{
- req0_sock *s = arg;
- return (nni_copyin_bool(&s->raw, buf, sz, typ));
-}
-
-static int
-req0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ)
-{
- req0_sock *s = arg;
- return (nni_copyout_bool(s->raw, buf, szp, typ));
-}
-
-static int
req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ)
{
req0_sock *s = arg;
@@ -513,11 +511,6 @@ req0_sock_send(void *arg, nni_aio *aio)
int rv;
nni_mtx_lock(&s->mtx);
- if (s->raw) {
- nni_mtx_unlock(&s->mtx);
- nni_msgq_aio_put(s->uwq, aio);
- return;
- }
msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg);
@@ -559,6 +552,14 @@ req0_sock_send(void *arg, nni_aio *aio)
nni_aio_finish(aio, 0, len);
}
+static void
+req0_sock_send_raw(void *arg, nni_aio *aio)
+{
+ req0_sock *s = arg;
+
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
static nni_msg *
req0_sock_filter(void *arg, nni_msg *msg)
{
@@ -566,11 +567,6 @@ req0_sock_filter(void *arg, nni_msg *msg)
nni_msg * rmsg;
nni_mtx_lock(&s->mtx);
- if (s->raw) {
- // Pass it unmolested
- nni_mtx_unlock(&s->mtx);
- return (msg);
- }
if (nni_msg_header_len(msg) < 4) {
nni_mtx_unlock(&s->mtx);
@@ -608,17 +604,23 @@ req0_sock_recv(void *arg, nni_aio *aio)
req0_sock *s = arg;
nni_mtx_lock(&s->mtx);
- if (!s->raw) {
- if (s->reqmsg == NULL) {
- nni_mtx_unlock(&s->mtx);
- nni_aio_finish_error(aio, NNG_ESTATE);
- return;
- }
+ if (s->reqmsg == NULL) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
}
nni_mtx_unlock(&s->mtx);
nni_msgq_aio_get(s->urq, aio);
}
+static void
+req0_sock_recv_raw(void *arg, nni_aio *aio)
+{
+ req0_sock *s = arg;
+
+ nni_msgq_aio_get(s->urq, aio);
+}
+
static nni_proto_pipe_ops req0_pipe_ops = {
.pipe_init = req0_pipe_init,
.pipe_fini = req0_pipe_fini,
@@ -628,12 +630,6 @@ static nni_proto_pipe_ops req0_pipe_ops = {
static nni_proto_sock_option req0_sock_options[] = {
{
- .pso_name = NNG_OPT_RAW,
- .pso_type = NNI_TYPE_BOOL,
- .pso_getopt = req0_sock_getopt_raw,
- .pso_setopt = req0_sock_setopt_raw,
- },
- {
.pso_name = NNG_OPT_MAXTTL,
.pso_type = NNI_TYPE_INT32,
.pso_getopt = req0_sock_getopt_maxttl,
@@ -676,3 +672,28 @@ nng_req0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &req0_proto));
}
+
+static nni_proto_sock_ops req0_sock_ops_raw = {
+ .sock_init = req0_sock_init_raw,
+ .sock_fini = req0_sock_fini,
+ .sock_open = req0_sock_open,
+ .sock_close = req0_sock_close,
+ .sock_options = req0_sock_options,
+ .sock_send = req0_sock_send_raw,
+ .sock_recv = req0_sock_recv_raw,
+};
+
+static nni_proto req0_proto_raw = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_REQ_V0, "req" },
+ .proto_peer = { NNI_PROTO_REP_V0, "rep" },
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &req0_sock_ops_raw,
+ .proto_pipe_ops = &req0_pipe_ops,
+};
+
+int
+nng_req0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &req0_proto_raw));
+} \ No newline at end of file
diff --git a/src/protocol/reqrep0/req.h b/src/protocol/reqrep0/req.h
index 99c9bf62..392c7932 100644
--- a/src/protocol/reqrep0/req.h
+++ b/src/protocol/reqrep0/req.h
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -16,10 +16,14 @@ extern "C" {
#endif
NNG_DECL int nng_req0_open(nng_socket *);
+NNG_DECL int nng_req0_open_raw(nng_socket *);
#ifndef nng_req_open
#define nng_req_open nng_req0_open
#endif
+#ifndef nng_req_open_raw
+#define nng_req_open_raw nng_req0_open_raw
+#endif
#define NNG_OPT_REQ_RESENDTIME "req:resend-time"
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c
index eeb09d2a..1605d9e6 100644
--- a/src/protocol/survey0/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -40,7 +40,6 @@ static void resp0_pipe_fini(void *);
struct resp0_sock {
nni_msgq * urq;
nni_msgq * uwq;
- bool raw;
int ttl;
nni_idhash *pipes;
char * btrace;
@@ -93,7 +92,6 @@ resp0_sock_init(void **sp, nni_sock *nsock)
}
s->ttl = 8; // Per RFC
- s->raw = false;
s->btrace = NULL;
s->btrace_len = 0;
s->urq = nni_sock_recvq(nsock);
@@ -347,36 +345,25 @@ resp0_putq_cb(void *arg)
}
static int
-resp0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ)
+resp0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ)
{
resp0_sock *s = arg;
- int rv;
-
- nni_mtx_lock(&s->mtx);
- rv = nni_copyin_bool(&s->raw, buf, sz, typ);
- nni_mtx_unlock(&s->mtx);
- return (rv);
+ return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ));
}
static int
-resp0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ)
+resp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ)
{
resp0_sock *s = arg;
- return (nni_copyout_bool(s->raw, buf, szp, typ));
+ return (nni_copyout_int(s->ttl, buf, szp, typ));
}
-static int
-resp0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ)
+static void
+resp0_sock_send_raw(void *arg, nni_aio *aio)
{
resp0_sock *s = arg;
- return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ));
-}
-static int
-resp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ)
-{
- resp0_sock *s = arg;
- return (nni_copyout_int(s->ttl, buf, szp, typ));
+ nni_msgq_aio_put(s->uwq, aio);
}
static void
@@ -387,11 +374,6 @@ resp0_sock_send(void *arg, nni_aio *aio)
int rv;
nni_mtx_lock(&s->mtx);
- if (s->raw) {
- nni_mtx_unlock(&s->mtx);
- nni_msgq_aio_put(s->uwq, aio);
- return;
- }
msg = nni_aio_get_msg(aio);
@@ -428,10 +410,6 @@ resp0_sock_filter(void *arg, nni_msg *msg)
size_t len;
nni_mtx_lock(&s->mtx);
- if (s->raw) {
- nni_mtx_unlock(&s->mtx);
- return (msg);
- }
len = nni_msg_header_len(msg);
header = nni_msg_header(msg);
@@ -469,12 +447,6 @@ static nni_proto_pipe_ops resp0_pipe_ops = {
static nni_proto_sock_option resp0_sock_options[] = {
{
- .pso_name = NNG_OPT_RAW,
- .pso_type = NNI_TYPE_BOOL,
- .pso_getopt = resp0_sock_getopt_raw,
- .pso_setopt = resp0_sock_setopt_raw,
- },
- {
.pso_name = NNG_OPT_MAXTTL,
.pso_type = NNI_TYPE_INT32,
.pso_getopt = resp0_sock_getopt_maxttl,
@@ -497,6 +469,17 @@ static nni_proto_sock_ops resp0_sock_ops = {
.sock_options = resp0_sock_options,
};
+static nni_proto_sock_ops resp0_sock_ops_raw = {
+ .sock_init = resp0_sock_init,
+ .sock_fini = resp0_sock_fini,
+ .sock_open = resp0_sock_open,
+ .sock_close = resp0_sock_close,
+ .sock_filter = NULL, // no filter for raw
+ .sock_send = resp0_sock_send_raw,
+ .sock_recv = resp0_sock_recv,
+ .sock_options = resp0_sock_options,
+};
+
static nni_proto resp0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" },
@@ -506,8 +489,23 @@ static nni_proto resp0_proto = {
.proto_pipe_ops = &resp0_pipe_ops,
};
+static nni_proto resp0_proto_raw = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" },
+ .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &resp0_sock_ops_raw,
+ .proto_pipe_ops = &resp0_pipe_ops,
+};
+
int
nng_respondent0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &resp0_proto));
}
+
+int
+nng_respondent0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &resp0_proto_raw));
+}
diff --git a/src/protocol/survey0/respond.h b/src/protocol/survey0/respond.h
index 58c65298..b865b2ac 100644
--- a/src/protocol/survey0/respond.h
+++ b/src/protocol/survey0/respond.h
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -16,11 +16,16 @@ extern "C" {
#endif
NNG_DECL int nng_respondent0_open(nng_socket *);
+NNG_DECL int nng_respondent0_open_raw(nng_socket *);
#ifndef nng_respondent_open
#define nng_respondent_open nng_respondent0_open
#endif
+#ifndef nng_respondent_open_raw
+#define nng_respondent_open_raw nng_respondent0_open_raw
+#endif
+
#ifdef __cplusplus
}
#endif
diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c
index a5909015..b7158464 100644
--- a/src/protocol/survey0/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -39,7 +39,6 @@ static void surv0_timeout(void *);
struct surv0_sock {
nni_duration survtime;
nni_time expire;
- bool raw;
int ttl;
uint32_t nextid; // next id
uint32_t survid; // outstanding request ID (big endian)
@@ -92,7 +91,6 @@ surv0_sock_init(void **sp, nni_sock *nsock)
nni_timer_init(&s->timer, surv0_timeout, s);
s->nextid = nni_random();
- s->raw = false;
s->survtime = NNI_SECOND;
s->expire = NNI_TIME_ZERO;
s->uwq = nni_sock_sendq(nsock);
@@ -275,28 +273,6 @@ failed:
}
static int
-surv0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ)
-{
- surv0_sock *s = arg;
- int rv;
-
- nni_mtx_lock(&s->mtx);
- if ((rv = nni_copyin_bool(&s->raw, buf, sz, typ)) == 0) {
- s->survid = 0;
- nni_timer_cancel(&s->timer);
- }
- nni_mtx_unlock(&s->mtx);
- return (rv);
-}
-
-static int
-surv0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ)
-{
- surv0_sock *s = arg;
- return (nni_copyout_bool(s->raw, buf, szp, typ));
-}
-
-static int
surv0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ)
{
surv0_sock *s = arg;
@@ -391,6 +367,14 @@ surv0_sock_recv(void *arg, nni_aio *aio)
}
static void
+surv0_sock_send_raw(void *arg, nni_aio *aio)
+{
+ surv0_sock *s = arg;
+
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
+static void
surv0_sock_send(void *arg, nni_aio *aio)
{
surv0_sock *s = arg;
@@ -398,13 +382,6 @@ surv0_sock_send(void *arg, nni_aio *aio)
int rv;
nni_mtx_lock(&s->mtx);
- if (s->raw) {
- // No automatic retry, and the request ID must
- // be in the header coming down.
- nni_mtx_unlock(&s->mtx);
- nni_msgq_aio_put(s->uwq, aio);
- return;
- }
// Generate a new request ID. We always set the high
// order bit so that the peer can locate the end of the
@@ -437,11 +414,6 @@ surv0_sock_filter(void *arg, nni_msg *msg)
surv0_sock *s = arg;
nni_mtx_lock(&s->mtx);
- if (s->raw) {
- // Pass it unmolested
- nni_mtx_unlock(&s->mtx);
- return (msg);
- }
if ((nni_msg_header_len(msg) < sizeof(uint32_t)) ||
(nni_msg_header_trim_u32(msg) != s->survid)) {
@@ -464,12 +436,6 @@ static nni_proto_pipe_ops surv0_pipe_ops = {
static nni_proto_sock_option surv0_sock_options[] = {
{
- .pso_name = NNG_OPT_RAW,
- .pso_type = NNI_TYPE_BOOL,
- .pso_getopt = surv0_sock_getopt_raw,
- .pso_setopt = surv0_sock_setopt_raw,
- },
- {
.pso_name = NNG_OPT_SURVEYOR_SURVEYTIME,
.pso_type = NNI_TYPE_DURATION,
.pso_getopt = surv0_sock_getopt_surveytime,
@@ -498,6 +464,17 @@ static nni_proto_sock_ops surv0_sock_ops = {
.sock_options = surv0_sock_options,
};
+static nni_proto_sock_ops surv0_sock_ops_raw = {
+ .sock_init = surv0_sock_init,
+ .sock_fini = surv0_sock_fini,
+ .sock_open = surv0_sock_open,
+ .sock_close = surv0_sock_close,
+ .sock_send = surv0_sock_send_raw,
+ .sock_recv = surv0_sock_recv,
+ .sock_filter = surv0_sock_filter,
+ .sock_options = surv0_sock_options,
+};
+
static nni_proto surv0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
@@ -507,8 +484,23 @@ static nni_proto surv0_proto = {
.proto_pipe_ops = &surv0_pipe_ops,
};
+static nni_proto surv0_proto_raw = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
+ .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" },
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &surv0_sock_ops_raw,
+ .proto_pipe_ops = &surv0_pipe_ops,
+};
+
int
nng_surveyor0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &surv0_proto));
}
+
+int
+nng_surveyor0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &surv0_proto_raw));
+}
diff --git a/src/protocol/survey0/survey.h b/src/protocol/survey0/survey.h
index a7b6d943..37f76fbf 100644
--- a/src/protocol/survey0/survey.h
+++ b/src/protocol/survey0/survey.h
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -16,11 +16,16 @@ extern "C" {
#endif
NNG_DECL int nng_surveyor0_open(nng_socket *);
+NNG_DECL int nng_surveyor0_open_raw(nng_socket *);
#ifndef nng_surveyor_open
#define nng_surveyor_open nng_surveyor0_open
#endif
+#ifndef nng_surveyor_open_raw
+#define nng_surveyor_open_raw nng_surveyor0_open_raw
+#endif
+
#define NNG_OPT_SURVEYOR_SURVEYTIME "surveyor:survey-time"
#ifdef __cplusplus