diff options
| author | Garrett D'Amore <garrett@damore.org> | 2019-01-21 22:40:10 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2019-02-16 19:22:27 -0800 |
| commit | 5cf750697624d4fd63cfe26921209d7c30e1a2d2 (patch) | |
| tree | bf11695e5f1ec5e400c87da0cc6ff23935a2eeff /src/platform/posix/posix_tcpconn.c | |
| parent | ca655b9db689ee0e655248b1a9f166b8db6cc984 (diff) | |
| download | nng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.tar.gz nng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.tar.bz2 nng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.zip | |
fixes #872 create unified nng_stream API
This is a major change, and includes changes to use a polymorphic
stream API for all transports. There have been related bugs fixed
along the way. Additionally the man pages have changed.
The old non-polymorphic APIs are removed now. This is a breaking
change, but the old APIs were never part of any released public API.
Diffstat (limited to 'src/platform/posix/posix_tcpconn.c')
| -rw-r--r-- | src/platform/posix/posix_tcpconn.c | 186 |
1 files changed, 77 insertions, 109 deletions
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c index ef6ee8e3..0d3c274d 100644 --- a/src/platform/posix/posix_tcpconn.c +++ b/src/platform/posix/posix_tcpconn.c @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Devolutions <info@devolutions.net> // @@ -36,7 +36,7 @@ #include "posix_tcp.h" static void -tcp_conn_dowrite(nni_tcp_conn *c) +tcp_dowrite(nni_tcp_conn *c) { nni_aio *aio; int fd; @@ -118,7 +118,7 @@ tcp_conn_dowrite(nni_tcp_conn *c) } static void -tcp_conn_doread(nni_tcp_conn *c) +tcp_doread(nni_tcp_conn *c) { nni_aio *aio; int fd; @@ -195,9 +195,10 @@ tcp_conn_doread(nni_tcp_conn *c) } } -void -nni_tcp_conn_close(nni_tcp_conn *c) +static void +tcp_close(void *arg) { + nni_tcp_conn *c = arg; nni_mtx_lock(&c->mtx); if (!c->closed) { nni_aio *aio; @@ -212,21 +213,44 @@ nni_tcp_conn_close(nni_tcp_conn *c) nni_mtx_unlock(&c->mtx); } +// tcp_fini may block briefly waiting for the pollq thread. +// To get that out of our context, we simply reap this. +static void +tcp_fini(void *arg) +{ + nni_tcp_conn *c = arg; + tcp_close(c); + nni_posix_pfd_fini(c->pfd); + nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN + c->pfd = NULL; + nni_mtx_unlock(&c->mtx); + nni_mtx_fini(&c->mtx); + + NNI_FREE_STRUCT(c); +} + static void -tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg) +tcp_free(void *arg) +{ + nni_tcp_conn *c = arg; + nni_reap(&c->reap, tcp_fini, arg); +} + +static void +tcp_cb(nni_posix_pfd *pfd, int events, void *arg) { nni_tcp_conn *c = arg; if (events & (POLLHUP | POLLERR | POLLNVAL)) { - nni_tcp_conn_close(c); + tcp_close(c); return; } nni_mtx_lock(&c->mtx); if (events & POLLIN) { - tcp_conn_doread(c); + tcp_doread(c); } if (events & POLLOUT) { - tcp_conn_dowrite(c); + tcp_dowrite(c); } events = 0; if (!nni_list_empty(&c->writeq)) { @@ -242,7 +266,7 @@ tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg) } static void -tcp_conn_cancel(nni_aio *aio, void *arg, int rv) +tcp_cancel(nni_aio *aio, void *arg, int rv) { nni_tcp_conn *c = arg; @@ -254,18 +278,18 @@ tcp_conn_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_unlock(&c->mtx); } -void -nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio) +static void +tcp_send(void *arg, nni_aio *aio) { - - int rv; + nni_tcp_conn *c = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&c->mtx); - if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) { + if ((rv = nni_aio_schedule(aio, tcp_cancel, c)) != 0) { nni_mtx_unlock(&c->mtx); nni_aio_finish_error(aio, rv); return; @@ -273,7 +297,7 @@ nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio) nni_aio_list_append(&c->writeq, aio); if (nni_list_first(&c->writeq) == aio) { - tcp_conn_dowrite(c); + tcp_dowrite(c); // If we are still the first thing on the list, that // means we didn't finish the job, so arm the poller to // complete us. @@ -284,17 +308,18 @@ nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio) nni_mtx_unlock(&c->mtx); } -void -nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio) +static void +tcp_recv(void *arg, nni_aio *aio) { - int rv; + nni_tcp_conn *c = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&c->mtx); - if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) { + if ((rv = nni_aio_schedule(aio, tcp_cancel, c)) != 0) { nni_mtx_unlock(&c->mtx); nni_aio_finish_error(aio, rv); return; @@ -306,7 +331,7 @@ nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio) // many cases. We also need not arm a list if it was already // armed. if (nni_list_first(&c->readq) == aio) { - tcp_conn_doread(c); + tcp_doread(c); // If we are still the first thing on the list, that // means we didn't finish the job, so arm the poller to // complete us. @@ -317,59 +342,8 @@ nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio) nni_mtx_unlock(&c->mtx); } -int -nni_tcp_conn_peername(nni_tcp_conn *c, nni_sockaddr *sa) -{ - struct sockaddr_storage ss; - socklen_t sslen = sizeof(ss); - int fd = nni_posix_pfd_fd(c->pfd); - - if (getpeername(fd, (void *) &ss, &sslen) != 0) { - return (nni_plat_errno(errno)); - } - return (nni_posix_sockaddr2nn(sa, &ss)); -} - -int -nni_tcp_conn_sockname(nni_tcp_conn *c, nni_sockaddr *sa) -{ - struct sockaddr_storage ss; - socklen_t sslen = sizeof(ss); - int fd = nni_posix_pfd_fd(c->pfd); - - if (getsockname(fd, (void *) &ss, &sslen) != 0) { - return (nni_plat_errno(errno)); - } - return (nni_posix_sockaddr2nn(sa, &ss)); -} - -int -nni_tcp_conn_set_keepalive(nni_tcp_conn *c, bool keep) -{ - int val = keep ? 1 : 0; - int fd = nni_posix_pfd_fd(c->pfd); - - if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) { - return (nni_plat_errno(errno)); - } - return (0); -} - -int -nni_tcp_conn_set_nodelay(nni_tcp_conn *c, bool nodelay) -{ - - int val = nodelay ? 1 : 0; - int fd = nni_posix_pfd_fd(c->pfd); - - if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) { - return (nni_plat_errno(errno)); - } - return (0); -} - static int -tcp_conn_get_peername(void *arg, void *buf, size_t *szp, nni_type t) +tcp_get_peername(void *arg, void *buf, size_t *szp, nni_type t) { nni_tcp_conn * c = arg; struct sockaddr_storage ss; @@ -388,7 +362,7 @@ tcp_conn_get_peername(void *arg, void *buf, size_t *szp, nni_type t) } static int -tcp_conn_get_sockname(void *arg, void *buf, size_t *szp, nni_type t) +tcp_get_sockname(void *arg, void *buf, size_t *szp, nni_type t) { nni_tcp_conn * c = arg; struct sockaddr_storage ss; @@ -407,7 +381,7 @@ tcp_conn_get_sockname(void *arg, void *buf, size_t *szp, nni_type t) } static int -tcp_conn_set_nodelay(void *arg, const void *buf, size_t sz, nni_type t) +tcp_set_nodelay(void *arg, const void *buf, size_t sz, nni_type t) { nni_tcp_conn *c = arg; int fd; @@ -427,7 +401,7 @@ tcp_conn_set_nodelay(void *arg, const void *buf, size_t sz, nni_type t) } static int -tcp_conn_set_keepalive(void *arg, const void *buf, size_t sz, nni_type t) +tcp_set_keepalive(void *arg, const void *buf, size_t sz, nni_type t) { nni_tcp_conn *c = arg; int fd; @@ -447,7 +421,7 @@ tcp_conn_set_keepalive(void *arg, const void *buf, size_t sz, nni_type t) } static int -tcp_conn_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t) +tcp_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t) { nni_tcp_conn *c = arg; int fd = nni_posix_pfd_fd(c->pfd); @@ -462,7 +436,7 @@ tcp_conn_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t) } static int -tcp_conn_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t) +tcp_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t) { nni_tcp_conn *c = arg; int fd = nni_posix_pfd_fd(c->pfd); @@ -476,46 +450,46 @@ tcp_conn_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t) return (nni_copyout_bool(val, buf, szp, t)); } -static const nni_option tcp_conn_options[] = { +static const nni_option tcp_options[] = { { .o_name = NNG_OPT_REMADDR, - .o_get = tcp_conn_get_peername, + .o_get = tcp_get_peername, }, { .o_name = NNG_OPT_LOCADDR, - .o_get = tcp_conn_get_sockname, + .o_get = tcp_get_sockname, }, { .o_name = NNG_OPT_TCP_NODELAY, - .o_get = tcp_conn_get_nodelay, - .o_set = tcp_conn_set_nodelay, + .o_get = tcp_get_nodelay, + .o_set = tcp_set_nodelay, }, { .o_name = NNG_OPT_TCP_KEEPALIVE, - .o_get = tcp_conn_get_keepalive, - .o_set = tcp_conn_set_keepalive, + .o_get = tcp_get_keepalive, + .o_set = tcp_set_keepalive, }, { .o_name = NULL, }, }; -int -nni_tcp_conn_getopt( - nni_tcp_conn *c, const char *name, void *buf, size_t *szp, nni_type t) +static int +tcp_getx(void *arg, const char *name, void *buf, size_t *szp, nni_type t) { - return (nni_getopt(tcp_conn_options, name, c, buf, szp, t)); + nni_tcp_conn *c = arg; + return (nni_getopt(tcp_options, name, c, buf, szp, t)); } -int -nni_tcp_conn_setopt( - nni_tcp_conn *c, const char *name, const void *buf, size_t sz, nni_type t) +static int +tcp_setx(void *arg, const char *name, const void *buf, size_t sz, nni_type t) { - return (nni_setopt(tcp_conn_options, name, c, buf, sz, t)); + nni_tcp_conn *c = arg; + return (nni_setopt(tcp_options, name, c, buf, sz, t)); } int -nni_posix_tcp_conn_init(nni_tcp_conn **cp, nni_posix_pfd *pfd) +nni_posix_tcp_init(nni_tcp_conn **cp, nni_posix_pfd *pfd) { nni_tcp_conn *c; @@ -530,12 +504,19 @@ nni_posix_tcp_conn_init(nni_tcp_conn **cp, nni_posix_pfd *pfd) nni_aio_list_init(&c->readq); nni_aio_list_init(&c->writeq); + c->stream.s_free = tcp_free; + c->stream.s_close = tcp_close; + c->stream.s_recv = tcp_recv; + c->stream.s_send = tcp_send; + c->stream.s_getx = tcp_getx; + c->stream.s_setx = tcp_setx; + *cp = c; return (0); } void -nni_posix_tcp_conn_start(nni_tcp_conn *c, int nodelay, int keepalive) +nni_posix_tcp_start(nni_tcp_conn *c, int nodelay, int keepalive) { // Configure the initial socket options. (void) setsockopt(nni_posix_pfd_fd(c->pfd), IPPROTO_TCP, TCP_NODELAY, @@ -543,18 +524,5 @@ nni_posix_tcp_conn_start(nni_tcp_conn *c, int nodelay, int keepalive) (void) setsockopt(nni_posix_pfd_fd(c->pfd), SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(int)); - nni_posix_pfd_set_cb(c->pfd, tcp_conn_cb, c); -} - -void -nni_tcp_conn_fini(nni_tcp_conn *c) -{ - nni_tcp_conn_close(c); - nni_posix_pfd_fini(c->pfd); - nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN - c->pfd = NULL; - nni_mtx_unlock(&c->mtx); - nni_mtx_fini(&c->mtx); - - NNI_FREE_STRUCT(c); + nni_posix_pfd_set_cb(c->pfd, tcp_cb, c); } |
