Hi, the earlier problem has been solved. I am now going through the official example:
SET table.sql-dialect=default;
CREATE TABLE flink_kafka (
  sys_time BIGINT,
  rt AS TO_TIMESTAMP(FROM_UNIXTIME(sys_time / 1000, 'yyyy-MM-dd HH:mm:ss')),
  WATERMARK FOR rt AS rt - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'xx',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = 'test-sql',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true'
);
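As a sanity check on the computed column, this is how a sample epoch-millis value resolves (the literal is made up, and FROM_UNIXTIME formats in the session time zone, so the exact result can differ per setup):

SELECT TO_TIMESTAMP(FROM_UNIXTIME(1600000000123 / 1000, 'yyyy-MM-dd HH:mm:ss')) AS rt;
-- 1600000000123 / 1000 = 1600000000 (integer division), i.e. 2020-09-13 12:26:40 in UTC;
-- rt and the watermark are derived from this value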
SET table.sql-dialect=hive;
CREATE TABLE hive_table (
  sys_time BIGINT
) PARTITIONED BY (dt STRING, hr STRING) STORED AS ORC TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.delay' = '0s',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
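To make the extractor pattern concrete (the timestamp literal below is only an example): a row whose rt is 2021-01-05 10:26:40 gets dt='2021-01-05' and hr='10', which the pattern '$dt $hr:00:00' reassembles into 2021-01-05 10:00:00 as the partition time:

SELECT
  DATE_FORMAT(TIMESTAMP '2021-01-05 10:26:40', 'yyyy-MM-dd') AS dt, -- '2021-01-05'
  DATE_FORMAT(TIMESTAMP '2021-01-05 10:26:40', 'HH') AS hr;         -- '10'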
INSERT INTO hive_table
SELECT
  sys_time,
  DATE_FORMAT(rt, 'yyyy-MM-dd') AS dt,
  DATE_FORMAT(rt, 'HH') AS hr
FROM flink_kafka;
I find that the data is never written to Hive. The job reports no errors, and
select * from flink_kafka;
does return rows. But hive_table stays empty: I have sent data from several different time ranges, so the watermark should already be past the partition times, yet hive_table still gets no data.
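One narrower check on the rowtime itself, in case it helps (my assumption: if TO_TIMESTAMP cannot parse the formatted string, rt comes back NULL and no watermark is generated for that row):

SELECT sys_time, rt FROM flink_kafka;
-- rt should be a non-NULL TIMESTAMP(3); NULLs here would explain a stalled watermark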