[jira] [Commented] (FLINK-32513) Job in BATCH mode with a significant number of transformations freezes on method StreamGraphGenerator.existsUnboundedSource()

2024-04-08 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834798#comment-17834798
 ] 

Zhu Zhu commented on FLINK-32513:
-

{{Transformation}} is not a public interface, it is an @Internal class. 
Ideally, kafka connectors should not directly manipulate {{Transformation}}.
Production-wise, we may take a look whether there is a workaround to avoid 
break existing kafka connectors.

> Job in BATCH mode with a significant number of transformations freezes on 
> method StreamGraphGenerator.existsUnboundedSource()
> -
>
> Key: FLINK-32513
> URL: https://issues.apache.org/jira/browse/FLINK-32513
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.15.3, 1.16.1, 1.17.1
> Environment: All modes (local, k8s session, k8s application, ...)
> Flink 1.15.3
> Flink 1.16.1
> Flink 1.17.1
>Reporter: Vladislav Keda
>Assignee: Jeyhun Karimov
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
> Attachments: image-2023-07-10-17-26-46-544.png
>
>
> Flink job executed in BATCH mode with a significant number of transformations 
> (more than 30 in my case) takes very long time to start due to the method 
> StreamGraphGenerator.existsUnboundedSource(). Also, during the execution of 
> the method, a lot of memory is consumed, which causes the GC to fire 
> frequently.
> Thread Dump:
> {code:java}
> "main@1" prio=5 tid=0x1 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at java.util.ArrayList.addAll(ArrayList.java:702)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> 

[jira] [Commented] (FLINK-32513) Job in BATCH mode with a significant number of transformations freezes on method StreamGraphGenerator.existsUnboundedSource()

2024-04-04 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17833931#comment-17833931
 ] 

Martijn Visser commented on FLINK-32513:


I've filed FLINK-35009 for this issue, and linked it to this issue

> Job in BATCH mode with a significant number of transformations freezes on 
> method StreamGraphGenerator.existsUnboundedSource()
> -
>
> Key: FLINK-32513
> URL: https://issues.apache.org/jira/browse/FLINK-32513
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.15.3, 1.16.1, 1.17.1
> Environment: All modes (local, k8s session, k8s application, ...)
> Flink 1.15.3
> Flink 1.16.1
> Flink 1.17.1
>Reporter: Vladislav Keda
>Assignee: Jeyhun Karimov
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
> Attachments: image-2023-07-10-17-26-46-544.png
>
>
> Flink job executed in BATCH mode with a significant number of transformations 
> (more than 30 in my case) takes very long time to start due to the method 
> StreamGraphGenerator.existsUnboundedSource(). Also, during the execution of 
> the method, a lot of memory is consumed, which causes the GC to fire 
> frequently.
> Thread Dump:
> {code:java}
> "main@1" prio=5 tid=0x1 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at java.util.ArrayList.addAll(ArrayList.java:702)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> 

[jira] [Commented] (FLINK-32513) Job in BATCH mode with a significant number of transformations freezes on method StreamGraphGenerator.existsUnboundedSource()

2024-04-04 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17833929#comment-17833929
 ] 

Martijn Visser commented on FLINK-32513:


[~jeyhunkarimov] That shouldn't be our starting point, especially since this 
change has also been introduced in patch version. Meaning that all custom 
connectors that users have built, could suddenly also no longer work with a 
patch version. We could also potentially have to update all externalized 
connectors, who might depend on this interface. Our starting point should be 
introduce non-breaking changes. Specifics may very based on the interface 
annotation. 

> Job in BATCH mode with a significant number of transformations freezes on 
> method StreamGraphGenerator.existsUnboundedSource()
> -
>
> Key: FLINK-32513
> URL: https://issues.apache.org/jira/browse/FLINK-32513
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.15.3, 1.16.1, 1.17.1
> Environment: All modes (local, k8s session, k8s application, ...)
> Flink 1.15.3
> Flink 1.16.1
> Flink 1.17.1
>Reporter: Vladislav Keda
>Assignee: Jeyhun Karimov
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
> Attachments: image-2023-07-10-17-26-46-544.png
>
>
> Flink job executed in BATCH mode with a significant number of transformations 
> (more than 30 in my case) takes very long time to start due to the method 
> StreamGraphGenerator.existsUnboundedSource(). Also, during the execution of 
> the method, a lot of memory is consumed, which causes the GC to fire 
> frequently.
> Thread Dump:
> {code:java}
> "main@1" prio=5 tid=0x1 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at java.util.ArrayList.addAll(ArrayList.java:702)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> 

[jira] [Commented] (FLINK-32513) Job in BATCH mode with a significant number of transformations freezes on method StreamGraphGenerator.existsUnboundedSource()

2024-04-04 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17833919#comment-17833919
 ] 

Jeyhun Karimov commented on FLINK-32513:


