From 27ece5603fc0cde89183ceb61f915fa64fef1061 Mon Sep 17 00:00:00 2001 From: Alexander Pickering Date: Wed, 29 Jul 2020 12:18:10 -0400 Subject: Added recv_any() recv_any() is a function that takes multiple sockets and waits for one or more of them to receive. See the unit test for examples. --- src/lua-nng.c | 252 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 240 insertions(+), 12 deletions(-) (limited to 'src/lua-nng.c') diff --git a/src/lua-nng.c b/src/lua-nng.c index 76d207b..9d12f30 100644 --- a/src/lua-nng.c +++ b/src/lua-nng.c @@ -23,6 +23,11 @@ #include #include +#include + +#include +#include "lua-nng-aio.h" + #define OPEN(name)\ int lnng_ ## name ## _open(lua_State *L){\ nng_socket *s = (nng_socket*)lua_newuserdata(L,sizeof(nng_socket));\ @@ -48,11 +53,29 @@ OPEN(rep0); OPEN(surveyor0); OPEN(respondent0); +//sleep(ms) +int lnng_msleep(lua_State *L){ + int ms = luaL_checkinteger(L,1); + nng_msleep(ms); + lua_pop(L,1); + return 0; +} + nng_socket* tosocket(lua_State *L, int offset){ luaL_checkudata(L,offset,"nng.socket"); return (nng_socket*)lua_touserdata(L,offset); } +nng_listener* tolistener(lua_State *L, int offset){ + luaL_checkudata(L,offset,"nng.listener"); + return (nng_listener*)lua_touserdata(L,offset); +} + +nng_dialer* todialer(lua_State *L, int offset){ + luaL_checkudata(L,offset,"nng.dialer"); + return (nng_dialer*)lua_touserdata(L,offset); +} + //socket:listen(url[, flags]) :: listener int lnng_listen(lua_State *L){ int argc = lua_gettop(L); @@ -143,6 +166,7 @@ int lnng_recv(lua_State *L){ //socket:close() int lnng_socket_close(lua_State *L){ + /*printf("Garbage collecting socket...");*/ nng_socket *sock = tosocket(L,1); int err = nng_close(*sock); lua_pop(L,1); @@ -185,9 +209,10 @@ int lnng_subscribe(lua_State *L){ //unsubscribe(socket,"topic") int lnng_unsubscribe(lua_State *L){ nng_socket *sock = tosocket(L,1); - const char *topic = lua_tostring(L,2); + size_t size; + const char *topic = luaL_checklstring(L,2,&size); lua_pop(L,2); - int err = nng_socket_set_string(*sock,NNG_OPT_SUB_UNSUBSCRIBE,topic); + int err = nng_socket_set(*sock,NNG_OPT_SUB_UNSUBSCRIBE,topic,size); if(err == 0){ lua_pushboolean(L,1); return 1; @@ -198,12 +223,158 @@ int lnng_unsubscribe(lua_State *L){ } } +//Option types +#define SOCKET_OPTION_SET(L, socket, flag, matches, ntype, gets, sets) \ + if(strcmp(flag, matches) == 0){\ + ntype value = (ntype)gets(L,3);\ + int err = sets(*socket, flag, value);\ + lua_pop(L,lua_gettop(L));\ + if(err != 0){\ + lua_pushboolean(L,0);\ + lua_pushfstring(L,nng_strerror(err));\ + return 2;\ + }else{\ + return 0;\ + }\ + } + +#define SOCKET_OPTION_GET(L, socket, flag, matches, ntype, gets, pushes) \ + if(strcmp(flag, matches) == 0){\ + ntype value;\ + int err = gets(*socket, flag, &value);\ + lua_pop(L,lua_gettop(L));\ + if(err != 0){\ + lua_pushboolean(L,0);\ + lua_pushfstring(L,nng_strerror(err));\ + return 2;\ + }else{\ + pushes(L,value);\ + return 1;\ + }\ + } + +//TODO +//set(listener,"flag",value) +int lnng_listener_set(lua_State *L){ + return 0; +} + +//set(socket, "flag", value) +int lnng_socket_set(lua_State *L){ + nng_socket *sock = tosocket(L,1); + const char *flag = luaL_checkstring(L,2); + //NNG_OPT_LOCADDR - read-only + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECONNMINT, nng_duration, luaL_checkinteger, nng_socket_set_ms); + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECONNMAXT, nng_duration, luaL_checkinteger, nng_socket_set_ms); + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECVBUF, int, luaL_checkinteger, nng_socket_set_int); + //NNG_OPT_RECVFD - read-only + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECVMAXSZ, size_t, luaL_checkinteger, nng_socket_set_uint64); + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_RECVTIMEO, nng_duration, luaL_checkinteger, nng_socket_set_ms); + //NNG_OPT_REMADDR - read-only + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SENDBUF, int, luaL_checkinteger, nng_socket_set_int); + //NNG_OPT_SENDFD - read-only + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SENDTIMEO, nng_duration, luaL_checkinteger, nng_socket_set_ms); + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SOCKNAME, const char*, luaL_checkstring, nng_socket_set_string); + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_MAXTTL, int, luaL_checkinteger, nng_socket_set_int); + //NNG_OPT_UR - read-only + //NNG_OPT_PROTO - read-only + //NNG_OPT_PEER - read-only + //NNG_OPT_PROTONAME - read-only + //NNG_OPT_PEERNAME - read-only + + //TCP options + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TCP_NODELAY, bool, lua_toboolean, nng_socket_set_bool); + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TCP_KEEPALIVE, bool, lua_toboolean, nng_socket_set_bool); + //NNG_OPT_TCP_BOUND_PORT - read-only? documentation doesn't say it, but it would be wierd if we could write to it. + + //TLS options + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS_AUTH_MODE, int, luaL_checkinteger, nng_socket_set_int); //write-only + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS_CA_FILE, const char*, luaL_checkstring, nng_socket_set_string); //write-only + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS_CERT_KEY_FILE, const char*, luaL_checkstring, nng_socket_set_string); //write-only + //SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS TODO: NNG_OPT_TLS_CONFIG + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_TLS_SERVER_NAME, const char*, luaL_checkstring, nng_socket_set_string); + //NNG_OPT_TLS_VERIFIED - read-only + + //IPC options + //NNG_OPT_IPC_PEER_GID - read-only + //NNG_OPT_IPC_PEER_PID - read-only + //NNG_OPT_IPC_PEER_UID - read-only + //NNG_OPT_IPC_PEER_ZONEID - read-only + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_IPC_PERMISSIONS, int, luaL_checkinteger, nng_socket_set_int); + //TODO: NNG_OPT_IPC_SECURITY_DESCRIPTOR - windows-only, sets a pointer to a PSECURITY_DESCRIPTOR + + //PUB/SUB options + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SUB_PREFNEW, bool, lua_toboolean, nng_socket_set_bool); + + //REQ/REP options + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_REQ_RESENDTIME, nng_duration, luaL_checkinteger, nng_socket_set_ms); + + //Survayor/respondent options + SOCKET_OPTION_SET(L, sock, flag, NNG_OPT_SURVEYOR_SURVEYTIME, nng_duration, luaL_checkinteger, nng_socket_set_ms); +} + +//get(socket,"flag",value) +int lnng_socket_get(lua_State *L){ + nng_socket *sock = tosocket(L,1); + const char *flag = luaL_checkstring(L,2); + //TODO NNG_OPT_LOCADDR //read-only + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RAW, bool, nng_socket_get_bool, lua_pushboolean); //read-only + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECONNMINT, nng_duration, nng_socket_get_ms, lua_pushinteger); + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECONNMAXT, nng_duration, nng_socket_get_ms, lua_pushinteger); + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECVBUF, int, nng_socket_get_int, lua_pushinteger); + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECVFD, int, nng_socket_get_int, lua_pushinteger); //read-only + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECVMAXSZ, size_t, nng_socket_get_size, lua_pushinteger); + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_RECVTIMEO, nng_duration, nng_socket_get_ms, lua_pushinteger); + //TODO NNG_OPT_REMADDR + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SENDBUF, int, nng_socket_get_int, lua_pushinteger); + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SENDFD, int, nng_socket_get_int, lua_pushinteger); //read-only + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SENDTIMEO, nng_duration, nng_socket_get_ms, lua_pushinteger); + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SOCKNAME, char*, nng_socket_get_string, lua_pushstring); + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_MAXTTL, int, nng_socket_get_int, lua_pushinteger); + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_URL, char*, nng_socket_get_string, lua_pushstring); //read-only + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_PROTO, int, nng_socket_get_int, lua_pushinteger); //read-only + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_PEER, int, nng_socket_get_int, lua_pushinteger); //read-only + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_PROTONAME, char*, nng_socket_get_string, lua_pushstring); //read-only + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_PEERNAME, char*, nng_socket_get_string, lua_pushstring); //read-only + + //TCP options + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_TCP_NODELAY, bool, nng_socket_get_bool, lua_pushboolean); + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_TCP_KEEPALIVE, bool, nng_socket_get_bool, lua_pushboolean); + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_TCP_BOUND_PORT, int, nng_socket_get_int, lua_pushinteger); + + //TLS options + //NNG_OPT_TLS_MODE - write-only option + //NNG_OPT_TLS_CA_FILE - write-only option + //NNG_OPT_TLS_CERT_KEY_FILE - write-only option + //TODO: NNG_OPT_TLS_CONFIG + //NNG_OPT_TLS_SERVER_NAME - write-only option + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_TLS_VERIFIED, bool, nng_socket_get_bool, lua_pushboolean); //read-only option + + //IPC options + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_IPC_PEER_GID, uint64_t, nng_socket_get_uint64, lua_pushinteger); //read-only option + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_IPC_PEER_PID, uint64_t, nng_socket_get_uint64, lua_pushinteger); //read-only option + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_IPC_PEER_UID, uint64_t, nng_socket_get_uint64, lua_pushinteger); //read-only option + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_IPC_PEER_ZONEID, uint64_t, nng_socket_get_uint64, lua_pushinteger); //read-only option on Solaris and illumos systems only + //NNG_OPT_IPC_PERMISSIONS - write-only option + //NNG_OPT_IPC_SECURITY_DESCRIPTOR - write-only option + + //PUB/SUB options + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SUB_PREFNEW, bool, nng_socket_get_bool, lua_pushboolean); + + //REQ/REP options + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_REQ_RESENDTIME, nng_duration, nng_socket_get_ms, lua_pushinteger); + + //Survayor/respondent options + SOCKET_OPTION_GET(L, sock, flag, NNG_OPT_SURVEYOR_SURVEYTIME, nng_duration, nng_socket_get_ms, lua_pushinteger); +} + static const struct luaL_Reg nng_dialer_m[] = { + {"close",lnng_dialer_close}, {NULL, NULL} }; static const struct luaL_Reg nng_listener_m[] = { - + {"close",lnng_listener_close}, {NULL, NULL} }; @@ -218,6 +389,7 @@ static const struct luaL_Reg nng_socket_m[] = { {"listen", lnng_listen}, {"send", lnng_send}, {"recv", lnng_recv}, + {"close", lnng_socket_close}, //pub/sub only {"subscribe",lnng_subscribe}, @@ -236,35 +408,91 @@ static const struct luaL_Reg nng_f[] = { {"rep0_open", lnng_rep0_open}, {"surveyor0_open",lnng_surveyor0_open}, {"respondent0_open",lnng_respondent0_open}, + {"sleep",lnng_msleep}, {NULL, NULL} }; #define flag(name) lua_pushnumber(L,name); lua_setfield(L,-2,#name); +#define option(name) lua_pushstring(L,name); lua_setfield(L,-2,#name); int luaopen_nng(lua_State *L){ - luaL_newmetatable(L,"nng.socket"); - luaL_newlib(L,nng_socket_m); - lua_setfield(L,-2,"__index"); - lua_pushcfunction(L,lnng_socket_close); - lua_setfield(L,-2,"__gc"); + luaL_newmetatable(L,"nng.socket");//{} + luaL_newlib(L,nng_socket_m);//{},{} + lua_newtable(L);//{},{},{} + lua_pushcfunction(L,lnng_socket_get);//{},{},{},get() + lua_setfield(L,-2,"__index");//{},{},{__index=get()} + lua_setmetatable(L,-2);//{},{} + lua_setfield(L,-2,"__index");//{__index = {}} + lua_pushcfunction(L,lnng_socket_close);//{__index = {}},close() + lua_setfield(L,-2,"__gc");//{__index = {}, __gc = close()} + lua_pushcfunction(L,lnng_socket_set);//{__index = {}, __gc = close()}, set() + lua_setfield(L,-2,"__newindex");//{__index = {}, __gc = close(), __newindex = set()} lua_pop(L,1); luaL_newmetatable(L,"nng.dialer"); luaL_newlib(L,nng_dialer_m); lua_setfield(L,-2,"__index"); - lua_pushcfunction(L,lnng_dialer_close); - lua_setfield(L,-2,"__gc"); + /*lua_pushcfunction(L,lnng_dialer_close);*/ + /*lua_setfield(L,-2,"__gc");*/ lua_pop(L,1); luaL_newmetatable(L,"nng.listener"); luaL_newlib(L,nng_listener_m); lua_setfield(L,-2,"__index"); - lua_pushcfunction(L,lnng_listener_close); - lua_setfield(L,-2,"__gc"); + /*lua_pushcfunction(L,lnng_listener_close);*/ + /*lua_setfield(L,-2,"__gc");*/ lua_pop(L,1); luaL_newlib(L,nng_f); + luaopen_nng_aio(L); + lua_setfield(L,-2,"aio"); + + //Flags flag(NNG_FLAG_NONBLOCK); flag(NNG_FLAG_ALLOC); + + //Options + option(NNG_OPT_SOCKNAME); + option(NNG_OPT_SOCKNAME); + option(NNG_OPT_RAW); + option(NNG_OPT_PROTO); + option(NNG_OPT_PROTONAME); + option(NNG_OPT_PEER); + option(NNG_OPT_PEERNAME); + option(NNG_OPT_RECVBUF); + option(NNG_OPT_SENDBUF); + option(NNG_OPT_RECVFD); + option(NNG_OPT_SENDFD); + option(NNG_OPT_RECVTIMEO); + option(NNG_OPT_SENDTIMEO); + option(NNG_OPT_LOCADDR); + option(NNG_OPT_REMADDR); + option(NNG_OPT_URL); + option(NNG_OPT_MAXTTL); + option(NNG_OPT_RECVMAXSZ); + option(NNG_OPT_RECONNMINT); + option(NNG_OPT_RECONNMAXT); + + //TCP options + option(NNG_OPT_TCP_NODELAY); + option(NNG_OPT_TCP_KEEPALIVE); + option(NNG_OPT_TCP_BOUND_PORT); + + //IPC options + option(NNG_OPT_IPC_PEER_GID); + option(NNG_OPT_IPC_PEER_PID); + option(NNG_OPT_IPC_PEER_UID); + option(NNG_OPT_IPC_PEER_ZONEID); + option(NNG_OPT_IPC_PERMISSIONS); + option(NNG_OPT_IPC_SECURITY_DESCRIPTOR); + + //Pub/sub options + /*option(NNG_OPT_SUB_SUBSCRIBE);//should use socket:subscribe() instead (so that nil is not counted as part of the subscription)*/ + /*option(NNG_OPT_SUB_UNSUBSCRIBE);//should use socket:unsubscribe() instead (same reason as above)*/ + option(NNG_OPT_SUB_PREFNEW); + + //Req/rep options + option(NNG_OPT_SURVEYOR_SURVEYTIME); + return 1; } -- cgit v1.2.3-70-g09d2