diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/endpt.c | 166 | ||||
| -rw-r--r-- | src/core/init.c | 1 | ||||
| -rw-r--r-- | src/core/pipe.c | 52 | ||||
| -rw-r--r-- | src/core/pipe.h | 19 | ||||
| -rw-r--r-- | src/core/socket.c | 28 | ||||
| -rw-r--r-- | src/core/socket.h | 4 |
6 files changed, 148 insertions, 122 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index b1ee6b82..da0e3318 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -20,6 +20,7 @@ static void nni_ep_accept_start(nni_ep *); static void nni_ep_accept_done(void *); static void nni_ep_connect_start(nni_ep *); static void nni_ep_connect_done(void *); +static void nni_ep_backoff_start(nni_ep *); static void nni_ep_backoff_done(void *); static nni_idhash *nni_eps; @@ -211,6 +212,32 @@ nni_ep_stop(nni_ep *ep) } static void +nni_ep_backoff_start(nni_ep *ep) +{ + nni_duration backoff; + + if (ep->ep_closed) { + return; + } + backoff = ep->ep_currtime; + ep->ep_currtime *= 2; + if (ep->ep_currtime > ep->ep_maxrtime) { + ep->ep_currtime = ep->ep_maxrtime; + } + + // To minimize damage from storms, etc., we select a backoff + // value randomly, in the range of [0, backoff-1]; this is pretty + // similar to 802 style backoff, except that we have a nearly + // uniform time period instead of discrete slot times. This + // algorithm may lead to slight biases because we don't have + // a statistically perfect distribution with the modulo of the + // random number, but this really doesn't matter. + + ep->ep_backoff.a_expire = nni_clock() + (nni_random() % backoff); + nni_aio_start(&ep->ep_backoff, NULL, ep); +} + +static void nni_ep_backoff_done(void *arg) { nni_ep * ep = arg; @@ -230,34 +257,17 @@ nni_ep_backoff_done(void *arg) static void nni_ep_connect_done(void *arg) { - nni_ep * ep = arg; - nni_aio * aio = &ep->ep_con_aio; - void * tpipe; - nni_pipe * pipe; - const nni_tran_pipe *ops; - int rv; - nni_time cooldown; - - ops = ep->ep_tran->tran_pipe; + nni_ep * ep = arg; + nni_aio *aio = &ep->ep_con_aio; + int rv; + nni_time cooldown; - nni_mtx_lock(&ep->ep_mtx); if ((rv = nni_aio_result(aio)) == 0) { - - tpipe = aio->a_pipe; - NNI_ASSERT(tpipe != NULL); - - rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); - if (rv != 0) { - ops->p_fini(tpipe); - } + rv = nni_pipe_create(ep, aio->a_pipe); } - -done: + nni_mtx_lock(&ep->ep_mtx); switch (rv) { case 0: - pipe->p_tran_ops = *ops; - pipe->p_tran_data = tpipe; - // Good connect, so reset the backoff timer. // XXX: This is kind of bad if a remote host just drops // the connection without completing our negotiation. @@ -267,7 +277,6 @@ done: // some meaningful amount of time. Alternatively we // can dial into the pipe start logic... ep->ep_currtime = ep->ep_inirtime; - nni_pipe_start(pipe); // No further outgoing connects -- we will restart a // connection from the pipe when the pipe is removed. @@ -278,14 +287,7 @@ done: break; default: // Other errors involve the use of the backoff timer. - // XXX: randomize this slightly to prevent reconnect - // storms. - ep->ep_backoff.a_expire = nni_clock() + ep->ep_currtime; - ep->ep_currtime *= 2; - if (ep->ep_currtime > ep->ep_maxrtime) { - ep->ep_currtime = ep->ep_maxrtime; - } - nni_aio_start(&ep->ep_backoff, NULL, ep); + nni_ep_backoff_start(ep); break; } nni_mtx_unlock(&ep->ep_mtx); @@ -308,115 +310,61 @@ nni_ep_connect_start(nni_ep *ep) int nni_ep_dial(nni_ep *ep, int flags) { - int rv = 0; - nni_aio * aio; - void * tpipe; - nni_pipe * pipe; - const nni_tran_pipe *ops; + int rv = 0; + nni_aio *aio; nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime); ep->ep_currtime = ep->ep_inirtime; nni_mtx_lock(&ep->ep_mtx); - ops = ep->ep_tran->tran_pipe; if (ep->ep_mode != NNI_EP_MODE_DIAL) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ENOTSUP); } - if (ep->ep_started) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_EBUSY); - } if (ep->ep_closed) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); } - ep->ep_started = 1; - if ((flags & NNG_FLAG_SYNCH) == 0) { nni_ep_connect_start(ep); nni_mtx_unlock(&ep->ep_mtx); return (0); } - // This one is kind of special, since we need - // to block for the connection to complete. Ick. + // Synchronous mode: so we have to wait for it to complete. aio = &ep->ep_con_syn; aio->a_endpt = ep->ep_data; ep->ep_ops.ep_connect(ep->ep_data, aio); - - // We're about to drop the lock, but we cannot allow the - // endpoint to be removed. Put a hold on it. - ep->ep_refcnt++; nni_mtx_unlock(&ep->ep_mtx); nni_aio_wait(aio); - nni_mtx_lock(&ep->ep_mtx); - ep->ep_refcnt--; - - if (ep->ep_closed) { - rv = NNG_ECLOSED; - } else { - rv = nni_aio_result(aio); - } - - if (rv != 0) { - ep->ep_started = 0; - nni_cv_wake(&ep->ep_cv); - nni_mtx_unlock(&ep->ep_mtx); - return (rv); - } - - tpipe = aio->a_pipe; - NNI_ASSERT(tpipe != NULL); - rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); - if (rv != 0) { - ops->p_fini(tpipe); - } else { - pipe->p_tran_ops = *ops; - pipe->p_tran_data = tpipe; - nni_pipe_start(pipe); + // As we're synchronous, we also have to handle the completion. + if ((rv = nni_aio_result(aio)) == 0) { + NNI_ASSERT(aio->a_pipe != NULL); + rv = nni_pipe_create(ep, aio->a_pipe); } - nni_cv_wake(&ep->ep_cv); - nni_mtx_unlock(&ep->ep_mtx); return (rv); } static void nni_ep_accept_done(void *arg) { - nni_ep * ep = arg; - nni_aio * aio = &ep->ep_acc_aio; - void * tpipe; - nni_pipe * pipe; - int rv; - const nni_tran_pipe *ops; - - ops = ep->ep_tran->tran_pipe; - - nni_mtx_lock(&ep->ep_mtx); - if ((rv = nni_aio_result(aio)) != 0) { - goto done; - } - tpipe = aio->a_pipe; - NNI_ASSERT(tpipe != NULL); + nni_ep * ep = arg; + nni_aio *aio = &ep->ep_acc_aio; + int rv; - rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran); - if (rv != 0) { - ops->p_fini(tpipe); - goto done; + if ((rv = nni_aio_result(aio)) == 0) { + NNI_ASSERT(aio->a_pipe != NULL); + rv = nni_pipe_create(ep, aio->a_pipe); } -done: + nni_mtx_lock(&ep->ep_mtx); switch (rv) { case 0: - pipe->p_tran_ops = *ops; - pipe->p_tran_data = tpipe; - nni_pipe_start(pipe); nni_ep_accept_start(ep); break; case NNG_ECLOSED: @@ -426,7 +374,6 @@ done: case NNG_ECONNABORTED: case NNG_ECONNRESET: // These are remote conditions, no cool down. - // cooldown = 0; nni_ep_accept_start(ep); break; default: @@ -434,15 +381,9 @@ done: // This is because errors here are probably due to system // failures (resource exhaustion) and we hope by not // thrashing we give the system a chance to recover. - ep->ep_backoff.a_expire = nni_clock() + ep->ep_currtime; - ep->ep_currtime *= 2; - if (ep->ep_currtime > ep->ep_maxrtime) { - ep->ep_currtime = ep->ep_maxrtime; - } - nni_aio_start(&ep->ep_backoff, NULL, ep); + nni_ep_backoff_start(ep); break; } - nni_mtx_unlock(&ep->ep_mtx); } @@ -465,25 +406,20 @@ nni_ep_listen(nni_ep *ep, int flags) { int rv = 0; + nni_sock_reconntimes(ep->ep_sock, &ep->ep_inirtime, &ep->ep_maxrtime); + nni_mtx_lock(&ep->ep_mtx); if (ep->ep_mode != NNI_EP_MODE_LISTEN) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ENOTSUP); } - if (ep->ep_started) { - nni_mtx_unlock(&ep->ep_mtx); - return (NNG_EBUSY); - } if (ep->ep_closed) { nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); } - ep->ep_started = 1; - rv = ep->ep_ops.ep_bind(ep->ep_data); if (rv != 0) { - ep->ep_started = 0; nni_mtx_unlock(&ep->ep_mtx); return (rv); } diff --git a/src/core/init.c b/src/core/init.c index 88c80f3f..0c294506 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -1,5 +1,6 @@ // // Copyright 2016 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this diff --git a/src/core/pipe.c b/src/core/pipe.c index 664ababa..2fd19464 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -170,6 +170,57 @@ nni_pipe_start_cb(void *arg) } int +nni_pipe_create(nni_ep *ep, void *tdata) +{ + nni_pipe *p; + int rv; + nni_tran *tran = ep->ep_tran; + nni_sock *sock = ep->ep_sock; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + // In this case we just toss the pipe... + tran->tran_pipe->p_fini(p); + return (NNG_ENOMEM); + } + + // Make a private copy of the transport ops. + p->p_tran_ops = *tran->tran_pipe; + p->p_tran_data = tdata; + p->p_proto_data = NULL; + + if ((rv = nni_mtx_init(&p->p_mtx)) != 0) { + nni_pipe_destroy(p); + return (rv); + } + if ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) { + nni_pipe_destroy(p); + return (rv); + } + + NNI_LIST_NODE_INIT(&p->p_sock_node); + NNI_LIST_NODE_INIT(&p->p_ep_node); + + if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) != 0) { + nni_pipe_destroy(p); + return (rv); + } + + p->p_tran_ops = *tran->tran_pipe; + p->p_tran_data = tdata; + + // Attempt to initialize protocol data. + if ((rv = nni_sock_pipe_init(sock, p)) != 0) { + nni_pipe_destroy(p); + return (rv); + } + + // Start the pipe running. + nni_pipe_start(p); + return (0); +} + +#if 0 +int nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran) { nni_pipe *p; @@ -211,6 +262,7 @@ nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran) *pp = p; return (0); } +#endif int nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp) diff --git a/src/core/pipe.h b/src/core/pipe.h index 990dac9d..90e9213e 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -55,16 +55,25 @@ extern void nni_pipe_close(nni_pipe *); // other consumers are referencing the pipe. We assume that either the // socket (protocol code) or endpoint may have references to the pipe // when this function is called. The pipe cleanup is asynchronous and -// make take a while depending on scheduling, etc. +// make take a while depending on scheduling, etc. The pipe lock itself +// may not be held during this, but any other locks may be. extern void nni_pipe_stop(nni_pipe *); -// Used only by the socket core - as we don't wish to expose the details -// of the pipe structure outside of pipe.c. -extern int nni_pipe_create(nni_pipe **, nni_sock *, nni_tran *); +// nni_pipe_create is used only by endpoints - as we don't wish to expose the +// details of the pipe structure outside of pipe.c. This function must be +// called without any locks held, as it will call back up into the socket and +// endpoint, grabbing each of those locks. The function takes ownership of +// the transport specific pipe (3rd argument), regardless of whether it +// succeeds or not. The endpoint should be held when calling this. +extern int nni_pipe_create(nni_ep *, void *); + +// nni_pipe_start is called by the socket to begin any startup activities +// on the pipe before making it ready for use by protocols. For example, +// TCP and IPC initial handshaking is performed this way. +extern void nni_pipe_start(nni_pipe *); extern uint16_t nni_pipe_proto(nni_pipe *); extern uint16_t nni_pipe_peer(nni_pipe *); -extern void nni_pipe_start(nni_pipe *); extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep); // nni_pipe_get_proto_data gets the protocol private data set with the diff --git a/src/core/socket.c b/src/core/socket.c index 75ae8450..ecddf5dd 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -757,6 +757,11 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags) return (rv); } nni_list_append(&sock->s_eps, ep); + // Put a hold on the endpoint, for now. + nni_mtx_lock(&ep->ep_mtx); + ep->ep_refcnt++; + ep->ep_started = 1; + nni_mtx_unlock(&ep->ep_mtx); nni_mtx_unlock(&sock->s_mx); if ((rv = nni_ep_dial(ep, flags)) != 0) { @@ -765,6 +770,15 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags) *epp = ep; } + // Drop our endpoint hold. + nni_mtx_lock(&ep->ep_mtx); + if (rv != 0) { + ep->ep_started = 0; + } + ep->ep_refcnt--; + nni_cv_wake(&ep->ep_cv); + nni_mtx_unlock(&ep->ep_mtx); + return (rv); } @@ -779,7 +793,12 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags) nni_mtx_unlock(&sock->s_mx); return (rv); } + nni_list_append(&sock->s_eps, ep); + nni_mtx_lock(&ep->ep_mtx); + ep->ep_refcnt++; + ep->ep_started = 1; + nni_mtx_unlock(&ep->ep_mtx); nni_mtx_unlock(&sock->s_mx); if ((rv = nni_ep_listen(ep, flags)) != 0) { @@ -788,6 +807,15 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags) *epp = ep; } + // Drop our endpoint hold. + nni_mtx_lock(&ep->ep_mtx); + if (rv != 0) { + ep->ep_started = 0; + } + ep->ep_refcnt--; + nni_cv_wake(&ep->ep_cv); + nni_mtx_unlock(&ep->ep_mtx); + return (rv); } diff --git a/src/core/socket.h b/src/core/socket.h index 56cf60c0..6516de7e 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -95,8 +95,8 @@ extern void nni_sock_pipe_stop(nni_sock *, nni_pipe *); // nni_sock_pipe_ready lets the socket know the pipe is ready for // business. This also calls the socket/protocol specific add function, -// and it may return an error. A reference count on the pipe is incremented -// on success. The reference count should be dropped by nni_sock_pipe_closed. +// and it may return an error. The reference count should be dropped by +// nni_sock_pipe_closed. extern int nni_sock_pipe_ready(nni_sock *, nni_pipe *); // Set error codes for applications. These are only ever |
