diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-10-20 17:03:12 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-10-23 16:14:53 -0700 |
| commit | 3585000ca027740dbdb4599f4991cd2bf562e2f2 (patch) | |
| tree | a45b4c1bcc2d11777dde0e38d4b742d121d55e45 /src/protocol/pipeline | |
| parent | fdb73b69a887d868f8e976ef8a990a5d7f6687f9 (diff) | |
| download | nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.tar.gz nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.tar.bz2 nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.zip | |
fixes #112 Need to move some stuff from socket to message queues
Diffstat (limited to 'src/protocol/pipeline')
| -rw-r--r-- | src/protocol/pipeline/pull.c | 27 | ||||
| -rw-r--r-- | src/protocol/pipeline/push.c | 18 |
2 files changed, 40 insertions, 5 deletions
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index 80db2425..7dd0c8ed 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -26,6 +26,7 @@ static void pull_putq(pull_pipe *, nni_msg *); struct pull_sock { nni_msgq *urq; int raw; + nni_sock *sock; }; // A pull_pipe is our per-pipe protocol private structure. @@ -44,10 +45,11 @@ pull_sock_init(void **sp, nni_sock *sock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - s->raw = 0; - s->urq = nni_sock_recvq(sock); - *sp = s; - nni_sock_senderr(sock, NNG_ENOTSUP); + s->raw = 0; + s->urq = nni_sock_recvq(sock); + s->sock = sock; + + *sp = s; return (0); } @@ -185,6 +187,21 @@ pull_sock_getopt_raw(void *arg, void *buf, size_t *szp) return (nni_getopt_int(s->raw, buf, szp)); } +static void +pull_sock_send(void *arg, nni_aio *aio) +{ + nni_aio_finish_error(aio, NNG_ENOTSUP); +} + +static void +pull_sock_recv(void *arg, nni_aio *aio) +{ + pull_sock *s = arg; + + nni_sock_recv_pending(s->sock); + nni_msgq_aio_get(s->urq, aio); +} + static nni_proto_pipe_ops pull_pipe_ops = { .pipe_init = pull_pipe_init, .pipe_fini = pull_pipe_fini, @@ -207,6 +224,8 @@ static nni_proto_sock_ops pull_sock_ops = { .sock_fini = pull_sock_fini, .sock_open = pull_sock_open, .sock_close = pull_sock_close, + .sock_send = pull_sock_send, + .sock_recv = pull_sock_recv, .sock_options = pull_sock_options, }; diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index 77db7fcb..af7b80ca 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -54,7 +54,6 @@ push_sock_init(void **sp, nni_sock *sock) s->sock = sock; s->uwq = nni_sock_sendq(sock); *sp = s; - nni_sock_recverr(sock, NNG_ENOTSUP); return (0); } @@ -205,6 +204,21 @@ push_sock_getopt_raw(void *arg, void *buf, size_t *szp) return (nni_getopt_int(s->raw, buf, szp)); } +static void +push_sock_send(void *arg, nni_aio *aio) +{ + push_sock *s = arg; + + nni_sock_send_pending(s->sock); + nni_msgq_aio_put(s->uwq, aio); +} + +static void +push_sock_recv(void *arg, nni_aio *aio) +{ + nni_aio_finish_error(aio, NNG_ENOTSUP); +} + static nni_proto_pipe_ops push_pipe_ops = { .pipe_init = push_pipe_init, .pipe_fini = push_pipe_fini, @@ -228,6 +242,8 @@ static nni_proto_sock_ops push_sock_ops = { .sock_open = push_sock_open, .sock_close = push_sock_close, .sock_options = push_sock_options, + .sock_send = push_sock_send, + .sock_recv = push_sock_recv, }; static nni_proto push_proto = { |
