aboutsummaryrefslogtreecommitdiff
path: root/src/transport/inproc
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/inproc')
-rw-r--r--src/transport/inproc/inproc.c186
1 files changed, 56 insertions, 130 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 40ab9107..6618d794 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -1,5 +1,5 @@
//
-// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2018 Devolutions <info@devolutions.net>
//
@@ -30,24 +30,14 @@ typedef struct {
// inproc_pipe represents one half of a connection.
struct inproc_pipe {
- const char * addr;
- inproc_pair * pair;
- nni_msgq * rq;
- nni_msgq * wq;
- nni_pipe * npipe;
- uint16_t peer;
- uint16_t proto;
- size_t rcvmax;
- nni_stat_item st_rxbytes;
- nni_stat_item st_txbytes;
- nni_stat_item st_rxmsgs;
- nni_stat_item st_txmsgs;
- nni_stat_item st_rxdiscards;
- nni_stat_item st_txdiscards;
- nni_stat_item st_rxerrs;
- nni_stat_item st_txerrs;
- nni_stat_item st_rxoversize;
- nni_stat_item st_rcvmaxsz;
+ const char * addr;
+ inproc_pair *pair;
+ nni_msgq * rq;
+ nni_msgq * wq;
+ nni_pipe * npipe;
+ uint16_t peer;
+ uint16_t proto;
+ size_t rcvmax;
};
// inproc_pair represents a pair of pipes. Because we control both
@@ -135,120 +125,12 @@ inproc_pipe_alloc(inproc_pipe **pipep, inproc_ep *ep)
return (0);
}
-#ifdef NNG_ENABLE_STATS
-static void
-inproc_get_rxbytes(nni_stat_item *st, void *arg)
-{
- nni_msgq *mq = arg;
- nni_stat_set_value(st, nni_msgq_stat_get_bytes(mq));
-}
-
-static void
-inproc_get_rxmsgs(nni_stat_item *st, void *arg)
-{
- nni_msgq *mq = arg;
- nni_stat_set_value(st, nni_msgq_stat_get_msgs(mq));
-}
-
-static void
-inproc_get_txbytes(nni_stat_item *st, void *arg)
-{
- nni_msgq *mq = arg;
- nni_stat_set_value(st, nni_msgq_stat_put_bytes(mq));
-}
-
-static void
-inproc_get_txmsgs(nni_stat_item *st, void *arg)
-{
- nni_msgq *mq = arg;
- nni_stat_set_value(st, nni_msgq_stat_put_msgs(mq));
-}
-
-static void
-inproc_get_discards(nni_stat_item *st, void *arg)
-{
- nni_msgq *mq = arg;
- nni_stat_set_value(st, nni_msgq_stat_discards(mq));
-}
-
-static void
-inproc_get_txerrs(nni_stat_item *st, void *arg)
-{
- nni_msgq *mq = arg;
- nni_stat_set_value(st, nni_msgq_stat_put_errs(mq));
-}
-
-static void
-inproc_get_rxerrs(nni_stat_item *st, void *arg)
-{
- nni_msgq *mq = arg;
- nni_stat_set_value(st, nni_msgq_stat_get_errs(mq));
-}
-#else
-#undef nni_stat_set_update
-#define nni_stat_set_update(p, x, f)
-#endif
-
static int
inproc_pipe_init(void *arg, nni_pipe *p)
{
inproc_pipe *pipe = arg;
pipe->npipe = p;
- nni_stat_init(&pipe->st_rxbytes, "rxbytes", "bytes received (raw)");
- nni_stat_set_update(&pipe->st_rxbytes, inproc_get_rxbytes, pipe->rq);
- nni_stat_set_unit(&pipe->st_rxbytes, NNG_UNIT_BYTES);
- nni_pipe_add_stat(p, &pipe->st_rxbytes);
-
- nni_stat_init(&pipe->st_txbytes, "txbytes", "bytes sent (raw)");
- nni_stat_set_update(&pipe->st_txbytes, inproc_get_txbytes, pipe->wq);
- nni_stat_set_unit(&pipe->st_txbytes, NNG_UNIT_BYTES);
- nni_pipe_add_stat(p, &pipe->st_txbytes);
-
- nni_stat_init(&pipe->st_rxmsgs, "rxmsgs", "msgs received");
- nni_stat_set_update(&pipe->st_rxmsgs, inproc_get_rxmsgs, pipe->rq);
- nni_stat_set_unit(&pipe->st_rxmsgs, NNG_UNIT_MESSAGES);
- nni_pipe_add_stat(p, &pipe->st_rxmsgs);
-
- nni_stat_init(&pipe->st_txmsgs, "txmsgs", "msgs sent");
- nni_stat_set_update(&pipe->st_txmsgs, inproc_get_txmsgs, pipe->wq);
- nni_stat_set_unit(&pipe->st_txmsgs, NNG_UNIT_MESSAGES);
- nni_pipe_add_stat(p, &pipe->st_txmsgs);
-
- nni_stat_init(
- &pipe->st_rxdiscards, "rxdiscards", "receives discarded");
- nni_stat_set_update(
- &pipe->st_rxdiscards, inproc_get_discards, pipe->rq);
- nni_stat_set_unit(&pipe->st_rxdiscards, NNG_UNIT_MESSAGES);
- nni_pipe_add_stat(p, &pipe->st_rxdiscards);
-
- nni_stat_init(&pipe->st_txdiscards, "txdiscards", "sends discarded");
- nni_stat_set_update(
- &pipe->st_txdiscards, inproc_get_discards, pipe->wq);
- nni_stat_set_unit(&pipe->st_txdiscards, NNG_UNIT_MESSAGES);
- nni_pipe_add_stat(p, &pipe->st_txdiscards);
-
- nni_stat_init(&pipe->st_rxerrs, "rxerrs", "receive errors");
- nni_stat_set_update(&pipe->st_rxerrs, inproc_get_rxerrs, pipe->rq);
- nni_stat_set_unit(&pipe->st_rxerrs, NNG_UNIT_MESSAGES);
- nni_pipe_add_stat(p, &pipe->st_rxerrs);
-
- nni_stat_init(&pipe->st_txerrs, "txerrs", "send errors");
- nni_stat_set_update(&pipe->st_txerrs, inproc_get_txerrs, pipe->wq);
- nni_stat_set_unit(&pipe->st_txerrs, NNG_UNIT_MESSAGES);
- nni_pipe_add_stat(p, &pipe->st_txerrs);
-
- nni_stat_init_atomic(&pipe->st_rxoversize, "rxoversize",
- "oversize msgs received (dropped)");
- nni_stat_set_unit(&pipe->st_rxoversize, NNG_UNIT_MESSAGES);
- nni_pipe_add_stat(p, &pipe->st_rxoversize);
-
- nni_stat_init(&pipe->st_rcvmaxsz, "rcvmaxsz", "maximum receive size");
- nni_stat_set_type(&pipe->st_rcvmaxsz, NNG_UNIT_BYTES);
- nni_stat_set_unit(&pipe->st_rcvmaxsz, NNG_UNIT_BYTES);
- nni_stat_set_value(&pipe->st_rcvmaxsz, pipe->rcvmax);
- nni_pipe_add_stat(p, &pipe->st_rcvmaxsz);
-
return (0);
}
@@ -409,6 +291,11 @@ inproc_conn_finish(nni_aio *aio, int rv, inproc_ep *ep, inproc_pipe *pipe)
nni_aio_set_output(aio, 0, pipe);
nni_aio_finish(aio, 0, 0);
} else {
+ if (ep->ndialer != NULL) {
+ nni_dialer_bump_error(ep->ndialer, rv);
+ } else {
+ nni_listener_bump_error(ep->nlistener, rv);
+ }
NNI_ASSERT(pipe == NULL);
nni_aio_finish_error(aio, rv);
}
@@ -419,7 +306,7 @@ inproc_filter(void *arg, nni_msg *msg)
{
inproc_pipe *p = arg;
if (p->rcvmax && (nni_msg_len(msg) > p->rcvmax)) {
- nni_stat_inc_atomic(&p->st_rxoversize, 1);
+ nni_pipe_bump_error(p->npipe, NNG_EMSGSIZE);
nni_msg_free(msg);
return (NULL);
}
@@ -535,6 +422,11 @@ inproc_ep_cancel(nni_aio *aio, void *arg, int rv)
nni_list_node_remove(&ep->node);
nni_aio_finish_error(aio, rv);
}
+ if (ep->ndialer != NULL) {
+ nni_dialer_bump_error(ep->ndialer, rv);
+ } else {
+ nni_listener_bump_error(ep->nlistener, rv);
+ }
nni_mtx_unlock(&nni_inproc.mx);
}
@@ -559,6 +451,7 @@ inproc_ep_connect(void *arg, nni_aio *aio)
}
if (server == NULL) {
nni_mtx_unlock(&nni_inproc.mx);
+ nni_dialer_bump_error(ep->ndialer, NNG_ECONNREFUSED);
nni_aio_finish_error(aio, NNG_ECONNREFUSED);
return;
}
@@ -568,6 +461,7 @@ inproc_ep_connect(void *arg, nni_aio *aio)
// that in the upper API.
if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) {
nni_mtx_unlock(&nni_inproc.mx);
+ nni_dialer_bump_error(ep->ndialer, rv);
nni_aio_finish_error(aio, rv);
return;
}
@@ -590,6 +484,7 @@ inproc_ep_bind(void *arg)
NNI_LIST_FOREACH (list, srch) {
if (strcmp(srch->addr, ep->addr) == 0) {
nni_mtx_unlock(&nni_inproc.mx);
+ nni_listener_bump_error(ep->nlistener, NNG_EADDRINUSE);
return (NNG_EADDRINUSE);
}
}
@@ -614,6 +509,7 @@ inproc_ep_accept(void *arg, nni_aio *aio)
// accept was tried -- there is no API to do such a thing.
if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) {
nni_mtx_unlock(&nni_inproc.mx);
+ nni_listener_bump_error(ep->nlistener, rv);
nni_aio_finish_error(aio, rv);
return;
}
@@ -642,8 +538,7 @@ inproc_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
inproc_ep *ep = arg;
size_t val;
int rv;
- if (((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) &&
- (ep != NULL)) {
+ if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) {
nni_mtx_lock(&ep->mtx);
ep->rcvmax = val;
nni_stat_set_value(&ep->st_rcvmaxsz, val);
@@ -728,6 +623,36 @@ inproc_ep_setopt(
return (nni_setopt(inproc_ep_options, name, arg, v, sz, t));
}
+static int
+inproc_check_recvmaxsz(const void *v, size_t sz, nni_type t)
+{
+ return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t));
+}
+
+static nni_chkoption inproc_checkopts[] = {
+ {
+ .o_name = NNG_OPT_RECVMAXSZ,
+ .o_check = inproc_check_recvmaxsz,
+ },
+ {
+ .o_name = NNG_OPT_LOCADDR,
+ },
+ {
+ .o_name = NNG_OPT_REMADDR,
+ },
+ {
+ .o_name = NULL,
+ },
+};
+
+static int
+inproc_checkopt(const char *name, const void *buf, size_t sz, nni_type t)
+{
+ int rv;
+ rv = nni_chkopt(inproc_checkopts, name, buf, sz, t);
+ return (rv);
+}
+
static nni_tran_dialer_ops inproc_dialer_ops = {
.d_init = inproc_dialer_init,
.d_fini = inproc_ep_fini,
@@ -757,6 +682,7 @@ struct nni_tran nni_inproc_tran = {
.tran_pipe = &inproc_pipe_ops,
.tran_init = inproc_init,
.tran_fini = inproc_fini,
+ .tran_checkopt = inproc_checkopt,
};
int