diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-06-06 09:10:07 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-06-06 09:10:07 -0700 |
| commit | 4771d04fb589c406ff8b2fc1b4edf93b2df42515 (patch) | |
| tree | 4d98c382b90f67eb158e613d6d0819201c6ed616 /src | |
| parent | 46a559cbdff02d8f9ca48bd918c17c33336de3a1 (diff) | |
| download | nng-4771d04fb589c406ff8b2fc1b4edf93b2df42515.tar.gz nng-4771d04fb589c406ff8b2fc1b4edf93b2df42515.tar.bz2 nng-4771d04fb589c406ff8b2fc1b4edf93b2df42515.zip | |
Endpoint now holds a reference on the socket.
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/endpt.c | 115 | ||||
| -rw-r--r-- | src/core/endpt.h | 1 | ||||
| -rw-r--r-- | src/core/socket.c | 48 | ||||
| -rw-r--r-- | src/core/socket.h | 3 |
4 files changed, 108 insertions, 59 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index 76cddbc3..de166aff 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -52,7 +52,9 @@ nni_ep_hold(nni_ep **epp, uint32_t id) if (rv != 0) { return (NNG_ECLOSED); } - *epp = ep; + if (epp != NULL) { + *epp = ep; + } return (0); } @@ -84,16 +86,20 @@ nni_ep_ctor(uint32_t id) ep->ep_bound = 0; ep->ep_pipe = NULL; ep->ep_id = id; + ep->ep_data = NULL; NNI_LIST_NODE_INIT(&ep->ep_node); -#if 0 - if ((rv = nni_cv_init(&ep->ep_cv, &sock->s_mx)) != 0) { - nni_cv_fini(&ep->ep_cv); + if ((rv = nni_mtx_init(&ep->ep_mtx)) != 0) { + NNI_FREE_STRUCT(ep); + return (NULL); + } + + if ((rv = nni_cv_init(&ep->ep_cv, &ep->ep_mtx)) != 0) { + nni_mtx_fini(&ep->ep_mtx); NNI_FREE_STRUCT(ep); return (NULL); } -#endif return (ep); } @@ -104,7 +110,18 @@ nni_ep_dtor(void *ptr) { nni_ep *ep = ptr; + // If a thread is running, make sure it is stopped. + nni_thr_fini(&ep->ep_thr); + + if (ep->ep_sock != NULL) { + // This is idempotent; harmless if not already on the list. + nni_sock_rem_ep(ep->ep_sock, ep); + } + if (ep->ep_data != NULL) { + ep->ep_ops.ep_fini(ep->ep_data); + } nni_cv_fini(&ep->ep_cv); + nni_mtx_fini(&ep->ep_mtx); NNI_FREE_STRUCT(ep); } @@ -139,26 +156,16 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr) // dereference on hot paths. ep->ep_ops = *tran->tran_ep; - if ((rv = nni_cv_init(&ep->ep_cv, &sock->s_mx)) != 0) { - NNI_FREE_STRUCT(ep); - return (rv); - } - nni_mtx_lock(&sock->s_mx); - if (sock->s_closing) { - nni_mtx_unlock(&sock->s_mx); + if ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock)) != 0) { nni_objhash_unref(nni_eps, id); - return (NNG_ECLOSED); + return (rv); } - rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock); - if (rv != 0) { - nni_mtx_unlock(&sock->s_mx); + if ((rv = nni_sock_add_ep(sock, ep)) != 0) { nni_objhash_unref(nni_eps, id); return (rv); } - nni_list_append(&sock->s_eps, ep); - nni_mtx_unlock(&sock->s_mx); *epp = ep; return (0); @@ -169,9 +176,9 @@ void nni_ep_close(nni_ep *ep) { nni_pipe *pipe; - nni_mtx *mx = &ep->ep_sock->s_mx; + nni_sock *sock = ep->ep_sock; - nni_mtx_lock(mx); + nni_mtx_lock(&ep->ep_mtx); NNI_ASSERT(ep->ep_close == 0); ep->ep_close = 1; ep->ep_ops.ep_close(ep->ep_data); @@ -180,11 +187,7 @@ nni_ep_close(nni_ep *ep) ep->ep_pipe = NULL; } nni_cv_wake(&ep->ep_cv); - nni_list_remove(&ep->ep_sock->s_eps, ep); - nni_mtx_unlock(mx); - - nni_thr_fini(&ep->ep_thr); - ep->ep_ops.ep_fini(ep->ep_data); + nni_mtx_unlock(&ep->ep_mtx); nni_objhash_unref(nni_eps, ep->ep_id); } @@ -238,17 +241,15 @@ nni_dialer(void *arg) nni_duration maxrtime; nni_duration defrtime; nni_duration rtime; - nni_mtx *mx = &ep->ep_sock->s_mx; - nni_mtx_lock(mx); + // XXX: these need to be obtained from the socket using a lock. defrtime = ep->ep_sock->s_reconn; if ((maxrtime = ep->ep_sock->s_reconnmax) == 0) { maxrtime = defrtime; } - nni_mtx_unlock(mx); for (;;) { - nni_mtx_lock(mx); + // XXX: socket lock please... if ((defrtime != ep->ep_sock->s_reconn) || (maxrtime != ep->ep_sock->s_reconnmax)) { // Times changed, so reset them. @@ -258,15 +259,17 @@ nni_dialer(void *arg) } rtime = defrtime; } + + nni_mtx_lock(&ep->ep_mtx); while ((!ep->ep_close) && (ep->ep_pipe != NULL)) { rtime = defrtime; nni_cv_wait(&ep->ep_cv); } if (ep->ep_close) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); break; } - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); rv = nni_dial_once(ep); switch (rv) { @@ -286,15 +289,14 @@ nni_dialer(void *arg) } // we inject a delay so we don't just spin hard on // errors like connection refused. - nni_mtx_lock(mx); + nni_mtx_lock(&ep->ep_mtx); while (!ep->ep_close) { - // We need a different condvar... rv = nni_cv_until(&ep->ep_cv, cooldown); if (rv == NNG_ETIMEDOUT) { break; } } - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); } } @@ -303,37 +305,36 @@ int nni_ep_dial(nni_ep *ep, int flags) { int rv = 0; - nni_mtx *mx = &ep->ep_sock->s_mx; - nni_mtx_lock(mx); + nni_mtx_lock(&ep->ep_mtx); if (ep->ep_mode != NNI_EP_MODE_IDLE) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); return (NNG_EBUSY); } if (ep->ep_close) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); } if ((rv = nni_thr_init(&ep->ep_thr, nni_dialer, ep)) != 0) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); return (rv); } ep->ep_mode = NNI_EP_MODE_DIAL; if (flags & NNG_FLAG_SYNCH) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); rv = nni_dial_once(ep); if (rv != 0) { nni_thr_fini(&ep->ep_thr); ep->ep_mode = NNI_EP_MODE_IDLE; return (rv); } - nni_mtx_lock(mx); + nni_mtx_lock(&ep->ep_mtx); } nni_thr_run(&ep->ep_thr); - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); return (rv); } @@ -367,19 +368,18 @@ nni_listener(void *arg) nni_ep *ep = arg; nni_pipe *pipe; int rv; - nni_mtx *mx = &ep->ep_sock->s_mx; for (;;) { nni_time cooldown; - nni_mtx_lock(mx); + nni_mtx_lock(&ep->ep_mtx); // If we didn't bind synchronously, do it now. while (!ep->ep_bound && !ep->ep_close) { int rv; - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); rv = ep->ep_ops.ep_bind(ep->ep_data); - nni_mtx_lock(mx); + nni_mtx_lock(&ep->ep_mtx); if (rv == 0) { ep->ep_bound = 1; @@ -398,10 +398,10 @@ nni_listener(void *arg) } } if (ep->ep_close) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); break; } - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); pipe = NULL; @@ -435,14 +435,14 @@ nni_listener(void *arg) break; } cooldown += nni_clock(); - nni_mtx_lock(mx); + nni_mtx_lock(&ep->ep_mtx); while (!ep->ep_close) { rv = nni_cv_until(&ep->ep_cv, cooldown); if (rv == NNG_ETIMEDOUT) { break; } } - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); } } @@ -451,40 +451,39 @@ int nni_ep_listen(nni_ep *ep, int flags) { int rv = 0; - nni_mtx *mx = &ep->ep_sock->s_mx; - nni_mtx_lock(mx); + nni_mtx_lock(&ep->ep_mtx); if (ep->ep_mode != NNI_EP_MODE_IDLE) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); return (NNG_EBUSY); } if (ep->ep_close) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); return (NNG_ECLOSED); } if ((rv = nni_thr_init(&ep->ep_thr, nni_listener, ep)) != 0) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); return (rv); } ep->ep_mode = NNI_EP_MODE_LISTEN; if (flags & NNG_FLAG_SYNCH) { - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); rv = ep->ep_ops.ep_bind(ep->ep_data); if (rv != 0) { nni_thr_fini(&ep->ep_thr); ep->ep_mode = NNI_EP_MODE_IDLE; return (rv); } - nni_mtx_lock(mx); + nni_mtx_lock(&ep->ep_mtx); ep->ep_bound = 1; } nni_thr_run(&ep->ep_thr); - nni_mtx_unlock(mx); + nni_mtx_unlock(&ep->ep_mtx); return (0); } diff --git a/src/core/endpt.h b/src/core/endpt.h index 0dc0433d..a9029cc9 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -30,6 +30,7 @@ struct nni_ep { int ep_mode; int ep_close; // full shutdown int ep_bound; // true if we bound locally + nni_mtx ep_mtx; nni_cv ep_cv; nni_pipe * ep_pipe; // Connected pipe (dialers only) }; diff --git a/src/core/socket.c b/src/core/socket.c index ffacd59a..d091d126 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -56,7 +56,10 @@ nni_sock_hold(nni_sock **sockp, uint32_t id) return (NNG_ECLOSED); } nni_mtx_unlock(&sock->s_mx); - *sockp = sock; + + if (sockp != NULL) { + *sockp = sock; + } return (0); } @@ -594,6 +597,49 @@ nni_sock_shutdown(nni_sock *sock) } +// nni_sock_add_ep adds a newly created endpoint to the socket. The +// caller must hold references on the sock and the ep, and not be holding +// the socket lock. The ep acquires a reference against the sock, +// which will be dropped later by nni_sock_rem_ep. The endpoint must not +// already be associated with a socket. (Note, the ep holds the reference +// on the socket, not the other way around.) +int +nni_sock_add_ep(nni_sock *sock, nni_ep *ep) +{ + int rv; + + if ((rv = nni_sock_hold(NULL, sock->s_id)) != 0) { + return (rv); + } + nni_mtx_lock(&sock->s_mx); + if (sock->s_closing) { + nni_mtx_unlock(&sock->s_mx); + nni_sock_rele(sock); + return (NNG_ECLOSED); + } + nni_list_append(&sock->s_eps, ep); + nni_mtx_unlock(&sock->s_mx); + return (0); +} + + +void +nni_sock_rem_ep(nni_sock *sock, nni_ep *ep) +{ + nni_mtx_lock(&sock->s_mx); + // If we're not on the list, then nothing to do. Be idempotent. + if (!nni_list_active(&sock->s_eps, ep)) { + nni_mtx_unlock(&sock->s_mx); + return; + } + nni_list_remove(&sock->s_eps, ep); + nni_mtx_unlock(&sock->s_mx); + + // Drop the reference the EP acquired in add_ep. + nni_sock_rele(sock); +} + + // nni_sock_close shuts down the socket, then releases any resources // associated with it. It is a programmer error to reference the socket // after this function is called, as the pointer may reference invalid diff --git a/src/core/socket.h b/src/core/socket.h index 5fc3d593..247513a0 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -85,6 +85,9 @@ extern void nni_sock_unlock(nni_sock *); extern nni_notify *nni_sock_notify(nni_sock *, int, nng_notify_func, void *); extern void nni_sock_unnotify(nni_sock *, nni_notify *); +extern int nni_sock_add_ep(nni_sock *, nni_ep *); +extern void nni_sock_rem_ep(nni_sock *, nni_ep *); + // nni_sock_pipe_add is called by the pipe to register the pipe with // with the socket. The pipe is added to the idle list. The protocol // private pipe data is initialized as well. |
