aboutsummaryrefslogtreecommitdiff
path: root/src/sp
diff options
context:
space:
mode:
Diffstat (limited to 'src/sp')
-rw-r--r--src/sp/protocol/pair1/pair.c471
-rw-r--r--src/sp/protocol/pair1/pair1_test.c210
2 files changed, 590 insertions, 91 deletions
diff --git a/src/sp/protocol/pair1/pair.c b/src/sp/protocol/pair1/pair.c
index ba497c42..60de92e0 100644
--- a/src/sp/protocol/pair1/pair.c
+++ b/src/sp/protocol/pair1/pair.c
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -26,19 +26,26 @@ typedef struct pair1_sock pair1_sock;
static void pair1_pipe_send_cb(void *);
static void pair1_pipe_recv_cb(void *);
-static void pair1_pipe_get_cb(void *);
-static void pair1_pipe_put_cb(void *);
static void pair1_pipe_fini(void *);
+static void pair1_send_sched(pair1_sock *);
+static void pair1_pipe_send(pair1_pipe *, nni_msg *);
// pair1_sock is our per-socket protocol private structure.
struct pair1_sock {
- nni_msgq * uwq;
- nni_msgq * urq;
nni_sock * sock;
bool raw;
pair1_pipe * p;
nni_atomic_int ttl;
nni_mtx mtx;
+ nni_lmq wmq;
+ nni_list waq;
+ nni_lmq rmq;
+ nni_list raq;
+ nni_pollable writable;
+ nni_pollable readable;
+ bool rd_ready; // pipe ready for read
+ bool wr_ready; // pipe ready for write
+
#ifdef NNG_ENABLE_STATS
nni_stat_item stat_poly;
nni_stat_item stat_raw;
@@ -60,8 +67,6 @@ struct pair1_pipe {
pair1_sock *pair;
nni_aio aio_send;
nni_aio aio_recv;
- nni_aio aio_get;
- nni_aio aio_put;
};
static void
@@ -69,6 +74,10 @@ pair1_sock_fini(void *arg)
{
pair1_sock *s = arg;
+ nni_lmq_fini(&s->rmq);
+ nni_lmq_fini(&s->wmq);
+ nni_pollable_fini(&s->writable);
+ nni_pollable_fini(&s->readable);
nni_mtx_fini(&s->mtx);
}
@@ -90,6 +99,16 @@ pair1_sock_init_impl(void *arg, nni_sock *sock, bool raw)
// Raw mode uses this.
nni_mtx_init(&s->mtx);
s->sock = sock;
+ s->raw = raw;
+
+ nni_lmq_init(&s->rmq, 0);
+ nni_lmq_init(&s->wmq, 0);
+ nni_aio_list_init(&s->raq);
+ nni_aio_list_init(&s->waq);
+ nni_pollable_init(&s->writable);
+ nni_pollable_init(&s->readable);
+ nni_atomic_init(&s->ttl);
+ nni_atomic_set(&s->ttl, 8);
#ifdef NNG_ENABLE_STATS
static const nni_stat_info poly_info = {
@@ -161,12 +180,6 @@ pair1_sock_init_impl(void *arg, nni_sock *sock, bool raw)
nni_stat_set_bool(&s->stat_poly, false);
#endif
- s->raw = raw;
- s->uwq = nni_sock_sendq(sock);
- s->urq = nni_sock_recvq(sock);
- nni_atomic_init(&s->ttl);
- nni_atomic_set(&s->ttl, 8);
-
return (0);
}
@@ -191,12 +204,22 @@ pair1_pipe_stop(void *arg)
nni_mtx_lock(&s->mtx);
if (s->p == p) {
s->p = NULL;
+ if (s->rd_ready) {
+ nni_msg *m = nni_aio_get_msg(&p->aio_recv);
+ nni_msg_free(m);
+ s->rd_ready = false;
+ }
+ if (s->wr_ready) {
+ s->wr_ready = false;
+ nni_pollable_clear(&s->writable);
+ }
+ if (nni_lmq_empty(&s->rmq)) {
+ nni_pollable_clear(&s->readable);
+ }
}
nni_mtx_unlock(&s->mtx);
nni_aio_stop(&p->aio_send);
nni_aio_stop(&p->aio_recv);
- nni_aio_stop(&p->aio_put);
- nni_aio_stop(&p->aio_get);
}
static void
@@ -206,8 +229,6 @@ pair1_pipe_fini(void *arg)
nni_aio_fini(&p->aio_send);
nni_aio_fini(&p->aio_recv);
- nni_aio_fini(&p->aio_put);
- nni_aio_fini(&p->aio_get);
}
static int
@@ -217,8 +238,6 @@ pair1_pipe_init(void *arg, nni_pipe *pipe, void *pair)
nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p);
nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p);
- nni_aio_init(&p->aio_get, pair1_pipe_get_cb, p);
- nni_aio_init(&p->aio_put, pair1_pipe_put_cb, p);
p->pipe = pipe;
p->pair = pair;
@@ -226,6 +245,19 @@ pair1_pipe_init(void *arg, nni_pipe *pipe, void *pair)
return (0);
}
+static void
+pair1_cancel(nni_aio *aio, void *arg, int rv)
+{
+ pair1_sock *s = arg;
+
+ nni_mtx_lock(&s->mtx);
+ if (nni_aio_list_active(aio)) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, rv);
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
static int
pair1_pipe_start(void *arg)
{
@@ -244,11 +276,11 @@ pair1_pipe_start(void *arg)
BUMP_STAT(&s->stat_reject_already);
return (NNG_EBUSY);
}
- s->p = p;
+ s->p = p;
+ s->rd_ready = false;
nni_mtx_unlock(&s->mtx);
- // Schedule a get.
- nni_msgq_aio_get(s->uwq, &p->aio_get);
+ pair1_send_sched(s);
// And the pipe read of course.
nni_pipe_recv(p->pipe, &p->aio_recv);
@@ -263,8 +295,6 @@ pair1_pipe_close(void *arg)
nni_aio_close(&p->aio_send);
nni_aio_close(&p->aio_recv);
- nni_aio_close(&p->aio_put);
- nni_aio_close(&p->aio_get);
}
static void
@@ -276,6 +306,7 @@ pair1_pipe_recv_cb(void *arg)
uint32_t hdr;
nni_pipe * pipe = p->pipe;
size_t len;
+ nni_aio * a;
if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
@@ -283,13 +314,13 @@ pair1_pipe_recv_cb(void *arg)
}
msg = nni_aio_get_msg(&p->aio_recv);
- nni_aio_set_msg(&p->aio_recv, NULL);
// Store the pipe ID.
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
+ len = nni_msg_len(msg);
// If the message is missing the hop count header, scrap it.
- if ((nni_msg_len(msg) < sizeof(uint32_t)) ||
+ if ((len < sizeof(uint32_t)) ||
((hdr = nni_msg_trim_u32(msg)) > 0xff)) {
BUMP_STAT(&s->stat_rx_malformed);
nni_msg_free(msg);
@@ -297,93 +328,101 @@ pair1_pipe_recv_cb(void *arg)
return;
}
- len = nni_msg_len(msg);
-
// If we bounced too many times, discard the message, but
// keep getting more.
if ((int) hdr > nni_atomic_get(&s->ttl)) {
BUMP_STAT(&s->stat_ttl_drop);
nni_msg_free(msg);
+ nni_aio_set_msg(&p->aio_recv, NULL);
nni_pipe_recv(pipe, &p->aio_recv);
return;
}
+ nni_sock_bump_rx(s->sock, len);
+
// Store the hop count in the header.
nni_msg_header_append_u32(msg, hdr);
- // Send the message up.
- nni_aio_set_msg(&p->aio_put, msg);
- nni_sock_bump_rx(s->sock, len);
- nni_msgq_aio_put(s->urq, &p->aio_put);
-}
-
-static void
-pair1_pipe_put_cb(void *arg)
-{
- pair1_pipe *p = arg;
+ nni_mtx_lock(&s->mtx);
- if (nni_aio_result(&p->aio_put) != 0) {
- nni_msg_free(nni_aio_get_msg(&p->aio_put));
- nni_aio_set_msg(&p->aio_put, NULL);
- nni_pipe_close(p->pipe);
+ // if anyone is blocking, then the lmq will be empty, and
+ // we should deliver it there.
+ if ((a = nni_list_first(&s->raq)) != NULL) {
+ nni_aio_list_remove(a);
+ nni_aio_set_msg(a, msg);
+ nni_pipe_recv(pipe, &p->aio_recv);
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_sync(a, 0, len);
return;
}
- nni_pipe_recv(p->pipe, &p->aio_recv);
+
+ // maybe we have room in the rmq?
+ if (!nni_lmq_full(&s->rmq)) {
+ nni_lmq_putq(&s->rmq, msg);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_pipe_recv(pipe, &p->aio_recv);
+ } else {
+ s->rd_ready = true;
+ }
+ nni_pollable_raise(&s->readable);
+ nni_mtx_unlock(&s->mtx);
}
static void
-pair1_pipe_get_cb(void *arg)
+pair1_send_sched(pair1_sock *s)
{
- pair1_pipe *p = arg;
- pair1_sock *s = p->pair;
- nni_msg * msg;
- uint32_t hops;
+ pair1_pipe *p;
+ nni_msg * m;
+ nni_aio * a = NULL;
+ size_t l;
- if (nni_aio_result(&p->aio_get) != 0) {
- nni_pipe_close(p->pipe);
+ nni_mtx_lock(&s->mtx);
+
+ if ((p = s->p) == NULL) {
+ nni_mtx_unlock(&s->mtx);
return;
}
- msg = nni_aio_get_msg(&p->aio_get);
- nni_aio_set_msg(&p->aio_get, NULL);
+ s->wr_ready = true;
- // Raw mode messages have the header already formed, with a hop count.
- // Cooked mode messages have no header so we have to add one.
- if (s->raw) {
- if ((nni_msg_header_len(msg) != sizeof(uint32_t)) ||
- ((hops = nni_msg_header_trim_u32(msg)) > 254)) {
- BUMP_STAT(&s->stat_tx_malformed);
- nni_msg_free(msg);
- nni_msgq_aio_get(s->uwq, &p->aio_get);
- return;
+ // if message waiting in buffered queue, then we prefer that.
+ if (nni_lmq_getq(&s->wmq, &m) == 0) {
+ pair1_pipe_send(p, m);
+
+ if ((a = nni_list_first(&s->waq)) != NULL) {
+ nni_aio_list_remove(a);
+ m = nni_aio_get_msg(a);
+ l = nni_msg_len(m);
+ nni_lmq_putq(&s->wmq, m);
}
-#if NNG_TEST_LIB
- } else if (s->inject_header) {
- nni_aio_set_msg(&p->aio_send, msg);
- nni_pipe_send(p->pipe, &p->aio_send);
- return;
-#endif
- } else {
- // Strip off any previously existing header, such as when
- // replying to messages.
- nni_msg_header_clear(msg);
- hops = 0;
+
+ } else if ((a = nni_list_first(&s->waq)) != NULL) {
+ // Looks like we had the unbuffered case, but
+ // someone was waiting.
+ nni_aio_list_remove(a);
+
+ m = nni_aio_get_msg(a);
+ l = nni_msg_len(m);
+ pair1_pipe_send(p, m);
}
- hops++;
+ // if we were blocked before, but not now, update.
+ if ((!nni_lmq_full(&s->wmq)) || s->wr_ready) {
+ nni_pollable_raise(&s->writable);
+ }
- // Insert the hops header.
- nni_msg_header_append_u32(msg, hops);
+ nni_mtx_unlock(&s->mtx);
- nni_aio_set_msg(&p->aio_send, msg);
- nni_pipe_send(p->pipe, &p->aio_send);
+ if (a != NULL) {
+ nni_aio_set_msg(a, NULL);
+ nni_aio_finish_sync(a, 0, l);
+ }
}
static void
pair1_pipe_send_cb(void *arg)
{
pair1_pipe *p = arg;
- pair1_sock *s = p->pair;
if (nni_aio_result(&p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(&p->aio_send));
@@ -392,7 +431,7 @@ pair1_pipe_send_cb(void *arg)
return;
}
- nni_msgq_aio_get(s->uwq, &p->aio_get);
+ pair1_send_sched(p->pair);
}
static void
@@ -404,7 +443,20 @@ pair1_sock_open(void *arg)
static void
pair1_sock_close(void *arg)
{
- NNI_ARG_UNUSED(arg);
+ pair1_sock *s = arg;
+ nni_aio * a;
+ nni_msg * m;
+ nni_mtx_lock(&s->mtx);
+ while (((a = nni_list_first(&s->raq)) != NULL) ||
+ ((a = nni_list_first(&s->waq)) != NULL)) {
+ nni_aio_list_remove(a);
+ nni_aio_finish_error(a, NNG_ECLOSED);
+ }
+ while ((nni_lmq_getq(&s->rmq, &m) == 0) ||
+ (nni_lmq_getq(&s->wmq, &m) == 0)) {
+ nni_msg_free(m);
+ }
+ nni_mtx_unlock(&s->mtx);
}
static int
@@ -442,20 +494,255 @@ pair1_set_test_inject_header(void *arg, const void *buf, size_t sz, nni_type t)
#endif
static void
+pair1_pipe_send(pair1_pipe *p, nni_msg *m)
+{
+ pair1_sock *s = p->pair;
+ // assumption: we have unique access to the message at this point.
+ NNI_ASSERT(!nni_msg_shared(m));
+
+#if NNG_TEST_LIB
+ if (s->inject_header) {
+ goto inject;
+ }
+#endif
+ NNI_ASSERT(nni_msg_header_len(m) == sizeof(uint32_t));
+ nni_msg_header_poke_u32(m, nni_msg_header_peek_u32(m) + 1);
+
+#if NNG_TEST_LIB
+inject:
+#endif
+
+ nni_aio_set_msg(&p->aio_send, m);
+ nni_pipe_send(p->pipe, &p->aio_send);
+ s->wr_ready = false;
+}
+
+static void
pair1_sock_send(void *arg, nni_aio *aio)
{
pair1_sock *s = arg;
+ nni_msg * m;
+ size_t len;
+ int rv;
+
+ m = nni_aio_get_msg(aio);
+ len = nni_msg_len(m);
+ nni_sock_bump_tx(s->sock, len);
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+#if NNG_TEST_LIB
+ if (s->inject_header) {
+ goto inject;
+ }
+#endif
- nni_sock_bump_tx(s->sock, nni_msg_len(nni_aio_get_msg(aio)));
- nni_msgq_aio_put(s->uwq, aio);
+ // Raw mode messages have the header already formed, with a hop count.
+ // Cooked mode messages have no header so we have to add one.
+ if (s->raw) {
+ if ((nni_msg_header_len(m) != sizeof(uint32_t)) ||
+ (nni_msg_header_peek_u32(m) >= 0xff)) {
+ BUMP_STAT(&s->stat_tx_malformed);
+ nni_aio_finish_error(aio, NNG_EPROTO);
+ return;
+ }
+
+ } else {
+ // Strip off any previously existing header, such as when
+ // replying to messages.
+ nni_msg_header_clear(m);
+ nni_msg_header_append_u32(m, 0);
+ }
+
+#if NNG_TEST_LIB
+inject:
+#endif
+
+ nni_mtx_lock(&s->mtx);
+ if (s->wr_ready) {
+ pair1_pipe *p = s->p;
+ if (nni_lmq_full(&s->wmq)) {
+ nni_pollable_clear(&s->writable);
+ }
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, len);
+ pair1_pipe_send(p, m);
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+
+ // Can we maybe queue it.
+ if (nni_lmq_putq(&s->wmq, m) == 0) {
+ // Yay, we can. So we're done.
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_finish(aio, 0, len);
+ if (nni_lmq_full(&s->wmq)) {
+ nni_pollable_clear(&s->writable);
+ }
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+
+ if ((rv = nni_aio_schedule(aio, pair1_cancel, s)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+ nni_aio_list_append(&s->waq, aio);
+ nni_mtx_unlock(&s->mtx);
}
static void
pair1_sock_recv(void *arg, nni_aio *aio)
{
pair1_sock *s = arg;
+ pair1_pipe *p;
+ nni_msg * m;
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ nni_mtx_lock(&s->mtx);
+ p = s->p;
+
+ // Buffered read. If there is a message waiting for us, pick
+ // it up. We might need to post another read request as well.
+ if (nni_lmq_getq(&s->rmq, &m) == 0) {
+ nni_aio_set_msg(aio, m);
+ nni_aio_finish(aio, 0, nni_msg_len(m));
+ if (s->rd_ready) {
+ s->rd_ready = false;
+ m = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_lmq_putq(&s->rmq, m);
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+ }
+ if (nni_lmq_empty(&s->rmq)) {
+ nni_pollable_clear(&s->readable);
+ }
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+
+ // Unbuffered -- but waiting.
+ if (s->rd_ready) {
+ s->rd_ready = false;
+ m = nni_aio_get_msg(&p->aio_recv);
+ nni_aio_set_msg(&p->aio_recv, NULL);
+ nni_aio_set_msg(aio, m);
+ nni_aio_finish(aio, 0, nni_msg_len(m));
+ nni_pipe_recv(p->pipe, &p->aio_recv);
+ nni_pollable_clear(&s->readable);
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+
+ if ((rv = nni_aio_schedule(aio, pair1_cancel, s)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ } else {
+ nni_aio_list_append(&s->raq, aio);
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
+static int
+pair1_set_send_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ pair1_sock *s = arg;
+ int val;
+ int rv;
+
+ if ((rv = nni_copyin_int(&val, buf, sz, 0, 8192, t)) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(&s->mtx);
+ rv = nni_lmq_resize(&s->wmq, (size_t) val);
+ // Changing the size of the queue can affect our readiness.
+ if (!nni_lmq_full(&s->wmq)) {
+ nni_pollable_raise(&s->writable);
+ } else if (!s->wr_ready) {
+ nni_pollable_clear(&s->writable);
+ }
+ nni_mtx_unlock(&s->mtx);
+ return (rv);
+}
+
+static int
+pair1_get_send_buf_len(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ pair1_sock *s = arg;
+ int val;
+
+ nni_mtx_lock(&s->mtx);
+ val = nni_lmq_cap(&s->wmq);
+ nni_mtx_unlock(&s->mtx);
+
+ return (nni_copyout_int(val, buf, szp, t));
+}
+
+static int
+pair1_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
+{
+ pair1_sock *s = arg;
+ int val;
+ int rv;
+
+ if ((rv = nni_copyin_int(&val, buf, sz, 0, 8192, t)) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(&s->mtx);
+ rv = nni_lmq_resize(&s->rmq, (size_t) val);
+ // Changing the size of the queue can affect our readiness.
+ if (!nni_lmq_empty(&s->rmq)) {
+ nni_pollable_raise(&s->readable);
+ } else if (!s->rd_ready) {
+ nni_pollable_clear(&s->readable);
+ }
+ nni_mtx_unlock(&s->mtx);
+ return (rv);
+}
+
+static int
+pair1_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ pair1_sock *s = arg;
+ int val;
+
+ nni_mtx_lock(&s->mtx);
+ val = nni_lmq_cap(&s->rmq);
+ nni_mtx_unlock(&s->mtx);
+
+ return (nni_copyout_int(val, buf, szp, t));
+}
+
+static int
+pair1_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ pair1_sock *s = arg;
+ int rv;
+ int fd;
+
+ if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) {
+ return (rv);
+ }
+ return (nni_copyout_int(fd, buf, szp, t));
+}
+
+static int
+pair1_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t)
+{
+ pair1_sock *s = arg;
+ int rv;
+ int fd;
- nni_msgq_aio_get(s->urq, aio);
+ if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) {
+ return (rv);
+ }
+ return (nni_copyout_int(fd, buf, szp, t));
}
static nni_proto_pipe_ops pair1_pipe_ops = {
@@ -473,6 +760,24 @@ static nni_option pair1_sock_options[] = {
.o_get = pair1_sock_get_max_ttl,
.o_set = pair1_sock_set_max_ttl,
},
+ {
+ .o_name = NNG_OPT_RECVFD,
+ .o_get = pair1_sock_get_recv_fd,
+ },
+ {
+ .o_name = NNG_OPT_SENDFD,
+ .o_get = pair1_sock_get_send_fd,
+ },
+ {
+ .o_name = NNG_OPT_SENDBUF,
+ .o_get = pair1_get_send_buf_len,
+ .o_set = pair1_set_send_buf_len,
+ },
+ {
+ .o_name = NNG_OPT_RECVBUF,
+ .o_get = pair1_get_recv_buf_len,
+ .o_set = pair1_set_recv_buf_len,
+ },
#ifdef NNG_TEST_LIB
{
// Test only option to pass header unmolested. This allows
diff --git a/src/sp/protocol/pair1/pair1_test.c b/src/sp/protocol/pair1/pair1_test.c
index 881c4ac8..3185a3a4 100644
--- a/src/sp/protocol/pair1/pair1_test.c
+++ b/src/sp/protocol/pair1/pair1_test.c
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -144,6 +144,22 @@ test_mono_back_pressure(void)
}
void
+test_send_no_peer(void)
+{
+ nng_socket s1;
+ nng_msg * msg;
+ nng_duration to = 100;
+
+ NUTS_PASS(nng_pair1_open(&s1));
+ NUTS_PASS(nng_setopt_ms(s1, NNG_OPT_SENDTIMEO, to));
+
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ NUTS_FAIL(nng_sendmsg(s1, msg, 0), NNG_ETIMEDOUT);
+ nng_msg_free(msg);
+ NUTS_CLOSE(s1);
+}
+
+void
test_mono_raw_exchange(void)
{
nng_socket s1;
@@ -210,8 +226,8 @@ test_mono_raw_header(void)
// Missing bits in the header
NUTS_PASS(nng_msg_alloc(&msg, 0));
- NUTS_PASS(nng_sendmsg(c1, msg, 0));
- NUTS_FAIL(nng_recvmsg(s1, &msg, 0), NNG_ETIMEDOUT);
+ NUTS_FAIL(nng_sendmsg(c1, msg, 0), NNG_EPROTO);
+ nng_msg_free(msg);
// Valid header works
NUTS_PASS(nng_msg_alloc(&msg, 0));
@@ -226,14 +242,14 @@ test_mono_raw_header(void)
// Header with reserved bits set dropped
NUTS_PASS(nng_msg_alloc(&msg, 0));
NUTS_PASS(nng_msg_header_append_u32(msg, 0xDEAD0000));
- NUTS_PASS(nng_sendmsg(c1, msg, 0));
- NUTS_FAIL(nng_recvmsg(s1, &msg, 0), NNG_ETIMEDOUT);
+ NUTS_FAIL(nng_sendmsg(c1, msg, 0), NNG_EPROTO);
+ nng_msg_free(msg);
// Header with no chance to add another hop gets dropped
NUTS_PASS(nng_msg_alloc(&msg, 0));
NUTS_PASS(nng_msg_header_append_u32(msg, 0xff));
- NUTS_PASS(nng_sendmsg(c1, msg, 0));
- NUTS_FAIL(nng_recvmsg(s1, &msg, 0), NNG_ETIMEDOUT);
+ NUTS_FAIL(nng_sendmsg(c1, msg, 0), NNG_EPROTO);
+ nng_msg_free(msg);
// With the same bits clear it works
NUTS_PASS(nng_msg_alloc(&msg, 0));
@@ -250,6 +266,26 @@ test_mono_raw_header(void)
}
void
+test_pair1_send_closed_aio(void)
+{
+ nng_socket s1;
+ nng_aio * aio;
+ nng_msg * msg;
+
+ NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
+ NUTS_PASS(nng_msg_alloc(&msg, 0));
+ NUTS_PASS(nng_pair1_open(&s1));
+ nng_aio_set_msg(aio, msg);
+ nng_aio_stop(aio);
+ nng_send_aio(s1, aio);
+ nng_aio_wait(aio);
+ NUTS_FAIL(nng_aio_result(aio), NNG_ECANCELED);
+ nng_msg_free(msg);
+ nng_aio_free(aio);
+ NUTS_PASS(nng_close(s1));
+}
+
+void
test_pair1_raw(void)
{
nng_socket s1;
@@ -408,7 +444,7 @@ test_pair1_recv_garbage(void)
// ridiculous hop count
NUTS_PASS(nng_msg_alloc(&m, 0));
- NUTS_PASS(nng_msg_append_u32(m, 0x1000));
+ NUTS_PASS(nng_msg_header_append_u32(m, 0x1000));
NUTS_PASS(nng_sendmsg(c, m, 0));
NUTS_FAIL(nng_recvmsg(s, &m, 0), NNG_ETIMEDOUT);
@@ -416,18 +452,176 @@ test_pair1_recv_garbage(void)
NUTS_CLOSE(s);
}
+static void
+test_pair1_no_context(void)
+{
+ nng_socket s;
+ nng_ctx ctx;
+
+ NUTS_PASS(nng_pair1_open(&s));
+ NUTS_FAIL(nng_ctx_open(&ctx, s), NNG_ENOTSUP);
+ NUTS_CLOSE(s);
+}
+
+static void
+test_pair1_send_buffer(void)
+{
+ nng_socket s;
+ int v;
+ bool b;
+ size_t sz;
+
+ NUTS_PASS(nng_pair1_open(&s));
+ NUTS_PASS(nng_getopt_int(s, NNG_OPT_SENDBUF, &v));
+ NUTS_TRUE(v == 0);
+ NUTS_FAIL(nng_getopt_bool(s, NNG_OPT_SENDBUF, &b), NNG_EBADTYPE);
+ sz = 1;
+ NUTS_FAIL(nng_getopt(s, NNG_OPT_SENDBUF, &b, &sz), NNG_EINVAL);
+ NUTS_FAIL(nng_setopt_int(s, NNG_OPT_SENDBUF, -1), NNG_EINVAL);
+ NUTS_FAIL(nng_setopt_int(s, NNG_OPT_SENDBUF, 100000), NNG_EINVAL);
+ NUTS_FAIL(nng_setopt_bool(s, NNG_OPT_SENDBUF, false), NNG_EBADTYPE);
+ NUTS_FAIL(nng_setopt(s, NNG_OPT_SENDBUF, &b, 1), NNG_EINVAL);
+ NUTS_PASS(nng_setopt_int(s, NNG_OPT_SENDBUF, 100));
+ NUTS_PASS(nng_getopt_int(s, NNG_OPT_SENDBUF, &v));
+ NUTS_TRUE(v == 100);
+ NUTS_CLOSE(s);
+}
+
+static void
+test_pair1_recv_buffer(void)
+{
+ nng_socket s;
+ int v;
+ bool b;
+ size_t sz;
+
+ NUTS_PASS(nng_pair1_open(&s));
+ NUTS_PASS(nng_getopt_int(s, NNG_OPT_RECVBUF, &v));
+ NUTS_TRUE(v == 0);
+ NUTS_FAIL(nng_getopt_bool(s, NNG_OPT_RECVBUF, &b), NNG_EBADTYPE);
+ sz = 1;
+ NUTS_FAIL(nng_getopt(s, NNG_OPT_RECVBUF, &b, &sz), NNG_EINVAL);
+ NUTS_FAIL(nng_setopt_int(s, NNG_OPT_RECVBUF, -1), NNG_EINVAL);
+ NUTS_FAIL(nng_setopt_int(s, NNG_OPT_RECVBUF, 100000), NNG_EINVAL);
+ NUTS_FAIL(nng_setopt_bool(s, NNG_OPT_RECVBUF, false), NNG_EBADTYPE);
+ NUTS_FAIL(nng_setopt(s, NNG_OPT_RECVBUF, &b, 1), NNG_EINVAL);
+ NUTS_PASS(nng_setopt_int(s, NNG_OPT_RECVBUF, 100));
+ NUTS_PASS(nng_getopt_int(s, NNG_OPT_RECVBUF, &v));
+ NUTS_TRUE(v == 100);
+ NUTS_CLOSE(s);
+}
+
+static void
+test_pair1_poll_readable(void)
+{
+ int fd;
+ nng_socket s1;
+ nng_socket s2;
+
+ NUTS_PASS(nng_pair1_open(&s1));
+ NUTS_PASS(nng_pair1_open(&s2));
+ NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(s2, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_get_int(s1, NNG_OPT_RECVFD, &fd));
+ NUTS_TRUE(fd >= 0);
+
+ // Not readable if not connected!
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // Even after connect (no message yet)
+ NUTS_MARRY(s1, s2);
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // But once we send messages, it is.
+ // We have to send a request, in order to send a reply.
+ NUTS_SEND(s2, "abc");
+ NUTS_SLEEP(100);
+ NUTS_TRUE(nuts_poll_fd(fd));
+
+ // and receiving makes it no longer ready
+ NUTS_RECV(s1, "abc");
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // also let's confirm handling when we shrink the buffer size.
+ NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_RECVBUF, 1));
+ NUTS_SEND(s2, "def");
+ NUTS_SLEEP(100);
+ NUTS_TRUE(nuts_poll_fd(fd));
+ NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_RECVBUF, 0));
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+ // growing doesn't magically make it readable either
+ NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_RECVBUF, 10));
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ NUTS_CLOSE(s1);
+ NUTS_CLOSE(s2);
+}
+
+static void
+test_pair1_poll_writable(void)
+{
+ int fd;
+ nng_socket s1;
+ nng_socket s2;
+
+ NUTS_PASS(nng_pair1_open(&s1));
+ NUTS_PASS(nng_pair1_open(&s2));
+ NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000));
+ NUTS_PASS(nng_socket_set_ms(s2, NNG_OPT_SENDTIMEO, 1000));
+ NUTS_PASS(nng_socket_get_int(s1, NNG_OPT_SENDFD, &fd));
+ NUTS_TRUE(fd >= 0);
+
+ // Not writable if not connected!
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // But after connect, we can.
+ NUTS_MARRY(s1, s2);
+ NUTS_TRUE(nuts_poll_fd(fd));
+
+ // We are unbuffered.
+ NUTS_SEND(s1, "abc"); // first one in the receiver
+ NUTS_SEND(s1, "def"); // second one on the sending pipe
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ // and receiving makes it ready
+ NUTS_RECV(s2, "abc");
+ NUTS_SLEEP(100); // time for the sender to complete
+ NUTS_TRUE(nuts_poll_fd(fd));
+
+ // close the peer for now.
+ NUTS_CLOSE(s2);
+ NUTS_SLEEP(100);
+
+ // resize up, makes us writable.
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+ NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_SENDBUF, 1));
+ NUTS_TRUE(nuts_poll_fd(fd));
+ // resize down and we aren't anymore.
+ NUTS_PASS(nng_socket_set_int(s1, NNG_OPT_SENDBUF, 0));
+ NUTS_TRUE(nuts_poll_fd(fd) == false);
+
+ NUTS_CLOSE(s1);
+}
+
NUTS_TESTS = {
{ "pair1 mono identity", test_mono_identity },
{ "pair1 mono cooked", test_mono_cooked },
{ "pair1 mono faithful", test_mono_faithful },
{ "pair1 mono back pressure", test_mono_back_pressure },
+ { "pair1 send no peer", test_send_no_peer },
{ "pair1 mono raw exchange", test_mono_raw_exchange },
{ "pair1 mono raw header", test_mono_raw_header },
+ { "pair1 send closed aio", test_pair1_send_closed_aio },
{ "pair1 raw", test_pair1_raw },
{ "pair1 ttl", test_pair1_ttl },
{ "pair1 validate peer", test_pair1_validate_peer },
{ "pair1 recv no header", test_pair1_recv_no_header },
{ "pair1 recv garbage", test_pair1_recv_garbage },
+ { "pair1 no context", test_pair1_no_context },
+ { "pair1 send buffer", test_pair1_send_buffer },
+ { "pair1 recv buffer", test_pair1_recv_buffer },
+ { "pair1 poll readable", test_pair1_poll_readable },
+ { "pair1 poll writable", test_pair1_poll_writable },
{ NULL, NULL },
};