Thanks for the reply!
The strange thing is that running it as a sqlQuery works fine:
/*Table tb1 = tableEnv.sqlQuery("select sum(amount), TUMBLE_END(proctime, INTERVAL '5' SECOND)" +
        " from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND) ");
tb1.printSchema();*/
With the comment removed, the printed schema is
root
|-- EXPR$0: DOUBLE
|-- EXPR$1: TIMESTAMP(3)
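For reference, a minimal sketch of how the time attribute could be double-checked (assuming the tableEnv and "sourceTable" registered in the quoted code below; the expected output is only my assumption of what a processing-time attribute normally looks like):

// Sketch only, assuming the tableEnv / "sourceTable" from the quoted code below.
// If proctime were registered as a real processing-time attribute, printSchema()
// should mark it as a time attribute (e.g. "proctime: TIMESTAMP(3) *PROCTIME*");
// a plain TIMESTAMP(3) here would match the window aggregate error.
Table source = tableEnv.from("sourceTable");
source.printSchema();

If proctime only shows up as a plain TIMESTAMP(3), the TUMBLE query has no time attribute to window on, which would explain the error.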
On 2020-05-12 22:36:17, "忝忝向仧" <[email protected]> wrote:
>Isn't the TIMESTAMP(3) time format the problem?
>
>
>------------------ Original Message ------------------
>From: "PCL"<[email protected]>;
>Sent: Tuesday, May 12, 2020, 9:43 PM
>To: "user-zh"<[email protected]>;
>
>Subject: flink10 error when reading from Kafka
>
>
>
>Hi all,
>    Has anyone run into this problem: Window aggregate can only be defined over a time attribute
>column, but TIMESTAMP(3) encountered.
>The error occurs with both event time and processing time, and the flink and blink planners report roughly the same error.
>Versions:
><flink.version>1.10.0</flink.version>
><scala.version>2.11</scala.version>
>Code:
>// get the execution environment
>StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>//EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
>// create a TableEnvironment
>StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
>Schema schema = new Schema()
>        //.field("id", "VARCHAR").from("id")
>        .field("id", "STRING")
>        //.field("name", "VARCHAR")
>        .field("amount", "DOUBLE")
>        .field("proctime", Types.SQL_TIMESTAMP).proctime()
>        //.field("rowtime", Types.SQL_TIMESTAMP)
>        //    .rowtime(new Rowtime()
>        //        .timestampsFromField("eventtime")
>        //        .watermarksPeriodicBounded(2000))
>        ;
>
>// "0.8", "0.9", "0.10", "0.11", and "universal"
>tableEnv.connect(new Kafka().version("universal")
>
> .topic("source0511")
>
> .property("zookeeper.connect", "172.16.44.28:7758")
>
> .property("bootstrap.servers", "172.16.44.28:9096")
>
> .property("group.id", "source0511-group")
>
> .startFromEarliest()
>
> )
>
> .withFormat(new Csv())
>
> .withSchema(schema)
>
> .inAppendMode()
>
> .createTemporaryTable("sourceTable");
>
>tableEnv.connect(new Kafka()
>        .version("universal") // "0.8", "0.9", "0.10", "0.11", and "universal"
>        .topic("sink0511")
>        .property("acks", "all")
>        .property("retries", "0")
>        .property("batch.size", "16384")
>        .property("linger.ms", "10")
>        .property("zookeeper.connect", "172.16.44.28:7758")
>        .property("bootstrap.servers", "172.16.44.28:9096")
>        .sinkPartitionerFixed())
>    .inAppendMode()
>    .withFormat(new Json())
>    .withSchema(new Schema()
>        .field("totalamount", "DOUBLE")
>        //.field("total", "INT")
>        .field("time", Types.SQL_TIMESTAMP))
>    .createTemporaryTable("sinkTable");
>
>tableEnv.sqlUpdate("insert into sinkTable"
>        + " select sum(amount),TUMBLE_END(proctime, INTERVAL '5' SECOND) "
>        + "from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND)");
>//SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
>//    FROM user_actions
>//    GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
>env.execute("test");