aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pubsub0/pub.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pubsub0/pub.c')
-rw-r--r--src/protocol/pubsub0/pub.c32
1 files changed, 9 insertions, 23 deletions
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index 9b995c33..c6959148 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -9,6 +9,7 @@
//
#include <stdlib.h>
+#include <stdio.h>
#include <string.h>
#include "core/nng_impl.h"
@@ -90,11 +91,7 @@ pub0_sock_open(void *arg)
static void
pub0_sock_close(void *arg)
{
- pub0_sock *sock = arg;
-
- nni_mtx_lock(&sock->mtx);
- sock->closed = true;
- nni_mtx_unlock(&sock->mtx);
+ NNI_ARG_UNUSED(arg);
}
static void
@@ -186,15 +183,12 @@ pub0_pipe_recv_cb(void *arg)
{
pub0_pipe *p = arg;
- if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_close(p->pipe);
- return;
+ // We should never receive a message -- the only valid reason for us to
+ // be here is on pipe close.
+ if (nni_aio_result(p->aio_recv) == 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_recv));
}
-
- // We should never get any messages. If we do we just dicard them.
- nni_msg_free(nni_aio_get_msg(p->aio_recv));
- nni_aio_set_msg(p->aio_recv, NULL);
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_close(p->pipe);
}
static void
@@ -212,7 +206,7 @@ pub0_pipe_send_cb(void *arg)
}
nni_mtx_lock(&sock->mtx);
- if (sock->closed || p->closed) {
+ if (p->closed) {
nni_mtx_unlock(&sock->mtx);
return;
}
@@ -246,15 +240,7 @@ pub0_sock_send(void *arg, nni_aio *aio)
msg = nni_aio_get_msg(aio);
len = nni_msg_len(msg);
nni_mtx_lock(&sock->mtx);
- if (sock->closed) {
- nni_mtx_unlock(&sock->mtx);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- return;
- }
NNI_LIST_FOREACH (&sock->pipes, p) {
- if (p->closed) {
- continue;
- }
if (p == nni_list_last(&sock->pipes)) {
dup = msg;
msg = NULL;
@@ -319,7 +305,7 @@ pub0_sock_set_sendbuf(void *arg, const void *buf, size_t sz, nni_type t)
// stop short. The others would likely fail for ENOMEM as
// well anyway. There is a weird effect here where the
// buffers may have been set for *some* of the pipes, but
- // we have no way to correct, or even report, partial failure.
+ // we have no way to correct partial failure.
if ((rv = nni_lmq_resize(&p->sendq, (size_t) val)) != 0) {
break;
}