summaryrefslogtreecommitdiff
path: root/src/protocol/reqrep/rep.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep/rep.c')
-rw-r--r--src/protocol/reqrep/rep.c178
1 files changed, 82 insertions, 96 deletions
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 7d887b55..cfc83a5b 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -16,9 +16,8 @@
// request-reply pair. This is useful for building RPC servers, for
// example.
-typedef struct nni_rep_pipe nni_rep_pipe;
-typedef struct nni_rep_sock nni_rep_sock;
-
+typedef struct nni_rep_pipe nni_rep_pipe;
+typedef struct nni_rep_sock nni_rep_sock;
static void nni_rep_sock_getq_cb(void *);
static void nni_rep_pipe_getq_cb(void *);
@@ -29,29 +28,29 @@ static void nni_rep_pipe_fini(void *);
// An nni_rep_sock is our per-socket protocol private structure.
struct nni_rep_sock {
- nni_sock * sock;
- nni_msgq * uwq;
- nni_msgq * urq;
- int raw;
- int ttl;
- nni_idhash pipes;
- char * btrace;
- size_t btrace_len;
- nni_aio aio_getq;
- nni_mtx mtx;
+ nni_sock * sock;
+ nni_msgq * uwq;
+ nni_msgq * urq;
+ int raw;
+ int ttl;
+ nni_idhash pipes;
+ char * btrace;
+ size_t btrace_len;
+ nni_aio aio_getq;
+ nni_mtx mtx;
};
// An nni_rep_pipe is our per-pipe protocol private structure.
struct nni_rep_pipe {
- nni_pipe * pipe;
- nni_rep_sock * rep;
- nni_msgq * sendq;
- uint32_t id; // we have to save it
- nni_aio aio_getq;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_aio aio_putq;
- nni_mtx mtx;
+ nni_pipe * pipe;
+ nni_rep_sock *rep;
+ nni_msgq * sendq;
+ uint32_t id; // we have to save it
+ nni_aio aio_getq;
+ nni_aio aio_send;
+ nni_aio aio_recv;
+ nni_aio aio_putq;
+ nni_mtx mtx;
};
static void
@@ -68,20 +67,19 @@ nni_rep_sock_fini(void *arg)
NNI_FREE_STRUCT(rep);
}
-
static int
nni_rep_sock_init(void **repp, nni_sock *sock)
{
nni_rep_sock *rep;
- int rv;
+ int rv;
if ((rep = NNI_ALLOC_STRUCT(rep)) == NULL) {
return (NNG_ENOMEM);
}
- rep->ttl = 8; // Per RFC
- rep->sock = sock;
- rep->raw = 0;
- rep->btrace = NULL;
+ rep->ttl = 8; // Per RFC
+ rep->sock = sock;
+ rep->raw = 0;
+ rep->btrace = NULL;
rep->btrace_len = 0;
if (((rv = nni_mtx_init(&rep->mtx)) != 0) ||
((rv = nni_idhash_init(&rep->pipes)) != 0)) {
@@ -106,7 +104,6 @@ fail:
return (rv);
}
-
static void
nni_rep_sock_open(void *arg)
{
@@ -115,7 +112,6 @@ nni_rep_sock_open(void *arg)
nni_msgq_aio_get(rep->uwq, &rep->aio_getq);
}
-
static void
nni_rep_sock_close(void *arg)
{
@@ -124,12 +120,11 @@ nni_rep_sock_close(void *arg)
nni_aio_stop(&rep->aio_getq);
}
-
static int
nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
{
nni_rep_pipe *rp;
- int rv;
+ int rv;
if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
return (NNG_ENOMEM);
@@ -138,21 +133,25 @@ nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
((rv = nni_mtx_init(&rp->mtx)) != 0)) {
goto fail;
}
- if ((rv = nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp)) != 0) {
+ if ((rv = nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp)) !=
+ 0) {
goto fail;
}
- if ((rv = nni_aio_init(&rp->aio_send, nni_rep_pipe_send_cb, rp)) != 0) {
+ if ((rv = nni_aio_init(&rp->aio_send, nni_rep_pipe_send_cb, rp)) !=
+ 0) {
goto fail;
}
- if ((rv = nni_aio_init(&rp->aio_recv, nni_rep_pipe_recv_cb, rp)) != 0) {
+ if ((rv = nni_aio_init(&rp->aio_recv, nni_rep_pipe_recv_cb, rp)) !=
+ 0) {
goto fail;
}
- if ((rv = nni_aio_init(&rp->aio_putq, nni_rep_pipe_putq_cb, rp)) != 0) {
+ if ((rv = nni_aio_init(&rp->aio_putq, nni_rep_pipe_putq_cb, rp)) !=
+ 0) {
goto fail;
}
rp->pipe = pipe;
- rp->rep = rsock;
- *rpp = rp;
+ rp->rep = rsock;
+ *rpp = rp;
return (0);
fail:
@@ -160,7 +159,6 @@ fail:
return (rv);
}
-
static void
nni_rep_pipe_fini(void *arg)
{
@@ -175,13 +173,12 @@ nni_rep_pipe_fini(void *arg)
NNI_FREE_STRUCT(rp);
}
-
static int
nni_rep_pipe_start(void *arg)
{
- nni_rep_pipe *rp = arg;
+ nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- int rv;
+ int rv;
rp->id = nni_pipe_id(rp->pipe);
@@ -197,13 +194,12 @@ nni_rep_pipe_start(void *arg)
return (0);
}
-
static void
nni_rep_pipe_stop(void *arg)
{
- nni_rep_pipe *rp = arg;
+ nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- uint32_t id;
+ uint32_t id;
nni_aio_stop(&rp->aio_getq);
nni_aio_stop(&rp->aio_putq);
@@ -211,8 +207,8 @@ nni_rep_pipe_stop(void *arg)
nni_aio_stop(&rp->aio_recv);
nni_mtx_lock(&rp->mtx);
- id = rp->id;
- rp->id = 0; // makes this idempotent
+ id = rp->id;
+ rp->id = 0; // makes this idempotent
nni_msgq_close(rp->sendq);
nni_mtx_unlock(&rp->mtx);
@@ -223,17 +219,16 @@ nni_rep_pipe_stop(void *arg)
}
}
-
static void
nni_rep_sock_getq_cb(void *arg)
{
nni_rep_sock *rep = arg;
- nni_msgq *uwq = rep->uwq;
- nni_msg *msg;
- uint8_t *header;
- uint32_t id;
+ nni_msgq * uwq = rep->uwq;
+ nni_msg * msg;
+ uint8_t * header;
+ uint32_t id;
nni_rep_pipe *rp;
- int rv;
+ int rv;
// This watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
@@ -245,7 +240,7 @@ nni_rep_sock_getq_cb(void *arg)
return;
}
- msg = rep->aio_getq.a_msg;
+ msg = rep->aio_getq.a_msg;
rep->aio_getq.a_msg = NULL;
// We yank the outgoing pipe id from the header
@@ -278,7 +273,6 @@ nni_rep_sock_getq_cb(void *arg)
nni_msgq_aio_get(uwq, &rep->aio_getq);
}
-
static void
nni_rep_pipe_getq_cb(void *arg)
{
@@ -295,7 +289,6 @@ nni_rep_pipe_getq_cb(void *arg)
nni_pipe_send(rp->pipe, &rp->aio_send);
}
-
static void
nni_rep_pipe_send_cb(void *arg)
{
@@ -311,17 +304,16 @@ nni_rep_pipe_send_cb(void *arg)
nni_msgq_aio_get(rp->sendq, &rp->aio_getq);
}
-
static void
nni_rep_pipe_recv_cb(void *arg)
{
- nni_rep_pipe *rp = arg;
+ nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- nni_msg *msg;
- int rv;
- uint8_t idbuf[4];
- uint8_t *body;
- int hops;
+ nni_msg * msg;
+ int rv;
+ uint8_t idbuf[4];
+ uint8_t * body;
+ int hops;
if (nni_aio_result(&rp->aio_recv) != 0) {
nni_pipe_stop(rp->pipe);
@@ -330,7 +322,7 @@ nni_rep_pipe_recv_cb(void *arg)
NNI_PUT32(idbuf, rp->id);
- msg = rp->aio_recv.a_msg;
+ msg = rp->aio_recv.a_msg;
rp->aio_recv.a_msg = NULL;
// Store the pipe id in the header, first thing.
@@ -350,8 +342,8 @@ nni_rep_pipe_recv_cb(void *arg)
goto malformed;
}
body = nni_msg_body(msg);
- end = (body[0] & 0x80) ? 1 : 0;
- rv = nni_msg_append_header(msg, body, 4);
+ end = (body[0] & 0x80) ? 1 : 0;
+ rv = nni_msg_append_header(msg, body, 4);
if (rv != 0) {
// Presumably this is due to out of memory.
// We could just discard and try again, but we
@@ -376,7 +368,6 @@ malformed:
nni_pipe_stop(rp->pipe);
}
-
static void
nni_rep_pipe_putq_cb(void *arg)
{
@@ -392,12 +383,11 @@ nni_rep_pipe_putq_cb(void *arg)
nni_pipe_recv(rp->pipe, &rp->aio_recv);
}
-
static int
nni_rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_rep_sock *rep = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_MAXTTL:
@@ -413,12 +403,11 @@ nni_rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_rep_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_rep_sock *rep = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_MAXTTL:
@@ -433,7 +422,6 @@ nni_rep_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
static nni_msg *
nni_rep_sock_sfilter(void *arg, nni_msg *msg)
{
@@ -458,36 +446,35 @@ nni_rep_sock_sfilter(void *arg, nni_msg *msg)
if (nni_msg_append_header(msg, rep->btrace, rep->btrace_len) != 0) {
nni_free(rep->btrace, rep->btrace_len);
- rep->btrace = NULL;
+ rep->btrace = NULL;
rep->btrace_len = 0;
nni_msg_free(msg);
return (NULL);
}
nni_free(rep->btrace, rep->btrace_len);
- rep->btrace = NULL;
+ rep->btrace = NULL;
rep->btrace_len = 0;
return (msg);
}
-
static nni_msg *
nni_rep_sock_rfilter(void *arg, nni_msg *msg)
{
nni_rep_sock *rep = arg;
- char *header;
- size_t len;
+ char * header;
+ size_t len;
if (rep->raw) {
return (msg);
}
nni_sock_senderr(rep->sock, 0);
- len = nni_msg_header_len(msg);
+ len = nni_msg_header_len(msg);
header = nni_msg_header(msg);
if (rep->btrace != NULL) {
nni_free(rep->btrace, rep->btrace_len);
- rep->btrace = NULL;
+ rep->btrace = NULL;
rep->btrace_len = 0;
}
if ((rep->btrace = nni_alloc(len)) == NULL) {
@@ -500,32 +487,31 @@ nni_rep_sock_rfilter(void *arg, nni_msg *msg)
return (msg);
}
-
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops nni_rep_pipe_ops = {
- .pipe_init = nni_rep_pipe_init,
- .pipe_fini = nni_rep_pipe_fini,
- .pipe_start = nni_rep_pipe_start,
- .pipe_stop = nni_rep_pipe_stop,
+ .pipe_init = nni_rep_pipe_init,
+ .pipe_fini = nni_rep_pipe_fini,
+ .pipe_start = nni_rep_pipe_start,
+ .pipe_stop = nni_rep_pipe_stop,
};
static nni_proto_sock_ops nni_rep_sock_ops = {
- .sock_init = nni_rep_sock_init,
- .sock_fini = nni_rep_sock_fini,
- .sock_open = nni_rep_sock_open,
- .sock_close = nni_rep_sock_close,
- .sock_setopt = nni_rep_sock_setopt,
- .sock_getopt = nni_rep_sock_getopt,
- .sock_rfilter = nni_rep_sock_rfilter,
- .sock_sfilter = nni_rep_sock_sfilter,
+ .sock_init = nni_rep_sock_init,
+ .sock_fini = nni_rep_sock_fini,
+ .sock_open = nni_rep_sock_open,
+ .sock_close = nni_rep_sock_close,
+ .sock_setopt = nni_rep_sock_setopt,
+ .sock_getopt = nni_rep_sock_getopt,
+ .sock_rfilter = nni_rep_sock_rfilter,
+ .sock_sfilter = nni_rep_sock_sfilter,
};
nni_proto nni_rep_proto = {
- .proto_self = NNG_PROTO_REP,
- .proto_peer = NNG_PROTO_REQ,
- .proto_name = "rep",
- .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_self = NNG_PROTO_REP,
+ .proto_peer = NNG_PROTO_REQ,
+ .proto_name = "rep",
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_rep_sock_ops,
.proto_pipe_ops = &nni_rep_pipe_ops,
};