From 934c1316ae47754a2e368c65228c3cbfe552680f Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Thu, 22 Dec 2016 15:23:21 -0800 Subject: Inline locks (fewer allocs), simpler absolute times for wakeups. nn_sock_recv. --- src/core/socket.c | 115 ++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 85 insertions(+), 30 deletions(-) (limited to 'src/core/socket.c') diff --git a/src/core/socket.c b/src/core/socket.c index 0272f454..c500db31 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -32,7 +32,7 @@ int nni_socket_create(nni_socket **sockp, uint16_t proto) { nni_socket *sock; - struct nni_protocol *ops; + nni_protocol *ops; int rv; if ((ops = nni_protocol_find(proto)) == NULL) { @@ -43,12 +43,12 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) } sock->s_ops = *ops; - if ((rv = nni_mutex_create(&sock->s_mx)) != 0) { + if ((rv = nni_mutex_init(&sock->s_mx)) != 0) { nni_free(sock, sizeof (*sock)); return (rv); } - if ((rv = nni_cond_create(&sock->s_cv, sock->s_mx)) != 0) { - nni_mutex_destroy(sock->s_mx); + if ((rv = nni_cond_init(&sock->s_cv, &sock->s_mx)) != 0) { + nni_mutex_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } @@ -57,8 +57,8 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) // TODO: NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node); if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) { - nni_cond_destroy(sock->s_cv); - nni_mutex_destroy(sock->s_mx); + nni_cond_fini(&sock->s_cv); + nni_mutex_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } @@ -73,9 +73,10 @@ nni_socket_close(nni_socket *sock) { nni_pipe *pipe; nni_endpt *ep; + uint64_t linger; - nni_mutex_enter(sock->s_mx); + nni_mutex_enter(&sock->s_mx); // Mark us closing, so no more EPs or changes can occur. sock->s_closing = 1; @@ -90,9 +91,8 @@ nni_socket_close(nni_socket *sock) or nni_ep_shutdown(ep); #endif - break; /* REMOVE ME */ } - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); // XXX: TODO. This is a place where we should drain the write side // msgqueue, effectively getting a linger on the socket. The @@ -110,11 +110,10 @@ nni_socket_close(nni_socket *sock) // Now we should attempt to wait for the list of pipes to drop to // zero -- indicating that the protocol has shut things down // cleanly, voluntarily. (I.e. it finished its drain.) - nni_mutex_enter(sock->s_mx); + linger = nni_clock(); + nni_mutex_enter(&sock->s_mx); while (nni_list_first(&sock->s_pipes) != NULL) { - // rv = nn_cond_timedwait(sock->s_cv, sock->s_linger); - int rv = NNG_ETIMEDOUT; - if (rv == NNG_ETIMEDOUT) { + if (nni_cond_waituntil(&sock->s_cv, linger) == NNG_ETIMEDOUT) { break; } } @@ -130,35 +129,50 @@ nni_socket_close(nni_socket *sock) // quickly too! If this blocks for any non-trivial amount of time // here, it indicates a protocol implementation bug. while (nni_list_first(&sock->s_pipes) != NULL) { - nni_cond_wait(sock->s_cv); + nni_cond_wait(&sock->s_cv); } // Wait to make sure endpoint listeners have shutdown and exited // as well. They should have done so *long* ago. while (nni_list_first(&sock->s_eps) != NULL) { - nni_cond_wait(sock->s_cv); + nni_cond_wait(&sock->s_cv); } + nni_mutex_exit(&sock->s_mx); + return (0); } int -nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, int tmout) +nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_duration tmout) { int rv; int besteffort; + nni_time expire; + + if (tmout > 0) { + expire = nni_clock() + tmout; + } else if (tmout < 0) { + expire = NNI_TIME_NEVER; + } else { + expire = NNI_TIME_ZERO; + } // Senderr is typically set by protocols when the state machine // indicates that it is no longer valid to send a message. E.g. // a REP socket with no REQ pending. - nni_mutex_enter(sock->s_mx); + nni_mutex_enter(&sock->s_mx); + if (sock->s_closing) { + nni_mutex_exit(&sock->s_mx); + return (NNG_ECLOSED); + } if ((rv = sock->s_senderr) != 0) { - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); return (rv); } besteffort = sock->s_besteffort; - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); if (sock->s_ops.proto_send_filter != NULL) { msg = sock->s_ops.proto_send_filter(sock->s_data, msg); @@ -170,9 +184,9 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, int tmout) if (besteffort) { // BestEffort mode -- if we cannot handle the message due to // backpressure, we just throw it away, and don't complain. - tmout = 0; + expire = NNI_TIME_ZERO; } - rv = nni_msgqueue_put(sock->s_uwq, msg, tmout); + rv = nni_msgqueue_put_until(sock->s_uwq, msg, expire); if (besteffort && (rv == NNG_EAGAIN)) { // Pretend this worked... it didn't, but pretend. nni_msg_free(msg); @@ -181,6 +195,47 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, int tmout) return (rv); } +int +nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_duration tmout) +{ + int rv; + nni_time expire; + nni_msg *msg; + + if (tmout > 0) { + expire = nni_clock() + tmout; + } else if (tmout < 0) { + expire = NNI_TIME_NEVER; + } else { + expire = NNI_TIME_ZERO; + } + + nni_mutex_enter(&sock->s_mx); + if (sock->s_closing) { + nni_mutex_exit(&sock->s_mx); + return (NNG_ECLOSED); + } + if ((rv = sock->s_recverr) != 0) { + nni_mutex_exit(&sock->s_mx); + return (rv); + } + nni_mutex_exit(&sock->s_mx); + + for (;;) { + rv = nni_msgqueue_get_until(sock->s_urq, &msg, expire); + if (rv != 0) { + return (rv); + } + msg = sock->s_ops.proto_recv_filter(sock->s_data, msg); + if (msg != NULL) { + break; + } + // Protocol dropped the message; try again. + } + + *msgp = msg; + return (0); +} // nni_socket_protocol returns the socket's 16-bit protocol number. uint16_t @@ -195,9 +250,9 @@ nni_socket_proto(nni_socket *sock) void nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe) { - nni_mutex_enter(sock->s_mx); + nni_mutex_enter(&sock->s_mx); if (pipe->p_sock != sock) { - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); } // Remove the pipe from the protocol. Protocols may @@ -215,9 +270,9 @@ nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe) // If we're closing, wake the socket if we finished draining. if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) { - nni_cond_broadcast(sock->s_cv); + nni_cond_broadcast(&sock->s_cv); } - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); } @@ -226,18 +281,18 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe) { int rv; - nni_mutex_enter(sock->s_mx); + nni_mutex_enter(&sock->s_mx); if (sock->s_closing) { - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); return (NNG_ECLOSED); } if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) { - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); return (rv); } nni_list_append(&sock->s_pipes, pipe); pipe->p_sock = sock; - /* XXX: Publish event */ - nni_mutex_exit(sock->s_mx); + // XXX: Publish event + nni_mutex_exit(&sock->s_mx); return (0); } -- cgit v1.2.3-70-g09d2