aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-10 16:33:09 -0700
committerGarrett D'Amore <garrett@damore.org>2017-08-10 16:33:09 -0700
commit34ceda3c2dd4990d15e0341e86861dd291003f63 (patch)
tree278a7bf91e466e00f4c2bea088b73cfeb285ecef /src
parentac5f0ef7cf501693a9db2fcbd95b7cde419cbb2a (diff)
downloadnng-34ceda3c2dd4990d15e0341e86861dd291003f63.tar.gz
nng-34ceda3c2dd4990d15e0341e86861dd291003f63.tar.bz2
nng-34ceda3c2dd4990d15e0341e86861dd291003f63.zip
Add new PAIR_V1 protocol.
The PAIR_V1 protocol supports both raw and cooked modes, and has loop prevention included. It also has a polyamorous mode, wherein it allows multiple connections to be established. In polyamorous mode (set by an option), the sender requests a paritcular pipe by setting it on the message. We default to PAIR_V1 now.
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt3
-rw-r--r--src/core/message.c97
-rw-r--r--src/core/message.h51
-rw-r--r--src/nng.h11
-rw-r--r--src/nng_compat.c23
-rw-r--r--src/nng_compat.h2
-rw-r--r--src/protocol/pair/pair.c307
-rw-r--r--src/protocol/pair/pair_v0.c297
-rw-r--r--src/protocol/pair/pair_v1.c467
9 files changed, 914 insertions, 344 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index ad6c025e..9ecce72c 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -108,7 +108,8 @@ set (NNG_SOURCES
protocol/bus/bus.c
- protocol/pair/pair.c
+ protocol/pair/pair_v0.c
+ protocol/pair/pair_v1.c
protocol/pipeline/pull.c
protocol/pipeline/push.c
diff --git a/src/core/message.c b/src/core/message.c
index 4b563ce2..316bc035 100644
--- a/src/core/message.c
+++ b/src/core/message.c
@@ -1,5 +1,6 @@
//
// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -29,6 +30,7 @@ struct nng_msg {
nni_chunk m_body;
nni_time m_expire; // usec
nni_list m_options;
+ uint32_t m_pipe; // set on receive
};
typedef struct {
@@ -264,6 +266,42 @@ nni_chunk_prepend(nni_chunk *ch, const void *data, size_t len)
return (0);
}
+static int
+nni_chunk_prepend_u32(nni_chunk *ch, uint32_t val)
+{
+ unsigned char buf[sizeof(uint32_t)];
+ NNI_PUT32(buf, val);
+ return (nni_chunk_prepend(ch, buf, sizeof(buf)));
+}
+
+static int
+nni_chunk_append_u32(nni_chunk *ch, uint32_t val)
+{
+ unsigned char buf[sizeof(uint32_t)];
+ NNI_PUT32(buf, val);
+ return (nni_chunk_append(ch, buf, sizeof(buf)));
+}
+
+static uint32_t
+nni_chunk_trim_u32(nni_chunk *ch)
+{
+ uint32_t v;
+ NNI_ASSERT(ch->ch_len >= sizeof(v));
+ NNI_GET32(ch->ch_ptr, v);
+ nni_chunk_trim(ch, sizeof(v));
+ return (v);
+}
+
+static uint32_t
+nni_chunk_trunc_u32(nni_chunk *ch)
+{
+ uint32_t v;
+ NNI_ASSERT(ch->ch_len >= sizeof(v));
+ NNI_GET32(ch->ch_ptr + ch->ch_len - sizeof(v), v);
+ nni_chunk_trunc(ch, sizeof(v));
+ return (v);
+}
+
int
nni_msg_alloc(nni_msg **mp, size_t sz)
{
@@ -483,7 +521,6 @@ nni_msg_append_header(nni_msg *m, const void *data, size_t len)
{
return (nni_chunk_append(&m->m_header, data, len));
}
-
int
nni_msg_prepend_header(nni_msg *m, const void *data, size_t len)
{
@@ -501,3 +538,61 @@ nni_msg_trunc_header(nni_msg *m, size_t len)
{
return (nni_chunk_trunc(&m->m_header, len));
}
+int
+nni_msg_append_u32(nni_msg *m, uint32_t val)
+{
+ return (nni_chunk_append_u32(&m->m_body, val));
+}
+
+int
+nni_msg_prepend_u32(nni_msg *m, uint32_t val)
+{
+ return (nni_chunk_prepend_u32(&m->m_body, val));
+}
+
+int
+nni_msg_header_append_u32(nni_msg *m, uint32_t val)
+{
+ return (nni_chunk_append_u32(&m->m_header, val));
+}
+
+int
+nni_msg_header_prepend_u32(nni_msg *m, uint32_t val)
+{
+ return (nni_chunk_prepend_u32(&m->m_header, val));
+}
+
+uint32_t
+nni_msg_trunc_u32(nni_msg *m)
+{
+ return (nni_chunk_trunc_u32(&m->m_body));
+}
+
+uint32_t
+nni_msg_trim_u32(nni_msg *m)
+{
+ return (nni_chunk_trim_u32(&m->m_body));
+}
+
+uint32_t
+nni_msg_header_trunc_u32(nni_msg *m)
+{
+ return (nni_chunk_trunc_u32(&m->m_header));
+}
+uint32_t
+nni_msg_header_trim_u32(nni_msg *m)
+{
+ return (nni_chunk_trim_u32(&m->m_header));
+}
+
+void
+nni_msg_set_pipe(nni_msg *m, uint32_t pid)
+{
+ m->m_pipe = pid;
+}
+
+uint32_t
+nni_msg_get_pipe(nni_msg *m)
+{
+ return (m->m_pipe);
+} \ No newline at end of file
diff --git a/src/core/message.h b/src/core/message.h
index 7b71bd5c..4bc15321 100644
--- a/src/core/message.h
+++ b/src/core/message.h
@@ -1,5 +1,6 @@
//
-// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -12,24 +13,34 @@
// Internally used message API. Again, this is not part of our public API.
-extern int nni_msg_alloc(nni_msg **, size_t);
-extern void nni_msg_free(nni_msg *);
-extern int nni_msg_realloc(nni_msg *, size_t);
-extern int nni_msg_dup(nni_msg **, const nni_msg *);
-extern void * nni_msg_header(nni_msg *);
-extern size_t nni_msg_header_len(nni_msg *);
-extern void * nni_msg_body(nni_msg *);
-extern size_t nni_msg_len(nni_msg *);
-extern int nni_msg_append(nni_msg *, const void *, size_t);
-extern int nni_msg_prepend(nni_msg *, const void *, size_t);
-extern int nni_msg_append_header(nni_msg *, const void *, size_t);
-extern int nni_msg_prepend_header(nni_msg *, const void *, size_t);
-extern int nni_msg_trim(nni_msg *, size_t);
-extern int nni_msg_trunc(nni_msg *, size_t);
-extern int nni_msg_trim_header(nni_msg *, size_t);
-extern int nni_msg_trunc_header(nni_msg *, size_t);
-extern int nni_msg_setopt(nni_msg *, int, const void *, size_t);
-extern int nni_msg_getopt(nni_msg *, int, void *, size_t *);
-extern void nni_msg_dump(const char *, const nni_msg *);
+extern int nni_msg_alloc(nni_msg **, size_t);
+extern void nni_msg_free(nni_msg *);
+extern int nni_msg_realloc(nni_msg *, size_t);
+extern int nni_msg_dup(nni_msg **, const nni_msg *);
+extern void * nni_msg_header(nni_msg *);
+extern size_t nni_msg_header_len(nni_msg *);
+extern void * nni_msg_body(nni_msg *);
+extern size_t nni_msg_len(nni_msg *);
+extern int nni_msg_append(nni_msg *, const void *, size_t);
+extern int nni_msg_prepend(nni_msg *, const void *, size_t);
+extern int nni_msg_append_header(nni_msg *, const void *, size_t);
+extern int nni_msg_prepend_header(nni_msg *, const void *, size_t);
+extern int nni_msg_trim(nni_msg *, size_t);
+extern int nni_msg_trunc(nni_msg *, size_t);
+extern int nni_msg_trim_header(nni_msg *, size_t);
+extern int nni_msg_trunc_header(nni_msg *, size_t);
+extern int nni_msg_setopt(nni_msg *, int, const void *, size_t);
+extern int nni_msg_getopt(nni_msg *, int, void *, size_t *);
+extern void nni_msg_dump(const char *, const nni_msg *);
+extern int nni_msg_append_u32(nni_msg *, uint32_t);
+extern int nni_msg_prepend_u32(nni_msg *, uint32_t);
+extern int nni_msg_header_append_u32(nni_msg *, uint32_t);
+extern int nni_msg_header_prepend_u32(nni_msg *, uint32_t);
+extern uint32_t nni_msg_trim_u32(nni_msg *);
+extern uint32_t nni_msg_trunc_u32(nni_msg *);
+extern uint32_t nni_msg_header_trim_u32(nni_msg *);
+extern uint32_t nni_msg_header_trunc_u32(nni_msg *);
+extern void nni_msg_set_pipe(nni_msg *, uint32_t);
+extern uint32_t nni_msg_get_pipe(nni_msg *);
#endif // CORE_SOCKET_H
diff --git a/src/nng.h b/src/nng.h
index 99c95b92..247be1f7 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -290,6 +291,7 @@ enum nng_flag_enum {
enum nng_proto_enum {
NNG_PROTO_NONE = NNG_PROTO(0, 0),
NNG_PROTO_PAIR_V0 = NNG_PROTO(1, 0),
+ NNG_PROTO_PAIR_V1 = NNG_PROTO(1, 1),
NNG_PROTO_PUB_V0 = NNG_PROTO(2, 0),
NNG_PROTO_SUB_V0 = NNG_PROTO(2, 1),
NNG_PROTO_REQ_V0 = NNG_PROTO(3, 0),
@@ -301,9 +303,10 @@ enum nng_proto_enum {
NNG_PROTO_BUS_V0 = NNG_PROTO(7, 0),
NNG_PROTO_STAR_V0 = NNG_PROTO(100, 0),
- // "Legacy" names. Please use explicit versioned names above.
+ // "Default" names. Use the explicit version to guarantee
+ // backwards compatibility.
NNG_PROTO_BUS = NNG_PROTO_BUS_V0,
- NNG_PROTO_PAIR = NNG_PROTO_PAIR_V0,
+ NNG_PROTO_PAIR = NNG_PROTO_PAIR_V1,
NNG_PROTO_SUB = NNG_PROTO_SUB_V0,
NNG_PROTO_PUB = NNG_PROTO_PUB_V0,
NNG_PROTO_REQ = NNG_PROTO_REQ_V0,
@@ -317,6 +320,7 @@ enum nng_proto_enum {
// Builtin protocol socket constructors.
extern int nng_bus0_open(nng_socket *);
extern int nng_pair0_open(nng_socket *);
+extern int nng_pair1_open(nng_socket *);
extern int nng_pub0_open(nng_socket *);
extern int nng_sub0_open(nng_socket *);
extern int nng_push0_open(nng_socket *);
@@ -330,7 +334,7 @@ extern int nng_respondent0_open(nng_socket *);
// the actual protocols are baked into the binary; this should avoid
// suprising. Choosing a new protocol should be done explicitly.
#define nng_bus_open nng_bus0_open
-#define nng_pair_open nng_pair0_open
+#define nng_pair_open nng_pair1_open
#define nng_pub_open nng_pub0_open
#define nng_sub_open nng_sub0_open
#define nng_push_open nng_push0_open
@@ -369,6 +373,7 @@ enum nng_opt_enum {
NNG_OPT_REMOTEADDR = NNG_OPT_SOCKET(17),
NNG_OPT_RCVFD = NNG_OPT_SOCKET(18),
NNG_OPT_SNDFD = NNG_OPT_SOCKET(19),
+ NNG_OPT_POLYAMOROUS = NNG_OPT_SOCKET(20),
};
// XXX: TBD: priorities, socket names, ipv4only
diff --git a/src/nng_compat.c b/src/nng_compat.c
index e03ff42d..dfcb2134 100644
--- a/src/nng_compat.c
+++ b/src/nng_compat.c
@@ -92,19 +92,18 @@ static const struct {
uint16_t p_id;
int (*p_open)(nng_socket *);
} nn_protocols[] = {
- // clang-format off
- { NNG_PROTO_BUS_V0, nng_bus_open },
- { NNG_PROTO_PAIR_V0, nng_pair_open },
- { NNG_PROTO_PUSH_V0, nng_push_open },
- { NNG_PROTO_PULL_V0, nng_pull_open },
- { NNG_PROTO_PUB_V0, nng_pub_open },
- { NNG_PROTO_SUB_V0, nng_sub_open },
- { NNG_PROTO_REQ_V0, nng_req_open },
- { NNG_PROTO_REP_V0, nng_rep_open },
- { NNG_PROTO_SURVEYOR_V0, nng_surveyor_open },
- { NNG_PROTO_RESPONDENT_V0, nng_respondent_open },
+ { NNG_PROTO_BUS_V0, nng_bus0_open },
+ { NNG_PROTO_PAIR_V0, nng_pair0_open },
+ { NNG_PROTO_PAIR_V0, nng_pair1_open },
+ { NNG_PROTO_PUSH_V0, nng_push0_open },
+ { NNG_PROTO_PULL_V0, nng_pull0_open },
+ { NNG_PROTO_PUB_V0, nng_pub0_open },
+ { NNG_PROTO_SUB_V0, nng_sub0_open },
+ { NNG_PROTO_REQ_V0, nng_req0_open },
+ { NNG_PROTO_REP_V0, nng_rep0_open },
+ { NNG_PROTO_SURVEYOR_V0, nng_surveyor0_open },
+ { NNG_PROTO_RESPONDENT_V0, nng_respondent0_open },
{ NNG_PROTO_NONE, NULL },
- // clang-format on
};
int
diff --git a/src/nng_compat.h b/src/nng_compat.h
index e83c20b8..76494b5c 100644
--- a/src/nng_compat.h
+++ b/src/nng_compat.h
@@ -72,6 +72,8 @@ extern "C" {
#define NN_PROTO_BUS 7
#define NN_PAIR (NN_PROTO_PAIR * 16 + 0)
+#define NN_PAIR_v0 (NN_PROTO_PAIR * 16 + 0)
+#define NN_PAIR_V1 (NN_PROTO_PAIR * 16 + 1)
#define NN_PUB (NN_PROTO_PUBSUB * 16 + 0)
#define NN_SUB (NN_PROTO_PUBSUB * 16 + 1)
#define NN_REQ (NN_PROTO_REQREP * 16 + 0)
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
deleted file mode 100644
index 59465293..00000000
--- a/src/protocol/pair/pair.c
+++ /dev/null
@@ -1,307 +0,0 @@
-//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include <stdlib.h>
-#include <string.h>
-
-#include "core/nng_impl.h"
-
-// Pair protocol. The PAIR protocol is a simple 1:1 messaging pattern.
-// While a peer is connected to the server, all other peer connection
-// attempts are discarded.
-
-typedef struct nni_pair_pipe nni_pair_pipe;
-typedef struct nni_pair_sock nni_pair_sock;
-
-static void nni_pair_send_cb(void *);
-static void nni_pair_recv_cb(void *);
-static void nni_pair_getq_cb(void *);
-static void nni_pair_putq_cb(void *);
-static void nni_pair_pipe_fini(void *);
-
-// An nni_pair_sock is our per-socket protocol private structure.
-struct nni_pair_sock {
- nni_sock * nsock;
- nni_pair_pipe *ppipe;
- nni_msgq * uwq;
- nni_msgq * urq;
- int raw;
- nni_mtx mtx;
-};
-
-// An nni_pair_pipe is our per-pipe protocol private structure. We keep
-// one of these even though in theory we'd only have a single underlying
-// pipe. The separate data structure is more like other protocols that do
-// manage multiple pipes.
-struct nni_pair_pipe {
- nni_pipe * npipe;
- nni_pair_sock *psock;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_aio aio_getq;
- nni_aio aio_putq;
-};
-
-static int
-nni_pair_sock_init(void **sp, nni_sock *nsock)
-{
- nni_pair_sock *psock;
- int rv;
-
- if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
- return (NNG_ENOMEM);
- }
- if ((rv = nni_mtx_init(&psock->mtx)) != 0) {
- NNI_FREE_STRUCT(psock);
- return (rv);
- }
- psock->nsock = nsock;
- psock->ppipe = NULL;
- psock->raw = 0;
- psock->uwq = nni_sock_sendq(nsock);
- psock->urq = nni_sock_recvq(nsock);
- *sp = psock;
- return (0);
-}
-
-static void
-nni_pair_sock_fini(void *arg)
-{
- nni_pair_sock *psock = arg;
-
- if (psock != NULL) {
- nni_mtx_fini(&psock->mtx);
-
- NNI_FREE_STRUCT(psock);
- }
-}
-
-static int
-nni_pair_pipe_init(void **pp, nni_pipe *npipe, void *psock)
-{
- nni_pair_pipe *ppipe;
- int rv;
-
- if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
- return (NNG_ENOMEM);
- }
- rv = nni_aio_init(&ppipe->aio_send, nni_pair_send_cb, ppipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_recv, nni_pair_recv_cb, ppipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_getq, nni_pair_getq_cb, ppipe);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_aio_init(&ppipe->aio_putq, nni_pair_putq_cb, ppipe);
- if (rv != 0) {
- goto fail;
- }
- ppipe->npipe = npipe;
- ppipe->psock = psock;
- *pp = ppipe;
- return (0);
-
-fail:
- nni_pair_pipe_fini(ppipe);
- return (rv);
-}
-
-static void
-nni_pair_pipe_fini(void *arg)
-{
- nni_pair_pipe *ppipe = arg;
- nni_aio_fini(&ppipe->aio_send);
- nni_aio_fini(&ppipe->aio_recv);
- nni_aio_fini(&ppipe->aio_putq);
- nni_aio_fini(&ppipe->aio_getq);
- NNI_FREE_STRUCT(ppipe);
-}
-
-static int
-nni_pair_pipe_start(void *arg)
-{
- nni_pair_pipe *ppipe = arg;
- nni_pair_sock *psock = ppipe->psock;
-
- nni_mtx_lock(&psock->mtx);
- if (psock->ppipe != NULL) {
- nni_mtx_unlock(&psock->mtx);
- return (NNG_EBUSY); // Already have a peer, denied.
- }
- psock->ppipe = ppipe;
- nni_mtx_unlock(&psock->mtx);
-
- // Schedule a getq on the upper, and a read from the pipe.
- // Each of these also sets up another hold on the pipe itself.
- nni_msgq_aio_get(psock->uwq, &ppipe->aio_getq);
- nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
-
- return (0);
-}
-
-static void
-nni_pair_pipe_stop(void *arg)
-{
- nni_pair_pipe *ppipe = arg;
- nni_pair_sock *psock = ppipe->psock;
-
- nni_aio_cancel(&ppipe->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&ppipe->aio_recv, NNG_ECANCELED);
- nni_aio_cancel(&ppipe->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&ppipe->aio_getq, NNG_ECANCELED);
-
- nni_mtx_lock(&psock->mtx);
- if (psock->ppipe == ppipe) {
- psock->ppipe = NULL;
- }
- nni_mtx_unlock(&psock->mtx);
-}
-
-static void
-nni_pair_recv_cb(void *arg)
-{
- nni_pair_pipe *ppipe = arg;
- nni_pair_sock *psock = ppipe->psock;
-
- if (nni_aio_result(&ppipe->aio_recv) != 0) {
- nni_pipe_stop(ppipe->npipe);
- return;
- }
-
- ppipe->aio_putq.a_msg = ppipe->aio_recv.a_msg;
- ppipe->aio_recv.a_msg = NULL;
- nni_msgq_aio_put(psock->urq, &ppipe->aio_putq);
-}
-
-static void
-nni_pair_putq_cb(void *arg)
-{
- nni_pair_pipe *ppipe = arg;
-
- if (nni_aio_result(&ppipe->aio_putq) != 0) {
- nni_msg_free(ppipe->aio_putq.a_msg);
- ppipe->aio_putq.a_msg = NULL;
- nni_pipe_stop(ppipe->npipe);
- return;
- }
- nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
-}
-
-static void
-nni_pair_getq_cb(void *arg)
-{
- nni_pair_pipe *ppipe = arg;
- nni_pair_sock *psock = ppipe->psock;
-
- if (nni_aio_result(&ppipe->aio_getq) != 0) {
- nni_pipe_stop(ppipe->npipe);
- return;
- }
-
- ppipe->aio_send.a_msg = ppipe->aio_getq.a_msg;
- ppipe->aio_getq.a_msg = NULL;
- nni_pipe_send(ppipe->npipe, &ppipe->aio_send);
-}
-
-static void
-nni_pair_send_cb(void *arg)
-{
- nni_pair_pipe *ppipe = arg;
- nni_pair_sock *psock = ppipe->psock;
-
- if (nni_aio_result(&ppipe->aio_send) != 0) {
- nni_msg_free(ppipe->aio_send.a_msg);
- ppipe->aio_send.a_msg = NULL;
- nni_pipe_stop(ppipe->npipe);
- return;
- }
-
- nni_msgq_aio_get(psock->uwq, &ppipe->aio_getq);
-}
-
-static void
-nni_pair_sock_open(void *arg)
-{
- NNI_ARG_UNUSED(arg);
-}
-
-static void
-nni_pair_sock_close(void *arg)
-{
- NNI_ARG_UNUSED(arg);
-}
-
-static int
-nni_pair_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
-{
- nni_pair_sock *psock = arg;
- int rv;
-
- switch (opt) {
- case NNG_OPT_RAW:
- rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1);
- break;
- default:
- rv = NNG_ENOTSUP;
- }
- return (rv);
-}
-
-static int
-nni_pair_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
-{
- nni_pair_sock *psock = arg;
- int rv;
-
- switch (opt) {
- case NNG_OPT_RAW:
- rv = nni_getopt_int(&psock->raw, buf, szp);
- break;
- default:
- rv = NNG_ENOTSUP;
- }
- return (rv);
-}
-
-static nni_proto_pipe_ops nni_pair_pipe_ops = {
- .pipe_init = nni_pair_pipe_init,
- .pipe_fini = nni_pair_pipe_fini,
- .pipe_start = nni_pair_pipe_start,
- .pipe_stop = nni_pair_pipe_stop,
-};
-
-static nni_proto_sock_ops nni_pair_sock_ops = {
- .sock_init = nni_pair_sock_init,
- .sock_fini = nni_pair_sock_fini,
- .sock_open = nni_pair_sock_open,
- .sock_close = nni_pair_sock_close,
- .sock_setopt = nni_pair_sock_setopt,
- .sock_getopt = nni_pair_sock_getopt,
-};
-
-nni_proto nni_pair_proto = {
- .proto_version = NNI_PROTOCOL_VERSION,
- .proto_self = { NNG_PROTO_PAIR_V0, "pair" },
- .proto_peer = { NNG_PROTO_PAIR_V0, "pair" },
- .proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &nni_pair_sock_ops,
- .proto_pipe_ops = &nni_pair_pipe_ops,
-};
-
-int
-nng_pair0_open(nng_socket *sidp)
-{
- return (nni_proto_open(sidp, &nni_pair_proto));
-}
diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c
new file mode 100644
index 00000000..a0e907f2
--- /dev/null
+++ b/src/protocol/pair/pair_v0.c
@@ -0,0 +1,297 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+
+// Pair protocol. The PAIR protocol is a simple 1:1 messaging pattern.
+// While a peer is connected to the server, all other peer connection
+// attempts are discarded.
+
+typedef struct pair0_pipe pair0_pipe;
+typedef struct pair0_sock pair0_sock;
+
+static void pair0_send_cb(void *);
+static void pair0_recv_cb(void *);
+static void pair0_getq_cb(void *);
+static void pair0_putq_cb(void *);
+static void pair0_pipe_fini(void *);
+
+// pair0_sock is our per-socket protocol private structure.
+struct pair0_sock {
+ nni_sock * nsock;
+ pair0_pipe *ppipe;
+ nni_msgq * uwq;
+ nni_msgq * urq;
+ int raw;
+ nni_mtx mtx;
+};
+
+// An pair0_pipe is our per-pipe protocol private structure. We keep
+// one of these even though in theory we'd only have a single underlying
+// pipe. The separate data structure is more like other protocols that do
+// manage multiple pipes.
+struct pair0_pipe {
+ nni_pipe * npipe;
+ pair0_sock *psock;
+ nni_aio aio_send;
+ nni_aio aio_recv;
+ nni_aio aio_getq;
+ nni_aio aio_putq;
+};
+
+static int
+pair0_sock_init(void **sp, nni_sock *nsock)
+{
+ pair0_sock *s;
+ int rv;
+
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_mtx_init(&s->mtx)) != 0) {
+ NNI_FREE_STRUCT(s);
+ return (rv);
+ }
+ s->nsock = nsock;
+ s->ppipe = NULL;
+ s->raw = 0;
+ s->uwq = nni_sock_sendq(nsock);
+ s->urq = nni_sock_recvq(nsock);
+ *sp = s;
+ return (0);
+}
+
+static void
+pair0_sock_fini(void *arg)
+{
+ pair0_sock *s = arg;
+
+ nni_mtx_fini(&s->mtx);
+ NNI_FREE_STRUCT(s);
+}
+
+static int
+pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock)
+{
+ pair0_pipe *p;
+ int rv;
+
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) {
+ pair0_pipe_fini(p);
+ } else {
+ p->npipe = npipe;
+ p->psock = psock;
+ *pp = p;
+ }
+ return (rv);
+}
+
+static void
+pair0_pipe_fini(void *arg)
+{
+ pair0_pipe *p = arg;
+
+ nni_aio_fini(&p->aio_send);
+ nni_aio_fini(&p->aio_recv);
+ nni_aio_fini(&p->aio_putq);
+ nni_aio_fini(&p->aio_getq);
+ NNI_FREE_STRUCT(p);
+}
+
+static int
+pair0_pipe_start(void *arg)
+{
+ pair0_pipe *p = arg;
+ pair0_sock *s = p->psock;
+
+ nni_mtx_lock(&s->mtx);
+ if (s->ppipe != NULL) {
+ nni_mtx_unlock(&s->mtx);
+ return (NNG_EBUSY); // Already have a peer, denied.
+ }
+ s->ppipe = p;
+ nni_mtx_unlock(&s->mtx);
+
+ // Schedule a getq on the upper, and a read from the pipe.
+ // Each of these also sets up another hold on the pipe itself.
+ nni_msgq_aio_get(s->uwq, &p->aio_getq);
+ nni_pipe_recv(p->npipe, &p->aio_recv);
+
+ return (0);
+}
+
+static void
+pair0_pipe_stop(void *arg)
+{
+ pair0_pipe *p = arg;
+ pair0_sock *s = p->psock;
+
+ nni_aio_cancel(&p->aio_send, NNG_ECANCELED);
+ nni_aio_cancel(&p->aio_recv, NNG_ECANCELED);
+ nni_aio_cancel(&p->aio_putq, NNG_ECANCELED);
+ nni_aio_cancel(&p->aio_getq, NNG_ECANCELED);
+
+ nni_mtx_lock(&s->mtx);
+ if (s->ppipe == p) {
+ s->ppipe = NULL;
+ }
+ nni_mtx_unlock(&s->mtx);
+}
+
+static void
+pair0_recv_cb(void *arg)
+{
+ pair0_pipe *p = arg;
+ pair0_sock *s = p->psock;
+ nni_msg * msg;
+
+ if (nni_aio_result(&p->aio_recv) != 0) {
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+
+ msg = p->aio_recv.a_msg;
+ p->aio_putq.a_msg = msg;
+ p->aio_recv.a_msg = NULL;
+
+ nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
+ nni_msgq_aio_put(s->urq, &p->aio_putq);
+}
+
+static void
+pair0_putq_cb(void *arg)
+{
+ pair0_pipe *p = arg;
+
+ if (nni_aio_result(&p->aio_putq) != 0) {
+ nni_msg_free(p->aio_putq.a_msg);
+ p->aio_putq.a_msg = NULL;
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+ nni_pipe_recv(p->npipe, &p->aio_recv);
+}
+
+static void
+pair0_getq_cb(void *arg)
+{
+ pair0_pipe *p = arg;
+ pair0_sock *s = p->psock;
+
+ if (nni_aio_result(&p->aio_getq) != 0) {
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+
+ p->aio_send.a_msg = p->aio_getq.a_msg;
+ p->aio_getq.a_msg = NULL;
+ nni_pipe_send(p->npipe, &p->aio_send);
+}
+
+static void
+pair0_send_cb(void *arg)
+{
+ pair0_pipe *p = arg;
+ pair0_sock *s = p->psock;
+
+ if (nni_aio_result(&p->aio_send) != 0) {
+ nni_msg_free(p->aio_send.a_msg);
+ p->aio_send.a_msg = NULL;
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+
+ nni_msgq_aio_get(s->uwq, &p->aio_getq);
+}
+
+static void
+pair0_sock_open(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+pair0_sock_close(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static int
+pair0_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+{
+ pair0_sock *s = arg;
+ int rv;
+
+ switch (opt) {
+ case NNG_OPT_RAW:
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ }
+ return (rv);
+}
+
+static int
+pair0_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+{
+ pair0_sock *s = arg;
+ int rv;
+
+ switch (opt) {
+ case NNG_OPT_RAW:
+ rv = nni_getopt_int(&s->raw, buf, szp);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ }
+ return (rv);
+}
+
+static nni_proto_pipe_ops pair0_pipe_ops = {
+ .pipe_init = pair0_pipe_init,
+ .pipe_fini = pair0_pipe_fini,
+ .pipe_start = pair0_pipe_start,
+ .pipe_stop = pair0_pipe_stop,
+};
+
+static nni_proto_sock_ops pair0_sock_ops = {
+ .sock_init = pair0_sock_init,
+ .sock_fini = pair0_sock_fini,
+ .sock_open = pair0_sock_open,
+ .sock_close = pair0_sock_close,
+ .sock_setopt = pair0_sock_setopt,
+ .sock_getopt = pair0_sock_getopt,
+};
+
+// Legacy protocol (v0)
+static nni_proto pair0_proto = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNG_PROTO_PAIR_V0, "pair" },
+ .proto_peer = { NNG_PROTO_PAIR_V0, "pair" },
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_sock_ops = &pair0_sock_ops,
+ .proto_pipe_ops = &pair0_pipe_ops,
+};
+
+int
+nng_pair0_open(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &pair0_proto));
+}
diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c
new file mode 100644
index 00000000..ea8c1226
--- /dev/null
+++ b/src/protocol/pair/pair_v1.c
@@ -0,0 +1,467 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+
+// Pair protocol. The PAIRv1 protocol is a simple 1:1 messaging pattern.
+
+typedef struct pair1_pipe pair1_pipe;
+typedef struct pair1_sock pair1_sock;
+
+static void pair1_sock_getq_cb(void *);
+static void pair1_pipe_send_cb(void *);
+static void pair1_pipe_recv_cb(void *);
+static void pair1_pipe_getq_cb(void *);
+static void pair1_pipe_putq_cb(void *);
+static void pair1_pipe_fini(void *);
+
+// pair1_sock is our per-socket protocol private structure.
+struct pair1_sock {
+ nni_sock * nsock;
+ nni_msgq * uwq;
+ nni_msgq * urq;
+ int raw;
+ int ttl;
+ nni_mtx mtx;
+ nni_idhash *pipes;
+ int started;
+ int poly;
+ nni_aio aio_getq;
+ pair1_pipe *pipe; // cooked mode only
+};
+
+// pair1_pipe is our per-pipe protocol private structure.
+struct pair1_pipe {
+ nni_pipe * npipe;
+ pair1_sock *psock;
+ nni_msgq * sendq;
+ nni_aio aio_send;
+ nni_aio aio_recv;
+ nni_aio aio_getq;
+ nni_aio aio_putq;
+};
+
+static void
+pair1_sock_fini(void *arg)
+{
+ pair1_sock *s = arg;
+
+ nni_aio_fini(&s->aio_getq);
+ nni_idhash_fini(s->pipes);
+ nni_mtx_fini(&s->mtx);
+
+ NNI_FREE_STRUCT(s);
+}
+
+static int
+pair1_sock_init(void **sp, nni_sock *nsock)
+{
+ pair1_sock *s;
+ int rv;
+
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ // Raw mode uses this.
+ if (((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) ||
+ ((rv = nni_mtx_init(&s->mtx)) != 0) ||
+ ((rv = nni_idhash_init(&s->pipes)) != 0)) {
+ pair1_sock_fini(s);
+ } else {
+ s->nsock = nsock;
+ s->raw = 0;
+ s->poly = 0;
+ s->uwq = nni_sock_sendq(nsock);
+ s->urq = nni_sock_recvq(nsock);
+ s->ttl = 8;
+ *sp = s;
+ }
+ return (0);
+}
+
+static int
+pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock)
+{
+ pair1_pipe *p;
+ int rv;
+
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) {
+ pair1_pipe_fini(p);
+ } else {
+ p->npipe = npipe;
+ p->psock = psock;
+ *pp = p;
+ }
+ return (rv);
+}
+
+static void
+pair1_pipe_fini(void *arg)
+{
+ pair1_pipe *p = arg;
+ nni_aio_fini(&p->aio_send);
+ nni_aio_fini(&p->aio_recv);
+ nni_aio_fini(&p->aio_putq);
+ nni_aio_fini(&p->aio_getq);
+ nni_msgq_fini(p->sendq);
+ NNI_FREE_STRUCT(p);
+}
+
+static int
+pair1_pipe_start(void *arg)
+{
+ pair1_pipe *p = arg;
+ pair1_sock *s = p->psock;
+ uint32_t id;
+ int rv;
+
+ id = nni_pipe_id(p->npipe);
+ if ((rv = nni_idhash_insert(s->pipes, id, p)) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(&s->mtx);
+ if (!s->poly) {
+ if (s->pipe != NULL) {
+ nni_mtx_unlock(&s->mtx);
+ nni_idhash_remove(s->pipes, id);
+ return (NNG_EBUSY);
+ }
+ } else {
+ if (!s->started) {
+ nni_msgq_aio_get(s->uwq, &s->aio_getq);
+ }
+ }
+
+ s->pipe = p;
+ s->started = 1;
+ nni_mtx_unlock(&s->mtx);
+
+ // Schedule a getq. In polyamorous mode we get on the per pipe
+ // sendq, as the socket distributes to us. In monogamous mode we
+ // bypass and get from the upper writeq directly (saving a set of
+ // context switches).
+ if (s->poly) {
+ nni_msgq_aio_get(p->sendq, &p->aio_getq);
+ } else {
+ nni_msgq_aio_get(s->uwq, &p->aio_getq);
+ }
+ // And the pipe read of course.
+ nni_pipe_recv(p->npipe, &p->aio_recv);
+
+ return (0);
+}
+
+static void
+pair1_pipe_stop(void *arg)
+{
+ pair1_pipe *p = arg;
+ pair1_sock *s = p->psock;
+
+ nni_msgq_close(p->sendq);
+ nni_aio_cancel(&p->aio_send, NNG_ECANCELED);
+ nni_aio_cancel(&p->aio_recv, NNG_ECANCELED);
+ nni_aio_cancel(&p->aio_putq, NNG_ECANCELED);
+ nni_aio_cancel(&p->aio_getq, NNG_ECANCELED);
+
+ nni_mtx_lock(&s->mtx);
+ if (s->pipe == p) {
+ s->pipe = NULL;
+ }
+ nni_mtx_unlock(&s->mtx);
+ nni_idhash_remove(s->pipes, nni_pipe_id(p->npipe));
+}
+
+static void
+pair1_pipe_recv_cb(void *arg)
+{
+ pair1_pipe *p = arg;
+ pair1_sock *s = p->psock;
+ nni_msg * msg;
+ uint32_t hdr;
+ nni_pipe * npipe = p->npipe;
+ int rv;
+
+ if (nni_aio_result(&p->aio_recv) != 0) {
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+
+ msg = p->aio_recv.a_msg;
+ p->aio_recv.a_msg = NULL;
+
+ // Store the pipe ID.
+ nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
+
+ // If the message is missing the hop count header, scrap it.
+ if (nni_msg_len(msg) < sizeof(uint32_t)) {
+ nni_msg_free(msg);
+ nni_pipe_stop(npipe);
+ return;
+ }
+ hdr = nni_msg_trim_u32(msg);
+ if (hdr & 0xffffff00) {
+ nni_msg_free(msg);
+ nni_pipe_stop(npipe);
+ return;
+ }
+
+ // If we bounced too many times, discard the message, but
+ // keep getting more.
+ if (hdr >= s->ttl) {
+ nni_msg_free(msg);
+ nni_pipe_recv(npipe, &p->aio_recv);
+ return;
+ }
+
+ // Store the pipe id followed by the hop count.
+ if ((rv = nni_msg_header_append_u32(msg, hdr)) != 0) {
+ nni_msg_free(msg);
+ nni_pipe_recv(npipe, &p->aio_recv);
+ return;
+ }
+
+ // Send the message up.
+ p->aio_putq.a_msg = msg;
+ nni_msgq_aio_put(s->urq, &p->aio_putq);
+}
+
+static void
+pair1_sock_getq_cb(void *arg)
+{
+ pair1_pipe *p;
+ pair1_sock *s = arg;
+ nni_msg * msg;
+ uint32_t id;
+
+ if (nni_aio_result(&s->aio_getq) != 0) {
+ // Socket closing...
+ return;
+ }
+
+ msg = s->aio_getq.a_msg;
+ s->aio_getq.a_msg = NULL;
+
+ // By definition we are in polyamorous mode.
+ NNI_ASSERT(s->poly);
+
+ nni_mtx_lock(&s->mtx);
+
+ // If no pipe was requested, we look for any connected peer.
+ if (((id = nni_msg_get_pipe(msg)) == 0) && (s->pipe != NULL)) {
+ id = nni_pipe_id(s->pipe->npipe);
+ }
+
+ if (nni_idhash_find(s->pipes, id, (void **) &p) != 0) {
+ // Pipe not present!
+ nni_mtx_unlock(&s->mtx);
+ nni_msg_free(msg);
+ nni_msgq_aio_get(s->uwq, &s->aio_getq);
+ return;
+ }
+
+ // Try a non-blocking send. If this fails we just discard the
+ // message. We have to do this to avoid head-of-line blocking
+ // for messages sent to other pipes. Note that there is some
+ // buffering in the sendq.
+ if (nni_msgq_tryput(p->sendq, msg) != 0) {
+ nni_msg_free(msg);
+ }
+
+ nni_mtx_unlock(&s->mtx);
+ nni_msgq_aio_get(s->uwq, &s->aio_getq);
+}
+
+static void
+pair1_pipe_putq_cb(void *arg)
+{
+ pair1_pipe *p = arg;
+
+ if (nni_aio_result(&p->aio_putq) != 0) {
+ nni_msg_free(p->aio_putq.a_msg);
+ p->aio_putq.a_msg = NULL;
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+ nni_pipe_recv(p->npipe, &p->aio_recv);
+}
+
+static void
+pair1_pipe_getq_cb(void *arg)
+{
+ pair1_pipe *p = arg;
+ pair1_sock *s = p->psock;
+ nni_msg * msg;
+ uint32_t hops;
+ uint8_t * data;
+
+ if (nni_aio_result(&p->aio_getq) != 0) {
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+
+ msg = p->aio_getq.a_msg;
+ p->aio_getq.a_msg = NULL;
+
+ // Raw mode messages have the header already formed, with
+ // a hop count. Cooked mode messages have no
+ // header so we have to add one.
+ if (s->raw) {
+ if (nni_msg_header_len(msg) != sizeof(uint32_t)) {
+ goto badmsg;
+ }
+ hops = nni_msg_header_trim_u32(msg);
+ } else {
+ hops = 0;
+ }
+
+ hops++;
+
+ // Insert the hops header.
+ if (nni_msg_header_append_u32(msg, hops) != 0) {
+ goto badmsg;
+ }
+
+ p->aio_send.a_msg = msg;
+ nni_pipe_send(p->npipe, &p->aio_send);
+ return;
+
+badmsg:
+ nni_msg_free(msg);
+ nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, &p->aio_getq);
+}
+
+static void
+pair1_pipe_send_cb(void *arg)
+{
+ pair1_pipe *p = arg;
+ pair1_sock *s = p->psock;
+
+ if (nni_aio_result(&p->aio_send) != 0) {
+ nni_msg_free(p->aio_send.a_msg);
+ p->aio_send.a_msg = NULL;
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+
+ // In polyamorous mode, we want to get from the sendq; in
+ // monogamous we get from upper writeq.
+ if (s->poly) {
+ nni_msgq_aio_get(p->sendq, &p->aio_getq);
+ } else {
+ nni_msgq_aio_get(s->uwq, &p->aio_getq);
+ }
+}
+
+static void
+pair1_sock_open(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+pair1_sock_close(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static int
+pair1_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+{
+ pair1_sock *s = arg;
+ int rv;
+
+ nni_mtx_lock(&s->mtx);
+ switch (opt) {
+ case NNG_OPT_RAW:
+ if (s->started) {
+ rv = NNG_ESTATE;
+ } else {
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
+ }
+ break;
+ case NNG_OPT_POLYAMOROUS:
+ if (s->started) {
+ rv = NNG_ESTATE;
+ } else {
+ rv = nni_setopt_int(&s->poly, buf, sz, 0, 1);
+ }
+ break;
+ case NNG_OPT_MAXTTL:
+ rv = nni_setopt_int(&s->ttl, buf, sz, 0, 255);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ }
+
+ nni_mtx_unlock(&s->mtx);
+ return (rv);
+}
+
+static int
+pair1_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+{
+ pair1_sock *s = arg;
+ int rv;
+
+ nni_mtx_lock(&s->mtx);
+ switch (opt) {
+ case NNG_OPT_RAW:
+ rv = nni_getopt_int(&s->raw, buf, szp);
+ break;
+ case NNG_OPT_MAXTTL:
+ rv = nni_getopt_int(&s->ttl, buf, szp);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ }
+ nni_mtx_unlock(&s->mtx);
+ return (rv);
+}
+
+static nni_proto_pipe_ops pair1_pipe_ops = {
+ .pipe_init = pair1_pipe_init,
+ .pipe_fini = pair1_pipe_fini,
+ .pipe_start = pair1_pipe_start,
+ .pipe_stop = pair1_pipe_stop,
+};
+
+static nni_proto_sock_ops pair1_sock_ops = {
+ .sock_init = pair1_sock_init,
+ .sock_fini = pair1_sock_fini,
+ .sock_open = pair1_sock_open,
+ .sock_close = pair1_sock_close,
+ .sock_setopt = pair1_sock_setopt,
+ .sock_getopt = pair1_sock_getopt,
+};
+
+static nni_proto pair1_proto = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNG_PROTO_PAIR_V1, "pair1" },
+ .proto_peer = { NNG_PROTO_PAIR_V1, "pair1" },
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_sock_ops = &pair1_sock_ops,
+ .proto_pipe_ops = &pair1_pipe_ops,
+};
+
+int
+nng_pair1_open(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &pair1_proto));
+}