From f723fa9655e1e7fadc1a15b94b66de674ab9fe17 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 13 Mar 2019 00:50:48 -0700 Subject: fixes #815 Eliminate socket filters on message queues This also eliminates the enforcement of NNG_OPT_RECVMAXSZ for inproc, which never really made much sense. This helps inproc go faster. While here, also clean up the entry point for protocols to support a drain option, since we don't use that anywhere. --- src/core/msgqueue.c | 52 +++++++++------------------------------------------- src/core/msgqueue.h | 15 +-------------- src/core/protocol.h | 14 +------------- src/core/socket.c | 5 ----- 4 files changed, 11 insertions(+), 75 deletions(-) (limited to 'src/core') diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 92213265..697ca503 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -32,10 +32,6 @@ struct nni_msgq { // Pollable status. nni_pollable *mq_sendable; nni_pollable *mq_recvable; - - // Filters. - nni_msgq_filter mq_filter_fn; - void * mq_filter_arg; }; static void nni_msgq_run_notify(nni_msgq *); @@ -143,13 +139,6 @@ nni_msgq_flush(nni_msgq *mq) nni_mtx_unlock(&mq->mq_lock); } -void -nni_msgq_set_filter(nni_msgq *mq, nni_msgq_filter filter, void *arg) -{ - mq->mq_filter_fn = filter; - mq->mq_filter_arg = arg; -} - static void nni_msgq_run_putq(nni_msgq *mq) { @@ -167,15 +156,8 @@ nni_msgq_run_putq(nni_msgq *mq) nni_aio_set_msg(waio, NULL); nni_aio_list_remove(waio); - if (mq->mq_filter_fn != NULL) { - msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); - } - if (msg != NULL) { - len = nni_msg_len(msg); - nni_aio_list_remove(raio); - nni_aio_finish_msg(raio, msg); - } - + nni_aio_list_remove(raio); + nni_aio_finish_msg(raio, msg); nni_aio_finish(waio, 0, len); continue; } @@ -213,13 +195,8 @@ nni_msgq_run_getq(nni_msgq *mq) } mq->mq_len--; - if (mq->mq_filter_fn != NULL) { - msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); - } - if (msg != NULL) { - nni_aio_list_remove(raio); - nni_aio_finish_msg(raio, msg); - } + nni_aio_list_remove(raio); + nni_aio_finish_msg(raio, msg); continue; } @@ -234,14 +211,8 @@ nni_msgq_run_getq(nni_msgq *mq) nni_aio_list_remove(waio); nni_aio_finish(waio, 0, len); - if (mq->mq_filter_fn != NULL) { - msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); - } - if (msg != NULL) { - len = nni_msg_len(msg); - nni_aio_list_remove(raio); - nni_aio_finish_msg(raio, msg); - } + nni_aio_list_remove(raio); + nni_aio_finish_msg(raio, msg); continue; } @@ -352,14 +323,9 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) // data from the queue. if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { - if (mq->mq_filter_fn != NULL) { - msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); - } - if (msg != NULL) { - nni_list_remove(&mq->mq_aio_getq, raio); - nni_aio_finish_msg(raio, msg); - nni_msgq_run_notify(mq); - } + nni_list_remove(&mq->mq_aio_getq, raio); + nni_aio_finish_msg(raio, msg); + nni_msgq_run_notify(mq); nni_mtx_unlock(&mq->mq_lock); return (0); } diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 44f9e765..2c43e540 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. +// Copyright 2019 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -49,19 +49,6 @@ extern int nni_msgq_tryput(nni_msgq *, nni_msg *); // Readers (nni_msgq_put*) are unaffected. extern void nni_msgq_set_get_error(nni_msgq *, int); -// nni_msgq_filter is a callback function used to filter messages. -// The function is called on entry (put) or exit (get). The void -// argument is an opaque pointer supplied with the function at registration -// time. The primary use for these functions is to support the protocol -// socket needs. -typedef nni_msg *(*nni_msgq_filter)(void *, nni_msg *); - -// nni_msgq_set_filter sets the filter on the queue. Messages -// are filtered through this just before they are returned via the get -// functions. If the filter returns NULL, then the message is silently -// discarded instead, and any get waiters remain waiting. -extern void nni_msgq_set_filter(nni_msgq *, nni_msgq_filter, void *); - // nni_msgq_close closes the queue. After this all operates on the // message queue will return NNG_ECLOSED. Messages inside the queue // are freed. Unlike closing a go channel, this operation is idempotent. diff --git a/src/core/protocol.h b/src/core/protocol.h index f164ee45..005e34fd 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. +// Copyright 2019 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -105,18 +105,6 @@ struct nni_proto_sock_ops { // Receive a message. void (*sock_recv)(void *, nni_aio *); - // Message filter. This may be NULL, but if it isn't, then - // messages coming into the system are routed here just before being - // delivered to the application. To drop the message, the protocol - // should return NULL, otherwise the message (possibly modified). - nni_msg *(*sock_filter)(void *, nni_msg *); - - // Socket draining is intended to permit protocols to "drain" - // before exiting. For protocols where draining makes no - // sense, this may be NULL. (Example: REQ and SURVEYOR should - // not drain, because they cannot receive a reply!) - void (*sock_drain)(void *, nni_aio *); - // Options. Must not be NULL. Final entry should have NULL name. nni_option *sock_options; }; diff --git a/src/core/socket.c b/src/core/socket.c index dadc9073..0224a812 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -571,11 +571,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) (void) nni_sock_setopt( s, NNG_OPT_TCP_KEEPALIVE, &on, sizeof(on), NNI_TYPE_BOOL); - if (s->s_sock_ops.sock_filter != NULL) { - nni_msgq_set_filter( - s->s_urq, s->s_sock_ops.sock_filter, s->s_data); - } - *sp = s; return (rv); } -- cgit v1.2.3-70-g09d2