[jira] [Updated] (FLINK-35288) Flink Restart Strategy does not work as documented

2024-05-03 Thread Keshav Kansal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Keshav Kansal updated FLINK-35288:
--
Description: 
As per the documentation, when using the Fixed Delay Restart Strategy, 
*restart-strategy.fixed-delay.attempts* defines "the number of times that 
Flink retries the execution before the job is declared as failed" if the 
restart strategy has been set to fixed-delay. 

However, in reality it is a *maximum-total-task-failures* limit, i.e. it is 
possible that the job does not even attempt to restart. 
This behavior is documented in 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures

If there is an outage at the sink level, for example an Elasticsearch outage, 
all of the independent sink tasks might fail and the job will immediately fail 
without restarting (if restart-strategy.fixed-delay.attempts is set lower than 
or equal to the parallelism of the sink)
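
For illustration, a minimal sketch of how this strategy is configured via the 
standard RestartStrategyOptions keys (the comment reflects the behavior 
reported here, not the documented one):
{code:java}
import java.time.Duration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
conf.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
// Documented as the number of restart attempts, but counted per task failure:
// a sink of parallelism >= 3 whose subtasks all fail at once can exhaust this
// budget in a single outage, failing the job without any restart.
conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 3);
conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(10));

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
{code}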


  was:
As per the documentation, when using the Fixed Delay Restart Strategy, 
*restart-strategy.fixed-delay.attempts* defines "the number of times that 
Flink retries the execution before the job is declared as failed" if the 
restart strategy has been set to fixed-delay. 

However, in reality it is a *maximum-total-task-failures* limit, i.e. it is 
possible that the job does not even attempt to restart. 
This behavior is documented in 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures

If there is an outage at the sink level, for example an Elasticsearch outage, 
all of the independent sink tasks might fail and the job will immediately fail 
without restarting if restart-strategy.fixed-delay.attempts is set lower than 
or equal to the parallelism of the sink. 



> Flink Restart Strategy does not work as documented
> --
>
> Key: FLINK-35288
> URL: https://issues.apache.org/jira/browse/FLINK-35288
> Project: Flink
>  Issue Type: Bug
>Reporter: Keshav Kansal
>Priority: Minor
>
> As per the documentation, when using the Fixed Delay Restart Strategy, 
> *restart-strategy.fixed-delay.attempts* defines "the number of times that 
> Flink retries the execution before the job is declared as failed" if the 
> restart strategy has been set to fixed-delay. 
> However, in reality it is a *maximum-total-task-failures* limit, i.e. it is 
> possible that the job does not even attempt to restart. 
> This behavior is documented in 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures
> If there is an outage at the sink level, for example an Elasticsearch outage, 
> all of the independent sink tasks might fail and the job will immediately 
> fail without restarting (if restart-strategy.fixed-delay.attempts is set 
> lower than or equal to the parallelism of the sink)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35288) Flink Restart Strategy does not work as documented

2024-05-03 Thread Keshav Kansal (Jira)
Keshav Kansal created FLINK-35288:
-

 Summary: Flink Restart Strategy does not work as documented
 Key: FLINK-35288
 URL: https://issues.apache.org/jira/browse/FLINK-35288
 Project: Flink
  Issue Type: Bug
Reporter: Keshav Kansal


As per the documentation, when using the Fixed Delay Restart Strategy, 
restart-strategy.fixed-delay.attempts defines "the number of times that 
Flink retries the execution before the job is declared as failed" if the 
restart strategy has been set to fixed-delay. 

However, in reality it is a *maximum-total-task-failures* limit, i.e. it is 
possible that the job does not even attempt to restart. 
This behavior is documented in 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures

If there is an outage at the sink level, for example an Elasticsearch outage, 
all of the independent sink tasks might fail and the job will immediately fail 
without restarting if restart-strategy.fixed-delay.attempts is set lower than 
or equal to the parallelism of the sink. 




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35288) Flink Restart Strategy does not work as documented

2024-05-03 Thread Keshav Kansal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Keshav Kansal updated FLINK-35288:
--
Description: 
As per the documentation, when using the Fixed Delay Restart Strategy, 
*restart-strategy.fixed-delay.attempts* defines "the number of times that 
Flink retries the execution before the job is declared as failed" if the 
restart strategy has been set to fixed-delay. 

However, in reality it is a *maximum-total-task-failures* limit, i.e. it is 
possible that the job does not even attempt to restart. 
This behavior is documented in 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures

If there is an outage at the sink level, for example an Elasticsearch outage, 
all of the independent sink tasks might fail and the job will immediately fail 
without restarting if restart-strategy.fixed-delay.attempts is set lower than 
or equal to the parallelism of the sink. 


  was:
As per the documentation, when using the Fixed Delay Restart Strategy, 
restart-strategy.fixed-delay.attempts defines "the number of times that 
Flink retries the execution before the job is declared as failed" if the 
restart strategy has been set to fixed-delay. 

However, in reality it is a *maximum-total-task-failures* limit, i.e. it is 
possible that the job does not even attempt to restart. 
This behavior is documented in 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures

If there is an outage at the sink level, for example an Elasticsearch outage, 
all of the independent sink tasks might fail and the job will immediately fail 
without restarting if restart-strategy.fixed-delay.attempts is set lower than 
or equal to the parallelism of the sink. 



> Flink Restart Strategy does not work as documented
> --
>
> Key: FLINK-35288
> URL: https://issues.apache.org/jira/browse/FLINK-35288
> Project: Flink
>  Issue Type: Bug
>Reporter: Keshav Kansal
>Priority: Minor
>
> As per the documentation, when using the Fixed Delay Restart Strategy, 
> *restart-strategy.fixed-delay.attempts* defines "the number of times that 
> Flink retries the execution before the job is declared as failed" if the 
> restart strategy has been set to fixed-delay. 
> However, in reality it is a *maximum-total-task-failures* limit, i.e. it is 
> possible that the job does not even attempt to restart. 
> This behavior is documented in 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures
> If there is an outage at the sink level, for example an Elasticsearch outage, 
> all of the independent sink tasks might fail and the job will immediately 
> fail without restarting if restart-strategy.fixed-delay.attempts is set lower 
> than or equal to the parallelism of the sink. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-20628] Port RabbitMQ Connector using unified Source & Sink API [flink-connector-rabbitmq]

2024-05-03 Thread via GitHub


RocMarshal commented on PR #16:
URL: 
https://github.com/apache/flink-connector-rabbitmq/pull/16#issuecomment-2094004095

   > @RocMarshal Without actually seeing a passing CI, it doesn't make much 
sense to review this one...
   
   Got it.
   I have re-associated the corresponding PR information, and this PR will 
only cover the new source.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20628] Port RabbitMQ Connector using unified Source & Sink API [flink-connector-rabbitmq]

2024-05-03 Thread via GitHub


RocMarshal commented on PR #16:
URL: 
https://github.com/apache/flink-connector-rabbitmq/pull/16#issuecomment-2094003590

   The new sink: https://github.com/apache/flink-connector-rabbitmq/pull/29


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API [flink-connector-rabbitmq]

2024-05-03 Thread via GitHub


RocMarshal commented on PR #1:
URL: 
https://github.com/apache/flink-connector-rabbitmq/pull/1#issuecomment-2094003538

   The new sink part: https://github.com/apache/flink-connector-rabbitmq/pull/29


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35287] Builder builds NetworkConfig for Elasticsearch connector 8 [flink-connector-elasticsearch]

2024-05-03 Thread via GitHub


liuml07 commented on PR #100:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/100#issuecomment-2093511191

   @mtfelisb and @reswqa mind taking a look? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27054) Elasticsearch SQL connector SSL issue

2024-05-03 Thread Mingliang Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843288#comment-17843288
 ] 

Mingliang Liu commented on FLINK-27054:
---

Hi, FLINK-34369 was merged, and I can use the same approach to support the SQL 
connector. I have a draft PR that shows the idea. Please assign this to me if 
no one is actively working on it. I may need help with review and integration 
testing. 
https://github.com/apache/flink-connector-elasticsearch/compare/main...liuml07:flink-connector-elasticsearch:table

> Elasticsearch SQL connector SSL issue
> -
>
> Key: FLINK-27054
> URL: https://issues.apache.org/jira/browse/FLINK-27054
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch
>Reporter: ricardo
>Assignee: Kelu Tao
>Priority: Major
>
> The current Flink ElasticSearch SQL connector 
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/elasticsearch/
>  is missing SSL options, so it can't connect to ES clusters that require an 
> SSL certificate.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor

2024-05-03 Thread Trystan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843264#comment-17843264
 ] 

Trystan commented on FLINK-35285:
-

Makes sense. Maybe it's not a problem with the decision itself, but with the 
documentation / validation around it? Keeping the key groups balanced is a 
great idea, but it assumes that the scaler _can_ make a decision, when the max 
scale factor (up or down) can actually prevent it from making one. This test 
shows that with a max scale-up factor of 10% it will never be able to scale 
beyond 60, no matter what:
{code:java}
assertEquals(66, JobVertexScaler.scale(60, inputShipStrategies, 360, 1.1, 8, 
360)); {code}
Maybe there could be some validation, or a priority setting? I guess it comes 
down to what's worse: never being able to scale at all, or making an 
inefficient decision?

It's true that this depends on even key group distribution, but it also depends 
on even key _utilization_. That is the ideal, and I think most people strive 
for it, but uneven utilization is an easy mistake to make, especially early on 
when folks are just starting out with Flink.
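
A standalone sketch of the hunt described in this issue (my re-implementation, 
not the operator code) shows why a scale-down.max-factor of 0.2 from 
parallelism 60 with maxParallelism 360 always lands back on 60:
{code:java}
int currentParallelism = 60;
int maxParallelism = 360;
int target = (int) (currentParallelism * 0.8); // scale-down.max-factor = 0.2 -> 48

// Hunt upwards from the target for a divisor of maxParallelism, as the
// key group optimization does.
int p = target;
while (p <= maxParallelism / 2 && maxParallelism % p != 0) {
    p++;
}
// 48..59 do not divide 360; the first divisor found is 60 == currentParallelism,
// so the vertex never scales down.
System.out.println(p); // 60
{code}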

> Autoscaler key group optimization can interfere with scale-down.max-factor
> --
>
> Key: FLINK-35285
> URL: https://issues.apache.org/jira/browse/FLINK-35285
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Trystan
>Priority: Minor
>
> When setting a less aggressive scale down limit, the key group optimization 
> can prevent a vertex from scaling down at all. It will hunt from target 
> upwards to maxParallelism/2, and will always find currentParallelism again.
>  
> A simple test trying to scale down from a parallelism of 60 with a 
> scale-down.max-factor of 0.2:
> {code:java}
> assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, 
> 360)); {code}
>  
> It seems reasonable to make a good attempt to spread data across subtasks, 
> but not at the expense of total deadlock. The problem is that during scale 
> down it doesn't actually ensure that newParallelism will be < 
> currentParallelism. The only workaround is to set a scale down factor large 
> enough such that it finds the next lowest divisor of the maxParallelism.
>  
> Clunky, but something like this would ensure it can make at least some 
> progress; another test now fails with it, but it illustrates the point:
> {code:java}
> for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) 
> {
> if ((scaleFactor < 1 && p < currentParallelism) || (scaleFactor > 1 && p 
> > currentParallelism)) {
> if (maxParallelism % p == 0) {
> return p;
> }
> }
> } {code}
>  
> Perhaps this is by design and not a bug, but total failure to scale down in 
> order to keep optimized key groups does not seem ideal.
>  
> Key group optimization block:
> [https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296C1-L303C10]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-18476) PythonEnvUtilsTest#testStartPythonProcess fails

2024-05-03 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843261#comment-17843261
 ] 

Ryan Skraba commented on FLINK-18476:
-

* 1.20 Java 21 / Test (module: misc) 
https://github.com/apache/flink/actions/runs/221960/job/24404965886#step:10:22919

> PythonEnvUtilsTest#testStartPythonProcess fails
> ---
>
> Key: FLINK-18476
> URL: https://issues.apache.org/jira/browse/FLINK-18476
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.11.0, 1.15.3, 1.18.0, 1.19.0, 1.20.0
>Reporter: Dawid Wysakowicz
>Priority: Major
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> test-stability
>
> The 
> {{org.apache.flink.client.python.PythonEnvUtilsTest#testStartPythonProcess}} 
> failed in my local environment as it assumes the environment has 
> {{/usr/bin/python}}. 
> I don't know exactly how I got Python on Ubuntu 20.04, but I only have an 
> alias for {{python = python3}}. Therefore the test fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34227) Job doesn't disconnect from ResourceManager

2024-05-03 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843260#comment-17843260
 ] 

Ryan Skraba commented on FLINK-34227:
-

* 1.18 AdaptiveScheduler / Test (module: table) 
https://github.com/apache/flink/actions/runs/8904361381/job/24453748069#step:10:14980
* 1.18 AdaptiveScheduler / Test (module: table) 
https://github.com/apache/flink/actions/runs/8809948818/job/24181785187#step:10:17166


> Job doesn't disconnect from ResourceManager
> ---
>
> Key: FLINK-34227
> URL: https://issues.apache.org/jira/browse/FLINK-34227
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: github-actions, pull-request-available, test-stability
> Attachments: FLINK-34227.7e7d69daebb438b8d03b7392c9c55115.log, 
> FLINK-34227.log
>
>
> https://github.com/XComp/flink/actions/runs/7634987973/job/20800205972#step:10:14557
> {code}
> [...]
> "main" #1 prio=5 os_prio=0 tid=0x7f4b7000 nid=0x24ec0 waiting on 
> condition [0x7fccce1eb000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xbdd52618> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2131)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2099)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2077)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:876)
>   at 
> org.apache.flink.table.planner.runtime.stream.sql.WindowDistinctAggregateITCase.testHopWindow_Cube(WindowDistinctAggregateITCase.scala:550)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35246) SqlClientSSLTest.testGatewayMode failed in AZP

2024-05-03 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843257#comment-17843257
 ] 

Ryan Skraba commented on FLINK-35246:
-

* 1.20 Java 17 / Test (module: table) 
https://github.com/apache/flink/actions/runs/8842083488/job/24280428940#step:10:12462
* 1.20 Java 21 / Test (module: table) 
https://github.com/apache/flink/actions/runs/8842083488/job/24280416340#step:10:12463

> SqlClientSSLTest.testGatewayMode failed in AZP
> --
>
> Key: FLINK-35246
> URL: https://issues.apache.org/jira/browse/FLINK-35246
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> {code:java}
> Apr 26 01:51:10 java.lang.IllegalArgumentException: The given host:port 
> ('localhost/:36112') doesn't contain a valid port
> Apr 26 01:51:10   at 
> org.apache.flink.util.NetUtils.validateHostPortString(NetUtils.java:120)
> Apr 26 01:51:10   at 
> org.apache.flink.util.NetUtils.getCorrectHostnamePort(NetUtils.java:81)
> Apr 26 01:51:10   at 
> org.apache.flink.table.client.cli.CliOptionsParser.parseGatewayAddress(CliOptionsParser.java:325)
> Apr 26 01:51:10   at 
> org.apache.flink.table.client.cli.CliOptionsParser.parseGatewayModeClient(CliOptionsParser.java:296)
> Apr 26 01:51:10   at 
> org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:207)
> Apr 26 01:51:10   at 
> org.apache.flink.table.client.SqlClientTestBase.runSqlClient(SqlClientTestBase.java:111)
> Apr 26 01:51:10   at 
> org.apache.flink.table.client.SqlClientSSLTest.testGatewayMode(SqlClientSSLTest.java:74)
> Apr 26 01:51:10   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:580)
> Apr 26 01:51:10   at 
> java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
> Apr 26 01:51:10   at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
> Apr 26 01:51:10   at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
> Apr 26 01:51:10   at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
> Apr 26 01:51:10   at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
> Apr 26 01:51:10   at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59173=logs=26b84117-e436-5720-913e-3e280ce55cae=77cc7e77-39a0-5007-6d65-4137ac13a471=12418
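
A minimal repro of that parse failure, using only the NetUtils methods named in 
the stack trace (a sketch for reference):
{code:java}
import org.apache.flink.util.NetUtils;

public class GatewayAddressRepro {
    public static void main(String[] args) {
        // The stray '/' before the ':' leaves no valid port segment, so this
        // throws IllegalArgumentException: "... doesn't contain a valid port"
        NetUtils.getCorrectHostnamePort("localhost/:36112");
    }
}
{code}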



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35095) ExecutionEnvironmentImplTest.testFromSource failure on GitHub CI

2024-05-03 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843263#comment-17843263
 ] 

Ryan Skraba commented on FLINK-35095:
-

* 1.20 Java 21 / Test (module: misc) 
https://github.com/apache/flink/commit/80af4d502318348ba15a8f75a2a622ce9dbdc968/checks/24453751708/logs
* 1.20 Hadoop 3.1.3 / Test (module: misc) 
https://github.com/apache/flink/actions/runs/8809949034/job/24182253915#step:10:22352

> ExecutionEnvironmentImplTest.testFromSource failure on GitHub CI
> 
>
> Key: FLINK-35095
> URL: https://issues.apache.org/jira/browse/FLINK-35095
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Ryan Skraba
>Priority: Critical
>  Labels: test-stability
>
> 1.20 Java 17: Test (module: misc) 
> https://github.com/apache/flink/actions/runs/8655935935/job/23735920630#step:10:3
> {code}
> Error: 02:29:05 02:29:05.708 [ERROR] Tests run: 5, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 0.360 s <<< FAILURE! -- in 
> org.apache.flink.datastream.impl.ExecutionEnvironmentImplTest
> Error: 02:29:05 02:29:05.708 [ERROR] 
> org.apache.flink.datastream.impl.ExecutionEnvironmentImplTest.testFromSource 
> -- Time elapsed: 0.131 s <<< FAILURE!
> Apr 12 02:29:05 java.lang.AssertionError: 
> Apr 12 02:29:05 
> Apr 12 02:29:05 Expecting actual:
> Apr 12 02:29:05   [47]
> Apr 12 02:29:05 to contain exactly (and in same order):
> Apr 12 02:29:05   [48]
> Apr 12 02:29:05 but some elements were not found:
> Apr 12 02:29:05   [48]
> Apr 12 02:29:05 and others were not expected:
> Apr 12 02:29:05   [47]
> Apr 12 02:29:05 
> Apr 12 02:29:05   at 
> org.apache.flink.datastream.impl.ExecutionEnvironmentImplTest.testFromSource(ExecutionEnvironmentImplTest.java:97)
> Apr 12 02:29:05   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Apr 12 02:29:05   at 
> java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
> Apr 12 02:29:05   at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> Apr 12 02:29:05   at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> Apr 12 02:29:05   at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> Apr 12 02:29:05   at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> Apr 12 02:29:05   at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> Apr 12 02:29:05 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35002) GitHub action request timeout to ArtifactService

2024-05-03 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843262#comment-17843262
 ] 

Ryan Skraba commented on FLINK-35002:
-

* 1.19 AdaptiveScheduler / Compile 
https://github.com/apache/flink/commit/ac4aa35c6e2e2da87760ffbf45d85888b1976c2f/checks/24453516397/logs
* 1.20 Java 8 / Compile 
https://github.com/apache/flink/commit/e412402ca4dfc438e28fb990dc53ea7809430aee/checks/24356511040/logs
* 1.19 Java 8 / Test (module: table) 
https://github.com/apache/flink/commit/e7816f714ef5298e1ca978aeddf62732794bb93f/checks/24231189927/logs
* 1.20 Default (Java 8) / Test (module: core) 
https://github.com/apache/flink/actions/runs/8810747051/job/24183773837#step:14:31

> GitHub action request timeout to ArtifactService
> -
>
> Key: FLINK-35002
> URL: https://issues.apache.org/jira/browse/FLINK-35002
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Ryan Skraba
>Priority: Major
>  Labels: github-actions, test-stability
>
> A timeout can occur when uploading a successfully built artifact:
>  * [https://github.com/apache/flink/actions/runs/8516411871/job/23325392650]
> {code:java}
> 2024-04-02T02:20:15.6355368Z With the provided path, there will be 1 file 
> uploaded
> 2024-04-02T02:20:15.6360133Z Artifact name is valid!
> 2024-04-02T02:20:15.6362872Z Root directory input is valid!
> 2024-04-02T02:20:20.6975036Z Attempt 1 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 3000 ms...
> 2024-04-02T02:20:28.7084937Z Attempt 2 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 4785 ms...
> 2024-04-02T02:20:38.5015936Z Attempt 3 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 7375 ms...
> 2024-04-02T02:20:50.8901508Z Attempt 4 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 14988 ms...
> 2024-04-02T02:21:10.9028438Z ##[error]Failed to CreateArtifact: Failed to 
> make request after 5 attempts: Request timeout: 
> /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact
> 2024-04-02T02:22:59.9893296Z Post job cleanup.
> 2024-04-02T02:22:59.9958844Z Post job cleanup. {code}
> (This is unlikely to be something we can fix, but we can track it.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30644) ChangelogCompatibilityITCase.testRestore fails due to CheckpointCoordinator being shutdown

2024-05-03 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843259#comment-17843259
 ] 

Ryan Skraba commented on FLINK-30644:
-

* 1.20 Java 11 / Test (module: tests) 
https://github.com/apache/flink/actions/runs/8856547891/job/24323134209#step:10:7762

> ChangelogCompatibilityITCase.testRestore fails due to CheckpointCoordinator 
> being shutdown
> --
>
> Key: FLINK-30644
> URL: https://issues.apache.org/jira/browse/FLINK-30644
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Runtime / State Backends
>Affects Versions: 1.17.0, 1.19.1
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: auto-deprioritized-critical, test-stability
>
> We observe a build failure in {{ChangelogCompatibilityITCase.testRestore}} 
> due to the {{CheckpointCoordinator}} being shut down:
> {code:java}
> [...]
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: 
> CheckpointCoordinator shutdown.
> Jan 12 02:37:37   at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:544)
> Jan 12 02:37:37   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2140)
> Jan 12 02:37:37   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2127)
> Jan 12 02:37:37   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoints(CheckpointCoordinator.java:2004)
> Jan 12 02:37:37   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoints(CheckpointCoordinator.java:1987)
> Jan 12 02:37:37   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingAndQueuedCheckpoints(CheckpointCoordinator.java:2183)
> Jan 12 02:37:37   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.shutdown(CheckpointCoordinator.java:426)
> Jan 12 02:37:37   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.onTerminalState(DefaultExecutionGraph.java:1329)
> [...]{code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44731=logs=2c3cbe13-dee0-5837-cf47-3053da9a8a78=b78d9d30-509a-5cea-1fef-db7abaa325ae=9255



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2024-05-03 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843256#comment-17843256
 ] 

Ryan Skraba commented on FLINK-28440:
-

* 1.20 Default (Java 8) / Test (module: tests) 
https://github.com/apache/flink/actions/runs/8901164251/job/2807095#step:10:7971
* 1.20 Default (Java 8) / Test (module: tests) 
https://github.com/apache/flink/actions/runs/8887882381/job/24404087819#step:10:8262

> EventTimeWindowCheckpointingITCase failed with restore
> --
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0, 1.18.0, 1.19.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: auto-deprioritized-critical, pull-request-available, 
> stale-assigned, test-stability
> Fix For: 1.20.0
>
> Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, 
> image-2023-02-01-16-47-23-756.png, image-2023-02-01-16-57-43-889.png, 
> image-2023-02-02-10-52-56-599.png, image-2023-02-03-10-09-07-586.png, 
> image-2023-02-03-12-03-16-155.png, image-2023-02-03-12-03-56-614.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>   ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>   at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>   at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>   at 
> 

[jira] [Commented] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed

2024-05-03 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843258#comment-17843258
 ] 

Ryan Skraba commented on FLINK-35041:
-

Going through some of the older GitHub Actions runs from the last week, there 
are a lot of these:
 
* 1.20 Java 11 / Test (module: core) 
https://github.com/apache/flink/actions/runs/8917610620/job/24491172511#step:10:8154
* 1.20 Java 21 / Test (module: core) 
https://github.com/apache/flink/actions/runs/8917610620/job/24491154789#step:10:8873
* 1.20 Java 11 / Test (module: core) 
https://github.com/apache/flink/actions/runs/221960/job/24404966761#step:10:7787
* 1.20 AdaptiveScheduler / Test (module: core) 
https://github.com/apache/flink/actions/runs/221960/job/24404939797#step:10:8361
* 1.20 Default (Java 8) / Test (module: core) 
https://github.com/apache/flink/actions/runs/8874021289/job/24361049250#step:10:8308
* 1.20 Java 17 / Test (module: core) 
https://github.com/apache/flink/actions/runs/8872328953/job/24356752585#step:10:8911
* 1.20 Java 11 / Test (module: core) 
https://github.com/apache/flink/actions/runs/8864296312/job/24339779126#step:10:9083
* 1.20 Java 21 / Test (module: core) 
https://github.com/apache/flink/actions/runs/8856547891/job/24323115199#step:10:8933
* 1.20 Java 11 / Test (module: core) 
https://github.com/apache/flink/actions/runs/8842083488/job/24280420760#step:10:8265
* 1.20 Java 17 / Test (module: core) 
https://github.com/apache/flink/actions/runs/8825970497/job/24231219571#step:10:9087
* 1.20 Default (Java 8) / Test (module: core) 
https://github.com/apache/flink/actions/runs/8825652254/job/24230389260#step:10:9141
* 1.20 Java 21 / Test (module: core) 
https://github.com/apache/flink/actions/runs/8809949034/job/24182328046#step:10:8078
* 1.20 Default (Java 8) / Test (module: core) 
https://github.com/apache/flink/actions/runs/8800044378/job/24153034222#step:10:8261
* 1.20 Java 17 / Test (module: core) 
https://github.com/apache/flink/actions/runs/8793750647/job/24132431375#step:10:7754
* 1.20 Default (Java 8) / Test (module: core) 
https://github.com/apache/flink/actions/runs/8784906766/job/24104618074#step:10:8444


> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --
>
> Key: FLINK-35041
> URL: https://issues.apache.org/jira/browse/FLINK-35041
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Feifan Wang
>Priority: Blocker
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
>  -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
> Apr 08 03:22:45 
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45  but was: true
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
> Apr 08 03:22:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 08 03:22:45   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9238]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34645) StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByCount fails

2024-05-03 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843255#comment-17843255
 ] 

Ryan Skraba commented on FLINK-34645:
-

* 1.18 Java 11 / Test (module: misc) 
https://github.com/apache/flink/actions/runs/8825970611/job/24231267277#step:10:21751

> StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByCount
>  fails
> 
>
> Key: FLINK-34645
> URL: https://issues.apache.org/jira/browse/FLINK-34645
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.1
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: github-actions, test-stability
>
> {code}
> Error: 02:27:17 02:27:17.025 [ERROR] Tests run: 3, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 0.658 s <<< FAILURE! - in 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream.StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
> Error: 02:27:17 02:27:17.025 [ERROR] 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream.StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByCount
>   Time elapsed: 0.3 s  <<< FAILURE!
> Mar 09 02:27:17 java.lang.AssertionError: 
> Mar 09 02:27:17 
> Mar 09 02:27:17 Expected size: 8 but was: 6 in:
> Mar 09 02:27:17 [Record @ (undef) : 
> +I(c1,0,1969-12-31T23:59:55,1970-01-01T00:00:05),
> Mar 09 02:27:17 Record @ (undef) : 
> +I(c2,3,1969-12-31T23:59:55,1970-01-01T00:00:05),
> Mar 09 02:27:17 Record @ (undef) : 
> +I(c2,3,1970-01-01T00:00,1970-01-01T00:00:10),
> Mar 09 02:27:17 Record @ (undef) : 
> +I(c1,0,1970-01-01T00:00,1970-01-01T00:00:10),
> Mar 09 02:27:17 Watermark @ 1,
> Mar 09 02:27:17 Watermark @ 2]
> Mar 09 02:27:17   at 
> org.apache.flink.table.runtime.util.RowDataHarnessAssertor.assertOutputEquals(RowDataHarnessAssertor.java:110)
> Mar 09 02:27:17   at 
> org.apache.flink.table.runtime.util.RowDataHarnessAssertor.assertOutputEquals(RowDataHarnessAssertor.java:70)
> Mar 09 02:27:17   at 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.ArrowPythonAggregateFunctionOperatorTestBase.assertOutputEquals(ArrowPythonAggregateFunctionOperatorTestBase.java:62)
> Mar 09 02:27:17   at 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream.StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByCount(StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java:326)
> Mar 09 02:27:17   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor

2024-05-03 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843254#comment-17843254
 ] 

Gyula Fora commented on FLINK-35285:


I think you make a good point here, but we have to be a bit careful in terms of 
how much key group skew we allow while scaling up or down.

When we are scaling down to a parallelism which doesn't result in an even key 
group distribution, our computed expected throughput will be off (because it is 
based on the assumption that throughput depends linearly on the parallelism, 
which in turn assumes an even key group distribution -> no data skew introduced 
by the scaling itself).

However, this is something we can actually calculate by looking at how uneven 
the key group distribution is. As long as the introduced skew is within the 
flexible target rate boundaries, we should be able to scale down (without 
expecting a "rebound"). 
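
A back-of-the-envelope sketch of that calculation (my own arithmetic, under the 
assumption that key groups are spread as evenly as possible, so each subtask 
gets either floor or ceil of maxParallelism/parallelism groups):
{code:java}
// Relative load of the busiest subtask vs. the even (linear) share.
static double keyGroupSkew(int maxParallelism, int parallelism) {
    double evenShare = (double) maxParallelism / parallelism;
    return Math.ceil(evenShare) / evenShare;
}

// keyGroupSkew(360, 60) == 1.0             -> even, no skew from scaling
// keyGroupSkew(360, 48) == 8 / 7.5 ~ 1.067 -> busiest subtasks carry ~6.7% extra,
// acceptable if it stays within the flexible target rate boundaries
{code}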

> Autoscaler key group optimization can interfere with scale-down.max-factor
> --
>
> Key: FLINK-35285
> URL: https://issues.apache.org/jira/browse/FLINK-35285
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Trystan
>Priority: Minor
>
> When setting a less aggressive scale down limit, the key group optimization 
> can prevent a vertex from scaling down at all. It will hunt from target 
> upwards to maxParallelism/2, and will always find currentParallelism again.
>  
> A simple test trying to scale down from a parallelism of 60 with a 
> scale-down.max-factor of 0.2:
> {code:java}
> assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, 
> 360)); {code}
>  
> It seems reasonable to make a good attempt to spread data across subtasks, 
> but not at the expense of total deadlock. The problem is that during scale 
> down it doesn't actually ensure that newParallelism will be < 
> currentParallelism. The only workaround is to set a scale down factor large 
> enough such that it finds the next lowest divisor of the maxParallelism.
>  
> Clunky, but something like this would ensure it can make at least some 
> progress; another test now fails with it, but it illustrates the point:
> {code:java}
> for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) 
> {
> if ((scaleFactor < 1 && p < currentParallelism) || (scaleFactor > 1 && p 
> > currentParallelism)) {
> if (maxParallelism % p == 0) {
> return p;
> }
> }
> } {code}
>  
> Perhaps this is by design and not a bug, but total failure to scale down in 
> order to keep optimized key groups does not seem ideal.
>  
> Key group optimization block:
> [https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296C1-L303C10]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33306] Use observed source throughput as true processing rate [flink-kubernetes-operator]

2024-05-03 Thread via GitHub


trystanj commented on code in PR #686:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r1589232955


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##########
@@ -154,6 +153,56 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
         return evaluatedMetrics;
     }
 
+    private static EvaluatedScalingMetric evaluateTpr(
+            SortedMap<Instant, CollectedMetrics> metricsHistory,
+            JobVertexID vertex,
+            Map<ScalingMetric, Double> latestVertexMetrics,
+            Configuration conf) {
+
+        var busyTimeTprAvg = getAverage(TRUE_PROCESSING_RATE, vertex, metricsHistory);
+        var observedTprAvg =
+                getAverage(
+                        OBSERVED_TPR,
+                        vertex,
+                        metricsHistory,
+                        conf.get(AutoScalerOptions.OBSERVED_TPR_MIN_OBSERVATIONS));
+
+        var tprMetric = selectTprMetric(vertex, conf, busyTimeTprAvg, observedTprAvg);
+        return new EvaluatedScalingMetric(
+                latestVertexMetrics.getOrDefault(tprMetric, Double.NaN),
+                tprMetric == OBSERVED_TPR ? observedTprAvg : busyTimeTprAvg);
+    }
+
+    private static ScalingMetric selectTprMetric(
+            JobVertexID jobVertexID,
+            Configuration conf,
+            double busyTimeTprAvg,
+            double observedTprAvg) {
+
+        if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) {
+            return OBSERVED_TPR;
+        }
+
+        if (Double.isNaN(observedTprAvg)) {
+            return TRUE_PROCESSING_RATE;
+        }
+
+        double switchThreshold = conf.get(AutoScalerOptions.OBSERVED_TPR_SWITCH_THRESHOLD);
+        // If we could measure the observed tpr we decide whether to switch to using it
+        // instead of busy time based on the error / difference between the two
+        if (busyTimeTprAvg > observedTprAvg * (1 + switchThreshold)) {

Review Comment:
   Great idea, thanks for the tip! I saw that config setting but did not 
understand what it was actually used for. I will experiment with increasing 
that setting - our jobs are all over the place in how quickly they recover 
(some are a few seconds, some are several minutes) but I would bias towards 
accuracy in order to avoid the flapping in our case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20628] Port RabbitMQ Connector using unified Source & Sink API [flink-connector-rabbitmq]

2024-05-03 Thread via GitHub


MartijnVisser commented on PR #16:
URL: 
https://github.com/apache/flink-connector-rabbitmq/pull/16#issuecomment-2092982954

   @RocMarshal Without actually seeing a passing CI, it doesn't make much sense 
to review this one... 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33306] Use observed source throughput as true processing rate [flink-kubernetes-operator]

2024-05-03 Thread via GitHub


gyfora commented on code in PR #686:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r1589168145


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##########
@@ -154,6 +153,56 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
         return evaluatedMetrics;
     }
 
+    private static EvaluatedScalingMetric evaluateTpr(
+            SortedMap<Instant, CollectedMetrics> metricsHistory,
+            JobVertexID vertex,
+            Map<ScalingMetric, Double> latestVertexMetrics,
+            Configuration conf) {
+
+        var busyTimeTprAvg = getAverage(TRUE_PROCESSING_RATE, vertex, metricsHistory);
+        var observedTprAvg =
+                getAverage(
+                        OBSERVED_TPR,
+                        vertex,
+                        metricsHistory,
+                        conf.get(AutoScalerOptions.OBSERVED_TPR_MIN_OBSERVATIONS));
+
+        var tprMetric = selectTprMetric(vertex, conf, busyTimeTprAvg, observedTprAvg);
+        return new EvaluatedScalingMetric(
+                latestVertexMetrics.getOrDefault(tprMetric, Double.NaN),
+                tprMetric == OBSERVED_TPR ? observedTprAvg : busyTimeTprAvg);
+    }
+
+    private static ScalingMetric selectTprMetric(
+            JobVertexID jobVertexID,
+            Configuration conf,
+            double busyTimeTprAvg,
+            double observedTprAvg) {
+
+        if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) {
+            return OBSERVED_TPR;
+        }
+
+        if (Double.isNaN(observedTprAvg)) {
+            return TRUE_PROCESSING_RATE;
+        }
+
+        double switchThreshold = conf.get(AutoScalerOptions.OBSERVED_TPR_SWITCH_THRESHOLD);
+        // If we could measure the observed tpr we decide whether to switch to using it
+        // instead of busy time based on the error / difference between the two
+        if (busyTimeTprAvg > observedTprAvg * (1 + switchThreshold)) {

Review Comment:
   If your job catches up super quickly, you may see anomalies in these 
measurements, because you will likely have only 1-2 datapoints at most, and 
those can be arbitrarily off.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33306] Use observed source throughput as true processing rate [flink-kubernetes-operator]

2024-05-03 Thread via GitHub


gyfora commented on code in PR #686:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r1589167127


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##########
@@ -154,6 +153,56 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
         return evaluatedMetrics;
     }
 
+    private static EvaluatedScalingMetric evaluateTpr(
+            SortedMap<Instant, CollectedMetrics> metricsHistory,
+            JobVertexID vertex,
+            Map<ScalingMetric, Double> latestVertexMetrics,
+            Configuration conf) {
+
+        var busyTimeTprAvg = getAverage(TRUE_PROCESSING_RATE, vertex, metricsHistory);
+        var observedTprAvg =
+                getAverage(
+                        OBSERVED_TPR,
+                        vertex,
+                        metricsHistory,
+                        conf.get(AutoScalerOptions.OBSERVED_TPR_MIN_OBSERVATIONS));
+
+        var tprMetric = selectTprMetric(vertex, conf, busyTimeTprAvg, observedTprAvg);
+        return new EvaluatedScalingMetric(
+                latestVertexMetrics.getOrDefault(tprMetric, Double.NaN),
+                tprMetric == OBSERVED_TPR ? observedTprAvg : busyTimeTprAvg);
+    }
+
+    private static ScalingMetric selectTprMetric(
+            JobVertexID jobVertexID,
+            Configuration conf,
+            double busyTimeTprAvg,
+            double observedTprAvg) {
+
+        if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) {
+            return OBSERVED_TPR;
+        }
+
+        if (Double.isNaN(observedTprAvg)) {
+            return TRUE_PROCESSING_RATE;
+        }
+
+        double switchThreshold = conf.get(AutoScalerOptions.OBSERVED_TPR_SWITCH_THRESHOLD);
+        // If we could measure the observed tpr we decide whether to switch to using it
+        // instead of busy time based on the error / difference between the two
+        if (busyTimeTprAvg > observedTprAvg * (1 + switchThreshold)) {

Review Comment:
   Observed TPR is a bit of an outlier metric for a "good" reason. It is 
actually computed regardless of the stabilisation time, as you can see 
[here](https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java#L104).
 There is a config threshold for the observed TPR computation which sets the 
minimum source lag at which it can be computed (30 seconds by default).
   
   The rationale is that you can only really observe the true processing rate 
when the job is catching up (going full speed). So that's the reason here; 
maybe you can set that threshold a bit higher (1m), which would mean that we 
measure only when there is at least 1 minute of lag. A higher threshold means a 
more accurate measurement when it's triggered, but fewer datapoints overall.
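   
   For concreteness, a hedged sketch of that change (the option key is from my 
recollection of AutoScalerOptions and should be verified):
   
   ```java
   // Only measure observed TPR when the source lag is at least one minute.
   conf.setString("job.autoscaler.observed-true-processing-rate.lag-threshold", "1m");
   ```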



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33306] Use observed source throughput as true processing rate [flink-kubernetes-operator]

2024-05-03 Thread via GitHub


trystanj commented on code in PR #686:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r1589140335


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##########
@@ -154,6 +153,56 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
         return evaluatedMetrics;
     }
 
+    private static EvaluatedScalingMetric evaluateTpr(
+            SortedMap<Instant, CollectedMetrics> metricsHistory,
+            JobVertexID vertex,
+            Map<ScalingMetric, Double> latestVertexMetrics,
+            Configuration conf) {
+
+        var busyTimeTprAvg = getAverage(TRUE_PROCESSING_RATE, vertex, metricsHistory);
+        var observedTprAvg =
+                getAverage(
+                        OBSERVED_TPR,
+                        vertex,
+                        metricsHistory,
+                        conf.get(AutoScalerOptions.OBSERVED_TPR_MIN_OBSERVATIONS));
+
+        var tprMetric = selectTprMetric(vertex, conf, busyTimeTprAvg, observedTprAvg);
+        return new EvaluatedScalingMetric(
+                latestVertexMetrics.getOrDefault(tprMetric, Double.NaN),
+                tprMetric == OBSERVED_TPR ? observedTprAvg : busyTimeTprAvg);
+    }
+
+    private static ScalingMetric selectTprMetric(
+            JobVertexID jobVertexID,
+            Configuration conf,
+            double busyTimeTprAvg,
+            double observedTprAvg) {
+
+        if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) {
+            return OBSERVED_TPR;
+        }
+
+        if (Double.isNaN(observedTprAvg)) {
+            return TRUE_PROCESSING_RATE;
+        }
+
+        double switchThreshold = conf.get(AutoScalerOptions.OBSERVED_TPR_SWITCH_THRESHOLD);
+        // If we could measure the observed tpr we decide whether to switch to using it
+        // instead of busy time based on the error / difference between the two
+        if (busyTimeTprAvg > observedTprAvg * (1 + switchThreshold)) {

Review Comment:
   It's possible I am misunderstanding something!
   
   During the same time that log was printed during a downscale, I have metrics 
showing that the actual `flink_taskmanager_job_task_operator_numRecordsIn` (per 
second) was above 1.2M/sec for 15 minutes before and after, and peaked at about 
2.8M for around 3 minutes. The peak was due to the rescale.
   
   This caused the autoscaler to severely underestimate the capacity, scale way 
back up (higher than it was before the downscale), and then eventually settle 
back down to where it had tried to go initially. At that point it was no longer 
seeing the discrepancy between the observed and busy-time-based rates.
   
   I am guessing (but am probably wrong) that it has more to do with _when_ the 
"observed" TPR is measured than with the measurement itself being wrong? Like 
it perhaps misses the window where the catch-up is happening. Our restart time 
is 30s, stabilization time 5m, window 15m. The whole recovery lasted maybe 3m. 
If those times are not aligned with reality (it likely restarted much quicker 
than 30s), could that cause it to miss the peak observed TPR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-20628) Port RabbitMQ Sources to FLIP-27 API

2024-05-03 Thread RocMarshal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843202#comment-17843202
 ] 

RocMarshal commented on FLINK-20628:


Could someone help review this one? Thanks a lot~

> Port RabbitMQ Sources to FLIP-27 API
> 
>
> Key: FLINK-20628
> URL: https://issues.apache.org/jira/browse/FLINK-20628
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / RabbitMQ
>Reporter: Jan Westphal
>Assignee: RocMarshal
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available, 
> stale-assigned
>
> *Structure*
> The new RabbitMQ Source will have three components:
>  * RabbitMQ enumerator that receives one RabbitMQ Channel Config.
>  * RabbitMQ splits contain the RabbitMQ Channel Config
>  * RabbitMQ Readers which subscribe to the same RabbitMQ channel and receive 
> the messages (automatically load balanced by RabbitMQ).
> *Checkpointing Enumerators*
> The enumerator only needs to checkpoint the RabbitMQ channel config since the 
> continuous discovery of new unread/unhandled messages is taken care of by the 
> subscribed RabbitMQ readers and RabbitMQ itself.
> *Checkpointing Readers*
> The new RabbitMQ Source needs to ensure that every reader can be checkpointed.
> Since RabbitMQ is non-persistent and cannot be read by offset, a combined 
> usage of checkpoints and message acknowledgments is necessary. Until a 
> received message is checkpointed by a reader, it will stay in an 
> un-acknowledge state. As soon as the checkpoint is created, the messages from 
> the last checkpoint can be acknowledged as handled against RabbitMQ and thus 
> will be deleted only then. Messages need to be acknowledged one by one as 
> messages are handled by each SourceReader individually.
> When deserializing the messages we will make use of the implementation in the 
> existing RabbitMQ Source.
> *Message Delivery Guarantees* 
> Unacknowledged messages of a reader will be redelivered by RabbitMQ 
> automatically to other consumers of the same channel if the reader goes down.
>  
> This Source is going to support only at-least-once, as this is the default 
> RabbitMQ behavior; everything else would require changes to RabbitMQ itself 
> or would impair the idea of parallelizing SourceReaders.
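
A rough, self-contained sketch of the ack-on-checkpoint scheme described above 
(all types and method names below are simplified stand-ins, not the actual 
FLIP-27 interfaces or the connector's code):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-in for a reader that acks messages only once they are checkpointed. */
class AckOnCheckpointReaderSketch {

    /** Delivery tags received since the last checkpoint; not yet safe to ack. */
    private final Deque<Long> unacknowledged = new ArrayDeque<>();

    /** Tags frozen per checkpoint id, acked when that checkpoint completes. */
    private final Map<Long, List<Long>> pendingByCheckpoint = new HashMap<>();

    void onMessage(long deliveryTag) {
        unacknowledged.add(deliveryTag); // record the message, but do NOT ack yet
    }

    /** Checkpoint barrier arrives: freeze the current batch under its id. */
    void snapshotState(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, new ArrayList<>(unacknowledged));
        unacknowledged.clear();
    }

    /** Checkpoint is durable: only now may RabbitMQ delete the messages. */
    void notifyCheckpointComplete(long checkpointId, MessageAcker acker) {
        List<Long> tags = pendingByCheckpoint.remove(checkpointId);
        if (tags != null) {
            tags.forEach(acker::ack); // ack one by one, as the description requires
        }
    }

    interface MessageAcker {
        void ack(long deliveryTag);
    }
}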



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35280) Migrate HBase Sink connector to use the ASync Sink API

2024-05-03 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser reassigned FLINK-35280:
--

Assignee: Ferenc Csaky

> Migrate HBase Sink connector to use the ASync Sink API
> --
>
> Key: FLINK-35280
> URL: https://issues.apache.org/jira/browse/FLINK-35280
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / HBase
>Affects Versions: hbase-3.0.0, hbase-3.0.1, hbase-4.0.0
>Reporter: Martijn Visser
>Assignee: Ferenc Csaky
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33183) Enable metadata columns in NduAnalyzer with retract if non-virtual

2024-05-03 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843175#comment-17843175
 ] 

Martijn Visser commented on FLINK-33183:


[~lincoln.86xy] Friendly ping, what do you think of this ticket?

> Enable metadata columns in NduAnalyzer with retract if non-virtual
> --
>
> Key: FLINK-33183
> URL: https://issues.apache.org/jira/browse/FLINK-33183
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Priority: Major
>
> Currently, the NduAnalyzer is very strict about metadata columns in updating 
> sources. Compared to append and upsert sources (see also FLINK-33182), 
> retract sources are tricky. And the analyzer is actually correct.
> However, for retract sources we should expose more functionality to the user 
> and add a warning to the documentation that retract mode could potentially 
> cause NDU problems if not enough attention is paid. We should only throw an 
> error on virtual metadata columns. Persisted metadata columns can be 
> considered "safe". When a metadata column is persisted, we can assume that an 
> upstream Flink job fills its content and thus likely also fills its correct 
> retraction.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-21373) Port RabbitMQ Sink to FLIP-143 API

2024-05-03 Thread Ahmed Hamdy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843173#comment-17843173
 ] 

Ahmed Hamdy edited comment on FLINK-21373 at 5/3/24 9:50 AM:
-

[~martijnvisser] Thanks for assigning it to me. I have already opened a 
[PR|https://github.com/apache/flink-connector-rabbitmq/pull/29]; your review is 
appreciated.


was (Author: JIRAUSER280246):
[~martijnvisser] Thanks for assigning it to me, I have already opened a 
[PR|[GitHub Pull Request 
#29|https://github.com/apache/flink-connector-rabbitmq/pull/29]], your review 
is appereciated

> Port RabbitMQ Sink to FLIP-143 API
> --
>
> Key: FLINK-21373
> URL: https://issues.apache.org/jira/browse/FLINK-21373
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Reporter: Jan Westphal
>Assignee: Ahmed Hamdy
>Priority: Minor
>  Labels: auto-unassigned, pull-request-available
>
> *Structure*
> The unified Sink API provides a Writer, a Committer and a GlobalCommitter. 
> Right now we don't see the need to use the Committer and GlobalCommitter, as 
> the Writer is sufficient to uphold the consistency guarantees. Since we need 
> asynchronous RabbitMQ callbacks to know whether a message was published 
> successfully, and have to store unacknowledged messages in the checkpoint, 
> there would be a large bidirectional communication and state-exchange 
> overhead between the Writer and the Committer.
> *At-most-once*
> The Writer receives a message from Flink and simply publishes it to RabbitMQ. 
> The current RabbitMQ Sink only provides this mode.
> *At-least-once*
> The objective here is to receive an acknowledgement from RabbitMQ for 
> published messages. Therefore, before publishing a message, we store the 
> message in a Map with the sequence number as its key. If the message is 
> acknowledged by RabbitMQ we can remove it from the Map. If we don't receive 
> an acknowledgement within a certain amount of time (or receive a 
> RabbitMQ-specific so-called negative acknowledgement), we will try to resend 
> the message when taking a checkpoint.
> *Exactly-once*
> On checkpointing we send all messages received from Flink in transaction 
> mode to RabbitMQ. This way, all the messages get sent or are rolled back on 
> failure. All messages that are not sent successfully are written to the 
> checkpoint and will be retried with the next checkpoint.
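
A rough sketch of the at-least-once bookkeeping described above, using RabbitMQ 
publisher confirms (the Flink sink plumbing is omitted and the queue name is 
made up; this is not the connector's actual implementation):

import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Sketch: track outstanding publishes by sequence number, forget them on ack. */
class AtLeastOnceWriterSketch {

    private final Channel channel;
    private final ConcurrentNavigableMap<Long, byte[]> outstanding =
            new ConcurrentSkipListMap<>();

    AtLeastOnceWriterSketch(Channel channel) throws IOException {
        this.channel = channel;
        channel.confirmSelect(); // enable publisher confirms on this channel
        channel.addConfirmListener(
                (seqNo, multiple) -> {
                    // ack: the broker accepted the message(s); safe to forget them
                    if (multiple) {
                        outstanding.headMap(seqNo, true).clear();
                    } else {
                        outstanding.remove(seqNo);
                    }
                },
                (seqNo, multiple) -> {
                    // nack: keep the entries so they are re-published on checkpoint
                });
    }

    void publish(byte[] body) throws IOException {
        long seqNo = channel.getNextPublishSeqNo();
        outstanding.put(seqNo, body); // remember the message before publishing
        channel.basicPublish("", "my-queue", null, body); // queue name is made up
    }

    /** On checkpoint: re-send everything the broker has not confirmed yet. */
    void resendUnconfirmed() throws IOException {
        List<byte[]> pending = new ArrayList<>(outstanding.values());
        for (byte[] body : pending) {
            publish(body); // re-registered under a fresh sequence number
        }
    }
}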



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-21373) Port RabbitMQ Sink to FLIP-143 API

2024-05-03 Thread Ahmed Hamdy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843173#comment-17843173
 ] 

Ahmed Hamdy commented on FLINK-21373:
-

[~martijnvisser] Thanks for assigning it to me. I have already opened a 
[PR|https://github.com/apache/flink-connector-rabbitmq/pull/29]; your review 
is appreciated.

> Port RabbitMQ Sink to FLIP-143 API
> --
>
> Key: FLINK-21373
> URL: https://issues.apache.org/jira/browse/FLINK-21373
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Reporter: Jan Westphal
>Assignee: Ahmed Hamdy
>Priority: Minor
>  Labels: auto-unassigned, pull-request-available
>
> *Structure*
> The unified Sink API provides a Writer, a Committer and a GlobalCommitter. 
> Right now we don't see the need to use the Committer and GlobalCommitter, as 
> the Writer is sufficient to uphold the consistency guarantees. Since we need 
> asynchronous RabbitMQ callbacks to know whether a message was published 
> successfully, and have to store unacknowledged messages in the checkpoint, 
> there would be a large bidirectional communication and state-exchange 
> overhead between the Writer and the Committer.
> *At-most-once*
> The Writer receives a message from Flink and simply publishes it to RabbitMQ. 
> The current RabbitMQ Sink only provides this mode.
> *At-least-once*
> The objective here is to receive an acknowledgement from RabbitMQ for 
> published messages. Therefore, before publishing a message, we store the 
> message in a Map with the sequence number as its key. If the message is 
> acknowledged by RabbitMQ we can remove it from the Map. If we don't receive 
> an acknowledgement within a certain amount of time (or receive a 
> RabbitMQ-specific so-called negative acknowledgement), we will try to resend 
> the message when taking a checkpoint.
> *Exactly-once*
> On checkpointing we send all messages received from Flink in transaction 
> mode to RabbitMQ. This way, all the messages get sent or are rolled back on 
> failure. All messages that are not sent successfully are written to the 
> checkpoint and will be retried with the next checkpoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-21373) Port RabbitMQ Sink to FLIP-143 API

2024-05-03 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser reassigned FLINK-21373:
--

Assignee: Ahmed Hamdy

> Port RabbitMQ Sink to FLIP-143 API
> --
>
> Key: FLINK-21373
> URL: https://issues.apache.org/jira/browse/FLINK-21373
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Reporter: Jan Westphal
>Assignee: Ahmed Hamdy
>Priority: Minor
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.12.0
>
>
> *Structure*
> The unified Sink API provides a Writer, a Committer and a GlobalCommitter. 
> Right now we don't see the need to use the Committer and GlobalCommitter, as 
> the Writer is sufficient to uphold the consistency guarantees. Since we need 
> asynchronous RabbitMQ callbacks to know whether a message was published 
> successfully, and have to store unacknowledged messages in the checkpoint, 
> there would be a large bidirectional communication and state-exchange 
> overhead between the Writer and the Committer.
> *At-most-once*
> The Writer receives a message from Flink and simply publishes it to RabbitMQ. 
> The current RabbitMQ Sink only provides this mode.
> *At-least-once*
> The objective here is to receive an acknowledgement from RabbitMQ for 
> published messages. Therefore, before publishing a message, we store the 
> message in a Map with the sequence number as its key. If the message is 
> acknowledged by RabbitMQ we can remove it from the Map. If we don't receive 
> an acknowledgement within a certain amount of time (or receive a 
> RabbitMQ-specific so-called negative acknowledgement), we will try to resend 
> the message when taking a checkpoint.
> *Exactly-once*
> On checkpointing we send all messages received from Flink in transaction 
> mode to RabbitMQ. This way, all the messages get sent or are rolled back on 
> failure. All messages that are not sent successfully are written to the 
> checkpoint and will be retried with the next checkpoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-21373) Port RabbitMQ Sink to FLIP-143 API

2024-05-03 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-21373:
---
Fix Version/s: (was: 1.12.0)

> Port RabbitMQ Sink to FLIP-143 API
> --
>
> Key: FLINK-21373
> URL: https://issues.apache.org/jira/browse/FLINK-21373
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Reporter: Jan Westphal
>Assignee: Ahmed Hamdy
>Priority: Minor
>  Labels: auto-unassigned, pull-request-available
>
> *Structure*
> The unified Sink API provides a Writer, a Committer and a GlobalCommitter. 
> Right now we don't see the need to use the Committer and GlobalCommitter, as 
> the Writer is sufficient to uphold the consistency guarantees. Since we need 
> asynchronous RabbitMQ callbacks to know whether a message was published 
> successfully, and have to store unacknowledged messages in the checkpoint, 
> there would be a large bidirectional communication and state-exchange 
> overhead between the Writer and the Committer.
> *At-most-once*
> The Writer receives a message from Flink and simply publishes it to RabbitMQ. 
> The current RabbitMQ Sink only provides this mode.
> *At-least-once*
> The objective here is to receive an acknowledgement from RabbitMQ for 
> published messages. Therefore, before publishing a message, we store the 
> message in a Map with the sequence number as its key. If the message is 
> acknowledged by RabbitMQ we can remove it from the Map. If we don't receive 
> an acknowledgement within a certain amount of time (or receive a 
> RabbitMQ-specific so-called negative acknowledgement), we will try to resend 
> the message when taking a checkpoint.
> *Exactly-once*
> On checkpointing we send all messages received from Flink in transaction 
> mode to RabbitMQ. This way, all the messages get sent or are rolled back on 
> failure. All messages that are not sent successfully are written to the 
> checkpoint and will be retried with the next checkpoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-21373) Port RabbitMQ Sink to FLIP-143 API

2024-05-03 Thread Ahmed Hamdy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843160#comment-17843160
 ] 

Ahmed Hamdy commented on FLINK-21373:
-

[~martijnvisser] I would love to work on this to close it out!

> Port RabbitMQ Sink to FLIP-143 API
> --
>
> Key: FLINK-21373
> URL: https://issues.apache.org/jira/browse/FLINK-21373
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Reporter: Jan Westphal
>Priority: Minor
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.12.0
>
>
> *Structure*
> The unified Sink API provides a Writer, a Committer and a GlobalCommitter. 
> Right now we don't see the need to use the Committer and GlobalCommitter, as 
> the Writer is sufficient to uphold the consistency guarantees. Since we need 
> asynchronous RabbitMQ callbacks to know whether a message was published 
> successfully, and have to store unacknowledged messages in the checkpoint, 
> there would be a large bidirectional communication and state-exchange 
> overhead between the Writer and the Committer.
> *At-most-once*
> The Writer receives a message from Flink and simply publishes it to RabbitMQ. 
> The current RabbitMQ Sink only provides this mode.
> *At-least-once*
> The objective here is to receive an acknowledgement from RabbitMQ for 
> published messages. Therefore, before publishing a message, we store the 
> message in a Map with the sequence number as its key. If the message is 
> acknowledged by RabbitMQ we can remove it from the Map. If we don't receive 
> an acknowledgement within a certain amount of time (or receive a 
> RabbitMQ-specific so-called negative acknowledgement), we will try to resend 
> the message when taking a checkpoint.
> *Exactly-once*
> On checkpointing we send all messages received from Flink in transaction 
> mode to RabbitMQ. This way, all the messages get sent or are rolled back on 
> failure. All messages that are not sent successfully are written to the 
> checkpoint and will be retried with the next checkpoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-03 Thread via GitHub


davidradl commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1588942354


##
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java:
##
@@ -281,7 +281,7 @@ void testJobSubmitCancel() throws Exception {
         try (RestClusterClient<?> restClusterClient =
                 createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
             assertThat(submitHandler.jobSubmitted).isFalse();
-            restClusterClient.submitJob(jobGraph).get();

Review Comment:
   I would leave this as `restClusterClient.submitJob(jobGraph).get();`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-03 Thread via GitHub


davidradl commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1588942354


##
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java:
##
@@ -281,7 +281,7 @@ void testJobSubmitCancel() throws Exception {
         try (RestClusterClient<?> restClusterClient =
                 createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
             assertThat(submitHandler.jobSubmitted).isFalse();
-            restClusterClient.submitJob(jobGraph).get();

Review Comment:
   I would leave this as `restClusterClient.submitJob(jobGraph).get();`,
   same for the other instances.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-03 Thread via GitHub


davidradl commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1588941192


##
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java:
##
@@ -454,6 +467,24 @@ public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
                                 receiver,
                                 error);
                     } else {
+                        RuntimeExecutionMode executionMode =
+                                jobGraph.getJobConfiguration()
+                                        .get(ExecutionOptions.RUNTIME_MODE);
+                        if (jobStatusChangedListeners.size() > 0) {
+                            jobStatusChangedListeners.forEach(
+                                    listener ->
+                                            listener.onEvent(
+                                                    new DefaultJobCreatedEvent(
+                                                            jobGraph.getJobID(),
+                                                            jobGraph.getName(),
+                                                            pipeline == null
+                                                                    ? null
+                                                                    : ((StreamGraph)
+                                                                            pipeline)
+                                                                            .getLineageGraph(),
+                                                            executionMode)));
+                        }
+
                         LOG.info(
                                 "Successfully submitted job '{}' ({}) to '{}'.",
                                 jobGraph.getName(),

Review Comment:
   Add the name/id of the pipeline here for debugging.
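
   For example (illustrative only, not a concrete patch; the extra log 
argument and its formatting are assumptions):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Illustrative only: include the pipeline identity in the submission log line. */
class SubmitLogSketch {
    private static final Logger LOG = LoggerFactory.getLogger(SubmitLogSketch.class);

    static void logSubmission(String jobName, String jobId, Object pipeline, String address) {
        LOG.info(
                "Successfully submitted job '{}' ({}) with pipeline '{}' to '{}'.",
                jobName,
                jobId,
                pipeline == null ? "n/a" : pipeline.getClass().getSimpleName(),
                address);
    }
}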



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-03 Thread via GitHub


davidradl commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1588938587


##
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##
@@ -80,7 +80,7 @@ public CompletableFuture execute(
                 clusterDescriptor.retrieve(clusterID);
         ClusterClient clusterClient = clusterClientProvider.getClusterClient();
         return clusterClient
-                .submitJob(jobGraph)
+                .submitJob(jobGraph, pipeline)

Review Comment:
   I think you are saying that the original `submitJob` with one parameter 
still works, as well as the new method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-03 Thread via GitHub


davidradl commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1588939108


##
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java:
##
@@ -94,7 +95,18 @@ public interface ClusterClient<T> extends AutoCloseable {
      * @param jobGraph to submit
      * @return {@link JobID} of the submitted job
      */
-    CompletableFuture<JobID> submitJob(JobGraph jobGraph);
+    default CompletableFuture<JobID> submitJob(JobGraph jobGraph) {
+        return submitJob(jobGraph, null);

Review Comment:
   Can't we leave this as `return submitJob(jobGraph);`?
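
   For context, a minimal sketch of the default-method pattern under 
discussion (types simplified to stand-ins; this is not the actual 
ClusterClient interface):

import java.util.concurrent.CompletableFuture;

/** Simplified stand-in for the interface evolution discussed above. */
interface SubmitApiSketch {

    /** Old single-argument entry point: existing callers keep compiling and working. */
    default CompletableFuture<String> submitJob(String jobGraph) {
        return submitJob(jobGraph, null);
    }

    /** New entry point; the pipeline argument may be null when none is available. */
    CompletableFuture<String> submitJob(String jobGraph, Object pipeline);
}

   Implementations then only need to override the two-argument variant, which 
is presumably why the one-argument form delegates with a null pipeline.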



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-21373] Add RabbitMQ SinkV2 Implementation, Port Flink version to Flink 1.19 [flink-connector-rabbitmq]

2024-05-03 Thread via GitHub


vahmed-hamdy commented on PR #29:
URL: 
https://github.com/apache/flink-connector-rabbitmq/pull/29#issuecomment-2092561482

   @MartijnVisser @zentol could you please review?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35287] Builder builds NetworkConfig for Elasticsearch connector 8 [flink-connector-elasticsearch]

2024-05-03 Thread via GitHub


liuml07 commented on code in PR #100:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/100#discussion_r1588846646


##
flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilder.java:
##
@@ -108,7 +108,7 @@ public Elasticsearch8AsyncSinkBuilder setHeaders(Header... headers) {
      */
     public Elasticsearch8AsyncSinkBuilder setCertificateFingerprint(
             String certificateFingerprint) {
-        checkNotNull(username, "certificateFingerprint must not be null");
+        checkNotNull(certificateFingerprint, "certificateFingerprint must not be null");

Review Comment:
   nit: should be `certificateFingerprint`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35287] Builder builds NetworkConfig for Elasticsearch connector 8 [flink-connector-elasticsearch]

2024-05-03 Thread via GitHub


liuml07 commented on code in PR #100:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/100#discussion_r1588846145


##
flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilder.java:
##
@@ -94,7 +94,7 @@ public Elasticsearch8AsyncSinkBuilder setHosts(HttpHost... hosts) {
      */
     public Elasticsearch8AsyncSinkBuilder setHeaders(Header... headers) {
         checkNotNull(hosts);
-        checkArgument(headers.length > 0, "Hosts cannot be empty");
+        checkArgument(headers.length > 0, "Headers cannot be empty");

Review Comment:
   nit: should be `headers`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35287][connectors/elasticsearch] Build NetworkConfig for Elasticsearch connector 8 [flink-connector-elasticsearch]

2024-05-03 Thread via GitHub


liuml07 opened a new pull request, #100:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/100

   https://issues.apache.org/jira/browse/FLINK-35287
   
   > In [FLINK-26088](https://issues.apache.org/jira/browse/FLINK-26088) we 
added support for ElasticSearch 8.0. It is based on the Async Sink API and does 
not use the base module flink-connector-elasticsearch-base. Regarding the 
config options (host, username, password, headers, ssl...), we pass all options 
from the builder to the AsyncSink, and finally to the AsyncWriter. This is less 
flexible when we add new options: the constructors get longer, and multiple 
places may validate options unnecessarily. I think it's nice if we make the 
sink builder build the NetworkConfig once and pass it all the way to the 
writer. This is also how the base module for 6.x / 7.x is implemented.
   
   In my recent work adding new options to the network config, this approach 
proved simpler. I'll port that work to 8.x.
   
   1. the builder builds `NetworkConfig` and passes it into the sink and 
writer, instead of all the specific config options
   1. other small fixes in the builder
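
   A minimal sketch of the "builder builds the NetworkConfig once" idea, with 
made-up field names (not the connector's actual NetworkConfig):

/** Made-up fields, only to show the "build once, pass down" shape. */
final class NetworkConfigSketch {
    final String host;
    final String username;
    final String password;

    NetworkConfigSketch(String host, String username, String password) {
        this.host = host;
        this.username = username;
        this.password = password;
    }
}

final class SinkBuilderSketch {
    private String host;
    private String username;
    private String password;

    SinkBuilderSketch setHost(String host) { this.host = host; return this; }
    SinkBuilderSketch setUsername(String username) { this.username = username; return this; }
    SinkBuilderSketch setPassword(String password) { this.password = password; return this; }

    SinkSketch build() {
        // Validate and assemble the connection options exactly once here, then
        // hand the single object down (builder -> sink -> writer) instead of
        // threading every individual option through each constructor.
        NetworkConfigSketch networkConfig = new NetworkConfigSketch(host, username, password);
        return new SinkSketch(networkConfig);
    }
}

final class SinkSketch {
    private final NetworkConfigSketch networkConfig;

    SinkSketch(NetworkConfigSketch networkConfig) {
        this.networkConfig = networkConfig;
    }
}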


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35287) Builder builds NetworkConfig for Elasticsearch connector 8

2024-05-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35287:
---
Labels: pull-request-available  (was: )

> Builder builds NetworkConfig for Elasticsearch connector 8
> --
>
> Key: FLINK-35287
> URL: https://issues.apache.org/jira/browse/FLINK-35287
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Reporter: Mingliang Liu
>Priority: Major
>  Labels: pull-request-available
>
> In FLINK-26088 we added support for ElasticSearch 8.0. It is based on the 
> Async Sink API and does not use the base module 
> {{flink-connector-elasticsearch-base}}. Regarding the config options (host, 
> username, password, headers, ssl...), we pass all options from the builder to 
> the AsyncSink, and finally to the AsyncWriter. This is less flexible when we 
> add new options: the constructors get longer, and multiple places may 
> validate options unnecessarily. I think it's nice if we make the sink builder 
> build the NetworkConfig once and pass it all the way to the writer. This is 
> also how the base module for 6.x / 7.x is implemented. In my recent work 
> adding new options to the network config, this approach proved simpler.
> Let me create a PR to demonstrate the idea. No new features or major code 
> refactoring other than the builder building the NetworkConfig (the code will 
> be shorter). I have a few small fixes which I'll include in the incoming PR.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35287) Builder builds NetworkConfig for Elasticsearch connector 8

2024-05-03 Thread Mingliang Liu (Jira)
Mingliang Liu created FLINK-35287:
-

 Summary: Builder builds NetworkConfig for Elasticsearch connector 8
 Key: FLINK-35287
 URL: https://issues.apache.org/jira/browse/FLINK-35287
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch
Reporter: Mingliang Liu


In FLINK-26088 we added support for ElasticSearch 8.0. It is based on the Async 
Sink API and does not use the base module 
{{flink-connector-elasticsearch-base}}. Regarding the config options (host, 
username, password, headers, ssl...), we pass all options from the builder to 
the AsyncSink, and finally to the AsyncWriter. This is less flexible when we 
add new options: the constructors get longer, and multiple places may validate 
options unnecessarily. I think it's nice if we make the sink builder build the 
NetworkConfig once and pass it all the way to the writer. This is also how the 
base module for 6.x / 7.x is implemented. In my recent work adding new options 
to the network config, this approach proved simpler.

Let me create a PR to demonstrate the idea. No new features or major code 
refactoring other than the builder building the NetworkConfig (the code will be 
shorter). I have a few small fixes which I'll include in the incoming PR.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34369][connectors/elasticsearch] Elasticsearch connector supports SSL context [flink-connector-elasticsearch]

2024-05-03 Thread via GitHub


snuyanzin merged PR #91:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/91


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34369][connectors/elasticsearch] Elasticsearch connector supports SSL context [flink-connector-elasticsearch]

2024-05-03 Thread via GitHub


boring-cyborg[bot] commented on PR #91:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/91#issuecomment-2092439455

   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-35182) Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink Pulsar connector

2024-05-03 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen resolved FLINK-35182.
---
Fix Version/s: pulsar-4.2.0
 Assignee: Zhongqiang Gong
   Resolution: Fixed

master via 
https://github.com/apache/flink-connector-pulsar/commit/b37a8b32f30683664ff25888d403c4de414043e1

> Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink 
> Pulsar connector
> -
>
> Key: FLINK-35182
> URL: https://issues.apache.org/jira/browse/FLINK-35182
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Pulsar
>Reporter: Zhongqiang Gong
>Assignee: Zhongqiang Gong
>Priority: Minor
>  Labels: pull-request-available
> Fix For: pulsar-4.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.0 [flink-connector-pulsar]

2024-05-03 Thread via GitHub


dependabot[bot] commented on PR #83:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/83#issuecomment-2092425790

   OK, I won't notify you again about this release, but will get in touch when 
a new version is available. If you'd rather skip all updates until the next 
major or minor version, let me know by commenting `@dependabot ignore this 
major version` or `@dependabot ignore this minor version`.
   
   If you change your mind, just re-open this PR and I'll resolve any conflicts 
on it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.0 [flink-connector-pulsar]

2024-05-03 Thread via GitHub


tisonkun closed pull request #83: Bump org.apache.commons:commons-compress from 
1.24.0 to 1.26.0
URL: https://github.com/apache/flink-connector-pulsar/pull/83


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35182] Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink Pulsar connector [flink-connector-pulsar]

2024-05-03 Thread via GitHub


tisonkun merged PR #90:
URL: https://github.com/apache/flink-connector-pulsar/pull/90


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34629] Fix non-partition topic subscribe lost. [flink-connector-pulsar]

2024-05-03 Thread via GitHub


tisonkun closed pull request #84: [FLINK-34629] Fix non-partition topic 
subscribe lost.
URL: https://github.com/apache/flink-connector-pulsar/pull/84


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.0 [flink-connector-pulsar]

2024-05-03 Thread via GitHub


tisonkun commented on PR #83:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/83#issuecomment-2092419278

   @dependabot rebase


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33306] Use observed source throughput as true processing rate [flink-kubernetes-operator]

2024-05-03 Thread via GitHub


gyfora commented on code in PR #686:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r1588809789


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##
@@ -154,6 +153,56 @@ private Map evaluateMetrics(
         return evaluatedMetrics;
     }
 
+    private static EvaluatedScalingMetric evaluateTpr(
+            SortedMap metricsHistory,
+            JobVertexID vertex,
+            Map latestVertexMetrics,
+            Configuration conf) {
+
+        var busyTimeTprAvg = getAverage(TRUE_PROCESSING_RATE, vertex, metricsHistory);
+        var observedTprAvg =
+                getAverage(
+                        OBSERVED_TPR,
+                        vertex,
+                        metricsHistory,
+                        conf.get(AutoScalerOptions.OBSERVED_TPR_MIN_OBSERVATIONS));
+
+        var tprMetric = selectTprMetric(vertex, conf, busyTimeTprAvg, observedTprAvg);
+        return new EvaluatedScalingMetric(
+                latestVertexMetrics.getOrDefault(tprMetric, Double.NaN),
+                tprMetric == OBSERVED_TPR ? observedTprAvg : busyTimeTprAvg);
+    }
+
+    private static ScalingMetric selectTprMetric(
+            JobVertexID jobVertexID,
+            Configuration conf,
+            double busyTimeTprAvg,
+            double observedTprAvg) {
+
+        if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) {
+            return OBSERVED_TPR;
+        }
+
+        if (Double.isNaN(observedTprAvg)) {
+            return TRUE_PROCESSING_RATE;
+        }
+
+        double switchThreshold = conf.get(AutoScalerOptions.OBSERVED_TPR_SWITCH_THRESHOLD);
+        // If we could measure the observed tpr we decide whether to switch to using it
+        // instead of busy time based on the error / difference between the two
+        if (busyTimeTprAvg > observedTprAvg * (1 + switchThreshold)) {

Review Comment:
   The observed true processing rate is simply measured based on the number of 
records processed while the job is catching up. Why do you think that your job 
has 2M/sec processing capacity?
   
   Maybe when the job is catching up it cannot go 2M/sec?
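
   To make the switch condition concrete with the numbers from this thread (a 
hypothetical check; the threshold value below is an assumption, not 
necessarily the option's default):

/** Hypothetical numbers for the switch condition quoted above. */
public class SwitchThresholdExample {
    public static void main(String[] args) {
        double observedTprAvg = 1_200_000;  // records/sec measured while catching up
        double busyTimeTprAvg = 2_000_000;  // records/sec derived from busy time
        double switchThreshold = 0.15;      // assumption, not necessarily the default

        // Switch to the observed rate only when the busy-time estimate overshoots
        // it by more than the threshold: 2.0M > 1.2M * 1.15 = 1.38M -> switch.
        boolean useObserved = busyTimeTprAvg > observedTprAvg * (1 + switchThreshold);
        System.out.println("Use observed TPR: " + useObserved); // prints true
    }
}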



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org