aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-20 14:34:51 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-20 14:34:51 -0700
commita37093079b492e966344416445aae354b147d30e (patch)
tree2f21fc2bc716f2423ba02f4713b25038c429ec4e /src
parent88fb04f61918b06e6e269c1960058c3df5e0a0ef (diff)
downloadnng-a37093079b492e966344416445aae354b147d30e.tar.gz
nng-a37093079b492e966344416445aae354b147d30e.tar.bz2
nng-a37093079b492e966344416445aae354b147d30e.zip
Yet more race condition fixes.
We need to remember that protocol stops can run synchronously, and therefore we need to wait for the aio to complete. Further, we need to break apart shutting down aio activity from deallocation, as we need to shut down *all* async activity before deallocating *anything*. Noticed that we had a pipe race in the surveyor pattern too.
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c84
-rw-r--r--src/core/aio.h10
-rw-r--r--src/core/endpt.c20
-rw-r--r--src/core/pipe.c65
-rw-r--r--src/core/socket.c93
-rw-r--r--src/core/socket.h7
-rw-r--r--src/core/taskq.c3
-rw-r--r--src/platform/posix/posix_epdesc.c15
-rw-r--r--src/platform/posix/posix_pipedesc.c13
-rw-r--r--src/platform/posix/posix_thread.c1
-rw-r--r--src/protocol/bus/bus.c25
-rw-r--r--src/protocol/pair/pair.c1
-rw-r--r--src/protocol/pipeline/pull.c12
-rw-r--r--src/protocol/pipeline/push.c9
-rw-r--r--src/protocol/pubsub/pub.c9
-rw-r--r--src/protocol/pubsub/sub.c4
-rw-r--r--src/protocol/reqrep/rep.c9
-rw-r--r--src/protocol/reqrep/req.c59
-rw-r--r--src/protocol/survey/respond.c33
-rw-r--r--src/protocol/survey/survey.c10
-rw-r--r--src/transport/ipc/ipc.c5
-rw-r--r--src/transport/tcp/tcp.c5
22 files changed, 291 insertions, 201 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index a9fbc50d..620a865d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -12,10 +12,9 @@
#include <string.h>
enum nni_aio_flags {
- NNI_AIO_WAKE = 0x1,
- NNI_AIO_DONE = 0x2,
- NNI_AIO_FINI = 0x4,
- NNI_AIO_START = 0x8,
+ NNI_AIO_WAKE = 0x1,
+ NNI_AIO_DONE = 0x2,
+ NNI_AIO_FINI = 0x4,
};
// These are used for expiration.
@@ -49,7 +48,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
aio->a_cb = cb;
aio->a_cbarg = arg;
aio->a_expire = NNI_TIME_NEVER;
- aio->a_flags = NNI_AIO_START;
+ aio->a_flags = 0;
nni_taskq_ent_init(&aio->a_tqe, cb, arg);
return (0);
@@ -58,49 +57,39 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
void
nni_aio_fini(nni_aio *aio)
{
- void (*cancelfn)(nni_aio *);
+ nni_aio_stop(aio);
- nni_mtx_lock(&aio->a_lk);
- aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled
- if ((aio->a_flags & NNI_AIO_DONE) == 0) {
- aio->a_flags |= NNI_AIO_DONE;
- aio->a_result = NNG_ECANCELED;
- cancelfn = aio->a_prov_cancel;
- if (aio->a_flags & NNI_AIO_START) {
- aio->a_flags &= ~NNI_AIO_START;
- nni_taskq_dispatch(NULL, &aio->a_tqe);
- }
+ // At this point the AIO is done.
+ nni_cv_fini(&aio->a_cv);
+ nni_mtx_fini(&aio->a_lk);
- } else {
- cancelfn = NULL;
+ if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) {
+ NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs);
}
- nni_cv_wake(&aio->a_cv);
+}
- while (aio->a_refcnt != 0) {
- nni_cv_wait(&aio->a_cv);
+// nni_aio_stop cancels any oustanding operation, and waits for the
+// callback to complete, if still running. It also marks the AIO as
+// stopped, preventing further calls to nni_aio_start from succeeding.
+// To correctly tear down an AIO, call stop, and make sure any other
+// calles are not also stopped, before calling nni_aio_fini to release
+// actual memory.
+void
+nni_aio_stop(nni_aio *aio)
+{
+ if ((aio->a_cb == NULL) && (aio->a_cbarg == NULL)) {
+ // Never initialized, so nothing should have happened.
+ return;
}
+ nni_mtx_lock(&aio->a_lk);
+ aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled
nni_mtx_unlock(&aio->a_lk);
- // Stop any timeouts. If one was in flight, we wait until it
- // completes (it could fire the completion callback.)
- nni_aio_expire_remove(aio);
-
- // Cancel the AIO if it was scheduled.
- if (cancelfn != NULL) {
- cancelfn(aio);
- }
+ nni_aio_cancel(aio, NNG_ECANCELED);
// Wait for any outstanding task to complete. We won't schedule
// new stuff because nni_aio_start will fail (due to AIO_FINI).
nni_taskq_wait(NULL, &aio->a_tqe);
-
- // At this point the AIO is done.
- nni_cv_fini(&aio->a_cv);
- nni_mtx_fini(&aio->a_lk);
-
- if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) {
- NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs);
- }
}
int
@@ -166,24 +155,23 @@ nni_aio_cancel(nni_aio *aio, int rv)
void (*cancelfn)(nni_aio *);
nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
+ if (aio->a_flags & NNI_AIO_DONE) {
// The operation already completed - so there's nothing
// left for us to do.
nni_mtx_unlock(&aio->a_lk);
return;
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = rv;
cancelfn = aio->a_prov_cancel;
aio->a_prov_cancel = NULL;
- // Guaraneed to just be a list operation.
- nni_aio_expire_remove(aio);
-
aio->a_refcnt++;
nni_mtx_unlock(&aio->a_lk);
+ // Guaraneed to just be a list operation.
+ nni_aio_expire_remove(aio);
+
// Stop any I/O at the provider level.
if (cancelfn != NULL) {
cancelfn(aio);
@@ -200,10 +188,7 @@ nni_aio_cancel(nni_aio *aio, int rv)
aio->a_prov_data = NULL;
aio->a_prov_cancel = NULL;
- if (!(aio->a_flags & NNI_AIO_FINI)) {
- // If we are finalizing, then we are done.
- nni_taskq_dispatch(NULL, &aio->a_tqe);
- }
+ nni_taskq_dispatch(NULL, &aio->a_tqe);
nni_mtx_unlock(&aio->a_lk);
}
@@ -213,13 +198,12 @@ int
nni_aio_finish(nni_aio *aio, int result, size_t count)
{
nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
+ if (aio->a_flags & NNI_AIO_DONE) {
// Operation already done (canceled or timed out?)
nni_mtx_unlock(&aio->a_lk);
return (NNG_ESTATE);
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = result;
aio->a_count = count;
@@ -240,13 +224,12 @@ int
nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe)
{
nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
+ if (aio->a_flags & NNI_AIO_DONE) {
// Operation already done (canceled or timed out?)
nni_mtx_unlock(&aio->a_lk);
return (NNG_ESTATE);
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = result;
aio->a_count = 0;
@@ -392,7 +375,6 @@ nni_aio_expire_loop(void *arg)
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = NNG_ETIMEDOUT;
cancelfn = aio->a_prov_cancel;
diff --git a/src/core/aio.h b/src/core/aio.h
index ad5b560e..c698ce20 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -69,6 +69,16 @@ extern int nni_aio_init(nni_aio *, nni_cb, void *);
// on zero'd memory.
extern void nni_aio_fini(nni_aio *);
+// nni_aio_stop cancels any unfinished I/O, running completion callbacks,
+// but also prevents any new operations from starting (nni_aio_start will
+// return NNG_ESTATE). This should be called before nni_aio_fini(). The
+// best pattern is to call nni_aio_stop on all linked aios, before calling
+// nni_aio_fini on any of them. This function will block until any
+// callbacks are executed, and therefore it should never be executed
+// from a callback itself. (To abort operations without blocking
+// use nni_aio_cancel instead.)
+extern void nni_aio_stop(nni_aio *);
+
// nni_aio_result returns the result code (0 on success, or an NNG errno)
// for the operation. It is only valid to call this when the operation is
// complete (such as when the callback is executed or after nni_aio_wait
diff --git a/src/core/endpt.c b/src/core/endpt.c
index ca76838a..6b474698 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -64,6 +64,10 @@ nni_ep_destroy(nni_ep *ep)
if (ep->ep_id != 0) {
nni_idhash_remove(nni_eps, ep->ep_id);
}
+ nni_aio_stop(&ep->ep_acc_aio);
+ nni_aio_stop(&ep->ep_con_aio);
+ nni_aio_stop(&ep->ep_con_syn);
+ nni_aio_stop(&ep->ep_backoff);
nni_aio_fini(&ep->ep_acc_aio);
nni_aio_fini(&ep->ep_con_aio);
@@ -179,6 +183,10 @@ nni_ep_reap(nni_ep *ep)
{
nni_ep_close(ep); // Extra sanity.
+ nni_aio_stop(&ep->ep_acc_aio);
+ nni_aio_stop(&ep->ep_con_aio);
+ nni_aio_stop(&ep->ep_con_syn);
+
// Take us off the sock list.
nni_sock_ep_remove(ep->ep_sock, ep);
@@ -188,11 +196,13 @@ nni_ep_reap(nni_ep *ep)
// done everything we can to wake any waiter (synchronous connect)
// gracefully.
nni_mtx_lock(&ep->ep_mtx);
- while (!nni_list_empty(&ep->ep_pipes)) {
- nni_cv_wait(&ep->ep_cv);
- }
- while (ep->ep_refcnt != 0) {
- nni_cv_wait(&ep->ep_cv);
+ ep->ep_closed = 1;
+ for (;;) {
+ if ((!nni_list_empty(&ep->ep_pipes)) || (ep->ep_refcnt != 0)) {
+ nni_cv_wait(&ep->ep_cv);
+ continue;
+ }
+ break;
}
nni_mtx_unlock(&ep->ep_mtx);
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 8cc21941..5b51a38b 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -52,7 +52,7 @@ nni_pipe_destroy(nni_pipe *p)
if (p == NULL) {
return;
}
-
+ NNI_ASSERT(p->p_refcnt != 0xDEAD);
// Make sure any unlocked holders are done with this.
// This happens during initialization for example.
nni_mtx_lock(&p->p_mtx);
@@ -60,6 +60,10 @@ nni_pipe_destroy(nni_pipe *p)
nni_cv_wait(&p->p_cv);
}
nni_mtx_unlock(&p->p_mtx);
+ p->p_refcnt = 0xDEAD;
+
+ nni_aio_stop(&p->p_start_aio);
+ nni_aio_fini(&p->p_start_aio);
if (p->p_proto_data != NULL) {
p->p_proto_dtor(p->p_proto_data);
@@ -107,9 +111,6 @@ nni_pipe_close(nni_pipe *p)
}
p->p_reap = 1;
- // abort any pending negotiation/start process.
- nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED);
-
// Close the underlying transport.
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_close(p->p_tran_data);
@@ -117,8 +118,8 @@ nni_pipe_close(nni_pipe *p)
nni_mtx_unlock(&p->p_mtx);
- // Ensure that the negotiation step is aborted fully.
- nni_aio_fini(&p->p_start_aio);
+ // abort any pending negotiation/start process.
+ nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED);
}
// Pipe reap is called on a taskq when the pipe should be closed. No
@@ -131,6 +132,8 @@ nni_pipe_reap(nni_pipe *p)
// Transport close...
nni_pipe_close(p);
+ nni_aio_stop(&p->p_start_aio);
+
// Remove the pipe from the socket and the endpoint. Note
// that it is in theory possible for either of these to be null
// if the pipe is being torn down before it is fully initialized.
@@ -148,6 +151,7 @@ void
nni_pipe_stop(nni_pipe *p)
{
// Guard against recursive calls.
+ nni_pipe_close(p);
nni_mtx_lock(&p->p_mtx);
if (p->p_stop) {
nni_mtx_unlock(&p->p_mtx);
@@ -186,25 +190,6 @@ nni_pipe_start_cb(void *arg)
}
}
-void
-nni_pipe_hold(nni_pipe *p)
-{
- nni_mtx_lock(&p->p_mtx);
- p->p_refcnt++;
- nni_mtx_unlock(&p->p_mtx);
-}
-
-void
-nni_pipe_rele(nni_pipe *p)
-{
- nni_mtx_lock(&p->p_mtx);
- p->p_refcnt--;
- if (p->p_refcnt == 0) {
- nni_cv_wake(&p->p_cv);
- }
- nni_mtx_unlock(&p->p_mtx);
-}
-
int
nni_pipe_create(nni_ep *ep, void *tdata)
{
@@ -215,7 +200,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
// In this case we just toss the pipe...
- tran->tran_pipe->p_fini(p);
+ tran->tran_pipe->p_fini(tdata);
return (NNG_ENOMEM);
}
@@ -232,38 +217,20 @@ nni_pipe_create(nni_ep *ep, void *tdata)
return (rv);
}
- nni_pipe_hold(p);
-
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
if ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) {
- goto fail;
- }
-
- if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) != 0) {
- goto fail;
+ nni_pipe_destroy(p);
+ return (rv);
}
- // Attempt to initialize sock protocol & endpoint.
- if ((rv = nni_ep_pipe_add(ep, p)) != 0) {
- goto fail;
- }
- if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
- goto fail;
+ if ((rv = nni_sock_pipe_add(sock, ep, p)) != 0) {
+ nni_pipe_destroy(p);
+ return (rv);
}
- // Start the pipe running.
- nni_pipe_start(p);
- nni_pipe_rele(p);
-
return (0);
-
-fail:
- nni_pipe_stop(p);
- nni_pipe_rele(p);
-
- return (rv);
}
int
diff --git a/src/core/socket.c b/src/core/socket.c
index 10bc1c80..23a1793a 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -69,27 +69,94 @@ nni_sock_rele(nni_sock *sock)
nni_objhash_unref(nni_socks, sock->s_id);
}
+static int
+nni_sock_pipe_start(nni_pipe *pipe)
+{
+ nni_sock *sock = pipe->p_sock;
+ void * pdata = nni_pipe_get_proto_data(pipe);
+ int rv;
+
+ NNI_ASSERT(sock != NULL);
+ if (sock->s_closing) {
+ // We're closing, bail out.
+ return (NNG_ECLOSED);
+ }
+ if (nni_pipe_peer(pipe) != sock->s_peer) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+ if ((rv = sock->s_pipe_ops.pipe_start(pdata)) != 0) {
+ // Protocol rejection for other reasons.
+ // E.g. pair and already have active connected partner.
+ return (rv);
+ }
+ return (0);
+}
+
+static void
+nni_sock_pipe_start_cb(void *arg)
+{
+ nni_pipe *pipe = arg;
+ nni_aio * aio = &pipe->p_start_aio;
+
+ if (nni_aio_result(aio) != 0) {
+ // Failed I/O during start, abort everything.
+ nni_pipe_stop(pipe);
+ return;
+ }
+ if (nni_sock_pipe_start(pipe) != 0) {
+ nni_pipe_stop(pipe);
+ return;
+ }
+}
+
int
-nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe)
+nni_sock_pipe_add(nni_sock *sock, nni_ep *ep, nni_pipe *pipe)
{
int rv;
// Initialize protocol pipe data.
nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
+ nni_mtx_lock(&ep->ep_mtx);
+
+ if ((sock->s_closing) || (ep->ep_closed)) {
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_unlock(&sock->s_mx);
return (NNG_ECLOSED);
}
+ rv = nni_aio_init(&pipe->p_start_aio, nni_sock_pipe_start_cb, pipe);
+ if (rv != 0) {
+ nni_mtx_unlock(&ep->ep_mtx);
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
+ }
+
rv = sock->s_pipe_ops.pipe_init(
&pipe->p_proto_data, pipe, sock->s_data);
if (rv != 0) {
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_lock(&sock->s_mx);
return (rv);
}
// Save the protocol destructor.
pipe->p_proto_dtor = sock->s_pipe_ops.pipe_fini;
pipe->p_sock = sock;
+ pipe->p_ep = ep;
+
nni_list_append(&sock->s_pipes, pipe);
+ nni_list_append(&ep->ep_pipes, pipe);
+
+ // Start the initial negotiation I/O...
+ if (pipe->p_tran_ops.p_start == NULL) {
+ if (nni_sock_pipe_start(pipe) != 0) {
+ nni_pipe_stop(pipe);
+ }
+ } else {
+ pipe->p_tran_ops.p_start(
+ pipe->p_tran_data, &pipe->p_start_aio);
+ }
+
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_unlock(&sock->s_mx);
return (0);
}
@@ -128,8 +195,10 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
pdata = nni_pipe_get_proto_data(pipe);
- nni_mtx_lock(&sock->s_mx);
+ // Stop any pending negotiation.
+ nni_aio_stop(&pipe->p_start_aio);
+ nni_mtx_lock(&sock->s_mx);
if ((sock->s_pipe_ops.pipe_stop == NULL) || (pdata == NULL)) {
nni_mtx_unlock(&sock->s_mx);
return;
@@ -508,24 +577,18 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // For each pipe, arrange for it to teardown hard. (Close, etc.).
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_stop(pipe);
- }
-
// For each ep, arrange for it to teardown hard.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
nni_ep_stop(ep);
}
-
- // Wait for the pipes to be reaped (there should not be any because
- // we have already reaped the EPs.)
- while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
- nni_cv_wait(&sock->s_cv);
+ // For each pipe, arrange for it to teardown hard.
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+ nni_pipe_stop(pipe);
}
- // Wait for the eps to be reaped.
- while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
+ // We have to wait for *both* endpoints and pipes to be removed.
+ while ((!nni_list_empty(&sock->s_pipes)) ||
+ (!nni_list_empty(&sock->s_eps))) {
nni_cv_wait(&sock->s_cv);
}
diff --git a/src/core/socket.h b/src/core/socket.h
index d0196eea..41dfbc33 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -87,8 +87,11 @@ extern void nni_sock_unnotify(nni_sock *, nni_notify *);
extern void nni_sock_ep_remove(nni_sock *, nni_ep *);
// nni_sock_pipe_add adds the pipe to the socket. It is called by
-// the generic pipe creation code.
-extern int nni_sock_pipe_add(nni_sock *, nni_pipe *);
+// the generic pipe creation code. It also adds the socket to the
+// ep list, and starts the pipe. It does all these to ensure that
+// we have complete success or failure, and there is no point where
+// a pipe could wind up orphaned.
+extern int nni_sock_pipe_add(nni_sock *, nni_ep *, nni_pipe *);
extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *);
diff --git a/src/core/taskq.c b/src/core/taskq.c
index 36129bfd..64179790 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -153,7 +154,7 @@ nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent)
return (NNG_ECLOSED);
}
// It might already be scheduled... if so don't redo it.
- if (ent->tqe_tq == NULL) {
+ if (!nni_list_active(&tq->tq_ents, ent)) {
ent->tqe_tq = tq;
nni_list_append(&tq->tq_ents, ent);
}
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index b89af982..7a91b4ec 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -195,6 +195,7 @@ nni_posix_epdesc_doclose(nni_posix_epdesc *ed)
{
nni_aio * aio;
struct sockaddr_un *sun;
+ int fd;
ed->closed = 1;
while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
@@ -204,14 +205,14 @@ nni_posix_epdesc_doclose(nni_posix_epdesc *ed)
nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
}
- if (ed->node.fd != -1) {
- (void) shutdown(ed->node.fd, SHUT_RDWR);
+ if ((fd = ed->node.fd) != -1) {
+ ed->node.fd = -1;
+ (void) shutdown(fd, SHUT_RDWR);
+ (void) close(fd);
sun = (void *) &ed->locaddr;
if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) {
(void) unlink(sun->sun_path);
}
- (void) close(ed->node.fd);
- ed->node.fd = -1;
}
}
@@ -511,9 +512,13 @@ nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, int len)
void
nni_posix_epdesc_fini(nni_posix_epdesc *ed)
{
- if (ed->node.fd >= 0) {
+ int fd;
+ nni_mtx_lock(&ed->mtx);
+ if ((fd = ed->node.fd) != -1) {
(void) close(ed->node.fd);
+ nni_posix_epdesc_doclose(ed);
}
+ nni_mtx_unlock(&ed->mtx);
nni_posix_pollq_remove(&ed->node);
nni_mtx_fini(&ed->mtx);
NNI_FREE_STRUCT(ed);
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index 4e61c2c4..b85e79a9 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -45,18 +45,21 @@ static void
nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
{
nni_aio *aio;
+ int fd;
pd->closed = 1;
- if (pd->node.fd != -1) {
- // Let any peer know we are closing.
- (void) shutdown(pd->node.fd, SHUT_RDWR);
- }
while ((aio = nni_list_first(&pd->readq)) != NULL) {
nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
}
while ((aio = nni_list_first(&pd->writeq)) != NULL) {
nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
}
+ if ((fd = pd->node.fd) != -1) {
+ // Let any peer know we are closing.
+ pd->node.fd = -1;
+ (void) shutdown(fd, SHUT_RDWR);
+ (void) close(fd);
+ }
}
static void
@@ -269,7 +272,7 @@ nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
nni_mtx_unlock(&pd->mtx);
return;
}
- if (pd->closed < 0) {
+ if (pd->closed) {
nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
nni_mtx_unlock(&pd->mtx);
return;
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index 45bbfcd3..071e6007 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -329,6 +329,7 @@ nni_plat_fini(void)
{
pthread_mutex_lock(&nni_plat_lock);
if (nni_plat_inited) {
+ nni_posix_resolv_sysfini();
nni_posix_pollq_sysfini();
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index 17ef03bb..3070b90d 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -60,6 +60,7 @@ nni_bus_sock_fini(void *arg)
nni_bus_sock *psock = arg;
if (psock != NULL) {
+ nni_aio_stop(&psock->aio_getq);
nni_aio_fini(&psock->aio_getq);
nni_mtx_fini(&psock->mtx);
NNI_FREE_STRUCT(psock);
@@ -107,15 +108,13 @@ nni_bus_pipe_fini(void *arg)
{
nni_bus_pipe *ppipe = arg;
- if (ppipe != NULL) {
- nni_mtx_fini(&ppipe->mtx);
- nni_aio_fini(&ppipe->aio_getq);
- nni_aio_fini(&ppipe->aio_send);
- nni_aio_fini(&ppipe->aio_recv);
- nni_aio_fini(&ppipe->aio_putq);
- nni_msgq_fini(ppipe->sendq);
- NNI_FREE_STRUCT(ppipe);
- }
+ nni_aio_fini(&ppipe->aio_getq);
+ nni_aio_fini(&ppipe->aio_send);
+ nni_aio_fini(&ppipe->aio_recv);
+ nni_aio_fini(&ppipe->aio_putq);
+ nni_msgq_fini(ppipe->sendq);
+ nni_mtx_fini(&ppipe->mtx);
+ NNI_FREE_STRUCT(ppipe);
}
static int
@@ -183,10 +182,10 @@ nni_bus_pipe_stop(void *arg)
nni_msgq_close(ppipe->sendq);
- nni_aio_cancel(&ppipe->aio_getq, NNG_ECLOSED);
- nni_aio_cancel(&ppipe->aio_send, NNG_ECLOSED);
- nni_aio_cancel(&ppipe->aio_recv, NNG_ECLOSED);
- nni_aio_cancel(&ppipe->aio_putq, NNG_ECLOSED);
+ nni_aio_stop(&ppipe->aio_getq);
+ nni_aio_stop(&ppipe->aio_send);
+ nni_aio_stop(&ppipe->aio_recv);
+ nni_aio_stop(&ppipe->aio_putq);
nni_mtx_lock(&ppipe->psock->mtx);
if (nni_list_active(&psock->pipes, ppipe)) {
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index f5fec540..55ce5aa9 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -122,7 +122,6 @@ static void
nni_pair_pipe_fini(void *arg)
{
nni_pair_pipe *ppipe = arg;
-
nni_aio_fini(&ppipe->aio_send);
nni_aio_fini(&ppipe->aio_recv);
nni_aio_fini(&ppipe->aio_putq);
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index 39e809e6..cde79824 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -90,11 +90,9 @@ nni_pull_pipe_fini(void *arg)
{
nni_pull_pipe *pp = arg;
- if (pp != NULL) {
- nni_aio_fini(&pp->putq_aio);
- nni_aio_fini(&pp->recv_aio);
- NNI_FREE_STRUCT(pp);
- }
+ nni_aio_fini(&pp->putq_aio);
+ nni_aio_fini(&pp->recv_aio);
+ NNI_FREE_STRUCT(pp);
}
static int
@@ -113,8 +111,8 @@ nni_pull_pipe_stop(void *arg)
{
nni_pull_pipe *pp = arg;
- nni_aio_cancel(&pp->putq_aio, NNG_ECANCELED);
- nni_aio_cancel(&pp->recv_aio, NNG_ECANCELED);
+ nni_aio_stop(&pp->putq_aio);
+ nni_aio_stop(&pp->recv_aio);
}
static void
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index 43c0feaf..b7d4322c 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -132,12 +132,11 @@ nni_push_pipe_start(void *arg)
static void
nni_push_pipe_stop(void *arg)
{
- nni_push_pipe *pp = arg;
- nni_push_sock *push = pp->push;
+ nni_push_pipe *pp = arg;
- nni_aio_cancel(&pp->aio_recv, NNG_ECANCELED);
- nni_aio_cancel(&pp->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&pp->aio_getq, NNG_ECANCELED);
+ nni_aio_stop(&pp->aio_recv);
+ nni_aio_stop(&pp->aio_send);
+ nni_aio_stop(&pp->aio_getq);
}
static void
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index 316cbf50..e32f179a 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -83,6 +83,7 @@ nni_pub_sock_fini(void *arg)
{
nni_pub_sock *pub = arg;
+ nni_aio_stop(&pub->aio_getq);
nni_aio_fini(&pub->aio_getq);
nni_mtx_fini(&pub->mtx);
NNI_FREE_STRUCT(pub);
@@ -100,7 +101,6 @@ static void
nni_pub_pipe_fini(void *arg)
{
nni_pub_pipe *pp = arg;
-
nni_aio_fini(&pp->aio_getq);
nni_aio_fini(&pp->aio_send);
nni_aio_fini(&pp->aio_recv);
@@ -172,9 +172,10 @@ nni_pub_pipe_stop(void *arg)
nni_pub_pipe *pp = arg;
nni_pub_sock *pub = pp->pub;
- nni_aio_cancel(&pp->aio_getq, NNG_ECANCELED);
- nni_aio_cancel(&pp->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&pp->aio_recv, NNG_ECANCELED);
+ nni_aio_stop(&pp->aio_getq);
+ nni_aio_stop(&pp->aio_send);
+ nni_aio_stop(&pp->aio_recv);
+
nni_msgq_close(pp->sendq);
nni_mtx_lock(&pub->mtx);
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 36a42c49..03d76e2d 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -123,8 +123,8 @@ nni_sub_pipe_stop(void *arg)
{
nni_sub_pipe *sp = arg;
- nni_aio_cancel(&sp->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&sp->aio_recv, NNG_ECANCELED);
+ nni_aio_stop(&sp->aio_putq);
+ nni_aio_stop(&sp->aio_recv);
}
static void
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 013b02fb..049b1422 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -56,6 +56,7 @@ nni_rep_sock_fini(void *arg)
{
nni_rep_sock *rep = arg;
+ nni_aio_stop(&rep->aio_getq);
nni_aio_fini(&rep->aio_getq);
nni_idhash_fini(rep->pipes);
if (rep->btrace != NULL) {
@@ -192,10 +193,10 @@ nni_rep_pipe_stop(void *arg)
nni_rep_sock *rep = rp->rep;
nni_msgq_close(rp->sendq);
- nni_aio_cancel(&rp->aio_getq, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_recv, NNG_ECANCELED);
+ nni_aio_stop(&rp->aio_getq);
+ nni_aio_stop(&rp->aio_send);
+ nni_aio_stop(&rp->aio_recv);
+ nni_aio_stop(&rp->aio_putq);
nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe));
}
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index f13094ff..8e7056f5 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -34,6 +34,7 @@ struct nni_req_sock {
nni_time resend;
int raw;
int wantw;
+ int closed;
nni_msg * reqmsg;
nni_req_pipe *pendpipe;
@@ -46,6 +47,7 @@ struct nni_req_sock {
uint32_t nextid; // next id
uint8_t reqid[4]; // outstanding request ID (big endian)
nni_mtx mtx;
+ nni_cv cv;
};
// An nni_req_pipe is our per-pipe protocol private structure.
@@ -81,6 +83,10 @@ nni_req_sock_init(void **reqp, nni_sock *sock)
NNI_FREE_STRUCT(req);
return (rv);
}
+ if ((rv = nni_cv_init(&req->cv, &req->mtx)) != 0) {
+ nni_mtx_fini(&req->mtx);
+ NNI_FREE_STRUCT(req);
+ }
NNI_LIST_INIT(&req->readypipes, nni_req_pipe, node);
NNI_LIST_INIT(&req->busypipes, nni_req_pipe, node);
@@ -108,6 +114,10 @@ nni_req_sock_close(void *arg)
{
nni_req_sock *req = arg;
+ nni_mtx_lock(&req->mtx);
+ req->closed = 1;
+ nni_mtx_unlock(&req->mtx);
+
nni_timer_cancel(&req->timer);
}
@@ -117,10 +127,15 @@ nni_req_sock_fini(void *arg)
nni_req_sock *req = arg;
nni_mtx_lock(&req->mtx);
+ while ((!nni_list_empty(&req->readypipes)) ||
+ (!nni_list_empty(&req->busypipes))) {
+ nni_cv_wait(&req->cv);
+ }
if (req->reqmsg != NULL) {
nni_msg_free(req->reqmsg);
}
nni_mtx_unlock(&req->mtx);
+ nni_cv_fini(&req->cv);
nni_mtx_fini(&req->mtx);
NNI_FREE_STRUCT(req);
}
@@ -171,15 +186,13 @@ nni_req_pipe_fini(void *arg)
{
nni_req_pipe *rp = arg;
- if (rp != NULL) {
- nni_aio_fini(&rp->aio_getq);
- nni_aio_fini(&rp->aio_putq);
- nni_aio_fini(&rp->aio_recv);
- nni_aio_fini(&rp->aio_sendcooked);
- nni_aio_fini(&rp->aio_sendraw);
- nni_mtx_fini(&rp->mtx);
- NNI_FREE_STRUCT(rp);
- }
+ nni_aio_fini(&rp->aio_getq);
+ nni_aio_fini(&rp->aio_putq);
+ nni_aio_fini(&rp->aio_recv);
+ nni_aio_fini(&rp->aio_sendcooked);
+ nni_aio_fini(&rp->aio_sendraw);
+ nni_mtx_fini(&rp->mtx);
+ NNI_FREE_STRUCT(rp);
}
static int
@@ -193,6 +206,10 @@ nni_req_pipe_start(void *arg)
}
nni_mtx_lock(&req->mtx);
+ if (req->closed) {
+ nni_mtx_unlock(&req->mtx);
+ return (NNG_ECLOSED);
+ }
nni_list_append(&req->readypipes, rp);
if (req->wantw) {
nni_req_resend(req);
@@ -210,11 +227,11 @@ nni_req_pipe_stop(void *arg)
nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
- nni_aio_cancel(&rp->aio_getq, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_recv, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_sendcooked, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_sendraw, NNG_ECANCELED);
+ nni_aio_stop(&rp->aio_getq);
+ nni_aio_stop(&rp->aio_putq);
+ nni_aio_stop(&rp->aio_recv);
+ nni_aio_stop(&rp->aio_sendcooked);
+ nni_aio_stop(&rp->aio_sendraw);
// At this point there should not be any further AIOs running.
// Further, any completion tasks have completed.
@@ -222,8 +239,11 @@ nni_req_pipe_stop(void *arg)
nni_mtx_lock(&req->mtx);
// This removes the node from either busypipes or readypipes.
// It doesn't much matter which.
- if (nni_list_active(&req->readypipes, rp)) {
- nni_list_remove(&req->readypipes, rp);
+ if (nni_list_node_active(&rp->node)) {
+ nni_list_node_remove(&rp->node);
+ if (req->closed) {
+ nni_cv_wake(&req->cv);
+ }
}
if ((rp == req->pendpipe) && (req->reqmsg != NULL)) {
@@ -443,10 +463,15 @@ nni_req_resend(nni_req_sock *req)
// Note: This routine should be called with the socket lock held.
// Also, this should only be called while handling cooked mode
// requests.
- if (req->reqmsg == NULL) {
+ if ((msg = req->reqmsg) == NULL) {
return;
}
+ if (req->closed) {
+ req->reqmsg = NULL;
+ nni_msg_free(msg);
+ }
+
if (req->wantw) {
req->wantw = 0;
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 4a4c8741..089e730e 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -38,6 +38,7 @@ struct nni_resp_sock {
char * btrace;
size_t btrace_len;
nni_aio aio_getq;
+ nni_mtx mtx;
};
// An nni_resp_pipe is our per-pipe protocol private structure.
@@ -57,14 +58,14 @@ nni_resp_sock_fini(void *arg)
{
nni_resp_sock *psock = arg;
- if (psock != NULL) {
- nni_aio_fini(&psock->aio_getq);
- nni_idhash_fini(psock->pipes);
- if (psock->btrace != NULL) {
- nni_free(psock->btrace, psock->btrace_len);
- }
- NNI_FREE_STRUCT(psock);
+ nni_aio_stop(&psock->aio_getq);
+ nni_aio_fini(&psock->aio_getq);
+ nni_idhash_fini(psock->pipes);
+ if (psock->btrace != NULL) {
+ nni_free(psock->btrace, psock->btrace_len);
}
+ nni_mtx_fini(&psock->mtx);
+ NNI_FREE_STRUCT(psock);
}
static int
@@ -83,6 +84,10 @@ nni_resp_sock_init(void **pp, nni_sock *nsock)
psock->btrace_len = 0;
psock->urq = nni_sock_recvq(nsock);
psock->uwq = nni_sock_sendq(nsock);
+
+ if ((rv = nni_mtx_init(&psock->mtx)) != 0) {
+ goto fail;
+ }
if ((rv = nni_idhash_init(&psock->pipes)) != 0) {
goto fail;
}
@@ -177,7 +182,9 @@ nni_resp_pipe_start(void *arg)
ppipe->id = nni_pipe_id(ppipe->npipe);
+ nni_mtx_lock(&psock->mtx);
rv = nni_idhash_insert(psock->pipes, ppipe->id, ppipe);
+ nni_mtx_unlock(&psock->mtx);
if (rv != 0) {
return (rv);
}
@@ -195,13 +202,15 @@ nni_resp_pipe_stop(void *arg)
nni_resp_sock *psock = ppipe->psock;
nni_msgq_close(ppipe->sendq);
- nni_aio_cancel(&ppipe->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&ppipe->aio_getq, NNG_ECANCELED);
- nni_aio_cancel(&ppipe->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&ppipe->aio_recv, NNG_ECANCELED);
+ nni_aio_stop(&ppipe->aio_putq);
+ nni_aio_stop(&ppipe->aio_getq);
+ nni_aio_stop(&ppipe->aio_send);
+ nni_aio_stop(&ppipe->aio_recv);
if (ppipe->id != 0) {
+ nni_mtx_lock(&psock->mtx);
nni_idhash_remove(psock->pipes, ppipe->id);
+ nni_mtx_unlock(&psock->mtx);
ppipe->id = 0;
}
}
@@ -239,6 +248,7 @@ nni_resp_sock_getq_cb(void *arg)
NNI_GET32(header, id);
nni_msg_trim_header(msg, 4);
+ nni_mtx_lock(&psock->mtx);
rv = nni_idhash_find(psock->pipes, id, (void **) &ppipe);
if (rv != 0) {
@@ -250,6 +260,7 @@ nni_resp_sock_getq_cb(void *arg)
nni_msg_free(msg);
}
}
+ nni_mtx_unlock(&psock->mtx);
}
void
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index 85657f57..633e1491 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -60,6 +60,7 @@ nni_surv_sock_fini(void *arg)
{
nni_surv_sock *psock = arg;
+ nni_aio_stop(&psock->aio_getq);
nni_aio_fini(&psock->aio_getq);
nni_mtx_fini(&psock->mtx);
NNI_FREE_STRUCT(psock);
@@ -191,10 +192,11 @@ nni_surv_pipe_stop(void *arg)
nni_surv_pipe *ppipe = arg;
nni_surv_sock *psock = ppipe->psock;
- nni_aio_cancel(&ppipe->aio_getq, NNG_ECANCELED);
- nni_aio_cancel(&ppipe->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&ppipe->aio_recv, NNG_ECANCELED);
- nni_aio_cancel(&ppipe->aio_putq, NNG_ECANCELED);
+ nni_aio_stop(&ppipe->aio_getq);
+ nni_aio_stop(&ppipe->aio_send);
+ nni_aio_stop(&ppipe->aio_recv);
+ nni_aio_stop(&ppipe->aio_putq);
+
nni_msgq_close(ppipe->sendq);
nni_mtx_lock(&psock->mtx);
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 65266ccc..b2b9c438 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -86,6 +86,10 @@ nni_ipc_pipe_fini(void *arg)
{
nni_ipc_pipe *pipe = arg;
+ nni_aio_stop(&pipe->rxaio);
+ nni_aio_stop(&pipe->txaio);
+ nni_aio_stop(&pipe->negaio);
+
nni_aio_fini(&pipe->rxaio);
nni_aio_fini(&pipe->txaio);
nni_aio_fini(&pipe->negaio);
@@ -462,6 +466,7 @@ nni_ipc_ep_fini(void *arg)
{
nni_ipc_ep *ep = arg;
+ nni_aio_stop(&ep->aio);
nni_plat_ipc_ep_fini(ep->iep);
nni_aio_fini(&ep->aio);
nni_mtx_fini(&ep->mtx);
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 13d5716a..1bd42cec 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -86,6 +86,10 @@ nni_tcp_pipe_fini(void *arg)
{
nni_tcp_pipe *pipe = arg;
+ nni_aio_stop(&pipe->rxaio);
+ nni_aio_stop(&pipe->txaio);
+ nni_aio_stop(&pipe->negaio);
+
nni_aio_fini(&pipe->rxaio);
nni_aio_fini(&pipe->txaio);
nni_aio_fini(&pipe->negaio);
@@ -530,6 +534,7 @@ nni_tcp_ep_fini(void *arg)
{
nni_tcp_ep *ep = arg;
+ nni_aio_stop(&ep->aio);
if (ep->tep != NULL) {
nni_plat_tcp_ep_fini(ep->tep);
}