From 67b4cea8852775712d9a2c8b4eac2f5f2b1a132b Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 28 Dec 2024 17:00:04 -0800 Subject: fixes #961 surprising pipe event order --- src/core/socket.c | 49 +++++++++++++++++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 14 deletions(-) (limited to 'src/core/socket.c') diff --git a/src/core/socket.c b/src/core/socket.c index 6ba5d54e..d0c18744 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -104,6 +104,7 @@ struct nni_socket { nni_mtx s_pipe_cbs_mtx; nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM]; + bool s_want_evs; #ifdef NNG_ENABLE_STATS nni_stat_item st_root; // socket scope @@ -1013,10 +1014,16 @@ nni_sock_flags(nni_sock *sock) void nni_sock_set_pipe_cb(nni_sock *s, int ev, nng_pipe_cb cb, void *arg) { - if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) { + if ((ev > NNG_PIPE_EV_NONE) && (ev < NNG_PIPE_EV_NUM)) { nni_mtx_lock(&s->s_pipe_cbs_mtx); s->s_pipe_cbs[ev].cb_fn = cb; s->s_pipe_cbs[ev].cb_arg = arg; + s->s_want_evs = false; + for (ev = NNG_PIPE_EV_NONE; ev < NNG_PIPE_EV_NUM; ev++) { + if (s->s_pipe_cbs[ev].cb_fn != NULL) { + s->s_want_evs = true; + } + } nni_mtx_unlock(&s->s_pipe_cbs_mtx); } } @@ -1538,25 +1545,39 @@ nni_pipe_start(nni_pipe *p) void nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev) { - nni_sock *s = p->p_sock; - nng_pipe_cb cb; - void *arg; + nni_sock *s = p->p_sock; + nng_pipe_cb cb; + void *arg; + bool wantevs; + static nni_mtx serialize = NNI_MTX_INITIALIZER; nni_mtx_lock(&s->s_pipe_cbs_mtx); - if (ev == NNG_PIPE_EV_ADD_PRE) { - p->p_cbs = true; - } else if (!p->p_cbs) { - nni_mtx_unlock(&s->s_pipe_cbs_mtx); - return; - } - cb = s->s_pipe_cbs[ev].cb_fn; - arg = s->s_pipe_cbs[ev].cb_arg; + cb = s->s_pipe_cbs[ev].cb_fn; + arg = s->s_pipe_cbs[ev].cb_arg; + wantevs = s->s_want_evs; nni_mtx_unlock(&s->s_pipe_cbs_mtx); - if (cb != NULL) { + if (wantevs) { + nni_mtx_lock(&serialize); + // this pipe never got an event before, so don't start now + if (p->p_last_event == NNG_PIPE_EV_NONE && + ev != NNG_PIPE_EV_ADD_PRE) { + nni_mtx_unlock(&serialize); + return; + } + if (p->p_last_event >= ev) { + // this pipe event already was notified, or a "later" + // one was, so don't go backwards. + nni_mtx_unlock(&serialize); + return; + } + p->p_last_event = ev; nng_pipe pid; pid.id = p->p_id; - cb(pid, ev, arg); + if (cb != NULL) { + cb(pid, ev, arg); + } + nni_mtx_unlock(&serialize); } } -- cgit v1.2.3-70-g09d2