diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/platform/posix/posix_epdesc.c | 6 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_epoll.c | 7 | ||||
| -rw-r--r-- | src/platform/posix/posix_pollq_kqueue.c | 2 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 8 | ||||
| -rw-r--r-- | src/protocol/survey0/respond.c | 7 | ||||
| -rw-r--r-- | src/supplemental/websocket/websocket.c | 4 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 4 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 6 | ||||
| -rw-r--r-- | src/transport/tls/tls.c | 6 | ||||
| -rw-r--r-- | src/transport/ws/websocket.c | 6 |
10 files changed, 45 insertions, 11 deletions
diff --git a/src/platform/posix/posix_epdesc.c b/src/platform/posix/posix_epdesc.c index dfb750d4..69026051 100644 --- a/src/platform/posix/posix_epdesc.c +++ b/src/platform/posix/posix_epdesc.c @@ -469,11 +469,7 @@ nni_epdesc_connect_cb(nni_posix_pfd *pfd, int events, void *arg) nni_mtx_lock(&ed->mtx); if ((ed->closed) || ((aio = nni_list_first(&ed->connectq)) == NULL) || (pfd != ed->pfd)) { - // Spurious completion. Ignore it, but discard the PFD. - if (ed->pfd == pfd) { - ed->pfd = NULL; - } - nni_posix_pfd_fini(pfd); + // Spurious completion. Just ignore it. nni_mtx_unlock(&ed->mtx); return; } diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c index a8d8693a..9c1ae682 100644 --- a/src/platform/posix/posix_pollq_epoll.c +++ b/src/platform/posix/posix_pollq_epoll.c @@ -93,6 +93,10 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) { return (NNG_ENOMEM); } + nni_mtx_init(&pfd->mtx); + nni_cv_init(&pfd->cv, &pq->mtx); + + nni_mtx_lock(&pfd->mtx); pfd->pq = pq; pfd->fd = fd; pfd->cb = NULL; @@ -101,9 +105,8 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) pfd->closing = false; pfd->closed = false; - nni_mtx_init(&pfd->mtx); - nni_cv_init(&pfd->cv, &pq->mtx); NNI_LIST_NODE_INIT(&pfd->node); + nni_mtx_unlock(&pfd->mtx); // notifications disabled to begin with ev.events = 0; diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c index 36ced3ff..c118d528 100644 --- a/src/platform/posix/posix_pollq_kqueue.c +++ b/src/platform/posix/posix_pollq_kqueue.c @@ -276,9 +276,9 @@ nni_posix_pollq_destroy(nni_posix_pollq *pq) { if (pq->kq >= 0) { close(pq->kq); - pq->kq = -1; } nni_thr_fini(&pq->thr); + pq->kq = -1; nni_posix_pollq_reap(pq); diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index 24fc7335..33c9efcd 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -309,6 +309,12 @@ static void rep0_pipe_fini(void *arg) { rep0_pipe *p = arg; + nng_msg * msg; + + if ((msg = nni_aio_get_msg(p->aio_recv)) != NULL) { + nni_aio_set_msg(p->aio_recv, NULL); + nni_msg_free(msg); + } nni_aio_fini(p->aio_send); nni_aio_fini(p->aio_recv); @@ -538,6 +544,7 @@ rep0_pipe_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer is speaking garbage. Kick it. nni_msg_free(msg); + nni_aio_set_msg(p->aio_recv, NULL); nni_pipe_stop(p->pipe); return; } @@ -593,6 +600,7 @@ rep0_pipe_recv_cb(void *arg) drop: nni_msg_free(msg); + nni_aio_set_msg(p->aio_recv, NULL); nni_pipe_recv(p->pipe, p->aio_recv); } diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index 5f833ca6..569fd4ea 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -302,7 +302,12 @@ static void resp0_pipe_fini(void *arg) { resp0_pipe *p = arg; + nng_msg * msg; + if ((msg = nni_aio_get_msg(p->aio_recv)) != NULL) { + nni_aio_set_msg(p->aio_recv, NULL); + nni_msg_free(msg); + } nni_aio_fini(p->aio_send); nni_aio_fini(p->aio_recv); NNI_FREE_STRUCT(p); @@ -527,6 +532,7 @@ resp0_pipe_recv_cb(void *arg) if (nni_msg_len(msg) < 4) { // Peer is speaking garbage, kick it. nni_msg_free(msg); + nni_aio_set_msg(p->aio_recv, NULL); nni_pipe_stop(p->npipe); return; } @@ -576,6 +582,7 @@ resp0_pipe_recv_cb(void *arg) drop: nni_msg_free(msg); + nni_aio_set_msg(p->aio_recv, NULL); nni_pipe_recv(p->npipe, p->aio_recv); } diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index 18491190..55befdb2 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -1968,7 +1968,9 @@ nni_ws_dialer_close(nni_ws_dialer *d) d->closed = true; while ((ws = nni_list_first(&d->wspend)) != 0) { nni_list_remove(&d->wspend, ws); - nni_ws_close(ws); + nni_mtx_unlock(&d->mtx); + nni_ws_fini(ws); + nni_mtx_lock(&d->mtx); } nni_mtx_unlock(&d->mtx); } diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 016e47ec..88e11fb1 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -803,7 +803,9 @@ nni_ipc_ep_setopt_recvmaxsz(void *arg, const void *data, size_t sz, int typ) rv = nni_copyin_size(&val, data, sz, 0, NNI_MAXSZ, typ); if ((rv == 0) && (ep != NULL)) { + nni_mtx_lock(&ep->mtx); ep->rcvmax = val; + nni_mtx_unlock(&ep->mtx); } return (rv); } @@ -833,7 +835,9 @@ nni_ipc_ep_setopt_permissions(void *arg, const void *data, size_t sz, int typ) // meaningful chmod beyond the lower 9 bits. rv = nni_copyin_int(&val, data, sz, 0, 0x7FFFFFFF, typ); if ((rv == 0) && (ep != NULL)) { + nni_mtx_lock(&ep->mtx); rv = nni_plat_ipc_ep_set_permissions(ep->iep, val); + nni_mtx_unlock(&ep->mtx); } return (rv); } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 419774e1..be0dd2b5 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -836,7 +836,9 @@ nni_tcp_ep_setopt_recvmaxsz(void *arg, const void *v, size_t sz, int typ) int rv; rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, typ); if ((rv == 0) && (ep != NULL)) { + nni_mtx_lock(&ep->mtx); ep->rcvmax = val; + nni_mtx_unlock(&ep->mtx); } return (rv); } @@ -849,7 +851,9 @@ nni_tcp_ep_setopt_nodelay(void *arg, const void *v, size_t sz, int typ) int rv; rv = nni_copyin_bool(&val, v, sz, typ); if ((rv == 0) && (ep != NULL)) { + nni_mtx_lock(&ep->mtx); ep->nodelay = val; + nni_mtx_unlock(&ep->mtx); } return (rv); } @@ -869,7 +873,9 @@ nni_tcp_ep_setopt_keepalive(void *arg, const void *v, size_t sz, int typ) int rv; rv = nni_copyin_bool(&val, v, sz, typ); if ((rv == 0) && (ep != NULL)) { + nni_mtx_lock(&ep->mtx); ep->keepalive = val; + nni_mtx_unlock(&ep->mtx); } return (rv); } diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 385dd206..d1d21f6d 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -850,7 +850,9 @@ nni_tls_ep_setopt_nodelay(void *arg, const void *v, size_t sz, int typ) int rv; rv = nni_copyin_bool(&val, v, sz, typ); if ((rv == 0) && (ep != NULL)) { + nni_mtx_lock(&ep->mtx); ep->nodelay = val; + nni_mtx_unlock(&ep->mtx); } return (rv); } @@ -870,7 +872,9 @@ nni_tls_ep_setopt_keepalive(void *arg, const void *v, size_t sz, int typ) int rv; rv = nni_copyin_bool(&val, v, sz, typ); if ((rv == 0) && (ep != NULL)) { + nni_mtx_lock(&ep->mtx); ep->keepalive = val; + nni_mtx_unlock(&ep->mtx); } return (rv); } @@ -907,7 +911,9 @@ nni_tls_ep_setopt_recvmaxsz(void *arg, const void *v, size_t sz, int typ) rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, typ); if ((rv == 0) && (ep != NULL)) { + nni_mtx_lock(&ep->mtx); ep->rcvmax = val; + nni_mtx_unlock(&ep->mtx); } return (rv); } diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c index 97175d49..62f43fd0 100644 --- a/src/transport/ws/websocket.c +++ b/src/transport/ws/websocket.c @@ -375,11 +375,13 @@ ws_ep_setopt_recvmaxsz(void *arg, const void *v, size_t sz, int typ) rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, typ); if ((rv == 0) && (ep != NULL)) { + nni_mtx_lock(&ep->mtx); ep->rcvmax = val; + nni_mtx_unlock(&ep->mtx); if (ep->mode == NNI_EP_MODE_DIAL) { - nni_ws_dialer_set_maxframe(ep->dialer, ep->rcvmax); + nni_ws_dialer_set_maxframe(ep->dialer, val); } else { - nni_ws_listener_set_maxframe(ep->listener, ep->rcvmax); + nni_ws_listener_set_maxframe(ep->listener, val); } } return (rv); |
