aboutsummaryrefslogtreecommitdiff
path: root/src/core/endpt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/endpt.c')
-rw-r--r--src/core/endpt.c207
1 files changed, 143 insertions, 64 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index fef6ac83..a99cd21a 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -18,6 +18,8 @@
static void nni_ep_accept_start(nni_ep *);
static void nni_ep_accept_done(void *);
+static void nni_ep_connect_start(nni_ep *);
+static void nni_ep_connect_done(void *);
static nni_idhash *nni_eps;
@@ -56,13 +58,14 @@ nni_ep_destroy(nni_ep *ep)
return;
}
nni_aio_fini(&ep->ep_acc_aio);
+ nni_aio_fini(&ep->ep_con_aio);
+ nni_aio_fini(&ep->ep_con_syn);
if (ep->ep_data != NULL) {
ep->ep_ops.ep_fini(ep->ep_data);
}
if (ep->ep_id != 0) {
nni_idhash_remove(nni_eps, ep->ep_id);
}
- nni_thr_fini(&ep->ep_thr);
nni_cv_fini(&ep->ep_cv);
nni_mtx_fini(&ep->ep_mtx);
NNI_FREE_STRUCT(ep);
@@ -91,6 +94,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
ep->ep_pipe = NULL;
ep->ep_id = id;
ep->ep_data = NULL;
+ ep->ep_refcnt = 0;
NNI_LIST_NODE_INIT(&ep->ep_node);
@@ -107,6 +111,16 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
nni_ep_destroy(ep);
return (rv);
}
+ rv = nni_aio_init(&ep->ep_con_aio, nni_ep_connect_done, ep);
+ if (rv != 0) {
+ nni_ep_destroy(ep);
+ return (rv);
+ }
+ rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL);
+ if (rv != 0) {
+ nni_ep_destroy(ep);
+ return (rv);
+ }
ep->ep_sock = sock;
ep->ep_tran = tran;
@@ -132,16 +146,15 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
void
nni_ep_close(nni_ep *ep)
{
- // Abort any in-flight operations.
- nni_aio_stop(&ep->ep_acc_aio);
-
nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_closed == 0) {
- ep->ep_closed = 1;
- ep->ep_ops.ep_close(ep->ep_data);
+ if (ep->ep_closed) {
+ nni_mtx_unlock(&ep->ep_mtx);
+ return;
}
- nni_cv_wake(&ep->ep_cv);
+ ep->ep_closed = 1;
nni_mtx_unlock(&ep->ep_mtx);
+
+ ep->ep_ops.ep_close(ep->ep_data);
}
static void
@@ -151,9 +164,25 @@ nni_ep_reap(nni_ep *ep)
nni_ep_close(ep); // Extra sanity.
+ // Abort any in-flight operations.
+ nni_aio_stop(&ep->ep_acc_aio);
+ nni_aio_stop(&ep->ep_con_aio);
+ nni_aio_stop(&ep->ep_con_syn);
+
// Take us off the sock list.
nni_sock_ep_remove(ep->ep_sock, ep);
+ // Make sure any other unlocked users (references) are gone
+ // before we actually remove the memory. We should not have
+ // to wait long as we have closed the underlying pipe and
+ // done everything we can to wake any waiter (synchronous connect)
+ // gracefully.
+ nni_mtx_lock(&ep->ep_mtx);
+ while (ep->ep_refcnt != 0) {
+ nni_cv_wait(&ep->ep_cv);
+ }
+ nni_mtx_unlock(&ep->ep_mtx);
+
nni_ep_destroy(ep);
}
@@ -173,59 +202,73 @@ nni_ep_stop(nni_ep *ep)
nni_mtx_unlock(&ep->ep_mtx);
}
-static int
-nni_ep_connect_aio(nni_ep *ep, void **pipep)
+static void
+nni_ep_connect_done(void *arg)
{
- nni_aio aio;
- int rv;
+ nni_ep * ep = arg;
+ nni_aio * aio = &ep->ep_con_aio;
+ void * tpipe;
+ nni_pipe * pipe;
+ const nni_tran_pipe *ops;
- nni_aio_init(&aio, NULL, NULL);
- aio.a_endpt = ep->ep_data;
- ep->ep_ops.ep_connect(ep->ep_data, &aio);
- nni_aio_wait(&aio);
+ int rv;
+
+ nni_mtx_lock(&ep->ep_mtx);
+ if ((rv = nni_aio_result(aio)) == 0) {
+
+ tpipe = aio->a_pipe;
+ NNI_ASSERT(tpipe != NULL);
- if ((rv = nni_aio_result(&aio)) == 0) {
- *pipep = aio.a_pipe;
+ rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
+ if (rv != 0) {
+ ops->p_fini(tpipe);
+ }
}
- nni_aio_fini(&aio);
- return (rv);
+
+done:
+ switch (rv) {
+ case 0:
+ pipe->p_tran_ops = *ops;
+ pipe->p_tran_data = tpipe;
+ nni_pipe_start(pipe);
+ // No further outgoing connects -- we will restart a
+ // connection from the pipe when the pipe is removed.
+ break;
+ case NNG_ECLOSED:
+ case NNG_ECANCELED:
+ // Canceled/closed -- stop everything.
+ break;
+ default:
+ // cooldown = nni_clock() + rtime;
+ // rtime *= 2;
+ // if ((maxrtime >= defrtime) && (rtime > maxrtime)) {
+ // rtime = maxrtime;
+ // }
+
+ // XXX: We need to inject a cooldown, and then try again. For
+ // now we just try again immediately. This is very suboptimal.
+ nni_ep_connect_start(ep);
+ break;
+ }
+ nni_mtx_unlock(&ep->ep_mtx);
}
-static int
-nni_ep_connect_sync(nni_ep *ep)
+static void
+nni_ep_connect_start(nni_ep *ep)
{
- nni_pipe *pipe;
- int rv;
+ nni_aio *aio = &ep->ep_acc_aio;
- nni_mtx_lock(&ep->ep_mtx);
+ // Call with the Endpoint lock held.
if (ep->ep_closed) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ECLOSED);
- }
- rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
- if (rv != 0) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
+ return;
}
- pipe->p_ep = ep;
- nni_list_append(&ep->ep_pipes, pipe);
- nni_mtx_unlock(&ep->ep_mtx);
- rv = nni_ep_connect_aio(ep, &pipe->p_tran_data);
- if (rv != 0) {
- if (rv != NNG_ECLOSED) { // HACK ALERT
- nni_pipe_stop(pipe);
- }
- return (rv);
- }
- nni_pipe_start(pipe);
- nni_mtx_lock(&ep->ep_mtx);
- ep->ep_pipe = pipe;
- nni_mtx_unlock(&ep->ep_mtx);
- return (0);
+ aio->a_endpt = ep->ep_data;
+ ep->ep_ops.ep_connect(ep->ep_data, aio);
}
// nni_dialer is the thread worker that dials in the background.
+#if 0 // this has the logic for timing out... remove when done.
static void
nni_dialer(void *arg)
{
@@ -287,13 +330,20 @@ nni_dialer(void *arg)
nni_mtx_unlock(&ep->ep_mtx);
}
}
+#endif
int
nni_ep_dial(nni_ep *ep, int flags)
{
- int rv = 0;
+ int rv = 0;
+ nni_aio * aio;
+ void * tpipe;
+ nni_pipe * pipe;
+ const nni_tran_pipe *ops;
nni_mtx_lock(&ep->ep_mtx);
+ ops = ep->ep_tran->tran_pipe;
+
if (ep->ep_mode != NNI_EP_MODE_DIAL) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ENOTSUP);
@@ -307,31 +357,59 @@ nni_ep_dial(nni_ep *ep, int flags)
return (NNG_ECLOSED);
}
- if ((rv = nni_thr_init(&ep->ep_thr, nni_dialer, ep)) != 0) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
- }
ep->ep_started = 1;
- if (flags & NNG_FLAG_SYNCH) {
+ if ((flags & NNG_FLAG_SYNCH) == 0) {
+ nni_ep_connect_start(ep);
nni_mtx_unlock(&ep->ep_mtx);
- rv = nni_ep_connect_sync(ep);
- if (rv != 0) {
- nni_thr_fini(&ep->ep_thr);
- ep->ep_started = 0;
- return (rv);
- }
- nni_mtx_lock(&ep->ep_mtx);
+ return (0);
}
- nni_thr_run(&ep->ep_thr);
+ // This one is kind of special, since we need
+ // to block for the connection to complete. Ick.
+ aio = &ep->ep_con_syn;
+ aio->a_endpt = ep->ep_data;
+ ep->ep_ops.ep_connect(ep->ep_data, aio);
+
+ // We're about to drop the lock, but we cannot allow the
+ // endpoint to be removed. Put a hold on it.
+ ep->ep_refcnt++;
nni_mtx_unlock(&ep->ep_mtx);
+ nni_aio_wait(aio);
+
+ nni_mtx_lock(&ep->ep_mtx);
+ ep->ep_refcnt--;
+
+ if (ep->ep_closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ rv = nni_aio_result(aio);
+ }
+
+ if (rv != 0) {
+ ep->ep_started = 0;
+ nni_cv_wake(&ep->ep_cv);
+ nni_mtx_unlock(&ep->ep_mtx);
+ return (rv);
+ }
+
+ tpipe = aio->a_pipe;
+ NNI_ASSERT(tpipe != NULL);
+ rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
+ if (rv != 0) {
+ ops->p_fini(tpipe);
+ } else {
+ pipe->p_tran_ops = *ops;
+ pipe->p_tran_data = tpipe;
+ nni_pipe_start(pipe);
+ }
+
+ nni_cv_wake(&ep->ep_cv);
+ nni_mtx_unlock(&ep->ep_mtx);
return (rv);
}
-static void nni_ep_accept_start(nni_ep *ep);
-
static void
nni_ep_accept_done(void *arg)
{
@@ -348,7 +426,8 @@ nni_ep_accept_done(void *arg)
if ((rv = nni_aio_result(aio)) != 0) {
goto done;
}
- NNI_ASSERT((tpipe = aio->a_pipe) != NULL);
+ tpipe = aio->a_pipe;
+ NNI_ASSERT(tpipe != NULL);
rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
if (rv != 0) {