From 9cbdeda1d0a9074bd65da2aaf9c87b79545a1590 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 25 Oct 2017 15:00:52 -0700 Subject: fixes #45 expose aio to applications While here we added a test for the aio stuff, and cleaned up some dead code for the old fd notifications. There were a few improvements to shorten & clean code elsewhere, such as short-circuiting task wait when the task has no callback. The legacy sendmsg() and recvmsg() APIs are still in the socket core until we convert the device code to use the aios. --- src/nng.c | 168 ++++++++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 143 insertions(+), 25 deletions(-) (limited to 'src/nng.c') diff --git a/src/nng.c b/src/nng.c index 9fc99dc3..39206941 100644 --- a/src/nng.c +++ b/src/nng.c @@ -95,6 +95,18 @@ nng_peer(nng_socket sid) return (pnum); } +void * +nng_alloc(size_t sz) +{ + return (nni_alloc(sz)); +} + +void +nng_free(void *buf, size_t sz) +{ + nni_free(buf, sz); +} + int nng_recv(nng_socket sid, void *buf, size_t *szp, int flags) { @@ -134,20 +146,28 @@ nng_recv(nng_socket sid, void *buf, size_t *szp, int flags) int nng_recvmsg(nng_socket sid, nng_msg **msgp, int flags) { - int rv; - nni_sock *sock; + int rv; + nng_aio *ap; - if ((rv = nni_sock_find(&sock, sid)) != 0) { + if ((rv = nng_aio_alloc(&ap, NULL, NULL)) != 0) { return (rv); } - rv = nni_sock_recvmsg(sock, msgp, flags); - nni_sock_rele(sock); + if (flags & NNG_FLAG_NONBLOCK) { + nng_aio_set_timeout(ap, NNG_DURATION_ZERO); + } else { + nng_aio_set_timeout(ap, NNG_DURATION_DEFAULT); + } - // Possibly massage nonblocking attempt. Note that nonblocking is - // still done asynchronously, and the calling thread loses context. - if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) { + nng_recv_aio(sid, ap); + nng_aio_wait(ap); + + if ((rv = nng_aio_result(ap)) == 0) { + *msgp = nng_aio_get_msg(ap); + + } else if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) { rv = NNG_EAGAIN; } + nng_aio_free(ap); return (rv); } @@ -171,29 +191,27 @@ nng_send(nng_socket sid, void *buf, size_t len, int flags) return (rv); } -void * -nng_alloc(size_t sz) -{ - return (nni_alloc(sz)); -} - -void -nng_free(void *buf, size_t sz) -{ - nni_free(buf, sz); -} - int nng_sendmsg(nng_socket sid, nng_msg *msg, int flags) { - int rv; - nni_sock *sock; + int rv; + nng_aio *ap; - if ((rv = nni_sock_find(&sock, sid)) != 0) { + if ((rv = nng_aio_alloc(&ap, NULL, NULL)) != 0) { return (rv); } - rv = nni_sock_sendmsg(sock, msg, flags); - nni_sock_rele(sock); + if (flags & NNG_FLAG_NONBLOCK) { + nng_aio_set_timeout(ap, NNG_DURATION_ZERO); + } else { + nng_aio_set_timeout(ap, NNG_DURATION_DEFAULT); + } + + nng_aio_set_msg(ap, msg); + nng_send_aio(sid, ap); + nng_aio_wait(ap); + + rv = nng_aio_result(ap); + nng_aio_free(ap); // Possibly massage nonblocking attempt. Note that nonblocking is // still done asynchronously, and the calling thread loses context. @@ -204,6 +222,36 @@ nng_sendmsg(nng_socket sid, nng_msg *msg, int flags) return (rv); } +void +nng_recv_aio(nng_socket sid, nng_aio *ap) +{ + nni_aio * aio = (nni_aio *) ap; + nni_sock *sock; + int rv; + + if ((rv = nni_sock_find(&sock, sid)) != 0) { + nni_aio_finish_error(aio, rv); + return; + } + nni_sock_recv(sock, aio); + nni_sock_rele(sock); +} + +void +nng_send_aio(nng_socket sid, nng_aio *ap) +{ + nni_aio * aio = (nni_aio *) ap; + nni_sock *sock; + int rv; + + if ((rv = nni_sock_find(&sock, sid)) != 0) { + nni_aio_finish_error(aio, rv); + return; + } + nni_sock_send(sock, aio); + nni_sock_rele(sock); +} + int nng_dial(nng_socket sid, const char *addr, nng_dialer *dp, int flags) { @@ -947,6 +995,76 @@ nng_msg_getopt(nng_msg *msg, int opt, void *ptr, size_t *szp) return (nni_msg_getopt(msg, opt, ptr, szp)); } +int +nng_aio_alloc(nng_aio **app, void (*cb)(void *), void *arg) +{ + nni_aio *aio; + int rv; + + if ((rv = nni_init()) != 0) { + return (rv); + } + if ((rv = nni_aio_init(&aio, (nni_cb) cb, arg)) == 0) { + *app = (nng_aio *) aio; + } + aio->a_expire = (nni_time) NNG_DURATION_DEFAULT; + aio->a_reltime = 1; + return (rv); +} + +void +nng_aio_free(nng_aio *ap) +{ + nni_aio_fini((nni_aio *) ap); +} + +int +nng_aio_result(nng_aio *ap) +{ + return (nni_aio_result((nni_aio *) ap)); +} + +void +nng_aio_stop(nng_aio *ap) +{ + nni_aio_stop((nni_aio *) ap); +} + +void +nng_aio_wait(nng_aio *ap) +{ + nni_aio_wait((nni_aio *) ap); +} + +void +nng_aio_cancel(nng_aio *ap) +{ + nni_aio_cancel((nni_aio *) ap, NNG_ECANCELED); +} + +void +nng_aio_set_msg(nng_aio *ap, nng_msg *msg) +{ + nni_aio_set_msg((nni_aio *) ap, msg); +} + +nng_msg * +nng_aio_get_msg(nng_aio *ap) +{ + return ((nng_msg *) (nni_aio_get_msg((nni_aio *) ap))); +} + +void +nng_aio_set_timeout(nng_aio *ap, nng_duration dur) +{ + // Durations here are relative, since we have no notion of a + // common clock. But underlying aio uses absolute times normally. + // Fortunately the absolute times are big enough; we just have to + // make sure that we "convert" the timeout from relative time to + // absolute time when submitting operations. + nni_aio_set_timeout((nni_aio *) ap, (nni_time) dur); +} + #if 0 int nng_snapshot_create(nng_socket sock, nng_snapshot **snapp) -- cgit v1.2.3-70-g09d2