summaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-02 11:54:33 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-02 11:54:33 -0800
commit9eb080db1c450228169cc58f14d946211378fcf7 (patch)
treefcbbe3840b25c9651badd75950599f65e82caf01 /src/core
parentcceda25b65423de694f34b3decc7812eb46a4c1e (diff)
downloadnng-9eb080db1c450228169cc58f14d946211378fcf7.tar.gz
nng-9eb080db1c450228169cc58f14d946211378fcf7.tar.bz2
nng-9eb080db1c450228169cc58f14d946211378fcf7.zip
Change msgqueue -> msgq.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/msgqueue.c63
-rw-r--r--src/core/msgqueue.h77
-rw-r--r--src/core/options.c8
-rw-r--r--src/core/options.h4
-rw-r--r--src/core/protocol.h4
-rw-r--r--src/core/socket.c26
-rw-r--r--src/core/socket.h4
7 files changed, 98 insertions, 88 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 4807fe0f..0f1872dc 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -14,7 +14,7 @@
// differences and improvements. For example, these can grow, and either
// side can close, and they may be closed more than once.
-struct nni_msgqueue {
+struct nni_msgq {
nni_mtx mq_lock;
nni_cv mq_readable;
nni_cv mq_writeable;
@@ -31,9 +31,9 @@ struct nni_msgqueue {
};
int
-nni_msgqueue_create(nni_msgqueue **mqp, int cap)
+nni_msgq_init(nni_msgq **mqp, int cap)
{
- struct nni_msgqueue *mq;
+ struct nni_msgq *mq;
int rv;
int alloc;
@@ -92,10 +92,13 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap)
void
-nni_msgqueue_destroy(nni_msgqueue *mq)
+nni_msgq_fini(nni_msgq *mq)
{
nni_msg *msg;
+ if (mq == NULL) {
+ return;
+ }
nni_cv_fini(&mq->mq_drained);
nni_cv_fini(&mq->mq_writeable);
nni_cv_fini(&mq->mq_readable);
@@ -117,11 +120,11 @@ nni_msgqueue_destroy(nni_msgqueue *mq)
}
-// nni_msgqueue_signal raises a signal on the signal object. This allows a
+// nni_msgq_signal raises a signal on the signal object. This allows a
// waiter to be signaled, so that it can be woken e.g. due to a pipe closing.
// Note that the signal object must be *zero* if no signal is raised.
void
-nni_msgqueue_signal(nni_msgqueue *mq, int *signal)
+nni_msgq_signal(nni_msgq *mq, int *signal)
{
nni_mtx_lock(&mq->mq_lock);
*signal = 1;
@@ -134,8 +137,7 @@ nni_msgqueue_signal(nni_msgqueue *mq, int *signal)
int
-nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
- nni_time expire, nni_signal *signal)
+nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
{
int rv;
@@ -161,7 +163,7 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
}
// interrupted?
- if (*signal) {
+ if (*sig) {
nni_mtx_unlock(&mq->mq_lock);
return (NNG_EINTR);
}
@@ -198,11 +200,11 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
}
-// nni_msgqueue_putback will attempt to put a single message back
+// nni_msgq_putback will attempt to put a single message back
// to the head of the queue. It never blocks. Message queues always
// have room for at least one putback.
int
-nni_msgqueue_putback(nni_msgqueue *mq, nni_msg *msg)
+nni_msgq_putback(nni_msgq *mq, nni_msg *msg)
{
nni_mtx_lock(&mq->mq_lock);
@@ -234,8 +236,7 @@ nni_msgqueue_putback(nni_msgqueue *mq, nni_msg *msg)
static int
-nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
- nni_time expire, nni_signal *signal)
+nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
{
int rv;
@@ -254,7 +255,7 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
nni_mtx_unlock(&mq->mq_lock);
return (NNG_EAGAIN);
}
- if (*signal) {
+ if (*sig) {
nni_mtx_unlock(&mq->mq_lock);
return (NNG_EINTR);
}
@@ -288,57 +289,57 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
int
-nni_msgqueue_get(nni_msgqueue *mq, nni_msg **msgp)
+nni_msgq_get(nni_msgq *mq, nni_msg **msgp)
{
nni_signal nosig = 0;
- return (nni_msgqueue_get_impl(mq, msgp, NNI_TIME_NEVER, &nosig));
+ return (nni_msgq_get_(mq, msgp, NNI_TIME_NEVER, &nosig));
}
int
-nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, nni_signal *signal)
+nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal)
{
- return (nni_msgqueue_get_impl(mq, msgp, NNI_TIME_NEVER, signal));
+ return (nni_msgq_get_(mq, msgp, NNI_TIME_NEVER, signal));
}
int
-nni_msgqueue_get_until(nni_msgqueue *mq, nni_msg **msgp, nni_time expire)
+nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
{
nni_signal nosig = 0;
- return (nni_msgqueue_get_impl(mq, msgp, expire, &nosig));
+ return (nni_msgq_get_(mq, msgp, expire, &nosig));
}
int
-nni_msgqueue_put(nni_msgqueue *mq, nni_msg *msg)
+nni_msgq_put(nni_msgq *mq, nni_msg *msg)
{
nni_signal nosig = 0;
- return (nni_msgqueue_put_impl(mq, msg, NNI_TIME_NEVER, &nosig));
+ return (nni_msgq_put_(mq, msg, NNI_TIME_NEVER, &nosig));
}
int
-nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, nni_signal *signal)
+nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal)
{
- return (nni_msgqueue_put_impl(mq, msg, NNI_TIME_NEVER, signal));
+ return (nni_msgq_put_(mq, msg, NNI_TIME_NEVER, signal));
}
int
-nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire)
+nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
{
nni_signal nosig = 0;
- return (nni_msgqueue_put_impl(mq, msg, expire, &nosig));
+ return (nni_msgq_put_(mq, msg, expire, &nosig));
}
void
-nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire)
+nni_msgq_drain(nni_msgq *mq, nni_time expire)
{
nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = 1;
@@ -363,7 +364,7 @@ nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire)
void
-nni_msgqueue_close(nni_msgqueue *mq)
+nni_msgq_close(nni_msgq *mq)
{
nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = 1;
@@ -384,7 +385,7 @@ nni_msgqueue_close(nni_msgqueue *mq)
int
-nni_msgqueue_len(nni_msgqueue *mq)
+nni_msgq_len(nni_msgq *mq)
{
int rv;
@@ -396,7 +397,7 @@ nni_msgqueue_len(nni_msgqueue *mq)
int
-nni_msgqueue_cap(nni_msgqueue *mq)
+nni_msgq_cap(nni_msgq *mq)
{
int rv;
@@ -408,7 +409,7 @@ nni_msgqueue_cap(nni_msgqueue *mq)
int
-nni_msgqueue_resize(nni_msgqueue *mq, int cap)
+nni_msgq_resize(nni_msgq *mq, int cap)
{
int alloc;
nni_msg *msg;
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 900fed2b..9ee4335a 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -24,80 +24,89 @@
//
// Readers & writers in a message queue can be woken either by a timeout
// or by a specific signal (arranged by the caller).
-//
-// TODO: Add message queue growing, and pushback.
-typedef struct nni_msgqueue nni_msgqueue;
+typedef struct nni_msgq nni_msgq;
-// nni_msgqueue_create creates a message queue with the given capacity,
+// nni_msgq_init creates a message queue with the given capacity,
// which must be a positive number. It returns NNG_EINVAL if the capacity
// is invalid, or NNG_ENOMEM if resources cannot be allocated.
-extern int nni_msgqueue_create(nni_msgqueue **, int);
+extern int nni_msgq_init(nni_msgq **, int);
-// nni_msgqueue_destroy destroys a message queue. It will also free any
+// nni_msgq_fini destroys a message queue. It will also free any
// messages that may be in the queue.
-extern void nni_msgqueue_destroy(nni_msgqueue *);
+extern void nni_msgq_fini(nni_msgq *);
-// nni_msgqueue_put puts the message to the queue. It blocks until it
+// nni_msgq_put puts the message to the queue. It blocks until it
// was able to do so, or the queue is closed, returning either 0 on
// success or NNG_ECLOSED if the queue was closed. If NNG_ECLOSED is
// returned, the caller is responsible for freeing the message with
// nni_msg_free(), otherwise the message is "owned" by the queue, and
// the caller is not permitted to access it further.
-extern int nni_msgqueue_put(nni_msgqueue *, nni_msg *);
+extern int nni_msgq_put(nni_msgq *, nni_msg *);
-// nni_msgqueue_putback returns a message to the head of the queue.
+// nni_msgq_putback returns a message to the head of the queue.
// This is a non-blocking operation, and it returns EAGAIN if there
// is no room. There is always at least room for one putback after
-// a message is retried with nni_msgqueue_get.
-extern int nni_msgqueue_putback(nni_msgqueue *, nni_msg *);
+// a message is retried with nni_msgq_get.
+extern int nni_msgq_putback(nni_msgq *, nni_msg *);
-// nni_msgqueue_get gets the message from the queue. It blocks until a
+// nni_msgq_get gets the message from the queue. It blocks until a
// message is available, or the queue is closed, returning either 0 on
// success or NNG_ECLOSED if the queue was closed. If a message is
// provided, the caller is assumes ownership of the message and must
// call nni_msg_free() when it is finished with it.
-extern int nni_msgqueue_get(nni_msgqueue *, nni_msg **);
+extern int nni_msgq_get(nni_msgq *, nni_msg **);
-// nni_msgqueue_put_until is like nni_msgqueue_put, except that if the
+// nni_msgq_put_until is like nni_msgq_put, except that if the
// system clock reaches the specified time without being able to place
// the message in the queue, it will return NNG_ETIMEDOUT.
-extern int nni_msgqueue_put_until(nni_msgqueue *, nni_msg *, nni_time);
+extern int nni_msgq_put_until(nni_msgq *, nni_msg *, nni_time);
-// nni_msgqueue_get_until is like nni_msgqueue_put, except that if the
+// nni_msgq_get_until is like nni_msgq_put, except that if the
// system clock reaches the specified time without being able to retrieve
// a message from the queue, it will return NNG_ETIMEDOUT.
-extern int nni_msgqueue_get_until(nni_msgqueue *, nni_msg **, nni_time);
+extern int nni_msgq_get_until(nni_msgq *, nni_msg **, nni_time);
-// nni_msgqueue_put_sig is an enhanced version of nni_msgqueue_put, but it
+// nni_msgq_put_sig is an enhanced version of nni_msgq_put, but it
// can be interrupted by nni_msgqueue_signal using the same final pointer,
// which can be thought of as a turnstile. If interrupted it returns EINTR.
// The turnstile should be initialized to zero.
-extern int nni_msgqueue_put_sig(nni_msgqueue *, nni_msg *, nni_signal *);
+extern int nni_msgq_put_sig(nni_msgq *, nni_msg *, nni_signal *);
-// nni_msgqueue_get_sig is an enhanced version of nni_msgqueue_get_t, but it
-// can be interrupted by nni_msgqueue_signal using the same final pointer,
+// nni_msgq_get_sig is an enhanced version of nni_msgq_get_t, but it
+// can be interrupted by nni_msgq_signal using the same final pointer,
// which can be thought of as a turnstile. If interrupted it returns EINTR.
// The turnstile should be initialized to zero.
-extern int nni_msgqueue_get_sig(nni_msgqueue *, nni_msg **, nni_signal *);
+extern int nni_msgq_get_sig(nni_msgq *, nni_msg **, nni_signal *);
-// nni_msgqueue_signal delivers a signal / interrupt to waiters blocked in
-// the msgqueue, if they have registered an interest in the same turnstile.
+// nni_msgq_signal delivers a signal / interrupt to waiters blocked in
+// the msgq, if they have registered an interest in the same turnstile.
// It modifies the turnstile's value under the lock to a non-zero value.
-extern void nni_msgqueue_signal(nni_msgqueue *, nni_signal *);
+extern void nni_msgq_signal(nni_msgq *, nni_signal *);
-// nni_msgqueue_close closes the queue. After this all operates on the
+// nni_msgq_close closes the queue. After this all operates on the
// message queue will return NNG_ECLOSED. Messages inside the queue
// are freed. Unlike closing a go channel, this operation is idempotent.
-extern void nni_msgqueue_close(nni_msgqueue *);
+extern void nni_msgq_close(nni_msgq *);
-// nni_msgqueue_drain is like nng_msgqueue_close, except that reads
+// nni_msgq_drain is like nng_msgq_close, except that reads
// against the queue are permitted for up to the time limit. The
// operation blocks until either the queue is empty, or the timeout
// has expired. Any messages still in the queue at the timeout are freed.
-extern void nni_msgqueue_drain(nni_msgqueue *, nni_time);
-
-extern int nni_msgqueue_resize(nni_msgqueue *, int);
-extern int nni_msgqueue_cap(nni_msgqueue *mq);
-extern int nni_msgqueue_len(nni_msgqueue *mq);
+extern void nni_msgq_drain(nni_msgq *, nni_time);
+
+// nni_msgq_resize resizes the message queue; messages already in the queue
+// will be preserved as long as there is room. Messages that are dropped
+// due to no room are taken from the most recent. (Oldest messages are
+// preserved.)
+extern int nni_msgq_resize(nni_msgq *, int);
+
+// nni_msgq_cap returns the "capacity" of the message queue. This does not
+// include the extra room for pushback, nor the extra slot reserved to make
+// zero-length message queues possible. As a consequence, it is possible
+// for the message queue to contain up to 2 more messages than the capacity.
+extern int nni_msgq_cap(nni_msgq *mq);
+
+// nni_msgq_len returns the number of messages currently in the queue.
+extern int nni_msgq_len(nni_msgq *mq);
#endif // CORE_MSQUEUE_H
diff --git a/src/core/options.c b/src/core/options.c
index b970a0e7..24d7a9eb 100644
--- a/src/core/options.c
+++ b/src/core/options.c
@@ -77,7 +77,7 @@ nni_getopt_int(int *ptr, void *val, size_t *sizep)
int
-nni_setopt_buf(nni_msgqueue *mq, const void *val, size_t sz)
+nni_setopt_buf(nni_msgq *mq, const void *val, size_t sz)
{
int len;
@@ -94,14 +94,14 @@ nni_setopt_buf(nni_msgqueue *mq, const void *val, size_t sz)
// size could be quite large indeed in this case.
return (NNG_EINVAL);
}
- return (nni_msgqueue_resize(mq, len));
+ return (nni_msgq_resize(mq, len));
}
int
-nni_getopt_buf(nni_msgqueue *mq, void *val, size_t *sizep)
+nni_getopt_buf(nni_msgq *mq, void *val, size_t *sizep)
{
- int len = nni_msgqueue_cap(mq);
+ int len = nni_msgq_cap(mq);
int sz = *sizep;
diff --git a/src/core/options.h b/src/core/options.h
index ec5d28f1..84d958c1 100644
--- a/src/core/options.h
+++ b/src/core/options.h
@@ -15,10 +15,10 @@
// variable sized options.
// nni_setopt_buf sets the queue size for the message queue.
-extern int nni_setopt_buf(nni_msgqueue *, const void *, size_t);
+extern int nni_setopt_buf(nni_msgq *, const void *, size_t);
// nni_getopt_buf gets the queue size for the message queue.
-extern int nni_getopt_buf(nni_msgqueue *, void *, size_t *);
+extern int nni_getopt_buf(nni_msgq *, void *, size_t *);
// nni_setopt_duration sets the duration. Durations must be legal,
// either a positive value, 0, or -1 to indicate forever.
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 3a391ece..6fd48b40 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -63,11 +63,11 @@ struct nni_protocol {
// nni_socket_sendq obtains the upper writeq. The protocol should
// recieve messages from this, and place them on the appropriate pipe.
-extern nni_msgqueue *nni_socket_sendq(nni_socket *);
+extern nni_msgq *nni_socket_sendq(nni_socket *);
// nni_socket_recvq obtains the upper readq. The protocol should
// inject incoming messages from pipes to it.
-extern nni_msgqueue *nni_socket_recvq(nni_socket *);
+extern nni_msgq *nni_socket_recvq(nni_socket *);
// nni_socket_recv_err sets an error code to be returned to clients
// rather than waiting for a message. Set it to 0 to resume normal
diff --git a/src/core/socket.c b/src/core/socket.c
index a8904500..c64ba5ef 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -15,14 +15,14 @@
// nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain
// the upper read and write queues.
-nni_msgqueue *
+nni_msgq *
nni_socket_sendq(nni_socket *s)
{
return (s->s_uwq);
}
-nni_msgqueue *
+nni_msgq *
nni_socket_recvq(nni_socket *s)
{
return (s->s_urq);
@@ -126,15 +126,15 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
return (rv);
}
- if ((rv = nni_msgqueue_create(&sock->s_uwq, 0)) != 0) {
+ if ((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) {
nni_thr_fini(&sock->s_reaper);
nni_cv_fini(&sock->s_cv);
nni_mtx_fini(&sock->s_mx);
NNI_FREE_STRUCT(sock);
return (rv);
}
- if ((rv = nni_msgqueue_create(&sock->s_urq, 0)) != 0) {
- nni_msgqueue_destroy(sock->s_uwq);
+ if ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0) {
+ nni_msgq_fini(sock->s_uwq);
nni_thr_fini(&sock->s_reaper);
nni_cv_fini(&sock->s_cv);
nni_mtx_fini(&sock->s_mx);
@@ -143,8 +143,8 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
}
if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) {
- nni_msgqueue_destroy(sock->s_urq);
- nni_msgqueue_destroy(sock->s_uwq);
+ nni_msgq_fini(sock->s_urq);
+ nni_msgq_fini(sock->s_uwq);
nni_thr_fini(&sock->s_reaper);
nni_cv_fini(&sock->s_cv);
nni_mtx_fini(&sock->s_mx);
@@ -192,7 +192,7 @@ nni_socket_close(nni_socket *sock)
// except that the protocol gets a chance to get the messages and
// push them down to the transport. This operation can *block*
// until the linger time has expired.
- nni_msgqueue_drain(sock->s_uwq, linger);
+ nni_msgq_drain(sock->s_uwq, linger);
// Generally, unless the protocol is blocked trying to perform
// writes (e.g. a slow reader on the other side), it should be
@@ -211,7 +211,7 @@ nni_socket_close(nni_socket *sock)
// Close the upper read queue immediately. This can happen
// safely while we hold the lock.
- nni_msgqueue_close(sock->s_urq);
+ nni_msgq_close(sock->s_urq);
// Go through and schedule close on all pipes.
while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
@@ -231,8 +231,8 @@ nni_socket_close(nni_socket *sock)
sock->s_ops.proto_destroy(sock->s_data);
// And we need to clean up *our* state.
- nni_msgqueue_destroy(sock->s_urq);
- nni_msgqueue_destroy(sock->s_uwq);
+ nni_msgq_fini(sock->s_urq);
+ nni_msgq_fini(sock->s_uwq);
nni_cv_fini(&sock->s_cv);
nni_mtx_fini(&sock->s_mx);
NNI_FREE_STRUCT(sock);
@@ -273,7 +273,7 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_time expire)
// backpressure, we just throw it away, and don't complain.
expire = NNI_TIME_ZERO;
}
- rv = nni_msgqueue_put_until(sock->s_uwq, msg, expire);
+ rv = nni_msgq_put_until(sock->s_uwq, msg, expire);
if (besteffort && (rv == NNG_EAGAIN)) {
// Pretend this worked... it didn't, but pretend.
nni_msg_free(msg);
@@ -301,7 +301,7 @@ nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_time expire)
nni_mtx_unlock(&sock->s_mx);
for (;;) {
- rv = nni_msgqueue_get_until(sock->s_urq, &msg, expire);
+ rv = nni_msgq_get_until(sock->s_urq, &msg, expire);
if (rv != 0) {
return (rv);
}
diff --git a/src/core/socket.h b/src/core/socket.h
index 8d72c5e6..bd0a5c8a 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -17,8 +17,8 @@ struct nng_socket {
nni_mtx s_mx;
nni_cv s_cv;
- nni_msgqueue * s_uwq; // Upper write queue
- nni_msgqueue * s_urq; // Upper read queue
+ nni_msgq * s_uwq; // Upper write queue
+ nni_msgq * s_urq; // Upper read queue
nni_protocol s_ops;