aboutsummaryrefslogtreecommitdiff
path: root/src/core/pipe.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-08-06 19:01:12 +0300
committerGarrett D'Amore <garrett@damore.org>2018-08-06 19:01:12 +0300
commitae944a8de32c107eea9427104e153c25e4a681f1 (patch)
tree7029f7668fe3e1a9899da57bf6c1e60e0394bacb /src/core/pipe.c
parentd7f7c896c0ede24249ef63b1e45b1878bf4bd473 (diff)
downloadnng-ae944a8de32c107eea9427104e153c25e4a681f1.tar.gz
nng-ae944a8de32c107eea9427104e153c25e4a681f1.tar.bz2
nng-ae944a8de32c107eea9427104e153c25e4a681f1.zip
Revert "fixes #599 nng_dial sync should not return until added to socket"
This changeset needs work. We are seeing errors described by This reverts commit d7f7c896c0ede24249ef63b1e45b1878bf4bd473.
Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--src/core/pipe.c67
1 files changed, 62 insertions, 5 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 374c45c8..4f50ac7c 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -75,6 +75,9 @@ pipe_destroy(nni_pipe *p)
}
nni_mtx_unlock(&nni_pipe_lk);
+ // Wait for neg callbacks to finish. (Already closed).
+ nni_aio_stop(p->p_start_aio);
+
if (p->p_proto_data != NULL) {
p->p_proto_ops.pipe_stop(p->p_proto_data);
}
@@ -90,6 +93,7 @@ pipe_destroy(nni_pipe *p)
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_fini(p->p_tran_data);
}
+ nni_aio_fini(p->p_start_aio);
nni_cv_fini(&p->p_cv);
nni_mtx_fini(&p->p_mtx);
NNI_FREE_STRUCT(p);
@@ -150,6 +154,9 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio)
void
nni_pipe_close(nni_pipe *p)
{
+ // abort any pending negotiation/start process.
+ nni_aio_close(p->p_start_aio);
+
nni_mtx_lock(&p->p_mtx);
if (p->p_closed) {
// We already did a close.
@@ -171,12 +178,49 @@ nni_pipe_close(nni_pipe *p)
nni_reap(&p->p_reap, (nni_cb) pipe_destroy, p);
}
+bool
+nni_pipe_closed(nni_pipe *p)
+{
+ bool rv;
+ nni_mtx_lock(&p->p_mtx);
+ rv = p->p_closed;
+ nni_mtx_unlock(&p->p_mtx);
+ return (rv);
+}
+
uint16_t
nni_pipe_peer(nni_pipe *p)
{
return (p->p_tran_ops.p_peer(p->p_tran_data));
}
+static void
+nni_pipe_start_cb(void *arg)
+{
+ nni_pipe *p = arg;
+ nni_sock *s = p->p_sock;
+ nni_aio * aio = p->p_start_aio;
+
+ if (nni_aio_result(aio) != 0) {
+ nni_pipe_close(p);
+ return;
+ }
+
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
+ if (nni_pipe_closed(p)) {
+ nni_pipe_close(p);
+ return;
+ }
+
+ if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) ||
+ nni_sock_closing(s)) {
+ nni_pipe_close(p);
+ return;
+ }
+
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
+}
+
int
nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
{
@@ -184,7 +228,6 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
int rv;
void * sdata = nni_sock_proto_data(sock);
nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
- uint64_t id;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
// In this case we just toss the pipe...
@@ -193,6 +236,7 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
}
// Make a private copy of the transport ops.
+ p->p_start_aio = NULL;
p->p_tran_ops = *tran->tran_pipe;
p->p_tran_data = tdata;
p->p_proto_ops = *pops;
@@ -209,11 +253,14 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
nni_mtx_init(&p->p_mtx);
nni_cv_init(&p->p_cv, &nni_pipe_lk);
- nni_mtx_lock(&nni_pipe_lk);
- if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) {
- p->p_id = (uint32_t) id;
+ if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) {
+ uint64_t id;
+ nni_mtx_lock(&nni_pipe_lk);
+ if ((rv = nni_idhash_alloc(nni_pipes, &id, p)) == 0) {
+ p->p_id = (uint32_t) id;
+ }
+ nni_mtx_unlock(&nni_pipe_lk);
}
- nni_mtx_unlock(&nni_pipe_lk);
if ((rv != 0) ||
((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0)) {
@@ -249,6 +296,16 @@ nni_pipe_getopt(
return (NNG_ENOTSUP);
}
+void
+nni_pipe_start(nni_pipe *p)
+{
+ if (p->p_tran_ops.p_start == NULL) {
+ nni_aio_finish(p->p_start_aio, 0, 0);
+ } else {
+ p->p_tran_ops.p_start(p->p_tran_data, p->p_start_aio);
+ }
+}
+
void *
nni_pipe_get_proto_data(nni_pipe *p)
{