[jira] [Created] (FLINK-34105) Akka timeout happens in TPC-DS benchmarks

2024-01-15 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-34105:
---

 Summary: Akka timeout happens in TPC-DS benchmarks
 Key: FLINK-34105
 URL: https://issues.apache.org/jira/browse/FLINK-34105
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.19.0
Reporter: Zhu Zhu
 Attachments: image-2024-01-16-13-59-45-556.png

We noticed Akka timeouts happening in 10TB TPC-DS benchmarks on 1.19. The problem 
did not happen in 1.18.0.
After bisecting, we found that the problem was introduced in FLINK-33532.

 !image-2024-01-16-13-59-45-556.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33677) Remove flink-conf.yaml from flink dist

2023-11-28 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-33677:
---

 Summary: Remove flink-conf.yaml from flink dist
 Key: FLINK-33677
 URL: https://issues.apache.org/jira/browse/FLINK-33677
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration
Reporter: Zhu Zhu


FLINK-33297/FLIP-366 supports parsing standard YAML files for Flink 
configuration. A new configuration file config.yaml, which should be a standard 
YAML file, is introduced.
To ensure compatibility, in Flink 1.x the old configuration parser will still 
be used if the old configuration file flink-conf.yaml exists. The new configuration 
file will be used only if flink-conf.yaml does not exist.
In Flink 2.0, we should remove the old configuration file from flink dist, as 
well as the old configuration parser.
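
For illustration, the compatibility behavior described above amounts to a file-existence check at startup; a minimal sketch of the idea (illustrative only, not Flink's actual configuration-loading code):
{code:java}
import java.io.File;

/** Illustrative only: pick the config file the way the compatibility rule above describes. */
public class ConfigFileSelection {

    static File selectConfigFile(File confDir) {
        File legacy = new File(confDir, "flink-conf.yaml"); // parsed with the old parser if present
        File standard = new File(confDir, "config.yaml");   // standard YAML file (FLIP-366)
        return legacy.exists() ? legacy : standard;
    }
}
{code}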




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32654) Deprecate ExecutionConfig#canEqual(obj)

2023-07-24 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-32654:
---

 Summary: Deprecate ExecutionConfig#canEqual(obj)
 Key: FLINK-32654
 URL: https://issues.apache.org/jira/browse/FLINK-32654
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Configuration
 Environment: ExecutionConfig#canEqual(obj) checks whether the object 
is an instance of ExecutionConfig.
It is not intended to be used by users but was used for internal comparison. 
Unfortunately, it was exposed as `@Public` because ExecutionConfig is `@Public`.
We should deprecate it so that we can remove it in Flink 2.0.
Reporter: Zhu Zhu
 Fix For: 1.18.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32384) Remove deprecated configuration keys which violate YAML spec

2023-06-19 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-32384:
---

 Summary: Remove deprecated configuration keys which violate YAML 
spec
 Key: FLINK-32384
 URL: https://issues.apache.org/jira/browse/FLINK-32384
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration
Reporter: Zhu Zhu
 Fix For: 2.0.0


In FLINK-29372, keys that violate the YAML spec were renamed to a valid form and the 
old names were deprecated.
In Flink 2.0 we should remove these deprecated keys. This will prevent users 
from (unintentionally) creating an invalid-YAML flink-conf.yaml.
Then, with the work of FLINK-23620, we can remove the non-standard YAML parsing 
logic and enforce standard YAML validation in CI.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32383) 2.0 Breaking configuration changes

2023-06-19 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-32383:
---

 Summary: 2.0 Breaking configuration changes
 Key: FLINK-32383
 URL: https://issues.apache.org/jira/browse/FLINK-32383
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Configuration
Reporter: Zhu Zhu
 Fix For: 2.0.0


Umbrella issue for all breaking changes to Flink configuration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31052) Release Testing: Verify FLINK-30707 Improve slow task detection

2023-02-13 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-31052:
---

 Summary: Release Testing: Verify FLINK-30707 Improve slow task 
detection
 Key: FLINK-31052
 URL: https://issues.apache.org/jira/browse/FLINK-31052
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.17.0


This task aims to verify [FLINK-30707 which improves the slow task 
detection|https://issues.apache.org/jira/browse/FLINK-30707]. 

The slow task detection now takes the input data volume of tasks into account. 
Tasks which have a longer execution time but consume more data may not be 
considered slow. This improvement helps to eliminate the negative impact of 
data skew on slow task detection.

The documentation of speculative execution can be found 
[here|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#speculative-execution]
 .

One can verify it by creating intentional data skew, as sketched below.
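
For example, a hypothetical DataStream batch job with intentional skew could look like the following (class name and numbers are illustrative; speculative execution still needs to be enabled as described in the linked documentation):
{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SkewedBatchJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromSequence(0, 10_000_000L)
                // ~99% of the records are mapped to key 0, so one downstream subtask
                // receives far more data (and runs longer) than its siblings.
                .keyBy(value -> value % 100 == 0 ? value : 0L)
                .reduce(Long::sum)
                .print();

        env.execute("skewed-batch-job");
    }
}
{code}
The heavily loaded subtask should no longer be reported as slow just because it runs longer than its siblings.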



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31005) Release Testing: Verify FLIP-281 Supports speculative execution of sinks

2023-02-09 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-31005:
---

 Summary: Release Testing: Verify FLIP-281 Supports speculative 
execution of sinks
 Key: FLINK-31005
 URL: https://issues.apache.org/jira/browse/FLINK-31005
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.17.0


This task aims to verify [FLIP-281 Supports speculative execution of 
sinks|https://issues.apache.org/jira/browse/FLINK-30725].
The documentation can be found 
[here|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#enable-sinks-for-speculative-execution]
 .

Things to verify:
1. If a sink implements the decorative interface 
{{SupportsConcurrentExecutionAttempts}}, speculative executions can be 
performed; otherwise they cannot. Sinks to verify include SinkFunction, OutputFormat 
and Sink (V2).
2. These built-in sinks support speculative execution: DiscardingSink, 
PrintSinkFunction, PrintSink, FileSink, FileSystemOutputFormat, HiveTableSink

If it's hard to construct a case where speculative execution would happen, 
especially for those built-in sinks, the speculative execution configuration 
can be tuned to make it happen more easily, e.g. set 
{{slow-task-detector.execution-time.baseline-lower-bound}} and 
{{slow-task-detector.execution-time.baseline-ratio}} to {{0}}.
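
For example, per the suggestion above, the following entries in the Flink configuration file lower the slow-task baseline so that speculative attempts are triggered much more easily (check the option names against the documentation of the release under test):
{code}
slow-task-detector.execution-time.baseline-lower-bound: 0
slow-task-detector.execution-time.baseline-ratio: 0
{code}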



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30904) Update the documentation and configuration description of slow task detector

2023-02-05 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-30904:
---

 Summary: Update the documentation and configuration description of 
slow task detector
 Key: FLINK-30904
 URL: https://issues.apache.org/jira/browse/FLINK-30904
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Runtime / Configuration
 Environment: FLINK-30707 improved slow task detection. The previous 
documentation and configuration descriptions of SlowTaskDetector need to be 
updated accordingly.
Reporter: Zhu Zhu
 Fix For: 1.17.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30834) SortAggITCase.testLeadLag is unstable in ALL_EXCHANGES_BLOCKING mode

2023-01-30 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-30834:
---

 Summary: SortAggITCase.testLeadLag is unstable in 
ALL_EXCHANGES_BLOCKING mode
 Key: FLINK-30834
 URL: https://issues.apache.org/jira/browse/FLINK-30834
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.16.1, 1.17.0
Reporter: Zhu Zhu


When investigating the problem of FLINK-30828, we found that 
SortAggITCase.testLeadLag works well in ALL_EXCHANGES_PIPELINED mode. 
However, it becomes unstable when run in ALL_EXCHANGES_BLOCKING mode, which is 
unexpected.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30815) BatchTestBase/BatchAbstractTestBase are using Junit4 while some child tests are using JUnit5

2023-01-28 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-30815:
---

 Summary: BatchTestBase/BatchAbstractTestBase are using Junit4 
while some child tests are using JUnit5
 Key: FLINK-30815
 URL: https://issues.apache.org/jira/browse/FLINK-30815
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.16.0
Reporter: Zhu Zhu


BatchTestBase/BatchAbstractTestBase are using JUnit 4, while some child tests 
(e.g. DynamicFilteringITCase) are using JUnit 5. This may break some assumptions 
and hide problems.
For example, such a child test will create a MiniCluster by itself, instead of 
using the MiniCluster (TM=1, slots=3) created in BatchAbstractTestBase. The 
created MiniCluster may have more slots and hide resource deadlock issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30814) The parallelism of sort after a global partitioning is forced to be 1

2023-01-28 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-30814:
---

 Summary: The parallelism of sort after a global 
partitioning is forced to be 1
 Key: FLINK-30814
 URL: https://issues.apache.org/jira/browse/FLINK-30814
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.16.0
Reporter: Zhu Zhu
 Fix For: 1.17.0


The parallelism of a sort after a global partitioning is forced to be 1; 
otherwise the parallelism may be changed by the adaptive batch scheduler, 
which is unexpected.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30480) Add benchmarks for adaptive batch scheduler

2022-12-21 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-30480:
---

 Summary: Add benchmarks for adaptive batch scheduler
 Key: FLINK-30480
 URL: https://issues.apache.org/jira/browse/FLINK-30480
 Project: Flink
  Issue Type: Improvement
  Components: Benchmarks, Runtime / Coordination
Reporter: Zhu Zhu


Currently we only have benchmarks for the DefaultScheduler (FLINK-20612). We should 
also have benchmarks for the AdaptiveBatchScheduler to identify 
initialization/scheduling/deployment performance problems or regressions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29989) Enable FlameGraph for arbitrary thread on TaskManager

2022-11-10 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-29989:
---

 Summary: Enable FlameGraph for arbitrary thread on TaskManager
 Key: FLINK-29989
 URL: https://issues.apache.org/jira/browse/FLINK-29989
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Reporter: Zhu Zhu


A FlameGraph for an arbitrary thread on a TaskManager can be helpful for tasks which 
spawn other worker threads. See FLINK-29629 for more details.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28981) Release Testing: Verify FLIP-245 sources speculative execution

2022-08-15 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28981:
---

 Summary: Release Testing: Verify FLIP-245 sources speculative 
execution
 Key: FLINK-28981
 URL: https://issues.apache.org/jira/browse/FLINK-28981
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Common, Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.16.0


Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-245, along with FLIP-168, FLIP-224 and 
FLIP-249.

More details about this feature and how to use it can be found in this 
documentation [PR|https://github.com/apache/flink/pull/20507].

To do the verification, the process can be:
 - Write Flink jobs which have some {{source}} subtasks running much slower than 
others (a minimal sketch of such a slow source follows this list). 3 kinds of sources should be verified, including
   - [Source 
functions|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java]
   - [InputFormat 
sources|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java]
   - [FLIP-27 new 
sources|https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java]
 - Modify Flink configuration file to enable speculative execution and tune the 
configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.
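
A minimal sketch of such a slow source for the legacy SourceFunction stack (illustrative only; the class name, record count and sleep time are made up for this example, and the same trick applies to the other source kinds):
{code:java}
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

/** One subtask emits records much more slowly than the others. Illustrative only. */
public class SkewedNumberSource extends RichParallelSourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long value = 0;
        while (running && value < 100_000) {
            if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                Thread.sleep(10); // artificial slowness on subtask 0 only
            }
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(value++);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
{code}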



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28980) Release Testing: Verify FLIP-168 speculative execution

2022-08-15 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28980:
---

 Summary: Release Testing: Verify FLIP-168 speculative execution
 Key: FLINK-28980
 URL: https://issues.apache.org/jira/browse/FLINK-28980
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.16.0


Speculative execution is introduced in Flink 1.16 to deal with temporary slow 
tasks caused by slow nodes. More details about this feature can be found in 
this documentation [PR|https://github.com/apache/flink/pull/20507].

This feature currently consists of 4 FLIPs:
 - FLIP-168: Speculative Execution core part
 - FLIP-224: Blocklist Mechanism
 - FLIP-245: Source Supports Speculative Execution
 - FLIP-249: Flink Web UI Enhancement for Speculative Execution

This ticket aims to verify FLIP-168, along with FLIP-224 and FLIP-249.
To do the verification, the process can be:
 - Write a Flink job which has a subtask running much slower than others (e.g. 
sleep indefinitely if it runs on a certain host, where the hostname can be retrieved 
via InetAddress.getLocalHost().getHostName(), or if its (subtaskIndex + 
attemptNumber) % 2 == 0); a minimal sketch of such a function follows this list
 - Modify Flink configuration file to enable speculative execution and tune the 
configuration as you like
 - Submit the job. Check the web UI, logs, metrics and the produced result.
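
A minimal sketch of such a slow subtask, using the (subtaskIndex + attemptNumber) % 2 == 0 trick mentioned above (the class name and sleep duration are made up for this example):
{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;

/** Sleeps on half of the (subtask, attempt) combinations to simulate a slow subtask. */
public class SometimesSlowMapper extends RichMapFunction<Long, Long> {

    @Override
    public Long map(Long value) throws Exception {
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        int attemptNumber = getRuntimeContext().getAttemptNumber();
        if ((subtaskIndex + attemptNumber) % 2 == 0) {
            Thread.sleep(100); // slow path: a speculative attempt should eventually overtake it
        }
        return value;
    }
}
{code}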



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28907) Flink docs does not compile

2022-08-10 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28907:
---

 Summary: Flink docs does not compile
 Key: FLINK-28907
 URL: https://issues.apache.org/jira/browse/FLINK-28907
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.16.0
Reporter: Zhu Zhu
 Fix For: 1.16.0


Flink docs suddenly fail to compile in my local environment, without any new 
change or rebase. The error is as below:

go: github.com/apache/flink-connector-elasticsearch/docs upgrade => 
v0.0.0-20220715033920-cbeb08187b3a
hugo: collected modules in 1832 ms
Start building sites …
ERROR 2022/08/10 17:48:29 [en] REF_NOT_FOUND: Ref 
"docs/connectors/table/elasticsearch": 
"/XXX/docs/content/docs/connectors/table/formats/overview.md:54:20": page not 
found
ERROR 2022/08/10 17:48:29 [en] REF_NOT_FOUND: Ref 
"docs/connectors/datastream/elasticsearch": 
"/XXX/docs/content/docs/connectors/datastream/overview.md:44:20": page not found
ERROR 2022/08/10 17:48:29 [en] REF_NOT_FOUND: Ref 
"docs/connectors/table/elasticsearch": 
"/XXX/docs/content/docs/connectors/table/overview.md:58:20": page not found
WARN 2022/08/10 17:48:29 Expand shortcode is deprecated. Use 'details' instead.
ERROR 2022/08/10 17:48:32 [zh] REF_NOT_FOUND: Ref 
"docs/connectors/table/elasticsearch": 
"/XXX/docs/content.zh/docs/connectors/table/formats/overview.md:54:20": page 
not found
ERROR 2022/08/10 17:48:32 [zh] REF_NOT_FOUND: Ref 
"docs/connectors/datastream/elasticsearch": 
"/XXX/docs/content.zh/docs/connectors/datastream/overview.md:43:20": page not 
found
ERROR 2022/08/10 17:48:32 [zh] REF_NOT_FOUND: Ref 
"docs/connectors/table/elasticsearch": 
"/XXX/docs/content.zh/docs/connectors/table/overview.md:58:20": page not found
WARN 2022/08/10 17:48:32 Expand shortcode is deprecated. Use 'details' instead.
Built in 6415 ms
Error: Error building site: logged 6 error(s)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28771) Assign speculative execution attempt with correct CREATED timestamp

2022-08-01 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28771:
---

 Summary: Assign speculative execution attempt with correct CREATED 
timestamp
 Key: FLINK-28771
 URL: https://issues.apache.org/jira/browse/FLINK-28771
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.16.0
Reporter: Zhu Zhu
Assignee: Zhu Zhu
 Fix For: 1.16.0


Currently, a newly created speculative execution attempt is assigned a wrong 
CREATED timestamp in the SpeculativeScheduler. We need to fix it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28759) Enable speculative execution in AdaptiveBatchScheduler TPC-DS e2e tests

2022-08-01 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28759:
---

 Summary: Enable speculative execution in AdaptiveBatchScheduler TPC-DS e2e tests
 Key: FLINK-28759
 URL: https://issues.apache.org/jira/browse/FLINK-28759
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Tests
Reporter: Zhu Zhu
 Fix For: 1.16.0


To verify the correctness of speculative execution, we can enable it in the 
AdaptiveBatchScheduler TPC-DS e2e tests, which run a lot of different batch 
jobs and verify the results.
Note that we need to disable the blocklist (by setting the block duration to 0) in 
such single-machine e2e tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28612) Cancel pending slot allocation after canceling executions

2022-07-19 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28612:
---

 Summary: Cancel pending slot allocation after canceling executions
 Key: FLINK-28612
 URL: https://issues.apache.org/jira/browse/FLINK-28612
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.16.0


Canceling pending slot allocations before canceling executions will result in 
execution failures and pollute the logs. It will also result in an execution 
becoming FAILED even if the execution vertex has FINISHED, which breaks the 
assumption of SpeculativeScheduler#isExecutionVertexPossibleToFinish().



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28610) Enable speculative execution of sources

2022-07-19 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28610:
---

 Summary: Enable speculative execution of sources
 Key: FLINK-28610
 URL: https://issues.apache.org/jira/browse/FLINK-28610
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.16.0


Currently, speculative execution of sources is disabled. It can be enabled once 
the improvements that make InputFormat sources and new sources work correctly 
with speculative execution are done.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28586) Speculative execution for new sources

2022-07-18 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28586:
---

 Summary: Speculative execution for new sources
 Key: FLINK-28586
 URL: https://issues.apache.org/jira/browse/FLINK-28586
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Common, Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.16.0


This task enables new sources (FLIP-27) for speculative execution.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28585) Speculative execution for InputFormat sources

2022-07-18 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28585:
---

 Summary: Speculative execution for InputFormat sources
 Key: FLINK-28585
 URL: https://issues.apache.org/jira/browse/FLINK-28585
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.16.0


This task enables InputFormat sources for speculative execution.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28402) Create FailureHandlingResultSnapshot with the truly failed execution

2022-07-05 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28402:
---

 Summary: Create FailureHandlingResultSnapshot with the truly 
failed execution
 Key: FLINK-28402
 URL: https://issues.apache.org/jira/browse/FLINK-28402
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.16.0


Previously, FailureHandlingResultSnapshot was always created to treat the only 
current attempt of an execution vertex as the failed execution. This is no 
longer right in speculative execution cases, in which an execution vertex can 
have multiple current executions, and any of them may fail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28258) Introduce ExecutionHistory to host historical executions for each execution vertex

2022-06-27 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28258:
---

 Summary: Introduce ExecutionHistory to host historical executions 
for each execution vertex
 Key: FLINK-28258
 URL: https://issues.apache.org/jira/browse/FLINK-28258
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.16.0


With speculative execution, tracking prior executions in an 
{{EvictingBoundedList}} does not work. This is because {{EvictingBoundedList}} 
relies on the assumption that historical executions are added successively in 
ascending order of attempt number. This is no longer true if speculative 
execution is enabled, e.g. 3 speculative execution attempts #1, #2, #3 are 
running concurrently, later #3 fails, then #1 fails, while execution attempt #2 
keeps running.
The broken assumption may result in exceptions in REST, job archiving and so on.
We propose to introduce an {{ExecutionHistory}} to replace 
{{EvictingBoundedList}}. It hosts the historical executions in a 
{{LinkedHashMap}} with a size bound. When the map grows beyond the size bound, 
elements are dropped from the head of the map (FIFO order).
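
As a minimal sketch of the proposed eviction behavior (illustrative only; the actual {{ExecutionHistory}} may differ in details):
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

/** A size-bounded map that drops the oldest entries first (FIFO), as described above. */
public class BoundedFifoHistory<K, V> extends LinkedHashMap<K, V> {

    private final int sizeLimit;

    public BoundedFifoHistory(int sizeLimit) {
        super(16, 0.75f, false); // insertion order, so eviction happens from the head
        this.sizeLimit = sizeLimit;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > sizeLimit;
    }
}
{code}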



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28193) Enable to identify whether a job vertex contains source/sink operators

2022-06-22 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28193:
---

 Summary: Enable to identify whether a job vertex contains 
source/sink operators
 Key: FLINK-28193
 URL: https://issues.apache.org/jira/browse/FLINK-28193
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.16.0


Speculative execution does not support sources/sinks in the first version. 
Therefore, it will not create speculative execution instances for vertices which 
contain source/sink operators.

Note that a job vertex with no input/output edges does not necessarily mean it is 
a source/sink vertex: multi-input sources can have inputs, and it's possible that 
a vertex with no output edge does not contain any sink operator. Besides that, a 
new sink with a topology can spread the sink logic across multiple job vertices 
connected with job edges.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28139) Add documentation for speculative execution

2022-06-20 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28139:
---

 Summary: Add documentation for speculative execution
 Key: FLINK-28139
 URL: https://issues.apache.org/jira/browse/FLINK-28139
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.16.0






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28138) Add metrics for speculative execution

2022-06-20 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28138:
---

 Summary: Add metrics for speculative execution
 Key: FLINK-28138
 URL: https://issues.apache.org/jira/browse/FLINK-28138
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Zhu Zhu
 Fix For: 1.16.0


The following two metrics will be added to expose job problems and show the 
effectiveness of speculative execution:
 # *numSlowExecutionVertices*: the number of slow execution vertices at the 
moment.
 # *numEffectiveSpeculativeExecutions*: the number of speculative executions 
which finish before their corresponding original executions finish.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28137) Introduce SpeculativeScheduler

2022-06-20 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28137:
---

 Summary: Introduce SpeculativeScheduler
 Key: FLINK-28137
 URL: https://issues.apache.org/jira/browse/FLINK-28137
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.16.0


A SpeculativeScheduler will be used if speculative execution is enabled. It 
extends the AdaptiveBatchScheduler so that speculative execution can work along 
with the feature that adaptively tunes parallelisms for batch jobs.

The major differences of the SpeculativeScheduler are:
 * SpeculativeScheduler needs to be able to directly deploy an Execution, while 
AdaptiveBatchScheduler can only perform ExecutionVertex-level deployment.
 * SpeculativeScheduler does not restart the ExecutionVertex if an execution 
fails while any other current execution is still making progress.
 * SpeculativeScheduler listens for slow tasks. Once there are slow tasks, it 
will block the slow nodes and deploy speculative executions of the slow tasks 
on other nodes.
 * Once any execution finishes, SpeculativeScheduler will cancel all the 
remaining executions of the same execution vertex.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28136) Implement ExecutionTimeBasedSlowTaskDetector

2022-06-20 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28136:
---

 Summary: Implement ExecutionTimeBasedSlowTaskDetector
 Key: FLINK-28136
 URL: https://issues.apache.org/jira/browse/FLINK-28136
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.16.0


In the first version of speculative execution, an 
ExecutionTimeBasedSlowTaskDetector will be used to detect slow tasks. For the 
ExecutionTimeBasedSlowTaskDetector, if a task's execution time is much longer 
than that of most tasks of the same JobVertex, the task will be identified as 
slow. More specifically, it will compute an execution time baseline for each 
JobVertex. Tasks which execute for longer than or equal to the baseline will be 
identified as slow tasks.
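
A toy illustration of the baseline idea (not Flink's actual implementation; the lower-bound and ratio parameters mirror the {{slow-task-detector.execution-time.*}} options mentioned elsewhere, and the multiplier here is an assumed knob):
{code:java}
import java.util.Arrays;

/** Toy baseline computation: a task is "slow" if its execution time reaches the baseline. */
public class ExecutionTimeBaselineSketch {

    static boolean isSlow(long executionTimeMs, long[] vertexExecutionTimesMs,
                          long lowerBoundMs, double baselineRatio, double multiplier) {
        long[] sorted = vertexExecutionTimesMs.clone(); // assumed non-empty
        Arrays.sort(sorted);
        // Reference execution time taken at the configured ratio of the vertex's tasks.
        int index = (int) Math.max(0, Math.min(sorted.length - 1, Math.floor(sorted.length * baselineRatio)));
        long baseline = Math.max(lowerBoundMs, (long) (sorted[index] * multiplier));
        return executionTimeMs >= baseline;
    }
}
{code}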



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28135) Introduce SlowTaskDetector

2022-06-20 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28135:
---

 Summary: Introduce SlowTaskDetector
 Key: FLINK-28135
 URL: https://issues.apache.org/jira/browse/FLINK-28135
 Project: Flink
  Issue Type: Sub-task
Reporter: Zhu Zhu
 Fix For: 1.16.0


A SlowTaskDetector will periodically check all the current tasks/executions and 
notify the SlowTaskDetectorListener about the detected slow tasks. 
SpeculativeScheduler will register itself as the SlowTaskDetectorListener.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28134) Introduce SpeculativeExecutionVertex

2022-06-20 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28134:
---

 Summary: Introduce SpeculativeExecutionVertex
 Key: FLINK-28134
 URL: https://issues.apache.org/jira/browse/FLINK-28134
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.16.0


SpeculativeExecutionVertex will be used if speculative execution is enabled, as 
a replacement of ExecutionVertex to form an ExecutionGraph. The core difference 
is that a SpeculativeExecutionVertex can have multiple current executions 
running at the same time.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28133) Rework DefaultScheduler to directly deploy executions

2022-06-20 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28133:
---

 Summary: Rework DefaultScheduler to directly deploy executions
 Key: FLINK-28133
 URL: https://issues.apache.org/jira/browse/FLINK-28133
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.16.0


Currently, the DefaultScheduler (the base of AdaptiveBatchScheduler) can only 
perform ExecutionVertex-level deployment. However, even in this case, the scheduler 
is actually deploying the current execution attempt of the ExecutionVertex.

Therefore, we need to rework the DefaultScheduler to directly deploy executions.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28131) FLIP-168: Speculative Execution for Batch Job

2022-06-20 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-28131:
---

 Summary: FLIP-168: Speculative Execution for Batch Job
 Key: FLINK-28131
 URL: https://issues.apache.org/jira/browse/FLINK-28131
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.16.0


Speculative execution is helpful to mitigate slow tasks caused by problematic 
nodes. The basic idea is to start mirror tasks on other nodes when a slow task 
is detected. The mirror task processes the same input data and produces the 
same data as the original task.

More details can be found in 
[FLIP-168|https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job].

This is the umbrella ticket to track all the changes of this feature.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27944) IO metric collision happens when a task has union inputs

2022-06-07 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-27944:
---

 Summary: IO metric collision happens when a task has union inputs
 Key: FLINK-27944
 URL: https://issues.apache.org/jira/browse/FLINK-27944
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.15.0
Reporter: Zhu Zhu
 Fix For: 1.16.0


When a task has union inputs, some IO metrics (numBytesIn* and numBuffersIn*) of 
the different inputs may collide and fail to be registered.

 

The problem can be reproduced with a simple job like:
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> source1 = env.fromElements("abc");
DataStream<String> source2 = env.fromElements("123");
source1.union(source2).print();
env.execute();{code}
 

Logs of collisions:
{code:java}
2022-06-08 00:59:01,629 WARN  org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInLocal'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
2022-06-08 00:59:01,629 WARN  org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInLocalPerSecond'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
2022-06-08 00:59:01,629 WARN  org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInLocal'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
2022-06-08 00:59:01,629 WARN  org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInLocalPerSecond'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
2022-06-08 00:59:01,630 WARN  org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInRemote'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
2022-06-08 00:59:01,630 WARN  org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInRemotePerSecond'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
2022-06-08 00:59:01,630 WARN  org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInRemote'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
2022-06-08 00:59:01,630 WARN  org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInRemotePerSecond'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
2022-06-08 00:59:01,630 WARN  org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBuffersInLocal'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
2022-06-08 00:59:01,630 WARN  org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBuffersInLocalPerSecond'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
2022-06-08 00:59:01,630 WARN  org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBuffersInLocal'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
2022-06-08 00:59:01,630 WARN  org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBuffersInLocalPerSecond'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
2022-06-08 00:59:01,630 WARN  org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBuffersInRemote'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
2022-06-08 00:59:01,630 WARN

[jira] [Created] (FLINK-27710) Improve logs to better display Execution

2022-05-19 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-27710:
---

 Summary: Improve logs to better display Execution
 Key: FLINK-27710
 URL: https://issues.apache.org/jira/browse/FLINK-27710
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination, Runtime / Task
Affects Versions: 1.16.0
Reporter: Zhu Zhu
 Fix For: 1.16.0


Currently, an execution is usually represented in logs as "{{job vertex name}} 
({{subtaskIndex+1}}/{{vertex parallelism}}) ({{attemptId}})". With the change of 
FLINK-17295, this representation of Execution in logs becomes redundant, e.g. 
the subtask index is displayed twice.

Therefore, I'm proposing to change the format to "{{job vertex name}} 
({{short ExecutionGraphID}}:{{JobVertexID}}) 
({{subtaskIndex+1}}/{{vertex parallelism}}) ({{#attemptNumber}})" 
and to avoid directly displaying the {{ExecutionAttemptID}}. This can improve 
log readability.

Besides that, the displayed {{JobVertexID}} can also help to distinguish job 
vertices with the same name, which is common in DataStream jobs (e.g. multiple 
{{Map}} vertices).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27460) Remove unused notifyPartitionDataAvailable process

2022-04-30 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-27460:
---

 Summary: Remove unused notifyPartitionDataAvailable process
 Key: FLINK-27460
 URL: https://issues.apache.org/jira/browse/FLINK-27460
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Coordination
Affects Versions: 1.16.0
Reporter: Zhu Zhu
 Fix For: 1.16.0


The `notifyPartitionDataAvailable` process was used to trigger downstream task 
scheduling once a pipelined partition had data produced on the TM side. It is no 
longer used. Therefore I propose to remove it to clean up the code base.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-25996) Introduce job property isDynamicGraph to ExecutionConfig

2022-02-07 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-25996:
---

 Summary: Introduce job property isDynamicGraph to ExecutionConfig
 Key: FLINK-25996
 URL: https://issues.apache.org/jira/browse/FLINK-25996
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.15.0


To enable FLINK-25995 and FLINK-25046 only in dynamic graph scenario, we need a 
property ExecutionConfig#isDynamicGraph. In the first step, the property will 
be decided automatically, true iff config {{jobmanager.scheduler}} is 
{{AdaptiveBatch}}.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25995) Make implicit assumption of SQL local keyBy/groupBy explicit

2022-02-07 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-25995:
---

 Summary: Make implicit assumption of SQL local keyBy/groupBy 
explicit
 Key: FLINK-25995
 URL: https://issues.apache.org/jira/browse/FLINK-25995
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Affects Versions: 1.15.0
Reporter: Zhu Zhu
 Fix For: 1.15.0


If there are multiple consecutive identical groupBys (i.e. keyBy), the SQL planner 
will change all of them except the first one to use the forward partitioner, so 
that these operators can be chained to reduce unnecessary shuffles.
However, sometimes the local keyBy operators are not chained (e.g. multiple 
inputs), and this kind of forward partitioner will turn into forward job 
edges. These forward edges still carry the local keyBy assumption, so they 
cannot be changed into rescale/rebalance edges, otherwise it can lead to 
incorrect results. This prevents the adaptive batch scheduler from determining 
the parallelism of other forward-edge downstream job vertices (see FLINK-25046).
To solve it, I propose to introduce a new {{ForwardForRescalePartitioner}}. 
When the SQL planner optimizes the case of multiple consecutive identical groupBys, 
it should use the proposed partitioner, so that the runtime framework can 
further decide whether the partitioner can be changed to rescale or not.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25226) Add documentation about the AdaptiveBatchScheduler

2021-12-08 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-25226:
---

 Summary: Add documentation about the AdaptiveBatchScheduler
 Key: FLINK-25226
 URL: https://issues.apache.org/jira/browse/FLINK-25226
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Zhu Zhu
 Fix For: 1.15.0


Documentation is needed to explain to users how to enable the 
AdaptiveBatchScheduler and how to configure it properly.
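
For reference, enabling the scheduler is expected to boil down to selecting it in the Flink configuration file, roughly as below; the authoritative option names, defaults and tuning options should come from the documentation added by this ticket:
{code}
jobmanager.scheduler: AdaptiveBatch
{code}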



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25225) Add e2e TPCDS tests to run against the AdaptiveBatchScheduler

2021-12-08 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-25225:
---

 Summary: Add e2e TPCDS tests to run against the AdaptiveBatchScheduler
 Key: FLINK-25225
 URL: https://issues.apache.org/jira/browse/FLINK-25225
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.15.0


To automatically and continuously verify the AdaptiveBatchScheduler, we should 
add a new e2e test which runs TPCDS against the AdaptiveBatchScheduler.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24005) Resource requirements declaration may be incorrect if JobMaster disconnects with a TaskManager with available slots in the SlotPool

2021-08-26 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-24005:
---

 Summary: Resource requirements declaration may be incorrect if 
JobMaster disconnects with a TaskManager with available slots in the SlotPool
 Key: FLINK-24005
 URL: https://issues.apache.org/jira/browse/FLINK-24005
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.13.2, 1.12.5, 1.14.0
Reporter: Zhu Zhu


When a TaskManager disconnects from the JobMaster, it will trigger 
`DeclarativeSlotPoolService#decreaseResourceRequirementsBy()` for all the slots 
that are registered to the JobMaster from that TaskManager. If the slots are 
still available, i.e. not assigned to any task, the 
`decreaseResourceRequirementsBy` may lead to an incorrect resource requirements 
declaration.

For example, there is one job with 3 source tasks only. It requires 3 slots and 
declares for 3 slots. Initially all the tasks are running. Suddenly one task 
fails and waits for some delay before restarting. The previous slot is 
returned to the SlotPool. Now the job requires 2 slots and declares for 2 
slots. At this moment, the TaskManager of that returned slot gets lost. After 
the triggered `decreaseResourceRequirementsBy`, the job only declares for 1 
slot. Finally, when the failed task is re-scheduled, the job will declare 
for 2 slots while it actually needs 3 slots.

The attached log of a real job and the logs of the added test in 
https://github.com/zhuzhurk/flink/commit/59ca0ac5fa9c77b97c6e8a43dcc53ca8a0ad6c37
 can demonstrate this case.
Note that the real job is configured with a large 
"restart-strategy.fixed-delay.delay" and a large "slot.idle.timeout", so it is 
probably a rare case in production.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23826) Verify optimized scheduler performance for large-scale jobs

2021-08-16 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-23826:
---

 Summary: Verify optimized scheduler performance for large-scale 
jobs
 Key: FLINK-23826
 URL: https://issues.apache.org/jira/browse/FLINK-23826
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.14.0
Reporter: Zhu Zhu
 Fix For: 1.14.0


This ticket is used to verify the result of FLINK-21110.
It should check whether the scheduling of large-scale jobs works well, and measure 
the scheduling performance, with a real job running on a cluster.

The conclusion should include:
1. time of job initialization on master (job received -> scheduling started)
2. time of job scheduling and deployment (scheduling started -> all tasks in 
INITIALIZATION)
3. time of job restarting on task failover (JM notified about task failure -> 
all tasks in INITIALIZATION again)
4. master heap memory required for large scale jobs





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23806) StackOverflowException can happen if a large scale job failed to acquire enough slots in time

2021-08-16 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-23806:
---

 Summary: StackOverflowException can happen if a large scale job 
failed to acquire enough slots in time
 Key: FLINK-23806
 URL: https://issues.apache.org/jira/browse/FLINK-23806
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.13.2, 1.12.5
Reporter: Zhu Zhu
 Fix For: 1.14.0, 1.12.6, 1.13.3


When requested slots are not fulfilled in time, a task failure will be triggered 
and all related tasks will be canceled and restarted. However, in this process, 
if a task has already been assigned a slot, the slot will be returned to the slot 
pool and immediately used to fulfill pending slot requests of the 
tasks which will soon be canceled. The execution versions of those tasks have 
already been bumped in {{DefaultScheduler#restartTasksWithDelay(...)}}, so the 
assignment will fail immediately and the slot will be returned to the slot pool 
and again used to fulfill pending slot requests. A StackOverflow can happen 
this way when there are many vertices, and a fatal error can occur and cause 
the JM to crash. A sample call stack is attached below.

To fix the problem, one way is to cancel the pending requests of all the tasks 
which will be canceled soon (i.e. tasks with bumped versions) before canceling 
these tasks.

{panel}
...
at 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.cancelSlotRequest(PhysicalSlotProviderImpl.java:112)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.releaseSharedSlot(SlotSharingExecutionSlotAllocator.java:242)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SharedSlot.releaseExternally(SharedSlot.java:281)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SharedSlot.removeLogicalSlotRequest(SharedSlot.java:242)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
~[?:1.8.0_102]
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:542)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:505)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 ~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
~[?:1.8.0_102]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
 ~[flink-dist_2.11-1.13-vvr-4.0.7-SNAPSHOT.jar:1.13-vvr-4.0.7-SNAPSHOT]
at 

[jira] [Created] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELED/FAILED

2021-06-09 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-22945:
---

 Summary: StackOverflowException can happen when a large scale job 
is CANCELED/FAILED
 Key: FLINK-22945
 URL: https://issues.apache.org/jira/browse/FLINK-22945
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.12.4, 1.13.1
Reporter: Zhu Zhu


The pending requests in the ExecutionSlotAllocator are not cleared when a job 
transitions to CANCELING or FAILING, while all vertices will be canceled and 
their assigned slots will be returned. A returned slot may be used to 
fulfill the pending request of a CANCELED vertex; the assignment will fail 
immediately, and the slot will be returned and used to fulfill another vertex, 
in a recursive way. A StackOverflow can happen this way when there are many 
vertices. A sample call stack is attached below.
To fix this problem, we should clear the pending requests in the 
ExecutionSlotAllocator when a job is CANCELING or FAILING. However, I think 
it's better to also improve the call stack of slot assignment to prevent similar 
StackOverflowExceptions from occurring.


...
at 
org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
~[?:1.8.0_102]
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 ~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_102]
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
~[?:1.8.0_102]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeReservedSlot(DefaultDeclarativeSlotPool.java:313)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.releaseSlot(DeclarativeSlotPoolBridge.java:335)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.cancelSlotRequest(PhysicalSlotProviderImpl.java:112)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.releaseSharedSlot(SlotSharingExecutionSlotAllocator.java:242)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SharedSlot.releaseExternally(SharedSlot.java:281)
 ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
at 

[jira] [Created] (FLINK-21735) Harden JobMaster#updateTaskExecutionState()

2021-03-11 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-21735:
---

 Summary: Harden JobMaster#updateTaskExecutionState()
 Key: FLINK-21735
 URL: https://issues.apache.org/jira/browse/FLINK-21735
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhu Zhu


Currently, JobMaster#updateTaskExecutionState() is very important for triggering 
task scheduling and recovery. We do not expect any exception to happen directly 
in its invocation, so I'd propose to wrap it with a try-catch and fail the 
JobMaster once any exception is caught. This can better expose unexpected 
bugs.
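
A simplified sketch of the proposed hardening (signatures are simplified, and the use of a FatalErrorHandler is an assumption about how "fail the JobMaster" could be realized):
{code:java}
import org.apache.flink.runtime.rpc.FatalErrorHandler;

/** Illustrative only: wrap the state update so unexpected exceptions fail fast. */
public class TaskExecutionStateUpdateGuard {

    static void runGuarded(Runnable updateTaskExecutionState, FatalErrorHandler fatalErrorHandler) {
        try {
            updateTaskExecutionState.run();
        } catch (Throwable t) {
            // An exception here indicates a bug in scheduling/recovery;
            // surface it by failing fast instead of swallowing it.
            fatalErrorHandler.onFatalError(t);
        }
    }
}
{code}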



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21734) Allow BLOCKING result partition to be individually consumable

2021-03-11 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-21734:
---

 Summary: Allow BLOCKING result partition to be individually 
consumable
 Key: FLINK-21734
 URL: https://issues.apache.org/jira/browse/FLINK-21734
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhu Zhu


Currently a BLOCKING result partition is consumable only when all the 
partitions of the corresponding IntermediateResult become consumable. This is 
an unnecessary complication and has caused some problems (FLINK-21707). 
Therefore I propose to simplify it and let BLOCKING result partitions be 
consumable individually.
Note that this will make the scheduling execution-vertex-wise 
instead of stage-wise, with a nice side effect of better resource 
utilization. The PipelinedRegionSchedulingStrategy can be simplified along with 
this change, to get rid of the {{correlatedResultPartitions}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21707) Job is possible to hang when restarting a FINISHED task with POINTWISE BLOCKING consumers

2021-03-09 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-21707:
---

 Summary: Job is possible to hang when restarting a FINISHED task 
with POINTWISE BLOCKING consumers
 Key: FLINK-21707
 URL: https://issues.apache.org/jira/browse/FLINK-21707
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.12.2, 1.11.3, 1.13.0
Reporter: Zhu Zhu


The job may hang when restarting a FINISHED task with POINTWISE BLOCKING 
consumers. This is because 
{{PipelinedRegionSchedulingStrategy#onExecutionStateChange()}} will try to 
schedule all the consumer tasks/regions of the finished *ExecutionJobVertex*, 
even though the regions are not the exact consumers of the finished 
*ExecutionVertex*. In this case, some of the regions can be in a state other than 
CREATED because they are not connected to, nor affected by, the restarted tasks. 
However, {{PipelinedRegionSchedulingStrategy#maybeScheduleRegion()}} does not 
allow scheduling a non-CREATED region; it will throw an Exception and break 
the scheduling of all the other regions. One example showing this problem case 
can be found at 
[PipelinedRegionSchedulingITCase#testRecoverFromPartitionException|https://github.com/zhuzhurk/flink/commit/1eb036b6566c5cb4958d9957ba84dc78ce62a08c].

To fix the problem, we can add a filter in 
{{PipelinedRegionSchedulingStrategy#onExecutionStateChange()}} to only trigger 
the scheduling of regions in CREATED state.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21576) Remove ExecutionVertex#getPreferredLocations

2021-03-02 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-21576:
---

 Summary: Remove ExecutionVertex#getPreferredLocations
 Key: FLINK-21576
 URL: https://issues.apache.org/jira/browse/FLINK-21576
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhu Zhu
 Fix For: 1.13.0


{{ExecutionVertex#getPreferredLocations()}} is superseded by 
{{DefaultPreferredLocationsRetriever}} and is no longer used. Hence, we can 
remove it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20931) Remove globalModVersion from ExecutionGraph

2021-01-11 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-20931:
---

 Summary: Remove globalModVersion from ExecutionGraph
 Key: FLINK-20931
 URL: https://issues.apache.org/jira/browse/FLINK-20931
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhu Zhu
 Fix For: 1.13.0


{{ExecutionGraph#globalModVersion}} is no longer used now that the legacy 
scheduler has been removed.
{{ExecutionVertexVersioner}} is used instead for handling concurrent 
global-local and local-local failures.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20930) Remove AbstractExecutionSlotAllocator

2021-01-11 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-20930:
---

 Summary: Remove AbstractExecutionSlotAllocator
 Key: FLINK-20930
 URL: https://issues.apache.org/jira/browse/FLINK-20930
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhu Zhu
 Fix For: 1.13.0


{{AbstractExecutionSlotAllocator}} is no longer used after 
{{DefaultExecutionSlotAllocator}} was removed. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20721) Remove unknown input channels and process to update partitions

2020-12-22 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-20721:
---

 Summary: Remove unknown input channels and process to update 
partitions
 Key: FLINK-20721
 URL: https://issues.apache.org/jira/browse/FLINK-20721
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Runtime / Network
Affects Versions: 1.13.0
Reporter: Zhu Zhu
 Fix For: 1.13.0


With the latest pipelined region scheduling, Flink no longer launches a task 
before knowing the locations of all the partitions it consumes. 
`scheduleOrUpdateConsumers` is no longer needed, so we removed it in FLINK-20439.

Unknown input channels and the process to update them are also no longer needed. 
I'd propose to remove them. The benefits are:
1. simplifying the code of both the scheduler and the shuffle components
2. simplifying the interfaces of ShuffleEnvironment and ShuffleDescriptor 
3. ensuring the assumptions in InputGate#resumeConsumption() implementations
4. allowing the removal of ScheduleMode#allowLazyDeployment() and later the 
complete removal of ScheduleMode





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20626) Canceling a job when it is failing will result in job hanging in CANCELING state

2020-12-16 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-20626:
---

 Summary: Canceling a job when it is failing will result in job 
hanging in CANCELING state
 Key: FLINK-20626
 URL: https://issues.apache.org/jira/browse/FLINK-20626
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.11.2, 1.12.0
Reporter: Zhu Zhu
Assignee: Zhu Zhu
 Fix For: 1.13.0, 1.11.4, 1.12.1


If a user manually cancels a job while the job is failing (here, failing means 
the job has encountered an unrecoverable failure and is about to fail), the job 
will hang in CANCELING state and cannot terminate. The cause is that the 
DefaultScheduler currently always tries to transition from `FAILING` to `FAILED` 
to terminate the job. However, canceling the job changes the job status to 
`CANCELING`, so the transition to `FAILED` will not succeed.
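
A simplified illustration of the idea behind the fix (hypothetical enum and 
method names; the real JobStatus handling in Flink is more involved):

{code:java}
enum JobStatusSketch { RUNNING, FAILING, FAILED, CANCELING, CANCELED }

class TerminationSketch {
    // When a failing job finally terminates, respect a concurrent cancel
    // request instead of unconditionally forcing FAILING -> FAILED.
    static JobStatusSketch terminalStateFor(JobStatusSketch current) {
        if (current == JobStatusSketch.CANCELING) {
            return JobStatusSketch.CANCELED;
        }
        return JobStatusSketch.FAILED;
    }
}
{code}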



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20619) Remove InputDependencyConstraint and InputDependencyConstraintChecker

2020-12-15 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-20619:
---

 Summary: Remove InputDependencyConstraint and 
InputDependencyConstraintChecker
 Key: FLINK-20619
 URL: https://issues.apache.org/jira/browse/FLINK-20619
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhu Zhu
 Fix For: 1.13.0


InputDependencyConstraint was used by the legacy scheduler and the 
lazy-from-sources scheduling strategy. It is not needed anymore since both have 
been removed. Hence we can remove it, as well as 
InputDependencyConstraintChecker.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20439) Consider simplifying or removing mechanism to scheduleOrUpdateConsumers

2020-12-01 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-20439:
---

 Summary: Consider simplifying or removing mechanism to 
scheduleOrUpdateConsumers
 Key: FLINK-20439
 URL: https://issues.apache.org/jira/browse/FLINK-20439
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.13.0


Execution#scheduleOrUpdateConsumers() was used for multiple purposes:
- schedule a vertex when its PIPELINED inputs have produced data 
- schedule a vertex when its BLOCKING inputs have finished
- update consumed partition info for RUNNING consumers
- cache consumed partition info for DEPLOYING consumers

It is not needed anymore with the latest pipelined region scheduling because 
- a region will be scheduled only when all its upstream regions have finished
- a vertex will always know all its consumed partitions when it is scheduled

So we can consider how to simplify or remove it, which also involves things like 
UnknownInputChannel, ResultPartitionConsumableNotifier, 
Execution#cachePartitionInfo(), etc.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20285) LazyFromSourcesSchedulingStrategy is possible to schedule non-CREATED vertices

2020-11-22 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-20285:
---

 Summary: LazyFromSourcesSchedulingStrategy is possible to schedule 
non-CREATED vertices
 Key: FLINK-20285
 URL: https://issues.apache.org/jira/browse/FLINK-20285
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.11.0
Reporter: Zhu Zhu
 Fix For: 1.12.0, 1.11.3


LazyFromSourcesSchedulingStrategy may schedule vertices which are 
not in CREATED state. This will lead to an unexpected check failure and 
result in a fatal error [1].

The reason is that the state of a vertex to schedule can be changed in 
LazyFromSourcesSchedulingStrategy#allocateSlotsAndDeployExecutionVertices() 
during the invocation of schedulerOperations.allocateSlotsAndDeploy(...) on 
other vertices.

e.g. ev1 and ev2 are in the same pipelined region and are restarted one by one 
in the scheduling loop in 
LazyFromSourcesSchedulingStrategy#allocateSlotsAndDeployExecutionVertices(). 
Both are in CREATED state at that moment. ev1 is scheduled first but immediately 
fails due to a slot allocation error, and ev2 is canceled as a result. 
So when ev2 is scheduled, its state is CANCELED and the state check fails.

[1]
{code:java}
2020-11-19 13:34:17,231 ERROR 
org.apache.flink.runtime.util.FatalExitExceptionHandler  [] - FATAL: Thread 
'flink-akka.actor.default-dispatcher-15' produced an uncaught exception. 
Stopping the process...
java.util.concurrent.CompletionException: java.lang.IllegalStateException: 
expected vertex aafcbb93173905cec9672e46932d7790_3 to be in CREATED state, was: 
CANCELED
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 ~[?:1.8.0_222]
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
 ~[?:1.8.0_222]
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:708) 
~[?:1.8.0_222]
at 
java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:687)
 ~[?:1.8.0_222]
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 ~[?:1.8.0_222]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
Caused by: java.lang.IllegalStateException: expected vertex 
aafcbb93173905cec9672e46932d7790_3 to be in CREATED state, was: CANCELED
at 

[jira] [Created] (FLINK-20284) Error happens in TaskExecutor when closing JobMaster connection if there was a python UDF

2020-11-22 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-20284:
---

 Summary: Error happens in TaskExecutor when closing JobMaster 
connection if there was a python UDF
 Key: FLINK-20284
 URL: https://issues.apache.org/jira/browse/FLINK-20284
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.12.0
Reporter: Zhu Zhu
 Fix For: 1.12.0


When a TaskExecutor has successfully finished running a python UDF task and is 
disconnecting from the JobMaster, the errors below occur. This error, however, 
does not seem to affect job execution at the moment.

{code:java}
2020-11-20 17:05:21,932 INFO  
org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - 1 Beam Fn 
Logging clients still connected during shutdown.
2020-11-20 17:05:21,938 WARN  
org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer[] - Hanged up for 
unknown endpoint.
2020-11-20 17:05:22,126 INFO  org.apache.flink.runtime.taskmanager.Task 
   [] - Source: Custom Source -> select: (f0) -> select: (add_one(f0) 
AS a) -> to: Tuple2 -> Sink: Streaming select table sink (1/1)#0 
(b0c2104dd8f87bb1caf0c83586c22a51) switched from RUNNING to FINISHED.
2020-11-20 17:05:22,126 INFO  org.apache.flink.runtime.taskmanager.Task 
   [] - Freeing task resources for Source: Custom Source -> select: 
(f0) -> select: (add_one(f0) AS a) -> to: Tuple2 -> Sink: Streaming select 
table sink (1/1)#0 (b0c2104dd8f87bb1caf0c83586c22a51).
2020-11-20 17:05:22,128 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - 
Un-registering task and sending final execution state FINISHED to JobManager 
for task Source: Custom Source -> select: (f0) -> select: (add_one(f0) AS a) -> 
to: Tuple2 -> Sink: Streaming select table sink (1/1)#0 
b0c2104dd8f87bb1caf0c83586c22a51.
2020-11-20 17:05:22,156 INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
TaskSlot(index:0, state:ACTIVE, resource profile: 
ResourceProfile{cpuCores=1., taskHeapMemory=384.000mb 
(402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb 
(536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: 
b67c3307dcf93757adfb4f0f9f7b8c7b, jobId: d05f32162f38ec3ec813c4621bc106d9).
2020-11-20 17:05:22,157 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 
d05f32162f38ec3ec813c4621bc106d9 from job leader monitoring.
2020-11-20 17:05:22,157 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Close 
JobManager connection for job d05f32162f38ec3ec813c4621bc106d9.
2020-11-20 17:05:23,064 ERROR 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.rejectedExecution
 [] - Failed to submit a listener notification task. Event loop shut down?
java.lang.NoClassDefFoundError: 
org/apache/beam/vendor/grpc/v1p26p0/io/netty/util/concurrent/GlobalEventExecutor$2
at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.startThread(GlobalEventExecutor.java:227)
 
~[blob_p-bd7a5d615512eb8a2e856e7c1630a0c22fca7cf3-ff27946fda7e2b8cb24ea56d505b689e:1.12-SNAPSHOT]
at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.execute(GlobalEventExecutor.java:215)
 
~[blob_p-bd7a5d615512eb8a2e856e7c1630a0c22fca7cf3-ff27946fda7e2b8cb24ea56d505b689e:1.12-SNAPSHOT]
at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:841)
 
[blob_p-bd7a5d615512eb8a2e856e7c1630a0c22fca7cf3-ff27946fda7e2b8cb24ea56d505b689e:1.12-SNAPSHOT]
at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:498)
 
[blob_p-bd7a5d615512eb8a2e856e7c1630a0c22fca7cf3-ff27946fda7e2b8cb24ea56d505b689e:1.12-SNAPSHOT]
at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
 
[blob_p-bd7a5d615512eb8a2e856e7c1630a0c22fca7cf3-ff27946fda7e2b8cb24ea56d505b689e:1.12-SNAPSHOT]
at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604)
 
[blob_p-bd7a5d615512eb8a2e856e7c1630a0c22fca7cf3-ff27946fda7e2b8cb24ea56d505b689e:1.12-SNAPSHOT]
at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:96)
 
[blob_p-bd7a5d615512eb8a2e856e7c1630a0c22fca7cf3-ff27946fda7e2b8cb24ea56d505b689e:1.12-SNAPSHOT]
at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1089)
 
[blob_p-bd7a5d615512eb8a2e856e7c1630a0c22fca7cf3-ff27946fda7e2b8cb24ea56d505b689e:1.12-SNAPSHOT]
at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 

[jira] [Created] (FLINK-20283) Make invalid managed memory fraction errors of python udf more user friendly

2020-11-22 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-20283:
---

 Summary: Make invalid managed memory fraction errors of python udf 
more user friendly
 Key: FLINK-20283
 URL: https://issues.apache.org/jira/browse/FLINK-20283
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.12.0
Reporter: Zhu Zhu
 Fix For: 1.12.0


When managed memory is required for a python UDF but its weight in 
"taskmanager.memory.managed.consumer-weights" is set to 0, an error happens, 
but the message is hard for users to understand, see [1].

I think we should expose the invalid fraction error to users in this case and 
guide them to properly configure "taskmanager.memory.managed.consumer-weights".

[1] org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:534)
at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException
at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:126)
at 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:254)
at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:121)
at 
org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:134)
at 
org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:94)
at 
org.apache.flink.table.runtime.operators.python.scalar.AbstractRowPythonScalarFunctionOperator.open(AbstractRowPythonScalarFunctionOperator.java:67)
at 
org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperator.open(PythonScalarFunctionOperator.java:64)
at 

[jira] [Created] (FLINK-20282) Make invalid managed memory fraction errors more advisory in MemoryManager

2020-11-22 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-20282:
---

 Summary: Make invalid managed memory fraction errors more advisory 
in MemoryManager
 Key: FLINK-20282
 URL: https://issues.apache.org/jira/browse/FLINK-20282
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Zhu Zhu
 Fix For: 1.12.0


The invalid managed memory fraction errors [1] reported from the MemoryManager 
do not help users solve the problem. This error happens when managed memory is 
required for a use case but its consumer weight is 0.
I think it would be better to enrich the error message to guide users to 
properly configure "taskmanager.memory.managed.consumer-weights".

[1] "Caused by: java.lang.IllegalArgumentException: The fraction of memory to 
allocate must within (0, 1], was: 0.0"
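
As a hedged illustration of the configuration both this issue and FLINK-20283 
point to: the consumer names below (DATAPROC, PYTHON) are the ones used around 
Flink 1.12 and may differ in other versions, so treat the value as an example 
rather than a recommendation.

{code:java}
import org.apache.flink.configuration.Configuration;

public class ConsumerWeightsExample {
    public static void main(String[] args) {
        // Equivalent to setting the option in flink-conf.yaml, e.g.:
        //   taskmanager.memory.managed.consumer-weights: DATAPROC:70,PYTHON:30
        // A zero weight for the python consumer leads to the invalid-fraction
        // error described above.
        Configuration conf = new Configuration();
        conf.setString("taskmanager.memory.managed.consumer-weights", "DATAPROC:70,PYTHON:30");
        System.out.println(conf);
    }
}
{code}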



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19994) All vertices in a DataSet iteration job will be eagerly scheduled

2020-11-05 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-19994:
---

 Summary: All vertices in a DataSet iteration job will be eagerly 
scheduled
 Key: FLINK-19994
 URL: https://issues.apache.org/jira/browse/FLINK-19994
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Zhu Zhu
 Fix For: 1.12.0


After switching to pipelined region scheduling, all vertices in a DataSet 
iteration job will be eagerly scheduled, which means BLOCKING result consumers 
can be deployed even before the result finishes, and resource waste happens. 
This is because all vertices are put into one pipelined region if the job 
contains {{ColocationConstraint}}, see 
[PipelinedRegionComputeUtil|https://github.com/apache/flink/blob/c0f382f5f0072441ef8933f6993f1c34168004d6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java#L52].

IIUC, this {{makeAllOneRegion()}} behavior was introduced to ensure that 
co-located iteration heads and tails are restarted together in pipelined region 
failover. However, given that edges within an iteration will always be PIPELINED 
([ref|https://github.com/apache/flink/blob/0523ef6451a93da450c6bdf5dd4757c3702f3962/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java#L1188]),
 co-located iteration heads and tails will always be in the same region. So I 
think we can drop the {{PipelinedRegionComputeUtil#makeAllOneRegion()}} code 
path and build regions in the same way regardless of whether there are 
co-location constraints.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19922) Remove SlotProvider from SchedulerNG

2020-11-02 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-19922:
---

 Summary: Remove SlotProvider from SchedulerNG
 Key: FLINK-19922
 URL: https://issues.apache.org/jira/browse/FLINK-19922
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Zhu Zhu
 Fix For: 1.12.0


SchedulerNG now uses ExecutionSlotAllocator to allocate slots. SlotProvider is 
no longer directly needed by SchedulerNG and ExecutionGraph. Currently a 
ThrowingSlotProvider is used as a SlotProvider placeholder in SchedulerNG and 
ExecutionGraph.
After legacy scheduling is removed in FLINK-19919, the placeholder for 
SlotProvider is not needed anymore and thus can be removed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19921) Remove legacy RestartStrategy

2020-11-02 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-19921:
---

 Summary: Remove legacy RestartStrategy
 Key: FLINK-19921
 URL: https://issues.apache.org/jira/browse/FLINK-19921
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Zhu Zhu
 Fix For: 1.12.0


The legacy {{org.apache.flink.runtime.executiongraph.restart.RestartStrategy}} 
is no longer in use since FLINK-19919 and thus can be removed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19920) Remove legacy FailoverStrategy

2020-11-02 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-19920:
---

 Summary: Remove legacy FailoverStrategy
 Key: FLINK-19920
 URL: https://issues.apache.org/jira/browse/FLINK-19920
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Zhu Zhu
 Fix For: 1.12.0


The legacy 
{{org.apache.flink.runtime.executiongraph.failover.FailoverStrategy}} is no 
longer in use since FLINK-19919 and thus can be removed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19919) Remove legacy scheduling in ExecutionGraph components

2020-11-02 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-19919:
---

 Summary: Remove legacy scheduling in ExecutionGraph components
 Key: FLINK-19919
 URL: https://issues.apache.org/jira/browse/FLINK-19919
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Zhu Zhu
 Fix For: 1.12.0


This is one step towards making ExecutionGraph a pure data structure.
Note that this task mainly targets removing the legacy code for scheduling and 
failover. Code for Execution state transitions and task deployment will be 
factored out in follow-up tasks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19806) Job may try to leave SUSPENDED state in ExecutionGraph#failJob()

2020-10-26 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-19806:
---

 Summary: Job may try to leave SUSPENDED state in 
ExecutionGraph#failJob()
 Key: FLINK-19806
 URL: https://issues.apache.org/jira/browse/FLINK-19806
 Project: Flink
  Issue Type: Bug
Reporter: Zhu Zhu
Assignee: Zhu Zhu


{{SUSPENDED}} is a terminal state which a job is not supposed to leave once it 
has entered it. However, {{ExecutionGraph#failJob()}} did not check this and 
may try to transition a job out of the {{SUSPENDED}} state. This will cause 
unexpected errors and may lead to a JM crash.
The problem becomes visible if we rework {{ExecutionGraphSuspendTest}} to be 
based on {{DefaultScheduler}}.
We should harden the check in {{ExecutionGraph#failJob()}}.
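
A minimal sketch of the kind of guard meant here, with hypothetical types; in 
Flink the real check would live inside {{ExecutionGraph#failJob()}}:

{code:java}
import java.util.EnumSet;
import java.util.Set;

enum JobStateSketch { RUNNING, FAILING, FAILED, CANCELED, SUSPENDED, FINISHED }

class FailJobGuardSketch {
    private static final Set<JobStateSketch> TERMINAL =
            EnumSet.of(JobStateSketch.FAILED, JobStateSketch.CANCELED,
                    JobStateSketch.SUSPENDED, JobStateSketch.FINISHED);

    // failJob() should be a no-op once the job is already in a terminal state
    // such as SUSPENDED, instead of attempting an illegal transition.
    static boolean shouldFailJob(JobStateSketch current) {
        return !TERMINAL.contains(current);
    }
}
{code}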



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19714) Assign different logical pipelined regions of a DataSet job to different slot sharing group

2020-10-19 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-19714:
---

 Summary: Assign different logical pipelined regions of a DataSet 
job to different slot sharing group
 Key: FLINK-19714
 URL: https://issues.apache.org/jira/browse/FLINK-19714
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Zhu Zhu
 Fix For: 1.12.0


Currently, all job vertices of a DataSet job are put into the same 
SlotSharingGroup. This does not make sense because producer and consumer 
pipelined regions will not be able to share slots anyway.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19712) Pipelined region failover should skip restarting tasks in CREATED

2020-10-19 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-19712:
---

 Summary: Pipelined region failover should skip restarting tasks in 
CREATED
 Key: FLINK-19712
 URL: https://issues.apache.org/jira/browse/FLINK-19712
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Zhu Zhu
 Fix For: 1.12.0


When a task fails and the pipelined region failover strategy is used, all tasks 
in the region of the failed task and in its downstream regions will be canceled 
for later re-scheduling. However, some of these tasks may still be in CREATED 
state, in which case there is no need to cancel them. Skipping the cancellation 
of such tasks speeds up the failover and avoids a lot of unnecessary CANCELING 
logs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19703) A result partition is not untracked after its producer task failed in TaskManager

2020-10-19 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-19703:
---

 Summary: A result partition is not untracked after its producer 
task failed in TaskManager
 Key: FLINK-19703
 URL: https://issues.apache.org/jira/browse/FLINK-19703
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.11.2, 1.10.2, 1.12.0
Reporter: Zhu Zhu
Assignee: Zhu Zhu
 Fix For: 1.12.0


{{Execution#maybeReleasePartitionsAndSendCancelRpcCall(...)}} will not be 
invoked when a task is reported as failed by the TaskManager, which results in 
its partitions still being tracked by the job manager partition tracker. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19598) E2E test Kerberized YARN application on Docker test (default input) fails on CI

2020-10-13 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-19598:
---

 Summary: E2E test Kerberized YARN application on Docker test 
(default input) fails on CI
 Key: FLINK-19598
 URL: https://issues.apache.org/jira/browse/FLINK-19598
 Project: Flink
  Issue Type: Bug
  Components: Build System / Azure Pipelines
Affects Versions: 1.12.0
Reporter: Zhu Zhu


The root cause might be that the HDFS NameNode failed to start:

{code:java}
2020-10-12T14:03:45.7653530Z 20/10/12 13:57:51 ERROR namenode.NameNode: Failed 
to start namenode.
2020-10-12T14:03:45.7653932Z java.net.BindException: Port in use: 0.0.0.0:50470
2020-10-12T14:03:45.7654346Zat 
org.apache.hadoop.http.HttpServer2.openListeners(HttpServer2.java:998)
2020-10-12T14:03:45.7655069Zat 
org.apache.hadoop.http.HttpServer2.start(HttpServer2.java:935)
2020-10-12T14:03:45.7655596Zat 
org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer.start(NameNodeHttpServer.java:171)
2020-10-12T14:03:45.7656143Zat 
org.apache.hadoop.hdfs.server.namenode.NameNode.startHttpServer(NameNode.java:842)
2020-10-12T14:03:45.7656665Zat 
org.apache.hadoop.hdfs.server.namenode.NameNode.initialize(NameNode.java:693)
2020-10-12T14:03:45.7657159Zat 
org.apache.hadoop.hdfs.server.namenode.NameNode.(NameNode.java:906)
2020-10-12T14:03:45.7657651Zat 
org.apache.hadoop.hdfs.server.namenode.NameNode.(NameNode.java:885)
2020-10-12T14:03:45.7658151Zat 
org.apache.hadoop.hdfs.server.namenode.NameNode.createNameNode(NameNode.java:1626)
2020-10-12T14:03:45.7658661Zat 
org.apache.hadoop.hdfs.server.namenode.NameNode.main(NameNode.java:1694)
2020-10-12T14:03:45.7659086Z Caused by: java.net.BindException: Address already 
in use
2020-10-12T14:03:45.7659418Zat sun.nio.ch.Net.bind0(Native Method)
2020-10-12T14:03:45.7659700Zat sun.nio.ch.Net.bind(Net.java:433)
2020-10-12T14:03:45.7660012Zat sun.nio.ch.Net.bind(Net.java:425)
2020-10-12T14:03:45.7660397Zat 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:220)
2020-10-12T14:03:45.7660872Zat 
sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:85)
2020-10-12T14:03:45.7661348Zat 
org.mortbay.jetty.nio.SelectChannelConnector.open(SelectChannelConnector.java:216)
2020-10-12T14:03:45.7661954Zat 
org.apache.hadoop.http.HttpServer2.openListeners(HttpServer2.java:993)
2020-10-12T14:03:45.7662289Z... 8 more
{code}

https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/7436/logs/127



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19570) Execution graph related tests are possibly broken due to registering duplicated ExecutionAttemptID

2020-10-12 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-19570:
---

 Summary: Execution graph related tests are possibly broken due to 
registering duplicated ExecutionAttemptID
 Key: FLINK-19570
 URL: https://issues.apache.org/jira/browse/FLINK-19570
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination, Tests
Affects Versions: 1.12.0
Reporter: Zhu Zhu
Assignee: Zhu Zhu
 Fix For: 1.12.0


Since FLINK-17295, many tests encounter unexpected global failures due to 
registering duplicated ExecutionAttemptIDs. Although these tests do not appear 
to be broken yet, they are potentially broken/unstable. It also further blocks 
the rework of these tests to be based on the new scheduler (FLINK-17760). 

Below is a sample error which happens in 
ExecutionTest#testAllPreferredLocationCalculation():

{code:java}
java.lang.Exception: Trying to register execution Attempt #0 (TestVertex (1/1)) 
@ (unassigned) - [CREATED] for already used ID 
a22afb832b5f94b075d7ffb32fbc9023_146968a4de2df0b2fef1e4b2e8297993_0_0
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.registerExecution(ExecutionGraph.java:1621)
 [classes/:?]
at 
org.apache.flink.runtime.executiongraph.ExecutionVertex.(ExecutionVertex.java:181)
 [classes/:?]
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:211)
 [classes/:?]
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:139)
 [classes/:?]
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionJobVertex(ExecutionGraphTestUtils.java:448)
 [test-classes/:?]
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionJobVertex(ExecutionGraphTestUtils.java:419)
 [test-classes/:?]
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionJobVertex(ExecutionGraphTestUtils.java:411)
 [test-classes/:?]
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionJobVertex(ExecutionGraphTestUtils.java:452)
 [test-classes/:?]
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecution(ExecutionGraphTestUtils.java:477)
 [test-classes/:?]
at 
org.apache.flink.runtime.executiongraph.ExecutionTest.testAllPreferredLocationCalculation(ExecutionTest.java:298)
 [test-classes/:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_261]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_261]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_261]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_261]
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
 [junit-4.12.jar:4.12]
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 [junit-4.12.jar:4.12]
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
 [junit-4.12.jar:4.12]
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 [junit-4.12.jar:4.12]
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) 
[junit-4.12.jar:4.12]
at org.junit.rules.RunRules.evaluate(RunRules.java:20) 
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 
[junit-4.12.jar:4.12]
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
 [junit-4.12.jar:4.12]
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
 [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
[junit-4.12.jar:4.12]
at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) 
[junit-4.12.jar:4.12]
at org.junit.rules.RunRules.evaluate(RunRules.java:20) 
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) 
[junit-4.12.jar:4.12]
at org.junit.runner.JUnitCore.run(JUnitCore.java:137) 
[junit-4.12.jar:4.12]
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
 [junit-rt.jar:?]
at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
 [junit-rt.jar:?]
at 

[jira] [Created] (FLINK-19287) Fix minor version in flink docs

2020-09-18 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-19287:
---

 Summary: Fix minor version in flink docs
 Key: FLINK-19287
 URL: https://issues.apache.org/jira/browse/FLINK-19287
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.11.2, 1.10.2
Reporter: Zhu Zhu
Assignee: Zhu Zhu


The minor version in the docs of 1.10/1.11 still points to 1.10.0/1.11.0, which 
can mislead users into using old-version artifacts.
It should be corrected to 1.10.2/1.11.2.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19286) Improve pipelined region scheduling performance

2020-09-17 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-19286:
---

 Summary: Improve pipelined region scheduling performance
 Key: FLINK-19286
 URL: https://issues.apache.org/jira/browse/FLINK-19286
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Zhu Zhu
Assignee: Zhu Zhu
 Fix For: 1.12.0


In my recent TPC-DS benchmark, pipelined region scheduling was slower than 
lazy-from-sources scheduling. 
The regression is due to some suboptimal implementations in 
{{PipelinedRegionSchedulingStrategy}}, including:
1. topological sorting of the vertices to deploy
2. an unnecessary O(V) loop when sorting an empty set of regions

After improving these implementations, pipelined region scheduling turned out 
to be 10% faster in the previous benchmark setup.
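
A minimal illustration of the second point: an early return that avoids an O(V) 
pass over the topology when there is nothing to sort (stand-in types, not the 
actual strategy code):

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class RegionSortSketch {
    // Sorting an empty set of regions previously still iterated over all
    // vertices of the topology; a guard like this removes that O(V) cost.
    static <R> List<R> sortRegionsTopologically(Set<R> regions, List<R> allRegionsInTopologicalOrder) {
        if (regions.isEmpty()) {
            return Collections.emptyList();
        }
        // Keep the global topological order, restricted to the given regions.
        return allRegionsInTopologicalOrder.stream()
                .filter(regions::contains)
                .collect(Collectors.toList());
    }
}
{code}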



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19189) Enable pipelined scheduling by default

2020-09-10 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-19189:
---

 Summary: Enable pipelined scheduling by default
 Key: FLINK-19189
 URL: https://issues.apache.org/jira/browse/FLINK-19189
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.12.0


This task is to enable pipelined region scheduling by default, via setting the 
default value of the config option "jobmanager.scheduler.scheduling-strategy" to 
"region".

Here are the required verifications before we can make this change:
1. CI, including all UT/IT cases and E2E tests
2. stability tests
3. TPC-DS benchmark in 1T/10T data scale



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18972) Unfulfillable slot requests of Blink planner batch jobs never timeout

2020-08-17 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-18972:
---

 Summary: Unfulfillable slot requests of Blink planner batch jobs 
never timeout
 Key: FLINK-18972
 URL: https://issues.apache.org/jira/browse/FLINK-18972
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.11.1
Reporter: Zhu Zhu
Assignee: Zhu Zhu
 Fix For: 1.12.0, 1.11.2


The unfulfillability check of batch slot requests is unexpectedly disabled in 
{{SchedulerImpl#start() -> BulkSlotProviderImpl#start()}}.
This means the slot allocation timeout will not be triggered if a Blink planner 
batch job cannot obtain any slot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18496) Anchors are not generated based on ZH characters

2020-07-06 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-18496:
---

 Summary: Anchors are not generated based on ZH characters
 Key: FLINK-18496
 URL: https://issues.apache.org/jira/browse/FLINK-18496
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Reporter: Zhu Zhu


In the ZH version pages of flink-web, the anchors are not generated based on ZH 
characters. The anchor name would be like 'section-1', 'section-2' if there are 
no EN characters. An example can be found at 
https://flink.apache.org/zh/contributing/contribute-code.html

This makes it impossible to reference an anchor from the content because the 
anchor name can change automatically if a new section is added.

Note that this is a problem for flink-web only; the docs generated from the 
flink repo can properly generate ZH anchors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18372) NullPointerException can happen in SlotPoolImpl#maybeRemapOrphanedAllocation

2020-06-18 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-18372:
---

 Summary: NullPointerException can happen in 
SlotPoolImpl#maybeRemapOrphanedAllocation
 Key: FLINK-18372
 URL: https://issues.apache.org/jira/browse/FLINK-18372
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Zhu Zhu
 Fix For: 1.12.0


NullPointerException can happen in SlotPoolImpl#maybeRemapOrphanedAllocation, 
which indicates a bug.

https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/8189/logs/115

6:07:07,950 [flink-akka.actor.default-dispatcher-7] WARN  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Slot offering 
to JobManager failed. Freeing the slots and returning them to the 
ResourceManager.
java.lang.NullPointerException: null
at 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.maybeRemapOrphanedAllocation(SlotPoolImpl.java:599)
 ~[flink-runtime_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.tryFulfillSlotRequestOrMakeAvailable(SlotPoolImpl.java:564)
 ~[flink-runtime_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.offerSlot(SlotPoolImpl.java:701)
 ~[flink-runtime_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.offerSlots(SlotPoolImpl.java:625)
 ~[flink-runtime_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.JobMaster.offerSlots(JobMaster.java:541) 
~[flink-runtime_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_242]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_242]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_242]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_242]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
 ~[flink-runtime_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
 ~[flink-runtime_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 ~[flink-runtime_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 ~[flink-runtime_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
[scala-library-2.11.12.jar:?]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
[scala-library-2.11.12.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[scala-library-2.11.12.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[scala-library-2.11.12.jar:?]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
16:07:07,977 [flink-akka.actor.default-dispatcher-7] INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
TaskSlot(index:0, state:ACTIVE, resource profile: 
ResourceProfile{cpuCores=0.5000, taskHeapMemory=64.000mb (67108864 
bytes), 

[jira] [Created] (FLINK-18355) Simplify tests of SlotPoolImpl

2020-06-17 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-18355:
---

 Summary: Simplify tests of SlotPoolImpl
 Key: FLINK-18355
 URL: https://issues.apache.org/jira/browse/FLINK-18355
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Tests
Reporter: Zhu Zhu
 Fix For: 1.12.0


Tests of SlotPoolImpl, including SlotPoolImplTest, SlotPoolInteractionsTest and 
SlotPoolSlotSharingTest, are unnecessarily complicated in terms of the code 
involved. E.g. SchedulerImpl, built on top of SlotPoolImpl, is used to allocate 
slots from SlotPoolImpl, which can be simplified by directly invoking slot 
allocation on SlotPoolImpl.
Besides that, there is quite some duplication between the test classes of 
SlotPoolImpl; this further includes SlotPoolPendingRequestFailureTest, 
SlotPoolRequestCompletionTest and SlotPoolBatchSlotRequestTest.

It would ease future development and maintenance a lot if we clean up these 
tests by
1. introducing a common test base for reuse of fields and methods  
2. removing the usages of SchedulerImpl for slot pool testing
3. other possible simplifications



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18295) Remove the hack logics of result consumers

2020-06-15 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-18295:
---

 Summary: Remove the hack logics of result consumers
 Key: FLINK-18295
 URL: https://issues.apache.org/jira/browse/FLINK-18295
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Zhu Zhu
Assignee: Zhu Zhu
 Fix For: 1.12.0


Currently an {{IntermediateDataSet}} can have multiple {{JobVertex}} as its 
consumers. That's why the consumers of an {{IntermediateResultPartition}} are 
represented as a nested list (a list of consumer lists).

However, in the scheduler/{{ExecutionGraph}} there is an assumption that one 
`IntermediateResultPartition` can be consumed by only one `ExecutionJobVertex`. 
This results in a lot of hacky logic which assumes that the partition consumers 
contain a single list.

We should remove this hacky logic. The idea is to change 
`IntermediateResultPartition#consumers` to be a flat list. 
`ExecutionGraph` building logic should be adjusted accordingly with the 
assumption that an `IntermediateResult` can have only one consumer vertex. In 
`JobGraph`, there should also be a check for this assumption.
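
A toy illustration of the data-structure change (the field types are purely 
illustrative stand-ins; the real classes carry more information):

{code:java}
import java.util.List;

// Today: conceptually a list of consumer groups, even though only one group
// is ever expected by the scheduler.
class PartitionWithNestedConsumers {
    List<List<String>> consumers; // outer list effectively always has size <= 1
}

// Proposed: a single flat list of consumers, matching the assumption that a
// result is consumed by only one downstream job vertex.
class PartitionWithFlatConsumers {
    List<String> consumers;
}
{code}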



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18114) Expose more details in logs for debugging bulk slot allocation failures

2020-06-03 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-18114:
---

 Summary: Expose more details in logs for debugging bulk slot 
allocation failures
 Key: FLINK-18114
 URL: https://issues.apache.org/jira/browse/FLINK-18114
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.12.0


More detailed logs are needed for users to troubleshoot bulk slot allocation 
failures.

https://github.com/apache/flink/pull/12375#discussion_r433922167



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18034) Introduce PreferredLocationsRetriever

2020-05-29 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-18034:
---

 Summary: Introduce PreferredLocationsRetriever
 Key: FLINK-18034
 URL: https://issues.apache.org/jira/browse/FLINK-18034
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
Assignee: Zhu Zhu
 Fix For: 1.12.0


The calculation of preferred locations based on state and inputs is scattered 
across multiple components, which makes it harder to reason about and 
complicates those hosting components.

We can introduce a {{PreferredLocationsRetriever}} to be used by 
{{ExecutionSlotAllocator}}. It returns the preferred locations of an execution 
vertex and hides the details of the calculation from other components.
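
A rough sketch of the kind of interface meant here; the names and types are 
illustrative stand-ins, not the final API:

{code:java}
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

// Stand-ins for the runtime types.
interface ExecutionVertexIdSketch {}
interface TaskManagerLocationSketch {}

// Hides how state-based and input-based preferences are combined; the
// ExecutionSlotAllocator would only see the aggregated result.
interface PreferredLocationsRetrieverSketch {
    CompletableFuture<Collection<TaskManagerLocationSketch>> getPreferredLocations(
            ExecutionVertexIdSketch executionVertexId);
}
{code}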



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17821) Kafka010TableITCase>KafkaTableTestBase.testKafkaSourceSink failed on AZP

2020-05-19 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17821:
---

 Summary: 
Kafka010TableITCase>KafkaTableTestBase.testKafkaSourceSink failed on AZP
 Key: FLINK-17821
 URL: https://issues.apache.org/jira/browse/FLINK-17821
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.12.0
Reporter: Zhu Zhu


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=1871=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=34f486e1-e1e4-5dd2-9c06-bfdd9b9c74a8=12032

2020-05-19T16:29:40.7239430Z Test testKafkaSourceSink[legacy = false, topicId = 
1](org.apache.flink.streaming.connectors.kafka.table.Kafka010TableITCase) 
failed with:
2020-05-19T16:29:40.7240291Z java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
2020-05-19T16:29:40.7241033Zat 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
2020-05-19T16:29:40.7241542Zat 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
2020-05-19T16:29:40.7242127Zat 
org.apache.flink.table.planner.runtime.utils.TableEnvUtil$.execInsertSqlAndWaitResult(TableEnvUtil.scala:31)
2020-05-19T16:29:40.7242729Zat 
org.apache.flink.table.planner.runtime.utils.TableEnvUtil.execInsertSqlAndWaitResult(TableEnvUtil.scala)
2020-05-19T16:29:40.7243239Zat 
org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestBase.testKafkaSourceSink(KafkaTableTestBase.java:145)
2020-05-19T16:29:40.7243691Zat 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2020-05-19T16:29:40.7244273Zat 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2020-05-19T16:29:40.7244729Zat 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2020-05-19T16:29:40.7245117Zat 
java.lang.reflect.Method.invoke(Method.java:498)
2020-05-19T16:29:40.7245515Zat 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
2020-05-19T16:29:40.7245956Zat 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2020-05-19T16:29:40.7246419Zat 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
2020-05-19T16:29:40.7246870Zat 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2020-05-19T16:29:40.7247287Zat 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
2020-05-19T16:29:40.7251320Zat 
org.junit.rules.RunRules.evaluate(RunRules.java:20)
2020-05-19T16:29:40.7251833Zat 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
2020-05-19T16:29:40.7252251Zat 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
2020-05-19T16:29:40.7252716Zat 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
2020-05-19T16:29:40.7253117Zat 
org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
2020-05-19T16:29:40.7253502Zat 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
2020-05-19T16:29:40.7254041Zat 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
2020-05-19T16:29:40.7254528Zat 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
2020-05-19T16:29:40.7255500Zat 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
2020-05-19T16:29:40.7256064Zat 
org.junit.runners.ParentRunner.run(ParentRunner.java:363)
2020-05-19T16:29:40.7256438Zat 
org.junit.runners.Suite.runChild(Suite.java:128)
2020-05-19T16:29:40.7256758Zat 
org.junit.runners.Suite.runChild(Suite.java:27)
2020-05-19T16:29:40.7257118Zat 
org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
2020-05-19T16:29:40.7257486Zat 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
2020-05-19T16:29:40.7257885Zat 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
2020-05-19T16:29:40.7258389Zat 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
2020-05-19T16:29:40.7258821Zat 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
2020-05-19T16:29:40.7259219Zat 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
2020-05-19T16:29:40.7259664Zat 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
2020-05-19T16:29:40.7260098Zat 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
2020-05-19T16:29:40.7260635Zat 
org.junit.rules.RunRules.evaluate(RunRules.java:20)
2020-05-19T16:29:40.7261065Zat 
org.junit.runners.ParentRunner.run(ParentRunner.java:363)
2020-05-19T16:29:40.7261467Zat 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
2020-05-19T16:29:40.7261952Zat 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)

[jira] [Created] (FLINK-17760) Rework tests to not rely on legacy scheduling logics in ExecutionGraph anymore

2020-05-16 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17760:
---

 Summary: Rework tests to not rely on legacy scheduling logics in 
ExecutionGraph anymore
 Key: FLINK-17760
 URL: https://issues.apache.org/jira/browse/FLINK-17760
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Tests
Reporter: Zhu Zhu


The legacy scheduling logic in the ExecutionGraph was used by the legacy 
scheduler and is not in production use anymore.
In order to remove the legacy scheduling logic, we need to rework the tests 
that rely on it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17759) Remove RestartIndividualStrategy

2020-05-16 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17759:
---

 Summary: Remove RestartIndividualStrategy
 Key: FLINK-17759
 URL: https://issues.apache.org/jira/browse/FLINK-17759
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu


It was used by the legacy scheduler and is not used anymore.
Removing it can ease the work to further remove the legacy scheduling logics in 
ExecutionGraph.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17758) Remove AdaptedRestartPipelinedRegionStrategyNG

2020-05-16 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17758:
---

 Summary: Remove AdaptedRestartPipelinedRegionStrategyNG
 Key: FLINK-17758
 URL: https://issues.apache.org/jira/browse/FLINK-17758
 Project: Flink
  Issue Type: Sub-task
Reporter: Zhu Zhu


It was used by the legacy scheduler and is not used anymore.
Removing it can ease the work to further remove the legacy scheduling logics in 
ExecutionGraph.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17726) Scheduler should take care of tasks directly canceled by TaskManager

2020-05-15 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17726:
---

 Summary: Scheduler should take care of tasks directly canceled by 
TaskManager
 Key: FLINK-17726
 URL: https://issues.apache.org/jira/browse/FLINK-17726
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Zhu Zhu
 Fix For: 1.12.0


The JobManager will not trigger failure handling when receiving a CANCELED task 
update. 
This is because CANCELED tasks are usually caused by another FAILED task. These 
CANCELED tasks will be restarted by the failover process triggered by the FAILED task.

However, if a task is directly CANCELED by the TaskManager due to its own 
runtime issue, the task will not be recovered by the JM and thus the job would hang.
This is a potential issue and we should avoid it.

A possible solution is to let the JobManager treat tasks transitioning to 
CANCELED from any state except CANCELING as failed tasks. 
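
A simplified sketch of the proposed rule (hypothetical enum; the real handling 
would sit in the scheduler's task state update path):

{code:java}
enum TaskStateSketch { CREATED, DEPLOYING, RUNNING, CANCELING, CANCELED, FAILED }

class CanceledTaskHandlingSketch {
    // A CANCELED update is only "expected" if the JobManager itself asked for
    // cancellation (previous state CANCELING); otherwise treat it like a
    // failure so that failover restarts the task.
    static boolean treatAsFailure(TaskStateSketch previous, TaskStateSketch current) {
        return current == TaskStateSketch.CANCELED && previous != TaskStateSketch.CANCELING;
    }
}
{code}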



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17714) Support custom RestartBackoffTimeStrategy

2020-05-14 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17714:
---

 Summary: Support custom RestartBackoffTimeStrategy
 Key: FLINK-17714
 URL: https://issues.apache.org/jira/browse/FLINK-17714
 Project: Flink
  Issue Type: Wish
  Components: Runtime / Coordination
Reporter: Zhu Zhu


There are cases where users need to customize the RestartBackoffTimeStrategy to 
better control job recovery. 
One example is that users want a job to restart only on certain errors and fail 
on others. See this ML 
[discussion|https://lists.apache.org/thread.html/rde685552a83d0d146cf83560df1bc6f33d3dd569f69ae7bbcc4ae508%40%3Cuser.flink.apache.org%3E].
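
A sketch of what such a custom strategy could look like; the interface below is 
a simplified stand-in for the runtime contract, not the exact Flink interface:

{code:java}
// Simplified stand-in for the runtime's restart strategy contract.
interface RestartBackoffTimeStrategySketch {
    boolean canRestart();
    long getBackoffTime();
    void notifyFailure(Throwable cause);
}

// Restart only on a user-chosen class of errors, fail fast on everything else.
class RestartOnlyOnIOExceptions implements RestartBackoffTimeStrategySketch {
    private boolean lastFailureRecoverable = true;

    @Override
    public boolean canRestart() {
        return lastFailureRecoverable;
    }

    @Override
    public long getBackoffTime() {
        return 1000L; // 1s between restarts, purely illustrative
    }

    @Override
    public void notifyFailure(Throwable cause) {
        lastFailureRecoverable = cause instanceof java.io.IOException;
    }
}
{code}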





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17542) Unify slot request timeout handling for streaming and batch tasks

2020-05-06 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17542:
---

 Summary: Unify slot request timeout handling for streaming and 
batch tasks
 Key: FLINK-17542
 URL: https://issues.apache.org/jira/browse/FLINK-17542
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.11.0
Reporter: Zhu Zhu
Assignee: Zhu Zhu
 Fix For: 1.11.0


There are 2 different slot request timeout handling mechanisms for batch and streaming tasks.
For streaming tasks, the slot request will fail if it is not fulfilled within the slotRequestTimeout.
For batch tasks, the slot request is checked periodically to see whether it is fulfillable, and only fails if it has been unfulfillable for a certain period (slotRequestTimeout).

With slots marked as to whether they will be occupied indefinitely, we can unify this handling. See
[FLIP-119|https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling#FLIP-119PipelinedRegionScheduling-ExtendedSlotProviderInterface]
 for more details.
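
As a rough, self-contained illustration of the batch-style check described above (a request fails only after it has been continuously unfulfillable for the whole timeout), consider the sketch below. All class and method names are invented; the actual unified mechanism is the one specified in FLIP-119.

{code:java}
// Rough illustration (names are made up): fail a pending slot request only after
// it has been continuously unfulfillable for longer than the slot request timeout.
import java.time.Duration;

final class PendingSlotRequestSketch {

    private final Duration slotRequestTimeout;
    /** Timestamp (millis) since when the request has been unfulfillable, or -1 if fulfillable. */
    private long unfulfillableSince = -1;

    PendingSlotRequestSketch(Duration slotRequestTimeout) {
        this.slotRequestTimeout = slotRequestTimeout;
    }

    /** Called by a periodic check with the current fulfillability of the request. */
    void updateFulfillability(boolean fulfillable, long nowMillis) {
        if (fulfillable) {
            unfulfillableSince = -1;            // reset the clock
        } else if (unfulfillableSince < 0) {
            unfulfillableSince = nowMillis;     // start the clock
        }
    }

    /** The request should fail only if it stayed unfulfillable for the whole timeout. */
    boolean shouldFail(long nowMillis) {
        return unfulfillableSince >= 0
                && nowMillis - unfulfillableSince >= slotRequestTimeout.toMillis();
    }
}
{code}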



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17410) BlockingPartitionBenchmark compilation failed due to changed StreamGraph interface

2020-04-27 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17410:
---

 Summary: BlockingPartitionBenchmark compilation failed due to 
changed StreamGraph interface
 Key: FLINK-17410
 URL: https://issues.apache.org/jira/browse/FLINK-17410
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks
Affects Versions: 1.11.0
Reporter: Zhu Zhu
Assignee: Zhu Zhu
 Fix For: 1.11.0


{{StreamGraph#setGlobalDataExchangeMode(...)}} is introduced in FLINK-17020 to 
replace {{StreamGraph#setBlockingConnectionsBetweenChains(...)}}.
{{BlockingPartitionBenchmark}} fails to compile because it still relies on the replaced method.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17330) Avoid scheduling deadlocks caused by intra-logical-region ALL-to-ALL blocking edges

2020-04-22 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17330:
---

 Summary: Avoid scheduling deadlocks caused by intra-logical-region 
ALL-to-ALL blocking edges
 Key: FLINK-17330
 URL: https://issues.apache.org/jira/browse/FLINK-17330
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.11.0
Reporter: Zhu Zhu
 Fix For: 1.11.0


Imagine a job like this:
A --(pipelined FORWARD)--> B --(blocking ALL-to-ALL)--> D
A --(pipelined FORWARD)--> C --(pipelined FORWARD)--> D
parallelism=2 for all vertices.

We will have 2 execution pipelined regions:
R1={A1, B1, C1, D1}, R2={A2, B2, C2, D2}

R1 has a cross-region input edge (B2->D1).
R2 has a cross-region input edge (B1->D2).

A scheduling deadlock will happen since we schedule a region only when all its inputs are consumable (i.e. its blocking input partitions are finished): R1 can be scheduled only if R2 finishes, while R2 can be scheduled only if R1 finishes.

To avoid this, one solution is to force a logical pipelined region with intra-region ALL-to-ALL blocking edges to form only one execution pipelined region, so that there would not be cyclic input dependencies between regions.
Besides that, we should also pay attention to avoid cyclic cross-region POINTWISE blocking edges.
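
The sketch below illustrates the proposed rule on the example job above; all types are invented stand-ins, not Flink classes.

{code:java}
// Sketch of the proposed rule (illustrative types, not Flink classes): a logical
// pipelined region that contains an ALL-to-ALL blocking edge between its own
// vertices must be kept as a single execution pipelined region.
import java.util.List;
import java.util.Set;

final class RegionBuildingSketch {

    enum DistributionPattern { ALL_TO_ALL, POINTWISE }
    enum ResultType { PIPELINED, BLOCKING }

    record LogicalEdge(String producer, String consumer,
                       DistributionPattern pattern, ResultType resultType) {}

    /** True if the logical region must not be split into per-parallel-subtask regions. */
    static boolean mustStayOneExecutionRegion(Set<String> regionVertices, List<LogicalEdge> edges) {
        return edges.stream().anyMatch(e ->
                regionVertices.contains(e.producer())
                        && regionVertices.contains(e.consumer())
                        && e.pattern() == DistributionPattern.ALL_TO_ALL
                        && e.resultType() == ResultType.BLOCKING);
    }

    public static void main(String[] args) {
        // The example job from the description: B -> D is an intra-region ALL-to-ALL blocking edge.
        Set<String> region = Set.of("A", "B", "C", "D");
        List<LogicalEdge> edges = List.of(
                new LogicalEdge("A", "B", DistributionPattern.POINTWISE, ResultType.PIPELINED),
                new LogicalEdge("B", "D", DistributionPattern.ALL_TO_ALL, ResultType.BLOCKING),
                new LogicalEdge("A", "C", DistributionPattern.POINTWISE, ResultType.PIPELINED),
                new LogicalEdge("C", "D", DistributionPattern.POINTWISE, ResultType.PIPELINED));
        System.out.println(mustStayOneExecutionRegion(region, edges)); // true
    }
}
{code}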



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17047) Simplify SchedulingStrategy#onPartitionConsumable(...) parameters

2020-04-08 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17047:
---

 Summary: Simplify SchedulingStrategy#onPartitionConsumable(...) 
parameters
 Key: FLINK-17047
 URL: https://issues.apache.org/jira/browse/FLINK-17047
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.11.0
Reporter: Zhu Zhu
 Fix For: 1.11.0


I'd propose to simplify the SchedulingStrategy#onPartitionConsumable(...) parameters as below:
1. Take IntermediateResultPartitionID instead of ResultPartitionID. ResultPartitionID is a composition of IntermediateResultPartitionID and ExecutionAttemptID; the SchedulingStrategy is not aware of ExecutionAttemptID, so there is no need to expose it.
2. Drop the executionVertexId param. It does not provide extra information: the check in LazyFromSourcesSchedulingStrategy does not make much sense since the executionVertexId is just retrieved from the partitionId in an earlier stage, and it makes things more complex since a blocking result partition can become consumable when a vertex that is not its producer finishes.

This simplification also eases the work of FLINK-14234.
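
A before/after sketch of the proposed signature change, with empty stand-in ID classes so the snippet compiles on its own:

{code:java}
// Before/after sketch of the proposed signature change. The ID classes below are
// empty stand-ins so the snippet compiles; in Flink they are real runtime types.
final class IntermediateResultPartitionID {}
final class ExecutionAttemptID {}
final class ExecutionVertexID {}

/** ResultPartitionID is a composition of the partition ID and the producer attempt ID. */
final class ResultPartitionID {
    final IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
    final ExecutionAttemptID producerAttemptId = new ExecutionAttemptID();
}

interface SchedulingStrategyBefore {
    // Current shape: exposes the attempt ID (via ResultPartitionID) and a redundant vertex ID.
    void onPartitionConsumable(ExecutionVertexID producerVertexId, ResultPartitionID partitionId);
}

interface SchedulingStrategyAfter {
    // Proposed shape: only the information the strategy actually needs.
    void onPartitionConsumable(IntermediateResultPartitionID partitionId);
}
{code}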



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17046) SavepointWriterITCase failed on travis

2020-04-08 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17046:
---

 Summary: SavepointWriterITCase failed on travis
 Key: FLINK-17046
 URL: https://issues.apache.org/jira/browse/FLINK-17046
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.11.0
Reporter: Zhu Zhu
 Fix For: 1.11.0


https://api.travis-ci.com/v3/job/316732861/log.txt

[ERROR] Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 7.67 s 
<<< FAILURE! - in org.apache.flink.state.api.SavepointWriterITCase
[ERROR] testStateBootstrapAndModification[Savepoint Writer: MemoryStateBackend 
(data in heap memory / checkpoints to JobManager) (checkpoints: 'null', 
savepoints: 'null', asynchronous: UNDEFINED, maxStateSize: 
5242880)](org.apache.flink.state.api.SavepointWriterITCase)  Time elapsed: 
1.736 s  <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.state.api.SavepointWriterITCase.bootstrapState(SavepointWriterITCase.java:147)
at 
org.apache.flink.state.api.SavepointWriterITCase.testStateBootstrapAndModification(SavepointWriterITCase.java:114)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
Caused by: java.lang.UnsupportedOperationException: This method should never be 
called
[ERROR] testStateBootstrapAndModification[Savepoint Writer: 
RocksDBStateBackend{checkpointStreamBackend=MemoryStateBackend (data in heap 
memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
asynchronous: UNDEFINED, maxStateSize: 5242880), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=UNDEFINED, numberOfTransferThreads=-1, 
writeBatchSize=-1}](org.apache.flink.state.api.SavepointWriterITCase)  Time 
elapsed: 0.486 s  <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.state.api.SavepointWriterITCase.bootstrapState(SavepointWriterITCase.java:147)
at 
org.apache.flink.state.api.SavepointWriterITCase.testStateBootstrapAndModification(SavepointWriterITCase.java:114)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
Caused by: java.lang.UnsupportedOperationException: This method should never be 
called



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17021) Blink Planner set GlobalDataExchangeMode

2020-04-07 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17021:
---

 Summary: Blink Planner set GlobalDataExchangeMode
 Key: FLINK-17021
 URL: https://issues.apache.org/jira/browse/FLINK-17021
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Table SQL / Planner
Affects Versions: 1.11.0
Reporter: Zhu Zhu
 Fix For: 1.11.0


The Blink planner config option "table.exec.shuffle-mode" should be extended to set the GlobalDataExchangeMode for a job. The supported values are:
 * all-blocking/batch --> ALL_EDGES_BLOCKING
 * forward-pipelined-only --> FORWARD_EDGES_PIPELINED
 * pointwise-pipelined-only --> POINTWISE_EDGES_PIPELINED
 * all-pipelined/pipelined --> ALL_EDGES_PIPELINED

Note that the values 'pipelined' and 'batch' are still supported to stay compatible:
 * 'pipelined' will be treated the same as 'all-pipelined'
 * 'batch' will be treated the same as 'all-blocking'

The Blink planner needs to set the GlobalDataExchangeMode on the StreamGraph according to the configured value.
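
A sketch of the intended value-to-mode mapping, including the legacy aliases, is shown below. The enum is a local stand-in so the snippet is self-contained; the actual option parsing lives in the Blink planner.

{code:java}
// Sketch of the intended mapping from "table.exec.shuffle-mode" values to
// GlobalDataExchangeMode, including the legacy 'pipelined'/'batch' aliases.
final class ShuffleModeMappingSketch {

    enum GlobalDataExchangeMode {
        ALL_EDGES_BLOCKING, FORWARD_EDGES_PIPELINED, POINTWISE_EDGES_PIPELINED, ALL_EDGES_PIPELINED
    }

    static GlobalDataExchangeMode fromShuffleMode(String value) {
        switch (value.trim().toLowerCase()) {
            case "all-blocking":
            case "batch":                    // legacy alias
                return GlobalDataExchangeMode.ALL_EDGES_BLOCKING;
            case "forward-pipelined-only":
                return GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED;
            case "pointwise-pipelined-only":
                return GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED;
            case "all-pipelined":
            case "pipelined":                // legacy alias
                return GlobalDataExchangeMode.ALL_EDGES_PIPELINED;
            default:
                throw new IllegalArgumentException("Unsupported shuffle mode: " + value);
        }
    }
}
{code}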



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17020) Introduce GlobalDataExchangeMode for JobGraph Generation

2020-04-07 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17020:
---

 Summary: Introduce GlobalDataExchangeMode for JobGraph Generation
 Key: FLINK-17020
 URL: https://issues.apache.org/jira/browse/FLINK-17020
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.11.0
Reporter: Zhu Zhu
 Fix For: 1.11.0


Introduce GlobalDataExchangeMode with 4 modes:
 * ALL_EDGES_BLOCKING
 * FORWARD_EDGES_PIPELINED
 * POINTWISE_EDGES_PIPELINED
 * ALL_EDGES_PIPELINED

StreamGraph will be extended with a new field to host the 
GlobalDataExchangeMode. In the JobGraph generation stage, this mode will be 
used to determine the data exchange type of each job edge.

For more details see [FLIP-119#Global Data Exchange Mode|https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling#FLIP-119PipelinedRegionScheduling-GlobalDataExchangeMode]
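
One plausible way the mode could drive the per-edge decision during JobGraph generation is sketched below with local stand-in types; the authoritative rules are those in FLIP-119.

{code:java}
// Plausible sketch of how the mode could determine the data exchange type of
// each job edge; illustrative only, with local stand-in types.
final class DataExchangeModeSketch {

    enum GlobalDataExchangeMode {
        ALL_EDGES_BLOCKING, FORWARD_EDGES_PIPELINED, POINTWISE_EDGES_PIPELINED, ALL_EDGES_PIPELINED
    }

    enum EdgePattern { FORWARD, POINTWISE, ALL_TO_ALL }

    enum ExchangeType { PIPELINED, BLOCKING }

    static ExchangeType decide(GlobalDataExchangeMode mode, EdgePattern edge) {
        switch (mode) {
            case ALL_EDGES_PIPELINED:
                return ExchangeType.PIPELINED;
            case ALL_EDGES_BLOCKING:
                return ExchangeType.BLOCKING;
            case FORWARD_EDGES_PIPELINED:
                return edge == EdgePattern.FORWARD ? ExchangeType.PIPELINED : ExchangeType.BLOCKING;
            case POINTWISE_EDGES_PIPELINED:
                // Pointwise here covers FORWARD and other pointwise connections.
                return edge == EdgePattern.ALL_TO_ALL ? ExchangeType.BLOCKING : ExchangeType.PIPELINED;
            default:
                throw new IllegalStateException("Unknown mode: " + mode);
        }
    }
}
{code}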



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17019) Implement FIFO Physical Slot Assignment in SlotPoolImpl

2020-04-07 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17019:
---

 Summary: Implement FIFO Physical Slot Assignment in SlotPoolImpl
 Key: FLINK-17019
 URL: https://issues.apache.org/jira/browse/FLINK-17019
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.11.0
Reporter: Zhu Zhu
 Fix For: 1.11.0


The SlotPool should try to fulfill the oldest pending slot request once it receives an available slot, no matter whether the slot is returned by a terminated task or newly offered by a TaskManager. This naturally ensures that slot requests of an earlier scheduled region are fulfilled before requests of a later scheduled region.

We only need to change the slot assignment logic on slot offers. This is because the fields {{pendingRequests}} and {{waitingForResourceManager}} store the pending requests in LinkedHashMaps, so {{tryFulfillSlotRequestOrMakeAvailable(...)}} will naturally fulfill the pending requests in insertion order.

When a new slot is offered via {{SlotPoolImpl#offerSlot(...)}}, we should use it to fulfill the oldest fulfillable slot request directly by invoking {{tryFulfillSlotRequestOrMakeAvailable(...)}}.

If a pending request (say R1) exists with the allocationId of the offered slot, and it is different from the request to fulfill (say R2), we should update the pending request to replace the AllocationID of R1 with the AllocationID of R2. This ensures that failAllocation(...) can still fail slot allocation requests to trigger restarting tasks and re-allocating slots.
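
A simplified, self-contained sketch of the FIFO idea follows; the types are illustrative stand-ins, not SlotPoolImpl code, and the "fulfillable" check is reduced to a size comparison for brevity.

{code:java}
// Simplified sketch: pending requests are kept in insertion order, and a newly
// offered slot is used to fulfill the oldest request it can satisfy.
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

final class FifoSlotAssignmentSketch {

    record Slot(String allocationId, int sizeMb) {}
    record PendingRequest(String requestId, int requiredMb, CompletableFuture<Slot> future) {}

    /** LinkedHashMap keeps the pending requests in the order they were added (oldest first). */
    private final Map<String, PendingRequest> pendingRequests = new LinkedHashMap<>();

    void addRequest(PendingRequest request) {
        pendingRequests.put(request.requestId(), request);
    }

    /** Called when a TaskManager offers a slot or a terminated task returns one. */
    Optional<String> tryFulfillOldestRequest(Slot offeredSlot) {
        for (PendingRequest request : pendingRequests.values()) {
            if (offeredSlot.sizeMb() >= request.requiredMb()) {   // simplified "fulfillable" check
                pendingRequests.remove(request.requestId());
                request.future().complete(offeredSlot);
                return Optional.of(request.requestId());
            }
        }
        return Optional.empty();                                   // caller keeps the slot as available
    }
}
{code}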



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17018) Use Bulk Slot Allocation in DefaultExecutionSlotAllocator

2020-04-07 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17018:
---

 Summary: Use Bulk Slot Allocation in DefaultExecutionSlotAllocator
 Key: FLINK-17018
 URL: https://issues.apache.org/jira/browse/FLINK-17018
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.11.0
Reporter: Zhu Zhu
 Fix For: 1.11.0


The DefaultExecutionSlotAllocator should invoke bulk slot allocation methods to 
allocate slots.
The SlotProviderStrategy should also be reworked to forward such requests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17017) Implement Bulk Slot Allocation in SchedulerImpl

2020-04-07 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17017:
---

 Summary: Implement Bulk Slot Allocation in SchedulerImpl
 Key: FLINK-17017
 URL: https://issues.apache.org/jira/browse/FLINK-17017
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.11.0
Reporter: Zhu Zhu
 Fix For: 1.11.0


The SlotProvider interface should be extended with a bulk slot allocation method which accepts a collection of slot requests as one of its parameters.

{code:java}
CompletableFuture<Collection<LogicalSlotRequestResult>> allocateSlots(
  Collection<LogicalSlotRequest> slotRequests,
  Time allocationTimeout);
 
class LogicalSlotRequest {
  SlotRequestId slotRequestId;
  ScheduledUnit scheduledUnit;
  SlotProfile slotProfile;
  boolean slotWillBeOccupiedIndefinitely;
}
 
class LogicalSlotRequestResult {
  SlotRequestId slotRequestId;
  LogicalSlot slot;
}
{code}

For more details see [FLIP-119#Bulk Slot Allocation|https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling#FLIP-119PipelinedRegionScheduling-BulkSlotAllocation]




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17014) Implement PipelinedRegionSchedulingStrategy

2020-04-07 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-17014:
---

 Summary: Implement PipelinedRegionSchedulingStrategy
 Key: FLINK-17014
 URL: https://issues.apache.org/jira/browse/FLINK-17014
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.11.0
Reporter: Zhu Zhu
 Fix For: 1.11.0


The PipelinedRegionSchedulingStrategy submits one pipelined region to the DefaultScheduler at a time. It must be aware of the inputs of each pipelined region and should schedule a region if and only if all the inputs of that region have become consumable.

The PipelinedRegionSchedulingStrategy can be implemented as below (a rough sketch follows the list):
 * startScheduling(): schedule all source regions one by one.
 * onPartitionConsumable(partition): check all the consumer regions of the notified partition; if all the inputs of a region have become consumable, schedule that region.
 * restartTasks(tasksToRestart): find all regions which contain the tasks to restart and reschedule those whose inputs are all consumable.
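
The rough sketch below condenses the three behaviors into a self-contained example; all types are invented stand-ins, and for simplicity it checks every unscheduled region instead of only the consumer regions of the notified partition.

{code:java}
// Condensed sketch of the strategy described above (illustrative types only):
// a region is scheduled exactly when all of its cross-region inputs are consumable.
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class PipelinedRegionSchedulingSketch {

    /** A pipelined region and the result partitions it consumes from other regions. */
    record Region(String id, Set<String> crossRegionInputs) {}

    private final List<Region> regions;
    private final Set<String> consumablePartitions = new HashSet<>();
    private final Set<String> scheduledRegions = new HashSet<>();

    PipelinedRegionSchedulingSketch(List<Region> regions) {
        this.regions = regions;
    }

    /** startScheduling(): schedule all source regions, i.e. regions without cross-region inputs. */
    void startScheduling() {
        regions.stream()
                .filter(r -> r.crossRegionInputs().isEmpty())
                .forEach(this::schedule);
    }

    /** onPartitionConsumable(partition): schedule regions whose inputs are now all consumable. */
    void onPartitionConsumable(String partitionId) {
        consumablePartitions.add(partitionId);
        regions.stream()
                .filter(r -> !scheduledRegions.contains(r.id()))
                .filter(r -> consumablePartitions.containsAll(r.crossRegionInputs()))
                .forEach(this::schedule);
    }

    /** restartTasks(...): re-schedule the affected regions whose inputs are all consumable. */
    void restartRegions(Set<String> regionIdsToRestart) {
        scheduledRegions.removeAll(regionIdsToRestart);
        regions.stream()
                .filter(r -> regionIdsToRestart.contains(r.id()))
                .filter(r -> consumablePartitions.containsAll(r.crossRegionInputs()))
                .forEach(this::schedule);
    }

    private void schedule(Region region) {
        scheduledRegions.add(region.id());
        System.out.println("Scheduling region " + region.id());
    }
}
{code}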



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16560) StreamExecutionEnvironment configuration is empty when building program via PackagedProgramUtils#createJobGraph

2020-03-11 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-16560:
---

 Summary: StreamExecutionEnvironment configuration is empty when 
building program via PackagedProgramUtils#createJobGraph
 Key: FLINK-16560
 URL: https://issues.apache.org/jira/browse/FLINK-16560
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.10.0
Reporter: Zhu Zhu


PackagedProgramUtils#createJobGraph(...) is used to generate the JobGraph in k8s job mode.
The problem is that the configuration field of StreamExecutionEnvironment is a newly created one when building the job program. This is because the StreamPlanEnvironment constructor is based on the no-argument constructor of StreamExecutionEnvironment.

This may lead to unexpected results when invoking StreamExecutionEnvironment#configure(...), which relies on that configuration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16430) Pipelined region scheduling

2020-03-04 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-16430:
---

 Summary: Pipelined region scheduling
 Key: FLINK-16430
 URL: https://issues.apache.org/jira/browse/FLINK-16430
 Project: Flink
  Issue Type: Task
  Components: Runtime / Coordination
Affects Versions: 1.11.0
Reporter: Zhu Zhu
 Fix For: 1.11.0


Pipelined region scheduling aims to allow batch jobs with PIPELINED data exchanges to run without the risk of encountering a resource deadlock.

More details and work items will be added later when the detailed design is 
ready.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16300) Reworks SchedulerTestUtils with testing classes to replace mockito usages

2020-02-26 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-16300:
---

 Summary: Reworks SchedulerTestUtils with testing classes to 
replace mockito usages
 Key: FLINK-16300
 URL: https://issues.apache.org/jira/browse/FLINK-16300
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Tests
Affects Versions: 1.11.0
Reporter: Zhu Zhu
Assignee: Zhu Zhu
 Fix For: 1.11.0


Mockito is used in SchedulerTestUtils to mock ExecutionVertex and Execution for testing. It fails to mock every getter, so other tests that use it may encounter NPE issues, e.g. with ExecutionVertex#getID().
Mockito usage is also discouraged in Flink tests, so I'd propose to rework the utils with testing classes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16276) Introduce factory methods to create DefaultScheduler for testing

2020-02-25 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-16276:
---

 Summary: Introduce factory methods to create DefaultScheduler for 
testing
 Key: FLINK-16276
 URL: https://issues.apache.org/jira/browse/FLINK-16276
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Tests
Affects Versions: 1.11.0
Reporter: Zhu Zhu
Assignee: Zhu Zhu
 Fix For: 1.11.0


Currently, tests create the DefaultScheduler via its constructor. Having a builder and a set of factory methods can significantly reduce the complexity of instantiating a DefaultScheduler.
This will be especially helpful when reworking tests to be based on the new scheduler.
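
A hypothetical shape of such a builder (all names invented for illustration): every collaborator gets a sensible default, so a test only overrides what it cares about.

{code:java}
// Hypothetical test-builder sketch, not the actual Flink testing utility.
final class DefaultSchedulerBuilderSketch {

    private String schedulingStrategy = "pipelined-region";
    private String restartBackoffTimeStrategy = "no-restart";

    DefaultSchedulerBuilderSketch setSchedulingStrategy(String strategy) {
        this.schedulingStrategy = strategy;
        return this;
    }

    DefaultSchedulerBuilderSketch setRestartBackoffTimeStrategy(String strategy) {
        this.restartBackoffTimeStrategy = strategy;
        return this;
    }

    String build() {
        // Stand-in for instantiating a DefaultScheduler with the configured collaborators.
        return "DefaultScheduler(" + schedulingStrategy + ", " + restartBackoffTimeStrategy + ")";
    }

    public static void main(String[] args) {
        // A test only overrides what it cares about; everything else keeps its default.
        String scheduler = new DefaultSchedulerBuilderSketch()
                .setRestartBackoffTimeStrategy("fixed-delay")
                .build();
        System.out.println(scheduler);
    }
}
{code}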



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


  1   2   >