aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-22 02:32:32 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-22 02:32:32 -0800
commitb93d5759c9b39ff153a14d474d800cd981f7dc97 (patch)
tree1a98b7ac74cd91003c38f53ae3eb01fb8027deef /src/core
parent769f9a2b66aca629eb4dd240a072849a48aa300f (diff)
downloadnng-b93d5759c9b39ff153a14d474d800cd981f7dc97.tar.gz
nng-b93d5759c9b39ff153a14d474d800cd981f7dc97.tar.bz2
nng-b93d5759c9b39ff153a14d474d800cd981f7dc97.zip
Event notification via pollable FDs verified working.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/msgqueue.c139
-rw-r--r--src/core/msgqueue.h2
-rw-r--r--src/core/options.c8
-rw-r--r--src/core/protocol.h6
-rw-r--r--src/core/socket.c26
5 files changed, 119 insertions, 62 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 3ec194fc..09f58a33 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -31,10 +31,61 @@ struct nni_msgq {
int mq_wwait;
nni_msg ** mq_msgs;
+ int mq_notify_sig;
+ nni_cv mq_notify_cv;
+ nni_thr mq_notify_thr;
nni_msgq_notify_fn mq_notify_fn;
void * mq_notify_arg;
};
+
+// nni_msgq_notifier thread runs if events callbacks are registered on the
+// message queue, and calls the notification callbacks outside of the
+// lock. It looks at the actual msgq state to trigger the right events.
+static void
+nni_msgq_notifier(void *arg)
+{
+ nni_msgq *mq = arg;
+ int sig;
+ nni_msgq_notify_fn fn;
+ void *fnarg;
+
+ for (;;) {
+ nni_mtx_lock(&mq->mq_lock);
+ while ((mq->mq_notify_sig == 0) && (!mq->mq_closed)) {
+ nni_cv_wait(&mq->mq_notify_cv);
+ }
+ if (mq->mq_closed) {
+ nni_mtx_unlock(&mq->mq_lock);
+ break;
+ }
+
+ sig = mq->mq_notify_sig;
+ mq->mq_notify_sig = 0;
+
+ fn = mq->mq_notify_fn;
+ fnarg = mq->mq_notify_arg;
+ nni_mtx_unlock(&mq->mq_lock);
+
+ if (fn != NULL) {
+ fn(mq, sig, fnarg);
+ }
+ }
+}
+
+
+// nni_msgq_kick kicks the msgq notification thread. It should be called
+// with the lock held.
+static void
+nni_msgq_kick(nni_msgq *mq, int sig)
+{
+ if (mq->mq_notify_fn != NULL) {
+ mq->mq_notify_sig |= sig;
+ nni_cv_wake(&mq->mq_notify_cv);
+ }
+}
+
+
int
nni_msgq_init(nni_msgq **mqp, int cap)
{
@@ -59,13 +110,10 @@ nni_msgq_init(nni_msgq **mqp, int cap)
if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) {
goto fail;
}
- if ((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) {
- goto fail;
- }
- if ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) {
- goto fail;
- }
- if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) {
+ if (((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) ||
+ ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) ||
+ ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) ||
+ ((rv = nni_cv_init(&mq->mq_notify_cv, &mq->mq_lock)) != 0)) {
goto fail;
}
if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) {
@@ -110,6 +158,7 @@ nni_msgq_fini(nni_msgq *mq)
if (mq == NULL) {
return;
}
+ nni_thr_fini(&mq->mq_notify_thr);
nni_cv_fini(&mq->mq_drained);
nni_cv_fini(&mq->mq_writeable);
nni_cv_fini(&mq->mq_readable);
@@ -131,13 +180,24 @@ nni_msgq_fini(nni_msgq *mq)
}
-void
+int
nni_msgq_notify(nni_msgq *mq, nni_msgq_notify_fn fn, void *arg)
{
+ int rv;
+
+ nni_thr_fini(&mq->mq_notify_thr);
+
nni_mtx_lock(&mq->mq_lock);
+ rv = nni_thr_init(&mq->mq_notify_thr, nni_msgq_notifier, mq);
+ if (rv != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (rv);
+ }
mq->mq_notify_fn = fn;
mq->mq_notify_arg = arg;
+ nni_thr_run(&mq->mq_notify_thr);
nni_mtx_unlock(&mq->mq_lock);
+ return (0);
}
@@ -197,6 +257,7 @@ nni_msgq_signal(nni_msgq *mq, int *signal)
mq->mq_wwait = 0;
nni_cv_wake(&mq->mq_readable);
nni_cv_wake(&mq->mq_writeable);
+ nni_cv_wake(&mq->mq_notify_cv);
nni_mtx_unlock(&mq->mq_lock);
}
@@ -205,9 +266,6 @@ int
nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
{
int rv;
- int notify = 0;
- nni_msgq_notify_fn fn;
- void *arg;
nni_mtx_lock(&mq->mq_lock);
@@ -247,8 +305,16 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
return (NNG_EAGAIN);
}
- // not writeable, so wait until something changes
+ // waiting....
mq->mq_wwait = 1;
+
+ // if we are unbuffered, kick the notifier, because we're
+ // writable.
+ if (mq->mq_cap == 0) {
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
+ }
+
+ // not writeable, so wait until something changes
rv = nni_cv_until(&mq->mq_writeable, expire);
if (rv == NNG_ETIMEDOUT) {
nni_mtx_unlock(&mq->mq_lock);
@@ -267,18 +333,12 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
mq->mq_rwait = 0;
nni_cv_wake(&mq->mq_readable);
}
- notify = NNI_MSGQ_NOTIFY_CANGET;
if (mq->mq_len < mq->mq_cap) {
- notify |= NNI_MSGQ_NOTIFY_CANPUT;
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
}
- fn = mq->mq_notify_fn;
- arg = mq->mq_notify_arg;
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
nni_mtx_unlock(&mq->mq_lock);
- // The notify callback is executed outside of the lock.
- if (fn != NULL) {
- fn(mq, notify, arg);
- }
return (0);
}
@@ -289,10 +349,6 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
int
nni_msgq_putback(nni_msgq *mq, nni_msg *msg)
{
- int notify = 0;
- nni_msgq_notify_fn fn;
- void *arg;
-
nni_mtx_lock(&mq->mq_lock);
// if closed, we don't put more... this check is first!
@@ -319,18 +375,9 @@ nni_msgq_putback(nni_msgq *mq, nni_msg *msg)
nni_cv_wake(&mq->mq_readable);
}
- notify = NNI_MSGQ_NOTIFY_CANGET;
- if (mq->mq_len < mq->mq_cap) {
- notify |= NNI_MSGQ_NOTIFY_CANPUT;
- }
- fn = mq->mq_notify_fn;
- arg = mq->mq_notify_arg;
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
nni_mtx_unlock(&mq->mq_lock);
- if (fn != NULL) {
- fn(mq, notify, arg);
- }
-
return (0);
}
@@ -339,9 +386,6 @@ static int
nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
{
int rv;
- int notify = 0;
- nni_msgq_notify_fn fn;
- void *arg;
nni_mtx_lock(&mq->mq_lock);
@@ -372,6 +416,13 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
nni_cv_wake(&mq->mq_writeable);
}
mq->mq_rwait = 1;
+
+ if (mq->mq_cap == 0) {
+ // If unbuffered, kick it since a writer would not
+ // block.
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
+ }
+
rv = nni_cv_until(&mq->mq_readable, expire);
if (rv == NNG_ETIMEDOUT) {
nni_mtx_unlock(&mq->mq_lock);
@@ -391,18 +442,12 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
mq->mq_wwait = 0;
nni_cv_wake(&mq->mq_writeable);
}
- notify = NNI_MSGQ_NOTIFY_CANPUT;
- if (mq->mq_len > 0) {
- notify |= NNI_MSGQ_NOTIFY_CANGET;
+ if (mq->mq_len) {
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
}
- fn = mq->mq_notify_fn;
- arg = mq->mq_notify_arg;
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
nni_mtx_unlock(&mq->mq_lock);
- if (fn != NULL) {
- fn(mq, notify, arg);
- }
-
return (0);
}
@@ -475,6 +520,7 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
mq->mq_rwait = 0;
nni_cv_wake(&mq->mq_writeable);
nni_cv_wake(&mq->mq_readable);
+ nni_cv_wake(&mq->mq_notify_cv);
while (mq->mq_len > 0) {
if (nni_cv_until(&mq->mq_drained, expire) != 0) {
break;
@@ -502,6 +548,7 @@ nni_msgq_close(nni_msgq *mq)
mq->mq_rwait = 0;
nni_cv_wake(&mq->mq_writeable);
nni_cv_wake(&mq->mq_readable);
+ nni_cv_wake(&mq->mq_notify_cv);
// Free the messages orphaned in the queue.
while (mq->mq_len > 0) {
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 07f6aebb..41cda53e 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -140,6 +140,6 @@ typedef void (*nni_msgq_notify_fn)(nni_msgq *, int, void *);
// queue state changes. It notifies that the queue is readable, or writeable.
// Only one function can be registered (for simplicity), and it is called
// outside of the queue's lock.
-extern void nni_msgq_notify(nni_msgq *, nni_msgq_notify_fn, void *);
+extern int nni_msgq_notify(nni_msgq *, nni_msgq_notify_fn, void *);
#endif // CORE_MSQUEUE_H
diff --git a/src/core/options.c b/src/core/options.c
index 35411562..a0b74014 100644
--- a/src/core/options.c
+++ b/src/core/options.c
@@ -135,13 +135,13 @@ nni_getopt_fd(nni_sock *s, nni_notifyfd *fd, int mask, void *val, size_t *szp)
}
switch (mask) {
- case NNG_EV_CAN_SEND:
- if ((s->s_flags & NNI_PROTO_FLAG_SEND) == 0) {
+ case NNG_EV_CAN_SND:
+ if ((s->s_flags & NNI_PROTO_FLAG_SND) == 0) {
return (NNG_ENOTSUP);
}
break;
- case NNG_EV_CAN_RECV:
- if ((s->s_flags & NNI_PROTO_FLAG_RECV) == 0) {
+ case NNG_EV_CAN_RCV:
+ if ((s->s_flags & NNI_PROTO_FLAG_RCV) == 0) {
return (NNG_ENOTSUP);
}
break;
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 52774080..9869afc9 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -100,9 +100,9 @@ struct nni_proto {
// These flags determine which operations make sense. We use them so that
// we can reject attempts to create notification fds for operations that make
// no sense.
-#define NNI_PROTO_FLAG_RECV 1 // Protocol can receive
-#define NNI_PROTO_FLAG_SEND 2 // Protocol can send
-#define NNI_PROTO_FLAG_SENDRECV 3 // Protocol can both send & recv
+#define NNI_PROTO_FLAG_RCV 1 // Protocol can receive
+#define NNI_PROTO_FLAG_SND 2 // Protocol can send
+#define NNI_PROTO_FLAG_SNDRCV 3 // Protocol can both send & recv
// These functions are not used by protocols, but rather by the socket
// core implementation. The lookups can be used by transports as well.
diff --git a/src/core/socket.c b/src/core/socket.c
index c0cf20b7..f451ba45 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -321,11 +321,11 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
goto fail;
}
- rv = nni_ev_init(&sock->s_recv_ev, NNG_EV_CAN_RECV, sock);
+ rv = nni_ev_init(&sock->s_recv_ev, NNG_EV_CAN_RCV, sock);
if (rv != 0) {
goto fail;
}
- rv = nni_ev_init(&sock->s_send_ev, NNG_EV_CAN_SEND, sock);
+ rv = nni_ev_init(&sock->s_send_ev, NNG_EV_CAN_SND, sock);
if (rv != 0) {
goto fail;
}
@@ -364,12 +364,22 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
}
}
+ // XXX: This kills performance. Look at moving this to
+ // be conditional - if nobody has callbacks because their code is
+ // also threaded, then we don't need to jump through these hoops.
+ rv = nni_msgq_notify(sock->s_urq, nni_sock_urq_notify, sock);
+ if (rv != 0) {
+ goto fail;
+ }
+ rv = nni_msgq_notify(sock->s_uwq, nni_sock_uwq_notify, sock);
+ if (rv != 0) {
+ goto fail;
+ }
+
for (i = 0; i < NNI_MAXWORKERS; i++) {
nni_thr_run(&sock->s_worker_thr[i]);
}
- nni_msgq_notify(sock->s_urq, nni_sock_urq_notify, sock);
- nni_msgq_notify(sock->s_uwq, nni_sock_uwq_notify, sock);
nni_thr_run(&sock->s_reaper);
nni_thr_run(&sock->s_notifier);
@@ -808,12 +818,12 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep)
case NNG_OPT_RCVBUF:
rv = nni_getopt_buf(sock->s_urq, val, sizep);
break;
- case NNG_OPT_SENDFD:
- rv = nni_getopt_fd(sock, &sock->s_send_fd, NNG_EV_CAN_SEND,
+ case NNG_OPT_SNDFD:
+ rv = nni_getopt_fd(sock, &sock->s_send_fd, NNG_EV_CAN_SND,
val, sizep);
break;
- case NNG_OPT_RECVFD:
- rv = nni_getopt_fd(sock, &sock->s_recv_fd, NNG_EV_CAN_RECV,
+ case NNG_OPT_RCVFD:
+ rv = nni_getopt_fd(sock, &sock->s_recv_fd, NNG_EV_CAN_RCV,
val, sizep);
break;
}