summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-05-02 16:01:22 -0700
committerGarrett D'Amore <garrett@damore.org>2018-05-03 06:41:19 -0700
commit159dc95695515db767e208c41d5f44b334b71adc (patch)
tree5390251d939c33b1d274d0767861d2012673e258 /src
parent649e5d5ca9a03cef766e944c7735aab50041c012 (diff)
downloadnng-159dc95695515db767e208c41d5f44b334b71adc.tar.gz
nng-159dc95695515db767e208c41d5f44b334b71adc.tar.bz2
nng-159dc95695515db767e208c41d5f44b334b71adc.zip
fixes #389 Need pipe notification callbacks
This adds a new pipe event notification API (callbacks called on either pipe add or remove), including both tests and docs. Also supporting APIs to get the socket or endpoint associated with a pipe are included (tested and documented as well.)
Diffstat (limited to 'src')
-rw-r--r--src/core/pipe.c79
-rw-r--r--src/core/pipe.h14
-rw-r--r--src/core/socket.c113
-rw-r--r--src/core/socket.h9
-rw-r--r--src/nng.c59
-rw-r--r--src/nng.h17
6 files changed, 223 insertions, 68 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index f849e00e..010d306d 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -18,7 +18,7 @@
// performed in the context of the protocol.
struct nni_pipe {
- uint64_t p_id;
+ uint32_t p_id;
nni_tran_pipe p_tran_ops;
void * p_tran_data;
void * p_proto_data;
@@ -26,7 +26,7 @@ struct nni_pipe {
nni_list_node p_ep_node;
nni_sock * p_sock;
nni_ep * p_ep;
- bool p_reap;
+ bool p_closed;
bool p_stop;
int p_refcnt;
nni_mtx p_mtx;
@@ -106,6 +106,15 @@ nni_pipe_destroy(nni_pipe *p)
// Stop any pending negotiation.
nni_aio_stop(p->p_start_aio);
+ // We have exclusive access at this point, so we can check if
+ // we are still on any lists.
+ if (nni_list_node_active(&p->p_ep_node)) {
+ nni_ep_pipe_remove(p->p_ep, p);
+ }
+ if (nni_list_node_active(&p->p_sock_node)) {
+ nni_sock_pipe_remove(p->p_sock, p);
+ }
+
// Make sure any unlocked holders are done with this.
// This happens during initialization for example.
nni_mtx_lock(&nni_pipe_lk);
@@ -117,16 +126,6 @@ nni_pipe_destroy(nni_pipe *p)
}
nni_mtx_unlock(&nni_pipe_lk);
- // We have exclusive access at this point, so we can check if
- // we are still on any lists.
-
- if (nni_list_node_active(&p->p_ep_node)) {
- nni_ep_pipe_remove(p->p_ep, p);
- }
- if (nni_list_node_active(&p->p_sock_node)) {
- nni_sock_pipe_remove(p->p_sock, p);
- }
-
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_fini(p->p_tran_data);
}
@@ -141,13 +140,14 @@ nni_pipe_find(nni_pipe **pp, uint32_t id)
int rv;
nni_pipe *p;
nni_mtx_lock(&nni_pipe_lk);
+
+ // We don't care if the pipe is "closed". End users only have
+ // access to the pipe in order to obtain properties (which may
+ // be retried during the post-close notification callback) or to
+ // close the pipe.
if ((rv = nni_idhash_find(nni_pipes, id, (void **) &p)) == 0) {
- if (p->p_reap) {
- rv = NNG_ECLOSED;
- } else {
- p->p_refcnt++;
- *pp = p;
- }
+ p->p_refcnt++;
+ *pp = p;
}
nni_mtx_unlock(&nni_pipe_lk);
return (rv);
@@ -168,7 +168,7 @@ nni_pipe_rele(nni_pipe *p)
uint32_t
nni_pipe_id(nni_pipe *p)
{
- return ((uint32_t) p->p_id);
+ return (p->p_id);
}
void
@@ -184,18 +184,18 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio)
}
// nni_pipe_close closes the underlying connection. It is expected that
-// subsequent attempts receive or send (including any waiting receive) will
+// subsequent attempts to receive or send (including any waiting receive) will
// simply return NNG_ECLOSED.
void
nni_pipe_close(nni_pipe *p)
{
nni_mtx_lock(&p->p_mtx);
- if (p->p_reap) {
+ if (p->p_closed) {
// We already did a close.
nni_mtx_unlock(&p->p_mtx);
return;
}
- p->p_reap = true;
+ p->p_closed = true;
// Close the underlying transport.
if (p->p_tran_data != NULL) {
@@ -208,6 +208,16 @@ nni_pipe_close(nni_pipe *p)
nni_aio_abort(p->p_start_aio, NNG_ECLOSED);
}
+bool
+nni_pipe_closed(nni_pipe *p)
+{
+ bool rv;
+ nni_mtx_lock(&p->p_mtx);
+ rv = p->p_closed;
+ nni_mtx_unlock(&p->p_mtx);
+ return (rv);
+}
+
void
nni_pipe_stop(nni_pipe *p)
{
@@ -270,7 +280,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
p->p_proto_data = NULL;
p->p_ep = ep;
p->p_sock = sock;
- p->p_reap = false;
+ p->p_closed = false;
p->p_stop = false;
p->p_refcnt = 0;
@@ -281,8 +291,11 @@ nni_pipe_create(nni_ep *ep, void *tdata)
nni_mtx_init(&p->p_mtx);
nni_cv_init(&p->p_cv, &nni_pipe_lk);
if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) {
+ uint64_t id;
nni_mtx_lock(&nni_pipe_lk);
- rv = nni_idhash_alloc(nni_pipes, &p->p_id, p);
+ if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) {
+ p->p_id = (uint32_t) id;
+ }
nni_mtx_unlock(&nni_pipe_lk);
}
@@ -343,6 +356,24 @@ nni_pipe_ep_list_init(nni_list *list)
NNI_LIST_INIT(list, nni_pipe, p_ep_node);
}
+uint32_t
+nni_pipe_sock_id(nni_pipe *p)
+{
+ return (nni_sock_id(p->p_sock));
+}
+
+uint32_t
+nni_pipe_ep_id(nni_pipe *p)
+{
+ return (nni_ep_id(p->p_ep));
+}
+
+int
+nni_pipe_ep_mode(nni_pipe *p)
+{
+ return (nni_ep_mode(p->p_ep));
+}
+
static void
nni_pipe_reaper(void *notused)
{
diff --git a/src/core/pipe.h b/src/core/pipe.h
index ea0c16db..18a59ddb 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -79,6 +79,20 @@ extern void nni_pipe_ep_list_init(nni_list *);
// pipe, which must be released by the caller when it is done.
extern int nni_pipe_find(nni_pipe **, uint32_t);
+// nni_pipe_sock_id returns the socket id for the pipe (used by public API).
+extern uint32_t nni_pipe_sock_id(nni_pipe *);
+
+// nni_pipe_ep_id returns the endpoint id for the pipe.
+extern uint32_t nni_pipe_ep_id(nni_pipe *);
+
+// nni_pipe_ep_mode returns the endpoint mode for the pipe.
+extern int nni_pipe_ep_mode(nni_pipe *);
+
+// nni_pipe_closed returns true if nni_pipe_close was called.
+// (This is used by the socket to determine if user closed the pipe
+// during callback.)
+extern bool nni_pipe_closed(nni_pipe *);
+
// nni_pipe_rele releases the hold on the pipe placed by nni_pipe_find.
extern void nni_pipe_rele(nni_pipe *);
diff --git a/src/core/socket.c b/src/core/socket.c
index f5acd941..66580300 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -81,12 +81,11 @@ struct nni_socket {
nni_list s_pipes; // active pipes
nni_list s_ctxs; // active contexts (protected by global nni_sock_lk)
- int s_closing; // Socket is closing
- int s_closed; // Socket closed, protected by global lock
- bool s_ctxwait; // Waiting for contexts to close.
-
- nni_notifyfd s_send_fd;
- nni_notifyfd s_recv_fd;
+ bool s_closing; // Socket is closing
+ bool s_closed; // Socket closed, protected by global lock
+ bool s_ctxwait; // Waiting for contexts to close.
+ nng_pipe_cb s_pipe_cb; // User callback for pipe events.
+ void * s_pipe_cbarg; // Argument for pipe events.
};
static void nni_ctx_destroy(nni_ctx *);
@@ -435,21 +434,41 @@ nni_sock_rele(nni_sock *s)
int
nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe)
{
- void *pdata = nni_pipe_get_proto_data(pipe);
- int rv;
+ void * pdata = nni_pipe_get_proto_data(pipe);
+ nng_pipe_cb cb;
+ int rv;
NNI_ASSERT(s != NULL);
nni_mtx_lock(&s->s_mx);
- if (s->s_closing) {
- // We're closing, bail out.
- rv = NNG_ECLOSED;
- } else if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) {
+ if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) {
// Peer protocol mismatch.
- rv = NNG_EPROTO;
- } else {
- // Protocol can reject for other reasons.
- rv = s->s_pipe_ops.pipe_start(pdata);
+ nni_mtx_unlock(&s->s_mx);
+ return (NNG_EPROTO);
+ }
+ if ((cb = s->s_pipe_cb) != NULL) {
+ nng_pipe p;
+ void * arg = s->s_pipe_cbarg;
+ nni_mtx_unlock(&s->s_mx);
+ p.id = nni_pipe_id(pipe);
+ cb(p, NNG_PIPE_ADD, arg);
+ if (nni_pipe_closed(pipe)) {
+ return (NNG_ECLOSED);
+ }
+ nni_mtx_lock(&s->s_mx);
}
+ if (s->s_closing) {
+ // We're closing, bail out. This has to be done after
+ // we have dropped the lock above in case the sock is closed
+ // while the user callback runs.
+ nni_mtx_unlock(&s->s_mx);
+ return (NNG_ECLOSED);
+ }
+
+ // Protocol can reject for other reasons.
+ // This must be the last operation, until this point
+ // the protocol has not actually "seen" the pipe.
+ rv = s->s_pipe_ops.pipe_start(pdata);
+
nni_mtx_unlock(&s->s_mx);
return (rv);
}
@@ -484,9 +503,18 @@ nni_sock_pipe_add(nni_sock *s, nni_pipe *p)
void
nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
{
- void *pdata;
+ void * pdata;
+ nng_pipe_cb cb;
nni_mtx_lock(&sock->s_mx);
+ if ((cb = sock->s_pipe_cb) != NULL) {
+ void * arg = sock->s_pipe_cbarg;
+ nng_pipe p;
+ nni_mtx_unlock(&sock->s_mx);
+ p.id = nni_pipe_id(pipe);
+ cb(p, NNG_PIPE_REM, arg);
+ nni_mtx_lock(&sock->s_mx);
+ }
pdata = nni_pipe_get_proto_data(pipe);
if (pdata != NULL) {
sock->s_pipe_ops.pipe_stop(pdata);
@@ -507,14 +535,6 @@ nni_sock_destroy(nni_sock *s)
{
nni_sockopt *sopt;
- // Close any open notification pipes.
- if (s->s_recv_fd.sn_init) {
- nni_plat_pipe_close(s->s_recv_fd.sn_wfd, s->s_recv_fd.sn_rfd);
- }
- if (s->s_send_fd.sn_init) {
- nni_plat_pipe_close(s->s_send_fd.sn_wfd, s->s_send_fd.sn_rfd);
- }
-
// The protocol needs to clean up its state.
if (s->s_data != NULL) {
s->s_sock_ops.sock_fini(s->s_data);
@@ -546,21 +566,21 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- s->s_sndtimeo = -1;
- s->s_rcvtimeo = -1;
- s->s_closing = 0;
- s->s_reconn = NNI_SECOND;
- s->s_reconnmax = 0;
- s->s_rcvmaxsz = 1024 * 1024; // 1 MB by default
- s->s_id = 0;
- s->s_refcnt = 0;
- s->s_send_fd.sn_init = 0;
- s->s_recv_fd.sn_init = 0;
- s->s_self_id = proto->proto_self;
- s->s_peer_id = proto->proto_peer;
- s->s_flags = proto->proto_flags;
- s->s_sock_ops = *proto->proto_sock_ops;
- s->s_pipe_ops = *proto->proto_pipe_ops;
+ s->s_sndtimeo = -1;
+ s->s_rcvtimeo = -1;
+ s->s_closing = 0;
+ s->s_reconn = NNI_SECOND;
+ s->s_reconnmax = 0;
+ s->s_rcvmaxsz = 1024 * 1024; // 1 MB by default
+ s->s_id = 0;
+ s->s_refcnt = 0;
+ s->s_self_id = proto->proto_self;
+ s->s_peer_id = proto->proto_peer;
+ s->s_flags = proto->proto_flags;
+ s->s_sock_ops = *proto->proto_sock_ops;
+ s->s_pipe_ops = *proto->proto_pipe_ops;
+ s->s_closed = false;
+ s->s_closing = false;
if (proto->proto_ctx_ops != NULL) {
s->s_ctx_ops = *proto->proto_ctx_ops;
@@ -692,7 +712,7 @@ nni_sock_shutdown(nni_sock *sock)
return (NNG_ECLOSED);
}
// Mark us closing, so no more EPs or changes can occur.
- sock->s_closing = 1;
+ sock->s_closing = true;
// Close the EPs. This prevents new connections from forming but
// but allows existing ones to drain.
@@ -800,7 +820,7 @@ nni_sock_close(nni_sock *s)
nni_sock_rele(s);
return;
}
- s->s_closed = 1;
+ s->s_closed = true;
nni_idhash_remove(nni_sock_hash, s->s_id);
// We might have been removed from the list already, e.g. by
@@ -1157,6 +1177,15 @@ nni_sock_flags(nni_sock *sock)
return (sock->s_flags);
}
+void
+nni_sock_set_pipe_cb(nni_sock *sock, nng_pipe_cb cb, void *arg)
+{
+ nni_mtx_lock(&sock->s_mx);
+ sock->s_pipe_cb = cb;
+ sock->s_pipe_cbarg = arg;
+ nni_mtx_unlock(&sock->s_mx);
+}
+
int
nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing)
{
diff --git a/src/core/socket.h b/src/core/socket.h
index 87bf1374..bccda5b3 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -41,7 +41,7 @@ extern uint32_t nni_sock_id(nni_sock *);
// a pipe could wind up orphaned.
extern int nni_sock_pipe_add(nni_sock *, nni_pipe *);
extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *);
-extern int nni_sock_pipe_start(nni_sock *, nni_pipe *p);
+extern int nni_sock_pipe_start(nni_sock *, nni_pipe *);
extern int nni_sock_ep_add(nni_sock *, nni_ep *);
extern void nni_sock_ep_remove(nni_sock *, nni_ep *);
@@ -64,6 +64,13 @@ extern void nni_sock_reconntimes(nni_sock *, nni_duration *, nni_duration *);
// and or write are appropriate for the protocol.
extern uint32_t nni_sock_flags(nni_sock *);
+// This function is used by the public API to set callbacks. It is
+// one of the only cases (the only?) where the socket core understands
+// the public data types. (Other solutions might exist, but they require
+// keeping extra state to support conversion between public and internal
+// types.)
+extern void nni_sock_set_pipe_cb(nni_sock *sock, nng_pipe_cb, void *);
+
// nni_ctx_open is used to open/create a new context structure.
// Contexts are not supported by most protocols, but for those that do,
// this can offer some improvements for massive concurrency/scalability.
diff --git a/src/nng.c b/src/nng.c
index 4dc6c3fa..9e010978 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -1015,6 +1015,24 @@ nng_getopt_string(nng_socket s, const char *name, char **valp)
}
int
+nng_pipe_notify(nng_socket s, nng_pipe_cb cb, void *arg)
+{
+ int rv;
+ nni_sock *sock;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_sock_find(&sock, s.id)) != 0) {
+ return (rv);
+ }
+
+ nni_sock_set_pipe_cb(sock, cb, arg);
+ nni_sock_rele(sock);
+ return (0);
+}
+
+int
nng_device(nng_socket s1, nng_socket s2)
{
int rv;
@@ -1190,6 +1208,47 @@ nng_pipe_getopt_string(nng_pipe p, const char *name, char **valp)
return (nng_pipe_getx(p, name, valp, &sz, NNI_TYPE_STRING));
}
+nng_socket
+nng_pipe_socket(nng_pipe p)
+{
+ nng_socket s = NNG_SOCKET_INITIALIZER;
+ nni_pipe * pipe;
+
+ if ((nni_init() == 0) && (nni_pipe_find(&pipe, p.id) == 0)) {
+ s.id = nni_pipe_sock_id(pipe);
+ nni_pipe_rele(pipe);
+ }
+ return (s);
+}
+
+nng_dialer
+nng_pipe_dialer(nng_pipe p)
+{
+ nng_dialer d = NNG_DIALER_INITIALIZER;
+ nni_pipe * pipe;
+ if ((nni_init() == 0) && (nni_pipe_find(&pipe, p.id) == 0)) {
+ if (nni_pipe_ep_mode(pipe) == NNI_EP_MODE_DIAL) {
+ d.id = nni_pipe_ep_id(pipe);
+ }
+ nni_pipe_rele(pipe);
+ }
+ return (d);
+}
+
+nng_listener
+nng_pipe_listener(nng_pipe p)
+{
+ nng_listener l = NNG_LISTENER_INITIALIZER;
+ nni_pipe * pipe;
+ if ((nni_init() == 0) && (nni_pipe_find(&pipe, p.id) == 0)) {
+ if (nni_pipe_ep_mode(pipe) == NNI_EP_MODE_LISTEN) {
+ l.id = nni_pipe_ep_id(pipe);
+ }
+ nni_pipe_rele(pipe);
+ }
+ return (l);
+}
+
int
nng_pipe_close(nng_pipe p)
{
diff --git a/src/nng.h b/src/nng.h
index c3ae48ac..ac4f178b 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -217,6 +217,18 @@ NNG_DECL int nng_getopt_size(nng_socket, const char *, size_t *);
NNG_DECL int nng_getopt_uint64(nng_socket, const char *, uint64_t *);
NNG_DECL int nng_getopt_ptr(nng_socket, const char *, void **);
+// Arguably the pipe callback functions could be handled as an option,
+// but with the need to specify an argument, we find it best to unify
+// this as a separate function to pass in the argument and the callback.
+// Only one callback can be set on a given socket, and there is no way
+// to retrieve the old value.
+typedef enum {
+ NNG_PIPE_ADD,
+ NNG_PIPE_REM,
+} nng_pipe_action;
+typedef void (*nng_pipe_cb)(nng_pipe, nng_pipe_action, void *);
+NNG_DECL int nng_pipe_notify(nng_socket, nng_pipe_cb, void *);
+
// nng_getopt_string is special -- it allocates a string to hold the
// resulting string, which should be freed with nng_strfree when it is
// no logner needed.
@@ -600,6 +612,9 @@ NNG_DECL int nng_pipe_getopt_ptr(nng_pipe, const char *, void **);
NNG_DECL int nng_pipe_getopt_string(nng_pipe, const char *, char **);
NNG_DECL int nng_pipe_close(nng_pipe);
NNG_DECL int nng_pipe_id(nng_pipe);
+NNG_DECL nng_socket nng_pipe_socket(nng_pipe);
+NNG_DECL nng_dialer nng_pipe_dialer(nng_pipe);
+NNG_DECL nng_listener nng_pipe_listener(nng_pipe);
// Flags.
enum nng_flag_enum {
@@ -696,7 +711,7 @@ enum nng_flag_enum {
// state current). This is a boolean.
#define NNG_OPT_TCP_KEEPALIVE "tcp-keepalive"
-// XXX: TBD: priorities, ipv4only, TCP options
+// XXX: TBD: priorities, ipv4only
// Statistics. These are for informational purposes only, and subject
// to change without notice. The API for accessing these is stable,