aboutsummaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-04-04 13:36:54 -0700
committerGarrett D'Amore <garrett@damore.org>2018-04-10 15:40:00 -0700
commit5f7289e1f8e1427c9214c8e3e96ad56b1f868d53 (patch)
tree39debf4ecde234b2a0be19c9cb15628cc32c2edb /src/protocol
parent56f1bf30e61c53646dd2f8425da7c7fa0d97b3e1 (diff)
downloadnng-5f7289e1f8e1427c9214c8e3e96ad56b1f868d53.tar.gz
nng-5f7289e1f8e1427c9214c8e3e96ad56b1f868d53.tar.bz2
nng-5f7289e1f8e1427c9214c8e3e96ad56b1f868d53.zip
fixes #334 Separate context for state machines from sockets
This provides context support for REQ and REP sockets. More discussion around this is in the issue itself. Optionally we would like to extend this to the surveyor pattern. Note that we specifically do not support pollable descriptors for non-default contexts, and the results of using file descriptors for polling (NNG_OPT_SENDFD and NNG_OPT_RECVFD) is undefined. In the future, it might be nice to figure out how to factor in optional use of a message queue for users who want more buffering, but we think there is little need for this with cooked mode.
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/reqrep0/CMakeLists.txt6
-rw-r--r--src/protocol/reqrep0/rep.c621
-rw-r--r--src/protocol/reqrep0/req.c861
-rw-r--r--src/protocol/reqrep0/xrep.c434
-rw-r--r--src/protocol/reqrep0/xreq.c324
5 files changed, 1707 insertions, 539 deletions
diff --git a/src/protocol/reqrep0/CMakeLists.txt b/src/protocol/reqrep0/CMakeLists.txt
index 7b04aa2d..071c28f1 100644
--- a/src/protocol/reqrep0/CMakeLists.txt
+++ b/src/protocol/reqrep0/CMakeLists.txt
@@ -11,12 +11,14 @@
# Req/Rep protocol
if (NNG_PROTO_REQ0)
- set(REQ0_SOURCES protocol/reqrep0/req.c protocol/reqrep0/req.h)
+ set(REQ0_SOURCES protocol/reqrep0/req.c protocol/reqrep0/xreq.c
+ protocol/reqrep0/req.h)
set(REQ0_HEADERS protocol/reqrep0/req.h)
endif()
if (NNG_PROTO_REP0)
- set(REP0_SOURCES protocol/reqrep0/rep.c protocol/reqrep0/rep.h)
+ set(REP0_SOURCES protocol/reqrep0/rep.c protocol/reqrep0/xrep.c
+ protocol/reqrep0/rep.h)
set(REP0_HEADERS protocol/reqrep0/rep.h)
endif()
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index 78a1f2ee..e512c18b 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -28,48 +28,219 @@
typedef struct rep0_pipe rep0_pipe;
typedef struct rep0_sock rep0_sock;
+typedef struct rep0_ctx rep0_ctx;
-static void rep0_sock_getq_cb(void *);
-static void rep0_pipe_getq_cb(void *);
-static void rep0_pipe_putq_cb(void *);
static void rep0_pipe_send_cb(void *);
static void rep0_pipe_recv_cb(void *);
static void rep0_pipe_fini(void *);
+struct rep0_ctx {
+ rep0_sock * sock;
+ bool closed;
+ char * btrace;
+ size_t btrace_len;
+ size_t btrace_size;
+ int ttl;
+ uint32_t pipe_id;
+ nni_aio * saio; // send aio
+ nni_aio * raio; // recv aio
+ nni_list_node sqnode;
+ nni_list_node rqnode;
+};
+
// rep0_sock is our per-socket protocol private structure.
struct rep0_sock {
- nni_msgq * uwq;
- nni_msgq * urq;
- nni_mtx lk;
- int ttl;
- nni_idhash *pipes;
- char * btrace;
- size_t btrace_len;
- nni_aio * aio_getq;
+ nni_mtx lk;
+ int ttl;
+ nni_idhash * pipes;
+ nni_list recvpipes; // list of pipes with data to receive
+ nni_list recvq;
+ bool closed;
+ rep0_ctx * ctx;
+ nni_pollable *recvable;
+ nni_pollable *sendable;
};
// rep0_pipe is our per-pipe protocol private structure.
struct rep0_pipe {
- nni_pipe * pipe;
- rep0_sock *rep;
- nni_msgq * sendq;
- nni_aio * aio_getq;
- nni_aio * aio_send;
- nni_aio * aio_recv;
- nni_aio * aio_putq;
+ nni_pipe * pipe;
+ rep0_sock * rep;
+ uint32_t id;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+ nni_list_node rnode; // receivable list linkage
+ nni_list sendq; // contexts waiting to send
+ bool busy;
};
static void
+rep0_ctx_close(void *arg)
+{
+ rep0_ctx * ctx = arg;
+ rep0_sock *s = ctx->sock;
+ nni_aio * aio;
+
+ nni_mtx_lock(&s->lk);
+ ctx->closed = true;
+ if ((aio = ctx->saio) != NULL) {
+ nni_msg *msg;
+ nni_list_node_remove(&ctx->sqnode);
+ msg = nni_aio_get_msg(aio);
+ nni_msg_free(msg);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ if ((aio = ctx->raio) != NULL) {
+ nni_list_remove(&s->recvq, ctx);
+ ctx->raio = NULL;
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&s->lk);
+}
+
+static void
+rep0_ctx_fini(void *arg)
+{
+ rep0_ctx *ctx = arg;
+
+ rep0_ctx_close(ctx);
+ nni_free(ctx->btrace, ctx->btrace_size);
+ NNI_FREE_STRUCT(ctx);
+}
+
+static int
+rep0_ctx_init(void **ctxp, void *sarg)
+{
+ rep0_sock *s = sarg;
+ rep0_ctx * ctx;
+
+ if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ // this is 1kB, which covers the worst case.
+ ctx->btrace_size = 256 * sizeof(uint32_t);
+ if ((ctx->btrace = nni_alloc(ctx->btrace_size)) == NULL) {
+ NNI_FREE_STRUCT(ctx);
+ return (NNG_ENOMEM);
+ }
+ NNI_LIST_NODE_INIT(&ctx->sqnode);
+ NNI_LIST_NODE_INIT(&ctx->rqnode);
+ ctx->btrace_len = 0;
+ ctx->sock = s;
+ ctx->pipe_id = 0;
+ *ctxp = ctx;
+
+ return (0);
+}
+
+static void
+rep0_ctx_cancel_send(nni_aio *aio, int rv)
+{
+ rep0_ctx * ctx = nni_aio_get_prov_data(aio);
+ rep0_sock *s = ctx->sock;
+
+ nni_mtx_lock(&s->lk);
+ if (ctx->saio != aio) {
+ nni_mtx_unlock(&s->lk);
+ return;
+ }
+ nni_list_node_remove(&ctx->sqnode);
+ ctx->saio = NULL;
+ nni_mtx_unlock(&s->lk);
+
+ nni_msg_header_clear(nni_aio_get_msg(aio)); // reset the headers
+ nni_aio_finish_error(aio, rv);
+}
+
+static void
+rep0_ctx_send(void *arg, nni_aio *aio)
+{
+ rep0_ctx * ctx = arg;
+ rep0_sock *s = ctx->sock;
+ rep0_pipe *p;
+ nni_msg * msg;
+ int rv;
+ size_t len;
+ uint32_t p_id; // pipe id
+
+ msg = nni_aio_get_msg(aio);
+ nni_msg_header_clear(msg);
+
+ nni_mtx_lock(&s->lk);
+ len = ctx->btrace_len;
+ p_id = ctx->pipe_id;
+
+ // Assert "completion" of the previous req request. This ensures
+ // exactly one send for one receive ordering.
+ ctx->btrace_len = 0;
+ ctx->pipe_id = 0;
+
+ if (ctx == s->ctx) {
+ // No matter how this goes, we will no longer be able
+ // to send on the socket (root context). That's because
+ // we will have finished (successfully or otherwise) the
+ // reply for the single request we got.
+ nni_pollable_clear(s->sendable);
+ }
+
+ if (nni_aio_start(aio, rep0_ctx_cancel_send, ctx) != 0) {
+ nni_mtx_unlock(&s->lk);
+ return;
+ }
+ if (ctx->closed) {
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if (len == 0) {
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) {
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ if ((rv = nni_idhash_find(s->pipes, p_id, (void **) &p)) != 0) {
+ // Pipe is gone. Make this look like a good send to avoid
+ // disrupting the state machine. We don't care if the peer
+ // lost interest in our reply.
+ nni_aio_set_msg(aio, NULL);
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
+ nni_msg_free(msg);
+ return;
+ }
+ if (p->busy) {
+ ctx->saio = aio;
+ nni_list_append(&p->sendq, ctx);
+ nni_mtx_unlock(&s->lk);
+ return;
+ }
+
+ p->busy = true;
+ len = nni_msg_len(msg);
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_set_msg(p->aio_send, msg);
+ nni_pipe_send(p->pipe, p->aio_send);
+ nni_mtx_unlock(&s->lk);
+
+ nni_aio_finish(aio, 0, len);
+}
+
+static void
rep0_sock_fini(void *arg)
{
rep0_sock *s = arg;
- nni_aio_stop(s->aio_getq);
- nni_aio_fini(s->aio_getq);
nni_idhash_fini(s->pipes);
- if (s->btrace != NULL) {
- nni_free(s->btrace, s->btrace_len);
+ if (s->ctx != NULL) {
+ rep0_ctx_fini(s->ctx);
}
+ nni_pollable_free(s->sendable);
+ nni_pollable_free(s->recvable);
nni_mtx_fini(&s->lk);
NNI_FREE_STRUCT(s);
}
@@ -80,21 +251,34 @@ rep0_sock_init(void **sp, nni_sock *sock)
rep0_sock *s;
int rv;
+ NNI_ARG_UNUSED(sock);
+
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&s->lk);
- if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
- ((rv = nni_aio_init(&s->aio_getq, rep0_sock_getq_cb, s)) != 0)) {
+ if ((rv = nni_idhash_init(&s->pipes)) != 0) {
+ rep0_sock_fini(s);
+ return (rv);
+ }
+
+ NNI_LIST_INIT(&s->recvq, rep0_ctx, rqnode);
+ NNI_LIST_INIT(&s->recvpipes, rep0_pipe, rnode);
+
+ s->ttl = 8;
+
+ if ((rv = rep0_ctx_init((void **) &s->ctx, s)) != 0) {
rep0_sock_fini(s);
return (rv);
}
- s->ttl = 8; // Per RFC
- s->btrace = NULL;
- s->btrace_len = 0;
- s->uwq = nni_sock_sendq(sock);
- s->urq = nni_sock_recvq(sock);
+ // We start off without being either readable or pollable.
+ // Readability comes when there is something on the socket.
+ if (((rv = nni_pollable_alloc(&s->sendable)) != 0) ||
+ ((rv = nni_pollable_alloc(&s->recvable)) != 0)) {
+ rep0_sock_fini(s);
+ return (rv);
+ }
*sp = s;
@@ -104,9 +288,7 @@ rep0_sock_init(void **sp, nni_sock *sock)
static void
rep0_sock_open(void *arg)
{
- rep0_sock *s = arg;
-
- nni_msgq_aio_get(s->uwq, s->aio_getq);
+ NNI_ARG_UNUSED(arg);
}
static void
@@ -114,7 +296,7 @@ rep0_sock_close(void *arg)
{
rep0_sock *s = arg;
- nni_aio_abort(s->aio_getq, NNG_ECLOSED);
+ rep0_ctx_close(s->ctx);
}
static void
@@ -122,11 +304,8 @@ rep0_pipe_fini(void *arg)
{
rep0_pipe *p = arg;
- nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_putq);
- nni_msgq_fini(p->sendq);
NNI_FREE_STRUCT(p);
}
@@ -139,15 +318,15 @@ rep0_pipe_init(void **pp, nni_pipe *pipe, void *s)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, rep0_pipe_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, rep0_pipe_putq_cb, p)) != 0)) {
+ if (((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0)) {
rep0_pipe_fini(p);
return (rv);
}
+ NNI_LIST_INIT(&p->sendq, rep0_ctx, sqnode);
+
+ p->id = nni_pipe_id(pipe);
p->pipe = pipe;
p->rep = s;
*pp = p;
@@ -164,8 +343,8 @@ rep0_pipe_start(void *arg)
if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) {
return (rv);
}
-
- nni_msgq_aio_get(p->sendq, p->aio_getq);
+ // By definition, we have not received a request yet on this pipe,
+ // so it cannot cause us to become sendable.
nni_pipe_recv(p->pipe, p->aio_recv);
return (0);
}
@@ -175,94 +354,136 @@ rep0_pipe_stop(void *arg)
{
rep0_pipe *p = arg;
rep0_sock *s = p->rep;
+ rep0_ctx * ctx;
+
+ nni_mtx_lock(&s->lk);
+ while ((ctx = nni_list_first(&p->sendq)) != NULL) {
+ nni_aio *aio;
+ nni_msg *msg;
+ // Pipe was closed. To avoid pushing an error back to the
+ // entire socket, we pretend we completed this successfully.
+ nni_list_remove(&p->sendq, ctx);
+ aio = ctx->saio;
+ ctx->saio = NULL;
+ msg = nni_aio_get_msg(aio);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
+ nni_msg_free(msg);
+ }
+ if (p->id == s->ctx->pipe_id) {
+ // We "can" send. (Well, not really, but we will happily
+ // accept a message and discard it.)
+ nni_pollable_raise(s->sendable);
+ }
+ nni_mtx_unlock(&s->lk);
- nni_msgq_close(p->sendq);
- nni_aio_stop(p->aio_getq);
nni_aio_stop(p->aio_send);
nni_aio_stop(p->aio_recv);
- nni_aio_stop(p->aio_putq);
nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
}
static void
-rep0_sock_getq_cb(void *arg)
+rep0_pipe_send_cb(void *arg)
{
- rep0_sock *s = arg;
- nni_msgq * uwq = s->uwq;
+ rep0_pipe *p = arg;
+ rep0_sock *s = p->rep;
+ rep0_ctx * ctx;
+ nni_aio * aio;
nni_msg * msg;
- uint32_t id;
- rep0_pipe *p;
- int rv;
-
- // This watches for messages from the upper write queue,
- // extracts the destination pipe, and forwards it to the appropriate
- // destination pipe via a separate queue. This prevents a single bad
- // or slow pipe from gumming up the works for the entire socket.
+ size_t len;
- if (nni_aio_result(s->aio_getq) != 0) {
- // Closed socket?
+ nni_mtx_lock(&s->lk);
+ p->busy = false;
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->pipe);
+ nni_mtx_unlock(&s->lk);
return;
}
-
- msg = nni_aio_get_msg(s->aio_getq);
- nni_aio_set_msg(s->aio_getq, NULL);
-
- // We yank the outgoing pipe id from the header
- if (nni_msg_header_len(msg) < 4) {
- nni_msg_free(msg);
-
- // Look for another message on the upper write queue.
- nni_msgq_aio_get(uwq, s->aio_getq);
+ if ((ctx = nni_list_first(&p->sendq)) == NULL) {
+ // Nothing else to send.
+ if (p->id == s->ctx->pipe_id) {
+ // Mark us ready for the other side to send!
+ nni_pollable_raise(s->sendable);
+ }
+ nni_mtx_unlock(&s->lk);
return;
}
- id = nni_msg_header_trim_u32(msg);
+ nni_list_remove(&p->sendq, ctx);
+ aio = ctx->saio;
+ ctx->saio = NULL;
+ p->busy = true;
+ msg = nni_aio_get_msg(aio);
+ len = nni_msg_len(msg);
+ nni_aio_set_msg(aio, NULL);
+ nni_aio_set_msg(p->aio_send, msg);
+ nni_pipe_send(p->pipe, p->aio_send);
- // Look for the pipe, and attempt to put the message there
- // (nonblocking) if we can. If we can't for any reason, then we
- // free the message.
- // XXX: LOCKING?!?!
- if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) {
- rv = nni_msgq_tryput(p->sendq, msg);
- }
- if (rv != 0) {
- nni_msg_free(msg);
- }
+ nni_mtx_unlock(&s->lk);
- // Now look for another message on the upper write queue.
- nni_msgq_aio_get(uwq, s->aio_getq);
+ nni_aio_finish_synch(aio, 0, len);
}
static void
-rep0_pipe_getq_cb(void *arg)
+rep0_cancel_recv(nni_aio *aio, int rv)
{
- rep0_pipe *p = arg;
+ rep0_ctx * ctx = nni_aio_get_prov_data(aio);
+ rep0_sock *s = ctx->sock;
- if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->pipe);
- return;
+ nni_mtx_lock(&s->lk);
+ if (ctx->raio == aio) {
+ nni_list_remove(&s->recvq, ctx);
+ ctx->raio = NULL;
+ nni_aio_finish_error(aio, rv);
}
-
- nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
- nni_aio_set_msg(p->aio_getq, NULL);
-
- nni_pipe_send(p->pipe, p->aio_send);
+ nni_mtx_unlock(&s->lk);
}
static void
-rep0_pipe_send_cb(void *arg)
+rep0_ctx_recv(void *arg, nni_aio *aio)
{
- rep0_pipe *p = arg;
+ rep0_ctx * ctx = arg;
+ rep0_sock *s = ctx->sock;
+ rep0_pipe *p;
+ size_t len;
+ nni_msg * msg;
- if (nni_aio_result(p->aio_send) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_send));
- nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->pipe);
+ nni_mtx_lock(&s->lk);
+ if (nni_aio_start(aio, rep0_cancel_recv, ctx) != 0) {
+ nni_mtx_unlock(&s->lk);
+ return;
+ }
+ if (ctx->closed) {
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
+ if ((p = nni_list_first(&s->recvpipes)) == NULL) {
+ nni_pollable_clear(s->recvable);
+ ctx->raio = aio;
+ nni_list_append(&s->recvq, ctx);
+ nni_mtx_unlock(&s->lk);
+ return;
+ }
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_list_remove(&s->recvpipes, p);
+ if (nni_list_empty(&s->recvpipes)) {
+ nni_pollable_clear(s->recvable);
+ }
+ nni_pipe_recv(p->pipe, p->aio_recv);
+
+ len = nni_msg_header_len(msg);
+ memcpy(ctx->btrace, nni_msg_header(msg), len);
+ ctx->btrace_len = len;
+ ctx->pipe_id = nni_pipe_id(p->pipe);
+ nni_mtx_unlock(&s->lk);
- nni_msgq_aio_get(p->sendq, p->aio_getq);
+ nni_msg_header_clear(msg);
+ nni_aio_set_msg(aio, msg);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
}
static void
@@ -270,9 +491,12 @@ rep0_pipe_recv_cb(void *arg)
{
rep0_pipe *p = arg;
rep0_sock *s = p->rep;
+ rep0_ctx * ctx;
nni_msg * msg;
int rv;
uint8_t * body;
+ nni_aio * aio;
+ size_t len;
int hops;
if (nni_aio_result(p->aio_recv) != 0) {
@@ -281,28 +505,22 @@ rep0_pipe_recv_cb(void *arg)
}
msg = nni_aio_get_msg(p->aio_recv);
- nni_aio_set_msg(p->aio_recv, NULL);
- nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
-
- // Store the pipe id in the header, first thing.
- rv = nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe));
- if (rv != 0) {
- // Failure here causes us to drop the message.
- goto drop;
- }
+ nni_msg_set_pipe(msg, p->id);
// Move backtrace from body to header
hops = 1;
for (;;) {
int end = 0;
- if (hops >= s->ttl) {
+
+ if (hops > s->ttl) {
// This isn't malformed, but it has gone through
// too many hops. Do not disconnect, because we
// can legitimately receive messages with too many
// hops from devices, etc.
goto drop;
}
+ hops++;
if (nni_msg_len(msg) < 4) {
// Peer is speaking garbage. Kick it.
nni_msg_free(msg);
@@ -313,10 +531,7 @@ rep0_pipe_recv_cb(void *arg)
end = (body[0] & 0x80) ? 1 : 0;
rv = nni_msg_header_append(msg, body, 4);
if (rv != 0) {
- // Presumably this is due to out of memory.
- // We could just discard and try again, but we
- // just toss the connection for now. Given the
- // out of memory situation, this is not unreasonable.
+ // Out of memory, so drop it.
goto drop;
}
nni_msg_trim(msg, 4);
@@ -325,28 +540,46 @@ rep0_pipe_recv_cb(void *arg)
}
}
- // Go ahead and send it up.
- nni_aio_set_msg(p->aio_putq, msg);
- nni_msgq_aio_put(s->urq, p->aio_putq);
- return;
+ len = nni_msg_header_len(msg);
-drop:
- nni_msg_free(msg);
+ nni_mtx_lock(&s->lk);
+
+ if ((ctx = nni_list_first(&s->recvq)) == NULL) {
+ // No one waiting to receive yet, holding pattern.
+ nni_list_append(&s->recvpipes, p);
+ nni_pollable_raise(s->recvable);
+ nni_mtx_unlock(&s->lk);
+ return;
+ }
+
+ nni_list_remove(&s->recvq, ctx);
+ aio = ctx->raio;
+ ctx->raio = NULL;
+ nni_aio_set_msg(aio, msg);
+ nni_aio_set_msg(p->aio_recv, NULL);
+
+ // schedule another receive
nni_pipe_recv(p->pipe, p->aio_recv);
-}
-static void
-rep0_pipe_putq_cb(void *arg)
-{
- rep0_pipe *p = arg;
+ ctx->btrace_len = len;
+ memcpy(ctx->btrace, nni_msg_header(msg), len);
+ nni_msg_header_clear(msg);
+ ctx->pipe_id = p->id;
- if (nni_aio_result(p->aio_putq) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_putq));
- nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->pipe);
- return;
+ // If we got a request on a pipe that wasn't busy, we should mark
+ // it sendable. (The sendable flag is not set when there is no
+ // request needing a reply.)
+ if ((ctx == s->ctx) && (!p->busy)) {
+ nni_pollable_raise(s->sendable);
}
+ nni_mtx_unlock(&s->lk);
+
+ nni_aio_finish_synch(aio, 0, nni_msg_len(msg));
+ return;
+
+drop:
+ nni_msg_free(msg);
nni_pipe_recv(p->pipe, p->aio_recv);
}
@@ -354,6 +587,7 @@ static int
rep0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ)
{
rep0_sock *s = arg;
+
return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ));
}
@@ -361,75 +595,43 @@ static int
rep0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ)
{
rep0_sock *s = arg;
+
return (nni_copyout_int(s->ttl, buf, szp, typ));
}
-static nni_msg *
-rep0_sock_filter(void *arg, nni_msg *msg)
+static int
+rep0_sock_getopt_sendfd(void *arg, void *buf, size_t *szp, int typ)
{
rep0_sock *s = arg;
- char * header;
- size_t len;
-
- nni_mtx_lock(&s->lk);
+ int rv;
+ int fd;
- len = nni_msg_header_len(msg);
- header = nni_msg_header(msg);
- if (s->btrace != NULL) {
- nni_free(s->btrace, s->btrace_len);
- s->btrace = NULL;
- s->btrace_len = 0;
- }
- if ((s->btrace = nni_alloc(len)) == NULL) {
- nni_msg_free(msg);
- return (NULL);
+ if ((rv = nni_pollable_getfd(s->sendable, &fd)) != 0) {
+ return (rv);
}
- s->btrace_len = len;
- memcpy(s->btrace, header, len);
- nni_msg_header_clear(msg);
- nni_mtx_unlock(&s->lk);
- return (msg);
+ return (nni_copyout_int(fd, buf, szp, typ));
}
-static void
-rep0_sock_send_raw(void *arg, nni_aio *aio)
+static int
+rep0_sock_getopt_recvfd(void *arg, void *buf, size_t *szp, int typ)
{
rep0_sock *s = arg;
- nni_msgq_aio_put(s->uwq, aio);
+ int rv;
+ int fd;
+
+ if ((rv = nni_pollable_getfd(s->recvable, &fd)) != 0) {
+ return (rv);
+ }
+
+ return (nni_copyout_int(fd, buf, szp, typ));
}
static void
rep0_sock_send(void *arg, nni_aio *aio)
{
rep0_sock *s = arg;
- int rv;
- nni_msg * msg;
-
- nni_mtx_lock(&s->lk);
- if (s->btrace == NULL) {
- nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, NNG_ESTATE);
- return;
- }
-
- msg = nni_aio_get_msg(aio);
-
- // drop anything else in the header... (it should already be
- // empty, but there can be stale backtrace info there.)
- nni_msg_header_clear(msg);
-
- if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) {
- nni_mtx_unlock(&s->lk);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- nni_free(s->btrace, s->btrace_len);
- s->btrace = NULL;
- s->btrace_len = 0;
- nni_mtx_unlock(&s->lk);
- nni_msgq_aio_put(s->uwq, aio);
+ rep0_ctx_send(s->ctx, aio);
}
static void
@@ -437,7 +639,7 @@ rep0_sock_recv(void *arg, nni_aio *aio)
{
rep0_sock *s = arg;
- nni_msgq_aio_get(s->urq, aio);
+ rep0_ctx_recv(s->ctx, aio);
}
// This is the global protocol structure -- our linkage to the core.
@@ -449,6 +651,21 @@ static nni_proto_pipe_ops rep0_pipe_ops = {
.pipe_stop = rep0_pipe_stop,
};
+static nni_proto_ctx_option rep0_ctx_options[] = {
+ // terminate list
+ {
+ .co_name = NULL,
+ },
+};
+
+static nni_proto_ctx_ops rep0_ctx_ops = {
+ .ctx_init = rep0_ctx_init,
+ .ctx_fini = rep0_ctx_fini,
+ .ctx_send = rep0_ctx_send,
+ .ctx_recv = rep0_ctx_recv,
+ .ctx_options = rep0_ctx_options,
+};
+
static nni_proto_sock_option rep0_sock_options[] = {
{
.pso_name = NNG_OPT_MAXTTL,
@@ -456,6 +673,18 @@ static nni_proto_sock_option rep0_sock_options[] = {
.pso_getopt = rep0_sock_getopt_maxttl,
.pso_setopt = rep0_sock_setopt_maxttl,
},
+ {
+ .pso_name = NNG_OPT_RECVFD,
+ .pso_type = NNI_TYPE_INT32,
+ .pso_getopt = rep0_sock_getopt_recvfd,
+ .pso_setopt = NULL,
+ },
+ {
+ .pso_name = NNG_OPT_SENDFD,
+ .pso_type = NNI_TYPE_INT32,
+ .pso_getopt = rep0_sock_getopt_sendfd,
+ .pso_setopt = NULL,
+ },
// terminate list
{
.pso_name = NULL,
@@ -468,38 +697,18 @@ static nni_proto_sock_ops rep0_sock_ops = {
.sock_open = rep0_sock_open,
.sock_close = rep0_sock_close,
.sock_options = rep0_sock_options,
- .sock_filter = rep0_sock_filter,
.sock_send = rep0_sock_send,
.sock_recv = rep0_sock_recv,
};
-static nni_proto_sock_ops rep0_sock_ops_raw = {
- .sock_init = rep0_sock_init,
- .sock_fini = rep0_sock_fini,
- .sock_open = rep0_sock_open,
- .sock_close = rep0_sock_close,
- .sock_options = rep0_sock_options,
- .sock_filter = NULL, // No filtering for raw mode
- .sock_send = rep0_sock_send_raw,
- .sock_recv = rep0_sock_recv,
-};
-
static nni_proto rep0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_REP_V0, "rep" },
.proto_peer = { NNI_PROTO_REQ_V0, "req" },
- .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ,
.proto_sock_ops = &rep0_sock_ops,
.proto_pipe_ops = &rep0_pipe_ops,
-};
-
-static nni_proto rep0_proto_raw = {
- .proto_version = NNI_PROTOCOL_VERSION,
- .proto_self = { NNI_PROTO_REP_V0, "rep" },
- .proto_peer = { NNI_PROTO_REQ_V0, "req" },
- .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
- .proto_sock_ops = &rep0_sock_ops_raw,
- .proto_pipe_ops = &rep0_pipe_ops,
+ .proto_ctx_ops = &rep0_ctx_ops,
};
int
@@ -507,9 +716,3 @@ nng_rep0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &rep0_proto));
}
-
-int
-nng_rep0_open_raw(nng_socket *sidp)
-{
- return (nni_proto_open(sidp, &rep0_proto_raw));
-}
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 4d35ca1f..8149ce08 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -28,34 +28,53 @@
typedef struct req0_pipe req0_pipe;
typedef struct req0_sock req0_sock;
+typedef struct req0_ctx req0_ctx;
-static void req0_resend(req0_sock *);
-static void req0_timeout(void *);
+static void req0_run_sendq(req0_sock *, nni_list *);
+static void req0_ctx_reset(req0_ctx *);
+static void req0_ctx_timeout(void *);
static void req0_pipe_fini(void *);
+static void req0_ctx_fini(void *);
+static int req0_ctx_init(void **, void *);
+
+// A req0_ctx is a "context" for the request. It uses most of the
+// socket, but keeps track of its own outstanding replays, the request ID,
+// and so forth.
+struct req0_ctx {
+ nni_list_node snode;
+ nni_list_node sqnode; // node on the sendq
+ nni_list_node pnode; // node on the pipe list
+ uint32_t reqid;
+ req0_sock * sock;
+ nni_aio * raio; // user aio waiting to receive - only one!
+ nni_aio * saio;
+ nng_msg * reqmsg; // request message
+ size_t reqlen;
+ nng_msg * repmsg; // reply message
+ nni_timer_node timer;
+ nni_duration retry;
+};
// A req0_sock is our per-socket protocol private structure.
struct req0_sock {
- nni_msgq * uwq;
- nni_msgq * urq;
+ nni_sock * nsock;
nni_duration retry;
- nni_time resend;
- bool raw;
- bool wantw;
bool closed;
int ttl;
- nni_msg * reqmsg;
- req0_pipe *pendpipe;
+ req0_ctx *ctx; // base socket ctx
nni_list readypipes;
nni_list busypipes;
+ nni_list ctxs;
- nni_timer_node timer;
+ nni_list sendq; // contexts waiting to send.
+ nni_idhash * reqids; // contexts by request ID
+ nni_pollable *recvable;
+ nni_pollable *sendable;
- uint32_t nextid; // next id
- uint8_t reqid[4]; // outstanding request ID (big endian)
- nni_mtx mtx;
- nni_cv cv;
+ nni_mtx mtx;
+ nni_cv cv;
};
// A req0_pipe is our per-pipe protocol private structure.
@@ -63,60 +82,61 @@ struct req0_pipe {
nni_pipe * pipe;
req0_sock * req;
nni_list_node node;
- nni_aio * aio_getq; // raw mode only
- nni_aio * aio_sendraw; // raw mode only
- nni_aio * aio_sendcooked; // cooked mode only
+ nni_list ctxs; // ctxs with pending traffic
+ nni_aio * aio_send;
nni_aio * aio_recv;
- nni_aio * aio_putq;
- nni_mtx mtx;
};
-static void req0_getq_cb(void *);
-static void req0_sendraw_cb(void *);
-static void req0_sendcooked_cb(void *);
+static void req0_sock_fini(void *);
+static void req0_send_cb(void *);
static void req0_recv_cb(void *);
-static void req0_putq_cb(void *);
static int
-req0_sock_init_impl(void **sp, nni_sock *sock, bool raw)
+req0_sock_init(void **sp, nni_sock *sock)
{
req0_sock *s;
+ int rv;
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
+ if ((rv = nni_idhash_init(&s->reqids)) != 0) {
+ NNI_FREE_STRUCT(s);
+ return (rv);
+ }
+
+ // Request IDs are 32 bits, with the high order bit set.
+ // We start at a random point, to minimize likelihood of
+ // accidental collision across restarts.
+ nni_idhash_set_limits(
+ s->reqids, 0x80000000u, 0xffffffffu, nni_random() | 0x80000000u);
+
nni_mtx_init(&s->mtx);
nni_cv_init(&s->cv, &s->mtx);
NNI_LIST_INIT(&s->readypipes, req0_pipe, node);
NNI_LIST_INIT(&s->busypipes, req0_pipe, node);
- nni_timer_init(&s->timer, req0_timeout, s);
+ NNI_LIST_INIT(&s->sendq, req0_ctx, sqnode);
+ NNI_LIST_INIT(&s->ctxs, req0_ctx, snode);
// this is "semi random" start for request IDs.
- s->nextid = nni_random();
- s->retry = NNI_SECOND * 60;
- s->reqmsg = NULL;
- s->raw = raw;
- s->wantw = false;
- s->resend = NNI_TIME_ZERO;
- s->ttl = 8;
- s->uwq = nni_sock_sendq(sock);
- s->urq = nni_sock_recvq(sock);
- *sp = s;
+ s->nsock = sock;
+ s->retry = NNI_SECOND * 60;
- return (0);
-}
+ if ((rv = req0_ctx_init((void **) &s->ctx, s)) != 0) {
+ req0_sock_fini(s);
+ return (rv);
+ }
+ if (((rv = nni_pollable_alloc(&s->sendable)) != 0) ||
+ ((rv = nni_pollable_alloc(&s->recvable)) != 0)) {
+ req0_sock_fini(s);
+ return (rv);
+ }
-static int
-req0_sock_init(void **sp, nni_sock *sock)
-{
- return (req0_sock_init_impl(sp, sock, false));
-}
+ s->ttl = 8;
+ *sp = s;
-static int
-req0_sock_init_raw(void **sp, nni_sock *sock)
-{
- return (req0_sock_init_impl(sp, sock, true));
+ return (0);
}
static void
@@ -129,12 +149,18 @@ static void
req0_sock_close(void *arg)
{
req0_sock *s = arg;
+ req0_ctx * ctx;
nni_mtx_lock(&s->mtx);
s->closed = true;
+ NNI_LIST_FOREACH (&s->ctxs, ctx) {
+ if (ctx->raio != NULL) {
+ nni_aio_finish_error(ctx->raio, NNG_ECLOSED);
+ ctx->raio = NULL;
+ req0_ctx_reset(ctx);
+ }
+ }
nni_mtx_unlock(&s->mtx);
-
- nni_timer_cancel(&s->timer);
}
static void
@@ -147,10 +173,13 @@ req0_sock_fini(void *arg)
(!nni_list_empty(&s->busypipes))) {
nni_cv_wait(&s->cv);
}
- if (s->reqmsg != NULL) {
- nni_msg_free(s->reqmsg);
- }
nni_mtx_unlock(&s->mtx);
+ if (s->ctx) {
+ req0_ctx_fini(s->ctx);
+ }
+ nni_pollable_free(s->recvable);
+ nni_pollable_free(s->sendable);
+ nni_idhash_fini(s->reqids);
nni_cv_fini(&s->cv);
nni_mtx_fini(&s->mtx);
NNI_FREE_STRUCT(s);
@@ -161,12 +190,8 @@ req0_pipe_fini(void *arg)
{
req0_pipe *p = arg;
- nni_aio_fini(p->aio_getq);
- nni_aio_fini(p->aio_putq);
nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_sendcooked);
- nni_aio_fini(p->aio_sendraw);
- nni_mtx_fini(&p->mtx);
+ nni_aio_fini(p->aio_send);
NNI_FREE_STRUCT(p);
}
@@ -179,18 +204,14 @@ req0_pipe_init(void **pp, nni_pipe *pipe, void *s)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&p->mtx);
- if (((rv = nni_aio_init(&p->aio_getq, req0_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, req0_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_sendraw, req0_sendraw_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_sendcooked, req0_sendcooked_cb, p)) !=
- 0)) {
+ if (((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, req0_send_cb, p)) != 0)) {
req0_pipe_fini(p);
return (rv);
}
NNI_LIST_NODE_INIT(&p->node);
+ NNI_LIST_INIT(&p->ctxs, req0_ctx, pnode);
p->pipe = pipe;
p->req = s;
*pp = p;
@@ -213,14 +234,10 @@ req0_pipe_start(void *arg)
return (NNG_ECLOSED);
}
nni_list_append(&s->readypipes, p);
- // If sock was waiting for somewhere to send data, go ahead and
- // send it to this pipe.
- if (s->wantw) {
- req0_resend(s);
- }
+ nni_pollable_raise(s->sendable);
+ req0_run_sendq(s, NULL);
nni_mtx_unlock(&s->mtx);
- nni_msgq_aio_get(s->uwq, p->aio_getq);
nni_pipe_recv(p->pipe, p->aio_recv);
return (0);
}
@@ -230,12 +247,10 @@ req0_pipe_stop(void *arg)
{
req0_pipe *p = arg;
req0_sock *s = p->req;
+ req0_ctx * ctx;
- nni_aio_stop(p->aio_getq);
- nni_aio_stop(p->aio_putq);
nni_aio_stop(p->aio_recv);
- nni_aio_stop(p->aio_sendcooked);
- nni_aio_stop(p->aio_sendraw);
+ nni_aio_stop(p->aio_send);
// At this point there should not be any further AIOs running.
// Further, any completion tasks have completed.
@@ -249,126 +264,54 @@ req0_pipe_stop(void *arg)
nni_cv_wake(&s->cv);
}
}
-
- if ((p == s->pendpipe) && (s->reqmsg != NULL)) {
- // removing the pipe we sent the last request on...
- // schedule immediate resend.
- s->pendpipe = NULL;
- s->resend = NNI_TIME_ZERO;
- s->wantw = true;
- req0_resend(s);
+ if (nni_list_empty(&s->readypipes)) {
+ nni_pollable_clear(s->sendable);
}
- nni_mtx_unlock(&s->mtx);
-}
-static int
-req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ)
-{
- req0_sock *s = arg;
- return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ));
-}
-
-static int
-req0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ)
-{
- req0_sock *s = arg;
- return (nni_copyout_int(s->ttl, buf, szp, typ));
-}
-
-static int
-req0_sock_setopt_resendtime(void *arg, const void *buf, size_t sz, int typ)
-{
- req0_sock *s = arg;
- return (nni_copyin_ms(&s->retry, buf, sz, typ));
-}
-
-static int
-req0_sock_getopt_resendtime(void *arg, void *buf, size_t *szp, int typ)
-{
- req0_sock *s = arg;
- return (nni_copyout_ms(s->retry, buf, szp, typ));
-}
-
-// Raw and cooked mode differ in the way they send messages out.
-//
-// For cooked mdes, we have a getq callback on the upper write queue, which
-// when it finds a message, cancels any current processing, and saves a copy
-// of the message, and then tries to "resend" the message, looking for a
-// suitable available outgoing pipe. If no suitable pipe is available,
-// a flag is set, so that as soon as such a pipe is available we trigger
-// a resend attempt. We also trigger the attempt on either timeout, or if
-// the underlying pipe we chose disconnects.
-//
-// For raw mode we can just let the pipes "contend" via getq to get a
-// message from the upper write queue. The msgqueue implementation
-// actually provides ordering, so load will be spread automatically.
-// (NB: We may have to revise this in the future if we want to provide some
-// kind of priority.)
-
-static void
-req0_getq_cb(void *arg)
-{
- req0_pipe *p = arg;
-
- // We should be in RAW mode. Cooked mode traffic bypasses
- // the upper write queue entirely, and should never end up here.
- // If the mode changes, we may briefly deliver a message, but
- // that's ok (there's an inherent race anyway). (One minor
- // exception: we wind up here in error state when the uwq is closed.)
-
- if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->pipe);
- return;
+ while ((ctx = nni_list_first(&p->ctxs)) != NULL) {
+ nni_list_remove(&p->ctxs, ctx);
+ // Reset the timer on this so it expires immediately.
+ // This is actually easier than canceling the timer and
+ // running the sendq separately. (In particular, it avoids
+ // a potential deadlock on cancelling the timer.)
+ nni_timer_schedule(&ctx->timer, NNI_TIME_ZERO);
}
-
- nni_aio_set_msg(p->aio_sendraw, nni_aio_get_msg(p->aio_getq));
- nni_aio_set_msg(p->aio_getq, NULL);
-
- // Send the message, but use the raw mode aio.
- nni_pipe_send(p->pipe, p->aio_sendraw);
+ nni_mtx_unlock(&s->mtx);
}
-static void
-req0_sendraw_cb(void *arg)
-{
- req0_pipe *p = arg;
-
- if (nni_aio_result(p->aio_sendraw) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_sendraw));
- nni_aio_set_msg(p->aio_sendraw, NULL);
- nni_pipe_stop(p->pipe);
- return;
- }
-
- // Sent a message so we just need to look for another one.
- nni_msgq_aio_get(p->req->uwq, p->aio_getq);
-}
+// For cooked mode, we use a context, and send out that way. This
+// completely bypasses the upper write queue. Each context keeps one
+// message pending; these are "scheduled" via the sendq. The sendq
+// is ordered, so FIFO ordering between contexts is provided for.
static void
-req0_sendcooked_cb(void *arg)
+req0_send_cb(void *arg)
{
req0_pipe *p = arg;
req0_sock *s = p->req;
+ nni_aio * aio;
+ nni_list aios;
- if (nni_aio_result(p->aio_sendcooked) != 0) {
+ nni_aio_list_init(&aios);
+ if (nni_aio_result(p->aio_send) != 0) {
// We failed to send... clean up and deal with it.
- // We leave ourselves on the busy list for now, which
- // means no new asynchronous traffic can occur here.
- nni_msg_free(nni_aio_get_msg(p->aio_sendcooked));
- nni_aio_set_msg(p->aio_sendcooked, NULL);
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
nni_pipe_stop(p->pipe);
return;
}
- // Cooked mode. We completed a cooked send, so we need to
- // reinsert ourselves in the ready list, and possibly schedule
- // a resend.
+ // We completed a cooked send, so we need to reinsert ourselves
+ // in the ready list, and re-run the sendq.
nni_mtx_lock(&s->mtx);
if (nni_list_active(&s->busypipes, p)) {
nni_list_remove(&s->busypipes, p);
nni_list_append(&s->readypipes, p);
- req0_resend(s);
+ if (nni_list_empty(&s->sendq)) {
+ nni_pollable_raise(s->sendable);
+ }
+ req0_run_sendq(s, &aios);
} else {
// We wind up here if stop was called from the reader
// side while we were waiting to be scheduled to run for the
@@ -377,29 +320,22 @@ req0_sendcooked_cb(void *arg)
nni_pipe_stop(p->pipe);
}
nni_mtx_unlock(&s->mtx);
-}
-static void
-req0_putq_cb(void *arg)
-{
- req0_pipe *p = arg;
-
- if (nni_aio_result(p->aio_putq) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_putq));
- nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->pipe);
- return;
+ while ((aio = nni_list_first(&aios)) != NULL) {
+ nni_list_remove(&aios, aio);
+ nni_aio_finish_synch(aio, 0, 0);
}
- nni_aio_set_msg(p->aio_putq, NULL);
-
- nni_pipe_recv(p->pipe, p->aio_recv);
}
static void
req0_recv_cb(void *arg)
{
req0_pipe *p = arg;
+ req0_sock *s = p->req;
+ req0_ctx * ctx;
nni_msg * msg;
+ nni_aio * aio;
+ uint32_t id;
if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->pipe);
@@ -410,22 +346,58 @@ req0_recv_cb(void *arg)
nni_aio_set_msg(p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
- // We yank 4 bytes of body, and move them to the header.
+ // We yank 4 bytes from front of body, and move them to the header.
if (nni_msg_len(msg) < 4) {
// Malformed message.
goto malformed;
}
- if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) {
+ id = nni_msg_trim_u32(msg);
+ if (nni_msg_header_append_u32(msg, id) != 0) {
// Arguably we could just discard and carry on. But
// dropping the connection is probably more helpful since
// it lets the other side see that a problem occurred.
// Plus it gives us a chance to reclaim some memory.
goto malformed;
}
- (void) nni_msg_trim(msg, 4); // Cannot fail
- nni_aio_set_msg(p->aio_putq, msg);
- nni_msgq_aio_put(p->req->urq, p->aio_putq);
+ // Schedule another receive while we are processing this.
+ nni_mtx_lock(&s->mtx);
+ nni_pipe_recv(p->pipe, p->aio_recv);
+
+ // Look for a context to receive it.
+ if ((nni_idhash_find(s->reqids, id, (void **) &ctx) != 0) ||
+ (ctx->saio != NULL) || (ctx->repmsg != NULL)) {
+ nni_mtx_unlock(&s->mtx);
+ // No waiting context, we have not sent the request out to
+ // the wire yet, or context already has a reply ready.
+ // Discard the message.
+ nni_msg_free(msg);
+ return;
+ }
+
+ // We have our match, so we can remove this.
+ nni_list_node_remove(&ctx->sqnode);
+ nni_idhash_remove(s->reqids, id);
+ ctx->reqid = 0;
+ if (ctx->reqmsg != NULL) {
+ nni_msg_free(ctx->reqmsg);
+ ctx->reqmsg = NULL;
+ }
+
+ // Is there an aio waiting for us?
+ if ((aio = ctx->raio) != NULL) {
+ ctx->raio = NULL;
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_set_msg(aio, msg);
+ nni_aio_finish_synch(aio, 0, nni_msg_len(msg));
+ } else {
+ // No AIO, so stash msg. Receive will pick it up later.
+ ctx->repmsg = msg;
+ if (ctx == s->ctx) {
+ nni_pollable_raise(s->recvable);
+ }
+ nni_mtx_unlock(&s->mtx);
+ }
return;
malformed:
@@ -434,191 +406,417 @@ malformed:
}
static void
-req0_timeout(void *arg)
+req0_ctx_timeout(void *arg)
{
- req0_sock *s = arg;
+ req0_ctx * ctx = arg;
+ req0_sock *s = ctx->sock;
nni_mtx_lock(&s->mtx);
- if (s->reqmsg != NULL) {
- s->wantw = true;
- req0_resend(s);
+ if ((ctx->reqmsg != NULL) && (!s->closed)) {
+ if (!nni_list_node_active(&ctx->sqnode)) {
+ nni_list_append(&s->sendq, ctx);
+ }
+ req0_run_sendq(s, NULL);
}
nni_mtx_unlock(&s->mtx);
}
-static void
-req0_resend(req0_sock *s)
+static int
+req0_ctx_init(void **cpp, void *sarg)
{
- req0_pipe *p;
- nni_msg * msg;
+ req0_sock *s = sarg;
+ req0_ctx * ctx;
- // Note: This routine should be called with the socket lock held.
- // Also, this should only be called while handling cooked mode
- // requests.
- if ((msg = s->reqmsg) == NULL) {
- return;
+ if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
+ return (NNG_ENOMEM);
}
- if (s->closed) {
- s->reqmsg = NULL;
- nni_msg_free(msg);
+ nni_timer_init(&ctx->timer, req0_ctx_timeout, ctx);
+
+ nni_mtx_lock(&s->mtx);
+ ctx->sock = s;
+ ctx->raio = NULL;
+ ctx->retry = s->retry;
+ nni_list_append(&s->ctxs, ctx);
+ nni_mtx_unlock(&s->mtx);
+
+ *cpp = ctx;
+ return (0);
+}
+
+static void
+req0_ctx_fini(void *arg)
+{
+ req0_ctx * ctx = arg;
+ req0_sock *s = ctx->sock;
+ nni_aio * aio;
+
+ nni_mtx_lock(&s->mtx);
+ if ((aio = ctx->raio) != NULL) {
+ ctx->raio = NULL;
+ nni_aio_finish_error(aio, NNG_ECLOSED);
}
+ if ((aio = ctx->saio) != NULL) {
+ ctx->saio = NULL;
+ nni_aio_set_msg(aio, ctx->reqmsg);
+ ctx->reqmsg = NULL;
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ req0_ctx_reset(ctx);
+ nni_list_remove(&s->ctxs, ctx);
+ nni_mtx_unlock(&s->mtx);
- if (s->wantw) {
- s->wantw = false;
+ nni_timer_cancel(&ctx->timer);
+ nni_timer_fini(&ctx->timer);
- if (nni_msg_dup(&msg, s->reqmsg) != 0) {
- // Failed to alloc message, reschedule it. Also,
- // mark that we have a message we want to resend,
- // in case something comes available.
- s->wantw = true;
- nni_timer_schedule(&s->timer, nni_clock() + s->retry);
- return;
- }
+ NNI_FREE_STRUCT(ctx);
+}
+
+static int
+req0_ctx_setopt_resendtime(void *arg, const void *buf, size_t sz, int typ)
+{
+ req0_ctx *ctx = arg;
+ return (nni_copyin_ms(&ctx->retry, buf, sz, typ));
+}
+
+static int
+req0_ctx_getopt_resendtime(void *arg, void *buf, size_t *szp, int typ)
+{
+ req0_ctx *ctx = arg;
+ return (nni_copyout_ms(ctx->retry, buf, szp, typ));
+}
+
+static void
+req0_run_sendq(req0_sock *s, nni_list *aiolist)
+{
+ req0_ctx *ctx;
+ nni_aio * aio;
+
+ // Note: This routine should be called with the socket lock held.
+ while ((ctx = nni_list_first(&s->sendq)) != NULL) {
+ nni_msg * msg;
+ req0_pipe *p;
- // Now we iterate across all possible outpipes, until
- // one accepts it.
if ((p = nni_list_first(&s->readypipes)) == NULL) {
- // No pipes ready to process us. Note that we have
- // something to send, and schedule it.
- nni_msg_free(msg);
- s->wantw = true;
return;
}
+ // We have a place to send it, so do the send.
+ // If a sending error occurs that causes the message to
+ // be dropped, we rely on the resend timer to pick it up.
+ // We also notify the completion callback if this is the
+ // first send attempt.
+ nni_list_remove(&s->sendq, ctx);
+
+ // Schedule a resubmit timer. We only do this if we got
+ // a pipe to send to. Otherwise, we should get handled
+ // the next time that the sendq is run.
+ nni_timer_schedule(&ctx->timer, nni_clock() + ctx->retry);
+
+ if (nni_msg_dup(&msg, ctx->reqmsg) != 0) {
+ // Oops. Well, keep trying each context; maybe
+ // one of them will get lucky.
+ continue;
+ }
+
+ // Put us on the pipe list of active contexts.
+ // This gives the pipe a chance to kick a resubmit
+ // if the pipe is removed.
+ nni_list_node_remove(&ctx->pnode);
+ nni_list_append(&p->ctxs, ctx);
+
nni_list_remove(&s->readypipes, p);
nni_list_append(&s->busypipes, p);
- s->pendpipe = p;
- s->resend = nni_clock() + s->retry;
- nni_aio_set_msg(p->aio_sendcooked, msg);
+ if ((aio = ctx->saio) != NULL) {
+ ctx->saio = NULL;
+ nni_aio_bump_count(aio, ctx->reqlen);
+ // If the list was passed in, we want to do a
+ // synchronous completion later.
+ if (aiolist != NULL) {
+ nni_list_append(aiolist, aio);
+ } else {
+ nni_aio_finish(aio, 0, 0);
+ }
+ if (ctx == s->ctx) {
+ if (nni_list_empty(&s->readypipes)) {
+ nni_pollable_clear(s->sendable);
+ } else {
+ nni_pollable_raise(s->sendable);
+ }
+ }
+ }
+
+ nni_aio_set_msg(p->aio_send, msg);
+ nni_pipe_send(p->pipe, p->aio_send);
+ }
+}
- // Note that because we were ready rather than busy, we
- // should not have any I/O oustanding and hence the aio
- // object will be available for our use.
- nni_pipe_send(p->pipe, p->aio_sendcooked);
- nni_timer_schedule(&s->timer, s->resend);
+void
+req0_ctx_reset(req0_ctx *ctx)
+{
+ req0_sock *s = ctx->sock;
+ // Call with sock lock held!
+
+ // We cannot safely "wait" using nni_timer_cancel, but this removes
+ // any scheduled timer activation. If the timeout is already running
+ // concurrently, it will still run. It should do nothing, because
+ // we toss the reqmsg. There is still a very narrow race if the
+ // timeout fires, but doesn't actually start running before we
+ // both finish this function, *and* manage to reschedule another
+ // request. The consequence of that occurring is that the request
+ // will be emitted on the wire twice. This is not actually tragic.
+ nni_timer_schedule(&ctx->timer, NNI_TIME_NEVER);
+
+ nni_list_node_remove(&ctx->pnode);
+ nni_list_node_remove(&ctx->sqnode);
+ if (ctx->reqid != 0) {
+ nni_idhash_remove(s->reqids, ctx->reqid);
+ ctx->reqid = 0;
+ }
+ if (ctx->reqmsg != NULL) {
+ nni_msg_free(ctx->reqmsg);
+ ctx->reqmsg = NULL;
+ }
+ if (ctx->repmsg != NULL) {
+ nni_msg_free(ctx->repmsg);
+ ctx->repmsg = NULL;
}
}
static void
-req0_sock_send(void *arg, nni_aio *aio)
+req0_ctx_cancel_recv(nni_aio *aio, int rv)
{
- req0_sock *s = arg;
- uint32_t id;
- size_t len;
- nni_msg * msg;
- int rv;
+ req0_ctx * ctx = nni_aio_get_prov_data(aio);
+ req0_sock *s = ctx->sock;
nni_mtx_lock(&s->mtx);
+ if (ctx->raio != aio) {
+ // already completed, ignore this.
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
+ ctx->raio = NULL;
- msg = nni_aio_get_msg(aio);
- len = nni_msg_len(msg);
+ // Cancellation of a pending receive is treated as aborting the
+ // entire state machine. This allows us to preserve the semantic of
+ // exactly one receive operation per send operation, and should
+ // be the least surprising for users. The main consequence is that
+ // if a receive operation is completed (in error or otherwise), the
+ // user must submit a new send operation to restart the state machine.
+ req0_ctx_reset(ctx);
- // In cooked mode, because we need to manage our own resend logic,
- // we bypass the upper writeq entirely.
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&s->mtx);
+}
- // Generate a new request ID. We always set the high
- // order bit so that the peer can locate the end of the
- // backtrace. (Pipe IDs have the high order bit clear.)
- id = (s->nextid++) | 0x80000000u;
- // Request ID is in big endian format.
- NNI_PUT32(s->reqid, id);
+static void
+req0_ctx_recv(void *arg, nni_aio *aio)
+{
+ req0_ctx * ctx = arg;
+ req0_sock *s = ctx->sock;
+ nni_msg * msg;
- if ((rv = nni_msg_header_append(msg, s->reqid, 4)) != 0) {
+ nni_mtx_lock(&s->mtx);
+ if (nni_aio_start(aio, req0_ctx_cancel_recv, ctx) != 0) {
nni_mtx_unlock(&s->mtx);
- nni_aio_finish_error(aio, rv);
return;
}
-
- // If another message is there, this cancels it.
- if (s->reqmsg != NULL) {
- nni_msg_free(s->reqmsg);
- s->reqmsg = NULL;
+ if (s->closed) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ return;
+ }
+ if ((ctx->raio != NULL) ||
+ ((ctx->reqmsg == NULL) && (ctx->repmsg == NULL))) {
+ // We have already got a pending receive or have not
+ // tried to send a request yet.
+ // Either of these violate our basic state assumptions.
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
}
- nni_aio_set_msg(aio, NULL);
-
- // Make a duplicate message... for retries.
- s->reqmsg = msg;
- // Schedule for immediate send
- s->resend = NNI_TIME_ZERO;
- s->wantw = true;
+ if ((msg = ctx->repmsg) == NULL) {
+ ctx->raio = aio;
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
- req0_resend(s);
+ ctx->repmsg = NULL;
+ // We have got a message to pass up, yay!
+ nni_aio_set_msg(aio, msg);
+ if (ctx == s->ctx) {
+ nni_pollable_clear(s->recvable);
+ }
nni_mtx_unlock(&s->mtx);
-
- nni_aio_finish(aio, 0, len);
+ nni_aio_finish(aio, 0, nni_msg_len(msg));
}
static void
-req0_sock_send_raw(void *arg, nni_aio *aio)
+req0_ctx_cancel_send(nni_aio *aio, int rv)
{
- req0_sock *s = arg;
+ req0_ctx * ctx = nni_aio_get_prov_data(aio);
+ req0_sock *s = ctx->sock;
+
+ nni_mtx_lock(&s->mtx);
+ if (ctx->saio != aio) {
+ // already completed, ignore this.
+ nni_mtx_unlock(&s->mtx);
+ return;
+ }
- nni_msgq_aio_put(s->uwq, aio);
+ // There should not be a pending reply, because we canceled
+ // it while we were waiting.
+ NNI_ASSERT(ctx->raio == NULL);
+ ctx->saio = NULL;
+ // Restore the message back to the aio.
+ nni_aio_set_msg(aio, ctx->reqmsg);
+ nni_msg_header_clear(ctx->reqmsg);
+ ctx->reqmsg = NULL;
+
+ // Cancellation of a pending receive is treated as aborting the
+ // entire state machine. This allows us to preserve the semantic of
+ // exactly one receive operation per send operation, and should
+ // be the least surprising for users. The main consequence is that
+ // if a receive operation is completed (in error or otherwise), the
+ // user must submit a new send operation to restart the state machine.
+ req0_ctx_reset(ctx);
+
+ nni_aio_finish_error(aio, rv);
+ nni_mtx_unlock(&s->mtx);
}
-static nni_msg *
-req0_sock_filter(void *arg, nni_msg *msg)
+static void
+req0_ctx_send(void *arg, nni_aio *aio)
{
- req0_sock *s = arg;
- nni_msg * rmsg;
+ req0_ctx * ctx = arg;
+ req0_sock *s = ctx->sock;
+ nng_msg * msg = nni_aio_get_msg(aio);
+ uint64_t id;
+ int rv;
nni_mtx_lock(&s->mtx);
-
- if (nni_msg_header_len(msg) < 4) {
+ // Even though we always complete synchronously, this guards against
+ // restarting a request that was stopped.
+ if (nni_aio_start(aio, req0_ctx_cancel_send, ctx) != 0) {
nni_mtx_unlock(&s->mtx);
- nni_msg_free(msg);
- return (NULL);
+ return;
+ }
+ // Sending a new requst cancels the old one, including any
+ // outstanding reply.
+ if (ctx->raio != NULL) {
+ nni_aio_finish_error(ctx->raio, NNG_ECANCELED);
+ ctx->raio = NULL;
+ }
+ if (ctx->saio != NULL) {
+ nni_aio_set_msg(ctx->saio, ctx->reqmsg);
+ nni_msg_header_clear(ctx->reqmsg);
+ ctx->reqmsg = NULL;
+ nni_aio_finish_error(ctx->saio, NNG_ECANCELED);
+ ctx->saio = NULL;
+ nni_list_remove(&s->sendq, ctx);
}
- if ((rmsg = s->reqmsg) == NULL) {
- // We had no outstanding request. (Perhaps canceled,
- // or duplicate response.)
+ // This resets the entire state machine.
+ req0_ctx_reset(ctx);
+
+ // Insert us on the per ID hash list, so that receives can find us.
+ if ((rv = nni_idhash_alloc(s->reqids, &id, ctx)) != 0) {
nni_mtx_unlock(&s->mtx);
- nni_msg_free(msg);
- return (NULL);
+ nni_aio_finish_error(aio, rv);
+ return;
}
-
- if (memcmp(nni_msg_header(msg), s->reqid, 4) != 0) {
- // Wrong request id.
+ ctx->reqid = (uint32_t) id;
+ if ((rv = nni_msg_header_append_u32(msg, ctx->reqid)) != 0) {
+ nni_idhash_remove(s->reqids, id);
nni_mtx_unlock(&s->mtx);
- nni_msg_free(msg);
- return (NULL);
+ nni_aio_finish_error(aio, rv);
+ return;
}
+ ctx->reqlen = nni_msg_len(msg);
+ ctx->reqmsg = msg;
+ ctx->saio = aio;
+ nni_aio_set_msg(aio, NULL);
- s->reqmsg = NULL;
- s->pendpipe = NULL;
- nni_mtx_unlock(&s->mtx);
+ // Stick us on the sendq list.
+ nni_list_append(&s->sendq, ctx);
- nni_msg_free(rmsg);
+ req0_run_sendq(s, NULL);
+ nni_mtx_unlock(&s->mtx);
+}
- return (msg);
+static void
+req0_sock_send(void *arg, nni_aio *aio)
+{
+ req0_sock *s = arg;
+ req0_ctx_send(s->ctx, aio);
}
static void
req0_sock_recv(void *arg, nni_aio *aio)
{
req0_sock *s = arg;
+ req0_ctx_recv(s->ctx, aio);
+}
- nni_mtx_lock(&s->mtx);
- if (s->reqmsg == NULL) {
- nni_mtx_unlock(&s->mtx);
- nni_aio_finish_error(aio, NNG_ESTATE);
- return;
+static int
+req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ)
+{
+ req0_sock *s = arg;
+ return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ));
+}
+
+static int
+req0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ)
+{
+ req0_sock *s = arg;
+ return (nni_copyout_int(s->ttl, buf, szp, typ));
+}
+
+static int
+req0_sock_setopt_resendtime(void *arg, const void *buf, size_t sz, int typ)
+{
+ req0_sock *s = arg;
+ int rv;
+ rv = req0_ctx_setopt_resendtime(s->ctx, buf, sz, typ);
+ s->retry = s->ctx->retry;
+ return (rv);
+}
+
+static int
+req0_sock_getopt_resendtime(void *arg, void *buf, size_t *szp, int typ)
+{
+ req0_sock *s = arg;
+ return (req0_ctx_getopt_resendtime(s->ctx, buf, szp, typ));
+}
+
+static int
+req0_sock_getopt_sendfd(void *arg, void *buf, size_t *szp, int typ)
+{
+ req0_sock *s = arg;
+ int rv;
+ int fd;
+
+ if ((rv = nni_pollable_getfd(s->sendable, &fd)) != 0) {
+ return (rv);
}
- nni_mtx_unlock(&s->mtx);
- nni_msgq_aio_get(s->urq, aio);
+ return (nni_copyout_int(fd, buf, szp, typ));
}
-static void
-req0_sock_recv_raw(void *arg, nni_aio *aio)
+static int
+req0_sock_getopt_recvfd(void *arg, void *buf, size_t *szp, int typ)
{
req0_sock *s = arg;
+ int rv;
+ int fd;
- nni_msgq_aio_get(s->urq, aio);
+ if ((rv = nni_pollable_getfd(s->recvable, &fd)) != 0) {
+ return (rv);
+ }
+
+ return (nni_copyout_int(fd, buf, szp, typ));
}
static nni_proto_pipe_ops req0_pipe_ops = {
@@ -628,6 +826,26 @@ static nni_proto_pipe_ops req0_pipe_ops = {
.pipe_stop = req0_pipe_stop,
};
+static nni_proto_ctx_option req0_ctx_options[] = {
+ {
+ .co_name = NNG_OPT_REQ_RESENDTIME,
+ .co_type = NNI_TYPE_DURATION,
+ .co_getopt = req0_ctx_getopt_resendtime,
+ .co_setopt = req0_ctx_setopt_resendtime,
+ },
+ {
+ .co_name = NULL,
+ },
+};
+
+static nni_proto_ctx_ops req0_ctx_ops = {
+ .ctx_init = req0_ctx_init,
+ .ctx_fini = req0_ctx_fini,
+ .ctx_recv = req0_ctx_recv,
+ .ctx_send = req0_ctx_send,
+ .ctx_options = req0_ctx_options,
+};
+
static nni_proto_sock_option req0_sock_options[] = {
{
.pso_name = NNG_OPT_MAXTTL,
@@ -641,6 +859,18 @@ static nni_proto_sock_option req0_sock_options[] = {
.pso_getopt = req0_sock_getopt_resendtime,
.pso_setopt = req0_sock_setopt_resendtime,
},
+ {
+ .pso_name = NNG_OPT_RECVFD,
+ .pso_type = NNI_TYPE_INT32,
+ .pso_getopt = req0_sock_getopt_recvfd,
+ .pso_setopt = NULL,
+ },
+ {
+ .pso_name = NNG_OPT_SENDFD,
+ .pso_type = NNI_TYPE_INT32,
+ .pso_getopt = req0_sock_getopt_sendfd,
+ .pso_setopt = NULL,
+ },
// terminate list
{
.pso_name = NULL,
@@ -653,7 +883,6 @@ static nni_proto_sock_ops req0_sock_ops = {
.sock_open = req0_sock_open,
.sock_close = req0_sock_close,
.sock_options = req0_sock_options,
- .sock_filter = req0_sock_filter,
.sock_send = req0_sock_send,
.sock_recv = req0_sock_recv,
};
@@ -662,9 +891,10 @@ static nni_proto req0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_REQ_V0, "req" },
.proto_peer = { NNI_PROTO_REP_V0, "rep" },
- .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ,
.proto_sock_ops = &req0_sock_ops,
.proto_pipe_ops = &req0_pipe_ops,
+ .proto_ctx_ops = &req0_ctx_ops,
};
int
@@ -672,28 +902,3 @@ nng_req0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &req0_proto));
}
-
-static nni_proto_sock_ops req0_sock_ops_raw = {
- .sock_init = req0_sock_init_raw,
- .sock_fini = req0_sock_fini,
- .sock_open = req0_sock_open,
- .sock_close = req0_sock_close,
- .sock_options = req0_sock_options,
- .sock_send = req0_sock_send_raw,
- .sock_recv = req0_sock_recv_raw,
-};
-
-static nni_proto req0_proto_raw = {
- .proto_version = NNI_PROTOCOL_VERSION,
- .proto_self = { NNI_PROTO_REQ_V0, "req" },
- .proto_peer = { NNI_PROTO_REP_V0, "rep" },
- .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
- .proto_sock_ops = &req0_sock_ops_raw,
- .proto_pipe_ops = &req0_pipe_ops,
-};
-
-int
-nng_req0_open_raw(nng_socket *sidp)
-{
- return (nni_proto_open(sidp, &req0_proto_raw));
-} \ No newline at end of file
diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c
new file mode 100644
index 00000000..f7189453
--- /dev/null
+++ b/src/protocol/reqrep0/xrep.c
@@ -0,0 +1,434 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "protocol/reqrep0/rep.h"
+
+// Response protocol in raw mode. The REP protocol is the "reply" side of a
+// request-reply pair. This is useful for building RPC servers, for
+// example.
+
+#ifndef NNI_PROTO_REQ_V0
+#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0)
+#endif
+
+#ifndef NNI_PROTO_REP_V0
+#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1)
+#endif
+
+typedef struct xrep0_pipe xrep0_pipe;
+typedef struct xrep0_sock xrep0_sock;
+
+static void xrep0_sock_getq_cb(void *);
+static void xrep0_pipe_getq_cb(void *);
+static void xrep0_pipe_putq_cb(void *);
+static void xrep0_pipe_send_cb(void *);
+static void xrep0_pipe_recv_cb(void *);
+static void xrep0_pipe_fini(void *);
+
+// xrep0_sock is our per-socket protocol private structure.
+struct xrep0_sock {
+ nni_msgq * uwq;
+ nni_msgq * urq;
+ nni_mtx lk;
+ int ttl;
+ nni_idhash *pipes;
+ nni_aio * aio_getq;
+};
+
+// xrep0_pipe is our per-pipe protocol private structure.
+struct xrep0_pipe {
+ nni_pipe * pipe;
+ xrep0_sock *rep;
+ nni_msgq * sendq;
+ nni_aio * aio_getq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+ nni_aio * aio_putq;
+};
+
+static void
+xrep0_sock_fini(void *arg)
+{
+ xrep0_sock *s = arg;
+
+ nni_aio_stop(s->aio_getq);
+ nni_aio_fini(s->aio_getq);
+ nni_idhash_fini(s->pipes);
+ nni_mtx_fini(&s->lk);
+ NNI_FREE_STRUCT(s);
+}
+
+static int
+xrep0_sock_init(void **sp, nni_sock *sock)
+{
+ xrep0_sock *s;
+ int rv;
+
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_init(&s->lk);
+ if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
+ ((rv = nni_aio_init(&s->aio_getq, xrep0_sock_getq_cb, s)) != 0)) {
+ xrep0_sock_fini(s);
+ return (rv);
+ }
+
+ s->ttl = 8; // Per RFC
+ s->uwq = nni_sock_sendq(sock);
+ s->urq = nni_sock_recvq(sock);
+
+ *sp = s;
+
+ return (0);
+}
+
+static void
+xrep0_sock_open(void *arg)
+{
+ xrep0_sock *s = arg;
+
+ // This starts us retrieving message from the upper write q.
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
+}
+
+static void
+xrep0_sock_close(void *arg)
+{
+ xrep0_sock *s = arg;
+
+ nni_aio_abort(s->aio_getq, NNG_ECLOSED);
+}
+
+static void
+xrep0_pipe_fini(void *arg)
+{
+ xrep0_pipe *p = arg;
+
+ nni_aio_fini(p->aio_getq);
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_putq);
+ nni_msgq_fini(p->sendq);
+ NNI_FREE_STRUCT(p);
+}
+
+static int
+xrep0_pipe_init(void **pp, nni_pipe *pipe, void *s)
+{
+ xrep0_pipe *p;
+ int rv;
+
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ // We want a pretty deep sendq on pipes. The rationale here is
+ // that the send rate will be mitigated by the receive rate.
+ // If a slow pipe (req pipe not reading its own responses!?)
+ // comes up, then we will start discarding its replies eventually,
+ // but it takes some time. It would be poor form for a peer to
+ // smash us with requests, but be unable to handle replies faster
+ // than we can forward them. If they do that, their replies get
+ // dropped. (From a DDoS perspective, it might be nice in the
+ // future if we had a way to exert backpressure to the send side --
+ // essentially don't let peers send requests faster than they are
+ // willing to receive replies. Something to think about for the
+ // future.)
+ if (((rv = nni_msgq_init(&p->sendq, 64)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, xrep0_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, xrep0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, xrep0_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, xrep0_pipe_putq_cb, p)) != 0)) {
+ xrep0_pipe_fini(p);
+ return (rv);
+ }
+
+ p->pipe = pipe;
+ p->rep = s;
+ *pp = p;
+ return (0);
+}
+
+static int
+xrep0_pipe_start(void *arg)
+{
+ xrep0_pipe *p = arg;
+ xrep0_sock *s = p->rep;
+ int rv;
+
+ if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) {
+ return (rv);
+ }
+
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
+ nni_pipe_recv(p->pipe, p->aio_recv);
+ return (0);
+}
+
+static void
+xrep0_pipe_stop(void *arg)
+{
+ xrep0_pipe *p = arg;
+ xrep0_sock *s = p->rep;
+
+ nni_msgq_close(p->sendq);
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_putq);
+
+ nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
+}
+
+static void
+xrep0_sock_getq_cb(void *arg)
+{
+ xrep0_sock *s = arg;
+ nni_msgq * uwq = s->uwq;
+ nni_msg * msg;
+ uint32_t id;
+ xrep0_pipe *p;
+ int rv;
+
+ // This watches for messages from the upper write queue,
+ // extracts the destination pipe, and forwards it to the appropriate
+ // destination pipe via a separate queue. This prevents a single bad
+ // or slow pipe from gumming up the works for the entire socket.
+
+ if (nni_aio_result(s->aio_getq) != 0) {
+ // Closed socket?
+ return;
+ }
+
+ msg = nni_aio_get_msg(s->aio_getq);
+ nni_aio_set_msg(s->aio_getq, NULL);
+
+ // We yank the outgoing pipe id from the header
+ if (nni_msg_header_len(msg) < 4) {
+ nni_msg_free(msg);
+
+ // Look for another message on the upper write queue.
+ nni_msgq_aio_get(uwq, s->aio_getq);
+ return;
+ }
+
+ id = nni_msg_header_trim_u32(msg);
+
+ // Look for the pipe, and attempt to put the message there
+ // (nonblocking) if we can. If we can't for any reason, then we
+ // free the message.
+ if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) {
+ rv = nni_msgq_tryput(p->sendq, msg);
+ }
+ if (rv != 0) {
+ nni_msg_free(msg);
+ }
+
+ // Now look for another message on the upper write queue.
+ nni_msgq_aio_get(uwq, s->aio_getq);
+}
+
+static void
+xrep0_pipe_getq_cb(void *arg)
+{
+ xrep0_pipe *p = arg;
+
+ if (nni_aio_result(p->aio_getq) != 0) {
+ nni_pipe_stop(p->pipe);
+ return;
+ }
+
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
+
+ nni_pipe_send(p->pipe, p->aio_send);
+}
+
+static void
+xrep0_pipe_send_cb(void *arg)
+{
+ xrep0_pipe *p = arg;
+
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->pipe);
+ return;
+ }
+
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
+}
+
+static void
+xrep0_pipe_recv_cb(void *arg)
+{
+ xrep0_pipe *p = arg;
+ xrep0_sock *s = p->rep;
+ nni_msg * msg;
+ int rv;
+ uint8_t * body;
+ int hops;
+
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->pipe);
+ return;
+ }
+
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
+
+ nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
+
+ // Store the pipe id in the header, first thing.
+ rv = nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe));
+ if (rv != 0) {
+ // Failure here causes us to drop the message.
+ goto drop;
+ }
+
+ // Move backtrace from body to header
+ hops = 1;
+ for (;;) {
+ int end = 0;
+ if (hops > s->ttl) {
+ // This isn't malformed, but it has gone through
+ // too many hops. Do not disconnect, because we
+ // can legitimately receive messages with too many
+ // hops from devices, etc.
+ goto drop;
+ }
+ hops++;
+ if (nni_msg_len(msg) < 4) {
+ // Peer is speaking garbage. Kick it.
+ nni_msg_free(msg);
+ nni_pipe_stop(p->pipe);
+ return;
+ }
+ body = nni_msg_body(msg);
+ end = (body[0] & 0x80) ? 1 : 0;
+ rv = nni_msg_header_append(msg, body, 4);
+ if (rv != 0) {
+ // Out of memory most likely, but keep going to
+ // avoid breaking things.
+ goto drop;
+ }
+ nni_msg_trim(msg, 4);
+ if (end) {
+ break;
+ }
+ }
+
+ // Go ahead and send it up.
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_msgq_aio_put(s->urq, p->aio_putq);
+ return;
+
+drop:
+ nni_msg_free(msg);
+ nni_pipe_recv(p->pipe, p->aio_recv);
+}
+
+static void
+xrep0_pipe_putq_cb(void *arg)
+{
+ xrep0_pipe *p = arg;
+
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
+ nni_pipe_stop(p->pipe);
+ return;
+ }
+
+ nni_pipe_recv(p->pipe, p->aio_recv);
+}
+
+static int
+xrep0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ)
+{
+ xrep0_sock *s = arg;
+ return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ));
+}
+
+static int
+xrep0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ)
+{
+ xrep0_sock *s = arg;
+ return (nni_copyout_int(s->ttl, buf, szp, typ));
+}
+
+static void
+xrep0_sock_send(void *arg, nni_aio *aio)
+{
+ xrep0_sock *s = arg;
+
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
+static void
+xrep0_sock_recv(void *arg, nni_aio *aio)
+{
+ xrep0_sock *s = arg;
+
+ nni_msgq_aio_get(s->urq, aio);
+}
+
+// This is the global protocol structure -- our linkage to the core.
+// This should be the only global non-static symbol in this file.
+static nni_proto_pipe_ops xrep0_pipe_ops = {
+ .pipe_init = xrep0_pipe_init,
+ .pipe_fini = xrep0_pipe_fini,
+ .pipe_start = xrep0_pipe_start,
+ .pipe_stop = xrep0_pipe_stop,
+};
+
+static nni_proto_sock_option xrep0_sock_options[] = {
+ {
+ .pso_name = NNG_OPT_MAXTTL,
+ .pso_type = NNI_TYPE_INT32,
+ .pso_getopt = xrep0_sock_getopt_maxttl,
+ .pso_setopt = xrep0_sock_setopt_maxttl,
+ },
+ // terminate list
+ {
+ .pso_name = NULL,
+ },
+};
+
+static nni_proto_sock_ops xrep0_sock_ops = {
+ .sock_init = xrep0_sock_init,
+ .sock_fini = xrep0_sock_fini,
+ .sock_open = xrep0_sock_open,
+ .sock_close = xrep0_sock_close,
+ .sock_options = xrep0_sock_options,
+ .sock_filter = NULL, // No filtering for raw mode
+ .sock_send = xrep0_sock_send,
+ .sock_recv = xrep0_sock_recv,
+};
+
+static nni_proto xrep0_proto = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_REP_V0, "rep" },
+ .proto_peer = { NNI_PROTO_REQ_V0, "req" },
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &xrep0_sock_ops,
+ .proto_pipe_ops = &xrep0_pipe_ops,
+};
+
+int
+nng_rep0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &xrep0_proto));
+}
diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c
new file mode 100644
index 00000000..5c1841b2
--- /dev/null
+++ b/src/protocol/reqrep0/xreq.c
@@ -0,0 +1,324 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "core/nng_impl.h"
+#include "protocol/reqrep0/req.h"
+
+// Request protocol. The REQ protocol is the "request" side of a
+// request-reply pair. This is useful for building RPC clients, for example.
+
+#ifndef NNI_PROTO_REQ_V0
+#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0)
+#endif
+
+#ifndef NNI_PROTO_REP_V0
+#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1)
+#endif
+
+typedef struct xreq0_pipe xreq0_pipe;
+typedef struct xreq0_sock xreq0_sock;
+
+// An xreq0_sock is our per-socket protocol private structure.
+struct xreq0_sock {
+ nni_msgq *uwq;
+ nni_msgq *urq;
+ int ttl;
+};
+
+// A req0_pipe is our per-pipe protocol private structure.
+struct xreq0_pipe {
+ nni_pipe * pipe;
+ xreq0_sock *req;
+ nni_aio * aio_getq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+ nni_aio * aio_putq;
+};
+
+static void xreq0_sock_fini(void *);
+static void xreq0_getq_cb(void *);
+static void xreq0_send_cb(void *);
+static void xreq0_recv_cb(void *);
+static void xreq0_putq_cb(void *);
+
+static int
+xreq0_sock_init(void **sp, nni_sock *sock)
+{
+ xreq0_sock *s;
+
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ s->ttl = 8;
+ s->uwq = nni_sock_sendq(sock);
+ s->urq = nni_sock_recvq(sock);
+ *sp = s;
+
+ return (0);
+}
+
+static void
+xreq0_sock_open(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+xreq0_sock_close(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+static void
+xreq0_sock_fini(void *arg)
+{
+ xreq0_sock *s = arg;
+
+ NNI_FREE_STRUCT(s);
+}
+
+static void
+xreq0_pipe_fini(void *arg)
+{
+ xreq0_pipe *p = arg;
+
+ nni_aio_fini(p->aio_getq);
+ nni_aio_fini(p->aio_putq);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_send);
+ NNI_FREE_STRUCT(p);
+}
+
+static int
+xreq0_pipe_init(void **pp, nni_pipe *pipe, void *s)
+{
+ xreq0_pipe *p;
+ int rv;
+
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if (((rv = nni_aio_init(&p->aio_getq, xreq0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, xreq0_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, xreq0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, xreq0_send_cb, p)) != 0)) {
+ xreq0_pipe_fini(p);
+ return (rv);
+ }
+
+ p->pipe = pipe;
+ p->req = s;
+ *pp = p;
+ return (0);
+}
+
+static int
+xreq0_pipe_start(void *arg)
+{
+ xreq0_pipe *p = arg;
+ xreq0_sock *s = p->req;
+
+ if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) {
+ return (NNG_EPROTO);
+ }
+
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
+ nni_pipe_recv(p->pipe, p->aio_recv);
+ return (0);
+}
+
+static void
+xreq0_pipe_stop(void *arg)
+{
+ xreq0_pipe *p = arg;
+
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_putq);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_send);
+
+ // At this point there should not be any further AIOs running.
+ // Further, any completion tasks have completed.
+}
+
+// For raw mode we can just let the pipes "contend" via getq to get a
+// message from the upper write queue. The msgqueue implementation
+// actually provides ordering, so load will be spread automatically.
+// (NB: We may have to revise this in the future if we want to provide some
+// kind of priority.)
+
+static void
+xreq0_getq_cb(void *arg)
+{
+ xreq0_pipe *p = arg;
+
+ if (nni_aio_result(p->aio_getq) != 0) {
+ nni_pipe_stop(p->pipe);
+ return;
+ }
+
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
+
+ nni_pipe_send(p->pipe, p->aio_send);
+}
+
+static void
+xreq0_send_cb(void *arg)
+{
+ xreq0_pipe *p = arg;
+
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->pipe);
+ return;
+ }
+
+ // Sent a message so we just need to look for another one.
+ nni_msgq_aio_get(p->req->uwq, p->aio_getq);
+}
+
+static void
+xreq0_putq_cb(void *arg)
+{
+ xreq0_pipe *p = arg;
+
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
+ nni_pipe_stop(p->pipe);
+ return;
+ }
+ nni_aio_set_msg(p->aio_putq, NULL);
+
+ nni_pipe_recv(p->pipe, p->aio_recv);
+}
+
+static void
+xreq0_recv_cb(void *arg)
+{
+ xreq0_pipe *p = arg;
+ xreq0_sock *sock = p->req;
+ nni_msg * msg;
+ uint32_t id;
+
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->pipe);
+ return;
+ }
+
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
+
+ // We yank 4 bytes from front of body, and move them to the header.
+ if (nni_msg_len(msg) < 4) {
+ // Malformed message.
+ goto malformed;
+ }
+ id = nni_msg_trim_u32(msg);
+ if (nni_msg_header_append_u32(msg, id) != 0) {
+ // Arguably we could just discard and carry on. But
+ // dropping the connection is probably more helpful since
+ // it lets the other side see that a problem occurred.
+ // Plus it gives us a chance to reclaim some memory.
+ goto malformed;
+ }
+
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_msgq_aio_put(sock->urq, p->aio_putq);
+ return;
+
+malformed:
+ nni_msg_free(msg);
+ nni_pipe_stop(p->pipe);
+}
+
+static void
+xreq0_sock_send(void *arg, nni_aio *aio)
+{
+ xreq0_sock *s = arg;
+
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
+static void
+xreq0_sock_recv(void *arg, nni_aio *aio)
+{
+ xreq0_sock *s = arg;
+
+ nni_msgq_aio_get(s->urq, aio);
+}
+
+static int
+xreq0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ)
+{
+ xreq0_sock *s = arg;
+ return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ));
+}
+
+static int
+xreq0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ)
+{
+ xreq0_sock *s = arg;
+ return (nni_copyout_int(s->ttl, buf, szp, typ));
+}
+
+static nni_proto_pipe_ops xreq0_pipe_ops = {
+ .pipe_init = xreq0_pipe_init,
+ .pipe_fini = xreq0_pipe_fini,
+ .pipe_start = xreq0_pipe_start,
+ .pipe_stop = xreq0_pipe_stop,
+};
+
+static nni_proto_sock_option xreq0_sock_options[] = {
+ {
+ .pso_name = NNG_OPT_MAXTTL,
+ .pso_type = NNI_TYPE_INT32,
+ .pso_getopt = xreq0_sock_getopt_maxttl,
+ .pso_setopt = xreq0_sock_setopt_maxttl,
+ },
+ // terminate list
+ {
+ .pso_name = NULL,
+ },
+};
+
+static nni_proto_sock_ops xreq0_sock_ops = {
+ .sock_init = xreq0_sock_init,
+ .sock_fini = xreq0_sock_fini,
+ .sock_open = xreq0_sock_open,
+ .sock_close = xreq0_sock_close,
+ .sock_options = xreq0_sock_options,
+ .sock_send = xreq0_sock_send,
+ .sock_recv = xreq0_sock_recv,
+};
+
+static nni_proto xreq0_proto = {
+ .proto_version = NNI_PROTOCOL_VERSION,
+ .proto_self = { NNI_PROTO_REQ_V0, "req" },
+ .proto_peer = { NNI_PROTO_REP_V0, "rep" },
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
+ .proto_sock_ops = &xreq0_sock_ops,
+ .proto_pipe_ops = &xreq0_pipe_ops,
+ .proto_ctx_ops = NULL, // raw mode does not support contexts
+};
+
+int
+nng_req0_open_raw(nng_socket *sidp)
+{
+ return (nni_proto_open(sidp, &xreq0_proto));
+}