aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/platform/posix/posix_epdesc.c6
-rw-r--r--src/platform/posix/posix_pollq_epoll.c7
-rw-r--r--src/platform/posix/posix_pollq_kqueue.c2
-rw-r--r--src/protocol/reqrep0/rep.c8
-rw-r--r--src/protocol/survey0/respond.c7
-rw-r--r--src/supplemental/websocket/websocket.c4
-rw-r--r--src/transport/ipc/ipc.c4
-rw-r--r--src/transport/tcp/tcp.c6
-rw-r--r--src/transport/tls/tls.c6
-rw-r--r--src/transport/ws/websocket.c6
10 files changed, 45 insertions, 11 deletions
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c
index dfb750d4..69026051 100644
--- a/src/platform/posix/posix_epdesc.c
+++ b/src/platform/posix/posix_epdesc.c
@@ -469,11 +469,7 @@ nni_epdesc_connect_cb(nni_posix_pfd *pfd, int events, void *arg)
nni_mtx_lock(&ed->mtx);
if ((ed->closed) || ((aio = nni_list_first(&ed->connectq)) == NULL) ||
(pfd != ed->pfd)) {
- // Spurious completion. Ignore it, but discard the PFD.
- if (ed->pfd == pfd) {
- ed->pfd = NULL;
- }
- nni_posix_pfd_fini(pfd);
+ // Spurious completion. Just ignore it.
nni_mtx_unlock(&ed->mtx);
return;
}
diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c
index a8d8693a..9c1ae682 100644
--- a/src/platform/posix/posix_pollq_epoll.c
+++ b/src/platform/posix/posix_pollq_epoll.c
@@ -93,6 +93,10 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) {
return (NNG_ENOMEM);
}
+ nni_mtx_init(&pfd->mtx);
+ nni_cv_init(&pfd->cv, &pq->mtx);
+
+ nni_mtx_lock(&pfd->mtx);
pfd->pq = pq;
pfd->fd = fd;
pfd->cb = NULL;
@@ -101,9 +105,8 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
pfd->closing = false;
pfd->closed = false;
- nni_mtx_init(&pfd->mtx);
- nni_cv_init(&pfd->cv, &pq->mtx);
NNI_LIST_NODE_INIT(&pfd->node);
+ nni_mtx_unlock(&pfd->mtx);
// notifications disabled to begin with
ev.events = 0;
diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c
index 36ced3ff..c118d528 100644
--- a/src/platform/posix/posix_pollq_kqueue.c
+++ b/src/platform/posix/posix_pollq_kqueue.c
@@ -276,9 +276,9 @@ nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
if (pq->kq >= 0) {
close(pq->kq);
- pq->kq = -1;
}
nni_thr_fini(&pq->thr);
+ pq->kq = -1;
nni_posix_pollq_reap(pq);
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index 24fc7335..33c9efcd 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -309,6 +309,12 @@ static void
rep0_pipe_fini(void *arg)
{
rep0_pipe *p = arg;
+ nng_msg * msg;
+
+ if ((msg = nni_aio_get_msg(p->aio_recv)) != NULL) {
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_msg_free(msg);
+ }
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
@@ -538,6 +544,7 @@ rep0_pipe_recv_cb(void *arg)
if (nni_msg_len(msg) < 4) {
// Peer is speaking garbage. Kick it.
nni_msg_free(msg);
+ nni_aio_set_msg(p->aio_recv, NULL);
nni_pipe_stop(p->pipe);
return;
}
@@ -593,6 +600,7 @@ rep0_pipe_recv_cb(void *arg)
drop:
nni_msg_free(msg);
+ nni_aio_set_msg(p->aio_recv, NULL);
nni_pipe_recv(p->pipe, p->aio_recv);
}
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c
index 5f833ca6..569fd4ea 100644
--- a/src/protocol/survey0/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -302,7 +302,12 @@ static void
resp0_pipe_fini(void *arg)
{
resp0_pipe *p = arg;
+ nng_msg * msg;
+ if ((msg = nni_aio_get_msg(p->aio_recv)) != NULL) {
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_msg_free(msg);
+ }
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
NNI_FREE_STRUCT(p);
@@ -527,6 +532,7 @@ resp0_pipe_recv_cb(void *arg)
if (nni_msg_len(msg) < 4) {
// Peer is speaking garbage, kick it.
nni_msg_free(msg);
+ nni_aio_set_msg(p->aio_recv, NULL);
nni_pipe_stop(p->npipe);
return;
}
@@ -576,6 +582,7 @@ resp0_pipe_recv_cb(void *arg)
drop:
nni_msg_free(msg);
+ nni_aio_set_msg(p->aio_recv, NULL);
nni_pipe_recv(p->npipe, p->aio_recv);
}
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index 18491190..55befdb2 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -1968,7 +1968,9 @@ nni_ws_dialer_close(nni_ws_dialer *d)
d->closed = true;
while ((ws = nni_list_first(&d->wspend)) != 0) {
nni_list_remove(&d->wspend, ws);
- nni_ws_close(ws);
+ nni_mtx_unlock(&d->mtx);
+ nni_ws_fini(ws);
+ nni_mtx_lock(&d->mtx);
}
nni_mtx_unlock(&d->mtx);
}
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 016e47ec..88e11fb1 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -803,7 +803,9 @@ nni_ipc_ep_setopt_recvmaxsz(void *arg, const void *data, size_t sz, int typ)
rv = nni_copyin_size(&val, data, sz, 0, NNI_MAXSZ, typ);
if ((rv == 0) && (ep != NULL)) {
+ nni_mtx_lock(&ep->mtx);
ep->rcvmax = val;
+ nni_mtx_unlock(&ep->mtx);
}
return (rv);
}
@@ -833,7 +835,9 @@ nni_ipc_ep_setopt_permissions(void *arg, const void *data, size_t sz, int typ)
// meaningful chmod beyond the lower 9 bits.
rv = nni_copyin_int(&val, data, sz, 0, 0x7FFFFFFF, typ);
if ((rv == 0) && (ep != NULL)) {
+ nni_mtx_lock(&ep->mtx);
rv = nni_plat_ipc_ep_set_permissions(ep->iep, val);
+ nni_mtx_unlock(&ep->mtx);
}
return (rv);
}
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index 419774e1..be0dd2b5 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -836,7 +836,9 @@ nni_tcp_ep_setopt_recvmaxsz(void *arg, const void *v, size_t sz, int typ)
int rv;
rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, typ);
if ((rv == 0) && (ep != NULL)) {
+ nni_mtx_lock(&ep->mtx);
ep->rcvmax = val;
+ nni_mtx_unlock(&ep->mtx);
}
return (rv);
}
@@ -849,7 +851,9 @@ nni_tcp_ep_setopt_nodelay(void *arg, const void *v, size_t sz, int typ)
int rv;
rv = nni_copyin_bool(&val, v, sz, typ);
if ((rv == 0) && (ep != NULL)) {
+ nni_mtx_lock(&ep->mtx);
ep->nodelay = val;
+ nni_mtx_unlock(&ep->mtx);
}
return (rv);
}
@@ -869,7 +873,9 @@ nni_tcp_ep_setopt_keepalive(void *arg, const void *v, size_t sz, int typ)
int rv;
rv = nni_copyin_bool(&val, v, sz, typ);
if ((rv == 0) && (ep != NULL)) {
+ nni_mtx_lock(&ep->mtx);
ep->keepalive = val;
+ nni_mtx_unlock(&ep->mtx);
}
return (rv);
}
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index 385dd206..d1d21f6d 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -850,7 +850,9 @@ nni_tls_ep_setopt_nodelay(void *arg, const void *v, size_t sz, int typ)
int rv;
rv = nni_copyin_bool(&val, v, sz, typ);
if ((rv == 0) && (ep != NULL)) {
+ nni_mtx_lock(&ep->mtx);
ep->nodelay = val;
+ nni_mtx_unlock(&ep->mtx);
}
return (rv);
}
@@ -870,7 +872,9 @@ nni_tls_ep_setopt_keepalive(void *arg, const void *v, size_t sz, int typ)
int rv;
rv = nni_copyin_bool(&val, v, sz, typ);
if ((rv == 0) && (ep != NULL)) {
+ nni_mtx_lock(&ep->mtx);
ep->keepalive = val;
+ nni_mtx_unlock(&ep->mtx);
}
return (rv);
}
@@ -907,7 +911,9 @@ nni_tls_ep_setopt_recvmaxsz(void *arg, const void *v, size_t sz, int typ)
rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, typ);
if ((rv == 0) && (ep != NULL)) {
+ nni_mtx_lock(&ep->mtx);
ep->rcvmax = val;
+ nni_mtx_unlock(&ep->mtx);
}
return (rv);
}
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
index 97175d49..62f43fd0 100644
--- a/src/transport/ws/websocket.c
+++ b/src/transport/ws/websocket.c
@@ -375,11 +375,13 @@ ws_ep_setopt_recvmaxsz(void *arg, const void *v, size_t sz, int typ)
rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, typ);
if ((rv == 0) && (ep != NULL)) {
+ nni_mtx_lock(&ep->mtx);
ep->rcvmax = val;
+ nni_mtx_unlock(&ep->mtx);
if (ep->mode == NNI_EP_MODE_DIAL) {
- nni_ws_dialer_set_maxframe(ep->dialer, ep->rcvmax);
+ nni_ws_dialer_set_maxframe(ep->dialer, val);
} else {
- nni_ws_listener_set_maxframe(ep->listener, ep->rcvmax);
+ nni_ws_listener_set_maxframe(ep->listener, val);
}
}
return (rv);