summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/core/idhash.c87
-rw-r--r--src/core/idhash.h25
-rw-r--r--src/core/init.c26
-rw-r--r--src/core/init.h12
-rw-r--r--src/core/socket.c85
-rw-r--r--src/core/socket.h3
-rw-r--r--src/nng.c11
-rw-r--r--src/nng.h6
-rw-r--r--src/protocol/bus/bus.c1
-rw-r--r--src/protocol/reqrep/rep.c12
-rw-r--r--src/protocol/survey/respond.c12
12 files changed, 184 insertions, 98 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index e70074c2..2df849ae 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -32,7 +32,9 @@ set (NNG_SOURCES
core/clock.c
core/clock.h
core/endpt.c
+ core/endpt.h
core/event.c
+ core/event.h
core/idhash.c
core/idhash.h
core/init.c
diff --git a/src/core/idhash.c b/src/core/idhash.c
index 0b680bc9..07368a65 100644
--- a/src/core/idhash.c
+++ b/src/core/idhash.c
@@ -11,54 +11,58 @@
#include <string.h>
-typedef struct {
+struct nni_idhash_entry {
uint32_t ihe_key;
uint32_t ihe_skips;
void * ihe_val;
-} nni_idhash_entry;
-
-struct nni_idhash {
- uint32_t ih_cap;
- uint32_t ih_count;
- uint32_t ih_load;
- uint32_t ih_minload; // considers placeholders
- uint32_t ih_maxload;
- uint32_t ih_walkers;
- uint32_t ih_minval;
- uint32_t ih_maxval;
- uint32_t ih_dynval;
- nni_idhash_entry * ih_entries;
};
+
int
-nni_idhash_create(nni_idhash **hp)
+nni_idhash_init(nni_idhash *h)
{
- nni_idhash *h;
-
- if ((h = NNI_ALLOC_STRUCT(h)) == NULL) {
- return (NNG_ENOMEM);
- }
- h->ih_entries = nni_alloc(8 * sizeof (nni_idhash_entry));
- if (h->ih_entries == NULL) {
- NNI_FREE_STRUCT(h);
- return (NNG_ENOMEM);
- }
- (void) memset(h->ih_entries, 0, (8 * sizeof (nni_idhash_entry)));
+ h->ih_entries = NULL;
h->ih_count = 0;
h->ih_load = 0;
- h->ih_cap = 8;
- h->ih_maxload = 5;
+ h->ih_cap = 0;
+ h->ih_maxload = 0;
h->ih_minload = 0; // never shrink below this
h->ih_walkers = 0;
h->ih_minval = 0;
h->ih_maxval = 0xffffffff;
h->ih_dynval = 0;
- *hp = h;
return (0);
}
void
+nni_idhash_fini(nni_idhash *h)
+{
+ NNI_ASSERT(h->ih_walkers == 0);
+ if (h->ih_entries != NULL) {
+ nni_free(h->ih_entries, h->ih_cap * sizeof (nni_idhash_entry));
+ h->ih_entries = NULL;
+ h->ih_cap = h->ih_count = 0;
+ h->ih_load = h->ih_minload = h->ih_maxload = 0;
+ }
+}
+
+
+void
+nni_idhash_reclaim(nni_idhash *h)
+{
+ // Reclaim the buffer if we want, but preserve the limits.
+ if ((h->ih_count == 0) && (h->ih_cap != 0) && (h->ih_walkers == 0)) {
+ nni_free(h->ih_entries, h->ih_cap * sizeof (nni_idhash_entry));
+ h->ih_cap = 0;
+ h->ih_entries = NULL;
+ h->ih_minload = 0;
+ h->ih_maxload = 0;
+ }
+}
+
+
+void
nni_idhash_set_limits(nni_idhash *h, uint32_t minval, uint32_t maxval,
uint32_t start)
{
@@ -71,16 +75,6 @@ nni_idhash_set_limits(nni_idhash *h, uint32_t minval, uint32_t maxval,
}
-void
-nni_idhash_destroy(nni_idhash *h)
-{
- if (h != NULL) {
- nni_free(h->ih_entries, h->ih_cap * sizeof (nni_idhash_entry));
- NNI_FREE_STRUCT(h);
- }
-}
-
-
// Inspired by Python dict implementation. This probe will visit every
// cell. We always hash consecutively assigned IDs.
#define NNI_IDHASH_NEXTPROBE(h, j) \
@@ -91,6 +85,10 @@ nni_idhash_find(nni_idhash *h, uint32_t id, void **valp)
{
uint32_t index = id & (h->ih_cap - 1);
+ if (h->ih_count == 0) {
+ return (NNG_ENOENT);
+ }
+
for (;;) {
if ((h->ih_entries[index].ihe_val == NULL) &&
(h->ih_entries[index].ihe_skips == 0)) {
@@ -165,7 +163,9 @@ nni_hash_resize(nni_idhash *h)
index = NNI_IDHASH_NEXTPROBE(h, index);
}
}
- nni_free(oldents, sizeof (nni_idhash_entry) * oldsize);
+ if (oldsize != 0) {
+ nni_free(oldents, sizeof (nni_idhash_entry) * oldsize);
+ }
return (0);
}
@@ -286,11 +286,10 @@ nni_idhash_alloc(nni_idhash *h, uint32_t *idp, void *val)
}
-int
-nni_idhash_count(nni_idhash *h, uint32_t *countp)
+size_t
+nni_idhash_count(nni_idhash *h)
{
- *countp = h->ih_count;
- return (0);
+ return (h->ih_count);
}
diff --git a/src/core/idhash.h b/src/core/idhash.h
index e8876ad5..4616ccde 100644
--- a/src/core/idhash.h
+++ b/src/core/idhash.h
@@ -22,7 +22,23 @@
// use table sizes that are powers of two. Note that hash items
// must be non-NULL. The table is locked.
-typedef struct nni_idhash nni_idhash;
+typedef struct nni_idhash nni_idhash;
+typedef struct nni_idhash_entry nni_idhash_entry;
+
+// The details of the nni_idhash are "private". But they let us inline
+// this into structures.
+struct nni_idhash {
+ size_t ih_cap;
+ size_t ih_count;
+ size_t ih_load;
+ size_t ih_minload; // considers placeholders
+ size_t ih_maxload;
+ uint32_t ih_walkers;
+ uint32_t ih_minval;
+ uint32_t ih_maxval;
+ uint32_t ih_dynval;
+ nni_idhash_entry * ih_entries;
+};
// nni_idhash_walkfn is called when walking a hash table. If the
// return value is non-zero, then nni_idhash_walk will terminate further
@@ -32,14 +48,17 @@ typedef struct nni_idhash nni_idhash;
// Note that the walkfn must not attempt to change the hash table.
// The user must provide any locking needed.
typedef int (*nni_idhash_walkfn)(void *, uint32_t, void *);
-extern int nni_idhash_create(nni_idhash **);
+extern int nni_idhash_init(nni_idhash *);
+extern void nni_idhash_fini(nni_idhash *);
+extern void nni_idhash_reclaim(nni_idhash *);
extern void nni_idhash_set_limits(nni_idhash *, uint32_t, uint32_t, uint32_t);
+extern int nni_idhash_create(nni_idhash **);
extern void nni_idhash_destroy(nni_idhash *);
extern int nni_idhash_find(nni_idhash *, uint32_t, void **);
extern int nni_idhash_remove(nni_idhash *, uint32_t);
extern int nni_idhash_insert(nni_idhash *, uint32_t, void *);
extern int nni_idhash_alloc(nni_idhash *, uint32_t *, void *);
-extern int nni_idhash_count(nni_idhash *, uint32_t *);
+extern size_t nni_idhash_count(nni_idhash *);
extern int nni_idhash_walk(nni_idhash *, nni_idhash_walkfn, void *);
#endif // CORE_IDHASH_H
diff --git a/src/core/init.c b/src/core/init.c
index 48a5096c..8ada0247 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -15,6 +15,10 @@ nni_idhash *nni_endpoints;
nni_idhash *nni_pipes;
nni_idhash *nni_sockets;
nni_mtx *nni_idlock;
+
+static nni_idhash nni_endpoints_x;
+static nni_idhash nni_pipes_x;
+static nni_idhash nni_sockets_x;
static nni_mtx nni_idlock_x;
static int
@@ -28,17 +32,23 @@ nni_init_helper(void)
if ((rv = nni_mtx_init(&nni_idlock_x)) != 0) {
return (rv);
}
- if (((rv = nni_idhash_create(&nni_endpoints)) != 0) ||
- ((rv = nni_idhash_create(&nni_pipes)) != 0) ||
- ((rv = nni_idhash_create(&nni_sockets)) != 0)) {
+ if (((rv = nni_idhash_init(&nni_endpoints_x)) != 0) ||
+ ((rv = nni_idhash_init(&nni_pipes_x)) != 0) ||
+ ((rv = nni_idhash_init(&nni_sockets_x)) != 0)) {
nni_mtx_fini(&nni_idlock_x);
nni_random_fini();
return (rv);
}
- nni_idhash_set_limits(nni_pipes, 1, 0x7fffffff,
+ nni_idhash_set_limits(&nni_pipes_x, 1, 0x7fffffff,
nni_random() & 0x7fffffff);
- nni_idhash_set_limits(nni_sockets, 1, 0xffffffff, 1);
+ nni_idhash_set_limits(&nni_sockets_x, 1, 0x7fffffff, 1);
+ nni_idhash_set_limits(&nni_endpoints_x, 1, 0xffffffff, 1);
+
nni_idlock = &nni_idlock_x;
+ nni_pipes = &nni_pipes_x;
+ nni_endpoints = &nni_endpoints_x;
+ nni_sockets = &nni_sockets_x;
+
nni_tran_init();
return (0);
}
@@ -54,9 +64,9 @@ nni_init(void)
void
nni_fini(void)
{
- nni_idhash_destroy(nni_endpoints);
- nni_idhash_destroy(nni_pipes);
- nni_idhash_destroy(nni_sockets);
+ nni_idhash_fini(&nni_endpoints_x);
+ nni_idhash_fini(&nni_pipes_x);
+ nni_idhash_fini(&nni_sockets_x);
nni_mtx_fini(&nni_idlock_x);
nni_tran_fini();
nni_random_fini();
diff --git a/src/core/init.h b/src/core/init.h
index bae78fc3..e48b6705 100644
--- a/src/core/init.h
+++ b/src/core/init.h
@@ -15,17 +15,17 @@
// nni_init is called each time the user enters the library. It ensures that
// the library is initlialized properly, and also deals with checks such as
// whether the process has forked since last initialization.
-extern int nni_init(void);
+int nni_init(void);
// nni_fini tears everything down. In the future it may be used to ensure
// that all resources used by the library are released back to the system.
-extern void nni_fini(void);
+void nni_fini(void);
// Private hash tables matching IDs to values. Consumers need to use the
// nni_idlock to protect access to these.
-nni_idhash *nni_endpoints;
-nni_idhash *nni_pipes;
-nni_idhash *nni_sockets;
-nni_mtx *nni_idlock;
+extern nni_mtx *nni_idlock;
+extern nni_idhash *nni_endpoints;
+extern nni_idhash *nni_pipes;
+extern nni_idhash *nni_sockets;
#endif // CORE_INIT_H
diff --git a/src/core/socket.c b/src/core/socket.c
index b1f8f2c4..52df70a2 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -47,34 +47,58 @@ nni_sock_hold(nni_sock **sockp, uint32_t id)
}
nni_mtx_lock(nni_idlock);
rv = nni_idhash_find(nni_sockets, id, (void **) &sock);
+ if ((rv != 0) || (sock->s_closed)) {
+ nni_mtx_unlock(nni_idlock);
+ return (NNG_ECLOSED);
+ }
+ sock->s_refcnt++;
nni_mtx_unlock(nni_idlock);
+ *sockp = sock;
- if (rv == 0) {
- if (sock->s_closing) {
- rv = NNG_ECLOSED;
- } else {
- nni_mtx_lock(&sock->s_mx);
- sock->s_refcnt++;
- nni_mtx_unlock(&sock->s_mx);
- *sockp = sock;
- }
- }
- if (rv == NNG_ENOENT) {
- rv = NNG_ECLOSED;
- }
- return (rv);
+ return (0);
}
void
nni_sock_rele(nni_sock *sock)
{
- nni_mtx_lock(&sock->s_mx);
+ nni_mtx_lock(nni_idlock);
sock->s_refcnt--;
- if (sock->s_closing) {
- nni_cv_wake(&sock->s_cv);
+ if ((sock->s_closed) && (sock->s_refcnt == 1)) {
+ nni_cv_wake(&sock->s_refcv);
}
- nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_unlock(nni_idlock);
+}
+
+
+// nni_sock_hold_close is a special hold acquired by the nng_close
+// function. This waits until it has exclusive access, and then marks
+// the socket unusuable by anything else.
+int
+nni_sock_hold_close(nni_sock **sockp, uint32_t id)
+{
+ int rv;
+ nni_sock *sock;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+
+ nni_mtx_lock(nni_idlock);
+ rv = nni_idhash_find(nni_sockets, id, (void **) &sock);
+ if (rv != 0) {
+ nni_mtx_unlock(nni_idlock);
+ return (NNG_ECLOSED);
+ }
+ sock->s_closed = 1;
+ sock->s_refcnt++;
+ while (sock->s_refcnt != 1) {
+ nni_cv_wait(&sock->s_refcv);
+ }
+ nni_mtx_unlock(nni_idlock);
+ *sockp = sock;
+
+ return (0);
}
@@ -290,6 +314,10 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
goto fail;
}
+ if ((rv = nni_cv_init(&sock->s_refcv, nni_idlock)) != 0) {
+ goto fail;
+ }
+
rv = nni_ev_init(&sock->s_recv_ev, NNG_EV_CAN_RECV, sock);
if (rv != 0) {
goto fail;
@@ -355,6 +383,11 @@ fail:
if (sock->s_id != 0) {
nni_mtx_lock(nni_idlock);
nni_idhash_remove(nni_sockets, sock->s_id);
+ if (nni_idhash_count(nni_sockets) == 0) {
+ nni_idhash_reclaim(nni_pipes);
+ nni_idhash_reclaim(nni_endpoints);
+ nni_idhash_reclaim(nni_sockets);
+ }
nni_mtx_unlock(nni_idlock);
}
nni_thr_fini(&sock->s_notifier);
@@ -363,6 +396,7 @@ fail:
nni_ev_fini(&sock->s_recv_ev);
nni_msgq_fini(sock->s_urq);
nni_msgq_fini(sock->s_uwq);
+ nni_cv_fini(&sock->s_refcv);
nni_cv_fini(&sock->s_notify_cv);
nni_cv_fini(&sock->s_cv);
nni_mtx_fini(&sock->s_notify_mx);
@@ -472,7 +506,8 @@ nni_sock_shutdown(nni_sock *sock)
// nni_sock_close shuts down the socket, then releases any resources
// associated with it. It is a programmer error to reference the socket
// after this function is called, as the pointer may reference invalid
-// memory or other objects.
+// memory or other objects. The socket should have been acquired with
+// nni_sock_hold_close().
void
nni_sock_close(nni_sock *sock)
{
@@ -482,11 +517,6 @@ nni_sock_close(nni_sock *sock)
// Shutdown everything if not already done. This operation
// is idempotent.
nni_sock_shutdown(sock);
- nni_mtx_lock(&sock->s_mx);
- while (sock->s_refcnt > 1) {
- nni_cv_wait(&sock->s_cv);
- }
- nni_mtx_unlock(&sock->s_mx);
// At this point nothing else should be referencing us.
// As with UNIX close, it is a gross error for the caller
@@ -497,6 +527,12 @@ nni_sock_close(nni_sock *sock)
nni_mtx_lock(nni_idlock);
nni_idhash_remove(nni_sockets, sock->s_id);
+ if (nni_idhash_count(nni_sockets) == 0) {
+ nni_idhash_reclaim(nni_pipes);
+ nni_idhash_reclaim(nni_endpoints);
+ nni_idhash_reclaim(nni_sockets);
+ }
+
nni_mtx_unlock(nni_idlock);
// The protocol needs to clean up its state.
@@ -516,6 +552,7 @@ nni_sock_close(nni_sock *sock)
nni_msgq_fini(sock->s_uwq);
nni_ev_fini(&sock->s_send_ev);
nni_ev_fini(&sock->s_recv_ev);
+ nni_cv_fini(&sock->s_refcv);
nni_cv_fini(&sock->s_notify_cv);
nni_cv_fini(&sock->s_cv);
nni_mtx_fini(&sock->s_notify_mx);
diff --git a/src/core/socket.h b/src/core/socket.h
index 2a30fae5..2f34a038 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -19,6 +19,7 @@ struct nni_socket {
uint32_t s_id;
uint32_t s_refcnt;
+ nni_cv s_refcv;
nni_msgq * s_uwq; // Upper write queue
nni_msgq * s_urq; // Upper read queue
@@ -52,6 +53,7 @@ struct nni_socket {
int s_ep_pend; // EP dial/listen in progress
int s_closing; // Socket is closing
+ int s_closed; // Socket closed
int s_reapexit; // Shutdown the reaper
int s_besteffort; // Best effort mode delivery
int s_senderr; // Protocol state machine use
@@ -64,6 +66,7 @@ struct nni_socket {
};
extern int nni_sock_hold(nni_sock **, uint32_t);
+extern int nni_sock_hold_close(nni_sock **, uint32_t);
extern void nni_sock_rele(nni_sock *);
extern int nni_sock_open(nni_sock **, uint16_t);
extern void nni_sock_close(nni_sock *);
diff --git a/src/nng.c b/src/nng.c
index a8b145f9..c6dfb7f4 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -56,7 +56,9 @@ nng_close(nng_socket sid)
int rv;
nni_sock *sock;
- if ((rv = nni_sock_hold(&sock, sid)) != 0) {
+ // Close is special, because we still want to be able to get
+ // a hold on the socket even if shutdown was called.
+ if ((rv = nni_sock_hold_close(&sock, sid)) != 0) {
return (rv);
}
// No release -- close releases it.
@@ -607,3 +609,10 @@ nng_device(nng_socket sock1, nng_socket sock2)
// Device TBD.
return (NNG_ENOTSUP);
}
+
+
+void
+nng_usleep(uint64_t usec)
+{
+ nni_usleep(usec);
+}
diff --git a/src/nng.h b/src/nng.h
index c6d733d1..8dc3fedd 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -372,6 +372,12 @@ NNG_DECL int64_t nng_stat_value(nng_stat *);
// which means that messages from one side are forwarded to the other.
NNG_DECL int nng_device(nng_socket, nng_socket);
+// Sleep for the specified usecs. This is intended for use by test
+// programs (to avoid needing to expose the rest of the private details).
+// Applications are discouraged from using this -- use your platform
+// time services instead.
+NNG_DECL void nng_usleep(uint64_t);
+
// Pollset functionality. TBD. (Note that I'd rather avoid this
// altogether, because I believe that the notification mechanism I've
// created offers a superior way to handle this. I don't think many
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index 986d8ed7..02467ffe 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -94,6 +94,7 @@ nni_bus_pipe_fini(void *arg)
nni_bus_pipe *ppipe = arg;
if (ppipe != NULL) {
+ nni_msgq_fini(ppipe->sendq);
NNI_FREE_STRUCT(ppipe);
}
}
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 6410e6db..a34f6eea 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -26,7 +26,7 @@ struct nni_rep_sock {
nni_msgq * urq;
int raw;
int ttl;
- nni_idhash * pipes;
+ nni_idhash pipes;
char * btrace;
size_t btrace_len;
};
@@ -53,7 +53,7 @@ nni_rep_sock_init(void **repp, nni_sock *sock)
rep->raw = 0;
rep->btrace = NULL;
rep->btrace_len = 0;
- if ((rv = nni_idhash_create(&rep->pipes)) != 0) {
+ if ((rv = nni_idhash_init(&rep->pipes)) != 0) {
NNI_FREE_STRUCT(rep);
return (rv);
}
@@ -73,7 +73,7 @@ nni_rep_sock_fini(void *arg)
nni_rep_sock *rep = arg;
if (rep != NULL) {
- nni_idhash_destroy(rep->pipes);
+ nni_idhash_fini(&rep->pipes);
if (rep->btrace != NULL) {
nni_free(rep->btrace, rep->btrace_len);
}
@@ -121,7 +121,7 @@ nni_rep_pipe_add(void *arg)
nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- return (nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp));
+ return (nni_idhash_insert(&rep->pipes, nni_pipe_id(rp->pipe), rp));
}
@@ -131,7 +131,7 @@ nni_rep_pipe_rem(void *arg)
nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe));
+ nni_idhash_remove(&rep->pipes, nni_pipe_id(rp->pipe));
}
@@ -166,7 +166,7 @@ nni_rep_sock_send(void *arg)
nni_msg_trim_header(msg, 4);
nni_mtx_lock(mx);
- if (nni_idhash_find(rep->pipes, id, (void **) &rp) != 0) {
+ if (nni_idhash_find(&rep->pipes, id, (void **) &rp) != 0) {
nni_mtx_unlock(mx);
nni_msg_free(msg);
continue;
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 5603b675..263e023a 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -24,7 +24,7 @@ struct nni_resp_sock {
nni_sock * nsock;
int raw;
int ttl;
- nni_idhash * pipes;
+ nni_idhash pipes;
char * btrace;
size_t btrace_len;
};
@@ -51,7 +51,7 @@ nni_resp_sock_init(void **pp, nni_sock *nsock)
psock->raw = 0;
psock->btrace = NULL;
psock->btrace_len = 0;
- if ((rv = nni_idhash_create(&psock->pipes)) != 0) {
+ if ((rv = nni_idhash_init(&psock->pipes)) != 0) {
NNI_FREE_STRUCT(psock);
return (rv);
}
@@ -68,7 +68,7 @@ nni_resp_sock_fini(void *arg)
nni_resp_sock *psock = arg;
if (psock != NULL) {
- nni_idhash_destroy(psock->pipes);
+ nni_idhash_fini(&psock->pipes);
if (psock->btrace != NULL) {
nni_free(psock->btrace, psock->btrace_len);
}
@@ -117,7 +117,7 @@ nni_resp_pipe_add(void *arg)
nni_resp_sock *psock = ppipe->psock;
int rv;
- rv = nni_idhash_insert(psock->pipes, nni_pipe_id(ppipe->npipe), ppipe);
+ rv = nni_idhash_insert(&psock->pipes, nni_pipe_id(ppipe->npipe), ppipe);
return (rv);
}
@@ -128,7 +128,7 @@ nni_resp_pipe_rem(void *arg)
nni_resp_pipe *ppipe = arg;
nni_resp_sock *psock = ppipe->psock;
- nni_idhash_remove(psock->pipes, nni_pipe_id(ppipe->npipe));
+ nni_idhash_remove(&psock->pipes, nni_pipe_id(ppipe->npipe));
}
@@ -163,7 +163,7 @@ nni_resp_sock_send(void *arg)
nni_msg_trim_header(msg, 4);
nni_mtx_lock(mx);
- if (nni_idhash_find(psock->pipes, id, (void **) &ppipe) != 0) {
+ if (nni_idhash_find(&psock->pipes, id, (void **) &ppipe) != 0) {
nni_mtx_unlock(mx);
nni_msg_free(msg);
continue;