[ANNOUNCE] Apache Flink 1.17.2 released

2023-11-28 Thread Yun Tang
The Apache Flink community is very happy to announce the release of Apache 
Flink 1.17.2, which is the second bugfix release for the Apache Flink 1.17 
series.

Apache Flink® Is a framework and distributed processing engine for stateful 
computations over unbounded and bounded data streams. Flink has been designed 
to run in all common cluster environments, perform computations at in-memory 
speed and at any scale.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for 
this bugfix release:
https://flink.apache.org/2023/11/29/apache-flink-1.17.2-release-announcement/

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353260

We would like to thank all contributors of the Apache Flink community who made 
this release possible!


Feel free to reach out to the release managers (or respond to this thread) with 
feedback on the release process. Our goal is to constantly improve the release 
process. Feedback on what could be improved or things that didn't go so well 
are appreciated.


Regards,
Release Manager


Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

2023-03-27 Thread Yun Tang
Congratulations!
Unlike other data-lakes, Paimon might be the first one to act as a stream-first 
(not batch-first) data-lake.

Best
Yun Tang

From: Xianxun Ye 
Sent: Tuesday, March 28, 2023 10:52
To: d...@flink.apache.org 
Cc: Yu Li ; user ; user-zh 
; d...@flink.apache.org 
Subject: Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache 
Paimon(incubating)

Congratulations!

Best regards,

Xianxun

On 03/27/2023 22:51,Samrat 
Deb<mailto:decordea...@gmail.com> wrote:
congratulations

Bests,
Samrat

On Mon, Mar 27, 2023 at 7:19 PM Yanfei Lei  wrote:

Congratulations!

Best Regards,
Yanfei

ramkrishna vasudevan  于2023年3月27日周一 21:46写道:

Congratulations !!!

On Mon, Mar 27, 2023 at 2:54 PM Yu Li  wrote:

Dear Flinkers,


As you may have noticed, we are pleased to announce that Flink Table
Store has joined the Apache Incubator as a separate project called Apache
Paimon(incubating) [1] [2] [3]. The new project still aims at building a
streaming data lake platform for high-speed data ingestion, change data
tracking and efficient real-time analytics, with the vision of supporting a
larger ecosystem and establishing a vibrant and neutral open source
community.


We would like to thank everyone for their great support and efforts for
the Flink Table Store project, and warmly welcome everyone to join the
development and activities of the new project. Apache Flink will continue
to be one of the first-class citizens supported by Paimon, and we believe
that the Flink and Paimon communities will maintain close cooperation.


亲爱的Flinkers,


正如您可能已经注意到的,我们很高兴地宣布,Flink Table Store 已经正式加入 Apache 孵化器独立孵化 [1] [2]
[3]。新项目的名字是 Apache
Paimon(incubating),仍致力于打造一个支持高速数据摄入、流式数据订阅和高效实时分析的新一代流式湖仓平台。此外,新项目将支持更加丰富的生态,并建立一个充满活力和中立的开源社区。


在这里我们要感谢大家对 Flink Table Store 项目的大力支持和投入,并热烈欢迎大家加入新项目的开发和社区活动。Apache
Flink 将继续作为 Paimon 支持的主力计算引擎之一,我们也相信 Flink 和 Paimon 社区将继续保持密切合作。


Best Regards,

Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)


致礼,

李钰(谨代表 Apache Flink PMC 和 Apache Paimon PPMC)


[1] https://paimon.apache.org/

[2] https://github.com/apache/incubator-paimon

[3]
https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal



Re: [ANNOUNCE] FRocksDB 6.20.3-ververica-2.0 released

2023-01-30 Thread Yun Tang
Thanks Yuanfei for driving the frocksdb release!

Best
Yun Tang

From: Yuan Mei 
Sent: Tuesday, January 31, 2023 15:09
To: Jing Ge 
Cc: Yanfei Lei ; d...@flink.apache.org 
; user ; user...@flink.apache.org 

Subject: Re: [ANNOUNCE] FRocksDB 6.20.3-ververica-2.0 released

Thanks Yanfei for driving the release!

Best
Yuan

On Mon, Jan 30, 2023 at 8:46 PM Jing Ge via user 
mailto:user@flink.apache.org>> wrote:
Hi Yanfei,

Thanks for your effort. Looking forward to checking it.

Best regards,
Jing

On Mon, Jan 30, 2023 at 1:42 PM Yanfei Lei 
mailto:fredia...@gmail.com>> wrote:
It is very happy to announce the release of FRocksDB 6.20.3-ververica-2.0.

Compiled files for Linux x86, Linux arm, Linux ppc64le, MacOS x86,
MacOS arm, and Windows are included in FRocksDB 6.20.3-ververica-2.0
jar, and the FRocksDB in Flink 1.17 would be updated to
6.20.3-ververica-2.0.

Release highlights:
- [FLINK-30457] Add periodic_compaction_seconds option to RocksJava[1].
- [FLINK-30321] Upgrade ZLIB of FRocksDB to 1.2.13[2].
- Avoid expensive ToString() call when not in debug[3].
- [FLINK-24932] Support build FRocksDB Java on Apple silicon[4].

Maven artifacts for FRocksDB can be found at:
https://mvnrepository.com/artifact/com.ververica/frocksdbjni

We would like to thank all efforts from the Apache Flink community
that made this release possible!

[1] https://issues.apache.org/jira/browse/FLINK-30457
[2] https://issues.apache.org/jira/browse/FLINK-30321
[3] https://github.com/ververica/frocksdb/pull/55
[4] https://issues.apache.org/jira/browse/FLINK-24932

Best regards,
Yanfei
Ververica(Alibaba)


Re: Flink CEP Incremental Checkpoint Issue

2022-10-22 Thread Yun Tang
Hi Puneet,

The incremental checkpoint size of RocksDB state-backend is not exactly the 
delta state change, it is the size of newly uploaded SST files (which are not 
uploaded before). The newly uploaded SST files are generated by compaction or 
data flush.
In other words, I don't think we should care about the checkpoint size too 
much. Instead, we should care more about the output results.

Best
Yun Tang

From: Martijn Visser 
Sent: Wednesday, October 19, 2022 22:03
To: Puneet Duggal 
Cc: user 
Subject: Re: Flink CEP Incremental Checkpoint Issue

Hi,

Given that Flink 1.12 is no longer supported by the community, can you validate 
this with the latest Flink version? (Currently 1.15).
Next to that, the contents of your checkpoints is not only the results of your 
CEP, but given that you're using Exactly Once also there's internal information 
needed for providing those exactly once guarantees.

Best regards,

Martijn

On Mon, Oct 17, 2022 at 10:09 PM Puneet Duggal 
mailto:puneetduggal1...@gmail.com>> wrote:
Apologies for the mistake of calculation

120*6*2KB = 1440KB = 1.4MB

> On 18-Oct-2022, at 1:35 AM, Puneet Duggal 
> mailto:puneetduggal1...@gmail.com>> wrote:
>
> Hi,
>
> I am working on a use case which uses Flink CEP for pattern detection.
>
> Flink Version - 1.12.1
> Deployment Mode - Session Mode (Highly Available)
> State Backend - RocksDB
> Checkpoint Interval - 2 mins
> Checkpoint Mode - Exactly Once
>
> CEP pattern looks something like - A not_followed_by B within (40mins)
> After Match Skip Strategy - Skip Past Last Event
>
> In order to test out incremental checkpointing and its size, I deployed a job 
> on a cluster (let's say cluster A, hence job name J(aa)) and that same job on 
> cluster B 1 week later (Job Name J(ab)). Basically at any given point in 
> time, both jobs( J(aa) and J(ab)) process exactly the same records. After 1 
> week of deployment of J(ab), I found out that in spite of working on the same 
> records and window time of 40mins (after which unmatched patterns should 
> expire), the incremental checkpoint size of J(aa) is around 40-50MB whereas 
> that of J(ab) is 25-30MB. My assumption of the incremental checkpoint is that 
> it only contains delta state change after the last checkpoint which is same 
> for both jobs. Attached screenshots for J(ab) and J(aa) respectively.
>
> J(ab)
>
> 
>
> J(aa)
>
> 
>
> Checkpoint Configuration
>
> 
>
>
> One more doubt on the same lines is that these jobs consume on an average 6 
> events per second with one event of the size around 2KB. Assuming a 
> checkpoint interval of 2 mins and each event getting stored in CEP state, 
> total delta size of the state should be 2*60*6*1.32 = 316KB which is nowhere 
> near to size shown in the incremental checkpoint for both the jobs. Even 
> including meta info for these records, not sure what am i missing which is 
> causing incremental checkpoints to be so huge.
>
>
> Regards,
> Puneet
>



Re: ExecutionMode in ExecutionConfig

2022-09-16 Thread Yun Tang
Hi Hailu,

If you take a look at the history of ExecutionMode [1], apart from the 
refactoring commit, this class is introduced before the year 2016, in which 
DataSet API has not been deprecated.

>From my point of view, you should set runtime mode [2] instead of execution 
>mode currently if using Flink as a computation engine.


[1] 
https://github.com/apache/flink/commits/master/flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java
[2] 
https://github.com/apache/flink/blob/9d2ae5572897f3e2d9089414261a250cfc2a2ab8/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java#L98

Best
Yun Tang


From: zhanghao.c...@outlook.com 
Sent: Thursday, September 15, 2022 0:03
To: Hailu, Andreas ; user@flink.apache.org 

Subject: Re: ExecutionMode in ExecutionConfig

It's added in Flink 1.14: 
https://nightlies.apache.org/flink/flink-docs-master/zh/release-notes/flink-1.14/#expose-a-consistent-globaldataexchangemode.
 Not sure if there's a way to change this in 1.13

Best,
Zhanghao Chen

From: Hailu, Andreas 
Sent: Wednesday, September 14, 2022 23:38
To: zhanghao.c...@outlook.com ; 
user@flink.apache.org 
Subject: RE: ExecutionMode in ExecutionConfig


I can give this a try. Do you know which Flink version does this feature become 
available in?



ah



From: zhanghao.c...@outlook.com 
Sent: Wednesday, September 14, 2022 11:10 AM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: ExecutionMode in ExecutionConfig



Could you try setting ”execution.batch-shuffle-mode‘=‘ALL_EXCHANGES_PIPELINED’? 
Looks like the ExecutionMode in ExecutionConfig does not work for DataStream 
APIs.



The default shuffling behavior for a DataStream API in batch mode is 
'ALL_EXCHANGES_BLOCKING' where upstream and downstream tasks run subsequently. 
On the other hand, the pipelined mode will have upstream and downstream tasks 
run simultaneously.





Best,

Zhanghao Chen



From: Hailu, Andreas mailto:andreas.ha...@gs.com>>
Sent: Wednesday, September 14, 2022 21:37
To: zhanghao.c...@outlook.com<mailto:zhanghao.c...@outlook.com> 
mailto:zhanghao.c...@outlook.com>>; 
user@flink.apache.org<mailto:user@flink.apache.org> 
mailto:user@flink.apache.org>>
Subject: RE: ExecutionMode in ExecutionConfig



Hi Zhanghao,



That seems different than what I’m referencing and one of my points of 
confusion – the documents refer to ExecutionMode as BATCH and STREAMING which 
is different than what the code refers to it as Runtime Mode e.g. 
env.setRuntimeMode(RuntimeExecutionMode.BATCH);



I’m referring to the ExecutionMode in the ExecutionConfig e.g. 
env.getConfig().setExecutionMode(ExecutionMode.BATCH)/ 
env.getConfig().setExecutionMode(ExecutionMode.PIPELINED). I’m not able to find 
documentation on this anywhere.







ah



From: zhanghao.c...@outlook.com<mailto:zhanghao.c...@outlook.com> 
mailto:zhanghao.c...@outlook.com>>
Sent: Wednesday, September 14, 2022 1:10 AM
To: Hailu, Andreas [Engineering] 
mailto:andreas.ha...@ny.email.gs.com>>; 
user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: ExecutionMode in ExecutionConfig



https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/<https://urldefense.proofpoint.com/v2/url?u=https-3A__nightlies.apache.org_flink_flink-2Ddocs-2Drelease-2D1.13_docs_dev_datastream_execution-5Fmode_&d=DwMF-g&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=q-f1lFtNrjN2BnGqtchdhZkFNvCDUE8ZuhD4M0wJsdHcpLqEqTybqUaMAlo6lz91&s=bM_ucnQfxGo5Ky9Fq6S1yXbTqz476hGaKtkZINW4kGU&e=>
 gives a comprehensive description on it

Execution Mode (Batch/Streaming) | Apache 
Flink<https://urldefense.proofpoint.com/v2/url?u=https-3A__nightlies.apache.org_flink_flink-2Ddocs-2Drelease-2D1.13_docs_dev_datastream_execution-5Fmode_&d=DwMF-g&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=q-f1lFtNrjN2BnGqtchdhZkFNvCDUE8ZuhD4M0wJsdHcpLqEqTybqUaMAlo6lz91&s=bM_ucnQfxGo5Ky9Fq6S1yXbTqz476hGaKtkZINW4kGU&e=>

Execution Mode (Batch/Streaming) # The DataStream API supports different 
runtime execution modes from which you can choose depending on the requirements 
of your use case and the characteristics of your job. There is the “classic” 
execution behavior of the DataStream API, which we call STREAMING execution 
mode. This should be used for unbounded jobs that require continuous 
incremental ...

nightlies.apache.org





Best,

Zhanghao Chen



From: Hailu, Andreas mailto:andreas.ha...@gs.com>>
Sent: Wednesday, September 14, 2022 7:13
To: user@flink.apache.org<mailto:user@flink.apache.org> 
mailto:user@flink.apache.org>>
Subject: Exec

Re: [FEEDBACK] Metadata Platforms / Catalogs / Lineage integration

2022-09-12 Thread Yun Tang
An interesting topic, I noticed that the datahub community has launched the 
feature request discussion of Flink Integration [1].

@Martijn Visser<mailto:martijnvis...@apache.org> Did the Flink community had 
created tickets to track this topic?
>From my current understanding, Flink lacks rich information on 
>FlinkJobListener just as Feng mentioned, which has been supported well by 
>Spark, to send data lineage to external systems.



[1] https://feature-requests.datahubproject.io/p/flink-integration


Best
Yun Tang

From: wangqinghuan <1095193...@qq.com>
Sent: Monday, January 17, 2022 18:27
To: user@flink.apache.org 
Subject: Re: [FEEDBACK] Metadata Platforms / Catalogs / Lineage integration


we are using Datahub to address table-level lineage and column-level lineage 
for Flink SQL.

在 2022/1/13 23:27, Martijn Visser 写道:
Hi everyone,

I'm currently checking out different metadata platforms, such as Amundsen [1] 
and Datahub [2]. In short, these types of tools try to address problems related 
to topics such as data discovery, data lineage and an overall data catalogue.

I'm reaching out to the Dev and User mailing lists to get some feedback. It 
would really help if you could spend a couple of minutes to let me know if you 
already use either one of the two mentioned metadata platforms or another one, 
or are you evaluating such tools? If so, is that for the purpose as a 
catalogue, for lineage or anything else? Any type of feedback on these types of 
tools is appreciated.

Best regards,

Martijn

[1] https://github.com/amundsen-io/amundsen/
[2] https://github.com/linkedin/datahub



Re: NegativeArraySizeException trying to take a savepoint

2022-06-14 Thread Yun Tang
Hi Mike,

I think the root cause is that the size of java bytes array still exceed VM 
limit.
The exception message is not friendly and not covered by sanity check [1] as it 
uses different code path [2]:
The native method org.rocksdb.RocksIterator.$$YJP$$value0 would allocate the 
byte array directly without check.

If you want to walk around the problem, please consider to reduce the size of 
listState#add to avoid too large value.



[1] https://github.com/facebook/rocksdb/pull/3850
[2] 
https://github.com/ververica/frocksdb/blob/8608d75d85f8e1b3b64b73a4fb6d19baec61ba5c/java/rocksjni/iterator.cc#L239-L245

Best
Yun Tang


From: Martijn Visser 
Sent: Monday, June 13, 2022 21:47
To: Mike Barborak 
Cc: user@flink.apache.org 
Subject: Re: NegativeArraySizeException trying to take a savepoint

Hi Mike,

It would be worthwhile to check if this still occurs in Flink 1.14, since Flink 
bumped to a newer version of RocksDB in that version. Is that a possibility for 
you?

Best regards,

Martijn

Op ma 13 jun. 2022 om 15:21 schreef Mike Barborak 
mailto:mi...@ec.ai>>:

When trying to savepoint our job, we are getting the stack trace below. Is 
there a way to know more about this failure? Like which function in the job 
graph is associated with the problematic state and which key (assuming it is 
keyed state)?



Or is there a fix for this exception? The only mention of this exception that I 
can find is in [1] and [2]. [1] has a message at the bottom saying that the 
issue was fixed in RocksDb in 2018. And while we do have a part of the job 
graph that matches the pattern discussed in these two links, our attempts to 
reproduce the problem by pumping messages through at a rate millions of times 
higher than normal have not worked.



We are using Flink version 1.13.5.



Thanks,

Mike



[1] https://issues.apache.org/jira/browse/FLINK-9268

[2] https://www.mail-archive.com/user@flink.apache.org/msg34915.html



Caused by: java.lang.Exception: Could not materialize checkpoint 49768 for 
operator KeyedProcess -> KeyedProcess -> re-operator-output -> Sink: Kafka sink 
to ec.platform.braid.responses-rtw (9/15)#0.

at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:257)

... 4 more

Caused by: java.util.concurrent.ExecutionException: 
java.lang.NegativeArraySizeException: -785722504

at java.base/java.util.concurrent.FutureTask.report(Unknown 
Source)

at java.base/java.util.concurrent.FutureTask.get(Unknown Source)

at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)

at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:54)

at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:128)

... 3 more

Caused by: java.lang.NegativeArraySizeException: -785722504

at org.rocksdb.RocksIterator.$$YJP$$value0(Native Method)

at org.rocksdb.RocksIterator.value0(RocksIterator.java)

at org.rocksdb.RocksIterator.value(RocksIterator.java:50)

at 
org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:103)

at 
org.apache.flink.contrib.streaming.state.iterator.RocksSingleStateIterator.value(RocksSingleStateIterator.java:66)

at 
org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:202)

at 
org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeKVStateData(FullSnapshotAsyncWriter.java:210)

at 
org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeSnapshotToOutputStream(FullSnapshotAsyncWriter.java:107)

at 
org.apache.flink.runtime.state.FullSnapshotAsyncWriter.get(FullSnapshotAsyncWriter.java:77)

at 
org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)

at 
org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)

at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)

at java.base/java.util.concurrent.FutureTask.run(Unknown Source)

at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:633)


Re: RocksDB's state size discrepancy with what's seen with state processor API

2022-05-18 Thread Yun Tang
Hi Alexis,

Sorry for the late response. I come from the reply in FLINK-27504[1].
The MAINFEST file in RocksDB records history of version changes.

In other words, once a new SST file created or an old file deleted via 
compaction, it will create a new version in RocksDB, which will update the 
MAINFEST file.
The default value for max MAINFEST file size is 1GB [2], since you create the 
checkpoint every 30 seconds, files might be flushed on that time, and that's 
why the MAINFEST file grows.

You can limit the max file size via DBOptions#setMaxManifestFileSize [3].


[1] 
https://issues.apache.org/jira/browse/FLINK-27504?focusedCommentId=17537788&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17537788
[2] 
https://github.com/ververica/frocksdb/blob/8608d75d85f8e1b3b64b73a4fb6d19baec61ba5c/include/rocksdb/options.h#L636
[3] 
https://github.com/ververica/frocksdb/blob/8608d75d85f8e1b3b64b73a4fb6d19baec61ba5c/java/src/main/java/org/rocksdb/DBOptions.java#L520

Best
Yun Tang

From: Alexis Sarda-Espinosa 
Sent: Tuesday, May 3, 2022 8:47
To: Peter Brucia 
Cc: user@flink.apache.org 
Subject: RE: RocksDB's state size discrepancy with what's seen with state 
processor API


Ok



Regards,

Alexis.



From: Peter Brucia 
Sent: Freitag, 22. April 2022 15:31
To: Alexis Sarda-Espinosa 
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API



No

Sent from my iPhone




Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-11 Thread Yun Tang
Hi all,

I think forum might be a good choice for search and maintain. However, unlike 
slack workspace, it seems no existing popular product could be leveraged easily.

Thus, I am +1 to create an Apache Flink slack channel. If the ASF slack cannot 
be joined easily for most of users, I prefer to set up our own slack workspace.

Best
Yun Tang

From: Jingsong Li 
Sent: Thursday, May 12, 2022 10:49
To: Xintong Song 
Cc: dev ; user 
Subject: Re: [Discuss] Creating an Apache Flink slack workspace

Hi all,

Regarding using ASF slack. I share the problems I saw in the Apache Druid
community. [1]

> As you may have heard, it’s become increasingly difficult for new users
without an @apache.org email address to join the ASF #druid Slack channel.
ASF Infra disabled the option to publicly provide a link to the workspace
to anyone who wanted it, after encountering issues with spammers.

> Per Infra’s guidance (https://infra.apache.org/slack.html), new community
members should only be invited as single-channel guests. Unfortunately,
single-channel guests are unable to extend invitations to new members,
including their colleagues who are using Druid. Only someone with full
member privileges is able to extend an invitation to new members. This lack
of consistency doesn’t make the community feel inclusive.

> There is a workaround in place (
https://github.com/apache/druid-website-src/pull/278) – users can send an
email to druid-u...@googlegroups.com to request an invite to the Slack
channel from an existing member – but this still poses a barrier to entry,
and isn’t a viable permanent solution. It also creates potential privacy
issues as not everyone is at liberty to announce they’re using Druid nor
wishes to display their email address in a public forum.

[1] https://lists.apache.org/thread/f36tvfwfo2ssf1x3jb4q0v2pftdyo5z5

Best,
Jingsong

On Thu, May 12, 2022 at 10:22 AM Xintong Song  wrote:

> To make some progress, maybe we decide on chat vs forum vs none and then
>> go into a deeper discussion on the implementation or is there anything
>> about Slack that would be complete blocker for the implementation?
>>
>
> Sure, then I'd be +1 for chat. From my side, the initiative is more about
> making communication more efficient, rather than making information easier
> to find.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, May 11, 2022 at 5:39 PM Konstantin Knauf 
> wrote:
>
>> I don't think we can maintain two additional channels. Some people have
>> already concerns about covering one additional channel.
>>
>> I think, a forum provides a better user experience than a mailing list.
>> Information is structured better, you can edit messages, sign up and search
>> is easier.
>>
>> To make some progress, maybe we decide on chat vs forum vs none and then
>> go into a deeper discussion on the implementation or is there anything
>> about Slack that would be complete blocker for the implementation?
>>
>>
>>
>> Am Mi., 11. Mai 2022 um 07:35 Uhr schrieb Xintong Song <
>> tonysong...@gmail.com>:
>>
>>> I agree with Robert on reworking the "Community" and "Getting Help"
>>> pages to emphasize how we position the mailing lists and Slack, and on
>>> revisiting in 6-12 months.
>>>
>>> Concerning dedicated Apache Flink Slack vs. ASF Slack, I'm with
>>> Konstantin. I'd expect it to be easier for having more channels and keeping
>>> them organized, managing permissions for different roles, adding bots, etc.
>>>
>>> IMO, having Slack is about improving the communication efficiency when
>>> you are already in a discussion, and we expect such improvement would
>>> motivate users to interact more with each other. From that perspective,
>>> forums are not much better than mailing lists.
>>>
>>> I'm also open to forums as well, but not as an alternative to Slack. I
>>> definitely see how forums help in keeping information organized and easy to
>>> find. However, I'm a bit concerned about the maintenance overhead. I'm not
>>> very familiar with Discourse or Reddit. My impression is that they are not
>>> as easy to set up and maintain as Slack.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>> [1] https://asktug.com/
>>>
>>> On Tue, May 10, 2022 at 4:50 PM Konstantin Knauf 
>>> wrote:
>>>
>>>> Thanks for starting this discussion again. I am pretty much with Timo
>>>> here. Slack or Discourse as an alternative for the user community, and
>>>> mailing list for the contributing, design discussi

Re: Failed to restore from ck, because of KryoException

2022-05-10 Thread Yun Tang
Hi Liting,

Did you ever change your defined class, e.g. xxx.SparkIdentifiers, 
xxx.ServiceEvent,  xxx.Event and so on?

Or did you change your flink program? Kryo serializer needs to register class 
for serialization, which is not friendly for version upgrade.

Moreover, except from the compatibility problem, we also suggest you to use 
customized serializers for your customized class for better performance.

Best
Yun Tang

From: Liting Liu (litiliu) 
Sent: Friday, May 6, 2022 10:20
To: user@flink.apache.org 
Subject: Failed to restore from ck, because of KryoException


Hi, We are using flink 1.14.3. But when the job try to restart from checkPoint, 
the following exception accour. What's wrong?

And how can i avoid it?


Caused by: TimerException{com.esotericsoftware.kryo.KryoException: 
java.lang.IndexOutOfBoundsException: Index: 99, Size: 9

Serialization trace:

webexSiteName (com.cisco.wx2.diagnostic_events.SparkIdentifiers)

identifiers (com.cisco.wx2.diagnostic_events.ServiceEvent)

event (com.cisco.wx2.diagnostic_events.Event)}

... 14 more

Caused by: com.esotericsoftware.kryo.KryoException: 
java.lang.IndexOutOfBoundsException: Index: 99, Size: 9

Serialization trace:

webexSiteName (com.cisco.wx2.diagnostic_events.SparkIdentifiers)

identifiers (com.cisco.wx2.diagnostic_events.ServiceEvent)

event (com.cisco.wx2.diagnostic_events.Event)

at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)

at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)

at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)

at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)

at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)

at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)

at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)

at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)

at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)

at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)

at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:394)

at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.access$100(RocksDBMapState.java:65)

at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getValue(RocksDBMapState.java:502)

at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$2.next(RocksDBMapState.java:217)

at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)

at scala.collection.IterableLike$class.head(IterableLike.scala:107)

at scala.collection.AbstractIterable.head(Iterable.scala:54)

at 
com.cisco.wx2.flink.functions.UnifiedClientJoinAnalysisWindowFunction$$anonfun$buildAnalysisPipeline$1.apply(UnifiedClientJoinAnalysisWindowFunction.scala:170)

at 
com.cisco.wx2.flink.functions.UnifiedClientJoinAnalysisWindowFunction$$anonfun$buildAnalysisPipeline$1.apply(UnifiedClientJoinAnalysisWindowFunction.scala:170)

at scala.Option.foreach(Option.scala:257)



Re: RocksDB efficiency and keyby

2022-04-21 Thread Yun Tang
Hi Trystan,

You can use async-profiler[1] to detect the CPU stack within RocksDB to see 
what happened, maybe you can try to enable partitioned index & filters[2] if 
the call stack is occupied by loading index or filter block.

[1] https://github.com/jvm-profiling-tools/async-profiler
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#state-backend-rocksdb-memory-partitioned-index-filters

Best
Yun Tang



From: Yaroslav Tkachenko 
Sent: Thursday, April 21, 2022 0:44
To: Trystan 
Cc: user 
Subject: Re: RocksDB efficiency and keyby

Yep, I'd give it another try. EBS could be too slow in some use-cases.

On Wed, Apr 20, 2022 at 9:39 AM Trystan 
mailto:entro...@gmail.com>> wrote:
Thanks for the info! We're running EBS gp2 volumes... awhile back we tested 
local SSDs with a different job and didn't notice any gains, but that was 
likely due to an under-optimized job where the bottleneck was elsewhere

On Wed, Apr 20, 2022, 11:08 AM Yaroslav Tkachenko 
mailto:yaros...@goldsky.io>> wrote:
Hey Trystan,

Based on my personal experience, good disk IO for RocksDB matters a lot. Are 
you using the fastest SSD storage you can get for RocskDB folders?

For example, when running on GCP, we noticed 10x throughput improvement by 
switching RocksDB storage to 
https://cloud.google.com/compute/docs/disks/local-ssd

On Wed, Apr 20, 2022 at 8:50 AM Trystan 
mailto:entro...@gmail.com>> wrote:
Hello,

We have a job where its main purpose is to track whether or not we've 
previously seen a particular event - that's it. If it's new, we save it to an 
external database. If we've seen it, we block the write. There's a 3-day TTL to 
manage the state size. The downstream db can tolerate new data slipping through 
and reject the write - we mainly use the state to reduce writes.

We're starting to see some performance issues, even after adding 50% capacity 
to the job. After some number of days/weeks, it eventually goes into a constant 
backpressure situation. I'm wondering if there's something we can do to improve 
efficiency.

