diff options
Diffstat (limited to 'src/protocol')
| -rw-r--r-- | src/protocol/pipeline/pull.c | 107 | ||||
| -rw-r--r-- | src/protocol/pipeline/push.c | 197 |
2 files changed, 209 insertions, 95 deletions
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index 6f2d716b..ec66fab6 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -17,6 +17,11 @@ typedef struct nni_pull_pipe nni_pull_pipe; typedef struct nni_pull_sock nni_pull_sock; +static void nni_pull_putq_cb(void *); +static void nni_pull_recv_cb(void *); +static void nni_pull_recv(nni_pull_pipe *); +static void nni_pull_putq(nni_pull_pipe *, nni_msg *); + // An nni_pull_sock is our per-socket protocol private structure. struct nni_pull_sock { nni_msgq * urq; @@ -27,6 +32,8 @@ struct nni_pull_sock { struct nni_pull_pipe { nni_pipe * pipe; nni_pull_sock * pull; + nni_aio putq_aio; + nni_aio recv_aio; }; static int @@ -60,10 +67,20 @@ static int nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock) { nni_pull_pipe *pp; + int rv; if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) { return (NNG_ENOMEM); } + if (((rv = nni_aio_init(&pp->putq_aio, nni_pull_putq_cb, pp))) != 0) { + NNI_FREE_STRUCT(pp); + return (rv); + } + if (((rv = nni_aio_init(&pp->recv_aio, nni_pull_recv_cb, pp))) != 0) { + nni_aio_fini(&pp->putq_aio); + NNI_FREE_STRUCT(pp); + return (rv); + } pp->pipe = pipe; pp->pull = psock; *ppp = pp; @@ -77,28 +94,93 @@ nni_pull_pipe_fini(void *arg) nni_pull_pipe *pp = arg; if (pp != NULL) { + nni_aio_fini(&pp->putq_aio); + nni_aio_fini(&pp->recv_aio); NNI_FREE_STRUCT(pp); } } +static int +nni_pull_pipe_start(void *arg) +{ + nni_pull_pipe *pp = arg; + + // Start the pending pull... + nni_pull_recv(pp); + + return (0); +} + + static void -nni_pull_pipe_recv(void *arg) +nni_pull_pipe_stop(void *arg) { nni_pull_pipe *pp = arg; - nni_pull_sock *pull = pp->pull; + + // Cancel any pending sendup. + nni_msgq_aio_cancel(pp->pull->urq, &pp->putq_aio); +} + + +static void +nni_pull_recv_cb(void *arg) +{ + nni_pull_pipe *pp = arg; + nni_aio *aio = &pp->recv_aio; nni_msg *msg; - for (;;) { - if (nni_pipe_recv(pp->pipe, &msg) != 0) { - break; - } - if (nni_msgq_put(pull->urq, msg) != 0) { - nni_msg_free(msg); - break; - } + if (nni_aio_result(aio) != 0) { + // Failed to get a message, probably the pipe is closed. + nni_pipe_close(pp->pipe); + return; + } + + // Got a message... start the put to send it up to the application. + msg = aio->a_msg; + aio->a_msg = NULL; + nni_pull_putq(pp, msg); +} + + +static void +nni_pull_putq_cb(void *arg) +{ + nni_pull_pipe *pp = arg; + nni_aio *aio = &pp->putq_aio; + int rv; + + if (nni_aio_result(aio) != 0) { + // If we failed to put, probably NNG_ECLOSED, nothing else + // we can do. Just close the pipe. + nni_pipe_close(pp->pipe); + return; + } + + nni_pull_recv(pp); +} + + +// nni_pull_recv is called to schedule a pending recv on the incoming pipe. +static void +nni_pull_recv(nni_pull_pipe *pp) +{ + // Schedule the aio with callback. + if (nni_pipe_aio_recv(pp->pipe, &pp->recv_aio) != 0) { + nni_pipe_close(pp->pipe); } - nni_pipe_close(pp->pipe); +} + + +// nni_pull_putq schedules a put operation to the user socket (sendup). +static void +nni_pull_putq(nni_pull_pipe *pp, nni_msg *msg) +{ + nni_pull_sock *pull = pp->pull; + + pp->putq_aio.a_msg = msg; + + nni_msgq_aio_put(pull->urq, &pp->putq_aio); } @@ -141,7 +223,8 @@ nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp) static nni_proto_pipe_ops nni_pull_pipe_ops = { .pipe_init = nni_pull_pipe_init, .pipe_fini = nni_pull_pipe_fini, - .pipe_worker = { nni_pull_pipe_recv }, + .pipe_add = nni_pull_pipe_start, + .pipe_rem = nni_pull_pipe_stop, }; static nni_proto_sock_ops nni_pull_sock_ops = { diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index 3c3164d5..fcbb6d4f 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -19,27 +19,34 @@ typedef struct nni_push_pipe nni_push_pipe; typedef struct nni_push_sock nni_push_sock; +static void nni_push_send_cb(void *); +static void nni_push_recv_cb(void *); +static void nni_push_getq_cb(void *); +static void nni_push_recv(nni_push_pipe *); +static void nni_push_send(nni_push_sock *); + // An nni_push_sock is our per-socket protocol private structure. struct nni_push_sock { - nni_cv cv; nni_msgq * uwq; + nni_msg * msg; // pending message int raw; - int closing; - int wantw; nni_list pipes; nni_push_pipe * nextpipe; int npipes; nni_sock * sock; + + nni_aio aio_getq; }; // An nni_push_pipe is our per-pipe protocol private structure. struct nni_push_pipe { nni_pipe * pipe; nni_push_sock * push; - nni_msgq * mq; - int sigclose; int wantr; nni_list_node node; + + nni_aio aio_recv; + nni_aio aio_send; }; static int @@ -51,14 +58,13 @@ nni_push_sock_init(void **pushp, nni_sock *sock) if ((push = NNI_ALLOC_STRUCT(push)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_cv_init(&push->cv, nni_sock_mtx(sock))) != 0) { + if ((rv = nni_aio_init(&push->aio_getq, nni_push_getq_cb, push)) != 0) { NNI_FREE_STRUCT(push); return (rv); } NNI_LIST_INIT(&push->pipes, nni_push_pipe, node); push->raw = 0; push->npipes = 0; - push->wantw = 0; push->nextpipe = NULL; push->sock = sock; push->uwq = nni_sock_sendq(sock); @@ -69,14 +75,20 @@ nni_push_sock_init(void **pushp, nni_sock *sock) static void +nni_push_sock_open(void *arg) +{ + nni_push_sock *push = arg; + + nni_msgq_aio_get(push->uwq, &push->aio_getq); +} + + +static void nni_push_sock_close(void *arg) { nni_push_sock *push = arg; - // Shut down the resender. We request it to exit by clearing - // its old value, then kick it. - push->closing = 1; - nni_cv_wake(&push->cv); + nni_msgq_aio_cancel(push->uwq, &push->aio_getq); } @@ -86,7 +98,9 @@ nni_push_sock_fini(void *arg) nni_push_sock *push = arg; if (push != NULL) { - nni_cv_fini(&push->cv); + if (push->msg != NULL) { + nni_msg_free(push->msg); + } NNI_FREE_STRUCT(push); } } @@ -101,13 +115,17 @@ nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock) if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_msgq_init(&pp->mq, 0)) != 0) { + if ((rv = nni_aio_init(&pp->aio_recv, nni_push_recv_cb, pp)) != 0) { + NNI_FREE_STRUCT(pp); + return (rv); + } + if ((rv = nni_aio_init(&pp->aio_send, nni_push_send_cb, pp)) != 0) { + nni_aio_fini(&pp->aio_recv); NNI_FREE_STRUCT(pp); return (rv); } NNI_LIST_NODE_INIT(&pp->node); pp->pipe = pipe; - pp->sigclose = 0; pp->push = psock; pp->wantr = 0; *ppp = pp; @@ -121,7 +139,8 @@ nni_push_pipe_fini(void *arg) nni_push_pipe *pp = arg; if (pp != NULL) { - nni_msgq_fini(pp->mq); + nni_aio_fini(&pp->aio_recv); + nni_aio_fini(&pp->aio_send); NNI_FREE_STRUCT(pp); } } @@ -140,9 +159,19 @@ nni_push_pipe_add(void *arg) // The end makes our test cases easier. nni_list_append(&push->pipes, pp); + // We start out wanting data to read. + pp->wantr = 1; + // Wake the top sender, as we can accept a job. push->npipes++; - nni_cv_wake(&push->cv); + + // Schedule a receiver. This is mostly so that we can detect + // a closed transport pipe. + nni_pipe_aio_recv(pp->pipe, &pp->aio_recv); + + // Possibly schedule the sender. + nni_push_send(pp->push); + return (0); } @@ -162,46 +191,66 @@ nni_push_pipe_rem(void *arg) static void -nni_push_pipe_send(void *arg) +nni_push_recv(nni_push_pipe *pp) +{ + nni_pipe_aio_recv(pp->pipe, &pp->aio_recv); +} + + +static void +nni_push_recv_cb(void *arg) +{ + nni_push_pipe *pp = arg; + + // We normally expect to receive an error. If a pipe actually + // sends us data, we just discard it. + if (nni_aio_result(&pp->aio_recv) != 0) { + nni_pipe_close(pp->pipe); + return; + } + nni_push_recv(pp); +} + + +static void +nni_push_send_cb(void *arg) { nni_push_pipe *pp = arg; nni_push_sock *push = pp->push; nni_mtx *mx = nni_sock_mtx(push->sock); - nni_msg *msg; - for (;;) { - nni_mtx_lock(mx); - pp->wantr = 1; - if (push->wantw) { - nni_cv_wake(&push->cv); - } - nni_mtx_unlock(mx); - if (nni_msgq_get_sig(pp->mq, &msg, &pp->sigclose) != 0) { - break; - } - if (nni_pipe_send(pp->pipe, msg) != 0) { - nni_msg_free(msg); - break; - } + if (nni_aio_result(&pp->aio_send) != 0) { + nni_pipe_close(pp->pipe); + return; } - nni_pipe_close(pp->pipe); + + nni_mtx_lock(mx); + pp->wantr = 1; + + // This effectively kicks off a pull down. + nni_push_send(pp->push); + nni_mtx_unlock(mx); } static void -nni_push_pipe_recv(void *arg) +nni_push_getq_cb(void *arg) { - nni_push_pipe *pp = arg; - nni_msg *msg; + nni_push_sock *push = arg; + nni_mtx *mx = nni_sock_mtx(push->sock); + nni_aio *aio = &push->aio_getq; - for (;;) { - if (nni_pipe_recv(pp->pipe, &msg) != 0) { - break; - } - nni_msg_free(msg); + if (nni_aio_result(aio) != 0) { + // If the socket is closing, nothing else we can do. + return; } - nni_msgq_signal(pp->mq, &pp->sigclose); - nni_pipe_close(pp->pipe); + + nni_mtx_lock(mx); + push->msg = aio->a_msg; + aio->a_msg = NULL; + + nni_push_send(push); + nni_mtx_unlock(mx); } @@ -240,51 +289,35 @@ nni_push_sock_getopt(void *arg, int opt, void *buf, size_t *szp) static void -nni_push_sock_send(void *arg) +nni_push_send(nni_push_sock *push) { - nni_push_sock *push = arg; nni_push_pipe *pp; - nni_msgq *uwq = push->uwq; nni_msg *msg = NULL; - nni_mtx *mx = nni_sock_mtx(push->sock); int i; - for (;;) { - if ((msg == NULL) && (nni_msgq_get(uwq, &msg) != 0)) { - // Should only be NNG_ECLOSED - return; - } + if ((msg = push->msg) == NULL) { + // Nothing to send... bail... + return; + } - nni_mtx_lock(mx); - if (push->closing) { - if (msg != NULL) { - nni_mtx_unlock(mx); - nni_msg_free(msg); - return; - } - } - push->wantw = 0; - for (i = 0; i < push->npipes; i++) { - pp = push->nextpipe; - if (pp == NULL) { - pp = nni_list_first(&push->pipes); - } - push->nextpipe = nni_list_next(&push->pipes, pp); - if (pp->wantr) { - pp->wantr = 0; - if (nni_msgq_put(pp->mq, msg) == 0) { - msg = NULL; - break; - } - } + // Let's try to send it. + for (i = 0; i < push->npipes; i++) { + pp = push->nextpipe; + if (pp == NULL) { + pp = nni_list_first(&push->pipes); } - if (msg != NULL) { - // We weren't able to deliver it, so keep it and - // wait for a sender to let us know its ready. - push->wantw = 1; - nni_cv_wait(&push->cv); + push->nextpipe = nni_list_next(&push->pipes, pp); + if (pp->wantr) { + pp->aio_send.a_msg = msg; + push->msg = NULL; + + // Schedule outbound pipe delivery... + nni_pipe_aio_send(pp->pipe, &pp->aio_send); + + // And schedule getting another message for send. + nni_msgq_aio_get(push->uwq, &push->aio_getq); + break; } - nni_mtx_unlock(mx); } } @@ -296,17 +329,15 @@ static nni_proto_pipe_ops nni_push_pipe_ops = { .pipe_fini = nni_push_pipe_fini, .pipe_add = nni_push_pipe_add, .pipe_rem = nni_push_pipe_rem, - .pipe_worker = { nni_push_pipe_send, - nni_push_pipe_recv }, }; static nni_proto_sock_ops nni_push_sock_ops = { .sock_init = nni_push_sock_init, .sock_fini = nni_push_sock_fini, + .sock_open = nni_push_sock_open, .sock_close = nni_push_sock_close, .sock_setopt = nni_push_sock_setopt, .sock_getopt = nni_push_sock_getopt, - .sock_worker = { nni_push_sock_send }, }; nni_proto nni_push_proto = { |
