[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-05-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15999502#comment-15999502
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3721


> Flink automatically manages TM network buffer
> -
>
> Key: FLINK-4545
> URL: https://issues.apache.org/jira/browse/FLINK-4545
> Project: Flink
>  Issue Type: Wish
>  Components: Network
>Reporter: Zhenzhong Xu
>Assignee: Nico Kruber
>Priority: Critical
> Fix For: 1.3.0
>
>
> Currently, the number of network buffers per task manager is preconfigured and 
> the memory is pre-allocated through the taskmanager.network.numberOfBuffers 
> config. In a job DAG with a shuffle phase, this number can grow very large 
> depending on the TM cluster size. The formula for calculating the buffer count 
> is documented here 
> (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers):
>   
> #slots-per-TM^2 * #TMs * 4
>
> In a standalone deployment, we may need to control the task manager cluster 
> size dynamically and then leverage the upcoming Flink feature to support 
> rescaling job parallelism at runtime. 
> If the buffer count config is static at runtime and cannot be changed without 
> restarting the task manager process, this may add latency and complexity to 
> the scaling process. I am wondering whether there has already been any 
> discussion about whether the network buffers should be automatically managed 
> by Flink, or whether Flink could at least expose some API to allow them to be 
> reconfigured. Let me know if there is any existing JIRA that I should follow.
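
To make the growth described in the issue concrete, here is a minimal sketch that evaluates the quoted formula (#slots-per-TM^2 * #TMs * 4) for a few cluster sizes. The class and method names are hypothetical, and the 32 KiB buffer size is an assumption used only to turn the buffer count into a rough memory figure.

```java
// Illustrative sketch only: names are hypothetical and not part of Flink.
final class NetworkBufferEstimate {

    /** Buffer count from the formula quoted in the issue: slotsPerTm^2 * numTms * 4. */
    static long requiredBuffers(int slotsPerTm, int numTms) {
        return (long) slotsPerTm * slotsPerTm * numTms * 4L;
    }

    /** Memory needed for the given number of buffers, in MiB (rounded down). */
    static long requiredMiB(long buffers, long bufferSizeBytes) {
        return (buffers * bufferSizeBytes) >> 20;
    }

    public static void main(String[] args) {
        int slotsPerTm = 8;
        long bufferSizeBytes = 32L * 1024; // assumed buffer size of 32 KiB
        for (int numTms : new int[] {10, 50, 200}) {
            long buffers = requiredBuffers(slotsPerTm, numTms);
            System.out.printf("%d TMs -> %d buffers (~%d MiB per TM)%n",
                numTms, buffers, requiredMiB(buffers, bufferSizeBytes));
        }
    }
}
```

Under these assumptions, going from 10 to 200 task managers raises the per-TM requirement from 2560 to 51200 buffers, which is why a value fixed at start-up is awkward for clusters that are resized.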



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-05-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15999430#comment-15999430
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3721
  
Merging this.
I filed a follow-up JIRA to address the "configuration with units" to make 
sure all memory-related parameters behave the same way, without loss of byte 
precision where needed: https://issues.apache.org/jira/browse/FLINK-6469


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-05-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15998233#comment-15998233
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3721
  
Okay, taking a step back. Looking through the code some more, the internal 
arithmetic should certainly stay in bytes. However, bytes are tedious to 
configure.

I suggest adding support for memory units to the configuration, so 
that we can configure values such as
  - 512m
  - 10 kb
  - ...

I have started a utility for this here: 
https://github.com/StephanEwen/incubator-flink/tree/mem_size

That means that we would keep the PR in its current form and add memory 
configuration parsing as a follow-up.
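
As an illustration of what such unit parsing could look like, here is a minimal sketch. It is not the utility from the linked branch; the class name, the method name, and the set of accepted suffixes are assumptions. It keeps the result in bytes, in line with the point that the internal arithmetic should stay in bytes.

```java
import java.util.Locale;

// Illustrative sketch only: not the utility from the mem_size branch.
final class MemorySizeSketch {

    /** Parses strings such as "512m" or "10 kb" into a number of bytes. */
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);

        // Split the leading decimal digits from the (optional) unit suffix.
        int pos = 0;
        while (pos < s.length() && s.charAt(pos) >= '0' && s.charAt(pos) <= '9') {
            pos++;
        }
        if (pos == 0) {
            throw new IllegalArgumentException("No number found in '" + text + "'");
        }
        long value = Long.parseLong(s.substring(0, pos));
        String unit = s.substring(pos).trim();

        long multiplier;
        switch (unit) {
            case "":
            case "b":
                multiplier = 1L;
                break;
            case "k":
            case "kb":
                multiplier = 1L << 10;
                break;
            case "m":
            case "mb":
                multiplier = 1L << 20;
                break;
            case "g":
            case "gb":
                multiplier = 1L << 30;
                break;
            default:
                throw new IllegalArgumentException("Unknown unit '" + unit + "' in '" + text + "'");
        }
        // multiplyExact throws instead of silently overflowing on huge values.
        return Math.multiplyExact(value, multiplier);
    }

    public static void main(String[] args) {
        System.out.println(parseBytes("512m"));
        System.out.println(parseBytes("10 kb"));
    }
}
```

A value without a suffix is read as plain bytes in this sketch; whether an unsuffixed value should mean bytes or megabytes is exactly the kind of consistency question the follow-up is meant to settle.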


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15997337#comment-15997337
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3721
  
The code is good in general and well tested (including the shell scripts, which 
is great!)

I would do some on-the-fly polishing while merging. The main thing I want to 
adjust is having the configuration parameters specified in megabytes, not 
bytes. All other memory-related parameters are in megabytes, so this one should 
be as well, for consistency. Also, I think we don't need a finer granularity 
these days.


> Flink automatically manages TM network buffer
> -
>
> Key: FLINK-4545
> URL: https://issues.apache.org/jira/browse/FLINK-4545
> Project: Flink
>  Issue Type: Wish
>  Components: Network
>Reporter: Zhenzhong Xu
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.3.0


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996721#comment-15996721
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3721
  
I am checking this PR out now...


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974988#comment-15974988
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112248789
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -398,3 +428,106 @@ readSlaves() {
 useOffHeapMemory() {
     [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
 }
+
+HAVE_AWK=
+# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)
+calculateNetworkBuf() {
+    local network_buffers_bytes
+    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
+        exit 1
+    fi
+
+    if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
+        # fix memory size for network buffers
+        network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
+    else
+        if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
+            echo "Min must be less than or equal to max."
+            echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        # Bash only performs integer arithmetic so floating point computation is performed using awk
+        if [[ -z "${HAVE_AWK}" ]] ; then
+            command -v awk >/dev/null 2>&1
+            if [[ $? -ne 0 ]]; then
+                echo "[ERROR] Program 'awk' not found."
+                echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+                exit 1
+            fi
+            HAVE_AWK=true
+        fi
+
+        # We calculate the memory using a fraction of the total memory
+        if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
+            echo "It must be between 0.0 and 1.0."
+            echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
+    fi
+
+    # recalculate the JVM heap memory by taking the network buffers into account
--- End diff --

No, actually: the user may set the `FLINK_TM_HEAP` environment variable or 
configure the "Flink heap size" via `taskmanager.heap.mb`, but this is not the 
real heap size. It is rather the overall memory size used by Flink (including 
off-heap memory). So this function removes the off-heap part and returns the 
real heap size to use with `-Xmx` and `-Xms`.
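
The calculation described here can be sketched in Java as follows. This is a simplified illustration, not the code from the PR: the class and method names are hypothetical, `double` is used for the fraction, and only the network buffer share is subtracted (any other off-heap memory is left out). The sample values in `main` (1000 MB, fraction 0.1, min 64 MB, max 1 GB) mirror the manual test cases in the diff under review.

```java
// Illustrative sketch only: names are hypothetical, and off-heap managed memory is ignored.
final class HeapSizeSketch {

    /** Network buffer memory: a fraction of the total, clamped to [minBytes, maxBytes]. */
    static long networkBufferBytes(long totalBytes, double fraction, long minBytes, long maxBytes) {
        if (minBytes > maxBytes) {
            throw new IllegalArgumentException("Min must be less than or equal to max.");
        }
        if (!(fraction > 0.0 && fraction < 1.0)) {
            throw new IllegalArgumentException("Fraction must be between 0.0 and 1.0.");
        }
        long byFraction = (long) (fraction * totalBytes);
        return Math.min(maxBytes, Math.max(minBytes, byFraction));
    }

    /** Heap size in MB for -Xmx/-Xms: configured total minus the network buffer share. */
    static long heapSizeMb(long totalMb, double fraction, long minBytes, long maxBytes) {
        long networkMb = networkBufferBytes(totalMb << 20, fraction, minBytes, maxBytes) >> 20;
        return totalMb - networkMb;
    }

    public static void main(String[] args) {
        // 10% of 1000 MB lies within [64 MB, 1 GB], so it is used as is.
        System.out.println(heapSizeMb(1000, 0.1, 64L << 20, 1L << 30));
        // 10% of 100 MB is below the minimum, so the 64 MB minimum applies.
        System.out.println(heapSizeMb(100, 0.1, 64L << 20, 1L << 30));
    }
}
```

The second call shows why the configured value cannot be passed to `-Xmx` directly: with a small total, the minimum network buffer size takes a much larger share than the fraction suggests.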


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974981#comment-15974981
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112247760
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---
@@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
         }
         catch (Throwable t) {
             if (LOG.isErrorEnabled()) {
-                LOG.error("Unexpected problen while getting the file statistics for file '" + this.filePath + "': "
+                LOG.error("Unexpected problem while getting the file statistics for file '" + this.filePath + "': "
--- End diff --

You can leave it as it is.


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974977#comment-15974977
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112246946
  
--- Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java ---
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit test that verifies that the task manager heap size calculation used by the bash script
+ * taskmanager.sh returns the same values as the heap size calculation of
+ * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
+ *
+ * NOTE: the shell script uses awk to perform floating-point arithmetic which uses
+ * double precision but our Java code restrains to float because we actually do
+ * not need high precision.
+ */
+public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
+
+    /** Key that is used by config.sh. */
+    private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
+
+    /**
+     * Number of tests with random values.
+     *
+     * NOTE: calling the external test script is slow and thus low numbers are preferred for general
+     * testing.
+     */
+    private static final int NUM_RANDOM_TESTS = 20;
+
+    @Before
+    public void checkOperatingSystem() {
+        Assume.assumeTrue("This test checks shell scripts not available on Windows.",
+            !OperatingSystem.isWindows());
+    }
+
+    /**
+     * Tests that {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} has the same
+     * result as the shell script.
+     */
+    @Test
+    public void compareNetworkBufShellScriptWithJava() throws Exception {
+        int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
+        float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
+
+        // manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
+
+        compareNetworkBufJavaVsScript(
+            getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
+
+        compareNetworkBufJavaVsScript(
+            getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
+
+        compareNetworkBufJavaVsScript(
+            getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
+
+        // some automated tests with random (but valid) values
+
+        Random ran = new Random();
+        for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
+            // tolerate that values differ by 1% (due to different floating point precisions)
+            compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f);
--- End diff --

oh, here, I actually do already print the 

[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974974#comment-15974974
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112246505
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---
@@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
         }
         catch (Throwable t) {
             if (LOG.isErrorEnabled()) {
-                LOG.error("Unexpected problen while getting the file statistics for file '" + this.filePath + "': "
+                LOG.error("Unexpected problem while getting the file statistics for file '" + this.filePath + "': "
--- End diff --

sorry, should I create a separate PR? (a separate JIRA is definitely 
overkill for this)


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974970#comment-15974970
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112246105
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link TaskManagerServices}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentInformation.class)
+public class TaskManagerServicesTest {
+
+    /**
+     * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
+     * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+     */
+    @SuppressWarnings("deprecation")
+    @Test
+    public void calculateNetworkBufOld() throws Exception {
+        Configuration config = new Configuration();
+        config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+        // note: actual network buffer memory size is independent of the totalJavaMemorySize
+        assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+            TaskManagerServices.calculateNetworkBuf(10L << 20, config));
+        assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+            TaskManagerServices.calculateNetworkBuf(64L << 20, config));
+
+        // test integer overflow in the memory size
+        int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
+        config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
+        assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
+    }
+
+    /**
+     * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
+     * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+     * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+     * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+     */
+    @Test
+    public void calculateNetworkBufNew() throws Exception {
+        Configuration config = new Configuration();
+
+        // (1) defaults
+        final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+        final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+        final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+        assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
+            TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
+   

[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974950#comment-15974950
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112242842
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---

[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974951#comment-15974951
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112242996
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -398,3 +428,106 @@ readSlaves() {
 useOffHeapMemory() {
     [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
 }
+
+HAVE_AWK=
+# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)
+calculateNetworkBuf() {
+    local network_buffers_bytes
+    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
+        exit 1
+    fi
+
+    if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
+        # fix memory size for network buffers
+        network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
+    else
+        if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
+            echo "Min must be less than or equal to max."
+            echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        # Bash only performs integer arithmetic so floating point computation is performed using awk
+        if [[ -z "${HAVE_AWK}" ]] ; then
+            command -v awk >/dev/null 2>&1
+            if [[ $? -ne 0 ]]; then
+                echo "[ERROR] Program 'awk' not found."
+                echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+                exit 1
+            fi
+            HAVE_AWK=true
+        fi
+
+        # We calculate the memory using a fraction of the total memory
+        if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
+            echo "It must be between 0.0 and 1.0."
+            echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
+    fi
+
+    # recalculate the JVM heap memory by taking the network buffers into account
--- End diff --

To me, "recalculate" implied that it would change some configuration value, 
but that's not happening. It's only verifying that the memory for network 
buffers is less than the heap memory.
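
The distinction raised here can be illustrated with a minimal Java sketch. It assumes the clamp-to-[min, max] logic of the awk expression in the quoted diff: the size is derived from the fraction, and the last step only checks the result against the heap size without changing any configured value. The names `NetworkBufSketch` and `fitsIntoHeap` and the parameter list are hypothetical; this is not the signature of `TaskManagerServices.calculateNetworkBuf(long, Configuration)`.

```java
final class NetworkBufSketch {

    /**
     * Hypothetical helper: takes fraction * totalMemoryBytes and clamps it into
     * [minBytes, maxBytes], like the awk expression in the quoted config.sh diff.
     */
    static long calculateNetworkBuf(long totalMemoryBytes, float fraction, long minBytes, long maxBytes) {
        if (minBytes > maxBytes) {
            throw new IllegalArgumentException("Min must be less than or equal to max.");
        }
        if (minBytes == maxBytes) {
            // fixed memory size for network buffers, the fraction is not used
            return minBytes;
        }
        if (!(fraction > 0.0f && fraction < 1.0f)) {
            throw new IllegalArgumentException("Fraction must be between 0.0 and 1.0.");
        }
        long byFraction = (long) (fraction * totalMemoryBytes);
        return Math.min(maxBytes, Math.max(minBytes, byFraction));
    }

    /** Verification only: reports whether the buffers fit, nothing is reconfigured. */
    static boolean fitsIntoHeap(long networkBufBytes, long heapBytes) {
        return networkBufBytes < heapBytes;
    }

    public static void main(String[] args) {
        long heapBytes = 1000L << 20; // 1000 MB, an arbitrary example value
        long netBuf = calculateNetworkBuf(heapBytes, 0.1f, 64L << 20, 1L << 30);
        System.out.println("network buffers: " + netBuf + " bytes, fits into heap: "
            + fitsIntoHeap(netBuf, heapBytes));
    }
}
```

In this reading, a comment such as "verify that the network buffers fit into the JVM heap" would describe the final step more accurately than "recalculate".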


> Flink automatically manages TM network buffer
> -
>
> Key: FLINK-4545
> URL: https://issues.apache.org/jira/browse/FLINK-4545
> Project: Flink
>  Issue Type: Wish
>  Components: Network
>Reporter: Zhenzhong Xu
>
> Currently, the number of network buffers per task manager is preconfigured and
> the memory is pre-allocated through the taskmanager.network.numberOfBuffers
> config. In a job DAG with a shuffle phase, this number can grow very large
> depending on the TM cluster size. The formula for calculating the buffer count
> is documented here
> (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers):
>
> #slots-per-TM^2 * #TMs * 4
> In a standalone deployment, we may need to control the task manager cluster
> size dynamically and then leverage the upcoming Flink feature to support
> rescaling job parallelism at runtime.
> If the buffer count config is static at runtime and cannot be changed without
> restarting the task manager process, this may add latency and complexity to
> the scaling process. I am wondering if there is already any discussion around
> whether the network buffers should be automatically managed by Flink, or
> whether Flink should at least expose some API to allow them to be
> reconfigured. Let me know if there is any existing JIRA that I should follow.
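
As a worked example of the formula quoted in the issue description, the following Java sketch computes the buffer count for a few cluster sizes. The class and method names are hypothetical, and the 32 KiB buffer size is an assumption used only to turn the count into a memory figure.

```java
final class BufferCountSketch {

    /** Buffer count from the formula quoted in the issue: #slots-per-TM^2 * #TMs * 4. */
    static long requiredNetworkBuffers(int slotsPerTaskManager, int numTaskManagers) {
        return (long) slotsPerTaskManager * slotsPerTaskManager * numTaskManagers * 4L;
    }

    public static void main(String[] args) {
        final long assumedBufferSizeBytes = 32 * 1024; // assumption for illustration
        for (int numTaskManagers : new int[] {10, 20, 40}) {
            long buffers = requiredNetworkBuffers(8, numTaskManagers);
            System.out.println(numTaskManagers + " TMs: " + buffers + " buffers, "
                + ((buffers * assumedBufferSizeBytes) >> 20) + " MiB");
        }
    }
}
```

With 8 slots per TM, growing the cluster from 10 to 20 TMs doubles the count from 2560 to 5120 buffers, which is why a statically configured value has to be revisited whenever the cluster is resized.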



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974938#comment-15974938
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112239687
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link TaskManagerServices}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentInformation.class)
+public class TaskManagerServicesTest {
+
+   /**
+    * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
+    * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+    */
+   @SuppressWarnings("deprecation")
+   @Test
+   public void calculateNetworkBufOld() throws Exception {
+       Configuration config = new Configuration();
+       config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+       // note: actual network buffer memory size is independent of the totalJavaMemorySize
+       assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+           TaskManagerServices.calculateNetworkBuf(10L << 20, config));
+       assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+           TaskManagerServices.calculateNetworkBuf(64L << 20, config));
+
+       // test integer overflow in the memory size
+       int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
+       config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
+       assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
+   }
+
+   /**
+    * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
+    * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+    * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+    * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+    */
+   @Test
+   public void calculateNetworkBufNew() throws Exception {
+       Configuration config = new Configuration();
+
+       // (1) defaults
+       final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+       final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+       final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+       assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20,
+           TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
+   

[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974927#comment-15974927
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/3721
  
The two tests that failed on Travis CI were simply killed: a "`Killed`" 
appeared in their logs, which usually indicates that memory ran out and the 
kernel killed the process.




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974920#comment-15974920
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112236022
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -398,3 +428,106 @@ readSlaves() {
 useOffHeapMemory() {
     [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
 }
+
+HAVE_AWK=
+# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)
+calculateNetworkBuf() {
+    local network_buffers_bytes
+    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
+        exit 1
+    fi
+
+    if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
+        # fix memory size for network buffers
+        network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
+    else
+        if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
+            echo "Min must be less than or equal to max."
+            echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        # Bash only performs integer arithmetic so floating point computation is performed using awk
+        if [[ -z "${HAVE_AWK}" ]] ; then
+            command -v awk >/dev/null 2>&1
+            if [[ $? -ne 0 ]]; then
+                echo "[ERROR] Program 'awk' not found."
+                echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+                exit 1
+            fi
+            HAVE_AWK=true
+        fi
+
+        # We calculate the memory using a fraction of the total memory
+        if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
+            echo "It must be between 0.0 and 1.0."
+            echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
+    fi
+
+    # recalculate the JVM heap memory by taking the network buffers into account
--- End diff --

what do you mean?




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973040#comment-15973040
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112001266
  
--- Diff: 
flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
 ---
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit test that verifies that the task manager heap size calculation used by the bash script
+ * taskmanager.sh returns the same values as the heap size calculation of
+ * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
+ *
+ * NOTE: the shell script uses awk to perform floating-point arithmetic which uses
+ * double precision but our Java code restrains to float because we actually do
+ * not need high precision.
+ */
+public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
+
+   /** Key that is used by config.sh. */
+   private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
+
+   /**
+    * Number of tests with random values.
+    *
+    * NOTE: calling the external test script is slow and thus low numbers are preferred for general
+    * testing.
+    */
+   private static final int NUM_RANDOM_TESTS = 20;
+
+   @Before
+   public void checkOperatingSystem() {
+       Assume.assumeTrue("This test checks shell scripts not available on Windows.",
--- End diff --

missing "that are"?




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973044#comment-15973044
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112002748
  
--- Diff: 
flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
 ---
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit test that verifies that the task manager heap size calculation used by the bash script
+ * taskmanager.sh returns the same values as the heap size calculation of
+ * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
+ *
+ * NOTE: the shell script uses awk to perform floating-point arithmetic which uses
+ * double precision but our Java code restrains to float because we actually do
+ * not need high precision.
+ */
+public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
+
+   /** Key that is used by config.sh. */
+   private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
+
+   /**
+    * Number of tests with random values.
+    *
+    * NOTE: calling the external test script is slow and thus low numbers are preferred for general
+    * testing.
+    */
+   private static final int NUM_RANDOM_TESTS = 20;
+
+   @Before
+   public void checkOperatingSystem() {
+       Assume.assumeTrue("This test checks shell scripts not available on Windows.",
+           !OperatingSystem.isWindows());
+   }
+
+   /**
+    * Tests that {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} has the same
+    * result as the shell script.
+    */
+   @Test
+   public void compareNetworkBufShellScriptWithJava() throws Exception {
+       int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
+       float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
+
+       // manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
+
+       compareNetworkBufJavaVsScript(
+           getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
+
+       compareNetworkBufJavaVsScript(
+           getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
+
+       compareNetworkBufJavaVsScript(
+           getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
+
+       // some automated tests with random (but valid) values
+
+       Random ran = new Random();
+       for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
+           // tolerate that values differ by 1% (due to different floating point precisions)
+           compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f);
--- End diff --

As with the other randomized tests we should print 

[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973043#comment-15973043
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112000165
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link TaskManagerServices}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentInformation.class)
+public class TaskManagerServicesTest {
+
+   /**
+    * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
+    * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+    */
+   @SuppressWarnings("deprecation")
+   @Test
+   public void calculateNetworkBufOld() throws Exception {
+       Configuration config = new Configuration();
+       config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+       // note: actual network buffer memory size is independent of the totalJavaMemorySize
+       assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+           TaskManagerServices.calculateNetworkBuf(10L << 20, config));
+       assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+           TaskManagerServices.calculateNetworkBuf(64L << 20, config));
+
+       // test integer overflow in the memory size
+       int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
+       config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
+       assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
+   }
+
+   /**
+    * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
+    * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+    * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+    * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+    */
+   @Test
+   public void calculateNetworkBufNew() throws Exception {
+       Configuration config = new Configuration();
+
+       // (1) defaults
+       final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+       final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+       final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+       assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20,
+           TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
+   

[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973041#comment-15973041
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r111975142
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -398,3 +428,106 @@ readSlaves() {
 useOffHeapMemory() {
     [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
 }
+
+HAVE_AWK=
+# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)
+calculateNetworkBuf() {
+    local network_buffers_bytes
+    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
+        exit 1
+    fi
+
+    if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
+        # fix memory size for network buffers
+        network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
+    else
+        if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
+            echo "Min must be less than or equal to max."
+            echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        # Bash only performs integer arithmetic so floating point computation is performed using awk
+        if [[ -z "${HAVE_AWK}" ]] ; then
+            command -v awk >/dev/null 2>&1
+            if [[ $? -ne 0 ]]; then
+                echo "[ERROR] Program 'awk' not found."
+                echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+                exit 1
+            fi
+            HAVE_AWK=true
+        fi
+
+        # We calculate the memory using a fraction of the total memory
+        if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
+            echo "It must be between 0.0 and 1.0."
+            echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
+    fi
+
+    # recalculate the JVM heap memory by taking the network buffers into account
--- End diff --

This is more of a verification, isn't it?




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973042#comment-15973042
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112003041
  
--- Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java ---
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit test that verifies that the task manager heap size calculation used by the bash script
+ * taskmanager.sh returns the same values as the heap size calculation of
+ * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
+ *
+ * NOTE: the shell script uses awk to perform floating-point arithmetic which uses
+ * double precision but our Java code restrains to float because we actually do
+ * not need high precision.
+ */
+public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
+
+    /** Key that is used by config.sh. */
+    private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
+
+    /**
+     * Number of tests with random values.
+     *
+     * NOTE: calling the external test script is slow and thus low numbers are preferred for general
+     * testing.
+     */
+    private static final int NUM_RANDOM_TESTS = 20;
+
+    @Before
+    public void checkOperatingSystem() {
+        Assume.assumeTrue("This test checks shell scripts not available on Windows.",
+            !OperatingSystem.isWindows());
+    }
+
+    /**
+     * Tests that {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} has the same
+     * result as the shell script.
+     */
+    @Test
+    public void compareNetworkBufShellScriptWithJava() throws Exception {
+        int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
+        float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
+
+        // manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
+
+        compareNetworkBufJavaVsScript(
+            getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
+
+        compareNetworkBufJavaVsScript(
+            getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
+
+        compareNetworkBufJavaVsScript(
+            getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
+
+        // some automated tests with random (but valid) values
+
+        Random ran = new Random();
+        for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
+            // tolerate that values differ by 1% (due to different floating point precisions)
+            compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f);
+        }
+    }
+
+   /**
+* Tests that {@link 

[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973037#comment-15973037
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r111992933
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link TaskManagerServices}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentInformation.class)
+public class TaskManagerServicesTest {
+
+    /**
+     * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
+     * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+     */
+    @SuppressWarnings("deprecation")
+    @Test
+    public void calculateNetworkBufOld() throws Exception {
+        Configuration config = new Configuration();
+        config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+        // note: actual network buffer memory size is independent of the totalJavaMemorySize
+        assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+            TaskManagerServices.calculateNetworkBuf(10L << 20, config));
+        assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+            TaskManagerServices.calculateNetworkBuf(64L << 20, config));
+
+        // test integer overflow in the memory size
+        int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
+        config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
+        assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
+    }
+
+    /**
+     * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
+     * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+     * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+     * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+     */
+    @Test
+    public void calculateNetworkBufNew() throws Exception {
+        Configuration config = new Configuration();
+
+        // (1) defaults
+        final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+        final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+        final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+        assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
+            TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
+   

[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973035#comment-15973035
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r111994207
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---
@@ -376,6 +392,169 @@ private static NetworkEnvironment createNetworkEnvironment(
     }
 
     /**
+     * Calculates the amount of memory used for network buffers based on the total memory to use and
+     * the according configuration parameters.
+     *
+     * The following configuration parameters are involved:
+     * <ul>
+     *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
+     *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
+     *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
+     *  <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
+     * </ul>.
+     *
+     * @param totalJavaMemorySize
+     *         overall available memory to use (heap and off-heap, in bytes)
+     * @param config
+     *         configuration object
+     *
+     * @return memory to use for network buffers (in bytes)
+     */
+    public static long calculateNetworkBuf(long totalJavaMemorySize, Configuration config) {
--- End diff --

How about a slightly longer method name?




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973038#comment-15973038
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112000380
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---
@@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
         }
         catch (Throwable t) {
             if (LOG.isErrorEnabled()) {
-                LOG.error("Unexpected problen while getting the file statistics for file '" + this.filePath + "': "
+                LOG.error("Unexpected problem while getting the file statistics for file '" + this.filePath + "': "

Let's see whether I can remember to not squash this commit :)




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973039#comment-15973039
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r111983754
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---
@@ -410,24 +410,47 @@ private static NetworkEnvironment createNetworkEnvironment(
      *
      * @return memory to use for network buffers (in bytes)
      */
+    @SuppressWarnings("deprecation")
     public static long calculateNetworkBuf(long totalJavaMemorySize, Configuration config) {
+        assert totalJavaMemorySize > 0;
--- End diff --

Use Preconditions instead?
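
For context on this suggestion: a Java `assert` is only evaluated when the JVM runs with assertions enabled (`-ea`), so the check above would normally be skipped in production, whereas a Preconditions-style check always runs. A minimal sketch of the difference follows. The class name and the local `checkArgument` helper are hypothetical stand-ins so that the example is self-contained; they are not the code from the pull request.

```java
final class PreconditionSketch {

    // Hypothetical local stand-in for a Preconditions-style helper.
    public static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // Variant using 'assert': the check is skipped unless the JVM runs with -ea.
    public static long withAssert(long totalJavaMemorySize) {
        assert totalJavaMemorySize > 0 : "totalJavaMemorySize must be positive";
        return totalJavaMemorySize;
    }

    // Variant using an explicit check: always evaluated, regardless of JVM flags.
    public static long withCheck(long totalJavaMemorySize) {
        checkArgument(totalJavaMemorySize > 0, "totalJavaMemorySize must be positive");
        return totalJavaMemorySize;
    }

    public static void main(String[] args) {
        try {
            withCheck(-1L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With the explicit check, an invalid size is rejected on every run; with `assert`, the same call may pass silently depending on how the JVM was started.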




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973036#comment-15973036
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r111997260
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link TaskManagerServices}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentInformation.class)
+public class TaskManagerServicesTest {
+
+    /**
+     * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
+     * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+     */
+    @SuppressWarnings("deprecation")
+    @Test
+    public void calculateNetworkBufOld() throws Exception {
+        Configuration config = new Configuration();
+        config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+        // note: actual network buffer memory size is independent of the totalJavaMemorySize
+        assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+            TaskManagerServices.calculateNetworkBuf(10L << 20, config));
+        assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+            TaskManagerServices.calculateNetworkBuf(64L << 20, config));
+
+        // test integer overflow in the memory size
+        int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
+        config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
+        assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
+    }
+
+    /**
+     * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
+     * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+     * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+     * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+     */
+    @Test
+    public void calculateNetworkBufNew() throws Exception {
+        Configuration config = new Configuration();
+
+        // (1) defaults
+        final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+        final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+        final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+        assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
+            TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
+   

[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972803#comment-15972803
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r111969982
  
--- Diff: docs/setup/config.md ---
@@ -602,26 +612,66 @@ You have to configure `jobmanager.archive.fs.dir` in order to archive terminated
 
 ## Background
 
+
 ### Configuring the Network Buffers
 
-If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, please use the following formula to adjust the number of network buffers:
+If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, you
--- End diff --

AFAIK, those things only happen if you leave trailing whitespace at the end of the line or so. Here, the lines are properly joined in the generated HTML.




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972771#comment-15972771
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r111966911
  
--- Diff: docs/setup/config.md ---
@@ -602,26 +612,66 @@ You have to configure `jobmanager.archive.fs.dir` in order to archive terminated
 
 ## Background
 
+
 ### Configuring the Network Buffers
 
-If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, please use the following formula to adjust the number of network buffers:
+If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, you
--- End diff --

We usually don't add manual line breaks in the documentation; otherwise, funky things start to happen when you resize the window.




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972538#comment-15972538
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3721
  
I've merged the two PRs that this one builds upon; could you rebase this one?




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967540#comment-15967540
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/3721

[FLINK-4545] replace the network buffers parameter

(based on #3708 and #3713)

Instead, allow the configuration with the following three new (more flexible) parameters:
* `taskmanager.network.memory.fraction`: fraction of JVM memory to use for network buffers (default: 0.1)
* `taskmanager.network.memory.min`: minimum memory size for network buffers (default: 64 MB)
* `taskmanager.network.memory.max`: maximum memory size for network buffers (default: 1 GB)
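
To illustrate how the three parameters interact, here is a minimal sketch, assuming the network buffer memory is the configured fraction of the total memory clamped into the [min, max] range, as the shell-script diff reviewed elsewhere in this thread suggests. The class and method names are illustrative only and are not the code from this pull request; the validation mirrors the checks in that script.

```java
final class NetworkBufferMemorySketch {

    /**
     * Returns fraction * totalBytes, clamped into [minBytes, maxBytes].
     * All names here are hypothetical; only the clamping idea is taken from the thread.
     */
    public static long networkBufferBytes(
            long totalBytes, double fraction, long minBytes, long maxBytes) {
        if (totalBytes <= 0) {
            throw new IllegalArgumentException("total memory must be positive");
        }
        if (minBytes > maxBytes) {
            throw new IllegalArgumentException("min must be less than or equal to max");
        }
        if (!(fraction > 0.0 && fraction < 1.0)) {
            throw new IllegalArgumentException("fraction must be between 0.0 and 1.0");
        }
        long byFraction = (long) (fraction * totalBytes);
        return Math.min(maxBytes, Math.max(minBytes, byFraction));
    }

    public static void main(String[] args) {
        final long mb = 1L << 20;
        // Small, medium and large totals: the min applies, then the fraction, then the max.
        long[] totals = {256 * mb, 2048 * mb, 16384 * mb};
        for (long total : totals) {
            long bytes = networkBufferBytes(total, 0.1, 64 * mb, 1024 * mb);
            System.out.println((total / mb) + " MB total -> " + (bytes / mb) + " MB network buffers");
        }
    }
}
```

The sketch truncates the fractional result when casting to `long`, whereas the shell script rounds via `printf "%.0f"`, so the two may differ by one byte.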

Note that I needed to adapt two unit tests that would otherwise have been killed on Travis CI: these defaults result in ~150MB of memory being used for network buffers, which apparently was too much there.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-4545

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3721.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3721


commit e61f7bc4debce332c421cb645ff1025b4d03d8d0
Author: Nico Kruber 
Date:   2017-04-11T09:26:29Z

[FLINK-6292] fix transfer.sh upload by using https

Seems the upload via http is not supported anymore.

commit 362ceec0823b179719449d0ed244c591dfcf51f4
Author: Nico Kruber 
Date:   2017-04-12T09:09:03Z

[FLINK-6299] make all IT cases extend from TestLogger

This way, currently executed tests and their failures are properly logged.

commit 973099ef55701fe63951639d37b4f01765b06a01
Author: Nico Kruber 
Date:   2017-04-06T12:41:52Z

[FLINK-4545] replace the network buffers parameter

Instead, allow the configuration with the following three new (more flexible) parameters:
 * "taskmanager.network.memory.fraction": fraction of JVM memory to use for network buffers (default: 0.1)
 * "taskmanager.network.memory.min": minimum memory size for network buffers (default: 64 MB)
 * "taskmanager.network.memory.max": maximum memory size for network buffers (default: 1 GB)

 # Please enter the commit message for your changes. Lines starting

commit 09a981189b59ac13bd39000cc77913c0b03289fd
Author: Nico Kruber 
Date:   2017-04-11T12:20:40Z

[hotfix] fix typo in error message

commit 0960a809c8da51b9787f3f726945716933051fc3
Author: Nico Kruber 
Date:   2017-04-11T13:29:41Z

[hotfix] fix typo in taskmanager.sh usage string

commit 298bb69451a1405df774451de11eb5684534c956
Author: Nico Kruber 
Date:   2017-04-06T15:58:14Z

[FLINK-4545] adapt taskmanager.sh to take network buffers memory into 
account

commit ea2fb24f4a6eb18cc3f8d3ebd83a49c0f1386a8a
Author: Nico Kruber 
Date:   2017-04-10T09:43:50Z

[FLINK-4545] add configuration checks for the new network buffer memory 
config

commit 5133d250c4dba4a5e72baad95c841d2b03cb49ea
Author: Nico Kruber 
Date:   2017-04-10T16:22:10Z

[FLINK-4545] add unit tests using the new network configuration parameters 
and methods

commit a24a548e6ff7e36581f7f7457099656362ca3974
Author: Nico Kruber 
Date:   2017-04-11T16:52:56Z

[FLINK-4545] add unit tests for heap size calculation in shell scripts

These verify that the results are the same as in the calculation done by 
Java.

commit d55153d559bf110a931b5de849df812038ba4a7a
Author: Nico Kruber 
Date:   2017-04-12T16:11:37Z

[FLINK-4545] update the docs with the changed network buffer parameter

Also update the description of taskmanager.memory.fraction: it is not 
relative to the full size of taskmanager.heap.mb, because the network buffer 
memory is subtracted first.

commit c48beb0d67e8ef847ef845835e342d4a49127e7d
Author: Nico Kruber 
Date:   2017-04-12T16:25:27Z

[FLINK-4545] fix some tests being killed on Travis CI

Due to the increased defaults for network buffer memory use, some builds on
Travis CI fail with unit tests being killed. This affects
* RocksDbBackendEventTimeWindowCheckpointingITCase and
* HBaseConnectorITCase

We fix this by limiting the maximum amount of network buffer memory to 80MB
(current defaults would yield 150MB, previously 64MB were used).





[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-20 Thread zhijiang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932773#comment-15932773
 ] 

zhijiang commented on FLINK-4545:
-

[~StephanEwen], yes, there are two cases. The first is where all task managers 
have the same fixed size, as in mini cluster mode; there, the three parameters 
"fraction, min, max" that Nico mentioned all work for network memory.

The second is where the size of the container has to be computed, as in YARN 
mode. There we do need a configuration parameter for network memory; maybe the 
`taskmanager.network.memory.max` parameter Nico mentioned above can still serve 
that purpose. Is my understanding right?

> Flink automatically manages TM network buffer
> -
>
> Key: FLINK-4545
> URL: https://issues.apache.org/jira/browse/FLINK-4545
> Project: Flink
>  Issue Type: Wish
>  Components: Network
>Reporter: Zhenzhong Xu
>
> Currently, the number of network buffer per task manager is preconfigured and 
> the memory is pre-allocated through taskmanager.network.numberOfBuffers 
> config. In a Job DAG with shuffle phase, this number can go up very high 
> depends on the TM cluster size. The formula for calculating the buffer count 
> is documented here 
> (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers).
>   
> #slots-per-TM^2 * #TMs * 4
> In a standalone deployment, we may need to control the task manager cluster 
> size dynamically and then leverage the up-coming Flink feature to support 
> scaling job parallelism/rescaling at runtime. 
> If the buffer count config is static at runtime and cannot be changed without 
> restarting task manager process, this may add latency and complexity for 
> scaling process. I am wondering if there is already any discussion around 
> whether the network buffer should be automatically managed by Flink or at 
> least expose some API to allow it to be reconfigured. Let me know if there is 
> any existing JIRA that I should follow.
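
For illustration, the rule of thumb quoted above (`#slots-per-TM^2 * #TMs * 4`) can be evaluated as in the following sketch. The class and method names are hypothetical, and the 32 KiB buffer size is an assumption made only to turn the buffer count into a memory figure; the cluster size in `main` is an arbitrary example.

```java
// Hypothetical helper for the legacy sizing rule; names are illustrative only.
final class LegacyBufferCountSketch {
    /** Evaluates slotsPerTm^2 * numTms * 4. */
    static long recommendedBuffers(int slotsPerTm, int numTms) {
        return (long) slotsPerTm * slotsPerTm * numTms * 4L;
    }

    /** Memory needed for the given number of buffers of the given size. */
    static long memoryBytes(long buffers, int bufferSizeBytes) {
        return buffers * bufferSizeBytes;
    }

    public static void main(String[] args) {
        int bufferSizeBytes = 32 * 1024; // assumed buffer size
        long buffers = recommendedBuffers(8, 20); // 20 task managers with 8 slots each
        long bytes = memoryBytes(buffers, bufferSizeBytes);
        System.out.println(buffers + " buffers, " + (bytes >> 20) + " MiB");
    }
}
```

Because the count grows with the number of task managers, a value that was sufficient for one cluster size can be too small after scaling out, which is the concern raised in the issue.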



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-20 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932481#comment-15932481
 ] 

Stephan Ewen commented on FLINK-4545:
-

[~zjwang] The logic outlined above is tailored towards the case where you have 
a container or JVM of a certain size and want to configure how much memory goes 
to what component.

In the case where you actually want to compute the size of the container (as in 
the fine-grained resource configuration code), we probably need a configuration 
parameter for the network memory to add to each container. What do you think?



[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-18 Thread zhijiang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931247#comment-15931247
 ] 

zhijiang commented on FLINK-4545:
-

[~StephanEwen], thank you for such detailed information! 
I agree with the points above and now understand the considerations and future plans.
Replacing the current constant global buffer amount with a flexible range 
between a minimum and a maximum is especially helpful for current batch jobs 
and for better recovery in streaming in the future. I hope to see the changes 
soon and would like to participate in the related parts!



[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-17 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930088#comment-15930088
 ] 

Stephan Ewen commented on FLINK-4545:
-

There are a few reasons why I think the GlobalBufferPool would be helpful:

  - It is hard to know up front exactly how many network buffers will be 
needed. A TaskManager may get different tasks after a recovery (with more 
channels). In batch jobs, tasks are deployed lazily and a TaskManager may get 
more tasks over time.
  - Having more network memory is generally quite beneficial for batch jobs. 
But how much more? Usually as much as the system can spare.
  - Also on the streaming side, we plan to make use of spare buffers to help 
with better recovery in the future. Making the change now ensures that this is 
possible without changing the memory configuration again.
  - Finally, on-demand buffer allocation can help, but it also has a downside: a 
system that reserves/allocates memory up front is more predictable later.



[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-17 Thread zhijiang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929676#comment-15929676
 ] 

zhijiang commented on FLINK-4545:
-

[~NicoK], [~StephanEwen], thank you for your replies!

As far as I know, we have already introduced two other parameters that let 
users control the core buffer amount in {{LocalBufferPool}}: 
{{taskmanager.net.memory.buffers-per-channel}} (default: 2) and 
{{taskmanager.net.memory.extra-buffers-per-gate}} (default: 8). They are easy 
for users to understand.

In my opinion, the {{JobManager}} or {{ResourceManager}} can decide the total 
buffer amount in {{NetworkBufferPool}} based on two conditions:
   - The number of tasks that will be deployed into this {{TaskManager}}.
   - The core number of buffers in each {{LocalBufferPool}}, possibly also 
taking the {{ResultPartitionType}} into account.

So the framework can calculate the total buffer amount and the corresponding 
memory usage before starting the {{TaskManager}}. The buffer amount will be set 
on the {{TaskManager}}, and the memory usage will be added to the total 
resource request. That seems consistent.
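
A minimal sketch of such an up-front estimate is shown below. It is not taken from Flink: the class and method names are hypothetical, and it assumes that every input gate needs `channels * buffers-per-channel + extra-buffers-per-gate` buffers while ignoring the output side and the {{ResultPartitionType}}.

```java
// Hypothetical estimate of the buffers needed by the input gates of the tasks
// planned for one TaskManager; the per-gate formula is an assumption.
final class TaskBufferEstimateSketch {
    /** Buffers for one input gate: per-channel buffers plus per-gate extras. */
    static int buffersPerGate(int channels, int buffersPerChannel, int extraBuffersPerGate) {
        return channels * buffersPerChannel + extraBuffersPerGate;
    }

    /** Sums the per-gate requirement over all gates, given each gate's channel count. */
    static long totalBuffers(int[] channelsPerGate, int buffersPerChannel, int extraBuffersPerGate) {
        long total = 0;
        for (int channels : channelsPerGate) {
            total += buffersPerGate(channels, buffersPerChannel, extraBuffersPerGate);
        }
        return total;
    }

    public static void main(String[] args) {
        // Two gates with 4 and 10 channels, using the defaults named above (2 and 8).
        long buffers = totalBuffers(new int[] {4, 10}, 2, 8);
        System.out.println("estimated buffers: " + buffers);
    }
}
```

Multiplying the result by the buffer size would give the memory to add to the resource request.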

Have you considered not exposing the global network memory parameters to 
users at all, and only allowing the parameters for {{LocalBufferPool}}?
Otherwise users have to consider both global and local parameters, and may 
also need to know the total number of tasks in a {{TaskManager}}, which easily 
leads to conflicts. The fewer parameters users need to know, the better.

Maybe you have other concerns that I have not covered yet. I would appreciate 
your advice, and I am very willing to work on this if needed.



[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-14 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924535#comment-15924535
 ] 

Nico Kruber commented on FLINK-4545:


We did some thinking and would probably add the following three new 
configuration parameters (with the given defaults) to finally replace the 
{{taskmanager.network.numberOfBuffers}} parameter:

- {{taskmanager.network.memory.fraction}} (default: 0.1): fraction of JVM 
memory to use for network buffers (by reducing {{taskmanager.memory.fraction}} 
from 0.7 to 0.6)
- {{taskmanager.network.memory.min}} (default: 64MB): minimum memory size for 
network buffers
- {{taskmanager.network.memory.max}} (default: 1GB): maximum memory size for 
network buffers

A fixed size may be achieved by setting the latter two to the same value. 
{{taskmanager.network.numberOfBuffers}} will be marked deprecated and used only 
if the other three are not given, e.g. due to old config files being used.
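
The fallback described above could look roughly like the following sketch. It is an illustration only, not the planned implementation: the class and method names are hypothetical, it assumes sizes are given as plain byte counts, and it reads "not given" as "none of the three new keys is present".

```java
import java.util.Map;

// Hypothetical resolution of the network memory size from a flat key/value config.
final class NetworkConfigResolutionSketch {
    static final String FRACTION = "taskmanager.network.memory.fraction";
    static final String MIN = "taskmanager.network.memory.min";
    static final String MAX = "taskmanager.network.memory.max";
    static final String LEGACY = "taskmanager.network.numberOfBuffers";

    /** The deprecated key only counts if none of the new keys is present. */
    static boolean usesLegacyParameter(Map<String, String> conf) {
        boolean anyNew = conf.containsKey(FRACTION) || conf.containsKey(MIN) || conf.containsKey(MAX);
        return !anyNew && conf.containsKey(LEGACY);
    }

    static long resolveBytes(Map<String, String> conf, long jvmBytes, int bufferSizeBytes) {
        if (usesLegacyParameter(conf)) {
            return Long.parseLong(conf.get(LEGACY)) * bufferSizeBytes;
        }
        // Defaults as proposed: fraction 0.1, min 64 MB, max 1 GB.
        double fraction = Double.parseDouble(conf.getOrDefault(FRACTION, "0.1"));
        long min = Long.parseLong(conf.getOrDefault(MIN, String.valueOf(64L << 20)));
        long max = Long.parseLong(conf.getOrDefault(MAX, String.valueOf(1024L << 20)));
        if (min > max) {
            throw new IllegalArgumentException("min must not exceed max");
        }
        long byFraction = (long) (jvmBytes * fraction);
        // With min == max the fraction no longer matters: the size is fixed.
        return Math.max(min, Math.min(max, byFraction));
    }
}
```

Setting both bounds to, say, 134217728 pins the network memory to 128 MB regardless of the JVM size, which is the fixed-size case mentioned above.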



[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923811#comment-15923811
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3467
  
@zhijiangW Yes, let's discuss this when the feature is complete.
Our thinking so far is:
  - One can specify an absolute amount of network memory (similar to how one 
can specify an absolute amount of managed memory for batch)
  - If no absolute amount is specified, a relative fraction of the JVM heap 
will be pre-allocated as network buffers.




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923554#comment-15923554
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/3467
  
@NicoK, thank you for the explanation. I have already traced the code in your 
local branch and look forward to your further commits on the global pool.

@StephanEwen, thanks for the further elaboration. From my understanding, each 
task can decide the core number of buffers in its `LocalBufferPool` based on 
its input and output channels and the configuration, and the maximum number of 
buffers based on the `ResultPartitionType`. All the `LocalBufferPool`s together 
determine the total number of buffers in the `NetworkBufferPool`, which may 
also need to take the maximum memory usage into account.

My concern is that the memory usage of the `NetworkBufferPool` should be 
considered before the `TaskManager` starts, and that this memory should be 
added to the total resources of the `TaskManager`. 
I am willing to do that as part of my current work on [Fine-grained 
Resource Configuration](https://issues.apache.org/jira/browse/FLINK-5131) after 
this feature is complete.




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905085#comment-15905085
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3480




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904930#comment-15904930
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3480
  
Did another review - looks good to me!
Merging...




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903159#comment-15903159
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/3480
  
added the requested changes and successfully rebased on the newest master 
due to conflicts




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-09 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902948#comment-15902948
 ] 

Stephan Ewen commented on FLINK-4545:
-

Part one merged in 8b49ee5aa2e17b1787764c3265e1ebda47d89840



[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902943#comment-15902943
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3467




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902916#comment-15902916
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3467#discussion_r105143643
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -265,11 +281,15 @@ public String toString() {
// 

 
private void returnMemorySegment(MemorySegment segment) {
+   assert Thread.holdsLock(availableMemorySegments);
--- End diff --

Using synchronized again would impact performance, whereas assertions only 
cost anything when they are enabled, which is the case in our unit tests (see 
https://maven.apache.org/surefire/maven-surefire-plugin/test-mojo.html#enableAssertions).
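
The pattern under discussion can be sketched in isolation as follows. This is a simplified stand-in rather than the actual `LocalBufferPool` code: the class, its field, and the use of `byte[]` in place of `MemorySegment` are assumptions made for illustration.

```java
import java.util.ArrayDeque;

// Simplified stand-in for a pool whose queue is guarded by its own monitor.
final class GuardedQueueSketch {
    private final ArrayDeque<byte[]> availableSegments = new ArrayDeque<>();

    /** Public entry point: takes the lock once, then delegates. */
    void recycle(byte[] segment) {
        synchronized (availableSegments) {
            returnSegment(segment);
        }
    }

    /** Must only be called while holding the lock on availableSegments. */
    private void returnSegment(byte[] segment) {
        // Checked only when the JVM runs with assertions enabled (-ea), so the
        // locking contract is verified in tests without re-acquiring the monitor.
        assert Thread.holdsLock(availableSegments);
        availableSegments.add(segment);
    }

    int available() {
        synchronized (availableSegments) {
            return availableSegments.size();
        }
    }
}
```

If a future change called `returnSegment` without the lock, a test run with assertions enabled would fail with an `AssertionError` at that call.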




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902771#comment-15902771
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3480
  
Code looks overall very nice!

It would be good to make the "magic constants" configurable. I don't 
expect anyone to tweak them in normal operation (and eventually they will 
disappear once flow control auto-tunes the network), but keeping them 
configurable provides a workaround in case something unexpected happens.

I would follow the same path as the `partitionRequestInitialBackoff` and 
`partitionRequestMaxBackoff` values in the `NetworkEnvironment`.
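
A rough sketch of what "configurable with a default" could look like, using plain `java.util.Properties` rather than Flink's configuration classes. The key names are hypothetical, and the defaults of 2 and 8 are assumed from the constants in this PR's description.

```java
import java.util.Properties;

// Illustration only: key names are hypothetical, not actual Flink options.
final class NetworkBufferSettings {
    static final String BUFFERS_PER_CHANNEL_KEY = "example.network.buffers-per-channel";
    static final String EXTRA_BUFFERS_KEY = "example.network.extra-buffers";
    // Assumed defaults, taken from the constants in the PR description.
    static final int DEFAULT_BUFFERS_PER_CHANNEL = 2;
    static final int DEFAULT_EXTRA_BUFFERS = 8;

    final int buffersPerChannel;
    final int extraBuffers;

    NetworkBufferSettings(Properties config) {
        this.buffersPerChannel =
                readNonNegativeInt(config, BUFFERS_PER_CHANNEL_KEY, DEFAULT_BUFFERS_PER_CHANNEL);
        this.extraBuffers =
                readNonNegativeInt(config, EXTRA_BUFFERS_KEY, DEFAULT_EXTRA_BUFFERS);
    }

    /** Returns the configured value, or the default if the key is absent. */
    private static int readNonNegativeInt(Properties config, String key, int defaultValue) {
        String raw = config.getProperty(key);
        if (raw == null) {
            return defaultValue;
        }
        int value = Integer.parseInt(raw.trim());
        if (value < 0) {
            throw new IllegalArgumentException(key + " must be non-negative, got " + value);
        }
        return value;
    }
}
```

Most users would never set either key; the override exists only as an escape hatch.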


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902429#comment-15902429
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user wenlong88 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3467#discussion_r105081191
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -265,11 +281,15 @@ public String toString() {
// 

 
private void returnMemorySegment(MemorySegment segment) {
+   assert Thread.holdsLock(availableMemorySegments);
--- End diff --

Hi, I have a question about the assert: assertions are disabled in Java by 
default. Why not use an explicit `synchronized(availableMemorySegments)`, 
which may be the more common usage?


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901930#comment-15901930
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3467
  
I think this is a good change, merging this...

@zhijiangW The way buffers are managed will change in follow-up PRs, first 
adjusting the local pools, then the global pool. Managing buffers in a global 
pool can help when caching data, such as for batch jobs. But we can take 
suggestions for follow-up improvements in a separate thread, after this 
improvement is in.


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897795#comment-15897795
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/3480

[FLINK-4545] use size-restricted LocalBufferPool instances for network 
communication

Note: this PR is based on #3467 and is PR 2 of 3 in a series to get rid of the 
network buffer parameter.

With this PR, the number of buffers a `LocalBufferPool` has to offer will 
be limited to `2 *  + 8` for both input and output 
connections. This way, we reduce buffer bloat in our network stack without 
restricting ourselves too much to specific jobs and their connections: the 
total number of network buffers can again be arbitrarily large without 
affecting the delay that checkpoint barriers, for example, experience while 
travelling through all TMs.
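
For illustration, the cap can be sketched as below. The operand after `2 *` is missing in this archive; the sketch assumes it is the number of channels of the connection, which is an assumption and not confirmed by the text above. The class and method names are hypothetical.

```java
// Sketch of the per-pool cap, under the assumption stated above.
final class BoundedPoolSize {
    /** Maximum number of buffers for a pool serving numChannels channels. */
    static int maxBuffers(int numChannels) {
        if (numChannels < 0) {
            throw new IllegalArgumentException("numChannels must be non-negative");
        }
        // Two buffers per channel plus a fixed slack of eight;
        // the exact variants throw instead of silently overflowing.
        return Math.addExact(Math.multiplyExact(2, numChannels), 8);
    }
}
```

Under that assumption the cap grows linearly with the connection's fan-in or fan-out and is independent of the total number of buffers configured for the TaskManager.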

Eventually, this will lead to the network buffer parameter being removed 
(which was the initial goal). Already now, in a simple scenario like the 
following, with a parallelism of 2 and thus running on 6 TMs, we were able to 
reduce the 75th percentile of checkpoint delays by about 60%, from 38ms to 
16ms (median at 7ms for both).

```java
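// Note: the generic type parameters were lost in this archive; Tuple2<Long, Long>
// is assumed here (keyBy(1) needs a tuple type with at least two fields).
final StreamExecutionEnvironment env =
        getStreamExecutionEnvironment(params);
env.disableOperatorChaining();

env.enableCheckpointing(1_000L);

DataStreamSource<Tuple2<Long, Long>> source1 =
        env.addSource(new LongSource());

source1.slotSharingGroup("source")
        .keyBy(1)
        .map(new IdentityMapFunction<Tuple2<Long, Long>>())
        .slotSharingGroup("map")
        .keyBy(1)
        .addSink(new DiscardingSink<Tuple2<Long, Long>>())
        .slotSharingGroup("sink");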
```

By adding random delays (0-1ms every 1000 keys) to the 
`IdentityMapFunction`, the median even improves from 5026ms to 293ms.

Neither scenario influences the throughput of the program, but for real 
programs the reduction in delay may differ, since there actual state may need 
to be stored and other components take part as well ;)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-4545

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3480.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3480


commit dfea1bac97dbbf30a2e049618cc41fdca53ea6d3
Author: Nico Kruber 
Date:   2017-02-10T13:36:37Z

[FLINK-4545] remove (unused) persistent partition type

commit 11557c004450bcbbe680f1575f0e41d164424eae
Author: Nico Kruber 
Date:   2017-02-10T15:11:08Z

[docs] improve some documentation around network buffers

commit cd999061d04ae803c79473241ac1f9b39c1f2731
Author: Nico Kruber 
Date:   2017-02-10T15:12:19Z

[hotfix][network] add some assertions documenting on which locks we rely

commit 8f529bb3f42916c816c5091228569952917ad9b5
Author: Nico Kruber 
Date:   2017-03-01T13:33:44Z

[FLINK-4545] remove fixed-size BufferPool instances

These were unused except for unit tests and will be replaced with bounded
BufferPool instances.

commit 91cea2917e9453f9de5c02472d99d4fc0d090dda
Author: Nico Kruber 
Date:   2017-03-06T11:36:02Z

[FLINK-4545] remove JobVertex#connectNewDataSetAsInput variant without 
partition type

This removes
JobVertex#connectNewDataSetAsInput(JobVertex input, DistributionPattern 
distPattern)
and requires the developer to call
JobVertex#connectNewDataSetAsInput(JobVertex input, DistributionPattern 
distPattern, ResultPartitionType partitionType)
instead and think about the partition type to add.

commit 83d1404b106b558679e4c9ef16123fbc6b5eac72
Author: Nico Kruber 
Date:   2017-03-06T11:37:56Z

[FLINK-4545] remove unused IntermediateDataSet constructors

These were implying a default result partition type which we want the 
developer
to actively decide upon.

commit e9d41b6b613a7bac5c489102977e16e4c6c4bb86
Author: Nico Kruber 
Date:   2017-02-10T13:53:09Z

[FLINK-4545] add a bounded result partition type

This can be used to limit the number of network buffers used for this 
partition.

(borrows the appropriate parts of a commit previously sketched for
FLINK-5088 to implement bounded network queue lengths)

commit b57f0652a768645a5712d376d0e4b438f35cfa6c
Author: Nico Kruber 
Date:   2017-02-10T17:22:55Z

[FLINK-4545] allow LocalBufferPool to use a limited number of buffers

commit 

[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897050#comment-15897050
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/3467
  
Hi @zhijiangW,
actually, the solution I am working on is to replace the network buffers 
parameter with something like "max memory in percent" and "min MB to use". To 
keep this from creating buffer bloat in our network stack, I have started to 
implement limited `LocalBufferPool` instances which tune their size based on 
the actual number of outgoing and incoming channels. It is not much more 
complicated than this, and I have already started on it in my local branch at 
https://github.com/NicoK/flink/tree/flink-4545 - expect a new PR within the 
week with more details.
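
A rough sketch of how a "percent plus minimum" setting could translate into a buffer count. All names are hypothetical, and the clamping to the total memory is an assumption of this sketch, not something the comment specifies.

```java
// Sketch only: derives network memory from a fraction and a lower bound.
final class NetworkMemorySketch {
    /** Network memory in bytes: a fraction of the total, but at least minBytes. */
    static long networkMemoryBytes(long totalMemoryBytes, double fraction, long minBytes) {
        if (totalMemoryBytes <= 0 || minBytes < 0 || fraction < 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("invalid memory settings");
        }
        long byFraction = (long) (totalMemoryBytes * fraction);
        // Never hand out more than the process has (assumption of this sketch).
        return Math.min(totalMemoryBytes, Math.max(byFraction, minBytes));
    }

    /** Number of whole buffers of the given size that fit into the network memory. */
    static long numBuffers(long networkMemoryBytes, int bufferSizeBytes) {
        if (bufferSizeBytes <= 0) {
            throw new IllegalArgumentException("bufferSizeBytes must be positive");
        }
        return networkMemoryBytes / bufferSizeBytes;
    }
}
```

For example, with 1 GiB of total memory, a fraction of 0.25 and 32 KiB buffers (an assumed buffer size), this yields 256 MiB of network memory, i.e. 8192 buffers; on a 128 MiB process with a 64 MiB minimum, the minimum wins.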


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896993#comment-15896993
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/3467
  
Hi @NicoK, I am interested in this issue and I like the way this PR asserts 
that the lock is held.

It is really necessary for the framework to manage network buffers, because 
it is difficult for users to set the exact number of buffers. Our current 
simple solution is to extend the `ResourceProfile` with the total number 
of input and output edges of an `Execution`. The `ResourceManager` would then 
calculate the number of buffers from that and overwrite the parameter value in 
the `TaskManager` configuration.

From what @StephanEwen mentioned before, I know a little about this issue. 
If you have detailed designs or plans for it, would you share them so that I 
can learn from them and track the progress? Thank you!


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15894230#comment-15894230
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/3467

[FLINK-4545] preparations for removing the network buffers parameter

This PR includes some preparations for following PRs that ultimately lead 
to removing the network buffer parameter that was hard to tune.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-4545-prep

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3467.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3467


commit dfea1bac97dbbf30a2e049618cc41fdca53ea6d3
Author: Nico Kruber 
Date:   2017-02-10T13:36:37Z

[FLINK-4545] remove (unused) persistent partition type

commit 11557c004450bcbbe680f1575f0e41d164424eae
Author: Nico Kruber 
Date:   2017-02-10T15:11:08Z

[docs] improve some documentation around network buffers

commit cd999061d04ae803c79473241ac1f9b39c1f2731
Author: Nico Kruber 
Date:   2017-02-10T15:12:19Z

[hotfix][network] add some assertions documenting on which locks we rely

commit 8f529bb3f42916c816c5091228569952917ad9b5
Author: Nico Kruber 
Date:   2017-03-01T13:33:44Z

[FLINK-4545] remove fixed-size BufferPool instances

These were unused except for unit tests and will be replaced with bounded
BufferPool instances.




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2016-11-03 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15632781#comment-15632781
 ] 

Greg Hogan commented on FLINK-4545:
---

In 1.2 we now expose metrics for the number of allocated and in use network 
buffers. This is now in the web UI but wasn't properly populating the values.

Is there an exact "best" number of network buffers to allocate? Flink may run 
with a minimal allocation but this may cause additional backpressure while 
waiting for buffers to be processed and become free. We recommend something 
similar by advising users to maximize the TaskManager memory allocation, which 
may be better for streaming than batch since after spilling to disk the 
merge-sort makes use of the filesystem's read-ahead cache.

+1 to dynamic scaling, and also if/when memory buffers are dynamically 
allocated between operators.


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2016-11-03 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15632652#comment-15632652
 ] 

Stephan Ewen commented on FLINK-4545:
-

For a YARN-job-at-a-time setup, one could pre-compute that.
For more dynamic setups it's a little more tricky, if you do not know the job or 
the number of TaskManagers up front.

Also - the formula needs to take a {{buffers *= numShuffles}} factor into account.
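
As a worked sketch of that pre-computation, combining the documented rule of thumb (`#slots-per-TM^2 * #TMs * 4`) with the extra factor: the names are hypothetical, and treating `numShuffles` as a plain multiplier is one reading of the remark above.

```java
// Sketch: rule-of-thumb buffer count, scaled by the number of shuffles.
final class BufferCountEstimate {
    static long estimate(int slotsPerTm, int numTms, int numShuffles) {
        if (slotsPerTm <= 0 || numTms <= 0 || numShuffles <= 0) {
            throw new IllegalArgumentException("all arguments must be positive");
        }
        // #slots-per-TM^2 * #TMs * 4, with overflow checks.
        long slotsSquared = (long) slotsPerTm * slotsPerTm;
        long perShuffle =
                Math.multiplyExact(Math.multiplyExact(slotsSquared, (long) numTms), 4L);
        // buffers *= numShuffles
        return Math.multiplyExact(perShuffle, (long) numShuffles);
    }
}
```

With 4 slots per TM and 10 TMs, a single shuffle needs 4^2 * 10 * 4 = 640 buffers, and a job with two shuffles needs 1280.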


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2016-11-03 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15632608#comment-15632608
 ] 

Jamie Grier commented on FLINK-4545:


Big +1!

In general I would love to see this improved.  In my experience this is the 
"one thing" that people run into with Flink: everything else "just works", 
but this one parameter they have to set/tune, and it's very confusing to 
newcomers.

The equation to get this right is complex and the "correct" setting changes 
based on how they deploy the job, what parallelism they use, how many TMs, 
etc.

It also often happens that things are working and then a user changes their job 
a bit (adding a keyBy for instance) and then it stops working and they have a 
hard time understanding why.

Is there a way we can set this parameter automatically in a majority of use 
cases?  If folks are running single jobs directly on YARN for instance it seems 
we should have all the information necessary to set this parameter 
auto-magically or at least fail-fast and tell the user what the parameter 
should be set to.
