aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/defs.h2
-rw-r--r--src/core/init.c1
-rw-r--r--src/core/socket.c72
-rw-r--r--src/core/socket.h8
4 files changed, 81 insertions, 2 deletions
diff --git a/src/core/defs.h b/src/core/defs.h
index eb1a6475..ff36cb3f 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -22,13 +22,13 @@
nni_panic("%s: %d: assert err: %s", __FILE__, __LINE__, # x)
// These types are common but have names shared with user space.
-typedef struct nng_socket nni_sock;
typedef struct nng_msg nni_msg;
typedef struct nng_sockaddr nni_sockaddr;
typedef struct nng_event nni_event;
typedef struct nng_notify nni_notify;
// These are our own names.
+typedef struct nni_socket nni_sock;
typedef struct nni_ep nni_ep;
typedef struct nni_pipe nni_pipe;
typedef struct nni_tran nni_tran;
diff --git a/src/core/init.c b/src/core/init.c
index e0ce3bae..48a5096c 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -37,6 +37,7 @@ nni_init_helper(void)
}
nni_idhash_set_limits(nni_pipes, 1, 0x7fffffff,
nni_random() & 0x7fffffff);
+ nni_idhash_set_limits(nni_sockets, 1, 0xffffffff, 1);
nni_idlock = &nni_idlock_x;
nni_tran_init();
return (0);
diff --git a/src/core/socket.c b/src/core/socket.c
index d9119a64..b8c5544b 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -13,6 +13,13 @@
// Socket implementation.
+uint32_t
+nni_sock_id(nni_sock *s)
+{
+ return (s->s_id);
+}
+
+
// nni_sock_sendq and nni_sock_recvq are called by the protocol to obtain
// the upper read and write queues.
nni_msgq *
@@ -29,6 +36,48 @@ nni_sock_recvq(nni_sock *s)
}
+int
+nni_sock_hold(nni_sock **sockp, uint32_t id)
+{
+ int rv;
+ nni_sock *sock;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(nni_idlock);
+ rv = nni_idhash_find(nni_sockets, id, (void **) &sock);
+ if (rv == 0) {
+ if (sock->s_closing) {
+ rv = NNG_ECLOSED;
+ } else {
+ nni_mtx_lock(&sock->s_mx);
+ sock->s_refcnt++;
+ nni_mtx_unlock(&sock->s_mx);
+ *sockp = sock;
+ }
+ }
+ nni_mtx_unlock(nni_idlock);
+
+ if (rv == NNG_ENOENT) {
+ rv = NNG_ECLOSED;
+ }
+ return (rv);
+}
+
+
+void
+nni_sock_rele(nni_sock *sock)
+{
+ nni_mtx_lock(&sock->s_mx);
+ sock->s_refcnt--;
+ if (sock->s_closing) {
+ nni_cv_wake(&sock->s_cv);
+ }
+ nni_mtx_unlock(&sock->s_mx);
+}
+
+
// XXX: don't expose the upper queues to protocols, because we need to
// trap on activity in those queues!
@@ -182,6 +231,9 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
nni_proto_sock_ops *sops;
nni_proto_pipe_ops *pops;
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
if ((proto = nni_proto_find(pnum)) == NULL) {
return (NNG_ENOTSUP);
}
@@ -257,6 +309,16 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
goto fail;
}
+ nni_mtx_lock(nni_idlock);
+ rv = nni_idhash_alloc(nni_sockets, &sock->s_id, sock);
+ nni_mtx_unlock(nni_idlock);
+ if (rv != 0) {
+ goto fail;
+ }
+
+ // Caller always gets the socket held.
+ sock->s_refcnt = 1;
+
if ((rv = sops->sock_init(&sock->s_data, sock)) != 0) {
goto fail;
}
@@ -290,6 +352,11 @@ fail:
for (i = 0; i < NNI_MAXWORKERS; i++) {
nni_thr_fini(&sock->s_worker_thr[i]);
}
+ if (sock->s_id != 0) {
+ nni_mtx_lock(nni_idlock);
+ nni_idhash_remove(nni_sockets, sock->s_id);
+ nni_mtx_unlock(nni_idlock);
+ }
nni_thr_fini(&sock->s_notifier);
nni_thr_fini(&sock->s_reaper);
nni_ev_fini(&sock->s_send_ev);
@@ -415,6 +482,11 @@ nni_sock_close(nni_sock *sock)
// Shutdown everything if not already done. This operation
// is idempotent.
nni_sock_shutdown(sock);
+ nni_mtx_lock(&sock->s_mx);
+ while (sock->s_refcnt > 1) {
+ nni_cv_wait(&sock->s_cv);
+ }
+ nni_mtx_unlock(&sock->s_mx);
// At this point nothing else should be referencing us.
// As with UNIX close, it is a gross error for the caller
diff --git a/src/core/socket.h b/src/core/socket.h
index 1743869a..2a30fae5 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -13,10 +13,13 @@
// NB: This structure is supplied here for use by the CORE. Use of this library
// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR
// TRANSPORTS.
-struct nng_socket {
+struct nni_socket {
nni_mtx s_mx;
nni_cv s_cv;
+ uint32_t s_id;
+ uint32_t s_refcnt;
+
nni_msgq * s_uwq; // Upper write queue
nni_msgq * s_urq; // Upper read queue
@@ -60,6 +63,8 @@ struct nng_socket {
uint32_t s_nextid; // Next Pipe ID.
};
+extern int nni_sock_hold(nni_sock **, uint32_t);
+extern void nni_sock_rele(nni_sock *);
extern int nni_sock_open(nni_sock **, uint16_t);
extern void nni_sock_close(nni_sock *);
extern int nni_sock_shutdown(nni_sock *);
@@ -71,6 +76,7 @@ extern int nni_sock_recvmsg(nni_sock *, nni_msg **, nni_time);
extern int nni_sock_sendmsg(nni_sock *, nni_msg *, nni_time);
extern int nni_sock_dial(nni_sock *, const char *, nni_ep **, int);
extern int nni_sock_listen(nni_sock *, const char *, nni_ep **, int);
+extern uint32_t nni_sock_id(nni_sock *);
// Set error codes for applications. These are only ever
// called from the filter functions in protocols, and thus