diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-05-17 12:54:01 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-05-17 19:29:37 -0700 |
| commit | 70d478f5d185e147ca8d3dcba4cbd8bb6da3719a (patch) | |
| tree | 443e3b0e81138d7c195660d45eca7d4d497af8ac | |
| parent | e490aa3353f05e158a0f1f485f371cd49e70b4f5 (diff) | |
| download | nng-70d478f5d185e147ca8d3dcba4cbd8bb6da3719a.tar.gz nng-70d478f5d185e147ca8d3dcba4cbd8bb6da3719a.tar.bz2 nng-70d478f5d185e147ca8d3dcba4cbd8bb6da3719a.zip | |
fixes #449 Want more flexible pipe events
This changes the signature of nng_pipe_notify(), and the associated
events. The documentation is updated to reflect this.
We have also broken the lock up so that we don't hold the master
socket lock for some of these things, which may have beneficial
impact on performance.
| -rw-r--r-- | docs/man/nng_pipe_notify.3.adoc | 59 | ||||
| -rw-r--r-- | src/core/pipe.c | 49 | ||||
| -rw-r--r-- | src/core/socket.c | 201 | ||||
| -rw-r--r-- | src/core/socket.h | 10 | ||||
| -rw-r--r-- | src/nng.c | 4 | ||||
| -rw-r--r-- | src/nng.h | 18 | ||||
| -rw-r--r-- | src/protocol/bus0/bus.c | 5 | ||||
| -rw-r--r-- | src/protocol/pair0/pair.c | 5 | ||||
| -rw-r--r-- | src/protocol/pair1/pair.c | 5 | ||||
| -rw-r--r-- | src/protocol/pipeline0/pull.c | 5 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 5 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 5 | ||||
| -rw-r--r-- | src/protocol/reqrep0/xrep.c | 5 | ||||
| -rw-r--r-- | src/protocol/survey0/respond.c | 4 | ||||
| -rw-r--r-- | src/protocol/survey0/survey.c | 4 | ||||
| -rw-r--r-- | src/protocol/survey0/xrespond.c | 4 | ||||
| -rw-r--r-- | src/protocol/survey0/xsurvey.c | 4 | ||||
| -rw-r--r-- | tests/pipe.c | 115 |
18 files changed, 345 insertions, 162 deletions
diff --git a/docs/man/nng_pipe_notify.3.adoc b/docs/man/nng_pipe_notify.3.adoc index 34b640c8..0a8785e3 100644 --- a/docs/man/nng_pipe_notify.3.adoc +++ b/docs/man/nng_pipe_notify.3.adoc @@ -19,40 +19,55 @@ nng_pipe_notify - register pipe notification callback ---- #include <nng/nng.h> -typedef enum { - NNG_PIPE_ADD, - NNG_PIPE_REM, -} nng_pipe_action; +enum { + NNG_PIPE_EV_ADD_PRE, + NNG_PIPE_EV_ADD_POST, + NNG_PIPE_EV_REM_POST, +}; -typedef void (*nng_pipe_cb)(nng_pipe, nng_pipe_action, void *); +typedef void (*nng_pipe_cb)(nng_pipe, int, void *); -int nng_pipe_notify(nng_socket s, nng_pipe_cb cb, void *arg); +int nng_pipe_notify(nng_socket s, int ev, nng_pipe_cb cb, void *arg); ---- == DESCRIPTION The `nng_pipe_notify()` function registers the callback function _cb_ -to be called whenever a <<nng_pipe.5#,pipe>> is added to or removed from the -socket _s_. +to be called whenever a <<nng_pipe.5#,pipe>> the pipe event specified by +_ev_ occurs on the socket _s_. +The callback _cb_ will be passed _arg_ as its final argument. -The function _cb_ will be called with the action `NNG_PIPE_ADD` just before -a pipe is added to the socket (as a result of a connection being established). -The final argument passed will be the argument _arg_ that was specified when -the function was registered. +A different callback may be supplied for each event. +Each event may have at most one callback registered. +Registering a callback implicitly unregisters any previously registered. -The function _cb_ will also be called with the action `NNG_PIPE_REM` when -the pipe is being removed from the socket for any reason. -This will also use the same argument _arg_. +The following pipe events are supported: -NOTE: Only one callback can be registered for a given socket. -Subsequent calls to `nng_pipe_notify()` on the same socket will overwrite -any prior registration. +`NNG_PIPE_EV_ADD_PRE`:: This event occurs after a connection and negotiation +has completed, but before the pipe is added to the socket. +If the pipe is closed (using `<<nng_pipe_close.3#,nng_pipe_close()>>`) at +this point, the socket will never see the pipe, and no further events will +occur for the given pipe. -TIP: The callback _cb_ may reject a pipe for any reason by simply closing +`NNG_PIPE_EV_ADD_POST`:: This event occurs after the pipe is fully added to +the socket. +Prior to this time, it is not possible to communicate over the pipe with +the socket. + +`NNG_PIPE_EV_REM_POST`:: This event occurs after the pipe has been removed +from the socket. +The underlying transport may be closed at this point, and it is not +possible communicate using this pipe. + +TIP: The callback _cb_ may close a pipe for any reason by simply closing it using `<<nng_pipe_close.3#,nng_pipe_close()>>`. -This might be done, for example, if the remote peer is not authorized to -access this session, based on values determined with the aid of -`<<nng_pipe_getopt.3#,nng_pipe_getopt()>>`. +This might be done before the pipe is added to the socket (during +`NNG_PIPE_EV_ADD_PRE`), for example, if the remote peer is not authorized. + +TIP: It is possible to register the same _cb_ and _arg_ for different events +by calling this function separately for different values of _ev_. + +NOTE: This function ignores invalid values for _ev_. == RETURN VALUES diff --git a/src/core/pipe.c b/src/core/pipe.c index 321e1a09..ccd0dbe7 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -29,6 +29,7 @@ struct nni_pipe { nni_ep * p_ep; bool p_closed; bool p_stop; + bool p_cbs; int p_refcnt; nni_mtx p_mtx; nni_cv p_cv; @@ -100,10 +101,19 @@ nni_pipe_sys_fini(void) static void nni_pipe_destroy(nni_pipe *p) { + bool cbs; if (p == NULL) { return; } + nni_mtx_lock(&p->p_mtx); + cbs = p->p_cbs; + nni_mtx_unlock(&p->p_mtx); + + if (cbs) { + nni_sock_run_pipe_cb(p->p_sock, NNG_PIPE_EV_REM_POST, p->p_id); + } + // Stop any pending negotiation. nni_aio_stop(p->p_start_aio); @@ -119,6 +129,7 @@ nni_pipe_destroy(nni_pipe *p) if (nni_list_node_active(&p->p_ep_node)) { nni_ep_pipe_remove(p->p_ep, p); } + if (nni_list_node_active(&p->p_sock_node)) { nni_sock_pipe_remove(p->p_sock, p); } @@ -263,13 +274,32 @@ static void nni_pipe_start_cb(void *arg) { nni_pipe *p = arg; + nni_sock *s = p->p_sock; nni_aio * aio = p->p_start_aio; + uint32_t id = nni_pipe_id(p); - if ((nni_aio_result(aio) != 0) || - (nni_sock_pipe_start(p->p_sock, p) != 0) || - (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) { + if (nni_aio_result(aio) != 0) { nni_pipe_stop(p); + return; } + + nni_mtx_lock(&p->p_mtx); + p->p_cbs = true; // We're running all cbs going forward + nni_mtx_unlock(&p->p_mtx); + + nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_PRE, id); + if (nni_pipe_closed(p)) { + nni_pipe_stop(p); + return; + } + + if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) || + nni_sock_closing(s)) { + nni_pipe_stop(p); + return; + } + + nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_POST, id); } int @@ -289,6 +319,7 @@ nni_pipe_create(nni_ep *ep, void *tdata) } // Make a private copy of the transport ops. + p->p_start_aio = NULL; p->p_tran_ops = *tran->tran_pipe; p->p_tran_data = tdata; p->p_proto_ops = *pops; @@ -297,6 +328,7 @@ nni_pipe_create(nni_ep *ep, void *tdata) p->p_sock = sock; p->p_closed = false; p->p_stop = false; + p->p_cbs = false; p->p_refcnt = 0; NNI_LIST_NODE_INIT(&p->p_reap_node); @@ -317,18 +349,13 @@ nni_pipe_create(nni_ep *ep, void *tdata) if ((rv != 0) || ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0) || - ((rv = nni_ep_pipe_add(ep, p)) != 0)) { + ((rv = nni_ep_pipe_add(ep, p)) != 0) || + ((rv = nni_sock_pipe_add(sock, p)) != 0)) { nni_pipe_destroy(p); return (rv); } - // At this point the endpoint knows about it, and the protocol - // might too, so on failure we have to tear it down fully as if done - // after a successful result. - if ((rv = nni_sock_pipe_add(sock, p)) != 0) { - nni_pipe_stop(p); - } - return (rv); + return (0); } int diff --git a/src/core/socket.c b/src/core/socket.c index f14c15fd..3a209441 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -47,6 +47,11 @@ typedef struct nni_sockopt { void * data; } nni_sockopt; +typedef struct nni_sock_pipe_cb { + nng_pipe_cb cb_fn; + void * cb_arg; +} nni_sock_pipe_cb; + struct nni_socket { nni_list_node s_node; nni_mtx s_mx; @@ -81,11 +86,12 @@ struct nni_socket { nni_list s_pipes; // active pipes nni_list s_ctxs; // active contexts (protected by global nni_sock_lk) - bool s_closing; // Socket is closing - bool s_closed; // Socket closed, protected by global lock - bool s_ctxwait; // Waiting for contexts to close. - nng_pipe_cb s_pipe_cb; // User callback for pipe events. - void * s_pipe_cbarg; // Argument for pipe events. + bool s_closing; // Socket is closing + bool s_closed; // Socket closed, protected by global lock + bool s_ctxwait; // Waiting for contexts to close. + + nni_mtx s_pipe_cbs_mtx; + nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM]; }; static void nni_ctx_destroy(nni_ctx *); @@ -431,39 +437,34 @@ nni_sock_rele(nni_sock *s) nni_mtx_unlock(&nni_sock_lk); } -int -nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe) +bool +nni_sock_closing(nni_sock *s) { - nng_pipe_cb cb; - - NNI_ASSERT(s != NULL); + bool rv; nni_mtx_lock(&s->s_mx); - if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) { - // Peer protocol mismatch. - nni_mtx_unlock(&s->s_mx); - return (NNG_EPROTO); - } - if ((cb = s->s_pipe_cb) != NULL) { - nng_pipe p; - void * arg = s->s_pipe_cbarg; - nni_mtx_unlock(&s->s_mx); - p.id = nni_pipe_id(pipe); - cb(p, NNG_PIPE_ADD, arg); - if (nni_pipe_closed(pipe)) { - return (NNG_ECLOSED); + rv = s->s_closing; + nni_mtx_unlock(&s->s_mx); + return (rv); +} + +void +nni_sock_run_pipe_cb(nni_sock *s, int ev, uint32_t id) +{ + if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) { + nng_pipe_cb cb; + void * arg; + + nni_mtx_lock(&s->s_pipe_cbs_mtx); + cb = s->s_pipe_cbs[ev].cb_fn; + arg = s->s_pipe_cbs[ev].cb_arg; + nni_mtx_unlock(&s->s_pipe_cbs_mtx); + + if (cb != NULL) { + nng_pipe p; + p.id = id; + cb(p, ev, arg); } - nni_mtx_lock(&s->s_mx); - } - if (s->s_closing) { - // We're closing, bail out. This has to be done after - // we have dropped the lock above in case the sock is closed - // while the user callback runs. - nni_mtx_unlock(&s->s_mx); - return (NNG_ECLOSED); } - - nni_mtx_unlock(&s->s_mx); - return (0); } int @@ -486,26 +487,16 @@ nni_sock_pipe_add(nni_sock *s, nni_pipe *p) } void -nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) +nni_sock_pipe_remove(nni_sock *s, nni_pipe *p) { - nng_pipe_cb cb; - - nni_mtx_lock(&sock->s_mx); - if ((cb = sock->s_pipe_cb) != NULL) { - void * arg = sock->s_pipe_cbarg; - nng_pipe p; - nni_mtx_unlock(&sock->s_mx); - p.id = nni_pipe_id(pipe); - cb(p, NNG_PIPE_REM, arg); - nni_mtx_lock(&sock->s_mx); - } - if (nni_list_active(&sock->s_pipes, pipe)) { - nni_list_remove(&sock->s_pipes, pipe); + nni_mtx_lock(&s->s_mx); + if (nni_list_active(&s->s_pipes, p)) { + nni_list_remove(&s->s_pipes, p); } - if (sock->s_closing && nni_list_empty(&sock->s_pipes)) { - nni_cv_wake(&sock->s_cv); + if (s->s_closing && nni_list_empty(&s->s_pipes)) { + nni_cv_wake(&s->s_cv); } - nni_mtx_unlock(&sock->s_mx); + nni_mtx_unlock(&s->s_mx); } static void @@ -532,6 +523,7 @@ nni_sock_destroy(nni_sock *s) nni_cv_fini(&s->s_close_cv); nni_cv_fini(&s->s_cv); nni_mtx_fini(&s->s_mx); + nni_mtx_fini(&s->s_pipe_cbs_mtx); NNI_FREE_STRUCT(s); } @@ -574,6 +566,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) nni_pipe_sock_list_init(&s->s_pipes); nni_ep_list_init(&s->s_eps); nni_mtx_init(&s->s_mx); + nni_mtx_init(&s->s_pipe_cbs_mtx); nni_cv_init(&s->s_cv, &s->s_mx); nni_cv_init(&s->s_close_cv, &nni_sock_lk); @@ -668,10 +661,10 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) return (rv); } -// nni_sock_shutdown shuts down the socket; after this point no further -// access to the socket will function, and any threads blocked in entry -// points will be woken (and the functions they are blocked in will return -// NNG_ECLOSED.) +// nni_sock_shutdown shuts down the socket; after this point no +// further access to the socket will function, and any threads blocked +// in entry points will be woken (and the functions they are blocked +// in will return NNG_ECLOSED.) int nni_sock_shutdown(nni_sock *sock) { @@ -689,8 +682,8 @@ nni_sock_shutdown(nni_sock *sock) // Mark us closing, so no more EPs or changes can occur. sock->s_closing = true; - // Close the EPs. This prevents new connections from forming but - // but allows existing ones to drain. + // Close the EPs. This prevents new connections from forming + // but but allows existing ones to drain. NNI_LIST_FOREACH (&sock->s_eps, ep) { nni_ep_shutdown(ep); } @@ -728,9 +721,9 @@ nni_sock_shutdown(nni_sock *sock) nni_mtx_lock(&sock->s_mx); - // At this point, we've done everything we politely can to give - // the protocol a chance to flush its write side. Now its time - // to be a little more insistent. + // At this point, we've done everything we politely can to + // give the protocol a chance to flush its write side. Now + // its time to be a little more insistent. // Close the upper queues immediately. This can happen // safely while we hold the lock. @@ -756,7 +749,8 @@ nni_sock_shutdown(nni_sock *sock) nni_pipe_stop(pipe); } - // We have to wait for *both* endpoints and pipes to be removed. + // We have to wait for *both* endpoints and pipes to be + // removed. while ((!nni_list_empty(&sock->s_pipes)) || (!nni_list_empty(&sock->s_eps))) { nni_cv_wait(&sock->s_cv); @@ -777,9 +771,9 @@ nni_sock_shutdown(nni_sock *sock) } // nni_sock_close shuts down the socket, then releases any resources -// associated with it. It is a programmer error to reference the socket -// after this function is called, as the pointer may reference invalid -// memory or other objects. +// associated with it. It is a programmer error to reference the +// socket after this function is called, as the pointer may reference +// invalid memory or other objects. void nni_sock_close(nni_sock *s) { @@ -789,8 +783,8 @@ nni_sock_close(nni_sock *s) nni_mtx_lock(&nni_sock_lk); if (s->s_closed) { - // Some other thread called close. All we need to do is - // drop our reference count. + // Some other thread called close. All we need to do + // is drop our reference count. nni_mtx_unlock(&nni_sock_lk); nni_sock_rele(s); return; @@ -835,8 +829,8 @@ nni_sock_closeall(void) nni_mtx_unlock(&nni_sock_lk); return; } - // Bump the reference count. The close call below will - // drop it. + // Bump the reference count. The close call below + // will drop it. s->s_refcnt++; nni_list_node_remove(&s->s_node); nni_mtx_unlock(&nni_sock_lk); @@ -962,7 +956,8 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t) } // Protocol options. The protocol can override options that - // the socket framework would otherwise supply, like buffer sizes. + // the socket framework would otherwise supply, like buffer + // sizes. for (pso = s->s_sock_ops.sock_options; pso->pso_name != NULL; pso++) { if (strcmp(pso->pso_name, name) != 0) { continue; @@ -976,7 +971,8 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t) return (rv); } - // Some options do not go down to transports. Handle them directly. + // Some options do not go down to transports. Handle them + // directly. for (sso = nni_sock_options; sso->so_name != NULL; sso++) { if (strcmp(sso->so_name, name) != 0) { continue; @@ -997,17 +993,20 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t) return (rv); } - // Validation of transport options. This is stateless, so transports - // should not fail to set an option later if they passed it here. + // Validation of transport options. This is stateless, so + // transports should not fail to set an option later if they + // passed it here. rv = nni_tran_chkopt(name, v, sz, t); - // Also check a few generic things. We do this if no transport - // was found, or even if a transport rejected one of the settings. + // Also check a few generic things. We do this if no + // transport was found, or even if a transport rejected one of + // the settings. if ((rv == NNG_ENOTSUP) || (rv == 0)) { if (strcmp(name, NNG_OPT_RECVMAXSZ) == 0) { size_t z; - // just a sanity test on the size; it also ensures that - // a size can be set even with no transport configured. + // just a sanity test on the size; it also + // ensures that a size can be set even with no + // transport configured. rv = nni_copyin_size(&z, v, sz, 0, NNI_MAXSZ, t); } } @@ -1049,9 +1048,10 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t) } } - // Apply the options. Failure to set any option on any transport - // (other than ENOTSUP) stops the operation altogether. Its - // important that transport wide checks properly pre-validate. + // Apply the options. Failure to set any option on any + // transport (other than ENOTSUP) stops the operation + // altogether. Its important that transport wide checks + // properly pre-validate. NNI_LIST_FOREACH (&s->s_eps, ep) { int x; if (optv->typ == NNI_TYPE_OPAQUE) { @@ -1075,15 +1075,16 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t) } if (rv == 0) { - // Remove and toss the old value, we are using a new one. + // Remove and toss the old value, we are using a new + // one. if (oldv != NULL) { nni_list_remove(&s->s_options, oldv); nni_free_opt(oldv); } - // Insert our new value. This permits it to be compared - // against later, and for new endpoints to automatically - // receive these values, + // Insert our new value. This permits it to be + // compared against later, and for new endpoints to + // automatically receive these values, nni_list_append(&s->s_options, optv); } else { nni_free_opt(optv); @@ -1108,7 +1109,8 @@ nni_sock_getopt(nni_sock *s, const char *name, void *val, size_t *szp, int t) } // Protocol specific options. The protocol can override - // options like the send buffer or notification descriptors this way. + // options like the send buffer or notification descriptors + // this way. for (pso = s->s_sock_ops.sock_options; pso->pso_name != NULL; pso++) { if (strcmp(name, pso->pso_name) != 0) { continue; @@ -1166,12 +1168,14 @@ nni_sock_flags(nni_sock *sock) } void -nni_sock_set_pipe_cb(nni_sock *sock, nng_pipe_cb cb, void *arg) +nni_sock_set_pipe_cb(nni_sock *s, int ev, nng_pipe_cb cb, void *arg) { - nni_mtx_lock(&sock->s_mx); - sock->s_pipe_cb = cb; - sock->s_pipe_cbarg = arg; - nni_mtx_unlock(&sock->s_mx); + if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) { + nni_mtx_lock(&s->s_pipe_cbs_mtx); + s->s_pipe_cbs[ev].cb_fn = cb; + s->s_pipe_cbs[ev].cb_arg = arg; + nni_mtx_unlock(&s->s_pipe_cbs_mtx); + } } int @@ -1185,11 +1189,12 @@ nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing) } nni_mtx_lock(&nni_sock_lk); if ((rv = nni_idhash_find(nni_ctx_hash, id, (void **) &ctx)) == 0) { - // We refuse a reference if either the socket is closed, - // or the context is closed. (If the socket is closed, - // and we are only getting the reference so we can close it, - // then we still allow. In the case the only valid operation - // will be to close the socket.) + // We refuse a reference if either the socket is + // closed, or the context is closed. (If the socket + // is closed, and we are only getting the reference so + // we can close it, then we still allow. In the case + // the only valid operation will be to close the + // socket.) if (ctx->c_closed || ((!closing) && ctx->c_sock->s_closed)) { rv = NNG_ECLOSED; } else { @@ -1224,8 +1229,8 @@ nni_ctx_rele(nni_ctx *ctx) nni_mtx_lock(&nni_sock_lk); ctx->c_refcnt--; if ((ctx->c_refcnt > 0) || (!ctx->c_closed)) { - // Either still have an active reference, or not actually - // closing yet. + // Either still have an active reference, or not + // actually closing yet. nni_mtx_unlock(&nni_sock_lk); return; } @@ -1288,8 +1293,8 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock) nni_mtx_unlock(&nni_sock_lk); // Paranoia, fixing a possible race in close. Don't let us - // give back a context if the socket is being shutdown (it might - // not have reached the "closed" state yet.) + // give back a context if the socket is being shutdown (it + // might not have reached the "closed" state yet.) nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { nni_mtx_unlock(&sock->s_mx); diff --git a/src/core/socket.h b/src/core/socket.h index 22a13ef7..833129fa 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -44,7 +44,6 @@ extern uint32_t nni_sock_id(nni_sock *); // a pipe could wind up orphaned. extern int nni_sock_pipe_add(nni_sock *, nni_pipe *); extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *); -extern int nni_sock_pipe_start(nni_sock *, nni_pipe *); extern int nni_sock_ep_add(nni_sock *, nni_ep *); extern void nni_sock_ep_remove(nni_sock *, nni_ep *); @@ -71,8 +70,13 @@ extern uint32_t nni_sock_flags(nni_sock *); // one of the only cases (the only?) where the socket core understands // the public data types. (Other solutions might exist, but they require // keeping extra state to support conversion between public and internal -// types.) -extern void nni_sock_set_pipe_cb(nni_sock *sock, nng_pipe_cb, void *); +// types.) The second argument is a mask of events for which the callback +// should be executed. +extern void nni_sock_set_pipe_cb(nni_sock *sock, int, nng_pipe_cb, void *); + +extern void nni_sock_run_pipe_cb(nni_sock *sock, int, uint32_t); + +extern bool nni_sock_closing(nni_sock *sock); // nni_ctx_open is used to open/create a new context structure. // Contexts are not supported by most protocols, but for those that do, @@ -1015,7 +1015,7 @@ nng_getopt_string(nng_socket s, const char *name, char **valp) } int -nng_pipe_notify(nng_socket s, nng_pipe_cb cb, void *arg) +nng_pipe_notify(nng_socket s, int ev, nng_pipe_cb cb, void *arg) { int rv; nni_sock *sock; @@ -1027,7 +1027,7 @@ nng_pipe_notify(nng_socket s, nng_pipe_cb cb, void *arg) return (rv); } - nni_sock_set_pipe_cb(sock, cb, arg); + nni_sock_set_pipe_cb(sock, ev, cb, arg); nni_sock_rele(sock); return (0); } @@ -223,12 +223,18 @@ NNG_DECL int nng_getopt_ptr(nng_socket, const char *, void **); // Only one callback can be set on a given socket, and there is no way // to retrieve the old value. typedef enum { - NNG_PIPE_ADD, // Pipe added to socket - NNG_PIPE_REM // Pipe removed from socket -} nng_pipe_action; - -typedef void (*nng_pipe_cb)(nng_pipe, nng_pipe_action, void *); -NNG_DECL int nng_pipe_notify(nng_socket, nng_pipe_cb, void *); + NNG_PIPE_EV_ADD_PRE, // Called just before pipe added to socket + NNG_PIPE_EV_ADD_POST, // Called just after pipe added to socket + NNG_PIPE_EV_REM_POST, // Called just after poipe removed from socket + NNG_PIPE_EV_NUM, // Used internally, must be last. +} nng_pipe_ev; + +typedef void (*nng_pipe_cb)(nng_pipe, int, void *); + +// nng_pipe_notify registers a callback to be executed when the +// given event is triggered. To watch for different events, register +// multiple times. Each event can have at most one callback registered. +NNG_DECL int nng_pipe_notify(nng_socket, int, nng_pipe_cb, void *); // nng_getopt_string is special -- it allocates a string to hold the // resulting string, which should be freed with nng_strfree when it is diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c index 852de7c8..2427e836 100644 --- a/src/protocol/bus0/bus.c +++ b/src/protocol/bus0/bus.c @@ -193,6 +193,11 @@ bus0_pipe_start(void *arg) bus0_pipe *p = arg; bus0_sock *s = p->psock; + if (nni_pipe_peer(p->npipe) != NNI_PROTO_BUS_V0) { + // Peer protocol mismatch. + return (NNG_EPROTO); + } + nni_mtx_lock(&s->mtx); nni_list_append(&s->pipes, p); nni_mtx_unlock(&s->mtx); diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c index 2fb42df5..87900793 100644 --- a/src/protocol/pair0/pair.c +++ b/src/protocol/pair0/pair.c @@ -129,6 +129,11 @@ pair0_pipe_start(void *arg) pair0_pipe *p = arg; pair0_sock *s = p->psock; + if (nni_pipe_peer(p->npipe) != NNI_PROTO_PAIR_V0) { + // Peer protocol mismatch. + return (NNG_EPROTO); + } + nni_mtx_lock(&s->mtx); if (s->ppipe != NULL) { nni_mtx_unlock(&s->mtx); diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c index ab01e451..584c147d 100644 --- a/src/protocol/pair1/pair.c +++ b/src/protocol/pair1/pair.c @@ -173,6 +173,11 @@ pair1_pipe_start(void *arg) uint32_t id; int rv; + if (nni_pipe_peer(p->npipe) != NNI_PROTO_PAIR_V1) { + // Peer protocol mismatch. + return (NNG_EPROTO); + } + id = nni_pipe_id(p->npipe); nni_mtx_lock(&s->mtx); if ((rv = nni_idhash_insert(s->pipes, id, p)) != 0) { diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c index 81f6c137..f7d5d9a2 100644 --- a/src/protocol/pipeline0/pull.c +++ b/src/protocol/pipeline0/pull.c @@ -112,6 +112,11 @@ pull0_pipe_start(void *arg) { pull0_pipe *p = arg; + if (nni_pipe_peer(p->pipe) != NNI_PROTO_PUSH_V0) { + // Peer protocol mismatch. + return (NNG_EPROTO); + } + // Start the pending pull... nni_pipe_recv(p->pipe, p->recv_aio); diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index c244b0ad..b4fa2e2a 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -143,6 +143,11 @@ sub0_pipe_start(void *arg) { sub0_pipe *p = arg; + if (nni_pipe_peer(p->pipe) != NNI_PROTO_PUB_V0) { + // Peer protocol mismatch. + return (NNG_EPROTO); + } + nni_pipe_recv(p->pipe, p->aio_recv); return (0); } diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index c483b777..24fc7335 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -346,6 +346,11 @@ rep0_pipe_start(void *arg) rep0_sock *s = p->rep; int rv; + if (nni_pipe_peer(p->pipe) != NNI_PROTO_REQ_V0) { + // Peer protocol mismatch. + return (NNG_EPROTO); + } + if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) { return (rv); } diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c index 6dcfe6be..f6cd29eb 100644 --- a/src/protocol/reqrep0/xrep.c +++ b/src/protocol/reqrep0/xrep.c @@ -178,6 +178,11 @@ xrep0_pipe_start(void *arg) xrep0_sock *s = p->rep; int rv; + if (nni_pipe_peer(p->pipe) != NNI_PROTO_REQ_V0) { + // Peer protocol mismatch. + return (NNG_EPROTO); + } + if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) { return (rv); } diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index 7738a8b7..5f833ca6 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -341,6 +341,10 @@ resp0_pipe_start(void *arg) resp0_sock *s = p->psock; int rv; + if (nni_pipe_peer(p->npipe) != NNI_PROTO_SURVEYOR_V0) { + return (NNG_EPROTO); + } + nni_mtx_lock(&s->mtx); rv = nni_idhash_insert(s->pipes, p->id, p); nni_mtx_unlock(&s->mtx); diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index 51bce0c8..24a35003 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -338,6 +338,10 @@ surv0_pipe_start(void *arg) surv0_pipe *p = arg; surv0_sock *s = p->sock; + if (nni_pipe_peer(p->npipe) != NNI_PROTO_RESPONDENT_V0) { + return (NNG_EPROTO); + } + nni_mtx_lock(&s->mtx); nni_list_append(&s->pipes, p); nni_mtx_unlock(&s->mtx); diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c index bcbbcbc7..13a3d759 100644 --- a/src/protocol/survey0/xrespond.c +++ b/src/protocol/survey0/xrespond.c @@ -164,6 +164,10 @@ xresp0_pipe_start(void *arg) xresp0_sock *s = p->psock; int rv; + if (nni_pipe_peer(p->npipe) != NNI_PROTO_SURVEYOR_V0) { + return (NNG_EPROTO); + } + p->id = nni_pipe_id(p->npipe); nni_mtx_lock(&s->mtx); diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c index 47ebef3c..db21e688 100644 --- a/src/protocol/survey0/xsurvey.c +++ b/src/protocol/survey0/xsurvey.c @@ -166,6 +166,10 @@ xsurv0_pipe_start(void *arg) xsurv0_pipe *p = arg; xsurv0_sock *s = p->psock; + if (nni_pipe_peer(p->npipe) != NNI_PROTO_RESPONDENT_V0) { + return (NNG_EPROTO); + } + nni_mtx_lock(&s->mtx); nni_list_append(&s->pipes, p); nni_mtx_unlock(&s->mtx); diff --git a/tests/pipe.c b/tests/pipe.c index 3457db9b..79006fc8 100644 --- a/tests/pipe.c +++ b/tests/pipe.c @@ -27,27 +27,48 @@ struct testcase { nng_dialer d; nng_listener l; nng_pipe p; - int add; + int add_pre; + int add_post; int rem; int err; + int rej; + nng_mtx * lk; }; +static int +getval(struct testcase *t, int *vp) +{ + int rv; + nng_mtx_lock(t->lk); + rv = *vp; + nng_mtx_unlock(t->lk); + return (rv); +} + void -notify(nng_pipe p, nng_pipe_action act, void *arg) +notify(nng_pipe p, int act, void *arg) { struct testcase *t = arg; + nng_mtx_lock(t->lk); if ((nng_socket_id(nng_pipe_socket(p)) != nng_socket_id(t->s)) || (nng_listener_id(nng_pipe_listener(p)) != nng_listener_id(t->l)) || (nng_dialer_id(nng_pipe_dialer(p)) != nng_dialer_id(t->d))) { t->err++; + nng_mtx_unlock(t->lk); return; } + if (t->add_post > t->add_pre) { + t->err++; + } switch (act) { - case NNG_PIPE_ADD: - t->add++; + case NNG_PIPE_EV_ADD_PRE: + t->add_pre++; + break; + case NNG_PIPE_EV_ADD_POST: + t->add_post++; break; - case NNG_PIPE_REM: + case NNG_PIPE_EV_REM_POST: t->rem++; break; default: @@ -55,6 +76,21 @@ notify(nng_pipe p, nng_pipe_action act, void *arg) return; } t->p = p; + nng_mtx_unlock(t->lk); +} + +void +reject(nng_pipe p, int act, void *arg) +{ + struct testcase *t = arg; + notify(p, act, arg); + + nng_mtx_lock(t->lk); + if (!t->rej) { + nng_pipe_close(p); + t->rej++; + } + nng_mtx_unlock(t->lk); } char addr[64]; @@ -70,16 +106,30 @@ TestMain("Pipe notify works", { memset(&pull, 0, sizeof(pull)); memset(&push, 0, sizeof(push)); + So(nng_mtx_alloc(&push.lk) == 0); + So(nng_mtx_alloc(&pull.lk) == 0); So(nng_push_open(&push.s) == 0); So(nng_pull_open(&pull.s) == 0); Reset({ nng_close(push.s); nng_close(pull.s); + nng_mtx_free(push.lk); + nng_mtx_free(pull.lk); }); - So(nng_pipe_notify(push.s, notify, &push) == 0); - So(nng_pipe_notify(pull.s, notify, &pull) == 0); + So(nng_pipe_notify( + push.s, NNG_PIPE_EV_ADD_PRE, notify, &push) == 0); + So(nng_pipe_notify( + push.s, NNG_PIPE_EV_ADD_POST, notify, &push) == 0); + So(nng_pipe_notify( + push.s, NNG_PIPE_EV_REM_POST, notify, &push) == 0); + So(nng_pipe_notify( + pull.s, NNG_PIPE_EV_ADD_PRE, notify, &pull) == 0); + So(nng_pipe_notify( + pull.s, NNG_PIPE_EV_ADD_POST, notify, &pull) == 0); + So(nng_pipe_notify( + pull.s, NNG_PIPE_EV_REM_POST, notify, &pull) == 0); So(nng_setopt_ms(push.s, NNG_OPT_RECONNMINT, 10) == 0); So(nng_setopt_ms(push.s, NNG_OPT_RECONNMAXT, 10) == 0); @@ -92,12 +142,14 @@ TestMain("Pipe notify works", { So(nng_listener_start(pull.l, 0) == 0); So(nng_dialer_start(push.d, 0) == 0); nng_msleep(100); - So(pull.add == 1); - So(pull.rem == 0); - So(pull.err == 0); - So(push.add == 1); - So(push.rem == 0); - So(push.err == 0); + So(getval(&pull, &pull.add_pre) == 1); + So(getval(&pull, &pull.add_post) == 1); + So(getval(&pull, &pull.rem) == 0); + So(getval(&pull, &pull.err) == 0); + So(getval(&push, &push.add_pre) == 1); + So(getval(&push, &push.add_post) == 1); + So(getval(&push, &push.rem) == 0); + So(getval(&push, &push.err) == 0); Convey("We can send a frame", { nng_msg *msg; @@ -114,17 +166,20 @@ TestMain("Pipe notify works", { }); Convey("Reconnection works", { - So(pull.add == 1); + So(getval(&pull, &pull.add_pre) == 1); + So(getval(&pull, &pull.add_post) == 1); nng_pipe_close(pull.p); nng_msleep(200); - So(pull.err == 0); - So(pull.rem == 1); - So(pull.add == 2); + So(getval(&pull, &pull.err) == 0); + So(getval(&pull, &pull.rem) == 1); + So(getval(&pull, &pull.add_pre) == 2); + So(getval(&pull, &pull.add_post) == 2); - So(push.err == 0); - So(push.rem == 1); - So(push.add == 2); + So(getval(&push, &push.err) == 0); + So(getval(&push, &push.rem) == 1); + So(getval(&push, &push.add_pre) == 2); + So(getval(&push, &push.add_post) == 2); Convey("They still exchange frames", { nng_msg *msg; @@ -145,5 +200,25 @@ TestMain("Pipe notify works", { }); }); }); + + Convey("Reject works", { + So(nng_pipe_notify(pull.s, NNG_PIPE_EV_ADD_PRE, reject, + &pull) == 0); + So(nng_listener_create(&pull.l, pull.s, addr) == 0); + So(nng_dialer_create(&push.d, push.s, addr) == 0); + So(nng_listener_id(pull.l) > 0); + So(nng_dialer_id(push.d) > 0); + So(nng_listener_start(pull.l, 0) == 0); + So(nng_dialer_start(push.d, 0) == 0); + nng_msleep(100); + So(getval(&pull, &pull.add_pre) == 2); + So(getval(&pull, &pull.add_post) == 1); + So(getval(&pull, &pull.rem) == 1); + So(getval(&pull, &pull.err) == 0); + So(getval(&push, &push.add_pre) == 2); + So(getval(&push, &push.add_post) == 2); + So(getval(&push, &push.rem) == 1); + So(getval(&push, &push.err) == 0); + }); }); }) |