1. According to the flamegraph, 60-70% of the time is spent in RocksDB.get
2. The state is just a ValueState. I assume this is the smallest/most 
efficient state. The keyby is extremely high cardinality - are we better off 
with a lower cardinality and a MapState .contains() check?
3. Current configs: taskmanager.memory.process.size: 4g, 
taskmanager.memory.managed.fraction: 0.8 (increased from 0.6, didn't see much 
change)
4. Estimated num keys tops out somewhere around 9-10B. Estimated live data size 
somewhere around 250 GB. Attempting to switch to heap state immediately ran 
into OOM (parallelism: 120, 8gb memory each).

And perhaps the answer is just "scale out" :) but if there are any signals to 
know when we've reached the limit of current scale, it'd be great to know what 
signals to look for!

Thanks!
Trystan


Re: RocksDB state not cleaned up

2022-04-08 Thread Yun Tang
Hi Alexis,

RocksDB itself supports manual compaction API [1], and current Flink does not 
support to call these APIs to support periodic compactions.

If Flink supports such period compaction, from my understanding, this is 
somehow like major compaction in HBase. I am not sure whether this is really 
useful for Flink as this could push data to the last level, which leads to 
increase the read amplification.

[1] 
https://javadoc.io/doc/org.rocksdb/rocksdbjni/6.20.3/org/rocksdb/RocksDB.html

Best
Yun Tang

From: Alexis Sarda-Espinosa 
Sent: Friday, April 8, 2022 18:54
To: tao xiao ; David Morávek 
Cc: Yun Tang ; user 
Subject: RE: RocksDB state not cleaned up


May I ask if anyone tested RocksDB’s periodic compaction in the meantime? And 
if yes, if it helped with this case.



Regards,

Alexis.



From: tao xiao 
Sent: Samstag, 18. September 2021 05:01
To: David Morávek 
Cc: Yun Tang ; user 
Subject: Re: RocksDB state not cleaned up



Thanks for the feedback! However TTL already proves that the state cannot be 
cleaned up on time due to too many levels built up in RocksDB.



Hi @Yun Tang<mailto:myas...@live.com> do you have any suggestions to tune 
RocksDB to accelerate the compaction progress?



On Fri, Sep 17, 2021 at 8:01 PM David Morávek 
mailto:d...@apache.org>> wrote:

Cleaning up with timers should solve this. Both approaches have some advantages 
and disadvantages though.



Timers:

- No "side effects".

- Can be set in event time. Deletes are regular tombstones that will get 
compacted later on.



TTL:

- Performance. This costs literally nothing compared to an extra state for 
timer + writing a tombstone marker.

- Has "side-effects", because it works in processing time. This is just 
something to keep in mind eg. when bootstraping the state from historical data. 
(large event time / processing time skew)



With 1.14 release, we've bumped the RocksDB version so it may be possible to 
use a "periodic compaction" [1], but nobody has tried that so far. In the 
meantime I think there is non real workaround because we don't expose a way to 
trigger manual compaction.



I'm off to vacation until 27th and I won't be responsive during that time. I'd 
like to pull Yun into the conversation as he's super familiar with the RocksDB 
state backend.



[1] 
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#periodic-and-ttl-compaction



Best,

D.



On Fri, Sep 17, 2021 at 5:17 AM tao xiao 
mailto:xiaotao...@gmail.com>> wrote:

Hi David,



Confirmed with RocksDB log Stephan's observation is the root cause that 
compaction doesn't clean up the high level sst files fast enough.  Do you think 
manual clean up by registering a timer is the way to go or any RocksDB 
parameter can be tuned to mitigate this issue?



On Wed, Sep 15, 2021 at 12:10 AM tao xiao 
mailto:xiaotao...@gmail.com>> wrote:

Hi David,



If I read Stephan's comment correctly TTL doesn't work well for cases where we 
have too many levels, like fast growing state,  as compaction doesn't clean up 
high level SST files in time, Is this correct? If yes should we register a 
timer with TTL time and manual clean up the state (state.clear() ) when the 
timer fires?



I will turn on RocksDB logging as well as compaction logging [1] to verify this



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction





On Tue, Sep 14, 2021 at 5:38 PM David Morávek 
mailto:d...@apache.org>> wrote:

Hi Tao,



my intuition is that the compaction of SST files is not triggering. By default, 
it's only triggered by the size ratios of different levels [1] and the TTL 
mechanism has no effect on it.



Some reasoning from Stephan:



It's very likely to have large files in higher levels that haven't been 
compacted in a long time and thus just stay around.



This might be especially possible if you insert a lot in the beginning (build 
up many levels) and then have a moderate rate of modifications, so the changes 
and expiration keep happening purely in the merges / compactions of the first 
levels. Then the later levels may stay unchanged for quite some time.



You should be able to see compaction details by setting RocksDB logging to INFO 
[2]. Can you please check these and validate whether this really is the case?



[1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction

[2] 
https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting



Best,

D.



On Mon, Sep 13, 2021 at 3:18 PM tao xiao 
mailto:xiaotao...@gmail.com>> wrote:

Hi team



We have a job that uses value state with RocksDB and TTL set to 1 day. The TTL 
update type is OnCreateAndWrite. We set the value state when the value state 
doesn't exist and we never updat

Re: flink pipeline handles very small amount of messages in a second (only 500)

2022-04-07 Thread Yun Tang
Hi Sigalit,

First of all, did you ensure different source operator consumes different 
consumer id for the kafka source? Did each flink job share the same data or 
consumed the data independently?

Moreover, was your job behaves back pressured? It might need to break the 
chained operator to see whether the sink back-pressured the source to impact 
the throughput of source.

Last but not least, did your source already have 100% CPU usage, which means 
your source operator has already reached to its highest throughput.

Best
Yun Tang

From: Sigalit Eliazov 
Sent: Thursday, April 7, 2022 19:12
To: user 
Subject: flink pipeline handles very small amount of messages in a second (only 
500)

hi all

I would appreciate some help to understand the pipeline behaviour...

We deployed a standalone flink cluster. The pipelines are deployed via the jm 
rest api.
We have 5 task managers with 1 slot each.

In total i am deploying 5 pipelines which mainly read from kafka, a simple 
object conversion and either write back to kafka or GCP pub/sub or save in the 
DB.

These jobs run "forever" and basically each task manager runs a specific job 
(this is how flink handled it).

 We have a test that sends to kafka 10k messages per second. but according to 
the metrics exposed by flink i see that the relevant job handles only 500 
messages per second.
I would expect all the 10K to be handled. I guess the setup is not correct.

The messages are in avro format
Currently we are not using checkpoints at all.
Any suggestions are welcome.

Thanks alot
Sigalit





Re: RocksDB metrics for effective memory consumption

2022-03-16 Thread Yun Tang
Hi Donatien,

The managed memory in Flink actually locates off-heap and stays as native 
memory, e.g memory consumed by RocksDB, python. In other words, JVM cannot know 
how much the memory that third-party software used.
Thus, Flink will just set the upper limit for the managed memory and let 
third-party software to consume. That's why you can see the managed memory is 
always allocated full at the beginning.

And if you want to know the memory used by RocksDB, you should use jeprof + 
jemalloc to detect the malloced memory from RocksDB side, or refer to RocksDB 
reported block cache usage [1]. Please note that all RocksDB instances within 
same slot would share the same block cache, they will report same usage.


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#state-backend-rocksdb-metrics-block-cache-usage

Best
Yun Tang

From: Donatien Schmitz 
Sent: Tuesday, March 15, 2022 19:45
To: user@flink.apache.org 
Subject: RocksDB metrics for effective memory consumption

Hi,

I am working on the analysis of the memory consumption of RocksDB state backend 
for simple DAGs. I would like to check fine-grained memory utilization of 
RocksDB with the native metrics (reported on Prometheus+Grafana). RocksDB uses 
Managed memory allocated to each TaskManager but this value peaks at the 
beginning of the job. Is the managed memory always allocated at full even if it 
would not be necessary?

For my experiments I am using a simple DAG consisting of Source (FS) -> Map -> 
DiscardSink. The Map does not process anything but stores the latest value of 
the KeyedStream keys (with predicted amount of keys in the dataset and constant 
value size (1024 bytes)).

I anyone has some more insights on the memory utilization of RocksDB at Flink's 
level, I would appreciate.

Best,

Donatien Schmitz
PhD Student


Re: Incremental checkpointing & RocksDB Serialization

2022-03-06 Thread Yun Tang
Hi Vidya,


  1.  You could tune your job to avoid backpressure. Maybe you can upgrade your 
flink engine to at least flink-1.13 to know how to monitor the back pressure 
status [1]
  2.  You can refer to [2] to know how to custom your serializer.


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/back_pressure/
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/custom_serialization/

Best,
Yun Tang

From: Vidya Sagar Mula 
Sent: Sunday, March 6, 2022 4:16
To: Yun Tang 
Cc: user 
Subject: Re: Incremental checkpointing & RocksDB Serialization

Hi Yun Tang,
Thank you for the reply. I have follow up questions and need some more details. 
Can you please clarify my inline questions?

> Why is the incremental checkpointing taking more time for the snapshot at the 
> end of the window duration?

I guess that this is because the job is under back pressure on end of window. 
You can expand the checkpoint details to see whether that the async duration of 
each task is much slower than the e2e duration? If so, this caused the 
checkpoint barrier stay in the channel longer.

 - Yes, I expanded the checkpoint details and noticed e2e duration is 
much higher than async duration. Attaching the screenshot here(Checkpoint #59) 
Can you give elaborate more on "checkpoint barrier stay in the channel longer." 
What are the suggested ways to mitigate this issue? I am wondering how can this 
be avoided as it is happening only at the end of the window.


> Do you suggest any change in the serializer type in the RocksDB? (Kryo vs 
> Avro)

>From our experience,  kryo is not a good choice in most cases.

 - What are your recommendations on other serializers? I tried to change 
it to Avro by enabling the flag "forceAvro" to TRUE in the Execution Config. 
But, it RocksDB is still going picking KryoSerializer. This is because the 
Transformation is KeyType is assigned as GenericType. I am not sure what 
changes need to made to my class/pojo to take the Avro Serialzer.
Can you please suggest the way to change to other better serializers?



On Fri, Mar 4, 2022 at 2:06 AM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Vidya,

> Why is the incremental checkpointing taking more time for the snapshot at the 
> end of the window duration?

I guess that this is because the job is under back pressure on end of window. 
You can expand the checkpoint details to see whether that the async duration of 
each task is much slower than the e2e duration? If so, this caused the 
checkpoint barrier stay in the channel longer.

> Why is RocksDB serialization causing the CPU peak?

This is caused by the implementation of your serializer.

> Do you suggest any change in the serializer type in the RocksDB? (Kryo vs 
> Avro)

>From our experience,  kryo is not a good choice in most cases.

Best
Yun Tang

From: Vidya Sagar Mula mailto:mulasa...@gmail.com>>
Sent: Friday, March 4, 2022 17:00
To: user mailto:user@flink.apache.org>>
Subject: Incremental checkpointing & RocksDB Serialization

Hi,

I have a cluster that contains the Flink 1.11 version with AWS - S3 backend. I 
am trying the incremental checkpointing on this set up. I have a pipeline with 
a 10 mins window and incremental checkpointing happens every 2 mins.

Observation:
-
I am observing the long duration while taking the snapshot at the end of each 
window, which means every last checkpoint of the window (almost all the times).
I am attaching the Flink UI, checkpoint history.

My set up details:
---
Cluster: Cloud cluster with instance storage.
Memory : 20 GB,
Heap : 10 GB
Flink Managed Memory: 4.5 GB
Flink Version : 1.11
CPUs : 2

ROCKSDB_WRITE_BUFFER_SIZE: "2097152000"  ## 2GB

ROCKSDB_BLOCK_CACHE_SIZE: "104857600"## 100 Mb

ROCKSDB_BLOCK_SIZE: "5242880"  ## 5 Mb

ROCKSDB_CHECKPOINT_TRANSFER_THREAD_NUM: 4

ROCKSDB_MAX_BACKGROUND_THREADS: 4


In the analysis, I noticed that the CPU utilization is peaking to almost 100% 
at the time of issue. With further analysis with thread dumps at the time CPU 
peak, it is showing RocksDB serialization related call trace. All the thread 
samples are pointing to this stack.

Based on pipeline transformation class type, RocksDB is choosing Kryo 
Serializer. I did try to change the serializer type, but that is not the focal 
point I want to stress here.

I would like to understand the reason for high CPU utilization. I have tried to 
increase the CPU cycles to 2 and 4. But, it did not give me any better results. 
I have parallelism 2.

Please take a look at the below stack trace. Please suggest me why it is taking 
a lot of CPU at the time of serialize/deserialize in the RocksDB?



Stack-1, Stack-2, Stack-3 are attached to this email.

Questions:
---
- Why is

Re: Incremental checkpointing & RocksDB Serialization

2022-03-04 Thread Yun Tang
Hi Vidya,

> Why is the incremental checkpointing taking more time for the snapshot at the 
> end of the window duration?

I guess that this is because the job is under back pressure on end of window. 
You can expand the checkpoint details to see whether that the async duration of 
each task is much slower than the e2e duration? If so, this caused the 
checkpoint barrier stay in the channel longer.

> Why is RocksDB serialization causing the CPU peak?

This is caused by the implementation of your serializer.

> Do you suggest any change in the serializer type in the RocksDB? (Kryo vs 
> Avro)

>From our experience,  kryo is not a good choice in most cases.

Best
Yun Tang

From: Vidya Sagar Mula 
Sent: Friday, March 4, 2022 17:00
To: user 
Subject: Incremental checkpointing & RocksDB Serialization

Hi,

I have a cluster that contains the Flink 1.11 version with AWS - S3 backend. I 
am trying the incremental checkpointing on this set up. I have a pipeline with 
a 10 mins window and incremental checkpointing happens every 2 mins.

Observation:
-
I am observing the long duration while taking the snapshot at the end of each 
window, which means every last checkpoint of the window (almost all the times).
I am attaching the Flink UI, checkpoint history.

My set up details:
---
Cluster: Cloud cluster with instance storage.
Memory : 20 GB,
Heap : 10 GB
Flink Managed Memory: 4.5 GB
Flink Version : 1.11
CPUs : 2

ROCKSDB_WRITE_BUFFER_SIZE: "2097152000"  ## 2GB

ROCKSDB_BLOCK_CACHE_SIZE: "104857600"## 100 Mb

ROCKSDB_BLOCK_SIZE: "5242880"  ## 5 Mb

ROCKSDB_CHECKPOINT_TRANSFER_THREAD_NUM: 4

ROCKSDB_MAX_BACKGROUND_THREADS: 4


In the analysis, I noticed that the CPU utilization is peaking to almost 100% 
at the time of issue. With further analysis with thread dumps at the time CPU 
peak, it is showing RocksDB serialization related call trace. All the thread 
samples are pointing to this stack.

Based on pipeline transformation class type, RocksDB is choosing Kryo 
Serializer. I did try to change the serializer type, but that is not the focal 
point I want to stress here.

I would like to understand the reason for high CPU utilization. I have tried to 
increase the CPU cycles to 2 and 4. But, it did not give me any better results. 
I have parallelism 2.

Please take a look at the below stack trace. Please suggest me why it is taking 
a lot of CPU at the time of serialize/deserialize in the RocksDB?



Stack-1, Stack-2, Stack-3 are attached to this email.

Questions:
---
- Why is the incremental checkpointing taking more time for the snapshot at the 
end of the window duration?
- Why is RocksDB serialization causing the CPU peak?
- Do you suggest any change in the serializer type in the RocksDB? (Kryo vs 
Avro)

Thank you,











Re: Pods are OOMKilled with RocksDB backend after a few checkpoints

2022-02-28 Thread Yun Tang
Hi Alex,

Since current default checkpoint policy in RocksDB state backend is still full 
snapshot, which is actually creating savepoint format.
Current savepoint would scan the whole RocksDB with iterators to write data 
out, and some intermediate data would be kept in memory.

I think you could use incremental checkpoint for RocksDB state backend, which 
is also what we want to make as default checkpoint policy in the future within 
Flink.
For the overhead memory, you can configure 
taskmanager.memory.jvm-overhead.min<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#taskmanager-memory-jvm-overhead-min>
 and 
taskmanager.memory.jvm-overhead.max<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#taskmanager-memory-jvm-overhead-max>
 [1] to limit the overhead memory. The task off-heap memory would not take 
effect for RocksDB.

If you want to watch the memory usage, since the managed memory is enabled by 
default, all instances within one slot would use memory from same block cache 
[2], you can try state.backend.rocksdb.metrics.block-cache-usage [3].
Please keep in mind that all RocksDB instances within one slot would report 
same block cache usage.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/#detailed-memory-model
[2] 
https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager#cost-memory-used-in-memtable-to-block-cache
[3] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#state-backend-rocksdb-metrics-block-cache-usage


Best
Yun Tang



From: Alexandre Montecucco 
Sent: Friday, February 25, 2022 20:14
To: user 
Subject: Pods are OOMKilled with RocksDB backend after a few checkpoints

Hi all,

I am trying to reduce the memory usage of a Flink app.
There is about 25+Gb of state when persisted to checkpoint/savepoint. And a 
fair amount of short lived objects as incoming traffic is fairly high.
So far, I have 8TM with 20GB each using Flink 1.12. I would like to reduce the 
amount of memory I give, as the state will continue growing. I start my 
application from an existing savepoint.

Given that CPU is not really an issue, I  switched to RocksDB backend, so that 
state is serialized and supposedly much more compact in memory.
I am setting taskmanager.memory.process.size=2m and 
taskmanager.memory.managed.size=6000m (and tried other values ranging from 
3000m to 1m).

The issue I observed is that the task manager pod memory is increasing during 
each checkpoint and the 4th checkpoint fails because most of the pods are 
OOMKilled. There is no java exception in the logs, so I really suspect it is 
simply RocksDB using more memory than allocated.
I explicitly set state.backend.rocksdb.memory.managed=true to be sure.
I tried intervals of 2 minutes and 5 minutes for the checkpoint, and it always 
seems to fail during the 4th checkpoint.

I tried incremental checkpoints and after 30 checkpoints no sign of failure so 
far.

I tried with a few GB of overhead memory but that only delays the issue a bit 
longer.
>From the heap usage graph, in all cases, it looks as expected. The heap goes 
>back to a few hundred MB after GC, as the only long lived state is off-heap. 
>Xmx heap is about 12GB but peak usage is at most 6Gb.


Am I misconfiguring anything that could explain the OOMKilled pods?

Also, what is the best single metric to monitor rocksdb memory usage?  (I tried 
estimate-live-data-size and size-all-mem-tables but I am not fully sure yet 
about their exact meaning).

Best,
Alex


By communicating with Grab Inc and/or its subsidiaries, associate companies and 
jointly controlled entities (“Grab Group”), you are deemed to have consented to 
the processing of your personal data as set out in the Privacy Notice which can 
be viewed at https://grab.com/privacy/

This email contains confidential information and is only for the intended 
recipient(s). If you are not the intended recipient(s), please do not 
disseminate, distribute or copy this email Please notify Grab Group immediately 
if you have received this by mistake and delete this email from your system. 
Email transmission cannot be guaranteed to be secure or error-free as any 
information therein could be intercepted, corrupted, lost, destroyed, delayed 
or incomplete, or contain viruses. Grab Group do not accept liability for any 
errors or omissions in the contents of this email arises as a result of email 
transmission. All intellectual property rights in this email and attachments 
therein shall remain vested in Grab Group, unless otherwise provided by law.


Re: How to prevent check pointing of timers ?

2022-02-07 Thread Yun Tang
Hi Alex,

I think the better solution is to know what the problem you have ever met when 
restoring the timers?

Flink does not support to remove state (including timer state) currently.

Best
Yun Tang

From: Alex Drobinsky 
Sent: Monday, February 7, 2022 21:09
To: Caizhi Weng 
Cc: User-Flink 
Subject: Re: How to prevent check pointing of timers ?

By timer I mean regular timer from KeyedState which utilized via function 
onTimer, for example:



public class StateWithTimer {
public long timerValue = 0;
public volatile boolean shouldResetTimer = true;

public boolean resetIfMust(long timeoutInMilliseconds, TimerService 
timerService) {
if (shouldResetTimer) {
setupTimer(timeoutInMilliseconds, timerService);
shouldResetTimer = false;
return true;
}
return false;
}

public void setupTimer(long timeoutInMilliseconds, TimerService 
timerService) {
// Cancel previous timer
timerService.deleteProcessingTimeTimer(timerValue);
// Register new timer
// Should it be configurable ?
timerValue = (timerService.currentProcessingTime() + 
timeoutInMilliseconds)*1000/1000;
timerService.registerProcessingTimeTimer(timerValue);
}

}

State which utilizes timers extends StateWithTimer above, the function 
resetIfMust is current workaround - it resets timers first time after restart 
from checkpoint or start.


@Override
public void onTimer(long timestamp, OnTimerContext ctx, 
Collector collector) throws Exception {
   MultiStorePacketState so = state.value();
   if 
(so.resetIfMust(StorePacketConfigurationParameters.partAggregationTimeout, 
ctx.timerService())) {
  return;
   }
   closeAndReportFile(collector, so);

   ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
   state.update(so);
}




пн, 7 февр. 2022 г. в 05:06, Caizhi Weng 
mailto:tsreape...@gmail.com>>:
Hi!

Could you elaborate more on your code or share it if possible? Which timer are 
you talking about? Are you using the data stream API or SQL API? Do you mean 
the timer registered per record for a window aggregation? Does mini batch 
aggregation [1] solve your problem?

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/tuning/#minibatch-aggregation

Alex Drobinsky mailto:alex.drobin...@gmail.com>> 
于2022年2月3日周四 20:41写道:
Dear flink user,

In our project, restoring the timer's state creates numerous issues, so I would 
like to know
if it is possible to avoid save/restore of timers altogether.
If it isn't possible, how could I delete all registered timers during the open 
function ?

Best regards,
Alexander


Re: Inaccurate checkpoint trigger time

2022-01-30 Thread Yun Tang
Hi Paul,

I think Robert's idea might be right.

>From the log you pasted, the checkpoint interval is 2m30s. Chk-5 triggered at 
>16:42:23 and completed at 16:42:42.
In the normal case, chk-6 would be triggered near 16:44:53. However, the actual 
chk-6 triggered at 16:46:02, which is obviously not normal case.

I think your analysis is not correct due to the log below:
2022-01-27 16:46:02,693 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 6 (type=CHECKPOINT) @ 1643273162422 for job 
3a57fdaa16502c411a46471bba595d7c.

If you translate the unix time 1643273162422 to Beijing time, you can see this 
is actually 2022-01-27 16:46:02.
If the Zookeeper is really slow to respond, the unix time should be much 
earlier than the logged time [1].

Flink has been improved that checkpointing would be backpressured on slow 
cleanup [2].



[1] 
https://github.com/apache/flink/blob/90e850301e672fc0da293abc55eb446f7ec68ffa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L540
[2] https://issues.apache.org/jira/browse/FLINK-17073

Best
Yun Tang




From: Robert Metzger 
Sent: Friday, January 28, 2022 21:53
To: Paul Lam 
Cc: Yun Tang ; user 
Subject: Re: Inaccurate checkpoint trigger time

Hi Paul,

where are you storing your checkpoints, and what's their size?

IIRC, Flink won't trigger a new checkpoint before the old ones haven't been 
cleaned up, and if your checkpoints are large and stored on S3, it can take a 
while to clean them up (especially with the Hadoop S3 plugin, using presto s3 
is faster).




On Thu, Jan 27, 2022 at 10:56 AM Paul Lam 
mailto:paullin3...@gmail.com>> wrote:
Hi Yun,

Sorry for the late reply. I finally found some time to investigate this problem 
further. I upgraded the job to 1.14.0, but it’s still the same.

I’ve checked the debug logs, and I found that Zookeeper notifies watched event 
of checkpoint id changes very late [1]. Each time a checkpoint finished, it 
would take minutes before the Zookeeper client notices the checkpoint ID is 
changed.

I suspect the checkpoint coordinator is blocking on incrementing checkpoint ID 
on Zookeeper [2]. But with no luck, there’s no many relevant logs can help me 
prove that.

What do you think of this? Thanks a lot!

[1] https://gist.github.com/link3280/5072a054a43b40ba28891837a8fdf995
[2] 
https://github.com/apache/flink/blob/90e850301e672fc0da293abc55eb446f7ec68ffa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L743

Best,
Paul Lam

2021年11月23日 16:49,Paul Lam 
mailto:paullin3...@gmail.com>> 写道:

Hi Yun,

Thanks a lot for your pointers! I’ll try it out as you suggested and then get 
back to you.

Best,
Paul Lam

2021年11月23日 16:32,Yun Tang mailto:myas...@live.com>> 写道:

Hi Paul,

This is really weird, from what I know, flink-1.11.0 has a problem of handling 
min-pause time [1] and this should be resolved in flink-1.12.1.

Could you open the debug log level for org.apache.flink.runtime.checkpoint and 
use jmap or byteman to get the field value of 
CheckpointCoordinator#lastCheckpointCompletionRelativeTime, 
CheckpointRequestDecider#minPauseBetweenCheckpoints and 
SystemClock#relativeTimeMillis in method 
CheckpointRequestDecider#nextTriggerDelayMillis [2] to see any unexpected 
behavior.


[1] https://issues.apache.org/jira/browse/FLINK-18856
[2] 
https://github.com/apache/flink/blob/90e850301e672fc0da293abc55eb446f7ec68ffa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java#L182


Best
Yun Tang


From: Paul Lam mailto:paullin3...@gmail.com>>
Sent: Tuesday, November 23, 2021 14:35
To: user mailto:user@flink.apache.org>>
Subject: Inaccurate checkpoint trigger time

Hi,

Recently I’ve noticed a job has nondeterministic checkpoint trigger time.

The jobs is using Flink 1.12.1 with FsStateBackend and is of 650 parallelism. 
It was configured to trigger checkpoint every 150 seconds with 0 pause time and 
no concurrent checkpoints. However there’re obvious errors in the checkpoint 
trigger times, as the actual interval may vary from 30 seconds to 6 minutes.

The jobmanager logs are good, and no error logs is found. Some of the output 
are as follow:

2021-11-23 13:51:46,438 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 1446 for job f432b8d90859db54f7a79ff29a563ee4 (47142264825 bytes in 
22166 ms).
2021-11-23 13:57:21,021 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 1447 (type=CHECKPOINT) @ 1637647040653 for job 
f432b8d90859db54f7a79ff29a563ee4.
2021-11-23 13:57:43,761 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 1447 for job f432b8d90859db54f7a79ff29a563ee4 (46563195101 bytes in 
21813 ms).
2021-11-23 13:59:09,387 INFO 
org.apache.fli

Re: Reading performance - Kafka VS FileSystem

2022-01-26 Thread Yun Tang
Hi Jasmin,

>From my knowledge, it seems no big company would adopt pure file system source 
>as the main data source of Flink. We would in general choose a message queue, 
>e.g Kafka, as the data source.

Best
Yun Tang

From: Jasmin Redžepović 
Sent: Wednesday, January 26, 2022 23:13
To: user@flink.apache.org 
Subject: Re: Reading performance - Kafka VS FileSystem

Also, what would you recommend? I have both options available:

  *   Kafka - protobuf messages
  *   S3 - here are messages copied from kafka for persistence with Kafka 
Connect service

On 26.01.2022., at 14:43, Jasmin Redžepović 
mailto:jasmin.redzepo...@superbet.com>> wrote:

Hello Flink committers :)

Just one short question:
How is performance of reading from Kafka source compared to reading from 
FileSystem source? I would be very grateful if you could provide a short 
explanation.

I saw in documentation that both provide exactly-once semantics for streaming, 
but this sentence about FileSystem got me thinking about performance: “For any 
repeated enumeration, the SplitEnumerator filters out previously detected files 
and only sends new ones to the SourceReader.”  - is this filtering slowing down 
reading if there are more and more files?

p.s. I’m new to the Flink

Thanks for your help and Best regards,
Jasmin



This email is confidential and intended solely for the use of the individual or 
entity to whom it is addressed. If you received this e-mail by mistake, please 
notify the sender immediately by e-mail and delete this e-mail from your 
system. Please be informed that if you are not the intended recipient, you 
should not disseminate, distribute, disclose, copy or use this e-mail in any 
way, the act of dissemination, distribution, disclosure, copying or taking any 
action in reliance on the contents of this information being strictly 
prohibited. This e-mail is sent by a Superbet Group company. Any views 
expressed by the sender of this email are not necessarily those of Superbet 
Group. Please note that computer viruses can be transmitted by email. You are 
advised to check this email and any attachments for the presence of viruses. 
Superbet Group cannot accept any responsibility for any viruses transmitted by 
this email and/or any attachments.


Re: Failure Restart Strategy leads to error

2022-01-26 Thread Yun Tang
Hi Siddhesh,

The root cause is that the configuration of group.id is missing for the Flink 
program. The configuration of restart strategy has no relationship with this.

I think you should pay your attention to kafka related configurations.


Best
Yun Tang

From: Siddhesh Kalgaonkar 
Sent: Wednesday, January 26, 2022 3:17
To: user 
Subject: Failure Restart Strategy leads to error

I have Flink Kafka Consumer in place which works fine until I add the below 
lines:

env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // max failures per unit
Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
Time.of(10, TimeUnit.SECONDS) // delay
))

