diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-03-29 13:07:35 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-03-29 13:07:35 -0700 |
| commit | 374f93a18edca2e0656c337a5b54927169ec31fa (patch) | |
| tree | cbaef995db10cfafd795953be203de744dc688c9 /src/platform/posix | |
| parent | 6091cf7e1c030417e1fd29c66160e71bcbe4f984 (diff) | |
| download | nng-374f93a18edca2e0656c337a5b54927169ec31fa.tar.gz nng-374f93a18edca2e0656c337a5b54927169ec31fa.tar.bz2 nng-374f93a18edca2e0656c337a5b54927169ec31fa.zip | |
TCP (POSIX) async send/recv working. Other changes.
Transport-level pipe initialization is now sepearate and explicit.
The POSIX send/recv logic still uses threads under the hood, but
makes use of the AIO framework for send/recv. This is a key stepping
stone towards enabling poll() or similar async I/O approaches.
Diffstat (limited to 'src/platform/posix')
| -rw-r--r-- | src/platform/posix/posix_aio.h | 4 | ||||
| -rw-r--r-- | src/platform/posix/posix_aiothr.c | 15 | ||||
| -rw-r--r-- | src/platform/posix/posix_impl.h | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_net.c | 72 |
4 files changed, 66 insertions, 31 deletions
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h index 08c731bc..797f9e43 100644 --- a/src/platform/posix/posix_aio.h +++ b/src/platform/posix/posix_aio.h @@ -51,8 +51,8 @@ extern void nni_posix_aio_pipe_fini(nni_posix_aio_pipe *); // extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int); // extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *); -extern int nni_posix_aio_read(nni_posix_aioq *, nni_aio *); -extern int nni_posix_aio_write(nni_posix_aioq *, nni_aio *); +extern int nni_posix_aio_read(nni_posix_aio_pipe *, nni_aio *); +extern int nni_posix_aio_write(nni_posix_aio_pipe *, nni_aio *); // extern int nni_posix_aio_connect(); // extern int nni_posix_aio_accept(); diff --git a/src/platform/posix/posix_aiothr.c b/src/platform/posix/posix_aiothr.c index 013e4599..2d31aede 100644 --- a/src/platform/posix/posix_aiothr.c +++ b/src/platform/posix/posix_aiothr.c @@ -178,16 +178,17 @@ nni_plat_aiothr_dothr(nni_posix_aioq *q, int (*fn)(int, nni_aio *)) } nni_list_remove(&q->aq_aios, aio); - nni_mtx_unlock(&q->aq_lk); + //nni_mtx_unlock(&q->aq_lk); // Call the callback. nni_aio_finish(aio, rv, aio->a_count); } while ((aio = nni_list_first(&q->aq_aios)) != NULL) { - nni_mtx_unlock(&q->aq_lk); + nni_list_remove(&q->aq_aios, aio); + //nni_mtx_unlock(&q->aq_lk); nni_aio_finish(aio, NNG_ECLOSED, aio->a_count); - nni_mtx_lock(&q->aq_lk); + //nni_mtx_lock(&q->aq_lk); } nni_mtx_unlock(&q->aq_lk); @@ -299,16 +300,16 @@ nni_posix_aio_submit(nni_posix_aioq *q, nni_aio *aio) int -nni_posix_aio_read(nni_posix_aioq *q, nni_aio *aio) +nni_posix_aio_read(nni_posix_aio_pipe *p, nni_aio *aio) { - return (nni_posix_aio_submit(q, aio)); + return (nni_posix_aio_submit(&p->ap_readq, aio)); } int -nni_posix_aio_write(nni_posix_aioq *q, nni_aio *aio) +nni_posix_aio_write(nni_posix_aio_pipe *p, nni_aio *aio) { - return (nni_posix_aio_submit(q, aio)); + return (nni_posix_aio_submit(&p->ap_writeq, aio)); } diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 56cc5937..5da18323 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -35,12 +35,6 @@ extern int nni_plat_errno(int); #endif -#ifdef PLATFORM_POSIX_NET -struct nni_plat_tcpsock { - int fd; - int devnull; // used for shutting down blocking accept() -}; -#endif #ifdef PLATFORM_POSIX_IPC struct nni_plat_ipcsock { diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c index 24b45819..c7651655 100644 --- a/src/platform/posix/posix_net.c +++ b/src/platform/posix/posix_net.c @@ -8,6 +8,7 @@ // #include "core/nng_impl.h" +#include "platform/posix/posix_aio.h" #ifdef PLATFORM_POSIX_NET @@ -30,6 +31,14 @@ #define NNI_TCP_SOCKTYPE SOCK_STREAM #endif +#ifdef PLATFORM_POSIX_NET +struct nni_plat_tcpsock { + int fd; + int devnull; // used for shutting down blocking accept() + nni_posix_aio_pipe aiop; +}; +#endif + static int nni_plat_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa) { @@ -164,6 +173,20 @@ nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt) int +nni_plat_tcp_aio_send(nni_plat_tcpsock *s, nni_aio *aio) +{ + return (nni_posix_aio_write(&s->aiop, aio)); +} + + +int +nni_plat_tcp_aio_recv(nni_plat_tcpsock *s, nni_aio *aio) +{ + return (nni_posix_aio_read(&s->aiop, aio)); +} + + +int nni_plat_tcp_recv(nni_plat_tcpsock *s, nni_iov *iovs, int cnt) { struct iovec iov[4]; // We never have more than 3 at present @@ -241,32 +264,39 @@ nni_plat_tcp_setopts(int fd) int -nni_plat_tcp_init(nni_plat_tcpsock *s) +nni_plat_tcp_init(nni_plat_tcpsock **tspp) { - s->fd = -1; + nni_plat_tcpsock *tsp; + + if ((tsp = NNI_ALLOC_STRUCT(tsp)) == NULL) { + return (NNG_ENOMEM); + } + tsp->fd = -1; + *tspp = tsp; return (0); } void -nni_plat_tcp_fini(nni_plat_tcpsock *s) +nni_plat_tcp_fini(nni_plat_tcpsock *tsp) { - if (s->fd != -1) { - (void) close(s->fd); - s->fd = -1; + if (tsp->fd != -1) { + (void) close(tsp->fd); + tsp->fd = -1; } + NNI_FREE_STRUCT(tsp); } void -nni_plat_tcp_shutdown(nni_plat_tcpsock *s) +nni_plat_tcp_shutdown(nni_plat_tcpsock *tsp) { - if (s->fd != -1) { - (void) shutdown(s->fd, SHUT_RDWR); + if (tsp->fd != -1) { + (void) shutdown(tsp->fd, SHUT_RDWR); // This causes the equivalent of a close. Hopefully waking // up anything that didn't get the hint with the shutdown. // (macOS does not see the shtudown). - (void) dup2(nni_plat_devnull, s->fd); + (void) dup2(nni_plat_devnull, tsp->fd); } } @@ -278,7 +308,7 @@ nni_plat_tcp_shutdown(nni_plat_tcpsock *s) // to keep up, and your clients are going to experience bad things. Normally // the actual backlog should hover near 0 anyway.) int -nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr) +nni_plat_tcp_listen(nni_plat_tcpsock *tsp, const nni_sockaddr *addr) { int fd; int len; @@ -310,7 +340,7 @@ nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr) return (rv); } - s->fd = fd; + tsp->fd = fd; return (0); } @@ -319,7 +349,7 @@ nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr) // bind address is not null, then it will attempt to bind to the local // address specified first. int -nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr, +nni_plat_tcp_connect(nni_plat_tcpsock *tsp, const nni_sockaddr *addr, const nni_sockaddr *bindaddr) { int fd; @@ -358,15 +388,20 @@ nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr, (void) close(fd); return (rv); } - s->fd = fd; + if ((rv = nni_posix_aio_pipe_init(&tsp->aiop, fd)) != 0) { + (void) close(fd); + return (rv); + } + tsp->fd = fd; return (0); } int -nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server) +nni_plat_tcp_accept(nni_plat_tcpsock *tsp, nni_plat_tcpsock *server) { int fd; + int rv; for (;;) { #ifdef NNG_USE_ACCEPT4 @@ -387,7 +422,12 @@ nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server) nni_plat_tcp_setopts(fd); - s->fd = fd; + if ((rv = nni_posix_aio_pipe_init(&tsp->aiop, fd)) != 0) { + close(fd); + return (rv); + } + + tsp->fd = fd; return (0); } |
