diff options
Diffstat (limited to 'src/protocol/pipeline/pull.c')
| -rw-r--r-- | src/protocol/pipeline/pull.c | 79 |
1 files changed, 25 insertions, 54 deletions
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index 695b730e..9612177c 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -19,7 +19,6 @@ typedef struct nni_pull_sock nni_pull_sock; // An nni_pull_sock is our per-socket protocol private structure. struct nni_pull_sock { - nni_mtx mx; nni_msgq * urq; int raw; }; @@ -31,7 +30,7 @@ struct nni_pull_pipe { }; static int -nni_pull_init(void **pullp, nni_sock *sock) +nni_pull_sock_init(void **pullp, nni_sock *sock) { nni_pull_sock *pull; int rv; @@ -39,10 +38,6 @@ nni_pull_init(void **pullp, nni_sock *sock) if ((pull = NNI_ALLOC_STRUCT(pull)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&pull->mx)) != 0) { - NNI_FREE_STRUCT(pull); - return (rv); - } pull->raw = 0; pull->urq = nni_sock_recvq(sock); *pullp = pull; @@ -52,11 +47,10 @@ nni_pull_init(void **pullp, nni_sock *sock) static void -nni_pull_fini(void *arg) +nni_pull_sock_fini(void *arg) { nni_pull_sock *pull = arg; - nni_mtx_fini(&pull->mx); NNI_FREE_STRUCT(pull); } @@ -86,32 +80,6 @@ nni_pull_pipe_fini(void *arg) } -static int -nni_pull_pipe_add(void *arg) -{ - nni_pull_pipe *pp = arg; - - if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PUSH) { - return (NNG_EPROTO); - } - return (0); -} - - -static void -nni_pull_pipe_rem(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - - -static void -nni_pull_pipe_send(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - - static void nni_pull_pipe_recv(void *arg) { @@ -133,16 +101,14 @@ nni_pull_pipe_recv(void *arg) static int -nni_pull_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_pull_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_pull_sock *pull = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&pull->mx); rv = nni_setopt_int(&pull->raw, buf, sz, 0, 1); - nni_mtx_unlock(&pull->mx); break; default: rv = NNG_ENOTSUP; @@ -152,16 +118,14 @@ nni_pull_setopt(void *arg, int opt, const void *buf, size_t sz) static int -nni_pull_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_pull_sock *pull = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&pull->mx); rv = nni_getopt_int(&pull->raw, buf, szp); - nni_mtx_unlock(&pull->mx); break; default: rv = NNG_ENOTSUP; @@ -172,24 +136,31 @@ nni_pull_getopt(void *arg, int opt, void *buf, size_t *szp) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_pull_proto_pipe = { +static nni_proto_pipe_ops nni_pull_pipe_ops = { .pipe_init = nni_pull_pipe_init, .pipe_fini = nni_pull_pipe_fini, - .pipe_add = nni_pull_pipe_add, - .pipe_rem = nni_pull_pipe_rem, - .pipe_send = nni_pull_pipe_send, + .pipe_add = NULL, + .pipe_rem = NULL, + .pipe_send = NULL, .pipe_recv = nni_pull_pipe_recv, }; +static nni_proto_sock_ops nni_pull_sock_ops = { + .sock_init = nni_pull_sock_init, + .sock_fini = nni_pull_sock_fini, + .sock_close = NULL, + .sock_setopt = nni_pull_sock_setopt, + .sock_getopt = nni_pull_sock_getopt, + .sock_send = NULL, + .sock_recv = NULL, + .sock_rfilter = NULL, + .sock_sfilter = NULL, +}; + nni_proto nni_pull_proto = { - .proto_self = NNG_PROTO_PULL, - .proto_peer = NNG_PROTO_PUSH, - .proto_name = "pull", - .proto_pipe = &nni_pull_proto_pipe, - .proto_init = nni_pull_init, - .proto_fini = nni_pull_fini, - .proto_setopt = nni_pull_setopt, - .proto_getopt = nni_pull_getopt, - .proto_recv_filter = NULL, - .proto_send_filter = NULL, + .proto_self = NNG_PROTO_PULL, + .proto_peer = NNG_PROTO_PUSH, + .proto_name = "pull", + .proto_pipe_ops = &nni_pull_pipe_ops, + .proto_sock_ops = &nni_pull_sock_ops, }; |
