This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e44efbff8070dca3489550fdeadc5e1ce31e68c1
Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com>
AuthorDate: Fri Oct 20 00:58:03 2023 +0200

    [FLINK-28229][connectors] Introduce FLIP-27 alternative to 
StreamExecutionEnvironment#fromCollection()
---
 .../datagen/source/DataGeneratorSource.java        |  3 +-
 .../environment/StreamExecutionEnvironment.java    | 85 ++++++++++++++++------
 pom.xml                                            |  2 +
 3 files changed, 67 insertions(+), 23 deletions(-)

diff --git 
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
index a344eb635ad..3d2416c1e16 100644
--- 
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
+++ 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
@@ -150,7 +150,8 @@ public class DataGeneratorSource<OUT>
         this.sourceReaderFactory = checkNotNull(sourceReaderFactory);
         this.generatorFunction = checkNotNull(generatorFunction);
         this.typeInfo = checkNotNull(typeInfo);
-        this.numberSource = new NumberSequenceSource(0, count - 1);
+        long to = count > 0 ? count - 1 : 0; // a noop source (0 elements) is 
used in Table tests
+        this.numberSource = new NumberSequenceSource(0, to);
         ClosureCleaner.clean(
                 generatorFunction, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
         ClosureCleaner.clean(
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 9069b3a0d3c..18dc49d3895 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1222,7 +1222,23 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
         return fromData(Arrays.asList(data), typeInfo);
     }
 
-    private <OUT> DataStreamSource<OUT> fromData(
+    /**
+     * Creates a new data stream that contains the given elements. The 
elements must all be of the
+     * same type, for example, all {@link String} or all {@link Integer}.
+     *
+     * <p>The framework will try and determine the exact type from the 
elements. In case of generic
+     * elements, it may be necessary to manually supply the type information 
via {@link
+     * #fromData(org.apache.flink.api.common.typeinfo.TypeInformation, 
OUT...)}.
+     *
+     * <p>NOTE: This creates a non-parallel data stream source by default 
(parallelism of one).
+     * Adjustment of parallelism is supported via {@code setParallelism()} on 
the result.
+     *
+     * @param data The collection of elements to create the data stream from.
+     * @param typeInfo The type information of the elements.
+     * @param <OUT> The generic type of the returned data stream.
+     * @return The data stream representing the given collection
+     */
+    public <OUT> DataStreamSource<OUT> fromData(
             Collection<OUT> data, TypeInformation<OUT> typeInfo) {
         Preconditions.checkNotNull(data, "Collection must not be null");
 
@@ -1273,6 +1289,51 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
         return fromData(Arrays.asList(data), typeInfo);
     }
 
+    /**
+     * Creates a new data stream that contains the given elements. The type of 
the data stream is
+     * that of the elements in the collection.
+     *
+     * <p>The framework will try and determine the exact type from the 
collection elements. In case
+     * of generic elements, it may be necessary to manually supply the type 
information via {@link
+     * #fromData(java.util.Collection, 
org.apache.flink.api.common.typeinfo.TypeInformation)}.
+     *
+     * <p>NOTE: This creates a non-parallel data stream source by default 
(parallelism of one).
+     * Adjustment of parallelism is supported via {@code setParallelism()} on 
the result.
+     *
+     * @param data The collection of elements to create the data stream from.
+     * @param <OUT> The generic type of the returned data stream.
+     * @return The data stream representing the given collection
+     */
+    public <OUT> DataStreamSource<OUT> fromData(Collection<OUT> data) {
+        TypeInformation<OUT> typeInfo = extractTypeInfoFromCollection(data);
+        return fromData(data, typeInfo);
+    }
+
+    private static <OUT> TypeInformation<OUT> 
extractTypeInfoFromCollection(Collection<OUT> data) {
+        Preconditions.checkNotNull(data, "Collection must not be null");
+        if (data.isEmpty()) {
+            throw new IllegalArgumentException("Collection must not be empty");
+        }
+
+        OUT first = data.iterator().next();
+        if (first == null) {
+            throw new IllegalArgumentException("Collection must not contain 
null elements");
+        }
+
+        TypeInformation<OUT> typeInfo;
+        try {
+            typeInfo = TypeExtractor.getForObject(first);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Could not create TypeInformation for type "
+                            + first.getClass()
+                            + "; please specify the TypeInformation manually 
via the version of the "
+                            + "method that explicitly accepts it as an 
argument.",
+                    e);
+        }
+        return typeInfo;
+    }
+
     /**
      * Creates a new data stream that contains a sequence of numbers. This is 
a parallel source, if
      * you manually set the parallelism to {@code 1} (using {@link
@@ -1415,27 +1476,7 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
      * @return The data stream representing the given collection
      */
     public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
-        Preconditions.checkNotNull(data, "Collection must not be null");
-        if (data.isEmpty()) {
-            throw new IllegalArgumentException("Collection must not be empty");
-        }
-
-        OUT first = data.iterator().next();
-        if (first == null) {
-            throw new IllegalArgumentException("Collection must not contain 
null elements");
-        }
-
-        TypeInformation<OUT> typeInfo;
-        try {
-            typeInfo = TypeExtractor.getForObject(first);
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    "Could not create TypeInformation for type "
-                            + first.getClass()
-                            + "; please specify the TypeInformation manually 
via "
-                            + 
"StreamExecutionEnvironment#fromElements(Collection, TypeInformation)",
-                    e);
-        }
+        TypeInformation<OUT> typeInfo = extractTypeInfoFromCollection(data);
         return fromCollection(data, typeInfo);
     }
 
diff --git a/pom.xml b/pom.xml
index c964ea7c9a6..18c8b4b3f49 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2299,6 +2299,8 @@ under the License.
                                                                
<exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.lang.Object[])</exclude>
                                                                
<exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(org.apache.flink.api.common.typeinfo.TypeInformation,java.lang.Object[])</exclude>
                                                                
<exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.lang.Class,java.lang.Object[])</exclude>
+                                                               
<exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection)</exclude>
+                                                               
<exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
                                                                <!-- MARKER: 
end exclusions -->
                                                        </excludes>
                                                        
<accessModifier>public</accessModifier>

Reply via email to