diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/pipe.c | 49 | ||||
| -rw-r--r-- | src/core/socket.c | 201 | ||||
| -rw-r--r-- | src/core/socket.h | 10 |
3 files changed, 148 insertions, 112 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index 321e1a09..ccd0dbe7 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -29,6 +29,7 @@ struct nni_pipe { nni_ep * p_ep; bool p_closed; bool p_stop; + bool p_cbs; int p_refcnt; nni_mtx p_mtx; nni_cv p_cv; @@ -100,10 +101,19 @@ nni_pipe_sys_fini(void) static void nni_pipe_destroy(nni_pipe *p) { + bool cbs; if (p == NULL) { return; } + nni_mtx_lock(&p->p_mtx); + cbs = p->p_cbs; + nni_mtx_unlock(&p->p_mtx); + + if (cbs) { + nni_sock_run_pipe_cb(p->p_sock, NNG_PIPE_EV_REM_POST, p->p_id); + } + // Stop any pending negotiation. nni_aio_stop(p->p_start_aio); @@ -119,6 +129,7 @@ nni_pipe_destroy(nni_pipe *p) if (nni_list_node_active(&p->p_ep_node)) { nni_ep_pipe_remove(p->p_ep, p); } + if (nni_list_node_active(&p->p_sock_node)) { nni_sock_pipe_remove(p->p_sock, p); } @@ -263,13 +274,32 @@ static void nni_pipe_start_cb(void *arg) { nni_pipe *p = arg; + nni_sock *s = p->p_sock; nni_aio * aio = p->p_start_aio; + uint32_t id = nni_pipe_id(p); - if ((nni_aio_result(aio) != 0) || - (nni_sock_pipe_start(p->p_sock, p) != 0) || - (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) { + if (nni_aio_result(aio) != 0) { nni_pipe_stop(p); + return; } + + nni_mtx_lock(&p->p_mtx); + p->p_cbs = true; // We're running all cbs going forward + nni_mtx_unlock(&p->p_mtx); + + nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_PRE, id); + if (nni_pipe_closed(p)) { + nni_pipe_stop(p); + return; + } + + if ((p->p_proto_ops.pipe_start(p->p_proto_data) != 0) || + nni_sock_closing(s)) { + nni_pipe_stop(p); + return; + } + + nni_sock_run_pipe_cb(s, NNG_PIPE_EV_ADD_POST, id); } int @@ -289,6 +319,7 @@ nni_pipe_create(nni_ep *ep, void *tdata) } // Make a private copy of the transport ops. + p->p_start_aio = NULL; p->p_tran_ops = *tran->tran_pipe; p->p_tran_data = tdata; p->p_proto_ops = *pops; @@ -297,6 +328,7 @@ nni_pipe_create(nni_ep *ep, void *tdata) p->p_sock = sock; p->p_closed = false; p->p_stop = false; + p->p_cbs = false; p->p_refcnt = 0; NNI_LIST_NODE_INIT(&p->p_reap_node); @@ -317,18 +349,13 @@ nni_pipe_create(nni_ep *ep, void *tdata) if ((rv != 0) || ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0) || - ((rv = nni_ep_pipe_add(ep, p)) != 0)) { + ((rv = nni_ep_pipe_add(ep, p)) != 0) || + ((rv = nni_sock_pipe_add(sock, p)) != 0)) { nni_pipe_destroy(p); return (rv); } - // At this point the endpoint knows about it, and the protocol - // might too, so on failure we have to tear it down fully as if done - // after a successful result. - if ((rv = nni_sock_pipe_add(sock, p)) != 0) { - nni_pipe_stop(p); - } - return (rv); + return (0); } int diff --git a/src/core/socket.c b/src/core/socket.c index f14c15fd..3a209441 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -47,6 +47,11 @@ typedef struct nni_sockopt { void * data; } nni_sockopt; +typedef struct nni_sock_pipe_cb { + nng_pipe_cb cb_fn; + void * cb_arg; +} nni_sock_pipe_cb; + struct nni_socket { nni_list_node s_node; nni_mtx s_mx; @@ -81,11 +86,12 @@ struct nni_socket { nni_list s_pipes; // active pipes nni_list s_ctxs; // active contexts (protected by global nni_sock_lk) - bool s_closing; // Socket is closing - bool s_closed; // Socket closed, protected by global lock - bool s_ctxwait; // Waiting for contexts to close. - nng_pipe_cb s_pipe_cb; // User callback for pipe events. - void * s_pipe_cbarg; // Argument for pipe events. + bool s_closing; // Socket is closing + bool s_closed; // Socket closed, protected by global lock + bool s_ctxwait; // Waiting for contexts to close. + + nni_mtx s_pipe_cbs_mtx; + nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM]; }; static void nni_ctx_destroy(nni_ctx *); @@ -431,39 +437,34 @@ nni_sock_rele(nni_sock *s) nni_mtx_unlock(&nni_sock_lk); } -int -nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe) +bool +nni_sock_closing(nni_sock *s) { - nng_pipe_cb cb; - - NNI_ASSERT(s != NULL); + bool rv; nni_mtx_lock(&s->s_mx); - if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) { - // Peer protocol mismatch. - nni_mtx_unlock(&s->s_mx); - return (NNG_EPROTO); - } - if ((cb = s->s_pipe_cb) != NULL) { - nng_pipe p; - void * arg = s->s_pipe_cbarg; - nni_mtx_unlock(&s->s_mx); - p.id = nni_pipe_id(pipe); - cb(p, NNG_PIPE_ADD, arg); - if (nni_pipe_closed(pipe)) { - return (NNG_ECLOSED); + rv = s->s_closing; + nni_mtx_unlock(&s->s_mx); + return (rv); +} + +void +nni_sock_run_pipe_cb(nni_sock *s, int ev, uint32_t id) +{ + if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) { + nng_pipe_cb cb; + void * arg; + + nni_mtx_lock(&s->s_pipe_cbs_mtx); + cb = s->s_pipe_cbs[ev].cb_fn; + arg = s->s_pipe_cbs[ev].cb_arg; + nni_mtx_unlock(&s->s_pipe_cbs_mtx); + + if (cb != NULL) { + nng_pipe p; + p.id = id; + cb(p, ev, arg); } - nni_mtx_lock(&s->s_mx); - } - if (s->s_closing) { - // We're closing, bail out. This has to be done after - // we have dropped the lock above in case the sock is closed - // while the user callback runs. - nni_mtx_unlock(&s->s_mx); - return (NNG_ECLOSED); } - - nni_mtx_unlock(&s->s_mx); - return (0); } int @@ -486,26 +487,16 @@ nni_sock_pipe_add(nni_sock *s, nni_pipe *p) } void -nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) +nni_sock_pipe_remove(nni_sock *s, nni_pipe *p) { - nng_pipe_cb cb; - - nni_mtx_lock(&sock->s_mx); - if ((cb = sock->s_pipe_cb) != NULL) { - void * arg = sock->s_pipe_cbarg; - nng_pipe p; - nni_mtx_unlock(&sock->s_mx); - p.id = nni_pipe_id(pipe); - cb(p, NNG_PIPE_REM, arg); - nni_mtx_lock(&sock->s_mx); - } - if (nni_list_active(&sock->s_pipes, pipe)) { - nni_list_remove(&sock->s_pipes, pipe); + nni_mtx_lock(&s->s_mx); + if (nni_list_active(&s->s_pipes, p)) { + nni_list_remove(&s->s_pipes, p); } - if (sock->s_closing && nni_list_empty(&sock->s_pipes)) { - nni_cv_wake(&sock->s_cv); + if (s->s_closing && nni_list_empty(&s->s_pipes)) { + nni_cv_wake(&s->s_cv); } - nni_mtx_unlock(&sock->s_mx); + nni_mtx_unlock(&s->s_mx); } static void @@ -532,6 +523,7 @@ nni_sock_destroy(nni_sock *s) nni_cv_fini(&s->s_close_cv); nni_cv_fini(&s->s_cv); nni_mtx_fini(&s->s_mx); + nni_mtx_fini(&s->s_pipe_cbs_mtx); NNI_FREE_STRUCT(s); } @@ -574,6 +566,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) nni_pipe_sock_list_init(&s->s_pipes); nni_ep_list_init(&s->s_eps); nni_mtx_init(&s->s_mx); + nni_mtx_init(&s->s_pipe_cbs_mtx); nni_cv_init(&s->s_cv, &s->s_mx); nni_cv_init(&s->s_close_cv, &nni_sock_lk); @@ -668,10 +661,10 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) return (rv); } -// nni_sock_shutdown shuts down the socket; after this point no further -// access to the socket will function, and any threads blocked in entry -// points will be woken (and the functions they are blocked in will return -// NNG_ECLOSED.) +// nni_sock_shutdown shuts down the socket; after this point no +// further access to the socket will function, and any threads blocked +// in entry points will be woken (and the functions they are blocked +// in will return NNG_ECLOSED.) int nni_sock_shutdown(nni_sock *sock) { @@ -689,8 +682,8 @@ nni_sock_shutdown(nni_sock *sock) // Mark us closing, so no more EPs or changes can occur. sock->s_closing = true; - // Close the EPs. This prevents new connections from forming but - // but allows existing ones to drain. + // Close the EPs. This prevents new connections from forming + // but but allows existing ones to drain. NNI_LIST_FOREACH (&sock->s_eps, ep) { nni_ep_shutdown(ep); } @@ -728,9 +721,9 @@ nni_sock_shutdown(nni_sock *sock) nni_mtx_lock(&sock->s_mx); - // At this point, we've done everything we politely can to give - // the protocol a chance to flush its write side. Now its time - // to be a little more insistent. + // At this point, we've done everything we politely can to + // give the protocol a chance to flush its write side. Now + // its time to be a little more insistent. // Close the upper queues immediately. This can happen // safely while we hold the lock. @@ -756,7 +749,8 @@ nni_sock_shutdown(nni_sock *sock) nni_pipe_stop(pipe); } - // We have to wait for *both* endpoints and pipes to be removed. + // We have to wait for *both* endpoints and pipes to be + // removed. while ((!nni_list_empty(&sock->s_pipes)) || (!nni_list_empty(&sock->s_eps))) { nni_cv_wait(&sock->s_cv); @@ -777,9 +771,9 @@ nni_sock_shutdown(nni_sock *sock) } // nni_sock_close shuts down the socket, then releases any resources -// associated with it. It is a programmer error to reference the socket -// after this function is called, as the pointer may reference invalid -// memory or other objects. +// associated with it. It is a programmer error to reference the +// socket after this function is called, as the pointer may reference +// invalid memory or other objects. void nni_sock_close(nni_sock *s) { @@ -789,8 +783,8 @@ nni_sock_close(nni_sock *s) nni_mtx_lock(&nni_sock_lk); if (s->s_closed) { - // Some other thread called close. All we need to do is - // drop our reference count. + // Some other thread called close. All we need to do + // is drop our reference count. nni_mtx_unlock(&nni_sock_lk); nni_sock_rele(s); return; @@ -835,8 +829,8 @@ nni_sock_closeall(void) nni_mtx_unlock(&nni_sock_lk); return; } - // Bump the reference count. The close call below will - // drop it. + // Bump the reference count. The close call below + // will drop it. s->s_refcnt++; nni_list_node_remove(&s->s_node); nni_mtx_unlock(&nni_sock_lk); @@ -962,7 +956,8 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t) } // Protocol options. The protocol can override options that - // the socket framework would otherwise supply, like buffer sizes. + // the socket framework would otherwise supply, like buffer + // sizes. for (pso = s->s_sock_ops.sock_options; pso->pso_name != NULL; pso++) { if (strcmp(pso->pso_name, name) != 0) { continue; @@ -976,7 +971,8 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t) return (rv); } - // Some options do not go down to transports. Handle them directly. + // Some options do not go down to transports. Handle them + // directly. for (sso = nni_sock_options; sso->so_name != NULL; sso++) { if (strcmp(sso->so_name, name) != 0) { continue; @@ -997,17 +993,20 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t) return (rv); } - // Validation of transport options. This is stateless, so transports - // should not fail to set an option later if they passed it here. + // Validation of transport options. This is stateless, so + // transports should not fail to set an option later if they + // passed it here. rv = nni_tran_chkopt(name, v, sz, t); - // Also check a few generic things. We do this if no transport - // was found, or even if a transport rejected one of the settings. + // Also check a few generic things. We do this if no + // transport was found, or even if a transport rejected one of + // the settings. if ((rv == NNG_ENOTSUP) || (rv == 0)) { if (strcmp(name, NNG_OPT_RECVMAXSZ) == 0) { size_t z; - // just a sanity test on the size; it also ensures that - // a size can be set even with no transport configured. + // just a sanity test on the size; it also + // ensures that a size can be set even with no + // transport configured. rv = nni_copyin_size(&z, v, sz, 0, NNI_MAXSZ, t); } } @@ -1049,9 +1048,10 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t) } } - // Apply the options. Failure to set any option on any transport - // (other than ENOTSUP) stops the operation altogether. Its - // important that transport wide checks properly pre-validate. + // Apply the options. Failure to set any option on any + // transport (other than ENOTSUP) stops the operation + // altogether. Its important that transport wide checks + // properly pre-validate. NNI_LIST_FOREACH (&s->s_eps, ep) { int x; if (optv->typ == NNI_TYPE_OPAQUE) { @@ -1075,15 +1075,16 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t) } if (rv == 0) { - // Remove and toss the old value, we are using a new one. + // Remove and toss the old value, we are using a new + // one. if (oldv != NULL) { nni_list_remove(&s->s_options, oldv); nni_free_opt(oldv); } - // Insert our new value. This permits it to be compared - // against later, and for new endpoints to automatically - // receive these values, + // Insert our new value. This permits it to be + // compared against later, and for new endpoints to + // automatically receive these values, nni_list_append(&s->s_options, optv); } else { nni_free_opt(optv); @@ -1108,7 +1109,8 @@ nni_sock_getopt(nni_sock *s, const char *name, void *val, size_t *szp, int t) } // Protocol specific options. The protocol can override - // options like the send buffer or notification descriptors this way. + // options like the send buffer or notification descriptors + // this way. for (pso = s->s_sock_ops.sock_options; pso->pso_name != NULL; pso++) { if (strcmp(name, pso->pso_name) != 0) { continue; @@ -1166,12 +1168,14 @@ nni_sock_flags(nni_sock *sock) } void -nni_sock_set_pipe_cb(nni_sock *sock, nng_pipe_cb cb, void *arg) +nni_sock_set_pipe_cb(nni_sock *s, int ev, nng_pipe_cb cb, void *arg) { - nni_mtx_lock(&sock->s_mx); - sock->s_pipe_cb = cb; - sock->s_pipe_cbarg = arg; - nni_mtx_unlock(&sock->s_mx); + if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) { + nni_mtx_lock(&s->s_pipe_cbs_mtx); + s->s_pipe_cbs[ev].cb_fn = cb; + s->s_pipe_cbs[ev].cb_arg = arg; + nni_mtx_unlock(&s->s_pipe_cbs_mtx); + } } int @@ -1185,11 +1189,12 @@ nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing) } nni_mtx_lock(&nni_sock_lk); if ((rv = nni_idhash_find(nni_ctx_hash, id, (void **) &ctx)) == 0) { - // We refuse a reference if either the socket is closed, - // or the context is closed. (If the socket is closed, - // and we are only getting the reference so we can close it, - // then we still allow. In the case the only valid operation - // will be to close the socket.) + // We refuse a reference if either the socket is + // closed, or the context is closed. (If the socket + // is closed, and we are only getting the reference so + // we can close it, then we still allow. In the case + // the only valid operation will be to close the + // socket.) if (ctx->c_closed || ((!closing) && ctx->c_sock->s_closed)) { rv = NNG_ECLOSED; } else { @@ -1224,8 +1229,8 @@ nni_ctx_rele(nni_ctx *ctx) nni_mtx_lock(&nni_sock_lk); ctx->c_refcnt--; if ((ctx->c_refcnt > 0) || (!ctx->c_closed)) { - // Either still have an active reference, or not actually - // closing yet. + // Either still have an active reference, or not + // actually closing yet. nni_mtx_unlock(&nni_sock_lk); return; } @@ -1288,8 +1293,8 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock) nni_mtx_unlock(&nni_sock_lk); // Paranoia, fixing a possible race in close. Don't let us - // give back a context if the socket is being shutdown (it might - // not have reached the "closed" state yet.) + // give back a context if the socket is being shutdown (it + // might not have reached the "closed" state yet.) nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { nni_mtx_unlock(&sock->s_mx); diff --git a/src/core/socket.h b/src/core/socket.h index 22a13ef7..833129fa 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -44,7 +44,6 @@ extern uint32_t nni_sock_id(nni_sock *); // a pipe could wind up orphaned. extern int nni_sock_pipe_add(nni_sock *, nni_pipe *); extern void nni_sock_pipe_remove(nni_sock *, nni_pipe *); -extern int nni_sock_pipe_start(nni_sock *, nni_pipe *); extern int nni_sock_ep_add(nni_sock *, nni_ep *); extern void nni_sock_ep_remove(nni_sock *, nni_ep *); @@ -71,8 +70,13 @@ extern uint32_t nni_sock_flags(nni_sock *); // one of the only cases (the only?) where the socket core understands // the public data types. (Other solutions might exist, but they require // keeping extra state to support conversion between public and internal -// types.) -extern void nni_sock_set_pipe_cb(nni_sock *sock, nng_pipe_cb, void *); +// types.) The second argument is a mask of events for which the callback +// should be executed. +extern void nni_sock_set_pipe_cb(nni_sock *sock, int, nng_pipe_cb, void *); + +extern void nni_sock_run_pipe_cb(nni_sock *sock, int, uint32_t); + +extern bool nni_sock_closing(nni_sock *sock); // nni_ctx_open is used to open/create a new context structure. // Contexts are not supported by most protocols, but for those that do, |
