From: Maciej Krüger <[email protected]>

This extends the conn:call function to take a function
as its last parameter, which will make the library
use ubus_invoke_async.

This allows streaming the logs from ubus,
among other things.

An example has been provided in lua/stream_logs.lua.

Signed-off-by: Maciej Krüger <[email protected]>
---
 lua/stream_logs.lua |  29 ++++++++
 lua/ubus.c          | 168 +++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 195 insertions(+), 2 deletions(-)
 create mode 100644 lua/stream_logs.lua

diff --git a/lua/stream_logs.lua b/lua/stream_logs.lua
new file mode 100644
index 0000000..5490b02
--- /dev/null
+++ b/lua/stream_logs.lua
@@ -0,0 +1,29 @@
+-- Example: stream entries from the ubus "log" object using an async call.
+-- Load modules
+require "ubus"
+require "uloop"
+
+uloop.init()
+
+-- Establish connection
+local conn = ubus.connect()
+if not conn then
+        error("Failed to connect to ubusd")
+end
+
+-- Print every key/value pair of a single log entry.
+local function handleLog (log)
+        for k, v in pairs(log) do
+                print(k , v)
+        end
+end
+
+-- Stream logs. The callback receives (log) for data and (nil, control) for
+-- control events such as "connected" and "closed".
+-- NOTE(review): on failure conn:call appears to return two values, the
+-- second presumably being the ubus status code - confirm in ubus_lua_call.
+local _, err = conn:call("log", "read", { stream = true, oneshot = false, lines = 0 },
+        function (log, control)
+                if control then
+                        print('Control event', control.type)
+                elseif log then
+                        handleLog(log)
+                end
+        end)
+if err then
+        -- Without this check a failed call would block forever in uloop.run()
+        error("log read call failed: " .. tostring(err))
+end
+
+uloop.run()
+
diff --git a/lua/ubus.c b/lua/ubus.c
index 07b816d..51643df 100644
--- a/lua/ubus.c
+++ b/lua/ubus.c
@@ -19,6 +19,7 @@
 #include <libubox/blobmsg_json.h>
 #include <lauxlib.h>
 #include <lua.h>
+#include <libubox/ustream.h>
+#include <unistd.h>
 
 #define MODNAME                "ubus"
 #define METANAME       MODNAME ".meta"
@@ -42,6 +43,12 @@ struct ubus_lua_event {
        int r;
 };
 
+/* Per-call state of a conn:call() that was given a Lua callback function */
+struct ubus_lua_request {
+       struct ubus_request r; /* request passed to ubus_invoke_async */
+       struct ustream_fd fd; /* stream set up on the fd handed to the fd callback */
+       int fnc; /* luaL_ref reference of the callback in __ubus_cb_async */
+};
+
 struct ubus_lua_subscriber {
        struct ubus_subscriber s;
        int rnotify;
@@ -660,6 +667,134 @@ ubus_lua_call_cb(struct ubus_request *req, int type, 
struct blob_attr *msg)
                ubus_lua_parse_blob_array(L, blob_data(msg), blob_len(msg), 
true);
 }
 
