aboutsummaryrefslogtreecommitdiff
path: root/src/channel.moon
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel.moon')
-rw-r--r--src/channel.moon78
1 files changed, 78 insertions, 0 deletions
diff --git a/src/channel.moon b/src/channel.moon
new file mode 100644
index 0000000..6f87d46
--- /dev/null
+++ b/src/channel.moon
@@ -0,0 +1,78 @@
+
+-- Implements a channels, an abstraction for sending data
+
+class Channel
+ new: (settings) =>
+ settings = settings or {}
+ --Every channel has at least a buffer, the messages to be read on recv()
+ @buffer = {}
+ for setting_name, setting_value in pairs settings
+ @[setting_name] = setting_value
+ poll: =>
+ error("Channel must implement poll")
+ send: (message) =>
+ error("Channel must implement message")
+ recv: =>
+ error("Channel must implement recv")
+
+class SimpleChannel extends Channel
+ @time = 0
+ new: (settings) =>
+ super(settings)
+ poll: =>
+ #@buffer > 0
+ send: (message) =>
+ table.insert(@buffer, message)
+ recv: =>
+ table.remove(@buffer, 1)
+
+class FaultyChannel extends Channel
+ -- Mock channel for testing
+ @time = 0
+ new: (settings) =>
+ @to_deliver = {}
+ @avg_latency = 0 -- in ms
+ @latency_std = 0
+ -- Latency can never be below 0, but can go up as much as it likes
+ @loss = 0.1 -- between 0 (never) and 1 (always)
+ super(settings)
+ @normal_at: (avg, std, n) ->
+ assert(avg and std and n, string.format("normal(avg, std, n) called with %q %q %q", tostring(avg), tostring(std), tostring(n)))
+ -- Normal curve probability at N
+ (1/ (math.sqrt(2*math.pi) * avg)) * math.exp(-(n - avg)^2 / (2 * (std^2)))
+ @normal: (avg, std) =>
+ -- Box-Muller transform
+ bm = math.sqrt(-2 * math.log(math.random())) * math.cos(2 * math.pi * math.random())
+ -- Box-Muller gives us std = e^-0.5 , avg = 0
+ ((bm / math.exp(-1/2)) * std) + avg
+ poll: =>
+ @pump!
+ #@buffer > 0
+ send: (message) =>
+ -- Do we deliver?
+ rng = math.random()
+ if @loss > rng
+ return
+ -- How long does it take?
+ -- Only uses the positive half of the normal distribution, double the standard deviation?
+ time = @@normal(@avg_latency, @latency_std * 2, math.random())
+ if time < 0 then
+ time = 0 -- We can't deliver messages in the past
+ table.insert(@to_deliver, {message,@@time + time})
+ recv: =>
+ @pump!
+ table.remove(@buffer, 1)
+ pump: =>
+ defrag = 1
+ deliver_len = #@to_deliver
+ for k,tbl in ipairs(@to_deliver)
+ {m, t} = tbl
+ @to_deliver[defrag] = tbl
+ if @@time > t
+ table.insert(@buffer, m)
+ else
+ defrag = defrag + 1
+ for i = defrag, deliver_len do
+ @to_deliver[i] = nil
+
+{:Channel, :SimpleChannel, :FaultyChannel}