This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a835f31 [FLINK-17865][checkpoint] Increase default size of 'state.backend.fs.memory-threshold' a835f31 is described below commit a835f31a3a78f34b4a80f9e634b34c6a6681a482 Author: Yun Tang <myas...@live.com> AuthorDate: Thu May 21 20:37:56 2020 +0800 [FLINK-17865][checkpoint] Increase default size of 'state.backend.fs.memory-threshold' This closes #12282. --- .../generated/checkpointing_configuration.html | 6 ++-- .../generated/expert_state_backends_section.html | 6 ++-- .../flink/configuration/CheckpointingOptions.java | 7 ++-- .../state/api/output/SavepointOutputFormat.java | 2 +- .../pyflink/datastream/tests/test_state_backend.py | 2 +- .../runtime/state/filesystem/FsStateBackend.java | 39 ++++++++++++++-------- .../runtime/state/StateBackendLoadingTest.java | 17 +++++----- .../flink/test/checkpointing/SavepointITCase.java | 4 +-- .../utils/SavepointMigrationTestBase.java | 3 +- 9 files changed, 50 insertions(+), 36 deletions(-) diff --git a/docs/_includes/generated/checkpointing_configuration.html b/docs/_includes/generated/checkpointing_configuration.html index c8517a5..748bdb2 100644 --- a/docs/_includes/generated/checkpointing_configuration.html +++ b/docs/_includes/generated/checkpointing_configuration.html @@ -22,9 +22,9 @@ </tr> <tr> <td><h5>state.backend.fs.memory-threshold</h5></td> - <td style="word-wrap: break-word;">1024</td> - <td>Integer</td> - <td>The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file.</td> + <td style="word-wrap: break-word;">20 kb</td> + <td>MemorySize</td> + <td>The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file. 
The max memory threshold for this configuration is 1MB.</td> </tr> <tr> <td><h5>state.backend.fs.write-buffer-size</h5></td> diff --git a/docs/_includes/generated/expert_state_backends_section.html b/docs/_includes/generated/expert_state_backends_section.html index 9d50be1..0fed867 100644 --- a/docs/_includes/generated/expert_state_backends_section.html +++ b/docs/_includes/generated/expert_state_backends_section.html @@ -16,9 +16,9 @@ </tr> <tr> <td><h5>state.backend.fs.memory-threshold</h5></td> - <td style="word-wrap: break-word;">1024</td> - <td>Integer</td> - <td>The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file.</td> + <td style="word-wrap: break-word;">20 kb</td> + <td>MemorySize</td> + <td>The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB.</td> </tr> <tr> <td><h5>state.backend.fs.write-buffer-size</h5></td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java index df19ab9..16eaf75 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java @@ -140,11 +140,12 @@ public class CheckpointingOptions { /** The minimum size of state data files. All state chunks smaller than that * are stored inline in the root checkpoint metadata file. 
*/ @Documentation.Section(Documentation.Sections.EXPERT_STATE_BACKENDS) - public static final ConfigOption<Integer> FS_SMALL_FILE_THRESHOLD = ConfigOptions + public static final ConfigOption<MemorySize> FS_SMALL_FILE_THRESHOLD = ConfigOptions .key("state.backend.fs.memory-threshold") - .defaultValue(1024) + .memoryType() + .defaultValue(MemorySize.parse("20kb")) .withDescription("The minimum size of state data files. All state chunks smaller than that are stored" + - " inline in the root checkpoint metadata file."); + " inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB."); /** * The default size of the write buffer for the checkpoint streams that write to file systems. diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java index 8235067..bebd435 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java @@ -93,7 +93,7 @@ public class SavepointOutputFormat extends RichOutputFormat<CheckpointMetadata> location, location, reference, - CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue(), + (int) CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes(), CheckpointingOptions.FS_WRITE_BUFFER_SIZE.defaultValue()); } } diff --git a/flink-python/pyflink/datastream/tests/test_state_backend.py b/flink-python/pyflink/datastream/tests/test_state_backend.py index 4f3249f..3ce18dc 100644 --- a/flink-python/pyflink/datastream/tests/test_state_backend.py +++ b/flink-python/pyflink/datastream/tests/test_state_backend.py @@ -97,7 +97,7 @@ class FsStateBackendTests(PyFlinkTestCase): state_backend = 
FsStateBackend("file://var/checkpoints/") - self.assertEqual(state_backend.get_min_file_size_threshold(), 1024) + self.assertEqual(state_backend.get_min_file_size_threshold(), 20480) state_backend = FsStateBackend("file://var/checkpoints/", file_state_size_threshold=2048) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index e45fb20..1c61ab6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.util.MathUtils; import org.apache.flink.util.TernaryBoolean; import org.slf4j.LoggerFactory; @@ -55,6 +56,7 @@ import java.io.IOException; import java.net.URI; import java.util.Collection; +import static org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -363,22 +365,24 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur this.asynchronousSnapshots = original.asynchronousSnapshots.resolveUndefined( configuration.get(CheckpointingOptions.ASYNC_SNAPSHOTS)); - final int sizeThreshold = original.fileStateThreshold >= 0 ? 
- original.fileStateThreshold : - configuration.get(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD); + if (getValidFileStateThreshold(original.fileStateThreshold) >= 0) { + this.fileStateThreshold = original.fileStateThreshold; + } else { + final int configuredStateThreshold = + getValidFileStateThreshold(configuration.get(FS_SMALL_FILE_THRESHOLD).getBytes()); - if (sizeThreshold >= 0 && sizeThreshold <= MAX_FILE_STATE_THRESHOLD) { - this.fileStateThreshold = sizeThreshold; - } - else { - this.fileStateThreshold = CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue(); + if (configuredStateThreshold >= 0) { + this.fileStateThreshold = configuredStateThreshold; + } else { + this.fileStateThreshold = MathUtils.checkedDownCast(FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes()); - // because this is the only place we (unlikely) ever log, we lazily - // create the logger here - LoggerFactory.getLogger(AbstractFileStateBackend.class).warn( + // because this is the only place we (unlikely) ever log, we lazily + // create the logger here + LoggerFactory.getLogger(AbstractFileStateBackend.class).warn( "Ignoring invalid file size threshold value ({}): {} - using default value {} instead.", - CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.key(), sizeThreshold, - CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue()); + FS_SMALL_FILE_THRESHOLD.key(), configuration.get(FS_SMALL_FILE_THRESHOLD).getBytes(), + FS_SMALL_FILE_THRESHOLD.defaultValue()); + } } final int bufferSize = original.writeBufferSize >= 0 ? 
@@ -388,6 +392,13 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur this.writeBufferSize = Math.max(bufferSize, this.fileStateThreshold); } + private int getValidFileStateThreshold(long fileStateThreshold) { + if (fileStateThreshold >= 0 && fileStateThreshold <= MAX_FILE_STATE_THRESHOLD) { + return (int) fileStateThreshold; + } + return -1; + } + // ------------------------------------------------------------------------ // Properties // ------------------------------------------------------------------------ @@ -432,7 +443,7 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur public int getMinFileSizeThreshold() { return fileStateThreshold >= 0 ? fileStateThreshold : - CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue(); + MathUtils.checkedDownCast(FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes()); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java index 877a51d..5340290 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; @@ -227,7 +228,7 @@ public class StateBackendLoadingTest { final String savepointDir = new Path(tmp.newFolder().toURI()).toString(); final Path expectedCheckpointsPath = new Path(checkpointDir); final Path expectedSavepointsPath = new Path(savepointDir); - final int threshold 
= 1000000; + final MemorySize threshold = MemorySize.parse("900kb"); final int minWriteBufferSize = 1024; final boolean async = !CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue(); @@ -237,7 +238,7 @@ public class StateBackendLoadingTest { config1.setString(backendKey, "filesystem"); config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir); - config1.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold); + config1.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold); config1.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, minWriteBufferSize); config1.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async); @@ -245,7 +246,7 @@ public class StateBackendLoadingTest { config2.setString(backendKey, FsStateBackendFactory.class.getName()); config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir); - config2.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold); + config2.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold); config1.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, minWriteBufferSize); config2.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async); @@ -262,10 +263,10 @@ public class StateBackendLoadingTest { assertEquals(expectedCheckpointsPath, fs2.getCheckpointPath()); assertEquals(expectedSavepointsPath, fs1.getSavepointPath()); assertEquals(expectedSavepointsPath, fs2.getSavepointPath()); - assertEquals(threshold, fs1.getMinFileSizeThreshold()); - assertEquals(threshold, fs2.getMinFileSizeThreshold()); - assertEquals(Math.max(threshold, minWriteBufferSize), fs1.getWriteBufferSize()); - assertEquals(Math.max(threshold, minWriteBufferSize), fs2.getWriteBufferSize()); + assertEquals(threshold.getBytes(), fs1.getMinFileSizeThreshold()); + assertEquals(threshold.getBytes(), fs2.getMinFileSizeThreshold()); + 
assertEquals(Math.max(threshold.getBytes(), minWriteBufferSize), fs1.getWriteBufferSize()); + assertEquals(Math.max(threshold.getBytes(), minWriteBufferSize), fs2.getWriteBufferSize()); assertEquals(async, fs1.isUsingAsynchronousSnapshots()); assertEquals(async, fs2.isUsingAsynchronousSnapshots()); } @@ -293,7 +294,7 @@ public class StateBackendLoadingTest { config.setString(backendKey, "jobmanager"); // this should not be picked up config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this should not be picked up config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir); - config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 20); // this should not be picked up + config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.parse("20")); // this should not be picked up config.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 3000000); // this should not be picked up final StateBackend loadedBackend = diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index be9b706..abdc188 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -576,7 +576,7 @@ public class SavepointITCase extends TestLogger { if (data == null) { // We need this to be large, because we want to test with files Random rand = new Random(getRuntimeContext().getIndexOfThisSubtask()); - data = new byte[CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue() + 1]; + data = new byte[(int) CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes() + 1]; rand.nextBytes(data); } } @@ -833,7 +833,7 @@ public class SavepointITCase extends TestLogger { final Configuration config = new Configuration(); config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem"); 
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); - config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0); + config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO); config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir); return config; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java index 638edb5..1e7f619 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java @@ -29,6 +29,7 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HeartbeatManagerOptions; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; @@ -109,7 +110,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils { config.setString(CheckpointingOptions.STATE_BACKEND, "memory"); config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); - config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0); + config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO); config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString()); config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 300L);