diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-27 20:46:32 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2016-12-27 20:46:32 -0800 |
| commit | 123e1439a284716c651eca037b95ba997e6c30db (patch) | |
| tree | fc44cecd5fcb0e0f01532ac8e1ddf4d9499659f6 /src/core | |
| parent | 9c8576a3f0357e611c0892dfe1d0d4c2afe73bf2 (diff) | |
| download | nng-123e1439a284716c651eca037b95ba997e6c30db.tar.gz nng-123e1439a284716c651eca037b95ba997e6c30db.tar.bz2 nng-123e1439a284716c651eca037b95ba997e6c30db.zip | |
Send and receive now work.
This fixes a few core issues, and improves readability for the
message queue code as well. inproc delivery of messages works
now.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/endpt.c | 18 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 70 | ||||
| -rw-r--r-- | src/core/socket.c | 7 |
3 files changed, 57 insertions, 38 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index 24d92206..bb3ba268 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -197,7 +197,10 @@ nni_dialer(void *arg) nni_mutex_enter(&ep->ep_mx); while (!ep->ep_close) { // We need a different condvar... - nni_cond_waituntil(&ep->ep_cv, cooldown); + rv = nni_cond_waituntil(&ep->ep_cv, cooldown); + if (rv == NNG_ETIMEDOUT) { + break; + } } nni_mutex_exit(&ep->ep_mx); } @@ -312,8 +315,11 @@ nni_listener(void *arg) cooldown = 10000; cooldown += nni_clock(); while (!ep->ep_close) { - nni_cond_waituntil(&ep->ep_cv, + rv = nni_cond_waituntil(&ep->ep_cv, cooldown); + if (rv == NNG_ETIMEDOUT) { + break; + } } } } @@ -338,9 +344,13 @@ nni_listener(void *arg) // time for the system to reclaim resources. cooldown = 100000; // 100ms } + cooldown += nni_clock(); nni_mutex_enter(&ep->ep_mx); - if (!ep->ep_close) { - nni_cond_waituntil(&ep->ep_cv, cooldown); + while (!ep->ep_close) { + rv = nni_cond_waituntil(&ep->ep_cv, cooldown); + if (rv == NNG_ETIMEDOUT) { + break; + } } nni_mutex_exit(&ep->ep_mx); } diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index efa0bb72..41127c50 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -26,6 +26,7 @@ struct nni_msgqueue { int mq_put; int mq_closed; int mq_rwait; // readers waiting (unbuffered) + int mq_wwait; nni_msg ** mq_msgs; }; @@ -136,6 +137,8 @@ int nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, nni_time expire, nni_signal *signal) { + int rv; + nni_mutex_enter(&mq->mq_lock); for (;;) { @@ -151,7 +154,9 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, } // unbuffered, room for one, and a reader waiting? - if (mq->mq_rwait && (mq->mq_len == mq->mq_cap)) { + if (mq->mq_rwait && + (mq->mq_cap == 0) && + (mq->mq_len == mq->mq_cap)) { break; } @@ -167,14 +172,14 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, return (NNG_EAGAIN); } - // timedout? - if (expire <= nni_clock()) { + // not writeable, so wait until something changes + mq->mq_wwait++; + rv = nni_cond_waituntil(&mq->mq_writeable, expire); + mq->mq_wwait--; + if (rv == NNG_ETIMEDOUT) { nni_mutex_exit(&mq->mq_lock); return (NNG_ETIMEDOUT); } - - // not writeable, so wait until something changes - (void) nni_cond_waituntil(&mq->mq_writeable, expire); } // Writeable! Yay!! @@ -185,8 +190,8 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, mq->mq_put = 0; } mq->mq_len++; - if (mq->mq_len == 1) { - nni_cond_signal(&mq->mq_readable); + if (mq->mq_rwait) { + nni_cond_broadcast(&mq->mq_readable); } nni_mutex_exit(&mq->mq_lock); return (0); @@ -197,49 +202,50 @@ static int nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, nni_time expire, nni_signal *signal) { + int rv; + nni_mutex_enter(&mq->mq_lock); - while ((!mq->mq_closed) && (mq->mq_len == 0) && (*signal == 0)) { - if (expire <= nni_clock()) { - nni_mutex_exit(&mq->mq_lock); - if (expire == NNI_TIME_ZERO) { - return (NNG_EAGAIN); - } - return (NNG_ETIMEDOUT); - } - mq->mq_rwait++; - if (mq->mq_cap == 0) { - // If a writer is blocked, unblock him. - nni_cond_signal(&mq->mq_writeable); + for (;;) { + // always prefer to deliver data if its there + if (mq->mq_len != 0) { + break; } - (void) nni_cond_waituntil(&mq->mq_readable, expire); - mq->mq_rwait--; - } - - // If there is any data left in the message queue, we will still - // provide it, so that the reader can drain. - if (mq->mq_len == 0) { if (mq->mq_closed) { - nni_cond_signal(&mq->mq_drained); nni_mutex_exit(&mq->mq_lock); return (NNG_ECLOSED); } - + if (expire == NNI_TIME_ZERO) { + nni_mutex_exit(&mq->mq_lock); + return (NNG_EAGAIN); + } if (*signal) { - // We are being interrupted. nni_mutex_exit(&mq->mq_lock); return (NNG_EINTR); } + if ((mq->mq_cap == 0) & (mq->mq_wwait)) { + // let a write waiter know we are ready + nni_cond_broadcast(&mq->mq_writeable); + } + mq->mq_rwait++; + rv = nni_cond_waituntil(&mq->mq_readable, expire); + mq->mq_rwait--; + if (rv == NNG_ETIMEDOUT) { + nni_mutex_exit(&mq->mq_lock); + return (NNG_ETIMEDOUT); + } } + // Readable! Yay!! + *msgp = mq->mq_msgs[mq->mq_get]; mq->mq_len--; mq->mq_get++; if (mq->mq_get == mq->mq_alloc) { mq->mq_get = 0; } - if (mq->mq_len == (mq->mq_cap - 1)) { - nni_cond_signal(&mq->mq_writeable); + if (mq->mq_wwait) { + nni_cond_broadcast(&mq->mq_writeable); } nni_mutex_exit(&mq->mq_lock); return (0); diff --git a/src/core/socket.c b/src/core/socket.c index 91c3c03e..aabcab7c 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -61,7 +61,8 @@ nni_reaper(void *arg) // If pipe was a connected (dialer) pipe, // then let the endpoint know so it can try to // reestablish the connection. - if ((ep = pipe->p_ep) != NULL) { + if (((ep = pipe->p_ep) != NULL) && + ((ep->ep_pipe == pipe))) { ep->ep_pipe = NULL; pipe->p_ep = NULL; nni_mutex_enter(&ep->ep_mx); @@ -318,7 +319,9 @@ nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_time expire) if (rv != 0) { return (rv); } - msg = sock->s_ops.proto_recv_filter(sock->s_data, msg); + if (sock->s_ops.proto_recv_filter != NULL) { + msg = sock->s_ops.proto_recv_filter(sock->s_data, msg); + } if (msg != NULL) { break; } |
