aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix/posix_tcpconn.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform/posix/posix_tcpconn.c')
-rw-r--r--src/platform/posix/posix_tcpconn.c64
1 files changed, 25 insertions, 39 deletions
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c
index d49fc838..b1a2233c 100644
--- a/src/platform/posix/posix_tcpconn.c
+++ b/src/platform/posix/posix_tcpconn.c
@@ -32,9 +32,9 @@ static void
tcp_dowrite(nni_tcp_conn *c)
{
nni_aio *aio;
- int fd;
+ int fd = nni_posix_pfd_fd(&c->pfd);
- if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ if (c->closed) {
return;
}
@@ -101,9 +101,9 @@ static void
tcp_doread(nni_tcp_conn *c)
{
nni_aio *aio;
- int fd;
+ int fd = nni_posix_pfd_fd(&c->pfd);
- if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) {
+ if (c->closed) {
return;
}
@@ -174,9 +174,7 @@ tcp_error(void *arg, int err)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, err);
}
- if (c->pfd != NULL) {
- nni_posix_pfd_close(c->pfd);
- }
+ nni_posix_pfd_close(&c->pfd);
nni_mtx_unlock(&c->mtx);
}
@@ -193,9 +191,7 @@ tcp_close(void *arg)
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
- if (c->pfd != NULL) {
- nni_posix_pfd_close(c->pfd);
- }
+ nni_posix_pfd_close(&c->pfd);
}
nni_mtx_unlock(&c->mtx);
}
@@ -203,18 +199,10 @@ tcp_close(void *arg)
static void
tcp_stop(void *arg)
{
- nni_tcp_conn *c = arg;
- nni_posix_pfd *pfd;
+ nni_tcp_conn *c = arg;
tcp_close(c);
- nni_mtx_lock(&c->mtx);
- pfd = c->pfd;
- c->pfd = NULL;
- nni_mtx_unlock(&c->mtx);
-
- if (pfd != NULL) {
- nni_posix_pfd_fini(pfd);
- }
+ nni_posix_pfd_stop(&c->pfd);
}
// tcp_fini may block briefly waiting for the pollq thread.
@@ -224,6 +212,7 @@ tcp_fini(void *arg)
{
nni_tcp_conn *c = arg;
tcp_stop(c);
+ nni_posix_pfd_fini(&c->pfd);
nni_mtx_fini(&c->mtx);
if (c->dialer != NULL) {
@@ -245,7 +234,7 @@ tcp_free(void *arg)
}
static void
-tcp_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
+tcp_cb(void *arg, unsigned events)
{
nni_tcp_conn *c = arg;
@@ -253,6 +242,10 @@ tcp_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
tcp_error(c, NNG_ECONNSHUT);
return;
}
+ if (c->dial_aio != NULL) {
+ nni_posix_tcp_dial_cb(c, events);
+ return;
+ }
nni_mtx_lock(&c->mtx);
if ((events & NNI_POLL_IN) != 0) {
tcp_doread(c);
@@ -268,7 +261,7 @@ tcp_cb(nni_posix_pfd *pfd, unsigned events, void *arg)
events |= NNI_POLL_IN;
}
if ((!c->closed) && (events != 0)) {
- nni_posix_pfd_arm(pfd, events);
+ nni_posix_pfd_arm(&c->pfd, events);
}
nni_mtx_unlock(&c->mtx);
}
@@ -310,7 +303,7 @@ tcp_send(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->writeq) == aio) {
- nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT);
+ nni_posix_pfd_arm(&c->pfd, NNI_POLL_OUT);
}
}
nni_mtx_unlock(&c->mtx);
@@ -344,7 +337,7 @@ tcp_recv(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->readq) == aio) {
- nni_posix_pfd_arm(c->pfd, NNI_POLL_IN);
+ nni_posix_pfd_arm(&c->pfd, NNI_POLL_IN);
}
}
nni_mtx_unlock(&c->mtx);
@@ -356,7 +349,7 @@ tcp_get_peername(void *arg, void *buf, size_t *szp, nni_type t)
nni_tcp_conn *c = arg;
struct sockaddr_storage ss;
socklen_t len = sizeof(ss);
- int fd = nni_posix_pfd_fd(c->pfd);
+ int fd = nni_posix_pfd_fd(&c->pfd);
int rv;
nng_sockaddr sa;
@@ -375,7 +368,7 @@ tcp_get_sockname(void *arg, void *buf, size_t *szp, nni_type t)
nni_tcp_conn *c = arg;
struct sockaddr_storage ss;
socklen_t len = sizeof(ss);
- int fd = nni_posix_pfd_fd(c->pfd);
+ int fd = nni_posix_pfd_fd(&c->pfd);
int rv;
nng_sockaddr sa;
@@ -392,7 +385,7 @@ static int
tcp_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn *c = arg;
- int fd = nni_posix_pfd_fd(c->pfd);
+ int fd = nni_posix_pfd_fd(&c->pfd);
int val = 0;
socklen_t valsz = sizeof(val);
@@ -407,7 +400,7 @@ static int
tcp_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t)
{
nni_tcp_conn *c = arg;
- int fd = nni_posix_pfd_fd(c->pfd);
+ int fd = nni_posix_pfd_fd(&c->pfd);
int val = 0;
socklen_t valsz = sizeof(val);
@@ -455,7 +448,7 @@ tcp_set(void *arg, const char *name, const void *buf, size_t sz, nni_type t)
}
int
-nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d)
+nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d, int fd)
{
nni_tcp_conn *c;
if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
@@ -468,6 +461,7 @@ nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d)
nni_mtx_init(&c->mtx);
nni_aio_list_init(&c->readq);
nni_aio_list_init(&c->writeq);
+ nni_posix_pfd_init(&c->pfd, fd, tcp_cb, c);
c->stream.s_free = tcp_free;
c->stream.s_stop = tcp_stop;
@@ -482,19 +476,11 @@ nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d)
}
void
-nni_posix_tcp_init(nni_tcp_conn *c, nni_posix_pfd *pfd)
-{
- c->pfd = pfd;
-}
-
-void
nni_posix_tcp_start(nni_tcp_conn *c, int nodelay, int keepalive)
{
// Configure the initial socket options.
- (void) setsockopt(nni_posix_pfd_fd(c->pfd), IPPROTO_TCP, TCP_NODELAY,
+ (void) setsockopt(nni_posix_pfd_fd(&c->pfd), IPPROTO_TCP, TCP_NODELAY,
&nodelay, sizeof(int));
- (void) setsockopt(nni_posix_pfd_fd(c->pfd), SOL_SOCKET, SO_KEEPALIVE,
+ (void) setsockopt(nni_posix_pfd_fd(&c->pfd), SOL_SOCKET, SO_KEEPALIVE,
&keepalive, sizeof(int));
-
- nni_posix_pfd_set_cb(c->pfd, tcp_cb, c);
}