From 2cf6c5b96de05ca3870495f615b23e1fcdd3c4ca Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 25 Dec 2016 11:03:06 -0800 Subject: New dial/listen API. Dialing might work now. In order to give control over synchronous vs. async dialing, we provide a flag to indicate synchronous dialing is desired. (Hmm. Should we reverse the default sense?) We extend listen to have the same flag. Logic is moved to endpt.c since dialing is really and endpoint specific operation. There are other minor related bug fixes here too. --- src/core/endpt.c | 147 +++++++++++++++++++++++++++++++++++++++++++++++------- src/core/endpt.h | 2 +- src/core/socket.c | 142 ++++------------------------------------------------ src/core/socket.h | 2 + 4 files changed, 142 insertions(+), 151 deletions(-) (limited to 'src/core') diff --git a/src/core/endpt.c b/src/core/endpt.c index ba557e3a..c5e479a2 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -14,20 +14,6 @@ #include // Functionality realited to end points. -#if 0 -struct nng_endpt { - struct nni_endpt_ops ep_ops; - void * ep_data; - nni_list_node_t ep_sock_node; - nni_socket * ep_sock; - char ep_addr[NNG_MAXADDRLEN]; - nni_thread * ep_dialer; - nni_thread * ep_listener; - int ep_close; - nni_mutex ep_mx; - nni_cond ep_cv; -}; -#endif int nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) @@ -37,7 +23,7 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) int rv; if ((tran = nni_transport_find(addr)) == NULL) { - return (NNG_EINVAL); + return (NNG_ENOTSUP); } if (strlen(addr) >= NNG_MAXADDRLEN) { return (NNG_EINVAL); @@ -129,12 +115,17 @@ nni_endpt_listen(nni_endpt *ep) } -int -nni_endpt_dial(nni_endpt *ep, nni_pipe **pp) +// nni_dial_once just does a single dial call, so it can be used +// for synchronous dialing. +static int +nni_dial_once(nni_endpt *ep) { + nni_socket *sock = ep->ep_sock; nni_pipe *pipe; int rv; + pipe = NULL; + if (ep->ep_close) { return (NNG_ECLOSED); } @@ -145,12 +136,130 @@ nni_endpt_dial(nni_endpt *ep, nni_pipe **pp) nni_pipe_destroy(pipe); return (rv); } - pipe->p_ep = ep; - *pp = pipe; + + if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) { + nni_pipe_destroy(pipe); + return (rv); + } + + nni_mutex_enter(&ep->ep_mx); + if (!ep->ep_close) { + // Set up the linkage so that when the pipe closes + // we can notify the dialer to redial. + pipe->p_ep = ep; + ep->ep_pipe = pipe; + } + nni_mutex_exit(&ep->ep_mx); + return (0); } +// nni_socket_dialer is the thread worker that dials in the background. +static void +nni_dialer(void *arg) +{ + nni_endpt *ep = arg; + nni_socket *sock = ep->ep_sock; + nni_pipe *pipe; + int rv; + nni_time cooldown; + + nni_mutex_enter(&ep->ep_mx); + while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { + nni_cond_wait(&ep->ep_cv); + } + if (ep->ep_stop || ep->ep_close) { + nni_mutex_exit(&ep->ep_mx); + return; + } + nni_mutex_exit(&ep->ep_mx); + + for (;;) { + nni_mutex_enter(&ep->ep_mx); + while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { + nni_cond_wait(&ep->ep_cv); + } + nni_mutex_exit(&ep->ep_mx); + + rv = nni_dial_once(ep); + switch (rv) { + case 0: + // good connection + continue; + case NNG_ENOMEM: + cooldown = 1000000; + break; + default: + // XXX: THIS NEEDS TO BE A PROPER BACKOFF. + cooldown = 100000; + break; + } + // we inject a delay so we don't just spin hard on + // errors like connection refused. For NNG_ENOMEM, we + // wait even longer, since the system needs time to + // release resources. + cooldown += nni_clock(); + while (!ep->ep_close) { + nni_cond_waituntil(&ep->ep_cv, cooldown); + } + } +} + + +int +nni_endpt_dial(nni_endpt *ep, int flags) +{ + int rv = 0; + nni_thread *reap = NULL; + nni_socket *sock = ep->ep_sock; + + nni_mutex_enter(&sock->s_mx); + nni_mutex_enter(&ep->ep_mx); + if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { + rv = NNG_EBUSY; + goto out; + } + if (sock->s_closing || ep->ep_close) { + rv = NNG_ECLOSED; + goto out; + } + + ep->ep_stop = 0; + ep->ep_start = (flags & NNG_FLAG_SYNCH) ? 0 : 1; + if (nni_thread_create(&ep->ep_dialer, nni_dialer, ep) != 0) { + rv = NNG_ENOMEM; + goto out; + } + if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) { + nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(&sock->s_mx); + rv = nni_dial_once(ep); + nni_mutex_enter(&sock->s_mx); + nni_mutex_enter(&ep->ep_mx); + if (rv == 0) { + ep->ep_start = 1; + } else { + // This will cause the thread to exit instead of + // starting. + ep->ep_stop = 1; + reap = ep->ep_dialer; + ep->ep_dialer = NULL; + } + nni_cond_signal(&ep->ep_cv); + } +out: + nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(&sock->s_mx); + + if (reap != NULL) { + nni_thread_reap(reap); + } + + return (rv); +} + + int nni_endpt_accept(nni_endpt *ep, nni_pipe **pp) { diff --git a/src/core/endpt.h b/src/core/endpt.h index 64935c4c..71d3ff59 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -33,9 +33,9 @@ struct nng_endpt { extern int nni_endpt_create(nni_endpt **, nni_socket *, const char *); extern void nni_endpt_destroy(nni_endpt *); -extern int nni_endpt_dial(nni_endpt *, nni_pipe **); extern int nni_endpt_listen(nni_endpt *); extern int nni_endpt_accept(nni_endpt *, nni_pipe **); extern void nni_endpt_close(nni_endpt *); +extern int nni_endpt_dial(nni_endpt *, int); #endif // CORE_ENDPT_H diff --git a/src/core/socket.c b/src/core/socket.c index 9db61838..9263b142 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -328,142 +328,22 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer) } -// nni_socket_dial_one just does a single dial call, so it can be used -// for synchronous dialing. -static int -nni_socket_dial_one(nni_endpt *ep) +int +nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags) { - nni_socket *sock = ep->ep_sock; - nni_pipe *pipe; + nni_endpt *ep; int rv; - pipe = NULL; - - if ((rv = nni_endpt_dial(ep, &pipe)) != 0) { - return (rv); - } - if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) { - nni_pipe_destroy(pipe); + if ((rv = nni_endpt_create(&ep, sock, addr)) != 0) { return (rv); } - - nni_mutex_enter(&ep->ep_mx); - if (!ep->ep_close) { - // Set up the linkage so that when the pipe closes - // we can notify the dialer to redial. - pipe->p_ep = ep; - ep->ep_pipe = pipe; - } - nni_mutex_exit(&ep->ep_mx); - - return (0); -} - - -// nni_socket_dialer is the thread worker that dials in the background. -static void -nni_socket_dialer(void *arg) -{ - nni_endpt *ep = arg; - nni_socket *sock = ep->ep_sock; - nni_pipe *pipe; - int rv; - nni_time cooldown; - - nni_mutex_enter(&ep->ep_mx); - while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { - nni_cond_wait(&ep->ep_cv); - } - if (ep->ep_stop || ep->ep_close) { - nni_mutex_exit(&ep->ep_mx); - return; - } - nni_mutex_exit(&ep->ep_mx); - - for (;;) { - nni_mutex_enter(&ep->ep_mx); - while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { - nni_cond_wait(&ep->ep_cv); - } - nni_mutex_exit(&ep->ep_mx); - - rv = nni_socket_dial_one(ep); - switch (rv) { - case 0: - // good connection - continue; - case NNG_ENOMEM: - cooldown = 1000000; - break; - default: - // XXX: THIS NEEDS TO BE A PROPER BACKOFF. - cooldown = 100000; - break; - } - // we inject a delay so we don't just spin hard on - // errors like connection refused. For NNG_ENOMEM, we - // wait even longer, since the system needs time to - // release resources. - cooldown += nni_clock(); - while (!ep->ep_close) { - nni_cond_waituntil(&ep->ep_cv, cooldown); - } - } -} - - -int -nni_socket_dial(nni_socket *sock, nni_endpt *ep, int sync) -{ - int rv = 0; - nni_thread *reap = NULL; - - nni_mutex_enter(&sock->s_mx); - nni_mutex_enter(&ep->ep_mx); - if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { - rv = NNG_EBUSY; - goto out; - } - if (ep->ep_sock != sock) { // Should never happen - rv = NNG_EINVAL; - goto out; - } - if (sock->s_closing || ep->ep_close) { - rv = NNG_ECLOSED; - goto out; - } - - ep->ep_stop = 0; - ep->ep_start = sync ? 0 : 1; - if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) { - rv = NNG_ENOMEM; - goto out; - } - if ((rv == 0) && (sync)) { - nni_mutex_exit(&ep->ep_mx); - nni_mutex_exit(&sock->s_mx); - rv = nni_socket_dial_one(ep); - nni_mutex_enter(&sock->s_mx); - nni_mutex_enter(&ep->ep_mx); - if (rv == 0) { - ep->ep_start = 1; - } else { - // This will cause the thread to exit instead of - // starting. - ep->ep_stop = 1; - reap = ep->ep_dialer; - ep->ep_dialer = NULL; - } - nni_cond_signal(&ep->ep_cv); - } -out: - nni_mutex_exit(&ep->ep_mx); - nni_mutex_exit(&sock->s_mx); - - if (reap != NULL) { - nni_thread_reap(reap); + rv = nni_endpt_dial(ep, flags); + if (rv != 0) { + nni_endpt_close(ep); + nni_endpt_destroy(ep); + } else if (epp != NULL) { + *epp = ep; } - return (rv); } @@ -528,7 +408,7 @@ nni_socket_accept(nni_socket *sock, nni_endpt *ep) rv = NNG_ECLOSED; goto out; } - if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) { + if (nni_thread_create(&ep->ep_dialer, nni_socket_accepter, ep) != 0) { rv = NNG_ENOMEM; goto out; } diff --git a/src/core/socket.h b/src/core/socket.h index a31e09e5..4e1ea166 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -51,5 +51,7 @@ extern int nni_socket_setopt(nni_socket *, int, const void *, size_t); extern int nni_socket_getopt(nni_socket *, int, void *, size_t *); extern int nni_socket_recvmsg(nni_socket *, nni_msg **, nni_time); extern int nni_socket_sendmsg(nni_socket *, nni_msg *, nni_time); +extern int nni_socket_dial(nni_socket *, const char *, nni_endpt **, int); +extern int nni_socket_listen(nni_socket *, const char *, nni_endpt **, int); #endif // CORE_SOCKET_H -- cgit v1.2.3-70-g09d2