From 3e70013111b70f1439b2f9991211c887a8eefff3 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 20 Aug 2018 07:02:47 -0700 Subject: fixes #541 inproc does not honor maxrecvsize option --- src/transport/inproc/inproc.c | 64 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index db8aeff5..89f9b024 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -35,6 +35,7 @@ struct nni_inproc_pipe { nni_msgq * wq; uint16_t peer; uint16_t proto; + size_t rcvmax; }; // nni_inproc_pair represents a pair of pipes. Because we control both @@ -54,6 +55,8 @@ struct nni_inproc_ep { nni_cv cv; nni_list clients; nni_list aios; + size_t rcvmax; + nni_mtx mtx; }; // nni_inproc is our global state - this contains the list of active endpoints @@ -107,6 +110,10 @@ nni_inproc_pipe_init(nni_inproc_pipe **pipep, nni_inproc_ep *ep) if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { return (NNG_ENOMEM); } + nni_mtx_lock(&ep->mtx); + pipe->rcvmax = ep->rcvmax; + nni_mtx_unlock(&ep->mtx); + pipe->proto = ep->proto; pipe->addr = ep->addr; *pipep = pipe; @@ -196,9 +203,11 @@ nni_inproc_dialer_init(void **epp, nni_url *url, nni_sock *sock) if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); } + nni_mtx_init(&ep->mtx); ep->listener = false; ep->proto = nni_sock_proto_id(sock); + ep->rcvmax = 0; NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node); nni_aio_list_init(&ep->aios); @@ -214,9 +223,11 @@ nni_inproc_listener_init(void **epp, nni_url *url, nni_sock *sock) if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); } + nni_mtx_init(&ep->mtx); ep->listener = true; ep->proto = nni_sock_proto_id(sock); + ep->rcvmax = 0; NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node); nni_aio_list_init(&ep->aios); @@ -229,7 +240,7 @@ static void nni_inproc_ep_fini(void *arg) { nni_inproc_ep *ep = arg; - + nni_mtx_fini(&ep->mtx); NNI_FREE_STRUCT(ep); } @@ -253,6 +264,17 @@ nni_inproc_conn_finish(nni_aio *aio, int rv, nni_inproc_pipe *pipe) } } +static nni_msg * +inproc_filter(void *arg, nni_msg *msg) +{ + nni_inproc_pipe *p = arg; + if (p->rcvmax && (nni_msg_len(msg) > p->rcvmax)) { + nni_msg_free(msg); + return (NULL); + } + return (msg); +} + static void nni_inproc_ep_close(void *arg) { @@ -333,6 +355,8 @@ nni_inproc_accept_clients(nni_inproc_ep *srv) cpipe->rq = spipe->wq = pair->q[0]; cpipe->wq = spipe->rq = pair->q[1]; + nni_msgq_set_filter(spipe->rq, inproc_filter, spipe); + nni_msgq_set_filter(cpipe->rq, inproc_filter, cpipe); nni_inproc_conn_finish(caio, 0, cpipe); nni_inproc_conn_finish(saio, 0, spipe); } @@ -449,6 +473,37 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) nni_mtx_unlock(&nni_inproc.mx); } +static int +inproc_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + nni_inproc_ep *ep = arg; + int rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_size(ep->rcvmax, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); +} + +static int +inproc_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) +{ + nni_inproc_ep *ep = arg; + size_t val; + int rv; + if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&ep->mtx); + ep->rcvmax = val; + nni_mtx_unlock(&ep->mtx); + } + return (rv); +} + +static int +inproc_check_recvmaxsz(const void *data, size_t sz, nni_opt_type t) +{ + return (nni_copyin_size(NULL, data, sz, 0, NNI_MAXSZ, t)); +} + static nni_tran_option nni_inproc_pipe_options[] = { { .o_name = NNG_OPT_LOCADDR, @@ -476,6 +531,13 @@ static nni_tran_pipe_ops nni_inproc_pipe_ops = { }; static nni_tran_option nni_inproc_ep_options[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_type = NNI_TYPE_SIZE, + .o_get = inproc_ep_get_recvmaxsz, + .o_set = inproc_ep_set_recvmaxsz, + .o_chk = inproc_check_recvmaxsz, + }, // terminate list { .o_name = NULL, -- cgit v1.2.3-70-g09d2