From: Maciej Krüger <[email protected]> This extends the conn:call function to take a function as it's last parameter, which will make the library use ubus_invoke_async.
This allows streaming the logs from ubus, among other things. An example has been provided Signed-off-by: Maciej Krüger <[email protected]> --- lua/stream_logs.lua | 29 ++++++++ lua/ubus.c | 168 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 195 insertions(+), 2 deletions(-) create mode 100644 lua/stream_logs.lua diff --git a/lua/stream_logs.lua b/lua/stream_logs.lua new file mode 100644 index 0000000..5490b02 --- /dev/null +++ b/lua/stream_logs.lua @@ -0,0 +1,29 @@ +-- Load modules +require "ubus" +require "uloop" + +uloop.init() + +-- Establish connection +local conn = ubus.connect() +if not conn then + error("Failed to connect to ubusd") +end + +local function handleLog (log) + for k, v in pairs(log) do + print(k , v) + end +end + +-- Stream logs +local ret = conn:call("log", "read", { stream = true, oneshot = false, lines = 0 }, function (log, control) + if control then + print('Control event', control.type) + else + handleLog(log) + end +end) + +uloop.run() + diff --git a/lua/ubus.c b/lua/ubus.c index 07b816d..51643df 100644 --- a/lua/ubus.c +++ b/lua/ubus.c @@ -19,6 +19,7 @@ #include <libubox/blobmsg_json.h> #include <lauxlib.h> #include <lua.h> +#include <libubox/ustream.h> #define MODNAME "ubus" #define METANAME MODNAME ".meta" @@ -42,6 +43,12 @@ struct ubus_lua_event { int r; }; +struct ubus_lua_request { + struct ubus_request r; + struct ustream_fd fd; + int fnc; +}; + struct ubus_lua_subscriber { struct ubus_subscriber s; int rnotify; @@ -660,6 +667,134 @@ ubus_lua_call_cb(struct ubus_request *req, int type, struct blob_attr *msg) ubus_lua_parse_blob_array(L, blob_data(msg), blob_len(msg), true); } +static void +ubus_lua_async_complete_cb(struct ubus_request *req, int ret) +{ + struct ubus_lua_request *lureq = container_of(req, struct ubus_lua_request, r); + + lua_getglobal(state, "__ubus_cb_async"); + lua_rawgeti(state, -1, lureq->fnc); + lua_remove(state, -2); + + if (lua_isfunction(state, -1)) { + lua_pushnil(state); + + lua_newtable(state); + + lua_pushstring(state, "type"); + lua_pushstring(state, "connected"); + lua_settable(state, -3); + + lua_pushstring(state, "return"); + lua_pushnumber(state, ret); + lua_settable(state, -3); + + lua_call(state, 2, 0); + } else { + lua_pop(state, 1); + } +} + +static void +ubus_lua_async_cb(struct ustream *s, struct blob_attr *msg) +{ + struct ubus_lua_request *lureq = container_of(s, struct ubus_lua_request, fd.stream); + + lua_getglobal(state, "__ubus_cb_async"); + lua_rawgeti(state, -1, lureq->fnc); + lua_remove(state, -2); + + if (lua_isfunction(state, -1)) { + if( msg ){ + ubus_lua_parse_blob_array(state, blob_data(msg), blob_len(msg), true); + } else { + lua_pushnil(state); + } + lua_call(state, 1, 0); + } else { + lua_pop(state, 1); + } +} + +static void +ubus_lua_async_data_cb(struct ustream *s, int bytes) +{ + while (true) { + struct blob_attr *a; + int len, cur_len; + + a = (void*) ustream_get_read_buf(s, &len); + if (len < (int)sizeof(*a)) + break; + + cur_len = blob_len(a) + sizeof(*a); + if (len < cur_len) + break; + + ubus_lua_async_cb(s, a); + ustream_consume(s, cur_len); + } +} + +static void +ubus_lua_async_state_cb(struct ustream *s) +{ + struct ubus_lua_request *lureq = container_of(s, struct ubus_lua_request, fd.stream); + + lua_getglobal(state, "__ubus_cb_async"); + lua_rawgeti(state, -1, lureq->fnc); + lua_remove(state, -2); + + if (lua_isfunction(state, -1)) { + lua_pushnil(state); + + lua_newtable(state); + + lua_pushstring(state, "type"); + lua_pushstring(state, "closed"); + lua_settable(state, -3); + + lua_call(state, 2, 0); + } else { + lua_pop(state, 1); + } +} + +static void +ubus_lua_async_fd_cb(struct ubus_request *req, int fd) +{ + struct ubus_lua_request *lureq = container_of(req, struct ubus_lua_request, r); + + lureq->fd.stream.notify_read = ubus_lua_async_data_cb; + lureq->fd.stream.notify_state = ubus_lua_async_state_cb; + ustream_fd_init(&lureq->fd, fd); +} + +static int +ubus_lua_register_async( struct ubus_lua_request ** retlureq, struct ubus_context *ctx, lua_State *L, + int fnc ) +{ + struct ubus_lua_request *lureq; + + lureq = calloc( 1, sizeof( struct ubus_lua_request ) ); + if( !lureq ){ + lua_pushstring( L, "Out of memory" ); + return lua_error(L); + } + + lua_getglobal(L, "__ubus_cb_async"); + lua_pushvalue(L, fnc); + lureq->fnc = luaL_ref(L, -2); + lua_pop(L, 1); + + // remove the fnc + lua_pop(L, 1); + + *retlureq = lureq; + + return 0; +} + static int ubus_lua_call(lua_State *L) { @@ -669,6 +804,20 @@ ubus_lua_call(lua_State *L) const char *path = luaL_checkstring(L, 2); const char *func = luaL_checkstring(L, 3); + bool isAsync = lua_isfunction(L, 5); + struct ubus_lua_request * req = NULL; + + if (isAsync) { + int ret = ubus_lua_register_async(&req, c->ctx, L, lua_gettop(L)); + if (ret) { + return ret; + } + if (!req) { + lua_pushstring(L, "Failed to register async callback"); + return lua_error( L ); + } + } + luaL_checktype(L, 4, LUA_TTABLE); blob_buf_init(&c->buf, 0); @@ -689,7 +838,14 @@ ubus_lua_call(lua_State *L) } top = lua_gettop(L); - rv = ubus_invoke(c->ctx, id, func, c->buf.head, ubus_lua_call_cb, L, c->timeout * 1000); + + if (isAsync) { + rv = ubus_invoke_async(c->ctx, id, func, c->buf.head, &req->r); + req->r.fd_cb = ubus_lua_async_fd_cb; + req->r.complete_cb = ubus_lua_async_complete_cb; + } else { + rv = ubus_invoke(c->ctx, id, func, c->buf.head, ubus_lua_call_cb, L, c->timeout * 1000); + } if (rv != UBUS_STATUS_OK) { @@ -699,6 +855,10 @@ ubus_lua_call(lua_State *L) return 2; } + if (isAsync) { + ubus_complete_request_async(c->ctx, &req->r); + } + return lua_gettop(L) - top; } @@ -731,7 +891,7 @@ ubus_lua_load_event(lua_State *L) event->e.cb = ubus_event_handler; - /* update the he callback lookup table */ + /* update the callback lookup table */ lua_getglobal(L, "__ubus_cb_event"); lua_pushvalue(L, -2); event->r = luaL_ref(L, -2); @@ -1021,5 +1181,9 @@ luaopen_ubus(lua_State *L) /* create the publisher table - notifications of new subs */ lua_createtable(L, 1, 0); lua_setglobal(L, "__ubus_cb_publisher"); + + /* create the async table - callbacks for invoke_async */ + lua_createtable(L, 1, 0); + lua_setglobal(L, "__ubus_cb_async"); return 0; } -- 2.38.1 _______________________________________________ openwrt-devel mailing list [email protected] https://lists.openwrt.org/mailman/listinfo/openwrt-devel
