aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c84
-rw-r--r--src/core/aio.h10
-rw-r--r--src/core/endpt.c20
-rw-r--r--src/core/pipe.c65
-rw-r--r--src/core/socket.c93
-rw-r--r--src/core/socket.h7
-rw-r--r--src/core/taskq.c3
7 files changed, 159 insertions, 123 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index a9fbc50d..620a865d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -12,10 +12,9 @@
#include <string.h>
enum nni_aio_flags {
- NNI_AIO_WAKE = 0x1,
- NNI_AIO_DONE = 0x2,
- NNI_AIO_FINI = 0x4,
- NNI_AIO_START = 0x8,
+ NNI_AIO_WAKE = 0x1,
+ NNI_AIO_DONE = 0x2,
+ NNI_AIO_FINI = 0x4,
};
// These are used for expiration.
@@ -49,7 +48,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
aio->a_cb = cb;
aio->a_cbarg = arg;
aio->a_expire = NNI_TIME_NEVER;
- aio->a_flags = NNI_AIO_START;
+ aio->a_flags = 0;
nni_taskq_ent_init(&aio->a_tqe, cb, arg);
return (0);
@@ -58,49 +57,39 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
void
nni_aio_fini(nni_aio *aio)
{
- void (*cancelfn)(nni_aio *);
+ nni_aio_stop(aio);
- nni_mtx_lock(&aio->a_lk);
- aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled
- if ((aio->a_flags & NNI_AIO_DONE) == 0) {
- aio->a_flags |= NNI_AIO_DONE;
- aio->a_result = NNG_ECANCELED;
- cancelfn = aio->a_prov_cancel;
- if (aio->a_flags & NNI_AIO_START) {
- aio->a_flags &= ~NNI_AIO_START;
- nni_taskq_dispatch(NULL, &aio->a_tqe);
- }
+ // At this point the AIO is done.
+ nni_cv_fini(&aio->a_cv);
+ nni_mtx_fini(&aio->a_lk);
- } else {
- cancelfn = NULL;
+ if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) {
+ NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs);
}
- nni_cv_wake(&aio->a_cv);
+}
- while (aio->a_refcnt != 0) {
- nni_cv_wait(&aio->a_cv);
+// nni_aio_stop cancels any oustanding operation, and waits for the
+// callback to complete, if still running. It also marks the AIO as
+// stopped, preventing further calls to nni_aio_start from succeeding.
+// To correctly tear down an AIO, call stop, and make sure any other
+// calles are not also stopped, before calling nni_aio_fini to release
+// actual memory.
+void
+nni_aio_stop(nni_aio *aio)
+{
+ if ((aio->a_cb == NULL) && (aio->a_cbarg == NULL)) {
+ // Never initialized, so nothing should have happened.
+ return;
}
+ nni_mtx_lock(&aio->a_lk);
+ aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled
nni_mtx_unlock(&aio->a_lk);
- // Stop any timeouts. If one was in flight, we wait until it
- // completes (it could fire the completion callback.)
- nni_aio_expire_remove(aio);
-
- // Cancel the AIO if it was scheduled.
- if (cancelfn != NULL) {
- cancelfn(aio);
- }
+ nni_aio_cancel(aio, NNG_ECANCELED);
// Wait for any outstanding task to complete. We won't schedule
// new stuff because nni_aio_start will fail (due to AIO_FINI).
nni_taskq_wait(NULL, &aio->a_tqe);
-
- // At this point the AIO is done.
- nni_cv_fini(&aio->a_cv);
- nni_mtx_fini(&aio->a_lk);
-
- if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) {
- NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs);
- }
}
int
@@ -166,24 +155,23 @@ nni_aio_cancel(nni_aio *aio, int rv)
void (*cancelfn)(nni_aio *);
nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
+ if (aio->a_flags & NNI_AIO_DONE) {
// The operation already completed - so there's nothing
// left for us to do.
nni_mtx_unlock(&aio->a_lk);
return;
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = rv;
cancelfn = aio->a_prov_cancel;
aio->a_prov_cancel = NULL;
- // Guaraneed to just be a list operation.
- nni_aio_expire_remove(aio);
-
aio->a_refcnt++;
nni_mtx_unlock(&aio->a_lk);
+ // Guaraneed to just be a list operation.
+ nni_aio_expire_remove(aio);
+
// Stop any I/O at the provider level.
if (cancelfn != NULL) {
cancelfn(aio);
@@ -200,10 +188,7 @@ nni_aio_cancel(nni_aio *aio, int rv)
aio->a_prov_data = NULL;
aio->a_prov_cancel = NULL;
- if (!(aio->a_flags & NNI_AIO_FINI)) {
- // If we are finalizing, then we are done.
- nni_taskq_dispatch(NULL, &aio->a_tqe);
- }
+ nni_taskq_dispatch(NULL, &aio->a_tqe);
nni_mtx_unlock(&aio->a_lk);
}
@@ -213,13 +198,12 @@ int
nni_aio_finish(nni_aio *aio, int result, size_t count)
{
nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
+ if (aio->a_flags & NNI_AIO_DONE) {
// Operation already done (canceled or timed out?)
nni_mtx_unlock(&aio->a_lk);
return (NNG_ESTATE);
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = result;
aio->a_count = count;
@@ -240,13 +224,12 @@ int
nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe)
{
nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
+ if (aio->a_flags & NNI_AIO_DONE) {
// Operation already done (canceled or timed out?)
nni_mtx_unlock(&aio->a_lk);
return (NNG_ESTATE);
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = result;
aio->a_count = 0;
@@ -392,7 +375,6 @@ nni_aio_expire_loop(void *arg)
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = NNG_ETIMEDOUT;
cancelfn = aio->a_prov_cancel;
diff --git a/src/core/aio.h b/src/core/aio.h
index ad5b560e..c698ce20 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -69,6 +69,16 @@ extern int nni_aio_init(nni_aio *, nni_cb, void *);
// on zero'd memory.
extern void nni_aio_fini(nni_aio *);
+// nni_aio_stop cancels any unfinished I/O, running completion callbacks,
+// but also prevents any new operations from starting (nni_aio_start will
+// return NNG_ESTATE). This should be called before nni_aio_fini(). The
+// best pattern is to call nni_aio_stop on all linked aios, before calling
+// nni_aio_fini on any of them. This function will block until any
+// callbacks are executed, and therefore it should never be executed
+// from a callback itself. (To abort operations without blocking
+// use nni_aio_cancel instead.)
+extern void nni_aio_stop(nni_aio *);
+
// nni_aio_result returns the result code (0 on success, or an NNG errno)
// for the operation. It is only valid to call this when the operation is
// complete (such as when the callback is executed or after nni_aio_wait
diff --git a/src/core/endpt.c b/src/core/endpt.c
index ca76838a..6b474698 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -64,6 +64,10 @@ nni_ep_destroy(nni_ep *ep)
if (ep->ep_id != 0) {
nni_idhash_remove(nni_eps, ep->ep_id);
}
+ nni_aio_stop(&ep->ep_acc_aio);
+ nni_aio_stop(&ep->ep_con_aio);
+ nni_aio_stop(&ep->ep_con_syn);
+ nni_aio_stop(&ep->ep_backoff);
nni_aio_fini(&ep->ep_acc_aio);
nni_aio_fini(&ep->ep_con_aio);
@@ -179,6 +183,10 @@ nni_ep_reap(nni_ep *ep)
{
nni_ep_close(ep); // Extra sanity.
+ nni_aio_stop(&ep->ep_acc_aio);
+ nni_aio_stop(&ep->ep_con_aio);
+ nni_aio_stop(&ep->ep_con_syn);
+
// Take us off the sock list.
nni_sock_ep_remove(ep->ep_sock, ep);
@@ -188,11 +196,13 @@ nni_ep_reap(nni_ep *ep)
// done everything we can to wake any waiter (synchronous connect)
// gracefully.
nni_mtx_lock(&ep->ep_mtx);
- while (!nni_list_empty(&ep->ep_pipes)) {
- nni_cv_wait(&ep->ep_cv);
- }
- while (ep->ep_refcnt != 0) {
- nni_cv_wait(&ep->ep_cv);
+ ep->ep_closed = 1;
+ for (;;) {
+ if ((!nni_list_empty(&ep->ep_pipes)) || (ep->ep_refcnt != 0)) {
+ nni_cv_wait(&ep->ep_cv);
+ continue;
+ }
+ break;
}
nni_mtx_unlock(&ep->ep_mtx);
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 8cc21941..5b51a38b 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -52,7 +52,7 @@ nni_pipe_destroy(nni_pipe *p)
if (p == NULL) {
return;
}
-
+ NNI_ASSERT(p->p_refcnt != 0xDEAD);
// Make sure any unlocked holders are done with this.
// This happens during initialization for example.
nni_mtx_lock(&p->p_mtx);
@@ -60,6 +60,10 @@ nni_pipe_destroy(nni_pipe *p)
nni_cv_wait(&p->p_cv);
}
nni_mtx_unlock(&p->p_mtx);
+ p->p_refcnt = 0xDEAD;
+
+ nni_aio_stop(&p->p_start_aio);
+ nni_aio_fini(&p->p_start_aio);
if (p->p_proto_data != NULL) {
p->p_proto_dtor(p->p_proto_data);
@@ -107,9 +111,6 @@ nni_pipe_close(nni_pipe *p)
}
p->p_reap = 1;
- // abort any pending negotiation/start process.
- nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED);
-
// Close the underlying transport.
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_close(p->p_tran_data);
@@ -117,8 +118,8 @@ nni_pipe_close(nni_pipe *p)
nni_mtx_unlock(&p->p_mtx);
- // Ensure that the negotiation step is aborted fully.
- nni_aio_fini(&p->p_start_aio);
+ // abort any pending negotiation/start process.
+ nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED);
}
// Pipe reap is called on a taskq when the pipe should be closed. No
@@ -131,6 +132,8 @@ nni_pipe_reap(nni_pipe *p)
// Transport close...
nni_pipe_close(p);
+ nni_aio_stop(&p->p_start_aio);
+
// Remove the pipe from the socket and the endpoint. Note
// that it is in theory possible for either of these to be null
// if the pipe is being torn down before it is fully initialized.
@@ -148,6 +151,7 @@ void
nni_pipe_stop(nni_pipe *p)
{
// Guard against recursive calls.
+ nni_pipe_close(p);
nni_mtx_lock(&p->p_mtx);
if (p->p_stop) {
nni_mtx_unlock(&p->p_mtx);
@@ -186,25 +190,6 @@ nni_pipe_start_cb(void *arg)
}
}
-void
-nni_pipe_hold(nni_pipe *p)
-{
- nni_mtx_lock(&p->p_mtx);
- p->p_refcnt++;
- nni_mtx_unlock(&p->p_mtx);
-}
-
-void
-nni_pipe_rele(nni_pipe *p)
-{
- nni_mtx_lock(&p->p_mtx);
- p->p_refcnt--;
- if (p->p_refcnt == 0) {
- nni_cv_wake(&p->p_cv);
- }
- nni_mtx_unlock(&p->p_mtx);
-}
-
int
nni_pipe_create(nni_ep *ep, void *tdata)
{
@@ -215,7 +200,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
// In this case we just toss the pipe...
- tran->tran_pipe->p_fini(p);
+ tran->tran_pipe->p_fini(tdata);
return (NNG_ENOMEM);
}
@@ -232,38 +217,20 @@ nni_pipe_create(nni_ep *ep, void *tdata)
return (rv);
}
- nni_pipe_hold(p);
-
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
if ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) {
- goto fail;
- }
-
- if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) != 0) {
- goto fail;
+ nni_pipe_destroy(p);
+ return (rv);
}
- // Attempt to initialize sock protocol & endpoint.
- if ((rv = nni_ep_pipe_add(ep, p)) != 0) {
- goto fail;
- }
- if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
- goto fail;
+ if ((rv = nni_sock_pipe_add(sock, ep, p)) != 0) {
+ nni_pipe_destroy(p);
+ return (rv);
}
- // Start the pipe running.
- nni_pipe_start(p);
- nni_pipe_rele(p);
-
return (0);
-
-fail:
- nni_pipe_stop(p);
- nni_pipe_rele(p);
-
- return (rv);
}
int
diff --git a/src/core/socket.c b/src/core/socket.c
index 10bc1c80..23a1793a 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -69,27 +69,94 @@ nni_sock_rele(nni_sock *sock)
nni_objhash_unref(nni_socks, sock->s_id);
}
+static int
+nni_sock_pipe_start(nni_pipe *pipe)
+{
+ nni_sock *sock = pipe->p_sock;
+ void * pdata = nni_pipe_get_proto_data(pipe);
+ int rv;
+
+ NNI_ASSERT(sock != NULL);
+ if (sock->s_closing) {
+ // We're closing, bail out.
+ return (NNG_ECLOSED);
+ }
+ if (nni_pipe_peer(pipe) != sock->s_peer) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+ if ((rv = sock->s_pipe_ops.pipe_start(pdata)) != 0) {
+ // Protocol rejection for other reasons.
+ // E.g. pair and already have active connected partner.
+ return (rv);
+ }
+ return (0);
+}
+
+static void
+nni_sock_pipe_start_cb(void *arg)
+{
+ nni_pipe *pipe = arg;
+ nni_aio * aio = &pipe->p_start_aio;
+
+ if (nni_aio_result(aio) != 0) {
+ // Failed I/O during start, abort everything.
+ nni_pipe_stop(pipe);
+ return;
+ }
+ if (nni_sock_pipe_start(pipe) != 0) {
+ nni_pipe_stop(pipe);
+ return;
+ }
+}
+
int
-nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe)
+nni_sock_pipe_add(nni_sock *sock, nni_ep *ep, nni_pipe *pipe)
{
int rv;
// Initialize protocol pipe data.
nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
+ nni_mtx_lock(&ep->ep_mtx);
+
+ if ((sock->s_closing) || (ep->ep_closed)) {
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_unlock(&sock->s_mx);
return (NNG_ECLOSED);
}
+ rv = nni_aio_init(&pipe->p_start_aio, nni_sock_pipe_start_cb, pipe);
+ if (rv != 0) {
+ nni_mtx_unlock(&ep->ep_mtx);
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
+ }
+
rv = sock->s_pipe_ops.pipe_init(
&pipe->p_proto_data, pipe, sock->s_data);
if (rv != 0) {
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_lock(&sock->s_mx);
return (rv);
}
// Save the protocol destructor.
pipe->p_proto_dtor = sock->s_pipe_ops.pipe_fini;
pipe->p_sock = sock;
+ pipe->p_ep = ep;
+
nni_list_append(&sock->s_pipes, pipe);
+ nni_list_append(&ep->ep_pipes, pipe);
+
+ // Start the initial negotiation I/O...
+ if (pipe->p_tran_ops.p_start == NULL) {
+ if (nni_sock_pipe_start(pipe) != 0) {
+ nni_pipe_stop(pipe);
+ }
+ } else {
+ pipe->p_tran_ops.p_start(
+ pipe->p_tran_data, &pipe->p_start_aio);
+ }
+
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_unlock(&sock->s_mx);
return (0);
}
@@ -128,8 +195,10 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
pdata = nni_pipe_get_proto_data(pipe);
- nni_mtx_lock(&sock->s_mx);
+ // Stop any pending negotiation.
+ nni_aio_stop(&pipe->p_start_aio);
+ nni_mtx_lock(&sock->s_mx);
if ((sock->s_pipe_ops.pipe_stop == NULL) || (pdata == NULL)) {
nni_mtx_unlock(&sock->s_mx);
return;
@@ -508,24 +577,18 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // For each pipe, arrange for it to teardown hard. (Close, etc.).
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_stop(pipe);
- }
-
// For each ep, arrange for it to teardown hard.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
nni_ep_stop(ep);
}
-
- // Wait for the pipes to be reaped (there should not be any because
- // we have already reaped the EPs.)
- while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
- nni_cv_wait(&sock->s_cv);
+ // For each pipe, arrange for it to teardown hard.
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+ nni_pipe_stop(pipe);
}
- // Wait for the eps to be reaped.
- while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
+ // We have to wait for *both* endpoints and pipes to be removed.
+ while ((!nni_list_empty(&sock->s_pipes)) ||
+ (!nni_list_empty(&sock->s_eps))) {
nni_cv_wait(&sock->s_cv);
}
diff --git a/src/core/socket.h b/src/core/socket.h
index d0196eea..41dfbc33 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -87,8 +87,11 @@ extern void nni_sock_unnotify(nni_sock *, nni_notify *);
extern void nni_sock_ep_remove(nni_sock *, nni_ep *);
// nni_sock_pipe_add adds the pipe to the socket. It is called by
-// the generic pipe creation code.
-extern int nni_sock_pipe_add(nni_sock *, nni_pipe *);
+// the generic pipe creation code. It also adds the socket to the
+// ep list, and starts the pipe. It does all these to ensure that
+// we have complete success or failure, and there is no point where
+// a pipe could wind up orphaned.
+extern int nni_sock_pipe_add(nni_sock *, nni_ep *, nni_pipe *);
extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *);
diff --git a/src/core/taskq.c b/src/core/taskq.c
index 36129bfd..64179790 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -153,7 +154,7 @@ nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent)
return (NNG_ECLOSED);
}
// It might already be scheduled... if so don't redo it.
- if (ent->tqe_tq == NULL) {
+ if (!nni_list_active(&tq->tq_ents, ent)) {
ent->tqe_tq = tq;
nni_list_append(&tq->tq_ents, ent);
}