diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-25 11:03:06 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2016-12-25 11:03:06 -0800 |
| commit | 2cf6c5b96de05ca3870495f615b23e1fcdd3c4ca (patch) | |
| tree | 7f7c96adc773e94fe057081b40f7a8b5b31508fe /src/core/endpt.c | |
| parent | b7188678e1fb7399afd11faaea3d537bf52f6923 (diff) | |
| download | nng-2cf6c5b96de05ca3870495f615b23e1fcdd3c4ca.tar.gz nng-2cf6c5b96de05ca3870495f615b23e1fcdd3c4ca.tar.bz2 nng-2cf6c5b96de05ca3870495f615b23e1fcdd3c4ca.zip | |
New dial/listen API. Dialing might work now.
In order to give control over synchronous vs. async dialing, we provide a
flag to indicate synchronous dialing is desired. (Hmm. Should we reverse the
default sense?) We extend listen to have the same flag.
Logic is moved to endpt.c since dialing is really and endpoint specific operation.
There are other minor related bug fixes here too.
Diffstat (limited to 'src/core/endpt.c')
| -rw-r--r-- | src/core/endpt.c | 147 |
1 files changed, 128 insertions, 19 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index ba557e3a..c5e479a2 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -14,20 +14,6 @@ #include <stdio.h> // Functionality realited to end points. -#if 0 -struct nng_endpt { - struct nni_endpt_ops ep_ops; - void * ep_data; - nni_list_node_t ep_sock_node; - nni_socket * ep_sock; - char ep_addr[NNG_MAXADDRLEN]; - nni_thread * ep_dialer; - nni_thread * ep_listener; - int ep_close; - nni_mutex ep_mx; - nni_cond ep_cv; -}; -#endif int nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) @@ -37,7 +23,7 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) int rv; if ((tran = nni_transport_find(addr)) == NULL) { - return (NNG_EINVAL); + return (NNG_ENOTSUP); } if (strlen(addr) >= NNG_MAXADDRLEN) { return (NNG_EINVAL); @@ -129,12 +115,17 @@ nni_endpt_listen(nni_endpt *ep) } -int -nni_endpt_dial(nni_endpt *ep, nni_pipe **pp) +// nni_dial_once just does a single dial call, so it can be used +// for synchronous dialing. +static int +nni_dial_once(nni_endpt *ep) { + nni_socket *sock = ep->ep_sock; nni_pipe *pipe; int rv; + pipe = NULL; + if (ep->ep_close) { return (NNG_ECLOSED); } @@ -145,12 +136,130 @@ nni_endpt_dial(nni_endpt *ep, nni_pipe **pp) nni_pipe_destroy(pipe); return (rv); } - pipe->p_ep = ep; - *pp = pipe; + + if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) { + nni_pipe_destroy(pipe); + return (rv); + } + + nni_mutex_enter(&ep->ep_mx); + if (!ep->ep_close) { + // Set up the linkage so that when the pipe closes + // we can notify the dialer to redial. + pipe->p_ep = ep; + ep->ep_pipe = pipe; + } + nni_mutex_exit(&ep->ep_mx); + return (0); } +// nni_socket_dialer is the thread worker that dials in the background. +static void +nni_dialer(void *arg) +{ + nni_endpt *ep = arg; + nni_socket *sock = ep->ep_sock; + nni_pipe *pipe; + int rv; + nni_time cooldown; + + nni_mutex_enter(&ep->ep_mx); + while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { + nni_cond_wait(&ep->ep_cv); + } + if (ep->ep_stop || ep->ep_close) { + nni_mutex_exit(&ep->ep_mx); + return; + } + nni_mutex_exit(&ep->ep_mx); + + for (;;) { + nni_mutex_enter(&ep->ep_mx); + while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { + nni_cond_wait(&ep->ep_cv); + } + nni_mutex_exit(&ep->ep_mx); + + rv = nni_dial_once(ep); + switch (rv) { + case 0: + // good connection + continue; + case NNG_ENOMEM: + cooldown = 1000000; + break; + default: + // XXX: THIS NEEDS TO BE A PROPER BACKOFF. + cooldown = 100000; + break; + } + // we inject a delay so we don't just spin hard on + // errors like connection refused. For NNG_ENOMEM, we + // wait even longer, since the system needs time to + // release resources. + cooldown += nni_clock(); + while (!ep->ep_close) { + nni_cond_waituntil(&ep->ep_cv, cooldown); + } + } +} + + +int +nni_endpt_dial(nni_endpt *ep, int flags) +{ + int rv = 0; + nni_thread *reap = NULL; + nni_socket *sock = ep->ep_sock; + + nni_mutex_enter(&sock->s_mx); + nni_mutex_enter(&ep->ep_mx); + if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { + rv = NNG_EBUSY; + goto out; + } + if (sock->s_closing || ep->ep_close) { + rv = NNG_ECLOSED; + goto out; + } + + ep->ep_stop = 0; + ep->ep_start = (flags & NNG_FLAG_SYNCH) ? 0 : 1; + if (nni_thread_create(&ep->ep_dialer, nni_dialer, ep) != 0) { + rv = NNG_ENOMEM; + goto out; + } + if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) { + nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(&sock->s_mx); + rv = nni_dial_once(ep); + nni_mutex_enter(&sock->s_mx); + nni_mutex_enter(&ep->ep_mx); + if (rv == 0) { + ep->ep_start = 1; + } else { + // This will cause the thread to exit instead of + // starting. + ep->ep_stop = 1; + reap = ep->ep_dialer; + ep->ep_dialer = NULL; + } + nni_cond_signal(&ep->ep_cv); + } +out: + nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(&sock->s_mx); + + if (reap != NULL) { + nni_thread_reap(reap); + } + + return (rv); +} + + int nni_endpt_accept(nni_endpt *ep, nni_pipe **pp) { |
