author    Garrett D'Amore <garrett@damore.org>  2017-07-15 15:45:48 -0700
committer Garrett D'Amore <garrett@damore.org>  2017-07-15 15:45:48 -0700
commit    7f95fde8d752dd93c20ff0a209334f4aec549111
tree      f6226f1e9741ae855a96d215600dacb006927434
parent    5fe345c66139fc3242c4fdbd78bf05e5670581e8
Some initial progress on *connect* async.
This is actually broken at the moment, because we don't yet have good integration with timeouts, and there are some frustrating races around timeouts that can cause apparent hangs.
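The shape of the change: instead of running each connect on a dedicated dialer thread, the endpoint arms a persistent aio (ep_con_aio) with a completion callback, and the callback either wraps the resulting transport pipe or restarts the connect on failure. Below is a minimal, self-contained sketch of that callback-driven retry loop; the toy_* types and functions are hypothetical stand-ins for the nni_aio/nni_ep machinery, not the real nng API, and the retry is immediate, just as the XXX comment in the diff notes it still is.

#include <stdio.h>

/* Hypothetical stand-ins for the real nni_aio/nni_ep machinery. */
typedef struct toy_aio {
	void (*cb)(void *); /* completion callback */
	void *arg;          /* callback argument */
	int   result;       /* 0 on success, nonzero error code */
} toy_aio;

typedef struct toy_ep {
	toy_aio con_aio; /* persistent aio reused for each connect attempt */
	int     closed;
	int     attempts;
} toy_ep;

/* Pretend transport connect: fail twice, then succeed, and "complete"
 * the aio by running its callback -- the same shape as the transport
 * completing ep->ep_con_aio in the real code. */
static void
toy_transport_connect(toy_ep *ep, toy_aio *aio)
{
	aio->result = (ep->attempts++ < 2) ? -1 : 0;
	aio->cb(aio->arg);
}

static void
toy_connect_start(toy_ep *ep)
{
	if (ep->closed) {
		return;
	}
	toy_transport_connect(ep, &ep->con_aio);
}

/* Completion callback: on success a pipe would be created and started;
 * on failure the connect is simply restarted (the commit notes that a
 * cooldown/backoff still needs to be injected here). */
static void
toy_connect_done(void *arg)
{
	toy_ep *ep = arg;

	if (ep->con_aio.result == 0) {
		printf("connected after %d attempt(s)\n", ep->attempts);
		return;
	}
	printf("connect failed, retrying\n");
	toy_connect_start(ep);
}

int
main(void)
{
	toy_ep ep = { .closed = 0, .attempts = 0 };

	ep.con_aio.cb  = toy_connect_done;
	ep.con_aio.arg = &ep;
	toy_connect_start(&ep); /* like nni_ep_dial() without NNG_FLAG_SYNCH */
	return (0);
}

Keeping the aio embedded in the endpoint lets a single in-flight connect be restarted from its own completion callback, without spawning a thread per dial.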
Diffstat (limited to 'src/core/endpt.c')
-rw-r--r--  src/core/endpt.c  207
1 file changed, 143 insertions(+), 64 deletions(-)
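The synchronous dial path (NNG_FLAG_SYNCH) now waits on a separate aio (ep_con_syn) with the endpoint lock dropped, so the diff adds an ep_refcnt hold that keeps nni_ep_reap() from destroying the endpoint while a waiter is outstanding. The following standalone model shows that hold-while-waiting handshake; it uses plain pthreads and hypothetical toy_* names rather than nng's nni_mtx/nni_cv wrappers.

#include <pthread.h>
#include <stdio.h>

/* Hypothetical model of the reference-count hold: the synchronous dialer
 * pins the endpoint before dropping the lock, and the reaper waits until
 * every hold is released before freeing the memory. */
typedef struct toy_ep {
	pthread_mutex_t mtx;
	pthread_cond_t  cv;
	int             refcnt;
	int             closed;
} toy_ep;

/* Caller side: hold a reference across the blocking wait, as nni_ep_dial()
 * now does around nni_aio_wait(&ep->ep_con_syn). */
static void
toy_dial_sync(toy_ep *ep, void (*wait_for_connect)(toy_ep *))
{
	pthread_mutex_lock(&ep->mtx);
	ep->refcnt++; /* pin the endpoint */
	pthread_mutex_unlock(&ep->mtx);

	wait_for_connect(ep); /* blocking wait outside the lock */

	pthread_mutex_lock(&ep->mtx);
	ep->refcnt--;                    /* drop the pin */
	pthread_cond_broadcast(&ep->cv); /* wake a reaper waiting on refcnt */
	pthread_mutex_unlock(&ep->mtx);
}

/* Reaper side: do not free until every hold is gone, mirroring the
 * refcnt loop added to nni_ep_reap(). */
static void
toy_reap(toy_ep *ep)
{
	pthread_mutex_lock(&ep->mtx);
	ep->closed = 1;
	while (ep->refcnt != 0) {
		pthread_cond_wait(&ep->cv, &ep->mtx);
	}
	pthread_mutex_unlock(&ep->mtx);
	/* now it is safe to destroy ep */
}

static void
fake_wait(toy_ep *ep)
{
	(void) ep; /* pretend the connect attempt completed */
}

int
main(void)
{
	static toy_ep ep = {
		PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0
	};

	toy_dial_sync(&ep, fake_wait);
	toy_reap(&ep); /* returns immediately: no holds remain */
	printf("endpoint reaped\n");
	return (0);
}

Built with a POSIX threads library (cc sketch.c -pthread), toy_reap() returns at once here because the only hold has already been released before it runs.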
diff --git a/src/core/endpt.c b/src/core/endpt.c
index fef6ac83..a99cd21a 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -18,6 +18,8 @@
static void nni_ep_accept_start(nni_ep *);
static void nni_ep_accept_done(void *);
+static void nni_ep_connect_start(nni_ep *);
+static void nni_ep_connect_done(void *);
static nni_idhash *nni_eps;
@@ -56,13 +58,14 @@ nni_ep_destroy(nni_ep *ep)
return;
}
nni_aio_fini(&ep->ep_acc_aio);
+ nni_aio_fini(&ep->ep_con_aio);
+ nni_aio_fini(&ep->ep_con_syn);
if (ep->ep_data != NULL) {
ep->ep_ops.ep_fini(ep->ep_data);
}
if (ep->ep_id != 0) {
nni_idhash_remove(nni_eps, ep->ep_id);
}
- nni_thr_fini(&ep->ep_thr);
nni_cv_fini(&ep->ep_cv);
nni_mtx_fini(&ep->ep_mtx);
NNI_FREE_STRUCT(ep);
@@ -91,6 +94,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
ep->ep_pipe = NULL;
ep->ep_id = id;
ep->ep_data = NULL;
+ ep->ep_refcnt = 0;
NNI_LIST_NODE_INIT(&ep->ep_node);
@@ -107,6 +111,16 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
nni_ep_destroy(ep);
return (rv);
}
+ rv = nni_aio_init(&ep->ep_con_aio, nni_ep_connect_done, ep);
+ if (rv != 0) {
+ nni_ep_destroy(ep);
+ return (rv);
+ }
+ rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL);
+ if (rv != 0) {
+ nni_ep_destroy(ep);
+ return (rv);
+ }
ep->ep_sock = sock;
ep->ep_tran = tran;
@@ -132,16 +146,15 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
void
nni_ep_close(nni_ep *ep)
{
- // Abort any in-flight operations.
- nni_aio_stop(&ep->ep_acc_aio);
-
nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_closed == 0) {
- ep->ep_closed = 1;
- ep->ep_ops.ep_close(ep->ep_data);
+ if (ep->ep_closed) {
+ nni_mtx_unlock(&ep->ep_mtx);
+ return;
}
- nni_cv_wake(&ep->ep_cv);
+ ep->ep_closed = 1;
nni_mtx_unlock(&ep->ep_mtx);
+
+ ep->ep_ops.ep_close(ep->ep_data);
}
static void
@@ -151,9 +164,25 @@ nni_ep_reap(nni_ep *ep)
nni_ep_close(ep); // Extra sanity.
+ // Abort any in-flight operations.
+ nni_aio_stop(&ep->ep_acc_aio);
+ nni_aio_stop(&ep->ep_con_aio);
+ nni_aio_stop(&ep->ep_con_syn);
+
// Take us off the sock list.
nni_sock_ep_remove(ep->ep_sock, ep);
+ // Make sure any other unlocked users (references) are gone
+ // before we actually remove the memory. We should not have
+ // to wait long as we have closed the underlying pipe and
+ // done everything we can to wake any waiter (synchronous connect)
+ // gracefully.
+ nni_mtx_lock(&ep->ep_mtx);
+ while (ep->ep_refcnt != 0) {
+ nni_cv_wait(&ep->ep_cv);
+ }
+ nni_mtx_unlock(&ep->ep_mtx);
+
nni_ep_destroy(ep);
}
@@ -173,59 +202,73 @@ nni_ep_stop(nni_ep *ep)
nni_mtx_unlock(&ep->ep_mtx);
}
-static int
-nni_ep_connect_aio(nni_ep *ep, void **pipep)
+static void
+nni_ep_connect_done(void *arg)
{
- nni_aio aio;
- int rv;
+ nni_ep * ep = arg;
+ nni_aio * aio = &ep->ep_con_aio;
+ void * tpipe;
+ nni_pipe * pipe;
+ const nni_tran_pipe *ops;
- nni_aio_init(&aio, NULL, NULL);
- aio.a_endpt = ep->ep_data;
- ep->ep_ops.ep_connect(ep->ep_data, &aio);
- nni_aio_wait(&aio);
+ int rv;
+
+ nni_mtx_lock(&ep->ep_mtx);
+ if ((rv = nni_aio_result(aio)) == 0) {
+
+ tpipe = aio->a_pipe;
+ NNI_ASSERT(tpipe != NULL);
- if ((rv = nni_aio_result(&aio)) == 0) {
- *pipep = aio.a_pipe;
+ rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
+ if (rv != 0) {
+ ops->p_fini(tpipe);
+ }
}
- nni_aio_fini(&aio);
- return (rv);
+
+done:
+ switch (rv) {
+ case 0:
+ pipe->p_tran_ops = *ops;
+ pipe->p_tran_data = tpipe;
+ nni_pipe_start(pipe);
+ // No further outgoing connects -- we will restart a
+ // connection from the pipe when the pipe is removed.
+ break;
+ case NNG_ECLOSED:
+ case NNG_ECANCELED:
+ // Canceled/closed -- stop everything.
+ break;
+ default:
+ // cooldown = nni_clock() + rtime;
+ // rtime *= 2;
+ // if ((maxrtime >= defrtime) && (rtime > maxrtime)) {
+ // rtime = maxrtime;
+ // }
+
+ // XXX: We need to inject a cooldown, and then try again. For
+ // now we just try again immediately. This is very suboptimal.
+ nni_ep_connect_start(ep);
+ break;
+ }
+ nni_mtx_unlock(&ep->ep_mtx);
}
-static int
-nni_ep_connect_sync(nni_ep *ep)
+static void
+nni_ep_connect_start(nni_ep *ep)
{
- nni_pipe *pipe;
- int rv;
+ nni_aio *aio = &ep->ep_acc_aio;
- nni_mtx_lock(&ep->ep_mtx);
+ // Call with the Endpoint lock held.
if (ep->ep_closed) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ECLOSED);
- }
- rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
- if (rv != 0) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
+ return;
}
- pipe->p_ep = ep;
- nni_list_append(&ep->ep_pipes, pipe);
- nni_mtx_unlock(&ep->ep_mtx);
- rv = nni_ep_connect_aio(ep, &pipe->p_tran_data);
- if (rv != 0) {
- if (rv != NNG_ECLOSED) { // HACK ALERT
- nni_pipe_stop(pipe);
- }
- return (rv);
- }
- nni_pipe_start(pipe);
- nni_mtx_lock(&ep->ep_mtx);
- ep->ep_pipe = pipe;
- nni_mtx_unlock(&ep->ep_mtx);
- return (0);
+ aio->a_endpt = ep->ep_data;
+ ep->ep_ops.ep_connect(ep->ep_data, aio);
}
// nni_dialer is the thread worker that dials in the background.
+#if 0 // this has the logic for timing out... remove when done.
static void
nni_dialer(void *arg)
{
@@ -287,13 +330,20 @@ nni_dialer(void *arg)
nni_mtx_unlock(&ep->ep_mtx);
}
}
+#endif
int
nni_ep_dial(nni_ep *ep, int flags)
{
- int rv = 0;
+ int rv = 0;
+ nni_aio * aio;
+ void * tpipe;
+ nni_pipe * pipe;
+ const nni_tran_pipe *ops;
nni_mtx_lock(&ep->ep_mtx);
+ ops = ep->ep_tran->tran_pipe;
+
if (ep->ep_mode != NNI_EP_MODE_DIAL) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ENOTSUP);
@@ -307,31 +357,59 @@ nni_ep_dial(nni_ep *ep, int flags)
return (NNG_ECLOSED);
}
- if ((rv = nni_thr_init(&ep->ep_thr, nni_dialer, ep)) != 0) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
- }
ep->ep_started = 1;
- if (flags & NNG_FLAG_SYNCH) {
+ if ((flags & NNG_FLAG_SYNCH) == 0) {
+ nni_ep_connect_start(ep);
nni_mtx_unlock(&ep->ep_mtx);
- rv = nni_ep_connect_sync(ep);
- if (rv != 0) {
- nni_thr_fini(&ep->ep_thr);
- ep->ep_started = 0;
- return (rv);
- }
- nni_mtx_lock(&ep->ep_mtx);
+ return (0);
}
- nni_thr_run(&ep->ep_thr);
+ // This one is kind of special, since we need
+ // to block for the connection to complete. Ick.
+ aio = &ep->ep_con_syn;
+ aio->a_endpt = ep->ep_data;
+ ep->ep_ops.ep_connect(ep->ep_data, aio);
+
+ // We're about to drop the lock, but we cannot allow the
+ // endpoint to be removed. Put a hold on it.
+ ep->ep_refcnt++;
nni_mtx_unlock(&ep->ep_mtx);
+ nni_aio_wait(aio);
+
+ nni_mtx_lock(&ep->ep_mtx);
+ ep->ep_refcnt--;
+
+ if (ep->ep_closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ rv = nni_aio_result(aio);
+ }
+
+ if (rv != 0) {
+ ep->ep_started = 0;
+ nni_cv_wake(&ep->ep_cv);
+ nni_mtx_unlock(&ep->ep_mtx);
+ return (rv);
+ }
+
+ tpipe = aio->a_pipe;
+ NNI_ASSERT(tpipe != NULL);
+ rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
+ if (rv != 0) {
+ ops->p_fini(tpipe);
+ } else {
+ pipe->p_tran_ops = *ops;
+ pipe->p_tran_data = tpipe;
+ nni_pipe_start(pipe);
+ }
+
+ nni_cv_wake(&ep->ep_cv);
+ nni_mtx_unlock(&ep->ep_mtx);
return (rv);
}
-static void nni_ep_accept_start(nni_ep *ep);
-
static void
nni_ep_accept_done(void *arg)
{
@@ -348,7 +426,8 @@ nni_ep_accept_done(void *arg)
if ((rv = nni_aio_result(aio)) != 0) {
goto done;
}
- NNI_ASSERT((tpipe = aio->a_pipe) != NULL);
+ tpipe = aio->a_pipe;
+ NNI_ASSERT(tpipe != NULL);
rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
if (rv != 0) {