diff options
Diffstat (limited to 'src/sp/protocol/pubsub0/pub.c')
| -rw-r--r-- | src/sp/protocol/pubsub0/pub.c | 86 |
1 files changed, 40 insertions, 46 deletions
diff --git a/src/sp/protocol/pubsub0/pub.c b/src/sp/protocol/pubsub0/pub.c index cfc3ed6d..3911127a 100644 --- a/src/sp/protocol/pubsub0/pub.c +++ b/src/sp/protocol/pubsub0/pub.c @@ -36,22 +36,22 @@ static void pub0_pipe_fini(void *); // pub0_sock is our per-socket protocol private structure. struct pub0_sock { - nni_list pipes; - nni_mtx mtx; - bool closed; - size_t sendbuf; - nni_pollable sendable; + nni_list pipes; + nni_mtx mtx; + bool closed; + size_t sendbuf; + nni_pollable sendable; }; // pub0_pipe is our per-pipe protocol private structure. struct pub0_pipe { - nni_pipe * pipe; - pub0_sock * pub; + nni_pipe *pipe; + pub0_sock *pub; nni_lmq sendq; bool closed; bool busy; - nni_aio * aio_send; - nni_aio * aio_recv; + nni_aio aio_send; + nni_aio aio_recv; nni_list_node node; }; @@ -65,10 +65,10 @@ pub0_sock_fini(void *arg) } static int -pub0_sock_init(void *arg, nni_sock *nsock) +pub0_sock_init(void *arg, nni_sock *ns) { pub0_sock *sock = arg; - NNI_ARG_UNUSED(nsock); + NNI_ARG_UNUSED(ns); nni_pollable_init(&sock->sendable); nni_mtx_init(&sock->mtx); @@ -94,8 +94,8 @@ pub0_pipe_stop(void *arg) { pub0_pipe *p = arg; - nni_aio_stop(p->aio_send); - nni_aio_stop(p->aio_recv); + nni_aio_stop(&p->aio_send); + nni_aio_stop(&p->aio_recv); } static void @@ -103,8 +103,8 @@ pub0_pipe_fini(void *arg) { pub0_pipe *p = arg; - nni_aio_free(p->aio_send); - nni_aio_free(p->aio_recv); + nni_aio_fini(&p->aio_send); + nni_aio_fini(&p->aio_recv); nni_lmq_fini(&p->sendq); } @@ -113,21 +113,15 @@ pub0_pipe_init(void *arg, nni_pipe *pipe, void *s) { pub0_pipe *p = arg; pub0_sock *sock = s; - int rv; size_t len; nni_mtx_lock(&sock->mtx); len = sock->sendbuf; nni_mtx_unlock(&sock->mtx); - // XXX: consider making this depth tunable - if (((rv = nni_lmq_init(&p->sendq, len)) != 0) || - ((rv = nni_aio_alloc(&p->aio_send, pub0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->aio_recv, pub0_pipe_recv_cb, p)) != 0)) { - - pub0_pipe_fini(p); - return (rv); - } + nni_lmq_init(&p->sendq, len); + nni_aio_init(&p->aio_send, pub0_pipe_send_cb, p); + nni_aio_init(&p->aio_recv, pub0_pipe_recv_cb, p); p->busy = false; p->pipe = pipe; @@ -149,7 +143,7 @@ pub0_pipe_start(void *arg) nni_mtx_unlock(&sock->mtx); // Start the receiver. - nni_pipe_recv(p->pipe, p->aio_recv); + nni_pipe_recv(p->pipe, &p->aio_recv); return (0); } @@ -160,8 +154,8 @@ pub0_pipe_close(void *arg) pub0_pipe *p = arg; pub0_sock *sock = p->pub; - nni_aio_close(p->aio_send); - nni_aio_close(p->aio_recv); + nni_aio_close(&p->aio_send); + nni_aio_close(&p->aio_recv); nni_mtx_lock(&sock->mtx); p->closed = true; @@ -180,8 +174,8 @@ pub0_pipe_recv_cb(void *arg) // We should never receive a message -- the only valid reason for us to // be here is on pipe close. - if (nni_aio_result(p->aio_recv) == 0) { - nni_msg_free(nni_aio_get_msg(p->aio_recv)); + if (nni_aio_result(&p->aio_recv) == 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_recv)); } nni_pipe_close(p->pipe); } @@ -191,11 +185,11 @@ pub0_pipe_send_cb(void *arg) { pub0_pipe *p = arg; pub0_sock *sock = p->pub; - nni_msg * msg; + nni_msg *msg; - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); + if (nni_aio_result(&p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(&p->aio_send)); + nni_aio_set_msg(&p->aio_send, NULL); nni_pipe_close(p->pipe); return; } @@ -205,9 +199,9 @@ pub0_pipe_send_cb(void *arg) nni_mtx_unlock(&sock->mtx); return; } - if (nni_lmq_getq(&p->sendq, &msg) == 0) { - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + if (nni_lmq_get(&p->sendq, &msg) == 0) { + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); } else { p->busy = false; } @@ -228,7 +222,7 @@ pub0_sock_send(void *arg, nni_aio *aio) { pub0_sock *sock = arg; pub0_pipe *p; - nng_msg * msg; + nng_msg *msg; size_t len; msg = nni_aio_get_msg(aio); @@ -241,14 +235,14 @@ pub0_sock_send(void *arg, nni_aio *aio) if (nni_lmq_full(&p->sendq)) { // Make space for the new message. nni_msg *old; - (void) nni_lmq_getq(&p->sendq, &old); + (void) nni_lmq_get(&p->sendq, &old); nni_msg_free(old); } - nni_lmq_putq(&p->sendq, msg); + nni_lmq_put(&p->sendq, msg); } else { p->busy = true; - nni_aio_set_msg(p->aio_send, msg); - nni_pipe_send(p->pipe, p->aio_send); + nni_aio_set_msg(&p->aio_send, msg); + nni_pipe_send(p->pipe, &p->aio_send); } } nni_mtx_unlock(&sock->mtx); @@ -289,7 +283,7 @@ pub0_sock_set_sendbuf(void *arg, const void *buf, size_t sz, nni_type t) nni_mtx_lock(&sock->mtx); sock->sendbuf = (size_t) val; NNI_LIST_FOREACH (&sock->pipes, p) { - // If we fail part way thru (should only be ENOMEM), we + // If we fail part way through (should only be ENOMEM), we // stop short. The others would likely fail for ENOMEM as // well anyway. There is a weird effect here where the // buffers may have been set for *some* of the pipes, but @@ -368,13 +362,13 @@ static nni_proto pub0_proto_raw = { }; int -nng_pub0_open(nng_socket *sidp) +nng_pub0_open(nng_socket *id) { - return (nni_proto_open(sidp, &pub0_proto)); + return (nni_proto_open(id, &pub0_proto)); } int -nng_pub0_open_raw(nng_socket *sidp) +nng_pub0_open_raw(nng_socket *id) { - return (nni_proto_open(sidp, &pub0_proto_raw)); + return (nni_proto_open(id, &pub0_proto_raw)); } |
