aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pair1/pair.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pair1/pair.c')
-rw-r--r--src/protocol/pair1/pair.c134
1 files changed, 32 insertions, 102 deletions
diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c
index f564195d..b98975c3 100644
--- a/src/protocol/pair1/pair.c
+++ b/src/protocol/pair1/pair.c
@@ -13,16 +13,13 @@
#include "core/nng_impl.h"
#include "nng/protocol/pair1/pair.h"
-// Pair protocol. The PAIRv1 protocol is a simple 1:1 messaging pattern,
-// usually, but it can support a polyamorous mode where a single server can
-// communicate with multiple partners.
+// Pair protocol. The PAIRv1 protocol is a simple 1:1 messaging pattern.
#define BUMP_STAT(x) nni_stat_inc_atomic(x, 1)
typedef struct pair1_pipe pair1_pipe;
typedef struct pair1_sock pair1_sock;
-static void pair1_sock_get_cb(void *);
static void pair1_pipe_send_cb(void *);
static void pair1_pipe_recv_cb(void *);
static void pair1_pipe_get_cb(void *);
@@ -40,8 +37,6 @@ struct pair1_sock {
nni_idhash * pipes;
nni_list plist;
bool started;
- bool poly;
- nni_aio aio_get;
nni_stat_item stat_poly;
nni_stat_item stat_raw;
nni_stat_item stat_reject_mismatch;
@@ -50,13 +45,15 @@ struct pair1_sock {
nni_stat_item stat_rx_malformed;
nni_stat_item stat_tx_malformed;
nni_stat_item stat_tx_drop;
+#ifdef NNG_TEST_LIB
+ bool inject_header;
+#endif
};
// pair1_pipe is our per-pipe protocol private structure.
struct pair1_pipe {
nni_pipe * pipe;
pair1_sock * pair;
- nni_msgq * send_queue;
nni_aio aio_send;
nni_aio aio_recv;
nni_aio aio_get;
@@ -69,7 +66,6 @@ pair1_sock_fini(void *arg)
{
pair1_sock *s = arg;
- nni_aio_fini(&s->aio_get);
nni_idhash_fini(s->pipes);
nni_mtx_fini(&s->mtx);
}
@@ -87,11 +83,8 @@ pair1_sock_init_impl(void *arg, nni_sock *sock, bool raw)
// Raw mode uses this.
nni_mtx_init(&s->mtx);
- nni_aio_init(&s->aio_get, pair1_sock_get_cb, s);
-
nni_stat_init_bool(
&s->stat_poly, "polyamorous", "polyamorous mode?", false);
- nni_stat_set_lock(&s->stat_poly, &s->mtx);
nni_sock_add_stat(sock, &s->stat_poly);
nni_stat_init_bool(&s->stat_raw, "raw", "raw mode?", raw);
@@ -131,7 +124,6 @@ pair1_sock_init_impl(void *arg, nni_sock *sock, bool raw)
s->sock = sock;
s->raw = raw;
- s->poly = false;
s->uwq = nni_sock_sendq(sock);
s->urq = nni_sock_recvq(sock);
nni_atomic_init(&s->ttl);
@@ -172,25 +164,18 @@ pair1_pipe_fini(void *arg)
nni_aio_fini(&p->aio_recv);
nni_aio_fini(&p->aio_put);
nni_aio_fini(&p->aio_get);
- nni_msgq_fini(p->send_queue);
}
static int
pair1_pipe_init(void *arg, nni_pipe *pipe, void *pair)
{
pair1_pipe *p = arg;
- int rv;
nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p);
nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p);
nni_aio_init(&p->aio_get, pair1_pipe_get_cb, p);
nni_aio_init(&p->aio_put, pair1_pipe_put_cb, p);
- if ((rv = nni_msgq_init(&p->send_queue, 2)) != 0) {
- pair1_pipe_fini(p);
- return (rv);
- }
-
p->pipe = pipe;
p->pair = pair;
@@ -218,31 +203,19 @@ pair1_pipe_start(void *arg)
nni_mtx_unlock(&s->mtx);
return (rv);
}
- if (!s->poly) {
- if (!nni_list_empty(&s->plist)) {
- nni_idhash_remove(s->pipes, id);
- nni_mtx_unlock(&s->mtx);
- BUMP_STAT(&s->stat_reject_already);
- return (NNG_EBUSY);
- }
- } else {
- if (!s->started) {
- nni_msgq_aio_get(s->uwq, &s->aio_get);
- }
+ if (!nni_list_empty(&s->plist)) {
+ nni_idhash_remove(s->pipes, id);
+ nni_mtx_unlock(&s->mtx);
+ BUMP_STAT(&s->stat_reject_already);
+ return (NNG_EBUSY);
}
nni_list_append(&s->plist, p);
s->started = true;
nni_mtx_unlock(&s->mtx);
- // Schedule a get. In polyamorous mode we get on the per pipe
- // send_queue, as the socket distributes to us. In monogamous mode
- // we bypass and get from the upper write queue directly (saving a
- // set of context switches).
- if (s->poly) {
- nni_msgq_aio_get(p->send_queue, &p->aio_get);
- } else {
- nni_msgq_aio_get(s->uwq, &p->aio_get);
- }
+ // Schedule a get.
+ nni_msgq_aio_get(s->uwq, &p->aio_get);
+
// And the pipe read of course.
nni_pipe_recv(p->pipe, &p->aio_recv);
@@ -264,8 +237,6 @@ pair1_pipe_close(void *arg)
nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
nni_list_node_remove(&p->node);
nni_mtx_unlock(&s->mtx);
-
- nni_msgq_close(p->send_queue);
}
static void
@@ -319,47 +290,6 @@ pair1_pipe_recv_cb(void *arg)
}
static void
-pair1_sock_get_cb(void *arg)
-{
- pair1_pipe *p;
- pair1_sock *s = arg;
- nni_msg * msg;
- uint32_t id;
-
- if (nni_aio_result(&s->aio_get) != 0) {
- // Socket closing...
- return;
- }
-
- msg = nni_aio_get_msg(&s->aio_get);
- nni_aio_set_msg(&s->aio_get, NULL);
-
- p = NULL;
- nni_mtx_lock(&s->mtx);
- // By definition we are in polyamorous mode.
- NNI_ASSERT(s->poly);
- // If no pipe was requested, we look for any connected peer.
- if (((id = nni_msg_get_pipe(msg)) == 0) &&
- (!nni_list_empty(&s->plist))) {
- p = nni_list_first(&s->plist);
- } else {
- nni_idhash_find(s->pipes, id, (void **) &p);
- }
-
- // Try a non-blocking send. If this fails we just discard the
- // message. We have to do this to avoid head-of-line blocking
- // for messages sent to other pipes. Note that there is some
- // buffering in the send_queue.
- if ((p == NULL) || nni_msgq_tryput(p->send_queue, msg) != 0) {
- BUMP_STAT(&s->stat_tx_drop);
- nni_msg_free(msg);
- }
-
- nni_mtx_unlock(&s->mtx);
- nni_msgq_aio_get(s->uwq, &s->aio_get);
-}
-
-static void
pair1_pipe_put_cb(void *arg)
{
pair1_pipe *p = arg;
@@ -396,10 +326,15 @@ pair1_pipe_get_cb(void *arg)
((hops = nni_msg_header_trim_u32(msg)) > 254)) {
BUMP_STAT(&s->stat_tx_malformed);
nni_msg_free(msg);
- nni_msgq_aio_get(
- s->poly ? p->send_queue : s->uwq, &p->aio_get);
+ nni_msgq_aio_get(s->uwq, &p->aio_get);
return;
}
+#if NNG_TEST_LIB
+ } else if (s->inject_header) {
+ nni_aio_set_msg(&p->aio_send, msg);
+ nni_pipe_send(p->pipe, &p->aio_send);
+ return;
+#endif
} else {
// Strip off any previously existing header, such as when
// replying to messages.
@@ -429,9 +364,7 @@ pair1_pipe_send_cb(void *arg)
return;
}
- // In polyamorous mode, we want to get from the send_queue; in
- // monogamous we get from upper write queue.
- nni_msgq_aio_get(s->poly ? p->send_queue : s->uwq, &p->aio_get);
+ nni_msgq_aio_get(s->uwq, &p->aio_get);
}
static void
@@ -443,8 +376,7 @@ pair1_sock_open(void *arg)
static void
pair1_sock_close(void *arg)
{
- pair1_sock *s = arg;
- nni_aio_close(&s->aio_get);
+ NNI_ARG_UNUSED(arg);
}
static int
@@ -468,24 +400,19 @@ pair1_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t)
return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t));
}
+
+#ifdef NNG_TEST_LIB
static int
-pair1_sock_set_poly(void *arg, const void *buf, size_t sz, nni_opt_type t)
+pair1_set_test_inject_header(void *arg, const void *buf, size_t sz, nni_type t)
{
pair1_sock *s = arg;
int rv;
nni_mtx_lock(&s->mtx);
- rv = s->started ? NNG_ESTATE : nni_copyin_bool(&s->poly, buf, sz, t);
- nni_stat_set_value(&s->stat_poly, s->poly);
+ rv = nni_copyin_bool(&s->inject_header, buf, sz, t);
nni_mtx_unlock(&s->mtx);
return (rv);
}
-
-static int
-pair1_sock_get_poly(void *arg, void *buf, size_t *szp, nni_opt_type t)
-{
- pair1_sock *s = arg;
- return (nni_copyout_bool(s->poly, buf, szp, t));
-}
+#endif
static void
pair1_sock_send(void *arg, nni_aio *aio)
@@ -519,11 +446,14 @@ static nni_option pair1_sock_options[] = {
.o_get = pair1_sock_get_max_ttl,
.o_set = pair1_sock_set_max_ttl,
},
+#ifdef NNG_TEST_LIB
{
- .o_name = NNG_OPT_PAIR1_POLY,
- .o_get = pair1_sock_get_poly,
- .o_set = pair1_sock_set_poly,
+ // Test only option to pass header unmolested. This allows
+ // us to inject bad header contents.
+ .o_name = "pair1_test_inject_header",
+ .o_set = pair1_set_test_inject_header,
},
+#endif
// terminate list
{
.o_name = NULL,