From 9feb54e9c7ab116ba566086a76604338f86e3bc3 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 9 Aug 2017 19:05:28 -0700 Subject: fixes #47 compat_reqttls fails sometimes fixes #23 Restore the old idhash logic for sockets --- src/CMakeLists.txt | 2 - src/core/nng_impl.h | 4 +- src/core/objhash.c | 451 ---------------------------------------------------- src/core/objhash.h | 58 ------- src/core/socket.c | 314 +++++++++++++++++------------------- src/core/socket.h | 22 ++- src/nng_compat.c | 1 + 7 files changed, 156 insertions(+), 696 deletions(-) delete mode 100644 src/core/objhash.c delete mode 100644 src/core/objhash.h (limited to 'src') diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 31abc042..ad6c025e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -53,8 +53,6 @@ set (NNG_SOURCES core/msgqueue.c core/msgqueue.h core/nng_impl.h - core/objhash.c - core/objhash.h core/options.c core/options.h core/panic.c diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 8b76b036..8510dfb5 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -1,5 +1,6 @@ // -// Copyright 2016 Garrett D'Amore +// Copyright 2017 Garrett D'Amore +// Copyright 2017 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -33,7 +34,6 @@ #include "core/list.h" #include "core/message.h" #include "core/msgqueue.h" -#include "core/objhash.h" #include "core/options.h" #include "core/panic.h" #include "core/protocol.h" diff --git a/src/core/objhash.c b/src/core/objhash.c deleted file mode 100644 index acdcd2c3..00000000 --- a/src/core/objhash.c +++ /dev/null @@ -1,451 +0,0 @@ -// -// Copyright 2016 Garrett D'Amore -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/objhash.h" -#include "core/nng_impl.h" - -#include - -// The details of the nni_objhash are "private". -struct nni_objhash { - size_t oh_cap; - size_t oh_count; - size_t oh_load; - size_t oh_minload; // considers placeholders - size_t oh_maxload; - uint32_t oh_minval; - uint32_t oh_maxval; - uint32_t oh_dynval; - nni_mtx oh_lock; - nni_cv oh_cv; - nni_objhash_node *oh_nodes; - nni_objhash_ctor oh_ctor; - nni_objhash_dtor oh_dtor; -}; - -struct nni_objhash_node { - uint32_t on_id; // the key - uint32_t on_skips; // indicates - uint32_t on_refcnt; // reference count - void * on_val; // pointer to user data -}; - -int -nni_objhash_init( - nni_objhash **ohp, nni_objhash_ctor ctor, nni_objhash_dtor dtor) -{ - nni_objhash *oh; - int rv; - - if ((ctor == NULL) || (dtor == NULL)) { - return (NNG_EINVAL); - } - - if ((oh = NNI_ALLOC_STRUCT(oh)) == NULL) { - return (NNG_ENOMEM); - } - - if ((rv = nni_mtx_init(&oh->oh_lock)) != 0) { - NNI_FREE_STRUCT(oh); - return (rv); - } - - if ((rv = nni_cv_init(&oh->oh_cv, &oh->oh_lock)) != 0) { - nni_mtx_fini(&oh->oh_lock); - NNI_FREE_STRUCT(oh); - return (rv); - } - - oh->oh_nodes = NULL; - oh->oh_count = 0; - oh->oh_load = 0; - oh->oh_cap = 0; - oh->oh_maxload = 0; - oh->oh_minload = 0; // never shrink below this - oh->oh_minval = 1; - oh->oh_maxval = 0x7fffffff; - oh->oh_dynval = - nni_random() % (oh->oh_maxval - oh->oh_minval) + oh->oh_minval; - oh->oh_ctor = ctor; - oh->oh_dtor = dtor; - *ohp = oh; - - return (0); -} - -void -nni_objhash_fini(nni_objhash *oh) -{ - if (oh == NULL) { - return; - } - if (oh->oh_nodes != NULL) { - nni_free(oh->oh_nodes, oh->oh_cap * sizeof(nni_objhash_node)); - oh->oh_nodes = NULL; - oh->oh_cap = oh->oh_count = 0; - oh->oh_load = oh->oh_minload = oh->oh_maxload = 0; - } - nni_cv_fini(&oh->oh_cv); - nni_mtx_fini(&oh->oh_lock); - NNI_FREE_STRUCT(oh); -} - -// Inspired by Python dict implementation. This probe will visit every -// cell. We always hash consecutively assigned IDs. -#define NNI_OBJHASH_NEXTPROBE(h, j) ((((j) *5) + 1) & (h->oh_cap - 1)) - -// nni_objhash_find_node finds the object hash node associated with a given id. -// The object hash lock must be held by the caller. -static nni_objhash_node * -nni_objhash_find_node(nni_objhash *oh, uint32_t id) -{ - uint32_t index; - nni_objhash_node *node; - - if (oh->oh_count == 0) { - return (NULL); - } - - index = id & (oh->oh_cap - 1); - - for (;;) { - node = &oh->oh_nodes[index]; - - if ((node->on_val == NULL) && (node->on_skips == 0)) { - return (NULL); - } - if (node->on_id == id) { - return (node); - } - index = NNI_OBJHASH_NEXTPROBE(oh, index); - } -} - -// nni_objhash_find looks up the object, and bumps the reference on it. -// The caller should drop the reference when done by calling nni_objhash_unref. -int -nni_objhash_find(nni_objhash *oh, uint32_t id, void **valp) -{ - nni_objhash_node *node; - int rv; - - nni_mtx_lock(&oh->oh_lock); - node = nni_objhash_find_node(oh, id); - - if ((node != NULL) && (node->on_val != NULL)) { - if (valp != NULL) { - *valp = node->on_val; - } - node->on_refcnt++; - rv = 0; - } else { - rv = NNG_ENOENT; - } - nni_mtx_unlock(&oh->oh_lock); - return (rv); -} - -// Resize the object hash. This is called internally with the lock -// for the object hash held. Grow indicates that this is being called -// from a function that intends to add data, so extra space is needed. -static int -nni_objhash_resize(nni_objhash *oh, int grow) -{ - size_t newsize; - size_t oldsize; - nni_objhash_node *newnodes; - nni_objhash_node *oldnodes; - uint32_t i; - - if ((!grow) && (oh->oh_count == 0) && (oh->oh_cap != 0)) { - // Table is empty, and we are unrefing. Lets reclaim the - // space. Note that this means that allocations which - // fluctuate between one and zero are going to bang on the - // allocator a bit. Since such cases should not be very - // performance sensitive, this is probably okay. - nni_free(oh->oh_nodes, oh->oh_cap * sizeof(nni_objhash_node)); - oh->oh_cap = 0; - oh->oh_nodes = NULL; - oh->oh_minload = 0; - oh->oh_maxload = 0; - return (0); - } - - if ((oh->oh_load < oh->oh_maxload) && - (oh->oh_load >= oh->oh_minload)) { - // No resize needed. - return (0); - } - - oldsize = oh->oh_cap; - newsize = oh->oh_cap; - - newsize = 8; - while (newsize < (oh->oh_count * 2)) { - newsize *= 2; - } - - oldnodes = oh->oh_nodes; - newnodes = nni_alloc(sizeof(nni_objhash_node) * newsize); - if (newnodes == NULL) { - return (NNG_ENOMEM); - } - memset(newnodes, 0, sizeof(nni_objhash_node) * newsize); - - oh->oh_nodes = newnodes; - oh->oh_cap = newsize; - if (newsize > 8) { - oh->oh_minload = newsize / 8; - oh->oh_maxload = newsize * 2 / 3; - } else { - oh->oh_minload = 0; - oh->oh_maxload = 5; - } - for (i = 0; i < oldsize; i++) { - uint32_t index; - if (oldnodes[i].on_val == NULL) { - continue; - } - index = oldnodes[i].on_id & (newsize - 1); - for (;;) { - if (newnodes[index].on_val == NULL) { - oh->oh_load++; - newnodes[index].on_val = oldnodes[i].on_val; - newnodes[index].on_id = oldnodes[i].on_id; - newnodes[index].on_refcnt = - oldnodes[i].on_refcnt; - break; - } - newnodes[index].on_skips++; - index = NNI_OBJHASH_NEXTPROBE(oh, index); - } - } - if (oldsize != 0) { - nni_free(oldnodes, sizeof(nni_objhash_node) * oldsize); - } - return (0); -} - -void -nni_objhash_unref(nni_objhash *oh, uint32_t id) -{ - void * val; - uint32_t index; - nni_objhash_node *node; - nni_objhash_dtor dtor; - - nni_mtx_lock(&oh->oh_lock); - - dtor = oh->oh_dtor; - - node = nni_objhash_find_node(oh, id); - NNI_ASSERT(node != NULL); - val = node->on_val; - - NNI_ASSERT(node->on_refcnt > 0); - NNI_ASSERT(node->on_refcnt < 1000000); // reasonable limit, debug only - node->on_refcnt--; - - // If we have further references, we are done, except that if we have - // only one remaining reference, we might want to wake up another - // thread blocked in nni_objhash_unref_wait. - if (node->on_refcnt != 0) { - if (node->on_refcnt == 1) { - nni_cv_wake(&oh->oh_cv); - } - nni_mtx_unlock(&oh->oh_lock); - return; - } - - NNI_ASSERT(node->on_refcnt == 0); - index = id & (oh->oh_cap - 1); - for (;;) { - node = &oh->oh_nodes[index]; - if (node->on_id == id) { - break; - } - - NNI_ASSERT(node->on_skips != 0); - node->on_skips--; - if ((node->on_val == NULL) && (node->on_skips == 0)) { - oh->oh_load--; - } - index = NNI_OBJHASH_NEXTPROBE(oh, index); - } - - NNI_ASSERT(node->on_val != NULL); - NNI_ASSERT(node->on_refcnt == 0); - NNI_ASSERT(node->on_id == id); - - node->on_val = NULL; - oh->oh_count--; - if (node->on_skips == 0) { - oh->oh_load--; - } - // Reclaim the buffer if we want, but preserve the limits. - nni_objhash_resize(oh, 0); - - nni_mtx_unlock(&oh->oh_lock); - - // Now run the destructor. - dtor(val); -} - -void -nni_objhash_unref_wait(nni_objhash *oh, uint32_t id) -{ - void * val; - uint32_t index; - nni_objhash_node *node; - nni_objhash_dtor dtor; - - nni_mtx_lock(&oh->oh_lock); - - dtor = oh->oh_dtor; - - node = nni_objhash_find_node(oh, id); - NNI_ASSERT(node != NULL); - NNI_ASSERT(node->on_refcnt > 0); - val = node->on_val; - - while (node->on_refcnt != 1) { - nni_cv_wait(&oh->oh_cv); - // If the table resizes, it can invalidate our old node. - node = nni_objhash_find_node(oh, id); - } - node->on_refcnt--; - NNI_ASSERT(node->on_refcnt == 0); - - index = id & (oh->oh_cap - 1); - for (;;) { - node = &oh->oh_nodes[index]; - if (node->on_id == id) { - break; - } - - NNI_ASSERT(node->on_skips != 0); - node->on_skips--; - if ((node->on_val == NULL) && (node->on_skips == 0)) { - oh->oh_load--; - } - index = NNI_OBJHASH_NEXTPROBE(oh, index); - } - - NNI_ASSERT(node->on_val != NULL); - NNI_ASSERT(node->on_refcnt == 0); - NNI_ASSERT(node->on_id == id); - - node->on_val = NULL; - oh->oh_count--; - if (node->on_skips == 0) { - oh->oh_load--; - } - // Reclaim the buffer if we want, but preserve the limits. - nni_objhash_resize(oh, 0); - - nni_mtx_unlock(&oh->oh_lock); - - // Now run the destructor. - dtor(val); -} - -// Allocate a new object hash entry. Note that this will execute the -// constructor with the object hash lock held. Consequently, code that -// runs the constructor must not run for long periods of time, since that -// can block all other uses of the object hash. -int -nni_objhash_alloc(nni_objhash *oh, uint32_t *idp, void **valp) -{ - uint32_t id; - uint32_t index; - nni_objhash_node *node; - - nni_mtx_lock(&oh->oh_lock); - - if (oh->oh_count > (oh->oh_maxval - oh->oh_minval)) { - // Really more like ENOSPC.. the table is filled to max. - nni_mtx_unlock(&oh->oh_lock); - return (NNG_ENOMEM); - } - - nni_objhash_resize(oh, 1); - - for (;;) { - id = oh->oh_dynval; - oh->oh_dynval++; - if ((oh->oh_dynval > oh->oh_maxval) || - (oh->oh_dynval < oh->oh_minval)) { - oh->oh_dynval = oh->oh_minval; - } - - if (nni_objhash_find_node(oh, id) == NULL) { - // We can use this ID, great! - break; - } - } - - // We know the ID we're going to use, but we have to walk again, - // because we need to note whether we had to skip (probe), and mark - // them so they don't get nuked along the way. - // check to see if anything is located there. - index = id & (oh->oh_cap - 1); - for (;;) { - node = &oh->oh_nodes[index]; - if (node->on_val == NULL) { - break; - } - NNI_ASSERT(node->on_id != id); - node->on_skips++; - index = NNI_OBJHASH_NEXTPROBE(oh, index); - } - - NNI_ASSERT(node->on_refcnt == 0); - node->on_id = id; - node->on_refcnt++; - - node->on_val = oh->oh_ctor(id); - - if (node->on_val == NULL) { - // Constructor failed; walk *again* to undo the skip - // increments. - node->on_refcnt--; - index = id & (oh->oh_cap - 1); - for (;;) { - node = &oh->oh_nodes[index]; - if (node->on_val == NULL) { - NNI_ASSERT(node->on_id == id); - break; - } - NNI_ASSERT(node->on_skips != 0); - node->on_skips--; - index = NNI_OBJHASH_NEXTPROBE(oh, index); - } - - nni_mtx_unlock(&oh->oh_lock); - return (NNG_ENOMEM); // no other return from ctor - } - - oh->oh_count++; - if (node->on_skips == 0) { - oh->oh_load++; - } - *valp = node->on_val; - *idp = id; - - NNI_ASSERT(node->on_refcnt == 1); - - nni_mtx_unlock(&oh->oh_lock); - return (0); -} - -size_t -nni_objhash_count(nni_objhash *oh) -{ - return (oh->oh_count); -} diff --git a/src/core/objhash.h b/src/core/objhash.h deleted file mode 100644 index 97f666b8..00000000 --- a/src/core/objhash.h +++ /dev/null @@ -1,58 +0,0 @@ -// -// Copyright 2016 Garrett D'Amore -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#ifndef CORE_OBJHASH_H -#define CORE_OBJHASH_H - -#include "core/nng_impl.h" - -// Object Hash. This is a generic object manager, which lets us deal -// with reference counting of objects, and provides a unique ID for -// objects that will not generally be reused. Object Hash manages it's -// own locking. Object IDs start from a random positive value, and -// generally increment. The ID assigned to an object will always be -// positive. -// -// Similar to our linked lists, consumers must supply a node structure -// in their object. The implementation uses this for reference counting -// and so forth. -// -// In terms of implementation, the underlying hash uses open addressing, -// combined with an improved probe (taken from Python) to avoid collisions. -// Our algorithm just uses the low order bits, and we use table sizes that -// are powers of two to make the modulo dirt cheap. -// - -typedef struct nni_objhash nni_objhash; -typedef struct nni_objhash_node nni_objhash_node; - -// Object constructor function. This is expected to allocate an object. -// It takes the generated object ID as an argument, which it can store on -// the object itself. It should return NULL if resources cannot be allocated; -// there are no other valid reasons for this to fail. -typedef void *(*nni_objhash_ctor)(uint32_t); - -// Object destructor function. This should release any resources and perform -// any other deinitialization. -typedef void (*nni_objhash_dtor)(void *); - -// nni_objhash_init initializes the object hash; the constructor and and -// destructor functions are supplied. -extern int nni_objhash_init( - nni_objhash **, nni_objhash_ctor, nni_objhash_dtor); - -extern void nni_objhash_fini(nni_objhash *); - -extern int nni_objhash_find(nni_objhash *, uint32_t, void **); -extern void nni_objhash_unref(nni_objhash *, uint32_t); -extern void nni_objhash_unref_wait(nni_objhash *, uint32_t); -extern int nni_objhash_alloc(nni_objhash *, uint32_t *, void **); -extern size_t nni_objhash_count(nni_objhash *); - -#endif // CORE_OBJHASH_H diff --git a/src/core/socket.c b/src/core/socket.c index 240af252..46ed2100 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -14,9 +14,9 @@ // Socket implementation. -static nni_objhash *nni_socks = NULL; -static nni_list nni_sock_list; -static nni_mtx nni_sock_lk; +static nni_list nni_sock_list; +static nni_idhash *nni_sock_hash; +static nni_mtx nni_sock_lk; uint32_t nni_sock_id(nni_sock *s) @@ -42,52 +42,53 @@ int nni_sock_find(nni_sock **sockp, uint32_t id) { int rv; - nni_sock *sock; + nni_sock *s; if ((rv = nni_init()) != 0) { return (rv); } - if ((rv = nni_objhash_find(nni_socks, id, (void **) &sock)) != 0) { - return (rv); - } - nni_mtx_lock(&sock->s_mx); - if ((sock->s_closed) || (sock->s_data == NULL)) { - nni_objhash_unref(nni_socks, id); - nni_mtx_unlock(&sock->s_mx); - return (NNG_ECLOSED); - } - nni_mtx_unlock(&sock->s_mx); - - if (sockp != NULL) { - *sockp = sock; + nni_mtx_lock(&nni_sock_lk); + if ((rv = nni_idhash_find(nni_sock_hash, id, (void **) &s)) == 0) { + if (s->s_closed) { + rv = NNG_ECLOSED; + } else { + s->s_refcnt++; + *sockp = s; + } } + nni_mtx_unlock(&nni_sock_lk); - return (0); + return (rv); } void -nni_sock_rele(nni_sock *sock) +nni_sock_rele(nni_sock *s) { - nni_objhash_unref(nni_socks, sock->s_id); + nni_mtx_lock(&s->s_mx); + s->s_refcnt--; + if (s->s_closing) { + nni_cv_wake(&s->s_cv); + } + nni_mtx_unlock(&s->s_mx); } static int nni_sock_pipe_start(nni_pipe *pipe) { - nni_sock *sock = pipe->p_sock; + nni_sock *s = pipe->p_sock; void * pdata = nni_pipe_get_proto_data(pipe); int rv; - NNI_ASSERT(sock != NULL); - if (sock->s_closing) { + NNI_ASSERT(s != NULL); + if (s->s_closing) { // We're closing, bail out. return (NNG_ECLOSED); } - if (nni_pipe_peer(pipe) != sock->s_peer_id.p_id) { + if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) { // Peer protocol mismatch. return (NNG_EPROTO); } - if ((rv = sock->s_pipe_ops.pipe_start(pdata)) != 0) { + if ((rv = s->s_pipe_ops.pipe_start(pdata)) != 0) { // Protocol rejection for other reasons. // E.g. pair and already have active connected partner. return (rv); @@ -113,39 +114,38 @@ nni_sock_pipe_start_cb(void *arg) } int -nni_sock_pipe_add(nni_sock *sock, nni_ep *ep, nni_pipe *pipe) +nni_sock_pipe_add(nni_sock *s, nni_ep *ep, nni_pipe *pipe) { int rv; // Initialize protocol pipe data. - nni_mtx_lock(&sock->s_mx); + nni_mtx_lock(&s->s_mx); nni_mtx_lock(&ep->ep_mtx); - if ((sock->s_closing) || (ep->ep_closed)) { + if ((s->s_closing) || (ep->ep_closed)) { nni_mtx_unlock(&ep->ep_mtx); - nni_mtx_unlock(&sock->s_mx); + nni_mtx_unlock(&s->s_mx); return (NNG_ECLOSED); } rv = nni_aio_init(&pipe->p_start_aio, nni_sock_pipe_start_cb, pipe); if (rv != 0) { nni_mtx_unlock(&ep->ep_mtx); - nni_mtx_unlock(&sock->s_mx); + nni_mtx_unlock(&s->s_mx); return (rv); } - rv = sock->s_pipe_ops.pipe_init( - &pipe->p_proto_data, pipe, sock->s_data); + rv = s->s_pipe_ops.pipe_init(&pipe->p_proto_data, pipe, s->s_data); if (rv != 0) { nni_mtx_unlock(&ep->ep_mtx); - nni_mtx_lock(&sock->s_mx); + nni_mtx_lock(&s->s_mx); return (rv); } // Save the protocol destructor. - pipe->p_proto_dtor = sock->s_pipe_ops.pipe_fini; - pipe->p_sock = sock; + pipe->p_proto_dtor = s->s_pipe_ops.pipe_fini; + pipe->p_sock = s; pipe->p_ep = ep; - nni_list_append(&sock->s_pipes, pipe); + nni_list_append(&s->s_pipes, pipe); nni_list_append(&ep->ep_pipes, pipe); // Start the initial negotiation I/O... @@ -159,7 +159,7 @@ nni_sock_pipe_add(nni_sock *sock, nni_ep *ep, nni_pipe *pipe) } nni_mtx_unlock(&ep->ep_mtx); - nni_mtx_unlock(&sock->s_mx); + nni_mtx_unlock(&s->s_mx); return (0); } @@ -305,92 +305,83 @@ nni_sock_unnotify(nni_sock *sock, nni_notify *notify) NNI_FREE_STRUCT(notify); } -static void * -nni_sock_ctor(uint32_t id) +static void +nni_sock_destroy(nni_sock *s) { - int rv; - nni_sock *sock; - - if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) { - return (NULL); - } - // s_protocol, s_peer, and s_flags undefined as yet. - sock->s_linger = 0; - sock->s_sndtimeo = -1; - sock->s_rcvtimeo = -1; - sock->s_closing = 0; - sock->s_reconn = NNI_SECOND; - sock->s_reconnmax = 0; - sock->s_rcvmaxsz = 1024 * 1024; // 1 MB by default - sock->s_id = id; - NNI_LIST_NODE_INIT(&sock->s_node); - - nni_pipe_sock_list_init(&sock->s_pipes); - - nni_ep_list_init(&sock->s_eps); - - sock->s_send_fd.sn_init = 0; - sock->s_recv_fd.sn_init = 0; - - if (((rv = nni_mtx_init(&sock->s_mx)) != 0) || - ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0)) { - goto fail; + if (s == NULL) { + return; } - rv = nni_ev_init(&sock->s_recv_ev, NNG_EV_CAN_RCV, sock); - if (rv != 0) { - goto fail; + // Close any open notification pipes. + if (s->s_recv_fd.sn_init) { + nni_plat_pipe_close(s->s_recv_fd.sn_wfd, s->s_recv_fd.sn_rfd); } - rv = nni_ev_init(&sock->s_send_ev, NNG_EV_CAN_SND, sock); - if (rv != 0) { - goto fail; + if (s->s_send_fd.sn_init) { + nni_plat_pipe_close(s->s_send_fd.sn_wfd, s->s_send_fd.sn_rfd); } - if (((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) || - ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0)) { - goto fail; + // The protocol needs to clean up its state. + if (s->s_data != NULL) { + s->s_sock_ops.sock_fini(s->s_data); } - return (sock); - -fail: - nni_ev_fini(&sock->s_send_ev); - nni_ev_fini(&sock->s_recv_ev); - nni_msgq_fini(sock->s_urq); - nni_msgq_fini(sock->s_uwq); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); - return (NULL); + nni_ev_fini(&s->s_send_ev); + nni_ev_fini(&s->s_recv_ev); + nni_msgq_fini(s->s_urq); + nni_msgq_fini(s->s_uwq); + nni_cv_fini(&s->s_cv); + nni_mtx_fini(&s->s_mx); + NNI_FREE_STRUCT(s); } -static void -nni_sock_dtor(void *ptr) +static int +nni_sock_create(nni_sock **sp, const nni_proto *proto) { - nni_sock *sock = ptr; - - // Close any open notification pipes. - if (sock->s_recv_fd.sn_init) { - nni_plat_pipe_close( - sock->s_recv_fd.sn_wfd, sock->s_recv_fd.sn_rfd); - } - if (sock->s_send_fd.sn_init) { - nni_plat_pipe_close( - sock->s_send_fd.sn_wfd, sock->s_send_fd.sn_rfd); - } + int rv; + nni_sock *s; - // The protocol needs to clean up its state. - if (sock->s_data != NULL) { - sock->s_sock_ops.sock_fini(sock->s_data); + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + s->s_linger = 0; + s->s_sndtimeo = -1; + s->s_rcvtimeo = -1; + s->s_closing = 0; + s->s_reconn = NNI_SECOND; + s->s_reconnmax = 0; + s->s_rcvmaxsz = 1024 * 1024; // 1 MB by default + s->s_id = 0; + s->s_refcnt = 0; + s->s_send_fd.sn_init = 0; + s->s_recv_fd.sn_init = 0; + s->s_self_id = proto->proto_self; + s->s_peer_id = proto->proto_peer; + s->s_flags = proto->proto_flags; + s->s_sock_ops = *proto->proto_sock_ops; + s->s_pipe_ops = *proto->proto_pipe_ops; + + NNI_ASSERT(s->s_sock_ops.sock_open != NULL); + NNI_ASSERT(s->s_sock_ops.sock_close != NULL); + + NNI_ASSERT(s->s_pipe_ops.pipe_start != NULL); + NNI_ASSERT(s->s_pipe_ops.pipe_stop != NULL); + + NNI_LIST_NODE_INIT(&s->s_node); + nni_pipe_sock_list_init(&s->s_pipes); + nni_ep_list_init(&s->s_eps); + + if (((rv = nni_mtx_init(&s->s_mx)) != 0) || + ((rv = nni_cv_init(&s->s_cv, &s->s_mx)) != 0) || + ((rv = nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s)) != 0) || + ((rv = nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s)) != 0) || + ((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) || + ((rv = nni_msgq_init(&s->s_urq, 0)) != 0) || + ((rv = s->s_sock_ops.sock_init(&s->s_data, s)) != 0)) { + nni_sock_destroy(s); + return (rv); } - - nni_ev_fini(&sock->s_send_ev); - nni_ev_fini(&sock->s_recv_ev); - nni_msgq_fini(sock->s_urq); - nni_msgq_fini(sock->s_uwq); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); + *sp = s; + return (rv); } int @@ -399,10 +390,11 @@ nni_sock_sys_init(void) int rv; NNI_LIST_INIT(&nni_sock_list, nni_sock, s_node); - if (((rv = nni_objhash_init( - &nni_socks, nni_sock_ctor, nni_sock_dtor)) != 0) || + if (((rv = nni_idhash_init(&nni_sock_hash)) != 0) || ((rv = nni_mtx_init(&nni_sock_lk)) != 0)) { nni_sock_sys_fini(); + } else { + nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1); } return (rv); } @@ -410,15 +402,14 @@ nni_sock_sys_init(void) void nni_sock_sys_fini(void) { - nni_objhash_fini(nni_socks); - nni_socks = NULL; + nni_idhash_fini(nni_sock_hash); nni_mtx_fini(&nni_sock_lk); } int nni_sock_open(nni_sock **sockp, const nni_proto *proto) { - nni_sock *sock; + nni_sock *s = NULL; int rv; uint32_t sockid; @@ -427,41 +418,23 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) return (NNG_ENOTSUP); } - if ((rv = nni_init()) != 0) { - return (rv); - } - - rv = nni_objhash_alloc(nni_socks, &sockid, (void **) &sock); - if (rv != 0) { - return (rv); - } - - // We make a copy of the protocol operations. - sock->s_self_id = proto->proto_self; - sock->s_peer_id = proto->proto_peer; - sock->s_flags = proto->proto_flags; - sock->s_sock_ops = *proto->proto_sock_ops; - sock->s_pipe_ops = *proto->proto_pipe_ops; - - NNI_ASSERT(sock->s_sock_ops.sock_open != NULL); - NNI_ASSERT(sock->s_sock_ops.sock_close != NULL); - - NNI_ASSERT(sock->s_pipe_ops.pipe_start != NULL); - NNI_ASSERT(sock->s_pipe_ops.pipe_stop != NULL); - - if ((rv = sock->s_sock_ops.sock_init(&sock->s_data, sock)) != 0) { - nni_objhash_unref(nni_socks, sockid); + if (((rv = nni_init()) != 0) || + ((rv = nni_sock_create(&s, proto)) != 0)) { + nni_sock_destroy(s); return (rv); } - sock->s_sock_ops.sock_open(sock->s_data); - nni_mtx_lock(&nni_sock_lk); - nni_list_append(&nni_sock_list, sock); + if ((rv = nni_idhash_alloc(nni_sock_hash, &s->s_id, s)) != 0) { + nni_sock_destroy(s); + } else { + nni_list_append(&nni_sock_list, s); + s->s_sock_ops.sock_open(s->s_data); + *sockp = s; + } nni_mtx_unlock(&nni_sock_lk); - *sockp = sock; - return (0); + return (rv); } // nni_sock_shutdown shuts down the socket; after this point no further @@ -591,37 +564,39 @@ nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) // after this function is called, as the pointer may reference invalid // memory or other objects. void -nni_sock_close(nni_sock *sock) +nni_sock_close(nni_sock *s) { // Shutdown everything if not already done. This operation // is idempotent. - nni_sock_shutdown(sock); + nni_sock_shutdown(s); - nni_mtx_lock(&sock->s_mx); - if (sock->s_closed) { - nni_mtx_unlock(&sock->s_mx); + nni_mtx_lock(&nni_sock_lk); + if (s->s_closed) { + // Some other thread called close. All we need to do is + // drop our reference count. + nni_mtx_unlock(&nni_sock_lk); + nni_sock_rele(s); return; } - sock->s_closed = 1; - nni_mtx_unlock(&sock->s_mx); + s->s_closed = 1; + nni_idhash_remove(nni_sock_hash, s->s_id); - nni_mtx_lock(&nni_sock_lk); - nni_list_node_remove(&sock->s_node); - nni_mtx_unlock(&nni_sock_lk); + // We might have been removed from the list already, e.g. by + // nni_sock_closeall. This is idempotent. + nni_list_node_remove(&s->s_node); - // At this point nothing else should be referencing us. - // As with UNIX close, it is a gross error for the caller - // to have concurrent threads using this. We've taken care to - // ensure that any active consumers have been stopped, but if - // user code attempts to utilize the socket *after* this point, - // the results may be tragic. + nni_mtx_unlock(&nni_sock_lk); - // Unreference twice. First drops the reference our caller - // acquired to start the open, and the second (blocking) one - // is the reference created for us at socket creation. + // Wait for all other references to drop. Note that we + // have a reference already (from our caller). + nni_mtx_lock(&s->s_mx); + while ((s->s_refcnt > 1) || (!nni_list_empty(&s->s_pipes)) || + (!nni_list_empty(&s->s_eps))) { + nni_cv_wait(&s->s_cv); + } + nni_mtx_unlock(&s->s_mx); - nni_objhash_unref(nni_socks, sock->s_id); - nni_objhash_unref_wait(nni_socks, sock->s_id); + nni_sock_destroy(s); } void @@ -636,13 +611,12 @@ nni_sock_closeall(void) nni_mtx_unlock(&nni_sock_lk); return; } - id = s->s_id; + // Bump the reference count. The close call below will + // drop it. + s->s_refcnt++; nni_list_node_remove(&s->s_node); nni_mtx_unlock(&nni_sock_lk); - - if (nni_sock_find(&s, id) == 0) { - nni_sock_close(s); - } + nni_sock_close(s); } } diff --git a/src/core/socket.h b/src/core/socket.h index 6824b04b..76ebca12 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -15,41 +15,39 @@ // OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR // TRANSPORTS. struct nni_socket { - nni_mtx s_mx; - nni_cv s_cv; + nni_list_node s_node; + nni_mtx s_mx; + nni_cv s_cv; + nni_cv s_close_cv; uint32_t s_id; + uint32_t s_flags; + unsigned s_refcnt; // protected by global lock + void * s_data; // Protocol private nni_msgq *s_uwq; // Upper write queue nni_msgq *s_urq; // Upper read queue - nni_list_node s_node; - nni_proto_id s_self_id; nni_proto_id s_peer_id; - uint32_t s_flags; - nni_proto_pipe_ops s_pipe_ops; nni_proto_sock_ops s_sock_ops; - void *s_data; // Protocol private - // XXX: options nni_duration s_linger; // linger time nni_duration s_sndtimeo; // send timeout nni_duration s_rcvtimeo; // receive timeout nni_duration s_reconn; // reconnect time nni_duration s_reconnmax; // max reconnect time + size_t s_rcvmaxsz; // maximum receive size nni_list s_eps; // active endpoints nni_list s_pipes; // active pipes - size_t s_rcvmaxsz; // maximum receive size - int s_ep_pend; // EP dial/listen in progress int s_closing; // Socket is closing - int s_closed; // Socket closed + int s_closed; // Socket closed, protected by global lock int s_besteffort; // Best effort mode delivery int s_senderr; // Protocol state machine use int s_recverr; // Protocol state machine use @@ -59,8 +57,6 @@ struct nni_socket { nni_notifyfd s_send_fd; nni_notifyfd s_recv_fd; - - uint32_t s_nextid; // Next Pipe ID. }; extern int nni_sock_sys_init(void); diff --git a/src/nng_compat.c b/src/nng_compat.c index 34fc6553..e03ff42d 100644 --- a/src/nng_compat.c +++ b/src/nng_compat.c @@ -41,6 +41,7 @@ static struct { { NNG_EMSGSIZE, EMSGSIZE }, { NNG_ECONNABORTED, ECONNABORTED }, { NNG_ECONNRESET, ECONNRESET }, + { NNG_ECANCELED, EBADF }, { 0, 0 }, // clang-format on }; -- cgit v1.2.3-70-g09d2