aboutsummaryrefslogtreecommitdiff
path: root/src/core/msgqueue.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-04-14 12:46:15 -0700
committerGarrett D'Amore <garrett@damore.org>2018-04-14 12:46:15 -0700
commit5dfd550c68284438aeaacbaef815fc7d2f75f068 (patch)
treef6e1d0139e673f133cdcc4114977f5b67689c80d /src/core/msgqueue.c
parentc66ef25c7dfd0c2a3c4a8aa8eea223fa186c2311 (diff)
downloadnng-5dfd550c68284438aeaacbaef815fc7d2f75f068.tar.gz
nng-5dfd550c68284438aeaacbaef815fc7d2f75f068.tar.bz2
nng-5dfd550c68284438aeaacbaef815fc7d2f75f068.zip
fixes #308 Close can block
Ultimately, this just removes the support for lingering altogether. Based on prior experience, lingering has always been unreliable, and was removed in legacy libnanomsg ages ago. The problem is that operating system support for lingering is very inconsistent at best, and for some transports the very concept is somewhat meaningless. Making things worse, we were never able to adequately capture an exit() event from another thread -- so lingering was always a false promise. Applications that need to be sure that messages are delivered should either include an ack in their protocol, use req/rep (which has an ack), or inject a suitable delay of their own. For things going over local networks, an extra delay of 100 msec should be sufficient *most of the time*.
Diffstat (limited to 'src/core/msgqueue.c')
-rw-r--r--src/core/msgqueue.c66
1 files changed, 10 insertions, 56 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 8246279f..529e7f4d 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -17,7 +17,6 @@
struct nni_msgq {
nni_mtx mq_lock;
- nni_cv mq_drained;
int mq_cap;
int mq_alloc; // alloc is cap + 2...
int mq_len;
@@ -26,7 +25,6 @@ struct nni_msgq {
int mq_closed;
int mq_puterr;
int mq_geterr;
- int mq_draining;
int mq_besteffort;
nni_msg **mq_msgs;
@@ -68,18 +66,16 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
nni_aio_list_init(&mq->mq_aio_putq);
nni_aio_list_init(&mq->mq_aio_getq);
nni_mtx_init(&mq->mq_lock);
- nni_cv_init(&mq->mq_drained, &mq->mq_lock);
-
- mq->mq_cap = cap;
- mq->mq_alloc = alloc;
- mq->mq_len = 0;
- mq->mq_get = 0;
- mq->mq_put = 0;
- mq->mq_closed = 0;
- mq->mq_puterr = 0;
- mq->mq_geterr = 0;
- mq->mq_draining = 0;
- *mqp = mq;
+
+ mq->mq_cap = cap;
+ mq->mq_alloc = alloc;
+ mq->mq_len = 0;
+ mq->mq_get = 0;
+ mq->mq_put = 0;
+ mq->mq_closed = 0;
+ mq->mq_puterr = 0;
+ mq->mq_geterr = 0;
+ *mqp = mq;
return (0);
}
@@ -92,7 +88,6 @@ nni_msgq_fini(nni_msgq *mq)
if (mq == NULL) {
return;
}
- nni_cv_fini(&mq->mq_drained);
nni_mtx_fini(&mq->mq_lock);
/* Free any orphaned messages. */
@@ -317,12 +312,6 @@ nni_msgq_run_notify(nni_msgq *mq)
}
mq->mq_cb_fn(mq->mq_cb_arg, flags);
}
-
- if (mq->mq_draining) {
- if ((mq->mq_len == 0) && !nni_list_empty(&mq->mq_aio_putq)) {
- nni_cv_wake(&mq->mq_drained);
- }
- }
}
void
@@ -438,40 +427,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
}
void
-nni_msgq_drain(nni_msgq *mq, nni_time expire)
-{
- nni_aio *aio;
-
- nni_mtx_lock(&mq->mq_lock);
- mq->mq_closed = 1;
- mq->mq_draining = 1;
- while ((mq->mq_len > 0) || !nni_list_empty(&mq->mq_aio_putq)) {
- if (nni_cv_until(&mq->mq_drained, expire) != 0) {
- break;
- }
- }
-
- // Timed out or writers drained.
-
- // Complete the putq as NNG_ECLOSED.
- while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
-
- // Free any remaining messages in the queue.
- while (mq->mq_len > 0) {
- nni_msg *msg = mq->mq_msgs[mq->mq_get++];
- if (mq->mq_get >= mq->mq_alloc) {
- mq->mq_get = 0;
- }
- mq->mq_len--;
- nni_msg_free(msg);
- }
- nni_mtx_unlock(&mq->mq_lock);
-}
-
-void
nni_msgq_close(nni_msgq *mq)
{
nni_aio *aio;
@@ -585,7 +540,6 @@ nni_msgq_resize(nni_msgq *mq, int cap)
out:
// Wake everyone up -- we changed everything.
- nni_cv_wake(&mq->mq_drained);
nni_mtx_unlock(&mq->mq_lock);
return (0);
}