aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/endpt.c107
-rw-r--r--src/core/endpt.h10
-rw-r--r--src/core/socket.c35
-rw-r--r--src/transport/inproc/inproc.c2
4 files changed, 64 insertions, 90 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index b5a877e5..2d2f62ce 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -32,6 +32,7 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr)
if ((ep = nni_alloc(sizeof (*ep))) == NULL) {
return (NNG_ENOMEM);
}
+ ep->ep_sock = sock;
ep->ep_dialer = NULL;
ep->ep_listener = NULL;
ep->ep_close = 0;
@@ -39,25 +40,18 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr)
ep->ep_bound = 0;
ep->ep_pipe = NULL;
NNI_LIST_NODE_INIT(&ep->ep_node);
- if ((rv = nni_mutex_init(&ep->ep_mx)) != 0) {
- nni_free(ep, sizeof (*ep));
- return (NNG_ENOMEM);
- }
- if ((rv = nni_cond_init(&ep->ep_cv, &ep->ep_mx)) != 0) {
- nni_mutex_fini(&ep->ep_mx);
+ if ((rv = nni_cond_init(&ep->ep_cv, &ep->ep_sock->s_mx)) != 0) {
nni_free(ep, sizeof (*ep));
return (NNG_ENOMEM);
}
// Could safely use strcpy here, but this avoids discussion.
(void) snprintf(ep->ep_addr, sizeof (ep->ep_addr), "%s", addr);
- ep->ep_sock = sock;
ep->ep_ops = *tran->tran_ep_ops;
rv = ep->ep_ops.ep_create(&ep->ep_data, addr, nni_socket_proto(sock));
if (rv != 0) {
nni_cond_fini(&ep->ep_cv);
- nni_mutex_fini(&ep->ep_mx);
nni_free(ep, sizeof (*ep));
return (rv);
}
@@ -67,33 +61,14 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr)
void
-nni_endpt_destroy(nni_endpt *ep)
-{
- // We should already have been closed at this point, so this
- // should proceed very quickly.
- if (ep->ep_dialer != NULL) {
- nni_thread_reap(ep->ep_dialer);
- }
- if (ep->ep_listener != NULL) {
- nni_thread_reap(ep->ep_listener);
- }
-
- ep->ep_ops.ep_destroy(ep->ep_data);
-
- nni_cond_fini(&ep->ep_cv);
- nni_mutex_fini(&ep->ep_mx);
- nni_free(ep, sizeof (*ep));
-}
-
-
-void
nni_endpt_close(nni_endpt *ep)
{
nni_pipe *pipe;
+ nni_mutex *mx = &ep->ep_sock->s_mx;
- nni_mutex_enter(&ep->ep_mx);
+ nni_mutex_enter(mx);
if (ep->ep_close) {
- nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(mx);
return;
}
ep->ep_close = 1;
@@ -103,7 +78,19 @@ nni_endpt_close(nni_endpt *ep)
ep->ep_pipe = NULL;
}
nni_cond_broadcast(&ep->ep_cv);
- nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(mx);
+
+ if (ep->ep_dialer != NULL) {
+ nni_thread_reap(ep->ep_dialer);
+ }
+ if (ep->ep_listener != NULL) {
+ nni_thread_reap(ep->ep_listener);
+ }
+
+ ep->ep_ops.ep_destroy(ep->ep_data);
+
+ nni_cond_fini(&ep->ep_cv);
+ nni_free(ep, sizeof (*ep));
}
@@ -151,27 +138,28 @@ nni_dialer(void *arg)
nni_pipe *pipe;
int rv;
nni_time cooldown;
+ nni_mutex *mx = &ep->ep_sock->s_mx;
- nni_mutex_enter(&ep->ep_mx);
+ nni_mutex_enter(mx);
while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) {
nni_cond_wait(&ep->ep_cv);
}
if (ep->ep_stop || ep->ep_close) {
- nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(mx);
return;
}
- nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(mx);
for (;;) {
- nni_mutex_enter(&ep->ep_mx);
+ nni_mutex_enter(mx);
while ((!ep->ep_close) && (ep->ep_pipe != NULL)) {
nni_cond_wait(&ep->ep_cv);
}
if (ep->ep_close) {
- nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(mx);
break;
}
- nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(mx);
rv = nni_dial_once(ep);
switch (rv) {
@@ -193,7 +181,7 @@ nni_dialer(void *arg)
// wait even longer, since the system needs time to
// release resources.
cooldown += nni_clock();
- nni_mutex_enter(&ep->ep_mx);
+ nni_mutex_enter(mx);
while (!ep->ep_close) {
// We need a different condvar...
rv = nni_cond_waituntil(&ep->ep_cv, cooldown);
@@ -201,7 +189,7 @@ nni_dialer(void *arg)
break;
}
}
- nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(mx);
}
}
@@ -211,8 +199,9 @@ nni_endpt_dial(nni_endpt *ep, int flags)
{
int rv = 0;
nni_thread *reap = NULL;
+ nni_mutex *mx = &ep->ep_sock->s_mx;
- nni_mutex_enter(&ep->ep_mx);
+ nni_mutex_enter(mx);
if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
rv = NNG_EBUSY;
goto out;
@@ -229,9 +218,9 @@ nni_endpt_dial(nni_endpt *ep, int flags)
goto out;
}
if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) {
- nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(mx);
rv = nni_dial_once(ep);
- nni_mutex_enter(&ep->ep_mx);
+ nni_mutex_enter(mx);
if (rv == 0) {
ep->ep_start = 1;
@@ -245,7 +234,7 @@ nni_endpt_dial(nni_endpt *ep, int flags)
nni_cond_signal(&ep->ep_cv);
}
out:
- nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(mx);
if (reap != NULL) {
nni_thread_reap(reap);
@@ -282,27 +271,28 @@ nni_listener(void *arg)
nni_endpt *ep = arg;
nni_pipe *pipe;
int rv;
+ nni_mutex *mx = &ep->ep_sock->s_mx;
- nni_mutex_enter(&ep->ep_mx);
+ nni_mutex_enter(mx);
while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) {
nni_cond_wait(&ep->ep_cv);
}
if (ep->ep_stop || ep->ep_close) {
- nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(mx);
return;
}
- nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(mx);
for (;;) {
nni_time cooldown;
- nni_mutex_enter(&ep->ep_mx);
+ nni_mutex_enter(mx);
// If we didn't bind synchronously, do it now.
while (!ep->ep_bound && !ep->ep_close) {
int rv;
- nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(mx);
rv = ep->ep_ops.ep_bind(ep->ep_data);
- nni_mutex_enter(&ep->ep_mx);
+ nni_mutex_enter(mx);
if (rv == 0) {
ep->ep_bound = 1;
@@ -321,10 +311,10 @@ nni_listener(void *arg)
}
}
if (ep->ep_close) {
- nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(mx);
break;
}
- nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(mx);
pipe = NULL;
@@ -342,14 +332,14 @@ nni_listener(void *arg)
cooldown = 100000; // 100ms
}
cooldown += nni_clock();
- nni_mutex_enter(&ep->ep_mx);
+ nni_mutex_enter(mx);
while (!ep->ep_close) {
rv = nni_cond_waituntil(&ep->ep_cv, cooldown);
if (rv == NNG_ETIMEDOUT) {
break;
}
}
- nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(mx);
}
}
@@ -359,8 +349,9 @@ nni_endpt_listen(nni_endpt *ep, int flags)
{
int rv = 0;
nni_thread *reap = NULL;
+ nni_mutex *mx = &ep->ep_sock->s_mx;
- nni_mutex_enter(&ep->ep_mx);
+ nni_mutex_enter(mx);
if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
rv = NNG_EBUSY;
goto out;
@@ -377,9 +368,9 @@ nni_endpt_listen(nni_endpt *ep, int flags)
goto out;
}
if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) {
- nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(mx);
rv = ep->ep_ops.ep_bind(ep->ep_data);
- nni_mutex_enter(&ep->ep_mx);
+ nni_mutex_enter(mx);
if (rv == 0) {
ep->ep_bound = 1;
ep->ep_start = 1;
@@ -390,10 +381,10 @@ nni_endpt_listen(nni_endpt *ep, int flags)
reap = ep->ep_listener;
ep->ep_listener = NULL;
}
- nni_cond_signal(&ep->ep_cv);
+ nni_cond_broadcast(&ep->ep_cv);
}
out:
- nni_mutex_exit(&ep->ep_mx);
+ nni_mutex_exit(mx);
if (reap != NULL) {
nni_thread_reap(reap);
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 36f55de2..73ed3243 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -23,17 +23,15 @@ struct nng_endpt {
char ep_addr[NNG_MAXADDRLEN];
nni_thread * ep_dialer;
nni_thread * ep_listener;
- int ep_stop; // thread exits before start
- int ep_start; // start thread running
- int ep_close; // full shutdown
- int ep_bound; // true if we bound locally
- nni_mutex ep_mx;
+ int ep_stop; // thread exits before start
+ int ep_start; // start thread running
+ int ep_close; // full shutdown
+ int ep_bound; // true if we bound locally
nni_cond ep_cv;
nni_pipe * ep_pipe; // Connected pipe (dialers only)
};
extern int nni_endpt_create(nni_endpt **, nni_socket *, const char *);
-extern void nni_endpt_destroy(nni_endpt *);
extern int nni_endpt_accept(nni_endpt *, nni_pipe **);
extern void nni_endpt_close(nni_endpt *);
extern int nni_endpt_dial(nni_endpt *, int);
diff --git a/src/core/socket.c b/src/core/socket.c
index 6fa6712d..1cba29c2 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -44,6 +44,12 @@ nni_reaper(void *arg)
nni_mutex_enter(&sock->s_mx);
if ((pipe = nni_list_first(&sock->s_reaps)) != NULL) {
nni_list_remove(&sock->s_reaps, pipe);
+
+ if (((ep = pipe->p_ep) != NULL) &&
+ ((ep->ep_pipe == pipe))) {
+ ep->ep_pipe = NULL;
+ nni_cond_broadcast(&ep->ep_cv);
+ }
nni_mutex_exit(&sock->s_mx);
// This should already have been done.
@@ -58,18 +64,6 @@ nni_reaper(void *arg)
pipe->p_pdata);
}
- // If pipe was a connected (dialer) pipe,
- // then let the endpoint know so it can try to
- // reestablish the connection.
- if (((ep = pipe->p_ep) != NULL) &&
- ((ep->ep_pipe == pipe))) {
- ep->ep_pipe = NULL;
- pipe->p_ep = NULL;
- nni_mutex_enter(&ep->ep_mx);
- nni_cond_signal(&ep->ep_cv);
- nni_mutex_exit(&ep->ep_mx);
- }
-
// XXX: also publish event...
nni_pipe_destroy(pipe);
continue;
@@ -174,8 +168,11 @@ nni_socket_close(nni_socket *sock)
// Stop all EPS. We're going to do this first, since we know
// we're closing.
- NNI_LIST_FOREACH (&sock->s_eps, ep) {
+ while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
+ nni_list_remove(&sock->s_eps, ep);
+ nni_mutex_exit(&sock->s_mx);
nni_endpt_close(ep);
+ nni_mutex_enter(&sock->s_mx);
}
// Special optimization; if there are no pipes connected,
@@ -225,16 +222,6 @@ nni_socket_close(nni_socket *sock)
reaper = sock->s_reaper;
sock->s_reaper = NULL;
nni_cond_broadcast(&sock->s_cv);
-
- // We already told the endpoints to shutdown. We just
- // need to reap them now.
- while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
- nni_list_remove(&sock->s_eps, ep);
- nni_mutex_exit(&sock->s_mx);
-
- nni_endpt_destroy(ep);
- nni_mutex_enter(&sock->s_mx);
- }
nni_mutex_exit(&sock->s_mx);
// Wait for the reaper to exit.
@@ -351,7 +338,6 @@ nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags)
rv = nni_endpt_dial(ep, flags);
if (rv != 0) {
nni_endpt_close(ep);
- nni_endpt_destroy(ep);
} else {
if (epp != NULL) {
*epp = ep;
@@ -377,7 +363,6 @@ nni_socket_listen(nni_socket *sock, const char *addr, nni_endpt **epp,
rv = nni_endpt_listen(ep, flags);
if (rv != 0) {
nni_endpt_close(ep);
- nni_endpt_destroy(ep);
} else {
if (epp != NULL) {
*epp = ep;
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index d15b642e..04cd8fec 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -294,7 +294,7 @@ nni_inproc_ep_connect(void *arg, void **pipep)
}
*pipep = ep->cpipe;
nni_mutex_exit(&nni_inproc.mx);
- return (ep->closed ? NNG_ECLOSED : 0);
+ return (0);
}