diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 139 |
1 files changed, 105 insertions, 34 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 4cf624e2..894e4fee 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -82,9 +82,10 @@ struct nni_socket { nni_list s_options; // opts not handled by sock/proto char s_name[64]; // socket name (legacy compat) - nni_list s_eps; // active endpoints - nni_list s_pipes; // active pipes - nni_list s_ctxs; // active contexts (protected by global sock_lk) + nni_list s_listeners; // active listeners + nni_list s_dialers; // active dialers + nni_list s_pipes; // active pipes + nni_list s_ctxs; // active contexts (protected by global sock_lk) bool s_closing; // Socket is closing bool s_closed; // Socket closed, protected by global lock @@ -558,7 +559,8 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node); nni_pipe_sock_list_init(&s->s_pipes); - nni_ep_list_init(&s->s_eps); + nni_listener_list_init(&s->s_listeners); + nni_dialer_list_init(&s->s_dialers); nni_mtx_init(&s->s_mx); nni_mtx_init(&s->s_pipe_cbs_mtx); nni_cv_init(&s->s_cv, &s->s_mx); @@ -672,11 +674,13 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) int nni_sock_shutdown(nni_sock *sock) { - nni_pipe *pipe; - nni_ep * ep; - nni_ep * nep; - nni_ctx * ctx; - nni_ctx * nctx; + nni_pipe * pipe; + nni_dialer * d; + nni_dialer * nd; + nni_listener *l; + nni_listener *nl; + nni_ctx * ctx; + nni_ctx * nctx; nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { @@ -688,9 +692,13 @@ nni_sock_shutdown(nni_sock *sock) // Close the EPs. This prevents new connections from forming // but but allows existing ones to drain. - NNI_LIST_FOREACH (&sock->s_eps, ep) { - nni_ep_shutdown(ep); + NNI_LIST_FOREACH (&sock->s_listeners, l) { + nni_listener_shutdown(l); } + NNI_LIST_FOREACH (&sock->s_dialers, d) { + nni_dialer_shutdown(d); + } + nni_mtx_unlock(&sock->s_mx); // We now mark any owned contexts as closing. @@ -734,16 +742,26 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // Go through the endpoint list, attempting to close them. + // Go through the dialers and listeners, attempting to close them. // We might already have a close in progress, in which case // we skip past it; it will be removed from another thread. - nep = nni_list_first(&sock->s_eps); - while ((ep = nep) != NULL) { - nep = nni_list_next(&sock->s_eps, nep); + nl = nni_list_first(&sock->s_listeners); + while ((l = nl) != NULL) { + nl = nni_list_next(&sock->s_listeners, nl); + + if (nni_listener_hold(l) == 0) { + nni_mtx_unlock(&sock->s_mx); + nni_listener_close(l); + nni_mtx_lock(&sock->s_mx); + } + } + nd = nni_list_first(&sock->s_dialers); + while ((d = nd) != NULL) { + nd = nni_list_next(&sock->s_dialers, nd); - if (nni_ep_hold(ep) == 0) { + if (nni_dialer_hold(d) == 0) { nni_mtx_unlock(&sock->s_mx); - nni_ep_close(ep); + nni_dialer_close(d); nni_mtx_lock(&sock->s_mx); } } @@ -756,7 +774,8 @@ nni_sock_shutdown(nni_sock *sock) // We have to wait for *both* endpoints and pipes to be // removed. while ((!nni_list_empty(&sock->s_pipes)) || - (!nni_list_empty(&sock->s_eps))) { + (!nni_list_empty(&sock->s_listeners)) || + (!nni_list_empty(&sock->s_dialers))) { nni_cv_wait(&sock->s_cv); } @@ -810,8 +829,9 @@ nni_sock_close(nni_sock *s) // Wait for pipes, eps, and contexts to finish closing. nni_mtx_lock(&s->s_mx); - while ( - (!nni_list_empty(&s->s_pipes)) || (!nni_list_empty(&s->s_eps))) { + while ((!nni_list_empty(&s->s_pipes)) || + (!nni_list_empty(&s->s_dialers)) || + (!nni_list_empty(&s->s_listeners))) { nni_cv_wait(&s->s_cv); } nni_mtx_unlock(&s->s_mx); @@ -905,7 +925,33 @@ nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax) } int -nni_sock_ep_add(nni_sock *s, nni_ep *ep) +nni_sock_add_listener(nni_sock *s, nni_listener *l) +{ + nni_sockopt *sopt; + + nni_mtx_lock(&s->s_mx); + if (s->s_closing) { + nni_mtx_unlock(&s->s_mx); + return (NNG_ECLOSED); + } + + NNI_LIST_FOREACH (&s->s_options, sopt) { + int rv; + rv = nni_listener_setopt( + l, sopt->name, sopt->data, sopt->sz, sopt->typ); + if ((rv != 0) && (rv != NNG_ENOTSUP)) { + nni_mtx_unlock(&s->s_mx); + return (rv); + } + } + + nni_list_append(&s->s_listeners, l); + nni_mtx_unlock(&s->s_mx); + return (0); +} + +int +nni_sock_add_dialer(nni_sock *s, nni_dialer *d) { nni_sockopt *sopt; @@ -917,30 +963,43 @@ nni_sock_ep_add(nni_sock *s, nni_ep *ep) NNI_LIST_FOREACH (&s->s_options, sopt) { int rv; - rv = nni_ep_setopt( - ep, sopt->name, sopt->data, sopt->sz, sopt->typ); + rv = nni_dialer_setopt( + d, sopt->name, sopt->data, sopt->sz, sopt->typ); if ((rv != 0) && (rv != NNG_ENOTSUP)) { nni_mtx_unlock(&s->s_mx); return (rv); } } - nni_list_append(&s->s_eps, ep); + nni_list_append(&s->s_dialers, d); nni_mtx_unlock(&s->s_mx); return (0); } void -nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) +nni_sock_remove_listener(nni_sock *s, nni_listener *l) { - nni_mtx_lock(&sock->s_mx); - if (nni_list_active(&sock->s_eps, ep)) { - nni_list_remove(&sock->s_eps, ep); - if ((sock->s_closing) && (nni_list_empty(&sock->s_eps))) { - nni_cv_wake(&sock->s_cv); + nni_mtx_lock(&s->s_mx); + if (nni_list_active(&s->s_listeners, l)) { + nni_list_remove(&s->s_listeners, l); + if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) { + nni_cv_wake(&s->s_cv); } } - nni_mtx_unlock(&sock->s_mx); + nni_mtx_unlock(&s->s_mx); +} + +void +nni_sock_remove_dialer(nni_sock *s, nni_dialer *d) +{ + nni_mtx_lock(&s->s_mx); + if (nni_list_active(&s->s_dialers, d)) { + nni_list_remove(&s->s_dialers, d); + if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) { + nni_cv_wake(&s->s_cv); + } + } + nni_mtx_unlock(&s->s_mx); } int @@ -948,7 +1007,8 @@ nni_sock_setopt( nni_sock *s, const char *name, const void *v, size_t sz, nni_opt_type t) { int rv = NNG_ENOTSUP; - nni_ep * ep; + nni_dialer * d; + nni_listener * l; nni_sockopt * optv; nni_sockopt * oldv = NULL; const sock_option * sso; @@ -1042,9 +1102,20 @@ nni_sock_setopt( // transport (other than ENOTSUP) stops the operation // altogether. Its important that transport wide checks // properly pre-validate. - NNI_LIST_FOREACH (&s->s_eps, ep) { + NNI_LIST_FOREACH (&s->s_listeners, l) { + int x; + x = nni_listener_setopt(l, optv->name, optv->data, sz, t); + if (x != NNG_ENOTSUP) { + if ((rv = x) != 0) { + nni_mtx_unlock(&s->s_mx); + nni_free_opt(optv); + return (rv); + } + } + } + NNI_LIST_FOREACH (&s->s_dialers, d) { int x; - x = nni_ep_setopt(ep, optv->name, optv->data, sz, t); + x = nni_dialer_setopt(d, optv->name, optv->data, sz, t); if (x != NNG_ENOTSUP) { if ((rv = x) != 0) { nni_mtx_unlock(&s->s_mx); |
