aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-08 11:18:16 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-08 11:18:16 -0800
commitec2574b09a746709f15d2a3f5de135e29f4bcb52 (patch)
tree25f970232f8093b9ce94969eeed2a5f230e94a89 /src
parent360d19001b90d92ac2f232efb67e356979b0bc4b (diff)
downloadnng-ec2574b09a746709f15d2a3f5de135e29f4bcb52.tar.gz
nng-ec2574b09a746709f15d2a3f5de135e29f4bcb52.tar.bz2
nng-ec2574b09a746709f15d2a3f5de135e29f4bcb52.zip
Move to generic socket & pipe workers, and up to 4 each.
This should eliminate all need for protocols to do their own thread management tasks.
Diffstat (limited to 'src')
-rw-r--r--src/core/defs.h4
-rw-r--r--src/core/pipe.c37
-rw-r--r--src/core/pipe.h3
-rw-r--r--src/core/protocol.h37
-rw-r--r--src/core/socket.c62
-rw-r--r--src/core/socket.h3
-rw-r--r--src/protocol/pair/pair.c9
-rw-r--r--src/protocol/pipeline/pull.c10
-rw-r--r--src/protocol/pipeline/push.c9
-rw-r--r--src/protocol/pubsub/pub.c10
-rw-r--r--src/protocol/pubsub/sub.c9
-rw-r--r--src/protocol/reqrep/rep.c9
-rw-r--r--src/protocol/reqrep/req.c21
-rw-r--r--src/protocol/survey/respond.c13
14 files changed, 97 insertions, 139 deletions
diff --git a/src/core/defs.h b/src/core/defs.h
index 98f4e661..9c745660 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -37,6 +37,7 @@ typedef struct nni_proto nni_proto;
typedef int nni_signal; // Wakeup channel.
typedef uint64_t nni_time; // Abs. time (usec).
typedef int64_t nni_duration; // Rel. time (usec).
+typedef void (*nni_worker)(void *);
// Used by transports for scatter gather I/O.
typedef struct {
@@ -53,6 +54,9 @@ typedef struct {
#define NNI_ALLOC_STRUCT(s) nni_alloc(sizeof (*s))
#define NNI_FREE_STRUCT(s) nni_free((s), sizeof (*s))
+// Maximum number of socket or pipe worker threads.
+#define NNI_MAXWORKERS 4
+
#define NNI_PUT16(ptr, u) \
do { \
(ptr)[0] = (uint8_t) (((uint16_t) (u)) >> 8); \
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 150fcd23..249e887c 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -88,8 +88,11 @@ nni_pipe_peer(nni_pipe *p)
void
nni_pipe_destroy(nni_pipe *p)
{
- nni_thr_fini(&p->p_send_thr);
- nni_thr_fini(&p->p_recv_thr);
+ int i;
+
+ for (i = 0; i < NNI_MAXWORKERS; i++) {
+ nni_thr_fini(&p->p_worker_thr[i]);
+ }
if (p->p_tran_data != NULL) {
p->p_tran_ops.pipe_destroy(p->p_tran_data);
@@ -109,6 +112,7 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep)
const nni_proto_pipe_ops *ops = &sock->s_pipe_ops;
void *pdata;
int rv;
+ int i;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
@@ -128,16 +132,19 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep)
return (rv);
}
p->p_proto_data = pdata;
- if ((rv = nni_thr_init(&p->p_recv_thr, ops->pipe_recv, pdata)) != 0) {
- ops->pipe_fini(&p->p_proto_data);
- NNI_FREE_STRUCT(p);
- return (rv);
- }
- if ((rv = nni_thr_init(&p->p_send_thr, ops->pipe_send, pdata)) != 0) {
- nni_thr_fini(&p->p_recv_thr);
- ops->pipe_fini(&p->p_proto_data);
- NNI_FREE_STRUCT(p);
- return (rv);
+
+ for (i = 0; i < NNI_MAXWORKERS; i++) {
+ nni_worker fn = ops->pipe_worker[i];
+ rv = nni_thr_init(&p->p_worker_thr[i], fn, pdata);
+ if (rv != 0) {
+ while (i > 0) {
+ i--;
+ nni_thr_fini(&p->p_worker_thr[i]);
+ }
+ ops->pipe_fini(pdata);
+ NNI_FREE_STRUCT(p);
+ return (rv);
+ }
}
*pp = p;
@@ -160,6 +167,7 @@ int
nni_pipe_start(nni_pipe *pipe)
{
int rv;
+ int i;
int collide;
nni_sock *sock = pipe->p_sock;
@@ -200,8 +208,9 @@ nni_pipe_start(nni_pipe *pipe)
nni_list_append(&sock->s_pipes, pipe);
- nni_thr_run(&pipe->p_send_thr);
- nni_thr_run(&pipe->p_recv_thr);
+ for (i = 0; i < NNI_MAXWORKERS; i++) {
+ nni_thr_run(&pipe->p_worker_thr[i]);
+ }
pipe->p_active = 1;
// XXX: Publish event
diff --git a/src/core/pipe.h b/src/core/pipe.h
index cc14de86..65d2ede5 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -27,8 +27,7 @@ struct nng_pipe {
nni_ep * p_ep;
int p_reap;
int p_active;
- nni_thr p_send_thr;
- nni_thr p_recv_thr;
+ nni_thr p_worker_thr[NNI_MAXWORKERS];
};
// Pipe operations that protocols use.
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 000e9c64..11c4c304 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -24,35 +24,31 @@
struct nni_proto_pipe_ops {
// pipe_init creates the protocol-specific per pipe data structure.
// The last argument is the per-socket protocol private data.
- int (*pipe_init)(void **, nni_pipe *, void *);
+ int (*pipe_init)(void **, nni_pipe *, void *);
// pipe_fini releases any pipe data structures. This is called after
// the pipe has been removed from the protocol, and the generic
// pipe threads have been stopped.
- void (*pipe_fini)(void *);
+ void (*pipe_fini)(void *);
// pipe_add is called to register a pipe with the protocol. The
// protocol can reject this, for example if another pipe is already
// active on a 1:1 protocol. The protocol may not block during this,
// as the socket lock is held.
- int (*pipe_add)(void *);
+ int (*pipe_add)(void *);
// pipe_rem is called to unregister a pipe from the protocol.
// Threads may still acccess data structures, so the protocol
// should not free anything yet. This is called with the socket
// lock held, so the protocol may not call back into the socket, and
// must not block.
- void (*pipe_rem)(void *);
+ void (*pipe_rem)(void *);
- // pipe_send is a function run in a thread per pipe, to process
- // send activity. This can be NULL.
- void (*pipe_send)(void *);
-
- // pipe_recv is a function run in a thread per pipe, to process
- // receive activity. While this can be NULL, it should NOT be, as
- // otherwise the protocol may not be able to discover the closure of
- // the underlying transport (such as a remote disconnect).
- void (*pipe_recv)(void *);
+ // Worker functions. If non-NULL, each worker is executed and
+ // given the protocol pipe data as a argument. All workers are
+ // started, or none are started. The pipe_fini function is obliged
+ // to ensure that workers have exited.
+ nni_worker pipe_worker[NNI_MAXWORKERS];
};
struct nni_proto_sock_ops {
@@ -75,15 +71,6 @@ struct nni_proto_sock_ops {
int (*sock_setopt)(void *, int, const void *, size_t);
int (*sock_getopt)(void *, int, void *, size_t *);
- // sock_send is a send worker. It can really be anything, but it
- // is run in a separate thread (if it is non-NULL).
- void (*sock_send)(void *);
-
- // sock_recv is a receive worker. As with send it can really be
- // anything, its just a thread that runs for the duration of the
- // socket.
- void (*sock_recv)(void *);
-
// Receive filter. This may be NULL, but if it isn't, then
// messages coming into the system are routed here just before being
// delivered to the application. To drop the message, the prtocol
@@ -93,6 +80,12 @@ struct nni_proto_sock_ops {
// Send filter. This may be NULL, but if it isn't, then messages
// here are filtered just after they come from the application.
nni_msg * (*sock_sfilter)(void *, nni_msg *);
+
+ // Worker functions. If non-NULL, each worker is executed and given
+ // the protocol socket data as an argument. These will all be started
+ // at about the same time, and all will be started, or none will be
+ // started. They are obliged to exit in response to sock_close.
+ nni_worker sock_worker[NNI_MAXWORKERS];
};
struct nni_proto {
diff --git a/src/core/socket.c b/src/core/socket.c
index ad8e5703..c6b0408e 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -142,6 +142,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
nni_sock *sock;
nni_proto *proto;
int rv;
+ int i;
nni_proto_sock_ops *sops;
nni_proto_pipe_ops *pops;
@@ -217,7 +218,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
}
if ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0) {
nni_msgq_fini(sock->s_uwq);
- nni_thr_fini(&sock->s_recver);
+ nni_thr_fini(&sock->s_reaper);
nni_cv_fini(&sock->s_cv);
nni_mtx_fini(&sock->s_mx);
NNI_FREE_STRUCT(sock);
@@ -234,36 +235,30 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
return (rv);
}
- // NB: If these functions are NULL, the thread initialization is
- // largely a NO-OP. The system won't actually create the threads.
- rv = nni_thr_init(&sock->s_sender, sops->sock_send, sock->s_data);
- if (rv != 0) {
- nni_thr_wait(&sock->s_reaper);
- sops->sock_fini(&sock->s_data);
- nni_msgq_fini(sock->s_urq);
- nni_msgq_fini(sock->s_uwq);
- nni_thr_fini(&sock->s_reaper);
- nni_cv_fini(&sock->s_cv);
- nni_mtx_fini(&sock->s_mx);
- NNI_FREE_STRUCT(sock);
- }
- rv = nni_thr_init(&sock->s_recver, sops->sock_recv, sock->s_data);
- if (rv != 0) {
- nni_thr_wait(&sock->s_sender);
- sops->sock_fini(&sock->s_data);
- nni_msgq_fini(sock->s_urq);
- nni_msgq_fini(sock->s_uwq);
- nni_thr_fini(&sock->s_sender);
- nni_thr_fini(&sock->s_reaper);
- nni_cv_fini(&sock->s_cv);
- nni_mtx_fini(&sock->s_mx);
- NNI_FREE_STRUCT(sock);
+ // NB: If worker functions are null, then the thread initialization
+ // turns into a NOP, and no actual thread will be started.
+ for (i = 0; i < NNI_MAXWORKERS; i++) {
+ nni_worker fn = sops->sock_worker[i];
+ rv = nni_thr_init(&sock->s_worker_thr[i], fn, sock->s_data);
+ if (rv != 0) {
+ while (i > 0) {
+ i--;
+ nni_thr_fini(&sock->s_worker_thr[i]);
+ }
+ sops->sock_fini(&sock->s_data);
+ nni_msgq_fini(sock->s_urq);
+ nni_msgq_fini(sock->s_uwq);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_mx);
+ NNI_FREE_STRUCT(sock);
+ }
}
+ for (i = 0; i < NNI_MAXWORKERS; i++) {
+ nni_thr_run(&sock->s_worker_thr[i]);
+ }
nni_thr_run(&sock->s_reaper);
- nni_thr_run(&sock->s_recver);
- nni_thr_run(&sock->s_sender);
*sockp = sock;
return (0);
}
@@ -279,6 +274,7 @@ nni_sock_shutdown(nni_sock *sock)
nni_pipe *pipe;
nni_ep *ep;
nni_time linger;
+ int i;
nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
@@ -350,8 +346,9 @@ nni_sock_shutdown(nni_sock *sock)
nni_mtx_unlock(&sock->s_mx);
// Wait for the threads to exit.
- nni_thr_wait(&sock->s_sender);
- nni_thr_wait(&sock->s_recver);
+ for (i = 0; i < NNI_MAXWORKERS; i++) {
+ nni_thr_wait(&sock->s_worker_thr[i]);
+ }
nni_thr_wait(&sock->s_reaper);
// At this point, there are no threads blocked inside of us
@@ -368,6 +365,8 @@ nni_sock_shutdown(nni_sock *sock)
void
nni_sock_close(nni_sock *sock)
{
+ int i;
+
// Shutdown everything if not already done. This operation
// is idempotent.
nni_sock_shutdown(sock);
@@ -383,9 +382,10 @@ nni_sock_close(nni_sock *sock)
sock->s_sock_ops.sock_fini(sock->s_data);
// And we need to clean up *our* state.
+ for (i = 0; i < NNI_MAXWORKERS; i++) {
+ nni_thr_fini(&sock->s_worker_thr[i]);
+ }
nni_thr_fini(&sock->s_reaper);
- nni_thr_fini(&sock->s_sender);
- nni_thr_fini(&sock->s_recver);
nni_msgq_fini(sock->s_urq);
nni_msgq_fini(sock->s_uwq);
nni_cv_fini(&sock->s_cv);
diff --git a/src/core/socket.h b/src/core/socket.h
index 197b5d0d..4656c5d7 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -40,8 +40,7 @@ struct nng_socket {
nni_list s_reaps; // pipes to reap
nni_thr s_reaper;
- nni_thr s_sender;
- nni_thr s_recver;
+ nni_thr s_worker_thr[NNI_MAXWORKERS];
int s_ep_pend; // EP dial/listen in progress
int s_closing; // Socket is closing
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index 51608e07..ec6709ac 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -200,20 +200,15 @@ static nni_proto_pipe_ops nni_pair_pipe_ops = {
.pipe_fini = nni_pair_pipe_fini,
.pipe_add = nni_pair_pipe_add,
.pipe_rem = nni_pair_pipe_rem,
- .pipe_send = nni_pair_pipe_send,
- .pipe_recv = nni_pair_pipe_recv,
+ .pipe_worker = { nni_pair_pipe_send,
+ nni_pair_pipe_recv },
};
static nni_proto_sock_ops nni_pair_sock_ops = {
.sock_init = nni_pair_sock_init,
.sock_fini = nni_pair_sock_fini,
- .sock_close = NULL,
.sock_setopt = nni_pair_sock_setopt,
.sock_getopt = nni_pair_sock_getopt,
- .sock_rfilter = NULL,
- .sock_sfilter = NULL,
- .sock_send = NULL,
- .sock_recv = NULL,
};
nni_proto nni_pair_proto = {
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index 9612177c..dc37e72b 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -139,22 +139,14 @@ nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
static nni_proto_pipe_ops nni_pull_pipe_ops = {
.pipe_init = nni_pull_pipe_init,
.pipe_fini = nni_pull_pipe_fini,
- .pipe_add = NULL,
- .pipe_rem = NULL,
- .pipe_send = NULL,
- .pipe_recv = nni_pull_pipe_recv,
+ .pipe_worker = { nni_pull_pipe_recv },
};
static nni_proto_sock_ops nni_pull_sock_ops = {
.sock_init = nni_pull_sock_init,
.sock_fini = nni_pull_sock_fini,
- .sock_close = NULL,
.sock_setopt = nni_pull_sock_setopt,
.sock_getopt = nni_pull_sock_getopt,
- .sock_send = NULL,
- .sock_recv = NULL,
- .sock_rfilter = NULL,
- .sock_sfilter = NULL,
};
nni_proto nni_pull_proto = {
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index 6cdc9cc5..e3b9ace8 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -288,8 +288,8 @@ static nni_proto_pipe_ops nni_push_pipe_ops = {
.pipe_fini = nni_push_pipe_fini,
.pipe_add = nni_push_pipe_add,
.pipe_rem = nni_push_pipe_rem,
- .pipe_send = nni_push_pipe_send,
- .pipe_recv = nni_push_pipe_recv,
+ .pipe_worker = { nni_push_pipe_send,
+ nni_push_pipe_recv },
};
static nni_proto_sock_ops nni_push_sock_ops = {
@@ -298,10 +298,7 @@ static nni_proto_sock_ops nni_push_sock_ops = {
.sock_close = nni_push_sock_close,
.sock_setopt = nni_push_sock_setopt,
.sock_getopt = nni_push_sock_getopt,
- .sock_send = nni_push_sock_send,
- .sock_recv = NULL,
- .sock_rfilter = NULL,
- .sock_sfilter = NULL,
+ .sock_worker = { nni_push_sock_send },
};
nni_proto nni_push_proto = {
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index 684b916d..b597f896 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -257,20 +257,16 @@ static nni_proto_pipe_ops nni_pub_pipe_ops = {
.pipe_fini = nni_pub_pipe_fini,
.pipe_add = nni_pub_pipe_add,
.pipe_rem = nni_pub_pipe_rem,
- .pipe_send = nni_pub_pipe_send,
- .pipe_recv = nni_pub_pipe_recv,
+ .pipe_worker = { nni_pub_pipe_send,
+ nni_pub_pipe_recv },
};
nni_proto_sock_ops nni_pub_sock_ops = {
.sock_init = nni_pub_sock_init,
.sock_fini = nni_pub_sock_fini,
- .sock_close = NULL,
.sock_setopt = nni_pub_sock_setopt,
.sock_getopt = nni_pub_sock_getopt,
- .sock_send = nni_pub_sock_send,
- .sock_recv = NULL,
- .sock_rfilter = NULL,
- .sock_sfilter = NULL,
+ .sock_worker = { nni_pub_sock_send },
};
nni_proto nni_pub_proto = {
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index dd288d5e..0a6ce5fd 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -297,22 +297,15 @@ nni_sub_sock_rfilter(void *arg, nni_msg *msg)
static nni_proto_pipe_ops nni_sub_pipe_ops = {
.pipe_init = nni_sub_pipe_init,
.pipe_fini = nni_sub_pipe_fini,
- .pipe_add = NULL,
- .pipe_rem = NULL,
- .pipe_send = NULL,
- .pipe_recv = nni_sub_pipe_recv,
+ .pipe_worker = { nni_sub_pipe_recv },
};
static nni_proto_sock_ops nni_sub_sock_ops = {
.sock_init = nni_sub_sock_init,
.sock_fini = nni_sub_sock_fini,
- .sock_close = NULL,
.sock_setopt = nni_sub_sock_setopt,
.sock_getopt = nni_sub_sock_getopt,
.sock_rfilter = nni_sub_sock_rfilter,
- .sock_sfilter = NULL,
- .sock_send = NULL,
- .sock_recv = NULL,
};
nni_proto nni_sub_proto = {
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 3cf215b5..485fd4bc 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -26,7 +26,6 @@ struct nni_rep_sock {
nni_msgq * urq;
int raw;
int ttl;
- nni_thr sender;
nni_idhash * pipes;
char * btrace;
size_t btrace_len;
@@ -395,20 +394,18 @@ static nni_proto_pipe_ops nni_rep_pipe_ops = {
.pipe_fini = nni_rep_pipe_fini,
.pipe_add = nni_rep_pipe_add,
.pipe_rem = nni_rep_pipe_rem,
- .pipe_send = nni_rep_pipe_send,
- .pipe_recv = nni_rep_pipe_recv,
+ .pipe_worker = { nni_rep_pipe_send,
+ nni_rep_pipe_recv },
};
static nni_proto_sock_ops nni_rep_sock_ops = {
.sock_init = nni_rep_sock_init,
.sock_fini = nni_rep_sock_fini,
- .sock_close = NULL,
.sock_setopt = nni_rep_sock_setopt,
.sock_getopt = nni_rep_sock_getopt,
.sock_rfilter = nni_rep_sock_rfilter,
.sock_sfilter = nni_rep_sock_sfilter,
- .sock_send = nni_rep_sock_send,
- .sock_recv = NULL,
+ .sock_worker = { nni_rep_sock_send },
};
nni_proto nni_rep_proto = {
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index d8104342..3c3bba9c 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -13,7 +13,7 @@
#include "core/nng_impl.h"
// Request protocol. The REQ protocol is the "request" side of a
-// request-reply pair. This is useful for bulding RPC clients, for
+// request-reply pair. This is useful for building RPC clients, for
// example.
typedef struct nni_req_pipe nni_req_pipe;
@@ -27,7 +27,6 @@ struct nni_req_sock {
nni_msgq * urq;
nni_duration retry;
nni_time resend;
- nni_thr resender;
int raw;
int closing;
nni_msg * reqmsg;
@@ -41,7 +40,6 @@ struct nni_req_pipe {
nni_pipe * pipe;
nni_req_sock * req;
int sigclose;
- nni_list_node node;
};
static void nni_req_resender(void *);
@@ -72,13 +70,6 @@ nni_req_sock_init(void **reqp, nni_sock *sock)
req->urq = nni_sock_recvq(sock);
*reqp = req;
nni_sock_recverr(sock, NNG_ESTATE);
- rv = nni_thr_init(&req->resender, nni_req_resender, req);
- if (rv != 0) {
- nni_cv_fini(&req->cv);
- NNI_FREE_STRUCT(req);
- return (rv);
- }
- nni_thr_run(&req->resender);
return (0);
}
@@ -100,7 +91,6 @@ nni_req_sock_fini(void *arg)
{
nni_req_sock *req = arg;
- nni_thr_fini(&req->resender);
nni_cv_fini(&req->cv);
if (req->reqmsg != NULL) {
nni_msg_free(req->reqmsg);
@@ -275,7 +265,7 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
static void
-nni_req_resender(void *arg)
+nni_req_sock_resend(void *arg)
{
nni_req_sock *req = arg;
nni_mtx *mx = nni_sock_mtx(req->sock);
@@ -395,8 +385,8 @@ static nni_proto_pipe_ops nni_req_pipe_ops = {
.pipe_fini = nni_req_pipe_fini,
.pipe_add = nni_req_pipe_add,
.pipe_rem = nni_req_pipe_rem,
- .pipe_send = nni_req_pipe_send,
- .pipe_recv = nni_req_pipe_recv,
+ .pipe_worker = { nni_req_pipe_send,
+ nni_req_pipe_recv },
};
static nni_proto_sock_ops nni_req_sock_ops = {
@@ -407,8 +397,7 @@ static nni_proto_sock_ops nni_req_sock_ops = {
.sock_getopt = nni_req_sock_getopt,
.sock_rfilter = nni_req_sock_rfilter,
.sock_sfilter = nni_req_sock_sfilter,
- .sock_send = NULL,
- .sock_recv = NULL,
+ .sock_worker = { nni_req_sock_resend },
};
nni_proto nni_req_proto = {
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 24c607b9..3b23dff8 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -19,18 +19,17 @@
typedef struct nni_resp_pipe nni_resp_pipe;
typedef struct nni_resp_sock nni_resp_sock;
-// An nni_rep_sock is our per-socket protocol private structure.
+// An nni_resp_sock is our per-socket protocol private structure.
struct nni_resp_sock {
nni_sock * sock;
int raw;
int ttl;
- nni_thr sender;
nni_idhash * pipes;
char * btrace;
size_t btrace_len;
};
-// An nni_rep_pipe is our per-pipe protocol private structure.
+// An nni_resp_pipe is our per-pipe protocol private structure.
struct nni_resp_pipe {
nni_pipe * pipe;
nni_resp_sock * resp;
@@ -38,8 +37,6 @@ struct nni_resp_pipe {
int sigclose;
};
-static void nni_rep_topsender(void *);
-
static int
nni_resp_sock_init(void **respp, nni_sock *sock)
{
@@ -388,8 +385,7 @@ static nni_proto_pipe_ops nni_resp_pipe_ops = {
.pipe_fini = nni_resp_pipe_fini,
.pipe_add = nni_resp_pipe_add,
.pipe_rem = nni_resp_pipe_rem,
- .pipe_send = nni_resp_pipe_send,
- .pipe_recv = nni_resp_pipe_recv,
+ .pipe_worker = { nni_resp_pipe_send,nni_resp_pipe_recv },
};
static nni_proto_sock_ops nni_resp_sock_ops = {
@@ -400,8 +396,7 @@ static nni_proto_sock_ops nni_resp_sock_ops = {
.sock_getopt = nni_resp_sock_getopt,
.sock_rfilter = nni_resp_sock_rfilter,
.sock_sfilter = nni_resp_sock_sfilter,
- .sock_send = nni_resp_sock_send,
- .sock_recv = NULL,
+ .sock_worker = { nni_resp_sock_send },
};
nni_proto nni_respondent_proto = {