diff options
| -rw-r--r-- | src/core/endpt.c | 147 | ||||
| -rw-r--r-- | src/core/endpt.h | 2 | ||||
| -rw-r--r-- | src/core/socket.c | 142 | ||||
| -rw-r--r-- | src/core/socket.h | 2 | ||||
| -rw-r--r-- | src/nng.c | 7 | ||||
| -rw-r--r-- | src/nng.h | 20 | ||||
| -rw-r--r-- | tests/sock.c | 10 |
7 files changed, 173 insertions, 157 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index ba557e3a..c5e479a2 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -14,20 +14,6 @@ #include <stdio.h> // Functionality realited to end points. -#if 0 -struct nng_endpt { - struct nni_endpt_ops ep_ops; - void * ep_data; - nni_list_node_t ep_sock_node; - nni_socket * ep_sock; - char ep_addr[NNG_MAXADDRLEN]; - nni_thread * ep_dialer; - nni_thread * ep_listener; - int ep_close; - nni_mutex ep_mx; - nni_cond ep_cv; -}; -#endif int nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) @@ -37,7 +23,7 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) int rv; if ((tran = nni_transport_find(addr)) == NULL) { - return (NNG_EINVAL); + return (NNG_ENOTSUP); } if (strlen(addr) >= NNG_MAXADDRLEN) { return (NNG_EINVAL); @@ -129,12 +115,17 @@ nni_endpt_listen(nni_endpt *ep) } -int -nni_endpt_dial(nni_endpt *ep, nni_pipe **pp) +// nni_dial_once just does a single dial call, so it can be used +// for synchronous dialing. +static int +nni_dial_once(nni_endpt *ep) { + nni_socket *sock = ep->ep_sock; nni_pipe *pipe; int rv; + pipe = NULL; + if (ep->ep_close) { return (NNG_ECLOSED); } @@ -145,12 +136,130 @@ nni_endpt_dial(nni_endpt *ep, nni_pipe **pp) nni_pipe_destroy(pipe); return (rv); } - pipe->p_ep = ep; - *pp = pipe; + + if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) { + nni_pipe_destroy(pipe); + return (rv); + } + + nni_mutex_enter(&ep->ep_mx); + if (!ep->ep_close) { + // Set up the linkage so that when the pipe closes + // we can notify the dialer to redial. + pipe->p_ep = ep; + ep->ep_pipe = pipe; + } + nni_mutex_exit(&ep->ep_mx); + return (0); } +// nni_socket_dialer is the thread worker that dials in the background. +static void +nni_dialer(void *arg) +{ + nni_endpt *ep = arg; + nni_socket *sock = ep->ep_sock; + nni_pipe *pipe; + int rv; + nni_time cooldown; + + nni_mutex_enter(&ep->ep_mx); + while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { + nni_cond_wait(&ep->ep_cv); + } + if (ep->ep_stop || ep->ep_close) { + nni_mutex_exit(&ep->ep_mx); + return; + } + nni_mutex_exit(&ep->ep_mx); + + for (;;) { + nni_mutex_enter(&ep->ep_mx); + while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { + nni_cond_wait(&ep->ep_cv); + } + nni_mutex_exit(&ep->ep_mx); + + rv = nni_dial_once(ep); + switch (rv) { + case 0: + // good connection + continue; + case NNG_ENOMEM: + cooldown = 1000000; + break; + default: + // XXX: THIS NEEDS TO BE A PROPER BACKOFF. + cooldown = 100000; + break; + } + // we inject a delay so we don't just spin hard on + // errors like connection refused. For NNG_ENOMEM, we + // wait even longer, since the system needs time to + // release resources. + cooldown += nni_clock(); + while (!ep->ep_close) { + nni_cond_waituntil(&ep->ep_cv, cooldown); + } + } +} + + +int +nni_endpt_dial(nni_endpt *ep, int flags) +{ + int rv = 0; + nni_thread *reap = NULL; + nni_socket *sock = ep->ep_sock; + + nni_mutex_enter(&sock->s_mx); + nni_mutex_enter(&ep->ep_mx); + if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { + rv = NNG_EBUSY; + goto out; + } + if (sock->s_closing || ep->ep_close) { + rv = NNG_ECLOSED; + goto out; + } + + ep->ep_stop = 0; + ep->ep_start = (flags & NNG_FLAG_SYNCH) ? 0 : 1; + if (nni_thread_create(&ep->ep_dialer, nni_dialer, ep) != 0) { + rv = NNG_ENOMEM; + goto out; + } + if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) { + nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(&sock->s_mx); + rv = nni_dial_once(ep); + nni_mutex_enter(&sock->s_mx); + nni_mutex_enter(&ep->ep_mx); + if (rv == 0) { + ep->ep_start = 1; + } else { + // This will cause the thread to exit instead of + // starting. + ep->ep_stop = 1; + reap = ep->ep_dialer; + ep->ep_dialer = NULL; + } + nni_cond_signal(&ep->ep_cv); + } +out: + nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(&sock->s_mx); + + if (reap != NULL) { + nni_thread_reap(reap); + } + + return (rv); +} + + int nni_endpt_accept(nni_endpt *ep, nni_pipe **pp) { diff --git a/src/core/endpt.h b/src/core/endpt.h index 64935c4c..71d3ff59 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -33,9 +33,9 @@ struct nng_endpt { extern int nni_endpt_create(nni_endpt **, nni_socket *, const char *); extern void nni_endpt_destroy(nni_endpt *); -extern int nni_endpt_dial(nni_endpt *, nni_pipe **); extern int nni_endpt_listen(nni_endpt *); extern int nni_endpt_accept(nni_endpt *, nni_pipe **); extern void nni_endpt_close(nni_endpt *); +extern int nni_endpt_dial(nni_endpt *, int); #endif // CORE_ENDPT_H diff --git a/src/core/socket.c b/src/core/socket.c index 9db61838..9263b142 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -328,142 +328,22 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer) } -// nni_socket_dial_one just does a single dial call, so it can be used -// for synchronous dialing. -static int -nni_socket_dial_one(nni_endpt *ep) +int +nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags) { - nni_socket *sock = ep->ep_sock; - nni_pipe *pipe; + nni_endpt *ep; int rv; - pipe = NULL; - - if ((rv = nni_endpt_dial(ep, &pipe)) != 0) { - return (rv); - } - if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) { - nni_pipe_destroy(pipe); + if ((rv = nni_endpt_create(&ep, sock, addr)) != 0) { return (rv); } - - nni_mutex_enter(&ep->ep_mx); - if (!ep->ep_close) { - // Set up the linkage so that when the pipe closes - // we can notify the dialer to redial. - pipe->p_ep = ep; - ep->ep_pipe = pipe; - } - nni_mutex_exit(&ep->ep_mx); - - return (0); -} - - -// nni_socket_dialer is the thread worker that dials in the background. -static void -nni_socket_dialer(void *arg) -{ - nni_endpt *ep = arg; - nni_socket *sock = ep->ep_sock; - nni_pipe *pipe; - int rv; - nni_time cooldown; - - nni_mutex_enter(&ep->ep_mx); - while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { - nni_cond_wait(&ep->ep_cv); - } - if (ep->ep_stop || ep->ep_close) { - nni_mutex_exit(&ep->ep_mx); - return; - } - nni_mutex_exit(&ep->ep_mx); - - for (;;) { - nni_mutex_enter(&ep->ep_mx); - while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { - nni_cond_wait(&ep->ep_cv); - } - nni_mutex_exit(&ep->ep_mx); - - rv = nni_socket_dial_one(ep); - switch (rv) { - case 0: - // good connection - continue; - case NNG_ENOMEM: - cooldown = 1000000; - break; - default: - // XXX: THIS NEEDS TO BE A PROPER BACKOFF. - cooldown = 100000; - break; - } - // we inject a delay so we don't just spin hard on - // errors like connection refused. For NNG_ENOMEM, we - // wait even longer, since the system needs time to - // release resources. - cooldown += nni_clock(); - while (!ep->ep_close) { - nni_cond_waituntil(&ep->ep_cv, cooldown); - } - } -} - - -int -nni_socket_dial(nni_socket *sock, nni_endpt *ep, int sync) -{ - int rv = 0; - nni_thread *reap = NULL; - - nni_mutex_enter(&sock->s_mx); - nni_mutex_enter(&ep->ep_mx); - if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { - rv = NNG_EBUSY; - goto out; - } - if (ep->ep_sock != sock) { // Should never happen - rv = NNG_EINVAL; - goto out; - } - if (sock->s_closing || ep->ep_close) { - rv = NNG_ECLOSED; - goto out; - } - - ep->ep_stop = 0; - ep->ep_start = sync ? 0 : 1; - if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) { - rv = NNG_ENOMEM; - goto out; - } - if ((rv == 0) && (sync)) { - nni_mutex_exit(&ep->ep_mx); - nni_mutex_exit(&sock->s_mx); - rv = nni_socket_dial_one(ep); - nni_mutex_enter(&sock->s_mx); - nni_mutex_enter(&ep->ep_mx); - if (rv == 0) { - ep->ep_start = 1; - } else { - // This will cause the thread to exit instead of - // starting. - ep->ep_stop = 1; - reap = ep->ep_dialer; - ep->ep_dialer = NULL; - } - nni_cond_signal(&ep->ep_cv); - } -out: - nni_mutex_exit(&ep->ep_mx); - nni_mutex_exit(&sock->s_mx); - - if (reap != NULL) { - nni_thread_reap(reap); + rv = nni_endpt_dial(ep, flags); + if (rv != 0) { + nni_endpt_close(ep); + nni_endpt_destroy(ep); + } else if (epp != NULL) { + *epp = ep; } - return (rv); } @@ -528,7 +408,7 @@ nni_socket_accept(nni_socket *sock, nni_endpt *ep) rv = NNG_ECLOSED; goto out; } - if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) { + if (nni_thread_create(&ep->ep_dialer, nni_socket_accepter, ep) != 0) { rv = NNG_ENOMEM; goto out; } diff --git a/src/core/socket.h b/src/core/socket.h index a31e09e5..4e1ea166 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -51,5 +51,7 @@ extern int nni_socket_setopt(nni_socket *, int, const void *, size_t); extern int nni_socket_getopt(nni_socket *, int, void *, size_t *); extern int nni_socket_recvmsg(nni_socket *, nni_msg **, nni_time); extern int nni_socket_sendmsg(nni_socket *, nni_msg *, nni_time); +extern int nni_socket_dial(nni_socket *, const char *, nni_endpt **, int); +extern int nni_socket_listen(nni_socket *, const char *, nni_endpt **, int); #endif // CORE_SOCKET_H @@ -96,6 +96,13 @@ nng_sendmsg(nng_socket *s, nng_msg *msg, int flags) int +nng_dial(nng_socket *s, const char *addr, nng_endpt **epp, int flags) +{ + NNI_INIT_INT(); + return (nni_socket_dial(s, addr, epp, flags)); +} + +int nng_setopt(nng_socket *s, int opt, const void *val, size_t sz) { NNI_INIT_INT(); @@ -114,14 +114,21 @@ NNG_DECL const char *nng_event_reason(nng_event *); // nng_listen creates a listening endpoint with no special options, // and starts it listening. It is functionally equivalent to the legacy -// nn_bind(). The underlying endpoint is returned back to the caller. -NNG_DECL int nng_listen(nng_endpt **, nng_socket *, const char *); +// nn_bind(). The underlying endpoint is returned back to the caller in the +// endpt pointer, if it is not NULL. The flags may be NNG_FLAG_SYNCH to +// indicate that a failure setting the socket up should return an error +// back to the caller immediately. +NNG_DECL int nng_listen(nng_socket *, const char *, nng_endpt **, int); // nng_dial creates a dialing endpoint, with no special options, and // starts it dialing. Dialers have at most one active connection at a time // This is similar to the legacy nn_connect(). The underlying endpoint -// is returned back to the caller. -NNG_DECL int nng_dial(nng_endpt **, nng_socket *, const char *); +// is returned back to the caller in the endpt pointer, if it is not NULL. +// The flags may be NNG_FLAG_SYNCH to indicate that the first attempt to +// dial will be made synchronously, and a failure condition returned back +// to the caller. (If the connection is dropped, it will still be +// reconnected in the background -- only the initial connect is synchronous.) +NNG_DECL int nng_dial(nng_socket *, const char *, nng_endpt **, int); // nng_endpt_create creates an endpoint on the socket, but does not // start it either dialing or listening. @@ -129,11 +136,11 @@ NNG_DECL int nng_endpt_create(nng_endpt **, nng_socket *, const char *); // nng_endpt_dial starts the endpoint dialing. This is only possible if // the endpoint is not already dialing or listening. -NNG_DECL int nng_endpt_dial(nng_endpt *); +NNG_DECL int nng_endpt_dial(nng_endpt *, int); // nng_endpt_listen starts the endpoint listening. This is only possible if // the endpoint is not already dialing or listening. -NNG_DECL int nng_endpt_listen(nng_endpt *); +NNG_DECL int nng_endpt_listen(nng_endpt *, int); // nng_endpt_close closes the endpt, shutting down all underlying // connections and releasing all associated resources. It is an error to @@ -205,6 +212,7 @@ NNG_DECL int nng_pipe_close(nng_pipe *); // Flags. #define NNG_FLAG_ALLOC 1 // Recv to allocate receive buffer. #define NNG_FLAG_NONBLOCK 2 // Non-block send/recv. +#define NNG_FLAG_SYNCH 4 // Synchronous dial / listen // Protocol numbers. These are to be used with nng_socket_create(). // These values are used on the wire, so must not be changed. The major diff --git a/tests/sock.c b/tests/sock.c index 65ed5565..c4fcbee0 100644 --- a/tests/sock.c +++ b/tests/sock.c @@ -100,5 +100,15 @@ TestMain("Socket Operations", { So(check == 1234); }) }) + + Convey("Dialing bogus address not supported", { + rv = nng_dial(sock, "bogus://somewhere", NULL, 0); + So(rv == NNG_ENOTSUP); + }) + + Convey("Dialing synch can get refused", { + rv = nng_dial(sock, "inproc://notthere", NULL, NNG_FLAG_SYNCH); + So(rv == NNG_ECONNREFUSED); + }) }) })
\ No newline at end of file |
