diff options
| -rw-r--r-- | src/core/msgqueue.c | 197 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 115 | ||||
| -rw-r--r-- | src/core/platform.h | 42 | ||||
| -rw-r--r-- | src/platform/posix/posix_alloc.c | 20 | ||||
| -rw-r--r-- | src/platform/posix/posix_clock.c | 88 | ||||
| -rw-r--r-- | src/platform/posix/posix_impl.h | 7 | ||||
| -rw-r--r-- | src/platform/posix/posix_synch.c | 120 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.c | 4 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 103 |
9 files changed, 341 insertions, 355 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 6b208e70..cbdb94c6 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -1,35 +1,33 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #include "nng_impl.h" -/* - * Message queue. These operate in some respects like Go channels, - * but as we have access to the internals, we have made some fundamental - * differences and improvements. For example, these can grow, and either - * side can close, and they may be closed more than once. - */ +// Message queue. These operate in some respects like Go channels, +// but as we have access to the internals, we have made some fundamental +// differences and improvements. For example, these can grow, and either +// side can close, and they may be closed more than once. struct nni_msgqueue { - nni_mutex_t mq_lock; - nni_cond_t mq_readable; - nni_cond_t mq_writeable; + nni_mutex mq_lock; + nni_cond mq_readable; + nni_cond mq_writeable; int mq_cap; int mq_len; int mq_get; int mq_put; int mq_closed; - nng_msg_t * mq_msgs; + nni_msg ** mq_msgs; }; int -nni_msgqueue_create(nni_msgqueue_t *mqp, int cap) +nni_msgqueue_create(nni_msgqueue **mqp, int cap) { struct nni_msgqueue *mq; int rv; @@ -40,24 +38,24 @@ nni_msgqueue_create(nni_msgqueue_t *mqp, int cap) if ((mq = nni_alloc(sizeof (*mq))) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mutex_create(&mq->mq_lock)) != 0) { + if ((rv = nni_mutex_init(&mq->mq_lock)) != 0) { nni_free(mq, sizeof (*mq)); return (rv); } - if ((rv = nni_cond_create(&mq->mq_readable, mq->mq_lock)) != 0) { - nni_mutex_destroy(mq->mq_lock); + if ((rv = nni_cond_init(&mq->mq_readable, &mq->mq_lock)) != 0) { + nni_mutex_fini(&mq->mq_lock); nni_free(mq, sizeof (*mq)); return (NNG_ENOMEM); } - if ((rv = nni_cond_create(&mq->mq_writeable, mq->mq_lock)) != 0) { - nni_cond_destroy(mq->mq_readable); - nni_mutex_destroy(mq->mq_lock); + if ((rv = nni_cond_init(&mq->mq_writeable, &mq->mq_lock)) != 0) { + nni_cond_fini(&mq->mq_readable); + nni_mutex_fini(&mq->mq_lock); return (NNG_ENOMEM); } if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg_t) * cap)) == NULL) { - nni_cond_destroy(mq->mq_writeable); - nni_cond_destroy(mq->mq_readable); - nni_mutex_destroy(mq->mq_lock); + nni_cond_fini(&mq->mq_writeable); + nni_cond_fini(&mq->mq_readable); + nni_mutex_fini(&mq->mq_lock); return (NNG_ENOMEM); } @@ -73,13 +71,13 @@ nni_msgqueue_create(nni_msgqueue_t *mqp, int cap) void -nni_msgqueue_destroy(nni_msgqueue_t mq) +nni_msgqueue_destroy(nni_msgqueue *mq) { - nni_msg_t msg; + nni_msg *msg; - nni_cond_destroy(mq->mq_writeable); - nni_cond_destroy(mq->mq_readable); - nni_mutex_destroy(mq->mq_lock); + nni_cond_fini(&mq->mq_writeable); + nni_cond_fini(&mq->mq_readable); + nni_mutex_fini(&mq->mq_lock); /* Free any orphaned messages. */ while (mq->mq_len > 0) { @@ -97,69 +95,54 @@ nni_msgqueue_destroy(nni_msgqueue_t mq) } -/* - * nni_msgqueue_signal raises a signal on the signal object. This allows a - * waiter to be signaled, so that it can be woken e.g. due to a pipe closing. - * Note that the signal object must be *zero* if no signal is raised. - */ +// nni_msgqueue_signal raises a signal on the signal object. This allows a +// waiter to be signaled, so that it can be woken e.g. due to a pipe closing. +// Note that the signal object must be *zero* if no signal is raised. void -nni_msgqueue_signal(nni_msgqueue_t mq, int *signal) +nni_msgqueue_signal(nni_msgqueue *mq, int *signal) { - nni_mutex_enter(mq->mq_lock); + nni_mutex_enter(&mq->mq_lock); *signal = 1; - /* - * We have to wake everyone. - */ - nni_cond_broadcast(mq->mq_readable); - nni_cond_broadcast(mq->mq_writeable); - nni_mutex_exit(mq->mq_lock); + // We have to wake everyone. + nni_cond_broadcast(&mq->mq_readable); + nni_cond_broadcast(&mq->mq_writeable); + nni_mutex_exit(&mq->mq_lock); } int -nni_msgqueue_put_sig(nni_msgqueue_t mq, nni_msg_t msg, int tmout, int *signal) +nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, int tmout, int *signal) { - uint64_t expire, now; + uint64_t expire; - if (tmout > 0) { + if (tmout >= 0) { expire = nni_clock() + tmout; + } else { + expire = 0xffffffffffffffffull; } - nni_mutex_enter(mq->mq_lock); + nni_mutex_enter(&mq->mq_lock); while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap) && (!*signal)) { - if (tmout == 0) { - nni_mutex_exit(mq->mq_lock); - return (NNG_EAGAIN); - } - - if (tmout < 0) { - (void) nni_cond_wait(mq->mq_writeable); - continue; - } - - now = nni_clock(); - if (now >= expire) { - nni_mutex_exit(mq->mq_lock); - return (NNG_ETIMEDOUT); + if (expire <= nni_clock()) { + nni_mutex_exit(&mq->mq_lock); + return (tmout == 0 ? NNG_EAGAIN : NNG_ETIMEDOUT); } - (void) nni_cond_timedwait(mq->mq_writeable, (expire - now)); + (void) nni_cond_waituntil(&mq->mq_writeable, expire); } if (mq->mq_closed) { - nni_mutex_exit(mq->mq_lock); + nni_mutex_exit(&mq->mq_lock); return (NNG_ECLOSED); } if ((mq->mq_len == mq->mq_cap) && (*signal)) { - /* - * We are being interrupted. We only allow an interrupt - * if there is no room though, because we'd really prefer - * to queue the data. Otherwise our failure to queue - * the data could lead to starvation. - */ - nni_mutex_exit(mq->mq_lock); + // We are being interrupted. We only allow an interrupt + // if there is no room though, because we'd really prefer + // to queue the data. Otherwise our failure to queue + // the data could lead to starvation. + nni_mutex_exit(&mq->mq_lock); return (NNG_EINTR); } @@ -170,56 +153,46 @@ nni_msgqueue_put_sig(nni_msgqueue_t mq, nni_msg_t msg, int tmout, int *signal) } mq->mq_len++; if (mq->mq_len == 1) { - (void) nni_cond_signal(mq->mq_readable); + (void) nni_cond_signal(&mq->mq_readable); } - nni_mutex_exit(mq->mq_lock); + nni_mutex_exit(&mq->mq_lock); return (0); } int -nni_msgqueue_get_sig(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout, int *signal) +nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, int tmout, int *signal) { - uint64_t expire, now; + uint64_t expire; - if (tmout > 0) { + if (tmout >= 0) { expire = nni_clock() + tmout; + } else { + expire = 0xffffffffffffffffull; } - nni_mutex_enter(mq->mq_lock); + nni_mutex_enter(&mq->mq_lock); while ((!mq->mq_closed) && (mq->mq_len == 0) && (*signal == 0)) { - if (tmout == 0) { - nni_mutex_exit(mq->mq_lock); - return (NNG_EAGAIN); - } - - if (tmout < 0) { - (void) nni_cond_wait(mq->mq_readable); - continue; - } - - now = nni_clock(); - if (now >= expire) { - nni_mutex_exit(mq->mq_lock); - return (NNG_ETIMEDOUT); + if (expire <= nni_clock()) { + nni_mutex_exit(&mq->mq_lock); + return (tmout == 0 ? NNG_EAGAIN : NNG_ETIMEDOUT); } - (void) nni_cond_timedwait(mq->mq_readable, (expire - now)); + (void) nni_cond_waituntil(&mq->mq_readable, expire); } if (mq->mq_closed) { - nni_mutex_exit(mq->mq_lock); + nni_mutex_exit(&mq->mq_lock); return (NNG_ECLOSED); } if ((mq->mq_len == 0) && (*signal)) { - /* - * We are being interrupted. We only allow an interrupt - * if there is no data though, because we'd really prefer - * to give back the data. Otherwise our failure to deal - * with the data could lead to starvation. - */ - nni_mutex_exit(mq->mq_lock); + // We are being interrupted. We only allow an interrupt + // if there is no data though, because we'd really prefer + // to give back the data. Otherwise our failure to deal + // with the data could lead to starvation; also lingering + // relies on this not interrupting if data is pending. + nni_mutex_exit(&mq->mq_lock); return (NNG_EINTR); } @@ -231,15 +204,15 @@ nni_msgqueue_get_sig(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout, int *signal) } mq->mq_len++; if (mq->mq_len == (mq->mq_cap - 1)) { - (void) nni_cond_signal(mq->mq_writeable); + (void) nni_cond_signal(&mq->mq_writeable); } - nni_mutex_exit(mq->mq_lock); + nni_mutex_exit(&mq->mq_lock); return (0); } int -nni_msgqueue_get(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout) +nni_msgqueue_get(nni_msgqueue *mq, nni_msg **msgp, int tmout) { int nosig = 0; @@ -248,7 +221,7 @@ nni_msgqueue_get(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout) int -nni_msgqueue_put(nni_msgqueue_t mq, nni_msg_t msg, int tmout) +nni_msgqueue_put(nni_msgqueue *mq, nni_msg *msg, int tmout) { int nosig = 0; @@ -257,16 +230,16 @@ nni_msgqueue_put(nni_msgqueue_t mq, nni_msg_t msg, int tmout) void -nni_msgqueue_close(nni_msgqueue_t mq) +nni_msgqueue_close(nni_msgqueue *mq) { nni_msg_t msg; - nni_mutex_enter(mq->mq_lock); + nni_mutex_enter(&mq->mq_lock); mq->mq_closed = 1; - nni_cond_broadcast(mq->mq_writeable); - nni_cond_broadcast(mq->mq_readable); + nni_cond_broadcast(&mq->mq_writeable); + nni_cond_broadcast(&mq->mq_readable); - /* Free the messages orphaned in the queue. */ + // Free the messages orphaned in the queue. while (mq->mq_len > 0) { msg = mq->mq_msgs[mq->mq_get]; mq->mq_get++; @@ -276,5 +249,5 @@ nni_msgqueue_close(nni_msgqueue_t mq) mq->mq_len--; nni_msg_free(msg); } - nni_mutex_exit(mq->mq_lock); + nni_mutex_exit(&mq->mq_lock); } diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index dbf21d11..e44e203a 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -1,74 +1,67 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #ifndef CORE_MSGQUEUE_H #define CORE_MSGQUEUE_H -#include "nng.h" +#include "nng_impl.h" -/* - * Message queues. Message queues work in some ways like Go channels; - * they are a thread-safe way to pass messages between subsystems. - */ +// Message queues. Message queues work in some ways like Go channels; +// they are a thread-safe way to pass messages between subsystems. They +// do have additional capabilities though. typedef struct nni_msgqueue * nni_msgqueue_t; +typedef struct nni_msgqueue nni_msgqueue; -/* - * nni_msgqueue_create creates a message queue with the given capacity, - * which must be a positive number. It returns NNG_EINVAL if the capacity - * is invalid, or NNG_ENOMEM if resources cannot be allocated. - */ -extern int nni_msgqueue_create(nni_msgqueue_t *, int); +// nni_msgqueue_create creates a message queue with the given capacity, +// which must be a positive number. It returns NNG_EINVAL if the capacity +// is invalid, or NNG_ENOMEM if resources cannot be allocated. +extern int nni_msgqueue_create(nni_msgqueue **, int); -/* - * nni_msgqueue_destroy destroys a message queue. It will also free any - * messages that may be in the queue. - */ -extern void nni_msgqueue_destroy(nni_msgqueue_t); +// nni_msgqueue_destroy destroys a message queue. It will also free any +// messages that may be in the queue. +extern void nni_msgqueue_destroy(nni_msgqueue *); -/* - * nni_msgqueue_put attempts to put a message to the queue. It will wait - * for the timeout (us), if the value is positive. If the value is negative - * then it will wait forever. If the value is zero, it will just check, and - * return immediately whether a message can be put or not. Valid returns are - * NNG_ECLOSED if the queue is closed or NNG_ETIMEDOUT if the message cannot - * be placed after a time, or NNG_EAGAIN if the operation cannot succeed - * immediately and a zero timeout is specified. Note that timeout granularity - * may be limited -- for example Windows systems have a millisecond resolution - * timeout capability. - */ -extern int nni_msgqueue_put(nni_msgqueue_t, nng_msg_t, int); +// nni_msgqueue_put attempts to put a message to the queue. It will wait +// for the timeout (us), if the value is positive. If the value is negative +// then it will wait forever. If the value is zero, it will just check, and +// return immediately whether a message can be put or not. Valid returns are +// NNG_ECLOSED if the queue is closed or NNG_ETIMEDOUT if the message cannot +// be placed after a time, or NNG_EAGAIN if the operation cannot succeed +// immediately and a zero timeout is specified. Note that timeout granularity +// may be limited -- for example Windows systems have a millisecond resolution +// timeout capability. +extern int nni_msgqueue_put(nni_msgqueue *, nni_msg *, int); -/* - * nni_msgqueue_get gets the message from the queue, using a timeout just - * like nni_msgqueue_put. - */ -extern int nni_msgqueue_get(nni_msgqueue_t, nng_msg_t *, int); +// nni_msgqueue_get gets the message from the queue, using a timeout just +// like nni_msgqueue_put. +extern int nni_msgqueue_get(nni_msgqueue *, nni_msg **, int); -/* - * The following two functions are interruptible versions of msgqueue_get - * and msgqueue_put. The signal argument (pointer) must be initialized - * to zero. Then, we can raise a signal, by calling nni_msgqueue_signal - * on the same object. The signal flag will remain raised until it is - * cleared to zero. If a routine is interrupted, it will return NNG_EINTR. - * Note that only threads using the signal object will be interrupted; - * this has no effect on other threads that may be waiting on the msgqueue - * as well. - */ -extern int nni_msgqueue_put_sig(nni_msgqueue_t, nng_msg_t, int, int *); -extern int nni_msgqueue_get_sig(nni_msgqueue_t, nng_msg_t *, int, int *); -extern void nni_msgqueue_signal(nni_msgqueue_t, int *); +// nni_msgqueue_put_sig is an enhanced version of nni_msgqueue_put, but it +// can be interrupted by nni_msgqueue_signal using the same final pointer, +// which can be thought of as a turnstile. If interrupted it returns EINTR. +// The turnstile should be initialized to zero. +extern int nni_msgqueue_put_sig(nni_msgqueue *, nni_msg *, int, int *); -/* - * nni_msgqueue_close closes the queue. After this all operates on the - * message queue will return NNG_ECLOSED. Messages inside the queue - * are freed. Unlike closing a go channel, this operation is idempotent. - */ -extern void nni_msgqueue_close(nni_msgqueue_t); +// nni_msgqueue_get_sig is an enhanced version of nni_msgqueue_get_t, but it +// can be interrupted by nni_msgqueue_signal using the same final pointer, +// which can be thought of as a turnstile. If interrupted it returns EINTR. +// The turnstile should be initialized to zero. +extern int nni_msgqueue_get_sig(nni_msgqueue *, nni_msg **, int, int *); -#endif /* CORE_MSQUEUE_H */ +// nni_msgqueue_signal delivers a signal / interrupt to waiters blocked in +// the msgqueue, if they have registered an interest in the same turnstile. +// It modifies the turnstile's value under the lock to a non-zero value. +extern void nni_msgqueue_signal(nni_msgqueue *, int *); + +// nni_msgqueue_close closes the queue. After this all operates on the +// message queue will return NNG_ECLOSED. Messages inside the queue +// are freed. Unlike closing a go channel, this operation is idempotent. +extern void nni_msgqueue_close(nni_msgqueue *); + +#endif // CORE_MSQUEUE_H diff --git a/src/core/platform.h b/src/core/platform.h index ef6df820..6722e666 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -61,22 +61,46 @@ extern void *nni_alloc(size_t); extern void nni_free(void *, size_t); typedef struct nni_mutex nni_mutex; +typedef struct nni_cond nni_cond; +// XXX: REMOVE THESE typedef struct nni_mutex * nni_mutex_t; typedef struct nni_cond * nni_cond_t; +extern int nni_mutex_create(nni_mutex_t *); +extern void nni_mutex_destroy(nni_mutex_t); +extern int nni_cond_create(nni_cond_t *, nni_mutex_t); +extern void nni_cond_destroy(nni_cond_t); // Mutex handling. + +// nni_mutex_init initializes a mutex structure. This may require dynamic +// allocation, depending on the platform. It can return NNG_ENOMEM if that +// fails. extern int nni_mutex_init(nni_mutex *); -extern void nni_mutex_fini(nni_mutex *); -extern int nni_mutex_create(nni_mutex_t *); +// nni_mutex_fini destroys the mutex and releases any resources allocated for +// it's use. +extern void nni_mutex_fini(nni_mutex *); -extern void nni_mutex_destroy(nni_mutex_t); +// nni_mutex_enter locks the mutex. This is not recursive -- a mutex can only +// be entered once. extern void nni_mutex_enter(nni_mutex *); + +// nni_mutex_exit unlocks the mutex. This can only be performed by the thread +// that owned the mutex. extern void nni_mutex_exit(nni_mutex *); + +// nni_mutex_tryenter tries to lock the mutex. If it can't, it may return +// NNG_EBUSY. extern int nni_mutex_tryenter(nni_mutex *); -extern int nni_cond_create(nni_cond_t *, nni_mutex_t); -extern void nni_cond_destroy(nni_cond_t); +// nni_cond_init initializes a condition variable. We require a mutex be +// supplied with it, and that mutex must always be held when performing any +// operations on the condition variable (other than fini.) This may require +// dynamic allocation, and if so this operation may fail with NNG_ENOMEM. +extern int nni_cond_init(nni_cond *, nni_mutex *); + +// nni_cond_fini releases all resources associated with condition variable. +extern void nni_cond_fini(nni_cond *); // nni_cond_broadcast wakes all waiters on the condition. This should be // called with the lock held. @@ -97,7 +121,13 @@ extern void nni_cond_wait(nni_cond_t); // conditions. The return value is 0 on success, or an error code, which // can be NNG_ETIMEDOUT. Note that it is permissible to wait for longer // than the timeout based on the resolution of your system clock. -extern int nni_cond_timedwait(nni_cond_t, uint64_t); +extern int nni_cond_timedwait(nni_cond_t, int); + +// nni_cond_waituntil waits for a wakeup on the condition variable, or +// until the system time reaches the specified absolute time. (It is an +// absolute form of nni_cond_timedwait.) Early wakeups are possible, so +// check the condition. It will return either NNG_ETIMEDOUT, or 0. +extern int nni_cond_waituntil(nni_cond_t, uint64_t); typedef struct nni_thread * nni_thread_t; typedef struct nni_thread nni_thread; diff --git a/src/platform/posix/posix_alloc.c b/src/platform/posix/posix_alloc.c index eb5c889e..83fe305d 100644 --- a/src/platform/posix/posix_alloc.c +++ b/src/platform/posix/posix_alloc.c @@ -1,11 +1,11 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// #include "core/nng_impl.h" @@ -13,9 +13,7 @@ #include <stdlib.h> -/* - * POSIX memory allocation. This is pretty much standard C. - */ +// POSIX memory allocation. This is pretty much standard C. void * nni_alloc(size_t size) { diff --git a/src/platform/posix/posix_clock.c b/src/platform/posix/posix_clock.c index 83f0b151..48a1e09b 100644 --- a/src/platform/posix/posix_clock.c +++ b/src/platform/posix/posix_clock.c @@ -1,15 +1,13 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ - -/* - * POSIX clock stuff. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +// POSIX clock stuff. #include "core/nng_impl.h" #ifdef PLATFORM_POSIX_CLOCK @@ -20,9 +18,7 @@ #ifndef NNG_USE_GETTIMEOFDAY -/* - * Use POSIX realtime stuff. - */ +// Use POSIX realtime stuff uint64_t nni_clock(void) { @@ -56,18 +52,16 @@ nni_usleep(uint64_t usec) } -#else /* NNG_USE_GETTIMEOFDAY */ +#else // NNG_USE_GETTIMEOFDAY -/* - * If you're here, its because you don't have a modern clock_gettime with - * monotonic clocks, or the necessary pthread_condattr_settclock(). In - * this case, you should be advised that *bad* things can happen if your - * system clock changes time while programs using this library are running. - * (Basically, timeouts can take longer or shorter, leading to either hangs - * or apparent spurious errors. Eventually it should all sort itself out, - * but if you change the clock by a large amount you might wonder what the - * heck is happening until it does.) - */ +// If you're here, its because you don't have a modern clock_gettime with +// monotonic clocks, or the necessary pthread_condattr_settclock(). In +// this case, you should be advised that *bad* things can happen if your +// system clock changes time while programs using this library are running. +// (Basically, timeouts can take longer or shorter, leading to either hangs +// or apparent spurious errors. Eventually it should all sort itself out, +// but if you change the clock by a large amount you might wonder what the +// heck is happening until it does.) #include <pthread.h> #include <sys/time.h> @@ -94,25 +88,21 @@ nni_clock(void) void nni_usleep(uint64_t usec) { - /* - * So probably there is no nanosleep. We could in theory use - * pthread condition variables, but that means doing memory - * allocation, or forcing the use of pthreads where the platform - * might be preferring the use of another threading package. - * Additionally, use of pthreads means that we cannot use - * relative times in a clock_settime safe manner. - * So we can use poll() instead, which is rather coarse, but - * pretty much guaranteed to work. - */ + // So probably there is no nanosleep. We could in theory use + // pthread condition variables, but that means doing memory + // allocation, or forcing the use of pthreads where the platform + // might be preferring the use of another threading package. + // Additionally, use of pthreads means that we cannot use + // relative times in a clock_settime safe manner. + // So we can use poll() instead, which is rather coarse, but + // pretty much guaranteed to work. struct pollfd pfd; uint64_t now; uint64_t expire; - /* - * Possibly we could pass NULL instead of pfd, but passing a valid - * pointer ensures that if the system dereferences the pointer it - * won't come back with EFAULT. - */ + // Possibly we could pass NULL instead of pfd, but passing a valid + // pointer ensures that if the system dereferences the pointer it + // won't come back with EFAULT. pfd.fd = -1; pfd.events = 0; @@ -120,19 +110,17 @@ nni_usleep(uint64_t usec) expire = now + usec; while (now < expire) { - /* - * In theory we could round up to a whole number of msec, - * but under the covers poll already does some rounding up, - * and the loop above guarantees that we will not bail out - * early. So this gives us a better chance to avoid adding - * nearly an extra unneeded millisecond to the wait. - */ + // In theory we could round up to a whole number of msec, + // but under the covers poll already does some rounding up, + // and the loop above guarantees that we will not bail out + // early. So this gives us a better chance to avoid adding + // nearly an extra unneeded millisecond to the wait. (void) poll(&pfd, 0, (int) ((expire - now) / 1000)); now = nni_clock(); } } -#endif /* NNG_USE_GETTIMEOFDAY */ +#endif // NNG_USE_GETTIMEOFDAY -#endif /* PLATFORM_POSIX_CLOCK */ +#endif // PLATFORM_POSIX_CLOCK diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index a7b15edc..4f93b101 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -10,6 +10,13 @@ #ifndef PLATFORM_POSIX_IMPL_H #define PLATFORM_POSIX_IMPL_H +// Some dependency notes: +// +// PLATFORM_POSIX_THREAD and PLATFORM_POSIX_SYNCH depend on each other, +// and they both depend on PLATFORM_POSIX_CLOCK. Furthermore, note that +// when using PLATFORM_POSIX_CLOCK, your condition variable timeouts need +// to use the same base clock values. Normally all three should be used +// together. #ifdef PLATFORM_POSIX #define PLATFORM_POSIX_ALLOC #define PLATFORM_POSIX_DEBUG diff --git a/src/platform/posix/posix_synch.c b/src/platform/posix/posix_synch.c index a3129f62..c0a2721c 100644 --- a/src/platform/posix/posix_synch.c +++ b/src/platform/posix/posix_synch.c @@ -1,23 +1,14 @@ -/* - * Copyright 2016 Garrett D'Amore <garrett@damore.org> - * - * This software is supplied under the terms of the MIT License, a - * copy of which should be located in the distribution where this - * file was obtained (LICENSE.txt). A copy of the license may also be - * found online at https://opensource.org/licenses/MIT. - */ - -/* - * This is more of a direct #include of a .c rather than .h file. - * But having it be a .h makes compiler rules work out properly. Do - * not include this more than once into your program, or you will - * get multiple symbols defined. - */ - -/* - * POSIX synchronization (mutexes and condition variables). This uses - * pthreads. - */ +// +// Copyright 2016 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +// POSIX synchronization (mutexes and condition variables). This uses +// pthreads. #include "core/nng_impl.h" @@ -51,6 +42,7 @@ nni_mutex_fini(nni_mutex *mp) } +// XXX: REMOVE THIS FUNCTION int nni_mutex_create(nni_mutex_t *mp) { @@ -62,7 +54,7 @@ nni_mutex_create(nni_mutex_t *mp) return (NNG_ENOMEM); } - /* We ask for more error checking... */ + // We ask for more error checking... if (pthread_mutexattr_init(&attr) != 0) { nni_free(m, sizeof (*m)); return (NNG_ENOMEM); @@ -87,6 +79,7 @@ nni_mutex_create(nni_mutex_t *mp) } +// XXX: REMOVE THIS FUNCTION void nni_mutex_destroy(nni_mutex_t m) { @@ -98,7 +91,7 @@ nni_mutex_destroy(nni_mutex_t m) void -nni_mutex_enter(nni_mutex_t m) +nni_mutex_enter(nni_mutex *m) { if (pthread_mutex_lock(&m->mx) != 0) { nni_panic("pthread_mutex_lock failed"); @@ -107,7 +100,7 @@ nni_mutex_enter(nni_mutex_t m) void -nni_mutex_exit(nni_mutex_t m) +nni_mutex_exit(nni_mutex *m) { if (pthread_mutex_unlock(&m->mx) != 0) { nni_panic("pthread_mutex_unlock failed"); @@ -116,7 +109,7 @@ nni_mutex_exit(nni_mutex_t m) int -nni_mutex_tryenter(nni_mutex_t m) +nni_mutex_tryenter(nni_mutex *m) { if (pthread_mutex_trylock(&m->mx) != 0) { return (NNG_EBUSY); @@ -125,51 +118,40 @@ nni_mutex_tryenter(nni_mutex_t m) } -static int -nni_cond_attr(pthread_condattr_t **attrpp) +int +nni_cond_init(nni_cond *c, nni_mutex *m) { -#if defined(NNG_USE_GETTIMEOFDAY) || NNG_USE_CLOCKID == CLOCK_REALTIME - *attrpp = NULL; + if (pthread_cond_init(&c->cv, &nni_condattr) != 0) { + // In theory could be EAGAIN, but handle like ENOMEM + nni_free(c, sizeof (*c)); + return (NNG_ENOMEM); + } + c->mx = &m->mx; return (0); +} -#else - /* In order to make this fast, avoid reinitializing attrs. */ - static pthread_condattr_t attr; - static pthread_mutex_t mx = PTHREAD_MUTEX_INITIALIZER; - static int init = 0; - int rv; - // For efficiency's sake, we try to reuse the same attr for the - // life of the library. This avoids many reallocations. Technically - // this means that we will leak the attr on exit(), but this is - // preferable to constantly allocating and reallocating it. - if (init) { - *attrpp = &attr; - return (0); +void +nni_cond_fini(nni_cond *c) +{ + if (pthread_cond_destroy(&c->cv) != 0) { + nni_panic("pthread_cond_destroy failed"); } +} - (void) pthread_mutex_lock(&mx); - while (!init) { - if ((rv = pthread_condattr_init(&attr)) != 0) { - (void) pthread_mutex_unlock(&mx); - return (NNG_ENOMEM); - } - rv = pthread_condattr_setclock(&attr, NNG_USE_CLOCKID); - if (rv != 0) { - nni_panic("condattr_setclock: %s", strerror(rv)); - } - init = 1; - } - (void) pthread_mutex_unlock(&mx); - *attrpp = &attr; - return (0); -#endif +// XXXX: REMOVE THIS FUNCTION +static int +nni_cond_attr(pthread_condattr_t **attrpp) +{ + *attrpp = NULL; + return (0); } +// XXX: REMOVE THIS FUNCTION int -nni_cond_create(nni_cond_t *cvp, nni_mutex_t mx) +nni_cond_create(nni_cond **cvp, nni_mutex *mx) { /* * By preference, we use a CLOCK_MONOTONIC version of condition @@ -196,8 +178,9 @@ nni_cond_create(nni_cond_t *cvp, nni_mutex_t mx) } +// XXX: REMOVE THIS FUNCTION void -nni_cond_destroy(nni_cond_t c) +nni_cond_destroy(nni_cond *c) { if (pthread_cond_destroy(&c->cv) != 0) { nni_panic("pthread_cond_destroy failed"); @@ -207,7 +190,7 @@ nni_cond_destroy(nni_cond_t c) void -nni_cond_signal(nni_cond_t c) +nni_cond_signal(nni_cond *c) { if (pthread_cond_signal(&c->cv) != 0) { nni_panic("pthread_cond_signal failed"); @@ -216,7 +199,7 @@ nni_cond_signal(nni_cond_t c) void -nni_cond_broadcast(nni_cond_t c) +nni_cond_broadcast(nni_cond *c) { if (pthread_cond_broadcast(&c->cv) != 0) { nni_panic("pthread_cond_broadcast failed"); @@ -225,7 +208,7 @@ nni_cond_broadcast(nni_cond_t c) void -nni_cond_wait(nni_cond_t c) +nni_cond_wait(nni_cond *c) { if (pthread_cond_wait(&c->cv, c->mx) != 0) { nni_panic("pthread_cond_wait failed"); @@ -234,7 +217,7 @@ nni_cond_wait(nni_cond_t c) int -nni_cond_timedwait(nni_cond_t c, uint64_t usec) +nni_cond_waituntil(nni_cond *c, uint64_t usec) { struct timespec ts; int rv; @@ -255,4 +238,15 @@ nni_cond_timedwait(nni_cond_t c, uint64_t usec) } +int +nni_cond_timedwait(nni_cond *c, int usec) +{ + if (usec < 0) { + nni_cond_wait(c); + return (0); + } + return (nni_cond_waituntil(c, ((uint64_t) usec) + nni_clock())); +} + + #endif diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index 1bbe6e28..79f69762 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -28,7 +28,7 @@ static int plat_init = 0; static int plat_fork = 0; static void * -thrfunc(void *arg) +nni_thrfunc(void *arg) { nni_thread_t thr = arg; @@ -49,7 +49,7 @@ nni_thread_create(nni_thread_t *tp, void (*fn)(void *), void *arg) thr->func = fn; thr->arg = arg; - if ((rv = pthread_create(&thr->tid, NULL, thrfunc, thr)) != 0) { + if ((rv = pthread_create(&thr->tid, NULL, nni_thrfunc, thr)) != 0) { nni_free(thr, sizeof (*thr)); return (NNG_ENOMEM); } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 029825b1..cb79d65a 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -22,8 +22,8 @@ typedef struct nni_inproc_pipe nni_inproc_pipe; typedef struct nni_inproc_ep nni_inproc_ep; typedef struct { - nni_mutex_t mx; - nni_cond_t cv; + nni_mutex mx; + nni_cond cv; nni_list_t eps; } nni_inproc_global; @@ -31,17 +31,17 @@ typedef struct { struct nni_inproc_pipe { const char * addr; nni_inproc_pair * pair; - nni_msgqueue_t rq; - nni_msgqueue_t wq; + nni_msgqueue * rq; + nni_msgqueue * wq; uint16_t peer; }; // nni_inproc_pair represents a pair of pipes. Because we control both // sides of the pipes, we can allocate and free this in one structure. struct nni_inproc_pair { - nni_mutex_t mx; + nni_mutex mx; int refcnt; - nni_msgqueue_t q[2]; + nni_msgqueue * q[2]; nni_inproc_pipe pipe[2]; char addr[NNG_MAXADDRLEN+1]; }; @@ -68,11 +68,11 @@ nni_inproc_init(void) { int rv; - if ((rv = nni_mutex_create(&nni_inproc.mx)) != 0) { + if ((rv = nni_mutex_init(&nni_inproc.mx)) != 0) { return (rv); } - if ((rv = nni_cond_create(&nni_inproc.cv, nni_inproc.mx)) != 0) { - nni_mutex_destroy(nni_inproc.mx); + if ((rv = nni_cond_init(&nni_inproc.cv, &nni_inproc.mx)) != 0) { + nni_mutex_fini(&nni_inproc.mx); return (rv); } NNI_LIST_INIT(&nni_inproc.eps, nni_inproc_ep, node); @@ -84,8 +84,8 @@ nni_inproc_init(void) static void nni_inproc_fini(void) { - nni_cond_destroy(nni_inproc.cv); - nni_mutex_destroy(nni_inproc.mx); + nni_cond_fini(&nni_inproc.cv); + nni_mutex_fini(&nni_inproc.mx); } @@ -104,18 +104,13 @@ nni_inproc_pipe_close(void *arg) static void nni_inproc_pair_destroy(nni_inproc_pair *pair) { - if (pair == NULL) { - return; - } if (pair->q[0]) { nni_msgqueue_destroy(pair->q[0]); } if (pair->q[1]) { nni_msgqueue_destroy(pair->q[1]); } - if (pair->mx) { - nni_mutex_destroy(pair->mx); - } + nni_mutex_fini(&pair->mx); nni_free(pair, sizeof (*pair)); } @@ -129,19 +124,19 @@ nni_inproc_pipe_destroy(void *arg) // We could assert the pipe closed... // If we are the last peer, then toss the pair structure. - nni_mutex_enter(pair->mx); + nni_mutex_enter(&pair->mx); pair->refcnt--; if (pair->refcnt == 0) { - nni_mutex_exit(pair->mx); + nni_mutex_exit(&pair->mx); nni_inproc_pair_destroy(pair); } else { - nni_mutex_exit(pair->mx); + nni_mutex_exit(&pair->mx); } } static int -nni_inproc_pipe_send(void *arg, nng_msg_t msg) +nni_inproc_pipe_send(void *arg, nni_msg *msg) { nni_inproc_pipe *pipe = arg; @@ -152,7 +147,7 @@ nni_inproc_pipe_send(void *arg, nng_msg_t msg) static int -nni_inproc_pipe_recv(void *arg, nng_msg_t *msgp) +nni_inproc_pipe_recv(void *arg, nni_msg **msgp) { nni_inproc_pipe *pipe = arg; @@ -230,13 +225,13 @@ nni_inproc_ep_close(void *arg) { nni_inproc_ep *ep = arg; - nni_mutex_enter(nni_inproc.mx); + nni_mutex_enter(&nni_inproc.mx); if (!ep->closed) { ep->closed = 1; nni_list_remove(&nni_inproc.eps, ep); - nni_cond_broadcast(nni_inproc.cv); + nni_cond_broadcast(&nni_inproc.cv); } - nni_mutex_exit(nni_inproc.mx); + nni_mutex_exit(&nni_inproc.mx); } @@ -250,7 +245,7 @@ nni_inproc_ep_dial(void *arg, void **pipep) if (ep->mode != NNI_INPROC_EP_IDLE) { return (NNG_EINVAL); } - nni_mutex_enter(nni_inproc.mx); + nni_mutex_enter(&nni_inproc.mx); NNI_LIST_FOREACH (list, srch) { if (srch->mode != NNI_INPROC_EP_LISTEN) { continue; @@ -261,27 +256,27 @@ nni_inproc_ep_dial(void *arg, void **pipep) } if (srch == NULL) { // No listeners available. - nni_mutex_exit(nni_inproc.mx); + nni_mutex_exit(&nni_inproc.mx); return (NNG_ECONNREFUSED); } ep->mode = NNI_INPROC_EP_DIAL; nni_list_append(list, ep); - nni_cond_broadcast(nni_inproc.cv); + nni_cond_broadcast(&nni_inproc.cv); for (;;) { if (ep->closed) { // Closer will have removed us from list. - nni_mutex_exit(nni_inproc.mx); + nni_mutex_exit(&nni_inproc.mx); return (NNG_ECLOSED); } if (ep->cpipe != NULL) { break; } - nni_cond_wait(nni_inproc.cv); + nni_cond_wait(&nni_inproc.cv); } // NB: The acceptor or closer removes us from the list. ep->mode = NNI_INPROC_EP_IDLE; *pipep = ep->cpipe; - nni_mutex_exit(nni_inproc.mx); + nni_mutex_exit(&nni_inproc.mx); return (ep->closed ? NNG_ECLOSED : 0); } @@ -296,9 +291,9 @@ nni_inproc_ep_listen(void *arg) if (ep->mode != NNI_INPROC_EP_IDLE) { return (NNG_EINVAL); } - nni_mutex_enter(nni_inproc.mx); + nni_mutex_enter(&nni_inproc.mx); if (ep->closed) { - nni_mutex_exit(nni_inproc.mx); + nni_mutex_exit(&nni_inproc.mx); return (NNG_ECLOSED); } NNI_LIST_FOREACH (list, srch) { @@ -306,13 +301,13 @@ nni_inproc_ep_listen(void *arg) continue; } if (strcmp(srch->addr, ep->addr) == 0) { - nni_mutex_exit(nni_inproc.mx); + nni_mutex_exit(&nni_inproc.mx); return (NNG_EADDRINUSE); } } ep->mode = NNI_INPROC_EP_LISTEN; nni_list_append(list, ep); - nni_mutex_exit(nni_inproc.mx); + nni_mutex_exit(&nni_inproc.mx); return (0); } @@ -326,14 +321,31 @@ nni_inproc_ep_accept(void *arg, void **pipep) nni_list_t *list = &nni_inproc.eps; int rv; - nni_mutex_enter(nni_inproc.mx); if (ep->mode != NNI_INPROC_EP_LISTEN) { - nni_mutex_exit(nni_inproc.mx); return (NNG_EINVAL); } + + // Preallocate the pair, so we don't do it while holding a lock + if ((pair = nni_alloc(sizeof (*pair))) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_mutex_init(&pair->mx)) != 0) { + nni_free(pair, sizeof (*pair)); + return (rv); + } + if (((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0) || + ((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0)) { + nni_inproc_pair_destroy(pair); + return (rv); + } + + nni_mutex_enter(&nni_inproc.mx); for (;;) { if (ep->closed) { - nni_mutex_exit(nni_inproc.mx); + // This is the only possible error path from the + // time we acquired the lock. + nni_mutex_exit(&nni_inproc.mx); + nni_inproc_pair_destroy(pair); return (NNG_ECLOSED); } NNI_LIST_FOREACH (list, srch) { @@ -347,16 +359,7 @@ nni_inproc_ep_accept(void *arg, void **pipep) if (srch != NULL) { break; } - nni_cond_wait(nni_inproc.cv); - } - if ((pair = nni_alloc(sizeof (*pair))) == NULL) { - nni_mutex_exit(nni_inproc.mx); - return (NNG_ENOMEM); - } - if (((rv = nni_mutex_create(&pair->mx)) != 0) || - ((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0) || - ((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0)) { - nni_inproc_pair_destroy(pair); + nni_cond_wait(&nni_inproc.cv); } (void) snprintf(pair->addr, sizeof (pair->addr), "%s", ep->addr); pair->pipe[0].rq = pair->pipe[1].wq = pair->q[0]; @@ -368,9 +371,9 @@ nni_inproc_ep_accept(void *arg, void **pipep) pair->refcnt = 2; srch->cpipe = &pair->pipe[0]; *pipep = &pair->pipe[1]; - nni_cond_broadcast(nni_inproc.cv); + nni_cond_broadcast(&nni_inproc.cv); - nni_mutex_exit(nni_inproc.mx); + nni_mutex_exit(&nni_inproc.mx); return (0); } |
