summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-01 19:14:26 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-01 19:14:26 -0800
commit3a421d08f87b8b34786ac47e30552fbdb2cf4371 (patch)
treecc430dedd312cb670c8c8b7c29dcfd01e8e08d75
parentf26dea6463adce8d70e1a4b22d8f9a867cf672c6 (diff)
downloadnng-3a421d08f87b8b34786ac47e30552fbdb2cf4371.tar.gz
nng-3a421d08f87b8b34786ac47e30552fbdb2cf4371.tar.bz2
nng-3a421d08f87b8b34786ac47e30552fbdb2cf4371.zip
Endpoint uses single thread.
-rw-r--r--src/core/endpt.c125
-rw-r--r--src/core/endpt.h10
-rw-r--r--src/core/socket.c4
3 files changed, 51 insertions, 88 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index a04cfa00..e1488035 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -33,13 +33,11 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr)
return (NNG_ENOMEM);
}
ep->ep_sock = sock;
- ep->ep_dialer = NULL;
- ep->ep_listener = NULL;
ep->ep_close = 0;
- ep->ep_start = 0;
ep->ep_bound = 0;
ep->ep_pipe = NULL;
NNI_LIST_NODE_INIT(&ep->ep_node);
+
if ((rv = nni_cond_init(&ep->ep_cv, &ep->ep_sock->s_mx)) != 0) {
nni_free(ep, sizeof (*ep));
return (NNG_ENOMEM);
@@ -86,11 +84,8 @@ nni_endpt_close(nni_endpt *ep)
nni_list_remove(&ep->ep_sock->s_eps, ep);
nni_mutex_exit(mx);
- if (ep->ep_dialer != NULL) {
- nni_thread_reap(ep->ep_dialer);
- }
- if (ep->ep_listener != NULL) {
- nni_thread_reap(ep->ep_listener);
+ if (ep->ep_mode != NNI_EP_MODE_IDLE) {
+ nni_thr_fini(&ep->ep_thr);
}
ep->ep_ops.ep_destroy(ep->ep_data);
@@ -148,16 +143,6 @@ nni_dialer(void *arg)
nni_time cooldown;
nni_mutex *mx = &ep->ep_sock->s_mx;
- nni_mutex_enter(mx);
- while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) {
- nni_cond_wait(&ep->ep_cv);
- }
- if (ep->ep_stop || ep->ep_close) {
- nni_mutex_exit(mx);
- return;
- }
- nni_mutex_exit(mx);
-
for (;;) {
nni_mutex_enter(mx);
while ((!ep->ep_close) && (ep->ep_pipe != NULL)) {
@@ -211,43 +196,34 @@ nni_endpt_dial(nni_endpt *ep, int flags)
nni_mutex *mx = &ep->ep_sock->s_mx;
nni_mutex_enter(mx);
- if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
- rv = NNG_EBUSY;
- goto out;
+ if (ep->ep_mode != NNI_EP_MODE_IDLE) {
+ nni_mutex_exit(mx);
+ return (NNG_EBUSY);
}
if (ep->ep_close) {
- rv = NNG_ECLOSED;
- goto out;
+ nni_mutex_exit(mx);
+ return (NNG_ECLOSED);
}
- ep->ep_stop = 0;
- ep->ep_start = (flags & NNG_FLAG_SYNCH) ? 0 : 1;
- if (nni_thread_create(&ep->ep_dialer, nni_dialer, ep) != 0) {
- rv = NNG_ENOMEM;
- goto out;
+ if ((rv = nni_thr_init(&ep->ep_thr, nni_dialer, ep)) != 0) {
+ nni_mutex_exit(mx);
+ return (rv);
}
- if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) {
+ ep->ep_mode = NNI_EP_MODE_DIAL;
+
+ if (flags & NNG_FLAG_SYNCH) {
nni_mutex_exit(mx);
rv = nni_dial_once(ep);
- nni_mutex_enter(mx);
-
- if (rv == 0) {
- ep->ep_start = 1;
- } else {
- // This will cause the thread to exit instead of
- // starting.
- ep->ep_stop = 1;
- reap = ep->ep_dialer;
- ep->ep_dialer = NULL;
+ if (rv != 0) {
+ nni_thr_fini(&ep->ep_thr);
+ ep->ep_mode = NNI_EP_MODE_IDLE;
+ return (rv);
}
- nni_cond_signal(&ep->ep_cv);
+ nni_mutex_enter(mx);
}
-out:
- nni_mutex_exit(mx);
- if (reap != NULL) {
- nni_thread_reap(reap);
- }
+ nni_thr_run(&ep->ep_thr);
+ nni_mutex_exit(mx);
return (rv);
}
@@ -282,15 +258,6 @@ nni_listener(void *arg)
int rv;
nni_mutex *mx = &ep->ep_sock->s_mx;
- nni_mutex_enter(mx);
- while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) {
- nni_cond_wait(&ep->ep_cv);
- }
- if (ep->ep_stop || ep->ep_close) {
- nni_mutex_exit(mx);
- return;
- }
- nni_mutex_exit(mx);
for (;;) {
nni_time cooldown;
nni_mutex_enter(mx);
@@ -361,43 +328,37 @@ nni_endpt_listen(nni_endpt *ep, int flags)
nni_mutex *mx = &ep->ep_sock->s_mx;
nni_mutex_enter(mx);
- if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
- rv = NNG_EBUSY;
- goto out;
+ if (ep->ep_mode != NNI_EP_MODE_IDLE) {
+ nni_mutex_exit(mx);
+ return (NNG_EBUSY);
}
+
if (ep->ep_close) {
- rv = NNG_ECLOSED;
- goto out;
+ nni_mutex_exit(mx);
+ return (NNG_ECLOSED);
}
- ep->ep_stop = 0;
- ep->ep_start = (flags & NNG_FLAG_SYNCH) ? 0 : 1;
- if (nni_thread_create(&ep->ep_listener, nni_listener, ep) != 0) {
- rv = NNG_ENOMEM;
- goto out;
+ if ((rv = nni_thr_init(&ep->ep_thr, nni_listener, ep)) != 0) {
+ nni_mutex_exit(mx);
+ return (rv);
}
- if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) {
+
+ ep->ep_mode = NNI_EP_MODE_LISTEN;
+
+ if (flags & NNG_FLAG_SYNCH) {
nni_mutex_exit(mx);
rv = ep->ep_ops.ep_bind(ep->ep_data);
- nni_mutex_enter(mx);
- if (rv == 0) {
- ep->ep_bound = 1;
- ep->ep_start = 1;
- } else {
- // This will cause the thread to exit instead of
- // starting.
- ep->ep_stop = 1;
- reap = ep->ep_listener;
- ep->ep_listener = NULL;
+ if (rv != 0) {
+ nni_thr_fini(&ep->ep_thr);
+ ep->ep_mode = NNI_EP_MODE_IDLE;
+ return (rv);
}
- nni_cond_broadcast(&ep->ep_cv);
+ nni_mutex_enter(mx);
+ ep->ep_bound = 1;
}
-out:
- nni_mutex_exit(mx);
- if (reap != NULL) {
- nni_thread_reap(reap);
- }
+ nni_thr_run(&ep->ep_thr);
+ nni_mutex_exit(mx);
- return (rv);
+ return (0);
}
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 73ed3243..757d58ce 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -21,16 +21,18 @@ struct nng_endpt {
nni_list_node ep_node; // Per socket list
nni_socket * ep_sock;
char ep_addr[NNG_MAXADDRLEN];
- nni_thread * ep_dialer;
- nni_thread * ep_listener;
- int ep_stop; // thread exits before start
- int ep_start; // start thread running
+ nni_thr ep_thr;
+ int ep_mode;
int ep_close; // full shutdown
int ep_bound; // true if we bound locally
nni_cond ep_cv;
nni_pipe * ep_pipe; // Connected pipe (dialers only)
};
+#define NNI_EP_MODE_IDLE 0
+#define NNI_EP_MODE_DIAL 1
+#define NNI_EP_MODE_LISTEN 2
+
extern int nni_endpt_create(nni_endpt **, nni_socket *, const char *);
extern int nni_endpt_accept(nni_endpt *, nni_pipe **);
extern void nni_endpt_close(nni_endpt *);
diff --git a/src/core/socket.c b/src/core/socket.c
index 3536934a..a79b1fde 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -74,7 +74,7 @@ nni_reaper(void *arg)
if ((sock->s_closing) &&
(nni_list_first(&sock->s_reaps) == NULL) &&
(nni_list_first(&sock->s_pipes) == NULL)) {
- nni_mutex_exit(&sock->s_mx);
+ nni_mutex_exit(&sock->s_mx);
break;
}
@@ -132,7 +132,7 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
nni_mutex_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
return (rv);
- }
+ }
if ((rv = nni_msgqueue_create(&sock->s_urq, 0)) != 0) {
nni_msgqueue_destroy(sock->s_uwq);
nni_thr_fini(&sock->s_reaper);