aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-12 12:24:54 -0700
committerGarrett D'Amore <garrett@damore.org>2017-08-14 13:43:02 -0700
commit343417234aa3fd86e8ae0b56ae500a1ed3411cfc (patch)
tree728992cfe8c2987d5939026a1f734dcc58b3df18 /src/core/socket.c
parent4fb81f024e5f32a186cd5538574f8e5796980e36 (diff)
downloadnng-343417234aa3fd86e8ae0b56ae500a1ed3411cfc.tar.gz
nng-343417234aa3fd86e8ae0b56ae500a1ed3411cfc.tar.bz2
nng-343417234aa3fd86e8ae0b56ae500a1ed3411cfc.zip
fixes #62 Endpoint close should be synchronous #62
fixes #66 Make pipe and endpoint structures private This changes a number of things, refactoring endpoints and supporting code to keep their internals private, and making endpoint close synchronous. This will allow us to add a consumer facing API for nng_ep_close(), as well as property APIs, etc. While here a bunch of convoluted and dead code was cleaned up.
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c283
1 files changed, 74 insertions, 209 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 9471e56e..e01791a3 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -64,132 +64,63 @@ nni_sock_find(nni_sock **sockp, uint32_t id)
void
nni_sock_rele(nni_sock *s)
{
- nni_mtx_lock(&s->s_mx);
+ nni_mtx_lock(&nni_sock_lk);
s->s_refcnt--;
- if (s->s_closing) {
- nni_cv_wake(&s->s_cv);
+ if (s->s_closed && (s->s_refcnt < 2)) {
+ nni_cv_wake(&s->s_close_cv);
}
- nni_mtx_unlock(&s->s_mx);
+ nni_mtx_unlock(&nni_sock_lk);
}
-static int
-nni_sock_pipe_start(nni_pipe *pipe)
+int
+nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe)
{
- nni_sock *s = pipe->p_sock;
- void * pdata = nni_pipe_get_proto_data(pipe);
- int rv;
+ void *pdata = nni_pipe_get_proto_data(pipe);
+ int rv;
NNI_ASSERT(s != NULL);
+ nni_mtx_lock(&s->s_mx);
if (s->s_closing) {
// We're closing, bail out.
- return (NNG_ECLOSED);
- }
- if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) {
+ rv = NNG_ECLOSED;
+ } else if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) {
// Peer protocol mismatch.
- return (NNG_EPROTO);
- }
- if ((rv = s->s_pipe_ops.pipe_start(pdata)) != 0) {
- // Protocol rejection for other reasons.
- // E.g. pair and already have active connected partner.
- return (rv);
+ rv = NNG_EPROTO;
+ } else {
+ // Protocol can reject for other reasons.
+ rv = s->s_pipe_ops.pipe_start(pdata);
}
+ nni_mtx_unlock(&s->s_mx);
return (0);
}
-static void
-nni_sock_pipe_start_cb(void *arg)
+int
+nni_sock_pipe_add(nni_sock *s, nni_pipe *p)
{
- nni_pipe *pipe = arg;
- nni_aio * aio = &pipe->p_start_aio;
+ int rv;
+ void *pdata;
- if (nni_aio_result(aio) != 0) {
- // Failed I/O during start, abort everything.
- nni_pipe_stop(pipe);
- return;
- }
- if (nni_sock_pipe_start(pipe) != 0) {
- nni_pipe_stop(pipe);
- return;
+ if ((rv = s->s_pipe_ops.pipe_init(&pdata, p, s->s_data)) != 0) {
+ return (rv);
}
-}
-
-int
-nni_sock_pipe_add(nni_sock *s, nni_ep *ep, nni_pipe *pipe)
-{
- int rv;
+ nni_pipe_set_proto_data(p, pdata);
// Initialize protocol pipe data.
nni_mtx_lock(&s->s_mx);
- nni_mtx_lock(&ep->ep_mtx);
-
- if ((s->s_closing) || (ep->ep_closed)) {
- nni_mtx_unlock(&ep->ep_mtx);
+ if (s->s_closing) {
nni_mtx_unlock(&s->s_mx);
return (NNG_ECLOSED);
}
- rv = nni_aio_init(&pipe->p_start_aio, nni_sock_pipe_start_cb, pipe);
- if (rv != 0) {
- nni_mtx_unlock(&ep->ep_mtx);
- nni_mtx_unlock(&s->s_mx);
- return (rv);
- }
- rv = s->s_pipe_ops.pipe_init(&pipe->p_proto_data, pipe, s->s_data);
- if (rv != 0) {
- nni_mtx_unlock(&ep->ep_mtx);
- nni_mtx_lock(&s->s_mx);
- return (rv);
- }
- // Save the protocol destructor.
- pipe->p_proto_dtor = s->s_pipe_ops.pipe_fini;
- pipe->p_sock = s;
- pipe->p_ep = ep;
-
- nni_list_append(&s->s_pipes, pipe);
- nni_list_append(&ep->ep_pipes, pipe);
+ nni_list_append(&s->s_pipes, p);
// Start the initial negotiation I/O...
- if (pipe->p_tran_ops.p_start == NULL) {
- if (nni_sock_pipe_start(pipe) != 0) {
- nni_pipe_stop(pipe);
- }
- } else {
- pipe->p_tran_ops.p_start(
- pipe->p_tran_data, &pipe->p_start_aio);
- }
+ nni_pipe_start(p);
- nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_unlock(&s->s_mx);
return (0);
}
-int
-nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
-{
- int rv;
- void *pdata = nni_pipe_get_proto_data(pipe);
-
- nni_mtx_lock(&sock->s_mx);
-
- if (sock->s_closing) {
- nni_mtx_unlock(&sock->s_mx);
- return (NNG_ECLOSED);
- }
- if (nni_pipe_peer(pipe) != sock->s_peer_id.p_id) {
- nni_mtx_unlock(&sock->s_mx);
- return (NNG_EPROTO);
- }
-
- if ((rv = sock->s_pipe_ops.pipe_start(pdata)) != 0) {
- nni_mtx_unlock(&sock->s_mx);
- return (rv);
- }
-
- nni_mtx_unlock(&sock->s_mx);
-
- return (0);
-}
-
void
nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
{
@@ -197,23 +128,20 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
pdata = nni_pipe_get_proto_data(pipe);
- // Stop any pending negotiation.
- nni_aio_stop(&pipe->p_start_aio);
-
- nni_mtx_lock(&sock->s_mx);
- if ((sock->s_pipe_ops.pipe_stop == NULL) || (pdata == NULL)) {
- nni_mtx_unlock(&sock->s_mx);
- return;
- }
-
- sock->s_pipe_ops.pipe_stop(pdata);
- if (nni_list_active(&sock->s_pipes, pipe)) {
- nni_list_remove(&sock->s_pipes, pipe);
- if (sock->s_closing && nni_list_empty(&sock->s_pipes)) {
- nni_cv_wake(&sock->s_cv);
+ if (pdata != NULL) {
+ nni_mtx_lock(&sock->s_mx);
+ sock->s_pipe_ops.pipe_stop(pdata);
+ if (nni_list_active(&sock->s_pipes, pipe)) {
+ nni_list_remove(&sock->s_pipes, pipe);
+ if (sock->s_closing &&
+ nni_list_empty(&sock->s_pipes)) {
+ nni_cv_wake(&sock->s_cv);
+ }
}
+ sock->s_pipe_ops.pipe_fini(pdata);
+ nni_pipe_set_proto_data(pipe, NULL);
+ nni_mtx_unlock(&sock->s_mx);
}
- nni_mtx_unlock(&sock->s_mx);
}
void
@@ -329,6 +257,7 @@ nni_sock_destroy(nni_sock *s)
nni_ev_fini(&s->s_recv_ev);
nni_msgq_fini(s->s_urq);
nni_msgq_fini(s->s_uwq);
+ nni_cv_fini(&s->s_close_cv);
nni_cv_fini(&s->s_cv);
nni_mtx_fini(&s->s_mx);
NNI_FREE_STRUCT(s);
@@ -372,6 +301,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
if (((rv = nni_mtx_init(&s->s_mx)) != 0) ||
((rv = nni_cv_init(&s->s_cv, &s->s_mx)) != 0) ||
+ ((rv = nni_cv_init(&s->s_close_cv, &nni_sock_lk)) != 0) ||
((rv = nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s)) != 0) ||
((rv = nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s)) != 0) ||
((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
@@ -446,6 +376,7 @@ nni_sock_shutdown(nni_sock *sock)
{
nni_pipe *pipe;
nni_ep * ep;
+ nni_ep * nep;
nni_time linger;
nni_mtx_lock(&sock->s_mx);
@@ -468,7 +399,7 @@ nni_sock_shutdown(nni_sock *sock)
// Close the EPs. This prevents new connections from forming but
// but allows existing ones to drain.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
- nni_ep_close(ep);
+ nni_ep_shutdown(ep);
}
nni_mtx_unlock(&sock->s_mx);
@@ -498,10 +429,20 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // For each ep, arrange for it to teardown hard.
- NNI_LIST_FOREACH (&sock->s_eps, ep) {
- nni_ep_stop(ep);
+ // Go through the endpoint list, attempting to close them.
+ // We might already have a close in progress, in which case
+ // we skip past it; it will be removed from another thread.
+ nep = nni_list_first(&sock->s_eps);
+ while ((ep = nep) != NULL) {
+ nep = nni_list_next(&sock->s_eps, nep);
+
+ if (nni_ep_hold(ep) == 0) {
+ nni_mtx_unlock(&sock->s_mx);
+ nni_ep_close(ep);
+ nni_mtx_lock(&sock->s_mx);
+ }
}
+
// For each pipe, arrange for it to teardown hard.
NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
nni_pipe_stop(pipe);
@@ -527,38 +468,6 @@ nni_sock_shutdown(nni_sock *sock)
return (0);
}
-void
-nni_sock_ep_remove(nni_sock *sock, nni_ep *ep)
-{
- nni_pipe *pipe;
- // If we're not on the list, then nothing to do. Be idempotent.
- // Note that if the ep is not on a list, then we assume that we have
- // exclusive access. Therefore the check for being active need not
- // be locked.
- if (!nni_list_node_active(&ep->ep_node)) {
- return;
- }
-
- // This is done under the endpoints lock, although the remove
- // is done under that as well, we also make sure that we hold
- // the socket lock in the remove step.
- nni_mtx_lock(&ep->ep_mtx);
- NNI_LIST_FOREACH (&ep->ep_pipes, pipe) {
- nni_pipe_stop(pipe);
- }
- while (!nni_list_empty(&ep->ep_pipes)) {
- nni_cv_wait(&ep->ep_cv);
- }
- nni_mtx_unlock(&ep->ep_mtx);
-
- nni_mtx_lock(&sock->s_mx);
- nni_list_remove(&sock->s_eps, ep);
- if ((sock->s_closing) && (nni_list_empty(&sock->s_eps))) {
- nni_cv_wake(&sock->s_cv);
- }
- nni_mtx_unlock(&sock->s_mx);
-}
-
// nni_sock_close shuts down the socket, then releases any resources
// associated with it. It is a programmer error to reference the socket
// after this function is called, as the pointer may reference invalid
@@ -585,13 +494,17 @@ nni_sock_close(nni_sock *s)
// nni_sock_closeall. This is idempotent.
nni_list_node_remove(&s->s_node);
- nni_mtx_unlock(&nni_sock_lk);
-
// Wait for all other references to drop. Note that we
// have a reference already (from our caller).
+ while (s->s_refcnt > 1) {
+ nni_cv_wait(&s->s_close_cv);
+ }
+ nni_mtx_unlock(&nni_sock_lk);
+
+ // Wait for pipe and eps to finish closing.
nni_mtx_lock(&s->s_mx);
- while ((s->s_refcnt > 1) || (!nni_list_empty(&s->s_pipes)) ||
- (!nni_list_empty(&s->s_eps))) {
+ while (
+ (!nni_list_empty(&s->s_pipes)) || (!nni_list_empty(&s->s_eps))) {
nni_cv_wait(&s->s_cv);
}
nni_mtx_unlock(&s->s_mx);
@@ -742,77 +655,29 @@ nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax)
}
int
-nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
+nni_sock_ep_add(nni_sock *sock, nni_ep *ep)
{
- nni_ep *ep;
- int rv;
-
nni_mtx_lock(&sock->s_mx);
- if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_DIAL)) != 0) {
+ if (sock->s_closing) {
nni_mtx_unlock(&sock->s_mx);
- return (rv);
+ return (NNG_ECLOSED);
}
- nni_mtx_lock(&ep->ep_mtx);
nni_list_append(&sock->s_eps, ep);
- // Put a hold on the endpoint, for now.
- ep->ep_refcnt++;
- ep->ep_started = 1;
- nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_unlock(&sock->s_mx);
-
- if ((rv = nni_ep_dial(ep, flags)) != 0) {
- nni_ep_stop(ep);
- } else if (epp != NULL) {
- *epp = ep;
- }
-
- // Drop our endpoint hold.
- nni_mtx_lock(&ep->ep_mtx);
- if (rv != 0) {
- ep->ep_started = 0;
- }
- ep->ep_refcnt--;
- nni_cv_wake(&ep->ep_cv);
- nni_mtx_unlock(&ep->ep_mtx);
-
- return (rv);
+ return (0);
}
-int
-nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
+void
+nni_sock_ep_remove(nni_sock *sock, nni_ep *ep)
{
- nni_ep *ep;
- int rv;
-
nni_mtx_lock(&sock->s_mx);
- if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_LISTEN)) != 0) {
- nni_mtx_unlock(&sock->s_mx);
- return (rv);
+ if (nni_list_active(&sock->s_eps, ep)) {
+ nni_list_remove(&sock->s_eps, ep);
+ if ((sock->s_closed) && (nni_list_empty(&sock->s_eps))) {
+ nni_cv_wake(&sock->s_cv);
+ }
}
-
- nni_list_append(&sock->s_eps, ep);
- nni_mtx_lock(&ep->ep_mtx);
- ep->ep_refcnt++;
- ep->ep_started = 1;
- nni_mtx_unlock(&ep->ep_mtx);
nni_mtx_unlock(&sock->s_mx);
-
- if ((rv = nni_ep_listen(ep, flags)) != 0) {
- nni_ep_stop(ep);
- } else if (epp != NULL) {
- *epp = ep;
- }
-
- // Drop our endpoint hold.
- nni_mtx_lock(&ep->ep_mtx);
- if (rv != 0) {
- ep->ep_started = 0;
- }
- ep->ep_refcnt--;
- nni_cv_wake(&ep->ep_cv);
- nni_mtx_unlock(&ep->ep_mtx);
-
- return (rv);
}
void