Re: Keytab Setup on Kubernetes

2023-09-06 Thread Chirag Dewan via user
Thanks Gabor, this is a really helpful reply.
> Any kind of Kerberos usage starts with "create a KDC server in your
> environment". That server must be set up.
When I say that, I am mostly referring to Windows users with a built-in KDC and
AD. That would require kinit for the AS. I was wondering how that would work
inside the pod.

My understanding of Kerberos isn't sound yet, so it may be a very stupid question
:)

This link is for Flink 1.19. Are the delegation tokens (DTs) also applicable in Flink 1.16?
Thanks

On Tuesday, 5 September, 2023 at 07:15:07 pm IST, Gabor Somogyi 
 wrote:  
 
Hi Chirag,
Flink now supports 2 ways to obtain a TGT, which is a Kerberos ticket and has
nothing to do with the "renewable for up to 7 days" HDFS TGS ticket (with the
default config):
* Keytab: if one mounts a keytab for at least the JobManager pod, then it can
create TGTs indefinitely (or until the user's password is changed).
* Kerberos ticket cache: in this case it is the user's responsibility to
create/renew the TGT, for example with the kinit command. If the user does not
create the TGT, or forgets to re-obtain it, the Flink workload is going to fail.
When the JM has a valid TGT, it can ask token providers to obtain a
service-specific token (TGS). In the case of HDFS with the default config, this
can be renewed every 24 hours for up to 7 days. When the 7 days configured on
the HDFS side are gone, the TGS must be re-obtained with a valid TGT.
So this is basically how it works.
> what should be the KDC
I don't understand the question at all. Any kind of Kerberos usage starts with
"create a KDC server in your environment". That server must be set up.

> I understand that Hadoop automatically renews the TGT using keytab.
Most of the time it does, but not all the time. In Flink's delegation token
model it doesn't really matter what Hadoop does; Flink does everything which is
required when we speak about token-based authentication.
> Does this require us to remount the keytab file again?
That said, until the user's password is changed or the user is deactivated, no
remount is needed. When we speak about a keytab, in an oversimplified and not
exact way, one can consider it a plaintext password file plus some magic.
As general advice, please have a look at the following doc:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/security/security-delegation-token/
G
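
(For reference, a minimal sketch of the keytab variant described above; the
principal, keytab path, and how it is mounted are illustrative assumptions,
not details from this thread.)

# flink-conf.yaml
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /opt/krb5/user.keytab   # mounted into the JM/TM pods, e.g. from a Secret
security.kerberos.login.principal: flinkuser@EXAMPLE.COM
# /etc/krb5.conf inside the pod must point at your KDC/realm; with the
# ticket-cache variant, something (e.g. a kinit sidecar) must keep the TGT fresh instead.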

On Tue, Sep 5, 2023 at 1:31 PM Chirag Dewan via user  
wrote:

Hi,
I am trying to use the FileSource to collect files from HDFS. The HDFS cluster 
is secured and has Kerberos enabled.
My Flink cluster runs on Kubernetes (not using the Flink operator) with 2 Job 
Managers in HA and 3 Task Managers. I wanted to understand the correct way to 
configure the keytab files for this cluster.
Should I just mount the keytab file in all pods and use the 
"security.kerberos.login.keytab" config?
For such a setup, what should be the KDC and how do we login to the KDC using 
kinit?
I understand that Hadoop automatically renews the TGT using keytab. But this 
should only be possible for the renewable lifetime of the ticket (typically 7 
days). Does this require us to remount the keytab file again?
Any insights will be helpful here. Thanks

  

Async IO metrics for tps

2023-09-06 Thread patricia lee
Hi flink users,

I used Async IO (RichAsyncFunction) for sending 100 txns to a 3rd party.

I checked the runtimeContext and it has a numRecordsSent metric. We want to
expose this metric to our Prometheus server so that we can monitor how
many records we are sending per second. The reason we need to monitor is
that there is an additional cost for every txn exceeding the
TPS we are allowed to send.


Any input would be appreciated.


Regards,
Patricia
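
(A minimal sketch, not from the thread: registering a Meter in the
RichAsyncFunction so an already-configured Prometheus reporter can scrape a
records-per-second rate; the class and metric names are illustrative.)

import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class ThirdPartySender extends RichAsyncFunction<String, String> {

    private transient Meter txnRate;

    @Override
    public void open(Configuration parameters) {
        // MeterView reports events per second, averaged over a 60s window;
        // the metrics reporter exposes it like any other metric.
        txnRate = getRuntimeContext().getMetricGroup().meter("txnSendRate", new MeterView(60));
    }

    @Override
    public void asyncInvoke(String txn, ResultFuture<String> resultFuture) {
        txnRate.markEvent(); // one event per txn handed to the 3rd party
        // ... invoke the vendor SDK asynchronously here, then complete:
        resultFuture.complete(Collections.singleton(txn));
    }
}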


RE: Re: How to read flinkSQL job state

2023-09-06 Thread Yifan He via user
Hi Hangxiang,

Thanks for your answer! We are using the RocksDB state backend with
incremental checkpoints enabled, and it is the incremental size that keeps
increasing. We didn't add any custom checkpoint configuration in the Flink SQL
jobs. Where can I see the log of
StreamGraphHasherV2.generateDeterministicHash? And is there a default state
name?

Thanks,
Yifan

On 2023/09/06 07:12:05 Hangxiang Yu wrote:
> Hi, Yifan.
> Unfortunately, the State Processor API only supports DataStream currently.
> But you could still use it to read your SQL job state.
> The most difficult thing is that you have to get the operator id, which you
> could get from the log of StreamGraphHasherV2.generateDeterministicHash,
> and the state name, which you could get from the code of the operator.
>
> BTW, about investigating why the checkpoint size keeps growing:
> 1. Which state backend are you using?
> 2. Are you enabling incremental checkpoints? Is the checkpoint size you
> mentioned the incremental size or the full size?
> 3. If full size, did you evaluate whether the size matches the
> theoretical size?
>
>
> On Wed, Sep 6, 2023 at 1:11 PM Yifan He via user 
> wrote:
>
> > Hi team,
> >
> > We are investigating why the checkpoint size of our FlinkSQL jobs keeps
> > growing and we want to look into the checkpoint file to know what is
> > causing the problem. I know we can use the state processor api to read
the
> > state of jobs using datastream api, but how can I read the state of jobs
> > using table api & sql?
> >
> > Thanks,
> > Yifan
> >
>
>
> --
> Best,
> Hangxiang.
>
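
(A minimal sketch of the approach described above, assuming Flink 1.17's
flink-state-processor-api; the checkpoint path, the 32-character operator
hash, the state name, and the types are hypothetical placeholders. The hash
is what StreamGraphHasherV2.generateDeterministicHash logs at DEBUG level.)

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ReadSqlJobState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SavepointReader reader = SavepointReader.read(
                env, "hdfs:///checkpoints/<job-id>/chk-1234", new EmbeddedRocksDBStateBackend(true));

        reader.readKeyedState(
                OperatorIdentifier.forUidHash("0123456789abcdef0123456789abcdef"), // from the DEBUG log
                new MyReader())
              .print();
        env.execute();
    }

    static class MyReader extends KeyedStateReaderFunction<String, String> {
        private ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            // the descriptor name must match the state name used inside the SQL operator
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("some-state-name", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + state.value());
        }
    }
}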


RE: Re: How to read flinkSQL job state

2023-09-06 Thread Yifan He via user
Hi Shammon,

We are using RocksDB, and the configuration is below:
execution.checkpointing.externalized-checkpoint-retention:
RETAIN_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.snapshot-compression: true
execution.checkpointing.timeout: 6
state.backend: FILESYSTEM
state.backend.incremental: true
state.backend.local-recovery: true
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.5
state.backend.rocksdb.predefined-options: DEFAULT
state.backend.rocksdb.timer-service.factory: ROCKSDB
state.checkpoints.num-retained: 3

Thanks,
Yifan

On 2023/09/06 08:00:31 Shammon FY wrote:
> Hi Yifan,
>
> Besides reading job state, I would like to know which state backend you are
> using. Can you give the configurations for state and checkpointing for your
> job? Maybe you can check these configuration items to confirm if they are
> correct first.
>
> Best,
> Shammon FY
>
> On Wed, Sep 6, 2023 at 3:17 PM Hang Ruan  wrote:
>
> > Hi, Yifan.
> >
> > I think the document[1] means to let us convert the DataStream to the
> > Table[2]. Then we could handle the state with the Table API & SQL.
> >
> > Best,
> > Hang
> >
> > [1]
> >
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/libs/state_processor_api/
> > [2]
> >
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/data_stream_api/#converting-between-datastream-and-table
> >
> > Yifan He via user  wrote on Wed, Sep 6, 2023 at 13:10:
> >
> >> Hi team,
> >>
> >> We are investigating why the checkpoint size of our FlinkSQL jobs keeps
> >> growing and we want to look into the checkpoint file to know what is
> >> causing the problem. I know we can use the state processor api to read
the
> >> state of jobs using datastream api, but how can I read the state of
jobs
> >> using table api & sql?
> >>
> >> Thanks,
> >> Yifan
> >>
> >
>


Re: Uneven TM Distribution of Flink on YARN

2023-09-06 Thread Lu Niu
These are our YARN-related settings:

yarn.scheduler.fair.assignmultiple: "true"
yarn.scheduler.fair.dynamic.max.assign: "false"
yarn.scheduler.fair.max.assign: 1

any suggestions?

Best
Lu

On Wed, Sep 6, 2023 at 9:16 AM Lu Niu  wrote:

> Hi, Thanks for all your help. Are there any other insights?
>
>
> Best
> Lu
>
> On Wed, Aug 30, 2023 at 11:29 AM Lu Niu  wrote:
>
>> No. we don't use yarn.taskmanager.node-label
>>
>> Best
>> Lu
>>
>> On Tue, Aug 29, 2023 at 12:17 AM Geng Biao  wrote:
>>
>>> Maybe you can check if you have set yarn.taskmanager.node-label for some
>>> flink jobs?
>>>
>>> Best,
>>> Biao Geng
>>>
>>> Sent from Outlook for iOS 
>>> --
>>> *From:* Chen Zhanghao 
>>> *Sent:* Tuesday, August 29, 2023 12:14:53 PM
>>> *To:* Lu Niu ; Weihua Hu 
>>> *Cc:* Kenan Kılıçtepe ; user <
>>> user@flink.apache.org>
>>> *Subject:* Re: Uneven TM Distribution of Flink on YARN
>>>
>>> CCing @Weihua Hu  , who is an expert on this.
>>> Do you have any ideas on the phenomenon here?
>>>
>>> Best,
>>> Zhanghao Chen
>>> --
>>> *From:* Lu Niu 
>>> *Sent:* Tuesday, August 29, 2023 12:11:35 PM
>>> *To:* Chen Zhanghao 
>>> *Cc:* Kenan Kılıçtepe ; user <
>>> user@flink.apache.org>
>>> *Subject:* Re: Uneven TM Distribution of Flink on YARN
>>>
>>> Thanks for your reply.
>>>
>>> The interesting fact is that we also manage Spark on YARN. However, only
>>> the Flink cluster is having the issue. I am wondering whether there is
>>> a difference in the implementation on the Flink side.
>>>
>>> Best
>>> Lu
>>>
>>> On Mon, Aug 28, 2023 at 8:38 PM Chen Zhanghao 
>>> wrote:
>>>
>>> Hi Lu Niu,
>>>
>>> TM distribution on YARN nodes is managed by YARN RM and is out of the
>>> scope of Flink. On the other hand, cluster.evenly-spread-out-slots forces
>>> even distribution of tasks among Flink TMs, and has nothing to do with your
>>> concerns. Also, the config currently only supports Standalone mode Flink
>>> clusters, and does not take effect on a Flink cluster on YARN.
>>>
>>> Best,
>>> Zhanghao Chen
>>> --
>>> *From:* Lu Niu 
>>> *Sent:* August 29, 2023 4:30
>>> *To:* Kenan Kılıçtepe 
>>> *Cc:* user 
>>> *Subject:* Re: Uneven TM Distribution of Flink on YARN
>>>
>>> Thanks for the reply. We've already set cluster.evenly-spread-out-slots
>>> = true
>>>
>>> Best
>>> Lu
>>>
>>> On Mon, Aug 28, 2023 at 1:23 PM Kenan Kılıçtepe 
>>> wrote:
>>>
>>> Have you checked config param cluster.evenly-spread-out-slots ?
>>>
>>>
>>> On Mon, Aug 28, 2023 at 10:31 PM Lu Niu  wrote:
>>>
>>> Hi, Flink users
>>>
>>> We have recently observed that the allocation of Flink TaskManagers in
>>> our YARN cluster is not evenly distributed. We would like to hear your
>>> thoughts on this matter.
>>>
>>> 1. Our setup includes Flink version 1.15.1 and Hadoop 2.10.0.
>>> 2. The uneven distribution is that out of a 370-node YARN cluster, there
>>> are 16 nodes with either 0 or 1 vCore available, while 110 nodes have more
>>> than 10 vCores available.
>>>
>>> Is such behavior expected? If not, is there a fix provided in Flink?
>>> Thanks!
>>>
>>> Best
>>> Lu
>>>
>>>


Re: Strange exception in UnitTest when migrate Flink v1.16.2 to v1.17.1

2023-09-06 Thread Teunissen, F.G.J. (Fred) via user
Thanks for the insight, it did help indeed.

I’ve added `org/apache/calcite**` and `org/apache/flink**` to the exclude-list 
in the `prepare-agent` goal of the jacoco-maven-plugin and that did the trick.

Regards,
Fred
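
(For readers hitting the same issue, the exclusion described above would look
roughly like this in the jacoco-maven-plugin configuration — a sketch, adapt
to your build:)

<plugin>
  <groupId>org.jacoco</groupId>
  <artifactId>jacoco-maven-plugin</artifactId>
  <executions>
    <execution>
      <id>prepare-agent</id>
      <goals><goal>prepare-agent</goal></goals>
      <configuration>
        <excludes>
          <exclude>org/apache/calcite**</exclude>
          <exclude>org/apache/flink**</exclude>
        </excludes>
      </configuration>
    </execution>
  </executions>
</plugin>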

From: Aniket Sule 
Date: Wednesday, 6 September 2023 at 21:07
To: Teunissen, F.G.J. (Fred) , user@flink.apache.org 

Subject: RE: Strange exception in UnitTest when migrate Flink v1.16.2 to v1.17.1
Hello,
You could look at https://github.com/hazelcast/hazelcast/issues/20945 to see if 
the workaround in the linked commit helps.
I had faced test failures in upgrading from Flink 1.16 to 1.17, and that 
workaround resolved the test failures.

Hope this helps.

Regards
Aniket

From: Teunissen, F.G.J. (Fred) via user 
Sent: Wednesday, September 6, 2023 6:45 AM
To: user@flink.apache.org
Subject: Strange exception in UnitTest when migrate Flink v1.16.2 to v1.17.1


Hi community,

I would like to ask for some help in solving a strange failure in a Unit Test 
when code coverage (jacoco) is enabled.

We have a project with a custom UDF that uses the MiniClusterExtension in a 
Unit Test. The Unit Test works fine when built for Flink v1.16.2, but it fails 
when built for Flink v1.17.1.

The strange part is that it only fails when code coverage is enabled. When code 
coverage is skipped, the test succeeds.

The following exception is thrown when it fails:
java.lang.IllegalArgumentException: fromIndex(2) > toIndex(0)
at 
java.base/java.util.AbstractList.subListRangeCheck(AbstractList.java:509)
at java.base/java.util.AbstractList.subList(AbstractList.java:497)
at 
org.apache.calcite.rel.metadata.janino.CacheGeneratorUtil$CacheKeyStrategy$1.safeArgList(CacheGeneratorUtil.java:213)
at 
org.apache.calcite.rel.metadata.janino.CacheGeneratorUtil$CacheKeyStrategy$1.cacheKeyBlock(CacheGeneratorUtil.java:205)
at 
org.apache.calcite.rel.metadata.janino.CacheGeneratorUtil.cachedMethod(CacheGeneratorUtil.java:68)
at 
org.apache.calcite.rel.metadata.janino.RelMetadataHandlerGeneratorUtil.generateHandler(RelMetadataHandlerGeneratorUtil.java:121)
at 
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.generateCompileAndInstantiate(JaninoRelMetadataProvider.java:138)
at 
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:73)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:165)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3951)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at 
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:197)
at 
org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:118)
at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:844)
at 
org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:307)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:565)
at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:428)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:251)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:130)
at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:208)
at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:195)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at 

RE: Strange exception in UnitTest when migrate Flink v1.16.2 to v1.17.1

2023-09-06 Thread Aniket Sule
Hello,
You could look at https://github.com/hazelcast/hazelcast/issues/20945 to see if 
the workaround in the linked commit helps.
I had faced test failures in upgrading from Flink 1.16 to 1.17, and that 
workaround resolved the test failures.

Hope this helps.

Regards
Aniket

From: Teunissen, F.G.J. (Fred) via user 
Sent: Wednesday, September 6, 2023 6:45 AM
To: user@flink.apache.org
Subject: Strange exception in UnitTest when migrate Flink v1.16.2 to v1.17.1


Hi community,

I would like to ask for some help in solving a strange failure in a Unit Test 
when code coverage (jacoco) is enabled.

We have a project with a custom UDF that uses the MiniClusterExtension in a 
Unit Test. The Unit Test works fine when built for Flink v1.16.2, but it fails 
when built for Flink v1.17.1.

The strange part is that it only fails when code coverage is enabled. When code 
coverage is skipped, the test succeeds.

The following exception is thrown when it fails:
java.lang.IllegalArgumentException: fromIndex(2) > toIndex(0)
at 
java.base/java.util.AbstractList.subListRangeCheck(AbstractList.java:509)
at java.base/java.util.AbstractList.subList(AbstractList.java:497)
at 
org.apache.calcite.rel.metadata.janino.CacheGeneratorUtil$CacheKeyStrategy$1.safeArgList(CacheGeneratorUtil.java:213)
at 
org.apache.calcite.rel.metadata.janino.CacheGeneratorUtil$CacheKeyStrategy$1.cacheKeyBlock(CacheGeneratorUtil.java:205)
at 
org.apache.calcite.rel.metadata.janino.CacheGeneratorUtil.cachedMethod(CacheGeneratorUtil.java:68)
at 
org.apache.calcite.rel.metadata.janino.RelMetadataHandlerGeneratorUtil.generateHandler(RelMetadataHandlerGeneratorUtil.java:121)
at 
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.generateCompileAndInstantiate(JaninoRelMetadataProvider.java:138)
at 
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:73)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:165)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3951)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at 
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:197)
at 
org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:118)
at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:844)
at 
org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:307)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:565)
at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:428)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:251)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:130)
at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:208)
at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:195)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at 

Re: Uneven TM Distribution of Flink on YARN

2023-09-06 Thread Lu Niu
Thanks, I'll check it out!

Best
Lu

On Wed, Sep 6, 2023 at 10:09 AM Biao Geng  wrote:

> Hi,
>
>
>
> If your YARN cluster uses fair scheduler, maybe you can check if the
> yarn.scheduler.fair.assignmultiple
> 
> config is set. If that’s the case, then adjusting
> yarn.scheduler.fair.dynamic.max.assign and yarn.scheduler.fair.max.assign
> could be helpful. Also, AFAIK, flink does not exert extra control of
> distribution of yarn apps on different nodes. The key diff between flink
> and spark is that most flink jobs are unbounded while spark jobs are
> bounded. It is possible that under the same YARN scheduling strategy, the final
> distribution of apps after some time is different.
>
>
>
> Best,
>
> Biao Geng
>
>
>
> *From: *Lu Niu 
> *Date: *Thursday, September 7, 2023 at 12:17 AM
> *To: *Geng Biao 
> *Cc: *Chen Zhanghao , Weihua Hu <
> huweihua@gmail.com>, Kenan Kılıçtepe , user <
> user@flink.apache.org>
> *Subject: *Re: Uneven TM Distribution of Flink on YARN
>
> Hi, Thanks for all your help. Are there any other insights?
>
>
>
>
>
> Best
>
> Lu
>
>
>
> On Wed, Aug 30, 2023 at 11:29 AM Lu Niu  wrote:
>
> No. we don't use yarn.taskmanager.node-label
>
>
>
> Best
>
> Lu
>
>
>
> On Tue, Aug 29, 2023 at 12:17 AM Geng Biao  wrote:
>
> Maybe you can check if you have set yarn.taskmanager.node-label for some
> flink jobs?
>
>
>
> Best,
>
> Biao Geng
>
>
>
> Sent from Outlook for iOS 
> --
>
> *From:* Chen Zhanghao 
> *Sent:* Tuesday, August 29, 2023 12:14:53 PM
> *To:* Lu Niu ; Weihua Hu 
> *Cc:* Kenan Kılıçtepe ; user <
> user@flink.apache.org>
> *Subject:* Re: Uneven TM Distribution of Flink on YARN
>
>
>
> CCing @Weihua Hu  , who is an expert on this. Do
> you have any ideas on the phenomenon here?
>
>
>
> Best,
>
> Zhanghao Chen
> --
>
> *From:* Lu Niu 
> *Sent:* Tuesday, August 29, 2023 12:11:35 PM
> *To:* Chen Zhanghao 
> *Cc:* Kenan Kılıçtepe ; user 
> *Subject:* Re: Uneven TM Distribution of Flink on YARN
>
>
>
> Thanks for your reply.
>
>
>
> The interesting fact is that we also manage Spark on YARN. However, only
> the Flink cluster is having the issue. I am wondering whether there is a
> difference in the implementation on the Flink side.
>
>
>
> Best
>
> Lu
>
>
>
> On Mon, Aug 28, 2023 at 8:38 PM Chen Zhanghao 
> wrote:
>
> Hi Lu Niu,
>
>
>
> TM distribution on YARN nodes is managed by YARN RM and is out of the
> scope of Flink. On the other hand, cluster.evenly-spread-out-slots forces
> even distribution of tasks among Flink TMs, and has nothing to do with your
> concerns. Also, the config currently only supports Standalone mode Flink
> clusters, and does not take effect on a Flink cluster on YARN.
>
>
>
> Best,
>
> Zhanghao Chen
> --
>
> *From:* Lu Niu 
> *Sent:* August 29, 2023 4:30
> *To:* Kenan Kılıçtepe 
> *Cc:* user 
> *Subject:* Re: Uneven TM Distribution of Flink on YARN
>
>
>
> Thanks for the reply. We've already set cluster.evenly-spread-out-slots =
> true
>
>
>
> Best
>
> Lu
>
>
>
> On Mon, Aug 28, 2023 at 1:23 PM Kenan Kılıçtepe 
> wrote:
>
> Have you checked config param cluster.evenly-spread-out-slots ?
>
>
>
>
>
> On Mon, Aug 28, 2023 at 10:31 PM Lu Niu  wrote:
>
> Hi, Flink users
>
>
>
> We have recently observed that the allocation of Flink TaskManagers in our
> YARN cluster is not evenly distributed. We would like to hear your thoughts
> on this matter.
>
> 1. Our setup includes Flink version 1.15.1 and Hadoop 2.10.0.
> 2. The uneven distribution is that out of a 370-node YARN cluster, there
> are 16 nodes with either 0 or 1 vCore available, while 110 nodes have more
> than 10 vCores available.
>
>
>
> Is such behavior expected? If not, is there a fix provided in Flink?
> Thanks!
>
>
>
> Best
>
> Lu
>
>


Re: Disabling Job Submissions through Flink UI

2023-09-06 Thread Muazim Wani
Hi Team,
I came across the thread "Jar Not Found", in
which it is discussed that
"If you configure `web.submit.enable=false` in your cluster, you could not
upload a jar job, but you can still submit jobs via rest endpoint. You can
create your `RestClusterClient` to do that or use the existing jdbc-driver
and sql-gateway.".
It would be helpful for me if you could provide more information on how I
could achieve this using *RestClusterClient*. Thanks and regards, Muazim Wani
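
(A rough sketch of the RestClusterClient route — this is an internal client
API that may change between versions; the JobManager address, jar path,
entry class, and cluster id below are placeholders. Building the JobGraph
locally means the /jars/upload endpoint is never needed.)

import java.io.File;

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class SubmitViaRest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(JobManagerOptions.ADDRESS, "flink-jobmanager");
        conf.set(RestOptions.PORT, 8081);

        PackagedProgram program = PackagedProgram.newBuilder()
                .setJarFile(new File("/path/to/job.jar"))
                .setEntryPointClassName("com.example.MyJob")
                .build();
        // compile the JobGraph on the client, then submit it over REST
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, conf, 1, false);

        try (RestClusterClient<String> client = new RestClusterClient<>(conf, "my-cluster-id")) {
            JobID jobId = client.submitJob(jobGraph).get();
            System.out.println("Submitted job " + jobId);
        }
    }
}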

On Tue, 5 Sept 2023 at 22:01, Muazim Wani  wrote:

> Hello Team,
>
> I hope this email finds you well. I've been working on disabling job
> submissions from the Flink UI, and I am facing an issue with the
> configuration. Specifically, I've set web.submit.enable = false to
> achieve this. However, it seems that this configuration change is
> preventing the Flink Kubernetes Operator from submitting jobs to the
> JobManager.
>
> According to the documentation, disabling the web.submit.enable flag
> should still allow job submissions via REST APIs. However, when I disable
> the flag, I'm encountering an error message that says *"[Not found:
> /v1/jars/upload].*" (Also, I feel this "v1" is getting appended to the
> path somehow.) Here's the corresponding log entry:
>
> *Status | Error | UPGRADING |
> {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException: [Not found:
> /v1/jars/upload]","throwableList":[{"type":"org.apache.flink.runtime.rest.util.RestClientException","message":"[Not
> found: /v1/jars/upload]","additionalMetadata":{"httpResponseCode":404}}]}*
>
> I've also come across a Jira issue, FLINK-26808
> ,
> which mentions that disabling web submission disables all handlers related
> to web submission. This statement appears to contradict the information in
> the Flink documentation.
>
> Could you please provide some guidance or insights on how to correctly
> configure Flink to disable UI job submissions while allowing submissions
> via REST APIs? Any assistance on this matter would be greatly appreciated.
>
> Thank you for your help.
>
> Best regards, Muazim Wani
>


Re: Uneven TM Distribution of Flink on YARN

2023-09-06 Thread Biao Geng
Hi,

If your YARN cluster uses fair scheduler, maybe you can check if the 
yarn.scheduler.fair.assignmultiple
 config is set. If that’s the case, then adjusting 
yarn.scheduler.fair.dynamic.max.assign and yarn.scheduler.fair.max.assign could 
be helpful. Also, AFAIK, flink does not exert extra control of distribution of 
yarn apps on different nodes. The key diff between flink and spark is that most 
flink jobs are unbounded while spark jobs are bounded. It is possible that 
under the same YARN scheduling strategy, the final distribution of apps after some 
time is different.

Best,
Biao Geng
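
(For concreteness, these FairScheduler knobs live in yarn-site.xml; the
values below are illustrative, not recommendations:)

<property>
  <name>yarn.scheduler.fair.assignmultiple</name>
  <value>true</value>
</property>
<property>
  <name>yarn.scheduler.fair.dynamic.max.assign</name>
  <value>true</value>
</property>
<property>
  <!-- only consulted when dynamic.max.assign is false; -1 = no limit -->
  <name>yarn.scheduler.fair.max.assign</name>
  <value>-1</value>
</property>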

From: Lu Niu 
Date: Thursday, September 7, 2023 at 12:17 AM
To: Geng Biao 
Cc: Chen Zhanghao , Weihua Hu 
, Kenan Kılıçtepe , user 

Subject: Re: Uneven TM Distribution of Flink on YARN
Hi, Thanks for all your help. Are there any other insights?


Best
Lu

On Wed, Aug 30, 2023 at 11:29 AM Lu Niu 
mailto:qqib...@gmail.com>> wrote:
No. we don't use yarn.taskmanager.node-label

Best
Lu

On Tue, Aug 29, 2023 at 12:17 AM Geng Biao 
mailto:biaoge...@gmail.com>> wrote:
Maybe you can check if you have set yarn.taskmanager.node-label for some flink 
jobs?

Best,
Biao Geng

Sent from Outlook for iOS

From: Chen Zhanghao mailto:zhanghao.c...@outlook.com>>
Sent: Tuesday, August 29, 2023 12:14:53 PM
To: Lu Niu mailto:qqib...@gmail.com>>; Weihua Hu 
mailto:huweihua@gmail.com>>
Cc: Kenan Kılıçtepe mailto:kkilict...@gmail.com>>; user 
mailto:user@flink.apache.org>>
Subject: Re: Uneven TM Distribution of Flink on YARN

CCing @Weihua Hu , who is an expert on this. Do 
you have any ideas on the phenomenon here?

Best,
Zhanghao Chen

From: Lu Niu mailto:qqib...@gmail.com>>
Sent: Tuesday, August 29, 2023 12:11:35 PM
To: Chen Zhanghao mailto:zhanghao.c...@outlook.com>>
Cc: Kenan Kılıçtepe mailto:kkilict...@gmail.com>>; user 
mailto:user@flink.apache.org>>
Subject: Re: Uneven TM Distribution of Flink on YARN

Thanks for your reply.

The interesting fact is that we also manage Spark on YARN. However, only the 
Flink cluster is having the issue. I am wondering whether there is a difference 
in the implementation on the Flink side.

Best
Lu

On Mon, Aug 28, 2023 at 8:38 PM Chen Zhanghao 
mailto:zhanghao.c...@outlook.com>> wrote:
Hi Lu Niu,

TM distribution on YARN nodes is managed by YARN RM and is out of the scope of 
Flink. On the other hand, cluster.evenly-spread-out-slots forces even 
distribution of tasks among Flink TMs, and has nothing to do with your 
concerns. Also, the config currently only supports Standalone mode Flink 
clusters, and does not take effect on a Flink cluster on YARN.

Best,
Zhanghao Chen

From: Lu Niu mailto:qqib...@gmail.com>>
Sent: August 29, 2023 4:30
To: Kenan Kılıçtepe mailto:kkilict...@gmail.com>>
Cc: user mailto:user@flink.apache.org>>
Subject: Re: Uneven TM Distribution of Flink on YARN

Thanks for the reply. We've already set cluster.evenly-spread-out-slots = true

Best
Lu

On Mon, Aug 28, 2023 at 1:23 PM Kenan Kılıçtepe 
mailto:kkilict...@gmail.com>> wrote:
Have you checked config param cluster.evenly-spread-out-slots ?


On Mon, Aug 28, 2023 at 10:31 PM Lu Niu 
mailto:qqib...@gmail.com>> wrote:
Hi, Flink users

We have recently observed that the allocation of Flink TaskManagers in our YARN 
cluster is not evenly distributed. We would like to hear your thoughts on this 
matter.

1. Our setup includes Flink version 1.15.1 and Hadoop 2.10.0.
2. The uneven distribution is that out of a 370-node YARN cluster, there are 16 
nodes with either 0 or 1 vCore available, while 110 nodes have more than 10 
vCores available.

Is such behavior expected? If not, is there a fix provided in Flink? Thanks!

Best
Lu


Re: Uneven TM Distribution of Flink on YARN

2023-09-06 Thread Lu Niu
Hi, Thanks for all your help. Are there any other insights?


Best
Lu

On Wed, Aug 30, 2023 at 11:29 AM Lu Niu  wrote:

> No. we don't use yarn.taskmanager.node-label
>
> Best
> Lu
>
> On Tue, Aug 29, 2023 at 12:17 AM Geng Biao  wrote:
>
>> Maybe you can check if you have set yarn.taskmanager.node-label for some
>> flink jobs?
>>
>> Best,
>> Biao Geng
>>
>> Sent from Outlook for iOS 
>> --
>> *From:* Chen Zhanghao 
>> *Sent:* Tuesday, August 29, 2023 12:14:53 PM
>> *To:* Lu Niu ; Weihua Hu 
>> *Cc:* Kenan Kılıçtepe ; user > >
>> *Subject:* Re: Uneven TM Distribution of Flink on YARN
>>
>> CCing @Weihua Hu  , who is an expert on this. Do
>> you have any ideas on the phenomenon here?
>>
>> Best,
>> Zhanghao Chen
>> --
>> *From:* Lu Niu 
>> *Sent:* Tuesday, August 29, 2023 12:11:35 PM
>> *To:* Chen Zhanghao 
>> *Cc:* Kenan Kılıçtepe ; user > >
>> *Subject:* Re: Uneven TM Distribution of Flink on YARN
>>
>> Thanks for your reply.
>>
>> The interesting fact is that we also manage Spark on YARN. However, only
>> the Flink cluster is having the issue. I am wondering whether there is a
>> difference in the implementation on the Flink side.
>>
>> Best
>> Lu
>>
>> On Mon, Aug 28, 2023 at 8:38 PM Chen Zhanghao 
>> wrote:
>>
>> Hi Lu Niu,
>>
>> TM distribution on YARN nodes is managed by YARN RM and is out of the
>> scope of Flink. On the other hand, cluster.evenly-spread-out-slots forces
>> even distribution of tasks among Flink TMs, and has nothing to do with your
>> concerns. Also, the config currently only supports Standalone mode Flink
>> clusters, and does not take effect on a Flink cluster on YARN.
>>
>> Best,
>> Zhanghao Chen
>> --
>> *From:* Lu Niu 
>> *Sent:* August 29, 2023 4:30
>> *To:* Kenan Kılıçtepe 
>> *Cc:* user 
>> *Subject:* Re: Uneven TM Distribution of Flink on YARN
>>
>> Thanks for the reply. We've already set cluster.evenly-spread-out-slots =
>> true
>>
>> Best
>> Lu
>>
>> On Mon, Aug 28, 2023 at 1:23 PM Kenan Kılıçtepe 
>> wrote:
>>
>> Have you checked config param cluster.evenly-spread-out-slots ?
>>
>>
>> On Mon, Aug 28, 2023 at 10:31 PM Lu Niu  wrote:
>>
>> Hi, Flink users
>>
>> We have recently observed that the allocation of Flink TaskManagers in
>> our YARN cluster is not evenly distributed. We would like to hear your
>> thoughts on this matter.
>>
>> 1. Our setup includes Flink version 1.15.1 and Hadoop 2.10.0.
>> 2. The uneven distribution is that out of a 370-node YARN cluster, there
>> are 16 nodes with either 0 or 1 vCore available, while 110 nodes have more
>> than 10 vCores available.
>>
>> Is such behavior expected? If not, is there a fix provided in Flink?
>> Thanks!
>>
>> Best
>> Lu
>>
>>


Re: Memory Leak

2023-09-06 Thread Neha . via user
We also faced the same issue with Flink 1.16.1. Please enable jemalloc as the
memory allocator; it fixed the issue for us.

On Wed, Sep 6, 2023 at 9:07 PM Kenan Kılıçtepe  wrote:

> Hi,
> Thanks for the answer.
> I will try the documents you have shared.
> But still it would be great if you can take a look at the numbers below
> and give some tips.
>
>
> At the moment RSS is 46.6GB although taskmanager.memory.process.size is
> set to 4m
>
> GC Statistics:
> 2023-09-06 15:15:03,785 INFO
>  org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Memory
> usage stats: [HEAP: 3703/18208/18208 MB, NON HEAP: 154/175/744 MB
> (used/committed/max)]
> 2023-09-06 15:15:03,785 INFO
>  org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Direct
> memory stats: Count: 33620, Total Capacity: 1102003811, Used Memory:
> 1102003812
> 2023-09-06 15:15:03,785 INFO
>  org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Off-heap
> pool stats: [CodeHeap 'non-nmethods': 1/3/7 MB (used/committed/max)],
> [Metaspace: 87/99/256 MB (used/committed/max)], [CodeHeap 'profiled
> nmethods': 32/35/116 MB (used/committed/max)], [Compressed Class Space:
> 10/14/248 MB (used/committed/max)], [CodeHeap 'non-profiled nmethods':
> 21/22/116 MB (used/committed/max)]
> 2023-09-06 15:15:03,785 INFO
>  org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Garbage
> collector stats: [G1 Young Generation, GC TIME (ms): 30452, GC COUNT: 351],
> [G1 Old Generation, GC TIME (ms): 0, GC COUNT: 0]
>
> my configuration:
>
> INFO  [] - Final TaskExecutor Memory configuration:
> INFO  [] -   Total Process Memory:  39.063gb (4194304 bytes)
> INFO  [] - Total Flink Memory:  37.813gb (40600862720 bytes)
> INFO  [] -   Total JVM Heap Memory: 17.781gb (19092471808 bytes)
> INFO  [] - Framework:   128.000mb (134217728 bytes)
> INFO  [] - Task:17.656gb (18958254080 bytes)
> INFO  [] -   Total Off-heap Memory: 20.031gb (21508390912 bytes)
> INFO  [] - Managed: 18.906gb (20300431360 bytes)
> INFO  [] - Total JVM Direct Memory: 1.125gb (1207959552 bytes)
> INFO  [] -   Framework: 128.000mb (134217728 bytes)
> INFO  [] -   Task:  0 bytes
> INFO  [] -   Network:   1024.000mb (1073741824 bytes)
> INFO  [] - JVM Metaspace:   256.000mb (268435456 bytes)
> INFO  [] - JVM Overhead:1024.000mb (1073741824 bytes)
>
> jcmd output:
>
> ubuntu@dzs-tef-test-01:~/flink/log$ jcmd 1035173  VM.native_memory summary
> 1035173:
>
> Native Memory Tracking:
>
> Total: reserved=23615554KB, committed=21049538KB
> - Java Heap (reserved=18644992KB, committed=18644992KB)
> (mmap: reserved=18644992KB,
> committed=18644992KB)
>
> - Class (reserved=347038KB, committed=106970KB)
> (classes #15959)
> (  instance classes #15140, array classes #819)
> (malloc=5022KB #72815)
> (mmap: reserved=342016KB, committed=101948KB)
> (  Metadata:   )
> (reserved=88064KB, committed=86948KB)
> (used=79128KB)
> (free=7820KB)
> (waste=0KB =0.00%)
> (  Class space:)
> (reserved=253952KB, committed=15000KB)
> (used=11278KB)
> (free=3722KB)
> (waste=0KB =0.00%)
>
> -Thread (reserved=2404259KB, committed=262791KB)
> (thread #2328)
> (stack: reserved=2393052KB, committed=251584KB)
> (malloc=8481KB #13970)
> (arena=2726KB #4654)
>
> -  Code (reserved=252334KB, committed=67866KB)
> (malloc=4650KB #21507)
> (mmap: reserved=247684KB, committed=63216KB)
>
> -GC (reserved=800181KB, committed=800181KB)
> (malloc=74637KB #63221)
> (mmap: reserved=725544KB, committed=725544KB)
>
> -  Compiler (reserved=20432KB, committed=20432KB)
> (malloc=20300KB #8557)
> (arena=133KB #5)
>
> -  Internal (reserved=21883KB, committed=21871KB)
> (malloc=21839KB #29146)
> (mmap: reserved=44KB, committed=32KB)
>
> - Other (reserved=1082212KB, committed=1082212KB)
> (malloc=1082212KB #34463)
>
> -Symbol 

Re: Memory Leak

2023-09-06 Thread Kenan Kılıçtepe
Hi,
Thanks for the answer.
I will try the documents you have shared.
But still it would be great if you can take a look at the numbers below and
give some tips.


At the moment RSS is 46.6GB although taskmanager.memory.process.size is set
to 4m

GC Statistics:
2023-09-06 15:15:03,785 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Memory
usage stats: [HEAP: 3703/18208/18208 MB, NON HEAP: 154/175/744 MB
(used/committed/max)]
2023-09-06 15:15:03,785 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Direct
memory stats: Count: 33620, Total Capacity: 1102003811, Used Memory:
1102003812
2023-09-06 15:15:03,785 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Off-heap
pool stats: [CodeHeap 'non-nmethods': 1/3/7 MB (used/committed/max)],
[Metaspace: 87/99/256 MB (used/committed/max)], [CodeHeap 'profiled
nmethods': 32/35/116 MB (used/committed/max)], [Compressed Class Space:
10/14/248 MB (used/committed/max)], [CodeHeap 'non-profiled nmethods':
21/22/116 MB (used/committed/max)]
2023-09-06 15:15:03,785 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Garbage
collector stats: [G1 Young Generation, GC TIME (ms): 30452, GC COUNT: 351],
[G1 Old Generation, GC TIME (ms): 0, GC COUNT: 0]

my configuration:

INFO  [] - Final TaskExecutor Memory configuration:
INFO  [] -   Total Process Memory:  39.063gb (4194304 bytes)
INFO  [] - Total Flink Memory:  37.813gb (40600862720 bytes)
INFO  [] -   Total JVM Heap Memory: 17.781gb (19092471808 bytes)
INFO  [] - Framework:   128.000mb (134217728 bytes)
INFO  [] - Task:17.656gb (18958254080 bytes)
INFO  [] -   Total Off-heap Memory: 20.031gb (21508390912 bytes)
INFO  [] - Managed: 18.906gb (20300431360 bytes)
INFO  [] - Total JVM Direct Memory: 1.125gb (1207959552 bytes)
INFO  [] -   Framework: 128.000mb (134217728 bytes)
INFO  [] -   Task:  0 bytes
INFO  [] -   Network:   1024.000mb (1073741824 bytes)
INFO  [] - JVM Metaspace:   256.000mb (268435456 bytes)
INFO  [] - JVM Overhead:1024.000mb (1073741824 bytes)

jcmd output:

ubuntu@dzs-tef-test-01:~/flink/log$ jcmd 1035173  VM.native_memory summary
1035173:

Native Memory Tracking:

Total: reserved=23615554KB, committed=21049538KB
- Java Heap (reserved=18644992KB, committed=18644992KB)
(mmap: reserved=18644992KB,
committed=18644992KB)

- Class (reserved=347038KB, committed=106970KB)
(classes #15959)
(  instance classes #15140, array classes #819)
(malloc=5022KB #72815)
(mmap: reserved=342016KB, committed=101948KB)
(  Metadata:   )
(reserved=88064KB, committed=86948KB)
(used=79128KB)
(free=7820KB)
(waste=0KB =0.00%)
(  Class space:)
(reserved=253952KB, committed=15000KB)
(used=11278KB)
(free=3722KB)
(waste=0KB =0.00%)

-Thread (reserved=2404259KB, committed=262791KB)
(thread #2328)
(stack: reserved=2393052KB, committed=251584KB)
(malloc=8481KB #13970)
(arena=2726KB #4654)

-  Code (reserved=252334KB, committed=67866KB)
(malloc=4650KB #21507)
(mmap: reserved=247684KB, committed=63216KB)

-GC (reserved=800181KB, committed=800181KB)
(malloc=74637KB #63221)
(mmap: reserved=725544KB, committed=725544KB)

-  Compiler (reserved=20432KB, committed=20432KB)
(malloc=20300KB #8557)
(arena=133KB #5)

-  Internal (reserved=21883KB, committed=21871KB)
(malloc=21839KB #29146)
(mmap: reserved=44KB, committed=32KB)

- Other (reserved=1082212KB, committed=1082212KB)
(malloc=1082212KB #34463)

-Symbol (reserved=17581KB, committed=17581KB)
(malloc=16678KB #187368)
(arena=903KB #1)

-Native Memory Tracking (reserved=9173KB, committed=9173KB)
(malloc=1656KB #23012)
(tracking overhead=7517KB)

-Shared class space (reserved=10904KB, committed=10904KB)
  

Flink KafkaSource failure on empty partitions

2023-09-06 Thread David Clutter
I am using Flink 1.13.1 on AWS EMR and I seem to have hit this bug:
https://issues.apache.org/jira/browse/FLINK-27041.  My job will fail when
there are empty partitions.  I see it is fixed in a newer version of Flink
but I cannot update Flink version at this time.

Suggestions on a workaround?  I am looking at trying to subclass the
KafkaPartitionSplitReader to include the changes from the PR
https://github.com/apache/flink/pull/19456/files.  But it is not intended
to be subclassed.


Re: Memory Leak

2023-09-06 Thread Biao Geng
Hi Kenan,
If you have confirmed the heap memory is OK (e.g., no Java OOM exception and
no frequent GC), then the cause may be off-heap memory overuse,
especially when your Flink job uses a native library.
To diagnose such a problem, you can refer to [1][2] for more details about
using NMT and jeprof.

[1]
https://erikwramner.files.wordpress.com/2017/10/native-memory-leaks-in-java.pdf
[2] https://www.evanjones.ca/java-native-leak-bug.html
Best,
Biao Geng
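
(A sketch of putting this and the jemalloc suggestion from the other reply
into practice — the pid and the jemalloc install path are illustrative:)

# flink-conf.yaml: enable JVM Native Memory Tracking for the TaskManager
env.java.opts.taskmanager: -XX:NativeMemoryTracking=summary

# then diff native allocations over time
jcmd <tm-pid> VM.native_memory baseline
jcmd <tm-pid> VM.native_memory summary.diff   # run again once RSS has grown

# optionally run the TM with jemalloc to tame/profile native allocations
export LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
./bin/taskmanager.sh start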

Kenan Kılıçtepe  wrote on Wed, Sep 6, 2023 at 20:32:

> Hi,
>
> I have Flink 1.16.2 on a single server with 64GB Ram.
>
> Although taskmanager.memory.process.size is set to 4m, I can see the
> memory usage of the task manager exceed 59GB, and the OS kills it because of
> OOM.
> I check the RSS column of the application in top for memory usage.
>
> I don't see any heap memory problem.
>
> taskmanager.memory.process.size: 4m
> taskmanager.memory.managed.fraction: 0.53
> state.backend.rocksdb.memory.managed: true
>
> Any help is appreciated for analyzing the problem.
>
> Thanks
>
>


Re: backpressured metrics doesn't work

2023-09-06 Thread Kenan Kılıçtepe
Hi Ron,
Thanks for your answer.
The problem was with a job in my job graph. As it was blocked immediately, no
backpressure metrics were emitted. I think all tasks should be in a free
status at least once.
Kenan


On Wed, Sep 6, 2023 at 12:35 PM liu ron  wrote:

> Hi, Kenan
>
> I think you need to provide more context; that may help to find the root
> cause.
>
> Best,
> Ron
>
> Kenan Kılıçtepe  wrote on Mon, Sep 4, 2023 at 21:49:
>
>> Hi,
>>
>> Any idea why backpressured metrics are not working and how I can fix it?
>>
>> [image: image.png]
>>
>> Thanks
>> Kenan
>>
>>


Memory Leak

2023-09-06 Thread Kenan Kılıçtepe
Hi,

I have Flink 1.16.2 on a single server with 64GB Ram.

Although taskmanager.memory.process.size is set to 4m, I can see the
memory usage of the task manager exceed 59GB, and the OS kills it because of
OOM.
I check the RSS column of the application in top for memory usage.

I don't see any heap memory problem.

taskmanager.memory.process.size: 4m
taskmanager.memory.managed.fraction: 0.53
state.backend.rocksdb.memory.managed: true

Any help is appreciated for analyzing the problem.

Thanks


Re: Help needed on stack overflow query

2023-09-06 Thread Feng Jin
Hi Nihar,
Have you tried using the following configuration:

metrics.reporter.my_reporter.filter.includes: jobmanager:*:*;taskmanager:*:*

Please note that the default delimiter for the List parameter in Flink is
";".

Best regards,
Feng
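
(For context, a fuller reporter block — assuming the factory-style reporter
configuration of Flink >= 1.16; the reporter name and port are illustrative:)

metrics.reporter.my_reporter.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.my_reporter.port: 9249
metrics.reporter.my_reporter.filter.includes: jobmanager:*:*;taskmanager:*:*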

On Thu, Aug 24, 2023 at 11:36 PM Nihar Rao  wrote:

> Hello,
> Creating a new question for this query
>  as I
> am not able to reply to the post.
>
> Can anyone help with the below query
>
>
> https://stackoverflow.com/questions/76949195/how-to-include-multiple-filters-in-filter-includes-parameter-for-my-flink-metric
>
> Thanks
>


Strange exception in UnitTest when migrate Flink v1.16.2 to v1.17.1

2023-09-06 Thread Teunissen, F.G.J. (Fred) via user
Hi community,

I would like to ask for some help in solving a strange failure in a Unit Test 
when code coverage (jacoco) is enabled.

We have a project with a custom UDF that uses the MiniClusterExtension in a 
Unit Test. The Unit Test works fine when built for Flink v1.16.2, but it fails 
when built for Flink v1.17.1.

The strange part is that it only fails when code coverage is enabled. When code 
coverage is skipped, the test succeeds.

The following exception is thrown when it fails:
java.lang.IllegalArgumentException: fromIndex(2) > toIndex(0)
at 
java.base/java.util.AbstractList.subListRangeCheck(AbstractList.java:509)
at java.base/java.util.AbstractList.subList(AbstractList.java:497)
at 
org.apache.calcite.rel.metadata.janino.CacheGeneratorUtil$CacheKeyStrategy$1.safeArgList(CacheGeneratorUtil.java:213)
at 
org.apache.calcite.rel.metadata.janino.CacheGeneratorUtil$CacheKeyStrategy$1.cacheKeyBlock(CacheGeneratorUtil.java:205)
at 
org.apache.calcite.rel.metadata.janino.CacheGeneratorUtil.cachedMethod(CacheGeneratorUtil.java:68)
at 
org.apache.calcite.rel.metadata.janino.RelMetadataHandlerGeneratorUtil.generateHandler(RelMetadataHandlerGeneratorUtil.java:121)
at 
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.generateCompileAndInstantiate(JaninoRelMetadataProvider.java:138)
at 
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:73)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:165)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3951)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at 
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:197)
at 
org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:118)
at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:844)
at 
org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:307)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:565)
at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:428)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:251)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:130)
at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:208)
at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:195)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
at 

Re: How to read flinkSQL job state

2023-09-06 Thread liu ron
Hi, Yifan

Flink SQL & the Table API currently don't support reading state directly.

Best,
Ron

Yifan He via user  wrote on Wed, Sep 6, 2023 at 13:11:

> Hi team,
>
> We are investigating why the checkpoint size of our FlinkSQL jobs keeps
> growing and we want to look into the checkpoint file to know what is
> causing the problem. I know we can use the state processor api to read the
> state of jobs using datastream api, but how can I read the state of jobs
> using table api & sql?
>
> Thanks,
> Yifan
>


Re: backpressured metrics doesn't work

2023-09-06 Thread liu ron
Hi, Kenan

I think you need to provide more context; that may help to find the root cause.

Best,
Ron

Kenan Kılıçtepe  wrote on Mon, Sep 4, 2023 at 21:49:

> Hi,
>
> Any idea why backpressured metrics are not working and how I can fix it?
>
> [image: image.png]
>
> Thanks
> Kenan
>
>


Re: Send data asynchronously to a 3rd party via SinkFunction

2023-09-06 Thread liu ron
Hi, patricia

If you want to use SinkFunction, maybe you should use
`RichSinkFunction`[1]; you can close the resources in its close() method.

[1]
https://github.com/apache/flink/blob/7d8f9821d2b3ed9876eae4ffe2e3c8b86af2d88a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java#L25


Best,
Ron
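
(A minimal sketch of that pattern, using the JDK's HttpClient as a stand-in
for the vendor SDK — the URL is a placeholder. open() creates the client and
executor; close() releases them on both normal shutdown and failure.)

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class VendorSink extends RichSinkFunction<String> {

    private transient HttpClient client;
    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(4); // our own pool, not the common fork-join pool
        client = HttpClient.newBuilder()
                .executor(executor)
                .connectTimeout(Duration.ofSeconds(10))
                .build();
    }

    @Override
    public void invoke(String payload, Context context) {
        HttpRequest request = HttpRequest.newBuilder(URI.create("https://vendor.example.com/txns"))
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();
        client.sendAsync(request, HttpResponse.BodyHandlers.ofString()); // async send
    }

    @Override
    public void close() throws Exception {
        // called on both normal shutdown and failure: drain, then release threads
        if (executor != null) {
            executor.shutdown();
            executor.awaitTermination(30, TimeUnit.SECONDS);
        }
    }
}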

Feng Jin  wrote on Fri, Sep 1, 2023 at 22:18:

> hi, patricia
>
> I suggest using the generic asynchronous base sink.
>
> https://flink.apache.org/2022/03/16/the-generic-asynchronous-base-sink/
>
>
> Best,
> Feng
>
> On Fri, Sep 1, 2023 at 6:07 PM patricia lee  wrote:
>
>>
>> I'd like to ask if there is a way to send data to a vendor (SDK plugin,
>> which is also an HTTP request) asynchronously in flink 1.17?
>>
>> After transformation on the data, I usually collate them as a List to my
>> custom SinkFunction. I initialized a CompleteableFuture inside the invoke()
>> method. However I read about the Async I/O from the documentation but I
>> couldn't figure out how to use it in my use case.
>>
>>
>> How can I close the resources initialized in SinkFunction properly? e.g.
>> the job failed.
>> Is using completableFuture inside SinkFunction a good approach?
>>
>>
>> Regards,
>> Pat
>>
>>


Re: Question regarding asyncIO timeout

2023-09-06 Thread liu ron
Hi, Leon

> Besides that, do you know if the async timeout is actually a global
> timeout? Meaning, does it account for the time of each attempt call plus any
> interval time in between?

Yes, the timeout is the total timeout; you can see [1][2] for more detail.


[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
[2]
https://github.com/apache/flink/blob/7d8f9821d2b3ed9876eae4ffe2e3c8b86af2d88a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java#L209

Best,
Ron
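
(A sketch of wiring a retry strategy into orderedWaitWithRetry, available
since Flink 1.16 / FLIP-232 — GrpcAsyncFunction, Response, and requests are
placeholders; note that the 30s passed here is the total budget across all
attempts, per the links above.)

import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;

AsyncRetryStrategy<Response> retryStrategy =
        new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<Response>(3, 100L) // 3 attempts, 100 ms apart
                .ifException(t -> true) // retry on any exception; narrow this for real use
                .build();

DataStream<Response> results = AsyncDataStream.orderedWaitWithRetry(
        requests,                  // DataStream<Request>
        new GrpcAsyncFunction(),   // your RichAsyncFunction
        30, TimeUnit.SECONDS,      // total timeout across all retry attempts
        100,                       // in-flight capacity
        retryStrategy);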

Leon Xu  wrote on Wed, Sep 6, 2023 at 12:07:

> Hi Ken,
>
> Thanks for the suggestion. Definitely a good call to just wrap the retry
> inside the client code. I'll give it a try.
> Besides that, do you know if the async timeout is actually a global
> timeout? Meaning, does it account for the time of each attempt call plus any
> interval time in between?
> I increased the async timeout and reduced the client timeout, and it seems to
> help. But I will continue to monitor.
>
> Leon
>
> On Tue, Sep 5, 2023 at 12:42 PM Ken Krugler 
> wrote:
>
>> Hi Leon,
>>
>> Normally I try to handle retrying in the client being used to call the
>> server, as you have more control/context.
>>
>> If that’s not an option for you, then normally (un)orderedWaitWithRetry()
>> should work - when you say “it doesn’t seem to help much”, are you saying
>> that even with retry you get transient failures that you want to handle
>> better?
>>
>> If so, then you could implement the timeout() method in your
>> AsyncFunction, and complete with a special result that indicates you
>> exceeded the retry count. This would then avoid having the job restart.
>>
>> — Ken
>>
>> PS - note that you can also do something similar inside of the
>> asyncInvoke() method of your AsyncFunction, e.g:
>>
>> @Override
>> public void asyncInvoke(String request,
>>         ResultFuture<ServerRequestResult> resultFuture) throws Exception {
>>
>>     final ServerResult timeoutResult = makeErrorResult(blah, "Timeout");
>>
>>     // Use your own executor, so that you're not relying on the size
>>     // of the common fork pool.
>>     CompletableFuture.supplyAsync(new Supplier<ServerResult>() {
>>
>>         @Override
>>         public ServerResult get() {
>>             try {
>>                 return client.request(request);
>>             } catch (Exception e) {
>>                 LOGGER.debug("Exception requesting " + request, e);
>>                 return makeErrorResult(blah, e.getMessage());
>>             }
>>         }
>>     }, executor)
>>     .completeOnTimeout(timeoutResult, REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
>>     .thenAccept((ServerResult result) -> {
>>         ServerRequestResult requestResult = new ServerRequestResult();
>>         requestResult.setBlah();
>>         resultFuture.complete(Collections.singleton(requestResult));
>>     });
>> }
>>
>>
>> On Sep 5, 2023, at 12:16 PM, Leon Xu  wrote:
>>
>> Hi Flink users,
>>
>> We are using Flink AsyncIO to call a grpc-based service in our Flink job.
>> And from time to time we are experiencing Async function timeout issues,
>> here's the exception.
>> ```
>> java.lang.Exception: Could not complete the stream element: Record @
>> 169393916 : [B@cadc5b3.
>> Caused by: java.util.concurrent.TimeoutException: Async function call
>> has timed out.
>> ```
>> Every timeout will cause the job to restart, which seems to be very
>> expensive.
>>
>> On the server side it looks like these timeouts are transient and we were
>> expecting a retry will fix the issue.
>> We tried using the asyncIO retry strategy but it doesn't seem to help
>> much.
>> `AsyncDataStream.orderedWaitWithRetry`
>>
>> Do you have any suggestions on how to better reduce these timeout errors?
>>
>>
>> --
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
>>
>>
>>
>>


Re: How to read flinkSQL job state

2023-09-06 Thread Shammon FY
Hi Yifan,

Besides reading job state, I would like to know which state backend you are
using. Can you give the configurations for state and checkpointing for your
job? Maybe you can check these configuration items to confirm if they are
correct first.

Best,
Shammon FY

On Wed, Sep 6, 2023 at 3:17 PM Hang Ruan  wrote:

> Hi, Yifan.
>
> I think the document[1] means to let us convert the DataStream to the
> Table[2]. Then we could handle the state with the Table API & SQL.
>
> Best,
> Hang
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/libs/state_processor_api/
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/data_stream_api/#converting-between-datastream-and-table
>
> Yifan He via user  wrote on Wed, Sep 6, 2023 at 13:10:
>
>> Hi team,
>>
>> We are investigating why the checkpoint size of our FlinkSQL jobs keeps
>> growing and we want to look into the checkpoint file to know what is
>> causing the problem. I know we can use the state processor api to read the
>> state of jobs using datastream api, but how can I read the state of jobs
>> using table api & sql?
>>
>> Thanks,
>> Yifan
>>
>


Re: Flink RocksDB memory calculation in managed & non-managed modes

2023-09-06 Thread Hangxiang Yu
Hi,
https://flink-learning.org.cn/article/detail/c1db8bc157c72069979e411cd99714fd
This article has some theory and worked examples on how Flink calculates the
RocksDB write buffer and block cache memory; you can use it as a reference.

On Fri, Sep 1, 2023 at 2:56 PM crazy <2463829...@qq.com.invalid> wrote:

> Hi all,
> On Flink 1.13.5 with a RocksDB-based state backend: in managed and
> non-managed mode, how are the upper limit of the block cache and the total
> upper limit of the write buffers calculated in terms of actual memory usage?
> Thanks!
>
>
> crazy
> 2463829...@qq.com
>
>
>
> 



-- 
Best,
Hangxiang.
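
(Very roughly, with the default ratios the managed-memory split looks like
this — illustrative numbers only; the linked article covers the exact
accounting, which differs slightly because RocksDB charges memtable memory
against the shared block cache:)

M = 1 GB managed memory
state.backend.rocksdb.memory.write-buffer-ratio   = 0.5 (default)
state.backend.rocksdb.memory.high-prio-pool-ratio = 0.1 (default)

write buffer (memtable) budget ~ M * 0.5 = 512 MB
index/filter (high-prio) pool  ~ M * 0.1 ~ 102 MB
block cache: a shared budget derived from M, against which the above are charged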


Re: How to read flinkSQL job state

2023-09-06 Thread Hang Ruan
Hi, Yifan.

I think the document[1] means to let us convert the DataStream to the
Table[2]. Then we could handle the state with the Table API & SQL.

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/libs/state_processor_api/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/data_stream_api/#converting-between-datastream-and-table
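
(A minimal sketch of that conversion — the state-reading DataStream would come
from the State Processor API as in [1], and StateRow/readStateAsDataStream are
placeholders:)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStream<StateRow> stateRows = readStateAsDataStream(env); // e.g. SavepointReader.readKeyedState(...)

Table stateTable = tEnv.fromDataStream(stateRows);
tEnv.createTemporaryView("job_state", stateTable);
tEnv.sqlQuery("SELECT * FROM job_state").execute().print();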

Yifan He via user  wrote on Wed, Sep 6, 2023 at 13:10:

> Hi team,
>
> We are investigating why the checkpoint size of our FlinkSQL jobs keeps
> growing and we want to look into the checkpoint file to know what is
> causing the problem. I know we can use the state processor api to read the
> state of jobs using datastream api, but how can I read the state of jobs
> using table api & sql?
>
> Thanks,
> Yifan
>


Re: How to read flinkSQL job state

2023-09-06 Thread Hangxiang Yu
Hi, Yifan.
Unfortunately, the State Processor API only supports DataStream currently.
But you could still use it to read your SQL job state.
The most difficult thing is that you have to get the operator id, which you
could get from the log of StreamGraphHasherV2.generateDeterministicHash, and
the state name, which you could get from the code of the operator.

BTW, about investigating why the checkpoint size keeps growing:
1. Which state backend are you using?
2. Are you enabling incremental checkpoints? Is the checkpoint size you
mentioned the incremental size or the full size?
3. If full size, did you evaluate whether the size matches the
theoretical size?


On Wed, Sep 6, 2023 at 1:11 PM Yifan He via user 
wrote:

> Hi team,
>
> We are investigating why the checkpoint size of our FlinkSQL jobs keeps
> growing and we want to look into the checkpoint file to know what is
> causing the problem. I know we can use the state processor api to read the
> state of jobs using datastream api, but how can I read the state of jobs
> using table api & sql?
>
> Thanks,
> Yifan
>


-- 
Best,
Hangxiang.


Re: How to read flinkSQL job state

2023-09-06 Thread xiangyu feng
Hi Yifan,

AFAIK, if you want to query a job’s state from outside Flink, you can use
Queryable State[1].
Hope this helps.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/queryable_state/

Xiangyu


Yifan He via user  wrote on Wed, Sep 6, 2023 at 13:10:

> Hi team,
>
> We are investigating why the checkpoint size of our FlinkSQL jobs keeps
> growing and we want to look into the checkpoint file to know what is
> causing the problem. I know we can use the state processor api to read the
> state of jobs using datastream api, but how can I read the state of jobs
> using table api & sql?
>
> Thanks,
> Yifan
>