diff options
Diffstat (limited to 'src/transport/inproc')
| -rw-r--r-- | src/transport/inproc/inproc.c | 186 |
1 files changed, 56 insertions, 130 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 40ab9107..6618d794 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2018 Devolutions <info@devolutions.net> // @@ -30,24 +30,14 @@ typedef struct { // inproc_pipe represents one half of a connection. struct inproc_pipe { - const char * addr; - inproc_pair * pair; - nni_msgq * rq; - nni_msgq * wq; - nni_pipe * npipe; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - nni_stat_item st_rxbytes; - nni_stat_item st_txbytes; - nni_stat_item st_rxmsgs; - nni_stat_item st_txmsgs; - nni_stat_item st_rxdiscards; - nni_stat_item st_txdiscards; - nni_stat_item st_rxerrs; - nni_stat_item st_txerrs; - nni_stat_item st_rxoversize; - nni_stat_item st_rcvmaxsz; + const char * addr; + inproc_pair *pair; + nni_msgq * rq; + nni_msgq * wq; + nni_pipe * npipe; + uint16_t peer; + uint16_t proto; + size_t rcvmax; }; // inproc_pair represents a pair of pipes. Because we control both @@ -135,120 +125,12 @@ inproc_pipe_alloc(inproc_pipe **pipep, inproc_ep *ep) return (0); } -#ifdef NNG_ENABLE_STATS -static void -inproc_get_rxbytes(nni_stat_item *st, void *arg) -{ - nni_msgq *mq = arg; - nni_stat_set_value(st, nni_msgq_stat_get_bytes(mq)); -} - -static void -inproc_get_rxmsgs(nni_stat_item *st, void *arg) -{ - nni_msgq *mq = arg; - nni_stat_set_value(st, nni_msgq_stat_get_msgs(mq)); -} - -static void -inproc_get_txbytes(nni_stat_item *st, void *arg) -{ - nni_msgq *mq = arg; - nni_stat_set_value(st, nni_msgq_stat_put_bytes(mq)); -} - -static void -inproc_get_txmsgs(nni_stat_item *st, void *arg) -{ - nni_msgq *mq = arg; - nni_stat_set_value(st, nni_msgq_stat_put_msgs(mq)); -} - -static void -inproc_get_discards(nni_stat_item *st, void *arg) -{ - nni_msgq *mq = arg; - nni_stat_set_value(st, nni_msgq_stat_discards(mq)); -} - -static void -inproc_get_txerrs(nni_stat_item *st, void *arg) -{ - nni_msgq *mq = arg; - nni_stat_set_value(st, nni_msgq_stat_put_errs(mq)); -} - -static void -inproc_get_rxerrs(nni_stat_item *st, void *arg) -{ - nni_msgq *mq = arg; - nni_stat_set_value(st, nni_msgq_stat_get_errs(mq)); -} -#else -#undef nni_stat_set_update -#define nni_stat_set_update(p, x, f) -#endif - static int inproc_pipe_init(void *arg, nni_pipe *p) { inproc_pipe *pipe = arg; pipe->npipe = p; - nni_stat_init(&pipe->st_rxbytes, "rxbytes", "bytes received (raw)"); - nni_stat_set_update(&pipe->st_rxbytes, inproc_get_rxbytes, pipe->rq); - nni_stat_set_unit(&pipe->st_rxbytes, NNG_UNIT_BYTES); - nni_pipe_add_stat(p, &pipe->st_rxbytes); - - nni_stat_init(&pipe->st_txbytes, "txbytes", "bytes sent (raw)"); - nni_stat_set_update(&pipe->st_txbytes, inproc_get_txbytes, pipe->wq); - nni_stat_set_unit(&pipe->st_txbytes, NNG_UNIT_BYTES); - nni_pipe_add_stat(p, &pipe->st_txbytes); - - nni_stat_init(&pipe->st_rxmsgs, "rxmsgs", "msgs received"); - nni_stat_set_update(&pipe->st_rxmsgs, inproc_get_rxmsgs, pipe->rq); - nni_stat_set_unit(&pipe->st_rxmsgs, NNG_UNIT_MESSAGES); - nni_pipe_add_stat(p, &pipe->st_rxmsgs); - - nni_stat_init(&pipe->st_txmsgs, "txmsgs", "msgs sent"); - nni_stat_set_update(&pipe->st_txmsgs, inproc_get_txmsgs, pipe->wq); - nni_stat_set_unit(&pipe->st_txmsgs, NNG_UNIT_MESSAGES); - nni_pipe_add_stat(p, &pipe->st_txmsgs); - - nni_stat_init( - &pipe->st_rxdiscards, "rxdiscards", "receives discarded"); - nni_stat_set_update( - &pipe->st_rxdiscards, inproc_get_discards, pipe->rq); - nni_stat_set_unit(&pipe->st_rxdiscards, NNG_UNIT_MESSAGES); - nni_pipe_add_stat(p, &pipe->st_rxdiscards); - - nni_stat_init(&pipe->st_txdiscards, "txdiscards", "sends discarded"); - nni_stat_set_update( - &pipe->st_txdiscards, inproc_get_discards, pipe->wq); - nni_stat_set_unit(&pipe->st_txdiscards, NNG_UNIT_MESSAGES); - nni_pipe_add_stat(p, &pipe->st_txdiscards); - - nni_stat_init(&pipe->st_rxerrs, "rxerrs", "receive errors"); - nni_stat_set_update(&pipe->st_rxerrs, inproc_get_rxerrs, pipe->rq); - nni_stat_set_unit(&pipe->st_rxerrs, NNG_UNIT_MESSAGES); - nni_pipe_add_stat(p, &pipe->st_rxerrs); - - nni_stat_init(&pipe->st_txerrs, "txerrs", "send errors"); - nni_stat_set_update(&pipe->st_txerrs, inproc_get_txerrs, pipe->wq); - nni_stat_set_unit(&pipe->st_txerrs, NNG_UNIT_MESSAGES); - nni_pipe_add_stat(p, &pipe->st_txerrs); - - nni_stat_init_atomic(&pipe->st_rxoversize, "rxoversize", - "oversize msgs received (dropped)"); - nni_stat_set_unit(&pipe->st_rxoversize, NNG_UNIT_MESSAGES); - nni_pipe_add_stat(p, &pipe->st_rxoversize); - - nni_stat_init(&pipe->st_rcvmaxsz, "rcvmaxsz", "maximum receive size"); - nni_stat_set_type(&pipe->st_rcvmaxsz, NNG_UNIT_BYTES); - nni_stat_set_unit(&pipe->st_rcvmaxsz, NNG_UNIT_BYTES); - nni_stat_set_value(&pipe->st_rcvmaxsz, pipe->rcvmax); - nni_pipe_add_stat(p, &pipe->st_rcvmaxsz); - return (0); } @@ -409,6 +291,11 @@ inproc_conn_finish(nni_aio *aio, int rv, inproc_ep *ep, inproc_pipe *pipe) nni_aio_set_output(aio, 0, pipe); nni_aio_finish(aio, 0, 0); } else { + if (ep->ndialer != NULL) { + nni_dialer_bump_error(ep->ndialer, rv); + } else { + nni_listener_bump_error(ep->nlistener, rv); + } NNI_ASSERT(pipe == NULL); nni_aio_finish_error(aio, rv); } @@ -419,7 +306,7 @@ inproc_filter(void *arg, nni_msg *msg) { inproc_pipe *p = arg; if (p->rcvmax && (nni_msg_len(msg) > p->rcvmax)) { - nni_stat_inc_atomic(&p->st_rxoversize, 1); + nni_pipe_bump_error(p->npipe, NNG_EMSGSIZE); nni_msg_free(msg); return (NULL); } @@ -535,6 +422,11 @@ inproc_ep_cancel(nni_aio *aio, void *arg, int rv) nni_list_node_remove(&ep->node); nni_aio_finish_error(aio, rv); } + if (ep->ndialer != NULL) { + nni_dialer_bump_error(ep->ndialer, rv); + } else { + nni_listener_bump_error(ep->nlistener, rv); + } nni_mtx_unlock(&nni_inproc.mx); } @@ -559,6 +451,7 @@ inproc_ep_connect(void *arg, nni_aio *aio) } if (server == NULL) { nni_mtx_unlock(&nni_inproc.mx); + nni_dialer_bump_error(ep->ndialer, NNG_ECONNREFUSED); nni_aio_finish_error(aio, NNG_ECONNREFUSED); return; } @@ -568,6 +461,7 @@ inproc_ep_connect(void *arg, nni_aio *aio) // that in the upper API. if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) { nni_mtx_unlock(&nni_inproc.mx); + nni_dialer_bump_error(ep->ndialer, rv); nni_aio_finish_error(aio, rv); return; } @@ -590,6 +484,7 @@ inproc_ep_bind(void *arg) NNI_LIST_FOREACH (list, srch) { if (strcmp(srch->addr, ep->addr) == 0) { nni_mtx_unlock(&nni_inproc.mx); + nni_listener_bump_error(ep->nlistener, NNG_EADDRINUSE); return (NNG_EADDRINUSE); } } @@ -614,6 +509,7 @@ inproc_ep_accept(void *arg, nni_aio *aio) // accept was tried -- there is no API to do such a thing. if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) { nni_mtx_unlock(&nni_inproc.mx); + nni_listener_bump_error(ep->nlistener, rv); nni_aio_finish_error(aio, rv); return; } @@ -642,8 +538,7 @@ inproc_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) inproc_ep *ep = arg; size_t val; int rv; - if (((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) && - (ep != NULL)) { + if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { nni_mtx_lock(&ep->mtx); ep->rcvmax = val; nni_stat_set_value(&ep->st_rcvmaxsz, val); @@ -728,6 +623,36 @@ inproc_ep_setopt( return (nni_setopt(inproc_ep_options, name, arg, v, sz, t)); } +static int +inproc_check_recvmaxsz(const void *v, size_t sz, nni_type t) +{ + return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t)); +} + +static nni_chkoption inproc_checkopts[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_check = inproc_check_recvmaxsz, + }, + { + .o_name = NNG_OPT_LOCADDR, + }, + { + .o_name = NNG_OPT_REMADDR, + }, + { + .o_name = NULL, + }, +}; + +static int +inproc_checkopt(const char *name, const void *buf, size_t sz, nni_type t) +{ + int rv; + rv = nni_chkopt(inproc_checkopts, name, buf, sz, t); + return (rv); +} + static nni_tran_dialer_ops inproc_dialer_ops = { .d_init = inproc_dialer_init, .d_fini = inproc_ep_fini, @@ -757,6 +682,7 @@ struct nni_tran nni_inproc_tran = { .tran_pipe = &inproc_pipe_ops, .tran_init = inproc_init, .tran_fini = inproc_fini, + .tran_checkopt = inproc_checkopt, }; int |
