[flink] branch master updated (9391a70 -> 24ab7bb)

2019-08-19 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 9391a70  [FLINK-13752] Only references necessary variables when 
bookkeeping result partitions on TM
 add 24ab7bb  [FLINK-13599][e2e tests] Harden test_streaming_kinesis with 
kinesalite docker image download/run retries

No new revisions were added by this update.

Summary of changes:
 flink-end-to-end-tests/test-scripts/common.sh  |  1 +
 .../test-scripts/test_streaming_kinesis.sh | 18 +++---
 2 files changed, 16 insertions(+), 3 deletions(-)



[flink] branch release-1.9 updated: [FLINK-13599][e2e tests] Harden test_streaming_kinesis with kinesalite docker image download/run retries

2019-08-19 Thread azagrebin

azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new a893b70b [FLINK-13599][e2e tests] Harden test_streaming_kinesis with 
kinesalite docker image download/run retries
a893b70b is described below

commit a893b70b203ffbce1165fd4424b106443b4ab6c8
Author: Andrey Zagrebin 
AuthorDate: Mon Aug 12 12:02:56 2019 +0300

[FLINK-13599][e2e tests] Harden test_streaming_kinesis with kinesalite 
docker image download/run retries
---
 flink-end-to-end-tests/test-scripts/common.sh  |  1 +
 .../test-scripts/test_streaming_kinesis.sh | 18 +++---
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/common.sh 
b/flink-end-to-end-tests/test-scripts/common.sh
index 0191a73..f3c70ac 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -747,3 +747,4 @@ function retry_times() {
 echo "Command: ${command} failed ${retriesNumber} times."
 return 1
 }
+
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kinesis.sh 
b/flink-end-to-end-tests/test-scripts/test_streaming_kinesis.sh
index 8dfba83..08416a0 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_kinesis.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kinesis.sh
@@ -28,9 +28,21 @@ export AWS_SECRET_KEY=flinkKinesisTestFakeAccessKey
 
 KINESALITE_PORT=4567
 
-#docker run -d --rm --name flink-test-kinesis -p ${KINESALITE_PORT}:${KINESALITE_PORT} instructure/kinesalite
-# override entrypoint to enable SSL
-docker run -d --rm --entrypoint "/tini" --name flink-test-kinesis -p ${KINESALITE_PORT}:${KINESALITE_PORT} instructure/kinesalite -- /usr/src/app/node_modules/kinesalite/cli.js --path /var/lib/kinesalite --ssl
+function start_kinesalite {
+#docker run -d --rm --name flink-test-kinesis -p ${KINESALITE_PORT}:${KINESALITE_PORT} instructure/kinesalite
+# override entrypoint to enable SSL
+docker run -d --rm --entrypoint "/tini" \
+--name flink-test-kinesis \
+-p ${KINESALITE_PORT}:${KINESALITE_PORT} \
+instructure/kinesalite -- \
+/usr/src/app/node_modules/kinesalite/cli.js --path /var/lib/kinesalite --ssl
+}
+
+START_KINESALITE_MAX_RETRIES=50
+if ! retry_times ${START_KINESALITE_MAX_RETRIES} 0 start_kinesalite; then
+echo "Failed to run kinesalite docker image"
+exit 1
+fi
 
 # reveal potential issues with the container in the CI environment
 docker logs flink-test-kinesis



[flink-web] branch asf-site updated (b2439e9 -> 36dd318)

2019-08-22 Thread azagrebin

azagrebin pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git.


from b2439e9  Minor fix in 1.9 announcement
 new d61151b  Add a new committer with the ASF id azagrebin
 new 36dd318  Rebuild website

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 community.md   | 6 ++
 content/community.html | 6 ++
 2 files changed, 12 insertions(+)



[flink-web] 02/02: Rebuild website

2019-08-22 Thread azagrebin

azagrebin pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 36dd3180b98f55dd6d8a758ca1c1fe56752f029f
Author: Andrey Zagrebin 
AuthorDate: Thu Aug 22 15:36:21 2019 +0200

Rebuild website
---
 content/community.html | 6 ++
 1 file changed, 6 insertions(+)

diff --git a/content/community.html b/content/community.html
index 1d18937..1ed68ab 100644
--- a/content/community.html
+++ b/content/community.html
@@ -638,6 +638,12 @@
 Committer
 kurt
   
+  
+https://avatars0.githubusercontent.com/u/10573485?s=50; 
class="committer-avatar" />
+Andrey Zagrebin
+Committer
+azagrebin
+  
 
 
 



[flink-web] 01/02: Add a new committer with the ASF id azagrebin

2019-08-22 Thread azagrebin

azagrebin pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit d61151bbecea7aa9e4c44511bbe84e4d6d76caf2
Author: Andrey Zagrebin 
AuthorDate: Thu Aug 22 15:17:10 2019 +0200

Add a new committer with the ASF id azagrebin
---
 community.md | 6 ++
 1 file changed, 6 insertions(+)

diff --git a/community.md b/community.md
index 6c70099..2f4e689 100644
--- a/community.md
+++ b/community.md
@@ -450,6 +450,12 @@ Flink Forward is a conference happening yearly in 
different locations around the
 Committer
 kurt
   
+  
+https://avatars0.githubusercontent.com/u/10573485?s=50; 
class="committer-avatar">
+Andrey Zagrebin
+Committer
+azagrebin
+  
 
 
 



[flink] 01/02: [FLINK-13819][coordination] Introduce isRunning state for RpcEndpoint

2019-08-30 Thread azagrebin

azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a837659b71dd02ef6775bd2a4de331aab3ddc2e
Author: Andrey Zagrebin 
AuthorDate: Wed Aug 21 11:19:27 2019 +0200

[FLINK-13819][coordination] Introduce isRunning state for RpcEndpoint

To better reflect the lifecycle of RpcEndpoint, we suggest introducing a running state.
The non-running state can be used, for example, to decide how to react to API
calls when it is already known that the RpcEndpoint is terminating.
---
 .../org/apache/flink/runtime/rpc/RpcEndpoint.java  | 85 --
 .../flink/runtime/rpc/akka/AkkaRpcActor.java   |  4 +-
 .../apache/flink/runtime/rpc/RpcEndpointTest.java  | 72 ++
 3 files changed, 153 insertions(+), 8 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 5c14a54..7c7a4c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -56,6 +56,36 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *
  * The RPC endpoint provides {@link #runAsync(Runnable)}, {@link 
#callAsync(Callable, Time)}
  * and the {@link #getMainThreadExecutor()} to execute code in the RPC 
endpoint's main thread.
+ *
+ * Lifecycle
+ *
+ * The RPC endpoint has the following stages:
+ * 
+ *
+ *The RPC endpoint is created in a non-running state and does not serve any RPC requests.
+ *
+ *
+ *Calling the {@link #start()} method triggers the start of the RPC endpoint and schedules overridable
+ *{@link #onStart()} method call to the main thread.
+ *
+ *
+ *When the start operation ends the RPC endpoint is moved to the running state
+ *and starts to serve and complete RPC requests.
+ *
+ *
+ *Calling the {@link #closeAsync()} method triggers the termination of the RPC endpoint and schedules overridable
+ *{@link #onStop()} method call to the main thread.
+ *
+ *
+ *When {@link #onStop()} method is called, it triggers an asynchronous stop operation.
+ *The RPC endpoint is not in the running state anymore but it continues to serve RPC requests.
+ *
+ *
+ *When the asynchronous stop operation ends, the RPC endpoint terminates completely
+ *and does not serve RPC requests anymore.
+ *
+ * 
+ * The running state can be queried in a RPC method handler or in the main thread by calling {@link #isRunning()} method.
  */
 public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 
@@ -80,6 +110,13 @@ public abstract class RpcEndpoint implements RpcGateway, 
AutoCloseableAsync {
private final MainThreadExecutor mainThreadExecutor;
 
/**
+* Indicates whether the RPC endpoint is started and not stopped or being stopped.
+*
+* IMPORTANT: the running state is not thread safe and can be used only in the main thread of the rpc endpoint.
+*/
+   private boolean isRunning;
+
+   /**
 * Initializes the RPC endpoint.
 *
 * @param rpcService The RPC server that dispatches calls to this RPC 
endpoint.
@@ -112,12 +149,22 @@ public abstract class RpcEndpoint implements RpcGateway, 
AutoCloseableAsync {
return endpointId;
}
 
+   /**
+* Returns whether the RPC endpoint is started and not stopped or being stopped.
+*
+* @return whether the RPC endpoint is started and not stopped or being stopped.
+*/
+   protected boolean isRunning() {
+   validateRunsInMainThread();
+   return isRunning;
+   }
+
// 

//  Start & shutdown & lifecycle callbacks
// 

 
/**
-* Starts the rpc endpoint. This tells the underlying rpc server that 
the rpc endpoint is ready
+* Triggers start of the rpc endpoint. This tells the underlying rpc 
server that the rpc endpoint is ready
 * to process remote procedure calls.
 *
 * @throws Exception indicating that something went wrong while 
starting the RPC endpoint
@@ -127,20 +174,33 @@ public abstract class RpcEndpoint implements RpcGateway, 
AutoCloseableAsync {
}
 
/**
-* User overridable callback.
+* Internal method which is called by the RpcService implementation to 
start the RpcEndpoint.
+*
+* @throws Exception indicating that the rpc endpoint could not be 
started. If an exception occurs,
+* then the rpc 

[flink] 02/02: [FLINK-13769][Coordination] Close RM connection in TaskExecutor.onStop and do not reconnect

2019-08-30 Thread azagrebin

azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b682e9a316669f30fa4bfcaa32a8fa0d3ac1dc02
Author: Andrey Zagrebin 
AuthorDate: Mon Aug 19 16:20:39 2019 +0200

[FLINK-13769][Coordination] Close RM connection in TaskExecutor.onStop and 
do not reconnect

This prevents JM from acquiring slots which belong to the stopped TM.
---
 .../apache/flink/runtime/taskexecutor/TaskExecutor.java| 14 +++---
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index c8bcaf9..b1238ec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -340,11 +340,10 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
Throwable jobManagerDisconnectThrowable = null;
 
-   if (resourceManagerConnection != null) {
-   resourceManagerConnection.close();
-   }
-
FlinkException cause = new FlinkException("The TaskExecutor is shutting down.");
+
+   closeResourceManagerConnection(cause);
+
for (JobManagerConnection jobManagerConnection : jobManagerConnections.values()) {
try {
disassociateFromJobManager(jobManagerConnection, cause);
@@ -958,7 +957,9 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
@Override
public void disconnectResourceManager(Exception cause) {
-   reconnectToResourceManager(cause);
+   if (isRunning()) {
+   reconnectToResourceManager(cause);
+   }
}
 
// 
==
@@ -986,6 +987,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
private void reconnectToResourceManager(Exception cause) {
closeResourceManagerConnection(cause);
+   startRegistrationTimeout();
tryConnectToResourceManager();
}
 
@@ -1098,8 +1100,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
resourceManagerConnection.close();
resourceManagerConnection = null;
}
-
-   startRegistrationTimeout();
}
 
private void startRegistrationTimeout() {
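
The two commits above combine into one pattern: the endpoint tracks a running flag, and `disconnectResourceManager` reconnects only while that flag is set. The following is a minimal single-threaded sketch of that guard, not Flink's actual code. `EndpointLifecycleSketch`, its `log` list and the string causes are hypothetical stand-ins, and the real `isRunning()` additionally validates that it is called from the endpoint's main thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an RPC endpoint; this is not Flink's RpcEndpoint. */
final class EndpointLifecycleSketch {
    // In the real design this flag may only be touched from the endpoint's main thread.
    private boolean running;
    // Records what the endpoint did, so the behaviour can be inspected.
    private final List<String> log = new ArrayList<>();

    /** Moves the endpoint to the running state. */
    void start() {
        running = true;
        log.add("started");
    }

    /** Leaves the running state; requests may still arrive while stopping. */
    void stop() {
        running = false;
        log.add("stopping");
    }

    /** Reconnects only while running, mirroring the guard added in the diff. */
    void disconnectResourceManager(String cause) {
        if (running) {
            log.add("reconnect: " + cause);
        } else {
            log.add("ignored: " + cause);
        }
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        EndpointLifecycleSketch endpoint = new EndpointLifecycleSketch();
        endpoint.start();
        endpoint.disconnectResourceManager("heartbeat timeout");
        endpoint.stop();
        endpoint.disconnectResourceManager("resource manager lost");
        endpoint.log().forEach(System.out::println);
    }
}
```

A disconnect that arrives after `stop()` is recorded as ignored rather than triggering a new registration attempt, which is the effect the commit message describes as preventing the JM from acquiring slots of a stopped TM.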



[flink] branch release-1.9 updated (c175cc4 -> b682e9a)

2019-08-30 Thread azagrebin

azagrebin pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


from c175cc4  [hotfix][FLINK-13901][docs] Fix documentation links check 
errors
 new 1a83765  [FLINK-13819][coordination] Introduce isRunning state for 
RpcEndpoint
 new b682e9a  [FLINK-13769][Coordination] Close RM connection in 
TaskExecutor.onStop and do not reconnect



Summary of changes:
 .../org/apache/flink/runtime/rpc/RpcEndpoint.java  | 85 --
 .../flink/runtime/rpc/akka/AkkaRpcActor.java   |  4 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   | 14 ++--
 .../apache/flink/runtime/rpc/RpcEndpointTest.java  | 72 ++
 4 files changed, 160 insertions(+), 15 deletions(-)



[flink] branch master updated (e263856 -> aabde88)

2019-08-29 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from e263856  [FLINK-13774][table-planner-blink] Use 
LocalReferenceExpression and RexNodeExpression instead of blink expressions
 add aabde88  [FLINK-13819][coordination] Introduce isRunning state for 
RpcEndpoint

No new revisions were added by this update.

Summary of changes:
 .../org/apache/flink/runtime/rpc/RpcEndpoint.java  | 85 --
 .../flink/runtime/rpc/akka/AkkaRpcActor.java   |  4 +-
 .../apache/flink/runtime/rpc/RpcEndpointTest.java  | 72 ++
 3 files changed, 153 insertions(+), 8 deletions(-)



[flink] branch master updated (aabde88 -> fa12f2d)

2019-08-29 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from aabde88  [FLINK-13819][coordination] Introduce isRunning state for 
RpcEndpoint
 add fa12f2d  [FLINK-13769][Coordination] Close RM connection in 
TaskExecutor.onStop and do not reconnect

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/runtime/taskexecutor/TaskExecutor.java| 14 +++---
 1 file changed, 7 insertions(+), 7 deletions(-)



[flink-web] branch asf-site updated: [FLINK-13804][code style][zh] Set the initial capacity for a collection only if there is a good proven reason

2019-08-28 Thread azagrebin

azagrebin pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new e4f85d6  [FLINK-13804][code style][zh] Set the initial capacity for a 
collection only if there is a good proven reason
e4f85d6 is described below

commit e4f85d62998c08c4acb66604c4235944279a
Author: Andrey Zagrebin 
AuthorDate: Wed Aug 28 11:41:04 2019 +0200

[FLINK-13804][code style][zh] Set the initial capacity for a collection 
only if there is a good proven reason
---
 contributing/code-style-and-quality-java.zh.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/contributing/code-style-and-quality-java.zh.md 
b/contributing/code-style-and-quality-java.zh.md
index 335e889..f0de4e7 100644
--- a/contributing/code-style-and-quality-java.zh.md
+++ b/contributing/code-style-and-quality-java.zh.md
@@ -63,6 +63,7 @@ title:  "Apache Flink Code Style and Quality Guide — Java"
 * `contains()` before `get()` → `get()` and check null
 * `contains()` before `put()` → `putIfAbsent()` or `computeIfAbsent()`
 * Iterating over keys, getting values → iterate over `entrySet()`
+* **Set the initial capacity for a collection only if there is a good proven 
reason** for that, otherwise do not clutter the code. In case of **Maps** it 
can be even deluding because the Map's load factor effectively reduces the 
capacity.
 
 
 ### Lambdas
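
The rule added above says a Map's load factor effectively reduces the requested capacity. A rough sketch of the arithmetic follows, assuming the behaviour of OpenJDK's `java.util.HashMap`: the table size is rounded up to a power of two, and the map resizes once the entry count exceeds table size times load factor (0.75 by default). `MapCapacitySketch` and its methods are hypothetical helpers written for this illustration, not JDK API.

```java
/** Hypothetical helper illustrating HashMap sizing arithmetic; not part of the JDK. */
final class MapCapacitySketch {

    /** Rounds the requested capacity up to the next power of two. */
    static int tableSize(int requestedCapacity) {
        int size = 1;
        while (size < requestedCapacity) {
            size <<= 1;
        }
        return size;
    }

    /** Number of entries the map can hold before it has to resize. */
    static int resizeThreshold(int requestedCapacity, float loadFactor) {
        return (int) (tableSize(requestedCapacity) * loadFactor);
    }

    public static void main(String[] args) {
        // A map created with capacity 100 gets a table of 128 slots,
        // but it resizes as soon as it holds more than 96 entries.
        System.out.println(resizeThreshold(100, 0.75f)); // prints 96
    }
}
```

So `new HashMap<>(100)` does not hold 100 entries without resizing, which is why a hand-picked capacity can mislead the reader unless it is backed by a measured reason.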



[flink-web] branch asf-site updated: [FLINK-13812][code style] Usage of Java Optional

2019-08-28 Thread azagrebin

azagrebin pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 5088cce  [FLINK-13812][code style] Usage of Java Optional
5088cce is described below

commit 5088cce017c888514567460a7a4cc2b8915127da
Author: Andrey Zagrebin 
AuthorDate: Wed Aug 21 13:17:50 2019 +0200

[FLINK-13812][code style] Usage of Java Optional
---
 contributing/code-style-and-quality-common.md |  2 +-
 contributing/code-style-and-quality-java.md   | 11 +++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/contributing/code-style-and-quality-common.md 
b/contributing/code-style-and-quality-common.md
index 05f703b..68de3a2 100644
--- a/contributing/code-style-and-quality-common.md
+++ b/contributing/code-style-and-quality-common.md
@@ -140,7 +140,7 @@ That way, you get warnings from IntelliJ about all sections 
where you have to re
 _Note: This means that `@Nonnull` annotations are usually not necessary, but 
can be used in certain cases to override a previous annotation, or to point 
non-nullability out in a context where one would expect a nullable value._
 
 `Optional` is a good solution as a return type for method that may or may not 
have a result, so nullable return types are good candidates to be replaced with 
`Optional`. 
-For fields and parameters, `Optional` is disputed in Java and most parts of 
the Flink code case don’t use optional for fields.
+See also [usage of Java 
Optional](code-style-and-quality-java.md#java-optional).
 
 
 ### Avoid Code Duplication
diff --git a/contributing/code-style-and-quality-java.md 
b/contributing/code-style-and-quality-java.md
index e22b2c8..ef7a74c 100644
--- a/contributing/code-style-and-quality-java.md
+++ b/contributing/code-style-and-quality-java.md
@@ -66,6 +66,17 @@ title:  "Apache Flink Code Style and Quality Guide — Java"
 * **Set the initial capacity for a collection only if there is a good proven 
reason** for that, otherwise do not clutter the code. In case of **Maps** it 
can be even deluding because the Map's load factor effectively reduces the 
capacity.
 
 
+### Java Optional
+
+* Use **@Nullable annotation where you do not use Optional** for the nullable 
values.
+* If you can prove that `Optional` usage would lead to a **performance degradation in critical code then fall back to @Nullable**.
+* Always use **Optional to return nullable values** in the API/public methods 
except the case of a proven performance concern.
+* **Do not use Optional as a function argument**, instead either overload the 
method or use the Builder pattern for the set of function arguments.
+ * Note: an Optional argument can be allowed in a private helper method if 
you believe that it simplifies the code
+ 
([example](https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java#L95)).
+* **Do not use Optional for class fields**.
+
+
 ### Lambdas
 
 * Prefer non-capturing lambdas (lambdas that do not contain references to the 
outer scope). Capturing lambdas need to create a new object instance for every 
call. Non-capturing lambdas can use the same instance for each invocation. 
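
The `Optional` rules above can be sketched in a few lines. This is an illustrative example only: `OptionalUsageSketch`, its `settings` map and the `"<unset>"` fallback are hypothetical names, and the `@Nullable` annotation is mentioned only in a comment so that the block compiles without extra dependencies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical class illustrating the Optional guidelines; not taken from Flink. */
final class OptionalUsageSketch {
    // Class fields are plain types, never Optional.
    private final Map<String, String> settings = new HashMap<>();

    void put(String key, String value) {
        settings.put(key, value);
    }

    /** Public methods return Optional instead of a nullable value. */
    Optional<String> find(String key) {
        return Optional.ofNullable(settings.get(key));
    }

    /** Overload instead of taking an Optional argument. */
    String describe(String key) {
        return describe(key, "<unset>");
    }

    String describe(String key, String fallback) {
        return key + "=" + find(key).orElse(fallback);
    }

    /**
     * Where Optional is deliberately avoided (for example proven hot code),
     * the nullable return value would carry a @Nullable annotation instead.
     */
    String findOrNull(String key) {
        return settings.get(key);
    }

    public static void main(String[] args) {
        OptionalUsageSketch sketch = new OptionalUsageSketch();
        sketch.put("parallelism", "4");
        System.out.println(sketch.describe("parallelism"));
        System.out.println(sketch.describe("missing"));
    }
}
```

The overload pair keeps call sites free of `Optional.of(...)` and `Optional.empty()` wrapping, which is the main reason the guideline discourages `Optional` parameters.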



[flink-web] branch asf-site updated: [FLINK-13804][code style] Set the initial capacity for a collection only if there is a good proven reason

2019-08-28 Thread azagrebin

azagrebin pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 8ce7c00  [FLINK-13804][code style] Set the initial capacity for a 
collection only if there is a good proven reason
8ce7c00 is described below

commit 8ce7c00db1a7ceebdd14b2c7ee8e6365ed8a8081
Author: Andrey Zagrebin 
AuthorDate: Tue Aug 20 17:44:41 2019 +0200

[FLINK-13804][code style] Set the initial capacity for a collection only if 
there is a good proven reason
---
 contributing/code-style-and-quality-java.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/contributing/code-style-and-quality-java.md 
b/contributing/code-style-and-quality-java.md
index d1e7dea..e22b2c8 100644
--- a/contributing/code-style-and-quality-java.md
+++ b/contributing/code-style-and-quality-java.md
@@ -63,6 +63,7 @@ title:  "Apache Flink Code Style and Quality Guide — Java"
 * `contains()` before `get()` → `get()` and check null
 * `contains()` before `put()` → `putIfAbsent()` or `computeIfAbsent()`
 * Iterating over keys, getting values → iterate over `entrySet()`
+* **Set the initial capacity for a collection only if there is a good proven 
reason** for that, otherwise do not clutter the code. In case of **Maps** it 
can be even deluding because the Map's load factor effectively reduces the 
capacity.
 
 
 ### Lambdas



[flink-web] branch asf-site updated: [FLINK-13812][code style][zh] Usage of Java Optional

2019-08-28 Thread azagrebin

azagrebin pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 2f8ce2f  [FLINK-13812][code style][zh] Usage of Java Optional
2f8ce2f is described below

commit 2f8ce2ff9d3d471a8440e466ce12e52a770ecce3
Author: Andrey Zagrebin 
AuthorDate: Wed Aug 28 11:46:17 2019 +0200

[FLINK-13812][code style][zh] Usage of Java Optional
---
 contributing/code-style-and-quality-common.zh.md |  2 +-
 contributing/code-style-and-quality-java.zh.md   | 11 +++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/contributing/code-style-and-quality-common.zh.md 
b/contributing/code-style-and-quality-common.zh.md
index a8643a9..82caabf 100644
--- a/contributing/code-style-and-quality-common.zh.md
+++ b/contributing/code-style-and-quality-common.zh.md
@@ -140,7 +140,7 @@ That way, you get warnings from IntelliJ about all sections 
where you have to re
 _Note: This means that `@Nonnull` annotations are usually not necessary, but 
can be used in certain cases to override a previous annotation, or to point 
non-nullability out in a context where one would expect a nullable value._
 
 `Optional` is a good solution as a return type for method that may or may not 
have a result, so nullable return types are good candidates to be replaced with 
`Optional`. 
-For fields and parameters, `Optional` is disputed in Java and most parts of 
the Flink code case don’t use optional for fields.
+See also [usage of Java 
Optional](code-style-and-quality-java.md#java-optional).
 
 
 ### Avoid Code Duplication
diff --git a/contributing/code-style-and-quality-java.zh.md 
b/contributing/code-style-and-quality-java.zh.md
index f0de4e7..d5af274 100644
--- a/contributing/code-style-and-quality-java.zh.md
+++ b/contributing/code-style-and-quality-java.zh.md
@@ -66,6 +66,17 @@ title:  "Apache Flink Code Style and Quality Guide — Java"
 * **Set the initial capacity for a collection only if there is a good proven 
reason** for that, otherwise do not clutter the code. In case of **Maps** it 
can be even deluding because the Map's load factor effectively reduces the 
capacity.
 
 
+### Java Optional
+
+ * Use **@Nullable annotation where you do not use Optional** for the nullable 
values.
+* If you can prove that `Optional` usage would lead to a **performance degradation in critical code then fall back to @Nullable**.
+* Always use **Optional to return nullable values** in the API/public methods 
except the case of a proven performance concern.
+* **Do not use Optional as a function argument**, instead either overload the 
method or use the Builder pattern for the set of function arguments.
+ * Note: an Optional argument can be allowed in a private helper method if 
you believe that it simplifies the code
+ 
([example](https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java#L95)).
+* **Do not use Optional for class fields**.
+
+
 ### Lambdas
 
 * Prefer non-capturing lambdas (lambdas that do not contain references to the 
outer scope). Capturing lambdas need to create a new object instance for every 
call. Non-capturing lambdas can use the same instance for each invocation. 



[flink-web] branch asf-site updated (2f8ce2f -> f7c040b)

2019-08-28 Thread azagrebin

azagrebin pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git.


from 2f8ce2f  [FLINK-13812][code style][zh] Usage of Java Optional
 new 00462dd  [FLINK-13820][code style] Breaking long function argument 
lists and chained method calls
 new f7c040b  Rebuild website



Summary of changes:
 .../code-style-and-quality-common.html |  2 +-
 .../code-style-and-quality-formatting.html | 39 ++
 .../contributing/code-style-and-quality-java.html  | 17 ++
 .../code-style-and-quality-common.html |  2 +-
 .../code-style-and-quality-formatting.html | 38 +
 .../contributing/code-style-and-quality-java.html  | 17 ++
 contributing/code-style-and-quality-formatting.md  | 39 ++
 .../code-style-and-quality-formatting.zh.md| 38 +
 8 files changed, 190 insertions(+), 2 deletions(-)



[flink-web] 02/02: Rebuild website

2019-08-28 Thread azagrebin

azagrebin pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit f7c040be053c871b9fc66588aca78510db5608b5
Author: Andrey Zagrebin 
AuthorDate: Wed Aug 28 17:47:59 2019 +0200

Rebuild website
---
 .../code-style-and-quality-common.html |  2 +-
 .../code-style-and-quality-formatting.html | 39 ++
 .../contributing/code-style-and-quality-java.html  | 17 ++
 .../code-style-and-quality-common.html |  2 +-
 .../code-style-and-quality-formatting.html | 38 +
 .../contributing/code-style-and-quality-java.html  | 17 ++
 6 files changed, 113 insertions(+), 2 deletions(-)

diff --git a/content/contributing/code-style-and-quality-common.html 
b/content/contributing/code-style-and-quality-common.html
index aba7479..e92b0d8 100644
--- a/content/contributing/code-style-and-quality-common.html
+++ b/content/contributing/code-style-and-quality-common.html
@@ -377,7 +377,7 @@ That way, you get warnings from IntelliJ about all sections 
where you have to re
 Note: This means that @Nonnull annotations are usually not 
necessary, but can be used in certain cases to override a previous annotation, 
or to point non-nullability out in a context where one would expect a nullable 
value.
 
 Optional is a good solution as a return type for method that 
may or may not have a result, so nullable return types are good candidates to 
be replaced with Optional. 
-For fields and parameters, Optional is disputed in Java and most 
parts of the Flink code case don’t use optional for fields.
+See also usage of Java 
Optional.
 
 Avoid Code Duplication
 
diff --git a/content/contributing/code-style-and-quality-formatting.html 
b/content/contributing/code-style-and-quality-formatting.html
index 7d934b9..10f8305 100644
--- a/content/contributing/code-style-and-quality-formatting.html
+++ b/content/contributing/code-style-and-quality-formatting.html
@@ -223,6 +223,7 @@
   Imports
   Naming
   Whitespaces
+  Breaking the lines 
of too long statements
   Braces
   Javadocs
   Modifiers
@@ -281,6 +282,44 @@ We are aware that spaces are a bit nicer; it just happened 
to be that we started
   Spaces around operators/keywords. Operators 
(+, =, , …) and keywords 
(if, for, catch, …) must have a space 
before and after them, provided they are not at the start or end of the 
line.
 
 
+Breaking the lines of too 
long statements
+
+In general long lines should be avoided for better readability. Try to use short statements which operate on the same level of abstraction. Break long statements by creating more local variables, defining helper functions etc.
+
+Two major sources of long lines are:
+ * Long list of arguments in function declaration or call: void 
func(type1 arg1, type2 arg2, ...)
+ * Long sequence of chained calls: 
list.stream().map(...).reduce(...).collect(...)...
+
+Rules about breaking the long lines:
+ * Break the argument list or chain of calls if the line exceeds the limit, or earlier if you believe that breaking would improve code readability
+ * If you break the line then each argument/call should have a separate line, 
including the first one
+ * Each new line should have one extra indentation (or two for a function 
declaration) relative to the line of the parent function name or the called 
entity
+
+Additionally for function arguments:
+ * The opening parenthesis always stays on the line of the parent function name
+ * The possible thrown exception list is never broken and stays on the same 
last line, even if the line length exceeds its limit
+ * The line of the function argument should end with a comma staying on the 
same line except the last argument
+
+Example of breaking the list of function arguments:
+```
+public void func(
+int arg1,
+int arg2,
+…) throws E1, E2, E3 {
+
+}
+```
+
+The dot of a chained call is always on the line of that chained call, preceding the call at the beginning.
+
+Example of breaking the list of chained calls:
+
+values
+.stream()
+.map(...)
+.collect(...);
+
+
 Braces
 
 
diff --git a/content/contributing/code-style-and-quality-java.html 
b/content/contributing/code-style-and-quality-java.html
index f2d8bd3..91f5f59 100644
--- a/content/contributing/code-style-and-quality-java.html
+++ b/content/contributing/code-style-and-quality-java.html
@@ -225,6 +225,7 @@
   Java Serialization
   Java 
Reflection
   Collections
+  Java 
Optional
   Lambdas
   Java 
Streams
 
@@ -305,6 +306,22 @@
   Iterating over keys, getting values → iterate over 
entrySet()
 
   
+  Set the initial capacity for a collection only if there is a 
good proven reason for that, otherwise do not clutter the code. In 
case of Maps it can be even deluding because the Map’s load 
factor effectively reduces

[flink-web] 01/02: [FLINK-13820][code style] Breaking long function argument lists and chained method calls

2019-08-28 Thread azagrebin

azagrebin pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 00462ddbd4d289f9f9c35e61899d8eb55a21e849
Author: Andrey Zagrebin 
AuthorDate: Thu Aug 22 16:43:25 2019 +0200

[FLINK-13820][code style] Breaking long function argument lists and chained 
method calls
---
 contributing/code-style-and-quality-formatting.md  | 39 ++
 .../code-style-and-quality-formatting.zh.md| 38 +
 2 files changed, 77 insertions(+)

diff --git a/contributing/code-style-and-quality-formatting.md 
b/contributing/code-style-and-quality-formatting.md
index cb83717..f75b140 100644
--- a/contributing/code-style-and-quality-formatting.md
+++ b/contributing/code-style-and-quality-formatting.md
@@ -48,6 +48,45 @@ We are aware that spaces are a bit nicer; it just happened 
to be that we started
 * **Spaces around operators/keywords.** Operators (`+`, `=`, `>`, …) and 
keywords (`if`, `for`, `catch`, …) must have a space before and after them, 
provided they are not at the start or end of the line.
 
 
+### Breaking the lines of too long statements
+
+In general long lines should be avoided for the better readability. Try to use 
short statements which operate on the same level of abstraction. Break the long 
statements by creating more local variables, defining helper functions etc.
+
+Two major sources of long lines are:
+ * Long list of arguments in function declaration or call: void 
func(type1 arg1, type2 arg2, ...)
+ * Long sequence of chained calls: 
list.stream().map(...).reduce(...).collect(...)...
+
+Rules about breaking the long lines:
+ * Break the argument list or chain of calls if the line exceeds limit or 
earlier if you believe that the breaking would improve the code readability
+ * If you break the line then each argument/call should have a separate line, 
including the first one
+ * Each new line should have one extra indentation (or two for a function 
declaration) relative to the line of the parent function name or the called 
entity
+
+Additionally for function arguments:
+ * The opening parenthesis always stays on the line of the parent function name
+ * The possible thrown exception list is never broken and stays on the same 
last line, even if the line length exceeds its limit
+ * The line of the function argument should end with a comma staying on the 
same line except the last argument
+
+Example of breaking the list of function arguments:
+```
+public void func(
+int arg1,
+int arg2,
+...) throws E1, E2, E3 {
+
+}
+```
+
+The dot of a chained call is always on the line of that chained call, 
preceding the call at the beginning.
+
+Example of breaking the list of chained calls:
+```
+values
+.stream()
+.map(...)
+.collect(...);
+```
+
+
 ### Braces
 
 * **Left curly braces ({) must not be placed on a new line.**
diff --git a/contributing/code-style-and-quality-formatting.zh.md 
b/contributing/code-style-and-quality-formatting.zh.md
index 2abac04..2c0bc0f 100644
--- a/contributing/code-style-and-quality-formatting.zh.md
+++ b/contributing/code-style-and-quality-formatting.zh.md
@@ -48,6 +48,44 @@ We are aware that spaces are a bit nicer; it just happened 
to be that we started
 * **Spaces around operators/keywords.** Operators (`+`, `=`, `>`, …) and 
keywords (`if`, `for`, `catch`, …) must have a space before and after them, 
provided they are not at the start or end of the line.
 
 
+### Breaking the lines of too long statements
+
+In general long lines should be avoided for the better readability. Try to use 
short statements which operate on the same level of abstraction. Break the long 
statements by creating more local variables, defining helper functions etc.
+
+Two major sources of long lines are:
+ * Long list of arguments in function declaration or call: void 
func(type1 arg1, type2 arg2, ...)
+ * Long sequence of chained calls: 
list.stream().map(...).reduce(...).collect(...)...
+
+Rules about breaking the long lines:
+ * Break the argument list or chain of calls if the line exceeds limit or 
earlier if you believe that the breaking would improve the code readability
+ * If you break the line then each argument/call should have a separate line, 
including the first one
+ * Each new line should have one extra indentation (or two for a function 
declaration) relative to the line of the parent function name or the called 
entity
+
+Additionally for function arguments:
+ * The opening parenthesis always stays on the line of the parent function name
+ * The possible thrown exception list is never broken and stays on the same 
last line, even if the line length exceeds its limit
+ * The line of the function argument should end with a comma staying on the 
same line except the last argument
+
+Example of breaking the list of function arguments:
+```
+public void func(
+int arg1,
+int arg2,
+...) 
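
The line-breaking rules in this commit can be illustrated with a small, self-contained Java sketch. The class and method names (`LineBreakingExample`, `formatAll`) are hypothetical and are not part of the Flink code base. The use of four spaces per indentation level is also an assumption made for display purposes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class LineBreakingExample {

    // Function declaration: the opening parenthesis stays on the line of the
    // function name, every argument (including the first) gets its own line
    // with two extra indentation levels, each argument line except the last
    // ends with a comma, and the throws list stays unbroken on the last line.
    static List<String> formatAll(
            List<Integer> values,
            String prefix,
            int minimum) throws IllegalArgumentException, IllegalStateException {

        if (prefix == null) {
            throw new IllegalArgumentException("prefix must not be null");
        }

        // Chained calls: one call per line, one extra indentation level,
        // and the dot leads each line.
        return values
            .stream()
            .filter(v -> v >= minimum)
            .map(v -> prefix + v)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(formatAll(Arrays.asList(1, 5, 10), "v", 5)); // prints [v5, v10]
    }
}
```

The declaration uses two indentation levels so that the argument list is visually distinct from the method body, which starts one level in.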

[flink-web] branch asf-site updated (c520e37 -> 1c7a290)

2019-09-02 Thread azagrebin

azagrebin pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git.


from c520e37  Rebuild website
 add a9dabc0  [FLINK-13908][code style] Fix formatting issue in Breaking 
the lines of too long statements
 add 0a9f019  [hotfix][code style] Fix typo in: Breaking the lines of too 
long statements
 add 1c7a290  Rebuild website

No new revisions were added by this update.

Summary of changes:
 .../code-style-and-quality-formatting.html | 57 +-
 .../code-style-and-quality-formatting.html | 56 -
 contributing/code-style-and-quality-formatting.md  | 23 +
 .../code-style-and-quality-formatting.zh.md| 26 ++
 4 files changed, 96 insertions(+), 66 deletions(-)



[flink] branch master updated (5531b23 -> 3c991dc)

2019-09-05 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 5531b23  [FLINK-13892][hs] Harden HistoryServerTest
 add 3c991dc  [FLINK-13927][docs] Add note about adding Hadoop dependencies 
to run/debug Flink locally in mini cluster

No new revisions were added by this update.

Summary of changes:
 docs/ops/deployment/hadoop.md| 22 ++
 docs/ops/deployment/hadoop.zh.md | 22 ++
 2 files changed, 44 insertions(+)



[flink] branch master updated (0d390c1 -> 5ec02e2)

2019-09-06 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 0d390c1  [FLINK-13967][licensing] Fully generate binary licensing
 add 5ec02e2  [FLINK-13963][docs] Consolidate Hadoop file systems usage and 
Hadoop integration docs

No new revisions were added by this update.

Summary of changes:
 docs/dev/batch/connectors.md  | 34 -
 docs/dev/batch/connectors.zh.md   | 34 -
 docs/dev/batch/hadoop_compatibility.md|  9 +
 docs/dev/batch/hadoop_compatibility.zh.md |  9 +
 docs/ops/config.md|  2 +
 docs/ops/config.zh.md |  2 +
 docs/ops/deployment/hadoop.md | 22 ++-
 docs/ops/deployment/hadoop.zh.md  | 22 ++-
 docs/ops/filesystems/index.md | 61 +-
 docs/ops/filesystems/index.zh.md  | 63 +--
 10 files changed, 119 insertions(+), 139 deletions(-)



[flink] 19/21: Fix yarn cut off

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1f24495c0df56473c850605d9e34baf2f7464be1
Author: Andrey Zagrebin 
AuthorDate: Wed Nov 6 09:59:32 2019 +0100

Fix yarn cut off
---
 .../client/deployment/ClusterSpecification.java|   6 +-
 .../clusterframework/TaskExecutorResourceSpec.java |   4 +
 .../TaskExecutorResourceUtils.java |   4 +-
 .../ActiveResourceManagerFactory.java  |  23 +---
 .../ActiveResourceManagerFactoryTest.java  |  97 --
 .../flink/yarn/CliFrontendRunWithYarnTest.java |   3 +-
 .../apache/flink/yarn/YarnConfigurationITCase.java |  28 +---
 .../flink/yarn/YarnClusterClientFactory.java   |   6 +-
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  18 +--
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |   2 +-
 .../apache/flink/yarn/FlinkYarnSessionCliTest.java | 142 +++--
 11 files changed, 73 insertions(+), 260 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
index 72975d8..0d8d105 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.deployment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 
 /**
  * Description of the cluster to start by the {@link ClusterDescriptor}.
@@ -68,7 +69,10 @@ public final class ClusterSpecification {
int slots = 
configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 
int jobManagerMemoryMb = 
ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
-   int taskManagerMemoryMb = 
ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
+   int taskManagerMemoryMb = TaskExecutorResourceUtils
+   .resourceSpecFromConfig(configuration)
+   .getTotalProcessMemorySize()
+   .getMebiBytes();
 
return new ClusterSpecificationBuilder()
.setMasterMemoryMB(jobManagerMemoryMb)
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
index d6cbe5b..d73e7b7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
@@ -156,6 +156,10 @@ public class TaskExecutorResourceSpec implements 
java.io.Serializable {
return 
getTotalFlinkMemorySize().add(jvmMetaspaceSize).add(jvmOverheadSize);
}
 
+   public MemorySize getHeapSize() {
+   return 
frameworkHeapSize.add(taskHeapSize).add(onHeapManagedMemorySize);
+   }
+
@Override
public String toString() {
return "TaskExecutorResourceSpec {"
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index 4b649e4..9c69b62 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -46,9 +46,7 @@ public class TaskExecutorResourceUtils {
// 

 
public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
-   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
-   .add(taskExecutorResourceSpec.getTaskHeapSize())
-   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getHeapSize();
final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
.add(taskExecutorResourceSpec.getShuffleMemSize());
final MemorySize jvmMetaspaceSize = 
taskExecutorResourceSpec.getJvmMetaspaceSize();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resou
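
The size relationships visible in this diff (heap size as the sum of framework heap, task heap and on-heap managed memory; total process memory as total Flink memory plus metaspace and overhead) can be sketched with plain byte counts. This is a simplified illustration, not the actual `TaskExecutorResourceSpec`: the class name and the `otherFlinkMemory` field, which lumps together all remaining Flink memory, are assumptions, and the numbers are made up.

```java
final class ResourceSpecSketch {

    private static final long MIB = 1L << 20;

    private final long frameworkHeap;
    private final long taskHeap;
    private final long onHeapManaged;
    private final long otherFlinkMemory; // assumption: everything in total Flink memory that is not heap
    private final long jvmMetaspace;
    private final long jvmOverhead;

    ResourceSpecSketch(
            long frameworkHeap,
            long taskHeap,
            long onHeapManaged,
            long otherFlinkMemory,
            long jvmMetaspace,
            long jvmOverhead) {

        this.frameworkHeap = frameworkHeap;
        this.taskHeap = taskHeap;
        this.onHeapManaged = onHeapManaged;
        this.otherFlinkMemory = otherFlinkMemory;
        this.jvmMetaspace = jvmMetaspace;
        this.jvmOverhead = jvmOverhead;
    }

    // Mirrors getHeapSize() from the diff: framework heap + task heap + on-heap managed memory.
    long heapSize() {
        return frameworkHeap + taskHeap + onHeapManaged;
    }

    long totalFlinkMemory() {
        return heapSize() + otherFlinkMemory;
    }

    // Mirrors the context line in the diff: total Flink memory + metaspace + overhead.
    long totalProcessMemory() {
        return totalFlinkMemory() + jvmMetaspace + jvmOverhead;
    }

    public static void main(String[] args) {
        ResourceSpecSketch spec = new ResourceSpecSketch(
            128 * MIB, 384 * MIB, 256 * MIB, 192 * MIB, 96 * MIB, 128 * MIB);
        System.out.println("heap MiB: " + spec.heapSize() / MIB);
        System.out.println("total process MiB: " + spec.totalProcessMemory() / MIB);
    }
}
```

A container request sized by heap alone would be smaller than the memory the process can actually use, which is presumably why `ClusterSpecification` switches to the total process size here.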

[flink] 10/21: [FLINK-13986][test] Fix failure cases that fail due to change of expected exception type.

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 436f46753baa44f2892910e50709bd5cd49d09d6
Author: Xintong Song 
AuthorDate: Mon Oct 21 18:46:32 2019 +0800

[FLINK-13986][test] Fix failure cases that fail due to change of expected 
exception type.
---
 .../flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java   | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
index 8bc60a0..bd094c7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.blob.BlobCacheService;
@@ -132,7 +131,7 @@ public class TaskManagerRunnerStartupTest extends 
TestLogger {
highAvailabilityServices);
 
fail("Should fail synchronously with an exception");
-   } catch (IllegalConfigurationException e) {
+   } catch (Throwable t) {
// splendid!
}
}
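
One thing worth checking when widening a catch clause after a `fail(...)` call: JUnit's `fail` signals failure by throwing `AssertionError`, which is itself a `Throwable`. The sketch below reproduces the pattern without JUnit to show the effect. The `fail` stand-in and the method names are hypothetical, and the narrower variant catches `Exception` only as an illustration, since the diff does not state which exception type is now expected.

```java
final class CatchThrowableSketch {

    // Stand-in for JUnit's Assert.fail(): failure is signalled via AssertionError.
    static void fail(String message) {
        throw new AssertionError(message);
    }

    // Pattern from the diff: the broad catch also swallows the AssertionError
    // raised by fail(), so the check passes even if the action never throws.
    static boolean passesWithBroadCatch(Runnable action) {
        try {
            action.run();
            fail("Should fail synchronously with an exception");
        } catch (Throwable t) {
            // reached for the expected exception AND for the AssertionError
        }
        return true;
    }

    // Narrower variant: only Exception counts as the expected outcome,
    // so the AssertionError from fail() propagates to the caller.
    static boolean passesWithExceptionCatch(Runnable action) {
        try {
            action.run();
            fail("Should fail synchronously with an exception");
        } catch (Exception e) {
            // expected
        }
        return true;
    }

    public static void main(String[] args) {
        Runnable neverThrows = () -> { };

        System.out.println("broad catch passes: " + passesWithBroadCatch(neverThrows));

        try {
            passesWithExceptionCatch(neverThrows);
            System.out.println("narrow catch passes");
        } catch (AssertionError e) {
            System.out.println("narrow catch reports: " + e.getMessage());
        }
    }
}
```

With the broad catch, the first call reports success although the action threw nothing. The narrow variant surfaces the failure message instead.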



[flink] 14/21: Change task off heap default value from 0b to 1m to accomodate for framework small allocations (temporary: later add framework off heap)

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1b7df767ce95db9d40f71c2936aa298ae51cc16f
Author: Andrey Zagrebin 
AuthorDate: Mon Nov 4 14:27:03 2019 +0100

Change task off heap default value from 0b to 1m to accomodate for 
framework small allocations (temporary: later add framework off heap)
---
 docs/_includes/generated/task_manager_memory_configuration.html | 2 +-
 .../main/java/org/apache/flink/configuration/TaskManagerOptions.java| 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/_includes/generated/task_manager_memory_configuration.html 
b/docs/_includes/generated/task_manager_memory_configuration.html
index 09b9dc6..e95160e 100644
--- a/docs/_includes/generated/task_manager_memory_configuration.html
+++ b/docs/_includes/generated/task_manager_memory_configuration.html
@@ -79,7 +79,7 @@
 
 
 taskmanager.memory.task.off-heap.size
-"0b"
+"1m"
 Task Heap Memory size for TaskExecutors. This is the size of 
off heap memory (JVM direct memory or native memory) reserved for user 
code.
 
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index d717ad5..234d16a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -293,7 +293,7 @@ public class TaskManagerOptions {
 */
public static final ConfigOption TASK_OFF_HEAP_MEMORY =
key("taskmanager.memory.task.off-heap.size")
-   .defaultValue("0b")
+   .defaultValue("1m")
.withDescription("Task Heap Memory size for 
TaskExecutors. This is the size of off heap memory (JVM direct"
+ " memory or native memory) reserved for user 
code.");
 



[flink] 13/21: Add backwards compatibility

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b5be514ad75d0343792b95ea5be8f83b0fa25d8
Author: Andrey Zagrebin 
AuthorDate: Thu Oct 31 17:06:58 2019 +0100

Add backwards compatibility
---
 flink-dist/src/main/resources/flink-conf.yaml  | 4 ++--
 .../flink/runtime/clusterframework/TaskExecutorResourceUtils.java  | 7 +--
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/flink-dist/src/main/resources/flink-conf.yaml 
b/flink-dist/src/main/resources/flink-conf.yaml
index a3bc57d..15d7aa9 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -42,9 +42,9 @@ jobmanager.rpc.port: 6123
 jobmanager.heap.size: 1024m
 
 
-# The heap size for the TaskManager JVM
+# Total Flink process size for the TaskExecutor
 
-taskmanager.heap.size: 1024m
+taskmanager.memory.total-process.size: 1024m
 
 
 # The number of task slots that each TaskManager offers. Each slot runs one 
parallel pipeline.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index f2a275f..4b649e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -417,7 +417,9 @@ public class TaskExecutorResourceUtils {
return 
MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
} else {
@SuppressWarnings("deprecation")
-   final long legacyHeapMemoryMB = 
config.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB);
+   final long legacyHeapMemoryMB = 
config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) ?
+   
config.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) :
+   
MemorySize.parse(config.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getBytes();
return new MemorySize(legacyHeapMemoryMB << 20); // 
megabytes to bytes
}
}
@@ -427,7 +429,8 @@ public class TaskExecutorResourceUtils {
}
 
private static boolean isManagedMemorySizeExplicitlyConfigured(final 
Configuration config) {
-   return config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+   return config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE) 
||
+   
config.contains(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE);
}
 
private static boolean 
isManagedMemoryOffHeapFractionExplicitlyConfigured(final Configuration config) {
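
The legacy fallback in this diff reads either an integer number of megabytes or a size string, and then shifts the result left by 20 bits. The second branch appears to yield bytes rather than megabytes, so the units of the two branches may not match. The following sketch shows the arithmetic with both branches kept in mebibytes. The parser and all names here are hypothetical simplifications and are not Flink's `MemorySize`.

```java
final class LegacyHeapUnitsSketch {

    // Minimal size parser for strings such as "1024m", "1g", "512k", "100b" or "100".
    static long parseBytes(String text) {
        String t = text.trim().toLowerCase();
        char last = t.charAt(t.length() - 1);
        if (Character.isDigit(last)) {
            return Long.parseLong(t);
        }
        long number = Long.parseLong(t.substring(0, t.length() - 1).trim());
        switch (last) {
            case 'b': return number;
            case 'k': return number << 10;
            case 'm': return number << 20;
            case 'g': return number << 30;
            default: throw new IllegalArgumentException("Unknown unit in: " + text);
        }
    }

    // Both branches produce mebibytes before the final conversion to bytes.
    // legacyMb is null when the deprecated megabyte key is not configured.
    static long legacyHeapBytes(Integer legacyMb, String legacySize) {
        long mebiBytes = legacyMb != null ? legacyMb : parseBytes(legacySize) >> 20;
        return mebiBytes << 20; // megabytes to bytes
    }

    public static void main(String[] args) {
        System.out.println(legacyHeapBytes(512, "1024m"));  // 536870912
        System.out.println(legacyHeapBytes(null, "1024m")); // 1073741824
        // Shifting a byte count by 20 again inflates the value by a factor of 2^20:
        System.out.println(parseBytes("1024m") << 20);      // 1125899906842624
    }
}
```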



[flink] 20/21: [FLINK-14631] Account for netty direct allocations in direct memory limit (Netty Shuffle)

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b8c6a4cdfab29447dc1cfc74d37c1a4c34217717
Author: Andrey Zagrebin 
AuthorDate: Wed Nov 6 17:24:28 2019 +0100

[FLINK-14631] Account for netty direct allocations in direct memory limit 
(Netty Shuffle)

At the moment after FLINK-13982, when we calculate JVM direct memory limit, 
we only account for memory segment network buffers but not for direct 
allocations from netty arenas in 
org.apache.flink.runtime.io.network.netty.NettyBufferPool. We should include 
netty arenas into shuffle memory calculations.
---
 .../test-scripts/test_batch_allround.sh|  4 +--
 .../TaskExecutorResourceUtils.java |  6 +++-
 .../runtime/io/network/netty/NettyBufferPool.java  |  2 ++
 .../runtime/io/network/netty/NettyConfig.java  | 10 --
 .../NettyShuffleEnvironmentConfiguration.java  | 39 ++
 .../TaskExecutorResourceUtilsTest.java |  6 +++-
 .../NettyShuffleEnvironmentConfigurationTest.java  | 16 +
 .../example/failing/JobSubmissionFailsITCase.java  |  6 +++-
 .../test/streaming/runtime/BackPressureITCase.java |  4 ++-
 .../apache/flink/yarn/YarnConfigurationITCase.java |  4 +--
 10 files changed, 79 insertions(+), 18 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh 
b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
index 5143ef4..dc6f753 100755
--- a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
+++ b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
@@ -26,8 +26,8 @@ 
TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAll
 echo "Run DataSet-Allround-Test Program"
 
 # modify configuration to include spilling to disk
-set_config_key "taskmanager.network.memory.min" "10485760"
-set_config_key "taskmanager.network.memory.max" "10485760"
+set_config_key "taskmanager.network.memory.min" "27262976"
+set_config_key "taskmanager.network.memory.max" "27262976"
 
 set_conf_ssl "server"
 start_cluster
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index 9c69b62..f98670b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -23,6 +23,8 @@ import 
org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
@@ -375,7 +377,9 @@ public class TaskExecutorResourceUtils {
@SuppressWarnings("deprecation")
final long numOfBuffers = 
config.getInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS);
final long pageSize =  
ConfigurationParserUtils.getPageSize(config);
-   return new MemorySize(numOfBuffers * pageSize);
+   final int numberOfSlots = 
config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
+   final long numberOfArenas = 
NettyConfig.getNumberOfArenas(config, numberOfSlots);
+   return new MemorySize(numOfBuffers * pageSize + numberOfArenas 
* NettyBufferPool.ARENA_SIZE);
}
 
private static RangeFraction getShuffleMemoryRangeFraction(final 
Configuration config) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
index 6d2a6c8..fc49712 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
@@ -67,6 +67,8 @@ public class NettyBufferPool extends PooledByteBufAllocator {
 */
private static final int MAX_ORDER = 11;
 
+   public static final long ARENA_SIZE = PAGE_SIZE << MAX_ORDER;
+
/**
 * Creates Netty's buffer pool with the specified number of direct 
arenas.
 *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime
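
The arithmetic behind this change can be checked with a short sketch. `MAX_ORDER = 11` comes from the diff context. The Netty page size of 8192 bytes, the 32 KiB buffer size and the buffer count are assumptions chosen for illustration, and the class and method names are hypothetical.

```java
final class ShuffleMemorySketch {

    static final int PAGE_SIZE = 8192; // assumption: the value is not shown in the diff
    static final int MAX_ORDER = 11;   // from the diff context

    // Same formula as the added constant: one arena chunk is PAGE_SIZE << MAX_ORDER bytes.
    static final long ARENA_SIZE = (long) PAGE_SIZE << MAX_ORDER;

    // Mirrors the updated legacy calculation: network buffers plus Netty arenas.
    static long shuffleMemoryBytes(long numBuffers, long bufferSize, long numArenas) {
        return numBuffers * bufferSize + numArenas * ARENA_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(ARENA_SIZE);                           // 16777216 (16 MiB)
        System.out.println(shuffleMemoryBytes(320, 32 * 1024, 0)); // 10485760 (10 MiB)
        System.out.println(shuffleMemoryBytes(320, 32 * 1024, 1)); // 27262976 (26 MiB)
    }
}
```

Under these assumptions, 10 MiB of network buffers plus one 16 MiB arena gives 27262976 bytes, which is consistent with the new value set in `test_batch_allround.sh`.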

[flink] 04/21: [FLINK-13983][dist] TM startup scripts calls java codes to set flip49 TM resource configs and JVM parameters, if feature option is enabled.

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4088bea93ad89940909e1c0d3bb92054315f795d
Author: Xintong Song 
AuthorDate: Fri Sep 27 11:19:55 2019 +0800

[FLINK-13983][dist] TM startup scripts calls java codes to set flip49 TM 
resource configs and JVM parameters, if feature option is enabled.
---
 flink-dist/src/main/flink-bin/bin/config.sh  | 49 
 flink-dist/src/main/flink-bin/bin/taskmanager.sh | 30 ---
 2 files changed, 57 insertions(+), 22 deletions(-)

diff --git a/flink-dist/src/main/flink-bin/bin/config.sh 
b/flink-dist/src/main/flink-bin/bin/config.sh
index 090c1fb..52ce960 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -117,6 +117,8 @@ KEY_TASKM_NET_BUF_MIN="taskmanager.network.memory.min"
 KEY_TASKM_NET_BUF_MAX="taskmanager.network.memory.max"
 KEY_TASKM_NET_BUF_NR="taskmanager.network.numberOfBuffers" # fallback
 
+KEY_TASKM_ENABLE_FLIP49="taskmanager.enable-flip-49" # temporal feature option 
for flip-49
+
 KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"
 
 KEY_ENV_PID_DIR="env.pid.dir"
@@ -429,6 +431,11 @@ if [ -z "${FLINK_TM_NET_BUF_MAX}" -o 
"${FLINK_TM_NET_BUF_MAX}" = "-1" ]; then
 FLINK_TM_NET_BUF_MAX=$(parseBytes ${FLINK_TM_NET_BUF_MAX})
 fi
 
+# Define FLINK_TM_ENABLE_FLIP49 if it is not already set
+# temporal feature option for flip-49
+if [ -z "${FLINK_TM_ENABLE_FLIP49}" ]; then
+FLINK_TM_ENABLE_FLIP49=$(readFromConfig ${KEY_TASKM_ENABLE_FLIP49} "false" 
"${YAML_CONF}")
+fi
 
 # Verify that NUMA tooling is available
 command -v numactl >/dev/null 2>&1
@@ -790,3 +797,45 @@ runBashJavaUtilsCmd() {
 
 echo ${output}
 }
+
+getTmResourceDynamicConfigsAndJvmParams() {
+if [[ "`echo ${FLINK_TM_ENABLE_FLIP49} | tr '[:upper:]' '[:lower:]'`" == 
"true" ]]; then
+echo "$(getTmResourceDynamicConfigsAndJvmParamsFlip49)"
+else
+echo "$(getTmResourceDynamicConfigsAndJvmParamsLegacy)"
+fi
+}
+
+getTmResourceDynamicConfigsAndJvmParamsFlip49() {
+local class_path=`constructFlinkClassPath`
+class_path=`manglePathList ${class_path}`
+
+local dynamic_configs=$(runBashJavaUtilsCmd 
GET_TM_RESOURCE_DYNAMIC_CONFIGS ${class_path} ${FLINK_CONF_DIR})
+local jvm_params=$(runBashJavaUtilsCmd GET_TM_RESOURCE_JVM_PARAMS 
${class_path} ${FLINK_CONF_DIR})
+
+echo ${jvm_params} $'\n' ${dynamic_configs}
+}
+
+getTmResourceDynamicConfigsAndJvmParamsLegacy() {
+if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
+   echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace 
with key \`${KEY_TASKM_MEM_SIZE}\`"
+else
+   flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
+   FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
+fi
+
+if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a number. 
Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
+exit 1
+fi
+
+if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then
+
+TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
+# Long.MAX_VALUE in TB: This is an upper bound, much less direct 
memory will be used
+TM_MAX_OFFHEAP_SIZE="8388607T"
+
+local jvm_params="-Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M 
-XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"
+echo ${jvm_params} # no dynamic configs
+fi
+}
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh 
b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index f12f9d6..e78a1f1 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -44,32 +44,18 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == 
"start-foreground" ]]; then
 export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
 fi
 
-if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
-   echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace 
with key \`${KEY_TASKM_MEM_SIZE}\`"
-else
-   flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
-   FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
-fi
-
-if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" 
-lt "0" ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a number. 
Please set '${KEY_TASKM_MEM_SIZE}' i
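
The legacy branch of the script builds a fixed-size heap and an effectively unlimited direct memory limit. The Java sketch below reproduces that string construction and shows where the `8388607T` constant comes from. The class and method names are hypothetical, and the heap size is passed in directly because `calculateTaskManagerHeapSizeMB` is not shown in this diff.

```java
final class LegacyJvmParamsSketch {

    // Long.MAX_VALUE bytes expressed in whole tebibytes (2^40 bytes each).
    static String maxDirectMemory() {
        return (Long.MAX_VALUE >> 40) + "T";
    }

    // Same shape as the jvm_params string assembled in the legacy shell function.
    static String jvmParams(long heapSizeMb) {
        return "-Xms" + heapSizeMb + "M"
            + " -Xmx" + heapSizeMb + "M"
            + " -XX:MaxDirectMemorySize=" + maxDirectMemory();
    }

    public static void main(String[] args) {
        System.out.println(jvmParams(1024));
        // prints -Xms1024M -Xmx1024M -XX:MaxDirectMemorySize=8388607T
    }
}
```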

[flink] 09/21: [FLINK-13986][test] Fix test cases missing explicit task executor resource configurations.

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d80b12834b2f11a64d61ef489ea0c9e38357524
Author: Xintong Song 
AuthorDate: Mon Oct 21 16:28:57 2019 +0800

[FLINK-13986][test] Fix test cases missing explicit task executor resource 
configurations.

FLIP-49 requires one of the following three size settings to be explicitly 
configured:
- Task Heap Memory and Managed Memory
- Total Flink Memory
- Total Process Memory

This commit fixes test cases that fail because all three of the above are 
missing, by setting a reasonable Total Flink Memory size.
---
 .../src/main/java/org/apache/flink/client/LocalExecutor.java | 2 ++
 .../apache/flink/streaming/connectors/kafka/KafkaTestBase.java   | 3 ++-
 .../runtime/state/TaskExecutorLocalStateStoresManagerTest.java   | 4 +++-
 .../flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java | 9 ++---
 .../apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java | 7 +--
 .../test/recovery/JobManagerHAProcessFailureRecoveryITCase.java  | 3 ++-
 6 files changed, 20 insertions(+), 8 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java 
b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index b1d3330..7ff020c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -64,6 +65,7 @@ public class LocalExecutor extends PlanExecutor {
if (!configuration.contains(RestOptions.BIND_PORT)) {
configuration.setString(RestOptions.BIND_PORT, "0");
}
+   
TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution(configuration);
 
int numTaskManagers = configuration.getInteger(
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 94d9133..85f8f2f 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -117,7 +117,8 @@ public abstract class KafkaTestBase extends TestLogger {
 
protected static Configuration getFlinkConfiguration() {
Configuration flinkConfig = new Configuration();
-   
flinkConfig.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "16m");
+   flinkConfig.setString(TaskManagerOptions.TASK_HEAP_MEMORY, 
"16m");
+   flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
"16m");
flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
JMXReporter.class.getName());
return flinkConfig;
}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index d9eac20..6191d49 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
@@ -46,6 +47,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends 
TestLogger {
public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
priv

[flink] 06/21: [FLINK-13983][runtime] Use flip49 config options to decide memory size of ShuffleEnvironment.

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a28bdc854df6e2c52dcb03e36de0ba3915f7866c
Author: Xintong Song 
AuthorDate: Sun Sep 29 11:56:23 2019 +0800

[FLINK-13983][runtime] Use flip49 config options to decide memory size of 
ShuffleEnvironment.
---
 .../io/network/NettyShuffleServiceFactory.java |  1 +
 .../runtime/shuffle/ShuffleEnvironmentContext.java | 13 ++
 .../runtime/taskexecutor/TaskManagerServices.java  |  1 +
 .../TaskManagerServicesConfiguration.java  | 40 +
 .../NettyShuffleEnvironmentConfiguration.java  | 51 +++---
 .../NettyShuffleEnvironmentConfigurationTest.java  |  3 +-
 6 files changed, 93 insertions(+), 16 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index f266c77..477aca0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -59,6 +59,7 @@ public class NettyShuffleServiceFactory implements 
ShuffleServiceFactory systemResourceMetricsProbingInterval;
 
+   @Nullable // should only be null when flip49 is disabled
+   private final TaskExecutorResourceSpec taskExecutorResourceSpec;
+
public TaskManagerServicesConfiguration(
Configuration configuration,
ResourceID resourceID,
@@ -101,6 +107,8 @@ public class TaskManagerServicesConfiguration {
MemoryType memoryType,
float memoryFraction,
int pageSize,
+   @Nullable // should only be null when flip49 is disabled
+   TaskExecutorResourceSpec taskExecutorResourceSpec,
long timerServiceShutdownTimeout,
RetryingRegistrationConfiguration 
retryingRegistrationConfiguration,
Optional systemResourceMetricsProbingInterval) {
@@ -122,6 +130,8 @@ public class TaskManagerServicesConfiguration {
this.memoryFraction = memoryFraction;
this.pageSize = pageSize;
 
+   this.taskExecutorResourceSpec = taskExecutorResourceSpec;
+
checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " +
"service shutdown timeout must be greater or equal to 
0.");
this.timerServiceShutdownTimeout = timerServiceShutdownTimeout;
@@ -207,6 +217,11 @@ public class TaskManagerServicesConfiguration {
return pageSize;
}
 
+   @Nullable // should only be null when flip49 is disabled
+   public MemorySize getShuffleMemorySize() {
+   return taskExecutorResourceSpec == null ? null : 
taskExecutorResourceSpec.getShuffleMemSize();
+   }
+
long getTimerServiceShutdownTimeout() {
return timerServiceShutdownTimeout;
}
@@ -259,6 +274,30 @@ public class TaskManagerServicesConfiguration {
 
final RetryingRegistrationConfiguration 
retryingRegistrationConfiguration = 
RetryingRegistrationConfiguration.fromConfiguration(configuration);
 
+   if 
(configuration.getBoolean(TaskManagerOptions.ENABLE_FLIP_49_CONFIG)) {
+   final TaskExecutorResourceSpec taskExecutorResourceSpec 
= TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
+   return new TaskManagerServicesConfiguration(
+   configuration,
+   resourceID,
+   remoteAddress,
+   localCommunicationOnly,
+   tmpDirs,
+   localStateRootDir,
+   freeHeapMemoryWithDefrag,
+   maxJvmHeapMemory,
+   localRecoveryMode,
+   queryableStateConfig,
+   ConfigurationParserUtils.getSlot(configuration),
+   
ConfigurationParserUtils.getManagedMemorySize(configuration),
+   
ConfigurationParserUtils.getMemoryType(configuration),
+   
ConfigurationParserUtils.getManagedMemoryFraction(configuration),
+   
ConfigurationParserUtils.getPageSize(configuration),
+   taskExecutorResourceSpec,
+   timerServiceShutdownTimeout,
+   retryingRe

[flink] 15/21: Increase memory in YARNHighAvailabilityITCase

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c499e102929b5793c9470e4527e6206d866f874a
Author: Andrey Zagrebin 
AuthorDate: Mon Nov 4 15:24:02 2019 +0100

Increase memory in YARNHighAvailabilityITCase
---
 .../src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index a30046e..f082c84 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -288,7 +288,7 @@ public class YARNHighAvailabilityITCase extends 
YarnTestBase {
}
 
private RestClusterClient 
deploySessionCluster(YarnClusterDescriptor yarnClusterDescriptor) throws 
ClusterDeploymentException {
-   final int containerMemory = 256;
+   final int containerMemory = 1024;
final ClusterClient yarnClusterClient = 
yarnClusterDescriptor.deploySessionCluster(
new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(containerMemory)



[flink] 18/21: fix YarnConfigurationITCase

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d123baf24e68ba7058bbb739954a59879bf5265d
Author: Andrey Zagrebin 
AuthorDate: Tue Nov 5 09:14:50 2019 +0100

fix YarnConfigurationITCase
---
 .../src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java| 2 +-
 .../src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 8350293..dbad597 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -85,7 +85,7 @@ public class YarnConfigurationITCase extends YarnTestBase {
final Configuration configuration = new 
Configuration(flinkConfiguration);
 
final int masterMemory = 64;
-   final int taskManagerMemory = 128;
+   final int taskManagerMemory = 512;
final int slotsPerTaskManager = 3;
 
// disable heap cutoff min
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index f958b70..74207ff 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -835,7 +835,7 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor {
clusterSpecification.getSlotsPerTaskManager());
 
configuration.setString(
-   TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY,
+   TaskManagerOptions.TOTAL_PROCESS_MEMORY,
clusterSpecification.getTaskManagerMemoryMB() + 
"m");
 
// Upload the flink configuration



[flink] 12/21: Treat legacy TM heap size as total process memory, not flink memory

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fd73130c0bf036bf4d04c9b2102e2b6fd1f908ce
Author: Andrey Zagrebin 
AuthorDate: Wed Nov 6 16:24:08 2019 +0100

Treat legacy TM heap size as total process memory, not flink memory
---
 .../flink/configuration/TaskManagerOptions.java|  2 +-
 .../TaskExecutorResourceUtils.java | 27 +++---
 .../TaskExecutorResourceUtilsTest.java | 16 ++---
 3 files changed, 23 insertions(+), 22 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 7d1492c..d717ad5 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -253,6 +253,7 @@ public class TaskManagerOptions {
public static final ConfigOption<String> TOTAL_PROCESS_MEMORY =
key("taskmanager.memory.total-process.size")
.noDefaultValue()
+   .withDeprecatedKeys(TASK_MANAGER_HEAP_MEMORY.key())
.withDescription("Total Process Memory size for the 
TaskExecutors. This includes all the memory that a"
+ " TaskExecutor consumes, consisting of Total 
Flink Memory, JVM Metaspace, and JVM Overhead. On"
+ " containerized setups, this should be set to 
the container memory.");
@@ -264,7 +265,6 @@ public class TaskManagerOptions {
public static final ConfigOption<String> TOTAL_FLINK_MEMORY =
key("taskmanager.memory.total-flink.size")
.noDefaultValue()
-   .withDeprecatedKeys(TASK_MANAGER_HEAP_MEMORY.key())
.withDescription("Total Flink Memory size for the 
TaskExecutors. This includes all the memory that a"
+ " TaskExecutor consumes, except for JVM Metaspace and 
JVM Overhead. It consists of Framework Heap Memory,"
+ " Task Heap Memory, Task Off-Heap Memory, Managed 
Memory, and Shuffle Memory.");
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index 83d2d7d..f2a275f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -408,8 +408,13 @@ public class TaskExecutorResourceUtils {
 
private static MemorySize getTotalFlinkMemorySize(final Configuration 
config) {

checkArgument(isTotalFlinkMemorySizeExplicitlyConfigured(config));
-   if (config.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY)) {
-   return 
MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_FLINK_MEMORY));
+   return 
MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_FLINK_MEMORY));
+   }
+
+   private static MemorySize getTotalProcessMemorySize(final Configuration 
config) {
+   
checkArgument(isTotalProcessMemorySizeExplicitlyConfigured(config));
+   if (config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY)) {
+   return 
MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
} else {
@SuppressWarnings("deprecation")
final long legacyHeapMemoryMB = 
config.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB);
@@ -417,11 +422,6 @@ public class TaskExecutorResourceUtils {
}
}
 
-   private static MemorySize getTotalProcessMemorySize(final Configuration 
config) {
-   
checkArgument(isTotalProcessMemorySizeExplicitlyConfigured(config));
-   return 
MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
-   }
-
private static boolean isTaskHeapMemorySizeExplicitlyConfigured(final 
Configuration config) {
return config.contains(TaskManagerOptions.TASK_HEAP_MEMORY);
}
@@ -454,15 +454,16 @@ public class TaskExecutorResourceUtils {
}
 
private static boolean isTotalFlinkMemorySizeExplicitlyConfigured(final 
Configuration config) {
-   // backward compatible with the deprecated config option 
TASK_MANAGER_HEAP_MEMORY_MB only when it's explicitly
-   // configured by the user
-   @SuppressWarnings("deprecation")
-   
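
Note on the commit above: moving `withDeprecatedKeys(TASK_MANAGER_HEAP_MEMORY.key())` from TOTAL_FLINK_MEMORY to TOTAL_PROCESS_MEMORY changes which new option a legacy heap setting is mapped to. The following is a minimal standalone sketch of that kind of deprecated-key fallback, not Flink's actual lookup code. The class `DeprecatedKeySketch` and its `resolve` method are hypothetical, and the legacy key string `taskmanager.heap.size` is an assumption (the diff shows only the constant name); the two new keys are taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for an option lookup with deprecated-key fallback; not Flink API. */
final class DeprecatedKeySketch {

    // Keys quoted in the diff above.
    static final String TOTAL_PROCESS = "taskmanager.memory.total-process.size";
    static final String TOTAL_FLINK = "taskmanager.memory.total-flink.size";
    // Assumed value of TASK_MANAGER_HEAP_MEMORY.key(); the diff does not show it.
    static final String LEGACY_HEAP = "taskmanager.heap.size";

    /** Returns the value under the current key, else the first deprecated key that is set. */
    static Optional<String> resolve(Map<String, String> config, String key, String... deprecatedKeys) {
        if (config.containsKey(key)) {
            return Optional.ofNullable(config.get(key));
        }
        for (String deprecated : deprecatedKeys) {
            if (config.containsKey(deprecated)) {
                return Optional.ofNullable(config.get(deprecated));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(LEGACY_HEAP, "1024m");
        // With the fallback attached to total process memory, the legacy value is picked up here.
        System.out.println(resolve(config, TOTAL_PROCESS, LEGACY_HEAP));
        // Total Flink memory no longer lists the legacy key, so nothing is resolved.
        System.out.println(resolve(config, TOTAL_FLINK));
    }
}
```

In practice this means a user who only sets the legacy heap size gets it interpreted as the whole process budget (including JVM Metaspace and JVM Overhead), which is the behaviour the commit subject describes.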

[flink] 02/21: [hotfix] Remove heading/trailing/duplicated whitespaces from shell command generated by BootstrapTools.

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dd728794937d69b710accea513e4662c9e669949
Author: Xintong Song 
AuthorDate: Fri Sep 27 18:09:58 2019 +0800

[hotfix] Remove heading/trailing/duplicated whitespaces from shell command 
generated by BootstrapTools.

This is to eliminate the dependencies on white space counts from 
'BootstrapToolsTest#testGetTaskManagerShellCommand'.
---
 .../runtime/clusterframework/BootstrapTools.java   |  4 +-
 .../clusterframework/BootstrapToolsTest.java   | 20 -
 .../flink/yarn/YarnClusterDescriptorTest.java  | 49 +++---
 3 files changed, 37 insertions(+), 36 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 75de581..0b156ca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -472,7 +472,9 @@ public class BootstrapTools {
for (Map.Entry<String, String> variable : startCommandValues
.entrySet()) {
template = template
-   .replace("%" + variable.getKey() + "%", 
variable.getValue());
+   .replace("%" + variable.getKey() + "%", 
variable.getValue())
+   .replace("  ", " ")
+   .trim();
}
return template;
}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index 54aadf5..04ad29c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -165,8 +165,8 @@ public class BootstrapToolsTest extends TestLogger {
 
assertEquals(
java + " " + jvmmem +
-   " " + // jvmOpts
-   " " + // logging
+   "" + // jvmOpts
+   "" + // logging
" " + mainClass + " " + args + " " + redirects,
BootstrapTools
.getTaskManagerShellCommand(cfg, 
containeredParams, "./conf", "./logs",
@@ -175,8 +175,8 @@ public class BootstrapToolsTest extends TestLogger {
final String krb5 = "-Djava.security.krb5.conf=krb5.conf";
assertEquals(
java + " " + jvmmem +
-   " " + " " + krb5 + // jvmOpts
-   " " + // logging
+   " " + krb5 + // jvmOpts
+   "" + // logging
" " + mainClass + " " + args + " " + redirects,
BootstrapTools
.getTaskManagerShellCommand(cfg, 
containeredParams, "./conf", "./logs",
@@ -185,7 +185,7 @@ public class BootstrapToolsTest extends TestLogger {
// logback only, with/out krb5
assertEquals(
java + " " + jvmmem +
-   " " + // jvmOpts
+   "" + // jvmOpts
" " + logfile + " " + logback +
" " + mainClass + " " + args + " " + redirects,
BootstrapTools
@@ -194,7 +194,7 @@ public class BootstrapToolsTest extends TestLogger {
 
assertEquals(
java + " " + jvmmem +
-   " " + " " + krb5 + // jvmOpts
+   " " + krb5 + // jvmOpts
" " + logfile + " " + logback +
" " + mainClass + " " + args + " " + redirects,
BootstrapTools
@@ -204,7 +204,7 @@ public class BootstrapToolsTest extends TestLogger {
// log4j, with/out krb5
assertEquals(
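
Note on the BootstrapTools change above: the added `.replace("  ", " ").trim()` runs once per template variable, so an empty placeholder no longer leaves a double space in the generated command. The sketch below reproduces that loop outside Flink. The class `StartCommandSketch`, the placeholder names, and the sample values are hypothetical. The `collapseSpaces` variant is not part of the commit; it is an assumed alternative that collapses runs of any length in one pass, whereas a single `replace("  ", " ")` only halves each run.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the template substitution; not part of Flink. */
final class StartCommandSketch {

    /** Mirrors the loop in the diff: substitute each %key%, collapse double spaces, trim. */
    static String fillTemplate(String template, Map<String, String> values) {
        for (Map.Entry<String, String> variable : values.entrySet()) {
            template = template
                .replace("%" + variable.getKey() + "%", variable.getValue())
                .replace("  ", " ")
                .trim();
        }
        return template;
    }

    /** Assumed alternative (not in the commit): collapse runs of spaces of any length. */
    static String collapseSpaces(String command) {
        return command.replaceAll(" {2,}", " ").trim();
    }

    public static void main(String[] args) {
        Map<String, String> values = new LinkedHashMap<>();
        values.put("java", "$JAVA_HOME/bin/java");
        values.put("jvmopts", ""); // an empty value would otherwise leave two adjacent spaces
        values.put("class", "org.example.Main");
        System.out.println(fillTemplate("%java% %jvmopts% %class%", values));
    }
}
```

This is why the test expectations in the diff drop the `" "` placeholders for empty jvmOpts and logging sections: the expected string no longer depends on how many spaces the template happens to contain.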
  

[flink] 11/21: Adjust memory configuration for local execution

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4224fd9dd21a6fd6d96aa389676660a522c92e21
Author: Andrey Zagrebin 
AuthorDate: Thu Oct 31 11:53:43 2019 +0100

Adjust memory configuration for local execution
---
 .../MesosTaskManagerParametersTest.java| 32 --
 .../TaskExecutorResourceUtils.java | 24 
 .../minicluster/MiniClusterConfiguration.java  |  3 +-
 ...tractTaskManagerProcessFailureRecoveryTest.java |  2 ++
 .../apache/flink/yarn/YarnResourceManagerTest.java |  2 ++
 5 files changed, 48 insertions(+), 15 deletions(-)

diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
index ead7a12..73a36d0 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
@@ -32,6 +32,7 @@ import java.util.List;
 
 import scala.Option;
 
+import static 
org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -78,7 +79,7 @@ public class MesosTaskManagerParametersTest extends 
TestLogger {
Configuration config = new Configuration();

config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_VOLUMES, 
"/host/path:/container/path:ro");
 
-   MesosTaskManagerParameters params = 
MesosTaskManagerParameters.create(config);
+   MesosTaskManagerParameters params = 
createMesosTaskManagerParameters(config);
assertEquals(1, params.containerVolumes().size());
assertEquals("/container/path", 
params.containerVolumes().get(0).getContainerPath());
assertEquals("/host/path", 
params.containerVolumes().get(0).getHostPath());
@@ -90,7 +91,7 @@ public class MesosTaskManagerParametersTest extends 
TestLogger {
Configuration config = new Configuration();

config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS,
 "testKey=testValue");
 
-   MesosTaskManagerParameters params = 
MesosTaskManagerParameters.create(config);
+   MesosTaskManagerParameters params = 
createMesosTaskManagerParameters(config);
assertEquals(params.dockerParameters().size(), 1);
assertEquals(params.dockerParameters().get(0).getKey(), 
"testKey");
assertEquals(params.dockerParameters().get(0).getValue(), 
"testValue");
@@ -102,7 +103,7 @@ public class MesosTaskManagerParametersTest extends 
TestLogger {

config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS,

"testKey1=testValue1,testKey2=testValue2,testParam3=key3=value3,testParam4=\"key4=value4\"");
 
-   MesosTaskManagerParameters params = 
MesosTaskManagerParameters.create(config);
+   MesosTaskManagerParameters params = 
createMesosTaskManagerParameters(config);
assertEquals(params.dockerParameters().size(), 4);
assertEquals(params.dockerParameters().get(0).getKey(), 
"testKey1");
assertEquals(params.dockerParameters().get(0).getValue(), 
"testValue1");
@@ -118,7 +119,7 @@ public class MesosTaskManagerParametersTest extends 
TestLogger {
public void testContainerDockerParametersMalformed() throws Exception {
Configuration config = new Configuration();

config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS,
 "badParam");
-   MesosTaskManagerParameters params = 
MesosTaskManagerParameters.create(config);
+   MesosTaskManagerParameters params = 
createMesosTaskManagerParameters(config);
}
 
@Test
@@ -127,7 +128,7 @@ public class MesosTaskManagerParametersTest extends 
TestLogger {
config.setString(MesosTaskManagerParameters.MESOS_TM_URIS,
"file:///dev/null,http://localhost/test,  
test_url ");
 
-   MesosTaskManagerParameters params = 
MesosTaskManagerParameters.create(config);
+   MesosTaskManagerParameters params = 
createMesosTaskManagerParameters(config);
assertEquals(params.uris().size(), 3);
assertEquals(params.uris().get(0), "file

[flink] 03/21: [FLINK-13983][runtime] Introduce 'BashJavaUtils' allowing bash scripts to call java codes for generating TM resource dynamic configurations and JVM parameters.

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 30ce8c0059a03b494ad7cae93487f9b8c8117749
Author: Xintong Song 
AuthorDate: Thu Sep 26 19:36:20 2019 +0800

[FLINK-13983][runtime] Introduce 'BashJavaUtils' allowing bash scripts to 
call java codes for generating TM resource dynamic configurations and JVM 
parameters.
---
 .../flink/configuration/ConfigurationUtils.java| 58 
 flink-dist/src/main/flink-bin/bin/config.sh| 14 
 flink-dist/src/test/bin/runBashJavaUtilsCmd.sh | 38 +++
 .../org/apache/flink/dist/BashJavaUtilsTest.java   | 54 +++
 .../runtime/taskexecutor/TaskManagerRunner.java|  4 +-
 .../apache/flink/runtime/util/BashJavaUtils.java   | 77 ++
 .../TaskExecutorResourceUtilsTest.java | 44 +++--
 7 files changed, 250 insertions(+), 39 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index b94865c..b6d817c 100755
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.configuration;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 
 import javax.annotation.Nonnull;
@@ -31,6 +32,7 @@ import java.util.Set;
 
 import static 
org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS;
 import static 
org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * Utility class for {@link Configuration} related helper functions.
@@ -174,6 +176,62 @@ public class ConfigurationUtils {
return separatedPaths.length() > 0 ? separatedPaths.split(",|" 
+ File.pathSeparator) : EMPTY;
}
 
+   @VisibleForTesting
+   public static Map<String, String> parseTmResourceDynamicConfigs(String 
dynamicConfigsStr) {
+   Map<String, String> configs = new HashMap<>();
+   String[] configStrs = dynamicConfigsStr.split(" ");
+
+   checkArgument(configStrs.length % 2 == 0);
+   for (int i = 0; i < configStrs.length; ++i) {
+   String configStr = configStrs[i];
+   if (i % 2 == 0) {
+   checkArgument(configStr.equals("-D"));
+   } else {
+   String[] configKV = configStr.split("=");
+   checkArgument(configKV.length == 2);
+   configs.put(configKV[0], configKV[1]);
+   }
+   }
+
+   
checkArgument(configs.containsKey(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key()));
+   
checkArgument(configs.containsKey(TaskManagerOptions.TASK_HEAP_MEMORY.key()));
+   
checkArgument(configs.containsKey(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key()));
+   
checkArgument(configs.containsKey(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key()));
+   
checkArgument(configs.containsKey(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key()));
+   
checkArgument(configs.containsKey(TaskManagerOptions.MANAGED_MEMORY_SIZE.key()));
+   
checkArgument(configs.containsKey(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key()));
+
+   return configs;
+   }
+
+   @VisibleForTesting
+   public static Map<String, String> parseTmResourceJvmParams(String 
jvmParamsStr) {
+   final String xmx = "-Xmx";
+   final String xms = "-Xms";
+   final String maxDirect = "-XX:MaxDirectMemorySize=";
+   final String maxMetadata = "-XX:MaxMetaspaceSize=";
+
+   Map<String, String> configs = new HashMap<>();
+   for (String paramStr : jvmParamsStr.split(" ")) {
+   if (paramStr.startsWith(xmx)) {
+   configs.put(xmx, 
paramStr.substring(xmx.length()));
+   } else if (paramStr.startsWith(xms)) {
+   configs.put(xms, 
paramStr.substring(xms.length()));
+   } else if (paramStr.startsWith(maxDirect)) {
+   configs.put(maxDirect, 
paramStr.substring(maxDirect.length()));
+   } else if (paramStr.startsWith(maxMetadata)) {
+   configs.put(maxMetadata, 
paramStr.substring(maxMetadata.length()));
+   }
+   }
+
+   checkArgument(configs.containsKey(xmx))
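
Note on `parseTmResourceDynamicConfigs` above: it expects a string of alternating `-D` and `key=value` tokens separated by single spaces. A minimal standalone sketch of that parsing step follows. The class `DynamicConfigSketch` is hypothetical and does not check for the required TaskManager keys. It also deviates from the diff in two labeled ways: it trims the input first, and it splits on the first `=` only, so a value containing `=` is kept rather than rejected.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of parsing "-D key=value" pairs; not Flink's ConfigurationUtils. */
final class DynamicConfigSketch {

    static Map<String, String> parse(String dynamicConfigs) {
        Map<String, String> configs = new HashMap<>();
        // Assumption beyond the diff: tolerate surrounding whitespace.
        String[] tokens = dynamicConfigs.trim().split(" ");
        if (tokens.length % 2 != 0) {
            throw new IllegalArgumentException("Expected alternating -D and key=value tokens");
        }
        for (int i = 0; i < tokens.length; i += 2) {
            if (!tokens[i].equals("-D")) {
                throw new IllegalArgumentException("Expected -D but found: " + tokens[i]);
            }
            // Assumption beyond the diff: split on the first '=' only.
            String[] keyValue = tokens[i + 1].split("=", 2);
            if (keyValue.length != 2) {
                throw new IllegalArgumentException("Expected key=value but found: " + tokens[i + 1]);
            }
            configs.put(keyValue[0], keyValue[1]);
        }
        return configs;
    }

    public static void main(String[] args) {
        String input = "-D taskmanager.memory.total-process.size=1024m"
            + " -D taskmanager.memory.total-flink.size=768m";
        System.out.println(parse(input));
    }
}
```

The two keys in `main` are the ones quoted elsewhere in this thread; they are used here only as sample input.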

[flink] 16/21: fix ClassLoaderITCase

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 71b1c3827b65560f004e067ec66ecf2bd1ae5e1f
Author: Andrey Zagrebin 
AuthorDate: Mon Nov 4 18:27:57 2019 +0100

fix ClassLoaderITCase
---
 flink-dist/src/main/flink-bin/bin/config.sh| 26 --
 flink-dist/src/main/flink-bin/bin/taskmanager.sh   |  3 ++-
 .../flink/test/classloading/ClassLoaderITCase.java |  2 +-
 3 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/flink-dist/src/main/flink-bin/bin/config.sh 
b/flink-dist/src/main/flink-bin/bin/config.sh
index 52ce960..8843877 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -798,6 +798,20 @@ runBashJavaUtilsCmd() {
 echo ${output}
 }
 
+verifyTmResourceConfig() {
+  if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
+   echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace 
with key \`${KEY_TASKM_MEM_SIZE}\`"
+else
+   flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
+   FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
+fi
+
+if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a number. 
Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
+exit 1
+fi
+}
+
 getTmResourceDynamicConfigsAndJvmParams() {
 if [[ "`echo ${FLINK_TM_ENABLE_FLIP49} | tr '[:upper:]' '[:lower:]'`" == 
"true" ]]; then
 echo "$(getTmResourceDynamicConfigsAndJvmParamsFlip49)"
@@ -817,18 +831,6 @@ getTmResourceDynamicConfigsAndJvmParamsFlip49() {
 }
 
 getTmResourceDynamicConfigsAndJvmParamsLegacy() {
-if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
-   echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace 
with key \`${KEY_TASKM_MEM_SIZE}\`"
-else
-   flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
-   FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
-fi
-
-if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" 
-lt "0" ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a number. 
Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
-exit 1
-fi
-
 if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then
 
 TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh 
b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index e78a1f1..a3d62e2 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -48,6 +48,7 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == 
"start-foreground" ]]; then
 export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} 
${FLINK_ENV_JAVA_OPTS_TM}"
 
 # Startup parameters
+verifyTmResourceConfig
 dynamic_configs_and_jvm_params=$(getTmResourceDynamicConfigsAndJvmParams)
 IFS=$'\n' lines=(${dynamic_configs_and_jvm_params})
 
@@ -55,7 +56,7 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == 
"start-foreground" ]]; then
 export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
 
 dynamic_configs=${lines[1]}
-ARGS=(${ARGS[@]} ${dynamic_configs})
+ARGS+=(${dynamic_configs})
 ARGS+=("--configDir" "${FLINK_CONF_DIR}")
 fi
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index a09142e..f90a5f3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -116,7 +116,7 @@ public class ClassLoaderITCase extends TestLogger {

FOLDER.newFolder().getAbsoluteFile().toURI().toString());
 
// required as we otherwise run out of memory
-   config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, 
"80m");
+   config.setString(TaskManagerOptions.TOTAL_FLINK_MEMORY, "512m");
 
miniClusterResource = new MiniClusterResource(
new MiniClusterResourceConfiguration.Builder()



[flink] 08/21: [FLINK-13986][core][config] Change default value of flip49 feature flag to true

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e1daac36f0ac6e5e2039823d0a48ef576839f437
Author: Xintong Song 
AuthorDate: Wed Oct 16 21:00:09 2019 +0800

[FLINK-13986][core][config] Change default value of flip49 feature flag to 
true
---
 .../main/java/org/apache/flink/configuration/TaskManagerOptions.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index b190ee8..7d1492c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -484,7 +484,7 @@ public class TaskManagerOptions {
@Documentation.ExcludeFromDocumentation("FLIP-49 is still in 
development.")
public static final ConfigOption<Boolean> ENABLE_FLIP_49_CONFIG =
key("taskmanager.enable-flip-49")
-   .defaultValue(false)
+   .defaultValue(true)
.withDescription("Toggle to switch between FLIP-49 and 
current task manager memory configurations.");
 
/** Not intended to be instantiated. */



[flink] 07/21: [FLINK-13983][runtime] Use flip49 config options to decide memory size of MemoryManager.

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b4b482af4b93ee9fc98de08b7b427514df41d61b
Author: Xintong Song 
AuthorDate: Mon Oct 14 16:09:23 2019 +0800

[FLINK-13983][runtime] Use flip49 config options to decide memory size of 
MemoryManager.
---
 .../flink/runtime/taskexecutor/TaskManagerServices.java   | 15 +++
 .../taskexecutor/TaskManagerServicesConfiguration.java| 10 ++
 2 files changed, 25 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index e570799..91f611a 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -51,7 +51,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
@@ -330,6 +332,19 @@ public class TaskManagerServices {
 */
private static MemoryManager createMemoryManager(
TaskManagerServicesConfiguration 
taskManagerServicesConfiguration) {
+   if 
(taskManagerServicesConfiguration.getOnHeapManagedMemorySize() != null &&
+   
taskManagerServicesConfiguration.getOffHeapManagedMemorySize() != null) {
+   // flip49 enabled
+
+   final Map<MemoryType, Long> memorySizeByType = new 
HashMap<>();
+   memorySizeByType.put(MemoryType.HEAP, 
taskManagerServicesConfiguration.getOnHeapManagedMemorySize().getBytes());
+   memorySizeByType.put(MemoryType.OFF_HEAP, 
taskManagerServicesConfiguration.getOffHeapManagedMemorySize().getBytes());
+
+   return new MemoryManager(memorySizeByType,
+   
taskManagerServicesConfiguration.getNumberOfSlots(),
+   taskManagerServicesConfiguration.getPageSize());
+   }
+
// computing the amount of memory to use depends on how much 
memory is available
// it strictly needs to happen AFTER the network stack has been 
initialized
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index c2133c9..bd84107 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -222,6 +222,16 @@ public class TaskManagerServicesConfiguration {
return taskExecutorResourceSpec == null ? null : taskExecutorResourceSpec.getShuffleMemSize();
}
 
+   @Nullable // should only be null when flip49 is disabled
+   public MemorySize getOnHeapManagedMemorySize() {
+   return taskExecutorResourceSpec == null ? null : taskExecutorResourceSpec.getOnHeapManagedMemorySize();
+   }
+
+   @Nullable // should only be null when flip49 is disabled
+   public MemorySize getOffHeapManagedMemorySize() {
+   return taskExecutorResourceSpec == null ? null : taskExecutorResourceSpec.getOffHeapManagedMemorySize();
+   }
+
long getTimerServiceShutdownTimeout() {
return timerServiceShutdownTimeout;
}
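
Note on the change above: createMemoryManager takes the new path only when both nullable getters return a value, and otherwise falls through to the legacy calculation. The following is a minimal, self-contained sketch of that null-check-then-build pattern, not Flink's code. The class ManagedMemorySketch, its nested MemoryType enum and the method sizesByType are hypothetical stand-ins introduced only for illustration, and sizes are passed as plain byte counts rather than MemorySize objects.

```java
import java.util.EnumMap;
import java.util.Map;

public class ManagedMemorySketch {

    /** Hypothetical stand-in for the memory type enum used in the diff. */
    public enum MemoryType { HEAP, OFF_HEAP }

    /**
     * Builds the per-type size map when both sizes are configured.
     * Returns null to signal that the caller should use the legacy path
     * (an assumption of this sketch, mirroring the fall-through in the diff).
     */
    public static Map<MemoryType, Long> sizesByType(Long onHeapBytes, Long offHeapBytes) {
        if (onHeapBytes == null || offHeapBytes == null) {
            return null;
        }
        Map<MemoryType, Long> sizes = new EnumMap<>(MemoryType.class);
        sizes.put(MemoryType.HEAP, onHeapBytes);
        sizes.put(MemoryType.OFF_HEAP, offHeapBytes);
        return sizes;
    }

    public static void main(String[] args) {
        // Both sizes present: the explicit map is built.
        System.out.println(sizesByType(64L << 20, 128L << 20));
        // One size missing: null, i.e. fall back to the legacy calculation.
        System.out.println(sizesByType(null, 128L << 20));
    }
}
```

Requiring both values before switching paths keeps a half-configured setup on the legacy calculation instead of mixing the two schemes.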



[flink] 21/21: run e2e

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 38b436cbd24d54b65bf7479b05f13920255271f9
Author: Andrey Zagrebin 
AuthorDate: Wed Nov 6 18:15:49 2019 +0100

run e2e
---
 .travis.yml | 157 
 1 file changed, 52 insertions(+), 105 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 109533f..d6fc195 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -71,7 +71,6 @@ stages:
   - name: compile
   - name: test
   - name: E2E
-if: type = cron
   - name: cleanup
 
 jdk: "openjdk8"
@@ -130,242 +129,190 @@ jobs:
   env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11"
   name: cleanup
 # hadoop 2.4.1 profile
-- if: type = cron
-  stage: compile
+- stage: compile
   script: ./tools/travis_controller.sh compile
   env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
   name: compile - hadoop 2.4.1
-- if: type = cron
-  stage: test
+- stage: test
   script: ./tools/travis_controller.sh core
   env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
   name: core - hadoop 2.4.1
-- if: type = cron
-  script: ./tools/travis_controller.sh libraries
+- script: ./tools/travis_controller.sh libraries
   env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
   name: libraries - hadoop 2.4.1
-- if: type = cron
-  script: ./tools/travis_controller.sh blink_planner
+- script: ./tools/travis_controller.sh blink_planner
   env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
   name: blink_planner - hadoop 2.4.1
-- if: type = cron
-  script: ./tools/travis_controller.sh connectors
+- script: ./tools/travis_controller.sh connectors
   env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
   name: connectors - hadoop 2.4.1
-- if: type = cron
-  script: ./tools/travis_controller.sh kafka/gelly
+- script: ./tools/travis_controller.sh kafka/gelly
   env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
   name: kafka/gelly - hadoop 2.4.1
-- if: type = cron
-  script: ./tools/travis_controller.sh tests
+- script: ./tools/travis_controller.sh tests
   env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
   name: tests - hadoop 2.4.1
-- if: type = cron
-  script: ./tools/travis_controller.sh misc
+- script: ./tools/travis_controller.sh misc
   env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
   name: misc - hadoop 2.4.1
-- if: type = cron
-  stage: cleanup
+- stage: cleanup
   script: ./tools/travis_controller.sh cleanup
   env: PROFILE="-Dhadoop.version=2.4.1 -Pskip-hive-tests"
   name: cleanup - hadoop 2.4.1
 # scala 2.12 profile
-- if: type = cron
-  stage: compile
+- stage: compile
   script: ./tools/travis_controller.sh compile
   env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
   name: compile - scala 2.12
-- if: type = cron
-  stage: test
+- stage: test
   script: ./tools/travis_controller.sh core
   env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
   name: core - scala 2.12
-- if: type = cron
-  script: ./tools/travis_controller.sh libraries
+- script: ./tools/travis_controller.sh libraries
   env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
   name: libraries - scala 2.12
-- if: type = cron
-  script: ./tools/travis_controller.sh blink_planner
+- script: ./tools/travis_controller.sh blink_planner
   env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
   name: blink_planner - scala 2.12
-- if: type = cron
-  script: ./tools/travis_controller.sh connectors
+- script: ./tools/travis_controller.sh connectors
   env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
   name: connectors - scala 2.12
-- if: type = cron
-  script: ./tools/travis_controller.sh kafka/gelly
+- script: ./tools/travis_controller.sh kafka/gelly
   env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
   name: kafka/gelly - scala 2.12
-- if: type = cron
-  script: ./tools/travis_controller.sh tests
+- script: ./tools/travis_controller.sh tests
   env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.12 -Phive-1.2.1"
   name: tests - scala 2.12
-- if: type = cron
-  script: ./tools/travis_controller.sh misc
+- script: ./tools/travis_control

[flink] 17/21: fix LaunchableMesosWorkerTest

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dc0ba44b40fd86c1fb019f37383b849956125874
Author: Andrey Zagrebin 
AuthorDate: Mon Nov 4 18:31:06 2019 +0100

fix LaunchableMesosWorkerTest
---
 .../flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
index 3d53160..42095b8 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.mesos.scheduler.LaunchableTask;
 import org.apache.flink.mesos.util.MesosResourceAllocation;
 import org.apache.flink.mesos.util.MesosUtils;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.mesos.Protos;
@@ -86,6 +87,7 @@ public class LaunchableMesosWorkerTest extends TestLogger {
configuration.setString(MesosOptions.MASTER_URL, "foobar");
final MemorySize memorySize = new MemorySize(1337L);
configuration.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, memorySize.toString());
+   TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution(configuration);
 
final LaunchableTask launchableTask = new LaunchableMesosWorker(
ignored -> Option.empty(),



[flink] branch master updated (f6e39aa -> 306b83b)

2019-11-18 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from f6e39aa  [FLINK-14830][docs-zh] Correct the links in 
stream_checkpointing.zh.md page (#10233)
 add 7f3471c  [hotfix] Add shortcuts for getting jvm heap / direct memory 
size.
 add 306b83b  [FLINK-14637][core][runtime] Introduce config option for 
framework off-heap memory.

No new revisions were added by this update.

Summary of changes:
 .../task_manager_memory_configuration.html |  5 
 .../flink/configuration/TaskManagerOptions.java| 10 +++
 .../clusterframework/TaskExecutorResourceSpec.java | 33 +-
 .../TaskExecutorResourceUtils.java | 32 +++--
 .../TaskExecutorResourceUtilsTest.java | 18 ++--
 5 files changed, 85 insertions(+), 13 deletions(-)



[flink] branch master updated (d8a5d41 -> ec12ec0)

2019-11-18 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from d8a5d41  [FLINK-14750][documentation] Regenerated restart strategy 
documentation
 add ec12ec0  [hotfix] Add String type in config docs for 
taskmanager.memory.framework.off-heap.size

No new revisions were added by this update.

Summary of changes:
 docs/_includes/generated/task_manager_memory_configuration.html | 1 +
 1 file changed, 1 insertion(+)



[flink] branch master updated (7128fdc -> 36d1b62)

2019-11-12 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 7128fdc  [FLINK-14693][python] Fix Python tox checks failure on travis
 add b872f19  [hotfix] Introduce constants MemorySize#ZERO and 
MemorySize#MAX_VALUE.
 add 95fe886  [hotfix] Move SimpleSlotContext to test scope.
 add 740adc1  [hotfix] Fix misusage of UNKNOWN and ANY ResourceProfiles
 add 4a5e8c0  [hotfix] Deduplicate setting operator resource / parallelism 
argument checks.
 add 9d22410  [hotfix] UNKNOWN ResourceSpec should not be numerically 
compared.
 add 1cbee5e  [hotfix] Make MemorySize comparable.
 add 2c81c3f  [FLINK-14405][runtime] Update ResourceSpec to align with 
FLIP-49 resource types
 add b6c2943  [FLINK-14405][runtime] Update ResourceProfile to align with 
FLIP-49 resource types
 add 001733b  [FLINK-14495][core] Limit ResourceSpec to always specify cpu 
cores and task heap memory size, unless it UNKNOWN.
 add 099e965  [hotfix] Remove unused constructors and unnecessary javadocs 
for ResourceSpec.
 add bfd6aa5  [hotfix] Remove unused constructors and unnecessary 
javadoces, and annotate constructors used only in testing codes as 
VisibleForTestting for ResourceProfile.
 add 36d1b62  [hotfix] Preserve the singleton property for 
ResourceProfile#ANY.

No new revisions were added by this update.

Summary of changes:
 .../flink/api/common/operators/ResourceSpec.java   | 265 ---
 .../operators/util/OperatorValidationUtils.java|  79 +
 .../org/apache/flink/api/dag/Transformation.java   |  11 +-
 .../org/apache/flink/configuration/MemorySize.java |  11 +-
 .../api/common/operators/ResourceSpecTest.java |  74 ++---
 .../apache/flink/configuration/MemorySizeTest.java |   2 +-
 .../apache/flink/api/java/operators/DataSink.java  |  13 +-
 .../flink/api/java/operators/DeltaIteration.java   |  13 +-
 .../apache/flink/api/java/operators/Operator.java  |  14 +-
 .../flink/api/java/operator/OperatorTest.java  |   4 +-
 .../plantranslate/JobGraphGeneratorTest.java   |  26 +-
 .../TaskExecutorResourceUtils.java |   4 +-
 .../clusterframework/types/ResourceProfile.java| 368 -
 .../runtime/executiongraph/ExecutionJobVertex.java |   3 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |  11 +-
 .../TaskExecutorResourceUtilsTest.java |   8 +-
 ...ocationPreferenceSlotSelectionStrategyTest.java |   2 +-
 .../types/ResourceProfileTest.java | 152 -
 .../flink/runtime/dispatcher/DispatcherTest.java   |   2 +-
 .../executiongraph/utils/SimpleSlotProvider.java   |   4 +-
 .../flink/runtime/instance/SimpleSlotContext.java  |   2 +-
 .../flink/runtime/jobmaster/JobMasterTest.java |   6 +-
 .../jobmaster/slotpool/AllocatedSlotsTest.java |   2 +-
 .../jobmaster/slotpool/SingleLogicalSlotTest.java  |   2 +-
 .../jobmaster/slotpool/SlotPoolCoLocationTest.java |   4 +-
 .../jobmaster/slotpool/SlotPoolImplTest.java   |  16 +-
 .../slotpool/SlotPoolRequestCompletionTest.java|   2 +-
 .../slotpool/SlotPoolSlotSharingTest.java  |   8 +-
 .../ResourceManagerTaskExecutorTest.java   |   4 +-
 .../slotmanager/SlotManagerImplTest.java   |  20 +-
 .../slotmanager/TestingResourceActionsBuilder.java |   4 +-
 .../TaskExecutorPartitionLifecycleTest.java|   2 +-
 .../runtime/taskexecutor/TaskExecutorTest.java |  38 +--
 .../TaskSubmissionTestEnvironment.java |   2 +-
 .../taskexecutor/slot/TaskSlotTableTest.java   |   2 +-
 .../streaming/api/datastream/DataStreamSink.java   |  10 +-
 .../streaming/api/datastream/DataStreamSource.java |  10 +-
 .../api/datastream/SingleOutputStreamOperator.java |  23 +-
 .../apache/flink/streaming/api/DataStreamTest.java |  28 +-
 .../api/graph/StreamingJobGraphGeneratorTest.java  |  20 +-
 .../plan/nodes/resource/NodeResourceUtil.java  |   5 +-
 41 files changed, 692 insertions(+), 584 deletions(-)
 create mode 100644 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorValidationUtils.java
 rename flink-runtime/src/{main => 
test}/java/org/apache/flink/runtime/instance/SimpleSlotContext.java (98%)



[flink] branch FLINK-13986-flip49-cleanup-e2e created (now 38b436c)

2019-11-07 Thread azagrebin

azagrebin pushed a change to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git.


  at 38b436c  run e2e

This branch includes the following new commits:

 new 2b9e5d9  [hotfix] Refactor 
'TaskManagerHeapSizeCalculationJavaBashTest', abstract 'JavaBashTestBase' for 
executing testing bash scripts from java classes.
 new dd72879  [hotfix] Remove heading/trailing/duplicated whitespaces from 
shell command generated by BootstrapTools.
 new 30ce8c0  [FLINK-13983][runtime] Introduce 'BashJavaUtils' allowing 
bash scripts to call java codes for generating TM resource dynamic 
configurations and JVM parameters.
 new 4088bea  [FLINK-13983][dist] TM startup scripts calls java codes to 
set flip49 TM resource configs and JVM parameters, if feature option is enabled.
 new 5b3ebcf  [FLINK-13983][runtime][yarn/mesos] Launches TaskExecutors on 
Yarn/Mesos with JVM parameters and dynamic configs generated from 
TaskExecutorResourceSpec.
 new a28bdc8  [FLINK-13983][runtime] Use flip49 config options to decide 
memory size of ShuffleEnvironment.
 new b4b482a  [FLINK-13983][runtime] Use flip49 config options to decide 
memory size of MemoryManager.
 new e1daac3  [FLINK-13986][core][config] Change default value of flip49 
feature flag to true
 new 1d80b12  [FLINK-13986][test] Fix test cases missing explicit task 
executor resource configurations.
 new 436f467  [FLINK-13986][test] Fix failure cases that fail due to change 
of expected exception type.
 new 4224fd9  Adjust memory configuration for local execution
 new fd73130  Treat legacy TM heap size as total process memory, not flink 
memory
 new 5b5be51  Add backwards compatibility
 new 1b7df76  Change task off heap default value from 0b to 1m to 
accomodate for framework small allocations (temporary: later add framework off 
heap)
 new c499e10  Increase memory in YARNHighAvailabilityITCase
 new 71b1c38  fix ClassLoaderITCase
 new dc0ba44  fix LaunchableMesosWorkerTest
 new d123baf  fix YarnConfigurationITCase
 new 1f24495  Fix yarn cut off
 new b8c6a4c  [FLINK-14631] Account for netty direct allocations in direct 
memory limit (Netty Shuffle)
 new 38b436c  run e2e

The 21 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[flink] 01/21: [hotfix] Refactor 'TaskManagerHeapSizeCalculationJavaBashTest', abstract 'JavaBashTestBase' for executing testing bash scripts from java classes.

2019-11-07 Thread azagrebin

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2b9e5d9f545a136cede63bc924c03ea013029fe3
Author: Xintong Song 
AuthorDate: Thu Sep 26 19:38:55 2019 +0800

[hotfix] Refactor 'TaskManagerHeapSizeCalculationJavaBashTest', abstract 
'JavaBashTestBase' for executing testing bash scripts from java classes.
---
 .../org/apache/flink/dist/JavaBashTestBase.java| 60 ++
 ...TaskManagerHeapSizeCalculationJavaBashTest.java | 34 +---
 2 files changed, 61 insertions(+), 33 deletions(-)

diff --git a/flink-dist/src/test/java/org/apache/flink/dist/JavaBashTestBase.java b/flink-dist/src/test/java/org/apache/flink/dist/JavaBashTestBase.java
new file mode 100644
index 000..63faaa2
--- /dev/null
+++ b/flink-dist/src/test/java/org/apache/flink/dist/JavaBashTestBase.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assume;
+import org.junit.BeforeClass;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * Abstract test class for executing bash scripts.
+ */
+public abstract class JavaBashTestBase extends TestLogger {
+   @BeforeClass
+   public static void checkOperatingSystem() {
+   Assume.assumeTrue("This test checks shell scripts which are not available on Windows.",
+   !OperatingSystem.isWindows());
+   }
+
+   /**
+* Executes the given shell script wrapper and returns its output.
+*
+* @param command  command to run
+*
+* @return raw script output
+*/
+   protected String executeScript(final String[] command) throws IOException {
+   ProcessBuilder pb = new ProcessBuilder(command);
+   pb.redirectErrorStream(true);
+   Process process = pb.start();
+   BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+   StringBuilder sb = new StringBuilder();
+   String s;
+   while ((s = reader.readLine()) != null) {
+   sb.append(s);
+   }
+   return sb.toString();
+   }
+}
diff --git a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
index 0b5f5be..4f172d9 100755
--- a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
+++ b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
@@ -24,16 +24,10 @@ import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Assume;
-import org.junit.Before;
 import org.junit.Test;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.util.Random;
 
 import static org.hamcrest.CoreMatchers.allOf;
@@ -51,7 +45,7 @@ import static org.junit.Assert.assertThat;
  * double precision but our Java code restrains to float because we actually do
  * not need high precision.
  */
-public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
+public class TaskManagerHeapSizeCalculationJavaBashTest extends JavaBashTestBase {
 
/** Key that is used by config.sh. */
private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.size";
@@ -64,12 +58,6 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
 */
private static final int 
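
Note on executeScript in JavaBashTestBase above: the loop appends each line returned by readLine() without a separator, so multi-line script output comes back as one concatenated string. The sketch below isolates that read loop so the behavior is easy to see. It is an illustration under stated assumptions, not the code from the commit: the class ScriptOutputSketch and the method readAllLines are hypothetical names, and the input comes from an in-memory Reader instead of a real process.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;

public class ScriptOutputSketch {

    /**
     * Reads every line from the source and concatenates them.
     * readLine() strips line terminators, so no newlines survive.
     */
    public static String readAllLines(Reader source) {
        StringBuilder sb = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                sb.append(line);
            }
        } catch (IOException e) {
            // Rethrown unchecked only to keep this sketch easy to call.
            throw new UncheckedIOException(e);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Two input lines come back joined without a separator.
        System.out.println(readAllLines(new StringReader("line one\nline two\n")));
    }
}
```

This is convenient for scripts that print a single value; a caller that needs to tell lines apart would have to append a separator in the loop.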

[flink] branch master updated (88c206d -> 2142dc7)

2019-12-04 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 88c206d  [hotfix][state-backends] Simple cleanups in 
RocksDBStateBackend
 add 4060acc  [FLINK-15047][runtime] Fix format of setting managed memory 
size in ActiveResourceManagerFactory.
 add 1f5e56a  [hotfix][runtime] Code clean-up in ResourceProfile.
 add 2142dc7  [FLINK-15023][runtime] Remove on-heap managed memory

No new revisions were added by this update.

Summary of changes:
 .../task_manager_memory_configuration.html |  16 +---
 docs/ops/config.md |   2 +-
 docs/ops/config.zh.md  |   2 +-
 .../flink/api/common/operators/ResourceSpec.java   |  73 +--
 .../flink/configuration/ConfigurationUtils.java|   1 -
 .../flink/configuration/TaskManagerOptions.java|  48 ++
 .../clusterframework/TaskExecutorResourceSpec.java |  61 +
 .../TaskExecutorResourceUtils.java | 100 ++---
 .../clusterframework/types/ResourceProfile.java|  88 ++
 .../ActiveResourceManagerFactory.java  |   2 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |  35 ++--
 .../TaskManagerServicesConfiguration.java  |   8 +-
 .../flink/runtime/taskexecutor/slot/TaskSlot.java  |   7 +-
 .../clusterframework/BootstrapToolsTest.java   |   3 +-
 .../TaskExecutorResourceUtilsTest.java |  85 +-
 .../types/ResourceProfileTest.java |  71 ++-
 .../jobmaster/slotpool/SlotSharingManagerTest.java |   9 +-
 .../runtime/taskexecutor/slot/TaskSlotUtils.java   |   3 +-
 .../flink/streaming/api/graph/StreamConfig.java|  27 ++
 .../api/graph/StreamingJobGraphGenerator.java  |  23 ++---
 .../api/graph/StreamingJobGraphGeneratorTest.java  |  44 -
 .../plan/nodes/resource/NodeResourceUtil.java  |   3 +-
 .../apache/flink/yarn/YarnResourceManagerTest.java |   3 +-
 23 files changed, 163 insertions(+), 551 deletions(-)



[flink] branch master updated (5576851 -> 60b3f2f)

2019-12-03 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 5576851  [FLINK-13986][core][config] Remove FLIP-49 feature option.
 add 60b3f2f  [FLINK-14936][runtime] Introduce 
MemoryManager#computeMemorySize to calculate managed memory size from a fraction

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/runtime/memory/MemoryManager.java | 14 
 .../flink/runtime/memory/MemoryManagerTest.java| 25 ++
 2 files changed, 39 insertions(+)
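
Note on FLINK-14936 above: the commit subject describes calculating a managed memory size from a fraction. The email does not include the diff, so the following is only a sketch of what such a calculation could look like. The class FractionSketch, the parameter names, the accepted range (0, 1] and the round-down behavior are all assumptions of this sketch, not the signature or semantics of MemoryManager#computeMemorySize.

```java
public class FractionSketch {

    /**
     * Returns the share of totalBytes given by fraction, rounded down.
     * The accepted range (0, 1] is an assumption of this sketch.
     */
    public static long computeMemorySize(long totalBytes, double fraction) {
        if (totalBytes < 0) {
            throw new IllegalArgumentException("total size must not be negative");
        }
        if (!(fraction > 0.0 && fraction <= 1.0)) {
            throw new IllegalArgumentException("fraction must be within (0, 1]");
        }
        return (long) Math.floor(totalBytes * fraction);
    }

    public static void main(String[] args) {
        // Half of 1024 bytes.
        System.out.println(computeMemorySize(1024L, 0.5));
    }
}
```

Rounding down means the shares handed out for several fractions can never add up to more than the total.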



[flink] branch master updated (89bd90d -> 5576851)

2019-12-03 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 89bd90d  [FLINK-15042][python] Fix python compatibility by excluding 
executeAsync
 add 5dca721  [hotfix][runtime] Map deprecated config option 
'taskmanager.heap.size' and 'taskmanager.heap.mb' to total process memory.
 add 1bc28ef  [hotfix][dist] Refactor 
'TaskManagerHeapSizeCalculationJavaBashTest', abstract 'JavaBashTestBase' for 
executing testing bash scripts from java classes.
 add 3704a0e  [hotfix][runtime] Remove heading/trailing/duplicated 
whitespaces from shell command generated by BootstrapTools.
 add 30132a8  [hotfix][mesos] Fix missing junit annotation for 
MesosTaskManagerParametersTest#testForcePullImageTrue.
 add 7027094  [hotfix][runtime] Fix backwards compatibility of 
NettyShuffleEnvironmentOptions#NETWORK_NUM_BUFFERS.
 add 96b735d  [hotfix][runtime] If not specified, set Total Flink Memory 
according to JVM free heap memory for MiniCluster.
 add 82f8f42b [hotfix][runtime] Code style clean-up for 
TaskExecutorResourceUtilsTest
 add 99bc4fd  [hotfix][runtime] Throw IllegalConfigurationException instead 
of NumberFormatException for negative memory sizes in TaskExecutorResourceUtils.
 add 6681e11  [hotfix][runtime] Add backwards compatibility for setting 
task executor memory through environment variable.
 add 4b8ed64  [hotfix][core][config] Change default value of 
TaskManagerOptions#MEMORY_OFF_HEAP from false to true.
 add 65e9507  [hotfix][core][config] Change default value of managed memory 
fraction from 0.5 to 0.4.
 add 8090cfd  [hotfix][core][config] Change default framework off-heap 
memory size from 64m to 128m.
 add 0e6c8fa  [hotfix][core][config] Change default jvm metaspace size from 
192m to 128m.
 add 937eed6  [FLINK-13983][runtime] Introduce 'BashJavaUtils' allowing 
bash scripts to call java codes for generating TM resource dynamic 
configurations and JVM parameters.
 add 9d1256c  [FLINK-13983][runtime][yarn/mesos][dist] Launch task executor 
with new memory calculation logics
 add c324f1a  [FLINK-13986][dist] Remove codes for legacy memory 
calculations in task executor startup scripts.
 add d24c324  [FLINK-13986][core][runtime] Remove java codes for legacy 
memory calculations.
 add ba1d711  [FLINK-13986][core][config] Remove legacy config option 
TaskManagerOptions#TASK_MANAGER_HEAP_MEMORY.
 add 96da510  [FLINK-13986][core][config] Remove legacy config option 
TaskManagerOptions#LEGACY_MANAGED_MEMORY_FRACTION.
 add ffe323e  [FLINK-13986][core][config] Remove legacy config option 
TaskManagerOptions#LEGACY_MANAGED_MEMORY_SIZE.
 add 65460ec  [FLINK-13986][core][config] Remove unused 
ConfigConstants#TASK_MANAGER_MEMORY_OFF_HEAP_KEY.
 add 96fe45e  [FLINK-13986][core][config] Remove usage of legacy 
NETWORK_BUFFERS_MEMORY_(MIN|MAX|FRACTION).
 add 5bdc299  [FLINK-13986][doc] Update docs for managed memory 
configuration.
 add 5576851  [FLINK-13986][core][config] Remove FLIP-49 feature option.

No new revisions were added by this update.

Summary of changes:
 docs/_includes/generated/common_section.html   |   4 +-
 .../task_manager_memory_configuration.html |   6 +-
 docs/ops/config.md |  10 +-
 docs/ops/config.zh.md  |  10 +-
 docs/ops/deployment/cluster_setup.md   |   6 +-
 docs/ops/deployment/cluster_setup.zh.md|   6 +-
 docs/ops/deployment/kubernetes.md  |   2 +-
 docs/ops/deployment/kubernetes.zh.md   |   2 +-
 docs/ops/deployment/mesos.md   |   4 +-
 docs/ops/deployment/mesos.zh.md|   2 +-
 docs/ops/deployment/yarn_setup.md  |   2 +-
 docs/ops/deployment/yarn_setup.zh.md   |   2 +-
 .../client/deployment/ClusterSpecification.java|   6 +-
 .../kafka/KafkaShortRetentionTestBase.java |   2 +-
 .../streaming/connectors/kafka/KafkaTestBase.java  |   2 +-
 .../kinesis/manualtests/ManualExactlyOnceTest.java |   2 +-
 .../ManualExactlyOnceWithStreamReshardingTest.java |   2 +-
 .../flink/configuration/ConfigConstants.java   |  34 ---
 .../flink/configuration/ConfigurationUtils.java|  78 +++--
 .../flink/configuration/TaskManagerOptions.java|  68 +
 flink-dist/src/main/flink-bin/bin/config.sh| 184 ++--
 flink-dist/src/main/flink-bin/bin/taskmanager.sh   |  30 +-
 flink-dist/src/main/resources/flink-conf.yaml  |   8 +-
 flink-dist/src/test/bin/calcTMHeapSizeMB.sh|  50 
 .../{calcTMNetBufMem.sh => runBashJavaUtilsCmd.sh} |  21 +-
 .../org/apache/flink/dist/BashJavaUtilsTest.java   |  54 
 .../org/apache/flink/dist/JavaBashTestBase.java|  60 
 ...TaskManagerHeapSizeCalculationJavaBashTest.java | 333 -
 flink-end-to-end-te

[flink] branch master updated (20e945d -> db3dec5)

2019-12-08 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 20e945d  [FLINK-14898] Enable background cleanup of state with TTL by 
default
 add dbdaf5b  [hotfix][core] Introduce divide operation to Resource
 add a78a5d4  [hotfix][core] Introduce divide operation to MemorySize
 add 5183be8  [hotfix][core] Introduce TaskExecutorResourceSpecBuilder for 
building TaskExecutorResourceSpec.
 add d2558f4  [FLINK-14188][runtime][yarn/mesos] Set container cpu cores 
into TaskExecutorResourceSpec when launching TaskExecutors on Yarn/Mesos.
 add db33a49  [FLINK-14188][runtime] Derive and register TaskExecutor to 
ResourceManager with default slot resource profile.
 add 66be972  [FLINK-14188][runtime] Use default slot resource profile 
derived from TaskExecutorResourceSpec on both RM and TM sides.
 add 25f79fb  [hotfix][runtime] Wrap arguments of 
ResourceManagerGateway#registerTaskExecutor into TaskExecutorRegistration.
 add da071ce  [FLINK-14189][runtime] TaskExecutor register to 
ResourceManager with total resource profile.
 add 516aee3  [FLINK-14189][runtime] Extend TaskExecutor to support dynamic 
slot allocation
 add db3dec5  [FLINK-14189][runtime] Do not store dynamic slots by index in 
TaskSlotTable

No new revisions were added by this update.

Summary of changes:
 .../flink/api/common/resources/Resource.java   |   9 +
 .../org/apache/flink/configuration/MemorySize.java |   5 +
 .../flink/configuration/TaskManagerOptions.java|  27 ++-
 .../flink/api/common/resources/ResourceTest.java   |  28 +++
 .../apache/flink/configuration/MemorySizeTest.java |  13 ++
 .../kubernetes/KubernetesResourceManager.java  |   7 +-
 .../kubernetes/KubernetesResourceManagerTest.java  |  13 +-
 .../flink/kubernetes/KubernetesUtilsTest.java  |   2 +
 .../clusterframework/MesosResourceManager.java |   5 +-
 .../MesosTaskManagerParameters.java|  42 +++--
 .../clusterframework/MesosResourceManagerTest.java |  18 +-
 .../MesosTaskManagerParametersTest.java|   9 +
 .../clusterframework/TaskExecutorResourceSpec.java |  12 +-
 .../TaskExecutorResourceSpecBuilder.java   |  60 ++
 .../TaskExecutorResourceUtils.java |  82 -
 .../types/ResourceBudgetManager.java   |  76 
 .../runtime/clusterframework/types/SlotID.java |  14 +-
 .../resourcemanager/ActiveResourceManager.java |  11 +-
 .../runtime/resourcemanager/ResourceManager.java   |  59 ++
 .../resourcemanager/ResourceManagerGateway.java|  11 +-
 .../resourcemanager/TaskExecutorRegistration.java  | 103 +++
 .../slotmanager/SlotManagerImpl.java   |   1 +
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  21 ++-
 .../runtime/taskexecutor/TaskExecutorGateway.java  |   3 +
 .../TaskExecutorToResourceManagerConnection.java   |  53 ++
 .../taskexecutor/TaskManagerConfiguration.java |  25 ++-
 .../runtime/taskexecutor/TaskManagerRunner.java|  11 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |  45 +
 .../TaskManagerServicesConfiguration.java  |   9 +-
 .../flink/runtime/taskexecutor/slot/SlotOffer.java |   1 -
 .../flink/runtime/taskexecutor/slot/TaskSlot.java  |  74 ++--
 .../runtime/taskexecutor/slot/TaskSlotState.java   |   3 +-
 .../runtime/taskexecutor/slot/TaskSlotTable.java   | 205 -
 .../clusterframework/BootstrapToolsTest.java   |   2 +
 .../TaskExecutorResourceUtilsTest.java |  53 ++
 .../types/ResourceBudgetManagerTest.java   |  70 +++
 .../ResourceManagerTaskExecutorTest.java   |  23 ++-
 .../resourcemanager/ResourceManagerTest.java   |  30 +--
 .../slotmanager/SlotManagerImplTest.java   |  52 +++---
 .../utils/TestingResourceManagerGateway.java   |  12 +-
 .../TaskExecutorLocalStateStoresManagerTest.java   |   6 +-
 .../TaskExecutorPartitionLifecycleTest.java|  10 +-
 .../runtime/taskexecutor/TaskExecutorTest.java | 204 +++-
 ...askExecutorToResourceManagerConnectionTest.java |  27 ++-
 .../TaskSubmissionTestEnvironment.java |  11 +-
 .../taskexecutor/TestingTaskExecutorGateway.java   |  11 +-
 .../TestingTaskExecutorGatewayBuilder.java |   9 +-
 .../taskexecutor/slot/TaskSlotTableTest.java   | 135 ++
 .../runtime/taskexecutor/slot/TaskSlotUtils.java   |  36 ++--
 .../runtime/util/JvmExitOnFatalErrorTest.java  |   7 +-
 .../test/java/org/apache/flink/yarn/UtilsTest.java |   5 +-
 .../apache/flink/yarn/YarnClusterDescriptor.java   |   5 +-
 .../org/apache/flink/yarn/YarnResourceManager.java |  31 +++-
 .../apache/flink/yarn/YarnResourceManagerTest.java |  15 +-
 54 files changed, 1349 insertions(+), 462 deletions(-)
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework

[flink] branch master updated (1858ce2 -> 20e945d)

2019-12-08 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 1858ce2  [hotfix][hive] adapt 
HiveFunctionDefinitionFactory.createFunctionDefinition to latest APPI
 add 20e945d  [FLINK-14898] Enable background cleanup of state with TTL by 
default

No new revisions were added by this update.

Summary of changes:
 docs/dev/stream/state/state.md | 58 ++
 .../flink/api/common/state/StateTtlConfig.java | 20 ++--
 .../flink/api/common/state/StateTtlConfigTest.java | 54 ++--
 .../flink/runtime/state/ttl/TtlStateTestBase.java  |  1 +
 4 files changed, 82 insertions(+), 51 deletions(-)



[flink] branch master updated (1489cb0 -> f90fd6b)

2019-12-09 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 1489cb0  [FLINK-15130][core] Deprecate RequiredParameters and Option
 add f90fd6b  [FLINK-15136][docs] Update the Chinese version of Working 
with State TTL

No new revisions were added by this update.

Summary of changes:
 docs/dev/stream/state/state.zh.md | 39 ++-
 1 file changed, 18 insertions(+), 21 deletions(-)



[flink] branch master updated (5c89d12 -> 2c11291)

2019-12-11 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 5c89d12  [hotfix][doc] update Hive functions page with more info
 add 2c11291  [FLINK-14951][tests] Harden the thread safety of State TTL 
backend tests

No new revisions were added by this update.

Summary of changes:
 .../streaming/tests/MonotonicTTLTimeProvider.java  | 36 ++
 .../streaming/tests/TtlVerifyUpdateFunction.java   | 21 +
 2 files changed, 30 insertions(+), 27 deletions(-)



[flink] branch release-1.8 updated: [FLINK-14951][tests] Harden the thread safety of State TTL backend tests

2019-12-11 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new dfcbf68  [FLINK-14951][tests] Harden the thread safety of State TTL 
backend tests
dfcbf68 is described below

commit dfcbf68dbd792fc27c997078be7a59a594005d8b
Author: Yangze Guo 
AuthorDate: Tue Dec 10 10:03:49 2019 +0800

[FLINK-14951][tests] Harden the thread safety of State TTL backend tests
---
 .../streaming/tests/MonotonicTTLTimeProvider.java  | 36 ++
 .../streaming/tests/TtlVerifyUpdateFunction.java   | 21 +
 2 files changed, 30 insertions(+), 27 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
index 24bc9dc..c2e66f1 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
@@ -19,12 +19,15 @@
 package org.apache.flink.streaming.tests;
 
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.function.FunctionWithException;
 
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A stub implementation of a {@link TtlTimeProvider} which guarantees that
  * processing time increases monotonically.
@@ -54,14 +57,24 @@ final class MonotonicTTLTimeProvider implements 
TtlTimeProvider, Serializable {
private static final Object lock = new Object();
 
@GuardedBy("lock")
-   static long freeze() {
+   static <T, E extends Throwable> T 
doWithFrozenTime(FunctionWithException<Long, T, E> action) throws E {
synchronized (lock) {
-   if (!timeIsFrozen || lastReturnedProcessingTime == 
Long.MIN_VALUE) {
-   timeIsFrozen = true;
-   return getCurrentTimestamp();
-   } else {
-   return lastReturnedProcessingTime;
-   }
+   final long timestampBeforeUpdate = freeze();
+   T result = action.apply(timestampBeforeUpdate);
+   final long timestampAfterUpdate = unfreezeTime();
+
+   checkState(timestampAfterUpdate == 
timestampBeforeUpdate,
+   "Timestamps before and after the update do not 
match.");
+   return result;
+   }
+   }
+
+   private static long freeze() {
+   if (!timeIsFrozen || lastReturnedProcessingTime == 
Long.MIN_VALUE) {
+   timeIsFrozen = true;
+   return getCurrentTimestamp();
+   } else {
+   return lastReturnedProcessingTime;
}
}
 
@@ -87,11 +100,8 @@ final class MonotonicTTLTimeProvider implements 
TtlTimeProvider, Serializable {
return lastReturnedProcessingTime;
}
 
-   @GuardedBy("lock")
-   static long unfreezeTime() {
-   synchronized (lock) {
-   timeIsFrozen = false;
-   return lastReturnedProcessingTime;
-   }
+   private static long unfreezeTime() {
+   timeIsFrozen = false;
+   return lastReturnedProcessingTime;
}
 }
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
index 94e9dbd..ed69171 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
@@ -47,7 +47,6 @@ import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Update state with TTL for each verifier.
@@ -114,19 +113,13 @@ class TtlVerifyUpdateFunction extends 
RichFlatMapFunction verifier,
Object update) throws Exception {
 
-   final long timestampBeforeUpdate = 
MonotonicTTLTimeProvider.freeze();
-   State sta
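
The change above replaces the separate freeze()/unfreezeTime() calls with a single doWithFrozenTime(...) entry point that holds the lock for the whole update. A minimal, self-contained sketch of that pattern follows. FrozenClock and ThrowingFunction are hypothetical stand-ins, not the Flink classes, and the type parameters are inferred from how the diff uses the action.

```java
/** Hypothetical stand-in for a function that may throw a checked exception. */
@FunctionalInterface
interface ThrowingFunction<T, R, E extends Throwable> {
    R apply(T value) throws E;
}

/** Simplified, hypothetical analogue of the time provider in the diff (not the Flink class). */
final class FrozenClock {
    private static final Object lock = new Object();
    private static boolean timeIsFrozen = false;
    private static long lastReturnedProcessingTime = Long.MIN_VALUE;

    private FrozenClock() {}

    /** Runs the action with a fixed timestamp while holding the lock for the whole call. */
    static <T, E extends Throwable> T doWithFrozenTime(ThrowingFunction<Long, T, E> action) throws E {
        synchronized (lock) {
            final long timestampBeforeUpdate = freeze();
            final T result = action.apply(timestampBeforeUpdate);
            final long timestampAfterUpdate = unfreezeTime();
            if (timestampAfterUpdate != timestampBeforeUpdate) {
                throw new IllegalStateException("Timestamps before and after the update do not match.");
            }
            return result;
        }
    }

    /** Returns a timestamp that never decreases and does not advance while time is frozen. */
    static long currentTimestamp() {
        synchronized (lock) {
            if (timeIsFrozen && lastReturnedProcessingTime != Long.MIN_VALUE) {
                return lastReturnedProcessingTime;
            }
            return getCurrentTimestamp();
        }
    }

    // The private helpers below are only called while the lock is held.
    private static long freeze() {
        if (!timeIsFrozen || lastReturnedProcessingTime == Long.MIN_VALUE) {
            timeIsFrozen = true;
            return getCurrentTimestamp();
        } else {
            return lastReturnedProcessingTime;
        }
    }

    private static long unfreezeTime() {
        timeIsFrozen = false;
        return lastReturnedProcessingTime;
    }

    private static long getCurrentTimestamp() {
        // Clamp to the last returned value so the clock is monotonic.
        lastReturnedProcessingTime = Math.max(lastReturnedProcessingTime, System.currentTimeMillis());
        return lastReturnedProcessingTime;
    }
}
```

A caller would write something like `long ts = FrozenClock.doWithFrozenTime(t -> t);`. Because freeze and unfreeze are now private and only reachable through one synchronized method, another thread cannot unfreeze time in the middle of an update. Like the diff, the sketch does not use try/finally, so in this sketch an action that throws leaves time frozen.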

[flink] branch release-1.9 updated: [FLINK-14951][tests] Harden the thread safety of State TTL backend tests

2019-12-11 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new 8a73d68  [FLINK-14951][tests] Harden the thread safety of State TTL 
backend tests
8a73d68 is described below

commit 8a73d680869da0d7bb4d543bfc197d01f3b0e068
Author: Yangze Guo 
AuthorDate: Tue Dec 10 10:03:49 2019 +0800

[FLINK-14951][tests] Harden the thread safety of State TTL backend tests
---
 .../streaming/tests/MonotonicTTLTimeProvider.java  | 36 ++
 .../streaming/tests/TtlVerifyUpdateFunction.java   | 21 +
 2 files changed, 30 insertions(+), 27 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
index 24bc9dc..c2e66f1 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
@@ -19,12 +19,15 @@
 package org.apache.flink.streaming.tests;
 
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.function.FunctionWithException;
 
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A stub implementation of a {@link TtlTimeProvider} which guarantees that
  * processing time increases monotonically.
@@ -54,14 +57,24 @@ final class MonotonicTTLTimeProvider implements 
TtlTimeProvider, Serializable {
private static final Object lock = new Object();
 
@GuardedBy("lock")
-   static long freeze() {
+   static <T, E extends Throwable> T 
doWithFrozenTime(FunctionWithException<Long, T, E> action) throws E {
synchronized (lock) {
-   if (!timeIsFrozen || lastReturnedProcessingTime == 
Long.MIN_VALUE) {
-   timeIsFrozen = true;
-   return getCurrentTimestamp();
-   } else {
-   return lastReturnedProcessingTime;
-   }
+   final long timestampBeforeUpdate = freeze();
+   T result = action.apply(timestampBeforeUpdate);
+   final long timestampAfterUpdate = unfreezeTime();
+
+   checkState(timestampAfterUpdate == 
timestampBeforeUpdate,
+   "Timestamps before and after the update do not 
match.");
+   return result;
+   }
+   }
+
+   private static long freeze() {
+   if (!timeIsFrozen || lastReturnedProcessingTime == 
Long.MIN_VALUE) {
+   timeIsFrozen = true;
+   return getCurrentTimestamp();
+   } else {
+   return lastReturnedProcessingTime;
}
}
 
@@ -87,11 +100,8 @@ final class MonotonicTTLTimeProvider implements 
TtlTimeProvider, Serializable {
return lastReturnedProcessingTime;
}
 
-   @GuardedBy("lock")
-   static long unfreezeTime() {
-   synchronized (lock) {
-   timeIsFrozen = false;
-   return lastReturnedProcessingTime;
-   }
+   private static long unfreezeTime() {
+   timeIsFrozen = false;
+   return lastReturnedProcessingTime;
}
 }
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
index 94e9dbd..ed69171 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
@@ -47,7 +47,6 @@ import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Update state with TTL for each verifier.
@@ -114,19 +113,13 @@ class TtlVerifyUpdateFunction extends 
RichFlatMapFunction verifier,
Object update) throws Exception {
 
-   final long timestampBeforeUpdate = 
MonotonicTTLTimeProvider.freeze();
-   State sta

[flink] branch master updated (68bba73 -> fa1dadc)

2019-12-06 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 68bba73  [FLINK-14198][python] Add "type" and "rtype" options to flink 
python API docstrings of table.py and table_environment.py
 add fa1dadc  [FLINK-15063][metric]fix input group and output group of the 
task metric are reversed

No new revisions were added by this update.

Summary of changes:
 .../org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)



[flink] branch release-1.9 updated (d9f8abb -> 1be72a8)

2019-12-06 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


from d9f8abb  [hotfix] Let YarnResourceManagerTest run asynchronous tasks 
in main thread
 add 1be72a8  [FLINK-15063][metric]fix input group and output group of the 
task metric are reversed

No new revisions were added by this update.

Summary of changes:
 .../org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)



[flink] branch master updated (fbd5b63 -> 6df53ea)

2019-10-25 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from fbd5b63  [FLINK-14342][table][python] Remove method 
FunctionDefinition#getLanguage.
 add 6df53ea  [FLINK-14522] Revert FLINK-13985 (sun.misc.Cleaner is not 
available in Java 9+)

No new revisions were added by this update.

Summary of changes:
 .../main/java/org/apache/flink/core/memory/MemorySegmentFactory.java | 5 ++---
 .../src/main/java/org/apache/flink/core/memory/MemoryUtils.java  | 3 ++-
 2 files changed, 4 insertions(+), 4 deletions(-)



[flink] branch master updated (ca5e518 -> 60cc18b)

2019-10-15 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from ca5e518  [FLINK-14215][docs] Add how to configure environment 
variables to documentation
 add 60cc18b  [FLINK-12628][Runtime / Coordination] Remove no consumers 
check in Execution.getPartitionMaxParallelism

No new revisions were added by this update.

Summary of changes:
 .../org/apache/flink/runtime/executiongraph/Execution.java   | 12 
 1 file changed, 4 insertions(+), 8 deletions(-)



[flink] branch master updated (d5bb073 -> 11ef253)

2019-10-29 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from d5bb073  [hotfix] Fix typos in MemorySegment java docs
 add 11ef253  [FLINK-12289][flink-runtime]Fix typos in Memory manager

No new revisions were added by this update.

Summary of changes:
 .../src/main/java/org/apache/flink/runtime/memory/MemoryManager.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)



[flink] branch master updated (fe966d4 -> d5bb073)

2019-10-29 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from fe966d4  [FLINK-14490][table-api] Rework insertInto method
 add f71ce33  [FLINK-12223][Runtime]HeapMemorySegment.getArray should 
return null after being freed
 add d5bb073  [hotfix] Fix typos in MemorySegment java docs

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/flink/core/memory/HeapMemorySegment.java   |  2 +-
 .../main/java/org/apache/flink/core/memory/MemorySegment.java  |  6 +++---
 .../org/apache/flink/core/memory/HeapMemorySegmentTest.java| 10 ++
 3 files changed, 14 insertions(+), 4 deletions(-)
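
For readers unfamiliar with the behaviour named in the FLINK-12223 title (getArray should return null after the segment is freed), a minimal sketch follows. SimpleHeapSegment is a hypothetical illustration, not Flink's HeapMemorySegment, and it assumes that freeing simply drops the reference to the backing array.

```java
/** Hypothetical, simplified heap-backed segment; not Flink's HeapMemorySegment. */
final class SimpleHeapSegment {
    // Backing array; set to null once the segment is freed.
    private byte[] memory;

    SimpleHeapSegment(byte[] buffer) {
        if (buffer == null) {
            throw new NullPointerException("buffer");
        }
        this.memory = buffer;
    }

    /** Drops the reference to the backing array so it can be garbage collected. */
    void free() {
        memory = null;
    }

    boolean isFreed() {
        return memory == null;
    }

    /** Returns the backing array, or null once the segment has been freed. */
    byte[] getArray() {
        return memory;
    }
}
```

Returning null after free() means a caller cannot keep reading or writing the old array through the segment once it has been released.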



[flink] branch master updated (e1f5ead -> 7334e0a)

2019-10-22 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from e1f5ead  [FLINK-14434][coordination] Dispatcher#createJobManagerRunner 
returns on creation succeed
 add 6b5a00c  [hotfix] Annotate MemoryManager methods used for testing with 
the @VisibleForTesting
 add 7a62d72  [hotfix] Introduce MemoryManagerBuilder for tests
 add d8bfc30  [hotfix] Remove unsed methods and fields in MemoryManager
 add 8f889a7  [hotfix] Checkstyle fixes in MemoryManager
 add ffa7bdd  [hotfix] Refactor MemoryManager constructor
 add 597c969  [hotfix] Remove and deprecate memory preallocation in 
MemoryManager
 add 76349cf  [FLINK-13984] Separate on-heap and off-heap managed memory 
pools
 add 036505f  [FLINK-14399] Implement reservation of memory chunks in 
MemoryManager
 add 7334e0a  [hotfix] Enable memory manager with zero memory or zero pages

No new revisions were added by this update.

Summary of changes:
 .../task_manager_memory_configuration.html |   5 -
 docs/ops/config.md |   2 +-
 docs/ops/config.zh.md  |   2 +-
 .../flink/configuration/ConfigConstants.java   |  17 -
 .../flink/configuration/TaskManagerOptions.java|  24 +-
 flink-dist/src/main/flink-bin/bin/config.sh|   7 -
 flink-dist/src/main/flink-bin/bin/taskmanager.sh   |   2 +-
 flink-dist/src/main/resources/flink-conf.yaml  |  11 -
 .../apache/flink/runtime/memory/MemoryManager.java | 861 ++---
 .../MemoryReservationException.java}   |  16 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |  52 +-
 .../TaskManagerServicesConfiguration.java  |  11 -
 .../flink/runtime/util/KeyedBudgetManager.java | 294 +++
 .../flink/runtime/io/disk/ChannelViewsTest.java|   8 +-
 .../runtime/io/disk/FileChannelStreamsITCase.java  |   9 +-
 .../runtime/io/disk/FileChannelStreamsTest.java|   6 +-
 .../io/disk/SeekableFileChannelInputViewTest.java  |   8 +-
 .../flink/runtime/io/disk/SpillingBufferTest.java  |   3 +-
 .../runtime/io/disk/iomanager/IOManagerITCase.java |   3 +-
 .../flink/runtime/memory/MemoryManagerBuilder.java |  67 ++
 .../MemoryManagerConcurrentModReleaseTest.java |  13 +-
 .../memory/MemoryManagerLazyAllocationTest.java| 198 -
 .../flink/runtime/memory/MemoryManagerTest.java| 140 +++-
 .../runtime/memory/MemorySegmentSimpleTest.java|   7 +-
 .../runtime/operators/drivers/TestTaskContext.java |   7 +-
 .../runtime/operators/hash/HashTableITCase.java|   5 +-
 .../hash/NonReusingHashJoinIteratorITCase.java |   3 +-
 .../operators/hash/ReOpenableHashTableITCase.java  |   8 +-
 .../hash/ReOpenableHashTableTestBase.java  |   8 +-
 .../hash/ReusingHashJoinIteratorITCase.java|   3 +-
 .../BlockResettableMutableObjectIteratorTest.java  |   3 +-
 .../NonReusingBlockResettableIteratorTest.java |   3 +-
 .../ReusingBlockResettableIteratorTest.java|   3 +-
 .../resettable/SpillingResettableIteratorTest.java |   7 +-
 ...pillingResettableMutableObjectIteratorTest.java |   3 +-
 .../AbstractSortMergeOuterJoinIteratorITCase.java  |   3 +-
 .../sort/CombiningUnilateralSortMergerITCase.java  |   3 +-
 .../runtime/operators/sort/ExternalSortITCase.java |   3 +-
 .../sort/ExternalSortLargeRecordsITCase.java   |   3 +-
 .../sort/FixedLengthRecordSorterTest.java  |   8 +-
 .../operators/sort/LargeRecordHandlerITCase.java   |  14 +-
 .../operators/sort/LargeRecordHandlerTest.java |  20 +-
 ...NonReusingSortMergeInnerJoinIteratorITCase.java |   3 +-
 .../operators/sort/NormalizedKeySorterTest.java|   8 +-
 .../ReusingSortMergeInnerJoinIteratorITCase.java   |   3 +-
 .../operators/sort/UnilateralSortMergerTest.java   |   6 +-
 .../testutils/BinaryOperatorTestBase.java  |   3 +-
 .../operators/testutils/DriverTestBase.java|   3 +-
 .../operators/testutils/MockEnvironment.java   |   3 +-
 .../operators/testutils/UnaryOperatorTestBase.java |   3 +-
 .../operators/util/HashVsSortMiniBenchmark.java|   8 +-
 .../runtime/taskexecutor/TaskExecutorTest.java |  13 +-
 .../taskexecutor/TaskManagerRunnerStartupTest.java |  20 -
 .../taskexecutor/TaskManagerServicesBuilder.java   |  13 +-
 .../flink/runtime/taskmanager/TestTaskBuilder.java |   3 +-
 .../runtime/util/JvmExitOnFatalErrorTest.java  |   3 +-
 .../flink/runtime/util/KeyedBudgetManagerTest.java | 261 +++
 .../runtime/tasks/StreamMockEnvironment.java   |   3 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |   4 +-
 .../runtime/hashtable/BinaryHashTableTest.java |  34 +-
 .../table/runtime/hashtable/LongHashTableTest.java |   3 +-
 .../operators/aggregate/BytesHashMapTest.java  |  40 +-
 .../runtime/operators/aggregate/HashAggTest.java   |   3 +-
 .../join/Int2SortMergeJoinOperatorTest.java|   3

[flink] branch master updated (7334e0a -> 56126bd)

2019-10-22 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 7334e0a  [hotfix] Enable memory manager with zero memory or zero pages
 add 56126bd  [FLINK-14447] Network metrics doc table render confusion

No new revisions were added by this update.

Summary of changes:
 docs/monitoring/metrics.md| 4 ++--
 docs/monitoring/metrics.zh.md | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)



[flink] branch master updated (c31e44e -> 6d0a827)

2019-10-22 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from c31e44e  [FLINK-14235][kafka,tests] Change source in at-least-once 
test from finite to infinite
 add 8e0b35c  [hotfix] Use MemorySegmentFactory in tests instead of 
HybridMemorySegment constructors
 add d154ae5  [hotfix] Minor refactoring of OperationsOnFreedSegmentTest
 add afe2161  [FLINK-13985] Use unsafe memory for managed memory
 add 6d0a827  [hotfix] KeyedBudgetManagerTest extends TestLogger

No new revisions were added by this update.

Summary of changes:
 .../flink/core/memory/HybridMemorySegment.java | 70 -
 .../flink/core/memory/MemorySegmentFactory.java| 47 ++--
 .../org/apache/flink/core/memory/MemoryUtils.java  | 81 
 .../flink/core/memory/CrossSegmentTypeTest.java| 51 -
 .../flink/core/memory/EndiannessAccessChecks.java  | 10 ++-
 .../HybridOffHeapDirectMemorySegmentTest.java} | 29 ---
 .../memory/HybridOffHeapMemorySegmentTest.java | 21 +
 .../HybridOffHeapUnsafeMemorySegmentTest.java} | 29 ---
 .../core/memory/HybridOnHeapMemorySegmentTest.java |  6 +-
 .../flink/core/memory/MemorySegmentChecksTest.java | 14 +---
 .../flink/core/memory/MemorySegmentTestBase.java   |  5 ++
 .../core/memory/MemorySegmentUndersizedTest.java   | 21 -
 .../core/memory/OperationsOnFreedSegmentTest.java  | 89 --
 .../apache/flink/runtime/memory/MemoryManager.java |  4 +-
 .../flink/runtime/util/KeyedBudgetManagerTest.java |  3 +-
 15 files changed, 254 insertions(+), 226 deletions(-)
 copy 
flink-core/src/{main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java
 => 
test/java/org/apache/flink/core/memory/HybridOffHeapDirectMemorySegmentTest.java}
 (59%)
 copy 
flink-core/src/{main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java
 => 
test/java/org/apache/flink/core/memory/HybridOffHeapUnsafeMemorySegmentTest.java}
 (58%)



[flink] branch master updated (c31e44e -> 6d0a827)

2019-10-22 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from c31e44e  [FLINK-14235][kafka,tests] Change source in at-least-once 
test from finite to infinite
 add 8e0b35c  [hotfix] Use MemorySegmentFactory in tests instead of 
HybridMemorySegment constructors
 add d154ae5  [hotfix] Minor refactoring of OperationsOnFreedSegmentTest
 add afe2161  [FLINK-13985] Use unsafe memory for managed memory
 add 6d0a827  [hotfix] KeyedBudgetManagerTest extends TestLogger

No new revisions were added by this update.

Summary of changes:
 .../flink/core/memory/HybridMemorySegment.java | 70 -
 .../flink/core/memory/MemorySegmentFactory.java| 47 ++--
 .../org/apache/flink/core/memory/MemoryUtils.java  | 81 
 .../flink/core/memory/CrossSegmentTypeTest.java| 51 -
 .../flink/core/memory/EndiannessAccessChecks.java  | 10 ++-
 .../HybridOffHeapDirectMemorySegmentTest.java} | 29 ---
 .../memory/HybridOffHeapMemorySegmentTest.java | 21 +
 .../HybridOffHeapUnsafeMemorySegmentTest.java} | 29 ---
 .../core/memory/HybridOnHeapMemorySegmentTest.java |  6 +-
 .../flink/core/memory/MemorySegmentChecksTest.java | 14 +---
 .../flink/core/memory/MemorySegmentTestBase.java   |  5 ++
 .../core/memory/MemorySegmentUndersizedTest.java   | 21 -
 .../core/memory/OperationsOnFreedSegmentTest.java  | 89 --
 .../apache/flink/runtime/memory/MemoryManager.java |  4 +-
 .../flink/runtime/util/KeyedBudgetManagerTest.java |  3 +-
 15 files changed, 254 insertions(+), 226 deletions(-)
 copy flink-core/src/{main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java => test/java/org/apache/flink/core/memory/HybridOffHeapDirectMemorySegmentTest.java} (59%)
 copy flink-core/src/{main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java => test/java/org/apache/flink/core/memory/HybridOffHeapUnsafeMemorySegmentTest.java} (58%)



[flink] branch master updated (0a9db1e -> ad531f0)

2019-11-19 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 0a9db1e  [FLINK-14466][runtime] Let YarnJobClusterEntrypoint use user code class loader
 add cb62c5f  [hotfix] Refactor TaskExecutorTest.testTaskSubmission to not use mocking
 add d3ecd3d  [hotfix] Introduce TaskSlotUtils for tests
 add 01d6972  [hotfix] Refactor out slots creation from the TaskSlotTable constructor
 add 9fed0dd  [FLINK-14400] Shrink the scope of MemoryManager from TaskExecutor to slot
 add 4c4652e  [hotfix] Remove unused number of slots in MemoryManager
 add 48986aa  [hotfix] Remove unnecessary comments about memory size calculation before network init
 add ad531f0  [hotfix] Remove redundant timerService in TaskExecutorTest

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/runtime/memory/MemoryManager.java |  16 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  15 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |  88 ++-
 .../flink/runtime/taskexecutor/slot/TaskSlot.java  |  39 -
 .../runtime/taskexecutor/slot/TaskSlotTable.java   |  43 --
 .../flink/runtime/memory/MemoryManagerBuilder.java |   8 +-
 .../TaskExecutorPartitionLifecycleTest.java|   8 +-
 .../runtime/taskexecutor/TaskExecutorTest.java | 164 +++--
 .../taskexecutor/TaskManagerServicesBuilder.java   |  14 +-
 .../TaskSubmissionTestEnvironment.java |  13 +-
 .../taskexecutor/slot/TaskSlotTableTest.java   |  19 +--
 .../runtime/taskexecutor/slot/TaskSlotUtils.java   |  77 ++
 .../operators/aggregate/BytesHashMapTest.java  |   6 -
 13 files changed, 268 insertions(+), 242 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotUtils.java



[flink] branch master updated (ad531f0 -> 7201206)

2019-11-19 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from ad531f0  [hotfix] Remove redundant timerService in TaskExecutorTest
 add 7201206  [FLINK-14720] Bring down ExecutionVertex#deployToSlot access modifier and mark it with @VisibleForTesting annotation

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java| 1 +
 1 file changed, 1 insertion(+)



[flink] branch master updated (6691894 -> 20f6976)

2019-11-26 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 6691894  [hotfix] Correct wrong log level in TaskLocalStateStoreImpl#storeLocalState
 add 20f6976  [FLINK-14846][doc] Correct the default writerbuffer size documentation of RocksDB

No new revisions were added by this update.

Summary of changes:
 docs/_includes/generated/rocks_db_configurable_configuration.html   | 2 +-
 .../flink/contrib/streaming/state/RocksDBConfigurableOptions.java   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)



[flink] branch master updated (3d657b4 -> 8930f62)

2019-11-27 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 3d657b4  [FLINK-14939][e2e] Set distDir property
 add 9f6b204  [FLINK-14901] Throw Error in MemoryUtils if there is problem with using system classes over reflection
 add 8930f62  [hotfix] Annotate interfaces in JavaGcCleanerWrapper with @FunctionalInterface

No new revisions were added by this update.

Summary of changes:
 .../org/apache/flink/core/memory/MemoryUtils.java  | 33 ++
 .../apache/flink/util/JavaGcCleanerWrapper.java| 13 +
 2 files changed, 23 insertions(+), 23 deletions(-)



[flink] branch master updated (daa2f95 -> 07b66b6)

2019-11-29 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from daa2f95  [FLINK-14974][runtime] Calculate managed memory fractions with BigDecimal
 add 07b66b6  [FLINK-14976][cassandra] Release semaphore on all Throwable's in send()

No new revisions were added by this update.

Summary of changes:
 .../connectors/cassandra/CassandraSinkBase.java|  2 +-
 .../cassandra/CassandraSinkBaseTest.java   | 51 ++
 2 files changed, 34 insertions(+), 19 deletions(-)
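
For readers skimming this thread: the FLINK-14976 change widens a `catch (Exception e)` around `send(value)` to `catch (Throwable e)`, so that the semaphore permit is also returned when `send()` fails with an `Error` or another non-Exception throwable. The following is only a minimal, self-contained sketch of that pattern. The class `PermitReleaseSketch`, its `Function`-based `send`, and the `complete()` hook are hypothetical stand-ins for illustration, not the actual CassandraSinkBase code.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/**
 * Hypothetical, simplified stand-in for a sink that limits in-flight
 * requests with a semaphore. This is NOT the real CassandraSinkBase.
 */
public class PermitReleaseSketch {
    private final Semaphore semaphore;
    private final Function<String, String> send;

    public PermitReleaseSketch(int maxConcurrentRequests, Function<String, String> send) {
        this.semaphore = new Semaphore(maxConcurrentRequests);
        this.send = send;
    }

    /** Acquires a permit, then sends; the permit is handed back if send throws anything. */
    public String invoke(String value) {
        semaphore.acquireUninterruptibly();
        final String result;
        try {
            result = send.apply(value);
        } catch (Throwable t) {
            // Catching only Exception here would leak the permit on an Error.
            semaphore.release();
            throw t; // precise rethrow: the try block can only throw unchecked throwables
        }
        // On success the permit stays held until complete() is called.
        return result;
    }

    /** Hypothetical completion hook: an in-flight request has finished. */
    public void complete() {
        semaphore.release();
    }

    public int availablePermits() {
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        PermitReleaseSketch sink = new PermitReleaseSketch(1, msg -> {
            throw new Error("expected");
        });
        try {
            sink.invoke("hello");
        } catch (Error e) {
            // expected: send failed with a non-Exception throwable
        }
        System.out.println(sink.availablePermits()); // prints 1
    }
}
```

With `catch (Exception e)` in place of `catch (Throwable t)`, the same `main` would leave zero permits available, and with a limit of one concurrent request the next `invoke` would block.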



[flink] branch release-1.8 updated: [FLINK-14976][cassandra] Release semaphore on all Throwable's in send()

2019-11-29 Thread azagrebin

azagrebin pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new 8747e3d  [FLINK-14976][cassandra] Release semaphore on all Throwable's in send()
8747e3d is described below

commit 8747e3d4ec29394fa65e875b9da68b2af863f92a
Author: Mads Chr. Olesen 
AuthorDate: Wed Nov 27 14:56:16 2019 +0100

[FLINK-14976][cassandra] Release semaphore on all Throwable's in send()
---
 .../connectors/cassandra/CassandraSinkBase.java|  2 +-
 .../cassandra/CassandraSinkBaseTest.java   | 51 ++
 2 files changed, 34 insertions(+), 19 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 5d758be..1b6e3b3 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -131,7 +131,7 @@ public abstract class CassandraSinkBase extends RichSinkFunction impl
final ListenableFuture result;
try {
result = send(value);
-   } catch (Exception e) {
+   } catch (Throwable e) {
semaphore.release();
throw e;
}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
index b4406ab..709e5b7 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.util.Preconditions;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.Assert;
@@ -41,8 +40,12 @@ import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
 
@@ -79,7 +82,7 @@ public class CassandraSinkBaseTest {

casSinkFunc.enqueueCompletableFuture(CompletableFuture.completedFuture(null));
 
final int originalPermits = 
casSinkFunc.getAvailablePermits();
-   Assert.assertThat(originalPermits, greaterThan(0));
+   assertThat(originalPermits, greaterThan(0));
Assert.assertEquals(0, 
casSinkFunc.getAcquiredPermits());
 
casSinkFunc.invoke("hello");
@@ -274,21 +277,29 @@ public class CassandraSinkBaseTest {
}
 
@Test(timeout = DEFAULT_TEST_TIMEOUT)
-   public void testReleaseOnSendException() throws Exception {
+   public void testReleaseOnThrowingSend() throws Exception {
final CassandraSinkBaseConfig config = 
CassandraSinkBaseConfig.newBuilder()
.setMaxConcurrentRequests(1)
.build();
 
-   try (TestCassandraSink testCassandraSink = 
createOpenedSendExceptionTestCassandraSink(config)) {
-   Assert.assertEquals(1, 
testCassandraSink.getAvailablePermits());
-   Assert.assertEquals(0, 
testCassandraSink.getAcquiredPermits());
+   Function> 
failingSendFunction = ignoredMessage -> {
+   throwCheckedAsUnchecked(new Throwable("expected"));
+   //noinspection ReturnOfNull
+   return null;
+   };
 
+   try (TestCassandraSink testCassandraSink = new 
MockCassandraSink(config, failingSendFunction)) {
+   testCassandraSink.open(new Configuration());
+   

[flink] branch release-1.9 updated: [FLINK-14976][cassandra] Release semaphore on all Throwable's in send()

2019-11-29 Thread azagrebin

azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new 6039e11  [FLINK-14976][cassandra] Release semaphore on all Throwable's in send()
6039e11 is described below

commit 6039e11c1cad20fe3468715ff594a49cbdc8d95e
Author: Mads Chr. Olesen 
AuthorDate: Wed Nov 27 14:56:16 2019 +0100

[FLINK-14976][cassandra] Release semaphore on all Throwable's in send()
---
 .../connectors/cassandra/CassandraSinkBase.java|  2 +-
 .../cassandra/CassandraSinkBaseTest.java   | 51 ++
 2 files changed, 34 insertions(+), 19 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 0e7eb6f..76ed9c7 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -132,7 +132,7 @@ public abstract class CassandraSinkBase extends RichSinkFunction impl
final ListenableFuture result;
try {
result = send(value);
-   } catch (Exception e) {
+   } catch (Throwable e) {
semaphore.release();
throw e;
}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
index 3ce9742..5c0a431 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.util.Preconditions;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.Assert;
@@ -42,9 +41,13 @@ import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 
 import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
 
@@ -81,7 +84,7 @@ public class CassandraSinkBaseTest {

casSinkFunc.enqueueCompletableFuture(CompletableFuture.completedFuture(null));
 
final int originalPermits = 
casSinkFunc.getAvailablePermits();
-   Assert.assertThat(originalPermits, greaterThan(0));
+   assertThat(originalPermits, greaterThan(0));
Assert.assertEquals(0, 
casSinkFunc.getAcquiredPermits());
 
casSinkFunc.invoke("hello");
@@ -277,21 +280,29 @@ public class CassandraSinkBaseTest {
}
 
@Test(timeout = DEFAULT_TEST_TIMEOUT)
-   public void testReleaseOnSendException() throws Exception {
+   public void testReleaseOnThrowingSend() throws Exception {
final CassandraSinkBaseConfig config = 
CassandraSinkBaseConfig.newBuilder()
.setMaxConcurrentRequests(1)
.build();
 
-   try (TestCassandraSink testCassandraSink = 
createOpenedSendExceptionTestCassandraSink(config)) {
-   Assert.assertEquals(1, 
testCassandraSink.getAvailablePermits());
-   Assert.assertEquals(0, 
testCassandraSink.getAcquiredPermits());
+   Function> 
failingSendFunction = ignoredMessage -> {
+   throwCheckedAsUnchecked(new Throwable("expected"));
+   //noinspection ReturnOfNull
+   return null;
+   };
 
+   try (TestCassandraSink testCassandraSink = new 
MockCassandraSink(c

[flink] branch master updated (6c6ada5 -> e41c0fe)

2019-10-31 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 6c6ada5  [FLINK-14556][python] Correct the package structure of cloudpickle
 add e41c0fe  [FLINK-14522] Introduce JavaGcCleanerWrapper to find Java GC Cleaner depending on JVM version

No new revisions were added by this update.

Summary of changes:
 .../flink/core/memory/MemorySegmentFactory.java|   5 +-
 .../org/apache/flink/core/memory/MemoryUtils.java  |   9 +-
 .../apache/flink/util/JavaGcCleanerWrapper.java| 255 +
 .../flink/util/JavaGcCleanerWrapperTest.java   |  29 ++-
 4 files changed, 274 insertions(+), 24 deletions(-)
 create mode 100644 flink-core/src/main/java/org/apache/flink/util/JavaGcCleanerWrapper.java
 copy flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHeadersTest.java => flink-core/src/test/java/org/apache/flink/util/JavaGcCleanerWrapperTest.java (57%)



[flink] branch master updated (96f4c79 -> ef7ee76)

2019-12-20 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 96f4c79  [FLINK-15268][build] Correctly set Multi-Release in manifest.
 add ef7ee76  [FLINK-15065][docs] Correct default value of RocksDB options in documentation

No new revisions were added by this update.

Summary of changes:
 docs/_includes/generated/rocks_db_configurable_configuration.html   | 6 +++---
 .../flink/contrib/streaming/state/RocksDBConfigurableOptions.java   | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)



[flink] branch release-1.8 updated: [FLINK-14846][doc] Correct the default writerbuffer size documentation of RocksDB

2019-12-20 Thread azagrebin

azagrebin pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new c0f6f95  [FLINK-14846][doc] Correct the default writerbuffer size documentation of RocksDB
c0f6f95 is described below

commit c0f6f95353b5be284b0fd1cd53c5950ceeebbf56
Author: Yun Tang 
AuthorDate: Mon Nov 25 02:00:40 2019 +0800

[FLINK-14846][doc] Correct the default writerbuffer size documentation of RocksDB

Correct the default write buffer size of RocksDB to '64MB'
---
 docs/_includes/generated/rocks_db_configurable_configuration.html   | 2 +-
 .../flink/contrib/streaming/state/RocksDBConfigurableOptions.java   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/_includes/generated/rocks_db_configurable_configuration.html b/docs/_includes/generated/rocks_db_configurable_configuration.html
index fa0e0ff..b29054c 100644
--- a/docs/_includes/generated/rocks_db_configurable_configuration.html
+++ b/docs/_includes/generated/rocks_db_configurable_configuration.html
@@ -60,7 +60,7 @@
 
 state.backend.rocksdb.writebuffer.size
 (none)
-The amount of data built up in memory (backed by an unsorted 
log on disk) before converting to a sorted on-disk files. RocksDB has default 
writebuffer size as '4MB'.
+The amount of data built up in memory (backed by an unsorted 
log on disk) before converting to a sorted on-disk files. RocksDB has default 
writebuffer size as '64MB'.
 
 
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
index 261e5f0..a8213bc 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
@@ -94,7 +94,7 @@ public class RocksDBConfigurableOptions implements Serializable {
key("state.backend.rocksdb.writebuffer.size")
.noDefaultValue()
.withDescription("The amount of data built up in memory 
(backed by an unsorted log on disk) " +
-   "before converting to a sorted on-disk files. 
RocksDB has default writebuffer size as '4MB'.");
+   "before converting to a sorted on-disk files. 
RocksDB has default writebuffer size as '64MB'.");
 
public static final ConfigOption MAX_WRITE_BUFFER_NUMBER =
key("state.backend.rocksdb.writebuffer.count")



[flink] branch release-1.9 updated: [FLINK-15065][docs] Correct default value of RocksDB options in documentation

2019-12-20 Thread azagrebin

azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new ea5f241  [FLINK-15065][docs] Correct default value of RocksDB options in documentation
ea5f241 is described below

commit ea5f2418331d8d54cb47313edcdf74923f0f19c8
Author: Yun Tang 
AuthorDate: Fri Dec 13 16:38:45 2019 +0800

[FLINK-15065][docs] Correct default value of RocksDB options in documentation

This refers to https://github.com/facebook/rocksdb/pull/6123, which corrects the RocksDB javadoc
---
 docs/_includes/generated/rocks_db_configurable_configuration.html   | 6 +++---
 .../flink/contrib/streaming/state/RocksDBConfigurableOptions.java   | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/docs/_includes/generated/rocks_db_configurable_configuration.html b/docs/_includes/generated/rocks_db_configurable_configuration.html
index b29054c..11fcfcf 100644
--- a/docs/_includes/generated/rocks_db_configurable_configuration.html
+++ b/docs/_includes/generated/rocks_db_configurable_configuration.html
@@ -20,12 +20,12 @@
 
 
state.backend.rocksdb.compaction.level.max-size-level-base
 (none)
-The upper-bound of the total size of level base files in 
bytes. RocksDB has default configuration as '10MB'.
+The upper-bound of the total size of level base files in 
bytes. RocksDB has default configuration as '256MB'.
 
 
 
state.backend.rocksdb.compaction.level.target-file-size-base
 (none)
-The target file size for compaction, which determines a 
level-1 file size. RocksDB has default configuration as '2MB'.
+The target file size for compaction, which determines a 
level-1 file size. RocksDB has default configuration as '64MB'.
 
 
 
state.backend.rocksdb.compaction.level.use-dynamic-size
@@ -40,7 +40,7 @@
 
 state.backend.rocksdb.files.open
 (none)
-The maximum number of open files (per TaskManager) that can be 
used by the DB, '-1' means no limit. RocksDB has default configuration as 
'5000'.
+The maximum number of open files (per TaskManager) that can be 
used by the DB, '-1' means no limit. RocksDB has default configuration as 
'-1'.
 
 
 state.backend.rocksdb.thread.num
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
index a8213bc..f321feb 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
@@ -53,7 +53,7 @@ public class RocksDBConfigurableOptions implements Serializable {
key("state.backend.rocksdb.files.open")
.noDefaultValue()
.withDescription("The maximum number of open files (per 
TaskManager) that can be used by the DB, '-1' means no limit. " +
-   "RocksDB has default configuration as '5000'.");
+   "RocksDB has default configuration as '-1'.");
 

//--
// Provided configurable ColumnFamilyOptions within Flink
@@ -82,13 +82,13 @@ public class RocksDBConfigurableOptions implements Serializable {

key("state.backend.rocksdb.compaction.level.target-file-size-base")
.noDefaultValue()
.withDescription("The target file size for compaction, 
which determines a level-1 file size. " +
-   "RocksDB has default configuration as '2MB'.");
+   "RocksDB has default configuration as '64MB'.");
 
public static final ConfigOption MAX_SIZE_LEVEL_BASE =

key("state.backend.rocksdb.compaction.level.max-size-level-base")
.noDefaultValue()
.withDescription("The upper-bound of the total size of 
level base files in bytes. " +
-   "RocksDB has default configuration as '10MB'.");
+   "RocksDB has default configuration as 
'256MB'.");
 
public static final ConfigOption WRITE_BUFFER_SIZE =
key("state.backend.rocksdb.writebuffer.size")



[flink] branch release-1.8 updated: [FLINK-15065][docs] Correct default value of RocksDB options in documentation

2019-12-20 Thread azagrebin

azagrebin pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new b38ada0  [FLINK-15065][docs] Correct default value of RocksDB options in documentation
b38ada0 is described below

commit b38ada077c2ce487dde64a04cb0857d91327c04d
Author: Yun Tang 
AuthorDate: Fri Dec 13 16:38:45 2019 +0800

[FLINK-15065][docs] Correct default value of RocksDB options in documentation

This refers to https://github.com/facebook/rocksdb/pull/6123, which corrects the RocksDB javadoc
---
 docs/_includes/generated/rocks_db_configurable_configuration.html   | 6 +++---
 .../flink/contrib/streaming/state/RocksDBConfigurableOptions.java   | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/docs/_includes/generated/rocks_db_configurable_configuration.html b/docs/_includes/generated/rocks_db_configurable_configuration.html
index b29054c..11fcfcf 100644
--- a/docs/_includes/generated/rocks_db_configurable_configuration.html
+++ b/docs/_includes/generated/rocks_db_configurable_configuration.html
@@ -20,12 +20,12 @@
 
 
state.backend.rocksdb.compaction.level.max-size-level-base
 (none)
-The upper-bound of the total size of level base files in 
bytes. RocksDB has default configuration as '10MB'.
+The upper-bound of the total size of level base files in 
bytes. RocksDB has default configuration as '256MB'.
 
 
 
state.backend.rocksdb.compaction.level.target-file-size-base
 (none)
-The target file size for compaction, which determines a 
level-1 file size. RocksDB has default configuration as '2MB'.
+The target file size for compaction, which determines a 
level-1 file size. RocksDB has default configuration as '64MB'.
 
 
 
state.backend.rocksdb.compaction.level.use-dynamic-size
@@ -40,7 +40,7 @@
 
 state.backend.rocksdb.files.open
 (none)
-The maximum number of open files (per TaskManager) that can be 
used by the DB, '-1' means no limit. RocksDB has default configuration as 
'5000'.
+The maximum number of open files (per TaskManager) that can be 
used by the DB, '-1' means no limit. RocksDB has default configuration as 
'-1'.
 
 
 state.backend.rocksdb.thread.num
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
index a8213bc..f321feb 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
@@ -53,7 +53,7 @@ public class RocksDBConfigurableOptions implements Serializable {
key("state.backend.rocksdb.files.open")
.noDefaultValue()
.withDescription("The maximum number of open files (per 
TaskManager) that can be used by the DB, '-1' means no limit. " +
-   "RocksDB has default configuration as '5000'.");
+   "RocksDB has default configuration as '-1'.");
 

//--
// Provided configurable ColumnFamilyOptions within Flink
@@ -82,13 +82,13 @@ public class RocksDBConfigurableOptions implements Serializable {

key("state.backend.rocksdb.compaction.level.target-file-size-base")
.noDefaultValue()
.withDescription("The target file size for compaction, 
which determines a level-1 file size. " +
-   "RocksDB has default configuration as '2MB'.");
+   "RocksDB has default configuration as '64MB'.");
 
public static final ConfigOption MAX_SIZE_LEVEL_BASE =

key("state.backend.rocksdb.compaction.level.max-size-level-base")
.noDefaultValue()
.withDescription("The upper-bound of the total size of 
level base files in bytes. " +
-   "RocksDB has default configuration as '10MB'.");
+   "RocksDB has default configuration as 
'256MB'.");
 
public static final ConfigOption WRITE_BUFFER_SIZE =
key("state.backend.rocksdb.writebuffer.size")



[flink] branch release-1.10 updated: [FLINK-15065][docs] Correct default value of RocksDB options in documentation

2019-12-20 Thread azagrebin

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
 new 836b8ae  [FLINK-15065][docs] Correct default value of RocksDB options in documentation
836b8ae is described below

commit 836b8ae4c178be6c91fbb64580361ab50bfecdc0
Author: Yun Tang 
AuthorDate: Fri Dec 13 16:38:45 2019 +0800

[FLINK-15065][docs] Correct default value of RocksDB options in documentation

This refers to https://github.com/facebook/rocksdb/pull/6123, which corrects the RocksDB javadoc
---
 docs/_includes/generated/rocks_db_configurable_configuration.html   | 6 +++---
 .../flink/contrib/streaming/state/RocksDBConfigurableOptions.java   | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/docs/_includes/generated/rocks_db_configurable_configuration.html b/docs/_includes/generated/rocks_db_configurable_configuration.html
index 47e3bef..f46fc27 100644
--- a/docs/_includes/generated/rocks_db_configurable_configuration.html
+++ b/docs/_includes/generated/rocks_db_configurable_configuration.html
@@ -24,13 +24,13 @@
 
state.backend.rocksdb.compaction.level.max-size-level-base
 (none)
 MemorySize
-The upper-bound of the total size of level base files in 
bytes. RocksDB has default configuration as '10MB'.
+The upper-bound of the total size of level base files in 
bytes. RocksDB has default configuration as '256MB'.
 
 
 
state.backend.rocksdb.compaction.level.target-file-size-base
 (none)
 MemorySize
-The target file size for compaction, which determines a 
level-1 file size. RocksDB has default configuration as '2MB'.
+The target file size for compaction, which determines a 
level-1 file size. RocksDB has default configuration as '64MB'.
 
 
 
state.backend.rocksdb.compaction.level.use-dynamic-size
@@ -48,7 +48,7 @@
 state.backend.rocksdb.files.open
 (none)
 Integer
-The maximum number of open files (per TaskManager) that can be 
used by the DB, '-1' means no limit. RocksDB has default configuration as 
'5000'.
+The maximum number of open files (per TaskManager) that can be 
used by the DB, '-1' means no limit. RocksDB has default configuration as 
'-1'.
 
 
 state.backend.rocksdb.thread.num
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
index 1f27c3f..e4dda8b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
@@ -58,7 +58,7 @@ public class RocksDBConfigurableOptions implements Serializable {
.intType()
.noDefaultValue()
.withDescription("The maximum number of open files (per 
TaskManager) that can be used by the DB, '-1' means no limit. " +
-   "RocksDB has default configuration as '5000'.");
+   "RocksDB has default configuration as '-1'.");
 

//--
// Provided configurable ColumnFamilyOptions within Flink
@@ -90,14 +90,14 @@ public class RocksDBConfigurableOptions implements Serializable {
.memoryType()
.noDefaultValue()
.withDescription("The target file size for compaction, 
which determines a level-1 file size. " +
-   "RocksDB has default configuration as '2MB'.");
+   "RocksDB has default configuration as '64MB'.");
 
public static final ConfigOption MAX_SIZE_LEVEL_BASE =

key("state.backend.rocksdb.compaction.level.max-size-level-base")
.memoryType()
.noDefaultValue()
.withDescription("The upper-bound of the total size of 
level base files in bytes. " +
-   "RocksDB has default configuration as '10MB'.");
+   "RocksDB has default configuration as 
'256MB'.");
 
public static final ConfigOption WRITE_BUFFER_SIZE =
key("state.backend.rocksdb.writebuffer.size")



[flink] branch release-1.9 updated: [FLINK-14846][doc] Correct the default writerbuffer size documentation of RocksDB

2019-12-20 Thread azagrebin

azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new c221872  [FLINK-14846][doc] Correct the default writerbuffer size 
documentation of RocksDB
c221872 is described below

commit c22187205841b1bc7b9b1c769674b92c78a1244f
Author: Yun Tang 
AuthorDate: Mon Nov 25 02:00:40 2019 +0800

[FLINK-14846][doc] Correct the default writerbuffer size documentation of 
RocksDB

Correct the default write buffer size of RocksDB to '64MB'
---
 docs/_includes/generated/rocks_db_configurable_configuration.html   | 2 +-
 .../flink/contrib/streaming/state/RocksDBConfigurableOptions.java   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/_includes/generated/rocks_db_configurable_configuration.html 
b/docs/_includes/generated/rocks_db_configurable_configuration.html
index fa0e0ff..b29054c 100644
--- a/docs/_includes/generated/rocks_db_configurable_configuration.html
+++ b/docs/_includes/generated/rocks_db_configurable_configuration.html
@@ -60,7 +60,7 @@
 
 state.backend.rocksdb.writebuffer.size
 (none)
-The amount of data built up in memory (backed by an unsorted 
log on disk) before converting to a sorted on-disk files. RocksDB has default 
writebuffer size as '4MB'.
+The amount of data built up in memory (backed by an unsorted 
log on disk) before converting to a sorted on-disk files. RocksDB has default 
writebuffer size as '64MB'.
 
 
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
index 261e5f0..a8213bc 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
@@ -94,7 +94,7 @@ public class RocksDBConfigurableOptions implements 
Serializable {
key("state.backend.rocksdb.writebuffer.size")
.noDefaultValue()
.withDescription("The amount of data built up in memory 
(backed by an unsorted log on disk) " +
-   "before converting to a sorted on-disk files. 
RocksDB has default writebuffer size as '4MB'.");
+   "before converting to a sorted on-disk files. 
RocksDB has default writebuffer size as '64MB'.");
 
public static final ConfigOption MAX_WRITE_BUFFER_NUMBER =
key("state.backend.rocksdb.writebuffer.count")



[flink] branch master updated (2b13a41 -> 3cf8fe5)

2020-03-04 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 2b13a41  [FLINK-16362][table] Remove deprecated `emitDataStream` 
method in StreamTableSink
 add 3cf8fe5  [FLINK-11193][State] Fix Rockdb timer service factory 
configuration option to be settable per job

No new revisions were added by this update.

Summary of changes:
 .../streaming/state/RocksDBStateBackend.java   | 32 +++--
 .../state/RocksDBStateBackendConfigTest.java   | 33 ++
 2 files changed, 57 insertions(+), 8 deletions(-)



[flink] branch release-1.9 updated: [FLINK-11193][State] Fix Rockdb timer service factory configuration option to be settable per job

2020-03-04 Thread azagrebin

azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new e64a2e0  [FLINK-11193][State] Fix Rockdb timer service factory 
configuration option to be settable per job
e64a2e0 is described below

commit e64a2e0d1ba3391fac23c04543235bb8bb047c33
Author: Aitozi <1059789...@qq.com>
AuthorDate: Wed Sep 4 22:59:09 2019 +0800

[FLINK-11193][State] Fix Rockdb timer service factory configuration option 
to be settable per job

This closes #8479.
---
 .../streaming/state/RocksDBStateBackend.java   | 32 +++-
 .../state/RocksDBStateBackendConfigTest.java   | 34 ++
 2 files changed, 58 insertions(+), 8 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index c5604e5..cfac3b2 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -147,7 +147,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
private TernaryBoolean enableTtlCompactionFilter;
 
/** This determines the type of priority queue state. */
-   private final PriorityQueueStateType priorityQueueStateType;
+   @Nullable
+   private PriorityQueueStateType priorityQueueStateType;
 
/** The default rocksdb metrics options. */
private final RocksDBNativeMetricOptions defaultMetricOptions;
@@ -265,8 +266,6 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
this.checkpointStreamBackend = 
checkNotNull(checkpointStreamBackend);
this.enableIncrementalCheckpointing = 
enableIncrementalCheckpointing;
this.numberOfTransferingThreads = 
UNDEFINED_NUMBER_OF_TRANSFERING_THREADS;
-   // for now, we use still the heap-based implementation as 
default
-   this.priorityQueueStateType = PriorityQueueStateType.HEAP;
this.defaultMetricOptions = new RocksDBNativeMetricOptions();
this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED;
}
@@ -314,10 +313,11 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
this.enableTtlCompactionFilter = 
original.enableTtlCompactionFilter

.resolveUndefined(config.getBoolean(TTL_COMPACT_FILTER_ENABLED));
 
-   final String priorityQueueTypeString = 
config.getString(TIMER_SERVICE_FACTORY);
-
-   this.priorityQueueStateType = priorityQueueTypeString.length() 
> 0 ?
-   
PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : 
original.priorityQueueStateType;
+   if (null == original.priorityQueueStateType) {
+   this.priorityQueueStateType = 
config.getEnum(PriorityQueueStateType.class, TIMER_SERVICE_FACTORY);
+   } else {
+   this.priorityQueueStateType = 
original.priorityQueueStateType;
+   }
 
// configure local directories
if (original.localRocksDbDirectories != null) {
@@ -507,7 +507,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
keyGroupRange,
executionConfig,
localRecoveryConfig,
-   priorityQueueStateType,
+   getPriorityQueueStateType(),
ttlTimeProvider,
metricGroup,
stateHandles,
@@ -716,6 +716,22 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
enableTtlCompactionFilter = TernaryBoolean.TRUE;
}
 
+   /**
+* Gets the type of the priority queue state. It will fallback to the 
default value, if it is not explicitly set.
+* @return The type of the priority queue state.
+*/
+   public PriorityQueueStateType getPriorityQueueStateType() {
+   return priorityQueueStateType == null ?
+   
PriorityQueueStateType.valueOf(TIMER_SERVICE_FACTORY.defaultValue()) : 
priorityQueueStateType;
+   }
+
+   /**
+* Sets the type of the priority queue state. It will fallback to the 
default value, if it is not explicitly set.
+*/
+   public void setPriorityQueueStateType(PriorityQueueStateType 
priorityQueue
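
The change above makes the priority queue type nullable so that "not explicitly set" can be told apart from "set to the default". The following is a minimal, self-contained sketch of that pattern only; it is not Flink's class. The names `PriorityQueueTypeSketch`, `QueueType`, `DEFAULT_TYPE`, and `configure` are hypothetical, and the default of `HEAP` is an assumption chosen for illustration.

```java
/** Hypothetical stand-in for a configurable backend; not Flink's actual class. */
final class PriorityQueueTypeSketch {

    enum QueueType { HEAP, ROCKSDB }

    /** Assumed default; plays the role of TIMER_SERVICE_FACTORY.defaultValue() in the diff. */
    static final QueueType DEFAULT_TYPE = QueueType.HEAP;

    /** null means "the user never set this explicitly". */
    private QueueType explicitType;

    /** Explicit, per-job setting made in application code. */
    void setType(QueueType type) {
        this.explicitType = type;
    }

    /**
     * Copy-and-configure step: an explicit setting on the original wins,
     * otherwise the value read from the configuration is used.
     */
    PriorityQueueTypeSketch configure(QueueType configuredType) {
        PriorityQueueTypeSketch copy = new PriorityQueueTypeSketch();
        copy.explicitType = (this.explicitType == null) ? configuredType : this.explicitType;
        return copy;
    }

    /** Falls back to the default only when nothing was set at all. */
    QueueType getType() {
        return explicitType == null ? DEFAULT_TYPE : explicitType;
    }

    public static void main(String[] args) {
        PriorityQueueTypeSketch backend = new PriorityQueueTypeSketch();
        System.out.println(backend.getType());
        System.out.println(backend.configure(QueueType.ROCKSDB).getType());
    }
}
```

With a non-nullable field initialised in the constructor, the configure step could not distinguish a user's explicit choice from the built-in default, which appears to be the problem the commit addresses.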

[flink] branch release-1.10 updated: [FLINK-11193][State] Fix Rockdb timer service factory configuration option to be settable per job

2020-03-04 Thread azagrebin

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
 new 9310b26  [FLINK-11193][State] Fix Rockdb timer service factory 
configuration option to be settable per job
9310b26 is described below

commit 9310b2672d89c392d25edc33755e291dac4b7398
Author: Aitozi <1059789...@qq.com>
AuthorDate: Wed Sep 4 22:59:09 2019 +0800

[FLINK-11193][State] Fix Rockdb timer service factory configuration option 
to be settable per job

This closes #8479.
---
 .../streaming/state/RocksDBStateBackend.java   | 32 +++--
 .../state/RocksDBStateBackendConfigTest.java   | 33 ++
 2 files changed, 57 insertions(+), 8 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index f071104..873f57d 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -153,7 +153,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
private final RocksDBMemoryConfiguration memoryConfiguration;
 
/** This determines the type of priority queue state. */
-   private final PriorityQueueStateType priorityQueueStateType;
+   @Nullable
+   private PriorityQueueStateType priorityQueueStateType;
 
/** The default rocksdb metrics options. */
private final RocksDBNativeMetricOptions defaultMetricOptions;
@@ -274,8 +275,6 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
this.checkpointStreamBackend = 
checkNotNull(checkpointStreamBackend);
this.enableIncrementalCheckpointing = 
enableIncrementalCheckpointing;
this.numberOfTransferThreads = 
UNDEFINED_NUMBER_OF_TRANSFER_THREADS;
-   // use RocksDB-based implementation as default from FLINK-15637
-   this.priorityQueueStateType = PriorityQueueStateType.ROCKSDB;
this.defaultMetricOptions = new RocksDBNativeMetricOptions();
this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED;
this.memoryConfiguration = new RocksDBMemoryConfiguration();
@@ -333,10 +332,11 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
this.memoryConfiguration = 
RocksDBMemoryConfiguration.fromOtherAndConfiguration(original.memoryConfiguration,
 config);
this.memoryConfiguration.validate();
 
-   final String priorityQueueTypeString = 
config.getString(TIMER_SERVICE_FACTORY);
-
-   this.priorityQueueStateType = priorityQueueTypeString.length() 
> 0 ?
-   
PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : 
original.priorityQueueStateType;
+   if (null == original.priorityQueueStateType) {
+   this.priorityQueueStateType = 
config.getEnum(PriorityQueueStateType.class, TIMER_SERVICE_FACTORY);
+   } else {
+   this.priorityQueueStateType = 
original.priorityQueueStateType;
+   }
 
// configure local directories
if (original.localRocksDbDirectories != null) {
@@ -533,7 +533,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
keyGroupRange,
executionConfig,
localRecoveryConfig,
-   priorityQueueStateType,
+   getPriorityQueueStateType(),
ttlTimeProvider,
metricGroup,
stateHandles,
@@ -770,6 +770,22 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
enableTtlCompactionFilter = TernaryBoolean.FALSE;
}
 
+   /**
+* Gets the type of the priority queue state. It will fallback to the 
default value, if it is not explicitly set.
+* @return The type of the priority queue state.
+*/
+   public PriorityQueueStateType getPriorityQueueStateType() {
+   return priorityQueueStateType == null ?
+   
PriorityQueueStateType.valueOf(TIMER_SERVICE_FACTORY.defaultValue()) : 
priorityQueueStateType;
+   }
+
+   /**
+* Sets the type of the priority queue state. It will fallback to the 
default value, if it is not expli

[flink] branch release-1.8 updated: [FLINK-11193][State] Fix Rockdb timer service factory configuration option to be settable per job

2020-03-04 Thread azagrebin

azagrebin pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new 60d9b96  [FLINK-11193][State] Fix Rockdb timer service factory 
configuration option to be settable per job
60d9b96 is described below

commit 60d9b96456f142f8d18d5882016840a00159403e
Author: Aitozi <1059789...@qq.com>
AuthorDate: Wed Sep 4 22:59:09 2019 +0800

[FLINK-11193][State] Fix Rockdb timer service factory configuration option 
to be settable per job

This closes #8479.
---
 .../streaming/state/RocksDBStateBackend.java   | 32 +++-
 .../state/RocksDBStateBackendConfigTest.java   | 34 ++
 2 files changed, 58 insertions(+), 8 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index c5604e5..cfac3b2 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -147,7 +147,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
private TernaryBoolean enableTtlCompactionFilter;
 
/** This determines the type of priority queue state. */
-   private final PriorityQueueStateType priorityQueueStateType;
+   @Nullable
+   private PriorityQueueStateType priorityQueueStateType;
 
/** The default rocksdb metrics options. */
private final RocksDBNativeMetricOptions defaultMetricOptions;
@@ -265,8 +266,6 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
this.checkpointStreamBackend = 
checkNotNull(checkpointStreamBackend);
this.enableIncrementalCheckpointing = 
enableIncrementalCheckpointing;
this.numberOfTransferingThreads = 
UNDEFINED_NUMBER_OF_TRANSFERING_THREADS;
-   // for now, we use still the heap-based implementation as 
default
-   this.priorityQueueStateType = PriorityQueueStateType.HEAP;
this.defaultMetricOptions = new RocksDBNativeMetricOptions();
this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED;
}
@@ -314,10 +313,11 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
this.enableTtlCompactionFilter = 
original.enableTtlCompactionFilter

.resolveUndefined(config.getBoolean(TTL_COMPACT_FILTER_ENABLED));
 
-   final String priorityQueueTypeString = 
config.getString(TIMER_SERVICE_FACTORY);
-
-   this.priorityQueueStateType = priorityQueueTypeString.length() 
> 0 ?
-   
PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : 
original.priorityQueueStateType;
+   if (null == original.priorityQueueStateType) {
+   this.priorityQueueStateType = 
config.getEnum(PriorityQueueStateType.class, TIMER_SERVICE_FACTORY);
+   } else {
+   this.priorityQueueStateType = 
original.priorityQueueStateType;
+   }
 
// configure local directories
if (original.localRocksDbDirectories != null) {
@@ -507,7 +507,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
keyGroupRange,
executionConfig,
localRecoveryConfig,
-   priorityQueueStateType,
+   getPriorityQueueStateType(),
ttlTimeProvider,
metricGroup,
stateHandles,
@@ -716,6 +716,22 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
enableTtlCompactionFilter = TernaryBoolean.TRUE;
}
 
+   /**
+* Gets the type of the priority queue state. It will fallback to the 
default value, if it is not explicitly set.
+* @return The type of the priority queue state.
+*/
+   public PriorityQueueStateType getPriorityQueueStateType() {
+   return priorityQueueStateType == null ?
+   
PriorityQueueStateType.valueOf(TIMER_SERVICE_FACTORY.defaultValue()) : 
priorityQueueStateType;
+   }
+
+   /**
+* Sets the type of the priority queue state. It will fallback to the 
default value, if it is not explicitly set.
+*/
+   public void setPriorityQueueStateType(PriorityQueueStateType 
priorityQueue

[flink] 02/04: [FLINK-16111][k8s] Fix Kubernetes deployment not respecting 'taskmanager.cpu.cores'.

2020-02-25 Thread azagrebin

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac2aaf9277333a6d8ac5aa1c0c81189f56e6ffd4
Author: Xintong Song 
AuthorDate: Mon Feb 17 15:43:16 2020 +0800

[FLINK-16111][k8s] Fix Kubernetes deployment not respecting 
'taskmanager.cpu.cores'.

This closes #0.
---
 .../kubernetes/KubernetesResourceManager.java  |  3 ++-
 .../kubernetes/KubernetesResourceManagerTest.java  | 28 ++
 .../MesosTaskManagerParameters.java|  4 ++--
 .../clusterframework/TaskExecutorProcessUtils.java |  5 
 4 files changed, 37 insertions(+), 3 deletions(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
index 4c94a28..fdf3afc 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -338,6 +339,6 @@ public class KubernetesResourceManager extends 
ActiveResourceManager MESOS_RM_TASKS_CPUS =
key("mesos.resourcemanager.tasks.cpus")
+   .doubleType()
.defaultValue(0.0)
.withDescription("CPUs to assign to the Mesos workers.");
 
@@ -424,8 +425,7 @@ public class MesosTaskManagerParameters {
}
 
private static double getCpuCores(final Configuration configuration) {
-   double fallback = configuration.getDouble(MESOS_RM_TASKS_CPUS);
-   return 
TaskExecutorProcessUtils.getCpuCoresWithFallback(configuration, 
fallback).getValue().doubleValue();
+   return 
TaskExecutorProcessUtils.getCpuCoresWithFallbackConfigOption(configuration, 
MESOS_RM_TASKS_CPUS);
}
 
private static MemorySize getTotalProcessMemory(final Configuration 
configuration) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
index 917c575..9b7ae12 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
@@ -644,6 +644,11 @@ public class TaskExecutorProcessUtils {
return getCpuCoresWithFallback(config, -1.0);
}
 
+   public static double getCpuCoresWithFallbackConfigOption(final 
Configuration config, ConfigOption fallbackOption) {
+   double fallbackValue = config.getDouble(fallbackOption);
+   return TaskExecutorProcessUtils.getCpuCoresWithFallback(config, 
fallbackValue).getValue().doubleValue();
+   }
+
public static CPUResource getCpuCoresWithFallback(final Configuration 
config, double fallback) {
final double cpuCores;
if (config.contains(TaskManagerOptions.CPU_CORES)) {
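
The helper added in this commit reads a deployment-specific option and passes its value as the fallback for 'taskmanager.cpu.cores'. Below is a rough sketch of that lookup order using a plain map instead of Flink's `Configuration`. Only the first branch (the common option wins) is visible in the diff; the remaining branches (a positive fallback, then the number of slots) are inferred from the accompanying tests and should be read as assumptions. The key `taskmanager.numberOfTaskSlots` and all class and method names here are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch of a CPU-cores fallback chain; not Flink's implementation. */
final class CpuCoresFallbackSketch {

    static final String CPU_CORES = "taskmanager.cpu.cores";
    static final String MESOS_CPUS = "mesos.resourcemanager.tasks.cpus";
    /** Assumed key name for the slot count. */
    static final String NUM_TASK_SLOTS = "taskmanager.numberOfTaskSlots";

    /** Common option first, then a positive fallback, then the slot count (assumed order). */
    static double getCpuCoresWithFallback(Map<String, Double> config, double fallback) {
        if (config.containsKey(CPU_CORES)) {
            return config.get(CPU_CORES);
        }
        if (fallback > 0.0) {
            return fallback;
        }
        return config.getOrDefault(NUM_TASK_SLOTS, 1.0);
    }

    /** Reads the fallback from another option, whose own default is passed in. */
    static double getCpuCoresWithFallbackOption(
            Map<String, Double> config, String fallbackKey, double fallbackDefault) {
        double fallbackValue = config.getOrDefault(fallbackKey, fallbackDefault);
        return getCpuCoresWithFallback(config, fallbackValue);
    }

    public static void main(String[] args) {
        Map<String, Double> config = new HashMap<>();
        config.put(MESOS_CPUS, 1.5);
        config.put(NUM_TASK_SLOTS, 3.0);
        System.out.println(getCpuCoresWithFallbackOption(config, MESOS_CPUS, 0.0));
    }
}
```

Passing the option rather than a pre-read value keeps the "read the legacy option, then fall back" step in one place, so each deployment target only names its legacy key.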



[flink] branch release-1.10 updated (543e24c -> 82f6dc2)

2020-02-25 Thread azagrebin

azagrebin pushed a change to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 543e24c  [FLINK-16013][core] Make complex type config options could be 
parsed correctly
 new ab3e390  [hotfix] Minor clean-up in TaskExecutorProcessUtils.
 new ac2aaf9  [FLINK-16111][k8s] Fix Kubernetes deployment not respecting 
'taskmanager.cpu.cores'.
 new 8858d75  [hotfix][yarn][test] Add test cases for validating Yarn 
deployment respecting the cpu configuration fallback order.
 new 82f6dc2  [hotfix][mesos][test] Update test cases for validating Mesos 
deployment respecting the cpu configuration fallback order.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kubernetes/KubernetesResourceManager.java  |  3 +-
 .../kubernetes/KubernetesResourceManagerTest.java  | 28 +++
 .../MesosTaskManagerParameters.java|  4 +-
 .../MesosTaskManagerParametersTest.java|  3 ++
 .../clusterframework/TaskExecutorProcessUtils.java | 11 +++--
 .../apache/flink/yarn/YarnResourceManagerTest.java | 55 ++
 6 files changed, 96 insertions(+), 8 deletions(-)



[flink] 01/04: [hotfix] Minor clean-up in TaskExecutorProcessUtils.

2020-02-25 Thread azagrebin

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ab3e3906c5f362a539dd7a6fe2f400dfab32a93e
Author: Xintong Song 
AuthorDate: Mon Feb 24 19:01:46 2020 +0800

[hotfix] Minor clean-up in TaskExecutorProcessUtils.
---
 .../flink/runtime/clusterframework/TaskExecutorProcessUtils.java  | 8 ++--
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
index 31b65d5..917c575 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
@@ -640,15 +640,11 @@ public class TaskExecutorProcessUtils {
}
}
 
-   public static CPUResource getCpuCoresWithFallback(final Configuration 
config, double fallback) {
-   return getCpuCores(config, fallback);
-   }
-
private static CPUResource getCpuCores(final Configuration config) {
-   return getCpuCores(config, -1.0);
+   return getCpuCoresWithFallback(config, -1.0);
}
 
-   private static CPUResource getCpuCores(final Configuration config, 
double fallback) {
+   public static CPUResource getCpuCoresWithFallback(final Configuration 
config, double fallback) {
final double cpuCores;
if (config.contains(TaskManagerOptions.CPU_CORES)) {
cpuCores = 
config.getDouble(TaskManagerOptions.CPU_CORES);



[flink] 03/04: [hotfix][yarn][test] Add test cases for validating Yarn deployment respecting the cpu configuration fallback order.

2020-02-25 Thread azagrebin

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8858d75697ba4a3e888b8a86981b4bb6e1bd558f
Author: Xintong Song 
AuthorDate: Mon Feb 17 19:51:58 2020 +0800

[hotfix][yarn][test] Add test cases for validating Yarn deployment 
respecting the cpu configuration fallback order.
---
 .../apache/flink/yarn/YarnResourceManagerTest.java | 55 ++
 1 file changed, 55 insertions(+)

diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 2798076..54f26b6 100755
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -56,6 +57,7 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.RunnableWithException;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
 
@@ -526,6 +528,59 @@ public class YarnResourceManagerTest extends TestLogger {
}};
}
 
+   @Test
+   public void testGetCpuCoresCommonOption() throws Exception {
+   final Configuration configuration = new Configuration();
+   configuration.setDouble(TaskManagerOptions.CPU_CORES, 1.0);
+   configuration.setInteger(YarnConfigOptions.VCORES, 2);
+   configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+
+   new Context() {{
+   runTest(() -> 
assertThat(resourceManager.getCpuCores(configuration), is(1.0)));
+   }};
+   }
+
+   @Test
+   public void testGetCpuCoresYarnOption() throws Exception {
+   final Configuration configuration = new Configuration();
+   configuration.setInteger(YarnConfigOptions.VCORES, 2);
+   configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+
+   new Context() {{
+   runTest(() -> 
assertThat(resourceManager.getCpuCores(configuration), is(2.0)));
+   }};
+   }
+
+   @Test
+   public void testGetCpuCoresNumSlots() throws Exception {
+   final Configuration configuration = new Configuration();
+   configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+
+   new Context() {{
+   runTest(() -> 
assertThat(resourceManager.getCpuCores(configuration), is(3.0)));
+   }};
+   }
+
+   @Test
+   public void testGetCpuRoundUp() throws Exception {
+   final Configuration configuration = new Configuration();
+   configuration.setDouble(TaskManagerOptions.CPU_CORES, 0.5);
+
+   new Context() {{
+   runTest(() -> 
assertThat(resourceManager.getCpuCores(configuration), is(1.0)));
+   }};
+   }
+
+   @Test(expected = IllegalConfigurationException.class)
+   public void testGetCpuExceedMaxInt() throws Exception {
+   final Configuration configuration = new Configuration();
+   configuration.setDouble(TaskManagerOptions.CPU_CORES, 
Double.MAX_VALUE);
+
+   new Context() {{
+   resourceManager.getCpuCores(configuration);
+   }};
+   }
+
private void registerSlotRequest(
TestingYarnResourceManager resourceManager,
MockResourceManagerRuntimeServices rmServices,
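
The last two tests above expect a fractional CPU value of 0.5 to come back as 1.0 and an oversized value to be rejected. A minimal sketch of such a round-up-and-validate step is shown below. It is not taken from `YarnResourceManager`; the class and method names are hypothetical, and `IllegalArgumentException` stands in for Flink's `IllegalConfigurationException`.

```java
/** Hypothetical helper illustrating round-up and range validation of CPU cores. */
final class VcoreRoundingSketch {

    /**
     * Rounds a fractional core count up to a whole number and rejects
     * values that would not fit into an int-valued vcore setting.
     */
    static double roundUpCpuCores(double cpuCores) {
        double rounded = Math.ceil(cpuCores);
        if (rounded > Integer.MAX_VALUE) {
            // Stand-in for IllegalConfigurationException in the tests above.
            throw new IllegalArgumentException(
                    "CPU cores " + cpuCores + " exceed the maximum integer value");
        }
        return rounded;
    }

    public static void main(String[] args) {
        System.out.println(roundUpCpuCores(0.5));
        System.out.println(roundUpCpuCores(2.0));
    }
}
```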



[flink] 04/04: [hotfix][mesos][test] Update test cases for validating Mesos deployment respecting the cpu configuration fallback order.

2020-02-25 Thread azagrebin

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 82f6dc2dbcf16f0cd7b28d562c2e2d75b3060eb3
Author: Xintong Song 
AuthorDate: Mon Feb 17 19:57:43 2020 +0800

[hotfix][mesos][test] Update test cases for validating Mesos deployment 
respecting the cpu configuration fallback order.
---
 .../mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
index c91..5792f77 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
@@ -232,6 +232,8 @@ public class MesosTaskManagerParametersTest extends 
TestLogger {
public void testConfigCpuCores() {
Configuration config = getConfiguration();
config.setDouble(TaskManagerOptions.CPU_CORES, 1.5);
+   
config.setDouble(MesosTaskManagerParameters.MESOS_RM_TASKS_CPUS, 2.5);
+   config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
MesosTaskManagerParameters mesosTaskManagerParameters = 
MesosTaskManagerParameters.create(config);
assertThat(mesosTaskManagerParameters.cpus(), is(1.5));
}
@@ -240,6 +242,7 @@ public class MesosTaskManagerParametersTest extends 
TestLogger {
public void testLegacyConfigCpuCores() {
Configuration config = getConfiguration();

config.setDouble(MesosTaskManagerParameters.MESOS_RM_TASKS_CPUS, 1.5);
+   config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
MesosTaskManagerParameters mesosTaskManagerParameters = 
MesosTaskManagerParameters.create(config);
assertThat(mesosTaskManagerParameters.cpus(), is(1.5));
}



[flink] branch master updated (93da5ec -> 74fc20a)

2020-02-25 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 93da5ec  [FLINK-16263][python][tests] Set 
io.netty.tryReflectionSetAccessible to true for JDK9+
 add 35f967c  [hotfix] Fix minor IDE warnings in MemoryUtils
 add 74fc20a  [FLINK-15094] Use Unsafe to instantiate and construct 
DirectByteBuffer

No new revisions were added by this update.

Summary of changes:
 .../flink/core/memory/HybridMemorySegment.java | 51 ++-
 .../org/apache/flink/core/memory/MemoryUtils.java  | 76 ++
 2 files changed, 54 insertions(+), 73 deletions(-)



[flink] 01/02: [hotfix] Fix minor IDE warnings in MemoryUtils

2020-02-25 Thread azagrebin

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9ccbc1fb0584597f0775519ad87472f3667a95f0
Author: Andrey Zagrebin 
AuthorDate: Thu Feb 20 09:56:01 2020 +0100

[hotfix] Fix minor IDE warnings in MemoryUtils
---
 .../src/main/java/org/apache/flink/core/memory/MemoryUtils.java | 6 ++
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java 
b/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java
index fcdf427..c807145 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java
@@ -33,7 +33,7 @@ import java.nio.ByteOrder;
 public class MemoryUtils {
 
/** The "unsafe", which can be used to perform native memory accesses. 
*/
-   @SuppressWarnings("restriction")
+   @SuppressWarnings({"restriction", "UseOfSunClasses"})
public static final sun.misc.Unsafe UNSAFE = getUnsafe();
 
/** The native byte order of the platform on which the system currently 
runs. */
@@ -50,7 +50,7 @@ public class MemoryUtils {
} catch (SecurityException e) {
throw new Error("Could not access the sun.misc.Unsafe 
handle, permission denied by security manager.", e);
} catch (NoSuchFieldException e) {
-   throw new Error("The static handle field in 
sun.misc.Unsafe was not found.");
+   throw new Error("The static handle field in 
sun.misc.Unsafe was not found.", e);
} catch (IllegalArgumentException e) {
throw new Error("Bug: Illegal argument reflection 
access for static field.", e);
} catch (IllegalAccessException e) {
@@ -64,7 +64,6 @@ public class MemoryUtils {
private MemoryUtils() {}
 
private static Constructor 
getDirectBufferPrivateConstructor() {
-   //noinspection OverlyBroadCatchBlock
try {
Constructor constructor =

ByteBuffer.allocateDirect(1).getClass().getDeclaredConstructor(long.class, 
int.class);
@@ -107,7 +106,6 @@ public class MemoryUtils {
 * @param address address of the unsafe memory to release
 * @return action to run to release the unsafe memory manually
 */
-   @SuppressWarnings("UseOfSunClasses")
static Runnable createMemoryGcCleaner(Object owner, long address) {
return JavaGcCleanerWrapper.create(owner, () -> 
releaseUnsafe(address));
}



[flink] 02/02: [FLINK-15094] Use Unsafe to instantiate and construct DirectByteBuffer

2020-02-25 Thread azagrebin

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 02255ed8996a49ce53c605cbc5850921e2d52f6d
Author: Andrey Zagrebin 
AuthorDate: Thu Feb 20 10:12:16 2020 +0100

[FLINK-15094] Use Unsafe to instantiate and construct DirectByteBuffer

If we use reflection to create a DirectByteBuffer to wrap unsafe native 
memory allocations,
it causes illegal access warnings in Java9+.
This PR changes this to use Unsafe to instantiate a DirectByteBuffer.
The address and capacity fields are set by direct unsafe memory operations.
Other fields are set by calling ByteBuffer#clear at the end.
Unsafe operations skip the illegal access verification and do not result 
in warnings.

This solution still relies on Unsafe, which is about to be removed in future 
Java releases.
If it is removed and we still do not want to contribute to direct memory by 
allocating native managed memory,
we will have to find alternative solutions, e.g. writing a custom 
native allocator
and using the JNI API to instantiate the wrapping DirectByteBuffer 
(NewDirectByteBuffer in C).
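
The approach described in this commit message can be sketched roughly as
follows. This is a minimal illustration, not the code from the commit: the
class name UnsafeDirectBufferSketch and the methods wrap and roundTrip are
hypothetical, and the sketch assumes a JDK on which sun.misc.Unsafe and its
memory access methods are still available, and on which java.nio.Buffer
declares the fields "address" and "capacity".

```java
import java.lang.reflect.Field;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import sun.misc.Unsafe;

/** Illustrative sketch only; all names here are hypothetical, not Flink's. */
final class UnsafeDirectBufferSketch {

    private static final Unsafe UNSAFE = getUnsafe();
    // Concrete class returned by ByteBuffer.allocateDirect (the direct buffer implementation).
    private static final Class<?> DIRECT_BUFFER_CLASS = ByteBuffer.allocateDirect(1).getClass();
    private static final long ADDRESS_OFFSET = fieldOffset("address");
    private static final long CAPACITY_OFFSET = fieldOffset("capacity");

    private static Unsafe getUnsafe() {
        try {
            Field handle = Unsafe.class.getDeclaredField("theUnsafe");
            handle.setAccessible(true);
            return (Unsafe) handle.get(null);
        } catch (ReflectiveOperationException e) {
            throw new Error("Could not access the sun.misc.Unsafe handle.", e);
        }
    }

    private static long fieldOffset(String name) {
        try {
            // Only the field offset is looked up; setAccessible is never called on Buffer fields.
            return UNSAFE.objectFieldOffset(Buffer.class.getDeclaredField(name));
        } catch (NoSuchFieldException e) {
            throw new Error("Field not found in java.nio.Buffer: " + name, e);
        }
    }

    /** Wraps already allocated native memory without calling a private constructor. */
    static ByteBuffer wrap(long address, int size) {
        try {
            // Create the object without running any constructor.
            ByteBuffer buffer = (ByteBuffer) UNSAFE.allocateInstance(DIRECT_BUFFER_CLASS);
            // Set address and capacity through direct unsafe memory operations.
            UNSAFE.putLong(buffer, ADDRESS_OFFSET, address);
            UNSAFE.putInt(buffer, CAPACITY_OFFSET, size);
            // clear() sets position, limit and mark based on the capacity.
            buffer.clear();
            return buffer;
        } catch (InstantiationException e) {
            throw new Error("Could not instantiate a direct ByteBuffer.", e);
        }
    }

    /** Writes one byte through the wrapper and reads it back from the raw address. */
    static byte roundTrip(byte value) {
        long address = UNSAFE.allocateMemory(16);
        try {
            ByteBuffer buffer = wrap(address, 16);
            buffer.put(0, value);
            return UNSAFE.getByte(address);
        } finally {
            // The wrapper has no cleaner attached, so the memory is released manually.
            UNSAFE.freeMemory(address);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip((byte) 42));
    }
}
```

The sketch only reads and writes single bytes: because no constructor runs,
fields other than address and capacity keep their JVM defaults until they are
set explicitly, and the wrapper must not be used after the memory is freed.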

This closes #11160
---
 .../flink/core/memory/HybridMemorySegment.java | 51 ++--
 .../org/apache/flink/core/memory/MemoryUtils.java  | 70 +++---
 2 files changed, 52 insertions(+), 69 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
index fbb9837..1693e9a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
@@ -26,12 +26,13 @@ import javax.annotation.Nullable;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.nio.BufferOverflowException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.ReadOnlyBufferException;
 
+import static org.apache.flink.core.memory.MemoryUtils.getByteBufferAddress;
+
 /**
  * This class represents a piece of memory managed by Flink.
  *
@@ -74,7 +75,7 @@ public final class HybridMemorySegment extends MemorySegment {
  * @throws IllegalArgumentException Thrown, if the given ByteBuffer is 
not direct.
  */
HybridMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner, 
@Nullable Runnable cleaner) {
-   super(checkBufferAndGetAddress(buffer), buffer.capacity(), 
owner);
+   super(getByteBufferAddress(buffer), buffer.capacity(), owner);
this.offHeapBuffer = buffer;
this.cleaner = cleaner;
}
@@ -310,7 +311,7 @@ public final class HybridMemorySegment extends 
MemorySegment {
 
if (target.isDirect()) {
// copy to the target memory directly
-   final long targetPointer = getAddress(target) + 
targetOffset;
+   final long targetPointer = getByteBufferAddress(target) 
+ targetOffset;
final long sourcePointer = address + offset;
 
if (sourcePointer <= addressLimit - numBytes) {
@@ -354,7 +355,7 @@ public final class HybridMemorySegment extends 
MemorySegment {
 
if (source.isDirect()) {
// copy to the target memory directly
-   final long sourcePointer = getAddress(source) + 
sourceOffset;
+   final long sourcePointer = getByteBufferAddress(source) 
+ sourceOffset;
final long targetPointer = address + offset;
 
if (targetPointer <= addressLimit - numBytes) {
@@ -383,46 +384,4 @@ public final class HybridMemorySegment extends 
MemorySegment {
}
}
}
-
-   // 

-   //  Utilities for native memory accesses and checks
-   // 

-
-   /**
-* The reflection fields with which we access the off-heap pointer from 
direct ByteBuffers.
-*/
-   private static final Field ADDRESS_FIELD;
-
-   static {
-   try {
-   ADDRESS_FIELD = 
java.nio.Buffer.class.getDeclaredField("address");
-   ADDRESS_FIELD.setAccessible(true);
-   }
-   catch (Throwable t) {
-   throw new RuntimeException(
-   "Cannot initialize HybridMemorySegment: 
off-heap memory is incompatible with this JVM.", t);
-   }
-   

[flink] branch release-1.10 updated (82f6dc2 -> 02255ed8)

2020-02-25 Thread azagrebin

azagrebin pushed a change to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 82f6dc2  [hotfix][mesos][test] Update test cases for validating Mesos 
deployment respecting the cpu configuration fallback order.
 new 9ccbc1f  [hotfix] Fix minor IDE warnings in MemoryUtils
 new 02255ed8 [FLINK-15094] Use Unsafe to instantiate and construct 
DirectByteBuffer

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/core/memory/HybridMemorySegment.java | 51 ++-
 .../org/apache/flink/core/memory/MemoryUtils.java  | 76 ++
 2 files changed, 54 insertions(+), 73 deletions(-)



[flink] branch master updated (bd5901f -> 05bddfb)

2020-01-24 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from bd5901f  [FLINK-14701][runtime] Fix MultiTaskSlot to not remove slots 
which are not its children
 add 05bddfb  [FLINK-15741][docs][TTL] Fix TTL docs after enabling RocksDB 
compaction filter by default

No new revisions were added by this update.

Summary of changes:
 docs/dev/stream/state/state.md | 15 ++-
 1 file changed, 6 insertions(+), 9 deletions(-)



[flink] branch release-1.10 updated (27d4cc4 -> 52a7ac9)

2020-01-24 Thread azagrebin

azagrebin pushed a change to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 27d4cc4  [FLINK-14701][runtime] Fix MultiTaskSlot to not remove slots 
which are not its children
 add 52a7ac9  [FLINK-15741][docs][TTL] Fix TTL docs after enabling RocksDB 
compaction filter by default

No new revisions were added by this update.

Summary of changes:
 docs/dev/stream/state/state.md | 15 ++-
 1 file changed, 6 insertions(+), 9 deletions(-)



[flink] branch release-1.9 updated (e4eb303 -> df24a9f)

2020-02-10 Thread azagrebin

azagrebin pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


from e4eb303  [FLINK-15863][docs] Fix docs stating that savepoints are 
relocatable
 add df24a9f  [FLINK-15143][docs] Add new memory configuration guide before 
FLIP-49

No new revisions were added by this update.

Summary of changes:
 docs/fig/mem_model.svg  |  21 +++
 docs/ops/cli.md |   2 +-
 docs/ops/cli.zh.md  |   2 +-
 docs/ops/config.md  |   4 +-
 docs/ops/config.zh.md   |   4 +-
 docs/ops/mem_setup.md   | 129 
 docs/ops/production_ready.md|   2 +-
 docs/ops/production_ready.zh.md |   2 +-
 docs/ops/python_shell.md|   2 +-
 docs/ops/python_shell.zh.md |   2 +-
 docs/ops/scala_shell.md |   2 +-
 docs/ops/scala_shell.zh.md  |   2 +-
 docs/ops/security-ssl.md|   2 +-
 docs/ops/security-ssl.zh.md |   2 +-
 docs/ops/upgrading.md   |   2 +-
 docs/ops/upgrading.zh.md|   2 +-
 16 files changed, 166 insertions(+), 16 deletions(-)
 create mode 100644 docs/fig/mem_model.svg
 create mode 100644 docs/ops/mem_setup.md



[flink] branch master updated (b902ed5 -> 0cb8834)

2020-02-10 Thread azagrebin

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from b902ed5  [FLINK-15918][runtime] Fix that 'uptime' metric is not reset 
after restart
 add 01a9ff9  [FLINK-15143][docs] Add new memory configuration guide for 
FLIP-49
 add d4aa8fa  [FLINK-15143][docs] Add tuning and troubleshooting guides for 
memory configuration
 add 0cb8834  [FLINK-15143][docs] Add migration guide from pre-FLIP-49 
memory config

No new revisions were added by this update.

Summary of changes:
 docs/fig/detailed-mem-model.svg |  21 +++
 docs/fig/simple_mem_model.svg   |  21 +++
 docs/ops/cli.md |   2 +-
 docs/ops/cli.zh.md  |   2 +-
 docs/ops/config.md  |   2 +-
 docs/ops/config.zh.md   |   2 +-
 docs/ops/deployment/cluster_setup.md|   2 +-
 docs/ops/deployment/cluster_setup.zh.md |   2 +-
 docs/ops/memory/index.md|  24 
 docs/ops/memory/index.zh.md |  24 
 docs/ops/memory/mem_detail.md   | 149 
 docs/ops/memory/mem_detail.zh.md| 149 
 docs/ops/memory/mem_migration.md| 236 
 docs/ops/memory/mem_migration.zh.md | 236 
 docs/ops/memory/mem_setup.md| 133 ++
 docs/ops/memory/mem_setup.zh.md | 133 ++
 docs/ops/memory/mem_trouble.md  |  74 ++
 docs/ops/memory/mem_trouble.zh.md   |  74 ++
 docs/ops/memory/mem_tuning.md   |  87 
 docs/ops/memory/mem_tuning.zh.md|  87 
 docs/ops/plugins.md |   2 +-
 docs/ops/plugins.zh.md  |   2 +-
 docs/ops/production_ready.md|   2 +-
 docs/ops/production_ready.zh.md |   2 +-
 docs/ops/python_shell.md|   2 +-
 docs/ops/python_shell.zh.md |   2 +-
 docs/ops/scala_shell.md |   2 +-
 docs/ops/scala_shell.zh.md  |   2 +-
 docs/ops/security-ssl.md|   2 +-
 docs/ops/security-ssl.zh.md |   2 +-
 docs/ops/state/state_backends.md|   7 +
 docs/ops/state/state_backends.zh.md |   5 +
 docs/ops/upgrading.md   |   2 +-
 docs/ops/upgrading.zh.md|   2 +-
 docs/release-notes/flink-1.10.md|   2 +
 docs/release-notes/flink-1.10.zh.md |   2 +
 36 files changed, 1482 insertions(+), 18 deletions(-)
 create mode 100644 docs/fig/detailed-mem-model.svg
 create mode 100644 docs/fig/simple_mem_model.svg
 create mode 100644 docs/ops/memory/index.md
 create mode 100644 docs/ops/memory/index.zh.md
 create mode 100644 docs/ops/memory/mem_detail.md
 create mode 100644 docs/ops/memory/mem_detail.zh.md
 create mode 100644 docs/ops/memory/mem_migration.md
 create mode 100644 docs/ops/memory/mem_migration.zh.md
 create mode 100644 docs/ops/memory/mem_setup.md
 create mode 100644 docs/ops/memory/mem_setup.zh.md
 create mode 100644 docs/ops/memory/mem_trouble.md
 create mode 100644 docs/ops/memory/mem_trouble.zh.md
 create mode 100644 docs/ops/memory/mem_tuning.md
 create mode 100644 docs/ops/memory/mem_tuning.zh.md



[flink] 01/03: [FLINK-15143][docs] Add new memory configuration guide for FLIP-49

2020-02-10 Thread azagrebin

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ff7e25cad70a74da1795c69fd651d788d5972f1c
Author: Andrey Zagrebin 
AuthorDate: Fri Jan 31 12:51:30 2020 +0100

[FLINK-15143][docs] Add new memory configuration guide for FLIP-49
---
 docs/fig/detailed-mem-model.svg |  21 +
 docs/fig/simple_mem_model.svg   |  21 +
 docs/ops/cli.md |   2 +-
 docs/ops/cli.zh.md  |   2 +-
 docs/ops/config.md  |   2 +-
 docs/ops/config.zh.md   |   2 +-
 docs/ops/memory/index.md|  24 ++
 docs/ops/memory/index.zh.md |  24 ++
 docs/ops/memory/mem_detail.md   | 149 
 docs/ops/memory/mem_detail.zh.md| 149 
 docs/ops/memory/mem_setup.md| 132 
 docs/ops/memory/mem_setup.zh.md | 132 
 docs/ops/plugins.md |   2 +-
 docs/ops/plugins.zh.md  |   2 +-
 docs/ops/production_ready.md|   2 +-
 docs/ops/production_ready.zh.md |   2 +-
 docs/ops/python_shell.md|   2 +-
 docs/ops/python_shell.zh.md |   2 +-
 docs/ops/scala_shell.md |   2 +-
 docs/ops/scala_shell.zh.md  |   2 +-
 docs/ops/security-ssl.md|   2 +-
 docs/ops/security-ssl.zh.md |   2 +-
 docs/ops/upgrading.md   |   2 +-
 docs/ops/upgrading.zh.md|   2 +-
 docs/release-notes/flink-1.10.md|   2 +
 docs/release-notes/flink-1.10.zh.md |   2 +
 26 files changed, 672 insertions(+), 16 deletions(-)

diff --git a/docs/fig/detailed-mem-model.svg b/docs/fig/detailed-mem-model.svg
new file mode 100644
index 000..215d99a
--- /dev/null
+++ b/docs/fig/detailed-mem-model.svg
@@ -0,0 +1,21 @@
+
+
+
+http://www.w3.org/1999/xlink; 
xmlns="http://www.w3.org/2000/svg;>
+
+
+http://www.w3.org/1999/xlink; 
xmlns="http://www.w3.org/2000/svg;>http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
diff --git a/docs/ops/memory/index.zh.md b/docs/ops/memory/index.zh.md
new file mode 100644
index 000..e2190e1
--- /dev/null
+++ b/docs/ops/memory/index.zh.md
@@ -0,0 +1,24 @@
+---
+nav-title: 'Memory Configuration'
+nav-id: ops_mem
+nav-parent_id: ops
+nav-pos: 5
+---
+
diff --git a/docs/ops/memory/mem_detail.md b/docs/ops/memory/mem_detail.md
new file mode 100644
index 000..a5b476f
--- /dev/null
+++ b/docs/ops/memory/mem_detail.md
@@ -0,0 +1,149 @@
+---
+title: "Detailed Memory Model"
+nav-parent_id: ops_mem
+nav-pos: 2
+---
+
+
+This section gives a detailed description of all components in Flink’s memory 
model of task executor.
+Check [memory configuration guide](mem_setup.html) for the basic memory setup.
+
+* toc
+{:toc}
+
+## Overview
+
+
+
+  
+
+
+
+The following table lists all memory components, depicted above, and 
references Flink configuration options
+which affect the size of the respective components:
+
+| **Component** | **Configuration options** | **Description** [...]
+| :--- | :--- | :--- [...]
+| [Framework Heap Memory](#framework-memory) | [`taskmanager.memory.framework.heap.size`](../config.html#taskmanager-memory-framework-heap-size) | JVM heap memory dedicated to Flink framework (advanced option) [...]
+| [Task Heap Memory](mem_setup.html#task-operator-heap-memory) 
 | 
[`taskmanager

[flink] 03/03: [FLINK-15143][docs] Add migration guide from pre-FLIP-49 memory config

2020-02-10 Thread azagrebin

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1212f662cd201af44f4b6507f789488d2fceb9af
Author: Andrey Zagrebin 
AuthorDate: Fri Jan 31 14:01:22 2020 +0100

[FLINK-15143][docs] Add migration guide from pre-FLIP-49 memory config
---
 docs/ops/memory/mem_migration.md| 236 
 docs/ops/memory/mem_migration.zh.md | 236 
 docs/ops/memory/mem_setup.md|   3 +-
 docs/ops/memory/mem_setup.zh.md |   3 +-
 4 files changed, 476 insertions(+), 2 deletions(-)

diff --git a/docs/ops/memory/mem_migration.md b/docs/ops/memory/mem_migration.md
new file mode 100644
index 000..46a3f2e
--- /dev/null
+++ b/docs/ops/memory/mem_migration.md
@@ -0,0 +1,236 @@
+---
+title: "Migration Guide"
+nav-parent_id: ops_mem
+nav-pos: 5
+---
+
+
+The [memory setup of task managers](mem_setup.html) has changed a lot with the 
1.10 release. Many configuration options
+were removed or their semantics changed. This guide will help you to migrate 
the memory configuration from Flink
+[<= 
*1.9*](https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html)
 to >= *1.10*.
+
+* toc
+{:toc}
+
+
+  Warning: It is important to review this guide because the 
legacy and new memory configuration can
+  result in different sizes of memory components. If you try to reuse your 
Flink configuration from older versions
+  before 1.10, it can result in changes to the behavior, performance or even 
configuration failures of your application.
+
+
+Note Before version *1.10*, Flink did 
not require that memory related options are set at all
+as they all had default values. The [new memory 
configuration](mem_setup.html#configure-total-memory) requires
+that at least one subset of the following options is configured explicitly, 
otherwise the configuration will fail:
+* 
[`taskmanager.memory.flink.size`](../config.html#taskmanager-memory-flink-size)
+* 
[`taskmanager.memory.process.size`](../config.html#taskmanager-memory-process-size)
+* 
[`taskmanager.memory.task.heap.size`](../config.html#taskmanager-memory-task-heap-size)
 and 
[`taskmanager.memory.managed.size`](../config.html#taskmanager-memory-managed-size)
+
+The [default `flink-conf.yaml`](#default-configuration-in-flink-confyaml) 
shipped with Flink sets 
[`taskmanager.memory.process.size`](../config.html#taskmanager-memory-process-size)
+to make the default memory configuration consistent.
+
+This 
[spreadsheet](https://docs.google.com/spreadsheets/d/1mJaMkMPfDJJ-w6nMXALYmTc4XxiV30P5U7DzgwLkSoE)
 can also help
+to evaluate and compare the results of the legacy and new memory computations.
+
+## Changes in Configuration Options
+
+This chapter briefly lists all changes to Flink's memory configuration options 
introduced with the *1.10* release.
+It also references other chapters for more details about migrating to the new 
configuration options.
+
+The following options are completely removed. If they are still used, they 
will be ignored.
+
+
+
+
+Removed option
+Note
+
+
+
+
+taskmanager.memory.fraction
+
+Check the description of the new option taskmanager.memory.managed.fraction.
+The new option has different semantics and the value of the 
deprecated option usually has to be adjusted.
+See also how to migrate managed 
memory.
+
+
+
+ taskmanager.memory.off-heap
+ On-heap managed memory is no longer supported. See 
also how to migrate managed memory.
+
+
+ taskmanager.memory.preallocate
+ Pre-allocation is no longer supported and managed 
memory is always allocated lazily. See also how 
to migrate managed memory.
+
+
+
+
+The following options are deprecated but if they are still used they will be 
interpreted as new options for backwards compatibility:
+
+
+
+
+Deprecated option
+Interpreted as
+
+
+
+
+taskmanager.heap.size
+
+
+  taskmanager.memory.flink.size
 for standalone deployment
+  taskmanager.memory.process.size
 for containerized deployments
+
+See also how to 
migrate total memory.
+
+
+
+ taskmanager.memory.size
+ taskmanager.memory.managed.size,
 see also how to migrate managed memory.
+
+
+ taskmanager.network.memory.min
+ taskmanager.memory.network.min
+
+
+ taskmanager.network.memory.max
+ taskmanager.memory.network.max
+
+
+ taskmanager.network
