aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/init.c8
-rw-r--r--src/core/nng_impl.h1
-rw-r--r--src/core/objhash.c119
-rw-r--r--src/core/objhash.h2
-rw-r--r--src/core/socket.c300
-rw-r--r--src/core/socket.h6
6 files changed, 256 insertions, 180 deletions
diff --git a/src/core/init.c b/src/core/init.c
index 8dbe50db..4d73e897 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -38,7 +38,14 @@ nni_init_helper(void)
nni_taskq_sys_fini();
return (rv);
}
+ if ((rv = nni_sock_sys_init()) != 0) {
+ nni_random_sys_fini();
+ nni_timer_sys_fini();
+ nni_taskq_sys_fini();
+ return (rv);
+ }
if ((rv = nni_mtx_init(&nni_idlock_x)) != 0) {
+ nni_sock_sys_fini();
nni_random_sys_fini();
nni_timer_sys_fini();
nni_taskq_sys_fini();
@@ -48,6 +55,7 @@ nni_init_helper(void)
((rv = nni_idhash_init(&nni_pipes_x)) != 0) ||
((rv = nni_idhash_init(&nni_sockets_x)) != 0)) {
nni_mtx_fini(&nni_idlock_x);
+ nni_sock_sys_fini();
nni_random_sys_fini();
nni_timer_sys_fini();
nni_taskq_sys_fini();
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index 2a72766c..291b5e28 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -33,6 +33,7 @@
#include "core/list.h"
#include "core/message.h"
#include "core/msgqueue.h"
+#include "core/objhash.h"
#include "core/options.h"
#include "core/panic.h"
#include "core/protocol.h"
diff --git a/src/core/objhash.c b/src/core/objhash.c
index ccc57c39..9529ac30 100644
--- a/src/core/objhash.c
+++ b/src/core/objhash.c
@@ -23,6 +23,7 @@ struct nni_objhash {
uint32_t oh_maxval;
uint32_t oh_dynval;
nni_mtx oh_lock;
+ nni_cv oh_cv;
nni_objhash_node * oh_nodes;
nni_objhash_ctor oh_ctor;
nni_objhash_dtor oh_dtor;
@@ -55,6 +56,12 @@ nni_objhash_init(nni_objhash **ohp, nni_objhash_ctor ctor,
return (rv);
}
+ if ((rv = nni_cv_init(&oh->oh_cv, &oh->oh_lock)) != 0) {
+ nni_mtx_fini(&oh->oh_lock);
+ NNI_FREE_STRUCT(oh);
+ return (rv);
+ }
+
oh->oh_nodes = NULL;
oh->oh_count = 0;
oh->oh_load = 0;
@@ -64,6 +71,8 @@ nni_objhash_init(nni_objhash **ohp, nni_objhash_ctor ctor,
oh->oh_minval = 1;
oh->oh_maxval = 0x7fffffff;
oh->oh_dynval = nni_random();
+ oh->oh_ctor = ctor;
+ oh->oh_dtor = dtor;
*ohp = oh;
return (0);
@@ -79,27 +88,12 @@ nni_objhash_fini(nni_objhash *oh)
oh->oh_cap = oh->oh_count = 0;
oh->oh_load = oh->oh_minload = oh->oh_maxload = 0;
}
+ nni_cv_fini(&oh->oh_cv);
nni_mtx_fini(&oh->oh_lock);
NNI_FREE_STRUCT(oh);
}
-void
-nni_objhash_reclaim(nni_objhash *oh)
-{
- nni_mtx_lock(&oh->oh_lock);
- // Reclaim the buffer if we want, but preserve the limits.
- if ((oh->oh_count == 0) && (oh->oh_cap != 0)) {
- nni_free(oh->oh_nodes, oh->oh_cap * sizeof (nni_objhash_node));
- oh->oh_cap = 0;
- oh->oh_nodes = NULL;
- oh->oh_minload = 0;
- oh->oh_maxload = 0;
- }
- nni_mtx_unlock(&oh->oh_lock);
-}
-
-
// Inspired by Python dict implementation. This probe will visit every
// cell. We always hash consecutively assigned IDs.
#define NNI_OBJHASH_NEXTPROBE(h, j) \
@@ -159,9 +153,10 @@ nni_objhash_find(nni_objhash *oh, uint32_t id, void **valp)
// Resize the object hash. This is called internally with the lock
-// for the object hash held.
+// for the object hash held. Grow indicates that this is being called
+// from a function that intends to add data, so extra space is needed.
static int
-nni_objhash_resize(nni_objhash *oh)
+nni_objhash_resize(nni_objhash *oh, int grow)
{
size_t newsize;
size_t oldsize;
@@ -169,6 +164,20 @@ nni_objhash_resize(nni_objhash *oh)
nni_objhash_node *oldnodes;
uint32_t i;
+ if ((!grow) && (oh->oh_count == 0) && (oh->oh_cap != 0)) {
+ // Table is empty, and we are unrefing. Lets reclaim the
+ // space. Note that this means that allocations which
+ // fluctuate between one and zero are going to bang on the
+ // allocator a bit. Since such cases should not be very
+ // performance sensitive, this is probably okay.
+ nni_free(oh->oh_nodes, oh->oh_cap * sizeof (nni_objhash_node));
+ oh->oh_cap = 0;
+ oh->oh_nodes = NULL;
+ oh->oh_minload = 0;
+ oh->oh_maxload = 0;
+ return (0);
+ }
+
if ((oh->oh_load < oh->oh_maxload) && (oh->oh_load >= oh->oh_minload)) {
// No resize needed.
return (0);
@@ -241,6 +250,9 @@ nni_objhash_unref(nni_objhash *oh, uint32_t id)
node->on_refcnt--;
if (node->on_refcnt != 0) {
+ if (node->on_refcnt == 1) {
+ nni_cv_wake(&oh->oh_cv);
+ }
// Still busy/referenced?
nni_mtx_unlock(&oh->oh_lock);
return;
@@ -270,8 +282,72 @@ nni_objhash_unref(nni_objhash *oh, uint32_t id)
if (node->on_skips == 0) {
oh->oh_load--;
}
+ // Reclaim the buffer if we want, but preserve the limits.
+ nni_objhash_resize(oh, 0);
+
+ nni_mtx_unlock(&oh->oh_lock);
+
+ // Now run the destructor.
+ dtor(val);
+}
+
+
+void
+nni_objhash_unref_wait(nni_objhash *oh, uint32_t id)
+{
+ int rv;
+ void *val;
+ uint32_t index;
+ nni_objhash_node *node;
+ nni_objhash_dtor dtor;
+
+ nni_mtx_lock(&oh->oh_lock);
+
+ dtor = oh->oh_dtor;
+
+ node = nni_objhash_find_node(oh, id);
+ NNI_ASSERT(node != NULL);
+ val = node->on_val;
+
+ while (node->on_refcnt != 1) {
+ nni_cv_wait(&oh->oh_cv);
+ }
+ node->on_refcnt--;
+ if (node->on_refcnt != 0) {
+ if (node->on_refcnt == 1) {
+ nni_cv_wake(&oh->oh_cv);
+ }
+ // Still busy/referenced?
+ nni_mtx_unlock(&oh->oh_lock);
+ return;
+ }
- nni_objhash_resize(oh);
+ index = id & (oh->oh_cap - 1);
+ for (;;) {
+ node = &oh->oh_nodes[index];
+ if (node->on_id == id) {
+ break;
+ }
+
+ NNI_ASSERT(node->on_skips != 0);
+ node->on_skips--;
+ if ((node->on_val == NULL) && (node->on_skips == 0)) {
+ oh->oh_load--;
+ }
+ index = NNI_OBJHASH_NEXTPROBE(oh, index);
+ }
+
+ NNI_ASSERT(node->on_val != NULL);
+ NNI_ASSERT(node->on_refcnt == 0);
+ NNI_ASSERT(node->on_id == id);
+
+ node->on_val = NULL;
+ oh->oh_count--;
+ if (node->on_skips == 0) {
+ oh->oh_load--;
+ }
+ // Reclaim the buffer if we want, but preserve the limits.
+ nni_objhash_resize(oh, 0);
nni_mtx_unlock(&oh->oh_lock);
@@ -299,6 +375,8 @@ nni_objhash_alloc(nni_objhash *oh, uint32_t *idp, void **valp)
return (NNG_ENOMEM);
}
+ nni_objhash_resize(oh, 1);
+
for (;;) {
id = oh->oh_dynval;
oh->oh_dynval++;
@@ -349,9 +427,10 @@ nni_objhash_alloc(nni_objhash *oh, uint32_t *idp, void **valp)
}
nni_mtx_unlock(&oh->oh_lock);
- return (NNG_ENOMEM); // no other return from ctor
+ return (NNG_ENOMEM); // no other return from ctor
}
+ oh->oh_count++;
if (node->on_skips == 0) {
oh->oh_load++;
}
diff --git a/src/core/objhash.h b/src/core/objhash.h
index e6fef50a..e234ca3d 100644
--- a/src/core/objhash.h
+++ b/src/core/objhash.h
@@ -47,10 +47,10 @@ typedef void (*nni_objhash_dtor)(void *);
extern int nni_objhash_init(nni_objhash **, nni_objhash_ctor, nni_objhash_dtor);
extern void nni_objhash_fini(nni_objhash *);
-extern void nni_objhash_reclaim(nni_objhash *);
extern int nni_objhash_find(nni_objhash *, uint32_t, void **);
extern void nni_objhash_unref(nni_objhash *, uint32_t);
+extern void nni_objhash_unref_wait(nni_objhash *, uint32_t);
extern int nni_objhash_alloc(nni_objhash *, uint32_t *, void **);
extern size_t nni_objhash_count(nni_objhash *);
diff --git a/src/core/socket.c b/src/core/socket.c
index b285de85..02a18a7a 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -13,6 +13,8 @@
// Socket implementation.
+static nni_objhash *nni_socks;
+
uint32_t
nni_sock_id(nni_sock *s)
{
@@ -45,14 +47,15 @@ nni_sock_hold(nni_sock **sockp, uint32_t id)
if ((rv = nni_init()) != 0) {
return (rv);
}
- nni_mtx_lock(nni_idlock);
- rv = nni_idhash_find(nni_sockets, id, (void **) &sock);
- if ((rv != 0) || (sock->s_closed)) {
- nni_mtx_unlock(nni_idlock);
+ if ((rv = nni_objhash_find(nni_socks, id, (void **) &sock)) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(&sock->s_mx);
+ if ((sock->s_closed) || (sock->s_data == NULL)) {
+ nni_mtx_unlock(&sock->s_mx);
return (NNG_ECLOSED);
}
- sock->s_refcnt++;
- nni_mtx_unlock(nni_idlock);
+ nni_mtx_unlock(&sock->s_mx);
*sockp = sock;
return (0);
@@ -62,49 +65,7 @@ nni_sock_hold(nni_sock **sockp, uint32_t id)
void
nni_sock_rele(nni_sock *sock)
{
- nni_mtx_lock(nni_idlock);
- sock->s_refcnt--;
- if ((sock->s_closed) && (sock->s_refcnt == 0)) {
- nni_cv_wake(&sock->s_refcv);
- }
- nni_mtx_unlock(nni_idlock);
-}
-
-
-// nni_sock_hold_close is a special hold acquired by the nng_close
-// function. This waits until it has exclusive access, and then marks
-// the socket unusuable by anything else.
-int
-nni_sock_hold_close(nni_sock **sockp, uint32_t id)
-{
- int rv;
- nni_sock *sock;
-
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
-
- nni_mtx_lock(nni_idlock);
- rv = nni_idhash_find(nni_sockets, id, (void **) &sock);
- if (rv != 0) {
- nni_mtx_unlock(nni_idlock);
- return (NNG_ECLOSED);
- }
- nni_idhash_remove(nni_sockets, id);
- sock->s_id = 0;
- sock->s_closed = 1;
- nni_mtx_unlock(nni_idlock);
-
- nni_sock_shutdown(sock);
-
- nni_mtx_lock(nni_idlock);
- while (sock->s_refcnt != 0) {
- nni_cv_wait(&sock->s_refcv);
- }
- nni_mtx_unlock(nni_idlock);
- *sockp = sock;
-
- return (0);
+ nni_objhash_unref(nni_socks, sock->s_id);
}
@@ -363,6 +324,114 @@ nni_sock_nullstartpipe(void *arg)
}
+static void *
+nni_sock_ctor(uint32_t id)
+{
+ int rv;
+ nni_sock *sock;
+
+ if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
+ return (NULL);
+ }
+ // s_protocol, s_peer, and s_flags undefined as yet.
+ sock->s_linger = 0;
+ sock->s_sndtimeo = -1;
+ sock->s_rcvtimeo = -1;
+ sock->s_closing = 0;
+ sock->s_reconn = NNI_SECOND;
+ sock->s_reconnmax = 0;
+ sock->s_rcvmaxsz = 1024 * 1024; // 1 MB by default
+ sock->s_id = id;
+
+ nni_pipe_sock_list_init(&sock->s_pipes);
+ nni_pipe_sock_list_init(&sock->s_idles);
+
+ nni_ep_list_init(&sock->s_eps);
+
+ sock->s_send_fd.sn_init = 0;
+ sock->s_recv_fd.sn_init = 0;
+
+ if (((rv = nni_mtx_init(&sock->s_mx)) != 0) ||
+ ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0)) {
+ goto fail;
+ }
+
+ rv = nni_ev_init(&sock->s_recv_ev, NNG_EV_CAN_RCV, sock);
+ if (rv != 0) {
+ goto fail;
+ }
+ rv = nni_ev_init(&sock->s_send_ev, NNG_EV_CAN_SND, sock);
+ if (rv != 0) {
+ goto fail;
+ }
+
+ if (((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) ||
+ ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0)) {
+ goto fail;
+ }
+
+ return (sock);
+
+fail:
+ nni_ev_fini(&sock->s_send_ev);
+ nni_ev_fini(&sock->s_recv_ev);
+ nni_msgq_fini(sock->s_urq);
+ nni_msgq_fini(sock->s_uwq);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_mx);
+ NNI_FREE_STRUCT(sock);
+ return (NULL);
+}
+
+
+static void
+nni_sock_dtor(void *ptr)
+{
+ nni_sock *sock = ptr;
+
+ // Close any open notification pipes.
+ if (sock->s_recv_fd.sn_init) {
+ nni_plat_pipe_close(sock->s_recv_fd.sn_wfd,
+ sock->s_recv_fd.sn_rfd);
+ }
+ if (sock->s_send_fd.sn_init) {
+ nni_plat_pipe_close(sock->s_send_fd.sn_wfd,
+ sock->s_send_fd.sn_rfd);
+ }
+
+ // The protocol needs to clean up its state.
+ if (sock->s_data != NULL) {
+ sock->s_sock_ops.sock_fini(sock->s_data);
+ }
+
+ nni_ev_fini(&sock->s_send_ev);
+ nni_ev_fini(&sock->s_recv_ev);
+ nni_msgq_fini(sock->s_urq);
+ nni_msgq_fini(sock->s_uwq);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_mx);
+ NNI_FREE_STRUCT(sock);
+}
+
+
+int
+nni_sock_sys_init(void)
+{
+ int rv;
+
+ rv = nni_objhash_init(&nni_socks, nni_sock_ctor, nni_sock_dtor);
+
+ return (rv);
+}
+
+
+void
+nni_sock_sys_fini(void)
+{
+ nni_objhash_fini(nni_socks);
+}
+
+
// nn_sock_open creates the underlying socket.
int
nni_sock_open(nni_sock **sockp, uint16_t pnum)
@@ -370,9 +439,9 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
nni_sock *sock;
nni_proto *proto;
int rv;
- int i;
nni_proto_sock_ops *sops;
nni_proto_pipe_ops *pops;
+ uint32_t sockid;
if ((rv = nni_init()) != 0) {
return (rv);
@@ -380,31 +449,18 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
if ((proto = nni_proto_find(pnum)) == NULL) {
return (NNG_ENOTSUP);
}
- if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
- return (NNG_ENOMEM);
+
+ rv = nni_objhash_alloc(nni_socks, &sockid, (void **) &sock);
+ if (rv != 0) {
+ return (rv);
}
// We make a copy of the protocol operations.
sock->s_protocol = proto->proto_self;
sock->s_peer = proto->proto_peer;
sock->s_flags = proto->proto_flags;
- sock->s_linger = 0;
- sock->s_sndtimeo = -1;
- sock->s_rcvtimeo = -1;
- sock->s_closing = 0;
- sock->s_reconn = NNI_SECOND;
- sock->s_reconnmax = 0;
- sock->s_rcvmaxsz = 1024 * 1024; // 1 MB by default
-
- nni_pipe_sock_list_init(&sock->s_pipes);
- nni_pipe_sock_list_init(&sock->s_idles);
-
- nni_ep_list_init(&sock->s_eps);
-
- sock->s_send_fd.sn_init = 0;
- sock->s_recv_fd.sn_init = 0;
-
sock->s_sock_ops = *proto->proto_sock_ops;
+
sops = &sock->s_sock_ops;
if (sops->sock_sfilter == NULL) {
sops->sock_sfilter = nni_sock_nullfilter;
@@ -433,71 +489,15 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
pops->pipe_stop = nni_sock_nullop;
}
- if (((rv = nni_mtx_init(&sock->s_mx)) != 0) ||
- ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0)) {
- goto fail;
- }
-
- if ((rv = nni_cv_init(&sock->s_refcv, nni_idlock)) != 0) {
- goto fail;
- }
-
- rv = nni_ev_init(&sock->s_recv_ev, NNG_EV_CAN_RCV, sock);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_ev_init(&sock->s_send_ev, NNG_EV_CAN_SND, sock);
- if (rv != 0) {
- goto fail;
- }
-
- if (((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) ||
- ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0)) {
- goto fail;
- }
-
- nni_mtx_lock(nni_idlock);
- rv = nni_idhash_alloc(nni_sockets, &sock->s_id, sock);
- nni_mtx_unlock(nni_idlock);
- if (rv != 0) {
- goto fail;
- }
-
- // Caller always gets the socket held.
- sock->s_refcnt = 1;
-
if ((rv = sops->sock_init(&sock->s_data, sock)) != 0) {
- goto fail;
+ nni_objhash_unref(nni_socks, sockid);
+ return (rv);
}
sops->sock_open(sock->s_data);
*sockp = sock;
return (0);
-
-fail:
- sock->s_sock_ops.sock_fini(sock->s_data);
-
- // And we need to clean up *our* state.
- if (sock->s_id != 0) {
- nni_mtx_lock(nni_idlock);
- nni_idhash_remove(nni_sockets, sock->s_id);
- if (nni_idhash_count(nni_sockets) == 0) {
- nni_idhash_reclaim(nni_pipes);
- nni_idhash_reclaim(nni_endpoints);
- nni_idhash_reclaim(nni_sockets);
- }
- nni_mtx_unlock(nni_idlock);
- }
- nni_ev_fini(&sock->s_send_ev);
- nni_ev_fini(&sock->s_recv_ev);
- nni_msgq_fini(sock->s_urq);
- nni_msgq_fini(sock->s_uwq);
- nni_cv_fini(&sock->s_refcv);
- nni_cv_fini(&sock->s_cv);
- nni_mtx_fini(&sock->s_mx);
- NNI_FREE_STRUCT(sock);
- return (rv);
}
@@ -596,8 +596,7 @@ nni_sock_shutdown(nni_sock *sock)
// nni_sock_close shuts down the socket, then releases any resources
// associated with it. It is a programmer error to reference the socket
// after this function is called, as the pointer may reference invalid
-// memory or other objects. The socket should have been acquired with
-// nni_sock_hold_close().
+// memory or other objects.
void
nni_sock_close(nni_sock *sock)
{
@@ -608,6 +607,14 @@ nni_sock_close(nni_sock *sock)
// is idempotent.
nni_sock_shutdown(sock);
+ nni_mtx_lock(&sock->s_mx);
+ if (sock->s_closed) {
+ nni_mtx_unlock(&sock->s_mx);
+ return;
+ }
+ sock->s_closed = 1;
+ nni_mtx_unlock(&sock->s_mx);
+
// At this point nothing else should be referencing us.
// As with UNIX close, it is a gross error for the caller
// to have concurrent threads using this. We've taken care to
@@ -615,39 +622,20 @@ nni_sock_close(nni_sock *sock)
// user code attempts to utilize the socket *after* this point,
// the results may be tragic.
+ // Unreference twice. First drops the reference our caller
+ // acquired to start the open, and the second (blocking) one
+ // is the reference created for us at socket creation.
+
+ nni_objhash_unref(nni_socks, sock->s_id);
+ nni_objhash_unref_wait(nni_socks, sock->s_id);
+
nni_mtx_lock(nni_idlock);
- if (sock->s_id != 0) {
- nni_idhash_remove(nni_sockets, sock->s_id);
- }
- if (nni_idhash_count(nni_sockets) == 0) {
+ // XXX: CLEAN THIS UP
+ if (nni_objhash_count(nni_socks) == 0) {
nni_idhash_reclaim(nni_pipes);
nni_idhash_reclaim(nni_endpoints);
- nni_idhash_reclaim(nni_sockets);
}
-
nni_mtx_unlock(nni_idlock);
-
- // Close any open notification pipes.
- if (sock->s_recv_fd.sn_init) {
- nni_plat_pipe_close(sock->s_recv_fd.sn_wfd,
- sock->s_recv_fd.sn_rfd);
- }
- if (sock->s_send_fd.sn_init) {
- nni_plat_pipe_close(sock->s_send_fd.sn_wfd,
- sock->s_send_fd.sn_rfd);
- }
-
- // The protocol needs to clean up its state.
- sock->s_sock_ops.sock_fini(sock->s_data);
-
- nni_msgq_fini(sock->s_urq);
- nni_msgq_fini(sock->s_uwq);
- nni_ev_fini(&sock->s_send_ev);
- nni_ev_fini(&sock->s_recv_ev);
- nni_cv_fini(&sock->s_refcv);
- nni_cv_fini(&sock->s_cv);
- nni_mtx_fini(&sock->s_mx);
- NNI_FREE_STRUCT(sock);
}
diff --git a/src/core/socket.h b/src/core/socket.h
index dc6f5530..5fc3d593 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -19,8 +19,6 @@ struct nni_socket {
nni_cv s_cv;
uint32_t s_id;
- uint32_t s_refcnt;
- nni_cv s_refcv;
nni_msgq * s_uwq; // Upper write queue
nni_msgq * s_urq; // Upper read queue
@@ -63,8 +61,10 @@ struct nni_socket {
uint32_t s_nextid; // Next Pipe ID.
};
+extern int nni_sock_sys_init(void);
+extern void nni_sock_sys_fini(void);
+
extern int nni_sock_hold(nni_sock **, uint32_t);
-extern int nni_sock_hold_close(nni_sock **, uint32_t);
extern void nni_sock_rele(nni_sock *);
extern int nni_sock_open(nni_sock **, uint16_t);
extern void nni_sock_close(nni_sock *);