aboutsummaryrefslogtreecommitdiff
path: root/src/core/pipe.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--src/core/pipe.c207
1 files changed, 34 insertions, 173 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 6b9b082c..e95fe1d4 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -9,6 +9,7 @@
//
#include "core/nng_impl.h"
+#include "sockimpl.h"
#include <string.h>
@@ -17,54 +18,17 @@
// Operations on pipes (to the transport) are generally blocking operations,
// performed in the context of the protocol.
-struct nni_pipe {
- uint32_t p_id;
- uint32_t p_sock_id;
- uint32_t p_dialer_id;
- uint32_t p_listener_id;
- nni_tran_pipe_ops p_tran_ops;
- nni_proto_pipe_ops p_proto_ops;
- void * p_tran_data;
- void * p_proto_data;
- nni_list_node p_sock_node;
- nni_list_node p_ep_node;
- nni_sock * p_sock;
- nni_listener * p_listener;
- nni_dialer * p_dialer;
- bool p_closed;
- nni_atomic_flag p_stop;
- bool p_cbs;
- int p_refcnt;
- nni_mtx p_mtx;
- nni_cv p_cv;
- nni_list_node p_reap_node;
- nni_aio * p_start_aio;
-};
-
static nni_idhash *nni_pipes;
static nni_mtx nni_pipe_lk;
-static nni_list nni_pipe_reap_list;
-static nni_mtx nni_pipe_reap_lk;
-static nni_cv nni_pipe_reap_cv;
-static nni_thr nni_pipe_reap_thr;
-static int nni_pipe_reap_run;
-
-static void nni_pipe_reaper(void *);
-
int
nni_pipe_sys_init(void)
{
int rv;
- NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node);
nni_mtx_init(&nni_pipe_lk);
- nni_mtx_init(&nni_pipe_reap_lk);
- nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk);
- if (((rv = nni_idhash_init(&nni_pipes)) != 0) ||
- ((rv = nni_thr_init(&nni_pipe_reap_thr, nni_pipe_reaper, 0)) !=
- 0)) {
+ if ((rv = nni_idhash_init(&nni_pipes)) != 0) {
return (rv);
}
@@ -76,25 +40,13 @@ nni_pipe_sys_init(void)
nni_idhash_set_limits(
nni_pipes, 1, 0x7fffffff, nni_random() & 0x7fffffff);
- nni_pipe_reap_run = 1;
- nni_thr_run(&nni_pipe_reap_thr);
-
return (0);
}
void
nni_pipe_sys_fini(void)
{
- if (nni_pipe_reap_run) {
- nni_mtx_lock(&nni_pipe_reap_lk);
- nni_pipe_reap_run = 0;
- nni_cv_wake(&nni_pipe_reap_cv);
- nni_mtx_unlock(&nni_pipe_reap_lk);
- }
-
- nni_thr_fini(&nni_pipe_reap_thr);
- nni_cv_fini(&nni_pipe_reap_cv);
- nni_mtx_fini(&nni_pipe_reap_lk);
+ nni_reap_drain();
nni_mtx_fini(&nni_pipe_lk);
if (nni_pipes != NULL) {
nni_idhash_fini(nni_pipes);
@@ -109,28 +61,7 @@ nni_pipe_destroy(nni_pipe *p)
return;
}
- if (p->p_cbs) {
- nni_sock_run_pipe_cb(p->p_sock, NNG_PIPE_EV_REM_POST, p->p_id);
- }
-
- // Stop any pending negotiation.
- nni_aio_stop(p->p_start_aio);
-
- if (p->p_proto_data != NULL) {
- p->p_proto_ops.pipe_stop(p->p_proto_data);
- }
- if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) {
- p->p_tran_ops.p_stop(p->p_tran_data);
- }
-
- // We have exclusive access at this point, so we can check if
- // we are still on any lists.
- nni_dialer_remove_pipe(p->p_dialer, p); // dialer may be NULL
- nni_listener_remove_pipe(p->p_listener, p); // listener may be NULL
-
- if (nni_list_node_active(&p->p_sock_node)) {
- nni_sock_pipe_remove(p->p_sock, p);
- }
+ nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST);
// Make sure any unlocked holders are done with this.
// This happens during initialization for example.
@@ -138,11 +69,24 @@ nni_pipe_destroy(nni_pipe *p)
if (p->p_id != 0) {
nni_idhash_remove(nni_pipes, p->p_id);
}
+ // This wait guarantees that all callers are done with us.
while (p->p_refcnt != 0) {
nni_cv_wait(&p->p_cv);
}
nni_mtx_unlock(&nni_pipe_lk);
+ // Wait for neg callbacks to finish. (Already closed).
+ nni_aio_stop(p->p_start_aio);
+
+ if (p->p_proto_data != NULL) {
+ p->p_proto_ops.pipe_stop(p->p_proto_data);
+ }
+ if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) {
+ p->p_tran_ops.p_stop(p->p_tran_data);
+ }
+
+ nni_pipe_remove(p);
+
if (p->p_proto_data != NULL) {
p->p_proto_ops.pipe_fini(p->p_proto_data);
}
@@ -229,6 +173,8 @@ nni_pipe_close(nni_pipe *p)
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_close(p->p_tran_data);
}
+
+ nni_reap(&p->p_reap, (nni_cb) nni_pipe_destroy, p);
}
bool
@@ -241,23 +187,6 @@ nni_pipe_closed(nni_pipe *p)
return (rv);
}
-void
-nni_pipe_stop(nni_pipe *p)
-{
- // Guard against recursive calls.
- if (nni_atomic_flag_test_and_set(&p->p_stop)) {
- return;
- }
-
- nni_pipe_close(p);
-
- // Put it on the reaplist for async cleanup
- nni_mtx_lock(&nni_pipe_reap_lk);
- nni_list_append(&nni_pipe_reap_list, p);
- nni_cv_wake(&nni_pipe_reap_cv);
- nni_mtx_unlock(&nni_pipe_reap_lk);
-}
-
uint16_t
nni_pipe_peer(nni_pipe *p)
{
@@ -270,32 +199,29 @@ nni_pipe_start_cb(void *arg)
nni_pipe *p = arg;
nni_sock *s = p->p_sock;
nni_aio * aio = p->p_start_aio;
- uint32_t id = nni_pipe_id(p);
if (nni_aio_result(aio) != 0) {
- nni_pipe_stop(p);
+ nni_pipe_close(p);
return;
}
- p->p_cbs = true; // We're running all cbs going forward
-
- nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_PRE, id);
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
if (nni_pipe_closed(p)) {
- nni_pipe_stop(p);
+ nni_pipe_close(p);
return;
}
if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) ||
nni_sock_closing(s)) {
- nni_pipe_stop(p);
+ nni_pipe_close(p);
return;
}
- nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_POST, id);
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
}
int
-nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
+nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
{
nni_pipe * p;
int rv;
@@ -315,13 +241,11 @@ nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
p->p_proto_ops = *pops;
p->p_proto_data = NULL;
p->p_sock = sock;
- p->p_sock_id = nni_sock_id(sock);
p->p_closed = false;
p->p_cbs = false;
p->p_refcnt = 0;
nni_atomic_flag_reset(&p->p_stop);
- NNI_LIST_NODE_INIT(&p->p_reap_node);
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
@@ -347,27 +271,11 @@ nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
return (0);
}
-void
-nni_pipe_set_listener(nni_pipe *p, nni_listener *l)
-{
- p->p_listener = l;
- p->p_listener_id = nni_listener_id(l);
-}
-
-void
-nni_pipe_set_dialer(nni_pipe *p, nni_dialer *d)
-{
- p->p_dialer = d;
- p->p_dialer_id = nni_dialer_id(d);
-}
-
int
nni_pipe_getopt(
nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t)
{
nni_tran_option *o;
- nni_dialer * d;
- nni_listener * l;
for (o = p->p_tran_ops.p_options; o && o->o_name; o++) {
if (strcmp(o->o_name, name) != 0) {
@@ -376,24 +284,13 @@ nni_pipe_getopt(
return (o->o_get(p->p_tran_data, val, szp, t));
}
- // Maybe the endpoint knows? We look up by ID, instead of using
- // the links directly, to avoid needing a hold on them. The pipe
- // can wind up outliving the endpoint in certain circumstances.
- // This means that getting these properties from the pipe may wind
- // up being somewhat more expensive.
- if ((p->p_dialer_id != 0) &&
- (nni_dialer_find(&d, p->p_dialer_id) == 0)) {
- int rv;
- rv = nni_dialer_getopt(d, name, val, szp, t);
- nni_dialer_rele(d);
- return (rv);
+ // Maybe the endpoint knows? The guarantees on pipes ensure that the
+ // pipe will not outlive its creating endpoint.
+ if (p->p_dialer != NULL) {
+ return (nni_dialer_getopt(p->p_dialer, name, val, szp, t));
}
- if ((p->p_listener_id != 0) &&
- (nni_listener_find(&l, p->p_listener_id) == 0)) {
- int rv;
- rv = nni_listener_getopt(l, name, val, szp, t);
- nni_listener_rele(l);
- return (rv);
+ if (p->p_listener != NULL) {
+ return (nni_listener_getopt(p->p_listener, name, val, szp, t));
}
return (NNG_ENOTSUP);
}
@@ -414,56 +311,20 @@ nni_pipe_get_proto_data(nni_pipe *p)
return (p->p_proto_data);
}
-void
-nni_pipe_sock_list_init(nni_list *list)
-{
- NNI_LIST_INIT(list, nni_pipe, p_sock_node);
-}
-
-void
-nni_pipe_ep_list_init(nni_list *list)
-{
- NNI_LIST_INIT(list, nni_pipe, p_ep_node);
-}
-
uint32_t
nni_pipe_sock_id(nni_pipe *p)
{
- return (p->p_sock_id);
+ return (nni_sock_id(p->p_sock));
}
uint32_t
nni_pipe_listener_id(nni_pipe *p)
{
- return (p->p_listener_id);
+ return (p->p_listener ? nni_listener_id(p->p_listener) : 0);
}
uint32_t
nni_pipe_dialer_id(nni_pipe *p)
{
- return (p->p_dialer_id);
-}
-
-static void
-nni_pipe_reaper(void *notused)
-{
- NNI_ARG_UNUSED(notused);
-
- nni_mtx_lock(&nni_pipe_reap_lk);
- for (;;) {
- nni_pipe *p;
- if ((p = nni_list_first(&nni_pipe_reap_list)) != NULL) {
- nni_list_remove(&nni_pipe_reap_list, p);
-
- nni_mtx_unlock(&nni_pipe_reap_lk);
- nni_pipe_destroy(p);
- nni_mtx_lock(&nni_pipe_reap_lk);
- continue;
- }
- if (!nni_pipe_reap_run) {
- break;
- }
- nni_cv_wait(&nni_pipe_reap_cv);
- }
- nni_mtx_unlock(&nni_pipe_reap_lk);
+ return (p->p_dialer ? nni_dialer_id(p->p_dialer) : 0);
}