Flink版本:1.10.2

使用RichAsyncFunction 异步IO 操作,结果DataStream 不能注册为table。

本地测试的结果是一直重复输出数据。

请问一下DataStream 处理之后,怎么才能注册为 Table。

-------------------------------
代码如下:

// 异步redis处理
RedisAsyncFunction asyncFunction = new RedisAsyncFunction(node,
aggProcessorArgs);

// 获取异步处理流
DataStream<Row> result = AsyncDataStream.orderedWait(
        dataStream,
        asyncFunction,
        60L,
        TimeUnit.SECONDS,
        100).returns(outRowTypeInfo);

// 注册为临时 table
tabEnv.createTemporaryView("test_table", result,
outRowFields.stream().collect(Collectors.joining(",")));

//
result.print("out_table>>");

Table test_table = tabEnv.sqlQuery("select * from test_table");
// 查询临时table
tabEnv.toAppendStream(test_table, Row.class).print("test_table");



-- 
**************************************
 tili
**************************************

回复