diff options
Diffstat (limited to 'src/protocol/reqrep0/rep.c')
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 61 |
1 files changed, 21 insertions, 40 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index 142edb9b..328babbc 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -53,7 +53,7 @@ struct rep0_sock { nni_idhash * pipes; nni_list recvpipes; // list of pipes with data to receive nni_list recvq; - rep0_ctx * ctx; + rep0_ctx ctx; nni_pollable *recvable; nni_pollable *sendable; }; @@ -99,25 +99,19 @@ rep0_ctx_fini(void *arg) rep0_ctx *ctx = arg; rep0_ctx_close(ctx); - NNI_FREE_STRUCT(ctx); } static int -rep0_ctx_init(void **ctxp, void *sarg) +rep0_ctx_init(void *carg, void *sarg) { - rep0_sock *s = sarg; - rep0_ctx * ctx; - - if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) { - return (NNG_ENOMEM); - } + rep0_sock *s = sarg; + rep0_ctx * ctx = carg; NNI_LIST_NODE_INIT(&ctx->sqnode); NNI_LIST_NODE_INIT(&ctx->rqnode); ctx->btrace_len = 0; ctx->sock = s; ctx->pipe_id = 0; - *ctxp = ctx; return (0); } @@ -168,7 +162,7 @@ rep0_ctx_send(void *arg, nni_aio *aio) ctx->btrace_len = 0; ctx->pipe_id = 0; - if (ctx == s->ctx) { + if (ctx == &s->ctx) { // No matter how this goes, we will no longer be able // to send on the socket (root context). That's because // we will have finished (successfully or otherwise) the @@ -225,26 +219,20 @@ rep0_sock_fini(void *arg) rep0_sock *s = arg; nni_idhash_fini(s->pipes); - if (s->ctx != NULL) { - rep0_ctx_fini(s->ctx); - } + rep0_ctx_fini(&s->ctx); nni_pollable_free(s->sendable); nni_pollable_free(s->recvable); nni_mtx_fini(&s->lk); - NNI_FREE_STRUCT(s); } static int -rep0_sock_init(void **sp, nni_sock *sock) +rep0_sock_init(void *arg, nni_sock *sock) { - rep0_sock *s; + rep0_sock *s = arg; int rv; NNI_ARG_UNUSED(sock); - if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { - return (NNG_ENOMEM); - } nni_mtx_init(&s->lk); if ((rv = nni_idhash_init(&s->pipes)) != 0) { rep0_sock_fini(s); @@ -256,10 +244,7 @@ rep0_sock_init(void **sp, nni_sock *sock) s->ttl = 8; - if ((rv = rep0_ctx_init((void **) &s->ctx, s)) != 0) { - rep0_sock_fini(s); - return (rv); - } + (void) rep0_ctx_init(&s->ctx, s); // We start off without being either readable or pollable. // Readability comes when there is something on the socket. @@ -269,8 +254,6 @@ rep0_sock_init(void **sp, nni_sock *sock) return (rv); } - *sp = s; - return (0); } @@ -285,7 +268,7 @@ rep0_sock_close(void *arg) { rep0_sock *s = arg; - rep0_ctx_close(s->ctx); + rep0_ctx_close(&s->ctx); } static void @@ -310,18 +293,14 @@ rep0_pipe_fini(void *arg) nni_aio_fini(p->aio_send); nni_aio_fini(p->aio_recv); - NNI_FREE_STRUCT(p); } static int -rep0_pipe_init(void **pp, nni_pipe *pipe, void *s) +rep0_pipe_init(void *arg, nni_pipe *pipe, void *s) { - rep0_pipe *p; + rep0_pipe *p = arg; int rv; - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } if (((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) || ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0)) { rep0_pipe_fini(p); @@ -333,7 +312,6 @@ rep0_pipe_init(void **pp, nni_pipe *pipe, void *s) p->id = nni_pipe_id(pipe); p->pipe = pipe; p->rep = s; - *pp = p; return (0); } @@ -386,7 +364,7 @@ rep0_pipe_close(void *arg) nni_aio_finish(aio, 0, nni_msg_len(msg)); nni_msg_free(msg); } - if (p->id == s->ctx->pipe_id) { + if (p->id == s->ctx.pipe_id) { // We "can" send. (Well, not really, but we will happily // accept a message and discard it.) nni_pollable_raise(s->sendable); @@ -415,7 +393,7 @@ rep0_pipe_send_cb(void *arg) p->busy = false; if ((ctx = nni_list_first(&p->sendq)) == NULL) { // Nothing else to send. - if (p->id == s->ctx->pipe_id) { + if (p->id == s->ctx.pipe_id) { // Mark us ready for the other side to send! nni_pollable_raise(s->sendable); } @@ -494,7 +472,7 @@ rep0_ctx_recv(void *arg, nni_aio *aio) nni_pollable_clear(s->recvable); } nni_pipe_recv(p->pipe, p->aio_recv); - if ((ctx == s->ctx) && !p->busy) { + if ((ctx == &s->ctx) && !p->busy) { nni_pollable_raise(s->sendable); } @@ -578,7 +556,7 @@ rep0_pipe_recv_cb(void *arg) aio = ctx->raio; ctx->raio = NULL; nni_aio_set_msg(p->aio_recv, NULL); - if ((ctx == s->ctx) && !p->busy) { + if ((ctx == &s->ctx) && !p->busy) { nni_pollable_raise(s->sendable); } @@ -650,7 +628,7 @@ rep0_sock_send(void *arg, nni_aio *aio) { rep0_sock *s = arg; - rep0_ctx_send(s->ctx, aio); + rep0_ctx_send(&s->ctx, aio); } static void @@ -658,12 +636,13 @@ rep0_sock_recv(void *arg, nni_aio *aio) { rep0_sock *s = arg; - rep0_ctx_recv(s->ctx, aio); + rep0_ctx_recv(&s->ctx, aio); } // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. static nni_proto_pipe_ops rep0_pipe_ops = { + .pipe_size = sizeof(rep0_pipe), .pipe_init = rep0_pipe_init, .pipe_fini = rep0_pipe_fini, .pipe_start = rep0_pipe_start, @@ -672,6 +651,7 @@ static nni_proto_pipe_ops rep0_pipe_ops = { }; static nni_proto_ctx_ops rep0_ctx_ops = { + .ctx_size = sizeof(rep0_ctx), .ctx_init = rep0_ctx_init, .ctx_fini = rep0_ctx_fini, .ctx_send = rep0_ctx_send, @@ -699,6 +679,7 @@ static nni_option rep0_sock_options[] = { }; static nni_proto_sock_ops rep0_sock_ops = { + .sock_size = sizeof(rep0_sock), .sock_init = rep0_sock_init, .sock_fini = rep0_sock_fini, .sock_open = rep0_sock_open, |
