summaryrefslogtreecommitdiff
path: root/src/router.moon
diff options
context:
space:
mode:
authorAlexander M Pickering <alex@cogarr.net>2025-01-12 22:45:37 -0600
committerAlexander M Pickering <alex@cogarr.net>2025-01-12 22:45:37 -0600
commit90ee66a3a6aae10fd84f3f43844db55229933e37 (patch)
treef723f918871c3296636ef2538a1a29a23050e520 /src/router.moon
parentdecb72f936060a65bff18e9cbf33642ea3a71cd0 (diff)
downloadggj25-90ee66a3a6aae10fd84f3f43844db55229933e37.tar.gz
ggj25-90ee66a3a6aae10fd84f3f43844db55229933e37.tar.bz2
ggj25-90ee66a3a6aae10fd84f3f43844db55229933e37.zip
work
Diffstat (limited to 'src/router.moon')
-rw-r--r--src/router.moon245
1 files changed, 245 insertions, 0 deletions
diff --git a/src/router.moon b/src/router.moon
new file mode 100644
index 0000000..7ef9bfc
--- /dev/null
+++ b/src/router.moon
@@ -0,0 +1,245 @@
+-- Network router, implements RAFT consensus with modificationes
+net = require("net")
+log = require("log")
+
+router = {}
+
+-- Election messages
+net.register_message("Prevote",{required:{peer:"string"}})
+net.register_message("SurveyElection",{})
+net.register_message("ResponseElection",{required:{candidate: "string"}})
+net.register_message("CompleteElection",{required:{elected: "string"}})
+
+-- Cluster state
+net.register_message("RequestClusterInfo",{})
+net.register_message("RespondClusterInfo",{required:{
+ peerlist: "table"
+ elected: "string"
+ term:"number"
+ prevotes: "table"
+}})
+
+net.register_message("RequestWorldInfo",{})
+net.register_message("RespondWorldInfo",{required:{
+ entities: "table"
+}})
+
+--Heartbeat
+net.register_message("RequestHeartbeat",{required:{n1:"number",n2:"number"}})
+net.register_message("RespondHeartbeat",{required:{n:"number",time:"number"}})
+
+--Suggestion/commit, use ("peer","entity","time") as key to find messages later to commit.
+-- -> peers can only suggest 1 state change per entity, per time
+--Suggestions are broadcast from peers
+net.register_message("Suggest",{required:{
+ entity:"number"
+ state:"table"
+ peer:"string"
+ time:"number"
+}})
+--Commits are broadcast from the elected peer
+net.register_message("Commit",{required:{
+ nonce: "number"
+ entity:"number"
+ peer:"string"
+ suggest_time:"number"
+ commit_time:"number"
+}})
+-- When a peer misses a committed message, re-request it
+net.register_message("RequestReplay",{required:{nonce: "number"}})
+net.register_message("RespondReplay",{required:{
+ nonce: "number"
+ entity: "number"
+ peer: "string"
+ state: "table"
+ suggest_time: "number"
+ commit_time: "number"
+}})
+--Rejections are also broadcast from the elected peer
+net.register_message("Reject",{required:{
+ nonce: "number"
+ entity:"number"
+ peer:"string"
+ suggest_time:"number"
+ reject_time:"number"
+}})
+--Suggest + commit from the elected peer, so we can throw away old state.
+net.register_message("Simplify",{required:{
+ entity:"number"
+ state:"table"
+ time:"number"
+}})
+
+--Testing
+net.register_message("Raw",{optional:{s:"string"}})
+
+class Queue
+ new: () =>
+ @queue = {}
+ push: (item) =>
+ table.insert(@queue, item)
+ pop: () =>
+ table.remove(@queue, 1)
+
+class Router
+ new: () =>
+ @peerlist = {}
+ @routes = {}
+
+ -- "uninitalized", "peer", "candidate", "elected"
+ @state = "unitialized"
+
+ -- [peerid] = "votes for peerid"
+ @prevotes = {}
+ @term = 0
+
+ -- The uncommited queue
+ @uncommited = Queue!
+
+ initalize: (id) =>
+ @set_route("RequestClusterInfo",(conn, message) =>
+ print("Requested cluster info:", message)
+ peerlist = {}
+ for peerid, _ in pairs(@peerlist)
+ table.insert(peerlist, peerid)
+ conn\send("RespondClusterInfo",{
+ peerlist: peerlist
+ elected: @elected
+ term: @term
+ prevotes: @prevotes
+ })
+ true
+ )
+ peer_setup = (peer) ->
+ peer\on("error",(message) =>
+ if message.data.type == "unavailable-id"
+ peer\replace_id!
+ peer_setup(peer)
+ return
+ error("Peer setup error: " .. tostring(message))
+ )
+ peer\on("open",(message) =>
+ print("Peer",peer.id, "opened:",message)
+ peer.open = true
+ )
+ peer = net.Peer!
+ @peer = peer
+ peer_setup(peer)
+ while not peer.open
+ coroutine.yield("Waiting for open")
+ net.pump!
+ peerlist = @peerlist
+ router = @
+ peer\on("connection",(message) =>
+ print("Peer",peer.id, "got connection", message)
+ assert(message.data.dest == peer.id)
+ peerlist[message.data.source] = message.data
+ message.data\send("Raw",{s: "Hello after conn"})
+ message.data\on("data",(datamsg) =>
+ print("Peer ",peer.id," got data:",datamsg)
+ router\route(message.data, datamsg.data[1], datamsg.data[2])
+ print("done routing")
+ )
+ message.data\on("error",(msg) =>
+ error(msg)
+ )
+ )
+ if id
+ print("Doing id fork")
+ connected = false
+ conn = peer\connect(id)
+ conn\on("open", (message) =>
+ print("Conn got open message")
+ connected = true
+ )
+ while not connected
+ coroutine.yield("Waiting for client to connect")
+ net.pump!
+ conn\on("error", (message) =>
+ error(message)
+ )
+ --Assume we vote for ourselves, and our peer votes for themselves
+ @prevotes[peer.id] = peer.id
+ @prevotes[id] = id
+ @state = "peer"
+ log.info("Peer passed, I'm a peer of " .. id, {"net"})
+ --while not connected
+ -- coroutine.yield("Give it a second to connect")
+ -- net.pump!
+ got_hello = false
+ conn\on("data", (message) =>
+ print("Peer got message:", message)
+ got_hello = true
+ )
+ while not got_hello
+ coroutine.yield("Waiting for hello")
+ net.pump!
+ @peerlist[peer.id] = peer
+ @peerlist[id] = conn
+ clusterinfo = @sync(conn, "RequestClusterInfo", {})
+ print("Got cluster info:", clusterinfo)
+
+ else
+ log.info("No peer passed, I'm the elected peer: " .. peer.id, {"net"})
+ @state = "elected"
+ @elected = peer.id
+ @term += 1
+ -- Vote for ourselves
+ @prevotes[peer.id] = peer.id
+ -- Add ourselves to the peerlist
+ @peerlist[peer.id] = peer
+ while true
+ coroutine.yield(peer.id)
+ net.pump!
+
+ sync: (conn, msgfmt, msg) =>
+ ret = nil
+ conn\on("data", (message) =>
+ ret = message
+ )
+ conn\send(msgfmt, msg)
+ while not ret
+ coroutine.yield("Waiting on " .. msgfmt)
+ net.pump!
+ return ret
+
+ broadcast: (msgfmt, message) =>
+ for peerid, conn in pairs(@peerlist)
+ if peerid ~= @peer.id
+ conn\send(msgfmt, message)
+
+ send_elected: (msgfmt, message) =>
+ elected_conn = @peerlist[@elected]
+ elected_conn\send(msgfmt, message)
+
+ set_route: (msgfmt, callback) =>
+ if @routes[msgfmt]
+ log.warn("Overwriting callback for message " .. msgfmt, {"net"})
+ @routes[msgfmt] = {callback}
+
+ listen: (msgfmt, callback, id) =>
+ id = id or {}
+ @routes[msgfmt] = @routes[msgfmt] or {}
+ @routes[msgfmt][id] = callback
+
+ defen: (msgfmt, id) =>
+ assert(@routes[msgfmt])
+ assert(@routes[msgfmt][id])
+ @routes[msgfmt][id] = nil
+
+ route: (conn, msgfmt, message) =>
+ print("Got route",conn,msgfmt,message)
+ assert(type(msgfmt) == "string", "Message format must be a string")
+ assert(type(message) == "table", "Message must be a table")
+ if @routes[msgfmt]
+ routed_any = false
+ for _, r in pairs(@routes[msgfmt])
+ routed_any = true
+ if r(@, conn, message)
+ break
+ if not routed_any
+ log.warn("No routes found for message format:" .. msgfmt, {"net"})
+ else
+ log.warn("No message callback registered for format " .. msgfmt .. " routes are: " .. tostring(@routes), {"net"})
+
+{:Router}