It gives me the below error stack trace:

DEBUG [flink-akka.actor.default-dispatcher-14] (JobMaster.java:1119) - Close 
ResourceManager connection 05d80aa9f3aca06faf7be80bbc8a0642.
org.apache.flink.util.FlinkException: Stopping JobMaster for job Flink Kafka 
Example(b425ae91bfb0e81980b878b3e4392137).
at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:400)
at 
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
DEBUG [flink-akka.actor.default-dispatcher-12] 
(DefaultJobLeaderIdService.java:148) - Remove job 
b425ae91bfb0e81980b878b3e4392137 from job leader id monitoring.
 INFO [flink-akka.actor.default-dispatcher-12] (ResourceManager.java:1047) - 
Disconnect job manager 
a95c280817468866d08c3230ecd0462f@akka://flink/user/rpc/jobmanager_3 for job 
b425ae91bfb0e81980b878b3e4392137 from the resource manager.
DEBUG [flink-akka.actor.default-dispatcher-12] (DefaultResourceTracker.java:80) 
- Initiating tracking of resources for job b425ae91bfb0e81980b878b3e4392137.
DEBUG [flink-akka.actor.default-dispatcher-12] (DefaultResourceTracker.java:60) 
- Stopping tracking of resources for job b425ae91bfb0e81980b878b3e4392137.
DEBUG [flink-akka.actor.default-dispatcher-14] (AkkaRpcActor.java:131) - The 
RpcEndpoint jobmanager_3 terminated successfully.
 INFO [flink-akka.actor.default-dispatcher-8] 
(DefaultJobLeaderService.java:136) - Stop job leader service.
 INFO [flink-akka.actor.default-dispatcher-8] 
(TaskExecutorLocalStateStoresManager.java:231) - Shutting down 
TaskExecutorLocalStateStoresManager.
DEBUG [flink-akka.actor.default-dispatcher-8] (IOManagerAsync.java:121) - 
Shutting down I/O manager.
Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.sca

Re: Is Scala the best language for Flink?

2022-01-24 Thread Yun Tang
Hi Sebastian,

If you are a Flink runtime developer, Flink already make the runtime code scala 
free [1] for maintenance concerns. If you are just a Flink user, I think both 
languages are fine.

[1] https://issues.apache.org/jira/browse/FLINK-14105
[FLINK-14105] Make flink-runtime scala-free - ASF 
JIRA<https://issues.apache.org/jira/browse/FLINK-14105>
As the consensus among our community(please link dedicated thread if there is) 
we keep in mind that flink-runtime will be eventually scala-free. It is because 
of ...
issues.apache.org

Best
Yun Tang

From: seb 
Sent: Monday, January 24, 2022 20:14
To: user@flink.apache.org 
Subject: Is Scala the best language for Flink?

Hi there,

I am getting started with Apache Flink. I am curious whether there is a clear 
winner between developing in either Scala or Java.

It sounds like Flink is typically slower to support new versions of Scala and 
that Java development might have fewer quirks.

What do you think? I have experience coding in Scala, but I am more than happy 
to learn Java.

Thanks in advance for sharing your thoughts!

Best,
Sebastian


Re: Question about MapState size

2022-01-23 Thread Yun Tang
Hi Abdul,

What does "only count pertaining to the specific key of partition" mean? The 
counting size is for the map related to a specific selected key or the all the 
maps in the whole map state?

You can leverage RocksDB's native metrics to monitor the rocksDB usage, such as 
total-sst-files-size[1] to know the total sst files on disks of each rocksDB.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-total-sst-files-size


Best
Yun Tang

From: Abdul Rahman 
Sent: Saturday, January 22, 2022 14:51
To: user@flink.apache.org 
Subject: Question about MapState size

Hello,

I have a streaming application that has an operator based on the
KeyedCoProcessFunction. The operator has a MapState object.  I store
some data in this operator with a fixed ttl. I would like to monitor
the size/count of this state over time since its related to some
operational metrics we want to track. Seems like a simple thing to do;
but I havent come up with a way to do so

Given that iterating over the complete map is an expensive operation,
I only plan to do so periodically.  The first issue is that , the
stream is keyed, so any time i do a count of the mapstate, i dont get
the complete size of the state object, but only count pertaining to
the specific key of partition. Is there a way around this ?

Secondly, is there a way to monitor rocksdb usage over time. I can
find managed memory metrics. but this does not include disk space
rocksdb used. is there a way to get this from standard flink metrics;
either task manager or job manager ?


Re: Apache Flink - Can AllWindowedStream be parallel ?

2022-01-23 Thread Yun Tang
Hi Singh,

All the output operator transformed by AllWindowedStream would be 
SingleOutputStreamOperator, which cannot be parallel.


[1] 
https://github.com/apache/flink/blob/master/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala

Best
Yun Tang

From: M Singh 
Sent: Sunday, January 23, 2022 4:24
To: User-Flink 
Subject: Apache Flink - Can AllWindowedStream be parallel ?

Hi Folks:

The documentation for AllWindowedStream 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#datastream-rarr-allwindowedstream)
 has a note:


This is in many cases a non-parallel transformation. All records will be 
gathered in one task for the windowAll operator.


Does this mean that in some cases it might be parallel ?  If so, is there an 
example of such a scenario ?

Thanks


Re: [DISCUSS] Deprecate MapR FS

2022-01-09 Thread Yun Tang
+1 for dropping the MapR Fs.

Best
Yun Tang

From: Till Rohrmann 
Sent: Wednesday, January 5, 2022 18:33
To: Martijn Visser 
Cc: David Morávek ; dev ; Seth Wiesman 
; User 
Subject: Re: [DISCUSS] Deprecate MapR FS

+1 for dropping the MapR FS.

Cheers,
Till

On Wed, Jan 5, 2022 at 10:11 AM Martijn Visser 
mailto:mart...@ververica.com>> wrote:
Hi everyone,

Thanks for your input. I've checked the MapR implementation and it has no 
annotation at all. Given the circumstances that we thought that MapR was 
already dropped, I would propose to immediately remove MapR in Flink 1.15 
instead of first marking it as deprecated and removing it in Flink 1.16.

Please let me know what you think.

Best regards,

Martijn

On Thu, 9 Dec 2021 at 17:27, David Morávek 
mailto:d...@apache.org>> wrote:
+1, agreed with Seth's reasoning. There has been no real activity in MapR FS 
module for years [1], so the eventual users should be good with using the jars 
from the older Flink versions for quite some time

[1] 
https://github.com/apache/flink/commits/master/flink-filesystems/flink-mapr-fs

Best,
D.

On Thu, Dec 9, 2021 at 4:28 PM Konstantin Knauf 
mailto:kna...@apache.org>> wrote:
+1 (what Seth said)

On Thu, Dec 9, 2021 at 4:15 PM Seth Wiesman 
mailto:sjwies...@gmail.com>> wrote:

> +1
>
> I actually thought we had already dropped this FS. If anyone is still
> relying on it in production, the file system abstraction in Flink has been
> incredibly stable over the years. They should be able to use the 1.14 MapR
> FS with later versions of Flink.
>
> Seth
>
> On Wed, Dec 8, 2021 at 10:03 AM Martijn Visser 
> mailto:mart...@ververica.com>>
> wrote:
>
>> Hi all,
>>
>> Flink supports multiple file systems [1] which includes MapR FS. MapR as
>> a company doesn't exist anymore since 2019, the technology and intellectual
>> property has been sold to Hewlett Packard.
>>
>> I don't think that there's anyone who's using MapR anymore and therefore
>> I think it would be good to deprecate this for Flink 1.15 and then remove
>> it in Flink 1.16. Removing this from Flink will slightly shrink the
>> codebase and CI runtime.
>>
>> I'm also cross posting this to the User mailing list, in case there's
>> still anyone who's using MapR.
>>
>> Best regards,
>>
>> Martijn
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/overview/
>>
>

--

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: Operator state in New Source API

2021-12-22 Thread Yun Tang
Hi Krzysztof,

Non-keyed operator state only supports list-like state [1] as there exist no 
primary key in operator state. That is to say you cannot use map state in 
source operator.


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#using-operator-state


Best,
Yun Tang

From: Krzysztof Chmielewski 
Sent: Thursday, December 23, 2021 6:32
To: user 
Subject: Operator state in New Source API

Hi,
Is it possible to use managed operator state like MapState in an implementation 
of new unified source interface [1]. I'm especially interested with using 
Managed State in SplitEnumerator implementation.

I have a use case that is a variation of File Source where I will have a great 
number of files that I need to process, for example a million. I know that 
FileSource maintains a collection of already processed paths in 
ContinuousFileSplitEnumerator object.

In my case I cannot afford to have all million Strings sitting on my heap. I'm 
hoping to use an operator state for this and build splits in batches, 
periodically adding new files to the alreadyProcessedPaths collection.

Regards,
Krzysztof Chmielewski


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/


Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Yun Tang
Great news!
Thanks for all the guys who contributed in this project.

Best
Yun Tang

On 2021/11/30 16:30:52 Till Rohrmann wrote:
> Great news, Yingjie. Thanks a lot for sharing this information with the
> community and kudos to all the contributors of the external shuffle service
> :-)
> 
> Cheers,
> Till
> 
> On Tue, Nov 30, 2021 at 2:32 PM Yingjie Cao  wrote:
> 
> > Hi dev & users,
> >
> > We are happy to announce the open source of remote shuffle project [1] for
> > Flink. The project is originated in Alibaba and the main motivation is to
> > improve batch data processing for both performance & stability and further
> > embrace cloud native. For more features about the project, please refer to
> > [1].
> >
> > Before going open source, the project has been used widely in production
> > and it behaves well on both stability and performance. We hope you enjoy
> > it. Collaborations and feedbacks are highly appreciated.
> >
> > Best,
> > Yingjie on behalf of all contributors
> >
> > [1] https://github.com/flink-extended/flink-remote-shuffle
> >
> 


Re: Will Flink loss some old Keyed State when changing the parallelism

2021-11-26 Thread Yun Tang
Hi Yang,

Flink keeps the max key groups the same no matter how parallelism changes, and 
use this to avoid state data lost [1]

[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html


Best
Yun Tang

On 2021/11/26 10:07:29 Nicolaus Weidner wrote:
> Hi,
> 
> to rescale, you should take a savepoint, stop the job, then restart from
> the savepoint with your new desired parallelism. This way, no data will be
> lost.
> 
> Best,
> Nico
> 
> On Thu, Nov 25, 2021 at 10:53 AM 杨浩  wrote:
> 
> > Will Flink loss some old Keyed State when changing the parallelism, like 2
> > -> 5, or 5->3?
> >
> >
> >
> >
> >
> >
> >
> 


Re: savepoint.readKeyedState hangs on org.rocksdb.RocksDB.disposeInternal

2021-10-31 Thread Yun Tang
Hi Mike,

Which version of Flink did you use? Could you try Flink-1.14 which enables 
logging of RocksDB [1][2] to see what reported in RocksDB log. From my 
experience, this is caused by waiting for resource (maybe column family) to 
close when closing the DB, and you should not meet this problem each time.


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#state-backend-rocksdb-log-dir
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#state-backend-rocksdb-log-level

Best
Yun Tang


From: Mike Barborak 
Sent: Monday, November 1, 2021 3:36
To: user@flink.apache.org 
Subject: savepoint.readKeyedState hangs on org.rocksdb.RocksDB.disposeInternal


Hi,



I am using the state processing API to examine a savepoint. My code works fine 
when I use a HashMapStateBackend but for larger savepoints, I don’t have enough 
memory so need to use a EmbeddedRocksDBStateBackend. Even then, I am able to 
process some smaller states but this one:



operatorID,parallelism,maxParallelism,coordinatorState (bytes),sub task 
states,total size (bytes)

6030185956219c0e7d5d37d16df14a69,1,128,(none),1,16201253369



…hangs with a thread stuck here:



org.rocksdb.RocksDB.disposeInternal(long) RocksDB.java (native)

org.rocksdb.RocksObject.disposeInternal() RocksObject.java:37

org.rocksdb.AbstractImmutableNativeReference.close() 
AbstractImmutableNativeReference.java:57

org.apache.flink.util.IOUtils.closeQuietly(AutoCloseable) IOUtils.java:275

org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose() 
RocksDBKeyedStateBackend.java:456

org.apache.flink.state.api.input.operator.StateReaderOperator.close() 
StateReaderOperator.java:120

org.apache.flink.state.api.input.KeyedStateInputFormat.close() 
KeyedStateInputFormat.java:206

org.apache.flink.runtime.operators.DataSourceTask.invoke() 
DataSourceTask.java:219

org.apache.flink.runtime.taskmanager.Task.doRun() Task.java:779

org.apache.flink.runtime.taskmanager.Task.run() Task.java:566

java.lang.Thread.run() Thread.java:829



I found this issue:



https://issues.apache.org/jira/browse/FLINK-20044



which seems to imply that an error has occurred but I don’t see any sign of an 
error in my logs. My code basically looks like this:



DataSource source = savepoint.readKeyedState("re-process-1", new 
ReFunctionStateReader());

…

source. writeAsFormattedText(…);

env.execute();



And below is how the logs end. Any suggestions as to what I might do to resolve 
this issue?



Thanks,

Mike



9281 INFO  [flink-akka.actor.default-dispatcher-3] 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - DataSink 
(TextOutputFormat (savepoint-re-fn-report.csv) - UTF-8) (1/1) 
(9373e99f41ebc27a485fdd3bb3496a1a) switched from DEPLOYING to INITIALIZING.

9281 INFO  [flink-akka.actor.default-dispatcher-3] 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - DataSink 
(TextOutputFormat (savepoint-re-fn-report.csv) - UTF-8) (1/1) 
(9373e99f41ebc27a485fdd3bb3496a1a) switched from INITIALIZING to RUNNING.

9291 WARN  [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:151)) (1/1)#0] 
org.apache.flink.metrics.MetricGroup [] - The operator name DataSource (at 
readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) exceeded the 80 
characters length limit and was truncated.

9291 WARN  [CHAIN DataSource (at runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:147)) (1/1)#0] 
org.apache.flink.metrics.MetricGroup [] - The operator name DataSource (at 
runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) exceeded the 80 
characters length limit and was truncated.

9299 INFO  [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:151)) (1/1)#0] 
org.apache.flink.runtime.io.disk.FileChannelManagerImpl [] - FileChannelManager 
uses directory 
/var/folders/sr/mwx4cq6s4qv_q_d6bgc5ptv8gn/T/flink-io-ae624c6f-0ec3-46e2-9b67-a1828a712280
 for spill files.

9308 INFO  [CHAIN DataSource (at runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:147)) (1/1)#0] 
org.apache.flink.runtime.taskmanager.Task [] - CHAIN DataSource (at 
runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:147)) (1/1)#0 
(9b8eb1c7a89c2b36a94049376c5dae6c) switched from RUNNING to FINISHED.

9308 INFO  [CHAIN DataSource (at runStateReport

Re: [ANNOUNCE] Apache Flink 1.13.3 released

2021-10-22 Thread Yun Tang
Thanks for Chesnay & Martijn and everyone who made this release happen.

Best
Yun Tang

From: JING ZHANG 
Sent: Friday, October 22, 2021 10:17
To: dev 
Cc: Martijn Visser ; Jingsong Li 
; Chesnay Schepler ; user 

Subject: Re: [ANNOUNCE] Apache Flink 1.13.3 released

Thank Chesnay, Martijn and every contributor for making this happen!


Thomas Weise mailto:t...@apache.org>> 于2021年10月22日周五 上午12:15写道:
Thanks for making the release happen!

On Thu, Oct 21, 2021 at 5:54 AM Leonard Xu 
mailto:xbjt...@gmail.com>> wrote:
>
> Thanks to Chesnay & Martijn and everyone who made this release happen.
>
>
> > 在 2021年10月21日,20:08,Martijn Visser 
> > mailto:mart...@ververica.com>> 写道:
> >
> > Thank you Chesnay, Leonard and all contributors!
> >
> > On Thu, 21 Oct 2021 at 13:40, Jingsong Li 
> > mailto:jingsongl...@gmail.com> 
> > <mailto:jingsongl...@gmail.com<mailto:jingsongl...@gmail.com>>> wrote:
> > Thanks, Chesnay & Martijn
> >
> > 1.13.3 really solves many problems.
> >
> > Best,
> > Jingsong
> >
> > On Thu, Oct 21, 2021 at 6:46 PM Konstantin Knauf 
> > mailto:kna...@apache.org> 
> > <mailto:kna...@apache.org<mailto:kna...@apache.org>>> wrote:
> > >
> > > Thank you, Chesnay & Martijn, for managing this release!
> > >
> > > On Thu, Oct 21, 2021 at 10:29 AM Chesnay Schepler 
> > > mailto:ches...@apache.org> 
> > > <mailto:ches...@apache.org<mailto:ches...@apache.org>>>
> > > wrote:
> > >
> > > > The Apache Flink community is very happy to announce the release of
> > > > Apache Flink 1.13.3, which is the third bugfix release for the Apache
> > > > Flink 1.13 series.
> > > >
> > > > Apache Flink® is an open-source stream processing framework for
> > > > distributed, high-performing, always-available, and accurate data
> > > > streaming applications.
> > > >
> > > > The release is available for download at:
> > > > https://flink.apache.org/downloads.html 
> > > > <https://flink.apache.org/downloads.html>
> > > >
> > > > Please check out the release blog post for an overview of the
> > > > improvements for this bugfix release:
> > > > https://flink.apache.org/news/2021/10/19/release-1.13.3.html 
> > > > <https://flink.apache.org/news/2021/10/19/release-1.13.3.html>
> > > >
> > > > The full release notes are available in Jira:
> > > >
> > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350329
> > > >  
> > > > <https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350329>
> > > >
> > > > We would like to thank all contributors of the Apache Flink community
> > > > who made this release possible!
> > > >
> > > > Regards,
> > > > Chesnay
> > > >
> > > >
> > >
> > > --
> > >
> > > Konstantin Knauf
> > >
> > > https://twitter.com/snntrable <https://twitter.com/snntrable>
> > >
> > > https://github.com/knaufk <https://github.com/knaufk>
> >
> >
> >
> > --
> > Best, Jingsong Lee
>


Re: Reset of transient variables in state to default values.

2021-10-20 Thread Yun Tang
Hi,

For RocksDB state backend, it will pick the registered kryo serializer for 
normal read/write use and checkpint/restore. Moreover, since key-values are 
serialized to store in RocksDB, it actually deep copy them to avoid later 
unexpected modification.

For FileSystem/HashMap state backend, it will pick the registered kryo 
serializer only for checkpoint/restore. Since java-based state backend would 
not deep copy key-values for performance reasons, it might be changed 
unexpectedly if user misused, which might make the field reset to default value.

Best,
Yun Tang

From: Arvid Heise 
Sent: Monday, October 18, 2021 20:30
To: Alex Drobinsky 
Cc: JING ZHANG ; Yun Tang ; User-Flink 

Subject: Re: Reset of transient variables in state to default values.

That's what I would try out, but I'm not sure if the statebackend would pick 
that up. @Yun Tang<mailto:myas...@live.com> do you know more?

On Mon, Oct 18, 2021 at 9:37 AM Alex Drobinsky 
mailto:alex.drobin...@gmail.com>> wrote:
Hi Arvid,

It sounds like a good direction, do I need to register my state class with 
KryoSerializer , similar to this ?

env.getConfig().registerTypeWithKryoSerializer(IPSessionOrganizer.proto.SourceOutput.class,
 ProtobufSerializer.class);


пн, 18 окт. 2021 г. в 10:32, Arvid Heise 
mailto:ar...@apache.org>>:
Hi Alex,

could you also log the identifity hashcode (or something similar) of the 
related instance? I suspect that it's not the field that is set to null but 
that you get a clone where the field is null. In that case, you need to add a 
specific KryoSerializer to initialize it (or just go with a lazy access pattern 
all the way).

On Tue, Oct 12, 2021 at 2:55 PM Alex Drobinsky 
mailto:alex.drobin...@gmail.com>> wrote:
Hi Jing,

Job doesn't restart from the checkpoint, it's a brand new clean job , no 
exceptions happened during execution, no restarts :)
The state is a Keyed State so a new key means a new State - in this situation a 
currentFile is equal to null - as expected and handled without issues.
Before I even thought to inquire about my questions, the first thing I did - I 
added log messages with the value of currentFile in any place it could be 
changed.
So I checked that before I release my control to Flink, currentFile has the 
correct value and after I receive value from state in the next iteration it's 
set to null.
The checkpoints by themselves could be irrelevant to the problem, the only 
indication of connection is my assumption based on observation that the 
interval between first event and first occurrence of nullification is exactly 
the same as the checkpoint interval.

Yun Tang - you are correct, it's a KryoSerializer, if I remove the "transient" 
qualifier from currentFile, it crashes inside of KryoSerializer because 
RandomAccessFile isn't serializable.
Which also points to the fact that at least once serialization was actually 
executed. I will try an alternative approach - I will add my own writeObject 
implementation, it should work :)

Best regards,
Alex






вт, 12 окт. 2021 г. в 15:07, JING ZHANG 
mailto:beyond1...@gmail.com>>:
Hi Alex,
Since you use `FileSystemStateBackend`, I think currentFile became nullified 
once in a while is not caused by period checkpoint.

Because if job is running without failover or restore from checkpoint, 
read/write value state on `FileSystemStateBackend` does not cause serialization 
and deserialization at all. I have already simplify your coding and verify this 
point. If you are interested, you can find this simplified code in the 
attachment of the email.

There are some possible reasons come to my mind, I hope this helps.
1. Does job restore from checkpoint/savepoint? This may caused by failover or 
user trigger stop-with-savepoint.
2. If job does not restore from checkpoint or savepoint.
 2.1 After read the MultiStorePacketState from ValueState, is there 
somewhere in your program to update the currentFile field to null again? 
Because the state stored in heap,  it may be changed if program updates its 
value somewhere.
 2.2 When the currentFile became null, is there any possible that current 
key never appear before? that is it's the first time that the current key 
appears, so get state would return default value(a new MultiStorePacketState 
instance with null currentFile)

Best,
JING ZHANG

Yun Tang mailto:myas...@live.com>> 于2021年10月12日周二 下午4:41写道:
Hi Alex,

Since you use customized MultiStorePacketState class as the value state type, 
it should use kryo serializer [1] to serialize your class via accessing RocksDB 
state or checkpoint via FileSystemStateBackend, and I don't know whether Kryo 
would serialize your transient field.
If you're not familiar with Flink's serialization stack, I think you could 
check behaviors below:

  1.  Without any checkpoint restore, use FileSystemStateBackend to see wh

Re: Reset of transient variables in state to default values.

2021-10-12 Thread Yun Tang
Hi Alex,

Since you use customized MultiStorePacketState class as the value state type, 
it should use kryo serializer [1] to serialize your class via accessing RocksDB 
state or checkpoint via FileSystemStateBackend, and I don't know whether Kryo 
would serialize your transient field.
If you're not familiar with Flink's serialization stack, I think you could 
check behaviors below:

  1.  Without any checkpoint restore, use FileSystemStateBackend to see whether 
the transient field could be read as expected, the answer should be yes.
  2.  After restoring from checkpoint, check whether could read the transient 
field back if using FileSystemStateBackend.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#flinks-typeinformation-class

Best
Yun Tang



From: Alex Drobinsky 
Sent: Monday, October 11, 2021 22:37
To: JING ZHANG 
Cc: User-Flink 
Subject: Re: Reset of transient variables in state to default values.

It would be difficult to provide even a semblance of the complete product , 
however I could try to provide enough details to reproduce the problem.
Standard source would do:

DataStream stream = env.addSource(
new FlinkKafkaConsumer<>(topic, new 
AbstractDeserializationSchema() {
@Override
public byte[] deserialize(byte[] bytes) throws IOException {
return bytes;
}
}, properties)).name(topic);

The operator body something like:


public class MultiStorePacketFunction extends KeyedProcessFunction implements Serializable {
   private transient ValueState state;

   @Override
   public void processElement(SplitterToMultiStore packet, Context ctx, 
Collector out) throws Exception {
  if (packet.hasPackets()) {
 storedPackets.inc(packet.getPackets().getPacketsCount());
  }

  MultiStorePacketState so = state.value();
  if (process(packet, out, so, ctx)) {
 state.update(null);
 state.clear();
  } else {
 state.update(so);
  }
   }

public String generateNextFilename(String sessionKey, int partNumber) {
  String path = DirectoryService.getInstance().bookDirectory();
  return path + File.separator + sessionKey + "-" + partNumber + ".pcap";
   }

   private void storeContent(Collector collector, 
MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
  assert (packets != null);
  assert (packets.hasPackets());

  if ( state.currentFile == null) {
 openFile(collector, state, packets);
  }

  Utils.continueWriteToPcap(state.currentFile, 
packets.getPackets().getPacketsList());
  state.fileOffset = state.currentFile.length();

  tryToCloseFile(collector, state);
   }

   static public String extractExportedFileName(String fileName) {
  String path[] = fileName.split("/");
  return path[path.length - 2] + "/" + path[path.length - 1];
   }

   private void openFile(Collector collector, 
MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
  state.fileIsOpened = true;
  state.fileName = generateNextFilename(state.sessionKey, state.partNumber);
  state.exportedFileName = extractExportedFileName(state.fileName);

// -> Here RandomAccessFile created
  state.currentFile = Utils.startWriteToPcap(state.fileName, 
packets.getPackets().getPacketsList());
  state.fileOffset = state.currentFile.length();
  state.partNumber++;
   }

   private void tryToCloseFile(Collector collector, 
MultiStorePacketState state) throws IOException {
  if (state.currentFile.length() < 
StorePacketConfigurationParameters.partSizeLimit) {
 return;
  }
  closeFile(collector, state);
   }

