path: root/src/core/socket.c
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--  src/core/socket.c  130
1 file changed, 92 insertions, 38 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 1331bb4c..70029278 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -73,7 +73,7 @@ nni_socket_close(nni_socket *sock)
{
nni_pipe *pipe;
nni_endpt *ep;
- uint64_t linger;
+ nni_time linger;
nni_mutex_enter(&sock->s_mx);
@@ -87,23 +87,20 @@ nni_socket_close(nni_socket *sock)
}
nni_mutex_exit(&sock->s_mx);
- // XXX: TODO. This is a place where we should drain the write side
- // msgqueue, effectively getting a linger on the socket. The
- // protocols will drain this queue, and should continue to run
- // handling both transmit and receive until that's done.
- // Note that *all* protocols need to monitor for this linger, even
- // those that do not transmit. This way they can notice and go ahead
- // and quickly shutdown their pipes; this keeps us from having to wait
- // for *our* pipelist to become empty. This helps us ensure that
- // they effectively get the chance to linger on their side, without
- // requring us to do anything magical for them.
-
- // nni_msgqueue_drain(sock->s_uwq, sock->s_linger);
-
- // Now we should attempt to wait for the list of pipes to drop to
- // zero -- indicating that the protocol has shut things down
- // cleanly, voluntarily. (I.e. it finished its drain.)
+ // XXX: TODO: add socket linger timeout to this, from socket option.
linger = nni_clock();
+
+ // We drain the upper write queue. This is just like closing it,
+ // except that the protocol gets a chance to pull the remaining
+ // messages off and push them down to the transport. This
+ // operation can *block* until the linger time has expired.
+ // (A sketch of these drain semantics appears after this hunk.)
+ nni_msgqueue_drain(sock->s_uwq, linger);
+
+ // Generally, unless the protocol is blocked trying to perform
+ // writes (e.g. a slow reader on the other side), it should be
+ // trying to shut things down -- the normal flow is for it to
+ // close its pipes and call nni_socket_rem_pipe(). We wait here to
+ // give it a chance to do so gracefully.
nni_mutex_enter(&sock->s_mx);
while (nni_list_first(&sock->s_pipes) != NULL) {
if (nni_cond_waituntil(&sock->s_cv, linger) == NNG_ETIMEDOUT) {
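The drain call in the hunk above deserves a note: per the new comment, draining is like closing, except that readers may keep consuming until the queue empties or the deadline passes. The following is a minimal sketch of such semantics, built only from the locking primitives already used in this file; the nni_msgqueue fields (mq_lock, mq_cv, mq_count, mq_draining, mq_closed) are assumptions, not the actual layout.

/* Hypothetical sketch of drain-with-deadline; field names are
 * assumptions, not the real nni_msgqueue layout. */
void
nni_msgqueue_drain_sketch(nni_msgqueue *mq, nni_time expire)
{
	nni_mutex_enter(&mq->mq_lock);
	mq->mq_draining = 1;	/* new writes now fail with NNG_ECLOSED */
	while (mq->mq_count != 0) {
		/* Readers keep consuming and signal mq_cv as the queue
		 * empties; give up once the deadline passes. */
		if (nni_cond_waituntil(&mq->mq_cv, expire) == NNG_ETIMEDOUT) {
			break;
		}
	}
	mq->mq_closed = 1;	/* after the drain, reads fail too */
	nni_mutex_exit(&mq->mq_lock);
}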
@@ -111,36 +108,44 @@ nni_socket_close(nni_socket *sock)
}
}
- // Time's up! Shut it down the hard way.
+ // At this point, we've done everything we politely can to give
+ // the protocol a chance to flush its write side. Now it's time
+ // to be a little more insistent.
+
+ // Close the upper read queue immediately. This can happen
+ // safely while we hold the lock.
nni_msgqueue_close(sock->s_urq);
- nni_msgqueue_close(sock->s_uwq);
- // This gives the protocol notice, in case it didn't get the hint.
- // XXX: In retrospect, this entry point seems redundant.
- sock->s_ops.proto_shutdown(sock->s_data);
-
- // The protocol *MUST* give us back our pipes at this point, and
- // quickly too! If this blocks for any non-trivial amount of time
- // here, it indicates a protocol implementation bug.
+
+ // Go through and close all the pipes. We still hold the socket
+ // lock taken before the timed wait above, so this is safe.
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+ nni_pipe_close(pipe);
+ }
+
+ // At this point, any operations the protocol still has
+ // outstanding should be failing, and it should be returning its
+ // pipes to us very quickly. We wait for that to finish, as it
+ // MUST occur shortly.
while (nni_list_first(&sock->s_pipes) != NULL) {
nni_cond_wait(&sock->s_cv);
}
- // Wait to make sure endpoint listeners have shutdown and exited
- // as well. They should have done so *long* ago. Because this
- // requires waiting for threads to finish, which *could* in theory
- // overlap with this, we must drop the socket lock.
+ // We have already signaled the endpoints to shut down and clean
+ // up; we just need to wait for them to finish.
while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
+ nni_cond_wait(&sock->s_cv);
+ }
+ nni_mutex_exit(&sock->s_mx);
- // TODO: This looks like it should happen as an endpt_remove
- // operation?
- nni_list_remove(&sock->s_eps, ep);
- nni_mutex_exit(&sock->s_mx);
+ // At this point nothing else should be referencing us.
- nni_endpt_destroy(ep);
- nni_mutex_enter(&sock->s_mx);
- }
+ // The protocol needs to clean up its state.
+ sock->s_ops.proto_destroy(&sock->s_data);
- nni_mutex_exit(&sock->s_mx);
+ // And we need to clean up *our* state.
+ nni_cond_fini(&sock->s_cv);
+ nni_mutex_fini(&sock->s_mx);
+ nni_free(sock, sizeof (*sock));
return (0);
}
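Stitched together from the hunks above, the close path now reads roughly as follows. This is a reconstruction for readability, not part of the patch: declarations and blank context are compressed, and the break inside the timed wait (a line that falls outside the hunks) is assumed.

	nni_time linger = nni_clock();	/* XXX: plus the linger option, per the TODO */

	/* Let the protocol flush the upper write queue, up to the deadline. */
	nni_msgqueue_drain(sock->s_uwq, linger);

	nni_mutex_enter(&sock->s_mx);

	/* Politely wait for the protocol to close its pipes voluntarily. */
	while (nni_list_first(&sock->s_pipes) != NULL) {
		if (nni_cond_waituntil(&sock->s_cv, linger) == NNG_ETIMEDOUT) {
			break;	/* assumed; not shown in the hunks */
		}
	}

	/* Reads fail immediately from here on. */
	nni_msgqueue_close(sock->s_urq);

	/* Force any remaining pipes closed, then wait for the protocol
	 * to hand them back via nni_socket_rem_pipe(). */
	NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
		nni_pipe_close(pipe);
	}
	while (nni_list_first(&sock->s_pipes) != NULL) {
		nni_cond_wait(&sock->s_cv);
	}

	/* The endpoints were signaled earlier; wait for them to finish. */
	while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
		nni_cond_wait(&sock->s_cv);
	}
	nni_mutex_exit(&sock->s_mx);

	/* Nothing references us now; tear down protocol and socket state. */
	sock->s_ops.proto_destroy(&sock->s_data);
	nni_cond_fini(&sock->s_cv);
	nni_mutex_fini(&sock->s_mx);
	nni_free(sock, sizeof (*sock));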
@@ -197,6 +202,7 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_duration tmout)
return (rv);
}
+
int
nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_duration tmout)
{
@@ -239,6 +245,7 @@ nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_duration tmout)
return (0);
}
+
// nni_socket_protocol returns the socket's 16-bit protocol number.
uint16_t
nni_socket_proto(nni_socket *sock)
@@ -246,6 +253,7 @@ nni_socket_proto(nni_socket *sock)
return (sock->s_ops.proto_self);
}
+
// nni_socket_rem_pipe removes the pipe from the socket. This is often
// called by the protocol when a pipe is removed due to close.
void
@@ -294,8 +302,54 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe)
return (rv);
}
nni_list_append(&sock->s_pipes, pipe);
+
+ // Add the pipe to its endpoint list.
+ nni_mutex_enter(&pipe->p_ep->ep_mx);
+ nni_list_append(&pipe->p_ep->ep_pipes, pipe);
+ nni_mutex_exit(&pipe->p_ep->ep_mx);
+
pipe->p_sock = sock;
// XXX: Publish event
nni_mutex_exit(&sock->s_mx);
return (0);
}
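With this change a pipe is linked on two lists: the socket's s_pipes and its endpoint's ep_pipes, with ep_mx taken while s_mx is held. Any removal path has to unlink both in the same lock order. Here is a hypothetical sketch of what that implies for nni_socket_rem_pipe(); its real body is not in this diff, and the nni_cond_broadcast() primitive is assumed to exist.

/* Hypothetical sketch only; the real nni_socket_rem_pipe() is not
 * shown in this diff, and nni_cond_broadcast() is an assumption. */
void
nni_socket_rem_pipe_sketch(nni_socket *sock, nni_pipe *pipe)
{
	nni_mutex_enter(&sock->s_mx);		/* same order: s_mx first */
	nni_list_remove(&sock->s_pipes, pipe);

	nni_mutex_enter(&pipe->p_ep->ep_mx);	/* then ep_mx, as in add */
	nni_list_remove(&pipe->p_ep->ep_pipes, pipe);
	nni_cond_broadcast(&pipe->p_ep->ep_cv);	/* wake the dialer to redial */
	nni_mutex_exit(&pipe->p_ep->ep_mx);

	nni_cond_broadcast(&sock->s_cv);	/* wake close() waiting on s_pipes */
	nni_mutex_exit(&sock->s_mx);
}

Keeping the s_mx-before-ep_mx order on both the add and remove sides is what prevents a lock-order deadlock between this path and nni_socket_add_pipe().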
+
+
+void
+nni_sock_dialer(void *arg)
+{
+ nni_endpt *ep = arg;
+ nni_socket *sock = ep->ep_sock;
+ nni_pipe *pipe;
+ int rv;
+
+ for (;;) {
+ nni_mutex_enter(&ep->ep_mx);
+ while ((!ep->ep_close) &&
+ (nni_list_first(&ep->ep_pipes) != NULL)) {
+ nni_cond_wait(&ep->ep_cv);
+ }
+ if (ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ break;
+ }
+ nni_mutex_exit(&ep->ep_mx);
+
+ pipe = NULL;
+
+ if (((rv = nni_endpt_dial(ep, &pipe)) != 0) ||
+ ((rv = nni_socket_add_pipe(sock, pipe)) != 0)) {
+ if (pipe != NULL) {
+ nni_pipe_destroy(pipe);
+ }
+ if (rv == NNG_ECLOSED) {
+ break;
+ }
+ // XXX: Inject a wait for reconnect...
+ continue;
+ }
+
+ }
+
+ // XXX: move the endpoint to the socket's reap list
+}
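The "wait for reconnect" XXX could be satisfied with a small backoff helper built from the primitives already used in this file. This is only a sketch: the helper itself, the nni_duration in/out parameter, and the constants (units assumed to be microseconds) are illustrative assumptions, not a documented policy.

/* Hypothetical reconnect backoff for the XXX above. */
static void
nni_sock_dialer_nap(nni_endpt *ep, nni_duration *naptime)
{
	nni_time expire = nni_clock() + *naptime;

	nni_mutex_enter(&ep->ep_mx);
	while (!ep->ep_close) {
		/* Waiting on ep_cv lets a close interrupt the nap early. */
		if (nni_cond_waituntil(&ep->ep_cv, expire) == NNG_ETIMEDOUT) {
			break;
		}
	}
	nni_mutex_exit(&ep->ep_mx);

	if (*naptime < 2000000) {	/* cap the doubling at ~2 s */
		*naptime *= 2;		/* simple exponential backoff */
	}
}

In the dialer loop above, a call to this helper would replace the reconnect XXX before the continue, with *naptime reset to its starting value after each successful nni_socket_add_pipe().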