diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-08 11:18:16 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-08 11:18:16 -0800 |
| commit | ec2574b09a746709f15d2a3f5de135e29f4bcb52 (patch) | |
| tree | 25f970232f8093b9ce94969eeed2a5f230e94a89 /src | |
| parent | 360d19001b90d92ac2f232efb67e356979b0bc4b (diff) | |
| download | nng-ec2574b09a746709f15d2a3f5de135e29f4bcb52.tar.gz nng-ec2574b09a746709f15d2a3f5de135e29f4bcb52.tar.bz2 nng-ec2574b09a746709f15d2a3f5de135e29f4bcb52.zip | |
Move to generic socket & pipe workers, and up to 4 each.
This should eliminate all need for protocols to do their own
thread management tasks.
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/defs.h | 4 | ||||
| -rw-r--r-- | src/core/pipe.c | 37 | ||||
| -rw-r--r-- | src/core/pipe.h | 3 | ||||
| -rw-r--r-- | src/core/protocol.h | 37 | ||||
| -rw-r--r-- | src/core/socket.c | 62 | ||||
| -rw-r--r-- | src/core/socket.h | 3 | ||||
| -rw-r--r-- | src/protocol/pair/pair.c | 9 | ||||
| -rw-r--r-- | src/protocol/pipeline/pull.c | 10 | ||||
| -rw-r--r-- | src/protocol/pipeline/push.c | 9 | ||||
| -rw-r--r-- | src/protocol/pubsub/pub.c | 10 | ||||
| -rw-r--r-- | src/protocol/pubsub/sub.c | 9 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 9 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 21 | ||||
| -rw-r--r-- | src/protocol/survey/respond.c | 13 |
14 files changed, 97 insertions, 139 deletions
diff --git a/src/core/defs.h b/src/core/defs.h index 98f4e661..9c745660 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -37,6 +37,7 @@ typedef struct nni_proto nni_proto; typedef int nni_signal; // Wakeup channel. typedef uint64_t nni_time; // Abs. time (usec). typedef int64_t nni_duration; // Rel. time (usec). +typedef void (*nni_worker)(void *); // Used by transports for scatter gather I/O. typedef struct { @@ -53,6 +54,9 @@ typedef struct { #define NNI_ALLOC_STRUCT(s) nni_alloc(sizeof (*s)) #define NNI_FREE_STRUCT(s) nni_free((s), sizeof (*s)) +// Maximum number of socket or pipe worker threads. +#define NNI_MAXWORKERS 4 + #define NNI_PUT16(ptr, u) \ do { \ (ptr)[0] = (uint8_t) (((uint16_t) (u)) >> 8); \ diff --git a/src/core/pipe.c b/src/core/pipe.c index 150fcd23..249e887c 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -88,8 +88,11 @@ nni_pipe_peer(nni_pipe *p) void nni_pipe_destroy(nni_pipe *p) { - nni_thr_fini(&p->p_send_thr); - nni_thr_fini(&p->p_recv_thr); + int i; + + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_thr_fini(&p->p_worker_thr[i]); + } if (p->p_tran_data != NULL) { p->p_tran_ops.pipe_destroy(p->p_tran_data); @@ -109,6 +112,7 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep) const nni_proto_pipe_ops *ops = &sock->s_pipe_ops; void *pdata; int rv; + int i; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); @@ -128,16 +132,19 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep) return (rv); } p->p_proto_data = pdata; - if ((rv = nni_thr_init(&p->p_recv_thr, ops->pipe_recv, pdata)) != 0) { - ops->pipe_fini(&p->p_proto_data); - NNI_FREE_STRUCT(p); - return (rv); - } - if ((rv = nni_thr_init(&p->p_send_thr, ops->pipe_send, pdata)) != 0) { - nni_thr_fini(&p->p_recv_thr); - ops->pipe_fini(&p->p_proto_data); - NNI_FREE_STRUCT(p); - return (rv); + + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_worker fn = ops->pipe_worker[i]; + rv = nni_thr_init(&p->p_worker_thr[i], fn, pdata); + if (rv != 0) { + while (i > 0) { + i--; + nni_thr_fini(&p->p_worker_thr[i]); + } + ops->pipe_fini(pdata); + NNI_FREE_STRUCT(p); + return (rv); + } } *pp = p; @@ -160,6 +167,7 @@ int nni_pipe_start(nni_pipe *pipe) { int rv; + int i; int collide; nni_sock *sock = pipe->p_sock; @@ -200,8 +208,9 @@ nni_pipe_start(nni_pipe *pipe) nni_list_append(&sock->s_pipes, pipe); - nni_thr_run(&pipe->p_send_thr); - nni_thr_run(&pipe->p_recv_thr); + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_thr_run(&pipe->p_worker_thr[i]); + } pipe->p_active = 1; // XXX: Publish event diff --git a/src/core/pipe.h b/src/core/pipe.h index cc14de86..65d2ede5 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -27,8 +27,7 @@ struct nng_pipe { nni_ep * p_ep; int p_reap; int p_active; - nni_thr p_send_thr; - nni_thr p_recv_thr; + nni_thr p_worker_thr[NNI_MAXWORKERS]; }; // Pipe operations that protocols use. diff --git a/src/core/protocol.h b/src/core/protocol.h index 000e9c64..11c4c304 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -24,35 +24,31 @@ struct nni_proto_pipe_ops { // pipe_init creates the protocol-specific per pipe data structure. // The last argument is the per-socket protocol private data. - int (*pipe_init)(void **, nni_pipe *, void *); + int (*pipe_init)(void **, nni_pipe *, void *); // pipe_fini releases any pipe data structures. This is called after // the pipe has been removed from the protocol, and the generic // pipe threads have been stopped. - void (*pipe_fini)(void *); + void (*pipe_fini)(void *); // pipe_add is called to register a pipe with the protocol. The // protocol can reject this, for example if another pipe is already // active on a 1:1 protocol. The protocol may not block during this, // as the socket lock is held. - int (*pipe_add)(void *); + int (*pipe_add)(void *); // pipe_rem is called to unregister a pipe from the protocol. // Threads may still acccess data structures, so the protocol // should not free anything yet. This is called with the socket // lock held, so the protocol may not call back into the socket, and // must not block. - void (*pipe_rem)(void *); + void (*pipe_rem)(void *); - // pipe_send is a function run in a thread per pipe, to process - // send activity. This can be NULL. - void (*pipe_send)(void *); - - // pipe_recv is a function run in a thread per pipe, to process - // receive activity. While this can be NULL, it should NOT be, as - // otherwise the protocol may not be able to discover the closure of - // the underlying transport (such as a remote disconnect). - void (*pipe_recv)(void *); + // Worker functions. If non-NULL, each worker is executed and + // given the protocol pipe data as a argument. All workers are + // started, or none are started. The pipe_fini function is obliged + // to ensure that workers have exited. + nni_worker pipe_worker[NNI_MAXWORKERS]; }; struct nni_proto_sock_ops { @@ -75,15 +71,6 @@ struct nni_proto_sock_ops { int (*sock_setopt)(void *, int, const void *, size_t); int (*sock_getopt)(void *, int, void *, size_t *); - // sock_send is a send worker. It can really be anything, but it - // is run in a separate thread (if it is non-NULL). - void (*sock_send)(void *); - - // sock_recv is a receive worker. As with send it can really be - // anything, its just a thread that runs for the duration of the - // socket. - void (*sock_recv)(void *); - // Receive filter. This may be NULL, but if it isn't, then // messages coming into the system are routed here just before being // delivered to the application. To drop the message, the prtocol @@ -93,6 +80,12 @@ struct nni_proto_sock_ops { // Send filter. This may be NULL, but if it isn't, then messages // here are filtered just after they come from the application. nni_msg * (*sock_sfilter)(void *, nni_msg *); + + // Worker functions. If non-NULL, each worker is executed and given + // the protocol socket data as an argument. These will all be started + // at about the same time, and all will be started, or none will be + // started. They are obliged to exit in response to sock_close. + nni_worker sock_worker[NNI_MAXWORKERS]; }; struct nni_proto { diff --git a/src/core/socket.c b/src/core/socket.c index ad8e5703..c6b0408e 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -142,6 +142,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) nni_sock *sock; nni_proto *proto; int rv; + int i; nni_proto_sock_ops *sops; nni_proto_pipe_ops *pops; @@ -217,7 +218,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) } if ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0) { nni_msgq_fini(sock->s_uwq); - nni_thr_fini(&sock->s_recver); + nni_thr_fini(&sock->s_reaper); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_mx); NNI_FREE_STRUCT(sock); @@ -234,36 +235,30 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) return (rv); } - // NB: If these functions are NULL, the thread initialization is - // largely a NO-OP. The system won't actually create the threads. - rv = nni_thr_init(&sock->s_sender, sops->sock_send, sock->s_data); - if (rv != 0) { - nni_thr_wait(&sock->s_reaper); - sops->sock_fini(&sock->s_data); - nni_msgq_fini(sock->s_urq); - nni_msgq_fini(sock->s_uwq); - nni_thr_fini(&sock->s_reaper); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); - } - rv = nni_thr_init(&sock->s_recver, sops->sock_recv, sock->s_data); - if (rv != 0) { - nni_thr_wait(&sock->s_sender); - sops->sock_fini(&sock->s_data); - nni_msgq_fini(sock->s_urq); - nni_msgq_fini(sock->s_uwq); - nni_thr_fini(&sock->s_sender); - nni_thr_fini(&sock->s_reaper); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); + // NB: If worker functions are null, then the thread initialization + // turns into a NOP, and no actual thread will be started. + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_worker fn = sops->sock_worker[i]; + rv = nni_thr_init(&sock->s_worker_thr[i], fn, sock->s_data); + if (rv != 0) { + while (i > 0) { + i--; + nni_thr_fini(&sock->s_worker_thr[i]); + } + sops->sock_fini(&sock->s_data); + nni_msgq_fini(sock->s_urq); + nni_msgq_fini(sock->s_uwq); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); + NNI_FREE_STRUCT(sock); + } } + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_thr_run(&sock->s_worker_thr[i]); + } nni_thr_run(&sock->s_reaper); - nni_thr_run(&sock->s_recver); - nni_thr_run(&sock->s_sender); *sockp = sock; return (0); } @@ -279,6 +274,7 @@ nni_sock_shutdown(nni_sock *sock) nni_pipe *pipe; nni_ep *ep; nni_time linger; + int i; nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { @@ -350,8 +346,9 @@ nni_sock_shutdown(nni_sock *sock) nni_mtx_unlock(&sock->s_mx); // Wait for the threads to exit. - nni_thr_wait(&sock->s_sender); - nni_thr_wait(&sock->s_recver); + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_thr_wait(&sock->s_worker_thr[i]); + } nni_thr_wait(&sock->s_reaper); // At this point, there are no threads blocked inside of us @@ -368,6 +365,8 @@ nni_sock_shutdown(nni_sock *sock) void nni_sock_close(nni_sock *sock) { + int i; + // Shutdown everything if not already done. This operation // is idempotent. nni_sock_shutdown(sock); @@ -383,9 +382,10 @@ nni_sock_close(nni_sock *sock) sock->s_sock_ops.sock_fini(sock->s_data); // And we need to clean up *our* state. + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_thr_fini(&sock->s_worker_thr[i]); + } nni_thr_fini(&sock->s_reaper); - nni_thr_fini(&sock->s_sender); - nni_thr_fini(&sock->s_recver); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_cv_fini(&sock->s_cv); diff --git a/src/core/socket.h b/src/core/socket.h index 197b5d0d..4656c5d7 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -40,8 +40,7 @@ struct nng_socket { nni_list s_reaps; // pipes to reap nni_thr s_reaper; - nni_thr s_sender; - nni_thr s_recver; + nni_thr s_worker_thr[NNI_MAXWORKERS]; int s_ep_pend; // EP dial/listen in progress int s_closing; // Socket is closing diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index 51608e07..ec6709ac 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -200,20 +200,15 @@ static nni_proto_pipe_ops nni_pair_pipe_ops = { .pipe_fini = nni_pair_pipe_fini, .pipe_add = nni_pair_pipe_add, .pipe_rem = nni_pair_pipe_rem, - .pipe_send = nni_pair_pipe_send, - .pipe_recv = nni_pair_pipe_recv, + .pipe_worker = { nni_pair_pipe_send, + nni_pair_pipe_recv }, }; static nni_proto_sock_ops nni_pair_sock_ops = { .sock_init = nni_pair_sock_init, .sock_fini = nni_pair_sock_fini, - .sock_close = NULL, .sock_setopt = nni_pair_sock_setopt, .sock_getopt = nni_pair_sock_getopt, - .sock_rfilter = NULL, - .sock_sfilter = NULL, - .sock_send = NULL, - .sock_recv = NULL, }; nni_proto nni_pair_proto = { diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index 9612177c..dc37e72b 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -139,22 +139,14 @@ nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp) static nni_proto_pipe_ops nni_pull_pipe_ops = { .pipe_init = nni_pull_pipe_init, .pipe_fini = nni_pull_pipe_fini, - .pipe_add = NULL, - .pipe_rem = NULL, - .pipe_send = NULL, - .pipe_recv = nni_pull_pipe_recv, + .pipe_worker = { nni_pull_pipe_recv }, }; static nni_proto_sock_ops nni_pull_sock_ops = { .sock_init = nni_pull_sock_init, .sock_fini = nni_pull_sock_fini, - .sock_close = NULL, .sock_setopt = nni_pull_sock_setopt, .sock_getopt = nni_pull_sock_getopt, - .sock_send = NULL, - .sock_recv = NULL, - .sock_rfilter = NULL, - .sock_sfilter = NULL, }; nni_proto nni_pull_proto = { diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index 6cdc9cc5..e3b9ace8 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -288,8 +288,8 @@ static nni_proto_pipe_ops nni_push_pipe_ops = { .pipe_fini = nni_push_pipe_fini, .pipe_add = nni_push_pipe_add, .pipe_rem = nni_push_pipe_rem, - .pipe_send = nni_push_pipe_send, - .pipe_recv = nni_push_pipe_recv, + .pipe_worker = { nni_push_pipe_send, + nni_push_pipe_recv }, }; static nni_proto_sock_ops nni_push_sock_ops = { @@ -298,10 +298,7 @@ static nni_proto_sock_ops nni_push_sock_ops = { .sock_close = nni_push_sock_close, .sock_setopt = nni_push_sock_setopt, .sock_getopt = nni_push_sock_getopt, - .sock_send = nni_push_sock_send, - .sock_recv = NULL, - .sock_rfilter = NULL, - .sock_sfilter = NULL, + .sock_worker = { nni_push_sock_send }, }; nni_proto nni_push_proto = { diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index 684b916d..b597f896 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -257,20 +257,16 @@ static nni_proto_pipe_ops nni_pub_pipe_ops = { .pipe_fini = nni_pub_pipe_fini, .pipe_add = nni_pub_pipe_add, .pipe_rem = nni_pub_pipe_rem, - .pipe_send = nni_pub_pipe_send, - .pipe_recv = nni_pub_pipe_recv, + .pipe_worker = { nni_pub_pipe_send, + nni_pub_pipe_recv }, }; nni_proto_sock_ops nni_pub_sock_ops = { .sock_init = nni_pub_sock_init, .sock_fini = nni_pub_sock_fini, - .sock_close = NULL, .sock_setopt = nni_pub_sock_setopt, .sock_getopt = nni_pub_sock_getopt, - .sock_send = nni_pub_sock_send, - .sock_recv = NULL, - .sock_rfilter = NULL, - .sock_sfilter = NULL, + .sock_worker = { nni_pub_sock_send }, }; nni_proto nni_pub_proto = { diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index dd288d5e..0a6ce5fd 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -297,22 +297,15 @@ nni_sub_sock_rfilter(void *arg, nni_msg *msg) static nni_proto_pipe_ops nni_sub_pipe_ops = { .pipe_init = nni_sub_pipe_init, .pipe_fini = nni_sub_pipe_fini, - .pipe_add = NULL, - .pipe_rem = NULL, - .pipe_send = NULL, - .pipe_recv = nni_sub_pipe_recv, + .pipe_worker = { nni_sub_pipe_recv }, }; static nni_proto_sock_ops nni_sub_sock_ops = { .sock_init = nni_sub_sock_init, .sock_fini = nni_sub_sock_fini, - .sock_close = NULL, .sock_setopt = nni_sub_sock_setopt, .sock_getopt = nni_sub_sock_getopt, .sock_rfilter = nni_sub_sock_rfilter, - .sock_sfilter = NULL, - .sock_send = NULL, - .sock_recv = NULL, }; nni_proto nni_sub_proto = { diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 3cf215b5..485fd4bc 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -26,7 +26,6 @@ struct nni_rep_sock { nni_msgq * urq; int raw; int ttl; - nni_thr sender; nni_idhash * pipes; char * btrace; size_t btrace_len; @@ -395,20 +394,18 @@ static nni_proto_pipe_ops nni_rep_pipe_ops = { .pipe_fini = nni_rep_pipe_fini, .pipe_add = nni_rep_pipe_add, .pipe_rem = nni_rep_pipe_rem, - .pipe_send = nni_rep_pipe_send, - .pipe_recv = nni_rep_pipe_recv, + .pipe_worker = { nni_rep_pipe_send, + nni_rep_pipe_recv }, }; static nni_proto_sock_ops nni_rep_sock_ops = { .sock_init = nni_rep_sock_init, .sock_fini = nni_rep_sock_fini, - .sock_close = NULL, .sock_setopt = nni_rep_sock_setopt, .sock_getopt = nni_rep_sock_getopt, .sock_rfilter = nni_rep_sock_rfilter, .sock_sfilter = nni_rep_sock_sfilter, - .sock_send = nni_rep_sock_send, - .sock_recv = NULL, + .sock_worker = { nni_rep_sock_send }, }; nni_proto nni_rep_proto = { diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index d8104342..3c3bba9c 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -13,7 +13,7 @@ #include "core/nng_impl.h" // Request protocol. The REQ protocol is the "request" side of a -// request-reply pair. This is useful for bulding RPC clients, for +// request-reply pair. This is useful for building RPC clients, for // example. typedef struct nni_req_pipe nni_req_pipe; @@ -27,7 +27,6 @@ struct nni_req_sock { nni_msgq * urq; nni_duration retry; nni_time resend; - nni_thr resender; int raw; int closing; nni_msg * reqmsg; @@ -41,7 +40,6 @@ struct nni_req_pipe { nni_pipe * pipe; nni_req_sock * req; int sigclose; - nni_list_node node; }; static void nni_req_resender(void *); @@ -72,13 +70,6 @@ nni_req_sock_init(void **reqp, nni_sock *sock) req->urq = nni_sock_recvq(sock); *reqp = req; nni_sock_recverr(sock, NNG_ESTATE); - rv = nni_thr_init(&req->resender, nni_req_resender, req); - if (rv != 0) { - nni_cv_fini(&req->cv); - NNI_FREE_STRUCT(req); - return (rv); - } - nni_thr_run(&req->resender); return (0); } @@ -100,7 +91,6 @@ nni_req_sock_fini(void *arg) { nni_req_sock *req = arg; - nni_thr_fini(&req->resender); nni_cv_fini(&req->cv); if (req->reqmsg != NULL) { nni_msg_free(req->reqmsg); @@ -275,7 +265,7 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp) static void -nni_req_resender(void *arg) +nni_req_sock_resend(void *arg) { nni_req_sock *req = arg; nni_mtx *mx = nni_sock_mtx(req->sock); @@ -395,8 +385,8 @@ static nni_proto_pipe_ops nni_req_pipe_ops = { .pipe_fini = nni_req_pipe_fini, .pipe_add = nni_req_pipe_add, .pipe_rem = nni_req_pipe_rem, - .pipe_send = nni_req_pipe_send, - .pipe_recv = nni_req_pipe_recv, + .pipe_worker = { nni_req_pipe_send, + nni_req_pipe_recv }, }; static nni_proto_sock_ops nni_req_sock_ops = { @@ -407,8 +397,7 @@ static nni_proto_sock_ops nni_req_sock_ops = { .sock_getopt = nni_req_sock_getopt, .sock_rfilter = nni_req_sock_rfilter, .sock_sfilter = nni_req_sock_sfilter, - .sock_send = NULL, - .sock_recv = NULL, + .sock_worker = { nni_req_sock_resend }, }; nni_proto nni_req_proto = { diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 24c607b9..3b23dff8 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -19,18 +19,17 @@ typedef struct nni_resp_pipe nni_resp_pipe; typedef struct nni_resp_sock nni_resp_sock; -// An nni_rep_sock is our per-socket protocol private structure. +// An nni_resp_sock is our per-socket protocol private structure. struct nni_resp_sock { nni_sock * sock; int raw; int ttl; - nni_thr sender; nni_idhash * pipes; char * btrace; size_t btrace_len; }; -// An nni_rep_pipe is our per-pipe protocol private structure. +// An nni_resp_pipe is our per-pipe protocol private structure. struct nni_resp_pipe { nni_pipe * pipe; nni_resp_sock * resp; @@ -38,8 +37,6 @@ struct nni_resp_pipe { int sigclose; }; -static void nni_rep_topsender(void *); - static int nni_resp_sock_init(void **respp, nni_sock *sock) { @@ -388,8 +385,7 @@ static nni_proto_pipe_ops nni_resp_pipe_ops = { .pipe_fini = nni_resp_pipe_fini, .pipe_add = nni_resp_pipe_add, .pipe_rem = nni_resp_pipe_rem, - .pipe_send = nni_resp_pipe_send, - .pipe_recv = nni_resp_pipe_recv, + .pipe_worker = { nni_resp_pipe_send,nni_resp_pipe_recv }, }; static nni_proto_sock_ops nni_resp_sock_ops = { @@ -400,8 +396,7 @@ static nni_proto_sock_ops nni_resp_sock_ops = { .sock_getopt = nni_resp_sock_getopt, .sock_rfilter = nni_resp_sock_rfilter, .sock_sfilter = nni_resp_sock_sfilter, - .sock_send = nni_resp_sock_send, - .sock_recv = NULL, + .sock_worker = { nni_resp_sock_send }, }; nni_proto nni_respondent_proto = { |
