各位大佬:
有没有遇到过这个问题,Window aggregate can only be defined over a time attribute column,
but TIMESTAMP(3) encountered.
无论是事件时间还是处理时间,都报这个错;flink和blink的planner报错差不多。
版本如下:
<flink.version>1.10.0</flink.version>
<scala.version>2.11</scala.version>
代码如下:
// Get the stream execution environment.
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//EnvironmentSettings settings =
//        EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
// Create a StreamTableEnvironment on top of the DataStream environment.
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// NOTE(review): the reported error "Window aggregate can only be defined over a
// time attribute column, but TIMESTAMP(3) encountered" is a known Flink 1.10.0
// bug: time attributes declared with .proctime()/.rowtime() through the
// connect()/Schema descriptor API are lost by the Blink planner (FLINK-16160,
// fixed in 1.10.1). Workaround on 1.10.0: declare the source table with SQL DDL
// instead, e.g.  CREATE TABLE sourceTable (..., proctime AS PROCTIME()) WITH (...),
// or upgrade to 1.10.1+.
Schema schema = new Schema()
        //.field("id", "VARCHAR").from("id")
        .field("id", "STRING")
        //.field("name", "VARCHAR")
        .field("amount", "DOUBLE")
        // Processing-time attribute; it must be appended as the last field.
        .field("proctime", Types.SQL_TIMESTAMP).proctime()
        //.field("rowtime", Types.SQL_TIMESTAMP)
        //.rowtime(
        //        new Rowtime()
        //                .timestampsFromField("eventtime")
        //                .watermarksPeriodicBounded(2000))
        ;

// Source: Kafka topic read as CSV. Supported versions: "0.8", "0.9", "0.10",
// "0.11", and "universal".
tableEnv.connect(new Kafka().version("universal")
                .topic("source0511")
                .property("zookeeper.connect",
                        "172.16.44.28:7758")
                .property("bootstrap.servers",
                        "172.16.44.28:9096")
                .property("group.id", "source0511-group")
                .startFromEarliest()
        )
        .withFormat(new Csv())
        .withSchema(schema)
        .inAppendMode()
        .createTemporaryTable("sourceTable");

// Sink: Kafka topic written as JSON.
tableEnv.connect(
                new Kafka()
                        .version("universal")
                        // "0.8", "0.9", "0.10", "0.11", and "universal"
                        .topic("sink0511")
                        .property("acks", "all")
                        .property("retries", "0")
                        .property("batch.size", "16384")
                        .property("linger.ms", "10")
                        .property("zookeeper.connect", "172.16.44.28:7758")
                        .property("bootstrap.servers", "172.16.44.28:9096")
                        .sinkPartitionerFixed())
        .inAppendMode()
        .withFormat(new Json())
        .withSchema(
                new Schema().field("totalamount", "DOUBLE")
                        //.field("total", "INT")
                        .field("time", Types.SQL_TIMESTAMP)
        )
        .createTemporaryTable("sinkTable");

// 5-second tumbling window over the processing-time attribute.
tableEnv.sqlUpdate("insert into sinkTable"
        + " select sum(amount),TUMBLE_END(proctime, INTERVAL '5' SECOND) "
        + "from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND)");

// Reference example from the Flink docs (comment was broken by mail wrapping):
//SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
//  FROM user_actions
//  GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

// With the Blink planner, sqlUpdate() only buffers the statement; the job must
// be submitted through the TableEnvironment. Calling env.execute(...) here
// fails with "No operators defined in streaming topology" because the SQL
// pipeline was never translated into the StreamExecutionEnvironment.
tableEnv.execute("test");