aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/event.c21
-rw-r--r--src/core/event.h6
-rw-r--r--src/core/msgqueue.c123
-rw-r--r--src/core/msgqueue.h11
-rw-r--r--src/core/socket.c51
-rw-r--r--src/core/socket.h2
6 files changed, 175 insertions, 39 deletions
diff --git a/src/core/event.c b/src/core/event.c
index 3f7d1143..1b264632 100644
--- a/src/core/event.c
+++ b/src/core/event.c
@@ -13,7 +13,7 @@
#include <string.h>
int
-nni_event_init(nni_event *event, int type, nni_sock *sock)
+nni_ev_init(nni_event *event, int type, nni_sock *sock)
{
int rv;
@@ -29,15 +29,26 @@ nni_event_init(nni_event *event, int type, nni_sock *sock)
void
-nni_event_fini(nni_event *event)
+nni_ev_fini(nni_event *event)
{
nni_cv_fini(&event->e_cv);
}
void
-nni_event_submit(nni_sock *sock, nni_event *event)
+nni_ev_submit(nni_event *event)
{
+ nni_sock *sock = event->e_sock;
+
+ // If nobody is listening, don't bother submitting anything.
+ // This reduces pressure on the socket locks & condvars, in the
+ // typical case.
+ if (nni_list_first(&sock->s_notify) == NULL) {
+ event->e_pending = 0;
+ event->e_done = 1;
+ return;
+ }
+
// Call with socket mutex owned!
if (event->e_pending == 0) {
event->e_pending = 1;
@@ -49,7 +60,7 @@ nni_event_submit(nni_sock *sock, nni_event *event)
void
-nni_event_wait(nni_sock *sock, nni_event *event)
+nni_ev_wait(nni_event *event)
{
// Call with socket mutex owned!
// Note that the socket mutex is dropped during the call.
@@ -60,7 +71,7 @@ nni_event_wait(nni_sock *sock, nni_event *event)
void
-nni_event_notifier(void *arg)
+nni_notifier(void *arg)
{
nni_sock *sock = arg;
nni_event *event;
diff --git a/src/core/event.h b/src/core/event.h
index d7faad2e..8b160a89 100644
--- a/src/core/event.h
+++ b/src/core/event.h
@@ -32,4 +32,10 @@ struct nng_notify {
int n_mask;
};
+extern void nni_notifier(void *);
+extern int nni_ev_init(nni_event *, int, nni_sock *);
+extern void nni_ev_fini(nni_event *);
+extern void nni_ev_submit(nni_event *); // call holding sock lock
+extern void nni_ev_wait(nni_event *); // call holding sock lock
+
#endif // CORE_EVENT_H
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index bc381cfc..344798d7 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -15,21 +15,24 @@
// side can close, and they may be closed more than once.
struct nni_msgq {
- nni_mtx mq_lock;
- nni_cv mq_readable;
- nni_cv mq_writeable;
- nni_cv mq_drained;
- int mq_cap;
- int mq_alloc; // alloc is cap + 2...
- int mq_len;
- int mq_get;
- int mq_put;
- int mq_closed;
- int mq_puterr;
- int mq_geterr;
- int mq_rwait; // readers waiting (unbuffered)
- int mq_wwait;
- nni_msg ** mq_msgs;
+ nni_mtx mq_lock;
+ nni_cv mq_readable;
+ nni_cv mq_writeable;
+ nni_cv mq_drained;
+ int mq_cap;
+ int mq_alloc; // alloc is cap + 2...
+ int mq_len;
+ int mq_get;
+ int mq_put;
+ int mq_closed;
+ int mq_puterr;
+ int mq_geterr;
+ int mq_rwait; // readers waiting (unbuffered)
+ int mq_wwait;
+ nni_msg ** mq_msgs;
+
+ nni_msgq_notify_fn mq_notify_fn;
+ void * mq_notify_arg;
};
int
@@ -54,31 +57,20 @@ nni_msgq_init(nni_msgq **mqp, int cap)
return (NNG_ENOMEM);
}
if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) {
- nni_free(mq, sizeof (*mq));
- return (rv);
+ goto fail;
}
if ((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) {
- nni_mtx_fini(&mq->mq_lock);
- nni_free(mq, sizeof (*mq));
- return (NNG_ENOMEM);
+ goto fail;
}
if ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) {
- nni_cv_fini(&mq->mq_readable);
- nni_mtx_fini(&mq->mq_lock);
- return (NNG_ENOMEM);
+ goto fail;
}
if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) {
- nni_cv_fini(&mq->mq_writeable);
- nni_cv_fini(&mq->mq_readable);
- nni_mtx_fini(&mq->mq_lock);
- return (NNG_ENOMEM);
+ goto fail;
}
if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) {
- nni_cv_fini(&mq->mq_drained);
- nni_cv_fini(&mq->mq_writeable);
- nni_cv_fini(&mq->mq_readable);
- nni_mtx_fini(&mq->mq_lock);
- return (NNG_ENOMEM);
+ rv = NNG_ENOMEM;
+ goto fail;
}
mq->mq_cap = cap;
@@ -91,9 +83,22 @@ nni_msgq_init(nni_msgq **mqp, int cap)
mq->mq_geterr = 0;
mq->mq_wwait = 0;
mq->mq_rwait = 0;
+ mq->mq_notify_fn = NULL;
+ mq->mq_notify_arg = NULL;
*mqp = mq;
return (0);
+
+fail:
+ nni_cv_fini(&mq->mq_drained);
+ nni_cv_fini(&mq->mq_writeable);
+ nni_cv_fini(&mq->mq_readable);
+ nni_mtx_fini(&mq->mq_lock);
+ if (mq->mq_msgs != NULL) {
+ nni_free(mq->mq_msgs, sizeof (nng_msg *) * alloc);
+ }
+ NNI_FREE_STRUCT(mq);
+ return (rv);
}
@@ -127,6 +132,16 @@ nni_msgq_fini(nni_msgq *mq)
void
+nni_msgq_notify(nni_msgq *mq, nni_msgq_notify_fn fn, void *arg)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ mq->mq_notify_fn = fn;
+ mq->mq_notify_arg = arg;
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
+void
nni_msgq_set_put_error(nni_msgq *mq, int error)
{
nni_mtx_lock(&mq->mq_lock);
@@ -184,6 +199,9 @@ int
nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
{
int rv;
+ int notify = 0;
+ nni_msgq_notify_fn fn;
+ void *arg;
nni_mtx_lock(&mq->mq_lock);
@@ -243,7 +261,18 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
if (mq->mq_rwait) {
nni_cv_wake(&mq->mq_readable);
}
+ notify = NNI_MSGQ_NOTIFY_CANGET;
+ if (mq->mq_len < mq->mq_cap) {
+ notify |= NNI_MSGQ_NOTIFY_CANPUT;
+ }
+ fn = mq->mq_notify_fn;
+ arg = mq->mq_notify_arg;
nni_mtx_unlock(&mq->mq_lock);
+
+ // The notify callback is executed outside of the lock.
+ if (fn != NULL) {
+ fn(mq, notify, arg);
+ }
return (0);
}
@@ -254,6 +283,10 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
int
nni_msgq_putback(nni_msgq *mq, nni_msg *msg)
{
+ int notify = 0;
+ nni_msgq_notify_fn fn;
+ void *arg;
+
nni_mtx_lock(&mq->mq_lock);
// if closed, we don't put more... this check is first!
@@ -278,7 +311,19 @@ nni_msgq_putback(nni_msgq *mq, nni_msg *msg)
if (mq->mq_rwait) {
nni_cv_wake(&mq->mq_readable);
}
+
+ notify = NNI_MSGQ_NOTIFY_CANGET;
+ if (mq->mq_len < mq->mq_cap) {
+ notify |= NNI_MSGQ_NOTIFY_CANPUT;
+ }
+ fn = mq->mq_notify_fn;
+ arg = mq->mq_notify_arg;
nni_mtx_unlock(&mq->mq_lock);
+
+ if (fn != NULL) {
+ fn(mq, notify, arg);
+ }
+
return (0);
}
@@ -287,6 +332,9 @@ static int
nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
{
int rv;
+ int notify = 0;
+ nni_msgq_notify_fn fn;
+ void *arg;
nni_mtx_lock(&mq->mq_lock);
@@ -335,7 +383,18 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
if (mq->mq_wwait) {
nni_cv_wake(&mq->mq_writeable);
}
+ notify = NNI_MSGQ_NOTIFY_CANPUT;
+ if (mq->mq_len > 0) {
+ notify |= NNI_MSGQ_NOTIFY_CANGET;
+ }
+ fn = mq->mq_notify_fn;
+ arg = mq->mq_notify_arg;
nni_mtx_unlock(&mq->mq_lock);
+
+ if (fn != NULL) {
+ fn(mq, notify, arg);
+ }
+
return (0);
}
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 450c55a4..07f6aebb 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -131,4 +131,15 @@ extern int nni_msgq_cap(nni_msgq *mq);
// nni_msgq_len returns the number of messages currently in the queue.
extern int nni_msgq_len(nni_msgq *mq);
+#define NNI_MSGQ_NOTIFY_CANPUT 1
+#define NNI_MSGQ_NOTIFY_CANGET 2
+
+typedef void (*nni_msgq_notify_fn)(nni_msgq *, int, void *);
+
+// nni_msgq_notify registers a function to be called when the message
+// queue state changes. It notifies that the queue is readable, or writeable.
+// Only one function can be registered (for simplicity), and it is called
+// outside of the queue's lock.
+extern void nni_msgq_notify(nni_msgq *, nni_msgq_notify_fn, void *);
+
#endif // CORE_MSQUEUE_H
diff --git a/src/core/socket.c b/src/core/socket.c
index 56347e4e..734e4da8 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -29,6 +29,9 @@ nni_sock_recvq(nni_sock *s)
}
+// XXX: don't expose the upper queues to protocols, because we need to
+// trap on activity in those queues!
+
// Because we have to call back into the socket, and possibly also the proto,
// and wait for threads to terminate, we do this in a special thread. The
// assumption is that closing is always a "fast" operation.
@@ -82,6 +85,34 @@ nni_reaper(void *arg)
}
+static void
+nni_sock_urq_notify(nni_msgq *mq, int flags, void *arg)
+{
+ nni_sock *sock = arg;
+
+ if ((flags & NNI_MSGQ_NOTIFY_CANGET) == 0) {
+ return; // No interest in writability of read queue.
+ }
+ nni_mtx_lock(&sock->s_mx);
+ nni_ev_submit(&sock->s_recv_ev);
+ nni_mtx_unlock(&sock->s_mx);
+}
+
+
+static void
+nni_sock_uwq_notify(nni_msgq *mq, int flags, void *arg)
+{
+ nni_sock *sock = arg;
+
+ if ((flags & NNI_MSGQ_NOTIFY_CANPUT) == 0) {
+ return; // No interest in readability of write queue.
+ }
+ nni_mtx_lock(&sock->s_mx);
+ nni_ev_submit(&sock->s_send_ev);
+ nni_mtx_unlock(&sock->s_mx);
+}
+
+
nni_mtx *
nni_sock_mtx(nni_sock *sock)
{
@@ -201,7 +232,13 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
goto fail;
}
- if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) {
+ if (((rv = nni_ev_init(&sock->s_recv_ev, NNG_EVENT_RECV, sock)) != 0) ||
+ ((rv = nni_ev_init(&sock->s_send_ev, NNG_EVENT_SEND, sock)) != 0)) {
+ goto fail;
+ }
+
+ if (((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) ||
+ ((rv = nni_thr_init(&sock->s_notifier, nni_notifier, sock)) != 0)) {
goto fail;
}
@@ -228,7 +265,11 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
nni_thr_run(&sock->s_worker_thr[i]);
}
+ nni_msgq_notify(sock->s_urq, nni_sock_urq_notify, sock);
+ nni_msgq_notify(sock->s_uwq, nni_sock_uwq_notify, sock);
+
nni_thr_run(&sock->s_reaper);
+ nni_thr_run(&sock->s_notifier);
*sockp = sock;
return (0);
@@ -239,7 +280,10 @@ fail:
for (i = 0; i < NNI_MAXWORKERS; i++) {
nni_thr_fini(&sock->s_worker_thr[i]);
}
+ nni_thr_fini(&sock->s_notifier);
nni_thr_fini(&sock->s_reaper);
+ nni_ev_fini(&sock->s_send_ev);
+ nni_ev_fini(&sock->s_recv_ev);
nni_msgq_fini(sock->s_urq);
nni_msgq_fini(sock->s_uwq);
nni_cv_fini(&sock->s_notify_cv);
@@ -329,6 +373,7 @@ nni_sock_shutdown(nni_sock *sock)
sock->s_sock_ops.sock_close(sock->s_data);
+ nni_cv_wake(&sock->s_notify_cv);
nni_cv_wake(&sock->s_cv);
nni_mtx_unlock(&sock->s_mx);
@@ -336,6 +381,7 @@ nni_sock_shutdown(nni_sock *sock)
for (i = 0; i < NNI_MAXWORKERS; i++) {
nni_thr_wait(&sock->s_worker_thr[i]);
}
+ nni_thr_wait(&sock->s_notifier);
nni_thr_wait(&sock->s_reaper);
// At this point, there are no threads blocked inside of us
@@ -372,9 +418,12 @@ nni_sock_close(nni_sock *sock)
for (i = 0; i < NNI_MAXWORKERS; i++) {
nni_thr_fini(&sock->s_worker_thr[i]);
}
+ nni_thr_fini(&sock->s_notifier);
nni_thr_fini(&sock->s_reaper);
nni_msgq_fini(sock->s_urq);
nni_msgq_fini(sock->s_uwq);
+ nni_ev_fini(&sock->s_send_ev);
+ nni_ev_fini(&sock->s_recv_ev);
nni_cv_fini(&sock->s_notify_cv);
nni_cv_fini(&sock->s_cv);
nni_mtx_fini(&sock->s_notify_mx);
diff --git a/src/core/socket.h b/src/core/socket.h
index d8b37252..424b1321 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -44,7 +44,7 @@ struct nng_socket {
nni_list s_reaps; // pipes to reap
nni_thr s_reaper;
- nni_thr s_notify_thr;
+ nni_thr s_notifier;
nni_thr s_worker_thr[NNI_MAXWORKERS];
int s_ep_pend; // EP dial/listen in progress