summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-22 13:22:18 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-22 13:22:18 -0800
commitee969ad99dc1e07e1c38876223e7aed13463b121 (patch)
tree40bde325d041532661e7dcc3441185aca7701e53 /src
parent6c1325a2b17548a4249d26a846bc32b95b7d747d (diff)
downloadnng-ee969ad99dc1e07e1c38876223e7aed13463b121.tar.gz
nng-ee969ad99dc1e07e1c38876223e7aed13463b121.tar.bz2
nng-ee969ad99dc1e07e1c38876223e7aed13463b121.zip
Synchronization enhancements - inproc & msgqueue. Absolute waits...
Diffstat (limited to 'src')
-rw-r--r--src/core/msgqueue.c197
-rw-r--r--src/core/msgqueue.h115
-rw-r--r--src/core/platform.h42
-rw-r--r--src/platform/posix/posix_alloc.c20
-rw-r--r--src/platform/posix/posix_clock.c88
-rw-r--r--src/platform/posix/posix_impl.h7
-rw-r--r--src/platform/posix/posix_synch.c120
-rw-r--r--src/platform/posix/posix_thread.c4
-rw-r--r--src/transport/inproc/inproc.c103
9 files changed, 341 insertions, 355 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 6b208e70..cbdb94c6 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -1,35 +1,33 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
#include "nng_impl.h"
-/*
- * Message queue. These operate in some respects like Go channels,
- * but as we have access to the internals, we have made some fundamental
- * differences and improvements. For example, these can grow, and either
- * side can close, and they may be closed more than once.
- */
+// Message queue. These operate in some respects like Go channels,
+// but as we have access to the internals, we have made some fundamental
+// differences and improvements. For example, these can grow, and either
+// side can close, and they may be closed more than once.
struct nni_msgqueue {
- nni_mutex_t mq_lock;
- nni_cond_t mq_readable;
- nni_cond_t mq_writeable;
+ nni_mutex mq_lock;
+ nni_cond mq_readable;
+ nni_cond mq_writeable;
int mq_cap;
int mq_len;
int mq_get;
int mq_put;
int mq_closed;
- nng_msg_t * mq_msgs;
+ nni_msg ** mq_msgs;
};
int
-nni_msgqueue_create(nni_msgqueue_t *mqp, int cap)
+nni_msgqueue_create(nni_msgqueue **mqp, int cap)
{
struct nni_msgqueue *mq;
int rv;
@@ -40,24 +38,24 @@ nni_msgqueue_create(nni_msgqueue_t *mqp, int cap)
if ((mq = nni_alloc(sizeof (*mq))) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mutex_create(&mq->mq_lock)) != 0) {
+ if ((rv = nni_mutex_init(&mq->mq_lock)) != 0) {
nni_free(mq, sizeof (*mq));
return (rv);
}
- if ((rv = nni_cond_create(&mq->mq_readable, mq->mq_lock)) != 0) {
- nni_mutex_destroy(mq->mq_lock);
+ if ((rv = nni_cond_init(&mq->mq_readable, &mq->mq_lock)) != 0) {
+ nni_mutex_fini(&mq->mq_lock);
nni_free(mq, sizeof (*mq));
return (NNG_ENOMEM);
}
- if ((rv = nni_cond_create(&mq->mq_writeable, mq->mq_lock)) != 0) {
- nni_cond_destroy(mq->mq_readable);
- nni_mutex_destroy(mq->mq_lock);
+ if ((rv = nni_cond_init(&mq->mq_writeable, &mq->mq_lock)) != 0) {
+ nni_cond_fini(&mq->mq_readable);
+ nni_mutex_fini(&mq->mq_lock);
return (NNG_ENOMEM);
}
if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg_t) * cap)) == NULL) {
- nni_cond_destroy(mq->mq_writeable);
- nni_cond_destroy(mq->mq_readable);
- nni_mutex_destroy(mq->mq_lock);
+ nni_cond_fini(&mq->mq_writeable);
+ nni_cond_fini(&mq->mq_readable);
+ nni_mutex_fini(&mq->mq_lock);
return (NNG_ENOMEM);
}
@@ -73,13 +71,13 @@ nni_msgqueue_create(nni_msgqueue_t *mqp, int cap)
void
-nni_msgqueue_destroy(nni_msgqueue_t mq)
+nni_msgqueue_destroy(nni_msgqueue *mq)
{
- nni_msg_t msg;
+ nni_msg *msg;
- nni_cond_destroy(mq->mq_writeable);
- nni_cond_destroy(mq->mq_readable);
- nni_mutex_destroy(mq->mq_lock);
+ nni_cond_fini(&mq->mq_writeable);
+ nni_cond_fini(&mq->mq_readable);
+ nni_mutex_fini(&mq->mq_lock);
/* Free any orphaned messages. */
while (mq->mq_len > 0) {
@@ -97,69 +95,54 @@ nni_msgqueue_destroy(nni_msgqueue_t mq)
}
-/*
- * nni_msgqueue_signal raises a signal on the signal object. This allows a
- * waiter to be signaled, so that it can be woken e.g. due to a pipe closing.
- * Note that the signal object must be *zero* if no signal is raised.
- */
+// nni_msgqueue_signal raises a signal on the signal object. This allows a
+// waiter to be signaled, so that it can be woken e.g. due to a pipe closing.
+// Note that the signal object must be *zero* if no signal is raised.
void
-nni_msgqueue_signal(nni_msgqueue_t mq, int *signal)
+nni_msgqueue_signal(nni_msgqueue *mq, int *signal)
{
- nni_mutex_enter(mq->mq_lock);
+ nni_mutex_enter(&mq->mq_lock);
*signal = 1;
- /*
- * We have to wake everyone.
- */
- nni_cond_broadcast(mq->mq_readable);
- nni_cond_broadcast(mq->mq_writeable);
- nni_mutex_exit(mq->mq_lock);
+ // We have to wake everyone.
+ nni_cond_broadcast(&mq->mq_readable);
+ nni_cond_broadcast(&mq->mq_writeable);
+ nni_mutex_exit(&mq->mq_lock);
}
int
-nni_msgqueue_put_sig(nni_msgqueue_t mq, nni_msg_t msg, int tmout, int *signal)
+nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, int tmout, int *signal)
{
- uint64_t expire, now;
+ uint64_t expire;
- if (tmout > 0) {
+ if (tmout >= 0) {
expire = nni_clock() + tmout;
+ } else {
+ expire = 0xffffffffffffffffull;
}
- nni_mutex_enter(mq->mq_lock);
+ nni_mutex_enter(&mq->mq_lock);
while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap) && (!*signal)) {
- if (tmout == 0) {
- nni_mutex_exit(mq->mq_lock);
- return (NNG_EAGAIN);
- }
-
- if (tmout < 0) {
- (void) nni_cond_wait(mq->mq_writeable);
- continue;
- }
-
- now = nni_clock();
- if (now >= expire) {
- nni_mutex_exit(mq->mq_lock);
- return (NNG_ETIMEDOUT);
+ if (expire <= nni_clock()) {
+ nni_mutex_exit(&mq->mq_lock);
+ return (tmout == 0 ? NNG_EAGAIN : NNG_ETIMEDOUT);
}
- (void) nni_cond_timedwait(mq->mq_writeable, (expire - now));
+ (void) nni_cond_waituntil(&mq->mq_writeable, expire);
}
if (mq->mq_closed) {
- nni_mutex_exit(mq->mq_lock);
+ nni_mutex_exit(&mq->mq_lock);
return (NNG_ECLOSED);
}
if ((mq->mq_len == mq->mq_cap) && (*signal)) {
- /*
- * We are being interrupted. We only allow an interrupt
- * if there is no room though, because we'd really prefer
- * to queue the data. Otherwise our failure to queue
- * the data could lead to starvation.
- */
- nni_mutex_exit(mq->mq_lock);
+ // We are being interrupted. We only allow an interrupt
+ // if there is no room though, because we'd really prefer
+ // to queue the data. Otherwise our failure to queue
+ // the data could lead to starvation.
+ nni_mutex_exit(&mq->mq_lock);
return (NNG_EINTR);
}
@@ -170,56 +153,46 @@ nni_msgqueue_put_sig(nni_msgqueue_t mq, nni_msg_t msg, int tmout, int *signal)
}
mq->mq_len++;
if (mq->mq_len == 1) {
- (void) nni_cond_signal(mq->mq_readable);
+ (void) nni_cond_signal(&mq->mq_readable);
}
- nni_mutex_exit(mq->mq_lock);
+ nni_mutex_exit(&mq->mq_lock);
return (0);
}
int
-nni_msgqueue_get_sig(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout, int *signal)
+nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, int tmout, int *signal)
{
- uint64_t expire, now;
+ uint64_t expire;
- if (tmout > 0) {
+ if (tmout >= 0) {
expire = nni_clock() + tmout;
+ } else {
+ expire = 0xffffffffffffffffull;
}
- nni_mutex_enter(mq->mq_lock);
+ nni_mutex_enter(&mq->mq_lock);
while ((!mq->mq_closed) && (mq->mq_len == 0) && (*signal == 0)) {
- if (tmout == 0) {
- nni_mutex_exit(mq->mq_lock);
- return (NNG_EAGAIN);
- }
-
- if (tmout < 0) {
- (void) nni_cond_wait(mq->mq_readable);
- continue;
- }
-
- now = nni_clock();
- if (now >= expire) {
- nni_mutex_exit(mq->mq_lock);
- return (NNG_ETIMEDOUT);
+ if (expire <= nni_clock()) {
+ nni_mutex_exit(&mq->mq_lock);
+ return (tmout == 0 ? NNG_EAGAIN : NNG_ETIMEDOUT);
}
- (void) nni_cond_timedwait(mq->mq_readable, (expire - now));
+ (void) nni_cond_waituntil(&mq->mq_readable, expire);
}
if (mq->mq_closed) {
- nni_mutex_exit(mq->mq_lock);
+ nni_mutex_exit(&mq->mq_lock);
return (NNG_ECLOSED);
}
if ((mq->mq_len == 0) && (*signal)) {
- /*
- * We are being interrupted. We only allow an interrupt
- * if there is no data though, because we'd really prefer
- * to give back the data. Otherwise our failure to deal
- * with the data could lead to starvation.
- */
- nni_mutex_exit(mq->mq_lock);
+ // We are being interrupted. We only allow an interrupt
+ // if there is no data though, because we'd really prefer
+ // to give back the data. Otherwise our failure to deal
+ // with the data could lead to starvation; also lingering
+ // relies on this not interrupting if data is pending.
+ nni_mutex_exit(&mq->mq_lock);
return (NNG_EINTR);
}
@@ -231,15 +204,15 @@ nni_msgqueue_get_sig(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout, int *signal)
}
mq->mq_len++;
if (mq->mq_len == (mq->mq_cap - 1)) {
- (void) nni_cond_signal(mq->mq_writeable);
+ (void) nni_cond_signal(&mq->mq_writeable);
}
- nni_mutex_exit(mq->mq_lock);
+ nni_mutex_exit(&mq->mq_lock);
return (0);
}
int
-nni_msgqueue_get(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout)
+nni_msgqueue_get(nni_msgqueue *mq, nni_msg **msgp, int tmout)
{
int nosig = 0;
@@ -248,7 +221,7 @@ nni_msgqueue_get(nni_msgqueue_t mq, nni_msg_t *msgp, int tmout)
int
-nni_msgqueue_put(nni_msgqueue_t mq, nni_msg_t msg, int tmout)
+nni_msgqueue_put(nni_msgqueue *mq, nni_msg *msg, int tmout)
{
int nosig = 0;
@@ -257,16 +230,16 @@ nni_msgqueue_put(nni_msgqueue_t mq, nni_msg_t msg, int tmout)
void
-nni_msgqueue_close(nni_msgqueue_t mq)
+nni_msgqueue_close(nni_msgqueue *mq)
{
nni_msg_t msg;
- nni_mutex_enter(mq->mq_lock);
+ nni_mutex_enter(&mq->mq_lock);
mq->mq_closed = 1;
- nni_cond_broadcast(mq->mq_writeable);
- nni_cond_broadcast(mq->mq_readable);
+ nni_cond_broadcast(&mq->mq_writeable);
+ nni_cond_broadcast(&mq->mq_readable);
- /* Free the messages orphaned in the queue. */
+ // Free the messages orphaned in the queue.
while (mq->mq_len > 0) {
msg = mq->mq_msgs[mq->mq_get];
mq->mq_get++;
@@ -276,5 +249,5 @@ nni_msgqueue_close(nni_msgqueue_t mq)
mq->mq_len--;
nni_msg_free(msg);
}
- nni_mutex_exit(mq->mq_lock);
+ nni_mutex_exit(&mq->mq_lock);
}
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index dbf21d11..e44e203a 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -1,74 +1,67 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
#ifndef CORE_MSGQUEUE_H
#define CORE_MSGQUEUE_H
-#include "nng.h"
+#include "nng_impl.h"
-/*
- * Message queues. Message queues work in some ways like Go channels;
- * they are a thread-safe way to pass messages between subsystems.
- */
+// Message queues. Message queues work in some ways like Go channels;
+// they are a thread-safe way to pass messages between subsystems. They
+// do have additional capabilities though.
typedef struct nni_msgqueue * nni_msgqueue_t;
+typedef struct nni_msgqueue nni_msgqueue;
-/*
- * nni_msgqueue_create creates a message queue with the given capacity,
- * which must be a positive number. It returns NNG_EINVAL if the capacity
- * is invalid, or NNG_ENOMEM if resources cannot be allocated.
- */
-extern int nni_msgqueue_create(nni_msgqueue_t *, int);
+// nni_msgqueue_create creates a message queue with the given capacity,
+// which must be a positive number. It returns NNG_EINVAL if the capacity
+// is invalid, or NNG_ENOMEM if resources cannot be allocated.
+extern int nni_msgqueue_create(nni_msgqueue **, int);
-/*
- * nni_msgqueue_destroy destroys a message queue. It will also free any
- * messages that may be in the queue.
- */
-extern void nni_msgqueue_destroy(nni_msgqueue_t);
+// nni_msgqueue_destroy destroys a message queue. It will also free any
+// messages that may be in the queue.
+extern void nni_msgqueue_destroy(nni_msgqueue *);
-/*
- * nni_msgqueue_put attempts to put a message to the queue. It will wait
- * for the timeout (us), if the value is positive. If the value is negative
- * then it will wait forever. If the value is zero, it will just check, and
- * return immediately whether a message can be put or not. Valid returns are
- * NNG_ECLOSED if the queue is closed or NNG_ETIMEDOUT if the message cannot
- * be placed after a time, or NNG_EAGAIN if the operation cannot succeed
- * immediately and a zero timeout is specified. Note that timeout granularity
- * may be limited -- for example Windows systems have a millisecond resolution
- * timeout capability.
- */
-extern int nni_msgqueue_put(nni_msgqueue_t, nng_msg_t, int);
+// nni_msgqueue_put attempts to put a message to the queue. It will wait
+// for the timeout (us), if the value is positive. If the value is negative
+// then it will wait forever. If the value is zero, it will just check, and
+// return immediately whether a message can be put or not. Valid returns are
+// NNG_ECLOSED if the queue is closed or NNG_ETIMEDOUT if the message cannot
+// be placed after a time, or NNG_EAGAIN if the operation cannot succeed
+// immediately and a zero timeout is specified. Note that timeout granularity
+// may be limited -- for example Windows systems have a millisecond resolution
+// timeout capability.
+extern int nni_msgqueue_put(nni_msgqueue *, nni_msg *, int);
-/*
- * nni_msgqueue_get gets the message from the queue, using a timeout just
- * like nni_msgqueue_put.
- */
-extern int nni_msgqueue_get(nni_msgqueue_t, nng_msg_t *, int);
+// nni_msgqueue_get gets the message from the queue, using a timeout just
+// like nni_msgqueue_put.
+extern int nni_msgqueue_get(nni_msgqueue *, nni_msg **, int);
-/*
- * The following two functions are interruptible versions of msgqueue_get
- * and msgqueue_put. The signal argument (pointer) must be initialized
- * to zero. Then, we can raise a signal, by calling nni_msgqueue_signal
- * on the same object. The signal flag will remain raised until it is
- * cleared to zero. If a routine is interrupted, it will return NNG_EINTR.
- * Note that only threads using the signal object will be interrupted;
- * this has no effect on other threads that may be waiting on the msgqueue
- * as well.
- */
-extern int nni_msgqueue_put_sig(nni_msgqueue_t, nng_msg_t, int, int *);
-extern int nni_msgqueue_get_sig(nni_msgqueue_t, nng_msg_t *, int, int *);
-extern void nni_msgqueue_signal(nni_msgqueue_t, int *);
+// nni_msgqueue_put_sig is an enhanced version of nni_msgqueue_put, but it
+// can be interrupted by nni_msgqueue_signal using the same final pointer,
+// which can be thought of as a turnstile. If interrupted it returns EINTR.
+// The turnstile should be initialized to zero.
+extern int nni_msgqueue_put_sig(nni_msgqueue *, nni_msg *, int, int *);
-/*
- * nni_msgqueue_close closes the queue. After this all operates on the
- * message queue will return NNG_ECLOSED. Messages inside the queue
- * are freed. Unlike closing a go channel, this operation is idempotent.
- */
-extern void nni_msgqueue_close(nni_msgqueue_t);
+// nni_msgqueue_get_sig is an enhanced version of nni_msgqueue_get_t, but it
+// can be interrupted by nni_msgqueue_signal using the same final pointer,
+// which can be thought of as a turnstile. If interrupted it returns EINTR.
+// The turnstile should be initialized to zero.
+extern int nni_msgqueue_get_sig(nni_msgqueue *, nni_msg **, int, int *);
-#endif /* CORE_MSQUEUE_H */
+// nni_msgqueue_signal delivers a signal / interrupt to waiters blocked in
+// the msgqueue, if they have registered an interest in the same turnstile.
+// It modifies the turnstile's value under the lock to a non-zero value.
+extern void nni_msgqueue_signal(nni_msgqueue *, int *);
+
+// nni_msgqueue_close closes the queue. After this all operates on the
+// message queue will return NNG_ECLOSED. Messages inside the queue
+// are freed. Unlike closing a go channel, this operation is idempotent.
+extern void nni_msgqueue_close(nni_msgqueue *);
+
+#endif // CORE_MSQUEUE_H
diff --git a/src/core/platform.h b/src/core/platform.h
index ef6df820..6722e666 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -61,22 +61,46 @@ extern void *nni_alloc(size_t);
extern void nni_free(void *, size_t);
typedef struct nni_mutex nni_mutex;
+typedef struct nni_cond nni_cond;
+// XXX: REMOVE THESE
typedef struct nni_mutex * nni_mutex_t;
typedef struct nni_cond * nni_cond_t;
+extern int nni_mutex_create(nni_mutex_t *);
+extern void nni_mutex_destroy(nni_mutex_t);
+extern int nni_cond_create(nni_cond_t *, nni_mutex_t);
+extern void nni_cond_destroy(nni_cond_t);
// Mutex handling.
+
+// nni_mutex_init initializes a mutex structure. This may require dynamic
+// allocation, depending on the platform. It can return NNG_ENOMEM if that
+// fails.
extern int nni_mutex_init(nni_mutex *);
-extern void nni_mutex_fini(nni_mutex *);
-extern int nni_mutex_create(nni_mutex_t *);
+// nni_mutex_fini destroys the mutex and releases any resources allocated for
+// it's use.
+extern void nni_mutex_fini(nni_mutex *);
-extern void nni_mutex_destroy(nni_mutex_t);
+// nni_mutex_enter locks the mutex. This is not recursive -- a mutex can only
+// be entered once.
extern void nni_mutex_enter(nni_mutex *);
+
+// nni_mutex_exit unlocks the mutex. This can only be performed by the thread
+// that owned the mutex.
extern void nni_mutex_exit(nni_mutex *);
+
+// nni_mutex_tryenter tries to lock the mutex. If it can't, it may return
+// NNG_EBUSY.
extern int nni_mutex_tryenter(nni_mutex *);
-extern int nni_cond_create(nni_cond_t *, nni_mutex_t);
-extern void nni_cond_destroy(nni_cond_t);
+// nni_cond_init initializes a condition variable. We require a mutex be
+// supplied with it, and that mutex must always be held when performing any
+// operations on the condition variable (other than fini.) This may require
+// dynamic allocation, and if so this operation may fail with NNG_ENOMEM.
+extern int nni_cond_init(nni_cond *, nni_mutex *);
+
+// nni_cond_fini releases all resources associated with condition variable.
+extern void nni_cond_fini(nni_cond *);
// nni_cond_broadcast wakes all waiters on the condition. This should be
// called with the lock held.
@@ -97,7 +121,13 @@ extern void nni_cond_wait(nni_cond_t);
// conditions. The return value is 0 on success, or an error code, which
// can be NNG_ETIMEDOUT. Note that it is permissible to wait for longer
// than the timeout based on the resolution of your system clock.
-extern int nni_cond_timedwait(nni_cond_t, uint64_t);
+extern int nni_cond_timedwait(nni_cond_t, int);
+
+// nni_cond_waituntil waits for a wakeup on the condition variable, or
+// until the system time reaches the specified absolute time. (It is an
+// absolute form of nni_cond_timedwait.) Early wakeups are possible, so
+// check the condition. It will return either NNG_ETIMEDOUT, or 0.
+extern int nni_cond_waituntil(nni_cond_t, uint64_t);
typedef struct nni_thread * nni_thread_t;
typedef struct nni_thread nni_thread;
diff --git a/src/platform/posix/posix_alloc.c b/src/platform/posix/posix_alloc.c
index eb5c889e..83fe305d 100644
--- a/src/platform/posix/posix_alloc.c
+++ b/src/platform/posix/posix_alloc.c
@@ -1,11 +1,11 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
#include "core/nng_impl.h"
@@ -13,9 +13,7 @@
#include <stdlib.h>
-/*
- * POSIX memory allocation. This is pretty much standard C.
- */
+// POSIX memory allocation. This is pretty much standard C.
void *
nni_alloc(size_t size)
{
diff --git a/src/platform/posix/posix_clock.c b/src/platform/posix/posix_clock.c
index 83f0b151..48a1e09b 100644
--- a/src/platform/posix/posix_clock.c
+++ b/src/platform/posix/posix_clock.c
@@ -1,15 +1,13 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
-
-/*
- * POSIX clock stuff.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+// POSIX clock stuff.
#include "core/nng_impl.h"
#ifdef PLATFORM_POSIX_CLOCK
@@ -20,9 +18,7 @@
#ifndef NNG_USE_GETTIMEOFDAY
-/*
- * Use POSIX realtime stuff.
- */
+// Use POSIX realtime stuff
uint64_t
nni_clock(void)
{
@@ -56,18 +52,16 @@ nni_usleep(uint64_t usec)
}
-#else /* NNG_USE_GETTIMEOFDAY */
+#else // NNG_USE_GETTIMEOFDAY
-/*
- * If you're here, its because you don't have a modern clock_gettime with
- * monotonic clocks, or the necessary pthread_condattr_settclock(). In
- * this case, you should be advised that *bad* things can happen if your
- * system clock changes time while programs using this library are running.
- * (Basically, timeouts can take longer or shorter, leading to either hangs
- * or apparent spurious errors. Eventually it should all sort itself out,
- * but if you change the clock by a large amount you might wonder what the
- * heck is happening until it does.)
- */
+// If you're here, its because you don't have a modern clock_gettime with
+// monotonic clocks, or the necessary pthread_condattr_settclock(). In
+// this case, you should be advised that *bad* things can happen if your
+// system clock changes time while programs using this library are running.
+// (Basically, timeouts can take longer or shorter, leading to either hangs
+// or apparent spurious errors. Eventually it should all sort itself out,
+// but if you change the clock by a large amount you might wonder what the
+// heck is happening until it does.)
#include <pthread.h>
#include <sys/time.h>
@@ -94,25 +88,21 @@ nni_clock(void)
void
nni_usleep(uint64_t usec)
{
- /*
- * So probably there is no nanosleep. We could in theory use
- * pthread condition variables, but that means doing memory
- * allocation, or forcing the use of pthreads where the platform
- * might be preferring the use of another threading package.
- * Additionally, use of pthreads means that we cannot use
- * relative times in a clock_settime safe manner.
- * So we can use poll() instead, which is rather coarse, but
- * pretty much guaranteed to work.
- */
+ // So probably there is no nanosleep. We could in theory use
+ // pthread condition variables, but that means doing memory
+ // allocation, or forcing the use of pthreads where the platform
+ // might be preferring the use of another threading package.
+ // Additionally, use of pthreads means that we cannot use
+ // relative times in a clock_settime safe manner.
+ // So we can use poll() instead, which is rather coarse, but
+ // pretty much guaranteed to work.
struct pollfd pfd;
uint64_t now;
uint64_t expire;
- /*
- * Possibly we could pass NULL instead of pfd, but passing a valid
- * pointer ensures that if the system dereferences the pointer it
- * won't come back with EFAULT.
- */
+ // Possibly we could pass NULL instead of pfd, but passing a valid
+ // pointer ensures that if the system dereferences the pointer it
+ // won't come back with EFAULT.
pfd.fd = -1;
pfd.events = 0;
@@ -120,19 +110,17 @@ nni_usleep(uint64_t usec)
expire = now + usec;
while (now < expire) {
- /*
- * In theory we could round up to a whole number of msec,
- * but under the covers poll already does some rounding up,
- * and the loop above guarantees that we will not bail out
- * early. So this gives us a better chance to avoid adding
- * nearly an extra unneeded millisecond to the wait.
- */
+ // In theory we could round up to a whole number of msec,
+ // but under the covers poll already does some rounding up,
+ // and the loop above guarantees that we will not bail out
+ // early. So this gives us a better chance to avoid adding
+ // nearly an extra unneeded millisecond to the wait.
(void) poll(&pfd, 0, (int) ((expire - now) / 1000));
now = nni_clock();
}
}
-#endif /* NNG_USE_GETTIMEOFDAY */
+#endif // NNG_USE_GETTIMEOFDAY
-#endif /* PLATFORM_POSIX_CLOCK */
+#endif // PLATFORM_POSIX_CLOCK
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index a7b15edc..4f93b101 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -10,6 +10,13 @@
#ifndef PLATFORM_POSIX_IMPL_H
#define PLATFORM_POSIX_IMPL_H
+// Some dependency notes:
+//
+// PLATFORM_POSIX_THREAD and PLATFORM_POSIX_SYNCH depend on each other,
+// and they both depend on PLATFORM_POSIX_CLOCK. Furthermore, note that
+// when using PLATFORM_POSIX_CLOCK, your condition variable timeouts need
+// to use the same base clock values. Normally all three should be used
+// together.
#ifdef PLATFORM_POSIX
#define PLATFORM_POSIX_ALLOC
#define PLATFORM_POSIX_DEBUG
diff --git a/src/platform/posix/posix_synch.c b/src/platform/posix/posix_synch.c
index a3129f62..c0a2721c 100644
--- a/src/platform/posix/posix_synch.c
+++ b/src/platform/posix/posix_synch.c
@@ -1,23 +1,14 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
-
-/*
- * This is more of a direct #include of a .c rather than .h file.
- * But having it be a .h makes compiler rules work out properly. Do
- * not include this more than once into your program, or you will
- * get multiple symbols defined.
- */
-
-/*
- * POSIX synchronization (mutexes and condition variables). This uses
- * pthreads.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+// POSIX synchronization (mutexes and condition variables). This uses
+// pthreads.
#include "core/nng_impl.h"
@@ -51,6 +42,7 @@ nni_mutex_fini(nni_mutex *mp)
}
+// XXX: REMOVE THIS FUNCTION
int
nni_mutex_create(nni_mutex_t *mp)
{
@@ -62,7 +54,7 @@ nni_mutex_create(nni_mutex_t *mp)
return (NNG_ENOMEM);
}
- /* We ask for more error checking... */
+ // We ask for more error checking...
if (pthread_mutexattr_init(&attr) != 0) {
nni_free(m, sizeof (*m));
return (NNG_ENOMEM);
@@ -87,6 +79,7 @@ nni_mutex_create(nni_mutex_t *mp)
}
+// XXX: REMOVE THIS FUNCTION
void
nni_mutex_destroy(nni_mutex_t m)
{
@@ -98,7 +91,7 @@ nni_mutex_destroy(nni_mutex_t m)
void
-nni_mutex_enter(nni_mutex_t m)
+nni_mutex_enter(nni_mutex *m)
{
if (pthread_mutex_lock(&m->mx) != 0) {
nni_panic("pthread_mutex_lock failed");
@@ -107,7 +100,7 @@ nni_mutex_enter(nni_mutex_t m)
void
-nni_mutex_exit(nni_mutex_t m)
+nni_mutex_exit(nni_mutex *m)
{
if (pthread_mutex_unlock(&m->mx) != 0) {
nni_panic("pthread_mutex_unlock failed");
@@ -116,7 +109,7 @@ nni_mutex_exit(nni_mutex_t m)
int
-nni_mutex_tryenter(nni_mutex_t m)
+nni_mutex_tryenter(nni_mutex *m)
{
if (pthread_mutex_trylock(&m->mx) != 0) {
return (NNG_EBUSY);
@@ -125,51 +118,40 @@ nni_mutex_tryenter(nni_mutex_t m)
}
-static int
-nni_cond_attr(pthread_condattr_t **attrpp)
+int
+nni_cond_init(nni_cond *c, nni_mutex *m)
{
-#if defined(NNG_USE_GETTIMEOFDAY) || NNG_USE_CLOCKID == CLOCK_REALTIME
- *attrpp = NULL;
+ if (pthread_cond_init(&c->cv, &nni_condattr) != 0) {
+ // In theory could be EAGAIN, but handle like ENOMEM
+ nni_free(c, sizeof (*c));
+ return (NNG_ENOMEM);
+ }
+ c->mx = &m->mx;
return (0);
+}
-#else
- /* In order to make this fast, avoid reinitializing attrs. */
- static pthread_condattr_t attr;
- static pthread_mutex_t mx = PTHREAD_MUTEX_INITIALIZER;
- static int init = 0;
- int rv;
- // For efficiency's sake, we try to reuse the same attr for the
- // life of the library. This avoids many reallocations. Technically
- // this means that we will leak the attr on exit(), but this is
- // preferable to constantly allocating and reallocating it.
- if (init) {
- *attrpp = &attr;
- return (0);
+void
+nni_cond_fini(nni_cond *c)
+{
+ if (pthread_cond_destroy(&c->cv) != 0) {
+ nni_panic("pthread_cond_destroy failed");
}
+}
- (void) pthread_mutex_lock(&mx);
- while (!init) {
- if ((rv = pthread_condattr_init(&attr)) != 0) {
- (void) pthread_mutex_unlock(&mx);
- return (NNG_ENOMEM);
- }
- rv = pthread_condattr_setclock(&attr, NNG_USE_CLOCKID);
- if (rv != 0) {
- nni_panic("condattr_setclock: %s", strerror(rv));
- }
- init = 1;
- }
- (void) pthread_mutex_unlock(&mx);
- *attrpp = &attr;
- return (0);
-#endif
+// XXXX: REMOVE THIS FUNCTION
+static int
+nni_cond_attr(pthread_condattr_t **attrpp)
+{
+ *attrpp = NULL;
+ return (0);
}
+// XXX: REMOVE THIS FUNCTION
int
-nni_cond_create(nni_cond_t *cvp, nni_mutex_t mx)
+nni_cond_create(nni_cond **cvp, nni_mutex *mx)
{
/*
* By preference, we use a CLOCK_MONOTONIC version of condition
@@ -196,8 +178,9 @@ nni_cond_create(nni_cond_t *cvp, nni_mutex_t mx)
}
+// XXX: REMOVE THIS FUNCTION
void
-nni_cond_destroy(nni_cond_t c)
+nni_cond_destroy(nni_cond *c)
{
if (pthread_cond_destroy(&c->cv) != 0) {
nni_panic("pthread_cond_destroy failed");
@@ -207,7 +190,7 @@ nni_cond_destroy(nni_cond_t c)
void
-nni_cond_signal(nni_cond_t c)
+nni_cond_signal(nni_cond *c)
{
if (pthread_cond_signal(&c->cv) != 0) {
nni_panic("pthread_cond_signal failed");
@@ -216,7 +199,7 @@ nni_cond_signal(nni_cond_t c)
void
-nni_cond_broadcast(nni_cond_t c)
+nni_cond_broadcast(nni_cond *c)
{
if (pthread_cond_broadcast(&c->cv) != 0) {
nni_panic("pthread_cond_broadcast failed");
@@ -225,7 +208,7 @@ nni_cond_broadcast(nni_cond_t c)
void
-nni_cond_wait(nni_cond_t c)
+nni_cond_wait(nni_cond *c)
{
if (pthread_cond_wait(&c->cv, c->mx) != 0) {
nni_panic("pthread_cond_wait failed");
@@ -234,7 +217,7 @@ nni_cond_wait(nni_cond_t c)
int
-nni_cond_timedwait(nni_cond_t c, uint64_t usec)
+nni_cond_waituntil(nni_cond *c, uint64_t usec)
{
struct timespec ts;
int rv;
@@ -255,4 +238,15 @@ nni_cond_timedwait(nni_cond_t c, uint64_t usec)
}
+int
+nni_cond_timedwait(nni_cond *c, int usec)
+{
+ if (usec < 0) {
+ nni_cond_wait(c);
+ return (0);
+ }
+ return (nni_cond_waituntil(c, ((uint64_t) usec) + nni_clock()));
+}
+
+
#endif
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index 1bbe6e28..79f69762 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -28,7 +28,7 @@ static int plat_init = 0;
static int plat_fork = 0;
static void *
-thrfunc(void *arg)
+nni_thrfunc(void *arg)
{
nni_thread_t thr = arg;
@@ -49,7 +49,7 @@ nni_thread_create(nni_thread_t *tp, void (*fn)(void *), void *arg)
thr->func = fn;
thr->arg = arg;
- if ((rv = pthread_create(&thr->tid, NULL, thrfunc, thr)) != 0) {
+ if ((rv = pthread_create(&thr->tid, NULL, nni_thrfunc, thr)) != 0) {
nni_free(thr, sizeof (*thr));
return (NNG_ENOMEM);
}
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 029825b1..cb79d65a 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -22,8 +22,8 @@ typedef struct nni_inproc_pipe nni_inproc_pipe;
typedef struct nni_inproc_ep nni_inproc_ep;
typedef struct {
- nni_mutex_t mx;
- nni_cond_t cv;
+ nni_mutex mx;
+ nni_cond cv;
nni_list_t eps;
} nni_inproc_global;
@@ -31,17 +31,17 @@ typedef struct {
struct nni_inproc_pipe {
const char * addr;
nni_inproc_pair * pair;
- nni_msgqueue_t rq;
- nni_msgqueue_t wq;
+ nni_msgqueue * rq;
+ nni_msgqueue * wq;
uint16_t peer;
};
// nni_inproc_pair represents a pair of pipes. Because we control both
// sides of the pipes, we can allocate and free this in one structure.
struct nni_inproc_pair {
- nni_mutex_t mx;
+ nni_mutex mx;
int refcnt;
- nni_msgqueue_t q[2];
+ nni_msgqueue * q[2];
nni_inproc_pipe pipe[2];
char addr[NNG_MAXADDRLEN+1];
};
@@ -68,11 +68,11 @@ nni_inproc_init(void)
{
int rv;
- if ((rv = nni_mutex_create(&nni_inproc.mx)) != 0) {
+ if ((rv = nni_mutex_init(&nni_inproc.mx)) != 0) {
return (rv);
}
- if ((rv = nni_cond_create(&nni_inproc.cv, nni_inproc.mx)) != 0) {
- nni_mutex_destroy(nni_inproc.mx);
+ if ((rv = nni_cond_init(&nni_inproc.cv, &nni_inproc.mx)) != 0) {
+ nni_mutex_fini(&nni_inproc.mx);
return (rv);
}
NNI_LIST_INIT(&nni_inproc.eps, nni_inproc_ep, node);
@@ -84,8 +84,8 @@ nni_inproc_init(void)
static void
nni_inproc_fini(void)
{
- nni_cond_destroy(nni_inproc.cv);
- nni_mutex_destroy(nni_inproc.mx);
+ nni_cond_fini(&nni_inproc.cv);
+ nni_mutex_fini(&nni_inproc.mx);
}
@@ -104,18 +104,13 @@ nni_inproc_pipe_close(void *arg)
static void
nni_inproc_pair_destroy(nni_inproc_pair *pair)
{
- if (pair == NULL) {
- return;
- }
if (pair->q[0]) {
nni_msgqueue_destroy(pair->q[0]);
}
if (pair->q[1]) {
nni_msgqueue_destroy(pair->q[1]);
}
- if (pair->mx) {
- nni_mutex_destroy(pair->mx);
- }
+ nni_mutex_fini(&pair->mx);
nni_free(pair, sizeof (*pair));
}
@@ -129,19 +124,19 @@ nni_inproc_pipe_destroy(void *arg)
// We could assert the pipe closed...
// If we are the last peer, then toss the pair structure.
- nni_mutex_enter(pair->mx);
+ nni_mutex_enter(&pair->mx);
pair->refcnt--;
if (pair->refcnt == 0) {
- nni_mutex_exit(pair->mx);
+ nni_mutex_exit(&pair->mx);
nni_inproc_pair_destroy(pair);
} else {
- nni_mutex_exit(pair->mx);
+ nni_mutex_exit(&pair->mx);
}
}
static int
-nni_inproc_pipe_send(void *arg, nng_msg_t msg)
+nni_inproc_pipe_send(void *arg, nni_msg *msg)
{
nni_inproc_pipe *pipe = arg;
@@ -152,7 +147,7 @@ nni_inproc_pipe_send(void *arg, nng_msg_t msg)
static int
-nni_inproc_pipe_recv(void *arg, nng_msg_t *msgp)
+nni_inproc_pipe_recv(void *arg, nni_msg **msgp)
{
nni_inproc_pipe *pipe = arg;
@@ -230,13 +225,13 @@ nni_inproc_ep_close(void *arg)
{
nni_inproc_ep *ep = arg;
- nni_mutex_enter(nni_inproc.mx);
+ nni_mutex_enter(&nni_inproc.mx);
if (!ep->closed) {
ep->closed = 1;
nni_list_remove(&nni_inproc.eps, ep);
- nni_cond_broadcast(nni_inproc.cv);
+ nni_cond_broadcast(&nni_inproc.cv);
}
- nni_mutex_exit(nni_inproc.mx);
+ nni_mutex_exit(&nni_inproc.mx);
}
@@ -250,7 +245,7 @@ nni_inproc_ep_dial(void *arg, void **pipep)
if (ep->mode != NNI_INPROC_EP_IDLE) {
return (NNG_EINVAL);
}
- nni_mutex_enter(nni_inproc.mx);
+ nni_mutex_enter(&nni_inproc.mx);
NNI_LIST_FOREACH (list, srch) {
if (srch->mode != NNI_INPROC_EP_LISTEN) {
continue;
@@ -261,27 +256,27 @@ nni_inproc_ep_dial(void *arg, void **pipep)
}
if (srch == NULL) {
// No listeners available.
- nni_mutex_exit(nni_inproc.mx);
+ nni_mutex_exit(&nni_inproc.mx);
return (NNG_ECONNREFUSED);
}
ep->mode = NNI_INPROC_EP_DIAL;
nni_list_append(list, ep);
- nni_cond_broadcast(nni_inproc.cv);
+ nni_cond_broadcast(&nni_inproc.cv);
for (;;) {
if (ep->closed) {
// Closer will have removed us from list.
- nni_mutex_exit(nni_inproc.mx);
+ nni_mutex_exit(&nni_inproc.mx);
return (NNG_ECLOSED);
}
if (ep->cpipe != NULL) {
break;
}
- nni_cond_wait(nni_inproc.cv);
+ nni_cond_wait(&nni_inproc.cv);
}
// NB: The acceptor or closer removes us from the list.
ep->mode = NNI_INPROC_EP_IDLE;
*pipep = ep->cpipe;
- nni_mutex_exit(nni_inproc.mx);
+ nni_mutex_exit(&nni_inproc.mx);
return (ep->closed ? NNG_ECLOSED : 0);
}
@@ -296,9 +291,9 @@ nni_inproc_ep_listen(void *arg)
if (ep->mode != NNI_INPROC_EP_IDLE) {
return (NNG_EINVAL);
}
- nni_mutex_enter(nni_inproc.mx);
+ nni_mutex_enter(&nni_inproc.mx);
if (ep->closed) {
- nni_mutex_exit(nni_inproc.mx);
+ nni_mutex_exit(&nni_inproc.mx);
return (NNG_ECLOSED);
}
NNI_LIST_FOREACH (list, srch) {
@@ -306,13 +301,13 @@ nni_inproc_ep_listen(void *arg)
continue;
}
if (strcmp(srch->addr, ep->addr) == 0) {
- nni_mutex_exit(nni_inproc.mx);
+ nni_mutex_exit(&nni_inproc.mx);
return (NNG_EADDRINUSE);
}
}
ep->mode = NNI_INPROC_EP_LISTEN;
nni_list_append(list, ep);
- nni_mutex_exit(nni_inproc.mx);
+ nni_mutex_exit(&nni_inproc.mx);
return (0);
}
@@ -326,14 +321,31 @@ nni_inproc_ep_accept(void *arg, void **pipep)
nni_list_t *list = &nni_inproc.eps;
int rv;
- nni_mutex_enter(nni_inproc.mx);
if (ep->mode != NNI_INPROC_EP_LISTEN) {
- nni_mutex_exit(nni_inproc.mx);
return (NNG_EINVAL);
}
+
+ // Preallocate the pair, so we don't do it while holding a lock
+ if ((pair = nni_alloc(sizeof (*pair))) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_mutex_init(&pair->mx)) != 0) {
+ nni_free(pair, sizeof (*pair));
+ return (rv);
+ }
+ if (((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0) ||
+ ((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0)) {
+ nni_inproc_pair_destroy(pair);
+ return (rv);
+ }
+
+ nni_mutex_enter(&nni_inproc.mx);
for (;;) {
if (ep->closed) {
- nni_mutex_exit(nni_inproc.mx);
+ // This is the only possible error path from the
+ // time we acquired the lock.
+ nni_mutex_exit(&nni_inproc.mx);
+ nni_inproc_pair_destroy(pair);
return (NNG_ECLOSED);
}
NNI_LIST_FOREACH (list, srch) {
@@ -347,16 +359,7 @@ nni_inproc_ep_accept(void *arg, void **pipep)
if (srch != NULL) {
break;
}
- nni_cond_wait(nni_inproc.cv);
- }
- if ((pair = nni_alloc(sizeof (*pair))) == NULL) {
- nni_mutex_exit(nni_inproc.mx);
- return (NNG_ENOMEM);
- }
- if (((rv = nni_mutex_create(&pair->mx)) != 0) ||
- ((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0) ||
- ((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0)) {
- nni_inproc_pair_destroy(pair);
+ nni_cond_wait(&nni_inproc.cv);
}
(void) snprintf(pair->addr, sizeof (pair->addr), "%s", ep->addr);
pair->pipe[0].rq = pair->pipe[1].wq = pair->q[0];
@@ -368,9 +371,9 @@ nni_inproc_ep_accept(void *arg, void **pipep)
pair->refcnt = 2;
srch->cpipe = &pair->pipe[0];
*pipep = &pair->pipe[1];
- nni_cond_broadcast(nni_inproc.cv);
+ nni_cond_broadcast(&nni_inproc.cv);
- nni_mutex_exit(nni_inproc.mx);
+ nni_mutex_exit(&nni_inproc.mx);
return (0);
}