This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a835f31  [FLINK-17865][checkpoint] Increase default size of 'state.backend.fs.memory-threshold'
a835f31 is described below

commit a835f31a3a78f34b4a80f9e634b34c6a6681a482
Author: Yun Tang <myas...@live.com>
AuthorDate: Thu May 21 20:37:56 2020 +0800

    [FLINK-17865][checkpoint] Increase default size of 'state.backend.fs.memory-threshold'
    
    This closes #12282.
---
 .../generated/checkpointing_configuration.html     |  6 ++--
 .../generated/expert_state_backends_section.html   |  6 ++--
 .../flink/configuration/CheckpointingOptions.java  |  7 ++--
 .../state/api/output/SavepointOutputFormat.java    |  2 +-
 .../pyflink/datastream/tests/test_state_backend.py |  2 +-
 .../runtime/state/filesystem/FsStateBackend.java   | 39 ++++++++++++++--------
 .../runtime/state/StateBackendLoadingTest.java     | 17 +++++-----
 .../flink/test/checkpointing/SavepointITCase.java  |  4 +--
 .../utils/SavepointMigrationTestBase.java          |  3 +-
 9 files changed, 50 insertions(+), 36 deletions(-)

diff --git a/docs/_includes/generated/checkpointing_configuration.html 
b/docs/_includes/generated/checkpointing_configuration.html
index c8517a5..748bdb2 100644
--- a/docs/_includes/generated/checkpointing_configuration.html
+++ b/docs/_includes/generated/checkpointing_configuration.html
@@ -22,9 +22,9 @@
         </tr>
         <tr>
             <td><h5>state.backend.fs.memory-threshold</h5></td>
-            <td style="word-wrap: break-word;">1024</td>
-            <td>Integer</td>
-            <td>The minimum size of state data files. All state chunks smaller 
than that are stored inline in the root checkpoint metadata file.</td>
+            <td style="word-wrap: break-word;">20 kb</td>
+            <td>MemorySize</td>
+            <td>The minimum size of state data files. All state chunks smaller 
than that are stored inline in the root checkpoint metadata file. The max 
memory threshold for this configuration is 1MB.</td>
         </tr>
         <tr>
             <td><h5>state.backend.fs.write-buffer-size</h5></td>
diff --git a/docs/_includes/generated/expert_state_backends_section.html 
b/docs/_includes/generated/expert_state_backends_section.html
index 9d50be1..0fed867 100644
--- a/docs/_includes/generated/expert_state_backends_section.html
+++ b/docs/_includes/generated/expert_state_backends_section.html
@@ -16,9 +16,9 @@
         </tr>
         <tr>
             <td><h5>state.backend.fs.memory-threshold</h5></td>
-            <td style="word-wrap: break-word;">1024</td>
-            <td>Integer</td>
-            <td>The minimum size of state data files. All state chunks smaller 
than that are stored inline in the root checkpoint metadata file.</td>
+            <td style="word-wrap: break-word;">20 kb</td>
+            <td>MemorySize</td>
+            <td>The minimum size of state data files. All state chunks smaller 
than that are stored inline in the root checkpoint metadata file. The max 
memory threshold for this configuration is 1MB.</td>
         </tr>
         <tr>
             <td><h5>state.backend.fs.write-buffer-size</h5></td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
