diff options
| author | Alexander M Pickering <alex@cogarr.net> | 2025-01-12 22:45:37 -0600 |
|---|---|---|
| committer | Alexander M Pickering <alex@cogarr.net> | 2025-01-12 22:45:37 -0600 |
| commit | 90ee66a3a6aae10fd84f3f43844db55229933e37 (patch) | |
| tree | f723f918871c3296636ef2538a1a29a23050e520 /src/router.moon | |
| parent | decb72f936060a65bff18e9cbf33642ea3a71cd0 (diff) | |
| download | ggj25-90ee66a3a6aae10fd84f3f43844db55229933e37.tar.gz ggj25-90ee66a3a6aae10fd84f3f43844db55229933e37.tar.bz2 ggj25-90ee66a3a6aae10fd84f3f43844db55229933e37.zip | |
work
Diffstat (limited to 'src/router.moon')
| -rw-r--r-- | src/router.moon | 245 |
1 files changed, 245 insertions, 0 deletions
diff --git a/src/router.moon b/src/router.moon new file mode 100644 index 0000000..7ef9bfc --- /dev/null +++ b/src/router.moon @@ -0,0 +1,245 @@ +-- Network router, implements RAFT consensus with modificationes +net = require("net") +log = require("log") + +router = {} + +-- Election messages +net.register_message("Prevote",{required:{peer:"string"}}) +net.register_message("SurveyElection",{}) +net.register_message("ResponseElection",{required:{candidate: "string"}}) +net.register_message("CompleteElection",{required:{elected: "string"}}) + +-- Cluster state +net.register_message("RequestClusterInfo",{}) +net.register_message("RespondClusterInfo",{required:{ + peerlist: "table" + elected: "string" + term:"number" + prevotes: "table" +}}) + +net.register_message("RequestWorldInfo",{}) +net.register_message("RespondWorldInfo",{required:{ + entities: "table" +}}) + +--Heartbeat +net.register_message("RequestHeartbeat",{required:{n1:"number",n2:"number"}}) +net.register_message("RespondHeartbeat",{required:{n:"number",time:"number"}}) + +--Suggestion/commit, use ("peer","entity","time") as key to find messages later to commit. +-- -> peers can only suggest 1 state change per entity, per time +--Suggestions are broadcast from peers +net.register_message("Suggest",{required:{ + entity:"number" + state:"table" + peer:"string" + time:"number" +}}) +--Commits are broadcast from the elected peer +net.register_message("Commit",{required:{ + nonce: "number" + entity:"number" + peer:"string" + suggest_time:"number" + commit_time:"number" +}}) +-- When a peer misses a committed message, re-request it +net.register_message("RequestReplay",{required:{nonce: "number"}}) +net.register_message("RespondReplay",{required:{ + nonce: "number" + entity: "number" + peer: "string" + state: "table" + suggest_time: "number" + commit_time: "number" +}}) +--Rejections are also broadcast from the elected peer +net.register_message("Reject",{required:{ + nonce: "number" + entity:"number" + peer:"string" + suggest_time:"number" + reject_time:"number" +}}) +--Suggest + commit from the elected peer, so we can throw away old state. +net.register_message("Simplify",{required:{ + entity:"number" + state:"table" + time:"number" +}}) + +--Testing +net.register_message("Raw",{optional:{s:"string"}}) + +class Queue + new: () => + @queue = {} + push: (item) => + table.insert(@queue, item) + pop: () => + table.remove(@queue, 1) + +class Router + new: () => + @peerlist = {} + @routes = {} + + -- "uninitalized", "peer", "candidate", "elected" + @state = "unitialized" + + -- [peerid] = "votes for peerid" + @prevotes = {} + @term = 0 + + -- The uncommited queue + @uncommited = Queue! + + initalize: (id) => + @set_route("RequestClusterInfo",(conn, message) => + print("Requested cluster info:", message) + peerlist = {} + for peerid, _ in pairs(@peerlist) + table.insert(peerlist, peerid) + conn\send("RespondClusterInfo",{ + peerlist: peerlist + elected: @elected + term: @term + prevotes: @prevotes + }) + true + ) + peer_setup = (peer) -> + peer\on("error",(message) => + if message.data.type == "unavailable-id" + peer\replace_id! + peer_setup(peer) + return + error("Peer setup error: " .. tostring(message)) + ) + peer\on("open",(message) => + print("Peer",peer.id, "opened:",message) + peer.open = true + ) + peer = net.Peer! + @peer = peer + peer_setup(peer) + while not peer.open + coroutine.yield("Waiting for open") + net.pump! + peerlist = @peerlist + router = @ + peer\on("connection",(message) => + print("Peer",peer.id, "got connection", message) + assert(message.data.dest == peer.id) + peerlist[message.data.source] = message.data + message.data\send("Raw",{s: "Hello after conn"}) + message.data\on("data",(datamsg) => + print("Peer ",peer.id," got data:",datamsg) + router\route(message.data, datamsg.data[1], datamsg.data[2]) + print("done routing") + ) + message.data\on("error",(msg) => + error(msg) + ) + ) + if id + print("Doing id fork") + connected = false + conn = peer\connect(id) + conn\on("open", (message) => + print("Conn got open message") + connected = true + ) + while not connected + coroutine.yield("Waiting for client to connect") + net.pump! + conn\on("error", (message) => + error(message) + ) + --Assume we vote for ourselves, and our peer votes for themselves + @prevotes[peer.id] = peer.id + @prevotes[id] = id + @state = "peer" + log.info("Peer passed, I'm a peer of " .. id, {"net"}) + --while not connected + -- coroutine.yield("Give it a second to connect") + -- net.pump! + got_hello = false + conn\on("data", (message) => + print("Peer got message:", message) + got_hello = true + ) + while not got_hello + coroutine.yield("Waiting for hello") + net.pump! + @peerlist[peer.id] = peer + @peerlist[id] = conn + clusterinfo = @sync(conn, "RequestClusterInfo", {}) + print("Got cluster info:", clusterinfo) + + else + log.info("No peer passed, I'm the elected peer: " .. peer.id, {"net"}) + @state = "elected" + @elected = peer.id + @term += 1 + -- Vote for ourselves + @prevotes[peer.id] = peer.id + -- Add ourselves to the peerlist + @peerlist[peer.id] = peer + while true + coroutine.yield(peer.id) + net.pump! + + sync: (conn, msgfmt, msg) => + ret = nil + conn\on("data", (message) => + ret = message + ) + conn\send(msgfmt, msg) + while not ret + coroutine.yield("Waiting on " .. msgfmt) + net.pump! + return ret + + broadcast: (msgfmt, message) => + for peerid, conn in pairs(@peerlist) + if peerid ~= @peer.id + conn\send(msgfmt, message) + + send_elected: (msgfmt, message) => + elected_conn = @peerlist[@elected] + elected_conn\send(msgfmt, message) + + set_route: (msgfmt, callback) => + if @routes[msgfmt] + log.warn("Overwriting callback for message " .. msgfmt, {"net"}) + @routes[msgfmt] = {callback} + + listen: (msgfmt, callback, id) => + id = id or {} + @routes[msgfmt] = @routes[msgfmt] or {} + @routes[msgfmt][id] = callback + + defen: (msgfmt, id) => + assert(@routes[msgfmt]) + assert(@routes[msgfmt][id]) + @routes[msgfmt][id] = nil + + route: (conn, msgfmt, message) => + print("Got route",conn,msgfmt,message) + assert(type(msgfmt) == "string", "Message format must be a string") + assert(type(message) == "table", "Message must be a table") + if @routes[msgfmt] + routed_any = false + for _, r in pairs(@routes[msgfmt]) + routed_any = true + if r(@, conn, message) + break + if not routed_any + log.warn("No routes found for message format:" .. msgfmt, {"net"}) + else + log.warn("No message callback registered for format " .. msgfmt .. " routes are: " .. tostring(@routes), {"net"}) + +{:Router} |
