diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/init.c | 8 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 1 | ||||
| -rw-r--r-- | src/core/objhash.c | 119 | ||||
| -rw-r--r-- | src/core/objhash.h | 2 | ||||
| -rw-r--r-- | src/core/socket.c | 300 | ||||
| -rw-r--r-- | src/core/socket.h | 6 | ||||
| -rw-r--r-- | src/nng.c | 6 |
7 files changed, 260 insertions, 182 deletions
diff --git a/src/core/init.c b/src/core/init.c index 8dbe50db..4d73e897 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -38,7 +38,14 @@ nni_init_helper(void) nni_taskq_sys_fini(); return (rv); } + if ((rv = nni_sock_sys_init()) != 0) { + nni_random_sys_fini(); + nni_timer_sys_fini(); + nni_taskq_sys_fini(); + return (rv); + } if ((rv = nni_mtx_init(&nni_idlock_x)) != 0) { + nni_sock_sys_fini(); nni_random_sys_fini(); nni_timer_sys_fini(); nni_taskq_sys_fini(); @@ -48,6 +55,7 @@ nni_init_helper(void) ((rv = nni_idhash_init(&nni_pipes_x)) != 0) || ((rv = nni_idhash_init(&nni_sockets_x)) != 0)) { nni_mtx_fini(&nni_idlock_x); + nni_sock_sys_fini(); nni_random_sys_fini(); nni_timer_sys_fini(); nni_taskq_sys_fini(); diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 2a72766c..291b5e28 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -33,6 +33,7 @@ #include "core/list.h" #include "core/message.h" #include "core/msgqueue.h" +#include "core/objhash.h" #include "core/options.h" #include "core/panic.h" #include "core/protocol.h" diff --git a/src/core/objhash.c b/src/core/objhash.c index ccc57c39..9529ac30 100644 --- a/src/core/objhash.c +++ b/src/core/objhash.c @@ -23,6 +23,7 @@ struct nni_objhash { uint32_t oh_maxval; uint32_t oh_dynval; nni_mtx oh_lock; + nni_cv oh_cv; nni_objhash_node * oh_nodes; nni_objhash_ctor oh_ctor; nni_objhash_dtor oh_dtor; @@ -55,6 +56,12 @@ nni_objhash_init(nni_objhash **ohp, nni_objhash_ctor ctor, return (rv); } + if ((rv = nni_cv_init(&oh->oh_cv, &oh->oh_lock)) != 0) { + nni_mtx_fini(&oh->oh_lock); + NNI_FREE_STRUCT(oh); + return (rv); + } + oh->oh_nodes = NULL; oh->oh_count = 0; oh->oh_load = 0; @@ -64,6 +71,8 @@ nni_objhash_init(nni_objhash **ohp, nni_objhash_ctor ctor, oh->oh_minval = 1; oh->oh_maxval = 0x7fffffff; oh->oh_dynval = nni_random(); + oh->oh_ctor = ctor; + oh->oh_dtor = dtor; *ohp = oh; return (0); @@ -79,27 +88,12 @@ nni_objhash_fini(nni_objhash *oh) oh->oh_cap = oh->oh_count = 0; oh->oh_load = oh->oh_minload = oh->oh_maxload = 0; } + nni_cv_fini(&oh->oh_cv); nni_mtx_fini(&oh->oh_lock); NNI_FREE_STRUCT(oh); } -void -nni_objhash_reclaim(nni_objhash *oh) -{ - nni_mtx_lock(&oh->oh_lock); - // Reclaim the buffer if we want, but preserve the limits. - if ((oh->oh_count == 0) && (oh->oh_cap != 0)) { - nni_free(oh->oh_nodes, oh->oh_cap * sizeof (nni_objhash_node)); - oh->oh_cap = 0; - oh->oh_nodes = NULL; - oh->oh_minload = 0; - oh->oh_maxload = 0; - } - nni_mtx_unlock(&oh->oh_lock); -} - - // Inspired by Python dict implementation. This probe will visit every // cell. We always hash consecutively assigned IDs. #define NNI_OBJHASH_NEXTPROBE(h, j) \ @@ -159,9 +153,10 @@ nni_objhash_find(nni_objhash *oh, uint32_t id, void **valp) // Resize the object hash. This is called internally with the lock -// for the object hash held. +// for the object hash held. Grow indicates that this is being called +// from a function that intends to add data, so extra space is needed. static int -nni_objhash_resize(nni_objhash *oh) +nni_objhash_resize(nni_objhash *oh, int grow) { size_t newsize; size_t oldsize; @@ -169,6 +164,20 @@ nni_objhash_resize(nni_objhash *oh) nni_objhash_node *oldnodes; uint32_t i; + if ((!grow) && (oh->oh_count == 0) && (oh->oh_cap != 0)) { + // Table is empty, and we are unrefing. Lets reclaim the + // space. Note that this means that allocations which + // fluctuate between one and zero are going to bang on the + // allocator a bit. Since such cases should not be very + // performance sensitive, this is probably okay. + nni_free(oh->oh_nodes, oh->oh_cap * sizeof (nni_objhash_node)); + oh->oh_cap = 0; + oh->oh_nodes = NULL; + oh->oh_minload = 0; + oh->oh_maxload = 0; + return (0); + } + if ((oh->oh_load < oh->oh_maxload) && (oh->oh_load >= oh->oh_minload)) { // No resize needed. return (0); @@ -241,6 +250,9 @@ nni_objhash_unref(nni_objhash *oh, uint32_t id) node->on_refcnt--; if (node->on_refcnt != 0) { + if (node->on_refcnt == 1) { + nni_cv_wake(&oh->oh_cv); + } // Still busy/referenced? nni_mtx_unlock(&oh->oh_lock); return; @@ -270,8 +282,72 @@ nni_objhash_unref(nni_objhash *oh, uint32_t id) if (node->on_skips == 0) { oh->oh_load--; } + // Reclaim the buffer if we want, but preserve the limits. + nni_objhash_resize(oh, 0); + + nni_mtx_unlock(&oh->oh_lock); + + // Now run the destructor. + dtor(val); +} + + +void +nni_objhash_unref_wait(nni_objhash *oh, uint32_t id) +{ + int rv; + void *val; + uint32_t index; + nni_objhash_node *node; + nni_objhash_dtor dtor; + + nni_mtx_lock(&oh->oh_lock); + + dtor = oh->oh_dtor; + + node = nni_objhash_find_node(oh, id); + NNI_ASSERT(node != NULL); + val = node->on_val; + + while (node->on_refcnt != 1) { + nni_cv_wait(&oh->oh_cv); + } + node->on_refcnt--; + if (node->on_refcnt != 0) { + if (node->on_refcnt == 1) { + nni_cv_wake(&oh->oh_cv); + } + // Still busy/referenced? + nni_mtx_unlock(&oh->oh_lock); + return; + } - nni_objhash_resize(oh); + index = id & (oh->oh_cap - 1); + for (;;) { + node = &oh->oh_nodes[index]; + if (node->on_id == id) { + break; + } + + NNI_ASSERT(node->on_skips != 0); + node->on_skips--; + if ((node->on_val == NULL) && (node->on_skips == 0)) { + oh->oh_load--; + } + index = NNI_OBJHASH_NEXTPROBE(oh, index); + } + + NNI_ASSERT(node->on_val != NULL); + NNI_ASSERT(node->on_refcnt == 0); + NNI_ASSERT(node->on_id == id); + + node->on_val = NULL; + oh->oh_count--; + if (node->on_skips == 0) { + oh->oh_load--; + } + // Reclaim the buffer if we want, but preserve the limits. + nni_objhash_resize(oh, 0); nni_mtx_unlock(&oh->oh_lock); @@ -299,6 +375,8 @@ nni_objhash_alloc(nni_objhash *oh, uint32_t *idp, void **valp) return (NNG_ENOMEM); } + nni_objhash_resize(oh, 1); + for (;;) { id = oh->oh_dynval; oh->oh_dynval++; @@ -349,9 +427,10 @@ nni_objhash_alloc(nni_objhash *oh, uint32_t *idp, void **valp) } nni_mtx_unlock(&oh->oh_lock); - return (NNG_ENOMEM); // no other return from ctor + return (NNG_ENOMEM); // no other return from ctor } + oh->oh_count++; if (node->on_skips == 0) { oh->oh_load++; } diff --git a/src/core/objhash.h b/src/core/objhash.h index e6fef50a..e234ca3d 100644 --- a/src/core/objhash.h +++ b/src/core/objhash.h @@ -47,10 +47,10 @@ typedef void (*nni_objhash_dtor)(void *); extern int nni_objhash_init(nni_objhash **, nni_objhash_ctor, nni_objhash_dtor); extern void nni_objhash_fini(nni_objhash *); -extern void nni_objhash_reclaim(nni_objhash *); extern int nni_objhash_find(nni_objhash *, uint32_t, void **); extern void nni_objhash_unref(nni_objhash *, uint32_t); +extern void nni_objhash_unref_wait(nni_objhash *, uint32_t); extern int nni_objhash_alloc(nni_objhash *, uint32_t *, void **); extern size_t nni_objhash_count(nni_objhash *); diff --git a/src/core/socket.c b/src/core/socket.c index b285de85..02a18a7a 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -13,6 +13,8 @@ // Socket implementation. +static nni_objhash *nni_socks; + uint32_t nni_sock_id(nni_sock *s) { @@ -45,14 +47,15 @@ nni_sock_hold(nni_sock **sockp, uint32_t id) if ((rv = nni_init()) != 0) { return (rv); } - nni_mtx_lock(nni_idlock); - rv = nni_idhash_find(nni_sockets, id, (void **) &sock); - if ((rv != 0) || (sock->s_closed)) { - nni_mtx_unlock(nni_idlock); + if ((rv = nni_objhash_find(nni_socks, id, (void **) &sock)) != 0) { + return (rv); + } + nni_mtx_lock(&sock->s_mx); + if ((sock->s_closed) || (sock->s_data == NULL)) { + nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } - sock->s_refcnt++; - nni_mtx_unlock(nni_idlock); + nni_mtx_unlock(&sock->s_mx); *sockp = sock; return (0); @@ -62,49 +65,7 @@ nni_sock_hold(nni_sock **sockp, uint32_t id) void nni_sock_rele(nni_sock *sock) { - nni_mtx_lock(nni_idlock); - sock->s_refcnt--; - if ((sock->s_closed) && (sock->s_refcnt == 0)) { - nni_cv_wake(&sock->s_refcv); - } - nni_mtx_unlock(nni_idlock); -} - - -// nni_sock_hold_close is a special hold acquired by the nng_close -// function. This waits until it has exclusive access, and then marks -// the socket unusuable by anything else. -int -nni_sock_hold_close(nni_sock **sockp, uint32_t id) -{ - int rv; - nni_sock *sock; - - if ((rv = nni_init()) != 0) { - return (rv); - } - - nni_mtx_lock(nni_idlock); - rv = nni_idhash_find(nni_sockets, id, (void **) &sock); - if (rv != 0) { - nni_mtx_unlock(nni_idlock); - return (NNG_ECLOSED); - } - nni_idhash_remove(nni_sockets, id); - sock->s_id = 0; - sock->s_closed = 1; - nni_mtx_unlock(nni_idlock); - - nni_sock_shutdown(sock); - - nni_mtx_lock(nni_idlock); - while (sock->s_refcnt != 0) { - nni_cv_wait(&sock->s_refcv); - } - nni_mtx_unlock(nni_idlock); - *sockp = sock; - - return (0); + nni_objhash_unref(nni_socks, sock->s_id); } @@ -363,6 +324,114 @@ nni_sock_nullstartpipe(void *arg) } +static void * +nni_sock_ctor(uint32_t id) +{ + int rv; + nni_sock *sock; + + if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) { + return (NULL); + } + // s_protocol, s_peer, and s_flags undefined as yet. + sock->s_linger = 0; + sock->s_sndtimeo = -1; + sock->s_rcvtimeo = -1; + sock->s_closing = 0; + sock->s_reconn = NNI_SECOND; + sock->s_reconnmax = 0; + sock->s_rcvmaxsz = 1024 * 1024; // 1 MB by default + sock->s_id = id; + + nni_pipe_sock_list_init(&sock->s_pipes); + nni_pipe_sock_list_init(&sock->s_idles); + + nni_ep_list_init(&sock->s_eps); + + sock->s_send_fd.sn_init = 0; + sock->s_recv_fd.sn_init = 0; + + if (((rv = nni_mtx_init(&sock->s_mx)) != 0) || + ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0)) { + goto fail; + } + + rv = nni_ev_init(&sock->s_recv_ev, NNG_EV_CAN_RCV, sock); + if (rv != 0) { + goto fail; + } + rv = nni_ev_init(&sock->s_send_ev, NNG_EV_CAN_SND, sock); + if (rv != 0) { + goto fail; + } + + if (((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) || + ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0)) { + goto fail; + } + + return (sock); + +fail: + nni_ev_fini(&sock->s_send_ev); + nni_ev_fini(&sock->s_recv_ev); + nni_msgq_fini(sock->s_urq); + nni_msgq_fini(sock->s_uwq); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); + NNI_FREE_STRUCT(sock); + return (NULL); +} + + +static void +nni_sock_dtor(void *ptr) +{ + nni_sock *sock = ptr; + + // Close any open notification pipes. + if (sock->s_recv_fd.sn_init) { + nni_plat_pipe_close(sock->s_recv_fd.sn_wfd, + sock->s_recv_fd.sn_rfd); + } + if (sock->s_send_fd.sn_init) { + nni_plat_pipe_close(sock->s_send_fd.sn_wfd, + sock->s_send_fd.sn_rfd); + } + + // The protocol needs to clean up its state. + if (sock->s_data != NULL) { + sock->s_sock_ops.sock_fini(sock->s_data); + } + + nni_ev_fini(&sock->s_send_ev); + nni_ev_fini(&sock->s_recv_ev); + nni_msgq_fini(sock->s_urq); + nni_msgq_fini(sock->s_uwq); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); + NNI_FREE_STRUCT(sock); +} + + +int +nni_sock_sys_init(void) +{ + int rv; + + rv = nni_objhash_init(&nni_socks, nni_sock_ctor, nni_sock_dtor); + + return (rv); +} + + +void +nni_sock_sys_fini(void) +{ + nni_objhash_fini(nni_socks); +} + + // nn_sock_open creates the underlying socket. int nni_sock_open(nni_sock **sockp, uint16_t pnum) @@ -370,9 +439,9 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) nni_sock *sock; nni_proto *proto; int rv; - int i; nni_proto_sock_ops *sops; nni_proto_pipe_ops *pops; + uint32_t sockid; if ((rv = nni_init()) != 0) { return (rv); @@ -380,31 +449,18 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) if ((proto = nni_proto_find(pnum)) == NULL) { return (NNG_ENOTSUP); } - if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) { - return (NNG_ENOMEM); + + rv = nni_objhash_alloc(nni_socks, &sockid, (void **) &sock); + if (rv != 0) { + return (rv); } // We make a copy of the protocol operations. sock->s_protocol = proto->proto_self; sock->s_peer = proto->proto_peer; sock->s_flags = proto->proto_flags; - sock->s_linger = 0; - sock->s_sndtimeo = -1; - sock->s_rcvtimeo = -1; - sock->s_closing = 0; - sock->s_reconn = NNI_SECOND; - sock->s_reconnmax = 0; - sock->s_rcvmaxsz = 1024 * 1024; // 1 MB by default - - nni_pipe_sock_list_init(&sock->s_pipes); - nni_pipe_sock_list_init(&sock->s_idles); - - nni_ep_list_init(&sock->s_eps); - - sock->s_send_fd.sn_init = 0; - sock->s_recv_fd.sn_init = 0; - sock->s_sock_ops = *proto->proto_sock_ops; + sops = &sock->s_sock_ops; if (sops->sock_sfilter == NULL) { sops->sock_sfilter = nni_sock_nullfilter; @@ -433,71 +489,15 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) pops->pipe_stop = nni_sock_nullop; } - if (((rv = nni_mtx_init(&sock->s_mx)) != 0) || - ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0)) { - goto fail; - } - - if ((rv = nni_cv_init(&sock->s_refcv, nni_idlock)) != 0) { - goto fail; - } - - rv = nni_ev_init(&sock->s_recv_ev, NNG_EV_CAN_RCV, sock); - if (rv != 0) { - goto fail; - } - rv = nni_ev_init(&sock->s_send_ev, NNG_EV_CAN_SND, sock); - if (rv != 0) { - goto fail; - } - - if (((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) || - ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0)) { - goto fail; - } - - nni_mtx_lock(nni_idlock); - rv = nni_idhash_alloc(nni_sockets, &sock->s_id, sock); - nni_mtx_unlock(nni_idlock); - if (rv != 0) { - goto fail; - } - - // Caller always gets the socket held. - sock->s_refcnt = 1; - if ((rv = sops->sock_init(&sock->s_data, sock)) != 0) { - goto fail; + nni_objhash_unref(nni_socks, sockid); + return (rv); } sops->sock_open(sock->s_data); *sockp = sock; return (0); - -fail: - sock->s_sock_ops.sock_fini(sock->s_data); - - // And we need to clean up *our* state. - if (sock->s_id != 0) { - nni_mtx_lock(nni_idlock); - nni_idhash_remove(nni_sockets, sock->s_id); - if (nni_idhash_count(nni_sockets) == 0) { - nni_idhash_reclaim(nni_pipes); - nni_idhash_reclaim(nni_endpoints); - nni_idhash_reclaim(nni_sockets); - } - nni_mtx_unlock(nni_idlock); - } - nni_ev_fini(&sock->s_send_ev); - nni_ev_fini(&sock->s_recv_ev); - nni_msgq_fini(sock->s_urq); - nni_msgq_fini(sock->s_uwq); - nni_cv_fini(&sock->s_refcv); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); - return (rv); } @@ -596,8 +596,7 @@ nni_sock_shutdown(nni_sock *sock) // nni_sock_close shuts down the socket, then releases any resources // associated with it. It is a programmer error to reference the socket // after this function is called, as the pointer may reference invalid -// memory or other objects. The socket should have been acquired with -// nni_sock_hold_close(). +// memory or other objects. void nni_sock_close(nni_sock *sock) { @@ -608,6 +607,14 @@ nni_sock_close(nni_sock *sock) // is idempotent. nni_sock_shutdown(sock); + nni_mtx_lock(&sock->s_mx); + if (sock->s_closed) { + nni_mtx_unlock(&sock->s_mx); + return; + } + sock->s_closed = 1; + nni_mtx_unlock(&sock->s_mx); + // At this point nothing else should be referencing us. // As with UNIX close, it is a gross error for the caller // to have concurrent threads using this. We've taken care to @@ -615,39 +622,20 @@ nni_sock_close(nni_sock *sock) // user code attempts to utilize the socket *after* this point, // the results may be tragic. + // Unreference twice. First drops the reference our caller + // acquired to start the open, and the second (blocking) one + // is the reference created for us at socket creation. + + nni_objhash_unref(nni_socks, sock->s_id); + nni_objhash_unref_wait(nni_socks, sock->s_id); + nni_mtx_lock(nni_idlock); - if (sock->s_id != 0) { - nni_idhash_remove(nni_sockets, sock->s_id); - } - if (nni_idhash_count(nni_sockets) == 0) { + // XXX: CLEAN THIS UP + if (nni_objhash_count(nni_socks) == 0) { nni_idhash_reclaim(nni_pipes); nni_idhash_reclaim(nni_endpoints); - nni_idhash_reclaim(nni_sockets); } - nni_mtx_unlock(nni_idlock); - - // Close any open notification pipes. - if (sock->s_recv_fd.sn_init) { - nni_plat_pipe_close(sock->s_recv_fd.sn_wfd, - sock->s_recv_fd.sn_rfd); - } - if (sock->s_send_fd.sn_init) { - nni_plat_pipe_close(sock->s_send_fd.sn_wfd, - sock->s_send_fd.sn_rfd); - } - - // The protocol needs to clean up its state. - sock->s_sock_ops.sock_fini(sock->s_data); - - nni_msgq_fini(sock->s_urq); - nni_msgq_fini(sock->s_uwq); - nni_ev_fini(&sock->s_send_ev); - nni_ev_fini(&sock->s_recv_ev); - nni_cv_fini(&sock->s_refcv); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); } diff --git a/src/core/socket.h b/src/core/socket.h index dc6f5530..5fc3d593 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -19,8 +19,6 @@ struct nni_socket { nni_cv s_cv; uint32_t s_id; - uint32_t s_refcnt; - nni_cv s_refcv; nni_msgq * s_uwq; // Upper write queue nni_msgq * s_urq; // Upper read queue @@ -63,8 +61,10 @@ struct nni_socket { uint32_t s_nextid; // Next Pipe ID. }; +extern int nni_sock_sys_init(void); +extern void nni_sock_sys_fini(void); + extern int nni_sock_hold(nni_sock **, uint32_t); -extern int nni_sock_hold_close(nni_sock **, uint32_t); extern void nni_sock_rele(nni_sock *); extern int nni_sock_open(nni_sock **, uint16_t); extern void nni_sock_close(nni_sock *); @@ -30,7 +30,9 @@ nng_open(nng_socket *sidp, uint16_t proto) return (rv); } *sidp = nni_sock_id(sock); - nni_sock_rele(sock); + + // Keep the socket "held" until it is explicitly closed. + return (0); } @@ -58,7 +60,7 @@ nng_close(nng_socket sid) // Close is special, because we still want to be able to get // a hold on the socket even if shutdown was called. - if ((rv = nni_sock_hold_close(&sock, sid)) != 0) { + if ((rv = nni_sock_hold(&sock, sid)) != 0) { return (rv); } // No release -- close releases it. |
