From 3585000ca027740dbdb4599f4991cd2bf562e2f2 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Fri, 20 Oct 2017 17:03:12 -0700 Subject: fixes #112 Need to move some stuff from socket to message queues --- src/core/socket.c | 142 +++++++++++++++++++++--------------------------------- 1 file changed, 56 insertions(+), 86 deletions(-) (limited to 'src/core/socket.c') diff --git a/src/core/socket.c b/src/core/socket.c index 8895f7a7..bc1f446d 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -66,12 +66,9 @@ struct nni_socket { nni_list s_eps; // active endpoints nni_list s_pipes; // active pipes - int s_ep_pend; // EP dial/listen in progress - int s_closing; // Socket is closing - int s_closed; // Socket closed, protected by global lock - int s_besteffort; // Best effort mode delivery - int s_senderr; // Protocol state machine use - int s_recverr; // Protocol state machine use + int s_ep_pend; // EP dial/listen in progress + int s_closing; // Socket is closing + int s_closed; // Socket closed, protected by global lock nni_event s_recv_ev; // Event for readability nni_event s_send_ev; // Event for sendability @@ -374,15 +371,19 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) } void -nni_sock_lock(nni_sock *sock) +nni_sock_send_pending(nni_sock *sock) { - nni_mtx_lock(&sock->s_mx); + if (sock->s_send_fd.sn_init) { + nni_plat_pipe_clear(sock->s_send_fd.sn_rfd); + } } void -nni_sock_unlock(nni_sock *sock) +nni_sock_recv_pending(nni_sock *sock) { - nni_mtx_unlock(&sock->s_mx); + if (sock->s_recv_fd.sn_init) { + nni_plat_pipe_clear(sock->s_recv_fd.sn_rfd); + } } static void @@ -551,6 +552,11 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) return (rv); } + if (s->s_sock_ops.sock_filter != NULL) { + nni_msgq_set_filter( + s->s_urq, s->s_sock_ops.sock_filter, s->s_data); + } + *sp = s; return (rv); } @@ -779,13 +785,23 @@ nni_sock_closeall(void) } } +void +nni_sock_send(nni_sock *sock, nni_aio *aio) +{ + sock->s_sock_ops.sock_send(sock->s_data, aio); +} + int nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags) { int rv; - int besteffort; nni_time expire; nni_time timeo = sock->s_sndtimeo; + nni_aio *aio; + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) { expire = NNI_TIME_ZERO; @@ -795,47 +811,24 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags) expire = nni_clock(); expire += timeo; } + nni_aio_set_timeout(aio, expire); + nni_aio_set_msg(aio, msg); - // Senderr is typically set by protocols when the state machine - // indicates that it is no longer valid to send a message. E.g. - // a REP socket with no REQ pending. - nni_mtx_lock(&sock->s_mx); - if (sock->s_closing) { - nni_mtx_unlock(&sock->s_mx); - return (NNG_ECLOSED); - } - if ((rv = sock->s_senderr) != 0) { - nni_mtx_unlock(&sock->s_mx); - return (rv); - } - besteffort = sock->s_besteffort; - - if (sock->s_sock_ops.sock_sfilter != NULL) { - msg = sock->s_sock_ops.sock_sfilter(sock->s_data, msg); - } - nni_mtx_unlock(&sock->s_mx); + nni_sock_send(sock, aio); + nni_aio_wait(aio); - if (msg == NULL) { - return (0); - } + rv = nni_aio_result(aio); + nni_aio_fini(aio); - if (besteffort) { - // BestEffort mode -- if we cannot handle the message due to - // backpressure, we just throw it away, and don't complain. - expire = NNI_TIME_ZERO; - } - if (sock->s_send_fd.sn_init) { - nni_plat_pipe_clear(sock->s_send_fd.sn_rfd); - } - rv = nni_msgq_put_until(sock->s_uwq, msg, expire); - if (besteffort && (rv == NNG_ETIMEDOUT)) { - // Pretend this worked... it didn't, but pretend. - nni_msg_free(msg); - return (0); - } return (rv); } +void +nni_sock_recv(nni_sock *sock, nni_aio *aio) +{ + sock->s_sock_ops.sock_recv(sock->s_data, aio); +} + int nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags) { @@ -843,6 +836,11 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags) nni_msg *msg; nni_time expire; nni_time timeo = sock->s_rcvtimeo; + nni_aio *aio; + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) { expire = NNI_TIME_ZERO; @@ -853,39 +851,23 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags) expire += timeo; } - nni_mtx_lock(&sock->s_mx); - if (sock->s_closing) { - nni_mtx_unlock(&sock->s_mx); - return (NNG_ECLOSED); - } - if ((rv = sock->s_recverr) != 0) { - nni_mtx_unlock(&sock->s_mx); - return (rv); - } - nni_mtx_unlock(&sock->s_mx); - - if (sock->s_recv_fd.sn_init) { - nni_plat_pipe_clear(sock->s_recv_fd.sn_rfd); - } - for (;;) { - rv = nni_msgq_get_until(sock->s_urq, &msg, expire); + nni_aio_set_timeout(aio, expire); + nni_sock_recv(sock, aio); + nni_aio_wait(aio); + + rv = nni_aio_result(aio); if (rv != 0) { - return (rv); - } - if (sock->s_sock_ops.sock_rfilter != NULL) { - nni_mtx_lock(&sock->s_mx); - msg = sock->s_sock_ops.sock_rfilter(sock->s_data, msg); - nni_mtx_unlock(&sock->s_mx); - } - if (msg != NULL) { break; } - // Protocol dropped the message; try again. - } + msg = nni_aio_get_msg(aio); + nni_aio_set_msg(aio, NULL); - *msgp = msg; - return (0); + *msgp = msg; + break; + } + nni_aio_fini(aio); + return (rv); } // nni_sock_protocol returns the socket's 16-bit protocol number. @@ -949,18 +931,6 @@ nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) nni_mtx_unlock(&sock->s_mx); } -void -nni_sock_recverr(nni_sock *sock, int err) -{ - sock->s_recverr = err; -} - -void -nni_sock_senderr(nni_sock *sock, int err) -{ - sock->s_senderr = err; -} - int nni_sock_setopt(nni_sock *s, const char *name, const void *val, size_t size) { -- cgit v1.2.3-70-g09d2