aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c369
1 files changed, 284 insertions, 85 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 620c5d19..640e0db6 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -9,6 +9,7 @@
//
#include "core/nng_impl.h"
+#include "sockimpl.h"
#include <stdio.h>
#include <string.h>
@@ -58,7 +59,7 @@ struct nni_socket {
nni_cv s_cv;
nni_cv s_close_cv;
- uint64_t s_id;
+ uint32_t s_id;
uint32_t s_flags;
unsigned s_refcnt; // protected by global lock
void * s_data; // Protocol private
@@ -97,6 +98,9 @@ struct nni_socket {
static void nni_ctx_destroy(nni_ctx *);
+static void dialer_shutdown_locked(nni_dialer *);
+static void listener_shutdown_locked(nni_listener *);
+
static int
sock_get_fd(nni_sock *s, int flag, int *fdp)
{
@@ -340,7 +344,7 @@ nni_free_opt(nni_sockopt *opt)
uint32_t
nni_sock_id(nni_sock *s)
{
- return ((uint32_t) s->s_id);
+ return (s->s_id);
}
// nni_sock_sendq and nni_sock_recvq are called by the protocol to obtain
@@ -405,58 +409,6 @@ nni_sock_closing(nni_sock *s)
return (rv);
}
-void
-nni_sock_run_pipe_cb(nni_sock *s, int ev, uint32_t id)
-{
- if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) {
- nng_pipe_cb cb;
- void * arg;
-
- nni_mtx_lock(&s->s_pipe_cbs_mtx);
- cb = s->s_pipe_cbs[ev].cb_fn;
- arg = s->s_pipe_cbs[ev].cb_arg;
- nni_mtx_unlock(&s->s_pipe_cbs_mtx);
-
- if (cb != NULL) {
- nng_pipe p;
- p.id = id;
- cb(p, ev, arg);
- }
- }
-}
-
-int
-nni_sock_pipe_add(nni_sock *s, nni_pipe *p)
-{
- // Initialize protocol pipe data.
- nni_mtx_lock(&s->s_mx);
- if (s->s_closing) {
- nni_mtx_unlock(&s->s_mx);
- return (NNG_ECLOSED);
- }
-
- nni_list_append(&s->s_pipes, p);
-
- // Start the initial negotiation I/O...
- nni_pipe_start(p);
-
- nni_mtx_unlock(&s->s_mx);
- return (0);
-}
-
-void
-nni_sock_pipe_remove(nni_sock *s, nni_pipe *p)
-{
- nni_mtx_lock(&s->s_mx);
- if (nni_list_active(&s->s_pipes, p)) {
- nni_list_remove(&s->s_pipes, p);
- }
- if (s->s_closing && nni_list_empty(&s->s_pipes)) {
- nni_cv_wake(&s->s_cv);
- }
- nni_mtx_unlock(&s->s_mx);
-}
-
static void
sock_destroy(nni_sock *s)
{
@@ -521,10 +473,9 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
NNI_LIST_NODE_INIT(&s->s_node);
NNI_LIST_INIT(&s->s_options, nni_sockopt, node);
NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node);
-
- nni_pipe_sock_list_init(&s->s_pipes);
- nni_listener_list_init(&s->s_listeners);
- nni_dialer_list_init(&s->s_dialers);
+ NNI_LIST_INIT(&s->s_pipes, nni_pipe, p_sock_node);
+ NNI_LIST_INIT(&s->s_listeners, nni_listener, l_node);
+ NNI_LIST_INIT(&s->s_dialers, nni_dialer, d_node);
nni_mtx_init(&s->s_mx);
nni_mtx_init(&s->s_pipe_cbs_mtx);
nni_cv_init(&s->s_cv, &s->s_mx);
@@ -615,7 +566,7 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
}
nni_mtx_lock(&sock_lk);
- if ((rv = nni_idhash_alloc(sock_hash, &s->s_id, s)) != 0) {
+ if ((rv = nni_idhash_alloc32(sock_hash, &s->s_id, s)) != 0) {
sock_destroy(s);
} else {
nni_list_append(&sock_list, s);
@@ -625,8 +576,7 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
nni_mtx_unlock(&sock_lk);
// Set the sockname.
- (void) snprintf(
- s->s_name, sizeof(s->s_name), "%u", (unsigned) s->s_id);
+ (void) snprintf(s->s_name, sizeof(s->s_name), "%u", s->s_id);
return (rv);
}
@@ -640,9 +590,7 @@ nni_sock_shutdown(nni_sock *sock)
{
nni_pipe * pipe;
nni_dialer * d;
- nni_dialer * nd;
nni_listener *l;
- nni_listener *nl;
nni_ctx * ctx;
nni_ctx * nctx;
@@ -657,10 +605,10 @@ nni_sock_shutdown(nni_sock *sock)
// Close the EPs. This prevents new connections from forming
// but but allows existing ones to drain.
NNI_LIST_FOREACH (&sock->s_listeners, l) {
- nni_listener_shutdown(l);
+ listener_shutdown_locked(l);
}
NNI_LIST_FOREACH (&sock->s_dialers, d) {
- nni_dialer_shutdown(d);
+ dialer_shutdown_locked(d);
}
nni_mtx_unlock(&sock->s_mx);
@@ -706,35 +654,26 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
+ // For each pipe, arrange for it to teardown hard. We would
+ // expect there not to be any here.
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+ nni_pipe_close(pipe);
+ }
+
// Go through the dialers and listeners, attempting to close them.
// We might already have a close in progress, in which case
// we skip past it; it will be removed from another thread.
- nl = nni_list_first(&sock->s_listeners);
- while ((l = nl) != NULL) {
- nl = nni_list_next(&sock->s_listeners, nl);
-
+ NNI_LIST_FOREACH (&sock->s_listeners, l) {
if (nni_listener_hold(l) == 0) {
- nni_mtx_unlock(&sock->s_mx);
nni_listener_close(l);
- nni_mtx_lock(&sock->s_mx);
}
}
- nd = nni_list_first(&sock->s_dialers);
- while ((d = nd) != NULL) {
- nd = nni_list_next(&sock->s_dialers, nd);
-
+ NNI_LIST_FOREACH (&sock->s_dialers, d) {
if (nni_dialer_hold(d) == 0) {
- nni_mtx_unlock(&sock->s_mx);
nni_dialer_close(d);
- nni_mtx_lock(&sock->s_mx);
}
}
- // For each pipe, arrange for it to teardown hard.
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_stop(pipe);
- }
-
// We have to wait for *both* endpoints and pipes to be
// removed.
while ((!nni_list_empty(&sock->s_pipes)) ||
@@ -1271,7 +1210,6 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock)
{
nni_ctx *ctx;
int rv;
- uint64_t id;
if (sock->s_ctx_ops.ctx_init == NULL) {
return (NNG_ENOTSUP);
@@ -1286,12 +1224,11 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock)
NNI_FREE_STRUCT(ctx);
return (NNG_ECLOSED);
}
- if ((rv = nni_idhash_alloc(ctx_hash, &id, ctx)) != 0) {
+ if ((rv = nni_idhash_alloc32(ctx_hash, &ctx->c_id, ctx)) != 0) {
nni_mtx_unlock(&sock_lk);
NNI_FREE_STRUCT(ctx);
return (rv);
}
- ctx->c_id = (uint32_t) id;
if ((rv = sock->s_ctx_ops.ctx_init(&ctx->c_data, sock->s_data)) != 0) {
nni_idhash_remove(ctx_hash, ctx->c_id);
@@ -1415,3 +1352,265 @@ nni_ctx_setopt(
nni_mtx_unlock(&sock->s_mx);
return (rv);
}
+
+static void
+dialer_timer_start_locked(nni_dialer *d)
+{
+ nni_duration backoff;
+
+ backoff = d->d_currtime;
+ d->d_currtime *= 2;
+ if (d->d_currtime > d->d_maxrtime) {
+ d->d_currtime = d->d_maxrtime;
+ }
+
+ // To minimize damage from storms, etc., we select a backoff
+ // value randomly, in the range of [0, backoff-1]; this is
+ // pretty similar to 802 style backoff, except that we have a
+ // nearly uniform time period instead of discrete slot times.
+ // This algorithm may lead to slight biases because we don't
+ // have a statistically perfect distribution with the modulo of
+ // the random number, but this really doesn't matter.
+ nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio);
+}
+
+void
+nni_dialer_timer_start(nni_dialer *d)
+{
+ nni_sock *s = d->d_sock;
+ nni_mtx_lock(&s->s_mx);
+ dialer_timer_start_locked(d);
+ nni_mtx_unlock(&s->s_mx);
+}
+
+void
+nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p)
+{
+ nni_sock *s = d->d_sock;
+
+ nni_mtx_lock(&s->s_mx);
+
+ if (s->s_closed || d->d_closed) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_pipe_destroy(p);
+ return;
+ }
+
+ p->p_dialer = d;
+ nni_list_append(&d->d_pipes, p);
+ nni_list_append(&s->s_pipes, p);
+ d->d_pipe = p;
+ d->d_currtime = d->d_inirtime;
+ nni_mtx_unlock(&s->s_mx);
+
+ // Start the initial negotiation I/O...
+ nni_pipe_start(p);
+}
+
+static void
+dialer_shutdown_impl(nni_dialer *d)
+{
+ nni_pipe *p;
+
+ // Abort any remaining in-flight operations.
+ nni_aio_close(d->d_con_aio);
+ nni_aio_close(d->d_tmo_aio);
+
+ // Stop the underlying transport.
+ d->d_ops.d_close(d->d_data);
+
+ NNI_LIST_FOREACH (&d->d_pipes, p) {
+ nni_pipe_close(p);
+ }
+}
+
+static void
+dialer_shutdown_locked(nni_dialer *d)
+{
+ if (nni_atomic_flag_test_and_set(&d->d_closing)) {
+ return;
+ }
+ dialer_shutdown_impl(d);
+}
+
+void
+nni_dialer_shutdown(nni_dialer *d)
+{
+ nni_sock *s = d->d_sock;
+ if (nni_atomic_flag_test_and_set(&d->d_closing)) {
+ return;
+ }
+ nni_mtx_lock(&s->s_mx);
+ dialer_shutdown_impl(d);
+ nni_mtx_unlock(&s->s_mx);
+}
+
+void
+nni_dialer_reap(nni_dialer *d)
+{
+ nni_sock *s = d->d_sock;
+
+ nni_aio_stop(d->d_tmo_aio);
+ nni_aio_stop(d->d_con_aio);
+
+ nni_mtx_lock(&s->s_mx);
+ if (!nni_list_empty(&d->d_pipes)) {
+ nni_pipe *p;
+ // This should already have been done, but be certain!
+ NNI_LIST_FOREACH (&d->d_pipes, p) {
+ nni_pipe_close(p);
+ }
+ nni_mtx_unlock(&s->s_mx);
+ // Go back to the end of reap list.
+ nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d);
+ return;
+ }
+
+ nni_list_remove(&s->s_dialers, d);
+ if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) {
+ nni_cv_wake(&s->s_cv);
+ }
+
+ nni_mtx_unlock(&s->s_mx);
+
+ nni_dialer_destroy(d);
+}
+
+void
+nni_listener_add_pipe(nni_listener *l, nni_pipe *p)
+{
+ nni_sock *s = l->l_sock;
+
+ nni_mtx_lock(&s->s_mx);
+ if (s->s_closed || l->l_closed) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_pipe_destroy(p);
+ return;
+ }
+ p->p_listener = l;
+ nni_list_append(&l->l_pipes, p);
+ nni_list_append(&s->s_pipes, p);
+ nni_mtx_unlock(&s->s_mx);
+
+ // Start the initial negotiation I/O...
+ nni_pipe_start(p);
+}
+
+static void
+listener_shutdown_impl(nni_listener *l)
+{
+ nni_pipe *p;
+
+ // Abort any remaining in-flight accepts.
+ nni_aio_close(l->l_acc_aio);
+ nni_aio_close(l->l_tmo_aio);
+
+ // Stop the underlying transport.
+ l->l_ops.l_close(l->l_data);
+
+ NNI_LIST_FOREACH (&l->l_pipes, p) {
+ nni_pipe_close(p);
+ }
+}
+
+static void
+listener_shutdown_locked(nni_listener *l)
+{
+ if (nni_atomic_flag_test_and_set(&l->l_closing)) {
+ return;
+ }
+ listener_shutdown_impl(l);
+}
+
+void
+nni_listener_shutdown(nni_listener *l)
+{
+ nni_sock *s = l->l_sock;
+
+ if (nni_atomic_flag_test_and_set(&l->l_closing)) {
+ return;
+ }
+
+ nni_mtx_lock(&s->s_mx);
+ listener_shutdown_impl(l);
+ nni_mtx_unlock(&s->s_mx);
+}
+
+void
+nni_listener_reap(nni_listener *l)
+{
+ nni_sock *s = l->l_sock;
+
+ nni_aio_stop(l->l_tmo_aio);
+ nni_aio_stop(l->l_acc_aio);
+
+ nni_mtx_lock(&s->s_mx);
+ if (!nni_list_empty(&l->l_pipes)) {
+ nni_pipe *p;
+ // This should already have been done, but be certain!
+ NNI_LIST_FOREACH (&l->l_pipes, p) {
+ nni_pipe_close(p);
+ }
+ nni_mtx_unlock(&s->s_mx);
+ // Go back to the end of reap list.
+ nni_reap(&l->l_reap, (nni_cb) nni_listener_reap, l);
+ return;
+ }
+
+ nni_list_remove(&s->s_listeners, l);
+ if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) {
+ nni_cv_wake(&s->s_cv);
+ }
+
+ nni_mtx_unlock(&s->s_mx);
+
+ nni_listener_destroy(l);
+}
+
+void
+nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev)
+{
+ nni_sock * s = p->p_sock;
+ nng_pipe_cb cb;
+ void * arg;
+
+ if (!p->p_cbs) {
+ if (ev == NNG_PIPE_EV_ADD_PRE) {
+ // First event, after this we want all other events.
+ p->p_cbs = true;
+ } else {
+ return;
+ }
+ }
+ nni_mtx_lock(&s->s_pipe_cbs_mtx);
+ cb = s->s_pipe_cbs[ev].cb_fn;
+ arg = s->s_pipe_cbs[ev].cb_arg;
+ nni_mtx_unlock(&s->s_pipe_cbs_mtx);
+
+ if (cb != NULL) {
+ nng_pipe pid;
+ pid.id = p->p_id;
+ cb(pid, ev, arg);
+ }
+}
+
+void
+nni_pipe_remove(nni_pipe *p)
+{
+ nni_sock * s = p->p_sock;
+ nni_dialer *d = p->p_dialer;
+
+ nni_mtx_lock(&s->s_mx);
+ nni_list_node_remove(&p->p_sock_node);
+ nni_list_node_remove(&p->p_ep_node);
+ p->p_listener = NULL;
+ p->p_dialer = NULL;
+ if ((d != NULL) && (d->d_pipe == p)) {
+ d->d_pipe = NULL;
+ dialer_timer_start_locked(d); // Kick the timer to redial.
+ }
+ if (s->s_closing) {
+ nni_cv_wake(&s->s_cv);
+ }
+ nni_mtx_unlock(&s->s_mx);
+}