diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/msgqueue.c | 63 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 77 | ||||
| -rw-r--r-- | src/core/options.c | 8 | ||||
| -rw-r--r-- | src/core/options.h | 4 | ||||
| -rw-r--r-- | src/core/protocol.h | 4 | ||||
| -rw-r--r-- | src/core/socket.c | 26 | ||||
| -rw-r--r-- | src/core/socket.h | 4 | ||||
| -rw-r--r-- | src/protocol/pair/pair.c | 20 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 36 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 22 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 26 |
11 files changed, 148 insertions, 142 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 4807fe0f..0f1872dc 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -14,7 +14,7 @@ // differences and improvements. For example, these can grow, and either // side can close, and they may be closed more than once. -struct nni_msgqueue { +struct nni_msgq { nni_mtx mq_lock; nni_cv mq_readable; nni_cv mq_writeable; @@ -31,9 +31,9 @@ struct nni_msgqueue { }; int -nni_msgqueue_create(nni_msgqueue **mqp, int cap) +nni_msgq_init(nni_msgq **mqp, int cap) { - struct nni_msgqueue *mq; + struct nni_msgq *mq; int rv; int alloc; @@ -92,10 +92,13 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap) void -nni_msgqueue_destroy(nni_msgqueue *mq) +nni_msgq_fini(nni_msgq *mq) { nni_msg *msg; + if (mq == NULL) { + return; + } nni_cv_fini(&mq->mq_drained); nni_cv_fini(&mq->mq_writeable); nni_cv_fini(&mq->mq_readable); @@ -117,11 +120,11 @@ nni_msgqueue_destroy(nni_msgqueue *mq) } -// nni_msgqueue_signal raises a signal on the signal object. This allows a +// nni_msgq_signal raises a signal on the signal object. This allows a // waiter to be signaled, so that it can be woken e.g. due to a pipe closing. // Note that the signal object must be *zero* if no signal is raised. void -nni_msgqueue_signal(nni_msgqueue *mq, int *signal) +nni_msgq_signal(nni_msgq *mq, int *signal) { nni_mtx_lock(&mq->mq_lock); *signal = 1; @@ -134,8 +137,7 @@ nni_msgqueue_signal(nni_msgqueue *mq, int *signal) int -nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, - nni_time expire, nni_signal *signal) +nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig) { int rv; @@ -161,7 +163,7 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, } // interrupted? - if (*signal) { + if (*sig) { nni_mtx_unlock(&mq->mq_lock); return (NNG_EINTR); } @@ -198,11 +200,11 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, } -// nni_msgqueue_putback will attempt to put a single message back +// nni_msgq_putback will attempt to put a single message back // to the head of the queue. It never blocks. Message queues always // have room for at least one putback. int -nni_msgqueue_putback(nni_msgqueue *mq, nni_msg *msg) +nni_msgq_putback(nni_msgq *mq, nni_msg *msg) { nni_mtx_lock(&mq->mq_lock); @@ -234,8 +236,7 @@ nni_msgqueue_putback(nni_msgqueue *mq, nni_msg *msg) static int -nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, - nni_time expire, nni_signal *signal) +nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig) { int rv; @@ -254,7 +255,7 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, nni_mtx_unlock(&mq->mq_lock); return (NNG_EAGAIN); } - if (*signal) { + if (*sig) { nni_mtx_unlock(&mq->mq_lock); return (NNG_EINTR); } @@ -288,57 +289,57 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, int -nni_msgqueue_get(nni_msgqueue *mq, nni_msg **msgp) +nni_msgq_get(nni_msgq *mq, nni_msg **msgp) { nni_signal nosig = 0; - return (nni_msgqueue_get_impl(mq, msgp, NNI_TIME_NEVER, &nosig)); + return (nni_msgq_get_(mq, msgp, NNI_TIME_NEVER, &nosig)); } int -nni_msgqueue_get_sig(nni_msgqueue *mq, nni_msg **msgp, nni_signal *signal) +nni_msgq_get_sig(nni_msgq *mq, nni_msg **msgp, nni_signal *signal) { - return (nni_msgqueue_get_impl(mq, msgp, NNI_TIME_NEVER, signal)); + return (nni_msgq_get_(mq, msgp, NNI_TIME_NEVER, signal)); } int -nni_msgqueue_get_until(nni_msgqueue *mq, nni_msg **msgp, nni_time expire) +nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) { nni_signal nosig = 0; - return (nni_msgqueue_get_impl(mq, msgp, expire, &nosig)); + return (nni_msgq_get_(mq, msgp, expire, &nosig)); } int -nni_msgqueue_put(nni_msgqueue *mq, nni_msg *msg) +nni_msgq_put(nni_msgq *mq, nni_msg *msg) { nni_signal nosig = 0; - return (nni_msgqueue_put_impl(mq, msg, NNI_TIME_NEVER, &nosig)); + return (nni_msgq_put_(mq, msg, NNI_TIME_NEVER, &nosig)); } int -nni_msgqueue_put_sig(nni_msgqueue *mq, nni_msg *msg, nni_signal *signal) +nni_msgq_put_sig(nni_msgq *mq, nni_msg *msg, nni_signal *signal) { - return (nni_msgqueue_put_impl(mq, msg, NNI_TIME_NEVER, signal)); + return (nni_msgq_put_(mq, msg, NNI_TIME_NEVER, signal)); } int -nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire) +nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) { nni_signal nosig = 0; - return (nni_msgqueue_put_impl(mq, msg, expire, &nosig)); + return (nni_msgq_put_(mq, msg, expire, &nosig)); } void -nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire) +nni_msgq_drain(nni_msgq *mq, nni_time expire) { nni_mtx_lock(&mq->mq_lock); mq->mq_closed = 1; @@ -363,7 +364,7 @@ nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire) void -nni_msgqueue_close(nni_msgqueue *mq) +nni_msgq_close(nni_msgq *mq) { nni_mtx_lock(&mq->mq_lock); mq->mq_closed = 1; @@ -384,7 +385,7 @@ nni_msgqueue_close(nni_msgqueue *mq) int -nni_msgqueue_len(nni_msgqueue *mq) +nni_msgq_len(nni_msgq *mq) { int rv; @@ -396,7 +397,7 @@ nni_msgqueue_len(nni_msgqueue *mq) int -nni_msgqueue_cap(nni_msgqueue *mq) +nni_msgq_cap(nni_msgq *mq) { int rv; @@ -408,7 +409,7 @@ nni_msgqueue_cap(nni_msgqueue *mq) int -nni_msgqueue_resize(nni_msgqueue *mq, int cap) +nni_msgq_resize(nni_msgq *mq, int cap) { int alloc; nni_msg *msg; diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 900fed2b..9ee4335a 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -24,80 +24,89 @@ // // Readers & writers in a message queue can be woken either by a timeout // or by a specific signal (arranged by the caller). -// -// TODO: Add message queue growing, and pushback. -typedef struct nni_msgqueue nni_msgqueue; +typedef struct nni_msgq nni_msgq; -// nni_msgqueue_create creates a message queue with the given capacity, +// nni_msgq_init creates a message queue with the given capacity, // which must be a positive number. It returns NNG_EINVAL if the capacity // is invalid, or NNG_ENOMEM if resources cannot be allocated. -extern int nni_msgqueue_create(nni_msgqueue **, int); +extern int nni_msgq_init(nni_msgq **, int); -// nni_msgqueue_destroy destroys a message queue. It will also free any +// nni_msgq_fini destroys a message queue. It will also free any // messages that may be in the queue. -extern void nni_msgqueue_destroy(nni_msgqueue *); +extern void nni_msgq_fini(nni_msgq *); -// nni_msgqueue_put puts the message to the queue. It blocks until it +// nni_msgq_put puts the message to the queue. It blocks until it // was able to do so, or the queue is closed, returning either 0 on // success or NNG_ECLOSED if the queue was closed. If NNG_ECLOSED is // returned, the caller is responsible for freeing the message with // nni_msg_free(), otherwise the message is "owned" by the queue, and // the caller is not permitted to access it further. -extern int nni_msgqueue_put(nni_msgqueue *, nni_msg *); +extern int nni_msgq_put(nni_msgq *, nni_msg *); -// nni_msgqueue_putback returns a message to the head of the queue. +// nni_msgq_putback returns a message to the head of the queue. // This is a non-blocking operation, and it returns EAGAIN if there // is no room. There is always at least room for one putback after -// a message is retried with nni_msgqueue_get. -extern int nni_msgqueue_putback(nni_msgqueue *, nni_msg *); +// a message is retried with nni_msgq_get. +extern int nni_msgq_putback(nni_msgq *, nni_msg *); -// nni_msgqueue_get gets the message from the queue. It blocks until a +// nni_msgq_get gets the message from the queue. It blocks until a // message is available, or the queue is closed, returning either 0 on // success or NNG_ECLOSED if the queue was closed. If a message is // provided, the caller is assumes ownership of the message and must // call nni_msg_free() when it is finished with it. -extern int nni_msgqueue_get(nni_msgqueue *, nni_msg **); +extern int nni_msgq_get(nni_msgq *, nni_msg **); -// nni_msgqueue_put_until is like nni_msgqueue_put, except that if the +// nni_msgq_put_until is like nni_msgq_put, except that if the // system clock reaches the specified time without being able to place // the message in the queue, it will return NNG_ETIMEDOUT. -extern int nni_msgqueue_put_until(nni_msgqueue *, nni_msg *, nni_time); +extern int nni_msgq_put_until(nni_msgq *, nni_msg *, nni_time); -// nni_msgqueue_get_until is like nni_msgqueue_put, except that if the +// nni_msgq_get_until is like nni_msgq_put, except that if the // system clock reaches the specified time without being able to retrieve // a message from the queue, it will return NNG_ETIMEDOUT. -extern int nni_msgqueue_get_until(nni_msgqueue *, nni_msg **, nni_time); +extern int nni_msgq_get_until(nni_msgq *, nni_msg **, nni_time); -// nni_msgqueue_put_sig is an enhanced version of nni_msgqueue_put, but it +// nni_msgq_put_sig is an enhanced version of nni_msgq_put, but it // can be interrupted by nni_msgqueue_signal using the same final pointer, // which can be thought of as a turnstile. If interrupted it returns EINTR. // The turnstile should be initialized to zero. -extern int nni_msgqueue_put_sig(nni_msgqueue *, nni_msg *, nni_signal *); +extern int nni_msgq_put_sig(nni_msgq *, nni_msg *, nni_signal *); -// nni_msgqueue_get_sig is an enhanced version of nni_msgqueue_get_t, but it -// can be interrupted by nni_msgqueue_signal using the same final pointer, +// nni_msgq_get_sig is an enhanced version of nni_msgq_get_t, but it +// can be interrupted by nni_msgq_signal using the same final pointer, // which can be thought of as a turnstile. If interrupted it returns EINTR. // The turnstile should be initialized to zero. -extern int nni_msgqueue_get_sig(nni_msgqueue *, nni_msg **, nni_signal *); +extern int nni_msgq_get_sig(nni_msgq *, nni_msg **, nni_signal *); -// nni_msgqueue_signal delivers a signal / interrupt to waiters blocked in -// the msgqueue, if they have registered an interest in the same turnstile. +// nni_msgq_signal delivers a signal / interrupt to waiters blocked in +// the msgq, if they have registered an interest in the same turnstile. // It modifies the turnstile's value under the lock to a non-zero value. -extern void nni_msgqueue_signal(nni_msgqueue *, nni_signal *); +extern void nni_msgq_signal(nni_msgq *, nni_signal *); -// nni_msgqueue_close closes the queue. After this all operates on the +// nni_msgq_close closes the queue. After this all operates on the // message queue will return NNG_ECLOSED. Messages inside the queue // are freed. Unlike closing a go channel, this operation is idempotent. -extern void nni_msgqueue_close(nni_msgqueue *); +extern void nni_msgq_close(nni_msgq *); -// nni_msgqueue_drain is like nng_msgqueue_close, except that reads +// nni_msgq_drain is like nng_msgq_close, except that reads // against the queue are permitted for up to the time limit. The // operation blocks until either the queue is empty, or the timeout // has expired. Any messages still in the queue at the timeout are freed. -extern void nni_msgqueue_drain(nni_msgqueue *, nni_time); - -extern int nni_msgqueue_resize(nni_msgqueue *, int); -extern int nni_msgqueue_cap(nni_msgqueue *mq); -extern int nni_msgqueue_len(nni_msgqueue *mq); +extern void nni_msgq_drain(nni_msgq *, nni_time); + +// nni_msgq_resize resizes the message queue; messages already in the queue +// will be preserved as long as there is room. Messages that are dropped +// due to no room are taken from the most recent. (Oldest messages are +// preserved.) +extern int nni_msgq_resize(nni_msgq *, int); + +// nni_msgq_cap returns the "capacity" of the message queue. This does not +// include the extra room for pushback, nor the extra slot reserved to make +// zero-length message queues possible. As a consequence, it is possible +// for the message queue to contain up to 2 more messages than the capacity. +extern int nni_msgq_cap(nni_msgq *mq); + +// nni_msgq_len returns the number of messages currently in the queue. +extern int nni_msgq_len(nni_msgq *mq); #endif // CORE_MSQUEUE_H diff --git a/src/core/options.c b/src/core/options.c index b970a0e7..24d7a9eb 100644 --- a/src/core/options.c +++ b/src/core/options.c @@ -77,7 +77,7 @@ nni_getopt_int(int *ptr, void *val, size_t *sizep) int -nni_setopt_buf(nni_msgqueue *mq, const void *val, size_t sz) +nni_setopt_buf(nni_msgq *mq, const void *val, size_t sz) { int len; @@ -94,14 +94,14 @@ nni_setopt_buf(nni_msgqueue *mq, const void *val, size_t sz) // size could be quite large indeed in this case. return (NNG_EINVAL); } - return (nni_msgqueue_resize(mq, len)); + return (nni_msgq_resize(mq, len)); } int -nni_getopt_buf(nni_msgqueue *mq, void *val, size_t *sizep) +nni_getopt_buf(nni_msgq *mq, void *val, size_t *sizep) { - int len = nni_msgqueue_cap(mq); + int len = nni_msgq_cap(mq); int sz = *sizep; diff --git a/src/core/options.h b/src/core/options.h index ec5d28f1..84d958c1 100644 --- a/src/core/options.h +++ b/src/core/options.h @@ -15,10 +15,10 @@ // variable sized options. // nni_setopt_buf sets the queue size for the message queue. -extern int nni_setopt_buf(nni_msgqueue *, const void *, size_t); +extern int nni_setopt_buf(nni_msgq *, const void *, size_t); // nni_getopt_buf gets the queue size for the message queue. -extern int nni_getopt_buf(nni_msgqueue *, void *, size_t *); +extern int nni_getopt_buf(nni_msgq *, void *, size_t *); // nni_setopt_duration sets the duration. Durations must be legal, // either a positive value, 0, or -1 to indicate forever. diff --git a/src/core/protocol.h b/src/core/protocol.h index 3a391ece..6fd48b40 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -63,11 +63,11 @@ struct nni_protocol { // nni_socket_sendq obtains the upper writeq. The protocol should // recieve messages from this, and place them on the appropriate pipe. -extern nni_msgqueue *nni_socket_sendq(nni_socket *); +extern nni_msgq *nni_socket_sendq(nni_socket *); // nni_socket_recvq obtains the upper readq. The protocol should // inject incoming messages from pipes to it. -extern nni_msgqueue *nni_socket_recvq(nni_socket *); +extern nni_msgq *nni_socket_recvq(nni_socket *); // nni_socket_recv_err sets an error code to be returned to clients // rather than waiting for a message. Set it to 0 to resume normal diff --git a/src/core/socket.c b/src/core/socket.c index a8904500..c64ba5ef 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -15,14 +15,14 @@ // nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain // the upper read and write queues. -nni_msgqueue * +nni_msgq * nni_socket_sendq(nni_socket *s) { return (s->s_uwq); } -nni_msgqueue * +nni_msgq * nni_socket_recvq(nni_socket *s) { return (s->s_urq); @@ -126,15 +126,15 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) return (rv); } - if ((rv = nni_msgqueue_create(&sock->s_uwq, 0)) != 0) { + if ((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) { nni_thr_fini(&sock->s_reaper); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_mx); NNI_FREE_STRUCT(sock); return (rv); } - if ((rv = nni_msgqueue_create(&sock->s_urq, 0)) != 0) { - nni_msgqueue_destroy(sock->s_uwq); + if ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0) { + nni_msgq_fini(sock->s_uwq); nni_thr_fini(&sock->s_reaper); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_mx); @@ -143,8 +143,8 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) } if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) { - nni_msgqueue_destroy(sock->s_urq); - nni_msgqueue_destroy(sock->s_uwq); + nni_msgq_fini(sock->s_urq); + nni_msgq_fini(sock->s_uwq); nni_thr_fini(&sock->s_reaper); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_mx); @@ -192,7 +192,7 @@ nni_socket_close(nni_socket *sock) // except that the protocol gets a chance to get the messages and // push them down to the transport. This operation can *block* // until the linger time has expired. - nni_msgqueue_drain(sock->s_uwq, linger); + nni_msgq_drain(sock->s_uwq, linger); // Generally, unless the protocol is blocked trying to perform // writes (e.g. a slow reader on the other side), it should be @@ -211,7 +211,7 @@ nni_socket_close(nni_socket *sock) // Close the upper read queue immediately. This can happen // safely while we hold the lock. - nni_msgqueue_close(sock->s_urq); + nni_msgq_close(sock->s_urq); // Go through and schedule close on all pipes. while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { @@ -231,8 +231,8 @@ nni_socket_close(nni_socket *sock) sock->s_ops.proto_destroy(sock->s_data); // And we need to clean up *our* state. - nni_msgqueue_destroy(sock->s_urq); - nni_msgqueue_destroy(sock->s_uwq); + nni_msgq_fini(sock->s_urq); + nni_msgq_fini(sock->s_uwq); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_mx); NNI_FREE_STRUCT(sock); @@ -273,7 +273,7 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_time expire) // backpressure, we just throw it away, and don't complain. expire = NNI_TIME_ZERO; } - rv = nni_msgqueue_put_until(sock->s_uwq, msg, expire); + rv = nni_msgq_put_until(sock->s_uwq, msg, expire); if (besteffort && (rv == NNG_EAGAIN)) { // Pretend this worked... it didn't, but pretend. nni_msg_free(msg); @@ -301,7 +301,7 @@ nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_time expire) nni_mtx_unlock(&sock->s_mx); for (;;) { - rv = nni_msgqueue_get_until(sock->s_urq, &msg, expire); + rv = nni_msgq_get_until(sock->s_urq, &msg, expire); if (rv != 0) { return (rv); } diff --git a/src/core/socket.h b/src/core/socket.h index 8d72c5e6..bd0a5c8a 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -17,8 +17,8 @@ struct nng_socket { nni_mtx s_mx; nni_cv s_cv; - nni_msgqueue * s_uwq; // Upper write queue - nni_msgqueue * s_urq; // Upper read queue + nni_msgq * s_uwq; // Upper write queue + nni_msgq * s_urq; // Upper read queue nni_protocol s_ops; diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index 6ca891f0..642f2845 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -24,8 +24,8 @@ struct nni_pair_sock { nni_socket * sock; nni_pair_pipe * pipe; nni_mtx mx; - nni_msgqueue * uwq; - nni_msgqueue * urq; + nni_msgq * uwq; + nni_msgq * urq; }; // An nni_pair_pipe is our per-pipe protocol private structure. We keep @@ -118,14 +118,14 @@ nni_pair_sender(void *arg) { nni_pair_pipe *pp = arg; nni_pair_sock *pair = pp->pair; - nni_msgqueue *uwq = pair->uwq; - nni_msgqueue *urq = pair->urq; + nni_msgq *uwq = pair->uwq; + nni_msgq *urq = pair->urq; nni_pipe *pipe = pp->pipe; nni_msg *msg; int rv; for (;;) { - rv = nni_msgqueue_get_sig(uwq, &msg, &pp->sigclose); + rv = nni_msgq_get_sig(uwq, &msg, &pp->sigclose); if (rv != 0) { break; } @@ -135,7 +135,7 @@ nni_pair_sender(void *arg) break; } } - nni_msgqueue_signal(urq, &pp->sigclose); + nni_msgq_signal(urq, &pp->sigclose); nni_pipe_close(pipe); } @@ -145,8 +145,8 @@ nni_pair_receiver(void *arg) { nni_pair_pipe *pp = arg; nni_pair_sock *pair = pp->pair; - nni_msgqueue *urq = pair->urq; - nni_msgqueue *uwq = pair->uwq; + nni_msgq *urq = pair->urq; + nni_msgq *uwq = pair->uwq; nni_pipe *pipe = pp->pipe; nni_msg *msg; int rv; @@ -156,13 +156,13 @@ nni_pair_receiver(void *arg) if (rv != 0) { break; } - rv = nni_msgqueue_put_sig(urq, msg, &pp->sigclose); + rv = nni_msgq_put_sig(urq, msg, &pp->sigclose); if (rv != 0) { nni_msg_free(msg); break; } } - nni_msgqueue_signal(uwq, &pp->sigclose); + nni_msgq_signal(uwq, &pp->sigclose); nni_pipe_close(pipe); } diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 53007e12..9323428c 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -23,8 +23,8 @@ typedef struct nni_rep_sock nni_rep_sock; struct nni_rep_sock { nni_socket * sock; nni_mtx mx; - nni_msgqueue * uwq; - nni_msgqueue * urq; + nni_msgq * uwq; + nni_msgq * urq; int raw; int ttl; nni_thr sender; @@ -37,7 +37,7 @@ struct nni_rep_sock { struct nni_rep_pipe { nni_pipe * pipe; nni_rep_sock * rep; - nni_msgqueue * sendq; + nni_msgq * sendq; int sigclose; }; @@ -111,14 +111,14 @@ nni_rep_add_pipe(void *arg, nni_pipe *pipe, void *datap) rp->pipe = pipe; rp->sigclose = 0; - rv = nni_msgqueue_create(&rp->sendq, 2); + rv = nni_msgq_init(&rp->sendq, 2); if (rv != 0) { return (rv); } nni_mtx_lock(&rep->mx); if ((rv = nni_idhash_insert(rep->pipes, nni_pipe_id(pipe), rp)) != 0) { - nni_msgqueue_destroy(rp->sendq); + nni_msgq_fini(rp->sendq); nni_mtx_unlock(&rep->mx); return (rv); } @@ -136,7 +136,7 @@ nni_rep_rem_pipe(void *arg, void *data) nni_mtx_lock(&rep->mx); nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe)); nni_mtx_unlock(&rep->mx); - nni_msgqueue_destroy(rp->sendq); + nni_msgq_fini(rp->sendq); } @@ -148,8 +148,8 @@ static void nni_rep_topsender(void *arg) { nni_rep_sock *rep = arg; - nni_msgqueue *uwq = rep->uwq; - nni_msgqueue *urq = rep->urq; + nni_msgq *uwq = rep->uwq; + nni_msgq *urq = rep->urq; nni_msg *msg; for (;;) { @@ -159,7 +159,7 @@ nni_rep_topsender(void *arg) nni_rep_pipe *rp; int rv; - if ((rv = nni_msgqueue_get(uwq, &msg)) != 0) { + if ((rv = nni_msgq_get(uwq, &msg)) != 0) { break; } // We yank the outgoing pipe id from the header @@ -184,7 +184,7 @@ nni_rep_topsender(void *arg) continue; } // Try a non-blocking put to the lower writer. - rv = nni_msgqueue_put_until(rp->sendq, msg, NNI_TIME_ZERO); + rv = nni_msgq_put_until(rp->sendq, msg, NNI_TIME_ZERO); if (rv != 0) { // message queue is full, we have no choice but // to drop it. This should not happen under normal @@ -201,8 +201,8 @@ nni_rep_sender(void *arg) { nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - nni_msgqueue *urq = rep->urq; - nni_msgqueue *wq = rp->sendq; + nni_msgq *urq = rep->urq; + nni_msgq *wq = rp->sendq; nni_pipe *pipe = rp->pipe; nni_msg *msg; uint8_t *body; @@ -210,7 +210,7 @@ nni_rep_sender(void *arg) int rv; for (;;) { - rv = nni_msgqueue_get_sig(wq, &msg, &rp->sigclose); + rv = nni_msgq_get_sig(wq, &msg, &rp->sigclose); if (rv != 0) { break; } @@ -221,7 +221,7 @@ nni_rep_sender(void *arg) break; } } - nni_msgqueue_signal(urq, &rp->sigclose); + nni_msgq_signal(urq, &rp->sigclose); nni_pipe_close(pipe); } @@ -231,8 +231,8 @@ nni_rep_receiver(void *arg) { nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - nni_msgqueue *urq = rep->urq; - nni_msgqueue *uwq = rep->uwq; + nni_msgq *urq = rep->urq; + nni_msgq *uwq = rep->uwq; nni_pipe *pipe = rp->pipe; nni_msg *msg; int rv; @@ -288,13 +288,13 @@ again: } // Now send it up. - rv = nni_msgqueue_put_sig(urq, msg, &rp->sigclose); + rv = nni_msgq_put_sig(urq, msg, &rp->sigclose); if (rv != 0) { nni_msg_free(msg); break; } } - nni_msgqueue_signal(uwq, &rp->sigclose); + nni_msgq_signal(uwq, &rp->sigclose); nni_pipe_close(pipe); } diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 6c8f8543..35df0e56 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -24,8 +24,8 @@ struct nni_req_sock { nni_socket * sock; nni_mtx mx; nni_cv cv; - nni_msgqueue * uwq; - nni_msgqueue * urq; + nni_msgq * uwq; + nni_msgq * urq; nni_duration retry; nni_time resend; nni_thr resender; @@ -146,14 +146,14 @@ nni_req_sender(void *arg) { nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; - nni_msgqueue *uwq = req->uwq; - nni_msgqueue *urq = req->urq; + nni_msgq *uwq = req->uwq; + nni_msgq *urq = req->urq; nni_pipe *pipe = rp->pipe; nni_msg *msg; int rv; for (;;) { - rv = nni_msgqueue_get_sig(uwq, &msg, &rp->sigclose); + rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose); if (rv != 0) { break; } @@ -163,7 +163,7 @@ nni_req_sender(void *arg) break; } } - nni_msgqueue_signal(urq, &rp->sigclose); + nni_msgq_signal(urq, &rp->sigclose); nni_pipe_close(pipe); } @@ -173,8 +173,8 @@ nni_req_receiver(void *arg) { nni_req_pipe *rp = arg; nni_req_sock *req = rp->req; - nni_msgqueue *urq = req->urq; - nni_msgqueue *uwq = req->uwq; + nni_msgq *urq = req->urq; + nni_msgq *uwq = req->uwq; nni_pipe *pipe = rp->pipe; nni_msg *msg; int rv; @@ -202,13 +202,13 @@ nni_req_receiver(void *arg) // This should never happen - could be an assert. nni_panic("Failed to trim REQ header from body"); } - rv = nni_msgqueue_put_sig(urq, msg, &rp->sigclose); + rv = nni_msgq_put_sig(urq, msg, &rp->sigclose); if (rv != 0) { nni_msg_free(msg); break; } } - nni_msgqueue_signal(uwq, &rp->sigclose); + nni_msgq_signal(uwq, &rp->sigclose); nni_pipe_close(pipe); } @@ -283,7 +283,7 @@ nni_req_resender(void *arg) nni_msg *dup; // XXX: check for final timeout on this? if (nni_msg_dup(&dup, req->reqmsg) != 0) { - if (nni_msgqueue_putback(req->uwq, dup) != 0) { + if (nni_msgq_putback(req->uwq, dup) != 0) { nni_msg_free(dup); } } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 27cf47cb..688518b3 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -30,8 +30,8 @@ typedef struct { struct nni_inproc_pipe { const char * addr; nni_inproc_pair * pair; - nni_msgqueue * rq; - nni_msgqueue * wq; + nni_msgq * rq; + nni_msgq * wq; uint16_t peer; }; @@ -40,7 +40,7 @@ struct nni_inproc_pipe { struct nni_inproc_pair { nni_mtx mx; int refcnt; - nni_msgqueue * q[2]; + nni_msgq * q[2]; nni_inproc_pipe pipe[2]; char addr[NNG_MAXADDRLEN+1]; }; @@ -91,8 +91,8 @@ nni_inproc_pipe_close(void *arg) { nni_inproc_pipe *pipe = arg; - nni_msgqueue_close(pipe->rq); - nni_msgqueue_close(pipe->wq); + nni_msgq_close(pipe->rq); + nni_msgq_close(pipe->wq); } @@ -101,12 +101,8 @@ nni_inproc_pipe_close(void *arg) static void nni_inproc_pair_destroy(nni_inproc_pair *pair) { - if (pair->q[0]) { - nni_msgqueue_destroy(pair->q[0]); - } - if (pair->q[1]) { - nni_msgqueue_destroy(pair->q[1]); - } + nni_msgq_fini(pair->q[0]); + nni_msgq_fini(pair->q[1]); nni_mtx_fini(&pair->mx); NNI_FREE_STRUCT(pair); } @@ -137,7 +133,7 @@ nni_inproc_pipe_send(void *arg, nni_msg *msg) { nni_inproc_pipe *pipe = arg; - return (nni_msgqueue_put(pipe->wq, msg)); + return (nni_msgq_put(pipe->wq, msg)); } @@ -146,7 +142,7 @@ nni_inproc_pipe_recv(void *arg, nni_msg **msgp) { nni_inproc_pipe *pipe = arg; - return (nni_msgqueue_get(pipe->rq, msgp)); + return (nni_msgq_get(pipe->rq, msgp)); } @@ -354,8 +350,8 @@ nni_inproc_ep_accept(void *arg, void **pipep) NNI_FREE_STRUCT(pair); return (rv); } - if (((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0) || - ((rv = nni_msgqueue_create(&pair->q[1], 4)) != 0)) { + if (((rv = nni_msgq_init(&pair->q[0], 4)) != 0) || + ((rv = nni_msgq_init(&pair->q[1], 4)) != 0)) { nni_inproc_pair_destroy(pair); return (rv); } |
