diff options
| -rw-r--r-- | spec/aio_spec.lua | 136 | ||||
| -rw-r--r-- | spec/race_spec.lua | 5 | ||||
| -rw-r--r-- | spec/startup_spec.lua | 74 | ||||
| -rw-r--r-- | src/lua-nng-aio.c | 286 | ||||
| -rw-r--r-- | src/lua-nng-aio.h | 1 | ||||
| -rw-r--r-- | src/lua-nng-common.c | 22 | ||||
| -rw-r--r-- | src/lua-nng-common.h | 6 | ||||
| -rw-r--r-- | src/lua-nng-http.c | 52 | ||||
| -rw-r--r-- | src/lua-nng.c | 252 | ||||
| -rw-r--r-- | src/lua-nng.h | 26 |
10 files changed, 836 insertions, 24 deletions
diff --git a/spec/aio_spec.lua b/spec/aio_spec.lua new file mode 100644 index 0000000..8c7c332 --- /dev/null +++ b/spec/aio_spec.lua @@ -0,0 +1,136 @@ +--[[
+test the asyncronous parts of nnng
+]]
+local nng = require("nng")
+local lanes = require("lanes").configure()
+describe("nng.aio",function()
+ it("should be able to create aio object",function()
+ print("one")
+ local aio = assert(nng.aio.alloc(function() end))
+ end)
+ it("should accept a callback with arguments",function()
+ print("two")
+ local aio = assert(nng.aio.alloc(function(a,b,c)
+ print("hello")
+ end, "one", 2, "three"))
+ end)
+ --it("should call the callback after sleeping", function()
+ --print("three")
+ --local called = false
+ --local callback = function(lock)
+ --called = true
+ --end
+ --print("about to alloc")
+ --local aio = assert(nng.aio.alloc(callback))
+ --print("done alloc")
+ --print("about to sleep")
+ --nng.aio.sleep(1,aio)
+ --print("whatever:",whatever)
+ ----nng.aio.sleep(1,aio)
+ --print("done sleep")
+ --os.execute("sleep 1")
+ --lock(-1)
+ --print("checking called...")
+ --print("called was",called)
+ --assert(called)
+ --end)
+ it("should call more than one socket getting a callback at once",function()
+ print("checking recv_any callback")
+ local s1 = assert(nng.bus0_open())
+ assert(s1:listen("tcp://127.0.0.1:4000"))
+
+ local s2 = assert(nng.bus0_open())
+ assert(s2:listen("tcp://127.0.0.1:4001"))
+
+ local s3 = assert(nng.bus0_open())
+ assert(s3:dial("tcp://127.0.0.1:4000"))
+ assert(s3:dial("tcp://127.0.0.1:4001"))
+
+ for i = 1, 100 do --100 times to try to trigger race conditions
+ --print("i:",i)
+ assert(s3:send("one"))
+ assert(s3:send("two"))
+ local s1_got_one, s1_got_two, s2_got_one, s2_got_two = false, false, false, false
+ while not (s1_got_one and s1_got_two and s2_got_one and s2_got_two) do
+ --local socket, message = nng.aio.recv_any(s1, s2)
+ --print("about to start recv_any")
+ --local tbl = nng.aio.recv_any(s1,s2)
+ for socket, message in pairs(nng.aio.recv_any(s1,s2)) do
+ --print("in one recv any:",socket, message)
+ if socket == s1 then
+ if message == "one" then
+ s1_got_one = true
+ elseif message == "two" then
+ s1_got_two = true
+ else
+ error("message socket 1:" .. message)
+ end
+ elseif socket == s2 then
+ if message == "one" then
+ s2_got_one = true
+ elseif message == "two" then
+ s2_got_two = true
+ else
+ error("message socket 2:" .. message)
+ end
+ else
+ error("socket:" .. tostring(socket))
+ end
+ --print("done with recv_any",s1_got_one, s1_got_two, s2_got_one, s2_got_two)
+ end
+ end
+ assert(s1_got_one)
+ assert(s1_got_two)
+ assert(s2_got_one)
+ assert(s2_got_two)
+ end
+ print("Sucessful completion of recv_any test")
+ end)
+ it("Should accept multiple sockets of dispirate types #writing",function()
+ local servers = {}
+ local clients = {}
+ local messages = {}
+ local halfs = {
+ {"rep","req",true},
+ {"bus","bus",nil},
+ {"surveyor","respondent",false},
+ {"pub","sub",false}
+ }
+ for i = 1, 10 do
+ local rng = math.random(#halfs)
+
+ --Create server
+ servers[i] = assert(nng[halfs[rng][1] .. "0_open"]())
+ local url = string.format("tcp://127.0.0.1:%d",4000 + i)
+ assert(servers[i]:listen(url))
+
+ --Create clients
+ local numclients = math.random(1,3)
+ clients[i] = {}
+ messages[i] = {}
+ for j = 1, numclients do
+ clients[i][j] = assert(nng[halfs[rng][2] .. "0_open"]())
+ assert(clients[i][j]:dial(url))
+ messages[i][j] = {}
+ if halfs[rng][3] == true then
+ --we send messages
+ local nummessages = math.random(5)
+ for k = 1, nummessages do
+ local message = string.format("ping_%d_%d_%d",i,j,k)
+ assert(clients[i][j]:send(message))
+ table.insert(messages[i][j],message)
+ end
+ --elseif halfs[rng][3] == nil then
+ --we can send or receive messages
+
+ else
+ --we receive and reply to messages
+
+ end
+ end
+ end
+ for socket, message in pairs(nng.aio.recv_any(table.unpack(servers))) do
+ print("GOT MESSAGE:",message)
+ end
+ end)
+end)
diff --git a/spec/race_spec.lua b/spec/race_spec.lua new file mode 100644 index 0000000..732ae3e --- /dev/null +++ b/spec/race_spec.lua @@ -0,0 +1,5 @@ +--[[
+Test for race conditions
+]]
+describe("nng",function()
+end)
diff --git a/spec/startup_spec.lua b/spec/startup_spec.lua index dacf9e9..1b3f740 100644 --- a/spec/startup_spec.lua +++ b/spec/startup_spec.lua @@ -1,5 +1,5 @@ --[[
-test startup of nanomsg api
+test startup of the nng api
]]
describe("nng",function()
@@ -45,27 +45,40 @@ describe("nng",function() end
end)
it("should be able to use a survey socket to gather information",function()
+ print("Starting survayor respondent test")
math.randomseed(os.time())
local s = assert(nng.surveyor0_open())
+ print("About to listen")
assert(s:listen("ipc:///tmp/survey.ipc"))
+ print("S is listening")
local b = {}
for i = 1,100 do
+ print("Testing",i)
local r = assert(nng.respondent0_open())
assert(r:dial("ipc:///tmp/survey.ipc"))
+ print("Dialed",i)
b[i] = r
end
+ print("About to send hello")
assert(s:send("Hello"))
+ print("Done sending hello")
for i = 1,100 do
+ print("About to recv from", i)
local survey = assert(b[i]:recv())
+ print("Done receving from ",i)
assert(survey == "Hello")
+ print("About to send number back")
assert(b[i]:send(string.format("%f",math.random())))
+ print("Done sending number back")
end
local responses = {}
while true do
+ print("Got ", #responses, "responses")
local succ, msg = s:recv(nng.NNG_FLAG_NONBLOCK)
if succ then
table.insert(responses,tonumber(succ))
elseif msg == "Try again" then
+ print("Sleeping...")
os.execute("sleep 1")
elseif msg == "Incorrect state" then
break
@@ -79,18 +92,55 @@ describe("nng",function() --avg should be about 0.5
assert(avg > 0.4)
assert(avg < 0.6)
+ print("Completed survayor respondent test")
end)
it("should be able to use publish and subscribe sockets to transfer information", function()
- local s1 = assert(nng.pub0_open())
- local s2 = assert(nng.sub0_open())
- local s3 = assert(nng.sub0_open())
- assert(s1:listen("ipc:///tmp/pub.ipc"))
- assert(s2:subscribe(""))
- assert(s3:subscribe(""))
- assert(s2:dial("ipc:///tmp/pub.ipc"))
- assert(s3:dial("ipc:///tmp/pub.ipc"))
- assert(s1:send("hello 1"))
- assert(s2:recv() == "hello 1")
- assert(s3:recv() == "hello 1")
+ print("starting pubsub test")
+ for i = 1,1000 do
+ local s1 = assert(nng.pub0_open())
+ local s2 = assert(nng.sub0_open())
+ local s3 = assert(nng.sub0_open())
+ print("everything opened")
+ --local listener, err = s1:listen("tcp://127.0.0.1:1000")
+ local listener, err = s1:listen("ipc:///tmp/pub.ipc")
+ local num_addr_in_use = 0
+ while err == "Address in use" do
+ print("Got addr in use")
+ num_addr_in_use = num_addr_in_use + 1
+ if num_addr_in_use > 1000 then
+ error("After multiple attempts, failed to bind on round " .. i)
+ end
+ --listener, err = s1:listen("tcp://127.0.0.1:1000")
+ listener, err = s1:listen("ipc:///tmp/pub.ipc")
+ end
+ print("s1 is listeneing...")
+ assert(s2:subscribe("hello"))
+ assert(s3:subscribe("hello"))
+ --assert(s2:dial("tcp://127.0.0.1:1000"))
+ --assert(s3:dial("tcp://127.0.0.1:1000"))
+ assert(s2:dial("ipc:///tmp/pub.ipc"))
+ assert(s3:dial("ipc:///tmp/pub.ipc"))
+ print("Everything set up")
+ assert(s1:send("hello 1"))
+ print("sent")
+ assert.are_equal(s2:recv(),"hello 1")
+ print("received 1")
+ assert.are_equal(s3:recv(),"hello 1")
+ print("received 2")
+ listener:close()
+ s1:close()
+ end
+ print("Finishing pubsub test")
+ end)
+ describe("tcp transport",function()
+ it("has a keepalive option that prevents the tcp connection from closing",function()
+ print("starting tcp transport test")
+ local s1 = assert(nng.pair1_open())
+ local s2 = assert(nng.pair1_open())
+ --s1[nng.NNG_OPT_TCP_KEEPALIVE] = true
+ assert(s1:listen("tcp://127.0.0.1:1000"))
+ assert(s2:dial("tcp://127.0.0.1:1000"))
+ print("Finished tcp transport test")
+ end)
end)
end)
diff --git a/src/lua-nng-aio.c b/src/lua-nng-aio.c new file mode 100644 index 0000000..fa3db33 --- /dev/null +++ b/src/lua-nng-aio.c @@ -0,0 +1,286 @@ +#include <stdlib.h>
+#include <lua.h>
+#include <lauxlib.h>
+#include <lualib.h>
+
+#define NNG_STATIC_LIB
+
+#include <nng/nng.h>
+
+#include <nng/transport/inproc/inproc.h>
+#include <nng/transport/ipc/ipc.h>
+#include <nng/transport/tcp/tcp.h>
+#include <nng/transport/tls/tls.h>
+#include <nng/transport/zerotier/zerotier.h>
+
+#include <nng/protocol/pair1/pair.h>
+#include <nng/protocol/bus0/bus.h>
+#include <nng/protocol/pubsub0/pub.h>
+#include <nng/protocol/pubsub0/sub.h>
+#include <nng/protocol/pipeline0/pull.h>
+#include <nng/protocol/pipeline0/push.h>
+#include <nng/protocol/reqrep0/req.h>
+#include <nng/protocol/reqrep0/rep.h>
+#include <nng/protocol/survey0/respond.h>
+#include <nng/protocol/survey0/survey.h>
+
+#include <nng/supplemental/util/platform.h>
+
+#include "lua-nng-common.h"
+#include "lua-nng.h"
+
+struct lnng_aio{
+ lua_State *L;
+ int func;
+ int args;
+ nng_aio *aio;
+};
+
+void lcallback(void *lfaa){
+ printf("aio callback running\n");
+ struct lnng_aio *l = (struct lnng_aio*)lfaa;
+ lua_State *L = l->L;
+ printf("At the beginning we have: %d\n",lua_gettop(L));
+ for(int i = 1; i <= lua_gettop(L); i++){
+ printf("-%d - %s\n",i,lua_typename(L,lua_type(L,-i)));
+ lua_getglobal(L,"print");
+ lua_pushvalue(L,i);
+ lua_call(L,1,0);
+ }
+ lua_getglobal(L,"table");//{table}
+ lua_pushcfunction(L,traceback);//{table},traceback()
+ lua_rawgeti(L,LUA_REGISTRYINDEX, l->func);//{table},traceback(),callb()
+ printf("foo\n");
+ lua_getfield(L,-3,"unpack");//{table},traceback(),callb(),table.unpack()
+ printf("unpacked\n");
+ printf("unpack, -1 - %s\n",lua_typename(L,lua_type(L,-1)));
+ printf("Args was: %d\n",l->args);
+ lua_rawgeti(L,LUA_REGISTRYINDEX, l->args);//{table},traceback(),callb(),table.unpack(),{args}
+ printf("got args\n");
+ printf("-2 - %s\n",lua_typename(L,lua_type(L,-2)));
+ printf("-1 - %s\n",lua_typename(L,lua_type(L,-1)));
+ lua_call(L, 1, LUA_MULTRET);//{table},traceback(), callb(), args...
+ printf("bar\n");
+ printf("Top: %d\n",lua_gettop(L));
+ for(int i = 1; i <= lua_gettop(L); i++){
+ printf("-%d - %s\n",i,lua_typename(L,lua_type(L,-i)));
+ }
+ int numargs = lua_gettop(L) - 3;
+ printf("Nargs:%d\n",numargs);
+ lua_pcall(L, numargs, 0, 2);//{table},traceback()
+ printf("Finished with everything, poping last 2 and returning\n");
+ lua_pop(L,2);//
+}
+
+//nng.aio_alloc(callback(), args...) :: nng.aio
+int lnng_aio_alloc(lua_State *L){
+ int argc = lua_gettop(L);//callback(), args...
+ lua_createtable(L,argc - 1, 0);//callback(), args..., {}
+ for(int i = 1; i < argc; i++){
+ printf("Adding element %d to arg table (%s)\n",i,lua_typename(L,lua_type(L,i + 1)));
+ lua_pushnumber(L, i);
+ lua_getglobal(L,"print");
+ lua_pushvalue(L, i + 1);
+ lua_call(L,1,0);
+ lua_pushvalue(L, i + 1);
+ lua_settable(L, -3);
+ }
+ int argtbl = luaL_ref(L, LUA_REGISTRYINDEX);//callback(), args...
+ lua_pop(L, argc - 1);//callback()
+ int func = luaL_ref(L, LUA_REGISTRYINDEX);//
+
+ struct lnng_aio *lfaa = (struct lnng_aio*)lua_newuserdata(L,sizeof(struct lnng_aio));//userdata
+ lfaa->L = L;
+ lfaa->args = argtbl;
+ lfaa->func = func;
+ nng_aio *aio;
+ int err = nng_aio_alloc(&aio, lcallback, (void*)lfaa);
+ lfaa->aio = aio;
+ if(err == 0){
+ printf("After aio_alloc we have:%d",lua_gettop(L));
+ luaL_setmetatable(L,"nng.aio.struct");
+ return 1;
+ }else{
+ lua_pop(L,1);
+ lua_pushboolean(L,0);
+ lua_pushstring(L,nng_strerror(err));
+ return 2;
+ }
+}
+struct nng_aio* toaio(lua_State *L, int index){
+ struct lnng_aio* l = (struct lnng_aio*)luaL_checkudata(L,index,"nng.aio.struct");
+ return l->aio;
+}
+
+//sleep(duration, aio)
+int lnng_aio_sleep(lua_State *L){
+ printf("sleep called\n");
+ int duration = luaL_checkinteger(L,1);
+ printf("Duration was %d\n");
+ nng_aio *aio = toaio(L,2);
+ printf("Got aio: %p\n",aio);
+ nng_sleep_aio(duration, aio);
+ printf("did sleep\n");
+ return 0;
+}
+
+struct callback_info {
+ lua_State *L;
+ nng_mtx *lmutex; //mutex for the lua state
+ nng_mtx *cmutex;
+ nng_cv *cv;
+ int socketref; //lua refrence to this socket
+ nng_aio *aio;
+ int completed;
+};
+
+void push_callback(void *v){
+ printf("aio callback received!\n");
+ struct callback_info *ci = (struct callback_info*)v;
+ lua_State *L = ci->L;
+ printf("About to lock lua state in callback\n");
+ nng_mtx_lock(ci->lmutex);//lock the lua state
+ printf("Done locking lua state in callback\n");
+ int err = nng_aio_result(ci->aio);
+ if(err != 0){
+ printf("This callback was canceled or timed out: %d: %s\n", err, nng_strerror(err));
+ nng_mtx_unlock(ci->lmutex);
+ return;
+ }
+ lua_rawgeti(L,LUA_REGISTRYINDEX,ci->socketref);//push socket
+ luaL_unref(L,LUA_REGISTRYINDEX,ci->socketref);//free the reference
+ nng_msg *msg = nng_aio_get_msg(ci->aio);
+ size_t len = nng_msg_len(msg);
+ void *body = nng_msg_body(msg);
+ lua_pushlstring(L,(const char*)body, len);//push the message
+ nng_msg_free(msg);
+ ci->completed = 1;
+ lua_settable(L,1);
+ printf("About to unlock lua state in callback\n");
+ nng_mtx_unlock(ci->lmutex);
+ printf("About to lock condition mutex in callback\n");
+ nng_mtx_lock(ci->cmutex);
+ printf("About wake condition\n");
+ nng_cv_wake(ci->cv);
+ printf("Done wake condition\n");
+ nng_mtx_unlock(ci->cmutex);
+ printf("Done with callback\n");
+}
+
+//TODO: there is a wierd bug here:
+//If multiple sockets receive a message at the same time, one or more of those
+//messages can be thrown out becuase of the canceling of the async recieves.
+//recv_any(socket1, socket2, ...) :: socket | false, message | errmsg,
+int lnng_aio_recv(lua_State *L){
+ nng_mtx *luamtx, *callbackmtx, *setupmtx;
+ int err = nng_mtx_alloc(&luamtx);
+ err |= nng_mtx_alloc(&callbackmtx);
+ err |= nng_mtx_alloc(&setupmtx);
+ if(err != 0){
+ printf("Something when wrong when allocating mutexes\n");
+ lua_pushboolean(L,0);
+ lua_pushstring(L,nng_strerror(err));
+ return 2;
+ }
+ int argv = lua_gettop(L);
+ printf("Receiving any on %d sockets\n",argv);
+ struct callback_info **cis = (struct callback_info**)malloc(sizeof(struct callback_info*) * argv);
+ nng_mtx_lock(luamtx);
+ printf("Locked lua state\n");
+ nng_cv *cv;
+ nng_cv_alloc(&cv, callbackmtx);
+ /*nng_mtx_lock(callbackmtx);//wait for one of the callbacks to happen*/
+ printf("Callback 1 happened\n");
+ for(int i = 0; i < argv; i++){
+ printf("\tSetting up async %d\n", i);
+ nng_socket *sock = tosocket(L,-1);
+ int sref = luaL_ref(L,LUA_REGISTRYINDEX);
+ printf("\tGot socket ref %d\n", sref);
+ cis[i] = (struct callback_info*)malloc(sizeof(struct callback_info));
+ struct callback_info *ci = cis[i];
+ printf("\tLooking at ci %p\n",ci);
+ ci->L = L;
+ ci->lmutex = luamtx;
+ ci->cmutex = callbackmtx;
+ ci->socketref = sref;
+ ci->completed = 0;
+ ci->cv = cv;
+ printf("\tAbout to alloc aio\n");
+ nng_aio_alloc(&(ci->aio), push_callback, ci);
+ printf("\tAllocated aio\n");
+ printf("\tEverything else set on callback info\n");
+ nng_recv_aio(*sock, ci->aio);
+ printf("\tSet up async receive %d\n",i);
+ }
+ lua_newtable(L);//table that will hold [socket] = message
+ printf("About to unlock lua state\n");
+ nng_mtx_unlock(luamtx);
+ printf("Unlocked lua state\n");
+ /*nng_mtx_lock(callbackmtx);//was unlocked by the callback, luamtx is locked at this point*/
+ int complete = 0;
+ nng_mtx_lock(callbackmtx);
+ while(complete == 0){
+ for(int i = 0; i < argv; i++){
+ struct callback_info *ci = cis[i];
+ if(ci->completed > 0){
+ printf("At least 1 completed! breaking!\n");
+ complete = 1;
+ goto found;
+ }
+ }
+ printf("About to wait\n");
+ nng_cv_wait(cv);
+ printf("Done waiting, complete is: %d\n", complete);
+ }
+ found:
+ printf("Callback 2 happened\n");
+ nng_mtx_unlock(callbackmtx);
+ printf("Callback done\n");
+ for(int i = 0; i < argv; i++){
+ struct callback_info *ci = cis[i];
+ printf("About to stop aio %d\n",ci->aio);
+ /*nng_aio_cancel(ci->aio);*/
+ nng_aio_stop(ci->aio);
+ nng_aio_free(ci->aio);
+ free(ci);
+ printf("Stopped aio %d\n",ci->aio);
+ }
+ free(cis);
+ printf("Freeing things\n");
+ nng_cv_free(cv);
+ nng_mtx_free(callbackmtx);
+ nng_mtx_unlock(luamtx);//mutexes must not be locked when they are freed
+ nng_mtx_free(luamtx);
+ /*printf("Done freeing everything, returning...\n");*/
+ return 1;
+}
+
+static const struct luaL_Reg nng_aio_handler_m[] = {
+ {NULL, NULL}
+};
+
+static const struct luaL_Reg nng_aio_mutex_m[] = {
+ {NULL, NULL}
+};
+
+
+static const struct luaL_Reg nng_http_f[] = {
+ {"alloc",lnng_aio_alloc},
+ {"recv_any",lnng_aio_recv},
+ {NULL, NULL}
+};
+
+int luaopen_nng_aio(lua_State *L){
+ luaL_newmetatable(L,"nng.aio.struct");
+ luaL_newlib(L,nng_aio_handler_m);
+ lua_setfield(L,-2,"__index");
+ lua_pop(L,1);
+
+ luaL_newmetatable(L,"nng.aio.mutex");
+ luaL_newlib(L,nng_aio_mutex_m);
+ lua_setfield(L,-2,"__index");
+ lua_pop(L,1);
+
+ luaL_newlib(L,nng_http_f);
+ return 1;
+}
diff --git a/src/lua-nng-aio.h b/src/lua-nng-aio.h new file mode 100644 index 0000000..da5e367 --- /dev/null +++ b/src/lua-nng-aio.h @@ -0,0 +1 @@ +int luaopen_nng_aio(lua_State *L);
diff --git a/src/lua-nng-common.c b/src/lua-nng-common.c new file mode 100644 index 0000000..a12ed79 --- /dev/null +++ b/src/lua-nng-common.c @@ -0,0 +1,22 @@ +#include "lua-nng-common.h"
+/*Just copy+paste lua's runtime traceback funtion*/
+int traceback (lua_State *L) {
+ if (!lua_isstring(L, 1)) /* 'message' not a string? */
+ return 1; /* keep it intact */
+ lua_getglobal(L,"debug");
+ if (!lua_istable(L, -1)) {
+ lua_pop(L, 1);
+ return 1;
+ }
+ lua_getfield(L, -1, "traceback");
+ if (!lua_isfunction(L, -1)) {
+ lua_pop(L, 2);
+ return 1;
+ }
+ lua_pushvalue(L, 1); /* pass error message */
+ lua_pushinteger(L, 2); /* skip this function and traceback */
+ lua_call(L, 2, 1); /* call debug.traceback */
+ printf("%s\n",lua_tostring(L,-1));
+ return 1;
+}
+
diff --git a/src/lua-nng-common.h b/src/lua-nng-common.h new file mode 100644 index 0000000..a56dd8a --- /dev/null +++ b/src/lua-nng-common.h @@ -0,0 +1,6 @@ +#include <lua.h>
+#include <lualib.h>
+#include <lauxlib.h>
+
+// lua runtime's traceback function
+int traceback (lua_State *L);
diff --git a/src/lua-nng-http.c b/src/lua-nng-http.c new file mode 100644 index 0000000..61f7401 --- /dev/null +++ b/src/lua-nng-http.c @@ -0,0 +1,52 @@ +#include <lua.h>
+#include <lauxlib.h>
+#include <lualib.h>
+
+#define NNG_STATIC_LIB
+
+#include <nng/nng.h>
+
+#include <nng/transport/inproc/inproc.h>
+#include <nng/transport/ipc/ipc.h>
+#include <nng/transport/tcp/tcp.h>
+#include <nng/transport/tls/tls.h>
+#include <nng/transport/zerotier/zerotier.h>
+
+#include <nng/protocol/pair1/pair.h>
+#include <nng/protocol/bus0/bus.h>
+#include <nng/protocol/pubsub0/pub.h>
+#include <nng/protocol/pubsub0/sub.h>
+#include <nng/protocol/pipeline0/pull.h>
+#include <nng/protocol/pipeline0/push.h>
+#include <nng/protocol/reqrep0/req.h>
+#include <nng/protocol/reqrep0/rep.h>
+#include <nng/protocol/survey0/respond.h>
+#include <nng/protocol/survey0/survey.h>
+
+void handle_callback(nng_aio *aio){
+
+}
+
+//handler_alloc(string path,function callback) :: http_handler
+int lnng_http_handler_alloc(lua_State *L){
+
+}
+
+static const struct luaL_Reg nng_http_handler_m[] = {
+ {NULL, NULL}
+};
+
+static const struct luaL_Reg nng_http_f[] = {
+ {"handler_alloc",lnng_http_handler_alloc},
+ {NULL, NULL}
+};
+
+int luaopen_nng_http(lua_State *L){
+ luaL_newmetatable(L,"nng.http.handler");
+ luaL_newlib(L,nng_http_handler_m);
+ lua_setfield(L,-2,"__index");
+ lua_pop(L,1);
+
+ luaL_newlib(L,nng_http_f);
+ return 1;
+}
diff --git a/src/lua-nng.c b/src/lua-nng.c index 76d207b..9d12f30 100644 --- a/src/lua-nng.c +++ b/src/lua-nng.c @@ -23,6 +23,11 @@ #include <nng/protocol/survey0/respond.h>
#include <nng/protocol/survey0/survey.h>
+#include <nng/supplemental/util/platform.h>
+
+#include <string.h>
+#include "lua-nng-aio.h"
+
#define OPEN(name)\
int lnng_ ## name ## _open(lua_State *L){\
nng_socket *s = (nng_socket*)lua_newuserdata(L,sizeof(nng_socket));\
@@ -48,11 +53,29 @@ OPEN(rep0); OPEN(surveyor0);
OPEN(respondent0);
+//sleep(ms)
+int lnng_msleep(lua_State *L){
+ int ms = luaL_checkinteger(L,1);
+ nng_msleep(ms);
+ lua_pop(L,1);
+ return 0;
+}
+
nng_socket* tosocket(lua_State *L, int offset){
luaL_checkudata(L,offset,"nng.socket");
return (nng_socket*)lua_touserdata(L,offset);
}
+nng_listener* tolistener(lua_State *L, int offset){
+ luaL_checkudata(L,offset,"nng.listener");
+ return (nng_listener*)lua_touserdata(L,offset);
+}
+
+nng_dialer* todialer(lua_State *L, int offset){
+ luaL_checkudata(L,offset,"nng.dialer");
+ return (nng_dialer*)lua_touserdata(L,offset);
+}
+
//socket:listen(url[, flags]) :: listener
int lnng_listen(lua_State *L){
int argc = lua_gettop(L);
@@ -143,6 +166,7 @@ int lnng_recv(lua_State *L){ //socket:close()
int lnng_socket_close(lua_State *L){
+ /*printf("Garbage collecting socket...");*/
nng_socket *sock = tosocket(L,1);
int err = nng_close(*sock);
lua_pop(L,1);
@@ -185,9 +209,10 @@ int lnng_subscribe(lua_State *L){ //unsubscribe(socket,"topic")
int lnng_unsubscribe(lua_State *L){
nng_socket *sock = tosocket(L,1);
- const char *topic = lua_tostring(L,2);
+ size_t size;
+ const char *topic = luaL_checklstring(L,2,&size);
lua_pop(L,2);
- int err = nng_socket_set_string(*sock,NNG_OPT_SUB_UNSUBSCRIBE,topic);
+ int err = nng_socket_set(*sock,NNG_OPT_SUB_UNSUBSCRIBE,topic,size);
if(err == 0){
lua_pushboolean(L,1);
return 1;
@@ -198,12 +223,158 @@ int lnng_unsubscribe(lua_State *L){ }
}
+//Option types
+#define SOCKET_OPTION_SET(L, socket, flag, matches, ntype, gets, sets) \
+ if(strcmp(flag, matches) == 0){\
+ ntype value = (ntype)gets(L,3);\
+ int err = sets(*socket, flag, value);\
+ lua_pop(L,lua_gettop(L));\
+ if(err != 0){\
+ lua_pushboolean(L,0);\
+ lua_pushfstring(L,nng_strerror(err));\
+ return 2;\
+ }else{\
+ return 0;\
+ }\
+ }
+
+#define SOCKET_OPTION_GET(L, socket, flag, matches, ntype, gets, pushes) \
+ if(strcmp(flag, matches) == 0){\
+ ntype value;\
+ int err = gets(*socket, flag, &value);\
+ lua_pop(L,lua_gettop(L));\
+ if(err != 0){\
+ lua_pushboolean(L,0);\
+ lua_pushfstring(L,nng_strerror(err));\
+ return 2;\
+ }else{\
+ pushes(L,value);\
+ return 1;\
+ }\
+ }
+
+//TODO
+//set(listener,"flag",value)
+int lnng_listener_set(lua_State *L){
+ return 0;
+}
+
+//set(socket, "flag", value)
+int lnng_socket_set(lua_State *L){
+ nng_socket *sock = tosocket(L,1);
+ const char *flag = luaL_checkstring(L,2);
+ //NNG_OPT_LOCADDR - read-only
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECONNMINT, nng_duration, luaL_checkinteger, nng_socket_set_ms);
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECONNMAXT, nng_duration, luaL_checkinteger, nng_socket_set_ms);
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECVBUF, int, luaL_checkinteger, nng_socket_set_int);
+ //NNG_OPT_RECVFD - read-only
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECVMAXSZ, size_t, luaL_checkinteger, nng_socket_set_uint64);
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECVTIMEO, nng_duration, luaL_checkinteger, nng_socket_set_ms);
+ //NNG_OPT_REMADDR - read-only
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SENDBUF, int, luaL_checkinteger, nng_socket_set_int);
+ //NNG_OPT_SENDFD - read-only
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SENDTIMEO, nng_duration, luaL_checkinteger, nng_socket_set_ms);
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SOCKNAME, const char*, luaL_checkstring, nng_socket_set_string);
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_MAXTTL, int, luaL_checkinteger, nng_socket_set_int);
+ //NNG_OPT_UR - read-only
+ //NNG_OPT_PROTO - read-only
+ //NNG_OPT_PEER - read-only
+ //NNG_OPT_PROTONAME - read-only
+ //NNG_OPT_PEERNAME - read-only
+
+ //TCP options
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TCP_NODELAY, bool, lua_toboolean, nng_socket_set_bool);
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TCP_KEEPALIVE, bool, lua_toboolean, nng_socket_set_bool);
+ //NNG_OPT_TCP_BOUND_PORT - read-only? documentation doesn't say it, but it would be wierd if we could write to it.
+
+ //TLS options
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS_AUTH_MODE, int, luaL_checkinteger, nng_socket_set_int); //write-only
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS_CA_FILE, const char*, luaL_checkstring, nng_socket_set_string); //write-only
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS_CERT_KEY_FILE, const char*, luaL_checkstring, nng_socket_set_string); //write-only
+ //SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS TODO: NNG_OPT_TLS_CONFIG
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS_SERVER_NAME, const char*, luaL_checkstring, nng_socket_set_string);
+ //NNG_OPT_TLS_VERIFIED - read-only
+
+ //IPC options
+ //NNG_OPT_IPC_PEER_GID - read-only
+ //NNG_OPT_IPC_PEER_PID - read-only
+ //NNG_OPT_IPC_PEER_UID - read-only
+ //NNG_OPT_IPC_PEER_ZONEID - read-only
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_IPC_PERMISSIONS, int, luaL_checkinteger, nng_socket_set_int);
+ //TODO: NNG_OPT_IPC_SECURITY_DESCRIPTOR - windows-only, sets a pointer to a PSECURITY_DESCRIPTOR
+
+ //PUB/SUB options
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SUB_PREFNEW, bool, lua_toboolean, nng_socket_set_bool);
+
+ //REQ/REP options
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_REQ_RESENDTIME, nng_duration, luaL_checkinteger, nng_socket_set_ms);
+
+ //Survayor/respondent options
+ SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SURVEYOR_SURVEYTIME, nng_duration, luaL_checkinteger, nng_socket_set_ms);
+}
+
+//get(socket,"flag",value)
+int lnng_socket_get(lua_State *L){
+ nng_socket *sock = tosocket(L,1);
+ const char *flag = luaL_checkstring(L,2);
+ //TODO NNG_OPT_LOCADDR //read-only
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RAW, bool, nng_socket_get_bool, lua_pushboolean); //read-only
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECONNMINT, nng_duration, nng_socket_get_ms, lua_pushinteger);
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECONNMAXT, nng_duration, nng_socket_get_ms, lua_pushinteger);
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECVBUF, int, nng_socket_get_int, lua_pushinteger);
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECVFD, int, nng_socket_get_int, lua_pushinteger); //read-only
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECVMAXSZ, size_t, nng_socket_get_size, lua_pushinteger);
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECVTIMEO, nng_duration, nng_socket_get_ms, lua_pushinteger);
+ //TODO NNG_OPT_REMADDR
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SENDBUF, int, nng_socket_get_int, lua_pushinteger);
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SENDFD, int, nng_socket_get_int, lua_pushinteger); //read-only
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SENDTIMEO, nng_duration, nng_socket_get_ms, lua_pushinteger);
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SOCKNAME, char*, nng_socket_get_string, lua_pushstring);
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_MAXTTL, int, nng_socket_get_int, lua_pushinteger);
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_URL, char*, nng_socket_get_string, lua_pushstring); //read-only
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_PROTO, int, nng_socket_get_int, lua_pushinteger); //read-only
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_PEER, int, nng_socket_get_int, lua_pushinteger); //read-only
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_PROTONAME, char*, nng_socket_get_string, lua_pushstring); //read-only
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_PEERNAME, char*, nng_socket_get_string, lua_pushstring); //read-only
+
+ //TCP options
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_TCP_NODELAY, bool, nng_socket_get_bool, lua_pushboolean);
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_TCP_KEEPALIVE, bool, nng_socket_get_bool, lua_pushboolean);
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_TCP_BOUND_PORT, int, nng_socket_get_int, lua_pushinteger);
+
+ //TLS options
+ //NNG_OPT_TLS_MODE - write-only option
+ //NNG_OPT_TLS_CA_FILE - write-only option
+ //NNG_OPT_TLS_CERT_KEY_FILE - write-only option
+ //TODO: NNG_OPT_TLS_CONFIG
+ //NNG_OPT_TLS_SERVER_NAME - write-only option
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_TLS_VERIFIED, bool, nng_socket_get_bool, lua_pushboolean); //read-only option
+
+ //IPC options
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_IPC_PEER_GID, uint64_t, nng_socket_get_uint64, lua_pushinteger); //read-only option
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_IPC_PEER_PID, uint64_t, nng_socket_get_uint64, lua_pushinteger); //read-only option
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_IPC_PEER_UID, uint64_t, nng_socket_get_uint64, lua_pushinteger); //read-only option
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_IPC_PEER_ZONEID, uint64_t, nng_socket_get_uint64, lua_pushinteger); //read-only option on Solaris and illumos systems only
+ //NNG_OPT_IPC_PERMISSIONS - write-only option
+ //NNG_OPT_IPC_SECURITY_DESCRIPTOR - write-only option
+
+ //PUB/SUB options
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SUB_PREFNEW, bool, nng_socket_get_bool, lua_pushboolean);
+
+ //REQ/REP options
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_REQ_RESENDTIME, nng_duration, nng_socket_get_ms, lua_pushinteger);
+
+ //Survayor/respondent options
+ SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SURVEYOR_SURVEYTIME, nng_duration, nng_socket_get_ms, lua_pushinteger);
+}
+
static const struct luaL_Reg nng_dialer_m[] = {
+ {"close",lnng_dialer_close},
{NULL, NULL}
};
static const struct luaL_Reg nng_listener_m[] = {
-
+ {"close",lnng_listener_close},
{NULL, NULL}
};
@@ -218,6 +389,7 @@ static const struct luaL_Reg nng_socket_m[] = { {"listen", lnng_listen},
{"send", lnng_send},
{"recv", lnng_recv},
+ {"close", lnng_socket_close},
//pub/sub only
{"subscribe",lnng_subscribe},
@@ -236,35 +408,91 @@ static const struct luaL_Reg nng_f[] = { {"rep0_open", lnng_rep0_open},
{"surveyor0_open",lnng_surveyor0_open},
{"respondent0_open",lnng_respondent0_open},
+ {"sleep",lnng_msleep},
{NULL, NULL}
};
#define flag(name) lua_pushnumber(L,name); lua_setfield(L,-2,#name);
+#define option(name) lua_pushstring(L,name); lua_setfield(L,-2,#name);
int luaopen_nng(lua_State *L){
- luaL_newmetatable(L,"nng.socket");
- luaL_newlib(L,nng_socket_m);
- lua_setfield(L,-2,"__index");
- lua_pushcfunction(L,lnng_socket_close);
- lua_setfield(L,-2,"__gc");
+ luaL_newmetatable(L,"nng.socket");//{}
+ luaL_newlib(L,nng_socket_m);//{},{}
+ lua_newtable(L);//{},{},{}
+ lua_pushcfunction(L,lnng_socket_get);//{},{},{},get()
+ lua_setfield(L,-2,"__index");//{},{},{__index=get()}
+ lua_setmetatable(L,-2);//{},{}
+ lua_setfield(L,-2,"__index");//{__index = {}}
+ lua_pushcfunction(L,lnng_socket_close);//{__index = {}},close()
+ lua_setfield(L,-2,"__gc");//{__index = {}, __gc = close()}
+ lua_pushcfunction(L,lnng_socket_set);//{__index = {}, __gc = close()}, set()
+ lua_setfield(L,-2,"__newindex");//{__index = {}, __gc = close(), __newindex = set()}
lua_pop(L,1);
luaL_newmetatable(L,"nng.dialer");
luaL_newlib(L,nng_dialer_m);
lua_setfield(L,-2,"__index");
- lua_pushcfunction(L,lnng_dialer_close);
- lua_setfield(L,-2,"__gc");
+ /*lua_pushcfunction(L,lnng_dialer_close);*/
+ /*lua_setfield(L,-2,"__gc");*/
lua_pop(L,1);
luaL_newmetatable(L,"nng.listener");
luaL_newlib(L,nng_listener_m);
lua_setfield(L,-2,"__index");
- lua_pushcfunction(L,lnng_listener_close);
- lua_setfield(L,-2,"__gc");
+ /*lua_pushcfunction(L,lnng_listener_close);*/
+ /*lua_setfield(L,-2,"__gc");*/
lua_pop(L,1);
luaL_newlib(L,nng_f);
+ luaopen_nng_aio(L);
+ lua_setfield(L,-2,"aio");
+
+ //Flags
flag(NNG_FLAG_NONBLOCK);
flag(NNG_FLAG_ALLOC);
+
+ //Options
+ option(NNG_OPT_SOCKNAME);
+ option(NNG_OPT_SOCKNAME);
+ option(NNG_OPT_RAW);
+ option(NNG_OPT_PROTO);
+ option(NNG_OPT_PROTONAME);
+ option(NNG_OPT_PEER);
+ option(NNG_OPT_PEERNAME);
+ option(NNG_OPT_RECVBUF);
+ option(NNG_OPT_SENDBUF);
+ option(NNG_OPT_RECVFD);
+ option(NNG_OPT_SENDFD);
+ option(NNG_OPT_RECVTIMEO);
+ option(NNG_OPT_SENDTIMEO);
+ option(NNG_OPT_LOCADDR);
+ option(NNG_OPT_REMADDR);
+ option(NNG_OPT_URL);
+ option(NNG_OPT_MAXTTL);
+ option(NNG_OPT_RECVMAXSZ);
+ option(NNG_OPT_RECONNMINT);
+ option(NNG_OPT_RECONNMAXT);
+
+ //TCP options
+ option(NNG_OPT_TCP_NODELAY);
+ option(NNG_OPT_TCP_KEEPALIVE);
+ option(NNG_OPT_TCP_BOUND_PORT);
+
+ //IPC options
+ option(NNG_OPT_IPC_PEER_GID);
+ option(NNG_OPT_IPC_PEER_PID);
+ option(NNG_OPT_IPC_PEER_UID);
+ option(NNG_OPT_IPC_PEER_ZONEID);
+ option(NNG_OPT_IPC_PERMISSIONS);
+ option(NNG_OPT_IPC_SECURITY_DESCRIPTOR);
+
+ //Pub/sub options
+ /*option(NNG_OPT_SUB_SUBSCRIBE);//should use socket:subscribe() instead (so that nil is not counted as part of the subscription)*/
+ /*option(NNG_OPT_SUB_UNSUBSCRIBE);//should use socket:unsubscribe() instead (same reason as above)*/
+ option(NNG_OPT_SUB_PREFNEW);
+
+ //Req/rep options
+ option(NNG_OPT_SURVEYOR_SURVEYTIME);
+
return 1;
}
diff --git a/src/lua-nng.h b/src/lua-nng.h new file mode 100644 index 0000000..80b716d --- /dev/null +++ b/src/lua-nng.h @@ -0,0 +1,26 @@ +#define NNG_STATIC_LIB
+
+#include <nng/nng.h>
+
+#include <nng/transport/inproc/inproc.h>
+#include <nng/transport/ipc/ipc.h>
+#include <nng/transport/tcp/tcp.h>
+#include <nng/transport/tls/tls.h>
+#include <nng/transport/zerotier/zerotier.h>
+
+#include <nng/protocol/pair1/pair.h>
+#include <nng/protocol/bus0/bus.h>
+#include <nng/protocol/pubsub0/pub.h>
+#include <nng/protocol/pubsub0/sub.h>
+#include <nng/protocol/pipeline0/pull.h>
+#include <nng/protocol/pipeline0/push.h>
+#include <nng/protocol/reqrep0/req.h>
+#include <nng/protocol/reqrep0/rep.h>
+#include <nng/protocol/survey0/respond.h>
+#include <nng/protocol/survey0/survey.h>
+
+#include <nng/supplemental/util/platform.h>
+
+nng_socket* tosocket(lua_State *L, int offset);
+nng_listener* tolistener(lua_State *L, int offset);
+nng_dialer* todialer(lua_State *L, int offset);
|
