aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-22 15:23:21 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-22 15:23:21 -0800
commit934c1316ae47754a2e368c65228c3cbfe552680f (patch)
treee81c4d2854df83e3d908c9269dd35c0600fa6acb /src
parentee969ad99dc1e07e1c38876223e7aed13463b121 (diff)
downloadnng-934c1316ae47754a2e368c65228c3cbfe552680f.tar.gz
nng-934c1316ae47754a2e368c65228c3cbfe552680f.tar.bz2
nng-934c1316ae47754a2e368c65228c3cbfe552680f.zip
Inline locks (fewer allocs), simpler absolute times for wakeups. nn_sock_recv.
Diffstat (limited to 'src')
-rw-r--r--src/core/defs.h10
-rw-r--r--src/core/endpt.h4
-rw-r--r--src/core/msgqueue.c82
-rw-r--r--src/core/msgqueue.h42
-rw-r--r--src/core/platform.h43
-rw-r--r--src/core/socket.c115
-rw-r--r--src/core/socket.h23
-rw-r--r--src/platform/posix/posix_clock.c16
-rw-r--r--src/platform/posix/posix_impl.h2
-rw-r--r--src/platform/posix/posix_synch.c109
-rw-r--r--src/platform/posix/posix_thread.c48
-rw-r--r--src/protocol/pair/pair.c17
-rw-r--r--src/transport/inproc/inproc.c4
13 files changed, 249 insertions, 266 deletions
diff --git a/src/core/defs.h b/src/core/defs.h
index b294e0f3..a37377f7 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -10,6 +10,8 @@
#ifndef CORE_DEFS_H
#define CORE_DEFS_H
+#include <stdint.h>
+
// C compilers may get unhappy when named arguments are not used. While
// there are things like __attribute__((unused)) which are arguably
// superior, support for such are not universal.
@@ -32,4 +34,12 @@ typedef struct nni_pipe_ops nni_pipe_ops;
typedef struct nni_protocol nni_protocol;
+typedef int nni_signal; // Used as a turnstile/wakeup channel.
+typedef uint64_t nni_time; // An absolute time in microseconds.
+typedef int nni_duration; // A relative time in microseconds.
+
+// Some default timing things.
+#define NNI_TIME_NEVER ((nni_time) 0xffffffffull)
+#define NNI_TIME_ZERO ((nni_time) 0)
+
#endif // CORE_DEFS_H
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 063dcb7b..0fa69678 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -24,8 +24,8 @@ struct nng_endpt {
nni_thread_t ep_dialer;
nni_thread_t ep_listener;
int ep_close;
- nni_mutex_t ep_mx;
- nni_cond_t ep_cv;
+ nni_mutex ep_mx;
+ nni_cond ep_cv;
};
int nni_endpt_create(nni_endpt **, nni_socket *, const char *);
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index cbdb94c6..7d892b88 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -112,22 +112,20 @@ nni_msgqueue_signal(nni_msgqueue *mq, int *signal)
int
-nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, int tmout, int *signal)
+nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
+ nni_time expire, nni_signal *signal)
{
- uint64_t expire;
-
- if (tmout >= 0) {
- expire = nni_clock() + tmout;
- } else {
- expire = 0xffffffffffffffffull;
- }
-
nni_mutex_enter(&mq->mq_lock);
- while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap) && (!*signal)) {
+ while ((!mq->mq_closed) &&
+ (mq->mq_len == mq->mq_cap) &&
+ (!*signal)) {
if (expire <= nni_clock()) {
nni_mutex_exit(&mq->mq_lock);
- return (tmout == 0 ? NNG_EAGAIN : NNG_ETIMEDOUT);
+ if (expire == NNI_TIME_ZERO) {
+ return (NNG_EAGAIN);
+ }
+ return (NNG_ETIMEDOUT);
}
(void) nni_cond_waituntil(&mq->mq_writeable, expire);
}
@@ -160,23 +158,19 @@ nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, int tmout, int *signal)
}
-int
-nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, int tmout, int *signal)
+static int
+nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
+ nni_time expire, nni_signal *signal)
{
- uint64_t expire;
-
- if (tmout >= 0) {
- expire = nni_clock() + tmout;
- } else {
- expire = 0xffffffffffffffffull;
- }
-
nni_mutex_enter(&mq->mq_lock);
while ((!mq->mq_closed) && (mq->mq_len == 0) && (*signal == 0)) {
if (expire <= nni_clock()) {
nni_mutex_exit(&mq->mq_lock);
- return (tmout == 0 ? NNG_EAGAIN : NNG_ETIMEDOUT);
+ if (expire == NNI_TIME_ZERO) {
+ return (NNG_EAGAIN);
+ }
+ return (NNG_ETIMEDOUT);
}
(void) nni_cond_waituntil(&mq->mq_readable, expire);
}
@@ -212,20 +206,52 @@ nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, int tmout, int *signal)
int
-nni_msgqueue_get(nni_msgqueue *mq, nni_msg **msgp, int tmout)
+nni_msgqueue_get(nni_msgqueue *mq, nni_msg **msgp)
{
- int nosig = 0;
+ nni_signal nosig = 0;
+
+ return (nni_msgqueue_get_impl(mq, msgp, NNI_TIME_NEVER, &nosig));
+}
+
+
+int
+nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, nni_signal *signal)
+{
+ return (nni_msgqueue_get_impl(mq, msgp, NNI_TIME_NEVER, signal));
+}
+
- return (nni_msgqueue_get_sig(mq, msgp, tmout, &nosig));
+int
+nni_msgqueue_get_until(nni_msgqueue *mq, nni_msg **msgp, nni_time expire)
+{
+ nni_signal nosig = 0;
+
+ return (nni_msgqueue_get_impl(mq, msgp, expire, &nosig));
+}
+
+
+int
+nni_msgqueue_put(nni_msgqueue *mq, nni_msg *msg)
+{
+ nni_signal nosig = 0;
+
+ return (nni_msgqueue_put_impl(mq, msg, NNI_TIME_NEVER, &nosig));
+}
+
+
+int
+nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, nni_signal *signal)
+{
+ return (nni_msgqueue_put_impl(mq, msg, NNI_TIME_NEVER, signal));
}
int
-nni_msgqueue_put(nni_msgqueue *mq, nni_msg *msg, int tmout)
+nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire)
{
- int nosig = 0;
+ nni_signal nosig = 0;
- return (nni_msgqueue_put_sig(mq, msg, tmout, &nosig));
+ return (nni_msgqueue_put_impl(mq, msg, expire, &nosig));
}
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index e44e203a..d97482af 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -27,37 +27,47 @@ extern int nni_msgqueue_create(nni_msgqueue **, int);
// messages that may be in the queue.
extern void nni_msgqueue_destroy(nni_msgqueue *);
-// nni_msgqueue_put attempts to put a message to the queue. It will wait
-// for the timeout (us), if the value is positive. If the value is negative
-// then it will wait forever. If the value is zero, it will just check, and
-// return immediately whether a message can be put or not. Valid returns are
-// NNG_ECLOSED if the queue is closed or NNG_ETIMEDOUT if the message cannot
-// be placed after a time, or NNG_EAGAIN if the operation cannot succeed
-// immediately and a zero timeout is specified. Note that timeout granularity
-// may be limited -- for example Windows systems have a millisecond resolution
-// timeout capability.
-extern int nni_msgqueue_put(nni_msgqueue *, nni_msg *, int);
+// nni_msgqueue_put puts the message to the queue. It blocks until it
+// was able to do so, or the queue is closed, returning either 0 on
+// success or NNG_ECLOSED if the queue was closed. If NNG_ECLOSED is
+// returned, the caller is responsible for freeing the message with
+// nni_msg_free(), otherwise the message is "owned" by the queue, and
+// the caller is not permitted to access it further.
+extern int nni_msgqueue_put(nni_msgqueue *, nni_msg *);
-// nni_msgqueue_get gets the message from the queue, using a timeout just
-// like nni_msgqueue_put.
-extern int nni_msgqueue_get(nni_msgqueue *, nni_msg **, int);
+// nni_msgqueue_get gets the message from the queue. It blocks until a
+// message is available, or the queue is closed, returning either 0 on
+// success or NNG_ECLOSED if the queue was closed. If a message is
+// provided, the caller is assumes ownership of the message and must
+// call nni_msg_free() when it is finished with it.
+extern int nni_msgqueue_get(nni_msgqueue *, nni_msg **);
+
+// nni_msgqueue_put_until is like nni_msgqueue_put, except that if the
+// system clock reaches the specified time without being able to place
+// the message in the queue, it will return NNG_ETIMEDOUT.
+extern int nni_msgqueue_put_until(nni_msgqueue *, nni_msg *, nni_time);
+
+// nni_msgqueue_get_until is like nni_msgqueue_put, except that if the
+// system clock reaches the specified time without being able to retrieve
+// a message from the queue, it will return NNG_ETIMEDOUT.
+extern int nni_msgqueue_get_until(nni_msgqueue *, nni_msg **, nni_time);
// nni_msgqueue_put_sig is an enhanced version of nni_msgqueue_put, but it
// can be interrupted by nni_msgqueue_signal using the same final pointer,
// which can be thought of as a turnstile. If interrupted it returns EINTR.
// The turnstile should be initialized to zero.
-extern int nni_msgqueue_put_sig(nni_msgqueue *, nni_msg *, int, int *);
+extern int nni_msgqueue_put_sig(nni_msgqueue *, nni_msg *, nni_signal *);
// nni_msgqueue_get_sig is an enhanced version of nni_msgqueue_get_t, but it
// can be interrupted by nni_msgqueue_signal using the same final pointer,
// which can be thought of as a turnstile. If interrupted it returns EINTR.
// The turnstile should be initialized to zero.
-extern int nni_msgqueue_get_sig(nni_msgqueue *, nni_msg **, int, int *);
+extern int nni_msgqueue_get_sig(nni_msgqueue *, nni_msg **, nni_signal *);
// nni_msgqueue_signal delivers a signal / interrupt to waiters blocked in
// the msgqueue, if they have registered an interest in the same turnstile.
// It modifies the turnstile's value under the lock to a non-zero value.
-extern void nni_msgqueue_signal(nni_msgqueue *, int *);
+extern void nni_msgqueue_signal(nni_msgqueue *, nni_signal *);
// nni_msgqueue_close closes the queue. After this all operates on the
// message queue will return NNG_ECLOSED. Messages inside the queue
diff --git a/src/core/platform.h b/src/core/platform.h
index 6722e666..58e4cd36 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -62,13 +62,6 @@ extern void nni_free(void *, size_t);
typedef struct nni_mutex nni_mutex;
typedef struct nni_cond nni_cond;
-// XXX: REMOVE THESE
-typedef struct nni_mutex * nni_mutex_t;
-typedef struct nni_cond * nni_cond_t;
-extern int nni_mutex_create(nni_mutex_t *);
-extern void nni_mutex_destroy(nni_mutex_t);
-extern int nni_cond_create(nni_cond_t *, nni_mutex_t);
-extern void nni_cond_destroy(nni_cond_t);
// Mutex handling.
@@ -104,30 +97,21 @@ extern void nni_cond_fini(nni_cond *);
// nni_cond_broadcast wakes all waiters on the condition. This should be
// called with the lock held.
-extern void nni_cond_broadcast(nni_cond_t);
+extern void nni_cond_broadcast(nni_cond *);
// nni_cond_signal wakes a signal waiter.
-extern void nni_cond_signal(nni_cond_t);
+extern void nni_cond_signal(nni_cond *);
// nni_cond_wait waits for a wake up on the condition variable. The
// associated lock is atomically released and reacquired upon wake up.
// Callers can be spuriously woken. The associated lock must be held.
-extern void nni_cond_wait(nni_cond_t);
-
-// nni_cond_timedwait waits for a wakeup on the condition variable, just
-// as with nni_condwait, but it will also wake after the given number of
-// microseconds has passed. (This is a relative timed wait.) Early
-// wakeups are permitted, and the caller must take care to double check any
-// conditions. The return value is 0 on success, or an error code, which
-// can be NNG_ETIMEDOUT. Note that it is permissible to wait for longer
-// than the timeout based on the resolution of your system clock.
-extern int nni_cond_timedwait(nni_cond_t, int);
+extern void nni_cond_wait(nni_cond *);
// nni_cond_waituntil waits for a wakeup on the condition variable, or
// until the system time reaches the specified absolute time. (It is an
// absolute form of nni_cond_timedwait.) Early wakeups are possible, so
// check the condition. It will return either NNG_ETIMEDOUT, or 0.
-extern int nni_cond_waituntil(nni_cond_t, uint64_t);
+extern int nni_cond_waituntil(nni_cond *, nni_time);
typedef struct nni_thread * nni_thread_t;
typedef struct nni_thread nni_thread;
@@ -142,12 +126,15 @@ extern int nni_thread_create(nni_thread **, void (*fn)(void *), void *);
extern void nni_thread_reap(nni_thread *);
// nn_clock returns a number of microseconds since some arbitrary time
-// in the past. The values returned by nni_clock may be used with
-// nni_cond_timedwait.
-extern uint64_t nni_clock(void);
+// in the past. The values returned by nni_clock must use the same base
+// as the times used in nni_cond_waituntil. The nni_clock() must return
+// values > 0, and must return values smaller than 2^63. (We could relax
+// this last constraint, but there is no reason to, and leaves us the option
+// of using negative values for other purposes in the future.)
+extern nni_time nni_clock(void);
// nni_usleep sleeps for the specified number of microseconds (at least).
-extern void nni_usleep(uint64_t);
+extern void nni_usleep(nni_duration);
// nni_platform_init is called to allow the platform the chance to
// do any necessary initialization. This routine MUST be idempotent,
@@ -158,8 +145,9 @@ extern void nni_usleep(uint64_t);
// nni_plat_fini has been called.
//
// The function argument should be called if the platform has not initialized
-// (i.e. exactly once please), and its result passed back to the caller.
-//
+// (i.e. exactly once), and its result passed back to the caller. If it
+// does not return 0 (success), then it may be called again to try to
+// initialize the platform again at a later date.
extern int nni_plat_init(int (*)(void));
// nni_platform_fini is called to clean up resources. It is intended to
@@ -167,7 +155,8 @@ extern int nni_plat_init(int (*)(void));
// will be called until nni_platform_init is called.
extern void nni_plat_fini(void);
-// Actual platforms we support.
+// Actual platforms we support. This is included up front so that we can
+// get the specific types that are supplied by the platform.
#if defined(PLATFORM_POSIX)
#include "platform/posix/posix_impl.h"
#else
diff --git a/src/core/socket.c b/src/core/socket.c
index 0272f454..c500db31 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -32,7 +32,7 @@ int
nni_socket_create(nni_socket **sockp, uint16_t proto)
{
nni_socket *sock;
- struct nni_protocol *ops;
+ nni_protocol *ops;
int rv;
if ((ops = nni_protocol_find(proto)) == NULL) {
@@ -43,12 +43,12 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
}
sock->s_ops = *ops;
- if ((rv = nni_mutex_create(&sock->s_mx)) != 0) {
+ if ((rv = nni_mutex_init(&sock->s_mx)) != 0) {
nni_free(sock, sizeof (*sock));
return (rv);
}
- if ((rv = nni_cond_create(&sock->s_cv, sock->s_mx)) != 0) {
- nni_mutex_destroy(sock->s_mx);
+ if ((rv = nni_cond_init(&sock->s_cv, &sock->s_mx)) != 0) {
+ nni_mutex_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
return (rv);
}
@@ -57,8 +57,8 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
// TODO: NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node);
if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) {
- nni_cond_destroy(sock->s_cv);
- nni_mutex_destroy(sock->s_mx);
+ nni_cond_fini(&sock->s_cv);
+ nni_mutex_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
return (rv);
}
@@ -73,9 +73,10 @@ nni_socket_close(nni_socket *sock)
{
nni_pipe *pipe;
nni_endpt *ep;
+ uint64_t linger;
- nni_mutex_enter(sock->s_mx);
+ nni_mutex_enter(&sock->s_mx);
// Mark us closing, so no more EPs or changes can occur.
sock->s_closing = 1;
@@ -90,9 +91,8 @@ nni_socket_close(nni_socket *sock)
or nni_ep_shutdown(ep);
#endif
- break; /* REMOVE ME */
}
- nni_mutex_exit(sock->s_mx);
+ nni_mutex_exit(&sock->s_mx);
// XXX: TODO. This is a place where we should drain the write side
// msgqueue, effectively getting a linger on the socket. The
@@ -110,11 +110,10 @@ nni_socket_close(nni_socket *sock)
// Now we should attempt to wait for the list of pipes to drop to
// zero -- indicating that the protocol has shut things down
// cleanly, voluntarily. (I.e. it finished its drain.)
- nni_mutex_enter(sock->s_mx);
+ linger = nni_clock();
+ nni_mutex_enter(&sock->s_mx);
while (nni_list_first(&sock->s_pipes) != NULL) {
- // rv = nn_cond_timedwait(sock->s_cv, sock->s_linger);
- int rv = NNG_ETIMEDOUT;
- if (rv == NNG_ETIMEDOUT) {
+ if (nni_cond_waituntil(&sock->s_cv, linger) == NNG_ETIMEDOUT) {
break;
}
}
@@ -130,35 +129,50 @@ nni_socket_close(nni_socket *sock)
// quickly too! If this blocks for any non-trivial amount of time
// here, it indicates a protocol implementation bug.
while (nni_list_first(&sock->s_pipes) != NULL) {
- nni_cond_wait(sock->s_cv);
+ nni_cond_wait(&sock->s_cv);
}
// Wait to make sure endpoint listeners have shutdown and exited
// as well. They should have done so *long* ago.
while (nni_list_first(&sock->s_eps) != NULL) {
- nni_cond_wait(sock->s_cv);
+ nni_cond_wait(&sock->s_cv);
}
+ nni_mutex_exit(&sock->s_mx);
+
return (0);
}
int
-nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, int tmout)
+nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_duration tmout)
{
int rv;
int besteffort;
+ nni_time expire;
+
+ if (tmout > 0) {
+ expire = nni_clock() + tmout;
+ } else if (tmout < 0) {
+ expire = NNI_TIME_NEVER;
+ } else {
+ expire = NNI_TIME_ZERO;
+ }
// Senderr is typically set by protocols when the state machine
// indicates that it is no longer valid to send a message. E.g.
// a REP socket with no REQ pending.
- nni_mutex_enter(sock->s_mx);
+ nni_mutex_enter(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mutex_exit(&sock->s_mx);
+ return (NNG_ECLOSED);
+ }
if ((rv = sock->s_senderr) != 0) {
- nni_mutex_exit(sock->s_mx);
+ nni_mutex_exit(&sock->s_mx);
return (rv);
}
besteffort = sock->s_besteffort;
- nni_mutex_exit(sock->s_mx);
+ nni_mutex_exit(&sock->s_mx);
if (sock->s_ops.proto_send_filter != NULL) {
msg = sock->s_ops.proto_send_filter(sock->s_data, msg);
@@ -170,9 +184,9 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, int tmout)
if (besteffort) {
// BestEffort mode -- if we cannot handle the message due to
// backpressure, we just throw it away, and don't complain.
- tmout = 0;
+ expire = NNI_TIME_ZERO;
}
- rv = nni_msgqueue_put(sock->s_uwq, msg, tmout);
+ rv = nni_msgqueue_put_until(sock->s_uwq, msg, expire);
if (besteffort && (rv == NNG_EAGAIN)) {
// Pretend this worked... it didn't, but pretend.
nni_msg_free(msg);
@@ -181,6 +195,47 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, int tmout)
return (rv);
}
+int
+nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_duration tmout)
+{
+ int rv;
+ nni_time expire;
+ nni_msg *msg;
+
+ if (tmout > 0) {
+ expire = nni_clock() + tmout;
+ } else if (tmout < 0) {
+ expire = NNI_TIME_NEVER;
+ } else {
+ expire = NNI_TIME_ZERO;
+ }
+
+ nni_mutex_enter(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mutex_exit(&sock->s_mx);
+ return (NNG_ECLOSED);
+ }
+ if ((rv = sock->s_recverr) != 0) {
+ nni_mutex_exit(&sock->s_mx);
+ return (rv);
+ }
+ nni_mutex_exit(&sock->s_mx);
+
+ for (;;) {
+ rv = nni_msgqueue_get_until(sock->s_urq, &msg, expire);
+ if (rv != 0) {
+ return (rv);
+ }
+ msg = sock->s_ops.proto_recv_filter(sock->s_data, msg);
+ if (msg != NULL) {
+ break;
+ }
+ // Protocol dropped the message; try again.
+ }
+
+ *msgp = msg;
+ return (0);
+}
// nni_socket_protocol returns the socket's 16-bit protocol number.
uint16_t
@@ -195,9 +250,9 @@ nni_socket_proto(nni_socket *sock)
void
nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe)
{
- nni_mutex_enter(sock->s_mx);
+ nni_mutex_enter(&sock->s_mx);
if (pipe->p_sock != sock) {
- nni_mutex_exit(sock->s_mx);
+ nni_mutex_exit(&sock->s_mx);
}
// Remove the pipe from the protocol. Protocols may
@@ -215,9 +270,9 @@ nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe)
// If we're closing, wake the socket if we finished draining.
if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) {
- nni_cond_broadcast(sock->s_cv);
+ nni_cond_broadcast(&sock->s_cv);
}
- nni_mutex_exit(sock->s_mx);
+ nni_mutex_exit(&sock->s_mx);
}
@@ -226,18 +281,18 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe)
{
int rv;
- nni_mutex_enter(sock->s_mx);
+ nni_mutex_enter(&sock->s_mx);
if (sock->s_closing) {
- nni_mutex_exit(sock->s_mx);
+ nni_mutex_exit(&sock->s_mx);
return (NNG_ECLOSED);
}
if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) {
- nni_mutex_exit(sock->s_mx);
+ nni_mutex_exit(&sock->s_mx);
return (rv);
}
nni_list_append(&sock->s_pipes, pipe);
pipe->p_sock = sock;
- /* XXX: Publish event */
- nni_mutex_exit(sock->s_mx);
+ // XXX: Publish event
+ nni_mutex_exit(&sock->s_mx);
return (0);
}
diff --git a/src/core/socket.h b/src/core/socket.h
index 55944c92..ec4acfdb 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -14,24 +14,25 @@
// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR
// TRANSPORTS.
struct nng_socket {
- nni_mutex_t s_mx;
- nni_cond_t s_cv;
+ nni_mutex s_mx;
+ nni_cond s_cv;
- nni_msgqueue_t s_uwq; // Upper write queue
- nni_msgqueue_t s_urq; // Upper read queue
+ nni_msgqueue * s_uwq; // Upper write queue
+ nni_msgqueue * s_urq; // Upper read queue
- struct nni_protocol s_ops;
+ nni_protocol s_ops;
- void * s_data; // Protocol private
+ void * s_data; // Protocol private
// XXX: options
- nni_list_t s_eps;
- nni_list_t s_pipes;
+ nni_list_t s_eps;
+ nni_list_t s_pipes;
- int s_closing; // Socket is closing
- int s_besteffort; // Best effort mode delivery
- int s_senderr; // Protocol state machine use
+ int s_closing; // Socket is closing
+ int s_besteffort; // Best effort mode delivery
+ int s_senderr; // Protocol state machine use
+ int s_recverr; // Protocol state machine use
};
extern int nni_socket_create(nni_socket **, uint16_t);
diff --git a/src/platform/posix/posix_clock.c b/src/platform/posix/posix_clock.c
index 48a1e09b..c4206ebe 100644
--- a/src/platform/posix/posix_clock.c
+++ b/src/platform/posix/posix_clock.c
@@ -19,11 +19,11 @@
#ifndef NNG_USE_GETTIMEOFDAY
// Use POSIX realtime stuff
-uint64_t
+nni_time
nni_clock(void)
{
struct timespec ts;
- uint64_t usec;
+ nni_time usec;
if (clock_gettime(NNG_USE_CLOCKID, &ts) != 0) {
/* This should never ever occur. */
@@ -38,7 +38,7 @@ nni_clock(void)
void
-nni_usleep(uint64_t usec)
+nni_usleep(nni_duration usec)
{
struct timespec ts;
@@ -67,10 +67,10 @@ nni_usleep(uint64_t usec)
#include <sys/time.h>
#include <poll.h>
-uint64_t
+nni_time
nni_clock(void)
{
- uint64_t usec;
+ nni_time usec;
struct timeval tv;
@@ -86,7 +86,7 @@ nni_clock(void)
void
-nni_usleep(uint64_t usec)
+nni_usleep(nni_duration usec)
{
// So probably there is no nanosleep. We could in theory use
// pthread condition variables, but that means doing memory
@@ -97,8 +97,8 @@ nni_usleep(uint64_t usec)
// So we can use poll() instead, which is rather coarse, but
// pretty much guaranteed to work.
struct pollfd pfd;
- uint64_t now;
- uint64_t expire;
+ nni_time now;
+ nni_time expire;
// Possibly we could pass NULL instead of pfd, but passing a valid
// pointer ensures that if the system dereferences the pointer it
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 4f93b101..0fa15b43 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -42,4 +42,4 @@ struct nni_cond {
};
#endif
-#endif // PLATFORM_POSIX_IMPL_H \ No newline at end of file
+#endif // PLATFORM_POSIX_IMPL_H
diff --git a/src/platform/posix/posix_synch.c b/src/platform/posix/posix_synch.c
index c0a2721c..3d91ee3d 100644
--- a/src/platform/posix/posix_synch.c
+++ b/src/platform/posix/posix_synch.c
@@ -42,54 +42,6 @@ nni_mutex_fini(nni_mutex *mp)
}
-// XXX: REMOVE THIS FUNCTION
-int
-nni_mutex_create(nni_mutex_t *mp)
-{
- struct nni_mutex *m;
- pthread_mutexattr_t attr;
- int rv;
-
- if ((m = nni_alloc(sizeof (*m))) == NULL) {
- return (NNG_ENOMEM);
- }
-
- // We ask for more error checking...
- if (pthread_mutexattr_init(&attr) != 0) {
- nni_free(m, sizeof (*m));
- return (NNG_ENOMEM);
- }
-
- if (pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK) != 0) {
- nni_panic("pthread_mutexattr_settype failed");
- }
-
- rv = pthread_mutex_init(&m->mx, &attr);
-
- if (pthread_mutexattr_destroy(&attr) != 0) {
- nni_panic("pthread_mutexattr_destroy failed");
- }
-
- if (rv != 0) {
- nni_free(m, sizeof (*m));
- return (NNG_ENOMEM);
- }
- *mp = m;
- return (0);
-}
-
-
-// XXX: REMOVE THIS FUNCTION
-void
-nni_mutex_destroy(nni_mutex_t m)
-{
- if (pthread_mutex_destroy(&m->mx) != 0) {
- nni_panic("pthread_mutex_destroy failed");
- }
- nni_free(m, sizeof (*m));
-}
-
-
void
nni_mutex_enter(nni_mutex *m)
{
@@ -140,55 +92,6 @@ nni_cond_fini(nni_cond *c)
}
-// XXXX: REMOVE THIS FUNCTION
-static int
-nni_cond_attr(pthread_condattr_t **attrpp)
-{
- *attrpp = NULL;
- return (0);
-}
-
-
-// XXX: REMOVE THIS FUNCTION
-int
-nni_cond_create(nni_cond **cvp, nni_mutex *mx)
-{
- /*
- * By preference, we use a CLOCK_MONOTONIC version of condition
- * variables, which insulates us from changes to the system time.
- */
- struct nni_cond *c;
- pthread_condattr_t *attrp;
- int rv;
-
- if ((rv = nni_cond_attr(&attrp)) != 0) {
- return (rv);
- }
- if ((c = nni_alloc(sizeof (*c))) == NULL) {
- return (NNG_ENOMEM);
- }
- c->mx = &mx->mx;
- if (pthread_cond_init(&c->cv, attrp) != 0) {
- /* In theory could be EAGAIN, but handle like ENOMEM */
- nni_free(c, sizeof (*c));
- return (NNG_ENOMEM);
- }
- *cvp = c;
- return (0);
-}
-
-
-// XXX: REMOVE THIS FUNCTION
-void
-nni_cond_destroy(nni_cond *c)
-{
- if (pthread_cond_destroy(&c->cv) != 0) {
- nni_panic("pthread_cond_destroy failed");
- }
- nni_free(c, sizeof (*c));
-}
-
-
void
nni_cond_signal(nni_cond *c)
{
@@ -237,16 +140,4 @@ nni_cond_waituntil(nni_cond *c, uint64_t usec)
return (0);
}
-
-int
-nni_cond_timedwait(nni_cond *c, int usec)
-{
- if (usec < 0) {
- nni_cond_wait(c);
- return (0);
- }
- return (nni_cond_waituntil(c, ((uint64_t) usec) + nni_clock()));
-}
-
-
#endif
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index 79f69762..80f3b96f 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -23,14 +23,14 @@ struct nni_thread {
void (*func)(void *);
};
-static pthread_mutex_t plat_lock = PTHREAD_MUTEX_INITIALIZER;
-static int plat_init = 0;
-static int plat_fork = 0;
+static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER;
+static int nni_plat_inited = 0;
+static int nni_plat_forked = 0;
static void *
nni_thrfunc(void *arg)
{
- nni_thread_t thr = arg;
+ nni_thread *thr = arg;
thr->func(thr->arg);
return (NULL);
@@ -38,9 +38,9 @@ nni_thrfunc(void *arg)
int
-nni_thread_create(nni_thread_t *tp, void (*fn)(void *), void *arg)
+nni_thread_create(nni_thread **tp, void (*fn)(void *), void *arg)
{
- nni_thread_t thr;
+ nni_thread *thr;
int rv;
if ((thr = nni_alloc(sizeof (*thr))) == NULL) {
@@ -59,7 +59,7 @@ nni_thread_create(nni_thread_t *tp, void (*fn)(void *), void *arg)
void
-nni_thread_reap(nni_thread_t thr)
+nni_thread_reap(nni_thread * thr)
{
int rv;
@@ -73,7 +73,7 @@ nni_thread_reap(nni_thread_t thr)
void
nni_atfork_child(void)
{
- plat_fork = 1;
+ nni_plat_forked = 1;
}
@@ -85,48 +85,48 @@ nni_plat_init(int (*helper)(void))
{
int rv;
- if (plat_fork) {
+ if (nni_plat_forked) {
nni_panic("nng is fork-reentrant safe");
}
- if (plat_init) {
+ if (nni_plat_inited) {
return (0); // fast path
}
- pthread_mutex_lock(&plat_lock);
- if (plat_init) { // check again under the lock to be sure
- pthread_mutex_unlock(&plat_lock);
+ pthread_mutex_lock(&nni_plat_lock);
+ if (nni_plat_inited) { // check again under the lock to be sure
+ pthread_mutex_unlock(&nni_plat_lock);
return (0);
}
if (pthread_condattr_init(&nni_condattr) != 0) {
- pthread_mutex_unlock(&plat_lock);
+ pthread_mutex_unlock(&nni_plat_lock);
return (NNG_ENOMEM);
}
#if !defined(NNG_USE_GETTIMEOFDAY) && NNG_USE_CLOCKID != CLOCK_REALTIME
if (pthread_condattr_setclock(&nni_condattr, NNG_USE_CLOCKID) != 0) {
- pthread_mutex_unlock(&plat_lock);
+ pthread_mutex_unlock(&nni_plat_lock);
return (NNG_ENOMEM);
}
#endif
if (pthread_mutexattr_init(&nni_mutexattr) != 0) {
- pthread_mutex_unlock(&plat_lock);
+ pthread_mutex_unlock(&nni_plat_lock);
return (NNG_ENOMEM);
}
if (pthread_mutexattr_settype(&nni_mutexattr,
PTHREAD_MUTEX_ERRORCHECK) != 0) {
- pthread_mutex_unlock(&plat_lock);
+ pthread_mutex_unlock(&nni_plat_lock);
return (NNG_ENOMEM);
}
if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) {
- pthread_mutex_unlock(&plat_lock);
+ pthread_mutex_unlock(&nni_plat_lock);
return (NNG_ENOMEM);
}
if ((rv = helper()) == 0) {
- plat_init = 1;
+ nni_plat_inited = 1;
}
- pthread_mutex_unlock(&plat_lock);
+ pthread_mutex_unlock(&nni_plat_lock);
return (rv);
}
@@ -135,13 +135,13 @@ nni_plat_init(int (*helper)(void))
void
nni_plat_fini(void)
{
- pthread_mutex_lock(&plat_lock);
- if (plat_init) {
+ pthread_mutex_lock(&nni_plat_lock);
+ if (nni_plat_inited) {
pthread_mutexattr_destroy(&nni_mutexattr);
pthread_condattr_destroy(&nni_condattr);
- plat_init = 0;
+ nni_plat_inited = 0;
}
- pthread_mutex_unlock(&plat_lock);
+ pthread_mutex_unlock(&nni_plat_lock);
}
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index 9523ca84..364a72fa 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -33,8 +33,8 @@ typedef struct nni_pair_pipe {
nni_pipe * pipe;
nni_pair_sock * pair;
int good;
- nni_thread_t sthr;
- nni_thread_t rthr;
+ nni_thread * sthr;
+ nni_thread * rthr;
int sigclose;
} nni_pair_pipe;
@@ -162,8 +162,8 @@ nni_pair_sender(void *arg)
{
nni_pair_pipe *pp = arg;
nni_pair_sock *pair = pp->pair;
- nni_msgqueue_t uwq = pair->uwq;
- nni_msgqueue_t urq = pair->urq;
+ nni_msgqueue *uwq = pair->uwq;
+ nni_msgqueue *urq = pair->urq;
nni_pipe *pipe = pp->pipe;
nni_msg *msg;
int rv;
@@ -177,7 +177,7 @@ nni_pair_sender(void *arg)
for (;;) {
- rv = nni_msgqueue_get_sig(uwq, &msg, -1, &pp->sigclose);
+ rv = nni_msgqueue_get_sig(uwq, &msg, &pp->sigclose);
if (rv != 0) {
break;
}
@@ -198,8 +198,8 @@ nni_pair_receiver(void *arg)
{
nni_pair_pipe *pp = arg;
nni_pair_sock *pair = pp->pair;
- nni_msgqueue_t urq = pair->urq;
- nni_msgqueue_t uwq = pair->uwq;
+ nni_msgqueue *urq = pair->urq;
+ nni_msgqueue *uwq = pair->uwq;
nni_pipe *pipe = pp->pipe;
nni_msg *msg;
int rv;
@@ -216,7 +216,7 @@ nni_pair_receiver(void *arg)
if (rv != 0) {
break;
}
- rv = nni_msgqueue_put_sig(urq, msg, -1, &pp->sigclose);
+ rv = nni_msgqueue_put_sig(urq, msg, &pp->sigclose);
if (rv != 0) {
nni_msg_free(msg);
break;
@@ -227,6 +227,7 @@ nni_pair_receiver(void *arg)
nni_socket_rem_pipe(pair->sock, pipe);
}
+
// TODO: probably we could replace these with NULL, since we have no
// protocol specific options?
static int
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index cb79d65a..23046933 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -142,7 +142,7 @@ nni_inproc_pipe_send(void *arg, nni_msg *msg)
// TODO: look at the message expiration and use that to set up
// the timeout. (And if it expired already, throw it away.)
- return (nni_msgqueue_put(pipe->wq, msg, -1));
+ return (nni_msgqueue_put(pipe->wq, msg));
}
@@ -151,7 +151,7 @@ nni_inproc_pipe_recv(void *arg, nni_msg **msgp)
{
nni_inproc_pipe *pipe = arg;
- return (nni_msgqueue_get(pipe->rq, msgp, -1));
+ return (nni_msgqueue_get(pipe->rq, msgp));
}