aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c84
-rw-r--r--src/core/aio.h10
-rw-r--r--src/core/endpt.c20
-rw-r--r--src/core/pipe.c65
-rw-r--r--src/core/socket.c93
-rw-r--r--src/core/socket.h7
-rw-r--r--src/core/taskq.c3
-rw-r--r--src/platform/posix/posix_epdesc.c15
-rw-r--r--src/platform/posix/posix_pipedesc.c13
-rw-r--r--src/platform/posix/posix_thread.c1
-rw-r--r--src/protocol/bus/bus.c25
-rw-r--r--src/protocol/pair/pair.c1
-rw-r--r--src/protocol/pipeline/pull.c12
-rw-r--r--src/protocol/pipeline/push.c9
-rw-r--r--src/protocol/pubsub/pub.c9
-rw-r--r--src/protocol/pubsub/sub.c4
-rw-r--r--src/protocol/reqrep/rep.c9
-rw-r--r--src/protocol/reqrep/req.c59
-rw-r--r--src/protocol/survey/respond.c33
-rw-r--r--src/protocol/survey/survey.c10
-rw-r--r--src/transport/ipc/ipc.c5
-rw-r--r--src/transport/tcp/tcp.c5
22 files changed, 291 insertions, 201 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index a9fbc50d..620a865d 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -12,10 +12,9 @@
#include <string.h>
enum nni_aio_flags {
- NNI_AIO_WAKE = 0x1,
- NNI_AIO_DONE = 0x2,
- NNI_AIO_FINI = 0x4,
- NNI_AIO_START = 0x8,
+ NNI_AIO_WAKE = 0x1,
+ NNI_AIO_DONE = 0x2,
+ NNI_AIO_FINI = 0x4,
};
// These are used for expiration.
@@ -49,7 +48,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
aio->a_cb = cb;
aio->a_cbarg = arg;
aio->a_expire = NNI_TIME_NEVER;
- aio->a_flags = NNI_AIO_START;
+ aio->a_flags = 0;
nni_taskq_ent_init(&aio->a_tqe, cb, arg);
return (0);
@@ -58,49 +57,39 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
void
nni_aio_fini(nni_aio *aio)
{
- void (*cancelfn)(nni_aio *);
+ nni_aio_stop(aio);
- nni_mtx_lock(&aio->a_lk);
- aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled
- if ((aio->a_flags & NNI_AIO_DONE) == 0) {
- aio->a_flags |= NNI_AIO_DONE;
- aio->a_result = NNG_ECANCELED;
- cancelfn = aio->a_prov_cancel;
- if (aio->a_flags & NNI_AIO_START) {
- aio->a_flags &= ~NNI_AIO_START;
- nni_taskq_dispatch(NULL, &aio->a_tqe);
- }
+ // At this point the AIO is done.
+ nni_cv_fini(&aio->a_cv);
+ nni_mtx_fini(&aio->a_lk);
- } else {
- cancelfn = NULL;
+ if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) {
+ NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs);
}
- nni_cv_wake(&aio->a_cv);
+}
- while (aio->a_refcnt != 0) {
- nni_cv_wait(&aio->a_cv);
+// nni_aio_stop cancels any oustanding operation, and waits for the
+// callback to complete, if still running. It also marks the AIO as
+// stopped, preventing further calls to nni_aio_start from succeeding.
+// To correctly tear down an AIO, call stop, and make sure any other
+// calles are not also stopped, before calling nni_aio_fini to release
+// actual memory.
+void
+nni_aio_stop(nni_aio *aio)
+{
+ if ((aio->a_cb == NULL) && (aio->a_cbarg == NULL)) {
+ // Never initialized, so nothing should have happened.
+ return;
}
+ nni_mtx_lock(&aio->a_lk);
+ aio->a_flags |= NNI_AIO_FINI; // this prevents us from being scheduled
nni_mtx_unlock(&aio->a_lk);
- // Stop any timeouts. If one was in flight, we wait until it
- // completes (it could fire the completion callback.)
- nni_aio_expire_remove(aio);
-
- // Cancel the AIO if it was scheduled.
- if (cancelfn != NULL) {
- cancelfn(aio);
- }
+ nni_aio_cancel(aio, NNG_ECANCELED);
// Wait for any outstanding task to complete. We won't schedule
// new stuff because nni_aio_start will fail (due to AIO_FINI).
nni_taskq_wait(NULL, &aio->a_tqe);
-
- // At this point the AIO is done.
- nni_cv_fini(&aio->a_cv);
- nni_mtx_fini(&aio->a_lk);
-
- if ((aio->a_naddrs != 0) && (aio->a_addrs != NULL)) {
- NNI_FREE_STRUCTS(aio->a_addrs, aio->a_naddrs);
- }
}
int
@@ -166,24 +155,23 @@ nni_aio_cancel(nni_aio *aio, int rv)
void (*cancelfn)(nni_aio *);
nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
+ if (aio->a_flags & NNI_AIO_DONE) {
// The operation already completed - so there's nothing
// left for us to do.
nni_mtx_unlock(&aio->a_lk);
return;
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = rv;
cancelfn = aio->a_prov_cancel;
aio->a_prov_cancel = NULL;
- // Guaraneed to just be a list operation.
- nni_aio_expire_remove(aio);
-
aio->a_refcnt++;
nni_mtx_unlock(&aio->a_lk);
+ // Guaraneed to just be a list operation.
+ nni_aio_expire_remove(aio);
+
// Stop any I/O at the provider level.
if (cancelfn != NULL) {
cancelfn(aio);
@@ -200,10 +188,7 @@ nni_aio_cancel(nni_aio *aio, int rv)
aio->a_prov_data = NULL;
aio->a_prov_cancel = NULL;
- if (!(aio->a_flags & NNI_AIO_FINI)) {
- // If we are finalizing, then we are done.
- nni_taskq_dispatch(NULL, &aio->a_tqe);
- }
+ nni_taskq_dispatch(NULL, &aio->a_tqe);
nni_mtx_unlock(&aio->a_lk);
}
@@ -213,13 +198,12 @@ int
nni_aio_finish(nni_aio *aio, int result, size_t count)
{
nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
+ if (aio->a_flags & NNI_AIO_DONE) {
// Operation already done (canceled or timed out?)
nni_mtx_unlock(&aio->a_lk);
return (NNG_ESTATE);
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = result;
aio->a_count = count;
@@ -240,13 +224,12 @@ int
nni_aio_finish_pipe(nni_aio *aio, int result, void *pipe)
{
nni_mtx_lock(&aio->a_lk);
- if (aio->a_flags & (NNI_AIO_DONE | NNI_AIO_FINI)) {
+ if (aio->a_flags & NNI_AIO_DONE) {
// Operation already done (canceled or timed out?)
nni_mtx_unlock(&aio->a_lk);
return (NNG_ESTATE);
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = result;
aio->a_count = 0;
@@ -392,7 +375,6 @@ nni_aio_expire_loop(void *arg)
}
aio->a_flags |= NNI_AIO_DONE;
- aio->a_flags &= ~NNI_AIO_START;
aio->a_result = NNG_ETIMEDOUT;
cancelfn = aio->a_prov_cancel;
diff --git a/src/core/aio.h b/src/core/aio.h
index ad5b560e..c698ce20 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -69,6 +69,16 @@ extern int nni_aio_init(nni_aio *, nni_cb, void *);
// on zero'd memory.
extern void nni_aio_fini(nni_aio *);
+// nni_aio_stop cancels any unfinished I/O, running completion callbacks,
+// but also prevents any new operations from starting (nni_aio_start will
+// return NNG_ESTATE). This should be called before nni_aio_fini(). The
+// best pattern is to call nni_aio_stop on all linked aios, before calling
+// nni_aio_fini on any of them. This function will block until any
+// callbacks are executed, and therefore it should never be executed
+// from a callback itself. (To abort operations without blocking
+// use nni_aio_cancel instead.)
+extern void nni_aio_stop(nni_aio *);
+
// nni_aio_result returns the result code (0 on success, or an NNG errno)
// for the operation. It is only valid to call this when the operation is
// complete (such as when the callback is executed or after nni_aio_wait
diff --git a/src/core/endpt.c b/src/core/endpt.c
index ca76838a..6b474698 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -64,6 +64,10 @@ nni_ep_destroy(nni_ep *ep)
if (ep->ep_id != 0) {
nni_idhash_remove(nni_eps, ep->ep_id);
}
+ nni_aio_stop(&ep->ep_acc_aio);
+ nni_aio_stop(&ep->ep_con_aio);
+ nni_aio_stop(&ep->ep_con_syn);
+ nni_aio_stop(&ep->ep_backoff);
nni_aio_fini(&ep->ep_acc_aio);
nni_aio_fini(&ep->ep_con_aio);
@@ -179,6 +183,10 @@ nni_ep_reap(nni_ep *ep)
{
nni_ep_close(ep); // Extra sanity.
+ nni_aio_stop(&ep->ep_acc_aio);
+ nni_aio_stop(&ep->ep_con_aio);
+ nni_aio_stop(&ep->ep_con_syn);
+
// Take us off the sock list.
nni_sock_ep_remove(ep->ep_sock, ep);
@@ -188,11 +196,13 @@ nni_ep_reap(nni_ep *ep)
// done everything we can to wake any waiter (synchronous connect)
// gracefully.
nni_mtx_lock(&ep->ep_mtx);
- while (!nni_list_empty(&ep->ep_pipes)) {
- nni_cv_wait(&ep->ep_cv);
- }
- while (ep->ep_refcnt != 0) {
- nni_cv_wait(&ep->ep_cv);
+ ep->ep_closed = 1;
+ for (;;) {
+ if ((!nni_list_empty(&ep->ep_pipes)) || (ep->ep_refcnt != 0)) {
+ nni_cv_wait(&ep->ep_cv);
+ continue;
+ }
+ break;
}
nni_mtx_unlock(&ep->ep_mtx);
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 8cc21941..5b51a38b 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -52,7 +52,7 @@ nni_pipe_destroy(nni_pipe *p)
if (p == NULL) {
return;
}
-
+ NNI_ASSERT(p->p_refcnt != 0xDEAD);
// Make sure any unlocked holders are done with this.
// This happens during initialization for example.
nni_mtx_lock(&p->p_mtx);
@@ -60,6 +60,10 @@ nni_pipe_destroy(nni_pipe *p)
nni_cv_wait(&p->p_cv);
}
nni_mtx_unlock(&p->p_mtx);
+ p->p_refcnt = 0xDEAD;
+
+ nni_aio_stop(&p->p_start_aio);
+ nni_aio_fini(&p->p_start_aio);
if (p->p_proto_data != NULL) {
p->p_proto_dtor(p->p_proto_data);
@@ -107,9 +111,6 @@ nni_pipe_close(nni_pipe *p)
}
p->p_reap = 1;
- // abort any pending negotiation/start process.
- nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED);
-
// Close the underlying transport.
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_close(p->p_tran_data);
@@ -117,8 +118,8 @@ nni_pipe_close(nni_pipe *p)
nni_mtx_unlock(&p->p_mtx);
- // Ensure that the negotiation step is aborted fully.
- nni_aio_fini(&p->p_start_aio);
+ // abort any pending negotiation/start process.
+ nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED);
}
// Pipe reap is called on a taskq when the pipe should be closed. No
@@ -131,6 +132,8 @@ nni_pipe_reap(nni_pipe *p)
// Transport close...
nni_pipe_close(p);
+ nni_aio_stop(&p->p_start_aio);
+
// Remove the pipe from the socket and the endpoint. Note
// that it is in theory possible for either of these to be null
// if the pipe is being torn down before it is fully initialized.
@@ -148,6 +151,7 @@ void
nni_pipe_stop(nni_pipe *p)
{
// Guard against recursive calls.
+ nni_pipe_close(p);
nni_mtx_lock(&p->p_mtx);
if (p->p_stop) {
nni_mtx_unlock(&p->p_mtx);
@@ -186,25 +190,6 @@ nni_pipe_start_cb(void *arg)
}
}
-void
-nni_pipe_hold(nni_pipe *p)
-{
- nni_mtx_lock(&p->p_mtx);
- p->p_refcnt++;
- nni_mtx_unlock(&p->p_mtx);
-}
-
-void
-nni_pipe_rele(nni_pipe *p)
-{
- nni_mtx_lock(&p->p_mtx);
- p->p_refcnt--;
- if (p->p_refcnt == 0) {
- nni_cv_wake(&p->p_cv);
- }
- nni_mtx_unlock(&p->p_mtx);
-}
-
int
nni_pipe_create(nni_ep *ep, void *tdata)
{
@@ -215,7 +200,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
// In this case we just toss the pipe...
- tran->tran_pipe->p_fini(p);
+ tran->tran_pipe->p_fini(tdata);
return (NNG_ENOMEM);
}
@@ -232,38 +217,20 @@ nni_pipe_create(nni_ep *ep, void *tdata)
return (rv);
}
- nni_pipe_hold(p);
-
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
if ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) {
- goto fail;
- }
-
- if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) != 0) {
- goto fail;
+ nni_pipe_destroy(p);
+ return (rv);
}
- // Attempt to initialize sock protocol & endpoint.
- if ((rv = nni_ep_pipe_add(ep, p)) != 0) {
- goto fail;
- }
- if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
- goto fail;
+ if ((rv = nni_sock_pipe_add(sock, ep, p)) != 0) {
+ nni_pipe_destroy(p);
+ return (rv);
}
- // Start the pipe running.
- nni_pipe_start(p);
- nni_pipe_rele(p);
-
return (0);
-
-fail:
- nni_pipe_stop(p);
- nni_pipe_rele(p);
-
- return (rv);
}
int
diff --git a/src/core/socket.c b/src/core/socket.c
index 10bc1c80..23a1793a 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -69,27 +69,94 @@ nni_sock_rele(nni_sock *sock)
nni_objhash_unref(nni_socks, sock->s_id);
}
+static int
+nni_sock_pipe_start(nni_pipe *pipe)
+{
+ nni_sock *sock = pipe->p_sock;
+ void * pdata = nni_pipe_get_proto_data(pipe);
+ int rv;
+
+ NNI_ASSERT(sock != NULL);
+ if (sock->s_closing) {
+ // We're closing, bail out.
+ return (NNG_ECLOSED);
+ }
+ if (nni_pipe_peer(pipe) != sock->s_peer) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+ if ((rv = sock->s_pipe_ops.pipe_start(pdata)) != 0) {
+ // Protocol rejection for other reasons.
+ // E.g. pair and already have active connected partner.
+ return (rv);
+ }
+ return (0);
+}
+
+static void
+nni_sock_pipe_start_cb(void *arg)
+{
+ nni_pipe *pipe = arg;
+ nni_aio * aio = &pipe->p_start_aio;
+
+ if (nni_aio_result(aio) != 0) {
+ // Failed I/O during start, abort everything.
+ nni_pipe_stop(pipe);
+ return;
+ }
+ if (nni_sock_pipe_start(pipe) != 0) {
+ nni_pipe_stop(pipe);
+ return;
+ }
+}
+
int
-nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe)
+nni_sock_pipe_add(nni_sock *sock, nni_ep *ep, nni_pipe *pipe)
{
int rv;
// Initialize protocol pipe data.
nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
+ nni_mtx_lock(&ep->ep_mtx);
+
+ if ((sock->s_closing) || (ep->ep_closed)) {
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_unlock(&sock->s_mx);
return (NNG_ECLOSED);
}
+ rv = nni_aio_init(&pipe->p_start_aio, nni_sock_pipe_start_cb, pipe);
+ if (rv != 0) {
+ nni_mtx_unlock(&ep->ep_mtx);
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
+ }
+
rv = sock->s_pipe_ops.pipe_init(
&pipe->p_proto_data, pipe, sock->s_data);
if (rv != 0) {
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_lock(&sock->s_mx);
return (rv);
}
// Save the protocol destructor.
pipe->p_proto_dtor = sock->s_pipe_ops.pipe_fini;
pipe->p_sock = sock;
+ pipe->p_ep = ep;
+
nni_list_append(&sock->s_pipes, pipe);
+ nni_list_append(&ep->ep_pipes, pipe);
+
+ // Start the initial negotiation I/O...
+ if (pipe->p_tran_ops.p_start == NULL) {
+ if (nni_sock_pipe_start(pipe) != 0) {
+ nni_pipe_stop(pipe);
+ }
+ } else {
+ pipe->p_tran_ops.p_start(
+ pipe->p_tran_data, &pipe->p_start_aio);
+ }
+
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_unlock(&sock->s_mx);
return (0);
}
@@ -128,8 +195,10 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
pdata = nni_pipe_get_proto_data(pipe);
- nni_mtx_lock(&sock->s_mx);
+ // Stop any pending negotiation.
+ nni_aio_stop(&pipe->p_start_aio);
+ nni_mtx_lock(&sock->s_mx);
if ((sock->s_pipe_ops.pipe_stop == NULL) || (pdata == NULL)) {
nni_mtx_unlock(&sock->s_mx);
return;
@@ -508,24 +577,18 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // For each pipe, arrange for it to teardown hard. (Close, etc.).
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_stop(pipe);
- }
-
// For each ep, arrange for it to teardown hard.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
nni_ep_stop(ep);
}
-
- // Wait for the pipes to be reaped (there should not be any because
- // we have already reaped the EPs.)
- while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
- nni_cv_wait(&sock->s_cv);
+ // For each pipe, arrange for it to teardown hard.
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+ nni_pipe_stop(pipe);
}
- // Wait for the eps to be reaped.
- while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
+ // We have to wait for *both* endpoints and pipes to be removed.
+ while ((!nni_list_empty(&sock->s_pipes)) ||
+ (!nni_list_empty(&sock->s_eps))) {
nni_cv_wait(&sock->s_cv);
}
diff --git a/src/core/socket.h b/src/core/socket.h
index d0196eea..41dfbc33 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -87,8 +87,11 @@ extern void nni_sock_unnotify(nni_sock *, nni_notify *);
extern void nni_sock_ep_remove(nni_sock *, nni_ep *);
// nni_sock_pipe_add adds the pipe to the socket. It is called by
-// the generic pipe creation code.
-extern int nni_sock_pipe_add(nni_sock *, nni_pipe *);
+// the generic pipe creation code. It also adds the socket to the
+// ep list, and starts the pipe. It does all these to ensure that
+// we have complete success or failure, and there is no point where
+// a pipe could wind up orphaned.
+extern int nni_sock_pipe_add(nni_sock *, nni_ep *, nni_pipe *);
extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *);
diff --git a/src/core/taskq.c b/src/core/taskq.c
index 36129bfd..64179790 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -153,7 +154,7 @@ nni_taskq_dispatch(nni_taskq *tq, nni_taskq_ent *ent)
return (NNG_ECLOSED);
}
// It might already be scheduled... if so don't redo it.
- if (ent->tqe_tq == NULL) {
+ if (!nni_list_active(&tq->tq_ents, ent)) {
ent->tqe_tq = tq;
nni_list_append(&tq->tq_ents, ent);
}
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index b89af982..7a91b4ec 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -195,6 +195,7 @@ nni_posix_epdesc_doclose(nni_posix_epdesc *ed)
{
nni_aio * aio;
struct sockaddr_un *sun;
+ int fd;
ed->closed = 1;
while ((aio = nni_list_first(&ed->acceptq)) != NULL) {
@@ -204,14 +205,14 @@ nni_posix_epdesc_doclose(nni_posix_epdesc *ed)
nni_posix_epdesc_finish(aio, NNG_ECLOSED, 0);
}
- if (ed->node.fd != -1) {
- (void) shutdown(ed->node.fd, SHUT_RDWR);
+ if ((fd = ed->node.fd) != -1) {
+ ed->node.fd = -1;
+ (void) shutdown(fd, SHUT_RDWR);
+ (void) close(fd);
sun = (void *) &ed->locaddr;
if ((sun->sun_family == AF_UNIX) && (ed->loclen != 0)) {
(void) unlink(sun->sun_path);
}
- (void) close(ed->node.fd);
- ed->node.fd = -1;
}
}
@@ -511,9 +512,13 @@ nni_posix_epdesc_set_remote(nni_posix_epdesc *ed, void *sa, int len)
void
nni_posix_epdesc_fini(nni_posix_epdesc *ed)
{
- if (ed->node.fd >= 0) {
+ int fd;
+ nni_mtx_lock(&ed->mtx);
+ if ((fd = ed->node.fd) != -1) {
(void) close(ed->node.fd);
+ nni_posix_epdesc_doclose(ed);
}
+ nni_mtx_unlock(&ed->mtx);
nni_posix_pollq_remove(&ed->node);
nni_mtx_fini(&ed->mtx);
NNI_FREE_STRUCT(ed);
diff --git a/src/platform/posix/posix_pipedesc.c b/src/platform/posix/posix_pipedesc.c
index 4e61c2c4..b85e79a9 100644
--- a/src/platform/posix/posix_pipedesc.c
+++ b/src/platform/posix/posix_pipedesc.c
@@ -45,18 +45,21 @@ static void
nni_posix_pipedesc_doclose(nni_posix_pipedesc *pd)
{
nni_aio *aio;
+ int fd;
pd->closed = 1;
- if (pd->node.fd != -1) {
- // Let any peer know we are closing.
- (void) shutdown(pd->node.fd, SHUT_RDWR);
- }
while ((aio = nni_list_first(&pd->readq)) != NULL) {
nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
}
while ((aio = nni_list_first(&pd->writeq)) != NULL) {
nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
}
+ if ((fd = pd->node.fd) != -1) {
+ // Let any peer know we are closing.
+ pd->node.fd = -1;
+ (void) shutdown(fd, SHUT_RDWR);
+ (void) close(fd);
+ }
}
static void
@@ -269,7 +272,7 @@ nni_posix_pipedesc_send(nni_posix_pipedesc *pd, nni_aio *aio)
nni_mtx_unlock(&pd->mtx);
return;
}
- if (pd->closed < 0) {
+ if (pd->closed) {
nni_posix_pipedesc_finish(aio, NNG_ECLOSED);
nni_mtx_unlock(&pd->mtx);
return;
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index 45bbfcd3..071e6007 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -329,6 +329,7 @@ nni_plat_fini(void)
{
pthread_mutex_lock(&nni_plat_lock);
if (nni_plat_inited) {
+ nni_posix_resolv_sysfini();
nni_posix_pollq_sysfini();
pthread_mutexattr_destroy(&nni_mxattr);
pthread_condattr_destroy(&nni_cvattr);
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index 17ef03bb..3070b90d 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -60,6 +60,7 @@ nni_bus_sock_fini(void *arg)
nni_bus_sock *psock = arg;
if (psock != NULL) {
+ nni_aio_stop(&psock->aio_getq);
nni_aio_fini(&psock->aio_getq);
nni_mtx_fini(&psock->mtx);
NNI_FREE_STRUCT(psock);
@@ -107,15 +108,13 @@ nni_bus_pipe_fini(void *arg)
{
nni_bus_pipe *ppipe = arg;
- if (ppipe != NULL) {
- nni_mtx_fini(&ppipe->mtx);
- nni_aio_fini(&ppipe->aio_getq);
- nni_aio_fini(&ppipe->aio_send);
- nni_aio_fini(&ppipe->aio_recv);
- nni_aio_fini(&ppipe->aio_putq);
- nni_msgq_fini(ppipe->sendq);
- NNI_FREE_STRUCT(ppipe);
- }
+ nni_aio_fini(&ppipe->aio_getq);
+ nni_aio_fini(&ppipe->aio_send);
+ nni_aio_fini(&ppipe->aio_recv);
+ nni_aio_fini(&ppipe->aio_putq);
+ nni_msgq_fini(ppipe->sendq);
+ nni_mtx_fini(&ppipe->mtx);
+ NNI_FREE_STRUCT(ppipe);
}
static int
@@ -183,10 +182,10 @@ nni_bus_pipe_stop(void *arg)
nni_msgq_close(ppipe->sendq);
- nni_aio_cancel(&ppipe->aio_getq, NNG_ECLOSED);
- nni_aio_cancel(&ppipe->aio_send, NNG_ECLOSED);
- nni_aio_cancel(&ppipe->aio_recv, NNG_ECLOSED);
- nni_aio_cancel(&ppipe->aio_putq, NNG_ECLOSED);
+ nni_aio_stop(&ppipe->aio_getq);
+ nni_aio_stop(&ppipe->aio_send);
+ nni_aio_stop(&ppipe->aio_recv);
+ nni_aio_stop(&ppipe->aio_putq);
nni_mtx_lock(&ppipe->psock->mtx);
if (nni_list_active(&psock->pipes, ppipe)) {
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index f5fec540..55ce5aa9 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -122,7 +122,6 @@ static void
nni_pair_pipe_fini(void *arg)
{
nni_pair_pipe *ppipe = arg;
-
nni_aio_fini(&ppipe->aio_send);
nni_aio_fini(&ppipe->aio_recv);
nni_aio_fini(&ppipe->aio_putq);
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index 39e809e6..cde79824 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -90,11 +90,9 @@ nni_pull_pipe_fini(void *arg)
{
nni_pull_pipe *pp = arg;
- if (pp != NULL) {
- nni_aio_fini(&pp->putq_aio);
- nni_aio_fini(&pp->recv_aio);
- NNI_FREE_STRUCT(pp);
- }
+ nni_aio_fini(&pp->putq_aio);
+ nni_aio_fini(&pp->recv_aio);
+ NNI_FREE_STRUCT(pp);
}
static int
@@ -113,8 +111,8 @@ nni_pull_pipe_stop(void *arg)
{
nni_pull_pipe *pp = arg;
- nni_aio_cancel(&pp->putq_aio, NNG_ECANCELED);
- nni_aio_cancel(&pp->recv_aio, NNG_ECANCELED);
+ nni_aio_stop(&pp->putq_aio);
+ nni_aio_stop(&pp->recv_aio);
}
static void
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index 43c0feaf..b7d4322c 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -132,12 +132,11 @@ nni_push_pipe_start(void *arg)
static void
nni_push_pipe_stop(void *arg)
{
- nni_push_pipe *pp = arg;
- nni_push_sock *push = pp->push;
+ nni_push_pipe *pp = arg;
- nni_aio_cancel(&pp->aio_recv, NNG_ECANCELED);
- nni_aio_cancel(&pp->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&pp->aio_getq, NNG_ECANCELED);
+ nni_aio_stop(&pp->aio_recv);
+ nni_aio_stop(&pp->aio_send);
+ nni_aio_stop(&pp->aio_getq);
}
static void
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index 316cbf50..e32f179a 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -83,6 +83,7 @@ nni_pub_sock_fini(void *arg)
{
nni_pub_sock *pub = arg;
+ nni_aio_stop(&pub->aio_getq);
nni_aio_fini(&pub->aio_getq);
nni_mtx_fini(&pub->mtx);
NNI_FREE_STRUCT(pub);
@@ -100,7 +101,6 @@ static void
nni_pub_pipe_fini(void *arg)
{
nni_pub_pipe *pp = arg;
-
nni_aio_fini(&pp->aio_getq);
nni_aio_fini(&pp->aio_send);
nni_aio_fini(&pp->aio_recv);
@@ -172,9 +172,10 @@ nni_pub_pipe_stop(void *arg)
nni_pub_pipe *pp = arg;
nni_pub_sock *pub = pp->pub;
- nni_aio_cancel(&pp->aio_getq, NNG_ECANCELED);
- nni_aio_cancel(&pp->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&pp->aio_recv, NNG_ECANCELED);
+ nni_aio_stop(&pp->aio_getq);
+ nni_aio_stop(&pp->aio_send);
+ nni_aio_stop(&pp->aio_recv);
+
nni_msgq_close(pp->sendq);
nni_mtx_lock(&pub->mtx);
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 36a42c49..03d76e2d 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -123,8 +123,8 @@ nni_sub_pipe_stop(void *arg)
{
nni_sub_pipe *sp = arg;
- nni_aio_cancel(&sp->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&sp->aio_recv, NNG_ECANCELED);
+ nni_aio_stop(&sp->aio_putq);
+ nni_aio_stop(&sp->aio_recv);
}
static void
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 013b02fb..049b1422 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -56,6 +56,7 @@ nni_rep_sock_fini(void *arg)
{
nni_rep_sock *rep = arg;
+ nni_aio_stop(&rep->aio_getq);
nni_aio_fini(&rep->aio_getq);
nni_idhash_fini(rep->pipes);
if (rep->btrace != NULL) {
@@ -192,10 +193,10 @@ nni_rep_pipe_stop(void *arg)
nni_rep_sock *rep = rp->rep;
nni_msgq_close(rp->sendq);
- nni_aio_cancel(&rp->aio_getq, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_recv, NNG_ECANCELED);
+ nni_aio_stop(&rp->aio_getq);
+ nni_aio_stop(&rp->aio_send);
+ nni_aio_stop(&rp->aio_recv);
+ nni_aio_stop(&rp->aio_putq);
nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe));
}
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index f13094ff..8e7056f5 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -34,6 +34,7 @@ struct nni_req_sock {
nni_time resend;
int raw;
int wantw;
+ int closed;
nni_msg * reqmsg;
nni_req_pipe *pendpipe;
@@ -46,6 +47,7 @@ struct nni_req_sock {
uint32_t nextid; // next id
uint8_t reqid[4]; // outstanding request ID (big endian)
nni_mtx mtx;
+ nni_cv cv;
};
// An nni_req_pipe is our per-pipe protocol private structure.
@@ -81,6 +83,10 @@ nni_req_sock_init(void **reqp, nni_sock *sock)
NNI_FREE_STRUCT(req);
return (rv);
}
+ if ((rv = nni_cv_init(&req->cv, &req->mtx)) != 0) {
+ nni_mtx_fini(&req->mtx);
+ NNI_FREE_STRUCT(req);
+ }
NNI_LIST_INIT(&req->readypipes, nni_req_pipe, node);
NNI_LIST_INIT(&req->busypipes, nni_req_pipe, node);
@@ -108,6 +114,10 @@ nni_req_sock_close(void *arg)
{
nni_req_sock *req = arg;
+ nni_mtx_lock(&req->mtx);
+ req->closed = 1;
+ nni_mtx_unlock(&req->mtx);
+
nni_timer_cancel(&req->timer);
}
@@ -117,10 +127,15 @@ nni_req_sock_fini(void *arg)
nni_req_sock *req = arg;
nni_mtx_lock(&req->mtx);
+ while ((!nni_list_empty(&req->readypipes)) ||
+ (!nni_list_empty(&req->busypipes))) {
+ nni_cv_wait(&req->cv);
+ }
if (req->reqmsg != NULL) {
nni_msg_free(req->reqmsg);
}
nni_mtx_unlock(&req->mtx);
+ nni_cv_fini(&req->cv);
nni_mtx_fini(&req->mtx);
NNI_FREE_STRUCT(req);
}
@@ -171,15 +186,13 @@ nni_req_pipe_fini(void *arg)
{
nni_req_pipe *rp = arg;
- if (rp != NULL) {
- nni_aio_fini(&rp->aio_getq);
- nni_aio_fini(&rp->aio_putq);
- nni_aio_fini(&rp->aio_recv);
- nni_aio_fini(&rp->aio_sendcooked);
- nni_aio_fini(&rp->aio_sendraw);
- nni_mtx_fini(&rp->mtx);
- NNI_FREE_STRUCT(rp);
- }
+ nni_aio_fini(&rp->aio_getq);
+ nni_aio_fini(&rp->aio_putq);
+ nni_aio_fini(&rp->aio_recv);
+ nni_aio_fini(&rp->aio_sendcooked);
+ nni_aio_fini(&rp->aio_sendraw);
+ nni_mtx_fini(&rp->mtx);
+ NNI_FREE_STRUCT(rp);
}
static int
@@ -193,6 +206,10 @@ nni_req_pipe_start(void *arg)
}
nni_mtx_lock(&req->mtx);
+ if (req->closed) {
+ nni_mtx_unlock(&req->mtx);
+ return (NNG_ECLOSED);
+ }
nni_list_append(&req->readypipes, rp);
if (req->wantw) {
nni_req_resend(req);
@@ -210,11 +227,11 @@ nni_req_pipe_stop(void *arg)
nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
- nni_aio_cancel(&rp->aio_getq, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_recv, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_sendcooked, NNG_ECANCELED);
- nni_aio_cancel(&rp->aio_sendraw, NNG_ECANCELED);
+ nni_aio_stop(&rp->aio_getq);
+ nni_aio_stop(&rp->aio_putq);
+ nni_aio_stop(&rp->aio_recv);
+ nni_aio_stop(&rp->aio_sendcooked);
+ nni_aio_stop(&rp->aio_sendraw);
// At this point there should not be any further AIOs running.
// Further, any completion tasks have completed.
@@ -222,8 +239,11 @@ nni_req_pipe_stop(void *arg)
nni_mtx_lock(&req->mtx);
// This removes the node from either busypipes or readypipes.
// It doesn't much matter which.
- if (nni_list_active(&req->readypipes, rp)) {
- nni_list_remove(&req->readypipes, rp);
+ if (nni_list_node_active(&rp->node)) {
+ nni_list_node_remove(&rp->node);
+ if (req->closed) {
+ nni_cv_wake(&req->cv);
+ }
}
if ((rp == req->pendpipe) && (req->reqmsg != NULL)) {
@@ -443,10 +463,15 @@ nni_req_resend(nni_req_sock *req)
// Note: This routine should be called with the socket lock held.
// Also, this should only be called while handling cooked mode
// requests.
- if (req->reqmsg == NULL) {
+ if ((msg = req->reqmsg) == NULL) {
return;
}
+ if (req->closed) {
+ req->reqmsg = NULL;
+ nni_msg_free(msg);
+ }
+
if (req->wantw) {
req->wantw = 0;
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 4a4c8741..089e730e 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -38,6 +38,7 @@ struct nni_resp_sock {
char * btrace;
size_t btrace_len;
nni_aio aio_getq;
+ nni_mtx mtx;
};
// An nni_resp_pipe is our per-pipe protocol private structure.
@@ -57,14 +58,14 @@ nni_resp_sock_fini(void *arg)
{
nni_resp_sock *psock = arg;
- if (psock != NULL) {
- nni_aio_fini(&psock->aio_getq);
- nni_idhash_fini(psock->pipes);
- if (psock->btrace != NULL) {
- nni_free(psock->btrace, psock->btrace_len);
- }
- NNI_FREE_STRUCT(psock);
+ nni_aio_stop(&psock->aio_getq);
+ nni_aio_fini(&psock->aio_getq);
+ nni_idhash_fini(psock->pipes);
+ if (psock->btrace != NULL) {
+ nni_free(psock->btrace, psock->btrace_len);
}
+ nni_mtx_fini(&psock->mtx);
+ NNI_FREE_STRUCT(psock);
}
static int
@@ -83,6 +84,10 @@ nni_resp_sock_init(void **pp, nni_sock *nsock)
psock->btrace_len = 0;
psock->urq = nni_sock_recvq(nsock);
psock->uwq = nni_sock_sendq(nsock);
+
+ if ((rv = nni_mtx_init(&psock->mtx)) != 0) {
+ goto fail;
+ }
if ((rv = nni_idhash_init(&psock->pipes)) != 0) {
goto fail;
}
@@ -177,7 +182,9 @@ nni_resp_pipe_start(void *arg)
ppipe->id = nni_pipe_id(ppipe->npipe);
+ nni_mtx_lock(&psock->mtx);
rv = nni_idhash_insert(psock->pipes, ppipe->id, ppipe);
+ nni_mtx_unlock(&psock->mtx);
if (rv != 0) {
return (rv);
}
@@ -195,13 +202,15 @@ nni_resp_pipe_stop(void *arg)
nni_resp_sock *psock = ppipe->psock;
nni_msgq_close(ppipe->sendq);
- nni_aio_cancel(&ppipe->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&ppipe->aio_getq, NNG_ECANCELED);
- nni_aio_cancel(&ppipe->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&ppipe->aio_recv, NNG_ECANCELED);
+ nni_aio_stop(&ppipe->aio_putq);
+ nni_aio_stop(&ppipe->aio_getq);
+ nni_aio_stop(&ppipe->aio_send);
+ nni_aio_stop(&ppipe->aio_recv);
if (ppipe->id != 0) {
+ nni_mtx_lock(&psock->mtx);
nni_idhash_remove(psock->pipes, ppipe->id);
+ nni_mtx_unlock(&psock->mtx);
ppipe->id = 0;
}
}
@@ -239,6 +248,7 @@ nni_resp_sock_getq_cb(void *arg)
NNI_GET32(header, id);
nni_msg_trim_header(msg, 4);
+ nni_mtx_lock(&psock->mtx);
rv = nni_idhash_find(psock->pipes, id, (void **) &ppipe);
if (rv != 0) {
@@ -250,6 +260,7 @@ nni_resp_sock_getq_cb(void *arg)
nni_msg_free(msg);
}
}
+ nni_mtx_unlock(&psock->mtx);
}
void
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index 85657f57..633e1491 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -60,6 +60,7 @@ nni_surv_sock_fini(void *arg)
{
nni_surv_sock *psock = arg;
+ nni_aio_stop(&psock->aio_getq);
nni_aio_fini(&psock->aio_getq);
nni_mtx_fini(&psock->mtx);
NNI_FREE_STRUCT(psock);
@@ -191,10 +192,11 @@ nni_surv_pipe_stop(void *arg)
nni_surv_pipe *ppipe = arg;
nni_surv_sock *psock = ppipe->psock;
- nni_aio_cancel(&ppipe->aio_getq, NNG_ECANCELED);
- nni_aio_cancel(&ppipe->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&ppipe->aio_recv, NNG_ECANCELED);
- nni_aio_cancel(&ppipe->aio_putq, NNG_ECANCELED);
+ nni_aio_stop(&ppipe->aio_getq);
+ nni_aio_stop(&ppipe->aio_send);
+ nni_aio_stop(&ppipe->aio_recv);
+ nni_aio_stop(&ppipe->aio_putq);
+
nni_msgq_close(ppipe->sendq);
nni_mtx_lock(&psock->mtx);
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 65266ccc..b2b9c438 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -86,6 +86,10 @@ nni_ipc_pipe_fini(void *arg)
{
nni_ipc_pipe *pipe = arg;
+ nni_aio_stop(&pipe->rxaio);
+ nni_aio_stop(&pipe->txaio);
+ nni_aio_stop(&pipe->negaio);
+
nni_aio_fini(&pipe->rxaio);
nni_aio_fini(&pipe->txaio);
nni_aio_fini(&pipe->negaio);
@@ -462,6 +466,7 @@ nni_ipc_ep_fini(void *arg)
{
nni_ipc_ep *ep = arg;
+ nni_aio_stop(&ep->aio);
nni_plat_ipc_ep_fini(ep->iep);
nni_aio_fini(&ep->aio);
nni_mtx_fini(&ep->mtx);
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 13d5716a..1bd42cec 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -86,6 +86,10 @@ nni_tcp_pipe_fini(void *arg)
{
nni_tcp_pipe *pipe = arg;
+ nni_aio_stop(&pipe->rxaio);
+ nni_aio_stop(&pipe->txaio);
+ nni_aio_stop(&pipe->negaio);
+
nni_aio_fini(&pipe->rxaio);
nni_aio_fini(&pipe->txaio);
nni_aio_fini(&pipe->negaio);
@@ -530,6 +534,7 @@ nni_tcp_ep_fini(void *arg)
{
nni_tcp_ep *ep = arg;
+ nni_aio_stop(&ep->aio);
if (ep->tep != NULL) {
nni_plat_tcp_ep_fini(ep->tep);
}