diff options
| -rw-r--r-- | src/core/defs.h | 10 | ||||
| -rw-r--r-- | src/core/endpt.h | 4 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 82 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 42 | ||||
| -rw-r--r-- | src/core/platform.h | 43 | ||||
| -rw-r--r-- | src/core/socket.c | 115 | ||||
| -rw-r--r-- | src/core/socket.h | 23 | ||||
| -rw-r--r-- | src/platform/posix/posix_clock.c | 16 | ||||
| -rw-r--r-- | src/platform/posix/posix_impl.h | 2 | ||||
| -rw-r--r-- | src/platform/posix/posix_synch.c | 109 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.c | 48 | ||||
| -rw-r--r-- | src/protocol/pair/pair.c | 17 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 4 |
13 files changed, 249 insertions, 266 deletions
diff --git a/src/core/defs.h b/src/core/defs.h index b294e0f3..a37377f7 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -10,6 +10,8 @@ #ifndef CORE_DEFS_H #define CORE_DEFS_H +#include <stdint.h> + // C compilers may get unhappy when named arguments are not used. While // there are things like __attribute__((unused)) which are arguably // superior, support for such are not universal. @@ -32,4 +34,12 @@ typedef struct nni_pipe_ops nni_pipe_ops; typedef struct nni_protocol nni_protocol; +typedef int nni_signal; // Used as a turnstile/wakeup channel. +typedef uint64_t nni_time; // An absolute time in microseconds. +typedef int nni_duration; // A relative time in microseconds. + +// Some default timing things. +#define NNI_TIME_NEVER ((nni_time) 0xffffffffull) +#define NNI_TIME_ZERO ((nni_time) 0) + #endif // CORE_DEFS_H diff --git a/src/core/endpt.h b/src/core/endpt.h index 063dcb7b..0fa69678 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -24,8 +24,8 @@ struct nng_endpt { nni_thread_t ep_dialer; nni_thread_t ep_listener; int ep_close; - nni_mutex_t ep_mx; - nni_cond_t ep_cv; + nni_mutex ep_mx; + nni_cond ep_cv; }; int nni_endpt_create(nni_endpt **, nni_socket *, const char *); diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index cbdb94c6..7d892b88 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -112,22 +112,20 @@ nni_msgqueue_signal(nni_msgqueue *mq, int *signal) int -nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, int tmout, int *signal) +nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, + nni_time expire, nni_signal *signal) { - uint64_t expire; - - if (tmout >= 0) { - expire = nni_clock() + tmout; - } else { - expire = 0xffffffffffffffffull; - } - nni_mutex_enter(&mq->mq_lock); - while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap) && (!*signal)) { + while ((!mq->mq_closed) && + (mq->mq_len == mq->mq_cap) && + (!*signal)) { if (expire <= nni_clock()) { nni_mutex_exit(&mq->mq_lock); - return (tmout == 0 ? NNG_EAGAIN : NNG_ETIMEDOUT); + if (expire == NNI_TIME_ZERO) { + return (NNG_EAGAIN); + } + return (NNG_ETIMEDOUT); } (void) nni_cond_waituntil(&mq->mq_writeable, expire); } @@ -160,23 +158,19 @@ nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, int tmout, int *signal) } -int -nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, int tmout, int *signal) +static int +nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, + nni_time expire, nni_signal *signal) { - uint64_t expire; - - if (tmout >= 0) { - expire = nni_clock() + tmout; - } else { - expire = 0xffffffffffffffffull; - } - nni_mutex_enter(&mq->mq_lock); while ((!mq->mq_closed) && (mq->mq_len == 0) && (*signal == 0)) { if (expire <= nni_clock()) { nni_mutex_exit(&mq->mq_lock); - return (tmout == 0 ? NNG_EAGAIN : NNG_ETIMEDOUT); + if (expire == NNI_TIME_ZERO) { + return (NNG_EAGAIN); + } + return (NNG_ETIMEDOUT); } (void) nni_cond_waituntil(&mq->mq_readable, expire); } @@ -212,20 +206,52 @@ nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, int tmout, int *signal) int -nni_msgqueue_get(nni_msgqueue *mq, nni_msg **msgp, int tmout) +nni_msgqueue_get(nni_msgqueue *mq, nni_msg **msgp) { - int nosig = 0; + nni_signal nosig = 0; + + return (nni_msgqueue_get_impl(mq, msgp, NNI_TIME_NEVER, &nosig)); +} + + +int +nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, nni_signal *signal) +{ + return (nni_msgqueue_get_impl(mq, msgp, NNI_TIME_NEVER, signal)); +} + - return (nni_msgqueue_get_sig(mq, msgp, tmout, &nosig)); +int +nni_msgqueue_get_until(nni_msgqueue *mq, nni_msg **msgp, nni_time expire) +{ + nni_signal nosig = 0; + + return (nni_msgqueue_get_impl(mq, msgp, expire, &nosig)); +} + + +int +nni_msgqueue_put(nni_msgqueue *mq, nni_msg *msg) +{ + nni_signal nosig = 0; + + return (nni_msgqueue_put_impl(mq, msg, NNI_TIME_NEVER, &nosig)); +} + + +int +nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, nni_signal *signal) +{ + return (nni_msgqueue_put_impl(mq, msg, NNI_TIME_NEVER, signal)); } int -nni_msgqueue_put(nni_msgqueue *mq, nni_msg *msg, int tmout) +nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire) { - int nosig = 0; + nni_signal nosig = 0; - return (nni_msgqueue_put_sig(mq, msg, tmout, &nosig)); + return (nni_msgqueue_put_impl(mq, msg, expire, &nosig)); } diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index e44e203a..d97482af 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -27,37 +27,47 @@ extern int nni_msgqueue_create(nni_msgqueue **, int); // messages that may be in the queue. extern void nni_msgqueue_destroy(nni_msgqueue *); -// nni_msgqueue_put attempts to put a message to the queue. It will wait -// for the timeout (us), if the value is positive. If the value is negative -// then it will wait forever. If the value is zero, it will just check, and -// return immediately whether a message can be put or not. Valid returns are -// NNG_ECLOSED if the queue is closed or NNG_ETIMEDOUT if the message cannot -// be placed after a time, or NNG_EAGAIN if the operation cannot succeed -// immediately and a zero timeout is specified. Note that timeout granularity -// may be limited -- for example Windows systems have a millisecond resolution -// timeout capability. -extern int nni_msgqueue_put(nni_msgqueue *, nni_msg *, int); +// nni_msgqueue_put puts the message to the queue. It blocks until it +// was able to do so, or the queue is closed, returning either 0 on +// success or NNG_ECLOSED if the queue was closed. If NNG_ECLOSED is +// returned, the caller is responsible for freeing the message with +// nni_msg_free(), otherwise the message is "owned" by the queue, and +// the caller is not permitted to access it further. +extern int nni_msgqueue_put(nni_msgqueue *, nni_msg *); -// nni_msgqueue_get gets the message from the queue, using a timeout just -// like nni_msgqueue_put. -extern int nni_msgqueue_get(nni_msgqueue *, nni_msg **, int); +// nni_msgqueue_get gets the message from the queue. It blocks until a +// message is available, or the queue is closed, returning either 0 on +// success or NNG_ECLOSED if the queue was closed. If a message is +// provided, the caller is assumes ownership of the message and must +// call nni_msg_free() when it is finished with it. +extern int nni_msgqueue_get(nni_msgqueue *, nni_msg **); + +// nni_msgqueue_put_until is like nni_msgqueue_put, except that if the +// system clock reaches the specified time without being able to place +// the message in the queue, it will return NNG_ETIMEDOUT. +extern int nni_msgqueue_put_until(nni_msgqueue *, nni_msg *, nni_time); + +// nni_msgqueue_get_until is like nni_msgqueue_put, except that if the +// system clock reaches the specified time without being able to retrieve +// a message from the queue, it will return NNG_ETIMEDOUT. +extern int nni_msgqueue_get_until(nni_msgqueue *, nni_msg **, nni_time); // nni_msgqueue_put_sig is an enhanced version of nni_msgqueue_put, but it // can be interrupted by nni_msgqueue_signal using the same final pointer, // which can be thought of as a turnstile. If interrupted it returns EINTR. // The turnstile should be initialized to zero. -extern int nni_msgqueue_put_sig(nni_msgqueue *, nni_msg *, int, int *); +extern int nni_msgqueue_put_sig(nni_msgqueue *, nni_msg *, nni_signal *); // nni_msgqueue_get_sig is an enhanced version of nni_msgqueue_get_t, but it // can be interrupted by nni_msgqueue_signal using the same final pointer, // which can be thought of as a turnstile. If interrupted it returns EINTR. // The turnstile should be initialized to zero. -extern int nni_msgqueue_get_sig(nni_msgqueue *, nni_msg **, int, int *); +extern int nni_msgqueue_get_sig(nni_msgqueue *, nni_msg **, nni_signal *); // nni_msgqueue_signal delivers a signal / interrupt to waiters blocked in // the msgqueue, if they have registered an interest in the same turnstile. // It modifies the turnstile's value under the lock to a non-zero value. -extern void nni_msgqueue_signal(nni_msgqueue *, int *); +extern void nni_msgqueue_signal(nni_msgqueue *, nni_signal *); // nni_msgqueue_close closes the queue. After this all operates on the // message queue will return NNG_ECLOSED. Messages inside the queue diff --git a/src/core/platform.h b/src/core/platform.h index 6722e666..58e4cd36 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -62,13 +62,6 @@ extern void nni_free(void *, size_t); typedef struct nni_mutex nni_mutex; typedef struct nni_cond nni_cond; -// XXX: REMOVE THESE -typedef struct nni_mutex * nni_mutex_t; -typedef struct nni_cond * nni_cond_t; -extern int nni_mutex_create(nni_mutex_t *); -extern void nni_mutex_destroy(nni_mutex_t); -extern int nni_cond_create(nni_cond_t *, nni_mutex_t); -extern void nni_cond_destroy(nni_cond_t); // Mutex handling. @@ -104,30 +97,21 @@ extern void nni_cond_fini(nni_cond *); // nni_cond_broadcast wakes all waiters on the condition. This should be // called with the lock held. -extern void nni_cond_broadcast(nni_cond_t); +extern void nni_cond_broadcast(nni_cond *); // nni_cond_signal wakes a signal waiter. -extern void nni_cond_signal(nni_cond_t); +extern void nni_cond_signal(nni_cond *); // nni_cond_wait waits for a wake up on the condition variable. The // associated lock is atomically released and reacquired upon wake up. // Callers can be spuriously woken. The associated lock must be held. -extern void nni_cond_wait(nni_cond_t); - -// nni_cond_timedwait waits for a wakeup on the condition variable, just -// as with nni_condwait, but it will also wake after the given number of -// microseconds has passed. (This is a relative timed wait.) Early -// wakeups are permitted, and the caller must take care to double check any -// conditions. The return value is 0 on success, or an error code, which -// can be NNG_ETIMEDOUT. Note that it is permissible to wait for longer -// than the timeout based on the resolution of your system clock. -extern int nni_cond_timedwait(nni_cond_t, int); +extern void nni_cond_wait(nni_cond *); // nni_cond_waituntil waits for a wakeup on the condition variable, or // until the system time reaches the specified absolute time. (It is an // absolute form of nni_cond_timedwait.) Early wakeups are possible, so // check the condition. It will return either NNG_ETIMEDOUT, or 0. -extern int nni_cond_waituntil(nni_cond_t, uint64_t); +extern int nni_cond_waituntil(nni_cond *, nni_time); typedef struct nni_thread * nni_thread_t; typedef struct nni_thread nni_thread; @@ -142,12 +126,15 @@ extern int nni_thread_create(nni_thread **, void (*fn)(void *), void *); extern void nni_thread_reap(nni_thread *); // nn_clock returns a number of microseconds since some arbitrary time -// in the past. The values returned by nni_clock may be used with -// nni_cond_timedwait. -extern uint64_t nni_clock(void); +// in the past. The values returned by nni_clock must use the same base +// as the times used in nni_cond_waituntil. The nni_clock() must return +// values > 0, and must return values smaller than 2^63. (We could relax +// this last constraint, but there is no reason to, and leaves us the option +// of using negative values for other purposes in the future.) +extern nni_time nni_clock(void); // nni_usleep sleeps for the specified number of microseconds (at least). -extern void nni_usleep(uint64_t); +extern void nni_usleep(nni_duration); // nni_platform_init is called to allow the platform the chance to // do any necessary initialization. This routine MUST be idempotent, @@ -158,8 +145,9 @@ extern void nni_usleep(uint64_t); // nni_plat_fini has been called. // // The function argument should be called if the platform has not initialized -// (i.e. exactly once please), and its result passed back to the caller. -// +// (i.e. exactly once), and its result passed back to the caller. If it +// does not return 0 (success), then it may be called again to try to +// initialize the platform again at a later date. extern int nni_plat_init(int (*)(void)); // nni_platform_fini is called to clean up resources. It is intended to @@ -167,7 +155,8 @@ extern int nni_plat_init(int (*)(void)); // will be called until nni_platform_init is called. extern void nni_plat_fini(void); -// Actual platforms we support. +// Actual platforms we support. This is included up front so that we can +// get the specific types that are supplied by the platform. #if defined(PLATFORM_POSIX) #include "platform/posix/posix_impl.h" #else diff --git a/src/core/socket.c b/src/core/socket.c index 0272f454..c500db31 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -32,7 +32,7 @@ int nni_socket_create(nni_socket **sockp, uint16_t proto) { nni_socket *sock; - struct nni_protocol *ops; + nni_protocol *ops; int rv; if ((ops = nni_protocol_find(proto)) == NULL) { @@ -43,12 +43,12 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) } sock->s_ops = *ops; - if ((rv = nni_mutex_create(&sock->s_mx)) != 0) { + if ((rv = nni_mutex_init(&sock->s_mx)) != 0) { nni_free(sock, sizeof (*sock)); return (rv); } - if ((rv = nni_cond_create(&sock->s_cv, sock->s_mx)) != 0) { - nni_mutex_destroy(sock->s_mx); + if ((rv = nni_cond_init(&sock->s_cv, &sock->s_mx)) != 0) { + nni_mutex_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } @@ -57,8 +57,8 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) // TODO: NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node); if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) { - nni_cond_destroy(sock->s_cv); - nni_mutex_destroy(sock->s_mx); + nni_cond_fini(&sock->s_cv); + nni_mutex_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } @@ -73,9 +73,10 @@ nni_socket_close(nni_socket *sock) { nni_pipe *pipe; nni_endpt *ep; + uint64_t linger; - nni_mutex_enter(sock->s_mx); + nni_mutex_enter(&sock->s_mx); // Mark us closing, so no more EPs or changes can occur. sock->s_closing = 1; @@ -90,9 +91,8 @@ nni_socket_close(nni_socket *sock) or nni_ep_shutdown(ep); #endif - break; /* REMOVE ME */ } - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); // XXX: TODO. This is a place where we should drain the write side // msgqueue, effectively getting a linger on the socket. The @@ -110,11 +110,10 @@ nni_socket_close(nni_socket *sock) // Now we should attempt to wait for the list of pipes to drop to // zero -- indicating that the protocol has shut things down // cleanly, voluntarily. (I.e. it finished its drain.) - nni_mutex_enter(sock->s_mx); + linger = nni_clock(); + nni_mutex_enter(&sock->s_mx); while (nni_list_first(&sock->s_pipes) != NULL) { - // rv = nn_cond_timedwait(sock->s_cv, sock->s_linger); - int rv = NNG_ETIMEDOUT; - if (rv == NNG_ETIMEDOUT) { + if (nni_cond_waituntil(&sock->s_cv, linger) == NNG_ETIMEDOUT) { break; } } @@ -130,35 +129,50 @@ nni_socket_close(nni_socket *sock) // quickly too! If this blocks for any non-trivial amount of time // here, it indicates a protocol implementation bug. while (nni_list_first(&sock->s_pipes) != NULL) { - nni_cond_wait(sock->s_cv); + nni_cond_wait(&sock->s_cv); } // Wait to make sure endpoint listeners have shutdown and exited // as well. They should have done so *long* ago. while (nni_list_first(&sock->s_eps) != NULL) { - nni_cond_wait(sock->s_cv); + nni_cond_wait(&sock->s_cv); } + nni_mutex_exit(&sock->s_mx); + return (0); } int -nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, int tmout) +nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_duration tmout) { int rv; int besteffort; + nni_time expire; + + if (tmout > 0) { + expire = nni_clock() + tmout; + } else if (tmout < 0) { + expire = NNI_TIME_NEVER; + } else { + expire = NNI_TIME_ZERO; + } // Senderr is typically set by protocols when the state machine // indicates that it is no longer valid to send a message. E.g. // a REP socket with no REQ pending. - nni_mutex_enter(sock->s_mx); + nni_mutex_enter(&sock->s_mx); + if (sock->s_closing) { + nni_mutex_exit(&sock->s_mx); + return (NNG_ECLOSED); + } if ((rv = sock->s_senderr) != 0) { - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); return (rv); } besteffort = sock->s_besteffort; - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); if (sock->s_ops.proto_send_filter != NULL) { msg = sock->s_ops.proto_send_filter(sock->s_data, msg); @@ -170,9 +184,9 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, int tmout) if (besteffort) { // BestEffort mode -- if we cannot handle the message due to // backpressure, we just throw it away, and don't complain. - tmout = 0; + expire = NNI_TIME_ZERO; } - rv = nni_msgqueue_put(sock->s_uwq, msg, tmout); + rv = nni_msgqueue_put_until(sock->s_uwq, msg, expire); if (besteffort && (rv == NNG_EAGAIN)) { // Pretend this worked... it didn't, but pretend. nni_msg_free(msg); @@ -181,6 +195,47 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, int tmout) return (rv); } +int +nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_duration tmout) +{ + int rv; + nni_time expire; + nni_msg *msg; + + if (tmout > 0) { + expire = nni_clock() + tmout; + } else if (tmout < 0) { + expire = NNI_TIME_NEVER; + } else { + expire = NNI_TIME_ZERO; + } + + nni_mutex_enter(&sock->s_mx); + if (sock->s_closing) { + nni_mutex_exit(&sock->s_mx); + return (NNG_ECLOSED); + } + if ((rv = sock->s_recverr) != 0) { + nni_mutex_exit(&sock->s_mx); + return (rv); + } + nni_mutex_exit(&sock->s_mx); + + for (;;) { + rv = nni_msgqueue_get_until(sock->s_urq, &msg, expire); + if (rv != 0) { + return (rv); + } + msg = sock->s_ops.proto_recv_filter(sock->s_data, msg); + if (msg != NULL) { + break; + } + // Protocol dropped the message; try again. + } + + *msgp = msg; + return (0); +} // nni_socket_protocol returns the socket's 16-bit protocol number. uint16_t @@ -195,9 +250,9 @@ nni_socket_proto(nni_socket *sock) void nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe) { - nni_mutex_enter(sock->s_mx); + nni_mutex_enter(&sock->s_mx); if (pipe->p_sock != sock) { - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); } // Remove the pipe from the protocol. Protocols may @@ -215,9 +270,9 @@ nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe) // If we're closing, wake the socket if we finished draining. if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) { - nni_cond_broadcast(sock->s_cv); + nni_cond_broadcast(&sock->s_cv); } - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); } @@ -226,18 +281,18 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe) { int rv; - nni_mutex_enter(sock->s_mx); + nni_mutex_enter(&sock->s_mx); if (sock->s_closing) { - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); return (NNG_ECLOSED); } if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) { - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); return (rv); } nni_list_append(&sock->s_pipes, pipe); pipe->p_sock = sock; - /* XXX: Publish event */ - nni_mutex_exit(sock->s_mx); + // XXX: Publish event + nni_mutex_exit(&sock->s_mx); return (0); } diff --git a/src/core/socket.h b/src/core/socket.h index 55944c92..ec4acfdb 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -14,24 +14,25 @@ // OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR // TRANSPORTS. struct nng_socket { - nni_mutex_t s_mx; - nni_cond_t s_cv; + nni_mutex s_mx; + nni_cond s_cv; - nni_msgqueue_t s_uwq; // Upper write queue - nni_msgqueue_t s_urq; // Upper read queue + nni_msgqueue * s_uwq; // Upper write queue + nni_msgqueue * s_urq; // Upper read queue - struct nni_protocol s_ops; + nni_protocol s_ops; - void * s_data; // Protocol private + void * s_data; // Protocol private // XXX: options - nni_list_t s_eps; - nni_list_t s_pipes; + nni_list_t s_eps; + nni_list_t s_pipes; - int s_closing; // Socket is closing - int s_besteffort; // Best effort mode delivery - int s_senderr; // Protocol state machine use + int s_closing; // Socket is closing + int s_besteffort; // Best effort mode delivery + int s_senderr; // Protocol state machine use + int s_recverr; // Protocol state machine use }; extern int nni_socket_create(nni_socket **, uint16_t); diff --git a/src/platform/posix/posix_clock.c b/src/platform/posix/posix_clock.c index 48a1e09b..c4206ebe 100644 --- a/src/platform/posix/posix_clock.c +++ b/src/platform/posix/posix_clock.c @@ -19,11 +19,11 @@ #ifndef NNG_USE_GETTIMEOFDAY // Use POSIX realtime stuff -uint64_t +nni_time nni_clock(void) { struct timespec ts; - uint64_t usec; + nni_time usec; if (clock_gettime(NNG_USE_CLOCKID, &ts) != 0) { /* This should never ever occur. */ @@ -38,7 +38,7 @@ nni_clock(void) void -nni_usleep(uint64_t usec) +nni_usleep(nni_duration usec) { struct timespec ts; @@ -67,10 +67,10 @@ nni_usleep(uint64_t usec) #include <sys/time.h> #include <poll.h> -uint64_t +nni_time nni_clock(void) { - uint64_t usec; + nni_time usec; struct timeval tv; @@ -86,7 +86,7 @@ nni_clock(void) void -nni_usleep(uint64_t usec) +nni_usleep(nni_duration usec) { // So probably there is no nanosleep. We could in theory use // pthread condition variables, but that means doing memory @@ -97,8 +97,8 @@ nni_usleep(uint64_t usec) // So we can use poll() instead, which is rather coarse, but // pretty much guaranteed to work. struct pollfd pfd; - uint64_t now; - uint64_t expire; + nni_time now; + nni_time expire; // Possibly we could pass NULL instead of pfd, but passing a valid // pointer ensures that if the system dereferences the pointer it diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 4f93b101..0fa15b43 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -42,4 +42,4 @@ struct nni_cond { }; #endif -#endif // PLATFORM_POSIX_IMPL_H
\ No newline at end of file +#endif // PLATFORM_POSIX_IMPL_H diff --git a/src/platform/posix/posix_synch.c b/src/platform/posix/posix_synch.c index c0a2721c..3d91ee3d 100644 --- a/src/platform/posix/posix_synch.c +++ b/src/platform/posix/posix_synch.c @@ -42,54 +42,6 @@ nni_mutex_fini(nni_mutex *mp) } -// XXX: REMOVE THIS FUNCTION -int -nni_mutex_create(nni_mutex_t *mp) -{ - struct nni_mutex *m; - pthread_mutexattr_t attr; - int rv; - - if ((m = nni_alloc(sizeof (*m))) == NULL) { - return (NNG_ENOMEM); - } - - // We ask for more error checking... - if (pthread_mutexattr_init(&attr) != 0) { - nni_free(m, sizeof (*m)); - return (NNG_ENOMEM); - } - - if (pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK) != 0) { - nni_panic("pthread_mutexattr_settype failed"); - } - - rv = pthread_mutex_init(&m->mx, &attr); - - if (pthread_mutexattr_destroy(&attr) != 0) { - nni_panic("pthread_mutexattr_destroy failed"); - } - - if (rv != 0) { - nni_free(m, sizeof (*m)); - return (NNG_ENOMEM); - } - *mp = m; - return (0); -} - - -// XXX: REMOVE THIS FUNCTION -void -nni_mutex_destroy(nni_mutex_t m) -{ - if (pthread_mutex_destroy(&m->mx) != 0) { - nni_panic("pthread_mutex_destroy failed"); - } - nni_free(m, sizeof (*m)); -} - - void nni_mutex_enter(nni_mutex *m) { @@ -140,55 +92,6 @@ nni_cond_fini(nni_cond *c) } -// XXXX: REMOVE THIS FUNCTION -static int -nni_cond_attr(pthread_condattr_t **attrpp) -{ - *attrpp = NULL; - return (0); -} - - -// XXX: REMOVE THIS FUNCTION -int -nni_cond_create(nni_cond **cvp, nni_mutex *mx) -{ - /* - * By preference, we use a CLOCK_MONOTONIC version of condition - * variables, which insulates us from changes to the system time. - */ - struct nni_cond *c; - pthread_condattr_t *attrp; - int rv; - - if ((rv = nni_cond_attr(&attrp)) != 0) { - return (rv); - } - if ((c = nni_alloc(sizeof (*c))) == NULL) { - return (NNG_ENOMEM); - } - c->mx = &mx->mx; - if (pthread_cond_init(&c->cv, attrp) != 0) { - /* In theory could be EAGAIN, but handle like ENOMEM */ - nni_free(c, sizeof (*c)); - return (NNG_ENOMEM); - } - *cvp = c; - return (0); -} - - -// XXX: REMOVE THIS FUNCTION -void -nni_cond_destroy(nni_cond *c) -{ - if (pthread_cond_destroy(&c->cv) != 0) { - nni_panic("pthread_cond_destroy failed"); - } - nni_free(c, sizeof (*c)); -} - - void nni_cond_signal(nni_cond *c) { @@ -237,16 +140,4 @@ nni_cond_waituntil(nni_cond *c, uint64_t usec) return (0); } - -int -nni_cond_timedwait(nni_cond *c, int usec) -{ - if (usec < 0) { - nni_cond_wait(c); - return (0); - } - return (nni_cond_waituntil(c, ((uint64_t) usec) + nni_clock())); -} - - #endif diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index 79f69762..80f3b96f 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -23,14 +23,14 @@ struct nni_thread { void (*func)(void *); }; -static pthread_mutex_t plat_lock = PTHREAD_MUTEX_INITIALIZER; -static int plat_init = 0; -static int plat_fork = 0; +static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER; +static int nni_plat_inited = 0; +static int nni_plat_forked = 0; static void * nni_thrfunc(void *arg) { - nni_thread_t thr = arg; + nni_thread *thr = arg; thr->func(thr->arg); return (NULL); @@ -38,9 +38,9 @@ nni_thrfunc(void *arg) int -nni_thread_create(nni_thread_t *tp, void (*fn)(void *), void *arg) +nni_thread_create(nni_thread **tp, void (*fn)(void *), void *arg) { - nni_thread_t thr; + nni_thread *thr; int rv; if ((thr = nni_alloc(sizeof (*thr))) == NULL) { @@ -59,7 +59,7 @@ nni_thread_create(nni_thread_t *tp, void (*fn)(void *), void *arg) void -nni_thread_reap(nni_thread_t thr) +nni_thread_reap(nni_thread * thr) { int rv; @@ -73,7 +73,7 @@ nni_thread_reap(nni_thread_t thr) void nni_atfork_child(void) { - plat_fork = 1; + nni_plat_forked = 1; } @@ -85,48 +85,48 @@ nni_plat_init(int (*helper)(void)) { int rv; - if (plat_fork) { + if (nni_plat_forked) { nni_panic("nng is fork-reentrant safe"); } - if (plat_init) { + if (nni_plat_inited) { return (0); // fast path } - pthread_mutex_lock(&plat_lock); - if (plat_init) { // check again under the lock to be sure - pthread_mutex_unlock(&plat_lock); + pthread_mutex_lock(&nni_plat_lock); + if (nni_plat_inited) { // check again under the lock to be sure + pthread_mutex_unlock(&nni_plat_lock); return (0); } if (pthread_condattr_init(&nni_condattr) != 0) { - pthread_mutex_unlock(&plat_lock); + pthread_mutex_unlock(&nni_plat_lock); return (NNG_ENOMEM); } #if !defined(NNG_USE_GETTIMEOFDAY) && NNG_USE_CLOCKID != CLOCK_REALTIME if (pthread_condattr_setclock(&nni_condattr, NNG_USE_CLOCKID) != 0) { - pthread_mutex_unlock(&plat_lock); + pthread_mutex_unlock(&nni_plat_lock); return (NNG_ENOMEM); } #endif if (pthread_mutexattr_init(&nni_mutexattr) != 0) { - pthread_mutex_unlock(&plat_lock); + pthread_mutex_unlock(&nni_plat_lock); return (NNG_ENOMEM); } if (pthread_mutexattr_settype(&nni_mutexattr, PTHREAD_MUTEX_ERRORCHECK) != 0) { - pthread_mutex_unlock(&plat_lock); + pthread_mutex_unlock(&nni_plat_lock); return (NNG_ENOMEM); } if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) { - pthread_mutex_unlock(&plat_lock); + pthread_mutex_unlock(&nni_plat_lock); return (NNG_ENOMEM); } if ((rv = helper()) == 0) { - plat_init = 1; + nni_plat_inited = 1; } - pthread_mutex_unlock(&plat_lock); + pthread_mutex_unlock(&nni_plat_lock); return (rv); } @@ -135,13 +135,13 @@ nni_plat_init(int (*helper)(void)) void nni_plat_fini(void) { - pthread_mutex_lock(&plat_lock); - if (plat_init) { + pthread_mutex_lock(&nni_plat_lock); + if (nni_plat_inited) { pthread_mutexattr_destroy(&nni_mutexattr); pthread_condattr_destroy(&nni_condattr); - plat_init = 0; + nni_plat_inited = 0; } - pthread_mutex_unlock(&plat_lock); + pthread_mutex_unlock(&nni_plat_lock); } diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index 9523ca84..364a72fa 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -33,8 +33,8 @@ typedef struct nni_pair_pipe { nni_pipe * pipe; nni_pair_sock * pair; int good; - nni_thread_t sthr; - nni_thread_t rthr; + nni_thread * sthr; + nni_thread * rthr; int sigclose; } nni_pair_pipe; @@ -162,8 +162,8 @@ nni_pair_sender(void *arg) { nni_pair_pipe *pp = arg; nni_pair_sock *pair = pp->pair; - nni_msgqueue_t uwq = pair->uwq; - nni_msgqueue_t urq = pair->urq; + nni_msgqueue *uwq = pair->uwq; + nni_msgqueue *urq = pair->urq; nni_pipe *pipe = pp->pipe; nni_msg *msg; int rv; @@ -177,7 +177,7 @@ nni_pair_sender(void *arg) for (;;) { - rv = nni_msgqueue_get_sig(uwq, &msg, -1, &pp->sigclose); + rv = nni_msgqueue_get_sig(uwq, &msg, &pp->sigclose); if (rv != 0) { break; } @@ -198,8 +198,8 @@ nni_pair_receiver(void *arg) { nni_pair_pipe *pp = arg; nni_pair_sock *pair = pp->pair; - nni_msgqueue_t urq = pair->urq; - nni_msgqueue_t uwq = pair->uwq; + nni_msgqueue *urq = pair->urq; + nni_msgqueue *uwq = pair->uwq; nni_pipe *pipe = pp->pipe; nni_msg *msg; int rv; @@ -216,7 +216,7 @@ nni_pair_receiver(void *arg) if (rv != 0) { break; } - rv = nni_msgqueue_put_sig(urq, msg, -1, &pp->sigclose); + rv = nni_msgqueue_put_sig(urq, msg, &pp->sigclose); if (rv != 0) { nni_msg_free(msg); break; @@ -227,6 +227,7 @@ nni_pair_receiver(void *arg) nni_socket_rem_pipe(pair->sock, pipe); } + // TODO: probably we could replace these with NULL, since we have no // protocol specific options? static int diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index cb79d65a..23046933 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -142,7 +142,7 @@ nni_inproc_pipe_send(void *arg, nni_msg *msg) // TODO: look at the message expiration and use that to set up // the timeout. (And if it expired already, throw it away.) - return (nni_msgqueue_put(pipe->wq, msg, -1)); + return (nni_msgqueue_put(pipe->wq, msg)); } @@ -151,7 +151,7 @@ nni_inproc_pipe_recv(void *arg, nni_msg **msgp) { nni_inproc_pipe *pipe = arg; - return (nni_msgqueue_get(pipe->rq, msgp, -1)); + return (nni_msgqueue_get(pipe->rq, msgp)); } |
