summaryrefslogtreecommitdiff
path: root/src/protocol/reqrep
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep')
-rw-r--r--src/protocol/reqrep/rep.c178
-rw-r--r--src/protocol/reqrep/req.c161
2 files changed, 153 insertions, 186 deletions
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 7d887b55..cfc83a5b 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -16,9 +16,8 @@
// request-reply pair. This is useful for building RPC servers, for
// example.
-typedef struct nni_rep_pipe nni_rep_pipe;
-typedef struct nni_rep_sock nni_rep_sock;
-
+typedef struct nni_rep_pipe nni_rep_pipe;
+typedef struct nni_rep_sock nni_rep_sock;
static void nni_rep_sock_getq_cb(void *);
static void nni_rep_pipe_getq_cb(void *);
@@ -29,29 +28,29 @@ static void nni_rep_pipe_fini(void *);
// An nni_rep_sock is our per-socket protocol private structure.
struct nni_rep_sock {
- nni_sock * sock;
- nni_msgq * uwq;
- nni_msgq * urq;
- int raw;
- int ttl;
- nni_idhash pipes;
- char * btrace;
- size_t btrace_len;
- nni_aio aio_getq;
- nni_mtx mtx;
+ nni_sock * sock;
+ nni_msgq * uwq;
+ nni_msgq * urq;
+ int raw;
+ int ttl;
+ nni_idhash pipes;
+ char * btrace;
+ size_t btrace_len;
+ nni_aio aio_getq;
+ nni_mtx mtx;
};
// An nni_rep_pipe is our per-pipe protocol private structure.
struct nni_rep_pipe {
- nni_pipe * pipe;
- nni_rep_sock * rep;
- nni_msgq * sendq;
- uint32_t id; // we have to save it
- nni_aio aio_getq;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_aio aio_putq;
- nni_mtx mtx;
+ nni_pipe * pipe;
+ nni_rep_sock *rep;
+ nni_msgq * sendq;
+ uint32_t id; // we have to save it
+ nni_aio aio_getq;
+ nni_aio aio_send;
+ nni_aio aio_recv;
+ nni_aio aio_putq;
+ nni_mtx mtx;
};
static void
@@ -68,20 +67,19 @@ nni_rep_sock_fini(void *arg)
NNI_FREE_STRUCT(rep);
}
-
static int
nni_rep_sock_init(void **repp, nni_sock *sock)
{
nni_rep_sock *rep;
- int rv;
+ int rv;
if ((rep = NNI_ALLOC_STRUCT(rep)) == NULL) {
return (NNG_ENOMEM);
}
- rep->ttl = 8; // Per RFC
- rep->sock = sock;
- rep->raw = 0;
- rep->btrace = NULL;
+ rep->ttl = 8; // Per RFC
+ rep->sock = sock;
+ rep->raw = 0;
+ rep->btrace = NULL;
rep->btrace_len = 0;
if (((rv = nni_mtx_init(&rep->mtx)) != 0) ||
((rv = nni_idhash_init(&rep->pipes)) != 0)) {
@@ -106,7 +104,6 @@ fail:
return (rv);
}
-
static void
nni_rep_sock_open(void *arg)
{
@@ -115,7 +112,6 @@ nni_rep_sock_open(void *arg)
nni_msgq_aio_get(rep->uwq, &rep->aio_getq);
}
-
static void
nni_rep_sock_close(void *arg)
{
@@ -124,12 +120,11 @@ nni_rep_sock_close(void *arg)
nni_aio_stop(&rep->aio_getq);
}
-
static int
nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
{
nni_rep_pipe *rp;
- int rv;
+ int rv;
if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
return (NNG_ENOMEM);
@@ -138,21 +133,25 @@ nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
((rv = nni_mtx_init(&rp->mtx)) != 0)) {
goto fail;
}
- if ((rv = nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp)) != 0) {
+ if ((rv = nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp)) !=
+ 0) {
goto fail;
}
- if ((rv = nni_aio_init(&rp->aio_send, nni_rep_pipe_send_cb, rp)) != 0) {
+ if ((rv = nni_aio_init(&rp->aio_send, nni_rep_pipe_send_cb, rp)) !=
+ 0) {
goto fail;
}
- if ((rv = nni_aio_init(&rp->aio_recv, nni_rep_pipe_recv_cb, rp)) != 0) {
+ if ((rv = nni_aio_init(&rp->aio_recv, nni_rep_pipe_recv_cb, rp)) !=
+ 0) {
goto fail;
}
- if ((rv = nni_aio_init(&rp->aio_putq, nni_rep_pipe_putq_cb, rp)) != 0) {
+ if ((rv = nni_aio_init(&rp->aio_putq, nni_rep_pipe_putq_cb, rp)) !=
+ 0) {
goto fail;
}
rp->pipe = pipe;
- rp->rep = rsock;
- *rpp = rp;
+ rp->rep = rsock;
+ *rpp = rp;
return (0);
fail:
@@ -160,7 +159,6 @@ fail:
return (rv);
}
-
static void
nni_rep_pipe_fini(void *arg)
{
@@ -175,13 +173,12 @@ nni_rep_pipe_fini(void *arg)
NNI_FREE_STRUCT(rp);
}
-
static int
nni_rep_pipe_start(void *arg)
{
- nni_rep_pipe *rp = arg;
+ nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- int rv;
+ int rv;
rp->id = nni_pipe_id(rp->pipe);
@@ -197,13 +194,12 @@ nni_rep_pipe_start(void *arg)
return (0);
}
-
static void
nni_rep_pipe_stop(void *arg)
{
- nni_rep_pipe *rp = arg;
+ nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- uint32_t id;
+ uint32_t id;
nni_aio_stop(&rp->aio_getq);
nni_aio_stop(&rp->aio_putq);
@@ -211,8 +207,8 @@ nni_rep_pipe_stop(void *arg)
nni_aio_stop(&rp->aio_recv);
nni_mtx_lock(&rp->mtx);
- id = rp->id;
- rp->id = 0; // makes this idempotent
+ id = rp->id;
+ rp->id = 0; // makes this idempotent
nni_msgq_close(rp->sendq);
nni_mtx_unlock(&rp->mtx);
@@ -223,17 +219,16 @@ nni_rep_pipe_stop(void *arg)
}
}
-
static void
nni_rep_sock_getq_cb(void *arg)
{
nni_rep_sock *rep = arg;
- nni_msgq *uwq = rep->uwq;
- nni_msg *msg;
- uint8_t *header;
- uint32_t id;
+ nni_msgq * uwq = rep->uwq;
+ nni_msg * msg;
+ uint8_t * header;
+ uint32_t id;
nni_rep_pipe *rp;
- int rv;
+ int rv;
// This watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
@@ -245,7 +240,7 @@ nni_rep_sock_getq_cb(void *arg)
return;
}
- msg = rep->aio_getq.a_msg;
+ msg = rep->aio_getq.a_msg;
rep->aio_getq.a_msg = NULL;
// We yank the outgoing pipe id from the header
@@ -278,7 +273,6 @@ nni_rep_sock_getq_cb(void *arg)
nni_msgq_aio_get(uwq, &rep->aio_getq);
}
-
static void
nni_rep_pipe_getq_cb(void *arg)
{
@@ -295,7 +289,6 @@ nni_rep_pipe_getq_cb(void *arg)
nni_pipe_send(rp->pipe, &rp->aio_send);
}
-
static void
nni_rep_pipe_send_cb(void *arg)
{
@@ -311,17 +304,16 @@ nni_rep_pipe_send_cb(void *arg)
nni_msgq_aio_get(rp->sendq, &rp->aio_getq);
}
-
static void
nni_rep_pipe_recv_cb(void *arg)
{
- nni_rep_pipe *rp = arg;
+ nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- nni_msg *msg;
- int rv;
- uint8_t idbuf[4];
- uint8_t *body;
- int hops;
+ nni_msg * msg;
+ int rv;
+ uint8_t idbuf[4];
+ uint8_t * body;
+ int hops;
if (nni_aio_result(&rp->aio_recv) != 0) {
nni_pipe_stop(rp->pipe);
@@ -330,7 +322,7 @@ nni_rep_pipe_recv_cb(void *arg)
NNI_PUT32(idbuf, rp->id);
- msg = rp->aio_recv.a_msg;
+ msg = rp->aio_recv.a_msg;
rp->aio_recv.a_msg = NULL;
// Store the pipe id in the header, first thing.
@@ -350,8 +342,8 @@ nni_rep_pipe_recv_cb(void *arg)
goto malformed;
}
body = nni_msg_body(msg);
- end = (body[0] & 0x80) ? 1 : 0;
- rv = nni_msg_append_header(msg, body, 4);
+ end = (body[0] & 0x80) ? 1 : 0;
+ rv = nni_msg_append_header(msg, body, 4);
if (rv != 0) {
// Presumably this is due to out of memory.
// We could just discard and try again, but we
@@ -376,7 +368,6 @@ malformed:
nni_pipe_stop(rp->pipe);
}
-
static void
nni_rep_pipe_putq_cb(void *arg)
{
@@ -392,12 +383,11 @@ nni_rep_pipe_putq_cb(void *arg)
nni_pipe_recv(rp->pipe, &rp->aio_recv);
}
-
static int
nni_rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_rep_sock *rep = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_MAXTTL:
@@ -413,12 +403,11 @@ nni_rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_rep_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_rep_sock *rep = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_MAXTTL:
@@ -433,7 +422,6 @@ nni_rep_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
static nni_msg *
nni_rep_sock_sfilter(void *arg, nni_msg *msg)
{
@@ -458,36 +446,35 @@ nni_rep_sock_sfilter(void *arg, nni_msg *msg)
if (nni_msg_append_header(msg, rep->btrace, rep->btrace_len) != 0) {
nni_free(rep->btrace, rep->btrace_len);
- rep->btrace = NULL;
+ rep->btrace = NULL;
rep->btrace_len = 0;
nni_msg_free(msg);
return (NULL);
}
nni_free(rep->btrace, rep->btrace_len);
- rep->btrace = NULL;
+ rep->btrace = NULL;
rep->btrace_len = 0;
return (msg);
}
-
static nni_msg *
nni_rep_sock_rfilter(void *arg, nni_msg *msg)
{
nni_rep_sock *rep = arg;
- char *header;
- size_t len;
+ char * header;
+ size_t len;
if (rep->raw) {
return (msg);
}
nni_sock_senderr(rep->sock, 0);
- len = nni_msg_header_len(msg);
+ len = nni_msg_header_len(msg);
header = nni_msg_header(msg);
if (rep->btrace != NULL) {
nni_free(rep->btrace, rep->btrace_len);
- rep->btrace = NULL;
+ rep->btrace = NULL;
rep->btrace_len = 0;
}
if ((rep->btrace = nni_alloc(len)) == NULL) {
@@ -500,32 +487,31 @@ nni_rep_sock_rfilter(void *arg, nni_msg *msg)
return (msg);
}
-
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops nni_rep_pipe_ops = {
- .pipe_init = nni_rep_pipe_init,
- .pipe_fini = nni_rep_pipe_fini,
- .pipe_start = nni_rep_pipe_start,
- .pipe_stop = nni_rep_pipe_stop,
+ .pipe_init = nni_rep_pipe_init,
+ .pipe_fini = nni_rep_pipe_fini,
+ .pipe_start = nni_rep_pipe_start,
+ .pipe_stop = nni_rep_pipe_stop,
};
static nni_proto_sock_ops nni_rep_sock_ops = {
- .sock_init = nni_rep_sock_init,
- .sock_fini = nni_rep_sock_fini,
- .sock_open = nni_rep_sock_open,
- .sock_close = nni_rep_sock_close,
- .sock_setopt = nni_rep_sock_setopt,
- .sock_getopt = nni_rep_sock_getopt,
- .sock_rfilter = nni_rep_sock_rfilter,
- .sock_sfilter = nni_rep_sock_sfilter,
+ .sock_init = nni_rep_sock_init,
+ .sock_fini = nni_rep_sock_fini,
+ .sock_open = nni_rep_sock_open,
+ .sock_close = nni_rep_sock_close,
+ .sock_setopt = nni_rep_sock_setopt,
+ .sock_getopt = nni_rep_sock_getopt,
+ .sock_rfilter = nni_rep_sock_rfilter,
+ .sock_sfilter = nni_rep_sock_sfilter,
};
nni_proto nni_rep_proto = {
- .proto_self = NNG_PROTO_REP,
- .proto_peer = NNG_PROTO_REQ,
- .proto_name = "rep",
- .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_self = NNG_PROTO_REP,
+ .proto_peer = NNG_PROTO_REQ,
+ .proto_name = "rep",
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_rep_sock_ops,
.proto_pipe_ops = &nni_rep_pipe_ops,
};
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index f32fd66f..f77700f5 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -7,9 +7,9 @@
// found online at https://opensource.org/licenses/MIT.
//
+#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#include <stdio.h>
#include "core/nng_impl.h"
@@ -17,8 +17,8 @@
// request-reply pair. This is useful for building RPC clients, for
// example.
-typedef struct nni_req_pipe nni_req_pipe;
-typedef struct nni_req_sock nni_req_sock;
+typedef struct nni_req_pipe nni_req_pipe;
+typedef struct nni_req_sock nni_req_sock;
static void nni_req_resend(nni_req_sock *);
static void nni_req_timeout(void *);
@@ -26,38 +26,38 @@ static void nni_req_pipe_fini(void *);
// An nni_req_sock is our per-socket protocol private structure.
struct nni_req_sock {
- nni_sock * sock;
- nni_msgq * uwq;
- nni_msgq * urq;
- nni_duration retry;
- nni_time resend;
- int raw;
- int wantw;
- nni_msg * reqmsg;
+ nni_sock * sock;
+ nni_msgq * uwq;
+ nni_msgq * urq;
+ nni_duration retry;
+ nni_time resend;
+ int raw;
+ int wantw;
+ nni_msg * reqmsg;
- nni_req_pipe * pendpipe;
+ nni_req_pipe *pendpipe;
- nni_list readypipes;
- nni_list busypipes;
+ nni_list readypipes;
+ nni_list busypipes;
- nni_timer_node timer;
+ nni_timer_node timer;
- uint32_t nextid; // next id
- uint8_t reqid[4]; // outstanding request ID (big endian)
- nni_mtx mtx;
+ uint32_t nextid; // next id
+ uint8_t reqid[4]; // outstanding request ID (big endian)
+ nni_mtx mtx;
};
// An nni_req_pipe is our per-pipe protocol private structure.
struct nni_req_pipe {
- nni_pipe * pipe;
- nni_req_sock * req;
- nni_list_node node;
- nni_aio aio_getq; // raw mode only
- nni_aio aio_sendraw; // raw mode only
- nni_aio aio_sendcooked; // cooked mode only
- nni_aio aio_recv;
- nni_aio aio_putq;
- nni_mtx mtx;
+ nni_pipe * pipe;
+ nni_req_sock *req;
+ nni_list_node node;
+ nni_aio aio_getq; // raw mode only
+ nni_aio aio_sendraw; // raw mode only
+ nni_aio aio_sendcooked; // cooked mode only
+ nni_aio aio_recv;
+ nni_aio aio_putq;
+ nni_mtx mtx;
};
static void nni_req_resender(void *);
@@ -71,7 +71,7 @@ static int
nni_req_sock_init(void **reqp, nni_sock *sock)
{
nni_req_sock *req;
- int rv;
+ int rv;
if ((req = NNI_ALLOC_STRUCT(req)) == NULL) {
return (NNG_ENOMEM);
@@ -88,21 +88,20 @@ nni_req_sock_init(void **reqp, nni_sock *sock)
// this is "semi random" start for request IDs.
req->nextid = nni_random();
- req->retry = NNI_SECOND * 60;
- req->sock = sock;
+ req->retry = NNI_SECOND * 60;
+ req->sock = sock;
req->reqmsg = NULL;
- req->raw = 0;
- req->wantw = 0;
+ req->raw = 0;
+ req->wantw = 0;
req->resend = NNI_TIME_ZERO;
req->uwq = nni_sock_sendq(sock);
req->urq = nni_sock_recvq(sock);
- *reqp = req;
+ *reqp = req;
nni_sock_recverr(sock, NNG_ESTATE);
return (0);
}
-
static void
nni_req_sock_close(void *arg)
{
@@ -111,7 +110,6 @@ nni_req_sock_close(void *arg)
nni_timer_cancel(&req->timer);
}
-
static void
nni_req_sock_fini(void *arg)
{
@@ -126,12 +124,11 @@ nni_req_sock_fini(void *arg)
NNI_FREE_STRUCT(req);
}
-
static int
nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
{
nni_req_pipe *rp;
- int rv;
+ int rv;
if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
return (NNG_ENOMEM);
@@ -159,8 +156,8 @@ nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
NNI_LIST_NODE_INIT(&rp->node);
rp->pipe = pipe;
- rp->req = rsock;
- *rpp = rp;
+ rp->req = rsock;
+ *rpp = rp;
return (0);
failed:
@@ -168,7 +165,6 @@ failed:
return (rv);
}
-
static void
nni_req_pipe_fini(void *arg)
{
@@ -185,11 +181,10 @@ nni_req_pipe_fini(void *arg)
}
}
-
static int
nni_req_pipe_start(void *arg)
{
- nni_req_pipe *rp = arg;
+ nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) {
@@ -203,17 +198,15 @@ nni_req_pipe_start(void *arg)
}
nni_mtx_unlock(&req->mtx);
-
nni_msgq_aio_get(req->uwq, &rp->aio_getq);
nni_pipe_recv(rp->pipe, &rp->aio_recv);
return (0);
}
-
static void
nni_req_pipe_stop(void *arg)
{
- nni_req_pipe *rp = arg;
+ nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
nni_aio_stop(&rp->aio_getq);
@@ -236,19 +229,18 @@ nni_req_pipe_stop(void *arg)
// removing the pipe we sent the last request on...
// schedule immediate resend.
req->pendpipe = NULL;
- req->resend = NNI_TIME_ZERO;
- req->wantw = 1;
+ req->resend = NNI_TIME_ZERO;
+ req->wantw = 1;
nni_req_resend(req);
}
nni_mtx_unlock(&req->mtx);
}
-
static int
nni_req_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_req_sock *req = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RESENDTIME:
@@ -263,12 +255,11 @@ nni_req_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
return (rv);
}
-
static int
nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_req_sock *req = arg;
- int rv;
+ int rv;
switch (opt) {
case NNG_OPT_RESENDTIME:
@@ -283,7 +274,6 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
return (rv);
}
-
// Raw and cooked mode differ in the way they send messages out.
//
// For cooked mdes, we have a getq callback on the upper write queue, which
@@ -303,7 +293,7 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
static void
nni_req_getq_cb(void *arg)
{
- nni_req_pipe *rp = arg;
+ nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
// We should be in RAW mode. Cooked mode traffic bypasses
@@ -318,13 +308,12 @@ nni_req_getq_cb(void *arg)
}
rp->aio_sendraw.a_msg = rp->aio_getq.a_msg;
- rp->aio_getq.a_msg = NULL;
+ rp->aio_getq.a_msg = NULL;
// Send the message, but use the raw mode aio.
nni_pipe_send(rp->pipe, &rp->aio_sendraw);
}
-
static void
nni_req_sendraw_cb(void *arg)
{
@@ -341,11 +330,10 @@ nni_req_sendraw_cb(void *arg)
nni_msgq_aio_get(rp->req->uwq, &rp->aio_getq);
}
-
static void
nni_req_sendcooked_cb(void *arg)
{
- nni_req_pipe *rp = arg;
+ nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
if (nni_aio_result(&rp->aio_sendcooked) != 0) {
@@ -377,7 +365,6 @@ nni_req_sendcooked_cb(void *arg)
nni_mtx_unlock(&req->mtx);
}
-
static void
nni_req_putq_cb(void *arg)
{
@@ -393,19 +380,18 @@ nni_req_putq_cb(void *arg)
nni_pipe_recv(rp->pipe, &rp->aio_recv);
}
-
static void
nni_req_recv_cb(void *arg)
{
nni_req_pipe *rp = arg;
- nni_msg *msg;
+ nni_msg * msg;
if (nni_aio_result(&rp->aio_recv) != 0) {
nni_pipe_stop(rp->pipe);
return;
}
- msg = rp->aio_recv.a_msg;
+ msg = rp->aio_recv.a_msg;
rp->aio_recv.a_msg = NULL;
// We yank 4 bytes of body, and move them to the header.
@@ -434,7 +420,6 @@ malformed:
nni_pipe_stop(rp->pipe);
}
-
static void
nni_req_timeout(void *arg)
{
@@ -448,12 +433,11 @@ nni_req_timeout(void *arg)
nni_mtx_unlock(&req->mtx);
}
-
static void
nni_req_resend(nni_req_sock *req)
{
nni_req_pipe *rp;
- nni_msg *msg;
+ nni_msg * msg;
// Note: This routine should be called with the socket lock held.
// Also, this should only be called while handling cooked mode
@@ -470,8 +454,8 @@ nni_req_resend(nni_req_sock *req)
// mark that we have a message we want to resend,
// in case something comes available.
req->wantw = 1;
- nni_timer_schedule(&req->timer,
- nni_clock() + req->retry);
+ nni_timer_schedule(
+ &req->timer, nni_clock() + req->retry);
return;
}
@@ -489,8 +473,8 @@ nni_req_resend(nni_req_sock *req)
nni_list_remove(&req->readypipes, rp);
nni_list_append(&req->busypipes, rp);
- req->pendpipe = rp;
- req->resend = nni_clock() + req->retry;
+ req->pendpipe = rp;
+ req->resend = nni_clock() + req->retry;
rp->aio_sendcooked.a_msg = msg;
// Note that because we were ready rather than busy, we
@@ -501,12 +485,11 @@ nni_req_resend(nni_req_sock *req)
}
}
-
static nni_msg *
nni_req_sock_sfilter(void *arg, nni_msg *msg)
{
nni_req_sock *req = arg;
- uint32_t id;
+ uint32_t id;
if (req->raw) {
// No automatic retry, and the request ID must
@@ -542,7 +525,7 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg)
req->reqmsg = msg;
// Schedule for immediate send
req->resend = NNI_TIME_ZERO;
- req->wantw = 1;
+ req->wantw = 1;
nni_req_resend(req);
nni_mtx_unlock(&req->mtx);
@@ -553,12 +536,11 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg)
return (NULL);
}
-
static nni_msg *
nni_req_sock_rfilter(void *arg, nni_msg *msg)
{
nni_req_sock *req = arg;
- nni_msg *rmsg;
+ nni_msg * rmsg;
if (req->raw) {
// Pass it unmolested
@@ -585,7 +567,7 @@ nni_req_sock_rfilter(void *arg, nni_msg *msg)
return (NULL);
}
- req->reqmsg = NULL;
+ req->reqmsg = NULL;
req->pendpipe = NULL;
nni_mtx_unlock(&req->mtx);
@@ -595,31 +577,30 @@ nni_req_sock_rfilter(void *arg, nni_msg *msg)
return (msg);
}
-
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops nni_req_pipe_ops = {
- .pipe_init = nni_req_pipe_init,
- .pipe_fini = nni_req_pipe_fini,
- .pipe_start = nni_req_pipe_start,
- .pipe_stop = nni_req_pipe_stop,
+ .pipe_init = nni_req_pipe_init,
+ .pipe_fini = nni_req_pipe_fini,
+ .pipe_start = nni_req_pipe_start,
+ .pipe_stop = nni_req_pipe_stop,
};
static nni_proto_sock_ops nni_req_sock_ops = {
- .sock_init = nni_req_sock_init,
- .sock_fini = nni_req_sock_fini,
- .sock_close = nni_req_sock_close,
- .sock_setopt = nni_req_sock_setopt,
- .sock_getopt = nni_req_sock_getopt,
- .sock_rfilter = nni_req_sock_rfilter,
- .sock_sfilter = nni_req_sock_sfilter,
+ .sock_init = nni_req_sock_init,
+ .sock_fini = nni_req_sock_fini,
+ .sock_close = nni_req_sock_close,
+ .sock_setopt = nni_req_sock_setopt,
+ .sock_getopt = nni_req_sock_getopt,
+ .sock_rfilter = nni_req_sock_rfilter,
+ .sock_sfilter = nni_req_sock_sfilter,
};
nni_proto nni_req_proto = {
- .proto_self = NNG_PROTO_REQ,
- .proto_peer = NNG_PROTO_REP,
- .proto_name = "req",
- .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_self = NNG_PROTO_REQ,
+ .proto_peer = NNG_PROTO_REP,
+ .proto_name = "req",
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_req_sock_ops,
.proto_pipe_ops = &nni_req_pipe_ops,
};