aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/endpt.c112
-rw-r--r--src/core/endpt.h6
-rw-r--r--src/core/init.c2
3 files changed, 105 insertions, 15 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 9aae89eb..c0890cd3 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -16,6 +16,70 @@
// Functionality realited to end points.
int
+nni_ep_hold(nni_ep **epp, uint32_t id)
+{
+ int rv;
+ nni_ep *ep;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(nni_idlock);
+ if ((rv = nni_idhash_find(nni_endpoints, id, (void **) &ep)) != 0) {
+ nni_mtx_unlock(nni_idlock);
+ return (NNG_ECLOSED);
+ }
+ ep->ep_refcnt++;
+ nni_mtx_unlock(nni_idlock);
+ *epp = ep;
+ return (0);
+}
+
+
+void
+nni_ep_rele(nni_ep *ep)
+{
+ nni_mtx_lock(nni_idlock);
+ ep->ep_refcnt--;
+ if (ep->ep_refcnt == 1) {
+ nni_cv_wake(&ep->ep_refcv);
+ }
+ nni_mtx_unlock(nni_idlock);
+}
+
+
+int
+nni_ep_hold_close(nni_ep **epp, uint32_t id)
+{
+ int rv;
+ nni_ep *ep;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(nni_idlock);
+ if ((rv = nni_idhash_find(nni_endpoints, id, (void **) &ep)) != 0) {
+ nni_mtx_unlock(nni_idlock);
+ return (NNG_ECLOSED);
+ }
+ ep->ep_id = 0;
+ nni_idhash_remove(nni_endpoints, id);
+ while (ep->ep_refcnt) {
+ nni_cv_wait(&ep->ep_refcv);
+ }
+ nni_mtx_unlock(nni_idlock);
+ return (0);
+}
+
+
+uint32_t
+nni_ep_id(nni_ep *ep)
+{
+ return (ep->ep_id);
+}
+
+
+int
nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr)
{
nni_tran *tran;
@@ -37,6 +101,10 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr)
ep->ep_bound = 0;
ep->ep_pipe = NULL;
ep->ep_tran = tran;
+ ep->ep_refcnt = 0;
+ ep->ep_id = 0;
+ memset(&ep->ep_cv, 0, sizeof (ep->ep_cv));
+ memset(&ep->ep_refcv, 0, sizeof (ep->ep_refcv));
NNI_LIST_NODE_INIT(&ep->ep_node);
// Could safely use strcpy here, but this avoids discussion.
(void) snprintf(ep->ep_addr, sizeof (ep->ep_addr), "%s", addr);
@@ -46,7 +114,10 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr)
// dereference on hot paths.
ep->ep_ops = *tran->tran_ep;
- if ((rv = nni_cv_init(&ep->ep_cv, &sock->s_mx)) != 0) {
+ if (((rv = nni_cv_init(&ep->ep_cv, &sock->s_mx)) != 0) ||
+ ((rv = nni_cv_init(&ep->ep_refcv, nni_idlock)) != 0)) {
+ nni_cv_fini(&ep->ep_cv);
+ nni_cv_fini(&ep->ep_refcv);
NNI_FREE_STRUCT(ep);
return (rv);
}
@@ -55,31 +126,33 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr)
if (sock->s_closing) {
nni_mtx_unlock(&sock->s_mx);
nni_cv_fini(&ep->ep_cv);
+ nni_cv_fini(&ep->ep_refcv);
NNI_FREE_STRUCT(ep);
return (NNG_ECLOSED);
}
- nni_mtx_lock(nni_idlock);
- rv = nni_idhash_alloc(nni_endpoints, &ep->ep_id, ep);
- nni_mtx_unlock(nni_idlock);
+
+ rv = ep->ep_ops.ep_init(&ep->ep_data, addr, nni_sock_proto(sock));
if (rv != 0) {
nni_mtx_unlock(&sock->s_mx);
nni_cv_fini(&ep->ep_cv);
+ nni_cv_fini(&ep->ep_refcv);
NNI_FREE_STRUCT(ep);
return (rv);
}
+ nni_list_append(&sock->s_eps, ep);
+ nni_mtx_unlock(&sock->s_mx);
- rv = ep->ep_ops.ep_init(&ep->ep_data, addr, nni_sock_proto(sock));
+ nni_mtx_lock(nni_idlock);
+ rv = nni_idhash_alloc(nni_endpoints, &ep->ep_id, ep);
+ nni_mtx_unlock(nni_idlock);
if (rv != 0) {
- nni_mtx_lock(nni_idlock);
- nni_idhash_remove(nni_endpoints, ep->ep_id);
- nni_mtx_unlock(nni_idlock);
- nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_lock(&sock->s_mx);
+ nni_list_remove(&sock->s_eps, ep);
+ ep->ep_ops.ep_fini(ep->ep_data);
nni_cv_fini(&ep->ep_cv);
+ nni_cv_fini(&ep->ep_refcv);
NNI_FREE_STRUCT(ep);
- return (rv);
}
- nni_list_append(&sock->s_eps, ep);
- nni_mtx_unlock(&sock->s_mx);
*epp = ep;
return (0);
@@ -92,6 +165,12 @@ nni_ep_close(nni_ep *ep)
nni_pipe *pipe;
nni_mtx *mx = &ep->ep_sock->s_mx;
+ nni_mtx_lock(nni_idlock);
+ while (ep->ep_refcnt) {
+ nni_cv_wait(&ep->ep_refcv);
+ }
+ nni_mtx_unlock(nni_idlock);
+
nni_mtx_lock(mx);
NNI_ASSERT(ep->ep_close == 0);
ep->ep_close = 1;
@@ -101,14 +180,19 @@ nni_ep_close(nni_ep *ep)
ep->ep_pipe = NULL;
}
nni_cv_wake(&ep->ep_cv);
+ nni_list_remove(&ep->ep_sock->s_eps, ep);
nni_mtx_unlock(mx);
nni_thr_fini(&ep->ep_thr);
ep->ep_ops.ep_fini(ep->ep_data);
nni_mtx_lock(nni_idlock);
- nni_list_remove(&ep->ep_sock->s_eps, ep);
- nni_idhash_remove(nni_endpoints, ep->ep_id);
+ if (ep->ep_id != 0) {
+ // We might have removed this already as a result of
+ // application initiated endpoint close request instead
+ // of socket close.
+ nni_idhash_remove(nni_endpoints, ep->ep_id);
+ }
nni_mtx_unlock(nni_idlock);
nni_cv_fini(&ep->ep_cv);
diff --git a/src/core/endpt.h b/src/core/endpt.h
index a3b2ce9e..493355d2 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -27,7 +27,9 @@ struct nni_ep {
int ep_mode;
int ep_close; // full shutdown
int ep_bound; // true if we bound locally
+ int ep_refcnt;
nni_cv ep_cv;
+ nni_cv ep_refcv;
nni_pipe * ep_pipe; // Connected pipe (dialers only)
};
@@ -35,6 +37,10 @@ struct nni_ep {
#define NNI_EP_MODE_DIAL 1
#define NNI_EP_MODE_LISTEN 2
+extern int nni_ep_hold(nni_ep **, uint32_t);
+extern int nni_ep_hold_close(nni_ep **, uint32_t);
+extern void nni_ep_rele(nni_ep *);
+extern uint32_t nni_ep_id(nni_ep *);
extern int nni_ep_create(nni_ep **, nni_sock *, const char *);
extern int nni_ep_accept(nni_ep *, nni_pipe **);
extern void nni_ep_close(nni_ep *);
diff --git a/src/core/init.c b/src/core/init.c
index 8ada0247..66ad9725 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -40,7 +40,7 @@ nni_init_helper(void)
return (rv);
}
nni_idhash_set_limits(&nni_pipes_x, 1, 0x7fffffff,
- nni_random() & 0x7fffffff);
+ (nni_random() & 0x7ffffffe) + 1);
nni_idhash_set_limits(&nni_sockets_x, 1, 0x7fffffff, 1);
nni_idhash_set_limits(&nni_endpoints_x, 1, 0xffffffff, 1);