Re: Flink SQL Count Distinct performance optimization

2020-01-07 文章 Kurt Young
Hi,

Could you try to find out what's the bottleneck of your current job? This
would leads to
different optimizations. Such as whether it's CPU bounded, or you have too
big local
state thus stuck by too many slow IOs.

Best,
Kurt


On Wed, Jan 8, 2020 at 3:53 PM 贺小令  wrote:

> hi sunfulin,
> you can try with blink planner (since 1.9 +), which optimizes distinct
> aggregation. you can also try to enable
> *table.optimizer.distinct-agg.split.enabled* if the data is skew.
>
> best,
> godfreyhe
>
> sunfulin  于2020年1月8日周三 下午3:39写道:
>
>> Hi, community,
>> I'm using Apache Flink SQL to build some of my realtime streaming apps.
>> With one scenario I'm trying to count(distinct deviceID) over about 100GB
>> data set in realtime, and aggregate results with sink to ElasticSearch
>> index. I met a severe performance issue when running my flink job. Wanner
>> get some help from community.
>>
>>
>> Flink version : 1.8.2
>> Running on yarn with 4 yarn slots per task manager. My flink task
>> parallelism is set to be 10, which is equal to my kafka source partitions.
>> After running the job, I can observe high backpressure from the flink
>> dashboard. Any suggestions and kind of help is highly appreciated.
>>
>>
>> running sql is like the following:
>>
>>
>> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
>>
>> select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as
>> clkCnt  from
>>
>> (
>>
>> SELECT
>>
>>  aggId,
>>
>>  pageId,
>>
>>  statkey,
>>
>>  COUNT(DISTINCT deviceId) as cnt
>>
>>  FROM
>>
>>  (
>>
>>  SELECT
>>
>>  'ZL_005' as aggId,
>>
>>  'ZL_UV_PER_MINUTE' as pageId,
>>
>>  deviceId,
>>
>>  ts2Date(recvTime) as statkey
>>
>>  from
>>
>>  kafka_zl_etrack_event_stream
>>
>>  )
>>
>>  GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
>>
>> ) as t1
>>
>> group by aggId, pageId, statkey
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Best
>
>


Re: Flink SQL Count Distinct performance optimization

2020-01-07 文章 贺小令
hi sunfulin,
you can try with blink planner (since 1.9 +), which optimizes distinct
aggregation. you can also try to enable
*table.optimizer.distinct-agg.split.enabled* if the data is skew.

best,
godfreyhe

sunfulin  于2020年1月8日周三 下午3:39写道:

> Hi, community,
> I'm using Apache Flink SQL to build some of my realtime streaming apps.
> With one scenario I'm trying to count(distinct deviceID) over about 100GB
> data set in realtime, and aggregate results with sink to ElasticSearch
> index. I met a severe performance issue when running my flink job. Wanner
> get some help from community.
>
>
> Flink version : 1.8.2
> Running on yarn with 4 yarn slots per task manager. My flink task
> parallelism is set to be 10, which is equal to my kafka source partitions.
> After running the job, I can observe high backpressure from the flink
> dashboard. Any suggestions and kind of help is highly appreciated.
>
>
> running sql is like the following:
>
>
> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
>
> select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as
> clkCnt  from
>
> (
>
> SELECT
>
>  aggId,
>
>  pageId,
>
>  statkey,
>
>  COUNT(DISTINCT deviceId) as cnt
>
>  FROM
>
>  (
>
>  SELECT
>
>  'ZL_005' as aggId,
>
>  'ZL_UV_PER_MINUTE' as pageId,
>
>  deviceId,
>
>  ts2Date(recvTime) as statkey
>
>  from
>
>  kafka_zl_etrack_event_stream
>
>  )
>
>  GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
>
> ) as t1
>
> group by aggId, pageId, statkey
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> Best


Flink SQL Count Distinct performance optimization