+/*
+ * Completion handler of an async invoke. Looks up the Lua callback stored
+ * for this request and calls it as
+ *   callback(nil, { type = "connected", ["return"] = ret })
+ * Nothing is called when the stored reference is not a function.
+ */
+static void
+ubus_lua_async_complete_cb(struct ubus_request *req, int ret)
+{
+       struct ubus_lua_request *pending = container_of(req, struct ubus_lua_request, r);
+
+       /* fetch the callback, then drop the lookup table from the stack */
+       lua_getglobal(state, "__ubus_cb_async");
+       lua_rawgeti(state, -1, pending->fnc);
+       lua_remove(state, -2);
+
+       if (!lua_isfunction(state, -1)) {
+               lua_pop(state, 1);
+               return;
+       }
+
+       /* control events carry no message, so the first argument is nil */
+       lua_pushnil(state);
+
+       lua_createtable(state, 0, 2);
+       lua_pushstring(state, "connected");
+       lua_setfield(state, -2, "type");
+       lua_pushnumber(state, ret);
+       lua_setfield(state, -2, "return");
+
+       lua_call(state, 2, 0);
+}
+
+/*
+ * Deliver one message read from the stream to the Lua callback of the
+ * request owning stream `s`. The callback gets a single argument: the
+ * value produced by ubus_lua_parse_blob_array for `msg`, or nil when
+ * `msg` is NULL.
+ */
+static void
+ubus_lua_async_cb(struct ustream *s, struct blob_attr *msg)
+{
+       struct ubus_lua_request *pending = container_of(s, struct ubus_lua_request, fd.stream);
+
+       /* fetch the callback, then drop the lookup table from the stack */
+       lua_getglobal(state, "__ubus_cb_async");
+       lua_rawgeti(state, -1, pending->fnc);
+       lua_remove(state, -2);
+
+       if (!lua_isfunction(state, -1)) {
+               lua_pop(state, 1);
+               return;
+       }
+
+       if (msg)
+               ubus_lua_parse_blob_array(state, blob_data(msg), blob_len(msg), true);
+       else
+               lua_pushnil(state);
+
+       lua_call(state, 1, 0);
+}
+
+/*
+ * ustream read notification: hands every complete blob message in the read
+ * buffer to ubus_lua_async_cb and consumes it. Stops as soon as the buffer
+ * holds less than one full message. `bytes` is not used.
+ */
+static void
+ubus_lua_async_data_cb(struct ustream *s, int bytes)
+{
+       for (;;) {
+               int avail, msg_len;
+               struct blob_attr *attr = (void*) ustream_get_read_buf(s, &avail);
+
+               /* the header must be complete before its length can be read */
+               if (avail < (int)sizeof(*attr))
+                       return;
+
+               /* header plus payload must be buffered entirely */
+               msg_len = blob_len(attr) + sizeof(*attr);
+               if (avail < msg_len)
+                       return;
+
+               ubus_lua_async_cb(s, attr);
+               ustream_consume(s, msg_len);
+       }
+}
+
+/*
+ * ustream state notification: once the stream has ended (EOF or write
+ * error), release everything owned by the async request and tell the Lua
+ * callback with callback(nil, { type = "closed" }).
+ *
+ * Previously nothing was released here, so every streamed call leaked the
+ * request allocation, its fd/ustream and the callback reference.
+ */
+static void
+ubus_lua_async_state_cb(struct ustream *s)
+{
+       struct ubus_lua_request *lureq = container_of(s, struct ubus_lua_request, fd.stream);
+
+       /* only a terminal condition ends the stream */
+       if (!s->eof && !s->write_error)
+               return;
+
+       lua_getglobal(state, "__ubus_cb_async");
+       lua_rawgeti(state, -1, lureq->fnc);
+       /* drop the table entry; the function stays alive on the stack */
+       luaL_unref(state, -2, lureq->fnc);
+       lua_remove(state, -2);
+
+       /*
+        * Release the C side before entering Lua: lua_call is unprotected
+        * and an error in the callback would skip any cleanup placed after it.
+        * Aborting makes sure ubus no longer references the request
+        * (no-op when the request has already completed).
+        */
+       ubus_abort_request(lureq->r.ctx, &lureq->r);
+       ustream_free(&lureq->fd.stream);
+       close(lureq->fd.fd.fd);
+       free(lureq);
+
+       if (lua_isfunction(state, -1)) {
+               lua_pushnil(state);
+
+               lua_newtable(state);
+
+               lua_pushstring(state, "type");
+               lua_pushstring(state, "closed");
+               lua_settable(state, -3);
+
+               lua_call(state, 2, 0);
+       } else {
+               lua_pop(state, 1);
+       }
+}
+
+/*
+ * fd callback of an async request: wraps the received file descriptor in
+ * the request's ustream_fd, with the read and state notifications installed
+ * before the stream is initialised.
+ */
+static void
+ubus_lua_async_fd_cb(struct ubus_request *req, int fd)
+{
+       struct ubus_lua_request *pending = container_of(req, struct ubus_lua_request, r);
+       struct ustream *stream = &pending->fd.stream;
+
+       stream->notify_state = ubus_lua_async_state_cb;
+       stream->notify_read = ubus_lua_async_data_cb;
+       ustream_fd_init(&pending->fd, fd);
+}
+
+/*
+ * Allocate the state for one async call and remember its Lua callback.
+ *
+ * retlureq: receives the newly allocated request state
+ * ctx:      ubus context of the connection (currently unused)
+ * L:        Lua state holding the callback
+ * fnc:      stack index of the callback function; the value at that index
+ *           is stored in __ubus_cb_async and removed from the stack
+ *
+ * Returns 0 on success. Raises a Lua error (does not return) when the
+ * allocation fails.
+ */
+static int
+ubus_lua_register_async( struct ubus_lua_request ** retlureq, struct ubus_context *ctx, lua_State *L,
+                       int fnc )
+{
+       struct ubus_lua_request *lureq;
+
+       (void) ctx;
+
+       /* make a relative index absolute: the pushes below would shift it */
+       if (fnc < 0 && fnc > LUA_REGISTRYINDEX)
+               fnc = lua_gettop(L) + fnc + 1;
+
+       lureq = calloc( 1, sizeof( *lureq ) );
+       if( !lureq ){
+               lua_pushstring( L, "Out of memory" );
+               return lua_error(L);
+       }
+
+       lua_getglobal(L, "__ubus_cb_async");
+       lua_pushvalue(L, fnc);
+       lureq->fnc = luaL_ref(L, -2);
+       lua_pop(L, 1);
+
+       /* remove the callback at its own slot instead of assuming it is on top */
+       lua_remove(L, fnc);
+
+       *retlureq = lureq;
+
+       return 0;
+}
+
 static int
 ubus_lua_call(lua_State *L)
 {
@@ -669,6 +804,20 @@ ubus_lua_call(lua_State *L)
        const char *path = luaL_checkstring(L, 2);
        const char *func = luaL_checkstring(L, 3);
 
+       bool isAsync = lua_isfunction(L, 5);
+       struct ubus_lua_request * req = NULL;
+
+       if (isAsync) {
+               int ret = ubus_lua_register_async(&req, c->ctx, L, 
lua_gettop(L));
+               if (ret) {
+                       return ret;
+               }
+               if (!req) {
+                       lua_pushstring(L, "Failed to register async callback");
+                       return lua_error( L );
+               }
+       }
+
        luaL_checktype(L, 4, LUA_TTABLE);
        blob_buf_init(&c->buf, 0);
 
@@ -689,7 +838,14 @@ ubus_lua_call(lua_State *L)
        }
 
        top = lua_gettop(L);
