aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c93
1 files changed, 78 insertions, 15 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 10bc1c80..23a1793a 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -69,27 +69,94 @@ nni_sock_rele(nni_sock *sock)
nni_objhash_unref(nni_socks, sock->s_id);
}
+static int
+nni_sock_pipe_start(nni_pipe *pipe)
+{
+ nni_sock *sock = pipe->p_sock;
+ void * pdata = nni_pipe_get_proto_data(pipe);
+ int rv;
+
+ NNI_ASSERT(sock != NULL);
+ if (sock->s_closing) {
+ // We're closing, bail out.
+ return (NNG_ECLOSED);
+ }
+ if (nni_pipe_peer(pipe) != sock->s_peer) {
+ // Peer protocol mismatch.
+ return (NNG_EPROTO);
+ }
+ if ((rv = sock->s_pipe_ops.pipe_start(pdata)) != 0) {
+ // Protocol rejection for other reasons.
+ // E.g. pair and already have active connected partner.
+ return (rv);
+ }
+ return (0);
+}
+
+static void
+nni_sock_pipe_start_cb(void *arg)
+{
+ nni_pipe *pipe = arg;
+ nni_aio * aio = &pipe->p_start_aio;
+
+ if (nni_aio_result(aio) != 0) {
+ // Failed I/O during start, abort everything.
+ nni_pipe_stop(pipe);
+ return;
+ }
+ if (nni_sock_pipe_start(pipe) != 0) {
+ nni_pipe_stop(pipe);
+ return;
+ }
+}
+
int
-nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe)
+nni_sock_pipe_add(nni_sock *sock, nni_ep *ep, nni_pipe *pipe)
{
int rv;
// Initialize protocol pipe data.
nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
+ nni_mtx_lock(&ep->ep_mtx);
+
+ if ((sock->s_closing) || (ep->ep_closed)) {
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_unlock(&sock->s_mx);
return (NNG_ECLOSED);
}
+ rv = nni_aio_init(&pipe->p_start_aio, nni_sock_pipe_start_cb, pipe);
+ if (rv != 0) {
+ nni_mtx_unlock(&ep->ep_mtx);
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
+ }
+
rv = sock->s_pipe_ops.pipe_init(
&pipe->p_proto_data, pipe, sock->s_data);
if (rv != 0) {
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_lock(&sock->s_mx);
return (rv);
}
// Save the protocol destructor.
pipe->p_proto_dtor = sock->s_pipe_ops.pipe_fini;
pipe->p_sock = sock;
+ pipe->p_ep = ep;
+
nni_list_append(&sock->s_pipes, pipe);
+ nni_list_append(&ep->ep_pipes, pipe);
+
+ // Start the initial negotiation I/O...
+ if (pipe->p_tran_ops.p_start == NULL) {
+ if (nni_sock_pipe_start(pipe) != 0) {
+ nni_pipe_stop(pipe);
+ }
+ } else {
+ pipe->p_tran_ops.p_start(
+ pipe->p_tran_data, &pipe->p_start_aio);
+ }
+
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_unlock(&sock->s_mx);
return (0);
}
@@ -128,8 +195,10 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
pdata = nni_pipe_get_proto_data(pipe);
- nni_mtx_lock(&sock->s_mx);
+ // Stop any pending negotiation.
+ nni_aio_stop(&pipe->p_start_aio);
+ nni_mtx_lock(&sock->s_mx);
if ((sock->s_pipe_ops.pipe_stop == NULL) || (pdata == NULL)) {
nni_mtx_unlock(&sock->s_mx);
return;
@@ -508,24 +577,18 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // For each pipe, arrange for it to teardown hard. (Close, etc.).
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_stop(pipe);
- }
-
// For each ep, arrange for it to teardown hard.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
nni_ep_stop(ep);
}
-
- // Wait for the pipes to be reaped (there should not be any because
- // we have already reaped the EPs.)
- while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
- nni_cv_wait(&sock->s_cv);
+ // For each pipe, arrange for it to teardown hard.
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+ nni_pipe_stop(pipe);
}
- // Wait for the eps to be reaped.
- while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
+ // We have to wait for *both* endpoints and pipes to be removed.
+ while ((!nni_list_empty(&sock->s_pipes)) ||
+ (!nni_list_empty(&sock->s_eps))) {
nni_cv_wait(&sock->s_cv);
}