diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-07 21:49:48 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-07 21:52:30 -0800 |
| commit | bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1 (patch) | |
| tree | 55ca7c800e9dfa54bb58b3f2323b1cb5996fab09 /src | |
| parent | ffdceebc19214f384f1b1b6b358f1b2301384135 (diff) | |
| download | nng-bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1.tar.gz nng-bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1.tar.bz2 nng-bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1.zip | |
Simplify locking for protocols.
In an attempt to simplify the protocol implementation, and hopefully
track down a close related race, we've made it so that most protocols
need not worry about locks, and can access the socket lock if they do
need a lock. They also let the socket manage their workers, for the
most part. (The req protocol is special, since it needs a top level
work distributor, *and* a resender.)
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/defs.h | 27 | ||||
| -rw-r--r-- | src/core/pipe.c | 32 | ||||
| -rw-r--r-- | src/core/pipe.h | 1 | ||||
| -rw-r--r-- | src/core/protocol.h | 52 | ||||
| -rw-r--r-- | src/core/socket.c | 120 | ||||
| -rw-r--r-- | src/core/socket.h | 63 | ||||
| -rw-r--r-- | src/core/thread.c | 8 | ||||
| -rw-r--r-- | src/protocol/pair/pair.c | 40 | ||||
| -rw-r--r-- | src/protocol/pipeline/pull.c | 79 | ||||
| -rw-r--r-- | src/protocol/pipeline/push.c | 93 | ||||
| -rw-r--r-- | src/protocol/pubsub/pub.c | 70 | ||||
| -rw-r--r-- | src/protocol/pubsub/sub.c | 87 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 99 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 93 |
14 files changed, 421 insertions, 443 deletions
diff --git a/src/core/defs.h b/src/core/defs.h index 9ae734f7..98f4e661 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -18,24 +18,25 @@ #define NNI_ARG_UNUSED(x) ((void) x); // These types are common but have names shared with user space. -typedef struct nng_socket nni_sock; -typedef struct nng_endpoint nni_ep; -typedef struct nng_pipe nni_pipe; -typedef struct nng_msg nni_msg; -typedef struct nng_sockaddr nni_sockaddr; +typedef struct nng_socket nni_sock; +typedef struct nng_endpoint nni_ep; +typedef struct nng_pipe nni_pipe; +typedef struct nng_msg nni_msg; +typedef struct nng_sockaddr nni_sockaddr; // These are our own names. -typedef struct nni_tran nni_tran; -typedef struct nni_tran_ep nni_tran_ep; -typedef struct nni_tran_pipe nni_tran_pipe; +typedef struct nni_tran nni_tran; +typedef struct nni_tran_ep nni_tran_ep; +typedef struct nni_tran_pipe nni_tran_pipe; -typedef struct nni_proto_pipe nni_proto_pipe; -typedef struct nni_proto nni_proto; +typedef struct nni_proto_sock_ops nni_proto_sock_ops; +typedef struct nni_proto_pipe_ops nni_proto_pipe_ops; +typedef struct nni_proto nni_proto; -typedef int nni_signal; // Turnstile/wakeup channel. -typedef uint64_t nni_time; // Absolute time (usec). -typedef int64_t nni_duration; // Relative time (usec). +typedef int nni_signal; // Wakeup channel. +typedef uint64_t nni_time; // Abs. time (usec). +typedef int64_t nni_duration; // Rel. time (usec). // Used by transports for scatter gather I/O. typedef struct { diff --git a/src/core/pipe.c b/src/core/pipe.c index e49ad05a..b3a107ff 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -77,7 +77,7 @@ nni_pipe_destroy(nni_pipe *p) p->p_tran_ops.pipe_destroy(p->p_tran_data); } if (p->p_proto_data != NULL) { - p->p_proto_ops.pipe_fini(p->p_proto_data); + p->p_sock->s_pipe_ops.pipe_fini(p->p_proto_data); } NNI_FREE_STRUCT(p); } @@ -88,8 +88,8 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep) { nni_pipe *p; nni_sock *sock = ep->ep_sock; - nni_proto *proto = &sock->s_proto; - const nni_proto_pipe *pops = proto->proto_pipe; + const nni_proto_pipe_ops *ops = &sock->s_pipe_ops; + void *pdata; int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { @@ -105,23 +105,19 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep) // and we avoid an extra dereference on hot code paths. p->p_tran_ops = *ep->ep_tran->tran_pipe; - // Make a copy of the protocol ops. Same rationale. - p->p_proto_ops = *pops; - - if ((rv = pops->pipe_init(&p->p_proto_data, p, sock->s_data)) != 0) { + if ((rv = ops->pipe_init(&pdata, p, sock->s_data)) != 0) { NNI_FREE_STRUCT(p); return (rv); } - rv = nni_thr_init(&p->p_recv_thr, pops->pipe_recv, p->p_proto_data); - if (rv != 0) { - pops->pipe_fini(&p->p_proto_data); + p->p_proto_data = pdata; + if ((rv = nni_thr_init(&p->p_recv_thr, ops->pipe_recv, pdata)) != 0) { + ops->pipe_fini(&p->p_proto_data); NNI_FREE_STRUCT(p); return (rv); } - rv = nni_thr_init(&p->p_send_thr, pops->pipe_send, p->p_proto_data); - if (rv != 0) { + if ((rv = nni_thr_init(&p->p_send_thr, ops->pipe_send, pdata)) != 0) { nni_thr_fini(&p->p_recv_thr); - pops->pipe_fini(&p->p_proto_data); + ops->pipe_fini(&p->p_proto_data); NNI_FREE_STRUCT(p); return (rv); } @@ -155,9 +151,16 @@ nni_pipe_start(nni_pipe *pipe) nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { nni_mtx_unlock(&sock->s_mx); + nni_pipe_close(pipe); return (NNG_ECLOSED); } + if (nni_pipe_peer(pipe) != sock->s_peer) { + nni_mtx_unlock(&sock->s_mx); + nni_pipe_close(pipe); + return (NNG_EPROTO); + } + do { // We generate a new pipe ID, but we make sure it does not // collide with any we already have. This can only normally @@ -174,8 +177,7 @@ nni_pipe_start(nni_pipe *pipe) } } while (collide); - rv = pipe->p_proto_ops.pipe_add(pipe->p_proto_data); - if (rv != 0) { + if ((rv = sock->s_pipe_ops.pipe_add(pipe->p_proto_data)) != 0) { nni_mtx_unlock(&sock->s_mx); nni_pipe_close(pipe); return (rv); diff --git a/src/core/pipe.h b/src/core/pipe.h index 242e1d8c..cc14de86 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -21,7 +21,6 @@ struct nng_pipe { uint32_t p_id; nni_tran_pipe p_tran_ops; void * p_tran_data; - nni_proto_pipe p_proto_ops; void * p_proto_data; nni_list_node p_node; nni_sock * p_sock; diff --git a/src/core/protocol.h b/src/core/protocol.h index c858ad39..000e9c64 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -1,5 +1,5 @@ // -// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Garrett D'Amore <garrett@damore.org> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -21,7 +21,7 @@ // implementations. // nni_proto_pipe contains protocol-specific per-pipe operations. -struct nni_proto_pipe { +struct nni_proto_pipe_ops { // pipe_init creates the protocol-specific per pipe data structure. // The last argument is the per-socket protocol private data. int (*pipe_init)(void **, nni_pipe *, void *); @@ -55,32 +55,52 @@ struct nni_proto_pipe { void (*pipe_recv)(void *); }; -struct nni_proto { - uint16_t proto_self; // our 16-bit protocol ID - uint16_t proto_peer; // who we peer with (ID) - const char * proto_name; // string version of our name - const nni_proto_pipe * proto_pipe; // Per-pipe operations. +struct nni_proto_sock_ops { + // sock_initf creates the protocol instance, which will be stored on + // the socket. This is run without the sock lock held, and allocates + // storage or other resources for the socket. + int (*sock_init)(void **, nni_sock *); - // Create protocol instance, which will be stored on the socket. - int (*proto_init)(void **, nni_sock *); + // sock_fini destroys the protocol instance. This is run without the + // socket lock held, and is intended to release resources. It may + // block as needed. + void (*sock_fini)(void *); - // Destroy the protocol instance. - void (*proto_fini)(void *); + // Close the protocol instance. This is run with the lock held, + // and intended to initiate closure of the socket. For example, + // it can signal the socket worker threads to exit. + void (*sock_close)(void *); // Option manipulation. These may be NULL. - int (*proto_setopt)(void *, int, const void *, - size_t); - int (*proto_getopt)(void *, int, void *, size_t *); + int (*sock_setopt)(void *, int, const void *, size_t); + int (*sock_getopt)(void *, int, void *, size_t *); + + // sock_send is a send worker. It can really be anything, but it + // is run in a separate thread (if it is non-NULL). + void (*sock_send)(void *); + + // sock_recv is a receive worker. As with send it can really be + // anything, its just a thread that runs for the duration of the + // socket. + void (*sock_recv)(void *); // Receive filter. This may be NULL, but if it isn't, then // messages coming into the system are routed here just before being // delivered to the application. To drop the message, the prtocol // should return NULL, otherwise the message (possibly modified). - nni_msg * (*proto_recv_filter)(void *, nni_msg *); + nni_msg * (*sock_rfilter)(void *, nni_msg *); // Send filter. This may be NULL, but if it isn't, then messages // here are filtered just after they come from the application. - nni_msg * (*proto_send_filter)(void *, nni_msg *); + nni_msg * (*sock_sfilter)(void *, nni_msg *); +}; + +struct nni_proto { + uint16_t proto_self; // our 16-bit D + uint16_t proto_peer; // who we peer with (ID) + const char * proto_name; // Our name + const nni_proto_sock_ops * proto_sock_ops; // Per-socket opeations + const nni_proto_pipe_ops * proto_pipe_ops; // Per-pipe operations. }; // These functions are not used by protocols, but rather by the socket diff --git a/src/core/socket.c b/src/core/socket.c index 4e8eccbf..2ef35d7a 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -56,7 +56,7 @@ nni_reaper(void *arg) // Note that if a protocol has rejected the pipe, it // won't have any data. if (pipe->p_active) { - pipe->p_proto_ops.pipe_rem(pipe->p_proto_data); + sock->s_pipe_ops.pipe_rem(pipe->p_proto_data); } nni_mtx_unlock(&sock->s_mx); @@ -78,6 +78,13 @@ nni_reaper(void *arg) } +nni_mtx * +nni_sock_mtx(nni_sock *sock) +{ + return (&sock->s_mx); +} + + static nni_msg * nni_sock_nullfilter(void *arg, nni_msg *mp) { @@ -108,6 +115,22 @@ nni_sock_nullsetopt(void *arg, int num, const void *data, size_t sz) } +static void +nni_sock_nullop(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + + +static int +nni_sock_nulladdpipe(void *arg) +{ + NNI_ARG_UNUSED(arg); + + return (0); +} + + // nn_sock_open creates the underlying socket. int nni_sock_open(nni_sock **sockp, uint16_t pnum) @@ -124,7 +147,8 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) } // We make a copy of the protocol operations. - sock->s_proto = *proto; + sock->s_protocol = proto->proto_self; + sock->s_peer = proto->proto_peer; sock->s_linger = 0; sock->s_sndtimeo = -1; sock->s_rcvtimeo = -1; @@ -135,6 +159,30 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node); NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node); + sock->s_sock_ops = *proto->proto_sock_ops; + if (sock->s_sock_ops.sock_sfilter == NULL) { + sock->s_sock_ops.sock_sfilter = nni_sock_nullfilter; + } + if (sock->s_sock_ops.sock_rfilter == NULL) { + sock->s_sock_ops.sock_rfilter = nni_sock_nullfilter; + } + if (sock->s_sock_ops.sock_getopt == NULL) { + sock->s_sock_ops.sock_getopt = nni_sock_nullgetopt; + } + if (sock->s_sock_ops.sock_setopt == NULL) { + sock->s_sock_ops.sock_setopt = nni_sock_nullsetopt; + } + if (sock->s_sock_ops.sock_close == NULL) { + sock->s_sock_ops.sock_close = nni_sock_nullop; + } + sock->s_pipe_ops = *proto->proto_pipe_ops; + if (sock->s_pipe_ops.pipe_add == NULL) { + sock->s_pipe_ops.pipe_add = nni_sock_nulladdpipe; + } + if (sock->s_pipe_ops.pipe_rem == NULL) { + sock->s_pipe_ops.pipe_rem = nni_sock_nullop; + } + if ((rv = nni_mtx_init(&sock->s_mx)) != 0) { NNI_FREE_STRUCT(sock); return (rv); @@ -145,6 +193,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) return (rv); } + if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) { nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_mx); @@ -161,14 +210,14 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) } if ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0) { nni_msgq_fini(sock->s_uwq); - nni_thr_fini(&sock->s_reaper); + nni_thr_fini(&sock->s_recver); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_mx); NNI_FREE_STRUCT(sock); return (rv); } - if ((rv = sock->s_proto.proto_init(&sock->s_data, sock)) != 0) { + if ((rv = sock->s_sock_ops.sock_init(&sock->s_data, sock)) != 0) { nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_thr_fini(&sock->s_reaper); @@ -177,19 +226,39 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) NNI_FREE_STRUCT(sock); return (rv); } - if (sock->s_proto.proto_send_filter == NULL) { - sock->s_proto.proto_send_filter = nni_sock_nullfilter; - } - if (sock->s_proto.proto_recv_filter == NULL) { - sock->s_proto.proto_recv_filter = nni_sock_nullfilter; - } - if (sock->s_proto.proto_getopt == NULL) { - sock->s_proto.proto_getopt = nni_sock_nullgetopt; + + // NB: If these functions are NULL, the thread initialization is + // largely a NO-OP. The system won't actually create the threads. + rv = nni_thr_init(&sock->s_sender, sock->s_sock_ops.sock_send, + sock->s_data); + if (rv != 0) { + nni_thr_wait(&sock->s_reaper); + sock->s_sock_ops.sock_fini(&sock->s_data); + nni_msgq_fini(sock->s_urq); + nni_msgq_fini(sock->s_uwq); + nni_thr_fini(&sock->s_reaper); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); + NNI_FREE_STRUCT(sock); } - if (sock->s_proto.proto_setopt == NULL) { - sock->s_proto.proto_setopt = nni_sock_nullsetopt; + rv = nni_thr_init(&sock->s_recver, sock->s_sock_ops.sock_recv, + sock->s_data); + if (rv != 0) { + nni_thr_wait(&sock->s_sender); + sock->s_sock_ops.sock_fini(&sock->s_data); + nni_msgq_fini(sock->s_urq); + nni_msgq_fini(sock->s_uwq); + nni_thr_fini(&sock->s_sender); + nni_thr_fini(&sock->s_reaper); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); + NNI_FREE_STRUCT(sock); } + + nni_thr_run(&sock->s_reaper); + nni_thr_run(&sock->s_recver); + nni_thr_run(&sock->s_sender); *sockp = sock; return (0); } @@ -265,10 +334,14 @@ nni_sock_shutdown(nni_sock *sock) nni_mtx_lock(&sock->s_mx); } + sock->s_sock_ops.sock_close(sock->s_data); + nni_cv_wake(&sock->s_cv); nni_mtx_unlock(&sock->s_mx); - // Wait for the reaper to exit. + // Wait for the threads to exit. + nni_thr_wait(&sock->s_sender); + nni_thr_wait(&sock->s_recver); nni_thr_wait(&sock->s_reaper); // At this point, there are no threads blocked inside of us @@ -297,7 +370,7 @@ nni_sock_close(nni_sock *sock) // the results may be tragic. // The protocol needs to clean up its state. - sock->s_proto.proto_fini(sock->s_data); + sock->s_sock_ops.sock_fini(sock->s_data); // And we need to clean up *our* state. nni_thr_fini(&sock->s_reaper); @@ -328,9 +401,10 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire) return (rv); } besteffort = sock->s_besteffort; + + msg = sock->s_sock_ops.sock_sfilter(sock->s_data, msg); nni_mtx_unlock(&sock->s_mx); - msg = sock->s_proto.proto_send_filter(sock->s_data, msg); if (msg == NULL) { return (0); } @@ -372,7 +446,9 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire) if (rv != 0) { return (rv); } - msg = sock->s_proto.proto_recv_filter(sock->s_data, msg); + nni_mtx_lock(&sock->s_mx); + msg = sock->s_sock_ops.sock_rfilter(sock->s_data, msg); + nni_mtx_unlock(&sock->s_mx); if (msg != NULL) { break; } @@ -388,14 +464,14 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire) uint16_t nni_sock_proto(nni_sock *sock) { - return (sock->s_proto.proto_self); + return (sock->s_protocol); } uint16_t nni_sock_peer(nni_sock *sock) { - return (sock->s_proto.proto_peer); + return (sock->s_peer); } @@ -488,7 +564,7 @@ nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size) nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } - rv = sock->s_proto.proto_setopt(sock->s_data, opt, val, size); + rv = sock->s_sock_ops.sock_setopt(sock->s_data, opt, val, size); if (rv != NNG_ENOTSUP) { nni_mtx_unlock(&sock->s_mx); return (rv); @@ -533,7 +609,7 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep) nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } - rv = sock->s_proto.proto_getopt(sock->s_data, opt, val, sizep); + rv = sock->s_sock_ops.sock_getopt(sock->s_data, opt, val, sizep); if (rv != NNG_ENOTSUP) { nni_mtx_unlock(&sock->s_mx); return (rv); diff --git a/src/core/socket.h b/src/core/socket.h index 62347f61..197b5d0d 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -14,36 +14,42 @@ // OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR // TRANSPORTS. struct nng_socket { - nni_mtx s_mx; - nni_cv s_cv; + nni_mtx s_mx; + nni_cv s_cv; - nni_msgq * s_uwq; // Upper write queue - nni_msgq * s_urq; // Upper read queue + nni_msgq * s_uwq; // Upper write queue + nni_msgq * s_urq; // Upper read queue - nni_proto s_proto; + uint16_t s_protocol; + uint16_t s_peer; - void * s_data; // Protocol private + nni_proto_pipe_ops s_pipe_ops; + nni_proto_sock_ops s_sock_ops; - // XXX: options - nni_duration s_linger; // linger time - nni_duration s_sndtimeo; // send timeout - nni_duration s_rcvtimeo; // receive timeout - nni_duration s_reconn; // reconnect time - nni_duration s_reconnmax; // max reconnect time - - nni_list s_eps; // active endpoints - nni_list s_pipes; // pipes for this socket - - nni_list s_reaps; // pipes to reap - nni_thr s_reaper; + void * s_data; // Protocol private - int s_ep_pend; // EP dial/listen in progress - int s_closing; // Socket is closing - int s_besteffort; // Best effort mode delivery - int s_senderr; // Protocol state machine use - int s_recverr; // Protocol state machine use - - uint32_t s_nextid; // Next Pipe ID. + // XXX: options + nni_duration s_linger; // linger time + nni_duration s_sndtimeo; // send timeout + nni_duration s_rcvtimeo; // receive timeout + nni_duration s_reconn; // reconnect time + nni_duration s_reconnmax; // max reconnect time + + nni_list s_eps; // active endpoints + nni_list s_pipes; // pipes for this socket + + nni_list s_reaps; // pipes to reap + nni_thr s_reaper; + nni_thr s_sender; + nni_thr s_recver; + + int s_ep_pend; // EP dial/listen in progress + int s_closing; // Socket is closing + int s_besteffort; // Best effort mode delivery + int s_senderr; // Protocol state machine use + int s_recverr; // Protocol state machine use + + uint32_t s_nextid; // Next Pipe ID. }; extern int nni_sock_open(nni_sock **, uint16_t); @@ -76,5 +82,12 @@ extern nni_msgq *nni_sock_sendq(nni_sock *); // inject incoming messages from pipes to it. extern nni_msgq *nni_sock_recvq(nni_sock *); +// nni_sock_mtx obtains the socket mutex. This is for protocols to use +// from separate threads; they must not hold the lock for extended periods. +// Additionally, this can only be acquired from separate threads. The +// synchronous entry points (excluding the send/recv thread workers) will +// be called with this lock already held. We expose the mutex directly +// here so that protocols can use it to initialize condvars. +extern nni_mtx *nni_sock_mtx(nni_sock *); #endif // CORE_SOCKET_H diff --git a/src/core/thread.c b/src/core/thread.c index 7678ad0a..4a871ede 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -128,6 +128,10 @@ nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg) nni_plat_mtx_fini(&thr->mtx); return (rv); } + if (fn == NULL) { + thr->done = 1; + return (0); + } if ((rv = nni_plat_thr_init(&thr->thr, nni_thr_wrap, thr)) != 0) { nni_plat_cv_fini(&thr->cv); nni_plat_mtx_fini(&thr->mtx); @@ -170,7 +174,9 @@ nni_thr_fini(nni_thr *thr) nni_plat_cv_wait(&thr->cv); } nni_plat_mtx_unlock(&thr->mtx); - nni_plat_thr_fini(&thr->thr); + if (thr->fn != NULL) { + nni_plat_thr_fini(&thr->thr); + } nni_plat_cv_fini(&thr->cv); nni_plat_mtx_fini(&thr->mtx); } diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index 830a02e6..51608e07 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -41,7 +41,7 @@ static void nni_pair_receiver(void *); static void nni_pair_sender(void *); static int -nni_pair_init(void **pairp, nni_sock *sock) +nni_pair_sock_init(void **pairp, nni_sock *sock) { nni_pair_sock *pair; int rv; @@ -59,7 +59,7 @@ nni_pair_init(void **pairp, nni_sock *sock) static void -nni_pair_fini(void *arg) +nni_pair_sock_fini(void *arg) { nni_pair_sock *pair = arg; @@ -102,9 +102,6 @@ nni_pair_pipe_add(void *arg) nni_pair_pipe *pp = arg; nni_pair_sock *pair = pp->pair; - if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PAIR) { - return (NNG_EPROTO); - } if (pair->pipe != NULL) { return (NNG_EBUSY); // Already have a peer, denied. } @@ -182,14 +179,14 @@ nni_pair_pipe_recv(void *arg) // TODO: probably we could replace these with NULL, since we have no // protocol specific options? static int -nni_pair_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_pair_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { return (NNG_ENOTSUP); } static int -nni_pair_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_pair_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { return (NNG_ENOTSUP); } @@ -198,7 +195,7 @@ nni_pair_getopt(void *arg, int opt, void *buf, size_t *szp) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_pair_proto_pipe = { +static nni_proto_pipe_ops nni_pair_pipe_ops = { .pipe_init = nni_pair_pipe_init, .pipe_fini = nni_pair_pipe_fini, .pipe_add = nni_pair_pipe_add, @@ -207,15 +204,22 @@ static nni_proto_pipe nni_pair_proto_pipe = { .pipe_recv = nni_pair_pipe_recv, }; +static nni_proto_sock_ops nni_pair_sock_ops = { + .sock_init = nni_pair_sock_init, + .sock_fini = nni_pair_sock_fini, + .sock_close = NULL, + .sock_setopt = nni_pair_sock_setopt, + .sock_getopt = nni_pair_sock_getopt, + .sock_rfilter = NULL, + .sock_sfilter = NULL, + .sock_send = NULL, + .sock_recv = NULL, +}; + nni_proto nni_pair_proto = { - .proto_self = NNG_PROTO_PAIR, - .proto_peer = NNG_PROTO_PAIR, - .proto_name = "pair", - .proto_pipe = &nni_pair_proto_pipe, - .proto_init = nni_pair_init, - .proto_fini = nni_pair_fini, - .proto_setopt = nni_pair_setopt, - .proto_getopt = nni_pair_getopt, - .proto_recv_filter = NULL, - .proto_send_filter = NULL, + .proto_self = NNG_PROTO_PAIR, + .proto_peer = NNG_PROTO_PAIR, + .proto_name = "pair", + .proto_sock_ops = &nni_pair_sock_ops, + .proto_pipe_ops = &nni_pair_pipe_ops, }; diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index 695b730e..9612177c 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -19,7 +19,6 @@ typedef struct nni_pull_sock nni_pull_sock; // An nni_pull_sock is our per-socket protocol private structure. struct nni_pull_sock { - nni_mtx mx; nni_msgq * urq; int raw; }; @@ -31,7 +30,7 @@ struct nni_pull_pipe { }; static int -nni_pull_init(void **pullp, nni_sock *sock) +nni_pull_sock_init(void **pullp, nni_sock *sock) { nni_pull_sock *pull; int rv; @@ -39,10 +38,6 @@ nni_pull_init(void **pullp, nni_sock *sock) if ((pull = NNI_ALLOC_STRUCT(pull)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&pull->mx)) != 0) { - NNI_FREE_STRUCT(pull); - return (rv); - } pull->raw = 0; pull->urq = nni_sock_recvq(sock); *pullp = pull; @@ -52,11 +47,10 @@ nni_pull_init(void **pullp, nni_sock *sock) static void -nni_pull_fini(void *arg) +nni_pull_sock_fini(void *arg) { nni_pull_sock *pull = arg; - nni_mtx_fini(&pull->mx); NNI_FREE_STRUCT(pull); } @@ -86,32 +80,6 @@ nni_pull_pipe_fini(void *arg) } -static int -nni_pull_pipe_add(void *arg) -{ - nni_pull_pipe *pp = arg; - - if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PUSH) { - return (NNG_EPROTO); - } - return (0); -} - - -static void -nni_pull_pipe_rem(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - - -static void -nni_pull_pipe_send(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - - static void nni_pull_pipe_recv(void *arg) { @@ -133,16 +101,14 @@ nni_pull_pipe_recv(void *arg) static int -nni_pull_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_pull_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_pull_sock *pull = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&pull->mx); rv = nni_setopt_int(&pull->raw, buf, sz, 0, 1); - nni_mtx_unlock(&pull->mx); break; default: rv = NNG_ENOTSUP; @@ -152,16 +118,14 @@ nni_pull_setopt(void *arg, int opt, const void *buf, size_t sz) static int -nni_pull_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_pull_sock *pull = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&pull->mx); rv = nni_getopt_int(&pull->raw, buf, szp); - nni_mtx_unlock(&pull->mx); break; default: rv = NNG_ENOTSUP; @@ -172,24 +136,31 @@ nni_pull_getopt(void *arg, int opt, void *buf, size_t *szp) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_pull_proto_pipe = { +static nni_proto_pipe_ops nni_pull_pipe_ops = { .pipe_init = nni_pull_pipe_init, .pipe_fini = nni_pull_pipe_fini, - .pipe_add = nni_pull_pipe_add, - .pipe_rem = nni_pull_pipe_rem, - .pipe_send = nni_pull_pipe_send, + .pipe_add = NULL, + .pipe_rem = NULL, + .pipe_send = NULL, .pipe_recv = nni_pull_pipe_recv, }; +static nni_proto_sock_ops nni_pull_sock_ops = { + .sock_init = nni_pull_sock_init, + .sock_fini = nni_pull_sock_fini, + .sock_close = NULL, + .sock_setopt = nni_pull_sock_setopt, + .sock_getopt = nni_pull_sock_getopt, + .sock_send = NULL, + .sock_recv = NULL, + .sock_rfilter = NULL, + .sock_sfilter = NULL, +}; + nni_proto nni_pull_proto = { - .proto_self = NNG_PROTO_PULL, - .proto_peer = NNG_PROTO_PUSH, - .proto_name = "pull", - .proto_pipe = &nni_pull_proto_pipe, - .proto_init = nni_pull_init, - .proto_fini = nni_pull_fini, - .proto_setopt = nni_pull_setopt, - .proto_getopt = nni_pull_getopt, - .proto_recv_filter = NULL, - .proto_send_filter = NULL, + .proto_self = NNG_PROTO_PULL, + .proto_peer = NNG_PROTO_PUSH, + .proto_name = "pull", + .proto_pipe_ops = &nni_pull_pipe_ops, + .proto_sock_ops = &nni_pull_sock_ops, }; diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index de774125..6cdc9cc5 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -21,16 +21,15 @@ typedef struct nni_push_sock nni_push_sock; // An nni_push_sock is our per-socket protocol private structure. struct nni_push_sock { - nni_mtx mx; nni_cv cv; nni_msgq * uwq; - nni_thr sender; int raw; int closing; int wantw; nni_list pipes; nni_push_pipe * nextpipe; int npipes; + nni_sock * sock; }; // An nni_push_pipe is our per-pipe protocol private structure. @@ -42,10 +41,8 @@ struct nni_push_pipe { nni_list_node node; }; -static void nni_push_rrdist(void *); - static int -nni_push_init(void **pushp, nni_sock *sock) +nni_push_sock_init(void **pushp, nni_sock *sock) { nni_push_sock *push; int rv; @@ -53,12 +50,7 @@ nni_push_init(void **pushp, nni_sock *sock) if ((push = NNI_ALLOC_STRUCT(push)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&push->mx)) != 0) { - NNI_FREE_STRUCT(push); - return (rv); - } - if ((rv = nni_cv_init(&push->cv, &push->mx)) != 0) { - nni_mtx_fini(&push->mx); + if ((rv = nni_cv_init(&push->cv, nni_sock_mtx(sock))) != 0) { NNI_FREE_STRUCT(push); return (rv); } @@ -67,36 +59,32 @@ nni_push_init(void **pushp, nni_sock *sock) push->npipes = 0; push->wantw = 0; push->nextpipe = NULL; + push->sock = sock; push->uwq = nni_sock_sendq(sock); *pushp = push; nni_sock_recverr(sock, NNG_ENOTSUP); - rv = nni_thr_init(&push->sender, nni_push_rrdist, push); - if (rv != 0) { - nni_cv_fini(&push->cv); - nni_mtx_fini(&push->mx); - NNI_FREE_STRUCT(push); - return (rv); - } - nni_thr_run(&push->sender); return (0); } static void -nni_push_fini(void *arg) +nni_push_sock_close(void *arg) { nni_push_sock *push = arg; // Shut down the resender. We request it to exit by clearing // its old value, then kick it. - nni_mtx_lock(&push->mx); push->closing = 1; nni_cv_wake(&push->cv); - nni_mtx_unlock(&push->mx); +} + + +static void +nni_push_sock_fini(void *arg) +{ + nni_push_sock *push = arg; - nni_thr_fini(&push->sender); nni_cv_fini(&push->cv); - nni_mtx_fini(&push->mx); NNI_FREE_STRUCT(push); } @@ -142,9 +130,6 @@ nni_push_pipe_add(void *arg) if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PULL) { return (NNG_EPROTO); } - // Wake the sender since we have a new pipe. - nni_mtx_lock(&push->mx); - // Turns out it should not really matter where we stick this. // The end makes our test cases easier. nni_list_append(&push->pipes, pp); @@ -152,7 +137,6 @@ nni_push_pipe_add(void *arg) // Wake the top sender, as we can accept a job. push->npipes++; nni_cv_wake(&push->cv); - nni_mtx_unlock(&push->mx); return (0); } @@ -163,13 +147,11 @@ nni_push_pipe_rem(void *arg) nni_push_pipe *pp = arg; nni_push_sock *push = pp->push; - nni_mtx_lock(&push->mx); if (pp == push->nextpipe) { push->nextpipe = nni_list_next(&push->pipes, pp); } push->npipes--; nni_list_remove(&push->pipes, pp); - nni_mtx_unlock(&push->mx); } @@ -178,17 +160,18 @@ nni_push_pipe_send(void *arg) { nni_push_pipe *pp = arg; nni_push_sock *push = pp->push; + nni_mtx *mx = nni_sock_mtx(push->sock); nni_msg *msg; for (;;) { if (nni_msgq_get_sig(pp->mq, &msg, &pp->sigclose) != 0) { break; } - nni_mtx_lock(&push->mx); + nni_mtx_lock(mx); if (push->wantw) { nni_cv_wake(&push->cv); } - nni_mtx_unlock(&push->mx); + nni_mtx_unlock(mx); if (nni_pipe_send(pp->pipe, msg) != 0) { nni_msg_free(msg); break; @@ -216,16 +199,14 @@ nni_push_pipe_recv(void *arg) static int -nni_push_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_push_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_push_sock *push = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&push->mx); rv = nni_setopt_int(&push->raw, buf, sz, 0, 1); - nni_mtx_unlock(&push->mx); break; default: rv = NNG_ENOTSUP; @@ -235,16 +216,14 @@ nni_push_setopt(void *arg, int opt, const void *buf, size_t sz) static int -nni_push_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_push_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_push_sock *push = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&push->mx); rv = nni_getopt_int(&push->raw, buf, szp); - nni_mtx_unlock(&push->mx); break; default: rv = NNG_ENOTSUP; @@ -254,12 +233,13 @@ nni_push_getopt(void *arg, int opt, void *buf, size_t *szp) static void -nni_push_rrdist(void *arg) +nni_push_sock_send(void *arg) { nni_push_sock *push = arg; nni_push_pipe *pp; nni_msgq *uwq = push->uwq; nni_msg *msg = NULL; + nni_mtx *mx = nni_sock_mtx(push->sock); int rv; int i; @@ -269,10 +249,10 @@ nni_push_rrdist(void *arg) return; } - nni_mtx_lock(&push->mx); + nni_mtx_lock(mx); if (push->closing) { if (msg != NULL) { - nni_mtx_unlock(&push->mx); + nni_mtx_unlock(mx); nni_msg_free(msg); return; } @@ -296,14 +276,14 @@ nni_push_rrdist(void *arg) } else { push->wantw = 0; } - nni_mtx_unlock(&push->mx); + nni_mtx_unlock(mx); } } // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_push_proto_pipe = { +static nni_proto_pipe_ops nni_push_pipe_ops = { .pipe_init = nni_push_pipe_init, .pipe_fini = nni_push_pipe_fini, .pipe_add = nni_push_pipe_add, @@ -312,15 +292,22 @@ static nni_proto_pipe nni_push_proto_pipe = { .pipe_recv = nni_push_pipe_recv, }; +static nni_proto_sock_ops nni_push_sock_ops = { + .sock_init = nni_push_sock_init, + .sock_fini = nni_push_sock_fini, + .sock_close = nni_push_sock_close, + .sock_setopt = nni_push_sock_setopt, + .sock_getopt = nni_push_sock_getopt, + .sock_send = nni_push_sock_send, + .sock_recv = NULL, + .sock_rfilter = NULL, + .sock_sfilter = NULL, +}; + nni_proto nni_push_proto = { - .proto_self = NNG_PROTO_PUSH, - .proto_peer = NNG_PROTO_PULL, - .proto_name = "push", - .proto_pipe = &nni_push_proto_pipe, - .proto_init = nni_push_init, - .proto_fini = nni_push_fini, - .proto_setopt = nni_push_setopt, - .proto_getopt = nni_push_getopt, - .proto_recv_filter = NULL, - .proto_send_filter = NULL, + .proto_self = NNG_PROTO_PUSH, + .proto_peer = NNG_PROTO_PULL, + .proto_name = "push", + .proto_pipe_ops = &nni_push_pipe_ops, + .proto_sock_ops = &nni_push_sock_ops, }; diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index 860c7c7d..684b916d 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -23,10 +23,8 @@ typedef struct nni_pub_sock nni_pub_sock; // An nni_pub_sock is our per-socket protocol private structure. struct nni_pub_sock { nni_sock * sock; - nni_mtx mx; nni_msgq * uwq; int raw; - nni_thr sender; nni_list pipes; }; @@ -39,10 +37,10 @@ struct nni_pub_pipe { int sigclose; }; -static void nni_pub_broadcast(void *); +static void nni_pub_sock_send(void *); static int -nni_pub_init(void **pubp, nni_sock *sock) +nni_pub_sock_init(void **pubp, nni_sock *sock) { nni_pub_sock *pub; int rv; @@ -50,36 +48,23 @@ nni_pub_init(void **pubp, nni_sock *sock) if ((pub = NNI_ALLOC_STRUCT(pub)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&pub->mx)) != 0) { - NNI_FREE_STRUCT(pub); - return (rv); - } pub->sock = sock; pub->raw = 0; NNI_LIST_INIT(&pub->pipes, nni_pub_pipe, node); pub->uwq = nni_sock_sendq(sock); - rv = nni_thr_init(&pub->sender, nni_pub_broadcast, pub); - if (rv != 0) { - nni_mtx_fini(&pub->mx); - NNI_FREE_STRUCT(pub); - return (rv); - } *pubp = pub; nni_sock_recverr(sock, NNG_ENOTSUP); - nni_thr_run(&pub->sender); return (0); } static void -nni_pub_fini(void *arg) +nni_pub_sock_fini(void *arg) { nni_pub_sock *pub = arg; - nni_thr_fini(&pub->sender); - nni_mtx_fini(&pub->mx); NNI_FREE_STRUCT(pub); } @@ -125,10 +110,7 @@ nni_pub_pipe_add(void *arg) if (nni_pipe_peer(pp->pipe) != NNG_PROTO_SUB) { return (NNG_EPROTO); } - nni_mtx_lock(&pub->mx); nni_list_append(&pub->pipes, pp); - nni_mtx_unlock(&pub->mx); - return (0); } @@ -139,18 +121,17 @@ nni_pub_pipe_rem(void *arg) nni_pub_pipe *pp = arg; nni_pub_sock *pub = pp->pub; - nni_mtx_lock(&pub->mx); nni_list_remove(&pub->pipes, pp); - nni_mtx_unlock(&pub->mx); } static void -nni_pub_broadcast(void *arg) +nni_pub_sock_send(void *arg) { nni_pub_sock *pub = arg; nni_msgq *uwq = pub->uwq; nni_msg *msg, *dup; + nni_mtx *mx = nni_sock_mtx(pub->sock); for (;;) { nni_pub_pipe *pp; @@ -161,7 +142,7 @@ nni_pub_broadcast(void *arg) break; } - nni_mtx_lock(&pub->mx); + nni_mtx_lock(mx); last = nni_list_last(&pub->pipes); NNI_LIST_FOREACH (&pub->pipes, pp) { if (pp != last) { @@ -176,7 +157,7 @@ nni_pub_broadcast(void *arg) nni_msg_free(dup); } } - nni_mtx_unlock(&pub->mx); + nni_mtx_unlock(mx); if (last == NULL) { nni_msg_free(msg); @@ -236,16 +217,14 @@ nni_pub_pipe_recv(void *arg) static int -nni_pub_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_pub_sock *pub = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&pub->mx); rv = nni_setopt_int(&pub->raw, buf, sz, 0, 1); - nni_mtx_unlock(&pub->mx); break; default: rv = NNG_ENOTSUP; @@ -255,16 +234,14 @@ nni_pub_setopt(void *arg, int opt, const void *buf, size_t sz) static int -nni_pub_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_pub_sock *pub = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&pub->mx); rv = nni_getopt_int(&pub->raw, buf, szp); - nni_mtx_unlock(&pub->mx); break; default: rv = NNG_ENOTSUP; @@ -275,7 +252,7 @@ nni_pub_getopt(void *arg, int opt, void *buf, size_t *szp) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_pub_proto_pipe = { +static nni_proto_pipe_ops nni_pub_pipe_ops = { .pipe_init = nni_pub_pipe_init, .pipe_fini = nni_pub_pipe_fini, .pipe_add = nni_pub_pipe_add, @@ -284,15 +261,22 @@ static nni_proto_pipe nni_pub_proto_pipe = { .pipe_recv = nni_pub_pipe_recv, }; +nni_proto_sock_ops nni_pub_sock_ops = { + .sock_init = nni_pub_sock_init, + .sock_fini = nni_pub_sock_fini, + .sock_close = NULL, + .sock_setopt = nni_pub_sock_setopt, + .sock_getopt = nni_pub_sock_getopt, + .sock_send = nni_pub_sock_send, + .sock_recv = NULL, + .sock_rfilter = NULL, + .sock_sfilter = NULL, +}; + nni_proto nni_pub_proto = { - .proto_self = NNG_PROTO_PUB, - .proto_peer = NNG_PROTO_SUB, - .proto_name = "pub", - .proto_pipe = &nni_pub_proto_pipe, - .proto_init = nni_pub_init, - .proto_fini = nni_pub_fini, - .proto_setopt = nni_pub_setopt, - .proto_getopt = nni_pub_getopt, - .proto_recv_filter = NULL, - .proto_send_filter = NULL, + .proto_self = NNG_PROTO_PUB, + .proto_peer = NNG_PROTO_SUB, + .proto_name = "pub", + .proto_sock_ops = &nni_pub_sock_ops, + .proto_pipe_ops = &nni_pub_pipe_ops, }; diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 19c06aa0..dd288d5e 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -29,7 +29,6 @@ struct nni_sub_topic { // An nni_rep_sock is our per-socket protocol private structure. struct nni_sub_sock { nni_sock * sock; - nni_mtx mx; nni_list topics; nni_msgq * urq; int raw; @@ -42,7 +41,7 @@ struct nni_sub_pipe { }; static int -nni_sub_init(void **subp, nni_sock *sock) +nni_sub_sock_init(void **subp, nni_sock *sock) { nni_sub_sock *sub; int rv; @@ -50,10 +49,6 @@ nni_sub_init(void **subp, nni_sock *sock) if ((sub = NNI_ALLOC_STRUCT(sub)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&sub->mx)) != 0) { - NNI_FREE_STRUCT(sub); - return (rv); - } NNI_LIST_INIT(&sub->topics, nni_sub_topic, node); sub->sock = sock; sub->raw = 0; @@ -66,7 +61,7 @@ nni_sub_init(void **subp, nni_sock *sock) static void -nni_sub_fini(void *arg) +nni_sub_sock_fini(void *arg) { nni_sub_sock *sub = arg; nni_sub_topic *topic; @@ -76,7 +71,6 @@ nni_sub_fini(void *arg) nni_free(topic->buf, topic->len); NNI_FREE_STRUCT(topic); } - nni_mtx_fini(&sub->mx); NNI_FREE_STRUCT(sub); } @@ -106,32 +100,6 @@ nni_sub_pipe_fini(void *arg) } -static int -nni_sub_pipe_add(void *arg) -{ - nni_sub_pipe *sp = arg; - - if (nni_pipe_peer(sp->pipe) != NNG_PROTO_PUB) { - return (NNG_EPROTO); - } - return (0); -} - - -static void -nni_sub_pipe_rem(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - - -static void -nni_sub_pipe_send(void *arg) -{ - NNI_ARG_UNUSED(arg); -} - - static void nni_sub_pipe_recv(void *arg) { @@ -242,26 +210,20 @@ nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz) static int -nni_sub_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_sub_sock *sub = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&sub->mx); rv = nni_setopt_int(&sub->raw, buf, sz, 0, 1); - nni_mtx_unlock(&sub->mx); break; case NNG_OPT_SUBSCRIBE: - nni_mtx_lock(&sub->mx); rv = nni_sub_subscribe(sub, buf, sz); - nni_mtx_unlock(&sub->mx); break; case NNG_OPT_UNSUBSCRIBE: - nni_mtx_lock(&sub->mx); rv = nni_sub_unsubscribe(sub, buf, sz); - nni_mtx_unlock(&sub->mx); break; default: rv = NNG_ENOTSUP; @@ -271,16 +233,14 @@ nni_sub_setopt(void *arg, int opt, const void *buf, size_t sz) static int -nni_sub_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_sub_sock *sub = arg; int rv; switch (opt) { case NNG_OPT_RAW: - nni_mtx_lock(&sub->mx); rv = nni_getopt_int(&sub->raw, buf, szp); - nni_mtx_unlock(&sub->mx); break; default: rv = NNG_ENOTSUP; @@ -290,7 +250,7 @@ nni_sub_getopt(void *arg, int opt, void *buf, size_t *szp) static nni_msg * -nni_sub_recvfilter(void *arg, nni_msg *msg) +nni_sub_sock_rfilter(void *arg, nni_msg *msg) { nni_sub_sock *sub = arg; nni_sub_topic *topic; @@ -298,9 +258,7 @@ nni_sub_recvfilter(void *arg, nni_msg *msg) size_t len; int match; - nni_mtx_lock(&sub->mx); if (sub->raw) { - nni_mtx_unlock(&sub->mx); return (msg); } @@ -326,7 +284,6 @@ nni_sub_recvfilter(void *arg, nni_msg *msg) break; } } - nni_mtx_unlock(&sub->mx); if (!match) { nni_msg_free(msg); return (NULL); @@ -337,23 +294,31 @@ nni_sub_recvfilter(void *arg, nni_msg *msg) // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_sub_proto_pipe = { +static nni_proto_pipe_ops nni_sub_pipe_ops = { .pipe_init = nni_sub_pipe_init, .pipe_fini = nni_sub_pipe_fini, - .pipe_add = nni_sub_pipe_add, - .pipe_rem = nni_sub_pipe_rem, - .pipe_send = nni_sub_pipe_send, + .pipe_add = NULL, + .pipe_rem = NULL, + .pipe_send = NULL, .pipe_recv = nni_sub_pipe_recv, }; +static nni_proto_sock_ops nni_sub_sock_ops = { + .sock_init = nni_sub_sock_init, + .sock_fini = nni_sub_sock_fini, + .sock_close = NULL, + .sock_setopt = nni_sub_sock_setopt, + .sock_getopt = nni_sub_sock_getopt, + .sock_rfilter = nni_sub_sock_rfilter, + .sock_sfilter = NULL, + .sock_send = NULL, + .sock_recv = NULL, +}; + nni_proto nni_sub_proto = { - .proto_self = NNG_PROTO_SUB, - .proto_peer = NNG_PROTO_PUB, - .proto_name = "sub", - .proto_pipe = &nni_sub_proto_pipe, - .proto_init = nni_sub_init, - .proto_fini = nni_sub_fini, - .proto_setopt = nni_sub_setopt, - .proto_getopt = nni_sub_getopt, - .proto_recv_filter = nni_sub_recvfilter, + .proto_self = NNG_PROTO_SUB, + .proto_peer = NNG_PROTO_PUB, + .proto_name = "sub", + .proto_sock_ops = &nni_sub_sock_ops, + .proto_pipe_ops = &nni_sub_pipe_ops, }; diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 56ee2367..8de196c4 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -22,7 +22,6 @@ typedef struct nni_rep_sock nni_rep_sock; // An nni_rep_sock is our per-socket protocol private structure. struct nni_rep_sock { nni_sock * sock; - nni_mtx mx; nni_msgq * uwq; nni_msgq * urq; int raw; @@ -44,7 +43,7 @@ struct nni_rep_pipe { static void nni_rep_topsender(void *); static int -nni_rep_init(void **repp, nni_sock *sock) +nni_rep_sock_init(void **repp, nni_sock *sock) { nni_rep_sock *rep; int rv; @@ -52,17 +51,12 @@ nni_rep_init(void **repp, nni_sock *sock) if ((rep = NNI_ALLOC_STRUCT(rep)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&rep->mx)) != 0) { - NNI_FREE_STRUCT(rep); - return (rv); - } rep->ttl = 8; // Per RFC rep->sock = sock; rep->raw = 0; rep->btrace = NULL; rep->btrace_len = 0; if ((rv = nni_idhash_create(&rep->pipes)) != 0) { - nni_mtx_fini(&rep->mx); NNI_FREE_STRUCT(rep); return (rv); } @@ -70,28 +64,18 @@ nni_rep_init(void **repp, nni_sock *sock) rep->uwq = nni_sock_sendq(sock); rep->urq = nni_sock_recvq(sock); - rv = nni_thr_init(&rep->sender, nni_rep_topsender, rep); - if (rv != 0) { - nni_idhash_destroy(rep->pipes); - nni_mtx_fini(&rep->mx); - NNI_FREE_STRUCT(rep); - return (rv); - } *repp = rep; nni_sock_senderr(sock, NNG_ESTATE); - nni_thr_run(&rep->sender); return (0); } static void -nni_rep_fini(void *arg) +nni_rep_sock_fini(void *arg) { nni_rep_sock *rep = arg; - nni_thr_fini(&rep->sender); nni_idhash_destroy(rep->pipes); - nni_mtx_fini(&rep->mx); if (rep->btrace != NULL) { nni_free(rep->btrace, rep->btrace_len); } @@ -135,16 +119,8 @@ nni_rep_pipe_add(void *arg) { nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - int rv; - - if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REQ) { - return (NNG_EPROTO); - } - nni_mtx_lock(&rep->mx); - rv = nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp); - nni_mtx_unlock(&rep->mx); - return (rv); + return (nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp)); } @@ -154,22 +130,21 @@ nni_rep_pipe_rem(void *arg) nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - nni_mtx_lock(&rep->mx); nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe)); - nni_mtx_unlock(&rep->mx); } -// nni_rep_topsender watches for messages from the upper write queue, +// nni_rep_sock_send watches for messages from the upper write queue, // extracts the destination pipe, and forwards it to the appropriate // destination pipe via a separate queue. This prevents a single bad // or slow pipe from gumming up the works for the entire socket. static void -nni_rep_topsender(void *arg) +nni_rep_sock_send(void *arg) { nni_rep_sock *rep = arg; nni_msgq *uwq = rep->uwq; nni_msgq *urq = rep->urq; + nni_mtx *mx = nni_sock_mtx(rep->sock); nni_msg *msg; for (;;) { @@ -190,9 +165,9 @@ nni_rep_topsender(void *arg) NNI_GET32(header, id); nni_msg_trim_header(msg, 4); - nni_mtx_lock(&rep->mx); + nni_mtx_lock(mx); if (nni_idhash_find(rep->pipes, id, (void **) &rp) != 0) { - nni_mtx_unlock(&rep->mx); + nni_mtx_unlock(mx); nni_msg_free(msg); continue; } @@ -204,7 +179,7 @@ nni_rep_topsender(void *arg) // circumstances. nni_msg_free(msg); } - nni_mtx_unlock(&rep->mx); + nni_mtx_unlock(mx); } } @@ -218,8 +193,6 @@ nni_rep_pipe_send(void *arg) nni_msgq *wq = rp->sendq; nni_pipe *pipe = rp->pipe; nni_msg *msg; - uint8_t *body; - size_t size; int rv; for (;;) { @@ -311,21 +284,17 @@ again: static int -nni_rep_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_rep_sock *rep = arg; int rv; switch (opt) { case NNG_OPT_MAXTTL: - nni_mtx_lock(&rep->mx); rv = nni_setopt_int(&rep->ttl, buf, sz, 1, 255); - nni_mtx_unlock(&rep->mx); break; case NNG_OPT_RAW: - nni_mtx_lock(&rep->mx); rv = nni_setopt_int(&rep->raw, buf, sz, 0, 1); - nni_mtx_unlock(&rep->mx); break; default: rv = NNG_ENOTSUP; @@ -335,21 +304,17 @@ nni_rep_setopt(void *arg, int opt, const void *buf, size_t sz) static int -nni_rep_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_rep_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_rep_sock *rep = arg; int rv; switch (opt) { case NNG_OPT_MAXTTL: - nni_mtx_lock(&rep->mx); rv = nni_getopt_int(&rep->ttl, buf, szp); - nni_mtx_unlock(&rep->mx); break; case NNG_OPT_RAW: - nni_mtx_lock(&rep->mx); rv = nni_getopt_int(&rep->raw, buf, szp); - nni_mtx_unlock(&rep->mx); break; default: rv = NNG_ENOTSUP; @@ -359,14 +324,12 @@ nni_rep_getopt(void *arg, int opt, void *buf, size_t *szp) static nni_msg * -nni_rep_sendfilter(void *arg, nni_msg *msg) +nni_rep_sock_sfilter(void *arg, nni_msg *msg) { nni_rep_sock *rep = arg; size_t len; - nni_mtx_lock(&rep->mx); if (rep->raw) { - nni_mtx_unlock(&rep->mx); return (msg); } @@ -376,7 +339,6 @@ nni_rep_sendfilter(void *arg, nni_msg *msg) // If we have a stored backtrace, append it to the header... // if we don't have a backtrace, discard the message. if (rep->btrace == NULL) { - nni_mtx_unlock(&rep->mx); nni_msg_free(msg); return (NULL); } @@ -388,7 +350,6 @@ nni_rep_sendfilter(void *arg, nni_msg *msg) nni_free(rep->btrace, rep->btrace_len); rep->btrace = NULL; rep->btrace_len = 0; - nni_mtx_unlock(&rep->mx); nni_msg_free(msg); return (NULL); } @@ -396,21 +357,18 @@ nni_rep_sendfilter(void *arg, nni_msg *msg) nni_free(rep->btrace, rep->btrace_len); rep->btrace = NULL; rep->btrace_len = 0; - nni_mtx_unlock(&rep->mx); return (msg); } static nni_msg * -nni_rep_recvfilter(void *arg, nni_msg *msg) +nni_rep_sock_rfilter(void *arg, nni_msg *msg) { nni_rep_sock *rep = arg; char *header; size_t len; - nni_mtx_lock(&rep->mx); if (rep->raw) { - nni_mtx_unlock(&rep->mx); return (msg); } @@ -423,21 +381,19 @@ nni_rep_recvfilter(void *arg, nni_msg *msg) rep->btrace_len = 0; } if ((rep->btrace = nni_alloc(len)) == NULL) { - nni_mtx_unlock(&rep->mx); nni_msg_free(msg); return (NULL); } rep->btrace_len = len; memcpy(rep->btrace, header, len); nni_msg_trunc_header(msg, len); - nni_mtx_unlock(&rep->mx); return (msg); } // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_rep_proto_pipe = { +static nni_proto_pipe_ops nni_rep_pipe_ops = { .pipe_init = nni_rep_pipe_init, .pipe_fini = nni_rep_pipe_fini, .pipe_add = nni_rep_pipe_add, @@ -446,15 +402,22 @@ static nni_proto_pipe nni_rep_proto_pipe = { .pipe_recv = nni_rep_pipe_recv, }; +static nni_proto_sock_ops nni_rep_sock_ops = { + .sock_init = nni_rep_sock_init, + .sock_fini = nni_rep_sock_fini, + .sock_close = NULL, + .sock_setopt = nni_rep_sock_setopt, + .sock_getopt = nni_rep_sock_getopt, + .sock_rfilter = nni_rep_sock_rfilter, + .sock_sfilter = nni_rep_sock_sfilter, + .sock_send = nni_rep_sock_send, + .sock_recv = NULL, +}; + nni_proto nni_rep_proto = { - .proto_self = NNG_PROTO_REP, - .proto_peer = NNG_PROTO_REQ, - .proto_name = "rep", - .proto_pipe = &nni_rep_proto_pipe, - .proto_init = nni_rep_init, - .proto_fini = nni_rep_fini, - .proto_setopt = nni_rep_setopt, - .proto_getopt = nni_rep_getopt, - .proto_recv_filter = nni_rep_recvfilter, - .proto_send_filter = nni_rep_sendfilter, + .proto_self = NNG_PROTO_REP, + .proto_peer = NNG_PROTO_REQ, + .proto_name = "rep", + .proto_sock_ops = &nni_rep_sock_ops, + .proto_pipe_ops = &nni_rep_pipe_ops, }; diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 1c58d7a1..d8104342 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -22,7 +22,6 @@ typedef struct nni_req_sock nni_req_sock; // An nni_req_sock is our per-socket protocol private structure. struct nni_req_sock { nni_sock * sock; - nni_mtx mx; nni_cv cv; nni_msgq * uwq; nni_msgq * urq; @@ -48,7 +47,7 @@ struct nni_req_pipe { static void nni_req_resender(void *); static int -nni_req_init(void **reqp, nni_sock *sock) +nni_req_sock_init(void **reqp, nni_sock *sock) { nni_req_sock *req; int rv; @@ -56,12 +55,7 @@ nni_req_init(void **reqp, nni_sock *sock) if ((req = NNI_ALLOC_STRUCT(req)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mtx_init(&req->mx)) != 0) { - NNI_FREE_STRUCT(req); - return (rv); - } - if ((rv = nni_cv_init(&req->cv, &req->mx)) != 0) { - nni_mtx_fini(&req->mx); + if ((rv = nni_cv_init(&req->cv, nni_sock_mtx(sock))) != 0) { NNI_FREE_STRUCT(req); return (rv); } @@ -81,7 +75,6 @@ nni_req_init(void **reqp, nni_sock *sock) rv = nni_thr_init(&req->resender, nni_req_resender, req); if (rv != 0) { nni_cv_fini(&req->cv); - nni_mtx_fini(&req->mx); NNI_FREE_STRUCT(req); return (rv); } @@ -91,20 +84,24 @@ nni_req_init(void **reqp, nni_sock *sock) static void -nni_req_fini(void *arg) +nni_req_sock_close(void *arg) { nni_req_sock *req = arg; // Shut down the resender. We request it to exit by clearing // its old value, then kick it. - nni_mtx_lock(&req->mx); req->closing = 1; nni_cv_wake(&req->cv); - nni_mtx_unlock(&req->mx); +} + + +static void +nni_req_sock_fini(void *arg) +{ + nni_req_sock *req = arg; nni_thr_fini(&req->resender); nni_cv_fini(&req->cv); - nni_mtx_fini(&req->mx); if (req->reqmsg != NULL) { nni_msg_free(req->reqmsg); } @@ -168,15 +165,16 @@ nni_req_pipe_send(void *arg) nni_msgq *uwq = req->uwq; nni_msgq *urq = req->urq; nni_pipe *pipe = rp->pipe; + nni_mtx *mx = nni_sock_mtx(req->sock); nni_msg *msg; int rv; for (;;) { - nni_mtx_lock(&req->mx); + nni_mtx_lock(mx); if ((msg = req->retrymsg) != NULL) { req->retrymsg = NULL; } - nni_mtx_unlock(&req->mx); + nni_mtx_unlock(mx); if (msg == NULL) { rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose); if (rv != 0) { @@ -237,21 +235,17 @@ nni_req_pipe_recv(void *arg) static int -nni_req_setopt(void *arg, int opt, const void *buf, size_t sz) +nni_req_sock_setopt(void *arg, int opt, const void *buf, size_t sz) { nni_req_sock *req = arg; int rv; switch (opt) { case NNG_OPT_RESENDTIME: - nni_mtx_lock(&req->mx); rv = nni_setopt_duration(&req->retry, buf, sz); - nni_mtx_unlock(&req->mx); break; case NNG_OPT_RAW: - nni_mtx_lock(&req->mx); rv = nni_setopt_int(&req->raw, buf, sz, 0, 1); - nni_mtx_unlock(&req->mx); break; default: rv = NNG_ENOTSUP; @@ -261,21 +255,17 @@ nni_req_setopt(void *arg, int opt, const void *buf, size_t sz) static int -nni_req_getopt(void *arg, int opt, void *buf, size_t *szp) +nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp) { nni_req_sock *req = arg; int rv; switch (opt) { case NNG_OPT_RESENDTIME: - nni_mtx_lock(&req->mx); rv = nni_getopt_duration(&req->retry, buf, szp); - nni_mtx_unlock(&req->mx); break; case NNG_OPT_RAW: - nni_mtx_lock(&req->mx); rv = nni_getopt_int(&req->raw, buf, szp); - nni_mtx_unlock(&req->mx); break; default: rv = NNG_ENOTSUP; @@ -288,17 +278,18 @@ static void nni_req_resender(void *arg) { nni_req_sock *req = arg; + nni_mtx *mx = nni_sock_mtx(req->sock); int rv; for (;;) { - nni_mtx_lock(&req->mx); + nni_mtx_lock(mx); if (req->closing) { - nni_mtx_unlock(&req->mx); + nni_mtx_unlock(mx); return; } if (req->reqmsg == NULL) { nni_cv_wait(&req->cv); - nni_mtx_unlock(&req->mx); + nni_mtx_unlock(mx); continue; } rv = nni_cv_until(&req->cv, req->resend); @@ -309,22 +300,20 @@ nni_req_resender(void *arg) } req->resend = nni_clock() + req->retry; } - nni_mtx_unlock(&req->mx); + nni_mtx_unlock(mx); } } static nni_msg * -nni_req_sendfilter(void *arg, nni_msg *msg) +nni_req_sock_sfilter(void *arg, nni_msg *msg) { nni_req_sock *req = arg; uint32_t id; - nni_mtx_lock(&req->mx); if (req->raw) { // No automatic retry, and the request ID must // be in the header coming down. - nni_mtx_unlock(&req->mx); return (msg); } @@ -338,7 +327,6 @@ nni_req_sendfilter(void *arg, nni_msg *msg) if (nni_msg_append_header(msg, req->reqid, 4) != 0) { // Should be ENOMEM. - nni_mtx_unlock(&req->mx); nni_msg_free(msg); return (NULL); } @@ -351,7 +339,6 @@ nni_req_sendfilter(void *arg, nni_msg *msg) // Make a duplicate message... for retries. if (nni_msg_dup(&req->reqmsg, msg) != 0) { - nni_mtx_unlock(&req->mx); nni_msg_free(msg); return (NULL); } @@ -362,39 +349,33 @@ nni_req_sendfilter(void *arg, nni_msg *msg) // Clear the error condition. nni_sock_recverr(req->sock, 0); - nni_mtx_unlock(&req->mx); return (msg); } static nni_msg * -nni_req_recvfilter(void *arg, nni_msg *msg) +nni_req_sock_rfilter(void *arg, nni_msg *msg) { nni_req_sock *req = arg; - nni_mtx_lock(&req->mx); if (req->raw) { // Pass it unmolested - nni_mtx_unlock(&req->mx); return (msg); } if (nni_msg_header_len(msg) < 4) { - nni_mtx_unlock(&req->mx); nni_msg_free(msg); return (NULL); } if (req->reqmsg == NULL) { // We had no outstanding request. - nni_mtx_unlock(&req->mx); nni_msg_free(msg); return (NULL); } if (memcmp(nni_msg_header(msg), req->reqid, 4) != 0) { // Wrong request id - nni_mtx_unlock(&req->mx); nni_msg_free(msg); return (NULL); } @@ -403,14 +384,13 @@ nni_req_recvfilter(void *arg, nni_msg *msg) nni_msg_free(req->reqmsg); req->reqmsg = NULL; nni_cv_wake(&req->cv); - nni_mtx_unlock(&req->mx); return (msg); } // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. -static nni_proto_pipe nni_req_proto_pipe = { +static nni_proto_pipe_ops nni_req_pipe_ops = { .pipe_init = nni_req_pipe_init, .pipe_fini = nni_req_pipe_fini, .pipe_add = nni_req_pipe_add, @@ -419,15 +399,22 @@ static nni_proto_pipe nni_req_proto_pipe = { .pipe_recv = nni_req_pipe_recv, }; +static nni_proto_sock_ops nni_req_sock_ops = { + .sock_init = nni_req_sock_init, + .sock_fini = nni_req_sock_fini, + .sock_close = nni_req_sock_close, + .sock_setopt = nni_req_sock_setopt, + .sock_getopt = nni_req_sock_getopt, + .sock_rfilter = nni_req_sock_rfilter, + .sock_sfilter = nni_req_sock_sfilter, + .sock_send = NULL, + .sock_recv = NULL, +}; + nni_proto nni_req_proto = { - .proto_self = NNG_PROTO_REQ, - .proto_peer = NNG_PROTO_REP, - .proto_name = "req", - .proto_pipe = &nni_req_proto_pipe, - .proto_init = nni_req_init, - .proto_fini = nni_req_fini, - .proto_setopt = nni_req_setopt, - .proto_getopt = nni_req_getopt, - .proto_recv_filter = nni_req_recvfilter, - .proto_send_filter = nni_req_sendfilter, + .proto_self = NNG_PROTO_REQ, + .proto_peer = NNG_PROTO_REP, + .proto_name = "req", + .proto_sock_ops = &nni_req_sock_ops, + .proto_pipe_ops = &nni_req_pipe_ops, }; |
