Re: A question about the filesystem connector

2020-11-12 Post by Jingsong Li
To make partitions queryable as early as possible, just set the delay to 0 (and leave the other options at their defaults).
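A minimal sketch of what that could look like on the filesystem table from the original question further down the thread; everything is copied from that DDL except the explicitly added delay option, and 'hdfs://xx' is a placeholder path from the original mail:

CREATE TABLE filesystem (
  `day` STRING,
  `hour` STRING,
  a STRING,
  b STRING,
  c BIGINT,
  d BIGINT,
  e STRING,
  f STRING,
  g STRING,
  h INT,
  i STRING
) PARTITIONED BY (`day`, `hour`) WITH (
  'connector' = 'filesystem',
  'format' = 'parquet',
  'path' = 'hdfs://xx',
  'parquet.compression' = 'SNAPPY',
  'sink.partition-commit.delay' = '0 s',               -- commit partitions as early as possible
  'sink.partition-commit.policy.kind' = 'success-file'
);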


-- 
Best, Jingsong Lee


Re: A question about the filesystem connector

2020-11-12 Post by admin
Hi Jingsong,
So with partition-time, even if data arrives very late, the partition can still be re-committed and no data is lost, right?
In that case, for hourly partitions, what is the best configuration to make a partition queryable as early as possible? Something like:
sink.partition-commit.trigger = partition-time
sink.partition-commit.delay = 10 min
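For the partition-time trigger to fire, the source table needs an event-time attribute with a watermark (see the option description quoted further down the thread), and the sink would additionally need 'sink.partition-commit.trigger' = 'partition-time', the chosen 'sink.partition-commit.delay', and a partition time extractor such as 'partition.time-extractor.timestamp-pattern' = '$day $hour:00:00'. A rough sketch of the source side only, assuming process_time in the original DDL is an epoch-seconds value and that 5 seconds of out-of-orderness is acceptable (both are assumptions on my part):

CREATE TABLE kafka (
  a STRING,
  b STRING,
  c BIGINT,
  process_time BIGINT,
  e STRING,
  f STRING,
  g STRING,
  h INT,
  i STRING,
  ts AS TO_TIMESTAMP(FROM_UNIXTIME(process_time)),  -- event-time column derived from the epoch-seconds field
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND       -- allow 5 s of out-of-orderness
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic',
  'properties.bootstrap.servers' = 'x',
  'properties.group.id' = 'test-1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'properties.flink.partition-discovery.interval-millis' = '30'
);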




Re: A question about the filesystem connector

2020-11-11 Post by Jingsong Li
Hi admin,

No, the data is not dropped; the partition is simply committed again (which is why partition commits are now all idempotent operations).
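(With the 'success-file' commit policy used in the SQL further down the thread, I assume a re-commit just re-writes the success marker file in the partition directory, so committing the same partition more than once is harmless.)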


-- 
Best, Jingsong Lee


Re: A question about the filesystem connector

2020-11-11 Post by admin
To add why I am not using the partition-time trigger: partition time is based on the watermark, so when data is severely delayed I assume it gets dropped, and that is not acceptable in our case.




Re: A question about the filesystem connector

2020-11-11 Post by admin
sink.partition-commit.trigger
    Default: process-time    Type: String
    Trigger type for partition commit. 'process-time': based on the time of the machine; it requires neither partition time extraction nor watermark generation. Commit a partition once the 'current system time' passes the 'partition creation system time' plus the 'delay'. 'partition-time': based on the time extracted from partition values; it requires watermark generation. Commit a partition once the 'watermark' passes the 'time extracted from partition values' plus the 'delay'.

sink.partition-commit.delay
    Default: 0 s    Type: Duration
    The partition will not commit until the delay time. If it is a daily partition, this should be '1 d'; if it is an hourly partition, it should be '1 h'.
Neither of these parameters is set in my job, so both are at their defaults.
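To make that concrete: with these defaults (process-time trigger, 0 s delay), the day=.../hour=17 directory is committed once the machine's clock passes the time the directory was created, independent of any watermark; a 17:00 record that only arrives at 19:00 is still written into that directory and, per Jingsong's reply above, simply causes the partition to be committed again.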




Re: A question about the filesystem connector

2020-11-11 Post by admin
Hi kandy,
I am not committing partitions based on partition time; I am using the default process time, so a partition can be committed multiple times. I understand that out-of-order data within the current partition can still be committed, but can data that is delayed for a long time (like the 17:00-data-arriving-at-19:00 example from my original question) still be committed to its corresponding partition?




Re: A question about the filesystem connector

2020-11-11 Post by kandy.wang
Hi,
As I understand it, with partition-time commits a partition commit is triggered when current watermark > partition time + commit delay, so it depends on how long your sink.partition-commit.delay is set to; data that arrives after that point should, by default, be dropped, I think.


https://cloud.tencent.com/developer/article/1707182

You can take a look at this link.
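As a concrete instance of that condition, assuming hourly partitions and the '1 h' delay the documentation recommends for them (see the option descriptions quoted earlier in the thread): the partition day=2020-11-12/hour=17 has an extracted partition time of 2020-11-12 17:00:00, so its commit is triggered once the watermark passes 18:00.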









A question about the filesystem connector

2020-11-11 Post by admin
Hi all,
With the Flink 1.11 filesystem connector, the partition trigger[1] options are all at their default values, so a partition can be committed multiple times.
I have the following scenario:
Data is consumed from Kafka and written to HDFS, with partition columns day + hour extracted from the event time. If data is late, say it is now 19:00 and a record for 17:00 arrives,
can that record still be written correctly into the 17:00 partition? Does it go into the 19:00 partition instead? Or is it dropped?
Does anyone know? Has anyone actually verified this?
Thanks.

The simplified SQL is attached:
CREATE TABLE kafka (
  a STRING,
  b STRING,
  c BIGINT,
  process_time BIGINT,
  e STRING,
  f STRING,
  g STRING,
  h INT,
  i STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic',
  'properties.bootstrap.servers' = 'x',
  'properties.group.id' = 'test-1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'properties.flink.partition-discovery.interval-millis' = '30'
);

CREATE TABLE filesystem (
  `day` STRING,
  `hour` STRING,
  a STRING,
  b STRING,
  c BIGINT,
  d BIGINT,
  e STRING,
  f STRING,
  g STRING,
  h INT,
  i STRING
) PARTITIONED BY (`day`, `hour`) WITH (
  'connector' = 'filesystem',
  'format' = 'parquet',
  'path' = 'hdfs://xx',
  'parquet.compression' = 'SNAPPY',
  'sink.partition-commit.policy.kind' = 'success-file'
);

insert into filesystem
select
  from_unixtime(process_time, 'yyyy-MM-dd') as `day`,
  from_unixtime(process_time, 'HH') as `hour`,
  a,
  b,
  c,
  d,
  e,
  f,
  g,
  h,
  i
from kafka;



[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger