diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-22 23:17:12 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2016-12-22 23:17:12 -0800 |
| commit | bca0a27e2f4978a5a74748b07613c0e30014880c (patch) | |
| tree | a6beea2e7e63e02be070e4b124dd40c92917dbd6 | |
| parent | 29628309ae018c3f317eef9b03625d4ce3807a92 (diff) | |
| download | nng-bca0a27e2f4978a5a74748b07613c0e30014880c.tar.gz nng-bca0a27e2f4978a5a74748b07613c0e30014880c.tar.bz2 nng-bca0a27e2f4978a5a74748b07613c0e30014880c.zip | |
Implemened synchronous & asynchronuos dialer, accepter. Getting close...
| -rw-r--r-- | src/core/endpt.c | 40 | ||||
| -rw-r--r-- | src/core/endpt.h | 24 | ||||
| -rw-r--r-- | src/core/pipe.h | 1 | ||||
| -rw-r--r-- | src/core/socket.c | 193 | ||||
| -rw-r--r-- | src/core/socket.h | 2 |
5 files changed, 213 insertions, 47 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index fd328b0f..ad1a0b9a 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -49,6 +49,8 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) ep->ep_dialer = NULL; ep->ep_listener = NULL; ep->ep_close = 0; + ep->ep_start = 0; + ep->ep_pipe = NULL; if ((rv = nni_mutex_init(&ep->ep_mx)) != 0) { nni_free(ep, sizeof (*ep)); return (NNG_ENOMEM); @@ -71,7 +73,6 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) nni_free(ep, sizeof (*ep)); return (rv); } - NNI_LIST_INIT(&ep->ep_pipes, nni_pipe, p_ep_node); *epp = ep; return (0); } @@ -81,17 +82,12 @@ nni_endpt_destroy(nni_endpt *ep) { // We should already have been closed at this point, so this // should proceed very quickly. - if (ep->ep_dialer) { + if (ep->ep_dialer != NULL) { nni_thread_reap(ep->ep_dialer); } - if (ep->ep_listener) { + if (ep->ep_listener != NULL) { nni_thread_reap(ep->ep_listener); } - nni_mutex_enter(&ep->ep_mx); - while (nni_list_first(&ep->ep_pipes) != NULL) { - nni_cond_wait(&ep->ep_cv); - } - nni_mutex_exit(&ep->ep_mx); ep->ep_ops.ep_destroy(ep->ep_data); @@ -103,6 +99,7 @@ nni_endpt_destroy(nni_endpt *ep) void nni_endpt_close(nni_endpt *ep) { + nni_pipe *pipe; nni_mutex_enter(&ep->ep_mx); if (ep->ep_close) { nni_mutex_exit(&ep->ep_mx); @@ -110,6 +107,10 @@ nni_endpt_close(nni_endpt *ep) } ep->ep_close = 1; ep->ep_ops.ep_close(ep->ep_data); + if ((pipe = ep->ep_pipe) != NULL) { + pipe->p_ep = NULL; + ep->ep_pipe = NULL; + } nni_cond_broadcast(&ep->ep_cv); nni_mutex_exit(&ep->ep_mx); } @@ -144,6 +145,23 @@ nni_endpt_dial(nni_endpt *ep, nni_pipe **pp) return (0); } -#if 0 -int nni_endpt_accept(nni_endpt *, nni_pipe **); -#endif +int +nni_endpt_accept(nni_endpt *ep, nni_pipe **pp) +{ + nni_pipe *pipe; + int rv; + + if (ep->ep_close) { + return (NNG_ECLOSED); + } + if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) { + return (rv); + } + if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_data)) != 0) { + nni_pipe_destroy(pipe); + return (rv); + } + pipe->p_ep = ep; + *pp = pipe; + return (0); +} diff --git a/src/core/endpt.h b/src/core/endpt.h index 7512daa8..64935c4c 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -16,17 +16,19 @@ // OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS // OR TRANSPORTS. struct nng_endpt { - nni_endpt_ops ep_ops; - void * ep_data; // Transport private - nni_list_node ep_sock_node; // Per socket list - nni_socket * ep_sock; - char ep_addr[NNG_MAXADDRLEN]; - nni_thread * ep_dialer; - nni_thread * ep_listener; - int ep_close; - nni_mutex ep_mx; - nni_cond ep_cv; - nni_list ep_pipes; // Active list of pipes + nni_endpt_ops ep_ops; + void * ep_data; // Transport private + nni_list_node ep_sock_node; // Per socket list + nni_socket * ep_sock; + char ep_addr[NNG_MAXADDRLEN]; + nni_thread * ep_dialer; + nni_thread * ep_listener; + int ep_stop; // thread exits before start + int ep_start; // start thread running + int ep_close; // full shutdown + nni_mutex ep_mx; + nni_cond ep_cv; + nni_pipe * ep_pipe; // Connected pipe (dialers only) }; extern int nni_endpt_create(nni_endpt **, nni_socket *, const char *); diff --git a/src/core/pipe.h b/src/core/pipe.h index 2ce4f1ec..3d0e07e7 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -24,7 +24,6 @@ struct nng_pipe { void * p_data; nni_list_node p_sock_node; nni_socket * p_sock; - nni_list_node p_ep_node; nni_endpt * p_ep; }; diff --git a/src/core/socket.c b/src/core/socket.c index bbb9e48a..1b3bcf56 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -26,6 +26,7 @@ nni_socket_recvq(nni_socket *s) return (s->s_urq); } + // nn_socket_create creates the underlying socket. int nni_socket_create(nni_socket **sockp, uint16_t proto) @@ -41,7 +42,7 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) return (NNG_ENOMEM); } sock->s_ops = *ops; - sock->s_linger = NNG_DEFAULT_LINGER; + sock->s_linger = NNG_LINGER_DEFAULT; if ((rv = nni_mutex_init(&sock->s_mx)) != 0) { nni_free(sock, sizeof (*sock)); @@ -133,10 +134,10 @@ nni_socket_close(nni_socket *sock) // need to reap them now. while ((ep = nni_list_first(&sock->s_eps)) != NULL) { nni_list_remove(&sock->s_eps, ep); - nni_mutex_exit(&sock->s_eps); + nni_mutex_exit(&sock->s_mx); nni_endpt_destroy(ep); - nni_mutex_enter(&sock->s_eps); + nni_mutex_enter(&sock->s_mx); } nni_mutex_exit(&sock->s_mx); @@ -262,6 +263,8 @@ nni_socket_proto(nni_socket *sock) void nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe) { + nni_endpt *ep; + nni_mutex_enter(&sock->s_mx); if (pipe->p_sock != sock) { nni_mutex_exit(&sock->s_mx); @@ -274,12 +277,18 @@ nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe) // Now remove it from our own list. nni_list_remove(&sock->s_pipes, pipe); pipe->p_sock = NULL; - // XXX: TODO: Redial - // XXX: also publish event... - // if (pipe->p_ep != NULL) { - // nn_endpt_rem_pipe(pipe->p_ep, pipe) - // } + // If we were a connected (dialer) pipe, then let the endpoint + // know so it can try to reestablish the connection. + if ((ep = pipe->p_ep) != NULL) { + ep->ep_pipe = NULL; + pipe->p_ep = NULL; + nni_mutex_enter(&ep->ep_mx); + nni_cond_signal(&ep->ep_cv); + nni_mutex_exit(&ep->ep_mx); + } + + // XXX: also publish event... nni_pipe_destroy(pipe); // If we're closing, wake the socket if we finished draining. @@ -291,7 +300,7 @@ nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe) int -nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe) +nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer) { int rv; @@ -306,11 +315,6 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe) } nni_list_append(&sock->s_pipes, pipe); - // Add the pipe to its endpoint list. - nni_mutex_enter(&pipe->p_ep->ep_mx); - nni_list_append(&pipe->p_ep->ep_pipes, pipe); - nni_mutex_exit(&pipe->p_ep->ep_mx); - pipe->p_sock = sock; // XXX: Publish event nni_mutex_exit(&sock->s_mx); @@ -318,6 +322,39 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe) } +// nni_socket_dial_one just does a single dial call, so it can be used +// for synchronous dialing. +static int +nni_socket_dial_one(nni_endpt *ep) +{ + nni_socket *sock = ep->ep_sock; + nni_pipe *pipe; + int rv; + + pipe = NULL; + + if ((rv = nni_endpt_dial(ep, &pipe)) != 0) { + return (rv); + } + if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) { + nni_pipe_destroy(pipe); + return (rv); + } + + nni_mutex_enter(&ep->ep_mx); + if (!ep->ep_close) { + // Set up the linkage so that when the pipe closes + // we can notify the dialer to redial. + pipe->p_ep = ep; + ep->ep_pipe = pipe; + } + nni_mutex_exit(&ep->ep_mx); + + return (0); +} + + +// nni_socket_dialer is the thread worker that dials in the background. static void nni_socket_dialer(void *arg) { @@ -325,13 +362,116 @@ nni_socket_dialer(void *arg) nni_socket *sock = ep->ep_sock; nni_pipe *pipe; int rv; + nni_time cooldown; + + nni_mutex_enter(&ep->ep_mx); + while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { + nni_cond_wait(&ep->ep_cv); + } + if (ep->ep_stop || ep->ep_close) { + nni_mutex_exit(&ep->ep_mx); + return; + } + nni_mutex_exit(&ep->ep_mx); for (;;) { nni_mutex_enter(&ep->ep_mx); - while ((!ep->ep_close) && - (nni_list_first(&ep->ep_pipes) != NULL)) { + while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { nni_cond_wait(&ep->ep_cv); } + nni_mutex_exit(&ep->ep_mx); + + rv = nni_socket_dial_one(ep); + switch (rv) { + case 0: + // good connection + continue; + case NNG_ENOMEM: + cooldown = 1000000; + break; + default: + // XXX: THIS NEEDS TO BE A PROPER BACKOFF. + cooldown = 100000; + break; + } + // we inject a delay so we don't just spin hard on + // errors like connection refused. For NNG_ENOMEM, we + // wait even longer, since the system needs time to + // release resources. + cooldown += nni_clock(); + while (!ep->ep_close) { + nni_cond_waituntil(&ep->ep_cv, cooldown); + } + } +} + + +int +nni_socket_dial(nni_socket *sock, nni_endpt *ep, int sync) +{ + int rv = 0; + nni_thread *reap = NULL; + + nni_mutex_enter(&sock->s_mx); + nni_mutex_enter(&ep->ep_mx); + if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { + rv = NNG_EBUSY; + goto out; + } + if (ep->ep_sock != sock) { // Should never happen + rv = NNG_EINVAL; + goto out; + } + if (sock->s_closing || ep->ep_close) { + rv = NNG_ECLOSED; + goto out; + } + + ep->ep_stop = 0; + ep->ep_start = sync ? 0 : 1; + if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) { + rv = NNG_ENOMEM; + goto out; + } + if ((rv == 0) && (sync)) { + nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(&sock->s_mx); + rv = nni_socket_dial_one(ep); + nni_mutex_enter(&sock->s_mx); + nni_mutex_enter(&ep->ep_mx); + if (rv == 0) { + ep->ep_start = 1; + } else { + // This will cause the thread to exit instead of + // starting. + ep->ep_stop = 1; + reap = ep->ep_dialer; + ep->ep_dialer = NULL; + } + nni_cond_signal(&ep->ep_cv); + } +out: + nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(&sock->s_mx); + + if (reap != NULL) { + nni_thread_reap(reap); + } + + return (rv); +} + + +static void +nni_socket_accepter(void *arg) +{ + nni_endpt *ep = arg; + nni_socket *sock = ep->ep_sock; + nni_pipe *pipe; + int rv; + + for (;;) { + nni_mutex_enter(&ep->ep_mx); if (ep->ep_close) { nni_mutex_exit(&ep->ep_mx); break; @@ -340,24 +480,31 @@ nni_socket_dialer(void *arg) pipe = NULL; - if (((rv = nni_endpt_dial(ep, &pipe)) != 0) || - ((rv = nni_socket_add_pipe(sock, pipe)) != 0)) { + if (((rv = nni_endpt_accept(ep, &pipe)) != 0) || + ((rv = nni_socket_add_pipe(sock, pipe, 0)) != 0)) { if (rv == NNG_ECLOSED) { break; } if (pipe != NULL) { nni_pipe_destroy(pipe); } - // XXX: Publish connection error event... - // XXX: Inject a wait for reconnect... - continue; + // XXX: Publish accept error event... + + // If we can't allocate memory, don't spin, so that + // things get a chance to release memory later. + // Other errors, like ECONNRESET, should not recur. + // (If we find otherwise we can inject a short sleep + // here of about 1 ms without too much penalty.) + if (rv == NNG_ENOMEM) { + nni_usleep(100000); + } } } } int -nni_socket_dial(nni_socket *sock, nni_endpt *ep) +nni_socket_accept(nni_socket *sock, nni_endpt *ep) { int rv = 0; @@ -367,7 +514,7 @@ nni_socket_dial(nni_socket *sock, nni_endpt *ep) rv = NNG_EBUSY; goto out; } - if (ep->ep_sock != sock) { // Should never happen + if (ep->ep_sock != sock) { // Should never happen rv = NNG_EINVAL; goto out; } diff --git a/src/core/socket.h b/src/core/socket.h index ff6a63d5..05113489 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -40,7 +40,7 @@ struct nng_socket { extern int nni_socket_create(nni_socket **, uint16_t); extern int nni_socket_close(nni_socket *); -extern int nni_socket_add_pipe(nni_socket *, nni_pipe *); +extern int nni_socket_add_pipe(nni_socket *, nni_pipe *, int); extern void nni_socket_rem_pipe(nni_socket *, nni_pipe *); extern uint16_t nni_socket_proto(nni_socket *); extern int nni_socket_setopt(nni_socket *, int, const void *, size_t); |
