diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-25 19:02:30 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@saucelabs.com> | 2016-12-27 20:42:03 -0800 |
| commit | ca74e80e9b0695a1c374840058025e567189dd14 (patch) | |
| tree | e707312a06fca922dfe97cdcb8b04615282f0875 /src/core | |
| parent | 0cd2fa7310f1fdf45443a8a9e3335658b1c3c64c (diff) | |
| download | nng-ca74e80e9b0695a1c374840058025e567189dd14.tar.gz nng-ca74e80e9b0695a1c374840058025e567189dd14.tar.bz2 nng-ca74e80e9b0695a1c374840058025e567189dd14.zip | |
Buffer resizing implemented. (Needed for single threaded inproc tests.)
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/msgqueue.c | 110 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 4 | ||||
| -rw-r--r-- | src/core/socket.c | 44 |
3 files changed, 152 insertions, 6 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 859c0bd9..efa0bb72 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -311,9 +311,8 @@ nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire) } // If we timedout, free any remaining messages in the queue. while (mq->mq_len > 0) { - nni_msg *msg = mq->mq_msgs[mq->mq_get]; - mq->mq_get++; - if (mq->mq_get > mq->mq_cap) { + nni_msg *msg = mq->mq_msgs[mq->mq_get++]; + if (mq->mq_get > mq->mq_alloc) { mq->mq_get = 0; } mq->mq_len--; @@ -333,13 +332,112 @@ nni_msgqueue_close(nni_msgqueue *mq) // Free the messages orphaned in the queue. while (mq->mq_len > 0) { - nni_msg *msg = mq->mq_msgs[mq->mq_get]; - mq->mq_get++; - if (mq->mq_get > mq->mq_cap) { + nni_msg *msg = mq->mq_msgs[mq->mq_get++]; + if (mq->mq_get > mq->mq_alloc) { + mq->mq_get = 0; + } + mq->mq_len--; + nni_msg_free(msg); + } + nni_mutex_exit(&mq->mq_lock); +} + + +int +nni_msgqueue_len(nni_msgqueue *mq) +{ + int rv; + + nni_mutex_enter(&mq->mq_lock); + rv = mq->mq_len; + nni_mutex_exit(&mq->mq_lock); + return (rv); +} + + +int +nni_msgqueue_cap(nni_msgqueue *mq) +{ + int rv; + + nni_mutex_enter(&mq->mq_lock); + rv = mq->mq_cap; + nni_mutex_exit(&mq->mq_lock); + return (rv); +} + + +int +nni_msgqueue_resize(nni_msgqueue *mq, int cap) +{ + int alloc; + nni_msg *msg; + nni_msg **newq, **oldq; + int oldget; + int oldput; + int oldcap; + int oldlen; + int oldalloc; + + alloc = cap + 2; + + if (alloc > mq->mq_alloc) { + newq = nni_alloc(sizeof (nni_msg *) * alloc); + if (newq == NULL) { + return (NNG_ENOMEM); + } + } else { + newq = NULL; + } + + nni_mutex_enter(&mq->mq_lock); + while (mq->mq_len > (cap + 1)) { + // too many messages -- we allow that one for + // the case of pushback or cap == 0. + // we delete the oldest messages first + msg = mq->mq_msgs[mq->mq_get++]; + if (mq->mq_get > mq->mq_alloc) { mq->mq_get = 0; } mq->mq_len--; nni_msg_free(msg); } + if (newq == NULL) { + // Just shrinking the queue, no changes + mq->mq_cap = cap; + goto out; + } + + oldq = mq->mq_msgs; + oldget = mq->mq_get; + oldput = mq->mq_put; + oldcap = mq->mq_cap; + oldalloc = mq->mq_alloc; + oldlen = mq->mq_len; + + mq->mq_msgs = newq; + mq->mq_len = mq->mq_get = mq->mq_put = 0; + mq->mq_cap = cap; + mq->mq_alloc = alloc; + + while (oldlen) { + mq->mq_msgs[mq->mq_put++] = oldq[oldget++]; + if (oldget == oldalloc) { + oldget = 0; + } + if (mq->mq_put == mq->mq_alloc) { + mq->mq_put = 0; + } + mq->mq_len++; + oldlen--; + } + nni_free(oldq, sizeof (nni_msg *) * oldalloc); + +out: + // Wake everyone up -- we changed everything. + nni_cond_broadcast(&mq->mq_readable); + nni_cond_broadcast(&mq->mq_writeable); + nni_cond_broadcast(&mq->mq_drained); nni_mutex_exit(&mq->mq_lock); + return (0); } diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 2684d42d..5efbc245 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -90,4 +90,8 @@ extern void nni_msgqueue_close(nni_msgqueue *); // has expired. Any messages still in the queue at the timeout are freed. extern void nni_msgqueue_drain(nni_msgqueue *, nni_time); +extern int nni_msgqueue_resize(nni_msgqueue *, int); +extern int nni_msgqueue_cap(nni_msgqueue *mq); +extern int nni_msgqueue_len(nni_msgqueue *mq); + #endif // CORE_MSQUEUE_H diff --git a/src/core/socket.c b/src/core/socket.c index ce5dbf3c..91c3c03e 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -447,6 +447,38 @@ nni_getopt_duration(nni_duration *ptr, void *val, size_t *sizep) } +static int +nni_setopt_buf(nni_msgqueue *mq, const void *val, size_t sz) +{ + int len; + + if (sz < sizeof (len)) { + return (NNG_EINVAL); + } + memcpy(&len, val, sizeof (len)); + if (len < 0) { + return (NNG_EINVAL); + } + return (nni_msgqueue_resize(mq, len)); +} + + +static int +nni_getopt_buf(nni_msgqueue *mq, void *val, size_t *sizep) +{ + int len = nni_msgqueue_cap(mq); + + int sz = *sizep; + + if (sz > sizeof (len)) { + sz = sizeof (len); + } + memcpy(val, &len, sz); + *sizep = sizeof (len); + return (0); +} + + int nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size) { @@ -478,6 +510,12 @@ nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size) case NNG_OPT_RECONN_MAXTIME: rv = nni_setopt_duration(&sock->s_reconnmax, val, size); break; + case NNG_OPT_SNDBUF: + rv = nni_setopt_buf(sock->s_uwq, val, size); + break; + case NNG_OPT_RCVBUF: + rv = nni_setopt_buf(sock->s_urq, val, size); + break; } nni_mutex_exit(&sock->s_mx); return (rv); @@ -515,6 +553,12 @@ nni_socket_getopt(nni_socket *sock, int opt, void *val, size_t *sizep) case NNG_OPT_RECONN_MAXTIME: rv = nni_getopt_duration(&sock->s_reconnmax, val, sizep); break; + case NNG_OPT_SNDBUF: + rv = nni_getopt_buf(sock->s_uwq, val, sizep); + break; + case NNG_OPT_RCVBUF: + rv = nni_getopt_buf(sock->s_urq, val, sizep); + break; } nni_mutex_exit(&sock->s_mx); return (rv); |
