aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-27 20:46:32 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-27 20:46:32 -0800
commit123e1439a284716c651eca037b95ba997e6c30db (patch)
treefc44cecd5fcb0e0f01532ac8e1ddf4d9499659f6
parent9c8576a3f0357e611c0892dfe1d0d4c2afe73bf2 (diff)
downloadnng-123e1439a284716c651eca037b95ba997e6c30db.tar.gz
nng-123e1439a284716c651eca037b95ba997e6c30db.tar.bz2
nng-123e1439a284716c651eca037b95ba997e6c30db.zip
Send and receive now work.
This fixes a few core issues, and improves readability for the message queue code as well. inproc delivery of messages works now.
-rw-r--r--src/core/endpt.c18
-rw-r--r--src/core/msgqueue.c70
-rw-r--r--src/core/socket.c7
-rw-r--r--src/nng.c7
-rw-r--r--tests/sock.c49
5 files changed, 107 insertions, 44 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 24d92206..bb3ba268 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -197,7 +197,10 @@ nni_dialer(void *arg)
nni_mutex_enter(&ep->ep_mx);
while (!ep->ep_close) {
// We need a different condvar...
- nni_cond_waituntil(&ep->ep_cv, cooldown);
+ rv = nni_cond_waituntil(&ep->ep_cv, cooldown);
+ if (rv == NNG_ETIMEDOUT) {
+ break;
+ }
}
nni_mutex_exit(&ep->ep_mx);
}
@@ -312,8 +315,11 @@ nni_listener(void *arg)
cooldown = 10000;
cooldown += nni_clock();
while (!ep->ep_close) {
- nni_cond_waituntil(&ep->ep_cv,
+ rv = nni_cond_waituntil(&ep->ep_cv,
cooldown);
+ if (rv == NNG_ETIMEDOUT) {
+ break;
+ }
}
}
}
@@ -338,9 +344,13 @@ nni_listener(void *arg)
// time for the system to reclaim resources.
cooldown = 100000; // 100ms
}
+ cooldown += nni_clock();
nni_mutex_enter(&ep->ep_mx);
- if (!ep->ep_close) {
- nni_cond_waituntil(&ep->ep_cv, cooldown);
+ while (!ep->ep_close) {
+ rv = nni_cond_waituntil(&ep->ep_cv, cooldown);
+ if (rv == NNG_ETIMEDOUT) {
+ break;
+ }
}
nni_mutex_exit(&ep->ep_mx);
}
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index efa0bb72..41127c50 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -26,6 +26,7 @@ struct nni_msgqueue {
int mq_put;
int mq_closed;
int mq_rwait; // readers waiting (unbuffered)
+ int mq_wwait;
nni_msg ** mq_msgs;
};
@@ -136,6 +137,8 @@ int
nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
nni_time expire, nni_signal *signal)
{
+ int rv;
+
nni_mutex_enter(&mq->mq_lock);
for (;;) {
@@ -151,7 +154,9 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
}
// unbuffered, room for one, and a reader waiting?
- if (mq->mq_rwait && (mq->mq_len == mq->mq_cap)) {
+ if (mq->mq_rwait &&
+ (mq->mq_cap == 0) &&
+ (mq->mq_len == mq->mq_cap)) {
break;
}
@@ -167,14 +172,14 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
return (NNG_EAGAIN);
}
- // timedout?
- if (expire <= nni_clock()) {
+ // not writeable, so wait until something changes
+ mq->mq_wwait++;
+ rv = nni_cond_waituntil(&mq->mq_writeable, expire);
+ mq->mq_wwait--;
+ if (rv == NNG_ETIMEDOUT) {
nni_mutex_exit(&mq->mq_lock);
return (NNG_ETIMEDOUT);
}
-
- // not writeable, so wait until something changes
- (void) nni_cond_waituntil(&mq->mq_writeable, expire);
}
// Writeable! Yay!!
@@ -185,8 +190,8 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
mq->mq_put = 0;
}
mq->mq_len++;
- if (mq->mq_len == 1) {
- nni_cond_signal(&mq->mq_readable);
+ if (mq->mq_rwait) {
+ nni_cond_broadcast(&mq->mq_readable);
}
nni_mutex_exit(&mq->mq_lock);
return (0);
@@ -197,49 +202,50 @@ static int
nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
nni_time expire, nni_signal *signal)
{
+ int rv;
+
nni_mutex_enter(&mq->mq_lock);
- while ((!mq->mq_closed) && (mq->mq_len == 0) && (*signal == 0)) {
- if (expire <= nni_clock()) {
- nni_mutex_exit(&mq->mq_lock);
- if (expire == NNI_TIME_ZERO) {
- return (NNG_EAGAIN);
- }
- return (NNG_ETIMEDOUT);
- }
- mq->mq_rwait++;
- if (mq->mq_cap == 0) {
- // If a writer is blocked, unblock him.
- nni_cond_signal(&mq->mq_writeable);
+ for (;;) {
+ // always prefer to deliver data if its there
+ if (mq->mq_len != 0) {
+ break;
}
- (void) nni_cond_waituntil(&mq->mq_readable, expire);
- mq->mq_rwait--;
- }
-
- // If there is any data left in the message queue, we will still
- // provide it, so that the reader can drain.
- if (mq->mq_len == 0) {
if (mq->mq_closed) {
- nni_cond_signal(&mq->mq_drained);
nni_mutex_exit(&mq->mq_lock);
return (NNG_ECLOSED);
}
-
+ if (expire == NNI_TIME_ZERO) {
+ nni_mutex_exit(&mq->mq_lock);
+ return (NNG_EAGAIN);
+ }
if (*signal) {
- // We are being interrupted.
nni_mutex_exit(&mq->mq_lock);
return (NNG_EINTR);
}
+ if ((mq->mq_cap == 0) & (mq->mq_wwait)) {
+ // let a write waiter know we are ready
+ nni_cond_broadcast(&mq->mq_writeable);
+ }
+ mq->mq_rwait++;
+ rv = nni_cond_waituntil(&mq->mq_readable, expire);
+ mq->mq_rwait--;
+ if (rv == NNG_ETIMEDOUT) {
+ nni_mutex_exit(&mq->mq_lock);
+ return (NNG_ETIMEDOUT);
+ }
}
+ // Readable! Yay!!
+
*msgp = mq->mq_msgs[mq->mq_get];
mq->mq_len--;
mq->mq_get++;
if (mq->mq_get == mq->mq_alloc) {
mq->mq_get = 0;
}
- if (mq->mq_len == (mq->mq_cap - 1)) {
- nni_cond_signal(&mq->mq_writeable);
+ if (mq->mq_wwait) {
+ nni_cond_broadcast(&mq->mq_writeable);
}
nni_mutex_exit(&mq->mq_lock);
return (0);
diff --git a/src/core/socket.c b/src/core/socket.c
index 91c3c03e..aabcab7c 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -61,7 +61,8 @@ nni_reaper(void *arg)
// If pipe was a connected (dialer) pipe,
// then let the endpoint know so it can try to
// reestablish the connection.
- if ((ep = pipe->p_ep) != NULL) {
+ if (((ep = pipe->p_ep) != NULL) &&
+ ((ep->ep_pipe == pipe))) {
ep->ep_pipe = NULL;
pipe->p_ep = NULL;
nni_mutex_enter(&ep->ep_mx);
@@ -318,7 +319,9 @@ nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_time expire)
if (rv != 0) {
return (rv);
}
- msg = sock->s_ops.proto_recv_filter(sock->s_data, msg);
+ if (sock->s_ops.proto_recv_filter != NULL) {
+ msg = sock->s_ops.proto_recv_filter(sock->s_data, msg);
+ }
if (msg != NULL) {
break;
}
diff --git a/src/nng.c b/src/nng.c
index fb2d0861..49671935 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -63,13 +63,13 @@ nng_recvmsg(nng_socket *s, nng_msg **msgp, int flags)
nni_time expire;
NNI_INIT_INT();
-
if ((flags == NNG_FLAG_NONBLOCK) || (s->s_rcvtimeo == 0)) {
expire = NNI_TIME_ZERO;
} else if (s->s_rcvtimeo < 0) {
expire = NNI_TIME_NEVER;
} else {
- expire = nni_clock() + s->s_rcvtimeo;
+ expire = nni_clock();
+ expire += s->s_rcvtimeo;
}
return (nni_socket_recvmsg(s, msgp, expire));
@@ -88,7 +88,8 @@ nng_sendmsg(nng_socket *s, nng_msg *msg, int flags)
} else if (s->s_sndtimeo < 0) {
expire = NNI_TIME_NEVER;
} else {
- expire = nni_clock() + s->s_sndtimeo;
+ expire = nni_clock();
+ expire += s->s_sndtimeo;
}
return (nni_socket_sendmsg(s, msg, expire));
diff --git a/tests/sock.c b/tests/sock.c
index 7ba4b3c4..929e73c8 100644
--- a/tests/sock.c
+++ b/tests/sock.c
@@ -10,6 +10,8 @@
#include "convey.h"
#include "nng.h"
+#include <string.h>
+
TestMain("Socket Operations", {
Convey("We are able to open a PAIR socket", {
int rv;
@@ -47,8 +49,8 @@ TestMain("Socket Operations", {
rv = nng_recvmsg(sock, &msg, 0);
So(rv == NNG_ETIMEDOUT);
So(msg == NULL);
- So(nni_clock() > (now + 500000));
- So(nni_clock() < (now + 1000000));
+ So(nni_clock() >= (now + when));
+ So(nni_clock() < (now + (when * 2)));
})
Convey("Recv nonblock with no pipes gives EAGAIN", {
@@ -143,6 +145,11 @@ TestMain("Socket Operations", {
Convey("We can send and receive messages", {
nng_socket *sock2 = NULL;
int len = 1;
+ nng_msg *msg;
+ size_t sz;
+ char *ptr;
+ uint64_t second = 1000000;
+
rv = nng_open(&sock2, NNG_PROTO_PAIR);
So(rv == 0);
@@ -155,6 +162,42 @@ TestMain("Socket Operations", {
So(rv == 0);
rv = nng_setopt(sock2, NNG_OPT_SNDBUF, &len, sizeof (len));
So(rv == 0);
+
+ rv = nng_setopt(sock, NNG_OPT_SNDTIMEO, &second, sizeof (second));
+ So(rv == 0);
+ rv = nng_setopt(sock, NNG_OPT_RCVTIMEO, &second, sizeof (second));
+ So(rv == 0);
+ rv = nng_setopt(sock2, NNG_OPT_SNDTIMEO, &second, sizeof (second));
+ So(rv == 0);
+ rv = nng_setopt(sock2, NNG_OPT_RCVTIMEO, &second, sizeof (second));
+ So(rv == 0);
+
+ rv = nng_listen(sock, "inproc://test1", NULL, NNG_FLAG_SYNCH);
+ So(rv == 0);
+ rv = nng_dial(sock2, "inproc://test1", NULL, 0);
+ So(rv == 0);
+
+ rv = nng_msg_alloc(&msg, 3);
+ So(rv == 0);
+ ptr = nng_msg_body(msg, &sz);
+ So(ptr != NULL);
+ So(sz == 3);
+
+ memcpy(ptr, "abc", 3);
+
+ rv = nng_sendmsg(sock, msg, 0);
+ So(rv == 0);
+
+ msg = NULL;
+ rv = nng_recvmsg(sock2, &msg, 0);
+ So(rv == 0);
+ So(msg != NULL);
+ ptr = nng_msg_body(msg, &sz);
+ So(ptr != NULL);
+ So(sz == 3);
+ So(memcmp(ptr, "abc", 3) == 0);
+ nng_msg_free(msg);
+ nng_close(sock2);
})
})
-}) \ No newline at end of file
+})