From 27ece5603fc0cde89183ceb61f915fa64fef1061 Mon Sep 17 00:00:00 2001 From: Alexander Pickering Date: Wed, 29 Jul 2020 12:18:10 -0400 Subject: Added recv_any() recv_any() is a function that takes multiple sockets and waits for one or more of them to receive. See the unit test for examples. --- spec/aio_spec.lua | 136 ++++++++++++++++++++++++++++++++++++++++++++++++++ spec/race_spec.lua | 5 ++ spec/startup_spec.lua | 74 ++++++++++++++++++++++----- 3 files changed, 203 insertions(+), 12 deletions(-) create mode 100644 spec/aio_spec.lua create mode 100644 spec/race_spec.lua (limited to 'spec') diff --git a/spec/aio_spec.lua b/spec/aio_spec.lua new file mode 100644 index 0000000..8c7c332 --- /dev/null +++ b/spec/aio_spec.lua @@ -0,0 +1,136 @@ +--[[ +test the asyncronous parts of nnng +]] +local nng = require("nng") +local lanes = require("lanes").configure() +describe("nng.aio",function() + it("should be able to create aio object",function() + print("one") + local aio = assert(nng.aio.alloc(function() end)) + end) + it("should accept a callback with arguments",function() + print("two") + local aio = assert(nng.aio.alloc(function(a,b,c) + print("hello") + end, "one", 2, "three")) + end) + --it("should call the callback after sleeping", function() + --print("three") + --local called = false + --local callback = function(lock) + --called = true + --end + --print("about to alloc") + --local aio = assert(nng.aio.alloc(callback)) + --print("done alloc") + --print("about to sleep") + --nng.aio.sleep(1,aio) + --print("whatever:",whatever) + ----nng.aio.sleep(1,aio) + --print("done sleep") + --os.execute("sleep 1") + --lock(-1) + --print("checking called...") + --print("called was",called) + --assert(called) + --end) + it("should call more than one socket getting a callback at once",function() + print("checking recv_any callback") + local s1 = assert(nng.bus0_open()) + assert(s1:listen("tcp://127.0.0.1:4000")) + + local s2 = assert(nng.bus0_open()) + assert(s2:listen("tcp://127.0.0.1:4001")) + + local s3 = assert(nng.bus0_open()) + assert(s3:dial("tcp://127.0.0.1:4000")) + assert(s3:dial("tcp://127.0.0.1:4001")) + + for i = 1, 100 do --100 times to try to trigger race conditions + --print("i:",i) + assert(s3:send("one")) + assert(s3:send("two")) + local s1_got_one, s1_got_two, s2_got_one, s2_got_two = false, false, false, false + while not (s1_got_one and s1_got_two and s2_got_one and s2_got_two) do + --local socket, message = nng.aio.recv_any(s1, s2) + --print("about to start recv_any") + --local tbl = nng.aio.recv_any(s1,s2) + for socket, message in pairs(nng.aio.recv_any(s1,s2)) do + --print("in one recv any:",socket, message) + if socket == s1 then + if message == "one" then + s1_got_one = true + elseif message == "two" then + s1_got_two = true + else + error("message socket 1:" .. message) + end + elseif socket == s2 then + if message == "one" then + s2_got_one = true + elseif message == "two" then + s2_got_two = true + else + error("message socket 2:" .. message) + end + else + error("socket:" .. tostring(socket)) + end + --print("done with recv_any",s1_got_one, s1_got_two, s2_got_one, s2_got_two) + end + end + assert(s1_got_one) + assert(s1_got_two) + assert(s2_got_one) + assert(s2_got_two) + end + print("Sucessful completion of recv_any test") + end) + it("Should accept multiple sockets of dispirate types #writing",function() + local servers = {} + local clients = {} + local messages = {} + local halfs = { + {"rep","req",true}, + {"bus","bus",nil}, + {"surveyor","respondent",false}, + {"pub","sub",false} + } + for i = 1, 10 do + local rng = math.random(#halfs) + + --Create server + servers[i] = assert(nng[halfs[rng][1] .. "0_open"]()) + local url = string.format("tcp://127.0.0.1:%d",4000 + i) + assert(servers[i]:listen(url)) + + --Create clients + local numclients = math.random(1,3) + clients[i] = {} + messages[i] = {} + for j = 1, numclients do + clients[i][j] = assert(nng[halfs[rng][2] .. "0_open"]()) + assert(clients[i][j]:dial(url)) + messages[i][j] = {} + if halfs[rng][3] == true then + --we send messages + local nummessages = math.random(5) + for k = 1, nummessages do + local message = string.format("ping_%d_%d_%d",i,j,k) + assert(clients[i][j]:send(message)) + table.insert(messages[i][j],message) + end + --elseif halfs[rng][3] == nil then + --we can send or receive messages + + else + --we receive and reply to messages + + end + end + end + for socket, message in pairs(nng.aio.recv_any(table.unpack(servers))) do + print("GOT MESSAGE:",message) + end + end) +end) diff --git a/spec/race_spec.lua b/spec/race_spec.lua new file mode 100644 index 0000000..732ae3e --- /dev/null +++ b/spec/race_spec.lua @@ -0,0 +1,5 @@ +--[[ +Test for race conditions +]] +describe("nng",function() +end) diff --git a/spec/startup_spec.lua b/spec/startup_spec.lua index dacf9e9..1b3f740 100644 --- a/spec/startup_spec.lua +++ b/spec/startup_spec.lua @@ -1,5 +1,5 @@ --[[ -test startup of nanomsg api +test startup of the nng api ]] describe("nng",function() @@ -45,27 +45,40 @@ describe("nng",function() end end) it("should be able to use a survey socket to gather information",function() + print("Starting survayor respondent test") math.randomseed(os.time()) local s = assert(nng.surveyor0_open()) + print("About to listen") assert(s:listen("ipc:///tmp/survey.ipc")) + print("S is listening") local b = {} for i = 1,100 do + print("Testing",i) local r = assert(nng.respondent0_open()) assert(r:dial("ipc:///tmp/survey.ipc")) + print("Dialed",i) b[i] = r end + print("About to send hello") assert(s:send("Hello")) + print("Done sending hello") for i = 1,100 do + print("About to recv from", i) local survey = assert(b[i]:recv()) + print("Done receving from ",i) assert(survey == "Hello") + print("About to send number back") assert(b[i]:send(string.format("%f",math.random()))) + print("Done sending number back") end local responses = {} while true do + print("Got ", #responses, "responses") local succ, msg = s:recv(nng.NNG_FLAG_NONBLOCK) if succ then table.insert(responses,tonumber(succ)) elseif msg == "Try again" then + print("Sleeping...") os.execute("sleep 1") elseif msg == "Incorrect state" then break @@ -79,18 +92,55 @@ describe("nng",function() --avg should be about 0.5 assert(avg > 0.4) assert(avg < 0.6) + print("Completed survayor respondent test") end) it("should be able to use publish and subscribe sockets to transfer information", function() - local s1 = assert(nng.pub0_open()) - local s2 = assert(nng.sub0_open()) - local s3 = assert(nng.sub0_open()) - assert(s1:listen("ipc:///tmp/pub.ipc")) - assert(s2:subscribe("")) - assert(s3:subscribe("")) - assert(s2:dial("ipc:///tmp/pub.ipc")) - assert(s3:dial("ipc:///tmp/pub.ipc")) - assert(s1:send("hello 1")) - assert(s2:recv() == "hello 1") - assert(s3:recv() == "hello 1") + print("starting pubsub test") + for i = 1,1000 do + local s1 = assert(nng.pub0_open()) + local s2 = assert(nng.sub0_open()) + local s3 = assert(nng.sub0_open()) + print("everything opened") + --local listener, err = s1:listen("tcp://127.0.0.1:1000") + local listener, err = s1:listen("ipc:///tmp/pub.ipc") + local num_addr_in_use = 0 + while err == "Address in use" do + print("Got addr in use") + num_addr_in_use = num_addr_in_use + 1 + if num_addr_in_use > 1000 then + error("After multiple attempts, failed to bind on round " .. i) + end + --listener, err = s1:listen("tcp://127.0.0.1:1000") + listener, err = s1:listen("ipc:///tmp/pub.ipc") + end + print("s1 is listeneing...") + assert(s2:subscribe("hello")) + assert(s3:subscribe("hello")) + --assert(s2:dial("tcp://127.0.0.1:1000")) + --assert(s3:dial("tcp://127.0.0.1:1000")) + assert(s2:dial("ipc:///tmp/pub.ipc")) + assert(s3:dial("ipc:///tmp/pub.ipc")) + print("Everything set up") + assert(s1:send("hello 1")) + print("sent") + assert.are_equal(s2:recv(),"hello 1") + print("received 1") + assert.are_equal(s3:recv(),"hello 1") + print("received 2") + listener:close() + s1:close() + end + print("Finishing pubsub test") + end) + describe("tcp transport",function() + it("has a keepalive option that prevents the tcp connection from closing",function() + print("starting tcp transport test") + local s1 = assert(nng.pair1_open()) + local s2 = assert(nng.pair1_open()) + --s1[nng.NNG_OPT_TCP_KEEPALIVE] = true + assert(s1:listen("tcp://127.0.0.1:1000")) + assert(s2:dial("tcp://127.0.0.1:1000")) + print("Finished tcp transport test") + end) end) end) -- cgit v1.2.3-70-g09d2