aboutsummaryrefslogtreecommitdiff
path: root/src/core/pipe.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-05-02 16:01:22 -0700
committerGarrett D'Amore <garrett@damore.org>2018-05-03 06:41:19 -0700
commit159dc95695515db767e208c41d5f44b334b71adc (patch)
tree5390251d939c33b1d274d0767861d2012673e258 /src/core/pipe.c
parent649e5d5ca9a03cef766e944c7735aab50041c012 (diff)
downloadnng-159dc95695515db767e208c41d5f44b334b71adc.tar.gz
nng-159dc95695515db767e208c41d5f44b334b71adc.tar.bz2
nng-159dc95695515db767e208c41d5f44b334b71adc.zip
fixes #389 Need pipe notification callbacks
This adds a new pipe event notification API (callbacks called on either pipe add or remove), including both tests and docs. Also supporting APIs to get the socket or endpoint associated with a pipe are included (tested and documented as well.)
Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--src/core/pipe.c79
1 files changed, 55 insertions, 24 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index f849e00e..010d306d 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -18,7 +18,7 @@
// performed in the context of the protocol.
struct nni_pipe {
- uint64_t p_id;
+ uint32_t p_id;
nni_tran_pipe p_tran_ops;
void * p_tran_data;
void * p_proto_data;
@@ -26,7 +26,7 @@ struct nni_pipe {
nni_list_node p_ep_node;
nni_sock * p_sock;
nni_ep * p_ep;
- bool p_reap;
+ bool p_closed;
bool p_stop;
int p_refcnt;
nni_mtx p_mtx;
@@ -106,6 +106,15 @@ nni_pipe_destroy(nni_pipe *p)
// Stop any pending negotiation.
nni_aio_stop(p->p_start_aio);
+ // We have exclusive access at this point, so we can check if
+ // we are still on any lists.
+ if (nni_list_node_active(&p->p_ep_node)) {
+ nni_ep_pipe_remove(p->p_ep, p);
+ }
+ if (nni_list_node_active(&p->p_sock_node)) {
+ nni_sock_pipe_remove(p->p_sock, p);
+ }
+
// Make sure any unlocked holders are done with this.
// This happens during initialization for example.
nni_mtx_lock(&nni_pipe_lk);
@@ -117,16 +126,6 @@ nni_pipe_destroy(nni_pipe *p)
}
nni_mtx_unlock(&nni_pipe_lk);
- // We have exclusive access at this point, so we can check if
- // we are still on any lists.
-
- if (nni_list_node_active(&p->p_ep_node)) {
- nni_ep_pipe_remove(p->p_ep, p);
- }
- if (nni_list_node_active(&p->p_sock_node)) {
- nni_sock_pipe_remove(p->p_sock, p);
- }
-
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_fini(p->p_tran_data);
}
@@ -141,13 +140,14 @@ nni_pipe_find(nni_pipe **pp, uint32_t id)
int rv;
nni_pipe *p;
nni_mtx_lock(&nni_pipe_lk);
+
+ // We don't care if the pipe is "closed". End users only have
+ // access to the pipe in order to obtain properties (which may
+ // be retried during the post-close notification callback) or to
+ // close the pipe.
if ((rv = nni_idhash_find(nni_pipes, id, (void **) &p)) == 0) {
- if (p->p_reap) {
- rv = NNG_ECLOSED;
- } else {
- p->p_refcnt++;
- *pp = p;
- }
+ p->p_refcnt++;
+ *pp = p;
}
nni_mtx_unlock(&nni_pipe_lk);
return (rv);
@@ -168,7 +168,7 @@ nni_pipe_rele(nni_pipe *p)
uint32_t
nni_pipe_id(nni_pipe *p)
{
- return ((uint32_t) p->p_id);
+ return (p->p_id);
}
void
@@ -184,18 +184,18 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio)
}
// nni_pipe_close closes the underlying connection. It is expected that
-// subsequent attempts receive or send (including any waiting receive) will
+// subsequent attempts to receive or send (including any waiting receive) will
// simply return NNG_ECLOSED.
void
nni_pipe_close(nni_pipe *p)
{
nni_mtx_lock(&p->p_mtx);
- if (p->p_reap) {
+ if (p->p_closed) {
// We already did a close.
nni_mtx_unlock(&p->p_mtx);
return;
}
- p->p_reap = true;
+ p->p_closed = true;
// Close the underlying transport.
if (p->p_tran_data != NULL) {
@@ -208,6 +208,16 @@ nni_pipe_close(nni_pipe *p)
nni_aio_abort(p->p_start_aio, NNG_ECLOSED);
}
+bool
+nni_pipe_closed(nni_pipe *p)
+{
+ bool rv;
+ nni_mtx_lock(&p->p_mtx);
+ rv = p->p_closed;
+ nni_mtx_unlock(&p->p_mtx);
+ return (rv);
+}
+
void
nni_pipe_stop(nni_pipe *p)
{
@@ -270,7 +280,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
p->p_proto_data = NULL;
p->p_ep = ep;
p->p_sock = sock;
- p->p_reap = false;
+ p->p_closed = false;
p->p_stop = false;
p->p_refcnt = 0;
@@ -281,8 +291,11 @@ nni_pipe_create(nni_ep *ep, void *tdata)
nni_mtx_init(&p->p_mtx);
nni_cv_init(&p->p_cv, &nni_pipe_lk);
if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) {
+ uint64_t id;
nni_mtx_lock(&nni_pipe_lk);
- rv = nni_idhash_alloc(nni_pipes, &p->p_id, p);
+ if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) {
+ p->p_id = (uint32_t) id;
+ }
nni_mtx_unlock(&nni_pipe_lk);
}
@@ -343,6 +356,24 @@ nni_pipe_ep_list_init(nni_list *list)
NNI_LIST_INIT(list, nni_pipe, p_ep_node);
}
+uint32_t
+nni_pipe_sock_id(nni_pipe *p)
+{
+ return (nni_sock_id(p->p_sock));
+}
+
+uint32_t
+nni_pipe_ep_id(nni_pipe *p)
+{
+ return (nni_ep_id(p->p_ep));
+}
+
+int
+nni_pipe_ep_mode(nni_pipe *p)
+{
+ return (nni_ep_mode(p->p_ep));
+}
+
static void
nni_pipe_reaper(void *notused)
{