diff options
Diffstat (limited to 'src/nng.c')
| -rw-r--r-- | src/nng.c | 168 |
1 files changed, 143 insertions, 25 deletions
@@ -95,6 +95,18 @@ nng_peer(nng_socket sid) return (pnum); } +void * +nng_alloc(size_t sz) +{ + return (nni_alloc(sz)); +} + +void +nng_free(void *buf, size_t sz) +{ + nni_free(buf, sz); +} + int nng_recv(nng_socket sid, void *buf, size_t *szp, int flags) { @@ -134,20 +146,28 @@ nng_recv(nng_socket sid, void *buf, size_t *szp, int flags) int nng_recvmsg(nng_socket sid, nng_msg **msgp, int flags) { - int rv; - nni_sock *sock; + int rv; + nng_aio *ap; - if ((rv = nni_sock_find(&sock, sid)) != 0) { + if ((rv = nng_aio_alloc(&ap, NULL, NULL)) != 0) { return (rv); } - rv = nni_sock_recvmsg(sock, msgp, flags); - nni_sock_rele(sock); + if (flags & NNG_FLAG_NONBLOCK) { + nng_aio_set_timeout(ap, NNG_DURATION_ZERO); + } else { + nng_aio_set_timeout(ap, NNG_DURATION_DEFAULT); + } - // Possibly massage nonblocking attempt. Note that nonblocking is - // still done asynchronously, and the calling thread loses context. - if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) { + nng_recv_aio(sid, ap); + nng_aio_wait(ap); + + if ((rv = nng_aio_result(ap)) == 0) { + *msgp = nng_aio_get_msg(ap); + + } else if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) { rv = NNG_EAGAIN; } + nng_aio_free(ap); return (rv); } @@ -171,29 +191,27 @@ nng_send(nng_socket sid, void *buf, size_t len, int flags) return (rv); } -void * -nng_alloc(size_t sz) -{ - return (nni_alloc(sz)); -} - -void -nng_free(void *buf, size_t sz) -{ - nni_free(buf, sz); -} - int nng_sendmsg(nng_socket sid, nng_msg *msg, int flags) { - int rv; - nni_sock *sock; + int rv; + nng_aio *ap; - if ((rv = nni_sock_find(&sock, sid)) != 0) { + if ((rv = nng_aio_alloc(&ap, NULL, NULL)) != 0) { return (rv); } - rv = nni_sock_sendmsg(sock, msg, flags); - nni_sock_rele(sock); + if (flags & NNG_FLAG_NONBLOCK) { + nng_aio_set_timeout(ap, NNG_DURATION_ZERO); + } else { + nng_aio_set_timeout(ap, NNG_DURATION_DEFAULT); + } + + nng_aio_set_msg(ap, msg); + nng_send_aio(sid, ap); + nng_aio_wait(ap); + + rv = nng_aio_result(ap); + nng_aio_free(ap); // Possibly massage nonblocking attempt. Note that nonblocking is // still done asynchronously, and the calling thread loses context. @@ -204,6 +222,36 @@ nng_sendmsg(nng_socket sid, nng_msg *msg, int flags) return (rv); } +void +nng_recv_aio(nng_socket sid, nng_aio *ap) +{ + nni_aio * aio = (nni_aio *) ap; + nni_sock *sock; + int rv; + + if ((rv = nni_sock_find(&sock, sid)) != 0) { + nni_aio_finish_error(aio, rv); + return; + } + nni_sock_recv(sock, aio); + nni_sock_rele(sock); +} + +void +nng_send_aio(nng_socket sid, nng_aio *ap) +{ + nni_aio * aio = (nni_aio *) ap; + nni_sock *sock; + int rv; + + if ((rv = nni_sock_find(&sock, sid)) != 0) { + nni_aio_finish_error(aio, rv); + return; + } + nni_sock_send(sock, aio); + nni_sock_rele(sock); +} + int nng_dial(nng_socket sid, const char *addr, nng_dialer *dp, int flags) { @@ -947,6 +995,76 @@ nng_msg_getopt(nng_msg *msg, int opt, void *ptr, size_t *szp) return (nni_msg_getopt(msg, opt, ptr, szp)); } +int +nng_aio_alloc(nng_aio **app, void (*cb)(void *), void *arg) +{ + nni_aio *aio; + int rv; + + if ((rv = nni_init()) != 0) { + return (rv); + } + if ((rv = nni_aio_init(&aio, (nni_cb) cb, arg)) == 0) { + *app = (nng_aio *) aio; + } + aio->a_expire = (nni_time) NNG_DURATION_DEFAULT; + aio->a_reltime = 1; + return (rv); +} + +void +nng_aio_free(nng_aio *ap) +{ + nni_aio_fini((nni_aio *) ap); +} + +int +nng_aio_result(nng_aio *ap) +{ + return (nni_aio_result((nni_aio *) ap)); +} + +void +nng_aio_stop(nng_aio *ap) +{ + nni_aio_stop((nni_aio *) ap); +} + +void +nng_aio_wait(nng_aio *ap) +{ + nni_aio_wait((nni_aio *) ap); +} + +void +nng_aio_cancel(nng_aio *ap) +{ + nni_aio_cancel((nni_aio *) ap, NNG_ECANCELED); +} + +void +nng_aio_set_msg(nng_aio *ap, nng_msg *msg) +{ + nni_aio_set_msg((nni_aio *) ap, msg); +} + +nng_msg * +nng_aio_get_msg(nng_aio *ap) +{ + return ((nng_msg *) (nni_aio_get_msg((nni_aio *) ap))); +} + +void +nng_aio_set_timeout(nng_aio *ap, nng_duration dur) +{ + // Durations here are relative, since we have no notion of a + // common clock. But underlying aio uses absolute times normally. + // Fortunately the absolute times are big enough; we just have to + // make sure that we "convert" the timeout from relative time to + // absolute time when submitting operations. + nni_aio_set_timeout((nni_aio *) ap, (nni_time) dur); +} + #if 0 int nng_snapshot_create(nng_socket sock, nng_snapshot **snapp) |
