Hi,all
Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交
现在有这样的场景:
消费kafka数据写入hdfs中,分区字段是 day + hour ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
有大佬知道吗,有实际验证过吗
感谢
附上简单sql:
CREATE TABLE kafka (
a STRING,
b STRING,
c BIGINT,
process_time BIGINT,
e STRING,
f STRING,
g STRING,
h INT,
i STRING
) WITH (
'connector' = 'kafka',
'topic' = 'topic',
'properties.bootstrap.servers' = 'x',
'properties.group.id' = 'test-1',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'properties.flink.partition-discovery.interval-millis' = '300000'
);
CREATE TABLE filesystem (
`day` STRING,
`hour` STRING,
a STRING,
b STRING,
c BIGINT,
d BIGINT,
e STRING,
f STRING,
g STRING,
h INT,
i STRING
) PARTITIONED BY (`day`, `hour`) WITH (
'connector' = 'filesystem',
'format' = 'parquet',
'path' = 'hdfs://xx',
'parquet.compression'='SNAPPY',
'sink.partition-commit.policy.kind' = 'success-file'
);
insert into filesystem
select
from_unixtime(process_time,'yyyy-MM-dd') as `day`,
from_unixtime(process_time,'HH') as `hour`,
a,
b,
c,
d,
e,
f,
g,
h,
i
from kafka;
[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger