[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3721 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r112248789 --- Diff: flink-dist/src/main/flink-bin/bin/config.sh --- @@ -398,3 +428,106 @@ readSlaves() { useOffHeapMemory() { [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]] } + +HAVE_AWK= +# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config) +calculateNetworkBuf() { +local network_buffers_bytes +if [ "${FLINK_TM_HEAP}" -le "0" ]; then +echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})." +exit 1 +fi + +if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then +# fix memory size for network buffers +network_buffers_bytes=${FLINK_TM_NET_BUF_MIN} +else +if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then +echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid." +echo "Min must be less than or equal to max." +echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}." +exit 1 +fi + +# Bash only performs integer arithmetic so floating point computation is performed using awk +if [[ -z "${HAVE_AWK}" ]] ; then +command -v awk >/dev/null 2>&1 +if [[ $? -ne 0 ]]; then +echo "[ERROR] Program 'awk' not found." +echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}." +exit 1 +fi +HAVE_AWK=true +fi + +# We calculate the memory using a fraction of the total memory +if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then +echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value." +echo "It must be between 0.0 and 1.0." 
+echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}." +exit 1 +fi + +network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"` +fi + +# recalculate the JVM heap memory by taking the network buffers into account --- End diff -- No, actually: the user may set the `FLINK_TM_HEAP` environment variable or configure the "Flink heap size" via `taskmanager.heap.mb`, but this is not the real heap size; it is the overall memory size used by Flink (including off-heap memory). So this function subtracts the off-heap part and returns the real heap size to use with `-Xmx` and `-Xms`.
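To make the distinction concrete, here is a minimal Java sketch of the split described above. It assumes the clamp logic of the quoted `calculateNetworkBuf()` shell function (a fraction of the total, bounded by min and max, or a fixed size when min equals max) and, for the off-heap case, assumes managed memory is taken as a fraction of whatever remains after the network buffers. `HeapSizeSketch`, `networkBufferBytes` and `heapSizeMB` are hypothetical names, not Flink's `TaskManagerServices` API.

```java
// Hypothetical sketch, not Flink's TaskManagerServices implementation.
final class HeapSizeSketch {

    /** Network buffer memory in bytes: a fraction of the total, clamped to [minBytes, maxBytes]. */
    static long networkBufferBytes(long totalMB, float fraction, long minBytes, long maxBytes) {
        if (minBytes > maxBytes) {
            throw new IllegalArgumentException("network buffer min must be <= max");
        }
        if (minBytes == maxBytes) {
            return minBytes; // fixed size: the fraction is not used (same branch as config.sh)
        }
        long fromFraction = (long) (fraction * (totalMB << 20));
        return Math.min(maxBytes, Math.max(minBytes, fromFraction));
    }

    /**
     * Real JVM heap in MB (for -Xmx/-Xms): the configured total minus network buffers and,
     * if off-heap managed memory is enabled, minus that managed memory as well
     * (assumed here to be managedFraction of what remains after the network buffers).
     */
    static long heapSizeMB(long totalMB, float netFraction, long netMinBytes, long netMaxBytes,
                           boolean offHeap, float managedFraction) {
        long networkMB = networkBufferBytes(totalMB, netFraction, netMinBytes, netMaxBytes) >> 20;
        long remainingMB = totalMB - networkMB;
        if (!offHeap) {
            return remainingMB; // managed memory stays on the heap
        }
        long managedMB = (long) (managedFraction * remainingMB);
        return remainingMB - managedMB;
    }

    public static void main(String[] args) {
        // 1000 MB configured, 10% network buffers clamped to [64 MiB, 1 GiB]
        System.out.println(heapSizeMB(1000, 0.1f, 64L << 20, 1L << 30, false, 0.5f)); // on-heap
        System.out.println(heapSizeMB(1000, 0.1f, 64L << 20, 1L << 30, true, 0.5f));  // off-heap
    }
}
```

With 1000 MB configured, the 10% fraction yields 100 MB of network buffers, so the heap passed to `-Xmx` would be 900 MB on-heap, or 450 MB when half of the remainder is reserved off-heap under the assumed 0.5 managed fraction.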
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r112247760 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java --- @@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc } catch (Throwable t) { if (LOG.isErrorEnabled()) { - LOG.error("Unexpected problen while getting the file statistics for file '" + this.filePath + "': " + LOG.error("Unexpected problem while getting the file statistics for file '" + this.filePath + "': " --- End diff -- You can leave it as it is.
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r112246946 --- Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java --- @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.dist; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.taskexecutor.TaskManagerServices; +import org.apache.flink.util.OperatingSystem; +import org.apache.flink.util.TestLogger; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Random; + +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** + * Unit test that verifies that the task manager heap size calculation used by the bash script + * taskmanager.sh returns the same values as the heap size calculation of + * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}. + * + * NOTE: the shell script uses awk to perform floating-point arithmetic which uses + * double precision but our Java code restrains to float because we actually do + * not need high precision. + */ +public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger { + + /** Key that is used by config.sh. */ + private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb"; + + /** +* Number of tests with random values. +* +* NOTE: calling the external test script is slow and thus low numbers are preferred for general +* testing. +*/ + private static final int NUM_RANDOM_TESTS = 20; + + @Before + public void checkOperatingSystem() { + Assume.assumeTrue("This test checks shell scripts not available on Windows.", + !OperatingSystem.isWindows()); + } + + /** +* Tests that {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} has the same +* result as the shell script. 
+*/ + @Test + public void compareNetworkBufShellScriptWithJava() throws Exception { + int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue(); + float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(); + + // manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB() + + compareNetworkBufJavaVsScript( + getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f); + + compareNetworkBufJavaVsScript( + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f); + + compareNetworkBufJavaVsScript( + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f); + + // some automated tests with random (but valid) values + + Random ran = new Random(); + for (int i = 0; i < NUM_RANDOM_TESTS; ++i) { + // tolerate that values differ by 1% (due to different floating point precisions) + compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f); --- End diff -- oh, here, I actually do already print the configuration in the error message
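The 1% tolerance exists because the script computes in double precision (awk) while the Java side uses `float`. The following is a hypothetical sketch of such a comparison with invented helper names. It approximates the script side with `Math.round` on a `double` (awk's `printf "%.0f"` may round halfway cases differently) and logs the random seed, which is one way to make a failing random run reproducible in addition to printing the configuration.

```java
import java.util.Random;

// Hypothetical helpers illustrating why the Java-vs-bash comparison needs a tolerance.
final class ToleranceSketch {

    /** Java side: float arithmetic, truncated by the (long) cast. */
    static long fractionOfFloat(long totalBytes, float fraction) {
        return (long) (fraction * totalBytes);
    }

    /** Script side, approximated: double arithmetic, rounded to the nearest integer. */
    static long fractionOfDouble(long totalBytes, double fraction) {
        return Math.round(fraction * totalBytes);
    }

    /** True if actual deviates from expected by at most relTol (0.01 == 1%). */
    static boolean withinRelativeTolerance(long expected, long actual, double relTol) {
        return Math.abs(expected - actual) <= Math.abs((double) expected) * relTol;
    }

    public static void main(String[] args) {
        long seed = System.nanoTime();
        System.out.println("seed = " + seed); // log the seed so a failing run can be replayed
        Random rnd = new Random(seed);
        for (int i = 0; i < 20; i++) {
            long totalBytes = ((long) (64 + rnd.nextInt(65536))) << 20; // widen before shifting
            float fraction = 0.01f + 0.98f * rnd.nextFloat();             // strictly inside (0, 1)
            long javaValue = fractionOfFloat(totalBytes, fraction);
            long scriptValue = fractionOfDouble(totalBytes, fraction);
            if (!withinRelativeTolerance(scriptValue, javaValue, 0.01)) {
                throw new AssertionError("mismatch for total=" + totalBytes + ", fraction=" + fraction);
            }
        }
    }
}
```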
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r112246505 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java --- @@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc } catch (Throwable t) { if (LOG.isErrorEnabled()) { - LOG.error("Unexpected problen while getting the file statistics for file '" + this.filePath + "': " + LOG.error("Unexpected problem while getting the file statistics for file '" + this.filePath + "': " --- End diff -- sorry, should I create a separate PR? (a separate JIRA is definitely overkill for this)
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r112246105 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.net.InetAddress; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Unit test for {@link TaskManagerServices}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(EnvironmentInformation.class) +public class TaskManagerServicesTest { + + /** +* Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old +* configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}. 
+*/ + @SuppressWarnings("deprecation") + @Test + public void calculateNetworkBufOld() throws Exception { + Configuration config = new Configuration(); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1); + + // note: actual network buffer memory size is independent of the totalJavaMemorySize + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(), + TaskManagerServices.calculateNetworkBuf(10L << 20, config)); + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(), + TaskManagerServices.calculateNetworkBuf(64L << 20, config)); + + // test integer overflow in the memory size + int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33 + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers); + assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config)); + } + + /** +* Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new +* configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}, +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}. +*/ + @Test + public void calculateNetworkBufNew() throws Exception { + Configuration config = new Configuration(); + + // (1) defaults + final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue(); + final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue(); + final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue(); + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))), + TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config)); + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))), + TaskManagerServices.calculateNetworkBuf((10L << 30), config)); + + calculateNetworkBufNew(config); + } +
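The overflow case in `calculateNetworkBufOld` targets a classic Java pitfall: multiplying an `int` buffer count by an `int` segment size wraps around in 32 bits before the result is widened to `long`. Here is a minimal sketch, assuming a 32 KiB segment size (believed to be the default of `MEMORY_SEGMENT_SIZE`). The class and method names are hypothetical.

```java
// Hypothetical sketch of the int-overflow case exercised by calculateNetworkBufOld.
final class OverflowSketch {

    /** Assumed segment size: 32 KiB. */
    static final int SEGMENT_SIZE = 32 * 1024;

    /** Wrong: int * int is evaluated in 32-bit arithmetic and wraps before widening to long. */
    static long networkBytesOverflowing(int numBuffers) {
        return numBuffers * SEGMENT_SIZE;
    }

    /** Right: widen one operand first so the multiplication happens in 64 bits. */
    static long networkBytes(int numBuffers) {
        return (long) numBuffers * SEGMENT_SIZE;
    }

    public static void main(String[] args) {
        int numBuffers = (int) ((2L << 32) / SEGMENT_SIZE); // 2^33 bytes / 2^15 bytes = 2^18 buffers
        System.out.println(networkBytesOverflowing(numBuffers));
        System.out.println(networkBytes(numBuffers));
    }
}
```

For 2^33 bytes this gives 2^18 = 262,144 buffers. The 32-bit product 2^33 wraps to exactly 0, while the widened product is 8,589,934,592 (`2L << 32`).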
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r112242996 --- Diff: flink-dist/src/main/flink-bin/bin/config.sh --- @@ -398,3 +428,106 @@ readSlaves() { useOffHeapMemory() { [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]] } + +HAVE_AWK= +# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config) +calculateNetworkBuf() { +local network_buffers_bytes +if [ "${FLINK_TM_HEAP}" -le "0" ]; then +echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})." +exit 1 +fi + +if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then +# fix memory size for network buffers +network_buffers_bytes=${FLINK_TM_NET_BUF_MIN} +else +if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then +echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid." +echo "Min must be less than or equal to max." +echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}." +exit 1 +fi + +# Bash only performs integer arithmetic so floating point computation is performed using awk +if [[ -z "${HAVE_AWK}" ]] ; then +command -v awk >/dev/null 2>&1 +if [[ $? -ne 0 ]]; then +echo "[ERROR] Program 'awk' not found." +echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}." +exit 1 +fi +HAVE_AWK=true +fi + +# We calculate the memory using a fraction of the total memory +if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then +echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value." +echo "It must be between 0.0 and 1.0." 
+echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}." +exit 1 +fi + +network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"` +fi + +# recalculate the JVM heap memory by taking the network buffers into account --- End diff -- To me, "recalculate" implied that it would change some configuration value, but that's not happening. It's only verifying that the memory for network buffers is less than the heap memory.
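For reference, the checks that the quoted `calculateNetworkBuf()` performs before computing the fraction can be sketched in Java as follows. This is a hypothetical port with invented class and method names. `checkFits` only illustrates the kind of "network buffers must fit" verification described in the comment above; it is not the actual script logic.

```java
// Hypothetical Java port of the checks config.sh performs before using the fraction.
final class NetworkBufConfigCheck {

    static void validate(long minBytes, long maxBytes, float fraction) {
        if (minBytes == maxBytes) {
            return; // fixed size: config.sh skips the fraction check in this case
        }
        if (minBytes > maxBytes) {
            throw new IllegalArgumentException("network buffer memory min/max "
                + minBytes + "/" + maxBytes + " are not valid: min must be <= max");
        }
        // written as a negated range test so that NaN is rejected as well
        if (!(fraction > 0.0f && fraction < 1.0f)) {
            throw new IllegalArgumentException("network buffer memory fraction "
                + fraction + " must be between 0.0 and 1.0 (exclusive)");
        }
    }

    /** Illustrative sanity check: the network buffers must fit into the total memory. */
    static void checkFits(long networkBytes, long totalBytes) {
        if (networkBytes >= totalBytes) {
            throw new IllegalArgumentException("network buffers (" + networkBytes
                + " bytes) do not fit into " + totalBytes + " bytes");
        }
    }
}
```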
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r112242842 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.net.InetAddress; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Unit test for {@link TaskManagerServices}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(EnvironmentInformation.class) +public class TaskManagerServicesTest { + + /** +* Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old +* configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}. 
+*/ + @SuppressWarnings("deprecation") + @Test + public void calculateNetworkBufOld() throws Exception { + Configuration config = new Configuration(); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1); + + // note: actual network buffer memory size is independent of the totalJavaMemorySize + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(), + TaskManagerServices.calculateNetworkBuf(10L << 20, config)); + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(), + TaskManagerServices.calculateNetworkBuf(64L << 20, config)); + + // test integer overflow in the memory size + int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33 + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers); + assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config)); + } + + /** +* Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new +* configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}, +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}. +*/ + @Test + public void calculateNetworkBufNew() throws Exception { + Configuration config = new Configuration(); + + // (1) defaults + final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue(); + final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue(); + final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue(); + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))), + TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config)); + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))), + TaskManagerServices.calculateNetworkBuf((10L << 30), config)); + + calculateNetworkBufNew(config); + } +
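One detail in the `calculateNetworkBufNew` assertion: in Java, `+` binds tighter than `<<`, so `64L << 20 + 1` evaluates to `64L << 21` (128 MiB), not 64 MiB plus one byte. It also differs from the `10L << 20` used in the expected value. Assuming the defaults are a 0.1 fraction, a 64 MiB minimum and a 1 GiB maximum (the values the shell-vs-Java test passes to `getConfig`), both inputs clamp up to the 64 MiB minimum, so the assertion passes regardless. The following small sketch shows this; the class name and constants are hypothetical.

```java
// Illustration of Java shift/additive precedence in the quoted assertion, under assumed defaults.
final class ShiftPrecedenceSketch {

    // Assumed defaults: 10% fraction, 64 MiB minimum, 1 GiB maximum
    static final float FRACTION = 0.1f;
    static final long MIN = 64L << 20;
    static final long MAX = 1L << 30;

    /** Same clamp expression as the expected value in the quoted test. */
    static long expectedNetworkBytes(long totalBytes) {
        return Math.min(MAX, Math.max(MIN, (long) (FRACTION * totalBytes)));
    }

    public static void main(String[] args) {
        // '+' binds tighter than '<<', so this is 64L << 21 (128 MiB), not (64L << 20) + 1
        long parsed = 64L << 20 + 1;
        System.out.println(parsed == (64L << 21));
        System.out.println(expectedNetworkBytes(10L << 20)); // ~1 MiB from the fraction, clamped up to MIN
        System.out.println(expectedNetworkBytes(parsed));    // ~12.8 MiB from the fraction, also clamped up to MIN
    }
}
```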
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r112239687 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.net.InetAddress; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Unit test for {@link TaskManagerServices}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(EnvironmentInformation.class) +public class TaskManagerServicesTest { + + /** +* Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old +* configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}. 
+*/ + @SuppressWarnings("deprecation") + @Test + public void calculateNetworkBufOld() throws Exception { + Configuration config = new Configuration(); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1); + + // note: actual network buffer memory size is independent of the totalJavaMemorySize + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(), + TaskManagerServices.calculateNetworkBuf(10L << 20, config)); + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(), + TaskManagerServices.calculateNetworkBuf(64L << 20, config)); + + // test integer overflow in the memory size + int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33 + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers); + assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config)); + } + + /** +* Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new +* configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}, +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}. +*/ + @Test + public void calculateNetworkBufNew() throws Exception { + Configuration config = new Configuration(); + + // (1) defaults + final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue(); + final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue(); + final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue(); + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))), + TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config)); + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))), + TaskManagerServices.calculateNetworkBuf((10L << 30), config)); + + calculateNetworkBufNew(config); + } +
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r112236022 --- Diff: flink-dist/src/main/flink-bin/bin/config.sh --- @@ -398,3 +428,106 @@ readSlaves() { useOffHeapMemory() { [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]] } + +HAVE_AWK= +# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config) +calculateNetworkBuf() { +local network_buffers_bytes +if [ "${FLINK_TM_HEAP}" -le "0" ]; then +echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})." +exit 1 +fi + +if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then +# fix memory size for network buffers +network_buffers_bytes=${FLINK_TM_NET_BUF_MIN} +else +if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then +echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid." +echo "Min must be less than or equal to max." +echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}." +exit 1 +fi + +# Bash only performs integer arithmetic so floating point computation is performed using awk +if [[ -z "${HAVE_AWK}" ]] ; then +command -v awk >/dev/null 2>&1 +if [[ $? -ne 0 ]]; then +echo "[ERROR] Program 'awk' not found." +echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}." +exit 1 +fi +HAVE_AWK=true +fi + +# We calculate the memory using a fraction of the total memory +if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then +echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value." +echo "It must be between 0.0 and 1.0." 
+echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}." +exit 1 +fi + +network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"` +fi + +# recalculate the JVM heap memory by taking the network buffers into account --- End diff -- what do you mean?
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r112003041 --- Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java --- @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.dist; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.taskexecutor.TaskManagerServices; +import org.apache.flink.util.OperatingSystem; +import org.apache.flink.util.TestLogger; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Random; + +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** + * Unit test that verifies that the task manager heap size calculation used by the bash script + * taskmanager.sh returns the same values as the heap size calculation of + * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}. + * + * NOTE: the shell script uses awk to perform floating-point arithmetic which uses + * double precision but our Java code restrains to float because we actually do + * not need high precision. + */ +public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger { + + /** Key that is used by config.sh. */ + private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb"; + + /** +* Number of tests with random values. +* +* NOTE: calling the external test script is slow and thus low numbers are preferred for general +* testing. +*/ + private static final int NUM_RANDOM_TESTS = 20; + + @Before + public void checkOperatingSystem() { + Assume.assumeTrue("This test checks shell scripts not available on Windows.", + !OperatingSystem.isWindows()); + } + + /** +* Tests that {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} has the same +* result as the shell script. 
+*/ + @Test + public void compareNetworkBufShellScriptWithJava() throws Exception { + int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue(); + float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(); + + // manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB() + + compareNetworkBufJavaVsScript( + getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f); + + compareNetworkBufJavaVsScript( + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f); + + compareNetworkBufJavaVsScript( + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f); + + // some automated tests with random (but valid) values + + Random ran = new Random(); + for (int i = 0; i < NUM_RANDOM_TESTS; ++i) { + // tolerate that values differ by 1% (due to different floating point precisions) + compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f); + } + } + + /** +* Tests that {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)} has the same +* result as the shell script. +*/ + @Test + public void compareHeapSizeShellScriptWithJava() throws Exception { + int
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r112001266 --- Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java --- @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.dist; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.taskexecutor.TaskManagerServices; +import org.apache.flink.util.OperatingSystem; +import org.apache.flink.util.TestLogger; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Random; + +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** + * Unit test that verifies that the task manager heap size calculation used by the bash script + * taskmanager.sh returns the same values as the heap size calculation of + * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}. + * + * NOTE: the shell script uses awk to perform floating-point arithmetic which uses + * double precision but our Java code restrains to float because we actually do + * not need high precision. + */ +public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger { + + /** Key that is used by config.sh. */ + private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb"; + + /** +* Number of tests with random values. +* +* NOTE: calling the external test script is slow and thus low numbers are preferred for general +* testing. +*/ + private static final int NUM_RANDOM_TESTS = 20; + + @Before + public void checkOperatingSystem() { + Assume.assumeTrue("This test checks shell scripts not available on Windows.", --- End diff -- missing "that are"?
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r112000380 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java --- @@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc } catch (Throwable t) { if (LOG.isErrorEnabled()) { - LOG.error("Unexpected problen while getting the file statistics for file '" + this.filePath + "': " + LOG.error("Unexpected problem while getting the file statistics for file '" + this.filePath + "': " --- End diff -- Let's see whether I can remember to not squash this commit :)
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r111994207 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java --- @@ -376,6 +392,169 @@ private static NetworkEnvironment createNetworkEnvironment( } /** +* Calculates the amount of memory used for network buffers based on the total memory to use and +* the according configuration parameters. +* +* The following configuration parameters are involved: +* +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}, +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}, +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and +* {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist) +* . +* +* @param totalJavaMemorySize +* overall available memory to use (heap and off-heap, in bytes) +* @param config +* configuration object +* +* @return memory to use for network buffers (in bytes) +*/ + public static long calculateNetworkBuf(long totalJavaMemorySize, Configuration config) { --- End diff -- how about a slightly longer method name?
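For readers following this thread, the fraction/min/max rule described in the Javadoc above can be sketched in plain Java. This is only a sketch under assumptions: the class `NetworkBufferMemorySketch` and its parameters are hypothetical, the validation order mirrors the bash `calculateNetworkBuf()` quoted in this thread (heap check, then fixed size if min == max, then min <= max, then 0 < fraction < 1), and the legacy `NETWORK_NUM_BUFFERS` fallback is deliberately left out, so the real `TaskManagerServices` code may differ.

```java
/**
 * Hypothetical sketch of the FLINK-4545 rule: use a fraction of the total Java memory
 * for network buffers, clamped to [min, max]. Names are illustrative, not Flink API.
 */
final class NetworkBufferMemorySketch {

    private NetworkBufferMemorySketch() {
    }

    /**
     * @param totalJavaMemorySize overall memory (heap and off-heap) in bytes
     * @param fraction            share of that memory to use, expected in (0, 1)
     * @param min                 lower bound in bytes
     * @param max                 upper bound in bytes
     * @return network buffer memory in bytes
     */
    static long calculate(long totalJavaMemorySize, float fraction, long min, long max) {
        if (totalJavaMemorySize <= 0) {
            throw new IllegalArgumentException(
                "totalJavaMemorySize must be positive, but was " + totalJavaMemorySize);
        }
        // Same order as the bash script: equal bounds mean a fixed size and skip the fraction.
        if (min == max) {
            return min;
        }
        if (min > max) {
            throw new IllegalArgumentException("min (" + min + ") must be <= max (" + max + ")");
        }
        if (!(fraction > 0.0f && fraction < 1.0f)) {
            throw new IllegalArgumentException("fraction must be in (0, 1), but was " + fraction);
        }
        // float * long is evaluated in float arithmetic; the cast truncates toward zero.
        long fromFraction = (long) (fraction * totalJavaMemorySize);
        return Math.min(max, Math.max(min, fromFraction));
    }
}
```

With the defaults from the PR description (0.1, 64 MB, 1 GB), a TaskManager with less than about 640 MB of total Java memory would end up at the 64 MB minimum, and one with more than about 10 GB at the 1 GB maximum.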
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r112000165 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.net.InetAddress; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Unit test for {@link TaskManagerServices}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(EnvironmentInformation.class) +public class TaskManagerServicesTest { + + /** +* Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old +* configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}. 
+*/ + @SuppressWarnings("deprecation") + @Test + public void calculateNetworkBufOld() throws Exception { + Configuration config = new Configuration(); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1); + + // note: actual network buffer memory size is independent of the totalJavaMemorySize + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(), + TaskManagerServices.calculateNetworkBuf(10L << 20, config)); + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(), + TaskManagerServices.calculateNetworkBuf(64L << 20, config)); + + // test integer overflow in the memory size + int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33 + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers); + assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config)); + } + + /** +* Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new +* configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}, +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}. +*/ + @Test + public void calculateNetworkBufNew() throws Exception { + Configuration config = new Configuration(); + + // (1) defaults + final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue(); + final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue(); + final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue(); + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))), + TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config)); + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))), + TaskManagerServices.calculateNetworkBuf((10L << 30), config)); + + calculateNetworkBufNew(config); + } +
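The "test integer overflow" case in `calculateNetworkBufOld()` exists because multiplying two `int` values wraps around before the result is widened to `long`. A minimal illustration, assuming a 32 KiB memory segment size (the actual default of `TaskManagerOptions.MEMORY_SEGMENT_SIZE` is not shown in this thread); the class and method names are hypothetical:

```java
/** Hypothetical illustration of the int-overflow pitfall guarded against by calculateNetworkBufOld(). */
final class NetworkBufferOverflowSketch {

    private NetworkBufferOverflowSketch() {
    }

    /** Buggy variant: the product is computed in 32-bit int arithmetic and wraps around. */
    static long intProduct(int numBuffers, int segmentSize) {
        return numBuffers * segmentSize;
    }

    /** Correct variant: widen one operand to long before multiplying. */
    static long longProduct(int numBuffers, int segmentSize) {
        return (long) numBuffers * segmentSize;
    }
}
```

With 262144 buffers of 32 KiB each, the `int` product wraps to 0 while the `long` product is 2^33 bytes (8 GiB), which is the value the `assertEquals(2L << 32, ...)` line expects.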
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r112002748 --- Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java --- @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.dist; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.taskexecutor.TaskManagerServices; +import org.apache.flink.util.OperatingSystem; +import org.apache.flink.util.TestLogger; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Random; + +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** + * Unit test that verifies that the task manager heap size calculation used by the bash script + * taskmanager.sh returns the same values as the heap size calculation of + * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}. + * + * NOTE: the shell script uses awk to perform floating-point arithmetic which uses + * double precision but our Java code restrains to float because we actually do + * not need high precision. + */ +public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger { + + /** Key that is used by config.sh. */ + private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb"; + + /** +* Number of tests with random values. +* +* NOTE: calling the external test script is slow and thus low numbers are preferred for general +* testing. +*/ + private static final int NUM_RANDOM_TESTS = 20; + + @Before + public void checkOperatingSystem() { + Assume.assumeTrue("This test checks shell scripts not available on Windows.", + !OperatingSystem.isWindows()); + } + + /** +* Tests that {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} has the same +* result as the shell script. 
+*/ + @Test + public void compareNetworkBufShellScriptWithJava() throws Exception { + int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue(); + float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(); + + // manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB() + + compareNetworkBufJavaVsScript( + getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f); + + compareNetworkBufJavaVsScript( + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f); + + compareNetworkBufJavaVsScript( + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f); + + // some automated tests with random (but valid) values + + Random ran = new Random(); + for (int i = 0; i < NUM_RANDOM_TESTS; ++i) { + // tolerate that values differ by 1% (due to different floating point precisions) + compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f); --- End diff -- As with the other randomized tests, we should print the configuration used for the test in case of failure.
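One way to act on the request to print the configuration on failure is to draw from a seeded `Random` and put both the seed and the configuration into the assertion message, together with the 1% relative tolerance used in the loop above. This is a hypothetical sketch: `RandomizedComparisonSketch`, `NetBufConfig`, and their fields are illustrative stand-ins, not the test's actual `getRandomConfig()`/`compareNetworkBufJavaVsScript()` helpers.

```java
import java.util.Locale;
import java.util.Random;

/**
 * Hypothetical sketch: a randomized Java-vs-script comparison that reports the seed and the
 * configuration when the results differ by more than a relative tolerance.
 */
final class RandomizedComparisonSketch {

    private RandomizedComparisonSketch() {
    }

    /** One randomly generated network buffer configuration (illustrative fields only). */
    static final class NetBufConfig {
        final long heapMb;
        final float fraction;
        final long minBytes;
        final long maxBytes;

        NetBufConfig(long heapMb, float fraction, long minBytes, long maxBytes) {
            this.heapMb = heapMb;
            this.fraction = fraction;
            this.minBytes = minBytes;
            this.maxBytes = maxBytes;
        }

        @Override
        public String toString() {
            return String.format(Locale.ROOT, "heapMb=%d, fraction=%s, min=%d, max=%d",
                heapMb, fraction, minBytes, maxBytes);
        }
    }

    /** Draws a valid configuration; the same seed always yields the same configuration. */
    static NetBufConfig randomConfig(Random rnd) {
        long heapMb = 1 + rnd.nextInt(10_000);
        float fraction = 0.01f + rnd.nextFloat() * 0.98f; // roughly in [0.01, 0.99)
        long minBytes = (long) rnd.nextInt(256) << 20;
        long maxBytes = minBytes + ((long) rnd.nextInt(1024) << 20); // max >= min
        return new NetBufConfig(heapMb, fraction, minBytes, maxBytes);
    }

    /** True if actual deviates from expected by at most tolerance * |expected|. */
    static boolean withinRelativeTolerance(long expected, long actual, double tolerance) {
        return Math.abs(expected - actual) <= tolerance * Math.abs((double) expected);
    }

    /** Fails with a message that contains everything needed to reproduce the run. */
    static void assertClose(long expected, long actual, double tolerance,
            NetBufConfig config, long seed) {
        if (!withinRelativeTolerance(expected, actual, tolerance)) {
            throw new AssertionError("expected " + expected + " but got " + actual
                + " (tolerance " + tolerance + ") for config [" + config + "], seed=" + seed);
        }
    }
}
```

In a JUnit test, one would typically draw the seed once (for example with `new Random().nextLong()`), build `new Random(seed)` from it, and pass the seed along so that a failing run can be replayed exactly.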
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r111997260 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java --- @@ -0,0 +1,272 @@
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r111992933 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java --- @@ -0,0 +1,272 @@
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r111983754 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java --- @@ -410,24 +410,47 @@ private static NetworkEnvironment createNetworkEnvironment( * * @return memory to use for network buffers (in bytes) */ + @SuppressWarnings("deprecation") public static long calculateNetworkBuf(long totalJavaMemorySize, Configuration config) { + assert totalJavaMemorySize > 0; --- End diff -- use Preconditions instead?
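The suggestion matters because Java `assert` statements are skipped unless the JVM runs with `-ea`, whereas an explicit check always runs. Flink ships `org.apache.flink.util.Preconditions` for this purpose. To keep the sketch self-contained, the helper below is a hypothetical local stand-in that follows the same idea (throw `IllegalArgumentException` for a bad argument); it is not Flink's actual class.

```java
/** Hypothetical stand-in for an argument check that replaces a Java assert. */
final class ArgumentCheckSketch {

    private ArgumentCheckSketch() {
    }

    /** Throws IllegalArgumentException when the condition is false; always active, unlike assert. */
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    /** Example shaped like the first check in calculateNetworkBuf(). */
    static long requirePositiveMemory(long totalJavaMemorySize) {
        checkArgument(totalJavaMemorySize > 0,
            "totalJavaMemorySize must be positive, but was " + totalJavaMemorySize);
        return totalJavaMemorySize;
    }
}
```

Unlike `assert totalJavaMemorySize > 0;`, this check fails in the same way whether or not `-ea` is passed to the JVM.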
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r111975142 --- Diff: flink-dist/src/main/flink-bin/bin/config.sh --- @@ -398,3 +428,106 @@ readSlaves() { useOffHeapMemory() { [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]] } + +HAVE_AWK= +# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config) +calculateNetworkBuf() { +local network_buffers_bytes +if [ "${FLINK_TM_HEAP}" -le "0" ]; then +echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})." +exit 1 +fi + +if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then +# fix memory size for network buffers +network_buffers_bytes=${FLINK_TM_NET_BUF_MIN} +else +if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then +echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid." +echo "Min must be less than or equal to max." +echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}." +exit 1 +fi + +# Bash only performs integer arithmetic so floating point computation is performed using awk +if [[ -z "${HAVE_AWK}" ]] ; then +command -v awk >/dev/null 2>&1 +if [[ $? -ne 0 ]]; then +echo "[ERROR] Program 'awk' not found." +echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}." +exit 1 +fi +HAVE_AWK=true +fi + +# We calculate the memory using a fraction of the total memory +if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then +echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value." +echo "It must be between 0.0 and 1.0." 
+echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}." +exit 1 +fi + +network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"` +fi + +# recalculate the JVM heap memory by taking the network buffers into account --- End diff -- This is more of a verification, isn't it?
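The awk expression above converts the heap size from megabytes to bytes (`lshift(..., 20)`), multiplies it by the fraction in double precision, clamps the result to [min, max], and rounds with `printf "%.0f"`. A Java mirror of that expression can be handy when comparing the script against the Java side. The class and method names below are hypothetical, and `Math.rint` (round half to even) is assumed to match how `%.0f` rounds on typical C libraries.

```java
/** Hypothetical Java mirror of the awk expression in config.sh's calculateNetworkBuf(). */
final class ShellNetworkBufMirror {

    private ShellNetworkBufMirror() {
    }

    static long networkBufferBytes(long heapMb, double fraction, long minBytes, long maxBytes) {
        // lshift(heap, 20): megabytes -> bytes, then multiply in double precision like awk
        double x = (double) (heapMb << 20) * fraction;
        // x > max ? max : x < min ? min : x
        double clamped = x > maxBytes ? maxBytes : (x < minBytes ? minBytes : x);
        // printf "%.0f": round to the nearest integer, ties to even
        return (long) Math.rint(clamped);
    }
}
```

For `heapMb = 1000` and the PR defaults (0.1, 64 MB, 1 GB), this yields 104857600 bytes (100 MiB); much smaller heaps fall back to the 64 MB minimum and much larger ones to the 1 GB maximum.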
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r111969982 --- Diff: docs/setup/config.md --- @@ -602,26 +612,66 @@ You have to configure `jobmanager.archive.fs.dir` in order to archive terminated ## Background + ### Configuring the Network Buffers -If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, please use the following formula to adjust the number of network buffers: +If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, you --- End diff -- AFAIK, those things only happen if you leave an empty space at the end of the line or so; here, the lines are properly joined inside the HTML.
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r111966911 --- Diff: docs/setup/config.md --- @@ -602,26 +612,66 @@ You have to configure `jobmanager.archive.fs.dir` in order to archive terminated ## Background + ### Configuring the Network Buffers -If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, please use the following formula to adjust the number of network buffers: +If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, you --- End diff -- We usually don't do manual line breaks in the documentation; otherwise, if you resize the window, funky things start to happen.
[GitHub] flink pull request #3721: [FLINK-4545] replace the network buffers parameter
GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/3721 [FLINK-4545] replace the network buffers parameter (based on #3708 and #3713) Instead, allow the configuration with the following three new (more flexible) parameters: * `taskmanager.network.memory.fraction`: fraction of JVM memory to use for network buffers (default: 0.1) * `taskmanager.network.memory.min`: minimum memory size for network buffers (default: 64 MB) * `taskmanager.network.memory.max`: maximum memory size for network buffers (default: 1 GB) Note that I needed to adapt two unit tests which would have been killed on Travis CI because these defaults result in ~150MB memory being used for network buffers which apparently was too much there. You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-4545 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3721.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3721 commit e61f7bc4debce332c421cb645ff1025b4d03d8d0 Author: Nico KruberDate: 2017-04-11T09:26:29Z [FLINK-6292] fix transfer.sh upload by using https Seems the upload via http is not supported anymore. commit 362ceec0823b179719449d0ed244c591dfcf51f4 Author: Nico Kruber Date: 2017-04-12T09:09:03Z [FLINK-6299] make all IT cases extend from TestLogger This way, currently executed tests and their failures are properly logged. 
commit 973099ef55701fe63951639d37b4f01765b06a01
Author: Nico Kruber
Date:   2017-04-06T12:41:52Z

    [FLINK-4545] replace the network buffers parameter

    Instead, allow the configuration with the following three new (more flexible) parameters:
    * "taskmanager.network.memory.fraction": fraction of JVM memory to use for network buffers (default: 0.1)
    * "taskmanager.network.memory.min": minimum memory size for network buffers (default: 64 MB)
    * "taskmanager.network.memory.max": maximum memory size for network buffers (default: 1 GB)

commit 09a981189b59ac13bd39000cc77913c0b03289fd
Author: Nico Kruber
Date:   2017-04-11T12:20:40Z

    [hotfix] fix typo in error message

commit 0960a809c8da51b9787f3f726945716933051fc3
Author: Nico Kruber
Date:   2017-04-11T13:29:41Z

    [hotfix] fix typo in taskmanager.sh usage string

commit 298bb69451a1405df774451de11eb5684534c956
Author: Nico Kruber
Date:   2017-04-06T15:58:14Z

    [FLINK-4545] adapt taskmanager.sh to take network buffers memory into account

commit ea2fb24f4a6eb18cc3f8d3ebd83a49c0f1386a8a
Author: Nico Kruber
Date:   2017-04-10T09:43:50Z

    [FLINK-4545] add configuration checks for the new network buffer memory config

commit 5133d250c4dba4a5e72baad95c841d2b03cb49ea
Author: Nico Kruber
Date:   2017-04-10T16:22:10Z

    [FLINK-4545] add unit tests using the new network configuration parameters and methods

commit a24a548e6ff7e36581f7f7457099656362ca3974
Author: Nico Kruber
Date:   2017-04-11T16:52:56Z

    [FLINK-4545] add unit tests for heap size calculation in shell scripts

    These verify that the results match those of the calculation done in Java.

commit d55153d559bf110a931b5de849df812038ba4a7a
Author: Nico Kruber
Date:   2017-04-12T16:11:37Z

    [FLINK-4545] update the docs with the changed network buffer parameter

    Also update the description of taskmanager.memory.fraction to state that it is not relative to the full size of taskmanager.heap.mb: network buffer memory is subtracted first!
commit c48beb0d67e8ef847ef845835e342d4a49127e7d
Author: Nico Kruber
Date:   2017-04-12T16:25:27Z

    [FLINK-4545] fix some tests being killed on Travis CI

    Due to the increased defaults for network buffer memory, some builds on Travis CI fail because unit tests are killed. This affects
    * RocksDbBackendEventTimeWindowCheckpointingITCase and
    * HBaseConnectorITCase

    We fix this by limiting the maximum amount of network buffer memory to 80 MB (the current defaults would yield 150 MB; previously, 64 MB were used).
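The PR description and commit messages only describe the new sizing rule in words. The Java sketch below is a hypothetical illustration, not Flink's actual `TaskManagerServices` code. It assumes that the fraction-based size is clamped into `[min, max]` (a fixed size when min equals max, with the same checks as the `calculateNetworkBuf()` shell function in this thread). It also assumes a 32 KiB size per network buffer, and that `taskmanager.memory.fraction` is applied to the heap after network buffer memory has been subtracted, as the docs commit states. The class and method names are made up for illustration.

```java
/**
 * Hypothetical sketch (NOT Flink's implementation) of how the parameters
 * taskmanager.network.memory.fraction/.min/.max could determine the network
 * buffer memory of a TaskManager. All names here are illustrative only.
 */
final class NetworkBufferSketch {

    // Assumption: each network buffer is one 32 KiB memory segment.
    static final long SEGMENT_SIZE = 32L * 1024L;

    /** Network buffer memory in bytes: the fraction of JVM memory, clamped into [min, max]. */
    static long networkBufferBytes(long totalJavaMemory, double fraction, long min, long max) {
        if (totalJavaMemory <= 0) {
            throw new IllegalArgumentException("total JVM memory must be positive");
        }
        if (min == max) {
            // min == max: a fixed network buffer size, the fraction is ignored
            return min;
        }
        if (min > max) {
            throw new IllegalArgumentException("min must be less than or equal to max");
        }
        if (!(fraction > 0.0 && fraction < 1.0)) {
            throw new IllegalArgumentException("fraction must be between 0.0 and 1.0");
        }
        long fromFraction = (long) (fraction * totalJavaMemory);
        return Math.min(max, Math.max(min, fromFraction));
    }

    /** Number of whole 32 KiB buffers that fit into the given memory size. */
    static long numberOfBuffers(long networkBufferBytes) {
        return networkBufferBytes / SEGMENT_SIZE;
    }

    /** Assumed managed memory: memory.fraction applied to the heap minus network buffers. */
    static long managedMemoryBytes(long heapBytes, long networkBufferBytes, double memoryFraction) {
        return (long) ((heapBytes - networkBufferBytes) * memoryFraction);
    }

    public static void main(String[] args) {
        final long mb = 1024L * 1024L;
        long heap = 1500 * mb; // hypothetical TaskManager JVM size
        long withDefaults = networkBufferBytes(heap, 0.1, 64 * mb, 1024 * mb);
        long withCap = networkBufferBytes(heap, 0.1, 64 * mb, 80 * mb);
        System.out.println("defaults: " + withDefaults / mb + " MB"); // defaults: 150 MB
        System.out.println("max=80MB: " + withCap / mb + " MB, "
                + numberOfBuffers(withCap) + " buffers"); // max=80MB: 80 MB, 2560 buffers
    }
}
```

For a hypothetical 1500 MB JVM, the defaults give 150 MB of network buffer memory. Lowering `taskmanager.network.memory.max` to 80 MB, as done for the affected tests, caps this at 80 MB (2560 buffers of 32 KiB). The previous 64 MB correspond to 2048 such buffers. Clamping keeps small TaskManagers at a usable minimum while preventing large heaps from dedicating an unbounded share to network buffers.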