aboutsummaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/pair/pair.c9
-rw-r--r--src/protocol/pipeline/pull.c10
-rw-r--r--src/protocol/pipeline/push.c9
-rw-r--r--src/protocol/pubsub/pub.c10
-rw-r--r--src/protocol/pubsub/sub.c9
-rw-r--r--src/protocol/reqrep/rep.c9
-rw-r--r--src/protocol/reqrep/req.c21
-rw-r--r--src/protocol/survey/respond.c13
8 files changed, 22 insertions, 68 deletions
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index 51608e07..ec6709ac 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -200,20 +200,15 @@ static nni_proto_pipe_ops nni_pair_pipe_ops = {
.pipe_fini = nni_pair_pipe_fini,
.pipe_add = nni_pair_pipe_add,
.pipe_rem = nni_pair_pipe_rem,
- .pipe_send = nni_pair_pipe_send,
- .pipe_recv = nni_pair_pipe_recv,
+ .pipe_worker = { nni_pair_pipe_send,
+ nni_pair_pipe_recv },
};
static nni_proto_sock_ops nni_pair_sock_ops = {
.sock_init = nni_pair_sock_init,
.sock_fini = nni_pair_sock_fini,
- .sock_close = NULL,
.sock_setopt = nni_pair_sock_setopt,
.sock_getopt = nni_pair_sock_getopt,
- .sock_rfilter = NULL,
- .sock_sfilter = NULL,
- .sock_send = NULL,
- .sock_recv = NULL,
};
nni_proto nni_pair_proto = {
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index 9612177c..dc37e72b 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -139,22 +139,14 @@ nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
static nni_proto_pipe_ops nni_pull_pipe_ops = {
.pipe_init = nni_pull_pipe_init,
.pipe_fini = nni_pull_pipe_fini,
- .pipe_add = NULL,
- .pipe_rem = NULL,
- .pipe_send = NULL,
- .pipe_recv = nni_pull_pipe_recv,
+ .pipe_worker = { nni_pull_pipe_recv },
};
static nni_proto_sock_ops nni_pull_sock_ops = {
.sock_init = nni_pull_sock_init,
.sock_fini = nni_pull_sock_fini,
- .sock_close = NULL,
.sock_setopt = nni_pull_sock_setopt,
.sock_getopt = nni_pull_sock_getopt,
- .sock_send = NULL,
- .sock_recv = NULL,
- .sock_rfilter = NULL,
- .sock_sfilter = NULL,
};
nni_proto nni_pull_proto = {
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index 6cdc9cc5..e3b9ace8 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -288,8 +288,8 @@ static nni_proto_pipe_ops nni_push_pipe_ops = {
.pipe_fini = nni_push_pipe_fini,
.pipe_add = nni_push_pipe_add,
.pipe_rem = nni_push_pipe_rem,
- .pipe_send = nni_push_pipe_send,
- .pipe_recv = nni_push_pipe_recv,
+ .pipe_worker = { nni_push_pipe_send,
+ nni_push_pipe_recv },
};
static nni_proto_sock_ops nni_push_sock_ops = {
@@ -298,10 +298,7 @@ static nni_proto_sock_ops nni_push_sock_ops = {
.sock_close = nni_push_sock_close,
.sock_setopt = nni_push_sock_setopt,
.sock_getopt = nni_push_sock_getopt,
- .sock_send = nni_push_sock_send,
- .sock_recv = NULL,
- .sock_rfilter = NULL,
- .sock_sfilter = NULL,
+ .sock_worker = { nni_push_sock_send },
};
nni_proto nni_push_proto = {
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index 684b916d..b597f896 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -257,20 +257,16 @@ static nni_proto_pipe_ops nni_pub_pipe_ops = {
.pipe_fini = nni_pub_pipe_fini,
.pipe_add = nni_pub_pipe_add,
.pipe_rem = nni_pub_pipe_rem,
- .pipe_send = nni_pub_pipe_send,
- .pipe_recv = nni_pub_pipe_recv,
+ .pipe_worker = { nni_pub_pipe_send,
+ nni_pub_pipe_recv },
};
nni_proto_sock_ops nni_pub_sock_ops = {
.sock_init = nni_pub_sock_init,
.sock_fini = nni_pub_sock_fini,
- .sock_close = NULL,
.sock_setopt = nni_pub_sock_setopt,
.sock_getopt = nni_pub_sock_getopt,
- .sock_send = nni_pub_sock_send,
- .sock_recv = NULL,
- .sock_rfilter = NULL,
- .sock_sfilter = NULL,
+ .sock_worker = { nni_pub_sock_send },
};
nni_proto nni_pub_proto = {
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index dd288d5e..0a6ce5fd 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -297,22 +297,15 @@ nni_sub_sock_rfilter(void *arg, nni_msg *msg)
static nni_proto_pipe_ops nni_sub_pipe_ops = {
.pipe_init = nni_sub_pipe_init,
.pipe_fini = nni_sub_pipe_fini,
- .pipe_add = NULL,
- .pipe_rem = NULL,
- .pipe_send = NULL,
- .pipe_recv = nni_sub_pipe_recv,
+ .pipe_worker = { nni_sub_pipe_recv },
};
static nni_proto_sock_ops nni_sub_sock_ops = {
.sock_init = nni_sub_sock_init,
.sock_fini = nni_sub_sock_fini,
- .sock_close = NULL,
.sock_setopt = nni_sub_sock_setopt,
.sock_getopt = nni_sub_sock_getopt,
.sock_rfilter = nni_sub_sock_rfilter,
- .sock_sfilter = NULL,
- .sock_send = NULL,
- .sock_recv = NULL,
};
nni_proto nni_sub_proto = {
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 3cf215b5..485fd4bc 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -26,7 +26,6 @@ struct nni_rep_sock {
nni_msgq * urq;
int raw;
int ttl;
- nni_thr sender;
nni_idhash * pipes;
char * btrace;
size_t btrace_len;
@@ -395,20 +394,18 @@ static nni_proto_pipe_ops nni_rep_pipe_ops = {
.pipe_fini = nni_rep_pipe_fini,
.pipe_add = nni_rep_pipe_add,
.pipe_rem = nni_rep_pipe_rem,
- .pipe_send = nni_rep_pipe_send,
- .pipe_recv = nni_rep_pipe_recv,
+ .pipe_worker = { nni_rep_pipe_send,
+ nni_rep_pipe_recv },
};
static nni_proto_sock_ops nni_rep_sock_ops = {
.sock_init = nni_rep_sock_init,
.sock_fini = nni_rep_sock_fini,
- .sock_close = NULL,
.sock_setopt = nni_rep_sock_setopt,
.sock_getopt = nni_rep_sock_getopt,
.sock_rfilter = nni_rep_sock_rfilter,
.sock_sfilter = nni_rep_sock_sfilter,
- .sock_send = nni_rep_sock_send,
- .sock_recv = NULL,
+ .sock_worker = { nni_rep_sock_send },
};
nni_proto nni_rep_proto = {
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index d8104342..3c3bba9c 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -13,7 +13,7 @@
#include "core/nng_impl.h"
// Request protocol. The REQ protocol is the "request" side of a
-// request-reply pair. This is useful for bulding RPC clients, for
+// request-reply pair. This is useful for building RPC clients, for
// example.
typedef struct nni_req_pipe nni_req_pipe;
@@ -27,7 +27,6 @@ struct nni_req_sock {
nni_msgq * urq;
nni_duration retry;
nni_time resend;
- nni_thr resender;
int raw;
int closing;
nni_msg * reqmsg;
@@ -41,7 +40,6 @@ struct nni_req_pipe {
nni_pipe * pipe;
nni_req_sock * req;
int sigclose;
- nni_list_node node;
};
static void nni_req_resender(void *);
@@ -72,13 +70,6 @@ nni_req_sock_init(void **reqp, nni_sock *sock)
req->urq = nni_sock_recvq(sock);
*reqp = req;
nni_sock_recverr(sock, NNG_ESTATE);
- rv = nni_thr_init(&req->resender, nni_req_resender, req);
- if (rv != 0) {
- nni_cv_fini(&req->cv);
- NNI_FREE_STRUCT(req);
- return (rv);
- }
- nni_thr_run(&req->resender);
return (0);
}
@@ -100,7 +91,6 @@ nni_req_sock_fini(void *arg)
{
nni_req_sock *req = arg;
- nni_thr_fini(&req->resender);
nni_cv_fini(&req->cv);
if (req->reqmsg != NULL) {
nni_msg_free(req->reqmsg);
@@ -275,7 +265,7 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
static void
-nni_req_resender(void *arg)
+nni_req_sock_resend(void *arg)
{
nni_req_sock *req = arg;
nni_mtx *mx = nni_sock_mtx(req->sock);
@@ -395,8 +385,8 @@ static nni_proto_pipe_ops nni_req_pipe_ops = {
.pipe_fini = nni_req_pipe_fini,
.pipe_add = nni_req_pipe_add,
.pipe_rem = nni_req_pipe_rem,
- .pipe_send = nni_req_pipe_send,
- .pipe_recv = nni_req_pipe_recv,
+ .pipe_worker = { nni_req_pipe_send,
+ nni_req_pipe_recv },
};
static nni_proto_sock_ops nni_req_sock_ops = {
@@ -407,8 +397,7 @@ static nni_proto_sock_ops nni_req_sock_ops = {
.sock_getopt = nni_req_sock_getopt,
.sock_rfilter = nni_req_sock_rfilter,
.sock_sfilter = nni_req_sock_sfilter,
- .sock_send = NULL,
- .sock_recv = NULL,
+ .sock_worker = { nni_req_sock_resend },
};
nni_proto nni_req_proto = {
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 24c607b9..3b23dff8 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -19,18 +19,17 @@
typedef struct nni_resp_pipe nni_resp_pipe;
typedef struct nni_resp_sock nni_resp_sock;
-// An nni_rep_sock is our per-socket protocol private structure.
+// An nni_resp_sock is our per-socket protocol private structure.
struct nni_resp_sock {
nni_sock * sock;
int raw;
int ttl;
- nni_thr sender;
nni_idhash * pipes;
char * btrace;
size_t btrace_len;
};
-// An nni_rep_pipe is our per-pipe protocol private structure.
+// An nni_resp_pipe is our per-pipe protocol private structure.
struct nni_resp_pipe {
nni_pipe * pipe;
nni_resp_sock * resp;
@@ -38,8 +37,6 @@ struct nni_resp_pipe {
int sigclose;
};
-static void nni_rep_topsender(void *);
-
static int
nni_resp_sock_init(void **respp, nni_sock *sock)
{
@@ -388,8 +385,7 @@ static nni_proto_pipe_ops nni_resp_pipe_ops = {
.pipe_fini = nni_resp_pipe_fini,
.pipe_add = nni_resp_pipe_add,
.pipe_rem = nni_resp_pipe_rem,
- .pipe_send = nni_resp_pipe_send,
- .pipe_recv = nni_resp_pipe_recv,
+ .pipe_worker = { nni_resp_pipe_send,nni_resp_pipe_recv },
};
static nni_proto_sock_ops nni_resp_sock_ops = {
@@ -400,8 +396,7 @@ static nni_proto_sock_ops nni_resp_sock_ops = {
.sock_getopt = nni_resp_sock_getopt,
.sock_rfilter = nni_resp_sock_rfilter,
.sock_sfilter = nni_resp_sock_sfilter,
- .sock_send = nni_resp_sock_send,
- .sock_recv = NULL,
+ .sock_worker = { nni_resp_sock_send },
};
nni_proto nni_respondent_proto = {