diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-01 19:14:26 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-01 19:14:26 -0800 |
| commit | 3a421d08f87b8b34786ac47e30552fbdb2cf4371 (patch) | |
| tree | cc430dedd312cb670c8c8b7c29dcfd01e8e08d75 | |
| parent | f26dea6463adce8d70e1a4b22d8f9a867cf672c6 (diff) | |
| download | nng-3a421d08f87b8b34786ac47e30552fbdb2cf4371.tar.gz nng-3a421d08f87b8b34786ac47e30552fbdb2cf4371.tar.bz2 nng-3a421d08f87b8b34786ac47e30552fbdb2cf4371.zip | |
Endpoint uses single thread.
| -rw-r--r-- | src/core/endpt.c | 125 | ||||
| -rw-r--r-- | src/core/endpt.h | 10 | ||||
| -rw-r--r-- | src/core/socket.c | 4 |
3 files changed, 51 insertions, 88 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index a04cfa00..e1488035 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -33,13 +33,11 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) return (NNG_ENOMEM); } ep->ep_sock = sock; - ep->ep_dialer = NULL; - ep->ep_listener = NULL; ep->ep_close = 0; - ep->ep_start = 0; ep->ep_bound = 0; ep->ep_pipe = NULL; NNI_LIST_NODE_INIT(&ep->ep_node); + if ((rv = nni_cond_init(&ep->ep_cv, &ep->ep_sock->s_mx)) != 0) { nni_free(ep, sizeof (*ep)); return (NNG_ENOMEM); @@ -86,11 +84,8 @@ nni_endpt_close(nni_endpt *ep) nni_list_remove(&ep->ep_sock->s_eps, ep); nni_mutex_exit(mx); - if (ep->ep_dialer != NULL) { - nni_thread_reap(ep->ep_dialer); - } - if (ep->ep_listener != NULL) { - nni_thread_reap(ep->ep_listener); + if (ep->ep_mode != NNI_EP_MODE_IDLE) { + nni_thr_fini(&ep->ep_thr); } ep->ep_ops.ep_destroy(ep->ep_data); @@ -148,16 +143,6 @@ nni_dialer(void *arg) nni_time cooldown; nni_mutex *mx = &ep->ep_sock->s_mx; - nni_mutex_enter(mx); - while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { - nni_cond_wait(&ep->ep_cv); - } - if (ep->ep_stop || ep->ep_close) { - nni_mutex_exit(mx); - return; - } - nni_mutex_exit(mx); - for (;;) { nni_mutex_enter(mx); while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { @@ -211,43 +196,34 @@ nni_endpt_dial(nni_endpt *ep, int flags) nni_mutex *mx = &ep->ep_sock->s_mx; nni_mutex_enter(mx); - if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { - rv = NNG_EBUSY; - goto out; + if (ep->ep_mode != NNI_EP_MODE_IDLE) { + nni_mutex_exit(mx); + return (NNG_EBUSY); } if (ep->ep_close) { - rv = NNG_ECLOSED; - goto out; + nni_mutex_exit(mx); + return (NNG_ECLOSED); } - ep->ep_stop = 0; - ep->ep_start = (flags & NNG_FLAG_SYNCH) ? 0 : 1; - if (nni_thread_create(&ep->ep_dialer, nni_dialer, ep) != 0) { - rv = NNG_ENOMEM; - goto out; + if ((rv = nni_thr_init(&ep->ep_thr, nni_dialer, ep)) != 0) { + nni_mutex_exit(mx); + return (rv); } - if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) { + ep->ep_mode = NNI_EP_MODE_DIAL; + + if (flags & NNG_FLAG_SYNCH) { nni_mutex_exit(mx); rv = nni_dial_once(ep); - nni_mutex_enter(mx); - - if (rv == 0) { - ep->ep_start = 1; - } else { - // This will cause the thread to exit instead of - // starting. - ep->ep_stop = 1; - reap = ep->ep_dialer; - ep->ep_dialer = NULL; + if (rv != 0) { + nni_thr_fini(&ep->ep_thr); + ep->ep_mode = NNI_EP_MODE_IDLE; + return (rv); } - nni_cond_signal(&ep->ep_cv); + nni_mutex_enter(mx); } -out: - nni_mutex_exit(mx); - if (reap != NULL) { - nni_thread_reap(reap); - } + nni_thr_run(&ep->ep_thr); + nni_mutex_exit(mx); return (rv); } @@ -282,15 +258,6 @@ nni_listener(void *arg) int rv; nni_mutex *mx = &ep->ep_sock->s_mx; - nni_mutex_enter(mx); - while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) { - nni_cond_wait(&ep->ep_cv); - } - if (ep->ep_stop || ep->ep_close) { - nni_mutex_exit(mx); - return; - } - nni_mutex_exit(mx); for (;;) { nni_time cooldown; nni_mutex_enter(mx); @@ -361,43 +328,37 @@ nni_endpt_listen(nni_endpt *ep, int flags) nni_mutex *mx = &ep->ep_sock->s_mx; nni_mutex_enter(mx); - if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { - rv = NNG_EBUSY; - goto out; + if (ep->ep_mode != NNI_EP_MODE_IDLE) { + nni_mutex_exit(mx); + return (NNG_EBUSY); } + if (ep->ep_close) { - rv = NNG_ECLOSED; - goto out; + nni_mutex_exit(mx); + return (NNG_ECLOSED); } - ep->ep_stop = 0; - ep->ep_start = (flags & NNG_FLAG_SYNCH) ? 0 : 1; - if (nni_thread_create(&ep->ep_listener, nni_listener, ep) != 0) { - rv = NNG_ENOMEM; - goto out; + if ((rv = nni_thr_init(&ep->ep_thr, nni_listener, ep)) != 0) { + nni_mutex_exit(mx); + return (rv); } - if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) { + + ep->ep_mode = NNI_EP_MODE_LISTEN; + + if (flags & NNG_FLAG_SYNCH) { nni_mutex_exit(mx); rv = ep->ep_ops.ep_bind(ep->ep_data); - nni_mutex_enter(mx); - if (rv == 0) { - ep->ep_bound = 1; - ep->ep_start = 1; - } else { - // This will cause the thread to exit instead of - // starting. - ep->ep_stop = 1; - reap = ep->ep_listener; - ep->ep_listener = NULL; + if (rv != 0) { + nni_thr_fini(&ep->ep_thr); + ep->ep_mode = NNI_EP_MODE_IDLE; + return (rv); } - nni_cond_broadcast(&ep->ep_cv); + nni_mutex_enter(mx); + ep->ep_bound = 1; } -out: - nni_mutex_exit(mx); - if (reap != NULL) { - nni_thread_reap(reap); - } + nni_thr_run(&ep->ep_thr); + nni_mutex_exit(mx); - return (rv); + return (0); } diff --git a/src/core/endpt.h b/src/core/endpt.h index 73ed3243..757d58ce 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -21,16 +21,18 @@ struct nng_endpt { nni_list_node ep_node; // Per socket list nni_socket * ep_sock; char ep_addr[NNG_MAXADDRLEN]; - nni_thread * ep_dialer; - nni_thread * ep_listener; - int ep_stop; // thread exits before start - int ep_start; // start thread running + nni_thr ep_thr; + int ep_mode; int ep_close; // full shutdown int ep_bound; // true if we bound locally nni_cond ep_cv; nni_pipe * ep_pipe; // Connected pipe (dialers only) }; +#define NNI_EP_MODE_IDLE 0 +#define NNI_EP_MODE_DIAL 1 +#define NNI_EP_MODE_LISTEN 2 + extern int nni_endpt_create(nni_endpt **, nni_socket *, const char *); extern int nni_endpt_accept(nni_endpt *, nni_pipe **); extern void nni_endpt_close(nni_endpt *); diff --git a/src/core/socket.c b/src/core/socket.c index 3536934a..a79b1fde 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -74,7 +74,7 @@ nni_reaper(void *arg) if ((sock->s_closing) && (nni_list_first(&sock->s_reaps) == NULL) && (nni_list_first(&sock->s_pipes) == NULL)) { - nni_mutex_exit(&sock->s_mx); + nni_mutex_exit(&sock->s_mx); break; } @@ -132,7 +132,7 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) nni_mutex_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); - } + } if ((rv = nni_msgqueue_create(&sock->s_urq, 0)) != 0) { nni_msgqueue_destroy(sock->s_uwq); nni_thr_fini(&sock->s_reaper); |
