diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 130 |
1 files changed, 92 insertions, 38 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 1331bb4c..70029278 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -73,7 +73,7 @@ nni_socket_close(nni_socket *sock) { nni_pipe *pipe; nni_endpt *ep; - uint64_t linger; + nni_time linger; nni_mutex_enter(&sock->s_mx); @@ -87,23 +87,20 @@ nni_socket_close(nni_socket *sock) } nni_mutex_exit(&sock->s_mx); - // XXX: TODO. This is a place where we should drain the write side - // msgqueue, effectively getting a linger on the socket. The - // protocols will drain this queue, and should continue to run - // handling both transmit and receive until that's done. - // Note that *all* protocols need to monitor for this linger, even - // those that do not transmit. This way they can notice and go ahead - // and quickly shutdown their pipes; this keeps us from having to wait - // for *our* pipelist to become empty. This helps us ensure that - // they effectively get the chance to linger on their side, without - // requring us to do anything magical for them. - - // nni_msgqueue_drain(sock->s_uwq, sock->s_linger); - - // Now we should attempt to wait for the list of pipes to drop to - // zero -- indicating that the protocol has shut things down - // cleanly, voluntarily. (I.e. it finished its drain.) + // XXX: TODO: add socket linger timeout to this, from socket option. linger = nni_clock(); + + // We drain the upper write queue. This is just like closing it, + // except that the protocol gets a chance to get the messages and + // push them down to the transport. This operation can *block* + // until the linger time has expired. + nni_msgqueue_drain(sock->s_uwq, linger); + + // Generally, unless the protocol is blocked trying to perform + // writes (e.g. a slow reader on the other side), it should be + // trying to shut things down -- the normal flow is for it to + // close pipes and call nni_sock_rem_pipe(). We wait to give it + // a chance to do so gracefully. nni_mutex_enter(&sock->s_mx); while (nni_list_first(&sock->s_pipes) != NULL) { if (nni_cond_waituntil(&sock->s_cv, linger) == NNG_ETIMEDOUT) { @@ -111,36 +108,44 @@ nni_socket_close(nni_socket *sock) } } - // Time's up! Shut it down the hard way. + // At this point, we've done everything we politely can to give + // the protocol a chance to flush its write side. Now its time + // to be a little more insistent. + + // Close the upper read queue immediately. This can happen + // safely while we hold the lock. nni_msgqueue_close(sock->s_urq); - nni_msgqueue_close(sock->s_uwq); - // This gives the protocol notice, in case it didn't get the hint. - // XXX: In retrospect, this entry point seems redundant. - sock->s_ops.proto_shutdown(sock->s_data); - - // The protocol *MUST* give us back our pipes at this point, and - // quickly too! If this blocks for any non-trivial amount of time - // here, it indicates a protocol implementation bug. + + // Go through and close all the pipes. + nni_mutex_enter(&sock->s_mx); + NNI_LIST_FOREACH (&sock->s_pipes, pipe) { + nni_pipe_close(pipe); + } + + // At this point, the protocols should have all their operations + // failing, if they have any remaining, and they should be returning + // any pipes back to us very quickly. We'll wait for them to finish, + // as it MUST occur shortly. while (nni_list_first(&sock->s_pipes) != NULL) { nni_cond_wait(&sock->s_cv); } - // Wait to make sure endpoint listeners have shutdown and exited - // as well. They should have done so *long* ago. Because this - // requires waiting for threads to finish, which *could* in theory - // overlap with this, we must drop the socket lock. + // We signaled the endpoints to shutdown and cleanup. We just + // need to wait for them to finish. while ((ep = nni_list_first(&sock->s_eps)) != NULL) { + nni_cond_wait(&sock->s_cv); + } + nni_mutex_exit(&sock->s_mx); - // TODO: This looks like it should happen as an endpt_remove - // operation? - nni_list_remove(&sock->s_eps, ep); - nni_mutex_exit(&sock->s_mx); + // At this point nothing else should be referencing us. - nni_endpt_destroy(ep); - nni_mutex_enter(&sock->s_mx); - } + // The protocol needs to clean up its state. + sock->s_ops.proto_destroy(&sock->s_data); - nni_mutex_exit(&sock->s_mx); + // And we need to clean up *our* state. + nni_cond_fini(&sock->s_cv); + nni_mutex_fini(&sock->s_mx); + nni_free(sock, sizeof (*sock)); return (0); } @@ -197,6 +202,7 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_duration tmout) return (rv); } + int nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_duration tmout) { @@ -239,6 +245,7 @@ nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_duration tmout) return (0); } + // nni_socket_protocol returns the socket's 16-bit protocol number. uint16_t nni_socket_proto(nni_socket *sock) @@ -246,6 +253,7 @@ nni_socket_proto(nni_socket *sock) return (sock->s_ops.proto_self); } + // nni_socket_rem_pipe removes the pipe from the socket. This is often // called by the protocol when a pipe is removed due to close. void @@ -294,8 +302,54 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe) return (rv); } nni_list_append(&sock->s_pipes, pipe); + + // Add the pipe to its endpoint list. + nni_mutex_enter(&pipe->p_ep->ep_mx); + nni_list_append(&pipe->p_ep->ep_pipes, pipe); + nni_mutex_exit(&pipe->p_ep->ep_mx); + pipe->p_sock = sock; // XXX: Publish event nni_mutex_exit(&sock->s_mx); return (0); } + + +void +nni_sock_dialer(void *arg) +{ + nni_endpt *ep = arg; + nni_socket *sock = ep->ep_sock; + nni_pipe *pipe; + int rv; + + for (;;) { + nni_mutex_enter(&ep->ep_mx); + while ((!ep->ep_close) && + (nni_list_first(&ep->ep_pipes) != NULL)) { + nni_cond_wait(&ep->ep_cv); + } + if (ep->ep_close) { + nni_mutex_exit(&ep->ep_mx); + break; + } + nni_mutex_exit(&ep->ep_mx); + + pipe = NULL; + + if (((rv = nni_endpt_dial(ep, pipe)) != 0) || + ((rv = nni_socket_add_pipe(sock, pipe)) != 0)) { + if (rv == NNG_ECLOSED) { + return; + } + if (pipe != NULL) { + nni_pipe_destroy(pipe); + } + // XXX: Inject a wait for reconnect... + continue; + } + + } + + // XXX: move the endpoint to the sockets reap list +} |
