aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pubsub0
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pubsub0')
-rw-r--r--src/protocol/pubsub0/pub.c37
-rw-r--r--src/protocol/pubsub0/pub.h9
-rw-r--r--src/protocol/pubsub0/sub.c57
-rw-r--r--src/protocol/pubsub0/sub.h10
4 files changed, 59 insertions, 54 deletions
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index aaa22801..45f4b7d9 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -40,7 +40,6 @@ static void pub0_pipe_fini(void *);
// pub0_sock is our per-socket protocol private structure.
struct pub0_sock {
nni_msgq *uwq;
- bool raw;
nni_aio * aio_getq;
nni_list pipes;
nni_mtx mtx;
@@ -83,7 +82,6 @@ pub0_sock_init(void **sp, nni_sock *sock)
return (rv);
}
- s->raw = false;
NNI_LIST_INIT(&s->pipes, pub0_pipe, node);
s->uwq = nni_sock_sendq(sock);
@@ -273,20 +271,6 @@ pub0_pipe_send_cb(void *arg)
nni_msgq_aio_get(p->sendq, p->aio_getq);
}
-static int
-pub0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ)
-{
- pub0_sock *s = arg;
- return (nni_copyin_bool(&s->raw, buf, sz, typ));
-}
-
-static int
-pub0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ)
-{
- pub0_sock *s = arg;
- return (nni_copyout_bool(s->raw, buf, szp, typ));
-}
-
static void
pub0_sock_recv(void *arg, nni_aio *aio)
{
@@ -310,12 +294,6 @@ static nni_proto_pipe_ops pub0_pipe_ops = {
};
static nni_proto_sock_option pub0_sock_options[] = {
- {
- .pso_name = NNG_OPT_RAW,
- .pso_type = NNI_TYPE_BOOL,
- .pso_getopt = pub0_sock_getopt_raw,
- .pso_setopt = pub0_sock_setopt_raw,
- },
// terminate list
{
.pso_name = NULL,
@@ -341,8 +319,23 @@ static nni_proto pub0_proto = {
.proto_pipe_ops = &pub0_pipe_ops,
};
+static nni_proto pub0_proto_raw = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_PUB_V0, "pub" },
+ .proto_peer = { NNI_PROTO_SUB_V0, "sub" },
+ .proto_flags = NNI_PROTO_FLAG_SND | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &pub0_sock_ops,
+ .proto_pipe_ops = &pub0_pipe_ops,
+};
+
int
nng_pub0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &pub0_proto));
}
+
+int
+nng_pub0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &pub0_proto_raw));
+}
diff --git a/src/protocol/pubsub0/pub.h b/src/protocol/pubsub0/pub.h
index 2388a292..877f2f1c 100644
--- a/src/protocol/pubsub0/pub.h
+++ b/src/protocol/pubsub0/pub.h
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -16,11 +16,16 @@ extern "C" {
#endif
NNG_DECL int nng_pub0_open(nng_socket *);
+NNG_DECL int nng_pub0_open_raw(nng_socket *);
#ifndef nng_pub_open
#define nng_pub_open nng_pub0_open
#endif
+#ifndef nng_pub_open_raw
+#define nng_pub_open_raw nng_pub0_open_raw
+#endif
+
#ifdef __cplusplus
}
#endif
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index 6b1f1173..b41b33ea 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -44,7 +44,6 @@ struct sub0_topic {
struct sub0_sock {
nni_list topics;
nni_msgq *urq;
- bool raw;
nni_mtx lk;
};
@@ -66,7 +65,6 @@ sub0_sock_init(void **sp, nni_sock *sock)
}
nni_mtx_init(&s->lk);
NNI_LIST_INIT(&s->topics, sub0_topic, node);
- s->raw = false;
s->urq = nni_sock_recvq(sock);
*sp = s;
@@ -277,20 +275,6 @@ sub0_unsubscribe(void *arg, const void *buf, size_t sz, int typ)
return (NNG_ENOENT);
}
-static int
-sub0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ)
-{
- sub0_sock *s = arg;
- return (nni_copyin_bool(&s->raw, buf, sz, typ));
-}
-
-static int
-sub0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ)
-{
- sub0_sock *s = arg;
- return (nni_copyout_bool(s->raw, buf, szp, typ));
-}
-
static void
sub0_sock_send(void *arg, nni_aio *aio)
{
@@ -315,16 +299,13 @@ sub0_sock_filter(void *arg, nni_msg *msg)
size_t len;
int match;
- nni_mtx_lock(&s->lk);
- if (s->raw) {
- nni_mtx_unlock(&s->lk);
- return (msg);
- }
-
body = nni_msg_body(msg);
len = nni_msg_len(msg);
match = 0;
+
+ nni_mtx_lock(&s->lk);
+
// Check to see if the message matches one of our subscriptions.
NNI_LIST_FOREACH (&s->topics, topic) {
if (len >= topic->len) {
@@ -362,12 +343,6 @@ static nni_proto_pipe_ops sub0_pipe_ops = {
static nni_proto_sock_option sub0_sock_options[] = {
{
- .pso_name = NNG_OPT_RAW,
- .pso_type = NNI_TYPE_BOOL,
- .pso_getopt = sub0_sock_getopt_raw,
- .pso_setopt = sub0_sock_setopt_raw,
- },
- {
.pso_name = NNG_OPT_SUB_SUBSCRIBE,
.pso_type = NNI_TYPE_OPAQUE,
.pso_getopt = NULL,
@@ -396,6 +371,17 @@ static nni_proto_sock_ops sub0_sock_ops = {
.sock_options = sub0_sock_options,
};
+static nni_proto_sock_ops sub0_sock_ops_raw = {
+ .sock_init = sub0_sock_init,
+ .sock_fini = sub0_sock_fini,
+ .sock_open = sub0_sock_open,
+ .sock_close = sub0_sock_close,
+ .sock_send = sub0_sock_send,
+ .sock_recv = sub0_sock_recv,
+ .sock_filter = NULL, // raw does not filter
+ .sock_options = sub0_sock_options,
+};
+
static nni_proto sub0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_SUB_V0, "sub" },
@@ -405,8 +391,23 @@ static nni_proto sub0_proto = {
.proto_pipe_ops = &sub0_pipe_ops,
};
+static nni_proto sub0_proto_raw = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_SUB_V0, "sub" },
+ .proto_peer = { NNI_PROTO_PUB_V0, "pub" },
+ .proto_flags = NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &sub0_sock_ops_raw,
+ .proto_pipe_ops = &sub0_pipe_ops,
+};
+
int
nng_sub0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &sub0_proto));
}
+
+int
+nng_sub0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &sub0_proto_raw));
+}
diff --git a/src/protocol/pubsub0/sub.h b/src/protocol/pubsub0/sub.h
index 1a09145d..acb5cda3 100644
--- a/src/protocol/pubsub0/sub.h
+++ b/src/protocol/pubsub0/sub.h
@@ -1,6 +1,6 @@
//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -17,10 +17,16 @@ extern "C" {
NNG_DECL int nng_sub0_open(nng_socket *);
+NNG_DECL int nng_sub0_open_raw(nng_socket *);
+
#ifndef nng_sub_open
#define nng_sub_open nng_sub0_open
#endif
+#ifndef nng_sub_open_raw
+#define nng_sub_open_raw nng_sub0_open_raw
+#endif
+
#define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe"
#define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe"