diff options
Diffstat (limited to 'src/core/endpt.c')
| -rw-r--r-- | src/core/endpt.c | 198 |
1 files changed, 156 insertions, 42 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index 1c5f4db5..24d92206 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -36,7 +36,9 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) ep->ep_listener = NULL; ep->ep_close = 0; ep->ep_start = 0; + ep->ep_bound = 0; ep->ep_pipe = NULL; + NNI_LIST_NODE_INIT(&ep->ep_node); if ((rv = nni_mutex_init(&ep->ep_mx)) != 0) { nni_free(ep, sizeof (*ep)); return (NNG_ENOMEM); @@ -105,13 +107,21 @@ nni_endpt_close(nni_endpt *ep) } -int -nni_endpt_bind(nni_endpt *ep) +static int +nni_endpt_connect(nni_endpt *ep, nni_pipe **pp) { - if (ep->ep_close) { - return (NNG_ECLOSED); + nni_pipe *pipe; + int rv; + + if ((rv = nni_pipe_create(&pipe, ep)) != 0) { + return (rv); } - return (ep->ep_ops.ep_bind(ep->ep_data)); + if ((rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_trandata)) != 0) { + nni_pipe_destroy(pipe); + return (rv); + } + *pp = pipe; + return (0); } @@ -124,34 +134,12 @@ nni_dial_once(nni_endpt *ep) nni_pipe *pipe; int rv; - pipe = NULL; - - if (ep->ep_close) { - return (NNG_ECLOSED); - } - if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) { - return (rv); - } - if ((rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_data)) != 0) { - nni_pipe_destroy(pipe); - return (rv); + if (((rv = nni_endpt_connect(ep, &pipe)) == 0) && + ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) { + return (0); } - if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) { - nni_pipe_destroy(pipe); - return (rv); - } - - nni_mutex_enter(&ep->ep_mx); - if (!ep->ep_close) { - // Set up the linkage so that when the pipe closes - // we can notify the dialer to redial. - pipe->p_ep = ep; - ep->ep_pipe = pipe; - } - nni_mutex_exit(&ep->ep_mx); - - return (0); + return (rv); } @@ -160,7 +148,6 @@ static void nni_dialer(void *arg) { nni_endpt *ep = arg; - nni_socket *sock = ep->ep_sock; nni_pipe *pipe; int rv; nni_time cooldown; @@ -180,6 +167,10 @@ nni_dialer(void *arg) while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { nni_cond_wait(&ep->ep_cv); } + if (ep->ep_close) { + nni_mutex_exit(&ep->ep_mx); + break; + } nni_mutex_exit(&ep->ep_mx); rv = nni_dial_once(ep); @@ -190,9 +181,12 @@ nni_dialer(void *arg) case NNG_ENOMEM: cooldown = 1000000; break; + case NNG_ECLOSED: + break; default: // XXX: THIS NEEDS TO BE A PROPER BACKOFF. - cooldown = 100000; + printf("COOLING DOWN!!\n"); + cooldown = 1000000; break; } // we inject a delay so we don't just spin hard on @@ -200,9 +194,12 @@ nni_dialer(void *arg) // wait even longer, since the system needs time to // release resources. cooldown += nni_clock(); + nni_mutex_enter(&ep->ep_mx); while (!ep->ep_close) { + // We need a different condvar... nni_cond_waituntil(&ep->ep_cv, cooldown); } + nni_mutex_exit(&ep->ep_mx); } } @@ -212,15 +209,13 @@ nni_endpt_dial(nni_endpt *ep, int flags) { int rv = 0; nni_thread *reap = NULL; - nni_socket *sock = ep->ep_sock; - nni_mutex_enter(&sock->s_mx); nni_mutex_enter(&ep->ep_mx); if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { rv = NNG_EBUSY; goto out; } - if (sock->s_closing || ep->ep_close) { + if (ep->ep_close) { rv = NNG_ECLOSED; goto out; } @@ -233,10 +228,9 @@ nni_endpt_dial(nni_endpt *ep, int flags) } if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) { nni_mutex_exit(&ep->ep_mx); - nni_mutex_exit(&sock->s_mx); rv = nni_dial_once(ep); - nni_mutex_enter(&sock->s_mx); nni_mutex_enter(&ep->ep_mx); + if (rv == 0) { ep->ep_start = 1; } else { @@ -250,7 +244,6 @@ nni_endpt_dial(nni_endpt *ep, int flags) } out: nni_mutex_exit(&ep->ep_mx); - nni_mutex_exit(&sock->s_mx); if (reap != NULL) { nni_thread_reap(reap); @@ -269,14 +262,135 @@ nni_endpt_accept(nni_endpt *ep, nni_pipe **pp) if (ep->ep_close) { return (NNG_ECLOSED); } - if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) { + if ((rv = nni_pipe_create(&pipe, ep)) != 0) { return (rv); } - if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_data)) != 0) { + if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_trandata)) != 0) { nni_pipe_destroy(pipe); return (rv); } - pipe->p_ep = ep; *pp = pipe; return (0); } + + +static void +nni_listener(void *arg) +{ + nni_endpt *ep = arg; + nni_socket *sock = ep->ep_sock; + nni_pipe *pipe; + int rv; + + nni_mutex_enter(&ep->ep_mx); + while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { + nni_cond_wait(&ep->ep_cv); + } + if (ep->ep_stop || ep->ep_close) { + nni_mutex_exit(&ep->ep_mx); + return; + } + nni_mutex_exit(&ep->ep_mx); + for (;;) { + nni_time cooldown; + nni_mutex_enter(&ep->ep_mx); + + // If we didn't bind synchronously, do it now. + while (!ep->ep_bound && !ep->ep_close) { + int rv; + + nni_mutex_exit(&ep->ep_mx); + rv = ep->ep_ops.ep_bind(ep->ep_data); + nni_mutex_enter(&ep->ep_mx); + + if (rv == 0) { + ep->ep_bound = 1; + } else { + // Invalid address? Out of memory? Who knows. + // Try again in a bit (10ms). + // XXX: PROPER BACKOFF NEEDED + cooldown = 10000; + cooldown += nni_clock(); + while (!ep->ep_close) { + nni_cond_waituntil(&ep->ep_cv, + cooldown); + } + } + } + if (ep->ep_close) { + nni_mutex_exit(&ep->ep_mx); + break; + } + nni_mutex_exit(&ep->ep_mx); + + pipe = NULL; + + if (((rv = nni_endpt_accept(ep, &pipe)) == 0) && + ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) { + continue; + } + if (rv == NNG_ECLOSED) { + break; + } + cooldown = 1000; // 1 ms cooldown + if (rv == NNG_ENOMEM) { + // For out of memory, we need to give more + // time for the system to reclaim resources. + cooldown = 100000; // 100ms + } + nni_mutex_enter(&ep->ep_mx); + if (!ep->ep_close) { + nni_cond_waituntil(&ep->ep_cv, cooldown); + } + nni_mutex_exit(&ep->ep_mx); + } +} + + +int +nni_endpt_listen(nni_endpt *ep, int flags) +{ + int rv = 0; + nni_thread *reap = NULL; + + nni_mutex_enter(&ep->ep_mx); + if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { + rv = NNG_EBUSY; + goto out; + } + if (ep->ep_close) { + rv = NNG_ECLOSED; + goto out; + } + + ep->ep_stop = 0; + ep->ep_start = (flags & NNG_FLAG_SYNCH) ? 0 : 1; + if (nni_thread_create(&ep->ep_listener, nni_listener, ep) != 0) { + rv = NNG_ENOMEM; + goto out; + } + if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) { + nni_mutex_exit(&ep->ep_mx); + rv = ep->ep_ops.ep_bind(ep->ep_data); + nni_mutex_enter(&ep->ep_mx); + if (rv == 0) { + ep->ep_bound = 1; + ep->ep_start = 1; + } else { + // This will cause the thread to exit instead of + // starting. + ep->ep_stop = 1; + reap = ep->ep_listener; + ep->ep_listener = NULL; + } + nni_cond_signal(&ep->ep_cv); + } +out: + nni_mutex_exit(&ep->ep_mx); + + if (reap != NULL) { + nni_thread_reap(reap); + } + + return (rv); +} |
