diff options
| author | Garrett D'Amore <garrett@damore.org> | 2019-03-13 00:50:48 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2019-03-13 00:50:48 -0700 |
| commit | f723fa9655e1e7fadc1a15b94b66de674ab9fe17 (patch) | |
| tree | 8788e128e723bbd936bee0b03287a49df789a116 | |
| parent | f65f819f7fb3bbe9e24bc73342b4f335f5034fe0 (diff) | |
| download | nng-f723fa9655e1e7fadc1a15b94b66de674ab9fe17.tar.gz nng-f723fa9655e1e7fadc1a15b94b66de674ab9fe17.tar.bz2 nng-f723fa9655e1e7fadc1a15b94b66de674ab9fe17.zip | |
fixes #815 Eliminate socket filters on message queues
This also eliminates the enforcement of NNG_OPT_RECVMAXSZ for inproc,
which never really made much sense. This helps inproc go faster.
While here, also clean up the entry point for protocols to support
a drain option, since we don't use that anywhere.
| -rw-r--r-- | docs/man/nng_inproc.7.adoc | 18 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 52 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 15 | ||||
| -rw-r--r-- | src/core/protocol.h | 14 | ||||
| -rw-r--r-- | src/core/socket.c | 5 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 33 | ||||
| -rw-r--r-- | tests/set_recvmaxsize.c | 51 |
7 files changed, 48 insertions, 140 deletions
diff --git a/docs/man/nng_inproc.7.adoc b/docs/man/nng_inproc.7.adoc index cfd3806d..733b7789 100644 --- a/docs/man/nng_inproc.7.adoc +++ b/docs/man/nng_inproc.7.adoc @@ -1,6 +1,6 @@ = nng_inproc(7) // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This document is supplied under the terms of the MIT License, a @@ -53,17 +53,23 @@ that URI. === Socket Address -When using an `<<nng_sockaddr.5#,nng_sockaddr>>` structure, +When using an xref:nng_sockaddr.5.adoc[`nng_sockaddr`] structure, the actual structure is of type -`<<nng_sockaddr_inproc.5#,nng_sockaddr_inproc>>`. +xref:nng_sockaddr_inproc.5.adoc[`nng_sockaddr_inproc`]. === Transport Options The _inproc_ transport has no special options. +NOTE: While _inproc_ accepts the option +xref:nng_options.5.adoc#NNG_OPT_RECVMAXSZ[`NNG_OPT_RECVMAXSZ`] for +compatibility, the value of the option is ignored with no enforcement. +As _inproc_ peers are in the same address space, they are implicitly trusted, +and thus it makes no sense to spend cycles protecting a program from itself. + == SEE ALSO [.text-left] -<<nng_inproc_register.3#,nng_inproc_register(3)>>, -<<nng_sockaddr_inproc.5#,nng_sockaddr_inproc(5)>>, -<<nng.7#,nng(7)>> +xref:nng_inproc_register.3.adoc[nng_inproc_register(3)], +xref:nng_sockaddr_inproc.5.adoc[nng_sockaddr_inproc(5)], +xref:nng.7.adoc[nng(7)] diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 92213265..697ca503 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -32,10 +32,6 @@ struct nni_msgq { // Pollable status. nni_pollable *mq_sendable; nni_pollable *mq_recvable; - - // Filters. - nni_msgq_filter mq_filter_fn; - void * mq_filter_arg; }; static void nni_msgq_run_notify(nni_msgq *); @@ -143,13 +139,6 @@ nni_msgq_flush(nni_msgq *mq) nni_mtx_unlock(&mq->mq_lock); } -void -nni_msgq_set_filter(nni_msgq *mq, nni_msgq_filter filter, void *arg) -{ - mq->mq_filter_fn = filter; - mq->mq_filter_arg = arg; -} - static void nni_msgq_run_putq(nni_msgq *mq) { @@ -167,15 +156,8 @@ nni_msgq_run_putq(nni_msgq *mq) nni_aio_set_msg(waio, NULL); nni_aio_list_remove(waio); - if (mq->mq_filter_fn != NULL) { - msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); - } - if (msg != NULL) { - len = nni_msg_len(msg); - nni_aio_list_remove(raio); - nni_aio_finish_msg(raio, msg); - } - + nni_aio_list_remove(raio); + nni_aio_finish_msg(raio, msg); nni_aio_finish(waio, 0, len); continue; } @@ -213,13 +195,8 @@ nni_msgq_run_getq(nni_msgq *mq) } mq->mq_len--; - if (mq->mq_filter_fn != NULL) { - msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); - } - if (msg != NULL) { - nni_aio_list_remove(raio); - nni_aio_finish_msg(raio, msg); - } + nni_aio_list_remove(raio); + nni_aio_finish_msg(raio, msg); continue; } @@ -234,14 +211,8 @@ nni_msgq_run_getq(nni_msgq *mq) nni_aio_list_remove(waio); nni_aio_finish(waio, 0, len); - if (mq->mq_filter_fn != NULL) { - msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); - } - if (msg != NULL) { - len = nni_msg_len(msg); - nni_aio_list_remove(raio); - nni_aio_finish_msg(raio, msg); - } + nni_aio_list_remove(raio); + nni_aio_finish_msg(raio, msg); continue; } @@ -352,14 +323,9 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) // data from the queue. if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { - if (mq->mq_filter_fn != NULL) { - msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); - } - if (msg != NULL) { - nni_list_remove(&mq->mq_aio_getq, raio); - nni_aio_finish_msg(raio, msg); - nni_msgq_run_notify(mq); - } + nni_list_remove(&mq->mq_aio_getq, raio); + nni_aio_finish_msg(raio, msg); + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); return (0); } diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 44f9e765..2c43e540 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -49,19 +49,6 @@ extern int nni_msgq_tryput(nni_msgq *, nni_msg *); // Readers (nni_msgq_put*) are unaffected. extern void nni_msgq_set_get_error(nni_msgq *, int); -// nni_msgq_filter is a callback function used to filter messages. -// The function is called on entry (put) or exit (get). The void -// argument is an opaque pointer supplied with the function at registration -// time. The primary use for these functions is to support the protocol -// socket needs. -typedef nni_msg *(*nni_msgq_filter)(void *, nni_msg *); - -// nni_msgq_set_filter sets the filter on the queue. Messages -// are filtered through this just before they are returned via the get -// functions. If the filter returns NULL, then the message is silently -// discarded instead, and any get waiters remain waiting. -extern void nni_msgq_set_filter(nni_msgq *, nni_msgq_filter, void *); - // nni_msgq_close closes the queue. After this all operates on the // message queue will return NNG_ECLOSED. Messages inside the queue // are freed. Unlike closing a go channel, this operation is idempotent. diff --git a/src/core/protocol.h b/src/core/protocol.h index f164ee45..005e34fd 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -105,18 +105,6 @@ struct nni_proto_sock_ops { // Receive a message. void (*sock_recv)(void *, nni_aio *); - // Message filter. This may be NULL, but if it isn't, then - // messages coming into the system are routed here just before being - // delivered to the application. To drop the message, the protocol - // should return NULL, otherwise the message (possibly modified). - nni_msg *(*sock_filter)(void *, nni_msg *); - - // Socket draining is intended to permit protocols to "drain" - // before exiting. For protocols where draining makes no - // sense, this may be NULL. (Example: REQ and SURVEYOR should - // not drain, because they cannot receive a reply!) - void (*sock_drain)(void *, nni_aio *); - // Options. Must not be NULL. Final entry should have NULL name. nni_option *sock_options; }; diff --git a/src/core/socket.c b/src/core/socket.c index dadc9073..0224a812 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -571,11 +571,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) (void) nni_sock_setopt( s, NNG_OPT_TCP_KEEPALIVE, &on, sizeof(on), NNI_TYPE_BOOL); - if (s->s_sock_ops.sock_filter != NULL) { - nni_msgq_set_filter( - s->s_urq, s->s_sock_ops.sock_filter, s->s_data); - } - *sp = s; return (rv); } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 6618d794..9aef40a5 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -37,7 +37,6 @@ struct inproc_pipe { nni_pipe * npipe; uint16_t peer; uint16_t proto; - size_t rcvmax; }; // inproc_pair represents a pair of pipes. Because we control both @@ -61,7 +60,6 @@ struct inproc_ep { nni_mtx mtx; nni_dialer * ndialer; nni_listener *nlistener; - nni_stat_item st_rcvmaxsz; }; // nni_inproc is our global state - this contains the list of active endpoints @@ -115,9 +113,6 @@ inproc_pipe_alloc(inproc_pipe **pipep, inproc_ep *ep) if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { return (NNG_ENOMEM); } - nni_mtx_lock(&ep->mtx); - pipe->rcvmax = ep->rcvmax; - nni_mtx_unlock(&ep->mtx); pipe->proto = ep->proto; pipe->addr = ep->addr; @@ -229,13 +224,6 @@ inproc_dialer_init(void **epp, nni_url *url, nni_dialer *ndialer) ep->addr = url->u_rawurl; // we match on the full URL. - nni_stat_init(&ep->st_rcvmaxsz, "rcvmaxsz", "maximum receive size"); - nni_stat_set_type(&ep->st_rcvmaxsz, NNG_STAT_LEVEL); - nni_stat_set_unit(&ep->st_rcvmaxsz, NNG_UNIT_BYTES); - nni_stat_set_lock(&ep->st_rcvmaxsz, &ep->mtx); - - nni_dialer_add_stat(ndialer, &ep->st_rcvmaxsz); - *epp = ep; return (0); } @@ -260,12 +248,6 @@ inproc_listener_init(void **epp, nni_url *url, nni_listener *nlistener) ep->addr = url->u_rawurl; // we match on the full URL. - nni_stat_init(&ep->st_rcvmaxsz, "rcvmaxsz", "maximum receive size"); - nni_stat_set_type(&ep->st_rcvmaxsz, NNG_STAT_LEVEL); - nni_stat_set_unit(&ep->st_rcvmaxsz, NNG_UNIT_BYTES); - nni_stat_set_lock(&ep->st_rcvmaxsz, &ep->mtx); - nni_listener_add_stat(nlistener, &ep->st_rcvmaxsz); - *epp = ep; return (0); } @@ -301,18 +283,6 @@ inproc_conn_finish(nni_aio *aio, int rv, inproc_ep *ep, inproc_pipe *pipe) } } -static nni_msg * -inproc_filter(void *arg, nni_msg *msg) -{ - inproc_pipe *p = arg; - if (p->rcvmax && (nni_msg_len(msg) > p->rcvmax)) { - nni_pipe_bump_error(p->npipe, NNG_EMSGSIZE); - nni_msg_free(msg); - return (NULL); - } - return (msg); -} - static void inproc_ep_close(void *arg) { @@ -395,8 +365,6 @@ inproc_accept_clients(inproc_ep *srv) cpipe->rq = spipe->wq = pair->q[0]; cpipe->wq = spipe->rq = pair->q[1]; - nni_msgq_set_filter(spipe->rq, inproc_filter, spipe); - nni_msgq_set_filter(cpipe->rq, inproc_filter, cpipe); inproc_conn_finish(caio, 0, cli, cpipe); inproc_conn_finish(saio, 0, srv, spipe); } @@ -541,7 +509,6 @@ inproc_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { nni_mtx_lock(&ep->mtx); ep->rcvmax = val; - nni_stat_set_value(&ep->st_rcvmaxsz, val); nni_mtx_unlock(&ep->mtx); } return (rv); diff --git a/tests/set_recvmaxsize.c b/tests/set_recvmaxsize.c index 381f77fa..1e8c255a 100644 --- a/tests/set_recvmaxsize.c +++ b/tests/set_recvmaxsize.c @@ -15,36 +15,35 @@ #define RCVBUFSIZE 200 const char *addrs[] = { - "inproc:///tmp/inproctmp_setrecvmaxsz", - "ipc:///tmp/ipctemp_setrecvmaxsz", - "tcp://127.0.0.1:43895", - "ws://127.0.0.1:43897", + "ipc:///tmp/ipctemp_setrecvmaxsz", + "tcp://127.0.0.1:43895", + "ws://127.0.0.1:43897", }; TestMain("recvmaxsize", { // we don't actually care what the content of the message is. - char msg[SNDBUFSIZE]; - char rcvbuf[RCVBUFSIZE]; - size_t rcvsize = RCVBUFSIZE; - nng_socket s0; - nng_socket s1; + char msg[SNDBUFSIZE]; + char rcvbuf[RCVBUFSIZE]; + size_t rcvsize = RCVBUFSIZE; + nng_socket s0; + nng_socket s1; nng_listener l; - int numproto = sizeof addrs / sizeof *addrs; - Convey("recvmaxsize can be set after listening", { - for (int i = 0; i < numproto; i++) { - const char *addr = addrs[i]; - So(nng_pair1_open(&s0) == 0); - So(nng_setopt_ms(s0, NNG_OPT_RECVTIMEO, 100) == 0); - So(nng_setopt_size(s0, NNG_OPT_RECVMAXSZ, 200) == 0); - So(nng_listen(s0, addr, &l, 0) == 0); - So(nng_setopt_size(s0, NNG_OPT_RECVMAXSZ, 100) == 0); + int numproto = sizeof addrs / sizeof *addrs; + Convey("recvmaxsize can be set after listening", { + for (int i = 0; i < numproto; i++) { + const char *addr = addrs[i]; + So(nng_pair1_open(&s0) == 0); + So(nng_setopt_ms(s0, NNG_OPT_RECVTIMEO, 100) == 0); + So(nng_setopt_size(s0, NNG_OPT_RECVMAXSZ, 200) == 0); + So(nng_listen(s0, addr, &l, 0) == 0); + So(nng_setopt_size(s0, NNG_OPT_RECVMAXSZ, 100) == 0); - So(nng_pair1_open(&s1) == 0); - So(nng_dial(s1, addr, NULL, 0) == 0); - So(nng_send(s1, msg, 150, 0) == 0); - So(nng_recv(s0, rcvbuf, &rcvsize, 0) == NNG_ETIMEDOUT); - So(nng_close(s0) == 0); - So(nng_close(s1) == 0); - } - }); + So(nng_pair1_open(&s1) == 0); + So(nng_dial(s1, addr, NULL, 0) == 0); + So(nng_send(s1, msg, 150, 0) == 0); + So(nng_recv(s0, rcvbuf, &rcvsize, 0) == NNG_ETIMEDOUT); + So(nng_close(s0) == 0); + So(nng_close(s1) == 0); + } + }); }) |
