aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-03-11 20:50:21 -0800
committerGarrett D'Amore <garrett@damore.org>2017-03-11 20:50:21 -0800
commit3d90bae8eda62fecdf367932fca591b965838e20 (patch)
tree31ba290a4fe1a1eae570e6cfb45a76cc936b8834
parent97fb819ccfd0d4cb7f02d7fc521d9478ba050776 (diff)
downloadnng-3d90bae8eda62fecdf367932fca591b965838e20.tar.gz
nng-3d90bae8eda62fecdf367932fca591b965838e20.tar.bz2
nng-3d90bae8eda62fecdf367932fca591b965838e20.zip
Removing some dead code.
-rw-r--r--src/core/event.c18
-rw-r--r--src/core/event.h1
-rw-r--r--src/core/msgqueue.c84
3 files changed, 36 insertions, 67 deletions
diff --git a/src/core/event.c b/src/core/event.c
index 73bc1c00..98156a92 100644
--- a/src/core/event.c
+++ b/src/core/event.c
@@ -49,6 +49,13 @@ nni_ev_submit(nni_event *event)
return;
}
+ // XXX: taskq_dispatch the event processing.
+ // This probably should bump a reference count on the socket
+ // first.
+ // XXX: One question of note... the aio structures we use elsewhere
+ // would be better than this. So instead of the handler doing two
+ // context switches we can just do one.
+
// Call with socket mutex owned!
if (event->e_pending == 0) {
event->e_pending = 1;
@@ -60,17 +67,6 @@ nni_ev_submit(nni_event *event)
void
-nni_ev_wait(nni_event *event)
-{
- // Call with socket mutex owned!
- // Note that the socket mutex is dropped during the call.
- while ((event->e_pending) && (!event->e_done)) {
- nni_cv_wait(&event->e_cv);
- }
-}
-
-
-void
nni_notifier(void *arg)
{
nni_sock *sock = arg;
diff --git a/src/core/event.h b/src/core/event.h
index 74d6fddb..d07aa9dd 100644
--- a/src/core/event.h
+++ b/src/core/event.h
@@ -36,7 +36,6 @@ extern void nni_notifier(void *);
extern int nni_ev_init(nni_event *, int, nni_sock *);
extern void nni_ev_fini(nni_event *);
extern void nni_ev_submit(nni_event *); // call holding sock lock
-extern void nni_ev_wait(nni_event *); // call holding sock lock
extern nni_notify *nni_add_notify(nni_sock *, int, nng_notify_func, void *);
extern void nni_rem_notify(nni_sock *, nni_notify *);
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 60744ba1..153d7c15 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -27,8 +27,6 @@ struct nni_msgq {
int mq_closed;
int mq_puterr;
int mq_geterr;
- int mq_rwait; // readers waiting (unbuffered)
- int mq_wwait;
nni_msg ** mq_msgs;
int mq_notify_sig;
@@ -39,6 +37,8 @@ struct nni_msgq {
nni_list mq_aio_putq;
nni_list mq_aio_getq;
+ nni_list mq_aio_notify_get;
+ nni_list mq_aio_notify_put;
nni_timer_node mq_timer;
nni_time mq_expire;
@@ -117,6 +117,8 @@ nni_msgq_init(nni_msgq **mqp, int cap)
}
NNI_LIST_INIT(&mq->mq_aio_putq, nni_aio, a_prov_node);
NNI_LIST_INIT(&mq->mq_aio_getq, nni_aio, a_prov_node);
+ NNI_LIST_INIT(&mq->mq_aio_notify_get, nni_aio, a_prov_node);
+ NNI_LIST_INIT(&mq->mq_aio_notify_put, nni_aio, a_prov_node);
if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) {
goto fail;
@@ -142,8 +144,6 @@ nni_msgq_init(nni_msgq **mqp, int cap)
mq->mq_closed = 0;
mq->mq_puterr = 0;
mq->mq_geterr = 0;
- mq->mq_wwait = 0;
- mq->mq_rwait = 0;
mq->mq_notify_fn = NULL;
mq->mq_notify_arg = NULL;
mq->mq_expire = NNI_TIME_NEVER;
@@ -314,8 +314,7 @@ nni_msgq_run_putq(nni_msgq *mq)
}
// Otherwise if we have room in the buffer, just queue it.
- if ((mq->mq_len < mq->mq_cap) ||
- ((mq->mq_len == mq->mq_cap) && mq->mq_rwait)) {
+ if (mq->mq_len < mq->mq_cap) {
nni_list_remove(&mq->mq_aio_putq, waio);
mq->mq_msgs[mq->mq_put++] = msg;
if (mq->mq_put == mq->mq_alloc) {
@@ -388,6 +387,25 @@ nni_msgq_run_getq(nni_msgq *mq)
static void
nni_msgq_run_notify(nni_msgq *mq)
{
+ nni_aio *aio;
+
+ if (mq->mq_closed) {
+ return;
+ }
+ if ((mq->mq_len < mq->mq_cap) ||
+ (nni_list_first(&mq->mq_aio_getq) != NULL)) {
+
+ NNI_LIST_FOREACH (&mq->mq_aio_notify_put, aio) {
+ // This stays on the list.
+ nni_aio_finish(aio, 0, 0);
+ }
+ }
+
+ if ((mq->mq_len != 0) || (nni_list_first(&mq->mq_aio_putq) != NULL)) {
+ NNI_LIST_FOREACH (&mq->mq_aio_notify_get, aio) {
+ nni_aio_finish(aio, 0, 0);
+ }
+ }
}
@@ -472,7 +490,6 @@ nni_msgq_canput(nni_msgq *mq)
return (0);
}
if ((mq->mq_len < mq->mq_cap) ||
- (mq->mq_rwait != 0) || // XXX: REMOVE ME
(nni_list_first(&mq->mq_aio_getq) != NULL)) {
nni_mtx_unlock(&mq->mq_lock);
return (1);
@@ -491,7 +508,6 @@ nni_msgq_canget(nni_msgq *mq)
return (0);
}
if ((mq->mq_len != 0) ||
- (mq->mq_wwait != 0) ||
(nni_list_first(&mq->mq_aio_putq) != NULL)) {
nni_mtx_unlock(&mq->mq_lock);
return (1);
@@ -527,8 +543,7 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
}
// Otherwise if we have room in the buffer, just queue it.
- if ((mq->mq_len < mq->mq_cap) ||
- ((mq->mq_len == mq->mq_cap) && mq->mq_rwait)) {
+ if (mq->mq_len < mq->mq_cap) {
mq->mq_msgs[mq->mq_put++] = msg;
if (mq->mq_put == mq->mq_alloc) {
mq->mq_put = 0;
@@ -593,6 +608,7 @@ nni_msgq_run_timeout(void *arg)
}
+#if 0
int
nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
{
@@ -682,47 +698,6 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
nni_mtx_lock(&mq->mq_lock);
- for (;;) {
- // always prefer to deliver data if its there
- if (mq->mq_len != 0) {
- break;
- }
- if (mq->mq_closed) {
- nni_mtx_unlock(&mq->mq_lock);
- return (NNG_ECLOSED);
- }
- if ((rv = mq->mq_geterr) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return (rv);
- }
- if (expire == NNI_TIME_ZERO) {
- nni_mtx_unlock(&mq->mq_lock);
- return (NNG_EAGAIN);
- }
- if (*sig) {
- nni_mtx_unlock(&mq->mq_lock);
- return (NNG_EINTR);
- }
- if ((mq->mq_cap == 0) && (mq->mq_wwait)) {
- // let a write waiter know we are ready
- mq->mq_wwait = 0;
- nni_cv_wake(&mq->mq_writeable);
- }
- mq->mq_rwait = 1;
-
- if (mq->mq_cap == 0) {
- // If unbuffered, kick it since a writer would not
- // block.
- nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
- }
-
- rv = nni_cv_until(&mq->mq_readable, expire);
- if (rv == NNG_ETIMEDOUT) {
- nni_mtx_unlock(&mq->mq_lock);
- return (NNG_ETIMEDOUT);
- }
- }
-
// Readable! Yay!!
*msgp = mq->mq_msgs[mq->mq_get];
@@ -746,6 +721,9 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
}
+#endif
+
+
int
nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
{
@@ -805,8 +783,6 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
{
nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = 1;
- mq->mq_wwait = 0;
- mq->mq_rwait = 0;
nni_cv_wake(&mq->mq_writeable);
nni_cv_wake(&mq->mq_readable);
nni_cv_wake(&mq->mq_notify_cv);
@@ -836,8 +812,6 @@ nni_msgq_close(nni_msgq *mq)
nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = 1;
- mq->mq_wwait = 0;
- mq->mq_rwait = 0;
nni_cv_wake(&mq->mq_writeable);
nni_cv_wake(&mq->mq_readable);
nni_cv_wake(&mq->mq_notify_cv);