aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-12 12:24:54 -0700
committerGarrett D'Amore <garrett@damore.org>2017-08-14 13:43:02 -0700
commit343417234aa3fd86e8ae0b56ae500a1ed3411cfc (patch)
tree728992cfe8c2987d5939026a1f734dcc58b3df18 /src
parent4fb81f024e5f32a186cd5538574f8e5796980e36 (diff)
downloadnng-343417234aa3fd86e8ae0b56ae500a1ed3411cfc.tar.gz
nng-343417234aa3fd86e8ae0b56ae500a1ed3411cfc.tar.bz2
nng-343417234aa3fd86e8ae0b56ae500a1ed3411cfc.zip
fixes #62 Endpoint close should be synchronous #62
fixes #66 Make pipe and endpoint structures private This changes a number of things, refactoring endpoints and supporting code to keep their internals private, and making endpoint close synchronous. This will allow us to add a consumer facing API for nng_ep_close(), as well as property APIs, etc. While here a bunch of convoluted and dead code was cleaned up.
Diffstat (limited to 'src')
-rw-r--r--src/core/endpt.c389
-rw-r--r--src/core/endpt.h70
-rw-r--r--src/core/pipe.c114
-rw-r--r--src/core/pipe.h20
-rw-r--r--src/core/socket.c283
-rw-r--r--src/core/socket.h15
-rw-r--r--src/nng.c54
-rw-r--r--src/nng.h36
-rw-r--r--src/nng_compat.c20
9 files changed, 420 insertions, 581 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index a5865acf..6e5f7e8a 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -14,58 +14,63 @@
#include <stdlib.h>
#include <string.h>
+struct nni_ep {
+ nni_tran_ep ep_ops; // transport ops
+ nni_tran * ep_tran; // transport pointer
+ void * ep_data; // transport private
+ uint32_t ep_id; // endpoint id
+ nni_list_node ep_node; // per socket list
+ nni_sock * ep_sock;
+ char ep_addr[NNG_MAXADDRLEN];
+ int ep_mode;
+ int ep_closed; // full shutdown
+ int ep_closing; // close pending (waiting on refcnt)
+ int ep_bound; // true if we bound locally
+ int ep_refcnt;
+ nni_mtx ep_mtx;
+ nni_cv ep_cv;
+ nni_list ep_pipes;
+ nni_aio ep_acc_aio;
+ nni_aio ep_con_aio;
+ nni_aio ep_con_syn; // used for sync connect
+ nni_aio ep_tmo_aio; // backoff timer
+ nni_duration ep_maxrtime; // maximum time for reconnect
+ nni_duration ep_currtime; // current time for reconnect
+ nni_duration ep_inirtime; // initial time for reconnect
+ nni_time ep_conntime; // time of last good connect
+};
+
// Functionality related to end points.
-static void nni_ep_accept_start(nni_ep *);
-static void nni_ep_accept_done(void *);
-static void nni_ep_connect_start(nni_ep *);
-static void nni_ep_connect_done(void *);
-static void nni_ep_backoff_start(nni_ep *);
-static void nni_ep_backoff_done(void *);
-static void nni_ep_reaper(void *);
+static void nni_ep_acc_start(nni_ep *);
+static void nni_ep_acc_cb(void *);
+static void nni_ep_con_start(nni_ep *);
+static void nni_ep_con_cb(void *);
+static void nni_ep_tmo_start(nni_ep *);
+static void nni_ep_tmo_cb(void *);
static nni_idhash *nni_eps;
-
-static nni_list nni_ep_reap_list;
-static nni_mtx nni_ep_reap_lk;
-static nni_cv nni_ep_reap_cv;
-static nni_thr nni_ep_reap_thr;
-static int nni_ep_reap_run;
+static nni_mtx nni_ep_lk;
int
nni_ep_sys_init(void)
{
int rv;
- NNI_LIST_INIT(&nni_ep_reap_list, nni_ep, ep_reap_node);
-
- if (((rv = nni_mtx_init(&nni_ep_reap_lk)) != 0) ||
- ((rv = nni_cv_init(&nni_ep_reap_cv, &nni_ep_reap_lk)) != 0) ||
- ((rv = nni_thr_init(&nni_ep_reap_thr, nni_ep_reaper, 0)) != 0) ||
+ if (((rv = nni_mtx_init(&nni_ep_lk)) != 0) ||
((rv = nni_idhash_init(&nni_eps)) != 0)) {
return (rv);
}
nni_idhash_set_limits(
nni_eps, 1, 0x7fffffff, nni_random() & 0x7fffffff);
- nni_ep_reap_run = 1;
- nni_thr_run(&nni_ep_reap_thr);
-
return (0);
}
void
nni_ep_sys_fini(void)
{
- if (nni_ep_reap_run) {
- nni_mtx_lock(&nni_ep_reap_lk);
- nni_ep_reap_run = 0;
- nni_cv_wake(&nni_ep_reap_cv);
- nni_mtx_unlock(&nni_ep_reap_lk);
- }
- nni_thr_fini(&nni_ep_reap_thr);
- nni_cv_fini(&nni_ep_reap_cv);
- nni_mtx_fini(&nni_ep_reap_lk);
+ nni_mtx_fini(&nni_ep_lk);
nni_idhash_fini(nni_eps);
nni_eps = NULL;
}
@@ -83,19 +88,22 @@ nni_ep_destroy(nni_ep *ep)
return;
}
- // Remove us form the table so we cannot be found.
+ // Remove us from the table so we cannot be found.
if (ep->ep_id != 0) {
nni_idhash_remove(nni_eps, ep->ep_id);
}
+
+ nni_sock_ep_remove(ep->ep_sock, ep);
+
nni_aio_stop(&ep->ep_acc_aio);
nni_aio_stop(&ep->ep_con_aio);
nni_aio_stop(&ep->ep_con_syn);
- nni_aio_stop(&ep->ep_backoff);
+ nni_aio_stop(&ep->ep_tmo_aio);
nni_aio_fini(&ep->ep_acc_aio);
nni_aio_fini(&ep->ep_con_aio);
nni_aio_fini(&ep->ep_con_syn);
- nni_aio_fini(&ep->ep_backoff);
+ nni_aio_fini(&ep->ep_tmo_aio);
nni_mtx_lock(&ep->ep_mtx);
if (ep->ep_data != NULL) {
@@ -107,8 +115,8 @@ nni_ep_destroy(nni_ep *ep)
NNI_FREE_STRUCT(ep);
}
-int
-nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
+static int
+nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode)
{
nni_tran *tran;
nni_ep * ep;
@@ -127,152 +135,174 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
ep->ep_closed = 0;
ep->ep_bound = 0;
ep->ep_data = NULL;
- ep->ep_refcnt = 0;
+ ep->ep_refcnt = 1;
+ ep->ep_sock = s;
+ ep->ep_tran = tran;
+ ep->ep_mode = mode;
+
+ // Make a copy of the endpoint operations. This allows us to
+ // modify them (to override NULLs for example), and avoids an extra
+ // dereference on hot paths.
+ ep->ep_ops = *tran->tran_ep;
+
+ // Could safely use strcpy here, but this avoids discussion.
+ (void) snprintf(ep->ep_addr, sizeof(ep->ep_addr), "%s", addr);
NNI_LIST_NODE_INIT(&ep->ep_node);
- NNI_LIST_NODE_INIT(&ep->ep_reap_node);
nni_pipe_ep_list_init(&ep->ep_pipes);
if (((rv = nni_mtx_init(&ep->ep_mtx)) != 0) ||
((rv = nni_cv_init(&ep->ep_cv, &ep->ep_mtx)) != 0) ||
- ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0)) {
- nni_ep_destroy(ep);
- return (rv);
- }
- rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_accept_done, ep);
- if (rv != 0) {
- nni_ep_destroy(ep);
- return (rv);
- }
- rv = nni_aio_init(&ep->ep_con_aio, nni_ep_connect_done, ep);
- if (rv != 0) {
- nni_ep_destroy(ep);
- return (rv);
- }
- rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL);
- if (rv != 0) {
- nni_ep_destroy(ep);
- return (rv);
- }
- rv = nni_aio_init(&ep->ep_backoff, nni_ep_backoff_done, ep);
- if (rv != 0) {
+ ((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) ||
+ ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) ||
+ ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) ||
+ ((rv = nni_sock_ep_add(s, ep)) != 0)) {
nni_ep_destroy(ep);
return (rv);
}
- ep->ep_sock = sock;
- ep->ep_tran = tran;
- ep->ep_mode = mode;
+ *epp = ep;
+ return (0);
+}
- // Could safely use strcpy here, but this avoids discussion.
- (void) snprintf(ep->ep_addr, sizeof(ep->ep_addr), "%s", addr);
+int
+nni_ep_create_dialer(nni_ep **epp, nni_sock *s, const char *addr)
+{
+ return (nni_ep_create(epp, s, addr, NNI_EP_MODE_DIAL));
+}
- // Make a copy of the endpoint operations. This allows us to
- // modify them (to override NULLs for example), and avoids an extra
- // dereference on hot paths.
- ep->ep_ops = *tran->tran_ep;
+int
+nni_ep_create_listener(nni_ep **epp, nni_sock *s, const char *addr)
+{
+ return (nni_ep_create(epp, s, addr, NNI_EP_MODE_LISTEN));
+}
- if ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock, mode)) != 0) {
- nni_ep_destroy(ep);
+int
+nni_ep_find(nni_ep **epp, uint32_t id)
+{
+ int rv;
+ nni_ep *ep;
+
+ if ((rv = nni_init()) != 0) {
return (rv);
}
- *epp = ep;
- return (0);
+ nni_mtx_lock(&nni_ep_lk);
+ if ((rv = nni_idhash_find(nni_eps, id, (void **) &ep)) == 0) {
+ if (ep->ep_closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ ep->ep_refcnt++;
+ *epp = ep;
+ }
+ }
+ nni_mtx_unlock(&nni_ep_lk);
+ return (rv);
+}
+
+int
+nni_ep_hold(nni_ep *ep)
+{
+ int rv;
+ nni_mtx_lock(&nni_ep_lk);
+ if (ep->ep_closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ ep->ep_refcnt++;
+ rv = 0;
+ }
+ nni_mtx_unlock(&nni_ep_lk);
+ return (rv);
}
void
-nni_ep_close(nni_ep *ep)
+nni_ep_rele(nni_ep *ep)
+{
+ nni_mtx_lock(&nni_ep_lk);
+ ep->ep_refcnt--;
+ if (ep->ep_closing) {
+ nni_cv_wake(&ep->ep_cv);
+ }
+ nni_mtx_unlock(&nni_ep_lk);
+}
+
+int
+nni_ep_shutdown(nni_ep *ep)
{
+ // This is done under the endpoints lock, although the remove
+ // is done under that as well, we also make sure that we hold
+ // the socket lock in the remove step.
nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_closed) {
+ if (ep->ep_closing) {
nni_mtx_unlock(&ep->ep_mtx);
- return;
+ return (NNG_ECLOSED);
}
- ep->ep_closed = 1;
- nni_mtx_unlock(&ep->ep_mtx);
+ ep->ep_closing = 1;
// Abort any remaining in-flight operations.
nni_aio_cancel(&ep->ep_acc_aio, NNG_ECLOSED);
nni_aio_cancel(&ep->ep_con_aio, NNG_ECLOSED);
nni_aio_cancel(&ep->ep_con_syn, NNG_ECLOSED);
- nni_aio_cancel(&ep->ep_backoff, NNG_ECLOSED);
+ nni_aio_cancel(&ep->ep_tmo_aio, NNG_ECLOSED);
// Stop the underlying transport.
ep->ep_ops.ep_close(ep->ep_data);
-}
-
-static void
-nni_ep_reap(nni_ep *ep)
-{
- nni_ep_close(ep); // Extra sanity.
-
- nni_aio_stop(&ep->ep_acc_aio);
- nni_aio_stop(&ep->ep_con_aio);
- nni_aio_stop(&ep->ep_con_syn);
- nni_aio_stop(&ep->ep_backoff);
- // Take us off the sock list.
- nni_sock_ep_remove(ep->ep_sock, ep);
-
- // Make sure any other unlocked users (references) are gone
- // before we actually remove the memory. We should not have
- // to wait long as we have closed the underlying pipe and
- // done everything we can to wake any waiter (synchronous connect)
- // gracefully.
- nni_mtx_lock(&ep->ep_mtx);
- ep->ep_closed = 1;
- for (;;) {
- if ((!nni_list_empty(&ep->ep_pipes)) || (ep->ep_refcnt != 0)) {
- nni_cv_wait(&ep->ep_cv);
- continue;
- }
- break;
- }
nni_mtx_unlock(&ep->ep_mtx);
- nni_ep_destroy(ep);
+ return (0);
}
void
-nni_ep_stop(nni_ep *ep)
+nni_ep_close(nni_ep *ep)
{
- nni_pipe *pipe;
+ nni_pipe *p;
nni_mtx_lock(&ep->ep_mtx);
-
- // Protection against recursion.
- if (ep->ep_stop) {
+ if (ep->ep_closed) {
nni_mtx_unlock(&ep->ep_mtx);
+ nni_ep_rele(ep);
return;
}
- ep->ep_stop = 1;
- NNI_LIST_FOREACH (&ep->ep_pipes, pipe) {
- nni_pipe_stop(pipe);
+ ep->ep_closed = 1;
+ nni_mtx_unlock(&ep->ep_mtx);
+
+ nni_ep_shutdown(ep);
+
+ nni_aio_stop(&ep->ep_acc_aio);
+ nni_aio_stop(&ep->ep_con_aio);
+ nni_aio_stop(&ep->ep_con_syn);
+ nni_aio_stop(&ep->ep_tmo_aio);
+
+ nni_mtx_lock(&ep->ep_mtx);
+ NNI_LIST_FOREACH (&ep->ep_pipes, p) {
+ nni_pipe_stop(p);
+ }
+ while ((!nni_list_empty(&ep->ep_pipes)) || (ep->ep_refcnt != 1)) {
+ nni_cv_wait(&ep->ep_cv);
}
nni_mtx_unlock(&ep->ep_mtx);
- nni_mtx_lock(&nni_ep_reap_lk);
- NNI_ASSERT(!nni_list_node_active(&ep->ep_reap_node));
- nni_list_append(&nni_ep_reap_list, ep);
- nni_cv_wake(&nni_ep_reap_cv);
- nni_mtx_unlock(&nni_ep_reap_lk);
+ nni_ep_destroy(ep);
}
static void
-nni_ep_backoff_cancel(nni_aio *aio, int rv)
+nni_ep_tmo_cancel(nni_aio *aio, int rv)
{
// The only way this ever gets "finished", is via cancellation.
nni_aio_finish_error(aio, rv);
}
static void
-nni_ep_backoff_start(nni_ep *ep)
+nni_ep_tmo_start(nni_ep *ep)
{
nni_duration backoff;
- if (ep->ep_closed) {
+ if (ep->ep_closing) {
return;
}
backoff = ep->ep_currtime;
@@ -282,36 +312,36 @@ nni_ep_backoff_start(nni_ep *ep)
}
// To minimize damage from storms, etc., we select a backoff
- // value randomly, in the range of [0, backoff-1]; this is pretty
- // similar to 802 style backoff, except that we have a nearly
- // uniform time period instead of discrete slot times. This
- // algorithm may lead to slight biases because we don't have
- // a statistically perfect distribution with the modulo of the
- // random number, but this really doesn't matter.
-
- ep->ep_backoff.a_expire = nni_clock() + (nni_random() % backoff);
- nni_aio_start(&ep->ep_backoff, nni_ep_backoff_cancel, ep);
+ // value randomly, in the range of [0, backoff-1]; this is
+ // pretty similar to 802 style backoff, except that we have a
+ // nearly uniform time period instead of discrete slot times.
+ // This algorithm may lead to slight biases because we don't
+ // have a statistically perfect distribution with the modulo of
+ // the random number, but this really doesn't matter.
+
+ ep->ep_tmo_aio.a_expire = nni_clock() + (nni_random() % backoff);
+ nni_aio_start(&ep->ep_tmo_aio, nni_ep_tmo_cancel, ep);
}
static void
-nni_ep_backoff_done(void *arg)
+nni_ep_tmo_cb(void *arg)
{
nni_ep * ep = arg;
- nni_aio *aio = &ep->ep_backoff;
+ nni_aio *aio = &ep->ep_tmo_aio;
nni_mtx_lock(&ep->ep_mtx);
if (nni_aio_result(aio) == NNG_ETIMEDOUT) {
if (ep->ep_mode == NNI_EP_MODE_DIAL) {
- nni_ep_connect_start(ep);
+ nni_ep_con_start(ep);
} else {
- nni_ep_accept_start(ep);
+ nni_ep_acc_start(ep);
}
}
nni_mtx_unlock(&ep->ep_mtx);
}
static void
-nni_ep_connect_done(void *arg)
+nni_ep_con_cb(void *arg)
{
nni_ep * ep = arg;
nni_aio *aio = &ep->ep_con_aio;
@@ -324,11 +354,12 @@ nni_ep_connect_done(void *arg)
switch (rv) {
case 0:
// Good connect, so reset the backoff timer.
- // Note that a host that accepts the connect, but drops us
- // immediately, is going to get hit pretty hard (depending
- // on the initial backoff) with no exponential backoff.
- // This can happen if we wind up trying to connect to some
- // port that does not speak SP for example.
+ // Note that a host that accepts the connect, but drops
+ // us immediately, is going to get hit pretty hard
+ // (depending on the initial backoff) with no
+ // exponential backoff. This can happen if we wind up
+ // trying to connect to some port that does not speak
+ // SP for example.
ep->ep_currtime = ep->ep_inirtime;
// No further outgoing connects -- we will restart a
@@ -340,19 +371,19 @@ nni_ep_connect_done(void *arg)
break;
default:
// Other errors involve the use of the backoff timer.
- nni_ep_backoff_start(ep);
+ nni_ep_tmo_start(ep);
break;
}
nni_mtx_unlock(&ep->ep_mtx);
}
static void
-nni_ep_connect_start(nni_ep *ep)
+nni_ep_con_start(nni_ep *ep)
{
nni_aio *aio = &ep->ep_con_aio;
// Call with the Endpoint lock held.
- if (ep->ep_closed) {
+ if (ep->ep_closing) {
return;
}
@@ -375,13 +406,13 @@ nni_ep_dial(nni_ep *ep, int flags)
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ENOTSUP);
}
- if (ep->ep_closed) {
+ if (ep->ep_closing) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ECLOSED);
}
if ((flags & NNG_FLAG_SYNCH) == 0) {
- nni_ep_connect_start(ep);
+ nni_ep_con_start(ep);
nni_mtx_unlock(&ep->ep_mtx);
return (0);
}
@@ -404,7 +435,7 @@ nni_ep_dial(nni_ep *ep, int flags)
}
static void
-nni_ep_accept_done(void *arg)
+nni_ep_acc_cb(void *arg)
{
nni_ep * ep = arg;
nni_aio *aio = &ep->ep_acc_aio;
@@ -418,7 +449,7 @@ nni_ep_accept_done(void *arg)
nni_mtx_lock(&ep->ep_mtx);
switch (rv) {
case 0:
- nni_ep_accept_start(ep);
+ nni_ep_acc_start(ep);
break;
case NNG_ECLOSED:
case NNG_ECANCELED:
@@ -427,26 +458,27 @@ nni_ep_accept_done(void *arg)
case NNG_ECONNABORTED:
case NNG_ECONNRESET:
// These are remote conditions, no cool down.
- nni_ep_accept_start(ep);
+ nni_ep_acc_start(ep);
break;
default:
- // We don't really know why we failed, but we backoff here.
- // This is because errors here are probably due to system
- // failures (resource exhaustion) and we hope by not
- // thrashing we give the system a chance to recover.
- nni_ep_backoff_start(ep);
+ // We don't really know why we failed, but we backoff
+ // here. This is because errors here are probably due
+ // to system failures (resource exhaustion) and we hope
+ // by not thrashing we give the system a chance to
+ // recover.
+ nni_ep_tmo_start(ep);
break;
}
nni_mtx_unlock(&ep->ep_mtx);
}
static void
-nni_ep_accept_start(nni_ep *ep)
+nni_ep_acc_start(nni_ep *ep)
{
nni_aio *aio = &ep->ep_acc_aio;
// Call with the Endpoint lock held.
- if (ep->ep_closed) {
+ if (ep->ep_closing) {
return;
}
aio->a_pipe = NULL;
@@ -467,7 +499,7 @@ nni_ep_listen(nni_ep *ep, int flags)
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ENOTSUP);
}
- if (ep->ep_closed) {
+ if (ep->ep_closing) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ECLOSED);
}
@@ -479,7 +511,7 @@ nni_ep_listen(nni_ep *ep, int flags)
}
ep->ep_bound = 1;
- nni_ep_accept_start(ep);
+ nni_ep_acc_start(ep);
nni_mtx_unlock(&ep->ep_mtx);
return (0);
@@ -489,12 +521,11 @@ int
nni_ep_pipe_add(nni_ep *ep, nni_pipe *p)
{
nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_closed) {
+ if (ep->ep_closing) {
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ECLOSED);
}
nni_list_append(&ep->ep_pipes, p);
- p->p_ep = ep;
nni_mtx_unlock(&ep->ep_mtx);
return (0);
}
@@ -508,7 +539,6 @@ nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe)
if (nni_list_active(&ep->ep_pipes, pipe)) {
nni_list_remove(&ep->ep_pipes, pipe);
}
- pipe->p_ep = NULL;
// Wake up the close thread if it is waiting.
if (ep->ep_closed && nni_list_empty(&ep->ep_pipes)) {
nni_cv_wake(&ep->ep_cv);
@@ -517,9 +547,10 @@ nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe)
// If this pipe closed, then lets restart the dial operation.
// Since the remote side seems to have closed, lets start with
// a backoff. This keeps us from pounding the crap out of the
- // thing if a remote server accepts but then disconnects immediately.
+ // thing if a remote server accepts but then disconnects
+ // immediately.
if ((!ep->ep_closed) && (ep->ep_mode == NNI_EP_MODE_DIAL)) {
- nni_ep_backoff_start(ep);
+ nni_ep_tmo_start(ep);
}
nni_mtx_unlock(&ep->ep_mtx);
}
@@ -530,28 +561,14 @@ nni_ep_list_init(nni_list *list)
NNI_LIST_INIT(list, nni_ep, ep_node);
}
-static void
-nni_ep_reaper(void *notused)
+nni_tran *
+nni_ep_tran(nni_ep *ep)
{
- NNI_ARG_UNUSED(notused);
-
- nni_mtx_lock(&nni_ep_reap_lk);
- for (;;) {
- nni_ep *ep;
-
- if ((ep = nni_list_first(&nni_ep_reap_list)) != NULL) {
- nni_list_remove(&nni_ep_reap_list, ep);
- nni_mtx_unlock(&nni_ep_reap_lk);
- nni_ep_reap(ep);
- nni_mtx_lock(&nni_ep_reap_lk);
- continue;
- }
-
- if (!nni_ep_reap_run) {
- break;
- }
+ return (ep->ep_tran);
+}
- nni_cv_wait(&nni_ep_reap_cv);
- }
- nni_mtx_unlock(&nni_ep_reap_lk);
-} \ No newline at end of file
+nni_sock *
+nni_ep_sock(nni_ep *ep)
+{
+ return (ep->ep_sock);
+}
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 5a923bd5..fbb10911 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -1,5 +1,5 @@
//
-// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
@@ -11,58 +11,30 @@
#ifndef CORE_ENDPT_H
#define CORE_ENDPT_H
-#include "core/defs.h"
-#include "core/list.h"
-#include "core/thread.h"
-#include "core/transport.h"
-
-// NB: This structure is supplied here for use by the CORE. Use of this
-// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS
-// OR TRANSPORTS.
-struct nni_ep {
- nni_tran_ep ep_ops; // transport ops
- nni_tran * ep_tran; // transport pointer
- void * ep_data; // transport private
- uint32_t ep_id; // endpoint id
- nni_list_node ep_node; // per socket list
- nni_sock * ep_sock;
- char ep_addr[NNG_MAXADDRLEN];
- int ep_mode;
- int ep_started;
- int ep_stop;
- int ep_closed; // full shutdown
- int ep_bound; // true if we bound locally
- int ep_refcnt;
- nni_mtx ep_mtx;
- nni_cv ep_cv;
- nni_list ep_pipes;
- nni_aio ep_acc_aio;
- nni_aio ep_con_aio;
- nni_aio ep_con_syn; // used for sync connect
- nni_aio ep_backoff; // backoff timer
- nni_duration ep_maxrtime; // maximum time for reconnect
- nni_duration ep_currtime; // current time for reconnect
- nni_duration ep_inirtime; // initial time for reconnect
- nni_time ep_conntime; // time of last good connect
- nni_list_node ep_reap_node;
-};
+extern int nni_ep_sys_init(void);
+extern void nni_ep_sys_fini(void);
+extern nni_tran *nni_ep_tran(nni_ep *);
+extern nni_sock *nni_ep_sock(nni_ep *);
+extern int nni_ep_find(nni_ep **, uint32_t);
+extern int nni_ep_hold(nni_ep *);
+extern void nni_ep_rele(nni_ep *);
+extern uint32_t nni_ep_id(nni_ep *);
+extern int nni_ep_create_dialer(nni_ep **, nni_sock *, const char *);
+extern int nni_ep_create_listener(nni_ep **, nni_sock *, const char *);
+extern void nni_ep_stop(nni_ep *);
+extern int nni_ep_shutdown(nni_ep *);
+extern void nni_ep_close(nni_ep *);
+extern int nni_ep_dial(nni_ep *, int);
+extern int nni_ep_listen(nni_ep *, int);
+extern void nni_ep_list_init(nni_list *);
+extern int nni_ep_pipe_add(nni_ep *ep, nni_pipe *);
+extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *);
+// Endpoint modes. Currently used by transports. Remove this when we make
+// transport dialers and listeners explicit.
enum nni_ep_mode {
NNI_EP_MODE_DIAL = 1,
NNI_EP_MODE_LISTEN = 2,
};
-extern int nni_ep_sys_init(void);
-extern void nni_ep_sys_fini(void);
-extern int nni_ep_find(nni_ep **, uint32_t);
-extern uint32_t nni_ep_id(nni_ep *);
-extern int nni_ep_create(nni_ep **, nni_sock *, const char *, int);
-extern void nni_ep_stop(nni_ep *);
-extern void nni_ep_close(nni_ep *);
-extern int nni_ep_dial(nni_ep *, int);
-extern int nni_ep_listen(nni_ep *, int);
-extern void nni_ep_list_init(nni_list *);
-extern int nni_ep_pipe_add(nni_ep *ep, nni_pipe *);
-extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *);
-
#endif // CORE_ENDPT_H
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 1658aabc..75c4c8d6 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -15,6 +15,24 @@
// Operations on pipes (to the transport) are generally blocking operations,
// performed in the context of the protocol.
+struct nni_pipe {
+ uint32_t p_id;
+ nni_tran_pipe p_tran_ops;
+ void * p_tran_data;
+ void * p_proto_data;
+ nni_list_node p_sock_node;
+ nni_list_node p_ep_node;
+ nni_sock * p_sock;
+ nni_ep * p_ep;
+ int p_reap;
+ int p_stop;
+ int p_refcnt;
+ nni_mtx p_mtx;
+ nni_cv p_cv;
+ nni_list_node p_reap_node;
+ nni_aio p_start_aio;
+};
+
static nni_idhash *nni_pipes;
static nni_list nni_pipe_reap_list;
@@ -78,7 +96,10 @@ nni_pipe_destroy(nni_pipe *p)
if (p == NULL) {
return;
}
- NNI_ASSERT(p->p_refcnt != 0xDEAD);
+
+ // Stop any pending negotiation.
+ nni_aio_stop(&p->p_start_aio);
+
// Make sure any unlocked holders are done with this.
// This happens during initialization for example.
nni_mtx_lock(&p->p_mtx);
@@ -86,14 +107,17 @@ nni_pipe_destroy(nni_pipe *p)
nni_cv_wait(&p->p_cv);
}
nni_mtx_unlock(&p->p_mtx);
- p->p_refcnt = 0xDEAD;
- nni_aio_stop(&p->p_start_aio);
+ // We have exclusive access at this point, so we can check if
+ // we are still on any lists.
+
nni_aio_fini(&p->p_start_aio);
- if (p->p_proto_data != NULL) {
- p->p_proto_dtor(p->p_proto_data);
+ if (nni_list_node_active(&p->p_ep_node)) {
+ nni_ep_pipe_remove(p->p_ep, p);
}
+ nni_sock_pipe_remove(p->p_sock, p);
+
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_fini(p->p_tran_data);
}
@@ -148,31 +172,6 @@ nni_pipe_close(nni_pipe *p)
nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED);
}
-// Pipe reap is called on a taskq when the pipe should be closed. No
-// locks are held. This routine must take care to synchronously ensure
-// that no further references to the pipe are possible, then it may
-// destroy the pipe.
-static void
-nni_pipe_reap(nni_pipe *p)
-{
- // Transport close...
- nni_pipe_close(p);
-
- nni_aio_stop(&p->p_start_aio);
-
- // Remove the pipe from the socket and the endpoint. Note
- // that it is in theory possible for either of these to be null
- // if the pipe is being torn down before it is fully initialized.
- if (p->p_ep != NULL) {
- nni_ep_pipe_remove(p->p_ep, p);
- }
- if (p->p_sock != NULL) {
- nni_sock_pipe_remove(p->p_sock, p);
- }
-
- nni_pipe_destroy(p);
-}
-
void
nni_pipe_stop(nni_pipe *p)
{
@@ -206,16 +205,12 @@ nni_pipe_start_cb(void *arg)
nni_aio * aio = &p->p_start_aio;
int rv;
- nni_mtx_lock(&p->p_mtx);
if ((rv = nni_aio_result(aio)) != 0) {
- nni_mtx_unlock(&p->p_mtx);
nni_pipe_stop(p);
return;
}
- nni_mtx_unlock(&p->p_mtx);
-
- if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) {
+ if ((rv = nni_sock_pipe_start(p->p_sock, p)) != 0) {
nni_pipe_stop(p);
}
}
@@ -225,8 +220,8 @@ nni_pipe_create(nni_ep *ep, void *tdata)
{
nni_pipe *p;
int rv;
- nni_tran *tran = ep->ep_tran;
- nni_sock *sock = ep->ep_sock;
+ nni_tran *tran = nni_ep_tran(ep);
+ nni_sock *sock = nni_ep_sock(ep);
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
// In this case we just toss the pipe...
@@ -238,31 +233,24 @@ nni_pipe_create(nni_ep *ep, void *tdata)
p->p_tran_ops = *tran->tran_pipe;
p->p_tran_data = tdata;
p->p_proto_data = NULL;
+ p->p_ep = ep;
+ p->p_sock = sock;
NNI_LIST_NODE_INIT(&p->p_reap_node);
-
- if (((rv = nni_mtx_init(&p->p_mtx)) != 0) ||
- ((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0)) {
- tran->tran_pipe->p_fini(p);
- nni_mtx_fini(&p->p_mtx);
- NNI_FREE_STRUCT(p);
- return (rv);
- }
-
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
- if ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) {
- nni_pipe_destroy(p);
- return (rv);
- }
-
- if ((rv = nni_sock_pipe_add(sock, ep, p)) != 0) {
+ if (((rv = nni_mtx_init(&p->p_mtx)) != 0) ||
+ ((rv = nni_cv_init(&p->p_cv, &p->p_mtx)) != 0) ||
+ ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) !=
+ 0) ||
+ ((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) ||
+ ((rv = nni_ep_pipe_add(ep, p)) != 0) ||
+ ((rv = nni_sock_pipe_add(sock, p)) != 0)) {
nni_pipe_destroy(p);
- return (rv);
}
- return (0);
+ return (rv);
}
int
@@ -292,6 +280,12 @@ nni_pipe_get_proto_data(nni_pipe *p)
}
void
+nni_pipe_set_proto_data(nni_pipe *p, void *data)
+{
+ p->p_proto_data = data;
+}
+
+void
nni_pipe_sock_list_init(nni_list *list)
{
NNI_LIST_INIT(list, nni_pipe, p_sock_node);
@@ -318,18 +312,6 @@ nni_pipe_reaper(void *notused)
// Transport close...
nni_pipe_close(p);
- nni_aio_stop(&p->p_start_aio);
-
- // Remove the pipe from the socket and the endpoint.
- // Note that it is in theory possible for either of
- // these to be null if the pipe is being torn down
- // before it is fully initialized.
- if (p->p_ep != NULL) {
- nni_ep_pipe_remove(p->p_ep, p);
- }
- if (p->p_sock != NULL) {
- nni_sock_pipe_remove(p->p_sock, p);
- }
nni_pipe_destroy(p);
nni_mtx_lock(&nni_pipe_reap_lk);
continue;
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 40871062..9436d650 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -19,25 +19,6 @@
#include "core/thread.h"
#include "core/transport.h"
-struct nni_pipe {
- uint32_t p_id;
- nni_tran_pipe p_tran_ops;
- void * p_tran_data;
- void * p_proto_data;
- nni_cb p_proto_dtor;
- nni_list_node p_sock_node;
- nni_list_node p_ep_node;
- nni_sock * p_sock;
- nni_ep * p_ep;
- int p_reap;
- int p_stop;
- int p_refcnt;
- nni_mtx p_mtx;
- nni_cv p_cv;
- nni_list_node p_reap_node;
- nni_aio p_start_aio;
-};
-
extern int nni_pipe_sys_init(void);
extern void nni_pipe_sys_fini(void);
@@ -81,6 +62,7 @@ extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep);
// nni_pipe_get_proto_data gets the protocol private data set with the
// nni_pipe_set_proto_data function. No locking is performed.
extern void *nni_pipe_get_proto_data(nni_pipe *);
+extern void nni_pipe_set_proto_data(nni_pipe *, void *);
// nni_pipe_sock_list_init initializes a list of pipes, to be used by
// a per-socket list.
diff --git a/src/core/socket.c b/src/core/socket.c
index 9471e56e..e01791a3 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -64,132 +64,63 @@ nni_sock_find(nni_sock **sockp, uint32_t id)
void
nni_sock_rele(nni_sock *s)
{
- nni_mtx_lock(&s->s_mx);
+ nni_mtx_lock(&nni_sock_lk);
s->s_refcnt--;
- if (s->s_closing) {
- nni_cv_wake(&s->s_cv);
+ if (s->s_closed && (s->s_refcnt < 2)) {
+ nni_cv_wake(&s->s_close_cv);
}
- nni_mtx_unlock(&s->s_mx);
+ nni_mtx_unlock(&nni_sock_lk);
}
-static int
-nni_sock_pipe_start(nni_pipe *pipe)
+int
+nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe)
{
- nni_sock *s = pipe->p_sock;
- void * pdata = nni_pipe_get_proto_data(pipe);
- int rv;
+ void *pdata = nni_pipe_get_proto_data(pipe);
+ int rv;
NNI_ASSERT(s != NULL);
+ nni_mtx_lock(&s->s_mx);
if (s->s_closing) {
// We're closing, bail out.
- return (NNG_ECLOSED);
- }
- if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) {
+ rv = NNG_ECLOSED;
+ } else if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) {
// Peer protocol mismatch.
- return (NNG_EPROTO);
- }
- if ((rv = s->s_pipe_ops.pipe_start(pdata)) != 0) {
- // Protocol rejection for other reasons.
- // E.g. pair and already have active connected partner.
- return (rv);
+ rv = NNG_EPROTO;
+ } else {
+ // Protocol can reject for other reasons.
+ rv = s->s_pipe_ops.pipe_start(pdata);
}
+ nni_mtx_unlock(&s->s_mx);
return (0);
}
-static void
-nni_sock_pipe_start_cb(void *arg)
+int
+nni_sock_pipe_add(nni_sock *s, nni_pipe *p)
{
- nni_pipe *pipe = arg;
- nni_aio * aio = &pipe->p_start_aio;
+ int rv;
+ void *pdata;
- if (nni_aio_result(aio) != 0) {
- // Failed I/O during start, abort everything.
- nni_pipe_stop(pipe);
- return;
- }
- if (nni_sock_pipe_start(pipe) != 0) {
- nni_pipe_stop(pipe);
- return;
+ if ((rv = s->s_pipe_ops.pipe_init(&pdata, p, s->s_data)) != 0) {
+ return (rv);
}
-}
-
-int
-nni_sock_pipe_add(nni_sock *s, nni_ep *ep, nni_pipe *pipe)
-{
- int rv;
+ nni_pipe_set_proto_data(p, pdata);
// Initialize protocol pipe data.
nni_mtx_lock(&s->s_mx);
- nni_mtx_lock(&ep->ep_mtx);
-
- if ((s->s_closing) || (ep->ep_closed)) {
- nni_mtx_unlock(&ep->ep_mtx);
+ if (s->s_closing) {
nni_mtx_unlock(&s->s_mx);
return (NNG_ECLOSED);
}
- rv = nni_aio_init(&pipe->p_start_aio, nni_sock_pipe_start_cb, pipe);
- if (rv != 0) {
- nni_mtx_unlock(&ep->ep_mtx);
- nni_mtx_unlock(&s->s_mx);
- return (rv);
- }
- rv = s->s_pipe_ops.pipe_init(&pipe->p_proto_data, pipe, s->s_data);
- if (rv != 0) {
- nni_mtx_unlock(&ep->ep_mtx);
- nni_mtx_lock(&s->s_mx);
- return (rv);
- }
- // Save the protocol destructor.
- pipe->p_proto_dtor = s->s_pipe_ops.pipe_fini;
- pipe->p_sock = s;
- pipe->p_ep = ep;
-
- nni_list_append(&s->s_pipes, pipe);
- nni_list_append(&ep->ep_pipes, pipe);
+ nni_list_append(&s->s_pipes, p);
// Start the initial negotiation I/O...
- if (pipe->p_tran_ops.p_start == NULL) {
- if (nni_sock_pipe_start(pipe) != 0) {
- nni_pipe_stop(pipe);
- }
- } else {
- pipe->p_tran_ops.p_start(
- pipe->p_tran_data, &pipe->p_start_aio);
- }
+ nni_pipe_start(p);
- nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_unlock(&s->s_mx);
return (0);
}
-int
-nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
-{
- int rv;
- void *pdata = nni_pipe_get_proto_data(pipe);
-
- nni_mtx_lock(&sock->s_mx);
-
- if (sock->s_closing) {
- nni_mtx_unlock(&sock->s_mx);
- return (NNG_ECLOSED);
- }
- if (nni_pipe_peer(pipe) != sock->s_peer_id.p_id) {
- nni_mtx_unlock(&sock->s_mx);
- return (NNG_EPROTO);
- }
-
- if ((rv = sock->s_pipe_ops.pipe_start(pdata)) != 0) {
- nni_mtx_unlock(&sock->s_mx);
- return (rv);
- }
-
- nni_mtx_unlock(&sock->s_mx);
-
- return (0);
-}
-
void
nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
{
@@ -197,23 +128,20 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
pdata = nni_pipe_get_proto_data(pipe);
- // Stop any pending negotiation.
- nni_aio_stop(&pipe->p_start_aio);
-
- nni_mtx_lock(&sock->s_mx);
- if ((sock->s_pipe_ops.pipe_stop == NULL) || (pdata == NULL)) {
- nni_mtx_unlock(&sock->s_mx);
- return;
- }
-
- sock->s_pipe_ops.pipe_stop(pdata);
- if (nni_list_active(&sock->s_pipes, pipe)) {
- nni_list_remove(&sock->s_pipes, pipe);
- if (sock->s_closing && nni_list_empty(&sock->s_pipes)) {
- nni_cv_wake(&sock->s_cv);
+ if (pdata != NULL) {
+ nni_mtx_lock(&sock->s_mx);
+ sock->s_pipe_ops.pipe_stop(pdata);
+ if (nni_list_active(&sock->s_pipes, pipe)) {
+ nni_list_remove(&sock->s_pipes, pipe);
+ if (sock->s_closing &&
+ nni_list_empty(&sock->s_pipes)) {
+ nni_cv_wake(&sock->s_cv);
+ }
}
+ sock->s_pipe_ops.pipe_fini(pdata);
+ nni_pipe_set_proto_data(pipe, NULL);
+ nni_mtx_unlock(&sock->s_mx);
}
- nni_mtx_unlock(&sock->s_mx);
}
void
@@ -329,6 +257,7 @@ nni_sock_destroy(nni_sock *s)
nni_ev_fini(&s->s_recv_ev);
nni_msgq_fini(s->s_urq);
nni_msgq_fini(s->s_uwq);
+ nni_cv_fini(&s->s_close_cv);
nni_cv_fini(&s->s_cv);
nni_mtx_fini(&s->s_mx);
NNI_FREE_STRUCT(s);
@@ -372,6 +301,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
if (((rv = nni_mtx_init(&s->s_mx)) != 0) ||
((rv = nni_cv_init(&s->s_cv, &s->s_mx)) != 0) ||
+ ((rv = nni_cv_init(&s->s_close_cv, &nni_sock_lk)) != 0) ||
((rv = nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s)) != 0) ||
((rv = nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s)) != 0) ||
((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
@@ -446,6 +376,7 @@ nni_sock_shutdown(nni_sock *sock)
{
nni_pipe *pipe;
nni_ep * ep;
+ nni_ep * nep;
nni_time linger;
nni_mtx_lock(&sock->s_mx);
@@ -468,7 +399,7 @@ nni_sock_shutdown(nni_sock *sock)
// Close the EPs. This prevents new connections from forming but
// but allows existing ones to drain.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
- nni_ep_close(ep);
+ nni_ep_shutdown(ep);
}
nni_mtx_unlock(&sock->s_mx);
@@ -498,10 +429,20 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // For each ep, arrange for it to teardown hard.
- NNI_LIST_FOREACH (&sock->s_eps, ep) {
- nni_ep_stop(ep);
+ // Go through the endpoint list, attempting to close them.
+ // We might already have a close in progress, in which case
+ // we skip past it; it will be removed from another thread.
+ nep = nni_list_first(&sock->s_eps);
+ while ((ep = nep) != NULL) {
+ nep = nni_list_next(&sock->s_eps, nep);
+
+ if (nni_ep_hold(ep) == 0) {
+ nni_mtx_unlock(&sock->s_mx);
+ nni_ep_close(ep);
+ nni_mtx_lock(&sock->s_mx);
+ }
}
+
// For each pipe, arrange for it to teardown hard.
NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
nni_pipe_stop(pipe);
@@ -527,38 +468,6 @@ nni_sock_shutdown(nni_sock *sock)
return (0);
}
-void
-nni_sock_ep_remove(nni_sock *sock, nni_ep *ep)
-{
- nni_pipe *pipe;
- // If we're not on the list, then nothing to do. Be idempotent.
- // Note that if the ep is not on a list, then we assume that we have
- // exclusive access. Therefore the check for being active need not
- // be locked.
- if (!nni_list_node_active(&ep->ep_node)) {
- return;
- }
-
- // This is done under the endpoints lock, although the remove
- // is done under that as well, we also make sure that we hold
- // the socket lock in the remove step.
- nni_mtx_lock(&ep->ep_mtx);
- NNI_LIST_FOREACH (&ep->ep_pipes, pipe) {
- nni_pipe_stop(pipe);
- }
- while (!nni_list_empty(&ep->ep_pipes)) {
- nni_cv_wait(&ep->ep_cv);
- }
- nni_mtx_unlock(&ep->ep_mtx);
-
- nni_mtx_lock(&sock->s_mx);
- nni_list_remove(&sock->s_eps, ep);
- if ((sock->s_closing) && (nni_list_empty(&sock->s_eps))) {
- nni_cv_wake(&sock->s_cv);
- }
- nni_mtx_unlock(&sock->s_mx);
-}
-
// nni_sock_close shuts down the socket, then releases any resources
// associated with it. It is a programmer error to reference the socket
// after this function is called, as the pointer may reference invalid
@@ -585,13 +494,17 @@ nni_sock_close(nni_sock *s)
// nni_sock_closeall. This is idempotent.
nni_list_node_remove(&s->s_node);
- nni_mtx_unlock(&nni_sock_lk);
-
// Wait for all other references to drop. Note that we
// have a reference already (from our caller).
+ while (s->s_refcnt > 1) {
+ nni_cv_wait(&s->s_close_cv);
+ }
+ nni_mtx_unlock(&nni_sock_lk);
+
+ // Wait for pipe and eps to finish closing.
nni_mtx_lock(&s->s_mx);
- while ((s->s_refcnt > 1) || (!nni_list_empty(&s->s_pipes)) ||
- (!nni_list_empty(&s->s_eps))) {
+ while (
+ (!nni_list_empty(&s->s_pipes)) || (!nni_list_empty(&s->s_eps))) {
nni_cv_wait(&s->s_cv);
}
nni_mtx_unlock(&s->s_mx);
@@ -742,77 +655,29 @@ nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax)
}
int
-nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
+nni_sock_ep_add(nni_sock *sock, nni_ep *ep)
{
- nni_ep *ep;
- int rv;
-
nni_mtx_lock(&sock->s_mx);
- if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_DIAL)) != 0) {
+ if (sock->s_closing) {
nni_mtx_unlock(&sock->s_mx);
- return (rv);
+ return (NNG_ECLOSED);
}
- nni_mtx_lock(&ep->ep_mtx);
nni_list_append(&sock->s_eps, ep);
- // Put a hold on the endpoint, for now.
- ep->ep_refcnt++;
- ep->ep_started = 1;
- nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_unlock(&sock->s_mx);
-
- if ((rv = nni_ep_dial(ep, flags)) != 0) {
- nni_ep_stop(ep);
- } else if (epp != NULL) {
- *epp = ep;
- }
-
- // Drop our endpoint hold.
- nni_mtx_lock(&ep->ep_mtx);
- if (rv != 0) {
- ep->ep_started = 0;
- }
- ep->ep_refcnt--;
- nni_cv_wake(&ep->ep_cv);
- nni_mtx_unlock(&ep->ep_mtx);
-
- return (rv);
+ return (0);
}
-int
-nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
+void
+nni_sock_ep_remove(nni_sock *sock, nni_ep *ep)
{
- nni_ep *ep;
- int rv;
-
nni_mtx_lock(&sock->s_mx);
- if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_LISTEN)) != 0) {
- nni_mtx_unlock(&sock->s_mx);
- return (rv);
+ if (nni_list_active(&sock->s_eps, ep)) {
+ nni_list_remove(&sock->s_eps, ep);
+ if ((sock->s_closed) && (nni_list_empty(&sock->s_eps))) {
+ nni_cv_wake(&sock->s_cv);
+ }
}
-
- nni_list_append(&sock->s_eps, ep);
- nni_mtx_lock(&ep->ep_mtx);
- ep->ep_refcnt++;
- ep->ep_started = 1;
- nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_unlock(&sock->s_mx);
-
- if ((rv = nni_ep_listen(ep, flags)) != 0) {
- nni_ep_stop(ep);
- } else if (epp != NULL) {
- *epp = ep;
- }
-
- // Drop our endpoint hold.
- nni_mtx_lock(&ep->ep_mtx);
- if (rv != 0) {
- ep->ep_started = 0;
- }
- ep->ep_refcnt--;
- nni_cv_wake(&ep->ep_cv);
- nni_mtx_unlock(&ep->ep_mtx);
-
- return (rv);
}
void
diff --git a/src/core/socket.h b/src/core/socket.h
index 76ebca12..94dd816d 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -74,8 +74,6 @@ extern int nni_sock_setopt(nni_sock *, int, const void *, size_t);
extern int nni_sock_getopt(nni_sock *, int, void *, size_t *);
extern int nni_sock_recvmsg(nni_sock *, nni_msg **, nni_time);
extern int nni_sock_sendmsg(nni_sock *, nni_msg *, nni_time);
-extern int nni_sock_dial(nni_sock *, const char *, nni_ep **, int);
-extern int nni_sock_listen(nni_sock *, const char *, nni_ep **, int);
extern uint32_t nni_sock_id(nni_sock *);
extern void nni_sock_lock(nni_sock *);
@@ -84,22 +82,17 @@ extern void nni_sock_unlock(nni_sock *);
extern nni_notify *nni_sock_notify(nni_sock *, int, nng_notify_func, void *);
extern void nni_sock_unnotify(nni_sock *, nni_notify *);
-extern void nni_sock_ep_remove(nni_sock *, nni_ep *);
-
// nni_sock_pipe_add adds the pipe to the socket. It is called by
// the generic pipe creation code. It also adds the socket to the
// ep list, and starts the pipe. It does all these to ensure that
// we have complete success or failure, and there is no point where
// a pipe could wind up orphaned.
-extern int nni_sock_pipe_add(nni_sock *, nni_ep *, nni_pipe *);
-
+extern int nni_sock_pipe_add(nni_sock *, nni_pipe *);
extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *);
+extern int nni_sock_pipe_start(nni_sock *, nni_pipe *p);
-// nni_sock_pipe_ready lets the socket know the pipe is ready for
-// business. This also calls the socket/protocol specific add function,
-// and it may return an error. The reference count should be dropped by
-// nni_sock_pipe_closed.
-extern int nni_sock_pipe_ready(nni_sock *, nni_pipe *);
+extern int nni_sock_ep_add(nni_sock *, nni_ep *);
+extern void nni_sock_ep_remove(nni_sock *, nni_ep *);
// Set error codes for applications. These are only ever
// called from the filter functions in protocols, and thus
diff --git a/src/nng.c b/src/nng.c
index a34619da..421ce74a 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -223,7 +223,7 @@ nng_sendmsg(nng_socket sid, nng_msg *msg, int flags)
}
int
-nng_dial(nng_socket sid, const char *addr, nng_endpoint *epp, int flags)
+nng_dial(nng_socket sid, const char *addr, nng_dialer *dp, int flags)
{
nni_ep * ep;
int rv;
@@ -232,17 +232,25 @@ nng_dial(nng_socket sid, const char *addr, nng_endpoint *epp, int flags)
if ((rv = nni_sock_find(&sock, sid)) != 0) {
return (rv);
}
- if ((rv = nni_sock_dial(sock, addr, &ep, flags)) == 0) {
- if (epp != NULL) {
- *epp = nni_ep_id(ep);
- }
+ if ((rv = nni_ep_create_dialer(&ep, sock, addr)) != 0) {
+ nni_sock_rele(sock);
+ return (rv);
+ }
+ if ((rv = nni_ep_dial(ep, flags)) != 0) {
+ nni_ep_close(ep);
+ nni_sock_rele(sock);
+ return (rv);
+ }
+ if (dp != NULL) {
+ *dp = nni_ep_id(ep);
}
+ nni_ep_rele(ep);
nni_sock_rele(sock);
- return (rv);
+ return (0);
}
int
-nng_listen(nng_socket sid, const char *addr, nng_endpoint *epp, int flags)
+nng_listen(nng_socket sid, const char *addr, nng_listener *lp, int flags)
{
nni_ep * ep;
int rv;
@@ -251,20 +259,36 @@ nng_listen(nng_socket sid, const char *addr, nng_endpoint *epp, int flags)
if ((rv = nni_sock_find(&sock, sid)) != 0) {
return (rv);
}
- if ((rv = nni_sock_listen(sock, addr, &ep, flags)) == 0) {
- if (epp != NULL) {
- *epp = nni_ep_id(ep);
- }
+ if ((rv = nni_ep_create_listener(&ep, sock, addr)) != 0) {
+ nni_sock_rele(sock);
+ return (rv);
+ }
+ if ((rv = nni_ep_listen(ep, flags)) != 0) {
+ nni_ep_close(ep);
+ nni_sock_rele(sock);
+ return (rv);
+ }
+
+ if (lp != NULL) {
+ *lp = nni_ep_id(ep);
}
+ nni_ep_rele(ep);
nni_sock_rele(sock);
return (rv);
}
int
-nng_endpoint_close(nng_endpoint eid)
+nng_dialer_close(nng_dialer d)
+{
+ // return (nni_ep_close());
+ NNI_ARG_UNUSED(d);
+ return (NNG_ENOTSUP);
+}
+
+int
+nng_listener_close(nng_listener l)
{
- // XXX: reimplement this properly.
- NNI_ARG_UNUSED(eid);
+ NNI_ARG_UNUSED(l);
return (NNG_ENOTSUP);
}
@@ -662,6 +686,7 @@ nng_msg_getopt(nng_msg *msg, int opt, void *ptr, size_t *szp)
return (nni_msg_getopt(msg, opt, ptr, szp));
}
+#if 0
int
nng_snapshot_create(nng_socket sock, nng_snapshot **snapp)
{
@@ -710,6 +735,7 @@ nng_stat_value(nng_stat *stat)
// Stats TBD.
return (0);
}
+#endif
// These routines exist as utility functions, exposing some of our "guts"
// to the external world for the purposes of test code and bundled utilities.
diff --git a/src/nng.h b/src/nng.h
index 9b2618eb..6cf581a2 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -51,7 +51,6 @@ typedef struct nng_event nng_event;
typedef struct nng_notify nng_notify;
typedef struct nng_snapshot nng_snapshot;
typedef struct nng_stat nng_stat;
-typedef uint32_t nng_endpoint; // XXX: REMOVE ME.
// nng_fini is used to terminate the library, freeing certain global resources.
// For most cases, this call is optional, but failure to do so may cause
@@ -66,10 +65,7 @@ NNG_DECL void nng_fini(void);
// nng_close closes the socket, terminating all activity and
// closing any underlying connections and releasing any associated
-// resources. Memory associated with the socket is freed, so it is an
-// error to reference the socket in any way after this is called. Likewise,
-// it is an error to reference any resources such as endpoints or
-// pipes associated with the socket.
+// resources.
NNG_DECL int nng_close(nng_socket);
// nng_closeall closes all open sockets. Do not call this from
@@ -159,10 +155,6 @@ NNG_DECL int nng_event_type(nng_event *);
NNG_DECL nng_socket nng_event_socket(nng_event *);
-NNG_DECL nng_endpoint nng_event_endpoint(nng_event *);
-
-NNG_DECL nng_pipe nng_event_pipe(nng_event *);
-
NNG_DECL const char *nng_event_reason(nng_event *);
// nng_listen creates a listening endpoint with no special options,
@@ -197,19 +189,27 @@ NNG_DECL int nng_dialer_start(nng_dialer, int);
// the listener is not already listening.
NNG_DECL int nng_listener_start(nng_listener, int);
-// nng_endpoint_close closes the endpoint, shutting down all underlying
-// connections and releasing all associated resources. It is an error to
-// refer to the endpoint after this is called.
-NNG_DECL int nng_endpoint_close(nng_endpoint);
+// nng_dialer_close closes the dialer, shutting down all underlying
+// connections and releasing all associated resources.
NNG_DECL int nng_dialer_close(nng_dialer);
+
+// nng_listener_close closes the listener, shutting down all underlying
+// connections and releasing all associated resources.
NNG_DECL int nng_listener_close(nng_listener);
-// nng_endpoint_setopt sets an option for a specific endpoint. Note
-// endpoint options may not be altered on a running endpoint.
-NNG_DECL int nng_endpoint_setopt(nng_endpoint, int, void *, size_t);
+// nng_dialer_setopt sets an option for a specific dialer. Note
+// dialer options may not be altered on a running dialer.
+NNG_DECL int nng_dialer_setopt(nng_dialer, int, void *, size_t);
+
+// nng_dialer_getopt obtains the option for a dialer.
+NNG_DECL int nng_dialer_getopt(nng_dialer, int, void *, size_t *);
+
+// nng_listener_setopt sets an option for a specific listener. Note
+// listener options may not be altered on a running listener.
+NNG_DECL int nng_listener_setopt(nng_listener, int, void *, size_t);
-// nng_endpoint_getopt obtains the option for an endpoint.
-NNG_DECL int nng_endpoint_getopt(nng_endpoint, int, void *, size_t *);
+// nng_listener_getopt obtains the option for a listener.
+NNG_DECL int nng_listener_getopt(nng_listener, int, void *, size_t *);
// nng_strerror returns a human readable string associated with the error
// code supplied.
diff --git a/src/nng_compat.c b/src/nng_compat.c
index 2593a0dd..48b406eb 100644
--- a/src/nng_compat.c
+++ b/src/nng_compat.c
@@ -160,27 +160,26 @@ int
nn_bind(int s, const char *addr)
{
int rv;
- nng_endpoint ep;
+ nng_listener l;
- if ((rv = nng_listen((nng_socket) s, addr, &ep, NNG_FLAG_SYNCH)) !=
- 0) {
+ if ((rv = nng_listen((nng_socket) s, addr, &l, NNG_FLAG_SYNCH)) != 0) {
nn_seterror(rv);
return (-1);
}
- return ((int) ep);
+ return ((int) l);
}
int
nn_connect(int s, const char *addr)
{
- int rv;
- nng_endpoint ep;
+ int rv;
+ nng_dialer d;
- if ((rv = nng_dial((nng_socket) s, addr, &ep, 0)) != 0) {
+ if ((rv = nng_dial((nng_socket) s, addr, &d, 0)) != 0) {
nn_seterror(rv);
return (-1);
}
- return ((int) ep);
+ return ((int) d);
}
int
@@ -192,8 +191,11 @@ nn_shutdown(int s, int ep)
// ID can result in affecting the wrong socket. But this requires
// a buggy application, and because we don't recycle endpoints
// until wrap, its unlikely to actually come up in practice.
+ // Note that listeners and dialers share the same namespace
+ // in the core, so we can close either one this way.
- if ((rv = nng_endpoint_close((nng_endpoint) ep)) != 0) {
+ if (((rv = nng_dialer_close((nng_dialer) ep)) != 0) &&
+ ((rv = nng_listener_close((nng_listener) ep)) != 0)) {
nn_seterror(rv);
return (-1);
}