diff options
| author | Garrett D'Amore <garrett@damore.org> | 2019-02-18 20:15:16 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2019-02-23 11:50:17 -0800 |
| commit | 64e784237d143aa032311942bc44abd22e1e4114 (patch) | |
| tree | e3ea529d5e5adfd022773ab207622cc2247f057c /src/transport/inproc | |
| parent | d210ef96517c1462bc058c95bced8c27b5e19c4f (diff) | |
| download | nng-64e784237d143aa032311942bc44abd22e1e4114.tar.gz nng-64e784237d143aa032311942bc44abd22e1e4114.tar.bz2 nng-64e784237d143aa032311942bc44abd22e1e4114.zip | |
fixes #848 server hang waiting for client handshake
fixes #698 Need TCP stats
fixes #699 Need IPC stats
fixes #701 Need TLS stats
This commit addresses a problem when negotiating using one of the stream
based negotiation APIs -- a slow or misbehaving peer can prevent well
behaved ones from establishing a connection. The fix is a fairly
significant change in how these transports link up, and it does rely
on the fact that the socket only has a single accept() or connect()
pending at a time (on a given endpoint that is).
While here, we have completely revamped the way transport statistics are
done, offering a standard API for collecting these statistics.
Unfortunately, this completely borks the statistics for inproc. As we
are planning to change the way inproc works soon, in order to provide
more control and work on performance fixes for the message queue, we feel
this is an acceptable trade-off. Furthermore, almost nobody uses inproc
for anything, and even fewer people are making use of the statistics
at this time.
Diffstat (limited to 'src/transport/inproc')
| -rw-r--r-- | src/transport/inproc/inproc.c | 186 |
1 files changed, 56 insertions, 130 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 40ab9107..6618d794 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2018 Devolutions <info@devolutions.net> // @@ -30,24 +30,14 @@ typedef struct { // inproc_pipe represents one half of a connection. struct inproc_pipe { - const char * addr; - inproc_pair * pair; - nni_msgq * rq; - nni_msgq * wq; - nni_pipe * npipe; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - nni_stat_item st_rxbytes; - nni_stat_item st_txbytes; - nni_stat_item st_rxmsgs; - nni_stat_item st_txmsgs; - nni_stat_item st_rxdiscards; - nni_stat_item st_txdiscards; - nni_stat_item st_rxerrs; - nni_stat_item st_txerrs; - nni_stat_item st_rxoversize; - nni_stat_item st_rcvmaxsz; + const char * addr; + inproc_pair *pair; + nni_msgq * rq; + nni_msgq * wq; + nni_pipe * npipe; + uint16_t peer; + uint16_t proto; + size_t rcvmax; }; // inproc_pair represents a pair of pipes. Because we control both @@ -135,120 +125,12 @@ inproc_pipe_alloc(inproc_pipe **pipep, inproc_ep *ep) return (0); } -#ifdef NNG_ENABLE_STATS -static void -inproc_get_rxbytes(nni_stat_item *st, void *arg) -{ - nni_msgq *mq = arg; - nni_stat_set_value(st, nni_msgq_stat_get_bytes(mq)); -} - -static void -inproc_get_rxmsgs(nni_stat_item *st, void *arg) -{ - nni_msgq *mq = arg; - nni_stat_set_value(st, nni_msgq_stat_get_msgs(mq)); -} - -static void -inproc_get_txbytes(nni_stat_item *st, void *arg) -{ - nni_msgq *mq = arg; - nni_stat_set_value(st, nni_msgq_stat_put_bytes(mq)); -} - -static void -inproc_get_txmsgs(nni_stat_item *st, void *arg) -{ - nni_msgq *mq = arg; - nni_stat_set_value(st, nni_msgq_stat_put_msgs(mq)); -} - -static void -inproc_get_discards(nni_stat_item *st, void *arg) -{ - nni_msgq *mq = arg; - nni_stat_set_value(st, nni_msgq_stat_discards(mq)); -} - -static void -inproc_get_txerrs(nni_stat_item *st, void *arg) -{ - nni_msgq *mq = arg; - nni_stat_set_value(st, nni_msgq_stat_put_errs(mq)); -} - -static void -inproc_get_rxerrs(nni_stat_item *st, void *arg) -{ - nni_msgq *mq = arg; - nni_stat_set_value(st, nni_msgq_stat_get_errs(mq)); -} -#else -#undef nni_stat_set_update -#define nni_stat_set_update(p, x, f) -#endif - static int inproc_pipe_init(void *arg, nni_pipe *p) { inproc_pipe *pipe = arg; pipe->npipe = p; - nni_stat_init(&pipe->st_rxbytes, "rxbytes", "bytes received (raw)"); - nni_stat_set_update(&pipe->st_rxbytes, inproc_get_rxbytes, pipe->rq); - nni_stat_set_unit(&pipe->st_rxbytes, NNG_UNIT_BYTES); - nni_pipe_add_stat(p, &pipe->st_rxbytes); - - nni_stat_init(&pipe->st_txbytes, "txbytes", "bytes sent (raw)"); - nni_stat_set_update(&pipe->st_txbytes, inproc_get_txbytes, pipe->wq); - nni_stat_set_unit(&pipe->st_txbytes, NNG_UNIT_BYTES); - nni_pipe_add_stat(p, &pipe->st_txbytes); - - nni_stat_init(&pipe->st_rxmsgs, "rxmsgs", "msgs received"); - nni_stat_set_update(&pipe->st_rxmsgs, inproc_get_rxmsgs, pipe->rq); - nni_stat_set_unit(&pipe->st_rxmsgs, NNG_UNIT_MESSAGES); - nni_pipe_add_stat(p, &pipe->st_rxmsgs); - - nni_stat_init(&pipe->st_txmsgs, "txmsgs", "msgs sent"); - nni_stat_set_update(&pipe->st_txmsgs, inproc_get_txmsgs, pipe->wq); - nni_stat_set_unit(&pipe->st_txmsgs, NNG_UNIT_MESSAGES); - nni_pipe_add_stat(p, &pipe->st_txmsgs); - - nni_stat_init( - &pipe->st_rxdiscards, "rxdiscards", "receives discarded"); - nni_stat_set_update( - &pipe->st_rxdiscards, inproc_get_discards, pipe->rq); - nni_stat_set_unit(&pipe->st_rxdiscards, NNG_UNIT_MESSAGES); - nni_pipe_add_stat(p, &pipe->st_rxdiscards); - - nni_stat_init(&pipe->st_txdiscards, "txdiscards", "sends discarded"); - nni_stat_set_update( - &pipe->st_txdiscards, inproc_get_discards, pipe->wq); - nni_stat_set_unit(&pipe->st_txdiscards, NNG_UNIT_MESSAGES); - nni_pipe_add_stat(p, &pipe->st_txdiscards); - - nni_stat_init(&pipe->st_rxerrs, "rxerrs", "receive errors"); - nni_stat_set_update(&pipe->st_rxerrs, inproc_get_rxerrs, pipe->rq); - nni_stat_set_unit(&pipe->st_rxerrs, NNG_UNIT_MESSAGES); - nni_pipe_add_stat(p, &pipe->st_rxerrs); - - nni_stat_init(&pipe->st_txerrs, "txerrs", "send errors"); - nni_stat_set_update(&pipe->st_txerrs, inproc_get_txerrs, pipe->wq); - nni_stat_set_unit(&pipe->st_txerrs, NNG_UNIT_MESSAGES); - nni_pipe_add_stat(p, &pipe->st_txerrs); - - nni_stat_init_atomic(&pipe->st_rxoversize, "rxoversize", - "oversize msgs received (dropped)"); - nni_stat_set_unit(&pipe->st_rxoversize, NNG_UNIT_MESSAGES); - nni_pipe_add_stat(p, &pipe->st_rxoversize); - - nni_stat_init(&pipe->st_rcvmaxsz, "rcvmaxsz", "maximum receive size"); - nni_stat_set_type(&pipe->st_rcvmaxsz, NNG_UNIT_BYTES); - nni_stat_set_unit(&pipe->st_rcvmaxsz, NNG_UNIT_BYTES); - nni_stat_set_value(&pipe->st_rcvmaxsz, pipe->rcvmax); - nni_pipe_add_stat(p, &pipe->st_rcvmaxsz); - return (0); } @@ -409,6 +291,11 @@ inproc_conn_finish(nni_aio *aio, int rv, inproc_ep *ep, inproc_pipe *pipe) nni_aio_set_output(aio, 0, pipe); nni_aio_finish(aio, 0, 0); } else { + if (ep->ndialer != NULL) { + nni_dialer_bump_error(ep->ndialer, rv); + } else { + nni_listener_bump_error(ep->nlistener, rv); + } NNI_ASSERT(pipe == NULL); nni_aio_finish_error(aio, rv); } @@ -419,7 +306,7 @@ inproc_filter(void *arg, nni_msg *msg) { inproc_pipe *p = arg; if (p->rcvmax && (nni_msg_len(msg) > p->rcvmax)) { - nni_stat_inc_atomic(&p->st_rxoversize, 1); + nni_pipe_bump_error(p->npipe, NNG_EMSGSIZE); nni_msg_free(msg); return (NULL); } @@ -535,6 +422,11 @@ inproc_ep_cancel(nni_aio *aio, void *arg, int rv) nni_list_node_remove(&ep->node); nni_aio_finish_error(aio, rv); } + if (ep->ndialer != NULL) { + nni_dialer_bump_error(ep->ndialer, rv); + } else { + nni_listener_bump_error(ep->nlistener, rv); + } nni_mtx_unlock(&nni_inproc.mx); } @@ -559,6 +451,7 @@ inproc_ep_connect(void *arg, nni_aio *aio) } if (server == NULL) { nni_mtx_unlock(&nni_inproc.mx); + nni_dialer_bump_error(ep->ndialer, NNG_ECONNREFUSED); nni_aio_finish_error(aio, NNG_ECONNREFUSED); return; } @@ -568,6 +461,7 @@ inproc_ep_connect(void *arg, nni_aio *aio) // that in the upper API. if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) { nni_mtx_unlock(&nni_inproc.mx); + nni_dialer_bump_error(ep->ndialer, rv); nni_aio_finish_error(aio, rv); return; } @@ -590,6 +484,7 @@ inproc_ep_bind(void *arg) NNI_LIST_FOREACH (list, srch) { if (strcmp(srch->addr, ep->addr) == 0) { nni_mtx_unlock(&nni_inproc.mx); + nni_listener_bump_error(ep->nlistener, NNG_EADDRINUSE); return (NNG_EADDRINUSE); } } @@ -614,6 +509,7 @@ inproc_ep_accept(void *arg, nni_aio *aio) // accept was tried -- there is no API to do such a thing. if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) { nni_mtx_unlock(&nni_inproc.mx); + nni_listener_bump_error(ep->nlistener, rv); nni_aio_finish_error(aio, rv); return; } @@ -642,8 +538,7 @@ inproc_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) inproc_ep *ep = arg; size_t val; int rv; - if (((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) && - (ep != NULL)) { + if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { nni_mtx_lock(&ep->mtx); ep->rcvmax = val; nni_stat_set_value(&ep->st_rcvmaxsz, val); @@ -728,6 +623,36 @@ inproc_ep_setopt( return (nni_setopt(inproc_ep_options, name, arg, v, sz, t)); } +static int +inproc_check_recvmaxsz(const void *v, size_t sz, nni_type t) +{ + return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t)); +} + +static nni_chkoption inproc_checkopts[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_check = inproc_check_recvmaxsz, + }, + { + .o_name = NNG_OPT_LOCADDR, + }, + { + .o_name = NNG_OPT_REMADDR, + }, + { + .o_name = NULL, + }, +}; + +static int +inproc_checkopt(const char *name, const void *buf, size_t sz, nni_type t) +{ + int rv; + rv = nni_chkopt(inproc_checkopts, name, buf, sz, t); + return (rv); +} + static nni_tran_dialer_ops inproc_dialer_ops = { .d_init = inproc_dialer_init, .d_fini = inproc_ep_fini, @@ -757,6 +682,7 @@ struct nni_tran nni_inproc_tran = { .tran_pipe = &inproc_pipe_ops, .tran_init = inproc_init, .tran_fini = inproc_fini, + .tran_checkopt = inproc_checkopt, }; int |
