diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-05-02 16:01:22 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-05-03 06:41:19 -0700 |
| commit | 159dc95695515db767e208c41d5f44b334b71adc (patch) | |
| tree | 5390251d939c33b1d274d0767861d2012673e258 | |
| parent | 649e5d5ca9a03cef766e944c7735aab50041c012 (diff) | |
| download | nng-159dc95695515db767e208c41d5f44b334b71adc.tar.gz nng-159dc95695515db767e208c41d5f44b334b71adc.tar.bz2 nng-159dc95695515db767e208c41d5f44b334b71adc.zip | |
fixes #389 Need pipe notification callbacks
This adds a new pipe event notification API (callbacks called
on either pipe add or remove), including both tests and docs.
Also supporting APIs to get the socket or endpoint associated
with a pipe are included (tested and documented as well.)
| -rw-r--r-- | docs/man/libnng.3.adoc | 4 | ||||
| -rw-r--r-- | docs/man/nng_pipe.5.adoc | 3 | ||||
| -rw-r--r-- | docs/man/nng_pipe_dialer.3.adoc | 46 | ||||
| -rw-r--r-- | docs/man/nng_pipe_listener.3.adoc | 46 | ||||
| -rw-r--r-- | docs/man/nng_pipe_notify.3.adoc | 71 | ||||
| -rw-r--r-- | docs/man/nng_pipe_socket.3.adoc | 45 | ||||
| -rw-r--r-- | src/core/pipe.c | 79 | ||||
| -rw-r--r-- | src/core/pipe.h | 14 | ||||
| -rw-r--r-- | src/core/socket.c | 113 | ||||
| -rw-r--r-- | src/core/socket.h | 9 | ||||
| -rw-r--r-- | src/nng.c | 59 | ||||
| -rw-r--r-- | src/nng.h | 17 | ||||
| -rw-r--r-- | tests/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | tests/pipe.c | 148 |
14 files changed, 587 insertions, 68 deletions
diff --git a/docs/man/libnng.3.adoc b/docs/man/libnng.3.adoc index 61ceacc7..1164b613 100644 --- a/docs/man/libnng.3.adoc +++ b/docs/man/libnng.3.adoc @@ -73,8 +73,12 @@ Listeners accept incoming connection requets, and dialers make them. |<<nng_listener_setopt.3#,nng_listener_setopt()>>|set listener option |<<nng_listener_start.3#,nng_listener_start()>>|start listener |<<nng_pipe_close.3#,nng_pipe_close()>>|close pipe +|<<nng_pipe_dialer.3#,nng_pipe_dialer()>>|return dialer that created pipe |<<nng_pipe_getopt.3#,nng_pipe_getopt()>>|get pipe option |<<nng_pipe_id.3#,nng_pipe_id()>>|get numeric pipe identifier +|<<nng_pipe_listener.3#,nng_pipe_listener()>>|return listener that created pipe +|<<nng_pipe_notify.3#,nng_pipe_notify())>>|register pipe notification callback +|<<nng_pipe_socket.3#,nng_pipe_socket())>>|return owning socket for pipe |=== === Message Handling Functions diff --git a/docs/man/nng_pipe.5.adoc b/docs/man/nng_pipe.5.adoc index a777c0d0..cf11c047 100644 --- a/docs/man/nng_pipe.5.adoc +++ b/docs/man/nng_pipe.5.adoc @@ -69,7 +69,10 @@ nng_pipe p = NNG_PIPE_INITIALIZER; <<nng_msg_get_pipe.3#,nng_msg_get_pipe(3)>>, <<nng_pipe_close.3#,nng_pipe_close(3)>>, <<nng_pipe_getopt.3#,nng_pipe_getopt(3)>>, +<<nng_pipe_dialer.3#,nng_pipe_dialer(3)>>, <<nng_pipe_id.3#,nng_pipe_id(3)>>, +<<nng_pipe_listener.3#,nng_pipe_listener(3)>>, +<<nng_pipe_socket.3#,nng_pipe_socket(3)>>, <<nng_dialer.5#,nng_dialer(5)>>, <<nng_listener.5#,nng_listener(5)>>, <<nng_options.5#,nng_options(5)>>, diff --git a/docs/man/nng_pipe_dialer.3.adoc b/docs/man/nng_pipe_dialer.3.adoc new file mode 100644 index 00000000..476ab1be --- /dev/null +++ b/docs/man/nng_pipe_dialer.3.adoc @@ -0,0 +1,46 @@ += nng_pipe_dialer(3) +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This document is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +== NAME + +nng_pipe_dialer - return dialer that created pipe + +== SYNOPSIS + +[source, c] +---- +#include <nng/nng.h> + +nng_dialer nng_pipe_dialer(nng_pipe p); +---- + +== DESCRIPTION + +The `nng_pipe_dialer()` function returns the `<<nng_dialer.5#,nng_dialer>>` +that created the pipe _p_. +If the pipe was not created by a dialer, then the returned value will +have an identifier (`<<nng_dialer_id.3#,nng_dialer_id()>>`) of `-1`. + +== RETURN VALUES + +This function returns the dialer that created the pipe, unless it was +not created by a dialer, in which case a value initialized with +`NNG_DIALER_INITIALIZER` will be returned. + +== ERRORS + +None. + +== SEE ALSO + +<<nng_pipe.5#,nng_pipe(5)>>, +<<nng_dialer.5#,nng_dialer(5)>>, +<<nng.7#,nng(7)>> diff --git a/docs/man/nng_pipe_listener.3.adoc b/docs/man/nng_pipe_listener.3.adoc new file mode 100644 index 00000000..3b7db4d6 --- /dev/null +++ b/docs/man/nng_pipe_listener.3.adoc @@ -0,0 +1,46 @@ += nng_pipe_listener(3) +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This document is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +== NAME + +nng_pipe_listener - return listener that created pipe + +== SYNOPSIS + +[source, c] +---- +#include <nng/nng.h> + +nng_listener nng_pipe_listener(nng_pipe p); +---- + +== DESCRIPTION + +The `nng_pipe_listener()` function returns the `<<nng_listener.5#,nng_lisener>>` +that created the pipe _p_. +If the pipe was not created by a listener, then the returned value will +have an identifier (`<<nng_listener_id.3#,nng_listener_id()>>`) of `-1`. + +== RETURN VALUES + +This function returns the listener that created the pipe, unless it was +not created by a listener, in which case a value initialized with +`NNG_LISTENER_INITIALIZER` will be returned. + +== ERRORS + +None. + +== SEE ALSO + +<<nng_pipe.5#,nng_pipe(5)>>, +<<nng_listener.5#,nng_listener(5)>>, +<<nng.7#,nng(7)>> diff --git a/docs/man/nng_pipe_notify.3.adoc b/docs/man/nng_pipe_notify.3.adoc new file mode 100644 index 00000000..34b640c8 --- /dev/null +++ b/docs/man/nng_pipe_notify.3.adoc @@ -0,0 +1,71 @@ += nng_pipe_notify(3) +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This document is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +== NAME + +nng_pipe_notify - register pipe notification callback + +== SYNOPSIS + +[source, c] +---- +#include <nng/nng.h> + +typedef enum { + NNG_PIPE_ADD, + NNG_PIPE_REM, +} nng_pipe_action; + +typedef void (*nng_pipe_cb)(nng_pipe, nng_pipe_action, void *); + +int nng_pipe_notify(nng_socket s, nng_pipe_cb cb, void *arg); +---- + +== DESCRIPTION + +The `nng_pipe_notify()` function registers the callback function _cb_ +to be called whenever a <<nng_pipe.5#,pipe>> is added to or removed from the +socket _s_. + +The function _cb_ will be called with the action `NNG_PIPE_ADD` just before +a pipe is added to the socket (as a result of a connection being established). +The final argument passed will be the argument _arg_ that was specified when +the function was registered. + +The function _cb_ will also be called with the action `NNG_PIPE_REM` when +the pipe is being removed from the socket for any reason. +This will also use the same argument _arg_. + +NOTE: Only one callback can be registered for a given socket. +Subsequent calls to `nng_pipe_notify()` on the same socket will overwrite +any prior registration. + +TIP: The callback _cb_ may reject a pipe for any reason by simply closing +it using `<<nng_pipe_close.3#,nng_pipe_close()>>`. +This might be done, for example, if the remote peer is not authorized to +access this session, based on values determined with the aid of +`<<nng_pipe_getopt.3#,nng_pipe_getopt()>>`. + +== RETURN VALUES + +This function returns 0 on success, and non-zero otherwise. + +== ERRORS + +`NNG_ECLOSED`:: The socket _s_ does not refer to an open socket. + +== SEE ALSO + +<<nng_pipe_close.3#,nng_pipe_close(3)>>, +<<nng_pipe_getopt.3#,nng_pipe_getopt(3)>>, +<<nng_pipe.5#,nng_pipe(5)>>, +<<nng_socket.5#,nng_socket(5)>>, +<<nng.7#,nng(7)>> diff --git a/docs/man/nng_pipe_socket.3.adoc b/docs/man/nng_pipe_socket.3.adoc new file mode 100644 index 00000000..f2d02aa3 --- /dev/null +++ b/docs/man/nng_pipe_socket.3.adoc @@ -0,0 +1,45 @@ += nng_pipe_socket(3) +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This document is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +== NAME + +nng_pipe_socket - return owning socket for pipe + +== SYNOPSIS + +[source, c] +---- +#include <nng/nng.h> + +nng_socket nng_pipe_socket(nng_pipe p); +---- + +== DESCRIPTION + +The `nng_pipe_socket()` function returns the `<<nng_socket.5#,nng_socket>>` +that owns the pipe _p_. + +NOTE: The returned socket may be closed or in the process of closing, in +which case it will not be usable with other functions. + +== RETURN VALUES + +This function returns the socket that "`owns`" the pipe. + +== ERRORS + +None. + +== SEE ALSO + +<<nng_pipe.5#,nng_pipe(5)>>, +<<nng_socket.5#,nng_socket(5)>>, +<<nng.7#,nng(7)>> diff --git a/src/core/pipe.c b/src/core/pipe.c index f849e00e..010d306d 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -18,7 +18,7 @@ // performed in the context of the protocol. struct nni_pipe { - uint64_t p_id; + uint32_t p_id; nni_tran_pipe p_tran_ops; void * p_tran_data; void * p_proto_data; @@ -26,7 +26,7 @@ struct nni_pipe { nni_list_node p_ep_node; nni_sock * p_sock; nni_ep * p_ep; - bool p_reap; + bool p_closed; bool p_stop; int p_refcnt; nni_mtx p_mtx; @@ -106,6 +106,15 @@ nni_pipe_destroy(nni_pipe *p) // Stop any pending negotiation. nni_aio_stop(p->p_start_aio); + // We have exclusive access at this point, so we can check if + // we are still on any lists. + if (nni_list_node_active(&p->p_ep_node)) { + nni_ep_pipe_remove(p->p_ep, p); + } + if (nni_list_node_active(&p->p_sock_node)) { + nni_sock_pipe_remove(p->p_sock, p); + } + // Make sure any unlocked holders are done with this. // This happens during initialization for example. nni_mtx_lock(&nni_pipe_lk); @@ -117,16 +126,6 @@ nni_pipe_destroy(nni_pipe *p) } nni_mtx_unlock(&nni_pipe_lk); - // We have exclusive access at this point, so we can check if - // we are still on any lists. - - if (nni_list_node_active(&p->p_ep_node)) { - nni_ep_pipe_remove(p->p_ep, p); - } - if (nni_list_node_active(&p->p_sock_node)) { - nni_sock_pipe_remove(p->p_sock, p); - } - if (p->p_tran_data != NULL) { p->p_tran_ops.p_fini(p->p_tran_data); } @@ -141,13 +140,14 @@ nni_pipe_find(nni_pipe **pp, uint32_t id) int rv; nni_pipe *p; nni_mtx_lock(&nni_pipe_lk); + + // We don't care if the pipe is "closed". End users only have + // access to the pipe in order to obtain properties (which may + // be retried during the post-close notification callback) or to + // close the pipe. if ((rv = nni_idhash_find(nni_pipes, id, (void **) &p)) == 0) { - if (p->p_reap) { - rv = NNG_ECLOSED; - } else { - p->p_refcnt++; - *pp = p; - } + p->p_refcnt++; + *pp = p; } nni_mtx_unlock(&nni_pipe_lk); return (rv); @@ -168,7 +168,7 @@ nni_pipe_rele(nni_pipe *p) uint32_t nni_pipe_id(nni_pipe *p) { - return ((uint32_t) p->p_id); + return (p->p_id); } void @@ -184,18 +184,18 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio) } // nni_pipe_close closes the underlying connection. It is expected that -// subsequent attempts receive or send (including any waiting receive) will +// subsequent attempts to receive or send (including any waiting receive) will // simply return NNG_ECLOSED. void nni_pipe_close(nni_pipe *p) { nni_mtx_lock(&p->p_mtx); - if (p->p_reap) { + if (p->p_closed) { // We already did a close. nni_mtx_unlock(&p->p_mtx); return; } - p->p_reap = true; + p->p_closed = true; // Close the underlying transport. if (p->p_tran_data != NULL) { @@ -208,6 +208,16 @@ nni_pipe_close(nni_pipe *p) nni_aio_abort(p->p_start_aio, NNG_ECLOSED); } +bool +nni_pipe_closed(nni_pipe *p) +{ + bool rv; + nni_mtx_lock(&p->p_mtx); + rv = p->p_closed; + nni_mtx_unlock(&p->p_mtx); + return (rv); +} + void nni_pipe_stop(nni_pipe *p) { @@ -270,7 +280,7 @@ nni_pipe_create(nni_ep *ep, void *tdata) p->p_proto_data = NULL; p->p_ep = ep; p->p_sock = sock; - p->p_reap = false; + p->p_closed = false; p->p_stop = false; p->p_refcnt = 0; @@ -281,8 +291,11 @@ nni_pipe_create(nni_ep *ep, void *tdata) nni_mtx_init(&p->p_mtx); nni_cv_init(&p->p_cv, &nni_pipe_lk); if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) { + uint64_t id; nni_mtx_lock(&nni_pipe_lk); - rv = nni_idhash_alloc(nni_pipes, &p->p_id, p); + if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) { + p->p_id = (uint32_t) id; + } nni_mtx_unlock(&nni_pipe_lk); } @@ -343,6 +356,24 @@ nni_pipe_ep_list_init(nni_list *list) NNI_LIST_INIT(list, nni_pipe, p_ep_node); } +uint32_t +nni_pipe_sock_id(nni_pipe *p) +{ + return (nni_sock_id(p->p_sock)); +} + +uint32_t +nni_pipe_ep_id(nni_pipe *p) +{ + return (nni_ep_id(p->p_ep)); +} + +int +nni_pipe_ep_mode(nni_pipe *p) +{ + return (nni_ep_mode(p->p_ep)); +} + static void nni_pipe_reaper(void *notused) { diff --git a/src/core/pipe.h b/src/core/pipe.h index ea0c16db..18a59ddb 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -79,6 +79,20 @@ extern void nni_pipe_ep_list_init(nni_list *); // pipe, which must be released by the caller when it is done. extern int nni_pipe_find(nni_pipe **, uint32_t); +// nni_pipe_sock_id returns the socket id for the pipe (used by public API). +extern uint32_t nni_pipe_sock_id(nni_pipe *); + +// nni_pipe_ep_id returns the endpoint id for the pipe. +extern uint32_t nni_pipe_ep_id(nni_pipe *); + +// nni_pipe_ep_mode returns the endpoint mode for the pipe. +extern int nni_pipe_ep_mode(nni_pipe *); + +// nni_pipe_closed returns true if nni_pipe_close was called. +// (This is used by the socket to determine if user closed the pipe +// during callback.) +extern bool nni_pipe_closed(nni_pipe *); + // nni_pipe_rele releases the hold on the pipe placed by nni_pipe_find. extern void nni_pipe_rele(nni_pipe *); diff --git a/src/core/socket.c b/src/core/socket.c index f5acd941..66580300 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -81,12 +81,11 @@ struct nni_socket { nni_list s_pipes; // active pipes nni_list s_ctxs; // active contexts (protected by global nni_sock_lk) - int s_closing; // Socket is closing - int s_closed; // Socket closed, protected by global lock - bool s_ctxwait; // Waiting for contexts to close. - - nni_notifyfd s_send_fd; - nni_notifyfd s_recv_fd; + bool s_closing; // Socket is closing + bool s_closed; // Socket closed, protected by global lock + bool s_ctxwait; // Waiting for contexts to close. + nng_pipe_cb s_pipe_cb; // User callback for pipe events. + void * s_pipe_cbarg; // Argument for pipe events. }; static void nni_ctx_destroy(nni_ctx *); @@ -435,21 +434,41 @@ nni_sock_rele(nni_sock *s) int nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe) { - void *pdata = nni_pipe_get_proto_data(pipe); - int rv; + void * pdata = nni_pipe_get_proto_data(pipe); + nng_pipe_cb cb; + int rv; NNI_ASSERT(s != NULL); nni_mtx_lock(&s->s_mx); - if (s->s_closing) { - // We're closing, bail out. - rv = NNG_ECLOSED; - } else if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) { + if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) { // Peer protocol mismatch. - rv = NNG_EPROTO; - } else { - // Protocol can reject for other reasons. - rv = s->s_pipe_ops.pipe_start(pdata); + nni_mtx_unlock(&s->s_mx); + return (NNG_EPROTO); + } + if ((cb = s->s_pipe_cb) != NULL) { + nng_pipe p; + void * arg = s->s_pipe_cbarg; + nni_mtx_unlock(&s->s_mx); + p.id = nni_pipe_id(pipe); + cb(p, NNG_PIPE_ADD, arg); + if (nni_pipe_closed(pipe)) { + return (NNG_ECLOSED); + } + nni_mtx_lock(&s->s_mx); } + if (s->s_closing) { + // We're closing, bail out. This has to be done after + // we have dropped the lock above in case the sock is closed + // while the user callback runs. + nni_mtx_unlock(&s->s_mx); + return (NNG_ECLOSED); + } + + // Protocol can reject for other reasons. + // This must be the last operation, until this point + // the protocol has not actually "seen" the pipe. + rv = s->s_pipe_ops.pipe_start(pdata); + nni_mtx_unlock(&s->s_mx); return (rv); } @@ -484,9 +503,18 @@ nni_sock_pipe_add(nni_sock *s, nni_pipe *p) void nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) { - void *pdata; + void * pdata; + nng_pipe_cb cb; nni_mtx_lock(&sock->s_mx); + if ((cb = sock->s_pipe_cb) != NULL) { + void * arg = sock->s_pipe_cbarg; + nng_pipe p; + nni_mtx_unlock(&sock->s_mx); + p.id = nni_pipe_id(pipe); + cb(p, NNG_PIPE_REM, arg); + nni_mtx_lock(&sock->s_mx); + } pdata = nni_pipe_get_proto_data(pipe); if (pdata != NULL) { sock->s_pipe_ops.pipe_stop(pdata); @@ -507,14 +535,6 @@ nni_sock_destroy(nni_sock *s) { nni_sockopt *sopt; - // Close any open notification pipes. - if (s->s_recv_fd.sn_init) { - nni_plat_pipe_close(s->s_recv_fd.sn_wfd, s->s_recv_fd.sn_rfd); - } - if (s->s_send_fd.sn_init) { - nni_plat_pipe_close(s->s_send_fd.sn_wfd, s->s_send_fd.sn_rfd); - } - // The protocol needs to clean up its state. if (s->s_data != NULL) { s->s_sock_ops.sock_fini(s->s_data); @@ -546,21 +566,21 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - s->s_sndtimeo = -1; - s->s_rcvtimeo = -1; - s->s_closing = 0; - s->s_reconn = NNI_SECOND; - s->s_reconnmax = 0; - s->s_rcvmaxsz = 1024 * 1024; // 1 MB by default - s->s_id = 0; - s->s_refcnt = 0; - s->s_send_fd.sn_init = 0; - s->s_recv_fd.sn_init = 0; - s->s_self_id = proto->proto_self; - s->s_peer_id = proto->proto_peer; - s->s_flags = proto->proto_flags; - s->s_sock_ops = *proto->proto_sock_ops; - s->s_pipe_ops = *proto->proto_pipe_ops; + s->s_sndtimeo = -1; + s->s_rcvtimeo = -1; + s->s_closing = 0; + s->s_reconn = NNI_SECOND; + s->s_reconnmax = 0; + s->s_rcvmaxsz = 1024 * 1024; // 1 MB by default + s->s_id = 0; + s->s_refcnt = 0; + s->s_self_id = proto->proto_self; + s->s_peer_id = proto->proto_peer; + s->s_flags = proto->proto_flags; + s->s_sock_ops = *proto->proto_sock_ops; + s->s_pipe_ops = *proto->proto_pipe_ops; + s->s_closed = false; + s->s_closing = false; if (proto->proto_ctx_ops != NULL) { s->s_ctx_ops = *proto->proto_ctx_ops; @@ -692,7 +712,7 @@ nni_sock_shutdown(nni_sock *sock) return (NNG_ECLOSED); } // Mark us closing, so no more EPs or changes can occur. - sock->s_closing = 1; + sock->s_closing = true; // Close the EPs. This prevents new connections from forming but // but allows existing ones to drain. @@ -800,7 +820,7 @@ nni_sock_close(nni_sock *s) nni_sock_rele(s); return; } - s->s_closed = 1; + s->s_closed = true; nni_idhash_remove(nni_sock_hash, s->s_id); // We might have been removed from the list already, e.g. by @@ -1157,6 +1177,15 @@ nni_sock_flags(nni_sock *sock) return (sock->s_flags); } +void +nni_sock_set_pipe_cb(nni_sock *sock, nng_pipe_cb cb, void *arg) +{ + nni_mtx_lock(&sock->s_mx); + sock->s_pipe_cb = cb; + sock->s_pipe_cbarg = arg; + nni_mtx_unlock(&sock->s_mx); +} + int nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing) { diff --git a/src/core/socket.h b/src/core/socket.h index 87bf1374..bccda5b3 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -41,7 +41,7 @@ extern uint32_t nni_sock_id(nni_sock *); // a pipe could wind up orphaned. extern int nni_sock_pipe_add(nni_sock *, nni_pipe *); extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *); -extern int nni_sock_pipe_start(nni_sock *, nni_pipe *p); +extern int nni_sock_pipe_start(nni_sock *, nni_pipe *); extern int nni_sock_ep_add(nni_sock *, nni_ep *); extern void nni_sock_ep_remove(nni_sock *, nni_ep *); @@ -64,6 +64,13 @@ extern void nni_sock_reconntimes(nni_sock *, nni_duration *, nni_duration *); // and or write are appropriate for the protocol. extern uint32_t nni_sock_flags(nni_sock *); +// This function is used by the public API to set callbacks. It is +// one of the only cases (the only?) where the socket core understands +// the public data types. (Other solutions might exist, but they require +// keeping extra state to support conversion between public and internal +// types.) +extern void nni_sock_set_pipe_cb(nni_sock *sock, nng_pipe_cb, void *); + // nni_ctx_open is used to open/create a new context structure. // Contexts are not supported by most protocols, but for those that do, // this can offer some improvements for massive concurrency/scalability. @@ -1015,6 +1015,24 @@ nng_getopt_string(nng_socket s, const char *name, char **valp) } int +nng_pipe_notify(nng_socket s, nng_pipe_cb cb, void *arg) +{ + int rv; + nni_sock *sock; + + if ((rv = nni_init()) != 0) { + return (rv); + } + if ((rv = nni_sock_find(&sock, s.id)) != 0) { + return (rv); + } + + nni_sock_set_pipe_cb(sock, cb, arg); + nni_sock_rele(sock); + return (0); +} + +int nng_device(nng_socket s1, nng_socket s2) { int rv; @@ -1190,6 +1208,47 @@ nng_pipe_getopt_string(nng_pipe p, const char *name, char **valp) return (nng_pipe_getx(p, name, valp, &sz, NNI_TYPE_STRING)); } +nng_socket +nng_pipe_socket(nng_pipe p) +{ + nng_socket s = NNG_SOCKET_INITIALIZER; + nni_pipe * pipe; + + if ((nni_init() == 0) && (nni_pipe_find(&pipe, p.id) == 0)) { + s.id = nni_pipe_sock_id(pipe); + nni_pipe_rele(pipe); + } + return (s); +} + +nng_dialer +nng_pipe_dialer(nng_pipe p) +{ + nng_dialer d = NNG_DIALER_INITIALIZER; + nni_pipe * pipe; + if ((nni_init() == 0) && (nni_pipe_find(&pipe, p.id) == 0)) { + if (nni_pipe_ep_mode(pipe) == NNI_EP_MODE_DIAL) { + d.id = nni_pipe_ep_id(pipe); + } + nni_pipe_rele(pipe); + } + return (d); +} + +nng_listener +nng_pipe_listener(nng_pipe p) +{ + nng_listener l = NNG_LISTENER_INITIALIZER; + nni_pipe * pipe; + if ((nni_init() == 0) && (nni_pipe_find(&pipe, p.id) == 0)) { + if (nni_pipe_ep_mode(pipe) == NNI_EP_MODE_LISTEN) { + l.id = nni_pipe_ep_id(pipe); + } + nni_pipe_rele(pipe); + } + return (l); +} + int nng_pipe_close(nng_pipe p) { @@ -217,6 +217,18 @@ NNG_DECL int nng_getopt_size(nng_socket, const char *, size_t *); NNG_DECL int nng_getopt_uint64(nng_socket, const char *, uint64_t *); NNG_DECL int nng_getopt_ptr(nng_socket, const char *, void **); +// Arguably the pipe callback functions could be handled as an option, +// but with the need to specify an argument, we find it best to unify +// this as a separate function to pass in the argument and the callback. +// Only one callback can be set on a given socket, and there is no way +// to retrieve the old value. +typedef enum { + NNG_PIPE_ADD, + NNG_PIPE_REM, +} nng_pipe_action; +typedef void (*nng_pipe_cb)(nng_pipe, nng_pipe_action, void *); +NNG_DECL int nng_pipe_notify(nng_socket, nng_pipe_cb, void *); + // nng_getopt_string is special -- it allocates a string to hold the // resulting string, which should be freed with nng_strfree when it is // no logner needed. @@ -600,6 +612,9 @@ NNG_DECL int nng_pipe_getopt_ptr(nng_pipe, const char *, void **); NNG_DECL int nng_pipe_getopt_string(nng_pipe, const char *, char **); NNG_DECL int nng_pipe_close(nng_pipe); NNG_DECL int nng_pipe_id(nng_pipe); +NNG_DECL nng_socket nng_pipe_socket(nng_pipe); +NNG_DECL nng_dialer nng_pipe_dialer(nng_pipe); +NNG_DECL nng_listener nng_pipe_listener(nng_pipe); // Flags. enum nng_flag_enum { @@ -696,7 +711,7 @@ enum nng_flag_enum { // state current). This is a boolean. #define NNG_OPT_TCP_KEEPALIVE "tcp-keepalive" -// XXX: TBD: priorities, ipv4only, TCP options +// XXX: TBD: priorities, ipv4only // Statistics. These are for informational purposes only, and subject // to change without notice. The API for accessing these is stable, diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 3cfc89fa..0e3a033f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -143,6 +143,7 @@ add_nng_test(message 5 ON) add_nng_test(multistress 60 ON) add_nng_test(nonblock 60 ON) add_nng_test(options 5 ON) +add_nng_test(pipe 5 ON) add_nng_test(platform 5 ON) add_nng_test(pollfd 5 ON) add_nng_test(reconnect 5 ON) diff --git a/tests/pipe.c b/tests/pipe.c new file mode 100644 index 00000000..d5481c10 --- /dev/null +++ b/tests/pipe.c @@ -0,0 +1,148 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "convey.h" +#include "nng.h" +#include "protocol/pipeline0/pull.h" +#include "protocol/pipeline0/push.h" +#include "supplemental/util/platform.h" + +#include "stubs.h" +#include <string.h> + +#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) +#define CHECKSTR(m, s) \ + So(nng_msg_len(m) == strlen(s)); \ + So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) + +struct testcase { + nng_socket s; + nng_dialer d; + nng_listener l; + nng_pipe p; + int add; + int rem; + int err; +}; + +void +notify(nng_pipe p, nng_pipe_action act, void *arg) +{ + struct testcase *t = arg; + + if ((nng_socket_id(nng_pipe_socket(p)) != nng_socket_id(t->s)) || + (nng_listener_id(nng_pipe_listener(p)) != nng_listener_id(t->l)) || + (nng_dialer_id(nng_pipe_dialer(p)) != nng_dialer_id(t->d))) { + t->err++; + return; + } + switch (act) { + case NNG_PIPE_ADD: + t->add++; + break; + case NNG_PIPE_REM: + t->rem++; + break; + default: + t->err++; + return; + } + t->p = p; +} + +char addr[64]; +static int cnt; + +TestMain("Pipe notify works", { + atexit(nng_fini); + + Convey("We can create a pipeline", { + struct testcase push; + struct testcase pull; + sprintf(addr, "inproc://test%d", cnt++); + + memset(&pull, 0, sizeof(pull)); + memset(&push, 0, sizeof(push)); + So(nng_push_open(&push.s) == 0); + So(nng_pull_open(&pull.s) == 0); + + Reset({ + nng_close(push.s); + nng_close(pull.s); + }); + + So(nng_pipe_notify(push.s, notify, &push) == 0); + So(nng_pipe_notify(pull.s, notify, &pull) == 0); + + So(nng_setopt_ms(push.s, NNG_OPT_RECONNMINT, 10) == 0); + So(nng_setopt_ms(push.s, NNG_OPT_RECONNMAXT, 10) == 0); + + Convey("Dialing works", { + So(nng_listen(pull.s, addr, &pull.l, 0) == 0); + So(nng_listener_id(pull.l) > 0); + So(nng_dial(push.s, addr, &push.d, 0) == 0); + So(nng_dialer_id(push.d) > 0); + nng_msleep(200); + So(pull.add == 1); + So(pull.rem == 0); + So(pull.err == 0); + So(push.add == 1); + So(push.rem == 0); + So(push.err == 0); + Convey("We can send a frame", { + nng_msg *msg; + + nng_msleep(200); + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "hello"); + So(nng_sendmsg(push.s, msg, 0) == 0); + msg = NULL; + So(nng_recvmsg(pull.s, &msg, 0) == 0); + So(msg != NULL); + CHECKSTR(msg, "hello"); + So(nng_pipe_id(nng_msg_get_pipe(msg)) == + nng_pipe_id(pull.p)); + nng_msg_free(msg); + }); + + Convey("Reconnection works", { + So(pull.add == 1); + nng_pipe_close(pull.p); + nng_msleep(200); + + So(pull.err == 0); + So(pull.rem == 1); + So(pull.add == 2); + + So(push.err == 0); + So(push.rem == 1); + So(push.add == 2); + + Convey("They still exchange frames", { + nng_msg *msg; + nng_pipe p1; + + nng_msleep(200); + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "hello"); + So(nng_sendmsg(push.s, msg, 0) == 0); + msg = NULL; + So(nng_recvmsg(pull.s, &msg, 0) == 0); + So(msg != NULL); + CHECKSTR(msg, "hello"); + p1 = nng_msg_get_pipe(msg); + nng_msg_free(msg); + So(nng_pipe_id(p1) == + nng_pipe_id(pull.p)); + }); + }); + }); + }); +}); |
