diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-02 11:54:33 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-02 11:54:33 -0800 |
| commit | 9eb080db1c450228169cc58f14d946211378fcf7 (patch) | |
| tree | fcbbe3840b25c9651badd75950599f65e82caf01 /src/core | |
| parent | cceda25b65423de694f34b3decc7812eb46a4c1e (diff) | |
| download | nng-9eb080db1c450228169cc58f14d946211378fcf7.tar.gz nng-9eb080db1c450228169cc58f14d946211378fcf7.tar.bz2 nng-9eb080db1c450228169cc58f14d946211378fcf7.zip | |
Change msgqueue -> msgq.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/msgqueue.c | 63 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 77 | ||||
| -rw-r--r-- | src/core/options.c | 8 | ||||
| -rw-r--r-- | src/core/options.h | 4 | ||||
| -rw-r--r-- | src/core/protocol.h | 4 | ||||
| -rw-r--r-- | src/core/socket.c | 26 | ||||
| -rw-r--r-- | src/core/socket.h | 4 |
7 files changed, 98 insertions, 88 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 4807fe0f..0f1872dc 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -14,7 +14,7 @@ // differences and improvements. For example, these can grow, and either // side can close, and they may be closed more than once. -struct nni_msgqueue { +struct nni_msgq { nni_mtx mq_lock; nni_cv mq_readable; nni_cv mq_writeable; @@ -31,9 +31,9 @@ struct nni_msgqueue { }; int -nni_msgqueue_create(nni_msgqueue **mqp, int cap) +nni_msgq_init(nni_msgq **mqp, int cap) { - struct nni_msgqueue *mq; + struct nni_msgq *mq; int rv; int alloc; @@ -92,10 +92,13 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap) void -nni_msgqueue_destroy(nni_msgqueue *mq) +nni_msgq_fini(nni_msgq *mq) { nni_msg *msg; + if (mq == NULL) { + return; + } nni_cv_fini(&mq->mq_drained); nni_cv_fini(&mq->mq_writeable); nni_cv_fini(&mq->mq_readable); @@ -117,11 +120,11 @@ nni_msgqueue_destroy(nni_msgqueue *mq) } -// nni_msgqueue_signal raises a signal on the signal object. This allows a +// nni_msgq_signal raises a signal on the signal object. This allows a // waiter to be signaled, so that it can be woken e.g. due to a pipe closing. // Note that the signal object must be *zero* if no signal is raised. void -nni_msgqueue_signal(nni_msgqueue *mq, int *signal) +nni_msgq_signal(nni_msgq *mq, int *signal) { nni_mtx_lock(&mq->mq_lock); *signal = 1; @@ -134,8 +137,7 @@ nni_msgqueue_signal(nni_msgqueue *mq, int *signal) int -nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, - nni_time expire, nni_signal *signal) +nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) { int rv; @@ -161,7 +163,7 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, } // interrupted? - if (*signal) { + if (*sig) { nni_mtx_unlock(&mq->mq_lock); return (NNG_EINTR); } @@ -198,11 +200,11 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, } -// nni_msgqueue_putback will attempt to put a single message back +// nni_msgq_putback will attempt to put a single message back // to the head of the queue. It never blocks. Message queues always // have room for at least one putback. int -nni_msgqueue_putback(nni_msgqueue *mq, nni_msg *msg) +nni_msgq_putback(nni_msgq *mq, nni_msg *msg) { nni_mtx_lock(&mq->mq_lock); @@ -234,8 +236,7 @@ nni_msgqueue_putback(nni_msgqueue *mq, nni_msg *msg) static int -nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, - nni_time expire, nni_signal *signal) +nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) { int rv; @@ -254,7 +255,7 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, nni_mtx_unlock(&mq->mq_lock); return (NNG_EAGAIN); } - if (*signal) { + if (*sig) { nni_mtx_unlock(&mq->mq_lock); return (NNG_EINTR); } @@ -288,57 +289,57 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, int -nni_msgqueue_get(nni_msgqueue *mq, nni_msg **msgp) +nni_msgq_get(nni_msgq *mq, nni_msg **msgp) { nni_signal nosig = 0; - return (nni_msgqueue_get_impl(mq, msgp, NNI_TIME_NEVER, &nosig)); + return (nni_msgq_get_(mq, msgp, NNI_TIME_NEVER, &nosig)); } int -nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, nni_signal *signal) +nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal) { - return (nni_msgqueue_get_impl(mq, msgp, NNI_TIME_NEVER, signal)); + return (nni_msgq_get_(mq, msgp, NNI_TIME_NEVER, signal)); } int -nni_msgqueue_get_until(nni_msgqueue *mq, nni_msg **msgp, nni_time expire) +nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) { nni_signal nosig = 0; - return (nni_msgqueue_get_impl(mq, msgp, expire, &nosig)); + return (nni_msgq_get_(mq, msgp, expire, &nosig)); } int -nni_msgqueue_put(nni_msgqueue *mq, nni_msg *msg) +nni_msgq_put(nni_msgq *mq, nni_msg *msg) { nni_signal nosig = 0; - return (nni_msgqueue_put_impl(mq, msg, NNI_TIME_NEVER, &nosig)); + return (nni_msgq_put_(mq, msg, NNI_TIME_NEVER, &nosig)); } int -nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, nni_signal *signal) +nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal) { - return (nni_msgqueue_put_impl(mq, msg, NNI_TIME_NEVER, signal)); + return (nni_msgq_put_(mq, msg, NNI_TIME_NEVER, signal)); } int -nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire) +nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) { nni_signal nosig = 0; - return (nni_msgqueue_put_impl(mq, msg, expire, &nosig)); + return (nni_msgq_put_(mq, msg, expire, &nosig)); } void -nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire) +nni_msgq_drain(nni_msgq *mq, nni_time expire) { nni_mtx_lock(&mq->mq_lock); mq->mq_closed = 1; @@ -363,7 +364,7 @@ nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire) void -nni_msgqueue_close(nni_msgqueue *mq) +nni_msgq_close(nni_msgq *mq) { nni_mtx_lock(&mq->mq_lock); mq->mq_closed = 1; @@ -384,7 +385,7 @@ nni_msgqueue_close(nni_msgqueue *mq) int -nni_msgqueue_len(nni_msgqueue *mq) +nni_msgq_len(nni_msgq *mq) { int rv; @@ -396,7 +397,7 @@ nni_msgqueue_len(nni_msgqueue *mq) int -nni_msgqueue_cap(nni_msgqueue *mq) +nni_msgq_cap(nni_msgq *mq) { int rv; @@ -408,7 +409,7 @@ nni_msgqueue_cap(nni_msgqueue *mq) int -nni_msgqueue_resize(nni_msgqueue *mq, int cap) +nni_msgq_resize(nni_msgq *mq, int cap) { int alloc; nni_msg *msg; diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 900fed2b..9ee4335a 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -24,80 +24,89 @@ // // Readers & writers in a message queue can be woken either by a timeout // or by a specific signal (arranged by the caller). -// -// TODO: Add message queue growing, and pushback. -typedef struct nni_msgqueue nni_msgqueue; +typedef struct nni_msgq nni_msgq; -// nni_msgqueue_create creates a message queue with the given capacity, +// nni_msgq_init creates a message queue with the given capacity, // which must be a positive number. It returns NNG_EINVAL if the capacity // is invalid, or NNG_ENOMEM if resources cannot be allocated. -extern int nni_msgqueue_create(nni_msgqueue **, int); +extern int nni_msgq_init(nni_msgq **, int); -// nni_msgqueue_destroy destroys a message queue. It will also free any +// nni_msgq_fini destroys a message queue. It will also free any // messages that may be in the queue. -extern void nni_msgqueue_destroy(nni_msgqueue *); +extern void nni_msgq_fini(nni_msgq *); -// nni_msgqueue_put puts the message to the queue. It blocks until it +// nni_msgq_put puts the message to the queue. It blocks until it // was able to do so, or the queue is closed, returning either 0 on // success or NNG_ECLOSED if the queue was closed. If NNG_ECLOSED is // returned, the caller is responsible for freeing the message with // nni_msg_free(), otherwise the message is "owned" by the queue, and // the caller is not permitted to access it further. -extern int nni_msgqueue_put(nni_msgqueue *, nni_msg *); +extern int nni_msgq_put(nni_msgq *, nni_msg *); -// nni_msgqueue_putback returns a message to the head of the queue. +// nni_msgq_putback returns a message to the head of the queue. // This is a non-blocking operation, and it returns EAGAIN if there // is no room. There is always at least room for one putback after -// a message is retried with nni_msgqueue_get. -extern int nni_msgqueue_putback(nni_msgqueue *, nni_msg *); +// a message is retried with nni_msgq_get. +extern int nni_msgq_putback(nni_msgq *, nni_msg *); -// nni_msgqueue_get gets the message from the queue. It blocks until a +// nni_msgq_get gets the message from the queue. It blocks until a // message is available, or the queue is closed, returning either 0 on // success or NNG_ECLOSED if the queue was closed. If a message is // provided, the caller is assumes ownership of the message and must // call nni_msg_free() when it is finished with it. -extern int nni_msgqueue_get(nni_msgqueue *, nni_msg **); +extern int nni_msgq_get(nni_msgq *, nni_msg **); -// nni_msgqueue_put_until is like nni_msgqueue_put, except that if the +// nni_msgq_put_until is like nni_msgq_put, except that if the // system clock reaches the specified time without being able to place // the message in the queue, it will return NNG_ETIMEDOUT. -extern int nni_msgqueue_put_until(nni_msgqueue *, nni_msg *, nni_time); +extern int nni_msgq_put_until(nni_msgq *, nni_msg *, nni_time); -// nni_msgqueue_get_until is like nni_msgqueue_put, except that if the +// nni_msgq_get_until is like nni_msgq_put, except that if the // system clock reaches the specified time without being able to retrieve // a message from the queue, it will return NNG_ETIMEDOUT. -extern int nni_msgqueue_get_until(nni_msgqueue *, nni_msg **, nni_time); +extern int nni_msgq_get_until(nni_msgq *, nni_msg **, nni_time); -// nni_msgqueue_put_sig is an enhanced version of nni_msgqueue_put, but it +// nni_msgq_put_sig is an enhanced version of nni_msgq_put, but it // can be interrupted by nni_msgqueue_signal using the same final pointer, // which can be thought of as a turnstile. If interrupted it returns EINTR. // The turnstile should be initialized to zero. -extern int nni_msgqueue_put_sig(nni_msgqueue *, nni_msg *, nni_signal *); +extern int nni_msgq_put_sig(nni_msgq *, nni_msg *, nni_signal *); -// nni_msgqueue_get_sig is an enhanced version of nni_msgqueue_get_t, but it -// can be interrupted by nni_msgqueue_signal using the same final pointer, +// nni_msgq_get_sig is an enhanced version of nni_msgq_get_t, but it +// can be interrupted by nni_msgq_signal using the same final pointer, // which can be thought of as a turnstile. If interrupted it returns EINTR. // The turnstile should be initialized to zero. -extern int nni_msgqueue_get_sig(nni_msgqueue *, nni_msg **, nni_signal *); +extern int nni_msgq_get_sig(nni_msgq *, nni_msg **, nni_signal *); -// nni_msgqueue_signal delivers a signal / interrupt to waiters blocked in -// the msgqueue, if they have registered an interest in the same turnstile. +// nni_msgq_signal delivers a signal / interrupt to waiters blocked in +// the msgq, if they have registered an interest in the same turnstile. // It modifies the turnstile's value under the lock to a non-zero value. -extern void nni_msgqueue_signal(nni_msgqueue *, nni_signal *); +extern void nni_msgq_signal(nni_msgq *, nni_signal *); -// nni_msgqueue_close closes the queue. After this all operates on the +// nni_msgq_close closes the queue. After this all operates on the // message queue will return NNG_ECLOSED. Messages inside the queue // are freed. Unlike closing a go channel, this operation is idempotent. -extern void nni_msgqueue_close(nni_msgqueue *); +extern void nni_msgq_close(nni_msgq *); -// nni_msgqueue_drain is like nng_msgqueue_close, except that reads +// nni_msgq_drain is like nng_msgq_close, except that reads // against the queue are permitted for up to the time limit. The // operation blocks until either the queue is empty, or the timeout // has expired. Any messages still in the queue at the timeout are freed. -extern void nni_msgqueue_drain(nni_msgqueue *, nni_time); - -extern int nni_msgqueue_resize(nni_msgqueue *, int); -extern int nni_msgqueue_cap(nni_msgqueue *mq); -extern int nni_msgqueue_len(nni_msgqueue *mq); +extern void nni_msgq_drain(nni_msgq *, nni_time); + +// nni_msgq_resize resizes the message queue; messages already in the queue +// will be preserved as long as there is room. Messages that are dropped +// due to no room are taken from the most recent. (Oldest messages are +// preserved.) +extern int nni_msgq_resize(nni_msgq *, int); + +// nni_msgq_cap returns the "capacity" of the message queue. This does not +// include the extra room for pushback, nor the extra slot reserved to make +// zero-length message queues possible. As a consequence, it is possible +// for the message queue to contain up to 2 more messages than the capacity. +extern int nni_msgq_cap(nni_msgq *mq); + +// nni_msgq_len returns the number of messages currently in the queue. +extern int nni_msgq_len(nni_msgq *mq); #endif // CORE_MSQUEUE_H diff --git a/src/core/options.c b/src/core/options.c index b970a0e7..24d7a9eb 100644 --- a/src/core/options.c +++ b/src/core/options.c @@ -77,7 +77,7 @@ nni_getopt_int(int *ptr, void *val, size_t *sizep) int -nni_setopt_buf(nni_msgqueue *mq, const void *val, size_t sz) +nni_setopt_buf(nni_msgq *mq, const void *val, size_t sz) { int len; @@ -94,14 +94,14 @@ nni_setopt_buf(nni_msgqueue *mq, const void *val, size_t sz) // size could be quite large indeed in this case. return (NNG_EINVAL); } - return (nni_msgqueue_resize(mq, len)); + return (nni_msgq_resize(mq, len)); } int -nni_getopt_buf(nni_msgqueue *mq, void *val, size_t *sizep) +nni_getopt_buf(nni_msgq *mq, void *val, size_t *sizep) { - int len = nni_msgqueue_cap(mq); + int len = nni_msgq_cap(mq); int sz = *sizep; diff --git a/src/core/options.h b/src/core/options.h index ec5d28f1..84d958c1 100644 --- a/src/core/options.h +++ b/src/core/options.h @@ -15,10 +15,10 @@ // variable sized options. // nni_setopt_buf sets the queue size for the message queue. -extern int nni_setopt_buf(nni_msgqueue *, const void *, size_t); +extern int nni_setopt_buf(nni_msgq *, const void *, size_t); // nni_getopt_buf gets the queue size for the message queue. -extern int nni_getopt_buf(nni_msgqueue *, void *, size_t *); +extern int nni_getopt_buf(nni_msgq *, void *, size_t *); // nni_setopt_duration sets the duration. Durations must be legal, // either a positive value, 0, or -1 to indicate forever. diff --git a/src/core/protocol.h b/src/core/protocol.h index 3a391ece..6fd48b40 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -63,11 +63,11 @@ struct nni_protocol { // nni_socket_sendq obtains the upper writeq. The protocol should // recieve messages from this, and place them on the appropriate pipe. -extern nni_msgqueue *nni_socket_sendq(nni_socket *); +extern nni_msgq *nni_socket_sendq(nni_socket *); // nni_socket_recvq obtains the upper readq. The protocol should // inject incoming messages from pipes to it. -extern nni_msgqueue *nni_socket_recvq(nni_socket *); +extern nni_msgq *nni_socket_recvq(nni_socket *); // nni_socket_recv_err sets an error code to be returned to clients // rather than waiting for a message. Set it to 0 to resume normal diff --git a/src/core/socket.c b/src/core/socket.c index a8904500..c64ba5ef 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -15,14 +15,14 @@ // nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain // the upper read and write queues. -nni_msgqueue * +nni_msgq * nni_socket_sendq(nni_socket *s) { return (s->s_uwq); } -nni_msgqueue * +nni_msgq * nni_socket_recvq(nni_socket *s) { return (s->s_urq); @@ -126,15 +126,15 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) return (rv); } - if ((rv = nni_msgqueue_create(&sock->s_uwq, 0)) != 0) { + if ((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) { nni_thr_fini(&sock->s_reaper); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_mx); NNI_FREE_STRUCT(sock); return (rv); } - if ((rv = nni_msgqueue_create(&sock->s_urq, 0)) != 0) { - nni_msgqueue_destroy(sock->s_uwq); + if ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0) { + nni_msgq_fini(sock->s_uwq); nni_thr_fini(&sock->s_reaper); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_mx); @@ -143,8 +143,8 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) } if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) { - nni_msgqueue_destroy(sock->s_urq); - nni_msgqueue_destroy(sock->s_uwq); + nni_msgq_fini(sock->s_urq); + nni_msgq_fini(sock->s_uwq); nni_thr_fini(&sock->s_reaper); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_mx); @@ -192,7 +192,7 @@ nni_socket_close(nni_socket *sock) // except that the protocol gets a chance to get the messages and // push them down to the transport. This operation can *block* // until the linger time has expired. - nni_msgqueue_drain(sock->s_uwq, linger); + nni_msgq_drain(sock->s_uwq, linger); // Generally, unless the protocol is blocked trying to perform // writes (e.g. a slow reader on the other side), it should be @@ -211,7 +211,7 @@ nni_socket_close(nni_socket *sock) // Close the upper read queue immediately. This can happen // safely while we hold the lock. - nni_msgqueue_close(sock->s_urq); + nni_msgq_close(sock->s_urq); // Go through and schedule close on all pipes. while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { @@ -231,8 +231,8 @@ nni_socket_close(nni_socket *sock) sock->s_ops.proto_destroy(sock->s_data); // And we need to clean up *our* state. - nni_msgqueue_destroy(sock->s_urq); - nni_msgqueue_destroy(sock->s_uwq); + nni_msgq_fini(sock->s_urq); + nni_msgq_fini(sock->s_uwq); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_mx); NNI_FREE_STRUCT(sock); @@ -273,7 +273,7 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_time expire) // backpressure, we just throw it away, and don't complain. expire = NNI_TIME_ZERO; } - rv = nni_msgqueue_put_until(sock->s_uwq, msg, expire); + rv = nni_msgq_put_until(sock->s_uwq, msg, expire); if (besteffort && (rv == NNG_EAGAIN)) { // Pretend this worked... it didn't, but pretend. nni_msg_free(msg); @@ -301,7 +301,7 @@ nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_time expire) nni_mtx_unlock(&sock->s_mx); for (;;) { - rv = nni_msgqueue_get_until(sock->s_urq, &msg, expire); + rv = nni_msgq_get_until(sock->s_urq, &msg, expire); if (rv != 0) { return (rv); } diff --git a/src/core/socket.h b/src/core/socket.h index 8d72c5e6..bd0a5c8a 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -17,8 +17,8 @@ struct nng_socket { nni_mtx s_mx; nni_cv s_cv; - nni_msgqueue * s_uwq; // Upper write queue - nni_msgqueue * s_urq; // Upper read queue + nni_msgq * s_uwq; // Upper write queue + nni_msgq * s_urq; // Upper read queue nni_protocol s_ops; |
