diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-09 19:05:28 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-08-09 19:05:28 -0700 |
| commit | 9feb54e9c7ab116ba566086a76604338f86e3bc3 (patch) | |
| tree | fb8f2aeaef74db92f02377d6e1045f5237b694a7 /src/core/socket.c | |
| parent | 052dad14137e60c24e6bf6e4551d7009bdeb19e9 (diff) | |
| download | nng-9feb54e9c7ab116ba566086a76604338f86e3bc3.tar.gz nng-9feb54e9c7ab116ba566086a76604338f86e3bc3.tar.bz2 nng-9feb54e9c7ab116ba566086a76604338f86e3bc3.zip | |
fixes #47 compat_reqttls fails sometimes
fixes #23 Restore the old idhash logic for sockets
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 314 |
1 files changed, 144 insertions, 170 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 240af252..46ed2100 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -14,9 +14,9 @@ // Socket implementation. -static nni_objhash *nni_socks = NULL; -static nni_list nni_sock_list; -static nni_mtx nni_sock_lk; +static nni_list nni_sock_list; +static nni_idhash *nni_sock_hash; +static nni_mtx nni_sock_lk; uint32_t nni_sock_id(nni_sock *s) @@ -42,52 +42,53 @@ int nni_sock_find(nni_sock **sockp, uint32_t id) { int rv; - nni_sock *sock; + nni_sock *s; if ((rv = nni_init()) != 0) { return (rv); } - if ((rv = nni_objhash_find(nni_socks, id, (void **) &sock)) != 0) { - return (rv); - } - nni_mtx_lock(&sock->s_mx); - if ((sock->s_closed) || (sock->s_data == NULL)) { - nni_objhash_unref(nni_socks, id); - nni_mtx_unlock(&sock->s_mx); - return (NNG_ECLOSED); - } - nni_mtx_unlock(&sock->s_mx); - - if (sockp != NULL) { - *sockp = sock; + nni_mtx_lock(&nni_sock_lk); + if ((rv = nni_idhash_find(nni_sock_hash, id, (void **) &s)) == 0) { + if (s->s_closed) { + rv = NNG_ECLOSED; + } else { + s->s_refcnt++; + *sockp = s; + } } + nni_mtx_unlock(&nni_sock_lk); - return (0); + return (rv); } void -nni_sock_rele(nni_sock *sock) +nni_sock_rele(nni_sock *s) { - nni_objhash_unref(nni_socks, sock->s_id); + nni_mtx_lock(&s->s_mx); + s->s_refcnt--; + if (s->s_closing) { + nni_cv_wake(&s->s_cv); + } + nni_mtx_unlock(&s->s_mx); } static int nni_sock_pipe_start(nni_pipe *pipe) { - nni_sock *sock = pipe->p_sock; + nni_sock *s = pipe->p_sock; void * pdata = nni_pipe_get_proto_data(pipe); int rv; - NNI_ASSERT(sock != NULL); - if (sock->s_closing) { + NNI_ASSERT(s != NULL); + if (s->s_closing) { // We're closing, bail out. return (NNG_ECLOSED); } - if (nni_pipe_peer(pipe) != sock->s_peer_id.p_id) { + if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) { // Peer protocol mismatch. return (NNG_EPROTO); } - if ((rv = sock->s_pipe_ops.pipe_start(pdata)) != 0) { + if ((rv = s->s_pipe_ops.pipe_start(pdata)) != 0) { // Protocol rejection for other reasons. // E.g. pair and already have active connected partner. return (rv); @@ -113,39 +114,38 @@ nni_sock_pipe_start_cb(void *arg) } int -nni_sock_pipe_add(nni_sock *sock, nni_ep *ep, nni_pipe *pipe) +nni_sock_pipe_add(nni_sock *s, nni_ep *ep, nni_pipe *pipe) { int rv; // Initialize protocol pipe data. - nni_mtx_lock(&sock->s_mx); + nni_mtx_lock(&s->s_mx); nni_mtx_lock(&ep->ep_mtx); - if ((sock->s_closing) || (ep->ep_closed)) { + if ((s->s_closing) || (ep->ep_closed)) { nni_mtx_unlock(&ep->ep_mtx); - nni_mtx_unlock(&sock->s_mx); + nni_mtx_unlock(&s->s_mx); return (NNG_ECLOSED); } rv = nni_aio_init(&pipe->p_start_aio, nni_sock_pipe_start_cb, pipe); if (rv != 0) { nni_mtx_unlock(&ep->ep_mtx); - nni_mtx_unlock(&sock->s_mx); + nni_mtx_unlock(&s->s_mx); return (rv); } - rv = sock->s_pipe_ops.pipe_init( - &pipe->p_proto_data, pipe, sock->s_data); + rv = s->s_pipe_ops.pipe_init(&pipe->p_proto_data, pipe, s->s_data); if (rv != 0) { nni_mtx_unlock(&ep->ep_mtx); - nni_mtx_lock(&sock->s_mx); + nni_mtx_lock(&s->s_mx); return (rv); } // Save the protocol destructor. - pipe->p_proto_dtor = sock->s_pipe_ops.pipe_fini; - pipe->p_sock = sock; + pipe->p_proto_dtor = s->s_pipe_ops.pipe_fini; + pipe->p_sock = s; pipe->p_ep = ep; - nni_list_append(&sock->s_pipes, pipe); + nni_list_append(&s->s_pipes, pipe); nni_list_append(&ep->ep_pipes, pipe); // Start the initial negotiation I/O... @@ -159,7 +159,7 @@ nni_sock_pipe_add(nni_sock *sock, nni_ep *ep, nni_pipe *pipe) } nni_mtx_unlock(&ep->ep_mtx); - nni_mtx_unlock(&sock->s_mx); + nni_mtx_unlock(&s->s_mx); return (0); } @@ -305,92 +305,83 @@ nni_sock_unnotify(nni_sock *sock, nni_notify *notify) NNI_FREE_STRUCT(notify); } -static void * -nni_sock_ctor(uint32_t id) +static void +nni_sock_destroy(nni_sock *s) { - int rv; - nni_sock *sock; - - if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) { - return (NULL); - } - // s_protocol, s_peer, and s_flags undefined as yet. - sock->s_linger = 0; - sock->s_sndtimeo = -1; - sock->s_rcvtimeo = -1; - sock->s_closing = 0; - sock->s_reconn = NNI_SECOND; - sock->s_reconnmax = 0; - sock->s_rcvmaxsz = 1024 * 1024; // 1 MB by default - sock->s_id = id; - NNI_LIST_NODE_INIT(&sock->s_node); - - nni_pipe_sock_list_init(&sock->s_pipes); - - nni_ep_list_init(&sock->s_eps); - - sock->s_send_fd.sn_init = 0; - sock->s_recv_fd.sn_init = 0; - - if (((rv = nni_mtx_init(&sock->s_mx)) != 0) || - ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0)) { - goto fail; + if (s == NULL) { + return; } - rv = nni_ev_init(&sock->s_recv_ev, NNG_EV_CAN_RCV, sock); - if (rv != 0) { - goto fail; + // Close any open notification pipes. + if (s->s_recv_fd.sn_init) { + nni_plat_pipe_close(s->s_recv_fd.sn_wfd, s->s_recv_fd.sn_rfd); } - rv = nni_ev_init(&sock->s_send_ev, NNG_EV_CAN_SND, sock); - if (rv != 0) { - goto fail; + if (s->s_send_fd.sn_init) { + nni_plat_pipe_close(s->s_send_fd.sn_wfd, s->s_send_fd.sn_rfd); } - if (((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) || - ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0)) { - goto fail; + // The protocol needs to clean up its state. + if (s->s_data != NULL) { + s->s_sock_ops.sock_fini(s->s_data); } - return (sock); - -fail: - nni_ev_fini(&sock->s_send_ev); - nni_ev_fini(&sock->s_recv_ev); - nni_msgq_fini(sock->s_urq); - nni_msgq_fini(sock->s_uwq); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); - return (NULL); + nni_ev_fini(&s->s_send_ev); + nni_ev_fini(&s->s_recv_ev); + nni_msgq_fini(s->s_urq); + nni_msgq_fini(s->s_uwq); + nni_cv_fini(&s->s_cv); + nni_mtx_fini(&s->s_mx); + NNI_FREE_STRUCT(s); } -static void -nni_sock_dtor(void *ptr) +static int +nni_sock_create(nni_sock **sp, const nni_proto *proto) { - nni_sock *sock = ptr; - - // Close any open notification pipes. - if (sock->s_recv_fd.sn_init) { - nni_plat_pipe_close( - sock->s_recv_fd.sn_wfd, sock->s_recv_fd.sn_rfd); - } - if (sock->s_send_fd.sn_init) { - nni_plat_pipe_close( - sock->s_send_fd.sn_wfd, sock->s_send_fd.sn_rfd); - } + int rv; + nni_sock *s; - // The protocol needs to clean up its state. - if (sock->s_data != NULL) { - sock->s_sock_ops.sock_fini(sock->s_data); + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + s->s_linger = 0; + s->s_sndtimeo = -1; + s->s_rcvtimeo = -1; + s->s_closing = 0; + s->s_reconn = NNI_SECOND; + s->s_reconnmax = 0; + s->s_rcvmaxsz = 1024 * 1024; // 1 MB by default + s->s_id = 0; + s->s_refcnt = 0; + s->s_send_fd.sn_init = 0; + s->s_recv_fd.sn_init = 0; + s->s_self_id = proto->proto_self; + s->s_peer_id = proto->proto_peer; + s->s_flags = proto->proto_flags; + s->s_sock_ops = *proto->proto_sock_ops; + s->s_pipe_ops = *proto->proto_pipe_ops; + + NNI_ASSERT(s->s_sock_ops.sock_open != NULL); + NNI_ASSERT(s->s_sock_ops.sock_close != NULL); + + NNI_ASSERT(s->s_pipe_ops.pipe_start != NULL); + NNI_ASSERT(s->s_pipe_ops.pipe_stop != NULL); + + NNI_LIST_NODE_INIT(&s->s_node); + nni_pipe_sock_list_init(&s->s_pipes); + nni_ep_list_init(&s->s_eps); + + if (((rv = nni_mtx_init(&s->s_mx)) != 0) || + ((rv = nni_cv_init(&s->s_cv, &s->s_mx)) != 0) || + ((rv = nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s)) != 0) || + ((rv = nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s)) != 0) || + ((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) || + ((rv = nni_msgq_init(&s->s_urq, 0)) != 0) || + ((rv = s->s_sock_ops.sock_init(&s->s_data, s)) != 0)) { + nni_sock_destroy(s); + return (rv); } - - nni_ev_fini(&sock->s_send_ev); - nni_ev_fini(&sock->s_recv_ev); - nni_msgq_fini(sock->s_urq); - nni_msgq_fini(sock->s_uwq); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); + *sp = s; + return (rv); } int @@ -399,10 +390,11 @@ nni_sock_sys_init(void) int rv; NNI_LIST_INIT(&nni_sock_list, nni_sock, s_node); - if (((rv = nni_objhash_init( - &nni_socks, nni_sock_ctor, nni_sock_dtor)) != 0) || + if (((rv = nni_idhash_init(&nni_sock_hash)) != 0) || ((rv = nni_mtx_init(&nni_sock_lk)) != 0)) { nni_sock_sys_fini(); + } else { + nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1); } return (rv); } @@ -410,15 +402,14 @@ nni_sock_sys_init(void) void nni_sock_sys_fini(void) { - nni_objhash_fini(nni_socks); - nni_socks = NULL; + nni_idhash_fini(nni_sock_hash); nni_mtx_fini(&nni_sock_lk); } int nni_sock_open(nni_sock **sockp, const nni_proto *proto) { - nni_sock *sock; + nni_sock *s = NULL; int rv; uint32_t sockid; @@ -427,41 +418,23 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) return (NNG_ENOTSUP); } - if ((rv = nni_init()) != 0) { - return (rv); - } - - rv = nni_objhash_alloc(nni_socks, &sockid, (void **) &sock); - if (rv != 0) { - return (rv); - } - - // We make a copy of the protocol operations. - sock->s_self_id = proto->proto_self; - sock->s_peer_id = proto->proto_peer; - sock->s_flags = proto->proto_flags; - sock->s_sock_ops = *proto->proto_sock_ops; - sock->s_pipe_ops = *proto->proto_pipe_ops; - - NNI_ASSERT(sock->s_sock_ops.sock_open != NULL); - NNI_ASSERT(sock->s_sock_ops.sock_close != NULL); - - NNI_ASSERT(sock->s_pipe_ops.pipe_start != NULL); - NNI_ASSERT(sock->s_pipe_ops.pipe_stop != NULL); - - if ((rv = sock->s_sock_ops.sock_init(&sock->s_data, sock)) != 0) { - nni_objhash_unref(nni_socks, sockid); + if (((rv = nni_init()) != 0) || + ((rv = nni_sock_create(&s, proto)) != 0)) { + nni_sock_destroy(s); return (rv); } - sock->s_sock_ops.sock_open(sock->s_data); - nni_mtx_lock(&nni_sock_lk); - nni_list_append(&nni_sock_list, sock); + if ((rv = nni_idhash_alloc(nni_sock_hash, &s->s_id, s)) != 0) { + nni_sock_destroy(s); + } else { + nni_list_append(&nni_sock_list, s); + s->s_sock_ops.sock_open(s->s_data); + *sockp = s; + } nni_mtx_unlock(&nni_sock_lk); - *sockp = sock; - return (0); + return (rv); } // nni_sock_shutdown shuts down the socket; after this point no further @@ -591,37 +564,39 @@ nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) // after this function is called, as the pointer may reference invalid // memory or other objects. void -nni_sock_close(nni_sock *sock) +nni_sock_close(nni_sock *s) { // Shutdown everything if not already done. This operation // is idempotent. - nni_sock_shutdown(sock); + nni_sock_shutdown(s); - nni_mtx_lock(&sock->s_mx); - if (sock->s_closed) { - nni_mtx_unlock(&sock->s_mx); + nni_mtx_lock(&nni_sock_lk); + if (s->s_closed) { + // Some other thread called close. All we need to do is + // drop our reference count. + nni_mtx_unlock(&nni_sock_lk); + nni_sock_rele(s); return; } - sock->s_closed = 1; - nni_mtx_unlock(&sock->s_mx); + s->s_closed = 1; + nni_idhash_remove(nni_sock_hash, s->s_id); - nni_mtx_lock(&nni_sock_lk); - nni_list_node_remove(&sock->s_node); - nni_mtx_unlock(&nni_sock_lk); + // We might have been removed from the list already, e.g. by + // nni_sock_closeall. This is idempotent. + nni_list_node_remove(&s->s_node); - // At this point nothing else should be referencing us. - // As with UNIX close, it is a gross error for the caller - // to have concurrent threads using this. We've taken care to - // ensure that any active consumers have been stopped, but if - // user code attempts to utilize the socket *after* this point, - // the results may be tragic. + nni_mtx_unlock(&nni_sock_lk); - // Unreference twice. First drops the reference our caller - // acquired to start the open, and the second (blocking) one - // is the reference created for us at socket creation. + // Wait for all other references to drop. Note that we + // have a reference already (from our caller). + nni_mtx_lock(&s->s_mx); + while ((s->s_refcnt > 1) || (!nni_list_empty(&s->s_pipes)) || + (!nni_list_empty(&s->s_eps))) { + nni_cv_wait(&s->s_cv); + } + nni_mtx_unlock(&s->s_mx); - nni_objhash_unref(nni_socks, sock->s_id); - nni_objhash_unref_wait(nni_socks, sock->s_id); + nni_sock_destroy(s); } void @@ -636,13 +611,12 @@ nni_sock_closeall(void) nni_mtx_unlock(&nni_sock_lk); return; } - id = s->s_id; + // Bump the reference count. The close call below will + // drop it. + s->s_refcnt++; nni_list_node_remove(&s->s_node); nni_mtx_unlock(&nni_sock_lk); - - if (nni_sock_find(&s, id) == 0) { - nni_sock_close(s); - } + nni_sock_close(s); } } |
