From a7ff1b815987b0c1b3ded5701845e0eb52f0592c Mon Sep 17 00:00:00 2001 From: Alexander Pickering Date: Tue, 14 Jul 2020 21:32:36 -0400 Subject: Added pubsub methods Add socket:subscribe() and socket:unsubscribe() for the pub/sub protocol. --- lua-nng-dev-1.rockspec | 2 +- spec/startup_spec.lua | 13 +++++++++++++ src/lua-nng.c | 44 +++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 57 insertions(+), 2 deletions(-) diff --git a/lua-nng-dev-1.rockspec b/lua-nng-dev-1.rockspec index 0be82f6..3229e99 100644 --- a/lua-nng-dev-1.rockspec +++ b/lua-nng-dev-1.rockspec @@ -3,7 +3,7 @@ rockspec_format="3.0" version = "dev-1" source = { url = "git+https://cogarr.net/source/cgit.cgi/lua-nng", - tag = "v0.1" + --tag = "v0.1" } description = { summary = "A simple binding for Nanomessage Next Generation", diff --git a/spec/startup_spec.lua b/spec/startup_spec.lua index 45fa996..dacf9e9 100644 --- a/spec/startup_spec.lua +++ b/spec/startup_spec.lua @@ -80,4 +80,17 @@ describe("nng",function() assert(avg > 0.4) assert(avg < 0.6) end) + it("should be able to use publish and subscribe sockets to transfer information", function() + local s1 = assert(nng.pub0_open()) + local s2 = assert(nng.sub0_open()) + local s3 = assert(nng.sub0_open()) + assert(s1:listen("ipc:///tmp/pub.ipc")) + assert(s2:subscribe("")) + assert(s3:subscribe("")) + assert(s2:dial("ipc:///tmp/pub.ipc")) + assert(s3:dial("ipc:///tmp/pub.ipc")) + assert(s1:send("hello 1")) + assert(s2:recv() == "hello 1") + assert(s3:recv() == "hello 1") + end) end) diff --git a/src/lua-nng.c b/src/lua-nng.c index 71e57e8..76d207b 100644 --- a/src/lua-nng.c +++ b/src/lua-nng.c @@ -165,8 +165,40 @@ int lnng_listener_close(lua_State *L){ return 0; } -static const struct luaL_Reg nng_dialer_m[] = { +//subscribe(socket,"topic") +int lnng_subscribe(lua_State *L){ + nng_socket *sock = tosocket(L,1); + size_t size; + const char *topic = luaL_checklstring(L,2,&size); + lua_pop(L,2); + int err = nng_socket_set(*sock,NNG_OPT_SUB_SUBSCRIBE,topic,size); + if(err == 0){ + lua_pushboolean(L,1); + return 1; + }else{ + lua_pushboolean(L,0); + lua_pushstring(L,nng_strerror(err)); + return 2; + } +} +//unsubscribe(socket,"topic") +int lnng_unsubscribe(lua_State *L){ + nng_socket *sock = tosocket(L,1); + const char *topic = lua_tostring(L,2); + lua_pop(L,2); + int err = nng_socket_set_string(*sock,NNG_OPT_SUB_UNSUBSCRIBE,topic); + if(err == 0){ + lua_pushboolean(L,1); + return 1; + }else{ + lua_pushboolean(L,0); + lua_pushstring(L,nng_strerror(err)); + return 2; + } +} + +static const struct luaL_Reg nng_dialer_m[] = { {NULL, NULL} }; @@ -175,11 +207,21 @@ static const struct luaL_Reg nng_listener_m[] = { {NULL, NULL} }; +static const struct luaL_Reg nng_socket_sub_m[] = { + {"subscribe", lnng_subscribe}, + {"unsubscribe", lnng_unsubscribe}, + {NULL, NULL} +}; + static const struct luaL_Reg nng_socket_m[] = { {"dial", lnng_dial}, {"listen", lnng_listen}, {"send", lnng_send}, {"recv", lnng_recv}, + + //pub/sub only + {"subscribe",lnng_subscribe}, + {"unsubscribe",lnng_unsubscribe}, {NULL, NULL} }; -- cgit v1.2.3-70-g09d2