aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-05 18:02:22 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-05 18:02:22 -0800
commit1b6e9985960a1079be81a576d52aa7f3fe47c92b (patch)
tree2f6c9b33571cf30e28ca721064a9c0d038be4c42
parentb17703d1e708a99e9a46ceb012676dc89df40df5 (diff)
downloadnng-1b6e9985960a1079be81a576d52aa7f3fe47c92b.tar.gz
nng-1b6e9985960a1079be81a576d52aa7f3fe47c92b.tar.bz2
nng-1b6e9985960a1079be81a576d52aa7f3fe47c92b.zip
Add nng_shutdown() for sockets to help avoid close race.
Also we added a two phase shutdown for threads.
-rw-r--r--src/core/endpt.c4
-rw-r--r--src/core/socket.c68
-rw-r--r--src/core/socket.h4
-rw-r--r--src/core/thread.c13
-rw-r--r--src/core/thread.h42
-rw-r--r--src/nng.c12
-rw-r--r--src/nng.h9
-rw-r--r--tests/sock.c6
8 files changed, 144 insertions, 14 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 58a4365b..70dfefa7 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -59,10 +59,6 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr)
return (rv);
}
- nni_mtx_lock(&sock->s_mx);
- nni_list_append(&sock->s_eps, ep);
- nni_mtx_unlock(&sock->s_mx);
-
*epp = ep;
return (0);
}
diff --git a/src/core/socket.c b/src/core/socket.c
index 26ca055e..e48c4475 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -195,15 +195,22 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
}
-// nni_sock_close closes the underlying socket.
+// nni_sock_shutdown shuts down the socket; after this point no further
+// access to the socket will function, and any threads blocked in entry
+// points will be woken (and the functions they are blocked in will return
+// NNG_ECLOSED.)
int
-nni_sock_close(nni_sock *sock)
+nni_sock_shutdown(nni_sock *sock)
{
nni_pipe *pipe;
nni_ep *ep;
nni_time linger;
nni_mtx_lock(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (NNG_ECLOSED);
+ }
// Mark us closing, so no more EPs or changes can occur.
sock->s_closing = 1;
@@ -262,19 +269,43 @@ nni_sock_close(nni_sock *sock)
nni_mtx_unlock(&sock->s_mx);
// Wait for the reaper to exit.
- nni_thr_fini(&sock->s_reaper);
+ nni_thr_wait(&sock->s_reaper);
+
+ // At this point, there are no threads blocked inside of us
+ // that are referencing socket state. User code should call
+ // nng_close to release the last resources.
+ return (0);
+}
+
+
+// nni_sock_close shuts down the socket, then releases any resources
+// associated with it. It is a programmer error to reference the socket
+// after this function is called, as the pointer may reference invalid
+// memory or other objects.
+void
+nni_sock_close(nni_sock *sock)
+{
+ // Shutdown everything if not already done. This operation
+ // is idempotent.
+ nni_sock_shutdown(sock);
// At this point nothing else should be referencing us.
+ // As with UNIX close, it is a gross error for the caller
+ // to have concurrent threads using this. We've taken care to
+ // ensure that any active consumers have been stopped, but if
+ // user code attempts to utilize the socket *after* this point,
+ // the results may be tragic.
+
// The protocol needs to clean up its state.
sock->s_proto.proto_fini(sock->s_data);
// And we need to clean up *our* state.
+ nni_thr_fini(&sock->s_reaper);
nni_msgq_fini(sock->s_urq);
nni_msgq_fini(sock->s_uwq);
nni_cv_fini(&sock->s_cv);
nni_mtx_fini(&sock->s_mx);
NNI_FREE_STRUCT(sock);
- return (0);
}
@@ -374,9 +405,18 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
nni_ep *ep;
int rv;
+ nni_mtx_lock(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (NNG_ECLOSED);
+ }
if ((rv = nni_ep_create(&ep, sock, addr)) != 0) {
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
+ nni_list_append(&sock->s_eps, ep);
+ nni_mtx_unlock(&sock->s_mx);
+
rv = nni_ep_dial(ep, flags);
if (rv != 0) {
nni_ep_close(ep);
@@ -385,6 +425,7 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
*epp = ep;
}
}
+
return (rv);
}
@@ -395,9 +436,19 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
nni_ep *ep;
int rv;
+ nni_mtx_lock(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (NNG_ECLOSED);
+ }
+
if ((rv = nni_ep_create(&ep, sock, addr)) != 0) {
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
+ nni_list_append(&sock->s_eps, ep);
+ nni_mtx_unlock(&sock->s_mx);
+
rv = nni_ep_listen(ep, flags);
if (rv != 0) {
nni_ep_close(ep);
@@ -406,6 +457,7 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
*epp = ep;
}
}
+
return (rv);
}
@@ -432,6 +484,10 @@ nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size)
int rv = ENOTSUP;
nni_mtx_lock(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (NNG_ECLOSED);
+ }
rv = sock->s_proto.proto_setopt(sock->s_data, opt, val, size);
if (rv != NNG_ENOTSUP) {
nni_mtx_unlock(&sock->s_mx);
@@ -473,6 +529,10 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep)
int rv = ENOTSUP;
nni_mtx_lock(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (NNG_ECLOSED);
+ }
rv = sock->s_proto.proto_getopt(sock->s_data, opt, val, sizep);
if (rv != NNG_ENOTSUP) {
nni_mtx_unlock(&sock->s_mx);
diff --git a/src/core/socket.h b/src/core/socket.h
index 5a0dbb84..68a7bc1c 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -37,6 +37,7 @@ struct nng_socket {
nni_list s_reaps; // pipes to reap
nni_thr s_reaper;
+ int s_ep_pend; // EP dial/listen in progress
int s_closing; // Socket is closing
int s_besteffort; // Best effort mode delivery
int s_senderr; // Protocol state machine use
@@ -46,7 +47,8 @@ struct nng_socket {
};
extern int nni_sock_open(nni_sock **, uint16_t);
-extern int nni_sock_close(nni_sock *);
+extern void nni_sock_close(nni_sock *);
+extern int nni_sock_shutdown(nni_sock *);
extern uint16_t nni_sock_proto(nni_sock *);
extern uint16_t nni_sock_peer(nni_sock *);
extern int nni_sock_setopt(nni_sock *, int, const void *, size_t);
diff --git a/src/core/thread.c b/src/core/thread.c
index d1f46e3f..74d80513 100644
--- a/src/core/thread.c
+++ b/src/core/thread.c
@@ -148,6 +148,19 @@ nni_thr_run(nni_thr *thr)
void
+nni_thr_wait(nni_thr *thr)
+{
+ nni_plat_mtx_lock(&thr->mtx);
+ thr->stop = 1;
+ nni_plat_cv_wake(&thr->cv);
+ while (!thr->done) {
+ nni_plat_cv_wait(&thr->cv);
+ }
+ nni_plat_mtx_unlock(&thr->mtx);
+}
+
+
+void
nni_thr_fini(nni_thr *thr)
{
nni_plat_mtx_lock(&thr->mtx);
diff --git a/src/core/thread.h b/src/core/thread.h
index 8ef463e5..33270a75 100644
--- a/src/core/thread.h
+++ b/src/core/thread.h
@@ -33,20 +33,62 @@ typedef struct {
int done;
} nni_thr;
+// nni_mtx_init initializes the mutex. (Win32 programmers take note;
+// our mutexes are actually CriticalSections on Win32.)
extern int nni_mtx_init(nni_mtx *mtx);
+
+// nni_mtx_fini destroys the mutex and releases any resources used by it.
extern void nni_mtx_fini(nni_mtx *mtx);
+
+// nni_mtx_lock locks the given mutex, waiting if necessary. Recursive
+// entry is not supported; attempts to do so will result in undefined
+// behavior.
extern void nni_mtx_lock(nni_mtx *mtx);
+
+// nni_mutex_unlock unlocks the given mutex. The mutex must be
+// owned by the calling thread.
extern void nni_mtx_unlock(nni_mtx *mtx);
+
+// nni_mtx_trylock attempts to acquire the given mutex. It returns
+// NNG_EBUSY if the mutex is locked.
extern int nni_mtx_trylock(nni_mtx *mtx);
+// nni_cv_init initializes the condition variable. The mutex supplied
+// must always be locked with the condition variable.
extern int nni_cv_init(nni_cv *cv, nni_mtx *);
+
+// nni_cv_fini releases resources associated with the condition variable,
+// which must not be in use at the time.
extern void nni_cv_fini(nni_cv *cv);
+
+// nni_cv_wake wakes all waiters on the condition variable.
extern void nni_cv_wake(nni_cv *cv);
+
+// nni_cv_wait waits until nni_cv_wake is called on the condition variable.
+// The wait is indefinite. Premature wakeups are possible, so the caller
+// must verify any related condition.
extern void nni_cv_wait(nni_cv *cv);
+
+// nni_cv_until waits until the condition variable is signaled with
+// nni_cv_wake the system indicated is reached. If the time expires,
+// the return will be NNG_ETIMEDOUT.
extern int nni_cv_until(nni_cv *cv, nni_time when);
+// nni_thr_init creates the thread, but the thread starts "stalled", until
+// it is either run, or a wait or or fini is called.
extern int nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg);
+
+// nni_thr_fini waits for the thread to finish (if it s running), then
+// reclaims any resources associated with it.
extern void nni_thr_fini(nni_thr *thr);
+
+// nni_thr_run runs the given thread, which must have been initialized.
extern void nni_thr_run(nni_thr *thr);
+// nni_thr_wait waits for the thread to complete execution, but does not
+// release resources associated with it. It is idempotent. If the this
+// is called before nni_thr_run is called, then the thread will not run
+// at all.
+extern void nni_thr_wait(nni_thr *thr);
+
#endif CORE_THREAD_H
diff --git a/src/nng.c b/src/nng.c
index b0f39222..f3c4c294 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -42,10 +42,18 @@ nng_open(nng_socket **s, uint16_t proto)
int
-nng_close(nng_socket *s)
+nng_shutdown(nng_socket *s)
{
NNI_INIT_INT();
- return (nni_sock_close(s));
+ return (nni_sock_shutdown(s));
+}
+
+
+void
+nng_close(nng_socket *s)
+{
+ NNI_INIT_VOID();
+ nni_sock_close(s);
}
diff --git a/src/nng.h b/src/nng.h
index 61894ce8..6cf7a568 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -54,7 +54,14 @@ NNG_DECL int nng_open(nng_socket **, uint16_t proto);
// error to reference the socket in any way after this is called. Likewise,
// it is an error to reference any resources such as endpoints or
// pipes associated with the socket.
-NNG_DECL int nng_close(nng_socket *);
+NNG_DECL void nng_close(nng_socket *);
+
+// nng_shutdown shuts down the socket. This causes any threads doing
+// work for the socket or blocked in socket functions to be woken (and
+// return NNG_ECLOSED). The socket resources are still present, so it
+// is safe to call other functions; they will just return NNG_ECLOSED.
+// A call to nng_close is still required to release the resources.
+NNG_DECL int nng_shutdown(nng_socket *);
// nng_protocol returns the protocol number of the socket.
uint16_t nng_protocol(nng_socket *);
diff --git a/tests/sock.c b/tests/sock.c
index 929e73c8..e0743936 100644
--- a/tests/sock.c
+++ b/tests/sock.c
@@ -21,9 +21,11 @@ TestMain("Socket Operations", {
So(rv == 0);
So(sock != NULL);
- Convey("And we can close it", {
- rv = nng_close(sock);
+ Convey("And we can shut it down", {
+ rv = nng_shutdown(sock);
So(rv == 0);
+ rv = nng_shutdown(sock);
+ So(rv == NNG_ECLOSED);
})
Reset({