aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/core/nng_impl.h4
-rw-r--r--src/core/objhash.c451
-rw-r--r--src/core/objhash.h58
-rw-r--r--src/core/socket.c314
-rw-r--r--src/core/socket.h22
-rw-r--r--src/nng_compat.c1
7 files changed, 156 insertions, 696 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 31abc042..ad6c025e 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -53,8 +53,6 @@ set (NNG_SOURCES
core/msgqueue.c
core/msgqueue.h
core/nng_impl.h
- core/objhash.c
- core/objhash.h
core/options.c
core/options.h
core/panic.c
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index 8b76b036..8510dfb5 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -1,5 +1,6 @@
//
-// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -33,7 +34,6 @@
#include "core/list.h"
#include "core/message.h"
#include "core/msgqueue.h"
-#include "core/objhash.h"
#include "core/options.h"
#include "core/panic.h"
#include "core/protocol.h"
diff --git a/src/core/objhash.c b/src/core/objhash.c
deleted file mode 100644
index acdcd2c3..00000000
--- a/src/core/objhash.c
+++ /dev/null
@@ -1,451 +0,0 @@
-//
-// Copyright 2016 Garrett D'Amore <garrett@damore.org>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/objhash.h"
-#include "core/nng_impl.h"
-
-#include <string.h>
-
-// The details of the nni_objhash are "private".
-struct nni_objhash {
- size_t oh_cap;
- size_t oh_count;
- size_t oh_load;
- size_t oh_minload; // considers placeholders
- size_t oh_maxload;
- uint32_t oh_minval;
- uint32_t oh_maxval;
- uint32_t oh_dynval;
- nni_mtx oh_lock;
- nni_cv oh_cv;
- nni_objhash_node *oh_nodes;
- nni_objhash_ctor oh_ctor;
- nni_objhash_dtor oh_dtor;
-};
-
-struct nni_objhash_node {
- uint32_t on_id; // the key
- uint32_t on_skips; // indicates
- uint32_t on_refcnt; // reference count
- void * on_val; // pointer to user data
-};
-
-int
-nni_objhash_init(
- nni_objhash **ohp, nni_objhash_ctor ctor, nni_objhash_dtor dtor)
-{
- nni_objhash *oh;
- int rv;
-
- if ((ctor == NULL) || (dtor == NULL)) {
- return (NNG_EINVAL);
- }
-
- if ((oh = NNI_ALLOC_STRUCT(oh)) == NULL) {
- return (NNG_ENOMEM);
- }
-
- if ((rv = nni_mtx_init(&oh->oh_lock)) != 0) {
- NNI_FREE_STRUCT(oh);
- return (rv);
- }
-
- if ((rv = nni_cv_init(&oh->oh_cv, &oh->oh_lock)) != 0) {
- nni_mtx_fini(&oh->oh_lock);
- NNI_FREE_STRUCT(oh);
- return (rv);
- }
-
- oh->oh_nodes = NULL;
- oh->oh_count = 0;
- oh->oh_load = 0;
- oh->oh_cap = 0;
- oh->oh_maxload = 0;
- oh->oh_minload = 0; // never shrink below this
- oh->oh_minval = 1;
- oh->oh_maxval = 0x7fffffff;
- oh->oh_dynval =
- nni_random() % (oh->oh_maxval - oh->oh_minval) + oh->oh_minval;
- oh->oh_ctor = ctor;
- oh->oh_dtor = dtor;
- *ohp = oh;
-
- return (0);
-}
-
-void
-nni_objhash_fini(nni_objhash *oh)
-{
- if (oh == NULL) {
- return;
- }
- if (oh->oh_nodes != NULL) {
- nni_free(oh->oh_nodes, oh->oh_cap * sizeof(nni_objhash_node));
- oh->oh_nodes = NULL;
- oh->oh_cap = oh->oh_count = 0;
- oh->oh_load = oh->oh_minload = oh->oh_maxload = 0;
- }
- nni_cv_fini(&oh->oh_cv);
- nni_mtx_fini(&oh->oh_lock);
- NNI_FREE_STRUCT(oh);
-}
-
-// Inspired by Python dict implementation. This probe will visit every
-// cell. We always hash consecutively assigned IDs.
-#define NNI_OBJHASH_NEXTPROBE(h, j) ((((j) *5) + 1) & (h->oh_cap - 1))
-
-// nni_objhash_find_node finds the object hash node associated with a given id.
-// The object hash lock must be held by the caller.
-static nni_objhash_node *
-nni_objhash_find_node(nni_objhash *oh, uint32_t id)
-{
- uint32_t index;
- nni_objhash_node *node;
-
- if (oh->oh_count == 0) {
- return (NULL);
- }
-
- index = id & (oh->oh_cap - 1);
-
- for (;;) {
- node = &oh->oh_nodes[index];
-
- if ((node->on_val == NULL) && (node->on_skips == 0)) {
- return (NULL);
- }
- if (node->on_id == id) {
- return (node);
- }
- index = NNI_OBJHASH_NEXTPROBE(oh, index);
- }
-}
-
-// nni_objhash_find looks up the object, and bumps the reference on it.
-// The caller should drop the reference when done by calling nni_objhash_unref.
-int
-nni_objhash_find(nni_objhash *oh, uint32_t id, void **valp)
-{
- nni_objhash_node *node;
- int rv;
-
- nni_mtx_lock(&oh->oh_lock);
- node = nni_objhash_find_node(oh, id);
-
- if ((node != NULL) && (node->on_val != NULL)) {
- if (valp != NULL) {
- *valp = node->on_val;
- }
- node->on_refcnt++;
- rv = 0;
- } else {
- rv = NNG_ENOENT;
- }
- nni_mtx_unlock(&oh->oh_lock);
- return (rv);
-}
-
-// Resize the object hash. This is called internally with the lock
-// for the object hash held. Grow indicates that this is being called
-// from a function that intends to add data, so extra space is needed.
-static int
-nni_objhash_resize(nni_objhash *oh, int grow)
-{
- size_t newsize;
- size_t oldsize;
- nni_objhash_node *newnodes;
- nni_objhash_node *oldnodes;
- uint32_t i;
-
- if ((!grow) && (oh->oh_count == 0) && (oh->oh_cap != 0)) {
- // Table is empty, and we are unrefing. Lets reclaim the
- // space. Note that this means that allocations which
- // fluctuate between one and zero are going to bang on the
- // allocator a bit. Since such cases should not be very
- // performance sensitive, this is probably okay.
- nni_free(oh->oh_nodes, oh->oh_cap * sizeof(nni_objhash_node));
- oh->oh_cap = 0;
- oh->oh_nodes = NULL;
- oh->oh_minload = 0;
- oh->oh_maxload = 0;
- return (0);
- }
-
- if ((oh->oh_load < oh->oh_maxload) &&
- (oh->oh_load >= oh->oh_minload)) {
- // No resize needed.
- return (0);
- }
-
- oldsize = oh->oh_cap;
- newsize = oh->oh_cap;
-
- newsize = 8;
- while (newsize < (oh->oh_count * 2)) {
- newsize *= 2;
- }
-
- oldnodes = oh->oh_nodes;
- newnodes = nni_alloc(sizeof(nni_objhash_node) * newsize);
- if (newnodes == NULL) {
- return (NNG_ENOMEM);
- }
- memset(newnodes, 0, sizeof(nni_objhash_node) * newsize);
-
- oh->oh_nodes = newnodes;
- oh->oh_cap = newsize;
- if (newsize > 8) {
- oh->oh_minload = newsize / 8;
- oh->oh_maxload = newsize * 2 / 3;
- } else {
- oh->oh_minload = 0;
- oh->oh_maxload = 5;
- }
- for (i = 0; i < oldsize; i++) {
- uint32_t index;
- if (oldnodes[i].on_val == NULL) {
- continue;
- }
- index = oldnodes[i].on_id & (newsize - 1);
- for (;;) {
- if (newnodes[index].on_val == NULL) {
- oh->oh_load++;
- newnodes[index].on_val = oldnodes[i].on_val;
- newnodes[index].on_id = oldnodes[i].on_id;
- newnodes[index].on_refcnt =
- oldnodes[i].on_refcnt;
- break;
- }
- newnodes[index].on_skips++;
- index = NNI_OBJHASH_NEXTPROBE(oh, index);
- }
- }
- if (oldsize != 0) {
- nni_free(oldnodes, sizeof(nni_objhash_node) * oldsize);
- }
- return (0);
-}
-
-void
-nni_objhash_unref(nni_objhash *oh, uint32_t id)
-{
- void * val;
- uint32_t index;
- nni_objhash_node *node;
- nni_objhash_dtor dtor;
-
- nni_mtx_lock(&oh->oh_lock);
-
- dtor = oh->oh_dtor;
-
- node = nni_objhash_find_node(oh, id);
- NNI_ASSERT(node != NULL);
- val = node->on_val;
-
- NNI_ASSERT(node->on_refcnt > 0);
- NNI_ASSERT(node->on_refcnt < 1000000); // reasonable limit, debug only
- node->on_refcnt--;
-
- // If we have further references, we are done, except that if we have
- // only one remaining reference, we might want to wake up another
- // thread blocked in nni_objhash_unref_wait.
- if (node->on_refcnt != 0) {
- if (node->on_refcnt == 1) {
- nni_cv_wake(&oh->oh_cv);
- }
- nni_mtx_unlock(&oh->oh_lock);
- return;
- }
-
- NNI_ASSERT(node->on_refcnt == 0);
- index = id & (oh->oh_cap - 1);
- for (;;) {
- node = &oh->oh_nodes[index];
- if (node->on_id == id) {
- break;
- }
-
- NNI_ASSERT(node->on_skips != 0);
- node->on_skips--;
- if ((node->on_val == NULL) && (node->on_skips == 0)) {
- oh->oh_load--;
- }
- index = NNI_OBJHASH_NEXTPROBE(oh, index);
- }
-
- NNI_ASSERT(node->on_val != NULL);
- NNI_ASSERT(node->on_refcnt == 0);
- NNI_ASSERT(node->on_id == id);
-
- node->on_val = NULL;
- oh->oh_count--;
- if (node->on_skips == 0) {
- oh->oh_load--;
- }
- // Reclaim the buffer if we want, but preserve the limits.
- nni_objhash_resize(oh, 0);
-
- nni_mtx_unlock(&oh->oh_lock);
-
- // Now run the destructor.
- dtor(val);
-}
-
-void
-nni_objhash_unref_wait(nni_objhash *oh, uint32_t id)
-{
- void * val;
- uint32_t index;
- nni_objhash_node *node;
- nni_objhash_dtor dtor;
-
- nni_mtx_lock(&oh->oh_lock);
-
- dtor = oh->oh_dtor;
-
- node = nni_objhash_find_node(oh, id);
- NNI_ASSERT(node != NULL);
- NNI_ASSERT(node->on_refcnt > 0);
- val = node->on_val;
-
- while (node->on_refcnt != 1) {
- nni_cv_wait(&oh->oh_cv);
- // If the table resizes, it can invalidate our old node.
- node = nni_objhash_find_node(oh, id);
- }
- node->on_refcnt--;
- NNI_ASSERT(node->on_refcnt == 0);
-
- index = id & (oh->oh_cap - 1);
- for (;;) {
- node = &oh->oh_nodes[index];
- if (node->on_id == id) {
- break;
- }
-
- NNI_ASSERT(node->on_skips != 0);
- node->on_skips--;
- if ((node->on_val == NULL) && (node->on_skips == 0)) {
- oh->oh_load--;
- }
- index = NNI_OBJHASH_NEXTPROBE(oh, index);
- }
-
- NNI_ASSERT(node->on_val != NULL);
- NNI_ASSERT(node->on_refcnt == 0);
- NNI_ASSERT(node->on_id == id);
-
- node->on_val = NULL;
- oh->oh_count--;
- if (node->on_skips == 0) {
- oh->oh_load--;
- }
- // Reclaim the buffer if we want, but preserve the limits.
- nni_objhash_resize(oh, 0);
-
- nni_mtx_unlock(&oh->oh_lock);
-
- // Now run the destructor.
- dtor(val);
-}
-
-// Allocate a new object hash entry. Note that this will execute the
-// constructor with the object hash lock held. Consequently, code that
-// runs the constructor must not run for long periods of time, since that
-// can block all other uses of the object hash.
-int
-nni_objhash_alloc(nni_objhash *oh, uint32_t *idp, void **valp)
-{
- uint32_t id;
- uint32_t index;
- nni_objhash_node *node;
-
- nni_mtx_lock(&oh->oh_lock);
-
- if (oh->oh_count > (oh->oh_maxval - oh->oh_minval)) {
- // Really more like ENOSPC.. the table is filled to max.
- nni_mtx_unlock(&oh->oh_lock);
- return (NNG_ENOMEM);
- }
-
- nni_objhash_resize(oh, 1);
-
- for (;;) {
- id = oh->oh_dynval;
- oh->oh_dynval++;
- if ((oh->oh_dynval > oh->oh_maxval) ||
- (oh->oh_dynval < oh->oh_minval)) {
- oh->oh_dynval = oh->oh_minval;
- }
-
- if (nni_objhash_find_node(oh, id) == NULL) {
- // We can use this ID, great!
- break;
- }
- }
-
- // We know the ID we're going to use, but we have to walk again,
- // because we need to note whether we had to skip (probe), and mark
- // them so they don't get nuked along the way.
- // check to see if anything is located there.
- index = id & (oh->oh_cap - 1);
- for (;;) {
- node = &oh->oh_nodes[index];
- if (node->on_val == NULL) {
- break;
- }
- NNI_ASSERT(node->on_id != id);
- node->on_skips++;
- index = NNI_OBJHASH_NEXTPROBE(oh, index);
- }
-
- NNI_ASSERT(node->on_refcnt == 0);
- node->on_id = id;
- node->on_refcnt++;
-
- node->on_val = oh->oh_ctor(id);
-
- if (node->on_val == NULL) {
- // Constructor failed; walk *again* to undo the skip
- // increments.
- node->on_refcnt--;
- index = id & (oh->oh_cap - 1);
- for (;;) {
- node = &oh->oh_nodes[index];
- if (node->on_val == NULL) {
- NNI_ASSERT(node->on_id == id);
- break;
- }
- NNI_ASSERT(node->on_skips != 0);
- node->on_skips--;
- index = NNI_OBJHASH_NEXTPROBE(oh, index);
- }
-
- nni_mtx_unlock(&oh->oh_lock);
- return (NNG_ENOMEM); // no other return from ctor
- }
-
- oh->oh_count++;
- if (node->on_skips == 0) {
- oh->oh_load++;
- }
- *valp = node->on_val;
- *idp = id;
-
- NNI_ASSERT(node->on_refcnt == 1);
-
- nni_mtx_unlock(&oh->oh_lock);
- return (0);
-}
-
-size_t
-nni_objhash_count(nni_objhash *oh)
-{
- return (oh->oh_count);
-}
diff --git a/src/core/objhash.h b/src/core/objhash.h
deleted file mode 100644
index 97f666b8..00000000
--- a/src/core/objhash.h
+++ /dev/null
@@ -1,58 +0,0 @@
-//
-// Copyright 2016 Garrett D'Amore <garrett@damore.org>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#ifndef CORE_OBJHASH_H
-#define CORE_OBJHASH_H
-
-#include "core/nng_impl.h"
-
-// Object Hash. This is a generic object manager, which lets us deal
-// with reference counting of objects, and provides a unique ID for
-// objects that will not generally be reused. Object Hash manages it's
-// own locking. Object IDs start from a random positive value, and
-// generally increment. The ID assigned to an object will always be
-// positive.
-//
-// Similar to our linked lists, consumers must supply a node structure
-// in their object. The implementation uses this for reference counting
-// and so forth.
-//
-// In terms of implementation, the underlying hash uses open addressing,
-// combined with an improved probe (taken from Python) to avoid collisions.
-// Our algorithm just uses the low order bits, and we use table sizes that
-// are powers of two to make the modulo dirt cheap.
-//
-
-typedef struct nni_objhash nni_objhash;
-typedef struct nni_objhash_node nni_objhash_node;
-
-// Object constructor function. This is expected to allocate an object.
-// It takes the generated object ID as an argument, which it can store on
-// the object itself. It should return NULL if resources cannot be allocated;
-// there are no other valid reasons for this to fail.
-typedef void *(*nni_objhash_ctor)(uint32_t);
-
-// Object destructor function. This should release any resources and perform
-// any other deinitialization.
-typedef void (*nni_objhash_dtor)(void *);
-
-// nni_objhash_init initializes the object hash; the constructor and and
-// destructor functions are supplied.
-extern int nni_objhash_init(
- nni_objhash **, nni_objhash_ctor, nni_objhash_dtor);
-
-extern void nni_objhash_fini(nni_objhash *);
-
-extern int nni_objhash_find(nni_objhash *, uint32_t, void **);
-extern void nni_objhash_unref(nni_objhash *, uint32_t);
-extern void nni_objhash_unref_wait(nni_objhash *, uint32_t);
-extern int nni_objhash_alloc(nni_objhash *, uint32_t *, void **);
-extern size_t nni_objhash_count(nni_objhash *);
-
-#endif // CORE_OBJHASH_H
diff --git a/src/core/socket.c b/src/core/socket.c
index 240af252..46ed2100 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -14,9 +14,9 @@
// Socket implementation.
-static nni_objhash *nni_socks = NULL;
-static nni_list nni_sock_list;
-static nni_mtx nni_sock_lk;
+static nni_list nni_sock_list;
+static nni_idhash *nni_sock_hash;
+static nni_mtx nni_sock_lk;
uint32_t
nni_sock_id(nni_sock *s)
@@ -42,52 +42,53 @@ int
nni_sock_find(nni_sock **sockp, uint32_t id)
{
int rv;
- nni_sock *sock;
+ nni_sock *s;
if ((rv = nni_init()) != 0) {
return (rv);
}
- if ((rv = nni_objhash_find(nni_socks, id, (void **) &sock)) != 0) {
- return (rv);
- }
- nni_mtx_lock(&sock->s_mx);
- if ((sock->s_closed) || (sock->s_data == NULL)) {
- nni_objhash_unref(nni_socks, id);
- nni_mtx_unlock(&sock->s_mx);
- return (NNG_ECLOSED);
- }
- nni_mtx_unlock(&sock->s_mx);
-
- if (sockp != NULL) {
- *sockp = sock;
+ nni_mtx_lock(&nni_sock_lk);
+ if ((rv = nni_idhash_find(nni_sock_hash, id, (void **) &s)) == 0) {
+ if (s->s_closed) {
+ rv = NNG_ECLOSED;
+ } else {
+ s->s_refcnt++;
+ *sockp = s;
+ }
}
+ nni_mtx_unlock(&nni_sock_lk);
- return (0);
+ return (rv);
}
void
-nni_sock_rele(nni_sock *sock)
+nni_sock_rele(nni_sock *s)
{
- nni_objhash_unref(nni_socks, sock->s_id);
+ nni_mtx_lock(&s->s_mx);
+ s->s_refcnt--;
+ if (s->s_closing) {
+ nni_cv_wake(&s->s_cv);
+ }
+ nni_mtx_unlock(&s->s_mx);
}
static int
nni_sock_pipe_start(nni_pipe *pipe)
{
- nni_sock *sock = pipe->p_sock;
+ nni_sock *s = pipe->p_sock;
void * pdata = nni_pipe_get_proto_data(pipe);
int rv;
- NNI_ASSERT(sock != NULL);
- if (sock->s_closing) {
+ NNI_ASSERT(s != NULL);
+ if (s->s_closing) {
// We're closing, bail out.
return (NNG_ECLOSED);
}
- if (nni_pipe_peer(pipe) != sock->s_peer_id.p_id) {
+ if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) {
// Peer protocol mismatch.
return (NNG_EPROTO);
}
- if ((rv = sock->s_pipe_ops.pipe_start(pdata)) != 0) {
+ if ((rv = s->s_pipe_ops.pipe_start(pdata)) != 0) {
// Protocol rejection for other reasons.
// E.g. pair and already have active connected partner.
return (rv);
@@ -113,39 +114,38 @@ nni_sock_pipe_start_cb(void *arg)
}
int
-nni_sock_pipe_add(nni_sock *sock, nni_ep *ep, nni_pipe *pipe)
+nni_sock_pipe_add(nni_sock *s, nni_ep *ep, nni_pipe *pipe)
{
int rv;
// Initialize protocol pipe data.
- nni_mtx_lock(&sock->s_mx);
+ nni_mtx_lock(&s->s_mx);
nni_mtx_lock(&ep->ep_mtx);
- if ((sock->s_closing) || (ep->ep_closed)) {
+ if ((s->s_closing) || (ep->ep_closed)) {
nni_mtx_unlock(&ep->ep_mtx);
- nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_unlock(&s->s_mx);
return (NNG_ECLOSED);
}
rv = nni_aio_init(&pipe->p_start_aio, nni_sock_pipe_start_cb, pipe);
if (rv != 0) {
nni_mtx_unlock(&ep->ep_mtx);
- nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_unlock(&s->s_mx);
return (rv);
}
- rv = sock->s_pipe_ops.pipe_init(
- &pipe->p_proto_data, pipe, sock->s_data);
+ rv = s->s_pipe_ops.pipe_init(&pipe->p_proto_data, pipe, s->s_data);
if (rv != 0) {
nni_mtx_unlock(&ep->ep_mtx);
- nni_mtx_lock(&sock->s_mx);
+ nni_mtx_lock(&s->s_mx);
return (rv);
}
// Save the protocol destructor.
- pipe->p_proto_dtor = sock->s_pipe_ops.pipe_fini;
- pipe->p_sock = sock;
+ pipe->p_proto_dtor = s->s_pipe_ops.pipe_fini;
+ pipe->p_sock = s;
pipe->p_ep = ep;
- nni_list_append(&sock->s_pipes, pipe);
+ nni_list_append(&s->s_pipes, pipe);
nni_list_append(&ep->ep_pipes, pipe);
// Start the initial negotiation I/O...
@@ -159,7 +159,7 @@ nni_sock_pipe_add(nni_sock *sock, nni_ep *ep, nni_pipe *pipe)
}
nni_mtx_unlock(&ep->ep_mtx);
- nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_unlock(&s->s_mx);
return (0);
}
@@ -305,92 +305,83 @@ nni_sock_unnotify(nni_sock *sock, nni_notify *notify)
NNI_FREE_STRUCT(notify);
}
-static void *
-nni_sock_ctor(uint32_t id)
+static void
+nni_sock_destroy(nni_sock *s)
{
- int rv;
- nni_sock *sock;
-
- if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
- return (NULL);
- }
- // s_protocol, s_peer, and s_flags undefined as yet.
- sock->s_linger = 0;
- sock->s_sndtimeo = -1;
- sock->s_rcvtimeo = -1;
- sock->s_closing = 0;
- sock->s_reconn = NNI_SECOND;
- sock->s_reconnmax = 0;
- sock->s_rcvmaxsz = 1024 * 1024; // 1 MB by default
- sock->s_id = id;
- NNI_LIST_NODE_INIT(&sock->s_node);
-
- nni_pipe_sock_list_init(&sock->s_pipes);
-
- nni_ep_list_init(&sock->s_eps);
-
- sock->s_send_fd.sn_init = 0;
- sock->s_recv_fd.sn_init = 0;
-
- if (((rv = nni_mtx_init(&sock->s_mx)) != 0) ||
- ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0)) {
- goto fail;
+ if (s == NULL) {
+ return;
}
- rv = nni_ev_init(&sock->s_recv_ev, NNG_EV_CAN_RCV, sock);
- if (rv != 0) {
- goto fail;
+ // Close any open notification pipes.
+ if (s->s_recv_fd.sn_init) {
+ nni_plat_pipe_close(s->s_recv_fd.sn_wfd, s->s_recv_fd.sn_rfd);
}
- rv = nni_ev_init(&sock->s_send_ev, NNG_EV_CAN_SND, sock);
- if (rv != 0) {
- goto fail;
+ if (s->s_send_fd.sn_init) {
+ nni_plat_pipe_close(s->s_send_fd.sn_wfd, s->s_send_fd.sn_rfd);
}
- if (((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) ||
- ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0)) {
- goto fail;
+ // The protocol needs to clean up its state.
+ if (s->s_data != NULL) {
+ s->s_sock_ops.sock_fini(s->s_data);
}
- return (sock);
-
-fail:
- nni_ev_fini(&sock->s_send_ev);
- nni_ev_fini(&sock->s_recv_ev);
- nni_msgq_fini(sock->s_urq);
- nni_msgq_fini(sock->s_uwq);
- nni_cv_fini(&sock->s_cv);
- nni_mtx_fini(&sock->s_mx);
- NNI_FREE_STRUCT(sock);
- return (NULL);
+ nni_ev_fini(&s->s_send_ev);
+ nni_ev_fini(&s->s_recv_ev);
+ nni_msgq_fini(s->s_urq);
+ nni_msgq_fini(s->s_uwq);
+ nni_cv_fini(&s->s_cv);
+ nni_mtx_fini(&s->s_mx);
+ NNI_FREE_STRUCT(s);
}
-static void
-nni_sock_dtor(void *ptr)
+static int
+nni_sock_create(nni_sock **sp, const nni_proto *proto)
{
- nni_sock *sock = ptr;
-
- // Close any open notification pipes.
- if (sock->s_recv_fd.sn_init) {
- nni_plat_pipe_close(
- sock->s_recv_fd.sn_wfd, sock->s_recv_fd.sn_rfd);
- }
- if (sock->s_send_fd.sn_init) {
- nni_plat_pipe_close(
- sock->s_send_fd.sn_wfd, sock->s_send_fd.sn_rfd);
- }
+ int rv;
+ nni_sock *s;
- // The protocol needs to clean up its state.
- if (sock->s_data != NULL) {
- sock->s_sock_ops.sock_fini(sock->s_data);
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ s->s_linger = 0;
+ s->s_sndtimeo = -1;
+ s->s_rcvtimeo = -1;
+ s->s_closing = 0;
+ s->s_reconn = NNI_SECOND;
+ s->s_reconnmax = 0;
+ s->s_rcvmaxsz = 1024 * 1024; // 1 MB by default
+ s->s_id = 0;
+ s->s_refcnt = 0;
+ s->s_send_fd.sn_init = 0;
+ s->s_recv_fd.sn_init = 0;
+ s->s_self_id = proto->proto_self;
+ s->s_peer_id = proto->proto_peer;
+ s->s_flags = proto->proto_flags;
+ s->s_sock_ops = *proto->proto_sock_ops;
+ s->s_pipe_ops = *proto->proto_pipe_ops;
+
+ NNI_ASSERT(s->s_sock_ops.sock_open != NULL);
+ NNI_ASSERT(s->s_sock_ops.sock_close != NULL);
+
+ NNI_ASSERT(s->s_pipe_ops.pipe_start != NULL);
+ NNI_ASSERT(s->s_pipe_ops.pipe_stop != NULL);
+
+ NNI_LIST_NODE_INIT(&s->s_node);
+ nni_pipe_sock_list_init(&s->s_pipes);
+ nni_ep_list_init(&s->s_eps);
+
+ if (((rv = nni_mtx_init(&s->s_mx)) != 0) ||
+ ((rv = nni_cv_init(&s->s_cv, &s->s_mx)) != 0) ||
+ ((rv = nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s)) != 0) ||
+ ((rv = nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s)) != 0) ||
+ ((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
+ ((rv = nni_msgq_init(&s->s_urq, 0)) != 0) ||
+ ((rv = s->s_sock_ops.sock_init(&s->s_data, s)) != 0)) {
+ nni_sock_destroy(s);
+ return (rv);
}
-
- nni_ev_fini(&sock->s_send_ev);
- nni_ev_fini(&sock->s_recv_ev);
- nni_msgq_fini(sock->s_urq);
- nni_msgq_fini(sock->s_uwq);
- nni_cv_fini(&sock->s_cv);
- nni_mtx_fini(&sock->s_mx);
- NNI_FREE_STRUCT(sock);
+ *sp = s;
+ return (rv);
}
int
@@ -399,10 +390,11 @@ nni_sock_sys_init(void)
int rv;
NNI_LIST_INIT(&nni_sock_list, nni_sock, s_node);
- if (((rv = nni_objhash_init(
- &nni_socks, nni_sock_ctor, nni_sock_dtor)) != 0) ||
+ if (((rv = nni_idhash_init(&nni_sock_hash)) != 0) ||
((rv = nni_mtx_init(&nni_sock_lk)) != 0)) {
nni_sock_sys_fini();
+ } else {
+ nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1);
}
return (rv);
}
@@ -410,15 +402,14 @@ nni_sock_sys_init(void)
void
nni_sock_sys_fini(void)
{
- nni_objhash_fini(nni_socks);
- nni_socks = NULL;
+ nni_idhash_fini(nni_sock_hash);
nni_mtx_fini(&nni_sock_lk);
}
int
nni_sock_open(nni_sock **sockp, const nni_proto *proto)
{
- nni_sock *sock;
+ nni_sock *s = NULL;
int rv;
uint32_t sockid;
@@ -427,41 +418,23 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
return (NNG_ENOTSUP);
}
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
-
- rv = nni_objhash_alloc(nni_socks, &sockid, (void **) &sock);
- if (rv != 0) {
- return (rv);
- }
-
- // We make a copy of the protocol operations.
- sock->s_self_id = proto->proto_self;
- sock->s_peer_id = proto->proto_peer;
- sock->s_flags = proto->proto_flags;
- sock->s_sock_ops = *proto->proto_sock_ops;
- sock->s_pipe_ops = *proto->proto_pipe_ops;
-
- NNI_ASSERT(sock->s_sock_ops.sock_open != NULL);
- NNI_ASSERT(sock->s_sock_ops.sock_close != NULL);
-
- NNI_ASSERT(sock->s_pipe_ops.pipe_start != NULL);
- NNI_ASSERT(sock->s_pipe_ops.pipe_stop != NULL);
-
- if ((rv = sock->s_sock_ops.sock_init(&sock->s_data, sock)) != 0) {
- nni_objhash_unref(nni_socks, sockid);
+ if (((rv = nni_init()) != 0) ||
+ ((rv = nni_sock_create(&s, proto)) != 0)) {
+ nni_sock_destroy(s);
return (rv);
}
- sock->s_sock_ops.sock_open(sock->s_data);
-
nni_mtx_lock(&nni_sock_lk);
- nni_list_append(&nni_sock_list, sock);
+ if ((rv = nni_idhash_alloc(nni_sock_hash, &s->s_id, s)) != 0) {
+ nni_sock_destroy(s);
+ } else {
+ nni_list_append(&nni_sock_list, s);
+ s->s_sock_ops.sock_open(s->s_data);
+ *sockp = s;
+ }
nni_mtx_unlock(&nni_sock_lk);
- *sockp = sock;
- return (0);
+ return (rv);
}
// nni_sock_shutdown shuts down the socket; after this point no further
@@ -591,37 +564,39 @@ nni_sock_ep_remove(nni_sock *sock, nni_ep *ep)
// after this function is called, as the pointer may reference invalid
// memory or other objects.
void
-nni_sock_close(nni_sock *sock)
+nni_sock_close(nni_sock *s)
{
// Shutdown everything if not already done. This operation
// is idempotent.
- nni_sock_shutdown(sock);
+ nni_sock_shutdown(s);
- nni_mtx_lock(&sock->s_mx);
- if (sock->s_closed) {
- nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_lock(&nni_sock_lk);
+ if (s->s_closed) {
+ // Some other thread called close. All we need to do is
+ // drop our reference count.
+ nni_mtx_unlock(&nni_sock_lk);
+ nni_sock_rele(s);
return;
}
- sock->s_closed = 1;
- nni_mtx_unlock(&sock->s_mx);
+ s->s_closed = 1;
+ nni_idhash_remove(nni_sock_hash, s->s_id);
- nni_mtx_lock(&nni_sock_lk);
- nni_list_node_remove(&sock->s_node);
- nni_mtx_unlock(&nni_sock_lk);
+ // We might have been removed from the list already, e.g. by
+ // nni_sock_closeall. This is idempotent.
+ nni_list_node_remove(&s->s_node);
- // At this point nothing else should be referencing us.
- // As with UNIX close, it is a gross error for the caller
- // to have concurrent threads using this. We've taken care to
- // ensure that any active consumers have been stopped, but if
- // user code attempts to utilize the socket *after* this point,
- // the results may be tragic.
+ nni_mtx_unlock(&nni_sock_lk);
- // Unreference twice. First drops the reference our caller
- // acquired to start the open, and the second (blocking) one
- // is the reference created for us at socket creation.
+ // Wait for all other references to drop. Note that we
+ // have a reference already (from our caller).
+ nni_mtx_lock(&s->s_mx);
+ while ((s->s_refcnt > 1) || (!nni_list_empty(&s->s_pipes)) ||
+ (!nni_list_empty(&s->s_eps))) {
+ nni_cv_wait(&s->s_cv);
+ }
+ nni_mtx_unlock(&s->s_mx);
- nni_objhash_unref(nni_socks, sock->s_id);
- nni_objhash_unref_wait(nni_socks, sock->s_id);
+ nni_sock_destroy(s);
}
void
@@ -636,13 +611,12 @@ nni_sock_closeall(void)
nni_mtx_unlock(&nni_sock_lk);
return;
}
- id = s->s_id;
+ // Bump the reference count. The close call below will
+ // drop it.
+ s->s_refcnt++;
nni_list_node_remove(&s->s_node);
nni_mtx_unlock(&nni_sock_lk);
-
- if (nni_sock_find(&s, id) == 0) {
- nni_sock_close(s);
- }
+ nni_sock_close(s);
}
}
diff --git a/src/core/socket.h b/src/core/socket.h
index 6824b04b..76ebca12 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -15,41 +15,39 @@
// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR
// TRANSPORTS.
struct nni_socket {
- nni_mtx s_mx;
- nni_cv s_cv;
+ nni_list_node s_node;
+ nni_mtx s_mx;
+ nni_cv s_cv;
+ nni_cv s_close_cv;
uint32_t s_id;
+ uint32_t s_flags;
+ unsigned s_refcnt; // protected by global lock
+ void * s_data; // Protocol private
nni_msgq *s_uwq; // Upper write queue
nni_msgq *s_urq; // Upper read queue
- nni_list_node s_node;
-
nni_proto_id s_self_id;
nni_proto_id s_peer_id;
- uint32_t s_flags;
-
nni_proto_pipe_ops s_pipe_ops;
nni_proto_sock_ops s_sock_ops;
- void *s_data; // Protocol private
-
// XXX: options
nni_duration s_linger; // linger time
nni_duration s_sndtimeo; // send timeout
nni_duration s_rcvtimeo; // receive timeout
nni_duration s_reconn; // reconnect time
nni_duration s_reconnmax; // max reconnect time
+ size_t s_rcvmaxsz; // maximum receive size
nni_list s_eps; // active endpoints
nni_list s_pipes; // active pipes
- size_t s_rcvmaxsz; // maximum receive size
-
int s_ep_pend; // EP dial/listen in progress
int s_closing; // Socket is closing
- int s_closed; // Socket closed
+ int s_closed; // Socket closed, protected by global lock
int s_besteffort; // Best effort mode delivery
int s_senderr; // Protocol state machine use
int s_recverr; // Protocol state machine use
@@ -59,8 +57,6 @@ struct nni_socket {
nni_notifyfd s_send_fd;
nni_notifyfd s_recv_fd;
-
- uint32_t s_nextid; // Next Pipe ID.
};
extern int nni_sock_sys_init(void);
diff --git a/src/nng_compat.c b/src/nng_compat.c
index 34fc6553..e03ff42d 100644
--- a/src/nng_compat.c
+++ b/src/nng_compat.c
@@ -41,6 +41,7 @@ static struct {
{ NNG_EMSGSIZE, EMSGSIZE },
{ NNG_ECONNABORTED, ECONNABORTED },
{ NNG_ECONNRESET, ECONNRESET },
+ { NNG_ECANCELED, EBADF },
{ 0, 0 },
// clang-format on
};