diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-25 11:03:06 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2016-12-25 11:03:06 -0800 |
| commit | 2cf6c5b96de05ca3870495f615b23e1fcdd3c4ca (patch) | |
| tree | 7f7c96adc773e94fe057081b40f7a8b5b31508fe | |
| parent | b7188678e1fb7399afd11faaea3d537bf52f6923 (diff) | |
| download | nng-2cf6c5b96de05ca3870495f615b23e1fcdd3c4ca.tar.gz nng-2cf6c5b96de05ca3870495f615b23e1fcdd3c4ca.tar.bz2 nng-2cf6c5b96de05ca3870495f615b23e1fcdd3c4ca.zip | |
New dial/listen API. Dialing might work now.
In order to give control over synchronous vs. async dialing, we provide a
flag to indicate synchronous dialing is desired. (Hmm. Should we reverse the
default sense?) We extend listen to have the same flag.
Logic is moved to endpt.c since dialing is really and endpoint specific operation.
There are other minor related bug fixes here too.
| -rw-r--r-- | src/core/endpt.c | 147 | ||||
| -rw-r--r-- | src/core/endpt.h | 2 | ||||
| -rw-r--r-- | src/core/socket.c | 142 | ||||
| -rw-r--r-- | src/core/socket.h | 2 | ||||
| -rw-r--r-- | src/nng.c | 7 | ||||
| -rw-r--r-- | src/nng.h | 20 | ||||
| -rw-r--r-- | tests/sock.c | 10 |
7 files changed, 173 insertions, 157 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index ba557e3a..c5e479a2 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -14,20 +14,6 @@ #include <stdio.h> // Functionality realited to end points. -#if 0 -struct nng_endpt { - struct nni_endpt_ops ep_ops; - void * ep_data; - nni_list_node_t ep_sock_node; - nni_socket * ep_sock; - char ep_addr[NNG_MAXADDRLEN]; - nni_thread * ep_dialer; - nni_thread * ep_listener; - int ep_close; - nni_mutex ep_mx; - nni_cond ep_cv; -}; -#endif int nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) @@ -37,7 +23,7 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) int rv; if ((tran = nni_transport_find(addr)) == NULL) { - return (NNG_EINVAL); + return (NNG_ENOTSUP); } if (strlen(addr) >= NNG_MAXADDRLEN) { return (NNG_EINVAL); @@ -129,12 +115,17 @@ nni_endpt_listen(nni_endpt *ep) } -int -nni_endpt_dial(nni_endpt *ep, nni_pipe **pp) +// nni_dial_once just does a single dial call, so it can be used +// for synchronous dialing. +static int +nni_dial_once(nni_endpt *ep) { + nni_socket *sock = ep->ep_sock; nni_pipe *pipe; int rv; + pipe = NULL; + if (ep->ep_close) { return (NNG_ECLOSED); } @@ -145,12 +136,130 @@ nni_endpt_dial(nni_endpt *ep, nni_pipe **pp) nni_pipe_destroy(pipe); return (rv); } - pipe->p_ep = ep; - *pp = pipe; + + if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) { + nni_pipe_destroy(pipe); + return (rv); + } + + nni_mutex_enter(&ep->ep_mx); + if (!ep->ep_close) { + // Set up the linkage so that when the pipe closes + // we can notify the dialer to redial. + pipe->p_ep = ep; + ep->ep_pipe = pipe; + } + nni_mutex_exit(&ep->ep_mx); + return (0); } +// nni_socket_dialer is the thread worker that dials in the background. +static void +nni_dialer(void *arg) +{ + nni_endpt *ep = arg; + nni_socket *sock = ep->ep_sock; + nni_pipe *pipe; + int rv; + nni_time cooldown; + + nni_mutex_enter(&ep->ep_mx); + while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { + nni_cond_wait(&ep->ep_cv); + } + if (ep->ep_stop || ep->ep_close) { + nni_mutex_exit(&ep->ep_mx); + return; + } + nni_mutex_exit(&ep->ep_mx); + + for (;;) { + nni_mutex_enter(&ep->ep_mx); + while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { + nni_cond_wait(&ep->ep_cv); + } + nni_mutex_exit(&ep->ep_mx); + + rv = nni_dial_once(ep); + switch (rv) { + case 0: + // good connection + continue; + case NNG_ENOMEM: + cooldown = 1000000; + break; + default: + // XXX: THIS NEEDS TO BE A PROPER BACKOFF. + cooldown = 100000; + break; + } + // we inject a delay so we don't just spin hard on + // errors like connection refused. For NNG_ENOMEM, we + // wait even longer, since the system needs time to + // release resources. + cooldown += nni_clock(); + while (!ep->ep_close) { + nni_cond_waituntil(&ep->ep_cv, cooldown); + } + } +} + + +int +nni_endpt_dial(nni_endpt *ep, int flags) +{ + int rv = 0; + nni_thread *reap = NULL; + nni_socket *sock = ep->ep_sock; + + nni_mutex_enter(&sock->s_mx); + nni_mutex_enter(&ep->ep_mx); + if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { + rv = NNG_EBUSY; + goto out; + } + if (sock->s_closing || ep->ep_close) { + rv = NNG_ECLOSED; + goto out; + } + + ep->ep_stop = 0; + ep->ep_start = (flags & NNG_FLAG_SYNCH) ? 0 : 1; + if (nni_thread_create(&ep->ep_dialer, nni_dialer, ep) != 0) { + rv = NNG_ENOMEM; + goto out; + } + if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) { + nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(&sock->s_mx); + rv = nni_dial_once(ep); + nni_mutex_enter(&sock->s_mx); + nni_mutex_enter(&ep->ep_mx); + if (rv == 0) { + ep->ep_start = 1; + } else { + // This will cause the thread to exit instead of + // starting. + ep->ep_stop = 1; + reap = ep->ep_dialer; + ep->ep_dialer = NULL; + } + nni_cond_signal(&ep->ep_cv); + } +out: + nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(&sock->s_mx); + + if (reap != NULL) { + nni_thread_reap(reap); + } + + return (rv); +} + + int nni_endpt_accept(nni_endpt *ep, nni_pipe **pp) { diff --git a/src/core/endpt.h b/src/core/endpt.h index 64935c4c..71d3ff59 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -33,9 +33,9 @@ struct nng_endpt { extern int nni_endpt_create(nni_endpt **, nni_socket *, const char *); extern void nni_endpt_destroy(nni_endpt *); -extern int nni_endpt_dial(nni_endpt *, nni_pipe **); extern int nni_endpt_listen(nni_endpt *); extern int nni_endpt_accept(nni_endpt *, nni_pipe **); extern void nni_endpt_close(nni_endpt *); +extern int nni_endpt_dial(nni_endpt *, int); #endif // CORE_ENDPT_H diff --git a/src/core/socket.c b/src/core/socket.c index 9db61838..9263b142 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -328,142 +328,22 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer) } -// nni_socket_dial_one just does a single dial call, so it can be used -// for synchronous dialing. -static int -nni_socket_dial_one(nni_endpt *ep) +int +nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags) { - nni_socket *sock = ep->ep_sock; - nni_pipe *pipe; + nni_endpt *ep; int rv; - pipe = NULL; - - if ((rv = nni_endpt_dial(ep, &pipe)) != 0) { - return (rv); - } - if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) { - nni_pipe_destroy(pipe); + if ((rv = nni_endpt_create(&ep, sock, addr)) != 0) { return (rv); } - - nni_mutex_enter(&ep->ep_mx); - if (!ep->ep_close) { - // Set up the linkage so that when the pipe closes - // we can notify the dialer to redial. - pipe->p_ep = ep; - ep->ep_pipe = pipe; - } - nni_mutex_exit(&ep->ep_mx); - - return (0); -} - - -// nni_socket_dialer is the thread worker that dials in the background. -static void -nni_socket_dialer(void *arg) -{ - nni_endpt *ep = arg; - nni_socket *sock = ep->ep_sock; - nni_pipe *pipe; - int rv; - nni_time cooldown; - - nni_mutex_enter(&ep->ep_mx); - while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { - nni_cond_wait(&ep->ep_cv); - } - if (ep->ep_stop || ep->ep_close) { - nni_mutex_exit(&ep->ep_mx); - return; - } - nni_mutex_exit(&ep->ep_mx); - - for (;;) { - nni_mutex_enter(&ep->ep_mx); - while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { - nni_cond_wait(&ep->ep_cv); - } - nni_mutex_exit(&ep->ep_mx); - - rv = nni_socket_dial_one(ep); - switch (rv) { - case 0: - // good connection - continue; - case NNG_ENOMEM: - cooldown = 1000000; - break; - default: - // XXX: THIS NEEDS TO BE A PROPER BACKOFF. - cooldown = 100000; - break; - } - // we inject a delay so we don't just spin hard on - // errors like connection refused. For NNG_ENOMEM, we - // wait even longer, since the system needs time to - // release resources. - cooldown += nni_clock(); - while (!ep->ep_close) { - nni_cond_waituntil(&ep->ep_cv, cooldown); - } - } -} - - -int -nni_socket_dial(nni_socket *sock, nni_endpt *ep, int sync) -{ - int rv = 0; - nni_thread *reap = NULL; - - nni_mutex_enter(&sock->s_mx); - nni_mutex_enter(&ep->ep_mx); - if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { - rv = NNG_EBUSY; - goto out; - } - if (ep->ep_sock != sock) { // Should never happen - rv = NNG_EINVAL; - goto out; - } - if (sock->s_closing || ep->ep_close) { - rv = NNG_ECLOSED; - goto out; - } - - ep->ep_stop = 0; - ep->ep_start = sync ? 0 : 1; - if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) { - rv = NNG_ENOMEM; - goto out; - } - if ((rv == 0) && (sync)) { - nni_mutex_exit(&ep->ep_mx); - nni_mutex_exit(&sock->s_mx); - rv = nni_socket_dial_one(ep); - nni_mutex_enter(&sock->s_mx); - nni_mutex_enter(&ep->ep_mx); - if (rv == 0) { - ep->ep_start = 1; - } else { - // This will cause the thread to exit instead of - // starting. - ep->ep_stop = 1; - reap = ep->ep_dialer; - ep->ep_dialer = NULL; - } - nni_cond_signal(&ep->ep_cv); - } -out: - nni_mutex_exit(&ep->ep_mx); - nni_mutex_exit(&sock->s_mx); - - if (reap != NULL) { - nni_thread_reap(reap); + rv = nni_endpt_dial(ep, flags); + if (rv != 0) { + nni_endpt_close(ep); + nni_endpt_destroy(ep); + } else if (epp != NULL) { + *epp = ep; } - return (rv); } @@ -528,7 +408,7 @@ nni_socket_accept(nni_socket *sock, nni_endpt *ep) rv = NNG_ECLOSED; goto out; } - if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) { + if (nni_thread_create(&ep->ep_dialer, nni_socket_accepter, ep) != 0) { rv = NNG_ENOMEM; goto out; } diff --git a/src/core/socket.h b/src/core/socket.h index a31e09e5..4e1ea166 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -51,5 +51,7 @@ extern int nni_socket_setopt(nni_socket *, int, const void *, size_t); extern int nni_socket_getopt(nni_socket *, int, void *, size_t *); extern int nni_socket_recvmsg(nni_socket *, nni_msg **, nni_time); extern int nni_socket_sendmsg(nni_socket *, nni_msg *, nni_time); +extern int nni_socket_dial(nni_socket *, const char *, nni_endpt **, int); +extern int nni_socket_listen(nni_socket *, const char *, nni_endpt **, int); #endif // CORE_SOCKET_H @@ -96,6 +96,13 @@ nng_sendmsg(nng_socket *s, nng_msg *msg, int flags) int +nng_dial(nng_socket *s, const char *addr, nng_endpt **epp, int flags) +{ + NNI_INIT_INT(); + return (nni_socket_dial(s, addr, epp, flags)); +} + +int nng_setopt(nng_socket *s, int opt, const void *val, size_t sz) { NNI_INIT_INT(); @@ -114,14 +114,21 @@ NNG_DECL const char *nng_event_reason(nng_event *); // nng_listen creates a listening endpoint with no special options, // and starts it listening. It is functionally equivalent to the legacy -// nn_bind(). The underlying endpoint is returned back to the caller. -NNG_DECL int nng_listen(nng_endpt **, nng_socket *, const char *); +// nn_bind(). The underlying endpoint is returned back to the caller in the +// endpt pointer, if it is not NULL. The flags may be NNG_FLAG_SYNCH to +// indicate that a failure setting the socket up should return an error +// back to the caller immediately. +NNG_DECL int nng_listen(nng_socket *, const char *, nng_endpt **, int); // nng_dial creates a dialing endpoint, with no special options, and // starts it dialing. Dialers have at most one active connection at a time // This is similar to the legacy nn_connect(). The underlying endpoint -// is returned back to the caller. -NNG_DECL int nng_dial(nng_endpt **, nng_socket *, const char *); +// is returned back to the caller in the endpt pointer, if it is not NULL. +// The flags may be NNG_FLAG_SYNCH to indicate that the first attempt to +// dial will be made synchronously, and a failure condition returned back +// to the caller. (If the connection is dropped, it will still be +// reconnected in the background -- only the initial connect is synchronous.) +NNG_DECL int nng_dial(nng_socket *, const char *, nng_endpt **, int); // nng_endpt_create creates an endpoint on the socket, but does not // start it either dialing or listening. @@ -129,11 +136,11 @@ NNG_DECL int nng_endpt_create(nng_endpt **, nng_socket *, const char *); // nng_endpt_dial starts the endpoint dialing. This is only possible if // the endpoint is not already dialing or listening. -NNG_DECL int nng_endpt_dial(nng_endpt *); +NNG_DECL int nng_endpt_dial(nng_endpt *, int); // nng_endpt_listen starts the endpoint listening. This is only possible if // the endpoint is not already dialing or listening. -NNG_DECL int nng_endpt_listen(nng_endpt *); +NNG_DECL int nng_endpt_listen(nng_endpt *, int); // nng_endpt_close closes the endpt, shutting down all underlying // connections and releasing all associated resources. It is an error to @@ -205,6 +212,7 @@ NNG_DECL int nng_pipe_close(nng_pipe *); // Flags. #define NNG_FLAG_ALLOC 1 // Recv to allocate receive buffer. #define NNG_FLAG_NONBLOCK 2 // Non-block send/recv. +#define NNG_FLAG_SYNCH 4 // Synchronous dial / listen // Protocol numbers. These are to be used with nng_socket_create(). // These values are used on the wire, so must not be changed. The major diff --git a/tests/sock.c b/tests/sock.c index 65ed5565..c4fcbee0 100644 --- a/tests/sock.c +++ b/tests/sock.c @@ -100,5 +100,15 @@ TestMain("Socket Operations", { So(check == 1234); }) }) + + Convey("Dialing bogus address not supported", { + rv = nng_dial(sock, "bogus://somewhere", NULL, 0); + So(rv == NNG_ENOTSUP); + }) + + Convey("Dialing synch can get refused", { + rv = nng_dial(sock, "inproc://notthere", NULL, NNG_FLAG_SYNCH); + So(rv == NNG_ECONNREFUSED); + }) }) })
\ No newline at end of file |
