summaryrefslogtreecommitdiff
path: root/src/protocol/reqrep/rep.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/reqrep/rep.c')
-rw-r--r--src/protocol/reqrep/rep.c109
1 files changed, 65 insertions, 44 deletions
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index cd9411d9..510beab3 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -32,6 +32,7 @@ struct rep_sock {
nni_sock * sock;
nni_msgq * uwq;
nni_msgq * urq;
+ nni_mtx lk;
int raw;
int ttl;
nni_idhash *pipes;
@@ -62,6 +63,7 @@ rep_sock_fini(void *arg)
if (s->btrace != NULL) {
nni_free(s->btrace, s->btrace_len);
}
+ nni_mtx_fini(&s->lk);
NNI_FREE_STRUCT(s);
}
@@ -74,6 +76,7 @@ rep_sock_init(void **sp, nni_sock *sock)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
+ nni_mtx_init(&s->lk);
if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
((rv = nni_aio_init(&s->aio_getq, rep_sock_getq_cb, s)) != 0)) {
rep_sock_fini(s);
@@ -89,7 +92,6 @@ rep_sock_init(void **sp, nni_sock *sock)
s->urq = nni_sock_recvq(sock);
*sp = s;
- nni_sock_senderr(sock, NNG_ESTATE);
return (0);
}
@@ -215,6 +217,7 @@ rep_sock_getq_cb(void *arg)
// Look for the pipe, and attempt to put the message there
// (nonblocking) if we can. If we can't for any reason, then we
// free the message.
+ // XXX: LOCKING?!?!
if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) {
rv = nni_msgq_tryput(p->sendq, msg);
}
@@ -347,10 +350,10 @@ rep_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
rep_sock *s = arg;
int rv;
+
+ nni_mtx_lock(&s->lk);
rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
- if (rv == 0) {
- nni_sock_senderr(s->sock, s->raw ? 0 : NNG_ESTATE);
- }
+ nni_mtx_unlock(&s->lk);
return (rv);
}
@@ -376,53 +379,18 @@ rep_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
}
static nni_msg *
-rep_sock_sfilter(void *arg, nni_msg *msg)
-{
- rep_sock *s = arg;
-
- if (s->raw) {
- return (msg);
- }
-
- // Cannot send again until a receive is done...
- nni_sock_senderr(s->sock, NNG_ESTATE);
-
- // If we have a stored backtrace, append it to the header...
- // if we don't have a backtrace, discard the message.
- if (s->btrace == NULL) {
- nni_msg_free(msg);
- return (NULL);
- }
-
- // drop anything else in the header...
- nni_msg_header_clear(msg);
-
- if (nni_msg_header_append(msg, s->btrace, s->btrace_len) != 0) {
- nni_free(s->btrace, s->btrace_len);
- s->btrace = NULL;
- s->btrace_len = 0;
- nni_msg_free(msg);
- return (NULL);
- }
-
- nni_free(s->btrace, s->btrace_len);
- s->btrace = NULL;
- s->btrace_len = 0;
- return (msg);
-}
-
-static nni_msg *
-rep_sock_rfilter(void *arg, nni_msg *msg)
+rep_sock_filter(void *arg, nni_msg *msg)
{
rep_sock *s = arg;
char * header;
size_t len;
+ nni_mtx_lock(&s->lk);
if (s->raw) {
+ nni_mtx_unlock(&s->lk);
return (msg);
}
- nni_sock_senderr(s->sock, 0);
len = nni_msg_header_len(msg);
header = nni_msg_header(msg);
if (s->btrace != NULL) {
@@ -437,9 +405,61 @@ rep_sock_rfilter(void *arg, nni_msg *msg)
s->btrace_len = len;
memcpy(s->btrace, header, len);
nni_msg_header_clear(msg);
+ nni_mtx_unlock(&s->lk);
return (msg);
}
+static void
+rep_sock_send(void *arg, nni_aio *aio)
+{
+ rep_sock *s = arg;
+ int rv;
+ nni_msg * msg;
+
+ nni_mtx_lock(&s->lk);
+ if (s->raw) {
+ // Pass thru
+ nni_mtx_unlock(&s->lk);
+ nni_sock_send_pending(s->sock);
+ nni_msgq_aio_put(s->uwq, aio);
+ return;
+ }
+ if (s->btrace == NULL) {
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+
+ msg = nni_aio_get_msg(aio);
+
+ // drop anything else in the header... (it should already be
+ // empty, but there can be stale backtrace info there.)
+ nni_msg_header_clear(msg);
+
+ if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) {
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ nni_free(s->btrace, s->btrace_len);
+ s->btrace = NULL;
+ s->btrace_len = 0;
+
+ nni_mtx_unlock(&s->lk);
+ nni_sock_send_pending(s->sock);
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
+static void
+rep_sock_recv(void *arg, nni_aio *aio)
+{
+ rep_sock *s = arg;
+
+ nni_sock_recv_pending(s->sock);
+ nni_msgq_aio_get(s->urq, aio);
+}
+
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops rep_pipe_ops = {
@@ -470,8 +490,9 @@ static nni_proto_sock_ops rep_sock_ops = {
.sock_open = rep_sock_open,
.sock_close = rep_sock_close,
.sock_options = rep_sock_options,
- .sock_rfilter = rep_sock_rfilter,
- .sock_sfilter = rep_sock_sfilter,
+ .sock_filter = rep_sock_filter,
+ .sock_send = rep_sock_send,
+ .sock_recv = rep_sock_recv,
};
static nni_proto nni_rep_proto = {