diff options
Diffstat (limited to 'src/protocol/pubsub0')
| -rw-r--r-- | src/protocol/pubsub0/pub.c | 23 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 16 |
2 files changed, 30 insertions, 9 deletions
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index 45f4b7d9..4db48754 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -61,7 +61,6 @@ pub0_sock_fini(void *arg) { pub0_sock *s = arg; - nni_aio_stop(s->aio_getq); nni_aio_fini(s->aio_getq); nni_mtx_fini(&s->mtx); NNI_FREE_STRUCT(s); @@ -103,13 +102,24 @@ pub0_sock_close(void *arg) { pub0_sock *s = arg; - nni_aio_abort(s->aio_getq, NNG_ECLOSED); + nni_aio_close(s->aio_getq); +} + +static void +pub0_pipe_stop(void *arg) +{ + pub0_pipe *p = arg; + + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); } static void pub0_pipe_fini(void *arg) { pub0_pipe *p = arg; + nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); nni_aio_fini(p->aio_recv); @@ -164,14 +174,14 @@ pub0_pipe_start(void *arg) } static void -pub0_pipe_stop(void *arg) +pub0_pipe_close(void *arg) { pub0_pipe *p = arg; pub0_sock *s = p->pub; - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); + nni_aio_close(p->aio_getq); + nni_aio_close(p->aio_send); + nni_aio_close(p->aio_recv); nni_msgq_close(p->sendq); @@ -290,6 +300,7 @@ static nni_proto_pipe_ops pub0_pipe_ops = { .pipe_init = pub0_pipe_init, .pipe_fini = pub0_pipe_fini, .pipe_start = pub0_pipe_start, + .pipe_close = pub0_pipe_close, .pipe_stop = pub0_pipe_stop, }; diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index b41b33ea..c244b0ad 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -99,6 +99,15 @@ sub0_sock_close(void *arg) } static void +sub0_pipe_stop(void *arg) +{ + sub0_pipe *p = arg; + + nni_aio_stop(p->aio_putq); + nni_aio_stop(p->aio_recv); +} + +static void sub0_pipe_fini(void *arg) { sub0_pipe *p = arg; @@ -139,12 +148,12 @@ sub0_pipe_start(void *arg) } static void -sub0_pipe_stop(void *arg) +sub0_pipe_close(void *arg) { sub0_pipe *p = arg; - nni_aio_stop(p->aio_putq); - nni_aio_stop(p->aio_recv); + nni_aio_close(p->aio_putq); + nni_aio_close(p->aio_recv); } static void @@ -338,6 +347,7 @@ static nni_proto_pipe_ops sub0_pipe_ops = { .pipe_init = sub0_pipe_init, .pipe_fini = sub0_pipe_fini, .pipe_start = sub0_pipe_start, + .pipe_close = sub0_pipe_close, .pipe_stop = sub0_pipe_stop, }; |
