diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 144 |
1 files changed, 91 insertions, 53 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index fe1b62d2..ec2691a3 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -645,6 +645,13 @@ nni_sock_shutdown(nni_sock *sock) nni_dialer_close(d); nni_mtx_lock(&sock->s_mx); } + + // For each pipe, arrange for it to teardown hard. We would + // expect there not to be any here. + NNI_LIST_FOREACH (&sock->s_pipes, pipe) { + nni_pipe_close(pipe); + } + nni_mtx_unlock(&sock->s_mx); // Close the upper queues immediately. @@ -679,30 +686,15 @@ nni_sock_shutdown(nni_sock *sock) nni_mtx_unlock(&sock_lk); nni_mtx_lock(&sock->s_mx); - - // At this point, we've done everything we politely can to - // give the protocol a chance to flush its write side. Now - // it is time to be a little more insistent. - - // For each pipe, arrange for it to teardown hard. We would - // expect there not to be any here. - NNI_LIST_FOREACH (&sock->s_pipes, pipe) { - nni_pipe_close(pipe); - } - // We have to wait for pipes to be removed. while (!nni_list_empty(&sock->s_pipes)) { nni_cv_wait(&sock->s_cv); } - - sock->s_sock_ops.sock_close(sock->s_data); - - nni_cv_wake(&sock->s_cv); - NNI_ASSERT(nni_list_first(&sock->s_pipes) == NULL); - nni_mtx_unlock(&sock->s_mx); + sock->s_sock_ops.sock_close(sock->s_data); + // At this point, there are no threads blocked inside of us // that are referencing socket state. User code should call // nng_close to release the last resources. @@ -1277,24 +1269,16 @@ nni_dialer_timer_start(nni_dialer *d) nni_mtx_unlock(&s->s_mx); } -void -nni_dialer_add_pipe(nni_dialer *d, void *tpipe) +static void +dialer_start_pipe(nni_dialer *d, nni_pipe *p) { nni_sock *s = d->d_sock; - nni_pipe *p; nni_mtx_lock(&s->s_mx); - - if (nni_pipe_create_dialer(&p, d, tpipe) != 0) { - nni_mtx_unlock(&s->s_mx); - return; - } - - nni_list_append(&d->d_pipes, p); - nni_list_append(&s->s_pipes, p); d->d_pipe = p; d->d_currtime = d->d_inirtime; nni_mtx_unlock(&s->s_mx); + #ifdef NNG_ENABLE_STATS nni_stat_inc(&s->st_pipes, 1); nni_stat_inc(&d->st_pipes, 1); @@ -1342,6 +1326,24 @@ nni_dialer_add_pipe(nni_dialer *d, void *tpipe) } void +nni_dialer_add_pipe(nni_dialer *d, void *tpipe) +{ + nni_sock *s = d->d_sock; + nni_pipe *p; + + if (nni_pipe_create_dialer(&p, d, tpipe) != 0) { + return; + } + + nni_mtx_lock(&s->s_mx); + d->d_pipe = p; + d->d_currtime = d->d_inirtime; + nni_mtx_unlock(&s->s_mx); + + dialer_start_pipe(d, p); +} + +void nni_dialer_shutdown(nni_dialer *d) { nni_sock *s = d->d_sock; @@ -1403,21 +1405,11 @@ nni_dialer_reap(nni_dialer *d) nni_reap(&dialer_reap_list, d); } -void -nni_listener_add_pipe(nni_listener *l, void *tpipe) +static void +listener_start_pipe(nni_listener *l, nni_pipe *p) { nni_sock *s = l->l_sock; - nni_pipe *p; - - nni_mtx_lock(&s->s_mx); - if (nni_pipe_create_listener(&p, l, tpipe) != 0) { - nni_mtx_unlock(&s->s_mx); - return; - } - nni_list_append(&l->l_pipes, p); - nni_list_append(&s->s_pipes, p); - nni_mtx_unlock(&s->s_mx); #ifdef NNG_ENABLE_STATS nni_stat_inc(&l->st_pipes, 1); nni_stat_inc(&s->st_pipes, 1); @@ -1454,10 +1446,25 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) "Accepted pipe<%u> on socket<%u> from %s", nni_pipe_id(p), nni_sock_id(s), nni_pipe_peer_addr(p, addr)); } + + // the socket now "owns" the pipe, and a pipe close should immediately + // start the process of teardown. nni_pipe_rele(p); } void +nni_listener_add_pipe(nni_listener *l, void *tpipe) +{ + nni_pipe *p; + + if (nni_pipe_create_listener(&p, l, tpipe) != 0) { + return; + } + + listener_start_pipe(l, p); +} + +void nni_listener_shutdown(nni_listener *l) { nni_sock *s = l->l_sock; @@ -1520,6 +1527,44 @@ nni_listener_reap(nni_listener *l) nni_reap(&listener_reap_list, l); } +// nni_pipe_add just registers the pipe with the socket and endpoint +// so they won't be deallocated while the pipe still exists. +void +nni_pipe_add(nni_pipe *p) +{ + nni_sock *s = p->p_sock; + nni_dialer *d = p->p_dialer; + nni_listener *l = p->p_listener; + + nni_mtx_lock(&s->s_mx); + nni_list_append(&s->s_pipes, p); + if (d != NULL) { + NNI_ASSERT(l == NULL); + nni_list_append(&d->d_pipes, p); + } + if (l != NULL) { + NNI_ASSERT(d == NULL); + nni_list_append(&l->l_pipes, p); + } + nni_mtx_unlock(&s->s_mx); +} + +// nni_pipe_start attempts to start the pipe, adding it to the socket and +// endpoints and calling callbacks, etc. The pipe should already have finished +// any negotiation needed at the transport layer. +void +nni_pipe_start(nni_pipe *p) +{ + if (p->p_listener) { + NNI_ASSERT(p->p_dialer == NULL); + listener_start_pipe(p->p_listener, p); + } + if (p->p_dialer) { + NNI_ASSERT(p->p_listener == NULL); + dialer_start_pipe(p->p_dialer, p); + } +} + void nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev) { @@ -1528,14 +1573,11 @@ nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev) void *arg; nni_mtx_lock(&s->s_pipe_cbs_mtx); - if (!p->p_cbs) { - if (ev == NNG_PIPE_EV_ADD_PRE) { - // First event, after this we want all other events. - p->p_cbs = true; - } else { - nni_mtx_unlock(&s->s_pipe_cbs_mtx); - return; - } + if (ev == NNG_PIPE_EV_ADD_PRE) { + p->p_cbs = true; + } else if (!p->p_cbs) { + nni_mtx_unlock(&s->s_pipe_cbs_mtx); + return; } cb = s->s_pipe_cbs[ev].cb_fn; arg = s->s_pipe_cbs[ev].cb_arg; @@ -1556,9 +1598,7 @@ nni_pipe_remove(nni_pipe *p) nni_mtx_lock(&s->s_mx); #ifdef NNG_ENABLE_STATS - if (nni_list_node_active(&p->p_sock_node)) { - nni_stat_dec(&s->st_pipes, 1); - } + nni_stat_dec(&s->st_pipes, 1); if (p->p_listener != NULL) { nni_stat_dec(&p->p_listener->st_pipes, 1); } @@ -1568,8 +1608,6 @@ nni_pipe_remove(nni_pipe *p) #endif nni_list_node_remove(&p->p_sock_node); nni_list_node_remove(&p->p_ep_node); - p->p_listener = NULL; - p->p_dialer = NULL; if ((d != NULL) && (d->d_pipe == p)) { d->d_pipe = NULL; dialer_timer_start_locked(d); // Kick the timer to redial. |
