[flink] branch master updated (9391a70 -> 24ab7bb)
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 9391a70  [FLINK-13752] Only references necessary variables when bookkeeping result partitions on TM
     add 24ab7bb  [FLINK-13599][e2e tests] Harden test_streaming_kinesis with kinesalite docker image download/run retries

No new revisions were added by this update.

Summary of changes:
 flink-end-to-end-tests/test-scripts/common.sh      |  1 +
 .../test-scripts/test_streaming_kinesis.sh         | 18 +++---
 2 files changed, 16 insertions(+), 3 deletions(-)
[flink] branch release-1.9 updated: [FLINK-13599][e2e tests] Harden test_streaming_kinesis with kinesalite docker image download/run retries
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.9 by this push:
     new a893b70b  [FLINK-13599][e2e tests] Harden test_streaming_kinesis with kinesalite docker image download/run retries

a893b70b is described below

commit a893b70b203ffbce1165fd4424b106443b4ab6c8
Author: Andrey Zagrebin
AuthorDate: Mon Aug 12 12:02:56 2019 +0300

    [FLINK-13599][e2e tests] Harden test_streaming_kinesis with kinesalite docker image download/run retries
---
 flink-end-to-end-tests/test-scripts/common.sh      |  1 +
 .../test-scripts/test_streaming_kinesis.sh         | 18 +++---
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 0191a73..f3c70ac 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -747,3 +747,4 @@ function retry_times() {
     echo "Command: ${command} failed ${retriesNumber} times."
     return 1
 }
+

diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kinesis.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kinesis.sh
index 8dfba83..08416a0 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_kinesis.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kinesis.sh
@@ -28,9 +28,21 @@ export AWS_SECRET_KEY=flinkKinesisTestFakeAccessKey

 KINESALITE_PORT=4567

-#docker run -d --rm --name flink-test-kinesis -p ${KINESALITE_PORT}:${KINESALITE_PORT} instructure/kinesalite
-# override entrypoint to enable SSL
-docker run -d --rm --entrypoint "/tini" --name flink-test-kinesis -p ${KINESALITE_PORT}:${KINESALITE_PORT} instructure/kinesalite -- /usr/src/app/node_modules/kinesalite/cli.js --path /var/lib/kinesalite --ssl
+function start_kinesalite {
+    #docker run -d --rm --name flink-test-kinesis -p ${KINESALITE_PORT}:${KINESALITE_PORT} instructure/kinesalite
+    # override entrypoint to enable SSL
+    docker run -d --rm --entrypoint "/tini" \
+        --name flink-test-kinesis \
+        -p ${KINESALITE_PORT}:${KINESALITE_PORT} \
+        instructure/kinesalite -- \
+        /usr/src/app/node_modules/kinesalite/cli.js --path /var/lib/kinesalite --ssl
+}
+
+START_KINESALITE_MAX_RETRIES=50
+if ! retry_times ${START_KINESALITE_MAX_RETRIES} 0 start_kinesalite; then
+    echo "Failed to run kinesalite docker image"
+    exit 1
+fi

 # reveal potential issues with the container in the CI environment
 docker logs flink-test-kinesis
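The `retry_times` shell helper used above implements a simple bounded-retry loop: run a command up to N times with a fixed backoff and give up after the last failure. As an illustration only, the same pattern can be sketched in Java (a hypothetical `RetryUtil` class, not part of Flink):

```java
import java.util.function.BooleanSupplier;

/** Minimal sketch of a bounded-retry helper, mirroring the retry_times shell function above. */
public final class RetryUtil {

    /**
     * Runs {@code command} up to {@code retries} times, sleeping {@code backoffMillis}
     * between attempts, and returns true as soon as one attempt succeeds.
     */
    public static boolean retryTimes(int retries, long backoffMillis, BooleanSupplier command)
            throws InterruptedException {
        for (int i = 0; i < retries; i++) {
            if (command.getAsBoolean()) {
                return true;
            }
            Thread.sleep(backoffMillis);
        }
        System.out.println("Command failed " + retries + " times.");
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // A command that succeeds on the third attempt.
        int[] attempts = {0};
        boolean ok = retryTimes(5, 0, () -> ++attempts[0] == 3);
        System.out.println(ok + " after " + attempts[0] + " attempts"); // true after 3 attempts
    }
}
```

The shell version passes `0` as the backoff for the kinesalite start, exactly as the second argument does here.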
[flink-web] branch asf-site updated (b2439e9 -> 36dd318)
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git.

    from b2439e9  Minor fix in 1.9 announcement
     new d61151b  Add a new committer with the ASF id azagrebin
     new 36dd318  Rebuild website

The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 community.md           | 6 ++
 content/community.html | 6 ++
 2 files changed, 12 insertions(+)
[flink-web] 02/02: Rebuild website
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 36dd3180b98f55dd6d8a758ca1c1fe56752f029f
Author: Andrey Zagrebin
AuthorDate: Thu Aug 22 15:36:21 2019 +0200

    Rebuild website
---
 content/community.html | 6 ++
 1 file changed, 6 insertions(+)

diff --git a/content/community.html b/content/community.html
index 1d18937..1ed68ab 100644
--- a/content/community.html
+++ b/content/community.html
@@ -638,6 +638,12 @@
     Committer
     kurt
+
+    <img src="https://avatars0.githubusercontent.com/u/10573485?s=50" class="committer-avatar" />
+    Andrey Zagrebin
+    Committer
+    azagrebin
+
[flink-web] 01/02: Add a new committer with the ASF id azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit d61151bbecea7aa9e4c44511bbe84e4d6d76caf2
Author: Andrey Zagrebin
AuthorDate: Thu Aug 22 15:17:10 2019 +0200

    Add a new committer with the ASF id azagrebin
---
 community.md | 6 ++
 1 file changed, 6 insertions(+)

diff --git a/community.md b/community.md
index 6c70099..2f4e689 100644
--- a/community.md
+++ b/community.md
@@ -450,6 +450,12 @@ Flink Forward is a conference happening yearly in different locations around the
     Committer
     kurt
+
+    <img src="https://avatars0.githubusercontent.com/u/10573485?s=50" class="committer-avatar">
+    Andrey Zagrebin
+    Committer
+    azagrebin
+
[flink] 01/02: [FLINK-13819][coordination] Introduce isRunning state for RpcEndpoint
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a837659b71dd02ef6775bd2a4de331aab3ddc2e
Author: Andrey Zagrebin
AuthorDate: Wed Aug 21 11:19:27 2019 +0200

    [FLINK-13819][coordination] Introduce isRunning state for RpcEndpoint

    To better reflect the lifecycle of RpcEndpoint, we introduce its running state.
    The non-running state can be used, e.g., to decide how to react to API calls
    when it is already known that the RpcEndpoint is terminating.
---
 .../org/apache/flink/runtime/rpc/RpcEndpoint.java  | 85 +++++++++++++++---
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       |  4 +-
 .../apache/flink/runtime/rpc/RpcEndpointTest.java  | 72 ++++++++++++++
 3 files changed, 153 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 5c14a54..7c7a4c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -56,6 +56,36 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * The RPC endpoint provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Time)}
  * and the {@link #getMainThreadExecutor()} to execute code in the RPC endpoint's main thread.
+ *
+ * Lifecycle
+ *
+ * The RPC endpoint has the following stages:
+ *
+ * - The RPC endpoint is created in a non-running state and does not serve any RPC requests.
+ *
+ * - Calling the {@link #start()} method triggers the start of the RPC endpoint and schedules the overridable
+ *   {@link #onStart()} method call to the main thread.
+ *
+ * - When the start operation ends, the RPC endpoint is moved to the running state
+ *   and starts to serve and complete RPC requests.
+ *
+ * - Calling the {@link #closeAsync()} method triggers the termination of the RPC endpoint and schedules the
+ *   overridable {@link #onStop()} method call to the main thread.
+ *
+ * - When the {@link #onStop()} method is called, it triggers an asynchronous stop operation.
+ *   The RPC endpoint is no longer in the running state, but it continues to serve RPC requests.
+ *
+ * - When the asynchronous stop operation ends, the RPC endpoint terminates completely
+ *   and does not serve RPC requests anymore.
+ *
+ * The running state can be queried in an RPC method handler or in the main thread by calling the
+ * {@link #isRunning()} method.
  */
 public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {

@@ -80,6 +110,13 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 	private final MainThreadExecutor mainThreadExecutor;

 	/**
+	 * Indicates whether the RPC endpoint is started and not stopped or being stopped.
+	 *
+	 * IMPORTANT: the running state is not thread safe and can be used only in the main thread of the rpc endpoint.
+	 */
+	private boolean isRunning;
+
+	/**
 	 * Initializes the RPC endpoint.
 	 *
 	 * @param rpcService The RPC server that dispatches calls to this RPC endpoint.
@@ -112,12 +149,22 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 		return endpointId;
 	}

+	/**
+	 * Returns whether the RPC endpoint is started and not stopped or being stopped.
+	 *
+	 * @return whether the RPC endpoint is started and not stopped or being stopped.
+	 */
+	protected boolean isRunning() {
+		validateRunsInMainThread();
+		return isRunning;
+	}
+
 	//
 	//  Start & shutdown & lifecycle callbacks
 	//

 	/**
-	 * Starts the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready
+	 * Triggers start of the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready
 	 * to process remote procedure calls.
 	 *
 	 * @throws Exception indicating that something went wrong while starting the RPC endpoint
@@ -127,20 +174,33 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 	}

 	/**
-	 * User overridable callback.
+	 * Internal method which is called by the RpcService implementation to start the RpcEndpoint.
+	 *
+	 * @throws Exception indicating that the rpc endpoint could not be started. If an exception occurs,
+	 * then the rpc
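The lifecycle described in the commit above can be illustrated with a much-simplified, standalone endpoint sketch (plain Java; this is NOT the real Flink `RpcEndpoint` API — `SketchEndpoint` and its single-threaded assumptions are hypothetical): a boolean running flag is flipped by the start and close operations and consulted by request handlers.

```java
/** Simplified standalone sketch of the RpcEndpoint running state (not the real Flink API). */
public class SketchEndpoint {

	// Only touched from the endpoint's single "main thread"; deliberately not thread-safe,
	// mirroring the IMPORTANT note in the commit above.
	private boolean isRunning;

	public void start() {
		onStart();          // overridable hook, scheduled to the main thread in the real API
		isRunning = true;   // running only after the start operation ends
	}

	public void closeAsync() {
		isRunning = false;  // no longer running, but may still serve in-flight requests
		onStop();           // overridable hook, triggers the asynchronous stop
	}

	protected void onStart() {}

	protected void onStop() {}

	public boolean isRunning() {
		return isRunning;
	}

	/** Example RPC handler that reacts differently while the endpoint is terminating. */
	public String handleRequest() {
		return isRunning() ? "served" : "rejected: endpoint not running";
	}
}
```

The point of the flag is exactly what the commit message states: a handler such as `handleRequest()` can make a decision based on the non-running state once termination has begun.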
[flink] 02/02: [FLINK-13769][Coordination] Close RM connection in TaskExecutor.onStop and do not reconnect
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git

commit b682e9a316669f30fa4bfcaa32a8fa0d3ac1dc02
Author: Andrey Zagrebin
AuthorDate: Mon Aug 19 16:20:39 2019 +0200

    [FLINK-13769][Coordination] Close RM connection in TaskExecutor.onStop and do not reconnect

    This prevents the JM from acquiring slots which belong to the stopped TM.
---
 .../apache/flink/runtime/taskexecutor/TaskExecutor.java | 14 +++---
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index c8bcaf9..b1238ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -340,11 +340,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		Throwable jobManagerDisconnectThrowable = null;

-		if (resourceManagerConnection != null) {
-			resourceManagerConnection.close();
-		}
-
 		FlinkException cause = new FlinkException("The TaskExecutor is shutting down.");
+
+		closeResourceManagerConnection(cause);
+
 		for (JobManagerConnection jobManagerConnection : jobManagerConnections.values()) {
 			try {
 				disassociateFromJobManager(jobManagerConnection, cause);
@@ -958,7 +957,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {

 	@Override
 	public void disconnectResourceManager(Exception cause) {
-		reconnectToResourceManager(cause);
+		if (isRunning()) {
+			reconnectToResourceManager(cause);
+		}
 	}

 	// ==

@@ -986,6 +987,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {

 	private void reconnectToResourceManager(Exception cause) {
 		closeResourceManagerConnection(cause);
+		startRegistrationTimeout();
 		tryConnectToResourceManager();
 	}

@@ -1098,8 +1100,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			resourceManagerConnection.close();
 			resourceManagerConnection = null;
 		}
-
-		startRegistrationTimeout();
 	}

 	private void startRegistrationTimeout() {
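The essence of the fix above is that a disconnect notification only triggers a reconnect while the endpoint is still running; once `onStop` has closed the ResourceManager connection, it must stay closed. In miniature (a standalone, hypothetical sketch — `MiniTaskExecutor` and its boolean fields stand in for the real TaskExecutor, connection objects, and registration timeout):

```java
/**
 * Miniature sketch of the TaskExecutor fix above: close the RM connection on stop
 * and never reconnect afterwards (standalone illustration, not Flink code).
 */
public class MiniTaskExecutor {

	private boolean isRunning = true;   // stand-in for RpcEndpoint#isRunning()
	private boolean connected = true;   // stand-in for resourceManagerConnection != null
	public int reconnectAttempts = 0;

	/** RPC notification from the RM; the fix: only reconnect while still running. */
	public void disconnectResourceManager() {
		if (isRunning) {
			reconnectToResourceManager();
		}
	}

	private void reconnectToResourceManager() {
		closeResourceManagerConnection();
		reconnectAttempts++;
		connected = true;               // stand-in for tryConnectToResourceManager()
	}

	/** Shutdown path: close the connection and leave the non-running state behind. */
	public void onStop() {
		isRunning = false;
		closeResourceManagerConnection();
	}

	private void closeResourceManagerConnection() {
		connected = false;
	}

	public boolean isConnected() {
		return connected;
	}
}
```

With this guard, a late `disconnectResourceManager` call arriving after `onStop` is a no-op, so the JM can no longer be offered slots of the stopped TM.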
[flink] branch release-1.9 updated (c175cc4 -> b682e9a)
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git.

    from c175cc4  [hotfix][FLINK-13901][docs] Fix documentation links check errors
     new 1a83765  [FLINK-13819][coordination] Introduce isRunning state for RpcEndpoint
     new b682e9a  [FLINK-13769][Coordination] Close RM connection in TaskExecutor.onStop and do not reconnect

The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../org/apache/flink/runtime/rpc/RpcEndpoint.java  | 85 +++++++++++++---
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       |  4 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   | 14 ++--
 .../apache/flink/runtime/rpc/RpcEndpointTest.java  | 72 ++++++++++++
 4 files changed, 160 insertions(+), 15 deletions(-)
[flink] branch master updated (e263856 -> aabde88)
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git.

    from e263856  [FLINK-13774][table-planner-blink] Use LocalReferenceExpression and RexNodeExpression instead of blink expressions
     add aabde88  [FLINK-13819][coordination] Introduce isRunning state for RpcEndpoint

No new revisions were added by this update.

Summary of changes:
 .../org/apache/flink/runtime/rpc/RpcEndpoint.java  | 85 +++++++++++++---
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       |  4 +-
 .../apache/flink/runtime/rpc/RpcEndpointTest.java  | 72 ++++++++++++
 3 files changed, 153 insertions(+), 8 deletions(-)
[flink] branch master updated (aabde88 -> fa12f2d)
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git.

    from aabde88  [FLINK-13819][coordination] Introduce isRunning state for RpcEndpoint
     add fa12f2d  [FLINK-13769][Coordination] Close RM connection in TaskExecutor.onStop and do not reconnect

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/runtime/taskexecutor/TaskExecutor.java | 14 +++---
 1 file changed, 7 insertions(+), 7 deletions(-)
[flink-web] branch asf-site updated: [FLINK-13804][code style][zh] Set the initial capacity for a collection only if there is a good proven reason
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git

The following commit(s) were added to refs/heads/asf-site by this push:
     new e4f85d6  [FLINK-13804][code style][zh] Set the initial capacity for a collection only if there is a good proven reason

e4f85d6 is described below

commit e4f85d62998c08c4acb66604c4235944279a
Author: Andrey Zagrebin
AuthorDate: Wed Aug 28 11:41:04 2019 +0200

    [FLINK-13804][code style][zh] Set the initial capacity for a collection only if there is a good proven reason
---
 contributing/code-style-and-quality-java.zh.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/contributing/code-style-and-quality-java.zh.md b/contributing/code-style-and-quality-java.zh.md
index 335e889..f0de4e7 100644
--- a/contributing/code-style-and-quality-java.zh.md
+++ b/contributing/code-style-and-quality-java.zh.md
@@ -63,6 +63,7 @@ title: "Apache Flink Code Style and Quality Guide — Java"
 * `contains()` before `get()` → `get()` and check null
 * `contains()` before `put()` → `putIfAbsent()` or `computeIfAbsent()`
 * Iterating over keys, getting values → iterate over `entrySet()`
+* **Set the initial capacity for a collection only if there is a good proven reason** for that, otherwise do not clutter the code. In the case of **Maps** it can even be misleading because the Map's load factor effectively reduces the capacity.

 ### Lambdas
[flink-web] branch asf-site updated: [FLINK-13812][code style] Usage of Java Optional
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git

The following commit(s) were added to refs/heads/asf-site by this push:
     new 5088cce  [FLINK-13812][code style] Usage of Java Optional

5088cce is described below

commit 5088cce017c888514567460a7a4cc2b8915127da
Author: Andrey Zagrebin
AuthorDate: Wed Aug 21 13:17:50 2019 +0200

    [FLINK-13812][code style] Usage of Java Optional
---
 contributing/code-style-and-quality-common.md |  2 +-
 contributing/code-style-and-quality-java.md   | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/contributing/code-style-and-quality-common.md b/contributing/code-style-and-quality-common.md
index 05f703b..68de3a2 100644
--- a/contributing/code-style-and-quality-common.md
+++ b/contributing/code-style-and-quality-common.md
@@ -140,7 +140,7 @@ That way, you get warnings from IntelliJ about all sections where you have to re
 _Note: This means that `@Nonnull` annotations are usually not necessary, but can be used in certain cases to override a previous annotation, or to point non-nullability out in a context where one would expect a nullable value._

 `Optional` is a good solution as a return type for a method that may or may not have a result, so nullable return types are good candidates to be replaced with `Optional`.
-For fields and parameters, `Optional` is disputed in Java and most parts of the Flink code base don't use optional for fields.
+See also [usage of Java Optional](code-style-and-quality-java.md#java-optional).

 ### Avoid Code Duplication

diff --git a/contributing/code-style-and-quality-java.md b/contributing/code-style-and-quality-java.md
index e22b2c8..ef7a74c 100644
--- a/contributing/code-style-and-quality-java.md
+++ b/contributing/code-style-and-quality-java.md
@@ -66,6 +66,17 @@ title: "Apache Flink Code Style and Quality Guide — Java"
 * **Set the initial capacity for a collection only if there is a good proven reason** for that, otherwise do not clutter the code. In the case of **Maps** it can even be misleading because the Map's load factor effectively reduces the capacity.

+### Java Optional
+
+* Use the **@Nullable annotation where you do not use Optional** for nullable values.
+* If you can prove that `Optional` usage would lead to a **performance degradation in critical code, then fall back to @Nullable**.
+* Always use **Optional to return nullable values** in API/public methods, except in the case of a proven performance concern.
+* **Do not use Optional as a function argument**; instead either overload the method or use the Builder pattern for the set of function arguments.
+  * Note: an Optional argument can be allowed in a private helper method if you believe that it simplifies the code
+    ([example](https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java#L95)).
+* **Do not use Optional for class fields**.
+
 ### Lambdas

 * Prefer non-capturing lambdas (lambdas that do not contain references to the outer scope). Capturing lambdas need to create a new object instance for every call. Non-capturing lambdas can use the same instance for each invocation.
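The Optional rules added by this commit can be shown in one small sketch (a hypothetical `UserRegistry` class, not Flink code): `Optional` for a public lookup's return value, a plain nullable value internally, and an overload instead of an `Optional` parameter.

```java
import java.util.Optional;

/** Sketch of the Optional guidelines above (hypothetical example, not Flink code). */
public final class UserRegistry {

	// Rule: do not use Optional for class fields; a plain nullable field instead.
	private String defaultRegion; // may be null

	// Rule: public methods return Optional for a possibly-missing result.
	public Optional<String> findRegion(String userId) {
		return Optional.ofNullable(lookup(userId));
	}

	// Rule: no Optional parameters; overload the method instead.
	public String regionOrDefault(String userId) {
		return regionOrDefault(userId, defaultRegion != null ? defaultRegion : "eu-west-1");
	}

	public String regionOrDefault(String userId, String fallback) {
		return findRegion(userId).orElse(fallback);
	}

	// Internal nullable-returning helper; null means "not found".
	private String lookup(String userId) {
		return "alice".equals(userId) ? "us-east-1" : null;
	}

	public static void main(String[] args) {
		UserRegistry registry = new UserRegistry();
		System.out.println(registry.findRegion("alice").orElse("none")); // us-east-1
		System.out.println(registry.regionOrDefault("bob"));             // eu-west-1
	}
}
```

Compare the overload pair `regionOrDefault(String)` / `regionOrDefault(String, String)` with the discouraged alternative `regionOrDefault(String, Optional<String>)`: the overloads keep call sites free of `Optional.empty()` noise.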
[flink-web] branch asf-site updated: [FLINK-13804][code style] Set the initial capacity for a collection only if there is a good proven reason
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git

The following commit(s) were added to refs/heads/asf-site by this push:
     new 8ce7c00  [FLINK-13804][code style] Set the initial capacity for a collection only if there is a good proven reason

8ce7c00 is described below

commit 8ce7c00db1a7ceebdd14b2c7ee8e6365ed8a8081
Author: Andrey Zagrebin
AuthorDate: Tue Aug 20 17:44:41 2019 +0200

    [FLINK-13804][code style] Set the initial capacity for a collection only if there is a good proven reason
---
 contributing/code-style-and-quality-java.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/contributing/code-style-and-quality-java.md b/contributing/code-style-and-quality-java.md
index d1e7dea..e22b2c8 100644
--- a/contributing/code-style-and-quality-java.md
+++ b/contributing/code-style-and-quality-java.md
@@ -63,6 +63,7 @@ title: "Apache Flink Code Style and Quality Guide — Java"
 * `contains()` before `get()` → `get()` and check null
 * `contains()` before `put()` → `putIfAbsent()` or `computeIfAbsent()`
 * Iterating over keys, getting values → iterate over `entrySet()`
+* **Set the initial capacity for a collection only if there is a good proven reason** for that, otherwise do not clutter the code. In the case of **Maps** it can even be misleading because the Map's load factor effectively reduces the capacity.

 ### Lambdas
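The load-factor caveat in the rule above is easy to quantify: a `HashMap` with the default load factor 0.75 starts resizing once the entry count exceeds roughly capacity × 0.75, so `new HashMap<>(64)` does not actually hold 64 entries resize-free. A small illustration (hypothetical helper, not Flink code; the resize threshold arithmetic uses the documented default load factor):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrates why a raw "initial capacity" on a Map is misleading: the default
 * load factor 0.75 caps how many entries fit before the table resizes.
 */
public final class MapCapacity {

	/** Capacity needed so a default-load-factor HashMap can hold expectedSize entries without resizing. */
	public static int capacityFor(int expectedSize) {
		return (int) Math.ceil(expectedSize / 0.75);
	}

	public static void main(String[] args) {
		// To hold 64 entries without a resize you need capacity 86, not 64:
		System.out.println(capacityFor(64)); // 86

		Map<String, Integer> map = new HashMap<>(capacityFor(64));
		for (int i = 0; i < 64; i++) {
			map.put("k" + i, i);
		}
		System.out.println(map.size()); // 64
	}
}
```

Which is exactly why the guideline says to skip the explicit capacity unless a measurement proves it matters: a "helpful-looking" `new HashMap<>(n)` usually does not do what the reader assumes.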
[flink-web] branch asf-site updated: [FLINK-13812][code style][zh] Usage of Java Optional
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git

The following commit(s) were added to refs/heads/asf-site by this push:
     new 2f8ce2f  [FLINK-13812][code style][zh] Usage of Java Optional

2f8ce2f is described below

commit 2f8ce2ff9d3d471a8440e466ce12e52a770ecce3
Author: Andrey Zagrebin
AuthorDate: Wed Aug 28 11:46:17 2019 +0200

    [FLINK-13812][code style][zh] Usage of Java Optional
---
 contributing/code-style-and-quality-common.zh.md |  2 +-
 contributing/code-style-and-quality-java.zh.md   | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/contributing/code-style-and-quality-common.zh.md b/contributing/code-style-and-quality-common.zh.md
index a8643a9..82caabf 100644
--- a/contributing/code-style-and-quality-common.zh.md
+++ b/contributing/code-style-and-quality-common.zh.md
@@ -140,7 +140,7 @@ That way, you get warnings from IntelliJ about all sections where you have to re
 _Note: This means that `@Nonnull` annotations are usually not necessary, but can be used in certain cases to override a previous annotation, or to point non-nullability out in a context where one would expect a nullable value._

 `Optional` is a good solution as a return type for a method that may or may not have a result, so nullable return types are good candidates to be replaced with `Optional`.
-For fields and parameters, `Optional` is disputed in Java and most parts of the Flink code base don't use optional for fields.
+See also [usage of Java Optional](code-style-and-quality-java.md#java-optional).

 ### Avoid Code Duplication

diff --git a/contributing/code-style-and-quality-java.zh.md b/contributing/code-style-and-quality-java.zh.md
index f0de4e7..d5af274 100644
--- a/contributing/code-style-and-quality-java.zh.md
+++ b/contributing/code-style-and-quality-java.zh.md
@@ -66,6 +66,17 @@ title: "Apache Flink Code Style and Quality Guide — Java"
 * **Set the initial capacity for a collection only if there is a good proven reason** for that, otherwise do not clutter the code. In the case of **Maps** it can even be misleading because the Map's load factor effectively reduces the capacity.

+### Java Optional
+
+* Use the **@Nullable annotation where you do not use Optional** for nullable values.
+* If you can prove that `Optional` usage would lead to a **performance degradation in critical code, then fall back to @Nullable**.
+* Always use **Optional to return nullable values** in API/public methods, except in the case of a proven performance concern.
+* **Do not use Optional as a function argument**; instead either overload the method or use the Builder pattern for the set of function arguments.
+  * Note: an Optional argument can be allowed in a private helper method if you believe that it simplifies the code
+    ([example](https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java#L95)).
+* **Do not use Optional for class fields**.
+
 ### Lambdas

 * Prefer non-capturing lambdas (lambdas that do not contain references to the outer scope). Capturing lambdas need to create a new object instance for every call. Non-capturing lambdas can use the same instance for each invocation.
[flink-web] branch asf-site updated (2f8ce2f -> f7c040b)
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git.

    from 2f8ce2f  [FLINK-13812][code style][zh] Usage of Java Optional
     new 00462dd  [FLINK-13820][code style] Breaking long function argument lists and chained method calls
     new f7c040b  Rebuild website

The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../code-style-and-quality-common.html             |  2 +-
 .../code-style-and-quality-formatting.html         | 39 ++++++++
 .../contributing/code-style-and-quality-java.html  | 17 ++++
 .../code-style-and-quality-common.html             |  2 +-
 .../code-style-and-quality-formatting.html         | 38 ++++++++
 .../contributing/code-style-and-quality-java.html  | 17 ++++
 contributing/code-style-and-quality-formatting.md  | 39 ++++++++
 .../code-style-and-quality-formatting.zh.md        | 38 ++++++++
 8 files changed, 190 insertions(+), 2 deletions(-)
[flink-web] 02/02: Rebuild website
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit f7c040be053c871b9fc66588aca78510db5608b5
Author: Andrey Zagrebin
AuthorDate: Wed Aug 28 17:47:59 2019 +0200

    Rebuild website
---
 .../code-style-and-quality-common.html             |  2 +-
 .../code-style-and-quality-formatting.html         | 39 ++++++++
 .../contributing/code-style-and-quality-java.html  | 17 ++++
 .../code-style-and-quality-common.html             |  2 +-
 .../code-style-and-quality-formatting.html         | 38 ++++++++
 .../contributing/code-style-and-quality-java.html  | 17 ++++
 6 files changed, 113 insertions(+), 2 deletions(-)

diff --git a/content/contributing/code-style-and-quality-common.html b/content/contributing/code-style-and-quality-common.html
index aba7479..e92b0d8 100644
--- a/content/contributing/code-style-and-quality-common.html
+++ b/content/contributing/code-style-and-quality-common.html
@@ -377,7 +377,7 @@ That way, you get warnings from IntelliJ about all sections where you have to re
 Note: This means that @Nonnull annotations are usually not necessary, but can be used in certain cases to override a previous annotation, or to point non-nullability out in a context where one would expect a nullable value.

 Optional is a good solution as a return type for a method that may or may not have a result, so nullable return types are good candidates to be replaced with Optional.
-For fields and parameters, Optional is disputed in Java and most parts of the Flink code base don't use optional for fields.
+See also usage of Java Optional.

 Avoid Code Duplication

diff --git a/content/contributing/code-style-and-quality-formatting.html b/content/contributing/code-style-and-quality-formatting.html
index 7d934b9..10f8305 100644
--- a/content/contributing/code-style-and-quality-formatting.html
+++ b/content/contributing/code-style-and-quality-formatting.html
@@ -223,6 +223,7 @@
 Imports
 Naming
 Whitespaces
+Breaking the lines of too long statements
 Braces
 Javadocs
 Modifiers
@@ -281,6 +282,44 @@ We are aware that spaces are a bit nicer; it just happened to be that we started
 Spaces around operators/keywords. Operators (+, =, >, …) and keywords (if, for, catch, …) must have a space before and after them, provided they are not at the start or end of the line.

+Breaking the lines of too long statements
+
+In general, long lines should be avoided for better readability. Try to use short statements which operate on the same level of abstraction. Break long statements by creating more local variables, defining helper functions, etc.
+
+Two major sources of long lines are:
+ * a long list of arguments in a function declaration or call: void func(type1 arg1, type2 arg2, ...)
+ * a long sequence of chained calls: list.stream().map(...).reduce(...).collect(...)...
+
+Rules about breaking long lines:
+ * Break the argument list or chain of calls if the line exceeds the limit, or earlier if you believe that breaking it would improve the code readability
+ * If you break the line, then each argument/call should go on a separate line, including the first one
+ * Each new line should have one extra indentation (or two for a function declaration) relative to the line of the parent function name or the called entity
+
+Additionally, for function arguments:
+ * The opening parenthesis always stays on the line of the parent function name
+ * The possibly thrown exception list is never broken and stays on the same last line, even if the line length exceeds its limit
+ * The line of a function argument should end with a comma staying on the same line, except for the last argument
+
+Example of breaking the list of function arguments:
+```
+public void func(
+		int arg1,
+		int arg2,
+		…) throws E1, E2, E3 {
+
+}
+```
+
+The dot of a chained call is always on the line of that chained call, preceding the call at the beginning of the line.
+
+Example of breaking the chain of calls:
+
+values
+	.stream()
+	.map(...)
+	.collect(...);
+
 Braces

diff --git a/content/contributing/code-style-and-quality-java.html b/content/contributing/code-style-and-quality-java.html
index f2d8bd3..91f5f59 100644
--- a/content/contributing/code-style-and-quality-java.html
+++ b/content/contributing/code-style-and-quality-java.html
@@ -225,6 +225,7 @@
 Java Serialization
 Java Reflection
 Collections
+Java Optional
 Lambdas
 Java Streams
@@ -305,6 +306,22 @@
 Iterating over keys, getting values → iterate over entrySet()
+Set the initial capacity for a collection only if there is a good proven reason for that, otherwise do not clutter the code. In the case of Maps it can even be misleading because the Map's load factor effectively reduces
[flink-web] 01/02: [FLINK-13820][code style] Breaking long function argument lists and chained method calls
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit 00462ddbd4d289f9f9c35e61899d8eb55a21e849 Author: Andrey Zagrebin AuthorDate: Thu Aug 22 16:43:25 2019 +0200 [FLINK-13820][code style] Breaking long function argument lists and chained method calls --- contributing/code-style-and-quality-formatting.md | 39 ++ .../code-style-and-quality-formatting.zh.md| 38 + 2 files changed, 77 insertions(+) diff --git a/contributing/code-style-and-quality-formatting.md b/contributing/code-style-and-quality-formatting.md index cb83717..f75b140 100644 --- a/contributing/code-style-and-quality-formatting.md +++ b/contributing/code-style-and-quality-formatting.md @@ -48,6 +48,45 @@ We are aware that spaces are a bit nicer; it just happened to be that we started * **Spaces around operators/keywords.** Operators (`+`, `=`, `>`, …) and keywords (`if`, `for`, `catch`, …) must have a space before and after them, provided they are not at the start or end of the line. +### Breaking the lines of too long statements + +In general long lines should be avoided for the better readability. Try to use short statements which operate on the same level of abstraction. Break the long statements by creating more local variables, defining helper functions etc. + +Two major sources of long lines are: + * Long list of arguments in function declaration or call: void func(type1 arg1, type2 arg2, ...) + * Long sequence of chained calls: list.stream().map(...).reduce(...).collect(...)... 
+
+Rules about breaking the long lines:
+ * Break the argument list or chain of calls if the line exceeds limit or earlier if you believe that the breaking would improve the code readability
+ * If you break the line then each argument/call should have a separate line, including the first one
+ * Each new line should have one extra indentation (or two for a function declaration) relative to the line of the parent function name or the called entity
+
+Additionally for function arguments:
+ * The opening parenthesis always stays on the line of the parent function name
+ * The possible thrown exception list is never broken and stays on the same last line, even if the line length exceeds its limit
+ * The line of the function argument should end with a comma staying on the same line except the last argument
+
+Example of breaking the list of function arguments:
+```
+public void func(
+        int arg1,
+        int arg2,
+        ...) throws E1, E2, E3 {
+
+}
+```
+
+The dot of a chained call is always on the line of that chained call, preceding the call at the beginning.
+
+Example of breaking the list of chained calls:
+```
+values
+    .stream()
+    .map(...)
+    .collect(...);
+```
+
+
 ### Braces
 
 * **Left curly braces ({) must not be placed on a new line.**
diff --git a/contributing/code-style-and-quality-formatting.zh.md b/contributing/code-style-and-quality-formatting.zh.md
index 2abac04..2c0bc0f 100644
--- a/contributing/code-style-and-quality-formatting.zh.md
+++ b/contributing/code-style-and-quality-formatting.zh.md
@@ -48,6 +48,44 @@ We are aware that spaces are a bit nicer; it just happened to be that we started
 
 * **Spaces around operators/keywords.** Operators (`+`, `=`, `>`, …) and keywords (`if`, `for`, `catch`, …) must have a space before and after them, provided they are not at the start or end of the line.
 
+### Breaking the lines of too long statements
+
+In general long lines should be avoided for the better readability. Try to use short statements which operate on the same level of abstraction. Break the long statements by creating more local variables, defining helper functions etc.
+
+Two major sources of long lines are:
+ * Long list of arguments in function declaration or call: void func(type1 arg1, type2 arg2, ...)
+ * Long sequence of chained calls: list.stream().map(...).reduce(...).collect(...)...
+
+Rules about breaking the long lines:
+ * Break the argument list or chain of calls if the line exceeds limit or earlier if you believe that the breaking would improve the code readability
+ * If you break the line then each argument/call should have a separate line, including the first one
+ * Each new line should have one extra indentation (or two for a function declaration) relative to the line of the parent function name or the called entity
+
+Additionally for function arguments:
+ * The opening parenthesis always stays on the line of the parent function name
+ * The possible thrown exception list is never broken and stays on the same last line, even if the line length exceeds its limit
+ * The line of the function argument should end with a comma staying on the same line except the last argument
+
+Example of breaking the list of function arguments:
+```
+public void func(
+        int arg1,
+        int arg2,
+        ...)
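Applied together, the committed line-breaking rules look like this in a compilable unit; a small runnable sketch (class, method, and variable names are illustrative, not part of the commit):

```java
// Illustrative application of the committed formatting rules:
// each argument on its own line with extra indentation, the opening
// parenthesis on the declaring line, the throws clause unbroken on the
// last line, and the dot leading each broken chained call.
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FormattingExample {

	// Function declaration broken per the rules: two extra indents for
	// arguments, exception list kept on the closing line.
	public static int sumOfThree(
			int arg1,
			int arg2,
			int arg3) throws IllegalArgumentException {
		return arg1 + arg2 + arg3;
	}

	// Chained calls broken per the rules: one call per line, dot first.
	public static List<String> upperCased(List<String> values) {
		return values
			.stream()
			.map(String::toUpperCase)
			.collect(Collectors.toList());
	}

	public static void main(String[] args) throws Exception {
		System.out.println(sumOfThree(1, 2, 3));                 // 6
		System.out.println(upperCased(Arrays.asList("a", "b"))); // [A, B]
	}
}
```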
[flink-web] branch asf-site updated (c520e37 -> 1c7a290)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git. from c520e37 Rebuild website add a9dabc0 [FLINK-13908][code style] Fix formatting issue in Breaking the lines of too long statements add 0a9f019 [hotfix][code style] Fix typo in: Breaking the lines of too long statements add 1c7a290 Rebuild website No new revisions were added by this update. Summary of changes: .../code-style-and-quality-formatting.html | 57 +- .../code-style-and-quality-formatting.html | 56 - contributing/code-style-and-quality-formatting.md | 23 + .../code-style-and-quality-formatting.zh.md| 26 ++ 4 files changed, 96 insertions(+), 66 deletions(-)
[flink] branch master updated (5531b23 -> 3c991dc)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 5531b23 [FLINK-13892][hs] Harden HistoryServerTest add 3c991dc [FLINK-13927][docs] Add note about adding Hadoop dependencies to run/debug Flink locally in mini cluster No new revisions were added by this update. Summary of changes: docs/ops/deployment/hadoop.md| 22 ++ docs/ops/deployment/hadoop.zh.md | 22 ++ 2 files changed, 44 insertions(+)
[flink] branch master updated (0d390c1 -> 5ec02e2)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 0d390c1 [FLINK-13967][licensing] Fully generate binary licensing add 5ec02e2 [FLINK-13963][docs] Consolidate Hadoop file systems usage and Hadoop integration docs No new revisions were added by this update. Summary of changes: docs/dev/batch/connectors.md | 34 - docs/dev/batch/connectors.zh.md | 34 - docs/dev/batch/hadoop_compatibility.md| 9 + docs/dev/batch/hadoop_compatibility.zh.md | 9 + docs/ops/config.md| 2 + docs/ops/config.zh.md | 2 + docs/ops/deployment/hadoop.md | 22 ++- docs/ops/deployment/hadoop.zh.md | 22 ++- docs/ops/filesystems/index.md | 61 +- docs/ops/filesystems/index.zh.md | 63 +-- 10 files changed, 119 insertions(+), 139 deletions(-)
[flink] 19/21: Fix yarn cut off
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e in repository https://gitbox.apache.org/repos/asf/flink.git commit 1f24495c0df56473c850605d9e34baf2f7464be1 Author: Andrey Zagrebin AuthorDate: Wed Nov 6 09:59:32 2019 +0100 Fix yarn cut off --- .../client/deployment/ClusterSpecification.java| 6 +- .../clusterframework/TaskExecutorResourceSpec.java | 4 + .../TaskExecutorResourceUtils.java | 4 +- .../ActiveResourceManagerFactory.java | 23 +--- .../ActiveResourceManagerFactoryTest.java | 97 -- .../flink/yarn/CliFrontendRunWithYarnTest.java | 3 +- .../apache/flink/yarn/YarnConfigurationITCase.java | 28 +--- .../flink/yarn/YarnClusterClientFactory.java | 6 +- .../apache/flink/yarn/YarnClusterDescriptor.java | 18 +-- .../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 2 +- .../apache/flink/yarn/FlinkYarnSessionCliTest.java | 142 +++-- 11 files changed, 73 insertions(+), 260 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java index 72975d8..0d8d105 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java @@ -21,6 +21,7 @@ package org.apache.flink.client.deployment; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils; /** * Description of the cluster to start by the {@link ClusterDescriptor}. 
@@ -68,7 +69,10 @@ public final class ClusterSpecification { int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1); int jobManagerMemoryMb = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes(); - int taskManagerMemoryMb = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes(); + int taskManagerMemoryMb = TaskExecutorResourceUtils + .resourceSpecFromConfig(configuration) + .getTotalProcessMemorySize() + .getMebiBytes(); return new ClusterSpecificationBuilder() .setMasterMemoryMB(jobManagerMemoryMb) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java index d6cbe5b..d73e7b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java @@ -156,6 +156,10 @@ public class TaskExecutorResourceSpec implements java.io.Serializable { return getTotalFlinkMemorySize().add(jvmMetaspaceSize).add(jvmOverheadSize); } + public MemorySize getHeapSize() { + return frameworkHeapSize.add(taskHeapSize).add(onHeapManagedMemorySize); + } + @Override public String toString() { return "TaskExecutorResourceSpec {" diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java index 4b649e4..9c69b62 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java @@ -46,9 +46,7 @@ public class TaskExecutorResourceUtils { // public static String generateJvmParametersStr(final TaskExecutorResourceSpec 
taskExecutorResourceSpec) { - final MemorySize jvmHeapSize = taskExecutorResourceSpec.getFrameworkHeapSize() - .add(taskExecutorResourceSpec.getTaskHeapSize()) - .add(taskExecutorResourceSpec.getOnHeapManagedMemorySize()); + final MemorySize jvmHeapSize = taskExecutorResourceSpec.getHeapSize(); final MemorySize jvmDirectSize = taskExecutorResourceSpec.getTaskOffHeapSize() .add(taskExecutorResourceSpec.getShuffleMemSize()); final MemorySize jvmMetaspaceSize = taskExecutorResourceSpec.getJvmMetaspaceSize(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resou
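The refactoring above folds the three heap components into a single `getHeapSize()` accessor and keeps direct memory as task off-heap plus shuffle memory. A hedged, self-contained sketch of that aggregation and of the kind of JVM parameter string `generateJvmParametersStr` assembles (the flag layout and the plain `long` byte counts are illustrative stand-ins; Flink's `MemorySize` type is not reproduced here):

```java
// Sketch of the memory aggregation from the quoted diff:
//   heap   = framework heap + task heap + on-heap managed memory
//   direct = task off-heap + shuffle memory
public class TaskExecutorMemorySketch {

	public static long heapBytes(long frameworkHeap, long taskHeap, long onHeapManaged) {
		return frameworkHeap + taskHeap + onHeapManaged;
	}

	public static long directBytes(long taskOffHeap, long shuffle) {
		return taskOffHeap + shuffle;
	}

	// Roughly the shape of the JVM parameter string built from the spec
	// (exact flags/order in Flink may differ).
	public static String jvmParams(long heapBytes, long directBytes) {
		return "-Xmx" + heapBytes + " -Xms" + heapBytes
			+ " -XX:MaxDirectMemorySize=" + directBytes;
	}

	public static void main(String[] args) {
		long heap = heapBytes(128L << 20, 384L << 20, 512L << 20); // 1 GiB total
		long direct = directBytes(0L, 64L << 20);
		System.out.println(jvmParams(heap, direct));
	}
}
```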
[flink] 10/21: [FLINK-13986][test] Fix failure cases that fail due to change of expected exception type.
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e in repository https://gitbox.apache.org/repos/asf/flink.git commit 436f46753baa44f2892910e50709bd5cd49d09d6 Author: Xintong Song AuthorDate: Mon Oct 21 18:46:32 2019 +0800 [FLINK-13986][test] Fix failure cases that fail due to change of expected exception type. --- .../flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java index 8bc60a0..bd094c7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.blob.BlobCacheService; @@ -132,7 +131,7 @@ public class TaskManagerRunnerStartupTest extends TestLogger { highAvailabilityServices); fail("Should fail synchronously with an exception"); - } catch (IllegalConfigurationException e) { + } catch (Throwable t) { // splendid! } }
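The fix above widens the `catch` in a classic fail()/catch expected-exception pattern from `IllegalConfigurationException` to `Throwable`, so the test no longer depends on the exact exception type thrown at startup. A minimal standalone sketch of that pattern (no JUnit; the failing call is a hypothetical stand-in for TaskManagerRunner startup):

```java
// Minimal sketch of the expected-exception pattern the commit adjusts:
// invoke a call that must throw, treat a normal return as test failure,
// and accept any Throwable after the change.
public class ExpectedFailurePattern {

	static void startWithBadConfig() {
		// Stand-in for starting a TaskManager with an invalid configuration.
		throw new IllegalStateException("invalid memory configuration");
	}

	public static boolean failsSynchronously() {
		try {
			startWithBadConfig();
			return false; // would be fail("Should fail synchronously with an exception")
		} catch (Throwable t) {
			// splendid! -- any failure type is now accepted
			return true;
		}
	}

	public static void main(String[] args) {
		System.out.println(failsSynchronously()); // true
	}
}
```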
[flink] 14/21: Change task off heap default value from 0b to 1m to accommodate small framework allocations (temporary: later add framework off heap)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e in repository https://gitbox.apache.org/repos/asf/flink.git commit 1b7df767ce95db9d40f71c2936aa298ae51cc16f Author: Andrey Zagrebin AuthorDate: Mon Nov 4 14:27:03 2019 +0100 Change task off heap default value from 0b to 1m to accomodate for framework small allocations (temporary: later add framework off heap) --- docs/_includes/generated/task_manager_memory_configuration.html | 2 +- .../main/java/org/apache/flink/configuration/TaskManagerOptions.java| 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/_includes/generated/task_manager_memory_configuration.html b/docs/_includes/generated/task_manager_memory_configuration.html index 09b9dc6..e95160e 100644 --- a/docs/_includes/generated/task_manager_memory_configuration.html +++ b/docs/_includes/generated/task_manager_memory_configuration.html @@ -79,7 +79,7 @@ taskmanager.memory.task.off-heap.size -"0b" +"1m" Task Heap Memory size for TaskExecutors. This is the size of off heap memory (JVM direct memory or native memory) reserved for user code. diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index d717ad5..234d16a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -293,7 +293,7 @@ public class TaskManagerOptions { */ public static final ConfigOption TASK_OFF_HEAP_MEMORY = key("taskmanager.memory.task.off-heap.size") - .defaultValue("0b") + .defaultValue("1m") .withDescription("Task Heap Memory size for TaskExecutors. This is the size of off heap memory (JVM direct" + " memory or native memory) reserved for user code.");
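The default value strings `"0b"` and `"1m"` in the diff are memory sizes with byte/mebibyte unit suffixes. A sketch of how such strings map to byte counts (this is an illustrative parser, not Flink's `MemorySize` implementation, and it requires an explicit unit suffix):

```java
// Illustrative parser for memory-size strings like the "0b" and "1m"
// option defaults in the quoted diff.
public class MemorySizeSketch {

	public static long parseBytes(String text) {
		String s = text.trim().toLowerCase();
		long multiplier;
		switch (s.charAt(s.length() - 1)) {
			case 'b': multiplier = 1L; break;
			case 'k': multiplier = 1L << 10; break;
			case 'm': multiplier = 1L << 20; break;
			case 'g': multiplier = 1L << 30; break;
			default: throw new IllegalArgumentException("unknown unit in: " + text);
		}
		return Long.parseLong(s.substring(0, s.length() - 1)) * multiplier;
	}

	public static void main(String[] args) {
		System.out.println(parseBytes("0b")); // 0       (old default)
		System.out.println(parseBytes("1m")); // 1048576 (new default)
	}
}
```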
[flink] 13/21: Add backwards compatibility
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e in repository https://gitbox.apache.org/repos/asf/flink.git commit 5b5be514ad75d0343792b95ea5be8f83b0fa25d8 Author: Andrey Zagrebin AuthorDate: Thu Oct 31 17:06:58 2019 +0100 Add backwards compatibility --- flink-dist/src/main/resources/flink-conf.yaml | 4 ++-- .../flink/runtime/clusterframework/TaskExecutorResourceUtils.java | 7 +-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml index a3bc57d..15d7aa9 100644 --- a/flink-dist/src/main/resources/flink-conf.yaml +++ b/flink-dist/src/main/resources/flink-conf.yaml @@ -42,9 +42,9 @@ jobmanager.rpc.port: 6123 jobmanager.heap.size: 1024m -# The heap size for the TaskManager JVM +# Total Flink process size for the TaskExecutor -taskmanager.heap.size: 1024m +taskmanager.memory.total-process.size: 1024m # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java index f2a275f..4b649e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java @@ -417,7 +417,9 @@ public class TaskExecutorResourceUtils { return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY)); } else { @SuppressWarnings("deprecation") - final long legacyHeapMemoryMB = config.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB); + final long legacyHeapMemoryMB = config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) ? 
+ config.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) : + MemorySize.parse(config.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getBytes(); return new MemorySize(legacyHeapMemoryMB << 20); // megabytes to bytes } } @@ -427,7 +429,8 @@ public class TaskExecutorResourceUtils { } private static boolean isManagedMemorySizeExplicitlyConfigured(final Configuration config) { - return config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE); + return config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE) || + config.contains(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE); } private static boolean isManagedMemoryOffHeapFractionExplicitlyConfigured(final Configuration config) {
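The backwards-compatibility change above prefers the new key and falls back through deprecated ones. A hedged sketch of that lookup order (the `LegacyKeyFallback` type and the `"1568m"` default are illustrative stand-ins, not Flink's `Configuration` API; the two key names are taken from the quoted flink-conf.yaml diff):

```java
// Sketch of deprecated-key fallback: return the value of the first
// explicitly configured key, newest first, else a default.
import java.util.HashMap;
import java.util.Map;

public class LegacyKeyFallback {

	private final Map<String, String> entries = new HashMap<>();

	public void set(String key, String value) {
		entries.put(key, value);
	}

	public String getWithFallback(String defaultValue, String... keysNewestFirst) {
		for (String key : keysNewestFirst) {
			if (entries.containsKey(key)) {
				return entries.get(key);
			}
		}
		return defaultValue;
	}

	public static void main(String[] args) {
		LegacyKeyFallback config = new LegacyKeyFallback();
		config.set("taskmanager.heap.size", "1024m"); // only the legacy key is set
		String total = config.getWithFallback(
			"1568m",
			"taskmanager.memory.total-process.size",
			"taskmanager.heap.size");
		System.out.println(total); // 1024m
	}
}
```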
[flink] 20/21: [FLINK-14631] Account for netty direct allocations in direct memory limit (Netty Shuffle)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e in repository https://gitbox.apache.org/repos/asf/flink.git commit b8c6a4cdfab29447dc1cfc74d37c1a4c34217717 Author: Andrey Zagrebin AuthorDate: Wed Nov 6 17:24:28 2019 +0100 [FLINK-14631] Account for netty direct allocations in direct memory limit (Netty Shuffle) At the moment after FLINK-13982, when we calculate JVM direct memory limit, we only account for memory segment network buffers but not for direct allocations from netty arenas in org.apache.flink.runtime.io.network.netty.NettyBufferPool. We should include netty arenas into shuffle memory calculations. --- .../test-scripts/test_batch_allround.sh| 4 +-- .../TaskExecutorResourceUtils.java | 6 +++- .../runtime/io/network/netty/NettyBufferPool.java | 2 ++ .../runtime/io/network/netty/NettyConfig.java | 10 -- .../NettyShuffleEnvironmentConfiguration.java | 39 ++ .../TaskExecutorResourceUtilsTest.java | 6 +++- .../NettyShuffleEnvironmentConfigurationTest.java | 16 + .../example/failing/JobSubmissionFailsITCase.java | 6 +++- .../test/streaming/runtime/BackPressureITCase.java | 4 ++- .../apache/flink/yarn/YarnConfigurationITCase.java | 4 +-- 10 files changed, 79 insertions(+), 18 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh index 5143ef4..dc6f753 100755 --- a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh +++ b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh @@ -26,8 +26,8 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAll echo "Run DataSet-Allround-Test Program" # modify configuration to include spilling to disk -set_config_key "taskmanager.network.memory.min" "10485760" -set_config_key "taskmanager.network.memory.max" "10485760" +set_config_key "taskmanager.network.memory.min" "27262976" +set_config_key 
"taskmanager.network.memory.max" "27262976" set_conf_ssl "server" start_cluster diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java index 9c69b62..f98670b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java @@ -23,6 +23,8 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.io.network.netty.NettyBufferPool; +import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.util.ConfigurationParserUtils; import org.apache.flink.runtime.util.EnvironmentInformation; @@ -375,7 +377,9 @@ public class TaskExecutorResourceUtils { @SuppressWarnings("deprecation") final long numOfBuffers = config.getInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS); final long pageSize = ConfigurationParserUtils.getPageSize(config); - return new MemorySize(numOfBuffers * pageSize); + final int numberOfSlots = config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); + final long numberOfArenas = NettyConfig.getNumberOfArenas(config, numberOfSlots); + return new MemorySize(numOfBuffers * pageSize + numberOfArenas * NettyBufferPool.ARENA_SIZE); } private static RangeFraction getShuffleMemoryRangeFraction(final Configuration config) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java index 6d2a6c8..fc49712 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java @@ -67,6 +67,8 @@ public class NettyBufferPool extends PooledByteBufAllocator { */ private static final int MAX_ORDER = 11; + public static final long ARENA_SIZE = PAGE_SIZE << MAX_ORDER; + /** * Creates Netty's buffer pool with the specified number of direct arenas. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime
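The arithmetic behind this commit: the shuffle direct-memory budget becomes `numOfBuffers * pageSize + numberOfArenas * ARENA_SIZE`, where `ARENA_SIZE = PAGE_SIZE << MAX_ORDER` with `MAX_ORDER = 11` per the NettyBufferPool hunk. With Netty's 8 KiB allocator page size (an assumption here, since that constant is not shown in the quoted diff), one arena is 16 MiB, which is exactly the bump the e2e test config gets (10485760 → 27262976):

```java
// Direct-memory accounting from the quoted diff: network buffers plus
// Netty arena reservations.
public class ShuffleMemoryMath {

	static final long PAGE_SIZE = 8192L; // assumed Netty default allocator page size
	static final int MAX_ORDER = 11;     // from the quoted NettyBufferPool hunk
	static final long ARENA_SIZE = PAGE_SIZE << MAX_ORDER; // 16 MiB per arena

	public static long shuffleBytes(long numBuffers, long bufferPageSize, long numArenas) {
		return numBuffers * bufferPageSize + numArenas * ARENA_SIZE;
	}

	public static void main(String[] args) {
		// Old e2e budget (10 MiB) plus exactly one arena matches the new value:
		System.out.println(10485760L + ARENA_SIZE); // 27262976
	}
}
```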
[flink] 04/21: [FLINK-13983][dist] TM startup scripts calls java codes to set flip49 TM resource configs and JVM parameters, if feature option is enabled.
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e in repository https://gitbox.apache.org/repos/asf/flink.git commit 4088bea93ad89940909e1c0d3bb92054315f795d Author: Xintong Song AuthorDate: Fri Sep 27 11:19:55 2019 +0800 [FLINK-13983][dist] TM startup scripts calls java codes to set flip49 TM resource configs and JVM parameters, if feature option is enabled. --- flink-dist/src/main/flink-bin/bin/config.sh | 49 flink-dist/src/main/flink-bin/bin/taskmanager.sh | 30 --- 2 files changed, 57 insertions(+), 22 deletions(-) diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh index 090c1fb..52ce960 100755 --- a/flink-dist/src/main/flink-bin/bin/config.sh +++ b/flink-dist/src/main/flink-bin/bin/config.sh @@ -117,6 +117,8 @@ KEY_TASKM_NET_BUF_MIN="taskmanager.network.memory.min" KEY_TASKM_NET_BUF_MAX="taskmanager.network.memory.max" KEY_TASKM_NET_BUF_NR="taskmanager.network.numberOfBuffers" # fallback +KEY_TASKM_ENABLE_FLIP49="taskmanager.enable-flip-49" # temporal feature option for flip-49 + KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa" KEY_ENV_PID_DIR="env.pid.dir" @@ -429,6 +431,11 @@ if [ -z "${FLINK_TM_NET_BUF_MAX}" -o "${FLINK_TM_NET_BUF_MAX}" = "-1" ]; then FLINK_TM_NET_BUF_MAX=$(parseBytes ${FLINK_TM_NET_BUF_MAX}) fi +# Define FLINK_TM_ENABLE_FLIP49 if it is not already set +# temporal feature option for flip-49 +if [ -z "${FLINK_TM_ENABLE_FLIP49}" ]; then +FLINK_TM_ENABLE_FLIP49=$(readFromConfig ${KEY_TASKM_ENABLE_FLIP49} "false" "${YAML_CONF}") +fi # Verify that NUMA tooling is available command -v numactl >/dev/null 2>&1 @@ -790,3 +797,45 @@ runBashJavaUtilsCmd() { echo ${output} } + +getTmResourceDynamicConfigsAndJvmParams() { +if [[ "`echo ${FLINK_TM_ENABLE_FLIP49} | tr '[:upper:]' '[:lower:]'`" == "true" ]]; then +echo "$(getTmResourceDynamicConfigsAndJvmParamsFlip49)" +else +echo 
"$(getTmResourceDynamicConfigsAndJvmParamsLegacy)" +fi +} + +getTmResourceDynamicConfigsAndJvmParamsFlip49() { +local class_path=`constructFlinkClassPath` +class_path=`manglePathList ${class_path}` + +local dynamic_configs=$(runBashJavaUtilsCmd GET_TM_RESOURCE_DYNAMIC_CONFIGS ${class_path} ${FLINK_CONF_DIR}) +local jvm_params=$(runBashJavaUtilsCmd GET_TM_RESOURCE_JVM_PARAMS ${class_path} ${FLINK_CONF_DIR}) + +echo ${jvm_params} $'\n' ${dynamic_configs} +} + +getTmResourceDynamicConfigsAndJvmParamsLegacy() { +if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then + echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`" +else + flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP}) + FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes}) +fi + +if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then +echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}." +exit 1 +fi + +if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then + +TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB) +# Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used +TM_MAX_OFFHEAP_SIZE="8388607T" + +local jvm_params="-Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}" +echo ${jvm_params} # no dynamic configs +fi +} diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh index f12f9d6..e78a1f1 100755 --- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh +++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh @@ -44,32 +44,18 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC" fi -if [ ! 
-z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then - echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`" -else - flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP}) - FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes}) -fi - -if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then -echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' i
[flink] 09/21: [FLINK-13986][test] Fix test cases missing explicit task executor resource configurations.
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e in repository https://gitbox.apache.org/repos/asf/flink.git commit 1d80b12834b2f11a64d61ef489ea0c9e38357524 Author: Xintong Song AuthorDate: Mon Oct 21 16:28:57 2019 +0800 [FLINK-13986][test] Fix test cases missing explicit task executor resource configurations. FLIP-49 requires either one of the following three size(s) to be explicitly configured. - Task Heap Memory and Managed Memory - Total Flink Memory - Total Process Memory This commit fix test cases that fail due to all the above three are missing, by setting a reasonable Total Flink Memory size. --- .../src/main/java/org/apache/flink/client/LocalExecutor.java | 2 ++ .../apache/flink/streaming/connectors/kafka/KafkaTestBase.java | 3 ++- .../runtime/state/TaskExecutorLocalStateStoresManagerTest.java | 4 +++- .../flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java | 9 ++--- .../apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java | 7 +-- .../test/recovery/JobManagerHAProcessFailureRecoveryITCase.java | 3 ++- 6 files changed, 20 insertions(+), 8 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java index b1d3330..7ff020c 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.minicluster.JobExecutorService; import 
org.apache.flink.runtime.minicluster.MiniCluster; @@ -64,6 +65,7 @@ public class LocalExecutor extends PlanExecutor { if (!configuration.contains(RestOptions.BIND_PORT)) { configuration.setString(RestOptions.BIND_PORT, "0"); } + TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution(configuration); int numTaskManagers = configuration.getInteger( ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index 94d9133..85f8f2f 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -117,7 +117,8 @@ public abstract class KafkaTestBase extends TestLogger { protected static Configuration getFlinkConfiguration() { Configuration flinkConfig = new Configuration(); - flinkConfig.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "16m"); + flinkConfig.setString(TaskManagerOptions.TASK_HEAP_MEMORY, "16m"); + flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m"); flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); return flinkConfig; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java index d9eac20..6191d49 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Executors; @@ -46,6 +47,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger { public static TemporaryFolder temporaryFolder = new TemporaryFolder(); priv
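The commit message states the FLIP-49 requirement: at least one of (Task Heap + Managed Memory), Total Flink Memory, or Total Process Memory must be explicitly configured. A sketch of that check, with booleans standing in for the real `Configuration.contains(...)` lookups (method and parameter names are illustrative):

```java
// Sketch of the FLIP-49 explicit-memory-configuration requirement the
// fixed tests must now satisfy.
public class Flip49ConfigCheck {

	public static boolean isMemoryExplicitlyConfigured(
			boolean taskHeapSet,
			boolean managedSet,
			boolean totalFlinkSet,
			boolean totalProcessSet) {
		return (taskHeapSet && managedSet) || totalFlinkSet || totalProcessSet;
	}

	public static void main(String[] args) {
		// KafkaTestBase now sets task heap + managed memory:
		System.out.println(isMemoryExplicitlyConfigured(true, true, false, false));  // true
		// Nothing set -> the task executor would refuse to start:
		System.out.println(isMemoryExplicitlyConfigured(false, false, false, false)); // false
	}
}
```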
[flink] 06/21: [FLINK-13983][runtime] Use flip49 config options to decide memory size of ShuffleEnvironment.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a28bdc854df6e2c52dcb03e36de0ba3915f7866c
Author: Xintong Song
AuthorDate: Sun Sep 29 11:56:23 2019 +0800

    [FLINK-13983][runtime] Use flip49 config options to decide memory size of ShuffleEnvironment.
---
 .../io/network/NettyShuffleServiceFactory.java     |  1 +
 .../runtime/shuffle/ShuffleEnvironmentContext.java | 13 ++
 .../runtime/taskexecutor/TaskManagerServices.java  |  1 +
 .../TaskManagerServicesConfiguration.java          | 40 +
 .../NettyShuffleEnvironmentConfiguration.java      | 51 +++---
 .../NettyShuffleEnvironmentConfigurationTest.java  |  3 +-
 6 files changed, 93 insertions(+), 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index f266c77..477aca0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -59,6 +59,7 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory
     systemResourceMetricsProbingInterval;
+    @Nullable // should only be null when flip49 is disabled
+    private final TaskExecutorResourceSpec taskExecutorResourceSpec;
+
     public TaskManagerServicesConfiguration(
         Configuration configuration,
         ResourceID resourceID,
@@ -101,6 +107,8 @@ public class TaskManagerServicesConfiguration {
         MemoryType memoryType,
         float memoryFraction,
         int pageSize,
+        @Nullable // should only be null when flip49 is disabled
+        TaskExecutorResourceSpec taskExecutorResourceSpec,
         long timerServiceShutdownTimeout,
         RetryingRegistrationConfiguration retryingRegistrationConfiguration,
         Optional systemResourceMetricsProbingInterval) {
@@ -122,6 +130,8 @@ public class TaskManagerServicesConfiguration {
         this.memoryFraction = memoryFraction;
         this.pageSize = pageSize;
+        this.taskExecutorResourceSpec = taskExecutorResourceSpec;
+
         checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " +
             "service shutdown timeout must be greater or equal to 0.");
         this.timerServiceShutdownTimeout = timerServiceShutdownTimeout;
@@ -207,6 +217,11 @@ public class TaskManagerServicesConfiguration {
         return pageSize;
     }
+    @Nullable // should only be null when flip49 is disabled
+    public MemorySize getShuffleMemorySize() {
+        return taskExecutorResourceSpec == null ? null : taskExecutorResourceSpec.getShuffleMemSize();
+    }
+
     long getTimerServiceShutdownTimeout() {
         return timerServiceShutdownTimeout;
     }
@@ -259,6 +274,30 @@ public class TaskManagerServicesConfiguration {
         final RetryingRegistrationConfiguration retryingRegistrationConfiguration = RetryingRegistrationConfiguration.fromConfiguration(configuration);
+        if (configuration.getBoolean(TaskManagerOptions.ENABLE_FLIP_49_CONFIG)) {
+            final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
+            return new TaskManagerServicesConfiguration(
+                configuration,
+                resourceID,
+                remoteAddress,
+                localCommunicationOnly,
+                tmpDirs,
+                localStateRootDir,
+                freeHeapMemoryWithDefrag,
+                maxJvmHeapMemory,
+                localRecoveryMode,
+                queryableStateConfig,
+                ConfigurationParserUtils.getSlot(configuration),
+                ConfigurationParserUtils.getManagedMemorySize(configuration),
+                ConfigurationParserUtils.getMemoryType(configuration),
+                ConfigurationParserUtils.getManagedMemoryFraction(configuration),
+                ConfigurationParserUtils.getPageSize(configuration),
+                taskExecutorResourceSpec,
+                timerServiceShutdownTimeout,
+                retryingRe
[flink] 15/21: Increase memory in YARNHighAvailabilityITCase
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c499e102929b5793c9470e4527e6206d866f874a
Author: Andrey Zagrebin
AuthorDate: Mon Nov 4 15:24:02 2019 +0100

    Increase memory in YARNHighAvailabilityITCase
---
 .../src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index a30046e..f082c84 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -288,7 +288,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
     }

     private RestClusterClient deploySessionCluster(YarnClusterDescriptor yarnClusterDescriptor) throws ClusterDeploymentException {
-        final int containerMemory = 256;
+        final int containerMemory = 1024;
         final ClusterClient yarnClusterClient = yarnClusterDescriptor.deploySessionCluster(
             new ClusterSpecification.ClusterSpecificationBuilder()
                 .setMasterMemoryMB(containerMemory)
[flink] 18/21: fix YarnConfigurationITCase
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d123baf24e68ba7058bbb739954a59879bf5265d
Author: Andrey Zagrebin
AuthorDate: Tue Nov 5 09:14:50 2019 +0100

    fix YarnConfigurationITCase
---
 .../src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java | 2 +-
 .../src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 8350293..dbad597 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -85,7 +85,7 @@ public class YarnConfigurationITCase extends YarnTestBase {
         final Configuration configuration = new Configuration(flinkConfiguration);
         final int masterMemory = 64;
-        final int taskManagerMemory = 128;
+        final int taskManagerMemory = 512;
         final int slotsPerTaskManager = 3;

         // disable heap cutoff min

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index f958b70..74207ff 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -835,7 +835,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor {
             clusterSpecification.getSlotsPerTaskManager());

         configuration.setString(
-            TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY,
+            TaskManagerOptions.TOTAL_PROCESS_MEMORY,
             clusterSpecification.getTaskManagerMemoryMB() + "m");

         // Upload the flink configuration
[flink] 12/21: Treat legacy TM heap size as total process memory, not flink memory
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fd73130c0bf036bf4d04c9b2102e2b6fd1f908ce
Author: Andrey Zagrebin
AuthorDate: Wed Nov 6 16:24:08 2019 +0100

    Treat legacy TM heap size as total process memory, not flink memory
---
 .../flink/configuration/TaskManagerOptions.java    |  2 +-
 .../TaskExecutorResourceUtils.java                 | 27 +++---
 .../TaskExecutorResourceUtilsTest.java             | 16 ++---
 3 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 7d1492c..d717ad5 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -253,6 +253,7 @@ public class TaskManagerOptions {
     public static final ConfigOption TOTAL_PROCESS_MEMORY =
         key("taskmanager.memory.total-process.size")
             .noDefaultValue()
+            .withDeprecatedKeys(TASK_MANAGER_HEAP_MEMORY.key())
             .withDescription("Total Process Memory size for the TaskExecutors. This includes all the memory that a" +
                 " TaskExecutor consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. On" +
                 " containerized setups, this should be set to the container memory.");
@@ -264,7 +265,6 @@ public class TaskManagerOptions {
     public static final ConfigOption TOTAL_FLINK_MEMORY =
         key("taskmanager.memory.total-flink.size")
             .noDefaultValue()
-            .withDeprecatedKeys(TASK_MANAGER_HEAP_MEMORY.key())
             .withDescription("Total Flink Memory size for the TaskExecutors. This includes all the memory that a" +
                 " TaskExecutor consumes, except for JVM Metaspace and JVM Overhead. It consists of Framework Heap Memory," +
                 " Task Heap Memory, Task Off-Heap Memory, Managed Memory, and Shuffle Memory.");

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index 83d2d7d..f2a275f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -408,8 +413,13 @@ public class TaskExecutorResourceUtils {
     private static MemorySize getTotalFlinkMemorySize(final Configuration config) {
         checkArgument(isTotalFlinkMemorySizeExplicitlyConfigured(config));
-        if (config.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY)) {
-            return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_FLINK_MEMORY));
+        return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_FLINK_MEMORY));
+    }
+
+    private static MemorySize getTotalProcessMemorySize(final Configuration config) {
+        checkArgument(isTotalProcessMemorySizeExplicitlyConfigured(config));
+        if (config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY)) {
+            return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
         } else {
             @SuppressWarnings("deprecation")
             final long legacyHeapMemoryMB = config.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB);
@@ -417,11 +422,6 @@ public class TaskExecutorResourceUtils {
         }
     }

-    private static MemorySize getTotalProcessMemorySize(final Configuration config) {
-        checkArgument(isTotalProcessMemorySizeExplicitlyConfigured(config));
-        return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
-    }
-
     private static boolean isTaskHeapMemorySizeExplicitlyConfigured(final Configuration config) {
         return config.contains(TaskManagerOptions.TASK_HEAP_MEMORY);
     }
@@ -454,15 +454,16 @@ public class TaskExecutorResourceUtils {
     }

     private static boolean isTotalFlinkMemorySizeExplicitlyConfigured(final Configuration config) {
-        // backward compatible with the deprecated config option TASK_MANAGER_HEAP_MEMORY_MB only when it's explicitly
-        // configured by the user
-        @SuppressWarnings("deprecation")
-
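The commit above moves the deprecated-key fallback so that the legacy MB-valued heap option now backs the total *process* memory option instead of total Flink memory. The lookup order can be sketched as follows; `Config` is stood in for by a plain map, and `parseMemorySize` is a minimal stand-in for Flink's `MemorySize.parse`, so both are illustrative rather than Flink's actual API:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the fallback moved in this commit: prefer the new
// 'taskmanager.memory.total-process.size' key, and only when it is absent
// fall back to the deprecated MB-valued legacy heap key.
public class TotalProcessMemoryFallback {
    static final String TOTAL_PROCESS_MEMORY = "taskmanager.memory.total-process.size";
    static final String LEGACY_HEAP_MB = "taskmanager.heap.mb"; // illustrative legacy key

    // Returns the total process memory in bytes.
    static long totalProcessMemoryBytes(Map<String, String> config) {
        if (config.containsKey(TOTAL_PROCESS_MEMORY)) {
            return parseMemorySize(config.get(TOTAL_PROCESS_MEMORY));
        }
        long legacyHeapMb = Long.parseLong(config.get(LEGACY_HEAP_MB));
        return legacyHeapMb << 20; // MB -> bytes
    }

    // Minimal "512m"/"2g"-style parser standing in for MemorySize.parse.
    static long parseMemorySize(String s) {
        s = s.trim().toLowerCase();
        long multiplier = 1;
        if (s.endsWith("m")) { multiplier = 1 << 20; s = s.substring(0, s.length() - 1); }
        else if (s.endsWith("g")) { multiplier = 1 << 30; s = s.substring(0, s.length() - 1); }
        return Long.parseLong(s) * multiplier;
    }

    public static void main(String[] args) {
        Map<String, String> cfg = new HashMap<>();
        cfg.put(LEGACY_HEAP_MB, "1024");
        System.out.println(totalProcessMemoryBytes(cfg)); // legacy fallback: 1073741824
        cfg.put(TOTAL_PROCESS_MEMORY, "2g");
        System.out.println(totalProcessMemoryBytes(cfg)); // new key wins: 2147483648
    }
}
```

The point of the reordering in the real commit is exactly this precedence: the deprecated key deprecates *into* the total-process option, so old configurations keep working without being misread as total Flink memory.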
[flink] 02/21: [hotfix] Remove heading/trailing/duplicated whitespaces from shell command generated by BootstrapTools.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dd728794937d69b710accea513e4662c9e669949
Author: Xintong Song
AuthorDate: Fri Sep 27 18:09:58 2019 +0800

    [hotfix] Remove heading/trailing/duplicated whitespaces from shell command generated by BootstrapTools.

    This is to eliminate the dependencies on white space counts from 'BootstrapToolsTest#testGetTaskManagerShellCommand'.
---
 .../runtime/clusterframework/BootstrapTools.java |  4 +-
 .../clusterframework/BootstrapToolsTest.java     | 20 -
 .../flink/yarn/YarnClusterDescriptorTest.java    | 49 +++---
 3 files changed, 37 insertions(+), 36 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 75de581..0b156ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -472,7 +472,9 @@ public class BootstrapTools {
         for (Map.Entry variable : startCommandValues
                 .entrySet()) {
             template = template
-                .replace("%" + variable.getKey() + "%", variable.getValue());
+                .replace("%" + variable.getKey() + "%", variable.getValue())
+                .replace("  ", " ")
+                .trim();
         }
         return template;
     }

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index 54aadf5..04ad29c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -165,8 +165,8 @@ public class BootstrapToolsTest extends TestLogger {
         assertEquals(
             java + " " + jvmmem +
-                " " + // jvmOpts
-                " " + // logging
+                "" + // jvmOpts
+                "" + // logging
                 " " + mainClass + " " + args + " " + redirects,
             BootstrapTools
                 .getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
@@ -175,8 +175,8 @@ public class BootstrapToolsTest extends TestLogger {
         final String krb5 = "-Djava.security.krb5.conf=krb5.conf";
         assertEquals(
             java + " " + jvmmem +
-                " " + " " + krb5 + // jvmOpts
-                " " + // logging
+                " " + krb5 + // jvmOpts
+                "" + // logging
                 " " + mainClass + " " + args + " " + redirects,
             BootstrapTools
                 .getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
@@ -185,7 +185,7 @@ public class BootstrapToolsTest extends TestLogger {
         // logback only, with/out krb5
         assertEquals(
             java + " " + jvmmem +
-                " " + // jvmOpts
+                "" + // jvmOpts
                 " " + logfile + " " + logback +
                 " " + mainClass + " " + args + " " + redirects,
             BootstrapTools
@@ -194,7 +194,7 @@ public class BootstrapToolsTest extends TestLogger {
         assertEquals(
             java + " " + jvmmem +
-                " " + " " + krb5 + // jvmOpts
+                " " + krb5 + // jvmOpts
                 " " + logfile + " " + logback +
                 " " + mainClass + " " + args + " " + redirects,
             BootstrapTools
@@ -204,7 +204,7 @@ public class BootstrapToolsTest extends TestLogger {
         // log4j, with/out krb5
         assertEquals(
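The `BootstrapTools` change above can be distilled into a small self-contained sketch: after each `%key%` placeholder is substituted, duplicated spaces left behind by empty values are collapsed and the result is trimmed, so tests stop depending on exact whitespace counts. `StartCommandTemplate` and its template string are illustrative, not Flink's actual class:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the whitespace normalization added in this hotfix: substitute
// %key% placeholders, collapse double spaces left by empty values, and trim.
public class StartCommandTemplate {
    static String render(String template, Map<String, String> values) {
        for (Map.Entry<String, String> e : values.entrySet()) {
            template = template
                .replace("%" + e.getKey() + "%", e.getValue())
                .replace("  ", " ")   // collapse a double space left by an empty value
                .trim();              // drop heading/trailing whitespace
        }
        return template;
    }

    public static void main(String[] args) {
        Map<String, String> v = new LinkedHashMap<>();
        v.put("java", "java");
        v.put("jvmopts", "");        // empty value would otherwise leave "  "
        v.put("class", "Main");
        System.out.println(render("%java% %jvmopts% %class%", v)); // prints "java Main"
    }
}
```

Note that a single `.replace("  ", " ")` only halves a run of spaces, but because it is applied once per map entry the loop converges for the short runs a command template produces; that is the same trade-off the real commit makes.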
[flink] 11/21: Adjust memory configuration for local execution
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4224fd9dd21a6fd6d96aa389676660a522c92e21
Author: Andrey Zagrebin
AuthorDate: Thu Oct 31 11:53:43 2019 +0100

    Adjust memory configuration for local execution
---
 .../MesosTaskManagerParametersTest.java            | 32 --
 .../TaskExecutorResourceUtils.java                 | 24 
 .../minicluster/MiniClusterConfiguration.java      |  3 +-
 ...tractTaskManagerProcessFailureRecoveryTest.java |  2 ++
 .../apache/flink/yarn/YarnResourceManagerTest.java |  2 ++
 5 files changed, 48 insertions(+), 15 deletions(-)

diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
index ead7a12..73a36d0 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
@@ -32,6 +32,7 @@ import java.util.List;

 import scala.Option;

+import static org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -78,7 +79,7 @@ public class MesosTaskManagerParametersTest extends TestLogger {
         Configuration config = new Configuration();
         config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_VOLUMES, "/host/path:/container/path:ro");
-        MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+        MesosTaskManagerParameters params = createMesosTaskManagerParameters(config);
         assertEquals(1, params.containerVolumes().size());
         assertEquals("/container/path", params.containerVolumes().get(0).getContainerPath());
         assertEquals("/host/path", params.containerVolumes().get(0).getHostPath());
@@ -90,7 +91,7 @@ public class MesosTaskManagerParametersTest extends TestLogger {
         Configuration config = new Configuration();
         config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS, "testKey=testValue");
-        MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+        MesosTaskManagerParameters params = createMesosTaskManagerParameters(config);
         assertEquals(params.dockerParameters().size(), 1);
         assertEquals(params.dockerParameters().get(0).getKey(), "testKey");
         assertEquals(params.dockerParameters().get(0).getValue(), "testValue");
@@ -102,7 +103,7 @@ public class MesosTaskManagerParametersTest extends TestLogger {
         config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS,
             "testKey1=testValue1,testKey2=testValue2,testParam3=key3=value3,testParam4=\"key4=value4\"");
-        MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+        MesosTaskManagerParameters params = createMesosTaskManagerParameters(config);
         assertEquals(params.dockerParameters().size(), 4);
         assertEquals(params.dockerParameters().get(0).getKey(), "testKey1");
         assertEquals(params.dockerParameters().get(0).getValue(), "testValue1");
@@ -118,7 +119,7 @@ public class MesosTaskManagerParametersTest extends TestLogger {
     public void testContainerDockerParametersMalformed() throws Exception {
         Configuration config = new Configuration();
         config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS, "badParam");
-        MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+        MesosTaskManagerParameters params = createMesosTaskManagerParameters(config);
     }

     @Test
@@ -127,7 +128,7 @@ public class MesosTaskManagerParametersTest extends TestLogger {
         config.setString(MesosTaskManagerParameters.MESOS_TM_URIS,
             "file:///dev/null,http://localhost/test, test_url ");
-        MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+        MesosTaskManagerParameters params = createMesosTaskManagerParameters(config);
         assertEquals(params.uris().size(), 3);
         assertEquals(params.uris().get(0), "file
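The commit above routes every Mesos test through a helper that first runs `adjustMemoryConfigurationForLocalExecution` before building the parameters. The excerpt does not show what that adjustment actually sets, so the sketch below only illustrates the *pattern* (fill in a memory default when nothing is explicitly configured, so local/test execution passes validation); the key and the `512m` default are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the local-execution adjustment pattern: before
// constructing parameters in tests, ensure a memory option has a value.
// The key and default are illustrative, not Flink's actual behavior.
public class LocalExecutionMemoryDefaults {
    static final String TOTAL_FLINK_MEMORY = "taskmanager.memory.total-flink.size";

    static Map<String, String> adjustForLocalExecution(Map<String, String> config) {
        Map<String, String> adjusted = new HashMap<>(config);
        // Only fill in a default when the user configured nothing explicitly.
        adjusted.putIfAbsent(TOTAL_FLINK_MEMORY, "512m");
        return adjusted;
    }

    public static void main(String[] args) {
        Map<String, String> cfg = new HashMap<>();
        System.out.println(adjustForLocalExecution(cfg).get(TOTAL_FLINK_MEMORY)); // 512m
        cfg.put(TOTAL_FLINK_MEMORY, "1g");
        System.out.println(adjustForLocalExecution(cfg).get(TOTAL_FLINK_MEMORY)); // 1g
    }
}
```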
[flink] 03/21: [FLINK-13983][runtime] Introduce 'BashJavaUtils' allowing bash scripts to call java codes for generating TM resource dynamic configurations and JVM parameters.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 30ce8c0059a03b494ad7cae93487f9b8c8117749
Author: Xintong Song
AuthorDate: Thu Sep 26 19:36:20 2019 +0800

    [FLINK-13983][runtime] Introduce 'BashJavaUtils' allowing bash scripts to call java codes for generating TM resource dynamic configurations and JVM parameters.
---
 .../flink/configuration/ConfigurationUtils.java    | 58 
 flink-dist/src/main/flink-bin/bin/config.sh        | 14 
 flink-dist/src/test/bin/runBashJavaUtilsCmd.sh     | 38 +++
 .../org/apache/flink/dist/BashJavaUtilsTest.java   | 54 +++
 .../runtime/taskexecutor/TaskManagerRunner.java    |  4 +-
 .../apache/flink/runtime/util/BashJavaUtils.java   | 77 ++
 .../TaskExecutorResourceUtilsTest.java             | 44 +++--
 7 files changed, 250 insertions(+), 39 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index b94865c..b6d817c 100755
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -18,6 +18,7 @@
 package org.apache.flink.configuration;

+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;

 import javax.annotation.Nonnull;
@@ -31,6 +32,7 @@ import java.util.Set;

 import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS;
 import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL;
+import static org.apache.flink.util.Preconditions.checkArgument;

 /**
  * Utility class for {@link Configuration} related helper functions.
@@ -174,6 +176,62 @@ public class ConfigurationUtils {
         return separatedPaths.length() > 0 ? separatedPaths.split(",|" + File.pathSeparator) : EMPTY;
     }

+    @VisibleForTesting
+    public static Map parseTmResourceDynamicConfigs(String dynamicConfigsStr) {
+        Map configs = new HashMap<>();
+        String[] configStrs = dynamicConfigsStr.split(" ");
+
+        checkArgument(configStrs.length % 2 == 0);
+        for (int i = 0; i < configStrs.length; ++i) {
+            String configStr = configStrs[i];
+            if (i % 2 == 0) {
+                checkArgument(configStr.equals("-D"));
+            } else {
+                String[] configKV = configStr.split("=");
+                checkArgument(configKV.length == 2);
+                configs.put(configKV[0], configKV[1]);
+            }
+        }
+
+        checkArgument(configs.containsKey(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key()));
+        checkArgument(configs.containsKey(TaskManagerOptions.TASK_HEAP_MEMORY.key()));
+        checkArgument(configs.containsKey(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key()));
+        checkArgument(configs.containsKey(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key()));
+        checkArgument(configs.containsKey(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key()));
+        checkArgument(configs.containsKey(TaskManagerOptions.MANAGED_MEMORY_SIZE.key()));
+        checkArgument(configs.containsKey(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key()));
+
+        return configs;
+    }
+
+    @VisibleForTesting
+    public static Map parseTmResourceJvmParams(String jvmParamsStr) {
+        final String xmx = "-Xmx";
+        final String xms = "-Xms";
+        final String maxDirect = "-XX:MaxDirectMemorySize=";
+        final String maxMetadata = "-XX:MaxMetaspaceSize=";
+
+        Map configs = new HashMap<>();
+        for (String paramStr : jvmParamsStr.split(" ")) {
+            if (paramStr.startsWith(xmx)) {
+                configs.put(xmx, paramStr.substring(xmx.length()));
+            } else if (paramStr.startsWith(xms)) {
+                configs.put(xms, paramStr.substring(xms.length()));
+            } else if (paramStr.startsWith(maxDirect)) {
+                configs.put(maxDirect, paramStr.substring(maxDirect.length()));
+            } else if (paramStr.startsWith(maxMetadata)) {
+                configs.put(maxMetadata, paramStr.substring(maxMetadata.length()));
+            }
+        }
+
+        checkArgument(configs.containsKey(xmx))
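The JVM-parameter parsing added above for tests can be reproduced in a compact, runnable form. This sketch mirrors the shown `parseTmResourceJvmParams` logic (split on spaces, match the four known prefixes, keep the remainder as the value); the class name `JvmParamsParser` is illustrative:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the parsing this commit adds in ConfigurationUtils: pick the
// heap, direct-memory, and metaspace settings out of the JVM-parameter
// string produced by BashJavaUtils for the TaskManager start script.
public class JvmParamsParser {
    static Map<String, String> parse(String jvmParamsStr) {
        final String[] prefixes = {
            "-Xmx", "-Xms", "-XX:MaxDirectMemorySize=", "-XX:MaxMetaspaceSize="
        };
        Map<String, String> result = new HashMap<>();
        for (String param : jvmParamsStr.split(" ")) {
            for (String prefix : prefixes) {
                // "-Xmx..." does not start with "-Xms" and vice versa,
                // so at most one prefix matches each parameter.
                if (param.startsWith(prefix)) {
                    result.put(prefix, param.substring(prefix.length()));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> p = parse(
            "-Xmx536870912 -Xms536870912 -XX:MaxDirectMemorySize=268435456 -XX:MaxMetaspaceSize=96m");
        System.out.println(p.get("-Xmx"));                    // 536870912
        System.out.println(p.get("-XX:MaxMetaspaceSize=")); // 96m
    }
}
```

The counterpart `parseTmResourceDynamicConfigs` follows the same shape for `-D key=value` pairs: alternate tokens must be the literal `-D`, and each following token splits once on `=` into a config key and value.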
[flink] 16/21: fix ClassLoaderITCase
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 71b1c3827b65560f004e067ec66ecf2bd1ae5e1f
Author: Andrey Zagrebin
AuthorDate: Mon Nov 4 18:27:57 2019 +0100

    fix ClassLoaderITCase
---
 flink-dist/src/main/flink-bin/bin/config.sh        | 26 --
 flink-dist/src/main/flink-bin/bin/taskmanager.sh   |  3 ++-
 .../flink/test/classloading/ClassLoaderITCase.java |  2 +-
 3 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 52ce960..8843877 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -798,6 +798,20 @@ runBashJavaUtilsCmd() {
     echo ${output}
 }

+verifyTmResourceConfig() {
+    if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
+        echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`"
+    else
+        flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
+        FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
+    fi
+
+    if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then
+        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
+        exit 1
+    fi
+}
+
 getTmResourceDynamicConfigsAndJvmParams() {
     if [[ "`echo ${FLINK_TM_ENABLE_FLIP49} | tr '[:upper:]' '[:lower:]'`" == "true" ]]; then
         echo "$(getTmResourceDynamicConfigsAndJvmParamsFlip49)"
@@ -817,18 +831,6 @@ getTmResourceDynamicConfigsAndJvmParamsFlip49() {
 }

 getTmResourceDynamicConfigsAndJvmParamsLegacy() {
-    if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
-        echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`"
-    else
-        flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
-        FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
-    fi
-
-    if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then
-        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
-        exit 1
-    fi
-
     if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then
         TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)

diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index e78a1f1..a3d62e2 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -48,6 +48,7 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
     export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

     # Startup parameters
+    verifyTmResourceConfig
     dynamic_configs_and_jvm_params=$(getTmResourceDynamicConfigsAndJvmParams)
     IFS=$'\n' lines=(${dynamic_configs_and_jvm_params})
@@ -55,7 +56,7 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
     export JVM_ARGS="${JVM_ARGS} ${jvm_params}"

     dynamic_configs=${lines[1]}
-    ARGS=(${ARGS[@]} ${dynamic_configs})
+    ARGS+=(${dynamic_configs})
     ARGS+=("--configDir" "${FLINK_CONF_DIR}")
 fi

diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index a09142e..f90a5f3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -116,7 +116,7 @@ public class ClassLoaderITCase extends TestLogger {
             FOLDER.newFolder().getAbsoluteFile().toURI().toString());

         // required as we otherwise run out of memory
-        config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "80m");
+        config.setString(TaskManagerOptions.TOTAL_FLINK_MEMORY, "512m");

         miniClusterResource = new MiniClusterResource(
             new MiniClusterResourceConfiguration.Builder()
[flink] 08/21: [FLINK-13986][core][config] Change default value of flip49 feature flag to true
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e1daac36f0ac6e5e2039823d0a48ef576839f437
Author: Xintong Song
AuthorDate: Wed Oct 16 21:00:09 2019 +0800

    [FLINK-13986][core][config] Change default value of flip49 feature flag to true
---
 .../main/java/org/apache/flink/configuration/TaskManagerOptions.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index b190ee8..7d1492c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -484,7 +484,7 @@ public class TaskManagerOptions {
     @Documentation.ExcludeFromDocumentation("FLIP-49 is still in development.")
     public static final ConfigOption ENABLE_FLIP_49_CONFIG =
         key("taskmanager.enable-flip-49")
-            .defaultValue(false)
+            .defaultValue(true)
             .withDescription("Toggle to switch between FLIP-49 and current task manager memory configurations.");

     /** Not intended to be instantiated. */
[flink] 07/21: [FLINK-13983][runtime] Use flip49 config options to decide memory size of MemoryManager.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b4b482af4b93ee9fc98de08b7b427514df41d61b
Author: Xintong Song
AuthorDate: Mon Oct 14 16:09:23 2019 +0800

    [FLINK-13983][runtime] Use flip49 config options to decide memory size of MemoryManager.
---
 .../flink/runtime/taskexecutor/TaskManagerServices.java   | 15 +++
 .../taskexecutor/TaskManagerServicesConfiguration.java    | 10 ++
 2 files changed, 25 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index e570799..91f611a 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -51,7 +51,9 @@ import org.slf4j.LoggerFactory;

 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -330,6 +332,19 @@ public class TaskManagerServices {
      */
     private static MemoryManager createMemoryManager(
             TaskManagerServicesConfiguration taskManagerServicesConfiguration) {
+        if (taskManagerServicesConfiguration.getOnHeapManagedMemorySize() != null &&
+            taskManagerServicesConfiguration.getOffHeapManagedMemorySize() != null) {
+            // flip49 enabled
+
+            final Map memorySizeByType = new HashMap<>();
+            memorySizeByType.put(MemoryType.HEAP, taskManagerServicesConfiguration.getOnHeapManagedMemorySize().getBytes());
+            memorySizeByType.put(MemoryType.OFF_HEAP, taskManagerServicesConfiguration.getOffHeapManagedMemorySize().getBytes());
+
+            return new MemoryManager(memorySizeByType,
+                taskManagerServicesConfiguration.getNumberOfSlots(),
+                taskManagerServicesConfiguration.getPageSize());
+        }
+
         // computing the amount of memory to use depends on how much memory is available
         // it strictly needs to happen AFTER the network stack has been initialized

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index c2133c9..bd84107 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -222,6 +222,16 @@ public class TaskManagerServicesConfiguration {
         return taskExecutorResourceSpec == null ? null : taskExecutorResourceSpec.getShuffleMemSize();
     }

+    @Nullable // should only be null when flip49 is disabled
+    public MemorySize getOnHeapManagedMemorySize() {
+        return taskExecutorResourceSpec == null ? null : taskExecutorResourceSpec.getOnHeapManagedMemorySize();
+    }
+
+    @Nullable // should only be null when flip49 is disabled
+    public MemorySize getOffHeapManagedMemorySize() {
+        return taskExecutorResourceSpec == null ? null : taskExecutorResourceSpec.getOffHeapManagedMemorySize();
+    }
+
     long getTimerServiceShutdownTimeout() {
         return timerServiceShutdownTimeout;
     }
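The branch added to `createMemoryManager` above keys the `MemoryManager` on a per-`MemoryType` size map when the FLIP-49 resource spec is present (both managed-memory getters non-null), instead of computing a fraction of free memory. A minimal sketch of that selection, with a local `MemoryType` enum and illustrative sizes standing in for Flink's types:

```java
import java.util.EnumMap;
import java.util.Map;

// Sketch of the flip49 branch in createMemoryManager: when both managed
// memory sizes are configured, build a per-type size map for the
// MemoryManager; when either is null, fall through to the legacy path.
public class ManagedMemoryByType {
    enum MemoryType { HEAP, OFF_HEAP }

    static Map<MemoryType, Long> memorySizeByType(Long onHeapBytes, Long offHeapBytes) {
        if (onHeapBytes == null || offHeapBytes == null) {
            // flip49 disabled: the caller would use the legacy computation instead
            return null;
        }
        Map<MemoryType, Long> sizes = new EnumMap<>(MemoryType.class);
        sizes.put(MemoryType.HEAP, onHeapBytes);
        sizes.put(MemoryType.OFF_HEAP, offHeapBytes);
        return sizes;
    }

    public static void main(String[] args) {
        Map<MemoryType, Long> sizes = memorySizeByType(64L << 20, 128L << 20);
        System.out.println(sizes.get(MemoryType.HEAP));     // 67108864
        System.out.println(sizes.get(MemoryType.OFF_HEAP)); // 134217728
    }
}
```

Using the null-ness of the resource-spec getters as the feature toggle matches the `@Nullable // should only be null when flip49 is disabled` contract the configuration class documents.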
[flink] 21/21: run e2e
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 38b436cbd24d54b65bf7479b05f13920255271f9
Author: Andrey Zagrebin
AuthorDate: Wed Nov 6 18:15:49 2019 +0100

    run e2e
---
 .travis.yml | 157
 1 file changed, 52 insertions(+), 105 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 109533f..d6fc195 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -71,7 +71,6 @@ stages:
   - name: compile
   - name: test
   - name: E2E
-    if: type = cron
   - name: cleanup

 jdk: "openjdk8"
@@ -130,242 +129,190 @@ jobs:
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11"
       name: cleanup
     # hadoop 2.4.1 profile
-    - if: type = cron
-      stage: compile
+    - stage: compile
       script: ./tools/travis_controller.sh compile
       env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
       name: compile - hadoop 2.4.1
-    - if: type = cron
-      stage: test
+    - stage: test
       script: ./tools/travis_controller.sh core
       env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
       name: core - hadoop 2.4.1
-    - if: type = cron
-      script: ./tools/travis_controller.sh libraries
+    - script: ./tools/travis_controller.sh libraries
       env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
       name: libraries - hadoop 2.4.1
-    - if: type = cron
-      script: ./tools/travis_controller.sh blink_planner
+    - script: ./tools/travis_controller.sh blink_planner
       env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
       name: blink_planner - hadoop 2.4.1
-    - if: type = cron
-      script: ./tools/travis_controller.sh connectors
+    - script: ./tools/travis_controller.sh connectors
       env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
       name: connectors - hadoop 2.4.1
-    - if: type = cron
-      script: ./tools/travis_controller.sh kafka/gelly
+    - script: ./tools/travis_controller.sh kafka/gelly
       env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
       name: kafka/gelly - hadoop 2.4.1
-    - if: type = cron
-      script: ./tools/travis_controller.sh tests
+    - script: ./tools/travis_controller.sh tests
       env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
       name: tests - hadoop 2.4.1
-    - if: type = cron
-      script: ./tools/travis_controller.sh misc
+    - script: ./tools/travis_controller.sh misc
       env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
       name: misc - hadoop 2.4.1
-    - if: type = cron
-      stage: cleanup
+    - stage: cleanup
       script: ./tools/travis_controller.sh cleanup
       env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
       name: cleanup - hadoop 2.4.1
     # scala 2.12 profile
-    - if: type = cron
-      stage: compile
+    - stage: compile
       script: ./tools/travis_controller.sh compile
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
       name: compile - scala 2.12
-    - if: type = cron
-      stage: test
+    - stage: test
       script: ./tools/travis_controller.sh core
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
       name: core - scala 2.12
-    - if: type = cron
-      script: ./tools/travis_controller.sh libraries
+    - script: ./tools/travis_controller.sh libraries
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
       name: libraries - scala 2.12
-    - if: type = cron
-      script: ./tools/travis_controller.sh blink_planner
+    - script: ./tools/travis_controller.sh blink_planner
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
       name: blink_planner - scala 2.12
-    - if: type = cron
-      script: ./tools/travis_controller.sh connectors
+    - script: ./tools/travis_controller.sh connectors
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
       name: connectors - scala 2.12
-    - if: type = cron
-      script: ./tools/travis_controller.sh kafka/gelly
+    - script: ./tools/travis_controller.sh kafka/gelly
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
       name: kafka/gelly - scala 2.12
-    - if: type = cron
-      script: ./tools/travis_controller.sh tests
+    - script: ./tools/travis_controller.sh tests
       env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
       name: tests - scala 2.12
-    - if: type = cron
-      script: ./tools/travis_controller.sh misc
+    - script: ./tools/travis_control
[flink] 17/21: fix LaunchableMesosWorkerTest
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dc0ba44b40fd86c1fb019f37383b849956125874
Author: Andrey Zagrebin
AuthorDate: Mon Nov 4 18:31:06 2019 +0100

    fix LaunchableMesosWorkerTest
---
 .../flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
index 3d53160..42095b8 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.mesos.scheduler.LaunchableTask;
 import org.apache.flink.mesos.util.MesosResourceAllocation;
 import org.apache.flink.mesos.util.MesosUtils;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.util.TestLogger;

 import org.apache.mesos.Protos;
@@ -86,6 +87,7 @@ public class LaunchableMesosWorkerTest extends TestLogger {
 		configuration.setString(MesosOptions.MASTER_URL, "foobar");
 		final MemorySize memorySize = new MemorySize(1337L);
 		configuration.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, memorySize.toString());
+		TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution(configuration);
 		final LaunchableTask launchableTask = new LaunchableMesosWorker(
 			ignored -> Option.empty(),
[flink] branch master updated (f6e39aa -> 306b83b)
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from f6e39aa  [FLINK-14830][docs-zh] Correct the links in stream_checkpointing.zh.md page (#10233)
     add 7f3471c  [hotfix] Add shortcuts for getting jvm heap / direct memory size.
     add 306b83b  [FLINK-14637][core][runtime] Introduce config option for framework off-heap memory.

No new revisions were added by this update.

Summary of changes:
 .../task_manager_memory_configuration.html | 5
 .../flink/configuration/TaskManagerOptions.java | 10 +++
 .../clusterframework/TaskExecutorResourceSpec.java | 33 +-
 .../TaskExecutorResourceUtils.java | 32 +++--
 .../TaskExecutorResourceUtilsTest.java | 18 ++--
 5 files changed, 85 insertions(+), 13 deletions(-)
[flink] branch master updated (d8a5d41 -> ec12ec0)
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from d8a5d41  [FLINK-14750][documentation] Regenerated restart strategy documentation
     add ec12ec0  [hotfix] Add String type in config docs for taskmanager.memory.framework.off-heap.size

No new revisions were added by this update.

Summary of changes:
 docs/_includes/generated/task_manager_memory_configuration.html | 1 +
 1 file changed, 1 insertion(+)
[flink] branch master updated (7128fdc -> 36d1b62)
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 7128fdc  [FLINK-14693][python] Fix Python tox checks failure on travis
     add b872f19  [hotfix] Introduce constants MemorySize#ZERO and MemorySize#MAX_VALUE.
     add 95fe886  [hotfix] Move SimpleSlotContext to test scope.
     add 740adc1  [hotfix] Fix misusage of UNKNOWN and ANY ResourceProfiles
     add 4a5e8c0  [hotfix] Deduplicate setting operator resource / parallelism argument checks.
     add 9d22410  [hotfix] UNKNOWN ResourceSpec should not be numerically compared.
     add 1cbee5e  [hotfix] Make MemorySize comparable.
     add 2c81c3f  [FLINK-14405][runtime] Update ResourceSpec to align with FLIP-49 resource types
     add b6c2943  [FLINK-14405][runtime] Update ResourceProfile to align with FLIP-49 resource types
     add 001733b  [FLINK-14495][core] Limit ResourceSpec to always specify cpu cores and task heap memory size, unless it UNKNOWN.
     add 099e965  [hotfix] Remove unused constructors and unnecessary javadocs for ResourceSpec.
     add bfd6aa5  [hotfix] Remove unused constructors and unnecessary javadoces, and annotate constructors used only in testing codes as VisibleForTestting for ResourceProfile.
     add 36d1b62  [hotfix] Preserve the singleton property for ResourceProfile#ANY.

No new revisions were added by this update.

Summary of changes:
 .../flink/api/common/operators/ResourceSpec.java | 265 ---
 .../operators/util/OperatorValidationUtils.java | 79 +
 .../org/apache/flink/api/dag/Transformation.java | 11 +-
 .../org/apache/flink/configuration/MemorySize.java | 11 +-
 .../api/common/operators/ResourceSpecTest.java | 74 ++---
 .../apache/flink/configuration/MemorySizeTest.java | 2 +-
 .../apache/flink/api/java/operators/DataSink.java | 13 +-
 .../flink/api/java/operators/DeltaIteration.java | 13 +-
 .../apache/flink/api/java/operators/Operator.java | 14 +-
 .../flink/api/java/operator/OperatorTest.java | 4 +-
 .../plantranslate/JobGraphGeneratorTest.java | 26 +-
 .../TaskExecutorResourceUtils.java | 4 +-
 .../clusterframework/types/ResourceProfile.java | 368 -
 .../runtime/executiongraph/ExecutionJobVertex.java | 3 +-
 .../runtime/taskexecutor/TaskManagerServices.java | 11 +-
 .../TaskExecutorResourceUtilsTest.java | 8 +-
 ...ocationPreferenceSlotSelectionStrategyTest.java | 2 +-
 .../types/ResourceProfileTest.java | 152 -
 .../flink/runtime/dispatcher/DispatcherTest.java | 2 +-
 .../executiongraph/utils/SimpleSlotProvider.java | 4 +-
 .../flink/runtime/instance/SimpleSlotContext.java | 2 +-
 .../flink/runtime/jobmaster/JobMasterTest.java | 6 +-
 .../jobmaster/slotpool/AllocatedSlotsTest.java | 2 +-
 .../jobmaster/slotpool/SingleLogicalSlotTest.java | 2 +-
 .../jobmaster/slotpool/SlotPoolCoLocationTest.java | 4 +-
 .../jobmaster/slotpool/SlotPoolImplTest.java | 16 +-
 .../slotpool/SlotPoolRequestCompletionTest.java | 2 +-
 .../slotpool/SlotPoolSlotSharingTest.java | 8 +-
 .../ResourceManagerTaskExecutorTest.java | 4 +-
 .../slotmanager/SlotManagerImplTest.java | 20 +-
 .../slotmanager/TestingResourceActionsBuilder.java | 4 +-
 .../TaskExecutorPartitionLifecycleTest.java | 2 +-
 .../runtime/taskexecutor/TaskExecutorTest.java | 38 +--
 .../TaskSubmissionTestEnvironment.java | 2 +-
 .../taskexecutor/slot/TaskSlotTableTest.java | 2 +-
 .../streaming/api/datastream/DataStreamSink.java | 10 +-
 .../streaming/api/datastream/DataStreamSource.java | 10 +-
 .../api/datastream/SingleOutputStreamOperator.java | 23 +-
 .../apache/flink/streaming/api/DataStreamTest.java | 28 +-
 .../api/graph/StreamingJobGraphGeneratorTest.java | 20 +-
 .../plan/nodes/resource/NodeResourceUtil.java | 5 +-
 41 files changed, 692 insertions(+), 584 deletions(-)
 create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorValidationUtils.java
 rename flink-runtime/src/{main => test}/java/org/apache/flink/runtime/instance/SimpleSlotContext.java (98%)
[flink] branch FLINK-13986-flip49-cleanup-e2e created (now 38b436c)
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git.

      at 38b436c  run e2e

This branch includes the following new commits:

     new 2b9e5d9  [hotfix] Refactor 'TaskManagerHeapSizeCalculationJavaBashTest', abstract 'JavaBashTestBase' for executing testing bash scripts from java classes.
     new dd72879  [hotfix] Remove heading/trailing/duplicated whitespaces from shell command generated by BootstrapTools.
     new 30ce8c0  [FLINK-13983][runtime] Introduce 'BashJavaUtils' allowing bash scripts to call java codes for generating TM resource dynamic configurations and JVM parameters.
     new 4088bea  [FLINK-13983][dist] TM startup scripts calls java codes to set flip49 TM resource configs and JVM parameters, if feature option is enabled.
     new 5b3ebcf  [FLINK-13983][runtime][yarn/mesos] Launches TaskExecutors on Yarn/Mesos with JVM parameters and dynamic configs generated from TaskExecutorResourceSpec.
     new a28bdc8  [FLINK-13983][runtime] Use flip49 config options to decide memory size of ShuffleEnvironment.
     new b4b482a  [FLINK-13983][runtime] Use flip49 config options to decide memory size of MemoryManager.
     new e1daac3  [FLINK-13986][core][config] Change default value of flip49 feature flag to true
     new 1d80b12  [FLINK-13986][test] Fix test cases missing explicit task executor resource configurations.
     new 436f467  [FLINK-13986][test] Fix failure cases that fail due to change of expected exception type.
     new 4224fd9  Adjust memory configuration for local execution
     new fd73130  Treat legacy TM heap size as total process memory, not flink memory
     new 5b5be51  Add backwards compatibility
     new 1b7df76  Change task off heap default value from 0b to 1m to accomodate for framework small allocations (temporary: later add framework off heap)
     new c499e10  Increase memory in YARNHighAvailabilityITCase
     new 71b1c38  fix ClassLoaderITCase
     new dc0ba44  fix LaunchableMesosWorkerTest
     new d123baf  fix YarnConfigurationITCase
     new 1f24495  Fix yarn cut off
     new b8c6a4c  [FLINK-14631] Account for netty direct allocations in direct memory limit (Netty Shuffle)
     new 38b436c  run e2e

The 21 revisions listed above as "new" are entirely new to this repository
and will be described in separate emails. The revisions listed as "add"
were already present in the repository and have only been added to this
reference.
[flink] 01/21: [hotfix] Refactor 'TaskManagerHeapSizeCalculationJavaBashTest', abstract 'JavaBashTestBase' for executing testing bash scripts from java classes.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2b9e5d9f545a136cede63bc924c03ea013029fe3
Author: Xintong Song
AuthorDate: Thu Sep 26 19:38:55 2019 +0800

    [hotfix] Refactor 'TaskManagerHeapSizeCalculationJavaBashTest', abstract 'JavaBashTestBase' for executing testing bash scripts from java classes.
---
 .../org/apache/flink/dist/JavaBashTestBase.java | 60 ++
 ...TaskManagerHeapSizeCalculationJavaBashTest.java | 34 +---
 2 files changed, 61 insertions(+), 33 deletions(-)

diff --git a/flink-dist/src/test/java/org/apache/flink/dist/JavaBashTestBase.java b/flink-dist/src/test/java/org/apache/flink/dist/JavaBashTestBase.java
new file mode 100644
index 000..63faaa2
--- /dev/null
+++ b/flink-dist/src/test/java/org/apache/flink/dist/JavaBashTestBase.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assume;
+import org.junit.BeforeClass;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * Abstract test class for executing bash scripts.
+ */
+public abstract class JavaBashTestBase extends TestLogger {
+	@BeforeClass
+	public static void checkOperatingSystem() {
+		Assume.assumeTrue("This test checks shell scripts which are not available on Windows.",
+			!OperatingSystem.isWindows());
+	}
+
+	/**
+	 * Executes the given shell script wrapper and returns its output.
+	 *
+	 * @param command command to run
+	 *
+	 * @return raw script output
+	 */
+	protected String executeScript(final String[] command) throws IOException {
+		ProcessBuilder pb = new ProcessBuilder(command);
+		pb.redirectErrorStream(true);
+		Process process = pb.start();
+		BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+		StringBuilder sb = new StringBuilder();
+		String s;
+		while ((s = reader.readLine()) != null) {
+			sb.append(s);
+		}
+		return sb.toString();
+	}
+}
diff --git a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
index 0b5f5be..4f172d9 100755
--- a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
+++ b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
@@ -24,16 +24,10 @@ import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assume;
-import org.junit.Before;
 import org.junit.Test;

-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.util.Random;

 import static org.hamcrest.CoreMatchers.allOf;
@@ -51,7 +45,7 @@ import static org.junit.Assert.assertThat;
  * double precision but our Java code restrains to float because we actually do
  * not need high precision.
  */
-public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
+public class TaskManagerHeapSizeCalculationJavaBashTest extends JavaBashTestBase {

 	/** Key that is used by config.sh. */
 	private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.size";
@@ -64,12 +58,6 @@
 	 */
 	private static final int
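The `executeScript` helper introduced in the diff above is a thin wrapper around `ProcessBuilder`: merge stderr into stdout, start the process, and concatenate the output lines. A standalone sketch of the same idea (hypothetical class and method names, not Flink code; note that, like the original, it drops line separators while reading):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/** Minimal sketch of running a shell command and capturing its merged output. */
public class ScriptRunnerSketch {

	static String runAndCapture(String[] command) throws IOException {
		ProcessBuilder pb = new ProcessBuilder(command);
		pb.redirectErrorStream(true); // merge stderr into stdout, as the test base does
		Process process = pb.start();
		StringBuilder sb = new StringBuilder();
		try (BufferedReader reader =
				new BufferedReader(new InputStreamReader(process.getInputStream()))) {
			String line;
			while ((line = reader.readLine()) != null) {
				sb.append(line); // line separators are dropped, mirroring executeScript
			}
		}
		return sb.toString();
	}

	public static void main(String[] args) throws IOException {
		System.out.println(runAndCapture(new String[]{"echo", "hello"})); // prints: hello (on POSIX)
	}
}
```

Redirecting stderr into stdout is what lets the test assert on everything the script prints, including error messages, through a single stream.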
[flink] branch master updated (88c206d -> 2142dc7)
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 88c206d  [hotfix][state-backends] Simple cleanups in RocksDBStateBackend
     add 4060acc  [FLINK-15047][runtime] Fix format of setting managed memory size in ActiveResourceManagerFactory.
     add 1f5e56a  [hotfix][runtime] Code clean-up in ResourceProfile.
     add 2142dc7  [FLINK-15023][runtime] Remove on-heap managed memory

No new revisions were added by this update.

Summary of changes:
 .../task_manager_memory_configuration.html | 16 +---
 docs/ops/config.md | 2 +-
 docs/ops/config.zh.md | 2 +-
 .../flink/api/common/operators/ResourceSpec.java | 73 +--
 .../flink/configuration/ConfigurationUtils.java | 1 -
 .../flink/configuration/TaskManagerOptions.java | 48 ++
 .../clusterframework/TaskExecutorResourceSpec.java | 61 +
 .../TaskExecutorResourceUtils.java | 100 ++---
 .../clusterframework/types/ResourceProfile.java | 88 ++
 .../ActiveResourceManagerFactory.java | 2 +-
 .../runtime/taskexecutor/TaskManagerServices.java | 35 ++--
 .../TaskManagerServicesConfiguration.java | 8 +-
 .../flink/runtime/taskexecutor/slot/TaskSlot.java | 7 +-
 .../clusterframework/BootstrapToolsTest.java | 3 +-
 .../TaskExecutorResourceUtilsTest.java | 85 +-
 .../types/ResourceProfileTest.java | 71 ++-
 .../jobmaster/slotpool/SlotSharingManagerTest.java | 9 +-
 .../runtime/taskexecutor/slot/TaskSlotUtils.java | 3 +-
 .../flink/streaming/api/graph/StreamConfig.java | 27 ++
 .../api/graph/StreamingJobGraphGenerator.java | 23 ++--
 .../api/graph/StreamingJobGraphGeneratorTest.java | 44 -
 .../plan/nodes/resource/NodeResourceUtil.java | 3 +-
 .../apache/flink/yarn/YarnResourceManagerTest.java | 3 +-
 23 files changed, 163 insertions(+), 551 deletions(-)
[flink] branch master updated (5576851 -> 60b3f2f)
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 5576851  [FLINK-13986][core][config] Remove FLIP-49 feature option.
     add 60b3f2f  [FLINK-14936][runtime] Introduce MemoryManager#computeMemorySize to calculate managed memory size from a fraction

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/runtime/memory/MemoryManager.java | 14
 .../flink/runtime/memory/MemoryManagerTest.java | 25 ++
 2 files changed, 39 insertions(+)
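The commit above adds `MemoryManager#computeMemorySize` to derive a managed-memory budget as a fraction of a total size. Stripped of Flink's `MemorySize` types, the arithmetic amounts to the following (illustrative class and parameter names; the real method's signature and validation may differ):

```java
/** Illustrative sketch: deriving a memory budget from a fraction of a total size. */
public class FractionSketch {

	static long computeMemorySize(long totalMemoryBytes, double fraction) {
		if (fraction <= 0 || fraction > 1) {
			throw new IllegalArgumentException("fraction must be in (0, 1]");
		}
		// round down so the derived budget never exceeds the total
		return (long) Math.floor(totalMemoryBytes * fraction);
	}

	public static void main(String[] args) {
		long totalFlinkMemory = 1024L * 1024L * 1024L; // 1 GiB
		// 0.4 is the managed-memory fraction default mentioned elsewhere in this digest
		System.out.println(computeMemorySize(totalFlinkMemory, 0.4)); // prints: 429496729
	}
}
```

Rounding down matters: rounding up could make the sum of derived components exceed the configured total.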
[flink] branch master updated (89bd90d -> 5576851)
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 89bd90d  [FLINK-15042][python] Fix python compatibility by excluding executeAsync
     add 5dca721  [hotfix][runtime] Map deprecated config option 'taskmanager.heap.size' and 'taskmanager.heap.mb' to total process memory.
     add 1bc28ef  [hotfix][dist] Refactor 'TaskManagerHeapSizeCalculationJavaBashTest', abstract 'JavaBashTestBase' for executing testing bash scripts from java classes.
     add 3704a0e  [hotfix][runtime] Remove heading/trailing/duplicated whitespaces from shell command generated by BootstrapTools.
     add 30132a8  [hotfix][mesos] Fix missing junit annotation for MesosTaskManagerParametersTest#testForcePullImageTrue.
     add 7027094  [hotfix][runtime] Fix backwards compatibility of NettyShuffleEnvironmentOptions#NETWORK_NUM_BUFFERS.
     add 96b735d  [hotfix][runtime] If not specified, set Total Flink Memory according to JVM free heap memory for MiniCluster.
     add 82f8f42b [hotfix][runtime] Code style clean-up for TaskExecutorResourceUtilsTest
     add 99bc4fd  [hotfix][runtime] Throw IllegalConfigurationException instead of NumberFormatException for negative memory sizes in TaskExecutorResourceUtils.
     add 6681e11  [hotfix][runtime] Add backwards compatibility for setting task executor memory through environment variable.
     add 4b8ed64  [hotfix][core][config] Change default value of TaskManagerOptions#MEMORY_OFF_HEAP from false to true.
     add 65e9507  [hotfix][core][config] Change default value of managed memory fraction from 0.5 to 0.4.
     add 8090cfd  [hotfix][core][config] Change default framework off-heap memory size from 64m to 128m.
     add 0e6c8fa  [hotfix][core][config] Change default jvm metaspace size from 192m to 128m.
     add 937eed6  [FLINK-13983][runtime] Introduce 'BashJavaUtils' allowing bash scripts to call java codes for generating TM resource dynamic configurations and JVM parameters.
     add 9d1256c  [FLINK-13983][runtime][yarn/mesos][dist] Launch task executor with new memory calculation logics
     add c324f1a  [FLINK-13986][dist] Remove codes for legacy memory calculations in task executor startup scripts.
     add d24c324  [FLINK-13986][core][runtime] Remove java codes for legacy memory calculations.
     add ba1d711  [FLINK-13986][core][config] Remove legacy config option TaskManagerOptions#TASK_MANAGER_HEAP_MEMORY.
     add 96da510  [FLINK-13986][core][config] Remove legacy config option TaskManagerOptions#LEGACY_MANAGED_MEMORY_FRACTION.
     add ffe323e  [FLINK-13986][core][config] Remove legacy config option TaskManagerOptions#LEGACY_MANAGED_MEMORY_SIZE.
     add 65460ec  [FLINK-13986][core][config] Remove unused ConfigConstants#TASK_MANAGER_MEMORY_OFF_HEAP_KEY.
     add 96fe45e  [FLINK-13986][core][config] Remove usage of legacy NETWORK_BUFFERS_MEMORY_(MIN|MAX|FRACTION).
     add 5bdc299  [FLINK-13986][doc] Update docs for managed memory configuration.
     add 5576851  [FLINK-13986][core][config] Remove FLIP-49 feature option.

No new revisions were added by this update.

Summary of changes:
 docs/_includes/generated/common_section.html | 4 +-
 .../task_manager_memory_configuration.html | 6 +-
 docs/ops/config.md | 10 +-
 docs/ops/config.zh.md | 10 +-
 docs/ops/deployment/cluster_setup.md | 6 +-
 docs/ops/deployment/cluster_setup.zh.md | 6 +-
 docs/ops/deployment/kubernetes.md | 2 +-
 docs/ops/deployment/kubernetes.zh.md | 2 +-
 docs/ops/deployment/mesos.md | 4 +-
 docs/ops/deployment/mesos.zh.md | 2 +-
 docs/ops/deployment/yarn_setup.md | 2 +-
 docs/ops/deployment/yarn_setup.zh.md | 2 +-
 .../client/deployment/ClusterSpecification.java | 6 +-
 .../kafka/KafkaShortRetentionTestBase.java | 2 +-
 .../streaming/connectors/kafka/KafkaTestBase.java | 2 +-
 .../kinesis/manualtests/ManualExactlyOnceTest.java | 2 +-
 .../ManualExactlyOnceWithStreamReshardingTest.java | 2 +-
 .../flink/configuration/ConfigConstants.java | 34 ---
 .../flink/configuration/ConfigurationUtils.java | 78 +++--
 .../flink/configuration/TaskManagerOptions.java | 68 +
 flink-dist/src/main/flink-bin/bin/config.sh | 184 ++--
 flink-dist/src/main/flink-bin/bin/taskmanager.sh | 30 +-
 flink-dist/src/main/resources/flink-conf.yaml | 8 +-
 flink-dist/src/test/bin/calcTMHeapSizeMB.sh | 50
 .../{calcTMNetBufMem.sh => runBashJavaUtilsCmd.sh} | 21 +-
 .../org/apache/flink/dist/BashJavaUtilsTest.java | 54
 .../org/apache/flink/dist/JavaBashTestBase.java | 60
 ...TaskManagerHeapSizeCalculationJavaBashTest.java | 333 -
 flink-end-to-end-te
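Several commits above rework the FLIP-49 memory model: the deprecated `taskmanager.heap.size` is now read as total *process* memory, and defaults change (managed fraction 0.5 → 0.4, framework off-heap 64m → 128m, JVM metaspace 192m → 128m). A deliberately simplified sketch of the resulting split follows; the real derivation in `TaskExecutorResourceUtils` handles fractions with min/max bounds and many more components, and the concrete numbers here are illustrative only:

```java
/** Simplified, illustrative sketch of the FLIP-49 total-process-memory split. */
public class Flip49Sketch {

	static long mb(long v) { return v * 1024 * 1024; }

	// Total Flink memory = total process memory minus JVM metaspace and JVM overhead.
	static long totalFlinkMemory(long totalProcessMemory, long jvmMetaspace, long jvmOverhead) {
		return totalProcessMemory - jvmMetaspace - jvmOverhead;
	}

	public static void main(String[] args) {
		long totalProcess = mb(1728);     // e.g. the value read from taskmanager.heap.size
		long metaspace = mb(128);         // default changed from 192m to 128m per the commit list
		long overhead = mb(192);          // illustrative; actually derived from a fraction with bounds
		long totalFlink = totalFlinkMemory(totalProcess, metaspace, overhead);
		long managed = (long) (totalFlink * 0.4); // managed-memory fraction default is now 0.4
		System.out.println(totalFlink / mb(1)); // prints: 1408
		System.out.println(managed / mb(1));    // prints: 563
	}
}
```

The key behavioral change is the mapping: a user who previously set `taskmanager.heap.size` gets a total-process budget that is then subdivided, rather than a raw heap size.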
[flink] branch master updated (20e945d -> db3dec5)
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 20e945d  [FLINK-14898] Enable background cleanup of state with TTL by default
     add dbdaf5b  [hotfix][core] Introduce divide operation to Resource
     add a78a5d4  [hotfix][core] Introduce divide operation to MemorySize
     add 5183be8  [hotfix][core] Introduce TaskExecutorResourceSpecBuilder for building TaskExecutorResourceSpec.
     add d2558f4  [FLINK-14188][runtime][yarn/mesos] Set container cpu cores into TaskExecutorResourceSpec when launching TaskExecutors on Yarn/Mesos.
     add db33a49  [FLINK-14188][runtime] Derive and register TaskExecutor to ResourceManager with default slot resource profile.
     add 66be972  [FLINK-14188][runtime] Use default slot resource profile derived from TaskExecutorResourceSpec on both RM and TM sides.
     add 25f79fb  [hotfix][runtime] Wrap arguments of ResourceManagerGateway#registerTaskExecutor into TaskExecutorRegistration.
     add da071ce  [FLINK-14189][runtime] TaskExecutor register to ResourceManager with total resource profile.
     add 516aee3  [FLINK-14189][runtime] Extend TaskExecutor to support dynamic slot allocation
     add db3dec5  [FLINK-14189][runtime] Do not store dynamic slots by index in TaskSlotTable

No new revisions were added by this update.

Summary of changes:
 .../flink/api/common/resources/Resource.java | 9 +
 .../org/apache/flink/configuration/MemorySize.java | 5 +
 .../flink/configuration/TaskManagerOptions.java | 27 ++-
 .../flink/api/common/resources/ResourceTest.java | 28 +++
 .../apache/flink/configuration/MemorySizeTest.java | 13 ++
 .../kubernetes/KubernetesResourceManager.java | 7 +-
 .../kubernetes/KubernetesResourceManagerTest.java | 13 +-
 .../flink/kubernetes/KubernetesUtilsTest.java | 2 +
 .../clusterframework/MesosResourceManager.java | 5 +-
 .../MesosTaskManagerParameters.java | 42 +++--
 .../clusterframework/MesosResourceManagerTest.java | 18 +-
 .../MesosTaskManagerParametersTest.java | 9 +
 .../clusterframework/TaskExecutorResourceSpec.java | 12 +-
 .../TaskExecutorResourceSpecBuilder.java | 60 ++
 .../TaskExecutorResourceUtils.java | 82 -
 .../types/ResourceBudgetManager.java | 76
 .../runtime/clusterframework/types/SlotID.java | 14 +-
 .../resourcemanager/ActiveResourceManager.java | 11 +-
 .../runtime/resourcemanager/ResourceManager.java | 59 ++
 .../resourcemanager/ResourceManagerGateway.java | 11 +-
 .../resourcemanager/TaskExecutorRegistration.java | 103 +++
 .../slotmanager/SlotManagerImpl.java | 1 +
 .../flink/runtime/taskexecutor/TaskExecutor.java | 21 ++-
 .../runtime/taskexecutor/TaskExecutorGateway.java | 3 +
 .../TaskExecutorToResourceManagerConnection.java | 53 ++
 .../taskexecutor/TaskManagerConfiguration.java | 25 ++-
 .../runtime/taskexecutor/TaskManagerRunner.java | 11 +-
 .../runtime/taskexecutor/TaskManagerServices.java | 45 +
 .../TaskManagerServicesConfiguration.java | 9 +-
 .../flink/runtime/taskexecutor/slot/SlotOffer.java | 1 -
 .../flink/runtime/taskexecutor/slot/TaskSlot.java | 74 ++--
 .../runtime/taskexecutor/slot/TaskSlotState.java | 3 +-
 .../runtime/taskexecutor/slot/TaskSlotTable.java | 205 -
 .../clusterframework/BootstrapToolsTest.java | 2 +
 .../TaskExecutorResourceUtilsTest.java | 53 ++
 .../types/ResourceBudgetManagerTest.java | 70 +++
 .../ResourceManagerTaskExecutorTest.java | 23 ++-
 .../resourcemanager/ResourceManagerTest.java | 30 +--
 .../slotmanager/SlotManagerImplTest.java | 52 +++---
 .../utils/TestingResourceManagerGateway.java | 12 +-
 .../TaskExecutorLocalStateStoresManagerTest.java | 6 +-
 .../TaskExecutorPartitionLifecycleTest.java | 10 +-
 .../runtime/taskexecutor/TaskExecutorTest.java | 204 +++-
 ...askExecutorToResourceManagerConnectionTest.java | 27 ++-
 .../TaskSubmissionTestEnvironment.java | 11 +-
 .../taskexecutor/TestingTaskExecutorGateway.java | 11 +-
 .../TestingTaskExecutorGatewayBuilder.java | 9 +-
 .../taskexecutor/slot/TaskSlotTableTest.java | 135 ++
 .../runtime/taskexecutor/slot/TaskSlotUtils.java | 36 ++--
 .../runtime/util/JvmExitOnFatalErrorTest.java | 7 +-
 .../test/java/org/apache/flink/yarn/UtilsTest.java | 5 +-
 .../apache/flink/yarn/YarnClusterDescriptor.java | 5 +-
 .../org/apache/flink/yarn/YarnResourceManager.java | 31 +++-
 .../apache/flink/yarn/YarnResourceManagerTest.java | 15 +-
 54 files changed, 1349 insertions(+), 462 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework
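The FLINK-14188/FLINK-14189 commits above derive a default per-slot resource profile from the task executor's total resources, which is why divide operations were added to `Resource` and `MemorySize`. Reduced to plain numbers, the division looks like this (illustrative types and names, not Flink's API; presumably Flink's `MemorySize` divide performs the analogous byte-level division):

```java
/** Illustrative sketch: deriving a default per-slot memory share by integer division. */
public class SlotDivisionSketch {

	static long perSlotBytes(long totalBytes, int numberOfSlots) {
		if (numberOfSlots <= 0) {
			throw new IllegalArgumentException("numberOfSlots must be positive");
		}
		return totalBytes / numberOfSlots; // any remainder stays unassigned in this sketch
	}

	public static void main(String[] args) {
		long totalManaged = 1024L * 1024L * 1024L; // 1 GiB of managed memory
		System.out.println(perSlotBytes(totalManaged, 4)); // prints: 268435456
	}
}
```

With dynamic slot allocation (FLINK-14189), this default share is what a slot request falls back to when it does not specify concrete resources of its own.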
[flink] branch master updated (1858ce2 -> 20e945d)
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 1858ce2  [hotfix][hive] adapt HiveFunctionDefinitionFactory.createFunctionDefinition to latest APPI
     add 20e945d  [FLINK-14898] Enable background cleanup of state with TTL by default

No new revisions were added by this update.

Summary of changes:
 docs/dev/stream/state/state.md | 58 ++
 .../flink/api/common/state/StateTtlConfig.java | 20 ++--
 .../flink/api/common/state/StateTtlConfigTest.java | 54 ++--
 .../flink/runtime/state/ttl/TtlStateTestBase.java | 1 +
 4 files changed, 82 insertions(+), 51 deletions(-)
[flink] branch master updated (1489cb0 -> f90fd6b)
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 1489cb0  [FLINK-15130][core] Deprecate RequiredParameters and Option
     add f90fd6b  [FLINK-15136][docs] Update the Chinese version of Working with State TTL

No new revisions were added by this update.

Summary of changes:
 docs/dev/stream/state/state.zh.md | 39 ++-
 1 file changed, 18 insertions(+), 21 deletions(-)
[flink] branch master updated (5c89d12 -> 2c11291)
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 5c89d12  [hotfix][doc] update Hive functions page with more info
     add 2c11291  [FLINK-14951][tests] Harden the thread safety of State TTL backend tests

No new revisions were added by this update.

Summary of changes:
 .../streaming/tests/MonotonicTTLTimeProvider.java | 36 ++
 .../streaming/tests/TtlVerifyUpdateFunction.java | 21 +
 2 files changed, 30 insertions(+), 27 deletions(-)
[flink] branch release-1.8 updated: [FLINK-14951][tests] Harden the thread safety of State TTL backend tests
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.8 by this push: new dfcbf68 [FLINK-14951][tests] Harden the thread safety of State TTL backend tests dfcbf68 is described below commit dfcbf68dbd792fc27c997078be7a59a594005d8b Author: Yangze Guo AuthorDate: Tue Dec 10 10:03:49 2019 +0800 [FLINK-14951][tests] Harden the thread safety of State TTL backend tests --- .../streaming/tests/MonotonicTTLTimeProvider.java | 36 ++ .../streaming/tests/TtlVerifyUpdateFunction.java | 21 + 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java index 24bc9dc..c2e66f1 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java @@ -19,12 +19,15 @@ package org.apache.flink.streaming.tests; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.util.function.FunctionWithException; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; import java.io.Serializable; +import static org.apache.flink.util.Preconditions.checkState; + /** * A stub implementation of a {@link TtlTimeProvider} which guarantees that * processing time increases monotonically. 
@@ -54,14 +57,24 @@ final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable { private static final Object lock = new Object(); @GuardedBy("lock") - static long freeze() { + static T doWithFrozenTime(FunctionWithException action) throws E { synchronized (lock) { - if (!timeIsFrozen || lastReturnedProcessingTime == Long.MIN_VALUE) { - timeIsFrozen = true; - return getCurrentTimestamp(); - } else { - return lastReturnedProcessingTime; - } + final long timestampBeforeUpdate = freeze(); + T result = action.apply(timestampBeforeUpdate); + final long timestampAfterUpdate = unfreezeTime(); + + checkState(timestampAfterUpdate == timestampBeforeUpdate, + "Timestamps before and after the update do not match."); + return result; + } + } + + private static long freeze() { + if (!timeIsFrozen || lastReturnedProcessingTime == Long.MIN_VALUE) { + timeIsFrozen = true; + return getCurrentTimestamp(); + } else { + return lastReturnedProcessingTime; } } @@ -87,11 +100,8 @@ final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable { return lastReturnedProcessingTime; } - @GuardedBy("lock") - static long unfreezeTime() { - synchronized (lock) { - timeIsFrozen = false; - return lastReturnedProcessingTime; - } + private static long unfreezeTime() { + timeIsFrozen = false; + return lastReturnedProcessingTime; } } diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java index 94e9dbd..ed69171 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java @@ -47,7 +47,6 @@ import java.util.stream.Collectors; import 
java.util.stream.StreamSupport; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * Update state with TTL for each verifier. @@ -114,19 +113,13 @@ class TtlVerifyUpdateFunction extends RichFlatMapFunction verifier, Object update) throws Exception { - final long timestampBeforeUpdate = MonotonicTTLTimeProvider.freeze(); - State sta
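The FLINK-14951 change above replaces the independently callable `freeze()`/`unfreezeTime()` pair with a single `doWithFrozenTime` method that holds the lock for the whole update and verifies the timestamp did not move. A minimal, self-contained sketch of that pattern (hypothetical class and field names, not Flink's actual `MonotonicTTLTimeProvider`) might look like:

```java
import java.util.function.LongFunction;

// Hypothetical sketch of the "run an action under frozen time" pattern:
// freeze the clock, apply the action, unfreeze, and check that the
// timestamp stayed put -- all inside a single synchronized block, so no
// other thread can observe a half-frozen clock.
final class FrozenTimeProvider {
    private static final Object lock = new Object();
    private static boolean timeIsFrozen = false;
    private static long lastReturnedTime = Long.MIN_VALUE;

    static <T> T doWithFrozenTime(LongFunction<T> action) {
        synchronized (lock) {
            final long before = freeze();
            T result = action.apply(before);
            final long after = unfreeze();
            if (after != before) {
                throw new IllegalStateException(
                    "Timestamps before and after the update do not match.");
            }
            return result;
        }
    }

    // Only called while holding the lock; advances the clock monotonically.
    private static long freeze() {
        if (!timeIsFrozen || lastReturnedTime == Long.MIN_VALUE) {
            timeIsFrozen = true;
            lastReturnedTime = Math.max(lastReturnedTime + 1, System.currentTimeMillis());
        }
        return lastReturnedTime;
    }

    // Only called while holding the lock; does not advance the clock.
    private static long unfreeze() {
        timeIsFrozen = false;
        return lastReturnedTime;
    }
}
```

The key design point of the commit is that `freeze` and `unfreeze` become private, so callers can no longer interleave freeze/unfreeze calls from different threads.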
[flink] branch release-1.9 updated: [FLINK-14951][tests] Harden the thread safety of State TTL backend tests
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.9 by this push: new 8a73d68 [FLINK-14951][tests] Harden the thread safety of State TTL backend tests 8a73d68 is described below commit 8a73d680869da0d7bb4d543bfc197d01f3b0e068 Author: Yangze Guo AuthorDate: Tue Dec 10 10:03:49 2019 +0800 [FLINK-14951][tests] Harden the thread safety of State TTL backend tests --- .../streaming/tests/MonotonicTTLTimeProvider.java | 36 ++ .../streaming/tests/TtlVerifyUpdateFunction.java | 21 + 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java index 24bc9dc..c2e66f1 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java @@ -19,12 +19,15 @@ package org.apache.flink.streaming.tests; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.util.function.FunctionWithException; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; import java.io.Serializable; +import static org.apache.flink.util.Preconditions.checkState; + /** * A stub implementation of a {@link TtlTimeProvider} which guarantees that * processing time increases monotonically. 
@@ -54,14 +57,24 @@ final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable { private static final Object lock = new Object(); @GuardedBy("lock") - static long freeze() { + static T doWithFrozenTime(FunctionWithException action) throws E { synchronized (lock) { - if (!timeIsFrozen || lastReturnedProcessingTime == Long.MIN_VALUE) { - timeIsFrozen = true; - return getCurrentTimestamp(); - } else { - return lastReturnedProcessingTime; - } + final long timestampBeforeUpdate = freeze(); + T result = action.apply(timestampBeforeUpdate); + final long timestampAfterUpdate = unfreezeTime(); + + checkState(timestampAfterUpdate == timestampBeforeUpdate, + "Timestamps before and after the update do not match."); + return result; + } + } + + private static long freeze() { + if (!timeIsFrozen || lastReturnedProcessingTime == Long.MIN_VALUE) { + timeIsFrozen = true; + return getCurrentTimestamp(); + } else { + return lastReturnedProcessingTime; } } @@ -87,11 +100,8 @@ final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable { return lastReturnedProcessingTime; } - @GuardedBy("lock") - static long unfreezeTime() { - synchronized (lock) { - timeIsFrozen = false; - return lastReturnedProcessingTime; - } + private static long unfreezeTime() { + timeIsFrozen = false; + return lastReturnedProcessingTime; } } diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java index 94e9dbd..ed69171 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java @@ -47,7 +47,6 @@ import java.util.stream.Collectors; import 
java.util.stream.StreamSupport; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * Update state with TTL for each verifier. @@ -114,19 +113,13 @@ class TtlVerifyUpdateFunction extends RichFlatMapFunction verifier, Object update) throws Exception { - final long timestampBeforeUpdate = MonotonicTTLTimeProvider.freeze(); - State sta
[flink] branch master updated (68bba73 -> fa1dadc)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 68bba73 [FLINK-14198][python] Add "type" and "rtype" options to flink python API docstrings of table.py and table_environment.py add fa1dadc [FLINK-15063][metric]fix input group and output group of the task metric are reversed No new revisions were added by this update. Summary of changes: .../org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-)
[flink] branch release-1.9 updated (d9f8abb -> 1be72a8)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git. from d9f8abb [hotfix] Let YarnResourceManagerTest run asynchronous tasks in main thread add 1be72a8 [FLINK-15063][metric]fix input group and output group of the task metric are reversed No new revisions were added by this update. Summary of changes: .../org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-)
[flink] branch master updated (fbd5b63 -> 6df53ea)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from fbd5b63 [FLINK-14342][table][python] Remove method FunctionDefinition#getLanguage. add 6df53ea [FLINK-14522] Revert FLINK-13985 (sun.misc.Cleaner is not available in Java 9+) No new revisions were added by this update. Summary of changes: .../main/java/org/apache/flink/core/memory/MemorySegmentFactory.java | 5 ++--- .../src/main/java/org/apache/flink/core/memory/MemoryUtils.java | 3 ++- 2 files changed, 4 insertions(+), 4 deletions(-)
[flink] branch master updated (ca5e518 -> 60cc18b)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from ca5e518 [FLINK-14215][docs] Add how to configure environment variables to documentation add 60cc18b [FLINK-12628][Runtime / Coordination] Remove no consumers check in Execution.getPartitionMaxParallelism No new revisions were added by this update. Summary of changes: .../org/apache/flink/runtime/executiongraph/Execution.java | 12 1 file changed, 4 insertions(+), 8 deletions(-)
[flink] branch master updated (d5bb073 -> 11ef253)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from d5bb073 [hotfix] Fix typos in MemorySegment java docs add 11ef253 [FLINK-12289][flink-runtime]Fix typos in Memory manager No new revisions were added by this update. Summary of changes: .../src/main/java/org/apache/flink/runtime/memory/MemoryManager.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-)
[flink] branch master updated (fe966d4 -> d5bb073)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from fe966d4 [FLINK-14490][table-api] Rework insertInto method add f71ce33 [FLINK-12223][Runtime]HeapMemorySegment.getArray should return null after being freed add d5bb073 [hotfix] Fix typos in MemorySegment java docs No new revisions were added by this update. Summary of changes: .../java/org/apache/flink/core/memory/HeapMemorySegment.java | 2 +- .../main/java/org/apache/flink/core/memory/MemorySegment.java | 6 +++--- .../org/apache/flink/core/memory/HeapMemorySegmentTest.java| 10 ++ 3 files changed, 14 insertions(+), 4 deletions(-)
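FLINK-12223 above changes `HeapMemorySegment.getArray` to return `null` once the segment is freed, instead of handing out a stale reference to memory the segment no longer owns. A hypothetical minimal sketch of that contract (not Flink's actual class):

```java
// Hypothetical sketch of the FLINK-12223 contract: after free(), the
// accessor returns null rather than the old backing array, so callers
// cannot keep writing into released memory and the array can be GC'd.
final class HeapSegment {
    private byte[] memory;

    HeapSegment(byte[] memory) {
        this.memory = memory;
    }

    byte[] getArray() {
        return memory; // null once freed
    }

    void free() {
        memory = null; // drop the reference so the array becomes collectible
    }
}
```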
[flink] branch master updated (e1f5ead -> 7334e0a)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from e1f5ead [FLINK-14434][coordination] Dispatcher#createJobManagerRunner returns on creation succeed add 6b5a00c [hotfix] Annotate MemoryManager methods used for testing with the @VisibleForTesting add 7a62d72 [hotfix] Introduce MemoryManagerBuilder for tests add d8bfc30 [hotfix] Remove unsed methods and fields in MemoryManager add 8f889a7 [hotfix] Checkstyle fixes in MemoryManager add ffa7bdd [hotfix] Refactor MemoryManager constructor add 597c969 [hotfix] Remove and deprecate memory preallocation in MemoryManager add 76349cf [FLINK-13984] Separate on-heap and off-heap managed memory pools add 036505f [FLINK-14399] Implement reservation of memory chunks in MemoryManager add 7334e0a [hotfix] Enable memory manager with zero memory or zero pages No new revisions were added by this update. Summary of changes: .../task_manager_memory_configuration.html | 5 - docs/ops/config.md | 2 +- docs/ops/config.zh.md | 2 +- .../flink/configuration/ConfigConstants.java | 17 - .../flink/configuration/TaskManagerOptions.java| 24 +- flink-dist/src/main/flink-bin/bin/config.sh| 7 - flink-dist/src/main/flink-bin/bin/taskmanager.sh | 2 +- flink-dist/src/main/resources/flink-conf.yaml | 11 - .../apache/flink/runtime/memory/MemoryManager.java | 861 ++--- .../MemoryReservationException.java} | 16 +- .../runtime/taskexecutor/TaskManagerServices.java | 52 +- .../TaskManagerServicesConfiguration.java | 11 - .../flink/runtime/util/KeyedBudgetManager.java | 294 +++ .../flink/runtime/io/disk/ChannelViewsTest.java| 8 +- .../runtime/io/disk/FileChannelStreamsITCase.java | 9 +- .../runtime/io/disk/FileChannelStreamsTest.java| 6 +- .../io/disk/SeekableFileChannelInputViewTest.java | 8 +- .../flink/runtime/io/disk/SpillingBufferTest.java | 3 +- .../runtime/io/disk/iomanager/IOManagerITCase.java | 3 +- 
.../flink/runtime/memory/MemoryManagerBuilder.java | 67 ++ .../MemoryManagerConcurrentModReleaseTest.java | 13 +- .../memory/MemoryManagerLazyAllocationTest.java| 198 - .../flink/runtime/memory/MemoryManagerTest.java| 140 +++- .../runtime/memory/MemorySegmentSimpleTest.java| 7 +- .../runtime/operators/drivers/TestTaskContext.java | 7 +- .../runtime/operators/hash/HashTableITCase.java| 5 +- .../hash/NonReusingHashJoinIteratorITCase.java | 3 +- .../operators/hash/ReOpenableHashTableITCase.java | 8 +- .../hash/ReOpenableHashTableTestBase.java | 8 +- .../hash/ReusingHashJoinIteratorITCase.java| 3 +- .../BlockResettableMutableObjectIteratorTest.java | 3 +- .../NonReusingBlockResettableIteratorTest.java | 3 +- .../ReusingBlockResettableIteratorTest.java| 3 +- .../resettable/SpillingResettableIteratorTest.java | 7 +- ...pillingResettableMutableObjectIteratorTest.java | 3 +- .../AbstractSortMergeOuterJoinIteratorITCase.java | 3 +- .../sort/CombiningUnilateralSortMergerITCase.java | 3 +- .../runtime/operators/sort/ExternalSortITCase.java | 3 +- .../sort/ExternalSortLargeRecordsITCase.java | 3 +- .../sort/FixedLengthRecordSorterTest.java | 8 +- .../operators/sort/LargeRecordHandlerITCase.java | 14 +- .../operators/sort/LargeRecordHandlerTest.java | 20 +- ...NonReusingSortMergeInnerJoinIteratorITCase.java | 3 +- .../operators/sort/NormalizedKeySorterTest.java| 8 +- .../ReusingSortMergeInnerJoinIteratorITCase.java | 3 +- .../operators/sort/UnilateralSortMergerTest.java | 6 +- .../testutils/BinaryOperatorTestBase.java | 3 +- .../operators/testutils/DriverTestBase.java| 3 +- .../operators/testutils/MockEnvironment.java | 3 +- .../operators/testutils/UnaryOperatorTestBase.java | 3 +- .../operators/util/HashVsSortMiniBenchmark.java| 8 +- .../runtime/taskexecutor/TaskExecutorTest.java | 13 +- .../taskexecutor/TaskManagerRunnerStartupTest.java | 20 - .../taskexecutor/TaskManagerServicesBuilder.java | 13 +- .../flink/runtime/taskmanager/TestTaskBuilder.java | 3 +- 
.../runtime/util/JvmExitOnFatalErrorTest.java | 3 +- .../flink/runtime/util/KeyedBudgetManagerTest.java | 261 +++ .../runtime/tasks/StreamMockEnvironment.java | 3 +- .../runtime/tasks/StreamTaskTerminationTest.java | 4 +- .../runtime/hashtable/BinaryHashTableTest.java | 34 +- .../table/runtime/hashtable/LongHashTableTest.java | 3 +- .../operators/aggregate/BytesHashMapTest.java | 40 +- .../runtime/operators/aggregate/HashAggTest.java | 3 +- .../join/Int2SortMergeJoinOperatorTest.java| 3
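The FLINK-13984/FLINK-14399 commits above introduce a `KeyedBudgetManager` and memory-chunk reservation in `MemoryManager`, splitting managed memory into separate pools. As a rough illustration of the idea (hypothetical simplified class, not Flink's `KeyedBudgetManager` API), a per-key budget manager lets reservations succeed only while the remaining budget for that key covers them:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-key budget accounting in the spirit of the
// commits above: each key (e.g. "heap" vs "off-heap") owns a fixed budget,
// reservations draw it down, and releases return it.
final class SimpleBudgetManager<K> {
    private final Map<K, Long> remaining = new HashMap<>();

    SimpleBudgetManager(Map<K, Long> budgets) {
        remaining.putAll(budgets);
    }

    synchronized boolean tryReserve(K key, long amount) {
        long left = remaining.getOrDefault(key, 0L);
        if (amount < 0 || left < amount) {
            return false; // not enough budget left for this key
        }
        remaining.put(key, left - amount);
        return true;
    }

    synchronized void release(K key, long amount) {
        remaining.merge(key, amount, Long::sum);
    }

    synchronized long available(K key) {
        return remaining.getOrDefault(key, 0L);
    }
}
```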
[flink] branch master updated (7334e0a -> 56126bd)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 7334e0a [hotfix] Enable memory manager with zero memory or zero pages add 56126bd [FLINK-14447] Network metrics doc table render confusion No new revisions were added by this update. Summary of changes: docs/monitoring/metrics.md| 4 ++-- docs/monitoring/metrics.zh.md | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-)
[flink] branch master updated (c31e44e -> 6d0a827)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from c31e44e [FLINK-14235][kafka,tests] Change source in at-least-once test from finite to infinite add 8e0b35c [hotfix] Use MemorySegmentFactory in tests instead of HybridMemorySegment constructors add d154ae5 [hotfix] Minor refactoring of OperationsOnFreedSegmentTest add afe2161 [FLINK-13985] Use unsafe memory for managed memory add 6d0a827 [hotfix] KeyedBudgetManagerTest extends TestLogger No new revisions were added by this update. Summary of changes: .../flink/core/memory/HybridMemorySegment.java | 70 - .../flink/core/memory/MemorySegmentFactory.java| 47 ++-- .../org/apache/flink/core/memory/MemoryUtils.java | 81 .../flink/core/memory/CrossSegmentTypeTest.java| 51 - .../flink/core/memory/EndiannessAccessChecks.java | 10 ++- .../HybridOffHeapDirectMemorySegmentTest.java} | 29 --- .../memory/HybridOffHeapMemorySegmentTest.java | 21 + .../HybridOffHeapUnsafeMemorySegmentTest.java} | 29 --- .../core/memory/HybridOnHeapMemorySegmentTest.java | 6 +- .../flink/core/memory/MemorySegmentChecksTest.java | 14 +--- .../flink/core/memory/MemorySegmentTestBase.java | 5 ++ .../core/memory/MemorySegmentUndersizedTest.java | 21 - .../core/memory/OperationsOnFreedSegmentTest.java | 89 -- .../apache/flink/runtime/memory/MemoryManager.java | 4 +- .../flink/runtime/util/KeyedBudgetManagerTest.java | 3 +- 15 files changed, 254 insertions(+), 226 deletions(-) copy flink-core/src/{main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java => test/java/org/apache/flink/core/memory/HybridOffHeapDirectMemorySegmentTest.java} (59%) copy flink-core/src/{main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java => test/java/org/apache/flink/core/memory/HybridOffHeapUnsafeMemorySegmentTest.java} (58%)
[flink] branch master updated (0a9db1e -> ad531f0)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 0a9db1e [FLINK-14466][runtime] Let YarnJobClusterEntrypoint use user code class loader add cb62c5f [hotfix] Refactor TaskExecutorTest.testTaskSubmission to not use mocking add d3ecd3d [hotfix] Introduce TaskSlotUtils for tests add 01d6972 [hotfix] Refactor out slots creation from the TaskSlotTable constructor add 9fed0dd [FLINK-14400] Shrink the scope of MemoryManager from TaskExecutor to slot add 4c4652e [hotfix] Remove unused number of slots in MemoryManager add 48986aa [hotfix] Remove unnecessary comments about memory size calculation before network init add ad531f0 [hotfix] Remove redundant timerService in TaskExecutorTest No new revisions were added by this update. Summary of changes: .../apache/flink/runtime/memory/MemoryManager.java | 16 +- .../flink/runtime/taskexecutor/TaskExecutor.java | 15 +- .../runtime/taskexecutor/TaskManagerServices.java | 88 ++- .../flink/runtime/taskexecutor/slot/TaskSlot.java | 39 - .../runtime/taskexecutor/slot/TaskSlotTable.java | 43 -- .../flink/runtime/memory/MemoryManagerBuilder.java | 8 +- .../TaskExecutorPartitionLifecycleTest.java| 8 +- .../runtime/taskexecutor/TaskExecutorTest.java | 164 +++-- .../taskexecutor/TaskManagerServicesBuilder.java | 14 +- .../TaskSubmissionTestEnvironment.java | 13 +- .../taskexecutor/slot/TaskSlotTableTest.java | 19 +-- .../runtime/taskexecutor/slot/TaskSlotUtils.java | 77 ++ .../operators/aggregate/BytesHashMapTest.java | 6 - 13 files changed, 268 insertions(+), 242 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java
[flink] branch master updated (ad531f0 -> 7201206)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from ad531f0 [hotfix] Remove redundant timerService in TaskExecutorTest add 7201206 [FLINK-14720] Bring down ExecutionVertex#deployToSlot access modifier and mark it with @VisibleForTesting annotation No new revisions were added by this update. Summary of changes: .../java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java| 1 + 1 file changed, 1 insertion(+)
[flink] branch master updated (6691894 -> 20f6976)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 6691894 [hotfix] Correct wrong log level in TaskLocalStateStoreImpl#storeLocalState add 20f6976 [FLINK-14846][doc] Correct the default writerbuffer size documentation of RocksDB No new revisions were added by this update. Summary of changes: docs/_includes/generated/rocks_db_configurable_configuration.html | 2 +- .../flink/contrib/streaming/state/RocksDBConfigurableOptions.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-)
[flink] branch master updated (3d657b4 -> 8930f62)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 3d657b4 [FLINK-14939][e2e] Set distDir property add 9f6b204 [FLINK-14901] Throw Error in MemoryUtils if there is problem with using system classes over reflection add 8930f62 [hotfix] Annotate interfaces in JavaGcCleanerWrapper with @FunctionalInterface No new revisions were added by this update. Summary of changes: .../org/apache/flink/core/memory/MemoryUtils.java | 33 ++ .../apache/flink/util/JavaGcCleanerWrapper.java| 13 + 2 files changed, 23 insertions(+), 23 deletions(-)
[flink] branch master updated (daa2f95 -> 07b66b6)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from daa2f95 [FLINK-14974][runtime] Calculate managed memory fractions with BigDecimal add 07b66b6 [FLINK-14976][cassandra] Release semaphore on all Throwable's in send() No new revisions were added by this update. Summary of changes: .../connectors/cassandra/CassandraSinkBase.java| 2 +- .../cassandra/CassandraSinkBaseTest.java | 51 ++ 2 files changed, 34 insertions(+), 19 deletions(-)
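The FLINK-14976 fix above widens the `catch` around `send()` from `Exception` to `Throwable`, so a thrown `Error` can no longer leak a semaphore permit and eventually stall the sink. A hypothetical minimal sketch of that pattern (simplified names, not Flink's `CassandraSinkBase`):

```java
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical sketch of the FLINK-14976 pattern: acquire a permit before
// send(), and release it on *any* Throwable -- not just Exception -- before
// rethrowing, so failed sends never leak permits.
final class ThrottledSender<T, R> {
    private final Semaphore semaphore;
    private final Function<T, R> send;

    ThrottledSender(int maxConcurrentRequests, Function<T, R> send) {
        this.semaphore = new Semaphore(maxConcurrentRequests);
        this.send = send;
    }

    R invoke(T value) {
        semaphore.acquireUninterruptibly();
        final R result;
        try {
            result = send.apply(value);
        } catch (Throwable t) {
            semaphore.release(); // release on Error and Exception alike
            throw t;             // precise rethrow: only unchecked escapes
        }
        return result; // in the real sink, a completion callback releases later
    }

    int availablePermits() {
        return semaphore.availablePermits();
    }
}
```

The design point is that catching only `Exception` leaves `Error` subclasses (or sneaky-thrown checked throwables) holding the permit forever once `maxConcurrentRequests` of them occur.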
[flink] branch release-1.8 updated: [FLINK-14976][cassandra] Release semaphore on all Throwable's in send()
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.8 by this push: new 8747e3d [FLINK-14976][cassandra] Release semaphore on all Throwable's in send() 8747e3d is described below commit 8747e3d4ec29394fa65e875b9da68b2af863f92a Author: Mads Chr. Olesen AuthorDate: Wed Nov 27 14:56:16 2019 +0100 [FLINK-14976][cassandra] Release semaphore on all Throwable's in send() --- .../connectors/cassandra/CassandraSinkBase.java| 2 +- .../cassandra/CassandraSinkBaseTest.java | 51 ++ 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java index 5d758be..1b6e3b3 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java @@ -131,7 +131,7 @@ public abstract class CassandraSinkBase extends RichSinkFunction impl final ListenableFuture result; try { result = send(value); - } catch (Exception e) { + } catch (Throwable e) { semaphore.release(); throw e; } diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java index b4406ab..709e5b7 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java +++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java @@ -28,7 +28,6 @@ import org.apache.flink.util.Preconditions; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; -import com.datastax.driver.core.exceptions.InvalidQueryException; import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.google.common.util.concurrent.ListenableFuture; import org.junit.Assert; @@ -41,8 +40,12 @@ import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.number.OrderingComparison.greaterThan; +import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.powermock.api.mockito.PowerMockito.when; @@ -79,7 +82,7 @@ public class CassandraSinkBaseTest { casSinkFunc.enqueueCompletableFuture(CompletableFuture.completedFuture(null)); final int originalPermits = casSinkFunc.getAvailablePermits(); - Assert.assertThat(originalPermits, greaterThan(0)); + assertThat(originalPermits, greaterThan(0)); Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); casSinkFunc.invoke("hello"); @@ -274,21 +277,29 @@ public class CassandraSinkBaseTest { } @Test(timeout = DEFAULT_TEST_TIMEOUT) - public void testReleaseOnSendException() throws Exception { + public void testReleaseOnThrowingSend() throws Exception { final CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder() .setMaxConcurrentRequests(1) .build(); - try (TestCassandraSink testCassandraSink = createOpenedSendExceptionTestCassandraSink(config)) { - Assert.assertEquals(1, testCassandraSink.getAvailablePermits()); - Assert.assertEquals(0, 
testCassandraSink.getAcquiredPermits()); + Function> failingSendFunction = ignoredMessage -> { + throwCheckedAsUnchecked(new Throwable("expected")); + //noinspection ReturnOfNull + return null; + }; + try (TestCassandraSink testCassandraSink = new MockCassandraSink(config, failingSendFunction)) { + testCassandraSink.open(new Configuration()); +
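The FLINK-14976 fix above narrows the permit-leak window: by catching `Throwable` instead of `Exception`, the semaphore is released even when `send()` throws an `Error` or a sneakily-thrown checked `Throwable`. A minimal sketch of the pattern, with illustrative names (`ThrottledSender`, and a `CompletableFuture` standing in for the driver's `ListenableFuture`) that are not Flink's actual API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

// Sketch of the release-on-any-Throwable pattern from the commit above.
class ThrottledSender {
    private final Semaphore semaphore = new Semaphore(1);

    CompletableFuture<Void> sendThrottled(String value) throws InterruptedException {
        semaphore.acquire();
        final CompletableFuture<Void> result;
        try {
            result = send(value);
        } catch (Throwable t) {      // Throwable, not Exception: also covers Errors
            semaphore.release();     // the permit must not leak on any failure path
            throw t;                 // precise rethrow, no extra throws clause needed
        }
        result.whenComplete((v, err) -> semaphore.release());
        return result;
    }

    // Simulates a send() that fails with a non-Exception Throwable.
    CompletableFuture<Void> send(String value) {
        throw new AssertionError("boom");
    }

    int availablePermits() {
        return semaphore.availablePermits();
    }
}
```

After a throwing `send()`, the permit count is back at its original value, which is exactly what the renamed `testReleaseOnThrowingSend` test asserts.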
[flink] branch release-1.9 updated: [FLINK-14976][cassandra] Release semaphore on all Throwable's in send()
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.9 by this push: new 6039e11 [FLINK-14976][cassandra] Release semaphore on all Throwable's in send() 6039e11 is described below commit 6039e11c1cad20fe3468715ff594a49cbdc8d95e Author: Mads Chr. Olesen AuthorDate: Wed Nov 27 14:56:16 2019 +0100 [FLINK-14976][cassandra] Release semaphore on all Throwable's in send() --- .../connectors/cassandra/CassandraSinkBase.java| 2 +- .../cassandra/CassandraSinkBaseTest.java | 51 ++ 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java index 0e7eb6f..76ed9c7 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java @@ -132,7 +132,7 @@ public abstract class CassandraSinkBase extends RichSinkFunction impl final ListenableFuture result; try { result = send(value); - } catch (Exception e) { + } catch (Throwable e) { semaphore.release(); throw e; } diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java index 3ce9742..5c0a431 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java +++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java @@ -28,7 +28,6 @@ import org.apache.flink.util.Preconditions; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; -import com.datastax.driver.core.exceptions.InvalidQueryException; import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.google.common.util.concurrent.ListenableFuture; import org.junit.Assert; @@ -42,9 +41,13 @@ import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.number.OrderingComparison.greaterThan; +import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.powermock.api.mockito.PowerMockito.when; @@ -81,7 +84,7 @@ public class CassandraSinkBaseTest { casSinkFunc.enqueueCompletableFuture(CompletableFuture.completedFuture(null)); final int originalPermits = casSinkFunc.getAvailablePermits(); - Assert.assertThat(originalPermits, greaterThan(0)); + assertThat(originalPermits, greaterThan(0)); Assert.assertEquals(0, casSinkFunc.getAcquiredPermits()); casSinkFunc.invoke("hello"); @@ -277,21 +280,29 @@ public class CassandraSinkBaseTest { } @Test(timeout = DEFAULT_TEST_TIMEOUT) - public void testReleaseOnSendException() throws Exception { + public void testReleaseOnThrowingSend() throws Exception { final CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder() .setMaxConcurrentRequests(1) .build(); - try (TestCassandraSink testCassandraSink = createOpenedSendExceptionTestCassandraSink(config)) { - Assert.assertEquals(1, 
testCassandraSink.getAvailablePermits()); - Assert.assertEquals(0, testCassandraSink.getAcquiredPermits()); + Function> failingSendFunction = ignoredMessage -> { + throwCheckedAsUnchecked(new Throwable("expected")); + //noinspection ReturnOfNull + return null; + }; + try (TestCassandraSink testCassandraSink = new MockCassandraSink(c
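The `throwCheckedAsUnchecked` helper referenced in the test diff is presumably the classic "sneaky throw" generics trick, which lets a checked `Throwable` escape a lambda that declares no checked exceptions. A sketch of how such a helper is commonly written (the method name matches the diff; the body is an assumption):

```java
class SneakyThrows {
    // At the call site, T is inferred as RuntimeException, so the compiler
    // requires no throws clause, but at runtime the original (possibly
    // checked) Throwable propagates unchanged.
    @SuppressWarnings("unchecked")
    static <T extends Throwable> void throwCheckedAsUnchecked(Throwable t) throws T {
        throw (T) t;
    }
}
```

This is what allows the test's failing send function to throw `new Throwable("expected")` from inside a `Function`, so the sink's `catch (Throwable)` block sees the original instance.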
[flink] branch master updated (6c6ada5 -> e41c0fe)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 6c6ada5 [FLINK-14556][python] Correct the package structure of cloudpickle add e41c0fe [FLINK-14522] Introduce JavaGcCleanerWrapper to find Java GC Cleaner depending on JVM version No new revisions were added by this update. Summary of changes: .../flink/core/memory/MemorySegmentFactory.java| 5 +- .../org/apache/flink/core/memory/MemoryUtils.java | 9 +- .../apache/flink/util/JavaGcCleanerWrapper.java| 255 + .../flink/util/JavaGcCleanerWrapperTest.java | 29 ++- 4 files changed, 274 insertions(+), 24 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/util/JavaGcCleanerWrapper.java copy flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHeadersTest.java => flink-core/src/test/java/org/apache/flink/util/JavaGcCleanerWrapperTest.java (57%)
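A wrapper like FLINK-14522's `JavaGcCleanerWrapper` has to cope with the Cleaner API moving between JVM versions (`sun.misc.Cleaner` on Java 8, `java.lang.ref.Cleaner` since Java 9), which is why it resolves the class reflectively. A minimal probe in that spirit — a sketch, not Flink's actual implementation:

```java
// Sketch: locate whichever GC Cleaner class this JVM provides,
// trying the Java 9+ class first and falling back to the Java 8 one.
class GcCleanerProbe {
    static Class<?> findCleanerClass() {
        for (String name : new String[] {"java.lang.ref.Cleaner", "sun.misc.Cleaner"}) {
            try {
                return Class.forName(name);
            } catch (ClassNotFoundException ignored) {
                // not on this JVM, try the next candidate
            }
        }
        throw new IllegalStateException("No GC Cleaner class found on this JVM");
    }
}
```

The real wrapper goes further (looking up methods and invoking them reflectively), but the version-dependent class lookup is the core idea.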
[flink] branch master updated (96f4c79 -> ef7ee76)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 96f4c79 [FLINK-15268][build] Correctly set Multi-Release in manifest. add ef7ee76 [FLINK-15065][docs] Correct default value of RocksDB options in documentation No new revisions were added by this update. Summary of changes: docs/_includes/generated/rocks_db_configurable_configuration.html | 6 +++--- .../flink/contrib/streaming/state/RocksDBConfigurableOptions.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-)
[flink] branch release-1.8 updated: [FLINK-14846][doc] Correct the default writerbuffer size documentation of RocksDB
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.8 by this push: new c0f6f95 [FLINK-14846][doc] Correct the default writerbuffer size documentation of RocksDB c0f6f95 is described below commit c0f6f95353b5be284b0fd1cd53c5950ceeebbf56 Author: Yun Tang AuthorDate: Mon Nov 25 02:00:40 2019 +0800 [FLINK-14846][doc] Correct the default writerbuffer size documentation of RocksDB Correct the default write buffer size of RocksDB to '64MB' --- docs/_includes/generated/rocks_db_configurable_configuration.html | 2 +- .../flink/contrib/streaming/state/RocksDBConfigurableOptions.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/_includes/generated/rocks_db_configurable_configuration.html b/docs/_includes/generated/rocks_db_configurable_configuration.html index fa0e0ff..b29054c 100644 --- a/docs/_includes/generated/rocks_db_configurable_configuration.html +++ b/docs/_includes/generated/rocks_db_configurable_configuration.html @@ -60,7 +60,7 @@ state.backend.rocksdb.writebuffer.size (none) -The amount of data built up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk files. RocksDB has default writebuffer size as '4MB'. +The amount of data built up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk files. RocksDB has default writebuffer size as '64MB'. 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java index 261e5f0..a8213bc 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java @@ -94,7 +94,7 @@ public class RocksDBConfigurableOptions implements Serializable { key("state.backend.rocksdb.writebuffer.size") .noDefaultValue() .withDescription("The amount of data built up in memory (backed by an unsorted log on disk) " + - "before converting to a sorted on-disk files. RocksDB has default writebuffer size as '4MB'."); + "before converting to a sorted on-disk files. RocksDB has default writebuffer size as '64MB'."); public static final ConfigOption MAX_WRITE_BUFFER_NUMBER = key("state.backend.rocksdb.writebuffer.count")
[flink] branch release-1.9 updated: [FLINK-15065][docs] Correct default value of RocksDB options in documentation
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.9 by this push: new ea5f241 [FLINK-15065][docs] Correct default value of RocksDB options in documentation ea5f241 is described below commit ea5f2418331d8d54cb47313edcdf74923f0f19c8 Author: Yun Tang AuthorDate: Fri Dec 13 16:38:45 2019 +0800 [FLINK-15065][docs] Correct default value of RocksDB options in documentation This refers to https://github.com/facebook/rocksdb/pull/6123 which corrects the RocksDB javadoc --- docs/_includes/generated/rocks_db_configurable_configuration.html | 6 +++--- .../flink/contrib/streaming/state/RocksDBConfigurableOptions.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/_includes/generated/rocks_db_configurable_configuration.html b/docs/_includes/generated/rocks_db_configurable_configuration.html index b29054c..11fcfcf 100644 --- a/docs/_includes/generated/rocks_db_configurable_configuration.html +++ b/docs/_includes/generated/rocks_db_configurable_configuration.html @@ -20,12 +20,12 @@ state.backend.rocksdb.compaction.level.max-size-level-base (none) -The upper-bound of the total size of level base files in bytes. RocksDB has default configuration as '10MB'. +The upper-bound of the total size of level base files in bytes. RocksDB has default configuration as '256MB'. state.backend.rocksdb.compaction.level.target-file-size-base (none) -The target file size for compaction, which determines a level-1 file size. RocksDB has default configuration as '2MB'. +The target file size for compaction, which determines a level-1 file size. RocksDB has default configuration as '64MB'. 
state.backend.rocksdb.compaction.level.use-dynamic-size @@ -40,7 +40,7 @@ state.backend.rocksdb.files.open (none) -The maximum number of open files (per TaskManager) that can be used by the DB, '-1' means no limit. RocksDB has default configuration as '5000'. +The maximum number of open files (per TaskManager) that can be used by the DB, '-1' means no limit. RocksDB has default configuration as '-1'. state.backend.rocksdb.thread.num diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java index a8213bc..f321feb 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java @@ -53,7 +53,7 @@ public class RocksDBConfigurableOptions implements Serializable { key("state.backend.rocksdb.files.open") .noDefaultValue() .withDescription("The maximum number of open files (per TaskManager) that can be used by the DB, '-1' means no limit. " + - "RocksDB has default configuration as '5000'."); + "RocksDB has default configuration as '-1'."); //-- // Provided configurable ColumnFamilyOptions within Flink @@ -82,13 +82,13 @@ public class RocksDBConfigurableOptions implements Serializable { key("state.backend.rocksdb.compaction.level.target-file-size-base") .noDefaultValue() .withDescription("The target file size for compaction, which determines a level-1 file size. 
" + - "RocksDB has default configuration as '2MB'."); + "RocksDB has default configuration as '64MB'."); public static final ConfigOption MAX_SIZE_LEVEL_BASE = key("state.backend.rocksdb.compaction.level.max-size-level-base") .noDefaultValue() .withDescription("The upper-bound of the total size of level base files in bytes. " + - "RocksDB has default configuration as '10MB'."); + "RocksDB has default configuration as '256MB'."); public static final ConfigOption WRITE_BUFFER_SIZE = key("state.backend.rocksdb.writebuffer.size")
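Taken together, FLINK-14846 and FLINK-15065 bring the documented defaults in line with upstream RocksDB. The corrected values from the diffs above can be summarized as a lookup table (a sketch for reference; the real values live in `RocksDBConfigurableOptions`):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Documented RocksDB defaults after FLINK-14846/FLINK-15065,
// taken from the diffs above (old documented values in comments).
class RocksDbDocumentedDefaults {
    static Map<String, String> asMap() {
        Map<String, String> defaults = new LinkedHashMap<>();
        defaults.put("state.backend.rocksdb.compaction.level.max-size-level-base", "256MB"); // was '10MB'
        defaults.put("state.backend.rocksdb.compaction.level.target-file-size-base", "64MB"); // was '2MB'
        defaults.put("state.backend.rocksdb.files.open", "-1");                               // was '5000'
        defaults.put("state.backend.rocksdb.writebuffer.size", "64MB");                       // was '4MB'
        return defaults;
    }
}
```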
[flink] branch release-1.8 updated: [FLINK-15065][docs] Correct default value of RocksDB options in documentation
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.8 by this push: new b38ada0 [FLINK-15065][docs] Correct default value of RocksDB options in documentation b38ada0 is described below commit b38ada077c2ce487dde64a04cb0857d91327c04d Author: Yun Tang AuthorDate: Fri Dec 13 16:38:45 2019 +0800 [FLINK-15065][docs] Correct default value of RocksDB options in documentation This refers to https://github.com/facebook/rocksdb/pull/6123 which corrects the RocksDB javadoc --- docs/_includes/generated/rocks_db_configurable_configuration.html | 6 +++--- .../flink/contrib/streaming/state/RocksDBConfigurableOptions.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/_includes/generated/rocks_db_configurable_configuration.html b/docs/_includes/generated/rocks_db_configurable_configuration.html index b29054c..11fcfcf 100644 --- a/docs/_includes/generated/rocks_db_configurable_configuration.html +++ b/docs/_includes/generated/rocks_db_configurable_configuration.html @@ -20,12 +20,12 @@ state.backend.rocksdb.compaction.level.max-size-level-base (none) -The upper-bound of the total size of level base files in bytes. RocksDB has default configuration as '10MB'. +The upper-bound of the total size of level base files in bytes. RocksDB has default configuration as '256MB'. state.backend.rocksdb.compaction.level.target-file-size-base (none) -The target file size for compaction, which determines a level-1 file size. RocksDB has default configuration as '2MB'. +The target file size for compaction, which determines a level-1 file size. RocksDB has default configuration as '64MB'. 
state.backend.rocksdb.compaction.level.use-dynamic-size @@ -40,7 +40,7 @@ state.backend.rocksdb.files.open (none) -The maximum number of open files (per TaskManager) that can be used by the DB, '-1' means no limit. RocksDB has default configuration as '5000'. +The maximum number of open files (per TaskManager) that can be used by the DB, '-1' means no limit. RocksDB has default configuration as '-1'. state.backend.rocksdb.thread.num diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java index a8213bc..f321feb 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java @@ -53,7 +53,7 @@ public class RocksDBConfigurableOptions implements Serializable { key("state.backend.rocksdb.files.open") .noDefaultValue() .withDescription("The maximum number of open files (per TaskManager) that can be used by the DB, '-1' means no limit. " + - "RocksDB has default configuration as '5000'."); + "RocksDB has default configuration as '-1'."); //-- // Provided configurable ColumnFamilyOptions within Flink @@ -82,13 +82,13 @@ public class RocksDBConfigurableOptions implements Serializable { key("state.backend.rocksdb.compaction.level.target-file-size-base") .noDefaultValue() .withDescription("The target file size for compaction, which determines a level-1 file size. 
" + - "RocksDB has default configuration as '2MB'."); + "RocksDB has default configuration as '64MB'."); public static final ConfigOption MAX_SIZE_LEVEL_BASE = key("state.backend.rocksdb.compaction.level.max-size-level-base") .noDefaultValue() .withDescription("The upper-bound of the total size of level base files in bytes. " + - "RocksDB has default configuration as '10MB'."); + "RocksDB has default configuration as '256MB'."); public static final ConfigOption WRITE_BUFFER_SIZE = key("state.backend.rocksdb.writebuffer.size")
[flink] branch release-1.10 updated: [FLINK-15065][docs] Correct default value of RocksDB options in documentation
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 836b8ae [FLINK-15065][docs] Correct default value of RocksDB options in documentation 836b8ae is described below commit 836b8ae4c178be6c91fbb64580361ab50bfecdc0 Author: Yun Tang AuthorDate: Fri Dec 13 16:38:45 2019 +0800 [FLINK-15065][docs] Correct default value of RocksDB options in documentation This refers to https://github.com/facebook/rocksdb/pull/6123 which corrects the RocksDB javadoc --- docs/_includes/generated/rocks_db_configurable_configuration.html | 6 +++--- .../flink/contrib/streaming/state/RocksDBConfigurableOptions.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/_includes/generated/rocks_db_configurable_configuration.html b/docs/_includes/generated/rocks_db_configurable_configuration.html index 47e3bef..f46fc27 100644 --- a/docs/_includes/generated/rocks_db_configurable_configuration.html +++ b/docs/_includes/generated/rocks_db_configurable_configuration.html @@ -24,13 +24,13 @@ state.backend.rocksdb.compaction.level.max-size-level-base (none) MemorySize -The upper-bound of the total size of level base files in bytes. RocksDB has default configuration as '10MB'. +The upper-bound of the total size of level base files in bytes. RocksDB has default configuration as '256MB'. state.backend.rocksdb.compaction.level.target-file-size-base (none) MemorySize -The target file size for compaction, which determines a level-1 file size. RocksDB has default configuration as '2MB'. +The target file size for compaction, which determines a level-1 file size. RocksDB has default configuration as '64MB'. 
state.backend.rocksdb.compaction.level.use-dynamic-size @@ -48,7 +48,7 @@ state.backend.rocksdb.files.open (none) Integer -The maximum number of open files (per TaskManager) that can be used by the DB, '-1' means no limit. RocksDB has default configuration as '5000'. +The maximum number of open files (per TaskManager) that can be used by the DB, '-1' means no limit. RocksDB has default configuration as '-1'. state.backend.rocksdb.thread.num diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java index 1f27c3f..e4dda8b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java @@ -58,7 +58,7 @@ public class RocksDBConfigurableOptions implements Serializable { .intType() .noDefaultValue() .withDescription("The maximum number of open files (per TaskManager) that can be used by the DB, '-1' means no limit. " + - "RocksDB has default configuration as '5000'."); + "RocksDB has default configuration as '-1'."); //-- // Provided configurable ColumnFamilyOptions within Flink @@ -90,14 +90,14 @@ public class RocksDBConfigurableOptions implements Serializable { .memoryType() .noDefaultValue() .withDescription("The target file size for compaction, which determines a level-1 file size. " + - "RocksDB has default configuration as '2MB'."); + "RocksDB has default configuration as '64MB'."); public static final ConfigOption MAX_SIZE_LEVEL_BASE = key("state.backend.rocksdb.compaction.level.max-size-level-base") .memoryType() .noDefaultValue() .withDescription("The upper-bound of the total size of level base files in bytes. 
" + - "RocksDB has default configuration as '10MB'."); + "RocksDB has default configuration as '256MB'."); public static final ConfigOption WRITE_BUFFER_SIZE = key("state.backend.rocksdb.writebuffer.size")
[flink] branch release-1.9 updated: [FLINK-14846][doc] Correct the default writerbuffer size documentation of RocksDB
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.9 by this push: new c221872 [FLINK-14846][doc] Correct the default writerbuffer size documentation of RocksDB c221872 is described below commit c22187205841b1bc7b9b1c769674b92c78a1244f Author: Yun Tang AuthorDate: Mon Nov 25 02:00:40 2019 +0800 [FLINK-14846][doc] Correct the default writerbuffer size documentation of RocksDB Correct the default write buffer size of RocksDB to '64MB' --- docs/_includes/generated/rocks_db_configurable_configuration.html | 2 +- .../flink/contrib/streaming/state/RocksDBConfigurableOptions.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/_includes/generated/rocks_db_configurable_configuration.html b/docs/_includes/generated/rocks_db_configurable_configuration.html index fa0e0ff..b29054c 100644 --- a/docs/_includes/generated/rocks_db_configurable_configuration.html +++ b/docs/_includes/generated/rocks_db_configurable_configuration.html @@ -60,7 +60,7 @@ state.backend.rocksdb.writebuffer.size (none) -The amount of data built up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk files. RocksDB has default writebuffer size as '4MB'. +The amount of data built up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk files. RocksDB has default writebuffer size as '64MB'. 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java index 261e5f0..a8213bc 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java @@ -94,7 +94,7 @@ public class RocksDBConfigurableOptions implements Serializable { key("state.backend.rocksdb.writebuffer.size") .noDefaultValue() .withDescription("The amount of data built up in memory (backed by an unsorted log on disk) " + - "before converting to a sorted on-disk files. RocksDB has default writebuffer size as '4MB'."); + "before converting to a sorted on-disk files. RocksDB has default writebuffer size as '64MB'."); public static final ConfigOption MAX_WRITE_BUFFER_NUMBER = key("state.backend.rocksdb.writebuffer.count")
[flink] branch master updated (2b13a41 -> 3cf8fe5)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 2b13a41 [FLINK-16362][table] Remove deprecated `emitDataStream` method in StreamTableSink add 3cf8fe5 [FLINK-11193][State] Fix Rockdb timer service factory configuration option to be settable per job No new revisions were added by this update. Summary of changes: .../streaming/state/RocksDBStateBackend.java | 32 +++-- .../state/RocksDBStateBackendConfigTest.java | 33 ++ 2 files changed, 57 insertions(+), 8 deletions(-)
[flink] branch release-1.9 updated: [FLINK-11193][State] Fix Rockdb timer service factory configuration option to be settable per job
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.9 by this push: new e64a2e0 [FLINK-11193][State] Fix Rockdb timer service factory configuration option to be settable per job e64a2e0 is described below commit e64a2e0d1ba3391fac23c04543235bb8bb047c33 Author: Aitozi <1059789...@qq.com> AuthorDate: Wed Sep 4 22:59:09 2019 +0800 [FLINK-11193][State] Fix Rockdb timer service factory configuration option to be settable per job This closes #8479. --- .../streaming/state/RocksDBStateBackend.java | 32 +++- .../state/RocksDBStateBackendConfigTest.java | 34 ++ 2 files changed, 58 insertions(+), 8 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index c5604e5..cfac3b2 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -147,7 +147,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu private TernaryBoolean enableTtlCompactionFilter; /** This determines the type of priority queue state. */ - private final PriorityQueueStateType priorityQueueStateType; + @Nullable + private PriorityQueueStateType priorityQueueStateType; /** The default rocksdb metrics options. 
*/ private final RocksDBNativeMetricOptions defaultMetricOptions; @@ -265,8 +266,6 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend); this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; this.numberOfTransferingThreads = UNDEFINED_NUMBER_OF_TRANSFERING_THREADS; - // for now, we use still the heap-based implementation as default - this.priorityQueueStateType = PriorityQueueStateType.HEAP; this.defaultMetricOptions = new RocksDBNativeMetricOptions(); this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED; } @@ -314,10 +313,11 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu this.enableTtlCompactionFilter = original.enableTtlCompactionFilter .resolveUndefined(config.getBoolean(TTL_COMPACT_FILTER_ENABLED)); - final String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY); - - this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ? - PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType; + if (null == original.priorityQueueStateType) { + this.priorityQueueStateType = config.getEnum(PriorityQueueStateType.class, TIMER_SERVICE_FACTORY); + } else { + this.priorityQueueStateType = original.priorityQueueStateType; + } // configure local directories if (original.localRocksDbDirectories != null) { @@ -507,7 +507,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu keyGroupRange, executionConfig, localRecoveryConfig, - priorityQueueStateType, + getPriorityQueueStateType(), ttlTimeProvider, metricGroup, stateHandles, @@ -716,6 +716,22 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu enableTtlCompactionFilter = TernaryBoolean.TRUE; } + /** +* Gets the type of the priority queue state. It will fallback to the default value, if it is not explicitly set. 
+* @return The type of the priority queue state. +*/ + public PriorityQueueStateType getPriorityQueueStateType() { + return priorityQueueStateType == null ? + PriorityQueueStateType.valueOf(TIMER_SERVICE_FACTORY.defaultValue()) : priorityQueueStateType; + } + + /** +* Sets the type of the priority queue state. It will fallback to the default value, if it is not explicitly set. +*/ + public void setPriorityQueueStateType(PriorityQueueStateType priorityQueue
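The FLINK-11193 change replaces an eagerly-assigned `final` field with a nullable one, so that "never set" can be distinguished from "explicitly set", and the getter resolves the default lazily. The shape of that pattern, with illustrative names (a plain enum stands in for Flink's `PriorityQueueStateType`, and `HEAP` stands in for the config-derived default):

```java
// Sketch of the nullable-field / lazy-default pattern from FLINK-11193.
class TimerServiceConfig {
    enum PriorityQueueStateType { HEAP, ROCKSDB }

    // null means "not explicitly set"; the real code resolves the default
    // from the TIMER_SERVICE_FACTORY config option instead of a constant.
    private PriorityQueueStateType priorityQueueStateType;

    void setPriorityQueueStateType(PriorityQueueStateType type) {
        this.priorityQueueStateType = type;
    }

    /** Falls back to the default only when nothing was set explicitly. */
    PriorityQueueStateType getPriorityQueueStateType() {
        return priorityQueueStateType == null ? PriorityQueueStateType.HEAP : priorityQueueStateType;
    }

    /** Mirrors the copy-constructor logic: an explicit setting wins over the config value. */
    static PriorityQueueStateType resolve(PriorityQueueStateType original, PriorityQueueStateType fromConfig) {
        return original == null ? fromConfig : original;
    }
}
```

Because the field stays `null` until someone calls the setter, a per-job configuration value is no longer silently overwritten by the constructor's hard-coded default, which was the bug being fixed.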
[flink] branch release-1.10 updated: [FLINK-11193][State] Fix Rockdb timer service factory configuration option to be settable per job
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 9310b26 [FLINK-11193][State] Fix Rockdb timer service factory configuration option to be settable per job 9310b26 is described below commit 9310b2672d89c392d25edc33755e291dac4b7398 Author: Aitozi <1059789...@qq.com> AuthorDate: Wed Sep 4 22:59:09 2019 +0800 [FLINK-11193][State] Fix Rockdb timer service factory configuration option to be settable per job This closes #8479. --- .../streaming/state/RocksDBStateBackend.java | 32 +++-- .../state/RocksDBStateBackendConfigTest.java | 33 ++ 2 files changed, 57 insertions(+), 8 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index f071104..873f57d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -153,7 +153,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu private final RocksDBMemoryConfiguration memoryConfiguration; /** This determines the type of priority queue state. */ - private final PriorityQueueStateType priorityQueueStateType; + @Nullable + private PriorityQueueStateType priorityQueueStateType; /** The default rocksdb metrics options. 
*/ private final RocksDBNativeMetricOptions defaultMetricOptions; @@ -274,8 +275,6 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend); this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; this.numberOfTransferThreads = UNDEFINED_NUMBER_OF_TRANSFER_THREADS; - // use RocksDB-based implementation as default from FLINK-15637 - this.priorityQueueStateType = PriorityQueueStateType.ROCKSDB; this.defaultMetricOptions = new RocksDBNativeMetricOptions(); this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED; this.memoryConfiguration = new RocksDBMemoryConfiguration(); @@ -333,10 +332,11 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu this.memoryConfiguration = RocksDBMemoryConfiguration.fromOtherAndConfiguration(original.memoryConfiguration, config); this.memoryConfiguration.validate(); - final String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY); - - this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ? - PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType; + if (null == original.priorityQueueStateType) { + this.priorityQueueStateType = config.getEnum(PriorityQueueStateType.class, TIMER_SERVICE_FACTORY); + } else { + this.priorityQueueStateType = original.priorityQueueStateType; + } // configure local directories if (original.localRocksDbDirectories != null) { @@ -533,7 +533,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu keyGroupRange, executionConfig, localRecoveryConfig, - priorityQueueStateType, + getPriorityQueueStateType(), ttlTimeProvider, metricGroup, stateHandles, @@ -770,6 +770,22 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu enableTtlCompactionFilter = TernaryBoolean.FALSE; } + /** +* Gets the type of the priority queue state. 
It will fallback to the default value, if it is not explicitly set. +* @return The type of the priority queue state. +*/ + public PriorityQueueStateType getPriorityQueueStateType() { + return priorityQueueStateType == null ? + PriorityQueueStateType.valueOf(TIMER_SERVICE_FACTORY.defaultValue()) : priorityQueueStateType; + } + + /** +* Sets the type of the priority queue state. It will fallback to the default value, if it is not expli
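The nullable-field-with-fallback pattern in the diff above (an unset field defers first to the per-job configuration, then to the option's default) can be sketched in isolation. This is a minimal, hypothetical sketch — the enum, the default, and the plain `Map`-based configuration stand in for Flink's `RocksDBStateBackend`, `TIMER_SERVICE_FACTORY` option, and `Configuration` class; only the config key `state.backend.rocksdb.timer-service.factory` is taken from Flink itself.

```java
import java.util.Map;

public class TimerServiceFallback {
    // Illustrative stand-in for Flink's PriorityQueueStateType.
    enum PriorityQueueStateType { HEAP, ROCKSDB }

    // Stand-in for TIMER_SERVICE_FACTORY.defaultValue(); the actual default
    // depends on the Flink version (HEAP before FLINK-15637, ROCKSDB after).
    static final PriorityQueueStateType DEFAULT_TYPE = PriorityQueueStateType.ROCKSDB;

    // null means "not explicitly set by the user", which is what lets the
    // per-job configuration (and finally the default) take effect.
    private PriorityQueueStateType priorityQueueStateType;

    public void setPriorityQueueStateType(PriorityQueueStateType type) {
        this.priorityQueueStateType = type;
    }

    // Apply a per-job configuration: it only fills the field when the user
    // has not already set it programmatically — the fix made by this commit.
    public void configure(Map<String, String> config) {
        if (priorityQueueStateType == null) {
            String value = config.get("state.backend.rocksdb.timer-service.factory");
            if (value != null) {
                priorityQueueStateType = PriorityQueueStateType.valueOf(value.toUpperCase());
            }
        }
    }

    // The getter falls back to the default only when nothing was set anywhere,
    // mirroring the new getPriorityQueueStateType() in the diff.
    public PriorityQueueStateType getPriorityQueueStateType() {
        return priorityQueueStateType == null ? DEFAULT_TYPE : priorityQueueStateType;
    }
}
```

A programmatic setter call wins over the per-job config, and the per-job config wins over the default — the precedence the old `final` field could not express.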
[flink] branch release-1.8 updated: [FLINK-11193][State] Fix Rockdb timer service factory configuration option to be settable per job
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.8 by this push: new 60d9b96 [FLINK-11193][State] Fix Rockdb timer service factory configuration option to be settable per job 60d9b96 is described below commit 60d9b96456f142f8d18d5882016840a00159403e Author: Aitozi <1059789...@qq.com> AuthorDate: Wed Sep 4 22:59:09 2019 +0800 [FLINK-11193][State] Fix Rockdb timer service factory configuration option to be settable per job This closes #8479. --- .../streaming/state/RocksDBStateBackend.java | 32 +++- .../state/RocksDBStateBackendConfigTest.java | 34 ++ 2 files changed, 58 insertions(+), 8 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index c5604e5..cfac3b2 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -147,7 +147,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu private TernaryBoolean enableTtlCompactionFilter; /** This determines the type of priority queue state. */ - private final PriorityQueueStateType priorityQueueStateType; + @Nullable + private PriorityQueueStateType priorityQueueStateType; /** The default rocksdb metrics options. 
*/ private final RocksDBNativeMetricOptions defaultMetricOptions; @@ -265,8 +266,6 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend); this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; this.numberOfTransferingThreads = UNDEFINED_NUMBER_OF_TRANSFERING_THREADS; - // for now, we use still the heap-based implementation as default - this.priorityQueueStateType = PriorityQueueStateType.HEAP; this.defaultMetricOptions = new RocksDBNativeMetricOptions(); this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED; } @@ -314,10 +313,11 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu this.enableTtlCompactionFilter = original.enableTtlCompactionFilter .resolveUndefined(config.getBoolean(TTL_COMPACT_FILTER_ENABLED)); - final String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY); - - this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ? - PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType; + if (null == original.priorityQueueStateType) { + this.priorityQueueStateType = config.getEnum(PriorityQueueStateType.class, TIMER_SERVICE_FACTORY); + } else { + this.priorityQueueStateType = original.priorityQueueStateType; + } // configure local directories if (original.localRocksDbDirectories != null) { @@ -507,7 +507,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu keyGroupRange, executionConfig, localRecoveryConfig, - priorityQueueStateType, + getPriorityQueueStateType(), ttlTimeProvider, metricGroup, stateHandles, @@ -716,6 +716,22 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu enableTtlCompactionFilter = TernaryBoolean.TRUE; } + /** +* Gets the type of the priority queue state. It will fallback to the default value, if it is not explicitly set. 
+* @return The type of the priority queue state. +*/ + public PriorityQueueStateType getPriorityQueueStateType() { + return priorityQueueStateType == null ? + PriorityQueueStateType.valueOf(TIMER_SERVICE_FACTORY.defaultValue()) : priorityQueueStateType; + } + + /** +* Sets the type of the priority queue state. It will fallback to the default value, if it is not explicitly set. +*/ + public void setPriorityQueueStateType(PriorityQueueStateType priorityQueue
[flink] 02/04: [FLINK-16111][k8s] Fix Kubernetes deployment not respecting 'taskmanager.cpu.cores'.
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit ac2aaf9277333a6d8ac5aa1c0c81189f56e6ffd4 Author: Xintong Song AuthorDate: Mon Feb 17 15:43:16 2020 +0800 [FLINK-16111][k8s] Fix Kubernetes deployment not respecting 'taskmanager.cpu.cores'. This closes #0. --- .../kubernetes/KubernetesResourceManager.java | 3 ++- .../kubernetes/KubernetesResourceManagerTest.java | 28 ++ .../MesosTaskManagerParameters.java| 4 ++-- .../clusterframework/TaskExecutorProcessUtils.java | 5 4 files changed, 37 insertions(+), 3 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java index 4c94a28..fdf3afc 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java @@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.entrypoint.ClusterInformation; @@ -338,6 +339,6 @@ public class KubernetesResourceManager extends ActiveResourceManager MESOS_RM_TASKS_CPUS = key("mesos.resourcemanager.tasks.cpus") + .doubleType() .defaultValue(0.0) .withDescription("CPUs to assign to the Mesos workers."); @@ -424,8 +425,7 @@ public class MesosTaskManagerParameters { } private static double getCpuCores(final 
Configuration configuration) { - double fallback = configuration.getDouble(MESOS_RM_TASKS_CPUS); - return TaskExecutorProcessUtils.getCpuCoresWithFallback(configuration, fallback).getValue().doubleValue(); + return TaskExecutorProcessUtils.getCpuCoresWithFallbackConfigOption(configuration, MESOS_RM_TASKS_CPUS); } private static MemorySize getTotalProcessMemory(final Configuration configuration) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java index 917c575..9b7ae12 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java @@ -644,6 +644,11 @@ public class TaskExecutorProcessUtils { return getCpuCoresWithFallback(config, -1.0); } + public static double getCpuCoresWithFallbackConfigOption(final Configuration config, ConfigOption fallbackOption) { + double fallbackValue = config.getDouble(fallbackOption); + return TaskExecutorProcessUtils.getCpuCoresWithFallback(config, fallbackValue).getValue().doubleValue(); + } + public static CPUResource getCpuCoresWithFallback(final Configuration config, double fallback) { final double cpuCores; if (config.contains(TaskManagerOptions.CPU_CORES)) {
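The fallback order that `getCpuCoresWithFallbackConfigOption` establishes — the common `taskmanager.cpu.cores` option first, then the deployment-specific option (YARN vcores, Mesos cpus, Kubernetes cpu), then the number of task slots — can be sketched roughly as below. This is a simplified illustration using a plain `Map` in place of Flink's `Configuration`; the key names follow Flink's options but the helper itself is hypothetical.

```java
import java.util.Map;

public class CpuCoresFallback {
    // Simplified sketch of the fallback order applied by the patch above:
    // 1. the common option 'taskmanager.cpu.cores' wins if present,
    // 2. otherwise the deployment-specific fallback option,
    // 3. otherwise the number of task slots (one core per slot).
    static double getCpuCores(Map<String, Double> config, String fallbackKey) {
        Double common = config.get("taskmanager.cpu.cores");
        if (common != null) {
            return common;
        }
        Double fallback = config.get(fallbackKey);
        if (fallback != null && fallback > 0.0) {
            return fallback;
        }
        return config.getOrDefault("taskmanager.numberOfTaskSlots", 1.0);
    }
}
```

The YARN test cases added in this change (`testGetCpuCoresCommonOption`, `testGetCpuCoresYarnOption`, `testGetCpuCoresNumSlots`) exercise exactly these three branches.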
[flink] branch release-1.10 updated (543e24c -> 82f6dc2)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git. from 543e24c [FLINK-16013][core] Make complex type config options could be parsed correctly new ab3e390 [hotfix] Minor clean-up in TaskExecutorProcessUtils. new ac2aaf9 [FLINK-16111][k8s] Fix Kubernetes deployment not respecting 'taskmanager.cpu.cores'. new 8858d75 [hotfix][yarn][test] Add test cases for validating Yarn deployment respecting the cpu configuration fallback order. new 82f6dc2 [hotfix][mesos][test] Update test cases for validating Mesos deployment respecting the cpu configuration fallback order. The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../kubernetes/KubernetesResourceManager.java | 3 +- .../kubernetes/KubernetesResourceManagerTest.java | 28 +++ .../MesosTaskManagerParameters.java| 4 +- .../MesosTaskManagerParametersTest.java| 3 ++ .../clusterframework/TaskExecutorProcessUtils.java | 11 +++-- .../apache/flink/yarn/YarnResourceManagerTest.java | 55 ++ 6 files changed, 96 insertions(+), 8 deletions(-)
[flink] 01/04: [hotfix] Minor clean-up in TaskExecutorProcessUtils.
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit ab3e3906c5f362a539dd7a6fe2f400dfab32a93e Author: Xintong Song AuthorDate: Mon Feb 24 19:01:46 2020 +0800 [hotfix] Minor clean-up in TaskExecutorProcessUtils. --- .../flink/runtime/clusterframework/TaskExecutorProcessUtils.java | 8 ++-- 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java index 31b65d5..917c575 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java @@ -640,15 +640,11 @@ public class TaskExecutorProcessUtils { } } - public static CPUResource getCpuCoresWithFallback(final Configuration config, double fallback) { - return getCpuCores(config, fallback); - } - private static CPUResource getCpuCores(final Configuration config) { - return getCpuCores(config, -1.0); + return getCpuCoresWithFallback(config, -1.0); } - private static CPUResource getCpuCores(final Configuration config, double fallback) { + public static CPUResource getCpuCoresWithFallback(final Configuration config, double fallback) { final double cpuCores; if (config.contains(TaskManagerOptions.CPU_CORES)) { cpuCores = config.getDouble(TaskManagerOptions.CPU_CORES);
[flink] 03/04: [hotfix][yarn][test] Add test cases for validating Yarn deployment respecting the cpu configuration fallback order.
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit 8858d75697ba4a3e888b8a86981b4bb6e1bd558f Author: Xintong Song AuthorDate: Mon Feb 17 19:51:58 2020 +0800 [hotfix][yarn][test] Add test cases for validating Yarn deployment respecting the cpu configuration fallback order. --- .../apache/flink/yarn/YarnResourceManagerTest.java | 55 ++ 1 file changed, 55 insertions(+) diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index 2798076..54f26b6 100755 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -21,6 +21,7 @@ package org.apache.flink.yarn; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; @@ -56,6 +57,7 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; import org.apache.flink.util.function.RunnableWithException; +import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList; @@ -526,6 +528,59 @@ public class YarnResourceManagerTest extends TestLogger { }}; } + @Test + public void testGetCpuCoresCommonOption() throws Exception { + final Configuration configuration = new Configuration(); + configuration.setDouble(TaskManagerOptions.CPU_CORES, 1.0); + configuration.setInteger(YarnConfigOptions.VCORES, 2); + 
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3); + + new Context() {{ + runTest(() -> assertThat(resourceManager.getCpuCores(configuration), is(1.0))); + }}; + } + + @Test + public void testGetCpuCoresYarnOption() throws Exception { + final Configuration configuration = new Configuration(); + configuration.setInteger(YarnConfigOptions.VCORES, 2); + configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3); + + new Context() {{ + runTest(() -> assertThat(resourceManager.getCpuCores(configuration), is(2.0))); + }}; + } + + @Test + public void testGetCpuCoresNumSlots() throws Exception { + final Configuration configuration = new Configuration(); + configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3); + + new Context() {{ + runTest(() -> assertThat(resourceManager.getCpuCores(configuration), is(3.0))); + }}; + } + + @Test + public void testGetCpuRoundUp() throws Exception { + final Configuration configuration = new Configuration(); + configuration.setDouble(TaskManagerOptions.CPU_CORES, 0.5); + + new Context() {{ + runTest(() -> assertThat(resourceManager.getCpuCores(configuration), is(1.0))); + }}; + } + + @Test(expected = IllegalConfigurationException.class) + public void testGetCpuExceedMaxInt() throws Exception { + final Configuration configuration = new Configuration(); + configuration.setDouble(TaskManagerOptions.CPU_CORES, Double.MAX_VALUE); + + new Context() {{ + resourceManager.getCpuCores(configuration); + }}; + } + private void registerSlotRequest( TestingYarnResourceManager resourceManager, MockResourceManagerRuntimeServices rmServices,
[flink] 04/04: [hotfix][mesos][test] Update test cases for validating Mesos deployment respecting the cpu configuration fallback order.
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit 82f6dc2dbcf16f0cd7b28d562c2e2d75b3060eb3 Author: Xintong Song AuthorDate: Mon Feb 17 19:57:43 2020 +0800 [hotfix][mesos][test] Update test cases for validating Mesos deployment respecting the cpu configuration fallback order. --- .../mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java index c91..5792f77 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java @@ -232,6 +232,8 @@ public class MesosTaskManagerParametersTest extends TestLogger { public void testConfigCpuCores() { Configuration config = getConfiguration(); config.setDouble(TaskManagerOptions.CPU_CORES, 1.5); + config.setDouble(MesosTaskManagerParameters.MESOS_RM_TASKS_CPUS, 2.5); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3); MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(config); assertThat(mesosTaskManagerParameters.cpus(), is(1.5)); } @@ -240,6 +242,7 @@ public class MesosTaskManagerParametersTest extends TestLogger { public void testLegacyConfigCpuCores() { Configuration config = getConfiguration(); config.setDouble(MesosTaskManagerParameters.MESOS_RM_TASKS_CPUS, 1.5); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3); MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(config); assertThat(mesosTaskManagerParameters.cpus(), is(1.5)); }
[flink] branch master updated (93da5ec -> 74fc20a)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 93da5ec [FLINK-16263][python][tests] Set io.netty.tryReflectionSetAccessible to true for JDK9+ add 35f967c [hotfix] Fix minor IDE warnings in MemoryUtils add 74fc20a [FLINK-15094] Use Unsafe to instantiate and construct DirectByteBuffer No new revisions were added by this update. Summary of changes: .../flink/core/memory/HybridMemorySegment.java | 51 ++- .../org/apache/flink/core/memory/MemoryUtils.java | 76 ++ 2 files changed, 54 insertions(+), 73 deletions(-)
[flink] 01/02: [hotfix] Fix minor IDE warnings in MemoryUtils
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit 9ccbc1fb0584597f0775519ad87472f3667a95f0 Author: Andrey Zagrebin AuthorDate: Thu Feb 20 09:56:01 2020 +0100 [hotfix] Fix minor IDE warnings in MemoryUtils --- .../src/main/java/org/apache/flink/core/memory/MemoryUtils.java | 6 ++ 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java index fcdf427..c807145 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java @@ -33,7 +33,7 @@ import java.nio.ByteOrder; public class MemoryUtils { /** The "unsafe", which can be used to perform native memory accesses. */ - @SuppressWarnings("restriction") + @SuppressWarnings({"restriction", "UseOfSunClasses"}) public static final sun.misc.Unsafe UNSAFE = getUnsafe(); /** The native byte order of the platform on which the system currently runs. 
*/ @@ -50,7 +50,7 @@ public class MemoryUtils { } catch (SecurityException e) { throw new Error("Could not access the sun.misc.Unsafe handle, permission denied by security manager.", e); } catch (NoSuchFieldException e) { - throw new Error("The static handle field in sun.misc.Unsafe was not found."); + throw new Error("The static handle field in sun.misc.Unsafe was not found.", e); } catch (IllegalArgumentException e) { throw new Error("Bug: Illegal argument reflection access for static field.", e); } catch (IllegalAccessException e) { @@ -64,7 +64,6 @@ public class MemoryUtils { private MemoryUtils() {} private static Constructor getDirectBufferPrivateConstructor() { - //noinspection OverlyBroadCatchBlock try { Constructor constructor = ByteBuffer.allocateDirect(1).getClass().getDeclaredConstructor(long.class, int.class); @@ -107,7 +106,6 @@ public class MemoryUtils { * @param address address of the unsafe memory to release * @return action to run to release the unsafe memory manually */ - @SuppressWarnings("UseOfSunClasses") static Runnable createMemoryGcCleaner(Object owner, long address) { return JavaGcCleanerWrapper.create(owner, () -> releaseUnsafe(address)); }
[flink] 02/02: [FLINK-15094] Use Unsafe to instantiate and construct DirectByteBuffer
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit 02255ed8996a49ce53c605cbc5850921e2d52f6d Author: Andrey Zagrebin AuthorDate: Thu Feb 20 10:12:16 2020 +0100 [FLINK-15094] Use Unsafe to instantiate and construct DirectByteBuffer If we use reflection to create a DirectByteBuffer to wrap unsafe native memory allocations, it causes illegal access warnings in Java 9+. This PR changes this to use Unsafe to instantiate a DirectByteBuffer. The address and capacity fields are set by direct unsafe memory operations. Other fields are set by calling ByteBuffer#clear at the end. Unsafe operations skip the illegal access verification and do not result in warnings. This solution still relies on Unsafe, which is about to be removed in future Java releases. If it is removed and we still do not want to contribute to direct memory by allocating native managed memory, we will have to find alternative solutions, e.g. writing a custom native allocator and using the JNI API to instantiate the wrapping DirectByteBuffer (NewDirectByteBuffer in C).
This closes #11160 --- .../flink/core/memory/HybridMemorySegment.java | 51 ++-- .../org/apache/flink/core/memory/MemoryUtils.java | 70 +++--- 2 files changed, 52 insertions(+), 69 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java index fbb9837..1693e9a 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java @@ -26,12 +26,13 @@ import javax.annotation.Nullable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.lang.reflect.Field; import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.ReadOnlyBufferException; +import static org.apache.flink.core.memory.MemoryUtils.getByteBufferAddress; + /** * This class represents a piece of memory managed by Flink. * @@ -74,7 +75,7 @@ public final class HybridMemorySegment extends MemorySegment { * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct. 
*/ HybridMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner, @Nullable Runnable cleaner) { - super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner); + super(getByteBufferAddress(buffer), buffer.capacity(), owner); this.offHeapBuffer = buffer; this.cleaner = cleaner; } @@ -310,7 +311,7 @@ public final class HybridMemorySegment extends MemorySegment { if (target.isDirect()) { // copy to the target memory directly - final long targetPointer = getAddress(target) + targetOffset; + final long targetPointer = getByteBufferAddress(target) + targetOffset; final long sourcePointer = address + offset; if (sourcePointer <= addressLimit - numBytes) { @@ -354,7 +355,7 @@ public final class HybridMemorySegment extends MemorySegment { if (source.isDirect()) { // copy to the target memory directly - final long sourcePointer = getAddress(source) + sourceOffset; + final long sourcePointer = getByteBufferAddress(source) + sourceOffset; final long targetPointer = address + offset; if (targetPointer <= addressLimit - numBytes) { @@ -383,46 +384,4 @@ public final class HybridMemorySegment extends MemorySegment { } } } - - // - // Utilities for native memory accesses and checks - // - - /** -* The reflection fields with which we access the off-heap pointer from direct ByteBuffers. -*/ - private static final Field ADDRESS_FIELD; - - static { - try { - ADDRESS_FIELD = java.nio.Buffer.class.getDeclaredField("address"); - ADDRESS_FIELD.setAccessible(true); - } - catch (Throwable t) { - throw new RuntimeException( - "Cannot initialize HybridMemorySegment: off-heap memory is incompatible with this JVM.", t); - } -
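The technique the commit message describes — instantiate a `DirectByteBuffer` with `Unsafe.allocateInstance`, write its `address` and `capacity` fields by offset, and finish with `clear()` — can be sketched as below. This is a hedged illustration of the approach, not Flink's actual `MemoryUtils` code; in the real implementation the field offsets are resolved once in static initializers. It deliberately avoids `setAccessible`, which is what triggers the illegal-access warnings on Java 9+.

```java
import java.nio.Buffer;
import java.nio.ByteBuffer;

import sun.misc.Unsafe;

public class UnsafeDirectBufferSketch {

    // Wraps already-allocated native memory in a DirectByteBuffer without any
    // reflective setAccessible call, so no illegal-access warning on Java 9+.
    static ByteBuffer wrapUnsafeMemory(Unsafe unsafe, long address, int capacity) throws Exception {
        // DirectByteBuffer is package-private in java.nio; obtain its class
        // from a throwaway direct buffer.
        Class<?> directBufferClass = ByteBuffer.allocateDirect(1).getClass();

        // Instantiate without running any constructor.
        ByteBuffer buffer = (ByteBuffer) unsafe.allocateInstance(directBufferClass);

        // 'address' and 'capacity' are declared on java.nio.Buffer; set them
        // by direct unsafe memory operations (no accessibility check involved).
        unsafe.putLong(buffer, unsafe.objectFieldOffset(Buffer.class.getDeclaredField("address")), address);
        unsafe.putInt(buffer, unsafe.objectFieldOffset(Buffer.class.getDeclaredField("capacity")), capacity);

        // clear() initializes position, limit and mark to a consistent state.
        buffer.clear();
        return buffer;
    }
}
```

Note that, as the commit message itself warns, this still depends on `sun.misc.Unsafe` and may stop working in future Java releases.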
[flink] branch release-1.10 updated (82f6dc2 -> 02255ed8)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git. from 82f6dc2 [hotfix][mesos][test] Update test cases for validating Mesos deployment respecting the cpu configuration fallback order. new 9ccbc1f [hotfix] Fix minor IDE warnings in MemoryUtils new 02255ed8 [FLINK-15094] Use Unsafe to instantiate and construct DirectByteBuffer The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../flink/core/memory/HybridMemorySegment.java | 51 ++- .../org/apache/flink/core/memory/MemoryUtils.java | 76 ++ 2 files changed, 54 insertions(+), 73 deletions(-)
[flink] branch master updated (bd5901f -> 05bddfb)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from bd5901f [FLINK-14701][runtime] Fix MultiTaskSlot to not remove slots which are not its children add 05bddfb [FLINK-15741][docs][TTL] Fix TTL docs after enabling RocksDB compaction filter by default No new revisions were added by this update. Summary of changes: docs/dev/stream/state/state.md | 15 ++- 1 file changed, 6 insertions(+), 9 deletions(-)
[flink] branch release-1.10 updated (27d4cc4 -> 52a7ac9)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git. from 27d4cc4 [FLINK-14701][runtime] Fix MultiTaskSlot to not remove slots which are not its children add 52a7ac9 [FLINK-15741][docs][TTL] Fix TTL docs after enabling RocksDB compaction filter by default No new revisions were added by this update. Summary of changes: docs/dev/stream/state/state.md | 15 ++- 1 file changed, 6 insertions(+), 9 deletions(-)
[flink] branch release-1.9 updated (e4eb303 -> df24a9f)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git. from e4eb303 [FLINK-15863][docs] Fix docs stating that savepoints are relocatable add df24a9f [FLINK-15143][docs] Add new memory configuration guide before FLIP-49 No new revisions were added by this update. Summary of changes: docs/fig/mem_model.svg | 21 +++ docs/ops/cli.md | 2 +- docs/ops/cli.zh.md | 2 +- docs/ops/config.md | 4 +- docs/ops/config.zh.md | 4 +- docs/ops/mem_setup.md | 129 docs/ops/production_ready.md| 2 +- docs/ops/production_ready.zh.md | 2 +- docs/ops/python_shell.md| 2 +- docs/ops/python_shell.zh.md | 2 +- docs/ops/scala_shell.md | 2 +- docs/ops/scala_shell.zh.md | 2 +- docs/ops/security-ssl.md| 2 +- docs/ops/security-ssl.zh.md | 2 +- docs/ops/upgrading.md | 2 +- docs/ops/upgrading.zh.md| 2 +- 16 files changed, 166 insertions(+), 16 deletions(-) create mode 100644 docs/fig/mem_model.svg create mode 100644 docs/ops/mem_setup.md
[flink] branch master updated (b902ed5 -> 0cb8834)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from b902ed5 [FLINK-15918][runtime] Fix that 'uptime' metric is not reset after restart add 01a9ff9 [FLINK-15143][docs] Add new memory configuration guide for FLIP-49 add d4aa8fa [FLINK-15143][docs] Add tuning and troubleshooting guides for memory configuration add 0cb8834 [FLINK-15143][docs] Add migration guide from pre-FLIP-49 memory config No new revisions were added by this update. Summary of changes: docs/fig/detailed-mem-model.svg | 21 +++ docs/fig/simple_mem_model.svg | 21 +++ docs/ops/cli.md | 2 +- docs/ops/cli.zh.md | 2 +- docs/ops/config.md | 2 +- docs/ops/config.zh.md | 2 +- docs/ops/deployment/cluster_setup.md| 2 +- docs/ops/deployment/cluster_setup.zh.md | 2 +- docs/ops/memory/index.md| 24 docs/ops/memory/index.zh.md | 24 docs/ops/memory/mem_detail.md | 149 docs/ops/memory/mem_detail.zh.md| 149 docs/ops/memory/mem_migration.md| 236 docs/ops/memory/mem_migration.zh.md | 236 docs/ops/memory/mem_setup.md| 133 ++ docs/ops/memory/mem_setup.zh.md | 133 ++ docs/ops/memory/mem_trouble.md | 74 ++ docs/ops/memory/mem_trouble.zh.md | 74 ++ docs/ops/memory/mem_tuning.md | 87 docs/ops/memory/mem_tuning.zh.md| 87 docs/ops/plugins.md | 2 +- docs/ops/plugins.zh.md | 2 +- docs/ops/production_ready.md| 2 +- docs/ops/production_ready.zh.md | 2 +- docs/ops/python_shell.md| 2 +- docs/ops/python_shell.zh.md | 2 +- docs/ops/scala_shell.md | 2 +- docs/ops/scala_shell.zh.md | 2 +- docs/ops/security-ssl.md| 2 +- docs/ops/security-ssl.zh.md | 2 +- docs/ops/state/state_backends.md| 7 + docs/ops/state/state_backends.zh.md | 5 + docs/ops/upgrading.md | 2 +- docs/ops/upgrading.zh.md| 2 +- docs/release-notes/flink-1.10.md| 2 + docs/release-notes/flink-1.10.zh.md | 2 + 36 files changed, 1482 insertions(+), 18 deletions(-) create mode 100644 docs/fig/detailed-mem-model.svg create mode 100644 
docs/fig/simple_mem_model.svg create mode 100644 docs/ops/memory/index.md create mode 100644 docs/ops/memory/index.zh.md create mode 100644 docs/ops/memory/mem_detail.md create mode 100644 docs/ops/memory/mem_detail.zh.md create mode 100644 docs/ops/memory/mem_migration.md create mode 100644 docs/ops/memory/mem_migration.zh.md create mode 100644 docs/ops/memory/mem_setup.md create mode 100644 docs/ops/memory/mem_setup.zh.md create mode 100644 docs/ops/memory/mem_trouble.md create mode 100644 docs/ops/memory/mem_trouble.zh.md create mode 100644 docs/ops/memory/mem_tuning.md create mode 100644 docs/ops/memory/mem_tuning.zh.md
[flink] 01/03: [FLINK-15143][docs] Add new memory configuration guide for FLIP-49
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit ff7e25cad70a74da1795c69fd651d788d5972f1c Author: Andrey Zagrebin AuthorDate: Fri Jan 31 12:51:30 2020 +0100 [FLINK-15143][docs] Add new memory configuration guide for FLIP-49 --- docs/fig/detailed-mem-model.svg | 21 + docs/fig/simple_mem_model.svg | 21 + docs/ops/cli.md | 2 +- docs/ops/cli.zh.md | 2 +- docs/ops/config.md | 2 +- docs/ops/config.zh.md | 2 +- docs/ops/memory/index.md| 24 ++ docs/ops/memory/index.zh.md | 24 ++ docs/ops/memory/mem_detail.md | 149 docs/ops/memory/mem_detail.zh.md| 149 docs/ops/memory/mem_setup.md| 132 docs/ops/memory/mem_setup.zh.md | 132 docs/ops/plugins.md | 2 +- docs/ops/plugins.zh.md | 2 +- docs/ops/production_ready.md| 2 +- docs/ops/production_ready.zh.md | 2 +- docs/ops/python_shell.md| 2 +- docs/ops/python_shell.zh.md | 2 +- docs/ops/scala_shell.md | 2 +- docs/ops/scala_shell.zh.md | 2 +- docs/ops/security-ssl.md| 2 +- docs/ops/security-ssl.zh.md | 2 +- docs/ops/upgrading.md | 2 +- docs/ops/upgrading.zh.md| 2 +- docs/release-notes/flink-1.10.md| 2 + docs/release-notes/flink-1.10.zh.md | 2 + 26 files changed, 672 insertions(+), 16 deletions(-) diff --git a/docs/fig/detailed-mem-model.svg b/docs/fig/detailed-mem-model.svg new file mode 100644 index 000..215d99a --- /dev/null +++ b/docs/fig/detailed-mem-model.svg @@ -0,0 +1,21 @@ + + + +http://www.w3.org/1999/xlink; xmlns="http://www.w3.org/2000/svg;> + + +http://www.w3.org/1999/xlink; xmlns="http://www.w3.org/2000/svg;>http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
diff --git a/docs/ops/memory/index.zh.md b/docs/ops/memory/index.zh.md
new file mode 100644
index 000..e2190e1
--- /dev/null
+++ b/docs/ops/memory/index.zh.md
@@ -0,0 +1,24 @@
+---
+nav-title: 'Memory Configuration'
+nav-id: ops_mem
+nav-parent_id: ops
+nav-pos: 5
+---

diff --git a/docs/ops/memory/mem_detail.md b/docs/ops/memory/mem_detail.md
new file mode 100644
index 000..a5b476f
--- /dev/null
+++ b/docs/ops/memory/mem_detail.md
@@ -0,0 +1,149 @@
+---
+title: "Detailed Memory Model"
+nav-parent_id: ops_mem
+nav-pos: 2
+---
+
+This section gives a detailed description of all components in the memory model of Flink's task executor.
+Check the [memory configuration guide](mem_setup.html) for the basic memory setup.
+
+* toc
+{:toc}
+
+## Overview
+
+(figure: detailed memory model; image markup elided)
+
+The following table lists all memory components, depicted above, and references the Flink configuration options
+which affect the size of the respective components:
+
+| **Component** | **Configuration options** | **Description** [...]
+| :------------ | :------------------------ | :-------------- [...]
+| [Framework Heap Memory](#framework-memory) | [`taskmanager.memory.framework.heap.size`](../config.html#taskmanager-memory-framework-heap-size) | JVM heap memory dedicated to the Flink framework (advanced option) [...]
+| [Task Heap Memory](mem_setup.html#task-operator-heap-memory) | [`taskmanager
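The advanced options referenced in the component table above end up in `flink-conf.yaml`. A minimal sketch, with illustrative sizes that are assumptions of this example rather than recommended values:

```yaml
# Illustrative sizes only. The framework heap is an advanced option and
# normally does not need to be changed.
taskmanager.memory.framework.heap.size: 128m

# JVM heap reserved for user task code.
taskmanager.memory.task.heap.size: 1024m
```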
[flink] 03/03: [FLINK-15143][docs] Add migration guide from pre-FLIP-49 memory config
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1212f662cd201af44f4b6507f789488d2fceb9af
Author: Andrey Zagrebin
AuthorDate: Fri Jan 31 14:01:22 2020 +0100

    [FLINK-15143][docs] Add migration guide from pre-FLIP-49 memory config
---
 docs/ops/memory/mem_migration.md    | 236
 docs/ops/memory/mem_migration.zh.md | 236
 docs/ops/memory/mem_setup.md        |   3 +-
 docs/ops/memory/mem_setup.zh.md     |   3 +-
 4 files changed, 476 insertions(+), 2 deletions(-)

diff --git a/docs/ops/memory/mem_migration.md b/docs/ops/memory/mem_migration.md
new file mode 100644
index 000..46a3f2e
--- /dev/null
+++ b/docs/ops/memory/mem_migration.md
@@ -0,0 +1,236 @@
+---
+title: "Migration Guide"
+nav-parent_id: ops_mem
+nav-pos: 5
+---
+
+The [memory setup of task managers](mem_setup.html) has changed a lot with the 1.10 release. Many configuration options
+were removed or their semantics changed. This guide will help you migrate the memory configuration from Flink
+[<= *1.9*](https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html) to >= *1.10*.
+
+* toc
+{:toc}
+
+Warning: It is important to review this guide because the legacy and the new memory configuration can
+result in different sizes of memory components. If you try to reuse your Flink configuration from versions
+older than 1.10, it can change the behavior or performance of your application, or even cause configuration failures.
+
+Note: Before version *1.10*, Flink did not require that memory-related options be set at all,
+as they all had default values.
+The [new memory configuration](mem_setup.html#configure-total-memory) requires
+that at least one subset of the following options is configured explicitly, otherwise the configuration will fail:
+* [`taskmanager.memory.flink.size`](../config.html#taskmanager-memory-flink-size)
+* [`taskmanager.memory.process.size`](../config.html#taskmanager-memory-process-size)
+* [`taskmanager.memory.task.heap.size`](../config.html#taskmanager-memory-task-heap-size) and [`taskmanager.memory.managed.size`](../config.html#taskmanager-memory-managed-size)
+
+The [default `flink-conf.yaml`](#default-configuration-in-flink-confyaml) shipped with Flink sets [`taskmanager.memory.process.size`](../config.html#taskmanager-memory-process-size)
+to make the default memory configuration consistent.
+
+This [spreadsheet](https://docs.google.com/spreadsheets/d/1mJaMkMPfDJJ-w6nMXALYmTc4XxiV30P5U7DzgwLkSoE) can also help
+to evaluate and compare the results of the legacy and the new memory computations.
+
+## Changes in Configuration Options
+
+This chapter briefly lists all changes to Flink's memory configuration options introduced with the *1.10* release.
+It also references other chapters for more details about migrating to the new configuration options.
+
+The following options are completely removed. If they are still used, they will be ignored.
+
+| Removed option | Note |
+| :--- | :--- |
+| `taskmanager.memory.fraction` | Check the description of the new option `taskmanager.memory.managed.fraction`. The new option has different semantics and the value of the deprecated option usually has to be adjusted. See also how to migrate managed memory. |
+| `taskmanager.memory.off-heap` | On-heap managed memory is no longer supported. See also how to migrate managed memory. |
+| `taskmanager.memory.preallocate` | Pre-allocation is no longer supported and managed memory is always allocated lazily. See also how to migrate managed memory. |
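As an illustration of this requirement, a minimal `flink-conf.yaml` sketch that satisfies it by setting exactly one of the listed subsets; the sizes below are illustrative assumptions, not recommended values:

```yaml
# Option 1: configure the total Flink memory explicitly (illustrative size).
taskmanager.memory.flink.size: 1280m

# Option 2 (alternative): configure the total process memory instead.
# taskmanager.memory.process.size: 1728m

# Option 3 (alternative): configure task heap and managed memory directly.
# taskmanager.memory.task.heap.size: 512m
# taskmanager.memory.managed.size: 512m
```

Setting several of these subsets at the same time can lead to conflicting size constraints, so it is usually best to pick exactly one.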
+
+The following options are deprecated, but if they are still used, they will be interpreted as new options for backwards compatibility:
+
+| Deprecated option | Interpreted as |
+| :--- | :--- |
+| `taskmanager.heap.size` | `taskmanager.memory.flink.size` for standalone deployments; `taskmanager.memory.process.size` for containerized deployments. See also how to migrate total memory. |
+| `taskmanager.memory.size` | `taskmanager.memory.managed.size`, see also how to migrate managed memory. |
+| `taskmanager.network.memory.min` | `taskmanager.memory.network.min` |
+| `taskmanager.network.memory.max` | `taskmanager.memory.network.max` |
+| `taskmanager.network
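The backwards-compatible interpretation of the deprecated options above can be illustrated with a before/after `flink-conf.yaml` sketch (the sizes are illustrative assumptions only):

```yaml
# Flink <= 1.9 (deprecated keys, still accepted for backwards compatibility):
# taskmanager.heap.size: 4096m
# taskmanager.network.memory.min: 64mb

# Equivalent explicit configuration for Flink >= 1.10, standalone deployment:
taskmanager.memory.flink.size: 4096m
taskmanager.memory.network.min: 64mb
```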