2020-01-07 文章 sunfulin
Hi, community,
I'm using Apache Flink SQL to build some of my realtime streaming apps. With 
one scenario I'm trying to count(distinct deviceID) over about 100GB data set 
in realtime, and aggregate results with sink to ElasticSearch index. I met a 
severe performance issue when running my flink job. Wanner get some help from 
community.


Flink version : 1.8.2
Running on yarn with 4 yarn slots per task manager. My flink task parallelism 
is set to be 10, which is equal to my kafka source partitions. After running 
the job, I can observe high backpressure from the flink dashboard. Any 
suggestions and kind of help is highly appreciated. 


running sql is like the following:


INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)

select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as clkCnt  
from

(

SELECT

 aggId,

 pageId,

 statkey,

 COUNT(DISTINCT deviceId) as cnt

 FROM

 (

 SELECT

 'ZL_005' as aggId,

 'ZL_UV_PER_MINUTE' as pageId,

 deviceId,

 ts2Date(recvTime) as statkey

 from

 kafka_zl_etrack_event_stream

 )

 GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)

) as t1

group by aggId, pageId, statkey
















Best

Re:Re: FLINK 不同 StateBackend ProcessWindowFunction的差别

2020-01-07 文章 USERNAME
感谢 唐老师 解答!

在 2020-01-07 19:46:06,"Yun Tang"  写道:
>Hi
>
>使用iterator.remove() 去除state中已经计算过的数据不是一个标准做法,标准的做法应该是 clear掉相应的state [1]
>至于为什么使用MemoryStateBackend会去除数据是因为 get 返回的结果是backend中on heap直接存储的对象[2],存在修改的副作用。
>而RocksDB state backend get返回的结果是反序列化的list,而不是RocksDB自身存储的数据 [3],也就不存在修改的副作用了。
>
>
>[1] 
>https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377
>[2] 
>https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapAppendingState.java#L57
>[3] 
>https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java#L119
>
>祝好
>唐云
>
>
>From: USERNAME 
>Sent: Tuesday, January 7, 2020 17:54
>To: user-zh@flink.apache.org 
>Subject: FLINK 不同 StateBackend ProcessWindowFunction的差别
>
>各位好!
>祝大家新年快乐!
>
>
>
>
>--版本
>FLINK 1.9.1 ON YARN
>
>
>--过程
>1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口
>2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出
>3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据
>--问题
>new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉,
>使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。
>使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。
>这种计算场景有更好的计算方法吗?
>
>
>--部分代码
>final StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment();
>env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
>new ProcessWindowFunction{
>public void process(Tuple tuple, Context context, Iterable 
>elements, Collector out) throws Exception {
>for (Iterator iter = elements.iterator(); iter.hasNext(); ) {
>
>iter.remove();
>}
>}
>
>}
>
>
>
>
>
>
>


Re:Re:Re: FLINK 不同 StateBackend ProcessWindowFunction的差别

2020-01-07 文章 USERNAME
TTL 好像不支持 TimeCharacteristic.EventTime 方式



在 2020-01-08 14:17:11,"USERNAME"  写道:
>我这例子需要通过 在触发器中 TriggerResult.FIRE_AND_PURGE 来清理当前计算窗口的数据,实现增量计算,跟TTL有点区别吧。
>
>
>
>
>
>在 2020-01-07 19:51:57,"huoguo"  写道:
>>
>>
>>过期数据能通过TTL 设置过期吗?
>>
>>> 在 2020年1月7日,17:54,USERNAME  写道:
>>> 
>>> 各位好!
>>> 祝大家新年快乐!
>>> 
>>> 
>>> 
>>> 
>>> --版本
>>> FLINK 1.9.1 ON YARN
>>> 
>>> 
>>> --过程
>>> 1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口
>>> 2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出
>>> 3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据
>>> --问题
>>> new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉,
>>> 使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。
>>> 使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。
>>> 这种计算场景有更好的计算方法吗?
>>> 
>>> 
>>> --部分代码
>>> final StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> 
>>> 
>>> new ProcessWindowFunction{
>>> public void process(Tuple tuple, Context context, Iterable 
>>> elements, Collector out) throws Exception {
>>> for (Iterator iter = elements.iterator(); iter.hasNext(); ) {
>>> 
>>> iter.remove();
>>> }
>>> }
>>> 
>>> }
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>


