aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c144
1 files changed, 91 insertions, 53 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index fe1b62d2..ec2691a3 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -645,6 +645,13 @@ nni_sock_shutdown(nni_sock *sock)
nni_dialer_close(d);
nni_mtx_lock(&sock->s_mx);
}
+
+ // For each pipe, arrange for it to teardown hard. We would
+ // expect there not to be any here.
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+ nni_pipe_close(pipe);
+ }
+
nni_mtx_unlock(&sock->s_mx);
// Close the upper queues immediately.
@@ -679,30 +686,15 @@ nni_sock_shutdown(nni_sock *sock)
nni_mtx_unlock(&sock_lk);
nni_mtx_lock(&sock->s_mx);
-
- // At this point, we've done everything we politely can to
- // give the protocol a chance to flush its write side. Now
- // it is time to be a little more insistent.
-
- // For each pipe, arrange for it to teardown hard. We would
- // expect there not to be any here.
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_close(pipe);
- }
-
// We have to wait for pipes to be removed.
while (!nni_list_empty(&sock->s_pipes)) {
nni_cv_wait(&sock->s_cv);
}
-
- sock->s_sock_ops.sock_close(sock->s_data);
-
- nni_cv_wake(&sock->s_cv);
-
NNI_ASSERT(nni_list_first(&sock->s_pipes) == NULL);
-
nni_mtx_unlock(&sock->s_mx);
+ sock->s_sock_ops.sock_close(sock->s_data);
+
// At this point, there are no threads blocked inside of us
// that are referencing socket state. User code should call
// nng_close to release the last resources.
@@ -1277,24 +1269,16 @@ nni_dialer_timer_start(nni_dialer *d)
nni_mtx_unlock(&s->s_mx);
}
-void
-nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
+static void
+dialer_start_pipe(nni_dialer *d, nni_pipe *p)
{
nni_sock *s = d->d_sock;
- nni_pipe *p;
nni_mtx_lock(&s->s_mx);
-
- if (nni_pipe_create_dialer(&p, d, tpipe) != 0) {
- nni_mtx_unlock(&s->s_mx);
- return;
- }
-
- nni_list_append(&d->d_pipes, p);
- nni_list_append(&s->s_pipes, p);
d->d_pipe = p;
d->d_currtime = d->d_inirtime;
nni_mtx_unlock(&s->s_mx);
+
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&s->st_pipes, 1);
nni_stat_inc(&d->st_pipes, 1);
@@ -1342,6 +1326,24 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
}
void
+nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
+{
+ nni_sock *s = d->d_sock;
+ nni_pipe *p;
+
+ if (nni_pipe_create_dialer(&p, d, tpipe) != 0) {
+ return;
+ }
+
+ nni_mtx_lock(&s->s_mx);
+ d->d_pipe = p;
+ d->d_currtime = d->d_inirtime;
+ nni_mtx_unlock(&s->s_mx);
+
+ dialer_start_pipe(d, p);
+}
+
+void
nni_dialer_shutdown(nni_dialer *d)
{
nni_sock *s = d->d_sock;
@@ -1403,21 +1405,11 @@ nni_dialer_reap(nni_dialer *d)
nni_reap(&dialer_reap_list, d);
}
-void
-nni_listener_add_pipe(nni_listener *l, void *tpipe)
+static void
+listener_start_pipe(nni_listener *l, nni_pipe *p)
{
nni_sock *s = l->l_sock;
- nni_pipe *p;
-
- nni_mtx_lock(&s->s_mx);
- if (nni_pipe_create_listener(&p, l, tpipe) != 0) {
- nni_mtx_unlock(&s->s_mx);
- return;
- }
- nni_list_append(&l->l_pipes, p);
- nni_list_append(&s->s_pipes, p);
- nni_mtx_unlock(&s->s_mx);
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&l->st_pipes, 1);
nni_stat_inc(&s->st_pipes, 1);
@@ -1454,10 +1446,25 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe)
"Accepted pipe<%u> on socket<%u> from %s", nni_pipe_id(p),
nni_sock_id(s), nni_pipe_peer_addr(p, addr));
}
+
+ // the socket now "owns" the pipe, and a pipe close should immediately
+ // start the process of teardown.
nni_pipe_rele(p);
}
void
+nni_listener_add_pipe(nni_listener *l, void *tpipe)
+{
+ nni_pipe *p;
+
+ if (nni_pipe_create_listener(&p, l, tpipe) != 0) {
+ return;
+ }
+
+ listener_start_pipe(l, p);
+}
+
+void
nni_listener_shutdown(nni_listener *l)
{
nni_sock *s = l->l_sock;
@@ -1520,6 +1527,44 @@ nni_listener_reap(nni_listener *l)
nni_reap(&listener_reap_list, l);
}
+// nni_pipe_add just registers the pipe with the socket and endpoint
+// so they won't be deallocated while the pipe still exists.
+void
+nni_pipe_add(nni_pipe *p)
+{
+ nni_sock *s = p->p_sock;
+ nni_dialer *d = p->p_dialer;
+ nni_listener *l = p->p_listener;
+
+ nni_mtx_lock(&s->s_mx);
+ nni_list_append(&s->s_pipes, p);
+ if (d != NULL) {
+ NNI_ASSERT(l == NULL);
+ nni_list_append(&d->d_pipes, p);
+ }
+ if (l != NULL) {
+ NNI_ASSERT(d == NULL);
+ nni_list_append(&l->l_pipes, p);
+ }
+ nni_mtx_unlock(&s->s_mx);
+}
+
+// nni_pipe_start attempts to start the pipe, adding it to the socket and
+// endpoints and calling callbacks, etc. The pipe should already have finished
+// any negotiation needed at the transport layer.
+void
+nni_pipe_start(nni_pipe *p)
+{
+ if (p->p_listener) {
+ NNI_ASSERT(p->p_dialer == NULL);
+ listener_start_pipe(p->p_listener, p);
+ }
+ if (p->p_dialer) {
+ NNI_ASSERT(p->p_listener == NULL);
+ dialer_start_pipe(p->p_dialer, p);
+ }
+}
+
void
nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev)
{
@@ -1528,14 +1573,11 @@ nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev)
void *arg;
nni_mtx_lock(&s->s_pipe_cbs_mtx);
- if (!p->p_cbs) {
- if (ev == NNG_PIPE_EV_ADD_PRE) {
- // First event, after this we want all other events.
- p->p_cbs = true;
- } else {
- nni_mtx_unlock(&s->s_pipe_cbs_mtx);
- return;
- }
+ if (ev == NNG_PIPE_EV_ADD_PRE) {
+ p->p_cbs = true;
+ } else if (!p->p_cbs) {
+ nni_mtx_unlock(&s->s_pipe_cbs_mtx);
+ return;
}
cb = s->s_pipe_cbs[ev].cb_fn;
arg = s->s_pipe_cbs[ev].cb_arg;
@@ -1556,9 +1598,7 @@ nni_pipe_remove(nni_pipe *p)
nni_mtx_lock(&s->s_mx);
#ifdef NNG_ENABLE_STATS
- if (nni_list_node_active(&p->p_sock_node)) {
- nni_stat_dec(&s->st_pipes, 1);
- }
+ nni_stat_dec(&s->st_pipes, 1);
if (p->p_listener != NULL) {
nni_stat_dec(&p->p_listener->st_pipes, 1);
}
@@ -1568,8 +1608,6 @@ nni_pipe_remove(nni_pipe *p)
#endif
nni_list_node_remove(&p->p_sock_node);
nni_list_node_remove(&p->p_ep_node);
- p->p_listener = NULL;
- p->p_dialer = NULL;
if ((d != NULL) && (d->d_pipe == p)) {
d->d_pipe = NULL;
dialer_timer_start_locked(d); // Kick the timer to redial.