diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/defs.h | 3 | ||||
| -rw-r--r-- | src/core/pipe.c | 37 | ||||
| -rw-r--r-- | src/core/pipe.h | 4 | ||||
| -rw-r--r-- | src/core/protocol.c | 34 | ||||
| -rw-r--r-- | src/core/protocol.h | 77 | ||||
| -rw-r--r-- | src/core/socket.c | 95 | ||||
| -rw-r--r-- | src/core/socket.h | 2 | ||||
| -rw-r--r-- | src/protocol/pair/pair.c | 75 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 82 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 75 |
10 files changed, 303 insertions, 181 deletions
diff --git a/src/core/defs.h b/src/core/defs.h index b10701e0..dd225567 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -28,7 +28,8 @@ typedef struct nni_tran nni_tran; typedef struct nni_tran_ep nni_tran_ep; typedef struct nni_tran_pipe nni_tran_pipe; -typedef struct nni_protocol nni_protocol; +typedef struct nni_proto_pipe nni_proto_pipe; +typedef struct nni_proto nni_proto; typedef int nni_signal; // Turnstile/wakeup channel. typedef uint64_t nni_time; // Absolute time (usec). diff --git a/src/core/pipe.c b/src/core/pipe.c index e8864d58..768b7240 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -76,8 +76,8 @@ nni_pipe_destroy(nni_pipe *p) if (p->p_tran_data != NULL) { p->p_tran_ops.pipe_destroy(p->p_tran_data); } - if (p->p_pdata != NULL) { - nni_free(p->p_pdata, p->p_psize); + if (p->p_proto_data != NULL) { + p->p_proto_ops.pipe_fini(p->p_proto_data); } NNI_FREE_STRUCT(p); } @@ -88,7 +88,8 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep) { nni_pipe *p; nni_sock *sock = ep->ep_sock; - nni_protocol *proto = &sock->s_ops; + nni_proto *proto = &sock->s_proto; + const nni_proto_pipe *pops = proto->proto_pipe; int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { @@ -96,32 +97,34 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep) } p->p_sock = sock; p->p_tran_data = NULL; + p->p_proto_data = NULL; p->p_active = 0; - p->p_psize = proto->proto_pipe_size; NNI_LIST_NODE_INIT(&p->p_node); // Make a copy of the transport ops. We can override entry points // and we avoid an extra dereference on hot code paths. p->p_tran_ops = *ep->ep_tran->tran_pipe; - if ((p->p_pdata = nni_alloc(p->p_psize)) == NULL) { + // Make a copy of the protocol ops. Same rationale. + p->p_proto_ops = *pops; + + if ((rv = pops->pipe_init(&p->p_proto_data, p, sock->s_data)) != 0) { NNI_FREE_STRUCT(p); - return (NNG_ENOMEM); + return (rv); } - rv = nni_thr_init(&p->p_recv_thr, proto->proto_pipe_recv, p->p_pdata); + rv = nni_thr_init(&p->p_recv_thr, pops->pipe_recv, p->p_proto_data); if (rv != 0) { - nni_free(p->p_pdata, p->p_psize); + pops->pipe_fini(&p->p_proto_data); NNI_FREE_STRUCT(p); return (rv); } - rv = nni_thr_init(&p->p_send_thr, proto->proto_pipe_send, p->p_pdata); + rv = nni_thr_init(&p->p_send_thr, pops->pipe_send, p->p_proto_data); if (rv != 0) { nni_thr_fini(&p->p_recv_thr); - nni_free(p->p_pdata, p->p_psize); + pops->pipe_fini(&p->p_proto_data); NNI_FREE_STRUCT(p); return (rv); } - p->p_psize = sock->s_ops.proto_pipe_size; nni_mtx_lock(&sock->s_mx); nni_list_append(&sock->s_pipes, p); nni_mtx_unlock(&sock->s_mx); @@ -171,9 +174,11 @@ nni_pipe_start(nni_pipe *pipe) } } while (collide); - rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, pipe->p_pdata); + rv = pipe->p_proto_ops.pipe_add(pipe->p_proto_data); if (rv != 0) { - goto fail; + nni_mtx_unlock(&sock->s_mx); + nni_pipe_close(pipe); + return (rv); } nni_thr_run(&pipe->p_send_thr); nni_thr_run(&pipe->p_recv_thr); @@ -183,10 +188,4 @@ nni_pipe_start(nni_pipe *pipe) nni_mtx_unlock(&sock->s_mx); return (0); - -fail: - pipe->p_reap = 1; - nni_mtx_unlock(&sock->s_mx); - nni_pipe_close(pipe); - return (rv); } diff --git a/src/core/pipe.h b/src/core/pipe.h index f037bb3b..d2819a31 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -21,8 +21,8 @@ struct nng_pipe { uint32_t p_id; nni_tran_pipe p_tran_ops; void * p_tran_data; - void * p_pdata; // protocol specific data - size_t p_psize; // size of protocol data + nni_proto_pipe p_proto_ops; + void * p_proto_data; nni_list_node p_node; nni_sock * p_sock; nni_ep * p_ep; diff --git a/src/core/protocol.c b/src/core/protocol.c index 6399cd41..8669275a 100644 --- a/src/core/protocol.c +++ b/src/core/protocol.c @@ -16,18 +16,18 @@ // The list of protocols is hardwired. This is reasonably unlikely to // change, as adding new protocols is not something intended to be done // outside of the core. -extern nni_protocol nni_pair_protocol; +extern nni_proto nni_pair_proto; -static nni_protocol *protocols[] = { - &nni_pair_protocol, +static nni_proto *protocols[] = { + &nni_pair_proto, NULL }; -nni_protocol * -nni_protocol_find(uint16_t num) +nni_proto * +nni_proto_find(uint16_t num) { int i; - nni_protocol *p; + nni_proto *p; for (i = 0; (p = protocols[i]) != NULL; i++) { if (p->proto_self == num) { @@ -39,11 +39,11 @@ nni_protocol_find(uint16_t num) const char * -nni_protocol_name(uint16_t num) +nni_proto_name(uint16_t num) { - nni_protocol *p; + nni_proto *p; - if ((p = nni_protocol_find(num)) == NULL) { + if ((p = nni_proto_find(num)) == NULL) { return (NULL); } return (p->proto_name); @@ -51,9 +51,9 @@ nni_protocol_name(uint16_t num) uint16_t -nni_protocol_number(const char *name) +nni_proto_number(const char *name) { - nni_protocol *p; + nni_proto *p; int i; for (i = 0; (p = protocols[i]) != NULL; i++) { @@ -63,3 +63,15 @@ nni_protocol_number(const char *name) } return (NNG_PROTO_NONE); } + +uint16_t +nni_proto_peer(uint16_t num) +{ + nni_proto *p; + + if ((p = nni_proto_find(num)) == NULL) { + return (NNG_PROTO_NONE); + } + return (p->proto_peer); +} + diff --git a/src/core/protocol.h b/src/core/protocol.h index 301782b4..c858ad39 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -19,48 +19,75 @@ // work, and the pipe functions generally assume no locking is needed. // As a consequence, most of the concurrency in nng exists in the protocol // implementations. -struct nni_protocol { - uint16_t proto_self; // our 16-bit protocol ID - uint16_t proto_peer; // who we peer with (ID) - const char * proto_name; // string version of our name - size_t proto_pipe_size; // pipe private data size - //Create protocol instance, which will be stored on the socket. - int (*proto_create)(void **, nni_sock *); +// nni_proto_pipe contains protocol-specific per-pipe operations. +struct nni_proto_pipe { + // pipe_init creates the protocol-specific per pipe data structure. + // The last argument is the per-socket protocol private data. + int (*pipe_init)(void **, nni_pipe *, void *); - // Destroy the protocol instance. - void (*proto_destroy)(void *); + // pipe_fini releases any pipe data structures. This is called after + // the pipe has been removed from the protocol, and the generic + // pipe threads have been stopped. + void (*pipe_fini)(void *); + + // pipe_add is called to register a pipe with the protocol. The + // protocol can reject this, for example if another pipe is already + // active on a 1:1 protocol. The protocol may not block during this, + // as the socket lock is held. + int (*pipe_add)(void *); + + // pipe_rem is called to unregister a pipe from the protocol. + // Threads may still acccess data structures, so the protocol + // should not free anything yet. This is called with the socket + // lock held, so the protocol may not call back into the socket, and + // must not block. + void (*pipe_rem)(void *); + + // pipe_send is a function run in a thread per pipe, to process + // send activity. This can be NULL. + void (*pipe_send)(void *); - // Add and remove pipes. These are called as connections are - // created or destroyed. - int (*proto_add_pipe)(void *, nni_pipe *, void *); - void (*proto_rem_pipe)(void *, void *); + // pipe_recv is a function run in a thread per pipe, to process + // receive activity. While this can be NULL, it should NOT be, as + // otherwise the protocol may not be able to discover the closure of + // the underlying transport (such as a remote disconnect). + void (*pipe_recv)(void *); +}; + +struct nni_proto { + uint16_t proto_self; // our 16-bit protocol ID + uint16_t proto_peer; // who we peer with (ID) + const char * proto_name; // string version of our name + const nni_proto_pipe * proto_pipe; // Per-pipe operations. - // Thread functions for processing send & receive sides of - // protocol pipes. Send may be NULL, but recv should should not. - // (Recv needs to detect a closed pipe, if nothing else.) - void (*proto_pipe_send)(void *); - void (*proto_pipe_recv)(void *); + // Create protocol instance, which will be stored on the socket. + int (*proto_init)(void **, nni_sock *); + + // Destroy the protocol instance. + void (*proto_fini)(void *); // Option manipulation. These may be NULL. - int (*proto_setopt)(void *, int, const void *, size_t); - int (*proto_getopt)(void *, int, void *, size_t *); + int (*proto_setopt)(void *, int, const void *, + size_t); + int (*proto_getopt)(void *, int, void *, size_t *); // Receive filter. This may be NULL, but if it isn't, then // messages coming into the system are routed here just before being // delivered to the application. To drop the message, the prtocol // should return NULL, otherwise the message (possibly modified). - nni_msg * (*proto_recv_filter)(void *, nni_msg *); + nni_msg * (*proto_recv_filter)(void *, nni_msg *); // Send filter. This may be NULL, but if it isn't, then messages // here are filtered just after they come from the application. - nni_msg * (*proto_send_filter)(void *, nni_msg *); + nni_msg * (*proto_send_filter)(void *, nni_msg *); }; // These functions are not used by protocols, but rather by the socket // core implementation. The lookups can be used by transports as well. -extern nni_protocol *nni_protocol_find(uint16_t); -extern const char *nni_protocol_name(uint16_t); -extern uint16_t nni_protocol_number(const char *); +extern nni_proto *nni_proto_find(uint16_t); +extern const char *nni_proto_name(uint16_t); +extern uint16_t nni_proto_number(const char *); +extern uint16_t nni_proto_peer(uint16_t); #endif // CORE_PROTOCOL_H diff --git a/src/core/socket.c b/src/core/socket.c index 6aa40bfc..cad4b10a 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -50,16 +50,15 @@ nni_reaper(void *arg) ep->ep_pipe = NULL; nni_cv_wake(&ep->ep_cv); } - nni_mtx_unlock(&sock->s_mx); // Remove the pipe from the protocol. Protocols may // keep lists of pipes for managing their topologies. // Note that if a protocol has rejected the pipe, it // won't have any data. if (pipe->p_active) { - sock->s_ops.proto_rem_pipe(sock->s_data, - pipe->p_pdata); + pipe->p_proto_ops.pipe_rem(pipe->p_proto_data); } + nni_mtx_unlock(&sock->s_mx); // XXX: also publish event... nni_pipe_destroy(pipe); @@ -79,21 +78,53 @@ nni_reaper(void *arg) } +static nni_msg * +nni_sock_nullfilter(void *arg, nni_msg *mp) +{ + NNI_ARG_UNUSED(arg); + return (mp); +} + + +static int +nni_sock_nullgetopt(void *arg, int num, void *data, size_t *szp) +{ + NNI_ARG_UNUSED(arg); + NNI_ARG_UNUSED(num); + NNI_ARG_UNUSED(data); + NNI_ARG_UNUSED(szp); + return (NNG_ENOTSUP); +} + + +static int +nni_sock_nullsetopt(void *arg, int num, const void *data, size_t sz) +{ + NNI_ARG_UNUSED(arg); + NNI_ARG_UNUSED(num); + NNI_ARG_UNUSED(data); + NNI_ARG_UNUSED(sz); + return (NNG_ENOTSUP); +} + + // nn_sock_open creates the underlying socket. int -nni_sock_open(nni_sock **sockp, uint16_t proto) +nni_sock_open(nni_sock **sockp, uint16_t pnum) { nni_sock *sock; - nni_protocol *ops; + nni_proto *proto; int rv; - if ((ops = nni_protocol_find(proto)) == NULL) { + if ((proto = nni_proto_find(pnum)) == NULL) { return (NNG_ENOTSUP); } if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) { return (NNG_ENOMEM); } - sock->s_ops = *ops; + + // We make a copy of the protocol operations. + sock->s_proto = *proto; sock->s_linger = 0; sock->s_sndtimeo = -1; sock->s_rcvtimeo = -1; @@ -137,7 +168,7 @@ nni_sock_open(nni_sock **sockp, uint16_t proto) return (rv); } - if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) { + if ((rv = sock->s_proto.proto_init(&sock->s_data, sock)) != 0) { nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_thr_fini(&sock->s_reaper); @@ -146,6 +177,18 @@ nni_sock_open(nni_sock **sockp, uint16_t proto) NNI_FREE_STRUCT(sock); return (rv); } + if (sock->s_proto.proto_send_filter == NULL) { + sock->s_proto.proto_send_filter = nni_sock_nullfilter; + } + if (sock->s_proto.proto_recv_filter == NULL) { + sock->s_proto.proto_recv_filter = nni_sock_nullfilter; + } + if (sock->s_proto.proto_getopt == NULL) { + sock->s_proto.proto_getopt = nni_sock_nullgetopt; + } + if (sock->s_proto.proto_setopt == NULL) { + sock->s_proto.proto_setopt = nni_sock_nullsetopt; + } nni_thr_run(&sock->s_reaper); *sockp = sock; return (0); @@ -223,7 +266,7 @@ nni_sock_close(nni_sock *sock) // At this point nothing else should be referencing us. // The protocol needs to clean up its state. - sock->s_ops.proto_destroy(sock->s_data); + sock->s_proto.proto_fini(sock->s_data); // And we need to clean up *our* state. nni_msgq_fini(sock->s_urq); @@ -256,11 +299,9 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire) besteffort = sock->s_besteffort; nni_mtx_unlock(&sock->s_mx); - if (sock->s_ops.proto_send_filter != NULL) { - msg = sock->s_ops.proto_send_filter(sock->s_data, msg); - if (msg == NULL) { - return (0); - } + msg = sock->s_proto.proto_send_filter(sock->s_data, msg); + if (msg == NULL) { + return (0); } if (besteffort) { @@ -300,9 +341,7 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire) if (rv != 0) { return (rv); } - if (sock->s_ops.proto_recv_filter != NULL) { - msg = sock->s_ops.proto_recv_filter(sock->s_data, msg); - } + msg = sock->s_proto.proto_recv_filter(sock->s_data, msg); if (msg != NULL) { break; } @@ -318,7 +357,7 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire) uint16_t nni_sock_proto(nni_sock *sock) { - return (sock->s_ops.proto_self); + return (sock->s_proto.proto_self); } @@ -386,12 +425,10 @@ nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size) int rv = ENOTSUP; nni_mtx_lock(&sock->s_mx); - if (sock->s_ops.proto_setopt != NULL) { - rv = sock->s_ops.proto_setopt(sock->s_data, opt, val, size); - if (rv != NNG_ENOTSUP) { - nni_mtx_unlock(&sock->s_mx); - return (rv); - } + rv = sock->s_proto.proto_setopt(sock->s_data, opt, val, size); + if (rv != NNG_ENOTSUP) { + nni_mtx_unlock(&sock->s_mx); + return (rv); } switch (opt) { case NNG_OPT_LINGER: @@ -429,12 +466,10 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep) int rv = ENOTSUP; nni_mtx_lock(&sock->s_mx); - if (sock->s_ops.proto_getopt != NULL) { - rv = sock->s_ops.proto_getopt(sock->s_data, opt, val, sizep); - if (rv != NNG_ENOTSUP) { - nni_mtx_unlock(&sock->s_mx); - return (rv); - } + rv = sock->s_proto.proto_getopt(sock->s_data, opt, val, sizep); + if (rv != NNG_ENOTSUP) { + nni_mtx_unlock(&sock->s_mx); + return (rv); } switch (opt) { case NNG_OPT_LINGER: diff --git a/src/core/socket.h b/src/core/socket.h index 548b4697..8b3c3ff9 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -20,7 +20,7 @@ struct nng_socket { nni_msgq * s_uwq; // Upper write queue nni_msgq * s_urq; // Upper read queue - nni_protocol s_ops; + nni_proto s_proto; void * s_data; // Protocol private diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index 037d5f1e..1d4e58d0 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -23,7 +23,6 @@ typedef struct nni_pair_sock nni_pair_sock; struct nni_pair_sock { nni_sock * sock; nni_pair_pipe * pipe; - nni_mtx mx; nni_msgq * uwq; nni_msgq * urq; }; @@ -42,7 +41,7 @@ static void nni_pair_receiver(void *); static void nni_pair_sender(void *); static int -nni_pair_create(void **pairp, nni_sock *sock) +nni_pair_init(void **pairp, nni_sock *sock) { nni_pair_sock *pair; int rv; @@ -50,10 +49,6 @@ nni_pair_create(void **pairp, nni_sock *sock) if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&pair->mx)) != 0) { - NNI_FREE_STRUCT(pair); - return (rv); - } pair->sock = sock; pair->pipe = NULL; pair->uwq = nni_sock_sendq(sock); @@ -64,7 +59,7 @@ nni_pair_create(void **pairp, nni_sock *sock) static void -nni_pair_destroy(void *arg) +nni_pair_fini(void *arg) { nni_pair_sock *pair = arg; @@ -72,49 +67,61 @@ nni_pair_destroy(void *arg) // this wold be the time to shut them all down. We don't, because // the socket already shut us down, and we don't have any other // threads that run. - nni_mtx_fini(&pair->mx); NNI_FREE_STRUCT(pair); } static int -nni_pair_add_pipe(void *arg, nni_pipe *pipe, void *data) +nni_pair_pipe_init(void **ppp, nni_pipe *pipe, void *psock) { - nni_pair_sock *pair = arg; - nni_pair_pipe *pp = data; - int rv; + nni_pair_pipe *pp; + if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) { + return (NNG_ENOMEM); + } pp->pipe = pipe; pp->sigclose = 0; - pp->pair = pair; + pp->pair = psock; + *ppp = pp; + return (0); +} + + +static void +nni_pair_pipe_fini(void *arg) +{ + nni_pair_pipe *pp = arg; + NNI_FREE_STRUCT(pp); +} + +static int +nni_pair_pipe_add(void *arg) +{ + nni_pair_pipe *pp = arg; + nni_pair_sock *pair = pp->pair; - nni_mtx_lock(&pair->mx); if (pair->pipe != NULL) { - nni_mtx_unlock(&pair->mx); return (NNG_EBUSY); // Already have a peer, denied. } pair->pipe = pp; - nni_mtx_unlock(&pair->mx); return (0); } static void -nni_pair_rem_pipe(void *arg, void *data) +nni_pair_pipe_rem(void *arg) { - nni_pair_sock *pair = arg; - nni_pair_pipe *pp = data; + nni_pair_pipe *pp = arg; + nni_pair_sock *pair = pp->pair; - nni_mtx_lock(&pair->mx); if (pair->pipe == pp) { pair->pipe = NULL; } - nni_mtx_unlock(&pair->mx); } static void -nni_pair_sender(void *arg) +nni_pair_pipe_send(void *arg) { nni_pair_pipe *pp = arg; nni_pair_sock *pair = pp->pair; @@ -141,7 +148,7 @@ nni_pair_sender(void *arg) static void -nni_pair_receiver(void *arg) +nni_pair_pipe_recv(void *arg) { nni_pair_pipe *pp = arg; nni_pair_sock *pair = pp->pair; @@ -185,17 +192,23 @@ nni_pair_getopt(void *arg, int opt, void *buf, size_t *szp) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -struct nni_protocol nni_pair_protocol = { + +static nni_proto_pipe nni_pair_proto_pipe = { + .pipe_init = nni_pair_pipe_init, + .pipe_fini = nni_pair_pipe_fini, + .pipe_add = nni_pair_pipe_add, + .pipe_rem = nni_pair_pipe_rem, + .pipe_send = nni_pair_pipe_send, + .pipe_recv = nni_pair_pipe_recv, +}; + +nni_proto nni_pair_proto = { .proto_self = NNG_PROTO_PAIR, .proto_peer = NNG_PROTO_PAIR, .proto_name = "pair", - .proto_create = nni_pair_create, - .proto_destroy = nni_pair_destroy, - .proto_add_pipe = nni_pair_add_pipe, - .proto_rem_pipe = nni_pair_rem_pipe, - .proto_pipe_size = sizeof (nni_pair_pipe), - .proto_pipe_send = nni_pair_sender, - .proto_pipe_recv = nni_pair_receiver, + .proto_pipe = &nni_pair_proto_pipe, + .proto_init = nni_pair_init, + .proto_fini = nni_pair_fini, .proto_setopt = nni_pair_setopt, .proto_getopt = nni_pair_getopt, .proto_recv_filter = NULL, diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 75f2af71..8d51ee0a 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -41,12 +41,10 @@ struct nni_rep_pipe { int sigclose; }; -static void nni_rep_receiver(void *); -static void nni_rep_sender(void *); static void nni_rep_topsender(void *); static int -nni_rep_create(void **repp, nni_sock *sock) +nni_rep_init(void **repp, nni_sock *sock) { nni_rep_sock *rep; int rv; @@ -87,7 +85,7 @@ nni_rep_create(void **repp, nni_sock *sock) static void -nni_rep_destroy(void *arg) +nni_rep_fini(void *arg) { nni_rep_sock *rep = arg; @@ -102,41 +100,60 @@ nni_rep_destroy(void *arg) static int -nni_rep_add_pipe(void *arg, nni_pipe *pipe, void *datap) +nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) { - nni_rep_sock *rep = arg; nni_rep_pipe *rp; int rv; + if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) { + NNI_FREE_STRUCT(rp); + return (rv); + } rp->pipe = pipe; + rp->rep = rsock; rp->sigclose = 0; + *rpp = rp; + return (0); +} + + +static void +nni_rep_pipe_fini(void *arg) +{ + nni_rep_pipe *rp = arg; + + nni_msgq_fini(rp->sendq); + NNI_FREE_STRUCT(rp); +} - rv = nni_msgq_init(&rp->sendq, 2); - if (rv != 0) { - return (rv); - } + +static int +nni_rep_pipe_add(void *arg) +{ + nni_rep_pipe *rp = arg; + nni_rep_sock *rep = rp->rep; + int rv; nni_mtx_lock(&rep->mx); - if ((rv = nni_idhash_insert(rep->pipes, nni_pipe_id(pipe), rp)) != 0) { - nni_msgq_fini(rp->sendq); - nni_mtx_unlock(&rep->mx); - return (rv); - } + rv = nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp); nni_mtx_unlock(&rep->mx); - return (0); + + return (rv); } static void -nni_rep_rem_pipe(void *arg, void *data) +nni_rep_pipe_rem(void *arg) { - nni_rep_sock *rep = arg; - nni_rep_pipe *rp = data; + nni_rep_pipe *rp = arg; + nni_rep_sock *rep = rp->rep; nni_mtx_lock(&rep->mx); nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe)); nni_mtx_unlock(&rep->mx); - nni_msgq_fini(rp->sendq); } @@ -197,7 +214,7 @@ nni_rep_topsender(void *arg) static void -nni_rep_sender(void *arg) +nni_rep_pipe_send(void *arg) { nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; @@ -227,7 +244,7 @@ nni_rep_sender(void *arg) static void -nni_rep_receiver(void *arg) +nni_rep_pipe_recv(void *arg) { nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; @@ -426,17 +443,22 @@ nni_rep_recvfilter(void *arg, nni_msg *msg) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -struct nni_protocol nni_rep_protocol = { +static nni_proto_pipe nni_rep_proto_pipe = { + .pipe_init = nni_rep_pipe_init, + .pipe_fini = nni_rep_pipe_fini, + .pipe_add = nni_rep_pipe_add, + .pipe_rem = nni_rep_pipe_rem, + .pipe_send = nni_rep_pipe_send, + .pipe_recv = nni_rep_pipe_recv, +}; + +nni_proto nni_rep_protocol = { .proto_self = NNG_PROTO_REP, .proto_peer = NNG_PROTO_REQ, .proto_name = "rep", - .proto_create = nni_rep_create, - .proto_destroy = nni_rep_destroy, - .proto_add_pipe = nni_rep_add_pipe, - .proto_rem_pipe = nni_rep_rem_pipe, - .proto_pipe_size = sizeof (nni_rep_pipe), - .proto_pipe_send = nni_rep_sender, - .proto_pipe_recv = nni_rep_receiver, + .proto_pipe = &nni_rep_proto_pipe, + .proto_init = nni_rep_init, + .proto_fini = nni_rep_fini, .proto_setopt = nni_rep_setopt, .proto_getopt = nni_rep_getopt, .proto_recv_filter = nni_rep_recvfilter, diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index ba7d4f6b..c26d66e8 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -31,7 +31,6 @@ struct nni_req_sock { nni_thr resender; int raw; int closing; - nni_list pipes; nni_msg * reqmsg; uint32_t nextid; // next id uint8_t reqid[4]; // outstanding request ID (big endian) @@ -45,12 +44,10 @@ struct nni_req_pipe { nni_list_node node; }; -static void nni_req_receiver(void *); -static void nni_req_sender(void *); static void nni_req_resender(void *); static int -nni_req_create(void **reqp, nni_sock *sock) +nni_req_init(void **reqp, nni_sock *sock) { nni_req_sock *req; int rv; @@ -74,7 +71,6 @@ nni_req_create(void **reqp, nni_sock *sock) req->reqmsg = NULL; req->raw = 0; req->resend = NNI_TIME_ZERO; - NNI_LIST_INIT(&req->pipes, nni_req_pipe, node); req->uwq = nni_sock_sendq(sock); req->urq = nni_sock_recvq(sock); @@ -93,7 +89,7 @@ nni_req_create(void **reqp, nni_sock *sock) static void -nni_req_destroy(void *arg) +nni_req_fini(void *arg) { nni_req_sock *req = arg; @@ -112,37 +108,49 @@ nni_req_destroy(void *arg) static int -nni_req_add_pipe(void *arg, nni_pipe *pipe, void *data) +nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock) { - nni_req_sock *req = arg; - nni_req_pipe *rp = data; - int rv; + nni_req_pipe *rp; + if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) { + return (NNG_ENOMEM); + } rp->pipe = pipe; rp->sigclose = 0; - rp->req = req; - - nni_mtx_lock(&req->mx); - nni_list_append(&req->pipes, rp); - nni_mtx_unlock(&req->mx); + rp->req = rsock; return (0); } static void -nni_req_rem_pipe(void *arg, void *data) +nni_req_pipe_fini(void *arg) { - nni_req_sock *req = arg; - nni_req_pipe *rp = data; + nni_req_pipe *rp = arg; - nni_mtx_lock(&req->mx); - nni_list_remove(&req->pipes, rp); - nni_mtx_unlock(&req->mx); + NNI_FREE_STRUCT(rp); +} + + +static int +nni_req_pipe_add(void *arg) +{ + // We have nothing to do, since we don't need to maintain a global + // list of related pipes. + NNI_ARG_UNUSED(arg); + return (0); +} + + +static void +nni_req_pipe_rem(void *arg) +{ + // As with add, nothing to do here. + NNI_ARG_UNUSED(arg); } static void -nni_req_sender(void *arg) +nni_req_pipe_send(void *arg) { nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; @@ -169,7 +177,7 @@ nni_req_sender(void *arg) static void -nni_req_receiver(void *arg) +nni_req_pipe_recv(void *arg) { nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; @@ -397,19 +405,24 @@ nni_req_recvfilter(void *arg, nni_msg *msg) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -struct nni_protocol nni_req_protocol = { +static nni_proto_pipe nni_req_proto_pipe = { + .pipe_init = nni_req_pipe_init, + .pipe_fini = nni_req_pipe_fini, + .pipe_add = nni_req_pipe_add, + .pipe_rem = nni_req_pipe_rem, + .pipe_send = nni_req_pipe_send, + .pipe_recv = nni_req_pipe_recv, +}; + +nni_proto nni_req_protocol = { .proto_self = NNG_PROTO_REQ, .proto_peer = NNG_PROTO_REP, .proto_name = "req", - .proto_create = nni_req_create, - .proto_destroy = nni_req_destroy, - .proto_add_pipe = nni_req_add_pipe, - .proto_rem_pipe = nni_req_rem_pipe, + .proto_pipe = &nni_req_proto_pipe, + .proto_init = nni_req_init, + .proto_fini = nni_req_fini, .proto_setopt = nni_req_setopt, .proto_getopt = nni_req_getopt, - .proto_pipe_size = sizeof (nni_req_pipe), - .proto_pipe_send = nni_req_sender, - .proto_pipe_recv = nni_req_receiver, .proto_recv_filter = nni_req_recvfilter, .proto_send_filter = nni_req_sendfilter, }; |