Re: 问一下Flink Plan Visualizer什么时候会更新成1.9的样式

2020-01-07 文章 tison
请问你所说的 1.9 的样式是怎么样的呢?我记得最近有跟 visualizer 相关的讨论,但是没有这个特殊的 issue,你可以直接在 JIRA
上提 issue

Best,
tison.


 于2020年1月8日周三 下午12:56写道:

> 有大佬能解答下吗
>
> -邮件原件-
> 发件人: slle...@aliyun.com.INVALID 
> 发送时间: 2020年1月6日 11:15
> 收件人: user-zh@flink.apache.org
> 主题: 问一下Flink Plan Visualizer什么时候会更新成1.9的样式
>
> 链接地址:https://flink.apache.org/visualizer/index.html
>
>


回复: 问一下Flink Plan Visualizer什么时候会更新成1.9的样式

2020-01-07 文章 sllence
有大佬能解答下吗

-邮件原件-
发件人: slle...@aliyun.com.INVALID  
发送时间: 2020年1月6日 11:15
收件人: user-zh@flink.apache.org
主题: 问一下Flink Plan Visualizer什么时候会更新成1.9的样式

链接地址:https://flink.apache.org/visualizer/index.html



回复:jobgraph 生成

2020-01-07 文章 张江
Very sorry for the wrong operation. I copied the wrong email address by the 
phone.


Thank you for your reply.


| |
张江
|
|
邮箱:zjkingdom2...@163.com
|

签名由 网易邮箱大师 定制

在2020年01月08日 11:08,tison 写道:
Hi Zhang,


I just notice that it is sent to user list. Please send to user-zh list(in cc) 
next time if you want to discuss in Chinese.


Best,
tison.




tison  于2020年1月8日周三 上午11:06写道:

如果你是流作业的话,参考这个页面[1]搞到 JobGraph 之后可以


JsonPlanGenerator.generatePlan(jobGraph)


拿到 JobGraph 的 JSON。不过这个是非常内部的逻辑,没有任何兼容性保障。


Best,
tison.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/mesos.html#flink-job-cluster-on-mesos




张江  于2020年1月8日周三 上午11:01写道:

大家好,


通过调用env.getExecutionPlan可以以json格式得到streamGraph,放到flink 
visualizer里可以得到可视化结果。现在我想以类似方法得到 jobGraph,生成jobGraph的json文件。不知道应该怎么做?


flink里似乎没有直接的API可以调用,但是我在flink web ui界面上可以看到job经过chain后的执行图,如下图所示。这个是怎么生成的?




谢谢


| |
张江
|
|
邮箱:zjkingdom2...@163.com
|

签名由 网易邮箱大师 定制

Re: jobgraph 生成

2020-01-07 文章 tison
A public way to get JSON plan of a JobGraph is, with an existing Flink
Cluster, use REST API JarPlan[1].

Best,
tison.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#jars-jarid-plan


tison  于2020年1月8日周三 上午11:08写道:

