aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2021-01-24 21:22:13 -0800
committerGarrett D'Amore <garrett@damore.org>2021-01-24 21:55:35 -0800
commite9c28eed5ffb8da59ba8a6805e85936f04bdb644 (patch)
treeb81c76676f3b314751ca315342eca29747cbf80a /src
parent14ee891c58585a2b14dfcac53858d4b418d0f776 (diff)
downloadnng-e9c28eed5ffb8da59ba8a6805e85936f04bdb644.tar.gz
nng-e9c28eed5ffb8da59ba8a6805e85936f04bdb644.tar.bz2
nng-e9c28eed5ffb8da59ba8a6805e85936f04bdb644.zip
fixes #1419 pairv0 performance work
This takes the performance work we did for pairv1, and provides an implementation for pairv0. The upshot should be a nice performance boost for pair v0.
Diffstat (limited to 'src')
-rw-r--r--src/sp/protocol/pair0/CMakeLists.txt5
-rw-r--r--src/sp/protocol/pair0/pair.c478
-rw-r--r--src/sp/protocol/pair0/pair0_test.c439
3 files changed, 832 insertions, 90 deletions
diff --git a/src/sp/protocol/pair0/CMakeLists.txt b/src/sp/protocol/pair0/CMakeLists.txt
index b12583ab..2f74e139 100644
--- a/src/sp/protocol/pair0/CMakeLists.txt
+++ b/src/sp/protocol/pair0/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+# Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
# Copyright 2018 Capitar IT Group BV <info@capitar.com>
#
# This software is supplied under the terms of the MIT License, a
@@ -13,4 +13,5 @@ nng_directory(pair0)
nng_sources_if(NNG_PROTO_PAIR0 pair.c)
nng_headers_if(NNG_PROTO_PAIR0 nng/protocol/pair0/pair.h)
-nng_defines_if(NNG_PROTO_PAIR0 NNG_HAVE_PAIR0) \ No newline at end of file
+nng_defines_if(NNG_PROTO_PAIR0 NNG_HAVE_PAIR0)
+nng_test(pair0_test)
diff --git a/src/sp/protocol/pair0/pair.c b/src/sp/protocol/pair0/pair.c
index 41f88c7c..c2407d81 100644
--- a/src/sp/protocol/pair0/pair.c
+++ b/src/sp/protocol/pair0/pair.c
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -24,18 +24,24 @@
typedef struct pair0_pipe pair0_pipe;
typedef struct pair0_sock pair0_sock;
-static void pair0_send_cb(void *);
-static void pair0_recv_cb(void *);
-static void pair0_getq_cb(void *);
-static void pair0_putq_cb(void *);
+static void pair0_pipe_send_cb(void *);
+static void pair0_pipe_recv_cb(void *);
static void pair0_pipe_fini(void *);
+static void pair0_send_sched(pair0_sock *);
+static void pair0_pipe_send(pair0_pipe *, nni_msg *);
// pair0_sock is our per-socket protocol private structure.
struct pair0_sock {
- pair0_pipe *ppipe;
- nni_msgq * uwq;
- nni_msgq * urq;
- nni_mtx mtx;
+ pair0_pipe * p;
+ nni_mtx mtx;
+ nni_lmq wmq;
+ nni_list waq;
+ nni_lmq rmq;
+ nni_list raq;
+ nni_pollable readable;
+ nni_pollable writable;
+ bool rd_ready; // pipe ready for read
+ bool wr_ready; // pipe ready for write
};
// An pair0_pipe is our per-pipe protocol private structure. We keep
@@ -43,23 +49,28 @@ struct pair0_sock {
// pipe. The separate data structure is more like other protocols that do
// manage multiple pipes.
struct pair0_pipe {
- nni_pipe * npipe;
- pair0_sock *psock;
+ nni_pipe * pipe;
+ pair0_sock *pair;
nni_aio aio_send;
nni_aio aio_recv;
- nni_aio aio_getq;
- nni_aio aio_putq;
};
static int
-pair0_sock_init(void *arg, nni_sock *nsock)
+pair0_sock_init(void *arg, nni_sock *sock)
{
pair0_sock *s = arg;
+ NNI_ARG_UNUSED(sock);
nni_mtx_init(&s->mtx);
- s->ppipe = NULL;
- s->uwq = nni_sock_sendq(nsock);
- s->urq = nni_sock_recvq(nsock);
+
+ nni_lmq_init(&s->rmq, 0);
+ nni_lmq_init(&s->wmq, 0);
+ nni_aio_list_init(&s->raq);
+ nni_aio_list_init(&s->waq);
+ nni_pollable_init(&s->writable);
+ nni_pollable_init(&s->readable);
+
+ s->p = NULL;
return (0);
}
@@ -68,6 +79,8 @@ pair0_sock_fini(void *arg)
{
pair0_sock *s = arg;
+ nni_lmq_fini(&s->rmq);
+ nni_lmq_fini(&s->wmq);
nni_mtx_fini(&s->mtx);
}
@@ -75,11 +88,28 @@ static void
pair0_pipe_stop(void *arg)
{
pair0_pipe *p = arg;
+ pair0_sock *s = p->pair;
+
+ nni_mtx_lock(&s->mtx);
+ if (s->p == p) {
+ s->p = NULL;
+ if (s->rd_ready) {
+ nni_msg *m = nni_aio_get_msg(&p->aio_recv);
+ nni_msg_free(m);
+ s->rd_ready = false;
+ }
+ if (s->wr_ready) {
+ s->wr_ready = false;
+ nni_pollable_clear(&s->writable);
+ }
+ if (nni_lmq_empty(&s->rmq)) {
+ nni_pollable_clear(&s->readable);
+ }
+ }
+ nni_mtx_unlock(&s->mtx);
nni_aio_stop(&p->aio_send);
nni_aio_stop(&p->aio_recv);
- nni_aio_stop(&p->aio_putq);
- nni_aio_stop(&p->aio_getq);
}
static void
@@ -89,48 +119,59 @@ pair0_pipe_fini(void *arg)
nni_aio_fini(&p->aio_send);
nni_aio_fini(&p->aio_recv);
- nni_aio_fini(&p->aio_putq);
- nni_aio_fini(&p->aio_getq);
}
static int
-pair0_pipe_init(void *arg, nni_pipe *npipe, void *psock)
+pair0_pipe_init(void *arg, nni_pipe *pipe, void *pair)
{
pair0_pipe *p = arg;
- nni_aio_init(&p->aio_send, pair0_send_cb, p);
- nni_aio_init(&p->aio_recv, pair0_recv_cb, p);
- nni_aio_init(&p->aio_getq, pair0_getq_cb, p);
- nni_aio_init(&p->aio_putq, pair0_putq_cb, p);
+ nni_aio_init(&p->aio_send, pair0_pipe_send_cb, p);
+ nni_aio_init(&p->aio_recv, pair0_pipe_recv_cb, p);
+
+ p->pipe = pipe;
+ p->pair = pair;
- p->npipe = npipe;
- p->psock = psock;
return (0);
}
+static void
+pair0_cancel(nni_aio *aio, void *arg, int rv)
+{
+ pair0_sock *s = arg;
+
+ nni_mtx_lock(&s->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
static int
pair0_pipe_start(void *arg)
{
pair0_pipe *p = arg;
- pair0_sock *s = p->psock;
+ pair0_sock *s = p->pair;
- if (nni_pipe_peer(p->npipe) != NNI_PROTO_PAIR_V0) {
+ if (nni_pipe_peer(p->pipe) != NNI_PROTO_PAIR_V0) {
// Peer protocol mismatch.
return (NNG_EPROTO);
}
nni_mtx_lock(&s->mtx);
- if (s->ppipe != NULL) {
+ if (s->p != NULL) {
nni_mtx_unlock(&s->mtx);
return (NNG_EBUSY); // Already have a peer, denied.
}
- s->ppipe = p;
+ s->p = p;
+ s->rd_ready = false;
nni_mtx_unlock(&s->mtx);
- // Schedule a getq on the upper, and a read from the pipe.
- // Each of these also sets up another hold on the pipe itself.
- nni_msgq_aio_get(s->uwq, &p->aio_getq);
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ pair0_send_sched(s);
+
+ // And the pipe read of course.
+ nni_pipe_recv(p->pipe, &p->aio_recv);
return (0);
}
@@ -139,83 +180,117 @@ static void
pair0_pipe_close(void *arg)
{
pair0_pipe *p = arg;
- pair0_sock *s = p->psock;
nni_aio_close(&p->aio_send);
nni_aio_close(&p->aio_recv);
- nni_aio_close(&p->aio_putq);
- nni_aio_close(&p->aio_getq);
-
- nni_mtx_lock(&s->mtx);
- if (s->ppipe == p) {
- s->ppipe = NULL;
- }
- nni_mtx_unlock(&s->mtx);
}
static void
-pair0_recv_cb(void *arg)
+pair0_pipe_recv_cb(void *arg)
{
pair0_pipe *p = arg;
- pair0_sock *s = p->psock;
+ pair0_sock *s = p->pair;
nni_msg * msg;
+ nni_aio * a;
if (nni_aio_result(&p->aio_recv) != 0) {
- nni_pipe_close(p->npipe);
+ nni_pipe_close(p->pipe);
return;
}
msg = nni_aio_get_msg(&p->aio_recv);
- nni_aio_set_msg(&p->aio_putq, msg);
- nni_aio_set_msg(&p->aio_recv, NULL);
+ // Store the pipe ID.
+ nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
- nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
- nni_msgq_aio_put(s->urq, &p->aio_putq);
-}
-
-static void
-pair0_putq_cb(void *arg)
-{
- pair0_pipe *p = arg;
+ nni_mtx_lock(&s->mtx);
- if (nni_aio_result(&p->aio_putq) != 0) {
- nni_msg_free(nni_aio_get_msg(&p->aio_putq));
- nni_aio_set_msg(&p->aio_putq, NULL);
- nni_pipe_close(p->npipe);
+ // if anyone is blocking, then the lmq will be empty, and
+ // we should deliver it there.
+ if ((a = nni_list_first(&s->raq)) != NULL) {
+ nni_aio_list_remove(a);
+ nni_aio_set_msg(a, msg);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_sync(a, 0, nni_msg_len(msg));
return;
}
- nni_pipe_recv(p->npipe, &p->aio_recv);
+
+ // maybe we have room in the rmq?
+ if (!nni_lmq_full(&s->rmq)) {
+ nni_lmq_putq(&s->rmq, msg);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+ } else {
+ s->rd_ready = true;
+ }
+ nni_pollable_raise(&s->readable);
+ nni_mtx_unlock(&s->mtx);
}
static void
-pair0_getq_cb(void *arg)
+pair0_send_sched(pair0_sock *s)
{
- pair0_pipe *p = arg;
+ pair0_pipe *p;
+ nni_msg * m;
+ nni_aio * a = NULL;
+ size_t l = 0;
- if (nni_aio_result(&p->aio_getq) != 0) {
- nni_pipe_close(p->npipe);
+ nni_mtx_lock(&s->mtx);
+
+ if ((p = s->p) == NULL) {
+ nni_mtx_unlock(&s->mtx);
return;
}
- nni_aio_set_msg(&p->aio_send, nni_aio_get_msg(&p->aio_getq));
- nni_aio_set_msg(&p->aio_getq, NULL);
- nni_pipe_send(p->npipe, &p->aio_send);
+ s->wr_ready = true;
+
+ // if message waiting in buffered queue, then we prefer that.
+ if (nni_lmq_getq(&s->wmq, &m) == 0) {
+ pair0_pipe_send(p, m);
+
+ if ((a = nni_list_first(&s->waq)) != NULL) {
+ nni_aio_list_remove(a);
+ m = nni_aio_get_msg(a);
+ l = nni_msg_len(m);
+ nni_lmq_putq(&s->wmq, m);
+ }
+
+ } else if ((a = nni_list_first(&s->waq)) != NULL) {
+ // Looks like we had the unbuffered case, but
+ // someone was waiting.
+ nni_aio_list_remove(a);
+
+ m = nni_aio_get_msg(a);
+ l = nni_msg_len(m);
+ pair0_pipe_send(p, m);
+ }
+
+ // if we were blocked before, but not now, update.
+ if ((!nni_lmq_full(&s->wmq)) || s->wr_ready) {
+ nni_pollable_raise(&s->writable);
+ }
+
+ nni_mtx_unlock(&s->mtx);
+
+ if (a != NULL) {
+ nni_aio_set_msg(a, NULL);
+ nni_aio_finish_sync(a, 0, l);
+ }
}
static void
-pair0_send_cb(void *arg)
+pair0_pipe_send_cb(void *arg)
{
pair0_pipe *p = arg;
- pair0_sock *s = p->psock;
if (nni_aio_result(&p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(&p->aio_send));
nni_aio_set_msg(&p->aio_send, NULL);
- nni_pipe_close(p->npipe);
+ nni_pipe_close(p->pipe);
return;
}
- nni_msgq_aio_get(s->uwq, &p->aio_getq);
+ pair0_send_sched(p->pair);
}
static void
@@ -227,25 +302,259 @@ pair0_sock_open(void *arg)
static void
pair0_sock_close(void *arg)
{
- NNI_ARG_UNUSED(arg);
+ pair0_sock *s = arg;
+ nni_aio * a;
+ nni_msg * m;
+ nni_mtx_lock(&s->mtx);
+ while (((a = nni_list_first(&s->raq)) != NULL) ||
+ ((a = nni_list_first(&s->waq)) != NULL)) {
+ nni_aio_list_remove(a);
+ nni_aio_finish_error(a, NNG_ECLOSED);
+ }
+ while ((nni_lmq_getq(&s->rmq, &m) == 0) ||
+ (nni_lmq_getq(&s->wmq, &m) == 0)) {
+ nni_msg_free(m);
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
+static void
+pair0_pipe_send(pair0_pipe *p, nni_msg *m)
+{
+ pair0_sock *s = p->pair;
+ // assumption: we have unique access to the message at this point.
+ NNI_ASSERT(!nni_msg_shared(m));
+
+ nni_aio_set_msg(&p->aio_send, m);
+ nni_pipe_send(p->pipe, &p->aio_send);
+ s->wr_ready = false;
}
static void
pair0_sock_send(void *arg, nni_aio *aio)
{
pair0_sock *s = arg;
+ nni_msg * m;
+ size_t len;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ m = nni_aio_get_msg(aio);
+ len = nni_msg_len(m);
+
+ nni_mtx_lock(&s->mtx);
+ if (s->wr_ready) {
+ pair0_pipe *p = s->p;
+ if (nni_lmq_full(&s->wmq)) {
+ nni_pollable_clear(&s->writable);
+ }
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, len);
+ pair0_pipe_send(p, m);
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+
+ // Can we maybe queue it.
+ if (nni_lmq_putq(&s->wmq, m) == 0) {
+ // Yay, we can. So we're done.
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, len);
+ if (nni_lmq_full(&s->wmq)) {
+ nni_pollable_clear(&s->writable);
+ }
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
- nni_msgq_aio_put(s->uwq, aio);
+ if ((rv = nni_aio_schedule(aio, pair0_cancel, s)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+ nni_aio_list_append(&s->waq, aio);
+ nni_mtx_unlock(&s->mtx);
}
static void
pair0_sock_recv(void *arg, nni_aio *aio)
{
pair0_sock *s = arg;
+ pair0_pipe *p;
+ nni_msg * m;
+ int rv;
- nni_msgq_aio_get(s->urq, aio);
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ nni_mtx_lock(&s->mtx);
+ p = s->p;
+
+ // Buffered read. If there is a message waiting for us, pick
+ // it up. We might need to post another read request as well.
+ if (nni_lmq_getq(&s->rmq, &m) == 0) {
+ nni_aio_set_msg(aio, m);
+ nni_aio_finish(aio, 0, nni_msg_len(m));
+ if (s->rd_ready) {
+ s->rd_ready = false;
+ m = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_lmq_putq(&s->rmq, m);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+ }
+ if (nni_lmq_empty(&s->rmq)) {
+ nni_pollable_clear(&s->readable);
+ }
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+
+ // Unbuffered -- but waiting.
+ if (s->rd_ready) {
+ s->rd_ready = false;
+ m = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_aio_set_msg(aio, m);
+ nni_aio_finish(aio, 0, nni_msg_len(m));
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+ nni_pollable_clear(&s->readable);
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+
+ if ((rv = nni_aio_schedule(aio, pair0_cancel, s)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ } else {
+ nni_aio_list_append(&s->raq, aio);
+ }
+ nni_mtx_unlock(&s->mtx);
}
+static int
+pair0_set_send_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ pair0_sock *s = arg;
+ int val;
+ int rv;
+
+ if ((rv = nni_copyin_int(&val, buf, sz, 0, 8192, t)) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(&s->mtx);
+ rv = nni_lmq_resize(&s->wmq, (size_t) val);
+ // Changing the size of the queue can affect our readiness.
+ if (!nni_lmq_full(&s->wmq)) {
+ nni_pollable_raise(&s->writable);
+ } else if (!s->wr_ready) {
+ nni_pollable_clear(&s->writable);
+ }
+ nni_mtx_unlock(&s->mtx);
+ return (rv);
+}
+
+static int
+pair0_get_send_buf_len(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ pair0_sock *s = arg;
+ int val;
+
+ nni_mtx_lock(&s->mtx);
+ val = nni_lmq_cap(&s->wmq);
+ nni_mtx_unlock(&s->mtx);
+
+ return (nni_copyout_int(val, buf, szp, t));
+}
+
+static int
+pair0_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ pair0_sock *s = arg;
+ int val;
+ int rv;
+
+ if ((rv = nni_copyin_int(&val, buf, sz, 0, 8192, t)) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(&s->mtx);
+ rv = nni_lmq_resize(&s->rmq, (size_t) val);
+ // Changing the size of the queue can affect our readiness.
+ if (!nni_lmq_empty(&s->rmq)) {
+ nni_pollable_raise(&s->readable);
+ } else if (!s->rd_ready) {
+ nni_pollable_clear(&s->readable);
+ }
+ nni_mtx_unlock(&s->mtx);
+ return (rv);
+}
+
+static int
+pair0_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ pair0_sock *s = arg;
+ int val;
+
+ nni_mtx_lock(&s->mtx);
+ val = nni_lmq_cap(&s->rmq);
+ nni_mtx_unlock(&s->mtx);
+
+ return (nni_copyout_int(val, buf, szp, t));
+}
+
+static int
+pair0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ pair0_sock *s = arg;
+ int rv;
+ int fd;
+
+ if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) {
+ return (rv);
+ }
+ return (nni_copyout_int(fd, buf, szp, t));
+}
+
+static int
+pair0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ pair0_sock *s = arg;
+ int rv;
+ int fd;
+
+ if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) {
+ return (rv);
+ }
+ return (nni_copyout_int(fd, buf, szp, t));
+}
+
+static nni_option pair0_sock_options[] = {
+ {
+ .o_name = NNG_OPT_RECVFD,
+ .o_get = pair0_sock_get_recv_fd,
+ },
+ {
+ .o_name = NNG_OPT_SENDFD,
+ .o_get = pair0_sock_get_send_fd,
+ },
+ {
+ .o_name = NNG_OPT_SENDBUF,
+ .o_get = pair0_get_send_buf_len,
+ .o_set = pair0_set_send_buf_len,
+ },
+ {
+ .o_name = NNG_OPT_RECVBUF,
+ .o_get = pair0_get_recv_buf_len,
+ .o_set = pair0_set_recv_buf_len,
+ },
+ // terminate list
+ {
+ .o_name = NULL,
+ },
+};
+
static nni_proto_pipe_ops pair0_pipe_ops = {
.pipe_size = sizeof(pair0_pipe),
.pipe_init = pair0_pipe_init,
@@ -255,13 +564,6 @@ static nni_proto_pipe_ops pair0_pipe_ops = {
.pipe_stop = pair0_pipe_stop,
};
-static nni_option pair0_sock_options[] = {
- // terminate list
- {
- .o_name = NULL,
- }
-};
-
static nni_proto_sock_ops pair0_sock_ops = {
.sock_size = sizeof(pair0_sock),
.sock_init = pair0_sock_init,
@@ -293,13 +595,13 @@ static nni_proto pair0_proto_raw = {
};
int
-nng_pair0_open(nng_socket *sidp)
+nng_pair0_open(nng_socket *sock)
{
- return (nni_proto_open(sidp, &pair0_proto));
+ return (nni_proto_open(sock, &pair0_proto));
}
int
-nng_pair0_open_raw(nng_socket *sidp)
+nng_pair0_open_raw(nng_socket *sock)
{
- return (nni_proto_open(sidp, &pair0_proto_raw));
+ return (nni_proto_open(sock, &pair0_proto_raw));
}
diff --git a/src/sp/protocol/pair0/pair0_test.c b/src/sp/protocol/pair0/pair0_test.c
new file mode 100644
index 00000000..20acc200
--- /dev/null
+++ b/src/sp/protocol/pair0/pair0_test.c
@@ -0,0 +1,439 @@
+//
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <nuts.h>
+
+#define SECOND 1000
+
+#define APPEND_STR(m, s) NUTS_TRUE(nng_msg_append(m, s, strlen(s)) == 0)
+#define CHECK_STR(m, s) \
+ NUTS_TRUE(nng_msg_len(m) == strlen(s)); \
+ NUTS_TRUE(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
+
+static void
+test_identity(void)
+{
+ nng_socket s;
+ int p;
+ char * n;
+
+ NUTS_PASS(nng_pair0_open(&s));
+ NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PROTO, &p));
+ NUTS_TRUE(p == NUTS_PROTO(1u, 0u)); // 16
+ NUTS_PASS(nng_socket_get_int(s, NNG_OPT_PEER, &p));
+ NUTS_TRUE(p == NUTS_PROTO(1u, 0u)); // 17
+ NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PROTONAME, &n));
+ NUTS_MATCH(n, "pair");
+ nng_strfree(n);
+ NUTS_PASS(nng_socket_get_string(s, NNG_OPT_PEERNAME, &n));
+ NUTS_MATCH(n, "pair");
+ nng_strfree(n);
+ NUTS_CLOSE(s);
+}
+
+void
+test_cooked(void)
+{
+ nng_socket s1;
+ nng_socket c1;
+ nng_msg * msg;
+
+ NUTS_PASS(nng_pair0_open(&s1));
+ NUTS_PASS(nng_pair0_open(&c1));
+ NUTS_PASS(nuts_marry(s1, c1));
+
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ NUTS_PASS(nng_msg_append(msg, "ALPHA", strlen("ALPHA") + 1));
+ NUTS_PASS(nng_sendmsg(c1, msg, 0));
+ NUTS_PASS(nng_recvmsg(s1, &msg, 0));
+ NUTS_TRUE(nng_msg_len(msg) == strlen("ALPHA") + 1);
+ NUTS_MATCH(nng_msg_body(msg), "ALPHA");
+ nng_msg_free(msg);
+
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ NUTS_PASS(nng_msg_append(msg, "BETA", strlen("BETA") + 1));
+ NUTS_PASS(nng_sendmsg(s1, msg, 0));
+ NUTS_PASS(nng_recvmsg(c1, &msg, 0));
+ NUTS_TRUE(nng_msg_len(msg) == strlen("BETA") + 1);
+ NUTS_MATCH(nng_msg_body(msg), "BETA");
+
+ nng_msg_free(msg);
+ NUTS_CLOSE(c1);
+ NUTS_CLOSE(s1);
+}
+
+void
+test_faithful(void)
+{
+ nng_socket s1;
+ nng_socket c1;
+ nng_socket c2;
+ nng_msg * msg;
+ const char *addr = "inproc://pair0_mono_faithful";
+
+ NUTS_PASS(nng_pair0_open(&s1));
+ NUTS_PASS(nng_pair0_open(&c1));
+ NUTS_PASS(nng_pair0_open(&c2));
+ NUTS_PASS(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, SECOND / 4));
+ NUTS_PASS(nng_setopt_ms(c1, NNG_OPT_SENDTIMEO, SECOND));
+ NUTS_PASS(nng_setopt_ms(c2, NNG_OPT_SENDTIMEO, SECOND));
+ NUTS_PASS(nng_setopt_int(c2, NNG_OPT_SENDBUF, 2));
+
+ NUTS_PASS(nng_listen(s1, addr, NULL, 0));
+ NUTS_MARRY(s1, c1);
+ NUTS_PASS(nng_dial(c2, addr, NULL, 0));
+
+ NUTS_SLEEP(100);
+
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ APPEND_STR(msg, "ONE");
+ NUTS_PASS(nng_sendmsg(c1, msg, 0));
+ NUTS_PASS(nng_recvmsg(s1, &msg, 0));
+ CHECK_STR(msg, "ONE");
+ nng_msg_free(msg);
+
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ APPEND_STR(msg, "TWO");
+ NUTS_PASS(nng_sendmsg(c2, msg, 0));
+ NUTS_FAIL(nng_recvmsg(s1, &msg, 0), NNG_ETIMEDOUT);
+
+ NUTS_CLOSE(s1);
+ NUTS_CLOSE(c1);
+ NUTS_CLOSE(c2);
+}
+
+void
+test_back_pressure(void)
+{
+ nng_socket s1;
+ nng_socket c1;
+ int i;
+ int rv;
+ nng_msg * msg;
+ nng_duration to = 100;
+
+ NUTS_PASS(nng_pair0_open(&s1));
+ NUTS_PASS(nng_pair0_open(&c1));
+ NUTS_PASS(nng_setopt_int(s1, NNG_OPT_RECVBUF, 1));
+ NUTS_PASS(nng_setopt_int(s1, NNG_OPT_SENDBUF, 1));
+ NUTS_PASS(nng_setopt_int(c1, NNG_OPT_RECVBUF, 1));
+ NUTS_PASS(nng_setopt_ms(s1, NNG_OPT_SENDTIMEO, to));
+
+ NUTS_MARRY(s1, c1);
+
+ // We choose to allow some buffering. In reality the
+ // buffer size is just 1, and we will fail after 2.
+ for (i = 0, rv = 0; i < 10; i++) {
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ if ((rv = nng_sendmsg(s1, msg, 0)) != 0) {
+ nng_msg_free(msg);
+ break;
+ }
+ }
+ NUTS_FAIL(rv, NNG_ETIMEDOUT);
+ NUTS_TRUE(i < 10);
+ NUTS_CLOSE(s1);
+ NUTS_CLOSE(c1);
+}
+
+void
+test_send_no_peer(void)
+{
+ nng_socket s1;
+ nng_msg * msg;
+ nng_duration to = 100;
+
+ NUTS_PASS(nng_pair0_open(&s1));
+ NUTS_PASS(nng_setopt_ms(s1, NNG_OPT_SENDTIMEO, to));
+
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ NUTS_FAIL(nng_sendmsg(s1, msg, 0), NNG_ETIMEDOUT);
+ nng_msg_free(msg);
+ NUTS_CLOSE(s1);
+}
+
+void
+test_raw_exchange(void)
+{
+ nng_socket s1;
+ nng_socket c1;
+
+ nng_msg *msg;
+
+ NUTS_PASS(nng_pair0_open_raw(&s1));
+ NUTS_PASS(nng_pair0_open_raw(&c1));
+
+ NUTS_PASS(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_PASS(nng_setopt_ms(c1, NNG_OPT_RECVTIMEO, SECOND));
+ NUTS_MARRY(s1, c1);
+
+ nng_pipe p = NNG_PIPE_INITIALIZER;
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ APPEND_STR(msg, "GAMMA");
+ NUTS_PASS(nng_sendmsg(c1, msg, 0));
+ NUTS_PASS(nng_recvmsg(s1, &msg, 0));
+ p = nng_msg_get_pipe(msg);
+ NUTS_TRUE(nng_pipe_id(p) > 0);
+
+ CHECK_STR(msg, "GAMMA");
+ nng_msg_free(msg);
+
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ APPEND_STR(msg, "EPSILON");
+ NUTS_PASS(nng_sendmsg(s1, msg, 0));
+ NUTS_PASS(nng_recvmsg(c1, &msg, 0));
+ CHECK_STR(msg, "EPSILON");
+ p = nng_msg_get_pipe(msg);
+ NUTS_TRUE(nng_pipe_id(p) > 0);
+
+ nng_msg_free(msg);
+
+ NUTS_CLOSE(s1);
+ NUTS_CLOSE(c1);
+}
+
+void
+test_pair0_send_closed_aio(void)
+{
+ nng_socket s1;
+ nng_aio * aio;
+ nng_msg * msg;
+
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ NUTS_PASS(nng_pair0_open(&s1));
+ nng_aio_set_msg(aio, msg);
+ nng_aio_stop(aio);
+ nng_send_aio(s1, aio);
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ nng_msg_free(msg);
+ nng_aio_free(aio);
+ NUTS_PASS(nng_close(s1));
+}
+
+void
+test_pair0_raw(void)
+{
+ nng_socket s1;
+ bool raw;
+
+ NUTS_PASS(nng_pair0_open(&s1));
+ NUTS_PASS(nng_getopt_bool(s1, NNG_OPT_RAW, &raw));
+ NUTS_TRUE(raw == false);
+ NUTS_FAIL(nng_setopt_bool(s1, NNG_OPT_RAW, true), NNG_EREADONLY);
+ NUTS_PASS(nng_close(s1));
+
+ NUTS_PASS(nng_pair0_open_raw(&s1));
+ NUTS_PASS(nng_getopt_bool(s1, NNG_OPT_RAW, &raw));
+ NUTS_TRUE(raw == true);
+ NUTS_FAIL(nng_setopt_bool(s1, NNG_OPT_RAW, false), NNG_EREADONLY);
+ NUTS_PASS(nng_close(s1));
+}
+
+void
+test_pair0_validate_peer(void)
+{
+ nng_socket s1, s2;
+ nng_stat * stats;
+ nng_stat * reject;
+ char * addr;
+
+ NUTS_ADDR(addr, "inproc");
+ NUTS_PASS(nng_pair0_open(&s1));
+ NUTS_PASS(nng_pair1_open(&s2));
+
+ NUTS_PASS(nng_listen(s1, addr, NULL, 0));
+ NUTS_PASS(nng_dial(s2, addr, NULL, NNG_FLAG_NONBLOCK));
+
+ NUTS_SLEEP(100);
+ NUTS_PASS(nng_stats_get(&stats));
+
+ NUTS_TRUE(stats != NULL);
+ NUTS_TRUE((reject = nng_stat_find_socket(stats, s1)) != NULL);
+ NUTS_TRUE((reject = nng_stat_find(reject, "reject")) != NULL);
+
+ NUTS_TRUE(nng_stat_type(reject) == NNG_STAT_COUNTER);
+ NUTS_TRUE(nng_stat_value(reject) > 0);
+
+ NUTS_CLOSE(s1);
+ NUTS_CLOSE(s2);
+ nng_stats_free(stats);
+}
+
+static void
+test_pair0_no_context(void)
+{
+ nng_socket s;
+ nng_ctx ctx;
+
+ NUTS_PASS(nng_pair0_open(&s));
+ NUTS_FAIL(nng_ctx_open(&ctx, s), NNG_ENOTSUP);
+ NUTS_CLOSE(s);
+}
+
+static void
+test_pair0_send_buffer(void)
+{
+ nng_socket s;
+ int v;
+ bool b;
+ size_t sz;
+
+ NUTS_PASS(nng_pair0_open(&s));
+ NUTS_PASS(nng_getopt_int(s, NNG_OPT_SENDBUF, &v));
+ NUTS_TRUE(v == 0);
+ NUTS_FAIL(nng_getopt_bool(s, NNG_OPT_SENDBUF, &b), NNG_EBADTYPE);
+ sz = 1;
+ NUTS_FAIL(nng_getopt(s, NNG_OPT_SENDBUF, &b, &sz), NNG_EINVAL);
+ NUTS_FAIL(nng_setopt_int(s, NNG_OPT_SENDBUF, -1), NNG_EINVAL);
+ NUTS_FAIL(nng_setopt_int(s, NNG_OPT_SENDBUF, 100000), NNG_EINVAL);
+ NUTS_FAIL(nng_setopt_bool(s, NNG_OPT_SENDBUF, false), NNG_EBADTYPE);
+ NUTS_FAIL(nng_setopt(s, NNG_OPT_SENDBUF, &b, 1), NNG_EINVAL);
+ NUTS_PASS(nng_setopt_int(s, NNG_OPT_SENDBUF, 100));
+ NUTS_PASS(nng_getopt_int(s, NNG_OPT_SENDBUF, &v));
+ NUTS_TRUE(v == 100);
+ NUTS_CLOSE(s);
+}
+
+static void
+test_pair0_recv_buffer(void)
+{
+ nng_socket s;
+ int v;
+ bool b;
+ size_t sz;
+
+ NUTS_PASS(nng_pair0_open(&s));
+ NUTS_PASS(nng_getopt_int(s, NNG_OPT_RECVBUF, &v));
+ NUTS_TRUE(v == 0);
+ NUTS_FAIL(nng_getopt_bool(s, NNG_OPT_RECVBUF, &b), NNG_EBADTYPE);
+ sz = 1;
+ NUTS_FAIL(nng_getopt(s, NNG_OPT_RECVBUF, &b, &sz), NNG_EINVAL);
+ NUTS_FAIL(nng_setopt_int(s, NNG_OPT_RECVBUF, -1), NNG_EINVAL);
+ NUTS_FAIL(nng_setopt_int(s, NNG_OPT_RECVBUF, 100000), NNG_EINVAL);
+ NUTS_FAIL(nng_setopt_bool(s, NNG_OPT_RECVBUF, false), NNG_EBADTYPE);
+ NUTS_FAIL(nng_setopt(s, NNG_OPT_RECVBUF, &b, 1), NNG_EINVAL);
+ NUTS_PASS(nng_setopt_int(s, NNG_OPT_RECVBUF, 100));
+ NUTS_PASS(nng_getopt_int(s, NNG_OPT_RECVBUF, &v));
+ NUTS_TRUE(v == 100);
+ NUTS_CLOSE(s);
+}
+
+static void
+test_pair0_poll_readable(void)
+{
+ int fd;
+ nng_socket s1;
+ nng_socket s2;
+
+ NUTS_PASS(nng_pair0_open(&s1));
+ NUTS_PASS(nng_pair0_open(&s2));
+ NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(s2, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_get_int(s1, NNG_OPT_RECVFD, &fd));
+ NUTS_TRUE(fd >= 0);
+
+ // Not readable if not connected!
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // Even after connect (no message yet)
+ NUTS_MARRY(s1, s2);
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // But once we send messages, it is.
+ // We have to send a request, in order to send a reply.
+ NUTS_SEND(s2, "abc");
+ NUTS_SLEEP(100);
+ NUTS_TRUE(nuts_poll_fd(fd));
+
+ // and receiving makes it no longer ready
+ NUTS_RECV(s1, "abc");
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // also let's confirm handling when we shrink the buffer size.
+ NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_RECVBUF, 1));
+ NUTS_SEND(s2, "def");
+ NUTS_SLEEP(100);
+ NUTS_TRUE(nuts_poll_fd(fd));
+ NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_RECVBUF, 0));
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+ // growing doesn't magically make it readable either
+ NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_RECVBUF, 10));
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ NUTS_CLOSE(s1);
+ NUTS_CLOSE(s2);
+}
+
+static void
+test_pair0_poll_writable(void)
+{
+ int fd;
+ nng_socket s1;
+ nng_socket s2;
+
+ NUTS_PASS(nng_pair0_open(&s1));
+ NUTS_PASS(nng_pair0_open(&s2));
+ NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(s2, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_get_int(s1, NNG_OPT_SENDFD, &fd));
+ NUTS_TRUE(fd >= 0);
+
+ // Not writable if not connected!
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // But after connect, we can.
+ NUTS_MARRY(s1, s2);
+ NUTS_TRUE(nuts_poll_fd(fd));
+
+ // We are unbuffered.
+ NUTS_SEND(s1, "abc"); // first one in the receiver
+ NUTS_SEND(s1, "def"); // second one on the sending pipe
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // and receiving makes it ready
+ NUTS_RECV(s2, "abc");
+ NUTS_SLEEP(100); // time for the sender to complete
+ NUTS_TRUE(nuts_poll_fd(fd));
+
+ // close the peer for now.
+ NUTS_CLOSE(s2);
+ NUTS_SLEEP(100);
+
+ // resize up, makes us writable.
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+ NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_SENDBUF, 1));
+ NUTS_TRUE(nuts_poll_fd(fd));
+ // resize down and we aren't anymore.
+ NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_SENDBUF, 0));
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ NUTS_CLOSE(s1);
+}
+
+NUTS_TESTS = {
+ { "pair0 identity", test_identity },
+ { "pair0 cooked", test_cooked },
+ { "pair0 faithful", test_faithful },
+ { "pair0 back pressure", test_back_pressure },
+ { "pair0 send no peer", test_send_no_peer },
+ { "pair0 raw exchange", test_raw_exchange },
+ { "pair0 send closed aio", test_pair0_send_closed_aio },
+ { "pair0 raw", test_pair0_raw },
+ { "pair0 validate peer", test_pair0_validate_peer },
+ { "pair0 no context", test_pair0_no_context },
+ { "pair0 send buffer", test_pair0_send_buffer },
+ { "pair0 recv buffer", test_pair0_recv_buffer },
+ { "pair0 poll readable", test_pair0_poll_readable },
+ { "pair0 poll writable", test_pair0_poll_writable },
+
+ { NULL, NULL },
+};