[jira] [Created] (FLINK-34105) Akka timeout happens in TPC-DS benchmarks
Zhu Zhu created FLINK-34105: --- Summary: Akka timeout happens in TPC-DS benchmarks Key: FLINK-34105 URL: https://issues.apache.org/jira/browse/FLINK-34105 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.19.0 Reporter: Zhu Zhu Attachments: image-2024-01-16-13-59-45-556.png We noticed Akka timeouts happening in 10TB TPC-DS benchmarks in 1.19. The problem did not happen in 1.18.0. After bisecting, we found that the problem was introduced in FLINK-33532. !image-2024-01-16-13-59-45-556.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33677) Remove flink-conf.yaml from flink dist
Zhu Zhu created FLINK-33677: --- Summary: Remove flink-conf.yaml from flink dist Key: FLINK-33677 URL: https://issues.apache.org/jira/browse/FLINK-33677 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Reporter: Zhu Zhu FLINK-33297/FLIP-366 supports parsing standard YAML files for Flink configuration. A new configuration file, config.yaml, which should be a standard YAML file, is introduced. To ensure compatibility, in Flink 1.x, the old configuration parser will still be used if the old configuration file flink-conf.yaml exists; the new configuration file will be used only if it does not. In Flink 2.0, we should remove the old configuration file from flink dist, as well as the old configuration parser.
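For illustration, since config.yaml is standard YAML, options can be expressed as regular nested YAML mappings. A minimal sketch, assuming common option names such as taskmanager.numberOfTaskSlots (consult the Flink configuration reference for the exact keys):

```yaml
# config.yaml -- standard YAML, handled by the new FLIP-366 parser
taskmanager:
  numberOfTaskSlots: 2
jobmanager:
  memory:
    process:
      size: 1600m
```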
[jira] [Created] (FLINK-32654) Deprecate ExecutionConfig#canEqual(obj)
Zhu Zhu created FLINK-32654: --- Summary: Deprecate ExecutionConfig#canEqual(obj) Key: FLINK-32654 URL: https://issues.apache.org/jira/browse/FLINK-32654 Project: Flink Issue Type: Technical Debt Components: Runtime / Configuration Environment: ExecutionConfig#canEqual(obj) checks whether the object is an instance of ExecutionConfig. It is not intended to be used by users but was used for internal comparison. Unfortunately, it was exposed as `@Public` because ExecutionConfig is `@Public`. We should deprecate it so that we can remove it in Flink 2.0. Reporter: Zhu Zhu Fix For: 1.18.0
[jira] [Created] (FLINK-32384) Remove deprecated configuration keys which violate YAML spec
Zhu Zhu created FLINK-32384: --- Summary: Remove deprecated configuration keys which violate YAML spec Key: FLINK-32384 URL: https://issues.apache.org/jira/browse/FLINK-32384 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Reporter: Zhu Zhu Fix For: 2.0.0 In FLINK-29372, keys that violate the YAML spec were renamed to a valid form and the old names were deprecated. In Flink 2.0 we should remove these deprecated keys. This will prevent users from (unintentionally) creating an invalid YAML flink-conf.yaml. Then, with the work of FLINK-23620, we can remove the non-standard YAML parsing logic and enforce standard YAML validation in CI.
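As an illustration of the class of violation (assuming state.backend is among the renamed keys; the authoritative list is in FLINK-29372): in standard YAML a key cannot be both a scalar and the parent of other keys, so a flat key that is also a prefix of another key has no valid nested representation.

```yaml
# Invalid when expressed as nested standard YAML: "state.backend" would
# need to be both a scalar and a mapping at the same time.
#   state:
#     backend: rocksdb       # scalar value ...
#     backend:               # ... and a parent mapping: not representable
#       incremental: true
# The renamed, spec-compliant form avoids the clash:
state.backend.type: rocksdb
state.backend.incremental: true
```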
[jira] [Created] (FLINK-32383) 2.0 Breaking configuration changes
Zhu Zhu created FLINK-32383: --- Summary: 2.0 Breaking configuration changes Key: FLINK-32383 URL: https://issues.apache.org/jira/browse/FLINK-32383 Project: Flink Issue Type: Technical Debt Components: Runtime / Configuration Reporter: Zhu Zhu Fix For: 2.0.0 Umbrella issue for all breaking changes to Flink configuration.
[jira] [Created] (FLINK-31052) Release Testing: Verify FLINK-30707 Improve slow task detection
Zhu Zhu created FLINK-31052: --- Summary: Release Testing: Verify FLINK-30707 Improve slow task detection Key: FLINK-31052 URL: https://issues.apache.org/jira/browse/FLINK-31052 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.17.0 This task aims to verify [FLINK-30707 which improves the slow task detection|https://issues.apache.org/jira/browse/FLINK-30707]. The slow task detection now takes the input data volume of tasks into account. Tasks which have a longer execution time but consume more data may not be considered slow. This improvement helps to eliminate the negative impact of data skew on slow task detection. The documentation of speculative execution can be found [here|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#speculative-execution] . One can verify it by creating intended data skew.
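The idea above can be sketched in a few lines. This is a hypothetical simplification (the actual FLINK-30707 formula is not quoted in this ticket and may differ): compare throughput-normalized speed instead of raw execution time, so data skew alone does not mark a task slow.

```java
// Hypothetical simplification of data-volume-aware slow task detection;
// names and the exact formula are illustrative, not Flink's actual code.
class VolumeAwareSlowness {
    /** Bytes processed per millisecond. */
    static double speed(long inputBytes, long executionTimeMillis) {
        return (double) inputBytes / Math.max(1, executionTimeMillis);
    }

    /** Slow only if normalized speed is well below the sibling median. */
    static boolean isSlow(long bytes, long millis, double medianSpeed, double ratio) {
        return speed(bytes, millis) < medianSpeed * ratio;
    }

    public static void main(String[] args) {
        double median = speed(1_000_000, 1_000); // 1000 bytes/ms
        // Skewed task: 10x data and 10x time -> same speed -> not slow.
        System.out.println(isSlow(10_000_000, 10_000, median, 0.5)); // false
        // Genuinely slow task: same data, 10x time.
        System.out.println(isSlow(1_000_000, 10_000, median, 0.5)); // true
    }
}
```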
[jira] [Created] (FLINK-31005) Release Testing: Verify FLIP-281 Supports speculative execution of sinks
Zhu Zhu created FLINK-31005: --- Summary: Release Testing: Verify FLIP-281 Supports speculative execution of sinks Key: FLINK-31005 URL: https://issues.apache.org/jira/browse/FLINK-31005 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.17.0 This task aims to verify [FLIP-281 Supports speculative execution of sinks|https://issues.apache.org/jira/browse/FLINK-30725]. The documentation can be found [here|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#enable-sinks-for-speculative-execution] . Things to verify: 1. If a sink implements the decorative interface {{SupportsConcurrentExecutionAttempts}}, speculative executions can be performed; otherwise not. Sinks to verify include SinkFunction, OutputFormat and Sink(V2). 2. These built-in sinks support speculative execution: DiscardingSink, PrintSinkFunction, PrintSink, FileSink, FileSystemOutputFormat, HiveTableSink If it's hard to construct a case in which speculative execution would happen, especially for those built-in sinks, the speculative execution configuration can be tuned to make it easier to happen, e.g. set {{slow-task-detector.execution-time.baseline-lower-bound}} and {{slow-task-detector.execution-time.baseline-ratio}} to {{0}}.
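For reference, a configuration sketch that makes speculation trigger easily, using the two keys named above. The enabling switch shown here (execution.batch.speculative.enabled) is an assumption; consult the speculative execution documentation for the exact key.

```yaml
# Make speculative executions trigger as easily as possible (testing only)
execution.batch.speculative.enabled: true
slow-task-detector.execution-time.baseline-lower-bound: 0 ms
slow-task-detector.execution-time.baseline-ratio: 0
```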
[jira] [Created] (FLINK-30904) Update the documentation and configuration description of slow task detector
Zhu Zhu created FLINK-30904: --- Summary: Update the documentation and configuration description of slow task detector Key: FLINK-30904 URL: https://issues.apache.org/jira/browse/FLINK-30904 Project: Flink Issue Type: Improvement Components: Documentation, Runtime / Configuration Environment: FLINK-30707 improved slow task detection. The previous documentation and configuration descriptions of SlowTaskDetector need to be updated accordingly. Reporter: Zhu Zhu Fix For: 1.17.0
[jira] [Created] (FLINK-30834) SortAggITCase.testLeadLag is unstable in ALL_EXCHANGES_BLOCKING mode
Zhu Zhu created FLINK-30834: --- Summary: SortAggITCase.testLeadLag is unstable in ALL_EXCHANGES_BLOCKING mode Key: FLINK-30834 URL: https://issues.apache.org/jira/browse/FLINK-30834 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.16.1, 1.17.0 Reporter: Zhu Zhu When investigating the problem of FLINK-30828, we found that SortAggITCase.testLeadLag works well in ALL_EXCHANGES_PIPELINED mode. However, it becomes unstable when run in ALL_EXCHANGES_BLOCKING mode, which is unexpected.
[jira] [Created] (FLINK-30815) BatchTestBase/BatchAbstractTestBase are using Junit4 while some child tests are using JUnit5
Zhu Zhu created FLINK-30815: --- Summary: BatchTestBase/BatchAbstractTestBase are using Junit4 while some child tests are using JUnit5 Key: FLINK-30815 URL: https://issues.apache.org/jira/browse/FLINK-30815 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.16.0 Reporter: Zhu Zhu BatchTestBase/BatchAbstractTestBase are using JUnit4, while some child tests (e.g. DynamicFilteringITCase) are using JUnit5. This may break some assumptions and hide some problems. For example, a child test will create a MiniCluster by itself, instead of using the MiniCluster (TM=1, slots=3) created in BatchAbstractTestBase. The created MiniCluster may have more slots and hide resource deadlock issues.
[jira] [Created] (FLINK-30814) The parallelism of sort after a global partitioning is forced to be 1
Zhu Zhu created FLINK-30814: --- Summary: The parallelism of sort after a global partitioning is forced to be 1 Key: FLINK-30814 URL: https://issues.apache.org/jira/browse/FLINK-30814 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.16.0 Reporter: Zhu Zhu Fix For: 1.17.0 The parallelism of sort after a global partitioning is forced to be 1. This may lead to the parallelism being changed by the adaptive batch scheduler, which is unexpected.
[jira] [Created] (FLINK-30480) Add benchmarks for adaptive batch scheduler
Zhu Zhu created FLINK-30480: --- Summary: Add benchmarks for adaptive batch scheduler Key: FLINK-30480 URL: https://issues.apache.org/jira/browse/FLINK-30480 Project: Flink Issue Type: Improvement Components: Benchmarks, Runtime / Coordination Reporter: Zhu Zhu Currently we only have benchmarks of DefaultScheduler(FLINK-20612). We should also have benchmarks of AdaptiveBatchScheduler to identify initializing/scheduling/deployment performance problems or regressions.
[jira] [Created] (FLINK-29989) Enable FlameGraph for arbitrary thread on TaskManager
Zhu Zhu created FLINK-29989: --- Summary: Enable FlameGraph for arbitrary thread on TaskManager Key: FLINK-29989 URL: https://issues.apache.org/jira/browse/FLINK-29989 Project: Flink Issue Type: Improvement Components: Runtime / Web Frontend Reporter: Zhu Zhu FlameGraph for arbitrary thread on TaskManager can be helpful for tasks which will spawn other worker threads. See FLINK-29629 for more details.
[jira] [Created] (FLINK-28981) Release Testing: Verify FLIP-245 sources speculative execution
Zhu Zhu created FLINK-28981: --- Summary: Release Testing: Verify FLIP-245 sources speculative execution Key: FLINK-28981 URL: https://issues.apache.org/jira/browse/FLINK-28981 Project: Flink Issue Type: Sub-task Components: Connectors / Common, Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.16.0 Speculative execution is introduced in Flink 1.16 to deal with temporary slow tasks caused by slow nodes. This feature currently consists of 4 FLIPs: - FLIP-168: Speculative Execution core part - FLIP-224: Blocklist Mechanism - FLIP-245: Source Supports Speculative Execution - FLIP-249: Flink Web UI Enhancement for Speculative Execution This ticket aims to verify FLIP-245, along with FLIP-168, FLIP-224 and FLIP-249. More details about this feature and how to use it can be found in this documentation [PR|https://github.com/apache/flink/pull/20507]. To do the verification, the process can be: - Write Flink jobs which have some {{source}} subtasks running much slower than others. 3 kinds of sources should be verified, including - [Source functions|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java] - [InputFormat sources|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java] - [FLIP-27 new sources|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java] - Modify the Flink configuration file to enable speculative execution and tune the configuration as you like - Submit the job. Check the web UI, logs, metrics and the produced result.
[jira] [Created] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution
Zhu Zhu created FLINK-28980: --- Summary: Release Testing: Verify FLIP-168 speculative execution Key: FLINK-28980 URL: https://issues.apache.org/jira/browse/FLINK-28980 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.16.0 Speculative execution is introduced in Flink 1.16 to deal with temporary slow tasks caused by slow nodes. More details about this feature can be found in this documentation [PR|https://github.com/apache/flink/pull/20507]. This feature currently consists of 4 FLIPs: - FLIP-168: Speculative Execution core part - FLIP-224: Blocklist Mechanism - FLIP-245: Source Supports Speculative Execution - FLIP-249: Flink Web UI Enhancement for Speculative Execution This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249. To do the verification, the process can be: - Write a Flink job which has a subtask running much slower than others (e.g. sleep indefinitely if it runs on a certain host; the hostname can be retrieved via InetAddress.getLocalHost().getHostName(); or if its (subtaskIndex + attemptNumber) % 2 == 0) - Modify the Flink configuration file to enable speculative execution and tune the configuration as you like - Submit the job. Check the web UI, logs, metrics and the produced result.
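The deterministic "slow subtask" trick suggested above can be sketched as a tiny predicate. This is illustrative test-job logic, not Flink code: stall whenever (subtaskIndex + attemptNumber) is even, so an original attempt is slow while its speculative attempt is not.

```java
// Sketch of the slow-subtask predicate from the ticket (illustrative).
class SlowSubtaskPredicate {
    static boolean shouldRunSlow(int subtaskIndex, int attemptNumber) {
        return (subtaskIndex + attemptNumber) % 2 == 0;
    }

    public static void main(String[] args) {
        System.out.println(shouldRunSlow(0, 0)); // true: original attempt stalls
        System.out.println(shouldRunSlow(0, 1)); // false: speculative attempt runs fast
    }
}
```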
[jira] [Created] (FLINK-28907) Flink docs does not compile
Zhu Zhu created FLINK-28907: --- Summary: Flink docs does not compile Key: FLINK-28907 URL: https://issues.apache.org/jira/browse/FLINK-28907 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.16.0 Reporter: Zhu Zhu Fix For: 1.16.0 Flink docs suddenly fail to compile in my local environment, without any new change or rebase. The error is as below:
go: github.com/apache/flink-connector-elasticsearch/docs upgrade => v0.0.0-20220715033920-cbeb08187b3a
hugo: collected modules in 1832 ms
Start building sites …
ERROR 2022/08/10 17:48:29 [en] REF_NOT_FOUND: Ref "docs/connectors/table/elasticsearch": "/XXX/docs/content/docs/connectors/table/formats/overview.md:54:20": page not found
ERROR 2022/08/10 17:48:29 [en] REF_NOT_FOUND: Ref "docs/connectors/datastream/elasticsearch": "/XXX/docs/content/docs/connectors/datastream/overview.md:44:20": page not found
ERROR 2022/08/10 17:48:29 [en] REF_NOT_FOUND: Ref "docs/connectors/table/elasticsearch": "/XXX/docs/content/docs/connectors/table/overview.md:58:20": page not found
WARN 2022/08/10 17:48:29 Expand shortcode is deprecated. Use 'details' instead.
ERROR 2022/08/10 17:48:32 [zh] REF_NOT_FOUND: Ref "docs/connectors/table/elasticsearch": "/XXX/docs/content.zh/docs/connectors/table/formats/overview.md:54:20": page not found
ERROR 2022/08/10 17:48:32 [zh] REF_NOT_FOUND: Ref "docs/connectors/datastream/elasticsearch": "/XXX/docs/content.zh/docs/connectors/datastream/overview.md:43:20": page not found
ERROR 2022/08/10 17:48:32 [zh] REF_NOT_FOUND: Ref "docs/connectors/table/elasticsearch": "/XXX/docs/content.zh/docs/connectors/table/overview.md:58:20": page not found
WARN 2022/08/10 17:48:32 Expand shortcode is deprecated. Use 'details' instead.
Built in 6415 ms
Error: Error building site: logged 6 error(s)
[jira] [Created] (FLINK-28771) Assign speculative execution attempt with correct CREATED timestamp
Zhu Zhu created FLINK-28771: --- Summary: Assign speculative execution attempt with correct CREATED timestamp Key: FLINK-28771 URL: https://issues.apache.org/jira/browse/FLINK-28771 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.16.0 Reporter: Zhu Zhu Assignee: Zhu Zhu Fix For: 1.16.0 Currently, a newly created speculative execution attempt is assigned a wrong CREATED timestamp in SpeculativeScheduler. We need to fix it.
[jira] [Created] (FLINK-28759) Enable speculative execution in AdaptiveBatchScheduler TPC-DS e2e tests
Zhu Zhu created FLINK-28759: --- Summary: Enable speculative execution in AdaptiveBatchScheduler TPC-DS e2e tests Key: FLINK-28759 URL: https://issues.apache.org/jira/browse/FLINK-28759 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Tests Reporter: Zhu Zhu Fix For: 1.16.0 To verify the correctness of speculative execution, we can enable it in the AdaptiveBatchScheduler TPC-DS e2e tests, which run a lot of different batch jobs and verify the results. Note that we need to disable the blocklist (by setting the block duration to 0) in such single-machine e2e tests.
[jira] [Created] (FLINK-28612) Cancel pending slot allocation after canceling executions
Zhu Zhu created FLINK-28612: --- Summary: Cancel pending slot allocation after canceling executions Key: FLINK-28612 URL: https://issues.apache.org/jira/browse/FLINK-28612 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.16.0 Canceling pending slot allocations before canceling executions will result in execution failures and pollute the logs. It will also result in an execution becoming FAILED even if the execution vertex has FINISHED, which breaks the assumption of SpeculativeScheduler#isExecutionVertexPossibleToFinish().
[jira] [Created] (FLINK-28610) Enable speculative execution of sources
Zhu Zhu created FLINK-28610: --- Summary: Enable speculative execution of sources Key: FLINK-28610 URL: https://issues.apache.org/jira/browse/FLINK-28610 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.16.0 Currently, speculative execution of sources is disabled. It can be enabled once the improvements that make InputFormat sources and new sources work correctly with speculative execution are in place.
[jira] [Created] (FLINK-28586) Speculative execution for new sources
Zhu Zhu created FLINK-28586: --- Summary: Speculative execution for new sources Key: FLINK-28586 URL: https://issues.apache.org/jira/browse/FLINK-28586 Project: Flink Issue Type: Sub-task Components: Connectors / Common, Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.16.0 This task enables new sources (FLIP-27) for speculative execution.
[jira] [Created] (FLINK-28585) Speculative execution for InputFormat sources
Zhu Zhu created FLINK-28585: --- Summary: Speculative execution for InputFormat sources Key: FLINK-28585 URL: https://issues.apache.org/jira/browse/FLINK-28585 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.16.0 This task enables InputFormat sources for speculative execution.
[jira] [Created] (FLINK-28402) Create FailureHandlingResultSnapshot with the truly failed execution
Zhu Zhu created FLINK-28402: --- Summary: Create FailureHandlingResultSnapshot with the truly failed execution Key: FLINK-28402 URL: https://issues.apache.org/jira/browse/FLINK-28402 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.16.0 Previously, FailureHandlingResultSnapshot was always created to treat the only current attempt of an execution vertex as the failed execution. This is no longer correct in speculative execution cases, in which an execution vertex can have multiple current executions, and any of them may fail.
[jira] [Created] (FLINK-28258) Introduce ExecutionHistory to host historical executions for each execution vertex
Zhu Zhu created FLINK-28258: --- Summary: Introduce ExecutionHistory to host historical executions for each execution vertex Key: FLINK-28258 URL: https://issues.apache.org/jira/browse/FLINK-28258 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.16.0 With speculative execution, tracking prior executions in an {{EvictingBoundedList}} does not work. This is because {{EvictingBoundedList}} relies on the assumption that historical executions are added successively, in ascending order of attempt number. This is no longer true if speculative execution is enabled, e.g. 3 speculative execution attempts #1, #2, #3 are running concurrently; later #3 fails, then #1 fails, while execution attempt #2 keeps running. The broken assumption may result in exceptions in REST, job archiving and so on. We propose to introduce an {{ExecutionHistory}} to replace {{EvictingBoundedList}}. It hosts the historical executions in a {{LinkedHashMap}} with a size bound. When the map grows beyond the size bound, elements are dropped from the head of the map (FIFO order).
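The size-bounded FIFO map described above can be sketched with {{LinkedHashMap#removeEldestEntry}}. A minimal sketch with illustrative names, not Flink's actual classes:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Historical executions keyed by attempt number, evicted in FIFO
// (insertion) order once the size bound is exceeded.
class BoundedExecutionHistory<E> {
    private final Map<Integer, E> history;

    BoundedExecutionHistory(int sizeLimit) {
        // Insertion-ordered map; removeEldestEntry yields FIFO eviction.
        this.history = new LinkedHashMap<Integer, E>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, E> eldest) {
                return size() > sizeLimit;
            }
        };
    }

    void add(int attemptNumber, E execution) {
        history.put(attemptNumber, execution);
    }

    E get(int attemptNumber) {
        return history.get(attemptNumber);
    }

    int size() {
        return history.size();
    }

    public static void main(String[] args) {
        BoundedExecutionHistory<String> h = new BoundedExecutionHistory<>(2);
        // Attempts may complete out of order: #3 fails first, then #1.
        h.add(3, "attempt-3");
        h.add(1, "attempt-1");
        h.add(2, "attempt-2");
        System.out.println(h.get(3)); // evicted first (FIFO): prints null
        System.out.println(h.get(2)); // prints attempt-2
    }
}
```

Unlike an attempt-number-indexed list, insertion order here is independent of attempt numbers, which is exactly what the out-of-order failure scenario needs.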
[jira] [Created] (FLINK-28193) Enable to identify whether a job vertex contains source/sink operators
Zhu Zhu created FLINK-28193: --- Summary: Enable to identify whether a job vertex contains source/sink operators Key: FLINK-28193 URL: https://issues.apache.org/jira/browse/FLINK-28193 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.16.0 Speculative execution does not support sources/sinks in the first version. Therefore, it will not create speculative executions for vertices which contain source/sink operators. Note that a job vertex with no input/output does not mean it is a source/sink vertex. Multi-input sources can have inputs, and it's possible that a vertex with no output edge does not contain any sink operator. Besides that, a new sink with a topology can spread the sink logic into multiple job vertices connected with job edges.
[jira] [Created] (FLINK-28139) Add documentation for speculative execution
Zhu Zhu created FLINK-28139: --- Summary: Add documentation for speculative execution Key: FLINK-28139 URL: https://issues.apache.org/jira/browse/FLINK-28139 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.16.0
[jira] [Created] (FLINK-28138) Add metrics for speculative execution
Zhu Zhu created FLINK-28138: --- Summary: Add metrics for speculative execution Key: FLINK-28138 URL: https://issues.apache.org/jira/browse/FLINK-28138 Project: Flink Issue Type: Sub-task Components: Documentation Reporter: Zhu Zhu Fix For: 1.16.0 The following two metrics will be added to expose job problems and show the effectiveness of speculative execution: # *numSlowExecutionVertices*: Number of slow execution vertices at the moment. # *numEffectiveSpeculativeExecutions*: Number of speculative executions which finish before their corresponding original executions finish.
[jira] [Created] (FLINK-28137) Introduce SpeculativeScheduler
Zhu Zhu created FLINK-28137: --- Summary: Introduce SpeculativeScheduler Key: FLINK-28137 URL: https://issues.apache.org/jira/browse/FLINK-28137 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.16.0 A SpeculativeScheduler will be used if speculative execution is enabled. It extends AdaptiveBatchScheduler so that speculative execution can work along with the feature to adaptively tune parallelisms for batch jobs. The major differences of SpeculativeScheduler are: * SpeculativeScheduler needs to be able to directly deploy an Execution, while AdaptiveBatchScheduler can only perform ExecutionVertex-level deployment. * SpeculativeScheduler does not restart the ExecutionVertex if an execution fails while any other current execution is still making progress. * SpeculativeScheduler listens for slow tasks. Once there are slow tasks, it will block the slow nodes and deploy speculative executions of the slow tasks on other nodes. * Once any execution finishes, SpeculativeScheduler will cancel all the remaining executions of the same execution vertex.
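The last rule above (cancel all siblings once one attempt finishes) can be sketched as a tiny state machine. Names and structure are illustrative, not Flink's actual classes:

```java
import java.util.HashSet;
import java.util.Set;

// Per-vertex bookkeeping: when one current execution finishes, every
// other current execution of the same vertex gets canceled.
class SpeculativeVertexState {
    private final Set<Integer> currentAttempts = new HashSet<>();
    private final Set<Integer> canceledAttempts = new HashSet<>();

    void addAttempt(int attemptNumber) {
        currentAttempts.add(attemptNumber);
    }

    /** Called when one attempt finishes; returns the canceled siblings. */
    Set<Integer> onAttemptFinished(int finishedAttempt) {
        for (int attempt : currentAttempts) {
            if (attempt != finishedAttempt) {
                canceledAttempts.add(attempt);
            }
        }
        currentAttempts.removeAll(canceledAttempts);
        return canceledAttempts;
    }

    public static void main(String[] args) {
        SpeculativeVertexState vertex = new SpeculativeVertexState();
        vertex.addAttempt(0);
        vertex.addAttempt(1);
        vertex.addAttempt(2);
        // Attempt 1 finishes first: attempts 0 and 2 are canceled.
        System.out.println(vertex.onAttemptFinished(1)); // e.g. [0, 2]
    }
}
```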
[jira] [Created] (FLINK-28136) Implement ExecutionTimeBasedSlowTaskDetector
Zhu Zhu created FLINK-28136: --- Summary: Implement ExecutionTimeBasedSlowTaskDetector Key: FLINK-28136 URL: https://issues.apache.org/jira/browse/FLINK-28136 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.16.0 In the first version of speculative execution, an ExecutionTimeBasedSlowTaskDetector will be used to detect slow tasks. For ExecutionTimeBasedSlowTaskDetector, if a task's execution time is much longer than that of most tasks of the same JobVertex, the task will be identified as slow. More specifically, it will compute an execution time baseline for each JobVertex. Tasks which execute longer than or equal to the baseline will be identified as slow tasks.
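The baseline idea can be sketched as follows. The exact formula is not specified in this ticket, so the median, ratio and lower bound below are assumptions for illustration only:

```java
import java.util.Arrays;

// Hypothetical per-JobVertex execution-time baseline: a high-water mark
// derived from the median execution time, scaled by a ratio and floored
// by a lower bound (both configurable in the real detector).
class ExecutionTimeBaseline {
    static long baseline(long[] executionTimesMillis, double ratio, long lowerBoundMillis) {
        long[] sorted = executionTimesMillis.clone();
        Arrays.sort(sorted);
        long median = sorted[sorted.length / 2];
        return Math.max((long) (median * ratio), lowerBoundMillis);
    }

    /** Tasks running longer than or equal to the baseline are slow. */
    static boolean isSlow(long executionTimeMillis, long baselineMillis) {
        return executionTimeMillis >= baselineMillis;
    }

    public static void main(String[] args) {
        long b = baseline(new long[] {10_000, 11_000, 12_000, 100_000}, 1.5, 0);
        System.out.println(b);                  // prints 18000
        System.out.println(isSlow(100_000, b)); // prints true
        System.out.println(isSlow(12_000, b));  // prints false
    }
}
```

Setting the ratio and lower bound to 0 makes every running task exceed the baseline, which is why release-testing tickets suggest those values to force speculation.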
[jira] [Created] (FLINK-28135) Introduce SlowTaskDetector
Zhu Zhu created FLINK-28135: --- Summary: Introduce SlowTaskDetector Key: FLINK-28135 URL: https://issues.apache.org/jira/browse/FLINK-28135 Project: Flink Issue Type: Sub-task Reporter: Zhu Zhu Fix For: 1.16.0 A SlowTaskDetector will periodically check all the current tasks/executions and notify the SlowTaskDetectorListener about the detected slow tasks. SpeculativeScheduler will register itself as the SlowTaskDetectorListener.
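The detector/listener contract described above can be sketched as follows; the real Flink interfaces and signatures may differ, and the fixed-threshold check here is a placeholder for a real detection strategy:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// The scheduler registers itself as the listener and is notified
// whenever a periodic check finds slow tasks.
interface SlowTaskDetectorListener {
    void notifySlowTasks(Collection<String> slowTaskIds);
}

class ThresholdSlowTaskDetector {
    private final long thresholdMillis;

    ThresholdSlowTaskDetector(long thresholdMillis) {
        this.thresholdMillis = thresholdMillis;
    }

    /** One periodic check: scan current executions and notify the listener. */
    void checkOnce(Map<String, Long> runningTimeByTask, SlowTaskDetectorListener listener) {
        List<String> slow = new ArrayList<>();
        for (Map.Entry<String, Long> entry : runningTimeByTask.entrySet()) {
            if (entry.getValue() >= thresholdMillis) {
                slow.add(entry.getKey());
            }
        }
        if (!slow.isEmpty()) {
            listener.notifySlowTasks(slow);
        }
    }

    public static void main(String[] args) {
        new ThresholdSlowTaskDetector(1_000)
                .checkOnce(Map.of("task-a", 5_000L, "task-b", 10L),
                        slow -> System.out.println("slow tasks: " + slow));
    }
}
```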
[jira] [Created] (FLINK-28134) Introduce SpeculativeExecutionVertex
Zhu Zhu created FLINK-28134: --- Summary: Introduce SpeculativeExecutionVertex Key: FLINK-28134 URL: https://issues.apache.org/jira/browse/FLINK-28134 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.16.0 SpeculativeExecutionVertex will be used if speculative execution is enabled, as a replacement of ExecutionVertex to form an ExecutionGraph. The core difference is that a SpeculativeExecutionVertex can have multiple current executions running at the same time.
[jira] [Created] (FLINK-28133) Rework DefaultScheduler to directly deploy executions
Zhu Zhu created FLINK-28133: --- Summary: Rework DefaultScheduler to directly deploy executions Key: FLINK-28133 URL: https://issues.apache.org/jira/browse/FLINK-28133 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.16.0 Currently, the DefaultScheduler (the base of AdaptiveBatchScheduler) can only perform ExecutionVertex-level deployment. However, in this case, the scheduler is actually deploying the current execution attempt of the ExecutionVertex. Therefore, we need to rework the DefaultScheduler to directly deploy executions.
[jira] [Created] (FLINK-28131) FLIP-168: Speculative Execution for Batch Job
Zhu Zhu created FLINK-28131: --- Summary: FLIP-168: Speculative Execution for Batch Job Key: FLINK-28131 URL: https://issues.apache.org/jira/browse/FLINK-28131 Project: Flink Issue Type: New Feature Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.16.0 Speculative execution is helpful to mitigate slow tasks caused by problematic nodes. The basic idea is to start mirror tasks on other nodes when a slow task is detected. The mirror task processes the same input data and produces the same data as the original task. More details can be found in [FLIP-168|https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job]. This is the umbrella ticket to track all the changes of this feature.
[jira] [Created] (FLINK-27944) IO metric collision happens when a task has union inputs
Zhu Zhu created FLINK-27944: --- Summary: IO metric collision happens when a task has union inputs Key: FLINK-27944 URL: https://issues.apache.org/jira/browse/FLINK-27944 Project: Flink Issue Type: Bug Components: Runtime / Metrics Affects Versions: 1.15.0 Reporter: Zhu Zhu Fix For: 1.16.0 When a task has union inputs, some IO metrics (numBytesIn* and numBuffersIn*) of the different inputs may collide and fail to be registered. The problem can be reproduced with a simple job like:
{code:java}
DataStream source1 = env.fromElements("abc");
DataStream source2 = env.fromElements("123");
source1.union(source2).print();
{code}
Logs of collisions:
{code:java}
2022-06-08 00:59:01,629 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInLocal'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
2022-06-08 00:59:01,629 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInLocalPerSecond'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
2022-06-08 00:59:01,629 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInLocal'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
2022-06-08 00:59:01,629 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInLocalPerSecond'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
2022-06-08 00:59:01,630 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInRemote'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
2022-06-08 00:59:01,630 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInRemotePerSecond'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
2022-06-08 00:59:01,630 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInRemote'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
2022-06-08 00:59:01,630 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInRemotePerSecond'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
2022-06-08 00:59:01,630 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBuffersInLocal'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
2022-06-08 00:59:01,630 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBuffersInLocalPerSecond'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
2022-06-08 00:59:01,630 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBuffersInLocal'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
2022-06-08 00:59:01,630 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBuffersInLocalPerSecond'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
2022-06-08 00:59:01,630 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBuffersInRemote'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
2022-06-08 00:59:01,630 WARN
[jira] [Created] (FLINK-27710) Improve logs to better display Execution
Zhu Zhu created FLINK-27710: --- Summary: Improve logs to better display Execution Key: FLINK-27710 URL: https://issues.apache.org/jira/browse/FLINK-27710 Project: Flink Issue Type: Improvement Components: Runtime / Coordination, Runtime / Task Affects Versions: 1.16.0 Reporter: Zhu Zhu Fix For: 1.16.0 Currently, an execution is usually represented in logs as "{{job vertex name}} ({{subtaskIndex+1}}/{{vertex parallelism}}) ({{attemptId}})". With the change of FLINK-17295, this representation becomes redundant, e.g. the subtask index is displayed twice. Therefore, I propose to change the format to "{{job vertex name}} ({{short ExecutionGraphID}}:{{JobVertexID}}) ({{subtaskIndex+1}}/{{vertex parallelism}}) ({{#attemptNumber}})" and to avoid directly displaying the {{ExecutionAttemptID}}. This improves log readability. Besides that, the displayed {{JobVertexID}} also helps to distinguish job vertices of the same name, which is common in DataStream jobs (e.g. multiple {{Map}} operators). -- This message was sent by Atlassian Jira (v8.20.7#820007)
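The proposed format change can be illustrated with a minimal Python sketch (the function names and sample IDs below are hypothetical, not Flink's actual API):

```python
# Hypothetical sketch of the two log representations discussed in FLINK-27710.
def old_repr(vertex_name, subtask_index, parallelism, attempt_id):
    # Old format: "vertex name (subtaskIndex+1/parallelism) (attemptId)"
    return f"{vertex_name} ({subtask_index + 1}/{parallelism}) ({attempt_id})"

def proposed_repr(vertex_name, short_eg_id, job_vertex_id,
                  subtask_index, parallelism, attempt_number):
    # Proposed format:
    # "vertex name (shortExecutionGraphID:JobVertexID) (subtaskIndex+1/parallelism) (#attemptNumber)"
    return (f"{vertex_name} ({short_eg_id}:{job_vertex_id}) "
            f"({subtask_index + 1}/{parallelism}) (#{attempt_number})")

assert old_repr("Map", 2, 8, "a1b2c3d4e5") == "Map (3/8) (a1b2c3d4e5)"
assert proposed_repr("Map", "f00d", "bc764cd8", 2, 8, 0) == "Map (f00d:bc764cd8) (3/8) (#0)"
```

The proposed form drops the opaque attempt ID in favor of human-readable graph/vertex IDs and an attempt number.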
[jira] [Created] (FLINK-27460) Remove unused notifyPartitionDataAvailable process
Zhu Zhu created FLINK-27460: --- Summary: Remove unused notifyPartitionDataAvailable process Key: FLINK-27460 URL: https://issues.apache.org/jira/browse/FLINK-27460 Project: Flink Issue Type: Technical Debt Components: Runtime / Coordination Affects Versions: 1.16.0 Reporter: Zhu Zhu Fix For: 1.16.0 The `notifyPartitionDataAvailable` process was used to trigger downstream task scheduling once a pipelined partition had produced data on the TM side. It is no longer used, so I propose to remove it to clean up the code base. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-25996) Introduce job property isDynamicGraph to ExecutionConfig
Zhu Zhu created FLINK-25996: --- Summary: Introduce job property isDynamicGraph to ExecutionConfig Key: FLINK-25996 URL: https://issues.apache.org/jira/browse/FLINK-25996 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.15.0 To enable FLINK-25995 and FLINK-25046 only in the dynamic graph scenario, we need a property ExecutionConfig#isDynamicGraph. In the first step, the property will be decided automatically: true iff the config {{jobmanager.scheduler}} is {{AdaptiveBatch}}. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25995) Make implicit assumption of SQL local keyBy/groupBy explicit
Zhu Zhu created FLINK-25995: --- Summary: Make implicit assumption of SQL local keyBy/groupBy explicit Key: FLINK-25995 URL: https://issues.apache.org/jira/browse/FLINK-25995 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Affects Versions: 1.15.0 Reporter: Zhu Zhu Fix For: 1.15.0 If there are multiple consecutive identical groupBy (i.e. keyBy) operations, the SQL planner will change all but the first one to use a forward partitioner, so that these operators can be chained to avoid unnecessary shuffles. However, sometimes the local keyBy operators are not chained (e.g. with multiple inputs), and such forward partitioners turn into forward job edges. These forward edges still carry the local keyBy assumption, so they cannot be changed into rescale/rebalance edges; otherwise the results may be incorrect. This prevents the adaptive batch scheduler from determining the parallelism of downstream job vertices of other forward edges (see FLINK-25046). To solve this, I propose to introduce a new {{ForwardForRescalePartitioner}}. When the SQL planner optimizes the case of multiple consecutive identical groupBy operations, it should use the proposed partitioner, so that the runtime framework can further decide whether the partitioner can be changed to rescale. -- This message was sent by Atlassian Jira (v8.20.1#820001)
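The core idea is to make the "this forward edge may safely become rescale" property explicit. A toy Python sketch of that decision (class and function names are hypothetical, not Flink's actual types):

```python
# Hypothetical sketch: a forward partitioner variant that records that it
# may be relaxed to rescale, so the adaptive batch scheduler can decide.
class ForwardPartitioner:
    strict = True   # semantics rely on same-parallelism forwarding (local keyBy)

class ForwardForRescalePartitioner(ForwardPartitioner):
    strict = False  # planner only wanted chaining; rescale is safe

def edge_distribution(partitioner, upstream_parallelism, downstream_parallelism):
    if upstream_parallelism == downstream_parallelism:
        return "forward"
    if partitioner.strict:
        # A strict forward edge must keep equal parallelism, otherwise
        # the local keyBy assumption would be violated.
        raise ValueError("forward edge requires equal parallelism")
    return "rescale"  # no local keyBy assumption to preserve

assert edge_distribution(ForwardForRescalePartitioner(), 4, 4) == "forward"
assert edge_distribution(ForwardForRescalePartitioner(), 4, 8) == "rescale"
```

With the relaxed partitioner, the adaptive batch scheduler is free to pick a different downstream parallelism without producing incorrect results.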
[jira] [Created] (FLINK-25226) Add documentation about the AdaptiveBatchScheduler
Zhu Zhu created FLINK-25226: --- Summary: Add documentation about the AdaptiveBatchScheduler Key: FLINK-25226 URL: https://issues.apache.org/jira/browse/FLINK-25226 Project: Flink Issue Type: Sub-task Components: Documentation Reporter: Zhu Zhu Fix For: 1.15.0 Documentation is needed to explain to users how to enable the AdaptiveBatchScheduler and properly configure it. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25225) Add e2e TPCDS tests to run against the AdaptiveBatchScheduler
Zhu Zhu created FLINK-25225: --- Summary: Add e2e TPCDS tests to run against the AdaptiveBatchScheduler Key: FLINK-25225 URL: https://issues.apache.org/jira/browse/FLINK-25225 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.15.0 To automatically and continuously verify the AdaptiveBatchScheduler, we should add a new e2e test which runs TPC-DS against the AdaptiveBatchScheduler. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24005) Resource requirements declaration may be incorrect if the JobMaster disconnects from a TaskManager with available slots in the SlotPool
Zhu Zhu created FLINK-24005: --- Summary: Resource requirements declaration may be incorrect if the JobMaster disconnects from a TaskManager with available slots in the SlotPool Key: FLINK-24005 URL: https://issues.apache.org/jira/browse/FLINK-24005 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.13.2, 1.12.5, 1.14.0 Reporter: Zhu Zhu When a TaskManager disconnects from the JobMaster, this triggers `DeclarativeSlotPoolService#decreaseResourceRequirementsBy()` for all the slots that the TaskManager registered with the JobMaster. If some of those slots are still available, i.e. not assigned to any task, `decreaseResourceRequirementsBy` may lead to an incorrect resource requirements declaration. For example, take a job with 3 source tasks only. It requires 3 slots and declares 3 slots. Initially all the tasks are running. Suddenly one task fails and waits for some delay before restarting; its slot is returned to the SlotPool. Now the job requires 2 slots and declares 2 slots. At this moment, the TaskManager of the returned slot gets lost. After the triggered `decreaseResourceRequirementsBy`, the job declares only 1 slot. Finally, when the failed task is re-scheduled, the job will declare 2 slots while it actually needs 3 slots. The attached log of a real job and the logs of the test added in https://github.com/zhuzhurk/flink/commit/59ca0ac5fa9c77b97c6e8a43dcc53ca8a0ad6c37 demonstrate this case. Note that the real job is configured with a large "restart-strategy.fixed-delay.delay" and a large "slot.idle.timeout", so it is probably a rare case in production. -- This message was sent by Atlassian Jira (v8.3.4#803005)
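The slot arithmetic in the example above can be replayed with a toy Python model of the bookkeeping (hypothetical names; this is not Flink's DeclarativeSlotPoolService API):

```python
# Toy model of the declared-requirements bookkeeping described in FLINK-24005.
class ToySlotPool:
    def __init__(self, required):
        self.required = required   # slots the job actually needs
        self.declared = required   # slots currently declared to the RM

    def task_failed_and_slot_returned(self):
        self.declared -= 1         # task gone -> requirement decreased

    def task_manager_lost(self, available_slots_from_tm):
        # Bug: requirements are decreased for ALL slots the lost TM had
        # registered, including slots that were merely idle in the pool.
        self.declared -= available_slots_from_tm

    def reschedule_failed_task(self):
        self.declared += 1

pool = ToySlotPool(required=3)
pool.task_failed_and_slot_returned()   # now declares 2
pool.task_manager_lost(1)              # now declares 1 (incorrect decrease)
pool.reschedule_failed_task()          # declares 2, but 3 are still needed
assert pool.declared == 2 and pool.required == 3
```

The final assertion shows the mismatch the ticket describes: the job permanently under-declares by one slot.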
[jira] [Created] (FLINK-23826) Verify optimized scheduler performance for large-scale jobs
Zhu Zhu created FLINK-23826: --- Summary: Verify optimized scheduler performance for large-scale jobs Key: FLINK-23826 URL: https://issues.apache.org/jira/browse/FLINK-23826 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.14.0 Reporter: Zhu Zhu Fix For: 1.14.0 This ticket is used to verify the result of FLINK-21110. It should check, with a real job running on a cluster, whether large-scale jobs are scheduled correctly and how the scheduling performs. The conclusion should include: 1. time of job initialization on the master (job received -> scheduling started) 2. time of job scheduling and deployment (scheduling started -> all tasks in INITIALIZATION) 3. time of job restarting on task failover (JM notified about task failure -> all tasks in INITIALIZATION again) 4. master heap memory required for large-scale jobs -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23806) StackOverflowException can happen if a large scale job failed to acquire enough slots in time
Zhu Zhu created FLINK-23806: --- Summary: StackOverflowException can happen if a large scale job failed to acquire enough slots in time Key: FLINK-23806 URL: https://issues.apache.org/jira/browse/FLINK-23806 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.13.2, 1.12.5 Reporter: Zhu Zhu Fix For: 1.14.0, 1.12.6, 1.13.3 When requested slots are not fulfilled in time, a task failure is triggered and all related tasks are canceled and restarted. However, in this process, if a task has already been assigned a slot, the slot is returned to the slot pool and is immediately used to fulfill pending slot requests of tasks which will soon be canceled. The execution versions of those tasks were already bumped in {{DefaultScheduler#restartTasksWithDelay(...)}}, so the assignment fails immediately, and the slot is returned to the slot pool and again used to fulfill pending slot requests. A StackOverflow can happen this way when there are many vertices, resulting in a fatal error that crashes the JM. A sample call stack is attached below. To fix the problem, one way is to cancel the pending requests of all the tasks which will soon be canceled (i.e. tasks with bumped versions) before canceling these tasks. {panel} ... 
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.cancelSlotRequest(PhysicalSlotProviderImpl.java:112) ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.releaseSharedSlot(SlotSharingExecutionSlotAllocator.java:242) ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at org.apache.flink.runtime.scheduler.SharedSlot.releaseExternally(SharedSlot.java:281) ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at org.apache.flink.runtime.scheduler.SharedSlot.removeLogicalSlotRequest(SharedSlot.java:242) ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234) ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203) ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) ~[?:1.8.0_102]
at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717) ~[?:1.8.0_102]
at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) ~[?:1.8.0_102]
at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200) ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130) ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:542) ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:505) ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_102]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) ~[?:1.8.0_102]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_102]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_102]
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552) ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587) ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171) ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316) ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at
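The recursive slot hand-off described in FLINK-23806 can be sketched with a toy Python model (hypothetical names; not Flink's actual code). Each stale request fails immediately, returns the slot, and the pool fulfills the next stale request on the same call stack, so the stack depth grows with the number of pending requests:

```python
# Toy model of the recursive slot hand-off in FLINK-23806.
def fulfill(pending, depth=0):
    """Fulfill stale pending requests recursively; returns the stack depth reached."""
    if not pending:
        return depth
    pending.pop()                        # request is stale (version bumped) -> fails
    return fulfill(pending, depth + 1)   # slot returned and reused on the same stack

# Depth grows linearly with the number of pending requests; with tens of
# thousands of vertices this overflows the stack (StackOverflowError in the JVM).
assert fulfill(list(range(100))) == 100
assert fulfill(list(range(500))) == 500

def fulfill_with_fix(pending):
    # The fix sketched in the ticket: cancel the stale pending requests up
    # front, so no slot is handed to a task that is about to be canceled.
    pending.clear()
    return 0

assert fulfill_with_fix(list(range(100))) == 0
```

The model only illustrates why the depth is unbounded; the real fix cancels pending requests of version-bumped tasks before canceling the tasks.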
[jira] [Created] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELED/FAILED
Zhu Zhu created FLINK-22945: --- Summary: StackOverflowException can happen when a large scale job is CANCELED/FAILED Key: FLINK-22945 URL: https://issues.apache.org/jira/browse/FLINK-22945 Project: Flink Issue Type: Improvement Affects Versions: 1.12.4, 1.13.1 Reporter: Zhu Zhu The pending requests in the ExecutionSlotAllocator are not cleared when a job transitions to CANCELING or FAILING, even though all vertices will be canceled and their assigned slots returned. A returned slot may be used to fulfill the pending request of a CANCELED vertex; the assignment fails immediately, and the slot is returned and used to fulfill another vertex's request, recursively. A StackOverflow can happen this way when there are many vertices. A sample call stack is attached below. To fix this problem, we should clear the pending requests in the ExecutionSlotAllocator when a job is CANCELING or FAILING. Beyond that, I think it's better to improve the call stack of slot assignment to prevent similar StackOverflowExceptions. ... 
at org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) ~[?:1.8.0_102]
at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717) ~[?:1.8.0_102]
at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) ~[?:1.8.0_102]
at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_102]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) ~[?:1.8.0_102]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_102]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_102]
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeReservedSlot(DefaultDeclarativeSlotPool.java:313) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.releaseSlot(DeclarativeSlotPoolBridge.java:335) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.cancelSlotRequest(PhysicalSlotProviderImpl.java:112) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.releaseSharedSlot(SlotSharingExecutionSlotAllocator.java:242) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at org.apache.flink.runtime.scheduler.SharedSlot.releaseExternally(SharedSlot.java:281) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at
[jira] [Created] (FLINK-21735) Harden JobMaster#updateTaskExecutionState()
Zhu Zhu created FLINK-21735: --- Summary: Harden JobMaster#updateTaskExecutionState() Key: FLINK-21735 URL: https://issues.apache.org/jira/browse/FLINK-21735 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhu Zhu Currently JobMaster#updateTaskExecutionState() is critical for triggering task scheduling and recovery. We do not expect any exception to be thrown directly in its invocation, so I propose to wrap it with a try-catch and fail the JobMaster once any exception is caught. This better exposes unexpected bugs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
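The proposed hardening is a fail-fast wrapper. A minimal Python sketch of the idea (class and method names are hypothetical, not Flink's actual API):

```python
# Hypothetical sketch of the fail-fast wrapping proposed in FLINK-21735:
# any unexpected exception escaping the update logic fails the JobMaster
# instead of being silently swallowed by the RPC layer.
class ToyJobMaster:
    def __init__(self):
        self.failure = None  # set when the JobMaster is failed fast

    def update_task_execution_state_internal(self, state):
        if state == "BROKEN":
            raise RuntimeError("unexpected bug in scheduler")
        return True

    def update_task_execution_state(self, state):
        try:
            return self.update_task_execution_state_internal(state)
        except Exception as e:           # fail fast to expose the bug
            self.failure = e
            return False

jm = ToyJobMaster()
assert jm.update_task_execution_state("FINISHED") is True
assert jm.update_task_execution_state("BROKEN") is False
assert isinstance(jm.failure, RuntimeError)
```

The point is that the failure is surfaced deliberately (here recorded in `failure`) rather than vanishing inside the RPC dispatch.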
[jira] [Created] (FLINK-21734) Allow BLOCKING result partition to be individually consumable
Zhu Zhu created FLINK-21734: --- Summary: Allow BLOCKING result partition to be individually consumable Key: FLINK-21734 URL: https://issues.apache.org/jira/browse/FLINK-21734 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhu Zhu Currently a BLOCKING result partition is consumable only when all partitions of the corresponding IntermediateResult become consumable. This is an unnecessary complication and has caused problems (FLINK-21707). Therefore I propose to simplify it and let BLOCKING result partitions be individually consumable. Note that scheduling then becomes execution-vertex-wise instead of stage-wise, with a nice side effect of better resource utilization. The PipelinedRegionSchedulingStrategy can be simplified along with this change to get rid of the {{correlatedResultPartitions}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21707) Job may hang when restarting a FINISHED task with POINTWISE BLOCKING consumers
Zhu Zhu created FLINK-21707: --- Summary: Job may hang when restarting a FINISHED task with POINTWISE BLOCKING consumers Key: FLINK-21707 URL: https://issues.apache.org/jira/browse/FLINK-21707 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.12.2, 1.11.3, 1.13.0 Reporter: Zhu Zhu A job may hang when restarting a FINISHED task with POINTWISE BLOCKING consumers. This is because {{PipelinedRegionSchedulingStrategy#onExecutionStateChange()}} will try to schedule all the consumer tasks/regions of the finished *ExecutionJobVertex*, even though those regions are not the exact consumers of the finished *ExecutionVertex*. In this case, some of the regions can be in a state other than CREATED because they are not connected to, and thus not affected by, the restarted tasks. However, {{PipelinedRegionSchedulingStrategy#maybeScheduleRegion()}} does not allow scheduling a non-CREATED region: it throws an exception and breaks the scheduling of all the other regions. One example showing this problem can be found at [PipelinedRegionSchedulingITCase#testRecoverFromPartitionException |https://github.com/zhuzhurk/flink/commit/1eb036b6566c5cb4958d9957ba84dc78ce62a08c]. To fix the problem, we can add a filter in {{PipelinedRegionSchedulingStrategy#onExecutionStateChange()}} to only trigger the scheduling of regions in CREATED state. -- This message was sent by Atlassian Jira (v8.3.4#803005)
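The proposed filter can be sketched in a few lines of Python (hypothetical names and region representation; not Flink's actual code):

```python
# Toy model of the fix proposed in FLINK-21707: only regions still in
# CREATED state are scheduled when a producer vertex finishes.
def maybe_schedule_region(region):
    if region["state"] != "CREATED":
        # Mirrors the strict check that aborted the whole scheduling loop.
        raise RuntimeError(f"cannot schedule region in {region['state']}")
    region["state"] = "SCHEDULED"

def on_execution_state_change_fixed(consumer_regions):
    # The fix: filter out non-CREATED regions instead of letting
    # maybe_schedule_region() throw and break the other regions.
    for region in consumer_regions:
        if region["state"] == "CREATED":
            maybe_schedule_region(region)

regions = [{"state": "CREATED"}, {"state": "RUNNING"}, {"state": "CREATED"}]
on_execution_state_change_fixed(regions)
assert [r["state"] for r in regions] == ["SCHEDULED", "RUNNING", "SCHEDULED"]
```

Without the filter, the RUNNING region would raise and the second CREATED region would never be scheduled, which is the hang described above.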
[jira] [Created] (FLINK-21576) Remove ExecutionVertex#getPreferredLocations
Zhu Zhu created FLINK-21576: --- Summary: Remove ExecutionVertex#getPreferredLocations Key: FLINK-21576 URL: https://issues.apache.org/jira/browse/FLINK-21576 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhu Zhu Fix For: 1.13.0 {{ExecutionVertex#getPreferredLocations()}} is superseded by {{DefaultPreferredLocationsRetriever}} and is no longer used. Hence, we can remove it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20931) Remove globalModVersion from ExecutionGraph
Zhu Zhu created FLINK-20931: --- Summary: Remove globalModVersion from ExecutionGraph Key: FLINK-20931 URL: https://issues.apache.org/jira/browse/FLINK-20931 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhu Zhu Fix For: 1.13.0 {{ExecutionGraph#globalModVersion}} is no longer used after the legacy scheduler was removed. {{ExecutionVertexVersioner}} is used instead to handle concurrent global-local and local-local failures. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20930) Remove AbstractExecutionSlotAllocator
Zhu Zhu created FLINK-20930: --- Summary: Remove AbstractExecutionSlotAllocator Key: FLINK-20930 URL: https://issues.apache.org/jira/browse/FLINK-20930 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhu Zhu Fix For: 1.13.0 {{AbstractExecutionSlotAllocator}} is no longer used after {{DefaultExecutionSlotAllocator}} was removed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20721) Remove unknown input channels and process to update partitions
Zhu Zhu created FLINK-20721: --- Summary: Remove unknown input channels and process to update partitions Key: FLINK-20721 URL: https://issues.apache.org/jira/browse/FLINK-20721 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Runtime / Network Affects Versions: 1.13.0 Reporter: Zhu Zhu Fix For: 1.13.0 With the latest pipelined region scheduling, Flink no longer launches a task before knowing the locations of all the partitions it consumes. `scheduleOrUpdateConsumers` is no longer needed, so we removed it in FLINK-20439. Unknown input channels and the process to update them are also no longer needed. I propose to remove them; the benefits are: 1. simplifies the code of both the scheduler and shuffle components 2. simplifies the interfaces of ShuffleEnvironment and ShuffleDescriptor 3. ensures the assumptions in InputGate#resumeConsumption() implementations 4. allows removing ScheduleMode#allowLazyDeployment() and, later, ScheduleMode entirely -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20626) Canceling a job when it is failing will result in job hanging in CANCELING state
Zhu Zhu created FLINK-20626: --- Summary: Canceling a job when it is failing will result in job hanging in CANCELING state Key: FLINK-20626 URL: https://issues.apache.org/jira/browse/FLINK-20626 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.11.2, 1.12.0 Reporter: Zhu Zhu Assignee: Zhu Zhu Fix For: 1.13.0, 1.11.4, 1.12.1 If a user manually cancels a job while the job is failing (here, failing means the job has encountered an unrecoverable failure and is about to fail), the job will hang in CANCELING state and cannot terminate. The cause is that the DefaultScheduler currently always tries to transition from `FAILING` to `FAILED` to terminate the job. However, the cancellation changes the job status to `CANCELING`, so the transition to `FAILED` does not succeed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
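The hang can be replayed with a toy state machine in Python (hypothetical names; a simplification of Flink's job status handling):

```python
# Toy state machine for the hang in FLINK-20626: the scheduler only
# attempts FAILING -> FAILED, so a job that was moved to CANCELING in
# the meantime never reaches a terminal state.
class ToyJob:
    def __init__(self):
        self.state = "FAILING"

    def transition(self, expected, target):
        # Compare-and-set style transition, as in the job status logic.
        if self.state == expected:
            self.state = target
            return True
        return False

job = ToyJob()
job.state = "CANCELING"                               # user cancels while failing
assert job.transition("FAILING", "FAILED") is False   # the job would hang here

# Fix direction: also account for a concurrent cancellation.
if not job.transition("FAILING", "FAILED"):
    job.transition("CANCELING", "CANCELED")
assert job.state == "CANCELED"
```

The second transition attempt shows the shape of the fix: a job found in CANCELING must be driven to CANCELED instead of being left non-terminal.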
[jira] [Created] (FLINK-20619) Remove InputDependencyConstraint and InputDependencyConstraintChecker
Zhu Zhu created FLINK-20619: --- Summary: Remove InputDependencyConstraint and InputDependencyConstraintChecker Key: FLINK-20619 URL: https://issues.apache.org/jira/browse/FLINK-20619 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhu Zhu Fix For: 1.13.0 InputDependencyConstraint was used by the legacy scheduler and the lazy-from-sources scheduling strategy. It is no longer needed since both have been removed. Hence we can remove it, as well as InputDependencyConstraintChecker. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20439) Consider simplifying or removing mechanism to scheduleOrUpdateConsumers
Zhu Zhu created FLINK-20439: --- Summary: Consider simplifying or removing mechanism to scheduleOrUpdateConsumers Key: FLINK-20439 URL: https://issues.apache.org/jira/browse/FLINK-20439 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.13.0 Execution#scheduleOrUpdateConsumers() was used for multiple purposes: - schedule a vertex when its PIPELINED inputs have produced data - schedule a vertex when its BLOCKING inputs have finished - update consumed partition info for RUNNING consumers - cache consumed partition info for DEPLOYING consumers It is no longer needed in the latest pipelined region scheduling because - a region is scheduled only when all its upstream regions have finished - a vertex always knows all its consumed partitions when scheduled So we can consider how to simplify or remove it, which also involves things like UnknownInputChannel, ResultPartitionConsumableNotifier, Execution#cachePartitionInfo(), etc. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20285) LazyFromSourcesSchedulingStrategy may schedule non-CREATED vertices
Zhu Zhu created FLINK-20285: --- Summary: LazyFromSourcesSchedulingStrategy may schedule non-CREATED vertices Key: FLINK-20285 URL: https://issues.apache.org/jira/browse/FLINK-20285 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.11.0 Reporter: Zhu Zhu Fix For: 1.12.0, 1.11.3 LazyFromSourcesSchedulingStrategy may schedule vertices which are not in CREATED state. This results in an unexpected state-check failure and a fatal error [1]. The reason is that the state of a vertex to schedule can be changed in LazyFromSourcesSchedulingStrategy#allocateSlotsAndDeployExecutionVertices() during the invocation of schedulerOperations.allocateSlotsAndDeploy(...) on other vertices. E.g. ev1 and ev2 are in the same pipelined region and are restarted one by one in the scheduling loop in LazyFromSourcesSchedulingStrategy#allocateSlotsAndDeployExecutionVertices(). Both are CREATED at that moment. ev1 is scheduled first but immediately fails due to a slot allocation error, and ev2 is canceled as a result. So when ev2 is scheduled, its state is CANCELED and the state check fails. [1] {code:java} 2020-11-19 13:34:17,231 ERROR org.apache.flink.runtime.util.FatalExitExceptionHandler [] - FATAL: Thread 'flink-akka.actor.default-dispatcher-15' produced an uncaught exception. Stopping the process... 
java.util.concurrent.CompletionException: java.lang.IllegalStateException: expected vertex aafcbb93173905cec9672e46932d7790_3 to be in CREATED state, was: CANCELED
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_222]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_222]
at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:708) ~[?:1.8.0_222]
at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:687) ~[?:1.8.0_222]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) ~[?:1.8.0_222]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.11.2.jar:1.11.2]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.11.2.jar:1.11.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.11.2.jar:1.11.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.2.jar:1.11.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.2.jar:1.11.2]
Caused by: java.lang.IllegalStateException: expected vertex aafcbb93173905cec9672e46932d7790_3 to be in CREATED state, was: CANCELED
at
[jira] [Created] (FLINK-20284) Error happens in TaskExecutor when closing JobMaster connection if there was a python UDF
Zhu Zhu created FLINK-20284: --- Summary: Error happens in TaskExecutor when closing JobMaster connection if there was a python UDF Key: FLINK-20284 URL: https://issues.apache.org/jira/browse/FLINK-20284 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.12.0 Reporter: Zhu Zhu Fix For: 1.12.0 When a TaskExecutor has successfully finished running a python UDF task and is disconnecting from the JobMaster, the errors below occur. This error, however, does not seem to affect job execution at the moment. {code:java}
2020-11-20 17:05:21,932 INFO org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - 1 Beam Fn Logging clients still connected during shutdown.
2020-11-20 17:05:21,938 WARN org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer [] - Hanged up for unknown endpoint.
2020-11-20 17:05:22,126 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> select: (f0) -> select: (add_one(f0) AS a) -> to: Tuple2 -> Sink: Streaming select table sink (1/1)#0 (b0c2104dd8f87bb1caf0c83586c22a51) switched from RUNNING to FINISHED.
2020-11-20 17:05:22,126 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: Custom Source -> select: (f0) -> select: (add_one(f0) AS a) -> to: Tuple2 -> Sink: Streaming select table sink (1/1)#0 (b0c2104dd8f87bb1caf0c83586c22a51).
2020-11-20 17:05:22,128 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> select: (f0) -> select: (add_one(f0) AS a) -> to: Tuple2 -> Sink: Streaming select table sink (1/1)#0 b0c2104dd8f87bb1caf0c83586c22a51. 
2020-11-20 17:05:22,156 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1., taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: b67c3307dcf93757adfb4f0f9f7b8c7b, jobId: d05f32162f38ec3ec813c4621bc106d9). 2020-11-20 17:05:22,157 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job d05f32162f38ec3ec813c4621bc106d9 from job leader monitoring. 2020-11-20 17:05:22,157 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job d05f32162f38ec3ec813c4621bc106d9. 2020-11-20 17:05:23,064 ERROR org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.rejectedExecution [] - Failed to submit a listener notification task. Event loop shut down? java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/util/concurrent/GlobalEventExecutor$2 at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.startThread(GlobalEventExecutor.java:227) ~[blob_p-bd7a5d615512eb8a2e856e7c1630a0c22fca7cf3-ff27946fda7e2b8cb24ea56d505b689e:1.12-SNAPSHOT] at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.execute(GlobalEventExecutor.java:215) ~[blob_p-bd7a5d615512eb8a2e856e7c1630a0c22fca7cf3-ff27946fda7e2b8cb24ea56d505b689e:1.12-SNAPSHOT] at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:841) [blob_p-bd7a5d615512eb8a2e856e7c1630a0c22fca7cf3-ff27946fda7e2b8cb24ea56d505b689e:1.12-SNAPSHOT] at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:498) [blob_p-bd7a5d615512eb8a2e856e7c1630a0c22fca7cf3-ff27946fda7e2b8cb24ea56d505b689e:1.12-SNAPSHOT] at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) [blob_p-bd7a5d615512eb8a2e856e7c1630a0c22fca7cf3-ff27946fda7e2b8cb24ea56d505b689e:1.12-SNAPSHOT] at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604) [blob_p-bd7a5d615512eb8a2e856e7c1630a0c22fca7cf3-ff27946fda7e2b8cb24ea56d505b689e:1.12-SNAPSHOT] at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:96) [blob_p-bd7a5d615512eb8a2e856e7c1630a0c22fca7cf3-ff27946fda7e2b8cb24ea56d505b689e:1.12-SNAPSHOT] at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1089) [blob_p-bd7a5d615512eb8a2e856e7c1630a0c22fca7cf3-ff27946fda7e2b8cb24ea56d505b689e:1.12-SNAPSHOT] at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
[jira] [Created] (FLINK-20283) Make invalid managed memory fraction errors of python udf more user friendly
Zhu Zhu created FLINK-20283: --- Summary: Make invalid managed memory fraction errors of python udf more user friendly Key: FLINK-20283 URL: https://issues.apache.org/jira/browse/FLINK-20283 Project: Flink Issue Type: Improvement Components: API / Python Affects Versions: 1.12.0 Reporter: Zhu Zhu Fix For: 1.12.0 When managed memory is required for a Python UDF but its weight in "taskmanager.memory.managed.consumer-weights" is set to 0, an error occurs, but the message is hard for users to understand; see [1]. I think we should expose the invalid fraction error to users in this case and guide users to properly configure "taskmanager.memory.managed.consumer-weights". [1] org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:534) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.IllegalArgumentException at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:126) at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:254) at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:121) at 
org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:134) at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:94) at org.apache.flink.table.runtime.operators.python.scalar.AbstractRowPythonScalarFunctionOperator.open(AbstractRowPythonScalarFunctionOperator.java:67) at org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperator.open(PythonScalarFunctionOperator.java:64) at
[jira] [Created] (FLINK-20282) Make invalid managed memory fraction errors more advisory in MemoryManager
Zhu Zhu created FLINK-20282: --- Summary: Make invalid managed memory fraction errors more advisory in MemoryManager Key: FLINK-20282 URL: https://issues.apache.org/jira/browse/FLINK-20282 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.12.0 Reporter: Zhu Zhu Fix For: 1.12.0 The invalid managed memory fraction errors[1] reported from the MemoryManager do not give users enough guidance to solve the problem. This error happens when managed memory is required for a use case but its weight is 0. I think it would be better to enrich the error message to guide users to properly configure "taskmanager.memory.managed.consumer-weights". [1] "Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate must within (0, 1], was: 0.0" -- This message was sent by Atlassian Jira (v8.3.4#803005)
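A minimal sketch of the proposed improvement, assuming a hypothetical helper (the class and method names here are illustrative, not the actual MemoryManager code): the fraction check keeps the original error text but appends advice pointing at the relevant config option.

```java
// Hypothetical sketch: enrich the invalid-fraction error with an advisory message.
// FractionCheck/checkFraction are illustrative names, not the real MemoryManager API.
public class FractionCheck {

    static void checkFraction(double fraction) {
        if (fraction <= 0 || fraction > 1) {
            throw new IllegalArgumentException(
                "The fraction of memory to allocate must within (0, 1], was: " + fraction
                    + ". This usually means the consumer's weight in"
                    + " 'taskmanager.memory.managed.consumer-weights' is set to 0;"
                    + " please configure a positive weight for this use case.");
        }
    }
}
```

With this, the user-facing error directly names the option to fix instead of only reporting the invalid fraction value.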
[jira] [Created] (FLINK-19994) All vertices in a DataSet iteration job will be eagerly scheduled
Zhu Zhu created FLINK-19994: --- Summary: All vertices in a DataSet iteration job will be eagerly scheduled Key: FLINK-19994 URL: https://issues.apache.org/jira/browse/FLINK-19994 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.12.0 Reporter: Zhu Zhu Fix For: 1.12.0 After switching to pipelined region scheduling, all vertices in a DataSet iteration job will be eagerly scheduled, which means BLOCKING result consumers can be deployed even before the result finishes, and resources are wasted. This is because all vertices will be put into one pipelined region if the job contains {{ColocationConstraint}}, see [PipelinedRegionComputeUtil|https://github.com/apache/flink/blob/c0f382f5f0072441ef8933f6993f1c34168004d6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java#L52]. IIUC, this {{makeAllOneRegion()}} behavior was introduced to ensure that co-located iteration heads and tails are restarted together in pipelined region failover. However, given that edges within an iteration will always be PIPELINED ([ref|https://github.com/apache/flink/blob/0523ef6451a93da450c6bdf5dd4757c3702f3962/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java#L1188]), co-located iteration heads and tails will always be in the same region. So I think we can drop the {{PipelinedRegionComputeUtil#makeAllOneRegion()}} code path and build regions in the same way regardless of whether there are co-location constraints. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19922) Remove SlotProvider from SchedulerNG
Zhu Zhu created FLINK-19922: --- Summary: Remove SlotProvider from SchedulerNG Key: FLINK-19922 URL: https://issues.apache.org/jira/browse/FLINK-19922 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.12.0 Reporter: Zhu Zhu Fix For: 1.12.0 SchedulerNG now uses ExecutionSlotAllocator to allocate slots. SlotProvider is no longer directly needed by SchedulerNG and ExecutionGraph. Currently a ThrowingSlotProvider is used as a SlotProvider placeholder in SchedulerNG and ExecutionGraph. After legacy scheduling is removed in FLINK-19919, the placeholder for SlotProvider is not needed anymore and thus can be removed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19921) Remove legacy RestartStrategy
Zhu Zhu created FLINK-19921: --- Summary: Remove legacy RestartStrategy Key: FLINK-19921 URL: https://issues.apache.org/jira/browse/FLINK-19921 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.12.0 Reporter: Zhu Zhu Fix For: 1.12.0 The legacy {{org.apache.flink.runtime.executiongraph.restart.RestartStrategy}} is no longer in use since FLINK-19919 and thus can be removed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19920) Remove legacy FailoverStrategy
Zhu Zhu created FLINK-19920: --- Summary: Remove legacy FailoverStrategy Key: FLINK-19920 URL: https://issues.apache.org/jira/browse/FLINK-19920 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.12.0 Reporter: Zhu Zhu Fix For: 1.12.0 The legacy {{org.apache.flink.runtime.executiongraph.failover.FailoverStrategy}} is no longer in use since FLINK-19919 and thus can be removed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19919) Remove legacy scheduling in ExecutionGraph components
Zhu Zhu created FLINK-19919: --- Summary: Remove legacy scheduling in ExecutionGraph components Key: FLINK-19919 URL: https://issues.apache.org/jira/browse/FLINK-19919 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.12.0 Reporter: Zhu Zhu Fix For: 1.12.0 This is one step towards making ExecutionGraph a pure data structure. Note that this task mainly targets removing the legacy code of scheduling and failover. Code for Execution state transitions and task deployment will be factored out in follow-up tasks. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19806) Job may try to leave SUSPENDED state in ExecutionGraph#failJob()
Zhu Zhu created FLINK-19806: --- Summary: Job may try to leave SUSPENDED state in ExecutionGraph#failJob() Key: FLINK-19806 URL: https://issues.apache.org/jira/browse/FLINK-19806 Project: Flink Issue Type: Bug Reporter: Zhu Zhu Assignee: Zhu Zhu {{SUSPENDED}} is a terminal state which a job is not supposed to leave once it has entered it. However, {{ExecutionGraph#failJob()}} does not check this and may try to transition a job out of the {{SUSPENDED}} state. This will cause unexpected errors and may lead to a JM crash. The problem becomes visible if we rework {{ExecutionGraphSuspendTest}} to be based on {{DefaultScheduler}}. We should harden the check in {{ExecutionGraph#failJob()}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
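A hedged sketch of the hardened check, under a deliberately simplified job-status model (the enum and class below are illustrative stand-ins, not the real runtime types; the actual ExecutionGraph#failJob() logic is more involved):

```java
// Simplified sketch: guard failJob() so a job never leaves a terminal state
// such as SUSPENDED. Types and names here are illustrative only.
public class FailJobGuard {

    enum JobStatus {
        RUNNING(false), FAILING(false), FAILED(true), SUSPENDED(true), CANCELED(true);

        private final boolean terminal;
        JobStatus(boolean terminal) { this.terminal = terminal; }
        boolean isTerminalState() { return terminal; }
    }

    private JobStatus state = JobStatus.RUNNING;

    void suspend() { state = JobStatus.SUSPENDED; }

    void failJob(Throwable cause) {
        // Hardened check: a terminal state (e.g. SUSPENDED) must not be left.
        if (state.isTerminalState()) {
            return;
        }
        state = JobStatus.FAILING;
    }

    JobStatus getState() { return state; }
}
```

The key point is the early return: without it, a late failure notification could move a suspended job into FAILING and trigger unexpected transitions.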
[jira] [Created] (FLINK-19714) Assign different logical pipelined regions of a DataSet job to different slot sharing group
Zhu Zhu created FLINK-19714: --- Summary: Assign different logical pipelined regions of a DataSet job to different slot sharing group Key: FLINK-19714 URL: https://issues.apache.org/jira/browse/FLINK-19714 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.12.0 Reporter: Zhu Zhu Fix For: 1.12.0 Currently, job vertices of a DataSet job will be put into the same SlotSharingGroup. It does not make sense because producer and consumer pipelined regions will not be able to share slots. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19712) Pipelined region failover should skip restarting tasks in CREATED
Zhu Zhu created FLINK-19712: --- Summary: Pipelined region failover should skip restarting tasks in CREATED Key: FLINK-19712 URL: https://issues.apache.org/jira/browse/FLINK-19712 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.12.0 Reporter: Zhu Zhu Fix For: 1.12.0 When a task fails and the pipelined region failover strategy is used, all tasks in the region of the failed task and in the downstream regions will be canceled for later re-scheduling. However, some of these tasks may still be in CREATED state, in which case there is no need to cancel them. Skipping the cancellation of such tasks can speed up the failover and avoid a lot of unnecessary CANCELING logs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
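The proposed filtering can be sketched as follows (a self-contained illustration with hypothetical names, not the actual failover-strategy code): tasks still in CREATED are excluded from the cancellation set, since they were never deployed.

```java
import java.util.*;
import java.util.stream.*;

// Illustrative sketch: when restarting a set of tasks during pipelined region
// failover, only tasks that have left CREATED need an actual cancellation;
// CREATED tasks can go straight to re-scheduling.
public class RestartPlanner {

    enum ExecutionState { CREATED, DEPLOYING, RUNNING, CANCELING, CANCELED }

    static List<String> tasksNeedingCancellation(Map<String, ExecutionState> tasksToRestart) {
        return tasksToRestart.entrySet().stream()
                .filter(e -> e.getValue() != ExecutionState.CREATED)  // skip CREATED tasks
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }
}
```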
[jira] [Created] (FLINK-19703) A result partition is not untracked after its producer task failed in TaskManager
Zhu Zhu created FLINK-19703: --- Summary: A result partition is not untracked after its producer task failed in TaskManager Key: FLINK-19703 URL: https://issues.apache.org/jira/browse/FLINK-19703 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.11.2, 1.10.2, 1.12.0 Reporter: Zhu Zhu Assignee: Zhu Zhu Fix For: 1.12.0 {{Execution#maybeReleasePartitionsAndSendCancelRpcCall(...)}} will not be invoked when a task is reported to have failed in the TaskManager, which results in its partitions still being tracked by the job manager partition tracker. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19598) E2E test Kerberized YARN application on Docker test (default input) fails on CI
Zhu Zhu created FLINK-19598: --- Summary: E2E test Kerberized YARN application on Docker test (default input) fails on CI Key: FLINK-19598 URL: https://issues.apache.org/jira/browse/FLINK-19598 Project: Flink Issue Type: Bug Components: Build System / Azure Pipelines Affects Versions: 1.12.0 Reporter: Zhu Zhu The root cause might be that the HDFS NameNode failed to start: {code:java} 2020-10-12T14:03:45.7653530Z 20/10/12 13:57:51 ERROR namenode.NameNode: Failed to start namenode. 2020-10-12T14:03:45.7653932Z java.net.BindException: Port in use: 0.0.0.0:50470 2020-10-12T14:03:45.7654346Z at org.apache.hadoop.http.HttpServer2.openListeners(HttpServer2.java:998) 2020-10-12T14:03:45.7655069Z at org.apache.hadoop.http.HttpServer2.start(HttpServer2.java:935) 2020-10-12T14:03:45.7655596Z at org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer.start(NameNodeHttpServer.java:171) 2020-10-12T14:03:45.7656143Z at org.apache.hadoop.hdfs.server.namenode.NameNode.startHttpServer(NameNode.java:842) 2020-10-12T14:03:45.7656665Z at org.apache.hadoop.hdfs.server.namenode.NameNode.initialize(NameNode.java:693) 2020-10-12T14:03:45.7657159Z at org.apache.hadoop.hdfs.server.namenode.NameNode.<init>(NameNode.java:906) 2020-10-12T14:03:45.7657651Z at org.apache.hadoop.hdfs.server.namenode.NameNode.<init>(NameNode.java:885) 2020-10-12T14:03:45.7658151Z at org.apache.hadoop.hdfs.server.namenode.NameNode.createNameNode(NameNode.java:1626) 2020-10-12T14:03:45.7658661Z at org.apache.hadoop.hdfs.server.namenode.NameNode.main(NameNode.java:1694) 2020-10-12T14:03:45.7659086Z Caused by: java.net.BindException: Address already in use 2020-10-12T14:03:45.7659418Z at sun.nio.ch.Net.bind0(Native Method) 2020-10-12T14:03:45.7659700Z at sun.nio.ch.Net.bind(Net.java:433) 2020-10-12T14:03:45.7660012Z at sun.nio.ch.Net.bind(Net.java:425) 2020-10-12T14:03:45.7660397Z at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:220) 2020-10-12T14:03:45.7660872Z at 
sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:85) 2020-10-12T14:03:45.7661348Z at org.mortbay.jetty.nio.SelectChannelConnector.open(SelectChannelConnector.java:216) 2020-10-12T14:03:45.7661954Z at org.apache.hadoop.http.HttpServer2.openListeners(HttpServer2.java:993) 2020-10-12T14:03:45.7662289Z ... 8 more {code} https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/7436/logs/127 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19570) Execution graph related tests are possibly broken due to registering duplicated ExecutionAttemptID
Zhu Zhu created FLINK-19570: --- Summary: Execution graph related tests are possibly broken due to registering duplicated ExecutionAttemptID Key: FLINK-19570 URL: https://issues.apache.org/jira/browse/FLINK-19570 Project: Flink Issue Type: Bug Components: Runtime / Coordination, Tests Affects Versions: 1.12.0 Reporter: Zhu Zhu Assignee: Zhu Zhu Fix For: 1.12.0 Since FLINK-17295, many tests encounter unexpected global failures due to registering duplicated ExecutionAttemptIDs. Although these tests do not appear to be broken yet, they are potentially broken/unstable, and this further blocks reworking these tests to be based on the new scheduler (FLINK-17760). Below is a sample error which happens in ExecutionTest#testAllPreferredLocationCalculation(): {code:java} java.lang.Exception: Trying to register execution Attempt #0 (TestVertex (1/1)) @ (unassigned) - [CREATED] for already used ID a22afb832b5f94b075d7ffb32fbc9023_146968a4de2df0b2fef1e4b2e8297993_0_0 at org.apache.flink.runtime.executiongraph.ExecutionGraph.registerExecution(ExecutionGraph.java:1621) [classes/:?] at org.apache.flink.runtime.executiongraph.ExecutionVertex.<init>(ExecutionVertex.java:181) [classes/:?] at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:211) [classes/:?] at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:139) [classes/:?] at org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionJobVertex(ExecutionGraphTestUtils.java:448) [test-classes/:?] at org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionJobVertex(ExecutionGraphTestUtils.java:419) [test-classes/:?] at org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionJobVertex(ExecutionGraphTestUtils.java:411) [test-classes/:?] at org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionJobVertex(ExecutionGraphTestUtils.java:452) [test-classes/:?] 
at org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecution(ExecutionGraphTestUtils.java:477) [test-classes/:?] at org.apache.flink.runtime.executiongraph.ExecutionTest.testAllPreferredLocationCalculation(ExecutionTest.java:298) [test-classes/:?] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_261] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_261] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_261] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_261] at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) [junit-4.12.jar:4.12] at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) [junit-4.12.jar:4.12] at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) [junit-4.12.jar:4.12] at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) [junit-4.12.jar:4.12] at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) [junit-4.12.jar:4.12] at org.junit.rules.RunRules.evaluate(RunRules.java:20) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) [junit-4.12.jar:4.12] at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) [junit-4.12.jar:4.12] at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) [junit-4.12.jar:4.12] at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) [junit-4.12.jar:4.12] at org.junit.rules.RunRules.evaluate(RunRules.java:20) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner.run(ParentRunner.java:363) [junit-4.12.jar:4.12] at org.junit.runner.JUnitCore.run(JUnitCore.java:137) [junit-4.12.jar:4.12] at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) [junit-rt.jar:?] at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) [junit-rt.jar:?] at
[jira] [Created] (FLINK-19287) Fix minor version in flink docs
Zhu Zhu created FLINK-19287: --- Summary: Fix minor version in flink docs Key: FLINK-19287 URL: https://issues.apache.org/jira/browse/FLINK-19287 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.11.2, 1.10.2 Reporter: Zhu Zhu Assignee: Zhu Zhu The minor version in the docs of 1.10/1.11 still points to 1.10.0/1.11.0, which can mislead users into using old-version artifacts. It should be corrected to 1.10.2/1.11.2. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19286) Improve pipelined region scheduling performance
Zhu Zhu created FLINK-19286: --- Summary: Improve pipelined region scheduling performance Key: FLINK-19286 URL: https://issues.apache.org/jira/browse/FLINK-19286 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.12.0 Reporter: Zhu Zhu Assignee: Zhu Zhu Fix For: 1.12.0 In my recent TPC-DS benchmark, pipelined region scheduling was slower than lazy-from-sources scheduling. The regression is due to some suboptimal implementations in {{PipelinedRegionSchedulingStrategy}}, including: 1. the topological sorting of vertices to deploy 2. an unnecessary O(V) loop when sorting an empty set of regions After improving these implementations, pipelined region scheduling turned out to be 10% faster in the previous benchmark setup. -- This message was sent by Atlassian Jira (v8.3.4#803005)
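The second fix can be sketched as a short-circuit before any per-vertex work (a hypothetical, self-contained illustration; this is not the actual PipelinedRegionSchedulingStrategy code):

```java
import java.util.*;

// Illustrative sketch: short-circuit before doing any O(V) work when the set
// of regions to sort is empty. Vertices/regions are modeled as plain strings.
public class RegionSorter {

    /** Returns the members of regionsToSort in topological order. */
    static List<String> sortRegions(List<String> allVerticesInTopologicalOrder,
                                    Set<String> regionsToSort) {
        if (regionsToSort.isEmpty()) {
            // Early return: previously an O(V) scan over all vertices ran
            // even when nothing needed sorting.
            return Collections.emptyList();
        }
        List<String> sorted = new ArrayList<>();
        for (String v : allVerticesInTopologicalOrder) {  // the O(V) scan
            if (regionsToSort.contains(v)) {
                sorted.add(v);
            }
        }
        return sorted;
    }
}
```

In a scheduler that calls such a sort once per scheduling decision, avoiding the empty-set scan removes O(V) work from the hot path.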
[jira] [Created] (FLINK-19189) Enable pipelined scheduling by default
Zhu Zhu created FLINK-19189: --- Summary: Enable pipelined scheduling by default Key: FLINK-19189 URL: https://issues.apache.org/jira/browse/FLINK-19189 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.12.0 This task is to enable pipelined region scheduling by default, via setting the default value of config option "jobmanager.scheduler.scheduling-strategy" to "region". Here are the required verifications before we can make this change: 1. CI, including all UT/IT cases and E2E tests 2. stability tests 3. TPC-DS benchmark in 1T/10T data scale -- This message was sent by Atlassian Jira (v8.3.4#803005)
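For reference, explicitly enabling the behavior this issue makes the default is a one-line configuration change (config fragment only; the option name and the "region" value are taken from the issue text above):

```yaml
# flink-conf.yaml: explicitly enable pipelined region scheduling
# (FLINK-19189 makes this the default value).
jobmanager.scheduler.scheduling-strategy: region
```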
[jira] [Created] (FLINK-18972) Unfulfillable slot requests of Blink planner batch jobs never timeout
Zhu Zhu created FLINK-18972: --- Summary: Unfulfillable slot requests of Blink planner batch jobs never timeout Key: FLINK-18972 URL: https://issues.apache.org/jira/browse/FLINK-18972 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.11.1 Reporter: Zhu Zhu Assignee: Zhu Zhu Fix For: 1.12.0, 1.11.2 The unfulfillability check of batch slot requests is unexpectedly disabled in {{SchedulerImpl#start() -> BulkSlotProviderImpl#start()}}. This means the slot allocation timeout will not be triggered if a Blink planner batch job cannot obtain any slot. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18496) Anchors are not generated based on ZH characters
Zhu Zhu created FLINK-18496: --- Summary: Anchors are not generated based on ZH characters Key: FLINK-18496 URL: https://issues.apache.org/jira/browse/FLINK-18496 Project: Flink Issue Type: Bug Components: Project Website Reporter: Zhu Zhu In ZH version pages of flink-web, the anchors are not generated based on ZH characters. The anchor name will be like 'section-1', 'section-2' if there are no EN characters. An example can be found at https://flink.apache.org/zh/contributing/contribute-code.html This makes it impossible to reference an anchor from the content because the anchor name can change automatically when a new section is added. Note that this is a problem for flink-web only; the docs generated from the flink repo can properly generate ZH anchors. -- This message was sent by Atlassian Jira (v8.3.4#803005)
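To illustrate why ZH-only headings end up with position-based anchors, here is a small sketch under the assumption (hypothesized, not confirmed from the flink-web generator) that the slugifier keeps only ASCII letters and digits, so a heading with no EN characters yields an empty slug and falls back to "section-N":

```java
import java.util.*;

// Hypothetical ASCII-only slugifier: non-EN characters are dropped, so a
// ZH-only heading produces an empty slug and the generator falls back to a
// position-based "section-N" anchor, which shifts when sections are added.
public class AnchorSlug {

    static String slugify(String heading, int sectionIndex) {
        String slug = heading.toLowerCase(Locale.ROOT)
                .replaceAll("[^a-z0-9]+", "-")   // keep ASCII letters/digits only
                .replaceAll("^-+|-+$", "");      // trim leading/trailing dashes
        return slug.isEmpty() ? "section-" + sectionIndex : slug;
    }
}
```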
[jira] [Created] (FLINK-18372) NullPointerException can happen in SlotPoolImpl#maybeRemapOrphanedAllocation
Zhu Zhu created FLINK-18372: --- Summary: NullPointerException can happen in SlotPoolImpl#maybeRemapOrphanedAllocation Key: FLINK-18372 URL: https://issues.apache.org/jira/browse/FLINK-18372 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.12.0 Reporter: Zhu Zhu Fix For: 1.12.0 NullPointerException can happen in SlotPoolImpl#maybeRemapOrphanedAllocation, which indicates a bug. https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/8189/logs/115 6:07:07,950 [flink-akka.actor.default-dispatcher-7] WARN org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Slot offering to JobManager failed. Freeing the slots and returning them to the ResourceManager. java.lang.NullPointerException: null at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.maybeRemapOrphanedAllocation(SlotPoolImpl.java:599) ~[flink-runtime_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.tryFulfillSlotRequestOrMakeAvailable(SlotPoolImpl.java:564) ~[flink-runtime_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.offerSlot(SlotPoolImpl.java:701) ~[flink-runtime_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.offerSlots(SlotPoolImpl.java:625) ~[flink-runtime_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.runtime.jobmaster.JobMaster.offerSlots(JobMaster.java:541) ~[flink-runtime_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_242] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_242] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_242] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_242] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) 
~[flink-runtime_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-runtime_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-runtime_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-runtime_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [akka-actor_2.11-2.5.21.jar:2.5.21] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [akka-actor_2.11-2.5.21.jar:2.5.21] at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [scala-library-2.11.12.jar:?] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [akka-actor_2.11-2.5.21.jar:2.5.21] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [scala-library-2.11.12.jar:?] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.11.12.jar:?] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.11.12.jar:?] 
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [akka-actor_2.11-2.5.21.jar:2.5.21] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [akka-actor_2.11-2.5.21.jar:2.5.21] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [akka-actor_2.11-2.5.21.jar:2.5.21] at akka.actor.ActorCell.invoke(ActorCell.scala:561) [akka-actor_2.11-2.5.21.jar:2.5.21] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [akka-actor_2.11-2.5.21.jar:2.5.21] at akka.dispatch.Mailbox.run(Mailbox.scala:225) [akka-actor_2.11-2.5.21.jar:2.5.21] at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [akka-actor_2.11-2.5.21.jar:2.5.21] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [akka-actor_2.11-2.5.21.jar:2.5.21] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [akka-actor_2.11-2.5.21.jar:2.5.21] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [akka-actor_2.11-2.5.21.jar:2.5.21] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [akka-actor_2.11-2.5.21.jar:2.5.21] 16:07:07,977 [flink-akka.actor.default-dispatcher-7] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=0.5000, taskHeapMemory=64.000mb (67108864 bytes),
[jira] [Created] (FLINK-18355) Simplify tests of SlotPoolImpl
Zhu Zhu created FLINK-18355: --- Summary: Simplify tests of SlotPoolImpl Key: FLINK-18355 URL: https://issues.apache.org/jira/browse/FLINK-18355 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Tests Reporter: Zhu Zhu Fix For: 1.12.0 Tests of SlotPoolImpl, including SlotPoolImplTest, SlotPoolInteractionsTest and SlotPoolSlotSharingTest, are unnecessarily complicated in the code they involve. E.g. a SchedulerImpl built on top of SlotPoolImpl is used to allocate slots from SlotPoolImpl, which can be simplified by directly invoking slot allocation on SlotPoolImpl. Besides that, there is quite some duplication between the test classes of SlotPoolImpl; this further includes SlotPoolPendingRequestFailureTest, SlotPoolRequestCompletionTest and SlotPoolBatchSlotRequestTest. It can ease future development and maintenance a lot if we clean up these tests by 1. introducing a common test base for field and method reuse 2. removing the usages of SchedulerImpl for slot pool testing 3. other possible simplifications -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18295) Remove the hack logics of result consumers
Zhu Zhu created FLINK-18295: --- Summary: Remove the hack logics of result consumers Key: FLINK-18295 URL: https://issues.apache.org/jira/browse/FLINK-18295 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Zhu Zhu Assignee: Zhu Zhu Fix For: 1.12.0 Currently an {{IntermediateDataSet}} can have multiple {{JobVertex}} as its consumers. That's why the consumers of an `IntermediateResultPartition` are in the form of {{List>}}. However, in the scheduler/{{ExecutionGraph}} there is an assumption that one `IntermediateResultPartition` can be consumed by only one `ExecutionJobVertex`. This results in a lot of hack logics which assume the partition consumers to be contained in a single list. We should remove these hack logics. The idea is to change `IntermediateResultPartition#consumers` to be `List`. `ExecutionGraph` building logics should be adjusted accordingly with the assumption that an `IntermediateResult` can have only one consumer vertex. In `JobGraph`, there should also be check logic for this assumption. -- This message was sent by Atlassian Jira (v8.3.4#803005)
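The proposed JobGraph check could look roughly like the following sketch. This is illustrative only, not Flink code; the class name, method name, and string-keyed map are made up for the example, and only the single-consumer assumption itself comes from the ticket.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class JobGraphChecks {
    /**
     * Returns the ids of data sets that violate the assumption that every
     * intermediate data set has exactly one consumer vertex.
     */
    static List<String> violatingDataSets(Map<String, List<String>> consumersByDataSet) {
        return consumersByDataSet.entrySet().stream()
                .filter(e -> e.getValue().size() != 1)   // more or fewer than one consumer
                .map(Map.Entry::getKey)
                .sorted()                                // deterministic output order
                .collect(Collectors.toList());
    }
}
```

A JobGraph validation step would fail fast if this returns a non-empty list.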
[jira] [Created] (FLINK-18114) Expose more details in logs for debugging bulk slot allocation failures
Zhu Zhu created FLINK-18114: --- Summary: Expose more details in logs for debugging bulk slot allocation failures Key: FLINK-18114 URL: https://issues.apache.org/jira/browse/FLINK-18114 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.12.0 More detailed logs are needed for users to troubleshoot bulk slot allocation failures. https://github.com/apache/flink/pull/12375#discussion_r433922167 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18034) Introduce PreferredLocationsRetriever
Zhu Zhu created FLINK-18034: --- Summary: Introduce PreferredLocationsRetriever Key: FLINK-18034 URL: https://issues.apache.org/jira/browse/FLINK-18034 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Assignee: Zhu Zhu Fix For: 1.12.0 The calculation of preferred locations based on state and inputs is scattered across multiple components, which makes it harder to reason about the calculation and complicates those hosting components. We can introduce a {{PreferredLocationsRetriever}}, to be used by {{ExecutionSlotAllocator}}, which returns the preferred locations of an execution vertex and hides the details of the calculation from other components. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17821) Kafka010TableITCase>KafkaTableTestBase.testKafkaSourceSink failed on AZP
Zhu Zhu created FLINK-17821: --- Summary: Kafka010TableITCase>KafkaTableTestBase.testKafkaSourceSink failed on AZP Key: FLINK-17821 URL: https://issues.apache.org/jira/browse/FLINK-17821 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.12.0 Reporter: Zhu Zhu https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=1871=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=34f486e1-e1e4-5dd2-9c06-bfdd9b9c74a8=12032 2020-05-19T16:29:40.7239430Z Test testKafkaSourceSink[legacy = false, topicId = 1](org.apache.flink.streaming.connectors.kafka.table.Kafka010TableITCase) failed with: 2020-05-19T16:29:40.7240291Z java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 2020-05-19T16:29:40.7241033Zat java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 2020-05-19T16:29:40.7241542Zat java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) 2020-05-19T16:29:40.7242127Zat org.apache.flink.table.planner.runtime.utils.TableEnvUtil$.execInsertSqlAndWaitResult(TableEnvUtil.scala:31) 2020-05-19T16:29:40.7242729Zat org.apache.flink.table.planner.runtime.utils.TableEnvUtil.execInsertSqlAndWaitResult(TableEnvUtil.scala) 2020-05-19T16:29:40.7243239Zat org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestBase.testKafkaSourceSink(KafkaTableTestBase.java:145) 2020-05-19T16:29:40.7243691Zat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 2020-05-19T16:29:40.7244273Zat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 2020-05-19T16:29:40.7244729Zat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 2020-05-19T16:29:40.7245117Zat java.lang.reflect.Method.invoke(Method.java:498) 2020-05-19T16:29:40.7245515Zat org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) 2020-05-19T16:29:40.7245956Zat 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 2020-05-19T16:29:40.7246419Zat org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) 2020-05-19T16:29:40.7246870Zat org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 2020-05-19T16:29:40.7247287Zat org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) 2020-05-19T16:29:40.7251320Zat org.junit.rules.RunRules.evaluate(RunRules.java:20) 2020-05-19T16:29:40.7251833Zat org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 2020-05-19T16:29:40.7252251Zat org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) 2020-05-19T16:29:40.7252716Zat org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) 2020-05-19T16:29:40.7253117Zat org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 2020-05-19T16:29:40.7253502Zat org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 2020-05-19T16:29:40.7254041Zat org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 2020-05-19T16:29:40.7254528Zat org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 2020-05-19T16:29:40.7255500Zat org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 2020-05-19T16:29:40.7256064Zat org.junit.runners.ParentRunner.run(ParentRunner.java:363) 2020-05-19T16:29:40.7256438Zat org.junit.runners.Suite.runChild(Suite.java:128) 2020-05-19T16:29:40.7256758Zat org.junit.runners.Suite.runChild(Suite.java:27) 2020-05-19T16:29:40.7257118Zat org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 2020-05-19T16:29:40.7257486Zat org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 2020-05-19T16:29:40.7257885Zat org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 2020-05-19T16:29:40.7258389Zat org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 2020-05-19T16:29:40.7258821Zat org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
2020-05-19T16:29:40.7259219Zat org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 2020-05-19T16:29:40.7259664Zat org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 2020-05-19T16:29:40.7260098Zat org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) 2020-05-19T16:29:40.7260635Zat org.junit.rules.RunRules.evaluate(RunRules.java:20) 2020-05-19T16:29:40.7261065Zat org.junit.runners.ParentRunner.run(ParentRunner.java:363) 2020-05-19T16:29:40.7261467Zat org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365) 2020-05-19T16:29:40.7261952Zat org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
[jira] [Created] (FLINK-17760) Rework tests to not rely on legacy scheduling logics in ExecutionGraph anymore
Zhu Zhu created FLINK-17760: --- Summary: Rework tests to not rely on legacy scheduling logics in ExecutionGraph anymore Key: FLINK-17760 URL: https://issues.apache.org/jira/browse/FLINK-17760 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Tests Reporter: Zhu Zhu The legacy scheduling logics in ExecutionGraph were used by the legacy scheduler. They are not in production use anymore. In order to remove the legacy scheduling logics, we need to rework the tests which rely on them. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17759) Remove RestartIndividualStrategy
Zhu Zhu created FLINK-17759: --- Summary: Remove RestartIndividualStrategy Key: FLINK-17759 URL: https://issues.apache.org/jira/browse/FLINK-17759 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu It was used by the legacy scheduler and is not used anymore. Removing it can ease the work to further remove the legacy scheduling logics in ExecutionGraph. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17758) Remove AdaptedRestartPipelinedRegionStrategyNG
Zhu Zhu created FLINK-17758: --- Summary: Remove AdaptedRestartPipelinedRegionStrategyNG Key: FLINK-17758 URL: https://issues.apache.org/jira/browse/FLINK-17758 Project: Flink Issue Type: Sub-task Reporter: Zhu Zhu It was used by the legacy scheduler and is not used anymore. Removing it can ease the work to further remove the legacy scheduling logics in ExecutionGraph. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17726) Scheduler should take care of tasks directly canceled by TaskManager
Zhu Zhu created FLINK-17726: --- Summary: Scheduler should take care of tasks directly canceled by TaskManager Key: FLINK-17726 URL: https://issues.apache.org/jira/browse/FLINK-17726 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.12.0 Reporter: Zhu Zhu Fix For: 1.12.0 JobManager will not trigger failure handling when receiving a CANCELED task update. This is because CANCELED tasks are usually caused by another FAILED task. These CANCELED tasks will be restarted by the failover process triggered by the FAILED task. However, if a task is directly CANCELED by the TaskManager due to its own runtime issue, the task will not be recovered by the JM and thus the job will hang. This is a potential issue and we should avoid it. A possible solution is to let the JobManager treat tasks transitioning to CANCELED from all states except CANCELING as failed tasks. -- This message was sent by Atlassian Jira (v8.3.4#803005)
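The proposed rule can be sketched as a small state-transition predicate. This is a toy illustration, not Flink's implementation: the enum and method names are invented here, and only the rule ("CANCELED reached from any state other than CANCELING counts as a failure") comes from the ticket.

```java
// Simplified task state set, modeled loosely on Flink's execution states.
enum TaskState { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED }

class CanceledTaskRule {
    /**
     * Returns true if the JobManager should trigger failure handling for a
     * CANCELED update. If the task was in CANCELING, the JM itself initiated
     * the cancellation; otherwise the TaskManager canceled it on its own.
     */
    static boolean shouldTriggerFailover(TaskState from, TaskState to) {
        return to == TaskState.CANCELED && from != TaskState.CANCELING;
    }
}
```

With this rule, a TM-initiated RUNNING -> CANCELED transition is handled like a failure and the task gets restarted, while a JM-initiated CANCELING -> CANCELED transition is left alone.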
[jira] [Created] (FLINK-17714) Support custom RestartBackoffTimeStrategy
Zhu Zhu created FLINK-17714: --- Summary: Support custom RestartBackoffTimeStrategy Key: FLINK-17714 URL: https://issues.apache.org/jira/browse/FLINK-17714 Project: Flink Issue Type: Wish Components: Runtime / Coordination Reporter: Zhu Zhu There are cases where users need to customize RestartBackoffTimeStrategy to better control job recovery. One example is that users want a job to restart only on certain errors and fail on others. See this ML [discussion|https://lists.apache.org/thread.html/rde685552a83d0d146cf83560df1bc6f33d3dd569f69ae7bbcc4ae508%40%3Cuser.flink.apache.org%3E]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17542) Unify slot request timeout handling for streaming and batch tasks
Zhu Zhu created FLINK-17542: --- Summary: Unify slot request timeout handling for streaming and batch tasks Key: FLINK-17542 URL: https://issues.apache.org/jira/browse/FLINK-17542 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.11.0 Reporter: Zhu Zhu Assignee: Zhu Zhu Fix For: 1.11.0 There are 2 different slot request timeout handling mechanisms for batch and streaming tasks. For streaming tasks, the slot request will fail if it is not fulfilled within slotRequestTimeout. For batch tasks, the slot request will be checked periodically to see whether it is fulfillable, and it only fails if it has been unfulfillable for a certain period (slotRequestTimeout). With slots marked as to whether they will be occupied indefinitely, we can unify this handling. See [FLIP-119|https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling#FLIP-119PipelinedRegionScheduling-ExtendedSlotProviderInterface] for more details. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17410) BlockingPartitionBenchmark compilation failed due to changed StreamGraph interface
Zhu Zhu created FLINK-17410: --- Summary: BlockingPartitionBenchmark compilation failed due to changed StreamGraph interface Key: FLINK-17410 URL: https://issues.apache.org/jira/browse/FLINK-17410 Project: Flink Issue Type: Bug Components: Benchmarks Affects Versions: 1.11.0 Reporter: Zhu Zhu Assignee: Zhu Zhu Fix For: 1.11.0 {{StreamGraph#setGlobalDataExchangeMode(...)}} is introduced in FLINK-17020 to replace {{StreamGraph#setBlockingConnectionsBetweenChains(...)}}. {{BlockingPartitionBenchmark}} failed because it relies on this changed interface. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17330) Avoid scheduling deadlocks caused by intra-logical-region ALL-to-ALL blocking edges
Zhu Zhu created FLINK-17330: --- Summary: Avoid scheduling deadlocks caused by intra-logical-region ALL-to-ALL blocking edges Key: FLINK-17330 URL: https://issues.apache.org/jira/browse/FLINK-17330 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.11.0 Reporter: Zhu Zhu Fix For: 1.11.0 Imagine a job like this:
A --(pipelined FORWARD)--> B --(blocking ALL-to-ALL)--> D
A --(pipelined FORWARD)--> C --(pipelined FORWARD)--> D
with parallelism=2 for all vertices. We will have 2 execution pipelined regions: R1={A1, B1, C1, D1}, R2={A2, B2, C2, D2}. R1 has a cross-region input edge (B2->D1). R2 has a cross-region input edge (B1->D2). A scheduling deadlock will happen, since we schedule a region only when all its inputs are consumable (i.e. its blocking input partitions are finished): R1 can be scheduled only if R2 finishes, while R2 can be scheduled only if R1 finishes. To avoid this, one solution is to force a logical pipelined region with intra-region ALL-to-ALL blocking edges to form only one execution pipelined region, so that there would be no cyclic input dependencies between regions. Besides that, we should also pay attention to avoiding cyclic cross-region POINTWISE blocking edges. -- This message was sent by Atlassian Jira (v8.3.4#803005)
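The deadlock above is just a cycle in the region-level dependency graph, which a simple DFS can detect. The following is a toy illustration, not Flink code; region names and the string-keyed map are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class RegionDeadlockCheck {
    /**
     * deps maps a region to the regions whose blocking results it consumes.
     * A cycle means no region can ever be scheduled first: a deadlock.
     */
    static boolean hasCyclicDependency(Map<String, Set<String>> deps) {
        Map<String, Integer> state = new HashMap<>(); // 1 = on DFS stack, 2 = done
        for (String region : deps.keySet()) {
            if (dfs(region, deps, state)) return true;
        }
        return false;
    }

    private static boolean dfs(String r, Map<String, Set<String>> deps, Map<String, Integer> state) {
        Integer s = state.get(r);
        if (s != null) return s == 1; // revisiting a node on the stack => cycle
        state.put(r, 1);
        for (String d : deps.getOrDefault(r, Set.of())) {
            if (dfs(d, deps, state)) return true;
        }
        state.put(r, 2);
        return false;
    }
}
```

For the example job, R1 depends on R2 and R2 on R1, which is cyclic; after merging the logical region into one execution region, the merged region has no cross-region dependency and the cycle disappears.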
[jira] [Created] (FLINK-17047) Simplify SchedulingStrategy#onPartitionConsumable(...) parameters
Zhu Zhu created FLINK-17047: --- Summary: Simplify SchedulingStrategy#onPartitionConsumable(...) parameters Key: FLINK-17047 URL: https://issues.apache.org/jira/browse/FLINK-17047 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.11.0 Reporter: Zhu Zhu Fix For: 1.11.0 I'd propose to simplify the SchedulingStrategy#onPartitionConsumable(...) parameters as below:
1. Take IntermediateResultPartitionID instead of ResultPartitionID. ResultPartitionID is a composition of IntermediateResultPartitionID and ExecutionAttemptID. SchedulingStrategy is not aware of ExecutionAttemptID, so there is no need to expose it.
2. Drop the executionVertexId param. executionVertexId does not provide extra information. The check in LazyFromSourcesSchedulingStrategy does not make much sense, since the executionVertexId is just retrieved from the partitionId in an earlier stage. It makes things more complex, since a blocking result partition can become consumable when a vertex that is not its producer finishes.
This simplification also eases the work of FLINK-14234. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17046) SavepointWriterITCase failed on travis
Zhu Zhu created FLINK-17046: --- Summary: SavepointWriterITCase failed on travis Key: FLINK-17046 URL: https://issues.apache.org/jira/browse/FLINK-17046 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.11.0 Reporter: Zhu Zhu Fix For: 1.11.0 https://api.travis-ci.com/v3/job/316732861/log.txt [ERROR] Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 7.67 s <<< FAILURE! - in org.apache.flink.state.api.SavepointWriterITCase [ERROR] testStateBootstrapAndModification[Savepoint Writer: MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: UNDEFINED, maxStateSize: 5242880)](org.apache.flink.state.api.SavepointWriterITCase) Time elapsed: 1.736 s <<< ERROR! org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.state.api.SavepointWriterITCase.bootstrapState(SavepointWriterITCase.java:147) at org.apache.flink.state.api.SavepointWriterITCase.testStateBootstrapAndModification(SavepointWriterITCase.java:114) Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy Caused by: java.lang.UnsupportedOperationException: This method should never be called [ERROR] testStateBootstrapAndModification[Savepoint Writer: RocksDBStateBackend{checkpointStreamBackend=MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: UNDEFINED, maxStateSize: 5242880), localRocksDbDirectories=null, enableIncrementalCheckpointing=UNDEFINED, numberOfTransferThreads=-1, writeBatchSize=-1}](org.apache.flink.state.api.SavepointWriterITCase) Time elapsed: 0.486 s <<< ERROR! org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
at org.apache.flink.state.api.SavepointWriterITCase.bootstrapState(SavepointWriterITCase.java:147) at org.apache.flink.state.api.SavepointWriterITCase.testStateBootstrapAndModification(SavepointWriterITCase.java:114) Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy Caused by: java.lang.UnsupportedOperationException: This method should never be called -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17021) Blink Planner set GlobalDataExchangeMode
Zhu Zhu created FLINK-17021: --- Summary: Blink Planner set GlobalDataExchangeMode Key: FLINK-17021 URL: https://issues.apache.org/jira/browse/FLINK-17021 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Table SQL / Planner Affects Versions: 1.11.0 Reporter: Zhu Zhu Fix For: 1.11.0 The Blink planner config option "table.exec.shuffle-mode" should be extended to set the GlobalDataExchangeMode for a job. The supported values are:
* all-blocking/batch --> ALL_EDGES_BLOCKING
* forward-pipelined-only --> FORWARD_EDGES_PIPELINED
* pointwise-pipelined-only --> POINTWISE_EDGES_PIPELINED
* all-pipelined/pipelined --> ALL_EDGES_PIPELINED
Note that the values ‘pipelined’ and ‘batch’ are still supported for compatibility:
* ‘pipelined’ will be treated the same as ‘all-pipelined’
* ‘batch’ will be treated the same as ‘all-blocking’
The Blink planner needs to set the GlobalDataExchangeMode on the StreamGraph according to the config value. -- This message was sent by Atlassian Jira (v8.3.4#803005)
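The value mapping above can be sketched in plain Java. This is a toy illustration, not the planner's actual code: the class and method names are invented, the modes are represented as strings for simplicity, and only the value-to-mode pairs come from the ticket.

```java
class ShuffleModeMapping {
    /** Maps a "table.exec.shuffle-mode" value to the GlobalDataExchangeMode name. */
    static String toGlobalDataExchangeMode(String shuffleMode) {
        switch (shuffleMode.toLowerCase()) {
            case "all-blocking":
            case "batch": // legacy value, kept for compatibility
                return "ALL_EDGES_BLOCKING";
            case "forward-pipelined-only":
                return "FORWARD_EDGES_PIPELINED";
            case "pointwise-pipelined-only":
                return "POINTWISE_EDGES_PIPELINED";
            case "all-pipelined":
            case "pipelined": // legacy value, kept for compatibility
                return "ALL_EDGES_PIPELINED";
            default:
                throw new IllegalArgumentException("Unknown shuffle mode: " + shuffleMode);
        }
    }
}
```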
[jira] [Created] (FLINK-17020) Introduce GlobalDataExchangeMode for JobGraph Generation
Zhu Zhu created FLINK-17020: --- Summary: Introduce GlobalDataExchangeMode for JobGraph Generation Key: FLINK-17020 URL: https://issues.apache.org/jira/browse/FLINK-17020 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.11.0 Reporter: Zhu Zhu Fix For: 1.11.0 Introduce GlobalDataExchangeMode with 4 modes:
* ALL_EDGES_BLOCKING
* FORWARD_EDGES_PIPELINED
* POINTWISE_EDGES_PIPELINED
* ALL_EDGES_PIPELINED
StreamGraph will be extended with a new field to host the GlobalDataExchangeMode. In the JobGraph generation stage, this mode will be used to determine the data exchange type of each job edge. For more details see [FLIP-119#Global Data Exchange Mode|https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling#FLIP-119PipelinedRegionScheduling-GlobalDataExchangeMode] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17019) Implement FIFO Physical Slot Assignment in SlotPoolImpl
Zhu Zhu created FLINK-17019: --- Summary: Implement FIFO Physical Slot Assignment in SlotPoolImpl Key: FLINK-17019 URL: https://issues.apache.org/jira/browse/FLINK-17019 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.11.0 Reporter: Zhu Zhu Fix For: 1.11.0 The SlotPool should try to fulfill the oldest pending slot request once it receives an available slot, no matter whether the slot is returned by another terminated task or is just offered from a task manager. This naturally ensures that slot requests of an earlier scheduled region will be fulfilled earlier than requests of a later scheduled region. We only need to change the slot assignment logic on slot offers. This is because the fields {{pendingRequests}} and {{waitingForResourceManager}} store the pending requests in LinkedHashMaps. Therefore, {{tryFulfillSlotRequestOrMakeAvailable(...)}} will naturally fulfill the pending requests in insertion order. When a new slot is offered via {{SlotPoolImpl#offerSlot(...)}}, we should use it to fulfill the oldest fulfillable slot request directly by invoking {{tryFulfillSlotRequestOrMakeAvailable(...)}}. If a pending request (say R1) exists with the allocationId of the offered slot, and it is different from the request to fulfill (say R2), we should update the pending requests to replace the AllocationID of R1 with the AllocationID of R2. This ensures that failAllocation(...) can still fail slot allocation requests to trigger restarting tasks and re-allocating slots. -- This message was sent by Atlassian Jira (v8.3.4#803005)
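The core FIFO behavior rests on LinkedHashMap's insertion-order iteration. The following is a minimal sketch of that mechanism only, with invented names and string ids; it is not SlotPoolImpl code and omits fulfillability checks and the AllocationID swap described above.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;

class FifoPendingRequests {
    // LinkedHashMap iterates keys in insertion order, giving FIFO for free.
    private final LinkedHashMap<String, String> pending = new LinkedHashMap<>();

    void add(String requestId) {
        pending.put(requestId, requestId);
    }

    /** Removes and returns the oldest pending request id, or null if none. */
    String fulfillOldest() {
        Iterator<String> it = pending.keySet().iterator();
        if (!it.hasNext()) return null;
        String oldest = it.next();
        it.remove();
        return oldest;
    }
}
```

Because requests of an earlier scheduled region were inserted earlier, fulfilling the head of this map naturally prefers them over later regions.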
[jira] [Created] (FLINK-17018) Use Bulk Slot Allocation in DefaultExecutionSlotAllocator
Zhu Zhu created FLINK-17018: --- Summary: Use Bulk Slot Allocation in DefaultExecutionSlotAllocator Key: FLINK-17018 URL: https://issues.apache.org/jira/browse/FLINK-17018 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.11.0 Reporter: Zhu Zhu Fix For: 1.11.0 The DefaultExecutionSlotAllocator should invoke bulk slot allocation methods to allocate slots. The SlotProviderStrategy should also be reworked to forward such requests. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17017) Implement Bulk Slot Allocation in SchedulerImpl
Zhu Zhu created FLINK-17017: --- Summary: Implement Bulk Slot Allocation in SchedulerImpl Key: FLINK-17017 URL: https://issues.apache.org/jira/browse/FLINK-17017 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.11.0 Reporter: Zhu Zhu Fix For: 1.11.0 The SlotProvider interface should be extended with a bulk slot allocation method which accepts a bulk of slot requests as one of the parameters.
{code:java}
CompletableFuture<Collection<LogicalSlotRequestResult>> allocateSlots(
    Collection<LogicalSlotRequest> slotRequests,
    Time allocationTimeout);

class LogicalSlotRequest {
  SlotRequestId slotRequestId;
  ScheduledUnit scheduledUnit;
  SlotProfile slotProfile;
  boolean slotWillBeOccupiedIndefinitely;
}

class LogicalSlotRequestResult {
  SlotRequestId slotRequestId;
  LogicalSlot slot;
}
{code}
For more details see [FLIP-119#Bulk Slot Allocation|https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling#FLIP-119PipelinedRegionScheduling-BulkSlotAllocation] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17014) Implement PipelinedRegionSchedulingStrategy
Zhu Zhu created FLINK-17014: --- Summary: Implement PipelinedRegionSchedulingStrategy Key: FLINK-17014 URL: https://issues.apache.org/jira/browse/FLINK-17014 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.11.0 Reporter: Zhu Zhu Fix For: 1.11.0 The PipelinedRegionSchedulingStrategy submits one pipelined region to the DefaultScheduler at a time. The PipelinedRegionSchedulingStrategy must be aware of the inputs of each pipelined region. It should schedule a region if and only if all the inputs of that region have become consumable. The PipelinedRegionSchedulingStrategy can be implemented as below:
* startScheduling(): schedule all source regions one by one.
* onPartitionConsumable(partition): check all the consumer regions of the notified partition; if all the inputs of a region have become consumable, schedule the region.
* restartTasks(tasksToRestart): find all regions which contain the tasks to restart, and reschedule those whose inputs are all consumable.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
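The scheduling condition above can be modeled as a simple readiness check. This is a toy model, not Flink's implementation: the names and the string-keyed maps are invented for the example; only the rule "schedule a region once all its blocking inputs are consumable" comes from the ticket.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RegionScheduling {
    /**
     * inputs maps each region to the blocking input partitions it consumes
     * (source regions map to an empty set). Returns the regions that may be
     * scheduled given the currently consumable partitions.
     */
    static List<String> schedulableRegions(Map<String, Set<String>> inputs,
                                           Set<String> consumablePartitions) {
        List<String> ready = new ArrayList<>();
        for (Map.Entry<String, Set<String>> e : inputs.entrySet()) {
            // A region is ready iff every one of its inputs is consumable;
            // source regions (no inputs) are trivially ready.
            if (consumablePartitions.containsAll(e.getValue())) {
                ready.add(e.getKey());
            }
        }
        return ready;
    }
}
```

startScheduling() corresponds to calling this with no consumable partitions (only source regions qualify); onPartitionConsumable(partition) corresponds to re-evaluating the consumer regions of the newly consumable partition.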
[jira] [Created] (FLINK-16560) StreamExecutionEnvironment configuration is empty when building program via PackagedProgramUtils#createJobGraph
Zhu Zhu created FLINK-16560: --- Summary: StreamExecutionEnvironment configuration is empty when building program via PackagedProgramUtils#createJobGraph Key: FLINK-16560 URL: https://issues.apache.org/jira/browse/FLINK-16560 Project: Flink Issue Type: Bug Components: Client / Job Submission Affects Versions: 1.10.0 Reporter: Zhu Zhu PackagedProgramUtils#createJobGraph(...) is used to generate the JobGraph in k8s job mode. The problem is that the configuration field of StreamExecutionEnvironment is a newly created one when building the job program. This is because the StreamPlanEnvironment constructor builds on the no-arg constructor of StreamExecutionEnvironment. This may lead to unexpected results when invoking StreamExecutionEnvironment#configure(...), which relies on the configuration. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16430) Pipelined region scheduling
Zhu Zhu created FLINK-16430: --- Summary: Pipelined region scheduling Key: FLINK-16430 URL: https://issues.apache.org/jira/browse/FLINK-16430 Project: Flink Issue Type: Task Components: Runtime / Coordination Affects Versions: 1.11.0 Reporter: Zhu Zhu Fix For: 1.11.0 Pipelined region scheduling aims to allow batch jobs with PIPELINED data exchanges to run without the risk of encountering a resource deadlock. More details and work items will be added later when the detailed design is ready. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16300) Reworks SchedulerTestUtils with testing classes to replace mockito usages
Zhu Zhu created FLINK-16300: --- Summary: Reworks SchedulerTestUtils with testing classes to replace mockito usages Key: FLINK-16300 URL: https://issues.apache.org/jira/browse/FLINK-16300 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Tests Affects Versions: 1.11.0 Reporter: Zhu Zhu Assignee: Zhu Zhu Fix For: 1.11.0 Mockito is used in SchedulerTestUtils to mock ExecutionVertex and Execution for testing. It fails to mock every getter, so other tests that use it may encounter NPE issues, e.g. with ExecutionVertex#getID(). Mockito is also discouraged in Flink tests. So I'd propose to rework the utils with testing classes. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16276) Introduce factory methods to create DefaultScheduler for testing
Zhu Zhu created FLINK-16276: --- Summary: Introduce factory methods to create DefaultScheduler for testing Key: FLINK-16276 URL: https://issues.apache.org/jira/browse/FLINK-16276 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Tests Affects Versions: 1.11.0 Reporter: Zhu Zhu Assignee: Zhu Zhu Fix For: 1.11.0 Currently tests create DefaultScheduler via its constructor. Having a builder and a set of factory methods can significantly reduce the complexity of instantiating DefaultScheduler. This can be very helpful, especially when reworking tests to be based on the new scheduler. -- This message was sent by Atlassian Jira (v8.3.4#803005)