   private void closeFile(Collector collector, 
MultiStorePacketState state) throws IOException {
  state.currentFile.close();
  state.currentFile = null;
  state.fileIsOpened = false;
  ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
  outputBuilder.getUsualBuilder().setFileName(state.exportedFileName);
  outputBuilder.setSessionType(SessionType.Multi);
  outputBuilder.setSessionKey(state.sessionKey);
  var classifierOutput = outputBuilder.build();
  state.sessionMetadata.add(classifierOutput);
  collector.collect(classifierOutput);
   }

public boolean process(SplitterToMultiStore packet, 
Collector collector, MultiStorePacketState so, Context 
context) throws Exception {

  // First message
  if (packet.hasClassificationResult()) {
 sendClassificationResult(packet, collector, so);
 return false;
  }

  // Last message
  if (packet.hasSessionClosure()) {
 if (so.isCoverageIncorrect) {
return true;
 }
 handleSessionClosure(packet, collector, so, context);
 return true;

Re: Checkpoint size increasing even i enable increasemental checkpoint

2021-10-12 Thread Yun Tang
Hi Lei,

RocksDB state-backend's checkpoint is composited by RocksDB's own files 
(unmodified compressed SST format files) and incremental checkpoints means 
Flink does not upload files which were uploaded before. As you can see, 
incremental checkpoints highly depend on the RocksDB's own mechanism to remove 
useless files, which is triggered by internal compaction. You should not care 
too much on the checkpointed data size as your job consuming more and more 
records, moreover the increasing size is actually quite small (from 1.32GB to 
1.34GB).

Best
Yun Tang




From: Lei Wang 
Sent: Monday, October 11, 2021 16:16
To: user 
Subject: Checkpoint size increasing even i enable increasemental checkpoint


[image.png]

The  checkpointed data size became bigger and bigger and the node cpu is very 
high when the job is doing checkpointing.
 But I have enabled incremental checkpointing:  env.setStateBackend(new 
RocksDBStateBackend(checkpointDir, true));

I am using flink-1.11.2 and aliyun oss as checkpoint storage.


Any insight on this?

Thanks,

Lei



Re: Cleaning old incremental checkpoint files

2021-09-17 Thread Yun Tang
Hi Robin,

You could use Checkpoints#loadCheckpointMetadata[1] to analysis the checkpoint 
meta data.

For the problem of make checkpoint self-contained, you might be interested in 
the ticket [2]


[1] 
https://github.com/apache/flink/blob/8debdd06be0e917610c50a77893f7ade45cee98f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99
[2] https://issues.apache.org/jira/browse/FLINK-24149

Best
Yun Tang

From: Robin Cassan 
Sent: Tuesday, September 7, 2021 20:17
To: Yun Tang 
Cc: Robert Metzger ; user 
Subject: Re: Cleaning old incremental checkpoint files

Hey Yun, thanks for the answer!

How would you analyze the checkpoint metadata? Would you build a program with 
the State Processor API library, or is there a better way to do it?
I believe the option you mention would indeed facilitate cleaning, it would 
still be manual (because we can't set a periodic deletion) but at least we can 
safely remove old folders with this option

Thanks,
Robin

Le ven. 3 sept. 2021 à 18:21, Yun Tang 
mailto:myas...@live.com>> a écrit :
Hi Robin,

It's not easy to clean incremental checkpoints as different job instances have 
different checkpoint sub-directory (due to different job id). You could 
analysis your checkpoint metadata to see what files are still useful in older 
checkpoint directory.

BTW, I also think of a possible solution to provide the ability to re-upload 
all files under some specific configured option so that we could let new job 
get decoupled with older checkpoints. Do you think that could resolve your case?

Best
Yun Tang

From: Robin Cassan 
mailto:robin.cas...@contentsquare.com>>
Sent: Wednesday, September 1, 2021 17:38
To: Robert Metzger mailto:rmetz...@apache.org>>
Cc: user mailto:user@flink.apache.org>>
Subject: Re: Cleaning old incremental checkpoint files

Thanks Robert for your answer, this seems to be what we observed when we tried 
to delete the first time: Flink complained about missing files.
I'm wondering then how are people cleaning their storage for incremental 
checkpoints? Is there any guarantee when using TTLs that after the TTL has 
expired, no more file older than the TTL will be needed in the shared folder?

Le mar. 3 août 2021 à 13:29, Robert Metzger 
mailto:rmetz...@apache.org>> a écrit :
Hi Robin,

Let's say you have two checkpoints #1 and #2, where #1 has been created by an 
old version or your job, and #2 has been created by the new version.
When can you delete #1?
In #1, there's a directory "/shared" that contains data that is also used by 
#2, because of the incremental nature of the checkpoints.

You can not delete the data in the /shared directory, as this data is 
potentially still in use.

I know this is only a partial answer to your question. I'll try to find out 
more details and extend my answer later.


On Thu, Jul 29, 2021 at 2:31 PM Robin Cassan 
mailto:robin.cas...@contentsquare.com>> wrote:
Hi all!

We've happily been running a Flink job in production for a year now, with the 
RocksDB state backend and incremental retained checkpointing on S3. We often 
release new versions of our jobs, which means we cancel the running one and 
submit another while restoring the previous jobId's last retained checkpoint.

This works fine, but we also need to clean old files from S3 which are starting 
to pile up. We are wondering two things:
- once the newer job has restored the older job's checkpoint, is it safe to 
delete it? Or will the newer job's checkpoints reference files from the older 
job, in which case deleting the old checkpoints might cause errors during the 
next restore?
- also, since all our state has a 7 days TTL, is it safe to set a 7 or 8 days 
retention policy on S3 which would automatically clean old files, or could we 
still need to retain files older than 7 days even with the TTL?

Don't hesitate to ask me if anything is not clear enough!

Thanks,
Robin


Re: State processor API very slow reading a keyed state with RocksDB

2021-09-09 Thread Yun Tang
Hi David,

I think Seth had shared some useful information.

If you want to know what happened within RocksDB when you're reading, you can 
leverage async-profiler [1] to catch the RocksDB stacks and I guess that index 
block might be evicted too frequently during your read. And we could use new 
read option which disable fillCache [2] to speedup bulk scan in the future to 
help improve the performance.


Best
Yun Tang

[1] https://github.com/jvm-profiling-tools/async-profiler
[2] 
https://javadoc.io/doc/org.rocksdb/rocksdbjni/6.20.3/org/rocksdb/ReadOptions.html#setFillCache(boolean)

From: Seth Wiesman 
Sent: Friday, September 10, 2021 0:58
To: David Causse ; user 
Cc: Piotr Nowojski 
Subject: Re: State processor API very slow reading a keyed state with RocksDB

Hi David,

I was also able to reproduce the behavior, but was able to get significant 
performance improvements by reducing the number of slots on each TM to 1.

My suspicion, as Piotr alluded to, has to do with the different runtime 
execution of DataSet over DataStream. In particular, Flink's DataStream 
operators are aware of the resource requirements of the state backend and 
include RocksDB in its internal memory configurations. In the state processor 
api, the underlying input format is a blackbox.

Another thing to know is that when running multiple RocksDB instances within 
the same JVM, you are actually running a single native process with multiple 
logical instances. I _think_ we are seeing contention amongst the logical 
RocksDB instances.

Even with one slot, it is not as fast as I would like and will need to continue 
investigating. If my suspicion for the slowness is correct, we will need to 
migrate to the new Source API and improve this as part of DataStream 
integration. This migration is something we'd like to do regardless, but I 
don't have a timeline to share.

Aside: Why is writing still relatively fast?

Even with these factors accounted for, I do still expect writing to be faster 
than reading. This is due to both how RocksDB internal data structures work, 
along with some peculiarities of how to state processor API has to perform 
reads.

1. RocksDB internally uses a data structure called a log structured merge tree 
(or LSM). This means writes are always implemented as appends, so there is no 
seek required. Additionally, writes go into an in-memory data structure called 
a MemTable that is flushed to disk asynchronously.  Because there may be 
multiple entries for a given key, RocksDB needs to search for the most recent 
value and potentially read from disk. This may be alleviated by enabling bloom 
filters but does have memory costs.

2. RocksDB is a key value store, so Flink represents each registered state 
(ValueState, ListState, etc) as its own column family (table). A key only 
exists in a table if it has a non-null value. This means not all keys exist in 
all column families for a given operator. The state-proc-api wants to make it 
appear as if each operator is composed of a single table with multiple columns. 
To do this, we perform a full table scan on one column family and then do point 
lookups of that key on the others. However, we still need to find the keys that 
may only exist in other tables. The trick we perform is to delete keys from 
rocksDB after each read, so we can do full table scans on all column families 
but never see any duplicates. This means the reader is performing multiple 
reads and writes on every call to `readKey` and is more expensive than it may 
appear.

Seth


On Thu, Sep 9, 2021 at 1:48 AM Piotr Nowojski 
mailto:pnowoj...@apache.org>> wrote:
Hi David,

I can confirm that I'm able to reproduce this behaviour. I've tried 
profiling/flame graphs and I was not able to make much sense out of those 
results. There are no IO/Memory bottlenecks that I could notice, it looks 
indeed like the Job is stuck inside RocksDB itself. This might be an issue with 
for example memory configuration. Streaming jobs and State Processor API are 
running in very different environments as the latter one is using DataSet API 
under the hood, so maybe that can explain this? However I'm no expert in 
neither DataSet API nor the RocksDB, so it's hard for me to make progress here.

Maybe someone else can help here?

Piotrek


śr., 8 wrz 2021 o 14:45 David Causse 
mailto:dcau...@wikimedia.org>> napisał(a):
Hi,

I'm investigating why a job we use to inspect a flink state is a lot slower 
than the bootstrap job used to generate it.

I use RocksdbDB with a simple keyed value state mapping a string key to a long 
value. Generating the bootstrap state from a CSV file with 100M entries takes a 
couple minutes over 12 slots spread over 3 TM (4Gb allowed). But another job 
that does the opposite (converts this state into a CSV file) takes several 
hours. I would have expected these two job runtimes to be in the same ballpark.

I w

Re: Cleaning old incremental checkpoint files

2021-09-03 Thread Yun Tang
Hi Robin,

It's not easy to clean incremental checkpoints as different job instances have 
different checkpoint sub-directory (due to different job id). You could 
analysis your checkpoint metadata to see what files are still useful in older 
checkpoint directory.

BTW, I also think of a possible solution to provide the ability to re-upload 
all files under some specific configured option so that we could let new job 
get decoupled with older checkpoints. Do you think that could resolve your case?

Best
Yun Tang

From: Robin Cassan 
Sent: Wednesday, September 1, 2021 17:38
To: Robert Metzger 
Cc: user 
Subject: Re: Cleaning old incremental checkpoint files

Thanks Robert for your answer, this seems to be what we observed when we tried 
to delete the first time: Flink complained about missing files.
I'm wondering then how are people cleaning their storage for incremental 
checkpoints? Is there any guarantee when using TTLs that after the TTL has 
expired, no more file older than the TTL will be needed in the shared folder?

Le mar. 3 août 2021 à 13:29, Robert Metzger 
mailto:rmetz...@apache.org>> a écrit :
Hi Robin,

Let's say you have two checkpoints #1 and #2, where #1 has been created by an 
old version or your job, and #2 has been created by the new version.
When can you delete #1?
In #1, there's a directory "/shared" that contains data that is also used by 
#2, because of the incremental nature of the checkpoints.

You can not delete the data in the /shared directory, as this data is 
potentially still in use.

I know this is only a partial answer to your question. I'll try to find out 
more details and extend my answer later.


On Thu, Jul 29, 2021 at 2:31 PM Robin Cassan 
mailto:robin.cas...@contentsquare.com>> wrote:
Hi all!

We've happily been running a Flink job in production for a year now, with the 
RocksDB state backend and incremental retained checkpointing on S3. We often 
release new versions of our jobs, which means we cancel the running one and 
submit another while restoring the previous jobId's last retained checkpoint.

This works fine, but we also need to clean old files from S3 which are starting 
to pile up. We are wondering two things:
- once the newer job has restored the older job's checkpoint, is it safe to 
delete it? Or will the newer job's checkpoints reference files from the older 
job, in which case deleting the old checkpoints might cause errors during the 
next restore?
- also, since all our state has a 7 days TTL, is it safe to set a 7 or 8 days 
retention policy on S3 which would automatically clean old files, or could we 
still need to retain files older than 7 days even with the TTL?

Don't hesitate to ask me if anything is not clear enough!

Thanks,
Robin


Re: JobManager Resident memory Always Increasing

2021-08-15 Thread Yun Tang
Hi Pranjul,

First of all, you adopted on-heap state backend: HashMapStateBackend, which 
would not use native off-heap memory. Moreover, JobManager would not initialize 
any keyed state backend instance. And if not enable high availability, 
JobManagerCheckpointStorage would also not use direct memory to write 
checkpoint stream out. Did you use some third-party native library?

You could use native memory tracking [1] to see whether JVM used too much 
overhead native memory. And use memory allocator analysis tool such as JeMalloc 
[2] to see whether existed unexpected native memory usage.


[1] https://shipilev.net/jvm/anatomy-quarks/12-native-memory-tracking/
[2] https://gist.github.com/thomasdarimont/79b3cef01e5786210309

Best
Yun Tang

From: Pranjul Ahuja 
Sent: Monday, August 16, 2021 13:10
To: user@flink.apache.org 
Subject: JobManager Resident memory Always Increasing

Hi,

We are running the JobManager container with 1024GB out of which 512MB is 
allocated to the heap. We observe that the JobManager container's resident 
memory is always increasing and it never comes down. Heap usage remains to be 
constant and not rising abruptly. Can anyone help here where else this memory 
is going?

Statebackend Used - HashMapStateBackend
Checkpoint Storage - JobManagerCheckpointStorage




Re: Inspecting SST state of rocksdb

2021-08-09 Thread Yun Tang
Hi Kai,

Since the stored key and value in RocksDB are serialized bytes, it's not easy 
to read directly. You could consider to use State Processor API [1]. However, 
this needs you to know what the state name of the SQL operator. You could 
analysis the checkpoing  _metadata [2] to know state names.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/#state-processor-api
[2] 
https://github.com/apache/flink/blob/0d2b945729df8f0a0149d02ca24633ae52a1ef21/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99

Best
Yun Tang

From: Kai Fu 
Sent: Monday, August 9, 2021 13:01
To: user 
Subject: Inspecting SST state of rocksdb

Hi team,

I'm trying to inspect SST files of flink's state with sst related tools like 
sst_dump, ldb in 
wiki<https://github.com/facebook/rocksdb/wiki/Administration-and-Data-Access-Tool>.
 But it seems I'm getting meaningless results as shown below. The tools I'm 
using are from RocksDB's trunk and built from source. Am I doing it the right 
way, or is there any alternative to inspect the state? We're aware of Flink's 
queryable state, while it seems not well supported for SQL generated operators.

$ ./sst_dump --file=../db/30.sst --command=scan --read_num=50
options.env is 0xba33e0
Process ../db/30.sst
Sst file format: block-based
from [] to []
'� =>
'� =>
'� =>
'� =>
'� =>
'� =>
'� =>
'� =>
'� =>
'� =>
'� =>

--
Best wishes,
- Kai


[ANNOUNCE] Apache Flink 1.13.2 released

2021-08-05 Thread Yun Tang

The Apache Flink community is very happy to announce the release of Apache 
Flink 1.13.2, which is the second bugfix release for the Apache Flink 1.13 
series.

Apache Flink® is an open-source stream processing framework for distributed, 
high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for 
this bugfix release:
https://flink.apache.org/news/2021/08/06/release-1.13.2.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350218&styleName=&projectId=12315522

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Regards,
Yun Tang


Re: Best Practice of Using HashSet State

2021-08-05 Thread Yun Tang
Hi Jerome,

The type of value, list and map means that the structure of value to the 
primary key. I am not sure what the set structure you mean here, if you want to 
let the value as a set, and you can just leverage map state. As you might know, 
java actually use HashMap to implement the HashSet.

Best
Yun Tang

From: Jerome Li 
Sent: Friday, August 6, 2021 7:57
To: user@flink.apache.org 
Subject: Best Practice of Using HashSet State


Hi,



I am new to Flink and state backend. I find Flink does provide ValueState, 
ListState, and MapState. But it does not provide State object for HashSet. What 
is the best practice of storing HashSet State in Flink? Should we use 
ValueState and set the value to be HashSet? Or should we use ListState and 
implement a wrapper class for serializing and desterilizing the HashSet to List?



Any help would be appreciated!



Best,

Jerome


Re: Bloom Filter - RocksDB - LinkageError Classloading

2021-08-05 Thread Yun Tang
Hi Sandeep,

If you set the flink-statebackend-rocksdb as provided scope, it should not 
include the org.rocksdb classes, have you ever checked your application jar 
package directly just as what I described?


Best
Yun Tang

From: Sandeep khanzode 
Sent: Friday, August 6, 2021 2:04
To: Stephan Ewen 
Cc: user ; Yun Tang 
Subject: Re: Bloom Filter - RocksDB - LinkageError Classloading

Hello Stephan, Yun,

Thanks for your insights.

All I have added is this:


org.apache.flink
flink-statebackend-rocksdb_2.12
${flink.version}
provided


No other library explicitly added. I am assuming, as mentioned, is that the 
flink-dist.jar already contains the relevant classes and the App or parent 
class loader loads the Rocks DB classes. All other Flink dependencies are 
packaged as Maven - Provided.

Moving to parent-first gives the Spring Framework serialisation issues … I will 
take a look at that …

I thought it would be simpler to simply specify Bloom Filters as an option …

Maybe, I will have to remove Spring dependency …


Thanks,
Sandip



On 05-Aug-2021, at 5:55 PM, Stephan Ewen 
mailto:se...@apache.org>> wrote:

@Yun Tang

Our FRocksDB has the same java package names (org.rocksdb.). Adding 
'org.rocksdb' to parent-first patterns ensures it will be loaded only once, and 
not accidentally multiple times (as Child-first classloading does).

The RocksDB code here is a bit like Flink internal components, which we always 
force parent-first (everything that starts with "org.apache.fink.").

To use RocksDB from the application jar, I think you would need to remove the 
RocksDB state backend from the classpath (lib folder), or you get exactly the 
error reported above.

I cannot think of a downside to add RocksDB to the parent first patterns.

On Thu, Aug 5, 2021 at 10:04 AM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Stephan,

Since we use our own FRocksDB instead of the original RocksDB as dependency, I 
am not sure whether this problem has relationship with this. From my knowledge, 
more customers would include Flink classes within the application jar package, 
and it might cause problems if the client has different flink version with 
servers.


Best,
Yun Tang

From: Stephan Ewen mailto:se...@apache.org>>
Sent: Wednesday, August 4, 2021 19:10
To: Yun Tang mailto:myas...@live.com>>
Cc: Sandeep khanzode mailto:sand...@shiftright.ai>>; 
user mailto:user@flink.apache.org>>
Subject: Re: Bloom Filter - RocksDB - LinkageError Classloading

@Yun Tang Does it make sense to add RocksDB to the "always parent-first 
options" to avoid these kind of errors when users package apps incorrectly?
My feeling is that these packaging errors occur very frequently.


On Wed, Aug 4, 2021 at 10:41 AM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Sandeep,

Did you include the RocksDB classes in the application jar package? You can 
unpark your jar package to check whether them existed.
If so, since RocksDB classes are already included in the flink-dist package, 
you don't need to include them in your jar package (maybe you explicitly added 
the dependency of org.rocksdb:rocksdbjni in your pom).

Best
Yun Tang

From: Sandeep khanzode mailto:sand...@shiftright.ai>>
Sent: Wednesday, August 4, 2021 11:54
To: user mailto:user@flink.apache.org>>
Subject: Bloom Filter - RocksDB - LinkageError Classloading

Hello,

I tried to add the bloom filter functionality as mentioned here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.html


 rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() {

public DBOptions createDBOptions(DBOptions currentOptions, 
Collection handlesToClose) {
return currentOptions.setMaxOpenFiles(1024);
}

public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions, Collection handlesToClose) {
BloomFilter bloomFilter = new BloomFilter();
handlesToClose.add(bloomFilter);

return currentOptions
.setTableFormatConfig(
new 
BlockBasedTableConfig().setFilter(bloomFilter));
}
 });

This is in the main class where we setup in the StreamExecutionEnvironment …

I get ClassLoading errors due to that ...

Caused by: java.lang.LinkageError: loader constraint violation: loader 
org.apache.flink.util.ChildFirstClassLoader @1169afe1 wants to load class 
org.rocksdb.ColumnFamilyOptions. A different class with the same name was 
previously loaded by 'app'. (org.rocksdb.ColumnFamilyOptions is in unnamed 
module of loader 'app')


What is documented is to change the order to parent-first in the 
flink-co

Re: Bloom Filter - RocksDB - LinkageError Classloading

2021-08-05 Thread Yun Tang
Hi Stephan,

Since we use our own FRocksDB instead of the original RocksDB as dependency, I 
am not sure whether this problem has relationship with this. From my knowledge, 
more customers would include Flink classes within the application jar package, 
and it might cause problems if the client has different flink version with 
servers.


Best,
Yun Tang

From: Stephan Ewen 
Sent: Wednesday, August 4, 2021 19:10
To: Yun Tang 
Cc: Sandeep khanzode ; user 
Subject: Re: Bloom Filter - RocksDB - LinkageError Classloading

@Yun Tang Does it make sense to add RocksDB to the "always parent-first 
options" to avoid these kind of errors when users package apps incorrectly?
My feeling is that these packaging errors occur very frequently.


On Wed, Aug 4, 2021 at 10:41 AM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Sandeep,

Did you include the RocksDB classes in the application jar package? You can 
unpark your jar package to check whether them existed.
If so, since RocksDB classes are already included in the flink-dist package, 
you don't need to include them in your jar package (maybe you explicitly added 
the dependency of org.rocksdb:rocksdbjni in your pom).

Best
Yun Tang

From: Sandeep khanzode mailto:sand...@shiftright.ai>>
Sent: Wednesday, August 4, 2021 11:54
To: user mailto:user@flink.apache.org>>
Subject: Bloom Filter - RocksDB - LinkageError Classloading

Hello,

I tried to add the bloom filter functionality as mentioned here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.html


 rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() {

public DBOptions createDBOptions(DBOptions currentOptions, 
Collection handlesToClose) {
return currentOptions.setMaxOpenFiles(1024);
}

public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions, Collection handlesToClose) {
BloomFilter bloomFilter = new BloomFilter();
handlesToClose.add(bloomFilter);

return currentOptions
.setTableFormatConfig(
new 
BlockBasedTableConfig().setFilter(bloomFilter));
}
 });

This is in the main class where we setup in the StreamExecutionEnvironment …

I get ClassLoading errors due to that ...

Caused by: java.lang.LinkageError: loader constraint violation: loader 
org.apache.flink.util.ChildFirstClassLoader @1169afe1 wants to load class 
org.rocksdb.ColumnFamilyOptions. A different class with the same name was 
previously loaded by 'app'. (org.rocksdb.ColumnFamilyOptions is in unnamed 
module of loader 'app')


What is documented is to change the order to parent-first in the 
flink-conf.yaml … but then I get different issues for the basic/core Spring 
Framework classes not being serializable …

Any help will be appreciated.

Thanks,
Sandip


Re: [ANNOUNCE] RocksDB Version Upgrade and Performance

2021-08-05 Thread Yun Tang
Hi Piotr,


  1.  Can we push for better benchmark coverage in the RocksDB project in the 
future?
  2.  Sure, I think we could contribute what we did in flink-benchmarks to 
improve their JMH benchmark [1]. And I will ask them how often will they run 
the benchmark.

  1.  Can we try to catch this kind of problems with RocksDB earlier? For 
example with more frequent RocksDB upgrades, or building test flink builds with 
the most recent RocksDB version to run our benchmarks and validate newer 
RocksDB versions?
  2.  I think this advice could make sense. Apart from releasing our own 
FRocksDB every time, which would take several days to build and distribute 
across maven repos. I prefer to use pre-built official RocksDB jar package and 
we need to maintain a Flink branch which does not contain the FRocksDB TTL 
feature so that the official RocksDB jar package could run directly.

[1] https://github.com/facebook/rocksdb/tree/master/java/jmh

Best,
Yun Tang

From: Piotr Nowojski 
Sent: Thursday, August 5, 2021 2:01
To: Yuval Itzchakov 
Cc: Yun Tang ; Nico Kruber ; 
user@flink.apache.org ; dev 
Subject: Re: [ANNOUNCE] RocksDB Version Upgrade and Performance

Thanks for the detailed explanation Yun Tang and clearly all of the effort you 
have put into it. Based on what was described here I would also vote for going 
forward with the upgrade.

It's a pity that this regression wasn't caught in the RocksDB community. I 
would have two questions/ideas:
1. Can we push for better benchmark coverage in the RocksDB project in the 
future?
2. Can we try to catch this kind of problems with RocksDB earlier? For example 
with more frequent RocksDB upgrades, or building test flink builds with the 
most recent RocksDB version to run our benchmarks and validate newer RocksDB 
versions?

Best,
Piotrek

śr., 4 sie 2021 o 19:59 Yuval Itzchakov 
mailto:yuva...@gmail.com>> napisał(a):
Hi Yun,
Thank you for the elaborate explanation and even more so for the super hard 
work that you're doing digging into RocksDB and chasing after hundreds of 
commits in order to fix them so we can all benefit.

I can say for myself that optimizing towards memory is more important ATM for 
us, and I'm totally +1 for this.

On Wed, Aug 4, 2021 at 8:50 PM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Yuval,

Upgrading RocksDB version is a long story since Flink-1.10.
When we first plan to introduce write buffer manager to help control the memory 
usage of RocksDB, we actually wanted to bump up to RocksDB-5.18 from current 
RocksDB-5.17. However, we found performance regression in our micro benchmark 
on state operations [1] if bumped to RocksDB-5.18. We did not figure the root 
cause at that time and decide to cherry pick the commits of write buffer 
manager to our own FRocksDB [2]. And we finally released our own 
frocksdbjni-5.17.2-artisans-2.0 at that time.

As time goes no, more and more bugs or missed features have been reported in 
the old RocksDB version. Such as:

  1.  Cannot support ARM platform [3]
  2.  Dose not have stable deleteRange API, which is useful for Flink scale out 
[4]
  3.  Cannot support strict block cache [5]
  4.  Checkpoint might stuck if using UNIVERSVAL compaction strategy [6]
  5.  Uncontrolled log size make us disabled the RocksDB internal LOG [7]
  6.  RocksDB's optimizeForPointLookup option might cause data lost [8]
  7.  Current dummy entry used for memory control in RocksDB-5.17 is too large, 
leading performance problem [9]
  8.  Cannot support alpine-based images.
  9.  ...

Some of the bugs are walked around, and some are still open.

And we decide to make some changes from Flink-1.12. First of all, we reported 
the performance regression problem compared with RocksDB-5.18 and RocksDB-5.17 
to RocksDB community [10]. However, as RocksDB-5.x versions are a bit older for 
the community, and RocksJava usage might not be the core part for facebook 
guys, we did not get useful replies. Thus, we decide to figure out the root 
cause of performance regression by ourself.
Fortunately, we find the cause via binary search the commits among RocksDB-5.18 
and RocksDB-5.17, and updated in the original thread [10]. To be short, the 
performance regression is due to different implementation of `__thread` and 
`thread_local` in gcc and would have more impact on dynamic loading [11], which 
is also what current RocksJava jar package does. With my patch [12], the 
performance regression would disappear if comparing RocksDB-5.18 with 
RocksDB-5.17.

Unfortunately, RocksDB-5.18 still has many bugs and we want to bump to 
RocksDB-6.x. However, another performance regression appeared even with my 
patch [12]. With previous knowledge, we know that we must verify the built .so 
files with our java-based benchmark instead of using RocksDB built-in db-bench. 
I started to search the 1340+ commits from RocksDB-5.18 to RocksDB-6.11 to find 
the performance problem. H

Re: [ANNOUNCE] RocksDB Version Upgrade and Performance

2021-08-04 Thread Yun Tang
Hi Yuval,

Upgrading RocksDB version is a long story since Flink-1.10.
When we first plan to introduce write buffer manager to help control the memory 
usage of RocksDB, we actually wanted to bump up to RocksDB-5.18 from current 
RocksDB-5.17. However, we found performance regression in our micro benchmark 
on state operations [1] if bumped to RocksDB-5.18. We did not figure the root 
cause at that time and decide to cherry pick the commits of write buffer 
manager to our own FRocksDB [2]. And we finally released our own 
frocksdbjni-5.17.2-artisans-2.0 at that time.

As time goes no, more and more bugs or missed features have been reported in 
the old RocksDB version. Such as:

  1.  Cannot support ARM platform [3]
  2.  Dose not have stable deleteRange API, which is useful for Flink scale out 
[4]
  3.  Cannot support strict block cache [5]
  4.  Checkpoint might stuck if using UNIVERSVAL compaction strategy [6]
  5.  Uncontrolled log size make us disabled the RocksDB internal LOG [7]
  6.  RocksDB's optimizeForPointLookup option might cause data lost [8]
  7.  Current dummy entry used for memory control in RocksDB-5.17 is too large, 
leading performance problem [9]
  8.  Cannot support alpine-based images.
  9.  ...

Some of the bugs are walked around, and some are still open.

And we decide to make some changes from Flink-1.12. First of all, we reported 
the performance regression problem compared with RocksDB-5.18 and RocksDB-5.17 
to RocksDB community [10]. However, as RocksDB-5.x versions are a bit older for 
the community, and RocksJava usage might not be the core part for facebook 
guys, we did not get useful replies. Thus, we decide to figure out the root 
cause of performance regression by ourself.
Fortunately, we find the cause via binary search the commits among RocksDB-5.18 
and RocksDB-5.17, and updated in the original thread [10]. To be short, the 
performance regression is due to different implementation of `__thread` and 
`thread_local` in gcc and would have more impact on dynamic loading [11], which 
is also what current RocksJava jar package does. With my patch [12], the 
performance regression would disappear if comparing RocksDB-5.18 with 
RocksDB-5.17.

Unfortunately, RocksDB-5.18 still has many bugs and we want to bump to 
RocksDB-6.x. However, another performance regression appeared even with my 
patch [12]. With previous knowledge, we know that we must verify the built .so 
files with our java-based benchmark instead of using RocksDB built-in db-bench. 
I started to search the 1340+ commits from RocksDB-5.18 to RocksDB-6.11 to find 
the performance problem. However, I did not figure out the root cause after 
spending several weeks this time. The performance behaves up and down in those 
commits and I cannot get the commit which lead the performance regression. Take 
this commit of integrating block cache tracer in block-based table reader [13] 
for example, I noticed that this commit would cause a bit performance 
regression and that might be the useless usage accounting in operations, 
however, the problematic code was changed in later commits. Thus, after several 
weeks digging, I have to give up for the endless searching in the thousand 
commits temporarily. As RocksDB community seems not make the project management 
system public, unlike Apache's open JIRA systems, we do not know what benchmark 
they actually run before releasing each version to guarantee the performance.

