-- Hub-and-spoke networking client -- Connects to hub and provides router registration for message handling net = require "net" log = require "log" world = require "world" -- Register message types for hub->client communication net.register_message("to_client", { required: { target: "string" message_type: "string" } optional: { data: "table" } }) net.register_message("to_many_clients", { required: { targets: "table" message_type: "string" } optional: { data: "table" } }) net.register_message("broadcast", { required: { message_type: "string" } optional: { data: "table" } }) class Client new: (name) => @name = name or "anonymous" @peer = nil @hub_connection = nil @hub_id = nil @connected = false @routes = {} -- message_type -> handler function @on_connect_callbacks = {} @on_disconnect_callbacks = {} @initialized = false initialize: => if @initialized return @peer = net.Peer! log.info("Client peer created: #{@peer.id}", {"client", "net"}) @initialized = true connect_to_hub: (hub_id) => if not @initialized @initialize! @hub_id = hub_id @hub_connection = @peer\connect(hub_id) -- Set up connection handlers @hub_connection\on("open", -> @connected = true log.info("Connected to hub: #{hub_id}", {"client", "net"}) -- Send registration message using Connection:send(msgname, msg) @hub_connection\send("Join", {name: @name}) -- Surface client connect/join to the browser for integration tests. if am and am.eval_js and am.to_json js = string.format("window._clientConnectedToHub = true; window._clientJoinPayload = %s;", am.to_json({name: @name})) am.eval_js(js) -- Notify connection callbacks for callback in *@on_connect_callbacks callback! ) @hub_connection\on("data", (msgname, data) -> @handle_message(msgname, data) ) @hub_connection\on("close", -> @connected = false log.info("Disconnected from hub", {"client", "net"}) -- Notify disconnect callbacks for callback in *@on_disconnect_callbacks callback! ) while not @connected coroutine.yield! handle_message: (callback_id, message_data) => log.info("Client handle_message callback_id=" .. tostring(callback_id) .. " message_data=" .. tostring(message_data), {"net", "client", "debug"}) -- message_data is the array [message_type, data] sent by hub if type(message_data) ~= "table" or #message_data < 1 log.warn("Received invalid message format: " .. tostring(message_data), {"client", "net"}) return if type(message_data[1][1]) != "string" log.warn("Received invalid mesage type: " .. tostring(message_data[1][1]), {"client","net"}) msg_type = message_data[1][1] msg_data = message_data[1][2] or {} log.info("Message type: #{msg_type}", {"net", "client"}) if msg_type == "Join" log.error("Client saw Join message in handle_message; this should be hub-only", {"client", "net", "debug"}) if not msg_type or type(msg_type) ~= "string" log.warn("Received message without valid type:" .. tostring(msg_type), {"client", "net"}) return world.domain = "client" if @routes[msg_type] -- Route to registered handlers callbacks = @routes[msg_type] for _, callback in pairs(callbacks) callback(@hub_id, msg_data) else log.warn("No handler for message type: " .. tostring(msg_type), {"client", "net"}) msg_types = [key for key, _ in pairs(@routes)] if #msg_types > 0 log.warn("Registered message types: " .. table.concat(msg_types, ","), {"client", "net"}) -- Register a router for a specific message type -- callback is a (server-id:string, data:tbl) -> nil listen: (message_type, id, callback) => assert(type(callback) == "function", "Listened with something that is not a function") @routes[message_type] = @routes[message_type] or {} id = id or #@routes[message_type] + 1 @routes[message_type][id] = callback log.info("Router registered for #{message_type}", {"client", "net"}) id -- Unregister a router defen: (message_type, id) => if not @routes[message_type] or @routes[message_type][id] == nil log.warn("Removing listener that doesn't exist: #{message_type}", {"client", "net"}) return @routes[message_type][id] = nil log.info("Listener removed for #{message_type}", {"client", "net"}) -- Send message to hub send: (message_type, data) => if not @connected log.error("Cannot send - not connected to hub", {"client", "net"}) return false log.info("Client sending #{message_type}", {"net", "client"}) @hub_connection\send(message_type, data or {}) true on_connect: (callback) => table.insert(@on_connect_callbacks, callback) on_disconnect: (callback) => table.insert(@on_disconnect_callbacks, callback) -- Synchronus request/response for use in coroutines. sync: (request, request_data, response) => returned = nil lid = @listen(response, nil, (peer, data) -> returned = data ) @send(request, request_data) tries = 1 start = am.current_time! while not returned and tries < 4 log.info("Awaiting synchronus response to " .. request, {"net","client"}) coroutine.yield! if am.current_time! - start > 4 log.info("Async response timeout, requesting again...",{"net","client"}) @send(request, request_data) start = am.current_time! tries += 1 if tries == 4 error("Failed in sync request after 4 tries") @defen(response, lid) return returned is_connected: => @connected disconnect: => if @hub_connection @hub_connection\close! @connected = false pump: => net.pump! {:Client}