aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/endpt.c166
-rw-r--r--src/core/init.c1
-rw-r--r--src/core/pipe.c52
-rw-r--r--src/core/pipe.h19
-rw-r--r--src/core/socket.c28
-rw-r--r--src/core/socket.h4
6 files changed, 148 insertions, 122 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index b1ee6b82..da0e3318 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -20,6 +20,7 @@ static void nni_ep_accept_start(nni_ep *);
static void nni_ep_accept_done(void *);
static void nni_ep_connect_start(nni_ep *);
static void nni_ep_connect_done(void *);
+static void nni_ep_backoff_start(nni_ep *);
static void nni_ep_backoff_done(void *);
static nni_idhash *nni_eps;
@@ -211,6 +212,32 @@ nni_ep_stop(nni_ep *ep)
}
static void
+nni_ep_backoff_start(nni_ep *ep)
+{
+ nni_duration backoff;
+
+ if (ep->ep_closed) {
+ return;
+ }
+ backoff = ep->ep_currtime;
+ ep->ep_currtime *= 2;
+ if (ep->ep_currtime > ep->ep_maxrtime) {
+ ep->ep_currtime = ep->ep_maxrtime;
+ }
+
+ // To minimize damage from storms, etc., we select a backoff
+ // value randomly, in the range of [0, backoff-1]; this is pretty
+ // similar to 802 style backoff, except that we have a nearly
+ // uniform time period instead of discrete slot times. This
+ // algorithm may lead to slight biases because we don't have
+ // a statistically perfect distribution with the modulo of the
+ // random number, but this really doesn't matter.
+
+ ep->ep_backoff.a_expire = nni_clock() + (nni_random() % backoff);
+ nni_aio_start(&ep->ep_backoff, NULL, ep);
+}
+
+static void
nni_ep_backoff_done(void *arg)
{
nni_ep * ep = arg;
@@ -230,34 +257,17 @@ nni_ep_backoff_done(void *arg)
static void
nni_ep_connect_done(void *arg)
{
- nni_ep * ep = arg;
- nni_aio * aio = &ep->ep_con_aio;
- void * tpipe;
- nni_pipe * pipe;
- const nni_tran_pipe *ops;
- int rv;
- nni_time cooldown;
-
- ops = ep->ep_tran->tran_pipe;
+ nni_ep * ep = arg;
+ nni_aio *aio = &ep->ep_con_aio;
+ int rv;
+ nni_time cooldown;
- nni_mtx_lock(&ep->ep_mtx);
if ((rv = nni_aio_result(aio)) == 0) {
-
- tpipe = aio->a_pipe;
- NNI_ASSERT(tpipe != NULL);
-
- rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
- if (rv != 0) {
- ops->p_fini(tpipe);
- }
+ rv = nni_pipe_create(ep, aio->a_pipe);
}
-
-done:
+ nni_mtx_lock(&ep->ep_mtx);
switch (rv) {
case 0:
- pipe->p_tran_ops = *ops;
- pipe->p_tran_data = tpipe;
-
// Good connect, so reset the backoff timer.
// XXX: This is kind of bad if a remote host just drops
// the connection without completing our negotiation.
@@ -267,7 +277,6 @@ done:
// some meaningful amount of time. Alternatively we
// can dial into the pipe start logic...
ep->ep_currtime = ep->ep_inirtime;
- nni_pipe_start(pipe);
// No further outgoing connects -- we will restart a
// connection from the pipe when the pipe is removed.
@@ -278,14 +287,7 @@ done:
break;
default:
// Other errors involve the use of the backoff timer.
- // XXX: randomize this slightly to prevent reconnect
- // storms.
- ep->ep_backoff.a_expire = nni_clock() + ep->ep_currtime;
- ep->ep_currtime *= 2;
- if (ep->ep_currtime > ep->ep_maxrtime) {
- ep->ep_currtime = ep->ep_maxrtime;
- }
- nni_aio_start(&ep->ep_backoff, NULL, ep);
+ nni_ep_backoff_start(ep);
break;
}
nni_mtx_unlock(&ep->ep_mtx);
@@ -308,115 +310,61 @@ nni_ep_connect_start(nni_ep *ep)
int
nni_ep_dial(nni_ep *ep, int flags)
{
- int rv = 0;
- nni_aio * aio;
- void * tpipe;
- nni_pipe * pipe;
- const nni_tran_pipe *ops;
+ int rv = 0;
+ nni_aio *aio;
nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime);
ep->ep_currtime = ep->ep_inirtime;
nni_mtx_lock(&ep->ep_mtx);
- ops = ep->ep_tran->tran_pipe;
if (ep->ep_mode != NNI_EP_MODE_DIAL) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ENOTSUP);
}
- if (ep->ep_started) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_EBUSY);
- }
if (ep->ep_closed) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ECLOSED);
}
- ep->ep_started = 1;
-
if ((flags & NNG_FLAG_SYNCH) == 0) {
nni_ep_connect_start(ep);
nni_mtx_unlock(&ep->ep_mtx);
return (0);
}
- // This one is kind of special, since we need
- // to block for the connection to complete. Ick.
+ // Synchronous mode: so we have to wait for it to complete.
aio = &ep->ep_con_syn;
aio->a_endpt = ep->ep_data;
ep->ep_ops.ep_connect(ep->ep_data, aio);
-
- // We're about to drop the lock, but we cannot allow the
- // endpoint to be removed. Put a hold on it.
- ep->ep_refcnt++;
nni_mtx_unlock(&ep->ep_mtx);
nni_aio_wait(aio);
- nni_mtx_lock(&ep->ep_mtx);
- ep->ep_refcnt--;
-
- if (ep->ep_closed) {
- rv = NNG_ECLOSED;
- } else {
- rv = nni_aio_result(aio);
- }
-
- if (rv != 0) {
- ep->ep_started = 0;
- nni_cv_wake(&ep->ep_cv);
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
- }
-
- tpipe = aio->a_pipe;
- NNI_ASSERT(tpipe != NULL);
- rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
- if (rv != 0) {
- ops->p_fini(tpipe);
- } else {
- pipe->p_tran_ops = *ops;
- pipe->p_tran_data = tpipe;
- nni_pipe_start(pipe);
+ // As we're synchronous, we also have to handle the completion.
+ if ((rv = nni_aio_result(aio)) == 0) {
+ NNI_ASSERT(aio->a_pipe != NULL);
+ rv = nni_pipe_create(ep, aio->a_pipe);
}
- nni_cv_wake(&ep->ep_cv);
- nni_mtx_unlock(&ep->ep_mtx);
return (rv);
}
static void
nni_ep_accept_done(void *arg)
{
- nni_ep * ep = arg;
- nni_aio * aio = &ep->ep_acc_aio;
- void * tpipe;
- nni_pipe * pipe;
- int rv;
- const nni_tran_pipe *ops;
-
- ops = ep->ep_tran->tran_pipe;
-
- nni_mtx_lock(&ep->ep_mtx);
- if ((rv = nni_aio_result(aio)) != 0) {
- goto done;
- }
- tpipe = aio->a_pipe;
- NNI_ASSERT(tpipe != NULL);
+ nni_ep * ep = arg;
+ nni_aio *aio = &ep->ep_acc_aio;
+ int rv;
- rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
- if (rv != 0) {
- ops->p_fini(tpipe);
- goto done;
+ if ((rv = nni_aio_result(aio)) == 0) {
+ NNI_ASSERT(aio->a_pipe != NULL);
+ rv = nni_pipe_create(ep, aio->a_pipe);
}
-done:
+ nni_mtx_lock(&ep->ep_mtx);
switch (rv) {
case 0:
- pipe->p_tran_ops = *ops;
- pipe->p_tran_data = tpipe;
- nni_pipe_start(pipe);
nni_ep_accept_start(ep);
break;
case NNG_ECLOSED:
@@ -426,7 +374,6 @@ done:
case NNG_ECONNABORTED:
case NNG_ECONNRESET:
// These are remote conditions, no cool down.
- // cooldown = 0;
nni_ep_accept_start(ep);
break;
default:
@@ -434,15 +381,9 @@ done:
// This is because errors here are probably due to system
// failures (resource exhaustion) and we hope by not
// thrashing we give the system a chance to recover.
- ep->ep_backoff.a_expire = nni_clock() + ep->ep_currtime;
- ep->ep_currtime *= 2;
- if (ep->ep_currtime > ep->ep_maxrtime) {
- ep->ep_currtime = ep->ep_maxrtime;
- }
- nni_aio_start(&ep->ep_backoff, NULL, ep);
+ nni_ep_backoff_start(ep);
break;
}
-
nni_mtx_unlock(&ep->ep_mtx);
}
@@ -465,25 +406,20 @@ nni_ep_listen(nni_ep *ep, int flags)
{
int rv = 0;
+ nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime);
+
nni_mtx_lock(&ep->ep_mtx);
if (ep->ep_mode != NNI_EP_MODE_LISTEN) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ENOTSUP);
}
- if (ep->ep_started) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_EBUSY);
- }
if (ep->ep_closed) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ECLOSED);
}
- ep->ep_started = 1;
-
rv = ep->ep_ops.ep_bind(ep->ep_data);
if (rv != 0) {
- ep->ep_started = 0;
nni_mtx_unlock(&ep->ep_mtx);
return (rv);
}
diff --git a/src/core/init.c b/src/core/init.c
index 88c80f3f..0c294506 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -1,5 +1,6 @@
//
// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 664ababa..2fd19464 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -170,6 +170,57 @@ nni_pipe_start_cb(void *arg)
}
int
+nni_pipe_create(nni_ep *ep, void *tdata)
+{
+ nni_pipe *p;
+ int rv;
+ nni_tran *tran = ep->ep_tran;
+ nni_sock *sock = ep->ep_sock;
+
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
+ // In this case we just toss the pipe...
+ tran->tran_pipe->p_fini(p);
+ return (NNG_ENOMEM);
+ }
+
+ // Make a private copy of the transport ops.
+ p->p_tran_ops = *tran->tran_pipe;
+ p->p_tran_data = tdata;
+ p->p_proto_data = NULL;
+
+ if ((rv = nni_mtx_init(&p->p_mtx)) != 0) {
+ nni_pipe_destroy(p);
+ return (rv);
+ }
+ if ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) {
+ nni_pipe_destroy(p);
+ return (rv);
+ }
+
+ NNI_LIST_NODE_INIT(&p->p_sock_node);
+ NNI_LIST_NODE_INIT(&p->p_ep_node);
+
+ if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) != 0) {
+ nni_pipe_destroy(p);
+ return (rv);
+ }
+
+ p->p_tran_ops = *tran->tran_pipe;
+ p->p_tran_data = tdata;
+
+ // Attempt to initialize protocol data.
+ if ((rv = nni_sock_pipe_init(sock, p)) != 0) {
+ nni_pipe_destroy(p);
+ return (rv);
+ }
+
+ // Start the pipe running.
+ nni_pipe_start(p);
+ return (0);
+}
+
+#if 0
+int
nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran)
{
nni_pipe *p;
@@ -211,6 +262,7 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran)
*pp = p;
return (0);
}
+#endif
int
nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp)
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 990dac9d..90e9213e 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -55,16 +55,25 @@ extern void nni_pipe_close(nni_pipe *);
// other consumers are referencing the pipe. We assume that either the
// socket (protocol code) or endpoint may have references to the pipe
// when this function is called. The pipe cleanup is asynchronous and
-// make take a while depending on scheduling, etc.
+// make take a while depending on scheduling, etc. The pipe lock itself
+// may not be held during this, but any other locks may be.
extern void nni_pipe_stop(nni_pipe *);
-// Used only by the socket core - as we don't wish to expose the details
-// of the pipe structure outside of pipe.c.
-extern int nni_pipe_create(nni_pipe **, nni_sock *, nni_tran *);
+// nni_pipe_create is used only by endpoints - as we don't wish to expose the
+// details of the pipe structure outside of pipe.c. This function must be
+// called without any locks held, as it will call back up into the socket and
+// endpoint, grabbing each of those locks. The function takes ownership of
+// the transport specific pipe (3rd argument), regardless of whether it
+// succeeds or not. The endpoint should be held when calling this.
+extern int nni_pipe_create(nni_ep *, void *);
+
+// nni_pipe_start is called by the socket to begin any startup activities
+// on the pipe before making it ready for use by protocols. For example,
+// TCP and IPC initial handshaking is performed this way.
+extern void nni_pipe_start(nni_pipe *);
extern uint16_t nni_pipe_proto(nni_pipe *);
extern uint16_t nni_pipe_peer(nni_pipe *);
-extern void nni_pipe_start(nni_pipe *);
extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep);
// nni_pipe_get_proto_data gets the protocol private data set with the
diff --git a/src/core/socket.c b/src/core/socket.c
index 75ae8450..ecddf5dd 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -757,6 +757,11 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
return (rv);
}
nni_list_append(&sock->s_eps, ep);
+ // Put a hold on the endpoint, for now.
+ nni_mtx_lock(&ep->ep_mtx);
+ ep->ep_refcnt++;
+ ep->ep_started = 1;
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_unlock(&sock->s_mx);
if ((rv = nni_ep_dial(ep, flags)) != 0) {
@@ -765,6 +770,15 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
*epp = ep;
}
+ // Drop our endpoint hold.
+ nni_mtx_lock(&ep->ep_mtx);
+ if (rv != 0) {
+ ep->ep_started = 0;
+ }
+ ep->ep_refcnt--;
+ nni_cv_wake(&ep->ep_cv);
+ nni_mtx_unlock(&ep->ep_mtx);
+
return (rv);
}
@@ -779,7 +793,12 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
nni_mtx_unlock(&sock->s_mx);
return (rv);
}
+
nni_list_append(&sock->s_eps, ep);
+ nni_mtx_lock(&ep->ep_mtx);
+ ep->ep_refcnt++;
+ ep->ep_started = 1;
+ nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_unlock(&sock->s_mx);
if ((rv = nni_ep_listen(ep, flags)) != 0) {
@@ -788,6 +807,15 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
*epp = ep;
}
+ // Drop our endpoint hold.
+ nni_mtx_lock(&ep->ep_mtx);
+ if (rv != 0) {
+ ep->ep_started = 0;
+ }
+ ep->ep_refcnt--;
+ nni_cv_wake(&ep->ep_cv);
+ nni_mtx_unlock(&ep->ep_mtx);
+
return (rv);
}
diff --git a/src/core/socket.h b/src/core/socket.h
index 56cf60c0..6516de7e 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -95,8 +95,8 @@ extern void nni_sock_pipe_stop(nni_sock *, nni_pipe *);
// nni_sock_pipe_ready lets the socket know the pipe is ready for
// business. This also calls the socket/protocol specific add function,
-// and it may return an error. A reference count on the pipe is incremented
-// on success. The reference count should be dropped by nni_sock_pipe_closed.
+// and it may return an error. The reference count should be dropped by
+// nni_sock_pipe_closed.
extern int nni_sock_pipe_ready(nni_sock *, nni_pipe *);
// Set error codes for applications. These are only ever