aboutsummaryrefslogtreecommitdiff
path: root/src/core/endpt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/endpt.c')
-rw-r--r--src/core/endpt.c198
1 files changed, 156 insertions, 42 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 1c5f4db5..24d92206 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -36,7 +36,9 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr)
ep->ep_listener = NULL;
ep->ep_close = 0;
ep->ep_start = 0;
+ ep->ep_bound = 0;
ep->ep_pipe = NULL;
+ NNI_LIST_NODE_INIT(&ep->ep_node);
if ((rv = nni_mutex_init(&ep->ep_mx)) != 0) {
nni_free(ep, sizeof (*ep));
return (NNG_ENOMEM);
@@ -105,13 +107,21 @@ nni_endpt_close(nni_endpt *ep)
}
-int
-nni_endpt_bind(nni_endpt *ep)
+static int
+nni_endpt_connect(nni_endpt *ep, nni_pipe **pp)
{
- if (ep->ep_close) {
- return (NNG_ECLOSED);
+ nni_pipe *pipe;
+ int rv;
+
+ if ((rv = nni_pipe_create(&pipe, ep)) != 0) {
+ return (rv);
}
- return (ep->ep_ops.ep_bind(ep->ep_data));
+ if ((rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_trandata)) != 0) {
+ nni_pipe_destroy(pipe);
+ return (rv);
+ }
+ *pp = pipe;
+ return (0);
}
@@ -124,34 +134,12 @@ nni_dial_once(nni_endpt *ep)
nni_pipe *pipe;
int rv;
- pipe = NULL;
-
- if (ep->ep_close) {
- return (NNG_ECLOSED);
- }
- if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) {
- return (rv);
- }
- if ((rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_data)) != 0) {
- nni_pipe_destroy(pipe);
- return (rv);
+ if (((rv = nni_endpt_connect(ep, &pipe)) == 0) &&
+ ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) {
+ return (0);
}
- if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) {
- nni_pipe_destroy(pipe);
- return (rv);
- }
-
- nni_mutex_enter(&ep->ep_mx);
- if (!ep->ep_close) {
- // Set up the linkage so that when the pipe closes
- // we can notify the dialer to redial.
- pipe->p_ep = ep;
- ep->ep_pipe = pipe;
- }
- nni_mutex_exit(&ep->ep_mx);
-
- return (0);
+ return (rv);
}
@@ -160,7 +148,6 @@ static void
nni_dialer(void *arg)
{
nni_endpt *ep = arg;
- nni_socket *sock = ep->ep_sock;
nni_pipe *pipe;
int rv;
nni_time cooldown;
@@ -180,6 +167,10 @@ nni_dialer(void *arg)
while ((!ep->ep_close) && (ep->ep_pipe != NULL)) {
nni_cond_wait(&ep->ep_cv);
}
+ if (ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ break;
+ }
nni_mutex_exit(&ep->ep_mx);
rv = nni_dial_once(ep);
@@ -190,9 +181,12 @@ nni_dialer(void *arg)
case NNG_ENOMEM:
cooldown = 1000000;
break;
+ case NNG_ECLOSED:
+ break;
default:
// XXX: THIS NEEDS TO BE A PROPER BACKOFF.
- cooldown = 100000;
+ printf("COOLING DOWN!!\n");
+ cooldown = 1000000;
break;
}
// we inject a delay so we don't just spin hard on
@@ -200,9 +194,12 @@ nni_dialer(void *arg)
// wait even longer, since the system needs time to
// release resources.
cooldown += nni_clock();
+ nni_mutex_enter(&ep->ep_mx);
while (!ep->ep_close) {
+ // We need a different condvar...
nni_cond_waituntil(&ep->ep_cv, cooldown);
}
+ nni_mutex_exit(&ep->ep_mx);
}
}
@@ -212,15 +209,13 @@ nni_endpt_dial(nni_endpt *ep, int flags)
{
int rv = 0;
nni_thread *reap = NULL;
- nni_socket *sock = ep->ep_sock;
- nni_mutex_enter(&sock->s_mx);
nni_mutex_enter(&ep->ep_mx);
if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
rv = NNG_EBUSY;
goto out;
}
- if (sock->s_closing || ep->ep_close) {
+ if (ep->ep_close) {
rv = NNG_ECLOSED;
goto out;
}
@@ -233,10 +228,9 @@ nni_endpt_dial(nni_endpt *ep, int flags)
}
if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) {
nni_mutex_exit(&ep->ep_mx);
- nni_mutex_exit(&sock->s_mx);
rv = nni_dial_once(ep);
- nni_mutex_enter(&sock->s_mx);
nni_mutex_enter(&ep->ep_mx);
+
if (rv == 0) {
ep->ep_start = 1;
} else {
@@ -250,7 +244,6 @@ nni_endpt_dial(nni_endpt *ep, int flags)
}
out:
nni_mutex_exit(&ep->ep_mx);
- nni_mutex_exit(&sock->s_mx);
if (reap != NULL) {
nni_thread_reap(reap);
@@ -269,14 +262,135 @@ nni_endpt_accept(nni_endpt *ep, nni_pipe **pp)
if (ep->ep_close) {
return (NNG_ECLOSED);
}
- if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) {
+ if ((rv = nni_pipe_create(&pipe, ep)) != 0) {
return (rv);
}
- if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_data)) != 0) {
+ if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_trandata)) != 0) {
nni_pipe_destroy(pipe);
return (rv);
}
- pipe->p_ep = ep;
*pp = pipe;
return (0);
}
+
+
+static void
+nni_listener(void *arg)
+{
+ nni_endpt *ep = arg;
+ nni_socket *sock = ep->ep_sock;
+ nni_pipe *pipe;
+ int rv;
+
+ nni_mutex_enter(&ep->ep_mx);
+ while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) {
+ nni_cond_wait(&ep->ep_cv);
+ }
+ if (ep->ep_stop || ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ return;
+ }
+ nni_mutex_exit(&ep->ep_mx);
+ for (;;) {
+ nni_time cooldown;
+ nni_mutex_enter(&ep->ep_mx);
+
+ // If we didn't bind synchronously, do it now.
+ while (!ep->ep_bound && !ep->ep_close) {
+ int rv;
+
+ nni_mutex_exit(&ep->ep_mx);
+ rv = ep->ep_ops.ep_bind(ep->ep_data);
+ nni_mutex_enter(&ep->ep_mx);
+
+ if (rv == 0) {
+ ep->ep_bound = 1;
+ } else {
+ // Invalid address? Out of memory? Who knows.
+ // Try again in a bit (10ms).
+ // XXX: PROPER BACKOFF NEEDED
+ cooldown = 10000;
+ cooldown += nni_clock();
+ while (!ep->ep_close) {
+ nni_cond_waituntil(&ep->ep_cv,
+ cooldown);
+ }
+ }
+ }
+ if (ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ break;
+ }
+ nni_mutex_exit(&ep->ep_mx);
+
+ pipe = NULL;
+
+ if (((rv = nni_endpt_accept(ep, &pipe)) == 0) &&
+ ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) {
+ continue;
+ }
+ if (rv == NNG_ECLOSED) {
+ break;
+ }
+ cooldown = 1000; // 1 ms cooldown
+ if (rv == NNG_ENOMEM) {
+ // For out of memory, we need to give more
+ // time for the system to reclaim resources.
+ cooldown = 100000; // 100ms
+ }
+ nni_mutex_enter(&ep->ep_mx);
+ if (!ep->ep_close) {
+ nni_cond_waituntil(&ep->ep_cv, cooldown);
+ }
+ nni_mutex_exit(&ep->ep_mx);
+ }
+}
+
+
+int
+nni_endpt_listen(nni_endpt *ep, int flags)
+{
+ int rv = 0;
+ nni_thread *reap = NULL;
+
+ nni_mutex_enter(&ep->ep_mx);
+ if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
+ rv = NNG_EBUSY;
+ goto out;
+ }
+ if (ep->ep_close) {
+ rv = NNG_ECLOSED;
+ goto out;
+ }
+
+ ep->ep_stop = 0;
+ ep->ep_start = (flags & NNG_FLAG_SYNCH) ? 0 : 1;
+ if (nni_thread_create(&ep->ep_listener, nni_listener, ep) != 0) {
+ rv = NNG_ENOMEM;
+ goto out;
+ }
+ if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) {
+ nni_mutex_exit(&ep->ep_mx);
+ rv = ep->ep_ops.ep_bind(ep->ep_data);
+ nni_mutex_enter(&ep->ep_mx);
+ if (rv == 0) {
+ ep->ep_bound = 1;
+ ep->ep_start = 1;
+ } else {
+ // This will cause the thread to exit instead of
+ // starting.
+ ep->ep_stop = 1;
+ reap = ep->ep_listener;
+ ep->ep_listener = NULL;
+ }
+ nni_cond_signal(&ep->ep_cv);
+ }
+out:
+ nni_mutex_exit(&ep->ep_mx);
+
+ if (reap != NULL) {
+ nni_thread_reap(reap);
+ }
+
+ return (rv);
+}