diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-06-06 09:10:07 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-06-06 09:10:07 -0700 |
| commit | 4771d04fb589c406ff8b2fc1b4edf93b2df42515 (patch) | |
| tree | 4d98c382b90f67eb158e613d6d0819201c6ed616 /src/core/endpt.c | |
| parent | 46a559cbdff02d8f9ca48bd918c17c33336de3a1 (diff) | |
| download | nng-4771d04fb589c406ff8b2fc1b4edf93b2df42515.tar.gz nng-4771d04fb589c406ff8b2fc1b4edf93b2df42515.tar.bz2 nng-4771d04fb589c406ff8b2fc1b4edf93b2df42515.zip | |
Endpoint now holds a reference on the socket.
Diffstat (limited to 'src/core/endpt.c')
| -rw-r--r-- | src/core/endpt.c | 115 |
1 files changed, 57 insertions, 58 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index 76cddbc3..de166aff 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -52,7 +52,9 @@ nni_ep_hold(nni_ep **epp, uint32_t id) if (rv != 0) { return (NNG_ECLOSED); } - *epp = ep; + if (epp != NULL) { + *epp = ep; + } return (0); } @@ -84,16 +86,20 @@ nni_ep_ctor(uint32_t id) ep->ep_bound = 0; ep->ep_pipe = NULL; ep->ep_id = id; + ep->ep_data = NULL; NNI_LIST_NODE_INIT(&ep->ep_node); -#if 0 - if ((rv = nni_cv_init(&ep->ep_cv, &sock->s_mx)) != 0) { - nni_cv_fini(&ep->ep_cv); + if ((rv = nni_mtx_init(&ep->ep_mtx)) != 0) { + NNI_FREE_STRUCT(ep); + return (NULL); + } + + if ((rv = nni_cv_init(&ep->ep_cv, &ep->ep_mtx)) != 0) { + nni_mtx_fini(&ep->ep_mtx); NNI_FREE_STRUCT(ep); return (NULL); } -#endif return (ep); } @@ -104,7 +110,18 @@ nni_ep_dtor(void *ptr) { nni_ep *ep = ptr; + // If a thread is running, make sure it is stopped. + nni_thr_fini(&ep->ep_thr); + + if (ep->ep_sock != NULL) { + // This is idempotent; harmless if not already on the list. + nni_sock_rem_ep(ep->ep_sock, ep); + } + if (ep->ep_data != NULL) { + ep->ep_ops.ep_fini(ep->ep_data); + } nni_cv_fini(&ep->ep_cv); + nni_mtx_fini(&ep->ep_mtx); NNI_FREE_STRUCT(ep); } @@ -139,26 +156,16 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr) // dereference on hot paths. ep->ep_ops = *tran->tran_ep; - if ((rv = nni_cv_init(&ep->ep_cv, &sock->s_mx)) != 0) { - NNI_FREE_STRUCT(ep); - return (rv); - } - nni_mtx_lock(&sock->s_mx); - if (sock->s_closing) { - nni_mtx_unlock(&sock->s_mx); + if ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock)) != 0) { nni_objhash_unref(nni_eps, id); - return (NNG_ECLOSED); + return (rv); } - rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock); - if (rv != 0) { - nni_mtx_unlock(&sock->s_mx); + if ((rv = nni_sock_add_ep(sock, ep)) != 0) { nni_objhash_unref(nni_eps, id); return (rv); } - nni_list_append(&sock->s_eps, ep); - nni_mtx_unlock(&sock->s_mx); *epp = ep; return (0); @@ -169,9 +176,9 @@ void nni_ep_close(nni_ep *ep) { nni_pipe *pipe; - nni_mtx *mx = &ep->ep_sock->s_mx; + nni_sock *sock = ep->ep_sock; - nni_mtx_lock(mx); + nni_mtx_lock(&ep->ep_mtx); NNI_ASSERT(ep->ep_close == 0); ep->ep_close = 1; ep->ep_ops.ep_close(ep->ep_data); @@ -180,11 +187,7 @@ nni_ep_close(nni_ep *ep) ep->ep_pipe = NULL; } nni_cv_wake(&ep->ep_cv); - nni_list_remove(&ep->ep_sock->s_eps, ep); - nni_mtx_unlock(mx); - - nni_thr_fini(&ep->ep_thr); - ep->ep_ops.ep_fini(ep->ep_data); + nni_mtx_unlock(&ep->ep_mtx); nni_objhash_unref(nni_eps, ep->ep_id); } @@ -238,17 +241,15 @@ nni_dialer(void *arg) nni_duration maxrtime; nni_duration defrtime; nni_duration rtime; - nni_mtx *mx = &ep->ep_sock->s_mx; - nni_mtx_lock(mx); + // XXX: these need to be obtained from the socket using a lock. defrtime = ep->ep_sock->s_reconn; if ((maxrtime = ep->ep_sock->s_reconnmax) == 0) { maxrtime = defrtime; } - nni_mtx_unlock(mx); for (;;) { - nni_mtx_lock(mx); + // XXX: socket lock please... if ((defrtime != ep->ep_sock->s_reconn) || (maxrtime != ep->ep_sock->s_reconnmax)) { // Times changed, so reset them. @@ -258,15 +259,17 @@ nni_dialer(void *arg) } rtime = defrtime; } + + nni_mtx_lock(&ep->ep_mtx); while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { rtime = defrtime; nni_cv_wait(&ep->ep_cv); } if (ep->ep_close) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); break; } - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); rv = nni_dial_once(ep); switch (rv) { @@ -286,15 +289,14 @@ nni_dialer(void *arg) } // we inject a delay so we don't just spin hard on // errors like connection refused. - nni_mtx_lock(mx); + nni_mtx_lock(&ep->ep_mtx); while (!ep->ep_close) { - // We need a different condvar... rv = nni_cv_until(&ep->ep_cv, cooldown); if (rv == NNG_ETIMEDOUT) { break; } } - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); } } @@ -303,37 +305,36 @@ int nni_ep_dial(nni_ep *ep, int flags) { int rv = 0; - nni_mtx *mx = &ep->ep_sock->s_mx; - nni_mtx_lock(mx); + nni_mtx_lock(&ep->ep_mtx); if (ep->ep_mode != NNI_EP_MODE_IDLE) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); return (NNG_EBUSY); } if (ep->ep_close) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); } if ((rv = nni_thr_init(&ep->ep_thr, nni_dialer, ep)) != 0) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); return (rv); } ep->ep_mode = NNI_EP_MODE_DIAL; if (flags & NNG_FLAG_SYNCH) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); rv = nni_dial_once(ep); if (rv != 0) { nni_thr_fini(&ep->ep_thr); ep->ep_mode = NNI_EP_MODE_IDLE; return (rv); } - nni_mtx_lock(mx); + nni_mtx_lock(&ep->ep_mtx); } nni_thr_run(&ep->ep_thr); - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); return (rv); } @@ -367,19 +368,18 @@ nni_listener(void *arg) nni_ep *ep = arg; nni_pipe *pipe; int rv; - nni_mtx *mx = &ep->ep_sock->s_mx; for (;;) { nni_time cooldown; - nni_mtx_lock(mx); + nni_mtx_lock(&ep->ep_mtx); // If we didn't bind synchronously, do it now. while (!ep->ep_bound && !ep->ep_close) { int rv; - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); rv = ep->ep_ops.ep_bind(ep->ep_data); - nni_mtx_lock(mx); + nni_mtx_lock(&ep->ep_mtx); if (rv == 0) { ep->ep_bound = 1; @@ -398,10 +398,10 @@ nni_listener(void *arg) } } if (ep->ep_close) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); break; } - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); pipe = NULL; @@ -435,14 +435,14 @@ nni_listener(void *arg) break; } cooldown += nni_clock(); - nni_mtx_lock(mx); + nni_mtx_lock(&ep->ep_mtx); while (!ep->ep_close) { rv = nni_cv_until(&ep->ep_cv, cooldown); if (rv == NNG_ETIMEDOUT) { break; } } - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); } } @@ -451,40 +451,39 @@ int nni_ep_listen(nni_ep *ep, int flags) { int rv = 0; - nni_mtx *mx = &ep->ep_sock->s_mx; - nni_mtx_lock(mx); + nni_mtx_lock(&ep->ep_mtx); if (ep->ep_mode != NNI_EP_MODE_IDLE) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); return (NNG_EBUSY); } if (ep->ep_close) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); } if ((rv = nni_thr_init(&ep->ep_thr, nni_listener, ep)) != 0) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); return (rv); } ep->ep_mode = NNI_EP_MODE_LISTEN; if (flags & NNG_FLAG_SYNCH) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); rv = ep->ep_ops.ep_bind(ep->ep_data); if (rv != 0) { nni_thr_fini(&ep->ep_thr); ep->ep_mode = NNI_EP_MODE_IDLE; return (rv); } - nni_mtx_lock(mx); + nni_mtx_lock(&ep->ep_mtx); ep->ep_bound = 1; } nni_thr_run(&ep->ep_thr); - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); return (0); } |