> Hi Zhang,
>
> I just notice that it is sent to user list. Please send to user-zh list(in
> cc) next time if you want to discuss in Chinese.
>
> Best,
> tison.
>
>
> tison  于2020年1月8日周三 上午11:06写道:
>
>> 如果你是流作业的话,参考这个页面[1]搞到 JobGraph 之后可以
>>
>> JsonPlanGenerator.generatePlan(jobGraph)
>>
>> 拿到 JobGraph 的 JSON。不过这个是非常内部的逻辑,没有任何兼容性保障。
>>
>> Best,
>> tison.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/mesos.html#flink-job-cluster-on-mesos
>>
>>
>> 张江  于2020年1月8日周三 上午11:01写道:
>>
>>> 大家好,
>>>
>>> 通过调用env.getExecutionPlan可以以json格式得到streamGraph,放到flink
>>> visualizer里可以得到可视化结果。现在我想以类似方法得到 jobGraph,生成jobGraph的json文件。不知道应该怎么做?
>>>
>>> flink里似乎没有直接的API可以调用,但是我在flink web
>>> ui界面上可以看到job经过chain后的执行图,如下图所示。这个是怎么生成的?
>>>
>>>
>>> 谢谢
>>>
>>> 张江
>>> 邮箱:zjkingdom2...@163.com
>>>
>>> 
>>>
>>> 签名由 网易邮箱大师  定制
>>>
>>


Re: jobgraph 生成

2020-01-07 文章 tison
Hi Zhang,

I just notice that it is sent to user list. Please send to user-zh list(in
cc) next time if you want to discuss in Chinese.

Best,
tison.


tison  于2020年1月8日周三 上午11:06写道:

> 如果你是流作业的话,参考这个页面[1]搞到 JobGraph 之后可以
>
> JsonPlanGenerator.generatePlan(jobGraph)
>
> 拿到 JobGraph 的 JSON。不过这个是非常内部的逻辑,没有任何兼容性保障。
>
> Best,
> tison.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/mesos.html#flink-job-cluster-on-mesos
>
>
> 张江  于2020年1月8日周三 上午11:01写道:
>
>> 大家好,
>>
>> 通过调用env.getExecutionPlan可以以json格式得到streamGraph,放到flink
>> visualizer里可以得到可视化结果。现在我想以类似方法得到 jobGraph,生成jobGraph的json文件。不知道应该怎么做?
>>
>> flink里似乎没有直接的API可以调用,但是我在flink web
>> ui界面上可以看到job经过chain后的执行图,如下图所示。这个是怎么生成的?
>>
>>
>> 谢谢
>>
>> 张江
>> 邮箱:zjkingdom2...@163.com
>>
>> 
>>
>> 签名由 网易邮箱大师  定制
>>
>


Re: 注册table时catalog无法变更

2020-01-07 文章 JingsongLee
Hi xiyueha,

你可以用TableEnv.sqlUpdate("create table ...")的DDL的方式,这会注册到当前catalog中。

Best,
Jingsong Lee


--
From:Kurt Young 
Send Time:2020年1月8日(星期三) 09:17
To:user-zh 
Cc:xiyueha 
Subject:Re: 注册table时catalog无法变更

临时表的话只能放在指定的catalog中,不建议将临时表注册到另一个catalog,比如hive catalog。
临时表大部分情况下是不能序列化的,那样的话代码会报错。

Best,
Kurt


On Tue, Jan 7, 2020 at 9:20 PM 贺小令  wrote:

> hi,
>
> streamTableEnvironment.registerDataStream(tableName, dataStream, fields);
> 注册的表都是Temporary Table。
>
> 你可以通过:
> catalog = new InMemoryExternalCatalog(catalogName);
> streamTableEnvironment.registerCatalog(catalogName, catalog);
> catalog.createTable()
>
> 或者
> streamTableEnvironment.getCatalog().get().createTable()
>
> 的方式来注册表到指定的catalog
>
>
> xiyu...@163.com  于2020年1月7日周二 下午3:20写道:
>
> > hi,各位:
> >
> >
> 我在开发过程中,通过下面方式注册table时,默认使用的catalog是EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
> > streamTableEnvironment.registerDataStream(tableName, dataStream,
> >
> fields);尝试通过下面方式解决,但是注册的table仍然在EnvironmentSettings.DEFAULT_BUILTIN_CATALOG中
> >   streamTableEnvironment.registerCatalog(catalogName, new
> > InMemoryExternalCatalog(catalogName));
> > streamTableEnvironment.useCatalog(catalogName);请问,我如何将table注册到指定的catalog?
> >
> >
> > xiyu...@163.com
> >
>


