diff options
Diffstat (limited to 'src/protocol/reqrep/rep.c')
| -rw-r--r-- | src/protocol/reqrep/rep.c | 99 |
1 files changed, 31 insertions, 68 deletions
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 56ee2367..8de196c4 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -22,7 +22,6 @@ typedef struct nni_rep_sock nni_rep_sock; // An nni_rep_sock is our per-socket protocol private structure. struct nni_rep_sock { nni_sock * sock; - nni_mtx mx; nni_msgq * uwq; nni_msgq * urq; int raw; @@ -44,7 +43,7 @@ struct nni_rep_pipe { static void nni_rep_topsender(void *); static int -nni_rep_init(void **repp, nni_sock *sock) +nni_rep_sock_init(void **repp, nni_sock *sock) { nni_rep_sock *rep; int rv; @@ -52,17 +51,12 @@ nni_rep_init(void **repp, nni_sock *sock) if ((rep = NNI_ALLOC_STRUCT(rep)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&rep->mx)) != 0) { - NNI_FREE_STRUCT(rep); - return (rv); - } rep->ttl = 8; // Per RFC rep->sock = sock; rep->raw = 0; rep->btrace = NULL; rep->btrace_len = 0; if ((rv = nni_idhash_create(&rep->pipes)) != 0) { - nni_mtx_fini(&rep->mx); NNI_FREE_STRUCT(rep); return (rv); } @@ -70,28 +64,18 @@ nni_rep_init(void **repp, nni_sock *sock) rep->uwq = nni_sock_sendq(sock); rep->urq = nni_sock_recvq(sock); - rv = nni_thr_init(&rep->sender, nni_rep_topsender, rep); - if (rv != 0) { - nni_idhash_destroy(rep->pipes); - nni_mtx_fini(&rep->mx); - NNI_FREE_STRUCT(rep); - return (rv); - } *repp = rep; nni_sock_senderr(sock, NNG_ESTATE); - nni_thr_run(&rep->sender); return (0); } static void -nni_rep_fini(void *arg) +nni_rep_sock_fini(void *arg) { nni_rep_sock *rep = arg; - nni_thr_fini(&rep->sender); nni_idhash_destroy(rep->pipes); - nni_mtx_fini(&rep->mx); if (rep->btrace != NULL) { nni_free(rep->btrace, rep->btrace_len); } @@ -135,16 +119,8 @@ nni_rep_pipe_add(void *arg) { nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - int rv; - - if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REQ) { - return (NNG_EPROTO); - } - nni_mtx_lock(&rep->mx); - rv = nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp); - nni_mtx_unlock(&rep->mx); - return (rv); + return (nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp)); } @@ -154,22 +130,21 @@ nni_rep_pipe_rem(void *arg) nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - nni_mtx_lock(&rep->mx); nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe)); - nni_mtx_unlock(&rep->mx); } -// nni_rep_topsender watches for messages from the upper write queue, +// nni_rep_sock_send watches for messages from the upper write queue, // extracts the destination pipe, and forwards it to the appropriate // destination pipe via a separate queue. This prevents a single bad // or slow pipe from gumming up the works for the entire socket. static void -nni_rep_topsender(void *arg) +nni_rep_sock_send(void *arg) { nni_rep_sock *rep = arg; nni_msgq *uwq = rep->uwq; nni_msgq *urq = rep->urq; + nni_mtx *mx = nni_sock_mtx(rep->sock); nni_msg *msg; for (;;) { @@ -190,9 +165,9 @@ nni_rep_topsender(void *arg) NNI_GET32(header, id); nni_msg_trim_header(msg, 4); - nni_mtx_lock(&rep->mx); + nni_mtx_lock(mx); if (nni_idhash_find(rep->pipes, id, (void **) &rp) != 0) { - nni_mtx_unlock(&rep->mx); + nni_mtx_unlock(mx); nni_msg_free(msg); continue; } @@ -204,7 +179,7 @@ nni_rep_topsender(void *arg) // circumstances. nni_msg_free(msg); } - nni_mtx_unlock(&rep->mx); + nni_mtx_unlock(mx); } } @@ -218,8 +193,6 @@ nni_rep_pipe_send(void *arg) nni_msgq *wq = rp->sendq; nni_pipe *pipe = rp->pipe; nni_msg *msg; - uint8_t *body; - size_t size; int rv; for (;;) { @@ -311,21 +284,17 @@ again: static int -nni_rep_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_rep_sock *rep = arg; int rv; switch (opt) { case NNG_OPT_MAXTTL: - nni_mtx_lock(&rep->mx); rv = nni_setopt_int(&rep->ttl, buf, sz, 1, 255); - nni_mtx_unlock(&rep->mx); break; case NNG_OPT_RAW: - nni_mtx_lock(&rep->mx); rv = nni_setopt_int(&rep->raw, buf, sz, 0, 1); - nni_mtx_unlock(&rep->mx); break; default: rv = NNG_ENOTSUP; @@ -335,21 +304,17 @@ nni_rep_setopt(void *arg, int opt, const void *buf, size_t sz) static int -nni_rep_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_rep_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_rep_sock *rep = arg; int rv; switch (opt) { case NNG_OPT_MAXTTL: - nni_mtx_lock(&rep->mx); rv = nni_getopt_int(&rep->ttl, buf, szp); - nni_mtx_unlock(&rep->mx); break; case NNG_OPT_RAW: - nni_mtx_lock(&rep->mx); rv = nni_getopt_int(&rep->raw, buf, szp); - nni_mtx_unlock(&rep->mx); break; default: rv = NNG_ENOTSUP; @@ -359,14 +324,12 @@ nni_rep_getopt(void *arg, int opt, void *buf, size_t *szp) static nni_msg * -nni_rep_sendfilter(void *arg, nni_msg *msg) +nni_rep_sock_sfilter(void *arg, nni_msg *msg) { nni_rep_sock *rep = arg; size_t len; - nni_mtx_lock(&rep->mx); if (rep->raw) { - nni_mtx_unlock(&rep->mx); return (msg); } @@ -376,7 +339,6 @@ nni_rep_sendfilter(void *arg, nni_msg *msg) // If we have a stored backtrace, append it to the header... // if we don't have a backtrace, discard the message. if (rep->btrace == NULL) { - nni_mtx_unlock(&rep->mx); nni_msg_free(msg); return (NULL); } @@ -388,7 +350,6 @@ nni_rep_sendfilter(void *arg, nni_msg *msg) nni_free(rep->btrace, rep->btrace_len); rep->btrace = NULL; rep->btrace_len = 0; - nni_mtx_unlock(&rep->mx); nni_msg_free(msg); return (NULL); } @@ -396,21 +357,18 @@ nni_rep_sendfilter(void *arg, nni_msg *msg) nni_free(rep->btrace, rep->btrace_len); rep->btrace = NULL; rep->btrace_len = 0; - nni_mtx_unlock(&rep->mx); return (msg); } static nni_msg * -nni_rep_recvfilter(void *arg, nni_msg *msg) +nni_rep_sock_rfilter(void *arg, nni_msg *msg) { nni_rep_sock *rep = arg; char *header; size_t len; - nni_mtx_lock(&rep->mx); if (rep->raw) { - nni_mtx_unlock(&rep->mx); return (msg); } @@ -423,21 +381,19 @@ nni_rep_recvfilter(void *arg, nni_msg *msg) rep->btrace_len = 0; } if ((rep->btrace = nni_alloc(len)) == NULL) { - nni_mtx_unlock(&rep->mx); nni_msg_free(msg); return (NULL); } rep->btrace_len = len; memcpy(rep->btrace, header, len); nni_msg_trunc_header(msg, len); - nni_mtx_unlock(&rep->mx); return (msg); } // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_rep_proto_pipe = { +static nni_proto_pipe_ops nni_rep_pipe_ops = { .pipe_init = nni_rep_pipe_init, .pipe_fini = nni_rep_pipe_fini, .pipe_add = nni_rep_pipe_add, @@ -446,15 +402,22 @@ static nni_proto_pipe nni_rep_proto_pipe = { .pipe_recv = nni_rep_pipe_recv, }; +static nni_proto_sock_ops nni_rep_sock_ops = { + .sock_init = nni_rep_sock_init, + .sock_fini = nni_rep_sock_fini, + .sock_close = NULL, + .sock_setopt = nni_rep_sock_setopt, + .sock_getopt = nni_rep_sock_getopt, + .sock_rfilter = nni_rep_sock_rfilter, + .sock_sfilter = nni_rep_sock_sfilter, + .sock_send = nni_rep_sock_send, + .sock_recv = NULL, +}; + nni_proto nni_rep_proto = { - .proto_self = NNG_PROTO_REP, - .proto_peer = NNG_PROTO_REQ, - .proto_name = "rep", - .proto_pipe = &nni_rep_proto_pipe, - .proto_init = nni_rep_init, - .proto_fini = nni_rep_fini, - .proto_setopt = nni_rep_setopt, - .proto_getopt = nni_rep_getopt, - .proto_recv_filter = nni_rep_recvfilter, - .proto_send_filter = nni_rep_sendfilter, + .proto_self = NNG_PROTO_REP, + .proto_peer = NNG_PROTO_REQ, + .proto_name = "rep", + .proto_sock_ops = &nni_rep_sock_ops, + .proto_pipe_ops = &nni_rep_pipe_ops, }; |
