Hi, 当前还不支持 Row[] 作为参数。目前有一个 issue 正在解决这个问题,可以关注一下:https://issues.apache.org/jira/browse/FLINK-17855
Best, Jark On Mon, 6 Jul 2020 at 10:19, Jim Chen <chenshuai19950...@gmail.com> wrote: > 大家好: > 我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。 > 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, > 那么在eval方法接收到的就是Row[], > 问题出在,Row[]中的数据获取不到,里面的元素都是NULL > > 通过下面的步骤和代码可还原车祸场景: > kafka topic: test_action > kafka message: > {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": > "id002", "actionName": "bbb"} ] } > > 代码1:Problem.java > package com.flink; > > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.java.StreamTableEnvironment; > import org.apache.flink.types.Row; > > /** > * > * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, > * 那么在eval方法接收到的就是Row[], > * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL > * > * kafka topic: test_action > * > * kafka message: > * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": > "id002", "actionName": "bbb"} ] } > */ > public class Problem { > > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings envSettings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, > envSettings); > bsEnv.registerFunction("explode2", new ExplodeFunction()); > > String ddlSource = "CREATE TABLE actionTable (\n" + > " action ARRAY<\n" + > " ROW<" + > " actionID STRING,\n" + > " actionName STRING\n" + > " >\n" + > " >\n" + > ") WITH (\n" + > " 'connector.type' = 'kafka',\n" + > " 'connector.version' = '0.11',\n" + > " 'connector.topic' = 'test_action',\n" + > " 'connector.startup-mode' = 'earliest-offset',\n" + > " 'connector.properties.zookeeper.connect' = > 'localhost:2181',\n" + > " 'connector.properties.bootstrap.servers' = > 'localhost:9092',\n" + > " 'update-mode' = 
'append',\n" + > " 'format.type' = 'json'\n" + > ")"; > bsEnv.sqlUpdate(ddlSource); > > // Table table = bsEnv.sqlQuery("select `action` from actionTable"); > Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL > TABLE(explode2(`action`)) as T(`word`)"); > table.printSchema(); > bsEnv.toAppendStream(table, Row.class) > .print("==tb=="); > > > bsEnv.execute("ARRAY tableFunction Problem"); > } > } > > 代码2:ExplodeFunction.java > package com.flink; > > import org.apache.flink.table.functions.TableFunction; > import org.apache.flink.types.Row; > > import java.util.ArrayList; > import java.util.Arrays; > > public class ExplodeFunction extends TableFunction<Row> { > > public void eval(Row[] values) { > System.out.println(values.length); > if (values.length > 0) { > for (Row row : values) { > if (row != null) {// 这里debug出来的row总是空 > ArrayList<Object> list = new ArrayList<>(); > for (int i = 0; i < row.getArity(); i++) { > Object field = row.getField(i); > list.add(field); > } > > collector.collect(Row.of(Arrays.toString(list.toArray()))); > } > } > } > } > } > > 最后贴个debug的图 > [image: image.png] >