aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2024-12-28 17:00:04 -0800
committerGarrett D'Amore <garrett@damore.org>2024-12-28 20:40:21 -0800
commit67b4cea8852775712d9a2c8b4eac2f5f2b1a132b (patch)
tree387fb37d09733e680717c27face6494c55860d2b
parentbef6b378ae5d6bbfedfb5e7e7f9f433310fac4e9 (diff)
downloadnng-67b4cea8852775712d9a2c8b4eac2f5f2b1a132b.tar.gz
nng-67b4cea8852775712d9a2c8b4eac2f5f2b1a132b.tar.bz2
nng-67b4cea8852775712d9a2c8b4eac2f5f2b1a132b.zip
fixes #961 surprising pipe event order
-rw-r--r--include/nng/nng.h1
-rw-r--r--src/core/pipe.c14
-rw-r--r--src/core/socket.c49
-rw-r--r--src/core/sockimpl.h6
4 files changed, 45 insertions, 25 deletions
diff --git a/include/nng/nng.h b/include/nng/nng.h
index 62c6e133..04ef88ba 100644
--- a/include/nng/nng.h
+++ b/include/nng/nng.h
@@ -271,6 +271,7 @@ uint32_t nng_sockaddr_port(const nng_sockaddr *sa);
// Only one callback can be set on a given socket, and there is no way
// to retrieve the old value.
typedef enum {
+ NNG_PIPE_EV_NONE, // Used internally, must be first, never posted
NNG_PIPE_EV_ADD_PRE, // Called just before pipe added to socket
NNG_PIPE_EV_ADD_POST, // Called just after pipe added to socket
NNG_PIPE_EV_REM_POST, // Called just after pipe removed from socket
diff --git a/src/core/pipe.c b/src/core/pipe.c
index dac24140..d7cd6bc0 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -256,13 +256,13 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, nni_dialer *d,
return (NNG_ENOMEM);
}
- p->p_size = sz;
- p->p_proto_ops = *pops;
- p->p_tran_ops = *tops;
- p->p_sock = sock;
- p->p_cbs = false;
- p->p_dialer = d;
- p->p_listener = l;
+ p->p_size = sz;
+ p->p_proto_ops = *pops;
+ p->p_tran_ops = *tops;
+ p->p_sock = sock;
+ p->p_dialer = d;
+ p->p_listener = l;
+ p->p_last_event = NNG_PIPE_EV_NONE;
// Two references - one for our caller, and
// one to be dropped when the pipe is closed.
diff --git a/src/core/socket.c b/src/core/socket.c
index 6ba5d54e..d0c18744 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -104,6 +104,7 @@ struct nni_socket {
nni_mtx s_pipe_cbs_mtx;
nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM];
+ bool s_want_evs;
#ifdef NNG_ENABLE_STATS
nni_stat_item st_root; // socket scope
@@ -1013,10 +1014,16 @@ nni_sock_flags(nni_sock *sock)
void
nni_sock_set_pipe_cb(nni_sock *s, int ev, nng_pipe_cb cb, void *arg)
{
- if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) {
+ if ((ev > NNG_PIPE_EV_NONE) && (ev < NNG_PIPE_EV_NUM)) {
nni_mtx_lock(&s->s_pipe_cbs_mtx);
s->s_pipe_cbs[ev].cb_fn = cb;
s->s_pipe_cbs[ev].cb_arg = arg;
+ s->s_want_evs = false;
+ for (ev = NNG_PIPE_EV_NONE; ev < NNG_PIPE_EV_NUM; ev++) {
+ if (s->s_pipe_cbs[ev].cb_fn != NULL) {
+ s->s_want_evs = true;
+ }
+ }
nni_mtx_unlock(&s->s_pipe_cbs_mtx);
}
}
@@ -1538,25 +1545,39 @@ nni_pipe_start(nni_pipe *p)
void
nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev)
{
- nni_sock *s = p->p_sock;
- nng_pipe_cb cb;
- void *arg;
+ nni_sock *s = p->p_sock;
+ nng_pipe_cb cb;
+ void *arg;
+ bool wantevs;
+ static nni_mtx serialize = NNI_MTX_INITIALIZER;
nni_mtx_lock(&s->s_pipe_cbs_mtx);
- if (ev == NNG_PIPE_EV_ADD_PRE) {
- p->p_cbs = true;
- } else if (!p->p_cbs) {
- nni_mtx_unlock(&s->s_pipe_cbs_mtx);
- return;
- }
- cb = s->s_pipe_cbs[ev].cb_fn;
- arg = s->s_pipe_cbs[ev].cb_arg;
+ cb = s->s_pipe_cbs[ev].cb_fn;
+ arg = s->s_pipe_cbs[ev].cb_arg;
+ wantevs = s->s_want_evs;
nni_mtx_unlock(&s->s_pipe_cbs_mtx);
- if (cb != NULL) {
+ if (wantevs) {
+ nni_mtx_lock(&serialize);
+ // this pipe never got an event before, so don't start now
+ if (p->p_last_event == NNG_PIPE_EV_NONE &&
+ ev != NNG_PIPE_EV_ADD_PRE) {
+ nni_mtx_unlock(&serialize);
+ return;
+ }
+ if (p->p_last_event >= ev) {
+ // this pipe event already was notified, or a "later"
+ // one was, so don't go backwards.
+ nni_mtx_unlock(&serialize);
+ return;
+ }
+ p->p_last_event = ev;
nng_pipe pid;
pid.id = p->p_id;
- cb(pid, ev, arg);
+ if (cb != NULL) {
+ cb(pid, ev, arg);
+ }
+ nni_mtx_unlock(&serialize);
}
}
diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h
index 3140b97b..92c0bb69 100644
--- a/src/core/sockimpl.h
+++ b/src/core/sockimpl.h
@@ -1,5 +1,5 @@
//
-// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -104,9 +104,9 @@ struct nni_pipe {
nni_listener *p_listener;
nni_atomic_bool p_closed;
nni_atomic_flag p_stop;
- bool p_cbs;
nni_reap_node p_reap;
nni_refcnt p_refcnt;
+ nng_pipe_ev p_last_event;
#ifdef NNG_ENABLE_STATS
nni_stat_item st_root;
@@ -125,7 +125,6 @@ extern int nni_sock_add_listener(nni_sock *, nni_listener *);
extern void nni_sock_remove_listener(nni_listener *);
extern void nni_sock_remove_dialer(nni_dialer *);
-extern void nni_dialer_add_pipe(nni_dialer *, void *);
extern void nni_dialer_shutdown(nni_dialer *);
extern void nni_dialer_reap(nni_dialer *);
extern void nni_dialer_destroy(nni_dialer *);
@@ -133,7 +132,6 @@ extern void nni_dialer_timer_start(nni_dialer *);
extern void nni_dialer_stop(nni_dialer *);
extern void nni_listener_start_pipe(nni_listener *, nni_pipe *);
-extern void nni_listener_add_pipe(nni_listener *, void *);
extern void nni_listener_shutdown(nni_listener *);
extern void nni_listener_reap(nni_listener *);
extern void nni_listener_destroy(nni_listener *);