diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/msgqueue.c | 66 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 10 | ||||
| -rw-r--r-- | src/core/socket.c | 47 |
3 files changed, 11 insertions, 112 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 8246279f..529e7f4d 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -17,7 +17,6 @@ struct nni_msgq { nni_mtx mq_lock; - nni_cv mq_drained; int mq_cap; int mq_alloc; // alloc is cap + 2... int mq_len; @@ -26,7 +25,6 @@ struct nni_msgq { int mq_closed; int mq_puterr; int mq_geterr; - int mq_draining; int mq_besteffort; nni_msg **mq_msgs; @@ -68,18 +66,16 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap) nni_aio_list_init(&mq->mq_aio_putq); nni_aio_list_init(&mq->mq_aio_getq); nni_mtx_init(&mq->mq_lock); - nni_cv_init(&mq->mq_drained, &mq->mq_lock); - - mq->mq_cap = cap; - mq->mq_alloc = alloc; - mq->mq_len = 0; - mq->mq_get = 0; - mq->mq_put = 0; - mq->mq_closed = 0; - mq->mq_puterr = 0; - mq->mq_geterr = 0; - mq->mq_draining = 0; - *mqp = mq; + + mq->mq_cap = cap; + mq->mq_alloc = alloc; + mq->mq_len = 0; + mq->mq_get = 0; + mq->mq_put = 0; + mq->mq_closed = 0; + mq->mq_puterr = 0; + mq->mq_geterr = 0; + *mqp = mq; return (0); } @@ -92,7 +88,6 @@ nni_msgq_fini(nni_msgq *mq) if (mq == NULL) { return; } - nni_cv_fini(&mq->mq_drained); nni_mtx_fini(&mq->mq_lock); /* Free any orphaned messages. */ @@ -317,12 +312,6 @@ nni_msgq_run_notify(nni_msgq *mq) } mq->mq_cb_fn(mq->mq_cb_arg, flags); } - - if (mq->mq_draining) { - if ((mq->mq_len == 0) && !nni_list_empty(&mq->mq_aio_putq)) { - nni_cv_wake(&mq->mq_drained); - } - } } void @@ -438,40 +427,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) } void -nni_msgq_drain(nni_msgq *mq, nni_time expire) -{ - nni_aio *aio; - - nni_mtx_lock(&mq->mq_lock); - mq->mq_closed = 1; - mq->mq_draining = 1; - while ((mq->mq_len > 0) || !nni_list_empty(&mq->mq_aio_putq)) { - if (nni_cv_until(&mq->mq_drained, expire) != 0) { - break; - } - } - - // Timed out or writers drained. - - // Complete the putq as NNG_ECLOSED. - while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) { - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, NNG_ECLOSED); - } - - // Free any remaining messages in the queue. - while (mq->mq_len > 0) { - nni_msg *msg = mq->mq_msgs[mq->mq_get++]; - if (mq->mq_get >= mq->mq_alloc) { - mq->mq_get = 0; - } - mq->mq_len--; - nni_msg_free(msg); - } - nni_mtx_unlock(&mq->mq_lock); -} - -void nni_msgq_close(nni_msgq *mq) { nni_aio *aio; @@ -585,7 +540,6 @@ nni_msgq_resize(nni_msgq *mq, int cap) out: // Wake everyone up -- we changed everything. - nni_cv_wake(&mq->mq_drained); nni_mtx_unlock(&mq->mq_lock); return (0); } diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 9cc650e0..93a26eb6 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -17,10 +17,6 @@ // they are a thread-safe way to pass messages between subsystems. They // do have additional capabilities though. // -// A closed message queue cannot be written to, but if there are messages -// still in it and it is draining, it can be read from. This permits -// linger operations to work. -// // Message queues can be closed many times safely. // // Readers & writers in a message queue can be woken either by a timeout @@ -108,12 +104,6 @@ extern void nni_msgq_set_cb(nni_msgq *, nni_msgq_cb, void *); // are freed. Unlike closing a go channel, this operation is idempotent. extern void nni_msgq_close(nni_msgq *); -// nni_msgq_drain is like nng_msgq_close, except that reads -// against the queue are permitted for up to the time limit. The -// operation blocks until either the queue is empty, or the timeout -// has expired. Any messages still in the queue at the timeout are freed. -extern void nni_msgq_drain(nni_msgq *, nni_time); - // nni_msgq_resize resizes the message queue; messages already in the queue // will be preserved as long as there is room. Messages that are dropped // due to no room are taken from the most recent. (Oldest messages are diff --git a/src/core/socket.c b/src/core/socket.c index 67a2991c..4021c0c6 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -69,7 +69,6 @@ struct nni_socket { nni_proto_ctx_ops s_ctx_ops; // options - nni_duration s_linger; // linger time nni_duration s_sndtimeo; // send timeout nni_duration s_rcvtimeo; // receive timeout nni_duration s_reconn; // reconnect time @@ -546,7 +545,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - s->s_linger = 0; s->s_sndtimeo = -1; s->s_rcvtimeo = -1; s->s_closing = 0; @@ -586,8 +584,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) || ((rv = nni_msgq_init(&s->s_urq, 0)) != 0) || ((rv = s->s_sock_ops.sock_init(&s->s_data, s)) != 0) || - ((rv = nni_sock_setopt(s, NNG_OPT_LINGER, &s->s_linger, - sizeof(nni_duration), NNI_TYPE_DURATION)) != 0) || ((rv = nni_sock_setopt(s, NNG_OPT_SENDTIMEO, &s->s_sndtimeo, sizeof(nni_duration), NNI_TYPE_DURATION)) != 0) || ((rv = nni_sock_setopt(s, NNG_OPT_RECVTIMEO, &s->s_rcvtimeo, @@ -686,7 +682,6 @@ nni_sock_shutdown(nni_sock *sock) nni_pipe *pipe; nni_ep * ep; nni_ep * nep; - nni_time linger; nni_ctx * ctx; nni_ctx * nctx; @@ -698,15 +693,6 @@ nni_sock_shutdown(nni_sock *sock) // Mark us closing, so no more EPs or changes can occur. sock->s_closing = 1; - // Special optimization; if there are no pipes connected, - // then there is no reason to linger since there's nothing that - // could possibly send this data out. - if (nni_list_first(&sock->s_pipes) == NULL) { - linger = NNI_TIME_ZERO; - } else { - linger = nni_clock() + sock->s_linger; - } - // Close the EPs. This prevents new connections from forming but // but allows existing ones to drain. NNI_LIST_FOREACH (&sock->s_eps, ep) { @@ -732,21 +718,6 @@ nni_sock_shutdown(nni_sock *sock) } nni_mtx_unlock(&nni_sock_lk); - // XXX: Add protocol specific drain here. This should replace the - // msgq_drain feature below. Probably msgq_drain will need to - // be changed to take an AIO for completion. - - // We drain the upper write queue. This is just like closing it, - // except that the protocol gets a chance to get the messages and - // push them down to the transport. This operation can *block* - // until the linger time has expired. We only do this for sendable - // sockets that are actually using the message queue of course. - if ((nni_sock_flags(sock) & - (NNI_PROTO_FLAG_NOMSGQ | NNI_PROTO_FLAG_SND)) == - NNI_PROTO_FLAG_SND) { - nni_msgq_drain(sock->s_uwq, linger); - } - // Generally, unless the protocol is blocked trying to perform // writes (e.g. a slow reader on the other side), it should be // trying to shut things down. We wait to give it @@ -760,11 +731,6 @@ nni_sock_shutdown(nni_sock *sock) nni_mtx_unlock(&nni_sock_lk); nni_mtx_lock(&sock->s_mx); - while (nni_list_first(&sock->s_pipes) != NULL) { - if (nni_cv_until(&sock->s_cv, linger) == NNG_ETIMEDOUT) { - break; - } - } // At this point, we've done everything we politely can to give // the protocol a chance to flush its write side. Now its time @@ -1029,10 +995,7 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t) // Also check a few generic things. We do this if no transport // was found, or even if a transport rejected one of the settings. if ((rv == NNG_ENOTSUP) || (rv == 0)) { - if ((strcmp(name, NNG_OPT_LINGER) == 0)) { - nng_duration d; - rv = nni_copyin_ms(&d, v, sz, t); - } else if (strcmp(name, NNG_OPT_RECVMAXSZ) == 0) { + if (strcmp(name, NNG_OPT_RECVMAXSZ) == 0) { size_t z; // just a sanity test on the size; it also ensures that // a size can be set even with no transport configured. @@ -1102,14 +1065,6 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t) } } - // For some options, which also have an impact on the socket - // behavior, we save a local value. Note that the transport - // will already have had a chance to veto this. - - if (strcmp(name, NNG_OPT_LINGER) == 0) { - rv = nni_copyin_ms(&s->s_linger, v, sz, t); - } - if (rv == 0) { // Remove and toss the old value, we are using a new one. if (oldv != NULL) { |
