diff options
Diffstat (limited to 'src/platform/posix/posix_tcpconn.c')
| -rw-r--r-- | src/platform/posix/posix_tcpconn.c | 64 |
1 files changed, 25 insertions, 39 deletions
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c index d49fc838..b1a2233c 100644 --- a/src/platform/posix/posix_tcpconn.c +++ b/src/platform/posix/posix_tcpconn.c @@ -32,9 +32,9 @@ static void tcp_dowrite(nni_tcp_conn *c) { nni_aio *aio; - int fd; + int fd = nni_posix_pfd_fd(&c->pfd); - if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + if (c->closed) { return; } @@ -101,9 +101,9 @@ static void tcp_doread(nni_tcp_conn *c) { nni_aio *aio; - int fd; + int fd = nni_posix_pfd_fd(&c->pfd); - if (c->closed || ((fd = nni_posix_pfd_fd(c->pfd)) < 0)) { + if (c->closed) { return; } @@ -174,9 +174,7 @@ tcp_error(void *arg, int err) nni_aio_list_remove(aio); nni_aio_finish_error(aio, err); } - if (c->pfd != NULL) { - nni_posix_pfd_close(c->pfd); - } + nni_posix_pfd_close(&c->pfd); nni_mtx_unlock(&c->mtx); } @@ -193,9 +191,7 @@ tcp_close(void *arg) nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECLOSED); } - if (c->pfd != NULL) { - nni_posix_pfd_close(c->pfd); - } + nni_posix_pfd_close(&c->pfd); } nni_mtx_unlock(&c->mtx); } @@ -203,18 +199,10 @@ tcp_close(void *arg) static void tcp_stop(void *arg) { - nni_tcp_conn *c = arg; - nni_posix_pfd *pfd; + nni_tcp_conn *c = arg; tcp_close(c); - nni_mtx_lock(&c->mtx); - pfd = c->pfd; - c->pfd = NULL; - nni_mtx_unlock(&c->mtx); - - if (pfd != NULL) { - nni_posix_pfd_fini(pfd); - } + nni_posix_pfd_stop(&c->pfd); } // tcp_fini may block briefly waiting for the pollq thread. @@ -224,6 +212,7 @@ tcp_fini(void *arg) { nni_tcp_conn *c = arg; tcp_stop(c); + nni_posix_pfd_fini(&c->pfd); nni_mtx_fini(&c->mtx); if (c->dialer != NULL) { @@ -245,7 +234,7 @@ tcp_free(void *arg) } static void -tcp_cb(nni_posix_pfd *pfd, unsigned events, void *arg) +tcp_cb(void *arg, unsigned events) { nni_tcp_conn *c = arg; @@ -253,6 +242,10 @@ tcp_cb(nni_posix_pfd *pfd, unsigned events, void *arg) tcp_error(c, NNG_ECONNSHUT); return; } + if (c->dial_aio != NULL) { + nni_posix_tcp_dial_cb(c, events); + return; + } nni_mtx_lock(&c->mtx); if ((events & NNI_POLL_IN) != 0) { tcp_doread(c); @@ -268,7 +261,7 @@ tcp_cb(nni_posix_pfd *pfd, unsigned events, void *arg) events |= NNI_POLL_IN; } if ((!c->closed) && (events != 0)) { - nni_posix_pfd_arm(pfd, events); + nni_posix_pfd_arm(&c->pfd, events); } nni_mtx_unlock(&c->mtx); } @@ -310,7 +303,7 @@ tcp_send(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->writeq) == aio) { - nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT); + nni_posix_pfd_arm(&c->pfd, NNI_POLL_OUT); } } nni_mtx_unlock(&c->mtx); @@ -344,7 +337,7 @@ tcp_recv(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->readq) == aio) { - nni_posix_pfd_arm(c->pfd, NNI_POLL_IN); + nni_posix_pfd_arm(&c->pfd, NNI_POLL_IN); } } nni_mtx_unlock(&c->mtx); @@ -356,7 +349,7 @@ tcp_get_peername(void *arg, void *buf, size_t *szp, nni_type t) nni_tcp_conn *c = arg; struct sockaddr_storage ss; socklen_t len = sizeof(ss); - int fd = nni_posix_pfd_fd(c->pfd); + int fd = nni_posix_pfd_fd(&c->pfd); int rv; nng_sockaddr sa; @@ -375,7 +368,7 @@ tcp_get_sockname(void *arg, void *buf, size_t *szp, nni_type t) nni_tcp_conn *c = arg; struct sockaddr_storage ss; socklen_t len = sizeof(ss); - int fd = nni_posix_pfd_fd(c->pfd); + int fd = nni_posix_pfd_fd(&c->pfd); int rv; nng_sockaddr sa; @@ -392,7 +385,7 @@ static int tcp_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t) { nni_tcp_conn *c = arg; - int fd = nni_posix_pfd_fd(c->pfd); + int fd = nni_posix_pfd_fd(&c->pfd); int val = 0; socklen_t valsz = sizeof(val); @@ -407,7 +400,7 @@ static int tcp_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t) { nni_tcp_conn *c = arg; - int fd = nni_posix_pfd_fd(c->pfd); + int fd = nni_posix_pfd_fd(&c->pfd); int val = 0; socklen_t valsz = sizeof(val); @@ -455,7 +448,7 @@ tcp_set(void *arg, const char *name, const void *buf, size_t sz, nni_type t) } int -nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d) +nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d, int fd) { nni_tcp_conn *c; if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { @@ -468,6 +461,7 @@ nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d) nni_mtx_init(&c->mtx); nni_aio_list_init(&c->readq); nni_aio_list_init(&c->writeq); + nni_posix_pfd_init(&c->pfd, fd, tcp_cb, c); c->stream.s_free = tcp_free; c->stream.s_stop = tcp_stop; @@ -482,19 +476,11 @@ nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d) } void -nni_posix_tcp_init(nni_tcp_conn *c, nni_posix_pfd *pfd) -{ - c->pfd = pfd; -} - -void nni_posix_tcp_start(nni_tcp_conn *c, int nodelay, int keepalive) { // Configure the initial socket options. - (void) setsockopt(nni_posix_pfd_fd(c->pfd), IPPROTO_TCP, TCP_NODELAY, + (void) setsockopt(nni_posix_pfd_fd(&c->pfd), IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(int)); - (void) setsockopt(nni_posix_pfd_fd(c->pfd), SOL_SOCKET, SO_KEEPALIVE, + (void) setsockopt(nni_posix_pfd_fd(&c->pfd), SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(int)); - - nni_posix_pfd_set_cb(c->pfd, tcp_cb, c); } |
