[jira] [Commented] (FLINK-32895) Introduce the max attempts for Exponential Delay Restart Strategy

2023-09-09 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17763394#comment-17763394
 ] 

Rui Fan commented on FLINK-32895:
-

Hi [~zhuzh], I created FLIP-364 in advance because I found several points 
in the restart strategy that need to be improved. We can discuss them on the 
mailing list in the future.

There are 2 options for discussion:
 * Option 1: Start discussing FLIP-364 after the deprecation of 
RestartStrategies has been discussed.
 * Option 2: FLIP-364 has several points that need to be discussed, so we can 
first discuss the other parts of FLIP-364 besides RestartStrategies. The 
RestartStrategies part can be followed up in your separate FLIP. 

WDYT?

BTW, after some more thought: 
`restart-strategy.exponential-delay.fail-on-exceeding-max-backoff` may not work 
well, because the user may want to restart the job multiple times using 
max-backoff before failing it.

For example, users don't want the delay time to be too long, so they set 
initial-backoff=1s, backoff-multiplier=2, max-backoff=30s. The delay times are 
then 1s, 2s, 4s, 8s, 16s, 30s, 30s, 30s, 30s, 30s, etc. If we introduced 
`fail-on-exceeding-max-backoff`, the job wouldn't restart once the delay time 
reaches 30s for the first time, right?
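
To make the sequence above concrete, here is a minimal sketch in plain Java. It is not Flink's restart strategy implementation: the class and method names are hypothetical, the multiplier is simplified to a whole number, and jitter and the reset threshold are ignored. It only reproduces the arithmetic of the example (initial-backoff=1s, backoff-multiplier=2, max-backoff=30s).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Flink: models exponential backoff capped at a maximum. */
final class BackoffSketch {

    /** Returns the delay in seconds for each of the first {@code attempts} restarts. */
    static List<Long> delays(long initialSeconds, long multiplier, long maxSeconds, int attempts) {
        List<Long> result = new ArrayList<>();
        long current = initialSeconds;
        for (int i = 0; i < attempts; i++) {
            // The effective delay never exceeds the configured maximum.
            result.add(Math.min(current, maxSeconds));
            // Stop growing once the cap has been reached, which also avoids overflow.
            if (current < maxSeconds) {
                current = current * multiplier;
            }
        }
        return result;
    }

    /** 0-based index of the first attempt whose delay equals the cap, or -1 if it is never reached. */
    static int firstAttemptAtCap(List<Long> delays, long maxSeconds) {
        return delays.indexOf(maxSeconds);
    }

    public static void main(String[] args) {
        List<Long> d = delays(1, 2, 30, 10);
        System.out.println(d);
        System.out.println(firstAttemptAtCap(d, 30));
    }
}
```

With these settings the cap is first hit on the sixth restart (index 5). If the option behaved as described in the comment, the job would fail at that point without ever retrying at the 30s delay, which is the concern raised here.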

Please correct me if I'm wrong. Looking forward to more feedback from the 
community, thanks~

 

[1]https://cwiki.apache.org/confluence/display/FLINK/FLIP-364%3A+Improve+the+restart-strategy

> Introduce the max attempts for Exponential Delay Restart Strategy
> -
>
> Key: FLINK-32895
> URL: https://issues.apache.org/jira/browse/FLINK-32895
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
>
> Currently, Flink has 3 restart strategies: fixed-delay, failure-rate and 
> exponential-delay.
> The exponential-delay strategy is suitable if a job continues to fail for a 
> period of time. The fixed-delay and failure-rate strategies have a max 
> attempts mechanism: the job is not restarted and goes to the failed state 
> once the number of attempts exceeds the max attempts threshold. 
> The max attempts mechanism is reasonable: Flink should not, and does not 
> need to, restart the job infinitely if it keeps failing. However, the 
> exponential-delay strategy doesn't have a max attempts mechanism.
> I propose introducing 
> `restart-strategy.exponential-delay.max-attempts-before-reset` to support the 
> max attempts mechanism for exponential-delay. It means Flink won't restart 
> the job if the number of job failures before reset exceeds 
> max-attempts-before-reset when exponential-delay is enabled.
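
The proposed rule can be sketched as a small counter in plain Java. This is only an illustration of the semantics described above, not Flink code: the class and its methods are hypothetical, and the condition that triggers a reset (for example, the job running stably for some time) is assumed to be decided elsewhere.

```java
/** Hypothetical model, not Flink's implementation, of a max-attempts-before-reset check. */
final class MaxAttemptsSketch {
    private final int maxAttemptsBeforeReset;
    private int failuresSinceReset = 0;

    MaxAttemptsSketch(int maxAttemptsBeforeReset) {
        this.maxAttemptsBeforeReset = maxAttemptsBeforeReset;
    }

    /** Records one job failure and reports whether a restart is still allowed. */
    boolean onFailure() {
        failuresSinceReset++;
        // Restarting stops once the failure count exceeds the threshold.
        return failuresSinceReset <= maxAttemptsBeforeReset;
    }

    /** Called when the backoff is reset; the failure count starts again from zero. */
    void onReset() {
        failuresSinceReset = 0;
    }

    public static void main(String[] args) {
        MaxAttemptsSketch sketch = new MaxAttemptsSketch(3);
        for (int i = 1; i <= 4; i++) {
            System.out.println("failure " + i + " -> restart allowed: " + sketch.onFailure());
        }
    }
}
```

In this reading, with a threshold of 3 the first three failures still lead to a restart and the fourth one fails the job, unless a reset happens in between.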



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32818) Support region failover for adaptive scheduler

2023-09-09 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-32818:

Component/s: Runtime / Coordination

> Support region failover for adaptive scheduler
> --
>
> Key: FLINK-32818
> URL: https://issues.apache.org/jira/browse/FLINK-32818
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>
> The region failover strategy is useful for fast failover and reduces the 
> impact on the business side. However, the adaptive scheduler doesn't support 
> it so far.
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#restart-pipelined-region-failover-strategy





[jira] [Commented] (FLINK-32818) Support region failover for adaptive scheduler

2023-09-09 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17763391#comment-17763391
 ] 

Rui Fan commented on FLINK-32818:
-

Hi [~talat], thanks for your interest in the Adaptive Scheduler.

I'm working on this Jira and will try to finish the improvement in Flink 1.19. 
You are welcome to help review it in your free time, thanks~

As I understand it, this improvement doesn't introduce any new public options 
or interfaces, so a FLIP isn't necessary, right? If a FLIP is necessary, please 
let me know, I'm happy to start a FLIP for this improvement.

> Support region failover for adaptive scheduler
> --
>
> Key: FLINK-32818
> URL: https://issues.apache.org/jira/browse/FLINK-32818
> Project: Flink
>  Issue Type: Improvement
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>
> The region failover strategy is useful for fast failover and reduces the 
> impact on the business side. However, the adaptive scheduler doesn't support 
> it so far.
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#restart-pipelined-region-failover-strategy





[jira] [Updated] (FLINK-32575) Unified the Cpu of JobManager Name

2023-09-09 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-32575:
---
Labels: pull-request-available stale-major  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 60 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is a Major, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> Unified the Cpu of JobManager Name
> --
>
> Key: FLINK-32575
> URL: https://issues.apache.org/jira/browse/FLINK-32575
> Project: Flink
>  Issue Type: New Feature
>Reporter: Bo Cui
>Priority: Major
>  Labels: pull-request-available, stale-major
>
> To set the JobManager CPU on YARN, use `yarn.appmaster.vcores`. To set the 
> JobManager CPU on Kubernetes, use `kubernetes.jobmanager.cpu`. When there are 
> both YARN and Kubernetes clusters, managing these configurations is 
> difficult. Add a unified name for them for ease of use.
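
One way to picture the request is a lookup that prefers a single unified key and falls back to the platform-specific one. The sketch below is plain Java over a map, not Flink's configuration API. The unified key name is purely hypothetical (the issue does not propose one), and the precedence of the unified key over the platform keys is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch, not Flink code: resolves the JobManager CPU from a unified key. */
final class UnifiedCpuKeySketch {
    // Hypothetical unified key; the issue does not name one.
    static final String UNIFIED_KEY = "jobmanager.cpu.unified-example";
    // Existing platform-specific keys mentioned in the issue.
    static final String YARN_KEY = "yarn.appmaster.vcores";
    static final String K8S_KEY = "kubernetes.jobmanager.cpu";

    /** Prefers the unified key and falls back to the given platform-specific key. */
    static Optional<String> resolveJobManagerCpu(Map<String, String> conf, String platformKey) {
        String unified = conf.get(UNIFIED_KEY);
        if (unified != null) {
            return Optional.of(unified);
        }
        return Optional.ofNullable(conf.get(platformKey));
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(YARN_KEY, "2");
        System.out.println(resolveJobManagerCpu(conf, YARN_KEY));
        conf.put(UNIFIED_KEY, "4");
        System.out.println(resolveJobManagerCpu(conf, YARN_KEY));
        System.out.println(resolveJobManagerCpu(conf, K8S_KEY));
    }
}
```

With such a lookup, one setting would apply to both cluster types while existing platform-specific settings keep working when the unified key is absent.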





[jira] [Updated] (FLINK-30699) Improved getRandomString method code format in the StringUtils class

2023-09-09 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-30699:
---
Labels: pull-request-available stale-minor  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Improved getRandomString method code format in the StringUtils class
> 
>
> Key: FLINK-30699
> URL: https://issues.apache.org/jira/browse/FLINK-30699
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Bingye Chen
>Priority: Minor
>  Labels: pull-request-available, stale-minor
> Attachments: image-2023-01-16-18-13-56-912.png, 
> image-2023-01-16-18-14-12-939.png
>
>
>  
> !image-2023-01-16-18-13-56-912.png|width=398,height=148!
> !image-2023-01-16-18-14-12-939.png|width=398,height=114!





[jira] [Updated] (FLINK-25447) batch query cannot generate plan when a sorted view into multi sinks

2023-09-09 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-25447:
---
Labels: pull-request-available stale-assigned  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it.


> batch query cannot generate plan when a sorted view into multi sinks
> 
>
> Key: FLINK-25447
> URL: https://issues.apache.org/jira/browse/FLINK-25447
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.2
>Reporter: lincoln lee
>Assignee: Zheng yunhong
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.18.0, 1.17.2, 1.19.0
>
>
> A batch query that writes a sorted view into multiple sinks fails with a 
> "cannot generate a valid execution plan" exception:
> {code}
> @Test
> def testSortedResultIntoMultiSinks(): Unit = {
>   util.tableEnv.executeSql(
>     s"""
>        |CREATE TABLE Src (
>        |  `a` INT,
>        |  `b` BIGINT,
>        |  `c` STRING,
>        |  `d` STRING,
>        |  `e` STRING
>        |) WITH (
>        |  'connector' = 'values',
>        |  'bounded' = 'true'
>        |)
>        """.stripMargin)
>   val query = "SELECT * FROM Src order by c"
>   val table = util.tableEnv.sqlQuery(query)
>   util.tableEnv.registerTable("sortedTable", table)
>   util.tableEnv.executeSql(
>     s"""
>        |CREATE TABLE sink1 (
>        |  `a` INT,
>        |  `b` BIGINT,
>        |  `c` STRING
>        |) WITH (
>        |  'connector' = 'filesystem',
>        |  'format' = 'testcsv',
>        |  'path' = '/tmp/test'
>        |)
>        """.stripMargin)
>   util.tableEnv.executeSql(
>     s"""
>        |CREATE TABLE sink2 (
>        |  `a` INT,
>        |  `b` BIGINT,
>        |  `c` STRING,
>        |  `d` STRING
>        |) WITH (
>        |  'connector' = 'filesystem',
>        |  'format' = 'testcsv',
>        |  'path' = '/tmp/test'
>        |)
>        """.stripMargin)
>   val stmtSet = util.tableEnv.createStatementSet()
>   stmtSet.addInsertSql(
>     "insert into sink1 select a, b, listagg(d) from sortedTable group by a, b")
>   stmtSet.addInsertSql(
>     "insert into sink2 select a, b, c, d from sortedTable")
>   util.verifyExecPlan(stmtSet)
> }
> {code}
> {code}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 
> LogicalSink(table=[default_catalog.default_database.sink2], fields=[a, b, c, d])
> +- LogicalProject(inputs=[0..3])
>    +- LogicalTableScan(table=[[IntermediateRelTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL features.
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:76)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>   at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>   at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>   at scala.collection.Iterator.foreach(Iterator.scala:937)
>   at scala.collection.Iterator.foreach$(Iterator.scala:937)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>   at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>   at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>   at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>   at 
> 

[GitHub] [flink] flinkbot commented on pull request #23386: [FLINK-33022] Log an error when enrichers defined as part of the configuration can not be found/loaded

2023-09-09 Thread via GitHub


flinkbot commented on PR #23386:
URL: https://github.com/apache/flink/pull/23386#issuecomment-1712611870

   
   ## CI report:
   
   * ed7322d8863e462f5189d1c28194b64f8574fb18 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pgaref commented on pull request #23386: [FLINK-33022] Log an error when enrichers defined as part of the configuration can not be found/loaded

2023-09-09 Thread via GitHub


pgaref commented on PR #23386:
URL: https://github.com/apache/flink/pull/23386#issuecomment-1712609517

   @wangzzu @zentol  can you please take a look when you have a sec?





[jira] [Updated] (FLINK-33022) Log an error when enrichers defined as part of the configuration can not be found/loaded

2023-09-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33022:
---
Labels: pull-request-available  (was: )

> Log an error when enrichers defined as part of the configuration can not be 
> found/loaded
> 
>
> Key: FLINK-33022
> URL: https://issues.apache.org/jira/browse/FLINK-33022
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Matt Wang
>Assignee: Panagiotis Garefalakis
>Priority: Minor
>  Labels: pull-request-available
>
> If we configure `jobmanager.failure-enrichers` but the class cannot be loaded 
> in FailureEnricherUtils, no exception currently appears in the log, which 
> makes the problem very inconvenient to diagnose. I suggest adding some 
> ERROR-level logs, or throwing an exception directly (because failing to load 
> a configured enricher is not an expected result).
> {code:java}
> // code placeholder
> @VisibleForTesting
> static Collection<FailureEnricher> getFailureEnrichers(
>         final Configuration configuration, final PluginManager pluginManager) {
>     Set<String> includedEnrichers = getIncludedFailureEnrichers(configuration);
>     LOG.info("includedEnrichers: {}", includedEnrichers);
>     // When empty, NO enrichers will be started.
>     if (includedEnrichers.isEmpty()) {
>         return Collections.emptySet();
>     }
>     // TODO: here maybe load nothing
>     final Iterator<FailureEnricherFactory> factoryIterator =
>             pluginManager.load(FailureEnricherFactory.class);
> 
> } {code}
>  
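
The check suggested in the description can be sketched in plain Java as a set difference between the configured class names and the ones that were actually loaded. This is a hypothetical illustration, not the change made in Flink: the class, the method, and the example class names are invented, and `System.err` stands in for an ERROR-level log call.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch, not Flink code: finds configured enrichers that were never loaded. */
final class MissingEnricherCheck {

    /** Returns the configured class names that do not appear among the loaded ones, sorted. */
    static Set<String> missingEnrichers(Set<String> configured, Collection<String> loaded) {
        Set<String> missing = new TreeSet<>(configured);
        missing.removeAll(loaded);
        return missing;
    }

    public static void main(String[] args) {
        // Example class names are placeholders.
        Set<String> configured = new LinkedHashSet<>(
                Arrays.asList("com.example.EnricherA", "com.example.EnricherB"));
        Collection<String> loaded = Arrays.asList("com.example.EnricherA");

        Set<String> missing = missingEnrichers(configured, loaded);
        if (!missing.isEmpty()) {
            // Stand-in for an ERROR-level log line or an exception, as the issue suggests.
            System.err.println("Configured failure enrichers could not be loaded: " + missing);
        }
    }
}
```

Reporting the difference after the plugin loading loop makes a misconfigured or missing jar visible at startup instead of silently starting with fewer enrichers.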





[GitHub] [flink] pgaref opened a new pull request, #23386: [FLINK-33022] Log an error when enrichers defined as part of the configuration can not be found/loaded

2023-09-09 Thread via GitHub


pgaref opened a new pull request, #23386:
URL: https://github.com/apache/flink/pull/23386

   https://issues.apache.org/jira/browse/FLINK-33022
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





[jira] [Updated] (FLINK-33022) Log an error when enrichers defined as part of the configuration can not be found/loaded

2023-09-09 Thread Panagiotis Garefalakis (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Panagiotis Garefalakis updated FLINK-33022:
---
Summary: Log an error when enrichers defined as part of the configuration 
can not be found/loaded  (was: Log when FailureEnricherUtils load 
FailureEnricherFactory failed should throw exception or add some error logs)

> Log an error when enrichers defined as part of the configuration can not be 
> found/loaded
> 
>
> Key: FLINK-33022
> URL: https://issues.apache.org/jira/browse/FLINK-33022
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Matt Wang
>Assignee: Panagiotis Garefalakis
>Priority: Minor
>





[jira] [Updated] (FLINK-33022) Log when FailureEnricherUtils load FailureEnricherFactory failed should throw exception or add some error logs

2023-09-09 Thread Panagiotis Garefalakis (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Panagiotis Garefalakis updated FLINK-33022:
---
Summary: Log when FailureEnricherUtils load FailureEnricherFactory failed 
should throw exception or add some error logs  (was: When FailureEnricherUtils 
load FailureEnricherFactory failed should throw exception or add some error 
logs)

> Log when FailureEnricherUtils load FailureEnricherFactory failed should throw 
> exception or add some error logs
> --
>
> Key: FLINK-33022
> URL: https://issues.apache.org/jira/browse/FLINK-33022
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Matt Wang
>Assignee: Panagiotis Garefalakis
>Priority: Minor
>





[jira] [Commented] (FLINK-32818) Support region failover for adaptive scheduler

2023-09-09 Thread Talat Uyarer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17763366#comment-17763366
 ] 

Talat Uyarer commented on FLINK-32818:
--

Hey [~fanrui], at Palo Alto Networks we also use the Adaptive Scheduler, and we 
plan to work on this feature to enhance our jobs. Is there any plan or FLIP for 
this? We can also provide resources for the development of this task. 

> Support region failover for adaptive scheduler
> --
>
> Key: FLINK-32818
> URL: https://issues.apache.org/jira/browse/FLINK-32818
> Project: Flink
>  Issue Type: Improvement
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>
> The region failover strategy is useful for fast failover and reduces the 
> impact on the business side. However, the adaptive scheduler doesn't support 
> it so far.
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#restart-pipelined-region-failover-strategy





[GitHub] [flink] pgaref commented on a diff in pull request #22468: [FLINK-31889] Add documentation for implementing/loading enrichers

2023-09-09 Thread via GitHub


pgaref commented on code in PR #22468:
URL: https://github.com/apache/flink/pull/22468#discussion_r1320633358


##
docs/content/docs/deployment/advanced/failure_enrichers.md:
##
@@ -0,0 +1,112 @@
+---
+title: "Failure Enrichers"
+nav-title: failure-enrichers
+nav-parent_id: advanced
+nav-pos: 3
+---
+
+
+## Custom failure enrichers
+Flink provides a pluggable interface for users to register their custom logic 
and enrich failures with extra metadata labels (string kv pairs).
+The goal is to enable developers to implement their own failure enrichment 
plugins to categorize job failures, expose custom metrics, make calls to 
external notification systems, and more.
+
+FailureEnrichers are triggered every time an exception is reported at runtime 
by the JobManager.
+Every FailureEnricher may asynchronously return labels associated with the 
failure that are then exposed via the JobManager's Rest interface (e.g., a 
'type:System' label implying the failure is categorized as a system error).
+
+
+### Implement a plugin for your custom enricher
+
+To implement a custom FailureEnricher plugin, you need to:
+
+- Add your own FailureEnricher by implementing the {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java"
 name="FailureEnricher" >}} interface.
+
+- Add your own FailureEnricherFactory by implementing the {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricherFactory.java"
 name="FailureEnricherFactory" >}} interface.
+
+- Add a service entry. Create a file 
`META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory` which 
contains the class name of your failure enricher factory class (see [Java 
Service 
Loader](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/ServiceLoader.html)
 docs for more details).
+
+
+Then, create a jar which includes your `FailureEnricher`, 
`FailureEnricherFactory`, `META-INF/services/` and all the external 
dependencies.
+Make a directory in `plugins/` of your Flink distribution with an arbitrary 
name, e.g. "failure-enrichment", and put the jar into this directory.
+See [Flink Plugin]({% link deployment/filesystems/plugins.md %}) for more 
details.
+
+{{< hint warning >}}
+Note that every FailureEnricher should have defined a set of {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java"
 name="output keys" >}} that may be associated with values. This set of keys 
has to be unique across enrichers otherwise may be ignored.
+{{< /hint >}}
+
+FailureEnricherFactory example:
+
+``` java
+public class TestFailureEnricherFactory implements FailureEnricherFactory {
+
+    @Override
+    public FailureEnricher createFailureEnricher(Configuration conf) {
+        return new CustomEnricher();
+    }
+}
+```
+
+FailureEnricher example:
+
+``` java
+public class CustomEnricher implements FailureEnricher {
+    private final Set<String> outputKeys;
+
+    public CustomEnricher() {
+        this.outputKeys = Collections.singleton("labelKey");
+    }
+
+    @Override
+    public Set<String> getOutputKeys() {
+        return outputKeys;
+    }
+
+    @Override
+    public CompletableFuture<Map<String, String>> processFailure(
+            Throwable cause, Context context) {
+        return CompletableFuture.completedFuture(
+                Collections.singletonMap("labelKey", "labelValue"));
+    }
+}
+```
+
+### Configuration
+
+JobManager loads FailureEnricher plugins at startup. To make sure your 
FailureEnrichers are loaded:
+* All class names should be defined as part of {{< javadoc 
file="org/apache/flink/configuration/JobManagerOptions.html#FAILURE_ENRICHERS_LIST"
 name="jobmanager.failure-enrichers">}} configuration.
+  If this configuration is empty, NO enrichers will be started. Example:
+```
+jobmanager.failure-enrichers = org.apache.flink.test.plugin.jar.failure.CustomEnricher

Review Comment:
   fixed






[GitHub] [flink] pgaref commented on a diff in pull request #22468: [FLINK-31889] Add documentation for implementing/loading enrichers

2023-09-09 Thread via GitHub


pgaref commented on code in PR #22468:
URL: https://github.com/apache/flink/pull/22468#discussion_r1320633268


##
docs/content/docs/deployment/advanced/failure_enrichers.md:
##
@@ -0,0 +1,112 @@
+---
+title: "Failure Enrichers"
+nav-title: failure-enrichers
+nav-parent_id: advanced
+nav-pos: 3
+---
+
+
+## Custom failure enrichers
+Flink provides a pluggable interface for users to register their custom logic 
and enrich failures with extra metadata labels (string kv pairs).
+The goal is to enable developers to implement their own failure enrichment 
plugins to categorize job failures, expose custom metrics, make calls to 
external notification systems, and more.
+
+FailureEnrichers are triggered every time an exception is reported at runtime 
by the JobManager.
+Every FailureEnricher may asynchronously return labels associated with the 
failure that are then exposed via the JobManager's Rest interface (e.g., a 
'type:System' label implying the failure is categorized as a system error).
+
+
+### Implement a plugin for your custom enricher
+
+To implement a custom FailureEnricher plugin, you need to:
+
+- Add your own FailureEnricher by implementing the {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java"
 name="FailureEnricher" >}} interface.
+
+- Add your own FailureEnricherFactory by implementing the {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricherFactory.java"
 name="FailureEnricherFactory" >}} interface.
+
+- Add a service entry. Create a file 
`META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory` which 
contains the class name of your failure enricher factory class (see [Java 
Service 
Loader](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/ServiceLoader.html)
 docs for more details).
+
+
+Then, create a jar which includes your `FailureEnricher`, 
`FailureEnricherFactory`, `META-INF/services/` and all the external 
dependencies.
+Make a directory in `plugins/` of your Flink distribution with an arbitrary 
name, e.g. "failure-enrichment", and put the jar into this directory.
+See [Flink Plugin]({% link deployment/filesystems/plugins.md %}) for more 
details.
+
+{{< hint warning >}}
+Note that every FailureEnricher should have defined a set of {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java"
 name="output keys" >}} that may be associated with values. This set of keys 
has to be unique across enrichers otherwise may be ignored.

Review Comment:
   You are right, that was the initial logic that we then reworked before 
merging.
   Updating README accordingly.






[GitHub] [flink-connector-elasticsearch] StefanXiepj commented on pull request #53: [FLINK-26088][Connectors/ElasticSearch] Add Elasticsearch 8.0 support

2023-09-09 Thread via GitHub


StefanXiepj commented on PR #53:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/53#issuecomment-1712529841

   > Thanks @mtfelisb! I have seen many people's urgent need for this, so feel 
free to ping me for reviewing to accelerate the progress. And I can provide any 
possible help :)
   
   @reswqa can you help review my other PR for this issue? Thanks





[GitHub] [flink-connector-elasticsearch] StefanXiepj commented on pull request #74: [FLINK-26088][Connectors/ElasticSearch] Add Elasticsearch 8.0 support

2023-09-09 Thread via GitHub


StefanXiepj commented on PR #74:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/74#issuecomment-1712529671

   @reswqa Hi Weijie, nice to meet you here. As you said, I also have many 
friends who are looking forward to this PR, so I tried to implement it as well. 
Can you provide some help? 





[GitHub] [flink-connector-elasticsearch] boring-cyborg[bot] commented on pull request #74: Add Elasticsearch 8.0 support

2023-09-09 Thread via GitHub


boring-cyborg[bot] commented on PR #74:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/74#issuecomment-1712527358

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   





[jira] [Commented] (FLINK-33042) Allow trigger flamegraph when task is initializing

2023-09-09 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17763344#comment-17763344
 ] 

Rui Fan commented on FLINK-33042:
-

Merged via  53702736382b80fa905494e69654061a84741e84

> Allow trigger flamegraph when task is initializing 
> ---
>
> Key: FLINK-33042
> URL: https://issues.apache.org/jira/browse/FLINK-33042
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Runtime / Web Frontend
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-09-06-15-43-37-075.png
>
>
> Currently, the flamegraph can only be triggered when a task is running.
> After FLINK-17012 and FLINK-22215, Flink split the running state into running and 
> initializing. We should allow triggering the flamegraph while a task is initializing, 
> for example to troubleshoot a very slow initialization.
>  
> Here is a stack example: the task is rebuilding RocksDB after the parallelism 
> is changed.
>  
> !image-2023-09-06-15-43-37-075.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33042) Allow trigger flamegraph when task is initializing

2023-09-09 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan resolved FLINK-33042.
-
Fix Version/s: 1.19.0
   Resolution: Fixed

> Allow trigger flamegraph when task is initializing 
> ---
>
> Key: FLINK-33042
> URL: https://issues.apache.org/jira/browse/FLINK-33042
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Runtime / Web Frontend
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-09-06-15-43-37-075.png
>
>
> Currently, the flamegraph can only be triggered when a task is running.
> After FLINK-17012 and FLINK-22215, Flink split the running state into running and 
> initializing. We should allow triggering the flamegraph while a task is initializing, 
> for example to troubleshoot a very slow initialization.
>  
> Here is a stack example: the task is rebuilding RocksDB after the parallelism 
> is changed.
>  
> !image-2023-09-06-15-43-37-075.png!
>  





[GitHub] [flink] 1996fanrui merged pull request #23372: [FLINK-33042][flamegraph] Allow trigger flamegraph when task is initializing

2023-09-09 Thread via GitHub


1996fanrui merged PR #23372:
URL: https://github.com/apache/flink/pull/23372





[jira] [Updated] (FLINK-33042) Allow trigger flamegraph when task is initializing

2023-09-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33042:
---
Labels: pull-request-available  (was: )

> Allow trigger flamegraph when task is initializing 
> ---
>
> Key: FLINK-33042
> URL: https://issues.apache.org/jira/browse/FLINK-33042
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Runtime / Web Frontend
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-09-06-15-43-37-075.png
>
>
> Currently, the flamegraph can only be triggered when a task is running.
> After FLINK-17012 and FLINK-22215, Flink split the running state into running and 
> initializing. We should allow triggering the flamegraph while a task is initializing, 
> for example to troubleshoot a very slow initialization.
>  
> Here is a stack example: the task is rebuilding RocksDB after the parallelism 
> is changed.
>  
> !image-2023-09-06-15-43-37-075.png!
>  





[GitHub] [flink] RocMarshal commented on a diff in pull request #23372: [FLINK-33042][flamegraph] Allow trigger flamegraph when task is initializing

2023-09-09 Thread via GitHub


RocMarshal commented on code in PR #23372:
URL: https://github.com/apache/flink/pull/23372#discussion_r1320564934


##
flink-runtime-web/web-dashboard/src/app/pages/job/overview/flamegraph/job-overview-drawer-flamegraph.component.html:
##
@@ -16,8 +16,13 @@
   ~ limitations under the License.
   -->
 
-
-  Operator is not running. Cannot sample back pressure.
+
+  Operator is not running or initializing. Cannot sample flame graph.

Review Comment:
     nice~






[GitHub] [flink] 1996fanrui commented on a diff in pull request #23218: [FLINK-32854][flink-runtime][JUnit5 Migration] The state package of flink-runtime module

2023-09-09 Thread via GitHub


1996fanrui commented on code in PR #23218:
URL: https://github.com/apache/flink/pull/23218#discussion_r1316013616


##
flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java:
##
@@ -19,180 +19,167 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.UUID;
 
-/** Tests for the {@link SnapshotDirectory}. */
-public class SnapshotDirectoryTest extends TestLogger {
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-private static TemporaryFolder temporaryFolder;
+/** Tests for the {@link SnapshotDirectory}. */
+class SnapshotDirectoryTest {
 
-@BeforeClass
-public static void beforeClass() throws IOException {
-temporaryFolder = new TemporaryFolder();
-temporaryFolder.create();
-}
-
-@AfterClass
-public static void afterClass() {
-temporaryFolder.delete();
-}
+@TempDir private Path temporaryFolder;
 
 /** Tests if mkdirs for snapshot directories works. */
 @Test
-public void mkdirs() throws Exception {
-File folderRoot = temporaryFolder.getRoot();
+void mkdirs() throws Exception {
+File folderRoot = temporaryFolder.toFile();
 File newFolder = new File(folderRoot, 
String.valueOf(UUID.randomUUID()));
 File innerNewFolder = new File(newFolder, 
String.valueOf(UUID.randomUUID()));
 Path path = innerNewFolder.toPath();
 
-Assert.assertFalse(newFolder.isDirectory());
-Assert.assertFalse(innerNewFolder.isDirectory());
+assertThat(newFolder).doesNotExist();
+assertThat(innerNewFolder).doesNotExist();
 SnapshotDirectory snapshotDirectory = 
SnapshotDirectory.permanent(path);
-Assert.assertFalse(snapshotDirectory.exists());
-Assert.assertFalse(newFolder.isDirectory());
-Assert.assertFalse(innerNewFolder.isDirectory());
-
-Assert.assertTrue(snapshotDirectory.mkdirs());
-Assert.assertTrue(newFolder.isDirectory());
-Assert.assertTrue(innerNewFolder.isDirectory());
-Assert.assertTrue(snapshotDirectory.exists());
+assertThat(snapshotDirectory.exists()).isFalse();
+assertThat(newFolder).doesNotExist();
+assertThat(innerNewFolder).doesNotExist();

Review Comment:
   Why were these 2 asserts changed? The original asserts are:
   
   ```
   Assert.assertFalse(newFolder.isDirectory());
   Assert.assertFalse(innerNewFolder.isDirectory());
   ```
   
   I see this class has a couple of similar changes.
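
The distinction behind this question can be shown with plain `java.io.File` calls, without AssertJ. This is a sketch that assumes AssertJ's `doesNotExist()` corresponds to `File.exists()` returning false: a regular file satisfies the old `!isDirectory()` check but would fail a does-not-exist check, so the two assertions are not interchangeable in general. The class and method names are illustrative only.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

final class ExistsVersusIsDirectory {

    /** The check the JUnit 4 version made: the path is not a directory. */
    static boolean isNotDirectory(File file) {
        return !file.isDirectory();
    }

    /** The stricter check: nothing exists at the path at all. */
    static boolean doesNotExist(File file) {
        return !file.exists();
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("snapshot-demo").toFile();
        File missing = new File(dir, "missing");
        File regular = new File(dir, "regular.txt");
        Files.createFile(regular.toPath());

        // Both checks agree for a path that was never created.
        System.out.println("missing: notDirectory=" + isNotDirectory(missing)
                + ", doesNotExist=" + doesNotExist(missing));
        // They disagree for a regular file.
        System.out.println("regular: notDirectory=" + isNotDirectory(regular)
                + ", doesNotExist=" + doesNotExist(regular));

        Files.delete(regular.toPath());
        Files.delete(dir.toPath());
    }
}
```

In the test under review the paths are fresh random UUID names, so both forms pass there; the difference only matters if something other than a directory could already occupy the path.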



##
flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStorageLoaderTest.java:
##
@@ -35,114 +35,118 @@
 import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
 import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.DynamicCodeLoadingException;
-import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
 import org.hamcrest.TypeSafeMatcher;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.HamcrestCondition.matching;
+
 /** This test validates that checkpoint storage is properly loaded from 
configuration. */
-public class CheckpointStorageLoaderTest extends TestLogger {
+class CheckpointStorageLoaderTest {
+
+private final Logger LOG = 
LoggerFactory.getLogger(CheckpointStorageLoaderTest.class);
 
-@Rule public final TemporaryFolder tmp = new TemporaryFolder();
+@TempDir private java.nio.file.Path tmp;
 
 private final ClassLoader cl = getClass().getClassLoader();
 
 @Test
-public void testNoCheckpointStorageDefined() throws Exception {
-Assert.assertFalse(
-CheckpointStorageLoader.fromConfig(new Configuration(), cl, 
null).isPresent());
+void testNoCheckpointStorageDefined() throws Exception {
+assertThat(CheckpointStorageLoader.fromConfig(new Configuration(), 

[GitHub] [flink] 1996fanrui commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-09 Thread via GitHub


1996fanrui commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1319551944


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##
@@ -496,6 +556,8 @@ private final class ExecutingStateBuilder {
 TestingDefaultExecutionGraphBuilder.newBuilder()
 .build(EXECUTOR_RESOURCE.getExecutor());
 private OperatorCoordinatorHandler operatorCoordinatorHandler;
+private Duration scalingIntervalMin = Duration.ZERO;

Review Comment:
   The default value should be the 
`SCHEDULER_SCALING_INTERVAL_MIN.defaultValue()`, right?



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -47,6 +51,10 @@
 class Executing extends StateWithExecutionGraph implements ResourceListener {
 
 private final Context context;
+private Instant lastRescale = Instant.EPOCH;

Review Comment:
   Actually, I don't understand the `lastRescale` in your implementation. A new 
`Executing` object is created after each rescale, so `lastRescale` isn't carried 
across multiple rescales.
   
   We should record `Instant.now()` as the `lastRescale` in the constructor, 
right?
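
The suggestion above can be sketched in isolation. `CooldownTracker` is a hypothetical class written for this illustration, not code from the PR: it records the creation time as the last rescale and reports how much of the minimum interval is still left. The current time is passed in explicitly so the logic can be checked without sleeping; a real caller would pass `Instant.now()`.

```java
import java.time.Duration;
import java.time.Instant;

final class CooldownTracker {

    private final Instant lastRescale;
    private final Duration scalingIntervalMin;

    /** The creation time stands in for "the rescale that produced this state". */
    CooldownTracker(Instant createdAt, Duration scalingIntervalMin) {
        if (scalingIntervalMin.isNegative()) {
            throw new IllegalArgumentException("scalingIntervalMin must not be negative");
        }
        this.lastRescale = createdAt;
        this.scalingIntervalMin = scalingIntervalMin;
    }

    Duration timeSinceLastRescale(Instant now) {
        return Duration.between(lastRescale, now);
    }

    /** Time left until another rescale is allowed; zero once the cooldown is over. */
    Duration remainingCooldown(Instant now) {
        Duration remaining = scalingIntervalMin.minus(timeSinceLastRescale(now));
        return remaining.isNegative() ? Duration.ZERO : remaining;
    }

    public static void main(String[] args) {
        Instant created = Instant.now();
        CooldownTracker tracker = new CooldownTracker(created, Duration.ofSeconds(30));
        System.out.println("Remaining right after creation: " + tracker.remainingCooldown(created));
        System.out.println("Remaining after 45s: "
                + tracker.remainingCooldown(created.plusSeconds(45)));
    }
}
```

Because the timestamp is set once at construction, no sentinel such as `Instant.EPOCH` is needed to mark "never rescaled".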



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -55,7 +63,9 @@ class Executing extends StateWithExecutionGraph implements 
ResourceListener {
 Logger logger,
 Context context,
 ClassLoader userCodeClassLoader,
-List failureCollection) {
+List failureCollection,
+Duration scalingIntervalMin,
+Duration scalingIntervalMax) {

Review Comment:
   ```suggestion
   @Nullable Duration scalingIntervalMax) {
   ```



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -67,13 +77,33 @@ class Executing extends StateWithExecutionGraph implements 
ResourceListener {
 this.context = context;
 Preconditions.checkState(
 executionGraph.getState() == JobStatus.RUNNING, "Assuming 
running execution graph");
+this.scalingIntervalMin = scalingIntervalMin;
+this.scalingIntervalMax = scalingIntervalMax;
+Preconditions.checkState(
+!scalingIntervalMin.isNegative(),
+"{} must be positive integer or 0",
+JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key());
+if (scalingIntervalMax != null) {
+Preconditions.checkState(
+scalingIntervalMax.compareTo(scalingIntervalMin) > 0,
+"{}({}) must be greater than {}({})",
+JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax,
+JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key(),
+scalingIntervalMin);
+}
 
 deploy();
 
 // check if new resources have come available in the meantime
 context.runIfState(this, this::maybeRescale, Duration.ZERO);

Review Comment:
   Once the `Executing` is created, a new rescale has just happened. Even if new 
resources have arrived, we should only rescale after `scalingIntervalMin`, right? 
If yes, we should call `rescaleWhenCooldownPeriodIsOver()` here.
   



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex 
executionVertex, JobExcepti
 
 @Override
 public void onNewResourcesAvailable() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 @Override
 public void onNewResourceRequirements() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 private void maybeRescale() {
-if (context.shouldRescale(getExecutionGraph())) {
-getLogger().info("Can change the parallelism of job. Restarting 
job.");
+final Duration timeSinceLastRescale = timeSinceLastRescale();
+rescaleScheduled = false;
+final boolean shouldForceRescale =
+(scalingIntervalMax != null)
+&& (timeSinceLastRescale.compareTo(scalingIntervalMax) 
> 0)
+&& (lastRescale != Instant.EPOCH); // initial rescale 
is not forced
+if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+if (shouldForceRescale) {
+getLogger()
+.info(
+"Time since last rescale ({}) >  {} ({}). 
Force-changing the parallelism of the job. Restarting the job.",
+timeSinceLastRescale,
+
JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax);
+} 

[jira] [Commented] (FLINK-32962) Failure to install python dependencies from requirements file

2023-09-09 Thread Aleksandr Pilipenko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17763326#comment-17763326
 ] 

Aleksandr Pilipenko commented on FLINK-32962:
-

The fix for 1.18 has been merged; I added a 
[PR|https://github.com/apache/flink/pull/23384] for the 1.17 branch.

> Failure to install python dependencies from requirements file
> -
>
> Key: FLINK-32962
> URL: https://issues.apache.org/jira/browse/FLINK-32962
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.15.4, 1.16.2, 1.18.0, 1.17.1
>Reporter: Aleksandr Pilipenko
>Assignee: Aleksandr Pilipenko
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.17.2, 1.19.0
>
>
> We have encountered an issue when Flink fails to install python dependencies 
> from requirements file if python environment contains setuptools dependency 
> version 67.5.0 or above.
> Flink job fails with following error:
> {code:java}
> py4j.protocol.Py4JJavaError: An error occurred while calling o118.await.
> : java.util.concurrent.ExecutionException: 
> org.apache.flink.table.api.TableException: Failed to wait job finish
> at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
> at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
> at 
> org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:118)
> at 
> org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:81)
> ...
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.client.program.ProgramInvocationException: Job failed 
> (JobID: 2ca4026944022ac4537c503464d4c47f)
> at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
> at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
> at 
> org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
> ... 6 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
> failed (JobID: 2ca4026944022ac4537c503464d4c47f)
> at 
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130)
> at 
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at 
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> ...
> Caused by: java.io.IOException: java.io.IOException: Failed to execute the 
> command: /Users/z3d1k/.pyenv/versions/3.9.17/bin/python -m pip install 
> --ignore-installed -r 
> /var/folders/rb/q_3h54_94b57gz_qkbp593vwgn/T/tm_localhost:63904-4178d3/blobStorage/job_2ca4026944022ac4537c503464d4c47f/blob_p-feb04bed919e628d98b1ef085111482f05bf43a1-1f5c3fda7cbdd6842e950e04d9d807c5
>  --install-option 
> --prefix=/var/folders/rb/q_3h54_94b57gz_qkbp593vwgn/T/python-dist-c17912db-5aba-439f-9e39-69cb8c957bec/python-requirements
> output:
> Usage:
>   /Users/z3d1k/.pyenv/versions/3.9.17/bin/python -m pip install [options] 
>  [package-index-options] ...
>   /Users/z3d1k/.pyenv/versions/3.9.17/bin/python -m pip install [options] -r 
>  [package-index-options] ...
>   /Users/z3d1k/.pyenv/versions/3.9.17/bin/python -m pip install [options] 
> [-e]  ...
>   /Users/z3d1k/.pyenv/versions/3.9.17/bin/python -m pip install [options] 
> [-e]  ...
>   /Users/z3d1k/.pyenv/versions/3.9.17/bin/python -m pip install [options] 
>  ...
> no such option: --install-option
> at 
> org.apache.flink.python.util.PythonEnvironmentManagerUtils.pipInstallRequirements(PythonEnvironmentManagerUtils.java:144)
> at 
> org.apache.flink.python.env.AbstractPythonEnvironmentManager.installRequirements(AbstractPythonEnvironmentManager.java:215)
> at 
> org.apache.flink.python.env.AbstractPythonEnvironmentManager.lambda$open$0(AbstractPythonEnvironmentManager.java:126)
> at 
> org.apache.flink.python.env.AbstractPythonEnvironmentManager$PythonEnvResources.createResource(AbstractPythonEnvironmentManager.java:435)
> at 
> org.apache.flink.python.env.AbstractPythonEnvironmentManager$PythonEnvResources.getOrAllocateSharedResource(AbstractPythonEnvironmentManager.java:402)
> at 
> org.apache.flink.python.env.AbstractPythonEnvironmentManager.open(AbstractPythonEnvironmentManager.java:114)
> at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:238)
> at 
> 

[jira] [Commented] (FLINK-33063) udaf with user defined pojo object throw error while generate record equaliser

2023-09-09 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17763319#comment-17763319
 ] 

Jark Wu commented on FLINK-33063:
-

Fixed in 
- master: 01cb6ef008273100718cd8a8cfd921c02b8b8caa
- release-1.18: TODO

> udaf with user defined pojo object throw error while generate record equaliser
> --
>
> Key: FLINK-33063
> URL: https://issues.apache.org/jira/browse/FLINK-33063
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> A UDAF with a user-defined POJO object throws an error while generating the record 
> equaliser: 
> When a user creates a UDAF whose record contains a user-defined complex POJO 
> object (like List or Map), the codegen 
> throws an error while generating the record equaliser. The error is:
> {code:java}
> A method named "compareTo" is not declared in any enclosing class nor any 
> subtype, nor through a static import.{code}





[jira] [Updated] (FLINK-33063) udaf with user defined pojo object throw error while generate record equaliser

2023-09-09 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-33063:

Fix Version/s: 1.19.0

> udaf with user defined pojo object throw error while generate record equaliser
> --
>
> Key: FLINK-33063
> URL: https://issues.apache.org/jira/browse/FLINK-33063
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.19.0
>
>
> A UDAF with a user-defined POJO object throws an error while generating the record 
> equaliser: 
> When a user creates a UDAF whose record contains a user-defined complex POJO 
> object (like List or Map), the codegen 
> throws an error while generating the record equaliser. The error is:
> {code:java}
> A method named "compareTo" is not declared in any enclosing class nor any 
> subtype, nor through a static import.{code}





[jira] [Updated] (FLINK-33063) udaf with user defined pojo object throw error while generate record equaliser

2023-09-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33063:
---
Labels: pull-request-available  (was: )

> udaf with user defined pojo object throw error while generate record equaliser
> --
>
> Key: FLINK-33063
> URL: https://issues.apache.org/jira/browse/FLINK-33063
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> A UDAF with a user-defined POJO object throws an error while generating the record 
> equaliser: 
> When a user creates a UDAF whose record contains a user-defined complex POJO 
> object (like List or Map), the codegen 
> throws an error while generating the record equaliser. The error is:
> {code:java}
> A method named "compareTo" is not declared in any enclosing class nor any 
> subtype, nor through a static import.{code}





[GitHub] [flink] wuchong commented on pull request #23374: [FLINK-33063][table-runtime] Fix udaf with complex user defined pojo object throw error while generate record equaliser

2023-09-09 Thread via GitHub


wuchong commented on PR #23374:
URL: https://github.com/apache/flink/pull/23374#issuecomment-1712461880

   @swuferhong could you open a cherry-pick pull request for the release-1.18 
branch?





[GitHub] [flink] wuchong merged pull request #23374: [FLINK-33063][table-runtime] Fix udaf with complex user defined pojo object throw error while generate record equaliser

2023-09-09 Thread via GitHub


wuchong merged PR #23374:
URL: https://github.com/apache/flink/pull/23374





[jira] [Updated] (FLINK-33053) Watcher leak in Zookeeper HA mode

2023-09-09 Thread Yangze Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yangze Guo updated FLINK-33053:
---
Attachment: taskmanager_flink-native-test-117-taskmanager-1-9_thread_dump 
(1).json

> Watcher leak in Zookeeper HA mode
> -
>
> Key: FLINK-33053
> URL: https://issues.apache.org/jira/browse/FLINK-33053
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.0, 1.17.1
>Reporter: Yangze Guo
>Priority: Critical
> Attachments: 26.dump.zip, 26.log, 
> taskmanager_flink-native-test-117-taskmanager-1-9_thread_dump (1).json
>
>
> We observed a watcher leak in our OLAP stress test with Zookeeper HA mode 
> enabled. The TM's watches on the JobMaster leader are not removed after the job 
> finishes.
> Here is how we reproduce this issue:
>  - Start a session cluster and enable Zookeeper HA mode.
>  - Continuously and concurrently submit short queries, e.g. WordCount, to the 
> cluster.
>  - Run echo -n wchp | nc \{zk host} \{zk port} to get the current watches.
> We can see a lot of watches on 
> /flink/\{cluster_name}/leader/\{job_id}/connection_info.
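
The last reproduction step can also be scripted. The sketch below is not part of the report: `WatchCounter` is a hypothetical helper that sends the `wchp` four-letter command over a plain socket, as the `nc` pipeline does, and counts the watches registered on paths ending in `/connection_info`. It assumes the reply lists each znode path on its own line starting with `/`, followed by one indented line per watching session, and that the server closes the connection after replying; on newer ZooKeeper versions the command may first need to be enabled through `4lw.commands.whitelist`.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

final class WatchCounter {

    /** Sends the four-letter command "wchp" and returns the raw reply. */
    static String fetchWatchesByPath(String host, int port) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            out.write("wchp".getBytes(StandardCharsets.US_ASCII));
            out.flush();
            // Assumption: the server closes the connection after answering.
            return new String(socket.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
        }
    }

    /** Counts session lines listed under paths that end with the given suffix. */
    static int countWatches(String wchpOutput, String pathSuffix) {
        int count = 0;
        boolean underMatchingPath = false;
        for (String line : wchpOutput.split("\\R")) {
            if (line.isBlank()) {
                continue;
            }
            if (line.startsWith("/")) {
                // A new path section begins; remember whether it is one we care about.
                underMatchingPath = line.trim().endsWith(pathSuffix);
            } else if (underMatchingPath) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java WatchCounter.java <zk host> <zk port>
        String reply = fetchWatchesByPath(args[0], Integer.parseInt(args[1]));
        System.out.println("connection_info watches: " + countWatches(reply, "/connection_info"));
    }
}
```

Sampling this count periodically during the stress test would show whether it keeps growing after jobs finish, which is the symptom described above.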





[jira] [Commented] (FLINK-33053) Watcher leak in Zookeeper HA mode

2023-09-09 Thread Yangze Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17763315#comment-17763315
 ] 

Yangze Guo commented on FLINK-33053:


[~tison] Yes, it seems the Curator framework at least issued the watcher removal 
request. But are any logs printed when this request succeeds?

 

[~mapohl] Thread dump of a TM with the watch leak: 
[^taskmanager_flink-native-test-117-taskmanager-1-9_thread_dump (1).json]

> Watcher leak in Zookeeper HA mode
> -
>
> Key: FLINK-33053
> URL: https://issues.apache.org/jira/browse/FLINK-33053
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.0, 1.17.1
>Reporter: Yangze Guo
>Priority: Critical
> Attachments: 26.dump.zip, 26.log, 
> taskmanager_flink-native-test-117-taskmanager-1-9_thread_dump (1).json
>
>
> We observed a watcher leak in our OLAP stress test with Zookeeper HA mode 
> enabled. The TM's watches on the JobMaster leader are not removed after the job 
> finishes.
> Here is how we reproduce this issue:
>  - Start a session cluster and enable Zookeeper HA mode.
>  - Continuously and concurrently submit short queries, e.g. WordCount, to the 
> cluster.
>  - Run echo -n wchp | nc \{zk host} \{zk port} to get the current watches.
> We can see a lot of watches on 
> /flink/\{cluster_name}/leader/\{job_id}/connection_info.





[GitHub] [flink] JunRuiLee closed pull request #23385: Test

2023-09-09 Thread via GitHub


JunRuiLee closed pull request #23385: Test
URL: https://github.com/apache/flink/pull/23385





[GitHub] [flink] flinkbot commented on pull request #23385: Test

2023-09-09 Thread via GitHub


flinkbot commented on PR #23385:
URL: https://github.com/apache/flink/pull/23385#issuecomment-1712427473

   
   ## CI report:
   
   * ef05623fab9dfdfd60770be3964bbcdb0422a4a9 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] JunRuiLee opened a new pull request, #23385: Test

2023-09-09 Thread via GitHub


JunRuiLee opened a new pull request, #23385:
URL: https://github.com/apache/flink/pull/23385

   A test PR used to trigger CI.

