aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-04-14 12:46:15 -0700
committerGarrett D'Amore <garrett@damore.org>2018-04-14 12:46:15 -0700
commit5dfd550c68284438aeaacbaef815fc7d2f75f068 (patch)
treef6e1d0139e673f133cdcc4114977f5b67689c80d /src/core
parentc66ef25c7dfd0c2a3c4a8aa8eea223fa186c2311 (diff)
downloadnng-5dfd550c68284438aeaacbaef815fc7d2f75f068.tar.gz
nng-5dfd550c68284438aeaacbaef815fc7d2f75f068.tar.bz2
nng-5dfd550c68284438aeaacbaef815fc7d2f75f068.zip
fixes #308 Close can block
Ultimately, this just removes the support for lingering altogether. Based on prior experience, lingering has always been unreliable, and was removed in legacy libnanomsg ages ago. The problem is that operating system support for lingering is very inconsistent at best, and for some transports the very concept is somewhat meaningless. Making things worse, we were never able to adequately capture an exit() event from another thread -- so lingering was always a false promise. Applications that need to be sure that messages are delivered should either include an ack in their protocol, use req/rep (which has an ack), or inject a suitable delay of their own. For things going over local networks, an extra delay of 100 msec should be sufficient *most of the time*.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/msgqueue.c66
-rw-r--r--src/core/msgqueue.h10
-rw-r--r--src/core/socket.c47
3 files changed, 11 insertions, 112 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 8246279f..529e7f4d 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -17,7 +17,6 @@
struct nni_msgq {
nni_mtx mq_lock;
- nni_cv mq_drained;
int mq_cap;
int mq_alloc; // alloc is cap + 2...
int mq_len;
@@ -26,7 +25,6 @@ struct nni_msgq {
int mq_closed;
int mq_puterr;
int mq_geterr;
- int mq_draining;
int mq_besteffort;
nni_msg **mq_msgs;
@@ -68,18 +66,16 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
nni_aio_list_init(&mq->mq_aio_putq);
nni_aio_list_init(&mq->mq_aio_getq);
nni_mtx_init(&mq->mq_lock);
- nni_cv_init(&mq->mq_drained, &mq->mq_lock);
-
- mq->mq_cap = cap;
- mq->mq_alloc = alloc;
- mq->mq_len = 0;
- mq->mq_get = 0;
- mq->mq_put = 0;
- mq->mq_closed = 0;
- mq->mq_puterr = 0;
- mq->mq_geterr = 0;
- mq->mq_draining = 0;
- *mqp = mq;
+
+ mq->mq_cap = cap;
+ mq->mq_alloc = alloc;
+ mq->mq_len = 0;
+ mq->mq_get = 0;
+ mq->mq_put = 0;
+ mq->mq_closed = 0;
+ mq->mq_puterr = 0;
+ mq->mq_geterr = 0;
+ *mqp = mq;
return (0);
}
@@ -92,7 +88,6 @@ nni_msgq_fini(nni_msgq *mq)
if (mq == NULL) {
return;
}
- nni_cv_fini(&mq->mq_drained);
nni_mtx_fini(&mq->mq_lock);
/* Free any orphaned messages. */
@@ -317,12 +312,6 @@ nni_msgq_run_notify(nni_msgq *mq)
}
mq->mq_cb_fn(mq->mq_cb_arg, flags);
}
-
- if (mq->mq_draining) {
- if ((mq->mq_len == 0) && !nni_list_empty(&mq->mq_aio_putq)) {
- nni_cv_wake(&mq->mq_drained);
- }
- }
}
void
@@ -438,40 +427,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
}
void
-nni_msgq_drain(nni_msgq *mq, nni_time expire)
-{
- nni_aio *aio;
-
- nni_mtx_lock(&mq->mq_lock);
- mq->mq_closed = 1;
- mq->mq_draining = 1;
- while ((mq->mq_len > 0) || !nni_list_empty(&mq->mq_aio_putq)) {
- if (nni_cv_until(&mq->mq_drained, expire) != 0) {
- break;
- }
- }
-
- // Timed out or writers drained.
-
- // Complete the putq as NNG_ECLOSED.
- while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
-
- // Free any remaining messages in the queue.
- while (mq->mq_len > 0) {
- nni_msg *msg = mq->mq_msgs[mq->mq_get++];
- if (mq->mq_get >= mq->mq_alloc) {
- mq->mq_get = 0;
- }
- mq->mq_len--;
- nni_msg_free(msg);
- }
- nni_mtx_unlock(&mq->mq_lock);
-}
-
-void
nni_msgq_close(nni_msgq *mq)
{
nni_aio *aio;
@@ -585,7 +540,6 @@ nni_msgq_resize(nni_msgq *mq, int cap)
out:
// Wake everyone up -- we changed everything.
- nni_cv_wake(&mq->mq_drained);
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 9cc650e0..93a26eb6 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -17,10 +17,6 @@
// they are a thread-safe way to pass messages between subsystems. They
// do have additional capabilities though.
//
-// A closed message queue cannot be written to, but if there are messages
-// still in it and it is draining, it can be read from. This permits
-// linger operations to work.
-//
// Message queues can be closed many times safely.
//
// Readers & writers in a message queue can be woken either by a timeout
@@ -108,12 +104,6 @@ extern void nni_msgq_set_cb(nni_msgq *, nni_msgq_cb, void *);
// are freed. Unlike closing a go channel, this operation is idempotent.
extern void nni_msgq_close(nni_msgq *);
-// nni_msgq_drain is like nng_msgq_close, except that reads
-// against the queue are permitted for up to the time limit. The
-// operation blocks until either the queue is empty, or the timeout
-// has expired. Any messages still in the queue at the timeout are freed.
-extern void nni_msgq_drain(nni_msgq *, nni_time);
-
// nni_msgq_resize resizes the message queue; messages already in the queue
// will be preserved as long as there is room. Messages that are dropped
// due to no room are taken from the most recent. (Oldest messages are
diff --git a/src/core/socket.c b/src/core/socket.c
index 67a2991c..4021c0c6 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -69,7 +69,6 @@ struct nni_socket {
nni_proto_ctx_ops s_ctx_ops;
// options
- nni_duration s_linger; // linger time
nni_duration s_sndtimeo; // send timeout
nni_duration s_rcvtimeo; // receive timeout
nni_duration s_reconn; // reconnect time
@@ -546,7 +545,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- s->s_linger = 0;
s->s_sndtimeo = -1;
s->s_rcvtimeo = -1;
s->s_closing = 0;
@@ -586,8 +584,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
((rv = nni_msgq_init(&s->s_urq, 0)) != 0) ||
((rv = s->s_sock_ops.sock_init(&s->s_data, s)) != 0) ||
- ((rv = nni_sock_setopt(s, NNG_OPT_LINGER, &s->s_linger,
- sizeof(nni_duration), NNI_TYPE_DURATION)) != 0) ||
((rv = nni_sock_setopt(s, NNG_OPT_SENDTIMEO, &s->s_sndtimeo,
sizeof(nni_duration), NNI_TYPE_DURATION)) != 0) ||
((rv = nni_sock_setopt(s, NNG_OPT_RECVTIMEO, &s->s_rcvtimeo,
@@ -686,7 +682,6 @@ nni_sock_shutdown(nni_sock *sock)
nni_pipe *pipe;
nni_ep * ep;
nni_ep * nep;
- nni_time linger;
nni_ctx * ctx;
nni_ctx * nctx;
@@ -698,15 +693,6 @@ nni_sock_shutdown(nni_sock *sock)
// Mark us closing, so no more EPs or changes can occur.
sock->s_closing = 1;
- // Special optimization; if there are no pipes connected,
- // then there is no reason to linger since there's nothing that
- // could possibly send this data out.
- if (nni_list_first(&sock->s_pipes) == NULL) {
- linger = NNI_TIME_ZERO;
- } else {
- linger = nni_clock() + sock->s_linger;
- }
-
// Close the EPs. This prevents new connections from forming but
// but allows existing ones to drain.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
@@ -732,21 +718,6 @@ nni_sock_shutdown(nni_sock *sock)
}
nni_mtx_unlock(&nni_sock_lk);
- // XXX: Add protocol specific drain here. This should replace the
- // msgq_drain feature below. Probably msgq_drain will need to
- // be changed to take an AIO for completion.
-
- // We drain the upper write queue. This is just like closing it,
- // except that the protocol gets a chance to get the messages and
- // push them down to the transport. This operation can *block*
- // until the linger time has expired. We only do this for sendable
- // sockets that are actually using the message queue of course.
- if ((nni_sock_flags(sock) &
- (NNI_PROTO_FLAG_NOMSGQ | NNI_PROTO_FLAG_SND)) ==
- NNI_PROTO_FLAG_SND) {
- nni_msgq_drain(sock->s_uwq, linger);
- }
-
// Generally, unless the protocol is blocked trying to perform
// writes (e.g. a slow reader on the other side), it should be
// trying to shut things down. We wait to give it
@@ -760,11 +731,6 @@ nni_sock_shutdown(nni_sock *sock)
nni_mtx_unlock(&nni_sock_lk);
nni_mtx_lock(&sock->s_mx);
- while (nni_list_first(&sock->s_pipes) != NULL) {
- if (nni_cv_until(&sock->s_cv, linger) == NNG_ETIMEDOUT) {
- break;
- }
- }
// At this point, we've done everything we politely can to give
// the protocol a chance to flush its write side. Now its time
@@ -1029,10 +995,7 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t)
// Also check a few generic things. We do this if no transport
// was found, or even if a transport rejected one of the settings.
if ((rv == NNG_ENOTSUP) || (rv == 0)) {
- if ((strcmp(name, NNG_OPT_LINGER) == 0)) {
- nng_duration d;
- rv = nni_copyin_ms(&d, v, sz, t);
- } else if (strcmp(name, NNG_OPT_RECVMAXSZ) == 0) {
+ if (strcmp(name, NNG_OPT_RECVMAXSZ) == 0) {
size_t z;
// just a sanity test on the size; it also ensures that
// a size can be set even with no transport configured.
@@ -1102,14 +1065,6 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t)
}
}
- // For some options, which also have an impact on the socket
- // behavior, we save a local value. Note that the transport
- // will already have had a chance to veto this.
-
- if (strcmp(name, NNG_OPT_LINGER) == 0) {
- rv = nni_copyin_ms(&s->s_linger, v, sz, t);
- }
-
if (rv == 0) {
// Remove and toss the old value, we are using a new one.
if (oldv != NULL) {