diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/endpt.c | 147 | ||||
| -rw-r--r-- | src/core/endpt.h | 2 | ||||
| -rw-r--r-- | src/core/socket.c | 142 | ||||
| -rw-r--r-- | src/core/socket.h | 2 |
4 files changed, 142 insertions, 151 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index ba557e3a..c5e479a2 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -14,20 +14,6 @@ #include <stdio.h> // Functionality realited to end points. -#if 0 -struct nng_endpt { - struct nni_endpt_ops ep_ops; - void * ep_data; - nni_list_node_t ep_sock_node; - nni_socket * ep_sock; - char ep_addr[NNG_MAXADDRLEN]; - nni_thread * ep_dialer; - nni_thread * ep_listener; - int ep_close; - nni_mutex ep_mx; - nni_cond ep_cv; -}; -#endif int nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) @@ -37,7 +23,7 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) int rv; if ((tran = nni_transport_find(addr)) == NULL) { - return (NNG_EINVAL); + return (NNG_ENOTSUP); } if (strlen(addr) >= NNG_MAXADDRLEN) { return (NNG_EINVAL); @@ -129,12 +115,17 @@ nni_endpt_listen(nni_endpt *ep) } -int -nni_endpt_dial(nni_endpt *ep, nni_pipe **pp) +// nni_dial_once just does a single dial call, so it can be used +// for synchronous dialing. +static int +nni_dial_once(nni_endpt *ep) { + nni_socket *sock = ep->ep_sock; nni_pipe *pipe; int rv; + pipe = NULL; + if (ep->ep_close) { return (NNG_ECLOSED); } @@ -145,12 +136,130 @@ nni_endpt_dial(nni_endpt *ep, nni_pipe **pp) nni_pipe_destroy(pipe); return (rv); } - pipe->p_ep = ep; - *pp = pipe; + + if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) { + nni_pipe_destroy(pipe); + return (rv); + } + + nni_mutex_enter(&ep->ep_mx); + if (!ep->ep_close) { + // Set up the linkage so that when the pipe closes + // we can notify the dialer to redial. + pipe->p_ep = ep; + ep->ep_pipe = pipe; + } + nni_mutex_exit(&ep->ep_mx); + return (0); } +// nni_socket_dialer is the thread worker that dials in the background. +static void +nni_dialer(void *arg) +{ + nni_endpt *ep = arg; + nni_socket *sock = ep->ep_sock; + nni_pipe *pipe; + int rv; + nni_time cooldown; + + nni_mutex_enter(&ep->ep_mx); + while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { + nni_cond_wait(&ep->ep_cv); + } + if (ep->ep_stop || ep->ep_close) { + nni_mutex_exit(&ep->ep_mx); + return; + } + nni_mutex_exit(&ep->ep_mx); + + for (;;) { + nni_mutex_enter(&ep->ep_mx); + while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { + nni_cond_wait(&ep->ep_cv); + } + nni_mutex_exit(&ep->ep_mx); + + rv = nni_dial_once(ep); + switch (rv) { + case 0: + // good connection + continue; + case NNG_ENOMEM: + cooldown = 1000000; + break; + default: + // XXX: THIS NEEDS TO BE A PROPER BACKOFF. + cooldown = 100000; + break; + } + // we inject a delay so we don't just spin hard on + // errors like connection refused. For NNG_ENOMEM, we + // wait even longer, since the system needs time to + // release resources. + cooldown += nni_clock(); + while (!ep->ep_close) { + nni_cond_waituntil(&ep->ep_cv, cooldown); + } + } +} + + +int +nni_endpt_dial(nni_endpt *ep, int flags) +{ + int rv = 0; + nni_thread *reap = NULL; + nni_socket *sock = ep->ep_sock; + + nni_mutex_enter(&sock->s_mx); + nni_mutex_enter(&ep->ep_mx); + if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { + rv = NNG_EBUSY; + goto out; + } + if (sock->s_closing || ep->ep_close) { + rv = NNG_ECLOSED; + goto out; + } + + ep->ep_stop = 0; + ep->ep_start = (flags & NNG_FLAG_SYNCH) ? 0 : 1; + if (nni_thread_create(&ep->ep_dialer, nni_dialer, ep) != 0) { + rv = NNG_ENOMEM; + goto out; + } + if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) { + nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(&sock->s_mx); + rv = nni_dial_once(ep); + nni_mutex_enter(&sock->s_mx); + nni_mutex_enter(&ep->ep_mx); + if (rv == 0) { + ep->ep_start = 1; + } else { + // This will cause the thread to exit instead of + // starting. + ep->ep_stop = 1; + reap = ep->ep_dialer; + ep->ep_dialer = NULL; + } + nni_cond_signal(&ep->ep_cv); + } +out: + nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(&sock->s_mx); + + if (reap != NULL) { + nni_thread_reap(reap); + } + + return (rv); +} + + int nni_endpt_accept(nni_endpt *ep, nni_pipe **pp) { diff --git a/src/core/endpt.h b/src/core/endpt.h index 64935c4c..71d3ff59 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -33,9 +33,9 @@ struct nng_endpt { extern int nni_endpt_create(nni_endpt **, nni_socket *, const char *); extern void nni_endpt_destroy(nni_endpt *); -extern int nni_endpt_dial(nni_endpt *, nni_pipe **); extern int nni_endpt_listen(nni_endpt *); extern int nni_endpt_accept(nni_endpt *, nni_pipe **); extern void nni_endpt_close(nni_endpt *); +extern int nni_endpt_dial(nni_endpt *, int); #endif // CORE_ENDPT_H diff --git a/src/core/socket.c b/src/core/socket.c index 9db61838..9263b142 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -328,142 +328,22 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer) } -// nni_socket_dial_one just does a single dial call, so it can be used -// for synchronous dialing. -static int -nni_socket_dial_one(nni_endpt *ep) +int +nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags) { - nni_socket *sock = ep->ep_sock; - nni_pipe *pipe; + nni_endpt *ep; int rv; - pipe = NULL; - - if ((rv = nni_endpt_dial(ep, &pipe)) != 0) { - return (rv); - } - if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) { - nni_pipe_destroy(pipe); + if ((rv = nni_endpt_create(&ep, sock, addr)) != 0) { return (rv); } - - nni_mutex_enter(&ep->ep_mx); - if (!ep->ep_close) { - // Set up the linkage so that when the pipe closes - // we can notify the dialer to redial. - pipe->p_ep = ep; - ep->ep_pipe = pipe; - } - nni_mutex_exit(&ep->ep_mx); - - return (0); -} - - -// nni_socket_dialer is the thread worker that dials in the background. -static void -nni_socket_dialer(void *arg) -{ - nni_endpt *ep = arg; - nni_socket *sock = ep->ep_sock; - nni_pipe *pipe; - int rv; - nni_time cooldown; - - nni_mutex_enter(&ep->ep_mx); - while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { - nni_cond_wait(&ep->ep_cv); - } - if (ep->ep_stop || ep->ep_close) { - nni_mutex_exit(&ep->ep_mx); - return; - } - nni_mutex_exit(&ep->ep_mx); - - for (;;) { - nni_mutex_enter(&ep->ep_mx); - while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { - nni_cond_wait(&ep->ep_cv); - } - nni_mutex_exit(&ep->ep_mx); - - rv = nni_socket_dial_one(ep); - switch (rv) { - case 0: - // good connection - continue; - case NNG_ENOMEM: - cooldown = 1000000; - break; - default: - // XXX: THIS NEEDS TO BE A PROPER BACKOFF. - cooldown = 100000; - break; - } - // we inject a delay so we don't just spin hard on - // errors like connection refused. For NNG_ENOMEM, we - // wait even longer, since the system needs time to - // release resources. - cooldown += nni_clock(); - while (!ep->ep_close) { - nni_cond_waituntil(&ep->ep_cv, cooldown); - } - } -} - - -int -nni_socket_dial(nni_socket *sock, nni_endpt *ep, int sync) -{ - int rv = 0; - nni_thread *reap = NULL; - - nni_mutex_enter(&sock->s_mx); - nni_mutex_enter(&ep->ep_mx); - if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { - rv = NNG_EBUSY; - goto out; - } - if (ep->ep_sock != sock) { // Should never happen - rv = NNG_EINVAL; - goto out; - } - if (sock->s_closing || ep->ep_close) { - rv = NNG_ECLOSED; - goto out; - } - - ep->ep_stop = 0; - ep->ep_start = sync ? 0 : 1; - if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) { - rv = NNG_ENOMEM; - goto out; - } - if ((rv == 0) && (sync)) { - nni_mutex_exit(&ep->ep_mx); - nni_mutex_exit(&sock->s_mx); - rv = nni_socket_dial_one(ep); - nni_mutex_enter(&sock->s_mx); - nni_mutex_enter(&ep->ep_mx); - if (rv == 0) { - ep->ep_start = 1; - } else { - // This will cause the thread to exit instead of - // starting. - ep->ep_stop = 1; - reap = ep->ep_dialer; - ep->ep_dialer = NULL; - } - nni_cond_signal(&ep->ep_cv); - } -out: - nni_mutex_exit(&ep->ep_mx); - nni_mutex_exit(&sock->s_mx); - - if (reap != NULL) { - nni_thread_reap(reap); + rv = nni_endpt_dial(ep, flags); + if (rv != 0) { + nni_endpt_close(ep); + nni_endpt_destroy(ep); + } else if (epp != NULL) { + *epp = ep; } - return (rv); } @@ -528,7 +408,7 @@ nni_socket_accept(nni_socket *sock, nni_endpt *ep) rv = NNG_ECLOSED; goto out; } - if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) { + if (nni_thread_create(&ep->ep_dialer, nni_socket_accepter, ep) != 0) { rv = NNG_ENOMEM; goto out; } diff --git a/src/core/socket.h b/src/core/socket.h index a31e09e5..4e1ea166 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -51,5 +51,7 @@ extern int nni_socket_setopt(nni_socket *, int, const void *, size_t); extern int nni_socket_getopt(nni_socket *, int, void *, size_t *); extern int nni_socket_recvmsg(nni_socket *, nni_msg **, nni_time); extern int nni_socket_sendmsg(nni_socket *, nni_msg *, nni_time); +extern int nni_socket_dial(nni_socket *, const char *, nni_endpt **, int); +extern int nni_socket_listen(nni_socket *, const char *, nni_endpt **, int); #endif // CORE_SOCKET_H |
