-- Handles the bridge to javascript to do peer-to-peer connections log = require("log") rng = require("rng") util = require("util") net = {} initialized = false initialize = () -> am.eval_js(require("js_bridge")) initialized = true net.call = (method, args) -> if not initialized initialize! args = args or {} json_str = am.to_json(args) log.info("Json string sent to javascript:" .. tostring(json_str), {"net"}) result = am.eval_js("window.PEER." .. method .. "(" .. am.to_json(args) .. ")") result net.pull_peers = () -> if not initialized initialize! messages = am.eval_js("window.PEER.peer_message_queue") am.eval_js("window.PEER.peer_message_queue = []") messages net.pull_connections = () -> if not initialized initialize! messages = am.eval_js("window.PEER.connection_message_queue") am.eval_js("window.PEER.connection_message_queue = []") messages net.pull_creation = () -> if not initialized initialize! creations = am.eval_js("window.PEER.creation_queue") am.eval_js("window.PEER.creation_queue = []") creations -- Sequence of function(peer, message) | function(connection, message) -- functions of (peer,message) for peer "open","connection","call","close","disconnected","error" -- functions of (connection, message) for connection "data","open","close","error" callbacks = {} callback_info = {} -- Map of [string peerid] = Peer peers = {} --Connections are always create js side, this is just it's lua representation class Connection @connections = {} @methods = util.reverse({"data","open","close","error"}) new: (source, dest) => @source = source @dest = dest @get: (source, dest) => key = table.concat({source,dest},",") if @connections[key] return @connections[key] @@connections[key] = Connection(source,dest) @@connections[key] on: (event, callback) => if not @@methods[event] error("Tried to set an unknown event (" .. event .. ") on a connection") newid = #callbacks + 1 callbacks[newid] = callback callback_info[newid] = debug.getinfo(callback) -- Wait until the JS-side directional connection [source,dest] is ready while am.eval_js('window.PEER.connections[["' .. @source .. '","' .. @dest .. '"]] == null') coroutine.yield("Waiting for peer") -- Attach handler to this directional connection (source -> dest) net.call("conn_on", {source: @source, dest: @dest, e: event, message: newid}) send: (msgname, msg) => -- Send as array: [msgname, msg] log.info("Sending",{"net"}) res = net.call("send",{source: @source, dest: @dest, data: {msgname, msg}}) res class Peer @methods = util.reverse({"open","connection","call","close","disconnected","error"}) @max_attempts = 4 @create_timeout = 10 new: () => log.info("Creating peer...", {"net"}) net.call("create") creations = {} starttime = am.eval_js("Date.now()") attempts = 0 while #creations == 0 and attempts < @@max_attempts creations = net.pull_creation() if #creations > 1 error("Created more than 1 peer at a time, we don't know which one we are") messages = net.pull_connections() log.info("Creating peer " .. attempts .. "/" .. tostring(@@max_attempts), {"net"}) if #messages > 0 if messages[1] and messages[1].data and messages[1].data.message.type == "network" -- Try again net.call("create") attempts += 1 starttime = am.eval_js("Date.now()") else error(tostring(messages)) if am.eval_js("Date.now()") - starttime > (@@create_timeout * 1000) net.call("create") attempts += 1 starttime = am.eval_js("Date.now()") if attempts > @@max_attempts error("Failed to create host after 4 attempts") coroutine.yield! if attempts == @@max_attempts error("Failed to create peer, check https://status.peerjs.com") @id = creations[1] peers[@id] = @ log.info("Creating peer: " .. @id, {"net"}) generate_id: () => os.date("%Y%e") .. rng.numstring(4) replace_id: () => log.info("Regenerating id for peer: " .. @id, {"net"}) -- peers[@id] = nil TODO: uncomment, this breaks when running multiple peers from the same tab. net.call("delete_peer",{name: @id}) @id = @generate_id! peers[@id] = @ net.call("create", {name: @id}) on: (event, callback) => if not @@methods[event] error("Tried to set an unknown event (" .. event .. ") on a peer.") newid = #callbacks + 1 callbacks[newid] = callback net.call("on",{name: @id, message:newid, e: event}) connect: (id, options) => conn = net.call("connect", {source: @id, dest: id}) log.info("Got connection: " .. tostring(conn), {"net"}) Connection\get(conn[1],conn[2]) net.Peer = Peer -- A fake peer for testing fakepeers = {} fakeconnections = {} fakecallbacks = {} channel = require("channel") class FakePeer new: (id) => if id @id = id fakepeers[id] = @ on: (event, callback) => newid = #fakecallbacks + 1 fakecallbacks[newid] = callback connect: (id, options) => conn = channel.FaultyChannel({ avg_latency: 200 latency_std: 100 loss: 0.1 }) conn messages = {} formatcache = {} message_callbacks = {} net.register_message = (name, format) -> assert(type(format) == "table", "Format must be a table") format.required = format.required or {} format.optional = format.optional or {} if not (next(format.required) or next(format.optional)) log.warn("Message " .. name .. " registered with no fields.") for set in *({format.required, format.optional}) for field, type_ in pairs(set) if type(type_) == "string" key = string.format("%s\0%s\0%s",name,field,type_) if not formatcache[key] formatcache[key] = (any) -> assert(type(any) == type_, string.format("In message %q %q must be a %q, but was a %q", name, field, type_, type(any))) set[field] = formatcache[key] messages[name] = format message_callbacks[name] = {} log.info("Registered message type:" .. name, {"net"}) net.validate = (name, message) -> log.debug("Validating message:" .. tostring(message), {"net"}) assert(type(message) == "table", "Message must be a table") format = messages[name] assert(format, "Failed to find a format: " .. name) required = {} for field, validate in pairs(format.required) required[field] = validate for field, value in pairs(message) if format.required[field] required[field](value) required[field] = nil if format.optional[field] format.optional[field](value) missing = next(required) if missing error("Missing required field: " .. missing) true net.listen = (name, callback, id) -> id = id or {} message_callbacks[name] = message_callbacks[name] or {} message_callbacks[name][id] = callback id net.defen = (name, id) -> message_callbacks[name][id] = nil -- net.route = (conn, name, data) -> -- if message_callbacks[name] -- for id, callback in pairs(message_callbacks[name]) -- ret = message_callbacks[name](conn, net.rewrite_events = { connection: (message) -> -- message.data.data is [server, client]; build Hub-side Connection(server->client) conn = Connection\get(message.data.data[1], message.data.data[2]) assert(conn, "Failed to build conn?") assert(conn.source and conn.dest) message.data.data = conn } net.pump = () -> msg_ = net.pull_peers! if #msg_ > 0 log.info("Processing " .. tostring(#msg_) .. " peer messages", {"net"}) for message in *msg_ --log.info(tostring(message), {"net", message.data.peer}) if net.rewrite_events[message.data.e] log.info("Rewriting data due to " .. message.data.e .. " event", {"net", message.data.peer}) net.rewrite_events[message.data.e](message) log.info(tostring(message), {"net", message.data.peer}) if not message.data.peer and message.data.e == "open" log.info("Setting peerid for a peer that didn't have one " ..tostring(message), {"net"}) peer = peers[message.data.peer] assert(peer, "Failed to find peer:" .. message.data.peer .. " peers:" .. tostring(net.peers!)) callback = callbacks[message.message] assert(callback, "Failed to find callback " .. message.message .. " on peer " .. message.data.peer) callback(peer,message.data) msg_ = net.pull_connections! if #msg_ > 0 log.info("Processing " .. tostring(#msg_) .. " connection messages", {"net"}) for message in *msg_ --log.info(tostring(message), {"net", message.data.peer}) -- Extra debug for connection routing peer = message.data and message.data.peer or "nil" dest = message.data and message.data.dest or "nil" inner = message.data and message.data.data or nil log.debug("NET connection msg peer=" .. tostring(peer) .. " dest=" .. tostring(dest) .. " inner=" .. tostring(inner), {"net", "debug"}) -- For connection events, message.data is a wrapper from JS bridge. -- The actual payload from Connection:send is in message.data.data. payload = inner or message.data -- message.data.peer is the source, message.data.dest is the dest from JS bridge connection = Connection\get(message.data.peer, message.data.dest) callback = callbacks[message.message] if callback wcall = () -> callback(connection, payload) handler = (err) -> info = callback_info[message.message] string.format("Failed to call callback defined at %s:%d:\n%s", info.short_src, info.linedefined, debug.traceback(err)) assert(xpcall(wcall, handler)) else log.warn("Failed to find callback for message:" .. tostring(message),{"net"}) --assert(callback, "Failed to find callback " .. tostring(message.message) .. " for message" .. tostring(message)) net.peers = () -> peers net.node = am.group! initialize! net