author     Garrett D'Amore <garrett@damore.org>  2021-01-11 23:01:33 -0800
committer  Garrett D'Amore <garrett@damore.org>  2021-01-18 17:50:35 -0800
commit     85dccbf083e3c6115c5e3757b8efb7aa069acab6 (patch)
tree       b82ed18a069e3efd199efac6005e6f4a24d98649
parent     35533f4b968fc14b80a085e0246aa3074c1dec6f (diff)
fixes #808 Very slow PAIR performance compared to nanomsg
This covers only the pair v1 protocol; pair v0 and polyamorous mode still have work to do. We probably won't "fix" the performance of poly mode, since it is deprecated anyway.
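For reference, the new buffered send/receive path also exposes the NNG_OPT_SENDBUF, NNG_OPT_RECVBUF, NNG_OPT_SENDFD and NNG_OPT_RECVFD options on pair v1 sockets (see the option table and the new tests below). The short sketch that follows is illustrative only and is not part of this commit: the function name and the queue depth of 100 are made up for the example, while the option and API names are the ones exercised by the tests in this change.

#include <nng/nng.h>
#include <nng/protocol/pair1/pair.h>

/* Illustrative sketch (not part of this commit): size the lmq-backed
 * send/receive queues and fetch the pollable receive descriptor. */
static int
pair1_buffering_example(void)
{
	nng_socket s;
	int        fd;
	int        rv;

	if ((rv = nng_pair1_open(&s)) != 0) {
		return (rv);
	}
	/* Allow up to 100 messages to be buffered in each direction. */
	if (((rv = nng_setopt_int(s, NNG_OPT_SENDBUF, 100)) != 0) ||
	    ((rv = nng_setopt_int(s, NNG_OPT_RECVBUF, 100)) != 0)) {
		nng_close(s);
		return (rv);
	}
	/* This descriptor polls readable once a received message is queued. */
	if ((rv = nng_getopt_int(s, NNG_OPT_RECVFD, &fd)) != 0) {
		nng_close(s);
		return (rv);
	}
	nng_close(s);
	return (0);
}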
-rw-r--r--  src/core/message.c                   26
-rw-r--r--  src/core/message.h                    9
-rw-r--r--  src/core/reconnect_test.c             4
-rw-r--r--  src/sp/protocol/pair1/pair.c        471
-rw-r--r--  src/sp/protocol/pair1/pair1_test.c  210
5 files changed, 627 insertions(+), 93 deletions(-)
diff --git a/src/core/message.c b/src/core/message.c
index 79031400..824fc079 100644
--- a/src/core/message.c
+++ b/src/core/message.c
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -326,6 +326,12 @@ nni_msg_unique(nni_msg *m)
return (m2);
}
+bool
+nni_msg_shared(nni_msg *m)
+{
+ return (nni_atomic_get(&m->m_refcnt) > 1);
+}
+
// nni_msg_pull_up ensures that the message is unique, and that any header
// is merged with the message. The main purpose of doing this is to break
// up the inproc binding -- protocols send messages to inproc with a
@@ -575,6 +581,24 @@ nni_msg_header_append_u32(nni_msg *m, uint32_t val)
m->m_header_len += sizeof(val);
}
+uint32_t
+nni_msg_header_peek_u32(nni_msg *m)
+{
+ uint32_t val;
+ uint8_t *dst;
+ dst = (void *) m->m_header_buf;
+ NNI_GET32(dst, val);
+ return (val);
+}
+
+void
+nni_msg_header_poke_u32(nni_msg *m, uint32_t val)
+{
+ uint8_t *dst;
+ dst = (void *) m->m_header_buf;
+ NNI_PUT32(dst, val);
+}
+
void
nni_msg_clear(nni_msg *m)
{
diff --git a/src/core/message.h b/src/core/message.h
index 03991166..6400fd55 100644
--- a/src/core/message.h
+++ b/src/core/message.h
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -36,6 +36,11 @@ extern void nni_msg_dump(const char *, const nni_msg *);
extern void nni_msg_header_append_u32(nni_msg *, uint32_t);
extern uint32_t nni_msg_header_trim_u32(nni_msg *);
extern uint32_t nni_msg_trim_u32(nni_msg *);
+// Peek and poke variants just access the first uint32 in the
+// header. This is useful when incrementing hop counts, etc.
+// It's faster than trim and append, but logically equivalent.
+extern uint32_t nni_msg_header_peek_u32(nni_msg *);
+extern void nni_msg_header_poke_u32(nni_msg *, uint32_t);
extern void nni_msg_set_pipe(nni_msg *, uint32_t);
extern uint32_t nni_msg_get_pipe(const nni_msg *);
@@ -46,6 +51,8 @@ extern uint32_t nni_msg_get_pipe(const nni_msg *);
// Failure to do so will likely result in corruption.
extern void nni_msg_clone(nni_msg *);
extern nni_msg *nni_msg_unique(nni_msg *);
+extern bool nni_msg_shared(nni_msg *);
+
// nni_msg_pull_up ensures that the message is unique, and that any
// header present is "pulled up" into the message body. If the function
// cannot do this for any reason (out of space in the body), then NULL
diff --git a/src/core/reconnect_test.c b/src/core/reconnect_test.c
index 308a3f78..120f0517 100644
--- a/src/core/reconnect_test.c
+++ b/src/core/reconnect_test.c
@@ -64,6 +64,10 @@ test_reconnect(void)
// Close the listener
NUTS_PASS(nng_listener_close(l));
+ // We need to wait 100 ms, or so, to allow the receiver to
+ // observe the disconnect.
+ NUTS_SLEEP(100);
+
NUTS_PASS(nng_listen(s1, addr, &l, 0));
NUTS_SEND(s1, "again");
NUTS_RECV(s2, "again");
diff --git a/src/sp/protocol/pair1/pair.c b/src/sp/protocol/pair1/pair.c
index ba497c42..60de92e0 100644
--- a/src/sp/protocol/pair1/pair.c
+++ b/src/sp/protocol/pair1/pair.c
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -26,19 +26,26 @@ typedef struct pair1_sock pair1_sock;
static void pair1_pipe_send_cb(void *);
static void pair1_pipe_recv_cb(void *);
-static void pair1_pipe_get_cb(void *);
-static void pair1_pipe_put_cb(void *);
static void pair1_pipe_fini(void *);
+static void pair1_send_sched(pair1_sock *);
+static void pair1_pipe_send(pair1_pipe *, nni_msg *);
// pair1_sock is our per-socket protocol private structure.
struct pair1_sock {
- nni_msgq * uwq;
- nni_msgq * urq;
nni_sock * sock;
bool raw;
pair1_pipe * p;
nni_atomic_int ttl;
nni_mtx mtx;
+ nni_lmq wmq;
+ nni_list waq;
+ nni_lmq rmq;
+ nni_list raq;
+ nni_pollable writable;
+ nni_pollable readable;
+ bool rd_ready; // pipe ready for read
+ bool wr_ready; // pipe ready for write
+
#ifdef NNG_ENABLE_STATS
nni_stat_item stat_poly;
nni_stat_item stat_raw;
@@ -60,8 +67,6 @@ struct pair1_pipe {
pair1_sock *pair;
nni_aio aio_send;
nni_aio aio_recv;
- nni_aio aio_get;
- nni_aio aio_put;
};
static void
@@ -69,6 +74,10 @@ pair1_sock_fini(void *arg)
{
pair1_sock *s = arg;
+ nni_lmq_fini(&s->rmq);
+ nni_lmq_fini(&s->wmq);
+ nni_pollable_fini(&s->writable);
+ nni_pollable_fini(&s->readable);
nni_mtx_fini(&s->mtx);
}
@@ -90,6 +99,16 @@ pair1_sock_init_impl(void *arg, nni_sock *sock, bool raw)
// Raw mode uses this.
nni_mtx_init(&s->mtx);
s->sock = sock;
+ s->raw = raw;
+
+ nni_lmq_init(&s->rmq, 0);
+ nni_lmq_init(&s->wmq, 0);
+ nni_aio_list_init(&s->raq);
+ nni_aio_list_init(&s->waq);
+ nni_pollable_init(&s->writable);
+ nni_pollable_init(&s->readable);
+ nni_atomic_init(&s->ttl);
+ nni_atomic_set(&s->ttl, 8);
#ifdef NNG_ENABLE_STATS
static const nni_stat_info poly_info = {
@@ -161,12 +180,6 @@ pair1_sock_init_impl(void *arg, nni_sock *sock, bool raw)
nni_stat_set_bool(&s->stat_poly, false);
#endif
- s->raw = raw;
- s->uwq = nni_sock_sendq(sock);
- s->urq = nni_sock_recvq(sock);
- nni_atomic_init(&s->ttl);
- nni_atomic_set(&s->ttl, 8);
-
return (0);
}
@@ -191,12 +204,22 @@ pair1_pipe_stop(void *arg)
nni_mtx_lock(&s->mtx);
if (s->p == p) {
s->p = NULL;
+ if (s->rd_ready) {
+ nni_msg *m = nni_aio_get_msg(&p->aio_recv);
+ nni_msg_free(m);
+ s->rd_ready = false;
+ }
+ if (s->wr_ready) {
+ s->wr_ready = false;
+ nni_pollable_clear(&s->writable);
+ }
+ if (nni_lmq_empty(&s->rmq)) {
+ nni_pollable_clear(&s->readable);
+ }
}
nni_mtx_unlock(&s->mtx);
nni_aio_stop(&p->aio_send);
nni_aio_stop(&p->aio_recv);
- nni_aio_stop(&p->aio_put);
- nni_aio_stop(&p->aio_get);
}
static void
@@ -206,8 +229,6 @@ pair1_pipe_fini(void *arg)
nni_aio_fini(&p->aio_send);
nni_aio_fini(&p->aio_recv);
- nni_aio_fini(&p->aio_put);
- nni_aio_fini(&p->aio_get);
}
static int
@@ -217,8 +238,6 @@ pair1_pipe_init(void *arg, nni_pipe *pipe, void *pair)
nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p);
nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p);
- nni_aio_init(&p->aio_get, pair1_pipe_get_cb, p);
- nni_aio_init(&p->aio_put, pair1_pipe_put_cb, p);
p->pipe = pipe;
p->pair = pair;
@@ -226,6 +245,19 @@ pair1_pipe_init(void *arg, nni_pipe *pipe, void *pair)
return (0);
}
+static void
+pair1_cancel(nni_aio *aio, void *arg, int rv)
+{
+ pair1_sock *s = arg;
+
+ nni_mtx_lock(&s->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
static int
pair1_pipe_start(void *arg)
{
@@ -244,11 +276,11 @@ pair1_pipe_start(void *arg)
BUMP_STAT(&s->stat_reject_already);
return (NNG_EBUSY);
}
- s->p = p;
+ s->p = p;
+ s->rd_ready = false;
nni_mtx_unlock(&s->mtx);
- // Schedule a get.
- nni_msgq_aio_get(s->uwq, &p->aio_get);
+ pair1_send_sched(s);
// And the pipe read of course.
nni_pipe_recv(p->pipe, &p->aio_recv);
@@ -263,8 +295,6 @@ pair1_pipe_close(void *arg)
nni_aio_close(&p->aio_send);
nni_aio_close(&p->aio_recv);
- nni_aio_close(&p->aio_put);
- nni_aio_close(&p->aio_get);
}
static void
@@ -276,6 +306,7 @@ pair1_pipe_recv_cb(void *arg)
uint32_t hdr;
nni_pipe * pipe = p->pipe;
size_t len;
+ nni_aio * a;
if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
@@ -283,13 +314,13 @@ pair1_pipe_recv_cb(void *arg)
}
msg = nni_aio_get_msg(&p->aio_recv);
- nni_aio_set_msg(&p->aio_recv, NULL);
// Store the pipe ID.
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
+ len = nni_msg_len(msg);
// If the message is missing the hop count header, scrap it.
- if ((nni_msg_len(msg) < sizeof(uint32_t)) ||
+ if ((len < sizeof(uint32_t)) ||
((hdr = nni_msg_trim_u32(msg)) > 0xff)) {
BUMP_STAT(&s->stat_rx_malformed);
nni_msg_free(msg);
@@ -297,93 +328,101 @@ pair1_pipe_recv_cb(void *arg)
return;
}
- len = nni_msg_len(msg);
-
// If we bounced too many times, discard the message, but
// keep getting more.
if ((int) hdr > nni_atomic_get(&s->ttl)) {
BUMP_STAT(&s->stat_ttl_drop);
nni_msg_free(msg);
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_pipe_recv(pipe, &p->aio_recv);
return;
}
+ nni_sock_bump_rx(s->sock, len);
+
// Store the hop count in the header.
nni_msg_header_append_u32(msg, hdr);
- // Send the message up.
- nni_aio_set_msg(&p->aio_put, msg);
- nni_sock_bump_rx(s->sock, len);
- nni_msgq_aio_put(s->urq, &p->aio_put);
-}
-
-static void
-pair1_pipe_put_cb(void *arg)
-{
- pair1_pipe *p = arg;
+ nni_mtx_lock(&s->mtx);
- if (nni_aio_result(&p->aio_put) != 0) {
- nni_msg_free(nni_aio_get_msg(&p->aio_put));
- nni_aio_set_msg(&p->aio_put, NULL);
- nni_pipe_close(p->pipe);
+ // If a receiver is already blocked waiting, the lmq will be
+ // empty, and we should deliver the message directly to it.
+ if ((a = nni_list_first(&s->raq)) != NULL) {
+ nni_aio_list_remove(a);
+ nni_aio_set_msg(a, msg);
+ nni_pipe_recv(pipe, &p->aio_recv);
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_sync(a, 0, len);
return;
}
- nni_pipe_recv(p->pipe, &p->aio_recv);
+
+ // maybe we have room in the rmq?
+ if (!nni_lmq_full(&s->rmq)) {
+ nni_lmq_putq(&s->rmq, msg);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_pipe_recv(pipe, &p->aio_recv);
+ } else {
+ s->rd_ready = true;
+ }
+ nni_pollable_raise(&s->readable);
+ nni_mtx_unlock(&s->mtx);
}
static void
-pair1_pipe_get_cb(void *arg)
+pair1_send_sched(pair1_sock *s)
{
- pair1_pipe *p = arg;
- pair1_sock *s = p->pair;
- nni_msg * msg;
- uint32_t hops;
+ pair1_pipe *p;
+ nni_msg * m;
+ nni_aio * a = NULL;
+ size_t l;
- if (nni_aio_result(&p->aio_get) != 0) {
- nni_pipe_close(p->pipe);
+ nni_mtx_lock(&s->mtx);
+
+ if ((p = s->p) == NULL) {
+ nni_mtx_unlock(&s->mtx);
return;
}
- msg = nni_aio_get_msg(&p->aio_get);
- nni_aio_set_msg(&p->aio_get, NULL);
+ s->wr_ready = true;
- // Raw mode messages have the header already formed, with a hop count.
- // Cooked mode messages have no header so we have to add one.
- if (s->raw) {
- if ((nni_msg_header_len(msg) != sizeof(uint32_t)) ||
- ((hops = nni_msg_header_trim_u32(msg)) > 254)) {
- BUMP_STAT(&s->stat_tx_malformed);
- nni_msg_free(msg);
- nni_msgq_aio_get(s->uwq, &p->aio_get);
- return;
+ // If a message is waiting in the buffered queue, we prefer that.
+ if (nni_lmq_getq(&s->wmq, &m) == 0) {
+ pair1_pipe_send(p, m);
+
+ if ((a = nni_list_first(&s->waq)) != NULL) {
+ nni_aio_list_remove(a);
+ m = nni_aio_get_msg(a);
+ l = nni_msg_len(m);
+ nni_lmq_putq(&s->wmq, m);
}
-#if NNG_TEST_LIB
- } else if (s->inject_header) {
- nni_aio_set_msg(&p->aio_send, msg);
- nni_pipe_send(p->pipe, &p->aio_send);
- return;
-#endif
- } else {
- // Strip off any previously existing header, such as when
- // replying to messages.
- nni_msg_header_clear(msg);
- hops = 0;
+
+ } else if ((a = nni_list_first(&s->waq)) != NULL) {
+ // Looks like we had the unbuffered case, but
+ // someone was waiting.
+ nni_aio_list_remove(a);
+
+ m = nni_aio_get_msg(a);
+ l = nni_msg_len(m);
+ pair1_pipe_send(p, m);
}
- hops++;
+ // if we were blocked before, but not now, update.
+ if ((!nni_lmq_full(&s->wmq)) || s->wr_ready) {
+ nni_pollable_raise(&s->writable);
+ }
- // Insert the hops header.
- nni_msg_header_append_u32(msg, hops);
+ nni_mtx_unlock(&s->mtx);
- nni_aio_set_msg(&p->aio_send, msg);
- nni_pipe_send(p->pipe, &p->aio_send);
+ if (a != NULL) {
+ nni_aio_set_msg(a, NULL);
+ nni_aio_finish_sync(a, 0, l);
+ }
}
static void
pair1_pipe_send_cb(void *arg)
{
pair1_pipe *p = arg;
- pair1_sock *s = p->pair;
if (nni_aio_result(&p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(&p->aio_send));
@@ -392,7 +431,7 @@ pair1_pipe_send_cb(void *arg)
return;
}
- nni_msgq_aio_get(s->uwq, &p->aio_get);
+ pair1_send_sched(p->pair);
}
static void
@@ -404,7 +443,20 @@ pair1_sock_open(void *arg)
static void
pair1_sock_close(void *arg)
{
- NNI_ARG_UNUSED(arg);
+ pair1_sock *s = arg;
+ nni_aio * a;
+ nni_msg * m;
+ nni_mtx_lock(&s->mtx);
+ while (((a = nni_list_first(&s->raq)) != NULL) ||
+ ((a = nni_list_first(&s->waq)) != NULL)) {
+ nni_aio_list_remove(a);
+ nni_aio_finish_error(a, NNG_ECLOSED);
+ }
+ while ((nni_lmq_getq(&s->rmq, &m) == 0) ||
+ (nni_lmq_getq(&s->wmq, &m) == 0)) {
+ nni_msg_free(m);
+ }
+ nni_mtx_unlock(&s->mtx);
}
static int
@@ -442,20 +494,255 @@ pair1_set_test_inject_header(void *arg, const void *buf, size_t sz, nni_type t)
#endif
static void
+pair1_pipe_send(pair1_pipe *p, nni_msg *m)
+{
+ pair1_sock *s = p->pair;
+ // assumption: we have unique access to the message at this point.
+ NNI_ASSERT(!nni_msg_shared(m));
+
+#if NNG_TEST_LIB
+ if (s->inject_header) {
+ goto inject;
+ }
+#endif
+ NNI_ASSERT(nni_msg_header_len(m) == sizeof(uint32_t));
+ nni_msg_header_poke_u32(m, nni_msg_header_peek_u32(m) + 1);
+
+#if NNG_TEST_LIB
+inject:
+#endif
+
+ nni_aio_set_msg(&p->aio_send, m);
+ nni_pipe_send(p->pipe, &p->aio_send);
+ s->wr_ready = false;
+}
+
+static void
pair1_sock_send(void *arg, nni_aio *aio)
{
pair1_sock *s = arg;
+ nni_msg * m;
+ size_t len;
+ int rv;
+
+ m = nni_aio_get_msg(aio);
+ len = nni_msg_len(m);
+ nni_sock_bump_tx(s->sock, len);
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+#if NNG_TEST_LIB
+ if (s->inject_header) {
+ goto inject;
+ }
+#endif
- nni_sock_bump_tx(s->sock, nni_msg_len(nni_aio_get_msg(aio)));
- nni_msgq_aio_put(s->uwq, aio);
+ // Raw mode messages have the header already formed, with a hop count.
+ // Cooked mode messages have no header so we have to add one.
+ if (s->raw) {
+ if ((nni_msg_header_len(m) != sizeof(uint32_t)) ||
+ (nni_msg_header_peek_u32(m) >= 0xff)) {
+ BUMP_STAT(&s->stat_tx_malformed);
+ nni_aio_finish_error(aio, NNG_EPROTO);
+ return;
+ }
+
+ } else {
+ // Strip off any previously existing header, such as when
+ // replying to messages.
+ nni_msg_header_clear(m);
+ nni_msg_header_append_u32(m, 0);
+ }
+
+#if NNG_TEST_LIB
+inject:
+#endif
+
+ nni_mtx_lock(&s->mtx);
+ if (s->wr_ready) {
+ pair1_pipe *p = s->p;
+ if (nni_lmq_full(&s->wmq)) {
+ nni_pollable_clear(&s->writable);
+ }
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, len);
+ pair1_pipe_send(p, m);
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+
+ // Can we queue it instead?
+ if (nni_lmq_putq(&s->wmq, m) == 0) {
+ // Yay, we can. So we're done.
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, len);
+ if (nni_lmq_full(&s->wmq)) {
+ nni_pollable_clear(&s->writable);
+ }
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+
+ if ((rv = nni_aio_schedule(aio, pair1_cancel, s)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+ nni_aio_list_append(&s->waq, aio);
+ nni_mtx_unlock(&s->mtx);
}
static void
pair1_sock_recv(void *arg, nni_aio *aio)
{
pair1_sock *s = arg;
+ pair1_pipe *p;
+ nni_msg * m;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ nni_mtx_lock(&s->mtx);
+ p = s->p;
+
+ // Buffered read. If there is a message waiting for us, pick
+ // it up. We might need to post another read request as well.
+ if (nni_lmq_getq(&s->rmq, &m) == 0) {
+ nni_aio_set_msg(aio, m);
+ nni_aio_finish(aio, 0, nni_msg_len(m));
+ if (s->rd_ready) {
+ s->rd_ready = false;
+ m = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_lmq_putq(&s->rmq, m);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+ }
+ if (nni_lmq_empty(&s->rmq)) {
+ nni_pollable_clear(&s->readable);
+ }
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+
+ // Unbuffered -- but waiting.
+ if (s->rd_ready) {
+ s->rd_ready = false;
+ m = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_aio_set_msg(aio, m);
+ nni_aio_finish(aio, 0, nni_msg_len(m));
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+ nni_pollable_clear(&s->readable);
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+
+ if ((rv = nni_aio_schedule(aio, pair1_cancel, s)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ } else {
+ nni_aio_list_append(&s->raq, aio);
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
+static int
+pair1_set_send_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ pair1_sock *s = arg;
+ int val;
+ int rv;
+
+ if ((rv = nni_copyin_int(&val, buf, sz, 0, 8192, t)) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(&s->mtx);
+ rv = nni_lmq_resize(&s->wmq, (size_t) val);
+ // Changing the size of the queue can affect our readiness.
+ if (!nni_lmq_full(&s->wmq)) {
+ nni_pollable_raise(&s->writable);
+ } else if (!s->wr_ready) {
+ nni_pollable_clear(&s->writable);
+ }
+ nni_mtx_unlock(&s->mtx);
+ return (rv);
+}
+
+static int
+pair1_get_send_buf_len(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ pair1_sock *s = arg;
+ int val;
+
+ nni_mtx_lock(&s->mtx);
+ val = nni_lmq_cap(&s->wmq);
+ nni_mtx_unlock(&s->mtx);
+
+ return (nni_copyout_int(val, buf, szp, t));
+}
+
+static int
+pair1_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ pair1_sock *s = arg;
+ int val;
+ int rv;
+
+ if ((rv = nni_copyin_int(&val, buf, sz, 0, 8192, t)) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(&s->mtx);
+ rv = nni_lmq_resize(&s->rmq, (size_t) val);
+ // Changing the size of the queue can affect our readiness.
+ if (!nni_lmq_empty(&s->rmq)) {
+ nni_pollable_raise(&s->readable);
+ } else if (!s->rd_ready) {
+ nni_pollable_clear(&s->readable);
+ }
+ nni_mtx_unlock(&s->mtx);
+ return (rv);
+}
+
+static int
+pair1_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ pair1_sock *s = arg;
+ int val;
+
+ nni_mtx_lock(&s->mtx);
+ val = nni_lmq_cap(&s->rmq);
+ nni_mtx_unlock(&s->mtx);
+
+ return (nni_copyout_int(val, buf, szp, t));
+}
+
+static int
+pair1_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ pair1_sock *s = arg;
+ int rv;
+ int fd;
+
+ if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) {
+ return (rv);
+ }
+ return (nni_copyout_int(fd, buf, szp, t));
+}
+
+static int
+pair1_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ pair1_sock *s = arg;
+ int rv;
+ int fd;
- nni_msgq_aio_get(s->urq, aio);
+ if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) {
+ return (rv);
+ }
+ return (nni_copyout_int(fd, buf, szp, t));
}
static nni_proto_pipe_ops pair1_pipe_ops = {
@@ -473,6 +760,24 @@ static nni_option pair1_sock_options[] = {
.o_get = pair1_sock_get_max_ttl,
.o_set = pair1_sock_set_max_ttl,
},
+ {
+ .o_name = NNG_OPT_RECVFD,
+ .o_get = pair1_sock_get_recv_fd,
+ },
+ {
+ .o_name = NNG_OPT_SENDFD,
+ .o_get = pair1_sock_get_send_fd,
+ },
+ {
+ .o_name = NNG_OPT_SENDBUF,
+ .o_get = pair1_get_send_buf_len,
+ .o_set = pair1_set_send_buf_len,
+ },
+ {
+ .o_name = NNG_OPT_RECVBUF,
+ .o_get = pair1_get_recv_buf_len,
+ .o_set = pair1_set_recv_buf_len,
+ },
#ifdef NNG_TEST_LIB
{
// Test only option to pass header unmolested. This allows
diff --git a/src/sp/protocol/pair1/pair1_test.c b/src/sp/protocol/pair1/pair1_test.c
index 881c4ac8..3185a3a4 100644
--- a/src/sp/protocol/pair1/pair1_test.c
+++ b/src/sp/protocol/pair1/pair1_test.c
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -144,6 +144,22 @@ test_mono_back_pressure(void)
}
void
+test_send_no_peer(void)
+{
+ nng_socket s1;
+ nng_msg * msg;
+ nng_duration to = 100;
+
+ NUTS_PASS(nng_pair1_open(&s1));
+ NUTS_PASS(nng_setopt_ms(s1, NNG_OPT_SENDTIMEO, to));
+
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ NUTS_FAIL(nng_sendmsg(s1, msg, 0), NNG_ETIMEDOUT);
+ nng_msg_free(msg);
+ NUTS_CLOSE(s1);
+}
+
+void
test_mono_raw_exchange(void)
{
nng_socket s1;
@@ -210,8 +226,8 @@ test_mono_raw_header(void)
// Missing bits in the header
NUTS_PASS(nng_msg_alloc(&msg, 0));
- NUTS_PASS(nng_sendmsg(c1, msg, 0));
- NUTS_FAIL(nng_recvmsg(s1, &msg, 0), NNG_ETIMEDOUT);
+ NUTS_FAIL(nng_sendmsg(c1, msg, 0), NNG_EPROTO);
+ nng_msg_free(msg);
// Valid header works
NUTS_PASS(nng_msg_alloc(&msg, 0));
@@ -226,14 +242,14 @@ test_mono_raw_header(void)
// Header with reserved bits set dropped
NUTS_PASS(nng_msg_alloc(&msg, 0));
NUTS_PASS(nng_msg_header_append_u32(msg, 0xDEAD0000));
- NUTS_PASS(nng_sendmsg(c1, msg, 0));
- NUTS_FAIL(nng_recvmsg(s1, &msg, 0), NNG_ETIMEDOUT);
+ NUTS_FAIL(nng_sendmsg(c1, msg, 0), NNG_EPROTO);
+ nng_msg_free(msg);
// Header with no chance to add another hop gets dropped
NUTS_PASS(nng_msg_alloc(&msg, 0));
NUTS_PASS(nng_msg_header_append_u32(msg, 0xff));
- NUTS_PASS(nng_sendmsg(c1, msg, 0));
- NUTS_FAIL(nng_recvmsg(s1, &msg, 0), NNG_ETIMEDOUT);
+ NUTS_FAIL(nng_sendmsg(c1, msg, 0), NNG_EPROTO);
+ nng_msg_free(msg);
// With the same bits clear it works
NUTS_PASS(nng_msg_alloc(&msg, 0));
@@ -250,6 +266,26 @@ test_mono_raw_header(void)
}
void
+test_pair1_send_closed_aio(void)
+{
+ nng_socket s1;
+ nng_aio * aio;
+ nng_msg * msg;
+
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ NUTS_PASS(nng_pair1_open(&s1));
+ nng_aio_set_msg(aio, msg);
+ nng_aio_stop(aio);
+ nng_send_aio(s1, aio);
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ nng_msg_free(msg);
+ nng_aio_free(aio);
+ NUTS_PASS(nng_close(s1));
+}
+
+void
test_pair1_raw(void)
{
nng_socket s1;
@@ -408,7 +444,7 @@ test_pair1_recv_garbage(void)
// ridiculous hop count
NUTS_PASS(nng_msg_alloc(&m, 0));
- NUTS_PASS(nng_msg_append_u32(m, 0x1000));
+ NUTS_PASS(nng_msg_header_append_u32(m, 0x1000));
NUTS_PASS(nng_sendmsg(c, m, 0));
NUTS_FAIL(nng_recvmsg(s, &m, 0), NNG_ETIMEDOUT);
@@ -416,18 +452,176 @@ test_pair1_recv_garbage(void)
NUTS_CLOSE(s);
}
+static void
+test_pair1_no_context(void)
+{
+ nng_socket s;
+ nng_ctx ctx;
+
+ NUTS_PASS(nng_pair1_open(&s));
+ NUTS_FAIL(nng_ctx_open(&ctx, s), NNG_ENOTSUP);
+ NUTS_CLOSE(s);
+}
+
+static void
+test_pair1_send_buffer(void)
+{
+ nng_socket s;
+ int v;
+ bool b;
+ size_t sz;
+
+ NUTS_PASS(nng_pair1_open(&s));
+ NUTS_PASS(nng_getopt_int(s, NNG_OPT_SENDBUF, &v));
+ NUTS_TRUE(v == 0);
+ NUTS_FAIL(nng_getopt_bool(s, NNG_OPT_SENDBUF, &b), NNG_EBADTYPE);
+ sz = 1;
+ NUTS_FAIL(nng_getopt(s, NNG_OPT_SENDBUF, &b, &sz), NNG_EINVAL);
+ NUTS_FAIL(nng_setopt_int(s, NNG_OPT_SENDBUF, -1), NNG_EINVAL);
+ NUTS_FAIL(nng_setopt_int(s, NNG_OPT_SENDBUF, 100000), NNG_EINVAL);
+ NUTS_FAIL(nng_setopt_bool(s, NNG_OPT_SENDBUF, false), NNG_EBADTYPE);
+ NUTS_FAIL(nng_setopt(s, NNG_OPT_SENDBUF, &b, 1), NNG_EINVAL);
+ NUTS_PASS(nng_setopt_int(s, NNG_OPT_SENDBUF, 100));
+ NUTS_PASS(nng_getopt_int(s, NNG_OPT_SENDBUF, &v));
+ NUTS_TRUE(v == 100);
+ NUTS_CLOSE(s);
+}
+
+static void
+test_pair1_recv_buffer(void)
+{
+ nng_socket s;
+ int v;
+ bool b;
+ size_t sz;
+
+ NUTS_PASS(nng_pair1_open(&s));
+ NUTS_PASS(nng_getopt_int(s, NNG_OPT_RECVBUF, &v));
+ NUTS_TRUE(v == 0);
+ NUTS_FAIL(nng_getopt_bool(s, NNG_OPT_RECVBUF, &b), NNG_EBADTYPE);
+ sz = 1;
+ NUTS_FAIL(nng_getopt(s, NNG_OPT_RECVBUF, &b, &sz), NNG_EINVAL);
+ NUTS_FAIL(nng_setopt_int(s, NNG_OPT_RECVBUF, -1), NNG_EINVAL);
+ NUTS_FAIL(nng_setopt_int(s, NNG_OPT_RECVBUF, 100000), NNG_EINVAL);
+ NUTS_FAIL(nng_setopt_bool(s, NNG_OPT_RECVBUF, false), NNG_EBADTYPE);
+ NUTS_FAIL(nng_setopt(s, NNG_OPT_RECVBUF, &b, 1), NNG_EINVAL);
+ NUTS_PASS(nng_setopt_int(s, NNG_OPT_RECVBUF, 100));
+ NUTS_PASS(nng_getopt_int(s, NNG_OPT_RECVBUF, &v));
+ NUTS_TRUE(v == 100);
+ NUTS_CLOSE(s);
+}
+
+static void
+test_pair1_poll_readable(void)
+{
+ int fd;
+ nng_socket s1;
+ nng_socket s2;
+
+ NUTS_PASS(nng_pair1_open(&s1));
+ NUTS_PASS(nng_pair1_open(&s2));
+ NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(s2, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_get_int(s1, NNG_OPT_RECVFD, &fd));
+ NUTS_TRUE(fd >= 0);
+
+ // Not readable if not connected!
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // Even after connect (no message yet)
+ NUTS_MARRY(s1, s2);
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // But once the peer sends us a message, it is.
+ NUTS_SEND(s2, "abc");
+ NUTS_SLEEP(100);
+ NUTS_TRUE(nuts_poll_fd(fd));
+
+ // and receiving makes it no longer ready
+ NUTS_RECV(s1, "abc");
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // also let's confirm handling when we shrink the buffer size.
+ NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_RECVBUF, 1));
+ NUTS_SEND(s2, "def");
+ NUTS_SLEEP(100);
+ NUTS_TRUE(nuts_poll_fd(fd));
+ NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_RECVBUF, 0));
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+ // growing doesn't magically make it readable either
+ NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_RECVBUF, 10));
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ NUTS_CLOSE(s1);
+ NUTS_CLOSE(s2);
+}
+
+static void
+test_pair1_poll_writable(void)
+{
+ int fd;
+ nng_socket s1;
+ nng_socket s2;
+
+ NUTS_PASS(nng_pair1_open(&s1));
+ NUTS_PASS(nng_pair1_open(&s2));
+ NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(s2, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_get_int(s1, NNG_OPT_SENDFD, &fd));
+ NUTS_TRUE(fd >= 0);
+
+ // Not writable if not connected!
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // But after connect, we can.
+ NUTS_MARRY(s1, s2);
+ NUTS_TRUE(nuts_poll_fd(fd));
+
+ // We are unbuffered.
+ NUTS_SEND(s1, "abc"); // first one in the receiver
+ NUTS_SEND(s1, "def"); // second one on the sending pipe
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // and receiving makes it ready
+ NUTS_RECV(s2, "abc");
+ NUTS_SLEEP(100); // time for the sender to complete
+ NUTS_TRUE(nuts_poll_fd(fd));
+
+ // close the peer for now.
+ NUTS_CLOSE(s2);
+ NUTS_SLEEP(100);
+
+ // resize up, makes us writable.
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+ NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_SENDBUF, 1));
+ NUTS_TRUE(nuts_poll_fd(fd));
+ // resize down and we aren't anymore.
+ NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_SENDBUF, 0));
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ NUTS_CLOSE(s1);
+}
+
NUTS_TESTS = {
{ "pair1 mono identity", test_mono_identity },
{ "pair1 mono cooked", test_mono_cooked },
{ "pair1 mono faithful", test_mono_faithful },
{ "pair1 mono back pressure", test_mono_back_pressure },
+ { "pair1 send no peer", test_send_no_peer },
{ "pair1 mono raw exchange", test_mono_raw_exchange },
{ "pair1 mono raw header", test_mono_raw_header },
+ { "pair1 send closed aio", test_pair1_send_closed_aio },
{ "pair1 raw", test_pair1_raw },
{ "pair1 ttl", test_pair1_ttl },
{ "pair1 validate peer", test_pair1_validate_peer },
{ "pair1 recv no header", test_pair1_recv_no_header },
{ "pair1 recv garbage", test_pair1_recv_garbage },
+ { "pair1 no context", test_pair1_no_context },
+ { "pair1 send buffer", test_pair1_send_buffer },
+ { "pair1 recv buffer", test_pair1_recv_buffer },
+ { "pair1 poll readable", test_pair1_poll_readable },
+ { "pair1 poll writable", test_pair1_poll_writable },
{ NULL, NULL },
};