aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/endpt.c40
-rw-r--r--src/core/endpt.h24
-rw-r--r--src/core/pipe.h1
-rw-r--r--src/core/socket.c193
-rw-r--r--src/core/socket.h2
5 files changed, 213 insertions, 47 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index fd328b0f..ad1a0b9a 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -49,6 +49,8 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr)
ep->ep_dialer = NULL;
ep->ep_listener = NULL;
ep->ep_close = 0;
+ ep->ep_start = 0;
+ ep->ep_pipe = NULL;
if ((rv = nni_mutex_init(&ep->ep_mx)) != 0) {
nni_free(ep, sizeof (*ep));
return (NNG_ENOMEM);
@@ -71,7 +73,6 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr)
nni_free(ep, sizeof (*ep));
return (rv);
}
- NNI_LIST_INIT(&ep->ep_pipes, nni_pipe, p_ep_node);
*epp = ep;
return (0);
}
@@ -81,17 +82,12 @@ nni_endpt_destroy(nni_endpt *ep)
{
// We should already have been closed at this point, so this
// should proceed very quickly.
- if (ep->ep_dialer) {
+ if (ep->ep_dialer != NULL) {
nni_thread_reap(ep->ep_dialer);
}
- if (ep->ep_listener) {
+ if (ep->ep_listener != NULL) {
nni_thread_reap(ep->ep_listener);
}
- nni_mutex_enter(&ep->ep_mx);
- while (nni_list_first(&ep->ep_pipes) != NULL) {
- nni_cond_wait(&ep->ep_cv);
- }
- nni_mutex_exit(&ep->ep_mx);
ep->ep_ops.ep_destroy(ep->ep_data);
@@ -103,6 +99,7 @@ nni_endpt_destroy(nni_endpt *ep)
void
nni_endpt_close(nni_endpt *ep)
{
+ nni_pipe *pipe;
nni_mutex_enter(&ep->ep_mx);
if (ep->ep_close) {
nni_mutex_exit(&ep->ep_mx);
@@ -110,6 +107,10 @@ nni_endpt_close(nni_endpt *ep)
}
ep->ep_close = 1;
ep->ep_ops.ep_close(ep->ep_data);
+ if ((pipe = ep->ep_pipe) != NULL) {
+ pipe->p_ep = NULL;
+ ep->ep_pipe = NULL;
+ }
nni_cond_broadcast(&ep->ep_cv);
nni_mutex_exit(&ep->ep_mx);
}
@@ -144,6 +145,23 @@ nni_endpt_dial(nni_endpt *ep, nni_pipe **pp)
return (0);
}
-#if 0
-int nni_endpt_accept(nni_endpt *, nni_pipe **);
-#endif
+int
+nni_endpt_accept(nni_endpt *ep, nni_pipe **pp)
+{
+ nni_pipe *pipe;
+ int rv;
+
+ if (ep->ep_close) {
+ return (NNG_ECLOSED);
+ }
+ if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) {
+ return (rv);
+ }
+ if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_data)) != 0) {
+ nni_pipe_destroy(pipe);
+ return (rv);
+ }
+ pipe->p_ep = ep;
+ *pp = pipe;
+ return (0);
+}
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 7512daa8..64935c4c 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -16,17 +16,19 @@
// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS
// OR TRANSPORTS.
struct nng_endpt {
- nni_endpt_ops ep_ops;
- void * ep_data; // Transport private
- nni_list_node ep_sock_node; // Per socket list
- nni_socket * ep_sock;
- char ep_addr[NNG_MAXADDRLEN];
- nni_thread * ep_dialer;
- nni_thread * ep_listener;
- int ep_close;
- nni_mutex ep_mx;
- nni_cond ep_cv;
- nni_list ep_pipes; // Active list of pipes
+ nni_endpt_ops ep_ops;
+ void * ep_data; // Transport private
+ nni_list_node ep_sock_node; // Per socket list
+ nni_socket * ep_sock;
+ char ep_addr[NNG_MAXADDRLEN];
+ nni_thread * ep_dialer;
+ nni_thread * ep_listener;
+ int ep_stop; // thread exits before start
+ int ep_start; // start thread running
+ int ep_close; // full shutdown
+ nni_mutex ep_mx;
+ nni_cond ep_cv;
+ nni_pipe * ep_pipe; // Connected pipe (dialers only)
};
extern int nni_endpt_create(nni_endpt **, nni_socket *, const char *);
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 2ce4f1ec..3d0e07e7 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -24,7 +24,6 @@ struct nng_pipe {
void * p_data;
nni_list_node p_sock_node;
nni_socket * p_sock;
- nni_list_node p_ep_node;
nni_endpt * p_ep;
};
diff --git a/src/core/socket.c b/src/core/socket.c
index bbb9e48a..1b3bcf56 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -26,6 +26,7 @@ nni_socket_recvq(nni_socket *s)
return (s->s_urq);
}
+
// nn_socket_create creates the underlying socket.
int
nni_socket_create(nni_socket **sockp, uint16_t proto)
@@ -41,7 +42,7 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
return (NNG_ENOMEM);
}
sock->s_ops = *ops;
- sock->s_linger = NNG_DEFAULT_LINGER;
+ sock->s_linger = NNG_LINGER_DEFAULT;
if ((rv = nni_mutex_init(&sock->s_mx)) != 0) {
nni_free(sock, sizeof (*sock));
@@ -133,10 +134,10 @@ nni_socket_close(nni_socket *sock)
// need to reap them now.
while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
nni_list_remove(&sock->s_eps, ep);
- nni_mutex_exit(&sock->s_eps);
+ nni_mutex_exit(&sock->s_mx);
nni_endpt_destroy(ep);
- nni_mutex_enter(&sock->s_eps);
+ nni_mutex_enter(&sock->s_mx);
}
nni_mutex_exit(&sock->s_mx);
@@ -262,6 +263,8 @@ nni_socket_proto(nni_socket *sock)
void
nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe)
{
+ nni_endpt *ep;
+
nni_mutex_enter(&sock->s_mx);
if (pipe->p_sock != sock) {
nni_mutex_exit(&sock->s_mx);
@@ -274,12 +277,18 @@ nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe)
// Now remove it from our own list.
nni_list_remove(&sock->s_pipes, pipe);
pipe->p_sock = NULL;
- // XXX: TODO: Redial
- // XXX: also publish event...
- // if (pipe->p_ep != NULL) {
- // nn_endpt_rem_pipe(pipe->p_ep, pipe)
- // }
+ // If we were a connected (dialer) pipe, then let the endpoint
+ // know so it can try to reestablish the connection.
+ if ((ep = pipe->p_ep) != NULL) {
+ ep->ep_pipe = NULL;
+ pipe->p_ep = NULL;
+ nni_mutex_enter(&ep->ep_mx);
+ nni_cond_signal(&ep->ep_cv);
+ nni_mutex_exit(&ep->ep_mx);
+ }
+
+ // XXX: also publish event...
nni_pipe_destroy(pipe);
// If we're closing, wake the socket if we finished draining.
@@ -291,7 +300,7 @@ nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe)
int
-nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe)
+nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer)
{
int rv;
@@ -306,11 +315,6 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe)
}
nni_list_append(&sock->s_pipes, pipe);
- // Add the pipe to its endpoint list.
- nni_mutex_enter(&pipe->p_ep->ep_mx);
- nni_list_append(&pipe->p_ep->ep_pipes, pipe);
- nni_mutex_exit(&pipe->p_ep->ep_mx);
-
pipe->p_sock = sock;
// XXX: Publish event
nni_mutex_exit(&sock->s_mx);
@@ -318,6 +322,39 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe)
}
+// nni_socket_dial_one just does a single dial call, so it can be used
+// for synchronous dialing.
+static int
+nni_socket_dial_one(nni_endpt *ep)
+{
+ nni_socket *sock = ep->ep_sock;
+ nni_pipe *pipe;
+ int rv;
+
+ pipe = NULL;
+
+ if ((rv = nni_endpt_dial(ep, &pipe)) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) {
+ nni_pipe_destroy(pipe);
+ return (rv);
+ }
+
+ nni_mutex_enter(&ep->ep_mx);
+ if (!ep->ep_close) {
+ // Set up the linkage so that when the pipe closes
+ // we can notify the dialer to redial.
+ pipe->p_ep = ep;
+ ep->ep_pipe = pipe;
+ }
+ nni_mutex_exit(&ep->ep_mx);
+
+ return (0);
+}
+
+
+// nni_socket_dialer is the thread worker that dials in the background.
static void
nni_socket_dialer(void *arg)
{
@@ -325,13 +362,116 @@ nni_socket_dialer(void *arg)
nni_socket *sock = ep->ep_sock;
nni_pipe *pipe;
int rv;
+ nni_time cooldown;
+
+ nni_mutex_enter(&ep->ep_mx);
+ while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) {
+ nni_cond_wait(&ep->ep_cv);
+ }
+ if (ep->ep_stop || ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ return;
+ }
+ nni_mutex_exit(&ep->ep_mx);
for (;;) {
nni_mutex_enter(&ep->ep_mx);
- while ((!ep->ep_close) &&
- (nni_list_first(&ep->ep_pipes) != NULL)) {
+ while ((!ep->ep_close) && (ep->ep_pipe != NULL)) {
nni_cond_wait(&ep->ep_cv);
}
+ nni_mutex_exit(&ep->ep_mx);
+
+ rv = nni_socket_dial_one(ep);
+ switch (rv) {
+ case 0:
+ // good connection
+ continue;
+ case NNG_ENOMEM:
+ cooldown = 1000000;
+ break;
+ default:
+ // XXX: THIS NEEDS TO BE A PROPER BACKOFF.
+ cooldown = 100000;
+ break;
+ }
+ // we inject a delay so we don't just spin hard on
+ // errors like connection refused. For NNG_ENOMEM, we
+ // wait even longer, since the system needs time to
+ // release resources.
+ cooldown += nni_clock();
+ while (!ep->ep_close) {
+ nni_cond_waituntil(&ep->ep_cv, cooldown);
+ }
+ }
+}
+
+
+int
+nni_socket_dial(nni_socket *sock, nni_endpt *ep, int sync)
+{
+ int rv = 0;
+ nni_thread *reap = NULL;
+
+ nni_mutex_enter(&sock->s_mx);
+ nni_mutex_enter(&ep->ep_mx);
+ if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
+ rv = NNG_EBUSY;
+ goto out;
+ }
+ if (ep->ep_sock != sock) { // Should never happen
+ rv = NNG_EINVAL;
+ goto out;
+ }
+ if (sock->s_closing || ep->ep_close) {
+ rv = NNG_ECLOSED;
+ goto out;
+ }
+
+ ep->ep_stop = 0;
+ ep->ep_start = sync ? 0 : 1;
+ if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) {
+ rv = NNG_ENOMEM;
+ goto out;
+ }
+ if ((rv == 0) && (sync)) {
+ nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(&sock->s_mx);
+ rv = nni_socket_dial_one(ep);
+ nni_mutex_enter(&sock->s_mx);
+ nni_mutex_enter(&ep->ep_mx);
+ if (rv == 0) {
+ ep->ep_start = 1;
+ } else {
+ // This will cause the thread to exit instead of
+ // starting.
+ ep->ep_stop = 1;
+ reap = ep->ep_dialer;
+ ep->ep_dialer = NULL;
+ }
+ nni_cond_signal(&ep->ep_cv);
+ }
+out:
+ nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(&sock->s_mx);
+
+ if (reap != NULL) {
+ nni_thread_reap(reap);
+ }
+
+ return (rv);
+}
+
+
+static void
+nni_socket_accepter(void *arg)
+{
+ nni_endpt *ep = arg;
+ nni_socket *sock = ep->ep_sock;
+ nni_pipe *pipe;
+ int rv;
+
+ for (;;) {
+ nni_mutex_enter(&ep->ep_mx);
if (ep->ep_close) {
nni_mutex_exit(&ep->ep_mx);
break;
@@ -340,24 +480,31 @@ nni_socket_dialer(void *arg)
pipe = NULL;
- if (((rv = nni_endpt_dial(ep, &pipe)) != 0) ||
- ((rv = nni_socket_add_pipe(sock, pipe)) != 0)) {
+ if (((rv = nni_endpt_accept(ep, &pipe)) != 0) ||
+ ((rv = nni_socket_add_pipe(sock, pipe, 0)) != 0)) {
if (rv == NNG_ECLOSED) {
break;
}
if (pipe != NULL) {
nni_pipe_destroy(pipe);
}
- // XXX: Publish connection error event...
- // XXX: Inject a wait for reconnect...
- continue;
+ // XXX: Publish accept error event...
+
+ // If we can't allocate memory, don't spin, so that
+ // things get a chance to release memory later.
+ // Other errors, like ECONNRESET, should not recur.
+ // (If we find otherwise we can inject a short sleep
+ // here of about 1 ms without too much penalty.)
+ if (rv == NNG_ENOMEM) {
+ nni_usleep(100000);
+ }
}
}
}
int
-nni_socket_dial(nni_socket *sock, nni_endpt *ep)
+nni_socket_accept(nni_socket *sock, nni_endpt *ep)
{
int rv = 0;
@@ -367,7 +514,7 @@ nni_socket_dial(nni_socket *sock, nni_endpt *ep)
rv = NNG_EBUSY;
goto out;
}
- if (ep->ep_sock != sock) { // Should never happen
+ if (ep->ep_sock != sock) { // Should never happen
rv = NNG_EINVAL;
goto out;
}
diff --git a/src/core/socket.h b/src/core/socket.h
index ff6a63d5..05113489 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -40,7 +40,7 @@ struct nng_socket {
extern int nni_socket_create(nni_socket **, uint16_t);
extern int nni_socket_close(nni_socket *);
-extern int nni_socket_add_pipe(nni_socket *, nni_pipe *);
+extern int nni_socket_add_pipe(nni_socket *, nni_pipe *, int);
extern void nni_socket_rem_pipe(nni_socket *, nni_pipe *);
extern uint16_t nni_socket_proto(nni_socket *);
extern int nni_socket_setopt(nni_socket *, int, const void *, size_t);