With my patch [10] on latest RocksDB-6.20.3, we could get the results on 
nexmark in the original thread sent by Stephan, and we can see the performance 
behaves closely in many real-world cases. And we also hope new features, such 
as direct buffer supporting [14] in RocksJava could help improve RocksDB's 
performance in the future.

Hope this could explain what we already did.


[1] https://github.com/apache/flink-benchmarks
[2] https://github.com/ververica/frocksdb/tree/FRocksDB-5.17.2
[3] https://issues.apache.org/jira/browse/FLINK-13598
[4] https://issues.apache.org/jira/browse/FLINK-21321
[5] https://github.com/facebook/rocksdb/issues/6247
[6] https://issues.apache.org/jira/browse/FLINK-21726
[7] https://issues.apache.org/jira/browse/FLINK-15068
[8] https://issues.apache.org/jira/browse/FLINK-17800
[9] https://github.com/facebook/rocksdb/pull/5175
[10] https://github.com/facebook/rocksdb/issues/5774
[11] http://david-grs.github.io/tls_performance_overhead_cost_linux/
[12] https://github.com/ververica/frocksdb/pull/19
[13] https://github.com/facebook/rocksdb/pull/5441/
[14] https://github.com/facebook/rocksdb/pull/2283


Best,
Yun Tang


On Wed, Aug 4, 2021 at 2:36 PM Yuval Itzchakov 
mailto:yuva...@gmail.com>> wrote:
We are heavy users of RocksDB and have had several issues with memory managed 
in Kubernetes, most of them actually went away when we upgraded from Flink 1.9 
to 1.13.

Do we know why there's such a huge performance regression? Can we improve this 
somehow w

Re: Bloom Filter - RocksDB - LinkageError Classloading

2021-08-04 Thread Yun Tang
Hi Sandeep,

Did you include the RocksDB classes in the application jar package? You can 
unpark your jar package to check whether them existed.
If so, since RocksDB classes are already included in the flink-dist package, 
you don't need to include them in your jar package (maybe you explicitly added 
the dependency of org.rocksdb:rocksdbjni in your pom).

Best
Yun Tang

From: Sandeep khanzode 
Sent: Wednesday, August 4, 2021 11:54
To: user 
Subject: Bloom Filter - RocksDB - LinkageError Classloading

Hello,

I tried to add the bloom filter functionality as mentioned here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.html


 rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() {

public DBOptions createDBOptions(DBOptions currentOptions, 
Collection handlesToClose) {
return currentOptions.setMaxOpenFiles(1024);
}

public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions, Collection handlesToClose) {
BloomFilter bloomFilter = new BloomFilter();
handlesToClose.add(bloomFilter);

return currentOptions
.setTableFormatConfig(
new 
BlockBasedTableConfig().setFilter(bloomFilter));
}
 });

This is in the main class where we setup in the StreamExecutionEnvironment …

I get ClassLoading errors due to that ...

Caused by: java.lang.LinkageError: loader constraint violation: loader 
org.apache.flink.util.ChildFirstClassLoader @1169afe1 wants to load class 
org.rocksdb.ColumnFamilyOptions. A different class with the same name was 
previously loaded by 'app'. (org.rocksdb.ColumnFamilyOptions is in unnamed 
module of loader 'app')


What is documented is to change the order to parent-first in the 
flink-conf.yaml … but then I get different issues for the basic/core Spring 
Framework classes not being serializable …

Any help will be appreciated.

Thanks,
Sandip


Re: Flink k8 HA mode + checkpoint management

2021-08-03 Thread Yun Tang
Hi Harsh,

The job id would be fixed as  if using HA mode 
with native k8s, which means the checkpoint path should stay the same no matter 
how many times you submit.
However, if HA mode is enabled, the new job would first recover from the HA 
checkpoint store to recover the last checkpoint. In other words, your new job 
should recover from last checkpoint-1. From your exceptions, we can see the job 
did not recover successfully and start the job from scratch. That's why you 
could meet the exception that checkpoint-meta file has been existed.

There would be two reasons for this:

  1.  The HA checkpoint store did not recover successfully, you could check 
whether the checkpoint 1 is completed in the previous run.
  2.  The last checkpoint-1 finished to store on the remote checkpoint path but 
fail to add to the checkpoint store. However, the checkpoint coordinator would 
clean up the checkpoint meta if failed to add to checkpoint store [1] unless 
your job crashed or meet the PossibleInconsistentStateException [2].

I think you should check the jobmanager log of your last run to know the root 
cause.

[1] 
https://github.com/apache/flink/blob/46bf6d68ee97684949ba3ad38dc18ff7c800092a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1233
[2] 
https://github.com/apache/flink/blob/46bf6d68ee97684949ba3ad38dc18ff7c800092a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1226


Best
Yun Tang

From: Manong Karl 
Sent: Wednesday, August 4, 2021 9:17
To: Harsh Shah 
Cc: user@flink.apache.org 
Subject: Re: Flink k8 HA mode + checkpoint management

Can You please share your configs? I'm using native kubernetes without HA and 
there's no issues. I'm curious how this happens.  AFAIK jobid is generated 
randomly.


Harsh Shah mailto:harsh.a.s...@shopify.com>> 
于2021年8月4日周三 上午2:44写道:
Hello,

I am trying to use Flink HA mode inside 
kubernetes<https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/>
 in 
standalone<https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#application-mode>
 mode. The Job ID is always constant, "". In 
situation where we restart the job (Not from a check-point or savepoint), we 
see errors like
"""

Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
'/flink-checkpoints//chk-1/_metadata' 
already exists

"""
where checkpoints have not been created since the restart of Job .

My question:
* Is the recommended way to set a new unique "checkpoint path" every time we 
update Job and restart necessary k8 resources (say not restarted from 
checkpoint-savepoint)? Or GC checkpoints during deletion and reload from 
savepoint if required? Looking for a standard recommendation.
* Is there a way I can override the JobID to be unique and indicate it is a 
complete restart in HA mode?


Thanks,
Harsh


Re: as-variable configuration for state ac

2021-07-27 Thread Yun Tang
Hi Mason,

I think this request is reasonable and you could create a JIRA ticket so that 
we could resolve it later.


Best,
Yun Tang

From: Mason Chen 
Sent: Tuesday, July 27, 2021 15:15
To: Yun Tang 
Cc: Mason Chen ; user@flink.apache.org 

Subject: Re: as-variable configuration for state ac

Yup, your understand is correct—that was the analogy I was trying to make!

On Jul 26, 2021, at 7:57 PM, Yun Tang 
mailto:myas...@live.com>> wrote:

Hi Mason,

In rocksDB, one state is corresponding to a column family and we could 
aggregate all RocksDB native metrics per column family. If my understanding is 
right, are you hoping that all state latency metrics for a particular state 
could be aggregated per state level?


Best
Yun Tang

From: Mason Chen mailto:mas.chen6...@gmail.com>>
Sent: Tuesday, July 27, 2021 4:24
To: user@flink.apache.org<mailto:user@flink.apache.org> 
mailto:user@flink.apache.org>>
Subject: as-variable configuration for state ac

We have been using the state backend latency tracking metrics from Flink 1.13. 
To make metrics aggregation easier, could there be a config to expose something 
like `state.backend.rocksdb.metrics.column-family-as-variable` that rocksdb 
provides to do aggregation across column families.

In this case, it would be the various components of state.

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-column-family-as-variable



Re: as-variable configuration for state ac

2021-07-26 Thread Yun Tang
Hi Mason,

In rocksDB, one state is corresponding to a column family and we could 
aggregate all RocksDB native metrics per column family. If my understanding is 
right, are you hoping that all state latency metrics for a particular state 
could be aggregated per state level?


Best
Yun Tang

From: Mason Chen 
Sent: Tuesday, July 27, 2021 4:24
To: user@flink.apache.org 
Subject: as-variable configuration for state ac

We have been using the state backend latency tracking metrics from Flink 1.13. 
To make metrics aggregation easier, could there be a config to expose something 
like `state.backend.rocksdb.metrics.column-family-as-variable` that rocksdb 
provides to do aggregation across column families.

In this case, it would be the various components of state.

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-column-family-as-variable


Re: Exception in snapshotState suppresses subsequent checkpoints

2021-07-01 Thread Yun Tang
Hi Tao,

I run your program with Flink-1.12.1 and found the problem you described really 
existed. And things would go normal if switching to Flink-1.12.2 version.

After dig into the root cause, I found this is caused by a fixed bug [1]: If a 
legacy source task fails outside of the legacy thread, the legacy thread blocks 
proper cancellation (completion future never completed). As you throw the NPE 
within the source operator, it will never exit and cannot handle subsequent 
checkpoint requirements then. That's why you see all subsequent checkpoints 
cannot finish.


[1] 
https://github.com/apache/flink/commit/b332ce40d88be84d9cf896f446c7c6e26dbc8b6a#diff-54e9ce3b15d6badcc9376ab144df066eb46c4e516d6ee31ef8eb38e2d4359042

Best
Yun Tang

From: Matthias Pohl 
Sent: Thursday, July 1, 2021 16:41
To: tao xiao 
Cc: Yun Tang ; user ; Roman 
Khachatryan 
Subject: Re: Exception in snapshotState suppresses subsequent checkpoints

Hi Tao,
it looks like it should work considering that you have a sleep of 1 second 
before each emission. I'm going to add Roman to this thread. Maybe, he has sees 
something obvious which I'm missing.
Could you run the job with the log level set to debug and provide the logs once 
more? Additionally, having both the TaskManager's and the JobManager's logs 
available would help in understanding what's going on.

Best,
Matthias

On Wed, Jun 30, 2021 at 6:14 PM tao xiao 
mailto:xiaotao...@gmail.com>> wrote:
Hi team,

Does anyone have a clue?

On Mon, Jun 28, 2021 at 3:27 PM tao xiao 
mailto:xiaotao...@gmail.com>> wrote:
My job is very simple as you can see from the code I pasted. I simply print out 
the number to stdout. If you look at the log the number continued to print out 
after checkpoint 1 which indicated no back pressure was happening.  It is very 
easy to reproduce this if you run the code I provided in IDE


LOG

[2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 
1624694932345 for job afde4a82f41e8284cb0bfff20497a5cc. 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
[2021-06-26 16:08:52,372] INFO Could not complete snapshot 1 for operator 
Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason: 
Checkpoint was declined. 
(org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl)
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete 
snapshot 1 for operator Source: Custom Source -> Sink: Print to Std. Out 
(1/1)#0. Failure reason: Checkpoint was declined.
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:880)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
 [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 

Re: Exception in snapshotState suppresses subsequent checkpoints

2021-06-27 Thread Yun Tang
Hi Tao,

I'm afraid that your Flink job continues to be in high backpressued and all 
subsequent checkpoints did not ever run 'FromElementsFunctionT#snapshotState' 
which means your code to throw exception never be executed. You could check 
those expired checkpoints to see whether your tasks containing 
'FromElementsFunctionT' has ever been completed.

Best
Yun Tang

From: tao xiao 
Sent: Saturday, June 26, 2021 16:40
To: user 
Subject: Re: Exception in snapshotState suppresses subsequent checkpoints

Btw here is the checkpoint related log

[2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 
1624694932345 for job afde4a82f41e8284cb0bfff20497a5cc. 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
[2021-06-26 16:08:52,372] INFO Could not complete snapshot 1 for operator 
Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason: 
Checkpoint was declined. 
(org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl)
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete 
snapshot 1 for operator Source: Custom Source -> Sink: Print to Std. Out 
(1/1)#0. Failure reason: Checkpoint was declined.
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:880)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
 [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
 [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
 [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
 [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) 
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) 
[flink-runtime_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) 
[flink-runtime_2.11-1.12.1.jar:1.12.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
Caused by: org.apache.flink.util.SerializedThrowable: npe
at 
com.smartnews.dp.kafka.sample.flink.FromElementsFunctionT.snapshotState(FromElementsFunctionT.java:111)
 ~[classes/:?]
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
 ~[flink-streaming-

Re: Metric for JVM Overhaed

2021-06-25 Thread Yun Tang
Hi Pranjul,

Currently, Flink only have the metrics shown in taskmanager UI to tell the 
capacity of JMV overhead. However, Flink cannot detect how much overhead memory 
has been occupied as those memory footprints might be asked by the third-party 
library via OS malloc directly instead of via JVM.

Some tools provided by memory allocator such jemalloc or tcmalloc, could help 
find how much the memory usage via OS malloc. Even though, there still exists 
some memory used by mmap or on local stack, which is not so easy to detect.

Best
Yun Tang

From: Guowei Ma 
Sent: Friday, June 25, 2021 15:22
To: Pranjul Ahuja 
Cc: user 
Subject: Re: Metric for JVM Overhaed

Hi Pranjul
There are already some system metrics that track the jvm 
status(CPU/Memory/Threads/GC). You could find them in the [1]

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#system-metrics
Best,
Guowei


On Fri, Jun 25, 2021 at 2:33 PM Pranjul Ahuja 
mailto:ahuja0...@gmail.com>> wrote:
Hi,

Is there any metric to track the task manager JVM overhead? Or is it the case 
that it is already included in the metric Status.JVM.Memory.NonHeap?

Thanks,
Pranjul


Re: High Flink checkpoint Size

2021-06-23 Thread Yun Tang
Hi Vijay,

To be honest, an 18MB checkpoint size in total might not be something serious. 
If you really want to dig what inside, you could use 
Checkpoints#loadCheckpointMetadata [1] to load the _metadata to see anything 
unexpected.

And you could refer to FlinkKafkaConsumerBase#unionOffsetStates [2] and 
FlinkKinesisConsumer#sequenceNumsToRestore to compare different operator state 
stored in kafka and kinesis connector.

[1] 
https://github.com/apache/flink/blob/10146366bec7feca85acedb23184b99517059bc6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99
[2] 
https://github.com/apache/flink/blob/10146366bec7feca85acedb23184b99517059bc6/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L201
[3] 
https://github.com/apache/flink/blob/10146366bec7feca85acedb23184b99517059bc6/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L158-L159

Best,
Yun Tang

From: Vijayendra Yadav 
Sent: Wednesday, June 23, 2021 11:02
To: user 
Subject: High Flink checkpoint Size

Hi Team,

I have two flink Streaming Jobs
1) Flink streaming from KAFKA and writing to s3
2) Fling Streaming from KINESIS (KDS) and writing to s3

Both Jobs have similar checkpoint duration.

Job #1 (KAFKA) checkpoint size is only 85KB
Job #2 (KINESIS) checkpoint size is 18MB

There are no checkpoint failures. But I want to understand why Kinesis 
streaming has such a huge checkpoint size, is there a way to handle it 
differently? and reduce the size.

Thanks,
Vijay


Re: PoJo to Avro Serialization throw KryoException: java.lang.UnsupportedOperationException

2021-06-22 Thread Yun Tang
Hi Rommel,

I wonder why avro type would use kryo as its serializer to serialize, could you 
check what kind of type information could get via TypeInformation.of(class) [1]


[1] 
https://github.com/apache/flink/blob/cc3f85eb4cd3e5031a84321e62d01b3009a00577/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java#L208


Best
Yun Tang

From: Rommel Holmes 
Sent: Wednesday, June 23, 2021 13:43
To: user 
Subject: PoJo to Avro Serialization throw KryoException: 
java.lang.UnsupportedOperationException

My Unit test was running OK under Flink 1.11.2 with parquet-avro 1.10.0, once I 
upgrade to 1.12.0 with parquet-avro 1.12.0, my unit test will throw

com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) 
~[kryo-2.24.0.jar:?]
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
 ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) 
~[kryo-2.24.0.jar:?]
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
 ~[kryo-2.24.0.jar:?]
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) 
~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) 
~[kryo-2.24.0.jar:?]
...

aused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057) 
~[?:1.8.0_282]
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
 ~[kryo-2.24.0.jar:?]
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
 ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) 
~[kryo-2.24.0.jar:?]
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) 
~[kryo-2.24.0.jar:?]
... 27 more
My Unit test code snippet is something like below:

private ImmutableList testData = ImmutableList.of(
PoJo.build("123", "0.0.0.0", null),
PoJo.build("123", "0.0.0.1", 2L)
);

DataStream input = env
.addSource(new TestSource(testData), 
PojoTypeInfo.of(PoJo.class))
.assignTimestampsAndWatermarks(watermarkStrategy);

DataStream output = input
.map(TestClass::convertPoJoToGenericRecord)
.returns(new GenericRecordAvroTypeInfo(PoJo.getAvroSchema()));

output.addSink();

The function is something like

GenericRecord convertPoJoToGenericRecord(PoJo pojo) throws Exception {
Schema schema = PoJo.getAvroSchema();
GenericRecordBuilder builder = new GenericRecordBuilder(schema);
for (Schema.Field field : schema.getFields()) {
builder.set(field.name<http://field.name>(), 
TestClass.getObjectField(field, pojo));
}
GenericRecord record = builder.build();
return record;
}

Can anyone help on this?

Thank you


Re: How to set state.backend.rocksdb.latency-track-enabled

2021-06-18 Thread Yun Tang
Hi Chen-Che,

The PR-16177 [1] is the documentation for state access latency tracking, 
thought it has not been merged, you could still refer it for more details.

[1] https://github.com/apache/flink/pull/16177


Best
Yun Tang


From: Chen-Che Huang 
Sent: Friday, June 18, 2021 16:21
To: user@flink.apache.org 
Subject: Re: How to set state.backend.rocksdb.latency-track-enabled

Hi Yangze,

Got it. I'll evaluate to enable this feature and see whether I can gain some 
insights. Many thanks for your reply.

On 2021/06/18 07:52:53, Yangze Guo  wrote:
> Hi, Chen-Che,
>
> IIUC, the "state.backend.rocksdb.latency-track-enabled" is just a
> reject alternative and has been incorrectly written to the release
> note. You can refer to the [1] instead.
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backends-latency-tracking-options
>
> Best,
> Yangze Guo
>
> On Fri, Jun 18, 2021 at 3:39 PM Chen-Che Huang  wrote:
> >
> > Hi,
> >
> > The 1.13 release note 
> > (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) mentions 
> > that we can set state.backend.rocksdb.latency-track-enabled to obtain some 
> > rockdb metrics with a marginal impact. However, I couldn't see 
> > state.backend.rocksdb.latency-track-enabled in 
> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/.
> >  Based on this PR (https://github.com/apache/flink/pull/15091), it seems 
> > that state.backend.rocksdb.latency-track-enabled is related to 
> > state.backend.latency-track.keyed-state-enable? Or which option I should 
> > set for metrics with a marginal impact. Thanks.
> >
> > Best wishes,
> > Chen-Che Huang
>


Re: RocksDB CPU resource usage

2021-06-17 Thread Yun Tang
Hi Padarn,

>From my experiences, de-/serialization might not consume 3x CPU usage, and the 
>background compaction could also increase the CPU usage. You could use 
>async-profiler [1] to figure out what really consumed your CPU usage as it 
>could also detect the native RocksDB thread stack.


[1] https://github.com/jvm-profiling-tools/async-profiler

Best
Yun Tang


From: Robert Metzger 
Sent: Thursday, June 17, 2021 14:11
To: Padarn Wilson 
Cc: JING ZHANG ; user 
Subject: Re: RocksDB CPU resource usage

If you are able to execute your job locally as well (with enough data), you can 
also run it with a profiler and see the CPU cycles spent on serialization (you 
can also use RocksDB locally)

On Wed, Jun 16, 2021 at 3:51 PM Padarn Wilson 
mailto:pad...@gmail.com>> wrote:
Thanks Robert. I think it would be easy enough to test this hypothesis by 
making the same comparison with some simpler state inside the aggregation 
window.

On Wed, 16 Jun 2021, 7:58 pm Robert Metzger, 
mailto:rmetz...@apache.org>> wrote:
Depending on the datatypes you are using, seeing 3x more CPU usage seems 
realistic.
Serialization can be quite expensive. See also: 
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html 
Maybe it makes sense to optimize there a bit.

On Tue, Jun 15, 2021 at 5:23 PM JING ZHANG 
mailto:beyond1...@gmail.com>> wrote:
Hi Padarn,
After switch stateBackend from filesystem to rocksdb, all reads/writes from/to 
backend have to go through de-/serialization to retrieve/store the state 
objects, this may cause more cpu cost.
But I'm not sure it is the main reason leads to 3x CPU cost in your job.
To find out the reason, we need more profile on CPU cost, such as Flame Graphs. 
BTW, starting with Flink 1.13, Flame Graphs are natively supported in Flink[1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/

Best,
JING ZHANG

Padarn Wilson mailto:pad...@gmail.com>> 于2021年6月15日周二 
下午5:05写道:
Hi all,

We have a job that we just enabled rocksdb on (instead of file backend), and 
see that the CPU usage is almost 3x greater on (we had to increase taskmanagers 
3x to get it to run.

I don't really understand this, is there something we can look at to understand 
why CPU use is so high? Our state mostly consists of aggregation windows.

Cheers,
Padarn


Re: Discard checkpoint files through a single recursive call

2021-06-15 Thread Yun Tang
Hi Jiang,

Please take a look at FLINK-17860 and FLINK-13856 for previous discussion of 
this problem.

[1] https://issues.apache.org/jira/browse/FLINK-17860
[2] https://issues.apache.org/jira/browse/FLINK-13856

Best
Yun Tang


From: Guowei Ma 
Sent: Wednesday, June 16, 2021 8:40
To: Jiahui Jiang 
Cc: user@flink.apache.org 
Subject: Re: Discard checkpoint files through a single recursive call

hi, Jiang

I am afraid of misunderstanding what you mean, so can you elaborate on how you 
want to change it? For example, which interface or class do you want to add a 
method to?
Although I am not a state expert, as far as I know, due to incremental 
checkpoints, when CompleteCheckpoint is discarding, it is necessary to call the 
discardState method of each State.

Best,
Guowei


On Tue, Jun 15, 2021 at 7:37 AM Jiahui Jiang 
mailto:qzhzm173...@hotmail.com>> wrote:
Hello Flink!

We are building an infrastructure where we implement our own 
CompletedCheckpointStore. The read and write to the external storage location 
of these checkpoints are through HTTP calls to an external service.

Recently we noticed some checkpoint file cleanup performance issue when the job 
writes out a very high number of checkpoint files per checkpoint. (In our case 
we had a few hundreds of operators and ran with 16 parallelism)
During checkpoint state discard phase, since the implementation in 
CompletedCheckpoint discards the state files one by one, we are seeing a very 
high number of remote calls. Sometimes the deletion fails to catch up with the 
checkpoint progress.

Given the interface we are given to configure the external storage location for 
checkpoints is always a `target directory`. Would it be reasonable to expose an 
implementation of discard() that directly calls disposeStorageLocation with 
recursive set to true, without iterating over each individual files first? Is 
there any blockers for that?

Thank you!


links
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java#L240
https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCompletedCheckpointStorageLocation.java#L70


Re: Question about State TTL and Interval Join

2021-06-06 Thread Yun Tang
Hi Chris,

Interval Join should clean state which is not joined during interval and you 
don't need to set state TTL. (Actually, the states used in interval join are 
not exposed out and you cannot set TTL for those state as TTL is only public 
for user self-described states.)

The checkpoint size continues to increase does not mean your actual state also 
increases. RocksDB actually write a deleter when remove element and those 
useless data would be cleared physically after compaction. You could judge 
whether state really grows up by using non-incremental checkpoints to see how 
much state size will be.

Moreover, the OOM should not be related to RocksDB as it used off-heap native 
memory, and you might need some work to dig what occupied the JVM memory during 
checkpoints.

Best
Yun Tang

From: McBride, Chris 
Sent: Saturday, June 5, 2021 3:17
To: user@flink.apache.org 
Subject: Question about State TTL and Interval Join


We currently have a flink 1.8 application deployed on Kinesis Data Analytics 
using the RocksDB State backend. Our application is joining across 3 different 
kinesis streams using an interval join. We noticed that our checkpoint sizes 
continue to increase over time, we eventually have OOM failures writing 
checkpoints and need to restart the application without restoring from a 
savepoint.



Does this kind of application require a state TTL on the join operator? I 
assumed since it was an interval join, events that fell outside of the lower 
timebound would automatically be expired from the state. Is that a correct 
assumption?



Thanks,

Chris




Re: Dynamic configuration of Flink checkpoint interval

2021-05-30 Thread Yun Tang
Hi Kai,

I think unaligned checkpoint + alignment timeout [1] might also help you in 
this case. You could leverage unaligned checkpoint to help reduce the 
checkpoint duration.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#execution-checkpointing-alignment-timeout

Best
Yun Tang


From: Senhong Liu 
Sent: Monday, May 31, 2021 10:33
To: JING ZHANG 
Cc: Kai Fu ; user 
Subject: Re: Dynamic configuration of Flink checkpoint interval

Hi all,

In fact, a pretty similar JIRA has been created, which is 
https://issues.apache.org/jira/browse/FLINK-18578 and I am working on it. In 
the near future, I will publish a FLIP and start a discussion about that. We 
look forward to your participation.

Best,
Senhong Liu

JING ZHANG mailto:beyond1...@gmail.com>> 于2021年5月31日周一 
上午10:21写道:
Hi Kai,

Happy to hear that.
Would you please paste the JIRA link in the email after you create it. Maybe it 
could help other users who encounter the same problem. Thanks very much.

Best regards,
JING ZHANG

Kai Fu mailto:zzfu...@gmail.com>> 于2021年5月30日周日 下午11:19写道:
Hi Jing,

Yup, what you're describing is what I want. I also tried the approach you 
suggested and it works. I'm going to take that approach for the moment and 
create a Jira issue for this feature.

On Sun, May 30, 2021 at 8:57 PM JING ZHANG 
mailto:beyond1...@gmail.com>> wrote:
Hi Kai,

Do you try to find a way to hot update checkpoint interval or disable/enable 
checkpoint without stop and restart job?
Unfortunately, it is not supported yet, AFAIK.
You're very welcome to create an issue and describe your needs here (Flink’s 
Jira<http://issues.apache.org/jira/browse/FLINK>) .
At present, you may would like to use the following temporary solution:
  1. set a bigger value as checkpoint interval, start your job
  2. do a savepoint after cold start is completed
  3. set a normal value as checkpoint interval, restart the job from savepoint

Best regards,
JING ZHANG

Kai Fu mailto:zzfu...@gmail.com>> 于2021年5月30日周日 下午7:13写道:
Hi team,

We want to know if Flink has some dynamic configuration of the checkpoint 
interval. Our use case has a cold start phase where the entire dataset is 
replayed from the beginning until the most recent ones.

In the cold start phase, the resources are fully utilized and the backpressure 
is high for all upstream operators, causing the checkpoint timeout constantly. 
The real production traffic is far less than that and the current provisioned 
resource is capable of handling it.

We're thinking if Flink can support the dynamic checkpoint config to bypass the 
checkpoint operation or make it less frequent on the cold start phase to speed 
up the process, while making the checkpoint normal again once the cold start is 
completed.

--
Best wishes,
- Kai


--
Best wishes,
- Kai


Re: Error restarting job from Savepoint

2021-05-30 Thread Yun Tang
Hi Ganti,

If you could ensure that newer class could keep backwards compatibility as 
previous class, you can try to set serialVesionUID explicitly of current class 
to -7317586767482317266.

If you want to avoid such issue later, you must set the serialVesionUID 
explicitly first if not using customized serializer for those classes. Another 
better solution is to ensure the class backwards compatibility with customized 
serializer or leverage apache avro.

You could refer to [1] for more details.

[1] 
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html

Best
Yun Tang

From: Yashwant Ganti 
Sent: Thursday, May 27, 2021 1:14
To: user@flink.apache.org 
Subject: Error restarting job from Savepoint

Hello,

We are facing an error restarting a job from a savepoint. We believe it is 
because one of the common classes used across all of our jobs was changed but 
there was no serialVersionUID assigned to the class. There error we are facing 
is

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for SplittableDoFnOperator_60af72bbf6b3989cb3e849280faa23d8_(2/4) from 
any of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore heap backend
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:115)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:559)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:101)
at 
org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:181)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 11 more
Caused by: java.io<http://java.io>.InvalidClassException: 
com..**.***; local class incompatible: stream classdesc 
serialVersionUID = -7317586767482317266, local class serialVersionUID = 
-8797204481428423223
at java.base/java.io<http://java.io>.ObjectStreamClass.initNonProxy(Unknown 
Source)
at 
java.base/java.io<http://java.io>.ObjectInputStream.readNonProxyDesc(Unknown 
Source)
at 
java.base/java.io<http://java.io>.ObjectInputStream.readClassDesc(Unknown 
Source)
at 
java.base/java.io<http://java.io>.ObjectInputStream.readOrdinaryObject(Unknown 
Source)
at java.base/java.io<http://java.io>.ObjectInputStream.readObject0(Unknown 
Source)
at 
java.base/java.io<http://java.io>.ObjectInputStream.defaultReadFields(Unknown 
Source)
at 
java.base/java.io<http://java.io>.ObjectInputStream.readSerialData(Unknown 
Source)
at 
java.base/java.io<http://java.io>.ObjectInputStream.readOrdinaryObject(Unknown 
Source)
at java.base/java.io<http://java.io>.ObjectInputStream.readObject0(Unknown 
Source)
at java.base/java.io<http://java.io>.ObjectInputStream.readObject(Unknown 
Source)
at java.base/java.io<http://java.io>.ObjectInputStream.readObject(Unknown 
Source)
at 
org.ap

Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-05 Thread Yun Tang
Thanks for Dawid and Guowei's great work, and thanks for everyone involved for 
this release.

