summaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c72
1 files changed, 72 insertions, 0 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index d9119a64..b8c5544b 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -13,6 +13,13 @@
// Socket implementation.
+uint32_t
+nni_sock_id(nni_sock *s)
+{
+ return (s->s_id);
+}
+
+
// nni_sock_sendq and nni_sock_recvq are called by the protocol to obtain
// the upper read and write queues.
nni_msgq *
@@ -29,6 +36,48 @@ nni_sock_recvq(nni_sock *s)
}
+int
+nni_sock_hold(nni_sock **sockp, uint32_t id)
+{
+ int rv;
+ nni_sock *sock;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(nni_idlock);
+ rv = nni_idhash_find(nni_sockets, id, (void **) &sock);
+ if (rv == 0) {
+ if (sock->s_closing) {
+ rv = NNG_ECLOSED;
+ } else {
+ nni_mtx_lock(&sock->s_mx);
+ sock->s_refcnt++;
+ nni_mtx_unlock(&sock->s_mx);
+ *sockp = sock;
+ }
+ }
+ nni_mtx_unlock(nni_idlock);
+
+ if (rv == NNG_ENOENT) {
+ rv = NNG_ECLOSED;
+ }
+ return (rv);
+}
+
+
+void
+nni_sock_rele(nni_sock *sock)
+{
+ nni_mtx_lock(&sock->s_mx);
+ sock->s_refcnt--;
+ if (sock->s_closing) {
+ nni_cv_wake(&sock->s_cv);
+ }
+ nni_mtx_unlock(&sock->s_mx);
+}
+
+
// XXX: don't expose the upper queues to protocols, because we need to
// trap on activity in those queues!
@@ -182,6 +231,9 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
nni_proto_sock_ops *sops;
nni_proto_pipe_ops *pops;
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
if ((proto = nni_proto_find(pnum)) == NULL) {
return (NNG_ENOTSUP);
}
@@ -257,6 +309,16 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
goto fail;
}
+ nni_mtx_lock(nni_idlock);
+ rv = nni_idhash_alloc(nni_sockets, &sock->s_id, sock);
+ nni_mtx_unlock(nni_idlock);
+ if (rv != 0) {
+ goto fail;
+ }
+
+ // Caller always gets the socket held.
+ sock->s_refcnt = 1;
+
if ((rv = sops->sock_init(&sock->s_data, sock)) != 0) {
goto fail;
}
@@ -290,6 +352,11 @@ fail:
for (i = 0; i < NNI_MAXWORKERS; i++) {
nni_thr_fini(&sock->s_worker_thr[i]);
}
+ if (sock->s_id != 0) {
+ nni_mtx_lock(nni_idlock);
+ nni_idhash_remove(nni_sockets, sock->s_id);
+ nni_mtx_unlock(nni_idlock);
+ }
nni_thr_fini(&sock->s_notifier);
nni_thr_fini(&sock->s_reaper);
nni_ev_fini(&sock->s_send_ev);
@@ -415,6 +482,11 @@ nni_sock_close(nni_sock *sock)
// Shutdown everything if not already done. This operation
// is idempotent.
nni_sock_shutdown(sock);
+ nni_mtx_lock(&sock->s_mx);
+ while (sock->s_refcnt > 1) {
+ nni_cv_wait(&sock->s_cv);
+ }
+ nni_mtx_unlock(&sock->s_mx);
// At this point nothing else should be referencing us.
// As with UNIX close, it is a gross error for the caller