aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c51
1 files changed, 50 insertions, 1 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 56347e4e..734e4da8 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -29,6 +29,9 @@ nni_sock_recvq(nni_sock *s)
}
+// XXX: don't expose the upper queues to protocols, because we need to
+// trap on activity in those queues!
+
// Because we have to call back into the socket, and possibly also the proto,
// and wait for threads to terminate, we do this in a special thread. The
// assumption is that closing is always a "fast" operation.
@@ -82,6 +85,34 @@ nni_reaper(void *arg)
}
+static void
+nni_sock_urq_notify(nni_msgq *mq, int flags, void *arg)
+{
+ nni_sock *sock = arg;
+
+ if ((flags & NNI_MSGQ_NOTIFY_CANGET) == 0) {
+ return; // No interest in writability of read queue.
+ }
+ nni_mtx_lock(&sock->s_mx);
+ nni_ev_submit(&sock->s_recv_ev);
+ nni_mtx_unlock(&sock->s_mx);
+}
+
+
+static void
+nni_sock_uwq_notify(nni_msgq *mq, int flags, void *arg)
+{
+ nni_sock *sock = arg;
+
+ if ((flags & NNI_MSGQ_NOTIFY_CANPUT) == 0) {
+ return; // No interest in readability of write queue.
+ }
+ nni_mtx_lock(&sock->s_mx);
+ nni_ev_submit(&sock->s_send_ev);
+ nni_mtx_unlock(&sock->s_mx);
+}
+
+
nni_mtx *
nni_sock_mtx(nni_sock *sock)
{
@@ -201,7 +232,13 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
goto fail;
}
- if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) {
+ if (((rv = nni_ev_init(&sock->s_recv_ev, NNG_EVENT_RECV, sock)) != 0) ||
+ ((rv = nni_ev_init(&sock->s_send_ev, NNG_EVENT_SEND, sock)) != 0)) {
+ goto fail;
+ }
+
+ if (((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) ||
+ ((rv = nni_thr_init(&sock->s_notifier, nni_notifier, sock)) != 0)) {
goto fail;
}
@@ -228,7 +265,11 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
nni_thr_run(&sock->s_worker_thr[i]);
}
+ nni_msgq_notify(sock->s_urq, nni_sock_urq_notify, sock);
+ nni_msgq_notify(sock->s_uwq, nni_sock_uwq_notify, sock);
+
nni_thr_run(&sock->s_reaper);
+ nni_thr_run(&sock->s_notifier);
*sockp = sock;
return (0);
@@ -239,7 +280,10 @@ fail:
for (i = 0; i < NNI_MAXWORKERS; i++) {
nni_thr_fini(&sock->s_worker_thr[i]);
}
+ nni_thr_fini(&sock->s_notifier);
nni_thr_fini(&sock->s_reaper);
+ nni_ev_fini(&sock->s_send_ev);
+ nni_ev_fini(&sock->s_recv_ev);
nni_msgq_fini(sock->s_urq);
nni_msgq_fini(sock->s_uwq);
nni_cv_fini(&sock->s_notify_cv);
@@ -329,6 +373,7 @@ nni_sock_shutdown(nni_sock *sock)
sock->s_sock_ops.sock_close(sock->s_data);
+ nni_cv_wake(&sock->s_notify_cv);
nni_cv_wake(&sock->s_cv);
nni_mtx_unlock(&sock->s_mx);
@@ -336,6 +381,7 @@ nni_sock_shutdown(nni_sock *sock)
for (i = 0; i < NNI_MAXWORKERS; i++) {
nni_thr_wait(&sock->s_worker_thr[i]);
}
+ nni_thr_wait(&sock->s_notifier);
nni_thr_wait(&sock->s_reaper);
// At this point, there are no threads blocked inside of us
@@ -372,9 +418,12 @@ nni_sock_close(nni_sock *sock)
for (i = 0; i < NNI_MAXWORKERS; i++) {
nni_thr_fini(&sock->s_worker_thr[i]);
}
+ nni_thr_fini(&sock->s_notifier);
nni_thr_fini(&sock->s_reaper);
nni_msgq_fini(sock->s_urq);
nni_msgq_fini(sock->s_uwq);
+ nni_ev_fini(&sock->s_send_ev);
+ nni_ev_fini(&sock->s_recv_ev);
nni_cv_fini(&sock->s_notify_cv);
nni_cv_fini(&sock->s_cv);
nni_mtx_fini(&sock->s_notify_mx);