diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/msgqueue.c | 197 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 115 | ||||
| -rw-r--r-- | src/core/platform.h | 42 |
3 files changed, 175 insertions, 179 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 6b208e70..cbdb94c6 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -1,35 +1,33 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #include "nng_impl.h" -/* - * Message queue. These operate in some respects like Go channels, - * but as we have access to the internals, we have made some fundamental - * differences and improvements. For example, these can grow, and either - * side can close, and they may be closed more than once. - */ +// Message queue. These operate in some respects like Go channels, +// but as we have access to the internals, we have made some fundamental +// differences and improvements. For example, these can grow, and either +// side can close, and they may be closed more than once. struct nni_msgqueue { - nni_mutex_t mq_lock; - nni_cond_t mq_readable; - nni_cond_t mq_writeable; + nni_mutex mq_lock; + nni_cond mq_readable; + nni_cond mq_writeable; int mq_cap; int mq_len; int mq_get; int mq_put; int mq_closed; - nng_msg_t * mq_msgs; + nni_msg ** mq_msgs; }; int -nni_msgqueue_create(nni_msgqueue_t *mqp, int cap) +nni_msgqueue_create(nni_msgqueue **mqp, int cap) { struct nni_msgqueue *mq; int rv; @@ -40,24 +38,24 @@ nni_msgqueue_create(nni_msgqueue_t *mqp, int cap) if ((mq = nni_alloc(sizeof (*mq))) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mutex_create(&mq->mq_lock)) != 0) { + if ((rv = nni_mutex_init(&mq->mq_lock)) != 0) { nni_free(mq, sizeof (*mq)); return (rv); } - if ((rv = nni_cond_create(&mq->mq_readable, mq->mq_lock)) != 0) { - nni_mutex_destroy(mq->mq_lock); + if ((rv = nni_cond_init(&mq->mq_readable, &mq->mq_lock)) != 0) { + nni_mutex_fini(&mq->mq_lock); nni_free(mq, sizeof (*mq)); return (NNG_ENOMEM); } - if ((rv = nni_cond_create(&mq->mq_writeable, mq->mq_lock)) != 0) { - nni_cond_destroy(mq->mq_readable); - nni_mutex_destroy(mq->mq_lock); + if ((rv = nni_cond_init(&mq->mq_writeable, &mq->mq_lock)) != 0) { + nni_cond_fini(&mq->mq_readable); + nni_mutex_fini(&mq->mq_lock); return (NNG_ENOMEM); } if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg_t) * cap)) == NULL) { - nni_cond_destroy(mq->mq_writeable); - nni_cond_destroy(mq->mq_readable); - nni_mutex_destroy(mq->mq_lock); + nni_cond_fini(&mq->mq_writeable); + nni_cond_fini(&mq->mq_readable); + nni_mutex_fini(&mq->mq_lock); return (NNG_ENOMEM); } @@ -73,13 +71,13 @@ nni_msgqueue_create(nni_msgqueue_t *mqp, int cap) void -nni_msgqueue_destroy(nni_msgqueue_t mq) +nni_msgqueue_destroy(nni_msgqueue *mq) { - nni_msg_t msg; + nni_msg *msg; - nni_cond_destroy(mq->mq_writeable); - nni_cond_destroy(mq->mq_readable); - nni_mutex_destroy(mq->mq_lock); + nni_cond_fini(&mq->mq_writeable); + nni_cond_fini(&mq->mq_readable); + nni_mutex_fini(&mq->mq_lock); /* Free any orphaned messages. */ while (mq->mq_len > 0) { @@ -97,69 +95,54 @@ nni_msgqueue_destroy(nni_msgqueue_t mq) } -/* - * nni_msgqueue_signal raises a signal on the signal object. This allows a - * waiter to be signaled, so that it can be woken e.g. due to a pipe closing. - * Note that the signal object must be *zero* if no signal is raised. - */ +// nni_msgqueue_signal raises a signal on the signal object. This allows a +// waiter to be signaled, so that it can be woken e.g. due to a pipe closing. +// Note that the signal object must be *zero* if no signal is raised. void -nni_msgqueue_signal(nni_msgqueue_t mq, int *signal) +nni_msgqueue_signal(nni_msgqueue *mq, int *signal) { - nni_mutex_enter(mq->mq_lock); + nni_mutex_enter(&mq->mq_lock); *signal = 1; - /* - * We have to wake everyone. - */ - nni_cond_broadcast(mq->mq_readable); - nni_cond_broadcast(mq->mq_writeable); - nni_mutex_exit(mq->mq_lock); + // We have to wake everyone. + nni_cond_broadcast(&mq->mq_readable); + nni_cond_broadcast(&mq->mq_writeable); + nni_mutex_exit(&mq->mq_lock); } int -nni_msgqueue_put_sig(nni_msgqueue_t mq, nni_msg_t msg, int tmout, int *signal) +nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, int tmout, int *signal) { - uint64_t expire, now; + uint64_t expire; - if (tmout > 0) { + if (tmout >= 0) { expire = nni_clock() + tmout; + } else { + expire = 0xffffffffffffffffull; } - nni_mutex_enter(mq->mq_lock); + nni_mutex_enter(&mq->mq_lock); while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap) && (!*signal)) { - if (tmout == 0) { - nni_mutex_exit(mq->mq_lock); - return (NNG_EAGAIN); - } - - if (tmout < 0) { - (void) nni_cond_wait(mq->mq_writeable); - continue; - } - - now = nni_clock(); - if (now >= expire) { - nni_mutex_exit(mq->mq_lock); - return (NNG_ETIMEDOUT); + if (expire <= nni_clock()) { + nni_mutex_exit(&mq->mq_lock); + return (tmout == 0 ? NNG_EAGAIN : NNG_ETIMEDOUT); } - (void) nni_cond_timedwait(mq->mq_writeable, (expire - now)); + (void) nni_cond_waituntil(&mq->mq_writeable, expire); } if (mq->mq_closed) { - nni_mutex_exit(mq->mq_lock); + nni_mutex_exit(&mq->mq_lock); return (NNG_ECLOSED); } if ((mq->mq_len == mq->mq_cap) && (*signal)) { - /* - * We are being interrupted. We only allow an interrupt - * if there is no room though, because we'd really prefer - * to queue the data. Otherwise our failure to queue - * the data could lead to starvation. - */ - nni_mutex_exit(mq->mq_lock); + // We are being interrupted. We only allow an interrupt + // if there is no room though, because we'd really prefer + // to queue the data. Otherwise our failure to queue + // the data could lead to starvation. + nni_mutex_exit(&mq->mq_lock); return (NNG_EINTR); } @@ -170,56 +153,46 @@ nni_msgqueue_put_sig(nni_msgqueue_t mq, nni_msg_t msg, int tmout, int *signal) } mq->mq_len++; if (mq->mq_len == 1) { - (void) nni_cond_signal(mq->mq_readable); + (void) nni_cond_signal(&mq->mq_readable); } - nni_mutex_exit(mq->mq_lock); + nni_mutex_exit(&mq->mq_lock); return (0); } int -nni_msgqueue_get_sig(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout, int *signal) +nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, int tmout, int *signal) { - uint64_t expire, now; + uint64_t expire; - if (tmout > 0) { + if (tmout >= 0) { expire = nni_clock() + tmout; + } else { + expire = 0xffffffffffffffffull; } - nni_mutex_enter(mq->mq_lock); + nni_mutex_enter(&mq->mq_lock); while ((!mq->mq_closed) && (mq->mq_len == 0) && (*signal == 0)) { - if (tmout == 0) { - nni_mutex_exit(mq->mq_lock); - return (NNG_EAGAIN); - } - - if (tmout < 0) { - (void) nni_cond_wait(mq->mq_readable); - continue; - } - - now = nni_clock(); - if (now >= expire) { - nni_mutex_exit(mq->mq_lock); - return (NNG_ETIMEDOUT); + if (expire <= nni_clock()) { + nni_mutex_exit(&mq->mq_lock); + return (tmout == 0 ? NNG_EAGAIN : NNG_ETIMEDOUT); } - (void) nni_cond_timedwait(mq->mq_readable, (expire - now)); + (void) nni_cond_waituntil(&mq->mq_readable, expire); } if (mq->mq_closed) { - nni_mutex_exit(mq->mq_lock); + nni_mutex_exit(&mq->mq_lock); return (NNG_ECLOSED); } if ((mq->mq_len == 0) && (*signal)) { - /* - * We are being interrupted. We only allow an interrupt - * if there is no data though, because we'd really prefer - * to give back the data. Otherwise our failure to deal - * with the data could lead to starvation. - */ - nni_mutex_exit(mq->mq_lock); + // We are being interrupted. We only allow an interrupt + // if there is no data though, because we'd really prefer + // to give back the data. Otherwise our failure to deal + // with the data could lead to starvation; also lingering + // relies on this not interrupting if data is pending. + nni_mutex_exit(&mq->mq_lock); return (NNG_EINTR); } @@ -231,15 +204,15 @@ nni_msgqueue_get_sig(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout, int *signal) } mq->mq_len++; if (mq->mq_len == (mq->mq_cap - 1)) { - (void) nni_cond_signal(mq->mq_writeable); + (void) nni_cond_signal(&mq->mq_writeable); } - nni_mutex_exit(mq->mq_lock); + nni_mutex_exit(&mq->mq_lock); return (0); } int -nni_msgqueue_get(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout) +nni_msgqueue_get(nni_msgqueue *mq, nni_msg **msgp, int tmout) { int nosig = 0; @@ -248,7 +221,7 @@ nni_msgqueue_get(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout) int -nni_msgqueue_put(nni_msgqueue_t mq, nni_msg_t msg, int tmout) +nni_msgqueue_put(nni_msgqueue *mq, nni_msg *msg, int tmout) { int nosig = 0; @@ -257,16 +230,16 @@ nni_msgqueue_put(nni_msgqueue_t mq, nni_msg_t msg, int tmout) void -nni_msgqueue_close(nni_msgqueue_t mq) +nni_msgqueue_close(nni_msgqueue *mq) { nni_msg_t msg; - nni_mutex_enter(mq->mq_lock); + nni_mutex_enter(&mq->mq_lock); mq->mq_closed = 1; - nni_cond_broadcast(mq->mq_writeable); - nni_cond_broadcast(mq->mq_readable); + nni_cond_broadcast(&mq->mq_writeable); + nni_cond_broadcast(&mq->mq_readable); - /* Free the messages orphaned in the queue. */ + // Free the messages orphaned in the queue. while (mq->mq_len > 0) { msg = mq->mq_msgs[mq->mq_get]; mq->mq_get++; @@ -276,5 +249,5 @@ nni_msgqueue_close(nni_msgqueue_t mq) mq->mq_len--; nni_msg_free(msg); } - nni_mutex_exit(mq->mq_lock); + nni_mutex_exit(&mq->mq_lock); } diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index dbf21d11..e44e203a 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -1,74 +1,67 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #ifndef CORE_MSGQUEUE_H #define CORE_MSGQUEUE_H -#include "nng.h" +#include "nng_impl.h" -/* - * Message queues. Message queues work in some ways like Go channels; - * they are a thread-safe way to pass messages between subsystems. - */ +// Message queues. Message queues work in some ways like Go channels; +// they are a thread-safe way to pass messages between subsystems. They +// do have additional capabilities though. typedef struct nni_msgqueue * nni_msgqueue_t; +typedef struct nni_msgqueue nni_msgqueue; -/* - * nni_msgqueue_create creates a message queue with the given capacity, - * which must be a positive number. It returns NNG_EINVAL if the capacity - * is invalid, or NNG_ENOMEM if resources cannot be allocated. - */ -extern int nni_msgqueue_create(nni_msgqueue_t *, int); +// nni_msgqueue_create creates a message queue with the given capacity, +// which must be a positive number. It returns NNG_EINVAL if the capacity +// is invalid, or NNG_ENOMEM if resources cannot be allocated. +extern int nni_msgqueue_create(nni_msgqueue **, int); -/* - * nni_msgqueue_destroy destroys a message queue. It will also free any - * messages that may be in the queue. - */ -extern void nni_msgqueue_destroy(nni_msgqueue_t); +// nni_msgqueue_destroy destroys a message queue. It will also free any +// messages that may be in the queue. +extern void nni_msgqueue_destroy(nni_msgqueue *); -/* - * nni_msgqueue_put attempts to put a message to the queue. It will wait - * for the timeout (us), if the value is positive. If the value is negative - * then it will wait forever. If the value is zero, it will just check, and - * return immediately whether a message can be put or not. Valid returns are - * NNG_ECLOSED if the queue is closed or NNG_ETIMEDOUT if the message cannot - * be placed after a time, or NNG_EAGAIN if the operation cannot succeed - * immediately and a zero timeout is specified. Note that timeout granularity - * may be limited -- for example Windows systems have a millisecond resolution - * timeout capability. - */ -extern int nni_msgqueue_put(nni_msgqueue_t, nng_msg_t, int); +// nni_msgqueue_put attempts to put a message to the queue. It will wait +// for the timeout (us), if the value is positive. If the value is negative +// then it will wait forever. If the value is zero, it will just check, and +// return immediately whether a message can be put or not. Valid returns are +// NNG_ECLOSED if the queue is closed or NNG_ETIMEDOUT if the message cannot +// be placed after a time, or NNG_EAGAIN if the operation cannot succeed +// immediately and a zero timeout is specified. Note that timeout granularity +// may be limited -- for example Windows systems have a millisecond resolution +// timeout capability. +extern int nni_msgqueue_put(nni_msgqueue *, nni_msg *, int); -/* - * nni_msgqueue_get gets the message from the queue, using a timeout just - * like nni_msgqueue_put. - */ -extern int nni_msgqueue_get(nni_msgqueue_t, nng_msg_t *, int); +// nni_msgqueue_get gets the message from the queue, using a timeout just +// like nni_msgqueue_put. +extern int nni_msgqueue_get(nni_msgqueue *, nni_msg **, int); -/* - * The following two functions are interruptible versions of msgqueue_get - * and msgqueue_put. The signal argument (pointer) must be initialized - * to zero. Then, we can raise a signal, by calling nni_msgqueue_signal - * on the same object. The signal flag will remain raised until it is - * cleared to zero. If a routine is interrupted, it will return NNG_EINTR. - * Note that only threads using the signal object will be interrupted; - * this has no effect on other threads that may be waiting on the msgqueue - * as well. - */ -extern int nni_msgqueue_put_sig(nni_msgqueue_t, nng_msg_t, int, int *); -extern int nni_msgqueue_get_sig(nni_msgqueue_t, nng_msg_t *, int, int *); -extern void nni_msgqueue_signal(nni_msgqueue_t, int *); +// nni_msgqueue_put_sig is an enhanced version of nni_msgqueue_put, but it +// can be interrupted by nni_msgqueue_signal using the same final pointer, +// which can be thought of as a turnstile. If interrupted it returns EINTR. +// The turnstile should be initialized to zero. +extern int nni_msgqueue_put_sig(nni_msgqueue *, nni_msg *, int, int *); -/* - * nni_msgqueue_close closes the queue. After this all operates on the - * message queue will return NNG_ECLOSED. Messages inside the queue - * are freed. Unlike closing a go channel, this operation is idempotent. - */ -extern void nni_msgqueue_close(nni_msgqueue_t); +// nni_msgqueue_get_sig is an enhanced version of nni_msgqueue_get_t, but it +// can be interrupted by nni_msgqueue_signal using the same final pointer, +// which can be thought of as a turnstile. If interrupted it returns EINTR. +// The turnstile should be initialized to zero. +extern int nni_msgqueue_get_sig(nni_msgqueue *, nni_msg **, int, int *); -#endif /* CORE_MSQUEUE_H */ +// nni_msgqueue_signal delivers a signal / interrupt to waiters blocked in +// the msgqueue, if they have registered an interest in the same turnstile. +// It modifies the turnstile's value under the lock to a non-zero value. +extern void nni_msgqueue_signal(nni_msgqueue *, int *); + +// nni_msgqueue_close closes the queue. After this all operates on the +// message queue will return NNG_ECLOSED. Messages inside the queue +// are freed. Unlike closing a go channel, this operation is idempotent. +extern void nni_msgqueue_close(nni_msgqueue *); + +#endif // CORE_MSQUEUE_H diff --git a/src/core/platform.h b/src/core/platform.h index ef6df820..6722e666 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -61,22 +61,46 @@ extern void *nni_alloc(size_t); extern void nni_free(void *, size_t); typedef struct nni_mutex nni_mutex; +typedef struct nni_cond nni_cond; +// XXX: REMOVE THESE typedef struct nni_mutex * nni_mutex_t; typedef struct nni_cond * nni_cond_t; +extern int nni_mutex_create(nni_mutex_t *); +extern void nni_mutex_destroy(nni_mutex_t); +extern int nni_cond_create(nni_cond_t *, nni_mutex_t); +extern void nni_cond_destroy(nni_cond_t); // Mutex handling. + +// nni_mutex_init initializes a mutex structure. This may require dynamic +// allocation, depending on the platform. It can return NNG_ENOMEM if that +// fails. extern int nni_mutex_init(nni_mutex *); -extern void nni_mutex_fini(nni_mutex *); -extern int nni_mutex_create(nni_mutex_t *); +// nni_mutex_fini destroys the mutex and releases any resources allocated for +// it's use. +extern void nni_mutex_fini(nni_mutex *); -extern void nni_mutex_destroy(nni_mutex_t); +// nni_mutex_enter locks the mutex. This is not recursive -- a mutex can only +// be entered once. extern void nni_mutex_enter(nni_mutex *); + +// nni_mutex_exit unlocks the mutex. This can only be performed by the thread +// that owned the mutex. extern void nni_mutex_exit(nni_mutex *); + +// nni_mutex_tryenter tries to lock the mutex. If it can't, it may return +// NNG_EBUSY. extern int nni_mutex_tryenter(nni_mutex *); -extern int nni_cond_create(nni_cond_t *, nni_mutex_t); -extern void nni_cond_destroy(nni_cond_t); +// nni_cond_init initializes a condition variable. We require a mutex be +// supplied with it, and that mutex must always be held when performing any +// operations on the condition variable (other than fini.) This may require +// dynamic allocation, and if so this operation may fail with NNG_ENOMEM. +extern int nni_cond_init(nni_cond *, nni_mutex *); + +// nni_cond_fini releases all resources associated with condition variable. +extern void nni_cond_fini(nni_cond *); // nni_cond_broadcast wakes all waiters on the condition. This should be // called with the lock held. @@ -97,7 +121,13 @@ extern void nni_cond_wait(nni_cond_t); // conditions. The return value is 0 on success, or an error code, which // can be NNG_ETIMEDOUT. Note that it is permissible to wait for longer // than the timeout based on the resolution of your system clock. -extern int nni_cond_timedwait(nni_cond_t, uint64_t); +extern int nni_cond_timedwait(nni_cond_t, int); + +// nni_cond_waituntil waits for a wakeup on the condition variable, or +// until the system time reaches the specified absolute time. (It is an +// absolute form of nni_cond_timedwait.) Early wakeups are possible, so +// check the condition. It will return either NNG_ETIMEDOUT, or 0. +extern int nni_cond_waituntil(nni_cond_t, uint64_t); typedef struct nni_thread * nni_thread_t; typedef struct nni_thread nni_thread; |