-       rv = ubus_invoke(c->ctx, id, func, c->buf.head, ubus_lua_call_cb, L, 
c->timeout * 1000);
+
+       if (isAsync) {
+               rv = ubus_invoke_async(c->ctx, id, func, c->buf.head, &req->r);
+               req->r.fd_cb = ubus_lua_async_fd_cb;
+               req->r.complete_cb = ubus_lua_async_complete_cb;
+       } else {
+               rv = ubus_invoke(c->ctx, id, func, c->buf.head, 
ubus_lua_call_cb, L, c->timeout * 1000);
+       }
 
        if (rv != UBUS_STATUS_OK)
        {
@@ -699,6 +855,10 @@ ubus_lua_call(lua_State *L)
                return 2;
        }
 
+       if (isAsync) {
+               ubus_complete_request_async(c->ctx, &req->r);
+       }
+
        return lua_gettop(L) - top;
 }
 
@@ -731,7 +891,7 @@ ubus_lua_load_event(lua_State *L)
 
        event->e.cb = ubus_event_handler;
 
-       /* update the he callback lookup table */
+       /* update the callback lookup table */
        lua_getglobal(L, "__ubus_cb_event");
        lua_pushvalue(L, -2);
        event->r = luaL_ref(L, -2);
@@ -1021,5 +1181,9 @@ luaopen_ubus(lua_State *L)
        /* create the publisher table - notifications of new subs */
        lua_createtable(L, 1, 0);
        lua_setglobal(L, "__ubus_cb_publisher");
+
+       /* create the async table - callbacks for invoke_async */
+       lua_createtable(L, 1, 0);
+       lua_setglobal(L, "__ubus_cb_async");
        return 0;
 }
-- 
2.38.1


_______________________________________________
openwrt-devel mailing list
[email protected]
https://lists.openwrt.org/mailman/listinfo/openwrt-devel

Reply via email to