aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/endpt.c147
-rw-r--r--src/core/endpt.h2
-rw-r--r--src/core/socket.c142
-rw-r--r--src/core/socket.h2
-rw-r--r--src/nng.c7
-rw-r--r--src/nng.h20
-rw-r--r--tests/sock.c10
7 files changed, 173 insertions, 157 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index ba557e3a..c5e479a2 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -14,20 +14,6 @@
#include <stdio.h>
// Functionality realited to end points.
-#if 0
-struct nng_endpt {
- struct nni_endpt_ops ep_ops;
- void * ep_data;
- nni_list_node_t ep_sock_node;
- nni_socket * ep_sock;
- char ep_addr[NNG_MAXADDRLEN];
- nni_thread * ep_dialer;
- nni_thread * ep_listener;
- int ep_close;
- nni_mutex ep_mx;
- nni_cond ep_cv;
-};
-#endif
int
nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr)
@@ -37,7 +23,7 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr)
int rv;
if ((tran = nni_transport_find(addr)) == NULL) {
- return (NNG_EINVAL);
+ return (NNG_ENOTSUP);
}
if (strlen(addr) >= NNG_MAXADDRLEN) {
return (NNG_EINVAL);
@@ -129,12 +115,17 @@ nni_endpt_listen(nni_endpt *ep)
}
-int
-nni_endpt_dial(nni_endpt *ep, nni_pipe **pp)
+// nni_dial_once just does a single dial call, so it can be used
+// for synchronous dialing.
+static int
+nni_dial_once(nni_endpt *ep)
{
+ nni_socket *sock = ep->ep_sock;
nni_pipe *pipe;
int rv;
+ pipe = NULL;
+
if (ep->ep_close) {
return (NNG_ECLOSED);
}
@@ -145,12 +136,130 @@ nni_endpt_dial(nni_endpt *ep, nni_pipe **pp)
nni_pipe_destroy(pipe);
return (rv);
}
- pipe->p_ep = ep;
- *pp = pipe;
+
+ if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) {
+ nni_pipe_destroy(pipe);
+ return (rv);
+ }
+
+ nni_mutex_enter(&ep->ep_mx);
+ if (!ep->ep_close) {
+ // Set up the linkage so that when the pipe closes
+ // we can notify the dialer to redial.
+ pipe->p_ep = ep;
+ ep->ep_pipe = pipe;
+ }
+ nni_mutex_exit(&ep->ep_mx);
+
return (0);
}
+// nni_socket_dialer is the thread worker that dials in the background.
+static void
+nni_dialer(void *arg)
+{
+ nni_endpt *ep = arg;
+ nni_socket *sock = ep->ep_sock;
+ nni_pipe *pipe;
+ int rv;
+ nni_time cooldown;
+
+ nni_mutex_enter(&ep->ep_mx);
+ while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) {
+ nni_cond_wait(&ep->ep_cv);
+ }
+ if (ep->ep_stop || ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ return;
+ }
+ nni_mutex_exit(&ep->ep_mx);
+
+ for (;;) {
+ nni_mutex_enter(&ep->ep_mx);
+ while ((!ep->ep_close) && (ep->ep_pipe != NULL)) {
+ nni_cond_wait(&ep->ep_cv);
+ }
+ nni_mutex_exit(&ep->ep_mx);
+
+ rv = nni_dial_once(ep);
+ switch (rv) {
+ case 0:
+ // good connection
+ continue;
+ case NNG_ENOMEM:
+ cooldown = 1000000;
+ break;
+ default:
+ // XXX: THIS NEEDS TO BE A PROPER BACKOFF.
+ cooldown = 100000;
+ break;
+ }
+ // we inject a delay so we don't just spin hard on
+ // errors like connection refused. For NNG_ENOMEM, we
+ // wait even longer, since the system needs time to
+ // release resources.
+ cooldown += nni_clock();
+ while (!ep->ep_close) {
+ nni_cond_waituntil(&ep->ep_cv, cooldown);
+ }
+ }
+}
+
+
+int
+nni_endpt_dial(nni_endpt *ep, int flags)
+{
+ int rv = 0;
+ nni_thread *reap = NULL;
+ nni_socket *sock = ep->ep_sock;
+
+ nni_mutex_enter(&sock->s_mx);
+ nni_mutex_enter(&ep->ep_mx);
+ if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
+ rv = NNG_EBUSY;
+ goto out;
+ }
+ if (sock->s_closing || ep->ep_close) {
+ rv = NNG_ECLOSED;
+ goto out;
+ }
+
+ ep->ep_stop = 0;
+ ep->ep_start = (flags & NNG_FLAG_SYNCH) ? 0 : 1;
+ if (nni_thread_create(&ep->ep_dialer, nni_dialer, ep) != 0) {
+ rv = NNG_ENOMEM;
+ goto out;
+ }
+ if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) {
+ nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(&sock->s_mx);
+ rv = nni_dial_once(ep);
+ nni_mutex_enter(&sock->s_mx);
+ nni_mutex_enter(&ep->ep_mx);
+ if (rv == 0) {
+ ep->ep_start = 1;
+ } else {
+ // This will cause the thread to exit instead of
+ // starting.
+ ep->ep_stop = 1;
+ reap = ep->ep_dialer;
+ ep->ep_dialer = NULL;
+ }
+ nni_cond_signal(&ep->ep_cv);
+ }
+out:
+ nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(&sock->s_mx);
+
+ if (reap != NULL) {
+ nni_thread_reap(reap);
+ }
+
+ return (rv);
+}
+
+
int
nni_endpt_accept(nni_endpt *ep, nni_pipe **pp)
{
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 64935c4c..71d3ff59 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -33,9 +33,9 @@ struct nng_endpt {
extern int nni_endpt_create(nni_endpt **, nni_socket *, const char *);
extern void nni_endpt_destroy(nni_endpt *);
-extern int nni_endpt_dial(nni_endpt *, nni_pipe **);
extern int nni_endpt_listen(nni_endpt *);
extern int nni_endpt_accept(nni_endpt *, nni_pipe **);
extern void nni_endpt_close(nni_endpt *);
+extern int nni_endpt_dial(nni_endpt *, int);
#endif // CORE_ENDPT_H
diff --git a/src/core/socket.c b/src/core/socket.c
index 9db61838..9263b142 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -328,142 +328,22 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer)
}
-// nni_socket_dial_one just does a single dial call, so it can be used
-// for synchronous dialing.
-static int
-nni_socket_dial_one(nni_endpt *ep)
+int
+nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags)
{
- nni_socket *sock = ep->ep_sock;
- nni_pipe *pipe;
+ nni_endpt *ep;
int rv;
- pipe = NULL;
-
- if ((rv = nni_endpt_dial(ep, &pipe)) != 0) {
- return (rv);
- }
- if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) {
- nni_pipe_destroy(pipe);
+ if ((rv = nni_endpt_create(&ep, sock, addr)) != 0) {
return (rv);
}
-
- nni_mutex_enter(&ep->ep_mx);
- if (!ep->ep_close) {
- // Set up the linkage so that when the pipe closes
- // we can notify the dialer to redial.
- pipe->p_ep = ep;
- ep->ep_pipe = pipe;
- }
- nni_mutex_exit(&ep->ep_mx);
-
- return (0);
-}
-
-
-// nni_socket_dialer is the thread worker that dials in the background.
-static void
-nni_socket_dialer(void *arg)
-{
- nni_endpt *ep = arg;
- nni_socket *sock = ep->ep_sock;
- nni_pipe *pipe;
- int rv;
- nni_time cooldown;
-
- nni_mutex_enter(&ep->ep_mx);
- while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) {
- nni_cond_wait(&ep->ep_cv);
- }
- if (ep->ep_stop || ep->ep_close) {
- nni_mutex_exit(&ep->ep_mx);
- return;
- }
- nni_mutex_exit(&ep->ep_mx);
-
- for (;;) {
- nni_mutex_enter(&ep->ep_mx);
- while ((!ep->ep_close) && (ep->ep_pipe != NULL)) {
- nni_cond_wait(&ep->ep_cv);
- }
- nni_mutex_exit(&ep->ep_mx);
-
- rv = nni_socket_dial_one(ep);
- switch (rv) {
- case 0:
- // good connection
- continue;
- case NNG_ENOMEM:
- cooldown = 1000000;
- break;
- default:
- // XXX: THIS NEEDS TO BE A PROPER BACKOFF.
- cooldown = 100000;
- break;
- }
- // we inject a delay so we don't just spin hard on
- // errors like connection refused. For NNG_ENOMEM, we
- // wait even longer, since the system needs time to
- // release resources.
- cooldown += nni_clock();
- while (!ep->ep_close) {
- nni_cond_waituntil(&ep->ep_cv, cooldown);
- }
- }
-}
-
-
-int
-nni_socket_dial(nni_socket *sock, nni_endpt *ep, int sync)
-{
- int rv = 0;
- nni_thread *reap = NULL;
-
- nni_mutex_enter(&sock->s_mx);
- nni_mutex_enter(&ep->ep_mx);
- if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
- rv = NNG_EBUSY;
- goto out;
- }
- if (ep->ep_sock != sock) { // Should never happen
- rv = NNG_EINVAL;
- goto out;
- }
- if (sock->s_closing || ep->ep_close) {
- rv = NNG_ECLOSED;
- goto out;
- }
-
- ep->ep_stop = 0;
- ep->ep_start = sync ? 0 : 1;
- if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) {
- rv = NNG_ENOMEM;
- goto out;
- }
- if ((rv == 0) && (sync)) {
- nni_mutex_exit(&ep->ep_mx);
- nni_mutex_exit(&sock->s_mx);
- rv = nni_socket_dial_one(ep);
- nni_mutex_enter(&sock->s_mx);
- nni_mutex_enter(&ep->ep_mx);
- if (rv == 0) {
- ep->ep_start = 1;
- } else {
- // This will cause the thread to exit instead of
- // starting.
- ep->ep_stop = 1;
- reap = ep->ep_dialer;
- ep->ep_dialer = NULL;
- }
- nni_cond_signal(&ep->ep_cv);
- }
-out:
- nni_mutex_exit(&ep->ep_mx);
- nni_mutex_exit(&sock->s_mx);
-
- if (reap != NULL) {
- nni_thread_reap(reap);
+ rv = nni_endpt_dial(ep, flags);
+ if (rv != 0) {
+ nni_endpt_close(ep);
+ nni_endpt_destroy(ep);
+ } else if (epp != NULL) {
+ *epp = ep;
}
-
return (rv);
}
@@ -528,7 +408,7 @@ nni_socket_accept(nni_socket *sock, nni_endpt *ep)
rv = NNG_ECLOSED;
goto out;
}
- if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) {
+ if (nni_thread_create(&ep->ep_dialer, nni_socket_accepter, ep) != 0) {
rv = NNG_ENOMEM;
goto out;
}
diff --git a/src/core/socket.h b/src/core/socket.h
index a31e09e5..4e1ea166 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -51,5 +51,7 @@ extern int nni_socket_setopt(nni_socket *, int, const void *, size_t);
extern int nni_socket_getopt(nni_socket *, int, void *, size_t *);
extern int nni_socket_recvmsg(nni_socket *, nni_msg **, nni_time);
extern int nni_socket_sendmsg(nni_socket *, nni_msg *, nni_time);
+extern int nni_socket_dial(nni_socket *, const char *, nni_endpt **, int);
+extern int nni_socket_listen(nni_socket *, const char *, nni_endpt **, int);
#endif // CORE_SOCKET_H
diff --git a/src/nng.c b/src/nng.c
index 1034f4dd..eb08991a 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -96,6 +96,13 @@ nng_sendmsg(nng_socket *s, nng_msg *msg, int flags)
int
+nng_dial(nng_socket *s, const char *addr, nng_endpt **epp, int flags)
+{
+ NNI_INIT_INT();
+ return (nni_socket_dial(s, addr, epp, flags));
+}
+
+int
nng_setopt(nng_socket *s, int opt, const void *val, size_t sz)
{
NNI_INIT_INT();
diff --git a/src/nng.h b/src/nng.h
index be8b57c9..e22df1c6 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -114,14 +114,21 @@ NNG_DECL const char *nng_event_reason(nng_event *);
// nng_listen creates a listening endpoint with no special options,
// and starts it listening. It is functionally equivalent to the legacy
-// nn_bind(). The underlying endpoint is returned back to the caller.
-NNG_DECL int nng_listen(nng_endpt **, nng_socket *, const char *);
+// nn_bind(). The underlying endpoint is returned back to the caller in the
+// endpt pointer, if it is not NULL. The flags may be NNG_FLAG_SYNCH to
+// indicate that a failure setting the socket up should return an error
+// back to the caller immediately.
+NNG_DECL int nng_listen(nng_socket *, const char *, nng_endpt **, int);
// nng_dial creates a dialing endpoint, with no special options, and
// starts it dialing. Dialers have at most one active connection at a time
// This is similar to the legacy nn_connect(). The underlying endpoint
-// is returned back to the caller.
-NNG_DECL int nng_dial(nng_endpt **, nng_socket *, const char *);
+// is returned back to the caller in the endpt pointer, if it is not NULL.
+// The flags may be NNG_FLAG_SYNCH to indicate that the first attempt to
+// dial will be made synchronously, and a failure condition returned back
+// to the caller. (If the connection is dropped, it will still be
+// reconnected in the background -- only the initial connect is synchronous.)
+NNG_DECL int nng_dial(nng_socket *, const char *, nng_endpt **, int);
// nng_endpt_create creates an endpoint on the socket, but does not
// start it either dialing or listening.
@@ -129,11 +136,11 @@ NNG_DECL int nng_endpt_create(nng_endpt **, nng_socket *, const char *);
// nng_endpt_dial starts the endpoint dialing. This is only possible if
// the endpoint is not already dialing or listening.
-NNG_DECL int nng_endpt_dial(nng_endpt *);
+NNG_DECL int nng_endpt_dial(nng_endpt *, int);
// nng_endpt_listen starts the endpoint listening. This is only possible if
// the endpoint is not already dialing or listening.
-NNG_DECL int nng_endpt_listen(nng_endpt *);
+NNG_DECL int nng_endpt_listen(nng_endpt *, int);
// nng_endpt_close closes the endpt, shutting down all underlying
// connections and releasing all associated resources. It is an error to
@@ -205,6 +212,7 @@ NNG_DECL int nng_pipe_close(nng_pipe *);
// Flags.
#define NNG_FLAG_ALLOC 1 // Recv to allocate receive buffer.
#define NNG_FLAG_NONBLOCK 2 // Non-block send/recv.
+#define NNG_FLAG_SYNCH 4 // Synchronous dial / listen
// Protocol numbers. These are to be used with nng_socket_create().
// These values are used on the wire, so must not be changed. The major
diff --git a/tests/sock.c b/tests/sock.c
index 65ed5565..c4fcbee0 100644
--- a/tests/sock.c
+++ b/tests/sock.c
@@ -100,5 +100,15 @@ TestMain("Socket Operations", {
So(check == 1234);
})
})
+
+ Convey("Dialing bogus address not supported", {
+ rv = nng_dial(sock, "bogus://somewhere", NULL, 0);
+ So(rv == NNG_ENOTSUP);
+ })
+
+ Convey("Dialing synch can get refused", {
+ rv = nng_dial(sock, "inproc://notthere", NULL, NNG_FLAG_SYNCH);
+ So(rv == NNG_ECONNREFUSED);
+ })
})
}) \ No newline at end of file