TIMESTAMP(3)??????????????
------------------ ???????? ------------------ ??????: "PCL"<[email protected]>; ????????: 2020??5??12??(??????) ????9:43 ??????: "user-zh"<[email protected]>; ????: flink10????kafka???? ?????????? ??????????????????????Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered. ????????????????????????????????????????flink??blink??planner???????????? ?????????? <flink.version>1.10.0</flink.version> <scala.version>2.11</scala.version> ?????????? //???????????? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); //????????tableEnvironment StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); Schema schema = new Schema() //.field("id", "VARCHAR").from("id") .field("id", "STRING") //.field("name", "VARCHAR") .field("amount", "DOUBLE") .field("proctime", Types.SQL_TIMESTAMP).proctime() //.field("rowtime", Types.SQL_TIMESTAMP) //.rowtime( // new Rowtime() // .timestampsFromField( // "eventtime") // .watermarksPeriodicBounded(2000)) ; // "0.8", "0.9", "0.10", "0.11", and "universal" tableEnv.connect(new Kafka().version("universal") .topic("source0511") .property("zookeeper.connect", "172.16.44.28:7758") .property("bootstrap.servers", "172.16.44.28:9096") .property("group.id", "source0511-group") .startFromEarliest() ) .withFormat(new Csv()) .withSchema(schema) .inAppendMode() .createTemporaryTable("sourceTable"); tableEnv.connect( new Kafka() .version("universal") // "0.8", "0.9", "0.10", "0.11", and "universal" .topic("sink0511") .property("acks", "all") .property("retries", "0") .property("batch.size", "16384") .property("linger.ms", "10") .property("zookeeper.connect", "172.16.44.28:7758") .property("bootstrap.servers", "172.16.44.28:9096") .sinkPartitionerFixed()) .inAppendMode() .withFormat(new Json()) .withSchema( new Schema().field("totalamount", "DOUBLE") //.field("total", "INT") .field("time", Types.SQL_TIMESTAMP) ) .createTemporaryTable("sinkTable"); tableEnv.sqlUpdate("insert into sinkTable" + " select sum(amount),TUMBLE_END(proctime, INTERVAL '5' SECOND) " + "from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND)"); //SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name) // FROM user_actions // GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); env.execute("test");
