diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/core/idhash.c | 87 | ||||
| -rw-r--r-- | src/core/idhash.h | 25 | ||||
| -rw-r--r-- | src/core/init.c | 26 | ||||
| -rw-r--r-- | src/core/init.h | 12 | ||||
| -rw-r--r-- | src/core/socket.c | 85 | ||||
| -rw-r--r-- | src/core/socket.h | 3 | ||||
| -rw-r--r-- | src/nng.c | 11 | ||||
| -rw-r--r-- | src/nng.h | 6 | ||||
| -rw-r--r-- | src/protocol/bus/bus.c | 1 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 12 | ||||
| -rw-r--r-- | src/protocol/survey/respond.c | 12 |
12 files changed, 184 insertions, 98 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e70074c2..2df849ae 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -32,7 +32,9 @@ set (NNG_SOURCES core/clock.c core/clock.h core/endpt.c + core/endpt.h core/event.c + core/event.h core/idhash.c core/idhash.h core/init.c diff --git a/src/core/idhash.c b/src/core/idhash.c index 0b680bc9..07368a65 100644 --- a/src/core/idhash.c +++ b/src/core/idhash.c @@ -11,54 +11,58 @@ #include <string.h> -typedef struct { +struct nni_idhash_entry { uint32_t ihe_key; uint32_t ihe_skips; void * ihe_val; -} nni_idhash_entry; - -struct nni_idhash { - uint32_t ih_cap; - uint32_t ih_count; - uint32_t ih_load; - uint32_t ih_minload; // considers placeholders - uint32_t ih_maxload; - uint32_t ih_walkers; - uint32_t ih_minval; - uint32_t ih_maxval; - uint32_t ih_dynval; - nni_idhash_entry * ih_entries; }; + int -nni_idhash_create(nni_idhash **hp) +nni_idhash_init(nni_idhash *h) { - nni_idhash *h; - - if ((h = NNI_ALLOC_STRUCT(h)) == NULL) { - return (NNG_ENOMEM); - } - h->ih_entries = nni_alloc(8 * sizeof (nni_idhash_entry)); - if (h->ih_entries == NULL) { - NNI_FREE_STRUCT(h); - return (NNG_ENOMEM); - } - (void) memset(h->ih_entries, 0, (8 * sizeof (nni_idhash_entry))); + h->ih_entries = NULL; h->ih_count = 0; h->ih_load = 0; - h->ih_cap = 8; - h->ih_maxload = 5; + h->ih_cap = 0; + h->ih_maxload = 0; h->ih_minload = 0; // never shrink below this h->ih_walkers = 0; h->ih_minval = 0; h->ih_maxval = 0xffffffff; h->ih_dynval = 0; - *hp = h; return (0); } void +nni_idhash_fini(nni_idhash *h) +{ + NNI_ASSERT(h->ih_walkers == 0); + if (h->ih_entries != NULL) { + nni_free(h->ih_entries, h->ih_cap * sizeof (nni_idhash_entry)); + h->ih_entries = NULL; + h->ih_cap = h->ih_count = 0; + h->ih_load = h->ih_minload = h->ih_maxload = 0; + } +} + + +void +nni_idhash_reclaim(nni_idhash *h) +{ + // Reclaim the buffer if we want, but preserve the limits. + if ((h->ih_count == 0) && (h->ih_cap != 0) && (h->ih_walkers == 0)) { + nni_free(h->ih_entries, h->ih_cap * sizeof (nni_idhash_entry)); + h->ih_cap = 0; + h->ih_entries = NULL; + h->ih_minload = 0; + h->ih_maxload = 0; + } +} + + +void nni_idhash_set_limits(nni_idhash *h, uint32_t minval, uint32_t maxval, uint32_t start) { @@ -71,16 +75,6 @@ nni_idhash_set_limits(nni_idhash *h, uint32_t minval, uint32_t maxval, } -void -nni_idhash_destroy(nni_idhash *h) -{ - if (h != NULL) { - nni_free(h->ih_entries, h->ih_cap * sizeof (nni_idhash_entry)); - NNI_FREE_STRUCT(h); - } -} - - // Inspired by Python dict implementation. This probe will visit every // cell. We always hash consecutively assigned IDs. #define NNI_IDHASH_NEXTPROBE(h, j) \ @@ -91,6 +85,10 @@ nni_idhash_find(nni_idhash *h, uint32_t id, void **valp) { uint32_t index = id & (h->ih_cap - 1); + if (h->ih_count == 0) { + return (NNG_ENOENT); + } + for (;;) { if ((h->ih_entries[index].ihe_val == NULL) && (h->ih_entries[index].ihe_skips == 0)) { @@ -165,7 +163,9 @@ nni_hash_resize(nni_idhash *h) index = NNI_IDHASH_NEXTPROBE(h, index); } } - nni_free(oldents, sizeof (nni_idhash_entry) * oldsize); + if (oldsize != 0) { + nni_free(oldents, sizeof (nni_idhash_entry) * oldsize); + } return (0); } @@ -286,11 +286,10 @@ nni_idhash_alloc(nni_idhash *h, uint32_t *idp, void *val) } -int -nni_idhash_count(nni_idhash *h, uint32_t *countp) +size_t +nni_idhash_count(nni_idhash *h) { - *countp = h->ih_count; - return (0); + return (h->ih_count); } diff --git a/src/core/idhash.h b/src/core/idhash.h index e8876ad5..4616ccde 100644 --- a/src/core/idhash.h +++ b/src/core/idhash.h @@ -22,7 +22,23 @@ // use table sizes that are powers of two. Note that hash items // must be non-NULL. The table is locked. -typedef struct nni_idhash nni_idhash; +typedef struct nni_idhash nni_idhash; +typedef struct nni_idhash_entry nni_idhash_entry; + +// The details of the nni_idhash are "private". But they let us inline +// this into structures. +struct nni_idhash { + size_t ih_cap; + size_t ih_count; + size_t ih_load; + size_t ih_minload; // considers placeholders + size_t ih_maxload; + uint32_t ih_walkers; + uint32_t ih_minval; + uint32_t ih_maxval; + uint32_t ih_dynval; + nni_idhash_entry * ih_entries; +}; // nni_idhash_walkfn is called when walking a hash table. If the // return value is non-zero, then nni_idhash_walk will terminate further @@ -32,14 +48,17 @@ typedef struct nni_idhash nni_idhash; // Note that the walkfn must not attempt to change the hash table. // The user must provide any locking needed. typedef int (*nni_idhash_walkfn)(void *, uint32_t, void *); -extern int nni_idhash_create(nni_idhash **); +extern int nni_idhash_init(nni_idhash *); +extern void nni_idhash_fini(nni_idhash *); +extern void nni_idhash_reclaim(nni_idhash *); extern void nni_idhash_set_limits(nni_idhash *, uint32_t, uint32_t, uint32_t); +extern int nni_idhash_create(nni_idhash **); extern void nni_idhash_destroy(nni_idhash *); extern int nni_idhash_find(nni_idhash *, uint32_t, void **); extern int nni_idhash_remove(nni_idhash *, uint32_t); extern int nni_idhash_insert(nni_idhash *, uint32_t, void *); extern int nni_idhash_alloc(nni_idhash *, uint32_t *, void *); -extern int nni_idhash_count(nni_idhash *, uint32_t *); +extern size_t nni_idhash_count(nni_idhash *); extern int nni_idhash_walk(nni_idhash *, nni_idhash_walkfn, void *); #endif // CORE_IDHASH_H diff --git a/src/core/init.c b/src/core/init.c index 48a5096c..8ada0247 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -15,6 +15,10 @@ nni_idhash *nni_endpoints; nni_idhash *nni_pipes; nni_idhash *nni_sockets; nni_mtx *nni_idlock; + +static nni_idhash nni_endpoints_x; +static nni_idhash nni_pipes_x; +static nni_idhash nni_sockets_x; static nni_mtx nni_idlock_x; static int @@ -28,17 +32,23 @@ nni_init_helper(void) if ((rv = nni_mtx_init(&nni_idlock_x)) != 0) { return (rv); } - if (((rv = nni_idhash_create(&nni_endpoints)) != 0) || - ((rv = nni_idhash_create(&nni_pipes)) != 0) || - ((rv = nni_idhash_create(&nni_sockets)) != 0)) { + if (((rv = nni_idhash_init(&nni_endpoints_x)) != 0) || + ((rv = nni_idhash_init(&nni_pipes_x)) != 0) || + ((rv = nni_idhash_init(&nni_sockets_x)) != 0)) { nni_mtx_fini(&nni_idlock_x); nni_random_fini(); return (rv); } - nni_idhash_set_limits(nni_pipes, 1, 0x7fffffff, + nni_idhash_set_limits(&nni_pipes_x, 1, 0x7fffffff, nni_random() & 0x7fffffff); - nni_idhash_set_limits(nni_sockets, 1, 0xffffffff, 1); + nni_idhash_set_limits(&nni_sockets_x, 1, 0x7fffffff, 1); + nni_idhash_set_limits(&nni_endpoints_x, 1, 0xffffffff, 1); + nni_idlock = &nni_idlock_x; + nni_pipes = &nni_pipes_x; + nni_endpoints = &nni_endpoints_x; + nni_sockets = &nni_sockets_x; + nni_tran_init(); return (0); } @@ -54,9 +64,9 @@ nni_init(void) void nni_fini(void) { - nni_idhash_destroy(nni_endpoints); - nni_idhash_destroy(nni_pipes); - nni_idhash_destroy(nni_sockets); + nni_idhash_fini(&nni_endpoints_x); + nni_idhash_fini(&nni_pipes_x); + nni_idhash_fini(&nni_sockets_x); nni_mtx_fini(&nni_idlock_x); nni_tran_fini(); nni_random_fini(); diff --git a/src/core/init.h b/src/core/init.h index bae78fc3..e48b6705 100644 --- a/src/core/init.h +++ b/src/core/init.h @@ -15,17 +15,17 @@ // nni_init is called each time the user enters the library. It ensures that // the library is initlialized properly, and also deals with checks such as // whether the process has forked since last initialization. -extern int nni_init(void); +int nni_init(void); // nni_fini tears everything down. In the future it may be used to ensure // that all resources used by the library are released back to the system. -extern void nni_fini(void); +void nni_fini(void); // Private hash tables matching IDs to values. Consumers need to use the // nni_idlock to protect access to these. -nni_idhash *nni_endpoints; -nni_idhash *nni_pipes; -nni_idhash *nni_sockets; -nni_mtx *nni_idlock; +extern nni_mtx *nni_idlock; +extern nni_idhash *nni_endpoints; +extern nni_idhash *nni_pipes; +extern nni_idhash *nni_sockets; #endif // CORE_INIT_H diff --git a/src/core/socket.c b/src/core/socket.c index b1f8f2c4..52df70a2 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -47,34 +47,58 @@ nni_sock_hold(nni_sock **sockp, uint32_t id) } nni_mtx_lock(nni_idlock); rv = nni_idhash_find(nni_sockets, id, (void **) &sock); + if ((rv != 0) || (sock->s_closed)) { + nni_mtx_unlock(nni_idlock); + return (NNG_ECLOSED); + } + sock->s_refcnt++; nni_mtx_unlock(nni_idlock); + *sockp = sock; - if (rv == 0) { - if (sock->s_closing) { - rv = NNG_ECLOSED; - } else { - nni_mtx_lock(&sock->s_mx); - sock->s_refcnt++; - nni_mtx_unlock(&sock->s_mx); - *sockp = sock; - } - } - if (rv == NNG_ENOENT) { - rv = NNG_ECLOSED; - } - return (rv); + return (0); } void nni_sock_rele(nni_sock *sock) { - nni_mtx_lock(&sock->s_mx); + nni_mtx_lock(nni_idlock); sock->s_refcnt--; - if (sock->s_closing) { - nni_cv_wake(&sock->s_cv); + if ((sock->s_closed) && (sock->s_refcnt == 1)) { + nni_cv_wake(&sock->s_refcv); } - nni_mtx_unlock(&sock->s_mx); + nni_mtx_unlock(nni_idlock); +} + + +// nni_sock_hold_close is a special hold acquired by the nng_close +// function. This waits until it has exclusive access, and then marks +// the socket unusuable by anything else. +int +nni_sock_hold_close(nni_sock **sockp, uint32_t id) +{ + int rv; + nni_sock *sock; + + if ((rv = nni_init()) != 0) { + return (rv); + } + + nni_mtx_lock(nni_idlock); + rv = nni_idhash_find(nni_sockets, id, (void **) &sock); + if (rv != 0) { + nni_mtx_unlock(nni_idlock); + return (NNG_ECLOSED); + } + sock->s_closed = 1; + sock->s_refcnt++; + while (sock->s_refcnt != 1) { + nni_cv_wait(&sock->s_refcv); + } + nni_mtx_unlock(nni_idlock); + *sockp = sock; + + return (0); } @@ -290,6 +314,10 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) goto fail; } + if ((rv = nni_cv_init(&sock->s_refcv, nni_idlock)) != 0) { + goto fail; + } + rv = nni_ev_init(&sock->s_recv_ev, NNG_EV_CAN_RECV, sock); if (rv != 0) { goto fail; @@ -355,6 +383,11 @@ fail: if (sock->s_id != 0) { nni_mtx_lock(nni_idlock); nni_idhash_remove(nni_sockets, sock->s_id); + if (nni_idhash_count(nni_sockets) == 0) { + nni_idhash_reclaim(nni_pipes); + nni_idhash_reclaim(nni_endpoints); + nni_idhash_reclaim(nni_sockets); + } nni_mtx_unlock(nni_idlock); } nni_thr_fini(&sock->s_notifier); @@ -363,6 +396,7 @@ fail: nni_ev_fini(&sock->s_recv_ev); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); + nni_cv_fini(&sock->s_refcv); nni_cv_fini(&sock->s_notify_cv); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_notify_mx); @@ -472,7 +506,8 @@ nni_sock_shutdown(nni_sock *sock) // nni_sock_close shuts down the socket, then releases any resources // associated with it. It is a programmer error to reference the socket // after this function is called, as the pointer may reference invalid -// memory or other objects. +// memory or other objects. The socket should have been acquired with +// nni_sock_hold_close(). void nni_sock_close(nni_sock *sock) { @@ -482,11 +517,6 @@ nni_sock_close(nni_sock *sock) // Shutdown everything if not already done. This operation // is idempotent. nni_sock_shutdown(sock); - nni_mtx_lock(&sock->s_mx); - while (sock->s_refcnt > 1) { - nni_cv_wait(&sock->s_cv); - } - nni_mtx_unlock(&sock->s_mx); // At this point nothing else should be referencing us. // As with UNIX close, it is a gross error for the caller @@ -497,6 +527,12 @@ nni_sock_close(nni_sock *sock) nni_mtx_lock(nni_idlock); nni_idhash_remove(nni_sockets, sock->s_id); + if (nni_idhash_count(nni_sockets) == 0) { + nni_idhash_reclaim(nni_pipes); + nni_idhash_reclaim(nni_endpoints); + nni_idhash_reclaim(nni_sockets); + } + nni_mtx_unlock(nni_idlock); // The protocol needs to clean up its state. @@ -516,6 +552,7 @@ nni_sock_close(nni_sock *sock) nni_msgq_fini(sock->s_uwq); nni_ev_fini(&sock->s_send_ev); nni_ev_fini(&sock->s_recv_ev); + nni_cv_fini(&sock->s_refcv); nni_cv_fini(&sock->s_notify_cv); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_notify_mx); diff --git a/src/core/socket.h b/src/core/socket.h index 2a30fae5..2f34a038 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -19,6 +19,7 @@ struct nni_socket { uint32_t s_id; uint32_t s_refcnt; + nni_cv s_refcv; nni_msgq * s_uwq; // Upper write queue nni_msgq * s_urq; // Upper read queue @@ -52,6 +53,7 @@ struct nni_socket { int s_ep_pend; // EP dial/listen in progress int s_closing; // Socket is closing + int s_closed; // Socket closed int s_reapexit; // Shutdown the reaper int s_besteffort; // Best effort mode delivery int s_senderr; // Protocol state machine use @@ -64,6 +66,7 @@ struct nni_socket { }; extern int nni_sock_hold(nni_sock **, uint32_t); +extern int nni_sock_hold_close(nni_sock **, uint32_t); extern void nni_sock_rele(nni_sock *); extern int nni_sock_open(nni_sock **, uint16_t); extern void nni_sock_close(nni_sock *); @@ -56,7 +56,9 @@ nng_close(nng_socket sid) int rv; nni_sock *sock; - if ((rv = nni_sock_hold(&sock, sid)) != 0) { + // Close is special, because we still want to be able to get + // a hold on the socket even if shutdown was called. + if ((rv = nni_sock_hold_close(&sock, sid)) != 0) { return (rv); } // No release -- close releases it. @@ -607,3 +609,10 @@ nng_device(nng_socket sock1, nng_socket sock2) // Device TBD. return (NNG_ENOTSUP); } + + +void +nng_usleep(uint64_t usec) +{ + nni_usleep(usec); +} @@ -372,6 +372,12 @@ NNG_DECL int64_t nng_stat_value(nng_stat *); // which means that messages from one side are forwarded to the other. NNG_DECL int nng_device(nng_socket, nng_socket); +// Sleep for the specified usecs. This is intended for use by test +// programs (to avoid needing to expose the rest of the private details). +// Applications are discouraged from using this -- use your platform +// time services instead. +NNG_DECL void nng_usleep(uint64_t); + // Pollset functionality. TBD. (Note that I'd rather avoid this // altogether, because I believe that the notification mechanism I've // created offers a superior way to handle this. I don't think many diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index 986d8ed7..02467ffe 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -94,6 +94,7 @@ nni_bus_pipe_fini(void *arg) nni_bus_pipe *ppipe = arg; if (ppipe != NULL) { + nni_msgq_fini(ppipe->sendq); NNI_FREE_STRUCT(ppipe); } } diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 6410e6db..a34f6eea 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -26,7 +26,7 @@ struct nni_rep_sock { nni_msgq * urq; int raw; int ttl; - nni_idhash * pipes; + nni_idhash pipes; char * btrace; size_t btrace_len; }; @@ -53,7 +53,7 @@ nni_rep_sock_init(void **repp, nni_sock *sock) rep->raw = 0; rep->btrace = NULL; rep->btrace_len = 0; - if ((rv = nni_idhash_create(&rep->pipes)) != 0) { + if ((rv = nni_idhash_init(&rep->pipes)) != 0) { NNI_FREE_STRUCT(rep); return (rv); } @@ -73,7 +73,7 @@ nni_rep_sock_fini(void *arg) nni_rep_sock *rep = arg; if (rep != NULL) { - nni_idhash_destroy(rep->pipes); + nni_idhash_fini(&rep->pipes); if (rep->btrace != NULL) { nni_free(rep->btrace, rep->btrace_len); } @@ -121,7 +121,7 @@ nni_rep_pipe_add(void *arg) nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - return (nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp)); + return (nni_idhash_insert(&rep->pipes, nni_pipe_id(rp->pipe), rp)); } @@ -131,7 +131,7 @@ nni_rep_pipe_rem(void *arg) nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe)); + nni_idhash_remove(&rep->pipes, nni_pipe_id(rp->pipe)); } @@ -166,7 +166,7 @@ nni_rep_sock_send(void *arg) nni_msg_trim_header(msg, 4); nni_mtx_lock(mx); - if (nni_idhash_find(rep->pipes, id, (void **) &rp) != 0) { + if (nni_idhash_find(&rep->pipes, id, (void **) &rp) != 0) { nni_mtx_unlock(mx); nni_msg_free(msg); continue; diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 5603b675..263e023a 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -24,7 +24,7 @@ struct nni_resp_sock { nni_sock * nsock; int raw; int ttl; - nni_idhash * pipes; + nni_idhash pipes; char * btrace; size_t btrace_len; }; @@ -51,7 +51,7 @@ nni_resp_sock_init(void **pp, nni_sock *nsock) psock->raw = 0; psock->btrace = NULL; psock->btrace_len = 0; - if ((rv = nni_idhash_create(&psock->pipes)) != 0) { + if ((rv = nni_idhash_init(&psock->pipes)) != 0) { NNI_FREE_STRUCT(psock); return (rv); } @@ -68,7 +68,7 @@ nni_resp_sock_fini(void *arg) nni_resp_sock *psock = arg; if (psock != NULL) { - nni_idhash_destroy(psock->pipes); + nni_idhash_fini(&psock->pipes); if (psock->btrace != NULL) { nni_free(psock->btrace, psock->btrace_len); } @@ -117,7 +117,7 @@ nni_resp_pipe_add(void *arg) nni_resp_sock *psock = ppipe->psock; int rv; - rv = nni_idhash_insert(psock->pipes, nni_pipe_id(ppipe->npipe), ppipe); + rv = nni_idhash_insert(&psock->pipes, nni_pipe_id(ppipe->npipe), ppipe); return (rv); } @@ -128,7 +128,7 @@ nni_resp_pipe_rem(void *arg) nni_resp_pipe *ppipe = arg; nni_resp_sock *psock = ppipe->psock; - nni_idhash_remove(psock->pipes, nni_pipe_id(ppipe->npipe)); + nni_idhash_remove(&psock->pipes, nni_pipe_id(ppipe->npipe)); } @@ -163,7 +163,7 @@ nni_resp_sock_send(void *arg) nni_msg_trim_header(msg, 4); nni_mtx_lock(mx); - if (nni_idhash_find(psock->pipes, id, (void **) &ppipe) != 0) { + if (nni_idhash_find(&psock->pipes, id, (void **) &ppipe) != 0) { nni_mtx_unlock(mx); nni_msg_free(msg); continue; |
