From 105ea66aba31825512fea8f281b67f9c2a1d79a5 Mon Sep 17 00:00:00 2001 From: Alexander Pickering Date: Tue, 15 Sep 2020 03:40:00 -0400 Subject: dos2unix --- README.md | 2 +- spec/aio_spec.lua | 272 +++++----- spec/race_spec.lua | 10 +- spec/startup_spec.lua | 294 +++++------ src/lua-nng-aio.c | 375 +++++++------- src/lua-nng-common.c | 44 +- src/lua-nng.c | 1347 +++++++++++++++++++++++++------------------------ 7 files changed, 1176 insertions(+), 1168 deletions(-) diff --git a/README.md b/README.md index 188af88..af9b5d4 100644 --- a/README.md +++ b/README.md @@ -23,5 +23,5 @@ luarocks install --server=http://rocks.cogarr.net lua-nng prints "hello" -For more examples, see sepc/start\_spec.lua +For more examples, see spec/start\_spec.lua diff --git a/spec/aio_spec.lua b/spec/aio_spec.lua index 75e4a0a..093ce59 100644 --- a/spec/aio_spec.lua +++ b/spec/aio_spec.lua @@ -1,136 +1,136 @@ ---[[ -test the asyncronous parts of nnng -]] -local nng = require("nng") -local lanes = require("lanes").configure() -describe("nng.aio",function() - it("should be able to create aio object",function() - print("one") - local aio = assert(nng.aio.alloc(function() end)) - end) - it("should accept a callback with arguments",function() - print("two") - local aio = assert(nng.aio.alloc(function(a,b,c) - print("hello") - end, "one", 2, "three")) - end) - --it("should call the callback after sleeping", function() - --print("three") - --local called = false - --local callback = function(lock) - --called = true - --end - --print("about to alloc") - --local aio = assert(nng.aio.alloc(callback)) - --print("done alloc") - --print("about to sleep") - --nng.aio.sleep(1,aio) - --print("whatever:",whatever) - ----nng.aio.sleep(1,aio) - --print("done sleep") - --os.execute("sleep 1") - --lock(-1) - --print("checking called...") - --print("called was",called) - --assert(called) - --end) - it("should call more than one socket getting a callback at once",function() - print("checking recv_any callback") - local s1 = assert(nng.bus0_open()) - assert(s1:listen("tcp://127.0.0.1:4000")) - - local s2 = assert(nng.bus0_open()) - assert(s2:listen("tcp://127.0.0.1:4001")) - - local s3 = assert(nng.bus0_open()) - assert(s3:dial("tcp://127.0.0.1:4000")) - assert(s3:dial("tcp://127.0.0.1:4001")) - - for i = 1, 100 do --100 times to try to trigger race conditions - --print("i:",i) - assert(s3:send("one")) - assert(s3:send("two")) - local s1_got_one, s1_got_two, s2_got_one, s2_got_two = false, false, false, false - while not (s1_got_one and s1_got_two and s2_got_one and s2_got_two) do - --local socket, message = nng.aio.recv_any(s1, s2) - --print("about to start recv_any") - --local tbl = nng.aio.recv_any(s1,s2) - for socket, message in pairs(nng.aio.recv_any(s1,s2)) do - --print("in one recv any:",socket, message) - if socket == s1 then - if message == "one" then - s1_got_one = true - elseif message == "two" then - s1_got_two = true - else - error("message socket 1:" .. message) - end - elseif socket == s2 then - if message == "one" then - s2_got_one = true - elseif message == "two" then - s2_got_two = true - else - error("message socket 2:" .. message) - end - else - error("socket:" .. tostring(socket)) - end - --print("done with recv_any",s1_got_one, s1_got_two, s2_got_one, s2_got_two) - end - end - assert(s1_got_one) - assert(s1_got_two) - assert(s2_got_one) - assert(s2_got_two) - end - print("Sucessful completion of recv_any test") - end) - --it("Should accept multiple sockets of dispirate types #writing",function() - --local servers = {} - --local clients = {} - --local messages = {} - --local halfs = { - --{"rep","req",true}, - --{"bus","bus",nil}, - --{"surveyor","respondent",false}, - --{"pub","sub",false} - --} - --for i = 1, 10 do - --local rng = math.random(#halfs) - - ----Create server - --servers[i] = assert(nng[halfs[rng][1] .. "0_open"]()) - --local url = string.format("tcp://127.0.0.1:%d",4000 + i) - --assert(servers[i]:listen(url)) - - ----Create clients - --local numclients = math.random(1,3) - --clients[i] = {} - --messages[i] = {} - --for j = 1, numclients do - --clients[i][j] = assert(nng[halfs[rng][2] .. "0_open"]()) - --assert(clients[i][j]:dial(url)) - --messages[i][j] = {} - --if halfs[rng][3] == true then - ----we send messages - --local nummessages = math.random(5) - --for k = 1, nummessages do - --local message = string.format("ping_%d_%d_%d",i,j,k) - --assert(clients[i][j]:send(message)) - --table.insert(messages[i][j],message) - --end - ----elseif halfs[rng][3] == nil then - ----we can send or receive messages - - --else - ----we receive and reply to messages - - --end - --end - --end - --for socket, message in pairs(nng.aio.recv_any(table.unpack(servers))) do - --print("GOT MESSAGE:",message) - --end - --end) -end) +--[[ +test the asyncronous parts of nnng +]] +local nng = require("nng") +local lanes = require("lanes").configure() +describe("nng.aio",function() + it("should be able to create aio object",function() + print("one") + local aio = assert(nng.aio.alloc(function() end)) + end) + it("should accept a callback with arguments",function() + print("two") + local aio = assert(nng.aio.alloc(function(a,b,c) + print("hello") + end, "one", 2, "three")) + end) + --it("should call the callback after sleeping", function() + --print("three") + --local called = false + --local callback = function(lock) + --called = true + --end + --print("about to alloc") + --local aio = assert(nng.aio.alloc(callback)) + --print("done alloc") + --print("about to sleep") + --nng.aio.sleep(1,aio) + --print("whatever:",whatever) + ----nng.aio.sleep(1,aio) + --print("done sleep") + --os.execute("sleep 1") + --lock(-1) + --print("checking called...") + --print("called was",called) + --assert(called) + --end) + it("should call more than one socket getting a callback at once",function() + print("checking recv_any callback") + local s1 = assert(nng.bus0_open()) + assert(s1:listen("tcp://127.0.0.1:4000")) + + local s2 = assert(nng.bus0_open()) + assert(s2:listen("tcp://127.0.0.1:4001")) + + local s3 = assert(nng.bus0_open()) + assert(s3:dial("tcp://127.0.0.1:4000")) + assert(s3:dial("tcp://127.0.0.1:4001")) + + for i = 1, 100 do --100 times to try to trigger race conditions + --print("i:",i) + assert(s3:send("one")) + assert(s3:send("two")) + local s1_got_one, s1_got_two, s2_got_one, s2_got_two = false, false, false, false + while not (s1_got_one and s1_got_two and s2_got_one and s2_got_two) do + --local socket, message = nng.aio.recv_any(s1, s2) + --print("about to start recv_any") + --local tbl = nng.aio.recv_any(s1,s2) + for socket, message in pairs(nng.aio.recv_any(s1,s2)) do + --print("in one recv any:",socket, message) + if socket == s1 then + if message == "one" then + s1_got_one = true + elseif message == "two" then + s1_got_two = true + else + error("message socket 1:" .. message) + end + elseif socket == s2 then + if message == "one" then + s2_got_one = true + elseif message == "two" then + s2_got_two = true + else + error("message socket 2:" .. message) + end + else + error("socket:" .. tostring(socket)) + end + --print("done with recv_any",s1_got_one, s1_got_two, s2_got_one, s2_got_two) + end + end + assert(s1_got_one) + assert(s1_got_two) + assert(s2_got_one) + assert(s2_got_two) + end + print("Sucessful completion of recv_any test") + end) + --it("Should accept multiple sockets of dispirate types #writing",function() + --local servers = {} + --local clients = {} + --local messages = {} + --local halfs = { + --{"rep","req",true}, + --{"bus","bus",nil}, + --{"surveyor","respondent",false}, + --{"pub","sub",false} + --} + --for i = 1, 10 do + --local rng = math.random(#halfs) + + ----Create server + --servers[i] = assert(nng[halfs[rng][1] .. "0_open"]()) + --local url = string.format("tcp://127.0.0.1:%d",4000 + i) + --assert(servers[i]:listen(url)) + + ----Create clients + --local numclients = math.random(1,3) + --clients[i] = {} + --messages[i] = {} + --for j = 1, numclients do + --clients[i][j] = assert(nng[halfs[rng][2] .. "0_open"]()) + --assert(clients[i][j]:dial(url)) + --messages[i][j] = {} + --if halfs[rng][3] == true then + ----we send messages + --local nummessages = math.random(5) + --for k = 1, nummessages do + --local message = string.format("ping_%d_%d_%d",i,j,k) + --assert(clients[i][j]:send(message)) + --table.insert(messages[i][j],message) + --end + ----elseif halfs[rng][3] == nil then + ----we can send or receive messages + + --else + ----we receive and reply to messages + + --end + --end + --end + --for socket, message in pairs(nng.aio.recv_any(table.unpack(servers))) do + --print("GOT MESSAGE:",message) + --end + --end) +end) diff --git a/spec/race_spec.lua b/spec/race_spec.lua index 732ae3e..b0a7f65 100644 --- a/spec/race_spec.lua +++ b/spec/race_spec.lua @@ -1,5 +1,5 @@ ---[[ -Test for race conditions -]] -describe("nng",function() -end) +--[[ +Test for race conditions +]] +describe("nng",function() +end) diff --git a/spec/startup_spec.lua b/spec/startup_spec.lua index e0faa82..94b0d39 100644 --- a/spec/startup_spec.lua +++ b/spec/startup_spec.lua @@ -1,147 +1,147 @@ ---[[ -test startup of the nng api -]] - -describe("nng",function() - local nng - it("should be included with require()",function() - nng = require("nng") - end) - it("should be able to create sockets",function() - local socket = assert(nng.pair1_open()) - end) - it("should be able to extablish a connection over inter-process communication",function() - local s1 = assert(nng.pair1_open()) - local s2 = assert(nng.pair1_open()) - assert(s1:listen("ipc:///tmp/pair.ipc")) - assert(s2:dial("ipc://tmp/pair.ipc")) - - assert(s2:send("hello")) - local rec = assert(s1:recv()) - assert(rec == "hello","Failed to receive hello, received:" .. rec) - end) - it("should be able to use a bus socket to distribute information",function() - local b = {} - for i = 1,10 do - local s = assert(nng.bus0_open()) - b[i] = s - end - for i = 1,10 do - local ipcaddr = string.format("ipc:///tmp/bus_%d.ipc",i) - assert(b[i]:listen(ipcaddr)) - end - for i = 1,10 do - for j = 1,10 do - if i ~= j then - local addr = string.format("ipc:///tmp/bus_%d.ipc",j) - assert(b[i]:dial(addr)) - end - end - end - assert(b[1]:send("Hello")) - for i = 2,10 do - local msg = assert(b[i]:recv()) - assert(msg == "Hello") - end - end) - it("should be able to use a survey socket to gather information",function() - math.randomseed(os.time()) - local s = assert(nng.surveyor0_open()) - assert(s:listen("ipc:///tmp/survey.ipc")) - local b = {} - for i = 1,100 do - local r = assert(nng.respondent0_open()) - assert(r:dial("ipc:///tmp/survey.ipc")) - b[i] = r - end - assert(s:send("Hello")) - for i = 1,100 do - local survey = assert(b[i]:recv()) - assert(survey == "Hello") - assert(b[i]:send(string.format("%f",math.random()))) - end - local responses = {} - while true do - local succ, msg = s:recv(nng.NNG_FLAG_NONBLOCK) - if succ then - table.insert(responses,tonumber(succ)) - elseif msg == "Try again" then - os.execute("sleep 1") - elseif msg == "Incorrect state" then - break - end - end - local avg = 0 - for _,v in pairs(responses) do - avg = avg + v - end - avg = avg / #responses - --avg should be about 0.5 - assert(avg > 0.4, "Average was:" .. avg) - assert(avg < 0.6, "Average was:" .. avg) - end) - it("should be able to use publish and subscribe sockets to transfer information", function() - --for i = 1,1000 do - for i = 1,10 do - local s1 = assert(nng.pub0_open()) - local s2 = assert(nng.sub0_open()) - local s3 = assert(nng.sub0_open()) - --local listener, err = s1:listen("tcp://127.0.0.1:1000") - local listener, err = s1:listen("ipc:///tmp/pub.ipc") - local dialers = {} - local num_addr_in_use = 0 - while err == "Address in use" do - num_addr_in_use = num_addr_in_use + 1 - if num_addr_in_use > 10 then - error("After multiple attempts, failed to bind on round " .. i) - end - listener, err = s1:listen("tcp://127.0.0.1:1000") - --listener, err = s1:listen("ipc:///tmp/pub.ipc") - end - assert(s2:dial("tcp://127.0.0.1:1000")) - assert(s3:dial("tcp://127.0.0.1:1000")) - --local d1 = assert(s2:dial("ipc:///tmp/pub.ipc")) - --local d2 = assert(s3:dial("ipc:///tmp/pub.ipc")) - assert(s2:subscribe("")) - assert(s3:subscribe("")) - table.insert(dialers, d1) - table.insert(dialers, d2) - assert(s1:send("hello 1")) - local r1 = assert(s2:recv()) - assert.are_equal(r1,"hello 1") - local r2 = assert(s2:recv()) - assert.are_equal(s2,"hello 1") - listener:close() - s1:close() - s2:close() - s3:close() - end - end) - describe("socket option",function() - describe("LOCADDR",function() - it("should return the local address for a socket",function() - local nng = require("nng") - local s1 = assert(nng.bus0_open()) - local s2 = assert(nng.bus0_open()) - local listener = assert(s1:listen("tcp://127.0.0.1:1001")) - local dialer = assert(s2:dial("tcp://127.0.0.1:1001")) - --local listener = s1:listen("ipc:///tmp/locaddr.ipc") - --local dialer = s2:dial("ipc:///tmp/locaddr.ipc") - assert(s1:send("test")) - assert(s2:recv() == "test") - local addr = dialer[nng.NNG_OPT_URL] - end) - end) - end) - describe("tcp transport",function() - it("has a keepalive option that prevents the tcp connection from closing #writing2",function() - local nng = require("nng") - local s1 = assert(nng.pair1_open()) - local s2 = assert(nng.pair1_open()) - s1[nng.NNG_OPT_TCP_KEEPALIVE] = true - --print(s1[nng.NNG_OPT_TCP_KEEPALIVE]) - assert(s1:listen("tcp://127.0.0.1:1000")) - assert(s2:dial("tcp://127.0.0.1:1000")) - end) - end) -end) +--[[ +test startup of the nng api +]] + +describe("nng",function() + local nng + it("should be included with require()",function() + nng = require("nng") + end) + it("should be able to create sockets",function() + local socket = assert(nng.pair1_open()) + end) + it("should be able to extablish a connection over inter-process communication",function() + local s1 = assert(nng.pair1_open()) + local s2 = assert(nng.pair1_open()) + assert(s1:listen("ipc:///tmp/pair.ipc")) + assert(s2:dial("ipc://tmp/pair.ipc")) + + assert(s2:send("hello")) + local rec = assert(s1:recv()) + assert(rec == "hello","Failed to receive hello, received:" .. rec) + end) + it("should be able to use a bus socket to distribute information",function() + local b = {} + for i = 1,10 do + local s = assert(nng.bus0_open()) + b[i] = s + end + for i = 1,10 do + local ipcaddr = string.format("ipc:///tmp/bus_%d.ipc",i) + assert(b[i]:listen(ipcaddr)) + end + for i = 1,10 do + for j = 1,10 do + if i ~= j then + local addr = string.format("ipc:///tmp/bus_%d.ipc",j) + assert(b[i]:dial(addr)) + end + end + end + assert(b[1]:send("Hello")) + for i = 2,10 do + local msg = assert(b[i]:recv()) + assert(msg == "Hello") + end + end) + it("should be able to use a survey socket to gather information",function() + math.randomseed(os.time()) + local s = assert(nng.surveyor0_open()) + assert(s:listen("ipc:///tmp/survey.ipc")) + local b = {} + for i = 1,100 do + local r = assert(nng.respondent0_open()) + assert(r:dial("ipc:///tmp/survey.ipc")) + b[i] = r + end + assert(s:send("Hello")) + for i = 1,100 do + local survey = assert(b[i]:recv()) + assert(survey == "Hello") + assert(b[i]:send(string.format("%f",math.random()))) + end + local responses = {} + while true do + local succ, msg = s:recv(nng.NNG_FLAG_NONBLOCK) + if succ then + table.insert(responses,tonumber(succ)) + elseif msg == "Try again" then + os.execute("sleep 1") + elseif msg == "Incorrect state" then + break + end + end + local avg = 0 + for _,v in pairs(responses) do + avg = avg + v + end + avg = avg / #responses + --avg should be about 0.5 + assert(avg > 0.4, "Average was:" .. avg) + assert(avg < 0.6, "Average was:" .. avg) + end) + it("should be able to use publish and subscribe sockets to transfer information", function() + --for i = 1,1000 do + for i = 1,10 do + local s1 = assert(nng.pub0_open()) + local s2 = assert(nng.sub0_open()) + local s3 = assert(nng.sub0_open()) + --local listener, err = s1:listen("tcp://127.0.0.1:1000") + local listener, err = s1:listen("ipc:///tmp/pub.ipc") + local dialers = {} + local num_addr_in_use = 0 + while err == "Address in use" do + num_addr_in_use = num_addr_in_use + 1 + if num_addr_in_use > 10 then + error("After multiple attempts, failed to bind on round " .. i) + end + listener, err = s1:listen("tcp://127.0.0.1:1000") + --listener, err = s1:listen("ipc:///tmp/pub.ipc") + end + assert(s2:dial("tcp://127.0.0.1:1000")) + assert(s3:dial("tcp://127.0.0.1:1000")) + --local d1 = assert(s2:dial("ipc:///tmp/pub.ipc")) + --local d2 = assert(s3:dial("ipc:///tmp/pub.ipc")) + assert(s2:subscribe("")) + assert(s3:subscribe("")) + table.insert(dialers, d1) + table.insert(dialers, d2) + assert(s1:send("hello 1")) + local r1 = assert(s2:recv()) + assert.are_equal(r1,"hello 1") + local r2 = assert(s2:recv()) + assert.are_equal(s2,"hello 1") + listener:close() + s1:close() + s2:close() + s3:close() + end + end) + describe("socket option",function() + describe("LOCADDR",function() + it("should return the local address for a socket",function() + local nng = require("nng") + local s1 = assert(nng.bus0_open()) + local s2 = assert(nng.bus0_open()) + local listener = assert(s1:listen("tcp://127.0.0.1:1001")) + local dialer = assert(s2:dial("tcp://127.0.0.1:1001")) + --local listener = s1:listen("ipc:///tmp/locaddr.ipc") + --local dialer = s2:dial("ipc:///tmp/locaddr.ipc") + assert(s1:send("test")) + assert(s2:recv() == "test") + local addr = dialer[nng.NNG_OPT_URL] + end) + end) + end) + describe("tcp transport",function() + it("has a keepalive option that prevents the tcp connection from closing #writing2",function() + local nng = require("nng") + local s1 = assert(nng.pair1_open()) + local s2 = assert(nng.pair1_open()) + s1[nng.NNG_OPT_TCP_KEEPALIVE] = true + --print(s1[nng.NNG_OPT_TCP_KEEPALIVE]) + assert(s1:listen("tcp://127.0.0.1:1000")) + assert(s2:dial("tcp://127.0.0.1:1000")) + end) + end) +end) diff --git a/src/lua-nng-aio.c b/src/lua-nng-aio.c index 707dbec..beedbff 100644 --- a/src/lua-nng-aio.c +++ b/src/lua-nng-aio.c @@ -1,187 +1,188 @@ -#include -#include -#include -#include - -#define NNG_STATIC_LIB - -#include - -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include "lua-nng-common.h" -#include "lua-nng.h" - -struct callback_info { - lua_State *L; - nng_mtx *lmutex; //mutex for the lua state - nng_mtx *cmutex; - nng_cv *cv; - int socketref; //lua refrence to this socket - nng_aio *aio; - int completed; -}; - -void push_callback(void *v){ - /*printf("aio callback received!\n");*/ - struct callback_info *ci = (struct callback_info*)v; - lua_State *L = ci->L; - /*printf("About to lock lua state in callback\n");*/ - nng_mtx_lock(ci->lmutex);//lock the lua state - /*printf("Done locking lua state in callback\n");*/ - int err = nng_aio_result(ci->aio); - if(err != 0){ - /*printf("This callback was canceled or timed out: %d: %s\n", err, nng_strerror(err));*/ - nng_mtx_unlock(ci->lmutex); - return; - } - lua_rawgeti(L,LUA_REGISTRYINDEX,ci->socketref);//push socket - luaL_unref(L,LUA_REGISTRYINDEX,ci->socketref);//free the reference - nng_msg *msg = nng_aio_get_msg(ci->aio); - size_t len = nng_msg_len(msg); - void *body = nng_msg_body(msg); - lua_pushlstring(L,(const char*)body, len);//push the message - nng_msg_free(msg); - ci->completed = 1; - lua_settable(L,1); - /*printf("About to unlock lua state in callback\n");*/ - nng_mtx_unlock(ci->lmutex); - /*printf("About to lock condition mutex in callback\n");*/ - nng_mtx_lock(ci->cmutex); - /*printf("About wake condition\n");*/ - nng_cv_wake(ci->cv); - /*printf("Done wake condition\n");*/ - nng_mtx_unlock(ci->cmutex); - /*printf("Done with callback\n");*/ -} - -//recv_any(socket1, socket2, ...) :: {socket = message} -int lnng_aio_recv(lua_State *L){ - nng_mtx *luamtx, *callbackmtx; - int err = nng_mtx_alloc(&luamtx); - err |= nng_mtx_alloc(&callbackmtx); - /*err |= nng_mtx_alloc(&setupmtx);*/ - if(err != 0){ - /*printf("Something when wrong when allocating mutexes\n");*/ - lua_pushboolean(L,0); - lua_pushstring(L,nng_strerror(err)); - return 2; - } - int argv = lua_gettop(L); - /*printf("Receiving any on %d sockets\n",argv);*/ - struct callback_info **cis = (struct callback_info**)malloc(sizeof(struct callback_info*) * argv); - nng_mtx_lock(luamtx); - /*printf("Locked lua state\n");*/ - nng_cv *cv; - nng_cv_alloc(&cv, callbackmtx); - /*nng_mtx_lock(callbackmtx);//wait for one of the callbacks to happen*/ - /*printf("Callback 1 happened\n");*/ - for(int i = 0; i < argv; i++){ - /*printf("\tSetting up async %d\n", i);*/ - nng_socket *sock = tosocket(L,-1); - int sref = luaL_ref(L,LUA_REGISTRYINDEX); - /*printf("\tGot socket ref %d\n", sref);*/ - cis[i] = (struct callback_info*)malloc(sizeof(struct callback_info)); - struct callback_info *ci = cis[i]; - /*printf("\tLooking at ci %p\n",ci);*/ - ci->L = L; - ci->lmutex = luamtx; - ci->cmutex = callbackmtx; - ci->socketref = sref; - ci->completed = 0; - ci->cv = cv; - /*printf("\tAbout to alloc aio\n");*/ - nng_aio_alloc(&(ci->aio), push_callback, ci); - /*printf("\tAllocated aio\n");*/ - /*printf("\tEverything else set on callback info\n");*/ - nng_recv_aio(*sock, ci->aio); - /*printf("\tSet up async receive %d\n",i);*/ - } - lua_newtable(L);//table that will hold [socket] = message - /*printf("About to unlock lua state\n");*/ - nng_mtx_unlock(luamtx); - /*printf("Unlocked lua state\n");*/ - /*nng_mtx_lock(callbackmtx);//was unlocked by the callback, luamtx is locked at this point*/ - int complete = 0; - nng_mtx_lock(callbackmtx); - while(complete == 0){ - for(int i = 0; i < argv; i++){ - struct callback_info *ci = cis[i]; - if(ci->completed > 0){ - /*printf("At least 1 completed! breaking!\n");*/ - complete = 1; - goto found; - } - } - /*printf("About to wait\n");*/ - nng_cv_wait(cv); - /*printf("Done waiting, complete is: %d\n", complete);*/ - } - found: - /*printf("Callback 2 happened\n");*/ - nng_mtx_unlock(callbackmtx); - /*printf("Callback done\n");*/ - for(int i = 0; i < argv; i++){ - struct callback_info *ci = cis[i]; - /*printf("About to stop aio %d\n",ci->aio);*/ - /*nng_aio_cancel(ci->aio);*/ - nng_aio_stop(ci->aio); - nng_aio_free(ci->aio); - free(ci); - /*printf("Stopped aio %d\n",ci->aio);*/ - } - free(cis); - /*printf("Freeing things\n");*/ - nng_cv_free(cv); - nng_mtx_free(callbackmtx); - //nng_mtx_unlock(luamtx);//mutexes must not be locked when they are freed - nng_mtx_free(luamtx); - /*printf("Done freeing everything, returning...\n");*/ - return 1; -} - -static const struct luaL_Reg nng_aio_handler_m[] = { - {NULL, NULL} -}; - -static const struct luaL_Reg nng_aio_mutex_m[] = { - {NULL, NULL} -}; - - -static const struct luaL_Reg nng_http_f[] = { - {"recv_any",lnng_aio_recv}, - {NULL, NULL} -}; - -int luaopen_nng_aio(lua_State *L){ - luaL_newmetatable(L,"nng.aio.struct"); - luaL_newlib(L,nng_aio_handler_m); - lua_setfield(L,-2,"__index"); - lua_pop(L,1); - - luaL_newmetatable(L,"nng.aio.mutex"); - luaL_newlib(L,nng_aio_mutex_m); - lua_setfield(L,-2,"__index"); - lua_pop(L,1); - - luaL_newlib(L,nng_http_f); - return 1; -} +#include +#include +#include +#include + +#define NNG_STATIC_LIB + +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "lua-nng-common.h" +#include "lua-nng.h" + +struct callback_info { + lua_State *L; + nng_mtx *lmutex; //mutex for the lua state + nng_mtx *cmutex; + nng_cv *cv; + int socketref; //lua refrence to this socket + nng_aio *aio; + int completed; +}; + +void push_callback(void *v){ + /*printf("aio callback received!\n");*/ + struct callback_info *ci = (struct callback_info*)v; + lua_State *L = ci->L; + /*printf("About to lock lua state in callback\n");*/ + nng_mtx_lock(ci->lmutex);//lock the lua state + /*printf("Done locking lua state in callback\n");*/ + int err = nng_aio_result(ci->aio); + if(err != 0){ + /*printf("This callback was canceled or timed out: %d: %s\n", err, nng_strerror(err));*/ + nng_mtx_unlock(ci->lmutex); + return; + } + lua_rawgeti(L,LUA_REGISTRYINDEX,ci->socketref);//push socket + luaL_unref(L,LUA_REGISTRYINDEX,ci->socketref);//free the reference + nng_msg *msg = nng_aio_get_msg(ci->aio); + size_t len = nng_msg_len(msg); + void *body = nng_msg_body(msg); + lua_pushlstring(L,(const char*)body, len);//push the message + nng_msg_free(msg); + ci->completed = 1; + lua_settable(L,1); + /*printf("About to unlock lua state in callback\n");*/ + nng_mtx_unlock(ci->lmutex); + /*printf("About to lock condition mutex in callback\n");*/ + nng_mtx_lock(ci->cmutex); + /*printf("About wake condition\n");*/ + nng_cv_wake(ci->cv); + /*printf("Done wake condition\n");*/ + nng_mtx_unlock(ci->cmutex); + /*printf("Done with callback\n");*/ +} + +//recv_any(socket1, socket2, ...) :: {socket = message} +int lnng_aio_recv(lua_State *L){ + nng_mtx *luamtx, *callbackmtx; + int err = nng_mtx_alloc(&luamtx); + err |= nng_mtx_alloc(&callbackmtx); + /*err |= nng_mtx_alloc(&setupmtx);*/ + if(err != 0){ + /*printf("Something when wrong when allocating mutexes\n");*/ + lua_pushboolean(L,0); + lua_pushstring(L,nng_strerror(err)); + return 2; + } + int argv = lua_gettop(L); + /*printf("Receiving any on %d sockets\n",argv);*/ + struct callback_info **cis = (struct callback_info**)malloc(sizeof(struct callback_info*) * argv); + nng_mtx_lock(luamtx); + /*printf("Locked lua state\n");*/ + nng_cv *cv; + nng_cv_alloc(&cv, callbackmtx); + /*nng_mtx_lock(callbackmtx);//wait for one of the callbacks to happen*/ + /*printf("Callback 1 happened\n");*/ + for(int i = 0; i < argv; i++){ + /*printf("\tSetting up async %d\n", i);*/ + nng_socket *sock = tosocket(L,-1); + int sref = luaL_ref(L,LUA_REGISTRYINDEX); + /*printf("\tGot socket ref %d\n", sref);*/ + cis[i] = (struct callback_info*)malloc(sizeof(struct callback_info)); + struct callback_info *ci = cis[i]; + /*printf("\tLooking at ci %p\n",ci);*/ + ci->L = L; + ci->lmutex = luamtx; + ci->cmutex = callbackmtx; + ci->socketref = sref; + ci->completed = 0; + ci->cv = cv; + /*printf("\tAbout to alloc aio\n");*/ + nng_aio_alloc(&(ci->aio), push_callback, ci); + /*printf("\tAllocated aio\n");*/ + /*printf("\tEverything else set on callback info\n");*/ + nng_recv_aio(*sock, ci->aio); + /*printf("\tSet up async receive %d\n",i);*/ + } + lua_newtable(L);//table that will hold [socket] = message + /*printf("About to unlock lua state\n");*/ + nng_mtx_unlock(luamtx); + /*printf("Unlocked lua state\n");*/ + /*nng_mtx_lock(callbackmtx);//was unlocked by the callback, luamtx is locked at this point*/ + int complete = 0; + nng_mtx_lock(callbackmtx); + while(complete == 0){ + for(int i = 0; i < argv; i++){ + struct callback_info *ci = cis[i]; + if(ci->completed > 0){ + /*printf("At least 1 completed! breaking!\n");*/ + complete = 1; + goto found; + } + } + /*printf("About to wait\n");*/ + nng_cv_wait(cv); + /*printf("Done waiting, complete is: %d\n", complete);*/ + } + found: + /*printf("Callback 2 happened\n");*/ + nng_mtx_unlock(callbackmtx); + /*printf("Callback done\n");*/ + for(int i = 0; i < argv; i++){ + struct callback_info *ci = cis[i]; + /*printf("About to stop aio %d\n",ci->aio);*/ + /*nng_aio_cancel(ci->aio);*/ + luaL_unref(L,LUA_REGISTRYINDEX,ci->socketref); + nng_aio_stop(ci->aio); + nng_aio_free(ci->aio); + free(ci); + /*printf("Stopped aio %d\n",ci->aio);*/ + } + free(cis); + /*printf("Freeing things\n");*/ + nng_cv_free(cv); + nng_mtx_free(callbackmtx); + //nng_mtx_unlock(luamtx);//mutexes must not be locked when they are freed + nng_mtx_free(luamtx); + /*printf("Done freeing everything, returning...\n");*/ + return 1; +} + +static const struct luaL_Reg nng_aio_handler_m[] = { + {NULL, NULL} +}; + +static const struct luaL_Reg nng_aio_mutex_m[] = { + {NULL, NULL} +}; + + +static const struct luaL_Reg nng_http_f[] = { + {"recv_any",lnng_aio_recv}, + {NULL, NULL} +}; + +int luaopen_nng_aio(lua_State *L){ + luaL_newmetatable(L,"nng.aio.struct"); + luaL_newlib(L,nng_aio_handler_m); + lua_setfield(L,-2,"__index"); + lua_pop(L,1); + + luaL_newmetatable(L,"nng.aio.mutex"); + luaL_newlib(L,nng_aio_mutex_m); + lua_setfield(L,-2,"__index"); + lua_pop(L,1); + + luaL_newlib(L,nng_http_f); + return 1; +} diff --git a/src/lua-nng-common.c b/src/lua-nng-common.c index a12ed79..a7485e5 100644 --- a/src/lua-nng-common.c +++ b/src/lua-nng-common.c @@ -1,22 +1,22 @@ -#include "lua-nng-common.h" -/*Just copy+paste lua's runtime traceback funtion*/ -int traceback (lua_State *L) { - if (!lua_isstring(L, 1)) /* 'message' not a string? */ - return 1; /* keep it intact */ - lua_getglobal(L,"debug"); - if (!lua_istable(L, -1)) { - lua_pop(L, 1); - return 1; - } - lua_getfield(L, -1, "traceback"); - if (!lua_isfunction(L, -1)) { - lua_pop(L, 2); - return 1; - } - lua_pushvalue(L, 1); /* pass error message */ - lua_pushinteger(L, 2); /* skip this function and traceback */ - lua_call(L, 2, 1); /* call debug.traceback */ - printf("%s\n",lua_tostring(L,-1)); - return 1; -} - +#include "lua-nng-common.h" +/*Just copy+paste lua's runtime traceback funtion*/ +int traceback (lua_State *L) { + if (!lua_isstring(L, 1)) /* 'message' not a string? */ + return 1; /* keep it intact */ + lua_getglobal(L,"debug"); + if (!lua_istable(L, -1)) { + lua_pop(L, 1); + return 1; + } + lua_getfield(L, -1, "traceback"); + if (!lua_isfunction(L, -1)) { + lua_pop(L, 2); + return 1; + } + lua_pushvalue(L, 1); /* pass error message */ + lua_pushinteger(L, 2); /* skip this function and traceback */ + lua_call(L, 2, 1); /* call debug.traceback */ + printf("%s\n",lua_tostring(L,-1)); + return 1; +} + diff --git a/src/lua-nng.c b/src/lua-nng.c index 68ea659..a57634e 100644 --- a/src/lua-nng.c +++ b/src/lua-nng.c @@ -1,670 +1,677 @@ -#include -#include -#include - -#define NNG_STATIC_LIB - -#include - -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include -#include "lua-nng-aio.h" - -#define OPEN(name)\ - int lnng_ ## name ## _open(lua_State *L){\ - nng_socket *s = (nng_socket*)lua_newuserdata(L,sizeof(nng_socket));\ - int err = nng_ ## name ## _open(s);\ - if(err == 0){\ - luaL_setmetatable(L,"nng.socket");\ - return 1;\ - }else{\ - lua_pushboolean(L,0);\ - lua_pushstring(L,nng_strerror(err));\ - return 2;\ - }\ - } - -OPEN(bus0); -OPEN(pair1); -OPEN(pub0); -OPEN(sub0); -OPEN(pull0); -OPEN(push0); -OPEN(req0); -OPEN(rep0); -OPEN(surveyor0); -OPEN(respondent0); - -//sleep(ms) -int lnng_msleep(lua_State *L){ - int ms = luaL_checkinteger(L,1); - nng_msleep(ms); - lua_pop(L,1); - return 0; -} - -nng_socket* tosocket(lua_State *L, int offset){ - luaL_checkudata(L,offset,"nng.socket"); - return (nng_socket*)lua_touserdata(L,offset); -} - -nng_listener* tolistener(lua_State *L, int offset){ - luaL_checkudata(L,offset,"nng.listener"); - return (nng_listener*)lua_touserdata(L,offset); -} - -nng_dialer* todialer(lua_State *L, int offset){ - luaL_checkudata(L,offset,"nng.dialer"); - return (nng_dialer*)lua_touserdata(L,offset); -} - -nng_sockaddr* tosockaddr(lua_State *L, int offset){ - luaL_checkudata(L,offset,"nng.sockaddr"); - return (nng_sockaddr*)lua_touserdata(L,offset); -} - - -//socket:listen(url[, flags]) :: listener -int lnng_listen(lua_State *L){ - int argc = lua_gettop(L); - int flags = 0; - nng_socket *sock = tosocket(L,1); - const char *url = luaL_checkstring(L,2); - if(argc >= 3){ - flags = luaL_checkinteger(L,3); - } - lua_pop(L,argc); - nng_listener *lp = (nng_listener*)lua_newuserdata(L,sizeof(nng_listener)); - int err = nng_listen(*sock, url, lp, flags); - if(err == 0){ - luaL_setmetatable(L,"nng.listener"); - return 1; - }else{ - lua_pushboolean(L,0); - lua_pushstring(L,nng_strerror(err)); - return 2; - } -} - -//socket:dial(url[, flags]) :: dialer -int lnng_dial(lua_State *L){ - int argc = lua_gettop(L); - int flags = 0; - nng_socket *sock = tosocket(L,1); - const char *url = luaL_checkstring(L,2); - if(argc >= 3){ - flags = luaL_checkinteger(L,3); - } - lua_pop(L,argc); - nng_dialer *dp = (nng_dialer*)lua_newuserdata(L,sizeof(nng_dialer)); - int err = nng_dial(*sock, url, dp, flags); - if(err == 0){ - luaL_setmetatable(L,"nng.dialer"); - return 1; - }else{ - lua_pushboolean(L,0); - lua_pushstring(L,nng_strerror(err)); - return 2; - } -} - -//socket:send("data"[, flags]) -int lnng_send(lua_State *L){ - int argc = lua_gettop(L); - int flags = 0; - nng_socket *sock = tosocket(L,1); - size_t datasize; - const char *data = luaL_checklstring(L,2,&datasize); - if(argc >= 3){ - flags = luaL_checkinteger(L,3); - } - lua_pop(L,argc); - int err = nng_send(*sock, (void*)data, datasize, flags); - if(err == 0){ - lua_pushboolean(L,1); - return 1; - }else{ - lua_pushboolean(L,0); - lua_pushstring(L,nng_strerror(err)); - return 2; - } -} - -//socket:recv([flags]) -int lnng_recv(lua_State *L){ - int argc = lua_gettop(L); - int flags = NNG_FLAG_ALLOC; //don't support zero copy - nng_socket *sock = tosocket(L,1); - if(argc >= 2){ - flags += luaL_checkinteger(L,2); - } - char *data = NULL; - size_t datasize; - int err = nng_recv(*sock, &data, &datasize, flags); - if(err == 0){ - lua_pushlstring(L,data,datasize); - nng_free(data,datasize); - return 1; - }else{ - lua_pushboolean(L,0); - lua_pushstring(L,nng_strerror(err)); - return 2; - } -} - -//socket:close() -int lnng_socket_close(lua_State *L){ - /*printf("Garbage collecting socket...");*/ - nng_socket *sock = tosocket(L,1); - int err = nng_close(*sock); - lua_pop(L,1); - return 0; -} - -//close(ud_dialer) -int lnng_dialer_close(lua_State *L){ - nng_dialer *dp = (nng_dialer*)lua_touserdata(L,1); - nng_dialer_close(*dp); - lua_pop(L,1); - return 0; -} - -//close(ud_listener) -int lnng_listener_close(lua_State *L){ - nng_listener *lp = (nng_listener*)lua_touserdata(L,1); - int err = nng_listener_close(*lp); - lua_pop(L,1); - return 0; -} - -//subscribe(socket,"topic") -int lnng_subscribe(lua_State *L){ - nng_socket *sock = tosocket(L,1); - size_t size; - const char *topic = luaL_checklstring(L,2,&size); - lua_pop(L,2); - int err = nng_socket_set(*sock,NNG_OPT_SUB_SUBSCRIBE,topic,size); - if(err == 0){ - lua_pushboolean(L,1); - return 1; - }else{ - lua_pushboolean(L,0); - lua_pushstring(L,nng_strerror(err)); - return 2; - } -} - -//unsubscribe(socket,"topic") -int lnng_unsubscribe(lua_State *L){ - nng_socket *sock = tosocket(L,1); - size_t size; - const char *topic = luaL_checklstring(L,2,&size); - lua_pop(L,2); - int err = nng_socket_set(*sock,NNG_OPT_SUB_UNSUBSCRIBE,topic,size); - if(err == 0){ - lua_pushboolean(L,1); - return 1; - }else{ - lua_pushboolean(L,0); - lua_pushstring(L,nng_strerror(err)); - return 2; - } -} - -//Option types -#define SOCKET_OPTION_SET(L, socket, flag, matches, ntype, gets, sets) \ - if(strcmp(flag, matches) == 0){\ - ntype value = (ntype)gets(L,3);\ - int err = sets(*socket, flag, value);\ - lua_pop(L,lua_gettop(L));\ - if(err != 0){\ - lua_pushboolean(L,0);\ - lua_pushfstring(L,nng_strerror(err));\ - return 2;\ - }else{\ - return 0;\ - }\ - } - -#define SOCKET_OPTION_GET(L, socket, flag, matches, ntype, gets, pushes) \ - if(strcmp(flag, matches) == 0){\ - ntype value;\ - int err = gets(*socket, flag, &value);\ - lua_pop(L,lua_gettop(L));\ - if(err != 0){\ - lua_pushboolean(L,0);\ - lua_pushfstring(L,nng_strerror(err));\ - return 2;\ - }else{\ - pushes(L,value);\ - return 1;\ - }\ - } - -//TODO -//set(listener,"flag",value) -int lnng_listener_set(lua_State *L){ - return 0; -} - -int lnng_sockaddr_get(lua_State *L){ - nng_sockaddr *sa = tosockaddr(L,1); - unsigned int f = sa->s_family; - const char *field = luaL_checkstring(L,2); - lua_pop(L,2); - printf("Getting %s from sockaddr\n",field); - if(strcmp(field,"type") == 0){ - lua_pushnumber(L,f); - return 1; - } - if(f == NNG_AF_UNSPEC){ - printf("Unspec\n"); - lua_pushnil(L); - return 1; - }else if(f == NNG_AF_INPROC){ - printf("Inproc\n"); - nng_sockaddr_inproc sai = sa->s_inproc; - if(strcmp(field,"name") == 0){ - lua_pushstring(L, sai.sa_name); - return 1; - } - }else if(f == NNG_AF_IPC){ - printf("IPC\n"); - nng_sockaddr_ipc sai = sa->s_ipc; - if(strcmp(field,"path") == 0){ - lua_pushstring(L, sai.sa_path); - return 1; - } - }else if(f == NNG_AF_INET){ - printf("Inet\n"); - nng_sockaddr_in sai = sa->s_in; - if(strcmp(field,"addr") == 0){ - lua_pushnumber(L, sai.sa_addr); - return 1; - }else if(strcmp(field,"port") == 0){ - lua_pushnumber(L, sai.sa_port); - return 1; - } - }else if(f == NNG_AF_INET6){ - nng_sockaddr_in6 sai = sa->s_in6; - if(strcmp(field,"addr") == 0){ - lua_pushlstring(L,sai.sa_addr,16); - return 1; - }else if(strcmp(field,"port") == 0){ - lua_pushnumber(L,sai.sa_port); - return 1; - } - }else if(f == NNG_AF_ZT){ - nng_sockaddr_zt sai = sa->s_zt; - if(strcmp(field,"nwid") == 0){ - lua_pushnumber(L,sai.sa_nwid); - return 1; - }else if(strcmp(field,"nodeid") == 0){ - lua_pushnumber(L,sai.sa_nodeid); - return 1; - }else if(strcmp(field,"port") == 0){ - lua_pushnumber(L,sai.sa_port); - return 1; - } - } -} - -//set(socket, "flag", value) -int lnng_socket_set(lua_State *L){ - nng_socket *sock = tosocket(L,1); - const char *flag = luaL_checkstring(L,2); - //NNG_OPT_LOCADDR - read-only - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECONNMINT, nng_duration, luaL_checkinteger, nng_socket_set_ms); - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECONNMAXT, nng_duration, luaL_checkinteger, nng_socket_set_ms); - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECVBUF, int, luaL_checkinteger, nng_socket_set_int); - //NNG_OPT_RECVFD - read-only - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECVMAXSZ, size_t, luaL_checkinteger, nng_socket_set_uint64); - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECVTIMEO, nng_duration, luaL_checkinteger, nng_socket_set_ms); - //NNG_OPT_REMADDR - read-only - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SENDBUF, int, luaL_checkinteger, nng_socket_set_int); - //NNG_OPT_SENDFD - read-only - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SENDTIMEO, nng_duration, luaL_checkinteger, nng_socket_set_ms); - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SOCKNAME, const char*, luaL_checkstring, nng_socket_set_string); - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_MAXTTL, int, luaL_checkinteger, nng_socket_set_int); - //NNG_OPT_UR - read-only - //NNG_OPT_PROTO - read-only - //NNG_OPT_PEER - read-only - //NNG_OPT_PROTONAME - read-only - //NNG_OPT_PEERNAME - read-only - - //TCP options - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TCP_NODELAY, bool, lua_toboolean, nng_socket_set_bool); - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TCP_KEEPALIVE, bool, lua_toboolean, nng_socket_set_bool); - //NNG_OPT_TCP_BOUND_PORT - read-only? documentation doesn't say it, but it would be wierd if we could write to it. - - //TLS options - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS_AUTH_MODE, int, luaL_checkinteger, nng_socket_set_int); //write-only - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS_CA_FILE, const char*, luaL_checkstring, nng_socket_set_string); //write-only - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS_CERT_KEY_FILE, const char*, luaL_checkstring, nng_socket_set_string); //write-only - //SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS TODO: NNG_OPT_TLS_CONFIG - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS_SERVER_NAME, const char*, luaL_checkstring, nng_socket_set_string); - //NNG_OPT_TLS_VERIFIED - read-only - - //IPC options - //NNG_OPT_IPC_PEER_GID - read-only - //NNG_OPT_IPC_PEER_PID - read-only - //NNG_OPT_IPC_PEER_UID - read-only - //NNG_OPT_IPC_PEER_ZONEID - read-only - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_IPC_PERMISSIONS, int, luaL_checkinteger, nng_socket_set_int); - //TODO: NNG_OPT_IPC_SECURITY_DESCRIPTOR - windows-only, sets a pointer to a PSECURITY_DESCRIPTOR - - //PUB/SUB options - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SUB_PREFNEW, bool, lua_toboolean, nng_socket_set_bool); - - //REQ/REP options - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_REQ_RESENDTIME, nng_duration, luaL_checkinteger, nng_socket_set_ms); - - //Survayor/respondent options - SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SURVEYOR_SURVEYTIME, nng_duration, luaL_checkinteger, nng_socket_set_ms); -} - -//get(socket,"flag",value) -int lnng_socket_get(lua_State *L){ - /*printf("Lua stack is %d\n",lua_gettop(L));*/ - nng_socket *sock = tosocket(L,1); - const char *flag = luaL_checkstring(L,2); - //NNG_OPT_LOCADDR - listeners, dialers, and connected pipes - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RAW, bool, nng_socket_get_bool, lua_pushboolean); //read-only - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECONNMINT, nng_duration, nng_socket_get_ms, lua_pushinteger);//sockets and dialers, if both dialer and socket are set, dialer overrides socket - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECONNMAXT, nng_duration, nng_socket_get_ms, lua_pushinteger); //sockets and dialers, if both dialer and socket are set, dialer overrides socket - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECVBUF, int, nng_socket_get_int, lua_pushinteger);//socket only - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECVFD, int, nng_socket_get_int, lua_pushinteger); //socket only, read-only - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECVMAXSZ, size_t, nng_socket_get_size, lua_pushinteger);//sockets, dialers and listeners - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECVTIMEO, nng_duration, nng_socket_get_ms, lua_pushinteger);//socket only - if(strcmp(flag, NNG_OPT_REMADDR) == 0){ - lua_pop(L,lua_gettop(L)); - nng_sockaddr *sa = (nng_sockaddr*)lua_newuserdata(L,sizeof(nng_sockaddr));//{udata} - int err = nng_socket_get_addr(*sock, NNG_OPT_REMADDR, sa);//{udata} - if(err == 0){ - luaL_setmetatable(L,"nng.sockaddr"); - return 1; - }else{ - return luaL_error(L,"Failed to lookup local address - %s",nng_strerror(err)); - } - } - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SENDBUF, int, nng_socket_get_int, lua_pushinteger); - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SENDFD, int, nng_socket_get_int, lua_pushinteger); //read-only - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SENDTIMEO, nng_duration, nng_socket_get_ms, lua_pushinteger); - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SOCKNAME, char*, nng_socket_get_string, lua_pushstring); - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_MAXTTL, int, nng_socket_get_int, lua_pushinteger); - //SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_URL, char*, nng_socket_get_string, lua_pushstring); //read-only for dialers, listeners, and pipes - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_PROTO, int, nng_socket_get_int, lua_pushinteger); //read-only - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_PEER, int, nng_socket_get_int, lua_pushinteger); //read-only - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_PROTONAME, char*, nng_socket_get_string, lua_pushstring); //read-only - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_PEERNAME, char*, nng_socket_get_string, lua_pushstring); //read-only - - //TCP options - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_TCP_NODELAY, bool, nng_socket_get_bool, lua_pushboolean); - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_TCP_KEEPALIVE, bool, nng_socket_get_bool, lua_pushboolean); - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_TCP_BOUND_PORT, int, nng_socket_get_int, lua_pushinteger); - - //TLS options - //NNG_OPT_TLS_MODE - write-only option - //NNG_OPT_TLS_CA_FILE - write-only option - //NNG_OPT_TLS_CERT_KEY_FILE - write-only option - //TODO: NNG_OPT_TLS_CONFIG - //NNG_OPT_TLS_SERVER_NAME - write-only option - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_TLS_VERIFIED, bool, nng_socket_get_bool, lua_pushboolean); //read-only option - - //IPC options - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_IPC_PEER_GID, uint64_t, nng_socket_get_uint64, lua_pushinteger); //read-only option - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_IPC_PEER_PID, uint64_t, nng_socket_get_uint64, lua_pushinteger); //read-only option - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_IPC_PEER_UID, uint64_t, nng_socket_get_uint64, lua_pushinteger); //read-only option - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_IPC_PEER_ZONEID, uint64_t, nng_socket_get_uint64, lua_pushinteger); //read-only option on Solaris and illumos systems only - //NNG_OPT_IPC_PERMISSIONS - write-only option - //NNG_OPT_IPC_SECURITY_DESCRIPTOR - write-only option - - //PUB/SUB options - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SUB_PREFNEW, bool, nng_socket_get_bool, lua_pushboolean); - - //REQ/REP options - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_REQ_RESENDTIME, nng_duration, nng_socket_get_ms, lua_pushinteger); - - //Survayor/respondent options - SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SURVEYOR_SURVEYTIME, nng_duration, nng_socket_get_ms, lua_pushinteger); - lua_pop(L,2); - - //If none of the above options matched, get the value from the metatable - int type = luaL_getmetatable(L,"nng.socket_m");////{socket_m} - /*luaL_newlib(L,nng_socket_m);*/ - lua_getfield(L,-1,"__index"); - lua_getfield(L,-1,flag);//{socket_m},{socket},any - int ref = luaL_ref(L,LUA_REGISTRYINDEX);//{socket_m},{socket} - lua_pop(L,2);// - lua_rawgeti(L,LUA_REGISTRYINDEX,ref);//any - return 1; -} - -/*#define DIALER_OPTION_SET(L, socket, flag, matches, ntype, gets, sets) \*/ - /*if(strcmp(flag, matches) == 0){\*/ - /*ntype value = (ntype)gets(L,3);\*/ - /*int err = sets(*socket, flag, value);\*/ - /*lua_pop(L,lua_gettop(L));\*/ - /*if(err != 0){\*/ - /*lua_pushboolean(L,0);\*/ - /*lua_pushfstring(L,nng_strerror(err));\*/ - /*return 2;\*/ - /*}else{\*/ - /*return 0;\*/ - /*}\*/ - /*}*/ - -/*#define DIALER_OPTION_GET(L, dialer, flag, matches, ntype, gets, pushes) \*/ - /*if(strcmp(flag, matches) == 0){\*/ - /*ntype value;\*/ - /*int err = gets(*socket, flag, &value);\*/ - /*lua_pop(L,lua_gettop(L));\*/ - /*if(err != 0){\*/ - /*lua_pushboolean(L,0);\*/ - /*lua_pushfstring(L,nng_strerror(err));\*/ - /*return 2;\*/ - /*}else{\*/ - /*pushes(L,value);\*/ - /*return 1;\*/ - /*}\*/ - /*}*/ - - -int lnng_dialer_get(lua_State *L){ - nng_dialer *dialer = todialer(L,1); - const char *flag = luaL_checkstring(L,2); - if(strcmp(flag, NNG_OPT_LOCADDR) == 0){ - printf("Looking for dialer's locaddr\n"); - lua_pop(L,lua_gettop(L)); - nng_sockaddr *sa = (nng_sockaddr*)lua_newuserdata(L,sizeof(nng_sockaddr));//{udata} - int err = nng_dialer_get_addr(*dialer , NNG_OPT_LOCADDR, sa);//{udata} - if(err == 0){ - luaL_setmetatable(L,"nng.sockaddr"); - return 1; - }else{ - return luaL_error(L,"Failed to lookup local address - %s",nng_strerror(err)); - } - } - SOCKET_OPTION_GET(L, dialer, flag, NNG_OPT_RECONNMINT, nng_duration, nng_dialer_get_ms, lua_pushinteger);//sockets and dialers, if both dialer and socket are set, dialer overrides socket - SOCKET_OPTION_GET(L, dialer, flag, NNG_OPT_RECONNMAXT, nng_duration, nng_dialer_get_ms, lua_pushinteger); //sockets and dialers, if both dialer and socket are set, dialer overrides socket - SOCKET_OPTION_GET(L, dialer, flag, NNG_OPT_RECVMAXSZ, size_t, nng_dialer_get_size, lua_pushinteger);//sockets, dialers and listeners - if(strcmp(flag, NNG_OPT_REMADDR) == 0){ - lua_pop(L,lua_gettop(L)); - nng_sockaddr *sa = (nng_sockaddr*)lua_newuserdata(L,sizeof(nng_sockaddr));//{udata} - int err = nng_dialer_get_addr(*dialer, NNG_OPT_REMADDR, sa);//{udata} - if(err == 0){ - luaL_setmetatable(L,"nng.sockaddr"); - return 1; - }else{ - return luaL_error(L,"Failed to lookup remote address - %s",nng_strerror(err)); - } - } - SOCKET_OPTION_GET(L, dialer, flag, NNG_OPT_URL, char*, nng_dialer_get_string, lua_pushstring); //read-only for dialers, listeners, and pipes - - //If none of the above options matched, get the value from the metatable - int type = luaL_getmetatable(L,"nng.dialer_m");////{socket_m} - /*luaL_newlib(L,nng_socket_m);*/ - lua_getfield(L,-1,"__index"); - lua_getfield(L,-1,flag);//{socket_m},{socket},any - int ref = luaL_ref(L,LUA_REGISTRYINDEX);//{socket_m},{socket} - lua_pop(L,2);// - lua_rawgeti(L,LUA_REGISTRYINDEX,ref);//any - return 1; -} - -int lnng_dialer_set(lua_State *L){ - -} - -static const struct luaL_Reg nng_dialer_m[] = { - {"close",lnng_dialer_close}, - {NULL, NULL} -}; - -static const struct luaL_Reg nng_listener_m[] = { - {"close",lnng_listener_close}, - {NULL, NULL} -}; - -static const struct luaL_Reg nng_socket_m[] = { - {"dial", lnng_dial}, - {"listen", lnng_listen}, - {"send", lnng_send}, - {"recv", lnng_recv}, - {"close", lnng_socket_close}, - - //pub/sub only - {"subscribe",lnng_subscribe}, - {"unsubscribe",lnng_unsubscribe}, - {NULL, NULL} -}; - -static const struct luaL_Reg nng_f[] = { - {"bus0_open", lnng_bus0_open}, - {"pair1_open", lnng_pair1_open}, - {"pub0_open", lnng_pub0_open}, - {"sub0_open", lnng_sub0_open}, - {"pull0_open", lnng_pull0_open}, - {"push0_open", lnng_push0_open}, - {"req0_open", lnng_req0_open}, - {"rep0_open", lnng_rep0_open}, - {"surveyor0_open",lnng_surveyor0_open}, - {"respondent0_open",lnng_respondent0_open}, - {"sleep",lnng_msleep}, - {NULL, NULL} -}; - - -#define flag(name) lua_pushnumber(L,name); lua_setfield(L,-2,#name); -#define option(name) lua_pushstring(L,name); lua_setfield(L,-2,#name); -int luaopen_nng(lua_State *L){ - luaL_newmetatable(L,"nng.socket_m");//{} - luaL_newlib(L,nng_socket_m);//{},{socket_m} - lua_setfield(L,-2,"__index");//{__index={socket_m}} - lua_pop(L,1);// - - luaL_newmetatable(L,"nng.socket");//{} - lua_pushcfunction(L,lnng_socket_get);//{},socket_get() - lua_setfield(L,-2,"__index");//{__index = socket_get()} - lua_pushcfunction(L,lnng_socket_close);//{__index = {socket_m}},close() - lua_setfield(L,-2,"__gc");//{__index = {socket_m}, __gc = close()} - lua_pushcfunction(L,lnng_socket_set);//{__index = {socket_m}, __gc = close()}, set() - lua_setfield(L,-2,"__newindex");//{__index = {socket_m}, __gc = close(), __newindex = set()} - lua_pop(L,1); - - luaL_newmetatable(L,"nng.dialer");//{} - lua_pushcfunction(L,lnng_dialer_get);//{},dialer_get() - lua_setfield(L,-2,"__index"); - lua_pushcfunction(L,lnng_dialer_set); - lua_setfield(L,-2,"__newindex"); - - luaL_newmetatable(L,"nng.dialer_m"); - luaL_newlib(L,nng_dialer_m); - lua_setfield(L,-2,"__index"); - /*lua_pushcfunction(L,lnng_dialer_close);*/ - /*lua_setfield(L,-2,"__gc");*/ - lua_pop(L,1); - - luaL_newmetatable(L,"nng.listener"); - luaL_newlib(L,nng_listener_m); - lua_setfield(L,-2,"__index"); - /*lua_pushcfunction(L,lnng_listener_close);*/ - /*lua_setfield(L,-2,"__gc");*/ - lua_pop(L,1); - - luaL_newmetatable(L,"nng.sockaddr");//{nng.sockaddr} - lua_pushcfunction(L,lnng_sockaddr_get);//{nng.sockaddr},sockaddr_get - lua_setfield(L,-2,"__index");//{nng_sockaddr} - lua_pop(L,1); - - luaL_newlib(L,nng_f); - luaopen_nng_aio(L); - lua_setfield(L,-2,"aio"); - - //Flags - flag(NNG_FLAG_NONBLOCK); - flag(NNG_FLAG_ALLOC); - - //Options - option(NNG_OPT_SOCKNAME); - option(NNG_OPT_SOCKNAME); - option(NNG_OPT_RAW); - option(NNG_OPT_PROTO); - option(NNG_OPT_PROTONAME); - option(NNG_OPT_PEER); - option(NNG_OPT_PEERNAME); - option(NNG_OPT_RECVBUF); - option(NNG_OPT_SENDBUF); - option(NNG_OPT_RECVFD); - option(NNG_OPT_SENDFD); - option(NNG_OPT_RECVTIMEO); - option(NNG_OPT_SENDTIMEO); - option(NNG_OPT_LOCADDR); - option(NNG_OPT_REMADDR); - option(NNG_OPT_URL); - option(NNG_OPT_MAXTTL); - option(NNG_OPT_RECVMAXSZ); - option(NNG_OPT_RECONNMINT); - option(NNG_OPT_RECONNMAXT); - - //TCP options - option(NNG_OPT_TCP_NODELAY); - option(NNG_OPT_TCP_KEEPALIVE); - option(NNG_OPT_TCP_BOUND_PORT); - - //IPC options - option(NNG_OPT_IPC_PEER_GID); - option(NNG_OPT_IPC_PEER_PID); - option(NNG_OPT_IPC_PEER_UID); - option(NNG_OPT_IPC_PEER_ZONEID); - option(NNG_OPT_IPC_PERMISSIONS); - option(NNG_OPT_IPC_SECURITY_DESCRIPTOR); - - //Pub/sub options - /*option(NNG_OPT_SUB_SUBSCRIBE);//should use socket:subscribe() instead (so that nil is not counted as part of the subscription)*/ - /*option(NNG_OPT_SUB_UNSUBSCRIBE);//should use socket:unsubscribe() instead (same reason as above)*/ - option(NNG_OPT_SUB_PREFNEW); - - //Req/rep options - option(NNG_OPT_SURVEYOR_SURVEYTIME); - - return 1; -} +#include +#include +#include + +#define NNG_STATIC_LIB + +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include "lua-nng-aio.h" + +#define OPEN(name)\ + int lnng_ ## name ## _open(lua_State *L){\ + nng_socket *s = (nng_socket*)lua_newuserdata(L,sizeof(nng_socket));\ + int err = nng_ ## name ## _open(s);\ + if(err == 0){\ + luaL_setmetatable(L,"nng.socket");\ + return 1;\ + }else{\ + lua_pushboolean(L,0);\ + lua_pushstring(L,nng_strerror(err));\ + return 2;\ + }\ + } + +/*** +Opens a socket using the bus scaleability protocol +Sockets are not connected to anything at initalization, and some options may error. +See the [options](/options) section for information on options. See [nng_bus](https://nng.nanomsg.org/man/tip/nng_bus.7.html) for more information on the protocol. +@function nng.open_bus0() +@returns nng.socket A socket using the bus protocol +*/ +OPEN(bus0); +OPEN(pair1); +OPEN(pub0); +OPEN(sub0); +OPEN(pull0); +OPEN(push0); +OPEN(req0); +OPEN(rep0); +OPEN(surveyor0); +OPEN(respondent0); + +//sleep(ms) +int lnng_msleep(lua_State *L){ + int ms = luaL_checkinteger(L,1); + nng_msleep(ms); + lua_pop(L,1); + return 0; +} + +nng_socket* tosocket(lua_State *L, int offset){ + luaL_checkudata(L,offset,"nng.socket"); + return (nng_socket*)lua_touserdata(L,offset); +} + +nng_listener* tolistener(lua_State *L, int offset){ + luaL_checkudata(L,offset,"nng.listener"); + return (nng_listener*)lua_touserdata(L,offset); +} + +nng_dialer* todialer(lua_State *L, int offset){ + luaL_checkudata(L,offset,"nng.dialer"); + return (nng_dialer*)lua_touserdata(L,offset); +} + +nng_sockaddr* tosockaddr(lua_State *L, int offset){ + luaL_checkudata(L,offset,"nng.sockaddr"); + return (nng_sockaddr*)lua_touserdata(L,offset); +} + + +//socket:listen(url[, flags]) :: listener +int lnng_listen(lua_State *L){ + int argc = lua_gettop(L); + int flags = 0; + nng_socket *sock = tosocket(L,1); + const char *url = luaL_checkstring(L,2); + if(argc >= 3){ + flags = luaL_checkinteger(L,3); + } + lua_pop(L,argc); + nng_listener *lp = (nng_listener*)lua_newuserdata(L,sizeof(nng_listener)); + int err = nng_listen(*sock, url, lp, flags); + if(err == 0){ + luaL_setmetatable(L,"nng.listener"); + return 1; + }else{ + lua_pushboolean(L,0); + lua_pushstring(L,nng_strerror(err)); + return 2; + } +} + +//socket:dial(url[, flags]) :: dialer +int lnng_dial(lua_State *L){ + int argc = lua_gettop(L); + int flags = 0; + nng_socket *sock = tosocket(L,1); + const char *url = luaL_checkstring(L,2); + if(argc >= 3){ + flags = luaL_checkinteger(L,3); + } + lua_pop(L,argc); + nng_dialer *dp = (nng_dialer*)lua_newuserdata(L,sizeof(nng_dialer)); + int err = nng_dial(*sock, url, dp, flags); + if(err == 0){ + luaL_setmetatable(L,"nng.dialer"); + return 1; + }else{ + lua_pushboolean(L,0); + lua_pushstring(L,nng_strerror(err)); + return 2; + } +} + +//socket:send("data"[, flags]) +int lnng_send(lua_State *L){ + int argc = lua_gettop(L); + int flags = 0; + nng_socket *sock = tosocket(L,1); + size_t datasize; + const char *data = luaL_checklstring(L,2,&datasize); + if(argc >= 3){ + flags = luaL_checkinteger(L,3); + } + lua_pop(L,argc); + int err = nng_send(*sock, (void*)data, datasize, flags); + if(err == 0){ + lua_pushboolean(L,1); + return 1; + }else{ + lua_pushboolean(L,0); + lua_pushstring(L,nng_strerror(err)); + return 2; + } +} + +//socket:recv([flags]) +int lnng_recv(lua_State *L){ + int argc = lua_gettop(L); + int flags = NNG_FLAG_ALLOC; //don't support zero copy + nng_socket *sock = tosocket(L,1); + if(argc >= 2){ + flags += luaL_checkinteger(L,2); + } + char *data = NULL; + size_t datasize; + int err = nng_recv(*sock, &data, &datasize, flags); + if(err == 0){ + lua_pushlstring(L,data,datasize); + nng_free(data,datasize); + return 1; + }else{ + lua_pushboolean(L,0); + lua_pushstring(L,nng_strerror(err)); + return 2; + } +} + +//socket:close() +int lnng_socket_close(lua_State *L){ + /*printf("Garbage collecting socket...");*/ + nng_socket *sock = tosocket(L,1); + int err = nng_close(*sock); + lua_pop(L,1); + return 0; +} + +//close(ud_dialer) +int lnng_dialer_close(lua_State *L){ + nng_dialer *dp = (nng_dialer*)lua_touserdata(L,1); + nng_dialer_close(*dp); + lua_pop(L,1); + return 0; +} + +//close(ud_listener) +int lnng_listener_close(lua_State *L){ + nng_listener *lp = (nng_listener*)lua_touserdata(L,1); + int err = nng_listener_close(*lp); + lua_pop(L,1); + return 0; +} + +//subscribe(socket,"topic") +int lnng_subscribe(lua_State *L){ + nng_socket *sock = tosocket(L,1); + size_t size; + const char *topic = luaL_checklstring(L,2,&size); + lua_pop(L,2); + int err = nng_socket_set(*sock,NNG_OPT_SUB_SUBSCRIBE,topic,size); + if(err == 0){ + lua_pushboolean(L,1); + return 1; + }else{ + lua_pushboolean(L,0); + lua_pushstring(L,nng_strerror(err)); + return 2; + } +} + +//unsubscribe(socket,"topic") +int lnng_unsubscribe(lua_State *L){ + nng_socket *sock = tosocket(L,1); + size_t size; + const char *topic = luaL_checklstring(L,2,&size); + lua_pop(L,2); + int err = nng_socket_set(*sock,NNG_OPT_SUB_UNSUBSCRIBE,topic,size); + if(err == 0){ + lua_pushboolean(L,1); + return 1; + }else{ + lua_pushboolean(L,0); + lua_pushstring(L,nng_strerror(err)); + return 2; + } +} + +//Option types +#define SOCKET_OPTION_SET(L, socket, flag, matches, ntype, gets, sets) \ + if(strcmp(flag, matches) == 0){\ + ntype value = (ntype)gets(L,3);\ + int err = sets(*socket, flag, value);\ + lua_pop(L,lua_gettop(L));\ + if(err != 0){\ + lua_pushboolean(L,0);\ + lua_pushfstring(L,nng_strerror(err));\ + return 2;\ + }else{\ + return 0;\ + }\ + } + +#define SOCKET_OPTION_GET(L, socket, flag, matches, ntype, gets, pushes) \ + if(strcmp(flag, matches) == 0){\ + ntype value;\ + int err = gets(*socket, flag, &value);\ + lua_pop(L,lua_gettop(L));\ + if(err != 0){\ + lua_pushboolean(L,0);\ + lua_pushfstring(L,nng_strerror(err));\ + return 2;\ + }else{\ + pushes(L,value);\ + return 1;\ + }\ + } + +//TODO +//set(listener,"flag",value) +int lnng_listener_set(lua_State *L){ + return 0; +} + +int lnng_sockaddr_get(lua_State *L){ + nng_sockaddr *sa = tosockaddr(L,1); + unsigned int f = sa->s_family; + const char *field = luaL_checkstring(L,2); + lua_pop(L,2); + printf("Getting %s from sockaddr\n",field); + if(strcmp(field,"type") == 0){ + lua_pushnumber(L,f); + return 1; + } + if(f == NNG_AF_UNSPEC){ + printf("Unspec\n"); + lua_pushnil(L); + return 1; + }else if(f == NNG_AF_INPROC){ + printf("Inproc\n"); + nng_sockaddr_inproc sai = sa->s_inproc; + if(strcmp(field,"name") == 0){ + lua_pushstring(L, sai.sa_name); + return 1; + } + }else if(f == NNG_AF_IPC){ + printf("IPC\n"); + nng_sockaddr_ipc sai = sa->s_ipc; + if(strcmp(field,"path") == 0){ + lua_pushstring(L, sai.sa_path); + return 1; + } + }else if(f == NNG_AF_INET){ + printf("Inet\n"); + nng_sockaddr_in sai = sa->s_in; + if(strcmp(field,"addr") == 0){ + lua_pushnumber(L, sai.sa_addr); + return 1; + }else if(strcmp(field,"port") == 0){ + lua_pushnumber(L, sai.sa_port); + return 1; + } + }else if(f == NNG_AF_INET6){ + nng_sockaddr_in6 sai = sa->s_in6; + if(strcmp(field,"addr") == 0){ + lua_pushlstring(L,sai.sa_addr,16); + return 1; + }else if(strcmp(field,"port") == 0){ + lua_pushnumber(L,sai.sa_port); + return 1; + } + }else if(f == NNG_AF_ZT){ + nng_sockaddr_zt sai = sa->s_zt; + if(strcmp(field,"nwid") == 0){ + lua_pushnumber(L,sai.sa_nwid); + return 1; + }else if(strcmp(field,"nodeid") == 0){ + lua_pushnumber(L,sai.sa_nodeid); + return 1; + }else if(strcmp(field,"port") == 0){ + lua_pushnumber(L,sai.sa_port); + return 1; + } + } +} + +//set(socket, "flag", value) +int lnng_socket_set(lua_State *L){ + nng_socket *sock = tosocket(L,1); + const char *flag = luaL_checkstring(L,2); + //NNG_OPT_LOCADDR - read-only + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECONNMINT, nng_duration, luaL_checkinteger, nng_socket_set_ms); + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECONNMAXT, nng_duration, luaL_checkinteger, nng_socket_set_ms); + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECVBUF, int, luaL_checkinteger, nng_socket_set_int); + //NNG_OPT_RECVFD - read-only + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECVMAXSZ, size_t, luaL_checkinteger, nng_socket_set_uint64); + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECVTIMEO, nng_duration, luaL_checkinteger, nng_socket_set_ms); + //NNG_OPT_REMADDR - read-only + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SENDBUF, int, luaL_checkinteger, nng_socket_set_int); + //NNG_OPT_SENDFD - read-only + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SENDTIMEO, nng_duration, luaL_checkinteger, nng_socket_set_ms); + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SOCKNAME, const char*, luaL_checkstring, nng_socket_set_string); + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_MAXTTL, int, luaL_checkinteger, nng_socket_set_int); + //NNG_OPT_UR - read-only + //NNG_OPT_PROTO - read-only + //NNG_OPT_PEER - read-only + //NNG_OPT_PROTONAME - read-only + //NNG_OPT_PEERNAME - read-only + + //TCP options + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TCP_NODELAY, bool, lua_toboolean, nng_socket_set_bool); + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TCP_KEEPALIVE, bool, lua_toboolean, nng_socket_set_bool); + //NNG_OPT_TCP_BOUND_PORT - read-only? documentation doesn't say it, but it would be wierd if we could write to it. + + //TLS options + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS_AUTH_MODE, int, luaL_checkinteger, nng_socket_set_int); //write-only + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS_CA_FILE, const char*, luaL_checkstring, nng_socket_set_string); //write-only + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS_CERT_KEY_FILE, const char*, luaL_checkstring, nng_socket_set_string); //write-only + //SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS TODO: NNG_OPT_TLS_CONFIG + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS_SERVER_NAME, const char*, luaL_checkstring, nng_socket_set_string); + //NNG_OPT_TLS_VERIFIED - read-only + + //IPC options + //NNG_OPT_IPC_PEER_GID - read-only + //NNG_OPT_IPC_PEER_PID - read-only + //NNG_OPT_IPC_PEER_UID - read-only + //NNG_OPT_IPC_PEER_ZONEID - read-only + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_IPC_PERMISSIONS, int, luaL_checkinteger, nng_socket_set_int); + //TODO: NNG_OPT_IPC_SECURITY_DESCRIPTOR - windows-only, sets a pointer to a PSECURITY_DESCRIPTOR + + //PUB/SUB options + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SUB_PREFNEW, bool, lua_toboolean, nng_socket_set_bool); + + //REQ/REP options + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_REQ_RESENDTIME, nng_duration, luaL_checkinteger, nng_socket_set_ms); + + //Survayor/respondent options + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SURVEYOR_SURVEYTIME, nng_duration, luaL_checkinteger, nng_socket_set_ms); +} + +//get(socket,"flag",value) +int lnng_socket_get(lua_State *L){ + /*printf("Lua stack is %d\n",lua_gettop(L));*/ + nng_socket *sock = tosocket(L,1); + const char *flag = luaL_checkstring(L,2); + //NNG_OPT_LOCADDR - listeners, dialers, and connected pipes + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RAW, bool, nng_socket_get_bool, lua_pushboolean); //read-only + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECONNMINT, nng_duration, nng_socket_get_ms, lua_pushinteger);//sockets and dialers, if both dialer and socket are set, dialer overrides socket + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECONNMAXT, nng_duration, nng_socket_get_ms, lua_pushinteger); //sockets and dialers, if both dialer and socket are set, dialer overrides socket + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECVBUF, int, nng_socket_get_int, lua_pushinteger);//socket only + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECVFD, int, nng_socket_get_int, lua_pushinteger); //socket only, read-only + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECVMAXSZ, size_t, nng_socket_get_size, lua_pushinteger);//sockets, dialers and listeners + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECVTIMEO, nng_duration, nng_socket_get_ms, lua_pushinteger);//socket only + if(strcmp(flag, NNG_OPT_REMADDR) == 0){ + lua_pop(L,lua_gettop(L)); + nng_sockaddr *sa = (nng_sockaddr*)lua_newuserdata(L,sizeof(nng_sockaddr));//{udata} + int err = nng_socket_get_addr(*sock, NNG_OPT_REMADDR, sa);//{udata} + if(err == 0){ + luaL_setmetatable(L,"nng.sockaddr"); + return 1; + }else{ + return luaL_error(L,"Failed to lookup local address - %s",nng_strerror(err)); + } + } + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SENDBUF, int, nng_socket_get_int, lua_pushinteger); + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SENDFD, int, nng_socket_get_int, lua_pushinteger); //read-only + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SENDTIMEO, nng_duration, nng_socket_get_ms, lua_pushinteger); + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SOCKNAME, char*, nng_socket_get_string, lua_pushstring); + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_MAXTTL, int, nng_socket_get_int, lua_pushinteger); + //SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_URL, char*, nng_socket_get_string, lua_pushstring); //read-only for dialers, listeners, and pipes + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_PROTO, int, nng_socket_get_int, lua_pushinteger); //read-only + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_PEER, int, nng_socket_get_int, lua_pushinteger); //read-only + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_PROTONAME, char*, nng_socket_get_string, lua_pushstring); //read-only + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_PEERNAME, char*, nng_socket_get_string, lua_pushstring); //read-only + + //TCP options + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_TCP_NODELAY, bool, nng_socket_get_bool, lua_pushboolean); + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_TCP_KEEPALIVE, bool, nng_socket_get_bool, lua_pushboolean); + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_TCP_BOUND_PORT, int, nng_socket_get_int, lua_pushinteger); + + //TLS options + //NNG_OPT_TLS_MODE - write-only option + //NNG_OPT_TLS_CA_FILE - write-only option + //NNG_OPT_TLS_CERT_KEY_FILE - write-only option + //TODO: NNG_OPT_TLS_CONFIG + //NNG_OPT_TLS_SERVER_NAME - write-only option + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_TLS_VERIFIED, bool, nng_socket_get_bool, lua_pushboolean); //read-only option + + //IPC options + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_IPC_PEER_GID, uint64_t, nng_socket_get_uint64, lua_pushinteger); //read-only option + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_IPC_PEER_PID, uint64_t, nng_socket_get_uint64, lua_pushinteger); //read-only option + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_IPC_PEER_UID, uint64_t, nng_socket_get_uint64, lua_pushinteger); //read-only option + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_IPC_PEER_ZONEID, uint64_t, nng_socket_get_uint64, lua_pushinteger); //read-only option on Solaris and illumos systems only + //NNG_OPT_IPC_PERMISSIONS - write-only option + //NNG_OPT_IPC_SECURITY_DESCRIPTOR - write-only option + + //PUB/SUB options + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SUB_PREFNEW, bool, nng_socket_get_bool, lua_pushboolean); + + //REQ/REP options + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_REQ_RESENDTIME, nng_duration, nng_socket_get_ms, lua_pushinteger); + + //Survayor/respondent options + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SURVEYOR_SURVEYTIME, nng_duration, nng_socket_get_ms, lua_pushinteger); + lua_pop(L,2); + + //If none of the above options matched, get the value from the metatable + int type = luaL_getmetatable(L,"nng.socket_m");////{socket_m} + /*luaL_newlib(L,nng_socket_m);*/ + lua_getfield(L,-1,"__index"); + lua_getfield(L,-1,flag);//{socket_m},{socket},any + int ref = luaL_ref(L,LUA_REGISTRYINDEX);//{socket_m},{socket} + lua_pop(L,2);// + lua_rawgeti(L,LUA_REGISTRYINDEX,ref);//any + return 1; +} + +/*#define DIALER_OPTION_SET(L, socket, flag, matches, ntype, gets, sets) \*/ + /*if(strcmp(flag, matches) == 0){\*/ + /*ntype value = (ntype)gets(L,3);\*/ + /*int err = sets(*socket, flag, value);\*/ + /*lua_pop(L,lua_gettop(L));\*/ + /*if(err != 0){\*/ + /*lua_pushboolean(L,0);\*/ + /*lua_pushfstring(L,nng_strerror(err));\*/ + /*return 2;\*/ + /*}else{\*/ + /*return 0;\*/ + /*}\*/ + /*}*/ + +/*#define DIALER_OPTION_GET(L, dialer, flag, matches, ntype, gets, pushes) \*/ + /*if(strcmp(flag, matches) == 0){\*/ + /*ntype value;\*/ + /*int err = gets(*socket, flag, &value);\*/ + /*lua_pop(L,lua_gettop(L));\*/ + /*if(err != 0){\*/ + /*lua_pushboolean(L,0);\*/ + /*lua_pushfstring(L,nng_strerror(err));\*/ + /*return 2;\*/ + /*}else{\*/ + /*pushes(L,value);\*/ + /*return 1;\*/ + /*}\*/ + /*}*/ + + +int lnng_dialer_get(lua_State *L){ + nng_dialer *dialer = todialer(L,1); + const char *flag = luaL_checkstring(L,2); + if(strcmp(flag, NNG_OPT_LOCADDR) == 0){ + printf("Looking for dialer's locaddr\n"); + lua_pop(L,lua_gettop(L)); + nng_sockaddr *sa = (nng_sockaddr*)lua_newuserdata(L,sizeof(nng_sockaddr));//{udata} + int err = nng_dialer_get_addr(*dialer , NNG_OPT_LOCADDR, sa);//{udata} + if(err == 0){ + luaL_setmetatable(L,"nng.sockaddr"); + return 1; + }else{ + return luaL_error(L,"Failed to lookup local address - %s",nng_strerror(err)); + } + } + SOCKET_OPTION_GET(L, dialer, flag, NNG_OPT_RECONNMINT, nng_duration, nng_dialer_get_ms, lua_pushinteger);//sockets and dialers, if both dialer and socket are set, dialer overrides socket + SOCKET_OPTION_GET(L, dialer, flag, NNG_OPT_RECONNMAXT, nng_duration, nng_dialer_get_ms, lua_pushinteger); //sockets and dialers, if both dialer and socket are set, dialer overrides socket + SOCKET_OPTION_GET(L, dialer, flag, NNG_OPT_RECVMAXSZ, size_t, nng_dialer_get_size, lua_pushinteger);//sockets, dialers and listeners + if(strcmp(flag, NNG_OPT_REMADDR) == 0){ + lua_pop(L,lua_gettop(L)); + nng_sockaddr *sa = (nng_sockaddr*)lua_newuserdata(L,sizeof(nng_sockaddr));//{udata} + int err = nng_dialer_get_addr(*dialer, NNG_OPT_REMADDR, sa);//{udata} + if(err == 0){ + luaL_setmetatable(L,"nng.sockaddr"); + return 1; + }else{ + return luaL_error(L,"Failed to lookup remote address - %s",nng_strerror(err)); + } + } + SOCKET_OPTION_GET(L, dialer, flag, NNG_OPT_URL, char*, nng_dialer_get_string, lua_pushstring); //read-only for dialers, listeners, and pipes + + //If none of the above options matched, get the value from the metatable + int type = luaL_getmetatable(L,"nng.dialer_m");////{socket_m} + /*luaL_newlib(L,nng_socket_m);*/ + lua_getfield(L,-1,"__index"); + lua_getfield(L,-1,flag);//{socket_m},{socket},any + int ref = luaL_ref(L,LUA_REGISTRYINDEX);//{socket_m},{socket} + lua_pop(L,2);// + lua_rawgeti(L,LUA_REGISTRYINDEX,ref);//any + return 1; +} + +int lnng_dialer_set(lua_State *L){ + +} + +static const struct luaL_Reg nng_dialer_m[] = { + {"close",lnng_dialer_close}, + {NULL, NULL} +}; + +static const struct luaL_Reg nng_listener_m[] = { + {"close",lnng_listener_close}, + {NULL, NULL} +}; + +static const struct luaL_Reg nng_socket_m[] = { + {"dial", lnng_dial}, + {"listen", lnng_listen}, + {"send", lnng_send}, + {"recv", lnng_recv}, + {"close", lnng_socket_close}, + + //pub/sub only + {"subscribe",lnng_subscribe}, + {"unsubscribe",lnng_unsubscribe}, + {NULL, NULL} +}; + +static const struct luaL_Reg nng_f[] = { + {"bus0_open", lnng_bus0_open}, + {"pair1_open", lnng_pair1_open}, + {"pub0_open", lnng_pub0_open}, + {"sub0_open", lnng_sub0_open}, + {"pull0_open", lnng_pull0_open}, + {"push0_open", lnng_push0_open}, + {"req0_open", lnng_req0_open}, + {"rep0_open", lnng_rep0_open}, + {"surveyor0_open",lnng_surveyor0_open}, + {"respondent0_open",lnng_respondent0_open}, + {"sleep",lnng_msleep}, + {NULL, NULL} +}; + + +#define flag(name) lua_pushnumber(L,name); lua_setfield(L,-2,#name); +#define option(name) lua_pushstring(L,name); lua_setfield(L,-2,#name); +int luaopen_nng(lua_State *L){ + luaL_newmetatable(L,"nng.socket_m");//{} + luaL_newlib(L,nng_socket_m);//{},{socket_m} + lua_setfield(L,-2,"__index");//{__index={socket_m}} + lua_pop(L,1);// + + luaL_newmetatable(L,"nng.socket");//{} + lua_pushcfunction(L,lnng_socket_get);//{},socket_get() + lua_setfield(L,-2,"__index");//{__index = socket_get()} + lua_pushcfunction(L,lnng_socket_close);//{__index = {socket_m}},close() + lua_setfield(L,-2,"__gc");//{__index = {socket_m}, __gc = close()} + lua_pushcfunction(L,lnng_socket_set);//{__index = {socket_m}, __gc = close()}, set() + lua_setfield(L,-2,"__newindex");//{__index = {socket_m}, __gc = close(), __newindex = set()} + lua_pop(L,1); + + luaL_newmetatable(L,"nng.dialer");//{} + lua_pushcfunction(L,lnng_dialer_get);//{},dialer_get() + lua_setfield(L,-2,"__index"); + lua_pushcfunction(L,lnng_dialer_set); + lua_setfield(L,-2,"__newindex"); + + luaL_newmetatable(L,"nng.dialer_m"); + luaL_newlib(L,nng_dialer_m); + lua_setfield(L,-2,"__index"); + /*lua_pushcfunction(L,lnng_dialer_close);*/ + /*lua_setfield(L,-2,"__gc");*/ + lua_pop(L,1); + + luaL_newmetatable(L,"nng.listener"); + luaL_newlib(L,nng_listener_m); + lua_setfield(L,-2,"__index"); + /*lua_pushcfunction(L,lnng_listener_close);*/ + /*lua_setfield(L,-2,"__gc");*/ + lua_pop(L,1); + + luaL_newmetatable(L,"nng.sockaddr");//{nng.sockaddr} + lua_pushcfunction(L,lnng_sockaddr_get);//{nng.sockaddr},sockaddr_get + lua_setfield(L,-2,"__index");//{nng_sockaddr} + lua_pop(L,1); + + luaL_newlib(L,nng_f); + luaopen_nng_aio(L); + lua_setfield(L,-2,"aio"); + + //Flags + flag(NNG_FLAG_NONBLOCK); + flag(NNG_FLAG_ALLOC); + + //Options + option(NNG_OPT_SOCKNAME); + option(NNG_OPT_SOCKNAME); + option(NNG_OPT_RAW); + option(NNG_OPT_PROTO); + option(NNG_OPT_PROTONAME); + option(NNG_OPT_PEER); + option(NNG_OPT_PEERNAME); + option(NNG_OPT_RECVBUF); + option(NNG_OPT_SENDBUF); + option(NNG_OPT_RECVFD); + option(NNG_OPT_SENDFD); + option(NNG_OPT_RECVTIMEO); + option(NNG_OPT_SENDTIMEO); + option(NNG_OPT_LOCADDR); + option(NNG_OPT_REMADDR); + option(NNG_OPT_URL); + option(NNG_OPT_MAXTTL); + option(NNG_OPT_RECVMAXSZ); + option(NNG_OPT_RECONNMINT); + option(NNG_OPT_RECONNMAXT); + + //TCP options + option(NNG_OPT_TCP_NODELAY); + option(NNG_OPT_TCP_KEEPALIVE); + option(NNG_OPT_TCP_BOUND_PORT); + + //IPC options + option(NNG_OPT_IPC_PEER_GID); + option(NNG_OPT_IPC_PEER_PID); + option(NNG_OPT_IPC_PEER_UID); + option(NNG_OPT_IPC_PEER_ZONEID); + option(NNG_OPT_IPC_PERMISSIONS); + option(NNG_OPT_IPC_SECURITY_DESCRIPTOR); + + //Pub/sub options + /*option(NNG_OPT_SUB_SUBSCRIBE);//should use socket:subscribe() instead (so that nil is not counted as part of the subscription)*/ + /*option(NNG_OPT_SUB_UNSUBSCRIBE);//should use socket:unsubscribe() instead (same reason as above)*/ + option(NNG_OPT_SUB_PREFNEW); + + //Req/rep options + option(NNG_OPT_SURVEYOR_SURVEYTIME); + + return 1; +} -- cgit v1.2.3-70-g09d2