diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-31 14:30:36 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2016-12-31 14:30:36 -0800 |
| commit | 4e4203fb1cddcfb205d602dd76cd0a8338321ee6 (patch) | |
| tree | 566907bf0c9b5e588f49db8fcb3fa3a681060fbd /src/core | |
| parent | 8fe11dfb66acfe067b21b6eb47eb9e3928169950 (diff) | |
| download | nng-4e4203fb1cddcfb205d602dd76cd0a8338321ee6.tar.gz nng-4e4203fb1cddcfb205d602dd76cd0a8338321ee6.tar.bz2 nng-4e4203fb1cddcfb205d602dd76cd0a8338321ee6.zip | |
Close & destroy endpoints in a single operation.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/endpt.c | 107 | ||||
| -rw-r--r-- | src/core/endpt.h | 10 | ||||
| -rw-r--r-- | src/core/socket.c | 35 |
3 files changed, 63 insertions, 89 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index b5a877e5..2d2f62ce 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -32,6 +32,7 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) if ((ep = nni_alloc(sizeof (*ep))) == NULL) { return (NNG_ENOMEM); } + ep->ep_sock = sock; ep->ep_dialer = NULL; ep->ep_listener = NULL; ep->ep_close = 0; @@ -39,25 +40,18 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) ep->ep_bound = 0; ep->ep_pipe = NULL; NNI_LIST_NODE_INIT(&ep->ep_node); - if ((rv = nni_mutex_init(&ep->ep_mx)) != 0) { - nni_free(ep, sizeof (*ep)); - return (NNG_ENOMEM); - } - if ((rv = nni_cond_init(&ep->ep_cv, &ep->ep_mx)) != 0) { - nni_mutex_fini(&ep->ep_mx); + if ((rv = nni_cond_init(&ep->ep_cv, &ep->ep_sock->s_mx)) != 0) { nni_free(ep, sizeof (*ep)); return (NNG_ENOMEM); } // Could safely use strcpy here, but this avoids discussion. (void) snprintf(ep->ep_addr, sizeof (ep->ep_addr), "%s", addr); - ep->ep_sock = sock; ep->ep_ops = *tran->tran_ep_ops; rv = ep->ep_ops.ep_create(&ep->ep_data, addr, nni_socket_proto(sock)); if (rv != 0) { nni_cond_fini(&ep->ep_cv); - nni_mutex_fini(&ep->ep_mx); nni_free(ep, sizeof (*ep)); return (rv); } @@ -67,33 +61,14 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) void -nni_endpt_destroy(nni_endpt *ep) -{ - // We should already have been closed at this point, so this - // should proceed very quickly. - if (ep->ep_dialer != NULL) { - nni_thread_reap(ep->ep_dialer); - } - if (ep->ep_listener != NULL) { - nni_thread_reap(ep->ep_listener); - } - - ep->ep_ops.ep_destroy(ep->ep_data); - - nni_cond_fini(&ep->ep_cv); - nni_mutex_fini(&ep->ep_mx); - nni_free(ep, sizeof (*ep)); -} - - -void nni_endpt_close(nni_endpt *ep) { nni_pipe *pipe; + nni_mutex *mx = &ep->ep_sock->s_mx; - nni_mutex_enter(&ep->ep_mx); + nni_mutex_enter(mx); if (ep->ep_close) { - nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(mx); return; } ep->ep_close = 1; @@ -103,7 +78,19 @@ nni_endpt_close(nni_endpt *ep) ep->ep_pipe = NULL; } nni_cond_broadcast(&ep->ep_cv); - nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(mx); + + if (ep->ep_dialer != NULL) { + nni_thread_reap(ep->ep_dialer); + } + if (ep->ep_listener != NULL) { + nni_thread_reap(ep->ep_listener); + } + + ep->ep_ops.ep_destroy(ep->ep_data); + + nni_cond_fini(&ep->ep_cv); + nni_free(ep, sizeof (*ep)); } @@ -151,27 +138,28 @@ nni_dialer(void *arg) nni_pipe *pipe; int rv; nni_time cooldown; + nni_mutex *mx = &ep->ep_sock->s_mx; - nni_mutex_enter(&ep->ep_mx); + nni_mutex_enter(mx); while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { nni_cond_wait(&ep->ep_cv); } if (ep->ep_stop || ep->ep_close) { - nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(mx); return; } - nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(mx); for (;;) { - nni_mutex_enter(&ep->ep_mx); + nni_mutex_enter(mx); while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { nni_cond_wait(&ep->ep_cv); } if (ep->ep_close) { - nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(mx); break; } - nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(mx); rv = nni_dial_once(ep); switch (rv) { @@ -193,7 +181,7 @@ nni_dialer(void *arg) // wait even longer, since the system needs time to // release resources. cooldown += nni_clock(); - nni_mutex_enter(&ep->ep_mx); + nni_mutex_enter(mx); while (!ep->ep_close) { // We need a different condvar... rv = nni_cond_waituntil(&ep->ep_cv, cooldown); @@ -201,7 +189,7 @@ nni_dialer(void *arg) break; } } - nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(mx); } } @@ -211,8 +199,9 @@ nni_endpt_dial(nni_endpt *ep, int flags) { int rv = 0; nni_thread *reap = NULL; + nni_mutex *mx = &ep->ep_sock->s_mx; - nni_mutex_enter(&ep->ep_mx); + nni_mutex_enter(mx); if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { rv = NNG_EBUSY; goto out; @@ -229,9 +218,9 @@ nni_endpt_dial(nni_endpt *ep, int flags) goto out; } if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) { - nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(mx); rv = nni_dial_once(ep); - nni_mutex_enter(&ep->ep_mx); + nni_mutex_enter(mx); if (rv == 0) { ep->ep_start = 1; @@ -245,7 +234,7 @@ nni_endpt_dial(nni_endpt *ep, int flags) nni_cond_signal(&ep->ep_cv); } out: - nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(mx); if (reap != NULL) { nni_thread_reap(reap); @@ -282,27 +271,28 @@ nni_listener(void *arg) nni_endpt *ep = arg; nni_pipe *pipe; int rv; + nni_mutex *mx = &ep->ep_sock->s_mx; - nni_mutex_enter(&ep->ep_mx); + nni_mutex_enter(mx); while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { nni_cond_wait(&ep->ep_cv); } if (ep->ep_stop || ep->ep_close) { - nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(mx); return; } - nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(mx); for (;;) { nni_time cooldown; - nni_mutex_enter(&ep->ep_mx); + nni_mutex_enter(mx); // If we didn't bind synchronously, do it now. while (!ep->ep_bound && !ep->ep_close) { int rv; - nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(mx); rv = ep->ep_ops.ep_bind(ep->ep_data); - nni_mutex_enter(&ep->ep_mx); + nni_mutex_enter(mx); if (rv == 0) { ep->ep_bound = 1; @@ -321,10 +311,10 @@ nni_listener(void *arg) } } if (ep->ep_close) { - nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(mx); break; } - nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(mx); pipe = NULL; @@ -342,14 +332,14 @@ nni_listener(void *arg) cooldown = 100000; // 100ms } cooldown += nni_clock(); - nni_mutex_enter(&ep->ep_mx); + nni_mutex_enter(mx); while (!ep->ep_close) { rv = nni_cond_waituntil(&ep->ep_cv, cooldown); if (rv == NNG_ETIMEDOUT) { break; } } - nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(mx); } } @@ -359,8 +349,9 @@ nni_endpt_listen(nni_endpt *ep, int flags) { int rv = 0; nni_thread *reap = NULL; + nni_mutex *mx = &ep->ep_sock->s_mx; - nni_mutex_enter(&ep->ep_mx); + nni_mutex_enter(mx); if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { rv = NNG_EBUSY; goto out; @@ -377,9 +368,9 @@ nni_endpt_listen(nni_endpt *ep, int flags) goto out; } if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) { - nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(mx); rv = ep->ep_ops.ep_bind(ep->ep_data); - nni_mutex_enter(&ep->ep_mx); + nni_mutex_enter(mx); if (rv == 0) { ep->ep_bound = 1; ep->ep_start = 1; @@ -390,10 +381,10 @@ nni_endpt_listen(nni_endpt *ep, int flags) reap = ep->ep_listener; ep->ep_listener = NULL; } - nni_cond_signal(&ep->ep_cv); + nni_cond_broadcast(&ep->ep_cv); } out: - nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(mx); if (reap != NULL) { nni_thread_reap(reap); diff --git a/src/core/endpt.h b/src/core/endpt.h index 36f55de2..73ed3243 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -23,17 +23,15 @@ struct nng_endpt { char ep_addr[NNG_MAXADDRLEN]; nni_thread * ep_dialer; nni_thread * ep_listener; - int ep_stop; // thread exits before start - int ep_start; // start thread running - int ep_close; // full shutdown - int ep_bound; // true if we bound locally - nni_mutex ep_mx; + int ep_stop; // thread exits before start + int ep_start; // start thread running + int ep_close; // full shutdown + int ep_bound; // true if we bound locally nni_cond ep_cv; nni_pipe * ep_pipe; // Connected pipe (dialers only) }; extern int nni_endpt_create(nni_endpt **, nni_socket *, const char *); -extern void nni_endpt_destroy(nni_endpt *); extern int nni_endpt_accept(nni_endpt *, nni_pipe **); extern void nni_endpt_close(nni_endpt *); extern int nni_endpt_dial(nni_endpt *, int); diff --git a/src/core/socket.c b/src/core/socket.c index 6fa6712d..1cba29c2 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -44,6 +44,12 @@ nni_reaper(void *arg) nni_mutex_enter(&sock->s_mx); if ((pipe = nni_list_first(&sock->s_reaps)) != NULL) { nni_list_remove(&sock->s_reaps, pipe); + + if (((ep = pipe->p_ep) != NULL) && + ((ep->ep_pipe == pipe))) { + ep->ep_pipe = NULL; + nni_cond_broadcast(&ep->ep_cv); + } nni_mutex_exit(&sock->s_mx); // This should already have been done. @@ -58,18 +64,6 @@ nni_reaper(void *arg) pipe->p_pdata); } - // If pipe was a connected (dialer) pipe, - // then let the endpoint know so it can try to - // reestablish the connection. - if (((ep = pipe->p_ep) != NULL) && - ((ep->ep_pipe == pipe))) { - ep->ep_pipe = NULL; - pipe->p_ep = NULL; - nni_mutex_enter(&ep->ep_mx); - nni_cond_signal(&ep->ep_cv); - nni_mutex_exit(&ep->ep_mx); - } - // XXX: also publish event... nni_pipe_destroy(pipe); continue; @@ -174,8 +168,11 @@ nni_socket_close(nni_socket *sock) // Stop all EPS. We're going to do this first, since we know // we're closing. - NNI_LIST_FOREACH (&sock->s_eps, ep) { + while ((ep = nni_list_first(&sock->s_eps)) != NULL) { + nni_list_remove(&sock->s_eps, ep); + nni_mutex_exit(&sock->s_mx); nni_endpt_close(ep); + nni_mutex_enter(&sock->s_mx); } // Special optimization; if there are no pipes connected, @@ -225,16 +222,6 @@ nni_socket_close(nni_socket *sock) reaper = sock->s_reaper; sock->s_reaper = NULL; nni_cond_broadcast(&sock->s_cv); - - // We already told the endpoints to shutdown. We just - // need to reap them now. - while ((ep = nni_list_first(&sock->s_eps)) != NULL) { - nni_list_remove(&sock->s_eps, ep); - nni_mutex_exit(&sock->s_mx); - - nni_endpt_destroy(ep); - nni_mutex_enter(&sock->s_mx); - } nni_mutex_exit(&sock->s_mx); // Wait for the reaper to exit. @@ -351,7 +338,6 @@ nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags) rv = nni_endpt_dial(ep, flags); if (rv != 0) { nni_endpt_close(ep); - nni_endpt_destroy(ep); } else { if (epp != NULL) { *epp = ep; @@ -377,7 +363,6 @@ nni_socket_listen(nni_socket *sock, const char *addr, nni_endpt **epp, rv = nni_endpt_listen(ep, flags); if (rv != 0) { nni_endpt_close(ep); - nni_endpt_destroy(ep); } else { if (epp != NULL) { *epp = ep; |
