[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15999502#comment-15999502 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3721

> Flink automatically manages TM network buffer
> ---------------------------------------------
>
>                 Key: FLINK-4545
>                 URL: https://issues.apache.org/jira/browse/FLINK-4545
>             Project: Flink
>          Issue Type: Wish
>          Components: Network
>            Reporter: Zhenzhong Xu
>            Assignee: Nico Kruber
>            Priority: Critical
>             Fix For: 1.3.0
>
>
> Currently, the number of network buffers per task manager is preconfigured and
> the memory is pre-allocated through the taskmanager.network.numberOfBuffers
> config. In a job DAG with a shuffle phase, this number can go up very high,
> depending on the TM cluster size. The formula for calculating the buffer count
> is documented here
> (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers):
>
> #slots-per-TM^2 * #TMs * 4
>
> In a standalone deployment, we may need to control the task manager cluster
> size dynamically and then leverage the upcoming Flink feature to support
> scaling job parallelism/rescaling at runtime.
> If the buffer count config is static at runtime and cannot be changed without
> restarting the task manager process, this may add latency and complexity to the
> scaling process. I am wondering if there is already any discussion around
> whether the network buffer should be automatically managed by Flink, or whether
> Flink should at least expose some API to allow it to be reconfigured. Let me know
> if there is any existing JIRA that I should follow.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
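The rule quoted in the issue grows quadratically with the slots per TaskManager and linearly with the number of TaskManagers. That growth is why a static buffer count becomes awkward once the cluster is resized. The following is a small sketch of that arithmetic. It assumes a 32 KiB memory segment per buffer, which is an illustrative assumption because the segment size is configurable. The class and method names are hypothetical.

```java
/** Hypothetical sketch of the buffer-count rule quoted in FLINK-4545: #slots-per-TM^2 * #TMs * 4. */
public final class NetworkBufferEstimate {

    private NetworkBufferEstimate() {}

    /** Recommended number of network buffers per TaskManager according to the quoted formula. */
    public static long recommendedBuffers(int slotsPerTm, int numTms) {
        if (slotsPerTm <= 0 || numTms <= 0) {
            throw new IllegalArgumentException("Slots and TaskManagers must be positive");
        }
        // Widen to long first so large clusters cannot overflow int arithmetic.
        return (long) slotsPerTm * slotsPerTm * numTms * 4;
    }

    /** Memory occupied by the buffers, given the size of one buffer (memory segment) in bytes. */
    public static long bufferMemoryBytes(long numBuffers, int segmentSizeBytes) {
        return numBuffers * segmentSizeBytes;
    }

    public static void main(String[] args) {
        int segmentSize = 32 * 1024; // assumed segment size of 32 KiB
        for (int numTms : new int[] {10, 20, 40}) {
            long buffers = recommendedBuffers(8, numTms);
            System.out.println(numTms + " TMs: " + buffers + " buffers, "
                    + (bufferMemoryBytes(buffers, segmentSize) >> 20) + " MiB");
        }
    }
}
```

With 8 slots per TaskManager, doubling the cluster from 10 to 20 TaskManagers doubles the recommendation from 2560 to 5120 buffers. At the assumed 32 KiB per buffer, that is 80 MiB versus 160 MiB per TaskManager. Under a static configuration, that change would require a restart.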
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15999430#comment-15999430 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3721

Merging this.

I filed a follow-up JIRA to address the "configuration with units" to make sure all memory-related parameters behave the same way, without loss of byte precision where needed: https://issues.apache.org/jira/browse/FLINK-6469
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15998233#comment-15998233 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3721

Okay, taking a step back. Looking through the code some more, the internal arithmetic should certainly stay in bytes.

However, bytes are tedious to configure. I suggest adding support to the configuration for interpreting memory units, so that we can configure values such as

  - 512m
  - 10 kb
  - ...

I have started some utility here: https://github.com/StephanEwen/incubator-flink/tree/mem_size

That means that we would keep the PR in this form and add memory configuration parsing as a follow-up.
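The proposal is to keep all internal arithmetic in bytes and accept unit-suffixed strings such as "512m" or "10 kb" in the configuration. The following is a minimal sketch of such a parser. It assumes 1024-based multipliers and treats a bare number as bytes. The class `MemoryUnitParser` and its accepted suffixes are hypothetical. This is not the utility on the linked branch and not the eventual FLINK-6469 implementation.

```java
import java.util.Locale;

/**
 * Hypothetical sketch: parses strings such as "512m", "10 kb" or "4096" into bytes.
 * Assumes 1024-based units; a bare number is interpreted as bytes.
 */
public final class MemoryUnitParser {

    private MemoryUnitParser() {}

    public static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        // Split the leading digits from the (optional) unit suffix.
        int i = 0;
        while (i < s.length() && Character.isDigit(s.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new IllegalArgumentException("No number in memory size: '" + text + "'");
        }
        long value = Long.parseLong(s.substring(0, i));
        String unit = s.substring(i).trim();

        int shift;
        switch (unit) {
            case "": case "b": case "bytes":
                shift = 0; break;
            case "k": case "kb":
                shift = 10; break;
            case "m": case "mb":
                shift = 20; break;
            case "g": case "gb":
                shift = 30; break;
            default:
                throw new IllegalArgumentException("Unknown memory unit '" + unit + "' in '" + text + "'");
        }
        // Reject values that would overflow a long instead of letting the shift wrap around.
        if (value > (Long.MAX_VALUE >> shift)) {
            throw new IllegalArgumentException("Memory size too large: '" + text + "'");
        }
        return value << shift;
    }

    public static void main(String[] args) {
        System.out.println(parseBytes("512m"));  // 536870912
        System.out.println(parseBytes("10 kb")); // 10240
    }
}
```

Because the parser returns bytes, callers keep the byte-precise internal arithmetic Stephan asks for. Users, in turn, can write whichever unit is convenient.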
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15997337#comment-15997337 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3721

Code is good in general and well tested (including the shell scripts, which is great!). I would do some on-the-fly polishing while merging.

The main thing I want to adjust is having the configuration parameters specified in megabytes, not bytes. All other memory-related parameters are in megabytes, so this one should be as well, for consistency. Also, I don't think we need finer granularity these days.
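Configuring in megabytes while computing in bytes means converting at the boundary. The quoted config.sh does this with `lshift(${FLINK_TM_HEAP},20)`, which treats one "MB" as 2^20 bytes. The following is a minimal sketch of the same conversion in Java. It shows why the value should be widened to `long` before shifting: with `int`, anything from 2048 MB upwards wraps around. The class and method names are hypothetical.

```java
/**
 * Hypothetical helper: converts a megabyte-valued configuration entry
 * (such as taskmanager.heap.mb) into bytes for internal arithmetic.
 */
public final class MegabyteConversion {

    private MegabyteConversion() {}

    /** Widens to long before shifting; an int shift would overflow from 2048 MB upwards. */
    public static long megabytesToBytes(int megabytes) {
        if (megabytes < 0) {
            throw new IllegalArgumentException("Memory size must not be negative: " + megabytes);
        }
        return ((long) megabytes) << 20;
    }

    /** Deliberately broken variant, kept only to illustrate the overflow. */
    public static int megabytesToBytesIntOverflow(int megabytes) {
        return megabytes << 20;
    }

    public static void main(String[] args) {
        System.out.println(megabytesToBytes(2048));            // 2147483648
        System.out.println(megabytesToBytesIntOverflow(2048)); // -2147483648 (wrapped around)
    }
}
```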
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996721#comment-15996721 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3721

I am checking this PR out now...
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974988#comment-15974988 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112248789

--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -398,3 +428,106 @@ readSlaves() {
 useOffHeapMemory() {
     [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
 }
+
+HAVE_AWK=
+# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)
+calculateNetworkBuf() {
+    local network_buffers_bytes
+    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
+        exit 1
+    fi
+
+    if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
+        # fix memory size for network buffers
+        network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
+    else
+        if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
+            echo "Min must be less than or equal to max."
+            echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        # Bash only performs integer arithmetic so floating point computation is performed using awk
+        if [[ -z "${HAVE_AWK}" ]] ; then
+            command -v awk >/dev/null 2>&1
+            if [[ $? -ne 0 ]]; then
+                echo "[ERROR] Program 'awk' not found."
+                echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+                exit 1
+            fi
+            HAVE_AWK=true
+        fi
+
+        # We calculate the memory using a fraction of the total memory
+        if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
+            echo "It must be between 0.0 and 1.0."
+            echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
+    fi
+
+    # recalculate the JVM heap memory by taking the network buffers into account
--- End diff --

No, actually, the user may set the `FLINK_TM_HEAP` environment variable or configure the "Flink heap size" via `taskmanager.heap.mb`, but this is not the real "heap" size; rather, it is the overall memory size used by Flink (including off-heap). So this function removes the off-heap part and returns the real heap size to use with `-Xmx` and `-Xms`.
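The script comment says that the bash function mirrors `TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)`. The following is a standalone Java sketch of only the rules visible in the quoted bash function:

- A fixed size applies when min equals max.
- Otherwise, min must not exceed max and the fraction must lie strictly between 0.0 and 1.0.
- The result is the fraction of the total memory, clamped to [min, max].

The sketch also shows the heap subtraction NicoK describes. It does not use Flink's `Configuration`, and it is not the real method. `NetworkBufferSizing` and its parameters are hypothetical, and off-heap managed memory is ignored.

```java
/**
 * Hypothetical, standalone restatement of the sizing rules visible in the
 * quoted config.sh diff. Not Flink's TaskManagerServices implementation.
 */
public final class NetworkBufferSizing {

    private NetworkBufferSizing() {}

    /**
     * @param totalMemoryMb configured "heap" size in MB (like taskmanager.heap.mb), which per the
     *                      discussion is really the total memory used by Flink
     * @param fraction      fraction of total memory for network buffers, exclusive range (0.0, 1.0)
     * @param minBytes      lower bound for network buffer memory in bytes
     * @param maxBytes      upper bound for network buffer memory in bytes
     */
    public static long networkBufferBytes(long totalMemoryMb, double fraction, long minBytes, long maxBytes) {
        if (totalMemoryMb <= 0) {
            throw new IllegalArgumentException("Total memory size not set: " + totalMemoryMb);
        }
        if (minBytes == maxBytes) {
            // Fixed memory size for network buffers; the fraction is not consulted.
            return minBytes;
        }
        if (minBytes > maxBytes) {
            throw new IllegalArgumentException("Min (" + minBytes + ") must be <= max (" + maxBytes + ")");
        }
        if (!(fraction > 0.0 && fraction < 1.0)) {
            throw new IllegalArgumentException("Fraction must be between 0.0 and 1.0: " + fraction);
        }
        // Like awk's lshift(heap, 20) * fraction, rounded to the nearest byte, then clamped.
        long bytes = Math.round((totalMemoryMb << 20) * fraction);
        return Math.max(minBytes, Math.min(maxBytes, bytes));
    }

    /** JVM heap (-Xmx/-Xms) in MB after taking the network buffers out of the total. */
    public static long jvmHeapMb(long totalMemoryMb, long networkBufferBytes) {
        long heapMb = totalMemoryMb - (networkBufferBytes >> 20);
        if (heapMb <= 0) {
            throw new IllegalArgumentException("Network buffers (" + networkBufferBytes + " bytes) leave no heap memory");
        }
        return heapMb;
    }

    public static void main(String[] args) {
        long net = networkBufferBytes(1000, 0.1, 64L << 20, 1L << 30);
        System.out.println(net);                  // 104857600 (100 MiB)
        System.out.println(jvmHeapMb(1000, net)); // 900
    }
}
```

The sketch keeps the clamp after rounding. Because min and max are whole byte counts, this ordering gives the same result as the script's clamp-then-`printf "%.0f"` order.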
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974981#comment-15974981 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112247760

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---
@@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
         } catch (Throwable t) {
             if (LOG.isErrorEnabled()) {
-                LOG.error("Unexpected problen while getting the file statistics for file '" + this.filePath + "': "
+                LOG.error("Unexpected problem while getting the file statistics for file '" + this.filePath + "': "
--- End diff --

You can leave it as it is.
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974977#comment-15974977 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112246946

--- Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java ---
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit test that verifies that the task manager heap size calculation used by the bash script
+ * taskmanager.sh returns the same values as the heap size calculation of
+ * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
+ *
+ * NOTE: the shell script uses awk to perform floating-point arithmetic which uses
+ * double precision but our Java code restrains to float because we actually do
+ * not need high precision.
+ */
+public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
+
+    /** Key that is used by config.sh. */
+    private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
+
+    /**
+     * Number of tests with random values.
+     *
+     * NOTE: calling the external test script is slow and thus low numbers are preferred for general
+     * testing.
+     */
+    private static final int NUM_RANDOM_TESTS = 20;
+
+    @Before
+    public void checkOperatingSystem() {
+        Assume.assumeTrue("This test checks shell scripts not available on Windows.",
+            !OperatingSystem.isWindows());
+    }
+
+    /**
+     * Tests that {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} has the same
+     * result as the shell script.
+     */
+    @Test
+    public void compareNetworkBufShellScriptWithJava() throws Exception {
+        int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
+        float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
+
+        // manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
+
+        compareNetworkBufJavaVsScript(
+            getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
+
+        compareNetworkBufJavaVsScript(
+            getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
+
+        compareNetworkBufJavaVsScript(
+            getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
+
+        // some automated tests with random (but valid) values
+
+        Random ran = new Random();
+        for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
+            // tolerate that values differ by 1% (due to different floating point precisions)
+            compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f);
--- End diff --

oh, here, I actually do already print the
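The test above compares the Java result with the script's result at two tolerances. The hand-picked configurations use exact equality (`0.0f`). The random configurations allow a 1% difference (`0.01f`), because awk computes in double precision while the Java side uses float. The body of `compareNetworkBufJavaVsScript` is not part of the quoted hunk. The following is therefore a hypothetical sketch of such a relative-tolerance check, not the PR's implementation.

```java
/** Hypothetical helper for comparing two computed memory sizes with a relative tolerance. */
public final class RelativeTolerance {

    private RelativeTolerance() {}

    /**
     * Returns true if {@code actual} lies within {@code tolerance} (e.g. 0.01f for 1%) of
     * {@code expected}; a tolerance of 0.0f requires exact equality.
     */
    public static boolean withinTolerance(long expected, long actual, float tolerance) {
        if (tolerance < 0.0f) {
            throw new IllegalArgumentException("Tolerance must not be negative: " + tolerance);
        }
        // Compare in double to avoid overflow when subtracting large byte counts.
        double allowed = Math.abs((double) expected) * tolerance;
        return Math.abs((double) actual - (double) expected) <= allowed;
    }

    public static void main(String[] args) {
        // A float-based and a double-based computation of the same value may differ slightly.
        System.out.println(withinTolerance(104857600L, 104857604L, 0.01f)); // true
        System.out.println(withinTolerance(104857600L, 104857604L, 0.0f));  // false
    }
}
```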
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974974#comment-15974974 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112246505

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---
@@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
         } catch (Throwable t) {
             if (LOG.isErrorEnabled()) {
-                LOG.error("Unexpected problen while getting the file statistics for file '" + this.filePath + "': "
+                LOG.error("Unexpected problem while getting the file statistics for file '" + this.filePath + "': "
--- End diff --

Sorry, should I create a separate PR? (A separate JIRA is definitely overkill for this.)
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974970#comment-15974970 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112246105

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link TaskManagerServices}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentInformation.class)
+public class TaskManagerServicesTest {
+
+    /**
+     * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
+     * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+     */
+    @SuppressWarnings("deprecation")
+    @Test
+    public void calculateNetworkBufOld() throws Exception {
+        Configuration config = new Configuration();
+        config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+        // note: actual network buffer memory size is independent of the totalJavaMemorySize
+        assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+            TaskManagerServices.calculateNetworkBuf(10L << 20, config));
+        assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+            TaskManagerServices.calculateNetworkBuf(64L << 20, config));
+
+        // test integer overflow in the memory size
+        int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
+        config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
+        assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
+    }
+
+    /**
+     * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
+     * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+     * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+     * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+     */
+    @Test
+    public void calculateNetworkBufNew() throws Exception {
+        Configuration config = new Configuration();
+
+        // (1) defaults
+        final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+        final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+        final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+        assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
+            TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
+
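The `calculateNetworkBufOld` test above pins down two properties of the deprecated `taskmanager.network.numberOfBuffers` path. First, the result is the buffer count times the segment size, regardless of the total memory passed in. Second, that product must be computed in `long`, because 2^33 bytes worth of buffers no longer fits in an `int`. The following is a minimal sketch of just that arithmetic. It assumes a 32 KiB segment size, whereas the test reads the actual default from `TaskManagerOptions.MEMORY_SEGMENT_SIZE`. The class and method names are hypothetical.

```java
/** Hypothetical sketch of the legacy "number of buffers" sizing checked by calculateNetworkBufOld. */
public final class LegacyNetworkBufferSizing {

    private LegacyNetworkBufferSizing() {}

    /**
     * Network buffer memory for an explicitly configured buffer count. The TaskManager's total
     * memory does not enter the calculation, matching the note in the test.
     */
    public static long networkBufferBytes(int numBuffers, int segmentSizeBytes) {
        if (numBuffers < 0 || segmentSizeBytes <= 0) {
            throw new IllegalArgumentException("Invalid buffer configuration");
        }
        // Widen before multiplying: int arithmetic would silently wrap around.
        return (long) numBuffers * segmentSizeBytes;
    }

    public static void main(String[] args) {
        int segmentSize = 32 * 1024;                       // assumed segment size (32 KiB)
        int numBuffers = (int) ((2L << 32) / segmentSize); // buffers worth 2^33 bytes: 262144
        System.out.println(networkBufferBytes(numBuffers, segmentSize)); // 8589934592 (2^33)
        System.out.println(numBuffers * segmentSize);                    // 0: the int product overflows
    }
}
```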
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974950#comment-15974950 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112242842

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---
@@ -0,0 +1,272 @@
...
+        assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
+            TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
+
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974951#comment-15974951 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112242996

--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -398,3 +428,106 @@ readSlaves() {
 useOffHeapMemory() {
     [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
 }
+
+HAVE_AWK=
+# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)
+calculateNetworkBuf() {
+    local network_buffers_bytes
+    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
+        exit 1
+    fi
+
+    if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
+        # fix memory size for network buffers
+        network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
+    else
+        if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
+            echo "Min must be less than or equal to max."
+            echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        # Bash only performs integer arithmetic so floating point computation is performed using awk
+        if [[ -z "${HAVE_AWK}" ]] ; then
+            command -v awk >/dev/null 2>&1
+            if [[ $? -ne 0 ]]; then
+                echo "[ERROR] Program 'awk' not found."
+                echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+                exit 1
+            fi
+            HAVE_AWK=true
+        fi
+
+        # We calculate the memory using a fraction of the total memory
+        if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
+            echo "It must be between 0.0 and 1.0."
+            echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
+    fi
+
+    # recalculate the JVM heap memory by taking the network buffers into account
--- End diff --

To me, "recalculate" implied that it would change some configuration value, but that's not happening. It's only verifying that the memory for network buffers is less than the heap memory.
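The `calculateNetworkBuf()` function in `config.sh` converts the heap size from MB to bytes (`lshift(${FLINK_TM_HEAP},20)`), multiplies it by the configured fraction, and clamps the result to the configured min/max; if min and max are equal, that fixed size is used without calling awk. A rough Java transliteration for illustration only: the class and method names are hypothetical, and `Math.rint` is an assumption about how closely awk's `printf "%.0f"` rounding can be approximated.

```java
/** Hypothetical Java transliteration of the network buffer sizing in config.sh (illustration only). */
class ConfigShNetworkBufSketch {

    /**
     * @param heapMb   TaskManager heap size in MB (FLINK_TM_HEAP)
     * @param fraction fraction of the heap to use for network buffers
     * @param minBytes lower bound in bytes
     * @param maxBytes upper bound in bytes
     */
    static long networkBufferBytes(long heapMb, double fraction, long minBytes, long maxBytes) {
        if (minBytes == maxBytes) {
            return minBytes; // fixed memory size for network buffers, no awk needed
        }
        // lshift(heap, 20) converts MB to bytes; awk then computes in double precision
        double x = (double) (heapMb << 20) * fraction;
        double netbuf = x > maxBytes ? maxBytes : (x < minBytes ? minBytes : x);
        // printf "%.0f" rounds to the nearest integer; Math.rint (ties to even) approximates that
        return (long) Math.rint(netbuf);
    }

    public static void main(String[] args) {
        System.out.println(networkBufferBytes(1000, 0.1, 64L << 20, 1L << 30)); // 104857600 (100 MiB)
        System.out.println(networkBufferBytes(100, 0.1, 64L << 20, 1L << 30));  // 67108864 (clamped to min)
    }
}
```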
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974938#comment-15974938 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112239687

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link TaskManagerServices}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentInformation.class)
+public class TaskManagerServicesTest {
+
+    /**
+     * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
+     * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+     */
+    @SuppressWarnings("deprecation")
+    @Test
+    public void calculateNetworkBufOld() throws Exception {
+        Configuration config = new Configuration();
+        config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+        // note: actual network buffer memory size is independent of the totalJavaMemorySize
+        assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+            TaskManagerServices.calculateNetworkBuf(10L << 20, config));
+        assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+            TaskManagerServices.calculateNetworkBuf(64L << 20, config));
+
+        // test integer overflow in the memory size
+        int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
+        config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
+        assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
+    }
+
+    /**
+     * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
+     * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+     * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+     * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+     */
+    @Test
+    public void calculateNetworkBufNew() throws Exception {
+        Configuration config = new Configuration();
+
+        // (1) defaults
+        final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+        final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+        final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+        assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
+            TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
+
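In the quoted test, the second argument is written `(64L << 20 + 1)`. In Java, additive operators bind tighter than shift operators, so this evaluates to `64L << 21` (128 MiB), not 64 MiB plus one byte. Whether that was intended is not clear from the diff; this small sketch (hypothetical class name) only shows the difference:

```java
/** Hypothetical sketch: '+' binds tighter than '<<' in Java. */
class ShiftPrecedenceSketch {

    static long withoutParentheses() {
        return 64L << 20 + 1; // parsed as 64L << (20 + 1), i.e. 128 MiB
    }

    static long withParentheses() {
        return (64L << 20) + 1; // 64 MiB plus one byte
    }

    public static void main(String[] args) {
        System.out.println(withoutParentheses()); // 134217728
        System.out.println(withParentheses());    // 67108865
    }
}
```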
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974927#comment-15974927 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/3721

For the two tests that failed on Travis CI: they were simply killed, and "`Killed`" appeared in their logs, which usually indicates that the machine ran out of memory and the kernel killed a process.
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974920#comment-15974920 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112236022

--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -398,3 +428,106 @@ readSlaves() {
 useOffHeapMemory() {
     [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
 }
+
+HAVE_AWK=
+# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)
+calculateNetworkBuf() {
+    local network_buffers_bytes
+    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
+        exit 1
+    fi
+
+    if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
+        # fix memory size for network buffers
+        network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
+    else
+        if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
+            echo "Min must be less than or equal to max."
+            echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        # Bash only performs integer arithmetic so floating point computation is performed using awk
+        if [[ -z "${HAVE_AWK}" ]] ; then
+            command -v awk >/dev/null 2>&1
+            if [[ $? -ne 0 ]]; then
+                echo "[ERROR] Program 'awk' not found."
+                echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+                exit 1
+            fi
+            HAVE_AWK=true
+        fi
+
+        # We calculate the memory using a fraction of the total memory
+        if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
+            echo "It must be between 0.0 and 1.0."
+            echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
+    fi
+
+    # recalculate the JVM heap memory by taking the network buffers into account
--- End diff --

What do you mean?
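Before computing anything, the quoted script rejects a min larger than max and a fraction outside the open interval (0.0, 1.0); both checks are only reached when min and max differ. A hypothetical Java counterpart of those two checks (class and method names are illustrative, not Flink API), returning the script's error text when a check fails:

```java
import java.util.Optional;

/** Hypothetical Java counterpart of the two input checks in config.sh's calculateNetworkBuf(). */
class NetworkBufConfigCheckSketch {

    /** Returns the script's error text if a check fails, or empty if the settings are accepted. */
    static Optional<String> validate(double fraction, long minBytes, long maxBytes) {
        if (minBytes == maxBytes) {
            return Optional.empty(); // fixed size: the fraction is not used at all
        }
        if (minBytes > maxBytes) {
            return Optional.of("Min must be less than or equal to max.");
        }
        // the awk test is ($1 > 0.0 && $1 < 1.0), so both 0.0 and 1.0 are rejected
        if (!(fraction > 0.0 && fraction < 1.0)) {
            return Optional.of("It must be between 0.0 and 1.0.");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(validate(0.1, 64L << 20, 1L << 30)); // Optional.empty
        System.out.println(validate(1.0, 64L << 20, 1L << 30)); // Optional[It must be between 0.0 and 1.0.]
    }
}
```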
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973040#comment-15973040 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112001266

--- Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java ---
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit test that verifies that the task manager heap size calculation used by the bash script
+ * taskmanager.sh returns the same values as the heap size calculation of
+ * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
+ *
+ * NOTE: the shell script uses awk to perform floating-point arithmetic which uses
+ * double precision but our Java code restrains to float because we actually do
+ * not need high precision.
+ */
+public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
+
+    /** Key that is used by config.sh. */
+    private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
+
+    /**
+     * Number of tests with random values.
+     *
+     * NOTE: calling the external test script is slow and thus low numbers are preferred for general
+     * testing.
+     */
+    private static final int NUM_RANDOM_TESTS = 20;
+
+    @Before
+    public void checkOperatingSystem() {
+        Assume.assumeTrue("This test checks shell scripts not available on Windows.",
--- End diff --

missing "that are"?
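The class comment quoted above notes that awk computes in double precision while the Java code uses `float`. A small sketch of why the two can disagree by a few bytes (hypothetical class and method names): in `float * long`, the `long` is first converted to `float`, which has only 24 significand bits, so both the input and the product get rounded.

```java
/** Hypothetical sketch: the same fraction computed in float (as in the Java code) and in double (as in awk). */
class FloatVsDoubleSketch {

    static long fractionWithFloat(long totalBytes, float fraction) {
        return (long) (fraction * totalBytes); // totalBytes is converted to float before multiplying
    }

    static long fractionWithDouble(long totalBytes, double fraction) {
        return (long) (fraction * totalBytes); // totalBytes converts to double exactly below 2^53
    }

    public static void main(String[] args) {
        long total = 1_234_567_890L;
        System.out.println(fractionWithFloat(total, 0.1f)); // 123456792
        System.out.println(fractionWithDouble(total, 0.1)); // 123456789
    }
}
```

The difference is tiny relative to the result, which is why a small relative tolerance is enough when comparing the two implementations.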
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973044#comment-15973044 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112002748

--- Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java ---
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit test that verifies that the task manager heap size calculation used by the bash script
+ * taskmanager.sh returns the same values as the heap size calculation of
+ * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
+ *
+ * NOTE: the shell script uses awk to perform floating-point arithmetic which uses
+ * double precision but our Java code restrains to float because we actually do
+ * not need high precision.
+ */
+public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
+
+    /** Key that is used by config.sh. */
+    private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
+
+    /**
+     * Number of tests with random values.
+     *
+     * NOTE: calling the external test script is slow and thus low numbers are preferred for general
+     * testing.
+     */
+    private static final int NUM_RANDOM_TESTS = 20;
+
+    @Before
+    public void checkOperatingSystem() {
+        Assume.assumeTrue("This test checks shell scripts not available on Windows.",
+            !OperatingSystem.isWindows());
+    }
+
+    /**
+     * Tests that {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} has the same
+     * result as the shell script.
+     */
+    @Test
+    public void compareNetworkBufShellScriptWithJava() throws Exception {
+        int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
+        float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
+
+        // manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
+
+        compareNetworkBufJavaVsScript(
+            getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
+
+        compareNetworkBufJavaVsScript(
+            getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
+
+        compareNetworkBufJavaVsScript(
+            getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
+
+        // some automated tests with random (but valid) values
+
+        Random ran = new Random();
+        for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
+            // tolerate that values differ by 1% (due to different floating point precisions)
+            compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f);
--- End diff --

As with the other randomized tests we should print
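The quoted test passes a tolerance of `0.0f` for the manual cases and `0.01f` ("values differ by 1%") for the random ones. The body of `compareNetworkBufJavaVsScript` is not shown here, so the following is only a hypothetical sketch of a relative-tolerance check of that kind, not the test's actual implementation:

```java
/** Hypothetical helper: accept an actual value within a relative tolerance of the expected one. */
class RelativeToleranceSketch {

    /** A tolerance of 0.0 requires an exact match; 0.01 allows a deviation of up to 1% of expected. */
    static boolean withinTolerance(long expected, long actual, double tolerance) {
        return Math.abs(actual - expected) <= tolerance * expected;
    }

    public static void main(String[] args) {
        System.out.println(withinTolerance(104857600L, 104857600L, 0.0));  // true
        System.out.println(withinTolerance(104857600L, 105000000L, 0.01)); // true
        System.out.println(withinTolerance(104857600L, 110000000L, 0.01)); // false
    }
}
```

Comparing the absolute difference against `tolerance * expected` in `double` avoids truncating the bounds to `long`, which could otherwise reject values right at the edge of the allowed range.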
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973043#comment-15973043 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r112000165

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link TaskManagerServices}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentInformation.class)
+public class TaskManagerServicesTest {
+
+    /**
+     * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
+     * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+     */
+    @SuppressWarnings("deprecation")
+    @Test
+    public void calculateNetworkBufOld() throws Exception {
+        Configuration config = new Configuration();
+        config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+        // note: actual network buffer memory size is independent of the totalJavaMemorySize
+        assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+            TaskManagerServices.calculateNetworkBuf(10L << 20, config));
+        assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+            TaskManagerServices.calculateNetworkBuf(64L << 20, config));
+
+        // test integer overflow in the memory size
+        int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
+        config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
+        assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
+    }
+
+    /**
+     * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
+     * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+     * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+     * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+     */
+    @Test
+    public void calculateNetworkBufNew() throws Exception {
+        Configuration config = new Configuration();
+
+        // (1) defaults
+        final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+        final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+        final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+        assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
+            TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
+
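The expected value in `calculateNetworkBufNew` follows a clamp-to-bounds pattern, `min(max, max(min, (long) (fraction * totalMemory)))`, with the fraction held as a `float`. A small sketch, assuming a fraction of 0.1 and bounds of 64 MiB and 1 GiB (the same values the manual cases in `TaskManagerHeapSizeCalculationJavaBashTest` pass to `getConfig`; whether they equal the option defaults is an assumption here). The class name is hypothetical. For small heaps the lower bound dominates, and for very large ones the upper bound does:

```java
/** Hypothetical sketch of the expected-value expression used in calculateNetworkBufNew(). */
class NetworkBufBoundsSketch {

    static long boundedNetworkBytes(long totalJavaMemoryBytes, float fraction, long minBytes, long maxBytes) {
        // float * long is evaluated in float precision; the (long) cast truncates
        return Math.min(maxBytes, Math.max(minBytes, (long) (fraction * totalJavaMemoryBytes)));
    }

    public static void main(String[] args) {
        long min = 64L << 20; // assumed lower bound: 64 MiB
        long max = 1L << 30;  // assumed upper bound: 1 GiB
        System.out.println(boundedNetworkBytes(10L << 20, 0.1f, min, max));   // 67108864 (min wins)
        System.out.println(boundedNetworkBytes(1000L << 20, 0.1f, min, max)); // 104857600
        System.out.println(boundedNetworkBytes(100L << 30, 0.1f, min, max));  // 1073741824 (max wins)
    }
}
```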
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973041#comment-15973041 ]

ASF GitHub Bot commented on FLINK-4545:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3721#discussion_r111975142

--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -398,3 +428,106 @@ readSlaves() {
 useOffHeapMemory() {
     [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
 }
+
+HAVE_AWK=
+# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)
+calculateNetworkBuf() {
+    local network_buffers_bytes
+    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
+        exit 1
+    fi
+
+    if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
+        # fix memory size for network buffers
+        network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
+    else
+        if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
+            echo "Min must be less than or equal to max."
+            echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        # Bash only performs integer arithmetic so floating point computation is performed using awk
+        if [[ -z "${HAVE_AWK}" ]] ; then
+            command -v awk >/dev/null 2>&1
+            if [[ $? -ne 0 ]]; then
+                echo "[ERROR] Program 'awk' not found."
+                echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+                exit 1
+            fi
+            HAVE_AWK=true
+        fi
+
+        # We calculate the memory using a fraction of the total memory
+        if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
+            echo "It must be between 0.0 and 1.0."
+            echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
+    fi
+
+    # recalculate the JVM heap memory by taking the network buffers into account
--- End diff --

This is more of a verification, isn't it?
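In this thread, zentol notes that the code after the `# recalculate ...` comment only verifies that the network buffer memory is less than the heap memory rather than changing any configuration value. The rest of the script is not quoted here, so the following is only a hypothetical Java sketch of such a check (names and message are illustrative), not the actual `config.sh` code:

```java
/** Hypothetical sketch of a "network buffers must fit into the heap" verification. */
class NetworkBufHeapCheckSketch {

    /** Throws if the network buffer memory is not strictly smaller than the heap (heap given in MB). */
    static void checkFitsIntoHeap(long networkBufferBytes, long heapMb) {
        long heapBytes = heapMb << 20;
        if (networkBufferBytes >= heapBytes) {
            throw new IllegalArgumentException("Network buffer memory (" + networkBufferBytes
                + " bytes) must be less than the heap size (" + heapBytes + " bytes).");
        }
    }

    public static void main(String[] args) {
        checkFitsIntoHeap(64L << 20, 1000); // passes: 64 MiB < 1000 MiB
        try {
            checkFitsIntoHeap(64L << 20, 64); // fails: 64 MiB is not less than 64 MiB
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```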
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973042#comment-15973042 ] ASF GitHub Bot commented on FLINK-4545: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r112003041 --- Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java --- @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.dist; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.taskexecutor.TaskManagerServices; +import org.apache.flink.util.OperatingSystem; +import org.apache.flink.util.TestLogger; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Random; + +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** + * Unit test that verifies that the task manager heap size calculation used by the bash script + * taskmanager.sh returns the same values as the heap size calculation of + * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}. + * + * NOTE: the shell script uses awk to perform floating-point arithmetic which uses + * double precision but our Java code restrains to float because we actually do + * not need high precision. + */ +public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger { + + /** Key that is used by config.sh. */ + private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb"; + + /** +* Number of tests with random values. +* +* NOTE: calling the external test script is slow and thus low numbers are preferred for general +* testing. +*/ + private static final int NUM_RANDOM_TESTS = 20; + + @Before + public void checkOperatingSystem() { + Assume.assumeTrue("This test checks shell scripts not available on Windows.", + !OperatingSystem.isWindows()); + } + + /** +* Tests that {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} has the same +* result as the shell script. 
+*/ + @Test + public void compareNetworkBufShellScriptWithJava() throws Exception { + int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue(); + float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(); + + // manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB() + + compareNetworkBufJavaVsScript( + getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f); + + compareNetworkBufJavaVsScript( + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f); + + compareNetworkBufJavaVsScript( + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f); + + // some automated tests with random (but valid) values + + Random ran = new Random(); + for (int i = 0; i < NUM_RANDOM_TESTS; ++i) { + // tolerate that values differ by 1% (due to different floating point precisions) + compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f); + } + } + + /** +* Tests that {@link
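The NOTE in the test above explains why the Java-vs-bash comparison tolerates a 1% difference: awk computes in double precision while the Java code restrains itself to float. The sketch below reproduces only that precision effect and a relative-tolerance check; the class, method names, and the byte count are hypothetical and do not mirror the test's actual helper `compareNetworkBufJavaVsScript`.

```java
/**
 * Hypothetical sketch: the same fraction * bytes product in float vs. double precision,
 * and a relative-tolerance comparison like the 1% slack used in the test.
 */
public final class PrecisionComparison {

    private PrecisionComparison() {}

    /** Product in float: the long operand is converted to float before multiplying. */
    static long withFloat(long totalBytes, float fraction) {
        return (long) (fraction * totalBytes);
    }

    /** The same product in double precision, as awk computes it. */
    static long withDouble(long totalBytes, double fraction) {
        return (long) (fraction * totalBytes);
    }

    /** True if actual lies within relTol * |expected| of expected. */
    static boolean withinRelativeTolerance(long expected, long actual, double relTol) {
        return Math.abs((double) expected - (double) actual) <= relTol * Math.abs((double) expected);
    }

    public static void main(String[] args) {
        long total = 3_000_000_123L; // arbitrary byte count, large enough for float rounding to show
        long f = withFloat(total, 0.1f);
        long d = withDouble(total, 0.1);
        System.out.println(f + " vs " + d + " (difference " + (f - d) + ")");
        System.out.println("within 1%: " + withinRelativeTolerance(d, f, 0.01));
    }
}
```

An exact equality check would fail here even though both values are "right" for their precision, which is why only the hand-picked manual cases use a tolerance of 0.0f.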
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973037#comment-15973037 ] ASF GitHub Bot commented on FLINK-4545: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r111992933 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.net.InetAddress; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Unit test for {@link TaskManagerServices}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(EnvironmentInformation.class) +public class TaskManagerServicesTest { + + /** +* Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old +* configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}. 
+*/ + @SuppressWarnings("deprecation") + @Test + public void calculateNetworkBufOld() throws Exception { + Configuration config = new Configuration(); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1); + + // note: actual network buffer memory size is independent of the totalJavaMemorySize + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(), + TaskManagerServices.calculateNetworkBuf(10L << 20, config)); + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(), + TaskManagerServices.calculateNetworkBuf(64L << 20, config)); + + // test integer overflow in the memory size + int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33 + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers); + assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config)); + } + + /** +* Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new +* configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}, +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}. +*/ + @Test + public void calculateNetworkBufNew() throws Exception { + Configuration config = new Configuration(); + + // (1) defaults + final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue(); + final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue(); + final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue(); + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20, + TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config)); +
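The "test integer overflow in the memory size" case above targets the legacy path, where a buffer count is multiplied by the memory segment size. If that product is computed in `int`, 2^33 bytes wraps around silently. A minimal sketch of the pitfall, assuming a 32 KiB segment size (the default at the time; treat it as an assumption here); `LegacyBufferMemory` and its methods are hypothetical names.

```java
/**
 * Hypothetical sketch of the overflow exercised by calculateNetworkBufOld:
 * numBuffers * segmentSize must be computed in 64-bit arithmetic.
 */
public final class LegacyBufferMemory {

    /** Assumed memory segment size in bytes (32 KiB). */
    static final int SEGMENT_SIZE = 32 * 1024;

    private LegacyBufferMemory() {}

    /** int * int: silently wraps once the product exceeds Integer.MAX_VALUE. */
    static int bytesAsInt(int numBuffers, int segmentSize) {
        return numBuffers * segmentSize;
    }

    /** Widen one operand first so the multiplication happens in long. */
    static long bytesAsLong(int numBuffers, int segmentSize) {
        return (long) numBuffers * segmentSize;
    }

    public static void main(String[] args) {
        // Same expression as the test: enough buffers to fill 2^33 bytes.
        int numBuffers = (int) ((2L << 32) / SEGMENT_SIZE);
        System.out.println("int:  " + bytesAsInt(numBuffers, SEGMENT_SIZE));
        System.out.println("long: " + bytesAsLong(numBuffers, SEGMENT_SIZE));
    }
}
```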
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973035#comment-15973035 ] ASF GitHub Bot commented on FLINK-4545: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r111994207 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java --- @@ -376,6 +392,169 @@ private static NetworkEnvironment createNetworkEnvironment( } /** +* Calculates the amount of memory used for network buffers based on the total memory to use and +* the according configuration parameters. +* +* The following configuration parameters are involved: +* +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}, +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}, +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and +* {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist) +* . +* +* @param totalJavaMemorySize +* overall available memory to use (heap and off-heap, in bytes) +* @param config +* configuration object +* +* @return memory to use for network buffers (in bytes) +*/ + public static long calculateNetworkBuf(long totalJavaMemorySize, Configuration config) { --- End diff -- how about a slightly longer method name?
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973038#comment-15973038 ] ASF GitHub Bot commented on FLINK-4545: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r112000380 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java --- @@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc } catch (Throwable t) { if (LOG.isErrorEnabled()) { - LOG.error("Unexpected problen while getting the file statistics for file '" + this.filePath + "': " + LOG.error("Unexpected problem while getting the file statistics for file '" + this.filePath + "': " --- End diff -- Let's see whether I can remember to not squash this commit :)
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973039#comment-15973039 ] ASF GitHub Bot commented on FLINK-4545: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r111983754 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java --- @@ -410,24 +410,47 @@ private static NetworkEnvironment createNetworkEnvironment( * * @return memory to use for network buffers (in bytes) */ + @SuppressWarnings("deprecation") public static long calculateNetworkBuf(long totalJavaMemorySize, Configuration config) { + assert totalJavaMemorySize > 0; --- End diff -- use Preconditions instead?
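The point behind "use Preconditions instead?": a Java `assert` is only evaluated when the JVM runs with `-ea`, so with default flags a non-positive `totalJavaMemorySize` would pass through unnoticed, whereas an explicit argument check always runs. The sketch uses a local `checkArgument` stand-in written in the spirit of Flink's `Preconditions.checkArgument`; it is not the library method, and the class and other method names are hypothetical.

```java
/**
 * Hypothetical sketch contrasting an assert with an always-on argument check.
 */
public final class ArgumentChecks {

    private ArgumentChecks() {}

    /** Local stand-in for a precondition helper: throws if the condition is false. */
    static void checkArgument(boolean condition, Object errorMessage) {
        if (!condition) {
            throw new IllegalArgumentException(String.valueOf(errorMessage));
        }
    }

    /** Only rejects bad input when assertions are enabled (-ea); otherwise returns it unchanged. */
    static long withAssert(long totalJavaMemorySize) {
        assert totalJavaMemorySize > 0;
        return totalJavaMemorySize;
    }

    /** Rejects bad input regardless of JVM flags. */
    static long withCheck(long totalJavaMemorySize) {
        checkArgument(totalJavaMemorySize > 0, "totalJavaMemorySize must be positive");
        return totalJavaMemorySize;
    }

    public static void main(String[] args) {
        try {
            withCheck(-1);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```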
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973036#comment-15973036 ] ASF GitHub Bot commented on FLINK-4545: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r111997260 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.net.InetAddress; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Unit test for {@link TaskManagerServices}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(EnvironmentInformation.class) +public class TaskManagerServicesTest { + + /** +* Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old +* configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}. 
+*/ + @SuppressWarnings("deprecation") + @Test + public void calculateNetworkBufOld() throws Exception { + Configuration config = new Configuration(); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1); + + // note: actual network buffer memory size is independent of the totalJavaMemorySize + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(), + TaskManagerServices.calculateNetworkBuf(10L << 20, config)); + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(), + TaskManagerServices.calculateNetworkBuf(64L << 20, config)); + + // test integer overflow in the memory size + int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33 + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers); + assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config)); + } + + /** +* Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new +* configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}, +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}. +*/ + @Test + public void calculateNetworkBufNew() throws Exception { + Configuration config = new Configuration(); + + // (1) defaults + final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue(); + final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue(); + final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue(); + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20, + TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config)); +
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972803#comment-15972803 ] ASF GitHub Bot commented on FLINK-4545: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r111969982 --- Diff: docs/setup/config.md --- @@ -602,26 +612,66 @@ You have to configure `jobmanager.archive.fs.dir` in order to archive terminated ## Background + ### Configuring the Network Buffers -If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, please use the following formula to adjust the number of network buffers: +If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, you --- End diff -- AFAIK, those things only happen if you leave an empty space at the end of the line or so; here, the lines are properly joined inside the HTML.
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972771#comment-15972771 ] ASF GitHub Bot commented on FLINK-4545: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r111966911 --- Diff: docs/setup/config.md --- @@ -602,26 +612,66 @@ You have to configure `jobmanager.archive.fs.dir` in order to archive terminated ## Background + ### Configuring the Network Buffers -If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, please use the following formula to adjust the number of network buffers: +If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, you --- End diff -- We usually don't do manual line breaks in the documentation; otherwise if you resize the window funky things start to happen.
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972538#comment-15972538 ] ASF GitHub Bot commented on FLINK-4545: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3721 I've merged the 2 PRs that this one builds upon; could you rebase this one?
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967540#comment-15967540 ] ASF GitHub Bot commented on FLINK-4545: --- GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/3721 [FLINK-4545] replace the network buffers parameter (based on #3708 and #3713) Instead, allow the configuration with the following three new (more flexible) parameters: * `taskmanager.network.memory.fraction`: fraction of JVM memory to use for network buffers (default: 0.1) * `taskmanager.network.memory.min`: minimum memory size for network buffers (default: 64 MB) * `taskmanager.network.memory.max`: maximum memory size for network buffers (default: 1 GB) Note that I needed to adapt two unit tests which would have been killed on Travis CI because these defaults result in ~150MB memory being used for network buffers which apparently was too much there. You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-4545 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3721.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3721 commit e61f7bc4debce332c421cb645ff1025b4d03d8d0 Author: Nico Kruber Date: 2017-04-11T09:26:29Z [FLINK-6292] fix transfer.sh upload by using https Seems the upload via http is not supported anymore. commit 362ceec0823b179719449d0ed244c591dfcf51f4 Author: Nico Kruber Date: 2017-04-12T09:09:03Z [FLINK-6299] make all IT cases extend from TestLogger This way, currently executed tests and their failures are properly logged.
commit 973099ef55701fe63951639d37b4f01765b06a01 Author: Nico Kruber Date: 2017-04-06T12:41:52Z [FLINK-4545] replace the network buffers parameter Instead, allow the configuration with the following three new (more flexible) parameters: * "taskmanager.network.memory.fraction": fraction of JVM memory to use for network buffers (default: 0.1) * "taskmanager.network.memory.min": minimum memory size for network buffers (default: 64 MB) * "taskmanager.network.memory.max": maximum memory size for network buffers (default: 1 GB) commit 09a981189b59ac13bd39000cc77913c0b03289fd Author: Nico Kruber Date: 2017-04-11T12:20:40Z [hotfix] fix typo in error message commit 0960a809c8da51b9787f3f726945716933051fc3 Author: Nico Kruber Date: 2017-04-11T13:29:41Z [hotfix] fix typo in taskmanager.sh usage string commit 298bb69451a1405df774451de11eb5684534c956 Author: Nico Kruber Date: 2017-04-06T15:58:14Z [FLINK-4545] adapt taskmanager.sh to take network buffers memory into account commit ea2fb24f4a6eb18cc3f8d3ebd83a49c0f1386a8a Author: Nico Kruber Date: 2017-04-10T09:43:50Z [FLINK-4545] add configuration checks for the new network buffer memory config commit 5133d250c4dba4a5e72baad95c841d2b03cb49ea Author: Nico Kruber Date: 2017-04-10T16:22:10Z [FLINK-4545] add unit tests using the new network configuration parameters and methods commit a24a548e6ff7e36581f7f7457099656362ca3974 Author: Nico Kruber Date: 2017-04-11T16:52:56Z [FLINK-4545] add unit tests for heap size calculation in shell scripts These verify that the results are the same as in the calculation done by Java. commit d55153d559bf110a931b5de849df812038ba4a7a Author: Nico Kruber Date: 2017-04-12T16:11:37Z [FLINK-4545] update the docs with the changed network buffer parameter Also update the descriptions of taskmanager.memory.fraction not being relative to the full size of taskmanager.heap.mb but that network buffer memory is subtracted before!
commit c48beb0d67e8ef847ef845835e342d4a49127e7d Author: Nico Kruber Date: 2017-04-12T16:25:27Z [FLINK-4545] fix some tests being killed on Travis CI Due to the increased defaults for network buffer memory use, some builds on Travis CI fail with unit tests being killed. This affects * RocksDbBackendEventTimeWindowCheckpointingITCase and * HBaseConnectorITCase We fix this by limiting the maximum amount of network buffer memory to 80MB (current defaults would yield 150MB, previously 64MB were used).
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932773#comment-15932773 ] zhijiang commented on FLINK-4545: - [~StephanEwen], yes, there are two cases. The first is where all task managers have the same fixed size, as in mini cluster mode; there, the three parameters "fraction, min, max" that Nico mentioned all work for the network memory. The second is computing the size of a container, as in YARN mode, where we actually need a configuration parameter for the network memory; maybe the parameter `taskmanager.network.memory.max` that Nico mentioned above can still work for that. Is my understanding right?
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932481#comment-15932481 ] Stephan Ewen commented on FLINK-4545: - [~zjwang] The logic outlined above is tailored towards the case where you have a container or JVM of a certain size and want to configure how much memory goes to what component. In the case where you actually want to compute the size of the container (as in the fine-grained resource configuration code), we probably need a configuration parameter for the network memory to add to each container. What do you think?
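As a rough illustration of the second case (computing the container size instead of carving memory out of a given size), the network memory becomes one more explicitly configured term in the sum. This is a minimal Java sketch under that assumption; the class and method names are hypothetical and do not correspond to Flink's fine-grained resource code.

```java
/**
 * Hypothetical sketch: when the container size is computed rather than given,
 * network memory is added as its own explicitly configured term.
 */
final class ContainerSizeSketch {

    private ContainerSizeSketch() {}

    /** Sums the per-component requirements (all in bytes) into one container request. */
    static long containerBytes(long heapBytes, long managedBytes, long networkBytes) {
        if (heapBytes < 0 || managedBytes < 0 || networkBytes < 0) {
            throw new IllegalArgumentException("memory sizes must be non-negative");
        }
        // Math.addExact throws ArithmeticException on overflow instead of wrapping around.
        return Math.addExact(Math.addExact(heapBytes, managedBytes), networkBytes);
    }

    public static void main(String[] args) {
        long mb = 1L << 20;
        // 1024 MB heap + 512 MB managed + 128 MB network (e.g. a configured network maximum)
        System.out.println(containerBytes(1024 * mb, 512 * mb, 128 * mb) / mb); // prints 1664
    }
}
```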
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931247#comment-15931247 ] zhijiang commented on FLINK-4545: - [~StephanEwen], thank you for such rich information! I agree with the points above and understand the considerations and future plans well. Replacing the current constant global buffer amount with a flexible minimum/maximum range is especially helpful for current batch jobs and for better streaming recovery in the future. I hope to see the changes soon and to participate in the related parts!
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930088#comment-15930088 ] Stephan Ewen commented on FLINK-4545: - There are a few reasons why I think the GlobalBufferPool would be helpful:
- It is hard to know up front exactly how many network buffers will be needed. A TaskManager may get different tasks (with more channels) after a recovery. In batch jobs, tasks are deployed lazily, and a TaskManager may get more tasks over time.
- Having more network memory is generally quite beneficial for batch jobs. But how much more? Usually as much as the system can spare.
- On the streaming side, too, we plan to make use of spare buffers to help with better recovery in the future. The change now would make sure that this is possible without changing the memory configuration again.
- Finally, on-demand buffer allocation can help, but it also has a downside: a system that reserves/allocates memory up front is more predictable later.
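To make the predictability argument in the last point concrete, here is a minimal Java sketch of a pool that allocates all of its segments once, at construction time, so that later requests only hand out or take back existing segments. It is an illustration, not Flink's `NetworkBufferPool`; the class name, the use of direct `ByteBuffer`s as segments, and the 32 KiB segment size in the example are assumptions.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

/**
 * Hypothetical sketch (not Flink's NetworkBufferPool): every segment is allocated
 * once, up front, so later requests never trigger new allocations.
 */
final class PreallocatedPoolSketch {

    private final ArrayDeque<ByteBuffer> available = new ArrayDeque<>();

    PreallocatedPoolSketch(int numSegments, int segmentSize) {
        // All memory is reserved here; failures surface at startup, not mid-job.
        for (int i = 0; i < numSegments; i++) {
            available.add(ByteBuffer.allocateDirect(segmentSize));
        }
    }

    /** Returns a free segment, or null if the pool is exhausted (nothing is allocated here). */
    synchronized ByteBuffer request() {
        return available.poll();
    }

    /** Hands a segment back to the pool for reuse. */
    synchronized void recycle(ByteBuffer segment) {
        segment.clear();
        available.add(segment);
    }

    synchronized int availableSegments() {
        return available.size();
    }

    public static void main(String[] args) {
        PreallocatedPoolSketch pool = new PreallocatedPoolSketch(4, 32 * 1024);
        ByteBuffer a = pool.request();
        ByteBuffer b = pool.request();
        System.out.println(pool.availableSegments()); // prints 2
        pool.recycle(a);
        pool.recycle(b);
        System.out.println(pool.availableSegments()); // prints 4
    }
}
```

The trade-off named in the comment shows up directly: memory is committed even when idle, but a running job can never fail because a fresh allocation is refused.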
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929676#comment-15929676 ] zhijiang commented on FLINK-4545: - [~NicoK], [~StephanEwen], thank you for your replies! As far as I know, we have already introduced two other parameters for users to control the core buffer amount in {{LocalBufferPool}}: {{taskmanager.net.memory.buffers-per-channel}} (default: 2) and {{taskmanager.net.memory.extra-buffers-per-gate}} (default: 8). Both are easy for users to understand. In my opinion, the {{JobManager}} or {{ResourceManager}} can decide the total buffer amount in {{NetworkBufferPool}} based on two conditions:
- The number of tasks that will be deployed into this {{TaskManager}}.
- The core number of buffers in {{LocalBufferPool}}, possibly also taking {{ResultPartitionType}} into account.
So the framework can calculate the total buffer amount and the corresponding memory usage before starting the {{TaskManager}}. The buffer amount will be set on the {{TaskManager}}, and the memory usage will be aggregated into the total resource request. This seems consistent. Have you considered not exposing the global network memory parameters to users and only allowing the parameters in {{LocalBufferPool}}? Otherwise, users have to consider both the global and the local parameters and may also need to know the total number of tasks in the {{TaskManager}}, which can easily cause conflicts. The fewer parameters users need to know, the better. Maybe you have other concerns that I have not covered yet; I would appreciate your advice and am very willing to work on this if needed.
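The calculation proposed above can be sketched as follows, assuming that every input gate and every result partition of a task needs `channels * buffers-per-channel + extra-buffers-per-gate` core buffers, with the defaults 2 and 8 quoted in the comment. The class name, the `int[][]` layout of gates per task, and the 32 KiB segment size are illustrative assumptions, not Flink code.

```java
/**
 * Hypothetical sketch of the proposed up-front calculation: sum the core buffers of
 * every gate of every task planned for one TaskManager. Defaults follow the comment
 * (2 buffers per channel, 8 extra buffers per gate); everything else is illustrative.
 */
final class TotalBufferEstimateSketch {

    static final int BUFFERS_PER_CHANNEL = 2;
    static final int EXTRA_BUFFERS_PER_GATE = 8;
    static final long SEGMENT_SIZE_BYTES = 32 * 1024; // assumed segment size

    private TotalBufferEstimateSketch() {}

    /** Core buffers for one gate (input gate or result partition) with the given channel count. */
    static long coreBuffersPerGate(int numChannels) {
        return (long) numChannels * BUFFERS_PER_CHANNEL + EXTRA_BUFFERS_PER_GATE;
    }

    /** channelsPerGate[task][gate] is the number of channels of that gate. */
    static long totalBuffers(int[][] channelsPerGate) {
        long total = 0;
        for (int[] gatesOfTask : channelsPerGate) {
            for (int channels : gatesOfTask) {
                total += coreBuffersPerGate(channels);
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Two tasks, each with one 4-channel input gate and one 4-subpartition output.
        int[][] plannedTasks = { {4, 4}, {4, 4} };
        long buffers = totalBuffers(plannedTasks);  // 4 gates * (4 * 2 + 8) = 64
        long bytes = buffers * SEGMENT_SIZE_BYTES;  // memory to add to the resource request
        System.out.println(buffers + " buffers, " + bytes + " bytes"); // prints 64 buffers, 2097152 bytes
    }
}
```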
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924535#comment-15924535 ] Nico Kruber commented on FLINK-4545: We did some thinking and would probably add the following three new configuration parameters (with the given defaults) to finally replace the {{taskmanager.network.numberOfBuffers}} parameter:
- {{taskmanager.network.memory.fraction}} (default: 0.1): fraction of JVM memory to use for network buffers (by reducing {{taskmanager.memory.fraction}} from 0.7 to 0.6)
- {{taskmanager.network.memory.min}} (default: 64MB): minimum memory size for network buffers
- {{taskmanager.network.memory.max}} (default: 1GB): maximum memory size for network buffers
A fixed size may be achieved by setting the latter two to the same value. {{taskmanager.network.numberOfBuffers}} will be marked as deprecated and used only if the other three are not given, e.g. when old config files are used.
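A minimal Java sketch of the sizing rule described above, under these assumptions: "JVM memory" is passed in as a plain byte count, option values are modeled as nullable arguments rather than read from Flink's configuration classes, and buffers are 32 KiB segments. Only the defaults (0.1, 64MB, 1GB), the min == max trick, and the legacy fallback come from the comment; all names are hypothetical.

```java
/**
 * Hypothetical sketch of the proposed rule: network memory = fraction of JVM memory,
 * clamped to [min, max]; the deprecated buffer count is honored only when none of
 * the new options is set. Option values are modeled as nullable arguments.
 */
final class NetworkMemorySizingSketch {

    static final double DEFAULT_FRACTION = 0.1;
    static final long DEFAULT_MIN_BYTES = 64L << 20; // 64 MB
    static final long DEFAULT_MAX_BYTES = 1L << 30;  // 1 GB
    static final int SEGMENT_SIZE_BYTES = 32 * 1024; // assumed buffer size

    private NetworkMemorySizingSketch() {}

    /** Clamps fraction * jvmMemory into [min, max]; setting min == max yields a fixed size. */
    static long networkMemoryBytes(long jvmMemoryBytes, double fraction, long minBytes, long maxBytes) {
        if (!(fraction > 0.0 && fraction < 1.0)) {
            throw new IllegalArgumentException("fraction must be in (0, 1)");
        }
        if (minBytes > maxBytes) {
            throw new IllegalArgumentException("min must not exceed max");
        }
        long relative = (long) (jvmMemoryBytes * fraction);
        return Math.max(minBytes, Math.min(maxBytes, relative));
    }

    /** A null argument means "option not present in the config file". */
    static long numberOfBuffers(long jvmMemoryBytes, Double fraction, Long minBytes,
                                Long maxBytes, Integer legacyNumberOfBuffers) {
        boolean anyNewOption = fraction != null || minBytes != null || maxBytes != null;
        if (!anyNewOption && legacyNumberOfBuffers != null) {
            return legacyNumberOfBuffers; // old config file: keep the deprecated behavior
        }
        long bytes = networkMemoryBytes(
                jvmMemoryBytes,
                fraction != null ? fraction : DEFAULT_FRACTION,
                minBytes != null ? minBytes : DEFAULT_MIN_BYTES,
                maxBytes != null ? maxBytes : DEFAULT_MAX_BYTES);
        return bytes / SEGMENT_SIZE_BYTES;
    }

    public static void main(String[] args) {
        long gb = 1L << 30;
        System.out.println(numberOfBuffers(4 * gb, null, null, null, null));  // 10% of 4 GB, within bounds: 13107
        System.out.println(numberOfBuffers(20 * gb, null, null, null, null)); // 2 GB capped at 1 GB: 32768
        System.out.println(numberOfBuffers(4 * gb, null, null, null, 2048));  // legacy value only: 2048
    }
}
```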
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923811#comment-15923811 ] ASF GitHub Bot commented on FLINK-4545: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3467 @zhijiangW Yes, let's discuss this when the feature is complete. Our thinking so far is:
- One can specify an absolute amount of network memory (similar to how one can specify an absolute amount of managed memory for batch).
- If no absolute amount is specified, a relative fraction of the JVM heap will be pre-allocated as network buffers.
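The precedence described above (an explicit absolute amount wins, otherwise a fraction of the JVM heap) could look like this minimal Java sketch. The method name, the use of `OptionalLong` for "not specified", and the fraction 0.25 in the example are illustrative assumptions.

```java
import java.util.OptionalLong;

/**
 * Hypothetical sketch of the precedence described above: an explicitly configured
 * absolute amount wins; otherwise a fraction of the JVM heap is pre-allocated.
 */
final class NetworkMemoryPrecedenceSketch {

    private NetworkMemoryPrecedenceSketch() {}

    static long networkMemoryBytes(OptionalLong absoluteBytes, long jvmHeapBytes, double fraction) {
        if (absoluteBytes.isPresent()) {
            return absoluteBytes.getAsLong(); // absolute setting, analogous to managed memory
        }
        return (long) (jvmHeapBytes * fraction); // relative fallback
    }

    public static void main(String[] args) {
        long mb = 1L << 20;
        System.out.println(networkMemoryBytes(OptionalLong.of(256 * mb), 4096 * mb, 0.1) / mb); // prints 256
        System.out.println(networkMemoryBytes(OptionalLong.empty(), 1000 * mb, 0.25) / mb);     // prints 250
    }
}
```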
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923554#comment-15923554 ] ASF GitHub Bot commented on FLINK-4545: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/3467 @NicoK, thank you for the explanation; I have already traced the code in your local branch and look forward to your further changes to the global pool. @StephanEwen, thanks for the further elaboration. As I understand it, each task can decide the core number of buffers in `LocalBufferPool` based on its input/output channels and the configuration, and the maximum number of buffers based on the `ResultPartitionType`. All the `LocalBufferPool`s together affect the total number of buffers in `NetworkBufferPool`, so we may need to consider the maximum memory usage. My concern is to account for the memory usage of `NetworkBufferPool` before starting the `TaskManager`; this part of memory should be added to the total resources of the `TaskManager`. I am willing to do that as part of my current work on [Fine-grained Resource Configuration](https://issues.apache.org/jira/browse/FLINK-5131) after this feature is complete.
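One way to read the split between a configuration-derived core size and a partition-type-dependent maximum is sketched below in Java. The `PartitionKind` enum is a hypothetical stand-in for `ResultPartitionType`, the per-channel/per-gate defaults are the ones quoted earlier in this thread, and the choice that pipelined pools stay at their core size while blocking pools may grow without a fixed limit is an assumption made for illustration.

```java
/**
 * Hypothetical sketch: a task's local pool gets a core size derived from its channels
 * and configuration, and a maximum that depends on the kind of partition.
 * PartitionKind is a stand-in for ResultPartitionType, not the real enum.
 */
final class LocalPoolBoundsSketch {

    enum PartitionKind { PIPELINED, BLOCKING }

    static final int BUFFERS_PER_CHANNEL = 2;
    static final int EXTRA_BUFFERS_PER_GATE = 8;

    /** Core (guaranteed) and maximum number of buffers for one local pool. */
    static final class Bounds {
        final int core;
        final int max;

        Bounds(int core, int max) {
            this.core = core;
            this.max = max;
        }
    }

    private LocalPoolBoundsSketch() {}

    static Bounds boundsFor(int numChannels, PartitionKind kind) {
        int core = numChannels * BUFFERS_PER_CHANNEL + EXTRA_BUFFERS_PER_GATE;
        // Assumption: pipelined pools stay at their core size, while blocking pools may
        // keep growing as long as the global pool has spare segments.
        int max = (kind == PartitionKind.PIPELINED) ? core : Integer.MAX_VALUE;
        return new Bounds(core, max);
    }

    public static void main(String[] args) {
        Bounds pipelined = boundsFor(4, PartitionKind.PIPELINED);
        Bounds blocking = boundsFor(4, PartitionKind.BLOCKING);
        System.out.println(pipelined.core + " / " + pipelined.max); // prints 16 / 16
        System.out.println(blocking.core + " / "
                + (blocking.max == Integer.MAX_VALUE ? "unbounded" : String.valueOf(blocking.max))); // prints 16 / unbounded
    }
}
```

An unbounded maximum is exactly why the total memory of `NetworkBufferPool` has to be capped separately when it is added to the TaskManager's resource request.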
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905085#comment-15905085 ] ASF GitHub Bot commented on FLINK-4545: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3480
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904930#comment-15904930 ] ASF GitHub Bot commented on FLINK-4545: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3480 Did another review - looks good to me! Merging...
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903159#comment-15903159 ] ASF GitHub Bot commented on FLINK-4545: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/3480 Added the requested changes and, due to conflicts, successfully rebased onto the newest master.
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902948#comment-15902948 ] Stephan Ewen commented on FLINK-4545: - Part one merged in 8b49ee5aa2e17b1787764c3265e1ebda47d89840
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902943#comment-15902943 ] ASF GitHub Bot commented on FLINK-4545: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3467
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902916#comment-15902916 ] ASF GitHub Bot commented on FLINK-4545: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/3467#discussion_r105143643
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -265,11 +281,15 @@ public String toString() {
//
private void returnMemorySegment(MemorySegment segment) {
+ assert Thread.holdsLock(availableMemorySegments);
--- End diff --
Using synchronized again would impact performance, whereas assertions only have an effect when they are enabled, which is the case in our unit tests (see https://maven.apache.org/surefire/maven-surefire-plugin/test-mojo.html#enableAssertions).
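The locking pattern under discussion can be illustrated with a minimal, self-contained Java sketch: a public method takes the lock once, and a private helper only asserts that the lock is held instead of synchronizing again. The class and field names are illustrative and not taken from `LocalBufferPool`. With assertions disabled (the JVM default), the `assert` statement is not evaluated; with `-ea`, it throws an `AssertionError` if a caller forgot to synchronize.

```java
import java.util.ArrayDeque;

/**
 * Illustrative sketch of the pattern (names are not Flink's): the helper documents
 * and checks that its caller already holds the lock, instead of locking again.
 */
final class LockAssertionSketch {

    private final ArrayDeque<Object> availableSegments = new ArrayDeque<>();

    /** Public entry point: takes the lock exactly once, then calls the helper. */
    void recycle(Object segment) {
        synchronized (availableSegments) {
            returnSegment(segment);
        }
    }

    /** Must only be called while holding the lock on availableSegments. */
    private void returnSegment(Object segment) {
        // Not evaluated unless the JVM runs with -ea (as in the unit tests mentioned above);
        // when enabled, it fails fast if a caller forgot to synchronize.
        assert Thread.holdsLock(availableSegments);
        availableSegments.add(segment);
    }

    int size() {
        synchronized (availableSegments) {
            return availableSegments.size();
        }
    }

    public static void main(String[] args) {
        LockAssertionSketch pool = new LockAssertionSketch();
        pool.recycle(new Object());
        System.out.println(pool.size()); // prints 1
    }
}
```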
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902771#comment-15902771 ] ASF GitHub Bot commented on FLINK-4545: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3480 Overall, the code looks very nice! It would be good to make the "magic constants" configurable. I don't expect anyone to usually tweak those (and eventually they will disappear and flow control will auto-tune the network), but making them configurable is always good, so that there is a workaround in case something unexpected happens. I would follow the same path as the `partitionRequestInitialBackoff` and `partitionRequestMaxBackoff` values in the `NetworkEnvironment`.
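A minimal sketch of making such constants configurable: read each value from configuration with the old constant as its default, then pass it into the component's constructor, the way the comment suggests for the backoff values. The keys, the `Map<String, String>` stand-in for Flink's configuration object, and the example constants (the 2 and 8 mentioned earlier in this thread) are hypothetical.

```java
import java.util.Collections;
import java.util.Map;

/**
 * Hypothetical sketch: formerly hard-coded constants are read from configuration with
 * defaults and handed to the component via its constructor. Keys are placeholders,
 * not actual Flink options.
 */
final class ConfigurableConstantsSketch {

    static final String BUFFERS_PER_CHANNEL_KEY = "example.network.buffers-per-channel";
    static final String EXTRA_BUFFERS_PER_GATE_KEY = "example.network.extra-buffers-per-gate";

    /** Component that used to hard-code its constants and now receives them. */
    static final class NetworkComponent {
        final int buffersPerChannel;
        final int extraBuffersPerGate;

        NetworkComponent(int buffersPerChannel, int extraBuffersPerGate) {
            if (buffersPerChannel < 1 || extraBuffersPerGate < 0) {
                throw new IllegalArgumentException("invalid buffer settings");
            }
            this.buffersPerChannel = buffersPerChannel;
            this.extraBuffersPerGate = extraBuffersPerGate;
        }
    }

    private ConfigurableConstantsSketch() {}

    static int getInt(Map<String, String> conf, String key, int defaultValue) {
        String value = conf.get(key);
        return value == null ? defaultValue : Integer.parseInt(value.trim());
    }

    static NetworkComponent fromConfig(Map<String, String> conf) {
        return new NetworkComponent(
                getInt(conf, BUFFERS_PER_CHANNEL_KEY, 2),
                getInt(conf, EXTRA_BUFFERS_PER_GATE_KEY, 8));
    }

    public static void main(String[] args) {
        // Normally nobody sets these; the override only exists as an escape hatch.
        NetworkComponent defaults = fromConfig(Collections.emptyMap());
        NetworkComponent tuned = fromConfig(Collections.singletonMap(BUFFERS_PER_CHANNEL_KEY, "4"));
        System.out.println(defaults.buffersPerChannel + " " + tuned.buffersPerChannel); // prints 2 4
    }
}
```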
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902429#comment-15902429 ] ASF GitHub Bot commented on FLINK-4545: --- Github user wenlong88 commented on a diff in the pull request: https://github.com/apache/flink/pull/3467#discussion_r105081191
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -265,11 +281,15 @@ public String toString() {
//
private void returnMemorySegment(MemorySegment segment) {
+ assert Thread.holdsLock(availableMemorySegments);
--- End diff --
Hi, I have a question about the assert: assertions are disabled in Java by default. Why not use an explicit `synchronized(availableMemorySegments)`, which may be the more common usage?
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901930#comment-15901930 ] ASF GitHub Bot commented on FLINK-4545: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3467 I think this is a good change, merging this... @zhijiangW The buffer management changes come in follow-up PRs: first adjusting the local pools, then the global pool. Managing buffers in a global pool can help when caching data, such as for batch jobs. But we can take suggestions for follow-up improvements to a separate thread, after this improvement is in.
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897795#comment-15897795 ] ASF GitHub Bot commented on FLINK-4545: --- GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/3480

[FLINK-4545] use size-restricted LocalBufferPool instances for network communication

Note: this PR is based on #3467 and is PR 2 of 3 in a series to get rid of the network buffer parameter.

With this PR, the number of buffers a `LocalBufferPool` has to offer will be limited to `2 * + 8` for both input and output connections. This way, we reduce buffer bloat in our network stack without restricting specific jobs and their connections too much, since the total number of network buffers can now be arbitrarily large again without affecting, for example, the delays that checkpoint barriers experience while travelling through all TMs.

Eventually, this will lead to the network buffer parameter being removed (which was the initial goal), but already in a simple scenario like the following, with a parallelism of 2 and thus running on 6 TMs, we were able to reduce the 75th percentile of checkpoint delays by roughly 60%, from 38ms to 16ms (median at 7ms for both).

```java
final StreamExecutionEnvironment env = getStreamExecutionEnvironment(params);
env.disableOperatorChaining();
env.enableCheckpointing(1_000L);

DataStreamSource> source1 = env.addSource(new LongSource());

source1.slotSharingGroup("source")
    .keyBy(1)
    .map(new IdentityMapFunction >())
    .slotSharingGroup("map")
    .keyBy(1)
    .addSink(new DiscardingSink >())
    .slotSharingGroup("sink");
```

By adding random delays (0-1ms every 1000 keys) to the `IdentityMapFunction`, the median even improves from 5026ms to 293ms.
In both scenarios the throughput of the program is unaffected, but for real programs the reduction in delay may differ, since actual state may need to be stored and other components take part as well ;)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-4545

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3480.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #3480

commit dfea1bac97dbbf30a2e049618cc41fdca53ea6d3
Author: Nico Kruber
Date: 2017-02-10T13:36:37Z
[FLINK-4545] remove (unused) persistent partition type

commit 11557c004450bcbbe680f1575f0e41d164424eae
Author: Nico Kruber
Date: 2017-02-10T15:11:08Z
[docs] improve some documentation around network buffers

commit cd999061d04ae803c79473241ac1f9b39c1f2731
Author: Nico Kruber
Date: 2017-02-10T15:12:19Z
[hotfix][network] add some assertions documenting on which locks we rely

commit 8f529bb3f42916c816c5091228569952917ad9b5
Author: Nico Kruber
Date: 2017-03-01T13:33:44Z
[FLINK-4545] remove fixed-size BufferPool instances
These were unused except for unit tests and will be replaced with bounded BufferPool instances.

commit 91cea2917e9453f9de5c02472d99d4fc0d090dda
Author: Nico Kruber
Date: 2017-03-06T11:36:02Z
[FLINK-4545] remove JobVertex#connectNewDataSetAsInput variant without partition type
This removes JobVertex#connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern) and requires the developer to call JobVertex#connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType) instead and think about the partition type to add.
commit 83d1404b106b558679e4c9ef16123fbc6b5eac72
Author: Nico Kruber
Date: 2017-03-06T11:37:56Z
[FLINK-4545] remove unused IntermediateDataSet constructors
These were implying a default result partition type which we want the developer to actively decide upon.

commit e9d41b6b613a7bac5c489102977e16e4c6c4bb86
Author: Nico Kruber
Date: 2017-02-10T13:53:09Z
[FLINK-4545] add a bounded result partition type
This can be used to limit the number of network buffers used for this partition. (borrows the appropriate parts of a commit previously sketched for FLINK-5088 to implement bounded network queue lengths)

commit b57f0652a768645a5712d376d0e4b438f35cfa6c
Author: Nico Kruber
Date: 2017-02-10T17:22:55Z
[FLINK-4545] allow LocalBufferPool to use a limited number of buffers

commit
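The idea of a size-restricted pool can be sketched as follows. This is a minimal illustration under stated assumptions, not the `LocalBufferPool` from the PR: the class `BoundedSegmentPool`, its methods, and passing the cap as a constructor argument are hypothetical. The point is that a per-pool cap bounds how many buffers can pile up in one connection (and therefore how much data a checkpoint barrier may have to wait behind), independently of how large the global pool is.

```java
import java.util.ArrayDeque;

// Hypothetical bounded pool: hands out at most maxSegments buffers at a time,
// no matter how many the global source could provide.
class BoundedSegmentPool {
    private final int maxSegments;
    private final int segmentSize;
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();
    private int handedOut; // buffers currently held by callers

    BoundedSegmentPool(int maxSegments, int segmentSize) {
        if (maxSegments <= 0) {
            throw new IllegalArgumentException("maxSegments must be > 0");
        }
        this.maxSegments = maxSegments;
        this.segmentSize = segmentSize;
    }

    // Returns a buffer, or null once the cap is reached. A real pool would
    // typically block the producer here, which creates back-pressure.
    synchronized byte[] tryRequest() {
        if (handedOut >= maxSegments) {
            return null;
        }
        handedOut++;
        byte[] segment = available.poll();
        return segment != null ? segment : new byte[segmentSize];
    }

    // Gives a buffer back so it can be reused by the next request.
    synchronized void recycle(byte[] segment) {
        handedOut--;
        available.add(segment);
    }

    synchronized int inUse() {
        return handedOut;
    }
}
```

With a cap of 2, a third request fails until one of the first two buffers is recycled; the global memory budget plays no role in that decision.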
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897050#comment-15897050 ] ASF GitHub Bot commented on FLINK-4545: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/3467 Hi @zhijiangW, actually, the solution I am working on is to replace the network buffers parameter with something like "max memory in percent" and "min MB to use". So that this does not create buffer bloat in our network stack, I have started to implement limited `LocalBufferPool` instances which tune their size based on the actual number of outgoing and incoming channels. It is actually not much more complicated than this, and I have already started on it in my local branch at https://github.com/NicoK/flink/tree/flink-4545 - expect a new PR within the week with more details.
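The "max memory in percent" / "min MB to use" idea can be illustrated with a small calculation. The class and method names, the parameters, and the 32 KiB buffer size in the example are assumptions for illustration only; the comment does not specify exact semantics (for instance, which value wins when they conflict), so this sketch takes the larger of the two, caps it at the total memory, and converts the result into a buffer count.

```java
// Hypothetical helper: derive the number of network buffers from a memory
// fraction and a minimum size instead of a fixed, user-supplied buffer count.
final class NetworkMemorySketch {
    static final long MiB = 1024L * 1024L;

    static int numberOfBuffers(long totalMemoryBytes, double fraction,
                               long minBytes, int segmentSizeBytes) {
        if (fraction < 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("fraction must be in [0, 1]");
        }
        long fromFraction = (long) (totalMemoryBytes * fraction);
        // Use at least the configured minimum, but never more than total memory.
        long networkBytes = Math.min(totalMemoryBytes, Math.max(minBytes, fromFraction));
        return (int) (networkBytes / segmentSizeBytes);
    }

    public static void main(String[] args) {
        // 4 GiB TaskManager, 10% for the network, at least 64 MiB, 32 KiB buffers
        System.out.println(numberOfBuffers(4096 * MiB, 0.1, 64 * MiB, 32 * 1024)); // prints 13107
    }
}
```

On a small TaskManager (say 256 MiB), 10% would be only about 25.6 MiB, so the 64 MiB minimum kicks in and yields 2048 buffers; the user never has to reason about channel counts.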
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896993#comment-15896993 ] ASF GitHub Bot commented on FLINK-4545: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/3467 Hi @NicoK, I am interested in this issue, and I like the way this PR asserts that locks are held. It is really necessary for the framework to manage network buffers, because it is difficult for users to set the exact number of buffers. Our current simple solution is to extend the `ResourceProfile` with the total number of input and output edges of an `Execution`. The `ResourceManager` then calculates the number of buffers from that and overwrites the parameter value in the `TaskManager` configuration. From what @StephanEwen mentioned before, I know a little about this issue. Could you share some details of your design plans, if you have any, so that I can learn from them and follow the progress? Thank you!
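A rough sketch of the edge-based calculation described above follows. The comment does not give the exact formula, so the `buffersPerEdge` factor, the `floor`, the `SlotDemand` type, and summing the per-slot edge counts for one TaskManager are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical edge-based sizing: each slot reports how many input and output
// edges its Execution has (as an extended resource profile might carry), and
// the ResourceManager-side logic derives a buffer count for the TaskManager.
final class EdgeBasedSizing {
    static final class SlotDemand {
        final int inputEdges;
        final int outputEdges;

        SlotDemand(int inputEdges, int outputEdges) {
            this.inputEdges = inputEdges;
            this.outputEdges = outputEdges;
        }
    }

    // buffersPerEdge: buffers reserved per input/output edge (assumed value)
    // floor: minimum number of buffers per TaskManager (assumed value)
    static int buffersForTaskManager(List<SlotDemand> slots, int buffersPerEdge, int floor) {
        long edges = 0;
        for (SlotDemand s : slots) {
            edges += s.inputEdges + s.outputEdges;
        }
        long buffers = edges * buffersPerEdge;
        return (int) Math.max(floor, buffers);
    }

    public static void main(String[] args) {
        List<SlotDemand> slots = Arrays.asList(new SlotDemand(4, 4), new SlotDemand(4, 0));
        System.out.println(buffersForTaskManager(slots, 2, 8)); // 12 edges * 2 = 24
    }
}
```

The result would then be written into the TaskManager configuration before start-up, which is why this approach needs the job's topology to be known in advance.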
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15894230#comment-15894230 ] ASF GitHub Bot commented on FLINK-4545: --- GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/3467

[FLINK-4545] preparations for removing the network buffers parameter

This PR includes some preparations for follow-up PRs that ultimately lead to removing the hard-to-tune network buffer parameter.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-4545-prep

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3467.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #3467

commit dfea1bac97dbbf30a2e049618cc41fdca53ea6d3
Author: Nico Kruber
Date: 2017-02-10T13:36:37Z
[FLINK-4545] remove (unused) persistent partition type

commit 11557c004450bcbbe680f1575f0e41d164424eae
Author: Nico Kruber
Date: 2017-02-10T15:11:08Z
[docs] improve some documentation around network buffers

commit cd999061d04ae803c79473241ac1f9b39c1f2731
Author: Nico Kruber
Date: 2017-02-10T15:12:19Z
[hotfix][network] add some assertions documenting on which locks we rely

commit 8f529bb3f42916c816c5091228569952917ad9b5
Author: Nico Kruber
Date: 2017-03-01T13:33:44Z
[FLINK-4545] remove fixed-size BufferPool instances
These were unused except for unit tests and will be replaced with bounded BufferPool instances.
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15632781#comment-15632781 ] Greg Hogan commented on FLINK-4545: --- In 1.2 we now expose metrics for the number of allocated and in-use network buffers. These are now shown in the web UI, but the values were not being populated properly. Is there an exact "best" number of network buffers to allocate? Flink may run with a minimal allocation, but this may cause additional backpressure while waiting for buffers to be processed and become free. We recommend something similar by advising users to maximize the TaskManager memory allocation, which may be better for streaming than for batch, since after spilling to disk the merge-sort makes use of the filesystem's read-ahead cache. +1 to dynamic scaling, and also if/when memory buffers are dynamically allocated between operators.
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15632652#comment-15632652 ] Stephan Ewen commented on FLINK-4545: - For a YARN-job-at-a-time setup, one could pre-compute that. For more dynamic setups it's a little trickier, if you do not know the job or the number of TaskManagers up front. Also, the formula needs to take {{buffers *= numShuffles}} into account.
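Written out as code, the estimate from the issue description (#slots-per-TM^2 * #TMs * 4) combined with the {{buffers *= numShuffles}} correction looks roughly as follows. The class and method names are hypothetical, and treating every shuffle as an all-to-all exchange across the whole cluster is an assumption made for illustration; the example numbers are made up.

```java
// Hypothetical estimator for the static taskmanager.network.numberOfBuffers setting.
final class BufferEstimate {
    static long requiredBuffers(int slotsPerTm, int numTms, int numShuffles) {
        // #slots-per-TM^2 * #TMs * 4, as documented for the static setting
        long buffers = (long) slotsPerTm * slotsPerTm * numTms * 4;
        // one such set of buffers per shuffle (e.g. each keyBy) in the job
        buffers *= numShuffles;
        return buffers;
    }

    public static void main(String[] args) {
        System.out.println(requiredBuffers(4, 10, 1)); // prints 640
        System.out.println(requiredBuffers(4, 10, 2)); // prints 1280
    }
}
```

This also shows why the value is hard to pre-compute in dynamic setups: it grows quadratically with the slots per TaskManager and linearly with both the cluster size and the number of shuffles, and the latter two may not be known when the TaskManager starts.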
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15632608#comment-15632608 ] Jamie Grier commented on FLINK-4545: Big +1! In general I would love to see this improved. In my experience this is the "one thing" that people run into with Flink: whereas everything else "just works", this is one parameter they have to set/tune, and it's very confusing to newcomers. The equation to get this right is complex, and the "correct" setting changes based on how they deploy the job, what parallelism they use, how many TMs, etc. It also often happens that things are working, then a user changes their job a bit (adding a keyBy, for instance), and then it stops working and they have a hard time understanding why. Is there a way we can set this parameter automatically in the majority of use cases? If folks are running single jobs directly on YARN, for instance, it seems we should have all the information necessary to set this parameter auto-magically, or at least fail fast and tell the user what the parameter should be set to.
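The "fail fast" alternative could look roughly like the following check, run before a job is deployed. Everything here is hypothetical (the class, the method, the exception type), and the required count simply reuses the #slots-per-TM^2 * #TMs * 4 estimate from the issue description multiplied by the number of shuffles; only the configuration key `taskmanager.network.numberOfBuffers` comes from the issue itself.

```java
// Hypothetical pre-deployment check: compare the configured buffer count with
// an estimate and fail with an actionable message instead of failing at runtime.
final class BufferCheck {
    static void checkNetworkBuffers(int configured, int slotsPerTm, int numTms, int numShuffles) {
        long required = (long) slotsPerTm * slotsPerTm * numTms * 4 * numShuffles;
        if (configured < required) {
            throw new IllegalStateException(
                "Insufficient network buffers: configured " + configured
                + ", but this job needs about " + required
                + ". Set taskmanager.network.numberOfBuffers to at least " + required + ".");
        }
    }

    public static void main(String[] args) {
        checkNetworkBuffers(2048, 4, 10, 2); // passes: 2048 >= 1280
        try {
            checkNetworkBuffers(1024, 4, 10, 2);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Adding a keyBy to the job raises `numShuffles`, so the same check would immediately report the new requirement instead of leaving the user to discover it from a runtime failure.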