aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pipeline
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pipeline')
-rw-r--r--src/protocol/pipeline/pull.c27
-rw-r--r--src/protocol/pipeline/push.c18
2 files changed, 40 insertions, 5 deletions
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index 80db2425..7dd0c8ed 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -26,6 +26,7 @@ static void pull_putq(pull_pipe *, nni_msg *);
struct pull_sock {
nni_msgq *urq;
int raw;
+ nni_sock *sock;
};
// A pull_pipe is our per-pipe protocol private structure.
@@ -44,10 +45,11 @@ pull_sock_init(void **sp, nni_sock *sock)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- s->raw = 0;
- s->urq = nni_sock_recvq(sock);
- *sp = s;
- nni_sock_senderr(sock, NNG_ENOTSUP);
+ s->raw = 0;
+ s->urq = nni_sock_recvq(sock);
+ s->sock = sock;
+
+ *sp = s;
return (0);
}
@@ -185,6 +187,21 @@ pull_sock_getopt_raw(void *arg, void *buf, size_t *szp)
return (nni_getopt_int(s->raw, buf, szp));
}
+static void
+pull_sock_send(void *arg, nni_aio *aio)
+{
+ nni_aio_finish_error(aio, NNG_ENOTSUP);
+}
+
+static void
+pull_sock_recv(void *arg, nni_aio *aio)
+{
+ pull_sock *s = arg;
+
+ nni_sock_recv_pending(s->sock);
+ nni_msgq_aio_get(s->urq, aio);
+}
+
static nni_proto_pipe_ops pull_pipe_ops = {
.pipe_init = pull_pipe_init,
.pipe_fini = pull_pipe_fini,
@@ -207,6 +224,8 @@ static nni_proto_sock_ops pull_sock_ops = {
.sock_fini = pull_sock_fini,
.sock_open = pull_sock_open,
.sock_close = pull_sock_close,
+ .sock_send = pull_sock_send,
+ .sock_recv = pull_sock_recv,
.sock_options = pull_sock_options,
};
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index 77db7fcb..af7b80ca 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -54,7 +54,6 @@ push_sock_init(void **sp, nni_sock *sock)
s->sock = sock;
s->uwq = nni_sock_sendq(sock);
*sp = s;
- nni_sock_recverr(sock, NNG_ENOTSUP);
return (0);
}
@@ -205,6 +204,21 @@ push_sock_getopt_raw(void *arg, void *buf, size_t *szp)
return (nni_getopt_int(s->raw, buf, szp));
}
+static void
+push_sock_send(void *arg, nni_aio *aio)
+{
+ push_sock *s = arg;
+
+ nni_sock_send_pending(s->sock);
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
+static void
+push_sock_recv(void *arg, nni_aio *aio)
+{
+ nni_aio_finish_error(aio, NNG_ENOTSUP);
+}
+
static nni_proto_pipe_ops push_pipe_ops = {
.pipe_init = push_pipe_init,
.pipe_fini = push_pipe_fini,
@@ -228,6 +242,8 @@ static nni_proto_sock_ops push_sock_ops = {
.sock_open = push_sock_open,
.sock_close = push_sock_close,
.sock_options = push_sock_options,
+ .sock_send = push_sock_send,
+ .sock_recv = push_sock_recv,
};
static nni_proto push_proto = {