aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-07-20 14:34:51 -0700
committerGarrett D'Amore <garrett@damore.org>2017-07-20 14:34:51 -0700
commita37093079b492e966344416445aae354b147d30e (patch)
tree2f21fc2bc716f2423ba02f4713b25038c429ec4e /src/core/socket.c
parent88fb04f61918b06e6e269c1960058c3df5e0a0ef (diff)
downloadnng-a37093079b492e966344416445aae354b147d30e.tar.gz
nng-a37093079b492e966344416445aae354b147d30e.tar.bz2
nng-a37093079b492e966344416445aae354b147d30e.zip
Yet more race condition fixes.
We need to remember that protocol stops can run synchronously, and therefore we need to wait for the aio to complete. Further, we need to break apart shutting down aio activity from deallocation, as we need to shut down *all* async activity before deallocating *anything*. Noticed that we had a pipe race in the surveyor pattern too.
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c93
1 files changed, 78 insertions, 15 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 10bc1c80..23a1793a 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -69,27 +69,94 @@ nni_sock_rele(nni_sock *sock)
nni_objhash_unref(nni_socks, sock->s_id);
}
+static int
+nni_sock_pipe_start(nni_pipe *pipe)
+{
+ nni_sock *sock = pipe->p_sock;
+ void * pdata = nni_pipe_get_proto_data(pipe);
+ int rv;
+
+ NNI_ASSERT(sock != NULL);
+ if (sock->s_closing) {
+ // We're closing, bail out.
+ return (NNG_ECLOSED);
+ }
+ if (nni_pipe_peer(pipe) != sock->s_peer) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+ if ((rv = sock->s_pipe_ops.pipe_start(pdata)) != 0) {
+ // Protocol rejection for other reasons.
+ // E.g. pair and already have active connected partner.
+ return (rv);
+ }
+ return (0);
+}
+
+static void
+nni_sock_pipe_start_cb(void *arg)
+{
+ nni_pipe *pipe = arg;
+ nni_aio * aio = &pipe->p_start_aio;
+
+ if (nni_aio_result(aio) != 0) {
+ // Failed I/O during start, abort everything.
+ nni_pipe_stop(pipe);
+ return;
+ }
+ if (nni_sock_pipe_start(pipe) != 0) {
+ nni_pipe_stop(pipe);
+ return;
+ }
+}
+
int
-nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe)
+nni_sock_pipe_add(nni_sock *sock, nni_ep *ep, nni_pipe *pipe)
{
int rv;
// Initialize protocol pipe data.
nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
+ nni_mtx_lock(&ep->ep_mtx);
+
+ if ((sock->s_closing) || (ep->ep_closed)) {
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_unlock(&sock->s_mx);
return (NNG_ECLOSED);
}
+ rv = nni_aio_init(&pipe->p_start_aio, nni_sock_pipe_start_cb, pipe);
+ if (rv != 0) {
+ nni_mtx_unlock(&ep->ep_mtx);
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
+ }
+
rv = sock->s_pipe_ops.pipe_init(
&pipe->p_proto_data, pipe, sock->s_data);
if (rv != 0) {
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_lock(&sock->s_mx);
return (rv);
}
// Save the protocol destructor.
pipe->p_proto_dtor = sock->s_pipe_ops.pipe_fini;
pipe->p_sock = sock;
+ pipe->p_ep = ep;
+
nni_list_append(&sock->s_pipes, pipe);
+ nni_list_append(&ep->ep_pipes, pipe);
+
+ // Start the initial negotiation I/O...
+ if (pipe->p_tran_ops.p_start == NULL) {
+ if (nni_sock_pipe_start(pipe) != 0) {
+ nni_pipe_stop(pipe);
+ }
+ } else {
+ pipe->p_tran_ops.p_start(
+ pipe->p_tran_data, &pipe->p_start_aio);
+ }
+
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_unlock(&sock->s_mx);
return (0);
}
@@ -128,8 +195,10 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
pdata = nni_pipe_get_proto_data(pipe);
- nni_mtx_lock(&sock->s_mx);
+ // Stop any pending negotiation.
+ nni_aio_stop(&pipe->p_start_aio);
+ nni_mtx_lock(&sock->s_mx);
if ((sock->s_pipe_ops.pipe_stop == NULL) || (pdata == NULL)) {
nni_mtx_unlock(&sock->s_mx);
return;
@@ -508,24 +577,18 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // For each pipe, arrange for it to teardown hard. (Close, etc.).
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_stop(pipe);
- }
-
// For each ep, arrange for it to teardown hard.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
nni_ep_stop(ep);
}
-
- // Wait for the pipes to be reaped (there should not be any because
- // we have already reaped the EPs.)
- while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
- nni_cv_wait(&sock->s_cv);
+ // For each pipe, arrange for it to teardown hard.
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+ nni_pipe_stop(pipe);
}
- // Wait for the eps to be reaped.
- while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
+ // We have to wait for *both* endpoints and pipes to be removed.
+ while ((!nni_list_empty(&sock->s_pipes)) ||
+ (!nni_list_empty(&sock->s_eps))) {
nni_cv_wait(&sock->s_cv);
}