大家好
在使用窗口的过程中遇到一个问题,麻烦大家帮忙看下!
简单描述下情况:我们是从kafka获取数据,在flink做一些相关处理后sink到elasticsearch中。没有使用window的时候没有问题,可以成功完成流程。使用窗口后报错:Exception
in thread "main" org.apache.flink.table.api.ValidationException: A group
window expects a time attribute for grouping in a stream environment.
下边是我的详细流程的相关片段
----------------------------------------------------------------------------
1. 我们使用的jar包是flink-xx_2.12:1.10.0 / kafka版本为0.11
2. kafka的数据格式为{"acct":"acct1234", "evtime":1593396391819}
3. 使用descriptor的方式连接kafka,代码为:
StreamExecutionEnvironment fsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv);
fsTableEnv.connect(new Kafka()
.version("universal")
.topic("jes_topic_evtime")
.property("zookeeper.connect",
"172.xx.xx.xxx:2181")
.property("bootstrap.servers",
"172.xx.xx.xxx:9092")
.property("group.id", "grp1")
.startFromEarliest()
).withFormat(new Json()
.failOnMissingField(false).deriveSchema())
.withSchema(new
Schema().field("acct", "STRING").field("evtime",
"LONG").field("logictime","TIMESTAMP(3)").rowTime(new
Rowtime().timestampsFromField("evtime").watermarksPeriodicBounded(5000)))
.inAppendMode().createTemporaryTable("testTableName");
Table testTab = fsTableEnv.sqlQuery("SELECT acct, evtime, logictime
FROM testTableName")
.window(Tumble.over("5.seconds").on("logictime").as("w1"))
.groupBy("w1, acct")
.select("w1.rowtime, acctno");
测试发现在descriptor连接kafka时定义schema时,定义的rowtime字段和使用from的方式重命名字段好像都无法成功。测试时使用from方式重命名字段返回的值是null