diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-05 18:02:22 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-05 18:02:22 -0800 |
| commit | 1b6e9985960a1079be81a576d52aa7f3fe47c92b (patch) | |
| tree | 2f6c9b33571cf30e28ca721064a9c0d038be4c42 /src/core | |
| parent | b17703d1e708a99e9a46ceb012676dc89df40df5 (diff) | |
| download | nng-1b6e9985960a1079be81a576d52aa7f3fe47c92b.tar.gz nng-1b6e9985960a1079be81a576d52aa7f3fe47c92b.tar.bz2 nng-1b6e9985960a1079be81a576d52aa7f3fe47c92b.zip | |
Add nng_shutdown() for sockets to help avoid close race.
Also we added a two phase shutdown for threads.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/endpt.c | 4 | ||||
| -rw-r--r-- | src/core/socket.c | 68 | ||||
| -rw-r--r-- | src/core/socket.h | 4 | ||||
| -rw-r--r-- | src/core/thread.c | 13 | ||||
| -rw-r--r-- | src/core/thread.h | 42 |
5 files changed, 122 insertions, 9 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index 58a4365b..70dfefa7 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -59,10 +59,6 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr) return (rv); } - nni_mtx_lock(&sock->s_mx); - nni_list_append(&sock->s_eps, ep); - nni_mtx_unlock(&sock->s_mx); - *epp = ep; return (0); } diff --git a/src/core/socket.c b/src/core/socket.c index 26ca055e..e48c4475 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -195,15 +195,22 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) } -// nni_sock_close closes the underlying socket. +// nni_sock_shutdown shuts down the socket; after this point no further +// access to the socket will function, and any threads blocked in entry +// points will be woken (and the functions they are blocked in will return +// NNG_ECLOSED.) int -nni_sock_close(nni_sock *sock) +nni_sock_shutdown(nni_sock *sock) { nni_pipe *pipe; nni_ep *ep; nni_time linger; nni_mtx_lock(&sock->s_mx); + if (sock->s_closing) { + nni_mtx_unlock(&sock->s_mx); + return (NNG_ECLOSED); + } // Mark us closing, so no more EPs or changes can occur. sock->s_closing = 1; @@ -262,19 +269,43 @@ nni_sock_close(nni_sock *sock) nni_mtx_unlock(&sock->s_mx); // Wait for the reaper to exit. - nni_thr_fini(&sock->s_reaper); + nni_thr_wait(&sock->s_reaper); + + // At this point, there are no threads blocked inside of us + // that are referencing socket state. User code should call + // nng_close to release the last resources. + return (0); +} + + +// nni_sock_close shuts down the socket, then releases any resources +// associated with it. It is a programmer error to reference the socket +// after this function is called, as the pointer may reference invalid +// memory or other objects. +void +nni_sock_close(nni_sock *sock) +{ + // Shutdown everything if not already done. This operation + // is idempotent. + nni_sock_shutdown(sock); // At this point nothing else should be referencing us. + // As with UNIX close, it is a gross error for the caller + // to have concurrent threads using this. We've taken care to + // ensure that any active consumers have been stopped, but if + // user code attempts to utilize the socket *after* this point, + // the results may be tragic. + // The protocol needs to clean up its state. sock->s_proto.proto_fini(sock->s_data); // And we need to clean up *our* state. + nni_thr_fini(&sock->s_reaper); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_mx); NNI_FREE_STRUCT(sock); - return (0); } @@ -374,9 +405,18 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags) nni_ep *ep; int rv; + nni_mtx_lock(&sock->s_mx); + if (sock->s_closing) { + nni_mtx_unlock(&sock->s_mx); + return (NNG_ECLOSED); + } if ((rv = nni_ep_create(&ep, sock, addr)) != 0) { + nni_mtx_unlock(&sock->s_mx); return (rv); } + nni_list_append(&sock->s_eps, ep); + nni_mtx_unlock(&sock->s_mx); + rv = nni_ep_dial(ep, flags); if (rv != 0) { nni_ep_close(ep); @@ -385,6 +425,7 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags) *epp = ep; } } + return (rv); } @@ -395,9 +436,19 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags) nni_ep *ep; int rv; + nni_mtx_lock(&sock->s_mx); + if (sock->s_closing) { + nni_mtx_unlock(&sock->s_mx); + return (NNG_ECLOSED); + } + if ((rv = nni_ep_create(&ep, sock, addr)) != 0) { + nni_mtx_unlock(&sock->s_mx); return (rv); } + nni_list_append(&sock->s_eps, ep); + nni_mtx_unlock(&sock->s_mx); + rv = nni_ep_listen(ep, flags); if (rv != 0) { nni_ep_close(ep); @@ -406,6 +457,7 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags) *epp = ep; } } + return (rv); } @@ -432,6 +484,10 @@ nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size) int rv = ENOTSUP; nni_mtx_lock(&sock->s_mx); + if (sock->s_closing) { + nni_mtx_unlock(&sock->s_mx); + return (NNG_ECLOSED); + } rv = sock->s_proto.proto_setopt(sock->s_data, opt, val, size); if (rv != NNG_ENOTSUP) { nni_mtx_unlock(&sock->s_mx); @@ -473,6 +529,10 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep) int rv = ENOTSUP; nni_mtx_lock(&sock->s_mx); + if (sock->s_closing) { + nni_mtx_unlock(&sock->s_mx); + return (NNG_ECLOSED); + } rv = sock->s_proto.proto_getopt(sock->s_data, opt, val, sizep); if (rv != NNG_ENOTSUP) { nni_mtx_unlock(&sock->s_mx); diff --git a/src/core/socket.h b/src/core/socket.h index 5a0dbb84..68a7bc1c 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -37,6 +37,7 @@ struct nng_socket { nni_list s_reaps; // pipes to reap nni_thr s_reaper; + int s_ep_pend; // EP dial/listen in progress int s_closing; // Socket is closing int s_besteffort; // Best effort mode delivery int s_senderr; // Protocol state machine use @@ -46,7 +47,8 @@ struct nng_socket { }; extern int nni_sock_open(nni_sock **, uint16_t); -extern int nni_sock_close(nni_sock *); +extern void nni_sock_close(nni_sock *); +extern int nni_sock_shutdown(nni_sock *); extern uint16_t nni_sock_proto(nni_sock *); extern uint16_t nni_sock_peer(nni_sock *); extern int nni_sock_setopt(nni_sock *, int, const void *, size_t); diff --git a/src/core/thread.c b/src/core/thread.c index d1f46e3f..74d80513 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -148,6 +148,19 @@ nni_thr_run(nni_thr *thr) void +nni_thr_wait(nni_thr *thr) +{ + nni_plat_mtx_lock(&thr->mtx); + thr->stop = 1; + nni_plat_cv_wake(&thr->cv); + while (!thr->done) { + nni_plat_cv_wait(&thr->cv); + } + nni_plat_mtx_unlock(&thr->mtx); +} + + +void nni_thr_fini(nni_thr *thr) { nni_plat_mtx_lock(&thr->mtx); diff --git a/src/core/thread.h b/src/core/thread.h index 8ef463e5..33270a75 100644 --- a/src/core/thread.h +++ b/src/core/thread.h @@ -33,20 +33,62 @@ typedef struct { int done; } nni_thr; +// nni_mtx_init initializes the mutex. (Win32 programmers take note; +// our mutexes are actually CriticalSections on Win32.) extern int nni_mtx_init(nni_mtx *mtx); + +// nni_mtx_fini destroys the mutex and releases any resources used by it. extern void nni_mtx_fini(nni_mtx *mtx); + +// nni_mtx_lock locks the given mutex, waiting if necessary. Recursive +// entry is not supported; attempts to do so will result in undefined +// behavior. extern void nni_mtx_lock(nni_mtx *mtx); + +// nni_mutex_unlock unlocks the given mutex. The mutex must be +// owned by the calling thread. extern void nni_mtx_unlock(nni_mtx *mtx); + +// nni_mtx_trylock attempts to acquire the given mutex. It returns +// NNG_EBUSY if the mutex is locked. extern int nni_mtx_trylock(nni_mtx *mtx); +// nni_cv_init initializes the condition variable. The mutex supplied +// must always be locked with the condition variable. extern int nni_cv_init(nni_cv *cv, nni_mtx *); + +// nni_cv_fini releases resources associated with the condition variable, +// which must not be in use at the time. extern void nni_cv_fini(nni_cv *cv); + +// nni_cv_wake wakes all waiters on the condition variable. extern void nni_cv_wake(nni_cv *cv); + +// nni_cv_wait waits until nni_cv_wake is called on the condition variable. +// The wait is indefinite. Premature wakeups are possible, so the caller +// must verify any related condition. extern void nni_cv_wait(nni_cv *cv); + +// nni_cv_until waits until the condition variable is signaled with +// nni_cv_wake the system indicated is reached. If the time expires, +// the return will be NNG_ETIMEDOUT. extern int nni_cv_until(nni_cv *cv, nni_time when); +// nni_thr_init creates the thread, but the thread starts "stalled", until +// it is either run, or a wait or or fini is called. extern int nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg); + +// nni_thr_fini waits for the thread to finish (if it s running), then +// reclaims any resources associated with it. extern void nni_thr_fini(nni_thr *thr); + +// nni_thr_run runs the given thread, which must have been initialized. extern void nni_thr_run(nni_thr *thr); +// nni_thr_wait waits for the thread to complete execution, but does not +// release resources associated with it. It is idempotent. If the this +// is called before nni_thr_run is called, then the thread will not run +// at all. +extern void nni_thr_wait(nni_thr *thr); + #endif CORE_THREAD_H |
