diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 142 |
1 files changed, 11 insertions, 131 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 9db61838..9263b142 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -328,142 +328,22 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer) } -// nni_socket_dial_one just does a single dial call, so it can be used -// for synchronous dialing. -static int -nni_socket_dial_one(nni_endpt *ep) +int +nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags) { - nni_socket *sock = ep->ep_sock; - nni_pipe *pipe; + nni_endpt *ep; int rv; - pipe = NULL; - - if ((rv = nni_endpt_dial(ep, &pipe)) != 0) { - return (rv); - } - if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) { - nni_pipe_destroy(pipe); + if ((rv = nni_endpt_create(&ep, sock, addr)) != 0) { return (rv); } - - nni_mutex_enter(&ep->ep_mx); - if (!ep->ep_close) { - // Set up the linkage so that when the pipe closes - // we can notify the dialer to redial. - pipe->p_ep = ep; - ep->ep_pipe = pipe; - } - nni_mutex_exit(&ep->ep_mx); - - return (0); -} - - -// nni_socket_dialer is the thread worker that dials in the background. -static void -nni_socket_dialer(void *arg) -{ - nni_endpt *ep = arg; - nni_socket *sock = ep->ep_sock; - nni_pipe *pipe; - int rv; - nni_time cooldown; - - nni_mutex_enter(&ep->ep_mx); - while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { - nni_cond_wait(&ep->ep_cv); - } - if (ep->ep_stop || ep->ep_close) { - nni_mutex_exit(&ep->ep_mx); - return; - } - nni_mutex_exit(&ep->ep_mx); - - for (;;) { - nni_mutex_enter(&ep->ep_mx); - while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { - nni_cond_wait(&ep->ep_cv); - } - nni_mutex_exit(&ep->ep_mx); - - rv = nni_socket_dial_one(ep); - switch (rv) { - case 0: - // good connection - continue; - case NNG_ENOMEM: - cooldown = 1000000; - break; - default: - // XXX: THIS NEEDS TO BE A PROPER BACKOFF. - cooldown = 100000; - break; - } - // we inject a delay so we don't just spin hard on - // errors like connection refused. For NNG_ENOMEM, we - // wait even longer, since the system needs time to - // release resources. - cooldown += nni_clock(); - while (!ep->ep_close) { - nni_cond_waituntil(&ep->ep_cv, cooldown); - } - } -} - - -int -nni_socket_dial(nni_socket *sock, nni_endpt *ep, int sync) -{ - int rv = 0; - nni_thread *reap = NULL; - - nni_mutex_enter(&sock->s_mx); - nni_mutex_enter(&ep->ep_mx); - if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { - rv = NNG_EBUSY; - goto out; - } - if (ep->ep_sock != sock) { // Should never happen - rv = NNG_EINVAL; - goto out; - } - if (sock->s_closing || ep->ep_close) { - rv = NNG_ECLOSED; - goto out; - } - - ep->ep_stop = 0; - ep->ep_start = sync ? 0 : 1; - if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) { - rv = NNG_ENOMEM; - goto out; - } - if ((rv == 0) && (sync)) { - nni_mutex_exit(&ep->ep_mx); - nni_mutex_exit(&sock->s_mx); - rv = nni_socket_dial_one(ep); - nni_mutex_enter(&sock->s_mx); - nni_mutex_enter(&ep->ep_mx); - if (rv == 0) { - ep->ep_start = 1; - } else { - // This will cause the thread to exit instead of - // starting. - ep->ep_stop = 1; - reap = ep->ep_dialer; - ep->ep_dialer = NULL; - } - nni_cond_signal(&ep->ep_cv); - } -out: - nni_mutex_exit(&ep->ep_mx); - nni_mutex_exit(&sock->s_mx); - - if (reap != NULL) { - nni_thread_reap(reap); + rv = nni_endpt_dial(ep, flags); + if (rv != 0) { + nni_endpt_close(ep); + nni_endpt_destroy(ep); + } else if (epp != NULL) { + *epp = ep; } - return (rv); } @@ -528,7 +408,7 @@ nni_socket_accept(nni_socket *sock, nni_endpt *ep) rv = NNG_ECLOSED; goto out; } - if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) { + if (nni_thread_create(&ep->ep_dialer, nni_socket_accepter, ep) != 0) { rv = NNG_ENOMEM; goto out; } |
