aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-22 23:17:12 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-22 23:17:12 -0800
commitbca0a27e2f4978a5a74748b07613c0e30014880c (patch)
treea6beea2e7e63e02be070e4b124dd40c92917dbd6 /src/core/socket.c
parent29628309ae018c3f317eef9b03625d4ce3807a92 (diff)
downloadnng-bca0a27e2f4978a5a74748b07613c0e30014880c.tar.gz
nng-bca0a27e2f4978a5a74748b07613c0e30014880c.tar.bz2
nng-bca0a27e2f4978a5a74748b07613c0e30014880c.zip
Implemened synchronous & asynchronuos dialer, accepter. Getting close...
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c193
1 files changed, 170 insertions, 23 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index bbb9e48a..1b3bcf56 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -26,6 +26,7 @@ nni_socket_recvq(nni_socket *s)
return (s->s_urq);
}
+
// nn_socket_create creates the underlying socket.
int
nni_socket_create(nni_socket **sockp, uint16_t proto)
@@ -41,7 +42,7 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
return (NNG_ENOMEM);
}
sock->s_ops = *ops;
- sock->s_linger = NNG_DEFAULT_LINGER;
+ sock->s_linger = NNG_LINGER_DEFAULT;
if ((rv = nni_mutex_init(&sock->s_mx)) != 0) {
nni_free(sock, sizeof (*sock));
@@ -133,10 +134,10 @@ nni_socket_close(nni_socket *sock)
// need to reap them now.
while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
nni_list_remove(&sock->s_eps, ep);
- nni_mutex_exit(&sock->s_eps);
+ nni_mutex_exit(&sock->s_mx);
nni_endpt_destroy(ep);
- nni_mutex_enter(&sock->s_eps);
+ nni_mutex_enter(&sock->s_mx);
}
nni_mutex_exit(&sock->s_mx);
@@ -262,6 +263,8 @@ nni_socket_proto(nni_socket *sock)
void
nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe)
{
+ nni_endpt *ep;
+
nni_mutex_enter(&sock->s_mx);
if (pipe->p_sock != sock) {
nni_mutex_exit(&sock->s_mx);
@@ -274,12 +277,18 @@ nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe)
// Now remove it from our own list.
nni_list_remove(&sock->s_pipes, pipe);
pipe->p_sock = NULL;
- // XXX: TODO: Redial
- // XXX: also publish event...
- // if (pipe->p_ep != NULL) {
- // nn_endpt_rem_pipe(pipe->p_ep, pipe)
- // }
+ // If we were a connected (dialer) pipe, then let the endpoint
+ // know so it can try to reestablish the connection.
+ if ((ep = pipe->p_ep) != NULL) {
+ ep->ep_pipe = NULL;
+ pipe->p_ep = NULL;
+ nni_mutex_enter(&ep->ep_mx);
+ nni_cond_signal(&ep->ep_cv);
+ nni_mutex_exit(&ep->ep_mx);
+ }
+
+ // XXX: also publish event...
nni_pipe_destroy(pipe);
// If we're closing, wake the socket if we finished draining.
@@ -291,7 +300,7 @@ nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe)
int
-nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe)
+nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer)
{
int rv;
@@ -306,11 +315,6 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe)
}
nni_list_append(&sock->s_pipes, pipe);
- // Add the pipe to its endpoint list.
- nni_mutex_enter(&pipe->p_ep->ep_mx);
- nni_list_append(&pipe->p_ep->ep_pipes, pipe);
- nni_mutex_exit(&pipe->p_ep->ep_mx);
-
pipe->p_sock = sock;
// XXX: Publish event
nni_mutex_exit(&sock->s_mx);
@@ -318,6 +322,39 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe)
}
+// nni_socket_dial_one just does a single dial call, so it can be used
+// for synchronous dialing.
+static int
+nni_socket_dial_one(nni_endpt *ep)
+{
+ nni_socket *sock = ep->ep_sock;
+ nni_pipe *pipe;
+ int rv;
+
+ pipe = NULL;
+
+ if ((rv = nni_endpt_dial(ep, &pipe)) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) {
+ nni_pipe_destroy(pipe);
+ return (rv);
+ }
+
+ nni_mutex_enter(&ep->ep_mx);
+ if (!ep->ep_close) {
+ // Set up the linkage so that when the pipe closes
+ // we can notify the dialer to redial.
+ pipe->p_ep = ep;
+ ep->ep_pipe = pipe;
+ }
+ nni_mutex_exit(&ep->ep_mx);
+
+ return (0);
+}
+
+
+// nni_socket_dialer is the thread worker that dials in the background.
static void
nni_socket_dialer(void *arg)
{
@@ -325,13 +362,116 @@ nni_socket_dialer(void *arg)
nni_socket *sock = ep->ep_sock;
nni_pipe *pipe;
int rv;
+ nni_time cooldown;
+
+ nni_mutex_enter(&ep->ep_mx);
+ while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) {
+ nni_cond_wait(&ep->ep_cv);
+ }
+ if (ep->ep_stop || ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ return;
+ }
+ nni_mutex_exit(&ep->ep_mx);
for (;;) {
nni_mutex_enter(&ep->ep_mx);
- while ((!ep->ep_close) &&
- (nni_list_first(&ep->ep_pipes) != NULL)) {
+ while ((!ep->ep_close) && (ep->ep_pipe != NULL)) {
nni_cond_wait(&ep->ep_cv);
}
+ nni_mutex_exit(&ep->ep_mx);
+
+ rv = nni_socket_dial_one(ep);
+ switch (rv) {
+ case 0:
+ // good connection
+ continue;
+ case NNG_ENOMEM:
+ cooldown = 1000000;
+ break;
+ default:
+ // XXX: THIS NEEDS TO BE A PROPER BACKOFF.
+ cooldown = 100000;
+ break;
+ }
+ // we inject a delay so we don't just spin hard on
+ // errors like connection refused. For NNG_ENOMEM, we
+ // wait even longer, since the system needs time to
+ // release resources.
+ cooldown += nni_clock();
+ while (!ep->ep_close) {
+ nni_cond_waituntil(&ep->ep_cv, cooldown);
+ }
+ }
+}
+
+
+int
+nni_socket_dial(nni_socket *sock, nni_endpt *ep, int sync)
+{
+ int rv = 0;
+ nni_thread *reap = NULL;
+
+ nni_mutex_enter(&sock->s_mx);
+ nni_mutex_enter(&ep->ep_mx);
+ if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
+ rv = NNG_EBUSY;
+ goto out;
+ }
+ if (ep->ep_sock != sock) { // Should never happen
+ rv = NNG_EINVAL;
+ goto out;
+ }
+ if (sock->s_closing || ep->ep_close) {
+ rv = NNG_ECLOSED;
+ goto out;
+ }
+
+ ep->ep_stop = 0;
+ ep->ep_start = sync ? 0 : 1;
+ if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) {
+ rv = NNG_ENOMEM;
+ goto out;
+ }
+ if ((rv == 0) && (sync)) {
+ nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(&sock->s_mx);
+ rv = nni_socket_dial_one(ep);
+ nni_mutex_enter(&sock->s_mx);
+ nni_mutex_enter(&ep->ep_mx);
+ if (rv == 0) {
+ ep->ep_start = 1;
+ } else {
+ // This will cause the thread to exit instead of
+ // starting.
+ ep->ep_stop = 1;
+ reap = ep->ep_dialer;
+ ep->ep_dialer = NULL;
+ }
+ nni_cond_signal(&ep->ep_cv);
+ }
+out:
+ nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(&sock->s_mx);
+
+ if (reap != NULL) {
+ nni_thread_reap(reap);
+ }
+
+ return (rv);
+}
+
+
+static void
+nni_socket_accepter(void *arg)
+{
+ nni_endpt *ep = arg;
+ nni_socket *sock = ep->ep_sock;
+ nni_pipe *pipe;
+ int rv;
+
+ for (;;) {
+ nni_mutex_enter(&ep->ep_mx);
if (ep->ep_close) {
nni_mutex_exit(&ep->ep_mx);
break;
@@ -340,24 +480,31 @@ nni_socket_dialer(void *arg)
pipe = NULL;
- if (((rv = nni_endpt_dial(ep, &pipe)) != 0) ||
- ((rv = nni_socket_add_pipe(sock, pipe)) != 0)) {
+ if (((rv = nni_endpt_accept(ep, &pipe)) != 0) ||
+ ((rv = nni_socket_add_pipe(sock, pipe, 0)) != 0)) {
if (rv == NNG_ECLOSED) {
break;
}
if (pipe != NULL) {
nni_pipe_destroy(pipe);
}
- // XXX: Publish connection error event...
- // XXX: Inject a wait for reconnect...
- continue;
+ // XXX: Publish accept error event...
+
+ // If we can't allocate memory, don't spin, so that
+ // things get a chance to release memory later.
+ // Other errors, like ECONNRESET, should not recur.
+ // (If we find otherwise we can inject a short sleep
+ // here of about 1 ms without too much penalty.)
+ if (rv == NNG_ENOMEM) {
+ nni_usleep(100000);
+ }
}
}
}
int
-nni_socket_dial(nni_socket *sock, nni_endpt *ep)
+nni_socket_accept(nni_socket *sock, nni_endpt *ep)
{
int rv = 0;
@@ -367,7 +514,7 @@ nni_socket_dial(nni_socket *sock, nni_endpt *ep)
rv = NNG_EBUSY;
goto out;
}
- if (ep->ep_sock != sock) { // Should never happen
+ if (ep->ep_sock != sock) { // Should never happen
rv = NNG_EINVAL;
goto out;
}