1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
-- Implements a channels, an abstraction for sending data
class Channel
new: (settings) =>
settings = settings or {}
--Every channel has at least a buffer, the messages to be read on recv()
@buffer = {}
for setting_name, setting_value in pairs settings
@[setting_name] = setting_value
poll: =>
error("Channel must implement poll")
send: (message) =>
error("Channel must implement message")
recv: =>
error("Channel must implement recv")
class SimpleChannel extends Channel
@time = 0
new: (settings) =>
super(settings)
poll: =>
#@buffer > 0
send: (message) =>
table.insert(@buffer, message)
recv: =>
table.remove(@buffer, 1)
class FaultyChannel extends Channel
-- Mock channel for testing
@time = 0
new: (settings) =>
@to_deliver = {}
@avg_latency = 0 -- in ms
@latency_std = 0
-- Latency can never be below 0, but can go up as much as it likes
@loss = 0.1 -- between 0 (never) and 1 (always)
super(settings)
@normal_at: (avg, std, n) ->
assert(avg and std and n, string.format("normal(avg, std, n) called with %q %q %q", tostring(avg), tostring(std), tostring(n)))
-- Normal curve probability at N
(1/ (math.sqrt(2*math.pi) * avg)) * math.exp(-(n - avg)^2 / (2 * (std^2)))
@normal: (avg, std) =>
-- Box-Muller transform
bm = math.sqrt(-2 * math.log(math.random())) * math.cos(2 * math.pi * math.random())
-- Box-Muller gives us std = e^-0.5 , avg = 0
((bm / math.exp(-1/2)) * std) + avg
poll: =>
@pump!
#@buffer > 0
send: (message) =>
-- Do we deliver?
rng = math.random()
if @loss > rng
return
-- How long does it take?
-- Only uses the positive half of the normal distribution, double the standard deviation?
time = @@normal(@avg_latency, @latency_std * 2, math.random())
if time < 0 then
time = 0 -- We can't deliver messages in the past
table.insert(@to_deliver, {message,@@time + time})
recv: =>
@pump!
table.remove(@buffer, 1)
pump: =>
defrag = 1
deliver_len = #@to_deliver
for k,tbl in ipairs(@to_deliver)
{m, t} = tbl
@to_deliver[defrag] = tbl
if @@time > t
table.insert(@buffer, m)
else
defrag = defrag + 1
for i = defrag, deliver_len do
@to_deliver[i] = nil
{:Channel, :SimpleChannel, :FaultyChannel}
|