aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-03-29 13:07:35 -0700
committerGarrett D'Amore <garrett@damore.org>2017-03-29 13:07:35 -0700
commit374f93a18edca2e0656c337a5b54927169ec31fa (patch)
treecbaef995db10cfafd795953be203de744dc688c9 /src/platform/posix
parent6091cf7e1c030417e1fd29c66160e71bcbe4f984 (diff)
downloadnng-374f93a18edca2e0656c337a5b54927169ec31fa.tar.gz
nng-374f93a18edca2e0656c337a5b54927169ec31fa.tar.bz2
nng-374f93a18edca2e0656c337a5b54927169ec31fa.zip
TCP (POSIX) async send/recv working. Other changes.
Transport-level pipe initialization is now sepearate and explicit. The POSIX send/recv logic still uses threads under the hood, but makes use of the AIO framework for send/recv. This is a key stepping stone towards enabling poll() or similar async I/O approaches.
Diffstat (limited to 'src/platform/posix')
-rw-r--r--src/platform/posix/posix_aio.h4
-rw-r--r--src/platform/posix/posix_aiothr.c15
-rw-r--r--src/platform/posix/posix_impl.h6
-rw-r--r--src/platform/posix/posix_net.c72
4 files changed, 66 insertions, 31 deletions
diff --git a/src/platform/posix/posix_aio.h b/src/platform/posix/posix_aio.h
index 08c731bc..797f9e43 100644
--- a/src/platform/posix/posix_aio.h
+++ b/src/platform/posix/posix_aio.h
@@ -51,8 +51,8 @@ extern void nni_posix_aio_pipe_fini(nni_posix_aio_pipe *);
// extern int nni_posix_aio_ep_init(nni_posix_aio_ep *, int);
// extern void nni_posix_aio_ep_fini(nni_posix_aio_ep *);
-extern int nni_posix_aio_read(nni_posix_aioq *, nni_aio *);
-extern int nni_posix_aio_write(nni_posix_aioq *, nni_aio *);
+extern int nni_posix_aio_read(nni_posix_aio_pipe *, nni_aio *);
+extern int nni_posix_aio_write(nni_posix_aio_pipe *, nni_aio *);
// extern int nni_posix_aio_connect();
// extern int nni_posix_aio_accept();
diff --git a/src/platform/posix/posix_aiothr.c b/src/platform/posix/posix_aiothr.c
index 013e4599..2d31aede 100644
--- a/src/platform/posix/posix_aiothr.c
+++ b/src/platform/posix/posix_aiothr.c
@@ -178,16 +178,17 @@ nni_plat_aiothr_dothr(nni_posix_aioq *q, int (*fn)(int, nni_aio *))
}
nni_list_remove(&q->aq_aios, aio);
- nni_mtx_unlock(&q->aq_lk);
+ //nni_mtx_unlock(&q->aq_lk);
// Call the callback.
nni_aio_finish(aio, rv, aio->a_count);
}
while ((aio = nni_list_first(&q->aq_aios)) != NULL) {
- nni_mtx_unlock(&q->aq_lk);
+ nni_list_remove(&q->aq_aios, aio);
+ //nni_mtx_unlock(&q->aq_lk);
nni_aio_finish(aio, NNG_ECLOSED, aio->a_count);
- nni_mtx_lock(&q->aq_lk);
+ //nni_mtx_lock(&q->aq_lk);
}
nni_mtx_unlock(&q->aq_lk);
@@ -299,16 +300,16 @@ nni_posix_aio_submit(nni_posix_aioq *q, nni_aio *aio)
int
-nni_posix_aio_read(nni_posix_aioq *q, nni_aio *aio)
+nni_posix_aio_read(nni_posix_aio_pipe *p, nni_aio *aio)
{
- return (nni_posix_aio_submit(q, aio));
+ return (nni_posix_aio_submit(&p->ap_readq, aio));
}
int
-nni_posix_aio_write(nni_posix_aioq *q, nni_aio *aio)
+nni_posix_aio_write(nni_posix_aio_pipe *p, nni_aio *aio)
{
- return (nni_posix_aio_submit(q, aio));
+ return (nni_posix_aio_submit(&p->ap_writeq, aio));
}
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 56cc5937..5da18323 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -35,12 +35,6 @@ extern int nni_plat_errno(int);
#endif
-#ifdef PLATFORM_POSIX_NET
-struct nni_plat_tcpsock {
- int fd;
- int devnull; // used for shutting down blocking accept()
-};
-#endif
#ifdef PLATFORM_POSIX_IPC
struct nni_plat_ipcsock {
diff --git a/src/platform/posix/posix_net.c b/src/platform/posix/posix_net.c
index 24b45819..c7651655 100644
--- a/src/platform/posix/posix_net.c
+++ b/src/platform/posix/posix_net.c
@@ -8,6 +8,7 @@
//
#include "core/nng_impl.h"
+#include "platform/posix/posix_aio.h"
#ifdef PLATFORM_POSIX_NET
@@ -30,6 +31,14 @@
#define NNI_TCP_SOCKTYPE SOCK_STREAM
#endif
+#ifdef PLATFORM_POSIX_NET
+struct nni_plat_tcpsock {
+ int fd;
+ int devnull; // used for shutting down blocking accept()
+ nni_posix_aio_pipe aiop;
+};
+#endif
+
static int
nni_plat_to_sockaddr(struct sockaddr_storage *ss, const nni_sockaddr *sa)
{
@@ -164,6 +173,20 @@ nni_plat_tcp_send(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
int
+nni_plat_tcp_aio_send(nni_plat_tcpsock *s, nni_aio *aio)
+{
+ return (nni_posix_aio_write(&s->aiop, aio));
+}
+
+
+int
+nni_plat_tcp_aio_recv(nni_plat_tcpsock *s, nni_aio *aio)
+{
+ return (nni_posix_aio_read(&s->aiop, aio));
+}
+
+
+int
nni_plat_tcp_recv(nni_plat_tcpsock *s, nni_iov *iovs, int cnt)
{
struct iovec iov[4]; // We never have more than 3 at present
@@ -241,32 +264,39 @@ nni_plat_tcp_setopts(int fd)
int
-nni_plat_tcp_init(nni_plat_tcpsock *s)
+nni_plat_tcp_init(nni_plat_tcpsock **tspp)
{
- s->fd = -1;
+ nni_plat_tcpsock *tsp;
+
+ if ((tsp = NNI_ALLOC_STRUCT(tsp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ tsp->fd = -1;
+ *tspp = tsp;
return (0);
}
void
-nni_plat_tcp_fini(nni_plat_tcpsock *s)
+nni_plat_tcp_fini(nni_plat_tcpsock *tsp)
{
- if (s->fd != -1) {
- (void) close(s->fd);
- s->fd = -1;
+ if (tsp->fd != -1) {
+ (void) close(tsp->fd);
+ tsp->fd = -1;
}
+ NNI_FREE_STRUCT(tsp);
}
void
-nni_plat_tcp_shutdown(nni_plat_tcpsock *s)
+nni_plat_tcp_shutdown(nni_plat_tcpsock *tsp)
{
- if (s->fd != -1) {
- (void) shutdown(s->fd, SHUT_RDWR);
+ if (tsp->fd != -1) {
+ (void) shutdown(tsp->fd, SHUT_RDWR);
// This causes the equivalent of a close. Hopefully waking
// up anything that didn't get the hint with the shutdown.
// (macOS does not see the shtudown).
- (void) dup2(nni_plat_devnull, s->fd);
+ (void) dup2(nni_plat_devnull, tsp->fd);
}
}
@@ -278,7 +308,7 @@ nni_plat_tcp_shutdown(nni_plat_tcpsock *s)
// to keep up, and your clients are going to experience bad things. Normally
// the actual backlog should hover near 0 anyway.)
int
-nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr)
+nni_plat_tcp_listen(nni_plat_tcpsock *tsp, const nni_sockaddr *addr)
{
int fd;
int len;
@@ -310,7 +340,7 @@ nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr)
return (rv);
}
- s->fd = fd;
+ tsp->fd = fd;
return (0);
}
@@ -319,7 +349,7 @@ nni_plat_tcp_listen(nni_plat_tcpsock *s, const nni_sockaddr *addr)
// bind address is not null, then it will attempt to bind to the local
// address specified first.
int
-nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr,
+nni_plat_tcp_connect(nni_plat_tcpsock *tsp, const nni_sockaddr *addr,
const nni_sockaddr *bindaddr)
{
int fd;
@@ -358,15 +388,20 @@ nni_plat_tcp_connect(nni_plat_tcpsock *s, const nni_sockaddr *addr,
(void) close(fd);
return (rv);
}
- s->fd = fd;
+ if ((rv = nni_posix_aio_pipe_init(&tsp->aiop, fd)) != 0) {
+ (void) close(fd);
+ return (rv);
+ }
+ tsp->fd = fd;
return (0);
}
int
-nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server)
+nni_plat_tcp_accept(nni_plat_tcpsock *tsp, nni_plat_tcpsock *server)
{
int fd;
+ int rv;
for (;;) {
#ifdef NNG_USE_ACCEPT4
@@ -387,7 +422,12 @@ nni_plat_tcp_accept(nni_plat_tcpsock *s, nni_plat_tcpsock *server)
nni_plat_tcp_setopts(fd);
- s->fd = fd;
+ if ((rv = nni_posix_aio_pipe_init(&tsp->aiop, fd)) != 0) {
+ close(fd);
+ return (rv);
+ }
+
+ tsp->fd = fd;
return (0);
}