index df19ab9..16eaf75 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -140,11 +140,12 @@ public class CheckpointingOptions {
        /** The minimum size of state data files. All state chunks smaller than 
that
         * are stored inline in the root checkpoint metadata file. */
        @Documentation.Section(Documentation.Sections.EXPERT_STATE_BACKENDS)
-       public static final ConfigOption<Integer> FS_SMALL_FILE_THRESHOLD = 
ConfigOptions
+       public static final ConfigOption<MemorySize> FS_SMALL_FILE_THRESHOLD = 
ConfigOptions
                        .key("state.backend.fs.memory-threshold")
-                       .defaultValue(1024)
+                       .memoryType()
+                       .defaultValue(MemorySize.parse("20kb"))
                        .withDescription("The minimum size of state data files. 
All state chunks smaller than that are stored" +
-                               " inline in the root checkpoint metadata 
file.");
+                               " inline in the root checkpoint metadata file. 
The max memory threshold for this configuration is 1MB.");
 
        /**
         * The default size of the write buffer for the checkpoint streams that 
write to file systems.
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
index 8235067..bebd435 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
@@ -93,7 +93,7 @@ public class SavepointOutputFormat extends 
RichOutputFormat<CheckpointMetadata>
                        location,
                        location,
                        reference,
-                       
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue(),
+                       (int) 
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes(),
                        
CheckpointingOptions.FS_WRITE_BUFFER_SIZE.defaultValue());
        }
 }
diff --git a/flink-python/pyflink/datastream/tests/test_state_backend.py 
b/flink-python/pyflink/datastream/tests/test_state_backend.py
index 4f3249f..3ce18dc 100644
--- a/flink-python/pyflink/datastream/tests/test_state_backend.py
+++ b/flink-python/pyflink/datastream/tests/test_state_backend.py
@@ -97,7 +97,7 @@ class FsStateBackendTests(PyFlinkTestCase):
 
         state_backend = FsStateBackend("file://var/checkpoints/")
 
-        self.assertEqual(state_backend.get_min_file_size_threshold(), 1024)
+        self.assertEqual(state_backend.get_min_file_size_threshold(), 20480)
 
         state_backend = FsStateBackend("file://var/checkpoints/", 
file_state_size_threshold=2048)
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index e45fb20..1c61ab6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.TernaryBoolean;
 
 import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 
+import static 
org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -363,22 +365,24 @@ public class FsStateBackend extends 
AbstractFileStateBackend implements Configur
                this.asynchronousSnapshots = 
original.asynchronousSnapshots.resolveUndefined(
                                
configuration.get(CheckpointingOptions.ASYNC_SNAPSHOTS));
 
-               final int sizeThreshold = original.fileStateThreshold >= 0 ?
-                               original.fileStateThreshold :
-                               
configuration.get(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD);
+               if (getValidFileStateThreshold(original.fileStateThreshold) >= 
0) {
+                       this.fileStateThreshold = original.fileStateThreshold;
+               } else {
+                       final int configuredStateThreshold =
+                               
getValidFileStateThreshold(configuration.get(FS_SMALL_FILE_THRESHOLD).getBytes());
 
-               if (sizeThreshold >= 0 && sizeThreshold <= 
MAX_FILE_STATE_THRESHOLD) {
-                       this.fileStateThreshold = sizeThreshold;
-               }
-               else {
-                       this.fileStateThreshold = 
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();
+                       if (configuredStateThreshold >= 0) {
+                               this.fileStateThreshold = 
configuredStateThreshold;
+                       } else {
+                               this.fileStateThreshold = 
MathUtils.checkedDownCast(FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes());
 
-                       // because this is the only place we (unlikely) ever 
log, we lazily
-                       // create the logger here
-                       
LoggerFactory.getLogger(AbstractFileStateBackend.class).warn(
+                               // because this is the only place we (unlikely) 
ever log, we lazily
+                               // create the logger here
+                               
LoggerFactory.getLogger(AbstractFileStateBackend.class).warn(
                                        "Ignoring invalid file size threshold 
value ({}): {} - using default value {} instead.",
-                                       
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.key(), sizeThreshold,
-                                       
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue());
+                                       FS_SMALL_FILE_THRESHOLD.key(), 
configuration.get(FS_SMALL_FILE_THRESHOLD).getBytes(),
+                                       FS_SMALL_FILE_THRESHOLD.defaultValue());
+                       }
                }
 
                final int bufferSize = original.writeBufferSize >= 0 ?
@@ -388,6 +392,13 @@ public class FsStateBackend extends 
AbstractFileStateBackend implements Configur
                this.writeBufferSize = Math.max(bufferSize, 
this.fileStateThreshold);
        }
 
+       private int getValidFileStateThreshold(long fileStateThreshold) { // Returns the threshold narrowed to int when it lies in [0, MAX_FILE_STATE_THRESHOLD]; otherwise returns -1.
+               if (fileStateThreshold >= 0 && fileStateThreshold <= 
MAX_FILE_STATE_THRESHOLD) {
+                       return (int) fileStateThreshold; // NOTE(review): cast assumes MAX_FILE_STATE_THRESHOLD fits in an int (option docs state a 1MB max) — confirm
+               }
+               return -1; // sentinel for "out of range"; the constructor's callers test the result with ">= 0"
+       }
+
        // 
------------------------------------------------------------------------
        //  Properties
        // 
------------------------------------------------------------------------
@@ -432,7 +443,7 @@ public class FsStateBackend extends 
AbstractFileStateBackend implements Configur
        public int getMinFileSizeThreshold() {
                return fileStateThreshold >= 0 ?
                                fileStateThreshold :
-                               
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();
+                               
MathUtils.checkedDownCast(FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes());
        }
 
        /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
index 877a51d..5340290 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -227,7 +228,7 @@ public class StateBackendLoadingTest {
                final String savepointDir = new 
Path(tmp.newFolder().toURI()).toString();
                final Path expectedCheckpointsPath = new Path(checkpointDir);
                final Path expectedSavepointsPath = new Path(savepointDir);
-               final int threshold = 1000000;
+               final MemorySize threshold = MemorySize.parse("900kb");
                final int minWriteBufferSize = 1024;
                final boolean async = 
!CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue();
 
@@ -237,7 +238,7 @@ public class StateBackendLoadingTest {
                config1.setString(backendKey, "filesystem");
                config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir);
                config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir);
-               
config1.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
+               config1.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 
threshold);
                config1.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 
minWriteBufferSize);
                config1.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async);
 
@@ -245,7 +246,7 @@ public class StateBackendLoadingTest {
                config2.setString(backendKey, 
FsStateBackendFactory.class.getName());
                config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir);
                config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir);
-               
config2.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
+               config2.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 
threshold);
                config1.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 
minWriteBufferSize);
                config2.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async);
 
@@ -262,10 +263,10 @@ public class StateBackendLoadingTest {
                assertEquals(expectedCheckpointsPath, fs2.getCheckpointPath());
                assertEquals(expectedSavepointsPath, fs1.getSavepointPath());
                assertEquals(expectedSavepointsPath, fs2.getSavepointPath());
-               assertEquals(threshold, fs1.getMinFileSizeThreshold());
-               assertEquals(threshold, fs2.getMinFileSizeThreshold());
-               assertEquals(Math.max(threshold, minWriteBufferSize), 
fs1.getWriteBufferSize());
-               assertEquals(Math.max(threshold, minWriteBufferSize), 
fs2.getWriteBufferSize());
+               assertEquals(threshold.getBytes(), 
fs1.getMinFileSizeThreshold());
+               assertEquals(threshold.getBytes(), 
fs2.getMinFileSizeThreshold());
+               assertEquals(Math.max(threshold.getBytes(), 
minWriteBufferSize), fs1.getWriteBufferSize());
+               assertEquals(Math.max(threshold.getBytes(), 
minWriteBufferSize), fs2.getWriteBufferSize());
                assertEquals(async, fs1.isUsingAsynchronousSnapshots());
                assertEquals(async, fs2.isUsingAsynchronousSnapshots());
        }
@@ -293,7 +294,7 @@ public class StateBackendLoadingTest {
                config.setString(backendKey, "jobmanager"); // this should not 
be picked up 
                config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir); // this should not be picked up
                config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir);
-               config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 
20); // this should not be picked up
+               config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 
MemorySize.parse("20")); // this should not be picked up
                config.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 
3000000); // this should not be picked up
 
                final StateBackend loadedBackend =
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index be9b706..abdc188 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -576,7 +576,7 @@ public class SavepointITCase extends TestLogger {
                        if (data == null) {
                                // We need this to be large, because we want to 
test with files
                                Random rand = new 
Random(getRuntimeContext().getIndexOfThisSubtask());
-                               data = new 
byte[CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue() + 1];
+                               data = new byte[(int) 
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes() + 1];
                                rand.nextBytes(data);
                        }
                }
@@ -833,7 +833,7 @@ public class SavepointITCase extends TestLogger {
                final Configuration config = new Configuration();
                config.setString(CheckpointingOptions.STATE_BACKEND, 
"filesystem");
                config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir.toURI().toString());
-               config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 
0);
+               config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 
MemorySize.ZERO);
                config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir);
                return config;
        }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 638edb5..1e7f619 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -109,7 +110,7 @@ public abstract class SavepointMigrationTestBase extends 
TestBaseUtils {
 
                config.setString(CheckpointingOptions.STATE_BACKEND, "memory");
                config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir.toURI().toString());
-               config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 
0);
+               config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 
MemorySize.ZERO);
                config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir.toURI().toString());
                config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 
300L);
 

Reply via email to