aboutsummaryrefslogtreecommitdiff
path: root/src/supplemental/tls/tls_common.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/supplemental/tls/tls_common.c')
-rw-r--r--src/supplemental/tls/tls_common.c170
1 files changed, 150 insertions, 20 deletions
diff --git a/src/supplemental/tls/tls_common.c b/src/supplemental/tls/tls_common.c
index 75b056b3..1ab687b8 100644
--- a/src/supplemental/tls/tls_common.c
+++ b/src/supplemental/tls/tls_common.c
@@ -15,6 +15,7 @@
#include "../../core/nng_impl.h"
+#include "nng/nng.h"
#include "tls_common.h"
#include "tls_engine.h"
@@ -42,6 +43,9 @@ static void tls_bio_recv_cb(void *arg);
static void tls_do_send(nni_tls_conn *);
static void tls_do_recv(nni_tls_conn *);
static void tls_bio_send_start(nni_tls_conn *);
+static void tls_bio_send_msg_start(nni_tls_conn *);
+static void tls_bio_recv_start(nni_tls_conn *);
+static void tls_bio_recv_msg_start(nni_tls_conn *);
static void tls_bio_error(nni_tls_conn *, nng_err);
#define nni_tls_conn_ops (nng_tls_engine_ops.conn_ops)
@@ -156,17 +160,25 @@ nni_tls_peer_cert(nni_tls_conn *conn, nng_tls_cert **certp)
}
int
-nni_tls_init(nni_tls_conn *conn, nng_tls_config *cfg)
+nni_tls_init(nni_tls_conn *conn, nng_tls_config *cfg, bool msg_oriented)
{
nni_mtx_lock(&cfg->lock);
cfg->busy = true;
nni_mtx_unlock(&cfg->lock);
- if (((conn->bio_send_buf = nni_zalloc(NNG_TLS_MAX_SEND_SIZE)) ==
- NULL) ||
- ((conn->bio_recv_buf = nni_zalloc(NNG_TLS_MAX_RECV_SIZE)) ==
- NULL)) {
- return (NNG_ENOMEM);
+ conn->msg_oriented = msg_oriented;
+ if (msg_oriented) {
+ nni_lmq_init(&conn->bio_send_lmq, NNG_TLS_MAX_SEND_MSG_QUEUE);
+ nni_lmq_init(&conn->bio_recv_lmq, NNG_TLS_MAX_RECV_MSG_QUEUE);
+ // TODO: REMOVE
+ conn->bio_recv_buf = nni_zalloc(NNG_TLS_MAX_RECV_SIZE);
+ } else {
+ if (((conn->bio_send_buf =
+ nni_zalloc(NNG_TLS_MAX_SEND_SIZE)) == NULL) ||
+ ((conn->bio_recv_buf =
+ nni_zalloc(NNG_TLS_MAX_RECV_SIZE)) == NULL)) {
+ return (NNG_ENOMEM);
+ }
}
conn->cfg = cfg;
@@ -205,6 +217,9 @@ nni_tls_fini(nni_tls_conn *conn)
if (conn->bio != NULL) {
conn->bio_ops.bio_free(conn->bio);
}
+ nni_lmq_fini(&conn->bio_recv_lmq);
+ nni_lmq_fini(&conn->bio_send_lmq);
+ nni_msg_free(conn->bio_recv_msg);
nni_mtx_fini(&conn->bio_lock);
nni_mtx_fini(&conn->lock);
}
@@ -411,7 +426,11 @@ tls_bio_send_cb(void *arg)
nni_mtx_lock(&conn->bio_lock);
conn->bio_send_active = false;
- if ((rv = nni_aio_result(aio)) != 0) {
+ if ((rv = nni_aio_result(aio)) != NNG_OK) {
+ if (conn->msg_oriented) {
+ nni_msg_free(nni_aio_get_msg(aio));
+ nni_aio_set_msg(aio, NULL);
+ }
tls_bio_error(conn, rv);
nni_mtx_unlock(&conn->bio_lock);
@@ -419,12 +438,19 @@ tls_bio_send_cb(void *arg)
return;
}
- count = nni_aio_count(aio);
- NNI_ASSERT(count <= conn->bio_send_len);
- conn->bio_send_len -= count;
- conn->bio_send_tail += count;
- conn->bio_send_tail %= NNG_TLS_MAX_SEND_SIZE;
- tls_bio_send_start(conn);
+ if (conn->msg_oriented) {
+ nng_msg *msg = nni_aio_get_msg(aio);
+ nni_msg_free(msg);
+ nni_aio_set_msg(aio, NULL);
+ tls_bio_send_msg_start(conn);
+ } else {
+ count = nni_aio_count(aio);
+ NNI_ASSERT(count <= conn->bio_send_len);
+ conn->bio_send_len -= count;
+ conn->bio_send_tail += count;
+ conn->bio_send_tail %= NNG_TLS_MAX_SEND_SIZE;
+ tls_bio_send_start(conn);
+ }
nni_mtx_unlock(&conn->bio_lock);
nni_tls_run(conn);
@@ -446,9 +472,15 @@ tls_bio_recv_cb(void *arg)
return;
}
- NNI_ASSERT(conn->bio_recv_len == 0);
- NNI_ASSERT(conn->bio_recv_off == 0);
- conn->bio_recv_len = nni_aio_count(aio);
+ if (conn->msg_oriented) {
+ nng_msg *msg = nni_aio_get_msg(aio);
+ nni_lmq_put(&conn->bio_recv_lmq, msg);
+ nni_aio_set_msg(aio, NULL);
+ } else {
+ NNI_ASSERT(conn->bio_recv_len == 0);
+ NNI_ASSERT(conn->bio_recv_off == 0);
+ conn->bio_recv_len = nni_aio_count(aio);
+ }
nni_mtx_unlock(&conn->bio_lock);
nni_tls_run(conn);
@@ -481,6 +513,21 @@ tls_bio_recv_start(nni_tls_conn *conn)
}
static void
+tls_bio_recv_msg_start(nni_tls_conn *conn)
+{
+ if (conn->bio_recv_pend || conn->bio_closed) {
+ // Already have a receive in flight.
+ return;
+ }
+ if (nni_lmq_full(&conn->bio_recv_lmq)) {
+ return;
+ }
+
+ conn->bio_recv_pend = true;
+ conn->bio_ops.bio_recv(conn->bio, &conn->bio_recv);
+}
+
+static void
tls_bio_send_start(nni_tls_conn *conn)
{
nni_iov iov[2];
@@ -525,7 +572,46 @@ tls_bio_send_start(nni_tls_conn *conn)
conn->bio_ops.bio_send(conn->bio, &conn->bio_send);
}
-int
+static void
+tls_bio_send_msg_start(nni_tls_conn *conn)
+{
+ nni_msg *msg;
+
+ if (conn->bio_send_active || conn->bio_closed) {
+ return;
+ }
+ if (nni_lmq_get(&conn->bio_send_lmq, &msg) == NNG_OK) {
+ conn->bio_send_active = true;
+ nni_aio_set_msg(&conn->bio_send, msg);
+ conn->bio_ops.bio_send(conn->bio, &conn->bio_send);
+ }
+}
+
+static nng_err
+tls_engine_send_msg(nni_tls_conn *conn, const uint8_t *buf, size_t *szp)
+{
+ nng_msg *msg;
+ nng_err rv;
+
+ // move the data into a message for the queue
+ nni_mtx_lock(&conn->bio_lock);
+ if (nni_lmq_full(&conn->bio_send_lmq)) {
+ nni_mtx_unlock(&conn->bio_lock);
+ return (NNG_EAGAIN);
+ }
+ if ((rv = nni_msg_alloc(&msg, *szp)) != NNG_OK) {
+ nni_mtx_unlock(&conn->bio_lock);
+ return (rv);
+ }
+ memcpy(nni_msg_body(msg), buf, *szp);
+ rv = nni_lmq_put(&conn->bio_send_lmq, msg);
+ NNI_ASSERT(rv == NNG_OK); // we checked it already above!
+ tls_bio_send_msg_start(conn);
+ nni_mtx_unlock(&conn->bio_lock);
+ return (rv);
+}
+
+nng_err
nng_tls_engine_send(void *arg, const uint8_t *buf, size_t *szp)
{
nni_tls_conn *conn = arg;
@@ -535,6 +621,9 @@ nng_tls_engine_send(void *arg, const uint8_t *buf, size_t *szp)
size_t space;
size_t cnt;
+ if (conn->msg_oriented) {
+ return (tls_engine_send_msg(conn, buf, szp));
+ }
nni_mtx_lock(&conn->bio_lock);
head = conn->bio_send_head;
tail = conn->bio_send_tail;
@@ -576,15 +665,55 @@ nng_tls_engine_send(void *arg, const uint8_t *buf, size_t *szp)
tls_bio_send_start(conn);
nni_mtx_unlock(&conn->bio_lock);
- return (0);
+ return (NNG_OK);
}
-int
+static nng_err
+tls_engine_recv_msg(nni_tls_conn *conn, uint8_t *buf, size_t *szp)
+{
+ nni_msg *msg;
+ nng_err rv;
+ size_t len;
+ nni_mtx_lock(&conn->bio_lock);
+ if ((conn->bio_recv_msg == NULL) &&
+ nni_lmq_empty(&conn->bio_recv_lmq)) {
+ tls_bio_recv_msg_start(conn);
+ nni_mtx_unlock(&conn->bio_lock);
+ return (NNG_EAGAIN);
+ }
+
+ if (conn->bio_recv_msg == NULL) {
+ rv = nni_lmq_get(&conn->bio_recv_lmq, &conn->bio_recv_msg);
+ NNI_ASSERT(rv == NNG_OK);
+ }
+ msg = conn->bio_recv_msg;
+
+ if ((len = nni_msg_len(msg)) < *szp) {
+ *szp = len;
+ } else {
+ len = *szp;
+ }
+ memcpy(buf, nni_msg_body(msg), len);
+ nni_msg_trim(msg, len);
+ if (nni_msg_len(msg) == 0) {
+ nni_msg_free(msg);
+ conn->bio_recv_msg = NULL;
+ }
+ tls_bio_recv_msg_start(conn);
+ nni_mtx_unlock(&conn->bio_lock);
+ return (NNG_OK);
+}
+
+nng_err
nng_tls_engine_recv(void *arg, uint8_t *buf, size_t *szp)
{
nni_tls_conn *conn = arg;
size_t len = *szp;
+ if (conn->msg_oriented) {
+ return (tls_engine_recv_msg(conn, buf, szp));
+ }
+
nni_mtx_lock(&conn->bio_lock);
if (conn->bio_recv_len == 0) {
tls_bio_recv_start(conn);
@@ -604,7 +733,7 @@ nng_tls_engine_recv(void *arg, uint8_t *buf, size_t *szp)
nni_mtx_unlock(&conn->bio_lock);
*szp = len;
- return (0);
+ return (NNG_OK);
}
int
@@ -773,6 +902,7 @@ nng_tls_config_alloc(nng_tls_config **cfg_p, nng_tls_mode mode)
cfg->size = size;
cfg->ref = 1;
cfg->busy = false;
+ cfg->mode = mode;
nni_mtx_init(&cfg->lock);
if ((rv = nni_tls_cfg_ops->init((void *) (cfg + 1), mode)) != 0) {