This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 11631cb5956 [FLINK-34145][connector/filesystem] support dynamic source parallelism inference in batch jobs
11631cb5956 is described below

commit 11631cb59568df60d40933fb13c8433062ed9290
Author: sunxia <xingbe...@gmail.com>
AuthorDate: Wed Jan 24 14:26:03 2024 +0800

    [FLINK-34145][connector/filesystem] support dynamic source parallelism inference in batch jobs
    
    This closes #24186.
---
 .../connector/file/src/AbstractFileSource.java     |  6 ++-
 .../flink/connector/file/src/FileSource.java       | 25 +++++++++++-
 .../file/src/FileSourceTextLinesITCase.java        | 46 +++++++++++++++++++++-
 3 files changed, 73 insertions(+), 4 deletions(-)
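
For context, a minimal sketch (not part of this commit) of a batch job that would exercise the new inference path: no parallelism is set on the source, so the adaptive batch scheduler can call the new inferParallelism(...) and use min(#splits, upper bound). The input path and class name are made up for illustration, and BATCH mode with the default adaptive batch scheduler is assumed.

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.configuration.BatchExecutionOptions;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DynamicSourceParallelismSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Same option the ITCase below sets: the upper bound for
            // inferred source parallelism.
            conf.set(
                    BatchExecutionOptions
                            .ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM,
                    8);

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
            // Dynamic parallelism inference applies to bounded sources in
            // batch execution.
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);

            FileSource<String> source =
                    FileSource.forRecordStreamFormat(
                                    new TextLineInputFormat(), new Path("/tmp/input"))
                            .build();

            // Note: no setParallelism(...) on the source; the scheduler
            // decides based on the enumerated splits.
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source")
                    .print();

            env.execute("dynamic-source-parallelism-sketch");
        }
    }
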

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
index 6dbed75747b..f4fb463e10e 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
@@ -69,7 +69,7 @@ public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
 
     private static final long serialVersionUID = 1L;
 
-    private final Path[] inputPaths;
+    final Path[] inputPaths;
 
     private final FileEnumerator.Provider enumeratorFactory;
 
@@ -100,6 +100,10 @@ public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
     //  Getters
     // ------------------------------------------------------------------------
 
+    FileEnumerator.Provider getEnumeratorFactory() {
+        return enumeratorFactory;
+    }
+
     public FileSplitAssigner.Provider getAssignerFactory() {
         return assignerFactory;
     }
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java
index da76f790627..7d3f545fc02 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.file.src;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.DynamicParallelismInference;
 import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
 import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner;
 import org.apache.flink.connector.file.src.enumerate.BlockSplittingRecursiveEnumerator;
@@ -32,10 +33,13 @@ import org.apache.flink.connector.file.src.reader.StreamFormat;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.time.Duration;
+import java.util.Collection;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -93,7 +97,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * @param <T> The type of the events/records produced by this source.
  */
 @PublicEvolving
-public final class FileSource<T> extends AbstractFileSource<T, FileSourceSplit> {
+public final class FileSource<T> extends AbstractFileSource<T, FileSourceSplit>
+        implements DynamicParallelismInference {
 
     private static final long serialVersionUID = 1L;
 
@@ -141,6 +146,24 @@ public final class FileSource<T> extends AbstractFileSource<T, FileSourceSplit>
         return FileSourceSplitSerializer.INSTANCE;
     }
 
+    @Override
+    public int inferParallelism(Context dynamicParallelismContext) {
+        FileEnumerator fileEnumerator = getEnumeratorFactory().create();
+
+        Collection<FileSourceSplit> splits;
+        try {
+            splits =
+                    fileEnumerator.enumerateSplits(
+                            inputPaths,
+                            dynamicParallelismContext.getParallelismInferenceUpperBound());
+        } catch (IOException e) {
+            throw new FlinkRuntimeException("Could not enumerate file splits", e);
+        }
+
+        return Math.min(
+                splits.size(), dynamicParallelismContext.getParallelismInferenceUpperBound());
+    }
+
     // ------------------------------------------------------------------------
     //  Entry-point Factory Methods
     // ------------------------------------------------------------------------
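
The inference logic above is deliberately simple: enumerate the splits once and cap the count. For example, if the enumerator yields 6 splits under the configured upper bound of 8, the source runs with min(6, 8) = 6 subtasks; with 20 splits it would be capped at 8. Note that the upper bound is also passed to enumerateSplits(...) as the desired split count (FileEnumerator's minDesiredSplits parameter), so enumerators that size their output dynamically stay consistent with the cap.
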
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
index 01cbd8aa9c2..08d53f21426 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -19,10 +19,15 @@
 package org.apache.flink.connector.file.src;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.BatchExecutionOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
@@ -66,6 +71,8 @@ class FileSourceTextLinesITCase {
 
     private static final int PARALLELISM = 4;
 
+    private static final int SOURCE_PARALLELISM_UPPER_BOUND = 8;
+
     @TempDir private static java.nio.file.Path tmpDir;
 
     @RegisterExtension
@@ -108,9 +115,25 @@ class FileSourceTextLinesITCase {
                 miniCluster -> testBoundedTextFileSource(tmpTestDir, FailoverType.JM, miniCluster));
     }
 
+    @Test
+    void testBoundedTextFileSourceWithDynamicParallelismInference(
+            @TempDir java.nio.file.Path tmpTestDir, @InjectMiniCluster MiniCluster miniCluster)
+            throws Exception {
+        testBoundedTextFileSource(tmpTestDir, FailoverType.NONE, miniCluster, true);
+    }
+
     private void testBoundedTextFileSource(
             java.nio.file.Path tmpTestDir, FailoverType failoverType, MiniCluster miniCluster)
             throws Exception {
+        testBoundedTextFileSource(tmpTestDir, failoverType, miniCluster, false);
+    }
+
+    private void testBoundedTextFileSource(
+            java.nio.file.Path tmpTestDir,
+            FailoverType failoverType,
+            MiniCluster miniCluster,
+            boolean batchMode)
+            throws Exception {
         final File testDir = tmpTestDir.toFile();
 
         // our main test data
@@ -126,11 +149,16 @@ class FileSourceTextLinesITCase {
                         .build();
 
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(PARALLELISM);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        env.setParallelism(PARALLELISM);
+
+        if (batchMode) {
+            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        }
 
         final DataStream<String> stream =
-                env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+                env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source")
+                        .setMaxParallelism(PARALLELISM * 2);
 
         final DataStream<String> streamFailingInTheMiddleOfReading =
                 RecordCounterToFail.wrapWithFailureAfter(stream, LINES.length / 2);
@@ -149,6 +177,9 @@ class FileSourceTextLinesITCase {
         }
 
         verifyResult(result);
+        if (batchMode) {
+            verifySourceParallelism(miniCluster.getExecutionGraph(jobId).get());
+        }
     }
 
     /**
@@ -253,11 +284,16 @@ class FileSourceTextLinesITCase {
     }
 
     private static MiniClusterResourceConfiguration createMiniClusterConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.set(
+                BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM,
+                SOURCE_PARALLELISM_UPPER_BOUND);
         return new MiniClusterResourceConfiguration.Builder()
                 .setNumberTaskManagers(1)
                 .setNumberSlotsPerTaskManager(PARALLELISM)
                 .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
                 .withHaLeadershipControl()
+                .setConfiguration(configuration)
                 .build();
     }
 
@@ -320,6 +356,12 @@ class FileSourceTextLinesITCase {
         assertThat(actual).isEqualTo(expected);
     }
 
+    private static void verifySourceParallelism(AccessExecutionGraph executionGraph) {
+        AccessExecutionJobVertex sourceVertex =
+                executionGraph.getVerticesTopologically().iterator().next();
+        assertThat(sourceVertex.getParallelism()).isEqualTo(FILE_PATHS.length);
+    }
+
     // ------------------------------------------------------------------------
     //  test data
     // ------------------------------------------------------------------------
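
A note on the new assertion: the test writes small text files, each of which should be enumerated as a single split, so with FILE_PATHS.length files below the configured upper bound of 8 the inferred source parallelism is min(FILE_PATHS.length, 8) = FILE_PATHS.length, which is exactly what verifySourceParallelism checks. The source is the topologically first vertex of the execution graph, hence the getVerticesTopologically().iterator().next() lookup.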
