你好,这个问题已经解决了。 我现在通过官方例子:
SET table.sql-dialect=default; create table flink_kafka( sys_time bigint, rt AS TO_TIMESTAMP(FROM_UNIXTIME(sys_time / 1000, 'yyyy-MM-dd HH:mm:ss')), WATERMARK FOR rt AS rt - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '', 'properties.group.id' = 'test-sql', 'format' = 'json', 'json.ignore-parse-errors' = 'true' ); SET table.sql-dialect=hive; CREATE TABLE hive_table ( sys_time bigint ) PARTITIONED BY (dt STRING, hr STRING) STORED AS orc TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='process-time', 'sink.partition-commit.delay'='0s', 'sink.partition-commit.policy.kind'='metastore,success-file' ); INSERT INTO hive_table SELECT sys_time, DATE_FORMAT(rt, 'yyyy-MM-dd') as dt, DATE_FORMAT(rt, 'HH') as hr FROM flink_kafka; 发现数据一直无法写入hive。程序没有报错, select * from flink_kafka;是有数据的。 但是hive_table一直没有数据, 我发送各个时间段的数据,watermark应该也是超过了分区时间的,但是hive_table一直没有数据 -- Sent from: http://apache-flink.147419.n8.nabble.com/