aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-27 20:46:32 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-27 20:46:32 -0800
commit123e1439a284716c651eca037b95ba997e6c30db (patch)
treefc44cecd5fcb0e0f01532ac8e1ddf4d9499659f6 /src
parent9c8576a3f0357e611c0892dfe1d0d4c2afe73bf2 (diff)
downloadnng-123e1439a284716c651eca037b95ba997e6c30db.tar.gz
nng-123e1439a284716c651eca037b95ba997e6c30db.tar.bz2
nng-123e1439a284716c651eca037b95ba997e6c30db.zip
Send and receive now work.
This fixes a few core issues, and improves readability for the message queue code as well. inproc delivery of messages works now.
Diffstat (limited to 'src')
-rw-r--r--src/core/endpt.c18
-rw-r--r--src/core/msgqueue.c70
-rw-r--r--src/core/socket.c7
-rw-r--r--src/nng.c7
4 files changed, 61 insertions, 41 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 24d92206..bb3ba268 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -197,7 +197,10 @@ nni_dialer(void *arg)
nni_mutex_enter(&ep->ep_mx);
while (!ep->ep_close) {
// We need a different condvar...
- nni_cond_waituntil(&ep->ep_cv, cooldown);
+ rv = nni_cond_waituntil(&ep->ep_cv, cooldown);
+ if (rv == NNG_ETIMEDOUT) {
+ break;
+ }
}
nni_mutex_exit(&ep->ep_mx);
}
@@ -312,8 +315,11 @@ nni_listener(void *arg)
cooldown = 10000;
cooldown += nni_clock();
while (!ep->ep_close) {
- nni_cond_waituntil(&ep->ep_cv,
+ rv = nni_cond_waituntil(&ep->ep_cv,
cooldown);
+ if (rv == NNG_ETIMEDOUT) {
+ break;
+ }
}
}
}
@@ -338,9 +344,13 @@ nni_listener(void *arg)
// time for the system to reclaim resources.
cooldown = 100000; // 100ms
}
+ cooldown += nni_clock();
nni_mutex_enter(&ep->ep_mx);
- if (!ep->ep_close) {
- nni_cond_waituntil(&ep->ep_cv, cooldown);
+ while (!ep->ep_close) {
+ rv = nni_cond_waituntil(&ep->ep_cv, cooldown);
+ if (rv == NNG_ETIMEDOUT) {
+ break;
+ }
}
nni_mutex_exit(&ep->ep_mx);
}
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index efa0bb72..41127c50 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -26,6 +26,7 @@ struct nni_msgqueue {
int mq_put;
int mq_closed;
int mq_rwait; // readers waiting (unbuffered)
+ int mq_wwait;
nni_msg ** mq_msgs;
};
@@ -136,6 +137,8 @@ int
nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
nni_time expire, nni_signal *signal)
{
+ int rv;
+
nni_mutex_enter(&mq->mq_lock);
for (;;) {
@@ -151,7 +154,9 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
}
// unbuffered, room for one, and a reader waiting?
- if (mq->mq_rwait && (mq->mq_len == mq->mq_cap)) {
+ if (mq->mq_rwait &&
+ (mq->mq_cap == 0) &&
+ (mq->mq_len == mq->mq_cap)) {
break;
}
@@ -167,14 +172,14 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
return (NNG_EAGAIN);
}
- // timedout?
- if (expire <= nni_clock()) {
+ // not writeable, so wait until something changes
+ mq->mq_wwait++;
+ rv = nni_cond_waituntil(&mq->mq_writeable, expire);
+ mq->mq_wwait--;
+ if (rv == NNG_ETIMEDOUT) {
nni_mutex_exit(&mq->mq_lock);
return (NNG_ETIMEDOUT);
}
-
- // not writeable, so wait until something changes
- (void) nni_cond_waituntil(&mq->mq_writeable, expire);
}
// Writeable! Yay!!
@@ -185,8 +190,8 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
mq->mq_put = 0;
}
mq->mq_len++;
- if (mq->mq_len == 1) {
- nni_cond_signal(&mq->mq_readable);
+ if (mq->mq_rwait) {
+ nni_cond_broadcast(&mq->mq_readable);
}
nni_mutex_exit(&mq->mq_lock);
return (0);
@@ -197,49 +202,50 @@ static int
nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
nni_time expire, nni_signal *signal)
{
+ int rv;
+
nni_mutex_enter(&mq->mq_lock);
- while ((!mq->mq_closed) && (mq->mq_len == 0) && (*signal == 0)) {
- if (expire <= nni_clock()) {
- nni_mutex_exit(&mq->mq_lock);
- if (expire == NNI_TIME_ZERO) {
- return (NNG_EAGAIN);
- }
- return (NNG_ETIMEDOUT);
- }
- mq->mq_rwait++;
- if (mq->mq_cap == 0) {
- // If a writer is blocked, unblock him.
- nni_cond_signal(&mq->mq_writeable);
+ for (;;) {
+ // always prefer to deliver data if its there
+ if (mq->mq_len != 0) {
+ break;
}
- (void) nni_cond_waituntil(&mq->mq_readable, expire);
- mq->mq_rwait--;
- }
-
- // If there is any data left in the message queue, we will still
- // provide it, so that the reader can drain.
- if (mq->mq_len == 0) {
if (mq->mq_closed) {
- nni_cond_signal(&mq->mq_drained);
nni_mutex_exit(&mq->mq_lock);
return (NNG_ECLOSED);
}
-
+ if (expire == NNI_TIME_ZERO) {
+ nni_mutex_exit(&mq->mq_lock);
+ return (NNG_EAGAIN);
+ }
if (*signal) {
- // We are being interrupted.
nni_mutex_exit(&mq->mq_lock);
return (NNG_EINTR);
}
+ if ((mq->mq_cap == 0) & (mq->mq_wwait)) {
+ // let a write waiter know we are ready
+ nni_cond_broadcast(&mq->mq_writeable);
+ }
+ mq->mq_rwait++;
+ rv = nni_cond_waituntil(&mq->mq_readable, expire);
+ mq->mq_rwait--;
+ if (rv == NNG_ETIMEDOUT) {
+ nni_mutex_exit(&mq->mq_lock);
+ return (NNG_ETIMEDOUT);
+ }
}
+ // Readable! Yay!!
+
*msgp = mq->mq_msgs[mq->mq_get];
mq->mq_len--;
mq->mq_get++;
if (mq->mq_get == mq->mq_alloc) {
mq->mq_get = 0;
}
- if (mq->mq_len == (mq->mq_cap - 1)) {
- nni_cond_signal(&mq->mq_writeable);
+ if (mq->mq_wwait) {
+ nni_cond_broadcast(&mq->mq_writeable);
}
nni_mutex_exit(&mq->mq_lock);
return (0);
diff --git a/src/core/socket.c b/src/core/socket.c
index 91c3c03e..aabcab7c 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -61,7 +61,8 @@ nni_reaper(void *arg)
// If pipe was a connected (dialer) pipe,
// then let the endpoint know so it can try to
// reestablish the connection.
- if ((ep = pipe->p_ep) != NULL) {
+ if (((ep = pipe->p_ep) != NULL) &&
+ ((ep->ep_pipe == pipe))) {
ep->ep_pipe = NULL;
pipe->p_ep = NULL;
nni_mutex_enter(&ep->ep_mx);
@@ -318,7 +319,9 @@ nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_time expire)
if (rv != 0) {
return (rv);
}
- msg = sock->s_ops.proto_recv_filter(sock->s_data, msg);
+ if (sock->s_ops.proto_recv_filter != NULL) {
+ msg = sock->s_ops.proto_recv_filter(sock->s_data, msg);
+ }
if (msg != NULL) {
break;
}
diff --git a/src/nng.c b/src/nng.c
index fb2d0861..49671935 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -63,13 +63,13 @@ nng_recvmsg(nng_socket *s, nng_msg **msgp, int flags)
nni_time expire;
NNI_INIT_INT();
-
if ((flags == NNG_FLAG_NONBLOCK) || (s->s_rcvtimeo == 0)) {
expire = NNI_TIME_ZERO;
} else if (s->s_rcvtimeo < 0) {
expire = NNI_TIME_NEVER;
} else {
- expire = nni_clock() + s->s_rcvtimeo;
+ expire = nni_clock();
+ expire += s->s_rcvtimeo;
}
return (nni_socket_recvmsg(s, msgp, expire));
@@ -88,7 +88,8 @@ nng_sendmsg(nng_socket *s, nng_msg *msg, int flags)
} else if (s->s_sndtimeo < 0) {
expire = NNI_TIME_NEVER;
} else {
- expire = nni_clock() + s->s_sndtimeo;
+ expire = nni_clock();
+ expire += s->s_sndtimeo;
}
return (nni_socket_sendmsg(s, msg, expire));