Hi [~martijnvisser]  Thanks for reporting it. Would it be possible to fix it in 
kafka connector side, to make it compatible with releases 1.18 and 1.19?

> Job in BATCH mode with a significant number of transformations freezes on 
> method StreamGraphGenerator.existsUnboundedSource()
> -
>
> Key: FLINK-32513
> URL: https://issues.apache.org/jira/browse/FLINK-32513
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.15.3, 1.16.1, 1.17.1
> Environment: All modes (local, k8s session, k8s application, ...)
> Flink 1.15.3
> Flink 1.16.1
> Flink 1.17.1
>Reporter: Vladislav Keda
>Assignee: Jeyhun Karimov
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
> Attachments: image-2023-07-10-17-26-46-544.png
>
>
> Flink job executed in BATCH mode with a significant number of transformations 
> (more than 30 in my case) takes very long time to start due to the method 
> StreamGraphGenerator.existsUnboundedSource(). Also, during the execution of 
> the method, a lot of memory is consumed, which causes the GC to fire 
> frequently.
> Thread Dump:
> {code:java}
> "main@1" prio=5 tid=0x1 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at java.util.ArrayList.addAll(ArrayList.java:702)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> 

[jira] [Commented] (FLINK-32513) Job in BATCH mode with a significant number of transformations freezes on method StreamGraphGenerator.existsUnboundedSource()

2024-04-04 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17833912#comment-17833912
 ] 

Martijn Visser commented on FLINK-32513:


[~zhuzh] [~jeyhunkarimov] I'm seeing tests for the Flink Kafka connector weekly 
builds fail on Flink 1.19-SNAPSHOT with:

{code:java}
Error:  Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile 
(default-testCompile) on project flink-connector-kafka: Compilation failure: 
Compilation failure: 
Error:  
/home/runner/work/flink-connector-kafka/flink-connector-kafka/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java:[214,24]
 
org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators.InfiniteStringsGenerator.MockTransformation
 is not abstract and does not override abstract method 
getTransitivePredecessorsInternal() in org.apache.flink.api.dag.Transformation
Error:  
/home/runner/work/flink-connector-kafka/flink-connector-kafka/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java:[220,44]
 getTransitivePredecessors() in 
org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators.InfiniteStringsGenerator.MockTransformation
 cannot override getTransitivePredecessors() in 
org.apache.flink.api.dag.Transformation
Error:overridden method is final
{code}

Example: 
https://github.com/apache/flink-connector-kafka/actions/runs/8494349338/job/23269406762#step:15:167

Looking at getTransitivePredecessors, it seems that it was changed in an 
incompatible way with this PR. Can you double check, and fix this in Flink 
itself? 

> Job in BATCH mode with a significant number of transformations freezes on 
> method StreamGraphGenerator.existsUnboundedSource()
> -
>
> Key: FLINK-32513
> URL: https://issues.apache.org/jira/browse/FLINK-32513
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.15.3, 1.16.1, 1.17.1
> Environment: All modes (local, k8s session, k8s application, ...)
> Flink 1.15.3
> Flink 1.16.1
> Flink 1.17.1
>Reporter: Vladislav Keda
>Assignee: Jeyhun Karimov
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
> Attachments: image-2023-07-10-17-26-46-544.png
>
>
> Flink job executed in BATCH mode with a significant number of transformations 
> (more than 30 in my case) takes very long time to start due to the method 
> StreamGraphGenerator.existsUnboundedSource(). Also, during the execution of 
> the method, a lot of memory is consumed, which causes the GC to fire 
> frequently.
> Thread Dump:
> {code:java}
> "main@1" prio=5 tid=0x1 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at java.util.ArrayList.addAll(ArrayList.java:702)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> 

[jira] [Commented] (FLINK-32513) Job in BATCH mode with a significant number of transformations freezes on method StreamGraphGenerator.existsUnboundedSource()

2023-12-21 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799647#comment-17799647
 ] 

Zhu Zhu commented on FLINK-32513:
-

The problem happens because the {{getTransitivePredecessors()}} of 
{{TwoInputTransformation}} is not properly implemented, which results in a 
2^N(N = number of TwoInputTransformation) time cost to iterate and space cost 
to store the predecessors.
A possible solution can be adding a cache of predecessors for each 
Transformation and using LinkedHashSet to deduplicate the predecessors.
Note that a few other transformations can lead to the same problem too, e.g. 
UnionTransformation, AbstractMultipleInputTransformation.

> Job in BATCH mode with a significant number of transformations freezes on 
> method StreamGraphGenerator.existsUnboundedSource()
> -
>
> Key: FLINK-32513
> URL: https://issues.apache.org/jira/browse/FLINK-32513
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.15.3, 1.16.1, 1.17.1
> Environment: All modes (local, k8s session, k8s application, ...)
> Flink 1.15.3
> Flink 1.16.1
> Flink 1.17.1
>Reporter: Vladislav Keda
>Priority: Critical
> Attachments: image-2023-07-10-17-26-46-544.png
>
>
> Flink job executed in BATCH mode with a significant number of transformations 
> (more than 30 in my case) takes very long time to start due to the method 
> StreamGraphGenerator.existsUnboundedSource(). Also, during the execution of 
> the method, a lot of memory is consumed, which causes the GC to fire 
> frequently.
> Thread Dump:
> {code:java}
> "main@1" prio=5 tid=0x1 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at java.util.ArrayList.addAll(ArrayList.java:702)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> 