Best
Yun Tang

From: Xintong Song 
Sent: Thursday, May 6, 2021 12:08
To: user ; dev 
Subject: Re: [ANNOUNCE] Apache Flink 1.13.0 released

Thanks Dawid & Guowei as the release managers, and everyone who has
contributed to this release.


Thank you~

Xintong Song



On Thu, May 6, 2021 at 9:51 AM Leonard Xu  wrote:

> Thanks Dawid & Guowei for the great work, thanks everyone involved.
>
> Best,
> Leonard
>
> 在 2021年5月5日,17:12,Theo Diefenthal  写道:
>
> Thanks for managing the release. +1. I like the focus on improving
> operations with this version.
>
> --
> *Von: *"Matthias Pohl" 
> *An: *"Etienne Chauchot" 
> *CC: *"dev" , "Dawid Wysakowicz" <
> dwysakow...@apache.org>, "user" ,
> annou...@apache.org
> *Gesendet: *Dienstag, 4. Mai 2021 21:53:31
> *Betreff: *Re: [ANNOUNCE] Apache Flink 1.13.0 released
>
> Yes, thanks for managing the release, Dawid & Guowei! +1
>
> On Tue, May 4, 2021 at 4:20 PM Etienne Chauchot 
> wrote:
>
>> Congrats to everyone involved !
>>
>> Best
>>
>> Etienne
>> On 03/05/2021 15:38, Dawid Wysakowicz wrote:
>>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.13.0.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/news/2021/05/03/release-1.13.0.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349287
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>> Guowei & Dawid
>>
>>
>
>


Re: savepoint command in code

2021-05-05 Thread Yun Tang
Hi,

You could trigger savepoint via rest API [1] or refer to SavepointITCase[2] to 
see how to trigger savepoint in test code.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-savepoints
[2] 
https://github.com/apache/flink/blob/c688bf3c83e72155ccf5d04fe397b7c0a1274fd1/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java#L438

Best
Yun Tang

From: Abdullah bin Omar 
Sent: Tuesday, May 4, 2021 11:50
To: user@flink.apache.org 
Subject: savepoint command in code

Hello,

I am trying to use the savepoint command (./bin/flink savepoint jobid) in the 
code instead of doing it manually in the terminal. The jobid can get using 
getjobid(). The problem is to define the path ./bin/flink  —  it can not be 
shown as a directory (probably because of a unix executable file).

Is there a way to define the path (./bin/flink) in the code? or, is there any 
function to get the savepoint from code instead of manual command?

Thank you




Re: Checkpoint error - "The job has failed"

2021-04-28 Thread Yun Tang
Hi Dan,

You could refer to the "Fix Versions" in FLINK-16753 [1] and know that this bug 
is resolved after 1.11.3 not 1.11.1.

[1] https://issues.apache.org/jira/browse/FLINK-16753

Best
Yun Tang

From: Dan Hill 
Sent: Tuesday, April 27, 2021 7:50
To: Yun Tang 
Cc: Robert Metzger ; user 
Subject: Re: Checkpoint error - "The job has failed"

Hey Yun and Robert,

I'm using Flink v1.11.1.

Robert, I'll send you a separate email with the logs.

On Mon, Apr 26, 2021 at 12:46 AM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Dan,

I think you might use older version of Flink and this problem has been resolved 
by FLINK-16753 [1] after Flink-1.10.3.


[1] https://issues.apache.org/jira/browse/FLINK-16753

Best
Yun Tang

From: Robert Metzger mailto:rmetz...@apache.org>>
Sent: Monday, April 26, 2021 14:46
To: Dan Hill mailto:quietgol...@gmail.com>>
Cc: user mailto:user@flink.apache.org>>
Subject: Re: Checkpoint error - "The job has failed"

Hi Dan,

can you provide me with the JobManager logs to take a look as well? (This will 
also tell me which Flink version you are using)



On Mon, Apr 26, 2021 at 7:20 AM Dan Hill 
mailto:quietgol...@gmail.com>> wrote:
My Flink job failed to checkpoint with a "The job has failed" error.  The logs 
contained no other recent errors.  I keep hitting the error even if I cancel 
the jobs and restart them.  When I restarted my jobmanager and taskmanager, the 
error went away.

What error am I hitting?  It looks like there is bad state that lives outside 
the scope of a job.

How often do people restart their jobmanagers and taskmanager to deal with 
errors like this?


Re: Checkpoint error - "The job has failed"

2021-04-26 Thread Yun Tang
Hi Dan,

I think you might use older version of Flink and this problem has been resolved 
by FLINK-16753 [1] after Flink-1.10.3.


[1] https://issues.apache.org/jira/browse/FLINK-16753

Best
Yun Tang

From: Robert Metzger 
Sent: Monday, April 26, 2021 14:46
To: Dan Hill 
Cc: user 
Subject: Re: Checkpoint error - "The job has failed"

Hi Dan,

can you provide me with the JobManager logs to take a look as well? (This will 
also tell me which Flink version you are using)



On Mon, Apr 26, 2021 at 7:20 AM Dan Hill 
mailto:quietgol...@gmail.com>> wrote:
My Flink job failed to checkpoint with a "The job has failed" error.  The logs 
contained no other recent errors.  I keep hitting the error even if I cancel 
the jobs and restart them.  When I restarted my jobmanager and taskmanager, the 
error went away.

What error am I hitting?  It looks like there is bad state that lives outside 
the scope of a job.

How often do people restart their jobmanagers and taskmanager to deal with 
errors like this?


Re: Measuring the Size of State, Savepoint Size vs. Restore time

2021-03-31 Thread Yun Tang
HI Kevin,

Currently, you can view logs to find when to start and finish to restore [1] to 
know how much time spent on task side. Flink-1.13 also try to expose stage of 
task initializations [2] and maybe it could help you.


state.backend.rocksdb.metrics.total-sst-files-size should be correct to 
describe the sst file size. We can have several reasons why the savepoint size 
larger than sst-files size:

  1.  SST files are compressed with snappy format by default while savepoint 
not.
  2.  SST files could save spaces due to same prefix key bytes.
  3.  Some contents are still in memory write buffer and not yet flushed.

However, the difference is really huge, have you ever logined machines having 
keyed state to see how much space occupried? And what's the incremental 
checkpoint size of your job, have you ever enabeld TTL for state?


[1] https://issues.apache.org/jira/browse/FLINK-19013
[2] https://issues.apache.org/jira/browse/FLINK-17012

Best
Yun Tang



From: Guowei Ma 
Sent: Thursday, April 1, 2021 11:57
To: Kevin Lam 
Cc: user ; Yun Tang 
Subject: Re: Measuring the Size of State, Savepoint Size vs. Restore time

Hi, Kevin

If you use the RocksDB and want to know the data on the disk I think that is 
the right metric. But the SST files might include some expired data. Some data 
in memory is not included in the SST files yet. In general I think it could 
reflect the state size of your application.

I think that there is no metric for the time that spends on restoring from a 
savepoint.

As for why there is a huge difference between the size of sst and the size of 
savepoint, I think @Yun can give some detailed insights.

Best,
Guowei


On Thu, Apr 1, 2021 at 1:38 AM Kevin Lam 
mailto:kevin@shopify.com>> wrote:
Hi all,

We're interested in doing some analysis on how the size of our savepoints and 
state affects the time it takes to restore from a savepoint. We're running 
Flink 1.12 and using RocksDB as a state backend, on Kubernetes.

What is the best way to measure the size of a Flink Application's state? Is 
state.backend.rocksdb.metrics.total-sst-files-size<https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#state-backend-rocksdb-metrics-total-sst-files-size>
 the right thing to look at?

We tried looking at state.backend.rocksdb.metrics.total-sst-files-size for all 
our operators, after restoring from a savepoint, and we noticed that the sum of 
all the sst files sizes is much much smaller than the total size of our 
savepoint (7GB vs 10TB).  Where does that discrepancy come from?

Do you have any general advice on correlating savepoint size with restore times?

Thanks in advance!


Re: State size increasing exponentially in Flink v1.9

2021-03-26 Thread Yun Tang
Hi,

If using RocksDB state backend, why it would occur `CopyOnWriteStateMap`?

CopyOnWriteStateMap should only exist in heap based state-backend.

Best
Yun Tang


From: Chesnay Schepler 
Sent: Friday, March 26, 2021 18:45
To: Almeida, Julius ; user@flink.apache.org 

Subject: Re: State size increasing exponentially in Flink v1.9

Could you show us how you interact with the map state (ideally the full code of 
your function that accesses the state)?

On 3/25/2021 1:13 AM, Almeida, Julius wrote:

Hey,

Hope you all are doing well!



I am using flink v1.9 with RocksDBStateBackend, but over time the state size is 
increasing exponentially.



I am using MapState in my project & seeing memory spike, after looking at heap 
dump I see duplicates in it.



I also have logic added to remove expired events form the MapState

Eg.: MapState.remove(key)



Can anyone give me pointers to find more details on it.



Heap Dump pointed to 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java#L811



Thanks,

Julius



Re: EOFException on attempt to scale up job with RocksDB state backend

2021-03-18 Thread Yun Tang
Hi Alexey,

Flink would only write once for checkpointed files. Could you try to write 
checkpointed files as block blob format and see whether the problem still 
existed?

Best
Yun Tang

From: Alexey Trenikhun 
Sent: Thursday, March 18, 2021 13:54
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Yun,
How underlying storage explains fact that without re-scale I can restore from 
savepoint? Does Flink write file once or many times, if many times, then 
potentially could be problem with 50,000 blocks per blob limit, I'm I right? 
Should I try block blob with compaction like described in [1] or without 
compaction?

Thanks,
Alexey

From: Yun Tang 
Sent: Wednesday, March 17, 2021 9:31 PM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Alexey,

I am not familiar with azure blob storage and I cannot load the "_metadata" 
with your given file locally.

Currently, I highly suspect this strange rescaling behavior is related with 
your underlying storage, could you try to use block blob instead of page blob 
[1] to see whether this behavior still existed?

[1] 
https://hadoop.apache.org/docs/current/hadoop-azure/index.html#Block_Blob_with_Compaction_Support_and_Configuration


Best
Yun Tang


From: Alexey Trenikhun 
Sent: Thursday, March 18, 2021 12:00
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Yun,
Azure web UI shows size of all files created by Flink as 128Mib * X (128, 256, 
640), see screenshot attached. In my understanding this is because Flink 
creates them as Page Blobs. In same storage other application creates files as 
block blobs and they have sizes not rounded on 128Mib


Thanks,
Alexey

____
From: Yun Tang 
Sent: Wednesday, March 17, 2021 8:38 PM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Alexey,

I tried to load your _metadata as checkpoint via 
Checkpoints#loadCheckpointMetadata [1] but found this file is actually not a 
savepoint meta, have you ever uploaded the correct files?
Moreover, I noticed that both size of 77e77928-cb26-4543-bd41-e785fcac49f0 and 
_metadata are 128MB which is much larger than its correct capacity, is this 
expected on azure blob storage or you just uploaded the wrong files?

[1] 
https://github.com/apache/flink/blob/956c0716fdbf20bf53305fe4d023fa2bea412595/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99

Best
Yun Tang

From: Alexey Trenikhun 
Sent: Thursday, March 18, 2021 0:45
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Yun,

I've copied 77e77928-cb26-4543-bd41-e785fcac49f0 and _metadata to Google drive:
https://drive.google.com/drive/folders/1J3nwvQupLBT5ZaN_qEmc2y_-MgFz0cLb?usp=sharing

Compression was never enabled (docs says that RocksDB's incremental checkpoints 
always use snappy compression, not sure does it have effect on savepoint or not)

Thanks,
Alexey
________
From: Yun Tang 
Sent: Wednesday, March 17, 2021 12:33 AM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Alexey,

Thanks for your quick response. I have checked two different logs and still 
cannot understand why this could happen.

Take 
"wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-67de6690143a/77e77928-cb26-4543-bd41-e785fcac49f0"
 for example, the key group range offset has been intersected correctly during 
rescale for task "Intake voice calls (6/7)". The only place I could doubt is 
that azure blob storage did work as expected during seek offset [1].

Have you ever enabled snappy compression [2] [3] for savepoints?
Could you also share the file 
"wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-67de6690143a/77e77928-cb26-4543-bd41-e785fcac49f0
 " so that I could seek locally to see whether work as expected.
Moreover, could you also share savepoint meta data 
""wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-67de6690143a/_metadata"
 ?


[1] 
https://github.com/apache/flink/blob/dc404e2538fdfbc98b9c565951f30f922bf7cedd/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L211

Re: EOFException on attempt to scale up job with RocksDB state backend

2021-03-17 Thread Yun Tang
Hi Alexey,

I am not familiar with azure blob storage and I cannot load the "_metadata" 
with your given file locally.

Currently, I highly suspect this strange rescaling behavior is related with 
your underlying storage, could you try to use block blob instead of page blob 
[1] to see whether this behavior still existed?

[1] 
https://hadoop.apache.org/docs/current/hadoop-azure/index.html#Block_Blob_with_Compaction_Support_and_Configuration


Best
Yun Tang


From: Alexey Trenikhun 
Sent: Thursday, March 18, 2021 12:00
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Yun,
Azure web UI shows size of all files created by Flink as 128Mib * X (128, 256, 
640), see screenshot attached. In my understanding this is because Flink 
creates them as Page Blobs. In same storage other application creates files as 
block blobs and they have sizes not rounded on 128Mib


Thanks,
Alexey

____
From: Yun Tang 
Sent: Wednesday, March 17, 2021 8:38 PM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Alexey,

I tried to load your _metadata as checkpoint via 
Checkpoints#loadCheckpointMetadata [1] but found this file is actually not a 
savepoint meta, have you ever uploaded the correct files?
Moreover, I noticed that both size of 77e77928-cb26-4543-bd41-e785fcac49f0 and 
_metadata are 128MB which is much larger than its correct capacity, is this 
expected on azure blob storage or you just uploaded the wrong files?

[1] 
https://github.com/apache/flink/blob/956c0716fdbf20bf53305fe4d023fa2bea412595/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99

Best
Yun Tang

From: Alexey Trenikhun 
Sent: Thursday, March 18, 2021 0:45
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Yun,

I've copied 77e77928-cb26-4543-bd41-e785fcac49f0 and _metadata to Google drive:
https://drive.google.com/drive/folders/1J3nwvQupLBT5ZaN_qEmc2y_-MgFz0cLb?usp=sharing

Compression was never enabled (docs says that RocksDB's incremental checkpoints 
always use snappy compression, not sure does it have effect on savepoint or not)

Thanks,
Alexey
________
From: Yun Tang 
Sent: Wednesday, March 17, 2021 12:33 AM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Alexey,

Thanks for your quick response. I have checked two different logs and still 
cannot understand why this could happen.

Take 
"wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-67de6690143a/77e77928-cb26-4543-bd41-e785fcac49f0"
 for example, the key group range offset has been intersected correctly during 
rescale for task "Intake voice calls (6/7)". The only place I could doubt is 
that azure blob storage did work as expected during seek offset [1].

Have you ever enabled snappy compression [2] [3] for savepoints?
Could you also share the file 
"wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-67de6690143a/77e77928-cb26-4543-bd41-e785fcac49f0
 " so that I could seek locally to see whether work as expected.
Moreover, could you also share savepoint meta data 
""wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-67de6690143a/_metadata"
 ?


[1] 
https://github.com/apache/flink/blob/dc404e2538fdfbc98b9c565951f30f922bf7cedd/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L211
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression
[3] 
https://ci.apache.org/projechttps://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#execution-checkpointing-snapshot-compressions/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression

Best
Yun Tang

From: Alexey Trenikhun 
Sent: Wednesday, March 17, 2021 14:25
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Attached.


From: Yun Tang 
Sent: Tuesday, March 16, 2021 11:13 PM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Alexey,

Thanks for your reply, could you also share logs during normal restoring just 
as I wrote in previous thread so that I could compare.

Best
Yun Tang
__

Re: EOFException on attempt to scale up job with RocksDB state backend

2021-03-17 Thread Yun Tang
Hi Alexey,

I tried to load your _metadata as checkpoint via 
Checkpoints#loadCheckpointMetadata [1] but found this file is actually not a 
savepoint meta, have you ever uploaded the correct files?
Moreover, I noticed that both size of 77e77928-cb26-4543-bd41-e785fcac49f0 and 
_metadata are 128MB which is much larger than its correct capacity, is this 
expected on azure blob storage or you just uploaded the wrong files?

[1] 
https://github.com/apache/flink/blob/956c0716fdbf20bf53305fe4d023fa2bea412595/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99

Best
Yun Tang

From: Alexey Trenikhun 
Sent: Thursday, March 18, 2021 0:45
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Yun,

I've copied 77e77928-cb26-4543-bd41-e785fcac49f0 and _metadata to Google drive:
https://drive.google.com/drive/folders/1J3nwvQupLBT5ZaN_qEmc2y_-MgFz0cLb?usp=sharing

Compression was never enabled (docs says that RocksDB's incremental checkpoints 
always use snappy compression, not sure does it have effect on savepoint or not)

Thanks,
Alexey
____
From: Yun Tang 
Sent: Wednesday, March 17, 2021 12:33 AM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Alexey,

Thanks for your quick response. I have checked two different logs and still 
cannot understand why this could happen.

Take 
"wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-67de6690143a/77e77928-cb26-4543-bd41-e785fcac49f0"
 for example, the key group range offset has been intersected correctly during 
rescale for task "Intake voice calls (6/7)". The only place I could doubt is 
that azure blob storage did work as expected during seek offset [1].

Have you ever enabled snappy compression [2] [3] for savepoints?
Could you also share the file 
"wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-67de6690143a/77e77928-cb26-4543-bd41-e785fcac49f0
 " so that I could seek locally to see whether work as expected.
Moreover, could you also share savepoint meta data 
""wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-67de6690143a/_metadata"
 ?


[1] 
https://github.com/apache/flink/blob/dc404e2538fdfbc98b9c565951f30f922bf7cedd/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L211
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression
[3] 
https://ci.apache.org/projechttps://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#execution-checkpointing-snapshot-compressions/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression

Best
Yun Tang

From: Alexey Trenikhun 
Sent: Wednesday, March 17, 2021 14:25
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Attached.


From: Yun Tang 
Sent: Tuesday, March 16, 2021 11:13 PM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Alexey,

Thanks for your reply, could you also share logs during normal restoring just 
as I wrote in previous thread so that I could compare.

Best
Yun Tang

From: Alexey Trenikhun 
Sent: Wednesday, March 17, 2021 13:55
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Yun,
I'm attaching shorter version of log, looks like full version didn't come 
through

Thanks,
Alexey

From: Yun Tang 
Sent: Tuesday, March 16, 2021 8:05 PM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Alexey,

I believe your exception messages are printed from Flink-1.12.2 not 
Flink-1.12.1 due to the line number of method calling.

Could you share exception message of Flink-1.12.1 when rescaling? Moreover, I 
hope you could share more logs during restoring and rescaling. I want to see 
details of key group handle [1]

[1] 
https://github.com/apache/flink/blob/dc404e2538fdfbc98b9c565951f30f922bf7cedd/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L153

Best

From: Alexey Trenikhun 
Sent: Tuesday, March 16, 2021 15:10
To: Yun Tang ; Tzu-Li (Gordon) Tai 

Re: Saved State in FSstate Backend

2021-03-17 Thread Yun Tang
Hi

You could refer to [1] to know more details about checkpoint directory 
structure. If you are using FsStateBackend, all checkpointed data should be 
found under 'exclusive' folder, and nothing would exist if keyed state handle 
smaller than memory threshold [2] (checkpointed data would be sent to JM 
directly).


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#directory-structure
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#state-backend-fs-memory-threshold

Best
Yun Tang

From: Abdullah bin Omar 
Sent: Wednesday, March 17, 2021 17:41
To: user@flink.apache.org 
Subject: Saved State in FSstate Backend

Hi,

I used the FSstate backend to save the state. I just got a folder named similar 
to JobID (attached image). Inside the folder, there are two more folders named 
by shared and task owned. However, there is nothing in those folders.

How can I see the saved state? or, where is the state saved?

Thank you!


Re: EOFException on attempt to scale up job with RocksDB state backend

2021-03-17 Thread Yun Tang
Hi Alexey,

Thanks for your quick response. I have checked two different logs and still 
cannot understand why this could happen.

Take 
"wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-67de6690143a/77e77928-cb26-4543-bd41-e785fcac49f0"
 for example, the key group range offset has been intersected correctly during 
rescale for task "Intake voice calls (6/7)". The only place I could doubt is 
that azure blob storage did work as expected during seek offset [1].

Have you ever enabled snappy compression [2] [3] for savepoints?
Could you also share the file 
"wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-67de6690143a/77e77928-cb26-4543-bd41-e785fcac49f0
 " so that I could seek locally to see whether work as expected.
Moreover, could you also share savepoint meta data 
""wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-67de6690143a/_metadata"
 ?


[1] 
https://github.com/apache/flink/blob/dc404e2538fdfbc98b9c565951f30f922bf7cedd/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L211
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression
[3] 
https://ci.apache.org/projechttps://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#execution-checkpointing-snapshot-compressions/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression

Best
Yun Tang

From: Alexey Trenikhun 
Sent: Wednesday, March 17, 2021 14:25
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Attached.

____
From: Yun Tang 
Sent: Tuesday, March 16, 2021 11:13 PM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Alexey,

Thanks for your reply, could you also share logs during normal restoring just 
as I wrote in previous thread so that I could compare.

Best
Yun Tang

From: Alexey Trenikhun 
Sent: Wednesday, March 17, 2021 13:55
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Yun,
I'm attaching shorter version of log, looks like full version didn't come 
through

Thanks,
Alexey

From: Yun Tang 
Sent: Tuesday, March 16, 2021 8:05 PM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Alexey,

I believe your exception messages are printed from Flink-1.12.2 not 
Flink-1.12.1 due to the line number of method calling.

Could you share exception message of Flink-1.12.1 when rescaling? Moreover, I 
hope you could share more logs during restoring and rescaling. I want to see 
details of key group handle [1]

[1] 
https://github.com/apache/flink/blob/dc404e2538fdfbc98b9c565951f30f922bf7cedd/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L153

Best

From: Alexey Trenikhun 
Sent: Tuesday, March 16, 2021 15:10
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Also restore from same savepoint without change in parallelism works fine.


From: Alexey Trenikhun 
Sent: Monday, March 15, 2021 9:51 PM
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

No, I believe original exception was from 1.12.1 to 1.12.1

Thanks,
Alexey


From: Yun Tang 
Sent: Monday, March 15, 2021 8:07:07 PM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi,

Can you scale the job at the same version from 1.12.1 to 1.12.1?

Best
Yun Tang


From: Alexey Trenikhun 
Sent: Tuesday, March 16, 2021 4:46
To: Tzu-Li (Gordon) Tai ; user@flink.apache.org 

Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Savepoint was taken with 1.12.1, I've tried to scale up using same version and 
1.12.2


From: Tzu-Li (Gordon) Tai 
Sent: Monday, March 15, 2021 12:06 AM
To: user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi,

Could you provide info on the Flink version used?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: EOFException on attempt to scale up job with RocksDB state backend

2021-03-16 Thread Yun Tang
Hi Alexey,

Thanks for your reply, could you also share logs during normal restoring just 
as I wrote in previous thread so that I could compare.

Best
Yun Tang

From: Alexey Trenikhun 
Sent: Wednesday, March 17, 2021 13:55
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Yun,
I'm attaching shorter version of log, looks like full version didn't come 
through

Thanks,
Alexey
____
From: Yun Tang 
Sent: Tuesday, March 16, 2021 8:05 PM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Alexey,

I believe your exception messages are printed from Flink-1.12.2 not 
Flink-1.12.1 due to the line number of method calling.

Could you share exception message of Flink-1.12.1 when rescaling? Moreover, I 
hope you could share more logs during restoring and rescaling. I want to see 
details of key group handle [1]

[1] 
https://github.com/apache/flink/blob/dc404e2538fdfbc98b9c565951f30f922bf7cedd/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L153

Best

From: Alexey Trenikhun 
Sent: Tuesday, March 16, 2021 15:10
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Also restore from same savepoint without change in parallelism works fine.


From: Alexey Trenikhun 
Sent: Monday, March 15, 2021 9:51 PM
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

No, I believe original exception was from 1.12.1 to 1.12.1

Thanks,
Alexey

____
From: Yun Tang 
Sent: Monday, March 15, 2021 8:07:07 PM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi,

Can you scale the job at the same version from 1.12.1 to 1.12.1?

Best
Yun Tang


From: Alexey Trenikhun 
Sent: Tuesday, March 16, 2021 4:46
To: Tzu-Li (Gordon) Tai ; user@flink.apache.org 

Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Savepoint was taken with 1.12.1, I've tried to scale up using same version and 
1.12.2


From: Tzu-Li (Gordon) Tai 
Sent: Monday, March 15, 2021 12:06 AM
To: user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi,

Could you provide info on the Flink version used?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: EOFException on attempt to scale up job with RocksDB state backend

2021-03-16 Thread Yun Tang
Hi Alexey,

I believe your exception messages are printed from Flink-1.12.2 not 
Flink-1.12.1 due to the line number of method calling.

Could you share exception message of Flink-1.12.1 when rescaling? Moreover, I 
hope you could share more logs during restoring and rescaling. I want to see 
details of key group handle [1]

[1] 
https://github.com/apache/flink/blob/dc404e2538fdfbc98b9c565951f30f922bf7cedd/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L153

Best

From: Alexey Trenikhun 
Sent: Tuesday, March 16, 2021 15:10
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Also restore from same savepoint without change in parallelism works fine.


From: Alexey Trenikhun 
Sent: Monday, March 15, 2021 9:51 PM
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

No, I believe original exception was from 1.12.1 to 1.12.1

Thanks,
Alexey


From: Yun Tang 
Sent: Monday, March 15, 2021 8:07:07 PM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi,

Can you scale the job at the same version from 1.12.1 to 1.12.1?

Best
Yun Tang


From: Alexey Trenikhun 
Sent: Tuesday, March 16, 2021 4:46
To: Tzu-Li (Gordon) Tai ; user@flink.apache.org 

Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Savepoint was taken with 1.12.1, I've tried to scale up using same version and 
1.12.2


From: Tzu-Li (Gordon) Tai 
Sent: Monday, March 15, 2021 12:06 AM
To: user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi,

Could you provide info on the Flink version used?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Prefix Seek RocksDB

2021-03-16 Thread Yun Tang
Hi Rex,

Prefix seek iterator has not ever been used in Flink when seeking. I hope you 
could first read more details about this from RocksDB wiki as prefix extractor 
could impact the performance.

Best
Yun Tang

From: Rex Fenley 
Sent: Wednesday, March 17, 2021 2:02
To: Yun Tang 
Cc: user ; Brad Davis 
Subject: Re: Prefix Seek RocksDB

Thanks for the input, I'll look more into that.

Does your answer then imply that Joins and Aggs do not inherently always use 
prefix seeks? I'd imagine that the join key on join and groupby key on aggs 
would always be used as prefix keys. Is this not the case?

Also, is there good information on what the correct prefix extractor is for 
Flink? This feature is something I only just discovered so I was hoping to gain 
clarity.

Thanks

On Mon, Mar 15, 2021 at 8:33 PM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Rex,

You could configure prefix seek via RocksDB's column family options [1]. Be 
careful to use correct prefix extractor.


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#passing-options-factory-to-rocksdb


Best



From: Rex Fenley mailto:r...@remind101.com>>
Sent: Tuesday, March 16, 2021 8:29
To: user mailto:user@flink.apache.org>>
Cc: Brad Davis mailto:brad.da...@remind101.com>>
Subject: Prefix Seek RocksDB

Hello!

I'm wondering if Flink RocksDB state backend is pre-configured to have Prefix 
Seeks enabled, such as for Joins and Aggs on the TableAPI [1]? If not, what's 
the easiest way to configure this? I'd imagine this would be beneficial.

