aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-05-02 16:01:22 -0700
committerGarrett D'Amore <garrett@damore.org>2018-05-03 06:41:19 -0700
commit159dc95695515db767e208c41d5f44b334b71adc (patch)
tree5390251d939c33b1d274d0767861d2012673e258
parent649e5d5ca9a03cef766e944c7735aab50041c012 (diff)
downloadnng-159dc95695515db767e208c41d5f44b334b71adc.tar.gz
nng-159dc95695515db767e208c41d5f44b334b71adc.tar.bz2
nng-159dc95695515db767e208c41d5f44b334b71adc.zip
fixes #389 Need pipe notification callbacks
This adds a new pipe event notification API (callbacks called on either pipe add or remove), including both tests and docs. Also supporting APIs to get the socket or endpoint associated with a pipe are included (tested and documented as well.)
-rw-r--r--docs/man/libnng.3.adoc4
-rw-r--r--docs/man/nng_pipe.5.adoc3
-rw-r--r--docs/man/nng_pipe_dialer.3.adoc46
-rw-r--r--docs/man/nng_pipe_listener.3.adoc46
-rw-r--r--docs/man/nng_pipe_notify.3.adoc71
-rw-r--r--docs/man/nng_pipe_socket.3.adoc45
-rw-r--r--src/core/pipe.c79
-rw-r--r--src/core/pipe.h14
-rw-r--r--src/core/socket.c113
-rw-r--r--src/core/socket.h9
-rw-r--r--src/nng.c59
-rw-r--r--src/nng.h17
-rw-r--r--tests/CMakeLists.txt1
-rw-r--r--tests/pipe.c148
14 files changed, 587 insertions, 68 deletions
diff --git a/docs/man/libnng.3.adoc b/docs/man/libnng.3.adoc
index 61ceacc7..1164b613 100644
--- a/docs/man/libnng.3.adoc
+++ b/docs/man/libnng.3.adoc
@@ -73,8 +73,12 @@ Listeners accept incoming connection requets, and dialers make them.
|<<nng_listener_setopt.3#,nng_listener_setopt()>>|set listener option
|<<nng_listener_start.3#,nng_listener_start()>>|start listener
|<<nng_pipe_close.3#,nng_pipe_close()>>|close pipe
+|<<nng_pipe_dialer.3#,nng_pipe_dialer()>>|return dialer that created pipe
|<<nng_pipe_getopt.3#,nng_pipe_getopt()>>|get pipe option
|<<nng_pipe_id.3#,nng_pipe_id()>>|get numeric pipe identifier
+|<<nng_pipe_listener.3#,nng_pipe_listener()>>|return listener that created pipe
+|<<nng_pipe_notify.3#,nng_pipe_notify())>>|register pipe notification callback
+|<<nng_pipe_socket.3#,nng_pipe_socket())>>|return owning socket for pipe
|===
=== Message Handling Functions
diff --git a/docs/man/nng_pipe.5.adoc b/docs/man/nng_pipe.5.adoc
index a777c0d0..cf11c047 100644
--- a/docs/man/nng_pipe.5.adoc
+++ b/docs/man/nng_pipe.5.adoc
@@ -69,7 +69,10 @@ nng_pipe p = NNG_PIPE_INITIALIZER;
<<nng_msg_get_pipe.3#,nng_msg_get_pipe(3)>>,
<<nng_pipe_close.3#,nng_pipe_close(3)>>,
<<nng_pipe_getopt.3#,nng_pipe_getopt(3)>>,
+<<nng_pipe_dialer.3#,nng_pipe_dialer(3)>>,
<<nng_pipe_id.3#,nng_pipe_id(3)>>,
+<<nng_pipe_listener.3#,nng_pipe_listener(3)>>,
+<<nng_pipe_socket.3#,nng_pipe_socket(3)>>,
<<nng_dialer.5#,nng_dialer(5)>>,
<<nng_listener.5#,nng_listener(5)>>,
<<nng_options.5#,nng_options(5)>>,
diff --git a/docs/man/nng_pipe_dialer.3.adoc b/docs/man/nng_pipe_dialer.3.adoc
new file mode 100644
index 00000000..476ab1be
--- /dev/null
+++ b/docs/man/nng_pipe_dialer.3.adoc
@@ -0,0 +1,46 @@
+= nng_pipe_dialer(3)
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This document is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+== NAME
+
+nng_pipe_dialer - return dialer that created pipe
+
+== SYNOPSIS
+
+[source, c]
+----
+#include <nng/nng.h>
+
+nng_dialer nng_pipe_dialer(nng_pipe p);
+----
+
+== DESCRIPTION
+
+The `nng_pipe_dialer()` function returns the `<<nng_dialer.5#,nng_dialer>>`
+that created the pipe _p_.
+If the pipe was not created by a dialer, then the returned value will
+have an identifier (`<<nng_dialer_id.3#,nng_dialer_id()>>`) of `-1`.
+
+== RETURN VALUES
+
+This function returns the dialer that created the pipe, unless it was
+not created by a dialer, in which case a value initialized with
+`NNG_DIALER_INITIALIZER` will be returned.
+
+== ERRORS
+
+None.
+
+== SEE ALSO
+
+<<nng_pipe.5#,nng_pipe(5)>>,
+<<nng_dialer.5#,nng_dialer(5)>>,
+<<nng.7#,nng(7)>>
diff --git a/docs/man/nng_pipe_listener.3.adoc b/docs/man/nng_pipe_listener.3.adoc
new file mode 100644
index 00000000..3b7db4d6
--- /dev/null
+++ b/docs/man/nng_pipe_listener.3.adoc
@@ -0,0 +1,46 @@
+= nng_pipe_listener(3)
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This document is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+== NAME
+
+nng_pipe_listener - return listener that created pipe
+
+== SYNOPSIS
+
+[source, c]
+----
+#include <nng/nng.h>
+
+nng_listener nng_pipe_listener(nng_pipe p);
+----
+
+== DESCRIPTION
+
+The `nng_pipe_listener()` function returns the `<<nng_listener.5#,nng_lisener>>`
+that created the pipe _p_.
+If the pipe was not created by a listener, then the returned value will
+have an identifier (`<<nng_listener_id.3#,nng_listener_id()>>`) of `-1`.
+
+== RETURN VALUES
+
+This function returns the listener that created the pipe, unless it was
+not created by a listener, in which case a value initialized with
+`NNG_LISTENER_INITIALIZER` will be returned.
+
+== ERRORS
+
+None.
+
+== SEE ALSO
+
+<<nng_pipe.5#,nng_pipe(5)>>,
+<<nng_listener.5#,nng_listener(5)>>,
+<<nng.7#,nng(7)>>
diff --git a/docs/man/nng_pipe_notify.3.adoc b/docs/man/nng_pipe_notify.3.adoc
new file mode 100644
index 00000000..34b640c8
--- /dev/null
+++ b/docs/man/nng_pipe_notify.3.adoc
@@ -0,0 +1,71 @@
+= nng_pipe_notify(3)
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This document is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+== NAME
+
+nng_pipe_notify - register pipe notification callback
+
+== SYNOPSIS
+
+[source, c]
+----
+#include <nng/nng.h>
+
+typedef enum {
+ NNG_PIPE_ADD,
+ NNG_PIPE_REM,
+} nng_pipe_action;
+
+typedef void (*nng_pipe_cb)(nng_pipe, nng_pipe_action, void *);
+
+int nng_pipe_notify(nng_socket s, nng_pipe_cb cb, void *arg);
+----
+
+== DESCRIPTION
+
+The `nng_pipe_notify()` function registers the callback function _cb_
+to be called whenever a <<nng_pipe.5#,pipe>> is added to or removed from the
+socket _s_.
+
+The function _cb_ will be called with the action `NNG_PIPE_ADD` just before
+a pipe is added to the socket (as a result of a connection being established).
+The final argument passed will be the argument _arg_ that was specified when
+the function was registered.
+
+The function _cb_ will also be called with the action `NNG_PIPE_REM` when
+the pipe is being removed from the socket for any reason.
+This will also use the same argument _arg_.
+
+NOTE: Only one callback can be registered for a given socket.
+Subsequent calls to `nng_pipe_notify()` on the same socket will overwrite
+any prior registration.
+
+TIP: The callback _cb_ may reject a pipe for any reason by simply closing
+it using `<<nng_pipe_close.3#,nng_pipe_close()>>`.
+This might be done, for example, if the remote peer is not authorized to
+access this session, based on values determined with the aid of
+`<<nng_pipe_getopt.3#,nng_pipe_getopt()>>`.
+
+== RETURN VALUES
+
+This function returns 0 on success, and non-zero otherwise.
+
+== ERRORS
+
+`NNG_ECLOSED`:: The socket _s_ does not refer to an open socket.
+
+== SEE ALSO
+
+<<nng_pipe_close.3#,nng_pipe_close(3)>>,
+<<nng_pipe_getopt.3#,nng_pipe_getopt(3)>>,
+<<nng_pipe.5#,nng_pipe(5)>>,
+<<nng_socket.5#,nng_socket(5)>>,
+<<nng.7#,nng(7)>>
diff --git a/docs/man/nng_pipe_socket.3.adoc b/docs/man/nng_pipe_socket.3.adoc
new file mode 100644
index 00000000..f2d02aa3
--- /dev/null
+++ b/docs/man/nng_pipe_socket.3.adoc
@@ -0,0 +1,45 @@
+= nng_pipe_socket(3)
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This document is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+== NAME
+
+nng_pipe_socket - return owning socket for pipe
+
+== SYNOPSIS
+
+[source, c]
+----
+#include <nng/nng.h>
+
+nng_socket nng_pipe_socket(nng_pipe p);
+----
+
+== DESCRIPTION
+
+The `nng_pipe_socket()` function returns the `<<nng_socket.5#,nng_socket>>`
+that owns the pipe _p_.
+
+NOTE: The returned socket may be closed or in the process of closing, in
+which case it will not be usable with other functions.
+
+== RETURN VALUES
+
+This function returns the socket that "`owns`" the pipe.
+
+== ERRORS
+
+None.
+
+== SEE ALSO
+
+<<nng_pipe.5#,nng_pipe(5)>>,
+<<nng_socket.5#,nng_socket(5)>>,
+<<nng.7#,nng(7)>>
diff --git a/src/core/pipe.c b/src/core/pipe.c
index f849e00e..010d306d 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -18,7 +18,7 @@
// performed in the context of the protocol.
struct nni_pipe {
- uint64_t p_id;
+ uint32_t p_id;
nni_tran_pipe p_tran_ops;
void * p_tran_data;
void * p_proto_data;
@@ -26,7 +26,7 @@ struct nni_pipe {
nni_list_node p_ep_node;
nni_sock * p_sock;
nni_ep * p_ep;
- bool p_reap;
+ bool p_closed;
bool p_stop;
int p_refcnt;
nni_mtx p_mtx;
@@ -106,6 +106,15 @@ nni_pipe_destroy(nni_pipe *p)
// Stop any pending negotiation.
nni_aio_stop(p->p_start_aio);
+ // We have exclusive access at this point, so we can check if
+ // we are still on any lists.
+ if (nni_list_node_active(&p->p_ep_node)) {
+ nni_ep_pipe_remove(p->p_ep, p);
+ }
+ if (nni_list_node_active(&p->p_sock_node)) {
+ nni_sock_pipe_remove(p->p_sock, p);
+ }
+
// Make sure any unlocked holders are done with this.
// This happens during initialization for example.
nni_mtx_lock(&nni_pipe_lk);
@@ -117,16 +126,6 @@ nni_pipe_destroy(nni_pipe *p)
}
nni_mtx_unlock(&nni_pipe_lk);
- // We have exclusive access at this point, so we can check if
- // we are still on any lists.
-
- if (nni_list_node_active(&p->p_ep_node)) {
- nni_ep_pipe_remove(p->p_ep, p);
- }
- if (nni_list_node_active(&p->p_sock_node)) {
- nni_sock_pipe_remove(p->p_sock, p);
- }
-
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_fini(p->p_tran_data);
}
@@ -141,13 +140,14 @@ nni_pipe_find(nni_pipe **pp, uint32_t id)
int rv;
nni_pipe *p;
nni_mtx_lock(&nni_pipe_lk);
+
+ // We don't care if the pipe is "closed". End users only have
+ // access to the pipe in order to obtain properties (which may
+ // be retried during the post-close notification callback) or to
+ // close the pipe.
if ((rv = nni_idhash_find(nni_pipes, id, (void **) &p)) == 0) {
- if (p->p_reap) {
- rv = NNG_ECLOSED;
- } else {
- p->p_refcnt++;
- *pp = p;
- }
+ p->p_refcnt++;
+ *pp = p;
}
nni_mtx_unlock(&nni_pipe_lk);
return (rv);
@@ -168,7 +168,7 @@ nni_pipe_rele(nni_pipe *p)
uint32_t
nni_pipe_id(nni_pipe *p)
{
- return ((uint32_t) p->p_id);
+ return (p->p_id);
}
void
@@ -184,18 +184,18 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio)
}
// nni_pipe_close closes the underlying connection. It is expected that
-// subsequent attempts receive or send (including any waiting receive) will
+// subsequent attempts to receive or send (including any waiting receive) will
// simply return NNG_ECLOSED.
void
nni_pipe_close(nni_pipe *p)
{
nni_mtx_lock(&p->p_mtx);
- if (p->p_reap) {
+ if (p->p_closed) {
// We already did a close.
nni_mtx_unlock(&p->p_mtx);
return;
}
- p->p_reap = true;
+ p->p_closed = true;
// Close the underlying transport.
if (p->p_tran_data != NULL) {
@@ -208,6 +208,16 @@ nni_pipe_close(nni_pipe *p)
nni_aio_abort(p->p_start_aio, NNG_ECLOSED);
}
+bool
+nni_pipe_closed(nni_pipe *p)
+{
+ bool rv;
+ nni_mtx_lock(&p->p_mtx);
+ rv = p->p_closed;
+ nni_mtx_unlock(&p->p_mtx);
+ return (rv);
+}
+
void
nni_pipe_stop(nni_pipe *p)
{
@@ -270,7 +280,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
p->p_proto_data = NULL;
p->p_ep = ep;
p->p_sock = sock;
- p->p_reap = false;
+ p->p_closed = false;
p->p_stop = false;
p->p_refcnt = 0;
@@ -281,8 +291,11 @@ nni_pipe_create(nni_ep *ep, void *tdata)
nni_mtx_init(&p->p_mtx);
nni_cv_init(&p->p_cv, &nni_pipe_lk);
if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) {
+ uint64_t id;
nni_mtx_lock(&nni_pipe_lk);
- rv = nni_idhash_alloc(nni_pipes, &p->p_id, p);
+ if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) {
+ p->p_id = (uint32_t) id;
+ }
nni_mtx_unlock(&nni_pipe_lk);
}
@@ -343,6 +356,24 @@ nni_pipe_ep_list_init(nni_list *list)
NNI_LIST_INIT(list, nni_pipe, p_ep_node);
}
+uint32_t
+nni_pipe_sock_id(nni_pipe *p)
+{
+ return (nni_sock_id(p->p_sock));
+}
+
+uint32_t
+nni_pipe_ep_id(nni_pipe *p)
+{
+ return (nni_ep_id(p->p_ep));
+}
+
+int
+nni_pipe_ep_mode(nni_pipe *p)
+{
+ return (nni_ep_mode(p->p_ep));
+}
+
static void
nni_pipe_reaper(void *notused)
{
diff --git a/src/core/pipe.h b/src/core/pipe.h
index ea0c16db..18a59ddb 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -79,6 +79,20 @@ extern void nni_pipe_ep_list_init(nni_list *);
// pipe, which must be released by the caller when it is done.
extern int nni_pipe_find(nni_pipe **, uint32_t);
+// nni_pipe_sock_id returns the socket id for the pipe (used by public API).
+extern uint32_t nni_pipe_sock_id(nni_pipe *);
+
+// nni_pipe_ep_id returns the endpoint id for the pipe.
+extern uint32_t nni_pipe_ep_id(nni_pipe *);
+
+// nni_pipe_ep_mode returns the endpoint mode for the pipe.
+extern int nni_pipe_ep_mode(nni_pipe *);
+
+// nni_pipe_closed returns true if nni_pipe_close was called.
+// (This is used by the socket to determine if user closed the pipe
+// during callback.)
+extern bool nni_pipe_closed(nni_pipe *);
+
// nni_pipe_rele releases the hold on the pipe placed by nni_pipe_find.
extern void nni_pipe_rele(nni_pipe *);
diff --git a/src/core/socket.c b/src/core/socket.c
index f5acd941..66580300 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -81,12 +81,11 @@ struct nni_socket {
nni_list s_pipes; // active pipes
nni_list s_ctxs; // active contexts (protected by global nni_sock_lk)
- int s_closing; // Socket is closing
- int s_closed; // Socket closed, protected by global lock
- bool s_ctxwait; // Waiting for contexts to close.
-
- nni_notifyfd s_send_fd;
- nni_notifyfd s_recv_fd;
+ bool s_closing; // Socket is closing
+ bool s_closed; // Socket closed, protected by global lock
+ bool s_ctxwait; // Waiting for contexts to close.
+ nng_pipe_cb s_pipe_cb; // User callback for pipe events.
+ void * s_pipe_cbarg; // Argument for pipe events.
};
static void nni_ctx_destroy(nni_ctx *);
@@ -435,21 +434,41 @@ nni_sock_rele(nni_sock *s)
int
nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe)
{
- void *pdata = nni_pipe_get_proto_data(pipe);
- int rv;
+ void * pdata = nni_pipe_get_proto_data(pipe);
+ nng_pipe_cb cb;
+ int rv;
NNI_ASSERT(s != NULL);
nni_mtx_lock(&s->s_mx);
- if (s->s_closing) {
- // We're closing, bail out.
- rv = NNG_ECLOSED;
- } else if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) {
+ if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) {
// Peer protocol mismatch.
- rv = NNG_EPROTO;
- } else {
- // Protocol can reject for other reasons.
- rv = s->s_pipe_ops.pipe_start(pdata);
+ nni_mtx_unlock(&s->s_mx);
+ return (NNG_EPROTO);
+ }
+ if ((cb = s->s_pipe_cb) != NULL) {
+ nng_pipe p;
+ void * arg = s->s_pipe_cbarg;
+ nni_mtx_unlock(&s->s_mx);
+ p.id = nni_pipe_id(pipe);
+ cb(p, NNG_PIPE_ADD, arg);
+ if (nni_pipe_closed(pipe)) {
+ return (NNG_ECLOSED);
+ }
+ nni_mtx_lock(&s->s_mx);
}
+ if (s->s_closing) {
+ // We're closing, bail out. This has to be done after
+ // we have dropped the lock above in case the sock is closed
+ // while the user callback runs.
+ nni_mtx_unlock(&s->s_mx);
+ return (NNG_ECLOSED);
+ }
+
+ // Protocol can reject for other reasons.
+ // This must be the last operation, until this point
+ // the protocol has not actually "seen" the pipe.
+ rv = s->s_pipe_ops.pipe_start(pdata);
+
nni_mtx_unlock(&s->s_mx);
return (rv);
}
@@ -484,9 +503,18 @@ nni_sock_pipe_add(nni_sock *s, nni_pipe *p)
void
nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
{
- void *pdata;
+ void * pdata;
+ nng_pipe_cb cb;
nni_mtx_lock(&sock->s_mx);
+ if ((cb = sock->s_pipe_cb) != NULL) {
+ void * arg = sock->s_pipe_cbarg;
+ nng_pipe p;
+ nni_mtx_unlock(&sock->s_mx);
+ p.id = nni_pipe_id(pipe);
+ cb(p, NNG_PIPE_REM, arg);
+ nni_mtx_lock(&sock->s_mx);
+ }
pdata = nni_pipe_get_proto_data(pipe);
if (pdata != NULL) {
sock->s_pipe_ops.pipe_stop(pdata);
@@ -507,14 +535,6 @@ nni_sock_destroy(nni_sock *s)
{
nni_sockopt *sopt;
- // Close any open notification pipes.
- if (s->s_recv_fd.sn_init) {
- nni_plat_pipe_close(s->s_recv_fd.sn_wfd, s->s_recv_fd.sn_rfd);
- }
- if (s->s_send_fd.sn_init) {
- nni_plat_pipe_close(s->s_send_fd.sn_wfd, s->s_send_fd.sn_rfd);
- }
-
// The protocol needs to clean up its state.
if (s->s_data != NULL) {
s->s_sock_ops.sock_fini(s->s_data);
@@ -546,21 +566,21 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- s->s_sndtimeo = -1;
- s->s_rcvtimeo = -1;
- s->s_closing = 0;
- s->s_reconn = NNI_SECOND;
- s->s_reconnmax = 0;
- s->s_rcvmaxsz = 1024 * 1024; // 1 MB by default
- s->s_id = 0;
- s->s_refcnt = 0;
- s->s_send_fd.sn_init = 0;
- s->s_recv_fd.sn_init = 0;
- s->s_self_id = proto->proto_self;
- s->s_peer_id = proto->proto_peer;
- s->s_flags = proto->proto_flags;
- s->s_sock_ops = *proto->proto_sock_ops;
- s->s_pipe_ops = *proto->proto_pipe_ops;
+ s->s_sndtimeo = -1;
+ s->s_rcvtimeo = -1;
+ s->s_closing = 0;
+ s->s_reconn = NNI_SECOND;
+ s->s_reconnmax = 0;
+ s->s_rcvmaxsz = 1024 * 1024; // 1 MB by default
+ s->s_id = 0;
+ s->s_refcnt = 0;
+ s->s_self_id = proto->proto_self;
+ s->s_peer_id = proto->proto_peer;
+ s->s_flags = proto->proto_flags;
+ s->s_sock_ops = *proto->proto_sock_ops;
+ s->s_pipe_ops = *proto->proto_pipe_ops;
+ s->s_closed = false;
+ s->s_closing = false;
if (proto->proto_ctx_ops != NULL) {
s->s_ctx_ops = *proto->proto_ctx_ops;
@@ -692,7 +712,7 @@ nni_sock_shutdown(nni_sock *sock)
return (NNG_ECLOSED);
}
// Mark us closing, so no more EPs or changes can occur.
- sock->s_closing = 1;
+ sock->s_closing = true;
// Close the EPs. This prevents new connections from forming but
// but allows existing ones to drain.
@@ -800,7 +820,7 @@ nni_sock_close(nni_sock *s)
nni_sock_rele(s);
return;
}
- s->s_closed = 1;
+ s->s_closed = true;
nni_idhash_remove(nni_sock_hash, s->s_id);
// We might have been removed from the list already, e.g. by
@@ -1157,6 +1177,15 @@ nni_sock_flags(nni_sock *sock)
return (sock->s_flags);
}
+void
+nni_sock_set_pipe_cb(nni_sock *sock, nng_pipe_cb cb, void *arg)
+{
+ nni_mtx_lock(&sock->s_mx);
+ sock->s_pipe_cb = cb;
+ sock->s_pipe_cbarg = arg;
+ nni_mtx_unlock(&sock->s_mx);
+}
+
int
nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing)
{
diff --git a/src/core/socket.h b/src/core/socket.h
index 87bf1374..bccda5b3 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -41,7 +41,7 @@ extern uint32_t nni_sock_id(nni_sock *);
// a pipe could wind up orphaned.
extern int nni_sock_pipe_add(nni_sock *, nni_pipe *);
extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *);
-extern int nni_sock_pipe_start(nni_sock *, nni_pipe *p);
+extern int nni_sock_pipe_start(nni_sock *, nni_pipe *);
extern int nni_sock_ep_add(nni_sock *, nni_ep *);
extern void nni_sock_ep_remove(nni_sock *, nni_ep *);
@@ -64,6 +64,13 @@ extern void nni_sock_reconntimes(nni_sock *, nni_duration *, nni_duration *);
// and or write are appropriate for the protocol.
extern uint32_t nni_sock_flags(nni_sock *);
+// This function is used by the public API to set callbacks. It is
+// one of the only cases (the only?) where the socket core understands
+// the public data types. (Other solutions might exist, but they require
+// keeping extra state to support conversion between public and internal
+// types.)
+extern void nni_sock_set_pipe_cb(nni_sock *sock, nng_pipe_cb, void *);
+
// nni_ctx_open is used to open/create a new context structure.
// Contexts are not supported by most protocols, but for those that do,
// this can offer some improvements for massive concurrency/scalability.
diff --git a/src/nng.c b/src/nng.c
index 4dc6c3fa..9e010978 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -1015,6 +1015,24 @@ nng_getopt_string(nng_socket s, const char *name, char **valp)
}
int
+nng_pipe_notify(nng_socket s, nng_pipe_cb cb, void *arg)
+{
+ int rv;
+ nni_sock *sock;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_sock_find(&sock, s.id)) != 0) {
+ return (rv);
+ }
+
+ nni_sock_set_pipe_cb(sock, cb, arg);
+ nni_sock_rele(sock);
+ return (0);
+}
+
+int
nng_device(nng_socket s1, nng_socket s2)
{
int rv;
@@ -1190,6 +1208,47 @@ nng_pipe_getopt_string(nng_pipe p, const char *name, char **valp)
return (nng_pipe_getx(p, name, valp, &sz, NNI_TYPE_STRING));
}
+nng_socket
+nng_pipe_socket(nng_pipe p)
+{
+ nng_socket s = NNG_SOCKET_INITIALIZER;
+ nni_pipe * pipe;
+
+ if ((nni_init() == 0) && (nni_pipe_find(&pipe, p.id) == 0)) {
+ s.id = nni_pipe_sock_id(pipe);
+ nni_pipe_rele(pipe);
+ }
+ return (s);
+}
+
+nng_dialer
+nng_pipe_dialer(nng_pipe p)
+{
+ nng_dialer d = NNG_DIALER_INITIALIZER;
+ nni_pipe * pipe;
+ if ((nni_init() == 0) && (nni_pipe_find(&pipe, p.id) == 0)) {
+ if (nni_pipe_ep_mode(pipe) == NNI_EP_MODE_DIAL) {
+ d.id = nni_pipe_ep_id(pipe);
+ }
+ nni_pipe_rele(pipe);
+ }
+ return (d);
+}
+
+nng_listener
+nng_pipe_listener(nng_pipe p)
+{
+ nng_listener l = NNG_LISTENER_INITIALIZER;
+ nni_pipe * pipe;
+ if ((nni_init() == 0) && (nni_pipe_find(&pipe, p.id) == 0)) {
+ if (nni_pipe_ep_mode(pipe) == NNI_EP_MODE_LISTEN) {
+ l.id = nni_pipe_ep_id(pipe);
+ }
+ nni_pipe_rele(pipe);
+ }
+ return (l);
+}
+
int
nng_pipe_close(nng_pipe p)
{
diff --git a/src/nng.h b/src/nng.h
index c3ae48ac..ac4f178b 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -217,6 +217,18 @@ NNG_DECL int nng_getopt_size(nng_socket, const char *, size_t *);
NNG_DECL int nng_getopt_uint64(nng_socket, const char *, uint64_t *);
NNG_DECL int nng_getopt_ptr(nng_socket, const char *, void **);
+// Arguably the pipe callback functions could be handled as an option,
+// but with the need to specify an argument, we find it best to unify
+// this as a separate function to pass in the argument and the callback.
+// Only one callback can be set on a given socket, and there is no way
+// to retrieve the old value.
+typedef enum {
+ NNG_PIPE_ADD,
+ NNG_PIPE_REM,
+} nng_pipe_action;
+typedef void (*nng_pipe_cb)(nng_pipe, nng_pipe_action, void *);
+NNG_DECL int nng_pipe_notify(nng_socket, nng_pipe_cb, void *);
+
// nng_getopt_string is special -- it allocates a string to hold the
// resulting string, which should be freed with nng_strfree when it is
// no logner needed.
@@ -600,6 +612,9 @@ NNG_DECL int nng_pipe_getopt_ptr(nng_pipe, const char *, void **);
NNG_DECL int nng_pipe_getopt_string(nng_pipe, const char *, char **);
NNG_DECL int nng_pipe_close(nng_pipe);
NNG_DECL int nng_pipe_id(nng_pipe);
+NNG_DECL nng_socket nng_pipe_socket(nng_pipe);
+NNG_DECL nng_dialer nng_pipe_dialer(nng_pipe);
+NNG_DECL nng_listener nng_pipe_listener(nng_pipe);
// Flags.
enum nng_flag_enum {
@@ -696,7 +711,7 @@ enum nng_flag_enum {
// state current). This is a boolean.
#define NNG_OPT_TCP_KEEPALIVE "tcp-keepalive"
-// XXX: TBD: priorities, ipv4only, TCP options
+// XXX: TBD: priorities, ipv4only
// Statistics. These are for informational purposes only, and subject
// to change without notice. The API for accessing these is stable,
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 3cfc89fa..0e3a033f 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -143,6 +143,7 @@ add_nng_test(message 5 ON)
add_nng_test(multistress 60 ON)
add_nng_test(nonblock 60 ON)
add_nng_test(options 5 ON)
+add_nng_test(pipe 5 ON)
add_nng_test(platform 5 ON)
add_nng_test(pollfd 5 ON)
add_nng_test(reconnect 5 ON)
diff --git a/tests/pipe.c b/tests/pipe.c
new file mode 100644
index 00000000..d5481c10
--- /dev/null
+++ b/tests/pipe.c
@@ -0,0 +1,148 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "convey.h"
+#include "nng.h"
+#include "protocol/pipeline0/pull.h"
+#include "protocol/pipeline0/push.h"
+#include "supplemental/util/platform.h"
+
+#include "stubs.h"
+#include <string.h>
+
+#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
+#define CHECKSTR(m, s) \
+ So(nng_msg_len(m) == strlen(s)); \
+ So(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
+
+struct testcase {
+ nng_socket s;
+ nng_dialer d;
+ nng_listener l;
+ nng_pipe p;
+ int add;
+ int rem;
+ int err;
+};
+
+void
+notify(nng_pipe p, nng_pipe_action act, void *arg)
+{
+ struct testcase *t = arg;
+
+ if ((nng_socket_id(nng_pipe_socket(p)) != nng_socket_id(t->s)) ||
+ (nng_listener_id(nng_pipe_listener(p)) != nng_listener_id(t->l)) ||
+ (nng_dialer_id(nng_pipe_dialer(p)) != nng_dialer_id(t->d))) {
+ t->err++;
+ return;
+ }
+ switch (act) {
+ case NNG_PIPE_ADD:
+ t->add++;
+ break;
+ case NNG_PIPE_REM:
+ t->rem++;
+ break;
+ default:
+ t->err++;
+ return;
+ }
+ t->p = p;
+}
+
+char addr[64];
+static int cnt;
+
+TestMain("Pipe notify works", {
+ atexit(nng_fini);
+
+ Convey("We can create a pipeline", {
+ struct testcase push;
+ struct testcase pull;
+ sprintf(addr, "inproc://test%d", cnt++);
+
+ memset(&pull, 0, sizeof(pull));
+ memset(&push, 0, sizeof(push));
+ So(nng_push_open(&push.s) == 0);
+ So(nng_pull_open(&pull.s) == 0);
+
+ Reset({
+ nng_close(push.s);
+ nng_close(pull.s);
+ });
+
+ So(nng_pipe_notify(push.s, notify, &push) == 0);
+ So(nng_pipe_notify(pull.s, notify, &pull) == 0);
+
+ So(nng_setopt_ms(push.s, NNG_OPT_RECONNMINT, 10) == 0);
+ So(nng_setopt_ms(push.s, NNG_OPT_RECONNMAXT, 10) == 0);
+
+ Convey("Dialing works", {
+ So(nng_listen(pull.s, addr, &pull.l, 0) == 0);
+ So(nng_listener_id(pull.l) > 0);
+ So(nng_dial(push.s, addr, &push.d, 0) == 0);
+ So(nng_dialer_id(push.d) > 0);
+ nng_msleep(200);
+ So(pull.add == 1);
+ So(pull.rem == 0);
+ So(pull.err == 0);
+ So(push.add == 1);
+ So(push.rem == 0);
+ So(push.err == 0);
+ Convey("We can send a frame", {
+ nng_msg *msg;
+
+ nng_msleep(200);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ APPENDSTR(msg, "hello");
+ So(nng_sendmsg(push.s, msg, 0) == 0);
+ msg = NULL;
+ So(nng_recvmsg(pull.s, &msg, 0) == 0);
+ So(msg != NULL);
+ CHECKSTR(msg, "hello");
+ So(nng_pipe_id(nng_msg_get_pipe(msg)) ==
+ nng_pipe_id(pull.p));
+ nng_msg_free(msg);
+ });
+
+ Convey("Reconnection works", {
+ So(pull.add == 1);
+ nng_pipe_close(pull.p);
+ nng_msleep(200);
+
+ So(pull.err == 0);
+ So(pull.rem == 1);
+ So(pull.add == 2);
+
+ So(push.err == 0);
+ So(push.rem == 1);
+ So(push.add == 2);
+
+ Convey("They still exchange frames", {
+ nng_msg *msg;
+ nng_pipe p1;
+
+ nng_msleep(200);
+ So(nng_msg_alloc(&msg, 0) == 0);
+ APPENDSTR(msg, "hello");
+ So(nng_sendmsg(push.s, msg, 0) == 0);
+ msg = NULL;
+ So(nng_recvmsg(pull.s, &msg, 0) == 0);
+ So(msg != NULL);
+ CHECKSTR(msg, "hello");
+ p1 = nng_msg_get_pipe(msg);
+ nng_msg_free(msg);
+ So(nng_pipe_id(p1) ==
+ nng_pipe_id(pull.p));
+ });
+ });
+ });
+ });
+});