-- Network router, implements RAFT consensus with modificationes net = require("net") log = require("log") router = {} -- Election messages net.register_message("Prevote",{required:{peer:"string"}}) net.register_message("SurveyElection",{}) net.register_message("ResponseElection",{required:{candidate: "string"}}) net.register_message("CompleteElection",{required:{elected: "string"}}) -- Cluster state net.register_message("RequestClusterInfo",{}) net.register_message("RespondClusterInfo",{required:{ peerlist: "table" elected: "string" term:"number" prevotes: "table" }}) net.register_message("RequestWorldInfo",{}) net.register_message("RespondWorldInfo",{required:{ entities: "table" }}) --Heartbeat net.register_message("RequestHeartbeat",{required:{n1:"number",n2:"number"}}) net.register_message("RespondHeartbeat",{required:{n:"number",time:"number"}}) --Suggestion/commit, use ("peer","entity","time") as key to find messages later to commit. -- -> peers can only suggest 1 state change per entity, per time --Suggestions are broadcast from peers net.register_message("Suggest",{required:{ entity:"number" state:"table" peer:"string" time:"number" }}) --Commits are broadcast from the elected peer net.register_message("Commit",{required:{ nonce: "number" entity:"number" peer:"string" suggest_time:"number" commit_time:"number" }}) -- When a peer misses a committed message, re-request it net.register_message("RequestReplay",{required:{nonce: "number"}}) net.register_message("RespondReplay",{required:{ nonce: "number" entity: "number" peer: "string" state: "table" suggest_time: "number" commit_time: "number" }}) --Rejections are also broadcast from the elected peer net.register_message("Reject",{required:{ nonce: "number" entity:"number" peer:"string" suggest_time:"number" reject_time:"number" }}) --Suggest + commit from the elected peer, so we can throw away old state. net.register_message("Simplify",{required:{ entity:"number" state:"table" time:"number" }}) --Testing net.register_message("Raw",{optional:{s:"string"}}) router_singleton = nil class Queue new: () => @queue = {} push: (item) => table.insert(@queue, item) pop: () => table.remove(@queue, 1) class Router new: () => @peerlist = {} @routes = {} -- "uninitialized", "peer", "candidate", "elected" @state = "uninitialized" -- [peerid] = "votes for peerid" @nonce = 0 @prevotes = {} @term = 0 -- The uncommited queue @uncommited = Queue! router_singleton = @ initalize: (id) => @set_route("RequestClusterInfo",(conn, message) => print("Requested cluster info:", message) peerlist = {} for peerid, _ in pairs(@peerlist) table.insert(peerlist, peerid) conn\send("RespondClusterInfo",{ peerlist: peerlist elected: @elected term: @term prevotes: @prevotes }) true ) peer_setup = (peer) -> peer\on("error",(message) => if message.data.type == "unavailable-id" peer\replace_id! peer_setup(peer) return error("Peer setup error: " .. tostring(message)) ) peer\on("open",(message) => print("Peer",peer.id, "opened:",message) peer.open = true ) peer = net.Peer! @peer = peer peer_setup(peer) while not peer.open coroutine.yield("Waiting for open") net.pump! peerlist = @peerlist router = @ peer\on("connection",(message) => print("Peer",peer.id, "got connection", message) assert(message.data.dest == peer.id) peerlist[message.data.source] = message.data message.data\send("Raw",{s: "Hello after conn"}) message.data\on("data",(datamsg) => print("Peer ",peer.id," got data:",datamsg) router\route(message.data, datamsg.data[1], datamsg.data[2]) print("done routing") ) message.data\on("error",(msg) => error(msg) ) ) if id print("Doing id fork") connected = false conn = peer\connect(id) conn\on("open", (message) => print("Conn got open message") connected = true ) while not connected coroutine.yield("Waiting for client to connect") net.pump! conn\on("error", (message) => error(message) ) --Assume we vote for ourselves, and our peer votes for themselves @prevotes[peer.id] = peer.id @prevotes[id] = id @state = "peer" log.info("Peer passed, I'm a peer of " .. id, {"net"}) --while not connected -- coroutine.yield("Give it a second to connect") -- net.pump! got_hello = false conn\on("data", (message) => print("Peer got message:", message) got_hello = true ) while not got_hello coroutine.yield("Waiting for hello") net.pump! @peerlist[peer.id] = peer @peerlist[id] = conn clusterinfo = @sync(conn, "RequestClusterInfo", {}) print("Got cluster info:", clusterinfo) else log.info("No peer passed, I'm the elected peer: " .. peer.id, {"net"}) @state = "elected" @elected = peer.id @term += 1 -- Vote for ourselves @prevotes[peer.id] = peer.id -- Add ourselves to the peerlist @peerlist[peer.id] = peer while true coroutine.yield(peer.id) net.pump! sync: (conn, msgfmt, msg) => ret = nil conn\on("data", (message) => ret = message ) conn\send(msgfmt, msg) while not ret coroutine.yield("Waiting on " .. msgfmt) net.pump! return ret broadcast: (msgfmt, message) => for peerid, conn in pairs(@peerlist) if peerid ~= @peer.id conn\send(msgfmt, message) send_elected: (msgfmt, message) => elected_conn = @peerlist[@elected] elected_conn\send(msgfmt, message) set_route: (msgfmt, callback) => if @routes[msgfmt] log.warn("Overwriting callback for message " .. msgfmt, {"net"}) @routes[msgfmt] = {callback} listen: (msgfmt, callback, id) => id = id or {} @routes[msgfmt] = @routes[msgfmt] or {} @routes[msgfmt][id] = callback defen: (msgfmt, id) => assert(@routes[msgfmt]) assert(@routes[msgfmt][id]) @routes[msgfmt][id] = nil route: (conn, msgfmt, message) => print("Got route",conn,msgfmt,message) assert(type(msgfmt) == "string", "Message format must be a string") assert(type(message) == "table", "Message must be a table") if @routes[msgfmt] routed_any = false for _, r in pairs(@routes[msgfmt]) routed_any = true if r(@, conn, message) break if not routed_any log.warn("No routes found for message format:" .. msgfmt, {"net"}) else log.warn("No message callback registered for format " .. msgfmt .. " routes are: " .. tostring(@routes), {"net"}) node = am.group! node\update(() -> if router_singleton and router_singleton.state ~= "uninitalized" net.pump! coroutine.yield! ) {:Router, :node}