diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-22 23:17:12 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2016-12-22 23:17:12 -0800 |
| commit | bca0a27e2f4978a5a74748b07613c0e30014880c (patch) | |
| tree | a6beea2e7e63e02be070e4b124dd40c92917dbd6 /src/core/socket.c | |
| parent | 29628309ae018c3f317eef9b03625d4ce3807a92 (diff) | |
| download | nng-bca0a27e2f4978a5a74748b07613c0e30014880c.tar.gz nng-bca0a27e2f4978a5a74748b07613c0e30014880c.tar.bz2 nng-bca0a27e2f4978a5a74748b07613c0e30014880c.zip | |
Implemened synchronous & asynchronuos dialer, accepter. Getting close...
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 193 |
1 files changed, 170 insertions, 23 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index bbb9e48a..1b3bcf56 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -26,6 +26,7 @@ nni_socket_recvq(nni_socket *s) return (s->s_urq); } + // nn_socket_create creates the underlying socket. int nni_socket_create(nni_socket **sockp, uint16_t proto) @@ -41,7 +42,7 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) return (NNG_ENOMEM); } sock->s_ops = *ops; - sock->s_linger = NNG_DEFAULT_LINGER; + sock->s_linger = NNG_LINGER_DEFAULT; if ((rv = nni_mutex_init(&sock->s_mx)) != 0) { nni_free(sock, sizeof (*sock)); @@ -133,10 +134,10 @@ nni_socket_close(nni_socket *sock) // need to reap them now. while ((ep = nni_list_first(&sock->s_eps)) != NULL) { nni_list_remove(&sock->s_eps, ep); - nni_mutex_exit(&sock->s_eps); + nni_mutex_exit(&sock->s_mx); nni_endpt_destroy(ep); - nni_mutex_enter(&sock->s_eps); + nni_mutex_enter(&sock->s_mx); } nni_mutex_exit(&sock->s_mx); @@ -262,6 +263,8 @@ nni_socket_proto(nni_socket *sock) void nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe) { + nni_endpt *ep; + nni_mutex_enter(&sock->s_mx); if (pipe->p_sock != sock) { nni_mutex_exit(&sock->s_mx); @@ -274,12 +277,18 @@ nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe) // Now remove it from our own list. nni_list_remove(&sock->s_pipes, pipe); pipe->p_sock = NULL; - // XXX: TODO: Redial - // XXX: also publish event... - // if (pipe->p_ep != NULL) { - // nn_endpt_rem_pipe(pipe->p_ep, pipe) - // } + // If we were a connected (dialer) pipe, then let the endpoint + // know so it can try to reestablish the connection. + if ((ep = pipe->p_ep) != NULL) { + ep->ep_pipe = NULL; + pipe->p_ep = NULL; + nni_mutex_enter(&ep->ep_mx); + nni_cond_signal(&ep->ep_cv); + nni_mutex_exit(&ep->ep_mx); + } + + // XXX: also publish event... nni_pipe_destroy(pipe); // If we're closing, wake the socket if we finished draining. @@ -291,7 +300,7 @@ nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe) int -nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe) +nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer) { int rv; @@ -306,11 +315,6 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe) } nni_list_append(&sock->s_pipes, pipe); - // Add the pipe to its endpoint list. - nni_mutex_enter(&pipe->p_ep->ep_mx); - nni_list_append(&pipe->p_ep->ep_pipes, pipe); - nni_mutex_exit(&pipe->p_ep->ep_mx); - pipe->p_sock = sock; // XXX: Publish event nni_mutex_exit(&sock->s_mx); @@ -318,6 +322,39 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe) } +// nni_socket_dial_one just does a single dial call, so it can be used +// for synchronous dialing. +static int +nni_socket_dial_one(nni_endpt *ep) +{ + nni_socket *sock = ep->ep_sock; + nni_pipe *pipe; + int rv; + + pipe = NULL; + + if ((rv = nni_endpt_dial(ep, &pipe)) != 0) { + return (rv); + } + if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) { + nni_pipe_destroy(pipe); + return (rv); + } + + nni_mutex_enter(&ep->ep_mx); + if (!ep->ep_close) { + // Set up the linkage so that when the pipe closes + // we can notify the dialer to redial. + pipe->p_ep = ep; + ep->ep_pipe = pipe; + } + nni_mutex_exit(&ep->ep_mx); + + return (0); +} + + +// nni_socket_dialer is the thread worker that dials in the background. static void nni_socket_dialer(void *arg) { @@ -325,13 +362,116 @@ nni_socket_dialer(void *arg) nni_socket *sock = ep->ep_sock; nni_pipe *pipe; int rv; + nni_time cooldown; + + nni_mutex_enter(&ep->ep_mx); + while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { + nni_cond_wait(&ep->ep_cv); + } + if (ep->ep_stop || ep->ep_close) { + nni_mutex_exit(&ep->ep_mx); + return; + } + nni_mutex_exit(&ep->ep_mx); for (;;) { nni_mutex_enter(&ep->ep_mx); - while ((!ep->ep_close) && - (nni_list_first(&ep->ep_pipes) != NULL)) { + while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { nni_cond_wait(&ep->ep_cv); } + nni_mutex_exit(&ep->ep_mx); + + rv = nni_socket_dial_one(ep); + switch (rv) { + case 0: + // good connection + continue; + case NNG_ENOMEM: + cooldown = 1000000; + break; + default: + // XXX: THIS NEEDS TO BE A PROPER BACKOFF. + cooldown = 100000; + break; + } + // we inject a delay so we don't just spin hard on + // errors like connection refused. For NNG_ENOMEM, we + // wait even longer, since the system needs time to + // release resources. + cooldown += nni_clock(); + while (!ep->ep_close) { + nni_cond_waituntil(&ep->ep_cv, cooldown); + } + } +} + + +int +nni_socket_dial(nni_socket *sock, nni_endpt *ep, int sync) +{ + int rv = 0; + nni_thread *reap = NULL; + + nni_mutex_enter(&sock->s_mx); + nni_mutex_enter(&ep->ep_mx); + if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { + rv = NNG_EBUSY; + goto out; + } + if (ep->ep_sock != sock) { // Should never happen + rv = NNG_EINVAL; + goto out; + } + if (sock->s_closing || ep->ep_close) { + rv = NNG_ECLOSED; + goto out; + } + + ep->ep_stop = 0; + ep->ep_start = sync ? 0 : 1; + if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) { + rv = NNG_ENOMEM; + goto out; + } + if ((rv == 0) && (sync)) { + nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(&sock->s_mx); + rv = nni_socket_dial_one(ep); + nni_mutex_enter(&sock->s_mx); + nni_mutex_enter(&ep->ep_mx); + if (rv == 0) { + ep->ep_start = 1; + } else { + // This will cause the thread to exit instead of + // starting. + ep->ep_stop = 1; + reap = ep->ep_dialer; + ep->ep_dialer = NULL; + } + nni_cond_signal(&ep->ep_cv); + } +out: + nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(&sock->s_mx); + + if (reap != NULL) { + nni_thread_reap(reap); + } + + return (rv); +} + + +static void +nni_socket_accepter(void *arg) +{ + nni_endpt *ep = arg; + nni_socket *sock = ep->ep_sock; + nni_pipe *pipe; + int rv; + + for (;;) { + nni_mutex_enter(&ep->ep_mx); if (ep->ep_close) { nni_mutex_exit(&ep->ep_mx); break; @@ -340,24 +480,31 @@ nni_socket_dialer(void *arg) pipe = NULL; - if (((rv = nni_endpt_dial(ep, &pipe)) != 0) || - ((rv = nni_socket_add_pipe(sock, pipe)) != 0)) { + if (((rv = nni_endpt_accept(ep, &pipe)) != 0) || + ((rv = nni_socket_add_pipe(sock, pipe, 0)) != 0)) { if (rv == NNG_ECLOSED) { break; } if (pipe != NULL) { nni_pipe_destroy(pipe); } - // XXX: Publish connection error event... - // XXX: Inject a wait for reconnect... - continue; + // XXX: Publish accept error event... + + // If we can't allocate memory, don't spin, so that + // things get a chance to release memory later. + // Other errors, like ECONNRESET, should not recur. + // (If we find otherwise we can inject a short sleep + // here of about 1 ms without too much penalty.) + if (rv == NNG_ENOMEM) { + nni_usleep(100000); + } } } } int -nni_socket_dial(nni_socket *sock, nni_endpt *ep) +nni_socket_accept(nni_socket *sock, nni_endpt *ep) { int rv = 0; @@ -367,7 +514,7 @@ nni_socket_dial(nni_socket *sock, nni_endpt *ep) rv = NNG_EBUSY; goto out; } - if (ep->ep_sock != sock) { // Should never happen + if (ep->ep_sock != sock) { // Should never happen rv = NNG_EINVAL; goto out; } |
