[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-05-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3721


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-19 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112248789
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -398,3 +428,106 @@ readSlaves() {
 useOffHeapMemory() {
 [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" 
]]
 }
+
+HAVE_AWK=
+# same as 
org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long
 totalJavaMemorySize, Configuration config)
+calculateNetworkBuf() {
+local network_buffers_bytes
+if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+echo "Variable 'FLINK_TM_HEAP' not set (usually read from 
'${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
+exit 1
+fi
+
+if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
+# fix memory size for network buffers
+network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
+else
+if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; 
then
+echo "[ERROR] Configured TaskManager network buffer memory 
min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
+echo "Min must be less than or equal to max."
+echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and 
'${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
+exit 1
+fi
+
+# Bash only performs integer arithmetic so floating point 
computation is performed using awk
+if [[ -z "${HAVE_AWK}" ]] ; then
+command -v awk >/dev/null 2>&1
+if [[ $? -ne 0 ]]; then
+echo "[ERROR] Program 'awk' not found."
+echo "Please install 'awk' or define 
'${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of 
'${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+exit 1
+fi
+HAVE_AWK=true
+fi
+
+# We calculate the memory using a fraction of the total memory
+if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< 
"${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
+echo "[ERROR] Configured TaskManager network buffer memory 
fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
+echo "It must be between 0.0 and 1.0."
+echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in 
${FLINK_CONF_FILE}."
+exit 1
+fi
+
+network_buffers_bytes=`awk "BEGIN { x = 
lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > 
${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} 
? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
+fi
+
+# recalculate the JVM heap memory by taking the network buffers into 
account
--- End diff --

No, actually: the user may set the `FLINK_TM_HEAP` environment variable or 
configure the "flink heap size" via `taskmanager.heap.mb`, but this is not the 
real heap size. It is the overall memory size used by Flink (including 
off-heap). So this function removes the off-heap part and returns the real heap 
size to use with `-Xmx` and `-Xms`.
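
A minimal sketch of that derivation, assuming the network buffers are the only off-heap part that gets subtracted (the actual script may remove more, for example managed memory). The function names `network_buffer_bytes` and `heap_mb_after_network` are hypothetical and not taken from config.sh; the sample values mirror the ones used in the quoted tests (1000 MB total, fraction 0.1, min 64 MB, max 1 GB).

```shell
#!/usr/bin/env bash
# Hypothetical helpers for illustration only; not the functions from config.sh.

# Clamp (total_mb * 2^20 * fraction) into [min_bytes, max_bytes] and print bytes.
# Plain multiplication by 1048576 stands in for a 20-bit left shift.
network_buffer_bytes() {
    local total_mb=$1 fraction=$2 min_bytes=$3 max_bytes=$4
    awk -v t="$total_mb" -v f="$fraction" -v lo="$min_bytes" -v hi="$max_bytes" \
        'BEGIN { x = t * 1048576 * f; if (x > hi) x = hi; if (x < lo) x = lo; printf "%.0f\n", x }'
}

# Heap size in MB that remains for -Xmx/-Xms once the network buffers are removed.
heap_mb_after_network() {
    local total_mb=$1 net_bytes=$2
    echo $(( total_mb - net_bytes / 1048576 ))
}

net=$(network_buffer_bytes 1000 0.1 67108864 1073741824)
heap=$(heap_mb_after_network 1000 "$net")
echo "network buffers: ${net} bytes, JVM heap: ${heap} MB"
```

With these inputs the configured 1000 MB is treated as the total, and the value passed to the JVM is whatever is left after the clamped network buffer share is taken out.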




[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112247760
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 ---
@@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics 
cachedStats) throws IOExc
}
catch (Throwable t) {
if (LOG.isErrorEnabled()) {
-   LOG.error("Unexpected problen while getting the 
file statistics for file '" + this.filePath + "': "
+   LOG.error("Unexpected problem while getting the 
file statistics for file '" + this.filePath + "': "
--- End diff --

You can leave it as it is.




[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-19 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112246946
  
--- Diff: 
flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
 ---
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit test that verifies that the task manager heap size calculation 
used by the bash script
+ * taskmanager.sh returns the same values as the heap size 
calculation of
+ * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
+ *
+ * NOTE: the shell script uses awk to perform floating-point 
arithmetic which uses
+ * double precision but our Java code restrains to float 
because we actually do
+ * not need high precision.
+ */
+public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger 
{
+
+   /** Key that is used by config.sh. */
+   private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
+
+   /**
+* Number of tests with random values.
+*
+* NOTE: calling the external test script is slow and thus low numbers 
are preferred for general
+* testing.
+*/
+   private static final int NUM_RANDOM_TESTS = 20;
+
+   @Before
+   public void checkOperatingSystem() {
+   Assume.assumeTrue("This test checks shell scripts not available 
on Windows.",
+   !OperatingSystem.isWindows());
+   }
+
+   /**
+* Tests that {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} has the same
+* result as the shell script.
+*/
+   @Test
+   public void compareNetworkBufShellScriptWithJava() throws Exception {
+   int managedMemSize = 
TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
+   float managedMemFrac = 
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
+
+   // manual tests from 
org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
+
+   compareNetworkBufJavaVsScript(
+   getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, 
managedMemSize, managedMemFrac), 0.0f);
+
+   compareNetworkBufJavaVsScript(
+   getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 
/*MB*/, managedMemFrac), 0.0f);
+
+   compareNetworkBufJavaVsScript(
+   getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 
managedMemSize, 0.1f), 0.0f);
+
+   // some automated tests with random (but valid) values
+
+   Random ran = new Random();
+   for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
+   // tolerate that values differ by 1% (due to different 
floating point precisions)
+   compareNetworkBufJavaVsScript(getRandomConfig(ran), 
0.01f);
--- End diff --

Oh, here I actually already print the configuration in the error message.



[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-19 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112246505
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 ---
@@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics 
cachedStats) throws IOExc
}
catch (Throwable t) {
if (LOG.isErrorEnabled()) {
-   LOG.error("Unexpected problen while getting the 
file statistics for file '" + this.filePath + "': "
+   LOG.error("Unexpected problem while getting the 
file statistics for file '" + this.filePath + "': "
--- End diff --

Sorry, should I create a separate PR? (A separate JIRA is definitely 
overkill for this.)




[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-19 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112246105
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import 
org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link TaskManagerServices}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentInformation.class)
+public class TaskManagerServicesTest {
+
+   /**
+* Test for {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} using old
+* configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+*/
+   @SuppressWarnings("deprecation")
+   @Test
+   public void calculateNetworkBufOld() throws Exception {
+   Configuration config = new Configuration();
+   config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+   // note: actual network buffer memory size is independent of 
the totalJavaMemorySize
+   
assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+   TaskManagerServices.calculateNetworkBuf(10L << 20, 
config));
+   
assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+   TaskManagerServices.calculateNetworkBuf(64L << 20, 
config));
+
+   // test integer overflow in the memory size
+   int numBuffers = (int) ((2L << 32) / 
TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
+   config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 
numBuffers);
+   assertEquals(2L << 32, 
TaskManagerServices.calculateNetworkBuf(2L << 33, config));
+   }
+
+   /**
+* Test for {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} using new
+* configurations via {@link 
TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+*/
+   @Test
+   public void calculateNetworkBufNew() throws Exception {
+   Configuration config = new Configuration();
+
+   // (1) defaults
+   final Float defaultFrac = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+   final Long defaultMin = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+   final Long defaultMax = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+   assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
+   TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
+   assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
+   TaskManagerServices.calculateNetworkBuf((10L << 30), config));
+
+   calculateNetworkBufNew(config);
+   }
+
   

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112242996
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -398,3 +428,106 @@ readSlaves() {
 useOffHeapMemory() {
 [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" 
]]
 }
+
+HAVE_AWK=
+# same as 
org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long
 totalJavaMemorySize, Configuration config)
+calculateNetworkBuf() {
+local network_buffers_bytes
+if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+echo "Variable 'FLINK_TM_HEAP' not set (usually read from 
'${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
+exit 1
+fi
+
+if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
+# fix memory size for network buffers
+network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
+else
+if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; 
then
+echo "[ERROR] Configured TaskManager network buffer memory 
min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
+echo "Min must be less than or equal to max."
+echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and 
'${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
+exit 1
+fi
+
+# Bash only performs integer arithmetic so floating point 
computation is performed using awk
+if [[ -z "${HAVE_AWK}" ]] ; then
+command -v awk >/dev/null 2>&1
+if [[ $? -ne 0 ]]; then
+echo "[ERROR] Program 'awk' not found."
+echo "Please install 'awk' or define 
'${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of 
'${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+exit 1
+fi
+HAVE_AWK=true
+fi
+
+# We calculate the memory using a fraction of the total memory
+if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< 
"${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
+echo "[ERROR] Configured TaskManager network buffer memory 
fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
+echo "It must be between 0.0 and 1.0."
+echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in 
${FLINK_CONF_FILE}."
+exit 1
+fi
+
+network_buffers_bytes=`awk "BEGIN { x = 
lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > 
${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} 
? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
+fi
+
+# recalculate the JVM heap memory by taking the network buffers into 
account
--- End diff --

To me, "recalculate" implied that it would change some configuration value, 
but that's not happening. It's only verifying that the memory for network 
buffers is less than the heap memory.
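
The verification described in this comment can be sketched as follows. This is only an illustration under the assumption that the total memory is given in MB and the network buffer size in bytes; `network_buffers_fit` is a hypothetical name and not part of config.sh.

```shell
#!/usr/bin/env bash
# Hypothetical check for illustration only; not taken from config.sh.

# Succeeds only if the network buffers (bytes) are strictly smaller than
# the configured total memory (MB).
network_buffers_fit() {
    local total_mb=$1 net_bytes=$2
    local total_bytes=$(( total_mb * 1048576 ))
    [[ "${net_bytes}" -lt "${total_bytes}" ]]
}

if network_buffers_fit 1000 104857600; then
    echo "ok: network buffers fit into the configured memory"
else
    echo "[ERROR] network buffer memory exceeds the configured memory" >&2
fi
```

A check like this only validates the configuration; it does not by itself change any value.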




[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-19 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112242842
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import 
org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link TaskManagerServices}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentInformation.class)
+public class TaskManagerServicesTest {
+
+   /**
+* Test for {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} using old
+* configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+*/
+   @SuppressWarnings("deprecation")
+   @Test
+   public void calculateNetworkBufOld() throws Exception {
+   Configuration config = new Configuration();
+   config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+   // note: actual network buffer memory size is independent of 
the totalJavaMemorySize
+   
assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+   TaskManagerServices.calculateNetworkBuf(10L << 20, 
config));
+   
assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+   TaskManagerServices.calculateNetworkBuf(64L << 20, 
config));
+
+   // test integer overflow in the memory size
+   int numBuffers = (int) ((2L << 32) / 
TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
+   config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 
numBuffers);
+   assertEquals(2L << 32, 
TaskManagerServices.calculateNetworkBuf(2L << 33, config));
+   }
+
+   /**
+* Test for {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} using new
+* configurations via {@link 
TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+*/
+   @Test
+   public void calculateNetworkBufNew() throws Exception {
+   Configuration config = new Configuration();
+
+   // (1) defaults
+   final Float defaultFrac = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+   final Long defaultMin = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+   final Long defaultMax = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+   assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
+   TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
+   assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
+   TaskManagerServices.calculateNetworkBuf((10L << 30), config));
+
+   calculateNetworkBufNew(config);
+   }
+
   

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-19 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112239687
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import 
org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link TaskManagerServices}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentInformation.class)
+public class TaskManagerServicesTest {
+
+   /**
+* Test for {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} using old
+* configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+*/
+   @SuppressWarnings("deprecation")
+   @Test
+   public void calculateNetworkBufOld() throws Exception {
+   Configuration config = new Configuration();
+   config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+   // note: actual network buffer memory size is independent of 
the totalJavaMemorySize
+   
assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+   TaskManagerServices.calculateNetworkBuf(10L << 20, 
config));
+   
assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+   TaskManagerServices.calculateNetworkBuf(64L << 20, 
config));
+
+   // test integer overflow in the memory size
+   int numBuffers = (int) ((2L << 32) / 
TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
+   config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 
numBuffers);
+   assertEquals(2L << 32, 
TaskManagerServices.calculateNetworkBuf(2L << 33, config));
+   }
+
+   /**
+* Test for {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} using new
+* configurations via {@link 
TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+*/
+   @Test
+   public void calculateNetworkBufNew() throws Exception {
+   Configuration config = new Configuration();
+
+   // (1) defaults
+   final Float defaultFrac = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+   final Long defaultMin = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+   final Long defaultMax = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+   assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
+   TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
+   assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
+   TaskManagerServices.calculateNetworkBuf((10L << 30), config));
+
+   calculateNetworkBufNew(config);
+   }
+
   

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-19 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112236022
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -398,3 +428,106 @@ readSlaves() {
 useOffHeapMemory() {
 [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" 
]]
 }
+
+HAVE_AWK=
+# same as 
org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long
 totalJavaMemorySize, Configuration config)
+calculateNetworkBuf() {
+local network_buffers_bytes
+if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+echo "Variable 'FLINK_TM_HEAP' not set (usually read from 
'${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
+exit 1
+fi
+
+if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
+# fix memory size for network buffers
+network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
+else
+if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; 
then
+echo "[ERROR] Configured TaskManager network buffer memory 
min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
+echo "Min must be less than or equal to max."
+echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and 
'${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
+exit 1
+fi
+
+# Bash only performs integer arithmetic so floating point 
computation is performed using awk
+if [[ -z "${HAVE_AWK}" ]] ; then
+command -v awk >/dev/null 2>&1
+if [[ $? -ne 0 ]]; then
+echo "[ERROR] Program 'awk' not found."
+echo "Please install 'awk' or define 
'${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of 
'${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+exit 1
+fi
+HAVE_AWK=true
+fi
+
+# We calculate the memory using a fraction of the total memory
+if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< 
"${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
+echo "[ERROR] Configured TaskManager network buffer memory 
fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
+echo "It must be between 0.0 and 1.0."
+echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in 
${FLINK_CONF_FILE}."
+exit 1
+fi
+
+network_buffers_bytes=`awk "BEGIN { x = 
lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > 
${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} 
? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
+fi
+
+# recalculate the JVM heap memory by taking the network buffers into 
account
--- End diff --

What do you mean?




[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112003041
  
--- Diff: 
flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
 ---
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit test that verifies that the task manager heap size calculation 
used by the bash script
+ * taskmanager.sh returns the same values as the heap size 
calculation of
+ * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
+ *
+ * NOTE: the shell script uses awk to perform floating-point 
arithmetic which uses
+ * double precision but our Java code restrains to float 
because we actually do
+ * not need high precision.
+ */
+public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger 
{
+
+   /** Key that is used by config.sh. */
+   private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
+
+   /**
+* Number of tests with random values.
+*
+* NOTE: calling the external test script is slow and thus low numbers 
are preferred for general
+* testing.
+*/
+   private static final int NUM_RANDOM_TESTS = 20;
+
+   @Before
+   public void checkOperatingSystem() {
+   Assume.assumeTrue("This test checks shell scripts not available 
on Windows.",
+   !OperatingSystem.isWindows());
+   }
+
+   /**
+* Tests that {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} has the same
+* result as the shell script.
+*/
+   @Test
+   public void compareNetworkBufShellScriptWithJava() throws Exception {
+   int managedMemSize = 
TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
+   float managedMemFrac = 
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
+
+   // manual tests from 
org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
+
+   compareNetworkBufJavaVsScript(
+   getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, 
managedMemSize, managedMemFrac), 0.0f);
+
+   compareNetworkBufJavaVsScript(
+   getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 
/*MB*/, managedMemFrac), 0.0f);
+
+   compareNetworkBufJavaVsScript(
+   getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 
managedMemSize, 0.1f), 0.0f);
+
+   // some automated tests with random (but valid) values
+
+   Random ran = new Random();
+   for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
+   // tolerate that values differ by 1% (due to different 
floating point precisions)
+   compareNetworkBufJavaVsScript(getRandomConfig(ran), 
0.01f);
+   }
+   }
+
+   /**
+* Tests that {@link TaskManagerServices#calculateHeapSizeMB(long, 
Configuration)} has the same
+* result as the shell script.
+*/
+   @Test
+   public void compareHeapSizeShellScriptWithJava() throws Exception {
+   int 

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112001266
  
--- Diff: 
flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
 ---
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit test that verifies that the task manager heap size calculation 
used by the bash script
+ * taskmanager.sh returns the same values as the heap size 
calculation of
+ * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
+ *
+ * NOTE: the shell script uses awk to perform floating-point 
arithmetic which uses
+ * double precision but our Java code restrains to float 
because we actually do
+ * not need high precision.
+ */
+public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger 
{
+
+   /** Key that is used by config.sh. */
+   private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
+
+   /**
+* Number of tests with random values.
+*
+* NOTE: calling the external test script is slow and thus low numbers 
are preferred for general
+* testing.
+*/
+   private static final int NUM_RANDOM_TESTS = 20;
+
+   @Before
+   public void checkOperatingSystem() {
+   Assume.assumeTrue("This test checks shell scripts not available 
on Windows.",
--- End diff --

missing "that are"?




[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112000380
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 ---
@@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics 
cachedStats) throws IOExc
}
catch (Throwable t) {
if (LOG.isErrorEnabled()) {
-   LOG.error("Unexpected problen while getting the 
file statistics for file '" + this.filePath + "': "
+   LOG.error("Unexpected problem while getting the 
file statistics for file '" + this.filePath + "': "
--- End diff --

Let's see whether I can remember not to squash this commit :)




[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r111994207
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -376,6 +392,169 @@ private static NetworkEnvironment 
createNetworkEnvironment(
}
 
/**
+* Calculates the amount of memory used for network buffers based on 
the total memory to use and
+* the according configuration parameters.
+*
+* The following configuration parameters are involved:
+* 
+*  {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+*  {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},
+*  {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, 
and
+*  {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the 
ones above do not exist)
+* .
+*
+* @param totalJavaMemorySize
+*  overall available memory to use (heap and off-heap, in 
bytes)
+* @param config
+*  configuration object
+*
+* @return memory to use for network buffers (in bytes)
+*/
+   public static long calculateNetworkBuf(long totalJavaMemorySize, 
Configuration config) {
--- End diff --

how about a slightly longer method name?




[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112000165
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import 
org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link TaskManagerServices}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentInformation.class)
+public class TaskManagerServicesTest {
+
+   /**
+* Test for {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} using old
+* configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+*/
+   @SuppressWarnings("deprecation")
+   @Test
+   public void calculateNetworkBufOld() throws Exception {
+   Configuration config = new Configuration();
+   config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+   // note: actual network buffer memory size is independent of 
the totalJavaMemorySize
+   
assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+   TaskManagerServices.calculateNetworkBuf(10L << 20, 
config));
+   
assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+   TaskManagerServices.calculateNetworkBuf(64L << 20, 
config));
+
+   // test integer overflow in the memory size
+   int numBuffers = (int) ((2L << 32) / 
TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
+   config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 
numBuffers);
+   assertEquals(2L << 32, 
TaskManagerServices.calculateNetworkBuf(2L << 33, config));
+   }
+
+   /**
+* Test for {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} using new
+* configurations via {@link 
TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+*/
+   @Test
+   public void calculateNetworkBufNew() throws Exception {
+   Configuration config = new Configuration();
+
+   // (1) defaults
+   final Float defaultFrac = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+   final Long defaultMin = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+   final Long defaultMax = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+   assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) 
(defaultFrac * (10L << 20,
+   TaskManagerServices.calculateNetworkBuf((64L << 20 + 
1), config));
+   assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) 
(defaultFrac * (10L << 30,
+   TaskManagerServices.calculateNetworkBuf((10L << 30), 
config));
+
+   calculateNetworkBufNew(config);
+   }
+
  

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112002748
  
--- Diff: 
flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
 ---
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit test that verifies that the task manager heap size calculation 
used by the bash script
+ * taskmanager.sh returns the same values as the heap size 
calculation of
+ * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
+ *
+ * NOTE: the shell script uses awk to perform floating-point 
arithmetic which uses
+ * double precision but our Java code restrains to float 
because we actually do
+ * not need high precision.
+ */
+public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger 
{
+
+   /** Key that is used by config.sh. */
+   private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
+
+   /**
+* Number of tests with random values.
+*
+* NOTE: calling the external test script is slow and thus low numbers 
are preferred for general
+* testing.
+*/
+   private static final int NUM_RANDOM_TESTS = 20;
+
+   @Before
+   public void checkOperatingSystem() {
+   Assume.assumeTrue("This test checks shell scripts not available 
on Windows.",
+   !OperatingSystem.isWindows());
+   }
+
+   /**
+* Tests that {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} has the same
+* result as the shell script.
+*/
+   @Test
+   public void compareNetworkBufShellScriptWithJava() throws Exception {
+   int managedMemSize = 
TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
+   float managedMemFrac = 
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
+
+   // manual tests from 
org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
+
+   compareNetworkBufJavaVsScript(
+   getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, 
managedMemSize, managedMemFrac), 0.0f);
+
+   compareNetworkBufJavaVsScript(
+   getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 
/*MB*/, managedMemFrac), 0.0f);
+
+   compareNetworkBufJavaVsScript(
+   getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 
managedMemSize, 0.1f), 0.0f);
+
+   // some automated tests with random (but valid) values
+
+   Random ran = new Random();
+   for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
+   // tolerate that values differ by 1% (due to different 
floating point precisions)
+   compareNetworkBufJavaVsScript(getRandomConfig(ran), 
0.01f);
--- End diff --

As with the other randomized tests, we should print the configuration used 
for the test in case of failure.
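
One way to do this, as a minimal sketch: build a description of the random configuration (including the seed) and put it into the failure message. The class, the helper names `describeConfig` and `assertWithinTolerance`, and the generated values are all hypothetical and not taken from the PR; the loop only stands in for the Java-vs-script comparison.

```java
import java.util.Random;

/** Illustrative sketch only; names and values are assumptions, not code from the PR. */
final class RandomizedFailureMessageSketch {

    /** Hypothetical stand-in for describing a randomly generated test configuration. */
    static String describeConfig(long seed, int heapMb, double fraction) {
        return "seed=" + seed + ", heapMb=" + heapMb + ", fraction=" + fraction;
    }

    /** Fails with a message that names the configuration if the values differ too much. */
    static void assertWithinTolerance(
            long expected, long actual, double tolerance, String configDescription) {
        double allowedDifference = Math.abs(expected) * tolerance;
        if (Math.abs(expected - actual) > allowedDifference) {
            throw new AssertionError("expected " + expected + " but got " + actual
                + " (tolerance " + tolerance + ") for configuration: " + configDescription);
        }
    }

    public static void main(String[] args) {
        // Remember the seed so that a failing run can be reproduced.
        long seed = System.nanoTime();
        Random ran = new Random(seed);
        for (int i = 0; i < 20; ++i) {
            int heapMb = 1 + ran.nextInt(10_000);
            double fraction = 0.01 + 0.98 * ran.nextDouble();
            long expected = (long) (fraction * ((long) heapMb << 20));
            // Placeholder: a real test would take this value from the second implementation.
            long actual = expected;
            assertWithinTolerance(expected, actual, 0.01,
                describeConfig(seed, heapMb, fraction));
        }
        System.out.println("all random comparisons passed (seed " + seed + ")");
    }
}
```

With the seed in the message, a failure seen only on CI can be replayed locally by constructing `Random` with that seed.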



[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r111997260
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import 
org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link TaskManagerServices}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentInformation.class)
+public class TaskManagerServicesTest {
+
+   /**
+* Test for {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} using old
+* configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+*/
+   @SuppressWarnings("deprecation")
+   @Test
+   public void calculateNetworkBufOld() throws Exception {
+   Configuration config = new Configuration();
+   config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+   // note: actual network buffer memory size is independent of 
the totalJavaMemorySize
+   
assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+   TaskManagerServices.calculateNetworkBuf(10L << 20, 
config));
+   
assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+   TaskManagerServices.calculateNetworkBuf(64L << 20, 
config));
+
+   // test integer overflow in the memory size
+   int numBuffers = (int) ((2L << 32) / 
TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
+   config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 
numBuffers);
+   assertEquals(2L << 32, 
TaskManagerServices.calculateNetworkBuf(2L << 33, config));
+   }
+
+   /**
+* Test for {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} using new
+* configurations via {@link 
TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+*/
+   @Test
+   public void calculateNetworkBufNew() throws Exception {
+   Configuration config = new Configuration();
+
+   // (1) defaults
+   final Float defaultFrac = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+   final Long defaultMin = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+   final Long defaultMax = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+   assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) 
(defaultFrac * (10L << 20,
+   TaskManagerServices.calculateNetworkBuf((64L << 20 + 
1), config));
+   assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) 
(defaultFrac * (10L << 30,
+   TaskManagerServices.calculateNetworkBuf((10L << 30), 
config));
+
+   calculateNetworkBufNew(config);
+   }
+
  

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r111992933
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import 
org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link TaskManagerServices}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentInformation.class)
+public class TaskManagerServicesTest {
+
+   /**
+* Test for {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} using old
+* configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+*/
+   @SuppressWarnings("deprecation")
+   @Test
+   public void calculateNetworkBufOld() throws Exception {
+   Configuration config = new Configuration();
+   config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+   // note: actual network buffer memory size is independent of 
the totalJavaMemorySize
+   
assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+   TaskManagerServices.calculateNetworkBuf(10L << 20, 
config));
+   
assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+   TaskManagerServices.calculateNetworkBuf(64L << 20, 
config));
+
+   // test integer overflow in the memory size
+   int numBuffers = (int) ((2L << 32) / 
TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
+   config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 
numBuffers);
+   assertEquals(2L << 32, 
TaskManagerServices.calculateNetworkBuf(2L << 33, config));
+   }
+
+   /**
+* Test for {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} using new
+* configurations via {@link 
TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+*/
+   @Test
+   public void calculateNetworkBufNew() throws Exception {
+   Configuration config = new Configuration();
+
+   // (1) defaults
+   final Float defaultFrac = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+   final Long defaultMin = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+   final Long defaultMax = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+   assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) 
(defaultFrac * (10L << 20,
+   TaskManagerServices.calculateNetworkBuf((64L << 20 + 
1), config));
+   assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) 
(defaultFrac * (10L << 30,
+   TaskManagerServices.calculateNetworkBuf((10L << 30), 
config));
+
+   calculateNetworkBufNew(config);
+   }
+
  

[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r111983754
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -410,24 +410,47 @@ private static NetworkEnvironment 
createNetworkEnvironment(
 *
 * @return memory to use for network buffers (in bytes)
 */
+   @SuppressWarnings("deprecation")
public static long calculateNetworkBuf(long totalJavaMemorySize, 
Configuration config) {
+   assert totalJavaMemorySize > 0;
--- End diff --

use Preconditions instead?
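
For context: a Java `assert` is only evaluated when the JVM runs with assertions enabled (`-ea`), so an explicit argument check also fires in production. A minimal sketch of what that could look like follows; `checkArgument` here is a local stand-in written for this example so that it compiles without Flink on the classpath, and the message text is an assumption.

```java
/** Illustrative sketch only; not the code from the PR. */
final class PreconditionSketch {

    /** Local stand-in for a Preconditions-style helper (hypothetical, defined here). */
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    /** Validates the argument unconditionally, unlike a plain assert statement. */
    static long requirePositiveMemory(long totalJavaMemorySize) {
        checkArgument(totalJavaMemorySize > 0,
            "totalJavaMemorySize must be positive, but was " + totalJavaMemorySize);
        return totalJavaMemorySize;
    }

    public static void main(String[] args) {
        System.out.println(requirePositiveMemory(64L << 20));
        try {
            requirePositiveMemory(0L);
        } catch (IllegalArgumentException e) {
            // The check rejects the value even when the JVM runs without -ea.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```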




[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r111975142
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -398,3 +428,106 @@ readSlaves() {
 useOffHeapMemory() {
 [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" 
]]
 }
+
+HAVE_AWK=
+# same as 
org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long
 totalJavaMemorySize, Configuration config)
+calculateNetworkBuf() {
+local network_buffers_bytes
+if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+echo "Variable 'FLINK_TM_HEAP' not set (usually read from 
'${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
+exit 1
+fi
+
+if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
+# fix memory size for network buffers
+network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
+else
+if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; 
then
+echo "[ERROR] Configured TaskManager network buffer memory 
min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
+echo "Min must be less than or equal to max."
+echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and 
'${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
+exit 1
+fi
+
+# Bash only performs integer arithmetic so floating point 
computation is performed using awk
+if [[ -z "${HAVE_AWK}" ]] ; then
+command -v awk >/dev/null 2>&1
+if [[ $? -ne 0 ]]; then
+echo "[ERROR] Program 'awk' not found."
+echo "Please install 'awk' or define 
'${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of 
'${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+exit 1
+fi
+HAVE_AWK=true
+fi
+
+# We calculate the memory using a fraction of the total memory
+if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< 
"${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
+echo "[ERROR] Configured TaskManager network buffer memory 
fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
+echo "It must be between 0.0 and 1.0."
+echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in 
${FLINK_CONF_FILE}."
+exit 1
+fi
+
+network_buffers_bytes=`awk "BEGIN { x = 
lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > 
${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} 
? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
+fi
+
+# recalculate the JVM heap memory by taking the network buffers into 
account
--- End diff --

This is more of a verification, isn't it?




[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r111969982
  
--- Diff: docs/setup/config.md ---
@@ -602,26 +612,66 @@ You have to configure `jobmanager.archive.fs.dir` in 
order to archive terminated
 
 ## Background
 
+
 ### Configuring the Network Buffers
 
-If you ever see the Exception `java.io.IOException: Insufficient number of 
network buffers`, please use the following formula to adjust the number of 
network buffers:
+If you ever see the Exception `java.io.IOException: Insufficient number of 
network buffers`, you
--- End diff --

AFAIK those things only happen if you leave trailing whitespace at the end 
of a line or so; here, the lines are properly joined in the HTML.




[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r111966911
  
--- Diff: docs/setup/config.md ---
@@ -602,26 +612,66 @@ You have to configure `jobmanager.archive.fs.dir` in 
order to archive terminated
 
 ## Background
 
+
 ### Configuring the Network Buffers
 
-If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, please use the following formula to adjust the number of network buffers:
+If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, you
--- End diff --

We usually don't do manual line breaks in the documentation; otherwise if 
you resize the window funky things start to happen.




[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter

2017-04-13 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/3721

[FLINK-4545] replace the network buffers parameter

(based on #3708 and #3713)

This replaces the single network buffers parameter 
(`taskmanager.network.numberOfBuffers`) with three new, more flexible parameters:
* `taskmanager.network.memory.fraction`: fraction of JVM memory to use for network buffers (default: 0.1)
* `taskmanager.network.memory.min`: minimum memory size for network buffers (default: 64 MB)
* `taskmanager.network.memory.max`: maximum memory size for network buffers (default: 1 GB)
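
As a rough illustration of how the three parameters interact, here is a minimal Python sketch. It assumes the network buffer memory is the configured fraction of the total JVM memory, clamped to the range between min and max. The function name `network_buffer_bytes` and its argument names are hypothetical and are not part of Flink; the actual calculation lives in the Java code and in `config.sh`.

```python
MB = 1024 * 1024


def network_buffer_bytes(total_jvm_bytes, fraction=0.1,
                         min_bytes=64 * MB, max_bytes=1024 * MB):
    """Hypothetical helper: fraction of total JVM memory, clamped to [min, max].

    The defaults mirror the defaults listed above; the clamping itself is an
    assumption about how the parameters are combined.
    """
    if min_bytes > max_bytes:
        # Assumed sanity check: min must be less than or equal to max.
        raise ValueError("min must be less than or equal to max")
    wanted = int(fraction * total_jvm_bytes)
    return max(min_bytes, min(max_bytes, wanted))


if __name__ == "__main__":
    # Small JVM: 10% is below the minimum, so the minimum applies.
    print(network_buffer_bytes(512 * MB))
    # Medium JVM: 10% lies between min and max and is used as is.
    print(network_buffer_bytes(1024 * MB))
    # Large JVM: 10% exceeds the maximum, so the maximum applies.
    print(network_buffer_bytes(20 * 1024 * MB))
```

Setting min and max to the same value fixes the network buffer memory at that size, regardless of the fraction.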

Note that I had to adapt two unit tests that would otherwise have been killed on 
Travis CI: with these defaults, ~150 MB of memory is used for network buffers, 
which was apparently too much there.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-4545

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3721.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3721


commit e61f7bc4debce332c421cb645ff1025b4d03d8d0
Author: Nico Kruber 
Date:   2017-04-11T09:26:29Z

[FLINK-6292] fix transfer.sh upload by using https

Seems the upload via http is not supported anymore.

commit 362ceec0823b179719449d0ed244c591dfcf51f4
Author: Nico Kruber 
Date:   2017-04-12T09:09:03Z

[FLINK-6299] make all IT cases extend from TestLogger

This way, currently executed tests and their failures are properly logged.

commit 973099ef55701fe63951639d37b4f01765b06a01
Author: Nico Kruber 
Date:   2017-04-06T12:41:52Z

[FLINK-4545] replace the network buffers parameter

Instead, allow the configuration with the following three new (more flexible) parameters:
 * "taskmanager.network.memory.fraction": fraction of JVM memory to use for network buffers (default: 0.1)
 * "taskmanager.network.memory.min": minimum memory size for network buffers (default: 64 MB)
 * "taskmanager.network.memory.max": maximum memory size for network buffers (default: 1 GB)

commit 09a981189b59ac13bd39000cc77913c0b03289fd
Author: Nico Kruber 
Date:   2017-04-11T12:20:40Z

[hotfix] fix typo in error message

commit 0960a809c8da51b9787f3f726945716933051fc3
Author: Nico Kruber 
Date:   2017-04-11T13:29:41Z

[hotfix] fix typo in taskmanager.sh usage string

commit 298bb69451a1405df774451de11eb5684534c956
Author: Nico Kruber 
Date:   2017-04-06T15:58:14Z

[FLINK-4545] adapt taskmanager.sh to take network buffers memory into account

commit ea2fb24f4a6eb18cc3f8d3ebd83a49c0f1386a8a
Author: Nico Kruber 
Date:   2017-04-10T09:43:50Z

[FLINK-4545] add configuration checks for the new network buffer memory config

commit 5133d250c4dba4a5e72baad95c841d2b03cb49ea
Author: Nico Kruber 
Date:   2017-04-10T16:22:10Z

[FLINK-4545] add unit tests using the new network configuration parameters and methods

commit a24a548e6ff7e36581f7f7457099656362ca3974
Author: Nico Kruber 
Date:   2017-04-11T16:52:56Z

[FLINK-4545] add unit tests for heap size calculation in shell scripts

These verify that the results are the same as in the calculation done by Java.

commit d55153d559bf110a931b5de849df812038ba4a7a
Author: Nico Kruber 
Date:   2017-04-12T16:11:37Z

[FLINK-4545] update the docs with the changed network buffer parameter

Also update the description of taskmanager.memory.fraction: it is not relative
to the full size of taskmanager.heap.mb, because the network buffer memory is
subtracted first.

commit c48beb0d67e8ef847ef845835e342d4a49127e7d
Author: Nico Kruber 
Date:   2017-04-12T16:25:27Z

[FLINK-4545] fix some tests being killed on Travis CI

Due to the increased defaults for network buffer memory use, some builds on
Travis CI fail with unit tests being killed. This affects
* RocksDbBackendEventTimeWindowCheckpointingITCase and
* HBaseConnectorITCase

We fix this by limiting the maximum amount of network buffer memory to 80MB
(current defaults would yield 150MB, previously 64MB were used).