Re: 注册table时catalog无法变更

2020-01-07 文章 Kurt Young
临时表的话只能放在指定的catalog中,不建议将临时表注册到另一个catalog,比如hive catalog。
临时表大部分情况下是不能序列化的,那样的话代码会报错。

Best,
Kurt


On Tue, Jan 7, 2020 at 9:20 PM 贺小令  wrote:

> hi,
>
> streamTableEnvironment.registerDataStream(tableName, dataStream, fields);
> 注册的表都是Temporary Table。
>
> 你可以通过:
> catalog = new InMemoryExternalCatalog(catalogName);
> streamTableEnvironment.registerCatalog(catalogName, catalog);
> catalog.createTable()
>
> 或者
> streamTableEnvironment.getCatalog().get().createTable()
>
> 的方式来注册表到指定的catalog
>
>
> xiyu...@163.com  于2020年1月7日周二 下午3:20写道:
>
> > hi,各位:
> >
> >
> 我在开发过程中,通过下面方式注册table时,默认使用的catalog是EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
> > streamTableEnvironment.registerDataStream(tableName, dataStream,
> >
> fields);尝试通过下面方式解决,但是注册的table仍然在EnvironmentSettings.DEFAULT_BUILTIN_CATALOG中
> >   streamTableEnvironment.registerCatalog(catalogName, new
> > InMemoryExternalCatalog(catalogName));
> > streamTableEnvironment.useCatalog(catalogName);请问,我如何将table注册到指定的catalog?
> >
> >
> > xiyu...@163.com
> >
>


Re: flink sql confluent schema avro topic注册成表

2020-01-07 文章 Bowen Li
Hi 陈帅,

这是一个非常合理的需求。我们需要开发一个 Flink ConfluentSchemaRegistryCatalog
完成元数据的获取。社区希望的用户体验是用户只需要给出confluent schema registry的链接,Flink SQL可以通过
ConfluentSchemaRegistryCatalog自动获取读写所需的信息,不再需要用户手动写DDL和format。

社区内部已经开始讨论了,我们应该会在1.11中完成,请关注
https://issues.apache.org/jira/browse/FLINK-12256


On Wed, Dec 18, 2019 at 6:46 AM 陈帅  wrote:

> 谢谢回复,有了schema registry url为何还需要填subject和avroSchemaStr呢?
>
> 朱广彬  于2019年12月18日周三 上午10:30写道:
>
> > Hi 陈帅,
> >
> > 目前社区确实不支持confluent schema registry的avro格式,我们内部也是依赖schema registry来做avro
> > schema的管理,所以,我们改动了flink-avro 的源码来支持。
> >
> > 主要涉及到这些地方:
> >
> >
> org.apache.flink.formats.avro.{AvroRowFormatFactory,AvroRowDeserializationSchema,AvroRowSerializationSchema}
> > 和org.apache.flink.table.descriptors.{Avro,AvroValidator}
> >
> > 使用时在构建Avro时指定以下三个参数即可(见标红部分):
> >
> > tableEnv.connect(
> > new Kafka()
> > .version("universal")
> > .topic(topic)
> > .properties(props)
> > ).withFormat(
> > new Avro()
> >   .useRegistry(true)
> >   .registryUrl(KAFKA_SCHEMA_REGISTRY_URL_ADDRESS)
> >   .registrySubject(subject)
> >   .avroSchema(avroSchemaStr)
> > )
> >
> >
> > 陈帅  于2019年12月18日周三 上午8:26写道:
> > >
> > > flink sql是否能够支持将confluent schema registry注册的一个avro数据格式
> 的topic注册成一张table?
> >
>


Re: 注册table时catalog无法变更

2020-01-07 文章 贺小令
hi,

