aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/dialer.c237
-rw-r--r--src/core/dialer.h3
-rw-r--r--src/core/init.c3
-rw-r--r--src/core/listener.c160
-rw-r--r--src/core/listener.h4
-rw-r--r--src/core/pipe.c207
-rw-r--r--src/core/pipe.h39
-rw-r--r--src/core/reap.c81
-rw-r--r--src/core/reap.h1
-rw-r--r--src/core/socket.c369
-rw-r--r--src/core/socket.h16
-rw-r--r--src/core/sockimpl.h107
-rw-r--r--src/nng.h2
-rw-r--r--src/protocol/bus0/bus.c10
-rw-r--r--src/protocol/pair0/pair.c8
-rw-r--r--src/protocol/pair1/pair.c12
-rw-r--r--src/protocol/pipeline0/pull.c4
-rw-r--r--src/protocol/pipeline0/push.c6
-rw-r--r--src/protocol/pubsub0/pub.c6
-rw-r--r--src/protocol/pubsub0/sub.c4
-rw-r--r--src/protocol/reqrep0/rep.c6
-rw-r--r--src/protocol/reqrep0/req.c6
-rw-r--r--src/protocol/reqrep0/xrep.c10
-rw-r--r--src/protocol/reqrep0/xreq.c10
-rw-r--r--src/protocol/survey0/respond.c6
-rw-r--r--src/protocol/survey0/survey.c8
-rw-r--r--src/protocol/survey0/xrespond.c10
-rw-r--r--src/protocol/survey0/xsurvey.c10
-rw-r--r--tests/sock.c2
29 files changed, 620 insertions, 727 deletions
diff --git a/src/core/dialer.c b/src/core/dialer.c
index e93c893e..3144d673 100644
--- a/src/core/dialer.c
+++ b/src/core/dialer.c
@@ -9,36 +9,12 @@
//
#include "core/nng_impl.h"
+#include "sockimpl.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-struct nni_dialer {
- nni_tran_dialer_ops d_ops; // transport ops
- nni_tran * d_tran; // transport pointer
- void * d_data; // transport private
- uint64_t d_id; // endpoint id
- nni_list_node d_node; // per socket list
- nni_sock * d_sock;
- nni_url * d_url;
- int d_refcnt;
- int d_lastrv; // last result from synchronous
- bool d_synch; // synchronous connect in progress?
- bool d_started;
- bool d_closed; // full shutdown
- nni_atomic_flag d_closing; // close pending (waiting on refcnt)
- nni_mtx d_mtx;
- nni_cv d_cv;
- nni_list d_pipes;
- nni_aio * d_con_aio;
- nni_aio * d_tmo_aio; // backoff timer
- nni_duration d_maxrtime; // maximum time for reconnect
- nni_duration d_currtime; // current time for reconnect
- nni_duration d_inirtime; // initial time for reconnect
- nni_time d_conntime; // time of last good connect
-};
-
// Functionality related to dialers.
static void dialer_connect_start(nni_dialer *);
static void dialer_connect_cb(void *);
@@ -65,6 +41,7 @@ nni_dialer_sys_init(void)
void
nni_dialer_sys_fini(void)
{
+ nni_reap_drain();
nni_mtx_fini(&dialers_lk);
nni_idhash_fini(dialers);
dialers = NULL;
@@ -73,26 +50,15 @@ nni_dialer_sys_fini(void)
uint32_t
nni_dialer_id(nni_dialer *d)
{
- return ((uint32_t) d->d_id);
+ return (d->d_id);
}
-static void
-dialer_destroy(nni_dialer *d)
+void
+nni_dialer_destroy(nni_dialer *d)
{
- if (d == NULL) {
- return;
- }
-
- // Remove us from the table so we cannot be found.
- if (d->d_id != 0) {
- nni_idhash_remove(dialers, d->d_id);
- }
-
nni_aio_stop(d->d_con_aio);
nni_aio_stop(d->d_tmo_aio);
- nni_sock_remove_dialer(d->d_sock, d);
-
nni_aio_fini(d->d_con_aio);
nni_aio_fini(d->d_tmo_aio);
@@ -126,13 +92,13 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
nni_url_free(url);
return (NNG_ENOMEM);
}
- d->d_url = url;
- d->d_closed = false;
- d->d_started = false;
- d->d_data = NULL;
- d->d_refcnt = 1;
- d->d_sock = s;
- d->d_tran = tran;
+ d->d_url = url;
+ d->d_closed = false;
+ d->d_data = NULL;
+ d->d_refcnt = 1;
+ d->d_sock = s;
+ d->d_tran = tran;
+ nni_atomic_flag_reset(&d->d_started);
nni_atomic_flag_reset(&d->d_closing);
// Make a copy of the endpoint operations. This allows us to
@@ -141,8 +107,7 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
d->d_ops = *tran->tran_dialer;
NNI_LIST_NODE_INIT(&d->d_node);
-
- nni_pipe_ep_list_init(&d->d_pipes);
+ NNI_LIST_INIT(&d->d_pipes, nni_pipe, p_ep_node);
nni_mtx_init(&d->d_mtx);
nni_cv_init(&d->d_cv, &d->d_mtx);
@@ -150,9 +115,9 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *urlstr)
if (((rv = nni_aio_init(&d->d_con_aio, dialer_connect_cb, d)) != 0) ||
((rv = nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d)) != 0) ||
((rv = d->d_ops.d_init(&d->d_data, url, s)) != 0) ||
- ((rv = nni_idhash_alloc(dialers, &d->d_id, d)) != 0) ||
+ ((rv = nni_idhash_alloc32(dialers, &d->d_id, d)) != 0) ||
((rv = nni_sock_add_dialer(s, d)) != 0)) {
- dialer_destroy(d);
+ nni_dialer_destroy(d);
return (rv);
}
@@ -203,80 +168,33 @@ nni_dialer_rele(nni_dialer *d)
{
nni_mtx_lock(&dialers_lk);
d->d_refcnt--;
- if (d->d_refcnt == 0) {
- nni_cv_wake(&d->d_cv);
+ if ((d->d_refcnt == 0) && (d->d_closed)) {
+ nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d);
}
nni_mtx_unlock(&dialers_lk);
}
-int
-nni_dialer_shutdown(nni_dialer *d)
-{
- if (nni_atomic_flag_test_and_set(&d->d_closing)) {
- return (NNG_ECLOSED);
- }
-
- // Abort any remaining in-flight operations.
- nni_aio_close(d->d_con_aio);
- nni_aio_close(d->d_tmo_aio);
-
- // Stop the underlying transport.
- d->d_ops.d_close(d->d_data);
-
- return (0);
-}
-
void
nni_dialer_close(nni_dialer *d)
{
- nni_pipe *p;
-
- nni_mtx_lock(&d->d_mtx);
+ nni_mtx_lock(&dialers_lk);
if (d->d_closed) {
- nni_mtx_unlock(&d->d_mtx);
+ nni_mtx_unlock(&dialers_lk);
nni_dialer_rele(d);
return;
}
d->d_closed = true;
- nni_mtx_unlock(&d->d_mtx);
-
- nni_dialer_shutdown(d);
-
- nni_aio_stop(d->d_con_aio);
- nni_aio_stop(d->d_tmo_aio);
-
- nni_mtx_lock(&d->d_mtx);
- NNI_LIST_FOREACH (&d->d_pipes, p) {
- nni_pipe_stop(p);
- }
- while ((!nni_list_empty(&d->d_pipes)) || (d->d_refcnt != 1)) {
- nni_cv_wait(&d->d_cv);
- }
- nni_mtx_unlock(&d->d_mtx);
-
- dialer_destroy(d);
-}
+ nni_mtx_unlock(&dialers_lk);
-// This function starts an exponential backoff timer for reconnecting.
-static void
-dialer_timer_start(nni_dialer *d)
-{
- nni_duration backoff;
+ // Remove us from the table so we cannot be found.
+ // This is done fairly early in the teardown process.
+ // If we're here, either the socket or the listener has been
+ // closed at the user request, so there would be a race anyway.
+ nni_idhash_remove(dialers, d->d_id);
- backoff = d->d_currtime;
- d->d_currtime *= 2;
- if (d->d_currtime > d->d_maxrtime) {
- d->d_currtime = d->d_maxrtime;
- }
+ nni_dialer_shutdown(d);
- // To minimize damage from storms, etc., we select a backoff
- // value randomly, in the range of [0, backoff-1]; this is
- // pretty similar to 802 style backoff, except that we have a
- // nearly uniform time period instead of discrete slot times.
- // This algorithm may lead to slight biases because we don't
- // have a statistically perfect distribution with the modulo of
- // the random number, but this really doesn't matter.
- nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio);
+ nni_dialer_rele(d);
}
static void
@@ -285,11 +203,9 @@ dialer_timer_cb(void *arg)
nni_dialer *d = arg;
nni_aio * aio = d->d_tmo_aio;
- nni_mtx_lock(&d->d_mtx);
if (nni_aio_result(aio) == 0) {
dialer_connect_start(d);
}
- nni_mtx_unlock(&d->d_mtx);
}
static void
@@ -304,52 +220,32 @@ dialer_connect_cb(void *arg)
if ((rv = nni_aio_result(aio)) == 0) {
void *data = nni_aio_get_output(aio, 0);
NNI_ASSERT(data != NULL);
- rv = nni_pipe_create2(&p, d->d_sock, d->d_tran, data);
+ rv = nni_pipe_create(&p, d->d_sock, d->d_tran, data);
}
nni_mtx_lock(&d->d_mtx);
- synch = d->d_synch;
- d->d_synch = false;
- if (rv == 0) {
- nni_pipe_set_dialer(p, d);
- nni_list_append(&d->d_pipes, p);
-
- // Good connect, so reset the backoff timer.
- // Note that a host that accepts the connect, but drops
- // us immediately, is going to get hit pretty hard
- // (depending on the initial backoff) with no
- // exponential backoff. This can happen if we wind up
- // trying to connect to some port that does not speak
- // SP for example.
- d->d_currtime = d->d_inirtime;
- }
+ synch = d->d_synch;
nni_mtx_unlock(&d->d_mtx);
- if ((rv == 0) && ((rv = nni_sock_pipe_add(d->d_sock, p)) != 0)) {
- nni_pipe_stop(p);
- }
-
- nni_mtx_lock(&d->d_mtx);
switch (rv) {
case 0:
- // No further outgoing connects -- we will restart a
- // connection from the pipe when the pipe is removed.
+ nni_dialer_add_pipe(d, p);
break;
- case NNG_ECLOSED:
- case NNG_ECANCELED:
- // Canceled/closed -- stop everything.
+ case NNG_ECLOSED: // No further action.
+ case NNG_ECANCELED: // No further action.
break;
default:
- // redial, but only if we are not synchronous
if (!synch) {
- dialer_timer_start(d);
+ nni_dialer_timer_start(d);
}
break;
}
if (synch) {
+ nni_mtx_lock(&d->d_mtx);
+ d->d_synch = false;
d->d_lastrv = rv;
nni_cv_wake(&d->d_cv);
+ nni_mtx_unlock(&d->d_mtx);
}
- nni_mtx_unlock(&d->d_mtx);
}
static void
@@ -366,70 +262,37 @@ nni_dialer_start(nni_dialer *d, int flags)
{
int rv = 0;
- // nni_sock_reconntimes(d->d_sock, &d->d_inirtime,
- //&d->d_maxrtime);
- d->d_currtime = d->d_inirtime;
-
- nni_mtx_lock(&d->d_mtx);
-
- if (d->d_started) {
- nni_mtx_unlock(&d->d_mtx);
+ if (nni_atomic_flag_test_and_set(&d->d_started)) {
return (NNG_ESTATE);
}
if ((flags & NNG_FLAG_NONBLOCK) != 0) {
- d->d_started = true;
- dialer_connect_start(d);
+ nni_mtx_lock(&d->d_mtx);
+ d->d_currtime = d->d_inirtime;
nni_mtx_unlock(&d->d_mtx);
+ dialer_connect_start(d);
return (0);
}
- d->d_synch = true;
- d->d_started = true;
+ nni_mtx_lock(&d->d_mtx);
+ d->d_synch = true;
+ nni_mtx_unlock(&d->d_mtx);
+
dialer_connect_start(d);
+ nni_mtx_lock(&d->d_mtx);
while (d->d_synch) {
nni_cv_wait(&d->d_cv);
}
rv = d->d_lastrv;
- nni_cv_wake(&d->d_cv);
+ nni_mtx_unlock(&d->d_mtx);
if (rv != 0) {
- d->d_started = false;
+ nni_atomic_flag_reset(&d->d_started);
}
- nni_mtx_unlock(&d->d_mtx);
return (rv);
}
-void
-nni_dialer_remove_pipe(nni_dialer *d, nni_pipe *p)
-{
- if (d == NULL) {
- return;
- }
-
- // Break up the relationship between the dialer and the pipe.
- nni_mtx_lock(&d->d_mtx);
- // During early init, the pipe might not have this set.
- if (nni_list_active(&d->d_pipes, p)) {
- nni_list_remove(&d->d_pipes, p);
- }
- // Wake up the close thread if it is waiting.
- if (d->d_closed) {
- if (nni_list_empty(&d->d_pipes)) {
- nni_cv_wake(&d->d_cv);
- }
- } else {
- // If this pipe closed, then lets restart the dial operation.
- // Since the remote side seems to have closed, lets start with
- // a backoff. This keeps us from pounding the crap out of the
- // thing if a remote server accepts but then disconnects
- // immediately.
- dialer_timer_start(d);
- }
- nni_mtx_unlock(&d->d_mtx);
-}
-
int
nni_dialer_setopt(nni_dialer *d, const char *name, const void *val, size_t sz,
nni_opt_type t)
@@ -508,9 +371,3 @@ nni_dialer_getopt(
return (nni_sock_getopt(d->d_sock, name, valp, szp, t));
}
-
-void
-nni_dialer_list_init(nni_list *list)
-{
- NNI_LIST_INIT(list, nni_dialer, d_node);
-}
diff --git a/src/core/dialer.h b/src/core/dialer.h
index 56b0fb1b..4a3e127c 100644
--- a/src/core/dialer.h
+++ b/src/core/dialer.h
@@ -18,11 +18,8 @@ extern int nni_dialer_hold(nni_dialer *);
extern void nni_dialer_rele(nni_dialer *);
extern uint32_t nni_dialer_id(nni_dialer *);
extern int nni_dialer_create(nni_dialer **, nni_sock *, const char *);
-extern int nni_dialer_shutdown(nni_dialer *);
extern void nni_dialer_close(nni_dialer *);
extern int nni_dialer_start(nni_dialer *, int);
-extern void nni_dialer_list_init(nni_list *);
-extern void nni_dialer_remove_pipe(nni_dialer *, nni_pipe *);
extern int nni_dialer_setopt(
nni_dialer *, const char *, const void *, size_t, nni_opt_type);
diff --git a/src/core/init.c b/src/core/init.c
index 30a7a547..c66e54d2 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -75,11 +75,12 @@ nni_fini(void)
nni_dialer_sys_fini();
nni_listener_sys_fini();
nni_sock_sys_fini();
- nni_reap_sys_fini(); // must be before timer and aio (expire)
+ nni_reap_drain();
nni_random_sys_fini();
nni_aio_sys_fini();
nni_timer_sys_fini();
nni_taskq_sys_fini();
+ nni_reap_sys_fini(); // must be before timer and aio (expire)
nni_mtx_fini(&nni_init_mtx);
nni_plat_fini();
diff --git a/src/core/listener.c b/src/core/listener.c
index 8e06076d..1bfa657d 100644
--- a/src/core/listener.c
+++ b/src/core/listener.c
@@ -9,30 +9,12 @@
//
#include "core/nng_impl.h"
+#include "sockimpl.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-struct nni_listener {
- nni_tran_listener_ops l_ops; // transport ops
- nni_tran * l_tran; // transport pointer
- void * l_data; // transport private
- uint64_t l_id; // endpoint id
- nni_list_node l_node; // per socket list
- nni_sock * l_sock;
- nni_url * l_url;
- int l_refcnt;
- bool l_started;
- bool l_closed; // full shutdown
- nni_atomic_flag l_closing; // close pending
- nni_mtx l_mtx;
- nni_cv l_cv;
- nni_list l_pipes;
- nni_aio * l_acc_aio;
- nni_aio * l_tmo_aio;
-};
-
// Functionality related to listeners.
static void listener_accept_start(nni_listener *);
@@ -60,6 +42,7 @@ nni_listener_sys_init(void)
void
nni_listener_sys_fini(void)
{
+ nni_reap_drain();
nni_mtx_fini(&listeners_lk);
nni_idhash_fini(listeners);
listeners = NULL;
@@ -68,32 +51,21 @@ nni_listener_sys_fini(void)
uint32_t
nni_listener_id(nni_listener *l)
{
- return ((uint32_t) l->l_id);
+ return (l->l_id);
}
-static void
-listener_destroy(nni_listener *l)
+void
+nni_listener_destroy(nni_listener *l)
{
- if (l == NULL) {
- return;
- }
-
- // Remove us from the table so we cannot be found.
- if (l->l_id != 0) {
- nni_idhash_remove(listeners, l->l_id);
- }
-
nni_aio_stop(l->l_acc_aio);
-
- nni_sock_remove_listener(l->l_sock, l);
+ nni_aio_stop(l->l_tmo_aio);
nni_aio_fini(l->l_acc_aio);
+ nni_aio_fini(l->l_tmo_aio);
if (l->l_data != NULL) {
l->l_ops.l_fini(l->l_data);
}
- nni_cv_fini(&l->l_cv);
- nni_mtx_fini(&l->l_mtx);
nni_url_free(l->l_url);
NNI_FREE_STRUCT(l);
}
@@ -119,14 +91,14 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr)
nni_url_free(url);
return (NNG_ENOMEM);
}
- l->l_url = url;
- l->l_closed = false;
- l->l_started = false;
- l->l_data = NULL;
- l->l_refcnt = 1;
- l->l_sock = s;
- l->l_tran = tran;
+ l->l_url = url;
+ l->l_closed = false;
+ l->l_data = NULL;
+ l->l_refcnt = 1;
+ l->l_sock = s;
+ l->l_tran = tran;
nni_atomic_flag_reset(&l->l_closing);
+ nni_atomic_flag_reset(&l->l_started);
// Make a copy of the endpoint operations. This allows us to
// modify them (to override NULLs for example), and avoids an extra
@@ -134,18 +106,14 @@ nni_listener_create(nni_listener **lp, nni_sock *s, const char *urlstr)
l->l_ops = *tran->tran_listener;
NNI_LIST_NODE_INIT(&l->l_node);
-
- nni_pipe_ep_list_init(&l->l_pipes);
-
- nni_mtx_init(&l->l_mtx);
- nni_cv_init(&l->l_cv, &l->l_mtx);
+ NNI_LIST_INIT(&l->l_pipes, nni_pipe, p_ep_node);
if (((rv = nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)) != 0) ||
((rv = nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)) != 0) ||
((rv = l->l_ops.l_init(&l->l_data, url, s)) != 0) ||
- ((rv = nni_idhash_alloc(listeners, &l->l_id, l)) != 0) ||
+ ((rv = nni_idhash_alloc32(listeners, &l->l_id, l)) != 0) ||
((rv = nni_sock_add_listener(s, l)) != 0)) {
- listener_destroy(l);
+ nni_listener_destroy(l);
return (rv);
}
@@ -196,58 +164,33 @@ nni_listener_rele(nni_listener *l)
{
nni_mtx_lock(&listeners_lk);
l->l_refcnt--;
- if (l->l_refcnt == 0) {
- nni_cv_wake(&l->l_cv);
+ if ((l->l_refcnt == 0) && (l->l_closed)) {
+ nni_reap(&l->l_reap, (nni_cb) nni_listener_reap, l);
}
nni_mtx_unlock(&listeners_lk);
}
-int
-nni_listener_shutdown(nni_listener *l)
-{
- if (nni_atomic_flag_test_and_set(&l->l_closing)) {
- return (NNG_ECLOSED);
- }
-
- // Abort any remaining in-flight accepts.
- nni_aio_close(l->l_acc_aio);
- nni_aio_close(l->l_tmo_aio);
-
- // Stop the underlying transport.
- l->l_ops.l_close(l->l_data);
-
- return (0);
-}
-
void
nni_listener_close(nni_listener *l)
{
- nni_pipe *p;
-
- nni_mtx_lock(&l->l_mtx);
+ nni_mtx_lock(&listeners_lk);
if (l->l_closed) {
- nni_mtx_unlock(&l->l_mtx);
+ nni_mtx_unlock(&listeners_lk);
nni_listener_rele(l);
return;
}
l->l_closed = true;
- nni_mtx_unlock(&l->l_mtx);
-
- nni_listener_shutdown(l);
+ nni_mtx_unlock(&listeners_lk);
- nni_aio_stop(l->l_acc_aio);
- nni_aio_stop(l->l_tmo_aio);
+ // Remove us from the table so we cannot be found.
+ // This is done fairly early in the teardown process.
+ // If we're here, either the socket or the listener has been
+ // closed at the user request, so there would be a race anyway.
+ nni_idhash_remove(listeners, l->l_id);
- nni_mtx_lock(&l->l_mtx);
- NNI_LIST_FOREACH (&l->l_pipes, p) {
- nni_pipe_stop(p);
- }
- while ((!nni_list_empty(&l->l_pipes)) || (l->l_refcnt != 1)) {
- nni_cv_wait(&l->l_cv);
- }
- nni_mtx_unlock(&l->l_mtx);
+ nni_listener_shutdown(l);
- listener_destroy(l);
+ nni_listener_rele(l); // This will trigger a reap if id count is zero.
}
static void
@@ -272,14 +215,11 @@ listener_accept_cb(void *arg)
if ((rv = nni_aio_result(aio)) == 0) {
void *data = nni_aio_get_output(aio, 0);
NNI_ASSERT(data != NULL);
- rv = nni_pipe_create2(&p, l->l_sock, l->l_tran, data);
+ rv = nni_pipe_create(&p, l->l_sock, l->l_tran, data);
}
switch (rv) {
case 0:
- nni_mtx_lock(&l->l_mtx);
- nni_pipe_set_listener(p, l);
- nni_list_append(&l->l_pipes, p);
- nni_mtx_unlock(&l->l_mtx);
+ nni_listener_add_pipe(l, p);
listener_accept_start(l);
break;
case NNG_ECONNABORTED: // remote condition, no cooldown
@@ -298,10 +238,6 @@ listener_accept_cb(void *arg)
nni_sleep_aio(100, l->l_tmo_aio);
break;
}
-
- if ((rv == 0) && ((rv = nni_sock_pipe_add(l->l_sock, p)) != 0)) {
- nni_pipe_stop(p);
- }
}
static void
@@ -319,44 +255,20 @@ nni_listener_start(nni_listener *l, int flags)
int rv = 0;
NNI_ARG_UNUSED(flags);
- nni_mtx_lock(&l->l_mtx);
- if (l->l_started) {
- nni_mtx_unlock(&l->l_mtx);
+ if (nni_atomic_flag_test_and_set(&l->l_started)) {
return (NNG_ESTATE);
}
if ((rv = l->l_ops.l_bind(l->l_data)) != 0) {
- nni_mtx_unlock(&l->l_mtx);
+ nni_atomic_flag_reset(&l->l_started);
return (rv);
}
- l->l_started = true;
- nni_mtx_unlock(&l->l_mtx);
-
listener_accept_start(l);
return (0);
}
-void
-nni_listener_remove_pipe(nni_listener *l, nni_pipe *p)
-{
- if (l == NULL) {
- return;
- }
- // Break up relationship between listener and pipe.
- nni_mtx_lock(&l->l_mtx);
- // During early init, the pipe might not have this set.
- if (nni_list_active(&l->l_pipes, p)) {
- nni_list_remove(&l->l_pipes, p);
- }
- // Wake up the closer if it is waiting.
- if (l->l_closed && nni_list_empty(&l->l_pipes)) {
- nni_cv_wake(&l->l_cv);
- }
- nni_mtx_unlock(&l->l_mtx);
-}
-
int
nni_listener_setopt(nni_listener *l, const char *name, const void *val,
size_t sz, nni_opt_type t)
@@ -406,9 +318,3 @@ nni_listener_getopt(
return (nni_sock_getopt(l->l_sock, name, valp, szp, t));
}
-
-void
-nni_listener_list_init(nni_list *list)
-{
- NNI_LIST_INIT(list, nni_listener, l_node);
-}
diff --git a/src/core/listener.h b/src/core/listener.h
index 41b1a678..5806fc72 100644
--- a/src/core/listener.h
+++ b/src/core/listener.h
@@ -18,12 +18,8 @@ extern int nni_listener_hold(nni_listener *);
extern void nni_listener_rele(nni_listener *);
extern uint32_t nni_listener_id(nni_listener *);
extern int nni_listener_create(nni_listener **, nni_sock *, const char *);
-extern int nni_listener_shutdown(nni_listener *);
extern void nni_listener_close(nni_listener *);
extern int nni_listener_start(nni_listener *, int);
-extern void nni_listener_list_init(nni_list *);
-extern int nni_listener_add_pipe(nni_listener *, nni_pipe *);
-extern void nni_listener_remove_pipe(nni_listener *, nni_pipe *);
extern int nni_listener_setopt(
nni_listener *, const char *, const void *, size_t, nni_opt_type);
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 6b9b082c..e95fe1d4 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -9,6 +9,7 @@
//
#include "core/nng_impl.h"
+#include "sockimpl.h"
#include <string.h>
@@ -17,54 +18,17 @@
// Operations on pipes (to the transport) are generally blocking operations,
// performed in the context of the protocol.
-struct nni_pipe {
- uint32_t p_id;
- uint32_t p_sock_id;
- uint32_t p_dialer_id;
- uint32_t p_listener_id;
- nni_tran_pipe_ops p_tran_ops;
- nni_proto_pipe_ops p_proto_ops;
- void * p_tran_data;
- void * p_proto_data;
- nni_list_node p_sock_node;
- nni_list_node p_ep_node;
- nni_sock * p_sock;
- nni_listener * p_listener;
- nni_dialer * p_dialer;
- bool p_closed;
- nni_atomic_flag p_stop;
- bool p_cbs;
- int p_refcnt;
- nni_mtx p_mtx;
- nni_cv p_cv;
- nni_list_node p_reap_node;
- nni_aio * p_start_aio;
-};
-
static nni_idhash *nni_pipes;
static nni_mtx nni_pipe_lk;
-static nni_list nni_pipe_reap_list;
-static nni_mtx nni_pipe_reap_lk;
-static nni_cv nni_pipe_reap_cv;
-static nni_thr nni_pipe_reap_thr;
-static int nni_pipe_reap_run;
-
-static void nni_pipe_reaper(void *);
-
int
nni_pipe_sys_init(void)
{
int rv;
- NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node);
nni_mtx_init(&nni_pipe_lk);
- nni_mtx_init(&nni_pipe_reap_lk);
- nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk);
- if (((rv = nni_idhash_init(&nni_pipes)) != 0) ||
- ((rv = nni_thr_init(&nni_pipe_reap_thr, nni_pipe_reaper, 0)) !=
- 0)) {
+ if ((rv = nni_idhash_init(&nni_pipes)) != 0) {
return (rv);
}
@@ -76,25 +40,13 @@ nni_pipe_sys_init(void)
nni_idhash_set_limits(
nni_pipes, 1, 0x7fffffff, nni_random() & 0x7fffffff);
- nni_pipe_reap_run = 1;
- nni_thr_run(&nni_pipe_reap_thr);
-
return (0);
}
void
nni_pipe_sys_fini(void)
{
- if (nni_pipe_reap_run) {
- nni_mtx_lock(&nni_pipe_reap_lk);
- nni_pipe_reap_run = 0;
- nni_cv_wake(&nni_pipe_reap_cv);
- nni_mtx_unlock(&nni_pipe_reap_lk);
- }
-
- nni_thr_fini(&nni_pipe_reap_thr);
- nni_cv_fini(&nni_pipe_reap_cv);
- nni_mtx_fini(&nni_pipe_reap_lk);
+ nni_reap_drain();
nni_mtx_fini(&nni_pipe_lk);
if (nni_pipes != NULL) {
nni_idhash_fini(nni_pipes);
@@ -109,28 +61,7 @@ nni_pipe_destroy(nni_pipe *p)
return;
}
- if (p->p_cbs) {
- nni_sock_run_pipe_cb(p->p_sock, NNG_PIPE_EV_REM_POST, p->p_id);
- }
-
- // Stop any pending negotiation.
- nni_aio_stop(p->p_start_aio);
-
- if (p->p_proto_data != NULL) {
- p->p_proto_ops.pipe_stop(p->p_proto_data);
- }
- if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) {
- p->p_tran_ops.p_stop(p->p_tran_data);
- }
-
- // We have exclusive access at this point, so we can check if
- // we are still on any lists.
- nni_dialer_remove_pipe(p->p_dialer, p); // dialer may be NULL
- nni_listener_remove_pipe(p->p_listener, p); // listener may be NULL
-
- if (nni_list_node_active(&p->p_sock_node)) {
- nni_sock_pipe_remove(p->p_sock, p);
- }
+ nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST);
// Make sure any unlocked holders are done with this.
// This happens during initialization for example.
@@ -138,11 +69,24 @@ nni_pipe_destroy(nni_pipe *p)
if (p->p_id != 0) {
nni_idhash_remove(nni_pipes, p->p_id);
}
+ // This wait guarantees that all callers are done with us.
while (p->p_refcnt != 0) {
nni_cv_wait(&p->p_cv);
}
nni_mtx_unlock(&nni_pipe_lk);
+ // Wait for neg callbacks to finish. (Already closed).
+ nni_aio_stop(p->p_start_aio);
+
+ if (p->p_proto_data != NULL) {
+ p->p_proto_ops.pipe_stop(p->p_proto_data);
+ }
+ if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) {
+ p->p_tran_ops.p_stop(p->p_tran_data);
+ }
+
+ nni_pipe_remove(p);
+
if (p->p_proto_data != NULL) {
p->p_proto_ops.pipe_fini(p->p_proto_data);
}
@@ -229,6 +173,8 @@ nni_pipe_close(nni_pipe *p)
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_close(p->p_tran_data);
}
+
+ nni_reap(&p->p_reap, (nni_cb) nni_pipe_destroy, p);
}
bool
@@ -241,23 +187,6 @@ nni_pipe_closed(nni_pipe *p)
return (rv);
}
-void
-nni_pipe_stop(nni_pipe *p)
-{
- // Guard against recursive calls.
- if (nni_atomic_flag_test_and_set(&p->p_stop)) {
- return;
- }
-
- nni_pipe_close(p);
-
- // Put it on the reaplist for async cleanup
- nni_mtx_lock(&nni_pipe_reap_lk);
- nni_list_append(&nni_pipe_reap_list, p);
- nni_cv_wake(&nni_pipe_reap_cv);
- nni_mtx_unlock(&nni_pipe_reap_lk);
-}
-
uint16_t
nni_pipe_peer(nni_pipe *p)
{
@@ -270,32 +199,29 @@ nni_pipe_start_cb(void *arg)
nni_pipe *p = arg;
nni_sock *s = p->p_sock;
nni_aio * aio = p->p_start_aio;
- uint32_t id = nni_pipe_id(p);
if (nni_aio_result(aio) != 0) {
- nni_pipe_stop(p);
+ nni_pipe_close(p);
return;
}
- p->p_cbs = true; // We're running all cbs going forward
-
- nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_PRE, id);
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
if (nni_pipe_closed(p)) {
- nni_pipe_stop(p);
+ nni_pipe_close(p);
return;
}
if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) ||
nni_sock_closing(s)) {
- nni_pipe_stop(p);
+ nni_pipe_close(p);
return;
}
- nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_POST, id);
+ nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
}
int
-nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
+nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
{
nni_pipe * p;
int rv;
@@ -315,13 +241,11 @@ nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
p->p_proto_ops = *pops;
p->p_proto_data = NULL;
p->p_sock = sock;
- p->p_sock_id = nni_sock_id(sock);
p->p_closed = false;
p->p_cbs = false;
p->p_refcnt = 0;
nni_atomic_flag_reset(&p->p_stop);
- NNI_LIST_NODE_INIT(&p->p_reap_node);
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
@@ -347,27 +271,11 @@ nni_pipe_create2(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
return (0);
}
-void
-nni_pipe_set_listener(nni_pipe *p, nni_listener *l)
-{
- p->p_listener = l;
- p->p_listener_id = nni_listener_id(l);
-}
-
-void
-nni_pipe_set_dialer(nni_pipe *p, nni_dialer *d)
-{
- p->p_dialer = d;
- p->p_dialer_id = nni_dialer_id(d);
-}
-
int
nni_pipe_getopt(
nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t)
{
nni_tran_option *o;
- nni_dialer * d;
- nni_listener * l;
for (o = p->p_tran_ops.p_options; o && o->o_name; o++) {
if (strcmp(o->o_name, name) != 0) {
@@ -376,24 +284,13 @@ nni_pipe_getopt(
return (o->o_get(p->p_tran_data, val, szp, t));
}
- // Maybe the endpoint knows? We look up by ID, instead of using
- // the links directly, to avoid needing a hold on them. The pipe
- // can wind up outliving the endpoint in certain circumstances.
- // This means that getting these properties from the pipe may wind
- // up being somewhat more expensive.
- if ((p->p_dialer_id != 0) &&
- (nni_dialer_find(&d, p->p_dialer_id) == 0)) {
- int rv;
- rv = nni_dialer_getopt(d, name, val, szp, t);
- nni_dialer_rele(d);
- return (rv);
+ // Maybe the endpoint knows? The guarantees on pipes ensure that the
+ // pipe will not outlive its creating endpoint.
+ if (p->p_dialer != NULL) {
+ return (nni_dialer_getopt(p->p_dialer, name, val, szp, t));
}
- if ((p->p_listener_id != 0) &&
- (nni_listener_find(&l, p->p_listener_id) == 0)) {
- int rv;
- rv = nni_listener_getopt(l, name, val, szp, t);
- nni_listener_rele(l);
- return (rv);
+ if (p->p_listener != NULL) {
+ return (nni_listener_getopt(p->p_listener, name, val, szp, t));
}
return (NNG_ENOTSUP);
}
@@ -414,56 +311,20 @@ nni_pipe_get_proto_data(nni_pipe *p)
return (p->p_proto_data);
}
-void
-nni_pipe_sock_list_init(nni_list *list)
-{
- NNI_LIST_INIT(list, nni_pipe, p_sock_node);
-}
-
-void
-nni_pipe_ep_list_init(nni_list *list)
-{
- NNI_LIST_INIT(list, nni_pipe, p_ep_node);
-}
-
uint32_t
nni_pipe_sock_id(nni_pipe *p)
{
- return (p->p_sock_id);
+ return (nni_sock_id(p->p_sock));
}
uint32_t
nni_pipe_listener_id(nni_pipe *p)
{
- return (p->p_listener_id);
+ return (p->p_listener ? nni_listener_id(p->p_listener) : 0);
}
uint32_t
nni_pipe_dialer_id(nni_pipe *p)
{
- return (p->p_dialer_id);
-}
-
-static void
-nni_pipe_reaper(void *notused)
-{
- NNI_ARG_UNUSED(notused);
-
- nni_mtx_lock(&nni_pipe_reap_lk);
- for (;;) {
- nni_pipe *p;
- if ((p = nni_list_first(&nni_pipe_reap_list)) != NULL) {
- nni_list_remove(&nni_pipe_reap_list, p);
-
- nni_mtx_unlock(&nni_pipe_reap_lk);
- nni_pipe_destroy(p);
- nni_mtx_lock(&nni_pipe_reap_lk);
- continue;
- }
- if (!nni_pipe_reap_run) {
- break;
- }
- nni_cv_wait(&nni_pipe_reap_cv);
- }
- nni_mtx_unlock(&nni_pipe_reap_lk);
+ return (p->p_dialer ? nni_dialer_id(p->p_dialer) : 0);
}
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 5c505514..1d73ce51 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -29,38 +29,11 @@ extern void nni_pipe_send(nni_pipe *, nni_aio *);
// Pipe operations that protocols use.
extern uint32_t nni_pipe_id(nni_pipe *);
-// nni_pipe_destroy destroys a pipe -- there must not be any other
-// references to it; this is used only during creation failures.
-extern void nni_pipe_destroy(nni_pipe *);
-
// nni_pipe_close closes the underlying transport for the pipe. Further
-// operations against will return NNG_ECLOSED.
+// operations against will return NNG_ECLOSED. This is idempotent. The
+// actual pipe will be reaped asynchronously.
extern void nni_pipe_close(nni_pipe *);
-// nni_pipe_stop is called to begin the process of tearing down the socket.
-// This function runs asynchronously, and takes care to ensure that no
-// other consumers are referencing the pipe. We assume that either the
-// socket (protocol code) or endpoint may have references to the pipe
-// when this function is called. The pipe cleanup is asynchronous and
-// make take a while depending on scheduling, etc. The pipe lock itself
-// may not be held during this, but any other locks may be.
-extern void nni_pipe_stop(nni_pipe *);
-
-// nni_pipe_create is used only by endpoints - as we don't wish to expose the
-// details of the pipe structure outside of pipe.c. This function must be
-// called without any locks held, as it will call back up into the socket and
-// endpoint, grabbing each of those locks. The function takes ownership of
-// the transport specific pipe (3rd argument), regardless of whether it
-// succeeds or not. The endpoint should be held when calling this.
-extern int nni_pipe_create2(nni_pipe **, nni_sock *, nni_tran *, void *);
-extern void nni_pipe_set_dialer(nni_pipe *, nni_dialer *);
-extern void nni_pipe_set_listener(nni_pipe *, nni_listener *);
-
-// nni_pipe_start is called by the socket to begin any startup activities
-// on the pipe before making it ready for use by protocols. For example,
-// TCP and IPC initial handshaking is performed this way.
-extern void nni_pipe_start(nni_pipe *);
-
extern uint16_t nni_pipe_proto(nni_pipe *);
extern uint16_t nni_pipe_peer(nni_pipe *);
@@ -73,14 +46,6 @@ extern int nni_pipe_getopt(
// nni_pipe_set_proto_data function. No locking is performed.
extern void *nni_pipe_get_proto_data(nni_pipe *);
-// nni_pipe_sock_list_init initializes a list of pipes, to be used by
-// a per-socket list.
-extern void nni_pipe_sock_list_init(nni_list *);
-
-// nni_pipe_ep_list_init initializes a list of pipes, to be used by
-// a per-endpoint list.
-extern void nni_pipe_ep_list_init(nni_list *);
-
// nni_pipe_find finds a pipe given its ID. It places a hold on the
// pipe, which must be released by the caller when it is done.
extern int nni_pipe_find(nni_pipe **, uint32_t);
diff --git a/src/core/reap.c b/src/core/reap.c
index 8191dba3..bfad6c32 100644
--- a/src/core/reap.c
+++ b/src/core/reap.c
@@ -14,47 +14,62 @@
#include <stdbool.h>
-static nni_list nni_reap_list;
-static nni_mtx nni_reap_mtx;
-static nni_cv nni_reap_cv;
-static bool nni_reap_exit = false;
-static nni_thr nni_reap_thr;
+static nni_list reap_list;
+static nni_mtx reap_mtx;
+static nni_cv reap_cv;
+static nni_cv reap_empty_cv;
+static bool reap_exit = false;
+static bool reap_empty = false;
+static nni_thr reap_thr;
static void
-nni_reap_stuff(void *notused)
+reap_worker(void *notused)
{
NNI_ARG_UNUSED(notused);
- nni_mtx_lock(&nni_reap_mtx);
+ nni_mtx_lock(&reap_mtx);
for (;;) {
nni_reap_item *item;
- if ((item = nni_list_first(&nni_reap_list)) != NULL) {
- nni_list_remove(&nni_reap_list, item);
- nni_mtx_unlock(&nni_reap_mtx);
+ while ((item = nni_list_first(&reap_list)) != NULL) {
+ nni_list_remove(&reap_list, item);
+ nni_mtx_unlock(&reap_mtx);
item->r_func(item->r_ptr);
- nni_mtx_lock(&nni_reap_mtx);
- continue;
+ nni_mtx_lock(&reap_mtx);
}
- if (nni_reap_exit) {
+ reap_empty = true;
+ nni_cv_wake(&reap_empty_cv);
+
+ if (reap_exit) {
break;
}
- nni_cv_wait(&nni_reap_cv);
+ nni_cv_wait(&reap_cv);
}
- nni_mtx_unlock(&nni_reap_mtx);
+ nni_mtx_unlock(&reap_mtx);
}
void
nni_reap(nni_reap_item *item, nni_cb func, void *ptr)
{
- nni_mtx_lock(&nni_reap_mtx);
+ nni_mtx_lock(&reap_mtx);
item->r_func = func;
item->r_ptr = ptr;
- nni_list_append(&nni_reap_list, item);
- nni_cv_wake(&nni_reap_cv);
- nni_mtx_unlock(&nni_reap_mtx);
+ nni_list_append(&reap_list, item);
+ reap_empty = false;
+ nni_cv_wake(&reap_cv);
+ nni_mtx_unlock(&reap_mtx);
+}
+
+void
+nni_reap_drain(void)
+{
+ nni_mtx_lock(&reap_mtx);
+ while (!reap_empty) {
+ nni_cv_wait(&reap_empty_cv);
+ }
+ nni_mtx_unlock(&reap_mtx);
}
int
@@ -62,28 +77,30 @@ nni_reap_sys_init(void)
{
int rv;
- NNI_LIST_INIT(&nni_reap_list, nni_reap_item, r_link);
- nni_mtx_init(&nni_reap_mtx);
- nni_cv_init(&nni_reap_cv, &nni_reap_mtx);
- nni_reap_exit = false;
+ NNI_LIST_INIT(&reap_list, nni_reap_item, r_link);
+ nni_mtx_init(&reap_mtx);
+ nni_cv_init(&reap_cv, &reap_mtx);
+ nni_cv_init(&reap_empty_cv, &reap_mtx);
+ reap_exit = false;
// If this fails, we don't fail init, instead we will try to
// start up at reap time.
- if ((rv = nni_thr_init(&nni_reap_thr, nni_reap_stuff, NULL)) != 0) {
- nni_cv_fini(&nni_reap_cv);
- nni_mtx_fini(&nni_reap_mtx);
+ if ((rv = nni_thr_init(&reap_thr, reap_worker, NULL)) != 0) {
+ nni_cv_fini(&reap_cv);
+ nni_cv_fini(&reap_empty_cv);
+ nni_mtx_fini(&reap_mtx);
return (rv);
}
- nni_thr_run(&nni_reap_thr);
+ nni_thr_run(&reap_thr);
return (0);
}
void
nni_reap_sys_fini(void)
{
- nni_mtx_lock(&nni_reap_mtx);
- nni_reap_exit = true;
- nni_cv_wake(&nni_reap_cv);
- nni_mtx_unlock(&nni_reap_mtx);
- nni_thr_fini(&nni_reap_thr);
+ nni_mtx_lock(&reap_mtx);
+ reap_exit = true;
+ nni_cv_wake(&reap_cv);
+ nni_mtx_unlock(&reap_mtx);
+ nni_thr_fini(&reap_thr);
}
diff --git a/src/core/reap.h b/src/core/reap.h
index 843c8c9c..40c27a5d 100644
--- a/src/core/reap.h
+++ b/src/core/reap.h
@@ -38,6 +38,7 @@ typedef struct nni_reap_item {
// part of a fully reapable graph; otherwise this can lead to an infinite
// loop in the reap thread.
extern void nni_reap(nni_reap_item *, nni_cb, void *);
+extern void nni_reap_drain(void);
extern int nni_reap_sys_init(void);
extern void nni_reap_sys_fini(void);
diff --git a/src/core/socket.c b/src/core/socket.c
index 620c5d19..640e0db6 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -9,6 +9,7 @@
//
#include "core/nng_impl.h"
+#include "sockimpl.h"
#include <stdio.h>
#include <string.h>
@@ -58,7 +59,7 @@ struct nni_socket {
nni_cv s_cv;
nni_cv s_close_cv;
- uint64_t s_id;
+ uint32_t s_id;
uint32_t s_flags;
unsigned s_refcnt; // protected by global lock
void * s_data; // Protocol private
@@ -97,6 +98,9 @@ struct nni_socket {
static void nni_ctx_destroy(nni_ctx *);
+static void dialer_shutdown_locked(nni_dialer *);
+static void listener_shutdown_locked(nni_listener *);
+
static int
sock_get_fd(nni_sock *s, int flag, int *fdp)
{
@@ -340,7 +344,7 @@ nni_free_opt(nni_sockopt *opt)
uint32_t
nni_sock_id(nni_sock *s)
{
- return ((uint32_t) s->s_id);
+ return (s->s_id);
}
// nni_sock_sendq and nni_sock_recvq are called by the protocol to obtain
@@ -405,58 +409,6 @@ nni_sock_closing(nni_sock *s)
return (rv);
}
-void
-nni_sock_run_pipe_cb(nni_sock *s, int ev, uint32_t id)
-{
- if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) {
- nng_pipe_cb cb;
- void * arg;
-
- nni_mtx_lock(&s->s_pipe_cbs_mtx);
- cb = s->s_pipe_cbs[ev].cb_fn;
- arg = s->s_pipe_cbs[ev].cb_arg;
- nni_mtx_unlock(&s->s_pipe_cbs_mtx);
-
- if (cb != NULL) {
- nng_pipe p;
- p.id = id;
- cb(p, ev, arg);
- }
- }
-}
-
-int
-nni_sock_pipe_add(nni_sock *s, nni_pipe *p)
-{
- // Initialize protocol pipe data.
- nni_mtx_lock(&s->s_mx);
- if (s->s_closing) {
- nni_mtx_unlock(&s->s_mx);
- return (NNG_ECLOSED);
- }
-
- nni_list_append(&s->s_pipes, p);
-
- // Start the initial negotiation I/O...
- nni_pipe_start(p);
-
- nni_mtx_unlock(&s->s_mx);
- return (0);
-}
-
-void
-nni_sock_pipe_remove(nni_sock *s, nni_pipe *p)
-{
- nni_mtx_lock(&s->s_mx);
- if (nni_list_active(&s->s_pipes, p)) {
- nni_list_remove(&s->s_pipes, p);
- }
- if (s->s_closing && nni_list_empty(&s->s_pipes)) {
- nni_cv_wake(&s->s_cv);
- }
- nni_mtx_unlock(&s->s_mx);
-}
-
static void
sock_destroy(nni_sock *s)
{
@@ -521,10 +473,9 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
NNI_LIST_NODE_INIT(&s->s_node);
NNI_LIST_INIT(&s->s_options, nni_sockopt, node);
NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node);
-
- nni_pipe_sock_list_init(&s->s_pipes);
- nni_listener_list_init(&s->s_listeners);
- nni_dialer_list_init(&s->s_dialers);
+ NNI_LIST_INIT(&s->s_pipes, nni_pipe, p_sock_node);
+ NNI_LIST_INIT(&s->s_listeners, nni_listener, l_node);
+ NNI_LIST_INIT(&s->s_dialers, nni_dialer, d_node);
nni_mtx_init(&s->s_mx);
nni_mtx_init(&s->s_pipe_cbs_mtx);
nni_cv_init(&s->s_cv, &s->s_mx);
@@ -615,7 +566,7 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
}
nni_mtx_lock(&sock_lk);
- if ((rv = nni_idhash_alloc(sock_hash, &s->s_id, s)) != 0) {
+ if ((rv = nni_idhash_alloc32(sock_hash, &s->s_id, s)) != 0) {
sock_destroy(s);
} else {
nni_list_append(&sock_list, s);
@@ -625,8 +576,7 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
nni_mtx_unlock(&sock_lk);
// Set the sockname.
- (void) snprintf(
- s->s_name, sizeof(s->s_name), "%u", (unsigned) s->s_id);
+ (void) snprintf(s->s_name, sizeof(s->s_name), "%u", s->s_id);
return (rv);
}
@@ -640,9 +590,7 @@ nni_sock_shutdown(nni_sock *sock)
{
nni_pipe * pipe;
nni_dialer * d;
- nni_dialer * nd;
nni_listener *l;
- nni_listener *nl;
nni_ctx * ctx;
nni_ctx * nctx;
@@ -657,10 +605,10 @@ nni_sock_shutdown(nni_sock *sock)
// Close the EPs. This prevents new connections from forming
// but but allows existing ones to drain.
NNI_LIST_FOREACH (&sock->s_listeners, l) {
- nni_listener_shutdown(l);
+ listener_shutdown_locked(l);
}
NNI_LIST_FOREACH (&sock->s_dialers, d) {
- nni_dialer_shutdown(d);
+ dialer_shutdown_locked(d);
}
nni_mtx_unlock(&sock->s_mx);
@@ -706,35 +654,26 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
+ // For each pipe, arrange for it to teardown hard. We would
+ // expect there not to be any here.
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+ nni_pipe_close(pipe);
+ }
+
// Go through the dialers and listeners, attempting to close them.
// We might already have a close in progress, in which case
// we skip past it; it will be removed from another thread.
- nl = nni_list_first(&sock->s_listeners);
- while ((l = nl) != NULL) {
- nl = nni_list_next(&sock->s_listeners, nl);
-
+ NNI_LIST_FOREACH (&sock->s_listeners, l) {
if (nni_listener_hold(l) == 0) {
- nni_mtx_unlock(&sock->s_mx);
nni_listener_close(l);
- nni_mtx_lock(&sock->s_mx);
}
}
- nd = nni_list_first(&sock->s_dialers);
- while ((d = nd) != NULL) {
- nd = nni_list_next(&sock->s_dialers, nd);
-
+ NNI_LIST_FOREACH (&sock->s_dialers, d) {
if (nni_dialer_hold(d) == 0) {
- nni_mtx_unlock(&sock->s_mx);
nni_dialer_close(d);
- nni_mtx_lock(&sock->s_mx);
}
}
- // For each pipe, arrange for it to teardown hard.
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_stop(pipe);
- }
-
// We have to wait for *both* endpoints and pipes to be
// removed.
while ((!nni_list_empty(&sock->s_pipes)) ||
@@ -1271,7 +1210,6 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock)
{
nni_ctx *ctx;
int rv;
- uint64_t id;
if (sock->s_ctx_ops.ctx_init == NULL) {
return (NNG_ENOTSUP);
@@ -1286,12 +1224,11 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock)
NNI_FREE_STRUCT(ctx);
return (NNG_ECLOSED);
}
- if ((rv = nni_idhash_alloc(ctx_hash, &id, ctx)) != 0) {
+ if ((rv = nni_idhash_alloc32(ctx_hash, &ctx->c_id, ctx)) != 0) {
nni_mtx_unlock(&sock_lk);
NNI_FREE_STRUCT(ctx);
return (rv);
}
- ctx->c_id = (uint32_t) id;
if ((rv = sock->s_ctx_ops.ctx_init(&ctx->c_data, sock->s_data)) != 0) {
nni_idhash_remove(ctx_hash, ctx->c_id);
@@ -1415,3 +1352,265 @@ nni_ctx_setopt(
nni_mtx_unlock(&sock->s_mx);
return (rv);
}
+
+static void
+dialer_timer_start_locked(nni_dialer *d)
+{
+ nni_duration backoff;
+
+ backoff = d->d_currtime;
+ d->d_currtime *= 2;
+ if (d->d_currtime > d->d_maxrtime) {
+ d->d_currtime = d->d_maxrtime;
+ }
+
+ // To minimize damage from storms, etc., we select a backoff
+ // value randomly, in the range of [0, backoff-1]; this is
+ // pretty similar to 802 style backoff, except that we have a
+ // nearly uniform time period instead of discrete slot times.
+ // This algorithm may lead to slight biases because we don't
+ // have a statistically perfect distribution with the modulo of
+ // the random number, but this really doesn't matter.
+ nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio);
+}
+
+void
+nni_dialer_timer_start(nni_dialer *d)
+{
+ nni_sock *s = d->d_sock;
+ nni_mtx_lock(&s->s_mx);
+ dialer_timer_start_locked(d);
+ nni_mtx_unlock(&s->s_mx);
+}
+
+void
+nni_dialer_add_pipe(nni_dialer *d, nni_pipe *p)
+{
+ nni_sock *s = d->d_sock;
+
+ nni_mtx_lock(&s->s_mx);
+
+ if (s->s_closed || d->d_closed) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_pipe_destroy(p);
+ return;
+ }
+
+ p->p_dialer = d;
+ nni_list_append(&d->d_pipes, p);
+ nni_list_append(&s->s_pipes, p);
+ d->d_pipe = p;
+ d->d_currtime = d->d_inirtime;
+ nni_mtx_unlock(&s->s_mx);
+
+ // Start the initial negotiation I/O...
+ nni_pipe_start(p);
+}
+
+static void
+dialer_shutdown_impl(nni_dialer *d)
+{
+ nni_pipe *p;
+
+ // Abort any remaining in-flight operations.
+ nni_aio_close(d->d_con_aio);
+ nni_aio_close(d->d_tmo_aio);
+
+ // Stop the underlying transport.
+ d->d_ops.d_close(d->d_data);
+
+ NNI_LIST_FOREACH (&d->d_pipes, p) {
+ nni_pipe_close(p);
+ }
+}
+
+static void
+dialer_shutdown_locked(nni_dialer *d)
+{
+ if (nni_atomic_flag_test_and_set(&d->d_closing)) {
+ return;
+ }
+ dialer_shutdown_impl(d);
+}
+
+void
+nni_dialer_shutdown(nni_dialer *d)
+{
+ nni_sock *s = d->d_sock;
+ if (nni_atomic_flag_test_and_set(&d->d_closing)) {
+ return;
+ }
+ nni_mtx_lock(&s->s_mx);
+ dialer_shutdown_impl(d);
+ nni_mtx_unlock(&s->s_mx);
+}
+
+void
+nni_dialer_reap(nni_dialer *d)
+{
+ nni_sock *s = d->d_sock;
+
+ nni_aio_stop(d->d_tmo_aio);
+ nni_aio_stop(d->d_con_aio);
+
+ nni_mtx_lock(&s->s_mx);
+ if (!nni_list_empty(&d->d_pipes)) {
+ nni_pipe *p;
+ // This should already have been done, but be certain!
+ NNI_LIST_FOREACH (&d->d_pipes, p) {
+ nni_pipe_close(p);
+ }
+ nni_mtx_unlock(&s->s_mx);
+ // Go back to the end of reap list.
+ nni_reap(&d->d_reap, (nni_cb) nni_dialer_reap, d);
+ return;
+ }
+
+ nni_list_remove(&s->s_dialers, d);
+ if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) {
+ nni_cv_wake(&s->s_cv);
+ }
+
+ nni_mtx_unlock(&s->s_mx);
+
+ nni_dialer_destroy(d);
+}
+
+void
+nni_listener_add_pipe(nni_listener *l, nni_pipe *p)
+{
+ nni_sock *s = l->l_sock;
+
+ nni_mtx_lock(&s->s_mx);
+ if (s->s_closed || l->l_closed) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_pipe_destroy(p);
+ return;
+ }
+ p->p_listener = l;
+ nni_list_append(&l->l_pipes, p);
+ nni_list_append(&s->s_pipes, p);
+ nni_mtx_unlock(&s->s_mx);
+
+ // Start the initial negotiation I/O...
+ nni_pipe_start(p);
+}
+
+static void
+listener_shutdown_impl(nni_listener *l)
+{
+ nni_pipe *p;
+
+ // Abort any remaining in-flight accepts.
+ nni_aio_close(l->l_acc_aio);
+ nni_aio_close(l->l_tmo_aio);
+
+ // Stop the underlying transport.
+ l->l_ops.l_close(l->l_data);
+
+ NNI_LIST_FOREACH (&l->l_pipes, p) {
+ nni_pipe_close(p);
+ }
+}
+
+static void
+listener_shutdown_locked(nni_listener *l)
+{
+ if (nni_atomic_flag_test_and_set(&l->l_closing)) {
+ return;
+ }
+ listener_shutdown_impl(l);
+}
+
+void
+nni_listener_shutdown(nni_listener *l)
+{
+ nni_sock *s = l->l_sock;
+
+ if (nni_atomic_flag_test_and_set(&l->l_closing)) {
+ return;
+ }
+
+ nni_mtx_lock(&s->s_mx);
+ listener_shutdown_impl(l);
+ nni_mtx_unlock(&s->s_mx);
+}
+
+void
+nni_listener_reap(nni_listener *l)
+{
+ nni_sock *s = l->l_sock;
+
+ nni_aio_stop(l->l_tmo_aio);
+ nni_aio_stop(l->l_acc_aio);
+
+ nni_mtx_lock(&s->s_mx);
+ if (!nni_list_empty(&l->l_pipes)) {
+ nni_pipe *p;
+ // This should already have been done, but be certain!
+ NNI_LIST_FOREACH (&l->l_pipes, p) {
+ nni_pipe_close(p);
+ }
+ nni_mtx_unlock(&s->s_mx);
+ // Go back to the end of reap list.
+ nni_reap(&l->l_reap, (nni_cb) nni_listener_reap, l);
+ return;
+ }
+
+ nni_list_remove(&s->s_listeners, l);
+ if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) {
+ nni_cv_wake(&s->s_cv);
+ }
+
+ nni_mtx_unlock(&s->s_mx);
+
+ nni_listener_destroy(l);
+}
+
+void
+nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev)
+{
+ nni_sock * s = p->p_sock;
+ nng_pipe_cb cb;
+ void * arg;
+
+ if (!p->p_cbs) {
+ if (ev == NNG_PIPE_EV_ADD_PRE) {
+ // First event, after this we want all other events.
+ p->p_cbs = true;
+ } else {
+ return;
+ }
+ }
+ nni_mtx_lock(&s->s_pipe_cbs_mtx);
+ cb = s->s_pipe_cbs[ev].cb_fn;
+ arg = s->s_pipe_cbs[ev].cb_arg;
+ nni_mtx_unlock(&s->s_pipe_cbs_mtx);
+
+ if (cb != NULL) {
+ nng_pipe pid;
+ pid.id = p->p_id;
+ cb(pid, ev, arg);
+ }
+}
+
+void
+nni_pipe_remove(nni_pipe *p)
+{
+ nni_sock * s = p->p_sock;
+ nni_dialer *d = p->p_dialer;
+
+ nni_mtx_lock(&s->s_mx);
+ nni_list_node_remove(&p->p_sock_node);
+ nni_list_node_remove(&p->p_ep_node);
+ p->p_listener = NULL;
+ p->p_dialer = NULL;
+ if ((d != NULL) && (d->d_pipe == p)) {
+ d->d_pipe = NULL;
+ dialer_timer_start_locked(d); // Kick the timer to redial.
+ }
+ if (s->s_closing) {
+ nni_cv_wake(&s->s_cv);
+ }
+ nni_mtx_unlock(&s->s_mx);
+}
diff --git a/src/core/socket.h b/src/core/socket.h
index 37256571..4b9c4642 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -38,20 +38,6 @@ extern void nni_sock_send(nni_sock *, nni_aio *);
extern void nni_sock_recv(nni_sock *, nni_aio *);
extern uint32_t nni_sock_id(nni_sock *);
-// nni_sock_pipe_add adds the pipe to the socket. It is called by
-// the generic pipe creation code. It also adds the socket to the
-// ep list, and starts the pipe. It does all these to ensure that
-// we have complete success or failure, and there is no point where
-// a pipe could wind up orphaned.
-extern int nni_sock_pipe_add(nni_sock *, nni_pipe *);
-extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *);
-
-extern int nni_sock_add_dialer(nni_sock *, nni_dialer *);
-extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *);
-
-extern int nni_sock_add_listener(nni_sock *, nni_listener *);
-extern void nni_sock_remove_listener(nni_sock *, nni_listener *);
-
// These are socket methods that protocol operations can expect to call.
// Note that each of these should be called without any locks held, since
// the socket can reenter the protocol.
@@ -76,8 +62,6 @@ extern uint32_t nni_sock_flags(nni_sock *);
// should be executed.
extern void nni_sock_set_pipe_cb(nni_sock *sock, int, nng_pipe_cb, void *);
-extern void nni_sock_run_pipe_cb(nni_sock *sock, int, uint32_t);
-
extern bool nni_sock_closing(nni_sock *sock);
// nni_ctx_open is used to open/create a new context structure.
diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h
new file mode 100644
index 00000000..207a83b3
--- /dev/null
+++ b/src/core/sockimpl.h
@@ -0,0 +1,107 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_SOCKIMPL_H
+#define CORE_SOCKIMPL_H
+
+// This file contains stuff shared within the core between sockets, endpoints,
+// and pipes. This must not be exposed to other subsystems -- these internals
+// are subject to change at any time.
+
+struct nni_dialer {
+ nni_tran_dialer_ops d_ops; // transport ops
+ nni_tran * d_tran; // transport pointer
+ void * d_data; // transport private
+ uint32_t d_id; // endpoint id
+ nni_list_node d_node; // per socket list
+ nni_sock * d_sock;
+ nni_url * d_url;
+ nni_pipe * d_pipe; // active pipe (for redialer)
+ int d_refcnt;
+ int d_lastrv; // last result from synchronous
+ bool d_synch; // synchronous connect in progress?
+ bool d_closed; // full shutdown
+ nni_atomic_flag d_started;
+ nni_atomic_flag d_closing; // close pending (waiting on refcnt)
+ nni_mtx d_mtx;
+ nni_cv d_cv;
+ nni_list d_pipes;
+ nni_aio * d_con_aio;
+ nni_aio * d_tmo_aio; // backoff timer
+ nni_duration d_maxrtime; // maximum time for reconnect
+ nni_duration d_currtime; // current time for reconnect
+ nni_duration d_inirtime; // initial time for reconnect
+ nni_time d_conntime; // time of last good connect
+ nni_reap_item d_reap;
+};
+
+struct nni_listener {
+ nni_tran_listener_ops l_ops; // transport ops
+ nni_tran * l_tran; // transport pointer
+ void * l_data; // transport private
+ uint32_t l_id; // endpoint id
+ nni_list_node l_node; // per socket list
+ nni_sock * l_sock;
+ nni_url * l_url;
+ int l_refcnt;
+ bool l_closed; // full shutdown
+ nni_atomic_flag l_started;
+ nni_atomic_flag l_closing; // close started (shutdown)
+ nni_list l_pipes;
+ nni_aio * l_acc_aio;
+ nni_aio * l_tmo_aio;
+ nni_reap_item l_reap;
+};
+
+struct nni_pipe {
+ uint32_t p_id;
+ nni_tran_pipe_ops p_tran_ops;
+ nni_proto_pipe_ops p_proto_ops;
+ void * p_tran_data;
+ void * p_proto_data;
+ nni_list_node p_sock_node;
+ nni_list_node p_ep_node;
+ nni_sock * p_sock;
+ nni_dialer * p_dialer;
+ nni_listener * p_listener;
+ bool p_closed;
+ nni_atomic_flag p_stop;
+ bool p_cbs;
+ int p_refcnt;
+ nni_mtx p_mtx;
+ nni_cv p_cv;
+ nni_reap_item p_reap;
+ nni_aio * p_start_aio;
+};
+
+extern int nni_sock_add_dialer(nni_sock *, nni_dialer *);
+extern void nni_sock_remove_dialer(nni_sock *, nni_dialer *);
+
+extern int nni_sock_add_listener(nni_sock *, nni_listener *);
+extern void nni_sock_remove_listener(nni_sock *, nni_listener *);
+
+extern void nni_dialer_add_pipe(nni_dialer *, nni_pipe *);
+extern void nni_dialer_shutdown(nni_dialer *);
+extern void nni_dialer_reap(nni_dialer *);
+extern void nni_dialer_destroy(nni_dialer *);
+extern void nni_dialer_timer_start(nni_dialer *);
+
+extern void nni_listener_add_pipe(nni_listener *, nni_pipe *);
+extern void nni_listener_shutdown(nni_listener *);
+extern void nni_listener_reap(nni_listener *);
+extern void nni_listener_destroy(nni_listener *);
+
+extern void nni_pipe_remove(nni_pipe *);
+extern void nni_pipe_run_cb(nni_pipe *, nng_pipe_ev);
+extern void nni_pipe_destroy(nni_pipe *);
+extern int nni_pipe_create(nni_pipe **, nni_sock *, nni_tran *, void *);
+extern void nni_pipe_start(nni_pipe *);
+
+#endif // CORE_SOCKIMPL_H
diff --git a/src/nng.h b/src/nng.h
index ece1d1d6..50e557a4 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -229,7 +229,7 @@ NNG_DECL int nng_getopt_ptr(nng_socket, const char *, void **);
typedef enum {
NNG_PIPE_EV_ADD_PRE, // Called just before pipe added to socket
NNG_PIPE_EV_ADD_POST, // Called just after pipe added to socket
- NNG_PIPE_EV_REM_POST, // Called just after poipe removed from socket
+ NNG_PIPE_EV_REM_POST, // Called just after pipe removed from socket
NNG_PIPE_EV_NUM, // Used internally, must be last.
} nng_pipe_ev;
diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c
index ba719e49..2426abba 100644
--- a/src/protocol/bus0/bus.c
+++ b/src/protocol/bus0/bus.c
@@ -234,7 +234,7 @@ bus0_pipe_getq_cb(void *arg)
if (nni_aio_result(p->aio_getq) != 0) {
// closed?
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
@@ -252,7 +252,7 @@ bus0_pipe_send_cb(void *arg)
// closed?
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -267,7 +267,7 @@ bus0_pipe_recv_cb(void *arg)
nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
msg = nni_aio_get_msg(p->aio_recv);
@@ -277,7 +277,7 @@ bus0_pipe_recv_cb(void *arg)
// XXX: bump a nomemory stat
nni_msg_free(msg);
nni_aio_set_msg(p->aio_recv, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -295,7 +295,7 @@ bus0_pipe_putq_cb(void *arg)
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c
index 4ab9f9e8..d663c5e2 100644
--- a/src/protocol/pair0/pair.c
+++ b/src/protocol/pair0/pair.c
@@ -176,7 +176,7 @@ pair0_recv_cb(void *arg)
nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -196,7 +196,7 @@ pair0_putq_cb(void *arg)
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
nni_pipe_recv(p->npipe, p->aio_recv);
@@ -208,7 +208,7 @@ pair0_getq_cb(void *arg)
pair0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -226,7 +226,7 @@ pair0_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c
index 0c6a4867..dc250943 100644
--- a/src/protocol/pair1/pair.c
+++ b/src/protocol/pair1/pair.c
@@ -244,7 +244,7 @@ pair1_pipe_recv_cb(void *arg)
int rv;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -257,13 +257,13 @@ pair1_pipe_recv_cb(void *arg)
// If the message is missing the hop count header, scrap it.
if (nni_msg_len(msg) < sizeof(uint32_t)) {
nni_msg_free(msg);
- nni_pipe_stop(npipe);
+ nni_pipe_close(npipe);
return;
}
hdr = nni_msg_trim_u32(msg);
if (hdr & 0xffffff00) {
nni_msg_free(msg);
- nni_pipe_stop(npipe);
+ nni_pipe_close(npipe);
return;
}
@@ -343,7 +343,7 @@ pair1_pipe_putq_cb(void *arg)
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
nni_pipe_recv(p->npipe, p->aio_recv);
@@ -358,7 +358,7 @@ pair1_pipe_getq_cb(void *arg)
uint32_t hops;
if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -405,7 +405,7 @@ pair1_pipe_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c
index 63243cb5..a713bc80 100644
--- a/src/protocol/pipeline0/pull.c
+++ b/src/protocol/pipeline0/pull.c
@@ -141,7 +141,7 @@ pull0_recv_cb(void *arg)
if (nni_aio_result(aio) != 0) {
// Failed to get a message, probably the pipe is closed.
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -163,7 +163,7 @@ pull0_putq_cb(void *arg)
// we can do. Just close the pipe.
nni_msg_free(nni_aio_get_msg(aio));
nni_aio_set_msg(aio, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c
index 9585b86c..00e9212c 100644
--- a/src/protocol/pipeline0/push.c
+++ b/src/protocol/pipeline0/push.c
@@ -163,7 +163,7 @@ push0_recv_cb(void *arg)
// We normally expect to receive an error. If a pipe actually
// sends us data, we just discard it.
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
nni_msg_free(nni_aio_get_msg(p->aio_recv));
@@ -180,7 +180,7 @@ push0_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -195,7 +195,7 @@ push0_getq_cb(void *arg)
if (nni_aio_result(aio) != 0) {
// If the socket is closing, nothing else we can do.
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index 72cd1daa..cbc8acea 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -240,7 +240,7 @@ pub0_pipe_recv_cb(void *arg)
pub0_pipe *p = arg;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -255,7 +255,7 @@ pub0_pipe_getq_cb(void *arg)
pub0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -273,7 +273,7 @@ pub0_pipe_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index 2e8be4be..cb6d781f 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -164,7 +164,7 @@ sub0_recv_cb(void *arg)
nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -182,7 +182,7 @@ sub0_recv_cb(void *arg)
// Any other error we stop the pipe for. It's probably
// NNG_ECLOSED anyway.
nng_msg_free(msg);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
nni_pipe_recv(p->pipe, p->aio_recv);
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index f725cadb..f9ce58fd 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -416,7 +416,7 @@ rep0_pipe_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
nni_mtx_lock(&s->lk);
@@ -519,7 +519,7 @@ rep0_pipe_recv_cb(void *arg)
int hops;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -544,7 +544,7 @@ rep0_pipe_recv_cb(void *arg)
// Peer is speaking garbage. Kick it.
nni_msg_free(msg);
nni_aio_set_msg(p->aio_recv, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
body = nni_msg_body(msg);
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index 43751d14..018dd0e8 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -305,7 +305,7 @@ req0_send_cb(void *arg)
// We failed to send... clean up and deal with it.
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -345,7 +345,7 @@ req0_recv_cb(void *arg)
uint32_t id;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -409,7 +409,7 @@ req0_recv_cb(void *arg)
malformed:
nni_msg_free(msg);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
}
static void
diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c
index e3b9b605..1fe81ac5 100644
--- a/src/protocol/reqrep0/xrep.c
+++ b/src/protocol/reqrep0/xrep.c
@@ -262,7 +262,7 @@ xrep0_pipe_getq_cb(void *arg)
xrep0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -280,7 +280,7 @@ xrep0_pipe_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -296,7 +296,7 @@ xrep0_pipe_recv_cb(void *arg)
int hops;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -327,7 +327,7 @@ xrep0_pipe_recv_cb(void *arg)
if (nni_msg_len(msg) < 4) {
// Peer is speaking garbage. Kick it.
nni_msg_free(msg);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
body = nni_msg_body(msg);
@@ -361,7 +361,7 @@ xrep0_pipe_putq_cb(void *arg)
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c
index 2f0a3652..a98c713e 100644
--- a/src/protocol/reqrep0/xreq.c
+++ b/src/protocol/reqrep0/xreq.c
@@ -173,7 +173,7 @@ xreq0_getq_cb(void *arg)
xreq0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -191,7 +191,7 @@ xreq0_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -207,7 +207,7 @@ xreq0_putq_cb(void *arg)
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
nni_aio_set_msg(p->aio_putq, NULL);
@@ -224,7 +224,7 @@ xreq0_recv_cb(void *arg)
uint32_t id;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
@@ -236,7 +236,7 @@ xreq0_recv_cb(void *arg)
if (nni_msg_len(msg) < 4) {
// Peer gave us garbage, so kick it.
nni_msg_free(msg);
- nni_pipe_stop(p->pipe);
+ nni_pipe_close(p->pipe);
return;
}
id = nni_msg_trim_u32(msg);
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c
index fbdeb65a..4e0a5263 100644
--- a/src/protocol/survey0/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -405,7 +405,7 @@ resp0_pipe_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
nni_mtx_lock(&s->mtx);
@@ -511,7 +511,7 @@ resp0_pipe_recv_cb(void *arg)
size_t len;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -532,7 +532,7 @@ resp0_pipe_recv_cb(void *arg)
// Peer is speaking garbage, kick it.
nni_msg_free(msg);
nni_aio_set_msg(p->aio_recv, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
body = nni_msg_body(msg);
diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c
index 42d88f13..58fa4aa6 100644
--- a/src/protocol/survey0/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -376,7 +376,7 @@ surv0_pipe_getq_cb(void *arg)
surv0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -394,7 +394,7 @@ surv0_pipe_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -411,7 +411,7 @@ surv0_pipe_recv_cb(void *arg)
uint32_t id;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -423,7 +423,7 @@ surv0_pipe_recv_cb(void *arg)
if (nni_msg_len(msg) < 4) {
// Peer sent us garbage. Kick it.
nni_msg_free(msg);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
id = nni_msg_trim_u32(msg);
diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c
index 03a47e92..334c5ca6 100644
--- a/src/protocol/survey0/xrespond.c
+++ b/src/protocol/survey0/xrespond.c
@@ -247,7 +247,7 @@ xresp0_getq_cb(void *arg)
xresp0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -265,7 +265,7 @@ xresp0_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -282,7 +282,7 @@ xresp0_recv_cb(void *arg)
int hops;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -308,7 +308,7 @@ xresp0_recv_cb(void *arg)
if (nni_msg_len(msg) < 4) {
// Peer sent us garbage, so kick it.
nni_msg_free(msg);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
body = nni_msg_body(msg);
@@ -340,7 +340,7 @@ xresp0_putq_cb(void *arg)
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c
index bad24042..fabcc766 100644
--- a/src/protocol/survey0/xsurvey.c
+++ b/src/protocol/survey0/xsurvey.c
@@ -205,7 +205,7 @@ xsurv0_getq_cb(void *arg)
xsurv0_pipe *p = arg;
if (nni_aio_result(p->aio_getq) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -223,7 +223,7 @@ xsurv0_send_cb(void *arg)
if (nni_aio_result(p->aio_send) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_send));
nni_aio_set_msg(p->aio_send, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -238,7 +238,7 @@ xsurv0_putq_cb(void *arg)
if (nni_aio_result(p->aio_putq) != 0) {
nni_msg_free(nni_aio_get_msg(p->aio_putq));
nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -252,7 +252,7 @@ xsurv0_recv_cb(void *arg)
nni_msg * msg;
if (nni_aio_result(p->aio_recv) != 0) {
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
@@ -264,7 +264,7 @@ xsurv0_recv_cb(void *arg)
if (nni_msg_len(msg) < 4) {
// Peer gave us garbage, so kick it.
nni_msg_free(msg);
- nni_pipe_stop(p->npipe);
+ nni_pipe_close(p->npipe);
return;
}
if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) {
diff --git a/tests/sock.c b/tests/sock.c
index 002ae944..4aef1f38 100644
--- a/tests/sock.c
+++ b/tests/sock.c
@@ -315,6 +315,8 @@ TestMain("Socket Operations", {
size_t sz;
nng_socket s2;
char * a = "inproc://asy";
+ So(nng_setopt_ms(s1, NNG_OPT_RECONNMINT, 10) == 0);
+ So(nng_setopt_ms(s1, NNG_OPT_RECONNMAXT, 10) == 0);
So(nng_dial(s1, a, NULL, NNG_FLAG_NONBLOCK) == 0);
Convey("And connects late", {
So(nng_pair_open(&s2) == 0);