aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c49
1 files changed, 35 insertions, 14 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 6ba5d54e..d0c18744 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -104,6 +104,7 @@ struct nni_socket {
nni_mtx s_pipe_cbs_mtx;
nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM];
+ bool s_want_evs;
#ifdef NNG_ENABLE_STATS
nni_stat_item st_root; // socket scope
@@ -1013,10 +1014,16 @@ nni_sock_flags(nni_sock *sock)
void
nni_sock_set_pipe_cb(nni_sock *s, int ev, nng_pipe_cb cb, void *arg)
{
- if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) {
+ if ((ev > NNG_PIPE_EV_NONE) && (ev < NNG_PIPE_EV_NUM)) {
nni_mtx_lock(&s->s_pipe_cbs_mtx);
s->s_pipe_cbs[ev].cb_fn = cb;
s->s_pipe_cbs[ev].cb_arg = arg;
+ s->s_want_evs = false;
+ for (ev = NNG_PIPE_EV_NONE; ev < NNG_PIPE_EV_NUM; ev++) {
+ if (s->s_pipe_cbs[ev].cb_fn != NULL) {
+ s->s_want_evs = true;
+ }
+ }
nni_mtx_unlock(&s->s_pipe_cbs_mtx);
}
}
@@ -1538,25 +1545,39 @@ nni_pipe_start(nni_pipe *p)
void
nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev)
{
- nni_sock *s = p->p_sock;
- nng_pipe_cb cb;
- void *arg;
+ nni_sock *s = p->p_sock;
+ nng_pipe_cb cb;
+ void *arg;
+ bool wantevs;
+ static nni_mtx serialize = NNI_MTX_INITIALIZER;
nni_mtx_lock(&s->s_pipe_cbs_mtx);
- if (ev == NNG_PIPE_EV_ADD_PRE) {
- p->p_cbs = true;
- } else if (!p->p_cbs) {
- nni_mtx_unlock(&s->s_pipe_cbs_mtx);
- return;
- }
- cb = s->s_pipe_cbs[ev].cb_fn;
- arg = s->s_pipe_cbs[ev].cb_arg;
+ cb = s->s_pipe_cbs[ev].cb_fn;
+ arg = s->s_pipe_cbs[ev].cb_arg;
+ wantevs = s->s_want_evs;
nni_mtx_unlock(&s->s_pipe_cbs_mtx);
- if (cb != NULL) {
+ if (wantevs) {
+ nni_mtx_lock(&serialize);
+ // this pipe never got an event before, so don't start now
+ if (p->p_last_event == NNG_PIPE_EV_NONE &&
+ ev != NNG_PIPE_EV_ADD_PRE) {
+ nni_mtx_unlock(&serialize);
+ return;
+ }
+ if (p->p_last_event >= ev) {
+ // this pipe event already was notified, or a "later"
+ // one was, so don't go backwards.
+ nni_mtx_unlock(&serialize);
+ return;
+ }
+ p->p_last_event = ev;
nng_pipe pid;
pid.id = p->p_id;
- cb(pid, ev, arg);
+ if (cb != NULL) {
+ cb(pid, ev, arg);
+ }
+ nni_mtx_unlock(&serialize);
}
}