aboutsummaryrefslogtreecommitdiff
path: root/src/nng.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nng.c')
-rw-r--r--src/nng.c168
1 files changed, 143 insertions, 25 deletions
diff --git a/src/nng.c b/src/nng.c
index 9fc99dc3..39206941 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -95,6 +95,18 @@ nng_peer(nng_socket sid)
return (pnum);
}
+void *
+nng_alloc(size_t sz)
+{
+ return (nni_alloc(sz));
+}
+
+void
+nng_free(void *buf, size_t sz)
+{
+ nni_free(buf, sz);
+}
+
int
nng_recv(nng_socket sid, void *buf, size_t *szp, int flags)
{
@@ -134,20 +146,28 @@ nng_recv(nng_socket sid, void *buf, size_t *szp, int flags)
int
nng_recvmsg(nng_socket sid, nng_msg **msgp, int flags)
{
- int rv;
- nni_sock *sock;
+ int rv;
+ nng_aio *ap;
- if ((rv = nni_sock_find(&sock, sid)) != 0) {
+ if ((rv = nng_aio_alloc(&ap, NULL, NULL)) != 0) {
return (rv);
}
- rv = nni_sock_recvmsg(sock, msgp, flags);
- nni_sock_rele(sock);
+ if (flags & NNG_FLAG_NONBLOCK) {
+ nng_aio_set_timeout(ap, NNG_DURATION_ZERO);
+ } else {
+ nng_aio_set_timeout(ap, NNG_DURATION_DEFAULT);
+ }
- // Possibly massage nonblocking attempt. Note that nonblocking is
- // still done asynchronously, and the calling thread loses context.
- if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) {
+ nng_recv_aio(sid, ap);
+ nng_aio_wait(ap);
+
+ if ((rv = nng_aio_result(ap)) == 0) {
+ *msgp = nng_aio_get_msg(ap);
+
+ } else if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) {
rv = NNG_EAGAIN;
}
+ nng_aio_free(ap);
return (rv);
}
@@ -171,29 +191,27 @@ nng_send(nng_socket sid, void *buf, size_t len, int flags)
return (rv);
}
-void *
-nng_alloc(size_t sz)
-{
- return (nni_alloc(sz));
-}
-
-void
-nng_free(void *buf, size_t sz)
-{
- nni_free(buf, sz);
-}
-
int
nng_sendmsg(nng_socket sid, nng_msg *msg, int flags)
{
- int rv;
- nni_sock *sock;
+ int rv;
+ nng_aio *ap;
- if ((rv = nni_sock_find(&sock, sid)) != 0) {
+ if ((rv = nng_aio_alloc(&ap, NULL, NULL)) != 0) {
return (rv);
}
- rv = nni_sock_sendmsg(sock, msg, flags);
- nni_sock_rele(sock);
+ if (flags & NNG_FLAG_NONBLOCK) {
+ nng_aio_set_timeout(ap, NNG_DURATION_ZERO);
+ } else {
+ nng_aio_set_timeout(ap, NNG_DURATION_DEFAULT);
+ }
+
+ nng_aio_set_msg(ap, msg);
+ nng_send_aio(sid, ap);
+ nng_aio_wait(ap);
+
+ rv = nng_aio_result(ap);
+ nng_aio_free(ap);
// Possibly massage nonblocking attempt. Note that nonblocking is
// still done asynchronously, and the calling thread loses context.
@@ -204,6 +222,36 @@ nng_sendmsg(nng_socket sid, nng_msg *msg, int flags)
return (rv);
}
+void
+nng_recv_aio(nng_socket sid, nng_aio *ap)
+{
+ nni_aio * aio = (nni_aio *) ap;
+ nni_sock *sock;
+ int rv;
+
+ if ((rv = nni_sock_find(&sock, sid)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_sock_recv(sock, aio);
+ nni_sock_rele(sock);
+}
+
+void
+nng_send_aio(nng_socket sid, nng_aio *ap)
+{
+ nni_aio * aio = (nni_aio *) ap;
+ nni_sock *sock;
+ int rv;
+
+ if ((rv = nni_sock_find(&sock, sid)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_sock_send(sock, aio);
+ nni_sock_rele(sock);
+}
+
int
nng_dial(nng_socket sid, const char *addr, nng_dialer *dp, int flags)
{
@@ -947,6 +995,76 @@ nng_msg_getopt(nng_msg *msg, int opt, void *ptr, size_t *szp)
return (nni_msg_getopt(msg, opt, ptr, szp));
}
+int
+nng_aio_alloc(nng_aio **app, void (*cb)(void *), void *arg)
+{
+ nni_aio *aio;
+ int rv;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_aio_init(&aio, (nni_cb) cb, arg)) == 0) {
+ *app = (nng_aio *) aio;
+ }
+ aio->a_expire = (nni_time) NNG_DURATION_DEFAULT;
+ aio->a_reltime = 1;
+ return (rv);
+}
+
+void
+nng_aio_free(nng_aio *ap)
+{
+ nni_aio_fini((nni_aio *) ap);
+}
+
+int
+nng_aio_result(nng_aio *ap)
+{
+ return (nni_aio_result((nni_aio *) ap));
+}
+
+void
+nng_aio_stop(nng_aio *ap)
+{
+ nni_aio_stop((nni_aio *) ap);
+}
+
+void
+nng_aio_wait(nng_aio *ap)
+{
+ nni_aio_wait((nni_aio *) ap);
+}
+
+void
+nng_aio_cancel(nng_aio *ap)
+{
+ nni_aio_cancel((nni_aio *) ap, NNG_ECANCELED);
+}
+
+void
+nng_aio_set_msg(nng_aio *ap, nng_msg *msg)
+{
+ nni_aio_set_msg((nni_aio *) ap, msg);
+}
+
+nng_msg *
+nng_aio_get_msg(nng_aio *ap)
+{
+ return ((nng_msg *) (nni_aio_get_msg((nni_aio *) ap)));
+}
+
+void
+nng_aio_set_timeout(nng_aio *ap, nng_duration dur)
+{
+ // Durations here are relative, since we have no notion of a
+ // common clock. But underlying aio uses absolute times normally.
+ // Fortunately the absolute times are big enough; we just have to
+ // make sure that we "convert" the timeout from relative time to
+ // absolute time when submitting operations.
+ nni_aio_set_timeout((nni_aio *) ap, (nni_time) dur);
+}
+
#if 0
int
nng_snapshot_create(nng_socket sock, nng_snapshot **snapp)