Thanks!

[1] https://github.com/facebook/rocksdb/wiki/Prefix-Seek

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com<https://www.remind.com/> |  BLOG<http://blog.remind.com/>  |  FOLLOW 
US<https://twitter.com/remindhq>  |  LIKE US<https://www.facebook.com/remindhq>


--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com<https://www.remind.com/> |  BLOG<http://blog.remind.com/>  |  FOLLOW 
US<https://twitter.com/remindhq>  |  LIKE US<https://www.facebook.com/remindhq>


Re: Prefix Seek RocksDB

2021-03-15 Thread Yun Tang
Hi Rex,

You could configure prefix seek via RocksDB's column family options [1]. Be 
careful to use correct prefix extractor.


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#passing-options-factory-to-rocksdb


Best



From: Rex Fenley 
Sent: Tuesday, March 16, 2021 8:29
To: user 
Cc: Brad Davis 
Subject: Prefix Seek RocksDB

Hello!

I'm wondering if Flink RocksDB state backend is pre-configured to have Prefix 
Seeks enabled, such as for Joins and Aggs on the TableAPI [1]? If not, what's 
the easiest way to configure this? I'd imagine this would be beneficial.

Thanks!

[1] https://github.com/facebook/rocksdb/wiki/Prefix-Seek

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW 
US  |  LIKE US


Re: Question about session_aggregate.merging-window-set.rocksdb_estimate-num-keys

2021-03-15 Thread Yun Tang
Hi,

Could you describe what you observed in details? Which states you compare with 
the session window state "merging-window-set", the "newKeysInState" or 
"existingKeysInState"?

BTW, since we use list state as main state for window operator and we use 
RocksDB's merge operation for window state add operations, this would cause the 
estimating of number keys inaccurate [1]:
  // Estimation will be inaccurate when:
  // (1) there exist merge keys
  // (2) keys are directly overwritten
  // (3) deletion on non-existing keys
  // (4) low number of samples

[1] 
https://github.com/ververica/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/version_set.cc#L919-L924



Best
Yun Tang

From: Vishal Santoshi 
Sent: Monday, March 15, 2021 5:48
To: user 
Subject: Re: Question about 
session_aggregate.merging-window-set.rocksdb_estimate-num-keys

All I can think is, that any update on a state key, which I do in my 
ProcessFunction, creates an update ( essentially an append on rocksdb ) which 
does render the previous value for the key, a  tombstone , but that need not 
reflect on the count  ( as double or triple counts ) atomically, thus the 
called as an "estimate" , but was not anticipating this much difference ...

On Sun, Mar 14, 2021 at 5:32 PM Vishal Santoshi 
mailto:vishal.santo...@gmail.com>> wrote:
The reason I ask is that I have a "Process Window Function" on that Session  
Window  and I keep key scoped Global State.  I maintain a TTL on that state ( 
that is outside the Window state )  that is roughly the current WM + lateness.

I would imagine that keys for that custom state are roughly equal to the number 
of keys in the "merging-window-set" . It seems twice that number but does 
follow the slope. I am trying to figure out why this deviation.

public void process(KEY key,
ProcessWindowFunction, KeyedSessionWithSessionID, KEY, TimeWindow>.Context context,
Iterable> elements, 
Collector> out)
throws Exception {
// scoped to the key
if (state.value() == null) {
this.newKeysInState.inc();
state.update(new IntervalList());
}else{
this.existingKeysInState.inc();
}

On Sun, Mar 14, 2021 at 3:32 PM Vishal Santoshi 
mailto:vishal.santo...@gmail.com>> wrote:
Hey folks,

  Was looking at this very specific metric 
"session_aggregate.merging-window-set.rocksdb_estimate-num-keys".  Does this 
metric also represent session windows ( it is a session window ) that have 
lateness on them ? In essence if the session window was closed but has a 
lateness of a few hours would those keys still be counted against this metric.

I think they should as it is an estimate keys for the Column Family for the 
operator and if the window has not been GCed then the key for those Windows 
should be in RocksDB but wanted to be sure.

Regards.




Re: EOFException on attempt to scale up job with RocksDB state backend

2021-03-15 Thread Yun Tang
Hi,

Can you scale the job at the same version from 1.12.1 to 1.12.1?

Best
Yun Tang


From: Alexey Trenikhun 
Sent: Tuesday, March 16, 2021 4:46
To: Tzu-Li (Gordon) Tai ; user@flink.apache.org 

Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Savepoint was taken with 1.12.1, I've tried to scale up using same version and 
1.12.2


From: Tzu-Li (Gordon) Tai 
Sent: Monday, March 15, 2021 12:06 AM
To: user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi,

Could you provide info on the Flink version used?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: failure checkpoint counts

2021-03-09 Thread Yun Tang
Hi Abdullah,

The "Connection refused" exception should have no direct relationship with 
checkpoint, I think you could check whether the socket source has worked well 
in your job.

Best
Yun Tang

From: Abdullah bin Omar 
Sent: Tuesday, March 9, 2021 0:13
To: user@flink.apache.org 
Subject: failure checkpoint counts

Hi,

I faced this exception at the time of checkpoint counts. Could you please 
inform me what the problem is here?

the exception:


org.apache.flink.runtime.JobException: Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, 
backoffTimeMS=100)

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:130)

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:81)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:221)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:212)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:203)

at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:696)

at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)

at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:433)

at jdk.internal.reflect.GeneratedMethodAccessor80.invoke(Unknown Source)

at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.base/java.lang.reflect.Method.invoke(Method.java:564)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)

at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)

at 
akka.japi.pf<http://akka.japi.pf>.UnitCaseStatement.apply(CaseStatements.scala:26)

at 
akka.japi.pf<http://akka.japi.pf>.UnitCaseStatement.apply(CaseStatements.scala:21)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at 
akka.japi.pf<http://akka.japi.pf>.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

at akka.actor.ActorCell.invoke(ActorCell.scala:561)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

at akka.dispatch.Mailbox.run(Mailbox.scala:225)

at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: java.net<http://java.net>.ConnectException: Connection refused

at java.base/sun.nio.ch<http://sun.nio.ch>.Net.connect0(Native Method)

at java.base/sun.nio.ch<http://sun.nio.ch>.Net.connect(Net.java:574)

at java.base/sun.nio.ch<http://sun.nio.ch>.Net.connect(Net.java:563)

at 
java.base/sun.nio.ch<http://sun.nio.ch>.NioSocketImpl.connect(NioSocketImpl.java:588)

at 
java.base/java.net<http://java.net>.SocksSocketImpl.connect(SocksSocketImpl.java:333)

at java.base/java.net<http://java.net>.Socket.connect(Socket.java:648)

at 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:104)

at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)

at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)

at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)



Thank you!





Re: Is Ververica Connector Redis open source?

2021-03-09 Thread Yun Tang
Hi Yik,

As far as I know, the source code of ververica connector is not public, and you 
could refer to [1] for open-source implementation.


[1] https://github.com/apache/bahir-flink/tree/master/flink-connector-redis

Best
Yun Tang




From: Yik San Chan 
Sent: Tuesday, March 9, 2021 12:01
To: user 
Subject: Is Ververica Connector Redis open source?

Hi community,

I found this package 
https://mvnrepository.com/artifact/com.alibaba.ververica/ververica-connector-redis/1.11-vvr-2.1.3
 in Maven Repository. However, I cannot find it anywhere in GitHub. Does anyone 
know this is open source or not?

Thank you!

Best,
Yik San Chan


Re: Performance issues when RocksDB block cache is full

2021-02-17 Thread Yun Tang
Hi Yaroslav,

Unfortunately, RocksDB does not have such TTL block cache, and if you really 
only have very few active keys, current LRU implementation should work well as 
only useful latest entries are inserted into cache.
What kind of behavior when cache reached the maximum? Have you ever noticed 
anything different on RocksDB metrics?
Perhaps you might meet problem of flushing write buffer too early [1] and 
partitioned index [2] might help.

[1] https://issues.apache.org/jira/browse/FLINK-19238
[2] https://issues.apache.org/jira/browse/FLINK-20496

Best
Yun Tang



From: Dawid Wysakowicz
Sent: Monday, February 15, 2021 17:55
To: Yaroslav Tkachenko; user@flink.apache.org
Cc: Yun Tang
Subject: Re: Performance issues when RocksDB block cache is full


Hey Yaroslav,

Unfortunately I don't have enough knowledge to give you an educated reply. The 
first part certainly does make sense to me, but I am not sure how to mitigate 
the issue. I am ccing Yun Tang who worked more on the RocksDB state backend (It 
might take him a while to answer though, as he is on vacation right now).

Best,

Dawid

On 14/02/2021 06:57, Yaroslav Tkachenko wrote:
Hello,

I observe throughput degradation when my pipeline reaches the maximum of the 
allocated block cache.

The pipeline is consuming from a few Kafka topics at a high rate (100k+ rec/s). 
Almost every processed message results in a (keyed) state read with an optional 
write. I've enabled native RocksDB metrics and noticed that everything stays 
stable until the block cache usage reaches maximum. If I understand correctly, 
this makes sense: this cache is used for all reads and cache misses could mean 
reading data on disk, which is much slower (I haven't switched to SSDs yet). 
Does it make sense?

One thing I know about the messages I consume: I expect very few keys to be 
active simultaneously, most of them can be treated as cold. So I'd love RocksDB 
block cache to have a TTL option (say, 30 minutes), which, I imagine, could 
solve this issue by guaranteeing to only keep active keys in memory. I don't 
feel like LRU is doing a very good job here... I couldn't find any option like 
that, but I'm wondering if someone could recommend something similar.

Thank you!

--
Yaroslav Tkachenko
sap1ens.com<https://sap1ens.com>


Re: question on ValueState

2021-02-07 Thread Yun Tang
Hi,

MemoryStateBackend and FsStateBackend both hold keyed state in 
HeapKeyedStateBackend [1], and the main structure to store data is StateTable 
[2] which holds POJO format objects. That is to say, the object would not be 
serialized when calling update().
On the other hand, RocksDB statebackend would store value with serialized bytes.


[1] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
[2] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java

Best
Yun Tang


From: Colletta, Edward 
Sent: Sunday, February 7, 2021 19:53
To: user@flink.apache.org 
Subject: question on ValueState


Using FsStateBackend.



I was under the impression that ValueState.value will serialize an object which 
is stored in the local state backend, copy the serialized object and 
deserializes it.  Likewise update() would do the same steps copying the object 
back to local state backend.And as a consequence, storing collections in 
ValueState is much less efficient than using ListState or MapState if possible.



However, I am looking at some code I wrote a while ago which made the 
assumption that the value() method just returned a reference to the object.  
The code only calls update() when creating the object if value() returns null.  
  Yet the code works, all changes to the object stored in state are visible the 
next time value() is called.   I have some sample code below.



Can someone clarify what really happens when value() is called?





   public void processElement(M in, Context ctx, Collector out) throws 
Exception {

MyWindow myWindow;

myWindow = windowState.value();

if (myWindow == null) {


ctx.timerService().registerProcessingTimeTimer(((ctx.timerService().currentProcessingTime()
 + interval) / interval) * interval);

myWindow = new MyWindow(0L, slide, windowSize);

windowState.update(myWindow);

myWindow.eq.add(0L);

}


myWindow.eq.getTail().setAccumulator(myWindow.eq.getTail().getAccumulator() + 
in.value);

}



@Override

public void onTimer(long timestamp, OnTimerContext ctx, Collector 
out) throws Exception {


ctx.timerService().registerProcessingTimeTimer(((ctx.timerService().currentProcessingTime()
 + interval) / interval) * interval);

MyWindow myWindow = windowState.value();

myWindow.slide(0L);

out.collect(myWindow.globalAccum);

}






Re: Memory usage increases on every job restart resulting in eventual OOMKill

2021-02-03 Thread Yun Tang
Hi Randal,

Please consider to use jemalloc instead of glibc as default memory allocator 
[1] to avoid memory fragmentation. As far as I know, at least two groups of 
users, who run Flink on YARN and k8s respectively, have reported similar 
problem that memory continues growing up once restart [2]. The problem both 
went away once they adopt to use JeMalloc.

[1] https://issues.apache.org/jira/browse/FLINK-19125
[2] https://issues.apache.org/jira/browse/FLINK-18712

Best
Yun Tang

From: Lasse Nedergaard 
Sent: Wednesday, February 3, 2021 14:07
To: Xintong Song 
Cc: user 
Subject: Re: Memory usage increases on every job restart resulting in eventual 
OOMKill

Hi

We had something similar and our problem was class loader leaks. We used a 
summary log component to reduce logging but still turned out that it used a 
static object that wasn’t released when we got an OOM or restart. Flink was 
reusing task managers so only workaround was to stop the job wait until they 
was removed and start again until we fixed the underlying problem.

Med venlig hilsen / Best regards
Lasse Nedergaard


Den 3. feb. 2021 kl. 02.54 skrev Xintong Song :


How is the memory measured?
I meant which flink or k8s metric is collected? I'm asking because depending on 
which metric is used, the *container memory usage* can be defined differently. 
E.g., whether mmap memory is included.

Also, could you share the effective memory configurations for the taskmanagers? 
You should find something like the following at the beginning of taskmanger 
logs.

INFO  [] - Final TaskExecutor Memory configuration:
INFO  [] -   Total Process Memory:  1.688gb (1811939328 bytes)
INFO  [] - Total Flink Memory:  1.250gb (1342177280 bytes)
INFO  [] -   Total JVM Heap Memory: 512.000mb (536870902 bytes)
INFO  [] - Framework:   128.000mb (134217728 bytes)
INFO  [] - Task:384.000mb (402653174 bytes)
INFO  [] -   Total Off-heap Memory: 768.000mb (805306378 bytes)
INFO  [] - Managed: 512.000mb (536870920 bytes)
INFO  [] - Total JVM Direct Memory: 256.000mb (268435458 bytes)
INFO  [] -   Framework: 128.000mb (134217728 bytes)
INFO  [] -   Task:  0 bytes
INFO  [] -   Network:   128.000mb (134217730 bytes)
INFO  [] - JVM Metaspace:   256.000mb (268435456 bytes)
INFO  [] - JVM Overhead:192.000mb (201326592 bytes)


Thank you~

Xintong Song


On Tue, Feb 2, 2021 at 8:59 PM Randal Pitt 
mailto:randal.p...@foresite.com>> wrote:
Hi Xintong Song,

Correct, we are using standalone k8s. Task managers are deployed as a
statefulset so have consistent pod names. We tried using native k8s (in fact
I'd prefer to) but got persistent
"io.fabric8.kubernetes.client.KubernetesClientException: too old resource
version: 242214695 (242413759)" errors which resulted in jobs being
restarted every 30-60 minutes.

We are using Prometheus Node Exporter to capture memory usage. The graph
shows the metric:

sum(container_memory_usage_bytes{container_name="taskmanager",pod_name=~"$flink_task_manager"})
by (pod_name)

I've  attached the original
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2869/Screenshot_2021-02-02_at_11.png>
so Nabble doesn't shrink it.

Best regards,

Randal.





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: rocksdb block cache usage

2021-01-27 Thread Yun Tang
Hi,

If you have enabled managed memory, and since all rocksDB instances share the 
same block cache within one slot, all 
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
 in the same slot would report the same value.


Best
Yun Tang

From: Chesnay Schepler 
Sent: Wednesday, January 27, 2021 20:59
To: 曾祥才 ; User-Flink 
Subject: Re: rocksdb block cache usage

I don't quite understand the question; all 3 metrics you listed are the same 
one?

On 1/27/2021 9:23 AM, 曾祥才 wrote:
hi, all
   I've enable state.backend.rocksdb.metrics.block-pinned-usage metric ,
 and the 
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
 metric exposed.
 I'm confused  that the total memory used for block cache pinned is sum of 
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
 or just
 
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
 (for block cache usage the metric seems per slot)?






Re: Problem with overridden hashCode/equals in keys in Flink 1.11.3 when checkpointing with RocksDB

2021-01-21 Thread Yun Tang
Hi David,

Thanks for your enthusiasm to figure out the root cause. The key difference is 
that RocksDB holds binary objects which are only defined by the serialized 
bytes while Fs/MemoryStateBackend holds objects in pojo format which are 
defined by the hashCode and equals. If you want to achieve the same effort as 
using MemoryStateBackend, please provide the customizer serializer to your 
user-defined classes [1] to ignore some field (however, it would cause field 
lost when deserializing).

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/custom_serialization.html

Best
Yun Tang

From: Robert Metzger 
Sent: Thursday, January 21, 2021 22:49
To: David Haglund 
Cc: user@flink.apache.org 
Subject: Re: Problem with overridden hashCode/equals in keys in Flink 1.11.3 
when checkpointing with RocksDB

Hey David,

this is a good catch! I've filed a JIRA ticket to address this in the docs more 
prominently: https://issues.apache.org/jira/browse/FLINK-21073

Thanks a lot for reporting this issue!

On Thu, Jan 21, 2021 at 9:24 AM David Haglund 
mailto:david.hagl...@niradynamics.se>> wrote:

A colleague of mine found some hint under “Avro type” [2] in the State 
evolution schema page:



“Example: RocksDB state backend relies on binary objects identity, rather than 
hashCode method implementation. Any changes to the keys object structure could 
lead to non deterministic behaviour.”



I guess it is a known issue then, but it would at least to include that kind of 
fundamental information on the state backend page as well.



Best regards,

/David Haglund



[2]

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#avro-types





From: David Haglund 
mailto:david.hagl...@niradynamics.se>>
Date: Wednesday, 20 January 2021 at 19:57


I have an update. I have created a small project on github, 
https://github.com/daha/flink-key-by-problem-with-rocksdb-state,  which 
reproduces the issue.



There seems to be problem with RocksDB in all versions I have tested (from 
1.7.1 and later). In Flink 1.9.x only one of the events is counted with RockDB. 
In Flink 1.10.x and later all events are counted but with separate keys when 
all/both events should be counted using the same key.



The main branch in my sample project is using Flink 1.11.3, then there are 
branches for Flink 1.9.1, 1.10.3 and 1.12.1.



Best regards,

/David Haglund



From: David Haglund 
mailto:david.hagl...@niradynamics.se>>
Date: Wednesday, 20 January 2021 at 09:38

I have encountered a problem in Flink when trying to upgrade from Flink 1.9.1 to

Flink 1.11.3.



The problem in a combination of 2 components:



* Keys implemented as case classes in Scala where we override the equals and

  hashCode methods. The case class has additional fields which we are not used 
in

  the keyBy (hashCode/equals) but can have different values for a specific key 
(the

 fields we care about).

* Checkpointing with RocksDB



In Flink 1.9.1 everything worked fine, but in Flink 1.11.3 we got aggregations

for each unique key including the parameters which we did not want to include in

the keyBy, which we exclicitly do not use in hashCode and equals. It looks likes

hashCode is ignored in the keyBy in our case when we use RocksDB for 
checkpoints.



We do not see this problem if we disable checkpointing or when using

FsStateBackend.



I have seen this with "Incremental Window Aggregation with AggregateFunction"

[1], but a colleague of mine reported he had seen the same issue with

KeyedProcessFunction too.



We are using Scala version 2.11.12 and Java 8.



This looks like a bug to me. Is it a known issue or a new one?



Best regards,

/David Haglund



[1] Incremental Window Aggregation with AggregateFunction

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#incremental-window-aggregation-with-aggregatefunction



David Haglund
Systems Engineer
Fleet Perception for Maintenance

[cid:17725693d5d4cff311]

NIRA Dynamics AB
Wallenbergs gata 4
58330 Linköping
Sweden

Mobile: +46 705 634 848
david.hagl...@niradynamics.se<mailto:david.hagl...@niradynamics.se>
www.niradynamics.se<http://www.niradynamics.se>

Together for smarter safety




Re: Setting different timeouts for savepoints and checkpoints

2021-01-18 Thread Yun Tang
Hi Timo and Rex,

Actually, there existed several existing issues: FLINK-9465 [1] targets for CLI 
option while FLINK-10360 [2] targets for REST API.


[1] https://issues.apache.org/jira/browse/FLINK-9465
[2] https://issues.apache.org/jira/browse/FLINK-10360

Best
Yun Tang

From: Timo Walther 
Sent: Monday, January 18, 2021 17:28
To: user@flink.apache.org 
Subject: Re: Setting different timeouts for savepoints and checkpoints

Hi Rex,

feel free to open an issue for this. I could also imagine that
checkpoints and savepoints will further divert from each other and a
having different timeout might be reasonable.

Regards,
Timo


On 17.01.21 02:43, Rex Fenley wrote:
> Thanks for the quick response.
>
> Is this something that can be added as a feature request? Given that the
> time it takes to restore from either is different, and the semantics are
> slightly different, it seems like they should have completely separate
> configurable timeouts.
>
> Thanks!
>
>
> On Sat, Jan 16, 2021 at 2:43 PM Khachatryan Roman
> mailto:khachatryan.ro...@gmail.com>> wrote:
>
> Hi Rex,
>
> Unfortunately not: the same timeout value is used both for
> savepoints and checkpoints.
>
> Regards,
> Roman
>
>
> On Sat, Jan 16, 2021 at 9:42 AM Rex Fenley  <mailto:r...@remind101.com>> wrote:
>
> Hello,
>
> I'm wondering if there's a way to set different timeouts for
> savepoints and checkpoints. Our savepoints can take a number of
> hours to complete, whereas incremental checkpoints at their
> slowest take around 10 min. We'd like to timeout a checkpoint on
> a significantly smaller duration than a savepoint.
>
> Thanks!
>
> --
>
> Rex Fenley|Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/>| BLOG
> <http://blog.remind.com/> | FOLLOW US
> <https://twitter.com/remindhq> | LIKE US
> <https://www.facebook.com/remindhq>
>
>
>
> --
>
> Rex Fenley|Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
> FOLLOW US <https://twitter.com/remindhq> | LIKE US
> <https://www.facebook.com/remindhq>
>



Re: Restoring from checkpoint with different parallism

2021-01-11 Thread Yun Tang
 Hi Rex,

I think doc [1] should have given some descriptions. Rescaling from previous 
checkpoint is still supported in current Flink version.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint

Best
Yun Tang

From: Rex Fenley 
Sent: Tuesday, January 12, 2021 11:01
To: user 
Cc: Brad Davis 
Subject: Restoring from checkpoint with different parallism

Hello,

When using the TableAPI, is it safe to run a flink job with a different `-p` 
parallelism while restoring from a checkpoint (not a savepoint) using `-s`, 
without any rescaling of actual machines? I don't seem to find this documented 
anywhere.

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com<https://www.remind.com/> |  BLOG<http://blog.remind.com/>  |  FOLLOW 
US<https://twitter.com/remindhq>  |  LIKE US<https://www.facebook.com/remindhq>


Re: Queryable state on task managers that are not running the job

2020-12-23 Thread Yun Tang
Hi Martin,

What kind of deploy mode you choose? If you use per-job mode [1] to launch 
jobs, there might exist only idle slots instead of idle taskmanagers. 
Currently, queryable state is bounded to specific job and if the idle 
taskmanager is not registered in the target's resource manager, no queryable 
state could be queried.


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html#per-job-cluster-mode

Best
Yun Tang

From: Martin Boyanov 
Sent: Monday, December 21, 2020 19:04
To: user@flink.apache.org 
Subject: Queryable state on task managers that are not running the job

Hi,
I'm running a long-running flink job in cluster mode and I'm interested in 
using the queryable state functionality.
I have the following problem: when I query the flink task managers (i.e. the 
queryable state proxy), it is possible to hit a task manager which doesn't have 
the requested state, because the job is not running on that task manager.
For example, I might have a cluster with 5 task managers, but the job is 
deployed only on 3 of those. If my query hits any of the two idle task 
managers, I naturally get an error message that the job does not exist.
My current solution is to size the cluster appropriately so that there are no 
idle task managers. I was wondering if there was a better solution or if this 
could be handled better in the future?
Thanks in advance.
Kind regards,
Martin


Re: [ANNOUNCE] Apache Flink 1.12.0 released

2020-12-10 Thread Yun Tang
Thanks Dian and Robert for driving this release and thanks everyone who makes 
this great work possible !

Best
Yun Tang

From: Wei Zhong 
Sent: Thursday, December 10, 2020 20:32
To: d...@flink.apache.org 
Cc: user ; annou...@apache.org 
Subject: Re: [ANNOUNCE] Apache Flink 1.12.0 released

Congratulations! Thanks Dian and Robert for the great work!

Best,
Wei

> 在 2020年12月10日,20:26,Leonard Xu  写道:
>
>
> Thanks Dian and Robert for the great work as release manager !
> And thanks everyone who makes the release possible !
>
>
> Best,
> Leonard
>
>> 在 2020年12月10日,20:17,Robert Metzger  写道:
>>
>> The Apache Flink community is very happy to announce the release of Apache
>> Flink 1.12.0, which is the latest major release.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the improvements
>> for this bugfix release:
>> https://flink.apache.org/news/2020/12/10/release-1.12.0.html
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348263
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>> Dian & Robert
>



Re: Problem when restoring from savepoint with missing state & POJO modification

2020-12-09 Thread Yun Tang
Hi Bastien,

I think you could refer to WritableSavepoint#write [1] to get all existing 
state and flat map to remove the state you do not want (could refer to 
StatePathExtractor[2] )


[1] 
https://github.com/apache/flink/blob/168124f99c75e873adc81437c700f85f703e2248/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java#L103
[2] 
https://github.com/apache/flink/blob/168124f99c75e873adc81437c700f85f703e2248/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java#L54

Best
Yun Tang

From: bastien dine 
Sent: Wednesday, December 9, 2020 21:17
To: Yun Tang 
Cc: user 
Subject: Re: Problem when restoring from savepoint with missing state & POJO 
modification

Hello Yun,
Thank you very much for your response, that's what I thought,
However, it does not seem possible to remove only one state using the state 
processor API,
We use it a lot, and we can only remove all of the operator states, not one 
specifically,
Am I missing something?

Best Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io<http://bastiendine.io>


Le mar. 8 déc. 2020 à 08:54, Yun Tang 
mailto:myas...@live.com>> a écrit :
Hi Bastien,

Flink supports to register state via state descriptor when calling 
runtimeContext.getState(). However, once the state is registered, it cannot be 
removed anymore. And when you restore from savepoint, the previous state is 
registered again [1]. Flink does not to drop state directly and you could use 
state processor API [2] to remove related state.


[1] 
https://github.com/apache/flink/blob/d94c7a451d22f861bd3f79435f777b427020eba0/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java#L171
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html


From: bastien dine mailto:bastien.d...@gmail.com>>
Sent: Tuesday, December 8, 2020 0:28
To: user mailto:user@flink.apache.org>>
Subject: Problem when restoring from savepoint with missing state & POJO 
modification

Hello,
We have experienced some weird issues with POJO mapState in a streaming job 
upon checkpointing when removing state, then modifying the state POJO and 
restoring job

Caused by: java.lang.NullPointerException
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.(PojoSerializer.java:123)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)

Reproduced in Flink 1.10 & 1.11
(full stack below)

Context :
We have a streaming job with a state name "buffer" and POJO Buffer inside a 
CoFlatMap function

MyCoFlat:
public class MyCoFlat extends RichCoFlatMapFunction {
transient MapState buffer;
@Override
public void open(Configuration parameters) {
buffer = getRuntimeContext().getMapState(new MapStateDescriptor<>("buffer", 
String.class, Buffer.class));
}


Buffer :
public class Buffer {
private String field1;
private String field2;
private String field3;
... + empty constructor  + getter / setter for POJO consideration

We had some troubles with our job, so we rework 2 things :
 - we removed field2 in Buffer class,
 - we stopped using "buffer" state anymore

When restoring with savepoint (--allowNonRestoredState) we have the exception 
below
The job is submitted to the cluster but fails on checkpointing, job is totally 
stuck.

Debug:
Debugging showed us some stuff, the exception is raised here (as expected):

public PojoSerializer(
Class clazz,
TypeSerializer[] fieldSerializers,
Field[] fields,
ExecutionConfig executionConfig) {
this.clazz = checkNotNull(clazz);
this.fieldSerializers = (TypeSerializer[]) 
checkNotNull(fieldSerializers);
this.fields = checkNotNull(fields);
this.numFields = fieldSerializers.length;
this.executionConfig = checkNotNull(executionConfig);
for (int i = 0; i < numFields; i++) {
this.fields[i].setAccessible(true); < HERE
}

In our fields, we have field[0] & field[2] but field[1] is totally missing from 
the array, that's why we have the NPE over here, when i=1

So what we have done is to put this state back in our streaming job (with the 
missing field and POJO), redeploy with old savepoint and this went totally fine
Then we have redeploy a job without this state
This has been a 2 times deployment for our job (1 -> modify the POJO, 2 -> 
remove the state using this POJO)
But the non-used-anymore state is still (at least the serializer) in the 
savepoints, we could be facing this problem again when we will modify Buffer 
POJO later.
Finally we just modify a savepoint with API and remove this state once for all, 
and restart from it.

I have a couple of questions here:
Why does flink keep a non-used state in a sav

Re: Flink cli Stop command exception

2020-12-09 Thread Yun Tang
Hi Suchithra,

Have you ever checked job manager log to see whether the savepoint is triggered 
and why the savepoint failed to complete.

Best
Yun Tang

From: V N, Suchithra (Nokia - IN/Bangalore) 
Sent: Wednesday, December 9, 2020 23:45
To: user@flink.apache.org 
Subject: Flink cli Stop command exception


Hello,



I am running streaming flink job and I was using cancel command with savepoint 
to cancel the job. From flink 1.10 version stop command should be used instead 
of cancel command.

But I am getting below error sometimes. Please let me know what might be the 
issue.





{"host":"cancel1-flinkcli-jobsubmission-55tgq","level":"info","log":{"message":"The
 flink command to be executed is /opt/flink/bin/flink stop -p 
/opt/flink/share/cflkt-flink/external_pvc -d ec416bf906915e570ef53b242d3d0bb0 
"},"time":"2020-09-02T12:32:19.979Z","type":"log"}

{"host":"cancel1-flinkcli-jobsubmission-55tgq","level":"info","log":{"message":"=
 Submitting the Flink job "},"time":"2020-09-02T12:32:19.983Z","type":"log"}

WARNING: An illegal reflective access operation has occurred

WARNING: Illegal reflective access by 
org.apache.hadoop.security.authentication.util.KerberosUtil 
(file:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.6.5-7.0.jar) to method 
sun.security.krb5.Config.getInstance()

WARNING: Please consider reporting this to the maintainers of 
org.apache.hadoop.security.authentication.util.KerberosUtil

WARNING: Use --illegal-access=warn to enable warnings of further illegal 
reflective access operations

WARNING: All illegal access operations will be denied in a future release





The program finished with the following exception:



org.apache.flink.util.FlinkException: Could not stop with a savepoint job 
"ec416bf906915e570ef53b242d3d0bb0".

at 
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:458)

at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)

at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:450)

at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:905)

at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)

at java.base/java.security.AccessController.doPrivileged(Native Method)

at java.base/javax.security.auth.Subject.doAs(Subject.java:423)

at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)

at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)

Caused by: java.util.concurrent.TimeoutException

at 
java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)

