aboutsummaryrefslogtreecommitdiff
path: root/src/platform/posix
diff options
context:
space:
mode:
Diffstat (limited to 'src/platform/posix')
-rw-r--r--src/platform/posix/posix_ipcconn.c25
-rw-r--r--src/platform/posix/posix_ipcdial.c14
-rw-r--r--src/platform/posix/posix_ipclisten.c30
-rw-r--r--src/platform/posix/posix_sockfd.c19
-rw-r--r--src/platform/posix/posix_tcpconn.c25
-rw-r--r--src/platform/posix/posix_tcpdial.c12
-rw-r--r--src/platform/posix/posix_tcplisten.c12
7 files changed, 111 insertions, 26 deletions
diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c
index 7f979a0a..8b11b64f 100644
--- a/src/platform/posix/posix_ipcconn.c
+++ b/src/platform/posix/posix_ipcconn.c
@@ -386,15 +386,29 @@ nni_posix_ipc_start(nni_ipc_conn *c)
}
static void
-ipc_reap(void *arg)
+ipc_stop(void *arg)
{
- ipc_conn *c = arg;
+ ipc_conn *c = arg;
+ nni_posix_pfd *pfd;
+
ipc_close(c);
- if (c->pfd != NULL) {
- nni_posix_pfd_fini(c->pfd);
+ nni_mtx_lock(&c->mtx);
+ pfd = c->pfd;
+ c->pfd = NULL;
+ nni_mtx_unlock(&c->mtx);
+
+ if (pfd != NULL) {
+ nni_posix_pfd_fini(pfd);
}
- nni_mtx_fini(&c->mtx);
+}
+static void
+ipc_reap(void *arg)
+{
+ ipc_conn *c = arg;
+ ipc_stop(c);
+
+ nni_mtx_fini(&c->mtx);
if (c->dialer != NULL) {
nni_posix_ipc_dialer_rele(c->dialer);
}
@@ -470,6 +484,7 @@ nni_posix_ipc_alloc(nni_ipc_conn **cp, nni_sockaddr *sa, nni_ipc_dialer *d)
c->dialer = d;
c->stream.s_free = ipc_free;
c->stream.s_close = ipc_close;
+ c->stream.s_stop = ipc_stop;
c->stream.s_send = ipc_send;
c->stream.s_recv = ipc_recv;
c->stream.s_get = ipc_get;
diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c
index 7799aca2..667d8262 100644
--- a/src/platform/posix/posix_ipcdial.c
+++ b/src/platform/posix/posix_ipcdial.c
@@ -41,6 +41,7 @@ ipc_dialer_close(void *arg)
c->dial_aio = NULL;
nni_aio_set_prov_data(aio, NULL);
nng_stream_close(&c->stream);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
}
nni_aio_finish_error(aio, NNG_ECLOSED);
@@ -50,6 +51,13 @@ ipc_dialer_close(void *arg)
}
static void
+ipc_dialer_stop(void *arg)
+{
+ nni_ipc_dialer *d = arg;
+ ipc_dialer_close(d);
+}
+
+static void
ipc_dialer_fini(ipc_dialer *d)
{
nni_mtx_fini(&d->mtx);
@@ -61,7 +69,7 @@ ipc_dialer_free(void *arg)
{
ipc_dialer *d = arg;
- ipc_dialer_close(d);
+ ipc_dialer_stop(d);
nni_atomic_set_bool(&d->fini, true);
nni_posix_ipc_dialer_rele(d);
}
@@ -94,6 +102,7 @@ ipc_dialer_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&d->mtx);
nni_aio_finish_error(aio, rv);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
}
@@ -138,6 +147,7 @@ ipc_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg)
if (rv != 0) {
nng_stream_close(&c->stream);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
return;
@@ -236,6 +246,7 @@ ipc_dialer_dial(void *arg, nni_aio *aio)
error:
nni_aio_set_prov_data(aio, NULL);
nni_mtx_unlock(&d->mtx);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
}
@@ -319,6 +330,7 @@ nni_ipc_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
d->closed = false;
d->sd.sd_free = ipc_dialer_free;
d->sd.sd_close = ipc_dialer_close;
+ d->sd.sd_stop = ipc_dialer_stop;
d->sd.sd_dial = ipc_dialer_dial;
d->sd.sd_get = ipc_dialer_get;
d->sd.sd_set = ipc_dialer_set;
diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c
index bd938841..a122d7bb 100644
--- a/src/platform/posix/posix_ipclisten.c
+++ b/src/platform/posix/posix_ipclisten.c
@@ -73,6 +73,23 @@ ipc_listener_close(void *arg)
}
static void
+ipc_listener_stop(void *arg)
+{
+ ipc_listener *l = arg;
+ nni_posix_pfd *pfd;
+
+ nni_mtx_lock(&l->mtx);
+ ipc_listener_doclose(l);
+ pfd = l->pfd;
+ l->pfd = NULL;
+ nni_mtx_unlock(&l->mtx);
+
+ if (pfd != NULL) {
+ nni_posix_pfd_fini(pfd);
+ }
+}
+
+static void
ipc_listener_doaccept(ipc_listener *l)
{
nni_aio *aio;
@@ -132,6 +149,7 @@ ipc_listener_doaccept(ipc_listener *l)
}
if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -406,17 +424,10 @@ ipc_listener_listen(void *arg)
static void
ipc_listener_free(void *arg)
{
- ipc_listener *l = arg;
- nni_posix_pfd *pfd;
+ ipc_listener *l = arg;
- nni_mtx_lock(&l->mtx);
- ipc_listener_doclose(l);
- pfd = l->pfd;
- nni_mtx_unlock(&l->mtx);
+ ipc_listener_stop(l);
- if (pfd != NULL) {
- nni_posix_pfd_fini(pfd);
- }
nni_mtx_fini(&l->mtx);
NNI_FREE_STRUCT(l);
}
@@ -507,6 +518,7 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url)
l->perms = 0;
l->sl.sl_free = ipc_listener_free;
l->sl.sl_close = ipc_listener_close;
+ l->sl.sl_stop = ipc_listener_stop;
l->sl.sl_listen = ipc_listener_listen;
l->sl.sl_accept = ipc_listener_accept;
l->sl.sl_get = ipc_listener_get;
diff --git a/src/platform/posix/posix_sockfd.c b/src/platform/posix/posix_sockfd.c
index b0d88a31..997feae6 100644
--- a/src/platform/posix/posix_sockfd.c
+++ b/src/platform/posix/posix_sockfd.c
@@ -200,16 +200,26 @@ sfd_close(void *arg)
nni_mtx_unlock(&c->mtx);
}
-// sfd_fini may block briefly waiting for the pollq thread.
-// To get that out of our context, we simply reap this.
static void
-sfd_fini(void *arg)
+sfd_stop(void *arg)
{
nni_sfd_conn *c = arg;
sfd_close(c);
+
+ // ideally this would *stop* without freeing the pfd
if (c->pfd != NULL) {
nni_posix_pfd_fini(c->pfd);
+ c->pfd = NULL;
}
+}
+
+// sfd_fini may block briefly waiting for the pollq thread.
+// To get that out of our context, we simply reap this.
+static void
+sfd_fini(void *arg)
+{
+ nni_sfd_conn *c = arg;
+ sfd_stop(c);
nni_mtx_fini(&c->mtx);
NNI_FREE_STRUCT(c);
@@ -475,6 +485,7 @@ nni_sfd_conn_alloc(nni_sfd_conn **cp, int fd)
c->stream.s_free = sfd_free;
c->stream.s_close = sfd_close;
+ c->stream.s_stop = sfd_stop;
c->stream.s_recv = sfd_recv;
c->stream.s_send = sfd_send;
c->stream.s_get = sfd_get;
@@ -490,4 +501,4 @@ void
nni_sfd_close_fd(int fd)
{
close(fd);
-} \ No newline at end of file
+}
diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c
index 74b3371b..ce5243b0 100644
--- a/src/platform/posix/posix_tcpconn.c
+++ b/src/platform/posix/posix_tcpconn.c
@@ -10,6 +10,7 @@
//
#include "core/nng_impl.h"
+#include "platform/posix/posix_pollq.h"
#include <errno.h>
#include <fcntl.h>
@@ -199,16 +200,30 @@ tcp_close(void *arg)
nni_mtx_unlock(&c->mtx);
}
+static void
+tcp_stop(void *arg)
+{
+ nni_tcp_conn *c = arg;
+ nni_posix_pfd *pfd;
+ tcp_close(c);
+
+ nni_mtx_lock(&c->mtx);
+ pfd = c->pfd;
+ c->pfd = NULL;
+ nni_mtx_unlock(&c->mtx);
+
+ if (pfd != NULL) {
+ nni_posix_pfd_fini(pfd);
+ }
+}
+
// tcp_fini may block briefly waiting for the pollq thread.
// To get that out of our context, we simply reap this.
static void
tcp_fini(void *arg)
{
nni_tcp_conn *c = arg;
- tcp_close(c);
- if (c->pfd != NULL) {
- nni_posix_pfd_fini(c->pfd);
- }
+ tcp_stop(c);
nni_mtx_fini(&c->mtx);
if (c->dialer != NULL) {
@@ -221,6 +236,7 @@ static nni_reap_list tcp_reap_list = {
.rl_offset = offsetof(nni_tcp_conn, reap),
.rl_func = tcp_fini,
};
+
static void
tcp_free(void *arg)
{
@@ -454,6 +470,7 @@ nni_posix_tcp_alloc(nni_tcp_conn **cp, nni_tcp_dialer *d)
nni_aio_list_init(&c->writeq);
c->stream.s_free = tcp_free;
+ c->stream.s_stop = tcp_stop;
c->stream.s_close = tcp_close;
c->stream.s_recv = tcp_recv;
c->stream.s_send = tcp_send;
diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c
index 73ba4394..52ea6cff 100644
--- a/src/platform/posix/posix_tcpdial.c
+++ b/src/platform/posix/posix_tcpdial.c
@@ -77,9 +77,15 @@ tcp_dialer_fini(nni_tcp_dialer *d)
}
void
-nni_tcp_dialer_fini(nni_tcp_dialer *d)
+nni_tcp_dialer_stop(nni_tcp_dialer *d)
{
nni_tcp_dialer_close(d);
+}
+
+void
+nni_tcp_dialer_fini(nni_tcp_dialer *d)
+{
+ nni_tcp_dialer_stop(d);
nni_atomic_set_bool(&d->fini, true);
nni_posix_tcp_dialer_rele(d);
}
@@ -112,6 +118,8 @@ tcp_dialer_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&d->mtx);
nni_aio_finish_error(aio, rv);
+ nng_stream_close(&c->stream);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
}
@@ -263,6 +271,8 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)
error:
nni_aio_set_prov_data(aio, NULL);
nni_mtx_unlock(&d->mtx);
+ nng_stream_close(&c->stream);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
}
diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c
index a38411c5..32f0fd60 100644
--- a/src/platform/posix/posix_tcplisten.c
+++ b/src/platform/posix/posix_tcplisten.c
@@ -150,6 +150,7 @@ tcp_listener_doaccept(nni_tcp_listener *l)
if ((rv = nni_posix_pfd_init(&pfd, newfd)) != 0) {
close(newfd);
+ nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
nni_aio_list_remove(aio);
nni_aio_finish_error(aio, rv);
@@ -280,18 +281,25 @@ nni_tcp_listener_listen(nni_tcp_listener *l, const nni_sockaddr *sa)
}
void
-nni_tcp_listener_fini(nni_tcp_listener *l)
+nni_tcp_listener_stop(nni_tcp_listener *l)
{
nni_posix_pfd *pfd;
nni_mtx_lock(&l->mtx);
tcp_listener_doclose(l);
- pfd = l->pfd;
+ pfd = l->pfd;
+ l->pfd = NULL;
nni_mtx_unlock(&l->mtx);
if (pfd != NULL) {
nni_posix_pfd_fini(pfd);
}
+}
+
+void
+nni_tcp_listener_fini(nni_tcp_listener *l)
+{
+ nni_tcp_listener_stop(l);
nni_mtx_fini(&l->mtx);
NNI_FREE_STRUCT(l);
}