aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-25 19:02:30 -0800
committerGarrett D'Amore <garrett@saucelabs.com>2016-12-27 20:42:03 -0800
commitca74e80e9b0695a1c374840058025e567189dd14 (patch)
treee707312a06fca922dfe97cdcb8b04615282f0875 /src
parent0cd2fa7310f1fdf45443a8a9e3335658b1c3c64c (diff)
downloadnng-ca74e80e9b0695a1c374840058025e567189dd14.tar.gz
nng-ca74e80e9b0695a1c374840058025e567189dd14.tar.bz2
nng-ca74e80e9b0695a1c374840058025e567189dd14.zip
Buffer resizing implemented. (Needed for single threaded inproc tests.)
Diffstat (limited to 'src')
-rw-r--r--src/core/msgqueue.c110
-rw-r--r--src/core/msgqueue.h4
-rw-r--r--src/core/socket.c44
3 files changed, 152 insertions, 6 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 859c0bd9..efa0bb72 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -311,9 +311,8 @@ nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire)
}
// If we timedout, free any remaining messages in the queue.
while (mq->mq_len > 0) {
- nni_msg *msg = mq->mq_msgs[mq->mq_get];
- mq->mq_get++;
- if (mq->mq_get > mq->mq_cap) {
+ nni_msg *msg = mq->mq_msgs[mq->mq_get++];
+ if (mq->mq_get > mq->mq_alloc) {
mq->mq_get = 0;
}
mq->mq_len--;
@@ -333,13 +332,112 @@ nni_msgqueue_close(nni_msgqueue *mq)
// Free the messages orphaned in the queue.
while (mq->mq_len > 0) {
- nni_msg *msg = mq->mq_msgs[mq->mq_get];
- mq->mq_get++;
- if (mq->mq_get > mq->mq_cap) {
+ nni_msg *msg = mq->mq_msgs[mq->mq_get++];
+ if (mq->mq_get > mq->mq_alloc) {
+ mq->mq_get = 0;
+ }
+ mq->mq_len--;
+ nni_msg_free(msg);
+ }
+ nni_mutex_exit(&mq->mq_lock);
+}
+
+
+int
+nni_msgqueue_len(nni_msgqueue *mq)
+{
+ int rv;
+
+ nni_mutex_enter(&mq->mq_lock);
+ rv = mq->mq_len;
+ nni_mutex_exit(&mq->mq_lock);
+ return (rv);
+}
+
+
+int
+nni_msgqueue_cap(nni_msgqueue *mq)
+{
+ int rv;
+
+ nni_mutex_enter(&mq->mq_lock);
+ rv = mq->mq_cap;
+ nni_mutex_exit(&mq->mq_lock);
+ return (rv);
+}
+
+
+int
+nni_msgqueue_resize(nni_msgqueue *mq, int cap)
+{
+ int alloc;
+ nni_msg *msg;
+ nni_msg **newq, **oldq;
+ int oldget;
+ int oldput;
+ int oldcap;
+ int oldlen;
+ int oldalloc;
+
+ alloc = cap + 2;
+
+ if (alloc > mq->mq_alloc) {
+ newq = nni_alloc(sizeof (nni_msg *) * alloc);
+ if (newq == NULL) {
+ return (NNG_ENOMEM);
+ }
+ } else {
+ newq = NULL;
+ }
+
+ nni_mutex_enter(&mq->mq_lock);
+ while (mq->mq_len > (cap + 1)) {
+ // too many messages -- we allow that one for
+ // the case of pushback or cap == 0.
+ // we delete the oldest messages first
+ msg = mq->mq_msgs[mq->mq_get++];
+ if (mq->mq_get > mq->mq_alloc) {
mq->mq_get = 0;
}
mq->mq_len--;
nni_msg_free(msg);
}
+ if (newq == NULL) {
+ // Just shrinking the queue, no changes
+ mq->mq_cap = cap;
+ goto out;
+ }
+
+ oldq = mq->mq_msgs;
+ oldget = mq->mq_get;
+ oldput = mq->mq_put;
+ oldcap = mq->mq_cap;
+ oldalloc = mq->mq_alloc;
+ oldlen = mq->mq_len;
+
+ mq->mq_msgs = newq;
+ mq->mq_len = mq->mq_get = mq->mq_put = 0;
+ mq->mq_cap = cap;
+ mq->mq_alloc = alloc;
+
+ while (oldlen) {
+ mq->mq_msgs[mq->mq_put++] = oldq[oldget++];
+ if (oldget == oldalloc) {
+ oldget = 0;
+ }
+ if (mq->mq_put == mq->mq_alloc) {
+ mq->mq_put = 0;
+ }
+ mq->mq_len++;
+ oldlen--;
+ }
+ nni_free(oldq, sizeof (nni_msg *) * oldalloc);
+
+out:
+ // Wake everyone up -- we changed everything.
+ nni_cond_broadcast(&mq->mq_readable);
+ nni_cond_broadcast(&mq->mq_writeable);
+ nni_cond_broadcast(&mq->mq_drained);
nni_mutex_exit(&mq->mq_lock);
+ return (0);
}
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 2684d42d..5efbc245 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -90,4 +90,8 @@ extern void nni_msgqueue_close(nni_msgqueue *);
// has expired. Any messages still in the queue at the timeout are freed.
extern void nni_msgqueue_drain(nni_msgqueue *, nni_time);
+extern int nni_msgqueue_resize(nni_msgqueue *, int);
+extern int nni_msgqueue_cap(nni_msgqueue *mq);
+extern int nni_msgqueue_len(nni_msgqueue *mq);
+
#endif // CORE_MSQUEUE_H
diff --git a/src/core/socket.c b/src/core/socket.c
index ce5dbf3c..91c3c03e 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -447,6 +447,38 @@ nni_getopt_duration(nni_duration *ptr, void *val, size_t *sizep)
}
+static int
+nni_setopt_buf(nni_msgqueue *mq, const void *val, size_t sz)
+{
+ int len;
+
+ if (sz < sizeof (len)) {
+ return (NNG_EINVAL);
+ }
+ memcpy(&len, val, sizeof (len));
+ if (len < 0) {
+ return (NNG_EINVAL);
+ }
+ return (nni_msgqueue_resize(mq, len));
+}
+
+
+static int
+nni_getopt_buf(nni_msgqueue *mq, void *val, size_t *sizep)
+{
+ int len = nni_msgqueue_cap(mq);
+
+ int sz = *sizep;
+
+ if (sz > sizeof (len)) {
+ sz = sizeof (len);
+ }
+ memcpy(val, &len, sz);
+ *sizep = sizeof (len);
+ return (0);
+}
+
+
int
nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size)
{
@@ -478,6 +510,12 @@ nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size)
case NNG_OPT_RECONN_MAXTIME:
rv = nni_setopt_duration(&sock->s_reconnmax, val, size);
break;
+ case NNG_OPT_SNDBUF:
+ rv = nni_setopt_buf(sock->s_uwq, val, size);
+ break;
+ case NNG_OPT_RCVBUF:
+ rv = nni_setopt_buf(sock->s_urq, val, size);
+ break;
}
nni_mutex_exit(&sock->s_mx);
return (rv);
@@ -515,6 +553,12 @@ nni_socket_getopt(nni_socket *sock, int opt, void *val, size_t *sizep)
case NNG_OPT_RECONN_MAXTIME:
rv = nni_getopt_duration(&sock->s_reconnmax, val, sizep);
break;
+ case NNG_OPT_SNDBUF:
+ rv = nni_getopt_buf(sock->s_uwq, val, sizep);
+ break;
+ case NNG_OPT_RCVBUF:
+ rv = nni_getopt_buf(sock->s_urq, val, sizep);
+ break;
}
nni_mutex_exit(&sock->s_mx);
return (rv);