at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)

at 
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:456)

... 9 more



Thanks,

Suchithra


Re: Recommendation about RocksDB Metrics ?

2020-12-08 Thread Yun Tang
Hi Kien,

>From my point of view, RocksDB native metrics could be classified into 5 parts 
>below, and you could select what you're interested in to enable. Enable those 
>metrics could cause about 10% performance regression, and this might impact 
>the overall performance as not all jobs are state-access bottleneck.

Performance related:
state.backend.rocksdb.metrics.actual-delayed-write-rate
state.backend.rocksdb.metrics.is-write-stopped

Compaction & flush related, which will impact the memory usage and write stall:
state.backend.rocksdb.metrics.mem-table-flush-pending
state.backend.rocksdb.metrics.num-running-flushes
state.backend.rocksdb.metrics.compaction-pending
state.backend.rocksdb.metrics.num-running-compactions

Memory usage status:
state.backend.rocksdb.metrics.block-cache-usage  (If Flink's managed memory 
over RocksDB is enabled, this value would be the same for all column families 
in the same slot)
state.backend.rocksdb.metrics.cur-size-all-mem-tables

DB static properties:
state.backend.rocksdb.metrics.block-cache-capacity

DB number of keys and data usage:
state.backend.rocksdb.metrics.estimate-live-data-size
state.backend.rocksdb.metrics.total-sst-files-size

BTW, state.backend.rocksdb.metrics.column-family-as-variable is not rocksDB 
internal metrics but to expose column family as variable so that we could 
classify different state status.

Best
Yun Tang

From: Steven Wu 
Sent: Wednesday, December 9, 2020 12:11
To: Khachatryan Roman 
Cc: Truong Duc Kien ; Yun Tang ; 
user 
Subject: Re: Recommendation about RocksDB Metrics ?

just a data point. we actually enabled all RocksDb metrics by default 
(including very large jobs in terms of parallelism and state size). We didn't 
see any significant performance impact. There is probably a small impact. At 
least, it didn't jump out for our workload.

On Tue, Dec 8, 2020 at 9:00 AM Khachatryan Roman 
mailto:khachatryan.ro...@gmail.com>> wrote:
Hi Kien,

I am pulling in Yun who might know better.

Regards,
Roman


On Sun, Dec 6, 2020 at 3:52 AM Truong Duc Kien 
mailto:duckientru...@gmail.com>> wrote:
Hi all,

We are thinking about enabling RocksDB metrics to better monitor our pipeline. 
However, since they will have performance impact, we will have to be selective 
about which metrics we use.

Does anyone have experience about which metrics are more important than the 
others ?

And what metrics have the largest performance impact ?

Thanks,
Kien


Re: Problem when restoring from savepoint with missing state & POJO modification

2020-12-07 Thread Yun Tang
Hi Bastien,

Flink supports to register state via state descriptor when calling 
runtimeContext.getState(). However, once the state is registered, it cannot be 
removed anymore. And when you restore from savepoint, the previous state is 
registered again [1]. Flink does not to drop state directly and you could use 
state processor API [2] to remove related state.


[1] 
https://github.com/apache/flink/blob/d94c7a451d22f861bd3f79435f777b427020eba0/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java#L171
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html


From: bastien dine 
Sent: Tuesday, December 8, 2020 0:28
To: user 
Subject: Problem when restoring from savepoint with missing state & POJO 
modification

Hello,
We have experienced some weird issues with POJO mapState in a streaming job 
upon checkpointing when removing state, then modifying the state POJO and 
restoring job

Caused by: java.lang.NullPointerException
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.(PojoSerializer.java:123)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)

Reproduced in Flink 1.10 & 1.11
(full stack below)

Context :
We have a streaming job with a state name "buffer" and POJO Buffer inside a 
CoFlatMap function

MyCoFlat:
public class MyCoFlat extends RichCoFlatMapFunction {
transient MapState buffer;
@Override
public void open(Configuration parameters) {
buffer = getRuntimeContext().getMapState(new MapStateDescriptor<>("buffer", 
String.class, Buffer.class));
}


Buffer :
public class Buffer {
private String field1;
private String field2;
private String field3;
... + empty constructor  + getter / setter for POJO consideration

We had some troubles with our job, so we rework 2 things :
 - we removed field2 in Buffer class,
 - we stopped using "buffer" state anymore

When restoring with savepoint (--allowNonRestoredState) we have the exception 
below
The job is submitted to the cluster but fails on checkpointing, job is totally 
stuck.

Debug:
Debugging showed us some stuff, the exception is raised here (as expected):

public PojoSerializer(
Class clazz,
TypeSerializer[] fieldSerializers,
Field[] fields,
ExecutionConfig executionConfig) {
this.clazz = checkNotNull(clazz);
this.fieldSerializers = (TypeSerializer[]) 
checkNotNull(fieldSerializers);
this.fields = checkNotNull(fields);
this.numFields = fieldSerializers.length;
this.executionConfig = checkNotNull(executionConfig);
for (int i = 0; i < numFields; i++) {
this.fields[i].setAccessible(true); < HERE
}

In our fields, we have field[0] & field[2] but field[1] is totally missing from 
the array, that's why we have the NPE over here, when i=1

So what we have done is to put this state back in our streaming job (with the 
missing field and POJO), redeploy with old savepoint and this went totally fine
Then we have redeploy a job without this state
This has been a 2 times deployment for our job (1 -> modify the POJO, 2 -> 
remove the state using this POJO)
But the non-used-anymore state is still (at least the serializer) in the 
savepoints, we could be facing this problem again when we will modify Buffer 
POJO later.
Finally we just modify a savepoint with API and remove this state once for all, 
and restart from it.

I have a couple of questions here:
Why does flink keep a non-used state in a savepoint (even if it can not map it 
into a new topology and allowNonRestoredState is checked ?)
Why does flink not handle this case ? Behaviour seems to be different between 
an existing POJO state and this non used POJO state
How can I clean my savepoint ? I don't want them to contain non-used state

If anybody has experienced an issue like that before or knows how to handle 
this, I would be glad to discuss !
Best regards,

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin


Re: Flink 1.9Version State TTL parameter configuration it does not work

2020-12-06 Thread Yun Tang
Hi Yang,


Why your checkpoint is failed, was that checkpoint expired or failed due to 
error?

Could you paste the jstack result of what are RocksDB doing during checkpoint?

BTW, you could also use async-profiler [1] to view what the CPU operation of 
your actions, this tool could help to view what's RocksDB doing.

[1] https://github.com/jvm-profiling-tools/async-profiler

Best
Yun Tang


From: Andrey Zagrebin 
Sent: Friday, December 4, 2020 17:49
To: user 
Subject: Re: Flink 1.9Version State TTL parameter configuration it does not work

Hi Yang,

(redirecting this to user mailing list as this is not a dev question)

I am not sure why the state loading is stuck after enabling the compaction 
filter
but the background cleanup of RocksDB state with TTL will not work without 
activating the filter.
This happens on RocksDB opening in Flink, before any state is created and it 
starts to load.

Which version of Flink do you use?
Did you try to enable the filter without starting from the checkpoint, 
basically from the beginning of the job run?

Best,
Andrey

On Fri, Dec 4, 2020 at 11:27 AM Yang Peng 
mailto:yangpengklf...@gmail.com>> wrote:
Hi,I have some questions about state TTL to consult with everybody,the
statebackend is rocksdb  Below is my code:
-code begin-
private static final String EV_STATE_FLAG = "EV_EID_FLAG";

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(60))
.updateTtlOnCreateAndWrite()
.neverReturnExpired()
.cleanupInRocksdbCompactFilter(1000)
.build();
MapStateDescriptor eidMapStateDesc = new
MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO);
eidMapStateDesc.enableTimeToLive(ttlConfig);
eidMapState = getRuntimeContext().getMapState(eidMapStateDesc);

-code end-

I  have set the TTL of the state is 60mins, But after 12 hours,  through
the monitor of rocksdb metric , we found that the sst file of
 CF:EV_EID_FLAG  has been increasing, and there is no decreasing trend.
Later we found some information from the taskmanager log:*WARN
org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL compaction
filter for state < EV_EID_FLAG >: feature is disabled for the state backend*
After I added  "*state.backend.rocksdb.ttl.compaction.filter.**enabled:
true*"  this parameter, the warn information disappeared, but  ater the
project completed some checkpoints ,The next   checkpoint will always fail, I
checked the jstack command and found that the fail checkpoint was stuck in
acquiring state ,disk io is idle;remove the  "
*state.backend.rocksdb.ttl.compaction.filter.**enabled: true"* the
parameter,the project  will resume the checkpoint.  So I’m asking everyone
here. Is my usage method wrong?


Re: Running Flink job as a rest

2020-12-02 Thread Yun Tang
Hi Dhurandar,

I'm afraid that Flink's rest API cannot satisfy your request as it would not 
act as any source. One possible example could be SocketWindowWordCount [1] 
which listens data on a port from all taskmanagers with sources.

[1] 
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java

Best
Yun Tang

From: dhurandar S 
Sent: Thursday, December 3, 2020 5:31
To: Flink Dev ; user 
Subject: Running Flink job as a rest

Can Flink job be running as Rest Server, Where Apache Flink job is listening on 
a port (443). When a user calls this URL with payload, data directly goes to 
the Apache Flink windowing function.

Right now Flink can ingest data from Kafka or Kinesis, but we have a use case 
where we would like to push data to Flink, where Flink is listening on a port

--
Thank you and regards,
Dhurandar



Re: Questions regarding DDL and savepoints

2020-12-02 Thread Yun Tang
Hi Kevin,

If you pass the savepoint path to resume application [1], the application would 
resume from last savepoint.
If you change the logic of your DDL and since no uid can be set from users, I 
am afraid not all state could be restored as you expected.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#restore-a-savepoint

Best
Yun Tang

From: Kevin Kwon 
Sent: Thursday, December 3, 2020 8:31
To: user@flink.apache.org 
Subject: Questions regarding DDL and savepoints

I have a question regarding DDLs if they are considered operators and can be 
savepointed

For example

CREATE TABLE mytable (
  id BIGINT,
  data STRING
  WATERMARK(...)
) with (
  connector = 'kafka'
)

If I create the table like above, save&exit and resume application, will the 
application start from the save point (including Kafka offset)?

There's also an ongoing issue that was created by me if the operator names can 
be specified when creating tables with DDLs
https://issues.apache.org/jira/browse/FLINK-20368


Re: State Processor API SQL State

2020-12-01 Thread Yun Tang
Hi Dom,

+ user mail list

Once you got to know the state descriptor, I think you could query the join 
state. The state name is easy to get via [1], it should be "left-records" and 
"right-records", and you could check what kind of join and whether has unique 
key to decide what kind of state (value state or map state). The last part is 
to find what the rowData type is in your join, and maybe other SQL guys could 
answer this or you might find it by yourself by dumping the memory of your 
taskmanager.

[1] 
https://github.com/apache/flink/blob/7a7c87096ab76f416cd7c393240faa8454db36f0/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java#L83

Best
Yun Tang




From: Dominik Wosiński 
Sent: Tuesday, December 1, 2020 21:05
To: dev 
Subject: State Processor API SQL State

Hey,
Is it currently possible to obtain the state that was created by SQL query
via the State Processor API? I am able to load the checkpoint via the State
Processor API, but I wasn't able to think of a way to access the internal
state of my JOIN Query.

Best Regards,
Dom.


Re: fromCollection() and savepoints

2020-11-25 Thread Yun Tang
Hi Tomasz

The API fromCollection() would record the number of elements emitted [1] in 
snapshot state, and restore them to remember as elements to skip [2], that is 
to say not all elements would be read again.

But frankly speaking, once fromCollection is completed, the source task would 
finish and no more savepoint could be triggered.


[1] 
https://github.com/apache/flink/blob/c354f7bd679b9fa8c1e0d75feb3827ccca7f317b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java#L67
[2] 
https://github.com/apache/flink/blob/c354f7bd679b9fa8c1e0d75feb3827ccca7f317b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java#L123

Best
Yun Tang


From: Tomasz Dudziak 
Sent: Wednesday, November 25, 2020 18:22
To: user@flink.apache.org 
Subject: fromCollection() and savepoints


Hi,



What is the behaviour of fromCollection() when restarting from a savepoint?

Will the elements of the collection get fed into the resulting stream again or 
not?

What if the contents of the underlying collection change?



Thanks,

Tomasz



Tomasz Dudziak | Marshall Wace LLP, George House, 131 Sloane Street, London, 
SW1X 9AT | E-mail: t.dudz...@mwam.com<mailto:t.dudz...@mwam.com> | Tel: +44 207 
024 7061





This e-mail and any attachments are confidential to the addressee(s) and may 
contain information that is legally privileged and/or confidential. If you are 
not the intended recipient of this e-mail you are hereby notified that any 
dissemination, distribution, or copying of its content is strictly prohibited. 
If you have received this message in error, please notify the sender by return 
e-mail and destroy the message and all copies in your possession.

To find out more details about how we may collect, use and share your personal 
information, please see https://www.mwam.com/privacy-policy. This includes 
details of how calls you make to us may be recorded in order for us to comply 
with our legal and regulatory obligations.

To the extent that the contents of this email constitutes a financial 
promotion, please note that it is issued only to and/or directed only at 
persons who are professional clients or eligible counterparties as defined in 
the FCA Rules. Any investment products or services described in this email are 
available only to professional clients and eligible counterparties. Persons who 
are not professional clients or eligible counterparties should not rely or act 
on the contents of this email.

Marshall Wace LLP is authorised and regulated by the Financial Conduct 
Authority. Marshall Wace LLP is a limited liability partnership registered in 
England and Wales with registered number OC302228 and registered office at 
George House, 131 Sloane Street, London, SW1X 9AT. If you are receiving this 
e-mail as a client, or an investor in an investment vehicle, managed or advised 
by Marshall Wace North America L.P., the sender of this e-mail is communicating 
with you in the sender's capacity as an associated or related person of 
Marshall Wace North America L.P. ("MWNA"), which is registered with the US 
Securities and Exchange Commission ("SEC") as an investment adviser.  
Registration with the SEC does not imply that MWNA or its employees possess a 
certain level of skill or training.


Re: Job Manager is taking very long time to finalize the Checkpointing.

2020-11-18 Thread Yun Tang
Hi Slim

Have you ever checked whether the job is on backpressure during the checkpoint, 
and you could check the checkpoint details via web UI [1] to see the duration 
of sync & async phase.

BTW, I cannot see the "IOException: The rpc invocation size 199965215 exceeds 
the maximum akka framesize.  " in attached log, and I don't understand why you 
could have such large state after you already set the 
'state.backend.fs.memory-threshold' as 1024, please provide more checkpoint 
details which reported in web UI.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/checkpoint_monitoring.html#monitoring

Best
Yun Tang

From: Slim Bouguerra 
Sent: Thursday, November 19, 2020 7:56
To: Yun Tang 
Cc: user@flink.apache.org 
Subject: Re: Job Manager is taking very long time to finalize the Checkpointing.

Hi Yun,
Thanks for the help after applying your recommendation, I am getting the same 
issue aka very long checkpoints and then timeout
Now My guess is maybe the datagen source is pushing the checkpoint via the 
network to JM is there a way to double check?
IF that is the case is there a way to exclude the source operators from the 
checkpoints ?
Thanks
Please find the attached logs:
1 I checked the shared folder and it has the shared operator state.
2 I did set the value of fs-memory-threshold to 1kb

This the source of the SQL testing job

CREATE TABLE datagen (
  f_sequence INT,
  f_random INT,
  f_random_str STRING,
  f_random_str_4 STRING,
  f_random_str_3 STRING,
  f_random_str_2 STRING,
  f_random_str_1 STRING,
  ts AS localtimestamp,
  WATERMARK FOR ts AS ts
) WITH (
  'connector' = 'datagen',
  -- optional options --
  'rows-per-second'='50',
  'fields.f_sequence.kind'='sequence',
  'fields.f_sequence.start'='1',
  'fields.f_sequence.end'='2',
  'fields.f_random.min'='1',
  'fields.f_random.max'='100',
  'fields.f_random_str.length'='10',
  'fields.f_random_str_4.length'='10',
  'fields.f_random_str_3.length'='10',
  'fields.f_random_str_2.length'='10',
  'fields.f_random_str_1.length'='10'
  );

---
With more debugging I see this exception stack on the job manager
java.io.IOException: The rpc invocation size 199965215 exceeds the maximum akka 
framesize.
 at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
 at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
 at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
 at com.sun.proxy.$Proxy25.acknowledgeCheckpoint(Unknown Source) [?:?]
 at 
org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder.acknowledgeCheckpoint(RpcCheckpointResponder.java:46)
 [flink-dist_2.11-1.11.1.jar:1.1
 .1[]
 at 
org.apache.flink.runtime.state.TaskStateManagerImpl.reportTaskStateSnapshots(TaskStateManagerImpl.java:117)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
 at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.reportCompletedSnapshotStates(AsyncCheckpointRunnable.java:160)
 [flink-dist_2.11-1.11
 1.jar:1.11.1[]
 at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:121)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_172]
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_172]

--
And sometime the JM dies with this OOM
 java.lang.OutOfMemoryError: Java heap space
  at java.util.Arrays.copyOf(Arrays.java:3236) ~[?:1.8.0_172]
  at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) 
~[?:1.8.0_172]
  at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) 
~[?:1.8.0_172]
  at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) 
~[?:1.8.0_172]
  at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
 ~[?:1.8.0_172]
  at 
java.io.ObjectOutputStream$

Re: What happens when a job is rescaled

2020-11-18 Thread Yun Tang
Hi Richard,

Since you did not provide the information of which state backend you use, I 
would give the phase of rescaling from externalized checkpoint for two 
different state backends:

For RocksDB:
1) If parallelism not changed, downloading all sst files and then just open the 
files as one rocksDB.
2) If parallelism changed, Flink will choose one of the candidate rocksDB 
instances as the initial one to open [1], and open other rocksDB instances to 
insert into the target one which means Flink needs to write data to target 
rocksDB with possible high CPU usage.

For FsStateBackend:
1) No matter parallelism changed, reading data from remote DFS and writing to 
memory which also occupy CPU resources.

Since you observed the disk usage during the rescaling phase, I think you 
should use RocksDB state backend. Unfortunately, we might not have real good 
solutions to improve this phase currently [2]. And once you complete the 
rescaling, the next checkpoint could be nearly a complete instead of 
incremental checkpoint, I think you could check the checkpoint size of next new 
checkpoint to see whether data size increased. BTW, you could also check 
whether the job is backpressured during the next checkpoint as backpressure 
would also increase the duration of checkpoint.

[1] 
https://github.com/apache/flink/blob/3907a4a3cfbba3ec57f938f31166ceaaa850e7e0/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L274
[2] https://issues.apache.org/jira/browse/FLINK-17288

Best
Yun Tang

From: Richard Deurwaarder 
Sent: Saturday, November 14, 2020 0:14
To: user 
Subject: What happens when a job is rescaled

Hello,

I have a question about what actually happens when a job is started from an 
existing checkpoint, in particular when the parallelism has changed.

Context:
We have a flink 1.11.2 (DataStream API) job running on Kubernetes (GCP) writing 
its state to GCS.
Normally we run with 12 TMs each 3 CPU cores and about 12gb RAM. We have quite 
a bit of state (stored with rocksdb), about 20-25 operators which have state 
ranging from 20gb to 180gb per operator. In total we have about 600gb of state.

During normal operations, this works fine the only 'problem' we have is that 
savepoints (creation and starting from) are very slow. Therefore we use 
external checkpoints to deploy new versions of our job.

What is our problem?
One of the things I am currently trying to investigate is why rescaling our job 
is so slow. The way we rescale is by canceling the job and then starting the 
job with a higher parallelism, whilst pointing to the previous (external) 
checkpoint.

Without rescaling, for instance when deploying new code, starting a job from a 
checkpoint would cause the first new checkpoint to complete in maybe 5 minutes.
However, if I double the parallelism the first checkpoint takes over an hour or 
more to complete. This is troublesome because kubernetes might sometime decide 
to restart a TM causing a job restart and thus having to redo all the 
checkpoint work...( very annoying if this happens when the checkpoint is about 
to finish.. :) )

What happens during a checkpoint:
Looking at metrics we can see:
 * CPU being at 100%
 * RAM swinging up and down depending on what operator is currently 
checkpointing.
 * Network traffic to GCS peaks at 100mb/s per TM (tests indicate network 
should not be a cause a bottle neck).
 * Disk (SSD) iops are in the order of 2-3000 upwards to spikes of 10k iops, 
not even close to capacity

Now the obvious answer would be to increase the CPU. This does not really seem 
to help though, plus we'd really like to prevent having to vertically scale our 
job just to do parallelism changes, as during normal operations our CPU usage 
is around 50-60%.

Question:
My question is:
What actually happens when flink starts a new job from an existing checkpoint. 
What extra work needs to be done because of a change in parallelism? Is it 
'normal' that we would incur this penalty for scaling up or down?
Do you have any pointers where we should look to get better performance?

Thank you in advance :)

Richard


Re: Job Manager is taking very long time to finalize the Checkpointing.

2020-11-18 Thread Yun Tang
Hi Slim

You could check the logs of taskmanager to see whether incremental checkpoint 
is really enabled (or you could find whether files existed under 
/opt/flink/pv/checkpoints/c0580ec8f55fcf1e0ceaa46fc3778b99/shared to judge).
If your configuration of rocksDB and incremental-checkpoingt is really enabled, 
I think the large metadata size is caused by the memory threshold [1] which 
will send data in bytes format back to JM directly if state handle is smaller 
than specific threshold.
Try to decrease this value to '1 kb' to see whether the size of meta data could 
also decrease.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-backend-fs-memory-threshold

Best
Yun Tang

From: Slim Bouguerra 
Sent: Wednesday, November 18, 2020 6:16
To: user@flink.apache.org 
Subject: Job Manager is taking very long time to finalize the Checkpointing.


Originally posed to the dev list
-- Forwarded message -
From: Slim Bouguerra mailto:bs...@apache.org>>
Date: Tue, Nov 17, 2020 at 8:09 AM
Subject: Job Manager is taking very long time to finalize the Checkpointing.
To: mailto:d...@flink.apache.org>>


Hi Devs,
I am very new to the Flink code base and working on the evaluation of  the 
Checkpointing strategy

In my current setup I am using an NFS based file system as a checkpoint store. 
(NAS/NFS has a very high TP over 2GB/s on one node and I am using 12 NFS 
servers )
When pushing the system to some relatively medium scale aka 120 subtasks over 6 
works with a total state of 100GB.
I observe that the Job manager takes over 2 minutes to finalize the checkpoint. 
(observed on the UI and CPU profiling of JM see the flame graph of 30 second 
sample)
As you can see by the attached Flames graphs the JM is very busy serializing 
the metadata 
(>org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.serializeOperatorState
 (2,875 samples, 99.65%))
Now the question is why this metadata file is so big in the order of 3GBs in my 
case.
How does this size scale ? num_of_tasks * num_states ?

/opt/flink/pv/checkpoints/c0580ec8f55fcf1e0ceaa46fc3778b99/chk-1
bash-4.2$ ls -all -h
-rw-r--r-- 1 flink flink 3.0G Nov 17 01:42 _metadata

The second question how to better measure the time taken by the JM to commit 
the transaction aka time_done_checkpoint - time_got_all_ask_form_tm
Is there a config flag I am missing to make this last step faster ?

My current configs for Checkpoints
state.backend: rocksdb
# See the PV mount path need to be the same as  
state.checkpoints.dir: file:///opt/flink/pv/checkpoints
state.savepoints.dir: file:///opt/flink/pv/savepoints
state.backend.incremental: true
# 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#checkpointing
execution.checkpointing.interval: 6
execution.checkpointing.mode: AT_LEAST_ONCE
# hitting The rpc invocation size 19598830 exceeds the maximum akka
akka.framesize: 100485760b
# 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#heartbeat-timeout
heartbeat.timeout: 7
# 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-timeout
execution.checkpointing.timeout: 15minutes


some metadata about the checkpoint
{"@class":"completed","id":1,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1605315046120,"latest_ack_timestamp":1605315093466,"state_size":12239786229,"end_to_end_duration":47346,"alignment_buffered":0,"num_subtasks":120,"num_acknowledged_subtasks":120,"tasks":{},"external_path":"file:/opt/flink/pv/checkpoints/7474752476036c14d7fdeb4e86af3638/chk-1"}


Re: Flink checkpointing state

2020-10-29 Thread Yun Tang
Hi

Added Yang Wang who mainly develops this feature, I think he could provide more 
information.

Best
Yun Tang

From: Boris Lublinsky 
Sent: Tuesday, October 27, 2020 22:57
To: Yun Tang 
Cc: user 
Subject: Re: Flink checkpointing state

Thanks Yun,
This refers to Flip144 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink<https://cwiki.apache.org/confluence/display/FLINK/FLIP-144:+Native+Kubernetes+HA+for+Flink>
Flip contains 2 parts - leader election and HA information persistence and 
offers two options.
Can you tell us what exactly will be part of 1.12.
We would be happy with second option for now, if its faster to implement.


On Oct 27, 2020, at 1:11 AM, Yun Tang 
mailto:myas...@live.com>> wrote:

Hi Boris

Please refer to FLINK-12884[1] for current progress of native HA support of k8s 
which targets for release-1.12.

[1] https://issues.apache.org/jira/browse/FLINK-12884

Best
Yun Tang


From: Boris Lublinsky 
mailto:boris.lublin...@lightbend.com>>
Sent: Tuesday, October 27, 2020 2:56
To: user mailto:user@flink.apache.org>>
Subject: Flink checkpointing state

This is from Flink 1.8:

"Job Manager keeps some state related to checkpointing in it’s memory. This 
state would be lost on Job Manager crashes, which is why this state is 
persisted in ZooKeeper. This means that even though there is no real need for 
the leader election and -discovery part of Flink’s HA mode (as is this handled 
natively by Kubernetes), it still needs to be enabled just for storing the 
checkpoint state.”

Was it ever fixed in Flink 1.10 or 1.11? If running Flink on K8, without HA, 
there is no Zookeeper. And if the above is still the case, then checkpointing 
will never pick up the right one



  1   2   3   4   >