diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/defs.h | 10 | ||||
| -rw-r--r-- | src/core/endpt.h | 4 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 82 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 42 | ||||
| -rw-r--r-- | src/core/platform.h | 43 | ||||
| -rw-r--r-- | src/core/socket.c | 115 | ||||
| -rw-r--r-- | src/core/socket.h | 23 |
7 files changed, 205 insertions, 114 deletions
diff --git a/src/core/defs.h b/src/core/defs.h index b294e0f3..a37377f7 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -10,6 +10,8 @@ #ifndef CORE_DEFS_H #define CORE_DEFS_H +#include <stdint.h> + // C compilers may get unhappy when named arguments are not used. While // there are things like __attribute__((unused)) which are arguably // superior, support for such are not universal. @@ -32,4 +34,12 @@ typedef struct nni_pipe_ops nni_pipe_ops; typedef struct nni_protocol nni_protocol; +typedef int nni_signal; // Used as a turnstile/wakeup channel. +typedef uint64_t nni_time; // An absolute time in microseconds. +typedef int nni_duration; // A relative time in microseconds. + +// Some default timing things. +#define NNI_TIME_NEVER ((nni_time) 0xffffffffull) +#define NNI_TIME_ZERO ((nni_time) 0) + #endif // CORE_DEFS_H diff --git a/src/core/endpt.h b/src/core/endpt.h index 063dcb7b..0fa69678 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -24,8 +24,8 @@ struct nng_endpt { nni_thread_t ep_dialer; nni_thread_t ep_listener; int ep_close; - nni_mutex_t ep_mx; - nni_cond_t ep_cv; + nni_mutex ep_mx; + nni_cond ep_cv; }; int nni_endpt_create(nni_endpt **, nni_socket *, const char *); diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index cbdb94c6..7d892b88 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -112,22 +112,20 @@ nni_msgqueue_signal(nni_msgqueue *mq, int *signal) int -nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, int tmout, int *signal) +nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, + nni_time expire, nni_signal *signal) { - uint64_t expire; - - if (tmout >= 0) { - expire = nni_clock() + tmout; - } else { - expire = 0xffffffffffffffffull; - } - nni_mutex_enter(&mq->mq_lock); - while ((!mq->mq_closed) && (mq->mq_len == mq->mq_cap) && (!*signal)) { + while ((!mq->mq_closed) && + (mq->mq_len == mq->mq_cap) && + (!*signal)) { if (expire <= nni_clock()) { nni_mutex_exit(&mq->mq_lock); - return (tmout == 0 ? NNG_EAGAIN : NNG_ETIMEDOUT); + if (expire == NNI_TIME_ZERO) { + return (NNG_EAGAIN); + } + return (NNG_ETIMEDOUT); } (void) nni_cond_waituntil(&mq->mq_writeable, expire); } @@ -160,23 +158,19 @@ nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, int tmout, int *signal) } -int -nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, int tmout, int *signal) +static int +nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, + nni_time expire, nni_signal *signal) { - uint64_t expire; - - if (tmout >= 0) { - expire = nni_clock() + tmout; - } else { - expire = 0xffffffffffffffffull; - } - nni_mutex_enter(&mq->mq_lock); while ((!mq->mq_closed) && (mq->mq_len == 0) && (*signal == 0)) { if (expire <= nni_clock()) { nni_mutex_exit(&mq->mq_lock); - return (tmout == 0 ? NNG_EAGAIN : NNG_ETIMEDOUT); + if (expire == NNI_TIME_ZERO) { + return (NNG_EAGAIN); + } + return (NNG_ETIMEDOUT); } (void) nni_cond_waituntil(&mq->mq_readable, expire); } @@ -212,20 +206,52 @@ nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, int tmout, int *signal) int -nni_msgqueue_get(nni_msgqueue *mq, nni_msg **msgp, int tmout) +nni_msgqueue_get(nni_msgqueue *mq, nni_msg **msgp) { - int nosig = 0; + nni_signal nosig = 0; + + return (nni_msgqueue_get_impl(mq, msgp, NNI_TIME_NEVER, &nosig)); +} + + +int +nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, nni_signal *signal) +{ + return (nni_msgqueue_get_impl(mq, msgp, NNI_TIME_NEVER, signal)); +} + - return (nni_msgqueue_get_sig(mq, msgp, tmout, &nosig)); +int +nni_msgqueue_get_until(nni_msgqueue *mq, nni_msg **msgp, nni_time expire) +{ + nni_signal nosig = 0; + + return (nni_msgqueue_get_impl(mq, msgp, expire, &nosig)); +} + + +int +nni_msgqueue_put(nni_msgqueue *mq, nni_msg *msg) +{ + nni_signal nosig = 0; + + return (nni_msgqueue_put_impl(mq, msg, NNI_TIME_NEVER, &nosig)); +} + + +int +nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, nni_signal *signal) +{ + return (nni_msgqueue_put_impl(mq, msg, NNI_TIME_NEVER, signal)); } int -nni_msgqueue_put(nni_msgqueue *mq, nni_msg *msg, int tmout) +nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire) { - int nosig = 0; + nni_signal nosig = 0; - return (nni_msgqueue_put_sig(mq, msg, tmout, &nosig)); + return (nni_msgqueue_put_impl(mq, msg, expire, &nosig)); } diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index e44e203a..d97482af 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -27,37 +27,47 @@ extern int nni_msgqueue_create(nni_msgqueue **, int); // messages that may be in the queue. extern void nni_msgqueue_destroy(nni_msgqueue *); -// nni_msgqueue_put attempts to put a message to the queue. It will wait -// for the timeout (us), if the value is positive. If the value is negative -// then it will wait forever. If the value is zero, it will just check, and -// return immediately whether a message can be put or not. Valid returns are -// NNG_ECLOSED if the queue is closed or NNG_ETIMEDOUT if the message cannot -// be placed after a time, or NNG_EAGAIN if the operation cannot succeed -// immediately and a zero timeout is specified. Note that timeout granularity -// may be limited -- for example Windows systems have a millisecond resolution -// timeout capability. -extern int nni_msgqueue_put(nni_msgqueue *, nni_msg *, int); +// nni_msgqueue_put puts the message to the queue. It blocks until it +// was able to do so, or the queue is closed, returning either 0 on +// success or NNG_ECLOSED if the queue was closed. If NNG_ECLOSED is +// returned, the caller is responsible for freeing the message with +// nni_msg_free(), otherwise the message is "owned" by the queue, and +// the caller is not permitted to access it further. +extern int nni_msgqueue_put(nni_msgqueue *, nni_msg *); -// nni_msgqueue_get gets the message from the queue, using a timeout just -// like nni_msgqueue_put. -extern int nni_msgqueue_get(nni_msgqueue *, nni_msg **, int); +// nni_msgqueue_get gets the message from the queue. It blocks until a +// message is available, or the queue is closed, returning either 0 on +// success or NNG_ECLOSED if the queue was closed. If a message is +// provided, the caller is assumes ownership of the message and must +// call nni_msg_free() when it is finished with it. +extern int nni_msgqueue_get(nni_msgqueue *, nni_msg **); + +// nni_msgqueue_put_until is like nni_msgqueue_put, except that if the +// system clock reaches the specified time without being able to place +// the message in the queue, it will return NNG_ETIMEDOUT. +extern int nni_msgqueue_put_until(nni_msgqueue *, nni_msg *, nni_time); + +// nni_msgqueue_get_until is like nni_msgqueue_put, except that if the +// system clock reaches the specified time without being able to retrieve +// a message from the queue, it will return NNG_ETIMEDOUT. +extern int nni_msgqueue_get_until(nni_msgqueue *, nni_msg **, nni_time); // nni_msgqueue_put_sig is an enhanced version of nni_msgqueue_put, but it // can be interrupted by nni_msgqueue_signal using the same final pointer, // which can be thought of as a turnstile. If interrupted it returns EINTR. // The turnstile should be initialized to zero. -extern int nni_msgqueue_put_sig(nni_msgqueue *, nni_msg *, int, int *); +extern int nni_msgqueue_put_sig(nni_msgqueue *, nni_msg *, nni_signal *); // nni_msgqueue_get_sig is an enhanced version of nni_msgqueue_get_t, but it // can be interrupted by nni_msgqueue_signal using the same final pointer, // which can be thought of as a turnstile. If interrupted it returns EINTR. // The turnstile should be initialized to zero. -extern int nni_msgqueue_get_sig(nni_msgqueue *, nni_msg **, int, int *); +extern int nni_msgqueue_get_sig(nni_msgqueue *, nni_msg **, nni_signal *); // nni_msgqueue_signal delivers a signal / interrupt to waiters blocked in // the msgqueue, if they have registered an interest in the same turnstile. // It modifies the turnstile's value under the lock to a non-zero value. -extern void nni_msgqueue_signal(nni_msgqueue *, int *); +extern void nni_msgqueue_signal(nni_msgqueue *, nni_signal *); // nni_msgqueue_close closes the queue. After this all operates on the // message queue will return NNG_ECLOSED. Messages inside the queue diff --git a/src/core/platform.h b/src/core/platform.h index 6722e666..58e4cd36 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -62,13 +62,6 @@ extern void nni_free(void *, size_t); typedef struct nni_mutex nni_mutex; typedef struct nni_cond nni_cond; -// XXX: REMOVE THESE -typedef struct nni_mutex * nni_mutex_t; -typedef struct nni_cond * nni_cond_t; -extern int nni_mutex_create(nni_mutex_t *); -extern void nni_mutex_destroy(nni_mutex_t); -extern int nni_cond_create(nni_cond_t *, nni_mutex_t); -extern void nni_cond_destroy(nni_cond_t); // Mutex handling. @@ -104,30 +97,21 @@ extern void nni_cond_fini(nni_cond *); // nni_cond_broadcast wakes all waiters on the condition. This should be // called with the lock held. -extern void nni_cond_broadcast(nni_cond_t); +extern void nni_cond_broadcast(nni_cond *); // nni_cond_signal wakes a signal waiter. -extern void nni_cond_signal(nni_cond_t); +extern void nni_cond_signal(nni_cond *); // nni_cond_wait waits for a wake up on the condition variable. The // associated lock is atomically released and reacquired upon wake up. // Callers can be spuriously woken. The associated lock must be held. -extern void nni_cond_wait(nni_cond_t); - -// nni_cond_timedwait waits for a wakeup on the condition variable, just -// as with nni_condwait, but it will also wake after the given number of -// microseconds has passed. (This is a relative timed wait.) Early -// wakeups are permitted, and the caller must take care to double check any -// conditions. The return value is 0 on success, or an error code, which -// can be NNG_ETIMEDOUT. Note that it is permissible to wait for longer -// than the timeout based on the resolution of your system clock. -extern int nni_cond_timedwait(nni_cond_t, int); +extern void nni_cond_wait(nni_cond *); // nni_cond_waituntil waits for a wakeup on the condition variable, or // until the system time reaches the specified absolute time. (It is an // absolute form of nni_cond_timedwait.) Early wakeups are possible, so // check the condition. It will return either NNG_ETIMEDOUT, or 0. -extern int nni_cond_waituntil(nni_cond_t, uint64_t); +extern int nni_cond_waituntil(nni_cond *, nni_time); typedef struct nni_thread * nni_thread_t; typedef struct nni_thread nni_thread; @@ -142,12 +126,15 @@ extern int nni_thread_create(nni_thread **, void (*fn)(void *), void *); extern void nni_thread_reap(nni_thread *); // nn_clock returns a number of microseconds since some arbitrary time -// in the past. The values returned by nni_clock may be used with -// nni_cond_timedwait. -extern uint64_t nni_clock(void); +// in the past. The values returned by nni_clock must use the same base +// as the times used in nni_cond_waituntil. The nni_clock() must return +// values > 0, and must return values smaller than 2^63. (We could relax +// this last constraint, but there is no reason to, and leaves us the option +// of using negative values for other purposes in the future.) +extern nni_time nni_clock(void); // nni_usleep sleeps for the specified number of microseconds (at least). -extern void nni_usleep(uint64_t); +extern void nni_usleep(nni_duration); // nni_platform_init is called to allow the platform the chance to // do any necessary initialization. This routine MUST be idempotent, @@ -158,8 +145,9 @@ extern void nni_usleep(uint64_t); // nni_plat_fini has been called. // // The function argument should be called if the platform has not initialized -// (i.e. exactly once please), and its result passed back to the caller. -// +// (i.e. exactly once), and its result passed back to the caller. If it +// does not return 0 (success), then it may be called again to try to +// initialize the platform again at a later date. extern int nni_plat_init(int (*)(void)); // nni_platform_fini is called to clean up resources. It is intended to @@ -167,7 +155,8 @@ extern int nni_plat_init(int (*)(void)); // will be called until nni_platform_init is called. extern void nni_plat_fini(void); -// Actual platforms we support. +// Actual platforms we support. This is included up front so that we can +// get the specific types that are supplied by the platform. #if defined(PLATFORM_POSIX) #include "platform/posix/posix_impl.h" #else diff --git a/src/core/socket.c b/src/core/socket.c index 0272f454..c500db31 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -32,7 +32,7 @@ int nni_socket_create(nni_socket **sockp, uint16_t proto) { nni_socket *sock; - struct nni_protocol *ops; + nni_protocol *ops; int rv; if ((ops = nni_protocol_find(proto)) == NULL) { @@ -43,12 +43,12 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) } sock->s_ops = *ops; - if ((rv = nni_mutex_create(&sock->s_mx)) != 0) { + if ((rv = nni_mutex_init(&sock->s_mx)) != 0) { nni_free(sock, sizeof (*sock)); return (rv); } - if ((rv = nni_cond_create(&sock->s_cv, sock->s_mx)) != 0) { - nni_mutex_destroy(sock->s_mx); + if ((rv = nni_cond_init(&sock->s_cv, &sock->s_mx)) != 0) { + nni_mutex_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } @@ -57,8 +57,8 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) // TODO: NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node); if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) { - nni_cond_destroy(sock->s_cv); - nni_mutex_destroy(sock->s_mx); + nni_cond_fini(&sock->s_cv); + nni_mutex_fini(&sock->s_mx); nni_free(sock, sizeof (*sock)); return (rv); } @@ -73,9 +73,10 @@ nni_socket_close(nni_socket *sock) { nni_pipe *pipe; nni_endpt *ep; + uint64_t linger; - nni_mutex_enter(sock->s_mx); + nni_mutex_enter(&sock->s_mx); // Mark us closing, so no more EPs or changes can occur. sock->s_closing = 1; @@ -90,9 +91,8 @@ nni_socket_close(nni_socket *sock) or nni_ep_shutdown(ep); #endif - break; /* REMOVE ME */ } - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); // XXX: TODO. This is a place where we should drain the write side // msgqueue, effectively getting a linger on the socket. The @@ -110,11 +110,10 @@ nni_socket_close(nni_socket *sock) // Now we should attempt to wait for the list of pipes to drop to // zero -- indicating that the protocol has shut things down // cleanly, voluntarily. (I.e. it finished its drain.) - nni_mutex_enter(sock->s_mx); + linger = nni_clock(); + nni_mutex_enter(&sock->s_mx); while (nni_list_first(&sock->s_pipes) != NULL) { - // rv = nn_cond_timedwait(sock->s_cv, sock->s_linger); - int rv = NNG_ETIMEDOUT; - if (rv == NNG_ETIMEDOUT) { + if (nni_cond_waituntil(&sock->s_cv, linger) == NNG_ETIMEDOUT) { break; } } @@ -130,35 +129,50 @@ nni_socket_close(nni_socket *sock) // quickly too! If this blocks for any non-trivial amount of time // here, it indicates a protocol implementation bug. while (nni_list_first(&sock->s_pipes) != NULL) { - nni_cond_wait(sock->s_cv); + nni_cond_wait(&sock->s_cv); } // Wait to make sure endpoint listeners have shutdown and exited // as well. They should have done so *long* ago. while (nni_list_first(&sock->s_eps) != NULL) { - nni_cond_wait(sock->s_cv); + nni_cond_wait(&sock->s_cv); } + nni_mutex_exit(&sock->s_mx); + return (0); } int -nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, int tmout) +nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_duration tmout) { int rv; int besteffort; + nni_time expire; + + if (tmout > 0) { + expire = nni_clock() + tmout; + } else if (tmout < 0) { + expire = NNI_TIME_NEVER; + } else { + expire = NNI_TIME_ZERO; + } // Senderr is typically set by protocols when the state machine // indicates that it is no longer valid to send a message. E.g. // a REP socket with no REQ pending. - nni_mutex_enter(sock->s_mx); + nni_mutex_enter(&sock->s_mx); + if (sock->s_closing) { + nni_mutex_exit(&sock->s_mx); + return (NNG_ECLOSED); + } if ((rv = sock->s_senderr) != 0) { - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); return (rv); } besteffort = sock->s_besteffort; - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); if (sock->s_ops.proto_send_filter != NULL) { msg = sock->s_ops.proto_send_filter(sock->s_data, msg); @@ -170,9 +184,9 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, int tmout) if (besteffort) { // BestEffort mode -- if we cannot handle the message due to // backpressure, we just throw it away, and don't complain. - tmout = 0; + expire = NNI_TIME_ZERO; } - rv = nni_msgqueue_put(sock->s_uwq, msg, tmout); + rv = nni_msgqueue_put_until(sock->s_uwq, msg, expire); if (besteffort && (rv == NNG_EAGAIN)) { // Pretend this worked... it didn't, but pretend. nni_msg_free(msg); @@ -181,6 +195,47 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, int tmout) return (rv); } +int +nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_duration tmout) +{ + int rv; + nni_time expire; + nni_msg *msg; + + if (tmout > 0) { + expire = nni_clock() + tmout; + } else if (tmout < 0) { + expire = NNI_TIME_NEVER; + } else { + expire = NNI_TIME_ZERO; + } + + nni_mutex_enter(&sock->s_mx); + if (sock->s_closing) { + nni_mutex_exit(&sock->s_mx); + return (NNG_ECLOSED); + } + if ((rv = sock->s_recverr) != 0) { + nni_mutex_exit(&sock->s_mx); + return (rv); + } + nni_mutex_exit(&sock->s_mx); + + for (;;) { + rv = nni_msgqueue_get_until(sock->s_urq, &msg, expire); + if (rv != 0) { + return (rv); + } + msg = sock->s_ops.proto_recv_filter(sock->s_data, msg); + if (msg != NULL) { + break; + } + // Protocol dropped the message; try again. + } + + *msgp = msg; + return (0); +} // nni_socket_protocol returns the socket's 16-bit protocol number. uint16_t @@ -195,9 +250,9 @@ nni_socket_proto(nni_socket *sock) void nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe) { - nni_mutex_enter(sock->s_mx); + nni_mutex_enter(&sock->s_mx); if (pipe->p_sock != sock) { - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); } // Remove the pipe from the protocol. Protocols may @@ -215,9 +270,9 @@ nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe) // If we're closing, wake the socket if we finished draining. if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) { - nni_cond_broadcast(sock->s_cv); + nni_cond_broadcast(&sock->s_cv); } - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); } @@ -226,18 +281,18 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe) { int rv; - nni_mutex_enter(sock->s_mx); + nni_mutex_enter(&sock->s_mx); if (sock->s_closing) { - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); return (NNG_ECLOSED); } if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) { - nni_mutex_exit(sock->s_mx); + nni_mutex_exit(&sock->s_mx); return (rv); } nni_list_append(&sock->s_pipes, pipe); pipe->p_sock = sock; - /* XXX: Publish event */ - nni_mutex_exit(sock->s_mx); + // XXX: Publish event + nni_mutex_exit(&sock->s_mx); return (0); } diff --git a/src/core/socket.h b/src/core/socket.h index 55944c92..ec4acfdb 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -14,24 +14,25 @@ // OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR // TRANSPORTS. struct nng_socket { - nni_mutex_t s_mx; - nni_cond_t s_cv; + nni_mutex s_mx; + nni_cond s_cv; - nni_msgqueue_t s_uwq; // Upper write queue - nni_msgqueue_t s_urq; // Upper read queue + nni_msgqueue * s_uwq; // Upper write queue + nni_msgqueue * s_urq; // Upper read queue - struct nni_protocol s_ops; + nni_protocol s_ops; - void * s_data; // Protocol private + void * s_data; // Protocol private // XXX: options - nni_list_t s_eps; - nni_list_t s_pipes; + nni_list_t s_eps; + nni_list_t s_pipes; - int s_closing; // Socket is closing - int s_besteffort; // Best effort mode delivery - int s_senderr; // Protocol state machine use + int s_closing; // Socket is closing + int s_besteffort; // Best effort mode delivery + int s_senderr; // Protocol state machine use + int s_recverr; // Protocol state machine use }; extern int nni_socket_create(nni_socket **, uint16_t); |
