summaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/msgqueue.c66
-rw-r--r--src/core/msgqueue.h10
-rw-r--r--src/core/socket.c47
3 files changed, 11 insertions, 112 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 8246279f..529e7f4d 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -17,7 +17,6 @@
struct nni_msgq {
nni_mtx mq_lock;
- nni_cv mq_drained;
int mq_cap;
int mq_alloc; // alloc is cap + 2...
int mq_len;
@@ -26,7 +25,6 @@ struct nni_msgq {
int mq_closed;
int mq_puterr;
int mq_geterr;
- int mq_draining;
int mq_besteffort;
nni_msg **mq_msgs;
@@ -68,18 +66,16 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
nni_aio_list_init(&mq->mq_aio_putq);
nni_aio_list_init(&mq->mq_aio_getq);
nni_mtx_init(&mq->mq_lock);
- nni_cv_init(&mq->mq_drained, &mq->mq_lock);
-
- mq->mq_cap = cap;
- mq->mq_alloc = alloc;
- mq->mq_len = 0;
- mq->mq_get = 0;
- mq->mq_put = 0;
- mq->mq_closed = 0;
- mq->mq_puterr = 0;
- mq->mq_geterr = 0;
- mq->mq_draining = 0;
- *mqp = mq;
+
+ mq->mq_cap = cap;
+ mq->mq_alloc = alloc;
+ mq->mq_len = 0;
+ mq->mq_get = 0;
+ mq->mq_put = 0;
+ mq->mq_closed = 0;
+ mq->mq_puterr = 0;
+ mq->mq_geterr = 0;
+ *mqp = mq;
return (0);
}
@@ -92,7 +88,6 @@ nni_msgq_fini(nni_msgq *mq)
if (mq == NULL) {
return;
}
- nni_cv_fini(&mq->mq_drained);
nni_mtx_fini(&mq->mq_lock);
/* Free any orphaned messages. */
@@ -317,12 +312,6 @@ nni_msgq_run_notify(nni_msgq *mq)
}
mq->mq_cb_fn(mq->mq_cb_arg, flags);
}
-
- if (mq->mq_draining) {
- if ((mq->mq_len == 0) && !nni_list_empty(&mq->mq_aio_putq)) {
- nni_cv_wake(&mq->mq_drained);
- }
- }
}
void
@@ -438,40 +427,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
}
void
-nni_msgq_drain(nni_msgq *mq, nni_time expire)
-{
- nni_aio *aio;
-
- nni_mtx_lock(&mq->mq_lock);
- mq->mq_closed = 1;
- mq->mq_draining = 1;
- while ((mq->mq_len > 0) || !nni_list_empty(&mq->mq_aio_putq)) {
- if (nni_cv_until(&mq->mq_drained, expire) != 0) {
- break;
- }
- }
-
- // Timed out or writers drained.
-
- // Complete the putq as NNG_ECLOSED.
- while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
-
- // Free any remaining messages in the queue.
- while (mq->mq_len > 0) {
- nni_msg *msg = mq->mq_msgs[mq->mq_get++];
- if (mq->mq_get >= mq->mq_alloc) {
- mq->mq_get = 0;
- }
- mq->mq_len--;
- nni_msg_free(msg);
- }
- nni_mtx_unlock(&mq->mq_lock);
-}
-
-void
nni_msgq_close(nni_msgq *mq)
{
nni_aio *aio;
@@ -585,7 +540,6 @@ nni_msgq_resize(nni_msgq *mq, int cap)
out:
// Wake everyone up -- we changed everything.
- nni_cv_wake(&mq->mq_drained);
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 9cc650e0..93a26eb6 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -17,10 +17,6 @@
// they are a thread-safe way to pass messages between subsystems. They
// do have additional capabilities though.
//
-// A closed message queue cannot be written to, but if there are messages
-// still in it and it is draining, it can be read from. This permits
-// linger operations to work.
-//
// Message queues can be closed many times safely.
//
// Readers & writers in a message queue can be woken either by a timeout
@@ -108,12 +104,6 @@ extern void nni_msgq_set_cb(nni_msgq *, nni_msgq_cb, void *);
// are freed. Unlike closing a go channel, this operation is idempotent.
extern void nni_msgq_close(nni_msgq *);
-// nni_msgq_drain is like nng_msgq_close, except that reads
-// against the queue are permitted for up to the time limit. The
-// operation blocks until either the queue is empty, or the timeout
-// has expired. Any messages still in the queue at the timeout are freed.
-extern void nni_msgq_drain(nni_msgq *, nni_time);
-
// nni_msgq_resize resizes the message queue; messages already in the queue
// will be preserved as long as there is room. Messages that are dropped
// due to no room are taken from the most recent. (Oldest messages are
diff --git a/src/core/socket.c b/src/core/socket.c
index 67a2991c..4021c0c6 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -69,7 +69,6 @@ struct nni_socket {
nni_proto_ctx_ops s_ctx_ops;
// options
- nni_duration s_linger; // linger time
nni_duration s_sndtimeo; // send timeout
nni_duration s_rcvtimeo; // receive timeout
nni_duration s_reconn; // reconnect time
@@ -546,7 +545,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- s->s_linger = 0;
s->s_sndtimeo = -1;
s->s_rcvtimeo = -1;
s->s_closing = 0;
@@ -586,8 +584,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
((rv = nni_msgq_init(&s->s_urq, 0)) != 0) ||
((rv = s->s_sock_ops.sock_init(&s->s_data, s)) != 0) ||
- ((rv = nni_sock_setopt(s, NNG_OPT_LINGER, &s->s_linger,
- sizeof(nni_duration), NNI_TYPE_DURATION)) != 0) ||
((rv = nni_sock_setopt(s, NNG_OPT_SENDTIMEO, &s->s_sndtimeo,
sizeof(nni_duration), NNI_TYPE_DURATION)) != 0) ||
((rv = nni_sock_setopt(s, NNG_OPT_RECVTIMEO, &s->s_rcvtimeo,
@@ -686,7 +682,6 @@ nni_sock_shutdown(nni_sock *sock)
nni_pipe *pipe;
nni_ep * ep;
nni_ep * nep;
- nni_time linger;
nni_ctx * ctx;
nni_ctx * nctx;
@@ -698,15 +693,6 @@ nni_sock_shutdown(nni_sock *sock)
// Mark us closing, so no more EPs or changes can occur.
sock->s_closing = 1;
- // Special optimization; if there are no pipes connected,
- // then there is no reason to linger since there's nothing that
- // could possibly send this data out.
- if (nni_list_first(&sock->s_pipes) == NULL) {
- linger = NNI_TIME_ZERO;
- } else {
- linger = nni_clock() + sock->s_linger;
- }
-
// Close the EPs. This prevents new connections from forming but
// but allows existing ones to drain.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
@@ -732,21 +718,6 @@ nni_sock_shutdown(nni_sock *sock)
}
nni_mtx_unlock(&nni_sock_lk);
- // XXX: Add protocol specific drain here. This should replace the
- // msgq_drain feature below. Probably msgq_drain will need to
- // be changed to take an AIO for completion.
-
- // We drain the upper write queue. This is just like closing it,
- // except that the protocol gets a chance to get the messages and
- // push them down to the transport. This operation can *block*
- // until the linger time has expired. We only do this for sendable
- // sockets that are actually using the message queue of course.
- if ((nni_sock_flags(sock) &
- (NNI_PROTO_FLAG_NOMSGQ | NNI_PROTO_FLAG_SND)) ==
- NNI_PROTO_FLAG_SND) {
- nni_msgq_drain(sock->s_uwq, linger);
- }
-
// Generally, unless the protocol is blocked trying to perform
// writes (e.g. a slow reader on the other side), it should be
// trying to shut things down. We wait to give it
@@ -760,11 +731,6 @@ nni_sock_shutdown(nni_sock *sock)
nni_mtx_unlock(&nni_sock_lk);
nni_mtx_lock(&sock->s_mx);
- while (nni_list_first(&sock->s_pipes) != NULL) {
- if (nni_cv_until(&sock->s_cv, linger) == NNG_ETIMEDOUT) {
- break;
- }
- }
// At this point, we've done everything we politely can to give
// the protocol a chance to flush its write side. Now its time
@@ -1029,10 +995,7 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t)
// Also check a few generic things. We do this if no transport
// was found, or even if a transport rejected one of the settings.
if ((rv == NNG_ENOTSUP) || (rv == 0)) {
- if ((strcmp(name, NNG_OPT_LINGER) == 0)) {
- nng_duration d;
- rv = nni_copyin_ms(&d, v, sz, t);
- } else if (strcmp(name, NNG_OPT_RECVMAXSZ) == 0) {
+ if (strcmp(name, NNG_OPT_RECVMAXSZ) == 0) {
size_t z;
// just a sanity test on the size; it also ensures that
// a size can be set even with no transport configured.
@@ -1102,14 +1065,6 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t)
}
}
- // For some options, which also have an impact on the socket
- // behavior, we save a local value. Note that the transport
- // will already have had a chance to veto this.
-
- if (strcmp(name, NNG_OPT_LINGER) == 0) {
- rv = nni_copyin_ms(&s->s_linger, v, sz, t);
- }
-
if (rv == 0) {
// Remove and toss the old value, we are using a new one.
if (oldv != NULL) {