1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
-- Handles the bridge to javascript to do peer-to-peer connections
log = require("log")
net = {}
net.initalize = () ->
am.eval_js(require("js_bridge"))
net.call = (method, args) ->
print("About to eval")
result = am.eval_js("window.PEER." .. method .. "(" .. am.to_json(args) .. ")")
print("Done evaling")
result
net.pull = () ->
messages = am.eval_js("window.PEER.message_queue")
am.eval_js("window.PEER.message_queue = []")
messages
callbacks = {}
peers = {}
connections = {}
class Connection
new: (source, dest) =>
@source = source
@dest = dest
on: (event, callback) =>
newid = #callbacks + 1
callbacks[newid] = callback
net.call("conn_on", {name: @source, id: @dest, e: event, message: newid})
send: (msgname, msg) =>
net.validate(msgname, msg)
net.call("send",{name: @source, id: @dest, data: msg})
class Peer
new: (id) =>
net.call("create",{name: id})
if id
@id = id
peers[id] = @
on: (event, callback) =>
newid = #callbacks + 1
callbacks[newid] = callback
net.call("on",{name: @id, message:newid, e: event})
connect: (id, options) =>
conn = net.call("connect", {name: @id, id: id})
log.info("Got connection: " .. tostring(conn), {"net"})
Connection(conn[1],conn[2])
net.Peer = Peer
-- A fake peer for testing
fakepeers = {}
fakeconnections = {}
fakecallbacks = {}
channel = require("channel")
class FakePeer
new: (id) =>
if id
@id = id
fakepeers[id] = @
on: (event, callback) =>
newid = #fakecallbacks + 1
fakecallbacks[newid] = callback
connect: (id, options) =>
conn = channel.FaultyChannel({
avg_latency: 200
latency_std: 100
loss: 0.1
})
conn
messages = {}
formatcache = {}
message_callbacks = {}
net.register_message = (name, format) ->
assert(type(format) == "table", "Format must be a table")
format.required = format.required or {}
format.optional = format.optional or {}
assert(next(format.required) or next(format.optional), "No fields found")
for set in *({format.required, format.optional})
for field, type_ in pairs(format.required)
if type(type_) == "string"
key = string.format("%s\0%s\0%s",name,field,type_)
if not formatcache[key]
formatcache[key] = (any) ->
assert(type(any) == type_, string.format("In message %q %q must be a %q, but was a %q", name, field, type_, type(any)))
set[field] = formatcache[key]
messages[name] = format
message_callbacks[name] = {}
net.validate = (name, message) ->
assert(type(message) == "table", "Message must be a table")
format = messages[name]
required = {}
for field, validate in pairs(format.required)
required[field] = validate
for field, value in pairs(message)
if format.required[field]
required[field](value)
required[field] = nil
if format.optional[field]
format.optional[field](value)
missing = next(required)
if missing
error("Missing required field: " .. missing)
true
net.listen = (name, callback, id) ->
id = id or {}
message_callbacks[name] = message_callbacks[name] or {}
message_callbacks[name][id] = callback
id
net.defen = (name, id) ->
message_callbacks[name][id] = nil
rewrite_events = {
connection: (message) ->
message.data.data = Connection(message.data.data[1], message.data.data[2])
}
net.pump = () ->
msg_ = net.pull!
log.info("Processing " .. tostring(#msg_) .. " messages", {"net"})
for message in *msg_
log.info(tostring(message), {"net", message.data.peer})
if rewrite_events[message.data.e]
log.info("Rewriting data due to " .. message.data.e .. " event", {"net", message.data.peer})
rewrite_events[message.data.e](message)
log.info(tostring(message), {"net", message.data.peer})
peer = peers[message.data.peer]
assert(peer, "Failed to find peer:" .. message.data.peer)
callback = callbacks[message.message]
assert(callback, "Failed to find callback " .. message.message .. " on peer " .. message.data.peer)
callback(peer,message.data)
net.peers = () ->
peers
net
|