aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-05 18:02:22 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-05 18:02:22 -0800
commit1b6e9985960a1079be81a576d52aa7f3fe47c92b (patch)
tree2f6c9b33571cf30e28ca721064a9c0d038be4c42 /src/core/socket.c
parentb17703d1e708a99e9a46ceb012676dc89df40df5 (diff)
downloadnng-1b6e9985960a1079be81a576d52aa7f3fe47c92b.tar.gz
nng-1b6e9985960a1079be81a576d52aa7f3fe47c92b.tar.bz2
nng-1b6e9985960a1079be81a576d52aa7f3fe47c92b.zip
Add nng_shutdown() for sockets to help avoid close race.
Also we added a two phase shutdown for threads.
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c68
1 files changed, 64 insertions, 4 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 26ca055e..e48c4475 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -195,15 +195,22 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
}
-// nni_sock_close closes the underlying socket.
+// nni_sock_shutdown shuts down the socket; after this point no further
+// access to the socket will function, and any threads blocked in entry
+// points will be woken (and the functions they are blocked in will return
+// NNG_ECLOSED.)
int
-nni_sock_close(nni_sock *sock)
+nni_sock_shutdown(nni_sock *sock)
{
nni_pipe *pipe;
nni_ep *ep;
nni_time linger;
nni_mtx_lock(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (NNG_ECLOSED);
+ }
// Mark us closing, so no more EPs or changes can occur.
sock->s_closing = 1;
@@ -262,19 +269,43 @@ nni_sock_close(nni_sock *sock)
nni_mtx_unlock(&sock->s_mx);
// Wait for the reaper to exit.
- nni_thr_fini(&sock->s_reaper);
+ nni_thr_wait(&sock->s_reaper);
+
+ // At this point, there are no threads blocked inside of us
+ // that are referencing socket state. User code should call
+ // nng_close to release the last resources.
+ return (0);
+}
+
+
+// nni_sock_close shuts down the socket, then releases any resources
+// associated with it. It is a programmer error to reference the socket
+// after this function is called, as the pointer may reference invalid
+// memory or other objects.
+void
+nni_sock_close(nni_sock *sock)
+{
+ // Shutdown everything if not already done. This operation
+ // is idempotent.
+ nni_sock_shutdown(sock);
// At this point nothing else should be referencing us.
+ // As with UNIX close, it is a gross error for the caller
+ // to have concurrent threads using this. We've taken care to
+ // ensure that any active consumers have been stopped, but if
+ // user code attempts to utilize the socket *after* this point,
+ // the results may be tragic.
+
// The protocol needs to clean up its state.
sock->s_proto.proto_fini(sock->s_data);
// And we need to clean up *our* state.
+ nni_thr_fini(&sock->s_reaper);
nni_msgq_fini(sock->s_urq);
nni_msgq_fini(sock->s_uwq);
nni_cv_fini(&sock->s_cv);
nni_mtx_fini(&sock->s_mx);
NNI_FREE_STRUCT(sock);
- return (0);
}
@@ -374,9 +405,18 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
nni_ep *ep;
int rv;
+ nni_mtx_lock(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (NNG_ECLOSED);
+ }
if ((rv = nni_ep_create(&ep, sock, addr)) != 0) {
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
+ nni_list_append(&sock->s_eps, ep);
+ nni_mtx_unlock(&sock->s_mx);
+
rv = nni_ep_dial(ep, flags);
if (rv != 0) {
nni_ep_close(ep);
@@ -385,6 +425,7 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
*epp = ep;
}
}
+
return (rv);
}
@@ -395,9 +436,19 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
nni_ep *ep;
int rv;
+ nni_mtx_lock(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (NNG_ECLOSED);
+ }
+
if ((rv = nni_ep_create(&ep, sock, addr)) != 0) {
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
+ nni_list_append(&sock->s_eps, ep);
+ nni_mtx_unlock(&sock->s_mx);
+
rv = nni_ep_listen(ep, flags);
if (rv != 0) {
nni_ep_close(ep);
@@ -406,6 +457,7 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
*epp = ep;
}
}
+
return (rv);
}
@@ -432,6 +484,10 @@ nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size)
int rv = ENOTSUP;
nni_mtx_lock(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (NNG_ECLOSED);
+ }
rv = sock->s_proto.proto_setopt(sock->s_data, opt, val, size);
if (rv != NNG_ENOTSUP) {
nni_mtx_unlock(&sock->s_mx);
@@ -473,6 +529,10 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep)
int rv = ENOTSUP;
nni_mtx_lock(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (NNG_ECLOSED);
+ }
rv = sock->s_proto.proto_getopt(sock->s_data, opt, val, sizep);
if (rv != NNG_ENOTSUP) {
nni_mtx_unlock(&sock->s_mx);