aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c142
1 files changed, 11 insertions, 131 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 9db61838..9263b142 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -328,142 +328,22 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer)
}
-// nni_socket_dial_one just does a single dial call, so it can be used
-// for synchronous dialing.
-static int
-nni_socket_dial_one(nni_endpt *ep)
+int
+nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags)
{
- nni_socket *sock = ep->ep_sock;
- nni_pipe *pipe;
+ nni_endpt *ep;
int rv;
- pipe = NULL;
-
- if ((rv = nni_endpt_dial(ep, &pipe)) != 0) {
- return (rv);
- }
- if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) {
- nni_pipe_destroy(pipe);
+ if ((rv = nni_endpt_create(&ep, sock, addr)) != 0) {
return (rv);
}
-
- nni_mutex_enter(&ep->ep_mx);
- if (!ep->ep_close) {
- // Set up the linkage so that when the pipe closes
- // we can notify the dialer to redial.
- pipe->p_ep = ep;
- ep->ep_pipe = pipe;
- }
- nni_mutex_exit(&ep->ep_mx);
-
- return (0);
-}
-
-
-// nni_socket_dialer is the thread worker that dials in the background.
-static void
-nni_socket_dialer(void *arg)
-{
- nni_endpt *ep = arg;
- nni_socket *sock = ep->ep_sock;
- nni_pipe *pipe;
- int rv;
- nni_time cooldown;
-
- nni_mutex_enter(&ep->ep_mx);
- while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) {
- nni_cond_wait(&ep->ep_cv);
- }
- if (ep->ep_stop || ep->ep_close) {
- nni_mutex_exit(&ep->ep_mx);
- return;
- }
- nni_mutex_exit(&ep->ep_mx);
-
- for (;;) {
- nni_mutex_enter(&ep->ep_mx);
- while ((!ep->ep_close) && (ep->ep_pipe != NULL)) {
- nni_cond_wait(&ep->ep_cv);
- }
- nni_mutex_exit(&ep->ep_mx);
-
- rv = nni_socket_dial_one(ep);
- switch (rv) {
- case 0:
- // good connection
- continue;
- case NNG_ENOMEM:
- cooldown = 1000000;
- break;
- default:
- // XXX: THIS NEEDS TO BE A PROPER BACKOFF.
- cooldown = 100000;
- break;
- }
- // we inject a delay so we don't just spin hard on
- // errors like connection refused. For NNG_ENOMEM, we
- // wait even longer, since the system needs time to
- // release resources.
- cooldown += nni_clock();
- while (!ep->ep_close) {
- nni_cond_waituntil(&ep->ep_cv, cooldown);
- }
- }
-}
-
-
-int
-nni_socket_dial(nni_socket *sock, nni_endpt *ep, int sync)
-{
- int rv = 0;
- nni_thread *reap = NULL;
-
- nni_mutex_enter(&sock->s_mx);
- nni_mutex_enter(&ep->ep_mx);
- if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
- rv = NNG_EBUSY;
- goto out;
- }
- if (ep->ep_sock != sock) { // Should never happen
- rv = NNG_EINVAL;
- goto out;
- }
- if (sock->s_closing || ep->ep_close) {
- rv = NNG_ECLOSED;
- goto out;
- }
-
- ep->ep_stop = 0;
- ep->ep_start = sync ? 0 : 1;
- if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) {
- rv = NNG_ENOMEM;
- goto out;
- }
- if ((rv == 0) && (sync)) {
- nni_mutex_exit(&ep->ep_mx);
- nni_mutex_exit(&sock->s_mx);
- rv = nni_socket_dial_one(ep);
- nni_mutex_enter(&sock->s_mx);
- nni_mutex_enter(&ep->ep_mx);
- if (rv == 0) {
- ep->ep_start = 1;
- } else {
- // This will cause the thread to exit instead of
- // starting.
- ep->ep_stop = 1;
- reap = ep->ep_dialer;
- ep->ep_dialer = NULL;
- }
- nni_cond_signal(&ep->ep_cv);
- }
-out:
- nni_mutex_exit(&ep->ep_mx);
- nni_mutex_exit(&sock->s_mx);
-
- if (reap != NULL) {
- nni_thread_reap(reap);
+ rv = nni_endpt_dial(ep, flags);
+ if (rv != 0) {
+ nni_endpt_close(ep);
+ nni_endpt_destroy(ep);
+ } else if (epp != NULL) {
+ *epp = ep;
}
-
return (rv);
}
@@ -528,7 +408,7 @@ nni_socket_accept(nni_socket *sock, nni_endpt *ep)
rv = NNG_ECLOSED;
goto out;
}
- if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) {
+ if (nni_thread_create(&ep->ep_dialer, nni_socket_accepter, ep) != 0) {
rv = NNG_ENOMEM;
goto out;
}