Hi,
可以通过以下步骤还原车祸现场:
kafka topic: test_action
kafka message(单行发送,不要换行):
{"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"}]}
代码Problem2.java:
package com.flink;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
*
* 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
* 那么在eval方法接收到的就是Row[],
* 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
*
* 现在思路:就是在定义表的时候,把ARRYA看成STRING,
* 现在的问题,就是查询出来,都是空
*
* kafka topic: test_action
*
* kafka message:
* {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }
*/
public class Problem2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
envSettings);
bsEnv.registerFunction("explode3", new ExplodeFunction());
String ddlSource = "CREATE TABLE actionTable3 (\n" +
" action STRING\n" +
") WITH (\n" +
" 'connector.type' = 'kafka',\n" +
" 'connector.version' = '0.11',\n" +
" 'connector.topic' = 'test_action',\n" +
" 'connector.startup-mode' = 'earliest-offset',\n" +
" 'connector.properties.zookeeper.connect' =
'localhost:2181',\n" +
" 'connector.properties.bootstrap.servers' =
'localhost:9092',\n" +
" 'update-mode' = 'append',\n" +
" 'format.type' = 'json',\n" +
" 'format.derive-schema' = 'false',\n" +
" 'format.json-schema' = '{\"type\": \"object\",
\"properties\": {\"action\": {\"type\": \"string\"} } }'" +
")";
System.out.println(ddlSource);
bsEnv.sqlUpdate(ddlSource);
Table table = bsEnv.sqlQuery("select * from actionTable3");
// Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL
TABLE(explode3(`action`)) as T(`word`)");
table.printSchema();
bsEnv.toAppendStream(table, Row.class)
.print();// 输出都是空
bsEnv.execute("ARRAY tableFunction Problem");
}
}