[jira] [Commented] (FLINK-32513) Job in BATCH mode with a significant number of transformations freezes on method StreamGraphGenerator.existsUnboundedSource()

2023-11-23 Thread Vladislav Keda (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789203#comment-17789203
 ] 

Vladislav Keda commented on FLINK-32513:


Hi [~huwh]! I would like to ask if there are any news on this issue?

> Job in BATCH mode with a significant number of transformations freezes on 
> method StreamGraphGenerator.existsUnboundedSource()
> -
>
> Key: FLINK-32513
> URL: https://issues.apache.org/jira/browse/FLINK-32513
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.15.3, 1.16.1, 1.17.1
> Environment: All modes (local, k8s session, k8s application, ...)
> Flink 1.15.3
> Flink 1.16.1
> Flink 1.17.1
>Reporter: Vladislav Keda
>Priority: Critical
> Attachments: image-2023-07-10-17-26-46-544.png
>
>
> Flink job executed in BATCH mode with a significant number of transformations 
> (more than 30 in my case) takes very long time to start due to the method 
> StreamGraphGenerator.existsUnboundedSource(). Also, during the execution of 
> the method, a lot of memory is consumed, which causes the GC to fire 
> frequently.
> Thread Dump:
> {code:java}
> "main@1" prio=5 tid=0x1 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at java.util.ArrayList.addAll(ArrayList.java:702)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> 

[jira] [Commented] (FLINK-32513) Job in BATCH mode with a significant number of transformations freezes on method StreamGraphGenerator.existsUnboundedSource()

2023-07-10 Thread Vladislav Keda (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741639#comment-17741639
 ] 

Vladislav Keda commented on FLINK-32513:


Hi [~huwh] , this job is automatically generated by our config file parser. So 
I can't give you Flink code example, but I can give you a pseudo description of 
the task topology:

The job graph consists of a Kafka Source, 20 consecutive "Nodes" and a Kafka 
Sink. Each "Node" represents 3 transformations shown in the picture.
!image-2023-07-10-17-26-46-544.png|width=493,height=264!

> Job in BATCH mode with a significant number of transformations freezes on 
> method StreamGraphGenerator.existsUnboundedSource()
> -
>
> Key: FLINK-32513
> URL: https://issues.apache.org/jira/browse/FLINK-32513
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.15.3, 1.16.1, 1.17.1
> Environment: All modes (local, k8s session, k8s application, ...)
> Flink 1.15.3
> Flink 1.16.1
> Flink 1.17.1
>Reporter: Vladislav Keda
>Priority: Critical
> Attachments: image-2023-07-10-17-23-30-472.png, 
> image-2023-07-10-17-26-46-544.png
>
>
> Flink job executed in BATCH mode with a significant number of transformations 
> (more than 30 in my case) takes very long time to start due to the method 
> StreamGraphGenerator.existsUnboundedSource(). Also, during the execution of 
> the method, a lot of memory is consumed, which causes the GC to fire 
> frequently.
> Thread Dump:
> {code:java}
> "main@1" prio=5 tid=0x1 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at java.util.ArrayList.addAll(ArrayList.java:702)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> 

[jira] [Commented] (FLINK-32513) Job in BATCH mode with a significant number of transformations freezes on method StreamGraphGenerator.existsUnboundedSource()

2023-07-04 Thread Weihua Hu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740011#comment-17740011
 ] 

Weihua Hu commented on FLINK-32513:
---

Thanks [~vladislav.keda] reporting this. Could you provide the job topology to 
help reproduce it?

> Job in BATCH mode with a significant number of transformations freezes on 
> method StreamGraphGenerator.existsUnboundedSource()
> -
>
> Key: FLINK-32513
> URL: https://issues.apache.org/jira/browse/FLINK-32513
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.15.3, 1.16.1, 1.17.1
> Environment: All modes (local, k8s session, k8s application, ...)
> Flink 1.15.3
> Flink 1.16.1
> Flink 1.17.1
>Reporter: Vladislav Keda
>Priority: Critical
>
> Flink job executed in BATCH mode with a significant number of transformations 
> (more than 30 in my case) takes very long time to start due to the method 
> StreamGraphGenerator.existsUnboundedSource(). Also, during the execution of 
> the method, a lot of memory is consumed, which causes the GC to fire 
> frequently.
> Thread Dump:
> {code:java}
> "main@1" prio=5 tid=0x1 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at java.util.ArrayList.addAll(ArrayList.java:702)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
>