summaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c301
1 files changed, 181 insertions, 120 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 66130d4a..11b1371e 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -29,7 +29,7 @@ struct nni_ctx {
void * c_data;
size_t c_size;
bool c_closed;
- unsigned c_refcnt; // protected by global lock
+ unsigned c_ref; // protected by global lock
uint32_t c_id;
nng_duration c_sndtimeo;
nng_duration c_rcvtimeo;
@@ -48,21 +48,6 @@ typedef struct nni_sock_pipe_cb {
void * cb_arg;
} nni_sock_pipe_cb;
-typedef struct sock_stats {
- nni_stat_item s_root; // socket scope
- nni_stat_item s_id; // socket id
- nni_stat_item s_name; // socket name
- nni_stat_item s_protocol; // socket protocol
- nni_stat_item s_ndialers; // number of dialers
- nni_stat_item s_nlisteners; // number of listeners
- nni_stat_item s_npipes; // number of pipes
- nni_stat_item s_rxbytes; // number of bytes received
- nni_stat_item s_txbytes; // number of bytes received
- nni_stat_item s_rxmsgs; // number of msgs received
- nni_stat_item s_txmsgs; // number of msgs sent
- nni_stat_item s_reject; // pipes rejected
-} sock_stats;
-
struct nni_socket {
nni_list_node s_node;
nni_mtx s_mx;
@@ -71,8 +56,8 @@ struct nni_socket {
uint32_t s_id;
uint32_t s_flags;
- unsigned s_refcnt; // protected by global lock
- void * s_data; // Protocol private
+ unsigned s_ref; // protected by global lock
+ void * s_data; // Protocol private
size_t s_size;
nni_msgq *s_uwq; // Upper write queue
@@ -93,7 +78,6 @@ struct nni_socket {
size_t s_rcvmaxsz; // max receive size
nni_list s_options; // opts not handled by sock/proto
char s_name[64]; // socket name (legacy compat)
- char s_scope[24]; // socket scope ("socket%u", 32 bits max)
nni_list s_listeners; // active listeners
nni_list s_dialers; // active dialers
@@ -107,7 +91,20 @@ struct nni_socket {
nni_mtx s_pipe_cbs_mtx;
nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM];
- sock_stats s_stats;
+#ifdef NNG_ENABLE_STATS
+ nni_stat_item st_root; // socket scope
+ nni_stat_item st_id; // socket id
+ nni_stat_item st_name; // socket name
+ nni_stat_item st_protocol; // socket protocol
+ nni_stat_item st_dialers; // number of dialers
+ nni_stat_item st_listeners; // number of listeners
+ nni_stat_item st_pipes; // number of pipes
+ nni_stat_item st_rx_bytes; // number of bytes received
+ nni_stat_item st_tx_bytes; // number of bytes received
+ nni_stat_item st_rx_msgs; // number of msgs received
+ nni_stat_item st_tx_msgs; // number of msgs sent
+ nni_stat_item st_rejects; // pipes rejected
+#endif
};
static void nni_ctx_destroy(nni_ctx *);
@@ -379,7 +376,7 @@ nni_sock_find(nni_sock **sockp, uint32_t id)
if (s->s_closed) {
rv = NNG_ECLOSED;
} else {
- s->s_refcnt++;
+ s->s_ref++;
*sockp = s;
}
} else {
@@ -394,88 +391,129 @@ void
nni_sock_rele(nni_sock *s)
{
nni_mtx_lock(&sock_lk);
- s->s_refcnt--;
- if (s->s_closed && (s->s_refcnt < 2)) {
+ s->s_ref--;
+ if (s->s_closed && (s->s_ref < 2)) {
nni_cv_wake(&s->s_close_cv);
}
nni_mtx_unlock(&sock_lk);
}
+#ifdef NNG_ENABLE_STATS
static void
-sock_stats_fini(nni_sock *s)
+sock_stat_init(nni_sock *s, nni_stat_item *item, const nni_stat_info *info)
{
-#ifdef NNG_ENABLE_STATS
- sock_stats *st = &s->s_stats;
- nni_stat_unregister(&st->s_root);
-#else
- NNI_ARG_UNUSED(s);
-#endif
+ nni_stat_init(item, info);
+ nni_stat_add(&s->st_root, item);
}
static void
sock_stats_init(nni_sock *s)
{
-#ifdef NNG_ENABLE_STATS
- sock_stats * st = &s->s_stats;
- nni_stat_item *root = &s->s_stats.s_root;
+ static const nni_stat_info root_info = {
+ .si_name = "socket",
+ .si_desc = "socket statistics",
+ .si_type = NNG_STAT_SCOPE,
+ };
+ static const nni_stat_info id_info = {
+ .si_name = "id",
+ .si_desc = "socket identifier",
+ .si_type = NNG_STAT_ID,
+ };
+ static const nni_stat_info name_info = {
+ .si_name = "name",
+ .si_desc = "socket name",
+ .si_type = NNG_STAT_STRING,
+ .si_alloc = true,
+ };
+ static const nni_stat_info protocol_info = {
+ .si_name = "protocol",
+ .si_desc = "socket protocol",
+ .si_type = NNG_STAT_STRING,
+ };
+ static const nni_stat_info dialers_info = {
+ .si_name = "dialers",
+ .si_desc = "open dialers",
+ .si_type = NNG_STAT_LEVEL,
+ .si_atomic = true,
+ };
+ static const nni_stat_info listeners_info = {
+ .si_name = "listeners",
+ .si_desc = "open listeners",
+ .si_type = NNG_STAT_LEVEL,
+ .si_atomic = true,
+ };
+ static const nni_stat_info pipes_info = {
+ .si_name = "pipes",
+ .si_desc = "open pipes",
+ .si_type = NNG_STAT_LEVEL,
+ .si_atomic = true,
+ };
+ static const nni_stat_info reject_info = {
+ .si_name = "reject",
+ .si_desc = "rejected pipes",
+ .si_type = NNG_STAT_COUNTER,
+ .si_atomic = true,
+ };
+ static const nni_stat_info tx_msgs_info = {
+ .si_name = "tx_msgs",
+ .si_desc = "sent messages",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_MESSAGES,
+ .si_atomic = true,
+ };
+ static const nni_stat_info rx_msgs_info = {
+ .si_name = "rx_msgs",
+ .si_desc = "received messages",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_MESSAGES,
+ .si_atomic = true,
+ };
+ static const nni_stat_info tx_bytes_info = {
+ .si_name = "tx_bytes",
+ .si_desc = "sent bytes",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_BYTES,
+ .si_atomic = true,
+ };
+ static const nni_stat_info rx_bytes_info = {
+ .si_name = "rx_bytes",
+ .si_desc = "received messages",
+ .si_type = NNG_STAT_COUNTER,
+ .si_unit = NNG_UNIT_BYTES,
+ .si_atomic = true,
+ };
// To make collection cheap and atomic for the socket,
// we just use a single lock for the entire chain.
- nni_stat_init_scope(root, s->s_scope, "socket statistics");
-
- nni_stat_init_id(&st->s_id, "id", "socket id", s->s_id);
- nni_stat_add(root, &st->s_id);
-
- nni_stat_init_string(&st->s_name, "name", "socket name", s->s_name);
- nni_stat_set_lock(&st->s_name, &s->s_mx);
- nni_stat_add(root, &st->s_name);
-
- nni_stat_init_string(&st->s_protocol, "protocol", "socket protocol",
- nni_sock_proto_name(s));
- nni_stat_add(root, &st->s_protocol);
-
- nni_stat_init_atomic(&st->s_ndialers, "ndialers", "open dialers");
- nni_stat_set_type(&st->s_ndialers, NNG_STAT_LEVEL);
- nni_stat_add(root, &st->s_ndialers);
-
- nni_stat_init_atomic(
- &st->s_nlisteners, "nlisteners", "open listeners");
- nni_stat_set_type(&st->s_nlisteners, NNG_STAT_LEVEL);
- nni_stat_add(root, &st->s_nlisteners);
-
- nni_stat_init_atomic(&st->s_npipes, "npipes", "open pipes");
- nni_stat_set_type(&st->s_npipes, NNG_STAT_LEVEL);
- nni_stat_add(root, &st->s_npipes);
-
- nni_stat_init_atomic(&st->s_rxbytes, "rxbytes", "bytes received");
- nni_stat_set_unit(&st->s_rxbytes, NNG_UNIT_BYTES);
- nni_stat_add(root, &st->s_rxbytes);
-
- nni_stat_init_atomic(&st->s_txbytes, "txbytes", "bytes sent");
- nni_stat_set_unit(&st->s_txbytes, NNG_UNIT_BYTES);
- nni_stat_add(root, &st->s_txbytes);
-
- nni_stat_init_atomic(&st->s_rxmsgs, "rxmsgs", "messages received");
- nni_stat_set_unit(&st->s_rxmsgs, NNG_UNIT_MESSAGES);
- nni_stat_add(root, &st->s_rxmsgs);
-
- nni_stat_init_atomic(&st->s_txmsgs, "txmsgs", "messages sent");
- nni_stat_set_unit(&st->s_txmsgs, NNG_UNIT_MESSAGES);
- nni_stat_add(root, &st->s_txmsgs);
-
- nni_stat_init_atomic(&st->s_reject, "reject", "pipes rejected");
- nni_stat_add(root, &st->s_reject);
-#else
- NNI_ARG_UNUSED(s);
-#endif
+ nni_stat_init(&s->st_root, &root_info);
+ sock_stat_init(s, &s->st_id, &id_info);
+ sock_stat_init(s, &s->st_name, &name_info);
+ sock_stat_init(s, &s->st_protocol, &protocol_info);
+ sock_stat_init(s, &s->st_dialers, &dialers_info);
+ sock_stat_init(s, &s->st_listeners, &listeners_info);
+ sock_stat_init(s, &s->st_pipes, &pipes_info);
+ sock_stat_init(s, &s->st_rejects, &reject_info);
+ sock_stat_init(s, &s->st_tx_msgs, &tx_msgs_info);
+ sock_stat_init(s, &s->st_rx_msgs, &rx_msgs_info);
+ sock_stat_init(s, &s->st_tx_bytes, &tx_bytes_info);
+ sock_stat_init(s, &s->st_rx_bytes, &rx_bytes_info);
+
+ nni_stat_set_id(&s->st_id, s->s_id);
+ nni_stat_set_string(&s->st_name, s->s_name);
+ nni_stat_set_string(&s->st_protocol, nni_sock_proto_name(s));
}
+#endif
static void
sock_destroy(nni_sock *s)
{
nni_sockopt *sopt;
+#ifdef NNG_ENABLE_STATS
+ nni_stat_unregister(&s->st_root);
+#endif
+
// The protocol needs to clean up its state.
if (s->s_data != NULL) {
s->s_sock_ops.sock_fini(s->s_data);
@@ -490,7 +528,6 @@ sock_destroy(nni_sock *s)
nni_mtx_lock(&s->s_mx);
nni_mtx_unlock(&s->s_mx);
- sock_stats_fini(s);
nni_msgq_fini(s->s_urq);
nni_msgq_fini(s->s_uwq);
nni_cv_fini(&s->s_close_cv);
@@ -519,7 +556,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
s->s_reconnmax = 0;
s->s_rcvmaxsz = 0; // unlimited by default
s->s_id = 0;
- s->s_refcnt = 0;
+ s->s_ref = 0;
s->s_self_id = proto->proto_self;
s->s_peer_id = proto->proto_peer;
s->s_flags = proto->proto_flags;
@@ -545,7 +582,10 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
nni_mtx_init(&s->s_pipe_cbs_mtx);
nni_cv_init(&s->s_cv, &s->s_mx);
nni_cv_init(&s->s_close_cv, &sock_lk);
+
+#ifdef NNG_ENABLE_STATS
sock_stats_init(s);
+#endif
if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
((rv = nni_msgq_init(&s->s_urq, 1)) != 0) ||
@@ -629,11 +669,10 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
(void) snprintf(s->s_name, sizeof(s->s_name), "%u", s->s_id);
// Set up basic stat values.
- (void) snprintf(s->s_scope, sizeof(s->s_scope), "socket%u", s->s_id);
- nni_stat_set_value(&s->s_stats.s_id, s->s_id);
+ nni_stat_set_id(&s->st_id, s->s_id);
// Add our stats chain.
- nni_stat_register(&s->s_stats.s_root);
+ nni_stat_register(&s->st_root);
return (0);
}
@@ -677,7 +716,7 @@ nni_sock_shutdown(nni_sock *sock)
while ((ctx = nctx) != NULL) {
nctx = nni_list_next(&sock->s_ctxs, ctx);
ctx->c_closed = true;
- if (ctx->c_refcnt == 0) {
+ if (ctx->c_ref == 0) {
// No open operations. So close it.
nni_id_remove(&ctx_ids, ctx->c_id);
nni_list_remove(&sock->s_ctxs, ctx);
@@ -767,8 +806,6 @@ nni_sock_close(nni_sock *s)
// is idempotent.
nni_sock_shutdown(s);
- nni_stat_unregister(&s->s_stats.s_root);
-
nni_mtx_lock(&sock_lk);
if (s->s_closed) {
// Some other thread called close. All we need to do
@@ -787,7 +824,7 @@ nni_sock_close(nni_sock *s)
// Wait for all other references to drop. Note that we
// have a reference already (from our caller).
s->s_ctxwait = true;
- while ((s->s_refcnt > 1) || (!nni_list_empty(&s->s_ctxs))) {
+ while ((s->s_ref > 1) || (!nni_list_empty(&s->s_ctxs))) {
nni_cv_wait(&s->s_close_cv);
}
nni_mtx_unlock(&sock_lk);
@@ -819,7 +856,7 @@ nni_sock_closeall(void)
}
// Bump the reference count. The close call below
// will drop it.
- s->s_refcnt++;
+ s->s_ref++;
nni_list_node_remove(&s->s_node);
nni_mtx_unlock(&sock_lk);
nni_sock_close(s);
@@ -901,7 +938,9 @@ nni_sock_add_listener(nni_sock *s, nni_listener *l)
nni_list_append(&s->s_listeners, l);
- nni_stat_inc_atomic(&s->s_stats.s_nlisteners, 1);
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc(&s->st_listeners, 1);
+#endif
nni_mtx_unlock(&s->s_mx);
return (0);
@@ -930,7 +969,9 @@ nni_sock_add_dialer(nni_sock *s, nni_dialer *d)
nni_list_append(&s->s_dialers, d);
- nni_stat_inc_atomic(&s->s_stats.s_ndialers, 1);
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc(&s->st_dialers, 1);
+#endif
nni_mtx_unlock(&s->s_mx);
return (0);
@@ -1159,7 +1200,7 @@ nni_ctx_find(nni_ctx **cp, uint32_t id, bool closing)
if (ctx->c_closed || ((!closing) && ctx->c_sock->s_closed)) {
rv = NNG_ECLOSED;
} else {
- ctx->c_refcnt++;
+ ctx->c_ref++;
*cp = ctx;
}
} else {
@@ -1186,8 +1227,8 @@ nni_ctx_rele(nni_ctx *ctx)
{
nni_sock *sock = ctx->c_sock;
nni_mtx_lock(&sock_lk);
- ctx->c_refcnt--;
- if ((ctx->c_refcnt > 0) || (!ctx->c_closed)) {
+ ctx->c_ref--;
+ if ((ctx->c_ref > 0) || (!ctx->c_closed)) {
// Either still have an active reference, or not
// actually closing yet.
nni_mtx_unlock(&sock_lk);
@@ -1225,7 +1266,7 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock)
ctx->c_size = sz;
ctx->c_data = ctx + 1;
ctx->c_closed = false;
- ctx->c_refcnt = 1; // Caller implicitly gets a reference.
+ ctx->c_ref = 1; // Caller implicitly gets a reference.
ctx->c_sock = sock;
ctx->c_ops = sock->s_ctx_ops;
ctx->c_rcvtimeo = sock->s_rcvtimeo;
@@ -1413,29 +1454,37 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
d->d_pipe = p;
d->d_currtime = d->d_inirtime;
nni_mtx_unlock(&s->s_mx);
- nni_stat_inc_atomic(&s->s_stats.s_npipes, 1);
- nni_stat_inc_atomic(&d->d_stats.s_npipes, 1);
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc(&s->st_pipes, 1);
+ nni_stat_inc(&d->st_pipes, 1);
+#endif
nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
nni_mtx_lock(&s->s_mx);
if (p->p_closed) {
nni_mtx_unlock(&s->s_mx);
- nni_stat_inc_atomic(&d->d_stats.s_reject, 1);
- nni_stat_inc_atomic(&s->s_stats.s_reject, 1);
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc(&d->st_reject, 1);
+ nni_stat_inc(&s->st_rejects, 1);
+#endif
nni_pipe_rele(p);
return;
}
if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
nni_mtx_unlock(&s->s_mx);
- nni_stat_inc_atomic(&d->d_stats.s_reject, 1);
- nni_stat_inc_atomic(&s->s_stats.s_reject, 1);
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc(&d->st_reject, 1);
+ nni_stat_inc(&s->st_rejects, 1);
+#endif
nni_pipe_close(p);
nni_pipe_rele(p);
return;
}
nni_mtx_unlock(&s->s_mx);
- nni_stat_register(&p->p_stats.s_root);
+#ifdef NNG_ENABLE_STATS
+ nni_stat_register(&p->st_root);
+#endif
nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
nni_pipe_rele(p);
}
@@ -1483,7 +1532,9 @@ nni_dialer_reap(nni_dialer *d)
nni_aio_stop(&d->d_tmo_aio);
nni_aio_stop(&d->d_con_aio);
- nni_stat_unregister(&d->d_stats.s_root);
+#ifdef NNG_ENABLE_STATS
+ nni_stat_unregister(&d->st_root);
+#endif
nni_mtx_lock(&s->s_mx);
if (!nni_list_empty(&d->d_pipes)) {
@@ -1524,29 +1575,35 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
nni_list_append(&l->l_pipes, p);
nni_list_append(&s->s_pipes, p);
nni_mtx_unlock(&s->s_mx);
- nni_stat_inc_atomic(&l->l_stats.s_npipes, 1);
- nni_stat_inc_atomic(&s->s_stats.s_npipes, 1);
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc(&l->st_pipes, 1);
+ nni_stat_inc(&s->st_pipes, 1);
+#endif
nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
nni_mtx_lock(&s->s_mx);
if (p->p_closed) {
nni_mtx_unlock(&s->s_mx);
- nni_stat_inc_atomic(&l->l_stats.s_reject, 1);
- nni_stat_inc_atomic(&s->s_stats.s_reject, 1);
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc(&l->st_reject, 1);
+ nni_stat_inc(&s->st_rejects, 1);
+#endif
nni_pipe_rele(p);
return;
}
if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
nni_mtx_unlock(&s->s_mx);
- nni_stat_inc_atomic(&l->l_stats.s_reject, 1);
- nni_stat_inc_atomic(&s->s_stats.s_reject, 1);
+#ifdef NNG_ENABLE_STATS
+ nni_stat_inc(&l->st_reject, 1);
+ nni_stat_inc(&s->st_rejects, 1);
+#endif
nni_pipe_close(p);
nni_pipe_rele(p);
return;
}
nni_mtx_unlock(&s->s_mx);
- nni_stat_register(&p->p_stats.s_root);
+ nni_stat_register(&p->st_root);
nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
nni_pipe_rele(p);
}
@@ -1595,7 +1652,9 @@ nni_listener_reap(nni_listener *l)
nni_aio_stop(&l->l_tmo_aio);
nni_aio_stop(&l->l_acc_aio);
- nni_stat_unregister(&l->l_stats.s_root);
+#ifdef NNG_ENABLE_STATS
+ nni_stat_unregister(&l->st_root);
+#endif
nni_mtx_lock(&s->s_mx);
if (!nni_list_empty(&l->l_pipes)) {
@@ -1655,15 +1714,17 @@ nni_pipe_remove(nni_pipe *p)
nni_dialer *d = p->p_dialer;
nni_mtx_lock(&s->s_mx);
+#ifdef NNG_ENABLE_STATS
if (nni_list_node_active(&p->p_sock_node)) {
- nni_stat_dec_atomic(&s->s_stats.s_npipes, 1);
+ nni_stat_dec(&s->st_pipes, 1);
}
if (p->p_listener != NULL) {
- nni_stat_dec_atomic(&p->p_listener->l_stats.s_npipes, 1);
+ nni_stat_dec(&p->p_listener->st_pipes, 1);
}
if (p->p_dialer != NULL) {
- nni_stat_dec_atomic(&p->p_dialer->d_stats.s_npipes, 1);
+ nni_stat_dec(&p->p_dialer->st_pipes, 1);
}
+#endif
nni_list_node_remove(&p->p_sock_node);
nni_list_node_remove(&p->p_ep_node);
p->p_listener = NULL;
@@ -1682,7 +1743,7 @@ void
nni_sock_add_stat(nni_sock *s, nni_stat_item *stat)
{
#ifdef NNG_ENABLE_STATS
- nni_stat_add(&s->s_stats.s_root, stat);
+ nni_stat_add(&s->st_root, stat);
#else
NNI_ARG_UNUSED(s);
NNI_ARG_UNUSED(stat);
@@ -1693,8 +1754,8 @@ void
nni_sock_bump_tx(nni_sock *s, uint64_t sz)
{
#ifdef NNG_ENABLE_STATS
- nni_stat_inc_atomic(&s->s_stats.s_txmsgs, 1);
- nni_stat_inc_atomic(&s->s_stats.s_txbytes, sz);
+ nni_stat_inc(&s->st_tx_msgs, 1);
+ nni_stat_inc(&s->st_tx_bytes, sz);
#else
NNI_ARG_UNUSED(s);
NNI_ARG_UNUSED(sz);
@@ -1705,8 +1766,8 @@ void
nni_sock_bump_rx(nni_sock *s, uint64_t sz)
{
#ifdef NNG_ENABLE_STATS
- nni_stat_inc_atomic(&s->s_stats.s_rxmsgs, 1);
- nni_stat_inc_atomic(&s->s_stats.s_rxbytes, sz);
+ nni_stat_inc(&s->st_rx_msgs, 1);
+ nni_stat_inc(&s->st_rx_bytes, sz);
#else
NNI_ARG_UNUSED(s);
NNI_ARG_UNUSED(sz);