From bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 7 Jan 2017 21:49:48 -0800 Subject: Simplify locking for protocols. In an attempt to simplify the protocol implementation, and hopefully track down a close related race, we've made it so that most protocols need not worry about locks, and can access the socket lock if they do need a lock. They also let the socket manage their workers, for the most part. (The req protocol is special, since it needs a top level work distributor, *and* a resender.) --- src/core/defs.h | 27 ++++++------ src/core/pipe.c | 32 +++++++------- src/core/pipe.h | 1 - src/core/protocol.h | 52 ++++++++++++++++------- src/core/socket.c | 120 ++++++++++++++++++++++++++++++++++++++++++---------- src/core/socket.h | 63 ++++++++++++++++----------- src/core/thread.c | 8 +++- 7 files changed, 210 insertions(+), 93 deletions(-) (limited to 'src/core') diff --git a/src/core/defs.h b/src/core/defs.h index 9ae734f7..98f4e661 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -18,24 +18,25 @@ #define NNI_ARG_UNUSED(x) ((void) x); // These types are common but have names shared with user space. -typedef struct nng_socket nni_sock; -typedef struct nng_endpoint nni_ep; -typedef struct nng_pipe nni_pipe; -typedef struct nng_msg nni_msg; -typedef struct nng_sockaddr nni_sockaddr; +typedef struct nng_socket nni_sock; +typedef struct nng_endpoint nni_ep; +typedef struct nng_pipe nni_pipe; +typedef struct nng_msg nni_msg; +typedef struct nng_sockaddr nni_sockaddr; // These are our own names. -typedef struct nni_tran nni_tran; -typedef struct nni_tran_ep nni_tran_ep; -typedef struct nni_tran_pipe nni_tran_pipe; +typedef struct nni_tran nni_tran; +typedef struct nni_tran_ep nni_tran_ep; +typedef struct nni_tran_pipe nni_tran_pipe; -typedef struct nni_proto_pipe nni_proto_pipe; -typedef struct nni_proto nni_proto; +typedef struct nni_proto_sock_ops nni_proto_sock_ops; +typedef struct nni_proto_pipe_ops nni_proto_pipe_ops; +typedef struct nni_proto nni_proto; -typedef int nni_signal; // Turnstile/wakeup channel. -typedef uint64_t nni_time; // Absolute time (usec). -typedef int64_t nni_duration; // Relative time (usec). +typedef int nni_signal; // Wakeup channel. +typedef uint64_t nni_time; // Abs. time (usec). +typedef int64_t nni_duration; // Rel. time (usec). // Used by transports for scatter gather I/O. typedef struct { diff --git a/src/core/pipe.c b/src/core/pipe.c index e49ad05a..b3a107ff 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -77,7 +77,7 @@ nni_pipe_destroy(nni_pipe *p) p->p_tran_ops.pipe_destroy(p->p_tran_data); } if (p->p_proto_data != NULL) { - p->p_proto_ops.pipe_fini(p->p_proto_data); + p->p_sock->s_pipe_ops.pipe_fini(p->p_proto_data); } NNI_FREE_STRUCT(p); } @@ -88,8 +88,8 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep) { nni_pipe *p; nni_sock *sock = ep->ep_sock; - nni_proto *proto = &sock->s_proto; - const nni_proto_pipe *pops = proto->proto_pipe; + const nni_proto_pipe_ops *ops = &sock->s_pipe_ops; + void *pdata; int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { @@ -105,23 +105,19 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep) // and we avoid an extra dereference on hot code paths. p->p_tran_ops = *ep->ep_tran->tran_pipe; - // Make a copy of the protocol ops. Same rationale. - p->p_proto_ops = *pops; - - if ((rv = pops->pipe_init(&p->p_proto_data, p, sock->s_data)) != 0) { + if ((rv = ops->pipe_init(&pdata, p, sock->s_data)) != 0) { NNI_FREE_STRUCT(p); return (rv); } - rv = nni_thr_init(&p->p_recv_thr, pops->pipe_recv, p->p_proto_data); - if (rv != 0) { - pops->pipe_fini(&p->p_proto_data); + p->p_proto_data = pdata; + if ((rv = nni_thr_init(&p->p_recv_thr, ops->pipe_recv, pdata)) != 0) { + ops->pipe_fini(&p->p_proto_data); NNI_FREE_STRUCT(p); return (rv); } - rv = nni_thr_init(&p->p_send_thr, pops->pipe_send, p->p_proto_data); - if (rv != 0) { + if ((rv = nni_thr_init(&p->p_send_thr, ops->pipe_send, pdata)) != 0) { nni_thr_fini(&p->p_recv_thr); - pops->pipe_fini(&p->p_proto_data); + ops->pipe_fini(&p->p_proto_data); NNI_FREE_STRUCT(p); return (rv); } @@ -155,9 +151,16 @@ nni_pipe_start(nni_pipe *pipe) nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { nni_mtx_unlock(&sock->s_mx); + nni_pipe_close(pipe); return (NNG_ECLOSED); } + if (nni_pipe_peer(pipe) != sock->s_peer) { + nni_mtx_unlock(&sock->s_mx); + nni_pipe_close(pipe); + return (NNG_EPROTO); + } + do { // We generate a new pipe ID, but we make sure it does not // collide with any we already have. This can only normally @@ -174,8 +177,7 @@ nni_pipe_start(nni_pipe *pipe) } } while (collide); - rv = pipe->p_proto_ops.pipe_add(pipe->p_proto_data); - if (rv != 0) { + if ((rv = sock->s_pipe_ops.pipe_add(pipe->p_proto_data)) != 0) { nni_mtx_unlock(&sock->s_mx); nni_pipe_close(pipe); return (rv); diff --git a/src/core/pipe.h b/src/core/pipe.h index 242e1d8c..cc14de86 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -21,7 +21,6 @@ struct nng_pipe { uint32_t p_id; nni_tran_pipe p_tran_ops; void * p_tran_data; - nni_proto_pipe p_proto_ops; void * p_proto_data; nni_list_node p_node; nni_sock * p_sock; diff --git a/src/core/protocol.h b/src/core/protocol.h index c858ad39..000e9c64 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -1,5 +1,5 @@ // -// Copyright 2016 Garrett D'Amore +// Copyright 2017 Garrett D'Amore // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -21,7 +21,7 @@ // implementations. // nni_proto_pipe contains protocol-specific per-pipe operations. -struct nni_proto_pipe { +struct nni_proto_pipe_ops { // pipe_init creates the protocol-specific per pipe data structure. // The last argument is the per-socket protocol private data. int (*pipe_init)(void **, nni_pipe *, void *); @@ -55,32 +55,52 @@ struct nni_proto_pipe { void (*pipe_recv)(void *); }; -struct nni_proto { - uint16_t proto_self; // our 16-bit protocol ID - uint16_t proto_peer; // who we peer with (ID) - const char * proto_name; // string version of our name - const nni_proto_pipe * proto_pipe; // Per-pipe operations. +struct nni_proto_sock_ops { + // sock_initf creates the protocol instance, which will be stored on + // the socket. This is run without the sock lock held, and allocates + // storage or other resources for the socket. + int (*sock_init)(void **, nni_sock *); - // Create protocol instance, which will be stored on the socket. - int (*proto_init)(void **, nni_sock *); + // sock_fini destroys the protocol instance. This is run without the + // socket lock held, and is intended to release resources. It may + // block as needed. + void (*sock_fini)(void *); - // Destroy the protocol instance. - void (*proto_fini)(void *); + // Close the protocol instance. This is run with the lock held, + // and intended to initiate closure of the socket. For example, + // it can signal the socket worker threads to exit. + void (*sock_close)(void *); // Option manipulation. These may be NULL. - int (*proto_setopt)(void *, int, const void *, - size_t); - int (*proto_getopt)(void *, int, void *, size_t *); + int (*sock_setopt)(void *, int, const void *, size_t); + int (*sock_getopt)(void *, int, void *, size_t *); + + // sock_send is a send worker. It can really be anything, but it + // is run in a separate thread (if it is non-NULL). + void (*sock_send)(void *); + + // sock_recv is a receive worker. As with send it can really be + // anything, its just a thread that runs for the duration of the + // socket. + void (*sock_recv)(void *); // Receive filter. This may be NULL, but if it isn't, then // messages coming into the system are routed here just before being // delivered to the application. To drop the message, the prtocol // should return NULL, otherwise the message (possibly modified). - nni_msg * (*proto_recv_filter)(void *, nni_msg *); + nni_msg * (*sock_rfilter)(void *, nni_msg *); // Send filter. This may be NULL, but if it isn't, then messages // here are filtered just after they come from the application. - nni_msg * (*proto_send_filter)(void *, nni_msg *); + nni_msg * (*sock_sfilter)(void *, nni_msg *); +}; + +struct nni_proto { + uint16_t proto_self; // our 16-bit D + uint16_t proto_peer; // who we peer with (ID) + const char * proto_name; // Our name + const nni_proto_sock_ops * proto_sock_ops; // Per-socket opeations + const nni_proto_pipe_ops * proto_pipe_ops; // Per-pipe operations. }; // These functions are not used by protocols, but rather by the socket diff --git a/src/core/socket.c b/src/core/socket.c index 4e8eccbf..2ef35d7a 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -56,7 +56,7 @@ nni_reaper(void *arg) // Note that if a protocol has rejected the pipe, it // won't have any data. if (pipe->p_active) { - pipe->p_proto_ops.pipe_rem(pipe->p_proto_data); + sock->s_pipe_ops.pipe_rem(pipe->p_proto_data); } nni_mtx_unlock(&sock->s_mx); @@ -78,6 +78,13 @@ nni_reaper(void *arg) } +nni_mtx * +nni_sock_mtx(nni_sock *sock) +{ + return (&sock->s_mx); +} + + static nni_msg * nni_sock_nullfilter(void *arg, nni_msg *mp) { @@ -108,6 +115,22 @@ nni_sock_nullsetopt(void *arg, int num, const void *data, size_t sz) } +static void +nni_sock_nullop(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + + +static int +nni_sock_nulladdpipe(void *arg) +{ + NNI_ARG_UNUSED(arg); + + return (0); +} + + // nn_sock_open creates the underlying socket. int nni_sock_open(nni_sock **sockp, uint16_t pnum) @@ -124,7 +147,8 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) } // We make a copy of the protocol operations. - sock->s_proto = *proto; + sock->s_protocol = proto->proto_self; + sock->s_peer = proto->proto_peer; sock->s_linger = 0; sock->s_sndtimeo = -1; sock->s_rcvtimeo = -1; @@ -135,6 +159,30 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node); NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node); + sock->s_sock_ops = *proto->proto_sock_ops; + if (sock->s_sock_ops.sock_sfilter == NULL) { + sock->s_sock_ops.sock_sfilter = nni_sock_nullfilter; + } + if (sock->s_sock_ops.sock_rfilter == NULL) { + sock->s_sock_ops.sock_rfilter = nni_sock_nullfilter; + } + if (sock->s_sock_ops.sock_getopt == NULL) { + sock->s_sock_ops.sock_getopt = nni_sock_nullgetopt; + } + if (sock->s_sock_ops.sock_setopt == NULL) { + sock->s_sock_ops.sock_setopt = nni_sock_nullsetopt; + } + if (sock->s_sock_ops.sock_close == NULL) { + sock->s_sock_ops.sock_close = nni_sock_nullop; + } + sock->s_pipe_ops = *proto->proto_pipe_ops; + if (sock->s_pipe_ops.pipe_add == NULL) { + sock->s_pipe_ops.pipe_add = nni_sock_nulladdpipe; + } + if (sock->s_pipe_ops.pipe_rem == NULL) { + sock->s_pipe_ops.pipe_rem = nni_sock_nullop; + } + if ((rv = nni_mtx_init(&sock->s_mx)) != 0) { NNI_FREE_STRUCT(sock); return (rv); @@ -145,6 +193,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) return (rv); } + if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) { nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_mx); @@ -161,14 +210,14 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) } if ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0) { nni_msgq_fini(sock->s_uwq); - nni_thr_fini(&sock->s_reaper); + nni_thr_fini(&sock->s_recver); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_mx); NNI_FREE_STRUCT(sock); return (rv); } - if ((rv = sock->s_proto.proto_init(&sock->s_data, sock)) != 0) { + if ((rv = sock->s_sock_ops.sock_init(&sock->s_data, sock)) != 0) { nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_thr_fini(&sock->s_reaper); @@ -177,19 +226,39 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) NNI_FREE_STRUCT(sock); return (rv); } - if (sock->s_proto.proto_send_filter == NULL) { - sock->s_proto.proto_send_filter = nni_sock_nullfilter; - } - if (sock->s_proto.proto_recv_filter == NULL) { - sock->s_proto.proto_recv_filter = nni_sock_nullfilter; - } - if (sock->s_proto.proto_getopt == NULL) { - sock->s_proto.proto_getopt = nni_sock_nullgetopt; + + // NB: If these functions are NULL, the thread initialization is + // largely a NO-OP. The system won't actually create the threads. + rv = nni_thr_init(&sock->s_sender, sock->s_sock_ops.sock_send, + sock->s_data); + if (rv != 0) { + nni_thr_wait(&sock->s_reaper); + sock->s_sock_ops.sock_fini(&sock->s_data); + nni_msgq_fini(sock->s_urq); + nni_msgq_fini(sock->s_uwq); + nni_thr_fini(&sock->s_reaper); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); + NNI_FREE_STRUCT(sock); } - if (sock->s_proto.proto_setopt == NULL) { - sock->s_proto.proto_setopt = nni_sock_nullsetopt; + rv = nni_thr_init(&sock->s_recver, sock->s_sock_ops.sock_recv, + sock->s_data); + if (rv != 0) { + nni_thr_wait(&sock->s_sender); + sock->s_sock_ops.sock_fini(&sock->s_data); + nni_msgq_fini(sock->s_urq); + nni_msgq_fini(sock->s_uwq); + nni_thr_fini(&sock->s_sender); + nni_thr_fini(&sock->s_reaper); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); + NNI_FREE_STRUCT(sock); } + + nni_thr_run(&sock->s_reaper); + nni_thr_run(&sock->s_recver); + nni_thr_run(&sock->s_sender); *sockp = sock; return (0); } @@ -265,10 +334,14 @@ nni_sock_shutdown(nni_sock *sock) nni_mtx_lock(&sock->s_mx); } + sock->s_sock_ops.sock_close(sock->s_data); + nni_cv_wake(&sock->s_cv); nni_mtx_unlock(&sock->s_mx); - // Wait for the reaper to exit. + // Wait for the threads to exit. + nni_thr_wait(&sock->s_sender); + nni_thr_wait(&sock->s_recver); nni_thr_wait(&sock->s_reaper); // At this point, there are no threads blocked inside of us @@ -297,7 +370,7 @@ nni_sock_close(nni_sock *sock) // the results may be tragic. // The protocol needs to clean up its state. - sock->s_proto.proto_fini(sock->s_data); + sock->s_sock_ops.sock_fini(sock->s_data); // And we need to clean up *our* state. nni_thr_fini(&sock->s_reaper); @@ -328,9 +401,10 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire) return (rv); } besteffort = sock->s_besteffort; + + msg = sock->s_sock_ops.sock_sfilter(sock->s_data, msg); nni_mtx_unlock(&sock->s_mx); - msg = sock->s_proto.proto_send_filter(sock->s_data, msg); if (msg == NULL) { return (0); } @@ -372,7 +446,9 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire) if (rv != 0) { return (rv); } - msg = sock->s_proto.proto_recv_filter(sock->s_data, msg); + nni_mtx_lock(&sock->s_mx); + msg = sock->s_sock_ops.sock_rfilter(sock->s_data, msg); + nni_mtx_unlock(&sock->s_mx); if (msg != NULL) { break; } @@ -388,14 +464,14 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire) uint16_t nni_sock_proto(nni_sock *sock) { - return (sock->s_proto.proto_self); + return (sock->s_protocol); } uint16_t nni_sock_peer(nni_sock *sock) { - return (sock->s_proto.proto_peer); + return (sock->s_peer); } @@ -488,7 +564,7 @@ nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size) nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } - rv = sock->s_proto.proto_setopt(sock->s_data, opt, val, size); + rv = sock->s_sock_ops.sock_setopt(sock->s_data, opt, val, size); if (rv != NNG_ENOTSUP) { nni_mtx_unlock(&sock->s_mx); return (rv); @@ -533,7 +609,7 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep) nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } - rv = sock->s_proto.proto_getopt(sock->s_data, opt, val, sizep); + rv = sock->s_sock_ops.sock_getopt(sock->s_data, opt, val, sizep); if (rv != NNG_ENOTSUP) { nni_mtx_unlock(&sock->s_mx); return (rv); diff --git a/src/core/socket.h b/src/core/socket.h index 62347f61..197b5d0d 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -14,36 +14,42 @@ // OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR // TRANSPORTS. struct nng_socket { - nni_mtx s_mx; - nni_cv s_cv; + nni_mtx s_mx; + nni_cv s_cv; - nni_msgq * s_uwq; // Upper write queue - nni_msgq * s_urq; // Upper read queue + nni_msgq * s_uwq; // Upper write queue + nni_msgq * s_urq; // Upper read queue - nni_proto s_proto; + uint16_t s_protocol; + uint16_t s_peer; - void * s_data; // Protocol private + nni_proto_pipe_ops s_pipe_ops; + nni_proto_sock_ops s_sock_ops; - // XXX: options - nni_duration s_linger; // linger time - nni_duration s_sndtimeo; // send timeout - nni_duration s_rcvtimeo; // receive timeout - nni_duration s_reconn; // reconnect time - nni_duration s_reconnmax; // max reconnect time - - nni_list s_eps; // active endpoints - nni_list s_pipes; // pipes for this socket - - nni_list s_reaps; // pipes to reap - nni_thr s_reaper; + void * s_data; // Protocol private - int s_ep_pend; // EP dial/listen in progress - int s_closing; // Socket is closing - int s_besteffort; // Best effort mode delivery - int s_senderr; // Protocol state machine use - int s_recverr; // Protocol state machine use - - uint32_t s_nextid; // Next Pipe ID. + // XXX: options + nni_duration s_linger; // linger time + nni_duration s_sndtimeo; // send timeout + nni_duration s_rcvtimeo; // receive timeout + nni_duration s_reconn; // reconnect time + nni_duration s_reconnmax; // max reconnect time + + nni_list s_eps; // active endpoints + nni_list s_pipes; // pipes for this socket + + nni_list s_reaps; // pipes to reap + nni_thr s_reaper; + nni_thr s_sender; + nni_thr s_recver; + + int s_ep_pend; // EP dial/listen in progress + int s_closing; // Socket is closing + int s_besteffort; // Best effort mode delivery + int s_senderr; // Protocol state machine use + int s_recverr; // Protocol state machine use + + uint32_t s_nextid; // Next Pipe ID. }; extern int nni_sock_open(nni_sock **, uint16_t); @@ -76,5 +82,12 @@ extern nni_msgq *nni_sock_sendq(nni_sock *); // inject incoming messages from pipes to it. extern nni_msgq *nni_sock_recvq(nni_sock *); +// nni_sock_mtx obtains the socket mutex. This is for protocols to use +// from separate threads; they must not hold the lock for extended periods. +// Additionally, this can only be acquired from separate threads. The +// synchronous entry points (excluding the send/recv thread workers) will +// be called with this lock already held. We expose the mutex directly +// here so that protocols can use it to initialize condvars. +extern nni_mtx *nni_sock_mtx(nni_sock *); #endif // CORE_SOCKET_H diff --git a/src/core/thread.c b/src/core/thread.c index 7678ad0a..4a871ede 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -128,6 +128,10 @@ nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg) nni_plat_mtx_fini(&thr->mtx); return (rv); } + if (fn == NULL) { + thr->done = 1; + return (0); + } if ((rv = nni_plat_thr_init(&thr->thr, nni_thr_wrap, thr)) != 0) { nni_plat_cv_fini(&thr->cv); nni_plat_mtx_fini(&thr->mtx); @@ -170,7 +174,9 @@ nni_thr_fini(nni_thr *thr) nni_plat_cv_wait(&thr->cv); } nni_plat_mtx_unlock(&thr->mtx); - nni_plat_thr_fini(&thr->thr); + if (thr->fn != NULL) { + nni_plat_thr_fini(&thr->thr); + } nni_plat_cv_fini(&thr->cv); nni_plat_mtx_fini(&thr->mtx); } -- cgit v1.2.3-70-g09d2