aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pair1
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pair1')
-rw-r--r--src/protocol/pair1/pair.c68
1 files changed, 51 insertions, 17 deletions
diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c
index dc250943..3033b196 100644
--- a/src/protocol/pair1/pair.c
+++ b/src/protocol/pair1/pair.c
@@ -23,6 +23,8 @@
#define NNI_PROTO_PAIR_V1 NNI_PROTO(1, 1)
#endif
+#define BUMPSTAT(x) nni_stat_inc_atomic(x, 1)
+
typedef struct pair1_pipe pair1_pipe;
typedef struct pair1_sock pair1_sock;
@@ -35,16 +37,21 @@ static void pair1_pipe_fini(void *);
// pair1_sock is our per-socket protocol private structure.
struct pair1_sock {
- nni_msgq * uwq;
- nni_msgq * urq;
- bool raw;
- int ttl;
- nni_mtx mtx;
- nni_idhash *pipes;
- nni_list plist;
- bool started;
- bool poly;
- nni_aio * aio_getq;
+ nni_msgq * uwq;
+ nni_msgq * urq;
+ nni_sock * nsock;
+ bool raw;
+ int ttl;
+ nni_mtx mtx;
+ nni_idhash * pipes;
+ nni_list plist;
+ bool started;
+ bool poly;
+ nni_aio * aio_getq;
+ nni_stat_item stat_poly;
+ nni_stat_item stat_raw;
+ nni_stat_item stat_rejmismatch;
+ nni_stat_item stat_rejinuse;
};
// pair1_pipe is our per-pipe protocol private structure.
@@ -94,12 +101,29 @@ pair1_sock_init_impl(void **sp, nni_sock *nsock, bool raw)
return (rv);
}
- s->raw = raw;
- s->poly = false;
- s->uwq = nni_sock_sendq(nsock);
- s->urq = nni_sock_recvq(nsock);
- s->ttl = 8;
- *sp = s;
+ nni_stat_init_bool(
+ &s->stat_poly, "polyamorous", "polyamorous mode?", false);
+ nni_stat_set_lock(&s->stat_poly, &s->mtx);
+ nni_sock_add_stat(nsock, &s->stat_poly);
+
+ nni_stat_init_bool(&s->stat_raw, "raw", "raw mode?", raw);
+ nni_sock_add_stat(nsock, &s->stat_raw);
+
+ nni_stat_init_atomic(&s->stat_rejmismatch, "mismatch",
+ "pipes rejected (protocol mismatch)");
+ nni_sock_add_stat(nsock, &s->stat_rejmismatch);
+
+ nni_stat_init_atomic(&s->stat_rejinuse, "already",
+ "pipes rejected (already connected)");
+ nni_sock_add_stat(nsock, &s->stat_rejinuse);
+
+ s->nsock = nsock;
+ s->raw = raw;
+ s->poly = false;
+ s->uwq = nni_sock_sendq(nsock);
+ s->urq = nni_sock_recvq(nsock);
+ s->ttl = 8;
+ *sp = s;
return (0);
}
@@ -173,13 +197,15 @@ pair1_pipe_start(void *arg)
uint32_t id;
int rv;
+ nni_mtx_lock(&s->mtx);
if (nni_pipe_peer(p->npipe) != NNI_PROTO_PAIR_V1) {
+ nni_mtx_unlock(&s->mtx);
+ BUMPSTAT(&s->stat_rejmismatch);
// Peer protocol mismatch.
return (NNG_EPROTO);
}
id = nni_pipe_id(p->npipe);
- nni_mtx_lock(&s->mtx);
if ((rv = nni_idhash_insert(s->pipes, id, p)) != 0) {
nni_mtx_unlock(&s->mtx);
return (rv);
@@ -188,6 +214,7 @@ pair1_pipe_start(void *arg)
if (!nni_list_empty(&s->plist)) {
nni_idhash_remove(s->pipes, id);
nni_mtx_unlock(&s->mtx);
+ BUMPSTAT(&s->stat_rejinuse);
return (NNG_EBUSY);
}
} else {
@@ -242,6 +269,7 @@ pair1_pipe_recv_cb(void *arg)
uint32_t hdr;
nni_pipe * npipe = p->npipe;
int rv;
+ size_t len;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_close(p->npipe);
@@ -266,10 +294,12 @@ pair1_pipe_recv_cb(void *arg)
nni_pipe_close(npipe);
return;
}
+ len = nni_msg_len(msg);
// If we bounced too many times, discard the message, but
// keep getting more.
if (hdr > (unsigned) s->ttl) {
+ // STAT: bump TTLdrop
nni_msg_free(msg);
nni_pipe_recv(npipe, p->aio_recv);
return;
@@ -277,6 +307,7 @@ pair1_pipe_recv_cb(void *arg)
// Store the hop count in the header.
if ((rv = nni_msg_header_append_u32(msg, hdr)) != 0) {
+ // STAT: bump allocfail
nni_msg_free(msg);
nni_pipe_recv(npipe, p->aio_recv);
return;
@@ -284,6 +315,7 @@ pair1_pipe_recv_cb(void *arg)
// Send the message up.
nni_aio_set_msg(p->aio_putq, msg);
+ nni_sock_bump_rx(s->nsock, len);
nni_msgq_aio_put(s->urq, p->aio_putq);
}
@@ -452,6 +484,7 @@ pair1_sock_set_poly(void *arg, const void *buf, size_t sz, nni_opt_type t)
int rv;
nni_mtx_lock(&s->mtx);
rv = s->started ? NNG_ESTATE : nni_copyin_bool(&s->poly, buf, sz, t);
+ nni_stat_set_value(&s->stat_poly, s->poly);
nni_mtx_unlock(&s->mtx);
return (rv);
}
@@ -468,6 +501,7 @@ pair1_sock_send(void *arg, nni_aio *aio)
{
pair1_sock *s = arg;
+ nni_sock_bump_tx(s->nsock, nni_msg_len(nni_aio_get_msg(aio)));
nni_msgq_aio_put(s->uwq, aio);
}