aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/endpt.c207
-rw-r--r--src/core/endpt.h3
-rw-r--r--src/core/pipe.c9
-rw-r--r--src/core/socket.c25
-rw-r--r--src/core/socket.h4
5 files changed, 172 insertions, 76 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index fef6ac83..a99cd21a 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -18,6 +18,8 @@
static void nni_ep_accept_start(nni_ep *);
static void nni_ep_accept_done(void *);
+static void nni_ep_connect_start(nni_ep *);
+static void nni_ep_connect_done(void *);
static nni_idhash *nni_eps;
@@ -56,13 +58,14 @@ nni_ep_destroy(nni_ep *ep)
return;
}
nni_aio_fini(&ep->ep_acc_aio);
+ nni_aio_fini(&ep->ep_con_aio);
+ nni_aio_fini(&ep->ep_con_syn);
if (ep->ep_data != NULL) {
ep->ep_ops.ep_fini(ep->ep_data);
}
if (ep->ep_id != 0) {
nni_idhash_remove(nni_eps, ep->ep_id);
}
- nni_thr_fini(&ep->ep_thr);
nni_cv_fini(&ep->ep_cv);
nni_mtx_fini(&ep->ep_mtx);
NNI_FREE_STRUCT(ep);
@@ -91,6 +94,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
ep->ep_pipe = NULL;
ep->ep_id = id;
ep->ep_data = NULL;
+ ep->ep_refcnt = 0;
NNI_LIST_NODE_INIT(&ep->ep_node);
@@ -107,6 +111,16 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
nni_ep_destroy(ep);
return (rv);
}
+ rv = nni_aio_init(&ep->ep_con_aio, nni_ep_connect_done, ep);
+ if (rv != 0) {
+ nni_ep_destroy(ep);
+ return (rv);
+ }
+ rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL);
+ if (rv != 0) {
+ nni_ep_destroy(ep);
+ return (rv);
+ }
ep->ep_sock = sock;
ep->ep_tran = tran;
@@ -132,16 +146,15 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
void
nni_ep_close(nni_ep *ep)
{
- // Abort any in-flight operations.
- nni_aio_stop(&ep->ep_acc_aio);
-
nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_closed == 0) {
- ep->ep_closed = 1;
- ep->ep_ops.ep_close(ep->ep_data);
+ if (ep->ep_closed) {
+ nni_mtx_unlock(&ep->ep_mtx);
+ return;
}
- nni_cv_wake(&ep->ep_cv);
+ ep->ep_closed = 1;
nni_mtx_unlock(&ep->ep_mtx);
+
+ ep->ep_ops.ep_close(ep->ep_data);
}
static void
@@ -151,9 +164,25 @@ nni_ep_reap(nni_ep *ep)
nni_ep_close(ep); // Extra sanity.
+ // Abort any in-flight operations.
+ nni_aio_stop(&ep->ep_acc_aio);
+ nni_aio_stop(&ep->ep_con_aio);
+ nni_aio_stop(&ep->ep_con_syn);
+
// Take us off the sock list.
nni_sock_ep_remove(ep->ep_sock, ep);
+ // Make sure any other unlocked users (references) are gone
+ // before we actually remove the memory. We should not have
+ // to wait long as we have closed the underlying pipe and
+ // done everything we can to wake any waiter (synchronous connect)
+ // gracefully.
+ nni_mtx_lock(&ep->ep_mtx);
+ while (ep->ep_refcnt != 0) {
+ nni_cv_wait(&ep->ep_cv);
+ }
+ nni_mtx_unlock(&ep->ep_mtx);
+
nni_ep_destroy(ep);
}
@@ -173,59 +202,73 @@ nni_ep_stop(nni_ep *ep)
nni_mtx_unlock(&ep->ep_mtx);
}
-static int
-nni_ep_connect_aio(nni_ep *ep, void **pipep)
+static void
+nni_ep_connect_done(void *arg)
{
- nni_aio aio;
- int rv;
+ nni_ep * ep = arg;
+ nni_aio * aio = &ep->ep_con_aio;
+ void * tpipe;
+ nni_pipe * pipe;
+ const nni_tran_pipe *ops;
- nni_aio_init(&aio, NULL, NULL);
- aio.a_endpt = ep->ep_data;
- ep->ep_ops.ep_connect(ep->ep_data, &aio);
- nni_aio_wait(&aio);
+ int rv;
+
+ nni_mtx_lock(&ep->ep_mtx);
+ if ((rv = nni_aio_result(aio)) == 0) {
+
+ tpipe = aio->a_pipe;
+ NNI_ASSERT(tpipe != NULL);
- if ((rv = nni_aio_result(&aio)) == 0) {
- *pipep = aio.a_pipe;
+ rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
+ if (rv != 0) {
+ ops->p_fini(tpipe);
+ }
}
- nni_aio_fini(&aio);
- return (rv);
+
+done:
+ switch (rv) {
+ case 0:
+ pipe->p_tran_ops = *ops;
+ pipe->p_tran_data = tpipe;
+ nni_pipe_start(pipe);
+ // No further outgoing connects -- we will restart a
+ // connection from the pipe when the pipe is removed.
+ break;
+ case NNG_ECLOSED:
+ case NNG_ECANCELED:
+ // Canceled/closed -- stop everything.
+ break;
+ default:
+ // cooldown = nni_clock() + rtime;
+ // rtime *= 2;
+ // if ((maxrtime >= defrtime) && (rtime > maxrtime)) {
+ // rtime = maxrtime;
+ // }
+
+ // XXX: We need to inject a cooldown, and then try again. For
+ // now we just try again immediately. This is very suboptimal.
+ nni_ep_connect_start(ep);
+ break;
+ }
+ nni_mtx_unlock(&ep->ep_mtx);
}
-static int
-nni_ep_connect_sync(nni_ep *ep)
+static void
+nni_ep_connect_start(nni_ep *ep)
{
- nni_pipe *pipe;
- int rv;
+ nni_aio *aio = &ep->ep_acc_aio;
- nni_mtx_lock(&ep->ep_mtx);
+ // Call with the Endpoint lock held.
if (ep->ep_closed) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ECLOSED);
- }
- rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
- if (rv != 0) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
+ return;
}
- pipe->p_ep = ep;
- nni_list_append(&ep->ep_pipes, pipe);
- nni_mtx_unlock(&ep->ep_mtx);
- rv = nni_ep_connect_aio(ep, &pipe->p_tran_data);
- if (rv != 0) {
- if (rv != NNG_ECLOSED) { // HACK ALERT
- nni_pipe_stop(pipe);
- }
- return (rv);
- }
- nni_pipe_start(pipe);
- nni_mtx_lock(&ep->ep_mtx);
- ep->ep_pipe = pipe;
- nni_mtx_unlock(&ep->ep_mtx);
- return (0);
+ aio->a_endpt = ep->ep_data;
+ ep->ep_ops.ep_connect(ep->ep_data, aio);
}
// nni_dialer is the thread worker that dials in the background.
+#if 0 // this has the logic for timing out... remove when done.
static void
nni_dialer(void *arg)
{
@@ -287,13 +330,20 @@ nni_dialer(void *arg)
nni_mtx_unlock(&ep->ep_mtx);
}
}
+#endif
int
nni_ep_dial(nni_ep *ep, int flags)
{
- int rv = 0;
+ int rv = 0;
+ nni_aio * aio;
+ void * tpipe;
+ nni_pipe * pipe;
+ const nni_tran_pipe *ops;
nni_mtx_lock(&ep->ep_mtx);
+ ops = ep->ep_tran->tran_pipe;
+
if (ep->ep_mode != NNI_EP_MODE_DIAL) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ENOTSUP);
@@ -307,31 +357,59 @@ nni_ep_dial(nni_ep *ep, int flags)
return (NNG_ECLOSED);
}
- if ((rv = nni_thr_init(&ep->ep_thr, nni_dialer, ep)) != 0) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
- }
ep->ep_started = 1;
- if (flags & NNG_FLAG_SYNCH) {
+ if ((flags & NNG_FLAG_SYNCH) == 0) {
+ nni_ep_connect_start(ep);
nni_mtx_unlock(&ep->ep_mtx);
- rv = nni_ep_connect_sync(ep);
- if (rv != 0) {
- nni_thr_fini(&ep->ep_thr);
- ep->ep_started = 0;
- return (rv);
- }
- nni_mtx_lock(&ep->ep_mtx);
+ return (0);
}
- nni_thr_run(&ep->ep_thr);
+ // This one is kind of special, since we need
+ // to block for the connection to complete. Ick.
+ aio = &ep->ep_con_syn;
+ aio->a_endpt = ep->ep_data;
+ ep->ep_ops.ep_connect(ep->ep_data, aio);
+
+ // We're about to drop the lock, but we cannot allow the
+ // endpoint to be removed. Put a hold on it.
+ ep->ep_refcnt++;
nni_mtx_unlock(&ep->ep_mtx);
+ nni_aio_wait(aio);
+
+ nni_mtx_lock(&ep->ep_mtx);
+ ep->ep_refcnt--;
+
+ if (ep->ep_closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ rv = nni_aio_result(aio);
+ }
+
+ if (rv != 0) {
+ ep->ep_started = 0;
+ nni_cv_wake(&ep->ep_cv);
+ nni_mtx_unlock(&ep->ep_mtx);
+ return (rv);
+ }
+
+ tpipe = aio->a_pipe;
+ NNI_ASSERT(tpipe != NULL);
+ rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
+ if (rv != 0) {
+ ops->p_fini(tpipe);
+ } else {
+ pipe->p_tran_ops = *ops;
+ pipe->p_tran_data = tpipe;
+ nni_pipe_start(pipe);
+ }
+
+ nni_cv_wake(&ep->ep_cv);
+ nni_mtx_unlock(&ep->ep_mtx);
return (rv);
}
-static void nni_ep_accept_start(nni_ep *ep);
-
static void
nni_ep_accept_done(void *arg)
{
@@ -348,7 +426,8 @@ nni_ep_accept_done(void *arg)
if ((rv = nni_aio_result(aio)) != 0) {
goto done;
}
- NNI_ASSERT((tpipe = aio->a_pipe) != NULL);
+ tpipe = aio->a_pipe;
+ NNI_ASSERT(tpipe != NULL);
rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
if (rv != 0) {
diff --git a/src/core/endpt.h b/src/core/endpt.h
index a47586a0..3350bc59 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -26,18 +26,19 @@ struct nni_ep {
nni_list_node ep_node; // per socket list
nni_sock * ep_sock;
char ep_addr[NNG_MAXADDRLEN];
- nni_thr ep_thr;
int ep_mode;
int ep_started;
int ep_stop;
int ep_closed; // full shutdown
int ep_bound; // true if we bound locally
+ int ep_refcnt;
nni_mtx ep_mtx;
nni_cv ep_cv;
nni_pipe * ep_pipe; // Connected pipe (dialers only)
nni_list ep_pipes;
nni_aio ep_acc_aio;
nni_aio ep_con_aio;
+ nni_aio ep_con_syn; // used for sync connect
nni_taskq_ent ep_reap_tqe;
};
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 8f2099a9..f1e8014e 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -91,8 +91,6 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio)
void
nni_pipe_close(nni_pipe *p)
{
- nni_sock *sock = p->p_sock;
-
nni_mtx_lock(&p->p_mtx);
if (p->p_reap == 1) {
// We already did a close.
@@ -200,18 +198,13 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran)
nni_pipe_destroy(p);
return (rv);
}
- p->p_sock = sock;
// Make a copy of the transport ops. We can override entry points
// and we avoid an extra dereference on hot code paths.
p->p_tran_ops = *tran->tran_pipe;
- // Save the protocol destructor.
- p->p_proto_dtor = sock->s_pipe_ops.pipe_fini;
-
// Initialize protocol pipe data.
- rv = sock->s_pipe_ops.pipe_init(&p->p_proto_data, p, sock->s_data);
- if (rv != 0) {
+ if ((rv = nni_sock_pipe_init(sock, p)) != 0) {
nni_pipe_destroy(p);
return (rv);
}
diff --git a/src/core/socket.c b/src/core/socket.c
index 3a4f36e5..75ae8450 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -77,6 +78,27 @@ nni_sock_rele(nni_sock *sock)
}
int
+nni_sock_pipe_init(nni_sock *sock, nni_pipe *pipe)
+{
+ int rv;
+
+ // Initialize protocol pipe data.
+ nni_mtx_lock(&sock->s_mx);
+ rv = sock->s_pipe_ops.pipe_init(
+ &pipe->p_proto_data, pipe, sock->s_data);
+ if (rv != 0) {
+ nni_mtx_lock(&sock->s_mx);
+ return (rv);
+ }
+ // Save the protocol destructor.
+ pipe->p_proto_dtor = sock->s_pipe_ops.pipe_fini;
+ pipe->p_sock = sock;
+ nni_list_append(&sock->s_pipes, pipe);
+ nni_mtx_unlock(&sock->s_mx);
+ return (0);
+}
+
+int
nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
{
int rv;
@@ -98,9 +120,6 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
return (rv);
}
- // We have claimed ownership of the pipe, so add it to the list.
- // Up until this point, the caller could destroy the pipe.
- nni_list_append(&sock->s_pipes, pipe);
nni_mtx_unlock(&sock->s_mx);
return (0);
diff --git a/src/core/socket.h b/src/core/socket.h
index 2dc06009..56cf60c0 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -87,6 +87,10 @@ extern void nni_sock_unnotify(nni_sock *, nni_notify *);
extern void nni_sock_ep_remove(nni_sock *, nni_ep *);
+// nni_sock_pipe_init adds the pipe to the socket. It is called by
+// the generic pipe creation code.
+extern int nni_sock_pipe_init(nni_sock *, nni_pipe *);
+
extern void nni_sock_pipe_stop(nni_sock *, nni_pipe *);
// nni_sock_pipe_ready lets the socket know the pipe is ready for