summaryrefslogtreecommitdiff
path: root/src/core/endpt.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-25 18:08:44 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-25 18:08:44 -0800
commit0cd2fa7310f1fdf45443a8a9e3335658b1c3c64c (patch)
tree1098c7f4976033bb311b45c378079700c9330b62 /src/core/endpt.c
parent64de60d98e8e4a896f9d13e4aa70343f329d88b4 (diff)
downloadnng-0cd2fa7310f1fdf45443a8a9e3335658b1c3c64c.tar.gz
nng-0cd2fa7310f1fdf45443a8a9e3335658b1c3c64c.tar.bz2
nng-0cd2fa7310f1fdf45443a8a9e3335658b1c3c64c.zip
Substantial fixes for listen & dialers.
At this point listening and dialing operations appear to function properly. As part of this I had to break the close logic up since otherwise we had a loop trying to reap a thread from itself. So there is now a separate reaper thread for pipes per-socket. I also changed lists to be a bit more rigid, and allocations now zero memory initially. (We had bugs due to uninitialized memory, and rather than hunt them all down, lets just init them to sane zero values.)
Diffstat (limited to 'src/core/endpt.c')
-rw-r--r--src/core/endpt.c198
1 files changed, 156 insertions, 42 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 1c5f4db5..24d92206 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -36,7 +36,9 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr)
ep->ep_listener = NULL;
ep->ep_close = 0;
ep->ep_start = 0;
+ ep->ep_bound = 0;
ep->ep_pipe = NULL;
+ NNI_LIST_NODE_INIT(&ep->ep_node);
if ((rv = nni_mutex_init(&ep->ep_mx)) != 0) {
nni_free(ep, sizeof (*ep));
return (NNG_ENOMEM);
@@ -105,13 +107,21 @@ nni_endpt_close(nni_endpt *ep)
}
-int
-nni_endpt_bind(nni_endpt *ep)
+static int
+nni_endpt_connect(nni_endpt *ep, nni_pipe **pp)
{
- if (ep->ep_close) {
- return (NNG_ECLOSED);
+ nni_pipe *pipe;
+ int rv;
+
+ if ((rv = nni_pipe_create(&pipe, ep)) != 0) {
+ return (rv);
}
- return (ep->ep_ops.ep_bind(ep->ep_data));
+ if ((rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_trandata)) != 0) {
+ nni_pipe_destroy(pipe);
+ return (rv);
+ }
+ *pp = pipe;
+ return (0);
}
@@ -124,34 +134,12 @@ nni_dial_once(nni_endpt *ep)
nni_pipe *pipe;
int rv;
- pipe = NULL;
-
- if (ep->ep_close) {
- return (NNG_ECLOSED);
- }
- if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) {
- return (rv);
- }
- if ((rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_data)) != 0) {
- nni_pipe_destroy(pipe);
- return (rv);
+ if (((rv = nni_endpt_connect(ep, &pipe)) == 0) &&
+ ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) {
+ return (0);
}
- if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) {
- nni_pipe_destroy(pipe);
- return (rv);
- }
-
- nni_mutex_enter(&ep->ep_mx);
- if (!ep->ep_close) {
- // Set up the linkage so that when the pipe closes
- // we can notify the dialer to redial.
- pipe->p_ep = ep;
- ep->ep_pipe = pipe;
- }
- nni_mutex_exit(&ep->ep_mx);
-
- return (0);
+ return (rv);
}
@@ -160,7 +148,6 @@ static void
nni_dialer(void *arg)
{
nni_endpt *ep = arg;
- nni_socket *sock = ep->ep_sock;
nni_pipe *pipe;
int rv;
nni_time cooldown;
@@ -180,6 +167,10 @@ nni_dialer(void *arg)
while ((!ep->ep_close) && (ep->ep_pipe != NULL)) {
nni_cond_wait(&ep->ep_cv);
}
+ if (ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ break;
+ }
nni_mutex_exit(&ep->ep_mx);
rv = nni_dial_once(ep);
@@ -190,9 +181,12 @@ nni_dialer(void *arg)
case NNG_ENOMEM:
cooldown = 1000000;
break;
+ case NNG_ECLOSED:
+ break;
default:
// XXX: THIS NEEDS TO BE A PROPER BACKOFF.
- cooldown = 100000;
+ printf("COOLING DOWN!!\n");
+ cooldown = 1000000;
break;
}
// we inject a delay so we don't just spin hard on
@@ -200,9 +194,12 @@ nni_dialer(void *arg)
// wait even longer, since the system needs time to
// release resources.
cooldown += nni_clock();
+ nni_mutex_enter(&ep->ep_mx);
while (!ep->ep_close) {
+ // We need a different condvar...
nni_cond_waituntil(&ep->ep_cv, cooldown);
}
+ nni_mutex_exit(&ep->ep_mx);
}
}
@@ -212,15 +209,13 @@ nni_endpt_dial(nni_endpt *ep, int flags)
{
int rv = 0;
nni_thread *reap = NULL;
- nni_socket *sock = ep->ep_sock;
- nni_mutex_enter(&sock->s_mx);
nni_mutex_enter(&ep->ep_mx);
if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
rv = NNG_EBUSY;
goto out;
}
- if (sock->s_closing || ep->ep_close) {
+ if (ep->ep_close) {
rv = NNG_ECLOSED;
goto out;
}
@@ -233,10 +228,9 @@ nni_endpt_dial(nni_endpt *ep, int flags)
}
if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) {
nni_mutex_exit(&ep->ep_mx);
- nni_mutex_exit(&sock->s_mx);
rv = nni_dial_once(ep);
- nni_mutex_enter(&sock->s_mx);
nni_mutex_enter(&ep->ep_mx);
+
if (rv == 0) {
ep->ep_start = 1;
} else {
@@ -250,7 +244,6 @@ nni_endpt_dial(nni_endpt *ep, int flags)
}
out:
nni_mutex_exit(&ep->ep_mx);
- nni_mutex_exit(&sock->s_mx);
if (reap != NULL) {
nni_thread_reap(reap);
@@ -269,14 +262,135 @@ nni_endpt_accept(nni_endpt *ep, nni_pipe **pp)
if (ep->ep_close) {
return (NNG_ECLOSED);
}
- if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) {
+ if ((rv = nni_pipe_create(&pipe, ep)) != 0) {
return (rv);
}
- if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_data)) != 0) {
+ if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_trandata)) != 0) {
nni_pipe_destroy(pipe);
return (rv);
}
- pipe->p_ep = ep;
*pp = pipe;
return (0);
}
+
+
+static void
+nni_listener(void *arg)
+{
+ nni_endpt *ep = arg;
+ nni_socket *sock = ep->ep_sock;
+ nni_pipe *pipe;
+ int rv;
+
+ nni_mutex_enter(&ep->ep_mx);
+ while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) {
+ nni_cond_wait(&ep->ep_cv);
+ }
+ if (ep->ep_stop || ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ return;
+ }
+ nni_mutex_exit(&ep->ep_mx);
+ for (;;) {
+ nni_time cooldown;
+ nni_mutex_enter(&ep->ep_mx);
+
+ // If we didn't bind synchronously, do it now.
+ while (!ep->ep_bound && !ep->ep_close) {
+ int rv;
+
+ nni_mutex_exit(&ep->ep_mx);
+ rv = ep->ep_ops.ep_bind(ep->ep_data);
+ nni_mutex_enter(&ep->ep_mx);
+
+ if (rv == 0) {
+ ep->ep_bound = 1;
+ } else {
+ // Invalid address? Out of memory? Who knows.
+ // Try again in a bit (10ms).
+ // XXX: PROPER BACKOFF NEEDED
+ cooldown = 10000;
+ cooldown += nni_clock();
+ while (!ep->ep_close) {
+ nni_cond_waituntil(&ep->ep_cv,
+ cooldown);
+ }
+ }
+ }
+ if (ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ break;
+ }
+ nni_mutex_exit(&ep->ep_mx);
+
+ pipe = NULL;
+
+ if (((rv = nni_endpt_accept(ep, &pipe)) == 0) &&
+ ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) {
+ continue;
+ }
+ if (rv == NNG_ECLOSED) {
+ break;
+ }
+ cooldown = 1000; // 1 ms cooldown
+ if (rv == NNG_ENOMEM) {
+ // For out of memory, we need to give more
+ // time for the system to reclaim resources.
+ cooldown = 100000; // 100ms
+ }
+ nni_mutex_enter(&ep->ep_mx);
+ if (!ep->ep_close) {
+ nni_cond_waituntil(&ep->ep_cv, cooldown);
+ }
+ nni_mutex_exit(&ep->ep_mx);
+ }
+}
+
+
+int
+nni_endpt_listen(nni_endpt *ep, int flags)
+{
+ int rv = 0;
+ nni_thread *reap = NULL;
+
+ nni_mutex_enter(&ep->ep_mx);
+ if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
+ rv = NNG_EBUSY;
+ goto out;
+ }
+ if (ep->ep_close) {
+ rv = NNG_ECLOSED;
+ goto out;
+ }
+
+ ep->ep_stop = 0;
+ ep->ep_start = (flags & NNG_FLAG_SYNCH) ? 0 : 1;
+ if (nni_thread_create(&ep->ep_listener, nni_listener, ep) != 0) {
+ rv = NNG_ENOMEM;
+ goto out;
+ }
+ if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) {
+ nni_mutex_exit(&ep->ep_mx);
+ rv = ep->ep_ops.ep_bind(ep->ep_data);
+ nni_mutex_enter(&ep->ep_mx);
+ if (rv == 0) {
+ ep->ep_bound = 1;
+ ep->ep_start = 1;
+ } else {
+ // This will cause the thread to exit instead of
+ // starting.
+ ep->ep_stop = 1;
+ reap = ep->ep_listener;
+ ep->ep_listener = NULL;
+ }
+ nni_cond_signal(&ep->ep_cv);
+ }
+out:
+ nni_mutex_exit(&ep->ep_mx);
+
+ if (reap != NULL) {
+ nni_thread_reap(reap);
+ }
+
+ return (rv);
+}