aboutsummaryrefslogtreecommitdiff
path: root/src/sp/protocol/pubsub0/pub.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/sp/protocol/pubsub0/pub.c')
-rw-r--r--src/sp/protocol/pubsub0/pub.c86
1 files changed, 40 insertions, 46 deletions
diff --git a/src/sp/protocol/pubsub0/pub.c b/src/sp/protocol/pubsub0/pub.c
index cfc3ed6d..3911127a 100644
--- a/src/sp/protocol/pubsub0/pub.c
+++ b/src/sp/protocol/pubsub0/pub.c
@@ -36,22 +36,22 @@ static void pub0_pipe_fini(void *);
// pub0_sock is our per-socket protocol private structure.
struct pub0_sock {
- nni_list pipes;
- nni_mtx mtx;
- bool closed;
- size_t sendbuf;
- nni_pollable sendable;
+ nni_list pipes;
+ nni_mtx mtx;
+ bool closed;
+ size_t sendbuf;
+ nni_pollable sendable;
};
// pub0_pipe is our per-pipe protocol private structure.
struct pub0_pipe {
- nni_pipe * pipe;
- pub0_sock * pub;
+ nni_pipe *pipe;
+ pub0_sock *pub;
nni_lmq sendq;
bool closed;
bool busy;
- nni_aio * aio_send;
- nni_aio * aio_recv;
+ nni_aio aio_send;
+ nni_aio aio_recv;
nni_list_node node;
};
@@ -65,10 +65,10 @@ pub0_sock_fini(void *arg)
}
static int
-pub0_sock_init(void *arg, nni_sock *nsock)
+pub0_sock_init(void *arg, nni_sock *ns)
{
pub0_sock *sock = arg;
- NNI_ARG_UNUSED(nsock);
+ NNI_ARG_UNUSED(ns);
nni_pollable_init(&sock->sendable);
nni_mtx_init(&sock->mtx);
@@ -94,8 +94,8 @@ pub0_pipe_stop(void *arg)
{
pub0_pipe *p = arg;
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
+ nni_aio_stop(&p->aio_send);
+ nni_aio_stop(&p->aio_recv);
}
static void
@@ -103,8 +103,8 @@ pub0_pipe_fini(void *arg)
{
pub0_pipe *p = arg;
- nni_aio_free(p->aio_send);
- nni_aio_free(p->aio_recv);
+ nni_aio_fini(&p->aio_send);
+ nni_aio_fini(&p->aio_recv);
nni_lmq_fini(&p->sendq);
}
@@ -113,21 +113,15 @@ pub0_pipe_init(void *arg, nni_pipe *pipe, void *s)
{
pub0_pipe *p = arg;
pub0_sock *sock = s;
- int rv;
size_t len;
nni_mtx_lock(&sock->mtx);
len = sock->sendbuf;
nni_mtx_unlock(&sock->mtx);
- // XXX: consider making this depth tunable
- if (((rv = nni_lmq_init(&p->sendq, len)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_send, pub0_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_alloc(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) {
-
- pub0_pipe_fini(p);
- return (rv);
- }
+ nni_lmq_init(&p->sendq, len);
+ nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p);
+ nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p);
p->busy = false;
p->pipe = pipe;
@@ -149,7 +143,7 @@ pub0_pipe_start(void *arg)
nni_mtx_unlock(&sock->mtx);
// Start the receiver.
- nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
return (0);
}
@@ -160,8 +154,8 @@ pub0_pipe_close(void *arg)
pub0_pipe *p = arg;
pub0_sock *sock = p->pub;
- nni_aio_close(p->aio_send);
- nni_aio_close(p->aio_recv);
+ nni_aio_close(&p->aio_send);
+ nni_aio_close(&p->aio_recv);
nni_mtx_lock(&sock->mtx);
p->closed = true;
@@ -180,8 +174,8 @@ pub0_pipe_recv_cb(void *arg)
// We should never receive a message -- the only valid reason for us to
// be here is on pipe close.
- if (nni_aio_result(p->aio_recv) == 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_recv));
+ if (nni_aio_result(&p->aio_recv) == 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_recv));
}
nni_pipe_close(p->pipe);
}
@@ -191,11 +185,11 @@ pub0_pipe_send_cb(void *arg)
{
pub0_pipe *p = arg;
pub0_sock *sock = p->pub;
- nni_msg * msg;
+ nni_msg *msg;
- if (nni_aio_result(p->aio_send) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_send));
- nni_aio_set_msg(p->aio_send, NULL);
+ if (nni_aio_result(&p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(&p->aio_send));
+ nni_aio_set_msg(&p->aio_send, NULL);
nni_pipe_close(p->pipe);
return;
}
@@ -205,9 +199,9 @@ pub0_pipe_send_cb(void *arg)
nni_mtx_unlock(&sock->mtx);
return;
}
- if (nni_lmq_getq(&p->sendq, &msg) == 0) {
- nni_aio_set_msg(p->aio_send, msg);
- nni_pipe_send(p->pipe, p->aio_send);
+ if (nni_lmq_get(&p->sendq, &msg) == 0) {
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
} else {
p->busy = false;
}
@@ -228,7 +222,7 @@ pub0_sock_send(void *arg, nni_aio *aio)
{
pub0_sock *sock = arg;
pub0_pipe *p;
- nng_msg * msg;
+ nng_msg *msg;
size_t len;
msg = nni_aio_get_msg(aio);
@@ -241,14 +235,14 @@ pub0_sock_send(void *arg, nni_aio *aio)
if (nni_lmq_full(&p->sendq)) {
// Make space for the new message.
nni_msg *old;
- (void) nni_lmq_getq(&p->sendq, &old);
+ (void) nni_lmq_get(&p->sendq, &old);
nni_msg_free(old);
}
- nni_lmq_putq(&p->sendq, msg);
+ nni_lmq_put(&p->sendq, msg);
} else {
p->busy = true;
- nni_aio_set_msg(p->aio_send, msg);
- nni_pipe_send(p->pipe, p->aio_send);
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
}
}
nni_mtx_unlock(&sock->mtx);
@@ -289,7 +283,7 @@ pub0_sock_set_sendbuf(void *arg, const void *buf, size_t sz, nni_type t)
nni_mtx_lock(&sock->mtx);
sock->sendbuf = (size_t) val;
NNI_LIST_FOREACH (&sock->pipes, p) {
- // If we fail part way thru (should only be ENOMEM), we
+ // If we fail part way through (should only be ENOMEM), we
// stop short. The others would likely fail for ENOMEM as
// well anyway. There is a weird effect here where the
// buffers may have been set for *some* of the pipes, but
@@ -368,13 +362,13 @@ static nni_proto pub0_proto_raw = {
};
int
-nng_pub0_open(nng_socket *sidp)
+nng_pub0_open(nng_socket *id)
{
- return (nni_proto_open(sidp, &pub0_proto));
+ return (nni_proto_open(id, &pub0_proto));
}
int
-nng_pub0_open_raw(nng_socket *sidp)
+nng_pub0_open_raw(nng_socket *id)
{
- return (nni_proto_open(sidp, &pub0_proto_raw));
+ return (nni_proto_open(id, &pub0_proto_raw));
}