aboutsummaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/pubsub0/sub.c30
1 files changed, 11 insertions, 19 deletions
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index b4fa2e2a..a7d6b9f5 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -31,7 +31,6 @@ typedef struct sub0_sock sub0_sock;
typedef struct sub0_topic sub0_topic;
static void sub0_recv_cb(void *);
-static void sub0_putq_cb(void *);
static void sub0_pipe_fini(void *);
struct sub0_topic {
@@ -52,7 +51,6 @@ struct sub0_pipe {
nni_pipe * pipe;
sub0_sock *sub;
nni_aio * aio_recv;
- nni_aio * aio_putq;
};
static int
@@ -103,7 +101,6 @@ sub0_pipe_stop(void *arg)
{
sub0_pipe *p = arg;
- nni_aio_stop(p->aio_putq);
nni_aio_stop(p->aio_recv);
}
@@ -112,7 +109,6 @@ sub0_pipe_fini(void *arg)
{
sub0_pipe *p = arg;
- nni_aio_fini(p->aio_putq);
nni_aio_fini(p->aio_recv);
NNI_FREE_STRUCT(p);
}
@@ -126,8 +122,7 @@ sub0_pipe_init(void **pp, nni_pipe *pipe, void *s)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_aio_init(&p->aio_putq, sub0_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0)) {
+ if ((rv = nni_aio_init(&p->aio_recv, sub0_recv_cb, p)) != 0) {
sub0_pipe_fini(p);
return (rv);
}
@@ -157,7 +152,6 @@ sub0_pipe_close(void *arg)
{
sub0_pipe *p = arg;
- nni_aio_close(p->aio_putq);
nni_aio_close(p->aio_recv);
}
@@ -177,22 +171,20 @@ sub0_recv_cb(void *arg)
msg = nni_aio_get_msg(p->aio_recv);
nni_aio_set_msg(p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
- nni_aio_set_msg(p->aio_putq, msg);
- nni_msgq_aio_put(urq, p->aio_putq);
-}
-static void
-sub0_putq_cb(void *arg)
-{
- sub0_pipe *p = arg;
-
- if (nni_aio_result(p->aio_putq) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_putq));
- nni_aio_set_msg(p->aio_putq, NULL);
+ switch (nni_msgq_tryput(urq, msg)) {
+ case 0:
+ break;
+ case NNG_EAGAIN:
+ nni_msg_free(msg);
+ break;
+ default:
+ // Any other error we stop the pipe for. It's probably
+ // NNG_ECLOSED anyway.
+ nng_msg_free(msg);
nni_pipe_stop(p->pipe);
return;
}
-
nni_pipe_recv(p->pipe, p->aio_recv);
}