aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-06-06 09:10:07 -0700
committerGarrett D'Amore <garrett@damore.org>2017-06-06 09:10:07 -0700
commit4771d04fb589c406ff8b2fc1b4edf93b2df42515 (patch)
tree4d98c382b90f67eb158e613d6d0819201c6ed616 /src
parent46a559cbdff02d8f9ca48bd918c17c33336de3a1 (diff)
downloadnng-4771d04fb589c406ff8b2fc1b4edf93b2df42515.tar.gz
nng-4771d04fb589c406ff8b2fc1b4edf93b2df42515.tar.bz2
nng-4771d04fb589c406ff8b2fc1b4edf93b2df42515.zip
Endpoint now holds a reference on the socket.
Diffstat (limited to 'src')
-rw-r--r--src/core/endpt.c115
-rw-r--r--src/core/endpt.h1
-rw-r--r--src/core/socket.c48
-rw-r--r--src/core/socket.h3
4 files changed, 108 insertions, 59 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 76cddbc3..de166aff 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -52,7 +52,9 @@ nni_ep_hold(nni_ep **epp, uint32_t id)
if (rv != 0) {
return (NNG_ECLOSED);
}
- *epp = ep;
+ if (epp != NULL) {
+ *epp = ep;
+ }
return (0);
}
@@ -84,16 +86,20 @@ nni_ep_ctor(uint32_t id)
ep->ep_bound = 0;
ep->ep_pipe = NULL;
ep->ep_id = id;
+ ep->ep_data = NULL;
NNI_LIST_NODE_INIT(&ep->ep_node);
-#if 0
- if ((rv = nni_cv_init(&ep->ep_cv, &sock->s_mx)) != 0) {
- nni_cv_fini(&ep->ep_cv);
+ if ((rv = nni_mtx_init(&ep->ep_mtx)) != 0) {
+ NNI_FREE_STRUCT(ep);
+ return (NULL);
+ }
+
+ if ((rv = nni_cv_init(&ep->ep_cv, &ep->ep_mtx)) != 0) {
+ nni_mtx_fini(&ep->ep_mtx);
NNI_FREE_STRUCT(ep);
return (NULL);
}
-#endif
return (ep);
}
@@ -104,7 +110,18 @@ nni_ep_dtor(void *ptr)
{
nni_ep *ep = ptr;
+ // If a thread is running, make sure it is stopped.
+ nni_thr_fini(&ep->ep_thr);
+
+ if (ep->ep_sock != NULL) {
+ // This is idempotent; harmless if not already on the list.
+ nni_sock_rem_ep(ep->ep_sock, ep);
+ }
+ if (ep->ep_data != NULL) {
+ ep->ep_ops.ep_fini(ep->ep_data);
+ }
nni_cv_fini(&ep->ep_cv);
+ nni_mtx_fini(&ep->ep_mtx);
NNI_FREE_STRUCT(ep);
}
@@ -139,26 +156,16 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr)
// dereference on hot paths.
ep->ep_ops = *tran->tran_ep;
- if ((rv = nni_cv_init(&ep->ep_cv, &sock->s_mx)) != 0) {
- NNI_FREE_STRUCT(ep);
- return (rv);
- }
- nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
- nni_mtx_unlock(&sock->s_mx);
+ if ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock)) != 0) {
nni_objhash_unref(nni_eps, id);
- return (NNG_ECLOSED);
+ return (rv);
}
- rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock);
- if (rv != 0) {
- nni_mtx_unlock(&sock->s_mx);
+ if ((rv = nni_sock_add_ep(sock, ep)) != 0) {
nni_objhash_unref(nni_eps, id);
return (rv);
}
- nni_list_append(&sock->s_eps, ep);
- nni_mtx_unlock(&sock->s_mx);
*epp = ep;
return (0);
@@ -169,9 +176,9 @@ void
nni_ep_close(nni_ep *ep)
{
nni_pipe *pipe;
- nni_mtx *mx = &ep->ep_sock->s_mx;
+ nni_sock *sock = ep->ep_sock;
- nni_mtx_lock(mx);
+ nni_mtx_lock(&ep->ep_mtx);
NNI_ASSERT(ep->ep_close == 0);
ep->ep_close = 1;
ep->ep_ops.ep_close(ep->ep_data);
@@ -180,11 +187,7 @@ nni_ep_close(nni_ep *ep)
ep->ep_pipe = NULL;
}
nni_cv_wake(&ep->ep_cv);
- nni_list_remove(&ep->ep_sock->s_eps, ep);
- nni_mtx_unlock(mx);
-
- nni_thr_fini(&ep->ep_thr);
- ep->ep_ops.ep_fini(ep->ep_data);
+ nni_mtx_unlock(&ep->ep_mtx);
nni_objhash_unref(nni_eps, ep->ep_id);
}
@@ -238,17 +241,15 @@ nni_dialer(void *arg)
nni_duration maxrtime;
nni_duration defrtime;
nni_duration rtime;
- nni_mtx *mx = &ep->ep_sock->s_mx;
- nni_mtx_lock(mx);
+ // XXX: these need to be obtained from the socket using a lock.
defrtime = ep->ep_sock->s_reconn;
if ((maxrtime = ep->ep_sock->s_reconnmax) == 0) {
maxrtime = defrtime;
}
- nni_mtx_unlock(mx);
for (;;) {
- nni_mtx_lock(mx);
+ // XXX: socket lock please...
if ((defrtime != ep->ep_sock->s_reconn) ||
(maxrtime != ep->ep_sock->s_reconnmax)) {
// Times changed, so reset them.
@@ -258,15 +259,17 @@ nni_dialer(void *arg)
}
rtime = defrtime;
}
+
+ nni_mtx_lock(&ep->ep_mtx);
while ((!ep->ep_close) && (ep->ep_pipe != NULL)) {
rtime = defrtime;
nni_cv_wait(&ep->ep_cv);
}
if (ep->ep_close) {
- nni_mtx_unlock(mx);
+ nni_mtx_unlock(&ep->ep_mtx);
break;
}
- nni_mtx_unlock(mx);
+ nni_mtx_unlock(&ep->ep_mtx);
rv = nni_dial_once(ep);
switch (rv) {
@@ -286,15 +289,14 @@ nni_dialer(void *arg)
}
// we inject a delay so we don't just spin hard on
// errors like connection refused.
- nni_mtx_lock(mx);
+ nni_mtx_lock(&ep->ep_mtx);
while (!ep->ep_close) {
- // We need a different condvar...
rv = nni_cv_until(&ep->ep_cv, cooldown);
if (rv == NNG_ETIMEDOUT) {
break;
}
}
- nni_mtx_unlock(mx);
+ nni_mtx_unlock(&ep->ep_mtx);
}
}
@@ -303,37 +305,36 @@ int
nni_ep_dial(nni_ep *ep, int flags)
{
int rv = 0;
- nni_mtx *mx = &ep->ep_sock->s_mx;
- nni_mtx_lock(mx);
+ nni_mtx_lock(&ep->ep_mtx);
if (ep->ep_mode != NNI_EP_MODE_IDLE) {
- nni_mtx_unlock(mx);
+ nni_mtx_unlock(&ep->ep_mtx);
return (NNG_EBUSY);
}
if (ep->ep_close) {
- nni_mtx_unlock(mx);
+ nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ECLOSED);
}
if ((rv = nni_thr_init(&ep->ep_thr, nni_dialer, ep)) != 0) {
- nni_mtx_unlock(mx);
+ nni_mtx_unlock(&ep->ep_mtx);
return (rv);
}
ep->ep_mode = NNI_EP_MODE_DIAL;
if (flags & NNG_FLAG_SYNCH) {
- nni_mtx_unlock(mx);
+ nni_mtx_unlock(&ep->ep_mtx);
rv = nni_dial_once(ep);
if (rv != 0) {
nni_thr_fini(&ep->ep_thr);
ep->ep_mode = NNI_EP_MODE_IDLE;
return (rv);
}
- nni_mtx_lock(mx);
+ nni_mtx_lock(&ep->ep_mtx);
}
nni_thr_run(&ep->ep_thr);
- nni_mtx_unlock(mx);
+ nni_mtx_unlock(&ep->ep_mtx);
return (rv);
}
@@ -367,19 +368,18 @@ nni_listener(void *arg)
nni_ep *ep = arg;
nni_pipe *pipe;
int rv;
- nni_mtx *mx = &ep->ep_sock->s_mx;
for (;;) {
nni_time cooldown;
- nni_mtx_lock(mx);
+ nni_mtx_lock(&ep->ep_mtx);
// If we didn't bind synchronously, do it now.
while (!ep->ep_bound && !ep->ep_close) {
int rv;
- nni_mtx_unlock(mx);
+ nni_mtx_unlock(&ep->ep_mtx);
rv = ep->ep_ops.ep_bind(ep->ep_data);
- nni_mtx_lock(mx);
+ nni_mtx_lock(&ep->ep_mtx);
if (rv == 0) {
ep->ep_bound = 1;
@@ -398,10 +398,10 @@ nni_listener(void *arg)
}
}
if (ep->ep_close) {
- nni_mtx_unlock(mx);
+ nni_mtx_unlock(&ep->ep_mtx);
break;
}
- nni_mtx_unlock(mx);
+ nni_mtx_unlock(&ep->ep_mtx);
pipe = NULL;
@@ -435,14 +435,14 @@ nni_listener(void *arg)
break;
}
cooldown += nni_clock();
- nni_mtx_lock(mx);
+ nni_mtx_lock(&ep->ep_mtx);
while (!ep->ep_close) {
rv = nni_cv_until(&ep->ep_cv, cooldown);
if (rv == NNG_ETIMEDOUT) {
break;
}
}
- nni_mtx_unlock(mx);
+ nni_mtx_unlock(&ep->ep_mtx);
}
}
@@ -451,40 +451,39 @@ int
nni_ep_listen(nni_ep *ep, int flags)
{
int rv = 0;
- nni_mtx *mx = &ep->ep_sock->s_mx;
- nni_mtx_lock(mx);
+ nni_mtx_lock(&ep->ep_mtx);
if (ep->ep_mode != NNI_EP_MODE_IDLE) {
- nni_mtx_unlock(mx);
+ nni_mtx_unlock(&ep->ep_mtx);
return (NNG_EBUSY);
}
if (ep->ep_close) {
- nni_mtx_unlock(mx);
+ nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ECLOSED);
}
if ((rv = nni_thr_init(&ep->ep_thr, nni_listener, ep)) != 0) {
- nni_mtx_unlock(mx);
+ nni_mtx_unlock(&ep->ep_mtx);
return (rv);
}
ep->ep_mode = NNI_EP_MODE_LISTEN;
if (flags & NNG_FLAG_SYNCH) {
- nni_mtx_unlock(mx);
+ nni_mtx_unlock(&ep->ep_mtx);
rv = ep->ep_ops.ep_bind(ep->ep_data);
if (rv != 0) {
nni_thr_fini(&ep->ep_thr);
ep->ep_mode = NNI_EP_MODE_IDLE;
return (rv);
}
- nni_mtx_lock(mx);
+ nni_mtx_lock(&ep->ep_mtx);
ep->ep_bound = 1;
}
nni_thr_run(&ep->ep_thr);
- nni_mtx_unlock(mx);
+ nni_mtx_unlock(&ep->ep_mtx);
return (0);
}
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 0dc0433d..a9029cc9 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -30,6 +30,7 @@ struct nni_ep {
int ep_mode;
int ep_close; // full shutdown
int ep_bound; // true if we bound locally
+ nni_mtx ep_mtx;
nni_cv ep_cv;
nni_pipe * ep_pipe; // Connected pipe (dialers only)
};
diff --git a/src/core/socket.c b/src/core/socket.c
index ffacd59a..d091d126 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -56,7 +56,10 @@ nni_sock_hold(nni_sock **sockp, uint32_t id)
return (NNG_ECLOSED);
}
nni_mtx_unlock(&sock->s_mx);
- *sockp = sock;
+
+ if (sockp != NULL) {
+ *sockp = sock;
+ }
return (0);
}
@@ -594,6 +597,49 @@ nni_sock_shutdown(nni_sock *sock)
}
+// nni_sock_add_ep adds a newly created endpoint to the socket. The
+// caller must hold references on the sock and the ep, and not be holding
+// the socket lock. The ep acquires a reference against the sock,
+// which will be dropped later by nni_sock_rem_ep. The endpoint must not
+// already be associated with a socket. (Note, the ep holds the reference
+// on the socket, not the other way around.)
+int
+nni_sock_add_ep(nni_sock *sock, nni_ep *ep)
+{
+ int rv;
+
+ if ((rv = nni_sock_hold(NULL, sock->s_id)) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mtx_unlock(&sock->s_mx);
+ nni_sock_rele(sock);
+ return (NNG_ECLOSED);
+ }
+ nni_list_append(&sock->s_eps, ep);
+ nni_mtx_unlock(&sock->s_mx);
+ return (0);
+}
+
+
+void
+nni_sock_rem_ep(nni_sock *sock, nni_ep *ep)
+{
+ nni_mtx_lock(&sock->s_mx);
+ // If we're not on the list, then nothing to do. Be idempotent.
+ if (!nni_list_active(&sock->s_eps, ep)) {
+ nni_mtx_unlock(&sock->s_mx);
+ return;
+ }
+ nni_list_remove(&sock->s_eps, ep);
+ nni_mtx_unlock(&sock->s_mx);
+
+ // Drop the reference the EP acquired in add_ep.
+ nni_sock_rele(sock);
+}
+
+
// nni_sock_close shuts down the socket, then releases any resources
// associated with it. It is a programmer error to reference the socket
// after this function is called, as the pointer may reference invalid
diff --git a/src/core/socket.h b/src/core/socket.h
index 5fc3d593..247513a0 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -85,6 +85,9 @@ extern void nni_sock_unlock(nni_sock *);
extern nni_notify *nni_sock_notify(nni_sock *, int, nng_notify_func, void *);
extern void nni_sock_unnotify(nni_sock *, nni_notify *);
+extern int nni_sock_add_ep(nni_sock *, nni_ep *);
+extern void nni_sock_rem_ep(nni_sock *, nni_ep *);
+
// nni_sock_pipe_add is called by the pipe to register the pipe with
// with the socket. The pipe is added to the idle list. The protocol
// private pipe data is initialized as well.