aboutsummaryrefslogtreecommitdiff
path: root/src/nng.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-10-25 15:00:52 -0700
committerGarrett D'Amore <garrett@damore.org>2017-10-25 18:29:47 -0700
commit9cbdeda1d0a9074bd65da2aaf9c87b79545a1590 (patch)
tree98254532f75a58cde92c837b4829bd2b3982db7a /src/nng.c
parentb28838f5cf3c5fed494d2684422099d26e8ab293 (diff)
downloadnng-9cbdeda1d0a9074bd65da2aaf9c87b79545a1590.tar.gz
nng-9cbdeda1d0a9074bd65da2aaf9c87b79545a1590.tar.bz2
nng-9cbdeda1d0a9074bd65da2aaf9c87b79545a1590.zip
fixes #45 expose aio to applications
While here we added a test for the aio stuff, and cleaned up some dead code for the old fd notifications. There were a few improvements to shorten & clean code elsewhere, such as short-circuiting task wait when the task has no callback. The legacy sendmsg() and recvmsg() APIs are still in the socket core until we convert the device code to use the aios.
Diffstat (limited to 'src/nng.c')
-rw-r--r--src/nng.c168
1 files changed, 143 insertions, 25 deletions
diff --git a/src/nng.c b/src/nng.c
index 9fc99dc3..39206941 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -95,6 +95,18 @@ nng_peer(nng_socket sid)
return (pnum);
}
+void *
+nng_alloc(size_t sz)
+{
+ return (nni_alloc(sz));
+}
+
+void
+nng_free(void *buf, size_t sz)
+{
+ nni_free(buf, sz);
+}
+
int
nng_recv(nng_socket sid, void *buf, size_t *szp, int flags)
{
@@ -134,20 +146,28 @@ nng_recv(nng_socket sid, void *buf, size_t *szp, int flags)
int
nng_recvmsg(nng_socket sid, nng_msg **msgp, int flags)
{
- int rv;
- nni_sock *sock;
+ int rv;
+ nng_aio *ap;
- if ((rv = nni_sock_find(&sock, sid)) != 0) {
+ if ((rv = nng_aio_alloc(&ap, NULL, NULL)) != 0) {
return (rv);
}
- rv = nni_sock_recvmsg(sock, msgp, flags);
- nni_sock_rele(sock);
+ if (flags & NNG_FLAG_NONBLOCK) {
+ nng_aio_set_timeout(ap, NNG_DURATION_ZERO);
+ } else {
+ nng_aio_set_timeout(ap, NNG_DURATION_DEFAULT);
+ }
- // Possibly massage nonblocking attempt. Note that nonblocking is
- // still done asynchronously, and the calling thread loses context.
- if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) {
+ nng_recv_aio(sid, ap);
+ nng_aio_wait(ap);
+
+ if ((rv = nng_aio_result(ap)) == 0) {
+ *msgp = nng_aio_get_msg(ap);
+
+ } else if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) {
rv = NNG_EAGAIN;
}
+ nng_aio_free(ap);
return (rv);
}
@@ -171,29 +191,27 @@ nng_send(nng_socket sid, void *buf, size_t len, int flags)
return (rv);
}
-void *
-nng_alloc(size_t sz)
-{
- return (nni_alloc(sz));
-}
-
-void
-nng_free(void *buf, size_t sz)
-{
- nni_free(buf, sz);
-}
-
int
nng_sendmsg(nng_socket sid, nng_msg *msg, int flags)
{
- int rv;
- nni_sock *sock;
+ int rv;
+ nng_aio *ap;
- if ((rv = nni_sock_find(&sock, sid)) != 0) {
+ if ((rv = nng_aio_alloc(&ap, NULL, NULL)) != 0) {
return (rv);
}
- rv = nni_sock_sendmsg(sock, msg, flags);
- nni_sock_rele(sock);
+ if (flags & NNG_FLAG_NONBLOCK) {
+ nng_aio_set_timeout(ap, NNG_DURATION_ZERO);
+ } else {
+ nng_aio_set_timeout(ap, NNG_DURATION_DEFAULT);
+ }
+
+ nng_aio_set_msg(ap, msg);
+ nng_send_aio(sid, ap);
+ nng_aio_wait(ap);
+
+ rv = nng_aio_result(ap);
+ nng_aio_free(ap);
// Possibly massage nonblocking attempt. Note that nonblocking is
// still done asynchronously, and the calling thread loses context.
@@ -204,6 +222,36 @@ nng_sendmsg(nng_socket sid, nng_msg *msg, int flags)
return (rv);
}
+void
+nng_recv_aio(nng_socket sid, nng_aio *ap)
+{
+ nni_aio * aio = (nni_aio *) ap;
+ nni_sock *sock;
+ int rv;
+
+ if ((rv = nni_sock_find(&sock, sid)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_sock_recv(sock, aio);
+ nni_sock_rele(sock);
+}
+
+void
+nng_send_aio(nng_socket sid, nng_aio *ap)
+{
+ nni_aio * aio = (nni_aio *) ap;
+ nni_sock *sock;
+ int rv;
+
+ if ((rv = nni_sock_find(&sock, sid)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_sock_send(sock, aio);
+ nni_sock_rele(sock);
+}
+
int
nng_dial(nng_socket sid, const char *addr, nng_dialer *dp, int flags)
{
@@ -947,6 +995,76 @@ nng_msg_getopt(nng_msg *msg, int opt, void *ptr, size_t *szp)
return (nni_msg_getopt(msg, opt, ptr, szp));
}
+int
+nng_aio_alloc(nng_aio **app, void (*cb)(void *), void *arg)
+{
+ nni_aio *aio;
+ int rv;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_aio_init(&aio, (nni_cb) cb, arg)) == 0) {
+ *app = (nng_aio *) aio;
+ }
+ aio->a_expire = (nni_time) NNG_DURATION_DEFAULT;
+ aio->a_reltime = 1;
+ return (rv);
+}
+
+void
+nng_aio_free(nng_aio *ap)
+{
+ nni_aio_fini((nni_aio *) ap);
+}
+
+int
+nng_aio_result(nng_aio *ap)
+{
+ return (nni_aio_result((nni_aio *) ap));
+}
+
+void
+nng_aio_stop(nng_aio *ap)
+{
+ nni_aio_stop((nni_aio *) ap);
+}
+
+void
+nng_aio_wait(nng_aio *ap)
+{
+ nni_aio_wait((nni_aio *) ap);
+}
+
+void
+nng_aio_cancel(nng_aio *ap)
+{
+ nni_aio_cancel((nni_aio *) ap, NNG_ECANCELED);
+}
+
+void
+nng_aio_set_msg(nng_aio *ap, nng_msg *msg)
+{
+ nni_aio_set_msg((nni_aio *) ap, msg);
+}
+
+nng_msg *
+nng_aio_get_msg(nng_aio *ap)
+{
+ return ((nng_msg *) (nni_aio_get_msg((nni_aio *) ap)));
+}
+
+void
+nng_aio_set_timeout(nng_aio *ap, nng_duration dur)
+{
+ // Durations here are relative, since we have no notion of a
+ // common clock. But underlying aio uses absolute times normally.
+ // Fortunately the absolute times are big enough; we just have to
+ // make sure that we "convert" the timeout from relative time to
+ // absolute time when submitting operations.
+ nni_aio_set_timeout((nni_aio *) ap, (nni_time) dur);
+}
+
#if 0
int
nng_snapshot_create(nng_socket sock, nng_snapshot **snapp)