aboutsummaryrefslogtreecommitdiff
path: root/src/testing/streams.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/testing/streams.c')
-rw-r--r--src/testing/streams.c146
1 files changed, 146 insertions, 0 deletions
diff --git a/src/testing/streams.c b/src/testing/streams.c
new file mode 100644
index 00000000..d718ab76
--- /dev/null
+++ b/src/testing/streams.c
@@ -0,0 +1,146 @@
+//
+// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#define TEST_NO_MAIN
+
+#include <nng/nng.h>
+#include <nng/supplemental/util/platform.h>
+
+typedef struct {
+ uint8_t * base;
+ size_t rem;
+ nng_iov iov;
+ nng_aio * upper_aio;
+ nng_aio * lower_aio;
+ nng_stream *s;
+ void (*submit)(nng_stream *, nng_aio *);
+} stream_xfr_t;
+
+static void
+stream_xfr_free(stream_xfr_t *x)
+{
+ if (x == NULL) {
+ return;
+ }
+ if (x->upper_aio != NULL) {
+ nng_aio_free(x->upper_aio);
+ }
+ if (x->lower_aio != NULL) {
+ nng_aio_free(x->lower_aio);
+ }
+ nng_free(x, sizeof(*x));
+}
+
+static void
+stream_xfr_start(stream_xfr_t *x)
+{
+ nng_iov iov;
+ iov.iov_buf = x->base;
+ iov.iov_len = x->rem;
+
+ nng_aio_set_iov(x->lower_aio, 1, &iov);
+ x->submit(x->s, x->lower_aio);
+}
+
+static void
+stream_xfr_cb(void *arg)
+{
+ stream_xfr_t *x = arg;
+ int rv;
+ size_t n;
+
+ rv = nng_aio_result(x->lower_aio);
+ if (rv != 0) {
+ nng_aio_finish(x->upper_aio, rv);
+ return;
+ }
+ n = nng_aio_count(x->lower_aio);
+
+ x->rem -= n;
+ x->base += n;
+
+ if (x->rem == 0) {
+ nng_aio_finish(x->upper_aio, 0);
+ return;
+ }
+
+ stream_xfr_start(x);
+}
+
+static stream_xfr_t *
+stream_xfr_alloc(nng_stream *s, void (*submit)(nng_stream *, nng_aio *),
+ void *buf, size_t size)
+{
+ stream_xfr_t *x;
+
+ if ((x = nng_alloc(size)) == NULL) {
+ return (NULL);
+ }
+ if (nng_aio_alloc(&x->upper_aio, NULL, NULL) != 0) {
+ stream_xfr_free(x);
+ return (NULL);
+ }
+ if (nng_aio_alloc(&x->lower_aio, stream_xfr_cb, x) != 0) {
+ stream_xfr_free(x);
+ return (NULL);
+ }
+
+ // Upper should not take more than 30 seconds, lower not more than 5.
+ nng_aio_set_timeout(x->upper_aio, 30000);
+ nng_aio_set_timeout(x->lower_aio, 5000);
+
+ nng_aio_begin(x->upper_aio);
+
+ x->s = s;
+ x->rem = size;
+ x->base = buf;
+ x->submit = submit;
+
+ return (x);
+}
+
+int
+nuts_stream_wait(stream_xfr_t *x)
+{
+ int rv;
+ if (x == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nng_aio_wait(x->upper_aio);
+ rv = nng_aio_result(x->upper_aio);
+ stream_xfr_free(x);
+ return (rv);
+}
+
+void *
+nuts_stream_recv_start(nng_stream *s, void *buf, size_t size)
+{
+ stream_xfr_t *x;
+
+ x = stream_xfr_alloc(s, nng_stream_recv, buf, size);
+ if (x == NULL) {
+ return (x);
+ }
+ stream_xfr_start(x);
+ return (x);
+}
+
+void *
+nuts_stream_send_start(nng_stream *s, void *buf, size_t size)
+{
+ stream_xfr_t *x;
+
+ x = stream_xfr_alloc(s, nng_stream_send, buf, size);
+ if (x == NULL) {
+ return (x);
+ }
+ stream_xfr_start(x);
+ return (x);
+}