summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2019-03-13 00:50:48 -0700
committerGarrett D'Amore <garrett@damore.org>2019-03-13 00:50:48 -0700
commitf723fa9655e1e7fadc1a15b94b66de674ab9fe17 (patch)
tree8788e128e723bbd936bee0b03287a49df789a116
parentf65f819f7fb3bbe9e24bc73342b4f335f5034fe0 (diff)
downloadnng-f723fa9655e1e7fadc1a15b94b66de674ab9fe17.tar.gz
nng-f723fa9655e1e7fadc1a15b94b66de674ab9fe17.tar.bz2
nng-f723fa9655e1e7fadc1a15b94b66de674ab9fe17.zip
fixes #815 Eliminate socket filters on message queues
This also eliminates the enforcement of NNG_OPT_RECVMAXSZ for inproc, which never really made much sense. This helps inproc go faster. While here, also clean up the entry point for protocols to support a drain option, since we don't use that anywhere.
-rw-r--r--docs/man/nng_inproc.7.adoc18
-rw-r--r--src/core/msgqueue.c52
-rw-r--r--src/core/msgqueue.h15
-rw-r--r--src/core/protocol.h14
-rw-r--r--src/core/socket.c5
-rw-r--r--src/transport/inproc/inproc.c33
-rw-r--r--tests/set_recvmaxsize.c51
7 files changed, 48 insertions, 140 deletions
diff --git a/docs/man/nng_inproc.7.adoc b/docs/man/nng_inproc.7.adoc
index cfd3806d..733b7789 100644
--- a/docs/man/nng_inproc.7.adoc
+++ b/docs/man/nng_inproc.7.adoc
@@ -1,6 +1,6 @@
= nng_inproc(7)
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This document is supplied under the terms of the MIT License, a
@@ -53,17 +53,23 @@ that URI.
=== Socket Address
-When using an `<<nng_sockaddr.5#,nng_sockaddr>>` structure,
+When using an xref:nng_sockaddr.5.adoc[`nng_sockaddr`] structure,
the actual structure is of type
-`<<nng_sockaddr_inproc.5#,nng_sockaddr_inproc>>`.
+xref:nng_sockaddr_inproc.5.adoc[`nng_sockaddr_inproc`].
=== Transport Options
The _inproc_ transport has no special options.
+NOTE: While _inproc_ accepts the option
+xref:nng_options.5.adoc#NNG_OPT_RECVMAXSZ[`NNG_OPT_RECVMAXSZ`] for
+compatibility, the value of the option is ignored with no enforcement.
+As _inproc_ peers are in the same address space, they are implicitly trusted,
+and thus it makes no sense to spend cycles protecting a program from itself.
+
== SEE ALSO
[.text-left]
-<<nng_inproc_register.3#,nng_inproc_register(3)>>,
-<<nng_sockaddr_inproc.5#,nng_sockaddr_inproc(5)>>,
-<<nng.7#,nng(7)>>
+xref:nng_inproc_register.3.adoc[nng_inproc_register(3)],
+xref:nng_sockaddr_inproc.5.adoc[nng_sockaddr_inproc(5)],
+xref:nng.7.adoc[nng(7)]
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 92213265..697ca503 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -32,10 +32,6 @@ struct nni_msgq {
// Pollable status.
nni_pollable *mq_sendable;
nni_pollable *mq_recvable;
-
- // Filters.
- nni_msgq_filter mq_filter_fn;
- void * mq_filter_arg;
};
static void nni_msgq_run_notify(nni_msgq *);
@@ -143,13 +139,6 @@ nni_msgq_flush(nni_msgq *mq)
nni_mtx_unlock(&mq->mq_lock);
}
-void
-nni_msgq_set_filter(nni_msgq *mq, nni_msgq_filter filter, void *arg)
-{
- mq->mq_filter_fn = filter;
- mq->mq_filter_arg = arg;
-}
-
static void
nni_msgq_run_putq(nni_msgq *mq)
{
@@ -167,15 +156,8 @@ nni_msgq_run_putq(nni_msgq *mq)
nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
- if (mq->mq_filter_fn != NULL) {
- msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
- }
- if (msg != NULL) {
- len = nni_msg_len(msg);
- nni_aio_list_remove(raio);
- nni_aio_finish_msg(raio, msg);
- }
-
+ nni_aio_list_remove(raio);
+ nni_aio_finish_msg(raio, msg);
nni_aio_finish(waio, 0, len);
continue;
}
@@ -213,13 +195,8 @@ nni_msgq_run_getq(nni_msgq *mq)
}
mq->mq_len--;
- if (mq->mq_filter_fn != NULL) {
- msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
- }
- if (msg != NULL) {
- nni_aio_list_remove(raio);
- nni_aio_finish_msg(raio, msg);
- }
+ nni_aio_list_remove(raio);
+ nni_aio_finish_msg(raio, msg);
continue;
}
@@ -234,14 +211,8 @@ nni_msgq_run_getq(nni_msgq *mq)
nni_aio_list_remove(waio);
nni_aio_finish(waio, 0, len);
- if (mq->mq_filter_fn != NULL) {
- msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
- }
- if (msg != NULL) {
- len = nni_msg_len(msg);
- nni_aio_list_remove(raio);
- nni_aio_finish_msg(raio, msg);
- }
+ nni_aio_list_remove(raio);
+ nni_aio_finish_msg(raio, msg);
continue;
}
@@ -352,14 +323,9 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
// data from the queue.
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
- if (mq->mq_filter_fn != NULL) {
- msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
- }
- if (msg != NULL) {
- nni_list_remove(&mq->mq_aio_getq, raio);
- nni_aio_finish_msg(raio, msg);
- nni_msgq_run_notify(mq);
- }
+ nni_list_remove(&mq->mq_aio_getq, raio);
+ nni_aio_finish_msg(raio, msg);
+ nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 44f9e765..2c43e540 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -49,19 +49,6 @@ extern int nni_msgq_tryput(nni_msgq *, nni_msg *);
// Readers (nni_msgq_put*) are unaffected.
extern void nni_msgq_set_get_error(nni_msgq *, int);
-// nni_msgq_filter is a callback function used to filter messages.
-// The function is called on entry (put) or exit (get). The void
-// argument is an opaque pointer supplied with the function at registration
-// time. The primary use for these functions is to support the protocol
-// socket needs.
-typedef nni_msg *(*nni_msgq_filter)(void *, nni_msg *);
-
-// nni_msgq_set_filter sets the filter on the queue. Messages
-// are filtered through this just before they are returned via the get
-// functions. If the filter returns NULL, then the message is silently
-// discarded instead, and any get waiters remain waiting.
-extern void nni_msgq_set_filter(nni_msgq *, nni_msgq_filter, void *);
-
// nni_msgq_close closes the queue. After this all operates on the
// message queue will return NNG_ECLOSED. Messages inside the queue
// are freed. Unlike closing a go channel, this operation is idempotent.
diff --git a/src/core/protocol.h b/src/core/protocol.h
index f164ee45..005e34fd 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -105,18 +105,6 @@ struct nni_proto_sock_ops {
// Receive a message.
void (*sock_recv)(void *, nni_aio *);
- // Message filter. This may be NULL, but if it isn't, then
- // messages coming into the system are routed here just before being
- // delivered to the application. To drop the message, the protocol
- // should return NULL, otherwise the message (possibly modified).
- nni_msg *(*sock_filter)(void *, nni_msg *);
-
- // Socket draining is intended to permit protocols to "drain"
- // before exiting. For protocols where draining makes no
- // sense, this may be NULL. (Example: REQ and SURVEYOR should
- // not drain, because they cannot receive a reply!)
- void (*sock_drain)(void *, nni_aio *);
-
// Options. Must not be NULL. Final entry should have NULL name.
nni_option *sock_options;
};
diff --git a/src/core/socket.c b/src/core/socket.c
index dadc9073..0224a812 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -571,11 +571,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
(void) nni_sock_setopt(
s, NNG_OPT_TCP_KEEPALIVE, &on, sizeof(on), NNI_TYPE_BOOL);
- if (s->s_sock_ops.sock_filter != NULL) {
- nni_msgq_set_filter(
- s->s_urq, s->s_sock_ops.sock_filter, s->s_data);
- }
-
*sp = s;
return (rv);
}
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 6618d794..9aef40a5 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -37,7 +37,6 @@ struct inproc_pipe {
nni_pipe * npipe;
uint16_t peer;
uint16_t proto;
- size_t rcvmax;
};
// inproc_pair represents a pair of pipes. Because we control both
@@ -61,7 +60,6 @@ struct inproc_ep {
nni_mtx mtx;
nni_dialer * ndialer;
nni_listener *nlistener;
- nni_stat_item st_rcvmaxsz;
};
// nni_inproc is our global state - this contains the list of active endpoints
@@ -115,9 +113,6 @@ inproc_pipe_alloc(inproc_pipe **pipep, inproc_ep *ep)
if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_lock(&ep->mtx);
- pipe->rcvmax = ep->rcvmax;
- nni_mtx_unlock(&ep->mtx);
pipe->proto = ep->proto;
pipe->addr = ep->addr;
@@ -229,13 +224,6 @@ inproc_dialer_init(void **epp, nni_url *url, nni_dialer *ndialer)
ep->addr = url->u_rawurl; // we match on the full URL.
- nni_stat_init(&ep->st_rcvmaxsz, "rcvmaxsz", "maximum receive size");
- nni_stat_set_type(&ep->st_rcvmaxsz, NNG_STAT_LEVEL);
- nni_stat_set_unit(&ep->st_rcvmaxsz, NNG_UNIT_BYTES);
- nni_stat_set_lock(&ep->st_rcvmaxsz, &ep->mtx);
-
- nni_dialer_add_stat(ndialer, &ep->st_rcvmaxsz);
-
*epp = ep;
return (0);
}
@@ -260,12 +248,6 @@ inproc_listener_init(void **epp, nni_url *url, nni_listener *nlistener)
ep->addr = url->u_rawurl; // we match on the full URL.
- nni_stat_init(&ep->st_rcvmaxsz, "rcvmaxsz", "maximum receive size");
- nni_stat_set_type(&ep->st_rcvmaxsz, NNG_STAT_LEVEL);
- nni_stat_set_unit(&ep->st_rcvmaxsz, NNG_UNIT_BYTES);
- nni_stat_set_lock(&ep->st_rcvmaxsz, &ep->mtx);
- nni_listener_add_stat(nlistener, &ep->st_rcvmaxsz);
-
*epp = ep;
return (0);
}
@@ -301,18 +283,6 @@ inproc_conn_finish(nni_aio *aio, int rv, inproc_ep *ep, inproc_pipe *pipe)
}
}
-static nni_msg *
-inproc_filter(void *arg, nni_msg *msg)
-{
- inproc_pipe *p = arg;
- if (p->rcvmax && (nni_msg_len(msg) > p->rcvmax)) {
- nni_pipe_bump_error(p->npipe, NNG_EMSGSIZE);
- nni_msg_free(msg);
- return (NULL);
- }
- return (msg);
-}
-
static void
inproc_ep_close(void *arg)
{
@@ -395,8 +365,6 @@ inproc_accept_clients(inproc_ep *srv)
cpipe->rq = spipe->wq = pair->q[0];
cpipe->wq = spipe->rq = pair->q[1];
- nni_msgq_set_filter(spipe->rq, inproc_filter, spipe);
- nni_msgq_set_filter(cpipe->rq, inproc_filter, cpipe);
inproc_conn_finish(caio, 0, cli, cpipe);
inproc_conn_finish(saio, 0, srv, spipe);
}
@@ -541,7 +509,6 @@ inproc_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) {
nni_mtx_lock(&ep->mtx);
ep->rcvmax = val;
- nni_stat_set_value(&ep->st_rcvmaxsz, val);
nni_mtx_unlock(&ep->mtx);
}
return (rv);
diff --git a/tests/set_recvmaxsize.c b/tests/set_recvmaxsize.c
index 381f77fa..1e8c255a 100644
--- a/tests/set_recvmaxsize.c
+++ b/tests/set_recvmaxsize.c
@@ -15,36 +15,35 @@
#define RCVBUFSIZE 200
const char *addrs[] = {
- "inproc:///tmp/inproctmp_setrecvmaxsz",
- "ipc:///tmp/ipctemp_setrecvmaxsz",
- "tcp://127.0.0.1:43895",
- "ws://127.0.0.1:43897",
+ "ipc:///tmp/ipctemp_setrecvmaxsz",
+ "tcp://127.0.0.1:43895",
+ "ws://127.0.0.1:43897",
};
TestMain("recvmaxsize", {
// we don't actually care what the content of the message is.
- char msg[SNDBUFSIZE];
- char rcvbuf[RCVBUFSIZE];
- size_t rcvsize = RCVBUFSIZE;
- nng_socket s0;
- nng_socket s1;
+ char msg[SNDBUFSIZE];
+ char rcvbuf[RCVBUFSIZE];
+ size_t rcvsize = RCVBUFSIZE;
+ nng_socket s0;
+ nng_socket s1;
nng_listener l;
- int numproto = sizeof addrs / sizeof *addrs;
- Convey("recvmaxsize can be set after listening", {
- for (int i = 0; i < numproto; i++) {
- const char *addr = addrs[i];
- So(nng_pair1_open(&s0) == 0);
- So(nng_setopt_ms(s0, NNG_OPT_RECVTIMEO, 100) == 0);
- So(nng_setopt_size(s0, NNG_OPT_RECVMAXSZ, 200) == 0);
- So(nng_listen(s0, addr, &l, 0) == 0);
- So(nng_setopt_size(s0, NNG_OPT_RECVMAXSZ, 100) == 0);
+ int numproto = sizeof addrs / sizeof *addrs;
+ Convey("recvmaxsize can be set after listening", {
+ for (int i = 0; i < numproto; i++) {
+ const char *addr = addrs[i];
+ So(nng_pair1_open(&s0) == 0);
+ So(nng_setopt_ms(s0, NNG_OPT_RECVTIMEO, 100) == 0);
+ So(nng_setopt_size(s0, NNG_OPT_RECVMAXSZ, 200) == 0);
+ So(nng_listen(s0, addr, &l, 0) == 0);
+ So(nng_setopt_size(s0, NNG_OPT_RECVMAXSZ, 100) == 0);
- So(nng_pair1_open(&s1) == 0);
- So(nng_dial(s1, addr, NULL, 0) == 0);
- So(nng_send(s1, msg, 150, 0) == 0);
- So(nng_recv(s0, rcvbuf, &rcvsize, 0) == NNG_ETIMEDOUT);
- So(nng_close(s0) == 0);
- So(nng_close(s1) == 0);
- }
- });
+ So(nng_pair1_open(&s1) == 0);
+ So(nng_dial(s1, addr, NULL, 0) == 0);
+ So(nng_send(s1, msg, 150, 0) == 0);
+ So(nng_recv(s0, rcvbuf, &rcvsize, 0) == NNG_ETIMEDOUT);
+ So(nng_close(s0) == 0);
+ So(nng_close(s1) == 0);
+ }
+ });
})