diff options
Diffstat (limited to 'src/protocol/bus/bus.c')
| -rw-r--r-- | src/protocol/bus/bus.c | 28 |
1 files changed, 27 insertions, 1 deletions
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index cad21989..046aa19a 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -22,6 +22,9 @@ typedef struct bus_pipe bus_pipe; typedef struct bus_sock bus_sock; static void bus_sock_getq(bus_sock *); +static void bus_sock_send(void *, nni_aio *); +static void bus_sock_recv(void *, nni_aio *); + static void bus_pipe_getq(bus_pipe *); static void bus_pipe_send(bus_pipe *); static void bus_pipe_recv(bus_pipe *); @@ -39,6 +42,8 @@ struct bus_sock { nni_aio * aio_getq; nni_list pipes; nni_mtx mtx; + nni_msgq *uwq; + nni_msgq *urq; }; // A bus_pipe is our per-pipe protocol private structure. @@ -82,6 +87,8 @@ bus_sock_init(void **sp, nni_sock *nsock) } s->nsock = nsock; s->raw = 0; + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); *sp = s; return (0); @@ -259,7 +266,6 @@ bus_sock_getq_cb(void *arg) bus_sock *s = arg; bus_pipe *p; bus_pipe *lastp; - nni_msgq *uwq = nni_sock_sendq(s->nsock); nni_msg * msg; nni_msg * dup; uint32_t sender; @@ -338,6 +344,24 @@ bus_sock_getopt_raw(void *arg, void *buf, size_t *szp) return (nni_getopt_int(s->raw, buf, szp)); } +static void +bus_sock_send(void *arg, nni_aio *aio) +{ + bus_sock *s = arg; + + nni_sock_send_pending(s->nsock); + nni_msgq_aio_put(s->uwq, aio); +} + +static void +bus_sock_recv(void *arg, nni_aio *aio) +{ + bus_sock *s = arg; + + nni_sock_recv_pending(s->nsock); + nni_msgq_aio_get(s->urq, aio); +} + static nni_proto_pipe_ops bus_pipe_ops = { .pipe_init = bus_pipe_init, .pipe_fini = bus_pipe_fini, @@ -360,6 +384,8 @@ static nni_proto_sock_ops bus_sock_ops = { .sock_fini = bus_sock_fini, .sock_open = bus_sock_open, .sock_close = bus_sock_close, + .sock_send = bus_sock_send, + .sock_recv = bus_sock_recv, .sock_options = bus_sock_options, }; |
