diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 113 |
1 files changed, 71 insertions, 42 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index f5acd941..66580300 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -81,12 +81,11 @@ struct nni_socket { nni_list s_pipes; // active pipes nni_list s_ctxs; // active contexts (protected by global nni_sock_lk) - int s_closing; // Socket is closing - int s_closed; // Socket closed, protected by global lock - bool s_ctxwait; // Waiting for contexts to close. - - nni_notifyfd s_send_fd; - nni_notifyfd s_recv_fd; + bool s_closing; // Socket is closing + bool s_closed; // Socket closed, protected by global lock + bool s_ctxwait; // Waiting for contexts to close. + nng_pipe_cb s_pipe_cb; // User callback for pipe events. + void * s_pipe_cbarg; // Argument for pipe events. }; static void nni_ctx_destroy(nni_ctx *); @@ -435,21 +434,41 @@ nni_sock_rele(nni_sock *s) int nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe) { - void *pdata = nni_pipe_get_proto_data(pipe); - int rv; + void * pdata = nni_pipe_get_proto_data(pipe); + nng_pipe_cb cb; + int rv; NNI_ASSERT(s != NULL); nni_mtx_lock(&s->s_mx); - if (s->s_closing) { - // We're closing, bail out. - rv = NNG_ECLOSED; - } else if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) { + if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) { // Peer protocol mismatch. - rv = NNG_EPROTO; - } else { - // Protocol can reject for other reasons. - rv = s->s_pipe_ops.pipe_start(pdata); + nni_mtx_unlock(&s->s_mx); + return (NNG_EPROTO); + } + if ((cb = s->s_pipe_cb) != NULL) { + nng_pipe p; + void * arg = s->s_pipe_cbarg; + nni_mtx_unlock(&s->s_mx); + p.id = nni_pipe_id(pipe); + cb(p, NNG_PIPE_ADD, arg); + if (nni_pipe_closed(pipe)) { + return (NNG_ECLOSED); + } + nni_mtx_lock(&s->s_mx); } + if (s->s_closing) { + // We're closing, bail out. This has to be done after + // we have dropped the lock above in case the sock is closed + // while the user callback runs. + nni_mtx_unlock(&s->s_mx); + return (NNG_ECLOSED); + } + + // Protocol can reject for other reasons. + // This must be the last operation, until this point + // the protocol has not actually "seen" the pipe. + rv = s->s_pipe_ops.pipe_start(pdata); + nni_mtx_unlock(&s->s_mx); return (rv); } @@ -484,9 +503,18 @@ nni_sock_pipe_add(nni_sock *s, nni_pipe *p) void nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) { - void *pdata; + void * pdata; + nng_pipe_cb cb; nni_mtx_lock(&sock->s_mx); + if ((cb = sock->s_pipe_cb) != NULL) { + void * arg = sock->s_pipe_cbarg; + nng_pipe p; + nni_mtx_unlock(&sock->s_mx); + p.id = nni_pipe_id(pipe); + cb(p, NNG_PIPE_REM, arg); + nni_mtx_lock(&sock->s_mx); + } pdata = nni_pipe_get_proto_data(pipe); if (pdata != NULL) { sock->s_pipe_ops.pipe_stop(pdata); @@ -507,14 +535,6 @@ nni_sock_destroy(nni_sock *s) { nni_sockopt *sopt; - // Close any open notification pipes. - if (s->s_recv_fd.sn_init) { - nni_plat_pipe_close(s->s_recv_fd.sn_wfd, s->s_recv_fd.sn_rfd); - } - if (s->s_send_fd.sn_init) { - nni_plat_pipe_close(s->s_send_fd.sn_wfd, s->s_send_fd.sn_rfd); - } - // The protocol needs to clean up its state. if (s->s_data != NULL) { s->s_sock_ops.sock_fini(s->s_data); @@ -546,21 +566,21 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - s->s_sndtimeo = -1; - s->s_rcvtimeo = -1; - s->s_closing = 0; - s->s_reconn = NNI_SECOND; - s->s_reconnmax = 0; - s->s_rcvmaxsz = 1024 * 1024; // 1 MB by default - s->s_id = 0; - s->s_refcnt = 0; - s->s_send_fd.sn_init = 0; - s->s_recv_fd.sn_init = 0; - s->s_self_id = proto->proto_self; - s->s_peer_id = proto->proto_peer; - s->s_flags = proto->proto_flags; - s->s_sock_ops = *proto->proto_sock_ops; - s->s_pipe_ops = *proto->proto_pipe_ops; + s->s_sndtimeo = -1; + s->s_rcvtimeo = -1; + s->s_closing = 0; + s->s_reconn = NNI_SECOND; + s->s_reconnmax = 0; + s->s_rcvmaxsz = 1024 * 1024; // 1 MB by default + s->s_id = 0; + s->s_refcnt = 0; + s->s_self_id = proto->proto_self; + s->s_peer_id = proto->proto_peer; + s->s_flags = proto->proto_flags; + s->s_sock_ops = *proto->proto_sock_ops; + s->s_pipe_ops = *proto->proto_pipe_ops; + s->s_closed = false; + s->s_closing = false; if (proto->proto_ctx_ops != NULL) { s->s_ctx_ops = *proto->proto_ctx_ops; @@ -692,7 +712,7 @@ nni_sock_shutdown(nni_sock *sock) return (NNG_ECLOSED); } // Mark us closing, so no more EPs or changes can occur. - sock->s_closing = 1; + sock->s_closing = true; // Close the EPs. This prevents new connections from forming but // but allows existing ones to drain. @@ -800,7 +820,7 @@ nni_sock_close(nni_sock *s) nni_sock_rele(s); return; } - s->s_closed = 1; + s->s_closed = true; nni_idhash_remove(nni_sock_hash, s->s_id); // We might have been removed from the list already, e.g. by @@ -1157,6 +1177,15 @@ nni_sock_flags(nni_sock *sock) return (sock->s_flags); } +void +nni_sock_set_pipe_cb(nni_sock *sock, nng_pipe_cb cb, void *arg) +{ + nni_mtx_lock(&sock->s_mx); + sock->s_pipe_cb = cb; + sock->s_pipe_cbarg = arg; + nni_mtx_unlock(&sock->s_mx); +} + int nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing) { |
