diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-06-26 17:39:17 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-06-27 17:28:05 -0700 |
| commit | 251553b13e6bc8019914b9edd1292f97e856dd43 (patch) | |
| tree | 9193b8b4d4df86253f0a469cd96d8bb304a64c82 /src/core/socket.c | |
| parent | 91f9061ad9289afffb0111c03a8390d0f82d7114 (diff) | |
| download | nng-251553b13e6bc8019914b9edd1292f97e856dd43.tar.gz nng-251553b13e6bc8019914b9edd1292f97e856dd43.tar.bz2 nng-251553b13e6bc8019914b9edd1292f97e856dd43.zip | |
fixes #522 Separate out the endpoint plumbing
This separates the plumbing for endpoints into distinct
dialer and listeners. Some of the transports could benefit
from further separation, but we've done some rather larger
separation e.g. for the websocket transport.
IPC would be a good one to update later, when we start looking
at exposing a more natural underlying API.
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 139 |
1 files changed, 105 insertions, 34 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 4cf624e2..894e4fee 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -82,9 +82,10 @@ struct nni_socket { nni_list s_options; // opts not handled by sock/proto char s_name[64]; // socket name (legacy compat) - nni_list s_eps; // active endpoints - nni_list s_pipes; // active pipes - nni_list s_ctxs; // active contexts (protected by global sock_lk) + nni_list s_listeners; // active listeners + nni_list s_dialers; // active dialers + nni_list s_pipes; // active pipes + nni_list s_ctxs; // active contexts (protected by global sock_lk) bool s_closing; // Socket is closing bool s_closed; // Socket closed, protected by global lock @@ -558,7 +559,8 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node); nni_pipe_sock_list_init(&s->s_pipes); - nni_ep_list_init(&s->s_eps); + nni_listener_list_init(&s->s_listeners); + nni_dialer_list_init(&s->s_dialers); nni_mtx_init(&s->s_mx); nni_mtx_init(&s->s_pipe_cbs_mtx); nni_cv_init(&s->s_cv, &s->s_mx); @@ -672,11 +674,13 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) int nni_sock_shutdown(nni_sock *sock) { - nni_pipe *pipe; - nni_ep * ep; - nni_ep * nep; - nni_ctx * ctx; - nni_ctx * nctx; + nni_pipe * pipe; + nni_dialer * d; + nni_dialer * nd; + nni_listener *l; + nni_listener *nl; + nni_ctx * ctx; + nni_ctx * nctx; nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { @@ -688,9 +692,13 @@ nni_sock_shutdown(nni_sock *sock) // Close the EPs. This prevents new connections from forming // but but allows existing ones to drain. - NNI_LIST_FOREACH (&sock->s_eps, ep) { - nni_ep_shutdown(ep); + NNI_LIST_FOREACH (&sock->s_listeners, l) { + nni_listener_shutdown(l); } + NNI_LIST_FOREACH (&sock->s_dialers, d) { + nni_dialer_shutdown(d); + } + nni_mtx_unlock(&sock->s_mx); // We now mark any owned contexts as closing. @@ -734,16 +742,26 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // Go through the endpoint list, attempting to close them. + // Go through the dialers and listeners, attempting to close them. // We might already have a close in progress, in which case // we skip past it; it will be removed from another thread. - nep = nni_list_first(&sock->s_eps); - while ((ep = nep) != NULL) { - nep = nni_list_next(&sock->s_eps, nep); + nl = nni_list_first(&sock->s_listeners); + while ((l = nl) != NULL) { + nl = nni_list_next(&sock->s_listeners, nl); + + if (nni_listener_hold(l) == 0) { + nni_mtx_unlock(&sock->s_mx); + nni_listener_close(l); + nni_mtx_lock(&sock->s_mx); + } + } + nd = nni_list_first(&sock->s_dialers); + while ((d = nd) != NULL) { + nd = nni_list_next(&sock->s_dialers, nd); - if (nni_ep_hold(ep) == 0) { + if (nni_dialer_hold(d) == 0) { nni_mtx_unlock(&sock->s_mx); - nni_ep_close(ep); + nni_dialer_close(d); nni_mtx_lock(&sock->s_mx); } } @@ -756,7 +774,8 @@ nni_sock_shutdown(nni_sock *sock) // We have to wait for *both* endpoints and pipes to be // removed. while ((!nni_list_empty(&sock->s_pipes)) || - (!nni_list_empty(&sock->s_eps))) { + (!nni_list_empty(&sock->s_listeners)) || + (!nni_list_empty(&sock->s_dialers))) { nni_cv_wait(&sock->s_cv); } @@ -810,8 +829,9 @@ nni_sock_close(nni_sock *s) // Wait for pipes, eps, and contexts to finish closing. nni_mtx_lock(&s->s_mx); - while ( - (!nni_list_empty(&s->s_pipes)) || (!nni_list_empty(&s->s_eps))) { + while ((!nni_list_empty(&s->s_pipes)) || + (!nni_list_empty(&s->s_dialers)) || + (!nni_list_empty(&s->s_listeners))) { nni_cv_wait(&s->s_cv); } nni_mtx_unlock(&s->s_mx); @@ -905,7 +925,33 @@ nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax) } int -nni_sock_ep_add(nni_sock *s, nni_ep *ep) +nni_sock_add_listener(nni_sock *s, nni_listener *l) +{ + nni_sockopt *sopt; + + nni_mtx_lock(&s->s_mx); + if (s->s_closing) { + nni_mtx_unlock(&s->s_mx); + return (NNG_ECLOSED); + } + + NNI_LIST_FOREACH (&s->s_options, sopt) { + int rv; + rv = nni_listener_setopt( + l, sopt->name, sopt->data, sopt->sz, sopt->typ); + if ((rv != 0) && (rv != NNG_ENOTSUP)) { + nni_mtx_unlock(&s->s_mx); + return (rv); + } + } + + nni_list_append(&s->s_listeners, l); + nni_mtx_unlock(&s->s_mx); + return (0); +} + +int +nni_sock_add_dialer(nni_sock *s, nni_dialer *d) { nni_sockopt *sopt; @@ -917,30 +963,43 @@ nni_sock_ep_add(nni_sock *s, nni_ep *ep) NNI_LIST_FOREACH (&s->s_options, sopt) { int rv; - rv = nni_ep_setopt( - ep, sopt->name, sopt->data, sopt->sz, sopt->typ); + rv = nni_dialer_setopt( + d, sopt->name, sopt->data, sopt->sz, sopt->typ); if ((rv != 0) && (rv != NNG_ENOTSUP)) { nni_mtx_unlock(&s->s_mx); return (rv); } } - nni_list_append(&s->s_eps, ep); + nni_list_append(&s->s_dialers, d); nni_mtx_unlock(&s->s_mx); return (0); } void -nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) +nni_sock_remove_listener(nni_sock *s, nni_listener *l) { - nni_mtx_lock(&sock->s_mx); - if (nni_list_active(&sock->s_eps, ep)) { - nni_list_remove(&sock->s_eps, ep); - if ((sock->s_closing) && (nni_list_empty(&sock->s_eps))) { - nni_cv_wake(&sock->s_cv); + nni_mtx_lock(&s->s_mx); + if (nni_list_active(&s->s_listeners, l)) { + nni_list_remove(&s->s_listeners, l); + if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) { + nni_cv_wake(&s->s_cv); } } - nni_mtx_unlock(&sock->s_mx); + nni_mtx_unlock(&s->s_mx); +} + +void +nni_sock_remove_dialer(nni_sock *s, nni_dialer *d) +{ + nni_mtx_lock(&s->s_mx); + if (nni_list_active(&s->s_dialers, d)) { + nni_list_remove(&s->s_dialers, d); + if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) { + nni_cv_wake(&s->s_cv); + } + } + nni_mtx_unlock(&s->s_mx); } int @@ -948,7 +1007,8 @@ nni_sock_setopt( nni_sock *s, const char *name, const void *v, size_t sz, nni_opt_type t) { int rv = NNG_ENOTSUP; - nni_ep * ep; + nni_dialer * d; + nni_listener * l; nni_sockopt * optv; nni_sockopt * oldv = NULL; const sock_option * sso; @@ -1042,9 +1102,20 @@ nni_sock_setopt( // transport (other than ENOTSUP) stops the operation // altogether. Its important that transport wide checks // properly pre-validate. - NNI_LIST_FOREACH (&s->s_eps, ep) { + NNI_LIST_FOREACH (&s->s_listeners, l) { + int x; + x = nni_listener_setopt(l, optv->name, optv->data, sz, t); + if (x != NNG_ENOTSUP) { + if ((rv = x) != 0) { + nni_mtx_unlock(&s->s_mx); + nni_free_opt(optv); + return (rv); + } + } + } + NNI_LIST_FOREACH (&s->s_dialers, d) { int x; - x = nni_ep_setopt(ep, optv->name, optv->data, sz, t); + x = nni_dialer_setopt(d, optv->name, optv->data, sz, t); if (x != NNG_ENOTSUP) { if ((rv = x) != 0) { nni_mtx_unlock(&s->s_mx); |
