From 2cf6c5b96de05ca3870495f615b23e1fcdd3c4ca Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 25 Dec 2016 11:03:06 -0800 Subject: New dial/listen API. Dialing might work now. In order to give control over synchronous vs. async dialing, we provide a flag to indicate synchronous dialing is desired. (Hmm. Should we reverse the default sense?) We extend listen to have the same flag. Logic is moved to endpt.c since dialing is really and endpoint specific operation. There are other minor related bug fixes here too. --- src/core/endpt.c | 147 ++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 128 insertions(+), 19 deletions(-) (limited to 'src/core/endpt.c') diff --git a/src/core/endpt.c b/src/core/endpt.c index ba557e3a..c5e479a2 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -14,20 +14,6 @@ #include // Functionality realited to end points. -#if 0 -struct nng_endpt { - struct nni_endpt_ops ep_ops; - void * ep_data; - nni_list_node_t ep_sock_node; - nni_socket * ep_sock; - char ep_addr[NNG_MAXADDRLEN]; - nni_thread * ep_dialer; - nni_thread * ep_listener; - int ep_close; - nni_mutex ep_mx; - nni_cond ep_cv; -}; -#endif int nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) @@ -37,7 +23,7 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) int rv; if ((tran = nni_transport_find(addr)) == NULL) { - return (NNG_EINVAL); + return (NNG_ENOTSUP); } if (strlen(addr) >= NNG_MAXADDRLEN) { return (NNG_EINVAL); @@ -129,12 +115,17 @@ nni_endpt_listen(nni_endpt *ep) } -int -nni_endpt_dial(nni_endpt *ep, nni_pipe **pp) +// nni_dial_once just does a single dial call, so it can be used +// for synchronous dialing. +static int +nni_dial_once(nni_endpt *ep) { + nni_socket *sock = ep->ep_sock; nni_pipe *pipe; int rv; + pipe = NULL; + if (ep->ep_close) { return (NNG_ECLOSED); } @@ -145,12 +136,130 @@ nni_endpt_dial(nni_endpt *ep, nni_pipe **pp) nni_pipe_destroy(pipe); return (rv); } - pipe->p_ep = ep; - *pp = pipe; + + if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) { + nni_pipe_destroy(pipe); + return (rv); + } + + nni_mutex_enter(&ep->ep_mx); + if (!ep->ep_close) { + // Set up the linkage so that when the pipe closes + // we can notify the dialer to redial. + pipe->p_ep = ep; + ep->ep_pipe = pipe; + } + nni_mutex_exit(&ep->ep_mx); + return (0); } +// nni_socket_dialer is the thread worker that dials in the background. +static void +nni_dialer(void *arg) +{ + nni_endpt *ep = arg; + nni_socket *sock = ep->ep_sock; + nni_pipe *pipe; + int rv; + nni_time cooldown; + + nni_mutex_enter(&ep->ep_mx); + while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { + nni_cond_wait(&ep->ep_cv); + } + if (ep->ep_stop || ep->ep_close) { + nni_mutex_exit(&ep->ep_mx); + return; + } + nni_mutex_exit(&ep->ep_mx); + + for (;;) { + nni_mutex_enter(&ep->ep_mx); + while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { + nni_cond_wait(&ep->ep_cv); + } + nni_mutex_exit(&ep->ep_mx); + + rv = nni_dial_once(ep); + switch (rv) { + case 0: + // good connection + continue; + case NNG_ENOMEM: + cooldown = 1000000; + break; + default: + // XXX: THIS NEEDS TO BE A PROPER BACKOFF. + cooldown = 100000; + break; + } + // we inject a delay so we don't just spin hard on + // errors like connection refused. For NNG_ENOMEM, we + // wait even longer, since the system needs time to + // release resources. + cooldown += nni_clock(); + while (!ep->ep_close) { + nni_cond_waituntil(&ep->ep_cv, cooldown); + } + } +} + + +int +nni_endpt_dial(nni_endpt *ep, int flags) +{ + int rv = 0; + nni_thread *reap = NULL; + nni_socket *sock = ep->ep_sock; + + nni_mutex_enter(&sock->s_mx); + nni_mutex_enter(&ep->ep_mx); + if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { + rv = NNG_EBUSY; + goto out; + } + if (sock->s_closing || ep->ep_close) { + rv = NNG_ECLOSED; + goto out; + } + + ep->ep_stop = 0; + ep->ep_start = (flags & NNG_FLAG_SYNCH) ? 0 : 1; + if (nni_thread_create(&ep->ep_dialer, nni_dialer, ep) != 0) { + rv = NNG_ENOMEM; + goto out; + } + if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) { + nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(&sock->s_mx); + rv = nni_dial_once(ep); + nni_mutex_enter(&sock->s_mx); + nni_mutex_enter(&ep->ep_mx); + if (rv == 0) { + ep->ep_start = 1; + } else { + // This will cause the thread to exit instead of + // starting. + ep->ep_stop = 1; + reap = ep->ep_dialer; + ep->ep_dialer = NULL; + } + nni_cond_signal(&ep->ep_cv); + } +out: + nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(&sock->s_mx); + + if (reap != NULL) { + nni_thread_reap(reap); + } + + return (rv); +} + + int nni_endpt_accept(nni_endpt *ep, nni_pipe **pp) { -- cgit v1.2.3-70-g09d2