summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/man/nng_pipe_notify.3.adoc59
-rw-r--r--src/core/pipe.c49
-rw-r--r--src/core/socket.c201
-rw-r--r--src/core/socket.h10
-rw-r--r--src/nng.c4
-rw-r--r--src/nng.h18
-rw-r--r--src/protocol/bus0/bus.c5
-rw-r--r--src/protocol/pair0/pair.c5
-rw-r--r--src/protocol/pair1/pair.c5
-rw-r--r--src/protocol/pipeline0/pull.c5
-rw-r--r--src/protocol/pubsub0/sub.c5
-rw-r--r--src/protocol/reqrep0/rep.c5
-rw-r--r--src/protocol/reqrep0/xrep.c5
-rw-r--r--src/protocol/survey0/respond.c4
-rw-r--r--src/protocol/survey0/survey.c4
-rw-r--r--src/protocol/survey0/xrespond.c4
-rw-r--r--src/protocol/survey0/xsurvey.c4
-rw-r--r--tests/pipe.c115
18 files changed, 345 insertions, 162 deletions
diff --git a/docs/man/nng_pipe_notify.3.adoc b/docs/man/nng_pipe_notify.3.adoc
index 34b640c8..0a8785e3 100644
--- a/docs/man/nng_pipe_notify.3.adoc
+++ b/docs/man/nng_pipe_notify.3.adoc
@@ -19,40 +19,55 @@ nng_pipe_notify - register pipe notification callback
----
#include <nng/nng.h>
-typedef enum {
- NNG_PIPE_ADD,
- NNG_PIPE_REM,
-} nng_pipe_action;
+enum {
+ NNG_PIPE_EV_ADD_PRE,
+ NNG_PIPE_EV_ADD_POST,
+ NNG_PIPE_EV_REM_POST,
+};
-typedef void (*nng_pipe_cb)(nng_pipe, nng_pipe_action, void *);
+typedef void (*nng_pipe_cb)(nng_pipe, int, void *);
-int nng_pipe_notify(nng_socket s, nng_pipe_cb cb, void *arg);
+int nng_pipe_notify(nng_socket s, int ev, nng_pipe_cb cb, void *arg);
----
== DESCRIPTION
The `nng_pipe_notify()` function registers the callback function _cb_
-to be called whenever a <<nng_pipe.5#,pipe>> is added to or removed from the
-socket _s_.
+to be called whenever a <<nng_pipe.5#,pipe>> the pipe event specified by
+_ev_ occurs on the socket _s_.
+The callback _cb_ will be passed _arg_ as its final argument.
-The function _cb_ will be called with the action `NNG_PIPE_ADD` just before
-a pipe is added to the socket (as a result of a connection being established).
-The final argument passed will be the argument _arg_ that was specified when
-the function was registered.
+A different callback may be supplied for each event.
+Each event may have at most one callback registered.
+Registering a callback implicitly unregisters any previously registered.
-The function _cb_ will also be called with the action `NNG_PIPE_REM` when
-the pipe is being removed from the socket for any reason.
-This will also use the same argument _arg_.
+The following pipe events are supported:
-NOTE: Only one callback can be registered for a given socket.
-Subsequent calls to `nng_pipe_notify()` on the same socket will overwrite
-any prior registration.
+`NNG_PIPE_EV_ADD_PRE`:: This event occurs after a connection and negotiation
+has completed, but before the pipe is added to the socket.
+If the pipe is closed (using `<<nng_pipe_close.3#,nng_pipe_close()>>`) at
+this point, the socket will never see the pipe, and no further events will
+occur for the given pipe.
-TIP: The callback _cb_ may reject a pipe for any reason by simply closing
+`NNG_PIPE_EV_ADD_POST`:: This event occurs after the pipe is fully added to
+the socket.
+Prior to this time, it is not possible to communicate over the pipe with
+the socket.
+
+`NNG_PIPE_EV_REM_POST`:: This event occurs after the pipe has been removed
+from the socket.
+The underlying transport may be closed at this point, and it is not
+possible communicate using this pipe.
+
+TIP: The callback _cb_ may close a pipe for any reason by simply closing
it using `<<nng_pipe_close.3#,nng_pipe_close()>>`.
-This might be done, for example, if the remote peer is not authorized to
-access this session, based on values determined with the aid of
-`<<nng_pipe_getopt.3#,nng_pipe_getopt()>>`.
+This might be done before the pipe is added to the socket (during
+`NNG_PIPE_EV_ADD_PRE`), for example, if the remote peer is not authorized.
+
+TIP: It is possible to register the same _cb_ and _arg_ for different events
+by calling this function separately for different values of _ev_.
+
+NOTE: This function ignores invalid values for _ev_.
== RETURN VALUES
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 321e1a09..ccd0dbe7 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -29,6 +29,7 @@ struct nni_pipe {
nni_ep * p_ep;
bool p_closed;
bool p_stop;
+ bool p_cbs;
int p_refcnt;
nni_mtx p_mtx;
nni_cv p_cv;
@@ -100,10 +101,19 @@ nni_pipe_sys_fini(void)
static void
nni_pipe_destroy(nni_pipe *p)
{
+ bool cbs;
if (p == NULL) {
return;
}
+ nni_mtx_lock(&p->p_mtx);
+ cbs = p->p_cbs;
+ nni_mtx_unlock(&p->p_mtx);
+
+ if (cbs) {
+ nni_sock_run_pipe_cb(p->p_sock, NNG_PIPE_EV_REM_POST, p->p_id);
+ }
+
// Stop any pending negotiation.
nni_aio_stop(p->p_start_aio);
@@ -119,6 +129,7 @@ nni_pipe_destroy(nni_pipe *p)
if (nni_list_node_active(&p->p_ep_node)) {
nni_ep_pipe_remove(p->p_ep, p);
}
+
if (nni_list_node_active(&p->p_sock_node)) {
nni_sock_pipe_remove(p->p_sock, p);
}
@@ -263,13 +274,32 @@ static void
nni_pipe_start_cb(void *arg)
{
nni_pipe *p = arg;
+ nni_sock *s = p->p_sock;
nni_aio * aio = p->p_start_aio;
+ uint32_t id = nni_pipe_id(p);
- if ((nni_aio_result(aio) != 0) ||
- (nni_sock_pipe_start(p->p_sock, p) != 0) ||
- (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
+ if (nni_aio_result(aio) != 0) {
nni_pipe_stop(p);
+ return;
}
+
+ nni_mtx_lock(&p->p_mtx);
+ p->p_cbs = true; // We're running all cbs going forward
+ nni_mtx_unlock(&p->p_mtx);
+
+ nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_PRE, id);
+ if (nni_pipe_closed(p)) {
+ nni_pipe_stop(p);
+ return;
+ }
+
+ if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) ||
+ nni_sock_closing(s)) {
+ nni_pipe_stop(p);
+ return;
+ }
+
+ nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_POST, id);
}
int
@@ -289,6 +319,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
}
// Make a private copy of the transport ops.
+ p->p_start_aio = NULL;
p->p_tran_ops = *tran->tran_pipe;
p->p_tran_data = tdata;
p->p_proto_ops = *pops;
@@ -297,6 +328,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
p->p_sock = sock;
p->p_closed = false;
p->p_stop = false;
+ p->p_cbs = false;
p->p_refcnt = 0;
NNI_LIST_NODE_INIT(&p->p_reap_node);
@@ -317,18 +349,13 @@ nni_pipe_create(nni_ep *ep, void *tdata)
if ((rv != 0) ||
((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0) ||
- ((rv = nni_ep_pipe_add(ep, p)) != 0)) {
+ ((rv = nni_ep_pipe_add(ep, p)) != 0) ||
+ ((rv = nni_sock_pipe_add(sock, p)) != 0)) {
nni_pipe_destroy(p);
return (rv);
}
- // At this point the endpoint knows about it, and the protocol
- // might too, so on failure we have to tear it down fully as if done
- // after a successful result.
- if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
- nni_pipe_stop(p);
- }
- return (rv);
+ return (0);
}
int
diff --git a/src/core/socket.c b/src/core/socket.c
index f14c15fd..3a209441 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -47,6 +47,11 @@ typedef struct nni_sockopt {
void * data;
} nni_sockopt;
+typedef struct nni_sock_pipe_cb {
+ nng_pipe_cb cb_fn;
+ void * cb_arg;
+} nni_sock_pipe_cb;
+
struct nni_socket {
nni_list_node s_node;
nni_mtx s_mx;
@@ -81,11 +86,12 @@ struct nni_socket {
nni_list s_pipes; // active pipes
nni_list s_ctxs; // active contexts (protected by global nni_sock_lk)
- bool s_closing; // Socket is closing
- bool s_closed; // Socket closed, protected by global lock
- bool s_ctxwait; // Waiting for contexts to close.
- nng_pipe_cb s_pipe_cb; // User callback for pipe events.
- void * s_pipe_cbarg; // Argument for pipe events.
+ bool s_closing; // Socket is closing
+ bool s_closed; // Socket closed, protected by global lock
+ bool s_ctxwait; // Waiting for contexts to close.
+
+ nni_mtx s_pipe_cbs_mtx;
+ nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM];
};
static void nni_ctx_destroy(nni_ctx *);
@@ -431,39 +437,34 @@ nni_sock_rele(nni_sock *s)
nni_mtx_unlock(&nni_sock_lk);
}
-int
-nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe)
+bool
+nni_sock_closing(nni_sock *s)
{
- nng_pipe_cb cb;
-
- NNI_ASSERT(s != NULL);
+ bool rv;
nni_mtx_lock(&s->s_mx);
- if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) {
- // Peer protocol mismatch.
- nni_mtx_unlock(&s->s_mx);
- return (NNG_EPROTO);
- }
- if ((cb = s->s_pipe_cb) != NULL) {
- nng_pipe p;
- void * arg = s->s_pipe_cbarg;
- nni_mtx_unlock(&s->s_mx);
- p.id = nni_pipe_id(pipe);
- cb(p, NNG_PIPE_ADD, arg);
- if (nni_pipe_closed(pipe)) {
- return (NNG_ECLOSED);
+ rv = s->s_closing;
+ nni_mtx_unlock(&s->s_mx);
+ return (rv);
+}
+
+void
+nni_sock_run_pipe_cb(nni_sock *s, int ev, uint32_t id)
+{
+ if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) {
+ nng_pipe_cb cb;
+ void * arg;
+
+ nni_mtx_lock(&s->s_pipe_cbs_mtx);
+ cb = s->s_pipe_cbs[ev].cb_fn;
+ arg = s->s_pipe_cbs[ev].cb_arg;
+ nni_mtx_unlock(&s->s_pipe_cbs_mtx);
+
+ if (cb != NULL) {
+ nng_pipe p;
+ p.id = id;
+ cb(p, ev, arg);
}
- nni_mtx_lock(&s->s_mx);
- }
- if (s->s_closing) {
- // We're closing, bail out. This has to be done after
- // we have dropped the lock above in case the sock is closed
- // while the user callback runs.
- nni_mtx_unlock(&s->s_mx);
- return (NNG_ECLOSED);
}
-
- nni_mtx_unlock(&s->s_mx);
- return (0);
}
int
@@ -486,26 +487,16 @@ nni_sock_pipe_add(nni_sock *s, nni_pipe *p)
}
void
-nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
+nni_sock_pipe_remove(nni_sock *s, nni_pipe *p)
{
- nng_pipe_cb cb;
-
- nni_mtx_lock(&sock->s_mx);
- if ((cb = sock->s_pipe_cb) != NULL) {
- void * arg = sock->s_pipe_cbarg;
- nng_pipe p;
- nni_mtx_unlock(&sock->s_mx);
- p.id = nni_pipe_id(pipe);
- cb(p, NNG_PIPE_REM, arg);
- nni_mtx_lock(&sock->s_mx);
- }
- if (nni_list_active(&sock->s_pipes, pipe)) {
- nni_list_remove(&sock->s_pipes, pipe);
+ nni_mtx_lock(&s->s_mx);
+ if (nni_list_active(&s->s_pipes, p)) {
+ nni_list_remove(&s->s_pipes, p);
}
- if (sock->s_closing && nni_list_empty(&sock->s_pipes)) {
- nni_cv_wake(&sock->s_cv);
+ if (s->s_closing && nni_list_empty(&s->s_pipes)) {
+ nni_cv_wake(&s->s_cv);
}
- nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_unlock(&s->s_mx);
}
static void
@@ -532,6 +523,7 @@ nni_sock_destroy(nni_sock *s)
nni_cv_fini(&s->s_close_cv);
nni_cv_fini(&s->s_cv);
nni_mtx_fini(&s->s_mx);
+ nni_mtx_fini(&s->s_pipe_cbs_mtx);
NNI_FREE_STRUCT(s);
}
@@ -574,6 +566,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
nni_pipe_sock_list_init(&s->s_pipes);
nni_ep_list_init(&s->s_eps);
nni_mtx_init(&s->s_mx);
+ nni_mtx_init(&s->s_pipe_cbs_mtx);
nni_cv_init(&s->s_cv, &s->s_mx);
nni_cv_init(&s->s_close_cv, &nni_sock_lk);
@@ -668,10 +661,10 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
return (rv);
}
-// nni_sock_shutdown shuts down the socket; after this point no further
-// access to the socket will function, and any threads blocked in entry
-// points will be woken (and the functions they are blocked in will return
-// NNG_ECLOSED.)
+// nni_sock_shutdown shuts down the socket; after this point no
+// further access to the socket will function, and any threads blocked
+// in entry points will be woken (and the functions they are blocked
+// in will return NNG_ECLOSED.)
int
nni_sock_shutdown(nni_sock *sock)
{
@@ -689,8 +682,8 @@ nni_sock_shutdown(nni_sock *sock)
// Mark us closing, so no more EPs or changes can occur.
sock->s_closing = true;
- // Close the EPs. This prevents new connections from forming but
- // but allows existing ones to drain.
+ // Close the EPs. This prevents new connections from forming
+ // but but allows existing ones to drain.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
nni_ep_shutdown(ep);
}
@@ -728,9 +721,9 @@ nni_sock_shutdown(nni_sock *sock)
nni_mtx_lock(&sock->s_mx);
- // At this point, we've done everything we politely can to give
- // the protocol a chance to flush its write side. Now its time
- // to be a little more insistent.
+ // At this point, we've done everything we politely can to
+ // give the protocol a chance to flush its write side. Now
+ // its time to be a little more insistent.
// Close the upper queues immediately. This can happen
// safely while we hold the lock.
@@ -756,7 +749,8 @@ nni_sock_shutdown(nni_sock *sock)
nni_pipe_stop(pipe);
}
- // We have to wait for *both* endpoints and pipes to be removed.
+ // We have to wait for *both* endpoints and pipes to be
+ // removed.
while ((!nni_list_empty(&sock->s_pipes)) ||
(!nni_list_empty(&sock->s_eps))) {
nni_cv_wait(&sock->s_cv);
@@ -777,9 +771,9 @@ nni_sock_shutdown(nni_sock *sock)
}
// nni_sock_close shuts down the socket, then releases any resources
-// associated with it. It is a programmer error to reference the socket
-// after this function is called, as the pointer may reference invalid
-// memory or other objects.
+// associated with it. It is a programmer error to reference the
+// socket after this function is called, as the pointer may reference
+// invalid memory or other objects.
void
nni_sock_close(nni_sock *s)
{
@@ -789,8 +783,8 @@ nni_sock_close(nni_sock *s)
nni_mtx_lock(&nni_sock_lk);
if (s->s_closed) {
- // Some other thread called close. All we need to do is
- // drop our reference count.
+ // Some other thread called close. All we need to do
+ // is drop our reference count.
nni_mtx_unlock(&nni_sock_lk);
nni_sock_rele(s);
return;
@@ -835,8 +829,8 @@ nni_sock_closeall(void)
nni_mtx_unlock(&nni_sock_lk);
return;
}
- // Bump the reference count. The close call below will
- // drop it.
+ // Bump the reference count. The close call below
+ // will drop it.
s->s_refcnt++;
nni_list_node_remove(&s->s_node);
nni_mtx_unlock(&nni_sock_lk);
@@ -962,7 +956,8 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t)
}
// Protocol options. The protocol can override options that
- // the socket framework would otherwise supply, like buffer sizes.
+ // the socket framework would otherwise supply, like buffer
+ // sizes.
for (pso = s->s_sock_ops.sock_options; pso->pso_name != NULL; pso++) {
if (strcmp(pso->pso_name, name) != 0) {
continue;
@@ -976,7 +971,8 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t)
return (rv);
}
- // Some options do not go down to transports. Handle them directly.
+ // Some options do not go down to transports. Handle them
+ // directly.
for (sso = nni_sock_options; sso->so_name != NULL; sso++) {
if (strcmp(sso->so_name, name) != 0) {
continue;
@@ -997,17 +993,20 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t)
return (rv);
}
- // Validation of transport options. This is stateless, so transports
- // should not fail to set an option later if they passed it here.
+ // Validation of transport options. This is stateless, so
+ // transports should not fail to set an option later if they
+ // passed it here.
rv = nni_tran_chkopt(name, v, sz, t);
- // Also check a few generic things. We do this if no transport
- // was found, or even if a transport rejected one of the settings.
+ // Also check a few generic things. We do this if no
+ // transport was found, or even if a transport rejected one of
+ // the settings.
if ((rv == NNG_ENOTSUP) || (rv == 0)) {
if (strcmp(name, NNG_OPT_RECVMAXSZ) == 0) {
size_t z;
- // just a sanity test on the size; it also ensures that
- // a size can be set even with no transport configured.
+ // just a sanity test on the size; it also
+ // ensures that a size can be set even with no
+ // transport configured.
rv = nni_copyin_size(&z, v, sz, 0, NNI_MAXSZ, t);
}
}
@@ -1049,9 +1048,10 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t)
}
}
- // Apply the options. Failure to set any option on any transport
- // (other than ENOTSUP) stops the operation altogether. Its
- // important that transport wide checks properly pre-validate.
+ // Apply the options. Failure to set any option on any
+ // transport (other than ENOTSUP) stops the operation
+ // altogether. Its important that transport wide checks
+ // properly pre-validate.
NNI_LIST_FOREACH (&s->s_eps, ep) {
int x;
if (optv->typ == NNI_TYPE_OPAQUE) {
@@ -1075,15 +1075,16 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t)
}
if (rv == 0) {
- // Remove and toss the old value, we are using a new one.
+ // Remove and toss the old value, we are using a new
+ // one.
if (oldv != NULL) {
nni_list_remove(&s->s_options, oldv);
nni_free_opt(oldv);
}
- // Insert our new value. This permits it to be compared
- // against later, and for new endpoints to automatically
- // receive these values,
+ // Insert our new value. This permits it to be
+ // compared against later, and for new endpoints to
+ // automatically receive these values,
nni_list_append(&s->s_options, optv);
} else {
nni_free_opt(optv);
@@ -1108,7 +1109,8 @@ nni_sock_getopt(nni_sock *s, const char *name, void *val, size_t *szp, int t)
}
// Protocol specific options. The protocol can override
- // options like the send buffer or notification descriptors this way.
+ // options like the send buffer or notification descriptors
+ // this way.
for (pso = s->s_sock_ops.sock_options; pso->pso_name != NULL; pso++) {
if (strcmp(name, pso->pso_name) != 0) {
continue;
@@ -1166,12 +1168,14 @@ nni_sock_flags(nni_sock *sock)
}
void
-nni_sock_set_pipe_cb(nni_sock *sock, nng_pipe_cb cb, void *arg)
+nni_sock_set_pipe_cb(nni_sock *s, int ev, nng_pipe_cb cb, void *arg)
{
- nni_mtx_lock(&sock->s_mx);
- sock->s_pipe_cb = cb;
- sock->s_pipe_cbarg = arg;
- nni_mtx_unlock(&sock->s_mx);
+ if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) {
+ nni_mtx_lock(&s->s_pipe_cbs_mtx);
+ s->s_pipe_cbs[ev].cb_fn = cb;
+ s->s_pipe_cbs[ev].cb_arg = arg;
+ nni_mtx_unlock(&s->s_pipe_cbs_mtx);
+ }
}
int
@@ -1185,11 +1189,12 @@ nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing)
}
nni_mtx_lock(&nni_sock_lk);
if ((rv = nni_idhash_find(nni_ctx_hash, id, (void **) &ctx)) == 0) {
- // We refuse a reference if either the socket is closed,
- // or the context is closed. (If the socket is closed,
- // and we are only getting the reference so we can close it,
- // then we still allow. In the case the only valid operation
- // will be to close the socket.)
+ // We refuse a reference if either the socket is
+ // closed, or the context is closed. (If the socket
+ // is closed, and we are only getting the reference so
+ // we can close it, then we still allow. In the case
+ // the only valid operation will be to close the
+ // socket.)
if (ctx->c_closed || ((!closing) && ctx->c_sock->s_closed)) {
rv = NNG_ECLOSED;
} else {
@@ -1224,8 +1229,8 @@ nni_ctx_rele(nni_ctx *ctx)
nni_mtx_lock(&nni_sock_lk);
ctx->c_refcnt--;
if ((ctx->c_refcnt > 0) || (!ctx->c_closed)) {
- // Either still have an active reference, or not actually
- // closing yet.
+ // Either still have an active reference, or not
+ // actually closing yet.
nni_mtx_unlock(&nni_sock_lk);
return;
}
@@ -1288,8 +1293,8 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock)
nni_mtx_unlock(&nni_sock_lk);
// Paranoia, fixing a possible race in close. Don't let us
- // give back a context if the socket is being shutdown (it might
- // not have reached the "closed" state yet.)
+ // give back a context if the socket is being shutdown (it
+ // might not have reached the "closed" state yet.)
nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
nni_mtx_unlock(&sock->s_mx);
diff --git a/src/core/socket.h b/src/core/socket.h
index 22a13ef7..833129fa 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -44,7 +44,6 @@ extern uint32_t nni_sock_id(nni_sock *);
// a pipe could wind up orphaned.
extern int nni_sock_pipe_add(nni_sock *, nni_pipe *);
extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *);
-extern int nni_sock_pipe_start(nni_sock *, nni_pipe *);
extern int nni_sock_ep_add(nni_sock *, nni_ep *);
extern void nni_sock_ep_remove(nni_sock *, nni_ep *);
@@ -71,8 +70,13 @@ extern uint32_t nni_sock_flags(nni_sock *);
// one of the only cases (the only?) where the socket core understands
// the public data types. (Other solutions might exist, but they require
// keeping extra state to support conversion between public and internal
-// types.)
-extern void nni_sock_set_pipe_cb(nni_sock *sock, nng_pipe_cb, void *);
+// types.) The second argument is a mask of events for which the callback
+// should be executed.
+extern void nni_sock_set_pipe_cb(nni_sock *sock, int, nng_pipe_cb, void *);
+
+extern void nni_sock_run_pipe_cb(nni_sock *sock, int, uint32_t);
+
+extern bool nni_sock_closing(nni_sock *sock);
// nni_ctx_open is used to open/create a new context structure.
// Contexts are not supported by most protocols, but for those that do,
diff --git a/src/nng.c b/src/nng.c
index 9e010978..a417fbee 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -1015,7 +1015,7 @@ nng_getopt_string(nng_socket s, const char *name, char **valp)
}
int
-nng_pipe_notify(nng_socket s, nng_pipe_cb cb, void *arg)
+nng_pipe_notify(nng_socket s, int ev, nng_pipe_cb cb, void *arg)
{
int rv;
nni_sock *sock;
@@ -1027,7 +1027,7 @@ nng_pipe_notify(nng_socket s, nng_pipe_cb cb, void *arg)
return (rv);
}
- nni_sock_set_pipe_cb(sock, cb, arg);
+ nni_sock_set_pipe_cb(sock, ev, cb, arg);
nni_sock_rele(sock);
return (0);
}
diff --git a/src/nng.h b/src/nng.h
index 0c62dcb9..19823f0d 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -223,12 +223,18 @@ NNG_DECL int nng_getopt_ptr(nng_socket, const char *, void **);
// Only one callback can be set on a given socket, and there is no way
// to retrieve the old value.
typedef enum {
- NNG_PIPE_ADD, // Pipe added to socket
- NNG_PIPE_REM // Pipe removed from socket
-} nng_pipe_action;
-
-typedef void (*nng_pipe_cb)(nng_pipe, nng_pipe_action, void *);
-NNG_DECL int nng_pipe_notify(nng_socket, nng_pipe_cb, void *);
+ NNG_PIPE_EV_ADD_PRE, // Called just before pipe added to socket
+ NNG_PIPE_EV_ADD_POST, // Called just after pipe added to socket
+ NNG_PIPE_EV_REM_POST, // Called just after poipe removed from socket
+ NNG_PIPE_EV_NUM, // Used internally, must be last.
+} nng_pipe_ev;
+
+typedef void (*nng_pipe_cb)(nng_pipe, int, void *);
+
+// nng_pipe_notify registers a callback to be executed when the
+// given event is triggered. To watch for different events, register
+// multiple times. Each event can have at most one callback registered.
+NNG_DECL int nng_pipe_notify(nng_socket, int, nng_pipe_cb, void *);
// nng_getopt_string is special -- it allocates a string to hold the
// resulting string, which should be freed with nng_strfree when it is
diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c
index 852de7c8..2427e836 100644
--- a/src/protocol/bus0/bus.c
+++ b/src/protocol/bus0/bus.c
@@ -193,6 +193,11 @@ bus0_pipe_start(void *arg)
bus0_pipe *p = arg;
bus0_sock *s = p->psock;
+ if (nni_pipe_peer(p->npipe) != NNI_PROTO_BUS_V0) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+
nni_mtx_lock(&s->mtx);
nni_list_append(&s->pipes, p);
nni_mtx_unlock(&s->mtx);
diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c
index 2fb42df5..87900793 100644
--- a/src/protocol/pair0/pair.c
+++ b/src/protocol/pair0/pair.c
@@ -129,6 +129,11 @@ pair0_pipe_start(void *arg)
pair0_pipe *p = arg;
pair0_sock *s = p->psock;
+ if (nni_pipe_peer(p->npipe) != NNI_PROTO_PAIR_V0) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+
nni_mtx_lock(&s->mtx);
if (s->ppipe != NULL) {
nni_mtx_unlock(&s->mtx);
diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c
index ab01e451..584c147d 100644
--- a/src/protocol/pair1/pair.c
+++ b/src/protocol/pair1/pair.c
@@ -173,6 +173,11 @@ pair1_pipe_start(void *arg)
uint32_t id;
int rv;
+ if (nni_pipe_peer(p->npipe) != NNI_PROTO_PAIR_V1) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+
id = nni_pipe_id(p->npipe);
nni_mtx_lock(&s->mtx);
if ((rv = nni_idhash_insert(s->pipes, id, p)) != 0) {
diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c
index 81f6c137..f7d5d9a2 100644
--- a/src/protocol/pipeline0/pull.c
+++ b/src/protocol/pipeline0/pull.c
@@ -112,6 +112,11 @@ pull0_pipe_start(void *arg)
{
pull0_pipe *p = arg;
+ if (nni_pipe_peer(p->pipe) != NNI_PROTO_PUSH_V0) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+
// Start the pending pull...
nni_pipe_recv(p->pipe, p->recv_aio);
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index c244b0ad..b4fa2e2a 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -143,6 +143,11 @@ sub0_pipe_start(void *arg)
{
sub0_pipe *p = arg;
+ if (nni_pipe_peer(p->pipe) != NNI_PROTO_PUB_V0) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+
nni_pipe_recv(p->pipe, p->aio_recv);
return (0);
}
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index c483b777..24fc7335 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -346,6 +346,11 @@ rep0_pipe_start(void *arg)
rep0_sock *s = p->rep;
int rv;
+ if (nni_pipe_peer(p->pipe) != NNI_PROTO_REQ_V0) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+
if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) {
return (rv);
}
diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c
index 6dcfe6be..f6cd29eb 100644
--- a/src/protocol/reqrep0/xrep.c
+++ b/src/protocol/reqrep0/xrep.c
@@ -178,6 +178,11 @@ xrep0_pipe_start(void *arg)
xrep0_sock *s = p->rep;
int rv;
+ if (nni_pipe_peer(p->pipe) != NNI_PROTO_REQ_V0) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+
if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) {
return (rv);
}
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c
index 7738a8b7..5f833ca6 100644
--- a/src/protocol/survey0/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -341,6 +341,10 @@ resp0_pipe_start(void *arg)
resp0_sock *s = p->psock;
int rv;
+ if (nni_pipe_peer(p->npipe) != NNI_PROTO_SURVEYOR_V0) {
+ return (NNG_EPROTO);
+ }
+
nni_mtx_lock(&s->mtx);
rv = nni_idhash_insert(s->pipes, p->id, p);
nni_mtx_unlock(&s->mtx);
diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c
index 51bce0c8..24a35003 100644
--- a/src/protocol/survey0/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -338,6 +338,10 @@ surv0_pipe_start(void *arg)
surv0_pipe *p = arg;
surv0_sock *s = p->sock;
+ if (nni_pipe_peer(p->npipe) != NNI_PROTO_RESPONDENT_V0) {
+ return (NNG_EPROTO);
+ }
+
nni_mtx_lock(&s->mtx);
nni_list_append(&s->pipes, p);
nni_mtx_unlock(&s->mtx);
diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c
index bcbbcbc7..13a3d759 100644
--- a/src/protocol/survey0/xrespond.c
+++ b/src/protocol/survey0/xrespond.c
@@ -164,6 +164,10 @@ xresp0_pipe_start(void *arg)
xresp0_sock *s = p->psock;
int rv;
+ if (nni_pipe_peer(p->npipe) != NNI_PROTO_SURVEYOR_V0) {
+ return (NNG_EPROTO);
+ }
+
p->id = nni_pipe_id(p->npipe);
nni_mtx_lock(&s->mtx);
diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c
index 47ebef3c..db21e688 100644
--- a/src/protocol/survey0/xsurvey.c
+++ b/src/protocol/survey0/xsurvey.c
@@ -166,6 +166,10 @@ xsurv0_pipe_start(void *arg)
xsurv0_pipe *p = arg;
xsurv0_sock *s = p->psock;
+ if (nni_pipe_peer(p->npipe) != NNI_PROTO_RESPONDENT_V0) {
+ return (NNG_EPROTO);
+ }
+
nni_mtx_lock(&s->mtx);
nni_list_append(&s->pipes, p);
nni_mtx_unlock(&s->mtx);
diff --git a/tests/pipe.c b/tests/pipe.c
index 3457db9b..79006fc8 100644
--- a/tests/pipe.c
+++ b/tests/pipe.c
@@ -27,27 +27,48 @@ struct testcase {
nng_dialer d;
nng_listener l;
nng_pipe p;
- int add;
+ int add_pre;
+ int add_post;
int rem;
int err;
+ int rej;
+ nng_mtx * lk;
};
+static int
+getval(struct testcase *t, int *vp)
+{
+ int rv;
+ nng_mtx_lock(t->lk);
+ rv = *vp;
+ nng_mtx_unlock(t->lk);
+ return (rv);
+}
+
void
-notify(nng_pipe p, nng_pipe_action act, void *arg)
+notify(nng_pipe p, int act, void *arg)
{
struct testcase *t = arg;
+ nng_mtx_lock(t->lk);
if ((nng_socket_id(nng_pipe_socket(p)) != nng_socket_id(t->s)) ||
(nng_listener_id(nng_pipe_listener(p)) != nng_listener_id(t->l)) ||
(nng_dialer_id(nng_pipe_dialer(p)) != nng_dialer_id(t->d))) {
t->err++;
+ nng_mtx_unlock(t->lk);
return;
}
+ if (t->add_post > t->add_pre) {
+ t->err++;
+ }
switch (act) {
- case NNG_PIPE_ADD:
- t->add++;
+ case NNG_PIPE_EV_ADD_PRE:
+ t->add_pre++;
+ break;
+ case NNG_PIPE_EV_ADD_POST:
+ t->add_post++;
break;
- case NNG_PIPE_REM:
+ case NNG_PIPE_EV_REM_POST:
t->rem++;
break;
default:
@@ -55,6 +76,21 @@ notify(nng_pipe p, nng_pipe_action act, void *arg)
return;
}
t->p = p;
+ nng_mtx_unlock(t->lk);
+}
+
+void
+reject(nng_pipe p, int act, void *arg)
+{
+ struct testcase *t = arg;
+ notify(p, act, arg);
+
+ nng_mtx_lock(t->lk);
+ if (!t->rej) {
+ nng_pipe_close(p);
+ t->rej++;
+ }
+ nng_mtx_unlock(t->lk);
}
char addr[64];
@@ -70,16 +106,30 @@ TestMain("Pipe notify works", {
memset(&pull, 0, sizeof(pull));
memset(&push, 0, sizeof(push));
+ So(nng_mtx_alloc(&push.lk) == 0);
+ So(nng_mtx_alloc(&pull.lk) == 0);
So(nng_push_open(&push.s) == 0);
So(nng_pull_open(&pull.s) == 0);
Reset({
nng_close(push.s);
nng_close(pull.s);
+ nng_mtx_free(push.lk);
+ nng_mtx_free(pull.lk);
});
- So(nng_pipe_notify(push.s, notify, &push) == 0);
- So(nng_pipe_notify(pull.s, notify, &pull) == 0);
+ So(nng_pipe_notify(
+ push.s, NNG_PIPE_EV_ADD_PRE, notify, &push) == 0);
+ So(nng_pipe_notify(
+ push.s, NNG_PIPE_EV_ADD_POST, notify, &push) == 0);
+ So(nng_pipe_notify(
+ push.s, NNG_PIPE_EV_REM_POST, notify, &push) == 0);
+ So(nng_pipe_notify(
+ pull.s, NNG_PIPE_EV_ADD_PRE, notify, &pull) == 0);
+ So(nng_pipe_notify(
+ pull.s, NNG_PIPE_EV_ADD_POST, notify, &pull) == 0);
+ So(nng_pipe_notify(
+ pull.s, NNG_PIPE_EV_REM_POST, notify, &pull) == 0);
So(nng_setopt_ms(push.s, NNG_OPT_RECONNMINT, 10) == 0);
So(nng_setopt_ms(push.s, NNG_OPT_RECONNMAXT, 10) == 0);
@@ -92,12 +142,14 @@ TestMain("Pipe notify works", {
So(nng_listener_start(pull.l, 0) == 0);
So(nng_dialer_start(push.d, 0) == 0);
nng_msleep(100);
- So(pull.add == 1);
- So(pull.rem == 0);
- So(pull.err == 0);
- So(push.add == 1);
- So(push.rem == 0);
- So(push.err == 0);
+ So(getval(&pull, &pull.add_pre) == 1);
+ So(getval(&pull, &pull.add_post) == 1);
+ So(getval(&pull, &pull.rem) == 0);
+ So(getval(&pull, &pull.err) == 0);
+ So(getval(&push, &push.add_pre) == 1);
+ So(getval(&push, &push.add_post) == 1);
+ So(getval(&push, &push.rem) == 0);
+ So(getval(&push, &push.err) == 0);
Convey("We can send a frame", {
nng_msg *msg;
@@ -114,17 +166,20 @@ TestMain("Pipe notify works", {
});
Convey("Reconnection works", {
- So(pull.add == 1);
+ So(getval(&pull, &pull.add_pre) == 1);
+ So(getval(&pull, &pull.add_post) == 1);
nng_pipe_close(pull.p);
nng_msleep(200);
- So(pull.err == 0);
- So(pull.rem == 1);
- So(pull.add == 2);
+ So(getval(&pull, &pull.err) == 0);
+ So(getval(&pull, &pull.rem) == 1);
+ So(getval(&pull, &pull.add_pre) == 2);
+ So(getval(&pull, &pull.add_post) == 2);
- So(push.err == 0);
- So(push.rem == 1);
- So(push.add == 2);
+ So(getval(&push, &push.err) == 0);
+ So(getval(&push, &push.rem) == 1);
+ So(getval(&push, &push.add_pre) == 2);
+ So(getval(&push, &push.add_post) == 2);
Convey("They still exchange frames", {
nng_msg *msg;
@@ -145,5 +200,25 @@ TestMain("Pipe notify works", {
});
});
});
+
+ Convey("Reject works", {
+ So(nng_pipe_notify(pull.s, NNG_PIPE_EV_ADD_PRE, reject,
+ &pull) == 0);
+ So(nng_listener_create(&pull.l, pull.s, addr) == 0);
+ So(nng_dialer_create(&push.d, push.s, addr) == 0);
+ So(nng_listener_id(pull.l) > 0);
+ So(nng_dialer_id(push.d) > 0);
+ So(nng_listener_start(pull.l, 0) == 0);
+ So(nng_dialer_start(push.d, 0) == 0);
+ nng_msleep(100);
+ So(getval(&pull, &pull.add_pre) == 2);
+ So(getval(&pull, &pull.add_post) == 1);
+ So(getval(&pull, &pull.rem) == 1);
+ So(getval(&pull, &pull.err) == 0);
+ So(getval(&push, &push.add_pre) == 2);
+ So(getval(&push, &push.add_post) == 2);
+ So(getval(&push, &push.rem) == 1);
+ So(getval(&push, &push.err) == 0);
+ });
});
})