diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-10-25 15:00:52 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-10-25 18:29:47 -0700 |
| commit | 9cbdeda1d0a9074bd65da2aaf9c87b79545a1590 (patch) | |
| tree | 98254532f75a58cde92c837b4829bd2b3982db7a /src/nng.c | |
| parent | b28838f5cf3c5fed494d2684422099d26e8ab293 (diff) | |
| download | nng-9cbdeda1d0a9074bd65da2aaf9c87b79545a1590.tar.gz nng-9cbdeda1d0a9074bd65da2aaf9c87b79545a1590.tar.bz2 nng-9cbdeda1d0a9074bd65da2aaf9c87b79545a1590.zip | |
fixes #45 expose aio to applications
While here we added a test for the aio stuff, and cleaned up some dead
code for the old fd notifications. There were a few improvements to
shorten & clean code elsewhere, such as short-circuiting task wait
when the task has no callback.
The legacy sendmsg() and recvmsg() APIs are still in the socket core
until we convert the device code to use the aios.
Diffstat (limited to 'src/nng.c')
| -rw-r--r-- | src/nng.c | 168 |
1 files changed, 143 insertions, 25 deletions
@@ -95,6 +95,18 @@ nng_peer(nng_socket sid) return (pnum); } +void * +nng_alloc(size_t sz) +{ + return (nni_alloc(sz)); +} + +void +nng_free(void *buf, size_t sz) +{ + nni_free(buf, sz); +} + int nng_recv(nng_socket sid, void *buf, size_t *szp, int flags) { @@ -134,20 +146,28 @@ nng_recv(nng_socket sid, void *buf, size_t *szp, int flags) int nng_recvmsg(nng_socket sid, nng_msg **msgp, int flags) { - int rv; - nni_sock *sock; + int rv; + nng_aio *ap; - if ((rv = nni_sock_find(&sock, sid)) != 0) { + if ((rv = nng_aio_alloc(&ap, NULL, NULL)) != 0) { return (rv); } - rv = nni_sock_recvmsg(sock, msgp, flags); - nni_sock_rele(sock); + if (flags & NNG_FLAG_NONBLOCK) { + nng_aio_set_timeout(ap, NNG_DURATION_ZERO); + } else { + nng_aio_set_timeout(ap, NNG_DURATION_DEFAULT); + } - // Possibly massage nonblocking attempt. Note that nonblocking is - // still done asynchronously, and the calling thread loses context. - if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) { + nng_recv_aio(sid, ap); + nng_aio_wait(ap); + + if ((rv = nng_aio_result(ap)) == 0) { + *msgp = nng_aio_get_msg(ap); + + } else if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) { rv = NNG_EAGAIN; } + nng_aio_free(ap); return (rv); } @@ -171,29 +191,27 @@ nng_send(nng_socket sid, void *buf, size_t len, int flags) return (rv); } -void * -nng_alloc(size_t sz) -{ - return (nni_alloc(sz)); -} - -void -nng_free(void *buf, size_t sz) -{ - nni_free(buf, sz); -} - int nng_sendmsg(nng_socket sid, nng_msg *msg, int flags) { - int rv; - nni_sock *sock; + int rv; + nng_aio *ap; - if ((rv = nni_sock_find(&sock, sid)) != 0) { + if ((rv = nng_aio_alloc(&ap, NULL, NULL)) != 0) { return (rv); } - rv = nni_sock_sendmsg(sock, msg, flags); - nni_sock_rele(sock); + if (flags & NNG_FLAG_NONBLOCK) { + nng_aio_set_timeout(ap, NNG_DURATION_ZERO); + } else { + nng_aio_set_timeout(ap, NNG_DURATION_DEFAULT); + } + + nng_aio_set_msg(ap, msg); + nng_send_aio(sid, ap); + nng_aio_wait(ap); + + rv = nng_aio_result(ap); + nng_aio_free(ap); // Possibly massage nonblocking attempt. Note that nonblocking is // still done asynchronously, and the calling thread loses context. @@ -204,6 +222,36 @@ nng_sendmsg(nng_socket sid, nng_msg *msg, int flags) return (rv); } +void +nng_recv_aio(nng_socket sid, nng_aio *ap) +{ + nni_aio * aio = (nni_aio *) ap; + nni_sock *sock; + int rv; + + if ((rv = nni_sock_find(&sock, sid)) != 0) { + nni_aio_finish_error(aio, rv); + return; + } + nni_sock_recv(sock, aio); + nni_sock_rele(sock); +} + +void +nng_send_aio(nng_socket sid, nng_aio *ap) +{ + nni_aio * aio = (nni_aio *) ap; + nni_sock *sock; + int rv; + + if ((rv = nni_sock_find(&sock, sid)) != 0) { + nni_aio_finish_error(aio, rv); + return; + } + nni_sock_send(sock, aio); + nni_sock_rele(sock); +} + int nng_dial(nng_socket sid, const char *addr, nng_dialer *dp, int flags) { @@ -947,6 +995,76 @@ nng_msg_getopt(nng_msg *msg, int opt, void *ptr, size_t *szp) return (nni_msg_getopt(msg, opt, ptr, szp)); } +int +nng_aio_alloc(nng_aio **app, void (*cb)(void *), void *arg) +{ + nni_aio *aio; + int rv; + + if ((rv = nni_init()) != 0) { + return (rv); + } + if ((rv = nni_aio_init(&aio, (nni_cb) cb, arg)) == 0) { + *app = (nng_aio *) aio; + } + aio->a_expire = (nni_time) NNG_DURATION_DEFAULT; + aio->a_reltime = 1; + return (rv); +} + +void +nng_aio_free(nng_aio *ap) +{ + nni_aio_fini((nni_aio *) ap); +} + +int +nng_aio_result(nng_aio *ap) +{ + return (nni_aio_result((nni_aio *) ap)); +} + +void +nng_aio_stop(nng_aio *ap) +{ + nni_aio_stop((nni_aio *) ap); +} + +void +nng_aio_wait(nng_aio *ap) +{ + nni_aio_wait((nni_aio *) ap); +} + +void +nng_aio_cancel(nng_aio *ap) +{ + nni_aio_cancel((nni_aio *) ap, NNG_ECANCELED); +} + +void +nng_aio_set_msg(nng_aio *ap, nng_msg *msg) +{ + nni_aio_set_msg((nni_aio *) ap, msg); +} + +nng_msg * +nng_aio_get_msg(nng_aio *ap) +{ + return ((nng_msg *) (nni_aio_get_msg((nni_aio *) ap))); +} + +void +nng_aio_set_timeout(nng_aio *ap, nng_duration dur) +{ + // Durations here are relative, since we have no notion of a + // common clock. But underlying aio uses absolute times normally. + // Fortunately the absolute times are big enough; we just have to + // make sure that we "convert" the timeout from relative time to + // absolute time when submitting operations. + nni_aio_set_timeout((nni_aio *) ap, (nni_time) dur); +} + #if 0 int nng_snapshot_create(nng_socket sock, nng_snapshot **snapp) |
