diff options
| author | Garrett D'Amore <garrett@damore.org> | 2024-12-28 17:00:04 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2024-12-28 20:40:21 -0800 |
| commit | 67b4cea8852775712d9a2c8b4eac2f5f2b1a132b (patch) | |
| tree | 387fb37d09733e680717c27face6494c55860d2b /src/core/socket.c | |
| parent | bef6b378ae5d6bbfedfb5e7e7f9f433310fac4e9 (diff) | |
| download | nng-67b4cea8852775712d9a2c8b4eac2f5f2b1a132b.tar.gz nng-67b4cea8852775712d9a2c8b4eac2f5f2b1a132b.tar.bz2 nng-67b4cea8852775712d9a2c8b4eac2f5f2b1a132b.zip | |
fixes #961 surprising pipe event order
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 49 |
1 files changed, 35 insertions, 14 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 6ba5d54e..d0c18744 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -104,6 +104,7 @@ struct nni_socket { nni_mtx s_pipe_cbs_mtx; nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM]; + bool s_want_evs; #ifdef NNG_ENABLE_STATS nni_stat_item st_root; // socket scope @@ -1013,10 +1014,16 @@ nni_sock_flags(nni_sock *sock) void nni_sock_set_pipe_cb(nni_sock *s, int ev, nng_pipe_cb cb, void *arg) { - if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) { + if ((ev > NNG_PIPE_EV_NONE) && (ev < NNG_PIPE_EV_NUM)) { nni_mtx_lock(&s->s_pipe_cbs_mtx); s->s_pipe_cbs[ev].cb_fn = cb; s->s_pipe_cbs[ev].cb_arg = arg; + s->s_want_evs = false; + for (ev = NNG_PIPE_EV_NONE; ev < NNG_PIPE_EV_NUM; ev++) { + if (s->s_pipe_cbs[ev].cb_fn != NULL) { + s->s_want_evs = true; + } + } nni_mtx_unlock(&s->s_pipe_cbs_mtx); } } @@ -1538,25 +1545,39 @@ nni_pipe_start(nni_pipe *p) void nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev) { - nni_sock *s = p->p_sock; - nng_pipe_cb cb; - void *arg; + nni_sock *s = p->p_sock; + nng_pipe_cb cb; + void *arg; + bool wantevs; + static nni_mtx serialize = NNI_MTX_INITIALIZER; nni_mtx_lock(&s->s_pipe_cbs_mtx); - if (ev == NNG_PIPE_EV_ADD_PRE) { - p->p_cbs = true; - } else if (!p->p_cbs) { - nni_mtx_unlock(&s->s_pipe_cbs_mtx); - return; - } - cb = s->s_pipe_cbs[ev].cb_fn; - arg = s->s_pipe_cbs[ev].cb_arg; + cb = s->s_pipe_cbs[ev].cb_fn; + arg = s->s_pipe_cbs[ev].cb_arg; + wantevs = s->s_want_evs; nni_mtx_unlock(&s->s_pipe_cbs_mtx); - if (cb != NULL) { + if (wantevs) { + nni_mtx_lock(&serialize); + // this pipe never got an event before, so don't start now + if (p->p_last_event == NNG_PIPE_EV_NONE && + ev != NNG_PIPE_EV_ADD_PRE) { + nni_mtx_unlock(&serialize); + return; + } + if (p->p_last_event >= ev) { + // this pipe event already was notified, or a "later" + // one was, so don't go backwards. + nni_mtx_unlock(&serialize); + return; + } + p->p_last_event = ev; nng_pipe pid; pid.id = p->p_id; - cb(pid, ev, arg); + if (cb != NULL) { + cb(pid, ev, arg); + } + nni_mtx_unlock(&serialize); } } |
