你好,这个问题已经解决了。
我现在通过官方例子:

SET table.sql-dialect=default;

create table  flink_kafka(
sys_time bigint,
rt  AS TO_TIMESTAMP(FROM_UNIXTIME(sys_time / 1000, 'yyyy-MM-dd HH:mm:ss')),
WATERMARK FOR rt AS rt - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'xx',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = '',
    'properties.group.id' = 'test-sql',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true'
);

SET table.sql-dialect=hive;

CREATE TABLE hive_table (
  sys_time bigint
) PARTITIONED BY (dt STRING, hr STRING) STORED AS orc TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='process-time',
  'sink.partition-commit.delay'='0s',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);


INSERT INTO hive_table SELECT sys_time, DATE_FORMAT(rt, 'yyyy-MM-dd') as dt,
DATE_FORMAT(rt, 'HH') as hr  FROM flink_kafka;

发现数据一直无法写入hive。程序没有报错,
select * from flink_kafka;是有数据的。
但是hive_table一直没有数据,
我发送各个时间段的数据,watermark应该也是超过了分区时间的,但是hive_table一直没有数据




--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复