diff options
| -rw-r--r-- | src/core/platform.h | 23 | ||||
| -rw-r--r-- | src/platform/posix/posix_impl.h | 5 | ||||
| -rw-r--r-- | src/platform/posix/posix_net.c | 38 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.c | 18 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 1 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 15 |
6 files changed, 83 insertions, 17 deletions
diff --git a/src/core/platform.h b/src/core/platform.h index 3292d6f9..afc4c06c 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -60,10 +60,10 @@ extern void *nni_alloc(size_t); // Most implementations can just call free() here. extern void nni_free(void *, size_t); -typedef struct nni_plat_mtx nni_plat_mtx; -typedef struct nni_plat_cv nni_plat_cv; -typedef struct nni_plat_thr nni_plat_thr; -typedef struct nni_plat_tcpsock nni_plat_tcpsock; +typedef struct nni_plat_mtx nni_plat_mtx; +typedef struct nni_plat_cv nni_plat_cv; +typedef struct nni_plat_thr nni_plat_thr; +typedef struct nni_plat_tcpsock nni_plat_tcpsock; // Mutex handling. @@ -174,8 +174,19 @@ extern const char *nni_plat_strerror(int); // returned on dual stack machines. extern int nni_plat_lookup_host(const char *, nni_sockaddr *, int); -// nni_plat_tcp_close just closes a TCP socket. -extern void nni_plat_tcp_close(nni_plat_tcpsock *); +// nni_plat_tcp_init initializes the socket, for example it can +// set underlying file descriptors to -1, etc. +extern void nni_plat_tcp_init(nni_plat_tcpsock *); + +// nni_plat_tcp_fini just closes a TCP socket, and releases any related +// resources. +extern void nni_plat_tcp_fini(nni_plat_tcpsock *); + +// nni_plat_tcp_shutdown performs a shutdown of the socket. For +// BSD sockets, this closes both sides of the TCP connection gracefully, +// but the underlying file descriptor is left open. (This part is critical +// to prevention of close() related races.) +extern void nni_plat_tcp_shutdown(nni_plat_tcpsock *); // nni_plat_tcp_listen creates a TCP socket in listening mode, bound // to the specified address. Note that nni_plat_tcpsock should be defined diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 2b72f574..ed52b6fb 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -35,13 +35,16 @@ extern int nni_plat_errno(int); #ifdef PLATFORM_POSIX_NET struct nni_plat_tcpsock { - int fd; + int fd; + int devnull; // used for shutting down blocking accept() }; #endif // Define types that this platform uses. #ifdef PLATFORM_POSIX_THREAD +extern int nni_plat_devnull; // open descriptor on /dev/null + #include <pthread.h> // These types are provided for here, to permit them to be directly inlined diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c index d0cfb49c..151e14f1 100644 --- a/src/platform/posix/posix_net.c +++ b/src/platform/posix/posix_net.c @@ -231,12 +231,35 @@ nni_plat_tcp_setopts(int fd) void -nni_plat_tcp_close(nni_plat_tcpsock *s) +nni_plat_tcp_init(nni_plat_tcpsock *s) { - (void) close(s->fd); s->fd = -1; } + +void +nni_plat_tcp_fini(nni_plat_tcpsock *s) +{ + if (s->fd != -1) { + (void) close(s->fd); + s->fd = -1; + } +} + + +void +nni_plat_tcp_shutdown(nni_plat_tcpsock *s) +{ + if (s->fd != -1) { + (void) shutdown(s->fd, SHUT_RDWR); + // This causes the equivalent of a close. Hopefully waking + // up anything that didn't get the hint with the shutdown. + // (macOS does not see the shtudown). + (void) dup2(nni_plat_devnull, s->fd); + } +} + + // nni_plat_tcp_bind creates a file descriptor bound to the given address. // This basically does the equivalent of socket, bind, and listen. We have // chosen a default value for the listen backlog of 128, which should be @@ -257,7 +280,7 @@ nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr) } #ifdef SOCK_CLOEXEC - fd = socket(ss.ss_family, SOCK_STREAM, SOCK_CLOEXEC); + fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0); #else fd = socket(ss.ss_family, SOCK_STREAM, 0); #endif @@ -305,7 +328,7 @@ nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr, } #ifdef SOCK_CLOEXEC - fd = socket(ss.ss_family, SOCK_STREAM, SOCK_CLOEXEC); + fd = socket(ss.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0); #else fd = socket(ss.ss_family, SOCK_STREAM, 0); #endif @@ -346,9 +369,9 @@ nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server) for (;;) { #ifdef NNG_USE_ACCEPT4 - fd = accept4(server, NULL, NULL, SOCK_CLOEXEC); + fd = accept4(server->fd, NULL, NULL, SOCK_CLOEXEC); if ((fd < 0) && ((errrno == ENOSYS) || (errno == ENOTSUP))) { - fd = accept(server, NULL, NULL); + fd = accept(server->fd, NULL, NULL); } #else fd = accept(server->fd, NULL, NULL); @@ -359,6 +382,9 @@ nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server) // These are not fatal errors, keep trying continue; } + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { + continue; + } return (nni_plat_errno(errno)); } else { break; diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index 02f0b058..00419aac 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -19,6 +19,7 @@ #include <stdlib.h> #include <sys/types.h> #include <unistd.h> +#include <fcntl.h> static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER; static int nni_plat_inited = 0; @@ -28,6 +29,11 @@ static int nni_plat_next = 0; pthread_condattr_t nni_cvattr; pthread_mutexattr_t nni_mxattr; +// We open a /dev/null file descriptor so that we can dup2() it to +// cause MacOS X to wakeup. This gives us a "safe" close semantic. + +int nni_plat_devnull = -1; + uint32_t nni_plat_nextid(void) { @@ -228,30 +234,39 @@ nni_plat_init(int (*helper)(void)) if (nni_plat_inited) { return (0); // fast path } + + if ((nni_plat_devnull = open("/dev/null", O_RDONLY)) < 0) { + return (nni_plat_errno(errno)); + } pthread_mutex_lock(&nni_plat_lock); if (nni_plat_inited) { // check again under the lock to be sure pthread_mutex_unlock(&nni_plat_lock); + (void) close(nni_plat_devnull); return (0); } if (pthread_condattr_init(&nni_cvattr) != 0) { pthread_mutex_unlock(&nni_plat_lock); + (void) close(nni_plat_devnull); return (NNG_ENOMEM); } #if !defined(NNG_USE_GETTIMEOFDAY) && NNG_USE_CLOCKID != CLOCK_REALTIME if (pthread_condattr_setclock(&nni_cvattr, NNG_USE_CLOCKID) != 0) { pthread_mutex_unlock(&nni_plat_lock); + (void) close(nni_plat_devnull); return (NNG_ENOMEM); } #endif if (pthread_mutexattr_init(&nni_mxattr) != 0) { pthread_mutex_unlock(&nni_plat_lock); + (void) close(nni_plat_devnull); return (NNG_ENOMEM); } rv = pthread_mutexattr_settype(&nni_mxattr, PTHREAD_MUTEX_ERRORCHECK); if (rv != 0) { pthread_mutex_unlock(&nni_plat_lock); + (void) close(nni_plat_devnull); return (NNG_ENOMEM); } @@ -276,6 +291,7 @@ nni_plat_init(int (*helper)(void)) if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) { pthread_mutex_unlock(&nni_plat_lock); + (void) close(nni_plat_devnull); return (NNG_ENOMEM); } if ((rv = helper()) == 0) { @@ -294,6 +310,8 @@ nni_plat_fini(void) if (nni_plat_inited) { pthread_mutexattr_destroy(&nni_mxattr); pthread_condattr_destroy(&nni_cvattr); + (void) close(nni_plat_devnull); + nni_plat_devnull = -1; nni_plat_inited = 0; } pthread_mutex_unlock(&nni_plat_lock); diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index b9981c3c..abd6e25c 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -139,6 +139,7 @@ static int nni_req_pipe_add(void *arg) { nni_req_pipe *rp = arg; + if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) { return (NNG_EPROTO); } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 00d9a2f3..249c2329 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -55,7 +55,7 @@ nni_tcp_pipe_close(void *arg) { nni_tcp_pipe *pipe = arg; - nni_plat_tcp_close(&pipe->fd); + nni_plat_tcp_shutdown(&pipe->fd); } @@ -64,6 +64,7 @@ nni_tcp_pipe_destroy(void *arg) { nni_tcp_pipe *pipe = arg; + nni_plat_tcp_fini(&pipe->fd); NNI_FREE_STRUCT(pipe); } @@ -176,6 +177,7 @@ nni_tcp_ep_init(void **epp, const char *url, uint16_t proto) ep->closed = 0; ep->proto = proto; ep->ipv4only = 0; + nni_plat_tcp_init(&ep->fd); (void) snprintf(ep->addr, sizeof (ep->addr), "%s", url); @@ -189,6 +191,7 @@ nni_tcp_ep_fini(void *arg) { nni_tcp_ep *ep = arg; + nni_plat_tcp_fini(&ep->fd); NNI_FREE_STRUCT(ep); } @@ -198,7 +201,7 @@ nni_tcp_ep_close(void *arg) { nni_tcp_ep *ep = arg; - nni_plat_tcp_close(&ep->fd); + nni_plat_tcp_shutdown(&ep->fd); } @@ -340,6 +343,7 @@ nni_tcp_ep_connect(void *arg, void **pipep) if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { return (NNG_ENOMEM); } + nni_plat_tcp_init(&pipe->fd); pipe->proto = ep->proto; // Port is in the same place for both v4 and v6. @@ -353,7 +357,8 @@ nni_tcp_ep_connect(void *arg, void **pipep) } if ((rv = nni_tcp_negotiate(pipe)) != 0) { - nni_plat_tcp_close(&pipe->fd); + nni_plat_tcp_shutdown(&pipe->fd); + nni_plat_tcp_fini(&pipe->fd); NNI_FREE_STRUCT(pipe); return (rv); } @@ -405,13 +410,15 @@ nni_tcp_ep_accept(void *arg, void **pipep) return (NNG_ENOMEM); } pipe->proto = ep->proto; + nni_plat_tcp_init(&pipe->fd); if ((rv = nni_plat_tcp_accept(&pipe->fd, &ep->fd)) != 0) { NNI_FREE_STRUCT(pipe); return (rv); } if ((rv = nni_tcp_negotiate(pipe)) != 0) { - nni_plat_tcp_close(&pipe->fd); + nni_plat_tcp_shutdown(&pipe->fd); + nni_plat_tcp_fini(&pipe->fd); NNI_FREE_STRUCT(pipe); return (rv); } |