streamTableEnvironment.registerDataStream(tableName, dataStream, fields);
注册的表都是Temporary Table。

你可以通过:
catalog = new InMemoryExternalCatalog(catalogName);
streamTableEnvironment.registerCatalog(catalogName, catalog);
catalog.createTable()

或者
streamTableEnvironment.getCatalog().get().createTable()

的方式来注册表到指定的catalog


xiyu...@163.com  于2020年1月7日周二 下午3:20写道:

> hi,各位:
>
> 我在开发过程中,通过下面方式注册table时,默认使用的catalog是EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
> streamTableEnvironment.registerDataStream(tableName, dataStream,
> fields);尝试通过下面方式解决,但是注册的table仍然在EnvironmentSettings.DEFAULT_BUILTIN_CATALOG中
>   streamTableEnvironment.registerCatalog(catalogName, new
> InMemoryExternalCatalog(catalogName));
> streamTableEnvironment.useCatalog(catalogName);请问,我如何将table注册到指定的catalog?
>
>
> xiyu...@163.com
>


Re: FLINK 不同 StateBackend ProcessWindowFunction的差别

2020-01-07 文章 huoguo



过期数据能通过TTL 设置过期吗?

> 在 2020年1月7日,17:54,USERNAME  写道:
> 
> 各位好!
> 祝大家新年快乐!
> 
> 
> 
> 
> --版本
> FLINK 1.9.1 ON YARN
> 
> 
> --过程
> 1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口
> 2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出
> 3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据
> --问题
> new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉,
> 使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。
> 使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。
> 这种计算场景有更好的计算方法吗?
> 
> 
> --部分代码
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> 
> 
> new ProcessWindowFunction{
> public void process(Tuple tuple, Context context, Iterable 
> elements, Collector out) throws Exception {
> for (Iterator iter = elements.iterator(); iter.hasNext(); ) {
> 
> iter.remove();
> }
> }
> 
> }
> 
> 
> 
> 
> 
> 
> 




Re: FLINK 不同 StateBackend ProcessWindowFunction的差别

2020-01-07 文章 Yun Tang
Hi

使用iterator.remove() 去除state中已经计算过的数据不是一个标准做法,标准的做法应该是 clear掉相应的state [1]
至于为什么使用MemoryStateBackend会去除数据是因为 get 返回的结果是backend中on heap直接存储的对象[2],存在修改的副作用。
而RocksDB state backend get返回的结果是反序列化的list,而不是RocksDB自身存储的数据 [3],也就不存在修改的副作用了。


[1] 
https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377
[2] 
https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapAppendingState.java#L57
[3] 
https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java#L119

祝好
唐云


From: USERNAME 
Sent: Tuesday, January 7, 2020 17:54
To: user-zh@flink.apache.org 
Subject: FLINK 不同 StateBackend ProcessWindowFunction的差别

各位好!
祝大家新年快乐!




--版本
FLINK 1.9.1 ON YARN


--过程
1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口
2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出
3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据
--问题
new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉,
使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。
使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。
这种计算场景有更好的计算方法吗?


--部分代码
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


new ProcessWindowFunction{
public void process(Tuple tuple, Context context, Iterable 
elements, Collector out) throws Exception {
for (Iterator iter = elements.iterator(); iter.hasNext(); ) {

iter.remove();
}
}

}









FLINK 不同 StateBackend ProcessWindowFunction的差别

2020-01-07 文章 USERNAME
各位好!
祝大家新年快乐!




--版本
FLINK 1.9.1 ON YARN


--过程
1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口
2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出
3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据
--问题
new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉,
使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。
使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。
这种计算场景有更好的计算方法吗?


--部分代码
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


new ProcessWindowFunction{
public void process(Tuple tuple, Context context, Iterable 
elements, Collector out) throws Exception {
for (Iterator iter = elements.iterator(); iter.hasNext(); ) {

iter.remove();
}
}

}