aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/survey
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-10-20 17:03:12 -0700
committerGarrett D'Amore <garrett@damore.org>2017-10-23 16:14:53 -0700
commit3585000ca027740dbdb4599f4991cd2bf562e2f2 (patch)
treea45b4c1bcc2d11777dde0e38d4b742d121d55e45 /src/protocol/survey
parentfdb73b69a887d868f8e976ef8a990a5d7f6687f9 (diff)
downloadnng-3585000ca027740dbdb4599f4991cd2bf562e2f2.tar.gz
nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.tar.bz2
nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.zip
fixes #112 Need to move some stuff from socket to message queues
Diffstat (limited to 'src/protocol/survey')
-rw-r--r--src/protocol/survey/respond.c66
-rw-r--r--src/protocol/survey/survey.c64
2 files changed, 87 insertions, 43 deletions
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index fcb067b0..4c3ea8e3 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -77,6 +77,7 @@ resp_sock_init(void **sp, nni_sock *nsock)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
+ nni_mtx_init(&s->mtx);
if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
((rv = nni_aio_init(&s->aio_getq, resp_sock_getq_cb, s)) != 0)) {
resp_sock_fini(s);
@@ -91,10 +92,7 @@ resp_sock_init(void **sp, nni_sock *nsock)
s->urq = nni_sock_recvq(nsock);
s->uwq = nni_sock_sendq(nsock);
- nni_mtx_init(&s->mtx);
-
*sp = s;
- nni_sock_senderr(nsock, NNG_ESTATE);
return (0);
}
@@ -349,9 +347,9 @@ resp_sock_setopt_raw(void *arg, const void *buf, size_t sz)
resp_sock *s = arg;
int rv;
- if ((rv = nni_setopt_int(&s->raw, buf, sz, 0, 1)) == 0) {
- nni_sock_senderr(s->nsock, s->raw ? 0 : NNG_ESTATE);
- }
+ nni_mtx_lock(&s->mtx);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
+ nni_mtx_unlock(&s->mtx);
return (rv);
}
@@ -376,54 +374,62 @@ resp_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
return (nni_getopt_int(s->ttl, buf, szp));
}
-static nni_msg *
-resp_sock_sfilter(void *arg, nni_msg *msg)
+static void
+resp_sock_send(void *arg, nni_aio *aio)
{
resp_sock *s = arg;
+ nni_msg * msg;
+ int rv;
+ nni_mtx_lock(&s->mtx);
if (s->raw) {
- return (msg);
+ nni_mtx_unlock(&s->mtx);
+ nni_sock_send_pending(s->nsock);
+ nni_msgq_aio_put(s->uwq, aio);
+ return;
}
- // Cannot send again until a receive is done...
- nni_sock_senderr(s->nsock, NNG_ESTATE);
+ msg = nni_aio_get_msg(aio);
// If we have a stored backtrace, append it to the header...
// if we don't have a backtrace, discard the message.
if (s->btrace == NULL) {
- nni_msg_free(msg);
- return (NULL);
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
}
// drop anything else in the header...
nni_msg_header_clear(msg);
- if (nni_msg_header_append(msg, s->btrace, s->btrace_len) != 0) {
- nni_free(s->btrace, s->btrace_len);
- s->btrace = NULL;
- s->btrace_len = 0;
- nni_msg_free(msg);
- return (NULL);
+ if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
}
nni_free(s->btrace, s->btrace_len);
s->btrace = NULL;
s->btrace_len = 0;
- return (msg);
+
+ nni_mtx_unlock(&s->mtx);
+ nni_sock_send_pending(s->nsock);
+ nni_msgq_aio_put(s->uwq, aio);
}
static nni_msg *
-resp_sock_rfilter(void *arg, nni_msg *msg)
+resp_sock_filter(void *arg, nni_msg *msg)
{
resp_sock *s = arg;
char * header;
size_t len;
+ nni_mtx_lock(&s->mtx);
if (s->raw) {
+ nni_mtx_unlock(&s->mtx);
return (msg);
}
- nni_sock_senderr(s->nsock, 0);
len = nni_msg_header_len(msg);
header = nni_msg_header(msg);
if (s->btrace != NULL) {
@@ -432,15 +438,26 @@ resp_sock_rfilter(void *arg, nni_msg *msg)
s->btrace_len = 0;
}
if ((s->btrace = nni_alloc(len)) == NULL) {
+ nni_mtx_unlock(&s->mtx);
nni_msg_free(msg);
return (NULL);
}
s->btrace_len = len;
memcpy(s->btrace, header, len);
nni_msg_header_clear(msg);
+ nni_mtx_unlock(&s->mtx);
return (msg);
}
+static void
+resp_sock_recv(void *arg, nni_aio *aio)
+{
+ resp_sock *s = arg;
+
+ nni_sock_recv_pending(s->nsock);
+ nni_msgq_aio_get(s->urq, aio);
+}
+
static nni_proto_pipe_ops resp_pipe_ops = {
.pipe_init = resp_pipe_init,
.pipe_fini = resp_pipe_fini,
@@ -468,9 +485,10 @@ static nni_proto_sock_ops resp_sock_ops = {
.sock_fini = resp_sock_fini,
.sock_open = resp_sock_open,
.sock_close = resp_sock_close,
+ .sock_filter = resp_sock_filter,
+ .sock_send = resp_sock_send,
+ .sock_recv = resp_sock_recv,
.sock_options = resp_sock_options,
- .sock_rfilter = resp_sock_rfilter,
- .sock_sfilter = resp_sock_sfilter,
};
static nni_proto resp_proto = {
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index 1205402e..1c15054f 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -92,7 +92,6 @@ surv_sock_init(void **sp, nni_sock *nsock)
s->urq = nni_sock_recvq(nsock);
*sp = s;
- nni_sock_recverr(nsock, NNG_ESTATE);
return (0);
}
@@ -273,11 +272,12 @@ surv_sock_setopt_raw(void *arg, const void *buf, size_t sz)
surv_sock *s = arg;
int rv;
+ nni_mtx_lock(&s->mtx);
if ((rv = nni_setopt_int(&s->raw, buf, sz, 0, 1)) == 0) {
- nni_sock_recverr(s->nsock, s->raw ? 0 : NNG_ESTATE);
s->survid = 0;
nni_timer_cancel(&s->timer);
}
+ nni_mtx_unlock(&s->mtx);
return (rv);
}
@@ -344,22 +344,43 @@ surv_timeout(void *arg)
{
surv_sock *s = arg;
- nni_sock_lock(s->nsock);
+ nni_mtx_lock(&s->mtx);
s->survid = 0;
- nni_sock_recverr(s->nsock, NNG_ESTATE);
+ nni_mtx_unlock(&s->mtx);
nni_msgq_set_get_error(s->urq, NNG_ETIMEDOUT);
- nni_sock_unlock(s->nsock);
}
-static nni_msg *
-surv_sock_sfilter(void *arg, nni_msg *msg)
+static void
+surv_sock_recv(void *arg, nni_aio *aio)
+{
+ surv_sock *s = arg;
+
+ nni_mtx_lock(&s->mtx);
+ if (s->survid == 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ nni_mtx_unlock(&s->mtx);
+ nni_sock_recv_pending(s->nsock);
+ nni_msgq_aio_get(s->urq, aio);
+}
+
+static void
+surv_sock_send(void *arg, nni_aio *aio)
{
surv_sock *s = arg;
+ nni_msg * msg;
+ int rv;
+ nni_mtx_lock(&s->mtx);
if (s->raw) {
// No automatic retry, and the request ID must
// be in the header coming down.
- return (msg);
+ nni_mtx_unlock(&s->mtx);
+ nni_sock_send_pending(s->nsock);
+ nni_msgq_aio_put(s->uwq, aio);
+ return;
}
// Generate a new request ID. We always set the high
@@ -367,10 +388,12 @@ surv_sock_sfilter(void *arg, nni_msg *msg)
// backtrace. (Pipe IDs have the high order bit clear.)
s->survid = (s->nextid++) | 0x80000000u;
- if (nni_msg_header_append_u32(msg, s->survid) != 0) {
- // Should be ENOMEM.
- nni_msg_free(msg);
- return (NULL);
+ msg = nni_aio_get_msg(aio);
+ nni_msg_header_clear(msg);
+ if ((rv = nni_msg_header_append_u32(msg, s->survid)) != 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
}
// If another message is there, this cancels it. We move the
@@ -379,20 +402,21 @@ surv_sock_sfilter(void *arg, nni_msg *msg)
s->expire = nni_clock() + s->survtime;
nni_timer_schedule(&s->timer, s->expire);
- // Clear the error condition.
- nni_sock_recverr(s->nsock, 0);
- // nni_msgq_set_get_error(nni_sock_recvq(psock->nsock), 0);
+ nni_mtx_unlock(&s->mtx);
- return (msg);
+ nni_sock_send_pending(s->nsock);
+ nni_msgq_aio_put(s->uwq, aio);
}
static nni_msg *
-surv_sock_rfilter(void *arg, nni_msg *msg)
+surv_sock_filter(void *arg, nni_msg *msg)
{
surv_sock *s = arg;
+ nni_mtx_lock(&s->mtx);
if (s->raw) {
// Pass it unmolested
+ nni_mtx_unlock(&s->mtx);
return (msg);
}
@@ -402,6 +426,7 @@ surv_sock_rfilter(void *arg, nni_msg *msg)
nni_msg_free(msg);
return (NULL);
}
+ nni_mtx_unlock(&s->mtx);
return (msg);
}
@@ -433,9 +458,10 @@ static nni_proto_sock_ops surv_sock_ops = {
.sock_fini = surv_sock_fini,
.sock_open = surv_sock_open,
.sock_close = surv_sock_close,
+ .sock_send = surv_sock_send,
+ .sock_recv = surv_sock_recv,
+ .sock_filter = surv_sock_filter,
.sock_options = surv_sock_options,
- .sock_rfilter = surv_sock_rfilter,
- .sock_sfilter = surv_sock_sfilter,
};
static nni_proto surv_proto = {