diff options
Diffstat (limited to 'src/supplemental/tls/tls_common.c')
| -rw-r--r-- | src/supplemental/tls/tls_common.c | 170 |
1 files changed, 150 insertions, 20 deletions
diff --git a/src/supplemental/tls/tls_common.c b/src/supplemental/tls/tls_common.c index 75b056b3..1ab687b8 100644 --- a/src/supplemental/tls/tls_common.c +++ b/src/supplemental/tls/tls_common.c @@ -15,6 +15,7 @@ #include "../../core/nng_impl.h" +#include "nng/nng.h" #include "tls_common.h" #include "tls_engine.h" @@ -42,6 +43,9 @@ static void tls_bio_recv_cb(void *arg); static void tls_do_send(nni_tls_conn *); static void tls_do_recv(nni_tls_conn *); static void tls_bio_send_start(nni_tls_conn *); +static void tls_bio_send_msg_start(nni_tls_conn *); +static void tls_bio_recv_start(nni_tls_conn *); +static void tls_bio_recv_msg_start(nni_tls_conn *); static void tls_bio_error(nni_tls_conn *, nng_err); #define nni_tls_conn_ops (nng_tls_engine_ops.conn_ops) @@ -156,17 +160,25 @@ nni_tls_peer_cert(nni_tls_conn *conn, nng_tls_cert **certp) } int -nni_tls_init(nni_tls_conn *conn, nng_tls_config *cfg) +nni_tls_init(nni_tls_conn *conn, nng_tls_config *cfg, bool msg_oriented) { nni_mtx_lock(&cfg->lock); cfg->busy = true; nni_mtx_unlock(&cfg->lock); - if (((conn->bio_send_buf = nni_zalloc(NNG_TLS_MAX_SEND_SIZE)) == - NULL) || - ((conn->bio_recv_buf = nni_zalloc(NNG_TLS_MAX_RECV_SIZE)) == - NULL)) { - return (NNG_ENOMEM); + conn->msg_oriented = msg_oriented; + if (msg_oriented) { + nni_lmq_init(&conn->bio_send_lmq, NNG_TLS_MAX_SEND_MSG_QUEUE); + nni_lmq_init(&conn->bio_recv_lmq, NNG_TLS_MAX_RECV_MSG_QUEUE); + // TODO: REMOVE + conn->bio_recv_buf = nni_zalloc(NNG_TLS_MAX_RECV_SIZE); + } else { + if (((conn->bio_send_buf = + nni_zalloc(NNG_TLS_MAX_SEND_SIZE)) == NULL) || + ((conn->bio_recv_buf = + nni_zalloc(NNG_TLS_MAX_RECV_SIZE)) == NULL)) { + return (NNG_ENOMEM); + } } conn->cfg = cfg; @@ -205,6 +217,9 @@ nni_tls_fini(nni_tls_conn *conn) if (conn->bio != NULL) { conn->bio_ops.bio_free(conn->bio); } + nni_lmq_fini(&conn->bio_recv_lmq); + nni_lmq_fini(&conn->bio_send_lmq); + nni_msg_free(conn->bio_recv_msg); nni_mtx_fini(&conn->bio_lock); nni_mtx_fini(&conn->lock); } @@ -411,7 +426,11 @@ tls_bio_send_cb(void *arg) nni_mtx_lock(&conn->bio_lock); conn->bio_send_active = false; - if ((rv = nni_aio_result(aio)) != 0) { + if ((rv = nni_aio_result(aio)) != NNG_OK) { + if (conn->msg_oriented) { + nni_msg_free(nni_aio_get_msg(aio)); + nni_aio_set_msg(aio, NULL); + } tls_bio_error(conn, rv); nni_mtx_unlock(&conn->bio_lock); @@ -419,12 +438,19 @@ tls_bio_send_cb(void *arg) return; } - count = nni_aio_count(aio); - NNI_ASSERT(count <= conn->bio_send_len); - conn->bio_send_len -= count; - conn->bio_send_tail += count; - conn->bio_send_tail %= NNG_TLS_MAX_SEND_SIZE; - tls_bio_send_start(conn); + if (conn->msg_oriented) { + nng_msg *msg = nni_aio_get_msg(aio); + nni_msg_free(msg); + nni_aio_set_msg(aio, NULL); + tls_bio_send_msg_start(conn); + } else { + count = nni_aio_count(aio); + NNI_ASSERT(count <= conn->bio_send_len); + conn->bio_send_len -= count; + conn->bio_send_tail += count; + conn->bio_send_tail %= NNG_TLS_MAX_SEND_SIZE; + tls_bio_send_start(conn); + } nni_mtx_unlock(&conn->bio_lock); nni_tls_run(conn); @@ -446,9 +472,15 @@ tls_bio_recv_cb(void *arg) return; } - NNI_ASSERT(conn->bio_recv_len == 0); - NNI_ASSERT(conn->bio_recv_off == 0); - conn->bio_recv_len = nni_aio_count(aio); + if (conn->msg_oriented) { + nng_msg *msg = nni_aio_get_msg(aio); + nni_lmq_put(&conn->bio_recv_lmq, msg); + nni_aio_set_msg(aio, NULL); + } else { + NNI_ASSERT(conn->bio_recv_len == 0); + NNI_ASSERT(conn->bio_recv_off == 0); + conn->bio_recv_len = nni_aio_count(aio); + } nni_mtx_unlock(&conn->bio_lock); nni_tls_run(conn); @@ -481,6 +513,21 @@ tls_bio_recv_start(nni_tls_conn *conn) } static void +tls_bio_recv_msg_start(nni_tls_conn *conn) +{ + if (conn->bio_recv_pend || conn->bio_closed) { + // Already have a receive in flight. + return; + } + if (nni_lmq_full(&conn->bio_recv_lmq)) { + return; + } + + conn->bio_recv_pend = true; + conn->bio_ops.bio_recv(conn->bio, &conn->bio_recv); +} + +static void tls_bio_send_start(nni_tls_conn *conn) { nni_iov iov[2]; @@ -525,7 +572,46 @@ tls_bio_send_start(nni_tls_conn *conn) conn->bio_ops.bio_send(conn->bio, &conn->bio_send); } -int +static void +tls_bio_send_msg_start(nni_tls_conn *conn) +{ + nni_msg *msg; + + if (conn->bio_send_active || conn->bio_closed) { + return; + } + if (nni_lmq_get(&conn->bio_send_lmq, &msg) == NNG_OK) { + conn->bio_send_active = true; + nni_aio_set_msg(&conn->bio_send, msg); + conn->bio_ops.bio_send(conn->bio, &conn->bio_send); + } +} + +static nng_err +tls_engine_send_msg(nni_tls_conn *conn, const uint8_t *buf, size_t *szp) +{ + nng_msg *msg; + nng_err rv; + + // move the data into a message for the queue + nni_mtx_lock(&conn->bio_lock); + if (nni_lmq_full(&conn->bio_send_lmq)) { + nni_mtx_unlock(&conn->bio_lock); + return (NNG_EAGAIN); + } + if ((rv = nni_msg_alloc(&msg, *szp)) != NNG_OK) { + nni_mtx_unlock(&conn->bio_lock); + return (rv); + } + memcpy(nni_msg_body(msg), buf, *szp); + rv = nni_lmq_put(&conn->bio_send_lmq, msg); + NNI_ASSERT(rv == NNG_OK); // we checked it already above! + tls_bio_send_msg_start(conn); + nni_mtx_unlock(&conn->bio_lock); + return (rv); +} + +nng_err nng_tls_engine_send(void *arg, const uint8_t *buf, size_t *szp) { nni_tls_conn *conn = arg; @@ -535,6 +621,9 @@ nng_tls_engine_send(void *arg, const uint8_t *buf, size_t *szp) size_t space; size_t cnt; + if (conn->msg_oriented) { + return (tls_engine_send_msg(conn, buf, szp)); + } nni_mtx_lock(&conn->bio_lock); head = conn->bio_send_head; tail = conn->bio_send_tail; @@ -576,15 +665,55 @@ nng_tls_engine_send(void *arg, const uint8_t *buf, size_t *szp) tls_bio_send_start(conn); nni_mtx_unlock(&conn->bio_lock); - return (0); + return (NNG_OK); } -int +static nng_err +tls_engine_recv_msg(nni_tls_conn *conn, uint8_t *buf, size_t *szp) +{ + nni_msg *msg; + nng_err rv; + size_t len; + nni_mtx_lock(&conn->bio_lock); + if ((conn->bio_recv_msg == NULL) && + nni_lmq_empty(&conn->bio_recv_lmq)) { + tls_bio_recv_msg_start(conn); + nni_mtx_unlock(&conn->bio_lock); + return (NNG_EAGAIN); + } + + if (conn->bio_recv_msg == NULL) { + rv = nni_lmq_get(&conn->bio_recv_lmq, &conn->bio_recv_msg); + NNI_ASSERT(rv == NNG_OK); + } + msg = conn->bio_recv_msg; + + if ((len = nni_msg_len(msg)) < *szp) { + *szp = len; + } else { + len = *szp; + } + memcpy(buf, nni_msg_body(msg), len); + nni_msg_trim(msg, len); + if (nni_msg_len(msg) == 0) { + nni_msg_free(msg); + conn->bio_recv_msg = NULL; + } + tls_bio_recv_msg_start(conn); + nni_mtx_unlock(&conn->bio_lock); + return (NNG_OK); +} + +nng_err nng_tls_engine_recv(void *arg, uint8_t *buf, size_t *szp) { nni_tls_conn *conn = arg; size_t len = *szp; + if (conn->msg_oriented) { + return (tls_engine_recv_msg(conn, buf, szp)); + } + nni_mtx_lock(&conn->bio_lock); if (conn->bio_recv_len == 0) { tls_bio_recv_start(conn); @@ -604,7 +733,7 @@ nng_tls_engine_recv(void *arg, uint8_t *buf, size_t *szp) nni_mtx_unlock(&conn->bio_lock); *szp = len; - return (0); + return (NNG_OK); } int @@ -773,6 +902,7 @@ nng_tls_config_alloc(nng_tls_config **cfg_p, nng_tls_mode mode) cfg->size = size; cfg->ref = 1; cfg->busy = false; + cfg->mode = mode; nni_mtx_init(&cfg->lock); if ((rv = nni_tls_cfg_ops->init((void *) (cfg + 1), mode)) != 0) { |
